探秘RocketMQ源码【1】——Producer视角看事务消息

前言

Apache RocketMQ作为广为人知的开源消息中间件,诞生于阿里巴巴,于2016年捐赠给了Apache。从RocketMQ 4.0到如今最新的v@ x S C T / $4.7.1,不论是在阿里巴巴内部还是外部社区,都赢得了广泛的关注和好评。
出于兴趣和工作的需要,近期本人对RocketT * k # e = r } nMQ 4.7.1的部分代码进行了研读,其间产生了很多困惑,也收获了更多的启发。

本文将站在发送t n @方视角,通过阅读RocketMQ Producer源码,来分析在事务消息发送中RocketM 5 5 : + 7MQ是如何工作的。需要说明的是,本文所贴代k y | 4 f 4 U码,均来自40 o + i.7.1版本的Rocketz ? SMQ源码。本文中所讨论的发送,仅指从ProduA w ~ P t w 2cZ u x 8 n Her发送到Broker的过程,并不包含Broker将消息投递到ConsQ F dumer的过程。

宏观概览

RocketMQ事务消息发送流程a & r , 6 Y k

结合源码来看,RocketMQ的事务消息TransactionMQProducer的sendMessageInTransaction方法,实际调用了DefaultMQProducerImpl的sendMessageInTransaction方法。我们进入sendMessageInTransaction方法,整个事务消息的发送流程清晰可见:

首先,做发U 0 _ 1 E ; w 1 )送前检查,并填入必要参数,包括设prepare事务消息。

源码清单-1

public TransactionSendResult senh X g H :dMessageInTransaction(final Message msg,
final LocalTransactionExe+ E Ccuter localTransactionExecuter, final Object arg)
throws MQClientException {
TransactionListener transactionListener = getCheckListem D N : Lner();
if (null == localTransactionExecu R / i f ^ ~ter && null == transactionListener) {
throw new MQClie_ } { *ntException("tranExecutor is null", null);
}
// ignoA d / & f Ore DelayTimeLevel parameter
if (msg.getDelayTimeLevel() != 0) {
MessageAccesso6 4 & lr.clearProperty(msg, MessageCoE z L J @ = Dnst.h r v $ $ V W ^ LPROPERTY_DELAY_TIME_LEVEL);
}
Validators.chef b xckMessage(msg, this.defaultMQProducer);
SendResult sendResulL - E 6 e N } %t = null;
MessagQ J + T K 9eAccessor.putProperty(msg, MessageConst.PROPERT] 1 ( e QY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, Mo b g ` u r a EessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup()H o 9 9 * j - y _);

进入发送处理流程:

源码清单-2

    t9 3 f ` v V |ry {8 [ ; A ?
sendResult = this.send(msg);
} catch (Exception e) {
throw new MQClie7 h a 5 + 0 T h _ntException("send message Exception", e);
}

根据broker返回的处理结果决策本地事务是否执行,半消息发送成功则开始本地事务执行:

源码清单-3

    LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwablj ~ * 5 g {e localException = null;
switch (sendResult.getSendStatus()) {
case SEND_OK: {
try {
if (sendResult& % F 4.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendRes- + T q b M pult.getTransactionId());
}
String transactionId = msg.getProperty(MessagB | @ } XeConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null$  4 l l o $ n { != transactionId && !"".equals(transactionId))Z 3 f C {
msg.setTransactionId(transactionId);
}
if (null != localTransactionExecuter) {
localTransactionState = localTransactionExecuter.executeL8 G b W O JocalTransactionBranch(msg7 8 ; % y G !, arg);
} else if (transactionListener != null) {
log.debug("Used new transaction API");
localTrank ~ L E I z asactionState = transactionListener.executeLP P & * 1 0 3 pocalTransaction(msg, arg);
}
if (null == locab J b y % z F G 3lTransactionState) {
localTrans9 p g g K i ; Oai U v n C $ k J &ctionState = Lo) / ; ) ! GcalTr 2 D z VransactionState.UNKNOW;
}
ifn S = d (localTransac2 a ;tionState != Localq R ) b c _ KTransactionState.COMMIT_MESSAGE) {
log.info("executeLocalTransG T = + Ea] ) M .ctionBranch return {}",Q A Y F localTransactionStag X ] + zte);
log.info(msg.toString()c { _ Z $ : D t );
}
} catch (Throwable e) {
log.info("executeLoR * o mco ] q c H 0alTransactionBranch exception", e);
log.info(msg.toString());
localExcepti9 3 9 Don = e;
}
}
break;
case FLU# V p * / ( 1 D KSH_DISK_TIME f ! 0 ~ u ! T ~OUT:
case F+ 5 ) ; ^LUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:  // 当备broker状态不! 8 _ i可用时,半消息要回滚,不执行本地事务
localf B : ) [ Y nTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}

