首页 > 代码库 > 使用 Apache MINA2 实现 Web 系统的消息中间件

使用 Apache MINA2 实现 Web 系统的消息中间件

本文将介绍如何使用 Apache MINA2(以下简称 MINA2)解决复杂 Web 系统内各子系统之间同步消息中间件的问题。MINA2 为开发高性能和高可用性的网络应用程序提供了非常便利的框架。从本文中可以了解 MINA2 的基本原理和主要功能,此外在本文中您还可以看到 MINA2 实现消息中间件的服务端和客户端程序的详细内容。

苏 梦, 软件工程师, IBM

尹 文清, Java 开发工程师, 百度在线

2011 年 8 月 25 日

  • +内容

项目背景介绍

系统发展遇到的瓶颈问题

目前主流网站都是由开源软件构建的。使用 Nginx 做为 Web 服务器,Tomcat/Resin 做 App 容器,Memcached 做通用 Cache,MySQL 做数据库,使用 Linux 操作系统。网站系统刚上线初期,用户数并不多,所有的模块都整合一个系统中,所有业务由一个应用提供,此时采取将全部的逻辑都放在一个应用的方式利于系统的维护和管理。但是,随着网站用户的不断增加,系统的访问压力越来越大,为了满足越来越多用户的需求,原有的系统需要增加新的功能进来,随着系统功能模块的增多,系统就会变得越来越难以维护和扩展,同时系统伸缩性和可用性也会受到影响。例如一个网站初期只有用户服务功能,随着网站发展,可能会需要用户信息中心、充值支付中心、商户服务中心等越来越多的子系统,如果把这些子系统都整合在原有的系统中,整个网站将会变得非常复杂,并且难以维护。另外,由于所有子系统都整合在一起,只要有一个模块出问题,那么所有的功能都会受影响,造成非常严重的后果。所以系统发展遇到的瓶颈就是随着系统的发展,如果所有模块都整合在一起,系统的可伸缩性和扩展性将受到影响。

如何解决系统发展遇到的瓶颈问题

遇到以上瓶颈该如何解决呢?明智的办法就是系统拆分,将系统根据一定的标准,比如业务相关性等拆分为不同的子系统, 不同的子系统负责不同的业务功。拆分完成后,每个子系统单独进行扩展和维护,不会影响其他子系统,从而大大提高整个网站系统的扩展性和可维护性,同时系统的水平伸缩性也大大提升了。对于压力比较大的子系统可以再进行扩展而不影响其他子系统,如果某个子系统出现问题也不会影响其他服务。从而增强了整个网站系统的健壮性,更有利于保障核心业务。因此一个大型的互联网应用,肯定是要经过系统拆分的,因为只有进行拆分,系统的扩展性、维护性、伸缩性、可用性才会变得更好。但是拆分也会给系统带来问题,就是子系统之间如何通信。本文介绍 MINA2 就是用来充当消息中间件解决各子系统之间的通信问题

 

MINA2 的原理及主要功能

MINA2 简介

MINA2 是一个网络通信应用框架,它主要用于基于 TCP/IP、UDP/IP 协议栈的通信框架,也可以提供 Java 对象的序列化服务、虚拟机管道通信服务等。MINA2 可以帮助我们快速开发高性能、高扩展性的网络通信应用。MINA2 提供了事件驱动、异步(MINA2 的异步 IO 默认使用的是 Java NIO 作为底层支持)操作的编程模型。

MINA2 同时提供了网络通信的 Server 端、Client 端的封装,无论是哪端,MINA2 在整个网络通信结构中都处于如下的位置:

图 1.MINA2 在网络通信中的作用图
图 1.MINA2 在网络通信中的作用图

可见 MINA2 的 API 将真正的网络通信与我们的应用程序隔离开来,你只需要关心你要发送、接收的数据以及你的业务逻辑即可。同样的,无论是哪端,MINA2 的执行流程如下所示:

图 2.MINA2 执行流程图
图 2.MINA2 执行流程图
 

