# 分布式事务消息源码分析
作者:Ethan.Yang
博客:https://blog.ethanyang.cn (opens new window)
# 1 什么是分布式事务?
业务场景:用户 A 转账 100 元给用户 B,需要两个步骤:
- 扣减 A 的余额
- 增加 B 的余额
如果两个操作都在同一个数据库事务里,就能做到:
要么全部成功,要么全部失败(ACID 原子性)
但在微服务架构中:
- A 服务负责扣钱
- B 服务负责加钱
它们是两个独立服务,无法共享同一个数据库事务

这就产生了:
分布式事务问题:跨服务、跨数据库的数据一致性如何保证?
# 2 RocketMQ 的解决思路:消息最终一致性
RocketMQ 并不是“一键解决分布式事务”,而是提供一种最终一致性方案:
核心思想:
先让本地事务执行 → 再根据结果决定是否投递消息 → 消费者再做下一步业务。
也就是说: A 扣款成功 → 发消息到 MQ → B 收到消息再加钱。

但这种模式有两个核心问题:
问题 1:本地事务成功,但消息发送失败怎么办?
问题 2:消息发送成功,但本地事务失败怎么办?
无论你“先业务后发送”、“先发送后业务”,都无法避免一致性问题。
为了解决这个矛盾,RocketMQ 引入了:
半消息(Half Message) + 事务回查机制
# 3 RocketMQ 事务消息怎么解决一致性?
# 3.1 半消息(Half Message)
半消息 = 已写入 CommitLog,但不会进入 ConsumeQueue、消费者不可见的消息。
特点:
- Producer 先发送半消息
- Broker 不会投递给消费者
- 生产者随后执行本地事务
- 最终由生产者告诉 Broker:提交(commit)还是回滚(rollback)
为什么要这么做?
因为这样 RocketMQ 就获得了“参与权”,可以根据业务执行结果决定消息投递与否。
# 3.2 本地事务与消息事务的区别
| 项目 | 本地事务(DB) | RocketMQ 消息事务 |
|---|---|---|
| 由谁管理? | Spring / MyBatis / DB | RocketMQ Producer + Broker |
| 控制范围 | 数据库操作 | 消息是否投递 |
| 失败会怎样? | DB 自动回滚 | 消息被丢弃 / 等待回查 |
| 是否感知对方? | 不感知消息 | 不感知数据库 |
两者互不干扰,但会通过事务监听器回调形成“最终一致性”。
RocketMQ 不会替你管理数据库事务;数据库事务也不会影响 MQ 的 commit/rollback 除了你主动返回结果。
# 3.3 完整流程:两阶段提交

