首页 > 代码库 > 创建异步MQ操作
创建异步MQ操作
package com; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; /** * MQ * @author pengbo * */ public class QueueManager { public static final int MAX_QUEUE_SIZE = 200; //创建一个Map缓存 用来存储 BlockingQueued队列 public static Map<QueueType, ArrayBlockingQueue<Object>> queueMap = new ConcurrentHashMap<QueueManager.QueueType, ArrayBlockingQueue<Object>>(); //创建一个endMap缓存 用来存储 读取文件的最后一段信息 用来标记 当前文档结束 public static Map<QueueType, ArrayBlockingQueue<Object>> queueEndMap = new ConcurrentHashMap<QueueManager.QueueType, ArrayBlockingQueue<Object>>(); //创建一个 状态缓存 value 存储的是 读取文件的数量 public static Map<QueueType, Integer> m = new HashMap<QueueType, Integer>(); public static enum QueueType { INFO, //txt堆栈的Key INFOEND, //txt文件 结束堆栈的Key SET, //加入堆栈的状态的Key GET //从堆栈取出信息的状态的Key } /** * 初始化堆栈 将空的Q 和空的状态设置到缓存中 */ public static void init() { ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(MAX_QUEUE_SIZE); ArrayBlockingQueue<Object> queueEnd = new ArrayBlockingQueue<Object>(MAX_QUEUE_SIZE); queueMap.put(QueueType.INFO,queue); queueEndMap.put(QueueType.INFOEND,queueEnd); m.put(QueueType.SET,0); m.put(QueueType.GET,0); } /** * 将信息推送到MQ中 存在缓存中 * @param type Map的Key * @param obj 需要推进去的信息 * @throws InterruptedException */ public static void startPut(QueueType type, Object obj) throws InterruptedException { ArrayBlockingQueue<Object> queue = queueMap.get(type); if (queue != null) queue.put(obj); } /** * 将获取文件最末位的信息推送到 endMQ中 存在缓存中 * @param type Map的Key * @param obj 需要推进去的信息 * @throws InterruptedException */ public static void endPut(QueueType type, Object obj) throws InterruptedException { ArrayBlockingQueue<Object> queue = queueEndMap.get(type); if (queue != null) queue.put(obj); } /** * 从缓存中 取走 Q中首个信息 如果队列是空的 返回 null * @param type * @return */ public static Object startPoll(QueueType type) { ArrayBlockingQueue<Object> queue = queueMap.get(type); Object o = null; if (queue != null) { o = queue.poll(); } return o; } /** * 从缓存中 取走 endQ中首个信息 如果队列是空的 返回 null * @param type * @return */ public static Object endPoll(QueueType type) { ArrayBlockingQueue<Object> queue = queueEndMap.get(type); Object o = null; if (queue != null) { o = queue.poll(); } return o; } /** * 在缓存中 查看Q中首个信息 如果队列是空的 返回 null * @param type * @return */ public static Object startPeek(QueueType type) { ArrayBlockingQueue<Object> queue = queueMap.get(type); Object o = null; if (queue != null) { o = queue.peek(); } return o; } /** * 从缓存中 查看 endQ中首个信息 如果队列是空的 返回 null * @param type * @return */ public static Object endPeek(QueueType type) { ArrayBlockingQueue<Object> queue = queueEndMap.get(type); Object o = null; if (queue != null) { o = queue.peek(); } return o; } /** * 结束标识 true:结束 false 未结束 * @param i 需要操作的文件总数 * @return */ public static boolean over(int i){ boolean tf =false; if (QueueManager.m.get(QueueType.SET) == i && QueueManager.m.get(QueueType.GET) == i && QueueManager.startPeek(QueueType.INFO) == null && QueueManager.endPeek(QueueType.INFOEND) == null) { tf = true; } return tf; } }
创建异步MQ操作
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。