# 分布式事务消息源码分析

作者:Ethan.Yang
博客:https://blog.ethanyang.cn (opens new window)


# 1 什么是分布式事务?

业务场景:用户 A 转账 100 元给用户 B,需要两个步骤:

  1. 扣减 A 的余额
  2. 增加 B 的余额

如果两个操作都在同一个数据库事务里,就能做到:

要么全部成功,要么全部失败(ACID 原子性)

但在微服务架构中:

  • A 服务负责扣钱
  • B 服务负责加钱

它们是两个独立服务,无法共享同一个数据库事务

image.png

这就产生了:

分布式事务问题:跨服务、跨数据库的数据一致性如何保证?

# 2 RocketMQ 的解决思路:消息最终一致性

RocketMQ 并不是“一键解决分布式事务”,而是提供一种最终一致性方案

核心思想:

先让本地事务执行 → 再根据结果决定是否投递消息 → 消费者再做下一步业务。

也就是说: A 扣款成功 → 发消息到 MQ → B 收到消息再加钱。

image.png

但这种模式有两个核心问题:

问题 1:本地事务成功,但消息发送失败怎么办?

问题 2:消息发送成功,但本地事务失败怎么办?

无论你“先业务后发送”、“先发送后业务”,都无法避免一致性问题。

为了解决这个矛盾,RocketMQ 引入了:

半消息(Half Message) + 事务回查机制

# 3 RocketMQ 事务消息怎么解决一致性?

# 3.1 半消息(Half Message)

半消息 = 已写入 CommitLog,但不会进入 ConsumeQueue、消费者不可见的消息。

特点:

  • Producer 先发送半消息
  • Broker 不会投递给消费者
  • 生产者随后执行本地事务
  • 最终由生产者告诉 Broker:提交(commit)还是回滚(rollback)

为什么要这么做?

因为这样 RocketMQ 就获得了“参与权”,可以根据业务执行结果决定消息投递与否。

# 3.2 本地事务与消息事务的区别

项目 本地事务(DB) RocketMQ 消息事务
由谁管理? Spring / MyBatis / DB RocketMQ Producer + Broker
控制范围 数据库操作 消息是否投递
失败会怎样? DB 自动回滚 消息被丢弃 / 等待回查
是否感知对方? 不感知消息 不感知数据库

两者互不干扰,但会通过事务监听器回调形成“最终一致性”。

RocketMQ 不会替你管理数据库事务;数据库事务也不会影响 MQ 的 commit/rollback 除了你主动返回结果。

# 3.3 完整流程:两阶段提交

img

第一阶段:发送半消息 + 执行本地事务

  1. Producer 调用 sendMessageInTransaction
  2. Broker 写入半消息(不可消费)
  3. RocketMQ SDK 回调你的 executeLocalTransaction
  4. 你在这里执行 DB 本地事务(通常加 @Transactional)
    • 成功 → 返回 COMMIT_MESSAGE
    • 失败 → 返回 ROLLBACK_MESSAGE
    • 未知 → 返回 UNKNOW

第二阶段:Broker 最终确认

  • 如果收到 Commit → 将半消息变成正式消息(写入 ConsumeQueue)
  • 如果收到 Rollback → 删除半消息
  • 如果长时间没响应 → Broker 主动回查生产者(checkLocalTransaction)

# 3.4 为什么需要回查?

比如:

  • Producer 服务挂了
  • 网络抖动导致 commit 没发成功
  • Producer 返回 UNKNOW

Broker 会定时扫描 CommitLog 中的半消息,并主动调用:

CHECK_TRANSACTION_STATE
1

回查本地事务是否成功。

# 4 事务消息 Demo

# 4.1 生产者

