日韩成人免费在线_国产成人一二_精品国产免费人成电影在线观..._日本一区二区三区久久久久久久久不

當前位置:首頁 > 科技  > 軟件

Sharding-JDBC源碼解析與vivo的定制開發(fā)

來源: 責編: 時間:2024-03-18 09:42:24 167觀看
導讀本文源碼基于Sharding-JDBC 4.1.1版本。一、業(yè)務背景隨著業(yè)務并發(fā)請求和數(shù)據(jù)規(guī)模的不斷擴大,單節(jié)點庫表壓力往往會成為系統(tǒng)的性能瓶頸。公司IT內(nèi)部營銷庫存、交易訂單、財經(jīng)臺賬、考勤記錄等多領域的業(yè)務場景的日增數(shù)

2vK28資訊網(wǎng)——每日最新資訊28at.com

本文源碼基于Sharding-JDBC 4.1.1版本。2vK28資訊網(wǎng)——每日最新資訊28at.com

一、業(yè)務背景

隨著業(yè)務并發(fā)請求和數(shù)據(jù)規(guī)模的不斷擴大,單節(jié)點庫表壓力往往會成為系統(tǒng)的性能瓶頸。公司IT內(nèi)部營銷庫存、交易訂單、財經(jīng)臺賬、考勤記錄等多領域的業(yè)務場景的日增數(shù)據(jù)量巨大,存在著數(shù)據(jù)庫節(jié)點壓力過大、連接過多、查詢速度變慢等情況,根據(jù)數(shù)據(jù)來源、時間、工號等信息來將沒有聯(lián)系的數(shù)據(jù)盡量均分到不同的庫表中,從而在不影響業(yè)務需求的前提下,減輕數(shù)據(jù)庫節(jié)點壓力,提升查詢效率和系統(tǒng)穩(wěn)定性。2vK28資訊網(wǎng)——每日最新資訊28at.com

圖片2vK28資訊網(wǎng)——每日最新資訊28at.com

二、技術選型

我們對比了幾款比較常見的支持分庫分表和讀寫分離的中間件。2vK28資訊網(wǎng)——每日最新資訊28at.com

圖片2vK28資訊網(wǎng)——每日最新資訊28at.com

Sharding-JDBC作為輕量化的增強版的JDBC框架,相較其他中間件性能更好,接入難度更低,其數(shù)據(jù)分片、讀寫分離功能也覆蓋了我們的業(yè)務訴求,因此我們在業(yè)務中廣泛使用了Sharding-JDBC。但在使用Sharding-JDBC的過程中,我們也發(fā)現(xiàn)了諸多問題,為了業(yè)務更便捷的使用Sharding-JDBC,我們對源碼做了針對性的定制開發(fā)和組件封裝來滿足業(yè)務需求。2vK28資訊網(wǎng)——每日最新資訊28at.com

圖片2vK28資訊網(wǎng)——每日最新資訊28at.com

三、源碼解析

3.1 引言

Sharding-JDBC作為基于JDBC的數(shù)據(jù)庫中間件,實現(xiàn)了JDBC的標準api,Sharding-JDBC與原生JDBC的執(zhí)行對比流程如下圖所示:2vK28資訊網(wǎng)——每日最新資訊28at.com

圖片2vK28資訊網(wǎng)——每日最新資訊28at.com


2vK28資訊網(wǎng)——每日最新資訊28at.com

相關執(zhí)行流程的代碼樣例如下:2vK28資訊網(wǎng)——每日最新資訊28at.com

JDBC執(zhí)行樣例2vK28資訊網(wǎng)——每日最新資訊28at.com

//獲取數(shù)據(jù)庫連接try (Connection conn = DriverManager.getConnection("mysqlUrl", "userName", "password")) {    String sql = "SELECT * FROM  t_user WHERE name = ?";    //預編譯SQL    try (PreparedStatement preparedStatement = conn.prepareStatement(sql)) {        //參數(shù)設置與執(zhí)行        preparedStatement.setString(1, "vivo");        preparedStatement.execute(sql);        //獲取結果集        try (ResultSet resultSet = preparedStatement.getResultSet()) {            while (resultSet.next()) {                //處理結果            }        }    }}

Sharding-JDBC 源碼2vK28資訊網(wǎng)——每日最新資訊28at.com

org.apache.shardingsphere.shardingjdbc.jdbc.core.statement#execute    public boolean execute() throws SQLException {        try {            clearPrevious();            //解析+路由+重寫 內(nèi)部調(diào)用BasePrepareEngine#prepare方法            prepare();            initPreparedStatementExecutor();            //執(zhí)行            return preparedStatementExecutor.execute();        } finally {            clearBatch();        }    } org.apache.shardingsphere.underlying.pluggble.prepare.BasePrepareEngine#prepare    public ExecutionContext prepare(final String sql, final List<Object> parameters) {        List<Object> clonedParameters = cloneParameters(parameters);        //解析+路由(executeRoute內(nèi)部先進行解析再執(zhí)行路由)        RouteContext routeContext = executeRoute(sql, clonedParameters);        ExecutionContext result = new ExecutionContext(routeContext.getSqlStatementContext());        //重寫        result.getExecutionUnits().addAll(executeRewrite(sql, clonedParameters, routeContext));        if (properties.<Boolean>getValue(ConfigurationPropertyKey.SQL_SHOW)) {            SQLLogger.logSQL(sql, properties.<Boolean>getValue(ConfigurationPropertyKey.SQL_SIMPLE), result.getSqlStatementContext(), result.getExecutionUnits());        }        return result;    } org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement#getResultSet    public ResultSet getResultSet() throws SQLException {        if (null != currentResultSet) {            return currentResultSet;        }        if (executionContext.getSqlStatementContext() instanceof SelectStatementContext || executionContext.getSqlStatementContext().getSqlStatement() instanceof DALStatement) {            List<ResultSet> resultSets = getResultSets();            //歸并結果集            MergedResult mergedResult = mergeQuery(getQueryResults(resultSets));            currentResultSet = new ShardingResultSet(resultSets, mergedResult, this, executionContext);        }        return currentResultSet;    }

從對比的執(zhí)行流程圖可見:2vK28資訊網(wǎng)——每日最新資訊28at.com

  • 【JDBC】:執(zhí)行的主要流程是通過Datasource獲取Connection,再注入SQL語句生成PreparedStatement對象,PreparedStatement設置占位符參數(shù)執(zhí)行后得到結果集ResultSet。
  • 【Sharding-JDBC】:主要流程基本一致,但Sharding基于PreparedStatement進行了實現(xiàn)與擴展,具體實現(xiàn)類ShardingPreparedStatement中會抽象出解析、路由、重寫、歸并等引擎,從而實現(xiàn)分庫分表、讀寫分離等能力,每個引擎的作用說明如下表所示:

圖片2vK28資訊網(wǎng)——每日最新資訊28at.com

//*相關引擎的源碼解析在下文會作更深入的闡述。2vK28資訊網(wǎng)——每日最新資訊28at.com

3.2 解析引擎

3.2.1 引擎解析2vK28資訊網(wǎng)——每日最新資訊28at.com

解析引擎是Sharding-JDBC進行分庫分表邏輯的基礎,其作用是將SQL拆解為不可再分的原子符號(稱為token),再根據(jù)數(shù)據(jù)庫類型將這些token分類成關鍵字、表達式、操作符、字面量等不同類型,進而生成抽象語法樹,而語法樹是后續(xù)進行路由、改寫操作的前提(這也正是語法樹的存在使得Sharding-JDBC存在各式各樣的語法限制的原因之一)。2vK28資訊網(wǎng)——每日最新資訊28at.com

圖片2vK28資訊網(wǎng)——每日最新資訊28at.com

▲圖片來源:ShardingSphere 官方文檔2vK28資訊網(wǎng)——每日最新資訊28at.com

4.x的版本采用ANTLR(ANother Tool for Language Recognition)作為解析引擎,在ShardingSphere-sql-parser-dialect模塊中定義了適用于不同數(shù)據(jù)庫語法的解析規(guī)則(.g4文件),idea中也可以下載ANTLR v4的插件,輸入SQL查看解析后的語法樹結果。2vK28資訊網(wǎng)——每日最新資訊28at.com

圖片2vK28資訊網(wǎng)——每日最新資訊28at.com

解析方法的入口在DataNodeRouter的createRouteContext方法中,解析引擎根據(jù)數(shù)據(jù)庫類型和SQL創(chuàng)建SQLParserExecutor執(zhí)行得到解析樹,再通過ParseTreeVisitor()的visit方法,對解析樹進行處理得到SQLStatement。ANTLR支持listener和visitor兩種模式的接口,visitor方式可以更靈活的控制解析樹的遍歷過程,更適用于SQL解析的場景。2vK28資訊網(wǎng)——每日最新資訊28at.com

解析引擎核心代碼:2vK28資訊網(wǎng)——每日最新資訊28at.com

org.apache.shardingsphere.underlying.route.DataNodeRouter#createRouteContext#96    private RouteContext createRouteContext(final String sql, final List<Object> parameters, final boolean useCache) {        //解析引擎解析SQL        SQLStatement sqlStatement = parserEngine.parse(sql, useCache);        try {            SQLStatementContext sqlStatementContext = SQLStatementContextFactory.newInstance(metaData.getSchema(), sql, parameters, sqlStatement);            return new RouteContext(sqlStatementContext, parameters, new RouteResult());            // TODO should pass parameters for master-slave        } catch (final IndexOutOfBoundsException ex) {            return new RouteContext(new CommonSQLStatementContext(sqlStatement), parameters, new RouteResult());        }    } org.apache.shardingsphere.sql.parser.SQLParserEngine#parse0#72    private SQLStatement parse0(final String sql, final boolean useCache) {        //緩存        if (useCache) {            Optional<SQLStatement> cachedSQLStatement = cache.getSQLStatement(sql);            if (cachedSQLStatement.isPresent()) {                return cachedSQLStatement.get();            }        }        //根據(jù)數(shù)據(jù)庫類型和sql生成解析樹        ParseTree parseTree = new SQLParserExecutor(databaseTypeName, sql).execute().getRootNode();        //ParseTreeVisitor的visit方法對解析樹進行處理得到SQLStatement      SQLStatement result = (SQLStatement) ParseTreeVisitorFactory.newInstance(databaseTypeName, VisitorRule.valueOf(parseTree.getClass())).visit(parseTree);        if (useCache) {            cache.put(sql, result);        }        return result;    }

SQLStatement實際上是一個接口,其實現(xiàn)對應著不同的SQL類型,如SelectStatement 類中就包括查詢的字段、表名、where條件、分組、排序、分頁、lock等變量,可以看到這里并沒有對having這種字段做定義,相當于Sharding-JDBC無法識別到SQL中的having,這使得Sharding-JDBC對having語法有一定的限制。2vK28資訊網(wǎng)——每日最新資訊28at.com

SelectStatement2vK28資訊網(wǎng)——每日最新資訊28at.com

