首页 > 代码库 > JMS异步消息解决分布式应用的EhCache缓存同步问题
JMS异步消息解决分布式应用的EhCache缓存同步问题
上篇博客中讲到了怎样用拦截器给用EJB发布的WebService添加缓存,这样可以提高WebService的响应效率。可是即使是这样做,还是要经历网络的传输的。于是决定在调用WebService的程序本地也添加EJB方法缓存,如果WebService调用的结果已经存在于本地缓存中,就直接从内存中拿数据,不用再访问WebService了。
架构图如下所示
但是另一个问题又出现了,那就是WebService中的缓存和客户程序本地缓存的同步问题,这个问题可以具体描述如下:
当提供WebService的程序的数据库中的数据发生改变后(程序执行了增删改方法后),就需要将WebService的缓存清空,因为那些是脏数据。可是调用WebService的客户程序本地的缓存却没有清空。
怎样解决这个问题呢?怎样才能清空WebService缓存的同时也清空调用客户端本地的缓存呢?利用JMS的消息机制就可以解决这一问题
具体思路
在WebService服务端创建一个JMS Topic,起名CacheTopic
当服务端执行增删改方法后,向CacheTopic中发一条消息
客户程序在自己的服务器中部署Message Driven Bean,监听CacheTopic中的消息,收到消息后清空本地缓存
架构图如下所示
项目中使用的AS都是JBoss,在JBoss中添加JMS Topic的方法是在deploy目录下部署一个Destination描述文件,文件名符合*-service.xml。
本项目中使用的CacheTopic的部署文件内容如下
<?xml version="1.0" encoding="UTF-8"?> <server> <!--使用jboss messaging定义topic--> <mbean code="org.jboss.jms.server.destination.TopicService" name="jboss.messaging.destination:service=Topic,name=CacheTopic" xmbean-dd="xmdesc/Topic-xmbean.xml"> <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends> <depends>jboss.messaging:service=PostOffice</depends> <attribute name="JNDIName" >topic/CacheTopic</attribute> </mbean> </server>
服务端程序在执行增删改方法后,不仅要清除WebService中的缓存,还要向CacheTopic中发送消息
上篇博客中的拦截器修改如下(主要是添加了发送消息的功能):
public class CacheClearSyncInterceptor { @AroundInvoke public Object clearCache(InvocationContext context) throws Exception{ //执行目标方法 Object returnObj =context.proceed(); /**************************清空本地缓存 begin**************************************/ System.out.println("清空前的缓存数:"+CacheHandler.getInstance().getCache().getSize()); //清空本地缓存 CacheHandler.getInstance().clearCache(); System.out.println("清空后的缓存数:"+CacheHandler.getInstance().getCache().getSize()); /**************************清空本地缓存 end**************************************/ //发送消息到CacheTopic,实现缓存同步 StringBuilder txtMsgBuilder = new StringBuilder(); txtMsgBuilder.append("【gxpt-jc】系统执行了【") .append(context.getTarget().getClass().getName()) .append(".") .append(context.getMethod().getName()) .append("】") .append("方法,需要同步缓存"); MessageSender.send(txtMsgBuilder.toString(), DestinationType.TOPIC,"topic/CacheTopic","192.168.24.48:1199"); return returnObj; } }
上面用到的消息发送者类MessageSender的代码如下
public class MessageSender { /** * @MethodName : send * @Description : 发送消息 * @param msg 消息 * @param type 目的地类型:TOPIC或QUEUE * @param destinationJndi 目的地的jndi名称 * @param url 目的地url */ public static void send(String msg,DestinationType type,String destinationJndi,String url) throws Exception{ //定义连接对象和session TopicConnection topicConnection=null; TopicSession topicSession = null; QueueConnection queueConnection=null; QueueSession queueSession = null; try { //创建context Properties props = new Properties(); props.setProperty("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory"); props.setProperty("java.naming.provider.url", url); Context ctx = new InitialContext(props); /************************************发消息给TOPIC begin******************************************************/ if(type==DestinationType.TOPIC){ TopicConnectionFactory topicFactory=(TopicConnectionFactory)ctx.lookup("ConnectionFactory"); //获取Connection topicConnection=topicFactory.createTopicConnection(); //获取Session topicSession=topicConnection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE); //获取destination Topic topic=(Topic)ctx.lookup(destinationJndi); //创建消息发送者 TopicPublisher publisher=topicSession.createPublisher(topic); //创建消息 TextMessage txtMsg = topicSession.createTextMessage(msg); //发送消息 publisher.publish(txtMsg); } /************************************发消息给TOPIC end******************************************************/ /************************************发消息给QUEUE begin******************************************************/ if(type==DestinationType.QUEUE){ QueueConnectionFactory queueFactory=(QueueConnectionFactory)ctx.lookup("ConnectionFactory"); //获取Connection queueConnection=queueFactory.createQueueConnection(); //获取Session queueSession=queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE); //获取destination Queue queue=(Queue)ctx.lookup(destinationJndi); //创建消息发送者 QueueSender sender=queueSession.createSender(queue); //创建消息 TextMessage txtMsg = queueSession.createTextMessage(msg); //发送消息 sender.send(txtMsg); } /************************************发消息给QUEUE end******************************************************/ } finally{ //关闭对象 if(topicConnection!=null && topicSession!=null){ topicSession.close(); topicConnection.close(); } if(queueConnection!=null && queueSession!=null){ queueSession.close(); queueConnection.close(); } } } }
客户端接收消息的MDB的代码如下
@MessageDriven( activationConfig={ @ActivationConfigProperty(propertyName="destinationType",propertyValue=http://www.mamicode.com/"javax.jms.Topic"),>因为在JBoss5.1.0中部署的MDB默认只能监听本地Destination中的消息,为了让MDB可以监听远程Destination中的消息,客户端仍需再部署一个RemoteJMSProvider描述文件,文件名同样需符合*-service.xml。文件内容如下
<?xml version="1.0" encoding="UTF-8"?> <server> <mbean code="org.jboss.jms.jndi.JMSProviderLoader" name="jboss.messaging:service=JMSProviderLoader,name=RemoteJMSProvider"> <attribute name="ProviderName">RemoteJMSProvider</attribute> <attribute name="ProviderAdapterClass"> org.jboss.jms.jndi.JNDIProviderAdapter </attribute> <!-- The combined connection factory --> <attribute name="FactoryRef">XAConnectionFactory</attribute> <!-- The queue connection factory --> <attribute name="QueueFactoryRef">XAConnectionFactory</attribute> <!-- The topic factory --> <attribute name="TopicFactoryRef">XAConnectionFactory</attribute> <!-- Uncomment to use HAJNDI to access JMS--> <attribute name="Properties"> java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces java.naming.provider.url=192.168.24.48:1199 </attribute> </mbean> </server>
这样就实现了分布式应用中的缓存同步