2019独角兽企业重金招聘Python工程师标准>>>
需求：将Hadoop中的数据写入MongoDB。
做法是把每行字符串转换成一个类似Map的对象后插入数据库，以替换原有的单线程接口。
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.StringUtils;import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.Mongo;public class MongoOutputFormat implements OutputFormat<Text, Text> {@Overridepublic void checkOutputSpecs(FileSystem arg0, JobConf arg1) throws IOException {// TODO Auto-generated method stubSystem.out.println("OutputFormat CheckOutpuSpecs() function is not supported~!");}//实现OutputFormat接口的时候,返回一个RecordWriter对象。//这里可以实例化数据库连接JDBC对象,和RecordWriter共一个生命周期。//数据库连接串的相关对象,通过JobConf传入。@Overridepublic RecordWriter<Text, Text> getRecordWriter(FileSystem arg0, JobConf conf, String arg2, Progressable arg3) throws IOException {
// Configuration conf = jobconf.g.getConfiguration() ;String ip = conf.get("mongoIp");String port = conf.get("mongoPort");Mongo mongo = new Mongo(ip,Integer.parseInt(port));String username = conf.get("muser");String password = conf.get("mpwd");String dbname = conf.get("mongoDb");String collectionName = conf.get("mongoCollection");try {return new MongoDBRecordWriter(mongo,dbname,collectionName,username,password);}catch (Exception ex) {throw new IOException(ex);}}/*** A RecordWriter that writes the reduce output to a SQL table or MongoDB Collection!*/public static class MongoDBRecordWriter implements RecordWriter<Text, Text> {private DBCollection coll;private Mongo mongo;public MongoDBRecordWriter() throws SQLException {}//使用这个构造函数public MongoDBRecordWriter(DBCollection coll) {this.coll = coll;}public MongoDBRecordWriter(Mongo mongo, String dbname, String collectionName, String username, String password) {this.mongo = mongo;DB d = this.mongo.getDB(dbname);d.authenticate(username, password.toCharArray());this.coll = d.getCollection(collectionName);}public DBCollection getCollection() {return coll;}// public PreparedStatement getStatement() {// return statement;// }@Override/** Close函数,用于关闭OutputFormat中用到的资源对象 */public void close(Reporter arg0) throws IOException {try {this.mongo.close();}catch (Exception e) {try {System.out.println("Close() is not supported here...");}catch (Exception ex) {ex.printStackTrace();}throw new IOException(e);} finally {try {System.out.println("Close() is not supported here...");}catch (Exception ex) {ex.printStackTrace();}}}//RecordWriter中输出的方法,必须实现的。@Overridepublic void write(Text key, Text value) throws IOException {try {String line = value.toString();String[] rs = line.split("\001");Map m = new HashMap();m.put("created_by", rs[7]);m.put("created_date", rs[8]);m.put("updated_by", rs[9]);m.put("updated_date", rs[10]);DBObject dbObj = new BasicDBObject();dbObj.putAll(m);coll.save(dbObj);}catch (Exception e) {// LoggingUtils.logAll(LOG, 
"Exception encountered", e);.System.err.print(e);e.printStackTrace();}}}
}