MINA2 通信原理

异步 IO 模型

异步 I/O 模型大体上可以分为两种,反应式 (Reactive) 模型和前摄式 (Proactive) 模型:

1. 传统的 select / epoll / kqueue 模型,以及 Java NIO 模型,都是典型的反应式模型,即应用代码对 I/O 描述符进行注册,然后等待 I/O 事件。当某个或某些 I/O 描述符所对应的 I/O 设备上产生 I/O 事件(可读、可写、异常等)时,系统将发出通知,于是应用便有机会进行 I/O 操作并避免阻塞。由于在反应式模型中应用代码需要根据相应的事件类型采取不同的动作,最常见的结构便是嵌套的 if {...} else {...} 或 switch ,并常常需要结合状态机来完成复杂的逻辑。

2. 前摄式模型则不同。在前摄式模型中,应用代码主动地投递异步操作而不管 I/O 设备当前是否可读或可写。投递的异步 I/O 操作被系统接管,应用代码也并不阻塞在该操作上,而是指定一个回调函数并继续自己的应用逻辑。当该异步操作完成时,系统将发起通知并调用应用代码指定的回调函数。在前摄式模型中,程序逻辑由各个回调函数串联起来:异步操作 A 的回调发起异步操作 B ,B 的回调再发起异步操作 C ,以此往复。 MINA2 便是一个前摄式的异步 I/O 框架。

MINA2 基本概念

I/O服务:I/O 服务用来执行实际的 I/O 操作。MINA2 已经提供了一系列支持不同协议的 I/O 服务,如 TCP/IP、UDP/IP、串口和虚拟机内部的管道等。开发人员也可以实现自己的 I/O 服务。由于 I/O 服务执行的是输入和输出两种操作,实际上有两种具体的子类型。一种称为“I/O 接受器(I/O acceptor)”,用来接受连接,一般用在服务器的实现中;另外一种称为“I/O 连接器(I/O connector)”,用来发起连接,一般用在客户端的实现中。对应在 MINA2 中的实现,org.apache.mina.core.service.IoService 是 I/O 服务的接口,而继承自它的接口 org.apache.mina.core.service.IoAcceptor 和 org.apache.mina.core.service.IoConnector 则分别表示 I/O 接受器和 I/O 连接器。

I/O 接受器I/O 接受器用来接受连接,与对等体(客户端)进行通讯,并发出相应的 I/O 事件交给 I/O 处理器来处理。使用 I/O 接受器的时候,只需要调用 bind 方法并指定要监听的套接字地址。当不再接受连接的时候,调用 unbind 停止监听即可。

I/O 连接器I/O 连接器用来发起连接,与对等体(服务器)进行通讯,并发出相应的 I/O 事件交给 I/O 处理器来处理。使用 I/O 连接器的时候,只需要调用 connect 方法连接指定的套接字地址。另外可以通过 setConnectTimeoutMillis 设置连接超时时间(毫秒数)。

I/O 会话I/O 会话表示一个活动的网络连接,与所使用的传输方式无关。I/O 会话可以用来存储用户自定义的与应用相关的属性。这些属性通常用来保存应用的状态信息,还可以用来在 I/O 过滤器和 I/O 处理器之间交换数据。I/O 会话在作用上类似于 Servlet 规范中的 HTTP 会话。

I/O过滤器:I/O 服务能够传输的是字节流,而上层应用需要的是特定的对象与数据结构。I/O 过滤器用来完成这两者之间的转换。I/O 过滤器的另外一个重要作用是对输入输出的数据进行处理,满足横切的需求。多个 I/O 过滤器串联起来,形成 I/O 过滤器链。每个过滤器都可对通过的数据进行任意的操作,包括增加、删除、更新、类型转换等。先装上的过滤器更靠近远程端点 ( 客户端),后装上的更靠近本地端点 ( 服务器)。

