文档章节

Mycat的JDBC后端框架

IT哈哈
 IT哈哈
发布于 2016/07/02 16:59
字数 1402
阅读 125
收藏 0

JDBC方式访问后端数据库

 

Mycat对JDBC支持部分的代码比较简单,主要实现了下面三个类:

1. JDBCDatasource JDBC物理数据源

2. JDBCConnection JDBC连接类

3. JDBCHeartbeat JDBC心跳类

JDBC相关类图

JDBCDatasource

 

JDBCDatasource继承PhysicalDatasource

初始化的时候加载支持数据库的驱动

 

static {        // 加载可能的驱动     

                 List<String> drivers = Lists.newArrayList("com.mysql.jdbc.Driver", "org.opencloudb.jdbc.mongodb.MongoDriver", "oracle.jdbc.OracleDriver",                "com.microsoft.sqlserver.jdbc.SQLServerDriver","org.apache.hive.jdbc.HiveDriver","com.ibm.db2.jcc.DB2Driver","org.postgresql.Driver");  

              for (String driver : drivers) {        

                        try{               

                                  Class.forName(driver);      

                        } catch (ClassNotFoundException ignored)  {  

                        }    

               }   

 }

 

创建连接的时候,从配置文件中获取host,port,dbtype,还有连接数据库的url,User,Password 

    public void createNewConnection(ResponseHandler handler,

                 String schema) throws IOException { 

       DBHostConfig cfg = getConfig();

       JDBCConnection c = new JDBCConnection();

       c.setHost(cfg.getIp());

       c.setPort(cfg.getPort());

       c.setPool(this);

       c.setSchema(schema);

       c.setDbType(cfg.getDbType());

       try { 

           // TODO 这里应该有个连接池

           Connection con = getConnection();

           // c.setIdleTimeout(pool.getConfig().getIdleTimeout()); 

           c.setCon(con);            // notify handler

           handler.connectionAcquired(c); 

       } catch (Exception e) {

           handler.connectionError(e, c);

       } 

   }

获取连接的时候,判断是否配置的初始化语句,如果存在,就执行初始化语句,此功能可用于设置日期格式,字符集等

 

Connection getConnection() throws SQLException    {

       DBHostConfig cfg = getConfig();

       Connection connection = DriverManager.getConnection(cfg.getUrl(),

                               cfg.getUser(), cfg.getPassword());

       String initSql=getHostConfig().getConnectionInitSql();

       if(initSql!=null&&!"".equals(initSql)) //初始化语句是否存在        

       {

               Statement statement =null;

               try{

                      statement = connection.createStatement();

                      statement.execute(initSql);

                }finally{

                      if(statement!=null){ 

                                statement.close(); 

                      }

                 } 

       } 

       return connection; 

   }

 

mycat 又从哪里创建JDBCDatasource的呢? 

 

请看org.opencloudb.ConfigInitializer. 

