本文源碼基于Sharding-JDBC 4.1.1版本。
隨著業(yè)務并發(fā)請求和數(shù)據(jù)規(guī)模的不斷擴大,單節(jié)點庫表壓力往往會成為系統(tǒng)的性能瓶頸。公司IT內(nèi)部營銷庫存、交易訂單、財經(jīng)臺賬、考勤記錄等多領域的業(yè)務場景的日增數(shù)據(jù)量巨大,存在著數(shù)據(jù)庫節(jié)點壓力過大、連接過多、查詢速度變慢等情況,根據(jù)數(shù)據(jù)來源、時間、工號等信息來將沒有聯(lián)系的數(shù)據(jù)盡量均分到不同的庫表中,從而在不影響業(yè)務需求的前提下,減輕數(shù)據(jù)庫節(jié)點壓力,提升查詢效率和系統(tǒng)穩(wěn)定性。
我們對比了幾款比較常見的支持分庫分表和讀寫分離的中間件。
Sharding-JDBC作為輕量化的增強版的JDBC框架,相較其他中間件性能更好,接入難度更低,其數(shù)據(jù)分片、讀寫分離功能也覆蓋了我們的業(yè)務訴求,因此我們在業(yè)務中廣泛使用了Sharding-JDBC。但在使用Sharding-JDBC的過程中,我們也發(fā)現(xiàn)了諸多問題,為了業(yè)務更便捷的使用Sharding-JDBC,我們對源碼做了針對性的定制開發(fā)和組件封裝來滿足業(yè)務需求。
Sharding-JDBC作為基于JDBC的數(shù)據(jù)庫中間件,實現(xiàn)了JDBC的標準api,Sharding-JDBC與原生JDBC的執(zhí)行對比流程如下圖所示:
相關執(zhí)行流程的代碼樣例如下:
JDBC執(zhí)行樣例
//獲取數(shù)據(jù)庫連接try (Connection conn = DriverManager.getConnection("mysqlUrl", "userName", "password")) { String sql = "SELECT * FROM t_user WHERE name = ?"; //預編譯SQL try (PreparedStatement preparedStatement = conn.prepareStatement(sql)) { //參數(shù)設置與執(zhí)行 preparedStatement.setString(1, "vivo"); preparedStatement.execute(sql); //獲取結果集 try (ResultSet resultSet = preparedStatement.getResultSet()) { while (resultSet.next()) { //處理結果 } } }}
Sharding-JDBC 源碼
org.apache.shardingsphere.shardingjdbc.jdbc.core.statement#execute public boolean execute() throws SQLException { try { clearPrevious(); //解析+路由+重寫 內(nèi)部調(diào)用BasePrepareEngine#prepare方法 prepare(); initPreparedStatementExecutor(); //執(zhí)行 return preparedStatementExecutor.execute(); } finally { clearBatch(); } } org.apache.shardingsphere.underlying.pluggble.prepare.BasePrepareEngine#prepare public ExecutionContext prepare(final String sql, final List<Object> parameters) { List<Object> clonedParameters = cloneParameters(parameters); //解析+路由(executeRoute內(nèi)部先進行解析再執(zhí)行路由) RouteContext routeContext = executeRoute(sql, clonedParameters); ExecutionContext result = new ExecutionContext(routeContext.getSqlStatementContext()); //重寫 result.getExecutionUnits().addAll(executeRewrite(sql, clonedParameters, routeContext)); if (properties.<Boolean>getValue(ConfigurationPropertyKey.SQL_SHOW)) { SQLLogger.logSQL(sql, properties.<Boolean>getValue(ConfigurationPropertyKey.SQL_SIMPLE), result.getSqlStatementContext(), result.getExecutionUnits()); } return result; } org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement#getResultSet public ResultSet getResultSet() throws SQLException { if (null != currentResultSet) { return currentResultSet; } if (executionContext.getSqlStatementContext() instanceof SelectStatementContext || executionContext.getSqlStatementContext().getSqlStatement() instanceof DALStatement) { List<ResultSet> resultSets = getResultSets(); //歸并結果集 MergedResult mergedResult = mergeQuery(getQueryResults(resultSets)); currentResultSet = new ShardingResultSet(resultSets, mergedResult, this, executionContext); } return currentResultSet; }
從對比的執(zhí)行流程圖可見:
//*相關引擎的源碼解析在下文會作更深入的闡述。
3.2.1 引擎解析
解析引擎是Sharding-JDBC進行分庫分表邏輯的基礎,其作用是將SQL拆解為不可再分的原子符號(稱為token),再根據(jù)數(shù)據(jù)庫類型將這些token分類成關鍵字、表達式、操作符、字面量等不同類型,進而生成抽象語法樹,而語法樹是后續(xù)進行路由、改寫操作的前提(這也正是語法樹的存在使得Sharding-JDBC存在各式各樣的語法限制的原因之一)。
▲圖片來源:ShardingSphere 官方文檔
4.x的版本采用ANTLR(ANother Tool for Language Recognition)作為解析引擎,在ShardingSphere-sql-parser-dialect模塊中定義了適用于不同數(shù)據(jù)庫語法的解析規(guī)則(.g4文件),idea中也可以下載ANTLR v4的插件,輸入SQL查看解析后的語法樹結果。
解析方法的入口在DataNodeRouter的createRouteContext方法中,解析引擎根據(jù)數(shù)據(jù)庫類型和SQL創(chuàng)建SQLParserExecutor執(zhí)行得到解析樹,再通過ParseTreeVisitor()的visit方法,對解析樹進行處理得到SQLStatement。ANTLR支持listener和visitor兩種模式的接口,visitor方式可以更靈活的控制解析樹的遍歷過程,更適用于SQL解析的場景。
解析引擎核心代碼:
org.apache.shardingsphere.underlying.route.DataNodeRouter#createRouteContext#96 private RouteContext createRouteContext(final String sql, final List<Object> parameters, final boolean useCache) { //解析引擎解析SQL SQLStatement sqlStatement = parserEngine.parse(sql, useCache); try { SQLStatementContext sqlStatementContext = SQLStatementContextFactory.newInstance(metaData.getSchema(), sql, parameters, sqlStatement); return new RouteContext(sqlStatementContext, parameters, new RouteResult()); // TODO should pass parameters for master-slave } catch (final IndexOutOfBoundsException ex) { return new RouteContext(new CommonSQLStatementContext(sqlStatement), parameters, new RouteResult()); } } org.apache.shardingsphere.sql.parser.SQLParserEngine#parse0#72 private SQLStatement parse0(final String sql, final boolean useCache) { //緩存 if (useCache) { Optional<SQLStatement> cachedSQLStatement = cache.getSQLStatement(sql); if (cachedSQLStatement.isPresent()) { return cachedSQLStatement.get(); } } //根據(jù)數(shù)據(jù)庫類型和sql生成解析樹 ParseTree parseTree = new SQLParserExecutor(databaseTypeName, sql).execute().getRootNode(); //ParseTreeVisitor的visit方法對解析樹進行處理得到SQLStatement SQLStatement result = (SQLStatement) ParseTreeVisitorFactory.newInstance(databaseTypeName, VisitorRule.valueOf(parseTree.getClass())).visit(parseTree); if (useCache) { cache.put(sql, result); } return result; }
SQLStatement實際上是一個接口,其實現(xiàn)對應著不同的SQL類型,如SelectStatement 類中就包括查詢的字段、表名、where條件、分組、排序、分頁、lock等變量,可以看到這里并沒有對having這種字段做定義,相當于Sharding-JDBC無法識別到SQL中的having,這使得Sharding-JDBC對having語法有一定的限制。
SelectStatement
public final class SelectStatement extends DMLStatement { // 字段 private ProjectionsSegment projections; // 表 private final Collection<TableReferenceSegment> tableReferences = new LinkedList<>(); // where private WhereSegment where; // groupBy private GroupBySegment groupBy; // orderBy private OrderBySegment orderBy; // limit private LimitSegment limit; // 父statement private SelectStatement parentStatement; // lock private LockSegment lock;}
SQLStatement還會被進一步轉換成SQLStatementContext,如SelectStatement 會被轉換成SelectStatementContext ,其結構與SelectStatement 類似不再多說,值得注意的是雖然這里定義了containsSubquery來判斷是否包含子查詢,但4.1.1源碼永遠是返回的false,與having類似,這意味著Sharding-JDBC不會對子查詢語句做特殊處理。
SelectStatementContext
public final class SelectStatementContext extends CommonSQLStatementContext<SelectStatement> implements TableAvailable, WhereAvailable { private final TablesContext tablesContext; private final ProjectionsContext projectionsContext; private final GroupByContext groupByContext; private final OrderByContext orderByContext; private final PaginationContext paginationContext; private final boolean containsSubquery;} private boolean containsSubquery() { // FIXME process subquery// Collection<SubqueryPredicateSegment> subqueryPredicateSegments = getSqlStatement().findSQLSegments(SubqueryPredicateSegment.class);// for (SubqueryPredicateSegment each : subqueryPredicateSegments) {// if (!each.getAndPredicates().isEmpty()) {// return true;// }// } return false; }
3.2.2 引擎總結
解析引擎是進行路由改寫的前提基礎,其作用就是將SQL按照定義的語法規(guī)則拆分成原子符號(token),生成語法樹,根據(jù)不同的SQL類型生成對應的SQLStatement,SQLStatement由各自的Segment組成,所有的Segment都包含startIndex和endIndex來定位token在SQL中所屬的位置,但解析語法難以涵蓋所有的SQL場景,使得部分SQL無法按照預期的結果路由執(zhí)行。
3.3.1 引擎解析
路由引擎是Sharding-JDBC的核心步驟,作用是根據(jù)定義的分庫分表規(guī)則將解析引擎生成的SQL上下文生成對應的路由結果,RouteResult 包括DataNode和RouteUnit,DataNode是實際的數(shù)據(jù)源節(jié)點,包括數(shù)據(jù)源名稱和實際的物理表名,RouteUnit則記錄了邏輯表/庫與物理表/庫的映射關系,后面的改寫引擎也是根據(jù)這個映射關系來決定如何替換SQL中的邏輯表(實際上RouteResult 就是維護了一條SQL需要往哪些庫哪些表執(zhí)行的關系)。
RouteResult
public final class RouteResult { private final Collection<Collection<DataNode>> originalDataNodes = new LinkedList<>(); private final Collection<RouteUnit> routeUnits = new LinkedHashSet<>();} public final class DataNode { private static final String DELIMITER = "."; private final String dataSourceName; private final String tableName;} public final class RouteUnit { private final RouteMapper dataSourceMapper; private final Collection<RouteMapper> tableMappers;} public final class RouteMapper { private final String logicName; private final String actualName;}
其中,路由有分為分片路由和主從路由,兩者可以單獨使用,也可以組合使用。
分片路由
ShardingRouteDecorator的decorate方法是路由引擎的核心邏輯,經(jīng)過SQL校驗->生成分片條件->合并分片值后得到路由結果。
分片路由decorate方法
org.apache.shardingsphere.sharding.route.engine.ShardingRouteDecorator#decorate#57public RouteContext decorate(final RouteContext routeContext, final ShardingSphereMetaData metaData, final ShardingRule shardingRule, final ConfigurationProperties properties) { SQLStatementContext sqlStatementContext = routeContext.getSqlStatementContext(); List<Object> parameters = routeContext.getParameters(); //SQL校驗 校驗INSERT INTO .... ON DUPLICATE KEY UPDATE 和UPDATE語句中是否存在分片鍵 ShardingStatementValidatorFactory.newInstance( sqlStatementContext.getSqlStatement()).ifPresent(validator -> validator.validate(shardingRule, sqlStatementContext.getSqlStatement(), parameters)); //生成分片條件 ShardingConditions shardingConditions = getShardingConditions(parameters, sqlStatementContext, metaData.getSchema(), shardingRule); //合并分片值 boolean needMergeShardingValues = isNeedMergeShardingValues(sqlStatementContext, shardingRule); if (sqlStatementContext.getSqlStatement() instanceof DMLStatement && needMergeShardingValues) { checkSubqueryShardingValues(sqlStatementContext, shardingRule, shardingConditions); mergeShardingConditions(shardingConditions); } ShardingRouteEngine shardingRouteEngine = ShardingRouteEngineFactory.newInstance(shardingRule, metaData, sqlStatementContext, shardingConditions, properties); //得到路由結果 RouteResult routeResult = shardingRouteEngine.route(shardingRule); if (needMergeShardingValues) { Preconditions.checkState(1 == routeResult.getRouteUnits().size(), "Must have one sharding with subquery."); } return new RouteContext(sqlStatementContext, parameters, routeResult); }
ShardingStatementValidator有ShardingInsertStatementValidator和ShardingUpdateStatementValidator兩種實現(xiàn),INSERT INTO .... ON DUPLICATE KEY UPDATE和UPDATE語法都會涉及到字段值的更新,Sharding-JDBC是不允許更新分片值的,畢竟修改分片值還需要將數(shù)據(jù)遷移至新分片值對應的庫表中,才能保證數(shù)據(jù)分片規(guī)則一致。兩者的校驗細節(jié)也有所不同:
ShardingCondition中只有一個變量routeValues,RouteValue是一個接口,有ListRouteValue和RangeRouteValue兩種實現(xiàn),前者記錄了分片鍵的in或=條件的分片值,后者則記錄了范圍查詢的分片值,兩者被封裝為ShardingValue對象后,將會透傳至分片算法中計算得到分片結果集。
ShardingCondition
public final class ShardingConditions { private final List<ShardingCondition> conditions;} public class ShardingCondition { private final List<RouteValue> routeValues = new LinkedList<>();} public final class ListRouteValue<T extends Comparable<?>> implements RouteValue { private final String columnName; private final String tableName; //in或=條件對應的值 private final Collection<T> values; @Override public String toString() { return tableName + "." + columnName + (1 == values.size() ? " = " + new ArrayList<>(values).get(0) : " in (" + Joiner.on(",").join(values) + ")"); }} public final class RangeRouteValue<T extends Comparable<?>> implements RouteValue { private final String columnName; private final String tableName; //between and 大于小于等范圍值的上下限 private final Range<T> valueRange;}
生成分片條件后還會合并分片條件,但是前文提過在SelectStatementContext中的containsSubquery永遠是false,所以這段邏輯永遠返回false,即不會合并分片條件。
判斷是否需要合并分片條件
org.apache.shardingsphere.sharding.route.engine.ShardingRouteDecorator#isNeedMergeShardingValues#87private boolean isNeedMergeShardingValues(final SQLStatementContext sqlStatementContext, final ShardingRule shardingRule) { return sqlStatementContext instanceof SelectStatementContext && ((SelectStatementContext) sqlStatementContext).isContainsSubquery() && !shardingRule.getShardingLogicTableNames(sqlStatementContext.getTablesContext().getTableNames()).isEmpty(); }
然后就是通過分片路由引擎調(diào)用分片算法計算路由結果了,ShardingRouteEngine實現(xiàn)較多,介紹起來篇幅較多,這里就不展開說明了,可以參考官方文檔來了解路由引擎的選擇規(guī)則。
▲圖片來源:ShardingSphere 官方文檔
Sharding-JDBC定義了多種分片策略和算法接口,主要的分配策略與算法說明如下表所示:
補充兩個細節(jié):
(1)當ALLOW_RANGE_QUERY_WITH
_INLINE_SHARDING配置設置true時,InlineShardingStrategy支持范圍查詢,但是并不是根據(jù)分片值計算范圍,而是直接全路由至配置的數(shù)據(jù)節(jié)點,會存在性能隱患。
InlineShardingStrategy.doSharding
org.apache.shardingsphere.core.strategy.route.inline.InlineShardingStrategy#doSharding public Collection<String> doSharding(final Collection<String> availableTargetNames, final Collection<RouteValue> shardingValues, final ConfigurationProperties properties) { RouteValue shardingValue = shardingValues.iterator().next(); //ALLOW_RANGE_QUERY_WITH_INLINE_SHARDING設置為true,直接返回availableTargetNames,而不是根據(jù)RangeRouteValue計算 if (properties.<Boolean>getValue(ConfigurationPropertyKey.ALLOW_RANGE_QUERY_WITH_INLINE_SHARDING) && shardingValue instanceof RangeRouteValue) { return availableTargetNames; } Preconditions.checkState(shardingValue instanceof ListRouteValue, "Inline strategy cannot support this type sharding:" + shardingValue.toString()); Collection<String> shardingResult = doSharding((ListRouteValue) shardingValue); Collection<String> result = new TreeSet<>(String.CASE_INSENSITIVE_ORDER); for (String each : shardingResult) { if (availableTargetNames.contains(each)) { result.add(each); } } return result; }
(2)4.1.1的官方文檔雖然說Hint可以跳過解析和改寫,但在我們上面解析引擎的源碼解析中,我們并沒有看到有對Hint策略的額外跳過。事實上,即使使用了Hint分片SQL也同樣需要解析重寫,也同樣受Sharding-JDBC的語法限制,這在官方的issue中也曾經(jīng)被提及。
▲圖片來源:ShardingSphere 官方文檔
主從路由
主從路由的核心邏輯就是通過MasterSlaveDataSourceRouter的route方法進行判定SQL走主庫還是從庫。主從情況下,配置的數(shù)據(jù)源實際是一組主從,而不是單個的實例,所以需要通過masterSlaveRule獲取到具體的主庫或者從庫名字。
主從路由decorate
org.apache.shardingsphere.masterslave.route.engine.MasterSlaveRouteDecorator#decorate public RouteContext decorate(final RouteContext routeContext, final ShardingSphereMetaData metaData, final MasterSlaveRule masterSlaveRule, final ConfigurationProperties properties) { //為空證明沒有經(jīng)過分片路由 if (routeContext.getRouteResult().getRouteUnits().isEmpty()) { //根據(jù)SQL判斷選擇走主庫還是從庫 String dataSourceName = new MasterSlaveDataSourceRouter(masterSlaveRule).route(routeContext.getSqlStatementContext().getSqlStatement()); RouteResult routeResult = new RouteResult(); //根據(jù)具體的主庫/從庫名創(chuàng)建路由單元 routeResult.getRouteUnits().add(new RouteUnit(new RouteMapper(dataSourceName, dataSourceName), Collections.emptyList())); return new RouteContext(routeContext.getSqlStatementContext(), Collections.emptyList(), routeResult); } Collection<RouteUnit> toBeRemoved = new LinkedList<>(); Collection<RouteUnit> toBeAdded = new LinkedList<>(); //不為空證明已經(jīng)被分片路由處理了 for (RouteUnit each : routeContext.getRouteResult().getRouteUnits()) { if (masterSlaveRule.getName().equalsIgnoreCase(each.getDataSourceMapper().getActualName())) { //先標記移除 因為這里是一組主從的名字而不是實際的庫 toBeRemoved.add(each); //根據(jù)SQL判斷選擇走主庫還是從庫 String actualDataSourceName = new MasterSlaveDataSourceRouter(masterSlaveRule).route(routeContext.getSqlStatementContext().getSqlStatement()); //根據(jù)具體的主庫/從庫名創(chuàng)建路由單元 toBeAdded.add(new RouteUnit(new RouteMapper(each.getDataSourceMapper().getLogicName(), actualDataSourceName), each.getTableMappers())); } } routeContext.getRouteResult().getRouteUnits().removeAll(toBeRemoved); routeContext.getRouteResult().getRouteUnits().addAll(toBeAdded); return routeContext; }
MasterSlaveDataSourceRouter中isMasterRoute方法會判斷SQL是否需要走主庫,當出現(xiàn)以下情況時走主庫:
不走主庫則通過負載算法選擇從庫,Sharding-JDBC提供了輪詢和隨機兩種算法。
MasterSlaveDataSourceRouter
public final class MasterSlaveDataSourceRouter { private final MasterSlaveRule masterSlaveRule; /** * Route. * * @param sqlStatement SQL statement * @return data source name */ public String route(final SQLStatement sqlStatement) { if (isMasterRoute(sqlStatement)) { MasterVisitedManager.setMasterVisited(); return masterSlaveRule.getMasterDataSourceName(); } return masterSlaveRule.getLoadBalanceAlgorithm().getDataSource( masterSlaveRule.getName(), masterSlaveRule.getMasterDataSourceName(), new ArrayList<>(masterSlaveRule.getSlaveDataSourceNames())); } private boolean isMasterRoute(final SQLStatement sqlStatement) { return containsLockSegment(sqlStatement) || !(sqlStatement instanceof SelectStatement) || MasterVisitedManager.isMasterVisited() || HintManager.isMasterRouteOnly(); } private boolean containsLockSegment(final SQLStatement sqlStatement) { return sqlStatement instanceof SelectStatement && ((SelectStatement) sqlStatement).getLock().isPresent(); }}
是否走主庫的信息存在MasterVisitedManager中,MasterVisitedManager是通過ThreadLocal實現(xiàn)的,但這種實現(xiàn)會有一個問題,當我們使用事務先查詢再更新/插入時,第一條查詢SQL并不會走主庫,而是走從庫,如果業(yè)務需要事務的第一條查詢也走主庫,事務查詢前需要手動調(diào)用一次MasterVisitedManager.setMasterVisited()。
MasterVisitedManager
public final class MasterVisitedManager { private static final ThreadLocal<Boolean> MASTER_VISITED = ThreadLocal.withInitial(() -> false); /** * Judge master data source visited in current thread. * * @return master data source visited or not in current thread */ public static boolean isMasterVisited() { return MASTER_VISITED.get(); } /** * Set master data source visited in current thread. */ public static void setMasterVisited() { MASTER_VISITED.set(true); } /** * Clear master data source visited. */ public static void clear() { MASTER_VISITED.remove(); }}
3.3.2 引擎總結
路由引擎的作用是將SQL根據(jù)參數(shù)通過實現(xiàn)的策略算法計算出實際該在哪些庫的哪些表執(zhí)行,也就是路由結果。路由引擎有兩種實現(xiàn),分別是分片路由和主從路由,兩者都提供了標準化的策略接口來讓業(yè)務實現(xiàn)自己的路由策略,分片路由需要注意自身SQL場景和策略算法相匹配,主從路由中同一線程且同一數(shù)據(jù)庫連接內(nèi),有寫入操作后,之后的讀操作會從主庫讀取,寫入操作前的讀操作不會走主庫。
3.4.1 引擎解析
經(jīng)過解析路由后雖然確定了執(zhí)行的實際庫表,但SQL中表名依舊是邏輯表,不能執(zhí)行,改寫引擎可以將邏輯表替換為物理表。同時,路由至多庫表的SQL也需要拆分為多條SQL執(zhí)行。
改寫的入口仍舊在BasePrepareEngine中,創(chuàng)建重寫上下文createSQLRewriteContext,再根據(jù)上下文進行改寫rewrite,最終返回執(zhí)行單元ExecutionUnit。
改寫邏輯入口
org.apache.shardingsphere.underlying.pluggble.prepare.BasePrepareEngine#executeRewrite private Collection<ExecutionUnit> executeRewrite(final String sql, final List<Object> parameters, final RouteContext routeContext) { //注冊重寫裝飾器 registerRewriteDecorator(); //創(chuàng)建 SQLRewriteContext SQLRewriteContext sqlRewriteContext = rewriter.createSQLRewriteContext(sql, parameters, routeContext.getSqlStatementContext(), routeContext); //重寫 return routeContext.getRouteResult().getRouteUnits().isEmpty() ? rewrite(sqlRewriteContext) : rewrite(routeContext, sqlRewriteContext); }
執(zhí)行單元包含了數(shù)據(jù)源名稱,改寫后的SQL,以及對應的參數(shù),SQL一樣的兩個SQLUnit會被視為相等。
ExecutionUnit
@RequiredArgsConstructor@Getter@EqualsAndHashCode@ToStringpublic final class ExecutionUnit { private final String dataSourceName; private final SQLUnit sqlUnit;} @AllArgsConstructor@RequiredArgsConstructor@Getter@Setter//根據(jù)sql判斷是否相等@EqualsAndHashCode(of = { "sql" })@ToStringpublic final class SQLUnit { private String sql; private final List<Object> parameters; }
createSQLRewriteContext完成了兩件事,一個是對SQL參數(shù)進行了重寫,一個是生成了SQLToken。
createSQLRewriteContext
org.apache.shardingsphere.underlying.rewrite.SQLRewriteEntry#createSQLRewriteContext public SQLRewriteContext createSQLRewriteContext(final String sql, final List<Object> parameters, final SQLStatementContext sqlStatementContext, final RouteContext routeContext) { SQLRewriteContext result = new SQLRewriteContext(schemaMetaData, sqlStatementContext, sql, parameters); //sql參數(shù)重寫 decorate(decorators, result, routeContext); //生成SQLToken result.generateSQLTokens(); return result; } org.apache.shardingsphere.sharding.rewrite.context.ShardingSQLRewriteContextDecorator#decorate public void decorate(final ShardingRule shardingRule, final ConfigurationProperties properties, final SQLRewriteContext sqlRewriteContext) { for (ParameterRewriter each : new ShardingParameterRewriterBuilder(shardingRule, routeContext).getParameterRewriters(sqlRewriteContext.getSchemaMetaData())) { if (!sqlRewriteContext.getParameters().isEmpty() && each.isNeedRewrite(sqlRewriteContext.getSqlStatementContext())) { //參數(shù)重寫 each.rewrite(sqlRewriteContext.getParameterBuilder(), sqlRewriteContext.getSqlStatementContext(), sqlRewriteContext.getParameters()); } } //sqlTokenGenerators sqlRewriteContext.addSQLTokenGenerators(new ShardingTokenGenerateBuilder(shardingRule, routeContext).getSQLTokenGenerators()); } org.apache.shardingsphere.underlying.rewrite.context.SQLRewriteContext#generateSQLTokens public void generateSQLTokens() { sqlTokens.addAll(sqlTokenGenerators.generateSQLTokens(sqlStatementContext, parameters, schemaMetaData)); }
ParameterRewriter中與分片相關的實現(xiàn)有兩種。
//*詳細的例子可以參考官方文檔中分頁修正和補列部分。
SQLToken記錄了SQL中每個token(解析引擎中提過的不可再分的原子符號)的起始位置,從而方便改寫引擎知道哪些位置需要改寫。
SQLToken
@RequiredArgsConstructor@Getterpublic abstract class SQLToken implements Comparable<SQLToken> { private final int startIndex; @Override public final int compareTo(final SQLToken sqlToken) { return startIndex - sqlToken.getStartIndex(); }}
創(chuàng)建完SQLRewriteContext后就對整條SQL進行重寫和組裝參數(shù),可以看出每個RouteUnit都會重寫SQL并獲取自己對應的參數(shù)。
SQLRouteRewriteEngine.rewrite
org.apache.shardingsphere.underlying.rewrite.engine.SQLRouteRewriteEngine#rewrite public Map<RouteUnit, SQLRewriteResult> rewrite(final SQLRewriteContext sqlRewriteContext, final RouteResult routeResult) { Map<RouteUnit, SQLRewriteResult> result = new LinkedHashMap<>(routeResult.getRouteUnits().size(), 1); for (RouteUnit each : routeResult.getRouteUnits()) { //重寫SQL+組裝參數(shù) result.put(each, new SQLRewriteResult(new RouteSQLBuilder(sqlRewriteContext, each).toSQL(), getParameters(sqlRewriteContext.getParameterBuilder(), routeResult, each))); } return result; }
toSQL核心就是根據(jù)SQLToken將SQL拆分改寫再拼裝,比如:
select * from t_order where created_by = '123'
就會被拆分為select * from | t_order | where created_by = '123'三部分進行改寫拼裝。
toSQL
org.apache.shardingsphere.underlying.rewrite.sql.impl.AbstractSQLBuilder#toSQL public final String toSQL() { if (context.getSqlTokens().isEmpty()) { return context.getSql(); } Collections.sort(context.getSqlTokens()); StringBuilder result = new StringBuilder(); //截取第一個SQLToken之前的內(nèi)容 select * from result.append(context.getSql().substring(0, context.getSqlTokens().get(0).getStartIndex())); for (SQLToken each : context.getSqlTokens()) { //重寫拼接每個SQLToken對應的內(nèi)容 t_order ->t_order_0 result.append(getSQLTokenText(each)); //拼接SQLToken中間不變的內(nèi)容 where created_by = '123' result.append(getConjunctionText(each)); } return result.toString(); }
ParameterBuilder有StandardParameterBuilder和GroupedParameterBuilder兩個實現(xiàn)。
原因和樣例可以參考官方文檔批量拆分部分。
getParameters
org.apache.shardingsphere.underlying.rewrite.engine.SQLRouteRewriteEngine#getParameters private List<Object> getParameters(final ParameterBuilder parameterBuilder, final RouteResult routeResult, final RouteUnit routeUnit) { if (parameterBuilder instanceof StandardParameterBuilder || routeResult.getOriginalDataNodes().isEmpty() || parameterBuilder.getParameters().isEmpty()) { //非插入語句直接返回 return parameterBuilder.getParameters(); } List<Object> result = new LinkedList<>(); int count = 0; for (Collection<DataNode> each : routeResult.getOriginalDataNodes()) { if (isInSameDataNode(each, routeUnit)) { //插入語句參數(shù)分組構造 result.addAll(((GroupedParameterBuilder) parameterBuilder).getParameters(count)); } count++; } return result; }
3.4.2 引擎總結
改寫引擎的作用是將邏輯SQL轉換為實際可執(zhí)行的SQL,這其中既有邏輯表名的替換,也有多路由的SQL拆分,還有為了后續(xù)歸并操作而進行的分頁、分組、排序等改寫,select語句不會對參數(shù)進行重組,而insert語句為了避免插入多余數(shù)據(jù),會通過路由單元對參數(shù)進行重組。
3.5.1 引擎解析
改寫完成后的SQL就可以執(zhí)行了,執(zhí)行引擎需要平衡好資源和效率,如果為每條真實SQL都創(chuàng)建一個數(shù)據(jù)庫連接顯然會造成資源的濫用,但如果單線程串行也必然會影響執(zhí)行效率。
執(zhí)行引擎會先將執(zhí)行單元中需要執(zhí)行的SQLUnit根據(jù)數(shù)據(jù)源分組,同一個數(shù)據(jù)源下的SQLUnit會放入一個list,然后會根據(jù)maxConnectionsSizePerQuery對同一個數(shù)據(jù)源的SQLUnit繼續(xù)分組,創(chuàng)建連接并綁定SQLUnit。
執(zhí)行組創(chuàng)建
org.apache.shardingsphere.sharding.execute.sql.prepare.SQLExecutePrepareTemplate#getSynchronizedExecuteUnitGroups private Collection<InputGroup<StatementExecuteUnit>> getSynchronizedExecuteUnitGroups( final Collection<ExecutionUnit> executionUnits, final SQLExecutePrepareCallback callback) throws SQLException { //根據(jù)數(shù)據(jù)源將SQLUnit分組 key=dataSourceName Map<String, List<SQLUnit>> sqlUnitGroups = getSQLUnitGroups(executionUnits); Collection<InputGroup<StatementExecuteUnit>> result = new LinkedList<>(); //創(chuàng)建sql執(zhí)行組 for (Entry<String, List<SQLUnit>> entry : sqlUnitGroups.entrySet()) { result.addAll(getSQLExecuteGroups(entry.getKey(), entry.getValue(), callback)); } return result; } org.apache.shardingsphere.sharding.execute.sql.prepare.SQLExecutePrepareTemplate#getSQLExecuteGroups private List<InputGroup<StatementExecuteUnit>> getSQLExecuteGroups(final String dataSourceName, final List<SQLUnit> sqlUnits, final SQLExecutePrepareCallback callback) throws SQLException { List<InputGroup<StatementExecuteUnit>> result = new LinkedList<>(); //每個連接需要執(zhí)行的最大sql數(shù)量 int desiredPartitionSize = Math.max(0 == sqlUnits.size() % maxConnectionsSizePerQuery ? sqlUnits.size() / maxConnectionsSizePerQuery : sqlUnits.size() / maxConnectionsSizePerQuery + 1, 1); //分組,每組對應一條數(shù)據(jù)庫連接 List<List<SQLUnit>> sqlUnitPartitions = Lists.partition(sqlUnits, desiredPartitionSize); //選擇連接模式 連接限制/內(nèi)存限制 ConnectionMode connectionMode = maxConnectionsSizePerQuery < sqlUnits.size() ? ConnectionMode.CONNECTION_STRICTLY : ConnectionMode.MEMORY_STRICTLY; //創(chuàng)建連接 List<Connection> connections = callback.getConnections(connectionMode, dataSourceName, sqlUnitPartitions.size()); int count = 0; for (List<SQLUnit> each : sqlUnitPartitions) { //綁定連接和SQLUnit 創(chuàng)建StatementExecuteUnit result.add(getSQLExecuteGroup(connectionMode, connections.get(count++), dataSourceName, each, callback)); } return result; }
SQLUnit分組和連接模式選擇沒有任何關系,連接模式的選擇只取決于maxConnectionsSizePerQuery和SQLUnit數(shù)量的大小關系,maxConnectionsSizePerQuery代表了一個數(shù)據(jù)源一次查詢允許的最大連接數(shù)。
不過maxConnectionsSizePerQuery默認值為1,所以當一條SQL需要路由至多張表時(即有多個SQLUnit)會采用連接限制,當路由至單表時是內(nèi)存限制模式。
為了避免產(chǎn)生數(shù)據(jù)庫連接死鎖問題,在內(nèi)存限制模式時,Sharding-JDBC通過鎖住數(shù)據(jù)源對象一次性創(chuàng)建出本條SQL需要的所有數(shù)據(jù)庫連接。連接限制模式下,各連接一次性查出各自的結果,不會出現(xiàn)多連接相互等待的情況,因此不會發(fā)生死鎖,而內(nèi)存限制模式通過游標讀取結果集,需要多條連接去查詢不同的表做合并,如果不一次性拿到所有需要的連接,則可能存在連接相互等待的情況造成死鎖。可以參照官方文檔中執(zhí)行引擎相關例子。
不同連接模式創(chuàng)建連接
private List<Connection> createConnections(final String dataSourceName, final ConnectionMode connectionMode, final DataSource dataSource, final int connectionSize) throws SQLException { if (1 == connectionSize) { Connection connection = createConnection(dataSourceName, dataSource); replayMethodsInvocation(connection); return Collections.singletonList(connection); } if (ConnectionMode.CONNECTION_STRICTLY == connectionMode) { return createConnections(dataSourceName, dataSource, connectionSize); } //內(nèi)存限制模式加鎖 一次性獲取所有的連接 synchronized (dataSource) { return createConnections(dataSourceName, dataSource, connectionSize); }}
此外,結果集的內(nèi)存合并和流式合并只在調(diào)用JDBC的executeQuery的情況下生效,如果使用execute方式進行查詢,都是統(tǒng)一使用流式方式的查詢。
查詢結果歸并對比
org.apache.shardingsphere.shardingjdbc.executor.PreparedStatementExecutor#executeQuery#101 org.apache.shardingsphere.shardingjdbc.executor.PreparedStatementExecutor#getQueryResult private QueryResult getQueryResult(final Statement statement, final ConnectionMode connectionMode) throws SQLException { PreparedStatement preparedStatement = (PreparedStatement) statement; ResultSet resultSet = preparedStatement.executeQuery(); getResultSets().add(resultSet); //executeQuery 中根據(jù)連接模式選擇流式/內(nèi)存 return ConnectionMode.MEMORY_STRICTLY == connectionMode ? new StreamQueryResult(resultSet) : new MemoryQueryResult(resultSet); } //execute 單獨調(diào)用getResultSet中只會使用流式合并org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement#getResultSet#158 org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement#getQueryResults private List<QueryResult> getQueryResults(final List<ResultSet> resultSets) throws SQLException { List<QueryResult> result = new ArrayList<>(resultSets.size()); for (ResultSet each : resultSets) { if (null != each) { result.add(new StreamQueryResult(each)); } } return result; }
多條連接的執(zhí)行方式分為串行和并行,在本地事務和XA事務中是串行的方式,其余情況是并行,具體的執(zhí)行邏輯這里就不再展開了。
isHoldTransaction
public boolean isHoldTransaction() { return (TransactionType.LOCAL == transactionType && !getAutoCommit()) || (TransactionType.XA == transactionType && isInShardingTransaction()); }
3.5.2 引擎總結
執(zhí)行引擎通過maxConnectionsSizePerQuery和同數(shù)據(jù)源的SQLUnit的數(shù)量大小確定連接模式,maxConnectionsSizePerQuery=SQLUnit數(shù)量使用內(nèi)存限制模式,當使用內(nèi)存限制模式時會通過對數(shù)據(jù)源對象加鎖來保證一次性獲取本條SQL需要的連接而避免死鎖。在使用executeQuery查詢時,處理結果集時會根據(jù)連接模式選擇流式或者內(nèi)存合并,但使用execute方法查詢,處理結果集只會使用流式合并。
3.6.1 引擎解析
查詢出的結果集需要經(jīng)過歸并引擎歸并后才是最終的結果,歸并的核心入口在MergeEntry的process方法中,優(yōu)先處理分片場景的合并,再進行脫敏,只有讀寫分離的情況下則直接返回TransparentMergedResult,TransparentMergedResult實際上沒做合并的額外處理,其內(nèi)部實現(xiàn)都是完全調(diào)用queryResult的實現(xiàn)。
歸并邏輯入口
org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement#mergeQuery#190 org.apache.shardingsphere.underlying.pluggble.merge.MergeEngine#merge#61 org.apache.shardingsphere.underlying.merge.MergeEntry#process public MergedResult process(final List<QueryResult> queryResults, final SQLStatementContext sqlStatementContext) throws SQLException { //分片合并 Optional<MergedResult> mergedResult = merge(queryResults, sqlStatementContext); //脫敏處理 Optional<MergedResult> result = mergedResult.isPresent() ? Optional.of(decorate(mergedResult.get(), sqlStatementContext)) : decorate(queryResults.get(0), sqlStatementContext); //只有讀寫分離的情況下,orElseGet會不存在,TransparentMergedResult return result.orElseGet(() -> new TransparentMergedResult(queryResults.get(0))); }
TransparentMergedResult
@RequiredArgsConstructorpublic final class TransparentMergedResult implements MergedResult { private final QueryResult queryResult; @Override public boolean next() throws SQLException { return queryResult.next(); } @Override public Object getValue(final int columnIndex, final Class<?> type) throws SQLException { return queryResult.getValue(columnIndex, type); } @Override public Object getCalendarValue(final int columnIndex, final Class<?> type, final Calendar calendar) throws SQLException { return queryResult.getCalendarValue(columnIndex, type, calendar); } @Override public InputStream getInputStream(final int columnIndex, final String type) throws SQLException { return queryResult.getInputStream(columnIndex, type); } @Override public boolean wasNull() throws SQLException { return queryResult.wasNull(); }}
我們只看分片相關的操作,ResultMergerEngine只有一個實現(xiàn)類ShardingResultMergerEngine,所以只有存在分片情況的時候,上文的第一個merge才會有結果。根據(jù)SQL類型的不同選擇ResultMerger實現(xiàn),查詢類的合并是最常用也是最復雜的合并。
MergeEntry.merge
org.apache.shardingsphere.underlying.merge.MergeEntry#merge private Optional<MergedResult> merge(final List<QueryResult> queryResults, final SQLStatementContext sqlStatementContext) throws SQLException { for (Entry<BaseRule, ResultProcessEngine> entry : engines.entrySet()) { if (entry.getValue() instanceof ResultMergerEngine) { //選擇不同類型的 resultMerger ResultMerger resultMerger = ((ResultMergerEngine) entry.getValue()).newInstance(databaseType, entry.getKey(), properties, sqlStatementContext); //歸并 return Optional.of(resultMerger.merge(queryResults, sqlStatementContext, schemaMetaData)); } } return Optional.empty(); } org.apache.shardingsphere.sharding.merge.ShardingResultMergerEngine#newInstance public ResultMerger newInstance(final DatabaseType databaseType, final ShardingRule shardingRule, final ConfigurationProperties properties, final SQLStatementContext sqlStatementContext) { if (sqlStatementContext instanceof SelectStatementContext) { return new ShardingDQLResultMerger(databaseType); } if (sqlStatementContext.getSqlStatement() instanceof DALStatement) { return new ShardingDALResultMerger(shardingRule); } return new TransparentResultMerger(); }
ShardingDQLResultMerger的merge方法就是根據(jù)SQL解析結果中包含的token選擇合適的歸并方式(分組聚合、排序、遍歷),歸并后的mergedResult統(tǒng)一經(jīng)過decorate方法進行判斷是否需要分頁歸并,整體處理流程圖可以概括如下。
歸并方式選擇
org.apache.shardingsphere.sharding.merge.dql.ShardingDQLResultMerger#merge public MergedResult merge(final List<QueryResult> queryResults, final SQLStatementContext sqlStatementContext, final SchemaMetaData schemaMetaData) throws SQLException { if (1 == queryResults.size()) { return new IteratorStreamMergedResult(queryResults); } Map<String, Integer> columnLabelIndexMap = getColumnLabelIndexMap(queryResults.get(0)); SelectStatementContext selectStatementContext = (SelectStatementContext) sqlStatementContext; selectStatementContext.setIndexes(columnLabelIndexMap); //分組聚合,排序,遍歷 MergedResult mergedResult = build(queryResults, selectStatementContext, columnLabelIndexMap, schemaMetaData); //分頁歸并 return decorate(queryResults, selectStatementContext, mergedResult); } org.apache.shardingsphere.sharding.merge.dql.ShardingDQLResultMerger#build private MergedResult build(final List<QueryResult> queryResults, final SelectStatementContext selectStatementContext, final Map<String, Integer> columnLabelIndexMap, final SchemaMetaData schemaMetaData) throws SQLException { if (isNeedProcessGroupBy(selectStatementContext)) { //分組聚合歸并 return getGroupByMergedResult(queryResults, selectStatementContext, columnLabelIndexMap, schemaMetaData); } if (isNeedProcessDistinctRow(selectStatementContext)) { setGroupByForDistinctRow(selectStatementContext); //分組聚合歸并 return getGroupByMergedResult(queryResults, selectStatementContext, columnLabelIndexMap, schemaMetaData); } if (isNeedProcessOrderBy(selectStatementContext)) { //排序歸并 return new OrderByStreamMergedResult(queryResults, selectStatementContext, schemaMetaData); } //遍歷歸并 return new IteratorStreamMergedResult(queryResults); } org.apache.shardingsphere.sharding.merge.dql.ShardingDQLResultMerger#decorate private MergedResult decorate(final List<QueryResult> queryResults, final SelectStatementContext selectStatementContext, final MergedResult mergedResult) throws SQLException { PaginationContext paginationContext = selectStatementContext.getPaginationContext(); if (!paginationContext.isHasPagination() || 1 == queryResults.size()) { return mergedResult; } String trunkDatabaseName = DatabaseTypes.getTrunkDatabaseType(databaseType.getName()).getName(); //根據(jù)數(shù)據(jù)庫類型分頁歸并 if ("MySQL".equals(trunkDatabaseName) || "PostgreSQL".equals(trunkDatabaseName)) { return new LimitDecoratorMergedResult(mergedResult, paginationContext); } if ("Oracle".equals(trunkDatabaseName)) { return new RowNumberDecoratorMergedResult(mergedResult, paginationContext); } if ("SQLServer".equals(trunkDatabaseName)) { return new TopAndRowNumberDecoratorMergedResult(mergedResult, paginationContext); } return mergedResult; }
每種歸并方式的作用在官方文檔有比較詳細的案例,這里就不再重復介紹了。
3.6.2 引擎總結
歸并引擎是Sharding-JDBC執(zhí)行SQL的最后一步,其作用是將多個數(shù)節(jié)點的結果集組合為一個正確的結果集返回,查詢類的歸并有分組歸并、聚合歸并、排序歸并、遍歷歸并、分頁歸并五種,這五種歸并方式并不是互斥的,而是相互組合的。
在使用Sharding-JDBC過程中,我們發(fā)現(xiàn)了一些問題可以改進,比如存量系統(tǒng)數(shù)據(jù)量到達一定規(guī)模而需要分庫分表引入Sharding-JDBC時,就會存在兩大問題。
一個是存量數(shù)據(jù)的遷移,這個問題我們可以通過分片算法兼容,前文已經(jīng)提過分片鍵的值是不允許更改的,而且SQL如果不包含分片鍵,如果這個分片鍵對應的值是遞增的(如id,時間等),我們可以設置一個閾值,在分片算法的doSharding中判斷分片值與閾值的大小決定將數(shù)據(jù)路由至舊表或新表,避免數(shù)據(jù)遷移的麻煩。如果是根據(jù)用戶id取模分表,而新增的數(shù)據(jù)無法只通過用戶id判斷,這時可以考慮采用復合分片算法,將用戶id與訂單id或者時間等遞增的字段同時設置為分片鍵,根據(jù)訂單id或時間判斷是否是新數(shù)據(jù),再根據(jù)用戶id取模得到路由結果即可。
另一個是Sharding-JDBC語法限制會使得存量SQL面對巨大的改造壓力,而實際上業(yè)務更關心的是需要分片的表,非分片的表不應該發(fā)生改動和影響。實際上,非分片表理論上無需通過解析、路由、重寫、合并,為此我們在源碼層面對這段邏輯進行了優(yōu)化,支持跳過部分解析,完全跳過分片路由、重寫和合并,盡可能減少Sharding-JDBC對非分片表的語法限制,來減少業(yè)務系統(tǒng)的改造壓力與風險。
Sharding-JDBC執(zhí)行解析路由重寫的邏輯都是在BasePrepareEngine中,最終構造ExecutionContext交由執(zhí)行引擎執(zhí)行,ExecutionContext中包含sqlStatementContext和executionUnits,非分片表不涉及路由改寫,所以其ExecutionUnit我們非常容易手動構造,而查看SQLStatementContext的使用情況,我們發(fā)現(xiàn)SQLStatementContext只會影響結果集的合并而不會影響實際的執(zhí)行,而不分片表也無需進行結果集的合并,整體實現(xiàn)思路如圖。
ExecutionContext相關對象
public class ExecutionContext { private final SQLStatementContext sqlStatementContext; private final Collection<ExecutionUnit> executionUnits = new LinkedHashSet<>();} public final class ExecutionUnit { private final String dataSourceName; private final SQLUnit sqlUnit;} public final class SQLUnit { private String sql; private final List<Object> parameters; }
(1)校驗SQL中是否包含分片表:我們是通過正則將SQL中的各個單詞分隔成Set,然后再遍歷BaseRule判斷是否存在分片表。大家可能會奇怪明明解析引擎可以幫我們解析出SQL中的表名,為什么還要自己來解析。因為我們測試的過程中發(fā)現(xiàn),存量業(yè)務上的SQL很多在解析階段就會報錯,只能提前判斷,當然這種判斷方式并不嚴謹,比如 SELECT order_id FROM t_order_record WHERE order_id=1 AND remarks=' t_order xxx';,配置的分片表t_order時就會存在誤判,但這種場景在我們的業(yè)務中沒有,所以暫時并沒有處理。由于這個信息需要在多個對象方法中使用,為了避免修改大量的對象變量和方法入?yún)ⅲ帜芊奖愕耐競鬟@個信息,判斷的結果我們選擇放在ThreadLocal里。
RuleContextManager
public final class RuleContextManager { private static final ThreadLocal<RuleContextManager> SKIP_CONTEXT_HOLDER = ThreadLocal.withInitial(RuleContextManager::new); /** * 是否跳過sharding */ private boolean skipSharding; /** * 是否路由至主庫 */ private boolean masterRoute; public static boolean isSkipSharding() { return SKIP_CONTEXT_HOLDER.get().skipSharding; } public static void setSkipSharding(boolean skipSharding) { SKIP_CONTEXT_HOLDER.get().skipSharding = skipSharding; } public static boolean isMasterRoute() { return SKIP_CONTEXT_HOLDER.get().masterRoute; } public static void setMasterRoute(boolean masterRoute) { SKIP_CONTEXT_HOLDER.get().masterRoute = masterRoute; } public static void clear(){ SKIP_CONTEXT_HOLDER.remove(); } }
判斷SQL是否包含分片表
org.apache.shardingsphere.underlying.pluggble.prepare.BasePrepareEngine#buildSkipContext// 判斷是否可以跳過sharding,構造RuleContextManager的值private void buildSkipContext(final String sql){ Set<String> sqlTokenSet = new HashSet<>(Arrays.asList(sql.split("[//s]"))); if (CollectionUtils.isNotEmpty(rules)) { for (BaseRule baseRule : rules) { //定制方法,ShardingRule實現(xiàn),判斷sqlTokenSet是否包含邏輯表即可 if(baseRule.hasContainShardingTable(sqlTokenSet)){ RuleContextManager.setSkipSharding(false); break; }else { RuleContextManager.setSkipSharding(true); } } }} org.apache.shardingsphere.core.rule.ShardingRule#hasContainShardingTablepublic Boolean hasContainShardingTable(Set<String> sqlTokenSet) { //logicTableNameList通過遍歷TableRule可以得到 for (String logicTable : logicTableNameList) { if (sqlTokenSet.contains(logicTable)) { return true; } } return false; }
(2)跳過解析路由:通過RuleContextManager中的skipSharding判斷是否需要跳過Sharding解析路由,但為了兼容讀寫分離的場景,我們還需要知道這條SQL應該走主庫還是從庫,走主庫的場景在后面強制路由主庫部分有說明,SQL走主庫實際上只有兩種情況,一種是非SELECT語句,另一種就是SELECT語句帶鎖,如SELECT...FOR UPDATE,因此整體實現(xiàn)的步驟如下:
RuleContextManager.isSkipSharding判斷是否跳過路由。
跳過解析路由
public class SkipShardingStatement implements SQLStatement{ @Override public int getParameterCount() { return 0; }} org.apache.shardingsphere.sql.parser.SQLParserEngine#parse0 private SQLStatement parse0(final String sql, final boolean useCache) { if (useCache) { Optional<SQLStatement> cachedSQLStatement = cache.getSQLStatement(sql); if (cachedSQLStatement.isPresent()) { return cachedSQLStatement.get(); } } ParseTree parseTree = new SQLParserExecutor(databaseTypeName, sql).execute().getRootNode(); /** * 跳過sharding 需要判斷是否需要路由至主庫 如果不是select語句直接跳過 * 是select語句則需要通過繼續(xù)解析判斷是否有鎖 */ SQLStatement result ; if(RuleContextManager.isSkipSharding()&&!VisitorRule.SELECT.equals(VisitorRule.valueOf(parseTree.getClass()))){ RuleContextManager.setMasterRoute(true); result = new SkipShardingStatement(); }else { result = (SQLStatement) ParseTreeVisitorFactory.newInstance(databaseTypeName, VisitorRule.valueOf(parseTree.getClass())).visit(parseTree); } if (useCache) { cache.put(sql, result); } return result; } org.apache.shardingsphere.sql.parser.mysql.visitor.impl.MySQLDMLVisitor#visitSelectClause public ASTNode visitSelectClause(final SelectClauseContext ctx) { SelectStatement result = new SelectStatement(); // 跳過sharding 只需要判斷是否有鎖來決定是否路由至主庫即可 if(RuleContextManager.isSkipSharding()){ if (null != ctx.lockClause()) { result.setLock((LockSegment) visit(ctx.lockClause())); RuleContextManager.setMasterRoute(true); } return result; } //...后續(xù)解析 } org.apache.shardingsphere.underlying.route.DataNodeRouter#createRouteContext private RouteContext createRouteContext(final String sql, final List<Object> parameters, final boolean useCache) { SQLStatement sqlStatement = parserEngine.parse(sql, useCache); //如果需要跳過sharding 不進行后續(xù)的解析直接返回 if (RuleContextManager.isSkipSharding()) { return new RouteContext(sqlStatement, parameters, new RouteResult()); } //...解析 } org.apache.shardingsphere.sharding.route.engine.ShardingRouteDecorator#decorate public RouteContext decorate(final RouteContext routeContext, final ShardingSphereMetaData metaData, final ShardingRule shardingRule, final ConfigurationProperties properties) { // 跳過sharding路由 if(RuleContextManager.isSkipSharding()){ return routeContext; } //...路由 }
(3)手動構造ExecutionUnit:ExecutionUnit中我們需要確定的內(nèi)容就是datasourceName,這里我們認為跳過Sharding的SQL最終執(zhí)行的庫一定只有一個。如果只是跳過Sharding的情況,直接從元數(shù)據(jù)中獲取數(shù)據(jù)源名稱即可,如果存在讀寫分離的情況,主從路由的結果也一定是唯一的。創(chuàng)建完ExecutionUnit直接放入ExecutionContext返回即可,從而跳過后續(xù)的改寫邏輯。
手動構造ExecutionUnit
public ExecutionContext prepare(final String sql, final List<Object> parameters) { List<Object> clonedParameters = cloneParameters(parameters); // 判斷是否可以跳過sharding,構造RuleContextManager的值 buildSkipContext(sql); RouteContext routeContext = executeRoute(sql, clonedParameters); ExecutionContext result = new ExecutionContext(routeContext.getSqlStatementContext()); // 跳過sharding的sql最后的路由結果一定只有一個庫 if(RuleContextManager.isSkipSharding()){ log.debug("可以跳過sharding的場景 {}", sql); if(!Objects.isNull(routeContext.getRouteResult())){ Collection<String> allInstanceDataSourceNames = this.metaData.getDataSources().getAllInstanceDataSourceNames(); int routeUnitsSize = routeContext.getRouteResult().getRouteUnits().size(); /* * 1. 沒有讀寫分離的情況下 跳過sharding路由會導致routeUnitsSize為0 此時需要判斷數(shù)據(jù)源數(shù)量是否為1 * 2. 讀寫分離情況下 只會路由至具體的主庫或從庫 routeUnitsSize數(shù)量應該為1 */ if(!(routeUnitsSize == 0 && allInstanceDataSourceNames.size()==1)|| routeUnitsSize>1){ throw new ShardingSphereException("可以跳過sharding,但是路由結果不唯一,SQL= %s ,routeUnits= %s ",sql, routeContext.getRouteResult().getRouteUnits()); } Collection<String> actualDataSourceNames = routeContext.getRouteResult().getActualDataSourceNames(); // 手動創(chuàng)建執(zhí)行單元 String datasourceName = CollectionUtils.isEmpty(actualDataSourceNames)? allInstanceDataSourceNames.iterator().next():actualDataSourceNames.iterator().next(); ExecutionUnit executionUnit = new ExecutionUnit(datasourceName, new SQLUnit(sql, clonedParameters)); result.getExecutionUnits().add(executionUnit); //標記該結果需要跳過 result.setSkipShardingScenarioFlag(true); } }else { result.getExecutionUnits().addAll(executeRewrite(sql, clonedParameters, routeContext)); } if (properties.<Boolean>getValue(ConfigurationPropertyKey.SQL_SHOW)) { SQLLogger.logSQL(sql, properties.<Boolean>getValue(ConfigurationPropertyKey.SQL_SIMPLE), result.getSqlStatementContext(), result.getExecutionUnits()); } return result;}
(4)跳過合并:跳過查詢結果的合并和影響行數(shù)計算的合并,注意ShardingPreparedStatement和ShardingStatement都需要跳過
跳過合并
org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement#executeQuery public ResultSet executeQuery() throws SQLException { ResultSet result; try { clearPrevious(); prepare(); initPreparedStatementExecutor(); List<QueryResult> queryResults = preparedStatementExecutor.executeQuery(); List<ResultSet> resultSets = preparedStatementExecutor.getResultSets(); // 定制開發(fā),不分片跳過合并 if(executionContext.isSkipShardingScenarioFlag()){ return CollectionUtils.isNotEmpty(resultSets) ? resultSets.get(0) : null; } MergedResult mergedResult = mergeQuery(queryResults); result = new ShardingResultSet(resultSets, mergedResult, this, executionContext); } finally { clearBatch(); } currentResultSet = result; return result; }org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement#getResultSet public ResultSet getResultSet() throws SQLException { if (null != currentResultSet) { return currentResultSet; } List<ResultSet> resultSets = getResultSets(); // 定制開發(fā),不分片跳過合并 if(executionContext.isSkipShardingScenarioFlag()){ return CollectionUtils.isNotEmpty(resultSets) ? resultSets.get(0) : null; } if (executionContext.getSqlStatementContext() instanceof SelectStatementContext || executionContext.getSqlStatementContext().getSqlStatement() instanceof DALStatement) { MergedResult mergedResult = mergeQuery(getQueryResults(resultSets)); currentResultSet = new ShardingResultSet(resultSets, mergedResult, this, executionContext); } return currentResultSet; }org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement#isAccumulate public boolean isAccumulate() { //定制開發(fā),不分片跳過計算 if(executionContext.isSkipShardingScenarioFlag()){ return false; } return !connection.getRuntimeContext().getRule().isAllBroadcastTables(executionContext.getSqlStatementContext().getTablesContext().getTableNames()); }
(5)清空RuleContextManager:查看一下Sharding-JDBC其他ThreadLocal的清空位置,對應的清空RuleContextManager就好。
清空ThreadLocal
org.apache.shardingsphere.shardingjdbc.jdbc.adapter.AbstractConnectionAdapter#closepublic final void close() throws SQLException { closed = true; MasterVisitedManager.clear(); TransactionTypeHolder.clear(); RuleContextManager.clear(); int connectionSize = cachedConnections.size(); try { forceExecuteTemplateForClose.execute(cachedConnections.entries(), cachedConnections -> cachedConnections.getValue().close()); } finally { cachedConnections.clear(); rootInvokeHook.finish(connectionSize); } }
舉個例子,比如Sharding-JDBC本身是不支持INSERT INTO tbl_name (col1, col2, …) SELECT col1, col2, … FROM tbl_name WHERE col3 = ? 這種語法的,會報空指針異常。
經(jīng)過我們上述改造驗證后,非分片表是可以跳過語法限制執(zhí)行如下的SQL的。
通過該功能的實現(xiàn),業(yè)務可以更關注與分片表的SQL改造,而無需擔心引入Sharding-JDBC造成所有SQL的驗證改造,大幅減少改造成本和風險。
Sharding-JDBC可以通過配置主從庫數(shù)據(jù)源方便的實現(xiàn)讀寫分離的功能,但使用讀寫分離就必須面對主從延遲和從庫失聯(lián)的痛點,針對這一問題,我們實現(xiàn)了強制路由主庫的動態(tài)配置,當主從延遲過大或從庫失聯(lián)時,通過修改配置來實現(xiàn)SQL語句強制走主庫的不停機路由切換。
后面會說明了配置的動態(tài)生效的實現(xiàn)方式,這里只說明強制路由主庫的實現(xiàn),我們直接使用前文的RuleContextManager即可,在主從路由引擎里判斷下是否開啟了強制主庫路由。
MasterSlaveRouteDecorator.decorate改造
org.apache.shardingsphere.masterslave.route.engine.MasterSlaveRouteDecorator#decorate public RouteContext decorate(final RouteContext routeContext, final ShardingSphereMetaData metaData, final MasterSlaveRule masterSlaveRule, final ConfigurationProperties properties) { /** * 如果配置了強制主庫 MasterVisitedManager設置為true * 后續(xù)isMasterRoute中會保證路由至主庫 */ if(properties.<Boolean>getValue(ConfigurationPropertyKey.MASTER_ROUTE_ONLY)){ MasterVisitedManager.setMasterVisited(); } //...路由邏輯 return routeContext; }
為了兼容之前跳過Sharding的功能,我們需要同步修改下isMasterRoute方法,如果是跳過了Sharding路由需要通過RuleContextManager來判斷是否走主庫。
isMasterRoute改造
org.apache.shardingsphere.masterslave.route.engine.impl.MasterSlaveDataSourceRouter#isMasterRoute private boolean isMasterRoute(final SQLStatement sqlStatement) { if(sqlStatement instanceof SkipShardingStatement){ // 優(yōu)先以MasterVisitedManager中的值為準 return MasterVisitedManager.isMasterVisited()|| RuleContextManager.isMasterRoute(); } return containsLockSegment(sqlStatement) || !(sqlStatement instanceof SelectStatement) || MasterVisitedManager.isMasterVisited() || HintManager.isMasterRouteOnly(); }
當然,更理想的狀況是通過監(jiān)控主從同步延遲和數(shù)據(jù)庫撥測,當超過閾值時或從庫失聯(lián)時直接自動修改配置中心的庫,實現(xiàn)自動切換主庫,減少業(yè)務故障時間和運維壓力。
Sharding-JDBC中的ConfigurationPropertyKey中提供了許多配置屬性,而Sharding-JDBCB并沒有為這些配置提供在線修改的方法,而在實際的應用場景中,像SQL_SHOW這樣控制SQL打印的開關配置,我們更希望能夠在線修改配置值來控制SQL日志的打印,而不是修改完配置再重啟服務。
以SQL打印為例,BasePrepareEngine中存在ConfigurationProperties對象,通過調(diào)用getValue方法來獲取SQL_SHOW的值。
SQL 打印
org.apache.shardingsphere.underlying.pluggble.prepare.BasePrepareEngine#prepare /** * Prepare to execute. * * @param sql SQL * @param parameters SQL parameters * @return execution context */ public ExecutionContext prepare(final String sql, final List<Object> parameters) { List<Object> clonedParameters = cloneParameters(parameters); RouteContext routeContext = executeRoute(sql, clonedParameters); ExecutionContext result = new ExecutionContext(routeContext.getSqlStatementContext()); result.getExecutionUnits().addAll(executeRewrite(sql, clonedParameters, routeContext)); //sql打印 if (properties.<Boolean>getValue(ConfigurationPropertyKey.SQL_SHOW)) { SQLLogger.logSQL(sql, properties.<Boolean>getValue(ConfigurationPropertyKey.SQL_SIMPLE), result.getSqlStatementContext(), result.getExecutionUnits()); } return result; }
ConfigurationProperties繼承了抽象類TypedProperties,其getValue方法就是根據(jù)key獲取對應的配置值,因此我們直接在TypedProperties中實現(xiàn)刷新緩存中的配置值的方法。
TypedProperties刷新配置
public abstract class TypedProperties<E extends Enum & TypedPropertyKey> { private static final String LINE_SEPARATOR = System.getProperty("line.separator"); @Getter private final Properties props; private final Map<E, TypedPropertyValue> cache; public TypedProperties(final Class<E> keyClass, final Properties props) { this.props = props; cache = preload(keyClass); } private Map<E, TypedPropertyValue> preload(final Class<E> keyClass) { E[] enumConstants = keyClass.getEnumConstants(); Map<E, TypedPropertyValue> result = new HashMap<>(enumConstants.length, 1); Collection<String> errorMessages = new LinkedList<>(); for (E each : enumConstants) { TypedPropertyValue value = null; try { value = new TypedPropertyValue(each, props.getOrDefault(each.getKey(), each.getDefaultValue()).toString()); } catch (final TypedPropertyValueException ex) { errorMessages.add(ex.getMessage()); } result.put(each, value); } if (!errorMessages.isEmpty()) { throw new ShardingSphereConfigurationException(Joiner.on(LINE_SEPARATOR).join(errorMessages)); } return result; } /** * Get property value. * * @param key property key * @param <T> class type of return value * @return property value */ @SuppressWarnings("unchecked") public <T> T getValue(final E key) { return (T) cache.get(key).getValue(); } /** * vivo定制改造方法 refresh property value. * @param key property key * @param value property value * @return 更新配置是否成功 */ public boolean refreshValue(String key, String value){ //獲取配置類支持的配置項 E[] enumConstants = targetKeyClass.getEnumConstants(); for (E each : enumConstants) { //遍歷新的值 if(each.getKey().equals(key)){ try { //空白value認為無效,取默認值 if(!StringUtils.isBlank(value)){ value = each.getDefaultValue(); } //構造新屬性 TypedPropertyValue typedPropertyValue = new TypedPropertyValue(each, value); //替換緩存 cache.put(each, typedPropertyValue); //原始屬性也替換下,有可能會通過RuntimeContext直接獲取Properties props.put(key,value); return true; } catch (final TypedPropertyValueException ex) { log.error("refreshValue error. key={} , value={}", key, value, ex); } } } return false; }}
實現(xiàn)了刷新方法后,我們還需要將該方法一步步暴露至一個外部可以調(diào)用的類中,以便在服務監(jiān)聽配置的方法中,能夠調(diào)用這個刷新方法。ConfigurationProperties直接在asePrepareEngine的構造函數(shù)中傳入,我們通過構造函數(shù)逐步反推最外層的這一對象調(diào)用來源,最終可以定位到在AbstractDataSourceAdapter中的getRuntimeContext()方法中可以獲取到這個配置,而這個就是Sharding-JDBC實現(xiàn)的JDBC中Datasource接口的抽象類,我們直接在這個類中調(diào)用剛剛實現(xiàn)的refreshValue方法,剩下的就是監(jiān)聽配置,通過自己實現(xiàn)的AbstractDataSourceAdapter來調(diào)用這個方法就好了。
通過這一功能,我們可以方便的控制一些開關屬性的在線修改,如SQL打印、強制路由主庫等,業(yè)務無需重啟服務即可做到配置的動態(tài)生效。
業(yè)務中存在使用foreach標簽來批量update的語句,這種SQL在Sharding-JDBC中無法被正確路由,只會路由第一組參數(shù),后面的無法被路由改寫,原因是解析引擎無法將語句拆分解析。
批量update樣例
<update id="batchUpdate"> <foreach collectinotallow="orderList" item="item"> update t_order set status = 1, updated_by = #{item.updatedBy} WHERE created_by = #{item.createdBy}; </foreach> </update>
我們通過將批量update按照;拆分為多個語句,然后分別路由,最后手動匯總路有結果生成執(zhí)行單元。
為了能正確重寫SQL,批量update拆分后的語句需要完全一樣,這樣就不能使用動態(tài)拼接set條件,而是使用ifnull語法或者字段值不發(fā)生變化時也將原來的值放入set中,只不過set前后的值保持一致,整體思路與實現(xiàn)如下。
prepareBatch實現(xiàn)
org.apache.shardingsphere.underlying.pluggble.prepare.BasePrepareEngine#prepareBatch private ExecutionContext prepareBatch(List<String> splitSqlList, final List<Object> allParameters) { //SQL去重 List<String> sqlList = splitSqlList.stream().distinct().collect(Collectors.toList()); if (sqlList.size() > 1) { throw new ShardingSphereException("不支持多條SQL,請檢查SQL," + sqlList.toString()); } //以第一條SQL為標準 String sql = sqlList.get(0); //所有的執(zhí)行單元 Collection<ExecutionUnit> globalExecutionUnitList = new ArrayList<>(); //初始化最后的執(zhí)行結果 ExecutionContext executionContextResult = null; //根據(jù)所有參數(shù)數(shù)量和SQL語句數(shù)量 計算每組參數(shù)的數(shù)量 int eachSqlParameterCount = allParameters.size() / splitSqlList.size(); //平均分配每條SQL的參數(shù) List<List<Object>> eachSqlParameterListList = Lists.partition(allParameters, eachSqlParameterCount); for (List<Object> eachSqlParameterList : eachSqlParameterListList) { //每條SQL參數(shù)不同 需要根據(jù)參數(shù)路由不同的結果 實際的SqlStatementContext 是一致的 RouteContext routeContext = executeRoute(sql, eachSqlParameterList); //由于SQL一樣 實際的SqlStatementContext 是一致的 只需初始化一次 if (executionContextResult == null) { executionContextResult = new ExecutionContext(routeContext.getSqlStatementContext()); } globalExecutionUnitList.addAll(executeRewrite(sql, eachSqlParameterList, routeContext)); } //排序打印日志 executionContextResult.getExtendMap().put(EXECUTION_UNIT_LIST, globalExecutionUnitList.stream().sorted(Comparator.comparing(ExecutionUnit::getDataSourceName)).collect(Collectors.toList())); if (properties.<Boolean>getValue(ConfigurationPropertyKey.SQL_SHOW)) { SQLLogger.logSQL(sql, properties.<Boolean>getValue(ConfigurationPropertyKey.SQL_SIMPLE), executionContextResult.getSqlStatementContext(), (Collection<ExecutionUnit>) executionContextResult.getExtendMap().get(EXECUTION_UNIT_LIST)); } return executionContextResult; }
這里我們在ExecutionContext單獨構造了一個了ExtendMap來存放ExecutionUnit,原因是ExecutionContext中的executionUnits是HashSet,而判斷ExecutionUnit中的SqlUnit只會根據(jù)SQL去重,批量update的SQL是一致的,但parameters不同,為了不影響原有的邏輯,單獨使用了另外的變量來存放。
ExecutionContext改造
@RequiredArgsConstructor@Getterpublic class ExecutionContext { private final SQLStatementContext sqlStatementContext; private final Collection<ExecutionUnit> executionUnits = new LinkedHashSet<>(); /** * 自定義擴展變量 */ private final Map<ExtendEnum,Object> extendMap = new HashMap<>(); /** * 定制擴展,是否可以跳過分片邏輯 */ @Setter private boolean skipShardingScenarioFlag = false;} @RequiredArgsConstructor@Getter@EqualsAndHashCode@ToStringpublic final class ExecutionUnit { private final String dataSourceName; private final SQLUnit sqlUnit;} @AllArgsConstructor@RequiredArgsConstructor@Getter@Setter//根據(jù)SQL判斷是否相等@EqualsAndHashCode(of = { "sql" })@ToStringpublic final class SQLUnit { private String sql; private final List<Object> parameters; }
我們還需要改造下執(zhí)行方法,在初始化執(zhí)行器的時候,判斷下ExtendMap中存在我們自定義的EXECUTION_UNIT_LIST是否存在,存在則使用生成InputGroup,同一個數(shù)據(jù)源下的ExecutionUnit會被放入同一個InputGroup中。
InputGroup改造
org.apache.shardingsphere.shardingjdbc.executor.PreparedStatementExecutor#init public void init(final ExecutionContext executionContext) throws SQLException { setSqlStatementContext(executionContext.getSqlStatementContext()); //兼容批量update 分庫分表后同一張表的情況 判斷是否存在EXECUTION_UNIT_LIST 存在則使用未去重的List進行后續(xù)的操作 if (MapUtils.isNotEmpty(executionContext.getExtendMap())){ Collection<ExecutionUnit> executionUnitCollection = (Collection<ExecutionUnit>) executionContext.getExtendMap().get(EXECUTION_UNIT_LIST); if(CollectionUtils.isNotEmpty(executionUnitCollection)){ getInputGroups().addAll(obtainExecuteGroups(executionUnitCollection)); } }else { getInputGroups().addAll(obtainExecuteGroups(executionContext.getExecutionUnits())); } cacheStatements(); }
改造完成后,批量update中的每條SQL都可以被正確路由執(zhí)行。
當where語句包括多個or條件時,而or條件不包含分片鍵時,會造成createShardingConditions方法生成重復的分片條件,導致重復調(diào)用doSharding方法。
如SELECT * FROM t_order WHERE created_by = ? and ( (status = ?) or (status = ?) or (status = ?) )這種SQL,存在三個or條件,分片鍵是created_by ,實際產(chǎn)生的shardingCondition會是三個一樣的值,并會調(diào)用三次doSharding的方法。雖然實際執(zhí)行還是只有一次(批量update那里說明過執(zhí)行單元會去重),但為了減少方法的重復調(diào)用,我們還是對這里做了一次去重。
去重的方法也比較簡單粗暴,我們對ListRouteValue和RangeRouteValue添加了@EqualsAndHashCode注解,然后在WhereClauseShardingConditionEngine的createShardingConditions方法返回最終結果前加一次去重,從而避免生成重復的shardingCondition造成doSharding方法的重復調(diào)用。
createShardingConditions去重
org.apache.shardingsphere.sharding.route.engine.condition.engine.WhereClauseShardingConditionEngine#createShardingConditions private Collection<ShardingCondition> createShardingConditions(final SQLStatementContext sqlStatementContext, final Collection<AndPredicate> andPredicates, final List<Object> parameters) { Collection<ShardingCondition> result = new LinkedList<>(); for (AndPredicate each : andPredicates) { Map<Column, Collection<RouteValue>> routeValueMap = createRouteValueMap(sqlStatementContext, each, parameters); if (routeValueMap.isEmpty()) { return Collections.emptyList(); } result.add(createShardingCondition(routeValueMap)); } //去重 Collection<ShardingCondition> distinctResult = result.stream().distinct().collect(Collectors.toCollection(LinkedList::new)); return distinctResult; }
分片表的SQL中如果沒有攜帶分片鍵(或者帶上了分片鍵結果沒有被正確解析)將會導致全路由,產(chǎn)生性能問題,而這種SQL并不會報錯,這就導致在實際的業(yè)務改造中,開發(fā)和測試很難保證百分百改造徹底。為此,我們在源碼層面對這種情況做了額外的校驗,當產(chǎn)生全路由,也就是ShardingConditions為空時,主動拋出異常,從而方便開發(fā)和測試能夠快速發(fā)現(xiàn)全路由SQL。
實現(xiàn)方式也比較簡單,校驗下ShardingConditions是否為空即可,只不過需要額外兼容下Hint策略ShardingConditions始終為空的特殊情況。
全路由校驗
org.apache.shardingsphere.sharding.route.engine.ShardingRouteDecorator#decoratepublic RouteContext decorate(final RouteContext routeContext, final ShardingSphereMetaData metaData, final ShardingRule shardingRule, final ConfigurationProperties properties) { //省略... //獲取 ShardingConditions ShardingConditions shardingConditions = getShardingConditions(parameters, sqlStatementContext, metaData.getSchema(), shardingRule); boolean hintAlgorithm = isHintAlgorithm(sqlStatementContext, shardingRule); //判斷是否允許全路由 if (!properties.<Boolean>getValue(ConfigurationPropertyKey.ALLOW_EMPTY_SHARDING_CONDITIONS)) { //如果不是Hint算法 if(!isHintAlgorithm(sqlStatementContext, shardingRule)){ /** 如果是DML語句 則可能有兩種情況 這兩種情況是根據(jù)getShardingConditions方法的內(nèi)部邏輯而來的 * 一種是非插入語句 shardingConditions.getConditions()為空即可 * 一種是插入語句 插入語句shardingConditions.getConditions()不會為空 但是ShardingCondition的routeValues是空的 */ if (sqlStatementContext.getSqlStatement() instanceof DMLStatement) { if(shardingConditions.getConditions().isEmpty()) { throw new ShardingSphereException("SQL不包含分庫分表鍵,請檢查SQL"); }else { if (sqlStatementContext instanceof InsertStatementContext) { List<ShardingCondition> routeValuesNotEmpty = shardingConditions.getConditions().stream().filter(r -> CollectionUtils.isNotEmpty(r.getRouteValues())).collect(Collectors.toList()); if(CollectionUtils.isEmpty(routeValuesNotEmpty)){ throw new ShardingSphereException("SQL不包含分庫分表鍵,請檢查SQL"); } } } } } } boolean needMergeShardingValues = isNeedMergeShardingValues(sqlStatementContext, shardingRule); //省略... return new RouteContext(sqlStatementContext, parameters, routeResult); } private boolean isHintAlgorithm(final SQLStatementContext sqlStatementContext, final ShardingRule shardingRule) { // 場景a 全局默認策略是否使用強制路由策略 if(shardingRule.getDefaultDatabaseShardingStrategy() instanceof HintShardingStrategy || shardingRule.getDefaultTableShardingStrategy() instanceof HintShardingStrategy){ return true; } for (String each : sqlStatementContext.getTablesContext().getTableNames()) { Optional<TableRule> tableRule = shardingRule.findTableRule(each); //場景b 指定表是否使用強制路由策略 if (tableRule.isPresent() && (shardingRule.getDatabaseShardingStrategy(tableRule.get()) instanceof HintShardingStrategy || shardingRule.getTableShardingStrategy(tableRule.get()) instanceof HintShardingStrategy)) { return true; } } return false; }
當然這塊功能也可以在完善些,比如對分片路由結果中的數(shù)據(jù)源數(shù)量進行校驗,從而避免跨庫操作,我們這邊沒有實現(xiàn)也就不再贅述了。
業(yè)務接入Sharding-JDBC的步驟是一樣的,都需要通過Java創(chuàng)建數(shù)據(jù)源和配置對象或者使用SpringBoot進行配置,存在一定的熟悉成本和重復開發(fā)的問題,為此我們也對定制開發(fā)版本的Sharding-JDBC封裝了一個公共組件,從而簡化業(yè)務配置,減少重復開發(fā),提升業(yè)務的開發(fā)效率,具體功能可見下。這塊沒有涉及源碼的改造,只是在定制版本上包裝的一個公共組件。
開源Sharding-JDBC配置
//數(shù)據(jù)源名稱spring.shardingsphere.datasource.names=ds0,ds1//ds0配置spring.shardingsphere.datasource.ds0.type=org.apache.commons.dbcp.BasicDataSourcespring.shardingsphere.datasource.ds0.driver-class-name=com.mysql.jdbc.Driverspring.shardingsphere.datasource.ds0.url=jdbc:mysql://localhost:3306/ds0spring.shardingsphere.datasource.ds0.username=rootspring.shardingsphere.datasource.ds0.password=//ds1配置spring.shardingsphere.datasource.ds1.type=org.apache.commons.dbcp.BasicDataSourcespring.shardingsphere.datasource.ds1.driver-class-name=com.mysql.jdbc.Driverspring.shardingsphere.datasource.ds1.url=jdbc:mysql://localhost:3306/ds1spring.shardingsphere.datasource.ds1.username=rootspring.shardingsphere.datasource.ds1.password=//分表規(guī)則spring.shardingsphere.sharding.tables.t_order.actual-data-nodes=ds$->{0..1}.t_order$->{0..1}spring.shardingsphere.sharding.tables.t_order.table-strategy.inline.sharding-column=order_idspring.shardingsphere.sharding.tables.t_order.table-strategy.inline.algorithm-expressinotallow=t_order$->{order_id % 2}spring.shardingsphere.sharding.tables.t_order_item.actual-data-nodes=ds$->{0..1}.t_order_item$->{0..1}spring.shardingsphere.sharding.tables.t_order_item.table-strategy.inline.sharding-column=order_idspring.shardingsphere.sharding.tables.t_order_item.table-strategy.inline.algorithm-expressinotallow=t_order_item$->{order_id % 2}//默認分庫規(guī)則spring.shardingsphere.sharding.default-database-strategy.inline.sharding-column=user_idspring.shardingsphere.sharding.default-database-strategy.inline.algorithm-expressinotallow=ds$->{user_id % 2}
組件簡化配置
//數(shù)據(jù)源名稱vivo.it.sharding.datasource.names = ds0,ds1//ds0配置vivo.it.sharding.datasource.ds0.url = jdbc:mysql://localhost:3306/ds1vivo.it.sharding.datasource.ds0.username = rootvivo.it.sharding.datasource.ds0.password =//ds1配置vivo.it.sharding.datasource.ds1.url = jdbc:mysql://localhost:3306/ds1vivo.it.sharding.datasource.ds1.username = rootvivo.it.sharding.datasource.ds1.password =//分表規(guī)則vivo.it.sharding.table.rule.config = [{"logicTable":"t_order,t_order_item","tableRange":"0..1","shardingColumn":"order_id ","algorithmExpression":"order_id %2"}]//默認分庫規(guī)則vivo.it.sharding.default.db.rule.config = {"shardingColumn":"user_id","algorithmExpression":"user_id %2"}
結合官方文檔和業(yè)務實踐經(jīng)驗,我們也梳理了部分使用Sharding-JDBC的建議供大家參考,實際具體如何優(yōu)化SQL寫法(比如子查詢、分頁、分組排序等)還需要結合業(yè)務的實際場景來進行測試和調(diào)優(yōu)。
(1)強制等級
建議①:涉及分片表的SQL必須攜帶分片鍵
原因:無分片鍵會導致全路由,存在嚴重的性能隱患
建議②:禁止一條SQL中的分片值路由至不同的庫
原因:跨庫操作存在嚴重的性能隱患,事務操作會升級為分布式事務,增加業(yè)務復雜度
建議③:禁止對分片鍵使用運算表達式或函數(shù)操作
原因:無法提前計算表達式和函數(shù)獲取分片值,導致全路由
說明:詳見官方文檔
建議④:禁止在子查詢中使用分片表
原因:無法正常解析子查詢中的分片表,導致業(yè)務錯誤
說明:雖然官方文檔中說有限支持子查詢 ,但在實際的使用中發(fā)現(xiàn)4.1.1并不支持子查詢,可見官方issue6164 | issue 6228。
建議⑤:包含CASE WHEN、HAVING、UNION (ALL)語法的分片SQL,不支持路由至多數(shù)據(jù)節(jié)點
說明:詳見官方文檔
(2)建議等級
① 建議使用分布式id來保證分片表主鍵的全局唯一性
原因:方便判斷數(shù)據(jù)的唯一性和后續(xù)的遷移擴容
說明:詳見文章《vivo 自研魯班分布式 ID 服務實踐》
② 建議跨多表的分組SQL的分組字段與排序字段保證一致
原因:分組和排序字段不一致只能通過內(nèi)存合并,大數(shù)據(jù)量時存在性能隱患
說明:詳見官方文檔
③ 建議通過全局遞增的分布式id來優(yōu)化分頁查詢
原因:Sharding-JDBC的分頁優(yōu)化側重于結果集的流式合并來避免內(nèi)存爆漲,但深度分頁自身的性能問題并不能解決
說明:詳見官方文檔
本文結合個人理解梳理了各個引擎的源碼入口和關鍵邏輯,讀者可以結合本文和官方文檔更好的定位理解Sharding-JDBC的源碼實現(xiàn)。定制開發(fā)的目的是為了降低業(yè)務接入成本,盡可能減少業(yè)務存量SQL的改造,部分改造思想其實與官方社區(qū)也存在差異,比如跳過語法解析,官方社區(qū)致力于通過優(yōu)化解析引擎來適配各種語法,而不是跳過解析階段,可參考官方issue。源碼分析和定制改造只涉及了Sharding-JDBC的數(shù)據(jù)分片和讀寫分離功能,定制開發(fā)的功能也在生產(chǎn)環(huán)境經(jīng)過了考驗,如有不足和優(yōu)化建議,也歡迎大家批評指正。
本文鏈接:http://www.www897cc.com/showinfo-26-76541-0.htmlSharding-JDBC源碼解析與vivo的定制開發(fā)
聲明:本網(wǎng)頁內(nèi)容旨在傳播知識,若有侵權等問題請及時與本網(wǎng)聯(lián)系,我們將在第一時間刪除處理。郵件:2376512515@qq.com