I/O处理器I/O 事件通过过滤器链之后会到达 I/O 处理器。I/O 处理器中与 I/O 事件对应的方法会被调用。MINA2 中 org.apache.mina.core.service.IoHandler 是 I/O 处理器要实现的接口,一般情况下,只需要继承自 org.apache.mina.core.service.IoHandlerAdapter 并覆写所需方法即可。

MINA2 就是用来充当消息中间件解决各子系统之间通信的问题。在每个子系统增加 MINA2 的客户端和服务端,负责接收和发送 Mina 消息,调用其他子系统的业务功能和数据。

 

MINA2 实现消息中间件

MINA2 在系统功能拆分中的作用

基于 MINA2 消息中间件的系统架构如下所示

图 3. 系统架构
图 3. 系统架构

以某业务运营平台拆分后的系统架构为例,整个平台包含三个子系统:业务运营子系统,负责提供用户服务;用户社区子系统是类似于 SNS 用户交互平台;用户子系统是用户账户、个人信息的子系统,是整个平台的公共基础系统。整个平台的最顶层是 web 服务器层,包含数台 Nginx 服务器(根据业务流量确定)。web 服务器层负责把不同的请求分发到对应的子系统,平台服务相关请求分发到业务运营子系统,用户社区动态信息相关请求分发到用户社区子系统,用户个人账户相关相关请求分发到用户子系统。三个子系统的 APP 服务器都是由一定数量的 Tomcat 服务器组成,一般情况下,运行 Spring+Struts2+Hibernate 程序的 tomcat 服务器能够支持 130-150 个并发请求。APP 服务器把大量数据缓存在 Memcache 服务器。另外通过 DB-Proxy 实现主从数据库分离和集群。每个子系统都有一个消息中间件层,即 MINA 服务器,通过 DB-Proxy 与本子系统的数据库进行交互,消息中间件层都包括 MINA 客户端和 MINA 服务端用于接收和发送 MINA 消息。

图 4. 某业务运营平台拆分后的系统架构图
图 4. 某业务运营平台拆分后的系统架构图

