探秘RocketMQ 5.0 client端Nameserver地址更新的源码实现方式

源码版本

Client如何更新本地namesrvAddr

client更新Nameserver的定时器主要是在这里启动的

this.startScheduledTask();

我们进去这个方法看一看

如果clientConfig获取的namesrvAddr地址为空就会启动一个定时任务

其中定时任务中方法

MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();

就是通过我们设置的wsAddr(也就是获取namesrvAddr的url地址)地址去获取namesrvAddr地址

线程池scheduledExecutorService的初始化

private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "MQClientFactoryScheduledThread"));

可以看到是线程池延迟1s后执行,每隔2分钟去http服务器去拉取namesrvAddr的地址

实际更新本地的namesrvAddr地址是通过updateNameServerAddressList方法

最终实际去更新的还是NettyRemotingClient中的namesrvAddrList值

    @Override
    public void updateNameServerAddressList(List addrs) {
        List old = this.namesrvAddrList.get();
        boolean update = false;

        if (!addrs.isEmpty()) {
            if (null == old) {
                update = true;
            } else if (addrs.size() != old.size()) {
                update = true;
            } else {
                for (int i = 0; i < addrs.size() && !update; i++) {
                    if (!old.contains(addrs.get(i))) {
                        update = true;
                    }
                }
            }

            if (update) {
                Collections.shuffle(addrs);
                LOGGER.info("name server address updated. NEW : {} , OLD: {}", addrs, old);
                this.namesrvAddrList.set(addrs);

                // should close the channel if choosed addr is not exist.
                if (this.namesrvAddrChoosed.get() != null && !addrs.contains(this.namesrvAddrChoosed.get())) {
                    String namesrvAddr = this.namesrvAddrChoosed.get();
                    for (String addr : this.channelTables.keySet()) {
                        if (addr.contains(namesrvAddr)) {
                            ChannelWrapper channelWrapper = this.channelTables.get(addr);
                            if (channelWrapper != null) {
                                closeChannel(channelWrapper.getChannel());
                            }
                        }
                    }
                }
            }
        }
    }

这里除了更新namesrvAddrList的地址外,还通过Collections.shuffle(addrs)方法对addrs的地址进行了随机打散,就是为了后面再随机选用一个namesrvAddr地址,同时还做了一个事情就是如果新获取到的namesrvAddr和原有的对比,如果旧的namesrvAddr地址有被新namesrvAddr地址剔除的就会将客户端与旧NameServer断开tcp连接

closeChannel(channelWrapper.getChannel());

值得注意的这里都是客户端的namesrvAddr更新,还没有涉及到namesrvAddr的重连,所以我们还需要看看namesrvAddr重连的定时器

跟随源码可以定位到重连的定时器主要在NettyRemotingClient这个类中启动的

其中timer的初始化也是new了一个简单的Timer

private final Timer timer = new Timer("ClientHouseKeepingService", true);

可以看到是启动就直接执行没有延时时间,循环时间默认3s执行一次,可通过参数com.rocketmq.remoting.client.connect.timeout设置

这里我们进入到scanAvailableNameSrv方法看看

	private void scanAvailableNameSrv() {
        List nameServerList = this.namesrvAddrList.get();
        if (nameServerList == null) {
            LOGGER.debug("scanAvailableNameSrv Addresses of name server is empty!");
            return;
        }

        for (final String namesrvAddr : nameServerList) {
            scanExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        Channel channel = NettyRemotingClient.this.getAndCreateChannel(namesrvAddr);
                        if (channel != null) {
                            NettyRemotingClient.this.availableNamesrvAddrMap.putIfAbsent(namesrvAddr, true);
                        } else {
                            NettyRemotingClient.this.availableNamesrvAddrMap.remove(namesrvAddr);
                        }
                    } catch (Exception e) {
                        LOGGER.error("scanAvailableNameSrv get channel of {} failed, ", namesrvAddr, e);
                    }
                }
            });
        }

    }

可以看到这里面又通过一个线程池scanExecutor去异步扫描多个namesrvAddr

我们这里看看scanExecutor线程池的初始化配置

	this.scanExecutor = new ThreadPoolExecutor(4, 10, 60, TimeUnit.SECONDS,
            new ArrayBlockingQueue(32), new ThreadFactory() {
                private final AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyClientScan_thread_" + this.threadIndex.incrementAndGet());
                }
            }
        );

我们进入NettyRemotingClient.this.getAndCreateChannel方法看看

注意这里第一行有一个判断就是

null == addr

这个判断是为了MQClientInstance.this.updateTopicRouteInfoFromNameServer(); 这里准备的

还记得我们的startScheduledTask()方法吗 这里是开了一个定时器,通过方法MQClientInstance.this.updateTopicRouteInfoFromNameServer();定时去更新topic的信息

默认时间可以看到是30s

我们可以看看TopicRouteData的相关数据

