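In a monolithic system, when one operation has to modify several database tables, a transaction is normally used to keep the data consistent: either all changes are committed, or all of them are rolled back on error. Take order creation as an example, and assume that placing an order requires two operations:
1. create the order;
2. add credits for the order.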
In a monolithic architecture, annotating the method with @Transactional is enough to keep the data consistent:
@Transactional
public void order() {
    String orderId = UUID.randomUUID().toString();
    // Create the order
    orderService.createOrder(orderId);
    // Add credits
    creditService.addCredits(orderId);
}
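However, more and more systems now use a distributed architecture, in which the order system and the credit system may be two independent services. The approach above no longer works: the two operations are not part of the same transaction, so on error only the current service's local transaction can be rolled back. The order may be created while adding credits fails (or the other way round), leaving the data inconsistent.
Keeping data consistent across a distributed architecture requires a distributed transaction. There are several ways to implement one; here we look at the approach based on RocketMQ transactional messages.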
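Taking order creation again as the example, the processing flow in a distributed architecture is: the order service creates the order and publishes an "order created" message to MQ, and the credit service consumes that message and adds the credits.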
Problems with plain MQ messages
First, consider the problems with combining @Transactional and an ordinary MQ message:
@Transactional
public void order() {
    String orderId = UUID.randomUUID().toString();
    // Create the order
    Order order = orderService.createOrder(orderId);
    // Send the "order created" MQ message
    sendOrderMessage(order);
}
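A minimal in-memory simulation makes the failure window visible. Nothing here comes from RocketMQ or Spring: `PlainMqDemo`, its pending-writes list and its topic list are hypothetical stand-ins, and the `commitFails` flag simulates a database commit that fails after the message has already been sent.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical in-memory stand-ins for the order table and the MQ topic (not real APIs).
class PlainMqDemo {
    static final Set<String> committedOrders = new HashSet<>();
    static final List<String> topic = new ArrayList<>();

    // Mimics @Transactional + plain send: the message goes out before the DB commit.
    static void order(String orderId, boolean commitFails) {
        List<String> pendingWrites = new ArrayList<>();
        pendingWrites.add(orderId);   // createOrder: buffered until commit
        topic.add(orderId);           // sendOrderMessage: visible to consumers immediately
        if (commitFails) {
            return;                   // rollback discards the buffered writes, not the message
        }
        committedOrders.addAll(pendingWrites);
    }

    // Messages that refer to an order which does not exist in the database.
    static List<String> orphanMessages() {
        List<String> orphans = new ArrayList<>();
        for (String id : topic) {
            if (!committedOrders.contains(id)) {
                orphans.add(id);
            }
        }
        return orphans;
    }

    public static void main(String[] args) {
        order("o-1", false);
        order("o-2", true);
        System.out.println(orphanMessages()); // [o-2]
    }
}
```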
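The message send is not part of the database transaction. If the message is sent successfully but the local transaction is then rolled back (for example, the commit fails after the send), consumers receive a message about an order that does not exist. RocketMQ transactional messages solve this problem.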
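Using RocketMQ transactional messages
To use transactional messages you implement a custom transaction listener. TransactionListener defines two methods: executeLocalTransaction executes our local transaction, and checkLocalTransaction is a compensation mechanism. If, because of some failure, the broker has not received a commit request for a transaction, it calls this method to query the transaction's state and decides from the result whether to commit or roll back: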
public interface TransactionListener {
/**
* Execute the local transaction
*
* @param msg Half (prepare) message
* @param arg Custom business parameter
* @return Transaction state
*/
LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);
/**
* Check back the local transaction state
*
* @param msg Check message
* @return Transaction state
*/
LocalTransactionState checkLocalTransaction(final MessageExt msg);
}
Here we implement a custom transaction listener, OrderTransactionListenerImpl:
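import java.nio.charset.StandardCharsets;

import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;

public class OrderTransactionListenerImpl implements TransactionListener {

    @Autowired
    private OrderService orderService;

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            String body = new String(msg.getBody(), StandardCharsets.UTF_8);
            OrderDTO orderDTO = JSON.parseObject(body, OrderDTO.class);
            // Simulate creating the order (the local transaction)
            orderService.createOrder(orderDTO.getOrderId());
        } catch (Exception e) {
            // Exception: ask the broker to roll back the half message
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        // Order created: ask the broker to commit the message
        return LocalTransactionState.COMMIT_MESSAGE;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        String body = new String(msg.getBody(), StandardCharsets.UTF_8);
        OrderDTO orderDTO = JSON.parseObject(body, OrderDTO.class);
        try {
            // Look up the order by ID: if it exists, the local transaction was committed
            Order order = orderService.getOrderByOrderId(orderDTO.getOrderId());
            if (null != order) {
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        } catch (Exception e) {
            // The query itself failed (e.g. the database is unavailable), so the state is unknown;
            // UNKNOW lets the broker check again later instead of rolling back a possibly committed order
            return LocalTransactionState.UNKNOW;
        }
        // No such order: the local transaction did not commit
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }
}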
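Next, let's send the transactional message. The producer for transactional messages is TransactionMQProducer: after creating it, register the OrderTransactionListenerImpl from the previous step, put the order ID into the message body, and call sendMessageInTransaction to send the transactional message: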
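import java.io.UnsupportedEncodingException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        // Create the order transaction listener.
        // Note: created with new, so its @Autowired OrderService is not injected here;
        // in a Spring application, obtain the listener as a Spring bean instead
        TransactionListener transactionListener = new OrderTransactionListenerImpl();
        // Create the producer
        TransactionMQProducer producer = new TransactionMQProducer("order_group");
        // Thread pool that answers the broker's transaction-state check requests
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(2000), r -> {
                    Thread thread = new Thread(r);
                    thread.setName("client-transaction-msg-check-thread");
                    return thread;
                });
        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();
        try {
            OrderDTO orderDTO = new OrderDTO();
            // Simulate a unique order ID
            orderDTO.setOrderId(UUID.randomUUID().toString());
            // Serialize the order into the message body
            byte[] msgBody = JSON.toJSONString(orderDTO).getBytes(RemotingHelper.DEFAULT_CHARSET);
            Message msg = new Message("ORDER_TOPIC", msgBody);
            // Send the transactional message
            SendResult sendResult = producer.sendMessageInTransaction(msg, null);
            System.out.printf("%s%n", sendResult);
            Thread.sleep(10);
        } catch (MQClientException | UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        // Keep the producer alive so it can still answer check-back requests from the broker
        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}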
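The transaction execution flow:
1. The producer sends a half message to the broker.
2. The broker stores the half message and acknowledges it; consumers cannot see it yet.
3. The producer executes the local transaction (creating the order).
4. Based on the result, the producer sends a commit or rollback request to the broker.
5. On commit, the broker delivers the message to its real topic, where the credit service can consume it; on rollback, the message is discarded.
6. If the broker never receives the commit/rollback request, it checks back with the producer (checkLocalTransaction) and commits or rolls back according to the answer.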
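With transactional messages, it can no longer happen that order creation fails while the message is still delivered. You may still wonder: what if the order is created and the message delivered, but the credit service fails while consuming it? The data would still be inconsistent. In my view, the credit service can retry on failure or use other compensation to make sure the credit record is eventually created; in the extreme case where the record still is not created, manual intervention may be needed.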
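As a sketch of such consumer-side compensation, the hypothetical `CreditConsumer` below is idempotent on the order ID and reports a transient failure so that the message is delivered again. In a real RocketMQ concurrent consumer, returning `ConsumeConcurrentlyStatus.RECONSUME_LATER` would play the role of `RETRY_LATER`, and the in-memory set would be a unique constraint in the credit table; both mappings are assumptions of this sketch.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical credit-service consumer: idempotent on orderId, asks for redelivery on failure.
class CreditConsumer {
    enum Result { SUCCESS, RETRY_LATER }

    // Stands in for a unique index on orderId in the credit table (check + insert must be atomic there).
    private final Set<String> creditedOrders = new HashSet<>();
    private int failuresToInject;   // simulates transient errors such as a DB timeout
    int credits;

    CreditConsumer(int failuresToInject) {
        this.failuresToInject = failuresToInject;
    }

    Result onMessage(String orderId) {
        if (creditedOrders.contains(orderId)) {
            return Result.SUCCESS;          // duplicate delivery: already handled, do nothing
        }
        if (failuresToInject > 0) {
            failuresToInject--;
            return Result.RETRY_LATER;      // transient failure: let the broker redeliver
        }
        creditedOrders.add(orderId);
        credits += 10;
        return Result.SUCCESS;
    }

    public static void main(String[] args) {
        CreditConsumer consumer = new CreditConsumer(2);
        Result r;
        do {
            r = consumer.onMessage("o-1");  // redelivery, as the broker would retry
        } while (r == Result.RETRY_LATER);
        consumer.onMessage("o-1");          // an extra duplicate delivery
        System.out.println(consumer.credits); // 10
    }
}
```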
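RocketMQ has supported transactional messages since version 4.3.0. They are implemented with a two-phase commit protocol, plus a compensation mechanism that periodically checks back the transaction state to deal with transactions that were neither committed nor rolled back.
Two-phase commit
Sending a transactional message takes two phases:
Phase 1: the producer sends a half (prepare) message to the broker. The message is not stored directly in its target topic; it is first stored in the RMQ_SYS_TRANS_HALF_TOPIC topic, where it is invisible to consumers and cannot be consumed. Such a message is called a half message. Once the half message has been sent successfully, the producer executes the local transaction.
Phase 2 (commit phase): based on the result of the local transaction from phase 1, the producer decides whether to commit or roll back. Either way, the message is removed from RMQ_SYS_TRANS_HALF_TOPIC. A committed message is also delivered to the queue of its real topic, from which consumers can pull and consume it; a rolled-back message is simply removed from RMQ_SYS_TRANS_HALF_TOPIC.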
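The two phases can be modeled with a toy broker. `ToyTxBroker` and its methods are hypothetical; for simplicity it removes the half message directly, which RocketMQ itself does not do.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy broker model of the two phases; class and method names are hypothetical.
class ToyTxBroker {
    private final Map<String, String> halfMessages = new LinkedHashMap<>(); // txId -> body, invisible to consumers
    final List<String> realTopic = new ArrayList<>();                      // what consumers can pull

    // Phase 1: store the half (prepare) message.
    void prepare(String txId, String body) {
        halfMessages.put(txId, body);
    }

    // Phase 2: commit delivers the message to the real topic; rollback just drops it.
    void end(String txId, boolean commit) {
        String body = halfMessages.remove(txId);
        if (body != null && commit) {
            realTopic.add(body);
        }
    }

    int pendingCount() {
        return halfMessages.size();
    }

    public static void main(String[] args) {
        ToyTxBroker broker = new ToyTxBroker();
        broker.prepare("tx1", "order-1");
        broker.prepare("tx2", "order-2");
        System.out.println(broker.realTopic); // []
        broker.end("tx1", true);              // local transaction succeeded
        broker.end("tx2", false);             // local transaction failed
        System.out.println(broker.realTopic); // [order-1]
    }
}
```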
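Note: because the CommitLog is append-only, RocketMQ does not actually delete messages from the RMQ_SYS_TRANS_HALF_TOPIC queue. Instead, it records committed or rolled-back transactions in a separate OP queue, and when the compensation mechanism checks half messages, it consults the OP queue to see whether a message has already been committed or rolled back.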
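The append-only idea can be sketched with two lists: the half log is never modified, and finishing a transaction only appends the half message's offset to the OP log. The class is hypothetical; in RocketMQ the OP message body likewise stores the half message's queue offset.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Append-only model: half messages are never deleted; the OP log records
// the half-queue offsets that have been committed or rolled back.
class AppendOnlyHalfLog {
    final List<String> halfLog = new ArrayList<>(); // index = half-queue offset
    final List<Long> opLog = new ArrayList<>();     // each entry = a finished half offset

    long appendHalf(String body) {
        halfLog.add(body);
        return halfLog.size() - 1;
    }

    // Commit or rollback: append a marker instead of deleting from halfLog.
    void markDone(long halfOffset) {
        opLog.add(halfOffset);
    }

    // What the compensation check needs: half offsets without an OP marker are still pending.
    List<Long> pendingOffsets() {
        Set<Long> done = new HashSet<>(opLog);
        List<Long> pending = new ArrayList<>();
        for (long offset = 0; offset < halfLog.size(); offset++) {
            if (!done.contains(offset)) {
                pending.add(offset);
            }
        }
        return pending;
    }

    public static void main(String[] args) {
        AppendOnlyHalfLog log = new AppendOnlyHalfLog();
        long first = log.appendHalf("order-1");
        log.appendHalf("order-2");
        log.markDone(first);
        System.out.println(log.pendingOffsets()); // [1]
    }
}
```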
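Compensation mechanism
If an error occurs in either phase of the two-phase commit, a transaction may end up neither committed nor rolled back, so a compensation mechanism is needed that periodically processes the half messages in the RMQ_SYS_TRANS_HALF_TOPIC topic.
RocketMQ uses a check-back mechanism: when processing a half message, the broker asks for the execution state of the corresponding local transaction and, based on the answer, commits it, rolls it back, or waits for the next check.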
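A simplified sketch of the check-back decision for a single half message follows. The three states mirror LocalTransactionState, and `checkMax` plays the role of the broker's transactionCheckMax setting (15 by default in the 4.x BrokerConfig, to my knowledge). In the real broker each round is a separate periodic check rather than a loop, and a message that exceeds the limit is handed to resolveDiscardMsg; `CheckBackDemo` itself is hypothetical.

```java
import java.util.function.Supplier;

// Simplified check-back loop for one half message; names other than the states are hypothetical.
class CheckBackDemo {
    enum State { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }
    enum Outcome { COMMITTED, ROLLED_BACK, DISCARDED }

    static Outcome resolve(Supplier<State> checkLocalTransaction, int checkMax) {
        for (int checkTimes = 1; checkTimes <= checkMax; checkTimes++) {
            State s = checkLocalTransaction.get();   // one check-back round
            if (s == State.COMMIT_MESSAGE) {
                return Outcome.COMMITTED;
            }
            if (s == State.ROLLBACK_MESSAGE) {
                return Outcome.ROLLED_BACK;
            }
            // UNKNOW: keep the half message and wait for the next round
        }
        return Outcome.DISCARDED;                    // gave up after checkMax rounds
    }

    public static void main(String[] args) {
        int[] calls = {0};
        Outcome o = resolve(() -> ++calls[0] < 3 ? State.UNKNOW : State.COMMIT_MESSAGE, 15);
        System.out.println(o + " after " + calls[0] + " checks"); // COMMITTED after 3 checks
    }
}
```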
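Next, let's study how transactional messages are implemented, starting from the source code.
As shown above, a transactional message is sent by calling the sendMessageInTransaction method of TransactionMQProducer: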
public class TransactionMQProducer extends DefaultMQProducer {
@Override
public TransactionSendResult sendMessageInTransaction(final Message msg,
final Object arg) throws MQClientException {
if (null == this.transactionListener) {
throw new MQClientException("TransactionListener is null", null);
}
// Set the topic (wrapped with the namespace, if any)
msg.setTopic(NamespaceUtil.wrapNamespace(this.getNamespace(), msg.getTopic()));
// Send the transactional message
return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
}
}
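sendMessageInTransaction is implemented in DefaultMQProducerImpl and mainly performs the following steps:
1. Get the transaction listener; throw an exception if neither a LocalTransactionExecuter nor a TransactionListener is set.
2. Mark the message as a half message by setting the PROPERTY_TRANSACTION_PREPARED property to true, and record the producer group in PROPERTY_PRODUCER_GROUP.
3. Send the message to the broker.
4. If the send status is SEND_OK, execute the local transaction; if it returns null or throws, the state stays UNKNOW. If the status is FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT or SLAVE_NOT_AVAILABLE, the state is set to ROLLBACK_MESSAGE.
5. Call endTransaction to report the local transaction state to the broker.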
public class DefaultMQProducerImpl implements MQProducerInner {
// Send a transactional message
public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter localTransactionExecuter, final Object arg)
throws MQClientException {
// Get the transaction listener
TransactionListener transactionListener = getCheckListener();
// Neither a local transaction executer nor a transaction listener is set
if (null == localTransactionExecuter && null == transactionListener) {
throw new MQClientException("tranExecutor is null", null);
}
// ...
SendResult sendResult = null;
// Mark the message as a prepared (half) message
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
// Record the producer group
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
try {
// Send the half message
sendResult = this.send(msg);
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}
// Local transaction state
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
switch (sendResult.getSendStatus()) { // Check the send status
case SEND_OK: { // Sent successfully
try {
// ...
if (null != localTransactionExecuter) { // A local transaction executer is set (legacy API)
// Execute the local transaction
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
} else if (transactionListener != null) { // A transaction listener is set
log.debug("Used new transaction API");
// Execute the local transaction
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
if (null == localTransactionState) {
// No state returned: treat it as UNKNOW
localTransactionState = LocalTransactionState.UNKNOW;
}
// ...
} catch (Throwable e) {
log.info("executeLocalTransactionBranch exception", e);
log.info(msg.toString());
localException = e;
}
}
break;
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE; // Roll back the half message
break;
default:
break;
}
try {
// End the transaction
this.endTransaction(msg, sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
// ...
return transactionSendResult;
}
}
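The state decision in sendMessageInTransaction can be condensed into a pure function. The enums reuse constant names from the excerpt, but `ClientTxFlow` is a hypothetical sketch without any network calls. Note that if sending the half message throws, the real method raises MQClientException and never reaches endTransaction.

```java
import java.util.function.Supplier;

// Condensed decision logic of sendMessageInTransaction as quoted above (hypothetical class).
class ClientTxFlow {
    enum SendStatus { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE }
    enum LocalTransactionState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    // Returns the state that would be reported to the broker through endTransaction.
    static LocalTransactionState decide(SendStatus halfSendStatus, Supplier<LocalTransactionState> localTx) {
        LocalTransactionState state = LocalTransactionState.UNKNOW;
        switch (halfSendStatus) {
            case SEND_OK:
                try {
                    LocalTransactionState result = localTx.get(); // executeLocalTransaction
                    if (result != null) {
                        state = result;                           // null stays UNKNOW
                    }
                } catch (Throwable e) {
                    // Exception: state stays UNKNOW and the broker's check-back decides later
                }
                break;
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                state = LocalTransactionState.ROLLBACK_MESSAGE;   // local transaction never runs
                break;
            default:
                break;
        }
        return state;
    }

    public static void main(String[] args) {
        System.out.println(decide(SendStatus.SEND_OK, () -> LocalTransactionState.COMMIT_MESSAGE)); // COMMIT_MESSAGE
        System.out.println(decide(SendStatus.SEND_OK, () -> { throw new IllegalStateException("db down"); })); // UNKNOW
        System.out.println(decide(SendStatus.SLAVE_NOT_AVAILABLE, () -> LocalTransactionState.COMMIT_MESSAGE)); // ROLLBACK_MESSAGE
    }
}
```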
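The broker handles send requests in SendMessageProcessor. When it receives a message, it checks whether the message carries the PROPERTY_TRANSACTION_PREPARED property; if it does, the broker gets the TransactionalMessageService and calls asyncPrepareMessage to handle the message: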
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
SendMessageContext mqtraceContext,
SendMessageRequestHeader requestHeader) {
final RemotingCommand response = preSend(ctx, request, requestHeader);
final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
// ...
CompletableFuture<PutMessageResult> putMessageResult = null;
// Get the prepared flag
String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
// The transaction flag is set
if (transFlag != null && Boolean.parseBoolean(transFlag)) {
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
return CompletableFuture.completedFuture(response);
}
// Persist as a half (transactional) message
putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
} else {
// Persist as a normal message
putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
}
return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
}
}
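The broker-side branch boils down to checking one message property. In this sketch the property key is written as the literal "TRAN_MSG", which I believe is the value of MessageConst.PROPERTY_TRANSACTION_PREPARED; treat that, and the returned strings naming each path, as assumptions made for illustration. `PreparedFlagRouting` is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the prepared-flag branch in asyncSendMessage (hypothetical class).
class PreparedFlagRouting {
    static final String PROPERTY_TRANSACTION_PREPARED = "TRAN_MSG"; // assumed value of the real constant

    static String route(Map<String, String> props, boolean rejectTransactionMessage) {
        String transFlag = props.get(PROPERTY_TRANSACTION_PREPARED);
        if (transFlag != null && Boolean.parseBoolean(transFlag)) {
            if (rejectTransactionMessage) {
                return "NO_PERMISSION";       // broker configured to refuse transactional messages
            }
            return "asyncPrepareMessage";     // stored as a half message
        }
        return "asyncPutMessage";             // ordinary message path
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        System.out.println(route(props, false)); // asyncPutMessage
        props.put(PROPERTY_TRANSACTION_PREPARED, "true");
        System.out.println(route(props, false)); // asyncPrepareMessage
    }
}
```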
TransactionalMessageServiceImpl's asyncPrepareMessage method in turn calls TransactionalMessageBridge's asyncPutHalfMessage method to store the half message:
public class TransactionalMessageServiceImpl implements TransactionalMessageService {
@Override
public CompletableFuture<PutMessageResult> asyncPrepareMessage(MessageExtBrokerInner messageInner) {
// Store the half message
return transactionalMessageBridge.asyncPutHalfMessage(messageInner);
}
}
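TransactionalMessageBridge's asyncPutHalfMessage method calls parseHalfMessageInner to set the half message's attributes.
Because it is a half message, it cannot be added to its real message queue yet, otherwise consumers would consume it right away. It has to be held somewhere until the commit request arrives, and only then added to the real queue. RocketMQ uses the RMQ_SYS_TRANS_HALF_TOPIC topic to hold half messages.
parseHalfMessageInner processes the message as follows:
1. Save the original topic in the PROPERTY_REAL_TOPIC property and the original queue ID in PROPERTY_REAL_QUEUE_ID, so that both can be restored later.
2. Reset the transaction type in the sysFlag to TRANSACTION_NOT_TYPE.
3. Change the topic to RMQ_SYS_TRANS_HALF_TOPIC and the queue ID to 0.
It then calls asyncPutMessage to store the message. From here on, the flow is the same as for ordinary messages; see [RocketMQ] Message Storage for details: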
public class TransactionalMessageBridge {
public CompletableFuture<PutMessageResult> asyncPutHalfMessage(MessageExtBrokerInner messageInner) {
// Store the message after converting it into a half message
return store.asyncPutMessage(parseHalfMessageInner(messageInner));
}
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
// Save the real topic
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
// Save the real queue ID
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
String.valueOf(msgInner.getQueueId()));
msgInner.setSysFlag(
MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
// Switch to the half topic RMQ_SYS_TRANS_HALF_TOPIC
msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
// Use queue 0 of the half topic
msgInner.setQueueId(0);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
return msgInner;
}
}
public class TransactionalMessageUtil {
public static String buildHalfTopic() {
// Topic that holds half messages
return TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
}
}
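The topic swap done by parseHalfMessageInner, and its reverse when the message is finally committed, can be sketched with a toy message. `HalfTopicRewrite.Msg` and the property keys are placeholders, not RocketMQ's MessageExtBrokerInner or the real values of PROPERTY_REAL_TOPIC and PROPERTY_REAL_QUEUE_ID.

```java
import java.util.HashMap;
import java.util.Map;

// Toy message with only the fields the half-message rewrite touches (hypothetical).
class HalfTopicRewrite {
    static final String HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC";
    static final String REAL_TOPIC_KEY = "REAL_TOPIC"; // placeholder key
    static final String REAL_QID_KEY = "REAL_QID";     // placeholder key

    static final class Msg {
        String topic;
        int queueId;
        final Map<String, String> props = new HashMap<>();

        Msg(String topic, int queueId) {
            this.topic = topic;
            this.queueId = queueId;
        }
    }

    // Prepare: remember where the message really belongs, then park it in queue 0 of the half topic.
    static Msg toHalf(Msg m) {
        m.props.put(REAL_TOPIC_KEY, m.topic);
        m.props.put(REAL_QID_KEY, String.valueOf(m.queueId));
        m.topic = HALF_TOPIC;
        m.queueId = 0;
        return m;
    }

    // Commit: restore the original topic and queue so consumers of the real topic see it.
    static Msg restore(Msg m) {
        m.topic = m.props.get(REAL_TOPIC_KEY);
        m.queueId = Integer.parseInt(m.props.get(REAL_QID_KEY));
        return m;
    }

    public static void main(String[] args) {
        Msg m = toHalf(new Msg("ORDER_TOPIC", 3));
        System.out.println(m.topic + "/" + m.queueId); // RMQ_SYS_TRANS_HALF_TOPIC/0
        restore(m);
        System.out.println(m.topic + "/" + m.queueId); // ORDER_TOPIC/3
    }
}
```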
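After the half message has been sent and the local transaction executed, the message sits in the broker's half topic. Next, the producer has to send an end-transaction request to the broker according to the local transaction result. The endTransaction method is implemented in DefaultMQProducerImpl: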
public class DefaultMQProducerImpl implements MQProducerInner {
public void endTransaction(
final Message msg,
final SendResult sendResult,
final LocalTransactionState localTransactionState,
final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
// Decode the message ID (prefer the offset message ID if present)
final MessageId id;
if (sendResult.getOffsetMsgId() != null) {
id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
} else {
id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
}
// Get the transaction ID
String transactionId = sendResult.getTransactionId();
// Get the broker address
final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
// End-transaction request header
EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
// Set the transaction ID
requestHeader.setTransactionId(transactionId);
requestHeader.setCommitLogOffset(id.getOffset());
// Map the local transaction state to a commit/rollback type
switch (localTransactionState) {
case COMMIT_MESSAGE: // Commit
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break;
case ROLLBACK_MESSAGE: // Roll back
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
break;
case UNKNOW: // Unknown
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
break;
default:
break;
}
doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false);
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
requestHeader.setMsgId(sendResult.getMsgId());
String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
// Send the end-transaction request (one-way)
this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
this.defaultMQProducer.getSendMsgTimeout());
}
}
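The commitOrRollback value is a bit field inside the message sysFlag. The sketch restates the MessageSysFlag constants as I understand them from the 4.x source (verify them against your version) and shows how resetTransactionValue swaps the transaction type while preserving the other flag bits. `TxSysFlagDemo` is hypothetical.

```java
// Transaction type bits in the message sysFlag (values restated as an assumption, see above).
class TxSysFlagDemo {
    static final int TRANSACTION_NOT_TYPE = 0;
    static final int TRANSACTION_PREPARED_TYPE = 0x1 << 2;  // 4
    static final int TRANSACTION_COMMIT_TYPE = 0x2 << 2;    // 8
    static final int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2;  // 12, also the mask of the two bits

    enum LocalTransactionState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    // Same mapping as the switch in endTransaction.
    static int commitOrRollback(LocalTransactionState state) {
        switch (state) {
            case COMMIT_MESSAGE:
                return TRANSACTION_COMMIT_TYPE;
            case ROLLBACK_MESSAGE:
                return TRANSACTION_ROLLBACK_TYPE;
            default:
                return TRANSACTION_NOT_TYPE;
        }
    }

    // Clear the two transaction bits, then set the new type; other bits are preserved.
    static int resetTransactionValue(int flag, int type) {
        return (flag & ~TRANSACTION_ROLLBACK_TYPE) | type;
    }

    static int getTransactionValue(int flag) {
        return flag & TRANSACTION_ROLLBACK_TYPE;
    }

    public static void main(String[] args) {
        int flag = 0x1 | TRANSACTION_PREPARED_TYPE;   // an unrelated bit plus "prepared"
        int committed = resetTransactionValue(flag, commitOrRollback(LocalTransactionState.COMMIT_MESSAGE));
        System.out.println(getTransactionValue(committed) == TRANSACTION_COMMIT_TYPE); // true
        System.out.println(committed & 0x1);          // 1
    }
}
```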
The broker handles end-transaction requests in EndTransactionProcessor:
public class EndTransactionProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
RemotingCommandException {
// Create the response
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final EndTransactionRequestHeader requestHeader =
(EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
LOGGER.debug("Transaction request:{}", requestHeader);
// A slave broker is not allowed to end transactions: return SLAVE_NOT_AVAILABLE
if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {
response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");
return response;
}
// ...
OperationResult result = new OperationResult();
// Check the commit/rollback type: commit
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
// Commit: look up the half message
result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
// Validate the prepare (half) message
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
// Build the final message, restoring the original topic and queue
MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
RemotingCommand sendResult = sendFinalMessage(msgInner);
if (sendResult.getCode() == ResponseCode.SUCCESS) {
// Mark the half message as deleted (via the OP queue)
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return sendResult;
}
return res;
}
} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) { // Rollback
// Roll back: look up the half message
result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
// Mark the half message as deleted (via the OP queue)
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return res;
}
}
response.setCode(result.getResponseCode());
response.setRemark(result.getResponseRemark());
return response;
}
}
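Because the CommitLog is append-only, RocketMQ does not delete the half message from it. Instead, it uses a separate OP topic, RMQ_SYS_TRANS_OP_HALF_TOPIC (called the OP topic/queue below), to record the messages that have been committed or rolled back: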
public class TransactionalMessageServiceImpl implements TransactionalMessageService {
@Override
public boolean deletePrepareMessage(MessageExt msgExt) {
// Add a record to the OP queue
if (this.transactionalMessageBridge.putOpMessage(msgExt, TransactionalMessageUtil.REMOVETAG)) {
log.debug("Transaction op message write successfully. messageId={}, queueId={} msgExt:{}", msgExt.getMsgId(), msgExt.getQueueId(), msgExt);
return true;
} else {
log.error("Transaction op message write failed. messageId is {}, queueId is {}", msgExt.getMsgId(), msgExt.getQueueId());
return false;
}
}
}
putOpMessage is implemented in TransactionalMessageBridge; it calls addRemoveTagInTransactionOp to add a message to the OP queue:
public class TransactionalMessageBridge {
private final ConcurrentHashMap<MessageQueue, MessageQueue> opQueueMap = new ConcurrentHashMap<>();
public boolean putOpMessage(MessageExt messageExt, String opType) {
// Build the message queue from the half message's topic, the broker name and the queue ID
MessageQueue messageQueue = new MessageQueue(messageExt.getTopic(),
this.brokerController.getBrokerConfig().getBrokerName(), messageExt.getQueueId());
if (TransactionalMessageUtil.REMOVETAG.equals(opType)) {
// Add an OP message
return addRemoveTagInTransactionOp(messageExt, messageQueue);
}
return true;
}
/**
* When a transactional message is committed or rolled back, record it in the operation queue (OP queue)
*/
private boolean addRemoveTagInTransactionOp(MessageExt prepareMessage, MessageQueue messageQueue) {
// Build the OP message (topic RMQ_SYS_TRANS_OP_HALF_TOPIC); the body is the half message's queue offset
Message message = new Message(TransactionalMessageUtil.buildOpTopic(), TransactionalMessageUtil.REMOVETAG,
String.valueOf(prepareMessage.getQueueOffset()).getBytes(TransactionalMessageUtil.charset));
// Write the message to the OP queue
writeOp(message, messageQueue);
return true;
}
private void writeOp(Message message, MessageQueue mq) {
MessageQueue opQueue;
// Reuse the cached OP queue if there is one
if (opQueueMap.containsKey(mq)) {
opQueue = opQueueMap.get(mq);
} else {
opQueue = getOpQueueByHalf(mq);
MessageQueue oldQueue = opQueueMap.putIfAbsent(mq, opQueue);
if (oldQueue != null) {
opQueue = oldQueue;
}
}
// Still not found
if (opQueue == null) {
// Create it
opQueue = new MessageQueue(TransactionalMessageUtil.buildOpTopic(), mq.getBrokerName(), mq.getQueueId());
}
// Build the OP message and append it to the OP queue
putMessage(makeOpMessageInner(message, opQueue));
}
}
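For various reasons the commit/rollback request may never arrive, so RocketMQ periodically checks half messages to find out how their transactions ended. TransactionalMessageCheckService performs this check. It extends ServiceThread, and its onWaitEnd method, which runs each time the wait interval elapses, calls check: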
public class TransactionalMessageCheckService extends ServiceThread {
@Override
protected void onWaitEnd() {
long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
long begin = System.currentTimeMillis();
log.info("Begin to check prepare message, begin time:{}", begin);
// Check the transaction states
this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}
}
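The periodic trigger can be approximated with a scheduled executor. This is a swapped-in technique: RocketMQ's ServiceThread uses its own wait loop and calls onWaitEnd after each wait. As far as I know, the 4.x BrokerConfig defaults are transactionCheckInterval = 60 s, transactionTimeOut = 6 s and transactionCheckMax = 15. `PeriodicCheckDemo` is hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Stand-in for TransactionalMessageCheckService: run a check task repeatedly with a fixed delay.
class PeriodicCheckDemo {
    // Runs the check `rounds` times, intervalMillis apart, and returns how many rounds ran.
    static int runChecks(long intervalMillis, int rounds) throws InterruptedException {
        AtomicInteger ran = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(rounds);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(() -> {
            if (ran.get() < rounds) {
                ran.incrementAndGet(); // the broker would call check(timeout, checkMax, listener) here
                done.countDown();
            }
        }, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
        done.await(5, TimeUnit.SECONDS);
        scheduler.shutdownNow();
        scheduler.awaitTermination(1, TimeUnit.SECONDS);
        return ran.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runChecks(10, 3)); // 3
    }
}
```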
check is implemented in TransactionalMessageServiceImpl:
public class TransactionalMessageServiceImpl implements TransactionalMessageService {
@Override
public void check(long transactionTimeout, int transactionCheckMax,
AbstractTransactionalMessageCheckListener listener) {
try {
String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
// Get the message queues of the half topic
Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
if (msgQueues == null || msgQueues.size() == 0) {
log.warn("The queue of topic is empty :" + topic);
return;
}
log.debug("Check topic={}, queues={}", topic, msgQueues);
// Iterate over all half message queues
for (MessageQueue messageQueue : msgQueues) {
// Record the current time as the start time
long startTime = System.currentTimeMillis();
// Get the corresponding OP queue
MessageQueue opQueue = getOpQueue(messageQueue);
// Get the consume offset of the half queue
long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
// Get the consume offset of the OP queue
long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);
// A negative offset is invalid
if (halfOffset < 0 || opOffset < 0) {
log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue,
halfOffset, opOffset);
continue;
}
// OP offsets whose half messages have been processed
List<Long> doneOpOffset = new ArrayList<>();
// Half offset -> OP offset for half messages already committed/rolled back
HashMap<Long, Long> removeMap = new HashMap<>();
// Pull messages from the OP queue starting at its consume offset
PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
// If the pull result is null, log an error and move on to the next queue
if (null == pullResult) {
log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null",
messageQueue, halfOffset, opOffset);
continue;
}
// Counter of null half-message reads, starting at 1
int getMessageNullCount = 1;
// New half-queue offset
long newOffset = halfOffset;
// Start from the half queue's consume offset
long i = halfOffset;
while (true) {
// Stop once this queue has been processed for longer than the time limit
if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {
log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);
break;
}
// If the OP queue records this offset, the message has been processed: add it to the done list
if (removeMap.containsKey(i)) {
log.debug("Half offset {} has been committed/rolled back", i);
Long removedOpOffset = removeMap.remove(i);
// Add it to doneOpOffset
doneOpOffset.add(removedOpOffset);
} else { // No OP record for this message
// Fetch the half message at this offset
GetResult getResult = getHalfMsg(messageQueue, i);
// Get the message object
MessageExt msgExt = getResult.getMsg();
// Message not found
if (msgExt == null) {
// Give up once the number of null reads exceeds MAX_RETRY_COUNT_WHEN_HALF_NULL
if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {
break;
}
// NO_NEW_MSG: no more messages, so stop and wait for the next check
if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) {
log.debug("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i,
messageQueue, getMessageNullCount, getResult.getPullResult());
break;
} else {
log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}",
i, messageQueue, getMessageNullCount, getResult.getPullResult());
// The offset is invalid: continue from the next offset given by the pull result
i = getResult.getPullResult().getNextBeginOffset();
newOffset = i;
continue;
}
}
// Discard the message (checked too many times) or skip it (older than the file retention time)
if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
listener.resolveDiscardMsg(msgExt);
// Move on to the next message
newOffset = i + 1;
i++;
continue;
}
// A message stored at or after the start of this check is left for the next check
if (msgExt.getStoreTimestamp() >= startTime) {
log.debug("Fresh stored. the miss offset={}, check it later, store={}", i,
new Date(msgExt.getStoreTimestamp()));
break;
}
// How long the half message has existed: current time minus its born time
long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();
// The immunity time defaults to the transaction timeout
long checkImmunityTime = transactionTimeout;
// PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS: the message is not checked back before it is this old
String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);
// If the property is set
if (null != checkImmunityTimeStr) {
// Immunity time: the transaction timeout if the property is -1, otherwise the property value (seconds) * 1000
checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);
// The message is younger than the immunity time
if (valueOfCurrentMinusBorn < checkImmunityTime) {
// Check the message's prepare-queue offset; skip this message if this returns true
if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {
// Move on to the next message
newOffset = i + 1;
i++;
continue;
}
}
} else {
// The message is younger than the immunity time: check it later
if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) {
log.debug("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i,
checkImmunityTime, new Date(msgExt.getBornTimestamp()));
break;
}
}
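// OP messages pulled so far
List<MessageExt> opMsg = pullResult.getMsgFoundList();
// A check-back is needed if any of the following holds:
// 1. no OP messages were pulled and the message is older than the immunity time
// 2. OP messages were pulled and the born time of the last one minus the check start time exceeds the transaction timeout
// 3. the message's age is negative (its born time is later than the current time)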
boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)
|| (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
|| (valueOfCurrentMinusBorn <= -1);
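// A check-back is needed
if (isNeedCheck) {
// Append the half message to the half queue again
if (!putBackHalfMsgQueue(msgExt, i)) {
continue;
}
// Send the check-back request
listener.resolveHalfMsg(msgExt);
} else {
// Otherwise pull more messages from the OP queue and re-evaluate this offset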
pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
log.debug("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,
messageQueue, pullResult);
continue;
}
}
// Move on to the next message
newOffset = i + 1;
i++;
}
if (newOffset != halfOffset) {
// Update the half queue's consume offset
transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
}
long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
if (newOpOffset != opOffset) {
// Update the OP queue's consume offset
transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
}
}
} catch (Throwable e) {
log.error("Check error", e);
}
}
}
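check gets all message queues under the half topic (RMQ_SYS_TRANS_HALF_TOPIC), iterates over them, and processes the half messages in each queue. The main steps are as follows.
1. Build the MessageQueue object for the OP queue
getOpQueue returns the MessageQueue object of the OP queue that corresponds to the current half queue. It actually just creates a MessageQueue with the OP topic, the broker name and the queue ID; the object is used later to fetch the consume offset: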
private MessageQueue getOpQueue(MessageQueue messageQueue) {
// Look up the cached OP queue
MessageQueue opQueue = opQueueMap.get(messageQueue);
if (opQueue == null) {
// Not found: create a MessageQueue with the OP topic, the broker name and the queue ID
opQueue = new MessageQueue(TransactionalMessageUtil.buildOpTopic(), messageQueue.getBrokerName(),
messageQueue.getQueueId());
// Cache it in opQueueMap
opQueueMap.put(messageQueue, opQueue);
}
return opQueue;
}
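2. Get the consume offsets of the half queue and the OP queue
Consume offsets are queried with transactionalMessageBridge's fetchConsumeOffset method. Its parameter is a MessageQueue, which is why step 1 builds the OP queue's MessageQueue object; here it is used to query the offset. If no offset has been stored yet, the queue's minimum offset is used: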
public long fetchConsumeOffset(MessageQueue mq) {
long offset = brokerController.getConsumerOffsetManager().queryOffset(TransactionalMessageUtil.buildConsumerGroup(),
mq.getTopic(), mq.getQueueId());
if (offset == -1) {
offset = store.getMinOffsetInQueue(mq.getTopic(), mq.getQueueId());
}
return offset;
}
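3. Pull messages from the OP queue
fillOpRemoveMap pulls messages from the OP queue starting at its consume offset and puts them into removeMap, which is used to decide whether a half message has already been processed: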
private PullResult fillOpRemoveMap(HashMap<Long, Long> removeMap,
MessageQueue opQueue, long pullOffsetOfOp, long miniOffset, List<Long> doneOpOffset) {
// Pull up to 32 messages from the OP queue
PullResult pullResult = pullOpMsg(opQueue, pullOffsetOfOp, 32);
// Return null if the pull failed
if (null == pullResult) {
return null;
}
// Invalid offset, or no matching message
if (pullResult.getPullStatus() == PullStatus.OFFSET_ILLEGAL
|| pullResult.getPullStatus() == PullStatus.NO_MATCHED_MSG) {
log.warn("The miss op offset={} in queue={} is illegal, pullResult={}", pullOffsetOfOp, opQueue,
pullResult);
// Advance the OP queue's consume offset to the one from the pull result
transactionalMessageBridge.updateConsumeOffset(opQueue, pullResult.getNextBeginOffset());
return pullResult;
} else if (pullResult.getPullStatus() == PullStatus.NO_NEW_MSG) { // No new messages
log.warn("The miss op offset={} in queue={} is NO_NEW_MSG, pullResult={}", pullOffsetOfOp, opQueue,
pullResult);
return pullResult;
}
// Messages pulled
List<MessageExt> opMsg = pullResult.getMsgFoundList();
if (opMsg == null) { // Nothing found: log and return
log.warn("The miss op offset={} in queue={} is empty, pullResult={}", pullOffsetOfOp, opQueue, pullResult);
return pullResult;
}
// Iterate over the pulled OP messages
for (MessageExt opMessageExt : opMsg) {
// The body holds the queue offset of the half message
Long queueOffset = getLong(new String(opMessageExt.getBody(), TransactionalMessageUtil.charset));
log.debug("Topic: {} tags: {}, OpOffset: {}, HalfOffset: {}", opMessageExt.getTopic(),
opMessageExt.getTags(), opMessageExt.getQueueOffset(), queueOffset);
if (TransactionalMessageUtil.REMOVETAG.equals(opMessageExt.getTags())) {
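// The half offset is below the current half-queue progress
if (queueOffset < miniOffset) {
// Add the OP offset to doneOpOffset
doneOpOffset.add(opMessageExt.getQueueOffset());
} else {
// Otherwise record it in removeMap (half messages already processed)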
removeMap.put(queueOffset, opMessageExt.getQueueOffset());
}
} else {
log.error("Found a illegal tag in opMessageExt= {} ", opMessageExt);
}
}
log.debug("Remove map: {}", removeMap);
log.debug("Done op list: {}", doneOpOffset);
return pullResult;
}
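The partitioning at the end of fillOpRemoveMap can be tried in isolation. The sketch drops the pull logic and models each OP message as an OP offset plus a body that holds the half offset as a decimal string, assuming TransactionalMessageUtil.charset is UTF-8; `OpRemoveMapDemo` is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Partitioning step of fillOpRemoveMap without the pull logic (hypothetical names).
class OpRemoveMapDemo {
    static final class OpEntry {
        final long opOffset;
        final byte[] body;

        OpEntry(long opOffset, long halfOffset) {
            this.opOffset = opOffset;
            this.body = String.valueOf(halfOffset).getBytes(StandardCharsets.UTF_8);
        }
    }

    static void fill(List<OpEntry> opMsgs, long miniOffset,
                     Map<Long, Long> removeMap, List<Long> doneOpOffset) {
        for (OpEntry op : opMsgs) {
            long halfOffset = Long.parseLong(new String(op.body, StandardCharsets.UTF_8));
            if (halfOffset < miniOffset) {
                doneOpOffset.add(op.opOffset);          // half message is already behind the check progress
            } else {
                removeMap.put(halfOffset, op.opOffset); // skip this half offset when the loop reaches it
            }
        }
    }

    public static void main(String[] args) {
        List<OpEntry> ops = new ArrayList<>();
        ops.add(new OpEntry(0, 3));
        ops.add(new OpEntry(1, 7));
        Map<Long, Long> removeMap = new HashMap<>();
        List<Long> done = new ArrayList<>();
        fill(ops, 5, removeMap, done);
        System.out.println(removeMap + " " + done); // {7=1} [0]
    }
}
```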
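4. Process each half message
A while loop starts at the half queue's consume offset and processes each half message in turn:
- If the queue has been processed for longer than MAX_PROCESS_TIME_LIMIT, stop.
- If removeMap contains the offset, the message has already been committed or rolled back: move its OP offset to doneOpOffset.
- Otherwise fetch the half message. If it cannot be read, stop (or, if the offset is illegal, continue from the next valid offset).
- If the message must be discarded (checked too many times) or skipped (too old), hand it to resolveDiscardMsg and continue.
- If it was stored after this check began, or (without a custom immunity time) is younger than the transaction timeout, stop and leave it for a later check.
- If a check-back is needed (isNeedCheck), re-append the message to the half queue with putBackHalfMsgQueue and send a check-back request with resolveHalfMsg; otherwise pull more OP messages and evaluate the same offset again.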
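A heavily simplified model of this loop, with hypothetical types, keeps only the core decisions: skip offsets found in removeMap, discard messages checked too often, stop at the first message younger than the transaction timeout (later messages are roughly younger still), and schedule the rest for a check-back. It omits OP re-pulling, the immunity-time property, the processing time limit and the put-back.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of the while loop in check(); all names are hypothetical.
class CheckLoopDemo {
    static final class HalfMsg {
        final long bornTimestamp;
        final int checkTimes;

        HalfMsg(long bornTimestamp, int checkTimes) {
            this.bornTimestamp = bornTimestamp;
            this.checkTimes = checkTimes;
        }
    }

    static final class Result {
        final List<Long> toCheck = new ArrayList<>();
        final List<Long> discarded = new ArrayList<>();
        long newOffset;
    }

    static Result scan(List<HalfMsg> halfQueue, long halfOffset, Map<Long, Long> removeMap,
                       long now, long transactionTimeout, int checkMax) {
        Result r = new Result();
        long i = halfOffset;
        while (i < halfQueue.size()) {
            if (removeMap.containsKey(i)) {
                removeMap.remove(i);               // already committed or rolled back
            } else {
                HalfMsg m = halfQueue.get((int) i);
                if (m.checkTimes >= checkMax) {
                    r.discarded.add(i);            // like needDiscard -> resolveDiscardMsg
                } else if (now - m.bornTimestamp < transactionTimeout) {
                    break;                         // too young: stop and retry in a later round
                } else {
                    r.toCheck.add(i);              // like putBackHalfMsgQueue + resolveHalfMsg
                }
            }
            i++;
        }
        r.newOffset = i;                           // persisted via updateConsumeOffset in RocketMQ
        return r;
    }

    public static void main(String[] args) {
        List<HalfMsg> q = new ArrayList<>();
        q.add(new HalfMsg(0, 0));      // offset 0: has an OP record
        q.add(new HalfMsg(0, 15));     // offset 1: checked too many times
        q.add(new HalfMsg(1_000, 2));  // offset 2: old enough for a check-back
        q.add(new HalfMsg(9_000, 0));  // offset 3: younger than the timeout
        Map<Long, Long> removeMap = new HashMap<>();
        removeMap.put(0L, 42L);
        Result r = scan(q, 0, removeMap, 10_000, 6_000, 15);
        System.out.println(r.toCheck + " " + r.discarded + " " + r.newOffset); // [2] [1] 3
    }
}
```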
5. Update the consume offsets
Finally, the consume offsets of the half queue and the OP queue are updated.
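The OP queue's new offset comes from calculateOpOffset, which the excerpt calls but does not show. The sketch follows my reading of the 4.x implementation: sort the processed OP offsets and advance only over a contiguous run that starts at the old offset, so an unprocessed OP message in between is never skipped. Verify against the version you use; `OpOffsetDemo` is hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Advance the OP-queue offset over a contiguous run of processed OP offsets.
class OpOffsetDemo {
    static long calculateOpOffset(List<Long> doneOffset, long oldOffset) {
        List<Long> sorted = new ArrayList<>(doneOffset);
        Collections.sort(sorted);
        long newOffset = oldOffset;
        for (long offset : sorted) {
            if (offset == newOffset) {
                newOffset++;   // contiguous: safe to move past it
            } else {
                break;         // gap: an OP message in between is not processed yet
            }
        }
        return newOffset;
    }

    public static void main(String[] args) {
        List<Long> done = new ArrayList<>();
        done.add(12L);
        done.add(10L);
        done.add(11L);
        done.add(14L);
        System.out.println(calculateOpOffset(done, 10)); // 13
    }
}
```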
The putBackHalfMsgQueue method shows that the message is appended to the half queue again:
private boolean putBackHalfMsgQueue(MessageExt msgExt, long offset) {
// Re-append the message to the half queue
PutMessageResult putMessageResult = putBackToHalfQueueReturnResult(msgExt);
// If the write succeeded
if (putMessageResult != null
&& putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
// Update the message's logical (queue) offset
msgExt.setQueueOffset(
putMessageResult.getAppendMessageResult().getLogicsOffset());
// Update the message's CommitLog offset
msgExt.setCommitLogOffset(
putMessageResult.getAppendMessageResult().getWroteOffset());
// Update the message ID
msgExt.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
log.debug(
"Send check message, the offset={} restored in queueOffset={} "
+ "commitLogOffset={} "
+ "newMsgId={} realMsgId={} topic={}",
offset, msgExt.getQueueOffset(), msgExt.getCommitLogOffset(), msgExt.getMsgId(),
msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX),
msgExt.getTopic());
return true;
} else {
// Write failed
log.error(
"PutBackToHalfQueueReturnResult write failed, topic: {}, queueId: {}, "
+ "msgId: {}",
msgExt.getTopic(), msgExt.getQueueId(), msgExt.getMsgId());
return false;
}
}
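Why re-append instead of leaving the message in place: the half queue's check progress only moves forward, so a message whose state is still unknown is copied to the tail and reached again in a later round. In RocketMQ the check count lives in a message property that needDiscard increments; the sketch models it as a field, and `PutBackDemo` is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Re-append model: unresolved half messages are copied to the tail with a larger check count.
class PutBackDemo {
    static final class HalfMsg {
        final String txId;
        final int checkTimes;

        HalfMsg(String txId, int checkTimes) {
            this.txId = txId;
            this.checkTimes = checkTimes;
        }
    }

    final List<HalfMsg> halfQueue = new ArrayList<>();
    long checkOffset;

    // One check round over the messages that existed when the round started.
    void checkRound(int checkMax) {
        long end = halfQueue.size();
        for (; checkOffset < end; checkOffset++) {
            HalfMsg m = halfQueue.get((int) checkOffset);
            if (m.checkTimes >= checkMax) {
                continue;   // would be discarded instead of checked again
            }
            // Copy to the tail with an incremented counter; RocketMQ then sends the check request
            halfQueue.add(new HalfMsg(m.txId, m.checkTimes + 1));
        }
    }

    public static void main(String[] args) {
        PutBackDemo demo = new PutBackDemo();
        demo.halfQueue.add(new HalfMsg("tx1", 0));
        for (int round = 0; round < 3; round++) {
            demo.checkRound(2);
        }
        System.out.println(demo.halfQueue.size()); // 3
    }
}
```

With a limit of 2, the original message and two re-appended copies end up in the queue, and the third round stops re-appending.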
resolveHalfMsg sends the transaction-state check-back request to the client, asynchronously through a thread pool:
public abstract class AbstractTransactionalMessageCheckListener {
public void resolveHalfMsg(final MessageExt msgExt) {
executorService.execute(new Runnable() {
@Override
public void run() {
try {
// Send the check-back request
sendCheckMessage(msgExt);
} catch (Exception e) {
LOGGER.error("Send check message error!", e);
}
}
});
}
}
sendCheckMessage is implemented in AbstractTransactionalMessageCheckListener. It builds the request and sends the transaction-state check-back request to the message's producer:
public abstract class AbstractTransactionalMessageCheckListener {
public void sendCheckMessage(MessageExt msgExt) throws Exception {
// Build the check-back request header
CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();
checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset()); // CommitLog offset
checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId());
checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());
checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());
// Restore the message's real topic
msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
// Restore the message's real queue ID
msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
msgExt.setStoreSize(0);
String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
// Get an available channel to a producer of this group
Channel channel = brokerController.getProducerManager().getAvailableChannel(groupId);
if (channel != null) {
// Send the check-back request
brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
} else {
LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);
}
}
}
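The client handles check-back requests in ClientRemotingProcessor. A request with code CHECK_TRANSACTION_STATE is a transaction-state check-back request, and checkTransactionState is called to check the transaction state: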
public class ClientRemotingProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
switch (request.getCode()) {
case RequestCode.CHECK_TRANSACTION_STATE:
// Check the transaction state
return this.checkTransactionState(ctx, request);
// ...
default:
break;
}
return null;
}
public RemotingCommand checkTransactionState(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final CheckTransactionStateRequestHeader requestHeader =
(CheckTransactionStateRequestHeader) request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class);
final ByteBuffer byteBuffer = ByteBuffer.wrap(request.getBody());
// Decode the message
final MessageExt messageExt = MessageDecoder.decode(byteBuffer);
// If the message was decoded
if (messageExt != null) {
// ...
// Get the transaction ID
String transactionId = messageExt.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
messageExt.setTransactionId(transactionId);
}
// Get the producer group
final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
if (group != null) {
// Look up the MQProducerInner registered for the group
MQProducerInner producer = this.mqClientFactory.selectProducer(group);
if (producer != null) {
final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
// Call checkTransactionState to check the state
producer.checkTransactionState(addr, messageExt, requestHeader);
} else {
log.debug("checkTransactionState, pick producer by group[{}] failed", group);
}
} else {
log.warn("checkTransactionState, pick producer group failed");
}
} else {
log.warn("checkTransactionState, decode message failed");
}
return null;
}
}
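checkTransactionState is implemented in DefaultMQProducerImpl. It creates a Runnable and submits it to a thread pool, so the state check runs asynchronously. The main logic is as follows:
1. Get the listener (the deprecated TransactionCheckListener, or the TransactionListener) and call its check method, checkLocalTransaction in the case of TransactionListener; if it throws, the state stays UNKNOW.
2. In processTransactionState, build an EndTransactionRequestHeader with fromTransactionCheck set to true and map the state to commit, rollback or unknown.
3. Send the result to the broker with endTransactionOneway; the broker handles it in EndTransactionProcessor, as described above.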
public class DefaultMQProducerImpl implements MQProducerInner {
@Override
public void checkTransactionState(final String addr, final MessageExt msg,
final CheckTransactionStateRequestHeader header) {
Runnable request = new Runnable() {
// ...
@Override
public void run() {
// Get the TransactionCheckListener (deprecated)
TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
// Get the transaction listener
TransactionListener transactionListener = getCheckListener();
// If either listener is set
if (transactionCheckListener != null || transactionListener != null) {
// Start from UNKNOW
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable exception = null;
try {
if (transactionCheckListener != null) {
localTransactionState = transactionCheckListener.checkLocalTransactionState(message);
} else if (transactionListener != null) {
log.debug("Used new check API in transaction message");
// Call checkLocalTransaction to check back the state
localTransactionState = transactionListener.checkLocalTransaction(message);
} else {
log.warn("CheckTransactionState, pick transactionListener by group[{}] failed", group);
}
} catch (Throwable e) {
log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);
exception = e;
}
// Report the transaction state
this.processTransactionState(
localTransactionState,
group,
exception);
} else {
log.warn("CheckTransactionState, pick transactionCheckListener by group[{}] failed", group);
}
}
// Report the transaction state to the broker
private void processTransactionState(
final LocalTransactionState localTransactionState,
final String producerGroup,
final Throwable exception) {
// Build the end-transaction request header
final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();
// Set the CommitLog offset
thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());
thisHeader.setProducerGroup(producerGroup); // Set the producer group
thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());
thisHeader.setFromTransactionCheck(true); // Mark the request as the result of a check-back
// ...
thisHeader.setTransactionId(checkRequestHeader.getTransactionId()); // Set the transaction ID
switch (localTransactionState) {
case COMMIT_MESSAGE:
thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE); // Commit
break;
case ROLLBACK_MESSAGE:
thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE); // Roll back
log.warn("when broker check, client rollback this transaction, {}", thisHeader);
break;
case UNKNOW:
thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE); // Unknown
log.warn("when broker check, client does not know this transaction state, {}", thisHeader);
break;
default:
break;
}
// ...
// Run the end-transaction hooks
doExecuteEndTransactionHook(msg, uniqueKey, brokerAddr, localTransactionState, true);
try {
// Report the check-back result to the broker
DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,
3000);
} catch (Exception e) {
log.error("endTransactionOneway exception", e);
}
}
};
// Submit the task to the check thread pool
this.checkExecutor.submit(request);
}
}
Summary
Updated: 2024-07-24