Nacos2.1.0源码解析-附带全流程图


一、概括

上篇文章解析了一下Nacos1.4.1的源码流程及全流程图,包括客户端注册/发现/删除/集群通信/心跳等功能。本次拿Nacos2.1.0与其做个对比,最大的区别就是Nacos由http通信变为rpc通信,且增加很多事件处理功能。

二、客户端服务注册

服务注册功能大部分跟1.4.1很像可以参考https://www.toutiao.com/article/7174284508034974240/?log_from=0128e8ae14ea9_1670654861328,不同的地方是进入NacosNamingService的registerInstance函数后调用clientProxy.registerService(此函数就是通过发送RPC注册服务请求给server端注册服务),代码及流程图(流程图是完整流程图中截取的一部分,完整流程图参考七、流程图 )如下:

@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    NamingUtils.checkInstanceIsLegal(instance);
   // clientProxy是NamingClientProxyDelegate,在NacosNamingService初始化时调用它的init时创建的
    clientProxy.registerService(serviceName, groupName, instance);//进行service注册
}
@Override
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
  // 临时实例 getExecuteClientProxy(instance)=NamingGrpcClientProxy
  getExecuteClientProxy(instance).registerService(serviceName, groupName, instance);
}
@Override
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
    NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance {}", namespaceId, serviceName,
            instance);
  //将serviceName/groupNmae/instance通过InstanceRedoData对象关联并放到registeredInstances缓存中
    redoService.cacheInstanceForRedo(serviceName, groupName, instance);
    doRegisterService(serviceName, groupName, instance);//执行注册操作
}
public void doRegisterService(String serviceName, String groupName, Instance instance) throws NacosException {
  //封装  InstanceRequest 对象 RPC调用服务端交互对象
  InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName,
            NamingRemoteConstants.REGISTER_INSTANCE, instance);
    requestToServer(request, Response.class);//服务注册请求通过RPC发送给客户端,最后会封装Payload对象通过存根对象grpcFutureServiceStub发送给server
    redoService.instanceRegistered(serviceName, groupName);//从registeredInstances缓存中获取InstanceRedoData对象并设置注册状态为true
}

客户端服务注册流程

三、客户端服务发现

服务发现是服务第一次调用客户端时根据服务名获取,会先从缓存serviceInfoMap中获取,若缓存中为空,则通过RPC发送SubscribeServiceRequest对象,请求服务端获取服务实例列表,代码和流程图(流程图是完整流程图中截取的一部分,完整流程图参考七、流程图 如下:

@Override
public List selectInstances(String serviceName, String groupName, List clusters, boolean healthy,
        boolean subscribe) throws NacosException {
    ServiceInfo serviceInfo;
    String clusterString = StringUtils.join(clusters, ",");
    if (subscribe) {//是否订阅,默认是true
      //从缓存中获取服务实例列表,第一次肯定是空,具体逻辑见下面代码
        serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString);
        if (null == serviceInfo) {
          //服务订阅,大致分三步,1、执行定时任务,定时从注册中心获取服务实例,并存到本地缓存serviceInfoMap中
          //2、发送RPC订阅请求server 3、缓存第二步得到的信息到本地缓存map中,并发布事件缓存到本地文件
          serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);
        }
    } else {
        serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, 0, false);
    }
    return selectInstances(serviceInfo, healthy);//获取ServiceInfo中instance实例集合,循环排除掉不满足healthy的实例
}
public ServiceInfo getServiceInfo(final String serviceName, final String groupName, final String clusters) {
    NAMING_LOGGER.debug("failover-mode: {}", failoverReactor.isFailoverSwitch());
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);//获取组名
    String key = ServiceInfo.getKey(groupedServiceName, clusters);//通过组名和集群名组装成缓存的key
    if (failoverReactor.isFailoverSwitch()) {
        return failoverReactor.getService(key);//如果开启了failover 策略 就从failoverReactor获取服务信息
    }
    return serviceInfoMap.get(key);//从缓存中获取
}
@Override
public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
    NAMING_LOGGER.info("[SUBSCRIBE-SERVICE] service:{}, group:{}, clusters:{} ", serviceName, groupName, clusters);
    String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);
    String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);//service名、组名、集群名组合成key
   // 执行定时任务UpdateTask,定时从注册中心获取服务实例,并存到本地缓存serviceInfoMap中
   serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);
    ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);//再次从缓存中获取有没有要订阅的服务信息
    if (null == result || !isSubscribed(serviceName, groupName, clusters)) {
        //发送RPC订阅请求server,获取服务信息
        result = grpcClientProxy.subscribe(serviceName, groupName, clusters);
    }
  //缓存result的信息到本地缓存serviceInfoMap中,并发布事件InstancesChangeEvent,缓存到本地文件DiskCache.write
    serviceInfoHolder.processServiceInfo(result);
    return result;
}
public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
    if (NAMING_LOGGER.isDebugEnabled()) {
        NAMING_LOGGER.debug("[GRPC-SUBSCRIBE] service:{}, group:{}, cluster:{} ", serviceName, groupName, clusters);
    }
  //缓存SubscriberRedoData信息到订阅者缓存subscribes中
    redoService.cacheSubscriberForRedo(serviceName, groupName, clusters);
    return doSubscribe(serviceName, groupName, clusters);//发送RPC请求
}
public ServiceInfo doSubscribe(String serviceName, String groupName, String clusters) throws NacosException {
    SubscribeServiceRequest request = new SubscribeServiceRequest(namespaceId, groupName, serviceName, clusters,
            true);//封装SubscribeServiceRequest对象,为了rpc请求服务端订阅服务信息
    //下面这一步跟注册是一样的,只不过请求对象换成SubscribeServiceRequest
    SubscribeServiceResponse response = requestToServer(request, SubscribeServiceResponse.class);
    redoService.subscriberRegistered(serviceName, groupName, clusters);//从服务端获取完信息后,更改SubscriberRedoData对象Registered状态为true
    return response.getServiceInfo();
}

