仅此一招,再无消息乱序的烦恼

1. 概览

RocketMQ 早已提供了一组最佳实践,但工作在一线的伙伴却很少知道,项目中的各种随性代码经常导致消息错乱问题,严重影响业务的准确性。为了保障最佳实践的落地,降低一线伙伴的使用成本,统一 MQ 使用规范,需要对其进行抽象和封装…

1.1. 背景

RocketMQ的最佳实践中推荐:一个应用尽可能用一个Topic,消息子类型用tags来标识,tags可以由应用自由设置。

在使用rocketMQTemplate发送消息时,通过设置发送方法的destination参数来设置消息的目的地,destination的格式为topicName:tagName,:前面表示topic的名称,后面表示tags名称,简单示例如下:

// 计算 destination
protected String createDestination(String topic, String tag) {
    if (org.apache.commons.lang3.StringUtils.isNotEmpty(tag)){
        return topic + ":" + tag;
    }else {
        return topic;
    }
}
// 发送信息
String destination = createDestination(topic, tag);
SendResult sendResult = this.rocketMQTemplate.syncSendOrderly(destination, msg, shardingKey, 2000);

tags从命名来看像是一个复数,但发送消息时,目的地只能指定一个topic下的一个tag,不能指定多个。

但,在消费消息时,就变的没那么方便了,简单示例如下:

@Service
@RocketMQMessageListener(
    topic = "consumer-test-topic-1",
        consumerGroup ="user-message-consumer-1",
        selectorExpression = "*",
        consumeMode = ConsumeMode.ORDERLY
)
@Slf4j
public class RocketBasedUserMessageConsumer extends UserMessageConsumer
    implements RocketMQListener {
    @Override
    public void onMessage(MessageExt message) {
        String tag = message.getTags();
        byte[] body = message.getBody();
        log.info("handle msg body {}", new String(body));
        switch (tag){
            case "UserCreatedEvent":
                UserEvents.UserCreatedEvent createdEvent = JSON.parseObject(body, UserEvents.UserCreatedEvent.class);
                handle(createdEvent);
                return;
            case "UserEnableEvent":
                UserEvents.UserEnableEvent enableEvent = JSON.parseObject(body, UserEvents.UserEnableEvent.class);
                handle(enableEvent);
                return;
            case "UserDisableEvent":
                UserEvents.UserDisableEvent disableEvent = JSON.parseObject(body, UserEvents.UserDisableEvent.class);
                handle(disableEvent);
                return;
            case "UserDeletedEvent":
                UserEvents.UserDeletedEvent deletedEvent = JSON.parseObject(body, UserEvents.UserDeletedEvent.class);
                handle(deletedEvent);
                return;
        }
    }
}

该方法有几个问题:

  1. tag 维护成本较高,RocketMQMessageListener 设置 selectorExpression 为 *,将拉取全部数据,增加通讯成本;如果使用 tag1 || tag2 方式,每次调整都需要对代码和配置进行更新,特别容易遗漏;
  2. 充斥大量模板代码,比如 case 分支,反序列化,调用业务方法等;
  3. API 具有侵入性,开发是需要关心 RocketMQ API,存在一定学习成本;

1.2. 目标

提供一种面向业务场景的,灵活进行业务扩展的模式,具有以下特征:

  1. Tag 和代码保持一致,不需要多处配置,新增逻辑自动完成 Tag 注册;
  2. 消除模板方法,类中只保留核心业务方法,框架完成 方法分发、消息反序列化等操作;
  3. 代码零侵入,仅使用注解,无需了解 RocketMQ API;

2. 快速入门

框架依赖 rocketmq-spring-boot-starter 完成消息发送和回收。

2.1. 环境准备

2.1.1. 增加依赖

首先,增加 rocketmq 相关依赖。


    org.apache.rocketmq
    rocketmq-spring-boot-starter
    2.2.1

然后,增加 lego starter。


    com.geekhalo.lego
    lego-starter
    0.1.13-tag_based_dispatcher_message_consumer-SNAPSHOT

2.1.2. 增加配置

在 application.yml 文件中增加 rocketmq 配置。

rocketmq:
  name-server: http://127.0.0.1:9876
  producer:
    group: rocket-demo

