首页 > 代码库 > flume与Mosquitto的集成

flume与Mosquitto的集成

文章来自:http://www.cnblogs.com/hark0623/p/4173714.html  转发请注明

 

 

因业务需求,需要用flume收集MQTT(Mosquitto)的数据。方法是为flume编写自定义source,在source中订阅(subscribe)MQTT的主题。

 

flume source的java代码如下:

package com.yhx.sensor.flume.source;

import java.util.HashMap;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;

/**
 * Flume source that subscribes to an MQTT topic and forwards the payload of
 * every received message to the source's channel processor as a Flume event.
 *
 * <p>Optional configuration keys (all fall back to the built-in defaults
 * declared in {@link SimpleMqttClient} when absent):
 * <ul>
 *   <li>{@code brokerUrl} - MQTT server URI</li>
 *   <li>{@code topic} - topic to subscribe to</li>
 *   <li>{@code clientId} - MQTT client identifier</li>
 *   <li>{@code qos} - subscription QoS (default 0)</li>
 *   <li>{@code keepAlive} - value passed to
 *       {@code MqttConnectOptions.setKeepAliveInterval} (default 3000)</li>
 * </ul>
 */
public class MQTTSource extends AbstractSource implements EventDrivenSource,
        Configurable {

    /** Configured broker URI, or {@code null} to use the built-in default. */
    private String brokerUrl;

    /** Configured topic, or {@code null} to use the built-in default. */
    private String topic;

    /** Configured client id, or {@code null} to use the built-in default. */
    private String clientId;

    /** QoS used when subscribing. */
    private int qos = 0;

    /**
     * Keep-alive interval handed to the Paho connect options.
     * NOTE(review): Paho interprets this value in seconds, so the inherited
     * default of 3000 is 50 minutes - confirm this is intended.
     */
    private int keepAliveSeconds = 3000;

    /** The active MQTT client wrapper, or {@code null} while stopped. */
    SimpleMqttClient client = null;

    /**
     * Reads the optional connection settings from the Flume configuration.
     * Keys that are not present keep their defaults, so an empty
     * configuration still works.
     *
     * @param context the Flume configuration for this source
     */
    @Override
    public void configure(Context context) {
        brokerUrl = context.getString("brokerUrl");
        topic = context.getString("topic");
        clientId = context.getString("clientId");
        qos = context.getInteger("qos", 0);
        keepAliveSeconds = context.getInteger("keepAlive", 3000);
    }

    /**
     * Connects to the broker, subscribes, and then marks the source as
     * started. Returns as soon as the subscription is in place; messages are
     * delivered afterwards through the MQTT callback.
     *
     * @throws IllegalStateException if connecting or subscribing fails
     */
    @Override
    public void start() {
        SimpleMqttClient mqtt = new SimpleMqttClient();
        // Connect before flipping the lifecycle state so that a failed
        // connection is reported as a failed start.
        mqtt.runClient();
        client = mqtt;
        super.start();
    }

    /**
     * Disconnects from the broker (if connected) and marks the source as
     * stopped.
     */
    @Override
    public void stop() {
        if (client != null) {
            client.closeConn();
            client = null;
        }
        super.stop();
    }

    /**
     * Thin wrapper around a Paho {@link MqttClient} that acts as its own
     * callback and turns incoming messages into Flume events.
     */
    public class SimpleMqttClient implements MqttCallback {

        MqttClient myClient;
        MqttConnectOptions connOpt;

        // Built-in defaults, used when the matching configuration key is absent.
        String BROKER_URL = "tcp://192.168.116.128:1883";
        String M2MIO_DOMAIN = "192.168.116.128";
        String M2MIO_STUFF = "yhx";
        String M2MIO_THING = "yhx_flume";

        Boolean subscriber = true;
        Boolean publisher = false;

        /**
         * Invoked by the MQTT client when the connection to the broker is
         * lost. Only reports the loss; no reconnect is attempted here.
         *
         * @param t the cause of the connection loss
         */
        @Override
        public void connectionLost(Throwable t) {
            System.out.println("Connection lost!");
            if (t != null) {
                t.printStackTrace();
            }
        }

        /**
         * Disconnects the client if it is connected and then closes it.
         * Failures are reported but never propagated, so this is safe to call
         * during shutdown. Calling it when no client exists is a no-op.
         */
        public void closeConn() {
            if (myClient == null) {
                return;
            }
            try {
                if (myClient.isConnected()) {
                    myClient.disconnect();
                }
            } catch (MqttException e) {
                e.printStackTrace();
            } finally {
                try {
                    myClient.close();
                } catch (MqttException e) {
                    e.printStackTrace();
                }
                myClient = null;
            }
        }

        /**
         * Invoked when a message published by this client has been delivered.
         * Nothing needs to happen here.
         *
         * @param token the delivery token of the completed publish
         */
        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
            // intentionally empty
        }

        /**
         * Invoked when a message arrives on the subscribed topic. Wraps the
         * raw payload in a Flume event (with empty headers) and hands it to
         * the channel processor.
         *
         * @param topic   the topic the message was received on
         * @param message the received message
         */
        @Override
        public void messageArrived(String topic, MqttMessage message)
                throws Exception {
            Map<String, String> headers = new HashMap<String, String>();
            Event flumeEvent = EventBuilder.withBody(message.getPayload(),
                    headers);
            try {
                getChannelProcessor().processEvent(flumeEvent);
            } catch (Exception e) {
                // Best effort: report the failure and keep the callback
                // returning normally instead of rethrowing into the client.
                e.printStackTrace();
            }
        }

        /**
         * Creates the MQTT client, connects to the broker and, depending on
         * the {@code subscriber} / {@code publisher} flags, subscribes to the
         * topic and/or publishes ten sample messages. Returns once that is
         * done; it does not block waiting for messages.
         *
         * @throws IllegalStateException if the broker cannot be reached or
         *                               the subscription is rejected
         */
        public void runClient() {
            String serverUri = (brokerUrl != null) ? brokerUrl : BROKER_URL;
            String clientID = (clientId != null) ? clientId : M2MIO_THING;
            // Default topic has the form <domain>/<stuff>/<thing>.
            String myTopic = (topic != null) ? topic
                    : M2MIO_DOMAIN + "/" + M2MIO_STUFF + "/" + M2MIO_THING;

            connOpt = new MqttConnectOptions();
            connOpt.setCleanSession(true);
            connOpt.setKeepAliveInterval(keepAliveSeconds);

            try {
                myClient = new MqttClient(serverUri, clientID);
                myClient.setCallback(this);
                myClient.connect(connOpt);
            } catch (MqttException e) {
                closeConn();
                // Fail the start instead of terminating the whole JVM.
                throw new IllegalStateException(
                        "Unable to connect to MQTT broker " + serverUri, e);
            }

            System.out.println("Connected to " + serverUri);
            System.out.println("myTopic:" + myTopic);

            if (subscriber) {
                try {
                    myClient.subscribe(myTopic, qos);
                } catch (MqttException e) {
                    closeConn();
                    // Without a subscription the source would sit idle
                    // forever, so surface the failure.
                    throw new IllegalStateException(
                            "Unable to subscribe to " + myTopic, e);
                }
            }

            if (publisher) {
                publishSampleMessages(myClient.getTopic(myTopic));
            }
        }

        /**
         * Publishes ten small JSON sample messages to the given topic,
         * waiting for each delivery to complete.
         */
        private void publishSampleMessages(MqttTopic mqttTopic) {
            for (int i = 1; i <= 10; i++) {
                String pubMsg = "{\"pubmsg\":" + i + "}";
                int pubQoS = 0;
                MqttMessage message = new MqttMessage(pubMsg.getBytes());
                message.setQos(pubQoS);
                message.setRetained(false);

                System.out.println("Publishing to topic \"" + mqttTopic
                        + "\" qos " + pubQoS);
                try {
                    MqttDeliveryToken token = mqttTopic.publish(message);
                    // Wait until the message has been delivered to the broker.
                    token.waitForCompletion();
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                } catch (MqttException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

打JAR包注意要把Class-Path写上,如下:

Manifest-Version: 1.0
Class-Path: flume-ng-configuration-1.5.2.jar flume-ng-core-1.5.2.jar flume-ng-node-1.5.2.jar flume-ng-sdk-1.5.2.jar org.eclipse.paho.client.mqttv3-1.0.0.jar

 

将打好的JAR包放到flume的lib目录(注意,class-path说明的jar包在lib一定要有。 如果没有,则放上去)

 

接着修改flume的配置文件,如下(重点看sourceMqtt相关的配置;因为我这里同时还监听了UDP,所以配置中也包含sourceUdp):

a1.sources = sourceMqtt sourceUdp
a1.sinks = sinkMqtt sinkUdp
a1.channels = channelMqtt channelUdp

# Describe/configure the source
a1.sources.sourceMqtt.type = com.yhx.sensor.flume.source.MQTTSource

# Describe the sink
a1.sinks.sinkMqtt.type = logger

# Use a channel which buffers events in memory
a1.channels.channelMqtt.type = memory
a1.channels.channelMqtt.capacity = 1000
a1.channels.channelMqtt.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.sourceMqtt.channels = channelMqtt
a1.sinks.sinkMqtt.channel = channelMqtt

# a2.sources = sourceUdp
# a2.sinks = sinkUdp
# a2.channels = channelUdp

# Describe/configure the source
a1.sources.sourceUdp.type = syslogudp
a1.sources.sourceUdp.host = 0.0.0.0
a1.sources.sourceUdp.port = 12459
a1.sources.sourceUdp.interceptors=interceptorUdp
a1.sources.sourceUdp.interceptors.interceptorUdp.type=com.yhx.sensor.flume.intercepter.UDPIntercepter$Builder

# Describe the sink
a1.sinks.sinkUdp.type = logger

# Use a channel which buffers events in memory
a1.channels.channelUdp.type = memory
a1.channels.channelUdp.capacity = 1000
a1.channels.channelUdp.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.sourceUdp.channels = channelUdp
a1.sinks.sinkUdp.channel = channelUdp

 

配置文件保存至flume目录下的conf,叫flume.conf

然后flume启动命令如下 

# Start the Flume agent named "a1", using conf/ as the configuration directory and conf/flume.conf as the agent configuration file.
bin/flume-ng agent --conf conf --conf-file conf/flume.conf --name a1

 

flume与Mosquitto的集成