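This post walks through how a service is exported in Dubbo.
Start with ServiceBean. It implements many interfaces;
the one to focus on is the interface that Spring calls back into, namely
ApplicationListener: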
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        if (!isExported() && !isUnexported()) {
            if (logger.isInfoEnabled()) {
                logger.info("The service ready on spring started. service: " + getInterface());
            }
            export(); // entry point
        }
    }
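The isExported/isUnexported guard keeps the export idempotent, which matters if the refresh callback fires more than once. Below is a minimal plain-Java sketch of that guard with no Spring dependency. ExportOnRefreshSketch and all of its members are hypothetical names for illustration, not Dubbo or Spring classes.

```java
// Hypothetical stand-in for the guard in ServiceBean; not Dubbo or Spring code.
class ExportOnRefreshSketch {
    private boolean exported;
    private boolean unexported;
    private int exportCalls;

    // Plays the role of onApplicationEvent(ContextRefreshedEvent).
    synchronized void onContextRefreshed() {
        // Export only if the service was never exported and never unexported.
        if (!exported && !unexported) {
            export();
        }
    }

    private void export() {
        exportCalls++;
        exported = true;
    }

    synchronized void unexport() {
        exported = false;
        unexported = true;
    }

    synchronized int exportCalls() {
        return exportCalls;
    }
}
```

Calling onContextRefreshed() repeatedly triggers a single export, and a service that has been unexported is not exported again by a later refresh.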
Next, look at export():
    public synchronized void export() {
        // The same item may be configured in several places, so values are overridden by priority
        checkAndUpdateSubConfigs();
        if (!shouldExport()) {
            return;
        }
        if (shouldDelay()) {
            // A single-threaded scheduled pool performs the delayed export
            delayExportExecutor.schedule(this::doExport, delay, TimeUnit.MILLISECONDS);
        } else {
            doExport();
        }
    }
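The delayed branch can be tried out in isolation. The sketch below assumes a delay in milliseconds and a single-threaded scheduler, as in the snippet above; DelayedExportSketch, its latch, and the helper methods are hypothetical and only illustrate the scheduling, not Dubbo's actual configuration checks.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of "export now, or export after a delay".
class DelayedExportSketch {
    // One daemon thread is enough: exports are rare and must not block JVM exit.
    private static final ScheduledExecutorService DELAY_EXECUTOR =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "delay-export-sketch");
                t.setDaemon(true);
                return t;
            });

    private final Integer delayMillis;
    private final CountDownLatch exported = new CountDownLatch(1);

    DelayedExportSketch(Integer delayMillis) {
        this.delayMillis = delayMillis;
    }

    synchronized void export() {
        if (delayMillis != null && delayMillis > 0) {
            // Hand the export to the scheduler and return immediately.
            DELAY_EXECUTOR.schedule(this::doExport, delayMillis, TimeUnit.MILLISECONDS);
        } else {
            doExport();
        }
    }

    private void doExport() {
        exported.countDown();
    }

    boolean isExported() {
        return exported.getCount() == 0;
    }

    // Waits up to timeoutMillis for the export; returns false on timeout or interrupt.
    boolean awaitExported(long timeoutMillis) {
        try {
            return exported.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

With a null or non-positive delay the export happens on the calling thread; otherwise the caller returns at once and the export runs later on the scheduler thread.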
doExport() in turn ends up calling doExportUrls():
    private void doExportUrls() {
        List<URL> registryURLs = loadRegistries(true);
        for (ProtocolConfig protocolConfig : protocols) {
            String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
            ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
            ApplicationModel.initProviderModel(pathKey, providerModel);
            // Export one protocol at a time
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }
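To make the pathKey line easier to read, here is a small sketch of how such a key could be assembled. It assumes the key has the shape group/path:version with the group and version parts optional, which is my understanding of URL.buildKey; ServiceKeySketch and its methods are hypothetical and are not the Dubbo implementation.

```java
import java.util.Optional;

// Hypothetical sketch of building a service key from context path, path, group and version.
class ServiceKeySketch {
    // Assumed format: [group/]path[:version]
    static String buildKey(String path, String group, String version) {
        StringBuilder key = new StringBuilder();
        if (group != null && !group.isEmpty()) {
            key.append(group).append('/');
        }
        key.append(path);
        if (version != null && !version.isEmpty()) {
            key.append(':').append(version);
        }
        return key.toString();
    }

    // Mirrors the Optional chain in doExportUrls(): prefix the context path if one is configured.
    static String pathKey(Optional<String> contextPath, String path, String group, String version) {
        String fullPath = contextPath.map(p -> p + "/" + path).orElse(path);
        return buildKey(fullPath, group, version);
    }
}
```

Under these assumptions a service with no context path, group, or version is keyed by its path alone.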
Next, doExportUrlsFor1Protocol() spends a good while assembling the URL. The core code is:
    // Turn the concrete service into an Invoker; the returned object is an AbstractProxyInvoker
    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
    // Turn the Invoker into an Exporter
    Exporter<?> exporter = protocol.export(wrapperInvoker);
    exporters.add(exporter);
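A note on the three arguments to getInvoker here: the first is the implementation instance, the second is the interface, and the third is a URL in the following format: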
    registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2
    &export=
    http://192.168.0.103:8989/org.apache.dubbo.demo.DemoService?anyhost=true&application=demo-provider
    &bean.name=org.apache.dubbo.demo.DemoService&bind.ip=192.168.0.103&bind.port=8989
    &default.deprecated=false&default.dynamic=false&default.register=true&deprecated=false&dubbo=2.0.2
    &dynamic=false&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello
    &payload=8388608&pid=90756&qos.port=22222&register=true&release=
    &side=provider&timestamp=1554254604056&pid=90756&qos.port=22222&registry=zookeeper&timestamp=1554254473099
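This URL is really two URLs in one: the first is the registry URL, and the second is the service URL, which is everything after export=. Next, dynamic bytecode generation is used to assemble a proxy class for the implementation class; inside that proxy, the target method is called directly on the target object.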
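The call to addParameterAndEncoded in the core code suggests that the service URL is URL-encoded before it is attached as the export parameter, so its own ? and & do not clash with the registry URL's query string. A minimal sketch of that nesting using only the JDK follows; NestedUrlSketch and its methods are hypothetical helpers, not Dubbo's URL class.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;

// Hypothetical sketch: carry one URL inside another as an encoded "export" parameter.
class NestedUrlSketch {
    private static final String EXPORT_PREFIX = "export=";

    // Appends the encoded provider URL to the registry URL's query string.
    static String embed(String registryUrl, String providerUrl) {
        String separator = registryUrl.contains("?") ? "&" : "?";
        return registryUrl + separator + EXPORT_PREFIX + encode(providerUrl);
    }

    // Finds the export parameter and decodes it; returns null if it is absent.
    static String extract(String combinedUrl) {
        int queryStart = combinedUrl.indexOf('?');
        if (queryStart < 0) {
            return null;
        }
        for (String pair : combinedUrl.substring(queryStart + 1).split("&")) {
            if (pair.startsWith(EXPORT_PREFIX)) {
                return decode(pair.substring(EXPORT_PREFIX.length()));
            }
        }
        return null;
    }

    private static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }

    private static String decode(String value) {
        try {
            return URLDecoder.decode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }
}
```

Because the inner URL is encoded, splitting the outer query string on & never cuts into the service URL's own parameters.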
The invoke method in AbstractProxyInvoker is implemented as follows:
    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        RpcContext rpcContext = RpcContext.getContext();
        try {
            // Call the proxy dynamically generated by Wrapper
            Object obj = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
            if (RpcUtils.isReturnTypeFuture(invocation)) {
                return new AsyncRpcResult((CompletableFuture<Object>) obj);
            } else if (rpcContext.isAsyncStarted()) { // ignore obj in case of RpcContext.startAsync()? always rely on user to write back.
                return new AsyncRpcResult(((AsyncContextImpl)(rpcContext.getAsyncContext())).getInternalFuture());
            } else {
                return new RpcResult(obj);
            }
        } catch (InvocationTargetException e) {
            // TODO async throw exception before async thread write back, should stop asyncContext
            if (rpcContext.isAsyncStarted() && !rpcContext.stopAsync()) {
                logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e);
            }
            return new RpcResult(e.getTargetException());
        } catch (Throwable e) {
            throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
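To picture what the generated proxy does, here is a hand-written stand-in for the kind of class that would otherwise be produced at runtime: it dispatches on the method name and calls the target directly instead of going through reflection. All class names below (WrapperDemoService, WrapperDemoServiceImpl, HandWrittenWrapper, WrapperInvokerSketch) are hypothetical, and the real generated code will differ in detail.

```java
import java.lang.reflect.InvocationTargetException;

interface WrapperDemoService {
    String sayHello(String name);
}

class WrapperDemoServiceImpl implements WrapperDemoService {
    @Override
    public String sayHello(String name) {
        if (name == null) {
            throw new IllegalArgumentException("name is required");
        }
        return "Hello " + name;
    }
}

// Hand-written equivalent of a generated wrapper: a direct call chosen by method name.
class HandWrittenWrapper {
    Object invokeMethod(Object instance, String methodName, Class<?>[] types, Object[] args)
            throws NoSuchMethodException, InvocationTargetException {
        WrapperDemoService target = (WrapperDemoService) instance;
        try {
            if ("sayHello".equals(methodName) && args.length == 1) {
                return target.sayHello((String) args[0]);
            }
        } catch (Throwable t) {
            // Failures raised by the target are wrapped so the caller can tell them apart.
            throw new InvocationTargetException(t);
        }
        throw new NoSuchMethodException(methodName);
    }
}

// Simplified counterpart of invoke(): business exceptions become the result, other failures are rethrown.
class WrapperInvokerSketch {
    static Object invoke(Object target, String methodName, Class<?>[] types, Object[] args) {
        try {
            return new HandWrittenWrapper().invokeMethod(target, methodName, types, args);
        } catch (InvocationTargetException e) {
            return e.getTargetException();
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException("Failed to invoke " + methodName, e);
        }
    }
}
```

As in invoke above, an exception thrown by the service method itself is returned as the call's result, while an infrastructure failure such as an unknown method is thrown to the caller.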
Now let's take a closer look at this call:
    // Turn the Invoker into an Exporter
    Exporter<?> exporter = protocol.export(wrapperInvoker);
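protocol is wrapped by two wrapper classes, ProtocolFilterWrapper and ProtocolListenerWrapper. We'll set those aside for now and go straight to the implementations in RegistryProtocol and AbstractProxyProtocol.
The export implementation in RegistryProtocol is shown below. It mainly exports the service locally, registers it with the registry, and subscribes to override configuration: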
    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        URL registryUrl = getRegistryUrl(originInvoker);
        // url to export locally
        URL providerUrl = getProviderUrl(originInvoker);
        // Subscribe the override data
        // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
        // the same service. Because the subscribed is cached key with the name of the service, it causes the
        // subscription information to cover.
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
        providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
        // export invoker
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
        // url to registry
        final Registry registry = getRegistry(originInvoker);
        final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
        // Record the provider in the local provider/consumer registration table
        ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
                registryUrl, registeredProviderUrl);
        // to judge if we need to delay publish
        boolean register = registeredProviderUrl.getParameter("register", true);
        if (register) {
            // Register the service with the registry (the registry protocol is read from the URL and its
            // implementation is loaded via SPI; for ZooKeeper this creates an ephemeral node)
            register(registryUrl, registeredProviderUrl);
            providerInvokerWrapper.setReg(true);
        }
        // Register a listener to watch for dynamic parameter changes
        // Deprecated! Subscribe to override rules in 2.6.x or before.
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
        exporter.setRegisterUrl(registeredProviderUrl);
        exporter.setSubscribeUrl(overrideSubscribeUrl);
        // Ensure that a new exporter instance is returned every time export
        return new DestroyableExporter<>(exporter);
    }
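Stripped of Dubbo's types, the method above boils down to three steps in a fixed order: export locally, register with the registry unless register=false, then subscribe to overrides. The sketch below only records those steps in a list so the ordering and the register flag can be observed; RegistryExportSketch and its helpers are hypothetical and stand in for the real protocol, registry, and listener.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the step ordering in a registry-aware export.
class RegistryExportSketch {
    final List<String> steps = new ArrayList<>();

    void export(String providerUrl) {
        // 1. Export through the real protocol first, so the server is up before it is advertised.
        steps.add("local-export " + providerUrl);
        // 2. Advertise the provider, unless registration was switched off on the URL.
        if (getBooleanParameter(providerUrl, "register", true)) {
            steps.add("register " + providerUrl);
        }
        // 3. Listen for configuration overrides pushed through the registry.
        steps.add("subscribe-override " + providerUrl);
    }

    // Reads an exact-match query parameter; falls back to defaultValue when it is absent.
    static boolean getBooleanParameter(String url, String key, boolean defaultValue) {
        int queryStart = url.indexOf('?');
        if (queryStart < 0) {
            return defaultValue;
        }
        for (String pair : url.substring(queryStart + 1).split("&")) {
            int eq = pair.indexOf('=');
            if (eq > 0 && pair.substring(0, eq).equals(key)) {
                return Boolean.parseBoolean(pair.substring(eq + 1));
            }
        }
        return defaultValue;
    }
}
```

Exporting locally before registering means consumers only learn about the provider once it can actually accept requests.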
The export implementation in AbstractProxyProtocol is as follows:
    public <T> Exporter<T> export(final Invoker<T> invoker) throws RpcException {
        final String uri = serviceKey(invoker.getUrl());
        Exporter<T> exporter = (Exporter<T>) exporterMap.get(uri);
        if (exporter != null) {
            // When modifying the configuration through override, you need to re-expose the newly modified service.
            if (Objects.equals(exporter.getInvoker().getUrl(), invoker.getUrl())) {
                return exporter;
            }
        }
        // Check whether a server exists and create one if not; the concrete logic lives in
        // AbstractProxyProtocol subclasses such as HttpProtocol
        final Runnable runnable = doExport(proxyFactory.getProxy(invoker, true), invoker.getInterface(), invoker.getUrl());
        exporter = new AbstractExporter<T>(invoker) {
            @Override
            public void unexport() {
                super.unexport();
                exporterMap.remove(uri);
                if (runnable != null) {
                    try {
                        runnable.run();
                    } catch (Throwable t) {
                        logger.warn(t.getMessage(), t);
                    }
                }
            }
        };
        exporterMap.put(uri, exporter);
        return exporter;
    }
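The pattern here is a cache of exporters keyed by service, where doExport hands back a Runnable that undoes the export. A minimal sketch of that pattern follows; ExporterCacheSketch, its nested Exporter interface, and the counters standing in for a real server are all hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: cache exporters by service key and keep a cleanup callback per export.
class ExporterCacheSketch {
    interface Exporter {
        String url();
        void unexport();
    }

    private final Map<String, Exporter> exporterMap = new ConcurrentHashMap<>();
    int serversStarted;
    int serversStopped;

    // Stand-in for doExport(): "starts" a server and returns the action that stops it.
    private Runnable startServer(String url) {
        serversStarted++;
        return () -> serversStopped++;
    }

    Exporter export(final String serviceKey, final String url) {
        Exporter cached = exporterMap.get(serviceKey);
        // Reuse the exporter only if the URL is unchanged; a changed URL is exported again.
        if (cached != null && cached.url().equals(url)) {
            return cached;
        }
        final Runnable stopServer = startServer(url);
        Exporter exporter = new Exporter() {
            @Override
            public String url() {
                return url;
            }

            @Override
            public void unexport() {
                exporterMap.remove(serviceKey);
                stopServer.run();
            }
        };
        exporterMap.put(serviceKey, exporter);
        return exporter;
    }
}
```

Returning the cleanup as a Runnable lets each protocol decide what stopping means without the base class knowing anything about the server it started.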
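The doExport called from AbstractProxyProtocol then creates a server according to the service's protocol, which accepts connections and handles requests. That's all for now; there are still many details left to fill in.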
//TODO