首页 > 代码库 > [编织消息框架][分层模型设计]会话与节点

[编织消息框架][分层模型设计]会话与节点

技术分享

技术分享
 1 public class QNode implements IRecycle { 2     /** 3      * session会话,记录通信层属性 4      **/ 5     private QSession session; 6     /** 7      * message cb 维护消息回调 8      **/ 9     private QCallbackManager callbackManager;10     /**11      * netty channel12      **/13     private Channel channel;14 15     private InetSocketAddress address;16 }
QNode
技术分享
1 public class QSession implements IRecycle {2 3     public static enum SESSION_KEY {4     ALIAS, OP_TIME, ID,5     }6 7     private long id;8     private Map<String, Object> values = new HashMap<>();9 }
QSession
技术分享
  1 /***  2  * 响应异步消息 针对业务上的逻辑处理 {@link QNode#send(Object, IQCallback)}  3  *   4  * @author solq  5  */  6 public class QCallbackManager implements IRecycle {  7     private final static Logger LOGGER = LoggerFactory.getLogger(QCallbackManager.class);  8   9     private final static ScheduledExecutorService pool = PoolUtil.createScheduledPool(QMConfig.getInstance().POOL_CLEAR_MESSAGE_CORE, "message clear"); 10  11     private Map<Long, IQCallback<?>> messageRecoreds = Collections.synchronizedMap(new HashMap<>()); 12  13     @SuppressWarnings({ "unchecked", "rawtypes" }) 14     private void buildTask(long sn, IQCallback cb) { 15     messageRecoreds.put(sn, cb); 16     Future<?> future = pool.schedule(new Runnable() { 17         private long _sn = sn; 18  19         @Override 20         public void run() { 21         IQCallback<?> _cb = messageRecoreds.remove(_sn); 22         if (_cb == null) { 23             return; 24         } 25         try { 26             _cb.onReceiveError(QCode.MESSAGE_ERROR_TIMEOUT); 27         } finally { 28             _cb.recycle(); 29         } 30  31         } 32     }, QMConfig.getInstance().NETTY_MESSAGE_CALLBACK_CLEAR_INTERVAL, TimeUnit.MILLISECONDS); 33     cb.setFuture(future); 34     } 35  36     public <T> IQCallback<T> doSend(QPacket sendPacket, IQCallback<T> cb) { 37     if (cb == null) { 38         return null; 39     } 40     sendPacket.setStatus(QPacket.MASK_RESPONSE); 41     cb.setSendPacket(sendPacket); 42     buildTask(sendPacket.getSn(), cb); 43     return cb; 44     } 45  46     public void doSendError(QPacket sendPacket, short code) { 47     final long key = sendPacket.getSn(); 48     IQCallback<?> cb = messageRecoreds.remove(key); 49     if (cb == null) { 50         if (LOGGER.isWarnEnabled()) { 51         LOGGER.warn("发送失败 未找到回调 :" + key); 52         } 53         return; 54     } 55     try { 56         cb.onSendError(code); 57     } finally { 58         cb.recycle(); 59     } 60     } 61  62     public void 
doReceiveSucceed(QPacket rePacket) { 63     final long key = rePacket.getSn(); 64     IQCallback<?> cb = messageRecoreds.remove(key); 65     if (cb == null) { 66         if (LOGGER.isWarnEnabled()) { 67         LOGGER.warn("响应成功 未找到回调 :" + key); 68         } 69         return; 70     } 71     try { 72         short code = rePacket.toCode(); 73         cb.setCode(code); 74         cb.onSucceed(code); 75     } finally { 76         cb.recycle(); 77     } 78     } 79  80     public void doReceiveError(QPacket rePacket) { 81     final long key = rePacket.getSn(); 82     IQCallback<?> cb = messageRecoreds.remove(key); 83     if (cb == null) { 84         if (LOGGER.isWarnEnabled()) { 85         LOGGER.warn("响应失败 未找到回调 :" + key); 86         } 87         return; 88     } 89     try { 90         short code = rePacket.toCode(); 91         cb.setCode(code); 92         cb.onReceiveError(code); 93     } finally { 94         cb.recycle(); 95     } 96     } 97  98     public int getMessageRecoredSize() { 99     return messageRecoreds.size();100     }101 102     @Override103     public void recycle() {104     // 释放所有消息105     messageRecoreds.forEach((sn, cb) -> {106         cb.recycle();107     });108     messageRecoreds.clear();109     }
QCallbackManager
 1 public abstract class IQCallback<T> implements IRecycle, QResult<T> { 2     //响应成功回调 3     abstract public void onSucceed(short code); 4  5     // 默认什么也不用做 6     public void onSendError(short code) { 7     if (LOGGER.isWarnEnabled()) { 8         LOGGER.warn("onSendError : {)", code); 9     }10     this.code = code;11     }12 13     // 默认什么也不用做14     public void onReceiveError(short code) {15     if (LOGGER.isWarnEnabled()) {16         LOGGER.warn("onReceiveError : {)", code);17     }18     this.code = code;19     }20 }

IQCallback 有三种响应消息处理

1.onSucceed 响应返回成功

2.onSendError 发送时失败

3.onReceiveError 响应返回失败(等待响应超时也会触发,返回码为 QCode.MESSAGE_ERROR_TIMEOUT)

有的业务非常复杂,如果响应失败了,可以在 onReceiveError 中根据返回码做进一步处理。

[编织消息框架][分层模型设计]会话与节点