public final class SelectStatement extends DMLStatement {    // 字段    private ProjectionsSegment projections;    // 表    private final Collection<TableReferenceSegment> tableReferences = new LinkedList<>();    // where    private WhereSegment where;    // groupBy    private GroupBySegment groupBy;    // orderBy    private OrderBySegment orderBy;    // limit    private LimitSegment limit;    // 父statement    private SelectStatement parentStatement;    // lock    private LockSegment lock;}

SQLStatement還會被進一步轉換成SQLStatementContext,如SelectStatement 會被轉換成SelectStatementContext ,其結構與SelectStatement 類似不再多說,值得注意的是雖然這里定義了containsSubquery來判斷是否包含子查詢,但4.1.1源碼永遠是返回的false,與having類似,這意味著Sharding-JDBC不會對子查詢語句做特殊處理。2vK28資訊網(wǎng)——每日最新資訊28at.com

SelectStatementContext2vK28資訊網(wǎng)——每日最新資訊28at.com

public final class SelectStatementContext extends CommonSQLStatementContext<SelectStatement> implements TableAvailable, WhereAvailable {         private final TablesContext tablesContext;         private final ProjectionsContext projectionsContext;         private final GroupByContext groupByContext;         private final OrderByContext orderByContext;         private final PaginationContext paginationContext;         private final boolean containsSubquery;}     private boolean containsSubquery() {        // FIXME process subquery//        Collection<SubqueryPredicateSegment> subqueryPredicateSegments = getSqlStatement().findSQLSegments(SubqueryPredicateSegment.class);//        for (SubqueryPredicateSegment each : subqueryPredicateSegments) {//            if (!each.getAndPredicates().isEmpty()) {//                return true;//            }//        }        return false;    }

3.2.2 引擎總結2vK28資訊網(wǎng)——每日最新資訊28at.com

解析引擎是進行路由改寫的前提基礎,其作用就是將SQL按照定義的語法規(guī)則拆分成原子符號(token),生成語法樹,根據(jù)不同的SQL類型生成對應的SQLStatement,SQLStatement由各自的Segment組成,所有的Segment都包含startIndex和endIndex來定位token在SQL中所屬的位置,但解析語法難以涵蓋所有的SQL場景,使得部分SQL無法按照預期的結果路由執(zhí)行。2vK28資訊網(wǎng)——每日最新資訊28at.com

3.3 路由引擎

3.3.1 引擎解析2vK28資訊網(wǎng)——每日最新資訊28at.com

路由引擎是Sharding-JDBC的核心步驟,作用是根據(jù)定義的分庫分表規(guī)則將解析引擎生成的SQL上下文生成對應的路由結果,RouteResult 包括DataNode和RouteUnit,DataNode是實際的數(shù)據(jù)源節(jié)點,包括數(shù)據(jù)源名稱和實際的物理表名,RouteUnit則記錄了邏輯表/庫與物理表/庫的映射關系,后面的改寫引擎也是根據(jù)這個映射關系來決定如何替換SQL中的邏輯表(實際上RouteResult 就是維護了一條SQL需要往哪些庫哪些表執(zhí)行的關系)。2vK28資訊網(wǎng)——每日最新資訊28at.com

RouteResult2vK28資訊網(wǎng)——每日最新資訊28at.com

public final class RouteResult {         private final Collection<Collection<DataNode>> originalDataNodes = new LinkedList<>();         private final Collection<RouteUnit> routeUnits = new LinkedHashSet<>();} public final class DataNode {         private static final String DELIMITER = ".";         private final String dataSourceName;         private final String tableName;} public final class RouteUnit {         private final RouteMapper dataSourceMapper;         private final Collection<RouteMapper> tableMappers;} public final class RouteMapper {         private final String logicName;         private final String actualName;}

其中,路由有分為分片路由主從路由,兩者可以單獨使用,也可以組合使用。2vK28資訊網(wǎng)——每日最新資訊28at.com

分片路由2vK28資訊網(wǎng)——每日最新資訊28at.com

ShardingRouteDecorator的decorate方法是路由引擎的核心邏輯,經(jīng)過SQL校驗->生成分片條件->合并分片值后得到路由結果。2vK28資訊網(wǎng)——每日最新資訊28at.com

分片路由decorate方法2vK28資訊網(wǎng)——每日最新資訊28at.com

org.apache.shardingsphere.sharding.route.engine.ShardingRouteDecorator#decorate#57public RouteContext decorate(final RouteContext routeContext, final ShardingSphereMetaData metaData, final ShardingRule shardingRule, final ConfigurationProperties properties) {        SQLStatementContext sqlStatementContext = routeContext.getSqlStatementContext();        List<Object> parameters = routeContext.getParameters();        //SQL校驗  校驗INSERT INTO .... ON DUPLICATE KEY UPDATE 和UPDATE語句中是否存在分片鍵      ShardingStatementValidatorFactory.newInstance(                sqlStatementContext.getSqlStatement()).ifPresent(validator -> validator.validate(shardingRule, sqlStatementContext.getSqlStatement(), parameters));        //生成分片條件        ShardingConditions shardingConditions = getShardingConditions(parameters, sqlStatementContext, metaData.getSchema(), shardingRule);        //合并分片值        boolean needMergeShardingValues = isNeedMergeShardingValues(sqlStatementContext, shardingRule);        if (sqlStatementContext.getSqlStatement() instanceof DMLStatement && needMergeShardingValues) {            checkSubqueryShardingValues(sqlStatementContext, shardingRule, shardingConditions);            mergeShardingConditions(shardingConditions);        }        ShardingRouteEngine shardingRouteEngine = ShardingRouteEngineFactory.newInstance(shardingRule, metaData, sqlStatementContext, shardingConditions, properties);        //得到路由結果        RouteResult routeResult = shardingRouteEngine.route(shardingRule);        if (needMergeShardingValues) {            Preconditions.checkState(1 == routeResult.getRouteUnits().size(), "Must have one sharding with subquery.");        }        return new RouteContext(sqlStatementContext, parameters, routeResult);    }

ShardingStatementValidator有ShardingInsertStatementValidator和ShardingUpdateStatementValidator兩種實現(xiàn),INSERT INTO .... ON DUPLICATE KEY UPDATE和UPDATE語法都會涉及到字段值的更新,Sharding-JDBC是不允許更新分片值的,畢竟修改分片值還需要將數(shù)據(jù)遷移至新分片值對應的庫表中,才能保證數(shù)據(jù)分片規(guī)則一致。兩者的校驗細節(jié)也有所不同:2vK28資訊網(wǎng)——每日最新資訊28at.com

  • INSERT INTO .... ON DUPLICATE KEY UPDATE僅僅是對UPDATE字段的校驗, ON DUPLICATE KEY UPDATE中包含分片鍵就會報錯;
  • 而UPDATE語句則會額外校驗WHERE條件中分片鍵的原始值和SET的值是否一樣,不一樣則會拋出異常。

圖片2vK28資訊網(wǎng)——每日最新資訊28at.com

ShardingCondition中只有一個變量routeValues,RouteValue是一個接口,有ListRouteValue和RangeRouteValue兩種實現(xiàn),前者記錄了分片鍵的in或=條件的分片值,后者則記錄了范圍查詢的分片值,兩者被封裝為ShardingValue對象后,將會透傳至分片算法中計算得到分片結果集。2vK28資訊網(wǎng)——每日最新資訊28at.com

ShardingCondition2vK28資訊網(wǎng)——每日最新資訊28at.com

public final class ShardingConditions {         private final List<ShardingCondition> conditions;} public class ShardingCondition {         private final List<RouteValue> routeValues = new LinkedList<>();}  public final class ListRouteValue<T extends Comparable<?>> implements RouteValue {         private final String columnName;         private final String tableName;    //in或=條件對應的值    private final Collection<T> values;         @Override    public String toString() {        return tableName + "." + columnName + (1 == values.size() ? " = " + new ArrayList<>(values).get(0) : " in (" + Joiner.on(",").join(values) + ")");    }} public final class RangeRouteValue<T extends Comparable<?>> implements RouteValue {         private final String columnName;         private final String tableName;    //between and 大于小于等范圍值的上下限    private final Range<T> valueRange;}

生成分片條件后還會合并分片條件,但是前文提過在SelectStatementContext中的containsSubquery永遠是false,所以這段邏輯永遠返回false,即不會合并分片條件。2vK28資訊網(wǎng)——每日最新資訊28at.com

判斷是否需要合并分片條件2vK28資訊網(wǎng)——每日最新資訊28at.com

org.apache.shardingsphere.sharding.route.engine.ShardingRouteDecorator#isNeedMergeShardingValues#87private boolean isNeedMergeShardingValues(final SQLStatementContext sqlStatementContext, final ShardingRule shardingRule) {        return sqlStatementContext instanceof SelectStatementContext && ((SelectStatementContext) sqlStatementContext).isContainsSubquery()                && !shardingRule.getShardingLogicTableNames(sqlStatementContext.getTablesContext().getTableNames()).isEmpty();    }

然后就是通過分片路由引擎調(diào)用分片算法計算路由結果了,ShardingRouteEngine實現(xiàn)較多,介紹起來篇幅較多,這里就不展開說明了,可以參考官方文檔來了解路由引擎的選擇規(guī)則。2vK28資訊網(wǎng)——每日最新資訊28at.com

圖片2vK28資訊網(wǎng)——每日最新資訊28at.com

▲圖片來源:ShardingSphere 官方文檔2vK28資訊網(wǎng)——每日最新資訊28at.com

Sharding-JDBC定義了多種分片策略和算法接口,主要的分配策略與算法說明如下表所示:2vK28資訊網(wǎng)——每日最新資訊28at.com


2vK28資訊網(wǎng)——每日最新資訊28at.com

圖片2vK28資訊網(wǎng)——每日最新資訊28at.com

補充兩個細節(jié):2vK28資訊網(wǎng)——每日最新資訊28at.com

(1)當ALLOW_RANGE_QUERY_WITH2vK28資訊網(wǎng)——每日最新資訊28at.com

_INLINE_SHARDING配置設置true時,InlineShardingStrategy支持范圍查詢,但是并不是根據(jù)分片值計算范圍,而是直接全路由至配置的數(shù)據(jù)節(jié)點,會存在性能隱患。2vK28資訊網(wǎng)——每日最新資訊28at.com

InlineShardingStrategy.doSharding2vK28資訊網(wǎng)——每日最新資訊28at.com

org.apache.shardingsphere.core.strategy.route.inline.InlineShardingStrategy#doSharding    public Collection<String> doSharding(final Collection<String> availableTargetNames, final Collection<RouteValue> shardingValues, final ConfigurationProperties properties) {        RouteValue shardingValue = shardingValues.iterator().next();        //ALLOW_RANGE_QUERY_WITH_INLINE_SHARDING設置為true,直接返回availableTargetNames,而不是根據(jù)RangeRouteValue計算        if (properties.<Boolean>getValue(ConfigurationPropertyKey.ALLOW_RANGE_QUERY_WITH_INLINE_SHARDING) && shardingValue instanceof RangeRouteValue) {            return availableTargetNames;        }        Preconditions.checkState(shardingValue instanceof ListRouteValue, "Inline strategy cannot support this type sharding:" + shardingValue.toString());        Collection<String> shardingResult = doSharding((ListRouteValue) shardingValue);        Collection<String> result = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);        for (String each : shardingResult) {            if (availableTargetNames.contains(each)) {                result.add(each);            }        }        return result;    }

(2)4.1.1的官方文檔雖然說Hint可以跳過解析和改寫,但在我們上面解析引擎的源碼解析中,我們并沒有看到有對Hint策略的額外跳過。事實上,即使使用了Hint分片SQL也同樣需要解析重寫,也同樣受Sharding-JDBC的語法限制,這在官方的issue中也曾經(jīng)被提及。2vK28資訊網(wǎng)——每日最新資訊28at.com

圖片2vK28資訊網(wǎng)——每日最新資訊28at.com

▲圖片來源:ShardingSphere 官方文檔2vK28資訊網(wǎng)——每日最新資訊28at.com

主從路由2vK28資訊網(wǎng)——每日最新資訊28at.com

主從路由的核心邏輯就是通過MasterSlaveDataSourceRouter的route方法進行判定SQL走主庫還是從庫。主從情況下,配置的數(shù)據(jù)源實際是一組主從,而不是單個的實例,所以需要通過masterSlaveRule獲取到具體的主庫或者從庫名字。2vK28資訊網(wǎng)——每日最新資訊28at.com

主從路由decorate2vK28資訊網(wǎng)——每日最新資訊28at.com

org.apache.shardingsphere.masterslave.route.engine.MasterSlaveRouteDecorator#decorate        public RouteContext decorate(final RouteContext routeContext, final ShardingSphereMetaData metaData, final MasterSlaveRule masterSlaveRule, final ConfigurationProperties properties) {        //為空證明沒有經(jīng)過分片路由        if (routeContext.getRouteResult().getRouteUnits().isEmpty()) {            //根據(jù)SQL判斷選擇走主庫還是從庫            String dataSourceName = new MasterSlaveDataSourceRouter(masterSlaveRule).route(routeContext.getSqlStatementContext().getSqlStatement());            RouteResult routeResult = new RouteResult();           //根據(jù)具體的主庫/從庫名創(chuàng)建路由單元            routeResult.getRouteUnits().add(new RouteUnit(new RouteMapper(dataSourceName, dataSourceName), Collections.emptyList()));            return new RouteContext(routeContext.getSqlStatementContext(), Collections.emptyList(), routeResult);        }        Collection<RouteUnit> toBeRemoved = new LinkedList<>();        Collection<RouteUnit> toBeAdded = new LinkedList<>();        //不為空證明已經(jīng)被分片路由處理了        for (RouteUnit each : routeContext.getRouteResult().getRouteUnits()) {            if (masterSlaveRule.getName().equalsIgnoreCase(each.getDataSourceMapper().getActualName())) {                //先標記移除 因為這里是一組主從的名字而不是實際的庫                toBeRemoved.add(each);                //根據(jù)SQL判斷選擇走主庫還是從庫                String actualDataSourceName = new MasterSlaveDataSourceRouter(masterSlaveRule).route(routeContext.getSqlStatementContext().getSqlStatement());                //根據(jù)具體的主庫/從庫名創(chuàng)建路由單元                toBeAdded.add(new RouteUnit(new RouteMapper(each.getDataSourceMapper().getLogicName(), actualDataSourceName), each.getTableMappers()));            }        }        routeContext.getRouteResult().getRouteUnits().removeAll(toBeRemoved);        routeContext.getRouteResult().getRouteUnits().addAll(toBeAdded);        return routeContext;    }

MasterSlaveDataSourceRouter中isMasterRoute方法會判斷SQL是否需要走主庫,當出現(xiàn)以下情況時走主庫:2vK28資訊網(wǎng)——每日最新資訊28at.com

