在上一篇文章中,我們通過雙異步的方式導入了10萬行的Excel,有個小伙伴在評論區問我,如果保證事務呢,如果分批的話。
通過串行讀取Excel,單個Excel耗時191s。
通過Future獲取異步返回值,再和Excel文件數據行進行比較,實現對數據準確性的判斷!
Java8中引入了CompletableFuture,它實現了對Future的全面升級,可以通過回調的方式,獲取異步線程返回值。
CompletableFuture的異步執行通過ForkJoinPool實現, 它使用守護線程去執行任務。
ForkJoinPool在于可以充分利用多核CPU的優勢,把一個任務拆分成多個小任務,把多個小任務放到多個CPU上并行執行,當多個小任務執行完畢后,再將其執行結果合并起來。
想要保證事務,肯定是使用@Transactional來實現。
現在的場景是導入若干個大的Excel文件數據,因為每個Excel導入的表不同,所以只要保證單Excel的事務即可。
上文中,是使用異步批量讀取并插入的方式實現的Excel文件入庫。
也就是說,1個主線程事務 + 若干個子線程事務,我們想要保證單Excel的插入事務,所有異步子線程有任何一個報錯,都要進行事務回滾,如果全部都沒報錯,則進行事務提交。
這個時候,有的小伙伴可能會想到,主線程加個@Transactional注解,所有子線程分別加@Transactional注解,就可以了吧?
但是,這樣是不行的,子線程的異常只會回滾其自身的事務。
如果Excel中有10萬條數據,一次插入4200條數據,最后一次插入3400條。如果其它線程都插入成功了,最后一個報錯了,此時,數據庫中還是會有96600條數據插入成功,與單Excel的事務需求不符。
通過代碼模擬這種情況:
if(end == sheet.getLastRowNum()){ logger.info("插入最后一批數據,模擬異常"); int a = 1/0;}
聲明式事務管理建立在AOP之上的。其本質是對方法前后進行攔截,然后在目標方法開始之前創建或者加入一個事務,在執行完目標方法之后根據執行情況提交或者回滾事務。
簡而言之,@Transactional注解在代碼執行出錯的時候能夠進行事務的回滾。
使用@Transactional后,當程序發生RuntimeException運行時異常在沒有使用try,catch進行捕獲的時候,程序都會中止,當程序發生中止,則會觸發數據庫的回滾。
當使用了trycatch進行捕獲到這個異常,假如在catch中加入了throw e拋出異常,則程序中止,數據庫回滾。
加入在try catch中沒有throw e 拋出異常,只是簡單的打印異常,則異常被捕獲未拋出異常去終止程序,在trycatch中的操作數據庫語句插入失敗,在trycatch上面和下面的數據庫相關插入語句成功,也就是程序成功跑完,數據庫不會發生回滾。
在@Transactional注解中如果不配置rollbackFor屬性,那么事物只會在遇到RuntimeException的時候才會回滾,加上rollbackFor=Exception.class,可以讓事物在遇到非運行時異常時也回滾。
事務攔截器在目標方法執行前后進行攔截,內部會調用方法來獲取Transactional 注解的事務配置信息,調用前會檢查目標方法的修飾符是否為 public,不是 public則不會獲取@Transactional 的屬性配置信息。
rollbackFor 可以指定能夠觸發事務回滾的異常類型。
Spring默認拋出了未檢查unchecked異常(繼承自 RuntimeException 的異常)或者 Error才回滾事務;其他異常不會觸發回滾事務。
如果在事務中拋出其他類型的異常,但卻期望 Spring 能夠回滾事務,就需要指定rollbackFor屬性。
開發中避免不了會對同一個類里面的方法調用,比如有一個類Test,它的一個方法A,A再調用本類的方法B(不論方法B是用public還是private修飾),但方法A沒有聲明注解事務,而B方法有。則外部調用方法A之后,方法B的事務是不會起作用的。這也是經常犯錯誤的一個地方。
那為啥會出現這種情況?其實這還是由于使用Spring AOP代理造成的,因為只有當事務方法被當前類以外的代碼調用時,才會由Spring生成的代理對象來管理。
在同一個類中調用異步方法,等于調用this本類的方法,沒有走Spring生成的代理類,也就不會讓他異步執行,@Transactional的原理也類似。
如果你手動的catch捕獲這個異常并進行處理,事務管理器會認為當前事務應該正常commit,就會導致注解失效,如果非要捕獲且不失效,就必須在代碼塊內throw new Exception拋出異常。
@Transactional(rollbackFor = Exception.class)public void readXls(String filePath, String filename) throws Exception{ try { // 省略一些復雜操作... List<Future<Integer>> futureList = new ArrayList<>(); for (int time = 0; time < times; time++) { Future<Integer> sumFuture = readExcelDataAsyncFutureService.readXlsCacheAsyncMybatis(); futureList.add(sumFuture); } // 主線程獲取Future返回值 boolean futureFlag = getFutureResult(futureList, excelRow); if (futureFlag) { logger.info("readXlsCacheAsync---插入數據成功,提交事務"); } else { TransactionAspectSupport.currentTransactionStatus().setRollbackOnly(); logger.info("readXlsCacheAsync---插入數據失敗,回滾事務"); } } catch (Exception e) { TransactionAspectSupport.currentTransactionStatus().setRollbackOnly(); logger.error("readXlsCacheAsync---插入數據異常,回滾事務:", e); }}@Async("async-executor")//是否開啟異步@Overridepublic Integer readXlsCacheAsyncMybatis() { try { // 省略一些復雜操作... }catch (Exception e){ throw new RuntimeException("插入數據庫異常", e); }}
如果入庫異常,事務回滾成功。
回滾失敗!
public void readXls(String filePath, String filename) throws Exception{ // 手動開啟事務,不自動提交 TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition); try { // 省略一些復雜操作... List<Future<Integer>> futureList = new ArrayList<>(); for (int time = 0; time < times; time++) { Future<Integer> sumFuture = readExcelDataAsyncFutureService.readXlsCacheAsyncMybatis(); futureList.add(sumFuture); } // 主線程獲取Future返回值 boolean futureFlag = getFutureResult(futureList, excelRow); if (futureFlag) { dataSourceTransactionManager.commit(transactionStatus); // 提交 logger.info("readXlsCacheAsync---插入數據成功,提交事務"); } else { dataSourceTransactionManager.rollback(transactionStatus);// 回滾 logger.info("readXlsCacheAsync---插入數據失敗,回滾事務"); } } catch (Exception e) { dataSourceTransactionManager.rollback(transactionStatus);// 回滾 logger.error("readXlsCacheAsync---插入數據異常,回滾事務:", e); }}@Async("async-executor")//是否開啟異步@Overridepublic Integer readXlsCacheAsyncMybatis() { try { // 省略一些復雜操作... }catch (Exception e){ throw new RuntimeException("插入數據庫異常", e); }}
如果入庫異常,事務回滾成功。
回顧一下需求:異步某線程失敗時,主線程回滾所有異步線程的事務!
是代碼有問題,還是就是實現不了呢?
@Async和@Transactional注解都是通過Spring aop實現的,核心都是靠著關鍵的MethodInterceptor實現,@Async會給對應bean代理對象中放入一個AnnotationAsyncExecutionInterceptor攔截器,而@Transactional會給對應bean的代理對象中放入一個TransactionInterceptor攔截器。
Spring事務管理的傳播機制是使用 ThreadLocal 實現的。因為 ThreadLocal 是線程私有的,所以 Spring 的事務傳播機制是不能夠跨線程的。
/** * 數據源事務管理器 */private DataSourceTransactionManager dataSourceTransactionManager;@Autowiredpublic void setUserService(DataSourceTransactionManager dataSourceTransactionManager) { this.dataSourceTransactionManager = dataSourceTransactionManager;}@Overridepublic void readXls(String filePath, String filename) { List<TransactionStatus> transactionStatusList = Collections.synchronizedList(new ArrayList<>()); List<TransactionResource> transactionResourceList = Collections.synchronizedList(new ArrayList<>()); try { List<Future<Integer>> futureList = new ArrayList<>(); for (int time = 0; time < times; time++) { Future<Integer> sumFuture = readAsyncFutureTransactionDBService.readXlsCacheAsyncMybatis(sheet, row, start, end, insertBuilder,transactionStatusList,transactionResourceList); futureList.add(sumFuture); } // 主線程獲取Future返回值 boolean futureFlag = getFutureResult(futureList, excelRow); if (futureFlag) { for (int i = 0; i < transactionStatusList.size(); i++) { TransactionStatus transactionStatus = transactionStatusList.get(i); dataSourceTransactionManager.commit(transactionStatus); // 提交 } logger.info("readXlsCacheAsync---插入數據成功,提交事務"); } else { for (int i = 0; i < transactionStatusList.size(); i++) { TransactionStatus transactionStatus = transactionStatusList.get(i); dataSourceTransactionManager.rollback(transactionStatus);// 回滾 } logger.info("readXlsCacheAsync---插入數據失敗,事務回滾"); throw new RuntimeException("readXlsCacheAsync---插入數據異常,異常事務回滾"); } } catch (Exception e) { logger.error("readXlsCacheAsync---插入數據異常,事務回滾:", e); for (int i = 0; i < transactionStatusList.size(); i++) { TransactionStatus transactionStatus = transactionStatusList.get(i); dataSourceTransactionManager.rollback(transactionStatus);// 回滾 } //connection.rollback(); throw new RuntimeException("readXlsCacheAsync---插入數據異常,異常事務回滾"); }}
@Async("async-executor")@Overridepublic Future<Integer> readXlsCacheAsyncMybatis(XSSFSheet sheet, XSSFRow row, int start, int end, StringBuilder insertBuilder, List<TransactionStatus> transactionStatusList, List<ReadAsyncFutureTransactionServiceImpl.TransactionResource> transactionResourceList) throws Exception { DefaultTransactionDefinition defaultTransactionDefinition = new DefaultTransactionDefinition(); TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(defaultTransactionDefinition); // 開啟新事務 transactionStatusList.add(transactionStatus); // copy事務資源 transactionResourceList.add(ReadAsyncFutureTransactionServiceImpl.TransactionResource.copyTransactionResource()); try { // 入庫操作 }catch (Exception e){ throw new RuntimeException("readXlsCacheAsyncMybatis分批異步讀取Excel,通過Mybatis插入數據庫異常"); }}
/** * 保存當前事務資源,用于線程間的事務資源COPY操作 * <p> * `@Builder`注解是Lombok庫提供的一個注解,它可以用于自動生成Builder模式的代碼,使用@Builder注解可以簡化創建對象實例的過程,并且可以使代碼更加清晰和易于維護 */static class TransactionResource { // TransactionSynchronizationManager類內部默認提供了下面六個ThreadLocal屬性,分別保存當前線程對應的不同事務資源 // 保存當前事務關聯的資源,默認只會在新建事務的時候保存當前獲取到的DataSource和當前事務對應Connection的映射關系 // 當然這里Connection被包裝為了ConnectionHolder // 事務結束后默認會移除集合中的DataSource作為key關聯的資源記錄 private Map<Object, Object> resources; //下面五個屬性會在事務結束后被自動清理,無需我們手動清理 // 事務監聽者,在事務執行到某個階段的過程中,會去回調監聽者對應的回調接口(典型觀察者模式的應用),默認為空集合 private Set<TransactionSynchronization> synchronizations; // 存放當前事務名字 private String currentTransactionName; // 存放當前事務是否是只讀事務 private Boolean currentTransactionReadOnly; // 存放當前事務的隔離級別 private Integer currentTransactionIsolationLevel; // 存放當前事務是否處于激活狀態 private Boolean actualTransactionActive; /** * 對事務資源進行復制 * * @return TransactionResource */ public static TransactionResource copyTransactionResource() { return TransactionResource.builder() //返回的是不可變集合 .resources(TransactionSynchronizationManager.getResourceMap()) //如果需要注冊事務監聽者,這里記得修改,我們這里不需要,就采用默認負責,spring事務內部默認也是這個值 .synchronizations(new LinkedHashSet<>()).currentTransactionName(TransactionSynchronizationManager.getCurrentTransactionName()).currentTransactionReadOnly(TransactionSynchronizationManager.isCurrentTransactionReadOnly()).currentTransactionIsolationLevel(TransactionSynchronizationManager.getCurrentTransactionIsolationLevel()).actualTransactionActive(TransactionSynchronizationManager.isActualTransactionActive()).build(); } /** * 使用 */ public void autoWiredTransactionResource() { resources.forEach(TransactionSynchronizationManager::bindResource); //如果需要注冊事務監聽者,這里記得修改,我們這里不需要,就采用默認負責,spring事務內部默認也是這個值 TransactionSynchronizationManager.initSynchronization(); TransactionSynchronizationManager.setActualTransactionActive(actualTransactionActive); TransactionSynchronizationManager.setCurrentTransactionName(currentTransactionName); TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(currentTransactionIsolationLevel); TransactionSynchronizationManager.setCurrentTransactionReadOnly(currentTransactionReadOnly); } /** * 移除 */ public void removeTransactionResource() { // 事務結束后默認會移除集合中的DataSource作為key關聯的資源記錄 // DataSource如果重復移除,unbindResource時會因為不存在此key關聯的事務資源而報錯 resources.keySet().forEach(key -> { if (!(key instanceof DataSource)) { TransactionSynchronizationManager.unbindResource(key); } }); }}
如何不加會怎么樣?
在提交和回滾的時候,會出現異常:
經過不懈的努力,終于解決了“異步某線程失敗時,主線程回滾所有異步線程的事務!”這個看起來很簡單的問題。
也是對雙異步入庫系列的一個完結。
通過添加事務,可以有效的控制Excel異步插入數據的準確性。
本文鏈接:http://www.www897cc.com/showinfo-26-70399-0.html雙異步系列完結撒花,如何解決異步事務問題?
聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。郵件:2376512515@qq.com
上一篇: 如何在 Npm 上發布二進制文件?