Part 1: Introduction to Flume
1. Flume is a distributed, reliable, and highly available system from Cloudera for collecting, aggregating, and transporting large volumes of log data.
2. Flume can collect source data in many forms (files, socket packets, directories, etc.) and can write the collected data to many external storage systems such as HDFS, HBase, Hive, and Kafka.
3. Typical collection needs can be met with a simple Flume configuration.
4. Flume also offers good custom-extension hooks for special scenarios, so it suits most day-to-day data-collection cases.
5. It can collect log files and data streams dynamically, sending one copy to HDFS for offline analysis and another to Kafka for real-time processing.
6. Components: the core role in a Flume deployment is the agent; a Flume collection system is built by connecting agents together.
7. Each agent acts as a data courier and contains three components:
a) Source: the collection source; turns incoming data into a stream of events and hands them to the channel
b) Channel: the data transfer channel inside the agent, similar to a queue; provides temporary buffering
c) Sink: reads events from the channel and delivers them to the destination
Part 2: Installation and Deployment
1. Upload and extract the archive.
2. Enter the Flume directory and edit conf/flume-env.sh to set JAVA_HOME.
3. Copy core-site.xml and hdfs-site.xml into Flume's conf directory (so Flume can locate Hadoop).
4. Add the HDFS client JARs to Flume's lib directory.
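A minimal shell sketch of these steps, assuming the CDH 5.7.6 layout used in the examples below (the archive name, Hadoop path, and JAVA_HOME value are assumptions to adjust for your environment):
tar -zxf flume-1.6.0-cdh5.7.6-bin.tar.gz -C /opt/cdh5.7.6/
cd /opt/cdh5.7.6/flume-1.6.0-cdh5.7.6-bin
cp conf/flume-env.sh.template conf/flume-env.sh
echo 'export JAVA_HOME=/usr/java/default' >> conf/flume-env.sh
cp /opt/cdh5.7.6/hadoop-2.6.0-cdh5.7.6/etc/hadoop/core-site.xml conf/
cp /opt/cdh5.7.6/hadoop-2.6.0-cdh5.7.6/etc/hadoop/hdfs-site.xml conf/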
Part 3: Basic Usage
1. Read Hive log data to the console
a1.sources = s1
a1.channels = c1
a1.sinks = k1
# defined sources
# read a single file
a1.sources.s1.type = exec
# tail the log file continuously
a1.sources.s1.command = tail -F /opt/cdh5.7.6/hive-1.1.0-cdh5.7.6/logs/hive.log
# shell used to run the command
a1.sources.s1.shell=/bin/sh -c
# defined channel
# buffer events in memory
a1.channels.c1.type = memory
# capacity (max events the channel can hold)
a1.channels.c1.capacity=1000
# transaction capacity (max events per transaction)
a1.channels.c1.transactionCapacity=100
# defined sink
# write events to the console
a1.sinks.k1.type = logger
# bind
# wire the components together
a1.sinks.k1.channel = c1
a1.sources.s1.channels = c1
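To launch this agent, run flume-ng from the Flume home directory, naming the agent and the properties file (the file name hive-logger.properties below is only an assumed example):
bin/flume-ng agent \
--conf conf \
--conf-file conf/hive-logger.properties \
--name a1 \
-Dflume.root.logger=INFO,console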
2. Read Hive log data to HDFS
a1.sources = s1
a1.channels = c1
a1.sinks = k1
# defined sources
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /opt/cdh5.7.6/hive-1.1.0-cdh5.7.6/logs/hive.log
a1.sources.s1.shell=/bin/sh -c
# defined channel
# buffer events on disk
a1.channels.c1.type = file
# checkpoint directory, which records transfer state such as how many events have been taken
a1.channels.c1.checkpointDir=/opt/datas/flume/channel/checkpoint
a1.channels.c1.dataDirs=/opt/datas/flume/channel/data
# defined sink
# write events to HDFS
a1.sinks.k1.type = hdfs
# target directory in HDFS
a1.sinks.k1.hdfs.path=/flume/hdfs2/
# file type and write format
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=Text
# bind
a1.sinks.k1.channel = c1
a1.sources.s1.channels = c1
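Once the agent has run for a while, the output can be checked with the HDFS shell (paths as configured above; by default the sink names its files with the FlumeData prefix):
hdfs dfs -ls /flume/hdfs2/
hdfs dfs -cat /flume/hdfs2/FlumeData.*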
3. Control the size of the files stored in HDFS
a1.sources = s1
a1.channels = c1
a1.sinks = k1
# defined sources
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /opt/cdh5.7.6/hive-1.1.0-cdh5.7.6/logs/hive.log
a1.sources.s1.shell=/bin/sh -c
# defined channel
a1.channels.c1.type = file
# checkpoint directory, which records transfer state such as how many events have been taken
a1.channels.c1.checkpointDir=/opt/datas/flume/channel/checkpoint
a1.channels.c1.dataDirs=/opt/datas/flume/channel/data
# defined sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path=/flume/hdfs3/
# file type and write format
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=Text
# HDFS file rolling: 0 disables time- and count-based rolling, so files roll only at 10240 bytes (10 KB)
a1.sinks.k1.hdfs.rollInterval=0
a1.sinks.k1.hdfs.rollSize=10240
a1.sinks.k1.hdfs.rollCount=0
# bind
a1.sinks.k1.channel = c1
a1.sources.s1.channels = c1
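A quick sanity check that rolling is driven by size alone is to list the file sizes in the target directory; each closed file should be roughly 10 KB:
hdfs dfs -du -h /flume/hdfs3/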
4. Partitioned data storage
a1.sources = s1
a1.channels = c1
a1.sinks = k1
# defined sources
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /opt/cdh5.7.6/hive-1.1.0-cdh5.7.6/logs/hive.log
a1.sources.s1.shell=/bin/sh -c
# defined channel
a1.channels.c1.type = file
# checkpoint directory, which records transfer state such as how many events have been taken
a1.channels.c1.checkpointDir=/opt/datas/flume/channel/checkpoint
a1.channels.c1.dataDirs=/opt/datas/flume/channel/data
# defined sink
a1.sinks.k1.type = hdfs
# partition layout; %Y/%m/%d/%M are escape sequences filled in from the event timestamp
a1.sinks.k1.hdfs.path=/flume/part/yearstr=%Y/monthstr=%m/daystr=%d/minutestr=%M
# use the agent's local time for the escape sequences
a1.sinks.k1.hdfs.useLocalTimeStamp=true
# file type and write format
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=Text
# HDFS file rolling: size-based only (10 KB)
a1.sinks.k1.hdfs.rollInterval=0
a1.sinks.k1.hdfs.rollSize=10240
a1.sinks.k1.hdfs.rollCount=0
# bind
a1.sinks.k1.channel = c1
a1.sources.s1.channels = c1
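Listing the target path recursively shows the time-based partition directories created by the sink (the directory names below are illustrative, since they depend on when the agent runs):
hdfs dfs -ls -R /flume/part/
# e.g. /flume/part/yearstr=2018/monthstr=06/daystr=15/minutestr=30/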
5. Monitor a directory dynamically
a1.sources = s1
a1.channels = c1
a1.sinks = k1
# defined sources
# watch a directory for new files
a1.sources.s1.type = spooldir
# the directory to watch
a1.sources.s1.spoolDir = /opt/datas/flume/spool
# defined channel
a1.channels.c1.type = memory
# capacity (max events the channel can hold)
a1.channels.c1.capacity=1000
# transaction capacity (max events per transaction)
a1.channels.c1.transactionCapacity=100
# defined sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path=/flume/spooling
a1.sinks.k1.hdfs.useLocalTimeStamp=true
# file type and write format
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=Text
# HDFS file rolling: size-based only (10 KB)
a1.sinks.k1.hdfs.rollInterval=0
a1.sinks.k1.hdfs.rollSize=10240
a1.sinks.k1.hdfs.rollCount=0
# bind
a1.sinks.k1.channel = c1
a1.sources.s1.channels = c1
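The spooldir source expects files to appear in the directory atomically and renames each one with a .COMPLETED suffix after ingesting it, so a simple test is to write the file elsewhere and move it in:
echo "hello flume" > /tmp/test.log
mv /tmp/test.log /opt/datas/flume/spool/
ls /opt/datas/flume/spool/
# after ingestion: test.log.COMPLETED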
6. Example 5 with filtering added
a1.sources = s1
a1.channels = c1
a1.sinks = k1
# defined sources
a1.sources.s1.type = spooldir
a1.sources.s1.spoolDir = /opt/datas/flume/spool
# do not ingest files whose names end in .tmp
a1.sources.s1.ignorePattern=([^ ]*\.tmp)
# defined channel
a1.channels.c1.type = memory
# capacity (max events the channel can hold)
a1.channels.c1.capacity=1000
# transaction capacity (max events per transaction)
a1.channels.c1.transactionCapacity=100
# defined sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path=/flume/spooling
a1.sinks.k1.hdfs.useLocalTimeStamp=true
# file type and write format
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=Text
# HDFS file rolling: size-based only (10 KB)
a1.sinks.k1.hdfs.rollInterval=0
a1.sinks.k1.hdfs.rollSize=10240
a1.sinks.k1.hdfs.rollCount=0
# bind
a1.sinks.k1.channel = c1
a1.sources.s1.channels = c1
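The ignore pattern makes it safe to build files inside the spool directory itself: write to a .tmp name first, then rename once the file is complete:
echo "in progress" > /opt/datas/flume/spool/app.log.tmp
# ignored while the .tmp suffix is present; picked up after the rename
mv /opt/datas/flume/spool/app.log.tmp /opt/datas/flume/spool/app.log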
7. Real-time collection of multiple log files
a1.sources = s1
a1.channels = c1
a1.sinks = k1
# defined sources
# for a custom-built source class, use its fully qualified class name here
a1.sources.s1.type = TAILDIR
# JSON file where the source records how far it has read in each file
a1.sources.s1.positionFile = /opt/cdh5.7.6/flume-1.6.0-cdh5.7.6-bin/position/taildir_position.json
a1.sources.s1.filegroups = f1 f2
# a single file
a1.sources.s1.filegroups.f1 = /opt/datas/flume/taildir/hd.txt
a1.sources.s1.headers.f1.age = 17
# a directory (all files matching the pattern)
a1.sources.s1.filegroups.f2 = /opt/datas/flume/taildir/huadian/.*
a1.sources.s1.headers.f2.age = 18
a1.sources.s1.headers.f2.type = aa
# defined channel
a1.channels.c1.type = memory
# capacity (max events the channel can hold)
a1.channels.c1.capacity=1000
# transaction capacity (max events per transaction)
a1.channels.c1.transactionCapacity=100
# defined sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path=/flume/taildir
a1.sinks.k1.hdfs.useLocalTimeStamp=true
# file type and write format
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=Text
# HDFS file rolling: size-based only (10 KB)
a1.sinks.k1.hdfs.rollInterval=0
a1.sinks.k1.hdfs.rollSize=10240
a1.sinks.k1.hdfs.rollCount=0
# bind
a1.sinks.k1.channel = c1
a1.sources.s1.channels = c1
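The TAILDIR source survives restarts because it persists its read offsets in the position file; appending to a monitored file and then inspecting the JSON shows the tracking:
echo "new line" >> /opt/datas/flume/taildir/hd.txt
cat /opt/cdh5.7.6/flume-1.6.0-cdh5.7.6-bin/position/taildir_position.json
# each entry records the inode, pos (byte offset), and file path of a tailed file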