文档章节

数据库中间件 MyCAT源码分析 —— 跨库两表Join

芋道源码
 芋道源码
发布于 2017/07/13 21:46
字数 2064
阅读 4.7K
收藏 87

行业解决方案、产品招募中!想赚钱就来传!>>>

wechat_mp

🙂🙂🙂关注微信公众号有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右


1. 概述

MyCAT 支持跨库表 Join,目前版本仅支持跨库表 Join。虽然如此,已经能够满足我们大部分的业务场景。况且,Join 过多的表可能带来的性能问题也是很麻烦的。

本文主要分享:

  1. 整体流程、调用顺序图
  2. 核心代码的分析

前置阅读:《MyCAT 源码分析 —— 【单库单表】查询》

OK,Let's Go。

2. 主流程

当执行跨库两表 Join SQL 时,经历的大体流程如下:

SQL 上,需要添加注解 /*!mycat:catlet=io.mycat.catlets.ShareJoin */ ${SQL}RouteService#route(...) 解析注解 mycat:catlet 后,路由给 HintCatletHandler 作进一步处理。

HintCatletHandler 获取注解对应的 Catlet 实现类,io.mycat.catlets.ShareJoin 就是其中一种实现(目前也只有这一种实现),提供了跨库两表 Join 的功能。从类命名上看,ShareJoin 很大可能性后续会提供完整的跨库多表的 Join 功能。

核心代码如下:

// HintCatletHandler.java
public RouteResultset route(SystemConfig sysConfig, SchemaConfig schema,
                           int sqlType, String realSQL, String charset, ServerConnection sc,
                           LayerCachePool cachePool, String hintSQLValue, int hintSqlType, Map hintMap)
       throws SQLNonTransientException {
   String cateletClass = hintSQLValue;
   if (LOGGER.isDebugEnabled()) {
       LOGGER.debug("load catelet class:" + hintSQLValue + " to run sql " + realSQL);
   }
   try {
       Catlet catlet = (Catlet) MycatServer.getInstance().getCatletClassLoader().getInstanceofClass(cateletClass);
       catlet.route(sysConfig, schema, sqlType, realSQL, charset, sc, cachePool);
       catlet.processSQL(realSQL, new EngineCtx(sc.getSession2()));
   } catch (Exception e) {
       LOGGER.warn("catlet error " + e);
       throw new SQLNonTransientException(e);
   }
   return null;
}

3. ShareJoin

目前支持跨库表 Join。ShareJoin 将 SQL 拆分成左表 SQL 和 右表 SQL,发送给各数据节点执行,汇总数据结果进行合后返回。

伪代码如下:

// SELECT u.id, o.id FROM t_order o 
// INNER JOIN t_user u ON o.uid = u.id
// 【顺序】查询左表
String leftSQL = "SELECT o.id, u.id FROM t_order o";
List leftList = dn[0].select(leftSQL) + dn[1].select(leftSQL) + ... + dn[n].select(leftsql);
// 【并行】查询右表
String rightSQL = "SELECT u.id FROM t_user u WHERE u.id IN (${leftList.uid})";
for (dn : dns) { // 此处是并行执行,使用回调逻辑
    for (rightRecord : dn.select(rightSQL)) { // 查询右表
        // 合并结果
        for (leftRecord : leftList) {
            if (leftRecord.uid == rightRecord.id) {
                write(leftRecord + leftRecord.uid 拼接结果);
            }
        }
    }
} 

实际情况会更加复杂,我们接下来一点点往下看。

3.1 JoinParser

JoinParser 负责对 SQL 进行解析。整体流程如下:

举个例子,/*!mycat:catlet=io.mycat.catlets.ShareJoin */ SELECT o.id, u.username from t_order o join t_user u on o.uid = u.id; 解析后,TableFilter 结果如下:

  • tName :表名
  • tAlia :表自定义命名
  • where :过滤条件
  • order :排序条件
  • parenTable :左连接的 Join 的表名。t_user表 在 join属性 的 parenTable 为 "o",即 t_order
  • joinParentkey :左连接的 Join 字段
  • joinKey :join 字段。t_user表 在 join属性 为 id
  • join :子 tableFilter。即,该表连接的右边的表。
  • parent :和 join属性 相对。

看到此处,大家可能有疑问,为什么要把 SQL 解析成 TableFilterJoinParser 根据 TableFilter 生成数据节点执行 SQL。代码如下:

