日韩成人免费在线_国产成人一二_精品国产免费人成电影在线观..._日本一区二区三区久久久久久久久不

當(dāng)前位置:首頁(yè) > 科技  > 軟件

事務(wù)提交之后異步執(zhí)行工具類(lèi)封裝

來(lái)源: 責(zé)編: 時(shí)間:2023-09-18 21:40:27 326觀(guān)看
導(dǎo)讀一、背景許多時(shí)候,我們期望在事務(wù)提交之后異步執(zhí)行某些邏輯,調(diào)用外部系統(tǒng),發(fā)送MQ,推送ES等等;當(dāng)事務(wù)回滾時(shí),異步操作也不執(zhí)行,這些異步操作需要等待事務(wù)完成后才執(zhí)行;比如出入庫(kù)的事務(wù)執(zhí)行完畢后,異步發(fā)送MQ給報(bào)表系統(tǒng)、ES等等

Nha28資訊網(wǎng)——每日最新資訊28at.com

一、背景

許多時(shí)候,我們期望在事務(wù)提交之后異步執(zhí)行某些邏輯,調(diào)用外部系統(tǒng),發(fā)送MQ,推送ES等等;當(dāng)事務(wù)回滾時(shí),異步操作也不執(zhí)行,這些異步操作需要等待事務(wù)完成后才執(zhí)行;比如出入庫(kù)的事務(wù)執(zhí)行完畢后,異步發(fā)送MQ給報(bào)表系統(tǒng)、ES等等。Nha28資訊網(wǎng)——每日最新資訊28at.com

二、猜想

我們?cè)陧?xiàng)目中大多都是使用聲明式事務(wù)(@Transactional注解) ,spring會(huì)基于動(dòng)態(tài)代理機(jī)制對(duì)我們的業(yè)務(wù)方法進(jìn)行增強(qiáng),控制connection,從而達(dá)到事務(wù)的目的。那么我們能否在此找尋一些蛛絲馬跡。我們來(lái)看下spring事務(wù)的相關(guān)核心類(lèi)(裝配流程不詳細(xì)敘述)。Nha28資訊網(wǎng)——每日最新資訊28at.com

TransactionInterceptor:Nha28資訊網(wǎng)——每日最新資訊28at.com