第一阶段:发送半消息 + 执行本地事务
- Producer 调用
sendMessageInTransaction - Broker 写入半消息(不可消费)
- RocketMQ SDK 回调你的
executeLocalTransaction - 你在这里执行 DB 本地事务(通常加 @Transactional)
- 成功 → 返回
COMMIT_MESSAGE - 失败 → 返回
ROLLBACK_MESSAGE - 未知 → 返回
UNKNOW
- 成功 → 返回
第二阶段:Broker 最终确认
- 如果收到 Commit → 将半消息变成正式消息(写入 ConsumeQueue)
- 如果收到 Rollback → 删除半消息
- 如果长时间没响应 → Broker 主动回查生产者(checkLocalTransaction)
# 3.4 为什么需要回查?
比如:
- Producer 服务挂了
- 网络抖动导致 commit 没发成功
- Producer 返回
UNKNOW
Broker 会定时扫描 CommitLog 中的半消息,并主动调用:
CHECK_TRANSACTION_STATE
回查本地事务是否成功。
# 4 事务消息 Demo
# 4.1 生产者
public static void main(String[] args) throws Exception {
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer =
new TransactionMQProducer("tx_producer_group");
producer.setNamesrvAddr("127.0.0.1:9876");
// 回查线程池
ExecutorService executorService = new ThreadPoolExecutor(
2, 5, 100,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000)
);
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
// ===== 第一步:发送半消息 =====
Message msg = new Message(
"TxTopic",
"TagA",
"KEY_10001",
"A转B 100元".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult result = producer.sendMessageInTransaction(msg, null);
System.out.println("半消息发送结果:" + result);
Thread.sleep(100000);
producer.shutdown();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# 4.2 执行本地事务
public class TransactionListenerImpl implements TransactionListener {
// 第一阶段:执行本地事务
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
// ===== 这里执行你的本地数据库事务 =====
// @Transactional 方法内部发生异常会自动回滚,并继续向外抛出
transactionService.doBizTransaction();
// DB 成功 → 提交消息
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
// DB 失败 → 回滚消息
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
// Broker 回查事务状态
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 去数据库查该事务是否成功
boolean success = transactionService.checkBizResult(msg.getKeys());
if (success) {
return LocalTransactionState.COMMIT_MESSAGE;
} else {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
避免单点问题, 如果 producer1 挂了,而 producer2 仍在同一个 Producer Group, 那么:
Broker 的事务回查会发送给任意存活的 Producer。同一个 group 里都能处理。
所以事务消息不会因为生产者宕机而卡死。
# 5 分布式事务源码分析
从前面的流程可以看到,RocketMQ 的事务消息大致分为三步:
- 发送半消息(Half Message)
- 结束事务:提交 / 回滚半消息
- Broker 定时回查未决事务
注意:本章分析的是 RocketMQ 消息事务 的源码,不包含 DB 的
@Transactional,DB 事务完全由业务代码负责。
# 5.1 发送半消息源码流程
# 5.1.1 Producer :sendMessageInTransaction
业务代码调用:
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
事务 Producer 先做一层封装:
@Override
public TransactionSendResult sendMessageInTransaction(final Message msg,
final Object arg) throws MQClientException {
if (null == this.transactionListener) {
throw new MQClientException("TransactionListener is null", null);
}
msg.setTopic(NamespaceUtil.wrapNamespace(this.getNamespace(), msg.getTopic()));
return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
}
2
3
4
5
6
7
8
9
10
真正的事务发送逻辑在 DefaultMQProducerImpl.sendMessageInTransaction:
public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter localTransactionExecuter,
final Object arg) throws MQClientException {
TransactionListener transactionListener = getCheckListener();
if (localTransactionExecuter == null && transactionListener == null) {
throw new MQClientException("tranExecutor is null", null);
}
// 事务消息不支持延时等级
if (msg.getDelayTimeLevel() != 0) {
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
}
Validators.checkMessage(msg, this.defaultMQProducer);
// 标记这是一个“Prepared 事务消息”
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP,
this.defaultMQProducer.getProducerGroup());
// === 第一步:同步发送“半消息” ===
SendResult sendResult = this.send(msg);
// === 第二步:回调本地事务(executeLocalTransaction),后面在别处分析 ===
...
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
关键点:
- 此时只是把消息标记为 事务预备消息(Prepared),并发送给 Broker;
- 还没有执行本地业务事务,这一步只解决“把半消息落到 Broker”。
# 5.1.2 Broker :写入半消息专用 Topic
Broker 使用 Netty 接收请求,对应处理器是 SendMessageProcessor.processRequest,最终会走到消息存储逻辑 DefaultMessageStore.asyncPutMessage。
关键区别在这里(简化后的逻辑):
if (isTransactionPreparedMessage(msgInner)) {
// 分布式事务:处理半消息(Prepared)
putMessageResult = this.brokerController
.getTransactionalMessageService()
.asyncPrepareMessage(msgInner);
} else {
// 普通消息 / 事务消息的 Commit / Rollback
putMessageResult = this.brokerController
.getMessageStore()
.asyncPutMessage(msgInner);
}
2
3
4
5
6
7
8
9
10
11
asyncPrepareMessage 内部调用 TransactionalMessageBridge.asyncPutHalfMessage,在这里对“半消息”进行转换:
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
// 保存真实的 Topic 和 QueueId 到属性中
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
String.valueOf(msgInner.getQueueId()));
// 设置为“事务未决类型”
msgInner.setSysFlag(
MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
// 半消息真正写入的 Topic:RMQ_SYS_TRANS_HALF_TOPIC
msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
msgInner.setQueueId(0);
msgInner.setPropertiesString(
MessageDecoder.messageProperties2String(msgInner.getProperties()));
return msgInner;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
小结:
- 业务看到的 Topic 是你自己写的,例如:
TxTopic - Broker 实际存储时会把半消息写到系统 Topic:
RMQ_SYS_TRANS_HALF_TOPIC - 同时在属性中记录原来的
REAL_TOPIC和REAL_QUEUE_ID,后续 commit 时会用回去。
# 5.2 提交 / 回滚事务消息源码流程
当 Producer 执行完本地事务(DB 事务)后,会在 TransactionListener.executeLocalTransaction 中返回:
COMMIT_MESSAGE→ 提交ROLLBACK_MESSAGE→ 回滚UNKNOW→ 暂时不确定,后面由回查决定
这些状态最终会在 DefaultMQProducerImpl.sendMessageInTransaction 中交给 endTransaction:
try {
// endTransaction:告诉 Broker 本条半消息是 commit 还是 rollback
this.endTransaction(sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState
+ ", but end broker transaction failed", e);
}
2
3
4
5
6
7
endTransaction 会组装请求头并调用 endTransactionOneway:
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
requestHeader.setMsgId(sendResult.getMsgId());
String remark = localException != null
? ("executeLocalTransactionBranch exception: " + localException)
: null;
this.mQClientFactory.getMQClientAPIImpl()
.endTransactionOneway(brokerAddr, requestHeader, remark,
this.defaultMQProducer.getSendMsgTimeout());
2
3
4
5
6
7
8
9
10
11
endTransactionOneway 只是发一个 END_TRANSACTION 请求给 Broker:
public void endTransactionOneway(final String addr,
final EndTransactionRequestHeader requestHeader,
final String remark,
final long timeoutMillis) throws RemotingException {
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader);
request.setRemark(remark);
this.remotingClient.invokeOneway(addr, request, timeoutMillis);
}
2
3
4
5
6
7
8
9
# 5.2.1 Broker :处理 COMMIT / ROLLBACK
Broker 启动时注册了 END_TRANSACTION 的处理器 EndTransactionProcessor:
this.remotingServer.registerProcessor(
RequestCode.END_TRANSACTION,
new EndTransactionProcessor(this),
this.endTransactionExecutor);
2
3
4
在 EndTransactionProcessor 中,对于 Commit / Rollback 的处理大致是:
# 1. Commit 分支
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
// 通过 commitLogOffset 找到原来的半消息
OperationResult result = this.brokerController
.getTransactionalMessageService()
.commitMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
// 校验半消息合法性
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
// 从半消息还原出真实 Topic / QueueId
MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(
msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
// 清除事务标记
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
// === 1)写入真实 Topic,对消费者可见 ===
RemotingCommand sendResult = sendFinalMessage(msgInner);
// === 2)删除 RMQ_SYS_TRANS_HALF_TOPIC 中的半消息 ===
if (sendResult.getCode() == ResponseCode.SUCCESS) {
this.brokerController.getTransactionalMessageService()
.deletePrepareMessage(result.getPrepareMessage());
}
return sendResult;
}
return res;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# 2. Rollback 分支
else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
OperationResult result = this.brokerController
.getTransactionalMessageService()
.rollbackMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
// 回滚只需要删除半消息即可,不投递给消费者
this.brokerController.getTransactionalMessageService()
.deletePrepareMessage(result.getPrepareMessage());
}
return res;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
commitMessage / rollbackMessage 的核心都是通过 commitLogOffset 找到原来的半消息:
@Override
public OperationResult commitMessage(EndTransactionRequestHeader requestHeader) {
return getHalfMessageByOffset(requestHeader.getCommitLogOffset());
}
@Override
public OperationResult rollbackMessage(EndTransactionRequestHeader requestHeader) {
return getHalfMessageByOffset(requestHeader.getCommitLogOffset());
}
private OperationResult getHalfMessageByOffset(long commitLogOffset) {
OperationResult response = new OperationResult();
// 根据 commitLogOffset 拿到半消息
MessageExt messageExt = this.transactionalMessageBridge.lookMessageByOffset(commitLogOffset);
if (messageExt != null) {
response.setPrepareMessage(messageExt);
response.setResponseCode(ResponseCode.SUCCESS);
} else {
response.setResponseCode(ResponseCode.SYSTEM_ERROR);
response.setResponseRemark("Find prepared transaction message failed");
}
return response;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
小结:
- Commit:从半消息恢复真实 Topic → 写入正常队列 → 删除半消息
- Rollback:只删除半消息,不投递
这一步完全是 “消息事务”层面的提交与回滚,不关心 DB。
# 5.3 事务回查源码流程
当 Producer 在执行本地事务时返回 UNKNOW,或者写半消息成功后迟迟没有 commit/rollback,Broker 就会通过定时任务发起回查。
# 5.3.1 Broker 启动事务回查定时任务
Broker 初始化事务相关组件:
private void initialTransaction() {
this.transactionalMessageService =
new TransactionalMessageServiceImpl(
new TransactionalMessageBridge(this, this.getMessageStore()));
this.transactionalMessageCheckListener =
new DefaultTransactionalMessageCheckListener();
this.transactionalMessageCheckListener.setBrokerController(this);
// 事务回查定时任务(默认 60s)
this.transactionalMessageCheckService =
new TransactionalMessageCheckService(this);
}
2
3
4
5
6
7
8
9
10
11
12
主节点启动时真正开启回查线程:
private void startProcessorByHa(BrokerRole role) {
if (BrokerRole.SLAVE != role) {
if (this.transactionalMessageCheckService != null) {
this.transactionalMessageCheckService.start();
}
}
}
2
3
4
5
6
7
回查线程内部是一个定时循环:
@Override
public void run() {
long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
while (!this.isStopped()) {
this.waitForRunning(checkInterval);
// 核心:check()
}
}
2
3
4
5
6
7
8
TransactionalMessageServiceImpl.check() 中会遍历半消息队列,找出需要回查的消息:
if (isNeedCheck) {
if (!putBackHalfMsgQueue(msgExt, i)) {
continue;
}
// 这里触发回查
listener.resolveHalfMsg(msgExt);
}
2
3
4
5
6
7
# 5.3.2 Broker → Producer:发送 CHECK_TRANSACTION_STATE
AbstractTransactionalMessageCheckListener.resolveHalfMsg:
public void resolveHalfMsg(final MessageExt msgExt) {
executorService.execute(new Runnable() {
@Override
public void run() {
try {
// 发送事务回查请求到 Producer
sendCheckMessage(msgExt);
} catch (Exception e) {
LOGGER.error("Send check message error!", e);
}
}
});
}
2
3
4
5
6
7
8
9
10
11
12
13
sendCheckMessage 组装请求头:
public void sendCheckMessage(MessageExt msgExt) throws Exception {
CheckTransactionStateRequestHeader header = new CheckTransactionStateRequestHeader();
header.setCommitLogOffset(msgExt.getCommitLogOffset());
header.setOffsetMsgId(msgExt.getMsgId());
header.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
header.setTransactionId(header.getMsgId());
header.setTranStateTableOffset(msgExt.getQueueOffset());
// 还原真实 Topic 和 QueueId,便于业务识别
msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
msgExt.setStoreSize(0);
String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
Channel channel = brokerController.getProducerManager().getAvailableChannel(groupId);
if (channel != null) {
brokerController.getBroker2Client()
.checkProducerTransactionState(groupId, channel, header, msgExt);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Broker2Client.checkProducerTransactionState 最终发一个 CHECK_TRANSACTION_STATE 请求给 Producer:
public void checkProducerTransactionState(
final String group,
final Channel channel,
final CheckTransactionStateRequestHeader header,
final MessageExt messageExt) throws Exception {
RemotingCommand request =
RemotingCommand.createRequestCommand(
RequestCode.CHECK_TRANSACTION_STATE, header);
request.setBody(MessageDecoder.encode(messageExt, false));
this.brokerController.getRemotingServer()
.invokeOneway(channel, request, 10);
}
2
3
4
5
6
7
8
9
10
11
12
13
# 5.3.3 Producer :接收回查 & 回应状态
Producer 作为“Server 端”接受回查请求,在 MQClientAPIImpl 构造时注册处理器:
this.remotingClient.registerProcessor(
RequestCode.CHECK_TRANSACTION_STATE,
this.clientRemotingProcessor, null);
2
3
ClientRemotingProcessor.processRequest 中分发到 checkTransactionState:
case RequestCode.CHECK_TRANSACTION_STATE:
return this.checkTransactionState(ctx, request);
2
内部会调用到 DefaultMQProducerImpl.checkTransactionState:
@Override
public void checkTransactionState(final String addr,
final MessageExt msg,
final CheckTransactionStateRequestHeader header) {
Runnable request = new Runnable() {
@Override
public void run() {
TransactionListener listener = getCheckListener();
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable exception = null;
try {
// === 回调业务侧的事务回查逻辑 ===
localTransactionState = listener.checkLocalTransaction(msg);
} catch (Throwable e) {
exception = e;
}
// 把回查结果再通过 endTransaction 通知 Broker
processTransactionState(localTransactionState,
defaultMQProducer.getProducerGroup(),
exception);
}
};
// 放进线程池执行
this.checkExecutor.submit(request);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
TransactionListenerImpl.checkLocalTransaction 就是业务回查代码,例如:
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 根据 msg.getKeys() / transactionId 去查 DB
boolean ok = checkFromDB(msg.getKeys());
return ok ? LocalTransactionState.COMMIT_MESSAGE
: LocalTransactionState.ROLLBACK_MESSAGE;
}
2
3
4
5
6
7
processTransactionState 最终会再次调用 endTransactionOneway 把结果发回 Broker(和 5.2 完全同一套逻辑)。
# 5.3.4 回查的本质
- 回查由 Broker 定时发起:Broker 会定期扫描未提交/回滚的半消息,并主动向 Producer 发起回查请求。
- Producer 收到回查后只做一件事:执行你实现的
checkLocalTransaction方法,根据业务状态返回 提交 / 回滚 / 未知。 - Broker 根据返回结果处理半消息:
COMMIT→ 恢复为真实消息并投递ROLLBACK→ 删除半消息UNKNOW→ 下次继续回查