// TableFilter.java
public String getSQL() {
   String sql = "";
   // fields
   for (Entry<String, String> entry : fieldAliasMap.entrySet()) {
       String key = entry.getKey();
       String val = entry.getValue();
       if (val == null) {
           sql = unionsql(sql, getFieldfrom(key), ",");
       } else {
           sql = unionsql(sql, getFieldfrom(key) + " as " + val, ",");
       }
   }
   // where
   if (parent == null) {    // on/where 等于号左边的表
       String parentJoinKey = getJoinKey(true);
       // fix sharejoin bug:
       // (AbstractConnection.java:458) -close connection,reason:program err:java.lang.IndexOutOfBoundsException:
       // 原因是左表的select列没有包含 join 列,在获取结果时报上面的错误
       if (sql != null && parentJoinKey != null &&
               !sql.toUpperCase().contains(parentJoinKey.trim().toUpperCase())) {
           sql += ", " + parentJoinKey;
       }
       sql = "select " + sql + " from " + tName;
       if (!(where.trim().equals(""))) {
           sql += " where " + where.trim();
       }
   } else {    // on/where 等于号右边边的表
       if (allField) {
           sql = "select " + sql + " from " + tName;
       } else {
           sql = unionField("select " + joinKey, sql, ",");
           sql = sql + " from " + tName;
           //sql="select "+joinKey+","+sql+" from "+tName;
       }
       if (!(where.trim().equals(""))) {
           sql += " where " + where.trim() + " and (" + joinKey + " in %s )";
       } else {
           sql += " where " + joinKey + " in %s ";
       }
   }
   // order
   if (!(order.trim().equals(""))) {
       sql += " order by " + order.trim();
   }
   // limit
   if (parent == null) {
       if ((rowCount > 0) && (offset > 0)) {
           sql += " limit" + offset + "," + rowCount;
       } else {
           if (rowCount > 0) {
               sql += " limit " + rowCount;
           }
       }
   }
   return sql;
}
  • parent 为空时,即on/where 等于号左边的表。例如:select id, uid from t_order
  • parent 不为空时,即on/where 等于号右边的表。例如:select id, username from t_user where id in (1, 2, 3)

3.2 ShareJoin.processSQL(...)

当 SQL 解析完后,生成左边的表执行的 SQL,发送给对应的数据节点查询数据。大体流程如下:

当 SQL 为 /*!mycat:catlet=io.mycat.catlets.ShareJoin */ SELECT o.id, u.username from t_order o join t_user u on o.uid = u.id; 时, sql = getSql() 的返回结果为 select id, uid from t_order

生成左边的表执行的 SQL 后,顺序顺序顺序发送给对应的数据节点查询数据。具体顺序查询是怎么实现的,我们来看下章 BatchSQLJob

3.3 BatchSQLJob

EngineCtxBatchSQLJob 封装,提供上层两个方法:

  1. executeNativeSQLSequnceJob :顺序(非并发)在每个数据节点执行SQL任务
  2. executeNativeSQLParallJob :并发在每个数据节点执行SQL任务

核心代码如下:

// EngineCtx.java
public void executeNativeSQLSequnceJob(String[] dataNodes, String sql,
		SQLJobHandler jobHandler) {
	for (String dataNode : dataNodes) {
		SQLJob job = new SQLJob(jobId.incrementAndGet(), sql, dataNode,
				jobHandler, this);
		bachJob.addJob(job, false);
	}
}

public void executeNativeSQLParallJob(String[] dataNodes, String sql,
		SQLJobHandler jobHandler) {
	for (String dataNode : dataNodes) {
		SQLJob job = new SQLJob(jobId.incrementAndGet(), sql, dataNode,
				jobHandler, this);
		bachJob.addJob(job, true);
	}
}	

BatchSQLJob 通过执行中任务列表待执行任务列表来实现顺序/并发执行任务。核心代码如下:

// BatchSQLJob.java
/**
* 执行中任务列表
*/
private ConcurrentHashMap<Integer, SQLJob> runningJobs = new ConcurrentHashMap<Integer, SQLJob>();
/**
* 待执行任务列表
*/
private ConcurrentLinkedQueue<SQLJob> waitingJobs = new ConcurrentLinkedQueue<SQLJob>();

public void addJob(SQLJob newJob, boolean parallExecute) {
   if (parallExecute) {
       runJob(newJob);
   } else {
       waitingJobs.offer(newJob);
       if (runningJobs.isEmpty()) { // 若无正在执行中的任务,则从等待队列里获取任务进行执行。
           SQLJob job = waitingJobs.poll();
           if (job != null) {
               runJob(job);
           }
       }
   }
}

