微信搜索superit|邀请体验:大数据, 数据管理、OLAP分析与可视化平台 | 赞助作者:赞助作者

基于 Flume+Kafka+Spark Streaming 实现实时监控输出日志的报警系统

日志采集 aide_941 40℃

基于 Flume+Kafka+Spark Streaming 实现实时监控输出日志的报警系统

  • 运用场景:我们机器上每天或者定期都要跑很多任务,很多时候任务出现错误不能及时发现,导致发现的时候任务已经挂了很久了。
  • 解决方法:基于 Flume+Kafka+Spark Streaming 的框架对这些任务的输出日志进行实时监控,当检测到日志出现Error的信息就发送邮件给项目的负责人。
  • 目的:通过这个小项目熟悉基于 Flume+Kafka+Spark Streaming 框架实时分析处理日志,能用到真实项目就更好了。

一、Flume

Flume是用来收集、汇聚并且传输日志数据Kafka去。可以设置多个sources对应多个任务的日志,到一个kafka sinks。配置文件如下:

  1. #define agent
  2. agent_log.sources = s1 s2
  3. agent_log.channels = c1
  4. agent_log.sinks = k1
  5. #define sources.s1
  6. agent_log.sources.s1.type=exec
  7. agent_log.sources.s1.command=tail -F /data/log1.log
  8. #define sources.s2
  9. agent_log.sources.s2.type=exec
  10. agent_log.sources.s2.command=tail -F /data/log2.log
  11. #定义拦截器
  12. agent_log.sources.s1.interceptors = i1
  13. agent_log.sources.s1.interceptors.i1.type = static
  14. agent_log.sources.s1.interceptors.i1.preserveExisting = false
  15. agent_log.sources.s1.interceptors.i1.key = projectName
  16. agent_log.sources.s1.interceptors.i1.value= project1
  17. agent_log.sources.s2.interceptors = i2
  18. agent_log.sources.s2.interceptors.i2.type = static
  19. agent_log.sources.s2.interceptors.i2.preserveExisting = false
  20. agent_log.sources.s2.interceptors.i2.key = projectName
  21. agent_log.sources.s2.interceptors.i2.value= project2
  22. #define channels
  23. agent_log.channels.c1.type = memory
  24. agent_log.channels.c1.capacity = 1000
  25. agent_log.channels.c1.transactionCapacity = 1000
  26. #define sinks
  27. #设置Kafka接收器
  28. agent_log.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink
  29. #设置Kafka的broker地址和端口号
  30. agent_log.sinks.k1.brokerList=cdh1:9092,cdh2:9092,cdh3:9092
  31. #设置Kafka的Topic
  32. agent_log.sinks.k1.topic=result_log
  33. #包含header
  34. agent_log.sinks.k1.useFlumeEventFormat = true
  35. #设置序列化方式
  36. agent_log.sinks.k1.serializer.class=kafka.serializer.StringEncoder
  37. agent_log.sinks.k1.partitioner.class=org.apache.flume.plugins.SinglePartition
  38. agent_log.sinks.k1.partition.key=1
  39. agent_log.sinks.k1.request.required.acks=0
  40. agent_log.sinks.k1.max.message.size=1000000
  41. agent_log.sinks.k1.agent_log.type=sync
  42. agent_log.sinks.k1.custom.encoding=UTF-8
  43. # bind the sources and sinks to the channels
  44. agent_log.sources.s1.channels=c1
  45. agent_log.sources.s2.channels=c1
  46. agent_log.sinks.k1.channel=c1

执行flume-ng命令启动flume:
flume-ng agent -c /etc/flume-ng/conf -f result_log.conf -n agent_log

二、Kafka

Kafka是一个消息系统,可以缓冲消息。Flume收集的日志传送到Kafka消息队列中(Flume作为生产者),然后就可以被Spark Streaming消费了,而且可以保证不丢失数据。kafka的具体知识可以阅读:https://www.cnblogs.com/likehua/p/3999538.html
#创建result_log主题
kafka-topics –zookeeper cdh1:2181,cdh1:2181,cdh3:2181 –create –topic result_log –partitions 3 –replication-factor 1
#测试-查看kafka主题列表,观察result_log是否创建成功
kafka-topics –list –zookeeper cdh1:2181,cdh1:2181,cdh3:2181
#测试-启动一个消费者测试flume传输日志到kafka这一环节是否正常运行
kafka-console-consumer –bootstrap-server cdh1:9092,cdh1:9092,cdh3:9092 –topic result_log

三、Spark Streaming (编程语言:scala,开发工具:Idea)

新建一个maven项目,配置pom.xml添加依赖。//具体见项目代码

我们用Zookeeper来管理spark streaming 消费者的offset。调用

KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams, newOffset))
与kafka建立连接,返回InputDStream,获取数据流,

stream.foreachRDD(rdd => {//处理程序})  //处理数据流。
val ssc = new StreamingContext(sc, Durations.seconds(60))
ssc.start()  //启动ssc
发送邮件的功能配置org.apache.commons.mail这个包的 HtmlEmail 这个类,调用 HtmlEmail.send 发送邮件。

编写一个start.sh脚本启动 Spark Streaming 程序,最后 sh start.sh 启动脚本。

  1. #!/bin/bash
  2. export HADOOP_USER_NAME=hdfs
  3. spark2-submit \
  4. –master yarn \
  5. –deploy-mode client \
  6. –executor-cores 3 \
  7. –num-executors 10 \
  8. –driver-memory 2g \
  9. –executor-memory 1G \
  10. –conf spark.default.parallelism=30 \
  11. –conf spark.storage.memoryFraction=0.5 \
  12. –conf spark.shuffle.memoryFraction=0.3 \
  13. –conf spark.reducer.maxSizeInFlight=128m \
  14. –driver-class-path mysql-connector-java-5.1.38.jar \
  15. –jars mysql-connector-java-5.1.38.jar,qqwry-java-0.7.0.jar,fastjson-1.2.47.jar,spark-streaming-kafka-10_2.11-2.2.0.jar,hive-hbase-handler-1.1.0-cdh5.13.0.jar,commons-email-1.5.jar,commons-email-1.5-sources.jar,mail-1.4.7.jar \
  16. –class com.lin.monitorlog.mianer.Handler \
  17. monitorLog.jar
  18. #[END]

spark streaming 程序代码链接: https://download.csdn.net/download/linge1995/10576773

转载请注明:SuperIT » 基于 Flume+Kafka+Spark Streaming 实现实时监控输出日志的报警系统

喜欢 (0)or分享 (0)