首页 > 代码库 > Dubbo点滴(4)之暴露服务解析

Dubbo点滴(4)之暴露服务解析

Dubbo点滴(1) SPI入门 :了解下底层DUBBO底层扩展保证

Dubbo点滴(2)之集群容错:简单分析了DUBBO对高可用的保证手段

Dubbo点滴(3)之服务配置ServiceConfig:了解DUBBO相关元数据与对象

本节内容主要了解下DUBBO服务端的暴露服务过程。

技术分享

吐下槽,这个Exporter命名个人感觉不好。

Export,在DUBBO中既代表暴露过程,这儿是动作,对应export方法。

同时,Exporter接口,却代表着暴露的信息,比如URL。 总之,在研究过程中,对自己的理解代理干扰。

1.暴露服务时序图

首先分享下DUBBO官网的暴露服务时序图

技术分享

吐下槽,虽然该图来自官网,但有些信息和实际代码有些出入。

不管怎样,从整体上来看,可以帮助我们了解整体过程。个人也花了些时间,对该图细化下。两图可以互相参照着看。下图,是个人花时间整理的。

技术分享


该图,比官网提供的图描述更细,当然也只是把主体调用过程进行了罗列。个人按习惯分为三部分。

section 1: 生成代理,构造Invoker对象;

section 2: Protocol调用链层,实现功能动态扩展

section 3: 负责socket端口绑定,借助底层netty,mina框架实现具体网络交互工作。

接下来,从section 2开发剖析。

2.Protocol协议

Protocol是将Invoker暴露的入口。

2.1 层次结构图

技术分享


2.2 Protocol API

Protocol被@SPI注解,这说明该接口是动态适配的。

@SPI("dubbo")
public interface Protocol {
    
    /**
     * 获取缺省端口,当用户没有配置端口时使用。
     * 
     * @return 缺省端口
     */
    int getDefaultPort();

    /**
     * 暴露远程服务:<br>
     */
    @Adaptive
    <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;

    /**
     * 引用远程服务:<br>
     */
    @Adaptive
    <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;

    /**
     * 释放协议:<br>
     */
    void destroy();

}
filter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper
listener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper
registry=com.alibaba.dubbo.registry.integration.RegistryProtocol

injvm=com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocol
rmi=com.alibaba.dubbo.rpc.protocol.rmi.RmiProtocol
dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol
hessian=com.alibaba.dubbo.rpc.protocol.hessian.HessianProtocol
redis=com.alibaba.dubbo.rpc.protocol.redis.RedisProtocol
# com.alibaba.dubbo.rpc.protocol.http.HttpProtocol??
thrift=com.alibaba.dubbo.rpc.protocol.thrift.ThriftProtocol
# com.alibaba.dubbo.rpc.protocol.webservice.WebServiceProtocol
memcached=memcom.alibaba.dubbo.rpc.protocol.memcached.MemcachedProtocol

简单把Protocol实现分为2大类。

filter,listener,register 采用代理模式,主要实现Protocol的扩展,调用具体protocal实现类完成。

其他,如dubbo,injvm,hession等,负责具体通讯工作。

3. 过程分析

3.1 测试代码(来自测试类DubboProtocolTest)

Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
ProxyFactory proxy = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
DemoService service = new DemoServiceImpl();
protocol.export(proxy.getInvoker(service, DemoService.class, URL.valueOf("dubbo://127.0.0.1:9010/" + DemoService.class.getName())));

3.2 Protocol选择

由ExtensionLoader负责完成。具体代码片段

Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

最后的protocol是个包装类。如下图

技术分享

至于具体解析过程,参见《Dubbo原理解析-Dubbo内核实现之基于SPI思想Dubbo内核实现 》

3.3 ProtocolFilterWrapper VS ProtocolListenerWrapper

技术分享

通过图,可以知道

Listener,Filter ,DUBBO提供了动态扩展。

ProtocolFilterWrapper 和 ProtocolListenerWrapper都是使用代理模式,实现功能扩展。但两者从使用场景上有所差别。

ProtocolFilterWrapper :主要通过构造调用链,实现功能扩展。提供了丰富的Filter接口。

ProtocolListenerWrapper:通过对服务的暴露和下线进行消息通知,DUBBO提供Listener接口不多。

下面简单分享下调用时机

ListenerExporterWrapper.java

public class ListenerExporterWrapper<T> implements Exporter<T> {

    private static final Logger logger = LoggerFactory.getLogger(ListenerExporterWrapper.class);

    private final Exporter<T> exporter;
    
    private final List<ExporterListener> listeners;