  • select語句包含鎖,如for update語句
  • 不是select語句
  • MasterVisitedManager.isMasterVisited()設置為true
  • HintManager.isMasterRouteOnly()設置為true

不走主庫則通過負載算法選擇從庫,Sharding-JDBC提供了輪詢和隨機兩種算法。2vK28資訊網(wǎng)——每日最新資訊28at.com

MasterSlaveDataSourceRouter2vK28資訊網(wǎng)——每日最新資訊28at.com

public final class MasterSlaveDataSourceRouter {         private final MasterSlaveRule masterSlaveRule;         /**     * Route.     *     * @param sqlStatement SQL statement     * @return data source name     */    public String route(final SQLStatement sqlStatement) {        if (isMasterRoute(sqlStatement)) {            MasterVisitedManager.setMasterVisited();            return masterSlaveRule.getMasterDataSourceName();        }        return masterSlaveRule.getLoadBalanceAlgorithm().getDataSource(                masterSlaveRule.getName(), masterSlaveRule.getMasterDataSourceName(), new ArrayList<>(masterSlaveRule.getSlaveDataSourceNames()));    }         private boolean isMasterRoute(final SQLStatement sqlStatement) {        return containsLockSegment(sqlStatement) || !(sqlStatement instanceof SelectStatement) || MasterVisitedManager.isMasterVisited() || HintManager.isMasterRouteOnly();    }         private boolean containsLockSegment(final SQLStatement sqlStatement) {        return sqlStatement instanceof SelectStatement && ((SelectStatement) sqlStatement).getLock().isPresent();    }}

是否走主庫的信息存在MasterVisitedManager中,MasterVisitedManager是通過ThreadLocal實現(xiàn)的,但這種實現(xiàn)會有一個問題,當我們使用事務先查詢再更新/插入時,第一條查詢SQL并不會走主庫,而是走從庫,如果業(yè)務需要事務的第一條查詢也走主庫,事務查詢前需要手動調(diào)用一次MasterVisitedManager.setMasterVisited()。2vK28資訊網(wǎng)——每日最新資訊28at.com

MasterVisitedManager2vK28資訊網(wǎng)——每日最新資訊28at.com

public final class MasterVisitedManager {         private static final ThreadLocal<Boolean> MASTER_VISITED = ThreadLocal.withInitial(() -> false);         /**     * Judge master data source visited in current thread.     *     * @return master data source visited or not in current thread     */    public static boolean isMasterVisited() {        return MASTER_VISITED.get();    }         /**     * Set master data source visited in current thread.     */    public static void setMasterVisited() {        MASTER_VISITED.set(true);    }         /**     * Clear master data source visited.     */    public static void clear() {        MASTER_VISITED.remove();    }}

3.3.2 引擎總結2vK28資訊網(wǎng)——每日最新資訊28at.com

路由引擎的作用是將SQL根據(jù)參數(shù)通過實現(xiàn)的策略算法計算出實際該在哪些庫的哪些表執(zhí)行,也就是路由結果。路由引擎有兩種實現(xiàn),分別是分片路由和主從路由,兩者都提供了標準化的策略接口來讓業(yè)務實現(xiàn)自己的路由策略,分片路由需要注意自身SQL場景和策略算法相匹配,主從路由中同一線程且同一數(shù)據(jù)庫連接內(nèi),有寫入操作后,之后的讀操作會從主庫讀取,寫入操作前的讀操作不會走主庫。2vK28資訊網(wǎng)——每日最新資訊28at.com

3.4 改寫引擎

3.4.1 引擎解析2vK28資訊網(wǎng)——每日最新資訊28at.com

經(jīng)過解析路由后雖然確定了執(zhí)行的實際庫表,但SQL中表名依舊是邏輯表,不能執(zhí)行,改寫引擎可以將邏輯表替換為物理表。同時,路由至多庫表的SQL也需要拆分為多條SQL執(zhí)行。2vK28資訊網(wǎng)——每日最新資訊28at.com

改寫的入口仍舊在BasePrepareEngine中,創(chuàng)建重寫上下文createSQLRewriteContext,再根據(jù)上下文進行改寫rewrite,最終返回執(zhí)行單元ExecutionUnit。2vK28資訊網(wǎng)——每日最新資訊28at.com

改寫邏輯入口2vK28資訊網(wǎng)——每日最新資訊28at.com

org.apache.shardingsphere.underlying.pluggble.prepare.BasePrepareEngine#executeRewrite    private Collection<ExecutionUnit> executeRewrite(final String sql, final List<Object> parameters, final RouteContext routeContext) {        //注冊重寫裝飾器        registerRewriteDecorator();        //創(chuàng)建 SQLRewriteContext        SQLRewriteContext sqlRewriteContext = rewriter.createSQLRewriteContext(sql, parameters, routeContext.getSqlStatementContext(), routeContext);        //重寫        return routeContext.getRouteResult().getRouteUnits().isEmpty() ? rewrite(sqlRewriteContext) : rewrite(routeContext, sqlRewriteContext);    }

執(zhí)行單元包含了數(shù)據(jù)源名稱,改寫后的SQL,以及對應的參數(shù),SQL一樣的兩個SQLUnit會被視為相等。2vK28資訊網(wǎng)——每日最新資訊28at.com

ExecutionUnit2vK28資訊網(wǎng)——每日最新資訊28at.com

@RequiredArgsConstructor@Getter@EqualsAndHashCode@ToStringpublic final class ExecutionUnit {         private final String dataSourceName;         private final SQLUnit sqlUnit;} @AllArgsConstructor@RequiredArgsConstructor@Getter@Setter//根據(jù)sql判斷是否相等@EqualsAndHashCode(of = { "sql" })@ToStringpublic final class SQLUnit {     private String sql;     private final List<Object> parameters; }

createSQLRewriteContext完成了兩件事,一個是對SQL參數(shù)進行了重寫,一個是生成了SQLToken。2vK28資訊網(wǎng)——每日最新資訊28at.com

createSQLRewriteContext2vK28資訊網(wǎng)——每日最新資訊28at.com

org.apache.shardingsphere.underlying.rewrite.SQLRewriteEntry#createSQLRewriteContext    public SQLRewriteContext createSQLRewriteContext(final String sql, final List<Object> parameters, final SQLStatementContext sqlStatementContext, final RouteContext routeContext) {        SQLRewriteContext result = new SQLRewriteContext(schemaMetaData, sqlStatementContext, sql, parameters);        //sql參數(shù)重寫        decorate(decorators, result, routeContext);        //生成SQLToken        result.generateSQLTokens();        return result;    } org.apache.shardingsphere.sharding.rewrite.context.ShardingSQLRewriteContextDecorator#decorate    public void decorate(final ShardingRule shardingRule, final ConfigurationProperties properties, final SQLRewriteContext sqlRewriteContext) {        for (ParameterRewriter each : new ShardingParameterRewriterBuilder(shardingRule, routeContext).getParameterRewriters(sqlRewriteContext.getSchemaMetaData())) {            if (!sqlRewriteContext.getParameters().isEmpty() && each.isNeedRewrite(sqlRewriteContext.getSqlStatementContext())) {                //參數(shù)重寫                each.rewrite(sqlRewriteContext.getParameterBuilder(), sqlRewriteContext.getSqlStatementContext(), sqlRewriteContext.getParameters());            }        }        //sqlTokenGenerators        sqlRewriteContext.addSQLTokenGenerators(new ShardingTokenGenerateBuilder(shardingRule, routeContext).getSQLTokenGenerators());    } org.apache.shardingsphere.underlying.rewrite.context.SQLRewriteContext#generateSQLTokens    public void generateSQLTokens() {        sqlTokens.addAll(sqlTokenGenerators.generateSQLTokens(sqlStatementContext, parameters, schemaMetaData));    }

ParameterRewriter中與分片相關的實現(xiàn)有兩種。2vK28資訊網(wǎng)——每日最新資訊28at.com

圖片2vK28資訊網(wǎng)——每日最新資訊28at.com

//*詳細的例子可以參考官方文檔中分頁修正和補列部分。2vK28資訊網(wǎng)——每日最新資訊28at.com

SQLToken記錄了SQL中每個token(解析引擎中提過的不可再分的原子符號)的起始位置,從而方便改寫引擎知道哪些位置需要改寫。2vK28資訊網(wǎng)——每日最新資訊28at.com

SQLToken2vK28資訊網(wǎng)——每日最新資訊28at.com

@RequiredArgsConstructor@Getterpublic abstract class SQLToken implements Comparable<SQLToken> {         private final int startIndex;         @Override    public final int compareTo(final SQLToken sqlToken) {        return startIndex - sqlToken.getStartIndex();    }}

創(chuàng)建完SQLRewriteContext后就對整條SQL進行重寫和組裝參數(shù),可以看出每個RouteUnit都會重寫SQL并獲取自己對應的參數(shù)。2vK28資訊網(wǎng)——每日最新資訊28at.com

SQLRouteRewriteEngine.rewrite2vK28資訊網(wǎng)——每日最新資訊28at.com

org.apache.shardingsphere.underlying.rewrite.engine.SQLRouteRewriteEngine#rewrite    public Map<RouteUnit, SQLRewriteResult> rewrite(final SQLRewriteContext sqlRewriteContext, final RouteResult routeResult) {        Map<RouteUnit, SQLRewriteResult> result = new LinkedHashMap<>(routeResult.getRouteUnits().size(), 1);        for (RouteUnit each : routeResult.getRouteUnits()) {            //重寫SQL+組裝參數(shù)            result.put(each, new SQLRewriteResult(new RouteSQLBuilder(sqlRewriteContext, each).toSQL(), getParameters(sqlRewriteContext.getParameterBuilder(), routeResult, each)));        }        return result;    }

toSQL核心就是根據(jù)SQLToken將SQL拆分改寫再拼裝,比如:2vK28資訊網(wǎng)——每日最新資訊28at.com

select * from t_order where created_by = '123' 2vK28資訊網(wǎng)——每日最新資訊28at.com

就會被拆分為select * from | t_order | where created_by = '123'三部分進行改寫拼裝。2vK28資訊網(wǎng)——每日最新資訊28at.com

toSQL2vK28資訊網(wǎng)——每日最新資訊28at.com

org.apache.shardingsphere.underlying.rewrite.sql.impl.AbstractSQLBuilder#toSQL    public final String toSQL() {        if (context.getSqlTokens().isEmpty()) {            return context.getSql();        }        Collections.sort(context.getSqlTokens());        StringBuilder result = new StringBuilder();        //截取第一個SQLToken之前的內(nèi)容  select * from        result.append(context.getSql().substring(0, context.getSqlTokens().get(0).getStartIndex()));        for (SQLToken each : context.getSqlTokens()) {            //重寫拼接每個SQLToken對應的內(nèi)容  t_order ->t_order_0            result.append(getSQLTokenText(each));            //拼接SQLToken中間不變的內(nèi)容 where created_by = '123'            result.append(getConjunctionText(each));        }        return result.toString();    }

ParameterBuilder有StandardParameterBuilder和GroupedParameterBuilder兩個實現(xiàn)。2vK28資訊網(wǎng)——每日最新資訊28at.com

