首页 > 代码库 > [编织消息框架][分层模型设计]会话与节点
[编织消息框架][分层模型设计]会话与节点
1 public class QNode implements IRecycle { 2 /** 3 * session会话,记录通信层属性 4 **/ 5 private QSession session; 6 /** 7 * message cb 维护消息回调 8 **/ 9 private QCallbackManager callbackManager;10 /**11 * netty channel12 **/13 private Channel channel;14 15 private InetSocketAddress address;16 }
1 public class QSession implements IRecycle {2 3 public static enum SESSION_KEY {4 ALIAS, OP_TIME, ID,5 }6 7 private long id;8 private Map<String, Object> values = new HashMap<>();9 }
1 /*** 2 * 响应异步消息 针对业务上的逻辑处理 {@link QNode#send(Object, IQCallback)} 3 * 4 * @author solq 5 */ 6 public class QCallbackManager implements IRecycle { 7 private final static Logger LOGGER = LoggerFactory.getLogger(QCallbackManager.class); 8 9 private final static ScheduledExecutorService pool = PoolUtil.createScheduledPool(QMConfig.getInstance().POOL_CLEAR_MESSAGE_CORE, "message clear"); 10 11 private Map<Long, IQCallback<?>> messageRecoreds = Collections.synchronizedMap(new HashMap<>()); 12 13 @SuppressWarnings({ "unchecked", "rawtypes" }) 14 private void buildTask(long sn, IQCallback cb) { 15 messageRecoreds.put(sn, cb); 16 Future<?> future = pool.schedule(new Runnable() { 17 private long _sn = sn; 18 19 @Override 20 public void run() { 21 IQCallback<?> _cb = messageRecoreds.remove(_sn); 22 if (_cb == null) { 23 return; 24 } 25 try { 26 _cb.onReceiveError(QCode.MESSAGE_ERROR_TIMEOUT); 27 } finally { 28 _cb.recycle(); 29 } 30 31 } 32 }, QMConfig.getInstance().NETTY_MESSAGE_CALLBACK_CLEAR_INTERVAL, TimeUnit.MILLISECONDS); 33 cb.setFuture(future); 34 } 35 36 public <T> IQCallback<T> doSend(QPacket sendPacket, IQCallback<T> cb) { 37 if (cb == null) { 38 return null; 39 } 40 sendPacket.setStatus(QPacket.MASK_RESPONSE); 41 cb.setSendPacket(sendPacket); 42 buildTask(sendPacket.getSn(), cb); 43 return cb; 44 } 45 46 public void doSendError(QPacket sendPacket, short code) { 47 final long key = sendPacket.getSn(); 48 IQCallback<?> cb = messageRecoreds.remove(key); 49 if (cb == null) { 50 if (LOGGER.isWarnEnabled()) { 51 LOGGER.warn("发送失败 未找到回调 :" + key); 52 } 53 return; 54 } 55 try { 56 cb.onSendError(code); 57 } finally { 58 cb.recycle(); 59 } 60 } 61 62 public void doReceiveSucceed(QPacket rePacket) { 63 final long key = rePacket.getSn(); 64 IQCallback<?> cb = messageRecoreds.remove(key); 65 if (cb == null) { 66 if (LOGGER.isWarnEnabled()) { 67 LOGGER.warn("响应成功 未找到回调 :" + key); 68 } 69 return; 70 } 71 try { 72 short code = rePacket.toCode(); 73 cb.setCode(code); 74 cb.onSucceed(code); 75 } finally { 76 cb.recycle(); 77 } 78 } 79 80 public void doReceiveError(QPacket rePacket) { 81 final long key = rePacket.getSn(); 82 IQCallback<?> cb = messageRecoreds.remove(key); 83 if (cb == null) { 84 if (LOGGER.isWarnEnabled()) { 85 LOGGER.warn("响应失败 未找到回调 :" + key); 86 } 87 return; 88 } 89 try { 90 short code = rePacket.toCode(); 91 cb.setCode(code); 92 cb.onReceiveError(code); 93 } finally { 94 cb.recycle(); 95 } 96 } 97 98 public int getMessageRecoredSize() { 99 return messageRecoreds.size();100 }101 102 @Override103 public void recycle() {104 // 释放所有消息105 messageRecoreds.forEach((sn, cb) -> {106 cb.recycle();107 });108 messageRecoreds.clear();109 }
1 public abstract class IQCallback<T> implements IRecycle, QResult<T> { 2 //响应成功回调 3 abstract public void onSucceed(short code); 4 5 // 默认什么也不用做 6 public void onSendError(short code) { 7 if (LOGGER.isWarnEnabled()) { 8 LOGGER.warn("onSendError : {)", code); 9 }10 this.code = code;11 }12 13 // 默认什么也不用做14 public void onReceiveError(short code) {15 if (LOGGER.isWarnEnabled()) {16 LOGGER.warn("onReceiveError : {)", code);17 }18 this.code = code;19 }20 }
IQCallback 有三种响应消息处理
1.onSucceed 响应返回成功
2.onSendError 发送时失败
3.onReceiveError 响应返回失败
有的业务非常复杂,如果响应失败了可以根据返回码深度处理
[编织消息框架][分层模型设计]会话与节点
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。