首页 > 代码库 > Logstash语法常用案例解析(二)

Logstash语法常用案例解析(二)

摘要

此篇主要讲Filter插件,已经对nginx 日志的各种处理实例

接着上篇继续说插件

1,Filter插件

  • Grok:正则捕获

  • Date:时间处理

  • Mutate:数据修改

  • Geoip:查询归类

  • JSON:编解码

Grok:解析和结构化任何文本。

http://grokdebug.herokuapp.com/patterns#        匹配规则,注意空格,如果空格不匹配也会报错

http://grokdebug.herokuapp.com/                         匹配检查,而且有语法提示

Grok 目前是logstash最好的方式对非结构化日志数据解析成结构化和可查询化。logstash内置了120个匹配模式,满足大部分需求。

格式:

filter {
    grok {
        match => { "message" => "grok_pattern" }
    }
}

注 :
        grok_pattern由零个或多个%{SYNTAX:SEMANTIC}组成,其中SYNTAX是表达式的名字,是由grok提供的,例如数字表达式的名字是NUMBER,IP地址表达式的名字是IP。SEMANTIC表示解析出来的这个字符的名字,由自己定义,例如IP字段的名字可以是client。

简单例子:

#cat  conf.d/test.conf
 
input {stdin{}}          #输入方式为标准输入
  filter {
  grok {                   #grok插件匹配
    #patterns_dir => "/path/to/patterns"   #将匹配规则写到指定文件方便管理
    match => {
    "message" => "%{WORD} %{NUMBER:request_time:float} %{WORD}"
}           #WORD匹配字符串,NUMBER匹配数值,支持int,float格式。匹配的值赋给request_time变量
  #remove_field => ["message"]  #处理结果删除掉message字段
  }
}
output {                  #输出方式为标准输出
  stdout {codec=>rubydebug}        #定义输出格式为rubydebug
}

        结果:

# ./bin/logstash   -f conf.d/test.conf
Logstash startup completed
begin 123.456 end
 
{
"message" => "begin 123.456 end",   #remove之后就不显示了。
"@version" => "1",
"@timestamp" => "2016-05-09T02:43:47.952Z",
"host" => "dev-online",
"request_time" => 123.456               #grok匹配中新加的变量
}

        Nginx 日志处理匹配:
        因为nginx日志已经被处理成json数据,传过来就是key:value的方式,打印成rubydebug格式如下:

技术分享        所以现在想要筛选不要的字段

input {
  file {
    path => "/var/log/nginx/access.log"
    type => "json"
    codec => "json"
    start_position => "beginning"
  }
}
filter {
  grok {
    match => {
    "@timestamp" =>"%{WORD}"        #先把不想要的字段匹配出来
    "type" => "%{WORD}"
    }
  remove_field => ["@timestamp","type"]  #再移除字段
}
}
output {
  stdout {
    codec=>rubydebug
  }
}

        运行结果:

技术分享        nginx 日志json格式:

log_format     json        ‘{"@timestamp":"$time_iso8601",‘
                            ‘"@version":"1",‘
                            ‘"host":"$server_addr",‘
                            ‘"client":"$remote_addr",‘
                            ‘"size":$body_bytes_sent,‘
                            ‘"responsetime":$request_time,‘
                            ‘"domain":"$host",‘
                            ‘"url":"$request",‘
                            ‘"refer":"$http_referer",‘
                            ‘"agent":"$http_user_agent",‘
                            ‘"status":"$status"}‘;
access_log /var/log/nginx/access.log json;

nginx配置文件常用正则匹配参数

nginx 日志格式                     匹配项目                                                        备注

$remote_addr                      %{IPORHOST:clientip}

$remote_user                       %{NOTSPACE:remote_user}

[$time_local]                        \[%{HTTPDATE:timestamp}\]                  “[]"需要属于特殊字符需要转义一下

"$request"                             "%{WORD:method}                                   访问请求,一般都加"",匹配时也加一下。 WORD匹配GET,POST

method                              %{URIPATHPARAM:request}                   URIPATHPARAM匹配请求的uri

HTTP                                %{NUMBER:httpversion}"                         NUMBER匹配数字,并赋值给httpversion http协议版本

$status                                  %{NUMBER:status}                                     NUMBER匹配数字,并赋值给status,作为返回状态

$body_bytes_sent                %{NUMBER:response}                                内容大小

"$http_referer"                    "%{QS:referrer}"                                            匹配请求refer

"$http_user_agent"             "%{QS:agent}"                                               匹配亲切agent

