麒麟v10 上部署 TiDB v5.1.2 生产环境优化实践
920
2023-05-02
数据库中间件 MyCAT源码分析 —— 跨库两表Join
1. 概述
MyCAT 支持跨库表 Join,目前版本仅支持跨库两表 Join。虽然如此,已经能够满足我们大部分的业务场景。况且,Join 过多的表可能带来的性能问题也是很麻烦的。
本文主要分享:
整体流程、调用顺序图核心代码的分析
前置阅读:《MyCAT 源码分析 —— 【单库单表】查询》。
OK,Let's Go。
2. 主流程
当执行跨库两表 Join SQL 时,经历的大体流程如下:
SQL 上,需要添加注解 /*!mycat:catlet=io.mycat.catlets.ShareJoin */ ${SQL} 。RouteService#route(...) 解析注解 mycat:catlet 后,路由给 HintCatletHandler 作进一步处理。
HintCatletHandler 获取注解对应的 Catlet 实现类,io.mycat.catlets.ShareJoin 就是其中一种实现(目前也只有这一种实现),提供了跨库两表 Join 的功能。从类命名上看,ShareJoin 很大可能性后续会提供完整的跨库多表的 Join 功能。
核心代码如下:
// HintCatletHandler.java public RouteResultset route(SystemConfig sysConfig, SchemaConfig schema, int sqlType, String realSQL, String charset, ServerConnection sc, LayerCachePool cachePool, String hintSQLValue, int hintSqlType, Map hintMap) throws SQLNonTransientException { String cateletClass = hintSQLValue; if (LOGGER.isDebugEnabled()) { LOGGER.debug("load catelet class:" + hintSQLValue + " to run sql " + realSQL); } try { Catlet catlet = (Catlet) MycatServer.getInstance().getCatletClassLoader().getInstanceofClass(cateletClass); catlet.route(sysConfig, schema, sqlType, realSQL, charset, sc, cachePool); catlet.processSQL(realSQL, new EngineCtx(sc.getSession2())); } catch (Exception e) { LOGGER.warn("catlet error " + e); throw new SQLNonTransientException(e); } return null; }
3. ShareJoin
目前支持跨库两表 Join。ShareJoin 将 SQL 拆分成左表 SQL 和 右表 SQL,发送给各数据节点执行,汇总数据结果进行合后返回。
伪代码如下:
// SELECT u.id, o.id FROM t_order o // INNER JOIN t_user u ON o.uid = u.id // 【顺序】查询左表 String leftSQL = "SELECT o.id, u.id FROM t_order o"; List leftList = dn[0].select(leftSQL) + dn[1].select(leftSQL) + ... + dn[n].select(leftsql); // 【并行】查询右表 String rightSQL = "SELECT u.id FROM t_user u WHERE u.id IN (${leftList.uid})"; for (dn : dns) { // 此处是并行执行,使用回调逻辑 for (rightRecord : dn.select(rightSQL)) { // 查询右表 // 合并结果 for (leftRecord : leftList) { if (leftRecord.uid == rightRecord.id) { write(leftRecord + leftRecord.uid 拼接结果); } } } }
实际情况会更加复杂,我们接下来一点点往下看。
3.1 JoinParser
JoinParser 负责对 SQL 进行解析。整体流程如下:
举个例子,/*!mycat:catlet=io.mycat.catlets.ShareJoin */ SELECT o.id, u.username from t_order o join t_user u on o.uid = u.id; 解析后,TableFilter 结果如下:
tName :表名tAlia :表自定义命名where :过滤条件order :排序条件parenTable :左连接的 Join 的表名。t_user表 在 join属性 的 parenTable 为 "o",即 t_order。joinParentkey :左连接的 Join 字段joinKey :join 字段。t_user表 在 join属性 为 id。join :子 tableFilter。即,该表连接的右边的表。parent :和 join属性 相对。
看到此处,大家可能有疑问,为什么要把 SQL 解析成 TableFilter。JoinParser 根据 TableFilter 生成数据节点执行 SQL。代码如下:
// TableFilter.java public String getSQL() { String sql = ""; // fields for (Entry
当 parent 为空时,即on/where 等于号左边的表。例如:select id, uid from t_order。当 parent 不为空时,即on/where 等于号右边的表。例如:select id, username from t_user where id in (1, 2, 3)。
3.2 ShareJoin.processSQL(...)
当 SQL 解析完后,生成左边的表执行的 SQL,发送给对应的数据节点查询数据。大体流程如下:
当 SQL 为 /*!mycat:catlet=io.mycat.catlets.ShareJoin */ SELECT o.id, u.username from t_order o join t_user u on o.uid = u.id; 时, sql = getSql() 的返回结果为 select id, uid from t_order。
生成左边的表执行的 SQL 后,顺序顺序顺序发送给对应的数据节点查询数据。具体顺序查询是怎么实现的,我们来看下章 BatchSQLJob。
3.3 BatchSQLJob
EngineCtx 对 BatchSQLJob 封装,提供上层两个方法:
executeNativeSQLSequnceJob :顺序(非并发)在每个数据节点执行SQL任务executeNativeSQLParallJob :并发在每个数据节点执行SQL任务
核心代码如下:
// EngineCtx.java public void executeNativeSQLSequnceJob(String[] dataNodes, String sql, SQLJobHandler jobHandler) { for (String dataNode : dataNodes) { SQLJob job = new SQLJob(jobId.incrementAndGet(), sql, dataNode, jobHandler, this); bachJob.addJob(job, false); } } public void executeNativeSQLParallJob(String[] dataNodes, String sql, SQLJobHandler jobHandler) { for (String dataNode : dataNodes) { SQLJob job = new SQLJob(jobId.incrementAndGet(), sql, dataNode, jobHandler, this); bachJob.addJob(job, true); } }
BatchSQLJob 通过执行中任务列表、待执行任务列表来实现顺序/并发执行任务。核心代码如下:
// BatchSQLJob.java /** * 执行中任务列表 */ private ConcurrentHashMap
顺序执行时,当 runningJobs 存在执行中的任务时,#addJob(...) 时,不立即执行,添加到 waitingJobs。当 SQLJob 完成时,顺序调用下一个任务。并发执行时,#addJob(...) 时,立即执行。
SQLJob SQL 异步执行任务。其 jobHandler(SQLJobHandler) 属性,在 SQL 执行有返回结果时,会进行回调,从而实现异步执行。
在 ShareJoin 里,SQLJobHandler 有两个实现:ShareDBJoinHandler、ShareRowOutPutDataHandler。前者,左边的表执行的 SQL 回调;后者,右边的表执行的 SQL 回调。
3.4 ShareDBJoinHandler
ShareDBJoinHandler,左边的表执行的 SQL 回调。流程如下:
#fieldEofResponse(...) :接收数据节点返回的 fields,放入内存。#rowResponse(...) :接收数据节点返回的 row,放入内存。#rowEofResponse(...) :接收完一个数据节点返回所有的 row。当所有数据节点都完成 SQL 执行时,提交右边的表执行的 SQL 任务,并行执行,即图中#createQryJob(...)。
当 SQL 为 /*!mycat:catlet=io.mycat.catlets.ShareJoin */ SELECT o.id, u.username from t_order o join t_user u on o.uid = u.id; 时, sql = getChildSQL() 的返回结果为 select id, username from t_user where id in (1, 2, 3)。
核心代码如下:
// ShareJoin.java private void createQryJob(int batchSize) { int count = 0; Map
3.5 ShareRowOutPutDataHandler
ShareRowOutPutDataHandler,右边的表执行的 SQL 回调。流程如下:
#fieldEofResponse(...) :接收数据节点返回的 fields,返回 header 给 MySQL Client。#rowResponse(...) :接收数据节点返回的 row,匹配左表的记录,返回合并后返回的 row 给 MySQL Client。#rowEofResponse(...) :当所有 row 都返回完后,返回 eof 给 MySQL Client。
核心代码如下:
// ShareRowOutPutDataHandler.java public boolean onRowData(String dataNode, byte[] rowData) { RowDataPacket rowDataPkgold = ResultSetUtil.parseRowData(rowData, bfields); //拷贝一份batchRows Map
4. 彩蛋
如下是本文涉及到的核心类,有兴趣的同学可以翻一翻。
ShareJoin 另外不支持的功能:
恩,MyCAT 弱XA 源码继续走起!
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。