18 | 消息投递:如何保证消息仅仅被消费一次?


上节课我们通过在电商系统增加消息组件达到了削峰填谷的作用,对次要业务逻辑做了异步处理,对不同的系统模块做了解耦。因为业务逻辑从队列中移除了,所以我们要有响应的队列处理逻辑了,这时 系统架构就变成了 如下:

18 | 消息投递:如何保证消息仅仅被消费一次?

今天 再来讨论一个非主流程的业务逻辑,比如客户购买了一个商品之后,会给用户发红包,但是发红包这个流程是非必要的业务关键流程,所以可以通过消息队列来异步处理。但是这里面容易出现一个问题,如果红包消息重复处理了就会导致公司利益损失,这个问题怎么解决呢?这个就是我们今天的话题:消息的重复消费问题。

那么我们如何来保证产生的消息一定会被消费并且只被消费一次呢?

消息为什么会丢失

如果消息只被消费一次,首先保证消息不丢失。那么消息从被写入消息队列到被消费者完成,这个链路会有哪些丢失消息的可能,三个方面

  1. 消息从生产者写入消息队列的过程
  2. 消息在消息队列中的存储场景
  3. 消息被消费者消费的过程
18 | 消息投递:如何保证消息仅仅被消费一次?

1、在消息生产的过程中丢失消息

首先消息生产者是我们的业务服务器,消息队列服务器为单独部署的。虽然是内网,但这个过程也有可能出现网络抖动导致消息丢失的可能。

针对这种情况,建议采用消息重发机制,但是也不能无限制的重发,如果不是因为消息队列发生故障或是网络断开,一般重试 2~3次即可解决。而重发机制,也是大多数消息队列本身提供的功能。

消息重发存在的问题就是消息的重复消费,现实的意义就是你收到了两个红包,公司就亏损了。

那么消息发送到了消息队列是不是就万无一失了呢?也不是,消息队列中仍然有丢失的可能。

2、在消息队列中丢失消息

已kafka 为例,消息在kafka中是存在本地磁盘上的,而为了减少消息存储对磁盘的随机IO,一般消息会先写入到操作系统的page cache 中,然后再找合适的时间刷新到磁盘上。

比如,卡夫卡可以设置一定的时间或者消息累积到一定量时候再刷盘,也就是所说的异步刷盘。

不过如果发生机器断电或者机器重启,page cache 中还没来得及刷盘消息就会丢失,那怎么解决呢?

你可以设置把刷盘时间设置很短或者消息累计很少就刷,但是这样频繁刷盘对性能也有很大影响,经验来看,出现机器宕机或者掉电的几率也不高,所以不建议这么做。

18 | 消息投递:如何保证消息仅仅被消费一次?

如果系统对消息丢失容忍度很低,可以考虑集群部署消息服务,通过部署多个副本保证消息尽量不丢失。

kafka集群中还有一个Leader负责消息的写入和消费,还有多个follower负责消息的数据备份,Follower中还有一个特殊的集合 ISR(in-sync replicas)当Leader故障时,新的Leader会从ISR中选择,默认Leader的数据会异步的复制给Follower,这样在 Leader 发生掉电或者宕机时,Kafka 会从 Follower 中消费消息,减少消息丢失的可能。

由于消息默认是从 Leader复制到Follower的,所以一旦Leader宕机,那么没来得及复制的消息还是会丢失,为了解决这个问题,Kafka提供了一种机制 :acks当这个选项被设置为“all”时,生产者发送的每一条消息除了发给 Leader 外还会发给所有的 ISR,并且必须得到 Leader 和所有 ISR 的确认后才被认为发送成功。这样,只有 Leader 和所有的 ISR 都挂了消息才会丢失。

18 | 消息投递:如何保证消息仅仅被消费一次?

从上面这张图来看,当设置“acks=all”时,需要同步执行 1、3、4 三个步骤,对于消息生产的性能来说也是有比较大的影响的,所以你在实际应用中需要仔细地权衡考量。我给你的建议是:

  1. 如果你的消息一条不能丢失,建议不要开启同步刷盘,而是用集群方式解决,可以配置当所有ISR Follower都接收到消息才返回成功
  2. 如果对消息丢失有一定容忍度,不建议使用集群,或者即使是集群也配置一个Follower同步成功即返回成功。
  3. 上面我们发红包的业务就有一定容忍度,如果消息丢失可以通过补发来实现。

3、在消费的过程中存在消息丢失的可能

一个消息消费的进度是记录在消息队列群里的,消费的过程分为三步:接收消息、处理消息、更新消费进度。

