我們有個(gè)業(yè)務(wù),會(huì)調(diào)用其他部門(mén)提供的一個(gè)基于http的服務(wù),日調(diào)用量在千萬(wàn)級(jí)別。使用了httpclient來(lái)完成業(yè)務(wù)。之前因?yàn)閝ps上不去,就看了一下業(yè)務(wù)代碼,并做了一些優(yōu)化,記錄在這里。
先對(duì)比前后:優(yōu)化之前,平均執(zhí)行時(shí)間是250ms;
優(yōu)化之后,平均執(zhí)行時(shí)間是80ms,降低了三分之二的消耗,容器不再動(dòng)不動(dòng)就報(bào)警線程耗盡了,清爽~
項(xiàng)目的原實(shí)現(xiàn)比較粗略,就是每次請(qǐng)求時(shí)初始化一個(gè)httpclient,生成一個(gè)httpPost對(duì)象,執(zhí)行,然后從返回結(jié)果取出entity,保存成一個(gè)字符串,最后顯式關(guān)閉response和client。
我們一點(diǎn)點(diǎn)分析和優(yōu)化:
httpclient是一個(gè)線程安全的類,沒(méi)有必要由每個(gè)線程在每次使用時(shí)創(chuàng)建,全局保留一個(gè)即可。
tcp的三次握手與四次揮手兩大裹腳布過(guò)程,對(duì)于高頻次的請(qǐng)求來(lái)說(shuō),消耗實(shí)在太大。試想如果每次請(qǐng)求我們需要花費(fèi)5ms用于協(xié)商過(guò)程,那么對(duì)于qps為100的單系統(tǒng),1秒鐘我們就要花500ms用于握手和揮手。又不是高級(jí)領(lǐng)導(dǎo),我們程序員就不要搞這么大做派了,改成keep alive方式以實(shí)現(xiàn)連接復(fù)用!
原本的邏輯里,使用了如下代碼:
HttpEntity entity = httpResponse.getEntity();String response = EntityUtils.toString(entity);
這里我們相當(dāng)于額外復(fù)制了一份content到一個(gè)字符串里,而原本的httpResponse仍然保留了一份content,需要被consume掉,在高并發(fā)且content非常大的情況下,會(huì)消耗大量?jī)?nèi)存。關(guān)注工眾號(hào):碼猿技術(shù)專欄,回復(fù)關(guān)鍵詞:1111 獲取阿里內(nèi)部Java性能調(diào)優(yōu)手冊(cè)!并且,我們需要顯式的關(guān)閉連接,ugly。
按上面的分析,我們主要要做三件事:一是單例的client,二是緩存的?;钸B接,三是更好的處理返回結(jié)果。一就不說(shuō)了,來(lái)說(shuō)說(shuō)二。
提到連接緩存,很容易聯(lián)想到數(shù)據(jù)庫(kù)連接池。httpclient4提供了一個(gè)PoolingHttpClientConnectionManager 作為連接池。接下來(lái)我們通過(guò)以下步驟來(lái)優(yōu)化:
關(guān)于keep-alive,本文不展開(kāi)說(shuō)明,只提一點(diǎn),是否使用keep-alive要根據(jù)業(yè)務(wù)情況來(lái)定,它并不是靈丹妙藥。還有一點(diǎn),keep-alive和time_wait/close_wait之間也有不少故事。
在本業(yè)務(wù)場(chǎng)景里,我們相當(dāng)于有少數(shù)固定客戶端,長(zhǎng)時(shí)間極高頻次的訪問(wèn)服務(wù)器,啟用keep-alive非常合適
再多提一嘴,http的keep-alive 和tcp的KEEPALIVE不是一個(gè)東西。回到正文,定義一個(gè)strategy如下:
ConnectionKeepAliveStrategy myStrategy = new ConnectionKeepAliveStrategy() { @Override public long getKeepAliveDuration(HttpResponse response, HttpContext context) { HeaderElementIterator it = new BasicHeaderElementIterator (response.headerIterator(HTTP.CONN_KEEP_ALIVE)); while (it.hasNext()) { HeaderElement he = it.nextElement(); String param = he.getName(); String value = he.getValue(); if (value != null && param.equalsIgnoreCase ("timeout")) { return Long.parseLong(value) * 1000; } } return 60 * 1000;//如果沒(méi)有約定,則默認(rèn)定義時(shí)長(zhǎng)為60s }};
PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager();connectionManager.setMaxTotal(500);connectionManager.setDefaultMaxPerRoute(50);//例如默認(rèn)每路由最高50并發(fā),具體依據(jù)業(yè)務(wù)來(lái)定
也可以針對(duì)每個(gè)路由設(shè)置并發(fā)數(shù)。
httpClient = HttpClients.custom() .setConnectionManager(connectionManager) .setKeepAliveStrategy(kaStrategy) .setDefaultRequestConfig(RequestConfig.custom().setStaleConnectionCheckEnabled(true).build()) .build();
注意:使用setStaleConnectionCheckEnabled方法來(lái)逐出已被關(guān)閉的鏈接不被推薦。更好的方式是手動(dòng)啟用一個(gè)線程,定時(shí)運(yùn)行closeExpiredConnections 和closeIdleConnections方法,如下所示。
public static class IdleConnectionMonitorThread extends Thread { private final HttpClientConnectionManager connMgr; private volatile boolean shutdown; public IdleConnectionMonitorThread(HttpClientConnectionManager connMgr) { super(); this.connMgr = connMgr; } @Override public void run() { try { while (!shutdown) { synchronized (this) { wait(5000); // Close expired connections connMgr.closeExpiredConnections(); // Optionally, close connections // that have been idle longer than 30 sec connMgr.closeIdleConnections(30, TimeUnit.SECONDS); } } } catch (InterruptedException ex) { // terminate } } public void shutdown() { shutdown = true; synchronized (this) { notifyAll(); } } }
這里要注意的是,不要關(guān)閉connection。
一種可行的獲取內(nèi)容的方式類似于,把entity里的東西復(fù)制一份:
res = EntityUtils.toString(response.getEntity(),"UTF-8");EntityUtils.consume(response1.getEntity());
但是,更推薦的方式是定義一個(gè)ResponseHandler,方便你我他,不再自己catch異常和關(guān)閉流。在此我們可以看一下相關(guān)的源碼:
public <T> T execute(final HttpHost target, final HttpRequest request, final ResponseHandler<? extends T> responseHandler, final HttpContext context) throws IOException, ClientProtocolException { Args.notNull(responseHandler, "Response handler"); final HttpResponse response = execute(target, request, context); final T result; try { result = responseHandler.handleResponse(response); } catch (final Exception t) { final HttpEntity entity = response.getEntity(); try { EntityUtils.consume(entity); } catch (final Exception t2) { // Log this exception. The original exception is more // important and will be thrown to the caller. this.log.warn("Error consuming content after an exception.", t2); } if (t instanceof RuntimeException) { throw (RuntimeException) t; } if (t instanceof IOException) { throw (IOException) t; } throw new UndeclaredThrowableException(t); } // Handling the response was successful. Ensure that the content has // been fully consumed. final HttpEntity entity = response.getEntity(); EntityUtils.consume(entity);//看這里看這里 return result;}
可以看到,如果我們使用resultHandler執(zhí)行execute方法,會(huì)最終自動(dòng)調(diào)用consume方法,而這個(gè)consume方法如下所示:
public static void consume(final HttpEntity entity) throws IOException { if (entity == null) { return; } if (entity.isStreaming()) { final InputStream instream = entity.getContent(); if (instream != null) { instream.close(); } }}
可以看到最終它關(guān)閉了輸入流。
通過(guò)以上步驟,基本就完成了一個(gè)支持高并發(fā)的httpclient的寫(xiě)法,下面是一些額外的配置和提醒:
CONNECTION_TIMEOUT是連接超時(shí)時(shí)間,SO_TIMEOUT是socket超時(shí)時(shí)間,這兩者是不同的。連接超時(shí)時(shí)間是發(fā)起請(qǐng)求前的等待時(shí)間;socket超時(shí)時(shí)間是等待數(shù)據(jù)的超時(shí)時(shí)間。
HttpParams params = new BasicHttpParams();//設(shè)置連接超時(shí)時(shí)間Integer CONNECTION_TIMEOUT = 2 * 1000; //設(shè)置請(qǐng)求超時(shí)2秒鐘 根據(jù)業(yè)務(wù)調(diào)整Integer SO_TIMEOUT = 2 * 1000; //設(shè)置等待數(shù)據(jù)超時(shí)時(shí)間2秒鐘 根據(jù)業(yè)務(wù)調(diào)整 //定義了當(dāng)從ClientConnectionManager中檢索ManagedClientConnection實(shí)例時(shí)使用的毫秒級(jí)的超時(shí)時(shí)間//這個(gè)參數(shù)期望得到一個(gè)java.lang.Long類型的值。如果這個(gè)參數(shù)沒(méi)有被設(shè)置,默認(rèn)等于CONNECTION_TIMEOUT,因此一定要設(shè)置。Long CONN_MANAGER_TIMEOUT = 500L; //在httpclient4.2.3中我記得它被改成了一個(gè)對(duì)象導(dǎo)致直接用long會(huì)報(bào)錯(cuò),后來(lái)又改回來(lái)了 params.setIntParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, CONNECTION_TIMEOUT);params.setIntParameter(CoreConnectionPNames.SO_TIMEOUT, SO_TIMEOUT);params.setLongParameter(ClientPNames.CONN_MANAGER_TIMEOUT, CONN_MANAGER_TIMEOUT);//在提交請(qǐng)求之前 測(cè)試連接是否可用params.setBooleanParameter(CoreConnectionPNames.STALE_CONNECTION_CHECK, true); //另外設(shè)置http client的重試次數(shù),默認(rèn)是3次;當(dāng)前是禁用掉(如果項(xiàng)目量不到,這個(gè)默認(rèn)即可)httpClient.setHttpRequestRetryHandler(new DefaultHttpRequestRetryHandler(0, false));
現(xiàn)在的業(yè)務(wù)里,沒(méi)有nginx的情況反而比較稀少。nginx默認(rèn)和client端打開(kāi)長(zhǎng)連接而和server端使用短鏈接。
注意client端的keepalive_timeout和keepalive_requests參數(shù),以及upstream端的keepalive參數(shù)設(shè)置,這三個(gè)參數(shù)的意義在此也不再贅述。
以上就是我的全部設(shè)置。通過(guò)這些設(shè)置,成功地將原本每次請(qǐng)求250ms的耗時(shí)降低到了80左右,效果顯著。
JAR包如下:
<!-- httpclient --><dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> <version>4.5.6</version></dependency>
代碼如下:
//Basic認(rèn)證private static final CredentialsProvider credsProvider = new BasicCredentialsProvider();//httpClientprivate static final CloseableHttpClient httpclient;//httpGet方法private static final HttpGet httpget;//private static final RequestConfig reqestConfig;//響應(yīng)處理器private static final ResponseHandler<String> responseHandler;//jackson解析工具private static final ObjectMapper mapper = new ObjectMapper();static { System.setProperty("http.maxConnections","50"); System.setProperty("http.keepAlive", "true"); //設(shè)置basic校驗(yàn) credsProvider.setCredentials( new AuthScope(AuthScope.ANY_HOST, AuthScope.ANY_PORT, AuthScope.ANY_REALM), new UsernamePasswordCredentials("", "")); //創(chuàng)建http客戶端 httpclient = HttpClients.custom() .useSystemProperties() .setRetryHandler(new DefaultHttpRequestRetryHandler(3,true)) .setDefaultCredentialsProvider(credsProvider) .build(); //初始化httpGet httpget = new HttpGet(); //初始化HTTP請(qǐng)求配置 reqestConfig = RequestConfig.custom() .setContentCompressionEnabled(true) .setSocketTimeout(100) .setAuthenticationEnabled(true) .setConnectionRequestTimeout(100) .setConnectTimeout(100).build(); httpget.setConfig(reqestConfig); //初始化response解析器 responseHandler = new BasicResponseHandler();}/* * 功能:返回響應(yīng) * @author zhangdaquan * @param [url] * @return org.apache.http.client.methods.CloseableHttpResponse * @exception */public static String getResponse(String url) throws IOException { HttpGet get = new HttpGet(url); String response = httpclient.execute(get,responseHandler); return response;} /* * 功能:發(fā)送http請(qǐng)求,并用net.sf.json工具解析 * @author zhangdaquan * @param [url] * @return org.json.JSONObject * @exception */public static JSONObject getUrl(String url) throws Exception{ try { httpget.setURI(URI.create(url)); String response = httpclient.execute(httpget,responseHandler); JSONObject json = JSONObject.fromObject(response); return json; } catch (IOException e) { e.printStackTrace(); } return null;}/* * 功能:發(fā)送http請(qǐng)求,并用jackson工具解析 * @author zhangdaquan * @param [url] * @return com.fasterxml.jackson.databind.JsonNode * @exception */public static JsonNode getUrl2(String url){ try { httpget.setURI(URI.create(url)); String response = httpclient.execute(httpget,responseHandler); JsonNode node = mapper.readTree(response); return node; } catch (IOException e) { e.printStackTrace(); } return null;}/* * 功能:發(fā)送http請(qǐng)求,并用fastjson工具解析 * @author zhangdaquan * @param [url] * @return com.fasterxml.jackson.databind.JsonNode * @exception */public static com.alibaba.fastjson.JSONObject getUrl3(String url){ try { httpget.setURI(URI.create(url)); String response = httpclient.execute(httpget,responseHandler); com.alibaba.fastjson.JSONObject jsonObject = com.alibaba.fastjson.JSONObject.parseObject(response); return jsonObject; } catch (IOException e) { e.printStackTrace(); } return null;}
本文鏈接:http://www.www897cc.com/showinfo-26-12328-0.html高并發(fā)場(chǎng)景下的 HttpClient 優(yōu)化方案,QPS 大大提升!
聲明:本網(wǎng)頁(yè)內(nèi)容旨在傳播知識(shí),若有侵權(quán)等問(wèn)題請(qǐng)及時(shí)與本網(wǎng)聯(lián)系,我們將在第一時(shí)間刪除處理。郵件:2376512515@qq.com
上一篇: Istio流量路由小試牛刀,這幾個(gè)方法超有效
下一篇: 測(cè)試驅(qū)動(dòng)開(kāi)發(fā)實(shí)踐:如何使用 Xunit 框架進(jìn)行單元測(cè)試和集成測(cè)試