判断是否dbType是mysql并且dbDriver是native,使用MySQLDataSource连接后台数据库,如果dbDriver是jdbc就使用JDBCDatasource连接后台数据库,否则抛出异常。 

 

    private PhysicalDatasource[] createDataSource(DataHostConfig conf,            String hostName, String dbType, String dbDriver,            DBHostConfig[] nodes, boolean isRead) {        PhysicalDatasource[] dataSources = new PhysicalDatasource[nodes.length];        if (dbType.equals("mysql") && dbDriver.equals("native")) {            for (int i = 0; i < nodes.length; i++) {                nodes[i].setIdleTimeout(system.getIdleTimeout());                MySQLDataSource ds = new MySQLDataSource(nodes[i], conf, isRead);                dataSources[i] = ds;            }        } else if(dbDriver.equals("jdbc"))//是jdbc方式            {            for (int i = 0; i < nodes.length; i++) {                nodes[i].setIdleTimeout(system.getIdleTimeout());                JDBCDatasource ds = new JDBCDatasource(nodes[i], conf, isRead);                dataSources[i] = ds;            }            }        else {            throw new ConfigException("not supported yet !" + hostName);        }        return dataSources;    }

JDBCConnection

JDBCConnection主要做两件事情,就是执行SQL语句,然后把执行结果发回给mpp(SQL合并引擎,mycat处理多节点结果集排序,分组,分页),需要实现ResponseHandler的接口。

下面来分析下执行SQL语句的代码:

创建线程Runnable,在线程中执行executeSQL的方法,并把线程放入MycatServer的线程池中执行,据测试,比不用线程方式执行SQL语句效率提高20%-30%。

 

  public void execute(final RouteResultsetNode node, final ServerConnection source,      final boolean autocommit) throws IOException {    Runnable runnable=new Runnable()    {      @Override      public void run()      {        try        {          executeSQL(node, source, autocommit);        } catch (IOException e)        {          throw new RuntimeException(e);        }      }    } ;    MycatServer.getInstance().getBusinessExecutor().execute(runnable);  }

 

执行SQL语句的过程,先判断是select,或show语句还是ddl语句 

1:如果是show指令,并且不是mysql数据库,执行ShowVariables.execute,构造mysql的固定信息包 

2:如果是SELECT CONNECTION_ID()语句,执行ShowVariables.justReturnValue,也是构造mysql的固定信息包 

3:如果是SELECT语句,执行并且有返回结果数据集 

4:如果是DDL语句,执行并且返回OkPacket 

 

private void executeSQL(RouteResultsetNode rrn, ServerConnection sc,                    boolean autocommit) throws IOException {

       String orgin = rrn.getStatement();

       if (!modifiedSQLExecuted && rrn.isModifySQL()) {

           modifiedSQLExecuted = true;

       } 

      try {

           if (!this.schema.equals(this.oldSchema)) {//判断

               con.setCatalog(schema);

               this.oldSchema = schema;

           } 

           if (!this.isSpark){//spark sql ,hive 不支持事务

              con.setAutoCommit(autocommit);

           }

           int sqlType = rrn.getSqlType();            //判断是否是查询或者mysql的show指令            

           if (sqlType == ServerParse.SELECT || sqlType == ServerParse.SHOW ) {                    

     if ((sqlType ==ServerParse.SHOW) && (!dbType.equals("MYSQL")) ){                    ShowVariables.execute(sc, orgin,this);//show指令的返回结果                              } else if("SELECT CONNECTION_ID()".equalsIgnoreCase(orgin)) {                         ShowVariables.justReturnValue(sc, String.valueOf(sc.getId()),this);            }else { 

            ouputResultSet(sc, orgin);//执行select语句,并处理结果集               

    }

} else {//sql ddl 执行

      executeddl(sc, orgin);

}

} catch (SQLException e) {//异常处理

           String msg = e.getMessage();

           ErrorPacket error = new ErrorPacket();

           error.packetId = ++packetId;

           error.errno = e.getErrorCode();

           error.message = msg.getBytes();            //触发错误数据包的响应事件               this.respHandler.errorResponse(error.writeToBytes(sc), this);

 } finally { 

           this.running = false; 

 }  }

ouputResultSet(sc, orgin);//执行select语句,并处理结果集 

stmt = con.createStatement();

rs = stmt.executeQuery(sql); 执行sql语句

List<FieldPacket> fieldPks = new LinkedList<FieldPacket>();//创建字段列表       //把字段的元数据转换为mysql的元数据并放入fieldPks中,主要是数据类型      ResultSetUtil.resultSetToFieldPacket(sc.getCharset(), fieldPks, rs, this.isSpark);

把字段信息封装成mysql的网络封包

 

      int colunmCount =fieldPks.size();

     ByteBuffer byteBuf = sc.allocate();

     ResultSetHeaderPacket headerPkg = new ResultSetHeaderPacket();

     headerPkg.fieldCount = fieldPks.size();

     headerPkg.packetId = ++packetId;

     byteBuf = headerPkg.write(byteBuf, sc, true);

     byteBuf.flip();

     byte[] header = new byte[byteBuf.limit()];

     byteBuf.get(header);

     byteBuf.clear();

     List<byte[]> fields = new ArrayList<byte[]>(fieldPks.size());

     Iterator<FieldPacket> itor = fieldPks.iterator();

     while (itor.hasNext()) {

           FieldPacket curField = itor.next();

           curField.packetId = ++packetId;

           byteBuf = curField.write(byteBuf, sc, false);

           byteBuf.flip();

           byte[] field = new byte[byteBuf.limit()];

           byteBuf.get(field);

           byteBuf.clear();

           fields.add(field);

           itor.remove();

      } 

      EOFPacket eofPckg = new EOFPacket(); 

      eofPckg.packetId = ++packetId;

      byteBuf = eofPckg.write(byteBuf, sc, false);

      byteBuf.flip();

      byte[] eof = new byte[byteBuf.limit()];

      byteBuf.get(eof);

      byteBuf.clear();

       //触发收到字段数据包结束的响应事件

      this.respHandler.fieldEofResponse(header, fields, eof, this);

 

遍历结果数据集ResultSet,并把每一条记录封装成一个数据包,数据发送完成,还需要在封装一个行结束的数据包 

 

// output row      

while (rs.next()) {

       RowDataPacket curRow = new RowDataPacket(colunmCount);

       for (int i = 0; i < colunmCount; i++) {

             int j = i + 1;

             curRow.add(StringUtil.encode(rs.getString(j), sc.getCharset()));

       }

       curRow.packetId = ++packetId;

       byteBuf = curRow.write(byteBuf, sc, false);

       byteBuf.flip();

       byte[] row = new byte[byteBuf.limit()];

       byteBuf.get(row);

       byteBuf.clear();

        //触发收到行数据包的响应事件

       this.respHandler.rowResponse(row, this);

 }      // end row      

 eofPckg = new EOFPacket();

 eofPckg.packetId = ++packetId;

 byteBuf = eofPckg.write(byteBuf, sc, false);

 byteBuf.flip();

 eof = new byte[byteBuf.limit()];

 byteBuf.get(eof);

  //收到行数据包结束的响应处理

 this.respHandler.rowEofResponse(eof, this);

 

JDBCHeartbeat

 

JDBCHeartbeat就是定时执行schema.xml中dataHost的heartbeat语句。

在启动的时候判断心跳语句是否为空,如果为空则执行stop(),后面再执行heartbeat()方法时,直接返回。

 

public class JDBCHeartbeat extends DBHeartbeat{

     private final ReentrantLock lock;

     private final JDBCDatasource source;

     private final boolean heartbeatnull;

     public JDBCHeartbeat(JDBCDatasource source)  {

   this.source = source;

          lock = new ReentrantLock(false);

          this.status = INIT_STATUS;

          this.heartbeatSQL = source.getHostConfig().getHearbeatSQL().trim();              this.heartbeatnull= heartbeatSQL.length()==0;//判断心跳语句是否为空                             }    

@Override  

public void start()//启动  

{    

    if (this.heartbeatnull){

          stop();

          return;

   } 

   lock.lock();

  try {

     isStop.compareAndSet(true, false);

     this.status = DBHeartbeat.OK_STATUS;

  } finally{

     lock.unlock();

   }  

}  

@Override

public void stop()//停止  

{    

      lock.lock();

       try{

            if (isStop.compareAndSet(false, true))      {

                isChecking.set(false);

            }

       } finally    {

            lock.unlock();

       }

 }

....  

@Override

public void heartbeat()//执行心跳语句  

{    

   if (isStop.get())      return;

   lock.lock();

   try    {

     isChecking.set(true);

     try (Connection c = source.getConnection()){ 

         try (Statement s = c.createStatement()){

              s.execute(heartbeatSQL);

          }

     }

     status = OK_STATUS;

   } catch (SQLException ex) {

          status = ERROR_STATUS;

   } finally  {

         lock.unlock();

         this.isChecking.set(false);

   } 

 }

 

更多内容请关注微信公众号:it_haha

© 著作权归作者所有

IT哈哈
粉丝 2
博文 34
码字总数 55375
作品 0
东城
私信 提问
数据库中间件01-认识mycat

简述 Mycat是国产的一套免费开源的分布式数据库中间件。想必做开发或者运维的朋友对中间件这个词应该是比较熟悉了,我们见到过java中间件,消息中间件等等,这里又来了一个数据库中间件。那么...

细节探索者
2018/05/11
75
0
Mycat - 高可用与负载均衡实现,满满的干货!

前情回顾 Mycat - 实现数据库的读写分离与高可用中我们实现了mysql的读写分离与高可用,有几个点我们回顾下 如果想学习Java工程化、高性能及分布式、深入浅出。微服务、Spring,MyBatis,Net...

编程SHA
02/20
188
0
Mycat - 实现数据库的读写分离与高可用

mysql主从实现 Mycat不负责任何的数据同步问题,mysql的主从复制还得从mysql层面来实现;如果没有实现mysql的主从复制,后文就都成 如果想学习Java工程化、高性能及分布式、深入浅出。微服务...

编程SHA
02/20
216
0
MyCAT PreparedStatement 重新入门

概述 相信很多同学在学习 JDBC 时,都碰到 和 。究竟该使用哪个呢?最终很可能是懵里懵懂的看了各种总结,使用 。那么本文,通过 MyCAT 对 的实现对大家能够重新理解下。 本文主要分成两部分...

wangchen1999
2018/05/02
34
0
Mycat学习实战-Mycat初识

Mycat学习实战-Mycat初识 Mycat学习实战-Mycat初识 1. Mycat是什么 2. Mycat与其他中间件的区别 3. Mycat能解决的问题 4. Mycat核心概念 5. Mycat文件夹以及文件介绍 1. Mycat是什么 Mycat是...

ygqygq2
2018/06/29
0
0

没有更多内容

加载失败,请刷新页面

加载更多

java通过ServerSocket与Socket实现通信

首先说一下ServerSocket与Socket. 1.ServerSocket ServerSocket是用来监听客户端Socket连接的类,如果没有连接会一直处于等待状态. ServetSocket有三个构造方法: (1) ServerSocket(int port);...

Blueeeeeee
今天
6
0
用 Sphinx 搭建博客时,如何自定义插件?

之前有不少同学看过我的个人博客(http://python-online.cn),也根据我写的教程完成了自己个人站点的搭建。 点此:使用 Python 30分钟 教你快速搭建一个博客 为防有的同学不清楚 Sphinx ,这...

王炳明
昨天
5
0
黑客之道-40本书籍助你快速入门黑客技术免费下载

场景 黑客是一个中文词语,皆源自英文hacker,随着灰鸽子的出现,灰鸽子成为了很多假借黑客名义控制他人电脑的黑客技术,于是出现了“骇客”与"黑客"分家。2012年电影频道节目中心出品的电影...

badaoliumang
昨天
15
0
很遗憾,没有一篇文章能讲清楚线程的生命周期!

(手机横屏看源码更方便) 注:java源码分析部分如无特殊说明均基于 java8 版本。 简介 大家都知道线程是有生命周期,但是彤哥可以认真负责地告诉你网上几乎没有一篇文章讲得是完全正确的。 ...

彤哥读源码
昨天
15
0
jquery--DOM操作基础

本文转载于:专业的前端网站➭jquery--DOM操作基础 元素的访问 元素属性操作 获取:attr(name);$("#my").attr("src"); 设置:attr(name,value);$("#myImg").attr("src","images/1.jpg"); ......

前端老手
昨天
7
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部