浪潮云溪分布式数据库协议代码解析(2)

原创
05/26 15:38
阅读数 898

- 数据请求阶段 -

Part 1 - 简单查询

 

1. 客户端发送Query (‘Q’)消息给服务端,包含了一条字符串类型的SQL语句。

func(cn *conn) query(query string, args []driver.Value)(_ *rows, err error){    ...    // Check to see if we can use the "simpleQuery"interface, which is    // *much* faster than going through prepare/exec    iflen(args)==0{        return cn.simpleQuery(query)    }    ...}

 

2. 服务端收到Query 消息,解析SQL语句,生成抽象语法树(AST),并传给执行器执行,获得结果。

func(c *conn) serveImpl(    ctx context.Context,    draining func()bool,    sqlServer *sql.Server,    reserved mon.BoundAccount,    stopper *stop.Stopper,)error{    ...Loop:    for{        typ, n, err = c.readBuf.ReadTypedMsg(&c.rd)        if err !=nil{            break Loop        }        ...        switch typ {        case pgwirebase.ClientMsgSimpleQuery:            ...        case pgwirebase.ClientMsgExecute:            ...        case pgwirebase.ClientMsgParse:            ...        case pgwirebase.ClientMsgDescribe:            ...        case pgwirebase.ClientMsgBind:            ...        case pgwirebase.ClientMsgSync:            ...        }    }    ...}

 

3. 服务端根据SQL结果,首先发送RowDescription(B:‘T’)消息,包含列的数量,列名,列的类型等参数。​​​​​​​

func(c *conn) writeRowDescription(    ctx context.Context,    columns []sqlbase.ResultColumn,    formatCodes []pgwirebase.FormatCode,    w io.Writer,)error{    c.msgBuilder.initMsg(pgwirebase.ServerMsgRowDescription)    c.msgBuilder.putInt16(int16(len(columns)))    for i, column :=range columns {        ...        c.msgBuilder.writeTerminatedString(column.Name)        ...        c.msgBuilder.putInt32(0)//Table OID (optional).        c.msgBuilder.putInt16(0)//Column attribute ID (optional).        c.msgBuilder.putInt32(int32(typ.oid))        c.msgBuilder.putInt16(int16(typ.size))        ...    }    ...}

 

4. RowDescription消息后面将跟着多个DataRow(B:‘D’)消息,每个DataRow消息包含一行的数据。

func(c *conn) bufferRow(    ctx context.Context,    row tree.Datums,    formatCodes []pgwirebase.FormatCode,    convsessiondata.DataConversionConfig,    types []*types.T,){    c.msgBuilder.initMsg(pgwirebase.ServerMsgDataRow)    c.msgBuilder.putInt16(int16(len(row)))    for i, col :=range row {        ...        switch fmtCode {        case pgwirebase.FormatText:            c.msgBuilder.writeTextDatum(ctx, col, conv, types[i])        case pgwirebase.FormatBinary:            c.msgBuilder.writeBinaryDatum(ctx, col, conv.Location, types[i])        ...    }    if err := c.msgBuilder.finishMsg(&c.writerState.buf); err !=nil{        panic(fmt.Sprintf("unexpected err from buffer: %s", err))    }}

 

5. 发送CommandComplete(B:‘C’)消息表示这个SQL请求执行结束了。

6. 服务端发送ReadyForQuery(‘Z’),通知客户端可以发送下一条SQL请求了。

func(r *commandResult) Close(ctx context.Context, t sql.TransactionStatusIndicator){    ...    switch r.typ {    case commandComplete:        tag := cookTag(            r.cmdCompleteTag, r.conn.writerState.tagBuf[:0], r.stmtType, r.rowsAffected,        )        r.conn.bufferCommandComplete(tag)        case parseComplete:        r.conn.bufferParseComplete()    case bindComplete:        r.conn.bufferBindComplete()    case closeComplete:        r.conn.bufferCloseComplete()    case readyForQuery:        r.conn.bufferReadyForQuery(byte(t))        // The error is saved on conn.err.        _ /* err */= r.conn.Flush(r.pos)        ...    }    ...}

 

7.客户端根据接受SQL请求的结果。

func(cn *conn) simpleQuery(q string)(res *rows, err error){    b := cn.writeBuf('Q')    b.string(q)    cn.send(b)
    for{        t, r := cn.recv1()        switch t {        case'C','I':            ...        case'Z':            ...        case'E':            ...        case'D':            ...        case'T':            ...        }    }}

 

Part 2 - 扩展查询

 

1. 客户端发送扩展查询请求,依次发送Parse (F:‘P’), Bind (F:‘B’), Describe (F:‘D’), Execute (F:‘E’), Sync(F:‘S’)消息。

func(cn *conn) query(query string, args []driver.Value)(_ *rows, err error){    ...    if cn.binaryParameters {        cn.sendBinaryModeQuery(query, args)        cn.readParseResponse()        cn.readBindResponse()        rows :=&rows{cn: cn}        rows.rowsHeader = cn.readPortalDescribeResponse()        cn.postExecuteWorkaround()        return rows,nil    }    ...}
func(cn *conn) sendBinaryModeQuery(query string, args []driver.Value){    b := cn.writeBuf('P')    b.byte(0)//unnamed statement    b.string(query)    b.int16(0)
    b.next('B')    b.int16(0)//unnamed portal and statement    cn.sendBinaryParameters(b, args)    b.bytes(colFmtDataAllText)
    b.next('D')    b.byte('P')    b.byte(0)//unnamed portal
    b.next('E')    b.byte(0)    b.int32(0)
    b.next('S')    cn.send(b)}

 

2. 服务端处理扩展查询请求。

 

3. 服务端发送回应消息,ParseComplete (B:‘1’), BindComplete (B:‘2’), ParameterDescription(B:‘t’), CommandComplete (B:‘C’), CloseComplete (B:‘3’), ReadyForQuery (B:‘Z’)​​​​​​​

func(r *commandResult) Close(ctx context.Context, t sql.TransactionStatusIndicator){    ...    switch r.typ {    case commandComplete:        tag := cookTag(            r.cmdCompleteTag, r.conn.writerState.tagBuf[:0], r.stmtType, r.rowsAffected,        )        r.conn.bufferCommandComplete(tag)        case parseComplete:        r.conn.bufferParseComplete()    case bindComplete:        r.conn.bufferBindComplete()    case closeComplete:        r.conn.bufferCloseComplete()    case readyForQuery:        r.conn.bufferReadyForQuery(byte(t))        // The error is saved on conn.err.        _ /* err */= r.conn.Flush(r.pos)        ...    }    ...}
展开阅读全文
加载中

作者的其它热门文章

打赏
0
0 收藏
分享
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部
返回顶部
顶部