本文基于:dubbo 2.6.3版本。
分享人:某人zZ
总所周知,Dubbo的配置有两种方式:XML方式或者注解方式
由于我司常用的为XML,所以主要聊一下XML的流程,Dubbo 是基于Sprin配置文件宽展了自定义的解析,在Spring容器(AbstractApplicationContext)启动时执行refresh()方法中会调用DubboNamespaceHandler的init()方法,具体如下:
(PS:Spring如何调用DubboNamespaceHandler的方法具体不做分析)
public class DubboNamespaceHandler extends NamespaceHandlerSupport {
static {
Version.checkDuplicate(DubboNamespaceHandler.class);
}
@Override
public void init() {
registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());
}
}
主要作用为:将XML和对应XML解析器做映射,具体的调用链路如下:
Spring容器启动时,会将扫描到xml中的各个Dubbo自定义的如dubbo:application、dubbo:registry、dubbo:service等标签,根据DubboNameSpaceHandler中初始化的解析器,解析为对应的BeanDefiniton。
<?xml version="1.0" encoding="UTF-8"?>
最终会加载如下的BeanDefinition
由于ServiceBean实现了ApplicationListener,而对应的onApplicationEvent方法中的export() 方法就是本文的主角。
public class ServiceBean extends ServiceConfig implements InitializingBean, DisposableBean,ApplicationContextAware, ApplicationListener, BeanNameAware,ApplicationEventPublisherAware {
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
// 延迟暴露
if (isDelay() && !isExported() && !isUnexported()) {
if (logger.isInfoEnabled()) {
logger.info("The service ready on spring started. service: " + getInterface());
}
export();
}
}
}
PS:ApplicationListener的onApplicationEvent具体执行的链路如下,忘记的可以按照下面的流程跟一下代码:
由于直接撸代码可能会比较乱,所以先简单地捋一下整体的流程。
服务发现:表示该配置项用于服务的注册与发现,目的是让消费方找到提供方。 服务治理:表示该配置项用于治理服务间的关系,或为开发测试提供便利条件。 性能调优:表示该配置项用于调优性能,不同的选项对性能会产生影响。
数据配置类 | 含义 |
ApplicationConfig | 应用信息配置 dubbo:application/ 用于配置当前应用信息,不管该应用是提供者还是消费者。 |
ResistryConfig | 注册中心配置 dubbo:registry/ 用于配置连接注册中心相关信息。 |
ProtocolConfig | 服务提供者协议配置 dubbo:protocol/ 用于配置提供服务的协议信息,协议由提供方指定,消费方被动接受。 |
MonitorConfig | 监控中心配置 dubbo:monitor/ 用于配置连接监控中心相关信息,可选。 |
ServiceConfig | 服务提供者暴露服务配置 dubbo:service/ 用于暴露一个服务,定义服务的元信息,一个服务可以用多个协议暴露,一个服务也可以注册到多个注册中心。 |
ProviderConfig | 服务提供者缺省值配置 dubbo:provider/ 当ProtocolConfig和ServiceConfig某属性没有配置时,采用此缺省值,可选。 |
ReferenceConfig | 服务消费者引用服务配置 dubbo:reference/ 用于创建一个远程服务代理,一个引用可以指向多个注册中心。 |
ConsumerConfig | 服务消费者缺省值配置 dubbo:consumer/ 当ReferenceConfig某属性没有配置时,采用此缺省值,可选。 |
MoudleConfig | 模块信息配置 dubbo:moudle/ 用于配置当前模块信息,可选。 |
MethodConfig | 方法及配置 dubbo:method/ 用于ServiceConfig和ReferenceConfig指定方法级的配置信息。 |
(本地暴露的URL、远程暴露的URL)
为什么要本地暴露? 因为同一个jvm的其他服务调用当前服务时, 就不用走远程服务调用了,直接调用injvm的服务就可以了。
injvm://127.0.0.1/org.apache.dubbo.samples.api.client.HelloService?anyhost=true&application=demo-provider&bind.ip=192.168.0.100&bind.port=20890&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=49599&side=provider×tamp=1666425598425
registry的URL
registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo://192.168.0.100:20890/org.apache.dubbo.samples.api.client.HelloService?anyhost=true&application=demo-provider&bind.ip=192.168.0.100&bind.port=20890&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=51020&side=provider×tamp=1666426018092&group=aaa&pid=51020®istry=zookeeper×tamp=1666426018075
Dubbo的URL
dubbo://192.168.0.100:20890/org.apache.dubbo.samples.api.client.HelloService?anyhost=true&application=demo-provider&bind.ip=192.168.0.100&bind.port=20890&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=49599&side=provider×tamp=1666425598425
1.构造本地暴露的URL。
injvm://127.0.0.1/org.apache.dubbo.samples.api.client.HelloService?anyhost=true&application=demo-provider&bind.ip=192.168.0.100&bind.port=20890&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=94019&side=provider×tamp=1666446463499
2.通过proxyFactory.getInvoker,把需要暴露的服务封装为Wrapper,然后构造一个Invoker。具体链路如下:
3.通过protocol.export(invoker),构造一个Exporter。 具体链路如下:
4.把exporter缓存到exporters。
Protocol为Dubbo的SPI接口,Dubbo的配置类:com.alibaba.dubbo.rpc.Protocol具体如下:
-- Protocol的SPI的配置
filter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper
listener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper
mock=com.alibaba.dubbo.rpc.support.MockProtocol
dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol
injvm=com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocol
rmi=com.alibaba.dubbo.rpc.protocol.rmi.RmiProtocol
hessian=com.alibaba.dubbo.rpc.protocol.hessian.HessianProtocol
com.alibaba.dubbo.rpc.protocol.http.HttpProtocol
com.alibaba.dubbo.rpc.protocol.webservice.WebServiceProtocol
thrift=com.alibaba.dubbo.rpc.protocol.thrift.ThriftProtocol
memcached=com.alibaba.dubbo.rpc.protocol.memcached.MemcachedProtocol
redis=com.alibaba.dubbo.rpc.protocol.redis.RedisProtocol
rest=com.alibaba.dubbo.rpc.protocol.rest.RestProtocol
registry=com.alibaba.dubbo.registry.integration.RegistryProtocol
qos=com.alibaba.dubbo.qos.protocol.QosProtocolWrapper
Dubbo会通过ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension() 构造一个Protocol的Adaptive类。具体流程不清楚可以看上一篇《Dubbo的SPI简介》。
重点涉及到的有如下:
1.QosProtocolWrapper:如果是注册的Invoker,则会开启QOS服务。
QoS(Quality of Service,服务质量)指一个网络能够利用各种基础技术,为指定的网络通信提供更好的服务能力,是网络的一种安全机制, 是用来解决网络延迟和阻塞等问题的一种技术。dubbo为用户提供类似的网络服务用来online和offline service来解决网络延迟,阻塞等问题。
ProtocolFilterWrapper:如果不是注册的Invoker,组装一个invoker链。
EchoFilter -> ClassLoaderFilter -> GenericFilter -> ContextFilter -> TraceFilter -> TimeoutFilter -> MonitorFilter -> ExceptionFilter
如图:
ProtocolListenerWrapper:如果不是注册的Invoker,则构建一个ListenerExporterWrapper,包含了ExporterListener集合。(Dubbo默认没有listener)。
Filter也是Dubbo的SPI接口。
Filter的配置com.alibaba.dubbo.rpc.Filter如下:
-- filter的配置
cache=com.alibaba.dubbo.cache.filter.CacheFilter
validation=com.alibaba.dubbo.validation.filter.ValidationFilter
echo=com.alibaba.dubbo.rpc.filter.EchoFilter
generic=com.alibaba.dubbo.rpc.filter.GenericFilter
genericimpl=com.alibaba.dubbo.rpc.filter.GenericImplFilter
token=com.alibaba.dubbo.rpc.filter.TokenFilter
accesslog=com.alibaba.dubbo.rpc.filter.AccessLogFilter
activelimit=com.alibaba.dubbo.rpc.filter.ActiveLimitFilter
classloader=com.alibaba.dubbo.rpc.filter.ClassLoaderFilter
context=com.alibaba.dubbo.rpc.filter.ContextFilter
consumercontext=com.alibaba.dubbo.rpc.filter.ConsumerContextFilter
exception=com.alibaba.dubbo.rpc.filter.ExceptionFilter
executelimit=com.alibaba.dubbo.rpc.filter.ExecuteLimitFilter
deprecated=com.alibaba.dubbo.rpc.filter.DeprecatedFilter
compatible=com.alibaba.dubbo.rpc.filter.CompatibleFilter
timeout=com.alibaba.dubbo.rpc.filter.TimeoutFilter
trace=com.alibaba.dubbo.rpc.protocol.dubbo.filter.TraceFilter
future=com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter
monitor=com.alibaba.dubbo.monitor.support.MonitorFilter
Dubbo会通过 ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group) 方法获取Filter列表。
该方法会对Filter列表进行排序:
1.如果Filter 有order,则会根据order,从小到大排序
2.如果没有order,则会将cachedActivates中的Filter的顺序 倒排。
FAQ: 1.cachedActivates中的Filter的顺序是按照Filter的name的Hash值排序的。 2.为什么没有Filter,没有order会根据cachedActivates中的Filter的顺序 倒排 ? 具体可以看一下ActivateComparator.COMPARATOR的排序规则。具体的规则大概流程:
n1 > n2 ? 1: -1 filter1的order 不大于 filter2的order,则放在filter2的前面。
1.构建URL
registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&group=aaa&pid=94019®istry=zookeeper×tamp=1666446463342
2.通过proxyFactory.getInvoker,把需要暴露的服务封装为Wrapper,然后构造一个Invoker。(和本地暴露一下)。
3.将invoker封装为DelegateProviderMetaDataInvoker
4.(重要)通过protocol.export(invoker),构造一个Exporter。
具体链路如下:
RegistryPortocol
1.服务暴露(构建Dubbo 的URL 然后调用DubooProtocol的export的方法)
2.构造真正的registryURL(ZK的注册中心URL)
zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo://192.168.0.100:20890/org.apache.dubbo.samples.api.client.HelloService?anyhost=true&application=demo-provider&bind.ip=192.168.0.100&bind.port=20890&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=62781&side=provider×tamp=1666429076800&group=aaa&pid=62781×tamp=1666429076786
3.获取ZK的Registry
4.构建Dubbo的URL
5.将invoker缓存到providerInvokers中
6.将服务注册到ZK中,并将注册表中的服务的是否已经注册(isReg) 设置为true
7.创建监听器,放入缓存中,并且订阅注册中心的override数据(2.7.X之后废弃)
DubboProtocol
1.构造exporter,然后缓存到exporterMap中。
2.开启Netty服务
5.把exporter缓存到exporters。
先从ServiceBean的onApplicationEvent()方法开始
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
// 如果是延时暴露 或者 已经暴露 或者 不能暴露,则不暴露
if (isDelay() && !isExported() && !isUnexported()) {
if (logger.isInfoEnabled()) {
logger.info("The service ready on spring started. service: " + getInterface());
}
// 调用ServiceConfig的export()方法
export();
}
}
ServiceConfig的export()方法代码如下:
public synchronized void export() {
// 服务提供者的配置是否为空(默认为空)
if (provider != null) {
if (export == null) {
export = provider.getExport();
}
if (delay == null) {
delay = provider.getDelay();
}
}
if (export != null && !export) {
return;
}
// 延迟暴露的配置是否为空,不为空则延迟暴露
if (delay != null && delay > 0) {
delayExportExecutor.schedule(new Runnable() {
@Override
public void run() {
doExport();
}
}, delay, TimeUnit.MILLISECONDS);
} else {
// 直接暴露服务
doExport();
}
}
ServiceConfig的doExport()方法代码如下:
protected synchronized void doExport() {
if (unexported) {
throw new IllegalStateException("Already unexported!");
}
if (exported) {
return;
}
exported = true;
if (interfaceName == null || interfaceName.length() == 0) {
throw new IllegalStateException(" interface not allow null!");
}
// 校验ProviderConfig是否为空,为空则构建一个并初始化
checkDefault();
if (provider != null) {
if (application == null) {
application = provider.getApplication();
}
if (module == null) {
module = provider.getModule();
}
if (registries == null) {
registries = provider.getRegistries();
}
if (monitor == null) {
monitor = provider.getMonitor();
}
if (protocols == null) {
protocols = provider.getProtocols();
}
}
if (module != null) {
if (registries == null) {
registries = module.getRegistries();
}
if (monitor == null) {
monitor = module.getMonitor();
}
}
if (application != null) {
if (registries == null) {
registries = application.getRegistries();
}
if (monitor == null) {
monitor = application.getMonitor();
}
}
if (ref instanceof GenericService) {
interfaceClass = GenericService.class;
if (StringUtils.isEmpty(generic)) {
generic = Boolean.TRUE.toString();
}
} else {
try {
interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
.getContextClassLoader());
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
checkInterfaceAndMethods(interfaceClass, methods);
checkRef();
generic = Boolean.FALSE.toString();
}
if (local != null) {
if ("true".equals(local)) {
local = interfaceName + "Local";
}
Class<?> localClass;
try {
localClass = ClassHelper.forNameWithThreadContextClassLoader(local);
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
if (!interfaceClass.isAssignableFrom(localClass)) {
throw new IllegalStateException("The local implementation class " + localClass.getName() + " not implement interface " + interfaceName);
}
}
if (stub != null) {
if ("true".equals(stub)) {
stub = interfaceName + "Stub";
}
Class<?> stubClass;
try {
stubClass = ClassHelper.forNameWithThreadContextClassLoader(stub);
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
if (!interfaceClass.isAssignableFrom(stubClass)) {
throw new IllegalStateException("The stub implementation class " + stubClass.getName() + " not implement interface " + interfaceName);
}
}
// 检验ApplicationConfig不为空,并append系统参数
checkApplication();
// 检验RegistryConfig不为空,并append系统参数
checkRegistry();
// 检验ProtocolConfig不为空,并append系统参数
checkProtocol();
// append系统参数到ServiceConfig
appendProperties(this);
// stub、mock的合理性校验(是否有构造方法)
checkStubAndMock(interfaceClass);
if (path == null || path.length() == 0) {
path = interfaceName;
}
// 暴露服务
doExportUrls();
// 构造providerModel 并缓存到providedServices中
ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), this, ref);
ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel);
}
调用ServiceConfig的doExportUrls()方法
private void doExportUrls() {
// 获取配置的注册中心列表(默认为zk)
List registryURLs = loadRegistries(true);
// 遍历 协议列表(默认为dubbo协议)
for (ProtocolConfig protocolConfig : protocols) {
// 根据不同的协议 暴露服务
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
调用ServiceConfig的doExportUrlsFor1Protocol方法
/**
* 真正的开始暴露服务
* registryURLs: registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&group=aaa&pid=4939®istry=zookeeper×tamp=1666491937481
*/
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List registryURLs) {
// 获取协议名称,默认为dubbo
String name = protocolConfig.getName();
if (name == null || name.length() == 0) {
name = "dubbo";
}
// 组装URL的各个参数
Map map = new HashMap();
// side、dubbo、timestamp、pid的参数组装
map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
}
// application、moudle、provider、protocol参数组装
appendParameters(map, application);
appendParameters(map, module);
appendParameters(map, provider, Constants.DEFAULT_KEY);
appendParameters(map, protocolConfig);
appendParameters(map, this);
// 遍历methods,然后放入map中
if (methods != null && !methods.isEmpty()) {
for (MethodConfig method : methods) {
appendParameters(map, method, method.getName());
String retryKey = method.getName() + ".retry";
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
if ("false".equals(retryValue)) {
map.put(method.getName() + ".retries", "0");
}
}
List arguments = method.getArguments();
if (arguments != null && !arguments.isEmpty()) {
for (ArgumentConfig argument : arguments) {
// convert argument type
if (argument.getType() != null && argument.getType().length() > 0) {
Method[] methods = interfaceClass.getMethods();
// visit all methods
if (methods != null && methods.length > 0) {
for (int i = 0; i < methods.length; i++) {
String methodName = methods[i].getName();
// target the method, and get its signature
if (methodName.equals(method.getName())) {
Class<?>[] argtypes = methods[i].getParameterTypes();
// one callback in the method
if (argument.getIndex() != -1) {
if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException("argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
}
} else {
// multiple callbacks in the method
for (int j = 0; j < argtypes.length; j++) {
Class<?> argclazz = argtypes[j];
if (argclazz.getName().equals(argument.getType())) {
appendParameters(map, argument, method.getName() + "." + j);
if (argument.getIndex() != -1 && argument.getIndex() != j) {
throw new IllegalArgumentException("argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
}
}
}
}
}
}
}
} else if (argument.getIndex() != -1) {
appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException("argument config must set index or type attribute.eg: or ");
}
}
}
} // end of methods for
}
// generic、revision、token参数组装
if (ProtocolUtils.isGeneric(generic)) {
map.put(Constants.GENERIC_KEY, generic);
map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
} else {
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put("revision", revision);
}
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if (methods.length == 0) {
logger.warn("NO method found in service interface " + interfaceClass.getName());
map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
} else {
map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet(Arrays.asList(methods)), ","));
}
}
if (!ConfigUtils.isEmpty(token)) {
if (ConfigUtils.isDefault(token)) {
map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString());
} else {
map.put(Constants.TOKEN_KEY, token);
}
}
if (Constants.LOCAL_PROTOCOL.equals(protocolConfig.getName())) {
protocolConfig.setRegister(false);
map.put("notify", "false");
}
// export service
String contextPath = protocolConfig.getContextpath();
if ((contextPath == null || contextPath.length() == 0) && provider != null) {
contextPath = provider.getContextpath();
}
// 获取 host、port,并组装URL
String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
Integer port = this.findConfigedPorts(protocolConfig, name, map);
URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);
// 默认不执行
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.hasExtension(url.getProtocol())) {
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}
// 获取URL的scope
String scope = url.getParameter(Constants.SCOPE_KEY);
// don't export when none is configured
// scope 不等于 none
if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
// export to local if the config is not remote (export to remote only when config is remote)
// scope 不等于remote
if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
// 本地暴露
exportLocal(url);
}
// export to remote if the config is not local (export to local only when config is local)
// scope 不等于 local
if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
// registryURLs不为空
if (registryURLs != null && !registryURLs.isEmpty()) {
// 循环遍历url,远程暴露服务
for (URL registryURL : registryURLs) {
// 重新构造Dubbo的URL
url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
// 构造monitor的URL
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}
// 从URL中获取代理方式
String proxy = url.getParameter(Constants.PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);
}
// 根据 proxyFactory.getInvoker 获取invoker
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// 服务远程暴露
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
} else {
// registryURLs为空的场景
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
}
}
this.urls.add(url);
}
ServiceConfig的exportLocal方法代码如下:
Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
/**
* url: dubbo://192.168.0.100:20890/org.apache.dubbo.samples.api.client.HelloService?anyhost=true&application=demo-provider&bind.ip=192.168.0.100&bind.port=20890&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=12906&side=provider×tamp=1666493114123
*/
@SuppressWarnings({"unchecked", "rawtypes"})
private void exportLocal(URL url) {
// protocol 不等于 injvm的话
if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
// 构造本地暴露的URL
// injvm://127.0.0.1/org.apache.dubbo.samples.api.client.HelloService?anyhost=true&application=demo-provider&bind.ip=192.168.0.100&bind.port=20890&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=12906&side=provider×tamp=1666493114123
URL local = URL.valueOf(url.toFullString())
.setProtocol(Constants.LOCAL_PROTOCOL)
.setHost(LOCALHOST)
.setPort(0);
ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref));
// 执行protocol.export方法 本地暴露(重要)
Exporter<?> exporter = protocol.export(
proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
// 放入缓存中
exporters.add(exporter);
logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry");
}
}
此处的protocol是Dubbo的SPI接口,是根据ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension() 方法获取的,会先获取适配次Adaptive。具体SPI相关的代码请看上一篇《Dubbo的SPI机制简介解析》。
关于本地暴露的export的流程:
@Override
public Exporter export(Invoker arg0) throws RpcException {
// 判空校验
if (arg0 == null) {
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
}
// url判空检验
if (arg0.getUrl() == null) {
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
}
// 如果协议参数为空,则默认为dubbo
URL url = arg0.getUrl();
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null) {
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
}
// 根据协议获取protocol的服务提供者,getExtension方法会先获取包装类(具体请看上一篇《Dubbo SPI核心源码解析》)
Protocol extension = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName);
// 再调用export方法
return extension.export(arg0);
}
@Override
public Exporter export(Invoker invoker) throws RpcException {
// 如果protocol等于registry
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
// 则开启qos
startQosServer(invoker.getUrl());
return protocol.export(invoker);
}
// 不等于registry
return protocol.export(invoker);
}
@Override
public Exporter export(Invoker invoker) throws RpcException {
// 如果protocol 等于 registry
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
// 继续调用export
return protocol.export(invoker);
}
// 如果 protocol 不等于 registry,则根据Dubbo的SPI 获取ExporterListener的服务提供者列表, 然后构造ListenerExporterWrapper。
return new ListenerExporterWrapper(protocol.export(invoker),
Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
.getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));
}
@Override
public Exporter export(Invoker invoker) throws RpcException {
// 构造InjvmExporter
return new InjvmExporter(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}
此处的protocol和本地暴露也是一样的。
远程暴露的流程
@Override
public Exporter export(final Invoker originInvoker) throws RpcException {
//export invoker
// 真正的服务暴露方法
final ExporterChangeableWrapper exporter = doLocalExport(originInvoker);
// 获取注册中心的URL
/**
* 构造zk注册中心的URL
* zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo://192.168.0.100:20890/org.apache.dubbo.samples.api.client.HelloService?anyhost=true&application=demo-provider&bind.ip=192.168.0.100&bind.port=20890&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=28778&side=provider×tamp=1666495390735&group=aaa&pid=28778×tamp=1666495390726
*/
URL registryUrl = getRegistryUrl(originInvoker);
// 获取注册中心ZookeeperRegistry
final Registry registry = getRegistry(originInvoker);
/**
* 构造需要注册到ZK上的URL
* dubbo://192.168.0.100:20890/org.apache.dubbo.samples.api.client.HelloService?anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=28778&side=provider×tamp=1666495390735
*/
final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
// 是否需要注册
boolean register = registedProviderUrl.getParameter("register", true);
//添加到注册表
ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl);
// 如果需要注册的话,则注册服务,并将注册表中的服务的是否已经注册(isReg) 设置为true
if (register) {
register(registryUrl, registedProviderUrl);
ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
}
// Subscribe the override data
// 订阅重写数据,在2.7.0之后的版本废弃,所以不做详细分析
// 获取重写订阅的URL
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
// 构造重写订阅的监听器
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
// 订阅注册中心的重写数据
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
// 返回构造的exporter
return new DestroyableExporter(exporter, originInvoker, overrideSubscribeUrl, registedProviderUrl);
}
订阅ZK具体有啥用:
Dubbo提供了动态配置的功能,当服务发布后,用户可以通过管理中心来进行动态配置,提供者可以感知这些配置的变化并更新到本地,如果需要将服务合并配置后重新发布。如当我们使用dubbo-admin 进行动态配置时,dubbo-admin 会在注册中心或配置中心创建配置节点(Dubbo 2.6 没有配置中心,会在注册中心上创建,而 Dubbo 2.7 则是在配置中心上创建),提供者只需要监听这些节点的变化即可。监听的首要工作是生成一个 URL 代表监听的节点信息,即本步的工作。
RegistryProtocol的doLocalExport方法
/**
* registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo://192.168.0.100:20890/org.apache.dubbo.samples.api.client.HelloService?anyhost=true&application=demo-provider&bind.ip=192.168.0.100&bind.port=20890&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=28778&side=provider×tamp=1666495390735&group=aaa&pid=28778®istry=zookeeper×tamp=1666495390726
*/
private ExporterChangeableWrapper doLocalExport(final Invoker originInvoker) {
/**
* 获取参数export的值
* dubbo://192.168.0.100:20890/org.apache.dubbo.samples.api.client.HelloService?anyhost=true&application=demo-provider&bind.ip=192.168.0.100&bind.port=20890&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=28778&side=provider×tamp=1666495390735
*/
String key = getCacheKey(originInvoker);
ExporterChangeableWrapper exporter = (ExporterChangeableWrapper) bounds.get(key);
// 默认为空(DCL 双重校验)
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper) bounds.get(key);
if (exporter == null) {
// 构造invoker
final Invoker<?> invokerDelegete = new InvokerDelegete(originInvoker, getProviderUrl(originInvoker));
// 调用DubboProtocol的export方法
exporter = new ExporterChangeableWrapper((Exporter) protocol.export(invokerDelegete), originInvoker);
bounds.put(key, exporter);
}
}
}
return exporter;
}
/**
* 1.构造exporter,然后缓存到exporterMap中。
* 2.开启Netty服务
* 3.序列化URL(Dubbo默认不处理)
* URL: dubbo://192.168.0.100:20890/org.apache.dubbo.samples.api.client.HelloService?anyhost=true&application=demo-provider&bind.ip=192.168.0.100&bind.port=20890&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=53733&side=provider×tamp=1666500518340
*/
@Override
public Exporter export(Invoker invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
// 获取需要暴露的服务(org.apache.dubbo.samples.api.client.HelloService:20890)
String key = serviceKey(url);
// 构造Dubbo的exporter
DubboExporter exporter = new DubboExporter(invoker, key, exporterMap);
// 翻入缓存
exporterMap.put(key, exporter);
//export an stub service for dispatching event
// 如果是dubbo.stub.event或者is_callback_service这两个参数的值,没有的话默认为false(Dubbo默认流程不执行)
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
// 开启服务
openServer(url);
// 序列化URL
optimizeSerialization(url);
return exporter;
}
DubboProtocol的openServer方法
private void openServer(URL url) {
// find server.
// 获取 address: 192.168.0.100:20890
String key = url.getAddress();
// 判断URL的isserver的值,默认为true
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
if (isServer) {
// 判断server是否为空,第一次执行为空,后续不为空
ExchangeServer server = serverMap.get(key);
if (server == null) {
// 构造一个netty服务。并缓存到serverMap中
serverMap.put(key, createServer(url));
} else {
// server supports reset, use together with override
server.reset(url);
}
}
}
/**
* 构造一个netty服务
*/
private ExchangeServer createServer(URL url) {
// 设置URL的channel.readonly.sent为true
url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
// 设置URL的heartbeat 为60s
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
// 获取URL的server,默认为netty
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
// 设置URL的codec为dubbo
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
ExchangeServer server;
try {
// 真正的开始netty服务
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
str = url.getParameter(Constants.CLIENT_KEY);
if (str != null && str.length() > 0) {
Set supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
Exchangers的bind的方法
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
// 获取Exchager,并绑定URL 和handler
return getExchanger(url).bind(url, handler);
}
Exchangers的getExchanger方法,最后获取到HeaderExchanger服务提供者
/**
* 获取Exchanger,通过Dubbo的SPI 获取
*/
public static Exchanger getExchanger(URL url) {
// 默认类型为header
String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
return getExchanger(type);
}
/**
* 通过SPI获取Exchagner --> HeaderExchanger
*/
public static Exchanger getExchanger(String type) {
return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
}
调用HeaderExchanger的bind方法
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
// 调用Transporters的bind方法获取Server
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handlers == null || handlers.length == 0) {
throw new IllegalArgumentException("handlers == null");
}
ChannelHandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
// 通过Dubbo 的SPI 获取Transporter,并绑定URL和Handler
return getTransporter().bind(url, handler);
}
获取Transporter的服务提供者(通过Dubbo的SPI获取)
// 通过Dubbo的SPI获取Transporter
public static Transporter getTransporter() {
return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
}
NettyTransporter的bind方法
public class NettyTransporter implements Transporter {
public static final String NAME = "netty";
@Override
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
return new NettyServer(url, listener);
}
@Override
public Client connect(URL url, ChannelHandler listener) throws RemotingException {
return new NettyClient(url, listener);
}
}
new NettyServer的代码逻辑
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
/**
* netty真正启动逻辑
*/
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
localAddress = getUrl().toInetSocketAddress();
String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
bindIp = NetUtils.ANYHOST;
}
bindAddress = new InetSocketAddress(bindIp, bindPort);
this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
try {
// 初始化ServerBootstrap,并绑定address
doOpen();
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
}
} catch (Throwable t) {
throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
+ " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
}
//fixme replace this with better method
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
}
@Override
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
bootstrap = new ServerBootstrap(channelFactory);
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
channels = nettyHandler.getChannels();
// https://issues.jboss.org/browse/NETTY-365
// https://issues.jboss.org/browse/NETTY-379
// final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
bootstrap.setOption("child.tcpNoDelay", true);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ChannelPipeline pipeline = Channels.pipeline();
/*int idleTimeout = getIdleTimeout();
if (idleTimeout > 10000) {
pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
}*/
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
// bind
channel = bootstrap.bind(getBindAddress());
}
RegisteryProtocol的register方法
/**
* 注册中心URL:zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo://192.168.0.100:20890/org.apache.dubbo.samples.api.client.HelloService?anyhost=true&application=demo-provider&bind.ip=192.168.0.100&bind.port=20890&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=76029&side=provider×tamp=1666504784182&group=aaa&pid=76029×tamp=1666504784173
* 服务的URL: dubbo://192.168.0.100:20890/org.apache.dubbo.samples.api.client.HelloService?anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=76029&side=provider×tamp=1666504784182
*/
public void register(URL registryUrl, URL registedProviderUrl) {
// 根据Dubbo 的SPI的获取注册中心(默认为ZookerRegistry,调用new ZookerRegistry()方法)
Registry registry = registryFactory.getRegistry(registryUrl);
// 调用ZK的注册中心注册 服务
registry.register(registedProviderUrl);
}
ZookeeperRegistry的register方法:
因为ZookeeperRegistry继承了FailbackRegistry,所以看FailbackRegistry的register方法
@Override
public void register(URL url) {
// 把URL缓存到registered中
super.register(url);
failedRegistered.remove(url);
failedUnregistered.remove(url);
try {
// Sending a registration request to the server side
// 真正注册URL
doRegister(url);
} catch (Exception e) {
Throwable t = e;
// If the startup detection is opened, the Exception is thrown directly.
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
// Record a failed registration request to a failed list, retry regularly
failedRegistered.add(url);
}
}
ZookeeperRegistry的doRegister方法
@Override
protected void doRegister(URL url) {
try {
// 调用zkClient的create方法创建节点
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
Over.
页面更新:2024-05-15
本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828
© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号