首页 > 代码库 > 创建异步MQ操作

创建异步MQ操作

package com;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

/**
 * MQ 
 * @author pengbo
 *
 */
public class QueueManager {
	public static final int MAX_QUEUE_SIZE = 200;
	//创建一个Map缓存 用来存储  BlockingQueued队列 
	public static Map<QueueType, ArrayBlockingQueue<Object>> queueMap = new ConcurrentHashMap<QueueManager.QueueType, ArrayBlockingQueue<Object>>();
	//创建一个endMap缓存 用来存储    读取文件的最后一段信息  用来标记 当前文档结束 
	public static Map<QueueType, ArrayBlockingQueue<Object>> queueEndMap = new ConcurrentHashMap<QueueManager.QueueType, ArrayBlockingQueue<Object>>();
	//创建一个 状态缓存 value 存储的是 读取文件的数量
	public static Map<QueueType, Integer> m = new HashMap<QueueType, Integer>();
	
	public static enum QueueType {
		INFO, 		//txt堆栈的Key
		INFOEND,		//txt文件  结束堆栈的Key
		SET,		//加入堆栈的状态的Key
		GET			//从堆栈取出信息的状态的Key
	}

	/**
	 * 初始化堆栈 将空的Q 和空的状态设置到缓存中
	 */
	public static void init() {
			ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(MAX_QUEUE_SIZE);
			ArrayBlockingQueue<Object> queueEnd = new ArrayBlockingQueue<Object>(MAX_QUEUE_SIZE);
			queueMap.put(QueueType.INFO,queue);
			queueEndMap.put(QueueType.INFOEND,queueEnd);
			m.put(QueueType.SET,0);
			m.put(QueueType.GET,0);
		}
	/**
	 * 将信息推送到MQ中 存在缓存中
	 * @param type Map的Key 
	 * @param obj	需要推进去的信息
	 * @throws InterruptedException
	 */
	public static void startPut(QueueType type, Object obj) throws InterruptedException {
		ArrayBlockingQueue<Object> queue = queueMap.get(type);
			if (queue != null) 
				queue.put(obj);
	}
	/**
	 * 将获取文件最末位的信息推送到 endMQ中 存在缓存中
	 * @param type Map的Key 
	 * @param obj	需要推进去的信息
	 * @throws InterruptedException
	 */
	public static void endPut(QueueType type, Object obj) throws InterruptedException {
		ArrayBlockingQueue<Object> queue = queueEndMap.get(type);
			if (queue != null) 
				queue.put(obj);
	}
	/**
	 * 从缓存中  取走 Q中首个信息 如果队列是空的 返回 null 
	 * @param type
	 * @return
	 */
	public static Object startPoll(QueueType type) {
		ArrayBlockingQueue<Object> queue = queueMap.get(type);
		Object o = null;
			if (queue != null) {
				o = queue.poll();
			}
		return o;
	}
	/**
	 * 从缓存中  取走 endQ中首个信息 如果队列是空的 返回 null 
	 * @param type
	 * @return
	 */
	public static Object endPoll(QueueType type) {
		ArrayBlockingQueue<Object> queue = queueEndMap.get(type);
		Object o = null;
			if (queue != null) {
				o = queue.poll();
			}
		return o;
	}
	/**
	 * 在缓存中  查看Q中首个信息   如果队列是空的 返回 null 
	 * @param type
	 * @return
	 */
	public static Object startPeek(QueueType type) {
		ArrayBlockingQueue<Object> queue = queueMap.get(type);
		Object o = null;
			if (queue != null) {
				o = queue.peek();
			}
		return o;
	}
	
	/**
	 * 从缓存中  查看 endQ中首个信息    如果队列是空的 返回 null 
	 * @param type
	 * @return
	 */
	public static Object endPeek(QueueType type) {
		ArrayBlockingQueue<Object> queue = queueEndMap.get(type);
		Object o = null;
			if (queue != null) {
				o = queue.peek();
			}
		return o;
	}
	
	/**
	 * 结束标识  true:结束   false 未结束
	 * @param i 需要操作的文件总数
	 * @return
	 */
	public static boolean over(int i){
		boolean tf =false;
		if (QueueManager.m.get(QueueType.SET) == i
				&& QueueManager.m.get(QueueType.GET) == i
				&& QueueManager.startPeek(QueueType.INFO) == null
				&& QueueManager.endPeek(QueueType.INFOEND) == null) {
			tf = true;
		}
		return tf;
	}
}


创建异步MQ操作