    public ListenerExporterWrapper(Exporter<T> exporter, List<ExporterListener> listeners){
        if (exporter == null) {
            throw new IllegalArgumentException("exporter == null");
        }
        this.exporter = exporter;
        this.listeners = listeners;
        if (listeners != null && listeners.size() > 0) {
            RuntimeException exception = null;
            for (ExporterListener listener : listeners) {
                if (listener != null) {
                    try {
                        listener.exported(this);
                    } catch (RuntimeException t) {
                        logger.error(t.getMessage(), t);
                        exception = t;
                    }
                }
            }
            if (exception != null) {
                throw exception;
            }
        }
    }

    public Invoker<T> getInvoker() {
        return exporter.getInvoker();
    }

    public void unexport() {
        try {
            exporter.unexport();
        } finally {
            if (listeners != null && listeners.size() > 0) {
                RuntimeException exception = null;
                for (ExporterListener listener : listeners) {
                    if (listener != null) {
                        try {
                            listener.unexported(this);
                        } catch (RuntimeException t) {
                            logger.error(t.getMessage(), t);
                            exception = t;
                        }
                    }
                }
                if (exception != null) {
                    throw exception;
                }
            }
        }
    }

}

Listener的调用,发生在export,unexport执行后进行

ProtocolFilterWrapper.java

public class ProtocolFilterWrapper implements Protocol {

    private final Protocol protocol;

    public ProtocolFilterWrapper(Protocol protocol){
        if (protocol == null) {
            throw new IllegalArgumentException("protocol == null");
        }
        this.protocol = protocol;
    }

    public int getDefaultPort() {
        return protocol.getDefaultPort();
    }

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            return protocol.export(invoker);
        }
        return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
    }

    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
            return protocol.refer(type, url);
        }
        return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
    }

    public void destroy() {
        protocol.destroy();
    }

    private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        Invoker<T> last = invoker;
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
        if (filters.size() > 0) {
            for (int i = filters.size() - 1; i >= 0; i --) {
                final Filter filter = filters.get(i);
                final Invoker<T> next = last;
                last = new Invoker<T>() {

                    public Class<T> getInterface() {
                        return invoker.getInterface();
                    }

                    public URL getUrl() {
                        return invoker.getUrl();
                    }

                    public boolean isAvailable() {
                        return invoker.isAvailable();
                    }

                    public Result invoke(Invocation invocation) throws RpcException {
                        return filter.invoke(next, invocation);
                    }

                    public void destroy() {
                        invoker.destroy();
                    }

                    @Override
                    public String toString() {
                        return invoker.toString();
                    }
                };
            }
        }
        return last;
    }
    
}

以CacheFilter.java为例

@Activate(group = {Constants.CONSUMER, Constants.PROVIDER}, value = Constants.CACHE_KEY)
public class CacheFilter implements Filter {

    private CacheFactory cacheFactory;

    public void setCacheFactory(CacheFactory cacheFactory) {
        this.cacheFactory = cacheFactory;
    }

    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        if (cacheFactory != null && ConfigUtils.isNotEmpty(invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.CACHE_KEY))) {
            Cache cache = cacheFactory.getCache(invoker.getUrl().addParameter(Constants.METHOD_KEY, invocation.getMethodName()));
            if (cache != null) {
                String key = StringUtils.toArgumentString(invocation.getArguments());
                if (cache != null && key != null) {
                    Object value = cache.get(key);
                    if (value != null) {
                        return new RpcResult(value);
                    }
                    Result result = invoker.invoke(invocation);
                    if (! result.hasException()) {
                        cache.put(key, result.getValue());
                    }
                    return result;
                }
            }
        }
        return invoker.invoke(invocation);
    }

}

可以看出,Filter会在包裹着服务调用。

最后总结下两者区别

区别
ProtocolFilterWrapperProtocolListenerWrapper
内部实现类


调用时机
服务具体调用时
服务暴露,服务下线时
使用场景
RPC调用时,动态扩展
消息通知
上下文
类似Around  Advise
类似After Advise
组织形式
调用链
循环遍历

4. 服务暴露过程主体类图

技术分享

  1. DubboProtocol:具体协议实现类,服务的暴露,调用入口

  2. Exchangers:ExchangeServer,ExchangeClient工厂类

  3. HeaderExchanger:Exchanger意为“交换”的意思,表示数据交换接口,连接上下两侧。

  4. HeaderExchangeServer:提供分布式服务的服务器Exchange接口

  5. NettyTransporter:提供网络基本接口(bind,connection)

  6. NettyServer:借助第三方netty,对外提供端口。

本文出自 “简单” 博客,请务必保留此出处http://dba10g.blog.51cto.com/764602/1885295

Dubbo点滴(4)之暴露服务解析