值得一提的我们发送消息最终使用的topic、queue相关信息是使用的TopicPublishInfo属性,TopicRouteData与TopicPublishInfo相关的转化是通过 public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route)方法

我们可以看看转换后的TopicPublishInfo是啥样

然后我们来看看getAndCreateNameserverChannel()方法

        private Channel getAndCreateNameserverChannel() throws InterruptedException {
        String addr = this.namesrvAddrChoosed.get();
        if (addr != null) {
            ChannelWrapper cw = this.channelTables.get(addr);
            if (cw != null && cw.isOK()) {
                return cw.getChannel();
            }
        }

        final List addrList = this.namesrvAddrList.get();
        if (this.namesrvChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
            try {
                addr = this.namesrvAddrChoosed.get();
                if (addr != null) {
                    ChannelWrapper cw = this.channelTables.get(addr);
                    if (cw != null && cw.isOK()) {
                        return cw.getChannel();
                    }
                }

                if (addrList != null && !addrList.isEmpty()) {
                    for (int i = 0; i < addrList.size(); i++) {
                        int index = this.namesrvIndex.incrementAndGet();
                        index = Math.abs(index);
                        index = index % addrList.size();
                        String newAddr = addrList.get(index);

                        this.namesrvAddrChoosed.set(newAddr);
                        LOGGER.info("new name server is chosen. OLD: {} , NEW: {}. namesrvIndex = {}", addr, newAddr, namesrvIndex);
                        Channel channelNew = this.createChannel(newAddr);
                        if (channelNew != null) {
                            return channelNew;
                        }
                    }
                    throw new RemotingConnectException(addrList.toString());
                }
            } catch (Exception e) {
                LOGGER.error("getAndCreateNameserverChannel: create name server channel exception", e);
            } finally {
                this.namesrvChannelLock.unlock();
            }
        } else {
            LOGGER.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
        }

        return null;
    }

有一些值得注意的是

  1. 通过namesrvAddrChoosed获取nameserver地址
  2. 连接的channal存储在channelTables中复用
  3. 如果namesrvAddrChoosed获取到的nameserver地址不存在或者不是存活的则随机再从namesrvAddrList获取一个地址 具体核心代码如下
                        int index = this.namesrvIndex.incrementAndGet();
                        index = Math.abs(index);
                        index = index % addrList.size();
                        String newAddr = addrList.get(index);

                        this.namesrvAddrChoosed.set(newAddr);

这里就是每次发送消息更新topic的流程

然后我们来看看scanAvailableNameSrv流程的this.createChannel(addr)方法

private Channel createChannel(final String addr) throws InterruptedException {
	//获取原来的ChannelWrapper
        ChannelWrapper cw = this.channelTables.get(addr);
	// 如果存在并且存活则直接返回
        if (cw != null && cw.isOK()) {
            return cw.getChannel();
        }

        if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
            try {
		// 是否创建新的连接
                boolean createNewConnection;
		//双重检查 类似双重检查的单例模式
                cw = this.channelTables.get(addr);
                if (cw != null) {

                    if (cw.isOK()) {
                        return cw.getChannel();
                    } else if (!cw.getChannelFuture().isDone()) {
                        createNewConnection = false;
                    } else {
                        this.channelTables.remove(addr);
                        createNewConnection = true;
                    }
                } else {
                    createNewConnection = true;
                }
		            // 创建新的连接
                if (createNewConnection) {
		                // 连接到 Nameserver
                    ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr));
                    LOGGER.info("createChannel: begin to connect remote host[{}] asynchronously", addr);
                    cw = new ChannelWrapper(channelFuture);
                    this.channelTables.put(addr, cw);
                }
            } catch (Exception e) {
                LOGGER.error("createChannel: create channel exception", e);
            } finally {
                this.lockChannelTables.unlock();
            }
        } else {
            LOGGER.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
        }

        if (cw != null) {
            ChannelFuture channelFuture = cw.getChannelFuture();
            if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) {
                if (cw.isOK()) {
                    LOGGER.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());
                    return cw.getChannel();
                } else {
                    LOGGER.warn("createChannel: connect remote host[" + addr + "] failed, " + channelFuture.toString());
                }
            } else {
                LOGGER.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(),
                    channelFuture.toString());
            }
        }

        return null;
    }

小结

总的来说客户端获取namesrvAddr的方式是三秒更新一次,所以客户端3s就能感受到namesrvAddr是否下线。

连接相关的信息存储在channelTables中,即

总结

  1. 客户端每隔30s就会去扫描判断Nameserver是否存活
  2. 客户端每隔2分钟才会去http服务器拉取最新的Nameserver地址
  3. 客户端topic相关的缓存信息每隔30s会去Nameserver拉取最新的topic、broker相关信息
展开阅读全文

页面更新:2024-02-19

标签:源码   地址   定时器   初始化   线程   客户端   流程   方式   时间   方法   信息

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号

Top