Flume NG高可用集群搭建
软件版本:
- CentOS 6.7
- hadoop-2.7.4
- apache-flume-1.6.0
一、Flume NG简述
- Flume 是 Cloudera 提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。
- Flume将采集到的文件,socket数据包等各种形式的数据源,输出到HDFS、Hbase、hive、kafka等众多外部存储系统中
- Flume针对特殊场景也具备良好的自定义扩展能力,因此flume适用于大部分的日常数据采集场景
- 一般的采集需求,通过对flume的简单配置即可实现
- Flume分布式系统中最核心的角色时agent,flume采集系统就是由一个个agent所连接起来形成的
- 每个agent相当于一个数据传递员(Source到Channel到Sink之间传递数据的形式时event事件,event事件是一个数据流单元)
Flume的架构图中有3个组件,分别是source、channel、sink
- Source:采集数据源,用于和数据源对接,获取数据
- Sink:下沉,采集数据传送目的地,用于往下一级agent传递数据或者往最终的存储系统传递数据
- Channel:agent的内部传输管道,用于将数据源以事件event的形式,从Source到Sink
运行流程:
从外部系统(Web Server)中收集产生的日志,然后通过Agent的Source组件将数据发送到临时存储Channel组件,最后传递给Sink组件,Sink组件将满足预设值的临时文件,存储到HDFS文件系统中。
二、搭建单点Flume NG
1、解压软件包
tar -zxvf apache-flume-1.6.0-bin.tar.gz -C /export/servers/
2、配置环境变量
-
export FLUME_HOME=/export/servers//flume-1.6.0
-
export PATH=$PATH:$FLUME_HOME/bin
3、修改flume配置文件
$FLUME_HOME/conf/flume-env.sh(flume-env.sh.template修改成flume-env.sh)
export JAVA_HOME=/export/servers/jdk1.8.0_171
4、简单测试—采集指定文件到 HDFS
服务器会在指定目录下,会不断产生新的日志文件,每当有新的日志文件产生,flume自动将新产生的数据源采集到文件存储系统HDFS中
创建配置文件spooldir-hdfs.properties
-
# Name the components on this agent
-
a1.sources = r1 # agent的别名
-
a1.sinks = k1
-
a1.channels = c1
-
-
# Describe/configure the source
-
# 采集数据的类型
-
a1.sources.r1.type = exec
-
# 指定执行命令(flume自动执行该命令)
-
a1.sources.r1.command = tail -F /export/data/callLog.log
-
-
# Describe the sink
-
# 指定采集信息下沉到哪里
-
a1.sinks.k1.type = hdfs
-
# 指定采集到的数据存放在hdfs文件系统的哪个路径下
-
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/
-
# 指定保存信息文件名的前缀
-
a1.sinks.k1.hdfs.filePrefix = events-
-
# 是否舍弃已下沉的文件,舍弃指的是:根据指定的时间间隔创建文件夹
-
a1.sinks.k1.hdfs.round = true
-
#指定每10分钟舍弃已下沉的文件
-
a1.sinks.k1.hdfs.roundValue = 10
-
a1.sinks.k1.hdfs.roundUnit = minute
-
# 每30s,将临时文件,持久化到hdfs文件系统中;设置为0,不滚动,滚动即下沉
-
a1.sinks.k1.hdfs.rollInterval = 30
-
# 临时文件达到指定字节就滚动,默认1024;设置为0,不根据临时文件大小来滚动文件
-
a1.sinks.k1.hdfs.rollSize = 1024
-
# 临时文件的事件event个数达到指定值就滚动,默认10;如果设置成0,不根据events数据来滚动文件
-
a1.sinks.k1.hdfs.rollCount = 10
-
# 每个事件写入的行数,默认100
-
a1.sinks.k1.hdfs.batchSize = 100
-
# 是否使用本地的时间戳
-
a1.sinks.k1.hdfs.useLocalTimeStamp = true
-
# 生成的文件类型,默认是 Sequencefile,可用 DataStream(普通文本)
-
a1.sinks.k1.hdfs.fileType = DataStream
-
-
# Use a channel which buffers events in memory
-
# channels数据缓存类型
-
a1.channels.c1.type = memory
-
# 该通道中最大的可以存储的event数量
-
a1.channels.c1.capacity = 1000
-
# 每次最大可以从source中拿到或者送到sink中的event数量
-
a1.channels.c1.transactionCapacity = 100
-
-
# Bind the source and sink to the channel
-
a1.sources.r1.channels = c1
-
a1.sinks.k1.channel = c1
提示:配置文件中的注释,在虚拟机配置时尽可能删除,如果保留可能会报错
启动flume
flume-ng agent -c conf -f conf/spooldir-hdfs.properties -n a1 -Dflume.root.logger=INFO,console
命令行参数解释:
- -c conf 指定flume自身的配置文件所在目录
- -f conf/spooldir-hdfs.properties 指定我们所描述的采集方案
- -n a1 指定我们这个agent的名字
下面截图是复制的节点启动发送数据源后,收到的数据源
复制该节点,产生数据,这里是自己编写的java代码,也可以使用下面shell命令测试
[root@node01 flume-1.6.0]# while true;do echo test >> /export/data/callLog.log;sleep 0.5;done
三、搭建高可用Flume NG
高可用的Flume NG集群,架构图如下所示:
由于电脑性能的限制,将agent减少到1个节点,Collector维持原来的2个节点。主要是为了后面测试负载均衡、以及容错考虑
1、节点分配
图中所示,Agent数据分别流入到Collector1和Collector2,Flume NG本身提供了Failover机制,可以自动切换和恢复。在上图中,有3个产生日志服务器分布在不同的机房,要把所有的日志都收集到一个集群中存储。下面我们开发配置Flume NG集群
2、Flume 的 load-balance
负载均衡是用于解决一台机器(一个进程)无法解决所有请求而产生的一种算法。Load balancing Sink Processor能够实现load balance功能,如上图 Agent1是一个路由节点,负责将Channel暂存的Event均衡到对应的Collector1和Collector2中
2.1、agent端的配置文件:exec-avro.properties
-
#agent1 name
-
agent1.channels = c1
-
agent1.sources = r1
-
agent1.sinks = k1 k2
-
-
#set channel
-
agent1.channels.c1.type = memory
-
agent1.channels.c1.capacity = 1000
-
agent1.channels.c1.transactionCapacity = 100
-
-
agent1.sources.r1.channels = c1
-
agent1.sources.r1.type = exec
-
agent1.sources.r1.command = tail -F /export/data/callLog.log
-
-
# set sink1
-
agent1.sinks.k1.channel = c1
-
agent1.sinks.k1.type = avro
-
agent1.sinks.k1.hostname = node02
-
agent1.sinks.k1.port = 52020
-
-
# set sink2
-
agent1.sinks.k2.channel = c1
-
agent1.sinks.k2.type = avro
-
agent1.sinks.k2.hostname = node03
-
agent1.sinks.k2.port = 52020
-
-
#set sink group
-
agent1.sinkgroups = g1
-
agent1.sinkgroups.g1.sinks = k1 k2
-
-
#set load_balance
-
agent1.sinkgroups.g1.processor.type = load_balance
-
agent1.sinkgroups.g1.processor.backoff = true
-
agent1.sinkgroups.g1.processor.selector = round_robin
-
agent1.sinkgroups.g1.processor.selector.maxTimeOut=10000
2.2、Collector1端的配置文件:avro-logger.properties
-
# Name the components on this agent
-
a1.sources = r1
-
a1.sinks = k1
-
a1.channels = c1
-
-
# Describe/configure the source
-
a1.sources.r1.type = avro
-
a1.sources.r1.bind = node02
-
a1.sources.r1.port = 52020
-
-
# Describe the sink
-
a1.sinks.k1.type = logger
-
-
# Use a channel which buffers events in memory
-
a1.channels.c1.type = memory
-
a1.channels.c1.capacity = 1000
-
a1.channels.c1.transactionCapacity = 100
-
-
# Bind the source and sink to the channel
-
a1.sources.r1.channels = c1
-
a1.sinks.k1.channel = c1
提示:Collector2的配置文件与Collector1的基本相同,只需要修改a1.sources.r1.bind
2.3、启动测试
- 分别启动Collector1、Collector2、agent
- 复制agent虚拟机,执行shell命令:while true;do echo test >> /export/data/callLog.log;sleep 0.5;done
Collector1的运行结果
Collector2的运行结果
3、Flume 的 failover
3.1、故障转移机制
实现 failover 功能,具体流程类似 load balance,但是内部处理机制与load balance完全不同
- Failover Sink Processor维护一个优先级Sink组件列表,只要有一个Sink 组件可用,Event就会被传递到下一个组件。
- 故障转移机制的作用是将失败的Sink 降级到一个池,在这些池中它们被分配一个冷却时间,随着故障的连续,增加下次该Sink的重试时间。但只要失败的Sink成功发送一个Event,它将恢复到活动池。
- Sink具有与之相关的优先级,数值越大,优先级越高。
例如,具有优先级为10的sink在优先级为8的Sink之前被激活。在激活过程中,如果发送事件时汇聚失败,则将尝试让优先级为8的Sink发送事件。如果没有指定优先级,则根据在配置中指定Sink的顺序来确定发送顺序
3.2、配置如下:
-
#agent1 name
-
agent1.channels = c1
-
agent1.sources = r1
-
agent1.sinks = k1 k2
-
-
#set channel
-
agent1.channels.c1.type = memory
-
agent1.channels.c1.capacity = 1000
-
agent1.channels.c1.transactionCapacity = 100
-
-
agent1.sources.r1.channels = c1
-
agent1.sources.r1.type = exec
-
agent1.sources.r1.command = tail -F /export/data/callLog.log
-
-
# set sink1
-
agent1.sinks.k1.channel = c1
-
agent1.sinks.k1.type = avro
-
agent1.sinks.k1.hostname = node02
-
agent1.sinks.k1.port = 52020
-
-
# set sink2
-
agent1.sinks.k2.channel = c1
-
agent1.sinks.k2.type = avro
-
agent1.sinks.k2.hostname = node03
-
agent1.sinks.k2.port = 52020
-
-
#set sink group
-
agent1.sinkgroups = g1
-
agent1.sinkgroups.g1.sinks = k1 k2
-
-
#set failover
-
a1.sinkgroups.g1.processor.type = failover
-
# 如果开启,则将失败的 sink 放入黑名单
-
a1.sinkgroups.g1.processor.backoff = true
-
# 还支持random
-
a1.sinkgroups.g1.processor.selector = round_robin
-
#在黑名单放置的超时时间,超时结束时,若仍然无法接收,则超时时间呈指数增长
-
a1.sinkgroups.g1.processor.selector.maxTimeOut=10000
-
#优先级值, 绝对值越大表示优先级越高,若不设置,则按照sink的先后顺序
-
a1.sinkgroups.g1.processor.priority.k1 = 5
-
a1.sinkgroups.g1.processor.priority.k2 = 10
-
a1.sinkgroups.g1.processor.priority.k3 = 6
-
#失败的 Sink 的最大回退期(millis)
-
a1.sinkgroups.g1.processor.maxpenalty = 20000
3.3、测试参考负载均衡,这里就不测试了
转载请注明:SuperIT » Flume NG高可用集群搭建