本地事务执行结束,根据本地事务状态进行二T $ ? u !阶段处理:

源码清单-4

    t5 X S n T }ry {
this.endTransacS . :tion(senG 3 | 2 KdResult, localTran: | S ssa# [ U B & s O 7 @ctionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
// 组装发送结果
// ...
retur 3 } 1 q o ( * gn transacw o d & # [ 6tionSenv 9 h t A pdResult;
}

接下来,我们深入每个阶段代码分析。

l H 6 i l y $ @扒内幕

一阶段发送

重点分析send方法。进入send方法后,我们发现,RocketMQ的事务消息的一阶段,使G c +用了SYNC同步模式:

源码清单-5

public SendResult send(Message msg,
long timeout) throws MQClientException, RemotingExce1 D $ v F +ption, MQBrokerException, InterruptedException {
return this.sendDefaultImpl(msg, Communicatioq P a }nMode.SYNC, null, timeout);
}

这一点很容S e c ? b { : I易理解,毕竟事务消息是要根据一阶段发送结果来决定要不要执m ( z p Z + v }行本地事务的,所以一定要阻塞等待broker的ack。

我们进入DefaultMQProducerImpl.java中去看sendDefaultImpl方法的实现,通过读这个方法的代码,来尝试了解在事务消息的一阶段发送过程中producer的行为。 值得注意的是,这个方法并非为事务消息定制,甚至不是为SYS C _ # kNC同步模式定制的,因此读懂了这段代码,基本可以对RocketR g F $MQ的消息发送机制有了一个较为全面的认识。
这段代码逻辑非常通畅,不忍切片。为了节省篇幅,将代码中较为繁杂但信息量不大的部分以注释代替,尽可能保留流程的完整性。个人认为较为重要或是容易被忽略的部分,以注释标出,后文还有部分细节的详细解读。

源码清单-6

private SendResult se& C l , /ndDefaul! 2 $ :tImpl(
Mes@ 2 K Dsage msg,
final Communic[ A ^ 6 _ationModf Y Ke communi* U x M u q bcationMode,
final SendCp | 5 ? _ oallback sendCallG y i W r xback,
final long timeoutJ ( + r
) throws MQClientException, RemotingException, MQBrokerException, InterrupteS 1 fdException {
this.makeSureStateOK();
// 一、消息有效性校验。见后文
Validators.checkMessage(msg, this.defaultMQProducer);
final long invokeID = random.nextLong();D E H O
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampM * 1 _ d SPrev = beginTimestampFirst;
long endTimestamp = beginTimestampFirst;
// 获取当前topic的发送路由信息,主要是要broker,如果没找到则从namesrv获取
TopicPublishInfo topicPublishInfo = this.Q o , ] # 5tryToFindTopicPublishInfo(msg.getTe E w & m e e Topic())K J 3 t b { T g;
if (topic . ) fPublishInfo != null && topicPublishInfo.ok()) {
boolean caf N b 1 Y D &llTimeout = false;
MessageQueue mq = null;
Except h * M . , Ttion exception = null;
Se6 d PndResult sendResult = null;
/b s K b u/ 二、发送重试机制。见后文Y i Z ;
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.r w g q F b M & xdefaultMQProducer.getRetryTiB 3 w bmesWhenSendFailed() : 1m h 9 L { ;
int times = 0;
String[] brokersSent = new String[timesTotal];
for (; times <~ N 0 v g 3 . C P timesTotal; times++) {
// 第一次发c C } 8 ,送是mq == nuW = @ . E jll, 之后都是有broker信息的
String lastBrokerName = null == mq ? null : mq.getBr = P p crokerNS  = _  e Vame();
// 三、W m ^ w rrocx + V J m - o !ketmq发送消息时如何选择队列?——broker异常规避机制
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (mqSel= ; Q Kected != null) {
mq = mqSelectg c Eed;
brokersSent[times] = mq.getBrokerName();
try {
beginTimestampPrev = System.currentTimeMillis();
if (times > 0) {
//Reset topic with namespace during rese: 7 l 3 Ond.
msg.setTopic(thiD ^ A u J 0 gs.defa? Q G sultMQProducer.withNamespace(msg.geT M 2 e P 1 r B |tTopic()));
}
long c) . C q m _ostTime = beginTimestampP% t @ orev - beginTimestampFirst;
if (timeout < costTime) {
callTimeout = true;
break;
}
// 发送核心代码
sendResult = this.sendKernelImpl(msg, mq, coR ! fmmunicationMode, sendCallback, topicPublishInfo, timeout - costT7 g 3 Wime);
endTimestamp = System.currentTimeMil[ ^ ,lis();
// rock~ . , + i { p W @etmq 选择 broker 时的规避机制,开启 sendLatencyFaultEnaJ U M m u J Kble == true 才生效
this.updateFaultItem(mq.D . S r * _ | pgetBrokerName(), endTimestamp - beginTimestampPrev, false);
switch (communicationMode) {
// 四、RocketMQ的三种Commu1 ? S M w Z B 0niy ] _ F _ ~ X I &cationMode。见后文
case ASYNC: // 异步模式
rm ] Y Keturn null;
case ONEWAY: // 单向模式
return null;
case SYNC: // 同步模式
if (3 r ! | n vsendResult.getSj d M # tendStatus() != SendStatus.SEND_OK) {
if (this.defaultMQProducer.iso ( * f ` g @ ; XRetryAnotherBrokerWhenNotStoreOK()) {
continue;
}
}
return sendResX C T 0 . ` ?ult;
default:
break;
}
} catch (Remo: _ V 9tingException e) {
// ...
// 自动重试
} catch (MQL d l B d |ClientException e) {
// ...
// 自动重试
}C 9 6 R catch (% 9 k b  r s kMQBrokerException e) {
// ...
// 仅返回码==NOT_IN_CURRENT_L c 6UNIT==205 时自动重试
// 其他情况不重试,抛异常
} ca} R D H m % u -tch (InterruptedException e) {
// ...
// 不重试,{ x ~抛异常
}
} else {
break;
}
}
if (sendResult != null) {
return sendResult;
}
// 组装返回的info信息,最后Y t % 9 F l M ~以MQClid m .  ` _ % | AentExcep~  ^ ~ R 2 4tion抛出
// ... ...
// 超时场景抛RemotingTooMuchRequestException
if (callTimeout) {
throw new RemotingTooMuchRequestExc$ ^ v , Leption("sS L O V f R | IendDefaultImpl call timeout");
}
// 填充MQClientException异常信息
// ...
}
validateNameServerSetting();
thr- Q Q Q H y / t :ow new MQClientException("Ni S @o route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}

一、消息有效性校- Y B _ j R o P f验:

源码清单-7

 Validators.checkM0 5 r X 9 ? nessage(msg, this.defaultMQProducer);

在此方法中校验消息的有效性,包括对w Q Ytopic和消息_ n X w t ! V G q体的校验。topicE d d g j V的命名必须符合规范,且避免e ! v L ?使用内置的系统消息Z 3 U % s Y `TOPICa Z M _ 2 * 6 R。消息体长度 > 0 &&P c z y s; 消息体长度 <= 1024*1024*4 = 4M 。

源码清单-8

p& P T G J L l +ublic static void chec* _ Z s ! y } !kMessage(Message msg, DefaultMQProducer defaultMQProducer)
throws MQClientException {
if (null == msg) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
}
// topic
Validators.checkTopic(msg.getTopic());
Validators.isNotAllowedSendTopic(msg.getTopic());
// body
if (null ==X : 9 . msg.g9 O JetBody()) {
throw new MQClie[ 2 rntException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
}
if (0 =T & 6  8= msg.getBody().length) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
}
if (msg.getBod| = Y 2  R W b -y().leng= B L C ] L # A sth > defaultMQProducer.getMaxMessagq ] ( R V %eSize()L O R) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
"the message body size over max value, MAX: " +u 5 - Z P defaultMQProducer.getMaxMessageSize());
}
}

二、发送重试机制

Producer在消息发送不成功时,会自p ! H动重试,最多发送次数 = retryTimesWhenSendFa- H J b 7 5 U Giled + 1 = 3次 。

值得注意的是,并非所有异常情况都会重试,从以上源码中可以提取到的信息告诉我们,在以下三种情况下,会自动重试:
1)发生RemotingException,MQClientExcepti{ j _on两 i Y & O M ) A种异常之一时
2)发生MQf m O Z 6 S JBrR O u 2 LokerException异常,且ResponseCode是NOT_IN_CURRENT_UNIT = 205时
3)SYNC模式下,未发生异常且发送结果状态非 SEU w MND_OK

在每次发送消息之前,会先检查是否在前面这两步就已经耗时超长(超时时长默认3000ms),若是,则不再继续发送并且直接返回超时,不再重试。这里说明了2个w R O 0 r & j &问题:
1)producer内部自动重U [ o试对业务应用而言是无感知5 } Z的,应用看到的发送耗时是包含所有重试/ l 8 % P的耗时在内的;
2)一旦超时意味着本次消息发送已经以失败告终,. J U x q原因是超时。这个信息最后会以RemotingTooMuchRequestException的形式抛出。K W - Z

这里需要指出的4 i 7 Z S r f f是,在RocketMQ官方文档中指出,发送超时时长是10s,即10000ms,网上许多人对rocketMQ的超时时间解读也认为是10s。然p q W A $ ! /而代码中却明明白白写着3000ms,最终我debug之: Q 8 e U后确认,默认超时时间确实是3000ms。这里也建议RocketMQ团队对文档进行确认,如确有误,还是早日更正为好。
探秘RocketMQ源码【1】——Producer视角看事务消息

三、broker的异常7 a - ` ` Q #规避机制

源码清单-8

MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, las2 P rtBrokerName);  

这行代码是发送9 ; U a e S 9 `前选择queue的过程。

这里涉及RocketMQ消息发送高可用的的一个@ * b核心机制,latencyFaultTolerance。这个机制是9 ; Prq % Coducer负载均衡的一部分,通过sendLatencyFaulC $ : H N | 8 J utEnable的值来控制,默认是false关闭状态,不启动broker故障延迟机制,值为true时启用broker故障延迟机制,可由Producer主动打开。

选择队列时,开启异常规避机制,则根据broker的工作状态避免选择当前状态不佳的broker代理,不健康的br5 & & I aoker会在一段时6 K u a l P q 4 3间内被规避,不开启异常规避机制时,则按顺序选取下一个队列,但在重试场景下会尽量选择不同于上次发送broker的queue。每次消息发送都会通过updateFaultItem方法来维护broker的状态Z ? 3 d .信息。

源码清单-9

public void uk C h ZpdateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
if (this.sendLatencyFaultEnable) {
// 计算延迟v % S x多久,isolation表示是D O J o s ` D )否需要隔离该broker,若是,则从30s往前找第一个比30s小的延迟值,再按下标判断规避的周期,若30s,则是10min规避;
// 否则,按上一次发送耗时来决定规避时长;
long duration = computeNotA; ] 7vailableDuration(isolation ? 30000 : currentLatency);
this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency,` y z duration);
}
}  

深入到selectOneMessageQueue方法内} 2 % ! -部一探究竟:d ! ; $ i g

源码清单-10

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, fiO & ) s n onal String lastBrokerName) {
if (this.sendLatencyFaultEnable)R _ G J _ V {
// 开启异常规避
try {
i_ t ^ C w f J Hnt index = tpInfo.getE O 0 D QSendWhichQueue().getAnq o 0dIncrement();
for (int i = 0; i < tpInfo.getMessageQu@ T K * 6 g G PeueList().size(); i++) {n 2 9
int pos = Math.ab4 t T us(index++) % tpInfo.getMessager = G O V SQueueList().size();
if (pos < 0)
pos = 0;
// 按顺序取下一个message queue作为发送M I ~ } [的queue
MessageQueue mq = tpInfo.getMR ( t 9 F , oessageQueueList().get(pos);
// 当前queue所在的broke[ _ Xr可2 r # + = 1 # : ^用,且与上一个queue的broker相同,
// 或者第一次发送,则使用这个queue
if (latN 7 = K &encyFaultTolerance.isAvj * 8ailable(mq.getBrokerName())) {
if6  a t x g % 2 ] (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
return mq;
}
}
final String nA z [ ? l N Q ! ZotBestBroker = latencyFaultToleranc_ ? 6 k - e v { Ve.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBh 1 H 3 v 4 groker(notBestBroke- ^ S | ; lr);
if (writeQueueNums > 0) {
fik Z C e I -nal MR @ { - oessage^ ; ` x 9 d # P ~Queue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.s) d & _ letBrokerName(notBestBroke_ U 2 O { { Q xr);
mq.B ~ s 5 J R l XsetQueueId(tpInfo.getSendWhichQuC ( z c 1eue().getAndIncrement() % writeQueueNums);
}
return mq;
} else {
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error on % A w } ! , {ccurred when selecting messn ) B *age queue"G  + A . c, e);
}Q U  6 t ( {
return tpInfo.selectOneMessageQueue();
}
// 不开启异常规避,则随机自增选择Queue
return tpInfo.selectOneMh ( S m [ / r { sessage^ i Y gQueue(lastBrokerName);
}

四、RocketMQ的三种CommunicationMode:

源码: t ~ :清单-11

 public enum CommunicationMode {
SYNC,
ASYE { i { $ / BNC,
ONEWAY,
}

以上三种模式指的都是消息从发送方到达broker的阶段,不包含broker将消息投递给订阅方的过程V m 6 l w 2 2 ; 3
三种模式的发送p _ o方式的差异:

  • 单向模式:ONEWAY。消息发送方只管发送,并不关心broker处理的结果如何。这种模式下,由于处理流程少,发送耗时非常小,吞吐量大,但不能保证消息可靠不丢,常用于流量巨大但不重要的消息场景,例如心跳发送等。
  • 异步模式:ASYNC。消息发送方N % X / 9 A发送消息到broker后,无需等待broker处理,拿到的是null的返回值,而由一个异步的线程来做消息处理,处理完成后以回调的形式告诉发送方发送结W ] d e z H A 2 (果。异步处理时如有异常,返回发送方失败结果之前,. 6 ) f X ~ ( 1会经过内部重- g W U y试(默认3次,发送方不感知)。这种模式下,发送方等待时长较小,吞吐量较大,消息可靠,用于流量大但重要的消息场景。
  • 同步模式:SYNC。消息发送方需等待broker处` ~ I [ + s理完成并@ m % N F明确返回成功或失败,在消息发送方拿到消息发送失败的结果P E I } b之前,也u d ! . v v C会经历过内部重试(默认3次,发送方不感知)。这种模式下,发送方会阻塞等待消息处理结果,等待时长较长,消息可靠,用于流量不大但重要的消息场景。需要强调的是,事务消息的一阶段半事务消息的处理是同步模式。

在sendKernelImpl1 i u O方法中也可以看到具体的实现差异。ONEI I 0 rWAY模式最为简单,不做任何处理。负责发送的sendMessage方H p q E / Q法参数中,相比^ G f )同步模式,异步模式多了回调方0 m 9 o K q法、包含topic发送路由元信息的topicPublishInfo、包含发送broker信息的instance、包含发送队列信息的producS & ] F Z : ) ;er、重试次L V 5 T l v v I M数。另外,异步模式下,会对有压缩的消息先做copT I , s Xy。

源码清单-12

    switch (communicationMode) {
cas1 b T  O v _e ASYNC:
Message tmpMessage = msg;
boolean messageCloned = false;
if (mt f c / OsgBodyCompressed) {
//If msgb g L  body was co% 6 d c ^ Z ` Fmprej 8 k (ssed, msgbody should be reset using prevBody.
//Clone new message using commpresser q u k g , zd message body an5 Z f C ^ ` (d recover origin massage.
//Fix bug:https@ } | -://V 3 0 P _ { agithub.com/apache/rocketmq-externE A 7 A `als/issues/66
tmpMessage = Mee % 2 { )ssageAccessor.cloneMessage(msg);
messageCloned =W 0 a n / w true;
msg.setB[ r ? / R % lody(prevBody);
}
if (topW ? A #icWithNae 7 j H r # Zmet v f sspace) {
i& P ] @f (!messageCloned) {
tmpMessage = MessageAccessor.cloneMessage(msg);
messageCloned = true;
}
msg.setx z ?Topic(NamespaceUp 2 H 1 M U k ~til.withoutNamH 9  aespace(msg.getTopic(), this.defaultMQProducer.g- e h p O h { netNamespace()));
}
long costTimeAsync = System.currentTimeMillis() - beginStartTimet ) b B | M @ y J;
if (timeout < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
sendResult = this.mQClientFactory.getMQClientAPIImpl().s$ ( $ m /endMessage(
brokerAddr,
mq.getBrokerName(),
tmpMessage,
requestHeader,| { g J
timeout - costTimeAsync,
commZ t +unicationMode,
sendCallback,
topicPublishInfo,
this.mQClientFaT _ d nctory,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
context,
this);
break;
case ONEWAY:
case SYNC:
long costTimeSync = System.c^ ! @ yurrentTimeMillis() - beginSI  y WtartTime;; m ; p y S Y
if (timeout < costTimeSS a |ync) {
th{ 0 e & Wrow new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
senl 8 G d :dResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.W @ ? 1 + egetBrokerName(),
msg,
requestHD V r B 4 [ ? 7eader,
tim& h Y N F X / k *eF 7 Z u 2out - costTimeSync,
communicationMode,
context,
this);
b1 S Y .reak;
default:
assert false;
break;
} 

官方文档中有这样一张图,十分清晰的描述了异步通信的详细过程:
探秘RocketMQ源码【1】——Producer视角看事务消息

二阶段发送

源码清单-3体现了本地事务的执行,localTransactionSt8 Y !ate将本地事务执行结果与事务消息二阶段的发送关联起来。
值得注意的是,如z ? s =果一阶段的发送结果是SLAVE_NOT_AVAILABLE,即备broker不可用时,也会将localTransactionState置为Rollback,此时将不会执行本地事务。之后由endTransaction方法负责二阶段提交,见源码清单-4。具体到endTrans} a ( r %action的实现:

源码清单-13

public void endTransaction(
final SendResult sendResult,
final LocalTransactionState localTransactionState,
final Throwabl_ L Z s 1e localException) throws RemotingExch u p U m { 6 ` ueption, MQBrokerExce] _ B M }ption, Intr ; ` ) 7erruptedException, UnknownHostException {
final MW z P cessag b U r y aeId id;
if (sendResult.getOffsetMsgId() != null) {
id = Messr 3 z FageDecoder.decodeMessageId(sendResult.getOfA n R @ X efsetMsgId());
} else {
id = Mz L aessageDecoder.decodeMess{ ; 2 N ( & N uageId(| 5 }sendResul4 M ! O v { Ft.getMq o a @ -sgId());
}
String transactionId = sendRW 9 v ) ! u k 0 Yesult.getTransactionId();
final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResum T * A jlt.getMessageQueue().getBroY ~ ( : h ) h j 3kerName()n H ) F ~ R);
EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
requestHeader.setTransactionId(transactionId);
requestHeader.setComm9 m k qitLogOffset(id.U f egetOffset());
s9 | # q M p @ i vwitch (localTransactionState) {
casek j _ g COMMIT_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRAM . #NSAO / o u ~CTION_COMMIT_TYPE);
break;
case ROLLBACK_MESSAGE:
requestHeader.setCommig 5 e FtOrRollbaM O R Eck(MessageSysFlag.TRANSACTION_ROLLBACK_TYP+ Q j mE);
break;
case UNKNOW:
requestHep . O @ Q .ader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
break;
default:
break;
}
requestHeader.setProducerGroup(this.defaultMD G B @ R E r 2 pQProducer.getProduce } ~rGroup());f f R @ 8 A d U W
requeS s G M # pstHeader.setTranStateTableOffset(sendResult.getQueueOffset());
requestHeadd : $er.setMsgI2 A D C ed(sendResult.getMsgId());
String remark = localE= K /xception != null ? ("exW H p [ 0 K O B oecuteLocalTransactionBranch exception: " + localExcepW 0 T C @ c ^ Ftion.toString()) : null;
//Q l _ A p ( ; 5 采用oneway的方式发送二阶段消息
this.mQClientFactory.getMQClientAPIIms 9 I ? kpl().endTransactionOneway(brokerAddr, requestHeader, remark,
this.defaultMQProducer.getSendMsgTimeout());
}

在二阶段发送时,之所以用oneway的方式发送,个人理解这正是因为事务消息有一个特殊的可靠机制——回查。

消息回查

当Broker经过了一个特定的时间,发现依然没有得到事务消息的二阶段是否要提交或者回滚的确切@ E J信息,Brom + n @ zker不知道Producer发生了什么情况(可能producer挂了 ) $ b (,也可能producer发了commit但网络抖动丢了,也可能...),于是主动发起回查。
事务消息的回查机制,更多的是在broker端的体现。RocketMQ的broker以Half消息、Op消息、真M * Y 5 _ d ! 8实消息三个不同的topic来将不同发送阶段的事务消息进行了隔离,使得Consumer只能看到最终确认commit需要投递出去的消息。其中详细的实现逻辑在本文中暂不多d u r &赘述,后续可q H X w , j另开一篇专门来从Broker视角来解读。

回到Producer的视角,当收到了Broker的# m : ; _ l k 回查请求,Producer将根据消息检查本地事务状态,根据结果决定提交或回滚,M ^ ] 1这就要求Producer必须指定回查& % u & = . p W实现,以备不时之需。
当然,正常情况下,并不推荐主动发送UNKNOp + 6 iW状态,这个状态毫无疑) ~ *问会给broker带来D 6 Q i 6 8 M额外回查开销,只在出O 7 ] u现不可预知的异常情况时才启动回查机制,是一种比较合理的选择。

另外,4.7.1版本的事8 1 z & V 4 p务回查并非无限回查,而是最多回查15次:

源码清单-14

/**
* The maximum number of times the message was checked, if exceed this value, this message will be discaS G o / P O Brded.
*/
@ImportantField
private int transactionCheckMax = 15;

附录

官方给出Producer的默认参数如下(其中超时& F ] ( /时长的参数,在前文中也已经提到,debug的结果是默认3000ms,并非10000ms):
探秘RocketMQ源码【1】——Producer视角看事务消息

RocketMQ作为一款优秀Q _ [的开源消息中间件,有很多开发者基D z $ u G H w Q于它做了二次开发,例如蚂蚁集团商业化产品SOFAStack MQ消息队列,就是基于RocketMz z p w q JQ内核进行的再次开发的金融级消息中间件,在消息管控、透明运维等方面做了大量优秀的工作。
愿RocketMQ在社区广大开发者的共创共建之下,能够不断发展壮大,迸发更强的生命力。