public static void main(String[] args) throws Exception {

    TransactionListener transactionListener = new TransactionListenerImpl();

    TransactionMQProducer producer =
            new TransactionMQProducer("tx_producer_group");

    producer.setNamesrvAddr("127.0.0.1:9876");

    // 回查线程池
    ExecutorService executorService = new ThreadPoolExecutor(
            2, 5, 100,
            TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000)
    );

    producer.setExecutorService(executorService);
    producer.setTransactionListener(transactionListener);

    producer.start();

    // ===== 第一步:发送半消息 =====
    Message msg = new Message(
            "TxTopic",
            "TagA",
            "KEY_10001",
            "A转B 100元".getBytes(RemotingHelper.DEFAULT_CHARSET));

    SendResult result = producer.sendMessageInTransaction(msg, null);
    System.out.println("半消息发送结果:" + result);

    Thread.sleep(100000);
    producer.shutdown();
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

# 4.2 执行本地事务

public class TransactionListenerImpl implements TransactionListener {

    // 第一阶段:执行本地事务
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {

        try {
            // ===== 这里执行你的本地数据库事务 =====
            // @Transactional 方法内部发生异常会自动回滚,并继续向外抛出
            transactionService.doBizTransaction(); 

            // DB 成功 → 提交消息
            return LocalTransactionState.COMMIT_MESSAGE;

        } catch (Exception e) {
            // DB 失败 → 回滚消息
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }

    // Broker 回查事务状态
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {

        // 去数据库查该事务是否成功
        boolean success = transactionService.checkBizResult(msg.getKeys());

        if (success) {
            return LocalTransactionState.COMMIT_MESSAGE;
        } else {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

避免单点问题, 如果 producer1 挂了,而 producer2 仍在同一个 Producer Group, 那么:

Broker 的事务回查会发送给任意存活的 Producer。同一个 group 里都能处理。

所以事务消息不会因为生产者宕机而卡死。

# 5 分布式事务源码分析

从前面的流程可以看到,RocketMQ 的事务消息大致分为三步:

  1. 发送半消息(Half Message)
  2. 结束事务:提交 / 回滚半消息
  3. Broker 定时回查未决事务

注意:本章分析的是 RocketMQ 消息事务 的源码,不包含 DB 的 @Transactional,DB 事务完全由业务代码负责。

# 5.1 发送半消息源码流程

# 5.1.1 Producer :sendMessageInTransaction

业务代码调用:

SendResult sendResult = producer.sendMessageInTransaction(msg, null);
1

事务 Producer 先做一层封装:

@Override
public TransactionSendResult sendMessageInTransaction(final Message msg,
                                                      final Object arg) throws MQClientException {
    if (null == this.transactionListener) {
        throw new MQClientException("TransactionListener is null", null);
    }

    msg.setTopic(NamespaceUtil.wrapNamespace(this.getNamespace(), msg.getTopic()));
    return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
}
1
2
3
4
5
6
7
8
9
10

真正的事务发送逻辑在 DefaultMQProducerImpl.sendMessageInTransaction

public TransactionSendResult sendMessageInTransaction(final Message msg,
                                                      final LocalTransactionExecuter localTransactionExecuter,
                                                      final Object arg) throws MQClientException {
    TransactionListener transactionListener = getCheckListener();
    if (localTransactionExecuter == null && transactionListener == null) {
        throw new MQClientException("tranExecutor is null", null);
    }

    // 事务消息不支持延时等级
    if (msg.getDelayTimeLevel() != 0) {
        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
    }

    Validators.checkMessage(msg, this.defaultMQProducer);

    // 标记这是一个“Prepared 事务消息”
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP,
            this.defaultMQProducer.getProducerGroup());

    // === 第一步:同步发送“半消息” ===
    SendResult sendResult = this.send(msg);

    // === 第二步:回调本地事务(executeLocalTransaction),后面在别处分析 ===
    ...
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

关键点:

  • 此时只是把消息标记为 事务预备消息(Prepared),并发送给 Broker;
  • 还没有执行本地业务事务,这一步只解决“把半消息落到 Broker”。

# 5.1.2 Broker :写入半消息专用 Topic

Broker 使用 Netty 接收请求,对应处理器是 SendMessageProcessor.processRequest,最终会走到消息存储逻辑 DefaultMessageStore.asyncPutMessage

关键区别在这里(简化后的逻辑):

if (isTransactionPreparedMessage(msgInner)) {
    // 分布式事务:处理半消息(Prepared)
    putMessageResult = this.brokerController
            .getTransactionalMessageService()
            .asyncPrepareMessage(msgInner);
} else {
    // 普通消息 / 事务消息的 Commit / Rollback
    putMessageResult = this.brokerController
            .getMessageStore()
            .asyncPutMessage(msgInner);
}
1
2
3
4
5
6
7
8
9
10
11

asyncPrepareMessage 内部调用 TransactionalMessageBridge.asyncPutHalfMessage,在这里对“半消息”进行转换:

private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
    // 保存真实的 Topic 和 QueueId 到属性中
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
            String.valueOf(msgInner.getQueueId()));

    // 设置为“事务未决类型”
    msgInner.setSysFlag(
            MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));

    // 半消息真正写入的 Topic:RMQ_SYS_TRANS_HALF_TOPIC
    msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
    msgInner.setQueueId(0);

    msgInner.setPropertiesString(
            MessageDecoder.messageProperties2String(msgInner.getProperties()));
    return msgInner;
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

小结:

  • 业务看到的 Topic 是你自己写的,例如:TxTopic
  • Broker 实际存储时会把半消息写到系统 Topic:RMQ_SYS_TRANS_HALF_TOPIC
  • 同时在属性中记录原来的 REAL_TOPICREAL_QUEUE_ID,后续 commit 时会用回去。

# 5.2 提交 / 回滚事务消息源码流程

当 Producer 执行完本地事务(DB 事务)后,会在 TransactionListener.executeLocalTransaction 中返回:

  • COMMIT_MESSAGE → 提交
  • ROLLBACK_MESSAGE → 回滚
  • UNKNOW → 暂时不确定,后面由回查决定

这些状态最终会在 DefaultMQProducerImpl.sendMessageInTransaction 中交给 endTransaction

try {
    // endTransaction:告诉 Broker 本条半消息是 commit 还是 rollback
    this.endTransaction(sendResult, localTransactionState, localException);
} catch (Exception e) {
    log.warn("local transaction execute " + localTransactionState
             + ", but end broker transaction failed", e);
}
1
2
3
4
5
6
7

endTransaction 会组装请求头并调用 endTransactionOneway

requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
requestHeader.setMsgId(sendResult.getMsgId());

String remark = localException != null
        ? ("executeLocalTransactionBranch exception: " + localException)
        : null;

this.mQClientFactory.getMQClientAPIImpl()
        .endTransactionOneway(brokerAddr, requestHeader, remark,
                this.defaultMQProducer.getSendMsgTimeout());
1
2
3
4
5
6
7
8
9
10
11

endTransactionOneway 只是发一个 END_TRANSACTION 请求给 Broker:

public void endTransactionOneway(final String addr,
                                 final EndTransactionRequestHeader requestHeader,
                                 final String remark,
                                 final long timeoutMillis) throws RemotingException {
    RemotingCommand request =
            RemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader);
    request.setRemark(remark);
    this.remotingClient.invokeOneway(addr, request, timeoutMillis);
}
1
2
3
4
5
6
7
8
9

# 5.2.1 Broker :处理 COMMIT / ROLLBACK

Broker 启动时注册了 END_TRANSACTION 的处理器 EndTransactionProcessor

this.remotingServer.registerProcessor(
        RequestCode.END_TRANSACTION,
        new EndTransactionProcessor(this),
        this.endTransactionExecutor);
1
2
3
4

EndTransactionProcessor 中,对于 Commit / Rollback 的处理大致是:

# 1. Commit 分支
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
    // 通过 commitLogOffset 找到原来的半消息
    OperationResult result = this.brokerController
            .getTransactionalMessageService()
            .commitMessage(requestHeader);

