Collecting Data into HDFS with Flume
1. Requirement:
Collect the contents of a specified file into HDFS.
Technology choice: exec source -> memory channel -> hdfs sink
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/data/data.log

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.0.129:9000/user/hadoop/flume
# Number of events written before a flush to HDFS
# (file rolling is controlled by hdfs.rollInterval, hdfs.rollSize and hdfs.rollCount)
a1.sinks.k1.hdfs.batchSize = 10
# DataStream writes plain, uncompressed output
a1.sinks.k1.hdfs.fileType = DataStream
# Write records as text
a1.sinks.k1.hdfs.writeFormat = Text

# Use a channel which buffers events in memory
a1.channels.c1.type = memory

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the agent:
./flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/exec-memory-hdfs.conf \
-Dflume.root.logger=INFO,console \
-Dflume.monitoring.type=http \
-Dflume.monitoring.port=34343
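With HTTP monitoring enabled on port 34343, the agent should serve its counters as JSON under /metrics. The helper below is a minimal sketch for pulling the channel fill level out of that JSON. The function name channel_fill is hypothetical, and the "ChannelFillPercentage" key is an assumption about the metrics layout, so check it against the output of your own agent.

```shell
# channel_fill (hypothetical helper): reads Flume's JSON metrics on stdin and
# prints every ChannelFillPercentage value, one per line.
# Assumes values are quoted strings, e.g. "ChannelFillPercentage":"12.5".
channel_fill() {
  grep -o '"ChannelFillPercentage":"[^"]*"' | cut -d'"' -f4
}
```

Against a running agent this could be used as: curl -s http://localhost:34343/metrics | channel_fill. A value that stays close to 100 suggests the memory channel is filling faster than the sink drains it.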
Add test data:
[hadoop@hadoop001 data]$ touch data.log
[hadoop@hadoop001 data]$ echo test >> data.log
[hadoop@hadoop001 data]$ echo test >> data.log
[hadoop@hadoop001 data]$ echo test >> data.log
[hadoop@hadoop001 data]$ echo test >> data.log
[hadoop@hadoop001 data]$ echo test >> data.log
Check HDFS:
[hadoop@hadoop001 flume]$ hdfs dfs -text hdfs://192.168.0.129:9000/user/hadoop/flume/*
18/08/09 20:59:02 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
test
test
test
test
test
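2. Requirement:
Collect the contents of a specified directory into HDFS or the console.
==> Files in the directory must not be modified after they are placed there, and file names must not be reused.
==> Once a file has been fully processed, the .COMPLETED suffix is appended to its name.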
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/hadoop/data/
a1.sources.r1.fileHeader = true

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
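Because the spooldir source expects files that are complete and uniquely named, one common way to feed it is to finish writing a file somewhere else and then move it into the spool directory. The following is a minimal sketch of that idea, not part of Flume itself: spool_drop and the sibling ".staging" directory are hypothetical names, and the move is only a single rename when both directories sit on the same filesystem.

```shell
# spool_drop SRC SPOOL_DIR (hypothetical helper)
# Copies SRC into a staging directory beside SPOOL_DIR, then moves the finished
# copy into SPOOL_DIR under a fresh name, and prints the final path.
spool_drop() {
  local src="$1" spool_dir="${2%/}"
  local staging_dir="${spool_dir}.staging"   # assumed to share a filesystem with SPOOL_DIR
  local name
  # Timestamp, shell PID and a random number keep names from being reused
  name="$(basename "$src").$(date +%Y%m%d%H%M%S).$$.$RANDOM"
  mkdir -p "$staging_dir" "$spool_dir" || return 1
  # Refuse a name that is still pending or already marked .COMPLETED
  if [ -e "$spool_dir/$name" ] || [ -e "$spool_dir/$name.COMPLETED" ]; then
    return 1
  fi
  cp "$src" "$staging_dir/$name" || return 1
  mv "$staging_dir/$name" "$spool_dir/$name" || return 1
  printf '%s\n' "$spool_dir/$name"
}
```

For the configuration above, a call might look like: spool_drop /tmp/app.log /home/hadoop/data (the /tmp/app.log path is only an illustration).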
3. Requirement (for production use, records offsets):
Collect the contents of specified directories and files into the console or HDFS.
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = TAILDIR
# Record offsets so that collection resumes where it left off after a restart
a1.sources.r1.positionFile = /home/hadoop/script/flume/taildir_position.json
a1.sources.r1.filegroups = f1 f2
# Monitor a specific log file
a1.sources.r1.filegroups.f1 = /home/hadoop/data/example.log
a1.sources.r1.headers.f1.headerKey1 = value1
# Monitor every file in the directory whose name matches the regex .*log.*
a1.sources.r1.filegroups.f2 = /home/hadoop/data/test/.*log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2

# Console output
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
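The last path component of a filegroup such as f2 is a regular expression applied to file names, not a shell glob. To get a rough feel for which names .*log.* would pick up, a whole-name match can be approximated with grep. This is only a sketch: matches_filegroup is a hypothetical helper, and grep's extended regex syntax is not identical to Java's, so it is reliable only for simple patterns like this one.

```shell
# matches_filegroup REGEX NAME (hypothetical helper)
# Succeeds when the whole file name matches REGEX (grep -x anchors both ends).
matches_filegroup() {
  printf '%s\n' "$2" | grep -Eqx -- "$1"
}

# Example: report which of a few sample names the f2 pattern would select
for name in log1.log app.log.2018-08-09 notes.txt; do
  if matches_filegroup '.*log.*' "$name"; then
    echo "$name: matched"
  else
    echo "$name: skipped"
  fi
done
```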
Start the agent:
./flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/taildir-memory-logger.conf \
-Dflume.root.logger=INFO,console
Recorded offsets:
[hadoop@hadoop001 flume]$ cat taildir_position.json
[{"inode":679982,"pos":14,"file":"/home/hadoop/data/example.log"},
{"inode":679984,"pos":0,"file":"/home/hadoop/data/test/log1.log"}]
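The position file can also be used to see how far the source lags behind each file. The sketch below compares every recorded pos with the current file size, on the assumption that pos is a byte offset and that each entry keeps the "pos" then "file" key order shown in the listing. taildir_lag is a hypothetical helper, and a real JSON tool would be more robust than this grep-based parsing.

```shell
# taildir_lag POSITION_FILE (hypothetical helper)
# Prints "<file> <pos> <size> <lag>" for every tracked file, where lag is the
# number of bytes not yet read. Assumes the "pos":N,"file":"PATH" key order.
taildir_lag() {
  local position_file="$1" entry pos file size
  grep -o '"pos":[0-9]*,"file":"[^"]*"' "$position_file" |
  while IFS= read -r entry; do
    pos="${entry#\"pos\":}"          # drop the leading "pos":
    pos="${pos%%,*}"                 # keep only the digits before the first comma
    file="${entry#*\"file\":\"}"     # drop everything up to the path
    file="${file%\"}"                # drop the closing quote
    if [ -f "$file" ]; then
      size="$(wc -c < "$file" | tr -d ' ')"
      echo "$file $pos $size $((size - pos))"
    else
      echo "$file $pos missing -"
    fi
  done
}
```

For the setup above it would be called as: taildir_lag /home/hadoop/script/flume/taildir_position.json. A lag of 0 means the file has been read to the end.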
When reposting, please credit the source: SuperIT » Collecting Data into HDFS with Flume