MapReduce: Sorting with a Serializable Object as the Key
- 1. Requirements
- 2. Test Data
- 3. Programming Approach
- 4. Implementation Steps
- 5. Packaging and Running on the Cluster
1. Requirements

- Requirement: sort employee records by department number, equivalent to the SQL:

```sql
select * from emp order by deptno;
```

- Then extend the rule: sort by department number ascending and salary descending:

```sql
select * from emp order by deptno asc, sal desc;
```
2. Test Data

- Use the classic emp dataset as test data, saved as emp.csv with one comma-separated employee record per line.
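A few sample rows in the expected format (the first row is the one quoted in the Employee class below; the others follow the classic emp dataset and are shown for illustration):

```text
7369,SMITH,CLERK,7902,1980/12/17,800,,20
7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
7782,CLARK,MANAGER,7839,1981/6/9,2450,,10
7839,KING,PRESIDENT,,1981/11/17,5000,,10
```

The fields are: empno, ename, job, mgr, hiredate, sal, comm, deptno; mgr and comm may be empty.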
3. Programming Approach

- Approach:

1. Define a Java class that implements the WritableComparable interface.
2. Override its compareTo method to implement the sort rule.
3. Only a Mapper stage is needed: the shuffle phase sorts map output keys using compareTo, so the job output is enough to verify the sort rule.
4. Write a Job (driver) class that configures the Mapper and the input/output paths.
4. Implementation Steps

- Create a Maven project in IDEA or Eclipse.

- Add the Hadoop dependencies to pom.xml:
```xml
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.7.3</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.7.3</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-common</artifactId>
    <version>2.7.3</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>2.7.3</version>
</dependency>
```
- Add a log4j.properties file under the resources directory with the following content:
```properties
### Root logger ###
log4j.rootLogger = debug,console,fileAppender

### Console appender ###
log4j.appender.console = org.apache.log4j.ConsoleAppender
log4j.appender.console.Target = System.out
log4j.appender.console.layout = org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern = %d{ABSOLUTE} %5p %c:%L - %m%n

### File appender (Threshold takes a single level) ###
log4j.appender.fileAppender = org.apache.log4j.FileAppender
log4j.appender.fileAppender.File = logs/logs.log
log4j.appender.fileAppender.Append = false
log4j.appender.fileAppender.Threshold = DEBUG
log4j.appender.fileAppender.layout = org.apache.log4j.PatternLayout
log4j.appender.fileAppender.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] - [ %p ] %m%n
```
- Write the Employee class: it implements WritableComparable and overrides compareTo to define the custom sort order.
```java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class Employee implements WritableComparable<Employee> {
    // Sample record: 7369,SMITH,CLERK,7902,1980/12/17,800,,20
    private IntWritable empNo;
    private Text empName;
    private Text empJob;
    private IntWritable leaderNo;
    private Text hireDate;
    private IntWritable empSalary;
    private Text empBonus;
    private IntWritable deptNo;

    public Employee() {
        this.empNo = new IntWritable();
        this.empName = new Text("");
        this.empJob = new Text("");
        this.leaderNo = new IntWritable();
        this.hireDate = new Text("");
        this.empSalary = new IntWritable();
        this.empBonus = new Text("");
        this.deptNo = new IntWritable();
    }

    public Employee(int empNo, String empName, String empJob, int leaderNo,
                    String hireDate, int empSalary, String empBonus, int deptNo) {
        this.empNo = new IntWritable(empNo);
        this.empName = new Text(empName);
        this.empJob = new Text(empJob);
        this.leaderNo = new IntWritable(leaderNo);
        this.hireDate = new Text(hireDate);
        this.empSalary = new IntWritable(empSalary);
        this.empBonus = new Text(empBonus);
        this.deptNo = new IntWritable(deptNo);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // Serialization: write every field in a fixed order
        this.empNo.write(out);
        this.empName.write(out);
        this.empJob.write(out);
        this.leaderNo.write(out);
        this.hireDate.write(out);
        this.empSalary.write(out);
        this.empBonus.write(out);
        this.deptNo.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Deserialization: read the fields back in the same order
        this.empNo.readFields(in);
        this.empName.readFields(in);
        this.empJob.readFields(in);
        this.leaderNo.readFields(in);
        this.hireDate.readFields(in);
        this.empSalary.readFields(in);
        this.empBonus.readFields(in);
        this.deptNo.readFields(in);
    }

    @Override
    public String toString() {
        return "Employee{" +
                "empNo=" + empNo +
                ", empName=" + empName +
                ", empJob=" + empJob +
                ", leaderNo=" + leaderNo +
                ", hireDate=" + hireDate +
                ", empSalary=" + empSalary +
                ", empBonus=" + empBonus +
                ", deptNo=" + deptNo +
                '}';
    }

    public IntWritable getEmpNo() { return empNo; }
    public void setEmpNo(IntWritable empNo) { this.empNo = empNo; }
    public Text getEmpName() { return empName; }
    public void setEmpName(Text empName) { this.empName = empName; }
    public Text getEmpJob() { return empJob; }
    public void setEmpJob(Text empJob) { this.empJob = empJob; }
    public IntWritable getLeaderNo() { return leaderNo; }
    public void setLeaderNo(IntWritable leaderNo) { this.leaderNo = leaderNo; }
    public Text getHireDate() { return hireDate; }
    public void setHireDate(Text hireDate) { this.hireDate = hireDate; }
    public IntWritable getEmpSalary() { return empSalary; }
    public void setEmpSalary(IntWritable empSalary) { this.empSalary = empSalary; }
    public Text getEmpBonus() { return empBonus; }
    public void setEmpBonus(Text empBonus) { this.empBonus = empBonus; }
    public IntWritable getDeptNo() { return deptNo; }
    public void setDeptNo(IntWritable deptNo) { this.deptNo = deptNo; }

    /**
     * Custom sort rule (first version):
     * sort by department number ascending only.
     */
    // public int compareTo(Employee o) {
    //     if (this.deptNo.get() > o.getDeptNo().get()) {
    //         return 1;
    //     } else if (this.deptNo.get() < o.getDeptNo().get()) {
    //         return -1;
    //     } else {
    //         return 0;
    //     }
    // }

    /**
     * Custom sort rule:
     * sort by department number ascending, then salary descending.
     */
    @Override
    public int compareTo(Employee o) {
        if (this.deptNo.get() > o.getDeptNo().get()) {
            return 1;
        } else if (this.deptNo.get() < o.getDeptNo().get()) {
            return -1;
        }
        // Departments are equal: fall through and sort by salary descending
        if (this.empSalary.get() > o.getEmpSalary().get()) {
            return -1;
        } else if (this.empSalary.get() < o.getEmpSalary().get()) {
            return 1;
        } else {
            return 0;
        }
    }
}
```
- Write the custom Mapper class.

For details, refer to the earlier post "MapReduce: Using Object Serialization to Compute the Total Salary and Bonus per Department"; a minimal sketch follows.
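A minimal sketch of the Mapper, assuming the comma-separated emp.csv format shown above (the field positions and the EmpMapper class name come from this post; defaulting empty mgr/comm fields is my assumption). The Employee object itself is the map output key, so the shuffle phase sorts records with Employee.compareTo; the employee number is used as a placeholder IntWritable value to match the Driver's settings:

```java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class EmpMapper extends Mapper<LongWritable, Text, Employee, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Sample line: 7369,SMITH,CLERK,7902,1980/12/17,800,,20
        String[] fields = value.toString().split(",");
        int empNo = Integer.parseInt(fields[0]);
        // mgr may be empty (e.g. for the president row); default it to 0
        int leaderNo = fields[3].isEmpty() ? 0 : Integer.parseInt(fields[3]);
        int salary = Integer.parseInt(fields[5]);
        int deptNo = Integer.parseInt(fields[7]);

        Employee employee = new Employee(empNo, fields[1], fields[2], leaderNo,
                fields[4], salary, fields[6], deptNo);

        // The Employee is the key: the shuffle sorts by Employee.compareTo
        // (department ascending, salary descending).
        context.write(employee, new IntWritable(empNo));
    }
}
```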
- Write the Driver class:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.util.Random;

public class EmpJob {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(EmpJob.class);

        job.setMapperClass(EmpMapper.class);
        job.setMapOutputKeyClass(Employee.class);      // key2: Employee
        job.setMapOutputValueClass(IntWritable.class); // value2

        job.setOutputKeyClass(Employee.class);
        job.setOutputValueClass(IntWritable.class);

        // Use local files for testing first
        FileInputFormat.setInputPaths(job, new Path("F:\\NIIT\\hadoopOnWindow\\input\\emp.csv"));
        FileOutputFormat.setOutputPath(job, new Path(getOutputDir()));

        boolean result = job.waitForCompletion(true);
        System.out.println("result:" + result);
    }

    // Generates a random output directory so repeated runs do not collide
    public static String getOutputDir() {
        String prefix = "F:\\NIIT\\hadoopOnWindow\\output\\";
        long time = System.currentTimeMillis();
        int random = new Random().nextInt();
        return prefix + "result_" + time + "_" + random;
    }
}
```
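Note that no Reducer class is set, so Hadoop runs the default identity Reducer with a single reduce task; it is the shuffle's merge sort on the Employee key (via compareTo above) that yields a globally sorted output file.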
- Run the code locally and verify that the output is sorted as expected.
- Result screenshot: (image not reproduced here)
5. Packaging and Running on the Cluster (follow the steps below)
- Upload the test data to the datas directory in HDFS.
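For example, assuming the data file is named emp.csv:

```shell
hdfs dfs -mkdir -p /datas
hdfs dfs -put emp.csv /datas
```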
- Once the local run is correct, change the input/output part of the Driver to take its paths from the command line:

```java
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
```
- To build the program into a jar, configure the assembly plugin in pom.xml:
```xml
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <!-- Use Maven's predefined assembly descriptor -->
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <!-- Bind to the package lifecycle phase -->
                    <phase>package</phase>
                    <goals>
                        <!-- Run only once -->
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```
Then run the Maven package step; the original post illustrates this with an IDE screenshot (image not reproduced here).
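From the command line, the standard equivalent is:

```shell
mvn clean package
```

With the assembly plugin above, this produces packagedemo-1.0-SNAPSHOT.jar along with a jar-with-dependencies variant under the target directory.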
- Submit the job to the cluster with a command of the following form (adjust the jar name, the fully qualified main class, and the HDFS paths to match your project):

```shell
hadoop jar packagedemo-1.0-SNAPSHOT.jar com.niit.mr.EmpJob /datas/emp.csv /output/emp/
```
That completes all the steps. Give it a try, and good luck!