MINA2 实现服务器端程序开发

  1. 建立 IOListener 类继承 IOHandlerAdapter。IoHandlerAdapter 类实现了 IoHandler 接口要求的方法,但是都没有任何业务逻辑处理。如果要编写 Handler 时,可以扩展 IoHandlerAdapter,重写需要的事件方法即可。一般情况,我们比较关心接收到请求消息这个事件,那么我们就可以覆盖 messageReceived 方法,不用管其他方法。 程序清单如下:
    清单 1. 创建 IOListener 类
     import java.lang.reflect.Method; 
     import java.util.Map; 
    
     import org.apache.commons.logging.Log; 
     import org.apache.commons.logging.LogFactory; 
     import org.apache.mina.core.service.IoHandlerAdapter; 
     import org.apache.mina.core.session.IdleStatus; 
     import org.apache.mina.core.session.IoSession; 
     import org.springframework.beans.factory.annotation.Autowired; 
    
     public class IOListener extends IoHandlerAdapter { 
    
       private final static Log logger = LogFactory.getLog(MainProtocolHandler.class); 
    
     @Autowired 
     private AService aService; 
     @Autowired 
     private BService bService; 
    ……
     }
  2. 重写 messageReceived 方法。messageReceived 由 IoHandler 接口声明。IoHandler 封装了来自客户端不同事件的处理,如果对某个事件感兴趣,可以实现相应的方法,当该事件发生时,IoHandler 中的方法就会被触发执行。

    当接收到新的消息的时候,该方法就会被调用。此处的逻辑是如果传入 invoke_class 是 aService、,则通过反射机制调用 aService 的 invoke_method 方法,并把结果通过 session.write 发送回去。如果消息参数中包含"return_method"值 , 则直接把 service 返回结果回写给 session,由 session 通知客户端调用"return_method"对应的方法。程序清单如下:

    清单 2 消息处理方法
     public void messageReceived(IoSession session, Object message) { 
     try { 
    			 /** 
    			 * invote_class 决定调用实例对象,
    			 * invoke_method 是要调用实例对象的方法
    			 * return_method 是执行完成后的回调方法
    			 */ 
     Map<String, String> map = (Map<String, String>) message; 
     if (!map.isEmpty() && map.containsKey("invoke_class") 
    					 && map.containsKey("invoke_method")) { 
     // 根据 message 中的 invoke_class 值调用对应 service 
            if (map.get("invoke_class").equals("aService")) { 
     if (!map.containsKey("return_method")) { 
     // 如果 message 中包含 return_method 键值,则调用 aService 中 return_method 键对应的方法
    		 Method method = aService.getClass().getMethod( 
    		     map.get("invoke_method"), 
    		    new Class[] { Map.class }); 
    	    method.invoke(aService, new Object[] { map }); 
    		session.write("done"); 
    			 } else { 
    		 // 通过 Java 的反射机制调用阿 Service 的方法,并把结果回写给 session 
    		 Method method = aService.getClass().getMethod( 
    				 map.get("invoke_method"), 
    				 new Class[] { Map.class }); 
    			    session.write(method.invoke(aService, 
    				 new Object[] { map })); 
    					 } 
    				 } 
    			……
    			 } else { 
    				 session.write("parameter error"); 
    			 } 
    		 } catch (Exception ex) { 
    			 logger.error(ex.getMessage()); 
    		 } 
    		 return; 
    	 }
  3. MINA2 与 Spring 集成的配置文件,其中 mainHandler 是处理器。

    声明 IO 过滤器包括:

    • executorFilter:MINA  可以通过 ExecutorFilter  将 IO 线程与业务处理线程分开。
    • textCodecFilter:ProtocolCodecFilter  用来在字节流和消息对象之间互相转换。
    • loggingFilter:记录所有 MINA 的协议事件。由于该过滤器只是实现了 MINA 事件的简单记 录,实际作用不大,可配合 log4j 等日志框架一起使用。
    • filterChainBuilder:用来构建过滤器链。
    清单 3 Spring 配置 MINA2
     <bean id="mainHandler" class="com.xxx.ProcessHandler"></bean> 
     <!-- the IoFilters --> 
     <! — - 配置 executorFilter --> 
     <bean id="executorFilter" class="org.apache.mina.filter.executor.ExecutorFilter"> 
       <constructor-arg index="0"> 
       <value>1000</value> 
    		   </constructor-arg> 
       <constructor-arg index="1"> 
    			   <value>1800</value> 
    		   </constructor-arg> 
     </bean> 
     <! — - 配置 textCodecFilter --> 
     <bean id="textCodecFilter" 
     class="org.apache.mina.filter.codec.ProtocolCodecFilter"> 
        <constructor-arg> 
          <bean class="org.apache.mina.filter.codec.textline.TextLineCodecFactory" /> 
        </constructor-arg> 
     </bean> 
     <! — - 配置 codecFilter --> 
     <bean id="codecFilter" class="org.apache.mina.filter.codec.ProtocolCodecFilter"> 
    		 <constructor-arg> 
     <bean  
     class=" org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory"/> 
    		 </constructor-arg> 
    	 </bean> 
     <bean id="loggingFilter" class="org.apache.mina.filter.logging.LoggingFilter" /> 
     <! —声明过滤器链 --> 
     <bean id="filterChainBuilder"
     class="org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder"> 
        <property name="filters"> 
    			 <map> 
          <entry key="codecFilter" value-ref="codecFilter" /> 
          <entry key="executor" value-ref="executorFilter" /> 
          <entry key="loggingFilter" value-ref="loggingFilter" /> 
    			 </map> 
        </property> 
     </bean> 
     <!-- 设置 I/O 接受器,并指定接收到请求后交给 mainHandler 进行处理 --> 
     <bean class="org.springframework.beans.factory.config.CustomEditorConfigurer" > 
        <property name="customEditors" > 
        <map> 
            <entry key="Java.net.SocketAddress" > 
               <bean class="org.apache.mina.integration.beans.InetSocketAddressEditor"
                      /> 
    		</entry> 
    	</map> 
    	</property> 
     </bean> 
     <bean id="ioAcceptor" 
        class="org.apache.mina.transport.socket.nio.NioSocketAcceptor" 
     init-method="bind" destroy-method="unbind" > 
     <property name="defaultLocalAddress" value="http://www.mamicode.com/:1234" /> 
     <property name="handler" ref="mainHandler" /> 
     <property name="reuseAddress" value="http://www.mamicode.com/true" /> 
     <property name="filterChainBuilder" ref="filterChainBuilder" /> 
     </bean>

