首页 > 代码库 > 项目积累——Blockingqueue,ConcurrentLinkedQueue,Executors

项目积累——Blockingqueue,ConcurrentLinkedQueue,Executors

背景

通过做下面一个小的接口系统gate,了解一下minajava并发包里的东西。A系统为javaweb项目,BC语言项目,gate是本篇需要完成的系统。


需求

1. A为集群系统,并发较高,会批量发送给gate消息,并且接受gate返回的消息;

2. gate独立部署,将从A接受到的消息压入队列,与B建立连接后,将每条消息验证签名等工作后,发送给B,需要保证性能;

3. B负责处理消息,并返回处理结果,Bgate提供提供六个端口,一个端口可有三个长连接(须由gate发送心跳保持长连接,否则超时切断连接)。

实例

项目中用到了两个框架mina2.0.7axis2。首先,gate需要接收从A发送过来的消息,为保证消息顺序性,压入队列中,为保证性能,将队列中的消息通过不同的连接发送至B,这让我们很快就想到了多线程中生产者消费者的那张图,并且这是一个生产者,多个消费者,下面我们来看代码。

首先,gate作为服务端,要为A提供一个接口,使用axis2完成了,关于webservice就不必多说,可看我前面的博客,配置如下:

<serviceGroup>
<service name="sendService" scope="application">
    <description>
        SendService
    </description>
    <messageReceivers>
        <messageReceiver mep="http://www.w3.org/2004/08/wsdl/in-only" class="org.apache.axis2.rpc.receivers.RPCInOnlyMessageReceiver"/>
        <messageReceiver mep="http://www.w3.org/2004/08/wsdl/in-out" class="org.apache.axis2.rpc.receivers.RPCMessageReceiver"/>
    </messageReceivers>
    <parameter name="ServiceClass">
        cn.net.easyway.customer.SendService
    </parameter>
</service>
</serviceGroup>

下面是服务实现类:

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import cn.net.easyway.nds.MsgConsumer;
import cn.net.easyway.nds.MsgProducer;

/**
 * 为用户管理系统提供服务接口
 * @author yuanfubiao
 *
 */
public class SendService {
	
	private static Log logger = LogFactory.getLog(SendService.class);
	
	private static int num = 0;
	//消息队列
	private static LinkedBlockingQueue<String> msgQueue = new LinkedBlockingQueue<String>();
	//生产者线程池
	private static ExecutorService executorProducer = Executors.newFixedThreadPool(20); //创建20个线程,应对并发较高的情况
	//消费者线程池
	private static ExecutorService executorCustomer = Executors.newFixedThreadPool(18); //和连接数对应
	
	/**
	 * 放入消息
	 * @param list 消息列表
	 */
	public void putMsg(List<String> list){
		
		//将消息放入队列
		executorProducer.execute(new MsgProducer(msgQueue,list));
		
		//取出消息:数据量大,启用所有线程
		if(list.size() > 18){
			for(int i=0;i<18;i++){
				executorCustomer.execute(new MsgConsumer(msgQueue));
			}
		}else{
			executorCustomer.execute(new MsgConsumer(msgQueue));
		}
	}
}

Java并发包为我们提供了很多实用的多线程东西,因此没有必要自己去实现一个队列和线程池,如上面代码我们用到的队列是LinkedBlockingQueue,他为线程安全的阻塞队列,多线程操作时不必为了同步而操心,并且会将进出两边自动负载,他实现自BlockingQueue接口。

jdk中可以看到实现BlockingQueue接口的还有ArrayBlockingQueue,DelayQueue, LinkedBlockingDeque,LinkedBlockingQueue,LinkedTransferQueue,PriorityBlockingQueue,SynchronousQueue;此接口就是提供一个阻塞队列,从api中我们看到如下一张图:


Throwsexception:当队列已满,再次添加会抛出错误,取数据也是如此;

Specialvalue:添加或取出时会有一个返回值;

Blocks:是在队列已满或为空时,会一直阻塞;

Time Out:指阻塞到一定时间,线程退出;

