一、概括
上篇文章解析了一下Nacos1.4.1的源码流程及全流程图,包括客户端注册/发现/删除/集群通信/心跳等功能。本次拿Nacos2.1.0与其做个对比,最大的区别就是Nacos由http通信变为rpc通信,且增加很多事件处理功能。
二、客户端服务注册
服务注册功能大部分跟1.4.1很像可以参考https://www.toutiao.com/article/7174284508034974240/?log_from=0128e8ae14ea9_1670654861328,不同的地方是进入NacosNamingService的registerInstance函数后调用clientProxy.registerService(此函数就是通过发送RPC注册服务请求给server端注册服务),代码及流程图(流程图是完整流程图中截取的一部分,完整流程图参考七、流程图 )如下:
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
NamingUtils.checkInstanceIsLegal(instance);
// clientProxy是NamingClientProxyDelegate,在NacosNamingService初始化时调用它的init时创建的
clientProxy.registerService(serviceName, groupName, instance);//进行service注册
}
@Override
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
// 临时实例 getExecuteClientProxy(instance)=NamingGrpcClientProxy
getExecuteClientProxy(instance).registerService(serviceName, groupName, instance);
}
@Override
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance {}", namespaceId, serviceName,
instance);
//将serviceName/groupNmae/instance通过InstanceRedoData对象关联并放到registeredInstances缓存中
redoService.cacheInstanceForRedo(serviceName, groupName, instance);
doRegisterService(serviceName, groupName, instance);//执行注册操作
}
public void doRegisterService(String serviceName, String groupName, Instance instance) throws NacosException {
//封装 InstanceRequest 对象 RPC调用服务端交互对象
InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName,
NamingRemoteConstants.REGISTER_INSTANCE, instance);
requestToServer(request, Response.class);//服务注册请求通过RPC发送给客户端,最后会封装Payload对象通过存根对象grpcFutureServiceStub发送给server
redoService.instanceRegistered(serviceName, groupName);//从registeredInstances缓存中获取InstanceRedoData对象并设置注册状态为true
}
三、客户端服务发现
服务发现是服务第一次调用客户端时根据服务名获取,会先从缓存serviceInfoMap中获取,若缓存中为空,则通过RPC发送SubscribeServiceRequest对象,请求服务端获取服务实例列表,代码和流程图(流程图是完整流程图中截取的一部分,完整流程图参考七、流程图 )如下:
@Override
public List selectInstances(String serviceName, String groupName, List clusters, boolean healthy,
boolean subscribe) throws NacosException {
ServiceInfo serviceInfo;
String clusterString = StringUtils.join(clusters, ",");
if (subscribe) {//是否订阅,默认是true
//从缓存中获取服务实例列表,第一次肯定是空,具体逻辑见下面代码
serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString);
if (null == serviceInfo) {
//服务订阅,大致分三步,1、执行定时任务,定时从注册中心获取服务实例,并存到本地缓存serviceInfoMap中
//2、发送RPC订阅请求server 3、缓存第二步得到的信息到本地缓存map中,并发布事件缓存到本地文件
serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);
}
} else {
serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, 0, false);
}
return selectInstances(serviceInfo, healthy);//获取ServiceInfo中instance实例集合,循环排除掉不满足healthy的实例
}
public ServiceInfo getServiceInfo(final String serviceName, final String groupName, final String clusters) {
NAMING_LOGGER.debug("failover-mode: {}", failoverReactor.isFailoverSwitch());
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);//获取组名
String key = ServiceInfo.getKey(groupedServiceName, clusters);//通过组名和集群名组装成缓存的key
if (failoverReactor.isFailoverSwitch()) {
return failoverReactor.getService(key);//如果开启了failover 策略 就从failoverReactor获取服务信息
}
return serviceInfoMap.get(key);//从缓存中获取
}
@Override
public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
NAMING_LOGGER.info("[SUBSCRIBE-SERVICE] service:{}, group:{}, clusters:{} ", serviceName, groupName, clusters);
String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);
String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);//service名、组名、集群名组合成key
// 执行定时任务UpdateTask,定时从注册中心获取服务实例,并存到本地缓存serviceInfoMap中
serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);
ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);//再次从缓存中获取有没有要订阅的服务信息
if (null == result || !isSubscribed(serviceName, groupName, clusters)) {
//发送RPC订阅请求server,获取服务信息
result = grpcClientProxy.subscribe(serviceName, groupName, clusters);
}
//缓存result的信息到本地缓存serviceInfoMap中,并发布事件InstancesChangeEvent,缓存到本地文件DiskCache.write
serviceInfoHolder.processServiceInfo(result);
return result;
}
public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
if (NAMING_LOGGER.isDebugEnabled()) {
NAMING_LOGGER.debug("[GRPC-SUBSCRIBE] service:{}, group:{}, cluster:{} ", serviceName, groupName, clusters);
}
//缓存SubscriberRedoData信息到订阅者缓存subscribes中
redoService.cacheSubscriberForRedo(serviceName, groupName, clusters);
return doSubscribe(serviceName, groupName, clusters);//发送RPC请求
}
public ServiceInfo doSubscribe(String serviceName, String groupName, String clusters) throws NacosException {
SubscribeServiceRequest request = new SubscribeServiceRequest(namespaceId, groupName, serviceName, clusters,
true);//封装SubscribeServiceRequest对象,为了rpc请求服务端订阅服务信息
//下面这一步跟注册是一样的,只不过请求对象换成SubscribeServiceRequest
SubscribeServiceResponse response = requestToServer(request, SubscribeServiceResponse.class);
redoService.subscriberRegistered(serviceName, groupName, clusters);//从服务端获取完信息后,更改SubscriberRedoData对象Registered状态为true
return response.getServiceInfo();
}
四、服务端注册流程
自改为RPC调用后,服务端处理的逻辑处理类改为Handler类(rpcn内部原理,有很多个handler对应客户端传的request对象),因客户端注册时发送的InstanceRequest故服务端在InstanceRequestHandler,执行handle方法,分别获取service对象Client对象及InstancePublishInfo对象,放服务信息到publishers缓存,发布客户端变化事件,源代码和流程图(流程图是完整流程图中截取的一部分,完整流程图参考七、流程图 )如下:
private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta) {
clientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId());//注册表注册实例
return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);
}
@Override
public void registerInstance(Service service, Instance instance, String clientId) {
//获取Service对象,singletonRepository缓存中获得service对象,没有的话放入,namespaceSingletonMaps放入service,key是服务对应的命名空间名称
Service singleton = ServiceManager.getInstance().getSingleton(service);
if (!singleton.isEphemeral()) {//必须是临时实例
throw new NacosRuntimeException(NacosException.INVALID_PARAM,
String.format("Current service %s is persistent service, can't register ephemeral instance.",
singleton.getGroupedServiceName()));
}
Client client = clientManager.getClient(clientId);//通过clientid(也就是两个客户端和服务端的连接id)获取Client对象,此对象是rpc必须封装的,通过EphemeralIpPortClientManager的clients来get的
if (!clientIsLegal(client, clientId)) {
return;
}
InstancePublishInfo instanceInfo = getPublishInfo(instance);//封装InstancePublishInfo信息
//缓存服务Service和InstancePublishInfo到publishers缓存,其中key是service,然后发布客户端变化事件
client.addServiceInstance(singleton, instanceInfo);
client.setLastUpdatedTime();
//发布客户端服务注册事件
NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
//发布实例元数据变动事件
NotifyCenter
.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
}
//调用IpPortBasedClient的addServiceInstance方法,super再掉其父类AbstractClient
public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
return super.addServiceInstance(service, parseToHealthCheckInstance(instancePublishInfo));
}
public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
///缓存服务Service和InstancePublishInfo到publishers注册者缓存
if (null == publishers.put(service, instancePublishInfo)) {
MetricsMonitor.incrementInstanceCount();//监控器,自加1,统计注册实例个数
}
//发布客户端变化事件,细节流程见 六、流程中的事件处理
NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId());
return true;
}
五、服务端订阅流程(服务查询流程跟这个类似)
服务端订阅流程通过SubscribeServiceRequestHandler处理,进入handle方法,大致分两步,第一步,从缓存serviceDataIndexes中获取,缓中没有时从注册表获得服务信息,获取实例放到service中,缓存集群信息,最后返回实例集合并且缓存到serviceDataIndexes中,第二步,把service和订阅者放到subscribers缓存中并且发布订阅事件,代码和流程图(流程图是完整流程图中截取的一部分,完整流程图参考七、流程图 )如下:
@Override
@Secured(action = ActionTypes.READ)
public SubscribeServiceResponse handle(SubscribeServiceRequest request, RequestMeta meta) throws NacosException {
String namespaceId = request.getNamespace();//获取命名空间名称
String serviceName = request.getServiceName();//获取服务名字
String groupName = request.getGroupName();//获取组名
String app = request.getHeader("app", "unknown");
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);//整合service名和组名
Service service = Service.newService(namespaceId, groupName, serviceName, true);
//封装订阅者对象
Subscriber subscriber = new Subscriber(meta.getClientIp(), meta.getClientVersion(), app, meta.getClientIp(),
namespaceId, groupedServiceName, 0, request.getClusters());
//获取ServiceInfo对象信息,主要看serviceStorage.getData(service)
ServiceInfo serviceInfo = ServiceUtil.selectInstancesWithHealthyProtection(serviceStorage.getData(service),
metadataManager.getServiceMetadata(service).orElse(null), subscriber);
if (request.isSubscribe()) {//是否是订阅请求
//把service和订阅者放到subscribers缓存中并且发布订阅事件ClientSubscribeServiceEvent
clientOperationService.subscribeService(service, subscriber, meta.getConnectionId());
} else {
clientOperationService.unsubscribeService(service, subscriber, meta.getConnectionId());
}
return new SubscribeServiceResponse(ResponseCode.SUCCESS.getCode(), "success", serviceInfo);
}
public ServiceInfo getData(Service service) {
//先从serviceDataIndexes缓存中获取ServiceInfo信息,没有的话调用getPushData(service)获取
return serviceDataIndexes.containsKey(service) ? serviceDataIndexes.get(service) : getPushData(service);
}
public ServiceInfo getPushData(Service service) {
ServiceInfo result = emptyServiceInfo(service);//创建ServiceInfo
if (!ServiceManager.getInstance().containSingleton(service)) {
return result;
}
result.setHosts(getAllInstancesFromIndex(service));//从注册表获得服务信息,获取实例放到service中,缓存集群信息,最后返回实例集合
serviceDataIndexes.put(service, result);//缓存服务信息到serviceDataIndexes
return result;
}
public void subscribeService(Service service, Subscriber subscriber, String clientId) {
//从singletonRepository中获取Service对象
Service singleton = ServiceManager.getInstance().getSingletonIfExist(service).orElse(service);
Client client = clientManager.getClient(clientId);//从clients缓存中获取Client对象
if (!clientIsLegal(client, clientId)) {
return;
}
client.addServiceSubscriber(singleton, subscriber);//添加服务和订阅者
client.setLastUpdatedTime();//添加上次更新时间
//发布客户端订阅服务事件
NotifyCenter.publishEvent(new ClientOperationEvent.ClientSubscribeServiceEvent(singleton, clientId));
}
六、流程中的事件处理
1、发布客户端变化事件:调用DistroClientDataProcessor的onEvent函数,分两部分,第一,向定时任务NacosDelayTaskExecuteEngine的tasks集合中添加定时任务DistroDelayTask,第二,调用集群的其他节点发送DistroDataRequest,用来同步数据,发布发布客户端注册或注销事件、发布客户端断连事件,源码和流程图(流程图是完整流程图中截取的一部分,完整流程图参考七、流程图 )如下:
private void syncToAllServer(ClientEvent event) {
Client client = event.getClient();
// Only ephemeral data sync by Distro, persist client should sync by raft.
if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {
return;
}
if (event instanceof ClientEvent.ClientDisconnectEvent) {
DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
distroProtocol.sync(distroKey, DataOperation.DELETE);//删除
} else if (event instanceof ClientEvent.ClientChangedEvent) {
DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
distroProtocol.sync(distroKey, DataOperation.CHANGE);//客户端改变
}
}
public void sync(DistroKey distroKey, DataOperation action, long delay) {
for (Member each : memberManager.allMembersWithoutSelf()) {
syncToTarget(distroKey, action, each.getAddress(), delay);//同步数据给其他集群
}
}
public void syncToTarget(DistroKey distroKey, DataOperation action, String targetServer, long delay) {
DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
targetServer);
DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
//向定时任务NacosDelayTaskExecuteEngine的tasks集合中添加定时任务DistroDelayTask,最终会执行
//NacosDelayTaskExecuteEngine是Nacos的定时任务引擎,最终会执行子类的process方法
//这一步会执行DistroDelayTaskProcessor.process
distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, targetServer);
}
}
public boolean process(NacosTask task) {
if (!(task instanceof DistroDelayTask)) {
return true;
}
DistroDelayTask distroDelayTask = (DistroDelayTask) task;
DistroKey distroKey = distroDelayTask.getDistroKey();
switch (distroDelayTask.getAction()) {
case DELETE://删除
DistroSyncDeleteTask syncDeleteTask = new DistroSyncDeleteTask(distroKey, distroComponentHolder);
distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncDeleteTask);
return true;
case CHANGE://修改
case ADD://添加
//执行DistroSyncChangeTask父类的run,然后执行DistroSyncChangeTask的doExecuteWithCallback方法
DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);
distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);
return true;
default:
return false;
}
}
protected void doExecuteWithCallback(DistroCallback callback) {
String type = getDistroKey().getResourceType();
DistroData distroData = getDistroData(type);//根据类型获取Distro协议数据
if (null == distroData) {
Loggers.DISTRO.warn("[DISTRO] {} with null data to sync, skip", toString());
return;
}
getDistroComponentHolder().findTransportAgent(type)
.syncData(distroData, getDistroKey().getTargetServer(), callback);//发送给其他集群节点数据
}
public void syncData(DistroData data, String targetServer, DistroCallback callback) {
if (isNoExistTarget(targetServer)) {
callback.onSuccess();
return;
}
DistroDataRequest request = new DistroDataRequest(data, data.getType());//封装DistroDataRequest
Member member = memberManager.find(targetServer);
try {
//集群间RPC通信发送DistroDataRequest,再进入DistroDataRequestHandler.handle方法,再根据不同的操作到不同的方法,改变是handleSyncData方法
//然后DistroProtocol的执行onReceive方法,再执行DistroClientDataProcessor的processData方法
clusterRpcClientProxy.asyncRequest(member, request, new DistroRpcCallbackWrapper(callback, member));
} catch (NacosException nacosException) {
callback.onFailed(nacosException);
}
}
public boolean processData(DistroData distroData) {
switch (distroData.getType()) {
case ADD:
case CHANGE:
ClientSyncData clientSyncData = ApplicationUtils.getBean(Serializer.class)
.deserialize(distroData.getContent(), ClientSyncData.class);
handlerClientSyncData(clientSyncData);//同步客户端连接,更新clients注册表数据,发布客户端注册或注销事件,详见下面源码及流程图
return true;
case DELETE:
String deleteClientId = distroData.getDistroKey().getResourceKey();
Loggers.DISTRO.info("[Client-Delete] Received distro client sync data {}", deleteClientId);
clientManager.clientDisconnected(deleteClientId);//clients.remove移除客户端,并发布发布客户端断连事件,详见下面源码及流程图
return true;
default:
return false;
}
}
2、客户端注册事件和客户端订阅事件和客户端断连事件:前两个事件都属于是ClientOperationEvent事件,后面的是ClientDisconnectEvent事件,都会执行ClientServiceIndexesManager.onEvent方法,然后根据不同的操作进入不同的方法,注册、注销、订阅、取消订阅、断连,源码和流程图(流程图是完整流程图中截取的一部分,完整流程图参考七、流程图 )如下:
public void onEvent(Event event) {
if (event instanceof ClientEvent.ClientDisconnectEvent) {
handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event);//执行客户端断连事件
} else if (event instanceof ClientOperationEvent) {
handleClientOperation((ClientOperationEvent) event);//注册、注销、订阅、取消订阅事件
}
}
//执行客户端断连事件
private void handleClientDisconnect(ClientEvent.ClientDisconnectEvent event) {
Client client = event.getClient();
for (Service each : client.getAllSubscribeService()) {
removeSubscriberIndexes(each, client.getClientId());//注册表中移除客户端id,下面会解析
}
for (Service each : client.getAllPublishedService()) {
removePublisherIndexes(each, client.getClientId());//订阅表也移除客户端id,下面会解析
}
}
//根据事件具体分类区分注册、注销、订阅、取消订阅事件
private void handleClientOperation(ClientOperationEvent event) {
Service service = event.getService();
String clientId = event.getClientId();
if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {
addPublisherIndexes(service, clientId);//注册服务
} else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {
removePublisherIndexes(service, clientId);//注销服务
} else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {
addSubscriberIndexes(service, clientId);//添加订阅
} else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {
removeSubscriberIndexes(service, clientId);//取消订阅
}
}
//注册表publisherIndexes添加客户端id,发布服务改变事件
private void addPublisherIndexes(Service service, String clientId) {
publisherIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());
publisherIndexes.get(service).add(clientId);
NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
}
//注册表publisherIndexes移除客户端id,发布服务改变事件
private void removePublisherIndexes(Service service, String clientId) {
if (!publisherIndexes.containsKey(service)) {
return;
}
publisherIndexes.get(service).remove(clientId);
NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
}
//订阅表subscriberIndexes添加客户端id,发布服务订阅事件
private void addSubscriberIndexes(Service service, String clientId) {
subscriberIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());
if (subscriberIndexes.get(service).add(clientId)) {
NotifyCenter.publishEvent(new ServiceEvent.ServiceSubscribedEvent(service, clientId));
}
}
//订阅表subscriberIndexes移除客户端id
private void removeSubscriberIndexes(Service service, String clientId) {
if (!subscriberIndexes.containsKey(service)) {
return;
}
subscriberIndexes.get(service).remove(clientId);
if (subscriberIndexes.get(service).isEmpty()) {
subscriberIndexes.remove(service);
}
}
3、服务改变事件和服务订阅事件:向定时任务NacosDelayTaskExecuteEngine的tasks集合中添加定时任务PushDelayTask,通过参数区分,订阅只推送给当前订阅者,注册时推送给所有订阅者,都会执行NamingSubscriberServiceV2Impl.onEvent方法,然后执行PushExecuteTask.run,最后会通过rpc请求客户端推送最新的信息,客户端缓存起来,请求对象是ServerRequest,源码和流程图(流程图是完整流程图中截取的一部分,完整流程图参考七、流程图 )如下:
public void onEvent(Event event) {
if (!upgradeJudgement.isUseGrpcFeatures()) {
return;
}
if (event instanceof ServiceEvent.ServiceChangedEvent) {
//如果 service发生变化, 推送给所有的订阅者.new PushDelayTask只有两个参数
ServiceEvent.ServiceChangedEvent serviceChangedEvent = (ServiceEvent.ServiceChangedEvent) event;
Service service = serviceChangedEvent.getService();
//添加任务
delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay()));
} else if (event instanceof ServiceEvent.ServiceSubscribedEvent) {
//如果订阅变化, 推送给当前的订阅者.new PushDelayTask有3个参数,最终subscribedEvent.getClientId()会一直传下去,有一个地方有区分
ServiceEvent.ServiceSubscribedEvent subscribedEvent = (ServiceEvent.ServiceSubscribedEvent) event;
Service service = subscribedEvent.getService();
//添加任务
delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay(),
subscribedEvent.getClientId()));
}
}
NacosDelayTaskExecuteEngine初始化时产生定时任务,在processTasks()取任务处理,这儿代码相对简单不在分析,最终会进入PushDelayTaskExecuteEngine.PushDelayTaskProcessor.process方法。
public boolean process(NacosTask task) {
PushDelayTask pushDelayTask = (PushDelayTask) task;
Service service = pushDelayTask.getService();
//执行任务PushExecuteTask,执行PushExecuteTask.run,循环得到ClientID,获取订阅者对象,推送给客户端变化数据
//在进入PushExecutorRpcImpl.doPushWithCallback,再到RpcPushService.pushWithCallback
//通过connection.asyncRequest发送ServerRequest到客户端,然后客户端获取service信息放到本地缓存serviceInfoMap中
//并发布服务改变InstancesChangeEvent事件,最终写到本地缓存文件中
NamingExecuteTaskDispatcher.getInstance()
.dispatchAndExecuteTask(service, new PushExecuteTask(service, executeEngine, pushDelayTask));
return true;
}
七、流程图
完整流程图链接:https://kdocs.cn/l/cucHfYowAItw
页面更新:2024-06-08
本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828
© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号