  • StandardParameterBuilder:適用于非insert語句,getParameters無需分組處理直接返回即可。
  • GroupedParameterBuilder:適用于insert語句,需要根據(jù)路由情況對參數(shù)進行分組。

原因和樣例可以參考官方文檔批量拆分部分2vK28資訊網(wǎng)——每日最新資訊28at.com

getParameters2vK28資訊網(wǎng)——每日最新資訊28at.com

org.apache.shardingsphere.underlying.rewrite.engine.SQLRouteRewriteEngine#getParameters    private List<Object> getParameters(final ParameterBuilder parameterBuilder, final RouteResult routeResult, final RouteUnit routeUnit) {        if (parameterBuilder instanceof StandardParameterBuilder || routeResult.getOriginalDataNodes().isEmpty() || parameterBuilder.getParameters().isEmpty()) {            //非插入語句直接返回            return parameterBuilder.getParameters();        }        List<Object> result = new LinkedList<>();        int count = 0;        for (Collection<DataNode> each : routeResult.getOriginalDataNodes()) {            if (isInSameDataNode(each, routeUnit)) {                //插入語句參數(shù)分組構造                result.addAll(((GroupedParameterBuilder) parameterBuilder).getParameters(count));            }            count++;        }        return result;    }

3.4.2 引擎總結2vK28資訊網(wǎng)——每日最新資訊28at.com

改寫引擎的作用是將邏輯SQL轉換為實際可執(zhí)行的SQL,這其中既有邏輯表名的替換,也有多路由的SQL拆分,還有為了后續(xù)歸并操作而進行的分頁、分組、排序等改寫,select語句不會對參數(shù)進行重組,而insert語句為了避免插入多余數(shù)據(jù),會通過路由單元對參數(shù)進行重組。2vK28資訊網(wǎng)——每日最新資訊28at.com

3.5 執(zhí)行引擎

3.5.1 引擎解析2vK28資訊網(wǎng)——每日最新資訊28at.com

改寫完成后的SQL就可以執(zhí)行了,執(zhí)行引擎需要平衡好資源和效率,如果為每條真實SQL都創(chuàng)建一個數(shù)據(jù)庫連接顯然會造成資源的濫用,但如果單線程串行也必然會影響執(zhí)行效率。2vK28資訊網(wǎng)——每日最新資訊28at.com

執(zhí)行引擎會先將執(zhí)行單元中需要執(zhí)行的SQLUnit根據(jù)數(shù)據(jù)源分組,同一個數(shù)據(jù)源下的SQLUnit會放入一個list,然后會根據(jù)maxConnectionsSizePerQuery對同一個數(shù)據(jù)源的SQLUnit繼續(xù)分組,創(chuàng)建連接并綁定SQLUnit。2vK28資訊網(wǎng)——每日最新資訊28at.com

執(zhí)行組創(chuàng)建2vK28資訊網(wǎng)——每日最新資訊28at.com

org.apache.shardingsphere.sharding.execute.sql.prepare.SQLExecutePrepareTemplate#getSynchronizedExecuteUnitGroups    private Collection<InputGroup<StatementExecuteUnit>> getSynchronizedExecuteUnitGroups(            final Collection<ExecutionUnit> executionUnits, final SQLExecutePrepareCallback callback) throws SQLException {        //根據(jù)數(shù)據(jù)源將SQLUnit分組 key=dataSourceName        Map<String, List<SQLUnit>> sqlUnitGroups = getSQLUnitGroups(executionUnits);        Collection<InputGroup<StatementExecuteUnit>> result = new LinkedList<>();        //創(chuàng)建sql執(zhí)行組        for (Entry<String, List<SQLUnit>> entry : sqlUnitGroups.entrySet()) {            result.addAll(getSQLExecuteGroups(entry.getKey(), entry.getValue(), callback));        }        return result;    } org.apache.shardingsphere.sharding.execute.sql.prepare.SQLExecutePrepareTemplate#getSQLExecuteGroups    private List<InputGroup<StatementExecuteUnit>> getSQLExecuteGroups(final String dataSourceName,                                                                       final List<SQLUnit> sqlUnits, final SQLExecutePrepareCallback callback) throws SQLException {        List<InputGroup<StatementExecuteUnit>> result = new LinkedList<>();        //每個連接需要執(zhí)行的最大sql數(shù)量        int desiredPartitionSize = Math.max(0 == sqlUnits.size() % maxConnectionsSizePerQuery ? sqlUnits.size() / maxConnectionsSizePerQuery : sqlUnits.size() / maxConnectionsSizePerQuery + 1, 1);        //分組,每組對應一條數(shù)據(jù)庫連接        List<List<SQLUnit>> sqlUnitPartitions = Lists.partition(sqlUnits, desiredPartitionSize);        //選擇連接模式 連接限制/內(nèi)存限制        ConnectionMode connectionMode = maxConnectionsSizePerQuery < sqlUnits.size() ? ConnectionMode.CONNECTION_STRICTLY : ConnectionMode.MEMORY_STRICTLY;        //創(chuàng)建連接        List<Connection> connections = callback.getConnections(connectionMode, dataSourceName, sqlUnitPartitions.size());        int count = 0;        for (List<SQLUnit> each : sqlUnitPartitions) {            //綁定連接和SQLUnit 創(chuàng)建StatementExecuteUnit            result.add(getSQLExecuteGroup(connectionMode, connections.get(count++), dataSourceName, each, callback));        }        return result;    }

SQLUnit分組和連接模式選擇沒有任何關系,連接模式的選擇只取決于maxConnectionsSizePerQuery和SQLUnit數(shù)量的大小關系,maxConnectionsSizePerQuery代表了一個數(shù)據(jù)源一次查詢允許的最大連接數(shù)。2vK28資訊網(wǎng)——每日最新資訊28at.com

