# Typical architecture
# filebeat=>kafka=>logstash=>elasticsearch=>kibana
1. Filebeat configuration:
# filebeat/config/filebeat.yml:
#=========================== Filebeat inputs =============================
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /data2/log/nginx/logs/access7.log
    #- /var/log/*.log
    #- c:\programdata\elasticsearch\logs\*
  # Enable the following three lines when collecting JSON logs:
  #json.keys_under_root: true
  #json.overwrite_keys: true
  #json.add_error_key: true
  tags: ['nginx']
  fields:
    log_source: nginx
    level: debug
    review: 1
  scan_frequency: 10s
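Values declared under `fields:` end up nested below a top-level `fields` key in each event (unless `fields_under_root: true` is set), which is exactly what the Logstash conditional `[fields][log_source]` in section 3 relies on. A minimal Python sketch of that routing decision, using a made-up event purely for illustration:

# Hypothetical event, shaped the way Filebeat emits it by default:
# custom values from `fields:` stay nested under the "fields" key.
event = {
    "message": "raw nginx access line ...",
    "tags": ["nginx"],
    "fields": {"log_source": "nginx", "level": "debug", "review": 1},
}

# Same test as the Logstash conditional [fields][log_source] == "nginx"
if event.get("fields", {}).get("log_source") == "nginx":
    print("event would be routed into the nginx pipeline")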
If shipping directly to Logstash (and on to Elasticsearch):
#----------------------------- Logstash output --------------------------------
output.logstash:
hosts: ["10.13.113.237:5044"]
# The Logstash hosts
If shipping to Kafka instead:
#----------------------------- Kafka output --------------------------------
output.kafka:
  enabled: true
  hosts: ["yz-re011.kafka.com.cn:9110",
          "yz-re012.kafka.com.cn:9110",
          "yz-re005.kafka.com.cn:9110",
          "dbl-re002.kafka.com.cn:9110",
          "yz-re003.kafka.com.cn:9110",
          "dbl-re001.kafka.com.cn:9110"]
  topic: 'topic_biz_data'
  username: 'name'
  password: 'pwd'
  version: '0.10.2.1'
  compression: snappy
# version is critical!! Filebeat defaults to security_protocol='SASL_PLAINTEXT', sasl_mechanism='PLAIN'.
# If no rule under `topics` matches an event, the default `topic` above is used.
# The Filebeat log input options are documented at https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-input-log.html
# The Filebeat Kafka output options are documented at:
# https://www.elastic.co/guide/en/logstash/7.x/output-plugins.html
# https://www.elastic.co/guide/en/beats/filebeat/7.x/kafka-output.html
# Newer Kafka versions support both SASL/GSSAPI and SASL/PLAIN authentication,
# but the Filebeat Kafka output only supports SASL/PLAIN.
# http://rk700.github.io/2016/12/16/filebeat-kafka-logstash-authentication-authorization/
# https://serverfault.com/questions/1001134/filebeat-kafka-input-with-sasl
2. Kafka configuration:
Too much to cover and not the focus here; the parts that matter for this pipeline are:
security_protocol='SASL_PLAINTEXT'
sasl_mechanism='PLAIN'
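Before blaming Filebeat, it is worth confirming that the brokers really accept SASL_PLAINTEXT + PLAIN with the configured credentials. A rough Python sketch using the kafka-python package (assumed installed via `pip install kafka-python`); the broker addresses, topic and credentials are the placeholders from the configs above:

# Stand-alone SASL/PLAIN connectivity check, mirroring what Filebeat/Logstash are configured to use.
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "topic_biz_data",
    bootstrap_servers=["yz-re011.kafka.com.cn:9110", "yz-re012.kafka.com.cn:9110"],
    security_protocol="SASL_PLAINTEXT",
    sasl_mechanism="PLAIN",
    sasl_plain_username="name",
    sasl_plain_password="pwd",
    auto_offset_reset="latest",
    consumer_timeout_ms=5000,   # stop polling after 5 s of silence instead of blocking forever
)

# Authentication or broker-version problems surface as exceptions here or at construction time;
# otherwise the newest messages on the topic are printed.
for record in consumer:
    print(record.topic, record.partition, record.offset, record.value[:200])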
3. Logstash configuration:
input {
  # Beats (when Filebeat ships straight to Logstash):
  #beats {
  #  host => "0.0.0.0"   # listen on all addresses for data sent to port 5044
  #  port => 5044
  #}
  # Kafka:
  # https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html
  # https://www.elastic.co/guide/en/logstash/current/plugins-outputs-kafka.html
  # https://github.com/logstash-plugins/logstash-input-kafka/blob/master/lib/logstash/inputs/kafka.rb
  kafka {                                         # consume from Kafka
    bootstrap_servers => "172.168.50.41:9092,172.168.50.41:9097,172.168.50.41:9098"   # Kafka broker host:port list
    topics => ["topic_biz_data"]
    #topics => "%{[@metadata][topic]}"            # use the topic carried over from Kafka
    #topics_pattern => "topic_.*"                 # subscribe to topics matching a regex
    codec => "json"                               # event format -- important
    sasl_mechanism => "PLAIN"
    security_protocol => "SASL_PLAINTEXT"
    sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='xxx' password='xxx';"
    #consumer_threads => 2                        # number of consumer threads
    #decorate_events => true                      # add Kafka metadata (topic, message size, ...) to the event as a field named kafka
    #auto_offset_reset => "latest"                # start from the latest offset when there is no committed offset
    #auto_commit_interval_ms => "1000"
    group_id => "group_biz_data"                  # consumer group ID; Logstash instances sharing the same group_id form one consumer group
  }
}
filter {
  if [fields][log_source] == "nginx" {
    mutate {
      # replace double quotes with single quotes in the raw message
      gsub => ["message", '"', "'"]
    }
    grok {
      match => {
        "message" => "%{TIMESTAMP_ISO8601:@timestamp} %{IPV4:remote_addr} %{IPORHOST:http_host} %{QS:referer} %{WORD:scheme} %{QS:request} %{NOTSPACE:request_method} %{NUMBER:request_time:float} %{NOTSPACE:server_protocol} %{NOTSPACE:uri} %{HOSTNAME:host} %{HOSTNAME:domain} %{HOSTNAME:hostname} %{NUMBER:status} %{NUMBER:bytes} %{QS:agent} %{QS:x_forwarded} %{QS:upstr_addr} %{NUMBER:upstr_status} %{NOTSPACE:upstr_host} %{NUMBER:upstr_resp_time}"
      }
    }
    # concatenate two fields so the site id can be extracted from either one
    mutate {
      add_field => {"str" => "%{referer}%{request}"}
    }
    grok {
      match => {
        "str" => "(\?|\&)(site_id|id)=(?<siteid>[\d]+)"
      }
    }
    mutate {
      # replace "-" with "0" so the value can be cast to float below
      gsub => ["upstr_resp_time", "-", "0"]
    }
    mutate {
      remove_field => "message"
      remove_field => "str"
      remove_field => "@version"
      remove_field => "host"
      remove_field => "path"
      remove_field => "tags"
      split => ["x_forwarded", ","]
      add_field => {"real_remote_ip" => "%{[x_forwarded][0]}"}
      remove_field => "x_forwarded"
    }
    mutate {
      convert => {
        "upstr_resp_time" => "float"
      }
      gsub => ["referer", "'", ""]
      gsub => ["real_remote_ip", "'", ""]
      gsub => ["request", "'", ""]
      gsub => ["upstr_addr", "'", ""]
    }
    ruby {
      code => "event.set('index_day', event.timestamp.time.localtime.strftime('%Y.%m.%d'))"
    }
    #json {
    #  source => "message"
    #}
  }
}
output {
  if [fields][log_source] == "nginx" {
    elasticsearch {
      hosts => ["10.13.13.109:9200"]
      #index => "nginx-access-logs-%{+YYYY.MM.dd}"   # this is 8 hours off (UTC vs. local time)
      index => "nginx-access-logs-%{index_day}"
      user => "elastic"
      password => "pwd23"
      #codec => json   # only record events that convert to JSON successfully
    }
  }
  # for debugging:
  #stdout {
  #  codec => rubydebug
  #}
}
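The trickier filter steps can be sanity-checked outside of Logstash. A rough Python sketch of the site-id regex, the first-hop extraction from x_forwarded, and the local-time index_day computation (sample values are made up; the grok/ruby definitions above remain the authoritative versions):

import re
from datetime import datetime, timezone, timedelta

# 1) siteid: same pattern as the grok on the concatenated referer+request string
#    (Python spells the named group (?P<siteid>...) instead of (?<siteid>...))
str_sample = "'https://example.com/page?site_id=42''GET /index?foo=1'"   # hypothetical value of "str"
m = re.search(r"(\?|\&)(site_id|id)=(?P<siteid>\d+)", str_sample)
print(m.group("siteid") if m else None)                                  # -> 42

# 2) real_remote_ip: split x_forwarded on "," and keep the first hop, as the mutate block does
x_forwarded = "'1.2.3.4, 10.0.0.1'"
real_remote_ip = x_forwarded.split(",")[0].replace("'", "")
print(real_remote_ip)                                                    # -> 1.2.3.4

# 3) index_day: %{+YYYY.MM.dd} formats @timestamp in UTC, which is 8 hours behind
#    Asia/Shanghai; the ruby filter converts to local time first, e.g.:
ts_utc = datetime(2024, 1, 1, 20, 30, tzinfo=timezone.utc)               # hypothetical event time
print(ts_utc.strftime("%Y.%m.%d"))                                                     # -> 2024.01.01 (UTC day)
print(ts_utc.astimezone(timezone(timedelta(hours=8))).strftime("%Y.%m.%d"))            # -> 2024.01.02 (local day)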