接受消息和处理消息都有可能发生失败,比如接收消息发生网络抖动,导致消息没有正确接收到,处理消息可能发生一些异常,这是入股更新了消费进度,那么这条消息就永远不会被处理了。

所以你这里需要注意了,一定要等到消息正确处理完成再正确提交,这时也有一个问题就是,加入消息已经正确处理,但是由于网络问题没有正确提交消费进度,这个时候就会导致消息会重复的消费。

如何保证消息只被消费一次

从上面分析可以得出,为了避免消息丢失,我们做了两方面的努力:一是需要牺牲一部分性能,而是可能造成消息的重复消费。

性能损耗还可以,一般业务系统只有写操作才会发送消息队列,一般系统的写请求并发量不高,但是消息一旦被重复消费就会造成业务逻辑的错误。那么如何避免呢?

完全的做到不消费重复消息 很难做到,所以我们转换思路,只要保证即使消费到了重复消息,从消费结果来看,跟只消费一次的等同就好了。也就是保证 生产和消费的过程 幂等!!!

说白了,就是一件事无论做了多少次,产生的结果都是一样的,这个就是幂等了。

在生产、消费过程中增加消息幂等性的保证

消息的生产和消费过程都有可能产生重复,所以要做的就是生产和消费的过程中增加消息的幂等性,这样就可以认为从结果上看实际只消费了一次。

在消息生产过程中,在 Kafka0.11 版本和 Pulsar 中都支持“producer idempotency”的特性,翻译过来就是生产过程的幂等性,这种特性保证消息虽然可能在生产端产生重复,但是最终在消息队列存储时只会存储一份

它的做法是给每一个生产者一个唯一的 ID,并且为生产的每一条消息赋予一个唯一 ID,消息队列的服务端会存储 < 生产者 ID,最后一条消息 ID> 的映射。当某一个生产者产生新的消息时,消息队列服务端会比对消息 ID 是否与存储的最后一条 ID 一致,如果一致就认为是重复的消息,服务端会自动丢弃。

18 | 消息投递:如何保证消息仅仅被消费一次?

而在消费端,幂等性的保证会稍微复杂一些,你可以从通用层和业务层两个层面来考虑。

通用层面,你可以在消息被生产的时候使用发号器给生成一个唯一id,消息被处理后,把这个id存储在数据库中,在处理下一条消息之前从数据库查询这个id 消息是否被消费过,如果被消费就放弃消费。

你可以看到,无论是生产端的幂等性保证方式还是消费端通用的幂等性保证方式,它们的共同特点都是为每一个消息生成一个唯一的 ID,然后在使用这个消息的时候,先比对这个 ID 是否已经存在,如果存在则认为消息已经被使用过。所以这种方式是一种标准的实现幂等的方式,你在项目之中可以拿来直接使用,它在逻辑上的伪代码就像下面这样:

boolean isIDExisted = selectByID(ID); // 判断ID是否存在
if(isIDExisted) {
  return; //存在则直接返回
} else {
  process(message); //不存在,则处理消息
  saveID(ID);   //存储ID
}

这样存在一个问题,如果消息处理完成之后,还没来得及写入数据库,消费者宕机了,重启之后发现数据库没有,还是会重复消费消息,这个时候就考虑引入事务机制,保证消息的消费跟数据库的写入必须同时成功或者失败,但是这样的成本会很高,所以如果要求不是很严格,可以直接使用这种通用的方案,不要考虑引入事务。

在业务层面怎么处理呢?这里有很多种处理方式,其中有一种是增加乐观锁的方式。比如你的消息处理程序需要给一个人的账号加钱,那么你可以通过乐观锁的方式来解决。

具体的操作方式是这样的:你给每个人的账号数据中增加一个版本号的字段,在生产消息时先查询这个账户的版本号,并且将版本号连同消息一起发送给消息队列。消费端在拿到消息和版本号后,在执行更新账户金额 SQL 的时候带上版本号,类似于执行:

你看,我们在更新数据时给数据加了乐观锁,这样在消费第一条消息时,version 值为 1,SQL 可以执行成功,并且同时把 version 值改为了 2;在执行第二条相同的消息时,由于 version 值不再是 1,所以这条 SQL 不能执行成功,也就保证了消息的幂等性。



展开阅读全文

页面更新:2024-04-12

标签:容忍度   消息   生产者   队列   集群   版本号   进度   逻辑   机制   性能   过程   方式   建议   业务   系统   科技

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号

Top