日韩成人免费在线_国产成人一二_精品国产免费人成电影在线观..._日本一区二区三区久久久久久久久不

當前位置:首頁 > 科技  > 軟件

分布式場景下的事務(wù)機制

來源: 責編: 時間:2023-12-26 09:29:25 328觀看
導(dǎo)讀事務(wù)消息是RocketMQ的一個非常特色的高級特性,它的基礎(chǔ)訴求是通過RocketMQ的事務(wù)機制,來保證上下游的數(shù)據(jù)?致性。我們在單機版本下面只需要在業(yè)務(wù)方法上加上對應(yīng)的事務(wù)就可以達到效果,但是分布式的場景下,多個系統(tǒng)之間的

事務(wù)消息是RocketMQ的一個非常特色的高級特性,它的基礎(chǔ)訴求是通過RocketMQ的事務(wù)機制,來保證上下游的數(shù)據(jù)?致性。spX28資訊網(wǎng)——每日最新資訊28at.com

我們在單機版本下面只需要在業(yè)務(wù)方法上加上對應(yīng)的事務(wù)就可以達到效果,但是分布式的場景下,多個系統(tǒng)之間的協(xié)調(diào)配合,你無法知道到底是那個先執(zhí)行那個后執(zhí)行,當然在微服務(wù)里面存在Seate框架來保證事務(wù),但是這事務(wù)的保證始終是心頭大患,只能用一句話形容魚和熊掌不可兼得。spX28資訊網(wǎng)——每日最新資訊28at.com

而RocketMq的事務(wù)消息能夠在提升性能的情況下滿足要求,其主要實現(xiàn)是支持分布式情況下保障消息生產(chǎn)和本地事務(wù)的最終一致性,消息生產(chǎn)我們可以使用順序消息去執(zhí)行,這樣我們只需要滿足這兩個的事務(wù)即可。spX28資訊網(wǎng)——每日最新資訊28at.com

spX28資訊網(wǎng)——每日最新資訊28at.com

spX28資訊網(wǎng)——每日最新資訊28at.com

spX28資訊網(wǎng)——每日最新資訊28at.com

 實現(xiàn)過程

spX28資訊網(wǎng)——每日最新資訊28at.com

圖片圖片spX28資訊網(wǎng)——每日最新資訊28at.com

準備階段:生產(chǎn)者將消息發(fā)送到Broker,Broker向生產(chǎn)者發(fā)送ack表示消息發(fā)送成功,但是此時的消息為一個等待狀態(tài),不會被消費者去消費。(生產(chǎn)者繼續(xù)執(zhí)行接下來的代碼)spX28資訊網(wǎng)——每日最新資訊28at.com

確認階段:當我們執(zhí)行完所有的代碼后,本地事務(wù)要么回滾要么提交,此時當我們了解本地事務(wù)的狀態(tài)后,將結(jié)果推送給Broker做二次確認結(jié)果,如果為Commit則將修改激活準備推送給消費者,如果為Rollback則將消息進行回滾。spX28資訊網(wǎng)——每日最新資訊28at.com

補償機制:當出現(xiàn)異常情況沒有發(fā)生二次確認,此時我們在固定時間后將會進行回查,檢查回查消息對應(yīng)的本地事務(wù)的狀態(tài),重寫Commit或者Rollback。spX28資訊網(wǎng)——每日最新資訊28at.com

spX28資訊網(wǎng)——每日最新資訊28at.com

spX28資訊網(wǎng)——每日最新資訊28at.com

 涉及狀態(tài)以及注意點

spX28資訊網(wǎng)——每日最新資訊28at.com

事務(wù)消息存在三種狀態(tài):spX28資訊網(wǎng)——每日最新資訊28at.com

CommitTransaction:提交事務(wù)狀態(tài),此狀態(tài)下允許消費者消費。spX28資訊網(wǎng)——每日最新資訊28at.com

RollbackTransaction:回滾事務(wù)狀態(tài),此狀態(tài)下消息會被刪除。spX28資訊網(wǎng)——每日最新資訊28at.com

Unknown:中間狀態(tài),此狀態(tài)下會等待本地事務(wù)處理結(jié)果進行對應(yīng)操作。spX28資訊網(wǎng)——每日最新資訊28at.com

注意點:spX28資訊網(wǎng)——每日最新資訊28at.com

本消息狀態(tài)是一種對消費者不可見的狀態(tài),將消息的內(nèi)容放到系統(tǒng)Topic的RMQ_SYS_TRANS_HALF_TOPIC隊列里面去。spX28資訊網(wǎng)——每日最新資訊28at.com

