Counters record a job's execution progress and status; you can think of them as a kind of log. We usually insert a counter at some point in a program to track changes in data or progress, and a counter is more convenient to analyze than a log.
1. Built-in Counters
Hadoop actually ships with many built-in counters. Where can we see them?
Let's start with the simplest WordCount program.
The source file on HDFS:
[hadoop@master logfile]$ hadoop fs -cat /MR_Counter/diary
Today is 2016-3-22
I study mapreduce counter
I realized that mapreduce counter is simple
WordCount.java:
package com.oner.mr.mrcounter;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCount {

    public static void main(String[] args) throws IOException,
            InterruptedException, URISyntaxException, ClassNotFoundException {
        Path inPath = new Path("/MR_Counter/"); // input directory
        Path outPath = new Path("/MR_Counter/out"); // output directory

        Configuration conf = new Configuration();
        // conf.set("fs.defaultFS", "hdfs://master:9000");
        FileSystem fs = FileSystem.get(new URI("hdfs://master:9000"), conf,
                "hadoop");
        if (fs.exists(outPath)) { // delete the output directory if it already exists
            fs.delete(outPath, true);
        }

        Job job = Job.getInstance(conf);
        job.setJarByClass(WordCount.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.setInputPaths(job, inPath);
        FileOutputFormat.setOutputPath(job, outPath);

        job.waitForCompletion(true);
    }

    public static class MyMapper extends
            Mapper<LongWritable, Text, Text, LongWritable> {

        private static Text k = new Text();
        private static LongWritable v = new LongWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] words = line.split(" ");
            for (String word : words) {
                k.set(word);
                context.write(k, v);
            }
        }
    }

    public static class MyReducer extends
            Reducer<Text, LongWritable, Text, LongWritable> {

        private static LongWritable v = new LongWritable();

        @Override
        protected void reduce(Text key, Iterable<LongWritable> values,
                Reducer<Text, LongWritable, Text, LongWritable>.Context context)
                throws IOException, InterruptedException {
            long sum = 0;
            for (LongWritable value : values) {
                sum += value.get();
            }
            v.set(sum);
            context.write(key, v);
        }
    }
}
Package it into a jar and run it: hadoop jar wc.jar com.oner.mr.mrcounter.WordCount
The console then shows the following information (the comments were added by me):
16/03/22 14:25:30 INFO mapreduce.Job: Counters: 49 // this job has 49 counters in total
    File System Counters // file system counters
        FILE: Number of bytes read=235
        FILE: Number of bytes written=230421
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=189
        HDFS: Number of bytes written=86
        HDFS: Number of read operations=6
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters // job counters
        Launched map tasks=1 // one map task was launched
        Launched reduce tasks=1 // one reduce task was launched
        Data-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=12118
        Total time spent by all reduces in occupied slots (ms)=11691
        Total time spent by all map tasks (ms)=12118
        Total time spent by all reduce tasks (ms)=11691
        Total vcore-seconds taken by all map tasks=12118
        Total vcore-seconds taken by all reduce tasks=11691
        Total megabyte-seconds taken by all map tasks=12408832
        Total megabyte-seconds taken by all reduce tasks=11971584
    Map-Reduce Framework // MapReduce framework counters
        Map input records=3
        Map output records=14
        Map output bytes=201
        Map output materialized bytes=235
        Input split bytes=100
        Combine input records=0
        Combine output records=0
        Reduce input groups=10
        Reduce shuffle bytes=235
        Reduce input records=14
        Reduce output records=10
        Spilled Records=28
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=331
        CPU time spent (ms)=2820
        Physical memory (bytes) snapshot=306024448
        Virtual memory (bytes) snapshot=1690583040
        Total committed heap usage (bytes)=136122368
    Shuffle Errors // shuffle error counters
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters // file input format counters
        Bytes Read=89 // the map tasks read 89 bytes from HDFS
    File Output Format Counters // file output format counters
        Bytes Written=86 // the reduce tasks wrote 86 bytes to HDFS
The information above comes from the built-in counters, which fall into these groups:
File System Counters
Job Counters
Map-Reduce Framework
Shuffle Errors
File Input Format Counters
File Output Format Counters
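These built-in counters are not only printed to the console; they can also be read programmatically in the driver once the job finishes. A minimal sketch (not from the original post), reusing the job object from the WordCount driver above and assuming Hadoop 2.x's TaskCounter enum:

// Requires two extra imports:
//   import org.apache.hadoop.mapreduce.Counters;
//   import org.apache.hadoop.mapreduce.TaskCounter;
job.waitForCompletion(true);
Counters counters = job.getCounters();
// TaskCounter enumerates the "Map-Reduce Framework" group shown above
long mapInputRecords = counters
        .findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();
System.out.println("Map input records = " + mapInputRecords); // 3 for this input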
2. Custom Counters
Hadoop also supports custom counters. In Hadoop 2.x, you can obtain a custom counter through Context's getCounter() method (it is actually declared on the TaskAttemptContext interface, which Context extends):
public Counter getCounter(Enum<?> counterName): Get the Counter for the given counterName
public Counter getCounter(String groupName, String counterName): Get the Counter for the given groupName and counterName
In other words, a counter can be obtained either through an enum or through a pair of strings (group name and counter name). The example later in this post uses the string form; the enum form looks like the sketch below.
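A minimal sketch of the enum form (the class and enum names here are hypothetical, not from the original code), assuming the same imports as the WordCount listing above. Each enum constant becomes one counter, and the group name defaults to the enum's fully qualified class name:

public static class QualityMapper extends
        Mapper<LongWritable, Text, Text, LongWritable> {

    enum RecordQuality { GOOD, MALFORMED } // two counters in one group

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // classify each input line and bump the matching counter
        if (value.toString().trim().isEmpty()) {
            context.getCounter(RecordQuality.MALFORMED).increment(1L);
        } else {
            context.getCounter(RecordQuality.GOOD).increment(1L);
        }
    }
}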
The Counter interface provides the following commonly used methods:
String getName(): Get the name of the counter
String getDisplayName(): Get the display name of the counter
long getValue(): Get the current value
void setValue(long value): Set this counter by the given value
void increment(long incr): Increment this counter by the given value
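As a quick illustration, here is a hypothetical fragment (invented group and counter names) that exercises these methods inside a map() or reduce() method:

Counter c = context.getCounter("Demo Group", "demo");
c.increment(3L);                     // value is now 3
c.setValue(10L);                     // value is overwritten to 10
String name = c.getName();           // "demo"
String display = c.getDisplayName(); // same as the name unless one is set
long current = c.getValue();         // 10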
Suppose we now want to print to the console the number of occurrences of certain sensitive words in the source file, taking "mapreduce" as the sensitive word. How do we do that?
package com.oner.mr.mrcounter;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCount {

    public static void main(String[] args) throws IOException,
            InterruptedException, URISyntaxException, ClassNotFoundException {
        Path inPath = new Path("/MR_Counter/"); // input directory
        Path outPath = new Path("/MR_Counter/out"); // output directory

        Configuration conf = new Configuration();
        // conf.set("fs.defaultFS", "hdfs://master:9000");
        FileSystem fs = FileSystem.get(new URI("hdfs://master:9000"), conf,
                "hadoop");
        if (fs.exists(outPath)) { // delete the output directory if it already exists
            fs.delete(outPath, true);
        }

        Job job = Job.getInstance(conf);
        job.setJarByClass(WordCount.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.setInputPaths(job, inPath);
        FileOutputFormat.setOutputPath(job, outPath);

        job.waitForCompletion(true);
    }

    public static class MyMapper extends
            Mapper<LongWritable, Text, Text, LongWritable> {

        private static Text k = new Text();
        private static LongWritable v = new LongWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // create a counter in the group "Sensitive Words:" named "mapreduce"
            Counter sensitiveCounter = context.getCounter("Sensitive Words:",
                    "mapreduce");
            String line = value.toString();
            String[] words = line.split(" ");
            for (String word : words) {
                if (word.equalsIgnoreCase("mapreduce")) {
                    // increment the counter whenever "mapreduce" appears
                    sensitiveCounter.increment(1L);
                }
                k.set(word);
                context.write(k, v);
            }
        }
    }

    public static class MyReducer extends
            Reducer<Text, LongWritable, Text, LongWritable> {

        private static LongWritable v = new LongWritable();

        @Override
        protected void reduce(Text key, Iterable<LongWritable> values,
                Reducer<Text, LongWritable, Text, LongWritable>.Context context)
                throws IOException, InterruptedException {
            long sum = 0;
            for (LongWritable value : values) {
                sum += value.get();
            }
            v.set(sum);
            context.write(key, v);
        }
    }
}
After packaging it into a jar and running it again, the console indeed shows a new counter group, Sensitive Words:, containing a counter named mapreduce with a value of 2.
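The same value can also be fetched in the driver instead of being read off the console. A minimal sketch (not from the original post), reusing the group and counter names from the mapper above:

// Requires: import org.apache.hadoop.mapreduce.Counters;
job.waitForCompletion(true);
long hits = job.getCounters()
        .findCounter("Sensitive Words:", "mapreduce").getValue();
System.out.println("\"mapreduce\" occurred " + hits + " times"); // 2 for this input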