public boolean jobFinished(SQLJob sqlJob) {
	runningJobs.remove(sqlJob.getId());
	SQLJob job = waitingJobs.poll();
	if (job != null) {
		runJob(job);
		return false;
	} else {
		if (noMoreJobInput) {
			return runningJobs.isEmpty() && waitingJobs.isEmpty();
		} else {
			return false;
		}
	}
}
  • 顺序执行时,当 runningJobs 存在执行中的任务时,#addJob(...) 时,不立即执行,添加到 waitingJobs。当 SQLJob 完成时,顺序调用下一个任务。
  • 并发执行时,#addJob(...) 时,立即执行。

SQLJob SQL 异步执行任务。其 jobHandler(SQLJobHandler) 属性,在 SQL 执行有返回结果时,会进行回调,从而实现异步执行。

ShareJoin 里,SQLJobHandler 有两个实现:ShareDBJoinHandlerShareRowOutPutDataHandler。前者,左边的表执行的 SQL 回调;后者,右边的表执行的 SQL 回调。

3.4 ShareDBJoinHandler

ShareDBJoinHandler左边的表执行的 SQL 回调。流程如下:

  • #fieldEofResponse(...) :接收数据节点返回的 fields,放入内存。
  • #rowResponse(...) :接收数据节点返回的 row,放入内存。
  • #rowEofResponse(...) :接收完一个数据节点返回所有的 row。当所有数据节点都完成 SQL 执行时,提交右边的表执行的 SQL 任务,并行执行,即图中**#createQryJob(...)**。

当 SQL 为 /*!mycat:catlet=io.mycat.catlets.ShareJoin */ SELECT o.id, u.username from t_order o join t_user u on o.uid = u.id; 时, sql = getChildSQL() 的返回结果为 select id, username from t_user where id in (1, 2, 3)

核心代码如下:

// ShareJoin.java
private void createQryJob(int batchSize) {
   int count = 0;
   Map<String, byte[]> batchRows = new ConcurrentHashMap<String, byte[]>();
   String theId = null;
   StringBuilder sb = new StringBuilder().append('(');
   String svalue = "";
   for (Map.Entry<String, String> e : ids.entrySet()) {
       theId = e.getKey();
       byte[] rowbyte = rows.remove(theId);
       if (rowbyte != null) {
           batchRows.put(theId, rowbyte);
       }
       if (!svalue.equals(e.getValue())) {
           if (joinKeyType == Fields.FIELD_TYPE_VAR_STRING
                   || joinKeyType == Fields.FIELD_TYPE_STRING) { // joinkey 为varchar
               sb.append("'").append(e.getValue()).append("'").append(','); // ('digdeep','yuanfang')
           } else { // 默认joinkey为int/long
               sb.append(e.getValue()).append(','); // (1,2,3)
           }
       }
       svalue = e.getValue();
       if (count++ > batchSize) {
           break;
       }
   }
   if (count == 0) {
       return;
   }
   jointTableIsData = true;
   sb.deleteCharAt(sb.length() - 1).append(')');
   String sql = String.format(joinParser.getChildSQL(), sb);
   getRoute(sql);
   ctx.executeNativeSQLParallJob(getDataNodes(), sql, new ShareRowOutPutDataHandler(this, fields, joinindex, joinParser.getJoinRkey(), batchRows, ctx.getSession()));
}

3.5 ShareRowOutPutDataHandler

ShareRowOutPutDataHandler右边的表执行的 SQL 回调。流程如下:

  • #fieldEofResponse(...) :接收数据节点返回的 fields,返回 header 给 MySQL Client。
  • #rowResponse(...) :接收数据节点返回的 row,匹配左表的记录,返回合并后返回的 row 给 MySQL Client。
  • #rowEofResponse(...) :当所有 row 都返回完后,返回 eof 给 MySQL Client。

核心代码如下:

// ShareRowOutPutDataHandler.java
public boolean onRowData(String dataNode, byte[] rowData) {
   RowDataPacket rowDataPkgold = ResultSetUtil.parseRowData(rowData, bfields);
   //拷贝一份batchRows
   Map<String, byte[]> batchRowsCopy = new ConcurrentHashMap<String, byte[]>();
   batchRowsCopy.putAll(arows);
   // 获取Id字段,
   String id = ByteUtil.getString(rowDataPkgold.fieldValues.get(joinR));
   // 查找ID对应的A表的记录
   byte[] arow = getRow(batchRowsCopy, id, joinL);
   while (arow != null) {
       RowDataPacket rowDataPkg = ResultSetUtil.parseRowData(arow, afields);//ctx.getAllFields());
       for (int i = 1; i < rowDataPkgold.fieldCount; i++) {
           // 设置b.name 字段
           byte[] bname = rowDataPkgold.fieldValues.get(i);
           rowDataPkg.add(bname);
           rowDataPkg.addFieldCount(1);
       }
       // huangyiming add
       MiddlerResultHandler middlerResultHandler = session.getMiddlerResultHandler();
       if (null == middlerResultHandler) {
           ctx.writeRow(rowDataPkg);
       } else {
           if (middlerResultHandler instanceof MiddlerQueryResultHandler) {
               byte[] columnData = rowDataPkg.fieldValues.get(0);
               if (columnData != null && columnData.length > 0) {
                   String rowValue = new String(columnData);
                   middlerResultHandler.add(rowValue);
               }
               //}
           }

       }
       arow = getRow(batchRowsCopy, id, joinL);
   }
   return false;
}