事務(wù)消息中的相關(guān)參數(shù)可以進行設(shè)置,比如:本地事務(wù)回查次數(shù)transactionCheckMax默認15次,本地事務(wù)回查的間隙transactionCheckInterval默認60s,超出后會直接將消息丟棄。spX28資訊網(wǎng)——每日最新資訊28at.com

RocketMQ的事務(wù)消息是指應(yīng)用本地事務(wù)和發(fā)送消息操作可以定義到全局事務(wù)中,要么同時成功,要么同時失敗,通過RocketMQ的事務(wù)信息可以實現(xiàn)可靠消息的最終一致性方案。spX28資訊網(wǎng)——每日最新資訊28at.com

spX28資訊網(wǎng)——每日最新資訊28at.com

spX28資訊網(wǎng)——每日最新資訊28at.com

 源碼解析

spX28資訊網(wǎng)——每日最新資訊28at.com

Producer端通過構(gòu)建TransactionMQProducer對象綁定事務(wù)監(jiān)聽。spX28資訊網(wǎng)——每日最新資訊28at.com

TransactionListener transactionListener = new TransactionListener() {    @Override    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {        return LocalTransactionState.COMMIT_MESSAGE;    }    @Override    public LocalTransactionState checkLocalTransaction(MessageExt msg) {        return LocalTransactionState.COMMIT_MESSAGE;    }};TransactionMQProducer producer = new TransactionMQProducer(producerGroupTemp);producer.setTransactionListener(transactionListener);producer.setNamesrvAddr("127.0.0.1:9876");product.start();SendResult result = producer.sendMessageInTransaction(message, arg);

執(zhí)行sendMessageInTransaction方法來發(fā)送消息。spX28資訊網(wǎng)——每日最新資訊28at.com

