男女做那个的免费视频网站/软件开发外包
该案例中要实现的是根据数据文件中的学生性别,将男生和女生分别放入不同的文件夹中,所以需要我们自定义一个OutputFormat类去实现我们的逻辑。为什么不用分区的思想直接对性别不同的数据进行区分呢,这是因为分区只是把不同的数据放入不同的分区文件中,而我们要实现的是放入不同的文件夹中。
- 数据准备
第三列的值表示性别,1表示男生,0表示女生。
将数据源文件上传至HDFS:
[root@hadoop01 test_data]# hdfs dfs -mkdir /test_custom_ouputformat_input
[root@hadoop01 test_data]# hdfs dfs -put test_custom_outputformat.txt /test_custom_ouputformat_input
新建Project:
- 引入pom依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>wyh.test</groupId><artifactId>test_custom_output_format</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><packaging>jar</packaging><dependencies><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>2.7.5</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>2.7.5</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>2.7.5</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-core</artifactId><version>2.7.5</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>RELEASE</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>1.8</source><target>1.8</target><encoding>UTF-8</encoding></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.4.3</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><minimizeJar>true</minimizeJar></configuration></execution></executions></plugin></plugins></build></project>
- 自定义OutputFormat
package wyh.test.outputformat;import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;/*** 这里的两个泛型是K3,V3的泛型。* 由于该案例中不需要reduce阶段进行特殊处理,所以K3,V3延用K2,V2的类型即可。*/
public class CustomOutputFormat extends FileOutputFormat<Text, NullWritable> {@Overridepublic RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {//获取文件系统FileSystem fileSystem = FileSystem.get(taskAttemptContext.getConfiguration());//通过文件系统创建两个要输出文件的输出流(因为我们最后是分为男生和女生两个文件夹,所以要创建两个输出流),这里指定的路径就是将来生成的结果要存放的文件路径FSDataOutputStream girlOutputStream = fileSystem.create(new Path("hdfs://192.168.126.132:8020/custom_outputformat_girl/girl_student.txt"));FSDataOutputStream boyOutputStream = fileSystem.create(new Path("hdfs://192.168.126.132:8020/custom_outputformat_boy/boy_student.txt"));//将两个输出流通过构造方法传递给CustomRecordWriterCustomRecordWriter customRecordWriter = new CustomRecordWriter(girlOutputStream, boyOutputStream);return customRecordWriter;}
}
- 自定义RecordWriter
package wyh.test.outputformat;import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;import java.io.IOException;/*** 该处的泛型与CustomOutputFormat中的泛型保持一致*/
public class CustomRecordWriter extends RecordWriter<Text, NullWritable> {//定义两个输出流,再通过下面的构造方法,将CustomOutputFormat中创建的两个输出流对象传递给CustomRecordWriterprivate FSDataOutputStream girlOutputStream;private FSDataOutputStream boyOutputStream;public CustomRecordWriter() {}public CustomRecordWriter(FSDataOutputStream girlOutputStream, FSDataOutputStream boyOutputStream) {this.girlOutputStream = girlOutputStream;this.boyOutputStream = boyOutputStream;}//定义如何实现数据写入@Overridepublic void write(Text text, NullWritable nullWritable) throws IOException, InterruptedException {//从原始的行文本数据中截取出性别字段String[] split = text.toString().split(",");String s = split[2];//根据性别字段将数据写入不同的文件夹中if(Integer.parseInt(s) == 0){//性别为女生girlOutputStream.write(text.toString().getBytes());//将文本数据先转为字符串,再转为字节数组//写完一行数据之后,需要加上换行符girlOutputStream.write("\r\n".getBytes());}else{//性别为男生boyOutputStream.write(text.toString().getBytes());boyOutputStream.write("\r\n".getBytes());}}@Overridepublic void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {//关闭流girlOutputStream.close();boyOutputStream.close();}
}
- 自定义Mapper
package wyh.test.outputformat;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/*** K1:行偏移量* V1:行文本数据* K2:行文本数据* V2:置空*/
public class CustomOutputFormatMapper extends Mapper<LongWritable, Text, Text, NullWritable> {@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {context.write(value, NullWritable.get());}
}
- 自定义主类
package wyh.test.outputformat;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;public class CustomOutputFormatJobMain extends Configured implements Tool {@Overridepublic int run(String[] strings) throws Exception {Job job = Job.getInstance(super.getConf(), "test_custom_outputformat_job");job.setJarByClass(CustomOutputFormatJobMain.class);job.setInputFormatClass(TextInputFormat.class);TextInputFormat.addInputPath(job, new Path("hdfs://192.168.126.132:8020/test_custom_ouputformat_input"));job.setMapperClass(CustomOutputFormatMapper.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(NullWritable.class);job.setOutputFormatClass(CustomOutputFormat.class);//虽然在CustomOutputFormat中我们已经指定了output的路径,但那里指定的路径是存放输出数据的文件路径。在这里我们仍然要指定在数据输出过程中产生的校验文件以及辅助性文件等要存放的路径。CustomOutputFormat.setOutputPath(job, new Path("hdfs://192.168.126.132:8020/test_custom_ouputformat_output"));boolean b = job.waitForCompletion(true);return b?0:1;}public static void main(String[] args) throws Exception {Configuration configuration = new Configuration();int run = ToolRunner.run(configuration, new CustomOutputFormatJobMain(), args);System.exit(run);}
}
- 打包jar并上传至服务器
- 运行jar
[root@hadoop01 test_jar]# hadoop jar test_custom_output_format-1.0-SNAPSHOT.jar wyh.test.outputformat.CustomOutputFormatJobMain
- 查看运行结果
两个目录下分别产生各自的文件:
输出过程中产生的其他文件:
查看输出结果文件:
与我们的预期结果相同,将源文件中的男生和女生分别存放在了不同的文件夹下。
以上就是MapReduce中通过自定义OuputFormat实现对输出结果存放在不同文件夹下的过程。