其中,还有一个并发队列也是作为生产者消费者的首选:ConcurrentLinkedQueue,它是非阻塞队列,肯定就不是出自Blockingqueue接口,而是出自AbstractQueue,因此也就没有put和take方法,使用这个并发队列需要有两点注意:第一,判断是否为空尽量使用isEmpty方法,不要用size(),有人测试过size方法很耗费时间;第二就是线程问题,虽然ConcurrentLinkedQueue是线程安全的,但是只负责原子性的,就是说当你操作queue.add() or queue.poll的时候是安全的,当并发量较大时,你在使用queue.isEmpty时还不为空,但就在这空当有可能就执行poll操作,导致队列为空引起异常,可用如下代码:

synchronized(queue) {
    if(!queue.isEmpty()) {
       queue.poll();
    }
}

gate中,我定义了两个线程池,一个是生产者,另一个是消费者:

//生产者线程池
	private static ExecutorService executorProducer = Executors.newFixedThreadPool(20); //创建20个线程,应对并发较高的情况
	//消费者线程池
	private static ExecutorService executorCustomer = Executors.newFixedThreadPool(18); 

Executors提供了一个工厂方法,用来创建线程池,返回的线程池都实现了ExecutorService接口,可以创建如下线程池:

newCachedThreadPool():创建一个可缓存的线程池,调用execute将重用以前构造的线程,如果现在线程没有可用的,则创建一个新线程添加到池中,终止并从缓存中溢出那些已有60秒未被使用的线程;

newFixedThreadPool(intnThreads):创建固定的线程;

newScheduledThreadPool(intcorePoolSize):创建一个支持定时及周期性的任务执行的线程池;

newSingleThreadExecutor():创建一个单线程的Executor

启动线程,有两个方法,一个是execute(),另一个是submit(),后者是有返回值的,会将执行的结果Future返回,关于Future可移步这里

下面就是生产者和消费者代码:

生产者:

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * 向队列添加消息
 * @author yuanfubiao
 *
 */
public class MsgProducer implements Runnable {

	private LinkedBlockingQueue<String> msgQueue;
	
	private List<String> message;
	
	public MsgProducer(LinkedBlockingQueue<String> queue,List<String> msg) {
		this.msgQueue = queue;
		this.message = msg;
	}

	@Override
	public void run() {
		Iterator<String> iter = message.iterator();
		while(iter.hasNext()){
			String msg = iter.next();
			try {
				msgQueue.put(msg);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
}

消费者:

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.LinkedBlockingQueue;

import nds.framework.security.NDSMD5;

import org.apache.commons.codec.binary.Hex;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.mina.core.session.IoSession;

/**
 * 从消息队列取出消息
 * @author yuanfubiao
 *
 */
public class MsgConsumer implements Runnable{

	private static Log logger = LogFactory.getLog(MsgConsumer.class);
	private LinkedBlockingQueue<String> msgQueue;
	
	public MsgConsumer(LinkedBlockingQueue<String> queue) {
		this.msgQueue = queue;
	}

	@Override
	public void run() {
		while(!msgQueue.isEmpty()){
			
			String msg = null;
			try {
				msg = msgQueue.take();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			
			if(null == msg){
				return;
			}
			
			//加入时间
			SimpleDateFormat format = new SimpleDateFormat("yyyyMMddHHmmss");
			String now = format.format(new Date());
			String prefix = msg.substring(0, 19);
			String suffix = msg.substring(33, msg.length());
			String packet = prefix.trim() + now.trim() + suffix.trim();

			//签名
			String signature = null;
			String securityKey = "XXXX";
			try {
				byte binSignature[] = NDSMD5.signPacket(packet.getBytes(), securityKey);
				signature = new String(Hex.encodeHex(binSignature));
			} catch (Exception e) {
				e.printStackTrace();
			}
			String newStr = packet + signature.toUpperCase().trim();
			//关于mina,可见我下篇文章
			IoSession session = SessionPool.getSession(newStr.substring(13, 15));
			logger.info("发送数据:" + newStr);
			session.write(newStr);
			
			try {
				Thread.sleep(1000); //等待一秒
			} catch (InterruptedException e1) {
				e1.printStackTrace();
			}
		}
	}
}