Dubbo Service Export

Posted by Clear Blog on September 3, 2018

This post walks through how a service is exported (exposed) in Dubbo.

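Start with ServiceBean. It implements many interfaces; the one to focus on is ApplicationListener, the interface through which Spring calls back into the bean once the context has been refreshed.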

@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
    if (!isExported() && !isUnexported()) {
        if (logger.isInfoEnabled()) {
            logger.info("The service ready on spring started. service: " + getInterface());
        }
        export(); // entry point
    }
}

Now look at export():

public synchronized void export() {
    // The same item may be configured in several places, so overrides are applied by priority
    checkAndUpdateSubConfigs();

    if (!shouldExport()) {
        return;
    }

    if (shouldDelay()) {
        // A single-thread scheduled pool performs the delayed export
        delayExportExecutor.schedule(this::doExport, delay, TimeUnit.MILLISECONDS);
    } else {
        doExport();
    }
}
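The delay branch is easier to see in isolation. Below is a minimal sketch, not Dubbo's actual code: the class DelayedExportSketch, the latch, and the rule "a positive delay means export later" are all assumptions made for illustration. Only the executor pattern mirrors the snippet above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the exporting config class; not Dubbo's actual code.
class DelayedExportSketch {
    // Single-threaded scheduler, as in the snippet above.
    private final ScheduledExecutorService delayExportExecutor =
            Executors.newSingleThreadScheduledExecutor();
    private final long delayMillis;
    private final CountDownLatch exported = new CountDownLatch(1);

    DelayedExportSketch(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    // Assumption for this sketch: a positive delay means "export later".
    private boolean shouldDelay() {
        return delayMillis > 0;
    }

    synchronized void export() {
        if (shouldDelay()) {
            // Hand the real work to the scheduler and return immediately.
            delayExportExecutor.schedule(this::doExport, delayMillis, TimeUnit.MILLISECONDS);
        } else {
            doExport();
        }
    }

    // Placeholder for the real export work; here it only flips the latch.
    private void doExport() {
        exported.countDown();
    }

    boolean isExported() {
        return exported.getCount() == 0;
    }

    // Waits up to timeoutMillis for the export; returns false on timeout or interrupt.
    boolean awaitExported(long timeoutMillis) {
        try {
            return exported.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    void shutdown() {
        delayExportExecutor.shutdown();
    }

    public static void main(String[] args) {
        DelayedExportSketch service = new DelayedExportSketch(200);
        service.export();
        System.out.println("exported right after export(): " + service.isExported());
        System.out.println("exported within 5s: " + service.awaitExported(5000));
        service.shutdown();
    }
}
```

Note that export() returns before the service is actually reachable when a delay is configured, so callers that need the service up have to wait for it explicitly.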

doExport() in turn calls doExportUrls():

private void doExportUrls() {
    List<URL> registryURLs = loadRegistries(true);
    for (ProtocolConfig protocolConfig : protocols) {
        String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
        ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
        ApplicationModel.initProviderModel(pathKey, providerModel);
        // Export one protocol at a time
        doExportUrlsFor1Protocol(protocolConfig, registryURLs);
    }
}

What follows is a long stretch of URL assembly. The core code is:

// Convert the concrete service into an Invoker; the returned object is an AbstractProxyInvoker
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

// Convert the invoker into an exporter
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);

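A note on the three arguments to getInvoker: the first is the implementation object, the second is the interface, and the third is a URL of the following form: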

registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2
&export=
http://192.168.0.103:8989/org.apache.dubbo.demo.DemoService?anyhost=true&application=demo-provider
&bean.name=org.apache.dubbo.demo.DemoService&bind.ip=192.168.0.103&bind.port=8989
&default.deprecated=false&default.dynamic=false&default.register=true&deprecated=false&dubbo=2.0.2
&dynamic=false&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello
&payload=8388608&pid=90756&qos.port=22222&register=true&release=
&side=provider&timestamp=1554254604056&pid=90756&qos.port=22222&registry=zookeeper&timestamp=1554254473099

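This URL is really two URLs in one: the first is the registry URL, and the second is the service URL, which is the part after export=. Next, dynamic bytecode generation is used to assemble a proxy class for the implementation class; inside that proxy, the target method is called directly on the target object.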
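To make the "URL inside a URL" idea concrete, here is a minimal sketch using plain strings and the JDK's URLEncoder. It is not Dubbo's URL class: ExportUrlSketch, withExport, and exportOf are hypothetical names. The one thing taken from the code above is that the provider URL is attached as an encoded export parameter (which is what the name addParameterAndEncoded suggests), so its own ? and & cannot be confused with the registry URL's.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;

// Hypothetical helper; Dubbo uses its own URL class for this.
class ExportUrlSketch {
    // Attach the provider URL to the registry URL as an encoded "export" parameter.
    static String withExport(String registryUrl, String providerUrl) {
        String separator = registryUrl.contains("?") ? "&" : "?";
        return registryUrl + separator + "export=" + encode(providerUrl);
    }

    // Recover the provider URL from the combined URL, or null if it has none.
    static String exportOf(String combinedUrl) {
        int queryStart = combinedUrl.indexOf('?');
        if (queryStart < 0) {
            return null;
        }
        for (String pair : combinedUrl.substring(queryStart + 1).split("&")) {
            if (pair.startsWith("export=")) {
                return decode(pair.substring("export=".length()));
            }
        }
        return null;
    }

    private static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always available
        }
    }

    private static String decode(String value) {
        try {
            return URLDecoder.decode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        String registry = "registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService"
                + "?application=demo-provider";
        String provider = "http://192.168.0.103:8989/org.apache.dubbo.demo.DemoService"
                + "?anyhost=true&interface=org.apache.dubbo.demo.DemoService";
        String combined = withExport(registry, provider);
        System.out.println(combined);
        System.out.println(exportOf(combined));
    }
}
```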

The invoke method in AbstractProxyInvoker is implemented as follows:

@Override
public Result invoke(Invocation invocation) throws RpcException {
    RpcContext rpcContext = RpcContext.getContext();
    try {
        // Call the proxy dynamically generated by Wrapper
        Object obj = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
        if (RpcUtils.isReturnTypeFuture(invocation)) {
            return new AsyncRpcResult((CompletableFuture<Object>) obj);
        } else if (rpcContext.isAsyncStarted()) { // ignore obj in case of RpcContext.startAsync()? always rely on user to write back.
            return new AsyncRpcResult(((AsyncContextImpl)(rpcContext.getAsyncContext())).getInternalFuture());
        } else {
            return new RpcResult(obj);
        }
    } catch (InvocationTargetException e) {
        // TODO async throw exception before async thread write back, should stop asyncContext
        if (rpcContext.isAsyncStarted() && !rpcContext.stopAsync()) {
            logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e);
        }
        return new RpcResult(e.getTargetException());
    } catch (Throwable e) {
        throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
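The generated class itself is not shown in this post, so here is a hand-written sketch of what dispatching by method name without reflection might look like. Everything in it is assumed for illustration: WrapperSketch, the simplified invokeMethod signature, and the DemoServiceImpl behavior. It also mirrors the error handling above, where an exception thrown by the business method is returned as the result and any other failure is rethrown.

```java
import java.lang.reflect.InvocationTargetException;

// Hand-written stand-in for a generated wrapper class; names are hypothetical.
class WrapperSketch {
    interface DemoService {
        String sayHello(String name);
    }

    static class DemoServiceImpl implements DemoService {
        @Override
        public String sayHello(String name) {
            if (name == null) {
                throw new IllegalArgumentException("name is required");
            }
            return "Hello " + name;
        }
    }

    // Dispatch on the method name and call the target directly, no reflection.
    static Object invokeMethod(Object instance, String methodName, Object[] args)
            throws NoSuchMethodException, InvocationTargetException {
        DemoService target = (DemoService) instance;
        if ("sayHello".equals(methodName) && args.length == 1) {
            try {
                return target.sayHello((String) args[0]);
            } catch (Throwable t) {
                // Wrap business exceptions so the caller can tell them apart.
                throw new InvocationTargetException(t);
            }
        }
        throw new NoSuchMethodException("Method " + methodName + " not found");
    }

    // Same shape as invoke() above: a business exception becomes the result,
    // anything else is rethrown as a failure of the call itself.
    static Object invoke(Object proxy, String methodName, Object[] args) {
        try {
            return invokeMethod(proxy, methodName, args);
        } catch (InvocationTargetException e) {
            return e.getTargetException();
        } catch (Throwable e) {
            throw new IllegalStateException("Failed to invoke " + methodName, e);
        }
    }

    public static void main(String[] args) {
        DemoService service = new DemoServiceImpl();
        System.out.println(invoke(service, "sayHello", new Object[] {"dubbo"}));
        System.out.println(invoke(service, "sayHello", new Object[] {null}));
    }
}
```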

Now let's dig into this call:

// Convert the invoker into an exporter
Exporter<?> exporter = protocol.export(wrapperInvoker);

Protocol has two wrapper classes, ProtocolFilterWrapper and ProtocolListenerWrapper. We will ignore them for now and go straight to the implementations in RegistryProtocol and AbstractProxyProtocol.
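For orientation only, a wrapper here is just a decorator: it implements the same interface and delegates to the instance it wraps. The sketch below uses a toy string-based Protocol interface. All names are hypothetical, and the nesting order is an assumption rather than a statement about how Dubbo orders its wrappers.

```java
import java.util.ArrayList;
import java.util.List;

// Toy decorator chain; the interface and all classes are hypothetical.
class ProtocolWrapperSketch {
    interface Protocol {
        String export(String invoker, List<String> trace);
    }

    // The innermost protocol that does the real export.
    static class CoreProtocol implements Protocol {
        @Override
        public String export(String invoker, List<String> trace) {
            trace.add("core");
            return "exporter(" + invoker + ")";
        }
    }

    // Decorates the invoker before delegating.
    static class FilterWrapper implements Protocol {
        private final Protocol next;

        FilterWrapper(Protocol next) {
            this.next = next;
        }

        @Override
        public String export(String invoker, List<String> trace) {
            trace.add("filter");
            return next.export("filtered(" + invoker + ")", trace);
        }
    }

    // Decorates the exporter returned by the delegate.
    static class ListenerWrapper implements Protocol {
        private final Protocol next;

        ListenerWrapper(Protocol next) {
            this.next = next;
        }

        @Override
        public String export(String invoker, List<String> trace) {
            trace.add("listener");
            return "listened(" + next.export(invoker, trace) + ")";
        }
    }

    public static void main(String[] args) {
        Protocol protocol = new ListenerWrapper(new FilterWrapper(new CoreProtocol()));
        List<String> trace = new ArrayList<>();
        System.out.println(protocol.export("demo", trace));
        System.out.println(trace);
    }
}
```

The caller only ever sees a Protocol, which is why the wrappers can be skipped when reading the export path for the first time.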

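The export implementation in RegistryProtocol is shown below. It mainly exports the service locally, registers the provider URL with the registry, and subscribes to override configuration: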

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    URL registryUrl = getRegistryUrl(originInvoker);
    // url to export locally
    URL providerUrl = getProviderUrl(originInvoker);

    // Subscribe the override data
    // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
    //  the same service. Because the subscribed is cached key with the name of the service, it causes the
    //  subscription information to cover.
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);

    providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
    //export invoker
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

    // url to registry
    final Registry registry = getRegistry(originInvoker);
    final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
    // Register the provider in the local provider/consumer registration table
    ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
            registryUrl, registeredProviderUrl);
    //to judge if we need to delay publish
    boolean register = registeredProviderUrl.getParameter("register", true);
    if (register) {
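        // Register the service with the registry (the registry protocol is read from the URL and its implementation is loaded via SPI; for ZooKeeper this creates an ephemeral node)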
        register(registryUrl, registeredProviderUrl);
        providerInvokerWrapper.setReg(true);
    }

    // Register a listener to watch for dynamic parameter changes
    // Deprecated! Subscribe to override rules in 2.6.x or before.
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

    exporter.setRegisterUrl(registeredProviderUrl);
    exporter.setSubscribeUrl(overrideSubscribeUrl);
    //Ensure that a new exporter instance is returned every time export
    return new DestroyableExporter<>(exporter);
}

The export implementation in AbstractProxyProtocol is as follows:

public <T> Exporter<T> export(final Invoker<T> invoker) throws RpcException {
    final String uri = serviceKey(invoker.getUrl());
    Exporter<T> exporter = (Exporter<T>) exporterMap.get(uri);
    if (exporter != null) {
        // When modifying the configuration through override, you need to re-expose the newly modified service.
        if (Objects.equals(exporter.getInvoker().getUrl(), invoker.getUrl())) {
            return exporter;
        }
    }
    // Create a server if one does not exist yet; the concrete logic lives in AbstractProxyProtocol subclasses such as HttpProtocol
    final Runnable runnable = doExport(proxyFactory.getProxy(invoker, true), invoker.getInterface(), invoker.getUrl());
    exporter = new AbstractExporter<T>(invoker) {
        @Override
        public void unexport() {
            super.unexport();
            exporterMap.remove(uri);
            if (runnable != null) {
                try {
                    runnable.run();
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
            }
        }
    };
    exporterMap.put(uri, exporter);
    return exporter;
}

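The doExport method of AbstractProxyProtocol creates a server according to the service protocol, then accepts connections and requests. That is all for now; there are still many details to fill in.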
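The caching and cleanup pattern in that export method can be sketched on its own. The version below is an assumption-laden simplification: ExporterCacheSketch, the string-based Exporter, and the counter standing in for a real server are all hypothetical. It keeps the two ideas from the code above: an exporter is reused when the same service key is exported with the same URL, and doExport returns a Runnable that unexport later runs to tear the server down.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical simplification of the exporter cache; not Dubbo's actual code.
class ExporterCacheSketch {
    interface Exporter {
        String url();

        void unexport();
    }

    private final Map<String, Exporter> exporterMap = new ConcurrentHashMap<>();
    // Stand-in for real servers: counts how many are currently "running".
    final AtomicInteger runningServers = new AtomicInteger();

    // Stand-in for the protocol-specific doExport: "starts" a server and
    // returns the hook that stops it again.
    private Runnable doExport(String url) {
        runningServers.incrementAndGet();
        return runningServers::decrementAndGet;
    }

    Exporter export(String serviceKey, String url) {
        Exporter cached = exporterMap.get(serviceKey);
        // Reuse the exporter only if the URL is unchanged; a changed URL
        // (for example after an override) is exported again.
        if (cached != null && cached.url().equals(url)) {
            return cached;
        }
        final Runnable stopServer = doExport(url);
        Exporter exporter = new Exporter() {
            @Override
            public String url() {
                return url;
            }

            @Override
            public void unexport() {
                exporterMap.remove(serviceKey);
                stopServer.run();
            }
        };
        exporterMap.put(serviceKey, exporter);
        return exporter;
    }

    public static void main(String[] args) {
        ExporterCacheSketch protocol = new ExporterCacheSketch();
        String url = "http://192.168.0.103:8989/org.apache.dubbo.demo.DemoService";
        Exporter first = protocol.export("DemoService:8989", url);
        Exporter second = protocol.export("DemoService:8989", url);
        System.out.println("same exporter reused: " + (first == second));
        first.unexport();
        System.out.println("servers running after unexport: " + protocol.runningServers.get());
    }
}
```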

//TODO