自定义排序是基于k2(即map输出的key)的排序。设现有以下一组数据,每行分别表示一个矩形的长和宽,现要求按照面积的升序进行排序。
9 9
6 6
7 8
1 1
5 4
现在需要重新定义数据类型。MR的key类型必须实现WritableComparable接口,因此定义RectangleWritable数据类型如下:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class RectangleWritable implements WritableComparable {int length,width;public RectangleWritable() {super();// TODO Auto-generated constructor stub}public RectangleWritable(int length, int width) {super();this.length = length;this.width = width;}public int getLength() {return length;}public void setLength(int length) {this.length = length;}public int getWidth() {return width;}public void setWidth(int width) {this.width = width;}@Overridepublic void write(DataOutput out) throws IOException {// TODO Auto-generated method stubout.writeInt(length);out.writeInt(width);}@Overridepublic void readFields(DataInput in) throws IOException {// TODO Auto-generated method stubthis.length=in.readInt();this.width=in.readInt();}@Overridepublic int compareTo(Object arg0) {// TODO Auto-generated method stubRectangleWritable other = (RectangleWritable)arg0;if (this.getLength() * this.getWidth() > other.length * other.width ) {return 1;}if (this.getLength() * this.getWidth() < other.length * other.width ) {return -1;}return 0;}@Overridepublic String toString() {return this.getLength() + "\t" + this.getWidth();}}
其中,compareTo方法自定义排序规则,然后由框架进行排序。
map函数和reduce函数并无大变化,还是按照WordCount的思路进行,具体代码如下:
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Reducer;
public class SelfDefineSort {/*** @param args* @author nwpulisz* @date 2016.4.1*/static final String INPUT_PATH="hdfs://192.168.255.132:9000/input";static final String OUTPUT_PATH="hdfs://192.168.255.132:9000/output";public static void main(String[] args) throws Exception {// TODO Auto-generated method stubConfiguration conf = new Configuration();Path outPut_path= new Path(OUTPUT_PATH);Job job = new Job(conf, "SelfDefineSort");//如果输出路径是存在的,则提前删除输出路径FileSystem fileSystem = FileSystem.get(new URI(OUTPUT_PATH), conf);if(fileSystem.exists(outPut_path)){fileSystem.delete(outPut_path,true);}job.setJarByClass(RectangleWritable.class);FileInputFormat.setInputPaths(job, INPUT_PATH);FileOutputFormat.setOutputPath(job, outPut_path);job.setMapperClass(MyMapper.class);job.setReducerClass(MyReducer.class);job.setMapOutputKeyClass(RectangleWritable.class);job.setMapOutputValueClass(NullWritable.class);job.setOutputKeyClass(IntWritable.class);job.setOutputValueClass(IntWritable.class);job.waitForCompletion(true);}static class MyMapper extends Mapper<LongWritable, Text, RectangleWritable, NullWritable>{protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {String[] splits = v1.toString().split("\t");RectangleWritable k2 = new RectangleWritable(Integer.parseInt(splits[0]),Integer.parseInt(splits[1]));context.write(k2,NullWritable.get());}}static class MyReducer extends Reducer<RectangleWritable, NullWritable,IntWritable, IntWritable>{protected void reduce(RectangleWritable k2,Iterable<NullWritable> v2s,Context context)throws IOException, InterruptedException {// TODO Auto-generated method stubcontext.write(new IntWritable(k2.getLength()), new IntWritable(k2.getWidth()));}}
}
根据自定义的排序规则,输出结果如下(按面积升序排列,长和宽之间以制表符分隔):
1	1
5	4
6	6
7	8
9	9

来自为知笔记(Wiz)