给个营销型网站/北京seo优化外包
MapReduce功能实现系列:
MapReduce功能实现一—Hbase和Hdfs之间数据相互转换
MapReduce功能实现二—排序
MapReduce功能实现三—Top N
MapReduce功能实现四—小综合(从hbase中读取数据统计并在hdfs中降序输出Top 3)
MapReduce功能实现五—去重(Distinct)、计数(Count)
MapReduce功能实现六—最大值(Max)、求和(Sum)、平均值(Avg)
MapReduce功能实现七—小综合(多个job串行处理计算平均值)
MapReduce功能实现八—分区(Partition)
MapReduce功能实现九—Pv、Uv
MapReduce功能实现十—倒排索引(Inverted Index)
MapReduce功能实现十一—join
一、背景
在Hadoop的MapReduce过程中,每个map task处理完数据后,如果存在自定义Combiner类,会先进行一次本地的reduce操作,然后把数据发送到Partitioner,由Partitioner来决定每条记录应该送往哪个reducer节点,默认使用的是HashPartitioner,其核心代码如下:
public class HashPartitioner<K, V> extends Partitioner<K, V> {/** Use {@link Object#hashCode()} to partition. */public int getPartition(K key, V value,int numReduceTasks) {return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;}}
上面的getPartition函数的作用:
- 获取key的哈希值
- 使用key的哈希值对reduce任务数求模
- 这样做的目的是可以把(key,value)对均匀的分发到各个对应编号的reduce task节点上,达到reduce task节点的负载均衡。
上面的代码只是实现了(key,value)键值对的均匀分布,但是无法实现如下需求:
- 假设输入的数据文件有4个,里面包含各个部门各个季度的销售额
- 使用mapreduce程序进行统计各个部门全年销售额,同时每个部门对应一个输出文件
由于输出的文件是区分数据类型的(部门类型),所以这个时候就需要我们自定义partition,分别把各个部门的数据分发到各自的reduce task上。
在面试的时候我能够以WorldCount为例将MapReduce的过程说清楚,但是面试官可能会问如何把想要的数据放到一个reduce中呢?一开我还有点懵,后来面试结束后觉得这是在问自定义分区啊。(在hive中可以通过distribute by
实现)
二、自定义分区
自定义分区很简单,我们只需要继承抽象类Partitioner,重写getPartition方法即可,另外还要给任务设置分区:job.setPartitionerClass(),就可以了。
注意:自定义分区的数量需要和reduce task的数量保持一致。设置分区数:job.setNumReduceTasks(3);
1.模拟数据:
[hadoop@h71 q1]$ vi aa.txt
aa 1 2
bb 2 22
cc 11
dd 1
ee 99 99999
ff 12 23123
注意:这里的分隔符是/t(Tab键)而不是空格
[hadoop@h71 q1]$ hadoop fs -put aa.txt /input
2.java代码:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser; public class MyPartitioner { public static class MyPartitionerMap extends Mapper<LongWritable, Text, Text, Text> { protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws java.io.IOException, InterruptedException { String arr_value[] = value.toString().split("\t"); if (arr_value.length > 3) { context.write(new Text("long"), value); } else if (arr_value.length < 3) { context.write(new Text("short"), value); } else { context.write(new Text("right"), value); } } } /** * partitioner的输入就是map的输出 */ public static class MyPartitionerPar extends Partitioner<Text, Text> { @Override public int getPartition(Text key, Text value, int numPartitions) { int result = 0; /*********************************************************************/ /***key.toString().equals("long") must use toString()!!!! ***/ /***开始的时候我没有用 ,导致都在一个区里,结果也在一个reduce输出文件中。 ***/ /********************************************************************/ if (key.toString().equals("long")) { result = 0 % numPartitions; } else if (key.toString().equals("short")) { result = 1 % numPartitions; } else if (key.toString().equals("right")) { result = 2 % numPartitions; } return result; } } public static class MyPartitionerReduce extends Reducer<Text, Text, Text, Text> { protected void reduce(Text key, java.lang.Iterable<Text> value, Context context) throws java.io.IOException, InterruptedException { for (Text val : value) { context.write(key, val); //context.write(key, val); } } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: MyPartitioner <in> <out>"); System.exit(2); }conf.set("mapred.jar","mp1.jar");Job job = new Job(conf, "MyPartitioner"); job.setNumReduceTasks(3); job.setJarByClass(MyPartitioner.class); job.setMapperClass(MyPartitionerMap.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setPartitionerClass(MyPartitionerPar.class); job.setReducerClass(MyPartitionerReduce.class); job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); }
}
3.执行:
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/javac MyPartitioner.java
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar MyPartitioner*class
[hadoop@h71 q1]$ hadoop jar xx.jar MyPartitioner /input/aa.txt /output
4.查看数据:
[hadoop@h71 q1]$ hadoop fs -lsr /output
rw-r--r-- 2 hadoop supergroup 0 2017-03-18 22:55 /output/_SUCCESS
-rw-r--r-- 2 hadoop supergroup 36 2017-03-18 22:55 /output/part-r-00000
-rw-r--r-- 2 hadoop supergroup 23 2017-03-18 22:55 /output/part-r-00001
-rw-r--r-- 2 hadoop supergroup 27 2017-03-18 22:55 /output/part-r-00002[hadoop@h71 q1]$ hadoop fs -cat /output/part-r-00000
long ff 12 23 123
long ee 99 99 999
[hadoop@h71 q1]$ hadoop fs -cat /output/part-r-00001
short dd 1
short cc 11
[hadoop@h71 q1]$ hadoop fs -cat /output/part-r-00002
right bb 2 22
right aa 1 2