首页 > 代码库 > JMS,java消息服务之mQ(二)

JMS,java消息服务之mQ(二)

结合实地代码的MQ应用:

1、配置

技术分享

spring-context.xml中的配置:

技术分享

spring-jms.xml 配置:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:mvc="http://www.springframework.org/schema/mvc"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
		http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd">

	<description>spring-mq CHO3.0三网</description>

	<!--
		Pooled JMS connection factory for the "X network" broker.
		All values are ${...} placeholders; they are presumably resolved by a
		property-placeholder configurer declared in another context file.
		The extra name "jmsConnectionFactoryByCPO30" is an alias so that bean
		definitions referring to the factory under that name resolve to this bean.
	-->
	<bean id="jmsConnectionFactoryByCHP30" name="jmsConnectionFactoryByCPO30"
		class="org.apache.activemq.pool.PooledConnectionFactory"
		destroy-method="stop">
		<property name="connectionFactory">
			<bean class="org.apache.activemq.ActiveMQConnectionFactory">
				<property name="brokerURL" value="${jms.brokerURL}" />
				<property name="closeTimeout" value="${jms.closeTimeout}" />
				<property name="userName" value="${jms.userName}" />
				<property name="password" value="${jms.password}" />
				<property name="optimizedAckScheduledAckInterval" value="${jms.optimizedAckScheduledAckInterval}" />
			</bean>
		</property>
	</bean>

	<!-- Pooled JMS connection factory for the CRM system (MCRM) broker. -->
	<bean id="jmsConnectionFactoryByMCRM" class="org.apache.activemq.pool.PooledConnectionFactory"
		destroy-method="stop">
		<property name="connectionFactory">
			<bean class="org.apache.activemq.ActiveMQConnectionFactory">
				<property name="brokerURL" value="${jms.brokerURL.MCRM}" />
				<property name="closeTimeout" value="${jms.closeTimeout.MCRM}" />
				<property name="userName" value="${jms.userName.MCRM}" />
				<property name="password" value="${jms.password.MCRM}" />
				<property name="optimizedAckScheduledAckInterval" value="${jms.optimizedAckScheduledAckInterval.MCRM}" />
			</bean>
		</property>
	</bean>
