首页 > 代码库 > JMS异步消息解决分布式应用的EhCache缓存同步问题

JMS异步消息解决分布式应用的EhCache缓存同步问题

上篇博客中讲到了怎样用拦截器给用EJB发布的WebService添加缓存,这样可以提高WebService的响应效率。可是即使是这样做,还是要经历网络的传输的。于是决定在调用WebService的程序本地也添加EJB方法缓存,如果WebService调用的结果已经存在于本地缓存中,就直接从内存中拿数据,不用再访问WebService了。

 

架构图如下所示



但是另一个问题又出现了,那就是WebService中的缓存和客户程序本地缓存的同步问题,这个问题可以具体描述如下:


当提供WebService的程序的数据库中的数据发生改变后(程序执行了增删改方法后),就需要将WebService的缓存清空,因为那些是脏数据。可是调用WebService的客户程序本地的缓存却没有清空。

 

怎样解决这个问题呢?怎样才能清空WebService缓存的同时也清空调用客户端本地的缓存呢?利用JMS的消息机制就可以解决这一问题

 

具体思路

WebService服务端创建一个JMS Topic,起名CacheTopic

当服务端执行增删改方法后,向CacheTopic中发一条消息

客户程序在自己的服务器中部署Message Driven Bean,监听CacheTopic中的消息,收到消息后清空本地缓存

 

 

架构图如下所示



项目中使用的AS都是JBoss,在JBoss中添加JMS Topic的方法是在deploy目录下部署一个Destination描述文件,文件名符合*-service.xml

 

本项目中使用的CacheTopic的部署文件内容如下

<?xml version="1.0" encoding="UTF-8"?>

<server>
   
   <!--使用jboss messaging定义topic-->
   <mbean code="org.jboss.jms.server.destination.TopicService"
      name="jboss.messaging.destination:service=Topic,name=CacheTopic"
      xmbean-dd="xmdesc/Topic-xmbean.xml">
      <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
      <depends>jboss.messaging:service=PostOffice</depends>
	  <attribute name="JNDIName" >topic/CacheTopic</attribute>
   </mbean>     
   
</server>

服务端程序在执行增删改方法后,不仅要清除WebService中的缓存,还要向CacheTopic中发送消息

上篇博客中的拦截器修改如下(主要是添加了发送消息的功能):


public class CacheClearSyncInterceptor {
	@AroundInvoke
	public Object clearCache(InvocationContext context) throws Exception{
		
		//执行目标方法
		Object returnObj =context.proceed();
		
		/**************************清空本地缓存  begin**************************************/
		System.out.println("清空前的缓存数:"+CacheHandler.getInstance().getCache().getSize());
		//清空本地缓存
		CacheHandler.getInstance().clearCache();
		System.out.println("清空后的缓存数:"+CacheHandler.getInstance().getCache().getSize());
		/**************************清空本地缓存  end**************************************/
		
		//发送消息到CacheTopic,实现缓存同步
		StringBuilder txtMsgBuilder = new StringBuilder();
		txtMsgBuilder.append("【gxpt-jc】系统执行了【")
								.append(context.getTarget().getClass().getName())
								.append(".")
								.append(context.getMethod().getName())
								.append("】")
								.append("方法,需要同步缓存");
		MessageSender.send(txtMsgBuilder.toString(), DestinationType.TOPIC,"topic/CacheTopic","192.168.24.48:1199");
		
		return returnObj;
	}

}

上面用到的消息发送者类MessageSender的代码如下

public class MessageSender {
	
	/**
	 * @MethodName	: send
	 * @Description	: 发送消息
	 * @param msg	消息
	 * @param type	目的地类型:TOPIC或QUEUE
	 * @param destinationJndi	目的地的jndi名称
	 * @param url	目的地url
	 */
	public static void send(String msg,DestinationType type,String destinationJndi,String url) throws Exception{
		
		//定义连接对象和session
		TopicConnection topicConnection=null;
		TopicSession topicSession = null;
		QueueConnection queueConnection=null;
		QueueSession queueSession = null;
		
		try {
			//创建context
			Properties props = new Properties();
			props.setProperty("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory");
			props.setProperty("java.naming.provider.url", url);
			Context ctx = new InitialContext(props);
			
			
			/************************************发消息给TOPIC  begin******************************************************/
			if(type==DestinationType.TOPIC){
				TopicConnectionFactory topicFactory=(TopicConnectionFactory)ctx.lookup("ConnectionFactory");
				
				//获取Connection
				topicConnection=topicFactory.createTopicConnection();
				
				//获取Session
				topicSession=topicConnection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
				
				//获取destination
				Topic topic=(Topic)ctx.lookup(destinationJndi);
				
				//创建消息发送者
				TopicPublisher publisher=topicSession.createPublisher(topic);
				
				//创建消息
				TextMessage txtMsg = topicSession.createTextMessage(msg);
				//发送消息
				publisher.publish(txtMsg);
				
			}
			/************************************发消息给TOPIC  end******************************************************/
			
			
			/************************************发消息给QUEUE  begin******************************************************/
			if(type==DestinationType.QUEUE){
				QueueConnectionFactory queueFactory=(QueueConnectionFactory)ctx.lookup("ConnectionFactory");
				
				//获取Connection
				queueConnection=queueFactory.createQueueConnection();
				
				//获取Session
				queueSession=queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
				
				//获取destination
				Queue queue=(Queue)ctx.lookup(destinationJndi);
				
				//创建消息发送者
				QueueSender sender=queueSession.createSender(queue);
				
				//创建消息
				TextMessage txtMsg = queueSession.createTextMessage(msg);
				//发送消息
				sender.send(txtMsg);
			}
			/************************************发消息给QUEUE  end******************************************************/
		
		} finally{
			//关闭对象
			if(topicConnection!=null && topicSession!=null){
				topicSession.close();
				topicConnection.close();
			}
			
			if(queueConnection!=null && queueSession!=null){
				queueSession.close();
				queueConnection.close();
			}
		}
		
	}

}

客户端接收消息的MDB的代码如下

@MessageDriven(
		activationConfig={
				@ActivationConfigProperty(propertyName="destinationType",propertyValue=http://www.mamicode.com/"javax.jms.Topic"),>

因为在JBoss5.1.0中部署的MDB默认只能监听本地Destination中的消息,为了让MDB可以监听远程Destination中的消息,客户端仍需再部署一个RemoteJMSProvider描述文件,文件名同样需符合*-service.xml。文件内容如下


<?xml version="1.0" encoding="UTF-8"?> 
<server>
	<mbean code="org.jboss.jms.jndi.JMSProviderLoader" name="jboss.messaging:service=JMSProviderLoader,name=RemoteJMSProvider">
		<attribute name="ProviderName">RemoteJMSProvider</attribute>
		<attribute name="ProviderAdapterClass">
		  org.jboss.jms.jndi.JNDIProviderAdapter
		</attribute>
		<!-- The combined connection factory -->
		<attribute name="FactoryRef">XAConnectionFactory</attribute>
		<!-- The queue connection factory -->
		<attribute name="QueueFactoryRef">XAConnectionFactory</attribute>
		<!-- The topic factory -->
		<attribute name="TopicFactoryRef">XAConnectionFactory</attribute>
		<!-- Uncomment to use HAJNDI to access JMS-->
		<attribute name="Properties">
		   java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
		   java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces
		   java.naming.provider.url=192.168.24.48:1199
		</attribute>
	</mbean>

</server> 

这样就实现了分布式应用中的缓存同步