public TransactionSendResult sendMessageInTransaction(final Message msg,    final LocalTransactionExecuter localTransactionExecuter, final Object arg)    throws MQClientException {  // 檢查TransactionListener是否存在,如果不存在就直接拋異常    TransactionListener transactionListener = getCheckListener();    if (null == localTransactionExecuter && null == transactionListener) {        throw new MQClientException("tranExecutor is null", null);    }    // 事務(wù)消息不支持延遲等特性    if (msg.getDelayTimeLevel() != 0) {        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);    }    Validators.checkMessage(msg, this.defaultMQProducer);    SendResult sendResult = null;    // 設(shè)置half屬性,表明是事務(wù)屬性    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");    // 設(shè)置所屬生成者組    // broker向生產(chǎn)者發(fā)送回查事務(wù)請求根據(jù)這個producergroup找到指定的channel    // 生產(chǎn)者能找到所有在同一個組的機器實例從而檢查事務(wù)狀態(tài)    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());    try {        // 同步發(fā)送        sendResult = this.send(msg);    } catch (Exception e) {        throw new MQClientException("send message Exception", e);    }    LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;    Throwable localException = null;    // 消息返回信息    switch (sendResult.getSendStatus()) {            // 第一階段消息發(fā)送成功        case SEND_OK: {            try {                if (sendResult.getTransactionId() != null) {                    // 設(shè)置事務(wù)ID屬性                    msg.putUserProperty("__transactionId__", sendResult.getTransactionId());                }                                String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);                if (null != transactionId && !"".equals(transactionId)) {                    msg.setTransactionId(transactionId);                }                if (null != localTransactionExecuter) {                    // 執(zhí)行本地事務(wù)                    localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);                } else if (transactionListener != null) {                    log.debug("Used new transaction API");                    // 發(fā)送消息成功后,執(zhí)行本地操作                    localTransactionState = transactionListener.executeLocalTransaction(msg, arg);                }                if (null == localTransactionState) {                    localTransactionState = LocalTransactionState.UNKNOW;                }                if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {                    log.info("executeLocalTransactionBranch return {}", localTransactionState);                    log.info(msg.toString());                }            } catch (Throwable e) {                log.info("executeLocalTransactionBranch exception", e);                log.info(msg.toString());                localException = e;            }        }        break;        case FLUSH_DISK_TIMEOUT:        case FLUSH_SLAVE_TIMEOUT:        case SLAVE_NOT_AVAILABLE:            localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;            break;        default:            break;    }    try {        // 本地事務(wù)執(zhí)行完畢向broker提交事務(wù)或回滾事務(wù)        this.endTransaction(msg, sendResult, localTransactionState, localException);    } catch (Exception e) {        log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);    }    TransactionSendResult transactionSendResult = new TransactionSendResult();    transactionSendResult.setSendStatus(sendResult.getSendStatus());    transactionSendResult.setMessageQueue(sendResult.getMessageQueue());    transactionSendResult.setMsgId(sendResult.getMsgId());    transactionSendResult.setQueueOffset(sendResult.getQueueOffset());    transactionSendResult.setTransactionId(sendResult.getTransactionId());    transactionSendResult.setLocalTransactionState(localTransactionState);    return transactionSendResult;}

首先發(fā)送第一階段信息直接返回半提交狀態(tài),然后執(zhí)行本地事務(wù)返回事務(wù)的三種狀態(tài),未知,回滾,提交,最后執(zhí)行endTransaction方法,把事務(wù)執(zhí)行的狀態(tài)告訴broker。spX28資訊網(wǎng)——每日最新資訊28at.com

endTransaction方法

根據(jù)本地事務(wù)執(zhí)行狀態(tài)構(gòu)建requestHeader對象執(zhí)行二階段提交。spX28資訊網(wǎng)——每日最新資訊28at.com

public void endTransaction(    final Message msg,    final SendResult sendResult,    final LocalTransactionState localTransactionState,    final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {    final MessageId id;    // 獲取消息中的MessageId    if (sendResult.getOffsetMsgId() != null) {        id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());    } else {        id = MessageDecoder.decodeMessageId(sendResult.getMsgId());    }    String transactionId = sendResult.getTransactionId();    // 找到broker地址    final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());    // 構(gòu)建EndTransactionRequestHeader對象    EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();    requestHeader.setTransactionId(transactionId);    // offset是prepare消息中offsetMsgId中獲取的    requestHeader.setCommitLogOffset(id.getOffset());    requestHeader.setBname(sendResult.getMessageQueue().getBrokerName());    // 社會提交/回滾狀態(tài)    switch (localTransactionState) {        case COMMIT_MESSAGE:            // 提交            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);            break;        case ROLLBACK_MESSAGE:            // 回滾            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);            break;        case UNKNOW:            // 未知            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);            break;        default:            break;    }    doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false);    requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());    requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());    requestHeader.setMsgId(sendResult.getMsgId());    String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;    // 發(fā)送給broker端    this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,        this.defaultMQProducer.getSendMsgTimeout());}

將本地方法執(zhí)行事務(wù)的結(jié)果發(fā)送給Broker,通過endTransactionOneway方法創(chuàng)建Code為END_TRANSACTION的消息,然后在Broker就會找出對應(yīng)的Processor來處理。spX28資訊網(wǎng)——每日最新資訊28at.com

    Broker端處理     

Broker總共存在兩個處理,首先針對第一個階段發(fā)送的Half消息,broker要進行相關(guān)的操作,后面endTransaction提交進來的事務(wù)狀態(tài),針對三種狀態(tài)進行相關(guān)操作。spX28資訊網(wǎng)——每日最新資訊28at.com

接收第一階段發(fā)送的Half消息

SendMessageProcessor的sendMessage方法中去執(zhí)行處理事務(wù)消息。spX28資訊網(wǎng)——每日最新資訊28at.com

// 發(fā)送Half消息時,在屬性中設(shè)置了PROPERTY_TRANSACTION_PREPARED為true,這里根據(jù)這個屬性判斷是否是事務(wù)消息String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (Boolean.parseBoolean(traFlag)    && !(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0)) { //For client under version 4.6.1    if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {        response.setCode(ResponseCode.NO_PERMISSION);        response.setRemark(            "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()                + "] sending transaction message is forbidden");        return response;    }    // 事務(wù)消息進入這里,把消息的topic改成RMQ_SYS_TRANS_HALF_TOPIC,以同步刷盤的方式存入store    putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);}

如果消息攜帶事務(wù)標記就去執(zhí)行TransactionMessageService類的prepareMessage方法進行相關(guān)的處理。spX28資訊網(wǎng)——每日最新資訊28at.com

// 解析Half消息private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {      // 把真實的topic和真實的queueId放在消息的屬性中     MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());     MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,         String.valueOf(msgInner.getQueueId()));     // 設(shè)置默認的事務(wù)狀態(tài)為TRANSACTION_NOT_TYPE=>unknow     msgInner.setSysFlag(         MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));     // 將消息的topic設(shè)置為RMQ_SYS_TRANS_HALF_TOPIC,這個是對消費者不可見的     msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());     // 設(shè)置queueId=0     msgInner.setQueueId(0);     msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));     return msgInner;}

進行topic的切換,將原來的topic存入到消息的屬性里面,將消息的topic設(shè)置為RMQ_SYS_TRANS_HALF_TOPIC。spX28資訊網(wǎng)——每日最新資訊28at.com

處理endTransaction方法

在endTransaction方法中將消息同步給Broker處理的Code對應(yīng)為END_TRANSACTION,Broker就會找出對應(yīng)的Processor來處理該類即調(diào)用EndTransactionProcessor類的processRequest方法處理。spX28資訊網(wǎng)——每日最新資訊28at.com

if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {    // 根據(jù)commitLogOffset獲取文件中的message,獲取到了返回success    result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);    if (result.getResponseCode() == ResponseCode.SUCCESS) {        // 檢查消息是否一致        RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);        if (res.getCode() == ResponseCode.SUCCESS) {            // 生成要保存的消息            MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());            msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));            msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());            msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());            msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());            MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);            // 把真實的topic消息存儲到CommitLog中            RemotingCommand sendResult = sendFinalMessage(msgInner);            if (sendResult.getCode() == ResponseCode.SUCCESS) {                // 移除prepare消息,存入opQueueMap中                this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());            }            return sendResult;        }        return res;    }    // 回滾} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {    // 查詢到half消息則返回成功    result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);    if (result.getResponseCode() == ResponseCode.SUCCESS) {        // 檢查消息是否一致        RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);        if (res.getCode() == ResponseCode.SUCCESS) {            // 移除prepare消息,存入opQueueMap中            this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());        }        return res;    }}

僅僅展示相關(guān)核心代碼,其主要邏輯:首先去判斷請求的方式是commit還是rollback,如果是commit查詢到消息還原消息原來的topic,然后刪除half topic上的消息轉(zhuǎn)存到opQueueMap中,如果是rollback直接進行刪除half topic上的消息并轉(zhuǎn)存到opQueueMap中去。spX28資訊網(wǎng)——每日最新資訊28at.com

注意:opQueueMap的引入為了解決有可能出現(xiàn)網(wǎng)絡(luò)、進程、線程等各種因素導(dǎo)致消費端未能成功處理消息的情況,該機制的作用是在消費者端將未成功處理的消息重新發(fā)送到服務(wù)端進行重試,直到確認消息已經(jīng)被成功處理或者達到最大重試次數(shù)后進行回滾操作。而 Op 消息本身則是通過修改消息狀態(tài)來實現(xiàn)的。spX28資訊網(wǎng)——每日最新資訊28at.com

消息回查

當網(wǎng)絡(luò)中斷或者響應(yīng)超時等各種異常信息導(dǎo)致消息并沒有傳送到broker端去,為了解決這一問題在Broker就開啟一個回查線程每隔一分鐘執(zhí)行一次處理超過6s未回查的消息,當超過15次回查后直接將消息丟棄。spX28資訊網(wǎng)——每日最新資訊28at.com

在啟動BrokerController類時,會去調(diào)用startProcessorByHa方法如果是Master節(jié)點就會去啟動一個線程每隔6s處理未回查的消息,檢查最大次數(shù)為15次。spX28資訊網(wǎng)——每日最新資訊28at.com

public void run() {    log.info("Start transaction check service thread!");    long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();    while (!this.isStopped()) {        this.waitForRunning(checkInterval);    }    log.info("End transaction check service thread!");}protected void onWaitEnd() {    long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();    int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();    long begin = System.currentTimeMillis();    log.info("Begin to check prepare message, begin time:{}", begin);    // 檢查回查消息 timeout = 6s checkMax=15    this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());    log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);}

在check方法里面去調(diào)用listener.resolveHalfMsg(msgExt)方法去處理事務(wù)消息。spX28資訊網(wǎng)——每日最新資訊28at.com

public void resolveHalfMsg(final MessageExt msgExt) {    executorService.execute(new Runnable() {        @Override        public void run() {            try {                sendCheckMessage(msgExt);            } catch (Exception e) {                LOGGER.error("Send check message error!", e);            }        }    });}

執(zhí)行sendCheckMessage方法發(fā)送一個檢查事務(wù)狀態(tài)的Code為CHECK_TRANSACTION_STATE的消息,在客戶端MQClientAPIImpl初始化的時候就會去注冊一個Code對應(yīng)的Processor,最終就會去執(zhí)行checkTransactionState方法,判斷本地事務(wù)的狀態(tài),然后再去執(zhí)行endTransactionOneway發(fā)起END_TRANSACTION處理。spX28資訊網(wǎng)——每日最新資訊28at.com

public void checkTransactionState(final String addr, final MessageExt msg,    final CheckTransactionStateRequestHeader header) {    Runnable request = new Runnable() {        private final String brokerAddr = addr;        private final MessageExt message = msg;        private final CheckTransactionStateRequestHeader checkRequestHeader = header;        private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup();        // 執(zhí)行線程方法        @Override        public void run() {            TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();            TransactionListener transactionListener = getCheckListener();            if (transactionCheckListener != null || transactionListener != null) {                LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;                Throwable exception = null;                try {                    if (transactionCheckListener != null) {                        localTransactionState = transactionCheckListener.checkLocalTransactionState(message);                    } else if (transactionListener != null) {                        log.debug("Used new check API in transaction message");                        // 檢查本地事務(wù)                        localTransactionState = transactionListener.checkLocalTransaction(message);                    } else {                        log.warn("CheckTransactionState, pick transactionListener by group[{}] failed", group);                    }                } catch (Throwable e) {                    log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);                    exception = e;                }                // 處理事務(wù)狀態(tài)                this.processTransactionState(                    localTransactionState,                    group,                    exception);            } else {                log.warn("CheckTransactionState, pick transactionCheckListener by group[{}] failed", group);            }        }      //         private void processTransactionState(            final LocalTransactionState localTransactionState,            final String producerGroup,            final Throwable exception) {            final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();            thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());            thisHeader.setProducerGroup(producerGroup);            thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());            thisHeader.setFromTransactionCheck(true);            thisHeader.setBname(checkRequestHeader.getBname());            String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);            if (uniqueKey == null) {                uniqueKey = message.getMsgId();            }            thisHeader.setMsgId(uniqueKey);            thisHeader.setTransactionId(checkRequestHeader.getTransactionId());            switch (localTransactionState) {                // 提交狀態(tài)                case COMMIT_MESSAGE:                    thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);                    break;                // 回滾狀態(tài)                case ROLLBACK_MESSAGE:                    thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);                    log.warn("when broker check, client rollback this transaction, {}", thisHeader);                    break;                // 未知狀態(tài)                case UNKNOW:                    thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);                    log.warn("when broker check, client does not know this transaction state, {}", thisHeader);                    break;                default:                    break;            }            String remark = null;            if (exception != null) {                remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception);            }            doExecuteEndTransactionHook(msg, uniqueKey, brokerAddr, localTransactionState, true);            try {                // 再次執(zhí)行endTransactionOneway發(fā)起END_TRANSACTION                DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,                    3000);            } catch (Exception e) {                log.error("endTransactionOneway exception", e);            }        }    };    this.checkExecutor.submit(request);}

spX28資訊網(wǎng)——每日最新資訊28at.com

spX28資訊網(wǎng)——每日最新資訊28at.com

總結(jié)

spX28資訊網(wǎng)——每日最新資訊28at.com

首先客戶端Producer通過sendMessageInTransaction方法發(fā)送事務(wù)消息,Broker判斷是事務(wù)消息就將消息topic存入到RMQ_SYS_TRANS_HALF_TOPIC返回給客戶端,客戶端繼續(xù)執(zhí)行邏輯。spX28資訊網(wǎng)——每日最新資訊28at.com

然后調(diào)用endTransaction方法去提交本地事務(wù)通過endTransactionOneway將消息提交給Broker端,Broker端通過Code為END_TRANSACTION的處理器去處理消息調(diào)用processRequest方法來處理對應(yīng)的消息,spX28資訊網(wǎng)——每日最新資訊28at.com

如果由于各種原因?qū)е孪⒌氖鬏?,為了防止這些現(xiàn)象的出現(xiàn)所以在BrokerController啟動時就啟動一個線程每隔6s處理未回查的消息(檢查最大次數(shù)為15次)的任務(wù)來進行消息的回查,簡單來說就是通過sendCheckMessage方法去注冊一個Code為CHECK_TRANSACTION_STATE的消息將內(nèi)容發(fā)送給客戶端,然后客戶端在啟動時也注冊對應(yīng)Code的處理邏輯,通過processTransactionState方法去處理事務(wù)的狀態(tài),如果正常最后還是會去執(zhí)行endTransactionOneway方法,完成事務(wù)消息。spX28資訊網(wǎng)——每日最新資訊28at.com

本文鏈接:http://www.www897cc.com/showinfo-26-54132-0.html分布式場景下的事務(wù)機制

聲明:本網(wǎng)頁內(nèi)容旨在傳播知識,若有侵權(quán)等問題請及時與本網(wǎng)聯(lián)系,我們將在第一時間刪除處理。郵件:2376512515@qq.com

上一篇: Java新的結(jié)構(gòu)化并行模式入門指南

下一篇: JVM中Init、Used、Committed、Max參數(shù)與物理和虛擬內(nèi)存的關(guān)系

標簽:
  • 熱門焦點
  • MIX Fold3包裝盒泄露 新機本月登場

    小米的全新折疊屏旗艦MIX Fold3將于本月發(fā)布,近日該機的真機包裝盒在網(wǎng)上泄露。從圖上來看,新的MIX Fold3包裝盒在外觀設(shè)計方面延續(xù)了之前的方案,變化不大,這也是目前小米旗艦
  • Raft算法:保障分布式系統(tǒng)共識的穩(wěn)健之道

    1. 什么是Raft算法?Raft 是英文”Reliable、Replicated、Redundant、And Fault-Tolerant”(“可靠、可復(fù)制、可冗余、可容錯”)的首字母縮寫。Raft算法是一種用于在分布式系統(tǒng)
  • 破圈是B站頭上的緊箍咒

    來源 | 光子星球撰文 | 吳坤諺編輯 | 吳先之每年的暑期檔都少不了瞄準追劇女孩們的古偶劇集,2021年有優(yōu)酷的《山河令》,2022年有愛奇藝的《蒼蘭訣》,今年卻輪到小破站抓住了追
  • 共享單車的故事講到哪了?

    來源丨??素斀?jīng)與共享充電寶相差不多,共享單車已很久沒有被國內(nèi)熱點新聞關(guān)照到了。除了一再漲價和用戶直呼用不起了。近日多家媒體再發(fā)報道稱,成都、天津、鄭州等地多個共享單
  • 郭明錤稱華為和江淮汽車合作開發(fā)問界MPV,定價100萬左右、計劃明年量產(chǎn)

    8 月 1 日消息,郭明錤今天在 Medium 平臺發(fā)布博文,稱華為正在和江淮汽車合作,開發(fā)售價在 100 萬元的問界 MPV,預(yù)計在 2024 年第 2 季度量產(chǎn),銷量目標為
  • Android 14發(fā)布:首批適配機型公布

    5月11日消息,谷歌在今天凌晨舉行了I/O大會,本次發(fā)布會谷歌帶來了自家的AI語言模型PaLM 2、谷歌Pixel Fold折疊屏、谷歌Pixel 7a手機,同時發(fā)布了Androi
  • 蘋果140W USB-C充電器:采用氮化鎵技術(shù)

    據(jù)10 月 30 日 9to5 Mac 消息報道,當蘋果推出新的 MacBook Pro 2021 時,該公司還推出了新的 140W USB-C 充電器,附贈在 MacBook Pro 16 英寸機型的盒子里,也支
  • 英特爾Xe HPG游戲顯卡:擁有512EU,單風扇版本

    據(jù)10 月 30 日外媒 TheVerge 消息報道,英特爾 Xe HPG Arc Alchemist 的正面實被曝光,不僅擁有 512 EU 版顯卡,還擁有 128EU 的單風扇版本。另外,這款顯卡 PCB
  • 蘋果MacBook Pro 2021測試:仍不支持平滑滾動

    據(jù)10月30日9to5 Mac 消息報道,蘋果新的 14 英寸和 16 英寸 MacBook Pro 2021 上市后獲得了不錯的評價,亮點包括行業(yè)領(lǐng)先的性能,令人印象深刻的電池續(xù)航,精美豐
Top 主站蜘蛛池模板: 庆元县| 余庆县| 濮阳县| 防城港市| 宝山区| 阳山县| 夏津县| 凤山市| 隆林| 兴宁市| 古田县| 丰原市| 芷江| 民丰县| 全南县| 本溪市| 马公市| 页游| 金寨县| 夏河县| 和林格尔县| 行唐县| 宣恩县| 石河子市| 岳普湖县| 杂多县| 邻水| 麻城市| 涟水县| 乌兰浩特市| 来宾市| 华安县| 清丰县| 清河县| 白沙| 车险| 云浮市| 原阳县| 宜宾县| 沙雅县| 广平县|