首页 > 代码库 > ofbiz jms activemq

ofbiz jms activemq

最近在研究ofbiz jms,用了activemq玩了一下,ofbiz使用JNDI与其它JMS进行消息接收与发送,总结一下。

准备

ofbiz12.04 版本

activemq 5.5 版本


jar包

在base/lib下引入包

activemq-all-5.5.0.jar

geronimo-j2ee-management_1.1_spec-1.0.1.jar


服务引擎配置

<span style="font-size:14px;"><jms-service name="serviceMessenger" send-mode="all">
            <server jndi-server-name="default"
                    jndi-name="topicConnectionFactory"
                    topic-queue="OFBTopic"
                    type="topic"
                    listen="true"/>
        </jms-service></span>

jndi.properties配置

<span style="font-size:14px;">java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://127.0.0.1:61616
topic.OFBTopic=OFBTopic
connectionFactoryNames=connectionFactory, queueConnectionFactory, topicConnectionFactory</span>

只试一个topic消息模式,如果想用queue队列,配置

<span style="font-size:14px;">queue.OFBQueue= OFBQueue</span>

测试

先启动activemq ,再启动ofbiz

ofbiz启动之后看到:

<span style="font-size:14px;">2015-01-15 11:47:56,970 (org.ofbiz.service.jms.JmsListenerFactory@1b1ce9a) [ JmsListenerFactory.java:89 :INFO ] JMS Listener Factory Thread Finished; All listeners connected.</span>
如果报监听失败,自己查查什么问题。

之后再看activemq控制台

技术分享


消息发送和接收

咱们是用ofbiz给出的测试例子:

services_test.xml

<span style="font-size:14px;"><service name="testJMSTopic" engine="jms" location="serviceMessenger" invoke="testScv">
        <description>Test JMS Topic service</description>
        <attribute name="message" type="String" mode="IN"/>
    </service></span>

运行此服务,看下效果:

<span style="font-size:14px;">2015-01-15 12:09:10,754 (ActiveMQ Session Task-1) [       ModelService.java:469:INFO ] Set default value [999.9999] for parameter [defaultValue]
2015-01-15 12:09:10,755 (ActiveMQ Session Task-1) [     ServiceEcaRule.java:137:INFO ] For Service ECA [testScv] on [invoke] got false for condition: [message][equals][auto][true][String]
---- SVC-CONTEXT: message => 测试TOPIC消息
---- SVC-CONTEXT: locale => zh_CN
---- SVC-CONTEXT: timeZone => sun.util.calendar.ZoneInfo[id="Asia/Shanghai",offset=28800000,dstSavings=0,useDaylight=false,transitions=19,lastRule=null]
---- SVC-CONTEXT: userLogin => [GenericEntity:UserLogin][createdStamp,2014-07-17 20:36:06.0(java.sql.Timestamp)][createdTxStamp,2014-07-17 20:36:06.0(java.sql.Timestamp)][currentPassword,{SHA}47ca69ebb4bdc9ae0adec130880165d2cc05db1a(java.lang.String)][enabled,Y(java.lang.String)][hasLoggedOut,N(java.lang.String)][lastTimeZone,Asia/Macao(java.lang.String)][lastUpdatedStamp,2014-11-26 16:12:57.0(java.sql.Timestamp)][lastUpdatedTxStamp,2014-11-26 16:12:57.0(java.sql.Timestamp)][partyId,admin(java.lang.String)][successiveFailedLogins,0(java.lang.Long)][userLoginId,admin(java.lang.String)]
---- SVC-CONTEXT: defaultValue =http://www.mamicode.com/> 999.9999>看下activemq控制台

技术分享


入列消息一条,接收消息一条。

说明一下为什么会这样

我们运行服务发送了一条广播消息,这就是为什么Messages Enqueued有一条数据,那么为什么Messages Dequeued也有一条消息,因为ofbiz是服务端同时也是客户端,我们设置了监听不是。。


ofbiz作为服务端

我们只要看一下JMS的服务引擎就明白了,不多解释


ofbiz作为客户端

这块是怎么玩呢?

首先我们在服务引擎配置文件设置了监听为true,

其次,我们看下在加载服务引擎之后,在根据服务引擎配置文件加载JmsListenerFactory.java去加载JMS的监听。