客户端订阅服务流程图

四、服务端注册流程

自改为RPC调用后,服务端处理的逻辑处理类改为Handler类(rpcn内部原理,有很多个handler对应客户端传的request对象),因客户端注册时发送的InstanceRequest故服务端在InstanceRequestHandler,执行handle方法,分别获取service对象Client对象及InstancePublishInfo对象,放服务信息到publishers缓存,发布客户端变化事件,源代码和流程图流程图是完整流程图中截取的一部分,完整流程图参考七、流程图 )如下:

private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta) {
    clientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId());//注册表注册实例
    return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);
}
@Override
public void registerInstance(Service service, Instance instance, String clientId) {
  //获取Service对象,singletonRepository缓存中获得service对象,没有的话放入,namespaceSingletonMaps放入service,key是服务对应的命名空间名称
    Service singleton = ServiceManager.getInstance().getSingleton(service);
    if (!singleton.isEphemeral()) {//必须是临时实例
        throw new NacosRuntimeException(NacosException.INVALID_PARAM,
                String.format("Current service %s is persistent service, can't register ephemeral instance.",
                        singleton.getGroupedServiceName()));
    }
    Client client = clientManager.getClient(clientId);//通过clientid(也就是两个客户端和服务端的连接id)获取Client对象,此对象是rpc必须封装的,通过EphemeralIpPortClientManager的clients来get的
    if (!clientIsLegal(client, clientId)) {
        return;
    }
    InstancePublishInfo instanceInfo = getPublishInfo(instance);//封装InstancePublishInfo信息
    //缓存服务Service和InstancePublishInfo到publishers缓存,其中key是service,然后发布客户端变化事件
    client.addServiceInstance(singleton, instanceInfo);
    client.setLastUpdatedTime();
    //发布客户端服务注册事件
    NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
   //发布实例元数据变动事件  
   NotifyCenter
            .publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
}
//调用IpPortBasedClient的addServiceInstance方法,super再掉其父类AbstractClient
public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
    return super.addServiceInstance(service, parseToHealthCheckInstance(instancePublishInfo));
}
public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
  ///缓存服务Service和InstancePublishInfo到publishers注册者缓存  
  if (null == publishers.put(service, instancePublishInfo)) {
        MetricsMonitor.incrementInstanceCount();//监控器,自加1,统计注册实例个数
    }
  //发布客户端变化事件,细节流程见  六、流程中的事件处理
    NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
    Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId());
    return true;
}

服务端注册流程

五、服务端订阅流程(服务查询流程跟这个类似)