  • 當maxConnectionsSizePerQuery<sqlunit數(shù)量時,意味著無法做到每個sqlunit獨享一個連接,需要直接查詢出結果集至內(nèi)存中;< li="">
  • 當maxConnectionsSizePerQuery>=SQLUnit數(shù)量時,意味著可以支持每個SQLUnit獨享一個連接,可以通過ResultSet游標下移的方式查詢結果集。

不過maxConnectionsSizePerQuery默認值為1,所以當一條SQL需要路由至多張表時(即有多個SQLUnit)會采用連接限制,當路由至單表時是內(nèi)存限制模式。2vK28資訊網(wǎng)——每日最新資訊28at.com

圖片2vK28資訊網(wǎng)——每日最新資訊28at.com

為了避免產(chǎn)生數(shù)據(jù)庫連接死鎖問題,在內(nèi)存限制模式時,Sharding-JDBC通過鎖住數(shù)據(jù)源對象一次性創(chuàng)建出本條SQL需要的所有數(shù)據(jù)庫連接。連接限制模式下,各連接一次性查出各自的結果,不會出現(xiàn)多連接相互等待的情況,因此不會發(fā)生死鎖,而內(nèi)存限制模式通過游標讀取結果集,需要多條連接去查詢不同的表做合并,如果不一次性拿到所有需要的連接,則可能存在連接相互等待的情況造成死鎖。可以參照官方文檔中執(zhí)行引擎相關例子2vK28資訊網(wǎng)——每日最新資訊28at.com

不同連接模式創(chuàng)建連接2vK28資訊網(wǎng)——每日最新資訊28at.com

private List<Connection> createConnections(final String dataSourceName, final ConnectionMode connectionMode, final DataSource dataSource, final int connectionSize) throws SQLException {    if (1 == connectionSize) {        Connection connection = createConnection(dataSourceName, dataSource);        replayMethodsInvocation(connection);        return Collections.singletonList(connection);    }    if (ConnectionMode.CONNECTION_STRICTLY == connectionMode) {        return createConnections(dataSourceName, dataSource, connectionSize);    }    //內(nèi)存限制模式加鎖 一次性獲取所有的連接    synchronized (dataSource) {        return createConnections(dataSourceName, dataSource, connectionSize);    }}

此外,結果集的內(nèi)存合并和流式合并只在調(diào)用JDBC的executeQuery的情況下生效,如果使用execute方式進行查詢,都是統(tǒng)一使用流式方式的查詢。2vK28資訊網(wǎng)——每日最新資訊28at.com

查詢結果歸并對比2vK28資訊網(wǎng)——每日最新資訊28at.com

org.apache.shardingsphere.shardingjdbc.executor.PreparedStatementExecutor#executeQuery#101   org.apache.shardingsphere.shardingjdbc.executor.PreparedStatementExecutor#getQueryResult    private QueryResult getQueryResult(final Statement statement, final ConnectionMode connectionMode) throws SQLException {        PreparedStatement preparedStatement = (PreparedStatement) statement;        ResultSet resultSet = preparedStatement.executeQuery();        getResultSets().add(resultSet);        //executeQuery 中根據(jù)連接模式選擇流式/內(nèi)存        return ConnectionMode.MEMORY_STRICTLY == connectionMode ? new StreamQueryResult(resultSet) : new MemoryQueryResult(resultSet);    } //execute 單獨調(diào)用getResultSet中只會使用流式合并org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement#getResultSet#158  org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement#getQueryResults     private List<QueryResult> getQueryResults(final List<ResultSet> resultSets) throws SQLException {        List<QueryResult> result = new ArrayList<>(resultSets.size());        for (ResultSet each : resultSets) {            if (null != each) {                result.add(new StreamQueryResult(each));            }        }        return result;    }

多條連接的執(zhí)行方式分為串行和并行,在本地事務和XA事務中是串行的方式,其余情況是并行,具體的執(zhí)行邏輯這里就不再展開了。2vK28資訊網(wǎng)——每日最新資訊28at.com

isHoldTransaction2vK28資訊網(wǎng)——每日最新資訊28at.com

public boolean isHoldTransaction() {        return (TransactionType.LOCAL == transactionType && !getAutoCommit()) || (TransactionType.XA == transactionType && isInShardingTransaction());    }

3.5.2 引擎總結2vK28資訊網(wǎng)——每日最新資訊28at.com

執(zhí)行引擎通過maxConnectionsSizePerQuery和同數(shù)據(jù)源的SQLUnit的數(shù)量大小確定連接模式,maxConnectionsSizePerQuery=SQLUnit數(shù)量使用內(nèi)存限制模式,當使用內(nèi)存限制模式時會通過對數(shù)據(jù)源對象加鎖來保證一次性獲取本條SQL需要的連接而避免死鎖。在使用executeQuery查詢時,處理結果集時會根據(jù)連接模式選擇流式或者內(nèi)存合并,但使用execute方法查詢,處理結果集只會使用流式合并。2vK28資訊網(wǎng)——每日最新資訊28at.com

3.6 歸并引擎

3.6.1 引擎解析2vK28資訊網(wǎng)——每日最新資訊28at.com

查詢出的結果集需要經(jīng)過歸并引擎歸并后才是最終的結果,歸并的核心入口在MergeEntry的process方法中,優(yōu)先處理分片場景的合并,再進行脫敏,只有讀寫分離的情況下則直接返回TransparentMergedResult,TransparentMergedResult實際上沒做合并的額外處理,其內(nèi)部實現(xiàn)都是完全調(diào)用queryResult的實現(xiàn)。2vK28資訊網(wǎng)——每日最新資訊28at.com

圖片2vK28資訊網(wǎng)——每日最新資訊28at.com

歸并邏輯入口2vK28資訊網(wǎng)——每日最新資訊28at.com

org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement#mergeQuery#190 org.apache.shardingsphere.underlying.pluggble.merge.MergeEngine#merge#61    org.apache.shardingsphere.underlying.merge.MergeEntry#process    public MergedResult process(final List<QueryResult> queryResults, final SQLStatementContext sqlStatementContext) throws SQLException {        //分片合并        Optional<MergedResult> mergedResult = merge(queryResults, sqlStatementContext);        //脫敏處理        Optional<MergedResult> result = mergedResult.isPresent() ? Optional.of(decorate(mergedResult.get(), sqlStatementContext)) : decorate(queryResults.get(0), sqlStatementContext);        //只有讀寫分離的情況下,orElseGet會不存在,TransparentMergedResult        return result.orElseGet(() -> new TransparentMergedResult(queryResults.get(0)));    }

TransparentMergedResult2vK28資訊網(wǎng)——每日最新資訊28at.com

@RequiredArgsConstructorpublic final class TransparentMergedResult implements MergedResult {         private final QueryResult queryResult;         @Override    public boolean next() throws SQLException {        return queryResult.next();    }         @Override    public Object getValue(final int columnIndex, final Class<?> type) throws SQLException {        return queryResult.getValue(columnIndex, type);    }         @Override    public Object getCalendarValue(final int columnIndex, final Class<?> type, final Calendar calendar) throws SQLException {        return queryResult.getCalendarValue(columnIndex, type, calendar);    }         @Override    public InputStream getInputStream(final int columnIndex, final String type) throws SQLException {        return queryResult.getInputStream(columnIndex, type);    }         @Override    public boolean wasNull() throws SQLException {        return queryResult.wasNull();    }}

我們只看分片相關的操作,ResultMergerEngine只有一個實現(xiàn)類ShardingResultMergerEngine,所以只有存在分片情況的時候,上文的第一個merge才會有結果。根據(jù)SQL類型的不同選擇ResultMerger實現(xiàn),查詢類的合并是最常用也是最復雜的合并。2vK28資訊網(wǎng)——每日最新資訊28at.com

MergeEntry.merge2vK28資訊網(wǎng)——每日最新資訊28at.com

org.apache.shardingsphere.underlying.merge.MergeEntry#merge    private Optional<MergedResult> merge(final List<QueryResult> queryResults, final SQLStatementContext sqlStatementContext) throws SQLException {        for (Entry<BaseRule, ResultProcessEngine> entry : engines.entrySet()) {            if (entry.getValue() instanceof ResultMergerEngine) {                //選擇不同類型的 resultMerger                ResultMerger resultMerger = ((ResultMergerEngine) entry.getValue()).newInstance(databaseType, entry.getKey(), properties, sqlStatementContext);                //歸并                return Optional.of(resultMerger.merge(queryResults, sqlStatementContext, schemaMetaData));            }        }        return Optional.empty();    } org.apache.shardingsphere.sharding.merge.ShardingResultMergerEngine#newInstance    public ResultMerger newInstance(final DatabaseType databaseType, final ShardingRule shardingRule, final ConfigurationProperties properties, final SQLStatementContext sqlStatementContext) {        if (sqlStatementContext instanceof SelectStatementContext) {            return new ShardingDQLResultMerger(databaseType);        }        if (sqlStatementContext.getSqlStatement() instanceof DALStatement) {            return new ShardingDALResultMerger(shardingRule);        }        return new TransparentResultMerger();    }

ShardingDQLResultMerger的merge方法就是根據(jù)SQL解析結果中包含的token選擇合適的歸并方式(分組聚合、排序、遍歷),歸并后的mergedResult統(tǒng)一經(jīng)過decorate方法進行判斷是否需要分頁歸并,整體處理流程圖可以概括如下。2vK28資訊網(wǎng)——每日最新資訊28at.com

歸并方式選擇2vK28資訊網(wǎng)——每日最新資訊28at.com

org.apache.shardingsphere.sharding.merge.dql.ShardingDQLResultMerger#merge    public MergedResult merge(final List<QueryResult> queryResults, final SQLStatementContext sqlStatementContext, final SchemaMetaData schemaMetaData) throws SQLException {        if (1 == queryResults.size()) {            return new IteratorStreamMergedResult(queryResults);        }        Map<String, Integer> columnLabelIndexMap = getColumnLabelIndexMap(queryResults.get(0));        SelectStatementContext selectStatementContext = (SelectStatementContext) sqlStatementContext;        selectStatementContext.setIndexes(columnLabelIndexMap);        //分組聚合,排序,遍歷        MergedResult mergedResult = build(queryResults, selectStatementContext, columnLabelIndexMap, schemaMetaData);        //分頁歸并        return decorate(queryResults, selectStatementContext, mergedResult);    } org.apache.shardingsphere.sharding.merge.dql.ShardingDQLResultMerger#build    private MergedResult build(final List<QueryResult> queryResults, final SelectStatementContext selectStatementContext,                               final Map<String, Integer> columnLabelIndexMap, final SchemaMetaData schemaMetaData) throws SQLException {        if (isNeedProcessGroupBy(selectStatementContext)) {            //分組聚合歸并            return getGroupByMergedResult(queryResults, selectStatementContext, columnLabelIndexMap, schemaMetaData);        }        if (isNeedProcessDistinctRow(selectStatementContext)) {            setGroupByForDistinctRow(selectStatementContext);            //分組聚合歸并            return getGroupByMergedResult(queryResults, selectStatementContext, columnLabelIndexMap, schemaMetaData);        }        if (isNeedProcessOrderBy(selectStatementContext)) {            //排序歸并            return new OrderByStreamMergedResult(queryResults, selectStatementContext, schemaMetaData);        }        //遍歷歸并        return new IteratorStreamMergedResult(queryResults);    } org.apache.shardingsphere.sharding.merge.dql.ShardingDQLResultMerger#decorate    private MergedResult decorate(final List<QueryResult> queryResults, final SelectStatementContext selectStatementContext, final MergedResult mergedResult) throws SQLException {        PaginationContext paginationContext = selectStatementContext.getPaginationContext();        if (!paginationContext.isHasPagination() || 1 == queryResults.size()) {            return mergedResult;        }        String trunkDatabaseName = DatabaseTypes.getTrunkDatabaseType(databaseType.getName()).getName();        //根據(jù)數(shù)據(jù)庫類型分頁歸并        if ("MySQL".equals(trunkDatabaseName) || "PostgreSQL".equals(trunkDatabaseName)) {            return new LimitDecoratorMergedResult(mergedResult, paginationContext);        }        if ("Oracle".equals(trunkDatabaseName)) {            return new RowNumberDecoratorMergedResult(mergedResult, paginationContext);        }        if ("SQLServer".equals(trunkDatabaseName)) {            return new TopAndRowNumberDecoratorMergedResult(mergedResult, paginationContext);        }        return mergedResult;    }

每種歸并方式的作用在官方文檔有比較詳細的案例,這里就不再重復介紹了。2vK28資訊網(wǎng)——每日最新資訊28at.com

3.6.2 引擎總結2vK28資訊網(wǎng)——每日最新資訊28at.com

歸并引擎是Sharding-JDBC執(zhí)行SQL的最后一步,其作用是將多個數(shù)節(jié)點的結果集組合為一個正確的結果集返回,查詢類的歸并有分組歸并、聚合歸并、排序歸并、遍歷歸并、分頁歸并五種,這五種歸并方式并不是互斥的,而是相互組合的。2vK28資訊網(wǎng)——每日最新資訊28at.com

四、定制開發(fā)

在使用Sharding-JDBC過程中,我們發(fā)現(xiàn)了一些問題可以改進,比如存量系統(tǒng)數(shù)據(jù)量到達一定規(guī)模而需要分庫分表引入Sharding-JDBC時,就會存在兩大問題2vK28資訊網(wǎng)——每日最新資訊28at.com

一個是存量數(shù)據(jù)的遷移,這個問題我們可以通過分片算法兼容,前文已經(jīng)提過分片鍵的值是不允許更改的,而且SQL如果不包含分片鍵,如果這個分片鍵對應的值是遞增的(如id,時間等),我們可以設置一個閾值,在分片算法的doSharding中判斷分片值與閾值的大小決定將數(shù)據(jù)路由至舊表或新表,避免數(shù)據(jù)遷移的麻煩。如果是根據(jù)用戶id取模分表,而新增的數(shù)據(jù)無法只通過用戶id判斷,這時可以考慮采用復合分片算法,將用戶id與訂單id或者時間等遞增的字段同時設置為分片鍵,根據(jù)訂單id或時間判斷是否是新數(shù)據(jù),再根據(jù)用戶id取模得到路由結果即可。2vK28資訊網(wǎng)——每日最新資訊28at.com

另一個是Sharding-JDBC語法限制會使得存量SQL面對巨大的改造壓力,而實際上業(yè)務更關心的是需要分片的表,非分片的表不應該發(fā)生改動和影響。實際上,非分片表理論上無需通過解析、路由、重寫、合并,為此我們在源碼層面對這段邏輯進行了優(yōu)化,支持跳過部分解析,完全跳過分片路由、重寫和合并,盡可能減少Sharding-JDBC對非分片表的語法限制,來減少業(yè)務系統(tǒng)的改造壓力與風險。2vK28資訊網(wǎng)——每日最新資訊28at.com

圖片2vK28資訊網(wǎng)——每日最新資訊28at.com

4.1 跳過Sharding語法限制

Sharding-JDBC執(zhí)行解析路由重寫的邏輯都是在BasePrepareEngine中,最終構造ExecutionContext交由執(zhí)行引擎執(zhí)行,ExecutionContext中包含sqlStatementContext和executionUnits,非分片表不涉及路由改寫,所以其ExecutionUnit我們非常容易手動構造,而查看SQLStatementContext的使用情況,我們發(fā)現(xiàn)SQLStatementContext只會影響結果集的合并而不會影響實際的執(zhí)行,而不分片表也無需進行結果集的合并,整體實現(xiàn)思路如圖。2vK28資訊網(wǎng)——每日最新資訊28at.com

圖片2vK28資訊網(wǎng)——每日最新資訊28at.com

ExecutionContext相關對象2vK28資訊網(wǎng)——每日最新資訊28at.com

public class ExecutionContext {     private final SQLStatementContext sqlStatementContext;     private final Collection<ExecutionUnit> executionUnits = new LinkedHashSet<>();} public final class ExecutionUnit {         private final String dataSourceName;         private final SQLUnit sqlUnit;} public final class SQLUnit {     private String sql;     private final List<Object> parameters; }

(1)校驗SQL中是否包含分片表:我們是通過正則將SQL中的各個單詞分隔成Set,然后再遍歷BaseRule判斷是否存在分片表。大家可能會奇怪明明解析引擎可以幫我們解析出SQL中的表名,為什么還要自己來解析。因為我們測試的過程中發(fā)現(xiàn),存量業(yè)務上的SQL很多在解析階段就會報錯,只能提前判斷,當然這種判斷方式并不嚴謹,比如 SELECT order_id FROM t_order_record WHERE order_id=1 AND remarks=' t_order xxx';,配置的分片表t_order時就會存在誤判,但這種場景在我們的業(yè)務中沒有,所以暫時并沒有處理。由于這個信息需要在多個對象方法中使用,為了避免修改大量的對象變量和方法入?yún)ⅲ帜芊奖愕耐競鬟@個信息,判斷的結果我們選擇放在ThreadLocal里。2vK28資訊網(wǎng)——每日最新資訊28at.com

RuleContextManager2vK28資訊網(wǎng)——每日最新資訊28at.com

public final class RuleContextManager {     private static final ThreadLocal<RuleContextManager> SKIP_CONTEXT_HOLDER = ThreadLocal.withInitial(RuleContextManager::new);     /**     * 是否跳過sharding     */    private boolean skipSharding;     /**     * 是否路由至主庫     */    private boolean masterRoute;     public static boolean isSkipSharding() {        return SKIP_CONTEXT_HOLDER.get().skipSharding;    }     public static void setSkipSharding(boolean skipSharding) {        SKIP_CONTEXT_HOLDER.get().skipSharding = skipSharding;    }     public static boolean isMasterRoute() {         return SKIP_CONTEXT_HOLDER.get().masterRoute;    }     public static void setMasterRoute(boolean masterRoute) {        SKIP_CONTEXT_HOLDER.get().masterRoute = masterRoute;    }     public static void clear(){        SKIP_CONTEXT_HOLDER.remove();    } }

判斷SQL是否包含分片表2vK28資訊網(wǎng)——每日最新資訊28at.com

org.apache.shardingsphere.underlying.pluggble.prepare.BasePrepareEngine#buildSkipContext// 判斷是否可以跳過sharding,構造RuleContextManager的值private void buildSkipContext(final String sql){    Set<String> sqlTokenSet = new HashSet<>(Arrays.asList(sql.split("[//s]")));        if (CollectionUtils.isNotEmpty(rules)) {            for (BaseRule baseRule : rules) {                //定制方法,ShardingRule實現(xiàn),判斷sqlTokenSet是否包含邏輯表即可                if(baseRule.hasContainShardingTable(sqlTokenSet)){                    RuleContextManager.setSkipSharding(false);                    break;                }else {                    RuleContextManager.setSkipSharding(true);                }            }        }} org.apache.shardingsphere.core.rule.ShardingRule#hasContainShardingTablepublic Boolean hasContainShardingTable(Set<String> sqlTokenSet) {      //logicTableNameList通過遍歷TableRule可以得到       for (String logicTable : logicTableNameList) {            if (sqlTokenSet.contains(logicTable)) {                return true;            }        }        return false;    }

(2)跳過解析路由:通過RuleContextManager中的skipSharding判斷是否需要跳過Sharding解析路由,但為了兼容讀寫分離的場景,我們還需要知道這條SQL應該走主庫還是從庫,走主庫的場景在后面強制路由主庫部分有說明,SQL走主庫實際上只有兩種情況,一種是非SELECT語句,另一種就是SELECT語句帶鎖,如SELECT...FOR UPDATE,因此整體實現(xiàn)的步驟如下:2vK28資訊網(wǎng)——每日最新資訊28at.com

  • 如果標記了跳過Sharding且不為select語句,直接返回SkipShardingStatement,單獨構造一個SkipShardingStatement的目的是為了能利用解析引擎中的緩存,緩存中不能放入null值。
  • 如果是select語句需要繼續(xù)解析,判斷是否有鎖后直接返回,避免后續(xù)解析造成語法不兼容,這里也曾嘗試用反射獲取lockClause來判斷是否包含鎖,但最終沒有成功。
  • ShardingRouteDecorator根據(jù)

