
RocketMQ Producer Example Program

  Please credit the source when reposting: http://www.cnblogs.com/xiaodf/

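  This example shows a simple implementation of a RocketMQ producer. It reads input data from a text file, serializes each record with Avro, and sends the result to RocketMQ.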

  The program reads its main parameters from the stdin.xml configuration file, whose content is as follows:

<?xml version="1.0" encoding="UTF-8"?>
<operator>
	<parameters>
		<parameter>
			<key>rocketmq.nameserver.list</key>
			<value>172.16.8.106:9876</value>
		</parameter>
		<parameter>
			<key>rocketmq.group.id</key>
			<value>test006</value>
		</parameter>
		<parameter>
			<key>rocketmq.topic</key>
			<value>TopicTest2</value>
		</parameter>
		<parameter>
			<key>rocketmq.tags</key>
			<value>*</value>
		</parameter>
		<parameter>
			<key>rocketmq.message.key</key>
			<value>OrderID0034</value>
		</parameter>
		<parameter>
			<key>schemaStr</key>
			<value>col1:string,col2:double</value>
		</parameter>
		<parameter>
			<key>filePath</key>
			<value>/home/test/rocketmq/input.txt</value>
		</parameter>
	</parameters>
</operator>
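The post does not show how OperatorConfiguration reads this file. As a rough illustration only, the key/value layout above could be loaded with the JDK's built-in DOM parser as sketched below. The class name StdinXmlConfig and its methods are hypothetical and are not part of the original project.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

// Hypothetical stand-in for OperatorConfiguration (an assumption, not the real class):
// collects every <parameter><key>/<value> pair into an ordered map.
class StdinXmlConfig {

    static Map<String, String> load(InputStream in) {
        try {
            Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(in);
            NodeList params = doc.getElementsByTagName("parameter");
            Map<String, String> values = new LinkedHashMap<>();
            for (int i = 0; i < params.getLength(); i++) {
                Element p = (Element) params.item(i);
                String key = p.getElementsByTagName("key").item(0).getTextContent().trim();
                String value = p.getElementsByTagName("value").item(0).getTextContent().trim();
                values.put(key, value);
            }
            return values;
        } catch (Exception e) {
            // Wrap checked parser/IO exceptions so callers get a single failure type.
            throw new IllegalStateException("Cannot parse configuration XML", e);
        }
    }

    // Convenience overload for XML that is already held in a string.
    static Map<String, String> loadString(String xml) {
        return load(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
    }

    public static void main(String[] args) {
        String xml = "<operator><parameters>"
                + "<parameter><key>rocketmq.topic</key><value>TopicTest2</value></parameter>"
                + "<parameter><key>schemaStr</key><value>col1:string,col2:double</value></parameter>"
                + "</parameters></operator>";
        Map<String, String> conf = loadString(xml);
        System.out.println(conf.get("rocketmq.topic"));
        System.out.println(conf.get("schemaStr"));
    }
}
```

A map keyed by the parameter name mirrors the conf.get("rocketmq.topic") style of lookup used by the producer, so the two could be swapped in a test without touching the rest of the code.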

  

The producer example program is as follows:

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.scistor.datavision.operator.common.AvroUtils;
import com.scistor.datavision.operator.common.OperatorConfiguration;
import org.apache.avro.Schema;
import org.apache.hive.hcatalog.common.HCatException;
import org.apache.hive.hcatalog.data.schema.HCatSchema;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class RocketProducer {

    // parameters
    private String nameserver;
    private String rocketmqTopic;
    private String tags;
    private String key;
    private String schemaStr;
    private String filePath;

    public RocketProducer configure(OperatorConfiguration conf) {
        this.nameserver = conf.get("rocketmq.nameserver.list");
        this.rocketmqTopic = conf.get("rocketmq.topic");
        this.tags = conf.get("rocketmq.tags");
        this.key = conf.get("rocketmq.message.key");
        this.schemaStr = conf.get("schemaStr");
        this.filePath = conf.get("filePath");
        return this;
    }

    public int run() {
        // Note: the group name is hard-coded here; rocketmq.group.id from stdin.xml is not read.
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.setNamesrvAddr(nameserver);
        producer.setInstanceName("RocketProducer");

        /**
         * A Producer must be initialized with start() before use; initialize it only once.
         * Note: never call start() every time a message is sent.
         */
        try {
            producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
            return -1; // nothing can be sent if the producer failed to start
        }

        // SchemaUtil is not imported, so it is presumably a helper in the same package.
        Schema schema;
        SchemaUtil schemaUtil = new SchemaUtil();
        try {
            HCatSchema hcatSchema = schemaUtil.createHCatSchema(schemaStr);
            schema = schemaUtil.createSchema("com.scistor.rocketmq.producer", rocketmqTopic, hcatSchema);
        } catch (HCatException e) {
            e.printStackTrace();
            producer.shutdown();
            return -1; // records cannot be serialized without a schema
        }

        List<String> content = RocketProducer.readFileByLines(filePath);

        /**
         * A single Producer object can send messages to multiple topics and tags
         * (this example uses only one of each).
         * Note: send() is a synchronous call; if it does not throw, the send is treated as successful.
         * A successful send can still carry different statuses. For example, the message may be
         * written to the Master but not to the Slave. This counts as success, but applications with
         * very high reliability requirements need to handle that case.
         * Sending can also fail outright; retrying is the application's responsibility.
         */
        for (int i = 0; i < content.size(); i++) {
            try {
                String[] fields = content.get(i).split(",");
                Object[] record = AvroUtils.convert(schema, fields);
                byte[] bytes = AvroUtils.serialize(schema, record);
                Message msg = new Message(rocketmqTopic, // topic
                        tags,   // tag
                        key,    // key
                        bytes); // body
                SendResult sendResult = producer.send(msg);
                System.out.println(sendResult);
            } catch (Exception e) {
                e.printStackTrace();
            }
            //TimeUnit.MILLISECONDS.sleep(10);
        }

        /**
         * On exit, call shutdown() to release resources, close network connections,
         * and deregister from the MetaQ server.
         * Note: in containers such as JBoss or Tomcat, we recommend calling shutdown()
         * from the container's exit hook.
         */
        producer.shutdown();
        return 0;
    }

    public static List<String> readFileByLines(String fileName) {
        List<String> list = new ArrayList<String>();
        System.out.println("Reading file line by line, one full line at a time:");
        // try-with-resources closes the reader exactly once, even on failure
        try (BufferedReader reader = new BufferedReader(new FileReader(fileName))) {
            String tempString;
            int line = 1;
            // Read one line at a time; readLine() returns null at end of file
            while ((tempString = reader.readLine()) != null) {
                list.add(tempString);
                // Print the line together with its line number
                System.out.println("line " + line + ": " + tempString);
                line++;
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return list;
    }

    public static void main(String[] args) {
        if (args.length < 1) {
            System.err.println("Required: HDFS directory containing the configuration file <stdin.xml>");
            System.exit(-1);
        }
        OperatorConfiguration conf = new OperatorConfiguration(args[0]);
        RocketProducer rocketProducer = new RocketProducer();
        System.exit(rocketProducer.configure(conf).run());
    }
}
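The comment in the send loop says that retrying failed sends is left to the application, but the example itself only prints the stack trace and moves on. One possible approach is a small generic retry wrapper like the sketch below. SendRetry, withRetry, and the linear backoff policy are all assumptions made for illustration; none of them come from the original program or from the RocketMQ client.

```java
import java.util.concurrent.Callable;

// Hypothetical helper (not part of RocketMQ): retries an action a bounded number of times.
class SendRetry {

    // Runs the action until it succeeds or maxAttempts is reached.
    // Waits backoffMillis * attemptNumber between attempts (simple linear backoff).
    static <T> T withRetry(Callable<T> action, int maxAttempts, long backoffMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    pause(backoffMillis * attempt);
                }
            }
        }
        throw new IllegalStateException("Gave up after " + maxAttempts + " attempts", last);
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and stop retrying.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting to retry", e);
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated send that fails twice before succeeding.
        String status = withRetry(() -> {
            if (++calls[0] < 3) {
                throw new java.io.IOException("simulated network failure");
            }
            return "SEND_OK";
        }, 5, 10L);
        System.out.println(status + " after " + calls[0] + " calls");
    }
}
```

In the producer loop this could be used as SendResult sendResult = SendRetry.withRetry(() -> producer.send(msg), 3, 100L);. Because a retried send can deliver the same message more than once, consumers would need to tolerate duplicates.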

  

The program prints its output to the console:

[root@m106 rocketmq]# ./produce.sh 
Reading file line by line, one full line at a time:
line 1: hdfs:///user/xdf/streaming/file-web/file/1.html,1
line 2: hdfs:///user/xdf/streaming/file-web/file/2.html,2
line 3: hdfs:///user/xdf/streaming/file-web/file/3.html,3
line 4: hdfs:///user/xdf/streaming/file-web/file/4.html,4
line 5: hdfs:///user/xdf/streaming/file-web/file,1
line 6: /home/xdf/workflow/file-web/file/1.html,1
line 7: /home/xdf/workflow/file-web/file/2.html,2
line 8: /home/xdf/workflow/file-web/file/3.html,3
line 9: /home/xdf/workflow/file-web/file/4.html,4
line 10: /home/xdf/workflow/file-web/file,1
SendResult [sendStatus=SEND_OK, msgId=AC10086A00002A9F000000001FB00A36, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-a, queueId=0], queueOffset=18710]
SendResult [sendStatus=SEND_OK, msgId=AC10086A00002A9F000000001FB00AED, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-a, queueId=1], queueOffset=18700]
SendResult [sendStatus=SEND_OK, msgId=AC10086A00002A9F000000001FB00BA4, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-a, queueId=2], queueOffset=18668]
SendResult [sendStatus=SEND_OK, msgId=AC10086A00002A9F000000001FB00C5B, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-a, queueId=3], queueOffset=18663]
SendResult [sendStatus=SEND_OK, msgId=AC10086B00002A9F000000001E197504, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-b, queueId=0], queueOffset=18649]
SendResult [sendStatus=SEND_OK, msgId=AC10086B00002A9F000000001E1975B4, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-b, queueId=1], queueOffset=18633]
SendResult [sendStatus=SEND_OK, msgId=AC10086B00002A9F000000001E197663, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-b, queueId=2], queueOffset=18629]
SendResult [sendStatus=SEND_OK, msgId=AC10086B00002A9F000000001E197712, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-b, queueId=3], queueOffset=18626]
SendResult [sendStatus=SEND_OK, msgId=AC10086A00002A9F000000001FB00D12, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-a, queueId=0], queueOffset=18711]
SendResult [sendStatus=SEND_OK, msgId=AC10086A00002A9F000000001FB00DC1, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-a, queueId=1], queueOffset=18701]
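Each input line in the output has two comma-separated fields, which matches the schemaStr value col1:string,col2:double from stdin.xml. The SchemaUtil and AvroUtils helpers that turn such a line into a typed record are not shown in the post, so the sketch below is only a guess at the general idea. LineConverter, its method names, and the set of supported type names are hypothetical, and the Avro serialization step is left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not the project's SchemaUtil or AvroUtils.
class LineConverter {

    // Parses "col1:string,col2:double" into an ordered column-name -> type-name map.
    static Map<String, String> parseSchema(String schemaStr) {
        Map<String, String> columns = new LinkedHashMap<>();
        for (String column : schemaStr.split(",")) {
            String[] parts = column.trim().split(":", 2);
            if (parts.length != 2) {
                throw new IllegalArgumentException("Bad column spec: " + column);
            }
            columns.put(parts[0].trim(), parts[1].trim().toLowerCase());
        }
        return columns;
    }

    // Splits one input line on commas and converts each field to the declared type.
    static List<Object> convert(Map<String, String> schema, String line) {
        String[] fields = line.split(",", -1);
        if (fields.length != schema.size()) {
            throw new IllegalArgumentException(
                    "Expected " + schema.size() + " fields but got " + fields.length);
        }
        List<Object> record = new ArrayList<>();
        int i = 0;
        for (Map.Entry<String, String> column : schema.entrySet()) {
            String raw = fields[i++].trim();
            switch (column.getValue()) {
                case "string":
                    record.add(raw);
                    break;
                case "double":
                    record.add(Double.parseDouble(raw));
                    break;
                case "int":
                    record.add(Integer.parseInt(raw));
                    break;
                default:
                    throw new IllegalArgumentException("Unsupported type: " + column.getValue());
            }
        }
        return record;
    }

    public static void main(String[] args) {
        Map<String, String> schema = parseSchema("col1:string,col2:double");
        System.out.println(convert(schema, "/home/xdf/workflow/file-web/file/1.html,1"));
    }
}
```

Splitting on a plain comma, as the original producer also does, breaks if a field itself contains a comma, so real input with such values would need a proper CSV parser.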

  