MINA2 实现客户端程序

建立 ProcessHandler 类继承 IOHandlerAdapter。无论客户端还是服务端,都需要创建继承自 IOHandlerAdapter 的 hanlder 类。清单如下:

清单 4 建立 ProcessHandler 类
 import java.lang.reflect.Method; 
 import java.net.InetSocketAddress; 
 import java.util.Map; 
 import java.util.Random; 

 import org.apache.commons.logging.Log; 
 import org.apache.commons.logging.LogFactory; 
 import org.apache.mina.core.future.ConnectFuture; 
 import org.apache.mina.core.service.IoHandlerAdapter; 
 import org.apache.mina.core.session.IdleStatus; 
 import org.apache.mina.core.session.IoSession; 
 import org.apache.mina.filter.codec.ProtocolCodecFilter; 
 import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory; 
 import org.apache.mina.filter.logging.LoggingFilter; 
 import org.apache.mina.transport.socket.nio.NioSocketConnector; 
 public class ProcessHandler extends IoHandlerAdapter { 
	 private String hostName ; 
 //MINA2 服务器 IP 数组
 private static final String[] HOSTS = {"xx.xx.xx.xx","xx.xx.xx.xx"}; 
 private static final int CONNECT_TIMEOUT = 1000; 
 private NioSocketConnector connector; 
 private static final int PORT = 1234; 
 private IoSession session; 
 // 构造方法
 public ProcessHandler (Map<String, String> map) { 
		 this.map = map; 
 this.hostName = this.selectServer(); 
	 } 
 // 随机选择 MINA2 服务器 IP,以实现 MINA2 集群
 private String selectServer() { 
		 try { 
			 int cc = HOSTS.length; 
			 if (cc <= 0) 
				 return null; 
			 Random rd = new Random(); 
			 int idx = (Math.abs(rd.nextInt()) % cc); 
			 return HOSTS[idx]; 
		 } catch (Exception e) { 
			 e.printStackTrace(); 
			 return null; 
		 } 
	 } 
……
 }

由于消息中间件需要处理较高并发的请求,所以一般使用 MINA2 服务器集群。在程序清 1 中,定义了一个 String 数组 HOSTS 保存所有集群的 MINA2 服务器 IP,在初始化 ProcessHandler 对象实例的时候通过 selectServer()方法为本次连接选择一个 MINA2 服务器 IP。