   RuleContextManager.isSkipSharding判斷是否跳過路由。2vK28資訊網(wǎng)——每日最新資訊28at.com

跳過解析路由2vK28資訊網(wǎng)——每日最新資訊28at.com

public class SkipShardingStatement implements SQLStatement{    @Override    public int getParameterCount() {        return 0;    }} org.apache.shardingsphere.sql.parser.SQLParserEngine#parse0    private SQLStatement parse0(final String sql, final boolean useCache) {        if (useCache) {            Optional<SQLStatement> cachedSQLStatement = cache.getSQLStatement(sql);            if (cachedSQLStatement.isPresent()) {                return cachedSQLStatement.get();            }        }        ParseTree parseTree = new SQLParserExecutor(databaseTypeName, sql).execute().getRootNode();        /**         * 跳過sharding 需要判斷是否需要路由至主庫 如果不是select語句直接跳過         * 是select語句則需要通過繼續(xù)解析判斷是否有鎖         */        SQLStatement result ;        if(RuleContextManager.isSkipSharding()&&!VisitorRule.SELECT.equals(VisitorRule.valueOf(parseTree.getClass()))){            RuleContextManager.setMasterRoute(true);            result = new SkipShardingStatement();        }else {            result = (SQLStatement) ParseTreeVisitorFactory.newInstance(databaseTypeName, VisitorRule.valueOf(parseTree.getClass())).visit(parseTree);        }        if (useCache) {            cache.put(sql, result);        }        return result;    } org.apache.shardingsphere.sql.parser.mysql.visitor.impl.MySQLDMLVisitor#visitSelectClause    public ASTNode visitSelectClause(final SelectClauseContext ctx) {        SelectStatement result = new SelectStatement();        // 跳過sharding 只需要判斷是否有鎖來決定是否路由至主庫即可        if(RuleContextManager.isSkipSharding()){            if (null != ctx.lockClause()) {                result.setLock((LockSegment) visit(ctx.lockClause()));                RuleContextManager.setMasterRoute(true);            }            return result;        }        //...后續(xù)解析    } org.apache.shardingsphere.underlying.route.DataNodeRouter#createRouteContext    private RouteContext createRouteContext(final String sql, final List<Object> parameters, final boolean useCache) {        SQLStatement sqlStatement = parserEngine.parse(sql, useCache);        //如果需要跳過sharding 不進行后續(xù)的解析直接返回        if (RuleContextManager.isSkipSharding()) {            return new RouteContext(sqlStatement, parameters, new RouteResult());        }        //...解析    } org.apache.shardingsphere.sharding.route.engine.ShardingRouteDecorator#decorate    public RouteContext decorate(final RouteContext routeContext, final ShardingSphereMetaData metaData, final ShardingRule shardingRule, final ConfigurationProperties properties) {        // 跳過sharding路由        if(RuleContextManager.isSkipSharding()){            return routeContext;        }        //...路由    }

(3)手動構造ExecutionUnit:ExecutionUnit中我們需要確定的內(nèi)容就是datasourceName,這里我們認為跳過Sharding的SQL最終執(zhí)行的庫一定只有一個。如果只是跳過Sharding的情況,直接從元數(shù)據(jù)中獲取數(shù)據(jù)源名稱即可,如果存在讀寫分離的情況,主從路由的結果也一定是唯一的。創(chuàng)建完ExecutionUnit直接放入ExecutionContext返回即可,從而跳過后續(xù)的改寫邏輯。2vK28資訊網(wǎng)——每日最新資訊28at.com

手動構造ExecutionUnit2vK28資訊網(wǎng)——每日最新資訊28at.com

public ExecutionContext prepare(final String sql, final List<Object> parameters) {    List<Object> clonedParameters = cloneParameters(parameters);    // 判斷是否可以跳過sharding,構造RuleContextManager的值    buildSkipContext(sql);      RouteContext routeContext = executeRoute(sql, clonedParameters);    ExecutionContext result = new ExecutionContext(routeContext.getSqlStatementContext());    // 跳過sharding的sql最后的路由結果一定只有一個庫    if(RuleContextManager.isSkipSharding()){        log.debug("可以跳過sharding的場景 {}", sql);        if(!Objects.isNull(routeContext.getRouteResult())){            Collection<String> allInstanceDataSourceNames = this.metaData.getDataSources().getAllInstanceDataSourceNames();            int routeUnitsSize = routeContext.getRouteResult().getRouteUnits().size();            /*             * 1. 沒有讀寫分離的情況下  跳過sharding路由會導致routeUnitsSize為0 此時需要判斷數(shù)據(jù)源數(shù)量是否為1             * 2. 讀寫分離情況下 只會路由至具體的主庫或從庫 routeUnitsSize數(shù)量應該為1             */            if(!(routeUnitsSize == 0 && allInstanceDataSourceNames.size()==1)|| routeUnitsSize>1){                throw new ShardingSphereException("可以跳過sharding,但是路由結果不唯一,SQL= %s ,routeUnits= %s ",sql, routeContext.getRouteResult().getRouteUnits());            }            Collection<String> actualDataSourceNames = routeContext.getRouteResult().getActualDataSourceNames();            // 手動創(chuàng)建執(zhí)行單元            String datasourceName = CollectionUtils.isEmpty(actualDataSourceNames)? allInstanceDataSourceNames.iterator().next():actualDataSourceNames.iterator().next();            ExecutionUnit executionUnit = new ExecutionUnit(datasourceName, new SQLUnit(sql, clonedParameters));            result.getExecutionUnits().add(executionUnit);            //標記該結果需要跳過            result.setSkipShardingScenarioFlag(true);        }    }else {        result.getExecutionUnits().addAll(executeRewrite(sql, clonedParameters, routeContext));    }    if (properties.<Boolean>getValue(ConfigurationPropertyKey.SQL_SHOW)) {        SQLLogger.logSQL(sql, properties.<Boolean>getValue(ConfigurationPropertyKey.SQL_SIMPLE), result.getSqlStatementContext(), result.getExecutionUnits());    }    return result;}

(4)跳過合并:跳過查詢結果的合并和影響行數(shù)計算的合并,注意ShardingPreparedStatement和ShardingStatement都需要跳過2vK28資訊網(wǎng)——每日最新資訊28at.com

跳過合并2vK28資訊網(wǎng)——每日最新資訊28at.com

org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement#executeQuery    public ResultSet executeQuery() throws SQLException {        ResultSet result;        try {            clearPrevious();            prepare();            initPreparedStatementExecutor();            List<QueryResult> queryResults = preparedStatementExecutor.executeQuery();            List<ResultSet> resultSets = preparedStatementExecutor.getResultSets();        // 定制開發(fā),不分片跳過合并            if(executionContext.isSkipShardingScenarioFlag()){                return CollectionUtils.isNotEmpty(resultSets) ? resultSets.get(0) : null;            }            MergedResult mergedResult = mergeQuery(queryResults);            result = new ShardingResultSet(resultSets, mergedResult, this, executionContext);        } finally {            clearBatch();        }        currentResultSet = result;        return result;    }org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement#getResultSet    public ResultSet getResultSet() throws SQLException {        if (null != currentResultSet) {            return currentResultSet;        }        List<ResultSet> resultSets = getResultSets();        // 定制開發(fā),不分片跳過合并        if(executionContext.isSkipShardingScenarioFlag()){            return CollectionUtils.isNotEmpty(resultSets) ? resultSets.get(0) : null;        }         if (executionContext.getSqlStatementContext() instanceof SelectStatementContext || executionContext.getSqlStatementContext().getSqlStatement() instanceof DALStatement) {            MergedResult mergedResult = mergeQuery(getQueryResults(resultSets));            currentResultSet = new ShardingResultSet(resultSets, mergedResult, this, executionContext);        }        return currentResultSet;    }org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement#isAccumulate    public boolean isAccumulate() {        //定制開發(fā),不分片跳過計算        if(executionContext.isSkipShardingScenarioFlag()){            return false;        }        return !connection.getRuntimeContext().getRule().isAllBroadcastTables(executionContext.getSqlStatementContext().getTablesContext().getTableNames());    }

(5)清空RuleContextManager:查看一下Sharding-JDBC其他ThreadLocal的清空位置,對應的清空RuleContextManager就好。2vK28資訊網(wǎng)——每日最新資訊28at.com

清空ThreadLocal2vK28資訊網(wǎng)——每日最新資訊28at.com

org.apache.shardingsphere.shardingjdbc.jdbc.adapter.AbstractConnectionAdapter#closepublic final void close() throws SQLException {        closed = true;        MasterVisitedManager.clear();        TransactionTypeHolder.clear();        RuleContextManager.clear();        int connectionSize = cachedConnections.size();        try {            forceExecuteTemplateForClose.execute(cachedConnections.entries(), cachedConnections -> cachedConnections.getValue().close());        } finally {            cachedConnections.clear();            rootInvokeHook.finish(connectionSize);        }    }

舉個例子,比如Sharding-JDBC本身是不支持INSERT INTO tbl_name (col1, col2, …) SELECT col1, col2, … FROM tbl_name WHERE col3 = ?    這種語法的,會報空指針異常。2vK28資訊網(wǎng)——每日最新資訊28at.com

圖片2vK28資訊網(wǎng)——每日最新資訊28at.com

經(jīng)過我們上述改造驗證后,非分片表是可以跳過語法限制執(zhí)行如下的SQL的。2vK28資訊網(wǎng)——每日最新資訊28at.com

圖片2vK28資訊網(wǎng)——每日最新資訊28at.com

通過該功能的實現(xiàn),業(yè)務可以更關注與分片表的SQL改造,而無需擔心引入Sharding-JDBC造成所有SQL的驗證改造,大幅減少改造成本和風險。2vK28資訊網(wǎng)——每日最新資訊28at.com

4.2 強制路由主庫

Sharding-JDBC可以通過配置主從庫數(shù)據(jù)源方便的實現(xiàn)讀寫分離的功能,但使用讀寫分離就必須面對主從延遲和從庫失聯(lián)的痛點,針對這一問題,我們實現(xiàn)了強制路由主庫的動態(tài)配置,當主從延遲過大或從庫失聯(lián)時,通過修改配置來實現(xiàn)SQL語句強制走主庫的不停機路由切換。2vK28資訊網(wǎng)——每日最新資訊28at.com

后面會說明了配置的動態(tài)生效的實現(xiàn)方式,這里只說明強制路由主庫的實現(xiàn),我們直接使用前文的RuleContextManager即可,在主從路由引擎里判斷下是否開啟了強制主庫路由。2vK28資訊網(wǎng)——每日最新資訊28at.com

MasterSlaveRouteDecorator.decorate改造2vK28資訊網(wǎng)——每日最新資訊28at.com

org.apache.shardingsphere.masterslave.route.engine.MasterSlaveRouteDecorator#decorate    public RouteContext decorate(final RouteContext routeContext, final ShardingSphereMetaData metaData, final MasterSlaveRule masterSlaveRule, final ConfigurationProperties properties) {        /**         * 如果配置了強制主庫 MasterVisitedManager設置為true         * 后續(xù)isMasterRoute中會保證路由至主庫         */        if(properties.<Boolean>getValue(ConfigurationPropertyKey.MASTER_ROUTE_ONLY)){            MasterVisitedManager.setMasterVisited();        }        //...路由邏輯        return routeContext;    }

為了兼容之前跳過Sharding的功能,我們需要同步修改下isMasterRoute方法,如果是跳過了Sharding路由需要通過RuleContextManager來判斷是否走主庫。2vK28資訊網(wǎng)——每日最新資訊28at.com

isMasterRoute改造2vK28資訊網(wǎng)——每日最新資訊28at.com

org.apache.shardingsphere.masterslave.route.engine.impl.MasterSlaveDataSourceRouter#isMasterRoute    private boolean isMasterRoute(final SQLStatement sqlStatement) {        if(sqlStatement instanceof SkipShardingStatement){            // 優(yōu)先以MasterVisitedManager中的值為準            return MasterVisitedManager.isMasterVisited()|| RuleContextManager.isMasterRoute();        }        return containsLockSegment(sqlStatement) || !(sqlStatement instanceof SelectStatement) || MasterVisitedManager.isMasterVisited() || HintManager.isMasterRouteOnly();    }

當然,更理想的狀況是通過監(jiān)控主從同步延遲和數(shù)據(jù)庫撥測,當超過閾值時或從庫失聯(lián)時直接自動修改配置中心的庫,實現(xiàn)自動切換主庫,減少業(yè)務故障時間和運維壓力。2vK28資訊網(wǎng)——每日最新資訊28at.com

4.3 配置動態(tài)生效

Sharding-JDBC中的ConfigurationPropertyKey中提供了許多配置屬性,而Sharding-JDBCB并沒有為這些配置提供在線修改的方法,而在實際的應用場景中,像SQL_SHOW這樣控制SQL打印的開關配置,我們更希望能夠在線修改配置值來控制SQL日志的打印,而不是修改完配置再重啟服務。2vK28資訊網(wǎng)——每日最新資訊28at.com

以SQL打印為例,BasePrepareEngine中存在ConfigurationProperties對象,通過調(diào)用getValue方法來獲取SQL_SHOW的值。2vK28資訊網(wǎng)——每日最新資訊28at.com

SQL 打印2vK28資訊網(wǎng)——每日最新資訊28at.com

org.apache.shardingsphere.underlying.pluggble.prepare.BasePrepareEngine#prepare    /**     * Prepare to execute.     *     * @param sql SQL     * @param parameters SQL parameters     * @return execution context     */    public ExecutionContext prepare(final String sql, final List<Object> parameters) {        List<Object> clonedParameters = cloneParameters(parameters);        RouteContext routeContext = executeRoute(sql, clonedParameters);        ExecutionContext result = new ExecutionContext(routeContext.getSqlStatementContext());        result.getExecutionUnits().addAll(executeRewrite(sql, clonedParameters, routeContext));        //sql打印        if (properties.<Boolean>getValue(ConfigurationPropertyKey.SQL_SHOW)) {            SQLLogger.logSQL(sql, properties.<Boolean>getValue(ConfigurationPropertyKey.SQL_SIMPLE), result.getSqlStatementContext(), result.getExecutionUnits());        }        return result;    }

ConfigurationProperties繼承了抽象類TypedProperties,其getValue方法就是根據(jù)key獲取對應的配置值,因此我們直接在TypedProperties中實現(xiàn)刷新緩存中的配置值的方法。2vK28資訊網(wǎng)——每日最新資訊28at.com

TypedProperties刷新配置2vK28資訊網(wǎng)——每日最新資訊28at.com

public abstract class TypedProperties<E extends Enum & TypedPropertyKey> {         private static final String LINE_SEPARATOR = System.getProperty("line.separator");         @Getter    private final Properties props;         private final Map<E, TypedPropertyValue> cache;         public TypedProperties(final Class<E> keyClass, final Properties props) {        this.props = props;        cache = preload(keyClass);    }         private Map<E, TypedPropertyValue> preload(final Class<E> keyClass) {        E[] enumConstants = keyClass.getEnumConstants();        Map<E, TypedPropertyValue> result = new HashMap<>(enumConstants.length, 1);        Collection<String> errorMessages = new LinkedList<>();        for (E each : enumConstants) {            TypedPropertyValue value = null;            try {                value = new TypedPropertyValue(each, props.getOrDefault(each.getKey(), each.getDefaultValue()).toString());            } catch (final TypedPropertyValueException ex) {                errorMessages.add(ex.getMessage());            }            result.put(each, value);        }        if (!errorMessages.isEmpty()) {            throw new ShardingSphereConfigurationException(Joiner.on(LINE_SEPARATOR).join(errorMessages));        }        return result;    }         /**     * Get property value.     *     * @param key property key     * @param <T> class type of return value     * @return property value     */    @SuppressWarnings("unchecked")    public <T> T getValue(final E key) {        return (T) cache.get(key).getValue();    }     /**     * vivo定制改造方法 refresh property value.     * @param key property key     * @param value property value     * @return 更新配置是否成功     */    public boolean refreshValue(String key, String value){        //獲取配置類支持的配置項        E[] enumConstants = targetKeyClass.getEnumConstants();        for (E each : enumConstants) {            //遍歷新的值            if(each.getKey().equals(key)){                try {                    //空白value認為無效,取默認值                    if(!StringUtils.isBlank(value)){                        value = each.getDefaultValue();                    }                    //構造新屬性                    TypedPropertyValue typedPropertyValue = new TypedPropertyValue(each, value);                    //替換緩存                    cache.put(each, typedPropertyValue);                    //原始屬性也替換下,有可能會通過RuntimeContext直接獲取Properties                    props.put(key,value);                    return true;                } catch (final TypedPropertyValueException ex) {                    log.error("refreshValue error. key={} , value={}", key, value, ex);                }            }        }        return false;    }}

實現(xiàn)了刷新方法后,我們還需要將該方法一步步暴露至一個外部可以調(diào)用的類中,以便在服務監(jiān)聽配置的方法中,能夠調(diào)用這個刷新方法。ConfigurationProperties直接在asePrepareEngine的構造函數(shù)中傳入,我們通過構造函數(shù)逐步反推最外層的這一對象調(diào)用來源,最終可以定位到在AbstractDataSourceAdapter中的getRuntimeContext()方法中可以獲取到這個配置,而這個就是Sharding-JDBC實現(xiàn)的JDBC中Datasource接口的抽象類,我們直接在這個類中調(diào)用剛剛實現(xiàn)的refreshValue方法,剩下的就是監(jiān)聽配置,通過自己實現(xiàn)的AbstractDataSourceAdapter來調(diào)用這個方法就好了。2vK28資訊網(wǎng)——每日最新資訊28at.com

圖片2vK28資訊網(wǎng)——每日最新資訊28at.com

通過這一功能,我們可以方便的控制一些開關屬性的在線修改,如SQL打印、強制路由主庫等,業(yè)務無需重啟服務即可做到配置的動態(tài)生效。2vK28資訊網(wǎng)——每日最新資訊28at.com

4.4 批量update語法支持

業(yè)務中存在使用foreach標簽來批量update的語句,這種SQL在Sharding-JDBC中無法被正確路由,只會路由第一組參數(shù),后面的無法被路由改寫,原因是解析引擎無法將語句拆分解析。2vK28資訊網(wǎng)——每日最新資訊28at.com

批量update樣例2vK28資訊網(wǎng)——每日最新資訊28at.com

<update id="batchUpdate">        <foreach collectinotallow="orderList" item="item">               update t_order set               status = 1,               updated_by = #{item.updatedBy}               WHERE created_by = #{item.createdBy};        </foreach>    </update>

圖片2vK28資訊網(wǎng)——每日最新資訊28at.com

圖片2vK28資訊網(wǎng)——每日最新資訊28at.com

我們通過將批量update按照;拆分為多個語句,然后分別路由,最后手動匯總路有結果生成執(zhí)行單元。2vK28資訊網(wǎng)——每日最新資訊28at.com

為了能正確重寫SQL,批量update拆分后的語句需要完全一樣,這樣就不能使用動態(tài)拼接set條件,而是使用ifnull語法或者字段值不發(fā)生變化時也將原來的值放入set中,只不過set前后的值保持一致,整體思路與實現(xiàn)如下。2vK28資訊網(wǎng)——每日最新資訊28at.com

圖片2vK28資訊網(wǎng)——每日最新資訊28at.com

prepareBatch實現(xiàn)2vK28資訊網(wǎng)——每日最新資訊28at.com

org.apache.shardingsphere.underlying.pluggble.prepare.BasePrepareEngine#prepareBatch   private ExecutionContext prepareBatch(List<String> splitSqlList, final List<Object> allParameters) {       //SQL去重       List<String> sqlList = splitSqlList.stream().distinct().collect(Collectors.toList());       if (sqlList.size() > 1) {           throw new ShardingSphereException("不支持多條SQL,請檢查SQL," + sqlList.toString());       }       //以第一條SQL為標準       String sql = sqlList.get(0);       //所有的執(zhí)行單元       Collection<ExecutionUnit> globalExecutionUnitList = new ArrayList<>();       //初始化最后的執(zhí)行結果       ExecutionContext executionContextResult = null;       //根據(jù)所有參數(shù)數(shù)量和SQL語句數(shù)量 計算每組參數(shù)的數(shù)量       int eachSqlParameterCount = allParameters.size() / splitSqlList.size();       //平均分配每條SQL的參數(shù)       List<List<Object>> eachSqlParameterListList = Lists.partition(allParameters, eachSqlParameterCount);       for (List<Object> eachSqlParameterList : eachSqlParameterListList) {           //每條SQL參數(shù)不同 需要根據(jù)參數(shù)路由不同的結果  實際的SqlStatementContext 是一致的           RouteContext routeContext = executeRoute(sql, eachSqlParameterList);           //由于SQL一樣  實際的SqlStatementContext 是一致的 只需初始化一次           if (executionContextResult == null) {               executionContextResult = new ExecutionContext(routeContext.getSqlStatementContext());           }           globalExecutionUnitList.addAll(executeRewrite(sql, eachSqlParameterList, routeContext));       }       //排序打印日志       executionContextResult.getExtendMap().put(EXECUTION_UNIT_LIST, globalExecutionUnitList.stream().sorted(Comparator.comparing(ExecutionUnit::getDataSourceName)).collect(Collectors.toList()));       if (properties.<Boolean>getValue(ConfigurationPropertyKey.SQL_SHOW)) {           SQLLogger.logSQL(sql, properties.<Boolean>getValue(ConfigurationPropertyKey.SQL_SIMPLE),                   executionContextResult.getSqlStatementContext(), (Collection<ExecutionUnit>) executionContextResult.getExtendMap().get(EXECUTION_UNIT_LIST));       }       return executionContextResult;   }

這里我們在ExecutionContext單獨構造了一個了ExtendMap來存放ExecutionUnit,原因是ExecutionContext中的executionUnits是HashSet,而判斷ExecutionUnit中的SqlUnit只會根據(jù)SQL去重,批量update的SQL是一致的,但parameters不同,為了不影響原有的邏輯,單獨使用了另外的變量來存放。2vK28資訊網(wǎng)——每日最新資訊28at.com

ExecutionContext改造2vK28資訊網(wǎng)——每日最新資訊28at.com

@RequiredArgsConstructor@Getterpublic class ExecutionContext {     private final SQLStatementContext sqlStatementContext;     private final Collection<ExecutionUnit> executionUnits = new LinkedHashSet<>();     /**     * 自定義擴展變量     */    private final Map<ExtendEnum,Object> extendMap = new HashMap<>();     /**     * 定制擴展,是否可以跳過分片邏輯     */    @Setter    private boolean skipShardingScenarioFlag = false;} @RequiredArgsConstructor@Getter@EqualsAndHashCode@ToStringpublic final class ExecutionUnit {         private final String dataSourceName;         private final SQLUnit sqlUnit;} @AllArgsConstructor@RequiredArgsConstructor@Getter@Setter//根據(jù)SQL判斷是否相等@EqualsAndHashCode(of = { "sql" })@ToStringpublic final class SQLUnit {     private String sql;     private final List<Object> parameters; }

我們還需要改造下執(zhí)行方法,在初始化執(zhí)行器的時候,判斷下ExtendMap中存在我們自定義的EXECUTION_UNIT_LIST是否存在,存在則使用生成InputGroup,同一個數(shù)據(jù)源下的ExecutionUnit會被放入同一個InputGroup中。2vK28資訊網(wǎng)——每日最新資訊28at.com

InputGroup改造2vK28資訊網(wǎng)——每日最新資訊28at.com

org.apache.shardingsphere.shardingjdbc.executor.PreparedStatementExecutor#init    public void init(final ExecutionContext executionContext) throws SQLException {        setSqlStatementContext(executionContext.getSqlStatementContext());        //兼容批量update 分庫分表后同一張表的情況 判斷是否存在EXECUTION_UNIT_LIST 存在則使用未去重的List進行后續(xù)的操作        if (MapUtils.isNotEmpty(executionContext.getExtendMap())){            Collection<ExecutionUnit> executionUnitCollection = (Collection<ExecutionUnit>) executionContext.getExtendMap().get(EXECUTION_UNIT_LIST);            if(CollectionUtils.isNotEmpty(executionUnitCollection)){                getInputGroups().addAll(obtainExecuteGroups(executionUnitCollection));            }        }else {            getInputGroups().addAll(obtainExecuteGroups(executionContext.getExecutionUnits()));        }        cacheStatements();    }

改造完成后,批量update中的每條SQL都可以被正確路由執(zhí)行。2vK28資訊網(wǎng)——每日最新資訊28at.com

圖片2vK28資訊網(wǎng)——每日最新資訊28at.com

4.5 ShardingCondition去重

當where語句包括多個or條件時,而or條件不包含分片鍵時,會造成createShardingConditions方法生成重復的分片條件,導致重復調(diào)用doSharding方法。2vK28資訊網(wǎng)——每日最新資訊28at.com

SELECT * FROM t_order  WHERE created_by = ? and (   (status = ?) or  (status = ?) or  (status = ?) )這種SQL,存在三個or條件,分片鍵是created_by ,實際產(chǎn)生的shardingCondition會是三個一樣的值,并會調(diào)用三次doSharding的方法。雖然實際執(zhí)行還是只有一次(批量update那里說明過執(zhí)行單元會去重),但為了減少方法的重復調(diào)用,我們還是對這里做了一次去重。2vK28資訊網(wǎng)——每日最新資訊28at.com

圖片2vK28資訊網(wǎng)——每日最新資訊28at.com

圖片2vK28資訊網(wǎng)——每日最新資訊28at.com

去重的方法也比較簡單粗暴,我們對ListRouteValue和RangeRouteValue添加了@EqualsAndHashCode注解,然后在WhereClauseShardingConditionEngine的createShardingConditions方法返回最終結果前加一次去重,從而避免生成重復的shardingCondition造成doSharding方法的重復調(diào)用。2vK28資訊網(wǎng)——每日最新資訊28at.com

createShardingConditions去重2vK28資訊網(wǎng)——每日最新資訊28at.com

org.apache.shardingsphere.sharding.route.engine.condition.engine.WhereClauseShardingConditionEngine#createShardingConditions    private Collection<ShardingCondition> createShardingConditions(final SQLStatementContext sqlStatementContext, final Collection<AndPredicate> andPredicates, final List<Object> parameters) {        Collection<ShardingCondition> result = new LinkedList<>();        for (AndPredicate each : andPredicates) {            Map<Column, Collection<RouteValue>> routeValueMap = createRouteValueMap(sqlStatementContext, each, parameters);            if (routeValueMap.isEmpty()) {                return Collections.emptyList();            }            result.add(createShardingCondition(routeValueMap));        }        //去重        Collection<ShardingCondition> distinctResult = result.stream().distinct().collect(Collectors.toCollection(LinkedList::new));        return distinctResult;    }
4.6  全路由校驗

分片表的SQL中如果沒有攜帶分片鍵(或者帶上了分片鍵結果沒有被正確解析)將會導致全路由,產(chǎn)生性能問題,而這種SQL并不會報錯,這就導致在實際的業(yè)務改造中,開發(fā)和測試很難保證百分百改造徹底。為此,我們在源碼層面對這種情況做了額外的校驗,當產(chǎn)生全路由,也就是ShardingConditions為空時,主動拋出異常,從而方便開發(fā)和測試能夠快速發(fā)現(xiàn)全路由SQL。2vK28資訊網(wǎng)——每日最新資訊28at.com

實現(xiàn)方式也比較簡單,校驗下ShardingConditions是否為空即可,只不過需要額外兼容下Hint策略ShardingConditions始終為空的特殊情況。2vK28資訊網(wǎng)——每日最新資訊28at.com

全路由校驗2vK28資訊網(wǎng)——每日最新資訊28at.com

org.apache.shardingsphere.sharding.route.engine.ShardingRouteDecorator#decoratepublic RouteContext decorate(final RouteContext routeContext, final ShardingSphereMetaData metaData, final ShardingRule shardingRule, final ConfigurationProperties properties) {        //省略...        //獲取 ShardingConditions        ShardingConditions shardingConditions = getShardingConditions(parameters, sqlStatementContext, metaData.getSchema(), shardingRule);        boolean hintAlgorithm = isHintAlgorithm(sqlStatementContext, shardingRule);        //判斷是否允許全路由        if (!properties.<Boolean>getValue(ConfigurationPropertyKey.ALLOW_EMPTY_SHARDING_CONDITIONS)) {            //如果不是Hint算法            if(!isHintAlgorithm(sqlStatementContext, shardingRule)){                /** 如果是DML語句  則可能有兩種情況 這兩種情況是根據(jù)getShardingConditions方法的內(nèi)部邏輯而來的                 *  一種是非插入語句  shardingConditions.getConditions()為空即可                 *  一種是插入語句 插入語句shardingConditions.getConditions()不會為空  但是ShardingCondition的routeValues是空的                 */                if (sqlStatementContext.getSqlStatement() instanceof DMLStatement) {                    if(shardingConditions.getConditions().isEmpty()) {                        throw new ShardingSphereException("SQL不包含分庫分表鍵,請檢查SQL");                    }else {                        if (sqlStatementContext instanceof InsertStatementContext) {                            List<ShardingCondition> routeValuesNotEmpty = shardingConditions.getConditions().stream().filter(r -> CollectionUtils.isNotEmpty(r.getRouteValues())).collect(Collectors.toList());                            if(CollectionUtils.isEmpty(routeValuesNotEmpty)){                                throw new ShardingSphereException("SQL不包含分庫分表鍵,請檢查SQL");                            }                        }                    }                }            }        }        boolean needMergeShardingValues = isNeedMergeShardingValues(sqlStatementContext, shardingRule);        //省略...        return new RouteContext(sqlStatementContext, parameters, routeResult);    } private boolean isHintAlgorithm(final SQLStatementContext sqlStatementContext, final ShardingRule shardingRule) {        // 場景a 全局默認策略是否使用強制路由策略        if(shardingRule.getDefaultDatabaseShardingStrategy() instanceof HintShardingStrategy                || shardingRule.getDefaultTableShardingStrategy() instanceof HintShardingStrategy){            return true;        }        for (String each : sqlStatementContext.getTablesContext().getTableNames()) {            Optional<TableRule> tableRule = shardingRule.findTableRule(each);            //場景b 指定表是否使用強制路由策略            if (tableRule.isPresent() && (shardingRule.getDatabaseShardingStrategy(tableRule.get()) instanceof HintShardingStrategy                    || shardingRule.getTableShardingStrategy(tableRule.get()) instanceof HintShardingStrategy)) {                return true;            }        }        return false;    }

當然這塊功能也可以在完善些,比如對分片路由結果中的數(shù)據(jù)源數(shù)量進行校驗,從而避免跨庫操作,我們這邊沒有實現(xiàn)也就不再贅述了。2vK28資訊網(wǎng)——每日最新資訊28at.com

4.7 組件封裝

業(yè)務接入Sharding-JDBC的步驟是一樣的,都需要通過Java創(chuàng)建數(shù)據(jù)源和配置對象或者使用SpringBoot進行配置,存在一定的熟悉成本和重復開發(fā)的問題,為此我們也對定制開發(fā)版本的Sharding-JDBC封裝了一個公共組件,從而簡化業(yè)務配置,減少重復開發(fā),提升業(yè)務的開發(fā)效率,具體功能可見下。這塊沒有涉及源碼的改造,只是在定制版本上包裝的一個公共組件。2vK28資訊網(wǎng)——每日最新資訊28at.com

  • 提供了默認的數(shù)據(jù)源與連接池配置
  • 簡化分庫分表配置,業(yè)務配置邏輯表名和后綴,組件拼裝行表達式和actual-data-nodes
  • 封裝常用的分片算法(時間、業(yè)務字段值等),
  • 統(tǒng)一的配置監(jiān)聽與動態(tài)修改(SQL打印、強制主從切換等)

開源Sharding-JDBC配置2vK28資訊網(wǎng)——每日最新資訊28at.com

//數(shù)據(jù)源名稱spring.shardingsphere.datasource.names=ds0,ds1//ds0配置spring.shardingsphere.datasource.ds0.type=org.apache.commons.dbcp.BasicDataSourcespring.shardingsphere.datasource.ds0.driver-class-name=com.mysql.jdbc.Driverspring.shardingsphere.datasource.ds0.url=jdbc:mysql://localhost:3306/ds0spring.shardingsphere.datasource.ds0.username=rootspring.shardingsphere.datasource.ds0.password=//ds1配置spring.shardingsphere.datasource.ds1.type=org.apache.commons.dbcp.BasicDataSourcespring.shardingsphere.datasource.ds1.driver-class-name=com.mysql.jdbc.Driverspring.shardingsphere.datasource.ds1.url=jdbc:mysql://localhost:3306/ds1spring.shardingsphere.datasource.ds1.username=rootspring.shardingsphere.datasource.ds1.password=//分表規(guī)則spring.shardingsphere.sharding.tables.t_order.actual-data-nodes=ds$->{0..1}.t_order$->{0..1}spring.shardingsphere.sharding.tables.t_order.table-strategy.inline.sharding-column=order_idspring.shardingsphere.sharding.tables.t_order.table-strategy.inline.algorithm-expressinotallow=t_order$->{order_id % 2}spring.shardingsphere.sharding.tables.t_order_item.actual-data-nodes=ds$->{0..1}.t_order_item$->{0..1}spring.shardingsphere.sharding.tables.t_order_item.table-strategy.inline.sharding-column=order_idspring.shardingsphere.sharding.tables.t_order_item.table-strategy.inline.algorithm-expressinotallow=t_order_item$->{order_id % 2}//默認分庫規(guī)則spring.shardingsphere.sharding.default-database-strategy.inline.sharding-column=user_idspring.shardingsphere.sharding.default-database-strategy.inline.algorithm-expressinotallow=ds$->{user_id % 2}

組件簡化配置2vK28資訊網(wǎng)——每日最新資訊28at.com

//數(shù)據(jù)源名稱vivo.it.sharding.datasource.names = ds0,ds1//ds0配置vivo.it.sharding.datasource.ds0.url = jdbc:mysql://localhost:3306/ds1vivo.it.sharding.datasource.ds0.username = rootvivo.it.sharding.datasource.ds0.password =//ds1配置vivo.it.sharding.datasource.ds1.url = jdbc:mysql://localhost:3306/ds1vivo.it.sharding.datasource.ds1.username = rootvivo.it.sharding.datasource.ds1.password =//分表規(guī)則vivo.it.sharding.table.rule.config = [{"logicTable":"t_order,t_order_item","tableRange":"0..1","shardingColumn":"order_id ","algorithmExpression":"order_id %2"}]//默認分庫規(guī)則vivo.it.sharding.default.db.rule.config = {"shardingColumn":"user_id","algorithmExpression":"user_id %2"}

五、使用建議

結合官方文檔和業(yè)務實踐經(jīng)驗,我們也梳理了部分使用Sharding-JDBC的建議供大家參考,實際具體如何優(yōu)化SQL寫法(比如子查詢、分頁、分組排序等)還需要結合業(yè)務的實際場景來進行測試和調(diào)優(yōu)。2vK28資訊網(wǎng)——每日最新資訊28at.com

(1)強制等級2vK28資訊網(wǎng)——每日最新資訊28at.com

建議①:涉及分片表的SQL必須攜帶分片鍵2vK28資訊網(wǎng)——每日最新資訊28at.com

原因:無分片鍵會導致全路由,存在嚴重的性能隱患2vK28資訊網(wǎng)——每日最新資訊28at.com

建議②:禁止一條SQL中的分片值路由至不同的庫2vK28資訊網(wǎng)——每日最新資訊28at.com

原因:跨庫操作存在嚴重的性能隱患,事務操作會升級為分布式事務,增加業(yè)務復雜度2vK28資訊網(wǎng)——每日最新資訊28at.com

建議③:禁止對分片鍵使用運算表達式或函數(shù)操作2vK28資訊網(wǎng)——每日最新資訊28at.com

原因:無法提前計算表達式和函數(shù)獲取分片值,導致全路由2vK28資訊網(wǎng)——每日最新資訊28at.com

說明:詳見官方文檔2vK28資訊網(wǎng)——每日最新資訊28at.com

圖片2vK28資訊網(wǎng)——每日最新資訊28at.com

建議④:禁止在子查詢中使用分片表2vK28資訊網(wǎng)——每日最新資訊28at.com

原因:無法正常解析子查詢中的分片表,導致業(yè)務錯誤2vK28資訊網(wǎng)——每日最新資訊28at.com

說明:雖然官方文檔中說有限支持子查詢 ,但在實際的使用中發(fā)現(xiàn)4.1.1并不支持子查詢,可見官方issue6164 | issue 62282vK28資訊網(wǎng)——每日最新資訊28at.com

圖片2vK28資訊網(wǎng)——每日最新資訊28at.com

建議⑤:包含CASE WHEN、HAVING、UNION (ALL)語法的分片SQL,不支持路由至多數(shù)據(jù)節(jié)點2vK28資訊網(wǎng)——每日最新資訊28at.com

說明:詳見官方文檔2vK28資訊網(wǎng)——每日最新資訊28at.com

(2)建議等級2vK28資訊網(wǎng)——每日最新資訊28at.com

① 建議使用分布式id來保證分片表主鍵的全局唯一性2vK28資訊網(wǎng)——每日最新資訊28at.com

原因:方便判斷數(shù)據(jù)的唯一性和后續(xù)的遷移擴容2vK28資訊網(wǎng)——每日最新資訊28at.com

說明:詳見文章《vivo 自研魯班分布式 ID 服務實踐》2vK28資訊網(wǎng)——每日最新資訊28at.com

② 建議跨多表的分組SQL的分組字段與排序字段保證一致2vK28資訊網(wǎng)——每日最新資訊28at.com

原因:分組和排序字段不一致只能通過內(nèi)存合并,大數(shù)據(jù)量時存在性能隱患2vK28資訊網(wǎng)——每日最新資訊28at.com

說明:詳見官方文檔2vK28資訊網(wǎng)——每日最新資訊28at.com

③ 建議通過全局遞增的分布式id來優(yōu)化分頁查詢2vK28資訊網(wǎng)——每日最新資訊28at.com

原因:Sharding-JDBC的分頁優(yōu)化側重于結果集的流式合并來避免內(nèi)存爆漲,但深度分頁自身的性能問題并不能解決2vK28資訊網(wǎng)——每日最新資訊28at.com

說明:詳見官方文檔2vK28資訊網(wǎng)——每日最新資訊28at.com

六、總結

本文結合個人理解梳理了各個引擎的源碼入口和關鍵邏輯,讀者可以結合本文和官方文檔更好的定位理解Sharding-JDBC的源碼實現(xiàn)。定制開發(fā)的目的是為了降低業(yè)務接入成本,盡可能減少業(yè)務存量SQL的改造,部分改造思想其實與官方社區(qū)也存在差異,比如跳過語法解析,官方社區(qū)致力于通過優(yōu)化解析引擎來適配各種語法,而不是跳過解析階段,可參考官方issue。源碼分析和定制改造只涉及了Sharding-JDBC的數(shù)據(jù)分片和讀寫分離功能,定制開發(fā)的功能也在生產(chǎn)環(huán)境經(jīng)過了考驗,如有不足和優(yōu)化建議,也歡迎大家批評指正。 2vK28資訊網(wǎng)——每日最新資訊28at.com

本文鏈接:http://www.www897cc.com/showinfo-26-76541-0.htmlSharding-JDBC源碼解析與vivo的定制開發(fā)

聲明:本網(wǎng)頁內(nèi)容旨在傳播知識,若有侵權等問題請及時與本網(wǎng)聯(lián)系,我們將在第一時間刪除處理。郵件:2376512515@qq.com

上一篇: Go 錯誤處理:用 select-case 來解決這個歷史難題?

下一篇: 用 Python 優(yōu)雅地玩轉 Elasticsearch:實用技巧與最佳實踐

標簽:
  • 熱門焦點
Top 主站蜘蛛池模板: 崇义县| 晴隆县| 宿州市| 呼玛县| 大厂| 自贡市| 乌拉特后旗| 前郭尔| 宁陵县| 疏勒县| 海盐县| 安宁市| 五峰| 东城区| 武城县| 夏河县| 鲜城| 东丰县| 巴南区| 垫江县| 滁州市| 政和县| 固阳县| 肥东县| 巢湖市| 隆化县| 当雄县| 高雄县| 德庆县| 鄂托克前旗| 伊春市| 临泽县| 饶平县| 逊克县| 比如县| 秀山| 莱阳市| 八宿县| 武清区| 郁南县| 安岳县|