</beans>

 spring-jms-triple.xml 配置:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:mvc="http://www.springframework.org/schema/mvc"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
		http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd">

	<description>spring-mq CHO3.0三网通</description>

	<!-- ==================== A010 (send) : organization info sync ==================== -->
	<!-- Destination : CPO3.0 triple network (comma-separated list of three queue names) -->
	<bean id="OrgInfo" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg value="XH.DJR.Queue.A010.OrgInfo,XH.JX.Queue.A010.OrgInfo,XH.CX.Queue.A010.OrgInfo"/>
	</bean>
	<!-- Template : CPO3.0 triple network -->
	<!-- NOTE(review): pubSubDomain is true on every template here although most default
	     destinations are queue objects; confirm this is intended. -->
	<bean id="jmsTemplate_OrgInfoSync" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="jmsConnectionFactoryByCPO30" />
		<property name="defaultDestination" ref="OrgInfo" />
		<property name="pubSubDomain" value="true" />
		<property name="receiveTimeout" value="${jms.receiveTimeout}" />
		<property name="explicitQosEnabled" value="true"/>
		<!-- Delivery mode (2 = persistent) -->
		<property name="deliveryMode" value="2"/>
	</bean>

	<!-- ==================== A011 (send) : wealth-manager info sync ==================== -->
	<!-- Destination : CPO3.0 triple network (DJR / JX) -->
	<bean id="LeadManagerInfo" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg value="XH.DJR.Queue.A011.LeadManagerInfo,XH.JX.Queue.A011.LeadManagerInfo"/>
	</bean>
	<!-- Destination : CPO3.0 triple network (CX) -->
	<bean id="LeadManagerInfoByCX" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg value="XH.CX.Queue.A011.LeadManagerInfo"/>
	</bean>
	<!-- Template : CPO3.0 triple network -->
	<bean id="jmsTemplate_LeadManagerInfoSync" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="jmsConnectionFactoryByCPO30" />
		<property name="defaultDestination" ref="LeadManagerInfo" />
		<property name="pubSubDomain" value="true" />
		<property name="receiveTimeout" value="${jms.receiveTimeout}" />
		<property name="explicitQosEnabled" value="true"/>
		<property name="deliveryMode" value="2"/>
	</bean>
	<!-- Template : CPO3.0 triple network (CX) -->
	<bean id="jmsTemplate_LeadManagerInfoSyncByCX" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="jmsConnectionFactoryByCPO30" />
		<property name="defaultDestination" ref="LeadManagerInfoByCX" />
		<property name="pubSubDomain" value="true" />
		<property name="receiveTimeout" value="${jms.receiveTimeout}" />
		<property name="explicitQosEnabled" value="true"/>
		<property name="deliveryMode" value="2"/>
	</bean>

	<!-- ==================== A012 (receive) : new customer info ==================== -->
	<!-- Destination : CPO3.0 triple network -->
	<bean id="NewCustomer" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg value="XH.CPO.Queue.A012.NewCustomer"/>
	</bean>
	<!-- Listener container : CPO3.0 triple network -->
	<bean id="listenerContainer_NewCustomer"
		class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="jmsConnectionFactoryByCPO30" />
		<property name="receiveTimeout" value="${jms.receiveTimeout}"/>
		<property name="destination" ref="NewCustomer" />
		<property name="messageListener" ref="triple_NewCustomerListenerService" />
	</bean>

	<!-- ==================== A013 (receive) : customer first order ==================== -->
	<!-- Destination : CPO3.0 triple network -->
	<bean id="FirstOrder" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg value="XH.CPO.Queue.A013.FirstOrder"/>
	</bean>
	<!-- Listener container : CPO3.0 triple network -->
	<bean id="listenerContainer_FirstOrder"
		class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="jmsConnectionFactoryByCPO30" />
		<property name="receiveTimeout" value="${jms.receiveTimeout}"/>
		<property name="destination" ref="FirstOrder" />
		<property name="messageListener" ref="triple_FirstOrderListenerService" />
	</bean>

	<!-- ==================== A014 (send) : customer wealth-manager change ==================== -->
	<!-- Destination : CPO3.0 triple network (comma-separated list of four queue names) -->
	<bean id="ChangeCustomerEmp" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg value="XH.DJR.Queue.A014.ChangeCustomerEmp,XH.JX.Queue.A014.ChangeCustomerEmp,XH.CX.Queue.A014.ChangeCustomerEmp,XH.ZCJ.Queue.A014.ChangeCustomerEmp"/>
	</bean>
	<!-- Destination : MCRM system (topic) -->
	<bean id="ChangeCustomerEmpByMCRM" class="org.apache.activemq.command.ActiveMQTopic">
		<constructor-arg value="XH.Topic.A014.ChangeCustomerEmp"/>
	</bean>
	<!-- Template : CPO3.0 triple network -->
	<bean id="jmsTemplate_ChangeCustomerEmp" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="jmsConnectionFactoryByCPO30" />
		<property name="defaultDestination" ref="ChangeCustomerEmp" />
		<property name="pubSubDomain" value="true" />
		<property name="receiveTimeout" value="${jms.receiveTimeout}" />
		<property name="explicitQosEnabled" value="true"/>
		<property name="deliveryMode" value="2"/>
	</bean>
	<!-- Template : MCRM system -->
	<bean id="jmsTemplate_ChangeCustomerEmpByMCRM" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="jmsConnectionFactoryByMCRM" />
		<property name="defaultDestination" ref="ChangeCustomerEmpByMCRM" />
		<property name="pubSubDomain" value="true" />
		<property name="receiveTimeout" value="${jms.receiveTimeout.MCRM}" />
		<property name="explicitQosEnabled" value="true"/>
		<property name="deliveryMode" value="2"/>
	</bean>

	<!-- ==================== A015 (receive) : customer phone-number change ==================== -->
	<!-- Destination : CPO3.0 triple network -->
	<bean id="ChangePhone" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg value="XH.CPO.Queue.A015.ChangePhone"/>
	</bean>
	<!-- Listener container : CPO3.0 triple network -->
	<bean id="listenerContainer_ChangePhone"
		class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="jmsConnectionFactoryByCPO30" />
		<property name="receiveTimeout" value="${jms.receiveTimeout}"/>
		<property name="destination" ref="ChangePhone" />
		<property name="messageListener" ref="triple_ChangePhoneListenerService" />
	</bean>

	<!-- ==================== A016 (send) : user role info sync ==================== -->
	<!-- Destination : CPO3.0 triple network -->
	<bean id="RoleInfoSync" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg value="XH.CX.Queue.A016.RoleInfo"/>
	</bean>
	<!-- Template : CPO3.0 triple network -->
	<bean id="jmsTemplate_RoleInfoSync" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="jmsConnectionFactoryByCPO30" />
		<property name="defaultDestination" ref="RoleInfoSync" />
		<property name="pubSubDomain" value="true" />
		<property name="receiveTimeout" value="${jms.receiveTimeout}" />
		<property name="explicitQosEnabled" value="true"/>
		<property name="deliveryMode" value="2"/>
	</bean>

	<!-- ==================== A030 (CPO3.0 triple network, send / listen) : message receipt ==================== -->
	<!-- Destination (receive) : receipt queue of CPO -->
	<bean id="CallBack" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg value="XH.CPO.Queue.A030.CallBack"/>
	</bean>
	<!-- Destination (send) : receipt queue of DJR -->
	<bean id="DJR_CallBack" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg value="XH.DJR.Queue.A030.CallBack"/>
	</bean>
	<!-- Destination (send) : receipt queue of ZCJ -->
	<bean id="ZCJ_CallBack" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg value="XH.ZCJ.Queue.A030.CallBack"/>
	</bean>
	<!-- Destination (send) : receipt queue of JX -->
	<bean id="JX_CallBack" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg value="XH.JX.Queue.A030.CallBack"/>
	</bean>
	<!-- Destination (send) : receipt queue of CX -->
	<bean id="CX_CallBack" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg value="XH.CX.Queue.A030.CallBack"/>
	</bean>
	<!-- Listener container (receive) : receipt handling -->
	<bean id="listenerContainer_CallBack"
		class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="jmsConnectionFactoryByCPO30" />
		<property name="receiveTimeout" value="${jms.receiveTimeout}"/>
		<property name="destination" ref="CallBack" />
		<property name="messageListener" ref="triple_CallBackListenerService" />
	</bean>