最后,贴出来一段代码

JmsTopicListener.java

<span style="font-size:14px;"> public synchronized void load() throws GenericServiceException {
        try {
            InitialContext jndi = JNDIContextFactory.getInitialContext(jndiServer);
            TopicConnectionFactory factory = (TopicConnectionFactory) jndi.lookup(jndiName);

            if (factory != null) {
                con = factory.createTopicConnection(userName, password);
                con.setExceptionListener(this);
                session = con.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
                topic = (Topic) jndi.lookup(topicName);
                if (topic != null) {
                    TopicSubscriber subscriber = session.createSubscriber(topic);
                    subscriber.setMessageListener(this);
                    con.start();
                    this.setConnected(true);
                    if (Debug.infoOn()) Debug.logInfo("Listening to topic [" + topicName + "] on [" + jndiServer + "]...", module);
                } else {
                    throw new GenericServiceException("Topic lookup failed.");
                }
            } else {
                throw new GenericServiceException("Factory (broker) lookup failed.");
            }
        } catch (NamingException ne) {
            throw new GenericServiceException("JNDI lookup problems; listener not running.", ne);
        } catch (JMSException je) {
            throw new GenericServiceException("JMS internal error; listener not running.", je);
        } catch (GeneralException ge) {
            throw new GenericServiceException("Problems with InitialContext; listener not running.", ge);
        }
    }</span>


当时,还有一个疑问,那有监听之后ofbiz做了什么事情去处理监听到消息事件之后的事情?

同样,贴出一段代码,大家就都明白了:

AbstractJmsListener.java

<span style="font-size:14px;">/**
     * Receives the MapMessage and processes the service.
     * @see javax.jms.MessageListener#onMessage(Message)
     */
    public void onMessage(Message message) {
        MapMessage mapMessage = null;

        if (Debug.verboseOn()) Debug.logVerbose("JMS Message Received --> " + message, module);

        if (message instanceof MapMessage) {
            mapMessage = (MapMessage) message;
        } else {
            Debug.logError("Received message is not a MapMessage!", module);
            return;
        }
        runService(mapMessage);
    }</span>

<span style="font-size:14px;">/**
     * Runs the service defined in the MapMessage
     * @param message
     * @return Map
     */
    protected Map<String, Object> runService(MapMessage message) {
        Map<String, ? extends Object> context = null;
        String serviceName = null;
        String xmlContext = null;

        try {
            serviceName = message.getString("serviceName");
            xmlContext = message.getString("serviceContext");
            if (serviceName == null || xmlContext == null) {
                Debug.logError("Message received is not an OFB service message. Ignored!", module);
                return null;
            }

            Object o = XmlSerializer.deserialize(xmlContext, dispatcher.getDelegator());

            if (Debug.verboseOn()) Debug.logVerbose("De-Serialized Context --> " + o, module);
            if (ObjectType.instanceOf(o, "java.util.Map"))
                context = UtilGenerics.checkMap(o);
        } catch (JMSException je) {
            Debug.logError(je, "Problems reading message.", module);
        } catch (Exception e) {
            Debug.logError(e, "Problems deserializing the service context.", module);
        }

        try {
            ModelService model = dispatcher.getDispatchContext().getModelService(serviceName);
            if (!model.export) {
                Debug.logWarning("Attempt to invoke a non-exported service: " + serviceName, module);
                return null;
            }
        } catch (GenericServiceException e) {
            Debug.logError(e, "Unable to get ModelService for service : " + serviceName, module);
        }

        if (Debug.verboseOn()) Debug.logVerbose("Running service: " + serviceName, module);

        Map<String, Object> result = null;
        if (context != null) {
            try {
                result = dispatcher.runSync(serviceName, context);
            } catch (GenericServiceException gse) {
                Debug.logError(gse, "Problems with service invocation.", module);
            }
        }
        return result;
    }</span>


注意

可以同时配置队列和广播两种模式消息,配置同上并参考ACTIVE MQ官网对JNDI支持


参考

ofbiz  jms:https://cwiki.apache.org/confluence/display/OFBIZ/Distributed+Entity+Cache+Clear+(DCC)+Mechanism

activemq jndi:http://activemq.apache.org/jndi-support.html



ofbiz jms activemq