首先,我們先簡單解釋下,什么是“內存Join”。
相信大家對關系數據庫的 join 語句肯定不陌生,其作用就是通過關聯關系從多個表中查詢數據,關聯條件和數據聚合全部由 數據庫服務完成。
圖片
而 內存 Join,簡單來說就是把原本數據庫幫我們完成的數據聚合操作遷移到應用服務,在應用服務的內存中完成。
圖片
數據庫join非常簡單,但隨著系統的發展,內存join變得越來越重要,其核心驅動力有:
發現變化,封裝變化,管理變化,是開發人員的必備技能。
本篇文章從查詢訂單這個業務場景為入口,針對數據的內存join進行多次抽象和封裝,最終實現“內存Join聲明化”。
首先,先看下最終的效果,從直觀上感受下“抽象”帶來的效率提升。
圖片
通過抽象,可以達到如下效果:
神秘背后的本質便是“抽象”。讓我們以訂單查詢為線索,層層遞進,最終實現“能力聲明化”。
能力聲明化,是抽象的一種高級表現,無需編寫代碼,通過配置的方式為特定組件進行能力加強。
在正式開始之前,可以先了解下整體的推演流程:
圖片
假設,我們是訂單中心的一位研發伙伴,需要開發 “我的訂單” 模塊,其核心接口包括:
根據需求定義 OrderService 接口如下:
public interface OrderService { // 我的訂單 List<OrderListVO> getByUserId(Long userId); // 訂單詳情 OrderDetailVO getDetailByOrderId(Long orderId);}// 為配合多種實現策略,使用抽象類進行統一public abstract class OrderListVO { public abstract OrderVO getOrder(); public abstract UserVO getUser(); public abstract AddressVO getAddress(); public abstract ProductVO getProduct();}// 為配合多種實現策略,使用抽象類進行統一public abstract class OrderDetailVO { public abstract OrderVO getOrder(); public abstract UserVO getUser(); public abstract AddressVO getAddress(); public abstract ProductVO getProduct(); public abstract List<PayInfoVO> getPayInfo();}
這么簡單的需求,那不是信手拈來,很快就提供了一版
圖片
代碼具體如下:
@Servicepublic class OrderServiceCodingV1 implements OrderService { @Autowired private OrderRepository orderRepository; @Autowired private AddressRepository addressRepository; @Autowired private ProductRepository productRepository; @Autowired private UserRepository userRepository; @Autowired private PayInfoRepository payInfoRepository; @Override public List<OrderListVO> getByUserId(Long userId) { // 獲取用戶訂單 List<Order> orders = this.orderRepository.getByUserId(userId); // 依次進行數據綁定 return orders.stream() .map(order -> convertToOrderListVO(order)) .collect(toList()); } private OrderListVOCodingV1 convertToOrderListVO(Order order) { OrderVO orderVO = OrderVO.apply(order); OrderListVOCodingV1 orderDetailVO = new OrderListVOCodingV1(orderVO); // 綁定地址信息 Address address = this.addressRepository.getById(order.getAddressId()); AddressVO addressVO = AddressVO.apply(address); orderDetailVO.setAddress(addressVO); // 綁定用戶信息 User user = this.userRepository.getById(order.getUserId()); UserVO userVO = UserVO.apply(user); orderDetailVO.setUser(userVO); // 綁定商品信息 Product product = this.productRepository.getById(order.getProductId()); ProductVO productVO = ProductVO.apply(product); orderDetailVO.setProduct(productVO); return orderDetailVO; } @Override public OrderDetailVO getDetailByOrderId(Long orderId) { // 暫時忽略 Order order = this.orderRepository.getById(orderId); return convertToOrderDetailVO(order); } private OrderDetailVO convertToOrderDetailVO(Order order) { OrderDetailVOCodingV1 orderDetail = new OrderDetailVOCodingV1(OrderVO.apply(order)); // 獲取地址并進行綁定 Address address = this.addressRepository.getById(order.getAddressId()); AddressVO addressVO = AddressVO.apply(address); orderDetail.setAddress(addressVO); // 獲取用戶并進行綁定 User user = this.userRepository.getById(order.getUserId()); UserVO userVO = UserVO.apply(user); orderDetail.setUser(userVO); // 獲取商品并進行綁定 Product product = this.productRepository.getById(order.getProductId()); ProductVO productVO = ProductVO.apply(product); orderDetail.setProduct(productVO); // 獲取支付信息并進行綁定 List<PayInfo> payInfos = this.payInfoRepository.getByOrderId(order.getId()); List<PayInfoVO> payInfoVOList = payInfos.stream() .map(PayInfoVO::apply) .collect(toList()); orderDetail.setPayInfo(payInfoVOList); return orderDetail; }}
如果真的這樣實現,那你離“被跑路”不遠了。
為什么會這么說呢?因為 ==“我的訂單”這個接口存在嚴重的性能問題!==
“我的訂單”接口具體實現如下:
單個用戶請求,數據庫訪問總次數 = 1(獲取用戶訂單)+ N(訂單數量) * 3(需要抓取的關聯數據)
其中,N(訂單數量) * 3(關聯數據數量) 存在性能隱患,存在嚴重的==讀放大效應==。一旦遇到忠實用戶,存在成百上千訂單,除了超時別無辦法。
“訂單詳情”接口實現,目前問題不大,最大的問題為:==“訂單詳情”與“我的訂單”兩個接口存在大量的重復邏輯!==
首先,我們先來解決 “我的訂單”接口的性能問題。從之前的分析可知,性能低下的根本原因在于==“讀放大效應”==,數據庫請求次數與用戶訂單數成正比,為了更好的保障性能,最好將數據庫操作控制在一個常量。
整體思路為:先批量獲取要綁定的數據,然后遍歷每一個訂單,在內存中完成數據綁定。
圖片
實現代碼如下:
@Servicepublic class OrderServiceCodingV2 implements OrderService { @Autowired private OrderRepository orderRepository; @Autowired private AddressRepository addressRepository; @Autowired private ProductRepository productRepository; @Autowired private UserRepository userRepository; @Autowired private PayInfoRepository payInfoRepository; @Override public List<OrderListVO> getByUserId(Long userId) { List<Order> orders = this.orderRepository.getByUserId(userId); List<OrderListVOCodingV2> orderDetailVOS = orders.stream() .map(order -> new OrderListVOCodingV2(OrderVO.apply(order))) .collect(toList()); // 批量獲取用戶,并依次進行綁定 List<Long> userIds = orders.stream() .map(Order::getUserId) .collect(toList()); List<User> users = this.userRepository.getByIds(userIds); Map<Long, User> userMap = users.stream() .collect(toMap(User::getId, Function.identity(), (a, b) -> a)); for (OrderListVOCodingV2 orderDetailVO : orderDetailVOS){ User user = userMap.get(orderDetailVO.getOrder().getUserId()); UserVO userVO = UserVO.apply(user); orderDetailVO.setUser(userVO); } // 批量獲取地址,并依次進行綁定 List<Long> addressIds = orders.stream() .map(Order::getAddressId) .collect(toList()); List<Address> addresses = this.addressRepository.getByIds(addressIds); Map<Long, Address> addressMap = addresses.stream() .collect(toMap(Address::getId, Function.identity(), (a, b) -> a)); for (OrderListVOCodingV2 orderDetailVO : orderDetailVOS){ Address address = addressMap.get(orderDetailVO.getOrder().getAddressId()); AddressVO addressVO = AddressVO.apply(address); orderDetailVO.setAddress(addressVO); } // 批量獲取商品,并依次進行綁定 List<Long> productIds = orders.stream() .map(Order::getProductId) .collect(toList()); List<Product> products = this.productRepository.getByIds(productIds); Map<Long, Product> productMap = products.stream() .collect(toMap(Product::getId, Function.identity(), (a, b) -> a)); for (OrderListVOCodingV2 orderDetailVO : orderDetailVOS){ Product product = productMap.get(orderDetailVO.getOrder().getProductId()); ProductVO productVO = ProductVO.apply(product); orderDetailVO.setProduct(productVO); } return orderDetailVOS.stream() .collect(toList()); } @Override public OrderDetailVO getDetailByOrderId(Long orderId) { // 暫時忽略 Order order = this.orderRepository.getById(orderId); return convertToOrderDetailVO(order); } private OrderDetailVO convertToOrderDetailVO(Order order) { // 暫時忽略 return orderDetail; }}
調整之后,對于“我的訂單”接口,單個用戶請求==數據庫的訪問次數變成了常量(4)==。
如果你是這么實現的,那恭喜你,你已步入==合格程序員行列==。
批量查詢+內存Join 方案能滿足大部分場景,如果要抓取的數據太多,也就是數據庫訪問這個==常量變大==時,性能也會越來越差。
原因很簡單,由于串行執行,==整體耗時 = 獲取訂單耗時 + sum(抓取數據耗時)==
聰明的同學早就躍躍欲試,這個我會:==多線程并行執行唄。==
是的,基于 Future 的實現如下(還有很多版本,比如 CountDownLatch)
整體設計如下:
圖片
示例代碼如下:
@Servicepublic class OrderServiceCodingV3 implements OrderService { private ExecutorService executorService; @Autowired private OrderRepository orderRepository; @Autowired private AddressRepository addressRepository; @Autowired private ProductRepository productRepository; @Autowired private UserRepository userRepository; @Autowired private PayInfoRepository payInfoRepository; @PostConstruct public void init(){ // 初始化線程池(不要使用Executors,這里只是演示,需要對資源進行評估) this.executorService = Executors.newFixedThreadPool(20); } @SneakyThrows @Override public List<OrderListVO> getByUserId(Long userId) { List<Order> orders = this.orderRepository.getByUserId(userId); List<OrderListVOCodingV2> orderDetailVOS = orders.stream() .map(order -> new OrderListVOCodingV2(OrderVO.apply(order))) .collect(toList()); List<Callable<Void>> callables = Lists.newArrayListWithCapacity(3); // 創建異步任務 callables.add(() -> { // 批量獲取用戶,并依次進行綁定 List<Long> userIds = orders.stream() .map(Order::getUserId) .collect(toList()); List<User> users = this.userRepository.getByIds(userIds); Map<Long, User> userMap = users.stream() .collect(toMap(User::getId, Function.identity(), (a, b) -> a)); for (OrderListVOCodingV2 orderDetailVO : orderDetailVOS){ User user = userMap.get(orderDetailVO.getOrder().getUserId()); UserVO userVO = UserVO.apply(user); orderDetailVO.setUser(userVO); } return null; }); // 創建異步任務 callables.add(() ->{ // 批量獲取地址,并依次進行綁定 List<Long> addressIds = orders.stream() .map(Order::getAddressId) .collect(toList()); List<Address> addresses = this.addressRepository.getByIds(addressIds); Map<Long, Address> addressMap = addresses.stream() .collect(toMap(Address::getId, Function.identity(), (a, b) -> a)); for (OrderListVOCodingV2 orderDetailVO : orderDetailVOS){ Address address = addressMap.get(orderDetailVO.getOrder().getAddressId()); AddressVO addressVO = AddressVO.apply(address); orderDetailVO.setAddress(addressVO); } return null; }); // 創建異步任務 callables.add(() -> { // 批量獲取商品,并依次進行綁定 List<Long> productIds = orders.stream() .map(Order::getProductId) .collect(toList()); List<Product> products = this.productRepository.getByIds(productIds); Map<Long, Product> productMap = products.stream() .collect(toMap(Product::getId, Function.identity(), (a, b) -> a)); for (OrderListVOCodingV2 orderDetailVO : orderDetailVOS){ Product product = productMap.get(orderDetailVO.getOrder().getProductId()); ProductVO productVO = ProductVO.apply(product); orderDetailVO.setProduct(productVO); } return null; }); // 執行異步任務 this.executorService.invokeAll(callables); return orderDetailVOS.stream() .collect(toList()); } @Override public OrderDetailVO getDetailByOrderId(Long orderId) { // 暫時忽略 Order order = this.orderRepository.getById(orderId); return convertToOrderDetailVO(order); } private OrderDetailVO convertToOrderDetailVO(Order order) { // 暫時忽略 }}
多線程并發執行,==整體耗時 = 獲取訂單耗時 + max(抓取數據耗時)==
如果你能夠這樣實現的,那恭喜你,你已步入==高級程序員行列==。
然后呢,到此為止了?NO,接下來才是高潮?。?!
讓我們==打開認知==,開啟==“抽象+封裝”==之旅。
仔細研究上述代碼,尋找里面的==“變與不變”==,你會發現:
找到邏輯中的變化點,接下來便是有針對性的進行封裝。
對于 “我的訂單” 和 “訂單詳情” 返回==不同的 VO==,該怎么處理呢?
非常簡單,思路如下:
整體設計如下:
圖片
簡單示例如下:
// 以 UserVO 為例,ProductVO、AddressVO,PayInfoVO 基本一致,不在贅述public interface UserVOFetcherV1 { Long getUserId(); void setUser(UserVO user);}// OrderDetailVO 實現對應的接口,為了突出重點暫時忽略具體實現public class OrderDetailVOFetcherV1 extends OrderDetailVO implements AddressVOFetcherV1, ProductVOFetcherV1, UserVOFetcherV1, PayInfoVOFetcherV1{}// OrderListVO 實現對應接口,為了突出重點暫時忽略具體實現public class OrderListVOFetcherV1 extends OrderListVO implements AddressVOFetcherV1, ProductVOFetcherV1, UserVOFetcherV1 {}
有了統一的操作接口,接下來便是抽取具體的綁定邏輯,以 UserVOFetcherExecutor 為例:
@Componentpublic class UserVOFetcherExecutorV1 { @Autowired private UserRepository userRepository; public void fetch(List<? extends UserVOFetcherV1> fetchers){ List<Long> ids = fetchers.stream() .map(UserVOFetcherV1::getUserId) .distinct() .collect(Collectors.toList()); List<User> users = userRepository.getByIds(ids); Map<Long, User> userMap = users.stream() .collect(toMap(user -> user.getId(), Function.identity())); fetchers.forEach(fetcher -> { Long userId = fetcher.getUserId(); User user = userMap.get(userId); if (user != null){ UserVO userVO = UserVO.apply(user); fetcher.setUser(userVO); } }); }}
實現邏輯沒有變化,最重要的變化在于“入參類型”,不在是具體的 VO,而是抽象的 UserVOFetcher 接口。
AddressVOFetcherExecutor、ProductVOFetcherExecutor、PayInfoVOFetcherExecutor 與 UserVOFetcherExecutorV1 邏輯基本一致,篇幅問題不在贅述。
這樣一個小小的調整,會給使用方帶來什么便利?一起看下使用方的變化:
@Servicepublic class OrderServiceFetcherV1 implements OrderService { @Autowired private OrderRepository orderRepository; @Autowired private AddressVOFetcherExecutorV1 addressVOFetcherExecutorV1; @Autowired private ProductVOFetcherExecutorV1 productVOFetcherExecutorV1; @Autowired private UserVOFetcherExecutorV1 userVOFetcherExecutorV1; @Autowired private PayInfoVOFetcherExecutorV1 payInfoVOFetcherExecutorV1; @Override public List<OrderListVO> getByUserId(Long userId) { List<Order> orders = this.orderRepository.getByUserId(userId); List<OrderListVOFetcherV1> orderDetailVOS = orders.stream() .map(order -> new OrderListVOFetcherV1(OrderVO.apply(order))) .collect(toList()); // 直接使用 FetcherExecutor 完成數據綁定 this.addressVOFetcherExecutorV1.fetch(orderDetailVOS); this.productVOFetcherExecutorV1.fetch(orderDetailVOS); this.userVOFetcherExecutorV1.fetch(orderDetailVOS); return orderDetailVOS.stream() .collect(toList()); } @Override public OrderDetailVO getDetailByOrderId(Long orderId) { Order order = this.orderRepository.getById(orderId); OrderDetailVOFetcherV1 orderDetail = new OrderDetailVOFetcherV1(OrderVO.apply(order)); List<OrderDetailVOFetcherV1> orderDetailVOS = Arrays.asList(orderDetail); // 直接使用 FetcherExecutor 完成數據綁定 this.addressVOFetcherExecutorV1.fetch(orderDetailVOS); this.productVOFetcherExecutorV1.fetch(orderDetailVOS); this.userVOFetcherExecutorV1.fetch(orderDetailVOS); this.payInfoVOFetcherExecutorV1.fetch(orderDetailVOS); return orderDetail; }}
兩個方法直接使用 FetcherExecutor 完成數據抓取和綁定,實現了==綁定邏輯的復用==。
如果再有 VO 需要進行數據綁定,只需:
至此,面對新業務基本上與“綁定邏輯”說再見了。
接下來讓我們一起聚焦于綁定邏輯,先對比下上述的UserVOFetcherExecutor 與下面的 AddressVOFetcherExecutor, 找到里面的變化與不變:
@Componentpublic class AddressVOFetcherExecutorV1 { @Autowired private AddressRepository addressRepository; public void fetch(List<? extends AddressVOFetcherV1> fetchers){ // 獲取關聯信息 List<Long> ids = fetchers.stream() .map(AddressVOFetcherV1::getAddressId) .distinct() .collect(Collectors.toList()); // 查詢關聯數據 List<Address> addresses = addressRepository.getByIds(ids); // 轉為為 Map Map<Long, Address> addressMap = addresses.stream() .collect(toMap(address -> address.getId(), Function.identity())); // 依次進行數據綁定 fetchers.forEach(fetcher -> { Long addressId = fetcher.getAddressId(); Address address = addressMap.get(addressId); if (address != null){ // 轉換為 VO AddressVO addressVO = AddressVO.apply(address); // 將數據寫回到結果 fetcher.setAddress(addressVO); } }); }}
仔細觀察,會發現:
獲取關聯信息
查詢關聯數據
將其轉換為 Map
講數據轉化為 VO
將 VO 綁定到結果對象
熟悉設計模式的伙伴是否眼前一亮?停頓一下好好回想一下,哪種模式就是用來處理這種問題的?
答案便是:模板方法模式
整體思想為:
整體設計如下:
圖片
抽取公共父類如下:
abstract class BaseItemFetcherExecutor<FETCHER extends ItemFetcher, DATA, RESULT> implements ItemFetcherExecutor<FETCHER>{ @Override public void fetch(List<FETCHER> fetchers) { // 獲取關聯信息 List<Long> ids = fetchers.stream() .map(this::getFetchId) .distinct() .collect(Collectors.toList()); // 查詢關聯數據 List<DATA> datas = loadData(ids); // 轉為為 Map Map<Long, List<DATA>> dataMap = datas.stream() .collect(groupingBy(this::getDataId)); // 依次進行數據綁定 fetchers.forEach(fetcher -> { Long id = getFetchId(fetcher); List<DATA> ds = dataMap.get(id); if (ds != null){ // 轉換為 VO List<RESULT> result = ds.stream() .map( data -> convertToVo(data)) .collect(Collectors.toList()); // 將數據寫回到結果 setResult(fetcher, result); } }); } protected abstract Long getFetchId(FETCHER fetcher); protected abstract List<DATA> loadData(List<Long> ids); protected abstract Long getDataId(DATA data); protected abstract RESULT convertToVo(DATA data); protected abstract void setResult(FETCHER fetcher, List<RESULT> result);}
基于 BaseItemFetcherExecutor 的 UserFetcherExecutor 如下:
@Componentpublic class UserVOFetcherExecutorV2 extends BaseItemFetcherExecutor<UserVOFetcherV2, User, UserVO>{ @Autowired private UserRepository userRepository; @Override protected Long getFetchId(UserVOFetcherV2 fetcher) { return fetcher.getUserId(); } @Override protected List<User> loadData(List<Long> ids) { return this.userRepository.getByIds(ids); } @Override protected Long getDataId(User user) { return user.getId(); } @Override protected UserVO convertToVo(User user) { return UserVO.apply(user); } @Override protected void setResult(UserVOFetcherV2 fetcher, List<UserVO> userVO) { if (CollectionUtils.isNotEmpty(userVO)) { fetcher.setUser(userVO.get(0)); } } @Override public boolean support(Class<UserVOFetcherV2> cls) { // 暫時忽略,稍后會細講 return UserVOFetcherV2.class.isAssignableFrom(cls); }}
UserVOFetcherExecutor究竟發生什么變化呢?好像變得更復雜了:
那我們究竟得到了什么好處?可以花幾分鐘好好思考一下?。。?span style="display:none">62X28資訊網——每日最新資訊28at.com
在說結果之前,讓我們看下另一個變化點。回想下 FetcherExecutor 的執行點,如下:
@Override public List<OrderListVO> getByUserId(Long userId) { List<Order> orders = this.orderRepository.getByUserId(userId); List<OrderListVOFetcherV1> orderDetailVOS = orders.stream() .map(order -> new OrderListVOFetcherV1(OrderVO.apply(order))) .collect(toList()); // 手工調用,OrderListVO 實現新接口,需要增加新的依賴和調用 this.addressVOFetcherExecutorV1.fetch(orderDetailVOS); this.productVOFetcherExecutorV1.fetch(orderDetailVOS); this.userVOFetcherExecutorV1.fetch(orderDetailVOS); return orderDetailVOS.stream() .collect(toList()); } @Override public OrderDetailVO getDetailByOrderId(Long orderId) { Order order = this.orderRepository.getById(orderId); OrderDetailVOFetcherV1 orderDetail = new OrderDetailVOFetcherV1(OrderVO.apply(order)); List<OrderDetailVOFetcherV1> orderDetailVOS = Arrays.asList(orderDetail); // 手工調用,OrderDetailVO 實現新接口,需要增加新的依賴和調用 this.addressVOFetcherExecutorV1.fetch(orderDetailVOS); this.productVOFetcherExecutorV1.fetch(orderDetailVOS); this.userVOFetcherExecutorV1.fetch(orderDetailVOS); this.payInfoVOFetcherExecutorV1.fetch(orderDetailVOS); return orderDetail; }
其實,需要調用哪些 FetcherExecutor 完全可以由 VO 實現的接口來確定。也就是說,==需要綁定新數據,只需 VO 繼承并實現新的 Fetcher 接口即可。==
對此,我們需要:
哪個設計模式是用來解決這個問題?花幾分鐘好好思考一下!
答案是:==責任鏈模型==
標準的責任鏈模式用起來比較繁瑣,在 Spring 實現中大量使用他的一種變現,及提供一個驗證接口,由組件自身完成判斷,用于決定是否執行自身邏輯。
整體設計如下:
圖片
首先,為了統一 FetcherExecutor 的行為,抽取通用接口:
public interface ItemFetcherExecutor<F extends ItemFetcher> { /** * 該組件是否能處理 cls 類型 * @param cls * @return */ boolean support(Class<F> cls); /** * 執行真正的數據綁定 * @param fetchers */ void fetch(List<F> fetchers);}
具體的實現,可以見 UserVOFetcherExecutorV2 的 support 方法:
@Overridepublic boolean support(Class<UserVOFetcherV2> cls) { return UserVOFetcherV2.class.isAssignableFrom(cls);}
實現邏輯非常簡單,只是判斷 cls 是否實現了 UserVOFetcherV2 接口。
有了 FetcherExecutor 組件后,接下來就是為其提供統一的訪問入口:
@Servicepublic class FetcherService { @Autowired private List<ItemFetcherExecutor> itemFetcherExecutors; public <F extends ItemFetcher> void fetch(Class<F> cls, List<F> fetchers){ if (CollectionUtils.isNotEmpty(fetchers)){ this.itemFetcherExecutors.stream() // 是否能處理該類型 .filter(itemFetcherExecutor -> itemFetcherExecutor.support(cls)) // 執行真正的綁定 .forEach(itemFetcherExecutor -> itemFetcherExecutor.fetch(fetchers)); } }}
邏輯即為簡單,依次遍歷 FetcherExecutor,根據 support 執行結果,執行 fetch 邏輯。
【小常識】Spring 可以將容器中的全部實現直接注入到 List<Bean>。在上述代碼中,將會把所有的 ItemFetcherExecutor 實現注入到 itemFetcherExecutors 屬性。因此,在新增 FetcherExecutor 時,只需將其聲明為 Spring Bean,無需調整代碼邏輯。
OK,我們有了 FetcherService 提供統一的數據綁定能力,原來 OrderServiceFetcher 中 fetch 操作的變化點轉移到 FetcherService,自身變得非常穩定。具體如下:
@Servicepublic class OrderServiceFetcherV2 implements OrderService { @Autowired private OrderRepository orderRepository; @Autowired private FetcherService fetcherService; @Override public List<OrderListVO> getByUserId(Long userId) { List<Order> orders = this.orderRepository.getByUserId(userId); List<OrderListVOFetcherV2> orderDetailVOS = orders.stream() .map(order -> new OrderListVOFetcherV2(OrderVO.apply(order))) .collect(toList()); // VO 數據綁定發生變化,只需調整 VO 實現接口,此處無需變化 fetcherService.fetch(OrderListVOFetcherV2.class, orderDetailVOS); return orderDetailVOS.stream() .collect(toList()); } @Override public OrderDetailVO getDetailByOrderId(Long orderId) { Order order = this.orderRepository.getById(orderId); OrderDetailVOFetcherV2 orderDetail = new OrderDetailVOFetcherV2(OrderVO.apply(order)); // VO 數據綁定發生變化,只需調整 VO 實現接口,此處無需變化 fetcherService.fetch(OrderDetailVOFetcherV2.class, Arrays.asList(orderDetail)); return orderDetail; }}
終于,我們將變化收斂到 VO 內,VO 需要綁定新的數據,只需實現對應接口即可。
經過重構,代碼結構變得非常清晰,如果想通過多線程并發方式提供性能,需要調整哪些組件呢?好好想想?。?!
只需對FetcherService進行調整,讓我們來一個并發版本,具體如下:
@Servicepublic class ConcurrentFetcherService { private ExecutorService executorService; @Autowired private List<ItemFetcherExecutor> itemFetcherExecutors; @PostConstruct public void init(){ this.executorService = Executors.newFixedThreadPool(20); } @SneakyThrows public <F extends ItemFetcher> void fetch(Class<F> cls, List<F> fetchers){ if (CollectionUtils.isNotEmpty(fetchers)){ // 創建異步執行任務 List<Callable<Void>> callables = this.itemFetcherExecutors.stream() .filter(itemFetcherExecutor -> itemFetcherExecutor.support(cls)) .map(itemFetcherExecutor -> (Callable<Void>) () -> { itemFetcherExecutor.fetch(fetchers); return null; }).collect(Collectors.toList()); // 線程池中并行執行 this.executorService.invokeAll(callables); } }}
OrderServiceFetcherV3 只需使用 ConcurrentFetcherService 替代 原來的 FetcherService 并擁有了并發能力。
縱觀整個 Fetcher 封裝,雖然結構清晰,但細節過于繁瑣,特別是:
這些不便將成為落地最大的阻礙,那有沒有辦法進行進一步簡化?
這需要思考下這些設計背后的深層需求:
提供綁定信息
設置綁定結果
被 FetcherExecutor 識別并進行處理
所有這些需求是否可用 ==注解== 的方式實現?
根據上述分析,注解可完成全部任務,新建注解如下:
@Target({ElementType.FIELD, ElementType.TYPE})@Retention(RetentionPolicy.RUNTIME)public @interface JoinInMemory { /** * 從 sourceData 中提取 key * @return */ String keyFromSourceData(); /** * 從 joinData 中提取 key * @return */ String keyFromJoinData(); /** * 批量數據抓取 * @return */ String loader(); /** * 結果轉換器 * @return */ String joinDataConverter() default ""; /** * 運行級別,同一級別的 join 可 并行執行 * @return */ int runLevel() default 10;}
乍一看,需要配置的信息真多,其實大多數配置全部與 FetcherExecutor 實現相關。
abstract class AbstractJoinItemExecutor<SOURCE_DATA, JOIN_KEY, JOIN_DATA, JOIN_RESULT> implements JoinItemExecutor<SOURCE_DATA> { /** * 從原始數據中生成 JoinKey * @param data * @return */ protected abstract JOIN_KEY createJoinKeyFromSourceData(SOURCE_DATA data); /** * 根據 JoinKey 批量獲取 JoinData * @param joinKeys * @return */ protected abstract List<JOIN_DATA> getJoinDataByJoinKeys(List<JOIN_KEY> joinKeys); /** * 從 JoinData 中獲取 JoinKey * @param joinData * @return */ protected abstract JOIN_KEY createJoinKeyFromJoinData(JOIN_DATA joinData); /** * 將 JoinData 轉換為 JoinResult * @param joinData * @return */ protected abstract JOIN_RESULT convertToResult(JOIN_DATA joinData); /** * 將 JoinResult 寫回至 SourceData * @param data * @param JoinResults */ protected abstract void onFound(SOURCE_DATA data, List<JOIN_RESULT> JoinResults); /** * 未找到對應的 JoinData * @param data * @param joinKey */ protected abstract void onNotFound(SOURCE_DATA data, JOIN_KEY joinKey); @Override public void execute(List<SOURCE_DATA> sourceDatas) { // 從源數據中提取 JoinKey List<JOIN_KEY> joinKeys = sourceDatas.stream() .filter(Objects::nonNull) .map(this::createJoinKeyFromSourceData) .filter(Objects::nonNull) .distinct() .collect(toList()); log.debug("get join key {} from source data {}", joinKeys, sourceDatas); // 根據 JoinKey 獲取 JoinData List<JOIN_DATA> allJoinDatas = getJoinDataByJoinKeys(joinKeys); log.debug("get join data {} by join key {}", allJoinDatas, joinKeys); // 將 JoinData 以 Map 形式進行組織 Map<JOIN_KEY, List<JOIN_DATA>> joinDataMap = allJoinDatas.stream() .filter(Objects::nonNull) .collect(groupingBy(this::createJoinKeyFromJoinData)); log.debug("group by join key, result is {}", joinDataMap); // 處理每一條 SourceData for (SOURCE_DATA data : sourceDatas){ // 從 SourceData 中 獲取 JoinKey JOIN_KEY joinKey = createJoinKeyFromSourceData(data); if (joinKey == null){ log.warn("join key from join data {} is null", data); continue; } // 根據 JoinKey 獲取 JoinData List<JOIN_DATA> joinDatasByKey = joinDataMap.get(joinKey); if (CollectionUtils.isNotEmpty(joinDatasByKey)){ // 獲取到 JoinData, 轉換為 JoinResult,進行數據寫回 List<JOIN_RESULT> joinResults = joinDatasByKey.stream() .filter(Objects::nonNull) .map(joinData -> convertToResult(joinData)) .collect(toList()); log.debug("success to convert join data {} to join result {}", joinDatasByKey, joinResults); onFound(data, joinResults); log.debug("success to write join result {} to source data {}", joinResults, data); }else { log.warn("join data lost by join key {} for source data {}", joinKey, data); // 為獲取到 JoinData,進行 notFound 回調 onNotFound(data, joinKey); } } }}
JoinInMemory 注解屬性和AbstractJoinItemExecutor基本一致,在此就不做贅述,我們先看下具體的使用方式:
@Datapublic class OrderDetailVOAnnV1 extends OrderDetailVO { private final OrderVO order; @JoinInMemory(keyFromSourceData = "#{order.userId}", keyFromJoinData = "#{id}", loader = "#{@userRepository.getByIds(#root)}", joinDataConverter = "#{T(com.geekhalo.lego.joininmemory.order.UserVO).apply(#root)}" ) private UserVO user; // 其他暫時忽略}@Datapublic class OrderListVOAnnV1 extends OrderListVO { private final OrderVO order; @JoinInMemory(keyFromSourceData = "#{order.userId}", keyFromJoinData = "#{id}", loader = "#{@userRepository.getByIds(#root)}", joinDataConverter = "#{T(com.geekhalo.lego.joininmemory.order.UserVO).apply(#root)}" ) private UserVO user; // 其他暫時忽略}
我們以 UserVO user 屬性為例
屬性 | 含義 |
keyFromSourceData = "#{order.userId}" | 以屬性 order 中的 userId 作為 JoinKey |
keyFromJoinData = "#{id}" | 以 user 的 id 作為 JoinKey |
loader = "#{@userRepository.getByIds(#root)}" | 將 userRepository bean 的 getByIds 方法作為加載器,其中 #root 為 joinKey 集合(user id 集合) |
joinDataConverter = "#{T(com.geekhalo.lego.joininmemory.order.UserVO).apply(#root)}" | 將 com.geekhalo.lego.joininmemory.order.UserVO 靜態方法 apply 作為轉換器,#root 指的是 User 對象 |
@JoinInMemory 注解中大量使用 SpEL,不熟悉的伙伴可以自行網上進行檢索。
其他部分不變,定義 OrderService 如下:
@Servicepublic class OrderServiceAnnV1 implements OrderService { @Autowired private OrderRepository orderRepository; @Autowired private JoinService joinService; @Override public List<OrderListVO> getByUserId(Long userId) { List<Order> orders = this.orderRepository.getByUserId(userId); List<OrderListVOAnnV1> orderDetailVOS = orders.stream() .map(order -> new OrderListVOAnnV1(OrderVO.apply(order))) .collect(toList()); this.joinService.joinInMemory(OrderListVOAnnV1.class, orderDetailVOS); return orderDetailVOS.stream() .collect(toList()); } @Override public OrderDetailVO getDetailByOrderId(Long orderId) { Order order = this.orderRepository.getById(orderId); OrderDetailVOAnnV1 orderDetail = new OrderDetailVOAnnV1(OrderVO.apply(order)); this.joinService.joinInMemory(OrderDetailVOAnnV1.class, Arrays.asList(orderDetail)); return orderDetail; }}
相對于 Fetcher 抽象,我們將 Fetcher、FetcherExecutor 全部配置化,并通過 注解的方式進行呈現,相對于 Coding 方案,注解方案更加靈活,工作量也更小。
相對于 Fetcher 封裝,一個 @JoinInMemory 成功干掉了兩個組件,但觀其自身配置起來還是非常繁瑣。比如,在訂單查詢這個場景,在 OrderListVO 和 OrderDetailVO 中都需要對 UserVO 進行數據綁定,觀察兩個注解,我們會發現很多重復配置:
//OrderListVO@JoinInMemory(keyFromSourceData = "#{order.userId}", keyFromJoinData = "#{id}", loader = "#{@userRepository.getByIds(#root)}", joinDataConverter = "#{T(com.geekhalo.lego.joininmemory.order.UserVO).apply(#root)}" )private UserVO user;// OrderDetailVO@JoinInMemory(keyFromSourceData = "#{order.userId}", keyFromJoinData = "#{id}", loader = "#{@userRepository.getByIds(#root)}", joinDataConverter = "#{T(com.geekhalo.lego.joininmemory.order.UserVO).apply(#root)}" )private UserVO user;
兩個配置完全一樣,細品之后會發現:
OrderListVO 指的是 OrderListVO 屬性 order 的id值
OrderDetailVO 指的是 OrderDetailVO 屬性 order 的值
對于不變部分如何進行統一管理?
自定義注解 結合 Spring @AliasFor 便可以解決這個問題,以 UserVO 為例:
@Target(ElementType.FIELD)@Retention(RetentionPolicy.RUNTIME)// 管理通用屬性@JoinInMemory(keyFromSourceData = "", keyFromJoinData = "#{id}", loader = "#{@userRepository.getByIds(#root)}", joinDataConverter = "#{T(com.geekhalo.lego.joininmemory.order.UserVO).apply(#root)}")public @interface JoinUserVOOnId { // 使用別名將 keyFromSourceData 的配置暴露出來 @AliasFor( annotation = JoinInMemory.class ) String keyFromSourceData();}
新注解有如下幾個特點:
有了自定義注解,使用變的非常方便:
@Datapublic class OrderListVOAnnV2 extends OrderListVO { private final OrderVO order; // 只需配置參數即可,其他配置由 JoinUserVOOnId 進行管理 @JoinUserVOOnId(keyFromSourceData = "#{order.userId}") private UserVO user;}@Datapublic class OrderDetailVOAnnV2 extends OrderDetailVO { private final OrderVO order; // 只需配置參數即可,其他配置由 JoinUserVOOnId 進行管理 @JoinUserVOOnId(keyFromSourceData = "#{order.userId}") private UserVO user;}
其他使用方式不變,但實現了邏輯簡化:
如果擔心性能,可以一鍵開啟并發綁定,示例如下:
@Data@JoinInMemoryConfig(executorType = JoinInMemeoryExecutorType.PARALLEL)public class OrderListVOAnnV3 extends OrderListVO { private final OrderVO order; @JoinUserVOOnId(keyFromSourceData = "#{order.userId}") private UserVO user; @JoinAddressVOOnId(keyFromSourceData = "#{order.addressId}") private AddressVO address; @JoinProductVOOnId(keyFromSourceData = "#{order.productId}") private ProductVO product;}
JoinInMemoryConfig 配置如下:
屬性 | 含義 |
executorType | PARALLEL 并行執行;SERIAL 串行執行 |
executorName | 執行器名稱,并行執行所使用的線程池名稱,默認為 defaultExecutor |
@JoinInMemory 注解上配置的信息太多,如果直接在業務代碼中使用,非常難以維護,當每個配置發生變化后,很難一次性修改到位。所以,建議只將他作為“原注解”使用。
整體思路詳見:
圖片
對于不同的數據綁定需求,建議使用不同的線程池,從資源層面對不同功能進行隔離,從而將由于依賴接口發生阻塞導致線程耗盡所造成的影響控制在最小范圍。
@JoinInMemoryConfig 的 executorName 屬性配置的便是執行器名稱,不配置直接使用 “defaultExecutor”,具體代碼如下:
@Beanpublic ExecutorService defaultExecutor(){ BasicThreadFactory basicThreadFactory = new BasicThreadFactory.Builder() .namingPattern("JoinInMemory-Thread-%d") .daemon(true) .build(); int maxSize = Runtime.getRuntime().availableProcessors() * 3; return new ThreadPoolExecutor(0, maxSize, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), basicThreadFactory, new ThreadPoolExecutor.CallerRunsPolicy());}
如需使用自定義線程池需:
推導邏輯有點長不知道你get到多少,先簡單回顧一下:
不知道你有什么感觸,接下來,有什么計劃?
本文鏈接:http://www.www897cc.com/showinfo-26-34640-0.htmlDDD死黨:內存Join--將復用和擴展用到極致
聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。郵件:2376512515@qq.com
下一篇: Python VTK 初探數據源