微信搜索superit|邀请体验:大数据, 数据管理、OLAP分析与可视化平台 | 赞助作者:赞助作者

基于Nginx1.9+LuaJIT+Kafka的点播监控系统实战(上海卓越智慧树网点播监控系统)

日志采集 aide_941 26℃

基于Nginx1.9+LuaJIT+Kafka的点播监控系统实战(上海卓越智慧树网点播监控系统)

最近在做点监控系统,先后采用了两套方案:
方案一:Nginx记录日志 –> tail语句扫描日志到Kafka –> Java站点消费Kafka消息后存储到Mysql数据库 –> Mysql数据库 –> 数据库定时汇聚数据 –> 界面呈现
方案一遇到的问题:
1.1 面对海量数据,日志文件增长很快,磁盘占用大
1.2 Java站点消费Kafka消息后存储到Mysql数据库 太慢为了解决上述两个问题,采用方案二:
方案二:Nginx+Lua直接向Kafa发送数据  –> Java站点消费Kafka消息后存储到Mysql数据库 –> Mysql数据库 –> 数据库定时汇聚数据 –> 界面呈现
2.1 Nginx+Lua直接向Kafa发送数据: Nginx不记录日志,直接发送到Kafka,不占用磁盘空间
2.2 Java站点消费Kafka消息后存储到Mysql数据库: 合并更新操作,如对同一个sessionid的更新操作有100条,合并后只有一条更新语句
方案二遇到的问题:
2.3 Nginx可以直接用完整的OpenResty,也可以在现有的Nginx基础上添加插件
我采用的是在现有的Nginx基础上添加插件

方案一具体如下:
Nginx1.9.9
kafka_2.11-0.9.0.1

1.1、Nginx配置文件里的配置:
log_format ht-video  ‘$request|$msec|$http_x_forwarded_for|$http_user_agent|$request_body’;

server{
server_name lc.zhihuishu.com;

location / {
root /usr/local/nginxnew/html/view;
index index.html;
}
location /ht{
#root   html;
#index  index.html index.htm;
if ( $request ~ “GET” ) {
#access_log logs/ht-video.log ht-video;
access_log /htlog/ht-video.log ht-video;
}
proxy_set_header Host $http_host;
proxy_pass http://localhost:6099;
client_max_body_size 8000M;
#error_page 405 =200 $1;
}
}

