开一个监听持续间断的获取某个日志的续写的信息,并传入sink中,在flume默认的组建中并没用这样的功能,只能自己根据业务就行开发,下面flume获得source信息

概要:首先 我们在获得持续输出的日志并创建一个文件中记录我们获取这个日志的信息变化的位置,根据这个位置文件来完成,我们需要的断点续传功能.

所谓日志搬家我们必须要知道这个日志 是哪里来要搬到哪里去 这里是source我只做在哪里来,

首先上三段代码

package org.jueshizhanhun.flume.source.filemonitor;import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;import org.apache.commons.lang.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.instrumentation.SourceCounter;
import org.apache.flume.source.AbstractSource;
import org.apache.log4j.Logger;import com.google.common.base.Preconditions;
import com.ule.flume.util.Constants;public class FileMonitorSource extends AbstractSource implements Configurable, EventDrivenSource {private static final Logger log =  Logger.getLogger("sourcelog");private String path = "";//设置本机ipprivate long rollInterval = 10;//启动监听获取信息时间间隔private boolean record = true;//是否开启获得IP和日志文件名 开关private ChannelProcessor channelProcessor;private RandomAccessFile monitorFile = null;private File coreFile = null;private long lastMod = 0L;private String monitorFilePath = null;private String monitorFileName = null;private String positionFile = null;private FileChannel monitorFileChannel = null;private ByteBuffer buffer = ByteBuffer.allocate(1 << 20);// 1MBprivate long positionValue = 0L;private ScheduledExecutorService executor;private FileMonitorThread runner;private PositionLog positionLog = null;private Charset charset = null;private CharsetDecoder decoder = null;private CharBuffer charBuffer = null;private long counter = 0L;private Map<String, String> headers = new HashMap<String, String>();// event// headerprivate long lastFileSize = 0L;private long nowFileSize = 0L;private SourceCounter sourceCounter;@Overridepublic synchronized void start() {channelProcessor = getChannelProcessor();executor = Executors.newSingleThreadScheduledExecutor();runner = new FileMonitorThread();executor.scheduleWithFixedDelay(runner, 0, rollInterval, TimeUnit.SECONDS);sourceCounter.start();super.start();log.info("FileMonitorSource source started");}@Overridepublic synchronized void stop() {positionLog.setPosition(positionValue);log.debug("Set the positionValue {} when stopped : " +  positionValue);if (this.monitorFileChannel != null) {try {this.monitorFileChannel.close();} catch (IOException e) {log.error(this.monitorFilePath + " filechannel close Exception", e);}}if (this.monitorFile != null) {try {this.monitorFile.close();} catch (IOException e) {log.error(this.monitorFilePath + " file close Exception", e);}}executor.shutdown();try {executor.awaitTermination(10L, TimeUnit.SECONDS);} catch (InterruptedException ex) {log.error("Interrupted while awaiting termination", ex);}executor.shutdownNow();sourceCounter.stop();super.stop();log.info("FileMonitorSource source stopped");}@Overridepublic void configure(Context context) {charset = Charset.forName("UTF-8");decoder = charset.newDecoder();//需要监测的文件this.monitorFilePath = context.getString("file");this.rollInterval = context.getInteger("rollInterval",10);this.path = context.getString("path", catchLocalIP().toString());this.record = context.getBoolean("record",true);//需要监测的文件的所在路径this.positionFile =  monitorFilePath + ".position";Preconditions.checkArgument(monitorFilePath != null, "The file can not be null !");try {for (String name : monitorFilePath.split("/")) {monitorFileName = name;}} catch (Exception e) {log.error("获得监控文件名称失败!",e);}try {//获得这个文件coreFile = new File(monitorFilePath);//获得文件的最后修改时间lastMod = coreFile.lastModified();} catch (Exception e) {log.error("Initialize the File/FileChannel Error", e);return;}try {positionLog = new PositionLog(positionFile);//获得当前位置文件中的数值positionValue = positionLog.initPosition();} catch (Exception e) {log.error("Initialize the positionValue in File positionLog", e);return;}lastFileSize = positionValue;if (sourceCounter == null) {sourceCounter = new SourceCounter(getName());}}class FileMonitorThread implements Runnable {/*** a thread to check whether the file is modified*/@Overridepublic void run() {{log.info("FileMonitorThread running ...");// coreFile = new File(monitorFilePath);long nowModified = coreFile.lastModified();// the file has been changedif (lastMod != nowModified) {log.debug(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>File modified ...");// you must record the last modified and now file size as// soon// as possiblelastMod = nowModified;nowFileSize = coreFile.length();int readDataBytesLen = 0;try {log.debug("The LastlastFileSize:" +lastFileSize+" nowFileSize :"+nowFileSize);// it indicated the file is rolled by log4jif (nowFileSize <= lastFileSize) {log.debug("The file size is changed to be lower,it indicated that the file is rolled by log4j.");positionValue = 0L;}lastFileSize = nowFileSize;monitorFile = new RandomAccessFile(coreFile, "r");// you must be instantiate the file channel Object when// the// file// changedmonitorFileChannel = monitorFile.getChannel();monitorFileChannel.position(positionValue);// read file content into bufferint bytesRead = monitorFileChannel.read(buffer);// this while for it can not read all the data when the// file// modifiedwhile (bytesRead != -1) {log.debug("How many bytes read in this loop ? -->  {} = "+ bytesRead);String contents = buffer2String(buffer);// every read,the last byte is \n,this can make sure// the// integrity of read data// include the \nint lastLineBreak = contents.lastIndexOf("\n") + 1;String readData = contents.substring(0, lastLineBreak);byte[] readDataBytes = readData.getBytes();readDataBytesLen = readDataBytes.length;positionValue += readDataBytesLen;// change the position value for next readString infoString ="";int infoLength = 0;if (record) {infoString = Constants.SPLIT+path+Constants.SPLIT+monitorFileName;infoLength = infoString.getBytes().length; log.info("infoString:" + infoString);log.debug("data: "+(new String(readDataBytes) + infoString)+" end");}monitorFileChannel.position(positionValue+infoLength);
//                            headers.put(Constants.KEY_DATA_SIZE, String.valueOf(readDataBytesLen));
//                            headers.put(Constants.KEY_DATA_LINE, String.valueOf(readData.split("\n")));sourceCounter.incrementEventReceivedCount();
//                            channelProcessor.processEvent(EventBuilder.withBody(readDataBytes,headers));if (record) {channelProcessor.processEvent(EventBuilder.withBody((new String(readDataBytes) + infoString).getBytes()));}else {channelProcessor.processEvent(EventBuilder.withBody(readDataBytes));}sourceCounter.addToEventAcceptedCount(1);// channelProcessor.processEventBatch(getEventByReadData(readData));log.debug("Change the next read position {} = "+ positionValue);buffer.clear();bytesRead = monitorFileChannel.read(buffer);}} catch (Exception e) {log.error("Read data into Channel Error", e);log.debug("Save the last positionValue {} into Disk File = positionValue- readDataBytesLen :"+ (positionValue- readDataBytesLen));positionLog.setPosition(positionValue - readDataBytesLen);}counter++;if (counter % Constants.POSITION_SAVE_COUNTER == 0) {log.debug(Constants.POSITION_SAVE_COUNTER+ " times file modified checked,save the position Value {} into Disk file = "+"positionValue:"+positionValue);positionLog.setPosition(positionValue);}}}}}public List<Event> getEventByReadData(String readData) {String str[] = readData.split("\n");int len = str.length;List<Event> events = new ArrayList<Event>();for (int i = 0; i < len; i++) {Event event = EventBuilder.withBody((str[i]).getBytes());events.add(event);}return events;}public String buffer2String(ByteBuffer buffer) {buffer.flip();try {charBuffer = decoder.decode(buffer);return charBuffer.toString();} catch (Exception ex) {ex.printStackTrace();return "";}}//取得本机IP地址  public static String catchLocalIP()  {  InetAddress LocalIP =null;try   {  LocalIP = InetAddress.getLocalHost(); } catch (UnknownHostException e)   {  }  return LocalIP.getHostAddress();  }  
}
package org.jueshizhanhun.flume.source.filemonitor;import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class PositionLog {private static final Logger log = LoggerFactory.getLogger(PositionLog.class);private FileChannel positionFileChannel;private String postionFile;private RandomAccessFile raf = null;private String filePath = null;public FileChannel getPositionFileChannel() {return positionFileChannel;}public void setPositionFileChannel(FileChannel positionFileChannel) {this.positionFileChannel = positionFileChannel;}public String getPostionFilePath() {return postionFile;}public void setPostionFilePath(String postionFilePath) {this.postionFile = postionFilePath;}public PositionLog() {}public PositionLog(String postionFilePath) {this.postionFile = postionFilePath;}public long initPosition() throws Exception {filePath = postionFile;File file = new File(filePath);if (!file.exists()) {try {file.createNewFile();log.debug("Create the postionFile file");} catch (IOException e) {log.error("Create the postionFile error", e);throw e;}}try {//使用找个文件的权限   rw  读写权限raf = new RandomAccessFile(filePath, "rw");//获得文件的通道this.positionFileChannel =raf.getChannel();long fileSize = positionFileChannel.size();if(fileSize==0) {log.debug("The file content is null,init the value 0");ByteBuffer buffer = ByteBuffer.allocate(1);buffer.put("0".getBytes());buffer.flip();positionFileChannel.write(buffer);raf.close();return 0L;}else {return getPosition();}} catch (Exception e) {log.error("Init the position file error",e);throw e;} }public long getPosition() {try {raf = new RandomAccessFile(filePath, "rw");this.positionFileChannel =raf.getChannel();long fileSize = positionFileChannel.size();ByteBuffer buffer = ByteBuffer.allocate((int) fileSize);int bytesRead = positionFileChannel.read(buffer);StringBuffer sb = new StringBuffer();while(bytesRead!=-1) {buffer.flip();while(buffer.hasRemaining()) {sb.append((char)buffer.get());}buffer.clear();bytesRead = positionFileChannel.read(buffer);}raf.close();return Long.parseLong(sb.toString());} catch (Exception e) {log.error("Get Position Value Error",e);return -1;}}public long setPosition(Long position) {try {raf = new RandomAccessFile(filePath, "rw");this.positionFileChannel =raf.getChannel();String positionStr = String.valueOf(position);int bufferSize = positionStr.length();ByteBuffer buffer = ByteBuffer.allocate(bufferSize);buffer.clear();buffer.put(positionStr.getBytes());buffer.flip();while(buffer.hasRemaining()) {this.positionFileChannel.write(buffer);}raf.close();log.debug("Set Position Value Successfully {}",position);return position;} catch (Exception e) {log.error("Set Position Value Error",e);return -1;}}}
package org.jueshizhanhun.flume.util;public class Constants {public static String SPLIT = ":jbgsn:";public static long POSITION_INIT_VALUE = 0L;public static String KEY_DATA_SIZE = "readDataSize";public static String KEY_DATA_LINE = "readDataLine";public static int POSITION_SAVE_COUNTER = 10;}

获取文件的核心代码每间隔一段时间扫描一次日志文件将变动的信息获得,并添加标记信息,在传输到sink中

#这里要写自己的类
home.sources.s1.type = org.jueshizhanhun.flume.source.filemonitor.FileMonitorSource
home.sources.s1.channels = c1
#自定义 监控文件日志的全路径
home.sources.s1.file = /tmp/flume/hostnamelog.log
#自定义 搬家后需要创建的文件路径
home.sources.s1.path = pub/message/172.24.138.40
#home.sources.s1.rollInterval = 30      //间隔多少秒 获取一次日志信息 默认10秒
#home.sources.s1.record = true       //获取日志信息后 是否添加ip与日志名称信息 默认开启