首页 > 代码库 > 日志监控_ElasticStack-0003.Logstash输入插件及实际生产案例应用?

日志监控_ElasticStack-0003.Logstash输入插件及实际生产案例应用?

新版插件:


说明: 从5.0开始,插件都独立拆分成gem包,每个插件可独立更新,无需等待Logstash自身整体更新,具体管理命令可参考./bin/logstash-plugin --help帮助信息../bin/logstash-plugin list其实所有的插件就位于本地./vendor/bundle/jruby/1.9/gems/目录下

扩展: 如果GitHub上面(https://github.com/logstash-plugins/)发布了扩展插件,可通过./bin/logstash-plugin install <plugin-name>,当然升级也很方便./bin/logstash-plugin update <plugin-name>,如果要安装更新本地已有的插件可通过./bin/logstash-plugin install/update <plugin-path>即可.

注意: 默认./bin/logstash-plugin install/update时是到https://rubygems.org/下载包,速度非常慢,所以强烈推荐手动从https://github.com/logstash-plugins/https://rubygems.org/下载下来更新


输入插件: https://www.elastic.co/guide/en/logstash/current/input-plugins.html


说明: 输入插件只用于input区段中,可以通过不同的方式来采集数据,如上为目前Logstash支持的所有输入插件.


插件名称: udp (https://www.elastic.co/guide/en/logstash/current/plugins-inputs-udp.html)

input {
    udp {
        port => 25826
        workers => 2
        queue_size => 20000
        buffer_size => 1452
        codec => collectd { }
        type => "collectd"
    }
}
output{
    stdout{
        codec => rubydebug
    }
}

生产案例:

yum -y install collectd*
cp -rfp /etc/collectd.conf /etc/collectd.conf_org
vim /etc/collectd.conf
# Collectd基础信息采集配置
Hostname    "xm-server-00001"
FQDNLookup  false
LoadPlugin  df
LoadPlugin  cpu
LoadPlugin  disk
LoadPlugin  memory
LoadPlugin  network
<Plugin network>
    Server "10.2.5.51" "25826"
</Plugin>
LoadPlugin  interface
<Plugin interface>
    Interface "eth0"
    IgnoreSelected false
</Plugin>
Include "/etc/collectd.d"

说明: 如上是Logstash Udp插件的生产案例,Logstash配置文件中udp区段port表示监听的udp端口,workers表示读取socket数据的线程数,一般设置为CPU个数即可,queue_size表示udp包可堆积在内存中个数,buffer_size表示读取缓冲区最大大小,codec 表示编码器,type表示手工定义的类型,可用于filter区段中以及后面前端Kibana检索


插件名称: file (https://www.elastic.co/guide/en/logstash/current/plugins-inputs-file.html)

input {
    file {
        path => ["/xm-workspace/xm-webs/xmcloud/logs/*.log"]
        type => "dss-pubserver"
codec => json
        start_position => "beginning"
    }
}
output{
    stdout{
        codec => rubydebug
    }
}

生产案例:

log_format json ‘{"@timestamp":"$time_iso8601",‘
                 ‘"host":"$server_addr",‘
                 ‘"clientip":"$remote_addr",‘
                 ‘"size":$body_bytes_sent,‘
                 ‘"responsetime":$request_time,‘
                 ‘"upstreamtime":"$upstream_response_time",‘
                 ‘"upstreamhost":"$upstream_addr",‘
                 ‘"http_host":"$host",‘
                 ‘"url":"$uri",‘
                 ‘"xff":"$http_x_forwarded_for",‘
                 ‘"referer":"$http_referer",‘
                 ‘"agent":"$http_user_agent",‘
                 ‘"status":"$status"}‘;

说明: 分析网站/接口访问日志时可监听匹配日志文件,同时会生成.sincedb数据库文件,记录被监听文件的inode/major/minor/position,file区段中还支持delimiter表示行分割符,discover_interval表示检查是否有新文件频率,默认15秒,exclude表示排除监听的匹配列表,close_older表示多久没修改就关闭监听句柄,默认3600秒,ignore_older表示只检查多久内的文件,默认86400,sincedb_path表示sincedb存放位置,默认位于(Linux: $HOME/.,sincedb | Windows: C:\Windows\System32\config\systemprofile\.sincedb),sincedb_write_interval表示多久写一次sincedb,默认为15秒,stat_interval为每隔多久检查一次被监听的文件状态,默认为1秒,start_position表示从什么位置读取文件数据,默认为结束位置,类似tail -f,当然也可设定beginning从头读取.

注意: 通常导入原有数据进Elasticsearch前需通过filter/date插件规范默认的@timestamp字段值,file插件并不支持递归监视,如果有需求可以数组形式匹配,支持/path/to/*/*/*/*.log写法,如果需要反复测试可将sincedb_path定义为/dev/null,这样每次重启都会自动从头开始读,还有一点儿需要注意的是Windows由于没有inode概念,所以监听文件不靠谱,推荐使用nxlog作为收集端.


插件名称: stdin (https://www.elastic.co/guide/en/logstash/current/plugins-inputs-stdin.html)

input {
    stdin {
        codec => json
        tags => ["dss-pubserver"]
        type => "stdin"
        add_field => {
            "runenv" => "docker"
        }
    }
}
output{
    stdout{
        codec => rubydebug
    }
}

生产案例:

{"status": "", "content_length": "", "body_bytes_sent": "", "http_x_forwarded_for": "", "request_time": "", "errors": {"text": "res body is invalid protocol format args", "line": 128, "func": "process_msg()", "file": "dsspub-server.lua"}, "request_body": "", "http_referer": "", "level": "", "hostname": "", "request": "", "remote_addr": "", "upstream_response_time": "", "time_local": "", "upstream_addr": ""}

说明: type和tags是Logstash事件中两个特殊的字段,通常会在输入区段中来标记事件类型,然后在数据处理区域根据事件类型来调用插件处理消息或添加删除tags,最后在输出中通过tags来判断并选择输出


插件名称: syslog(https://www.elastic.co/guide/en/logstash/current/plugins-inputs-syslog.html)

input {
    syslog {
        port => "6514"
    }
}
output{
    stdout{
        codec => rubydebug
    }
}

生产案例:

vim /etc/rsyslog.conf 
*.*                                                     @@127.0.0.1:6514

说明: syslog在收集网络设备端日志时也许是几乎唯一可行办法,简单测试只需修改/etc/rsyslog.conf配置,加入网络传输支持,发送到本地的Logstash监听端口6514,默认接收过滤操作都在input中完成,虽然启动时可通过-w指定worker数但是同一个客户端数据的处理以及过滤都在同一个线程中完成,导致处理性能严重下降,如果使用udp+grok的match的%{SYSLOGLINE}和syslog_pri实现,虽然将input和grok拆分拆分到两个线程,但udp接收缓冲区有限制(netstat -plnu | awk ‘NR==1 || $4~/:514$/{print $2}‘),当大于228096时就开始丢包,所以还是推荐使用tcp方式

扩展: 如果实在没法切换到tcp协议,可通过这个小工具来实现异步IO的UDP监听数据输入写入Elasticsearch(https://gist.github.com/chenryn/7c922ac424324ee0d695)


插件名称: tcp(https://www.elastic.co/guide/en/logstash/current/plugins-inputs-tcp.html)

input {
    tcp {
        port => "8888"
        mode => "server"
        ssl_enable => false
    }
}
output{
    stdout{
        codec => rubydebug
    }
}

生产实例:

nc 127.0.0.1 8888 < error.log

说明: Logstash也可通过Tcp组件实现简单消息队列,但千万别用于生产环境,生产环境还是换用专业的Logstash broker,比较常用的就是配合nc实现旧数据导入,这种导入旧数据的方式相比file好处在于可以准确的知道何时导入完成.


本文出自 “满满李 - 运维开发之路” 博客,请务必保留此出处http://xmdevops.blog.51cto.com/11144840/1884068

日志监控_ElasticStack-0003.Logstash输入插件及实际生产案例应用?