2.2. 定义消费者

定义消费者,只需:

  1. 在 Bean 上增加 @TagBasedDispatcherMessageConsumer 注解,并指定 topic 和 consumer
  2. 在 Bean 的方法上添加 @HandleTag 注解,并指定监听的 tag

示例如下:

@TagBasedDispatcherMessageConsumer(
        topic = "consumer-test-topic",
        consumer = "user-message-consumer"
)
public class UserMessageConsumer {
    private final Map> events = Maps.newHashMap();
    public void clean(){
        this.events.clear();;
    }
    public List getUserEvents(Long userId){
        return this.events.get(userId);
    }
    @HandleTag("UserCreatedEvent")
    public void handle(UserEvents.UserCreatedEvent userCreatedEvent){
        List userEvents = this.events.computeIfAbsent(userCreatedEvent.getUserId(), userId -> new ArrayList<>());
        userEvents.add(userCreatedEvent);
    }
    @HandleTag("UserEnableEvent")
    public void handle(UserEvents.UserEnableEvent userEnableEvent){
        List userEvents = this.events.computeIfAbsent(userEnableEvent.getUserId(), userId -> new ArrayList<>());
        userEvents.add(userEnableEvent);
    }
    @HandleTag("UserDisableEvent")
    public void handle(UserEvents.UserDisableEvent userDisableEvent){
        List userEvents = this.events.computeIfAbsent(userDisableEvent.getUserId(), userId -> new ArrayList<>());
        userEvents.add(userDisableEvent);
    }
    @HandleTag("UserDeletedEvent")
    public void handle(UserEvents.UserDeletedEvent userDeletedEvent){
        List userEvents = this.events.computeIfAbsent(userDeletedEvent.getUserId(), userId -> new ArrayList<>());
        userEvents.add(userDeletedEvent);
    }
}

2.3. 测试

编写测试用例如下:

@SpringBootTest(classes = DemoApplication.class)
@Slf4j
class UserMessageConsumerTest {
    @Autowired
    private UserMessageConsumer userMessageConsumer;
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    private List userIds;
    @BeforeEach
    void setUp() throws InterruptedException {
        this.userMessageConsumer.clean();
        this.userIds = new ArrayList<>();
        for (int i = 0; i< 100; i++){
            userIds.add(10000L + i);
        }
        this.userIds.forEach(userId -> sendMessage(userId));
        TimeUnit.SECONDS.sleep(3);
    }
    private void sendMessage(Long userId) {
        String topic = "consumer-test-topic";
        {
            String tag = "UserCreatedEvent";
            UserEvents.UserCreatedEvent userCreatedEvent = new UserEvents.UserCreatedEvent();
            userCreatedEvent.setUserId(userId);
            userCreatedEvent.setUserName("Name-" + userId);
            sendOrderlyMessage(topic, tag, userCreatedEvent);
        }
        {
            String tag = "UserEnableEvent";
            UserEvents.UserEnableEvent userEnableEvent = new UserEvents.UserEnableEvent();
            userEnableEvent.setUserId(userId);
            userEnableEvent.setUserName("Name-" + userId);
            sendOrderlyMessage(topic, tag, userEnableEvent);
        }
        {
            String tag = "UserDisableEvent";
            UserEvents.UserDisableEvent userDisableEvent = new UserEvents.UserDisableEvent();
            userDisableEvent.setUserId(userId);
            userDisableEvent.setUserName("Name-" + userId);
            sendOrderlyMessage(topic, tag, userDisableEvent);
        }
        {
            String tag = "UserDeletedEvent";
            UserEvents.UserDeletedEvent userDeletedEvent = new UserEvents.UserDeletedEvent();
            userDeletedEvent.setUserId(userId);
            userDeletedEvent.setUserName("Name-" + userId);
            sendOrderlyMessage(topic, tag, userDeletedEvent);
        }
    }
    private void sendOrderlyMessage(String topic, String tag, UserEvents.UserEvent event) {
        String shardingKey = String.valueOf(event.getUserId());
        String json = JSON.toJSONString(event);
        Message msg = MessageBuilder
                .withPayload(json)
                .build();
        String destination = createDestination(topic, tag);
        SendResult sendResult = this.rocketMQTemplate.syncSendOrderly(destination, msg, shardingKey, 2000);
        log.info("Send result is {} for msg", sendResult, msg);
    }
    protected String createDestination(String topic, String tag) {
        if (org.apache.commons.lang3.StringUtils.isNotEmpty(tag)){
            return topic + ":" + tag;
        }else {
            return topic;
        }
    }
    @AfterEach
    void tearDown() {
    }
    @Test
    void getUserEvents() {
        this.userIds.forEach(userId ->{
            List userEvents = this.userMessageConsumer.getUserEvents(userId);
            Assertions.assertEquals(4, userEvents.size());
            Assertions.assertTrue(userEvents.get(0) instanceof UserEvents.UserCreatedEvent);
            Assertions.assertTrue(userEvents.get(1) instanceof UserEvents.UserEnableEvent);
            Assertions.assertTrue(userEvents.get(2) instanceof UserEvents.UserDisableEvent);
            Assertions.assertTrue(userEvents.get(3) instanceof UserEvents.UserDeletedEvent);
        });
    }
}

