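ELK: A Distributed Real-Time Log Processing Platform
ELK is made up of three components, logstash, elasticsearch, and kibana, which handle log collection, indexing and search, and visualization respectively.
- logstash
As the architecture diagram shows, logstash is only where logs are collected and indexed. It is started with a .conf file whose configuration has three sections: input, filter, and output.
- redis
Here redis decouples log collection from indexing.
- elasticsearch
The core component, used for search. Main features: real-time, distributed, highly available, document oriented, schema free, RESTful.
- kibana
The log visualization component, which makes interacting with the data easier.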
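The decoupling role of redis can be pictured as a queue sitting between two independent processes. The sketch below is a minimal in-memory stand-in and not the Redis client API: a `deque` plays the part of the Redis list, and the hypothetical functions `ship` and `index_batch` stand for the collecting side and the indexing side.

```python
from collections import deque

# In-memory stand-in for the Redis list; the real setup uses a Redis server.
queue = deque()

def ship(event: str) -> None:
    """Collector side: append an event to the tail of the list (like RPUSH)."""
    queue.append(event)

def index_batch(max_events: int) -> list:
    """Indexer side: take up to max_events from the head of the list (like LPOP)."""
    batch = []
    while queue and len(batch) < max_events:
        batch.append(queue.popleft())
    return batch

# The collector keeps shipping even while the indexer is not running.
for i in range(5):
    ship(f"event-{i}")

# When the indexer comes back, it drains the backlog in arrival order.
first = index_batch(3)
rest = index_batch(10)
print(first, rest)
```

Because neither side calls the other directly, a slow or restarted indexer only lets the list grow; it does not block the applications that produce the logs.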
Deployment
Required components
- logstash https://download.elasticsearch.org/logstash/logstash/logstash-1.4.2.tar.gz
- redis http://download.redis.io/releases/redis-stable.tar.gz
- elasticsearch https://download.elasticsearch.org/elasticsearch/elasticsearch/elasticsearch-1.3.2.zip
- kibana https://github.com/elasticsearch/kibana
Logstash
logstash 10-minute tutorial: http://logstash.net/docs/1.4.2/tutorials/10-minute-walkthrough/
Download the latest logstash release and unpack it
Edit the logstash.conf configuration file
logstash user documentation: http://logstash.net/docs/1.4.2/
Example log4j server configuration: log4j.conf
input {
  log4j {
    data_timeout => 5
    # mode => "server"
    # port => 4560
  }
}
filter {
  json {
    source => "message"
    remove_field => ["message","class","file","host","method","path","priority","thread","type","logger_name"]
  }
}
output {
  #stdout { codec => json }
  redis {
    host => "redis.internal.173"
    port => 6379
    data_type => "list"
    key => "soalog"
  }
}
Example logstash configuration for output to elasticsearch: soalog-es.conf
input {
  redis {
    host => "redis.internal.173"
    port => "6379"
    key => "soalog"
    data_type => "list"
  }
}
filter {
  json {
    source => "message"
    remove_field => ["message","type"]
  }
}
output {
  elasticsearch {
    #host => "es1.internal.173,es2.internal.173,es3.internal.173"
    cluster => "soaes"
    index => "soa_logs-%{+YYYY.MM.dd}"
  }
}
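In the filter, source => "message" parses the JSON string held in the message field so that its keys become indexed fields; remove_field then deletes the fields that are not needed.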
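The effect of this filter on a single event can be sketched in a few lines. This is only an illustration of the behaviour described above, not logstash code: `apply_json_filter` is a hypothetical helper, and the sample event values are made up (the field names are borrowed from the log fields used elsewhere in this setup).

```python
import json

def apply_json_filter(event: dict, source: str, remove_field: list) -> dict:
    """Mimic the described behaviour: parse event[source] as JSON, merge the
    parsed keys into the event, then drop the fields listed in remove_field."""
    result = dict(event)
    parsed = json.loads(result[source])
    result.update(parsed)
    for name in remove_field:
        result.pop(name, None)
    return result

# Illustrative event; the field values are made up for this example.
event = {
    "message": '{"appId": "demo", "status": 200, "processTime": 12}',
    "type": "log4j",
    "host": "app-01",
}
indexed = apply_json_filter(event, source="message", remove_field=["message", "type"])
print(indexed)
```

After the filter runs, the keys that were buried inside the message string sit at the top level of the event, which is what lets elasticsearch index them as separate fields.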
Start
./logstash -f soalog-es.conf --verbose -l ../soalog-es.log &
./logstash -f log4j.conf --verbose -l ../log4j.log &
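Elasticsearch
Download the latest elasticsearch release and unpack it
Run bin/elasticsearch -d to start it in the background
Verify
elasticsearch cluster configuration:
Edit config/elasticsearch.yml
# Cluster name; the default is elasticsearch. Clients need it when connecting in cluster mode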
cluster.name: soaes
# Data directory; several disks can be listed, e.g. /path/to/data1,/path/to/data2
path.data: /mnt/hadoop/esdata
# Log directory
path.logs: /mnt/hadoop/eslogs
# List of master nodes in the cluster, used to discover new nodes
discovery.zen.ping.unicast.hosts: ["hadoop74", "hadoop75"]
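Once the nodes have been restarted with this configuration, one way to check that they joined the same cluster is the cluster health endpoint, `/_cluster/health`. The sketch below is an assumption-laden example rather than part of the original setup: the base URL `http://localhost:9200` and the helper names are illustrative, and the sample response body is hand-written so the parsing can be shown without a running node.

```python
import json
from urllib.request import urlopen

def summarize_health(body: str) -> str:
    """Turn a _cluster/health JSON response into a one-line summary."""
    health = json.loads(body)
    return "{cluster_name}: {status}, {number_of_nodes} node(s)".format(**health)

def fetch_health(base_url: str = "http://localhost:9200") -> str:
    """Query a running node; base_url is an assumption, adjust to your host."""
    with urlopen(base_url + "/_cluster/health", timeout=5) as response:
        return summarize_health(response.read().decode("utf-8"))

# Offline demonstration with an illustrative response body (not real output).
sample = '{"cluster_name": "soaes", "status": "green", "number_of_nodes": 2}'
summary = summarize_health(sample)
print(summary)
```

Calling `fetch_health()` against a live node should report the cluster name set in elasticsearch.yml; a node count lower than expected usually points at the unicast host list.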
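Configure an es template, which can specify whether each field is indexed and what type it is stored as
Create a templates directory under the config directory
Add the template file template-soalogs.json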
{
  "template-soalogs" : {
    "template" : "soa_logs*",
    "settings" : {
      "index.number_of_shards" : 5,
      "number_of_replicas" : 1,
      "index" : {
        "store" : {
          "compress" : { "stored" : true, "tv": true }
        }
      }
    },
    "mappings" : {
      "logs" : {
        "properties" : {
          "providerNode" : { "index" : "not_analyzed", "type" : "string" },
          "serviceMethod" : { "index" : "not_analyzed", "type" : "string" },
          "appId" : { "index" : "not_analyzed", "type" : "string" },
          "status" : { "type" : "long" },
          "srcAppId" : { "index" : "not_analyzed", "type" : "string" },
          "remark" : { "type" : "string" },
          "serviceVersion" : { "index" : "not_analyzed", "type" : "string" },
          "srcServiceVersion" : { "index" : "not_analyzed", "type" : "string" },
          "logSide" : { "type" : "long" },
          "invokeTime" : { "type" : "long" },
          "@version" : { "type" : "string" },
          "@timestamp" : { "format" : "dateOptionalTime", "type" : "date" },
          "srcServiceInterface" : { "index" : "not_analyzed", "type" : "string" },
          "serviceInterface" : { "index" : "not_analyzed", "type" : "string" },
          "retryCount" : { "type" : "long" },
          "traceId" : { "index" : "not_analyzed", "type" : "string" },
          "processTime" : { "type" : "long" },
          "consumerNode" : { "index" : "not_analyzed", "type" : "string" },
          "rpcId" : { "index" : "not_analyzed", "type" : "string" },
          "srcServiceMethod" : { "index" : "not_analyzed", "type" : "string" }
        }
      }
    }
  }
}
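The "template" value is a wildcard pattern: the settings and mappings are applied to newly created indices whose names match it. The sketch below approximates that match with Python's shell-style globbing, which is an assumption made for illustration and not how elasticsearch implements it; `template_applies` is a hypothetical helper and the dates in the index names are made up.

```python
from fnmatch import fnmatchcase

TEMPLATE_PATTERN = "soa_logs*"  # the "template" value from template-soalogs.json

def template_applies(index_name: str, pattern: str = TEMPLATE_PATTERN) -> bool:
    """Approximate the wildcard match with shell-style globbing."""
    return fnmatchcase(index_name, pattern)

names = ["soa_logs-2014.09.01", "usap-2014.09.01", "nginx-2014.09.01"]
matches = {name: template_applies(name) for name in names}
print(matches)
```

Only the daily soa_logs indices pick up the not_analyzed string fields; the usap and nginx indices fall back to dynamic mapping unless they get templates of their own.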
kibana
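Go to the elasticsearch directory
bin/plugin -install elasticsearch/kibana
Verify: http://localhost:9200/_plugin/kibana
kibana needs an index pattern configured for its queries
Here the index is soa_logs, and because there is one index per day the date format must be specified as well (given here as YYYY-MM-DD; it has to match the date suffix in the index names, which the logstash output above writes as YYYY.MM.dd)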
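With one index per day, a query over a time range has to touch one index for each day in that range. The sketch below lists those names, assuming the `soa_logs-` prefix and the year.month.day suffix from the logstash `index` setting shown earlier; `daily_indices` is a hypothetical helper and not part of kibana.

```python
from datetime import date, timedelta

def daily_indices(prefix: str, start: date, end: date) -> list:
    """List one index name per day from start to end inclusive,
    using the same year.month.day suffix that logstash writes."""
    days = (end - start).days
    return [
        prefix + (start + timedelta(days=offset)).strftime("%Y.%m.%d")
        for offset in range(days + 1)
    ]

indices = daily_indices("soa_logs-", date(2014, 8, 30), date(2014, 9, 1))
print(indices)
```

If the pattern configured in kibana does not produce exactly these names, the dashboard queries indices that do not exist and shows no data.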
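The logstash 8-hour time offset problem
When logstash writes to one elasticsearch index per day, it computes the date in UTC. In a UTC+8 time zone this means the current day's index is only created at 8:00 local time, and data logged before 8:00 goes into the previous day's index.
Editing logstash/lib/logstash/event.rb works around this
Line 226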
.withZone(org.joda.time.DateTimeZone::UTC)
change it to
.withZone(org.joda.time.DateTimeZone.getDefault())
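The effect of the zone on the index name can be reproduced with a small calculation. This is a sketch of the arithmetic only, not logstash's code: `index_for` is a hypothetical helper, the local zone is assumed to be UTC+8 as implied by the 8-hour offset, and the timestamp is made up.

```python
from datetime import datetime, timedelta, timezone

UTC = timezone.utc
UTC_PLUS_8 = timezone(timedelta(hours=8))  # assumed local zone, from the 8-hour offset

def index_for(event_time: datetime, zone: timezone, prefix: str = "soa_logs-") -> str:
    """Name the daily index from the event's date as seen in the given zone."""
    return prefix + event_time.astimezone(zone).strftime("%Y.%m.%d")

# An event logged at 07:30 local time on 2 September (illustrative timestamp).
event_time = datetime(2014, 9, 2, 7, 30, tzinfo=UTC_PLUS_8)

utc_index = index_for(event_time, UTC)
local_index = index_for(event_time, UTC_PLUS_8)
print(utc_index, local_index)
```

The same event lands in the 1 September index when the date is taken in UTC and in the 2 September index when it is taken in the local zone, which is the behaviour the patch changes.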
log4j.properties configuration
#remote logging
log4j.additivity.logstash=false
log4j.logger.logstash=INFO,logstash
log4j.appender.logstash=org.apache.log4j.net.SocketAppender
log4j.appender.logstash.RemoteHost=localhost
log4j.appender.logstash.Port=4560
log4j.appender.logstash.LocationInfo=false
Java log output
private static final org.slf4j.Logger logstash = org.slf4j.LoggerFactory.getLogger("logstash");
logstash.info(JSONObject.toJSONString(rpcLog));
KOPF
elasticsearch cluster monitoring
bin/plugin -install lmenezes/elasticsearch-kopf
http://localhost:9200/_plugin/kopf
Example: shipping tomcat logs with logstash
logstash agent-side configuration: tomcat.conf
input {
  file {
    type => "usap"
    path => [
      "/opt/17173/apache-tomcat-7.0.50-8090/logs/catalina.out",
      "/opt/17173/apache-tomcat-7.0.50-8088/logs/catalina.out",
      "/opt/17173/apache-tomcat-7.0.50-8086/logs/catalina.out",
      "/opt/17173/apache-tomcat-7.0.50-8085/logs/catalina.out",
      "/opt/17173/apache-tomcat-6.0.37-usap-image/logs/catalina.out"
    ]
    codec => multiline {
      pattern => "(^.+Exception:.+)|(^\s+at .+)|(^\s+... \d+ more)|(^\s*Caused by:.+)"
      what => "previous"
    }
  }
}
filter {
  grok {
    #match => { "message" => "%{COMBINEDAPACHELOG}" }
    match => [ "message", "%{TOMCATLOG}", "message", "%{CATALINALOG}" ]
    remove_field => ["message"]
  }
}
output {
  # stdout{ codec => rubydebug }
  redis {
    host => "redis.internal.173"
    data_type => "list"
    key => "usap"
  }
}
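The multiline codec in this configuration folds Java stack traces into a single event: any line that matches the pattern is appended to the previous line instead of starting a new event. The sketch below imitates that rule with the same regular expression; `merge_lines` is a hypothetical helper, the log excerpt is made up, and the real codec has further options that are ignored here.

```python
import re

# Same regular expression as the multiline codec's pattern above.
CONTINUATION = re.compile(
    r"(^.+Exception:.+)|(^\s+at .+)|(^\s+... \d+ more)|(^\s*Caused by:.+)"
)

def merge_lines(lines: list) -> list:
    """Append every line that matches the pattern to the previous event."""
    events = []
    for line in lines:
        if events and CONTINUATION.search(line):
            events[-1] += "\n" + line
        else:
            events.append(line)
    return events

# Illustrative catalina.out excerpt; class and method names are made up.
lines = [
    "INFO: Starting request",
    "java.lang.IllegalStateException: boom",
    "\tat com.example.Demo.run(Demo.java:10)",
    "\t... 3 more",
    "Caused by: java.io.IOException: disk",
    "INFO: Next request",
]
events = merge_lines(lines)
print(len(events))
```

Note that the exception line itself also matches the pattern, so the whole trace is attached to the log line that precedes it rather than forming an event of its own.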
Edit logstash/patterns/grok-patterns
Add the grok patterns for tomcat logs
#tomcat log
JAVACLASS (?:[a-zA-Z0-9-]+\:)+[A-Za-z0-9$]+
JAVALOGMESSAGE (.*)
THREAD [A-Za-z0-9\-\[\]]+
# MMM dd, yyyy HH:mm:ss eg: Jan 9, 2014 7:13:13 AM
CATALINA_DATESTAMP %{MONTH} %{MONTHDAY}, 20%{YEAR} %{HOUR}:?%{MINUTE}(?::?%{SECOND}) (?:AM|PM)
# yyyy-MM-dd HH:mm:ss,SSS ZZZ eg: 2014-01-09 17:32:25,527 -0800
TOMCAT_DATESTAMP 20%{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{HOUR}:?%{MINUTE}(?::?%{SECOND}) %{ISO8601_TIMEZONE}
LOG_TIME %{HOUR}:?%{MINUTE}(?::?%{SECOND})
CATALINALOG %{CATALINA_DATESTAMP:timestamp} %{JAVACLASS:class} %{JAVALOGMESSAGE:logmessage}
# 11:27:51,786 [http-bio-8088-exec-4] DEBUG JsonRpcServer:504 - Invoking method: getHistory
#TOMCATLOG %{LOG_TIME:timestamp} %{THREAD:thread} %{LOGLEVEL:level} %{JAVACLASS:class} - %{JAVALOGMESSAGE:logmessage}
TOMCATLOG %{TOMCAT_DATESTAMP:timestamp} %{LOGLEVEL:level} %{JAVACLASS:class} - %{JAVALOGMESSAGE:logmessage}
Start the tomcat log agent:
./logstash -f tomcat.conf --verbose -l ../tomcat.log &
Storing tomcat logs in es
Configure tomcat-es.conf
input {
  redis {
    host => 'redis.internal.173'
    data_type => 'list'
    port => "6379"
    key => 'usap'
    #type => 'redis-input'
    #codec => json
  }
}
output {
  # stdout { codec => rubydebug }
  elasticsearch {
    #host => "es1.internal.173,es2.internal.173,es3.internal.173"
    cluster => "soaes"
    index => "usap-%{+YYYY.MM.dd}"
  }
}
Start the tomcat log indexer
./logstash -f tomcat-es.conf --verbose -l ../tomcat-es.log &
Example: shipping nginx and syslog logs with logstash
logstash agent-side configuration: nginx.conf
input {
  file {
    type => "linux-syslog"
    path => [ "/var/log/*.log", "/var/log/messages" ]
  }
  file {
    type => "nginx-access"
    path => "/usr/local/nginx/logs/access.log"
  }
  file {
    type => "nginx-error"
    path => "/usr/local/nginx/logs/error.log"
  }
}
output {
  # stdout{ codec => rubydebug }
  redis {
    host => "redis.internal.173"
    data_type => "list"
    key => "nginx"
  }
}
Start the nginx log agent
./logstash -f nginx.conf --verbose -l ../nginx.log &
Storing nginx logs in es
Configure nginx-es.conf
input {
  redis {
    host => 'redis.internal.173'
    data_type => 'list'
    port => "6379"
    key => 'nginx'
    #type => 'redis-input'
    #codec => json
  }
}
filter {
  grok {
    type => "linux-syslog"
    pattern => "%{SYSLOGLINE}"
  }
  grok {
    type => "nginx-access"
    pattern => "%{IPORHOST:source_ip} - %{USERNAME:remote_user} \[%{HTTPDATE:timestamp}\] %{IPORHOST:host} %{QS:request} %{INT:status} %{INT:body_bytes_sent} %{QS:http_referer} %{QS:http_user_agent}"
  }
}
output {
  # stdout { codec => rubydebug }
  elasticsearch {
    #host => "es1.internal.173,es2.internal.173,es3.internal.173"
    cluster => "soaes"
    index => "nginx-%{+YYYY.MM.dd}"
  }
}
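To see what the nginx-access grok pattern extracts, it helps to run an equivalent expression over one line. The sketch below is a rough translation into a plain Python regular expression: the sub-expressions are simplified stand-ins for grok's IPORHOST, USERNAME, HTTPDATE, QS, and INT (an assumption, looser than the real patterns), the quoted fields are captured without their quotes, and the sample line is made up.

```python
import re

# Simplified stand-ins for the grok sub-patterns (assumptions, looser than grok's).
ACCESS_LINE = re.compile(
    r'(?P<source_ip>\S+) - (?P<remote_user>\S+) '
    r'\[(?P<timestamp>[^\]]+)\] (?P<host>\S+) '
    r'"(?P<request>[^"]*)" (?P<status>\d+) (?P<body_bytes_sent>\d+) '
    r'"(?P<http_referer>[^"]*)" "(?P<http_user_agent>[^"]*)"'
)

# Illustrative log line in the layout the pattern expects (values are made up).
line = (
    '192.0.2.10 - - [09/Jan/2014:17:32:25 +0800] www.example.com '
    '"GET /index.html HTTP/1.1" 200 612 "-" "curl/7.30.0"'
)
fields = ACCESS_LINE.match(line).groupdict()
print(fields["status"], fields["request"])
```

The pattern only matches if nginx's log_format writes the host between the timestamp and the request, so the access log format on the server has to agree with the field order used here.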
Start the nginx log indexer
./logstash -f nginx-es.conf --verbose -l ../nginx-es.log &