Flume 日志收集、使用Flume收集日志到HDFS
1.1 源、通道与接收器
Flume代理的架构如下图:输入叫做源,输出叫作接收器。通过提供类源与接收器之间的胶水。它们都运行在叫做代理的守护进程中。源将事件写到一个或者多个通道中。
通道作为事件从源到接收器传递的保留区。
接收器只能从一个通道接收事件。
代理可能会有多个源、通道与接收器。
1.2 Flume事件
Flume传输的基本的数据负载叫做事件。事件由0个或者多个头与体组成。
头是一些键值对,可以看作与HTTP头完成相同的功能——传递与体不同的额外信息。
体是个字节数组,包含类实际的负载。例如如果输入文件由日志文件组成,那么该数组就非常类似于包含了单行文本的UTF-8编码的字符串。
Flume可能会自动添加头(比如,源添加了数据来自的主机名或者创建了事件的时间戳),不过体基本上不会受影响,除非在中途使用拦截器对其进行编辑。
1.3 拦截器、通道选择器与选择处理器
拦截器值的是数据流中的一个点,可以在这里检查和修改Flume事件。可以在【源创建事件】后/【接收器发送事件】前链接0个或者多个拦截器。类似于SpringAOP中的MethodInterceptor和Java Servlet中的ServletFilter。
通道选择器负责将数据从一个源转向一个或者多个通道上。Flume自带两个通道选择器,分别是复制通道选择器和多路通道选择器。复制通道选择器(默认的)只是将时间事件的副本放到每个通道上,前提是你已经配置好了多个通道。多路通道选择器会根据某些头信息将事件写到不同的通道中。
输入处理器为输入器创建故障恢复路径,或者是跨越一个通道的多个输入器创建负载均衡时间。
1.4 多层数据收集(多数据流与代理)
拦截器、通道选择器等的组合可以实现复杂的多层数据收集。
不同的数据源可以发送到同一个通道上,同一个数据源可以通过通道选择器转向一个或者多个通道上,由此构成了复杂的有向无环网状结构。
第二章 Flume快速起步
2.1 下载Flume
2.2 Flume配置文件概览
每个代理的配置都以如下3个参数开始:
agent.sources=
agent.chanels=
agent.sinks
分别对应源、通道和接收器
2.3 Hello,World
在flume的conf目录(~/apache-flume-1.5.0-bin/conf$)下新建一个hello.conf文件,内容如下:
#源:s1
agent.sources=s1
#通道:c1
agent.channels=c1
#接收器:k1
agent.sinks=k1
#源s1的类型为netcat,它会打开一个Socket监听事件(每个事件一行文本)。它需要#两个参数,分别是一个绑定IP与一个端口号。
agent.sources.s1.type=netcat
#源s1的绑定IP
agent.sources.s1.bind=0.0.0.0
#源s1的监听端口
agent.sources.s1.port=12345
#源s1的去向通道为通道c1
agent.sources.s1.channels=c1
#通道c1的类型为内存通道
agent.channels.c1.type=memory
#接收器k1的类型为日志
agent.sinks.k1.type=logger
#接收器k1的来向通道为c1
agent.sinks.k1.channel=c1
然后回到flume根目录(~/apache-flume-1.5.0-bin$),执行命令
./bin/flume-ng agent -n agent -c conf -f conf/flume.conf -Dflume.root.logger=INFO,console -Duser.timezone=UTC -Dflume.monitoring.type=http -Dflume.monitoring.port=44444
其中:
-Dflume.root.logger=INFO,console 属性覆盖了conf/log4j.propertites中的rootlogger,使用console追加器。如果没有覆盖,输出会被写到log/flume.log文件中。
-Duser.timezone=UTC 属性设置了时区(后面会提到)
-Dflume.monitoring.type=http 属性指定了监控类型(后面会提到)
-Dflume.monitoring.port=44444 属性指定了监控端口(后面会提到)
会看到如下输出:
2017-12-02 06:04:10,625 (lifecycleSupervisor-1-0) [INFO – org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:61)] Configuration provider starting
2017-12-02 06:04:10,634 (conf-file-poller-0) [INFO – org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:133)] Reloading configuration file:conf/hello.conf
2017-12-02 06:04:10,645 (conf-file-poller-0) [INFO – org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:930)] Added sinks: k1 Agent: agent
2017-12-02 06:04:10,645 (conf-file-poller-0) [INFO – org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:k1
2017-12-02 06:04:10,645 (conf-file-poller-0) [INFO – org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:k1
2017-12-02 06:04:10,656 (conf-file-poller-0) [INFO – org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:140)] Post-validation flume configuration contains configuration for agents: [agent]
2017-12-02 06:04:10,657 (conf-file-poller-0) [INFO – org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:150)] Creating channels
2017-12-02 06:04:10,666 (conf-file-poller-0) [INFO – org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:40)] Creating instance of channel c1 type memory
2017-12-02 06:04:10,670 (conf-file-poller-0) [INFO – org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:205)] Created channel c1
2017-12-02 06:04:10,671 (conf-file-poller-0) [INFO – org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:39)] Creating instance of source s1, type netcat
2017-12-02 06:04:10,681 (conf-file-poller-0) [INFO – org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:40)] Creating instance of sink: k1, type: logger
2017-12-02 06:04:10,683 (conf-file-poller-0) [INFO – org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:119)] Channel c1 connected to [s1, k1]
2017-12-02 06:04:10,689 (conf-file-poller-0) [INFO – org.apache.flume.node.Application.startAllComponents(Application.java:138)] Starting new configuration:{ sourceRunners:{s1=EventDrivenSourceRunner: { source:org.apache.flume.source.NetcatSource{name:s1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@20c5df70 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
2017-12-02 06:04:10,697 (conf-file-poller-0) [INFO – org.apache.flume.node.Application.startAllComponents(Application.java:145)] Starting Channel c1
2017-12-02 06:04:10,754 (lifecycleSupervisor-1-0) [INFO – org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
2017-12-02 06:04:10,756 (lifecycleSupervisor-1-0) [INFO – org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: c1 started
2017-12-02 06:04:10,760 (conf-file-poller-0) [INFO – org.apache.flume.node.Application.startAllComponents(Application.java:173)] Starting Sink k1
2017-12-02 06:04:10,761 (conf-file-poller-0) [INFO – org.apache.flume.node.Application.startAllComponents(Application.java:184)] Starting Source s1
2017-12-02 06:04:10,762 (lifecycleSupervisor-1-2) [INFO – org.apache.flume.source.NetcatSource.start(NetcatSource.java:150)] Source starting
2017-12-02 06:04:10,775 (lifecycleSupervisor-1-2) [INFO – org.apache.flume.source.NetcatSource.start(NetcatSource.java:164)] Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/0:0:0:0:0:0:0:0:12345]
2017-12-02 06:04:10,784 (conf-file-poller-0) [INFO – org.mortbay.log.Slf4jLog.info(Slf4jLog.java:67)] Logging to org.slf4j.impl.Log4jLoggerAdapter(org.mortbay.log) via org.mortbay.log.Slf4jLog
2017-12-02 06:04:10,831 (conf-file-poller-0) [INFO – org.mortbay.log.Slf4jLog.info(Slf4jLog.java:67)] jetty-6.1.26
2017-12-02 06:04:10,853 (conf-file-poller-0) [INFO – org.mortbay.log.Slf4jLog.info(Slf4jLog.java:67)] Started SelectChannelConnector@0.0.0.0:44444
然后另外打开一个终端,输入命令nc localhost 12345进入消息发送状态,然后输入一条消息,并按回车,会看到出现OK。此时,刚才开启flume的窗口出现:
2017-12-02 06:08:08,780 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO – org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64 hello world }
截图如下:
第三章 通道
3.1 内存通道(非持久化通道)
事件存储在内存中的通道。
好处:接收速度快(内存的速度要比磁盘快几个数量级)
坏处:代理失败(硬件问题、断电、JVM崩溃、Flume重启等)会导致代理失败。
设置:
必须设置agent.channels.c1.type=memory
可选
capacity:int 默认值100 默认通道容量
transactionCapacity单个事务中可以写入的最大事件数量
byteCapacityBufferPercentage和byteCapacity,使用字节而非事件数量来作为调整内存通道大小的方式
keep-alive 通道已满的情况下,线程将数据写入到通道的等待时间,超时将放弃此数据
3.2 文件通道(持久化通道)
事件存储到本地系统中的通道。
优点:数据流不会出现缺口、支持加密
缺点:较慢
设置:
必须设置agent.channels.c1.type=file
可选
checkpointDir
dataDirs
用来设置Flume代理持有数据的位置
capacity
keep-alive
transactionCapacity
同上
checkpointInterval 两个检查点之间间隔的毫秒数
write-timeout 写入的超时时间
maxFileSize 最大文件大小
minimumRequiredSpace不想用作日志的空间数量
第四章 接收器与接收处理器
4.1 HDFS接收器
agent.sinks.k1.type=hdfs
4.1.1 路径与文件名
agent.sinks.k1.hdfs.path=/logs/chenjie/web/%Y%m%d 指定路径(Flume支持各种基于时间的转义序列)
此外还支持一种转移序列机制:在路径中使用Flume头值的功能,例如有个键为logType的头,那么就能使用:
agent.sinks.k1.hdfs.path=/logs/chenjie/web/%{logType}/%Y%m%d
agent.sinks.k1.hdfs.filePrefix=access 指定前缀
agent.sinks.k1.hdfs.fileSuffix=.log 指定后缀
4.1.2 文件转储
默认情况下,Flume会每隔30s、10个事件或者是1024字节来转储写入的文件。
如果希望每分钟转储一次:
agent.sinks.k1.hdfs.rollInterval=60
agent.sinks.k1.hdfs.rollCount=0
agent.sinks.k1.hdfs.rollSize=0
4.2 压缩编解码器
agent.sinks.k1.hdfs.codeC=gzip
4.3 事件序列化器
4.3.1 文本输出
4.3.2 带有头信息的文本
4.3.3 Apache Avro
4.3.4 文件类型
4.3.5 超时设置与线程池
4.4 接收器组
为了消除单点失败。
agent.sinkgroups=sg1
agent.sinkgroups.sg1.sinks=k1,k2
4.4.1 负载均衡
如果想要均衡地对k1和k2的流量进行负载均衡
processor.type load_balance循环选择/round_robin/random
4.4.2 故障恢复
在使用某个接收器时,如果其不可用,那么你希望能够使用其它的接收器。
agent.sinkgroups.sg1.sinks=k1,k2,k3
agent.sinkgroups.sg1.processor.type=failover
agent.sinkgroups.sg1.processor.priority.k1=10
agent.sinkgroups.sg1.processor.priority.k2=210
agent.sinkgroups.sg1.processor.priority.k3=30
数字任意,越小越优先
第五章 源与通道选择器
5.1 exec源
在flume的conf文件夹下新建一个exec.conf文件,内容如下:
#源:s1
agent.sources=s1
#通道:c1
agent.channels=c1
#接收器:k1
agent.sinks=k1
#源s1的类型为exec源
agent.sources.s1.type=exec
#源s1的exec源 命令
agent.sources.s1.command=tail -F /home/chenjie/app.log
#源s1的去向通道为通道c1
agent.sources.s1.channels=c1
#通道c1的类型为内存通道
agent.channels.c1.type=memory
#接收器k1的类型为日志
agent.sinks.k1.type=logger
#接收器k1的来向通道为c1
agent.sinks.k1.channel=c1
到flume根目录下启动:
./bin/flume-ng agent -n agent -c conf -f conf/exec.conf -Dflume.root.logger=INFO,console -Duser.timezone=UTC -Dflume.monitoring.type=http -Dflume.monitoring.port=44444
打开/home/chenjie/app.log文件,输入文本,看到flume窗口感知到变化。
2017-12-02 07:07:11,959 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO – org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 69 20 6C 6F 76 65 20 79 6F 75 i love you }
截图如下
5.2 假脱机目录源
5.3 syslog源
用作操作系统级的机制来捕获和移动系统日志
5.3.1 syslog UDP源
5.3.2 syslog TCP源
5.3.3 多端口 syslog UDP源
5.4 通道选择器
第六章 拦截器、ETL与路由
6.1 拦截器
6.1.1 Timestamp
在头部添加时间戳
6.1.2 Host
6.1.3 Static
agent.sources.s1.interceptors.i2.type=static
agent.sources.s1.interceptors.i2.key=author
agent.sources.s1.interceptors.i2.value=CHENJIE
用于向每个处理的Flume事件插入任意单个的键值头
6.1.4 正则表达式过滤
根据内容体的内容来过滤事件
6.1.5 正则表达式抽取
将事件体的内容抽取出来并放到Flume头中以便可以通过通道选择器执行路由。
6.1.6 自定义拦截器
6.2 数据流分层
6.2.1 Avro源/接收器
6.2.2 命令行Avro
6.2.3 Log4J追加器
6.2.4 负载均衡Log4J追加器
6.3 路由
第7章 监控Flume
7.1 监控代理进程
7.1.1 Monit
7.1.2 Nagios
7.2 监控性能度量情况
7.2.1 Ganglia
7.2.2 内部HTTP服务器
-Dflume.monitoring.type=http 属性指定了监控类型(前面提到)
-Dflume.monitoring.port=44444 属性指定了监控端口(前面提到)
访问http://localhost:44444/metrics可以监控Flume
{“SINK.k1”:{“ConnectionCreatedCount”:”3″,”ConnectionClosedCount”:”2″,”Type”:”SINK”,”BatchCompleteCount”:”0″,”BatchEmptyCount”:”21″,”EventDrainAttemptCount”:”19″,”StartTime”:”1512191883957″,”EventDrainSuccessCount”:”19″,”BatchUnderflowCount”:”3″,”StopTime”:”0″,”ConnectionFailedCount”:”0″},”CHANNEL.c1″:{“ChannelCapacity”:”1000000″,”ChannelFillPercentage”:”0.0″,”Type”:”CHANNEL”,”ChannelSize”:”0″,”EventTakeSuccessCount”:”19″,”EventTakeAttemptCount”:”43″,”StartTime”:”1512191883951″,”EventPutAttemptCount”:”19″,”EventPutSuccessCount”:”19″,”StopTime”:”0″},”SOURCE.s1″:{“EventReceivedCount”:”19″,”AppendBatchAcceptedCount”:”0″,”Type”:”SOURCE”,”EventAcceptedCount”:”19″,”AppendReceivedCount”:”0″,”StartTime”:”1512191883960″,”AppendAcceptedCount”:”0″,”OpenConnectionCount”:”0″,”AppendBatchReceivedCount”:”0″,”StopTime”:”0″}}
7.2.3 自定义监控钩子
最后,使用flume进行tomcat日志监控,并将结果写入HDFS的完整示例如下。
1、新建tomcat_log.conf
-
agent.sources=s1
-
agent.channels=c1
-
agent.sinks=k1
-
agent.sources.s1.type=exec
-
agent.sources.s1.command=tail -F /media/chenjie/0009418200012FF3/ubuntu/apache-tomcat-7.0.82/logs/localhost_access_log.2017-12-02.txt
-
agent.sources.s1.channels=c1
-
-
-
agent.channels.c1.type=file
-
agent.sinks.k1.type=hdfs
-
agent.sinks.k1.hdfs.path=/logs/chenjie/web/%Y%m%d
-
agent.sinks.k1.hdfs.rollInterval=60
-
agent.sinks.k1.hdfs.rollCount=0
-
agent.sinks.k1.hdfs.rollSize=0
-
agent.sinks.k1.hdfs.filePrefix=access
-
agent.sinks.k1.hdfs.fileSuffix=.log
-
agent.sinks.k1.hdfs.writeType=text
-
agent.sinks.k1.channel=c1
-
-
agent.sources.s1.interceptors=i1 i2
-
agent.sources.s1.interceptors.i1.type=timestamp
-
agent.sources.s1.interceptors.i1.preserveExisting=true
-
agent.sources.s1.interceptors.i2.type=static
-
agent.sources.s1.interceptors.i2.key=author
-
agent.sources.s1.interceptors.i2.value=CHENJIE
2、启动Flume
./bin/flume-ng agent -n agent -c conf -f conf/tomcat_log.conf -Dflume.root.logger=INFO,console -Duser.timezone=UTC -Dflume.monitoring.type=http -Dflume.monitoring.port=44444
3、启动tomcat4、在tomcat中访问部署在其中的web程序(tomcat会在日志文件中写入此操作)
-
127.0.0.1 – – [02/Dec/2017:16:44:49 +0800] “GET / HTTP/1.1” 200 11217
-
0:0:0:0:0:0:0:1 – – [02/Dec/2017:16:44:50 +0800] “GET /CJHadoopOnline/ HTTP/1.1” 200 281
-
0:0:0:0:0:0:0:1 – – [02/Dec/2017:16:55:03 +0800] “GET /CJHadoopOnline HTTP/1.1” 302 –
-
0:0:0:0:0:0:0:1 – – [02/Dec/2017:16:55:03 +0800] “GET /CJHadoopOnline/ HTTP/1.1” 200 281
-
0:0:0:0:0:0:0:1 – – [02/Dec/2017:16:55:17 +0800] “GET /CJHadoopOnline/login.jsp HTTP/1.1” 200 450
5、观察HDFS中的文件
6、使用程序读取HDFS中的文件
-
import org.apache.spark.{SparkConf, SparkContext}
-
object FlumeTest {
-
def main(args: Array[String]): Unit = {
-
val sparkConf = new SparkConf().setAppName(“CJDouban”).setMaster(“local”)
-
val sc = new SparkContext(sparkConf)
-
val log = sc.textFile(“hdfs://pc1:9000/logs/chenjie/web/20171202”)
-
log.foreach(println)
-
}
-
}
转载请注明:SuperIT » Flume 日志收集、使用Flume收集日志到HDFS