4. 彩蛋

如下是本文涉及到的核心类,有兴趣的同学可以翻一翻。

ShareJoin 另外不支持的功能:

  1. 只支持 inner join,不支持 left join、right join 等等连接。
  2. 不支持 order by。
  3. 不支持 group by 以及 相关聚合函数。
  4. 即使 join 左表的字段未声明为返回 fields 也会返回。

恩,MyCAT 弱XA 源码继续走起!

芋道源码

芋道源码

粉丝 346
博文 75
码字总数 193913
作品 0
徐汇
后端工程师
私信 提问
加载中
此博客有 1 条评论,请先登录后再查看。
Flappy Bird(安卓版)逆向分析(一)

更改每过一关的增长分数 反编译的步骤就不介绍了,我们直接来看反编译得到的文件夹 方法1:在smali目录下,我们看到org/andengine/,可以知晓游戏是由andengine引擎开发的。打开/res/raw/at...

enimey
2014/03/04
6K
18
Swift百万线程攻破单例(Singleton)模式

一、不安全的单例实现 在上一篇文章我们给出了单例的设计模式,直接给出了线程安全的实现方法。单例的实现有多种方法,如下面: class SwiftSingleton { } 这段代码的实现,在shared中进行条...

一叶博客
2014/06/20
3.4K
16
beego API开发以及自动化文档

beego API开发以及自动化文档 beego1.3版本已经在上个星期发布了,但是还是有很多人不了解如何来进行开发,也是在一步一步的测试中开发,期间QQ群里面很多人都问我如何开发,我的业余时间实在...

astaxie
2014/06/25
2.7W
22
树莓派(Raspberry Pi):完美的家用服务器

自从树莓派发布后,所有在互联网上的网站为此激动人心的设备提供了很多有趣和具有挑战性的使用方法。虽然这些想法都很棒,但树莓派( RPi )最明显却又是最不吸引人的用处是:创建你的完美家用...

异次元
2013/11/09
6.4K
8
程序猿媛一:Android滑动翻页+区域点击事件

滑动翻页+区域点击事件 ViewPager+GrideView 声明:博文为原创,文章内容为,效果展示,思路阐述,及代码片段。文尾附注源码获取途径。 转载请保留原文出处“http://my.oschina.net/gluoyer...

花佟林雨月
2013/11/09
4.2K
1

没有更多内容

加载失败,请刷新页面

加载更多

【重讲拉勾】5 MySQL索引失效

文章内容输出来源:拉勾教育Java高薪训练营 【重讲拉勾】1 JVM内存结构 https://my.oschina.net/u/4033707/blog/4444869 【重讲拉勾】2 Java容器 https://my.oschina.net/u/4033707/blog/44...

不正经的大叔
11分钟前
0
0
每个程序员应阅读的最具影响力的单一书籍是什么? [关闭]

问题: If you could go back in time and tell yourself to read a specific book at the beginning of your career as a developer, which book would it be? 如果你可以回到过去并告诉自己......

技术盛宴
26分钟前
5
0
Spring Cloud系列之微服务介绍

相信我,请认真读完,点开每一个链接,或许你才能真正了解什么是微服务?什么是分布式?什么是云计算?绝对没有多余! 1 微服务 微服务架构是“新常态”。构建小型的、自包含的、随时可以运行...

冯文议
42分钟前
6
0
在Visual Studio中使用Git [关闭] - Using Git with Visual Studio [closed]

问题: As a long-time Visual SourceSafe user (and hater) I was discussing switching to SVN with a colleague; 作为一个长期的Visual SourceSafe用户(和仇恨)我正在讨论与同事转换到S......

富含淀粉
今天
24
0
第三讲:毕业论文(设计)开题报告

本讲讲解开题报告的设计。 一、字体与格式 参看第二讲的内容,这里不再赘述。 二、头部信息 如下图所示,样式不要有太多的差异化。 三、选题的背景与意义 2.1背景 选题背景即阐述做这件事情(...

刘日辉
今天
244
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部