    if (result.getResponseCode() == ResponseCode.SUCCESS) {

        // 校验半消息合法性
        RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);

        if (res.getCode() == ResponseCode.SUCCESS) {
            // 从半消息还原出真实 Topic / QueueId
            MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
            msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(
                    msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));

            msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
            msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
            msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());

            // 清除事务标记
            MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);

            // === 1)写入真实 Topic,对消费者可见 ===
            RemotingCommand sendResult = sendFinalMessage(msgInner);

            // === 2)删除 RMQ_SYS_TRANS_HALF_TOPIC 中的半消息 ===
            if (sendResult.getCode() == ResponseCode.SUCCESS) {
                this.brokerController.getTransactionalMessageService()
                        .deletePrepareMessage(result.getPrepareMessage());
            }
            return sendResult;
        }
        return res;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

# 2. Rollback 分支

else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
    OperationResult result = this.brokerController
            .getTransactionalMessageService()
            .rollbackMessage(requestHeader);

    if (result.getResponseCode() == ResponseCode.SUCCESS) {
        RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
        if (res.getCode() == ResponseCode.SUCCESS) {
            // 回滚只需要删除半消息即可,不投递给消费者
            this.brokerController.getTransactionalMessageService()
                    .deletePrepareMessage(result.getPrepareMessage());
        }
        return res;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

commitMessage / rollbackMessage 的核心都是通过 commitLogOffset 找到原来的半消息:

@Override
public OperationResult commitMessage(EndTransactionRequestHeader requestHeader) {
    return getHalfMessageByOffset(requestHeader.getCommitLogOffset());
}

@Override
public OperationResult rollbackMessage(EndTransactionRequestHeader requestHeader) {
    return getHalfMessageByOffset(requestHeader.getCommitLogOffset());
}

private OperationResult getHalfMessageByOffset(long commitLogOffset) {
    OperationResult response = new OperationResult();
    // 根据 commitLogOffset 拿到半消息
    MessageExt messageExt = this.transactionalMessageBridge.lookMessageByOffset(commitLogOffset);
    if (messageExt != null) {
        response.setPrepareMessage(messageExt);
        response.setResponseCode(ResponseCode.SUCCESS);
    } else {
        response.setResponseCode(ResponseCode.SYSTEM_ERROR);
        response.setResponseRemark("Find prepared transaction message failed");
    }
    return response;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

小结:

  • Commit:从半消息恢复真实 Topic → 写入正常队列 → 删除半消息
  • Rollback:只删除半消息,不投递

这一步完全是 “消息事务”层面的提交与回滚,不关心 DB。

# 5.3 事务回查源码流程

当 Producer 在执行本地事务时返回 UNKNOW,或者写半消息成功后迟迟没有 commit/rollback,Broker 就会通过定时任务发起回查。

# 5.3.1 Broker 启动事务回查定时任务

Broker 初始化事务相关组件:

private void initialTransaction() {
    this.transactionalMessageService =
            new TransactionalMessageServiceImpl(
                    new TransactionalMessageBridge(this, this.getMessageStore()));
    this.transactionalMessageCheckListener =
            new DefaultTransactionalMessageCheckListener();
    this.transactionalMessageCheckListener.setBrokerController(this);

    // 事务回查定时任务(默认 60s)
    this.transactionalMessageCheckService =
            new TransactionalMessageCheckService(this);
}
1
2
3
4
5
6
7
8
9
10
11
12

主节点启动时真正开启回查线程:

private void startProcessorByHa(BrokerRole role) {
    if (BrokerRole.SLAVE != role) {
        if (this.transactionalMessageCheckService != null) {
            this.transactionalMessageCheckService.start();
        }
    }
}
1
2
3
4
5
6
7

回查线程内部是一个定时循环:

@Override
public void run() {
    long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
    while (!this.isStopped()) {
        this.waitForRunning(checkInterval);
        // 核心:check()
    }
}
1
2
3
4
5
6
7
8

TransactionalMessageServiceImpl.check() 中会遍历半消息队列,找出需要回查的消息:

if (isNeedCheck) {
    if (!putBackHalfMsgQueue(msgExt, i)) {
        continue;
    }
    // 这里触发回查
    listener.resolveHalfMsg(msgExt);
}
1
2
3
4
5
6
7

# 5.3.2 Broker → Producer:发送 CHECK_TRANSACTION_STATE

AbstractTransactionalMessageCheckListener.resolveHalfMsg

public void resolveHalfMsg(final MessageExt msgExt) {
    executorService.execute(new Runnable() {
        @Override
        public void run() {
            try {
                // 发送事务回查请求到 Producer
                sendCheckMessage(msgExt);
            } catch (Exception e) {
                LOGGER.error("Send check message error!", e);
            }
        }
    });
}
1
2
3
4
5
6
7
8
9
10
11
12
13

sendCheckMessage 组装请求头:

public void sendCheckMessage(MessageExt msgExt) throws Exception {

    CheckTransactionStateRequestHeader header = new CheckTransactionStateRequestHeader();
    header.setCommitLogOffset(msgExt.getCommitLogOffset());
    header.setOffsetMsgId(msgExt.getMsgId());
    header.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
    header.setTransactionId(header.getMsgId());
    header.setTranStateTableOffset(msgExt.getQueueOffset());

    // 还原真实 Topic 和 QueueId,便于业务识别
    msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
    msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
    msgExt.setStoreSize(0);

    String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
    Channel channel = brokerController.getProducerManager().getAvailableChannel(groupId);

    if (channel != null) {
        brokerController.getBroker2Client()
                .checkProducerTransactionState(groupId, channel, header, msgExt);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

Broker2Client.checkProducerTransactionState 最终发一个 CHECK_TRANSACTION_STATE 请求给 Producer:

public void checkProducerTransactionState(
        final String group,
        final Channel channel,
        final CheckTransactionStateRequestHeader header,
        final MessageExt messageExt) throws Exception {
    RemotingCommand request =
            RemotingCommand.createRequestCommand(
                    RequestCode.CHECK_TRANSACTION_STATE, header);
    request.setBody(MessageDecoder.encode(messageExt, false));

    this.brokerController.getRemotingServer()
            .invokeOneway(channel, request, 10);
}
1
2
3
4
5
6
7
8
9
10
11
12
13

# 5.3.3 Producer :接收回查 & 回应状态

Producer 作为“Server 端”接受回查请求,在 MQClientAPIImpl 构造时注册处理器:

this.remotingClient.registerProcessor(
        RequestCode.CHECK_TRANSACTION_STATE,
        this.clientRemotingProcessor, null);
1
2
3

ClientRemotingProcessor.processRequest 中分发到 checkTransactionState

case RequestCode.CHECK_TRANSACTION_STATE:
    return this.checkTransactionState(ctx, request);
1
2

内部会调用到 DefaultMQProducerImpl.checkTransactionState

@Override
public void checkTransactionState(final String addr,
                                  final MessageExt msg,
                                  final CheckTransactionStateRequestHeader header) {
    Runnable request = new Runnable() {
        @Override
        public void run() {
            TransactionListener listener = getCheckListener();
            LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
            Throwable exception = null;
            try {
                // === 回调业务侧的事务回查逻辑 ===
                localTransactionState = listener.checkLocalTransaction(msg);
            } catch (Throwable e) {
                exception = e;
            }
            // 把回查结果再通过 endTransaction 通知 Broker
            processTransactionState(localTransactionState,
                    defaultMQProducer.getProducerGroup(),
                    exception);
        }
    };
    // 放进线程池执行
    this.checkExecutor.submit(request);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

TransactionListenerImpl.checkLocalTransaction 就是业务回查代码,例如:

@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
    // 根据 msg.getKeys() / transactionId 去查 DB
    boolean ok = checkFromDB(msg.getKeys());
    return ok ? LocalTransactionState.COMMIT_MESSAGE
              : LocalTransactionState.ROLLBACK_MESSAGE;
}
1
2
3
4
5
6
7

processTransactionState 最终会再次调用 endTransactionOneway 把结果发回 Broker(和 5.2 完全同一套逻辑)。

# 5.3.4 回查的本质

  • 回查由 Broker 定时发起:Broker 会定期扫描未提交/回滚的半消息,并主动向 Producer 发起回查请求。
  • Producer 收到回查后只做一件事:执行你实现的 checkLocalTransaction 方法,根据业务状态返回 提交 / 回滚 / 未知
  • Broker 根据返回结果处理半消息
    • COMMIT → 恢复为真实消息并投递
    • ROLLBACK → 删除半消息
    • UNKNOW → 下次继续回查