</beans>

 上面的bean id="DJR_CallBack"  被注入到代码中:

    /** JMS destination for the DJR system, injected from the bean named {@code DJR_CallBack}. */
    @Autowired
    @Qualifier("DJR_CallBack")
    private Destination destinationByDJR;

下面代码的例子是接收MQ消息的:

package com.creditharmony.adapter.service.thirdparty;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.MessageProducer;import javax.jms.Session;import javax.jms.TextMessage;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.jms.listener.SessionAwareMessageListener;import org.springframework.stereotype.Service;import com.alibaba.fastjson.JSON;import com.creditharmony.adapter.constant.ClientPoint;import com.creditharmony.adapter.constant.ClientType;import com.creditharmony.adapter.constant.Constant;import com.creditharmony.adapter.constant.MsgKey;import com.creditharmony.adapter.core.client.ClientServicePoxy;import com.creditharmony.adapter.core.service.IParamRecord;import com.creditharmony.adapter.exception.AdapterException;import com.creditharmony.adapter.model.thirdparty.TripleChangePhoneInfoJsonModel;import com.creditharmony.adapter.service.triple.bean.TripleChangePhoneInBean;import com.creditharmony.adapter.service.triple.bean.TripleChangePhoneOutBean;import com.creditharmony.adapter.utils.AdapterUtils;import com.creditharmony.adapter.utils.Messages;import com.creditharmony.adapter.utils.TripleUtils;import com.creditharmony.common.util.PropertyUtil;/** * 客户手机号变更 接口.<br> * ---消息接收---<br> * 各系统中手机号变更后将手机变更信息(旧手机号、新手机号、理财经理)推送到CPO3.0<br> * 接收方式 : JMS Queue 消息接收方<br> * 发送通道:<br> *      大金X -> CPO3.0<br> *      X信     -> CPO3.0<br> *      CPO -> 创新<br> * */@Servicepublic class Triple_ChangePhoneListenerService implements SessionAwareMessageListener<TextMessage> {        /** log日志 */    private static final Logger logger = LoggerFactory.getLogger(Triple_ChangePhoneListenerService.class);    /** 业务处理名. */    private static String procName = PropertyUtil.getStrValue(MsgKey.LOG_PROP_FILE, MsgKey.TRIPLE_CHANGE_PHONE_TITLE, "三网合一(客户手机号变更)");    /** JMS Header : FromSys. 
*/    private static final String FROM_SYS = "XH_CPO";        /** 参数记录处理. */    @Autowired    protected IParamRecord paramRecord;        /** 消息目的地:大金X. */    @Autowired    @Qualifier("DJR_CallBack")    private Destination destinationByDJR;        /** 消息目的地:X信. */    @Autowired    @Qualifier("JX_CallBack")    private Destination destinationByJX;        /** 消息目的地:创新. */    @Autowired    @Qualifier("CX_CallBack")    private Destination destinationByCX;        /** 消息目的地:资本家. */    @Autowired    @Qualifier("ZCJ_CallBack")    private Destination destinationByZCJ;        /**     * 客户手机号变更 监听接口.     * @param msgParam 监听消息     */    public void onMessage(TextMessage mapMessage, Session session) {        logger.info(Messages.get(MsgKey.INFO_START, new String[] { procName }));        // 唯一编号        String serialNum = AdapterUtils.getSerialNum();        try {                        // 发送方ID            String fromSys = mapMessage.getStringProperty(Constant.TRIPLE_HEADER_FROM_SYS);            // 发送消息ID            String msgTypeID = mapMessage.getStringProperty(Constant.TRIPLE_HEADER_MSG_TYPE_ID);            // 发送消息名            String msgTypeName = mapMessage.getStringProperty(Constant.TRIPLE_HEADER_MSG_TYPE_NAME);            // 消息主体            String jsonMsg = mapMessage.getText();            logger.info(                    Messages.get(                            MsgKey.GENERALSERVICE_INPARAM,                            new String[] { procName, jsonMsg }));            paramRecord.doInParamRecord(                    serialNum,                    null,                    jsonMsg,                    "triple_ChangePhoneService",                    fromSys, msgTypeID, msgTypeName);            TripleChangePhoneInfoJsonModel jsonModel = JSON.parseObject(jsonMsg, TripleChangePhoneInfoJsonModel.class);            ClientServicePoxy service = new ClientServicePoxy(ClientType.Triple_ChangePhone, ClientPoint.CF);            TripleChangePhoneInBean param = new TripleChangePhoneInBean();      
                  param.setSerialNum(serialNum);            param.setOldPhone(jsonModel.getOldPhone());            param.setNewPhone(jsonModel.getNewPhone());            param.setEmpCode(jsonModel.getEmpCode());            param.setOsType(jsonModel.getOsType());            param.setCardId(jsonModel.getCardId());                        // 调用业务参数日志输出            logger.info(                    Messages.get(                            MsgKey.INFO_POST_MESSAGE_DEC,                            new String[] { procName, param.getParam() }));            paramRecord.doSendContentRecord(                    serialNum,                    param.getParam(),                    Messages.get(MsgKey.MEMO_PSOT_MESSAGE_BODY));                        // 业务调用            TripleChangePhoneOutBean outParam = (TripleChangePhoneOutBean) service.callService(param);                        // 业务参数回执日志输出            logger.info(Messages.get(MsgKey.INFO_RET_MESSAGE, new String[] { procName, outParam.getParam() }));            paramRecord.doReceiveContentRecord(serialNum, outParam.getParam(), null);                        /*             * 消息回馈处理             * 转为Json串             * 该监听部分收到消息后,判定发送方,将消息序列号反馈给发送发             */            String postJson = TripleUtils.getReturnMessage(outParam, jsonModel.getUniqueNum());                        MessageProducer producer = null;            // 根据消息发送源,设定回执方            if ("XH_DJR".equals(fromSys)) {                producer = session.createProducer(destinationByDJR);            } else if ("XH_ZCJ".equals(fromSys)) {                producer = session.createProducer(destinationByZCJ);            } else if ("XH_JX".equals(fromSys)) {                producer = session.createProducer(destinationByJX);            } else {                producer = session.createProducer(destinationByCX);            }            TextMessage message = session.createTextMessage();            message.setStringProperty("FromSys", FROM_SYS);            
message.setStringProperty("MsgTypeID", msgTypeID);            message.setStringProperty("MsgTypeName", msgTypeName);            message.setJMSTimestamp(System.currentTimeMillis());            message.setText(postJson);                        // 参数日志记录: 返回参数            paramRecord.doOutParamRecord(serialNum, postJson);            logger.info(Messages.get(MsgKey.GENERALSERVICE_OUTPARAM, new String[] { postJson }));                        producer.send(message);                    } catch (JMSException e) {            // 异常错误记录            AdapterException businessException = new AdapterException(e);            paramRecord.doExceptionRecord(serialNum, businessException);        }                logger.info(Messages.get(MsgKey.INFO_END, new String[] { procName }));    }}

 重点:onMessage 方法;它先从消息头中读取三个属性:发送方(FromSys)、消息类型ID(MsgTypeID)和消息类型名(MsgTypeName),再读取消息正文。

下面是发送消息的方法:

  /**     * 消息发送(Queue).     * @param jsonParam json字符串     */    public void sendMqMessage(final String jsonParam) {        // 取得消息        Destination destination = jmsTemplate_OrgInfoSync.getDefaultDestination();        // 发送消息        jmsTemplate_OrgInfoSync.send(destination, new MessageCreator() {            public Message createMessage(Session session) throws JMSException {                TextMessage message = session.createTextMessage();                message.setStringProperty("FromSys", FROM_SYS);                message.setStringProperty("MsgTypeID", MSG_TYPE_ID);                message.setStringProperty("MsgTypeName", MSG_TYPE_NAME);                                message.setText(jsonParam);                return message;            }        });    }

 

@Servicepublic class Triple_OrgInfoSyncService extends BaseService implements IBaseService {    /** log日志 */    private static final Logger logger = LoggerFactory.getLogger(Triple_OrgInfoSyncService.class);        /** JMS Header : FromSys. */    private static final String FROM_SYS = "XH_CPO";        /** JMS Header : MsgTypeID. */    private static final String MSG_TYPE_ID = "A010";        /** JMS Header : MsgTypeName. */    private static final String MSG_TYPE_NAME = "OrgInfo";        /** 业务处理名. */    private static String procName = PropertyUtil.getStrValue(MsgKey.LOG_PROP_FILE, MsgKey.TRIPLE_ORG_INFO_SYNC_TITLE, "三网合一(组织机构信息同步)");    /** 消息模板(CPO3.0三网). */    @Autowired    private JmsTemplate jmsTemplate_OrgInfoSync;        /**     * 组织机构信息同步.     * @param paramInfo 组织机构信息     * @return 同步状态     */    public BaseOutInfo doExec(BaseInfo paramInfo) {        logger.info(Messages.get(MsgKey.INFO_START, new String[] { procName }));        TripleOrgInfoSyncOutInfo outParam = new TripleOrgInfoSyncOutInfo();        TripleOrgInfoSyncInfo param = null;        param = (TripleOrgInfoSyncInfo) paramInfo;                try {            TripleOrgInfoSyncInfoJsonModel jsonModel = new TripleOrgInfoSyncInfoJsonModel();                        BeanUtils.copyProperties(param, jsonModel);            jsonModel.setOperation(param.getOperation().getName());            jsonModel.setStatus(param.getStatus().getName());            jsonModel.setLastModifyTime(DateUtils.formatDate(param.getLastModifyTime(), TripleOrgInfoSyncInfo.DATE_PATTERN));            // 将传入参数Bean转为Json串            String postJson = JSON.toJSONString(jsonModel);            logger.info(                    Messages.get(                            MsgKey.INFO_POST_MESSAGE_BODY,                            new String[] { procName, postJson }));            paramRecord.doSendContentRecord(                    paramInfo.getSerialNum(),                    postJson,                    Messages.get(MsgKey.MEMO_PSOT_MESSAGE_BODY));  
          try {                // 发送                this.sendMqMessage(postJson);            } catch (Exception e) {                throw new AdapterException(ErrorType.ERROR_EXT_NET, e);            }                    } catch (Exception e) {            logger.error(                    Messages.get(                            MsgKey.ERROR_SYSTEM, new String[] { paramInfo.getSerialNum(), procName }));            // 将新产生的例外封装            if (e instanceof AdapterException) {                throw (AdapterException) e;            } else {                throw new AdapterException(e);            }        }        outParam.setRetCode(ReturnConstant.SUCCESS);        logger.info(Messages.get(MsgKey.INFO_END, new String[] { procName }));        return outParam;    }        /**     * 消息发送(Queue).     * @param jsonParam json字符串     */    public void sendMqMessage(final String jsonParam) {        // 取得消息        Destination destination = jmsTemplate_OrgInfoSync.getDefaultDestination();        // 发送消息        jmsTemplate_OrgInfoSync.send(destination, new MessageCreator() {            public Message createMessage(Session session) throws JMSException {                TextMessage message = session.createTextMessage();                message.setStringProperty("FromSys", FROM_SYS);                message.setStringProperty("MsgTypeID", MSG_TYPE_ID);                message.setStringProperty("MsgTypeName", MSG_TYPE_NAME);                                message.setText(jsonParam);                return message;            }        });    }}

 比较简单。

 

JMS,java消息服务之mQ(二)