"$http_x_forwarded_for"    "%{QS:xforwardedfor}"                                   匹配xfw

$upstream_addr                  %{IPV4:upstream}:%{POSINT:port}

$scheme                               %{WORD:scheme}                                    匹配http or https

eg: nginx日志格式

log_format  access  ‘$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent"‘;

日志实例:

"192.168.1.22 - - [20/Apr/2016:16:28:14 +0800] "GET /ask/232323.html HTTP/1.1" 500 15534 "http://test.103.100xhs.com/" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.101 Safari/537.36""

匹配规则

"%{IPORHOST:clientip} - %{NOTSPACE:remote_user} \[%{HTTPDATE:timestamp}\] "%{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:status} %{NUMBER:response} %{QS:referrer} %{QS:agent}"

Geoip地址查询:

GeoIP 是最常见的免费 IP 地址归类查询库,GeoIP 库可以根据 IP地址提供对应的地域信息,包括国别,省市,经纬度等,对于可视化地图和区域统计非常有用

input {
    stdin{}
    }  
filter {
  geoip {
      source => "message"     #source必须为公网ip  否则geoip不会显示数据    #
      fields => ["city_name","country_code2","country_name","latitude","longitude"]    
      }      #geoip输出的内容比较多,可以指定输出的列
      }
output {
  stdout{
      codec=>rubydebug
        }
  }

找到对应IP的key 就是geoip  中source所指定的值。

完整的例子:

技术分享

        结果:

技术分享

注意:geoip 插件的 "source" 字段可以是任一处理后的字段,比如 "client_ip",但是字段内容却需要
        小心!geoip 库内只存有公共网络上的 IP 信息,查询不到结果的,会直接返回 null,

JSON:

input {stdin{}}
  filter {
    json {
      source => "message"              #必选项
    }
}
output {
  stdout{
    codec=>rubydebug
  }
}

        结果:

{"name":"wd","age":"15"}
{
"message" => "{\"name\":\"wd\",\"age\":\"15\"}",
"@version" => "1",
"@timestamp" => "2016-05-09T06:32:13.546Z",     #加一个时间戳的好处是方便kibana导入
"host" => "dev-online",
"name" => "wd",
"age" => "15"
}

Date事件处理

注意:因为在稍后的 outputs/elasticsearch 中常用的 %{+YYYY.MM.dd} 这种写法必须读取 @timestamp 数据,所以一定不要直接删掉这个字段保留自己的字段,而是应该用 filters/date 转换后删除自己的字段!

filter {
  grok {
    match => ["message", "%{HTTPDATE:logdate}"]
  }
  date {
    match => ["logdate", "dd/MMM/yyyy:HH:mm:ss Z"]
  }
}

注意:时区偏移量只需要用一个字母 Z 即可。

Mutate数据修改

1,类型转换

可以设置的转换类型包括:"integer","float" 和 "string"。示例如下

filter {
  mutate {
    convert => ["request_time", "float"]
  }
}

注意:mutate 除了转换简单的字符值,还支持对数组类型的字段进行转换,即将 ["1","2"] 转换成 [1,2]。但不支持对哈希类型的字段做类似处理。有这方面需求的可以采用稍后讲述的 filters/ruby 插件完成。

2,字符串处理

gsub 仅对字符串类型字段有效

gsub => ["urlparams", "[\\?#]", "_"]

split

split => ["message", "|"]

随意输入一串以|分割的字符,比如 "123|321|adfd|dfjld*=123",可以看到如下输出:

技术分享

join 仅对数组类型字段有效

我们在之前已经用 split 割切的基础再 join 回去。配置改成:

join => ["message", ","]

merge合并两个数组或者哈希字段。依然在之前 split 的基础上继续:

merge => ["message", "message"]

技术分享

rename 重命名某个字段,如果目的字段已经存在,会被覆盖掉:

rename => ["syslog_host", "host"]

update 更新某个字段的内容。如果字段不存在,不会新建。

replace 作用和 update 类似,但是当字段不存在的时候,它会起到 add_field 参数一样的效果,自动添加新的字段。

Codec编码插件

json: 直接输入预定义好的 JSON 数据,这样就可以省略掉 filter/grok 配置!

path => "/var/log/nginx/access.log_json""

codec => "json"

Multiline:合并多行数据

stdin {
  codec => multiline {
    pattern => "^\["
    negate => true
    what => "previous"
  }
}

        终端输入:以 the end 结束,换行无法结束。

本文出自 “aolens·程超” 博客,请务必保留此出处http://aolens.blog.51cto.com/7021142/1929705

Logstash语法常用案例解析(二)