服务端订阅流程通过SubscribeServiceRequestHandler处理,进入handle方法,大致分两步,第一步,从缓存serviceDataIndexes中获取,缓中没有时从注册表获得服务信息,获取实例放到service中,缓存集群信息,最后返回实例集合并且缓存到serviceDataIndexes中,第二步,把service和订阅者放到subscribers缓存中并且发布订阅事件,代码和流程图(流程图是完整流程图中截取的一部分,完整流程图参考七、流程图 如下:

@Override
@Secured(action = ActionTypes.READ)
public SubscribeServiceResponse handle(SubscribeServiceRequest request, RequestMeta meta) throws NacosException {
    String namespaceId = request.getNamespace();//获取命名空间名称
    String serviceName = request.getServiceName();//获取服务名字
    String groupName = request.getGroupName();//获取组名
    String app = request.getHeader("app", "unknown");
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);//整合service名和组名
    Service service = Service.newService(namespaceId, groupName, serviceName, true);
   //封装订阅者对象
    Subscriber subscriber = new Subscriber(meta.getClientIp(), meta.getClientVersion(), app, meta.getClientIp(),
            namespaceId, groupedServiceName, 0, request.getClusters());
  //获取ServiceInfo对象信息,主要看serviceStorage.getData(service)
    ServiceInfo serviceInfo = ServiceUtil.selectInstancesWithHealthyProtection(serviceStorage.getData(service),
            metadataManager.getServiceMetadata(service).orElse(null), subscriber);
    if (request.isSubscribe()) {//是否是订阅请求
      //把service和订阅者放到subscribers缓存中并且发布订阅事件ClientSubscribeServiceEvent
        clientOperationService.subscribeService(service, subscriber, meta.getConnectionId());
    } else {
        clientOperationService.unsubscribeService(service, subscriber, meta.getConnectionId());
    }
    return new SubscribeServiceResponse(ResponseCode.SUCCESS.getCode(), "success", serviceInfo);
}
public ServiceInfo getData(Service service) {
  //先从serviceDataIndexes缓存中获取ServiceInfo信息,没有的话调用getPushData(service)获取
        return serviceDataIndexes.containsKey(service) ? serviceDataIndexes.get(service) : getPushData(service);
    }
public ServiceInfo getPushData(Service service) {
    ServiceInfo result = emptyServiceInfo(service);//创建ServiceInfo
    if (!ServiceManager.getInstance().containSingleton(service)) {
        return result;
    }
    result.setHosts(getAllInstancesFromIndex(service));//从注册表获得服务信息,获取实例放到service中,缓存集群信息,最后返回实例集合
    serviceDataIndexes.put(service, result);//缓存服务信息到serviceDataIndexes
    return result;
}
public void subscribeService(Service service, Subscriber subscriber, String clientId) {
  //从singletonRepository中获取Service对象
    Service singleton = ServiceManager.getInstance().getSingletonIfExist(service).orElse(service);
    Client client = clientManager.getClient(clientId);//从clients缓存中获取Client对象
    if (!clientIsLegal(client, clientId)) {
        return;
    }
    client.addServiceSubscriber(singleton, subscriber);//添加服务和订阅者
    client.setLastUpdatedTime();//添加上次更新时间
   //发布客户端订阅服务事件
    NotifyCenter.publishEvent(new ClientOperationEvent.ClientSubscribeServiceEvent(singleton, clientId));
}

服务端订阅流程

六、流程中的事件处理

1、发布客户端变化事件:调用DistroClientDataProcessor的onEvent函数,分两部分,第一,向定时任务NacosDelayTaskExecuteEngine的tasks集合中添加定时任务DistroDelayTask,第二,调用集群的其他节点发送DistroDataRequest,用来同步数据,发布发布客户端注册或注销事件、发布客户端断连事件,源码和流程图流程图是完整流程图中截取的一部分,完整流程图参考七、流程图 )如下:

