Flume Taildir Source监听实时追加内容的文件
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/wangpei1949/article/details/80472928
flume中有三种可监控文件或目录的source、分别是Exec Source、Spooling Directory Source和Taildir Source。
Taildir Source是1.7版本的新特性,综合了Spooling Directory Source和Exec Source的优点。
- 1
- 2
使用场景
Exec Source
Exec Source可通过tail -f命令去tail住一个文件,然后实时同步日志到sink。但存在的问题是,当agent进程挂掉重启后,会有重复消费的问题。可以通过增加UUID来解决,或通过改进ExecSource来解决。
- 1
Spooling Directory Source
Spooling Directory Source可监听一个目录,同步目录中的新文件到sink,被同步完的文件可被立即删除或被打上标记。适合用于同步新文件,但不适合对实时追加日志的文件进行监听并同步。如果需要实时监听追加内容的文件,可对SpoolDirectorySource进行改进。
- 1
Taildir Source
Taildir Source可实时监控一批文件,并记录每个文件最新消费位置,agent进程重启后不会有重复消费的问题。
使用时建议用1.8.0版本的flume,1.8.0版本中解决了Taildir Source一个可能会丢数据的bug。
- 1
- 2
Taildir Source
agent配置
# source的名字
agent.sources = s1
# channels的名字
agent.channels = c1
# sink的名字
agent.sinks = r1
# 指定source使用的channel
agent.sources.s1.channels = c1
# 指定sink使用的channel
agent.sinks.r1.channel = c1
######## source相关配置 ########
# source类型
agent.sources.s1.type = TAILDIR
# 元数据位置
agent.sources.s1.positionFile = /Users/wangpei/tempData/flume/taildir_position.json
# 监控的目录
agent.sources.s1.filegroups = f1
agent.sources.s1.filegroups.f1=/Users/wangpei/tempData/flume/data/.*log
agent.sources.s1.fileHeader = true
######## channel相关配置 ########
# channel类型
agent.channels.c1.type = file
# 数据存放路径
agent.channels.c1.dataDirs = /Users/wangpei/tempData/flume/filechannle/dataDirs
# 检查点路径
agent.channels.c1.checkpointDir = /Users/wangpei/tempData/flume/filechannle/checkpointDir
# channel中最多缓存多少
agent.channels.c1.capacity = 1000
# channel一次最多吐给sink多少
agent.channels.c1.transactionCapacity = 100
######## sink相关配置 ########
# sink类型
agent.sinks.r1.type = org.apache.flume.sink.kafka.KafkaSink
# brokers地址
agent.sinks.r1.kafka.bootstrap.servers = localhost:9092
# topic
agent.sinks.r1.kafka.topic = testTopic3
# 压缩
agent.sinks.r1.kafka.producer.compression.type = snappy
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
记录每个文件消费位置的元数据
#配置
agent.sources.s1.positionFile = /Users/wangpei/tempData/flume/taildir_position.json
#内容
[
{
"inode":6028358,
"pos":144,
"file":"/Users/wangpei/tempData/flume/data/test.log"
},
{
"inode":6028612,
"pos":20,
"file":"/Users/wangpei/tempData/flume/data/test_a.log"
}
]
可以看到,在taildir_position.json文件中,通过json数组的方式,记录了每个文件最新的消费位置,每消费一次便去更新这个文件。