Introduction
TopN is a classic algorithm. Since each mapper only computes a local top N, with M mappers the reduce phase only has to handle M x N records, which is perfectly acceptable and does not create a performance bottleneck.
In the map phase, this TopN implementation uses a TreeMap to maintain the ordering, which keeps the approach scalable.
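To make the trick concrete, here is a minimal stand-alone Java sketch (independent of the MapReduce job later in this post; the class and method names are hypothetical): keep at most N entries in a TreeMap, and whenever it grows past N, evict the smallest key. Whatever remains is the local top N.

import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;

// Hypothetical sketch of the bounded top-N trick used in the mapper below.
public class LocalTopNSketch {
    static TreeMap<Integer, String> localTopN(List<Integer> numbers, int n) {
        TreeMap<Integer, String> topN = new TreeMap<Integer, String>();
        for (Integer number : numbers) {
            topN.put(number, "");             // unique-key case: the number itself is the key
            if (topN.size() > n) {
                topN.remove(topN.firstKey()); // drop the current minimum
            }
        }
        return topN;
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(20, 78, 56, 45, 23, 15, 12, 35, 79, 68);
        System.out.println(localTopN(data, 3).descendingKeySet()); // prints [79, 78, 68]
    }
}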
There are two variants of the algorithm. One is the unique-key variant, meaning the keys are all distinct at the point where they are actually compared; this post covers the unique-key TopN implementation.
The other is the non-unique-key variant, where the key may take several values, say A, B, and C, mixed together in the data, and a TopN is computed for each of them separately; that variant will be covered in a separate post.
Now, on to the main topic.
1. Input, Expected Output, and Approach
Since this is the unique-key case, only the value part is actually involved in the ordering, so we can keep things simple and use a single column of numbers as the input.
The contents of TopN.txt are as follows:
20 78 56 45 23 15 12 35 79 68 98 63 111 222 333 444 555
When we set N = 10, the expected output is:
555
444
333
222
111
98
79
78
68
63
As for the approach, it was already explained in the introduction, so there is no need to repeat it here; straight to the code:
2. Implementing TopN with a Java MapReduce Program:
To make this a TopN in the true sense rather than a hard-coded Top 10, the value of N is set in the Configuration and read back from the context, so an extra command-line argument has to be supplied accordingly (a sample invocation is shown after the code).
package TopN;

import java.io.IOException;
import java.util.StringTokenizer;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TopN {

    // Each mapper keeps its own local top N in a TreeMap and emits it in cleanup().
    public static class TopTenMapper extends
            Mapper<Object, Text, NullWritable, IntWritable> {

        private TreeMap<Integer, String> repToRecordMap = new TreeMap<Integer, String>();

        public void map(Object key, Text value, Context context) {
            int N = 10; // defaults to Top 10
            N = Integer.parseInt(context.getConfiguration().get("N"));
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                repToRecordMap.put(Integer.parseInt(itr.nextToken()), " ");
                if (repToRecordMap.size() > N) {
                    // once the map holds more than N entries, evict the smallest key
                    repToRecordMap.remove(repToRecordMap.firstKey());
                }
            }
        }

        protected void cleanup(Context context) {
            // emit this mapper's local top N after all of its input has been seen
            for (Integer i : repToRecordMap.keySet()) {
                try {
                    context.write(NullWritable.get(), new IntWritable(i));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    // The single reducer merges the M x N local results into the global top N.
    public static class TopTenReducer extends
            Reducer<NullWritable, IntWritable, NullWritable, IntWritable> {

        private TreeMap<Integer, String> repToRecordMap = new TreeMap<Integer, String>();

        public void reduce(NullWritable key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            int N = 10; // defaults to Top 10
            N = Integer.parseInt(context.getConfiguration().get("N"));
            for (IntWritable value : values) {
                repToRecordMap.put(value.get(), " ");
                if (repToRecordMap.size() > N) {
                    repToRecordMap.remove(repToRecordMap.firstKey());
                }
            }
            // write the result in descending order
            for (Integer i : repToRecordMap.descendingMap().keySet()) {
                context.write(NullWritable.get(), new IntWritable(i));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 3) {
            throw new IllegalArgumentException(
                    "Usage: hadoop jar <jar-name> TopN.TopN <the value of N> <input-path> <output-path>");
        }
        Configuration conf = new Configuration();
        conf.set("N", args[0]); // pass N to the mappers and the reducer via the Configuration
        Job job = Job.getInstance(conf, "TopN");
        job.setJobName("TopN");
        Path inputPath = new Path(args[1]);
        Path outputPath = new Path(args[2]);
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);
        job.setJarByClass(TopN.class);
        job.setMapperClass(TopTenMapper.class);
        job.setReducerClass(TopTenReducer.class);
        job.setNumReduceTasks(1); // a single reducer produces the global top N
        job.setMapOutputKeyClass(NullWritable.class);   // key type of the map output
        job.setMapOutputValueClass(IntWritable.class);  // value type of the map output
        job.setOutputKeyClass(NullWritable.class);      // key type of the reduce output
        job.setOutputValueClass(IntWritable.class);     // value type of the reduce output
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
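For reference, a run might look like the following, matching the usage message above; the jar name and paths are placeholders to be adjusted for your environment:

hadoop jar TopN.jar TopN.TopN 10 /input/TopN.txt /output/topn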
3. Implementing TopN with a Scala Spark Program:
The code is as concise as ever:
package spark

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD.rddToOrderedRDDFunctions
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions

object TopN {
  def main(args: Array[String]) {
    val N = 10 // the value of N is specified here
    val conf = new SparkConf().setAppName("TopN").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    val file = sc.textFile("e:\\TopN.txt")
    file.flatMap(_.split(" "))
      .map(x => (x.toInt, null)) // make the number the key so sortByKey can order it
      .sortByKey(false)          // false = descending
      .map(_._1)
      .take(N)
      .foreach(println)
  }
}
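Note that setMaster("local") and the hard-coded input path e:\TopN.txt are only suitable for local testing; when submitting to a real cluster the master is normally supplied by spark-submit and the input would live on HDFS. Also, unlike the MapReduce version, N is fixed in the source here rather than read from the command line.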