private void syncToAllServer(ClientEvent event) {
    Client client = event.getClient();
    // Only ephemeral data sync by Distro, persist client should sync by raft.
    if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {
        return;
    }
    if (event instanceof ClientEvent.ClientDisconnectEvent) {
        DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
        distroProtocol.sync(distroKey, DataOperation.DELETE);//删除
    } else if (event instanceof ClientEvent.ClientChangedEvent) {
        DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
        distroProtocol.sync(distroKey, DataOperation.CHANGE);//客户端改变
    }
}
public void sync(DistroKey distroKey, DataOperation action, long delay) {
    for (Member each : memberManager.allMembersWithoutSelf()) {
        syncToTarget(distroKey, action, each.getAddress(), delay);//同步数据给其他集群
    }
}
public void syncToTarget(DistroKey distroKey, DataOperation action, String targetServer, long delay) {
    DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
            targetServer);
    DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
  //向定时任务NacosDelayTaskExecuteEngine的tasks集合中添加定时任务DistroDelayTask,最终会执行
  //NacosDelayTaskExecuteEngine是Nacos的定时任务引擎,最终会执行子类的process方法
  //这一步会执行DistroDelayTaskProcessor.process
    distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
    if (Loggers.DISTRO.isDebugEnabled()) {
        Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, targetServer);
    }
}
public boolean process(NacosTask task) {
    if (!(task instanceof DistroDelayTask)) {
        return true;
    }
    DistroDelayTask distroDelayTask = (DistroDelayTask) task;
    DistroKey distroKey = distroDelayTask.getDistroKey();
    switch (distroDelayTask.getAction()) {
        case DELETE://删除
            DistroSyncDeleteTask syncDeleteTask = new DistroSyncDeleteTask(distroKey, distroComponentHolder);
            distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncDeleteTask);
            return true;
        case CHANGE://修改
        case ADD://添加
           //执行DistroSyncChangeTask父类的run,然后执行DistroSyncChangeTask的doExecuteWithCallback方法
            DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);
            distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);
            return true;
        default:
            return false;
    }
}
protected void doExecuteWithCallback(DistroCallback callback) {
        String type = getDistroKey().getResourceType();
        DistroData distroData = getDistroData(type);//根据类型获取Distro协议数据
        if (null == distroData) {
            Loggers.DISTRO.warn("[DISTRO] {} with null data to sync, skip", toString());
            return;
        }
        getDistroComponentHolder().findTransportAgent(type)
                .syncData(distroData, getDistroKey().getTargetServer(), callback);//发送给其他集群节点数据
    }
public void syncData(DistroData data, String targetServer, DistroCallback callback) {
    if (isNoExistTarget(targetServer)) {
        callback.onSuccess();
        return;
    }
    DistroDataRequest request = new DistroDataRequest(data, data.getType());//封装DistroDataRequest
    Member member = memberManager.find(targetServer);
    try {
      //集群间RPC通信发送DistroDataRequest,再进入DistroDataRequestHandler.handle方法,再根据不同的操作到不同的方法,改变是handleSyncData方法
      //然后DistroProtocol的执行onReceive方法,再执行DistroClientDataProcessor的processData方法
        clusterRpcClientProxy.asyncRequest(member, request, new DistroRpcCallbackWrapper(callback, member));
    } catch (NacosException nacosException) {
        callback.onFailed(nacosException);
    }
}
public boolean processData(DistroData distroData) {
        switch (distroData.getType()) {
            case ADD:
            case CHANGE:
                ClientSyncData clientSyncData = ApplicationUtils.getBean(Serializer.class)
                        .deserialize(distroData.getContent(), ClientSyncData.class);
                handlerClientSyncData(clientSyncData);//同步客户端连接,更新clients注册表数据,发布客户端注册或注销事件,详见下面源码及流程图
                return true;
            case DELETE:
                String deleteClientId = distroData.getDistroKey().getResourceKey();
                Loggers.DISTRO.info("[Client-Delete] Received distro client sync data {}", deleteClientId);
                clientManager.clientDisconnected(deleteClientId);//clients.remove移除客户端,并发布发布客户端断连事件,详见下面源码及流程图
                return true;
            default:
                return false;
        }
    }

客户端变化事件流程图

2、客户端注册事件和客户端订阅事件和客户端断连事件:前两个事件都属于是ClientOperationEvent事件,后面的是ClientDisconnectEvent事件,都会执行ClientServiceIndexesManager.onEvent方法,然后根据不同的操作进入不同的方法,注册、注销、订阅、取消订阅、断连,源码和流程图流程图是完整流程图中截取的一部分,完整流程图参考七、流程图 )如下:

