將相似或重復(fù)請求在上游系統(tǒng)中合并后發(fā)往下游系統(tǒng),可以大大降低下游系統(tǒng)的負(fù)載,提升系統(tǒng)整體吞吐率。文章介紹了 hystrix collapser、ConcurrentHashMultiset、自實(shí)現(xiàn)BatchCollapser 三種請求合并技術(shù),并通過其具體實(shí)現(xiàn)對比各自適用的場景。
工作中,我們常見的請求模型都是”請求-應(yīng)答”式,即一次請求中,服務(wù)給請求分配一個(gè)獨(dú)立的線程,一塊獨(dú)立的內(nèi)存空間,所有的操作都是獨(dú)立的,包括資源和系統(tǒng)運(yùn)算。我們也知道,在請求中處理一次系統(tǒng) I/O 的消耗是非常大的,如果有非常多的請求都進(jìn)行同一類 I/O 操作,那么是否可以將這些 I/O 操作都合并到一起,進(jìn)行一次 I/O 操作,是否可以大大降低下游資源服務(wù)器的負(fù)擔(dān)呢?
最近我工作之余的大部分時(shí)間都花在這個(gè)問題的探究上了,對比了幾個(gè)現(xiàn)有類庫,為了解決一個(gè)小問題把 hystrix javanica 的代碼翻了一遍,也根據(jù)自己工作中遇到的業(yè)務(wù)需求實(shí)現(xiàn)了一個(gè)簡單的合并類,收獲還是挺大的。可能這個(gè)需求有點(diǎn)”偏門”,在網(wǎng)上搜索結(jié)果并不多,也沒有綜合一點(diǎn)的資料,索性自己總結(jié)分享一下,希望能幫到后來遇到這種問題的小伙伴。
開源的請求合并類庫(知名的)好像也只有 Netflix 公司開源的 Hystrix 了, hystrix 專注于保持 WEB 服務(wù)器在高并發(fā)環(huán)境下的系統(tǒng)穩(wěn)定,我們常用它的熔斷器(Circuit Breaker) 來實(shí)現(xiàn)服務(wù)的服務(wù)隔離和災(zāi)時(shí)降級,有了它,可以使整個(gè)系統(tǒng)不至于被某一個(gè)接口的高并發(fā)洪流沖塌,即使接口掛了也可以將服務(wù)降級,返回一個(gè)人性化的響應(yīng)。請求合并作為一個(gè)保障下游服務(wù)穩(wěn)定的利器,在 hystrix 內(nèi)實(shí)現(xiàn)也并不意外。
我們在使用 hystrix 時(shí),常用它的 javanica 模塊,以注解的方式編寫 hystrix 代碼,使代碼更簡潔而且對業(yè)務(wù)代碼侵入更低。所以在項(xiàng)目中我們一般至少需要引用 hystrix-core 和 hystrix-javanica 兩個(gè)包。
另外,hystrix 的實(shí)現(xiàn)都是通過 AOP,我們要還要在項(xiàng)目 xml 里顯式配置 HystrixAspect 的 bean 來啟用它。
<aop:aspectj-autoproxy/> <bean id="hystrixAspect" class="com.netflix.hystrix.contrib.javanica.aop.aspectj.HystrixCommandAspect" />
hystrix collapser 是 hystrix 內(nèi)的請求合并器,它有自定義 BatchMethod 和 注解兩種實(shí)現(xiàn)方式,自定義 BatchMethod 網(wǎng)上有各種教程,實(shí)現(xiàn)起來很復(fù)雜,需要手寫大量代碼,而注解方式只需要添加兩行注解即可,但配置方式我在官方文檔上也沒找見,中文方面本文應(yīng)該是獨(dú)一份兒了。
其實(shí)現(xiàn)需要注意的是:
下面是一個(gè)簡單的示例:
public class HystrixCollapserSample { @HystrixCollapser(batchMethod = "batch") public Future<Boolean> single(String input) { return null; // single方法不會(huì)被執(zhí)行到 } public List<Boolean> batch(List<String> inputs) { return inputs.stream().map(it -> Boolean.TRUE).collect(Collectors.toList()); } }
為了解決 hystrix collapser 的配置問題看了下 hystrix javanica 的源碼,這里簡單總結(jié)一下 hystrix 請求合并器的具體實(shí)現(xiàn),源碼的詳細(xì)解析在我的筆記:Hystrix collasper 源碼解析。
需要注意,由于需要等待 timer 執(zhí)行真正的請求操作,collapser 會(huì)導(dǎo)致所有的請求的 cost 都會(huì)增加約 timerInterval/2 ms;
hystrix collapser 的配置需要在 @HystrixCollapser 注解上使用,主要包括兩個(gè)部分,專有配置和 hystrixCommand 通用配置;
專有配置包括:
通用配置包括:
一個(gè)完整的配置如下:
@HystrixCollapser( batchMethod = "batch", collapserKey = "single", scope = com.netflix.hystrix.HystrixCollapser.Scope.GLOBAL, collapserProperties = { @HystrixProperty(name = "maxRequestsInBatch", value = "100"), @HystrixProperty(name = "timerDelayInMilliseconds", value = "1000"), @HystrixProperty(name = "requestCache.enabled", value = "true") })
由于業(yè)務(wù)需求,我們并不太關(guān)心被合并請求的返回值,而且覺得 hystrix 保持那么多的 Future 并沒有必要,于是自己實(shí)現(xiàn)了一個(gè)簡單的請求合并器,業(yè)務(wù)線程簡單地將請求放到一個(gè)容器里,請求數(shù)累積到一定量或延遲了一定的時(shí)間,就取出容器內(nèi)的數(shù)據(jù)統(tǒng)一發(fā)送給下游系統(tǒng)。
設(shè)計(jì)思想跟 hystrix 類似,合并器有一個(gè)字段作為存儲(chǔ)請求的容器,且設(shè)置一個(gè) timer 線程定時(shí)消費(fèi)容器內(nèi)的請求,業(yè)務(wù)線程將請求參數(shù)提交到合并 器的容器內(nèi)。不同之處在于,業(yè)務(wù)線程將請求提交給容器后立即同步返回成功,不必管請求的消費(fèi)結(jié)果,這樣便實(shí)現(xiàn)了時(shí)間維度上的合并觸發(fā)。
另外,我還添加了另外一個(gè)維度的觸發(fā)條件,每次將請求參數(shù)添加到容器后都會(huì)檢驗(yàn)一下容器內(nèi)請求的數(shù)量,如果數(shù)量達(dá)到一定的閾值,將在業(yè)務(wù)線程內(nèi)合并執(zhí)行一次。
由于有兩個(gè)維度會(huì)觸發(fā)合并,就不可避免會(huì)遇到線程安全問題。為了保證容器內(nèi)的請求不會(huì)被多個(gè)線程重復(fù)消費(fèi)或都漏掉,我需要一個(gè)容器能滿足以下條件:
java.util.concurrent 包內(nèi)的 LinkedBlockingDeque 剛好符合要求,首先它實(shí)現(xiàn)了 BlockingDeque 接口,多線程環(huán)境下的存取操作是安全的;此外,它還提供 drainTo(Collection<? super E> c, int maxElements)方法,可以將容器內(nèi) maxElements 個(gè)元素安全地取出來,放到 Collection c 中。
以下是具體的代碼實(shí)現(xiàn):
public class BatchCollapser<E> implements InitializingBean { private static final Logger logger = LoggerFactory.getLogger(BatchCollapser.class); private static volatile Map<Class, BatchCollapser> instance = Maps.newConcurrentMap(); private static final ScheduledExecutorService SCHEDULE_EXECUTOR = Executors.newScheduledThreadPool(1); private volatile LinkedBlockingDeque<E> batchContainer = new LinkedBlockingDeque<>(); private Handler<List<E>, Boolean> cleaner; private long interval; private int threshHold; private BatchCollapser(Handler<List<E>, Boolean> cleaner, int threshHold, long interval) { this.cleaner = cleaner; this.threshHold = threshHold; this.interval = interval; } @Override public void afterPropertiesSet() throws Exception { SCHEDULE_EXECUTOR.scheduleAtFixedRate(() -> { try { this.clean(); } catch (Exception e) { logger.error("clean container exception", e); } }, 0, interval, TimeUnit.MILLISECONDS); } public void submit(E event) { batchContainer.add(event); if (batchContainer.size() >= threshHold) { clean(); } } private void clean() { List<E> transferList = Lists.newArrayListWithExpectedSize(threshHold); batchContainer.drainTo(transferList, 100); if (CollectionUtils.isEmpty(transferList)) { return; } try { cleaner.handle(transferList); } catch (Exception e) { logger.error("batch execute error, transferList:{}", transferList, e); } } public static <E> BatchCollapser getInstance(Handler<List<E>, Boolean> cleaner, int threshHold, long interval) { Class jobClass = cleaner.getClass(); if (instance.get(jobClass) == null) { synchronized (BatchCollapser.class) { if (instance.get(jobClass) == null) { instance.put(jobClass, new BatchCollapser<>(cleaner, threshHold, interval)); } } } return instance.get(jobClass); } }
以下代碼內(nèi)需要注意的點(diǎn):
上面介紹的請求合并都是將多個(gè)請求一次發(fā)送,下游服務(wù)器處理時(shí)本質(zhì)上還是多個(gè)請求,最好的請求合并是在內(nèi)存中進(jìn)行,將請求結(jié)果簡單合并成一個(gè)發(fā)送給下游服務(wù)器。如我們經(jīng)常會(huì)遇到的需求:元素分值累加或數(shù)據(jù)統(tǒng)計(jì),就可以先在內(nèi)存中將某一項(xiàng)的分值或數(shù)據(jù)累加起來,定時(shí)請求數(shù)據(jù)庫保存。
Guava 內(nèi)就提供了這么一種數(shù)據(jù)結(jié)構(gòu):ConcurrentHashMultiset,它不同于普通的 set 結(jié)構(gòu)存儲(chǔ)相同元素時(shí)直接覆蓋原有元素,而是給每個(gè)元素保持一個(gè)計(jì)數(shù) count, 插入重復(fù)時(shí)元素的 count 值加1。而且它在添加和刪除時(shí)并不加鎖也能保證線程安全,具體實(shí)現(xiàn)是通過一個(gè) while(true) 循環(huán)嘗試操作,直到操作夠所需要的數(shù)量。
ConcurrentHashMultiset 這種排重計(jì)數(shù)的特性,非常適合數(shù)據(jù)統(tǒng)計(jì)這種元素在短時(shí)間內(nèi)重復(fù)率很高的場景,經(jīng)過排重后的數(shù)量計(jì)算,可以大大降低下游服務(wù)器的壓力,即使重復(fù)率不高,能用少量的內(nèi)存空間換取系統(tǒng)可用性的提高,也是很劃算的。
使用 ConcurrentHashMultiset 進(jìn)行請求合并與使用普通容器在整體結(jié)構(gòu)上并無太大差異,具體類似于:
if (ConcurrentHashMultiset.isEmpty()) { return; } List<Request> transferList = Lists.newArrayList(); ConcurrentHashMultiset.elementSet().forEach(request -> { int count = ConcurrentHashMultiset.count(request); if (count <= 0) { return; } transferList.add(count == 1 ? request : new Request(request.getIncrement() * count)); ConcurrentHashMultiset.remove(request, count); });
最后總結(jié)一下各個(gè)技術(shù)適用的場景:
另外,如果選擇自己來實(shí)現(xiàn)的話,完全可以將 BatchCollapser 和 ConcurrentHashMultiset 結(jié)合一下,在BatchCollapser 里使用 ConcurrentHashMultiset 作為容器,這樣就可以結(jié)合兩者的優(yōu)勢了。
本文鏈接:http://www.www897cc.com/showinfo-26-89407-0.html請求合并的三種技巧,性能起飛!
聲明:本網(wǎng)頁內(nèi)容旨在傳播知識,若有侵權(quán)等問題請及時(shí)與本網(wǎng)聯(lián)系,我們將在第一時(shí)間刪除處理。郵件:2376512515@qq.com