server {
listen       6099;
server_name  localhost;
location /ht {
root   /usr/local/nginxnew/html/hightower;
index  index.html index.htm;
client_max_body_size 8000M;
error_page 405 =200 $1;
}

1.2、修改本机Hosts文件
vi /etc/hosts

10.252.126.242  kafka1.livecourse.com
10.252.126.242  zookeeper1.livecourse.com

1.3、修改Kafka文件
vi /usr/local/kafka_2.11-0.9.0.1/config/server.properties
这个文件里要改下面5点:
broker.id=0
port=9092
host.name=10.252.126.242
log.dirs=/able4/kafka-logs
zookeeper.connect=zookeeper1.livecourse.com:2181

vi /usr/local/kafka_2.11-0.9.0.1/bin/kafka-run-class.sh
添加两行:
export JAVA_HOME=/usr/local/java/jdk1.7.0_79
export JRE_HOME=/usr/local/java/jdk1.7.0_79/jre

1.4、启动zookeeper和kafka
/usr/local/kafka_2.11-0.9.0.1/bin/zookeeper-server-start.sh -daemon /usr/local/kafka_2.11-0.9.0.1/config/zookeeper.properties
/usr/local/kafka_2.11-0.9.0.1/bin/kafka-server-start.sh -daemon /usr/local/kafka_2.11-0.9.0.1/config/server.properties

1.5、扫描Nginx日志发送到kafka
tail -n 0 -f  /usr/local/nginxnew/logs/ht-video.log | /usr/local/kafka_2.11-0.9.0.1/bin/kafka-console-producer.sh –broker-list kafka1.livecourse.com:9092 –topic HT-VIDEO

方案二具体如下:
Nginx1.9.9
LuaJIT-2.0.4
lua-nginx-module-0.10.2
ngx_devel_kit-0.3.0rc1
lua-resty-kafka
kafka_2.11-0.9.0.1

参考的这几篇文章方成此方案
Nginx与Lua

基于Lua+Kafka+Heka的Nginx Log实时监控系统

Lua如何读取nginx内置对象和方法

Kafka官方文档

nginx+lua+kafka实现日志统一收集汇总

1 安装LuaJIT
下载http://luajit.org/download.html
http://luajit.org/install.html
命令如下:
tar zxf LuaJIT-2.0.4.tar.gz
cd LuaJIT-2.0.4
make PREFIX=/usr/local/LuaJIT
make install PREFIX=/usr/local/LuaJIT
echo “/usr/local/LuaJIT/lib” > /etc/ld.so.conf.d/usr_local_luajit_lib.conf
ldconfig
#注意环境变量!
export LUAJIT_LIB=/usr/local/LuaJIT/lib
export LUAJIT_INC=/usr/local/LuaJIT/include/luajit-2.0

2 安装lua-nginx-module
https://github.com/openresty/lua-nginx-module/tags
cd /usr/local
tar zxvf lua-nginx-module-0.10.2.tar.gz

3 安装ngx_devel_kit

https://github.com/simpl/ngx_devel_kit/tags

http://17173ops.com/2013/11/01/17173-ngx-lua-manual.shtml
cd /usr/local
tar zxvf ngx_devel_kit-0.3.0rc1.tar.gz

4 安装编译Nginx
http://17173ops.com/2013/11/01/17173-ngx-lua-manual.shtml
给Nginx添加下面的参数,如果已经安装了Nginx则用
nginx -V
–add-module=/usr/local/lua-nginx-module-0.10.2 –add-module=/usr/local/ngx_devel_kit-0.3.0rc1
如:
cd /usr/local/nginx-1.9.9
./configure  –prefix=/usr/local/nginxnew/ –with-http_stub_status_module –with-http_ssl_module –with-http_realip_module –add-module=/root/install/ngx_log_if-master –add-module=/usr/local/lua-nginx-module-0.10.2 –add-module=/usr/local/ngx_devel_kit-0.3.0rc1

5 lua插件lua-resty-kafka
https://github.com/doujiang24/lua-resty-kafka
mkdir /usr/local/lua
上传lua-cjson-2.1.0.3.tar.gz到/usr/local/lua
上传lua-resty-kafka到/usr/local/lua
这里遇到些问题,我改写了其中的client.lua文件的两个方法:
方法1:
function _M.new(self, broker_list, client_config)
local opts = client_config or {}
local socket_config = {
socket_timeout = opts.socket_timeout or 3000,
keepalive_timeout = opts.keepalive_timeout or 600 * 1000,   — 10 min
keepalive_size = opts.keepalive_size or 2,
socket_config
}
–start 0 == wangsihong 2016-4-16
local broker_list_host_ip = opts.broker_list_host_ip or {}

local cli = setmetatable({
broker_list = broker_list,
broker_list_host_ip = broker_list_host_ip,
topic_partitions = {},
brokers = {},
client_id = “worker” .. pid(),
socket_config = socket_config,
}, mt)
–end 0 == wangsihong 2016-4-16

if opts.refresh_interval then
meta_refresh(nil, cli, opts.refresh_interval / 1000) — in ms
end

return cli
end

方法2:
function _M.choose_broker(self, topic, partition_id)
local brokers, partitions = self:fetch_metadata(topic)

if not brokers then
return nil, partitions
end

local partition = partitions[partition_id]
if not partition then
return nil, “not found partition”
end

local config = brokers[partition.leader]
if not config then
return nil, “not found broker”
end

–start 1 == wangsihong 2016-4-16
local broker_list_host_ip = self.broker_list_host_ip
for k = 1, #broker_list_host_ip do
local hostip = broker_list_host_ip[k]
if config ~= nil and hostip ~= nil and config.host == hostip.host then
config.host = broker_list_host_ip[k].ip
end
end
–end  1 == wangsihong 2016-4-16

return config
end

6 lua插件cjson
http://www.kyne.com.au/~mark/software/lua-cjson-manual.html
cd /usr/local/lua
tar zxvf lua-cjson-2.1.0.3.tar.gz

7 安装Nginx1.9.9好后修改nginx.conf

http{
#    lua_package_path “/usr/local/lua/lua-resty-kafka/lib/?.lua;/usr/local/lua/lua-cjson-2.1.0.3/lua/?.lua;;”;
#    lua_package_cpath ‘/usr/local/LuaJIT/lib/lua/5.1/?.so;’;
lua_package_path “/usr/local/lua/lua-resty-kafka/lib/?.lua;;”;

server{
#为了方便调试,关闭了lua_code_cache,如果是生产环境,应该开启它。
lua_code_cache off;

listen 80;
server_name localhost;
location = /lua-v {
content_by_lua ‘
ngx.header.content_type = “text/plain”;
if jit then
ngx.say(jit.version)
else
ngx.say(_VERSION)
end
‘;
}

location /ht3/video/p {
content_by_lua ‘
ngx.header.content_type = “text/plain”;
–local cjson = require “cjson”
–local client = require “resty.kafka.client”
local producer = require “resty.kafka.producer”

local broker_list = {
{ host = “10.252.126.242”, port = 9092 },
{ host = “10.252.126.242”, port = 9093 },
{ host = “10.252.126.242”, port = 9094 },
{ host = “10.252.126.242”, port = 9095 },
{ host = “10.252.126.242”, port = 9096 },
}
local broker_list_host_ip = {
{ host = “kafka1.livecourse.com”, ip = “10.252.126.242” },
}

local key = “key”
–local message = $request|$msec|$remote_addrx|$http_user_agent|$request_body

local myIP = ngx.req.get_headers()[“X-Real-IP”]
if myIP == nil then
myIP = ngx.req.get_headers()[“x_forwarded_for”]
end
if myIP == nil then
myIP = ngx.var.remote_addr
end
local h = ngx.req.get_headers()

local message = ngx.req.get_method() .. ” ” .. ngx.var.uri
if ngx.var.args  ~= nil then
message = message .. “?” .. ngx.var.args .. “|”
end
message = message .. ngx.now() .. “|”
message = message .. myIP .. “|”
message = message .. h[“user-agent”] .. “|”
local bodyData = ngx.req.get_body_data()
if bodyData  == nil then
bodyData = “-”
end
message = message .. bodyData

— 定义kafka异步生产者
— 发送日志消息,send第二个参数key,用于kafka路由控制:
— key为nill(空)时,一段时间向同一partition写入数据
— 指定key,按照key的hash写入到对应的partition
— 指定key,按照key的hash写入到对应的partition

local key = nil

— this is async producer_type and bp will be reused in the whole nginx worker
local bp = producer:new(broker_list, { producer_type = “async” , broker_list_host_ip = broker_list_host_ip,refresh_interval = 3000})

local ok, err = bp:send(“HT-VIDEO”, key, message)
if not ok then
–ngx.say(“send err:”, err)
ngx.say(“void(1);”);
return
end
ngx.say(“void(0);”);
–ngx.say(“send success, ok:”, ok)
‘;
}

}

}

8 完成
请求Url:http://localhost/ht3/video/p?a=bbbb 就能将日志内容发送到kafka

转载请注明:SuperIT » 基于Nginx1.9+LuaJIT+Kafka的点播监控系统实战(上海卓越智慧树网点播监控系统)

喜欢 (0)or分享 (0)