首页 > 代码库 > motan源码分析十一:部分特性

motan源码分析十一:部分特性

本章将描述motan部分的特性并对源码进行分析。

1.requestid的维护,使用了当前时间左移20位,再和一个自增变量组合

public class RequestIdGenerator {    protected static final AtomicLong offset = new AtomicLong(0);    protected static final int BITS = 20;    protected static final long MAX_COUNT_PER_MILLIS = 1 << BITS;    /**     * 获取 requestId     *      * @return     */    public static long getRequestId() {        long currentTime = System.currentTimeMillis();        long count = offset.incrementAndGet();        while(count >= MAX_COUNT_PER_MILLIS){            synchronized (RequestIdGenerator.class){                if(offset.get() >= MAX_COUNT_PER_MILLIS){                    offset.set(0);                }            }            count = offset.incrementAndGet();        }        return (currentTime << BITS) + count;    }    public static long getRequestIdFromClient() {        // TODO 上下文 requestid        return 0;    }}

2.限流,motan支持简单的限流,是利用filter来实现的

@SpiMeta(name = "active")@Activation(sequence = 1)public class ActiveLimitFilter implements Filter {    @Override    public Response filter(Caller<?> caller, Request request) {        int maxAcvitivyCount = caller.getUrl().getIntParameter(URLParamType.actives.getName(), URLParamType.actives.getIntValue());        if (maxAcvitivyCount > 0) {            int activeCount = RpcStats.getServiceStat(caller.getUrl()).getActiveCount();            if (activeCount >= maxAcvitivyCount) {                throw new MotanServiceException(String.format("Request(%s) active count exceed the limit (%s), referer:%s", request,                        maxAcvitivyCount, caller.getUrl()), MotanErrorMsgConstant.SERVICE_REJECT);            }        }        long startTime = System.currentTimeMillis();        RpcStats.beforeCall(caller.getUrl(), request);        try {            Response rs = caller.call(request);            RpcStats.afterCall(caller.getUrl(), request, true, System.currentTimeMillis() - startTime);            return rs;        } catch (RuntimeException re) {            RpcStats.afterCall(caller.getUrl(), request, false, System.currentTimeMillis() - startTime);            throw re;        }    }}

3.对于连续失败的client进行不可用操作

    void incrErrorCount() {        long count = errorCount.incrementAndGet();        // 如果节点是可用状态,同时当前连续失败的次数超过限制maxClientConnection次,那么把该节点标示为不可用        if (count >= maxClientConnection && state.isAliveState()) {            synchronized (this) {                count = errorCount.longValue();                if (count >= maxClientConnection && state.isAliveState()) {                    LoggerUtil.error("NettyClient unavailable Error: url=" + url.getIdentity() + " "                            + url.getServerPortStr());                    state = ChannelState.UNALIVE;                }            }        }    }    void resetErrorCount() {        errorCount.set(0);        if (state.isAliveState()) {            return;        }        synchronized (this) {            if (state.isAliveState()) {                return;            }            // 如果节点是unalive才进行设置,而如果是 close 或者 uninit,那么直接忽略            if (state.isUnAliveState()) {                long count = errorCount.longValue();                // 过程中有其他并发更新errorCount的,因此这里需要进行一次判断                if (count < maxClientConnection) {                    state = ChannelState.ALIVE;                    LoggerUtil.info("NettyClient recover available: url=" + url.getIdentity() + " "                            + url.getServerPortStr());                }            }        }    }

4.支持多注册中心,因此cluster的refer集合是所有注册中心包含服务器的集合,如果同一个服务器在不同的注册中心注册,则cluster中当作两个服务器

5.服务端的采用boss线程池+工作线程池+业务线程池的处理方式

	private final static ChannelFactory channelFactory = new NioServerSocketChannelFactory(//boss线程池和工作线程池,主要负责接收消息			Executors.newCachedThreadPool(new DefaultThreadFactory("nettyServerBoss", true)),			Executors.newCachedThreadPool(new DefaultThreadFactory("nettyServerWorker", true)));        private StandardThreadExecutor standardThreadExecutor = null;//业务线程池,负责具体的业务处理        standardThreadExecutor = (standardThreadExecutor != null && !standardThreadExecutor.isShutdown()) ? standardThreadExecutor				: new StandardThreadExecutor(minWorkerThread, maxWorkerThread, workerQueueSize,						new DefaultThreadFactory("NettyServer-" + url.getServerPortStr(), true));		standardThreadExecutor.prestartAllCoreThreads();       final NettyChannelHandler handler = new NettyChannelHandler(NettyServer.this, messageHandler,				standardThreadExecutor);//handler使用业务线程池今天处理具体的业务

  

motan源码分析十一:部分特性