public void onEvent(Event event) {
    if (event instanceof ClientEvent.ClientDisconnectEvent) {
        handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event);//执行客户端断连事件
    } else if (event instanceof ClientOperationEvent) {
        handleClientOperation((ClientOperationEvent) event);//注册、注销、订阅、取消订阅事件
    }
}
//执行客户端断连事件
private void handleClientDisconnect(ClientEvent.ClientDisconnectEvent event) {
    Client client = event.getClient();
    for (Service each : client.getAllSubscribeService()) {
        removeSubscriberIndexes(each, client.getClientId());//注册表中移除客户端id,下面会解析
    }
    for (Service each : client.getAllPublishedService()) {
        removePublisherIndexes(each, client.getClientId());//订阅表也移除客户端id,下面会解析
    }
}
//根据事件具体分类区分注册、注销、订阅、取消订阅事件
private void handleClientOperation(ClientOperationEvent event) {
    Service service = event.getService();
    String clientId = event.getClientId();
    if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {
        addPublisherIndexes(service, clientId);//注册服务
    } else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {
        removePublisherIndexes(service, clientId);//注销服务
    } else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {
        addSubscriberIndexes(service, clientId);//添加订阅
    } else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {
        removeSubscriberIndexes(service, clientId);//取消订阅
    }
}
//注册表publisherIndexes添加客户端id,发布服务改变事件
private void addPublisherIndexes(Service service, String clientId) {
        publisherIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());
        publisherIndexes.get(service).add(clientId);
        NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
    }
    //注册表publisherIndexes移除客户端id,发布服务改变事件
    private void removePublisherIndexes(Service service, String clientId) {
        if (!publisherIndexes.containsKey(service)) {
            return;
        }
        publisherIndexes.get(service).remove(clientId);
        NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
    }
    //订阅表subscriberIndexes添加客户端id,发布服务订阅事件
    private void addSubscriberIndexes(Service service, String clientId) {
        subscriberIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());
        if (subscriberIndexes.get(service).add(clientId)) {
            NotifyCenter.publishEvent(new ServiceEvent.ServiceSubscribedEvent(service, clientId));
        }
    }
     //订阅表subscriberIndexes移除客户端id
    private void removeSubscriberIndexes(Service service, String clientId) {
        if (!subscriberIndexes.containsKey(service)) {
            return;
        }
        subscriberIndexes.get(service).remove(clientId);
        if (subscriberIndexes.get(service).isEmpty()) {
            subscriberIndexes.remove(service);
        }
    }

客户端注册事件事件和客户端订阅事件和客户端断连事件流程

3、服务改变事件和服务订阅事件:向定时任务NacosDelayTaskExecuteEngine的tasks集合中添加定时任务PushDelayTask,通过参数区分,订阅只推送给当前订阅者,注册时推送给所有订阅者,都会执行NamingSubscriberServiceV2Impl.onEvent方法,然后执行PushExecuteTask.run,最后会通过rpc请求客户端推送最新的信息,客户端缓存起来,请求对象是ServerRequest,源码和流程图流程图是完整流程图中截取的一部分,完整流程图参考七、流程图 )如下:

public void onEvent(Event event) {
    if (!upgradeJudgement.isUseGrpcFeatures()) {
        return;
    }
    if (event instanceof ServiceEvent.ServiceChangedEvent) {
        //如果 service发生变化, 推送给所有的订阅者.new PushDelayTask只有两个参数
        ServiceEvent.ServiceChangedEvent serviceChangedEvent = (ServiceEvent.ServiceChangedEvent) event;
        Service service = serviceChangedEvent.getService();
        //添加任务
        delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay()));
    } else if (event instanceof ServiceEvent.ServiceSubscribedEvent) {
        //如果订阅变化, 推送给当前的订阅者.new PushDelayTask有3个参数,最终subscribedEvent.getClientId()会一直传下去,有一个地方有区分
        ServiceEvent.ServiceSubscribedEvent subscribedEvent = (ServiceEvent.ServiceSubscribedEvent) event;
        Service service = subscribedEvent.getService();
            //添加任务
        delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay(),
                subscribedEvent.getClientId()));
    }
}

NacosDelayTaskExecuteEngine初始化时产生定时任务,在processTasks()取任务处理,这儿代码相对简单不在分析,最终会进入PushDelayTaskExecuteEngine.PushDelayTaskProcessor.process方法。

public boolean process(NacosTask task) {
    PushDelayTask pushDelayTask = (PushDelayTask) task;
    Service service = pushDelayTask.getService();
  //执行任务PushExecuteTask,执行PushExecuteTask.run,循环得到ClientID,获取订阅者对象,推送给客户端变化数据
  //在进入PushExecutorRpcImpl.doPushWithCallback,再到RpcPushService.pushWithCallback
  //通过connection.asyncRequest发送ServerRequest到客户端,然后客户端获取service信息放到本地缓存serviceInfoMap中
  //并发布服务改变InstancesChangeEvent事件,最终写到本地缓存文件中
  NamingExecuteTaskDispatcher.getInstance()
            .dispatchAndExecuteTask(service, new PushExecuteTask(service, executeEngine, pushDelayTask));
    return true;
}

服务改变事件和服务订阅事件流程图




七、流程图

完整流程图链接:https://kdocs.cn/l/cucHfYowAItw

Nacos2.1.0源码解析完整流程图

展开阅读全文

页面更新:2024-06-08

标签:流程图   缓存   服务端   源码   客户端   实例   流程   对象   完整   事件   信息

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号

Top