首页 > 代码库 > 使用 log4js UDP 发送数据到 logstash

使用 log4js UDP 发送数据到 logstash

因为 nodejs 一般会部署在多台机器,并且每台机器会起多个进程,因此查看日志时往往要人工区分一个完整的请求包含哪些行。如果在日志中添加 服务器名称和进程id,就比较容易了。

如果在 filebeat 配置中修改正则表达式肯定是可以完成这个工作的,但今天发现 log4js(1.1.1版本) 的模块 logstashUDP 支持通过 UDP 直接发送数据到 logstash,这就更自由了,这样就不用在每台机器上跑 filebeat。

 

这里我们假定已经配置好了 ELK。EKL 配置方法参见本博客文章 http://www.cnblogs.com/jasonxuli/p/6397244.html

 

根据 logstashUDP 的代码,以 log4js.getLogger().info("message", arg1) 为例,第二个参数 arg1 会被作为 kv 结构,与 appender 配置中的 fields 对象合并,然后发送给 logstash。

也就是说,如果 arg1 = {"customField1": "", "customField2": ""},那么最终在 elasticsearch 中,数据大体结构为: {"fields.customField1": "", "fields.customField2": ""}。

 

appender 配置:


var logLayout = {
type:"pattern",
pattern: "%h %x{pid} [%d] [%p] %c > %m",
tokens: {
pid: function () {
return process.pid
}
}
};

appenders : [ { type:
"console", layout : logLayout }, { type : "logstashUDP", // 定义默认的 appender level : "ALL", layout : logLayout, // 自定义 layout host : "192.168.1.123", port : "5050", fields : { // fields 会和 第二个参数 arg1 合并,然后发送给 logstash;并且这个 fields 会作为 config.fields 用于之后每次日志事件 machine : os.hostname(), // 指定机器名 processId: process.pid // 指定线程ID } }
   ], replaceConsole:
true }

 

如果想要动态指定某个字段的值,需要包装一下 logger 的各个方法:

exports.logger = function getLogger (moduleName) {
    var l = log4js.getLogger(moduleName);

    l.trace = wrapper(l.trace);
    l.debug = wrapper(l.debug);
    l.info  = wrapper(l.info);
    l.warn  = wrapper(l.warn);
    l.error = wrapper(l.error);
    l.fatal = wrapper(l.fatal);

    return l;
};

function wrapper(func){
    return function(message){
        var data = http://www.mamicode.com/{log: Array.prototype.join.call(arguments, ‘, ‘)}; // 拼接原始log参数;logger.info("attr1", "attr2"), 那么不会把 "attr2" 当做 data 发给 logstash
        return func.call(this, data.log, data);  // 这里默认传递 data 参数给 logstash,不需要在每次 info() 时手动传递第二个参数
    }
}

 

logstash 的配置, /etc/logstash/conf.d/first-pipeline.conf

input {
    beats {
        port => "5043"
    }
    udp {
        port => "5050"
        codec => json  // 这一行一定要指定
    }
}
filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}"}
    }
    geoip {
        source => "clientip"
    }
}
output {
    if [fields][category] != "memory" {  // 因为 logstashUDP 模块会将第二个参数合并到 config.fields 中, [][] 的结构用于指定二级字段 fields.category
        elasticsearch {
            hosts => [ "192.168.20.50:9200" ]
            index => "vrslog-%{+YYYY.MM.dd}"
        }
    }
    if [fields][category] == "memory" {  // 为 memory 日志生成单独的 elasticsearch 索引
        elasticsearch {
            hosts => [ "192.168.20.50:9200" ]
            index => "memory-%{+YYYY.MM.dd}"
        }
    }
}

 

使用 log4js UDP 发送数据到 logstash