与 Server 端 Handler 类不同的是,需要开发者重写相应的连接方法,已建立与 Server 端的连接。为满足需要实现了三种连接方式:

  1. Client 不等待返回结果的连接方式,适合传递数据进行更新插入的请求,并返回连接建立后创建的会话,方便在同一个 Action 方法里重用。代码中的 IoSession connect() 就是这种连接方式。
    清单 5.connect 方法
     public IoSession connect() { 
     if (session != null && session.isConnected()) { 
     throw new IllegalStateException( 
        "Already connected. Disconnect first."); 
    		 } 
     try { 
     connector = new NioSocketConnector(); 
     connector.setConnectTimeoutMillis(CONNECT_TIMEOUT ); 
     connector.getFilterChain().addLast( "codec", 
       new ProtocolCodecFilter( 
       new ObjectSerializationCodecFactory())); 
     connector.getFilterChain().addLast("logger", new LoggingFilter()); 
     connector.setHandler(this); 
     future = connector.connect(new InetSocketAddress(hostName, PORT )); 
    			 future.awaitUninterruptibly(); 
     if (!future.isConnected()) { 
     return null; 
    			 } 
     session = future.getSession(); 
     session.write(map); 
     } catch (Exception ex) { 
     throw new IllegalStateException("session is already closed"); 
    		 } 
     return session; 
    	 }
  2. 创建连接并等待返回结果后关闭连接和会话。适合从 Server 端请求对象的操作。connectWithClose(boolean waitingFlag) 方法连接根据 waitinFlag 是否为真判断是否等待返回结果,并关闭连接。
    清单 6 connectWithClosure 方法
        public boolean connectWithClosure(IoSession session, 
     Map<String, String> map, boolean waitingFlag) { 
     if (session != null && session.isConnected()) { 
     try { 
    	 session.write(map); 
     if (waitingFlag) { 
        session.getCloseFuture().awaitUninterruptibly(); 
    				 } 
     if (connector != null) { 
        connector.dispose(); 
    				 } 
     return true; 
     } catch (Exception ex) { 
     throw new IllegalStateException("session is already closed"); 
    			 } 
    		 } 
     return false; 
    	 }
  3. 续用已创建的会话,并等待返回结果后关闭连接和会话。适合从 Server 端请求对象的操作。connectWithClose(IoSession session,Map<String, String> map, boolean waitingFlag) 方法复用 IoSession,根据 waitingFlag 是否为真判断是否等待返回结果,并关闭连接。
    清单 7 connectWithClosure 重写方法
        public boolean connectWithClosure(IoSession session, 
     Map<String, String> map, boolean waitingFlag) { 
     if (session != null && session.isConnected()) { 
     try { 
       session.write(map); 
     if (waitingFlag) { 
       session.getCloseFuture().awaitUninterruptibly(); 
    				 } 
     if (connector != null) { 
       connector.dispose(); 
    				 } 
     return true; 
     } catch (Exception ex) { 
     throw new IllegalStateException("session is already closed"); 
    			 } 
    		 } 
     return false; 
    	 }

重写请求结果回调方法,在请求返回结果后,通过反射调用对应方法。

清单 8.messageReceived 方法
 @Override 
 public void messageReceived(IoSession session, Object message) { 
 try { 
 if (!map.isEmpty() 
 if (aService!=null){ 
 Method method = aService.getClass().getMethod( 
 map.get("return_method"), 
 new Class[] { Object.class }); 
 method.invoke(aService, new Object[] { message }); 
				 } 
 session.close(true); 
			 } 
 } catch (Exception ex) { 
 logger.error("exception: " + ex.getMessage()); 
 return; 
		 } 
	 }

MINA2 调用方法如下:

清单 9 MINA2 调用方法
 Map<String,String> map = new HashMap<String,String>(); 
 map.put("invoke_class", "userFeedService"); 
 map.put("invoke_class","findUserFeed"); 
 map.put("invoke_class","setFeedList"); 
 map.put("userName", userName); 
 ProcessHandler  handler = new ProcessHandler (map,this); 
 handler.connectWithClose(true);

以上代码的含义是:创建 ProcessHandler 实例,调 MINA2 服务器的 userFeedService 的 findUserFeed 方法, 并在接收到处理结果后直接调用本对象的 setFeedList 把结果赋给 feedList。

 

总结与展望

本文首先提出了大型 Web 系统在发展过程中遇到的瓶颈—系统扩展性和维护性越来越困难,只有系统拆分才能突破这个瓶颈,而 MINA2 正是作为消息中间件解决系统拆分后的同步通信问题。本文接着介绍了 MINA2 的通信原理和核心组件。本文提供了基于 MINA2 实现同步通信的客户端、服务端程序,方便读者掌握 MINA2 开发消息中间件。