A Brief Analysis of the Dubbo Service Export Flow

This article is based on Dubbo 2.6.3.

Presented by: 某人zZ

Dubbo's core architecture



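I. Preface

As is well known, Dubbo can be configured in two ways: XML or annotations.

What does DubboNamespaceHandler do?

Because our company mostly uses XML, this article focuses on the XML flow. Dubbo extends Spring's configuration files with custom tag parsing: when the Spring container (AbstractApplicationContext) starts, its refresh() method ends up calling the init() method of DubboNamespaceHandler, shown below:

(PS: how Spring reaches DubboNamespaceHandler is not analyzed here.)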

public class DubboNamespaceHandler extends NamespaceHandlerSupport {

    static {
        Version.checkDuplicate(DubboNamespaceHandler.class);
    }

    @Override
    public void init() {
        registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
        registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
        registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
        registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
        registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
        registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
        registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
        registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
        registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
        registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());
    }

}

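Its main job is to map each XML element to the parser responsible for it. The call chain is as follows:



Why is ServiceBean loaded?

When the Spring container starts, it scans the Dubbo custom tags in the XML, such as dubbo:application, dubbo:registry and dubbo:service, and uses the parsers registered in DubboNamespaceHandler to turn each of them into a BeanDefinition.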

<?xml version="1.0" encoding="UTF-8"?>


    
    

    
    

    
    

    
    

    

    
    

    

The following BeanDefinitions are eventually loaded:



ServiceBean implements ApplicationListener, and the export() call inside its onApplicationEvent method is the subject of this article.

public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware, ApplicationEventPublisherAware {

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        // Export on context refresh, but only if isDelay() holds and the service has been neither exported nor unexported
        if (isDelay() && !isExported() && !isUnexported()) {
            if (logger.isInfoEnabled()) {
                logger.info("The service ready on spring started. service: " + getInterface());
            }
            export();
        }
    }
}

PS: the call chain that triggers ApplicationListener's onApplicationEvent is shown below; if you have forgotten it, trace the code along this flow:



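II. The Dubbo service export flow

Diving straight into the code can get confusing, so let us first walk through the overall flow.



1. Validate the configuration and initialize

Dubbo's configuration classes

Service discovery: the option is used for service registration and discovery, so that consumers can find providers.
Service governance: the option is used to govern the relationships between services, or to make development and testing easier.
Performance tuning: the option is used for tuning performance; different values affect performance differently.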

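Configuration classes and their meaning:

ApplicationConfig: application configuration, <dubbo:application/>. Configures the current application, whether it is a provider or a consumer.

RegistryConfig: registry configuration, <dubbo:registry/>. Configures how to connect to the registry.

ProtocolConfig: provider protocol configuration, <dubbo:protocol/>. Configures the protocol used to provide services; the provider chooses the protocol and the consumer accepts it passively.

MonitorConfig: monitor configuration, <dubbo:monitor/>. Configures how to connect to the monitoring center; optional.

ServiceConfig: provider service export configuration, <dubbo:service/>. Exports a service and defines its metadata; one service can be exported over several protocols and registered with several registries.

ProviderConfig: provider default configuration, <dubbo:provider/>. Supplies the default value when a property of ProtocolConfig or ServiceConfig is not set; optional.

ReferenceConfig: consumer service reference configuration, <dubbo:reference/>. Creates a proxy for a remote service; one reference can point to several registries.

ConsumerConfig: consumer default configuration, <dubbo:consumer/>. Supplies the default value when a property of ReferenceConfig is not set; optional.

ModuleConfig: module configuration, <dubbo:module/>. Configures the current module; optional.

MethodConfig: method-level configuration, <dubbo:method/>. Specifies method-level settings for ServiceConfig and ReferenceConfig.

Class hierarchy of Dubbo's Config classes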



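2. Build URLs from the configuration

(the URL for local export and the URLs for remote export)

URL for local export

Why export locally? When another service in the same JVM calls this service, it does not need to go through a remote call; it can invoke the injvm service directly.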

injvm://127.0.0.1/org.apache.dubbo.samples.api.client.HelloService?anyhost=true&application=demo-provider&bind.ip=192.168.0.100&bind.port=20890&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=49599&side=provider&timestamp=1666425598425
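As a rough illustration of how the local URL relates to the service URL, the sketch below rewrites the protocol and host of a dubbo:// URL and drops the port. It uses java.net.URI rather than Dubbo's own URL class, and LocalUrlSketch and toInjvm are hypothetical names chosen for this example. Dubbo itself sets the port to 0; the sketch approximates that by leaving the port out.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Illustrative only: not Dubbo code. Rewrites scheme, host and port of a
// service URL in the way the local-export step is described.
class LocalUrlSketch {
    static String toInjvm(String serviceUrl) {
        URI original = URI.create(serviceUrl);
        try {
            // Port -1 means "no port" for java.net.URI, so none is printed.
            URI local = new URI("injvm", null, "127.0.0.1", -1,
                    original.getPath(), original.getQuery(), null);
            return local.toString();
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(toInjvm(
                "dubbo://192.168.0.100:20890/org.apache.dubbo.samples.api.client.HelloService"
                        + "?anyhost=true&side=provider"));
    }
}
```

The path and query string are carried over unchanged, which is why the injvm URL above still contains bind.ip and bind.port from the original service URL.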

URLs for remote export

The registry URL

registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo://192.168.0.100:20890/org.apache.dubbo.samples.api.client.HelloService?anyhost=true&application=demo-provider&bind.ip=192.168.0.100&bind.port=20890&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=51020&side=provider&timestamp=1666426018092&group=aaa&pid=51020&registry=zookeeper&timestamp=1666426018075

The dubbo URL

dubbo://192.168.0.100:20890/org.apache.dubbo.samples.api.client.HelloService?anyhost=true&application=demo-provider&bind.ip=192.168.0.100&bind.port=20890&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=49599&side=provider&timestamp=1666425598425

3. Export the service locally and remotely

Local export

1. Build the URL for local export.

injvm://127.0.0.1/org.apache.dubbo.samples.api.client.HelloService?anyhost=true&application=demo-provider&bind.ip=192.168.0.100&bind.port=20890&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=94019&side=provider&timestamp=1666446463499

2. Call proxyFactory.getInvoker, which wraps the service to be exported in a Wrapper and then builds an Invoker. The call chain is as follows:



3. Call protocol.export(invoker) to build an Exporter. The call chain is as follows:



4. Cache the exporter in exporters.

About Protocol

Protocol is a Dubbo SPI interface. Its SPI configuration file, com.alibaba.dubbo.rpc.Protocol, contains the following:

-- SPI configuration for Protocol
filter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper
listener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper
mock=com.alibaba.dubbo.rpc.support.MockProtocol
dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol
injvm=com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocol
rmi=com.alibaba.dubbo.rpc.protocol.rmi.RmiProtocol
hessian=com.alibaba.dubbo.rpc.protocol.hessian.HessianProtocol
com.alibaba.dubbo.rpc.protocol.http.HttpProtocol
com.alibaba.dubbo.rpc.protocol.webservice.WebServiceProtocol
thrift=com.alibaba.dubbo.rpc.protocol.thrift.ThriftProtocol
memcached=com.alibaba.dubbo.rpc.protocol.memcached.MemcachedProtocol
redis=com.alibaba.dubbo.rpc.protocol.redis.RedisProtocol
rest=com.alibaba.dubbo.rpc.protocol.rest.RestProtocol
registry=com.alibaba.dubbo.registry.integration.RegistryProtocol
qos=com.alibaba.dubbo.qos.protocol.QosProtocolWrapper

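Dubbo builds an adaptive class for Protocol through ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension(). If that process is unclear, see the previous article 《Dubbo的SPI简介》.

The key classes involved are:



1. QosProtocolWrapper: if the Invoker is a registry Invoker (its URL protocol is registry), it starts the QoS server.

QoS (Quality of Service) refers to a network's ability to use various underlying techniques to provide better service for specified traffic. It is a network safeguard mechanism and a technique for dealing with problems such as network latency and congestion. Dubbo offers a comparable facility that lets users bring services online and offline in order to address such problems.

2. ProtocolFilterWrapper: if the Invoker is not a registry Invoker, it assembles an invoker chain.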

EchoFilter -> ClassLoaderFilter -> GenericFilter -> ContextFilter -> TraceFilter -> TimeoutFilter -> MonitorFilter -> ExceptionFilter

As shown in the figure:



3. ProtocolListenerWrapper: if the Invoker is not a registry Invoker, it builds a ListenerExporterWrapper that holds the collection of ExporterListeners. (Dubbo has no listener by default.)
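The three wrappers follow the decorator pattern: each one holds the next Protocol and decides, based on the URL protocol, whether to add its own behaviour before delegating. The sketch below is a miniature of that idea only. MiniProtocol, CoreProtocol, FilterWrapper and QosWrapper are hypothetical names, and the nesting order used here is an assumption for the example, not a statement about the order Dubbo applies.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature of the wrapper idea; these are not Dubbo's classes.
class WrapperChainSketch {
    interface MiniProtocol {
        String export(String urlProtocol, List<String> trace);
    }

    // Innermost protocol: performs the actual export.
    static class CoreProtocol implements MiniProtocol {
        public String export(String urlProtocol, List<String> trace) {
            trace.add("core export for " + urlProtocol);
            return "exporter";
        }
    }

    // Acts only for non-registry URLs, like the filter and listener wrappers.
    static class FilterWrapper implements MiniProtocol {
        private final MiniProtocol next;
        FilterWrapper(MiniProtocol next) { this.next = next; }
        public String export(String urlProtocol, List<String> trace) {
            if (!"registry".equals(urlProtocol)) {
                trace.add("build filter chain");
            }
            return next.export(urlProtocol, trace);
        }
    }

    // Acts only for registry URLs, like the QoS wrapper.
    static class QosWrapper implements MiniProtocol {
        private final MiniProtocol next;
        QosWrapper(MiniProtocol next) { this.next = next; }
        public String export(String urlProtocol, List<String> trace) {
            if ("registry".equals(urlProtocol)) {
                trace.add("start qos server");
            }
            return next.export(urlProtocol, trace);
        }
    }

    // Runs one export through the wrapped protocol and returns what happened.
    static List<String> run(String urlProtocol) {
        MiniProtocol protocol = new QosWrapper(new FilterWrapper(new CoreProtocol()));
        List<String> trace = new ArrayList<>();
        protocol.export(urlProtocol, trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run("injvm"));
        System.out.println(run("registry"));
    }
}
```

Because every wrapper sees every export call, the registry check inside each wrapper is what keeps the QoS server from starting for injvm URLs and keeps the filter chain from being built around the registry Invoker.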



About Filter

Filter is also a Dubbo SPI interface.

Its SPI configuration file, com.alibaba.dubbo.rpc.Filter, contains the following:

-- SPI configuration for Filter
cache=com.alibaba.dubbo.cache.filter.CacheFilter
validation=com.alibaba.dubbo.validation.filter.ValidationFilter
echo=com.alibaba.dubbo.rpc.filter.EchoFilter
generic=com.alibaba.dubbo.rpc.filter.GenericFilter
genericimpl=com.alibaba.dubbo.rpc.filter.GenericImplFilter
token=com.alibaba.dubbo.rpc.filter.TokenFilter
accesslog=com.alibaba.dubbo.rpc.filter.AccessLogFilter
activelimit=com.alibaba.dubbo.rpc.filter.ActiveLimitFilter
classloader=com.alibaba.dubbo.rpc.filter.ClassLoaderFilter
context=com.alibaba.dubbo.rpc.filter.ContextFilter
consumercontext=com.alibaba.dubbo.rpc.filter.ConsumerContextFilter
exception=com.alibaba.dubbo.rpc.filter.ExceptionFilter
executelimit=com.alibaba.dubbo.rpc.filter.ExecuteLimitFilter
deprecated=com.alibaba.dubbo.rpc.filter.DeprecatedFilter
compatible=com.alibaba.dubbo.rpc.filter.CompatibleFilter
timeout=com.alibaba.dubbo.rpc.filter.TimeoutFilter
trace=com.alibaba.dubbo.rpc.protocol.dubbo.filter.TraceFilter
future=com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter
monitor=com.alibaba.dubbo.monitor.support.MonitorFilter

Dubbo obtains the list of Filters through ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group).
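Once a list of filters is available, it has to be turned into the invoker chain mentioned earlier. A minimal sketch of one way to do that is shown below, assuming the chain is built by wrapping the target from the last filter to the first. MiniInvoker, MiniFilter and buildChain are hypothetical names for this example and only three filter names are used.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical miniature of a filter chain; these are not Dubbo's interfaces.
class FilterChainSketch {
    interface MiniInvoker { String invoke(String request); }
    interface MiniFilter { String invoke(MiniInvoker next, String request); }

    // Wraps the target from the last filter to the first, so the first
    // filter in the list ends up as the outermost one.
    static MiniInvoker buildChain(MiniInvoker target, List<MiniFilter> filters) {
        MiniInvoker last = target;
        for (int i = filters.size() - 1; i >= 0; i--) {
            final MiniFilter filter = filters.get(i);
            final MiniInvoker next = last;
            last = request -> filter.invoke(next, request);
        }
        return last;
    }

    // A filter that records its name and then passes the call on.
    static MiniFilter named(String name, List<String> log) {
        return (next, request) -> {
            log.add(name);
            return next.invoke(request);
        };
    }

    // Builds a three-filter chain, invokes it once and returns the call order.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        MiniInvoker target = request -> {
            log.add("service");
            return "hello " + request;
        };
        MiniInvoker chain = buildChain(target,
                Arrays.asList(named("echo", log), named("context", log), named("exception", log)));
        chain.invoke("world");
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Building from the back means the call order at runtime matches the list order, with the real service invoked last.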



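getActivateExtension sorts the list of Filters:

1. Filters that declare an order are sorted by order, from smallest to largest.

2. Filters without an order end up in the reverse of their order in cachedActivates.

FAQ: 1. The Filters in cachedActivates are ordered by the hash of each Filter's name. 2. Why do Filters without an order end up in the reverse of their cachedActivates order? See the sorting rule in ActivateComparator.COMPARATOR, which is roughly:

n1 > n2 ? 1 : -1, that is, if filter1's order is not greater than filter2's order, filter1 is placed before filter2.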
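The rule can be tried out in isolation. The sketch below applies only the n1 > n2 ? 1 : -1 comparison quoted above to a small list; Item, BY_ORDER and sortedNames are hypothetical names, and the real ActivateComparator handles more cases than this. Note that the comparator never returns 0, so for equal orders the outcome depends on the sort implementation and no expected output is claimed for that case.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Illustrative only: isolates the "n1 > n2 ? 1 : -1" rule.
class OrderSortSketch {
    // Hypothetical filter descriptor: a name plus its order value.
    static class Item {
        final String name;
        final int order;
        Item(String name, int order) {
            this.name = name;
            this.order = order;
        }
    }

    // Never returns 0, so two items with the same order are not "equal".
    static final Comparator<Item> BY_ORDER = (a, b) -> a.order > b.order ? 1 : -1;

    // Sorts a copy of the input and returns the names in sorted order.
    static List<String> sortedNames(List<Item> items) {
        List<Item> copy = new ArrayList<>(items);
        copy.sort(BY_ORDER);
        List<String> names = new ArrayList<>();
        for (Item item : copy) {
            names.add(item.name);
        }
        return names;
    }

    public static void main(String[] args) {
        // Distinct orders: sorted from smallest to largest.
        System.out.println(sortedNames(Arrays.asList(
                new Item("c", 3), new Item("a", 1), new Item("b", 2))));
        // Equal orders: the result depends on the sort implementation.
        System.out.println(sortedNames(Arrays.asList(
                new Item("x", 0), new Item("y", 0), new Item("z", 0))));
    }
}
```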

Remote export

1. Build the URL

registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&group=aaa&pid=94019&registry=zookeeper&timestamp=1666446463342

2. Call proxyFactory.getInvoker, which wraps the service to be exported in a Wrapper and then builds an Invoker (the same as for local export).

3. Wrap the invoker in a DelegateProviderMetaDataInvoker.

4. (Important) Call protocol.export(invoker) to build an Exporter.

The call chain is as follows:



RegistryProtocol

1. Export the service (build the dubbo URL, then call DubboProtocol's export method)
2. Build the real registry URL (the URL of the ZooKeeper registry)
zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo://192.168.0.100:20890/org.apache.dubbo.samples.api.client.HelloService?anyhost=true&application=demo-provider&bind.ip=192.168.0.100&bind.port=20890&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=62781&side=provider&timestamp=1666429076800&group=aaa&pid=62781&timestamp=1666429076786
3. Get the ZooKeeper Registry
4. Build the dubbo URL
5. Cache the invoker in providerInvokers
6. Register the service in ZooKeeper and set the service's registered flag (isReg) in the registration table to true
7. Create a listener, put it in the cache, and subscribe to the override data in the registry (deprecated after 2.7.X)

DubboProtocol

1. Build the exporter and cache it in exporterMap.
2. Start the Netty server.

5. Cache the exporter in exporters.

III. Source code walkthrough

We start with ServiceBean's onApplicationEvent() method:

@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
  // Export only when isDelay() holds and the service has been neither exported nor unexported
  if (isDelay() && !isExported() && !isUnexported()) {
    if (logger.isInfoEnabled()) {
      logger.info("The service ready on spring started. service: " + getInterface());
    }
    // Call ServiceConfig's export() method
    export();
  }
}

ServiceConfig's export() method is as follows:

public synchronized void export() {
    // If a ProviderConfig is present (it is null by default), fall back to its export and delay settings
    if (provider != null) {
        if (export == null) {
            export = provider.getExport();
        }
        if (delay == null) {
            delay = provider.getDelay();
        }
    }
    if (export != null && !export) {
        return;
    }

    // If a positive delay is configured, schedule the export after that delay
    if (delay != null && delay > 0) {
        delayExportExecutor.schedule(new Runnable() {
            @Override
            public void run() {
                doExport();
            }
        }, delay, TimeUnit.MILLISECONDS);
    } else {
        // Otherwise export the service immediately
        doExport();
    }
}
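The delay branch can be reproduced with the JDK alone. The following is a minimal sketch of the same decision, assuming a ScheduledExecutorService plays the role of delayExportExecutor. DelayedExportSketch and its methods are hypothetical names, and doExport here only records that it ran.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative only: mirrors the "delay > 0 ? schedule : run now" branch.
class DelayedExportSketch {
    private final AtomicBoolean exported = new AtomicBoolean(false);
    private final CountDownLatch done = new CountDownLatch(1);

    // Stand-in for doExport(): only records that the export happened.
    private void doExport() {
        exported.set(true);
        done.countDown();
    }

    // Schedules the export when delayMillis is positive, otherwise runs it now.
    void export(Integer delayMillis, ScheduledExecutorService scheduler) {
        if (delayMillis != null && delayMillis > 0) {
            Runnable task = this::doExport;
            scheduler.schedule(task, delayMillis, TimeUnit.MILLISECONDS);
        } else {
            doExport();
        }
    }

    // Waits until the export has run or the timeout elapses.
    boolean awaitExport(long timeoutMillis) {
        try {
            return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Runs one export with the given delay and reports whether it completed.
    static boolean demo(Integer delayMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            DelayedExportSketch service = new DelayedExportSketch();
            service.export(delayMillis, scheduler);
            return service.awaitExport(2000);
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("no delay: " + demo(null));
        System.out.println("50 ms delay: " + demo(50));
    }
}
```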

ServiceConfig's doExport() method is as follows:

protected synchronized void doExport() {
    if (unexported) {
        throw new IllegalStateException("Already unexported!");
    }
    if (exported) {
        return;
    }
    exported = true;
    if (interfaceName == null || interfaceName.length() == 0) {
        throw new IllegalStateException(" interface not allow null!");
    }
    // Check whether ProviderConfig is null; if so, create and initialize one
    checkDefault();
    if (provider != null) {
        if (application == null) {
            application = provider.getApplication();
        }
        if (module == null) {
            module = provider.getModule();
        }
        if (registries == null) {
            registries = provider.getRegistries();
        }
        if (monitor == null) {
            monitor = provider.getMonitor();
        }
        if (protocols == null) {
            protocols = provider.getProtocols();
        }
    }
    if (module != null) {
        if (registries == null) {
            registries = module.getRegistries();
        }
        if (monitor == null) {
            monitor = module.getMonitor();
        }
    }
    if (application != null) {
        if (registries == null) {
            registries = application.getRegistries();
        }
        if (monitor == null) {
            monitor = application.getMonitor();
        }
    }
    if (ref instanceof GenericService) {
        interfaceClass = GenericService.class;
        if (StringUtils.isEmpty(generic)) {
            generic = Boolean.TRUE.toString();
        }
    } else {
        try {
            interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
                    .getContextClassLoader());
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
        checkInterfaceAndMethods(interfaceClass, methods);
        checkRef();
        generic = Boolean.FALSE.toString();
    }
    if (local != null) {
        if ("true".equals(local)) {
            local = interfaceName + "Local";
        }
        Class<?> localClass;
        try {
            localClass = ClassHelper.forNameWithThreadContextClassLoader(local);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
        if (!interfaceClass.isAssignableFrom(localClass)) {
            throw new IllegalStateException("The local implementation class " + localClass.getName() + " not implement interface " + interfaceName);
        }
    }
    if (stub != null) {
        if ("true".equals(stub)) {
            stub = interfaceName + "Stub";
        }
        Class<?> stubClass;
        try {
            stubClass = ClassHelper.forNameWithThreadContextClassLoader(stub);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
        if (!interfaceClass.isAssignableFrom(stubClass)) {
            throw new IllegalStateException("The stub implementation class " + stubClass.getName() + " not implement interface " + interfaceName);
        }
    }
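    // Check that ApplicationConfig is not null and append system properties
    checkApplication();
    // Check that RegistryConfig is not null and append system properties
    checkRegistry();
    // Check that ProtocolConfig is not null and append system properties
    checkProtocol();
    // Append system properties to this ServiceConfig
    appendProperties(this);
    // Sanity-check stub and mock (for example, whether the required constructor exists)
    checkStubAndMock(interfaceClass);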
    if (path == null || path.length() == 0) {
        path = interfaceName;
    }
    // Export the service
    doExportUrls();
    // Build the ProviderModel and cache it in providedServices
    ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), this, ref);
    ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel);
}

Next, ServiceConfig's doExportUrls() method is called:

private void doExportUrls() {
    // Load the configured registry URLs (ZooKeeper in this example)
    List<URL> registryURLs = loadRegistries(true);
    // Iterate over the protocols (the dubbo protocol by default)
    for (ProtocolConfig protocolConfig : protocols) {
        // Export the service once per protocol
        doExportUrlsFor1Protocol(protocolConfig, registryURLs);
    }
}

That in turn calls ServiceConfig's doExportUrlsFor1Protocol method:

/**
 * This is where the export really starts.
 * registryURLs:  registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&group=aaa&pid=4939&registry=zookeeper&timestamp=1666491937481
 */
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
    // Get the protocol name; it defaults to dubbo
    String name = protocolConfig.getName();
    if (name == null || name.length() == 0) {
        name = "dubbo";
    }

    // Collect the URL parameters
    Map<String, String> map = new HashMap<String, String>();
    // Add the side, dubbo, timestamp and pid parameters
    map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
    map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());
    map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
    if (ConfigUtils.getPid() > 0) {
        map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
    }
    // Append the application, module, provider, protocol and service parameters
    appendParameters(map, application);
    appendParameters(map, module);
    appendParameters(map, provider, Constants.DEFAULT_KEY);
    appendParameters(map, protocolConfig);
    appendParameters(map, this);

    // Iterate over methods and put their settings into the map
    if (methods != null && !methods.isEmpty()) {
        for (MethodConfig method : methods) {
            appendParameters(map, method, method.getName());
            String retryKey = method.getName() + ".retry";
            if (map.containsKey(retryKey)) {
                String retryValue = map.remove(retryKey);
                if ("false".equals(retryValue)) {
                    map.put(method.getName() + ".retries", "0");
                }
            }
            List<ArgumentConfig> arguments = method.getArguments();
            if (arguments != null && !arguments.isEmpty()) {
                for (ArgumentConfig argument : arguments) {
                    // convert argument type
                    if (argument.getType() != null && argument.getType().length() > 0) {
                        Method[] methods = interfaceClass.getMethods();
                        // visit all methods
                        if (methods != null && methods.length > 0) {
                            for (int i = 0; i < methods.length; i++) {
                                String methodName = methods[i].getName();
                                // target the method, and get its signature
                                if (methodName.equals(method.getName())) {
                                    Class<?>[] argtypes = methods[i].getParameterTypes();
                                    // one callback in the method
                                    if (argument.getIndex() != -1) {
                                        if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
                                            appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                                        } else {
                                            throw new IllegalArgumentException("argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
                                        }
                                    } else {
                                        // multiple callbacks in the method
                                        for (int j = 0; j < argtypes.length; j++) {
                                            Class<?> argclazz = argtypes[j];
                                            if (argclazz.getName().equals(argument.getType())) {
                                                appendParameters(map, argument, method.getName() + "." + j);
                                                if (argument.getIndex() != -1 && argument.getIndex() != j) {
                                                    throw new IllegalArgumentException("argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
                                                }
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    } else if (argument.getIndex() != -1) {
                        appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                    } else {
                        throw new IllegalArgumentException("argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
                    }

                }
            }
        } // end of methods for
    }
    // Add the generic, revision, methods and token parameters
    if (ProtocolUtils.isGeneric(generic)) {
        map.put(Constants.GENERIC_KEY, generic);
        map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
    } else {
        String revision = Version.getVersion(interfaceClass, version);
        if (revision != null && revision.length() > 0) {
            map.put("revision", revision);
        }

        String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
        if (methods.length == 0) {
            logger.warn("NO method found in service interface " + interfaceClass.getName());
            map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
        } else {
            map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
        }
    }
    if (!ConfigUtils.isEmpty(token)) {
        if (ConfigUtils.isDefault(token)) {
            map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString());
        } else {
            map.put(Constants.TOKEN_KEY, token);
        }
    }
    if (Constants.LOCAL_PROTOCOL.equals(protocolConfig.getName())) {
        protocolConfig.setRegister(false);
        map.put("notify", "false");
    }
    // export service
    String contextPath = protocolConfig.getContextpath();
    if ((contextPath == null || contextPath.length() == 0) && provider != null) {
        contextPath = provider.getContextpath();
    }
    
    // Resolve the host and port, then assemble the URL
    String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
    Integer port = this.findConfigedPorts(protocolConfig, name, map);
    URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);

    // Not executed by default
    if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
            .hasExtension(url.getProtocol())) {
        url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
    }
    
    // Read the scope parameter from the URL
    String scope = url.getParameter(Constants.SCOPE_KEY);
    // don't export when none is configured
    // scope is not "none"
    if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {

        // export to local if the config is not remote (export to remote only when config is remote)
        // scope is not "remote"
        if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
            // Local export
            exportLocal(url);
        }
        // export to remote if the config is not local (export to local only when config is local)
        // scope is not "local"
        if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
            if (logger.isInfoEnabled()) {
                logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
            }
            // registryURLs is not empty
            if (registryURLs != null && !registryURLs.isEmpty()) {
                // Iterate over the registry URLs and export the service remotely for each
                for (URL registryURL : registryURLs) {
                    // Copy the dynamic parameter from the registry URL into the dubbo URL if absent
                    url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
                    // Build the monitor URL
                    URL monitorUrl = loadMonitor(registryURL);
                    if (monitorUrl != null) {
                        url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                    }
                    if (logger.isInfoEnabled()) {
                        logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                    }

                    // Read the proxy type from the URL
                    String proxy = url.getParameter(Constants.PROXY_KEY);
                    if (StringUtils.isNotEmpty(proxy)) {
                        registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);
                    }

                    // Obtain the invoker from proxyFactory.getInvoker
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                    // Export the service remotely
                    Exporter<?> exporter = protocol.export(wrapperInvoker);
                    exporters.add(exporter);
                }
            } else {
                // The case where registryURLs is empty
                Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
                DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                Exporter<?> exporter = protocol.export(wrapperInvoker);
                exporters.add(exporter);
            }
        }
    }
    this.urls.add(url);
}
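The scope checks at the end of the method reduce to two small predicates. The sketch below restates them on plain strings; ScopeSketch and its method names are hypothetical, and the literals "none", "local" and "remote" are assumed to be the values behind the SCOPE_* constants used above.

```java
// Illustrative only: restates the scope checks from doExportUrlsFor1Protocol.
class ScopeSketch {
    // Local export happens unless scope is "none" or "remote".
    static boolean exportsLocally(String scope) {
        return !"none".equalsIgnoreCase(scope) && !"remote".equalsIgnoreCase(scope);
    }

    // Remote export happens unless scope is "none" or "local".
    static boolean exportsRemotely(String scope) {
        return !"none".equalsIgnoreCase(scope) && !"local".equalsIgnoreCase(scope);
    }

    public static void main(String[] args) {
        for (String scope : new String[] {null, "local", "remote", "none"}) {
            System.out.println(scope + " -> local=" + exportsLocally(scope)
                    + ", remote=" + exportsRemotely(scope));
        }
    }
}
```

With no scope configured, both predicates are true, which is why a service is exported both locally and remotely by default.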

Local export

ServiceConfig's exportLocal method is as follows:

Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
/**
 * url: dubbo://192.168.0.100:20890/org.apache.dubbo.samples.api.client.HelloService?anyhost=true&application=demo-provider&bind.ip=192.168.0.100&bind.port=20890&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=12906&side=provider&timestamp=1666493114123
 */
@SuppressWarnings({"unchecked", "rawtypes"})
private void exportLocal(URL url) {
    // Only when the protocol is not already injvm
    if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
        // Build the URL for local export
        // injvm://127.0.0.1/org.apache.dubbo.samples.api.client.HelloService?anyhost=true&application=demo-provider&bind.ip=192.168.0.100&bind.port=20890&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=12906&side=provider&timestamp=1666493114123
        URL local = URL.valueOf(url.toFullString())
                .setProtocol(Constants.LOCAL_PROTOCOL)
                .setHost(LOCALHOST)
                .setPort(0);
        ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref));
        // Call protocol.export to export locally (important)
        Exporter<?> exporter = protocol.export(
                proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
        // Put the exporter in the cache
        exporters.add(exporter);
        logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry");
    }
}

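Here protocol is a Dubbo SPI interface obtained through ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension(), so the adaptive class (Protocol$Adaptive) is reached first. For the SPI code itself, see the previous article 《Dubbo的SPI机制简介解析》.

The export flow for local export:



The export method of Protocol$Adaptive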

@Override
public Exporter export(Invoker arg0) throws RpcException {
    // Null check on the invoker
    if (arg0 == null) {
        throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
    }
    // Null check on the URL
    if (arg0.getUrl() == null) {
        throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
    }
    // If the URL has no protocol, default to dubbo
    URL url = arg0.getUrl();
    String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
    if (extName == null) {
        throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
    }
    // Look up the Protocol extension by name; getExtension returns it wrapped in its wrapper classes (see the previous article 《Dubbo SPI核心源码解析》)
    Protocol extension = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName);
    // Then call its export method
    return extension.export(arg0);
}
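Stripped of the generated boilerplate, the adaptive class is a dispatcher keyed by the URL protocol. The sketch below shows that idea with a plain map. AdaptiveDispatchSketch, MiniProtocol and the two registered lambdas are hypothetical stand-ins, and URLs are plain strings rather than Dubbo URL objects.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: a map-based stand-in for adaptive extension dispatch.
class AdaptiveDispatchSketch {
    interface MiniProtocol {
        String export(String url);
    }

    // Stand-in for the extensions an ExtensionLoader would know by name.
    private static final Map<String, MiniProtocol> EXTENSIONS = new HashMap<>();
    static {
        EXTENSIONS.put("dubbo", url -> "DubboExporter(" + url + ")");
        EXTENSIONS.put("injvm", url -> "InjvmExporter(" + url + ")");
    }

    // Picks the extension named by the URL's protocol part, falling back
    // to "dubbo" when the URL carries no protocol.
    static String export(String url) {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        int idx = url.indexOf("://");
        String extName = idx > 0 ? url.substring(0, idx) : "dubbo";
        MiniProtocol extension = EXTENSIONS.get(extName);
        if (extension == null) {
            throw new IllegalStateException("No such extension: " + extName);
        }
        return extension.export(url);
    }

    public static void main(String[] args) {
        System.out.println(export("injvm://127.0.0.1/demo.HelloService"));
        System.out.println(export("dubbo://192.168.0.100:20890/demo.HelloService"));
    }
}
```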

The export method of QosProtocolWrapper

@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    // If the protocol is registry
    if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
        // start the QoS server
        startQosServer(invoker.getUrl());
        return protocol.export(invoker);
    }
    // Otherwise just delegate
    return protocol.export(invoker);
}

The export method of ProtocolListenerWrapper

@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    // If the protocol is registry
    if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
        // just delegate to the next export
        return protocol.export(invoker);
    }
    // Otherwise load the ExporterListener extensions through Dubbo's SPI and build a ListenerExporterWrapper around the exporter
    return new ListenerExporterWrapper<T>(protocol.export(invoker),
            Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
                    .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));
}

The export method of InjvmProtocol

@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    // Build an InjvmExporter
    return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}

Remote export

The protocol used here is obtained in the same way as for local export.

The remote export flow



The export method of RegistryProtocol

@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    //export invoker
    // The method that actually exports the service
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);

    // Get the registry URL
    /**
     * Build the URL of the ZooKeeper registry
     * zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo://192.168.0.100:20890/org.apache.dubbo.samples.api.client.HelloService?anyhost=true&application=demo-provider&bind.ip=192.168.0.100&bind.port=20890&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=28778&side=provider&timestamp=1666495390735&group=aaa&pid=28778&timestamp=1666495390726
     */
    URL registryUrl = getRegistryUrl(originInvoker);

    // Get the registry (a ZookeeperRegistry here)
    final Registry registry = getRegistry(originInvoker);

    /**
     * Build the URL to be registered in ZooKeeper
     * dubbo://192.168.0.100:20890/org.apache.dubbo.samples.api.client.HelloService?anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=28778&side=provider&timestamp=1666495390735
     */
    final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);

    // Whether the service needs to be registered
    boolean register = registedProviderUrl.getParameter("register", true);

    // Add it to the registration table
    ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl);

    // If registration is required, register the service and set its registered flag (isReg) in the table to true
    if (register) {
        register(registryUrl, registedProviderUrl);
        ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
    }

    // Subscribe the override data
    // Subscribing to override data is deprecated in versions after 2.7.0, so it is not analyzed in detail
    // Get the URL used for the override subscription
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
    // Build the listener for the override subscription
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    // Subscribe to the override data in the registry
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    // Return the exporter that was built
    return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registedProviderUrl);
}

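What is the ZooKeeper subscription for?

Dubbo provides dynamic configuration: after a service has been published, users can change its configuration through the admin console, and the provider notices these changes and applies them locally, re-exporting the service with the merged configuration if necessary. For example, when we configure a service dynamically through dubbo-admin, dubbo-admin creates configuration nodes in the registry or in the configuration center (Dubbo 2.6 has no configuration center, so the nodes are created in the registry; Dubbo 2.7 creates them in the configuration center), and the provider only has to watch those nodes for changes. The first task in watching is to generate a URL that describes the node to watch, which is what this step does.

The doLocalExport method of RegistryProtocol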

/**
 * Example URL of originInvoker:
 * registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo://192.168.0.100:20890/org.apache.dubbo.samples.api.client.HelloService?anyhost=true&application=demo-provider&bind.ip=192.168.0.100&bind.port=20890&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=28778&side=provider&timestamp=1666495390735&group=aaa&pid=28778&registry=zookeeper&timestamp=1666495390726
 */
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
    /**
     * The cache key is derived from the value of the export parameter, e.g.
     * dubbo://192.168.0.100:20890/org.apache.dubbo.samples.api.client.HelloService?anyhost=true&application=demo-provider&bind.ip=192.168.0.100&bind.port=20890&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=28778&side=provider&timestamp=1666495390735
     */
    String key = getCacheKey(originInvoker);
    ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
    // Null on the first export (double-checked locking)
    if (exporter == null) {
        synchronized (bounds) {
            exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
            if (exporter == null) {
                // Wrap the original invoker together with the provider URL
                final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
                // Call export on the protocol selected by the provider URL (DubboProtocol here)
                exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
                bounds.put(key, exporter);
            }
        }
    }
    return exporter;
}
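
The double-checked lookup in doLocalExport can be isolated into a small sketch. Everything here is illustrative: LocalExportCacheSketch, getOrExport and the Supplier argument are assumptions standing in for bounds, doLocalExport and protocol.export. The point is only that the expensive export action runs at most once per key.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Sketch of the double-checked lookup used by doLocalExport; all names are illustrative.
final class LocalExportCacheSketch {
    private final Map<String, Object> bounds = new ConcurrentHashMap<>();

    Object getOrExport(String key, Supplier<Object> exportAction) {
        Object exporter = bounds.get(key);          // first check, without the lock
        if (exporter == null) {
            synchronized (bounds) {
                exporter = bounds.get(key);         // second check, under the lock
                if (exporter == null) {
                    exporter = exportAction.get();  // runs at most once per key
                    bounds.put(key, exporter);
                }
            }
        }
        return exporter;
    }

    public static void main(String[] args) {
        LocalExportCacheSketch cache = new LocalExportCacheSketch();
        AtomicInteger exports = new AtomicInteger();
        Supplier<Object> export = () -> "exporter-" + exports.incrementAndGet();
        cache.getOrExport("dubbo://192.168.0.100:20890/HelloService", export);
        cache.getOrExport("dubbo://192.168.0.100:20890/HelloService", export);
        System.out.println("export ran " + exports.get() + " time(s)");
    }
}
```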

DubboProtocol's export method

/**
 * 1. Build the exporter and cache it in exporterMap.
 * 2. Start the Netty server.
 * 3. Apply the serialization optimizer configured on the URL (a no-op by default).
 * URL: dubbo://192.168.0.100:20890/org.apache.dubbo.samples.api.client.HelloService?anyhost=true&application=demo-provider&bind.ip=192.168.0.100&bind.port=20890&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=53733&side=provider&timestamp=1666500518340
 */
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    URL url = invoker.getUrl();

    // export service.
    // Build the key of the service to export (org.apache.dubbo.samples.api.client.HelloService:20890)
    String key = serviceKey(url);
    // Create the Dubbo exporter
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
    // Put it into the cache
    exporterMap.put(key, exporter);

    //export an stub service for dispatching event
    // Read the dubbo.stub.event and is_callback_service parameters; both default to false, so the default flow skips this branch
    Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
    Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
    if (isStubSupportEvent && !isCallbackservice) {
        String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
        if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
            if (logger.isWarnEnabled()) {
                logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                        "], has set stubproxy support event ,but no stub methods founded."));
            }
        } else {
            stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
        }
    }
    
    // Start the server
    openServer(url);
    // Apply serialization optimization
    optimizeSerialization(url);
    return exporter;
}
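
The exporterMap key mentioned in the comments (interface:port) may also carry a group and a version. The sketch below assembles a key of that shape. It is modelled on my reading of the 2.6.x key format and is an assumption, not a copy of the framework code; ServiceKeySketch is a hypothetical name.

```java
// Sketch of how an exporterMap key of the form [group/]interface[:version]:port can be built.
final class ServiceKeySketch {

    static String serviceKey(int port, String path, String version, String group) {
        StringBuilder key = new StringBuilder();
        if (group != null && !group.isEmpty()) {
            key.append(group).append('/');
        }
        key.append(path);
        // An unset or "0.0.0" version is treated as "no version" in this sketch.
        if (version != null && !version.isEmpty() && !"0.0.0".equals(version)) {
            key.append(':').append(version);
        }
        return key.append(':').append(port).toString();
    }

    public static void main(String[] args) {
        System.out.println(serviceKey(20890,
                "org.apache.dubbo.samples.api.client.HelloService", null, null));
    }
}
```

When a request arrives, the server side can rebuild the same key from the request attributes and look the exporter up in exporterMap.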

Core code flow for starting the Netty server in Dubbo

DubboProtocol's openServer method

private void openServer(URL url) {
    // find server.
    // Get the address: 192.168.0.100:20890
    String key = url.getAddress();
    // Read the isserver parameter from the URL; it defaults to true
    boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
    if (isServer) {
        // The server is null the first time this address is exported and non-null afterwards
        ExchangeServer server = serverMap.get(key);
        if (server == null) {
            // Create a Netty server and cache it in serverMap
            serverMap.put(key, createServer(url));
        } else {
            // server supports reset, use together with override
            server.reset(url);
        }
    }
}
/**
 * Create a Netty server
 */
private ExchangeServer createServer(URL url) {
    // Set channel.readonly.sent to true on the URL if absent
    url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
    // Set heartbeat to the default of 60s on the URL if absent
    url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
    // Read the server parameter from the URL; it defaults to netty
    String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);

    if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
        throw new RpcException("Unsupported server type: " + str + ", url: " + url);
    // Set codec to dubbo on the URL
    url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
    ExchangeServer server;
    try {
        // This is where the Netty server is actually started
        server = Exchangers.bind(url, requestHandler);
    } catch (RemotingException e) {
        throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
    }
    str = url.getParameter(Constants.CLIENT_KEY);
    if (str != null && str.length() > 0) {
        Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
        if (!supportedTypes.contains(str)) {
            throw new RpcException("Unsupported client type: " + str);
        }
    }
    return server;
}
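
A consequence of openServer keying serverMap by address is that every service exported on the same ip:port shares one server: the first export creates it, later exports only reset it. The sketch below shows just that bookkeeping. FakeServer and ServerCacheSketch are hypothetical stand-ins for ExchangeServer and DubboProtocol.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of "one server per address"; FakeServer only counts reset calls.
final class ServerCacheSketch {
    static final class FakeServer {
        int resets;
        void reset(String url) { resets++; }
    }

    private final Map<String, FakeServer> serverMap = new ConcurrentHashMap<>();
    int created;

    void openServer(String address, String url) {
        FakeServer server = serverMap.get(address);
        if (server == null) {
            serverMap.put(address, new FakeServer()); // first export on this address
            created++;
        } else {
            server.reset(url);                        // later exports reuse the server
        }
    }

    FakeServer serverAt(String address) {
        return serverMap.get(address);
    }

    public static void main(String[] args) {
        ServerCacheSketch protocol = new ServerCacheSketch();
        protocol.openServer("192.168.0.100:20890", "dubbo://192.168.0.100:20890/HelloService");
        protocol.openServer("192.168.0.100:20890", "dubbo://192.168.0.100:20890/OrderService");
        System.out.println("servers created: " + protocol.created);
    }
}
```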

Exchangers' bind method

public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    if (handler == null) {
        throw new IllegalArgumentException("handler == null");
    }
    url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
    // Get the Exchanger, then bind the URL and the handler
    return getExchanger(url).bind(url, handler);
}

Exchangers' getExchanger method, which ultimately resolves to the HeaderExchanger implementation

/**
 * Get the Exchanger through Dubbo's SPI
 */
public static Exchanger getExchanger(URL url) {
    // The default type is header
    String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
    return getExchanger(type);
}

/**
 * Get the Exchanger through SPI --> HeaderExchanger
 */
public static Exchanger getExchanger(String type) {
    return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
}
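
The call getExtension(type) is a lookup by name: a short string such as header selects an implementation, and the instance is cached. The following is a rough sketch of that idea only. Dubbo's real ExtensionLoader discovers the name-to-class mapping from files on the classpath and does considerably more (wrappers, adaptive extensions); the explicit register call and the class name ExtensionLookupSketch are assumptions made for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Sketch of a name-based extension lookup with instance caching.
final class ExtensionLookupSketch<T> {
    private final Map<String, Supplier<T>> factories = new ConcurrentHashMap<>();
    private final Map<String, T> instances = new ConcurrentHashMap<>();

    // Hypothetical: a real loader would read this mapping from configuration files.
    void register(String name, Supplier<T> factory) {
        factories.put(name, factory);
    }

    T getExtension(String name) {
        Supplier<T> factory = factories.get(name);
        if (factory == null) {
            throw new IllegalStateException("No such extension: " + name);
        }
        // Create the instance on first use and reuse it afterwards.
        return instances.computeIfAbsent(name, n -> factory.get());
    }

    public static void main(String[] args) {
        ExtensionLookupSketch<Object> loader = new ExtensionLookupSketch<>();
        loader.register("header", Object::new);
        System.out.println(loader.getExtension("header") == loader.getExtension("header"));
    }
}
```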

Calling HeaderExchanger's bind method

public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    // Call Transporters.bind to obtain the Server
    return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}

Transporters' bind method

public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    if (handlers == null || handlers.length == 0) {
        throw new IllegalArgumentException("handlers == null");
    }
    ChannelHandler handler;
    if (handlers.length == 1) {
        handler = handlers[0];
    } else {
        handler = new ChannelHandlerDispatcher(handlers);
    }
    // Get the Transporter through Dubbo's SPI, then bind the URL and the handler
    return getTransporter().bind(url, handler);
}
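
The expression new DecodeHandler(new HeaderExchangeHandler(handler)) in HeaderExchanger.bind is a decorator chain: each handler does its own work and then delegates inward, so an incoming message passes through the outermost wrapper first. A minimal sketch of that ordering follows; the Handler interface and HandlerChainSketch are hypothetical, and only the nesting order is taken from the code above.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of nested handler decoration; each layer records its name, then delegates.
final class HandlerChainSketch {
    interface Handler {
        void received(String message, List<String> trace);
    }

    static Handler named(String name, Handler next) {
        return (message, trace) -> {
            trace.add(name);
            if (next != null) {
                next.received(message, trace);
            }
        };
    }

    static List<String> dispatch(String message) {
        Handler business = named("requestHandler", null);
        // Same nesting order as in HeaderExchanger.bind.
        Handler chain = named("DecodeHandler", named("HeaderExchangeHandler", business));
        List<String> trace = new ArrayList<>();
        chain.received(message, trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(dispatch("hello"));
    }
}
```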

Getting the Transporter implementation (through Dubbo's SPI)

// Get the Transporter through Dubbo's SPI
public static Transporter getTransporter() {
    return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
}

NettyTransporter's bind method

public class NettyTransporter implements Transporter {

    public static final String NAME = "netty";

    @Override
    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyServer(url, listener);
    }

    @Override
    public Client connect(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyClient(url, listener);
    }

}

What new NettyServer does

public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
    super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}

/**
 * Where Netty is actually started
 */
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
    super(url, handler);
    localAddress = getUrl().toInetSocketAddress();

    String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
    int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
    if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
        bindIp = NetUtils.ANYHOST;
    }
    bindAddress = new InetSocketAddress(bindIp, bindPort);
    this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
    this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
    try {
        // Initialize the ServerBootstrap and bind the address
        doOpen();
        if (logger.isInfoEnabled()) {
            logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
        }
    } catch (Throwable t) {
        throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
                + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
    }
    //fixme replace this with better method
    DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
    executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
}

@Override
protected void doOpen() throws Throwable {
    NettyHelper.setNettyLoggerFactory();
    ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
    ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
    ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
    bootstrap = new ServerBootstrap(channelFactory);

    final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
    channels = nettyHandler.getChannels();
    // https://issues.jboss.org/browse/NETTY-365
    // https://issues.jboss.org/browse/NETTY-379
    // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
    bootstrap.setOption("child.tcpNoDelay", true);
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        @Override
        public ChannelPipeline getPipeline() {
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
            ChannelPipeline pipeline = Channels.pipeline();
            /*int idleTimeout = getIdleTimeout();
            if (idleTimeout > 10000) {
                pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
            }*/
            pipeline.addLast("decoder", adapter.getDecoder());
            pipeline.addLast("encoder", adapter.getEncoder());
            pipeline.addLast("handler", nettyHandler);
            return pipeline;
        }
    });
    // bind
    channel = bootstrap.bind(getBindAddress());
}
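
Before doOpen runs, the AbstractServer constructor decides which address to bind. With anyhost=true on the URL (as in the sample URLs in this article), the socket binds to 0.0.0.0 while the exported address stays 192.168.0.100. The sketch below reproduces that decision under a simplified validity check; BindAddressSketch and its isInvalidLocalHost are assumptions and cover fewer cases than Dubbo's NetUtils.

```java
import java.net.InetSocketAddress;

// Sketch of the bind-address decision made before the server socket is opened.
final class BindAddressSketch {
    static final String ANYHOST = "0.0.0.0";

    // Simplified check; the framework's own helper handles more cases.
    static boolean isInvalidLocalHost(String host) {
        return host == null || host.isEmpty()
                || "localhost".equalsIgnoreCase(host)
                || ANYHOST.equals(host)
                || host.startsWith("127.");
    }

    static InetSocketAddress resolve(String bindIp, int bindPort, boolean anyhost) {
        // Fall back to the wildcard address when asked to, or when the IP is unusable.
        String ip = (anyhost || isInvalidLocalHost(bindIp)) ? ANYHOST : bindIp;
        return new InetSocketAddress(ip, bindPort);
    }

    public static void main(String[] args) {
        System.out.println(resolve("192.168.0.100", 20890, true));
        System.out.println(resolve("192.168.0.100", 20890, false));
    }
}
```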

Registering the service with ZooKeeper

RegistryProtocol's register method

/**
 * Registry URL: zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo://192.168.0.100:20890/org.apache.dubbo.samples.api.client.HelloService?anyhost=true&application=demo-provider&bind.ip=192.168.0.100&bind.port=20890&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=76029&side=provider&timestamp=1666504784182&group=aaa&pid=76029&timestamp=1666504784173
 * Service URL: dubbo://192.168.0.100:20890/org.apache.dubbo.samples.api.client.HelloService?anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=76029&side=provider&timestamp=1666504784182
 */
public void register(URL registryUrl, URL registedProviderUrl) {
    // Obtain the registry through Dubbo's SPI (ZookeeperRegistry here, because the registry URL uses the zookeeper protocol; it is created via new ZookeeperRegistry(...))
    Registry registry = registryFactory.getRegistry(registryUrl);
    // Register the service with the ZooKeeper registry
    registry.register(registedProviderUrl);
}

ZookeeperRegistry's register method:

Because ZookeeperRegistry extends FailbackRegistry, we look at FailbackRegistry's register method.

@Override
public void register(URL url) {
    // Cache the URL in the registered set
    super.register(url);
    failedRegistered.remove(url);
    failedUnregistered.remove(url);
    try {
        // Sending a registration request to the server side
        // This is where the URL is actually registered
        doRegister(url);
    } catch (Exception e) {
        Throwable t = e;

        // If the startup detection is opened, the Exception is thrown directly.
        boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                && url.getParameter(Constants.CHECK_KEY, true)
                && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
        boolean skipFailback = t instanceof SkipFailbackWrapperException;
        if (check || skipFailback) {
            if (skipFailback) {
                t = t.getCause();
            }
            throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
        } else {
            logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
        }

        // Record a failed registration request to a failed list, retry regularly
        failedRegistered.add(url);
    }
}
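
The failback behaviour of register can be summarised as: try once; if it fails and the check flag is on, fail fast; otherwise remember the URL and retry later. The sketch below models only that control flow. FailbackRegisterSketch, the Consumer standing in for doRegister, and the explicit retry method are all assumptions (the framework drives retries from a scheduled task).

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Sketch of register-with-failback; doRegister is a hypothetical stand-in for the registry call.
final class FailbackRegisterSketch {
    final Set<String> registered = ConcurrentHashMap.newKeySet();
    final Set<String> failedRegistered = ConcurrentHashMap.newKeySet();
    private final Consumer<String> doRegister;
    private final boolean check;

    FailbackRegisterSketch(Consumer<String> doRegister, boolean check) {
        this.doRegister = doRegister;
        this.check = check;
    }

    void register(String url) {
        registered.add(url);
        failedRegistered.remove(url);
        try {
            doRegister.accept(url);
        } catch (RuntimeException e) {
            if (check) {
                // Startup check enabled: surface the failure immediately.
                throw new IllegalStateException("Failed to register " + url + ", cause: " + e.getMessage(), e);
            }
            failedRegistered.add(url); // left for a later retry pass
        }
    }

    // One retry pass; returns how many URLs are still failing.
    int retry() {
        for (String url : failedRegistered) {
            try {
                doRegister.accept(url);
                failedRegistered.remove(url);
            } catch (RuntimeException ignored) {
                // keep the URL for the next pass
            }
        }
        return failedRegistered.size();
    }
}
```

With check set to false a registry outage at startup does not abort the export; the URL simply stays in failedRegistered until a retry succeeds.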

ZookeeperRegistry's doRegister method

@Override
protected void doRegister(URL url) {
    try {
        // Call zkClient.create to create the node
        zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
    } catch (Throwable e) {
        throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
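
To picture what toUrlPath(url) produces, the sketch below builds a node path of the form /root/interface/category/encoded-provider-URL. This layout reflects my understanding of how Dubbo organizes provider nodes in ZooKeeper and should be read as an assumption. ZkPathSketch is a hypothetical name; the root is taken to be the registry group (commonly dubbo; the sample registry URL in this article sets group=aaa).

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Sketch of a ZooKeeper node path for a provider URL.
final class ZkPathSketch {

    static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always supported
        }
    }

    // root: the registry group; category: e.g. "providers".
    static String toUrlPath(String root, String serviceInterface, String category, String fullUrl) {
        return "/" + root + "/" + encode(serviceInterface) + "/" + category + "/" + encode(fullUrl);
    }

    public static void main(String[] args) {
        System.out.println(toUrlPath("dubbo",
                "org.apache.dubbo.samples.api.client.HelloService",
                "providers",
                "dubbo://192.168.0.100:20890/org.apache.dubbo.samples.api.client.HelloService?side=provider"));
    }
}
```

The second argument of zkClient.create comes from the dynamic parameter (true by default), which, as I understand it, makes the node ephemeral so that it disappears when the provider's session ends.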

Over.

Page updated: 2024-05-15