public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable {  @Override  @Nullable  public Object invoke(MethodInvocation invocation) throws Throwable {     // Work out the target class: may be {@code null}.     // The TransactionAttributeSource should be passed the target class     // as well as the method, which may be from an interface.     Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);     // Adapt to TransactionAspectSupport's invokeWithinTransaction...     return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);  }}

TransactionAspectSupport(重點(diǎn)關(guān)注事務(wù)提交之后做了哪些事情,有哪些擴(kuò)展點(diǎn))。Nha28資訊網(wǎng)——每日最新資訊28at.com

public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean { protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, final InvocationCallback invocation) throws Throwable {   // If the transaction attribute is null, the method is non-transactional.   TransactionAttributeSource tas = getTransactionAttributeSource();   final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);   final TransactionManager tm = determineTransactionManager(txAttr);   if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {      ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> {         if (KotlinDetector.isKotlinType(method.getDeclaringClass()) && KotlinDelegate.isSuspend(method)) {            throw new TransactionUsageException(                  "Unsupported annotated transaction on suspending function detected: " + method +                  ". Use TransactionalOperator.transactional extensions instead.");         }         ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(method.getReturnType());         if (adapter == null) {            throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type: " +                  method.getReturnType());         }         return new ReactiveTransactionSupport(adapter);      });      return txSupport.invokeWithinTransaction(            method, targetClass, invocation, txAttr, (ReactiveTransactionManager) tm);   }   PlatformTransactionManager ptm = asPlatformTransactionManager(tm);   final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);   if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {      // 創(chuàng)建事務(wù),此處也會(huì)創(chuàng)建connection      TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);      Object retVal;      try {         // 執(zhí)行目標(biāo)方法         retVal = invocation.proceedWithInvocation();      }      catch (Throwable ex) {         // 目標(biāo)方法異常時(shí)處理         completeTransactionAfterThrowing(txInfo, ex);         throw ex;      }      finally {		 // 重置TransactionInfo ThreadLocal         cleanupTransactionInfo(txInfo);      }      if (vavrPresent && VavrDelegate.isVavrTry(retVal)) {         // Set rollback-only in case of Vavr failure matching our rollback rules...         TransactionStatus status = txInfo.getTransactionStatus();         if (status != null && txAttr != null) {            retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);         }      }	  // 業(yè)務(wù)方法成功執(zhí)行,提交事務(wù)(重點(diǎn)關(guān)注此處),最終會(huì)調(diào)用AbstractPlatformTransactionManager#commit方法      commitTransactionAfterReturning(txInfo);      return retVal;   }   else {      final ThrowableHolder throwableHolder = new ThrowableHolder();      // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.      try {         Object result = ((CallbackPreferringPlatformTransactionManager) ptm).execute(txAttr, status -> {            TransactionInfo txInfo = prepareTransactionInfo(ptm, txAttr, joinpointIdentification, status);            try {               Object retVal = invocation.proceedWithInvocation();               if (vavrPresent && VavrDelegate.isVavrTry(retVal)) {                  // Set rollback-only in case of Vavr failure matching our rollback rules...                  retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);               }               return retVal;            }            catch (Throwable ex) {               if (txAttr.rollbackOn(ex)) {                  // A RuntimeException: will lead to a rollback.                  if (ex instanceof RuntimeException) {                     throw (RuntimeException) ex;                  }                  else {                     throw new ThrowableHolderException(ex);                  }               }               else {                  // A normal return value: will lead to a commit.                  throwableHolder.throwable = ex;                  return null;               }            }            finally {               cleanupTransactionInfo(txInfo);            }         });         // Check result state: It might indicate a Throwable to rethrow.         if (throwableHolder.throwable != null) {            throw throwableHolder.throwable;         }         return result;      }      catch (ThrowableHolderException ex) {         throw ex.getCause();      }      catch (TransactionSystemException ex2) {         if (throwableHolder.throwable != null) {            logger.error("Application exception overridden by commit exception", throwableHolder.throwable);            ex2.initApplicationException(throwableHolder.throwable);         }         throw ex2;      }      catch (Throwable ex2) {         if (throwableHolder.throwable != null) {            logger.error("Application exception overridden by commit exception", throwableHolder.throwable);         }         throw ex2;      }   }}}

AbstractPlatformTransactionManager:Nha28資訊網(wǎng)——每日最新資訊28at.com

public final void commit(TransactionStatus status) throws TransactionException {   if (status.isCompleted()) {      throw new IllegalTransactionStateException(            "Transaction is already completed - do not call commit or rollback more than once per transaction");   }   DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;   if (defStatus.isLocalRollbackOnly()) {      if (defStatus.isDebug()) {         logger.debug("Transactional code has requested rollback");      }      processRollback(defStatus, false);      return;   }   if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {      if (defStatus.isDebug()) {         logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");      }      processRollback(defStatus, true);      return;   }   // 事務(wù)提交處理   processCommit(defStatus);}private void processCommit(DefaultTransactionStatus status) throws TransactionException {   try {      boolean beforeCompletionInvoked = false;      try {         boolean unexpectedRollback = false;         prepareForCommit(status);         triggerBeforeCommit(status);         triggerBeforeCompletion(status);         beforeCompletionInvoked = true;         if (status.hasSavepoint()) {            if (status.isDebug()) {               logger.debug("Releasing transaction savepoint");            }            unexpectedRollback = status.isGlobalRollbackOnly();            status.releaseHeldSavepoint();         }         else if (status.isNewTransaction()) {            if (status.isDebug()) {               logger.debug("Initiating transaction commit");            }            unexpectedRollback = status.isGlobalRollbackOnly();            doCommit(status);         }         else if (isFailEarlyOnGlobalRollbackOnly()) {            unexpectedRollback = status.isGlobalRollbackOnly();         }         // Throw UnexpectedRollbackException if we have a global rollback-only         // marker but still didn't get a corresponding exception from commit.         if (unexpectedRollback) {            throw new UnexpectedRollbackException(                  "Transaction silently rolled back because it has been marked as rollback-only");         }      }      catch (UnexpectedRollbackException ex) {         // can only be caused by doCommit         triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);         throw ex;      }      catch (TransactionException ex) {         // can only be caused by doCommit         if (isRollbackOnCommitFailure()) {            doRollbackOnCommitException(status, ex);         }         else {            triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);         }         throw ex;      }      catch (RuntimeException | Error ex) {         if (!beforeCompletionInvoked) {            triggerBeforeCompletion(status);         }         doRollbackOnCommitException(status, ex);         throw ex;      }      // Trigger afterCommit callbacks, with an exception thrown there      // propagated to callers but the transaction still considered as committed.      try {		 // 在事務(wù)提交后觸發(fā)(追蹤到這里就離真相不遠(yuǎn)了)         triggerAfterCommit(status);      }      finally {         triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);      }   }   finally {      cleanupAfterCompletion(status);   }}

TransactionSynchronizationUtils:Nha28資訊網(wǎng)——每日最新資訊28at.com

public abstract class TransactionSynchronizationUtils {  public static void triggerAfterCommit() {     // TransactionSynchronizationManager: 事務(wù)同步器管理     invokeAfterCommit(TransactionSynchronizationManager.getSynchronizations());  }  public static void invokeAfterCommit(@Nullable List<TransactionSynchronization> synchronizations) {     if (synchronizations != null) {        for (TransactionSynchronization synchronization : synchronizations) {		   // 調(diào)用TransactionSynchronization#afterCommit方法,默認(rèn)實(shí)現(xiàn)為空,留給子類(lèi)擴(kuò)展		   // 那么我們想在事務(wù)提交之后做一些異步操作,實(shí)現(xiàn)此方法即可           synchronization.afterCommit();        }     }  }}

TransactionSynchronization:Nha28資訊網(wǎng)——每日最新資訊28at.com

public interface TransactionSynchronization extends Flushable {   default void afterCommit() {}}

過(guò)程中我們發(fā)現(xiàn)TransactionSynchronizationManager、TransactionSynchronization、TransactionSynchronizationAdapter 等相關(guān)類(lèi)涉及aop的整個(gè)流程,篇幅有限,在此不詳細(xì)展開(kāi),當(dāng)然我們的一些擴(kuò)展也是離不開(kāi)這些基礎(chǔ)類(lèi)的。Nha28資訊網(wǎng)——每日最新資訊28at.com

三、實(shí)現(xiàn)

事務(wù)提交之后異步執(zhí)行,我們需自定義synchronization.afterCommit,結(jié)合線(xiàn)程池一起使用,定義線(xiàn)程池TaskExecutor。Nha28資訊網(wǎng)——每日最新資訊28at.com

@Beanpublic TaskExecutor taskExecutor() {    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();    taskExecutor.setCorePoolSize(******);    taskExecutor.setMaxPoolSize(******);    taskExecutor.setKeepAliveSeconds(******);    taskExecutor.setQueueCapacity(******);    taskExecutor.setThreadNamePrefix(******);    taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());    taskExecutor.initialize();    return taskExecutor;}

定義AfterCommitExecutor接口。Nha28資訊網(wǎng)——每日最新資訊28at.com

public interface AfterCommitExecutor extends Executor { }

定義AfterCommitExecutorImpl實(shí)現(xiàn)類(lèi),注意需繼承TransactionSynchronizationAdapter類(lèi)。Nha28資訊網(wǎng)——每日最新資訊28at.com

import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Component;import org.springframework.core.NamedThreadLocal;import org.springframework.core.task.TaskExecutor;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.transaction.support.TransactionSynchronizationAdapter;import org.springframework.transaction.support.TransactionSynchronizationManager;import java.util.List;import java.util.ArrayList;@Componentpublic class AfterCommitExecutorImpl extends TransactionSynchronizationAdapter implements AfterCommitExecutor {    private static final Logger LOGGER = LoggerFactory.getLogger(AfterCommitExecutorImpl.class);    // 保存要運(yùn)行的任務(wù)線(xiàn)程    private static final ThreadLocal<List<Runnable>> RUNNABLE_THREAD_LOCAL = new NamedThreadLocal<>("AfterCommitRunnable");    // 設(shè)置線(xiàn)程池    @Autowired    private TaskExecutor taskExecutor;    /**     * 異步執(zhí)行     *     * @param runnable 異步線(xiàn)程     */    @Override    public void execute(Runnable runnable) {        LOGGER.info("Submitting new runnable {} to run after commit", runnable);        // 如果事務(wù)已經(jīng)提交,馬上進(jìn)行異步處理        if (!TransactionSynchronizationManager.isSynchronizationActive()) {            LOGGER.info("Transaction synchronization is NOT ACTIVE. Executing right now runnable {}", runnable);            runnable.run();            return;        }        // 同一個(gè)事務(wù)的合并到一起處理(注意:沒(méi)有初始化則初始化,并注冊(cè))        List<Runnable> threadRunnableList = RUNNABLE_THREAD_LOCAL.get();        if (null == threadRunnableList) {            threadRunnableList = new ArrayList<>();            RUNNABLE_THREAD_LOCAL.set(threadRunnableList);            TransactionSynchronizationManager.registerSynchronization(this);        }        threadRunnableList.add(runnable);    }    /**     * 監(jiān)聽(tīng)到事務(wù)提交之后執(zhí)行方法     */    @Override    public void afterCommit() {        List<Runnable> threadRunnableList = RUNNABLE_THREAD_LOCAL.get();        LOGGER.info("Transaction successfully committed, executing {} threadRunnable", threadRunnableList.size());        for (Runnable runnable : threadRunnableList) {            try {                taskExecutor.execute(runnable);            } catch (RuntimeException e) {                LOGGER.error("Failed to execute runnable " + runnable, e);            }        }    }    /**     * 事務(wù)提交/回滾執(zhí)行     *     * @param status (STATUS_COMMITTED-0、STATUS_ROLLED_BACK-1、STATUS_UNKNOWN-2)     */    @Override    public void afterCompletion(int status) {        LOGGER.info("Transaction completed with status {}", status == STATUS_COMMITTED ? "COMMITTED" : "ROLLED_BACK");        RUNNABLE_THREAD_LOCAL.remove();    }}

使用。Nha28資訊網(wǎng)——每日最新資訊28at.com

工具類(lèi)封裝好了,使用上那就很簡(jiǎn)便了:注入AfterCommitExecutor,調(diào)用AfterCommitExecutor.execute(runnable)方法即可

四、總結(jié)

spring如此龐大,找準(zhǔn)切入點(diǎn),許多問(wèn)題都是可以找到解決思路、或者方案;Nha28資訊網(wǎng)——每日最新資訊28at.com

你對(duì)spring了解多少......Nha28資訊網(wǎng)——每日最新資訊28at.com

本文鏈接:http://www.www897cc.com/showinfo-26-10420-0.html事務(wù)提交之后異步執(zhí)行工具類(lèi)封裝

聲明:本網(wǎng)頁(yè)內(nèi)容旨在傳播知識(shí),若有侵權(quán)等問(wèn)題請(qǐng)及時(shí)與本網(wǎng)聯(lián)系,我們將在第一時(shí)間刪除處理。郵件:2376512515@qq.com

上一篇: 為什么說(shuō)MyBatis默認(rèn)的DefaultSqlSession是線(xiàn)程不安全?

下一篇: AIoTel下視頻編碼(一)--移動(dòng)看家視頻水印溯源技術(shù)

標(biāo)簽:
  • 熱門(mén)焦點(diǎn)
  • 小米降噪藍(lán)牙耳機(jī)Necklace分享:聽(tīng)一首歌 讀懂一個(gè)故事

    在今天下午的小米Civi 2新品發(fā)布會(huì)上,小米還帶來(lái)了一款新的降噪藍(lán)牙耳機(jī)Necklace,我們也在發(fā)布結(jié)束的第一時(shí)間給大家?guī)?lái)這款耳機(jī)的簡(jiǎn)單分享。現(xiàn)在大家能見(jiàn)到最多的藍(lán)牙耳機(jī)
  • vivo TWS Air開(kāi)箱體驗(yàn):真輕 臻好聽(tīng)

    在vivo S15系列新機(jī)的發(fā)布會(huì)上,vivo的最新款真無(wú)線(xiàn)藍(lán)牙耳機(jī)vivo TWS Air也一同發(fā)布,本次就這款耳機(jī)新品給大家?guī)?lái)一個(gè)簡(jiǎn)單的分享。外包裝盒上,vivo TWS Air保持了vivo自家產(chǎn)
  • 5月安卓手機(jī)好評(píng)榜:魅族20 Pro奪冠

    性能榜和性?xún)r(jià)比榜之后,我們來(lái)看最后的安卓手機(jī)好評(píng)榜,數(shù)據(jù)來(lái)源安兔兔評(píng)測(cè),收集時(shí)間2023年5月1日至5月31日,僅限國(guó)內(nèi)市場(chǎng)。第一名:魅族20 Pro好評(píng)率:97.50%不得不感慨魅族老品牌還
  • 只需五步,使用start.spring.io快速入門(mén)Spring編程

    步驟1打開(kāi)https://start.spring.io/,按照屏幕截圖中的內(nèi)容創(chuàng)建項(xiàng)目,添加 Spring Web 依賴(lài)項(xiàng),并單擊“生成”按鈕下載 .zip 文件,為下一步做準(zhǔn)備。請(qǐng)?jiān)谶M(jìn)入步驟2之前進(jìn)行解壓。圖
  • Temu起訴SHEIN,跨境電商戰(zhàn)事升級(jí)

    來(lái)源 | 伯虎財(cái)經(jīng)(bohuFN)作者 | 陳平安日前據(jù)外媒報(bào)道,拼多多旗下跨境電商平臺(tái)Temu正對(duì)競(jìng)爭(zhēng)對(duì)手SHEIN提起新訴訟,訴狀稱(chēng)Shein&ldquo;利用市場(chǎng)支配力量強(qiáng)迫服裝廠(chǎng)商與之簽訂獨(dú)家
  • 電視息屏休眠仍有網(wǎng)絡(luò)上傳 愛(ài)奇藝被質(zhì)疑“薅消費(fèi)者羊毛”

    記者丨寧曉敏 見(jiàn)習(xí)生丨汗青出品丨鰲頭財(cái)經(jīng)(theSankei) 前不久,愛(ài)奇藝發(fā)布了一份亮眼的一季報(bào),不僅營(yíng)收和會(huì)員營(yíng)收創(chuàng)造歷史最佳表現(xiàn),其運(yùn)營(yíng)利潤(rùn)也連續(xù)6個(gè)月實(shí)現(xiàn)增長(zhǎng)。自去年年初
  • 破圈是B站頭上的緊箍咒

    來(lái)源 | 光子星球撰文 | 吳坤諺編輯 | 吳先之每年的暑期檔都少不了瞄準(zhǔn)追劇女孩們的古偶劇集,2021年有優(yōu)酷的《山河令》,2022年有愛(ài)奇藝的《蒼蘭訣》,今年卻輪到小破站抓住了追
  • 超級(jí)標(biāo)準(zhǔn)版旗艦!iQOO 11S全球首發(fā)iQOO超算獨(dú)顯芯片

    上半年已接近尾聲,截至目前各大品牌旗下的頂級(jí)旗艦都已悉數(shù)亮相,而下半年即將推出的頂級(jí)旗艦已經(jīng)成為了數(shù)碼圈爆料的主流,其中就包括全新的iQOO 11S系
  • iQOO Neo8 Pro即將開(kāi)售:到手價(jià)3099元起 安卓性能最強(qiáng)旗艦

    5月23日,iQOO如期舉行了新品發(fā)布會(huì),全新的iQOO Neo8系列也正式與大家見(jiàn)面,包含iQOO Neo8和iQOO Neo8 Pro兩個(gè)版本,其中標(biāo)準(zhǔn)版搭載高通驍龍8+,而Pro版更
Top 主站蜘蛛池模板: 舟山市| 延吉市| 澄江县| 庆安县| 阜阳市| 福建省| 武隆县| 沾化县| 铜陵市| 永清县| 潢川县| 灵寿县| 西昌市| 驻马店市| 张掖市| 门头沟区| 大竹县| 大洼县| 安远县| 驻马店市| 盐边县| 合阳县| 汽车| 成安县| 建平县| 井陉县| 张掖市| 长寿区| 恩施市| 辽源市| 千阳县| 兴山县| 大安市| 田林县| 石狮市| 札达县| 肇东市| 弥渡县| 五原县| 泰和县| 广州市|