启动时,可以看到如下日志:

TagBasedDispatcherConsumerContainer : success to subscribe  http://127.0.0.1:9876, topic consumer-test-topic, tag UserCreatedEvent||UserEnableEvent||UserDeletedEvent||UserDisableEvent, group user-message-consumer

从日志上可以看出,框架以组 group user-message-consumer 创建 Consumer,并订阅 consumer-test-topic 的 UserCreatedEvent||UserEnableEvent||UserDeletedEvent||UserDisableEvent 等 Tag,初始化流程符合预期。

测试逻辑比较简单,逻辑如下:

  1. 创建 100 个用户
  2. 每个用户创建并依次发布领域事件,UserCreatedEvent、UserEnableEvent、UserDisableEvent、UserDeletedEvent
  3. 消费发送完成后,停顿 3 秒
  4. 依次检测每个用户收到的消息,并对顺序进行检测

观察日志,可以看到发送和消费日志交替出现:

UserMessageConsumerTest        : Send result is SendResult [sendStatus=SEND_OK, msgId=2408820718EADE005827F0B9E9D4D6D9B98158644D467D38DE4900FD, offsetMsgId=C0A8010A00002A9F00000000056077FB, messageQueue=MessageQueue [topic=consumer-test-topic, brokerName=bogon, queueId=2], queueOffset=1121] for msg
TagBasedDispatcherConsumerContainer : consume 2408820718EADE005827F0B9E9D4D6D9B98158644D467D38DE4700FC cost: 0 ms

用例通过,运行结果符合预期。

3. 设计&扩展

3.1. 初始化流程

image

框架初始化流程如下:

  1. TagBasedDispatcherConsumerContainerRegistry 实现 Spring 的 BeanPostProcessor 接口,依次对托管 bean 进行处理;
  2. 如果 Bean 上存在 @TagBasedDispatcherMessageConsumer 注解,便会提取配置信息,构建 TagBasedDispatcherConsumerContainer 实例
  3. TagBasedDispatcherConsumerContainer 收集方法上的 @HandleTag 注解,结合 @TagBasedDispatcherMessageConsumer 上的 topic、consumer 等信息构建 DefaultMQPushConsumer 并完成 topic 和 tag 的订阅
  4. TagBasedDispatcherConsumerContainer 内部会构建 tag 与 method 的映射关系,以对指定tag进行处理;

3.2. 运行流程

image
运行流程如下:


  1. 消息发送者将消息发送至 MQ;
  2. MQ 将消息发送至 Consumer;
  3. Consumer 收到消息后,根据 tag 对消息进行分发;
  4. 处理器对消息进行反序列化,获取调用参数,然后调用方法执行业务逻辑;

4. 项目信息

项目仓库地址:https://gitee.com/litao851025/lego

项目文档地址:https://gitee.com/litao851025/lego/wikis/support/TagBasedDispatcherMessageConsumer

展开阅读全文

页面更新:2024-04-29

标签:消息   注解   示例   初始化   框架   逻辑   烦恼   流程   代码   业务   方法

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号

Top