openGemini如何开发一个字符串算子?

原创
2023/10/13 18:06
阅读数 407

本篇文章内容前导知识可参考文章:openGemini大解析:查询引擎框架

本篇文章将教大家如何在openGemini内核中添加一个内置函数(算子)。这里以hello算子为例,实现的功能为打印"hello, $filed_value",类似这样的输出:

望能帮助大家熟悉算子开发流程,为熟悉内核源码打下一定的基础。

开发 hello 算子

在这里,我们将实现一个HELLO()函数,该函数有一个参数,即字符串。为此,您需要克隆 openGemini/openGemini 存储库

docs.opengemini.org/zh/dev-guide/get_started/build_source_code.html)

目前执行 hello 算子查询会报如下错误:

> select hello("name") from mstERR: undefined function hello()

我们正式开始修改源码:

第一步是在open_src/influx/query/compile.go中添加定义函数的名称,"hello":

func isStringFunction(call *influxql.Call) bool {  switch call.Name {  case "str", "strlen", "substr", "hello":    return true  }

下一步是修改engine/executor/schema.go,添加"hello"

func (qs *QuerySchema) isStringFunction(call *influxql.Call) bool {  switch call.Name {  case "str", "strlen", "substr", "hello":    return true  }

然后是修改open_src/influx/query/functions.go,添加"hello"的CallType

func (m StringFunctionTypeMapper) CallType(name string, _ []influxql.DataType) (influxql.DataType, error) {  switch name {  case "str":    return influxql.Boolean, nil  case "strlen":    return influxql.Integer, nil  case "substr":    return influxql.String, nil  case "hello":    return influxql.String, nil

最后需要补充我们的hello方法的真正实现,代码在engine/executor/string_functions.go中

func (v StringValuer) Call(name string, args []interface{}) (interface{}, bool) {  switch name {  case "strlen":   ......  case "hello":    if len(args) != 1 {      return nil, false    }    if arg0, ok := args[0].(string); ok {      return HelloFunc(arg0), true    }    return nil, true  default:    return nil, false}
func HelloFunc(srcStr string) string {    // 测试性能优化时放开下面注释  // var h []byte  // h = make([]byte, 200*1024*1024)  // fmt.Println(h)  return "hello, " + srcStr}

现在您需要重新构建 openGemini并尝试新添加的功能。(https://docs.opengemini.org/zh/dev-guide/get_started/build_source_code.html)

最终结果:

> insert mst name="Tom"> SELECT HELLO(name) from mst+----------------------+------------------+| time                 | hello            |+----------------------+------------------+| 2021-08-16T16:00:00Z | hello, Tom       |+----------------------+------------------+

单元测试

我们需要测试engine/executor/string_functions.go中的HelloFunc是否符合预期:以hello开头。

在engine/executor/string_function_test.go文件中,添加如下测试:

func TestStringFunctionHello(t *testing.T) {  stringValuer := executor.StringValuer{}  inputName := "hello"  inputArgs := []interface{}{"Alice", "Bob", "Carry"}  expects := []interface{}{"hello, Alice", "hello, Bob", "hello, Carry"}  outputs := make([]interface{}, 0, len(expects))  for _, arg := range inputArgs {    if out, ok := stringValuer.Call(inputName, []interface{}{arg}); ok {      outputs = append(outputs, out)    }  }  assert.Equal(t, outputs, expects)}

集成测试

如果需要添加集成测试(https://docs.opengemini.org/zh/dev-guide/get_started/test_tutorials.html),请在tests/server_test.go文件中,增加如下测试函数:

func TestServer_Query_Aggregate_For_Hello_Functions(t *testing.T) {  t.Parallel()  s := OpenServer(NewParseConfig(testCfgPath))  defer s.Close()
  if err := s.CreateDatabaseAndRetentionPolicy("db0", NewRetentionPolicySpec("rp0", 1, 0), true); err != nil {    t.Fatal(err)  }
  writes := []string{    fmt.Sprintf(`mst,country=china,name=azhu age=12.3,height=70i,address="shenzhen",alive=TRUE 1629129600000000000`),    fmt.Sprintf(`mst,country=american,name=alan age=20.5,height=80i,address="shanghai",alive=FALSE 1629129601000000000`),    fmt.Sprintf(`mst,country=germany,name=alang age=3.4,height=90i,address="beijin",alive=TRUE 1629129602000000000`),    fmt.Sprintf(`mst,country=japan,name=ahui age=30,height=121i,address="guangzhou",alive=FALSE 1629129603000000000`),    fmt.Sprintf(`mst,country=canada,name=aqiu age=35,height=138i,address="chengdu",alive=TRUE 1629129604000000000`),    fmt.Sprintf(`mst,country=china,name=agang age=48.8,height=149i,address="wuhan" 1629129605000000000`),    fmt.Sprintf(`mst,country=american,name=agan age=52.7,height=153i,alive=TRUE 1629129606000000000`),    fmt.Sprintf(`mst,country=germany,name=alin age=28.3,address="anhui",alive=FALSE 1629129607000000000`),    fmt.Sprintf(`mst,country=japan,name=ali height=179i,address="xian",alive=TRUE 1629129608000000000`),    fmt.Sprintf(`mst,country=canada age=60.8,height=180i,address="hangzhou",alive=FALSE 1629129609000000000`),    fmt.Sprintf(`mst,name=ahuang age=102,height=191i,address="nanjin",alive=TRUE 1629129610000000000`),    fmt.Sprintf(`mst,country=china,name=ayin age=123,height=203i,address="zhengzhou",alive=FALSE 1629129611000000000`),  }  test := NewTest("db0", "rp0")  test.writes = Writes{    &Write{data: strings.Join(writes, "\n")},  }
  test.addQueries([]*Query{    &Query{      name:    "SELECT hello(address)",      command: `SELECT hello("address") FROM db0.rp0.mst`,      exp:     `{"results":[{"statement_id":0,"series":[{"name":"mst","columns":["time","hello"],"values":[["2021-08-16T16:00:00Z","hello, shenzhen"],["2021-08-16T16:00:01Z","hello, shanghai"],["2021-08-16T16:00:02Z","hello, beijin"],["2021-08-16T16:00:03Z","hello, guangzhou"],["2021-08-16T16:00:04Z","hello, chengdu"],["2021-08-16T16:00:05Z","hello, wuhan"],["2021-08-16T16:00:07Z","hello, anhui"],["2021-08-16T16:00:08Z","hello, xian"],["2021-08-16T16:00:09Z","hello, hangzhou"],["2021-08-16T16:00:10Z","hello, nanjin"],["2021-08-16T16:00:11Z","hello, zhengzhou"]]}]}]}`,    },  }...)
  for i, query := range test.queries {    t.Run(query.name, func(t *testing.T) {      if i == 0 {        if err := test.init(s); err != nil {          t.Fatalf("test init failed: %s", err)        }      }      if query.skip {        t.Skipf("SKIP:: %s", query.name)      }      if err := query.Execute(s); err != nil {        t.Error(query.Error(err))      } else if !query.success() {        t.Error(query.failureMessage())      }    })  }}

性能分析(profiling)和优化

(一)性能分析(profiling)

对于任何数据库系统来说,性能始终很重要。如果你想知道性能瓶颈在哪里,可以使用一个强大的 Go 分析工具,名为pprof。

启动进程的时候需要修改配置文件,才能开启SQL端的pprof功能,端口为6061

[http]pprof-enabled = true

通过HTTP 端点收集运行时分析信息

通常,当 openGemini 服务器运行时,它会通过 HTTP 在http://127.0.0.1:6061/debug/pprof/profile. 您可以通过运行以下命令获取配置文件结果:

curl -G "http://127.0.0.1:6061/debug/pprof/profile?seconds=45" > profile.profile go tool pprof -http 127.0.0.1:4001 profile.profile

这些命令捕获45秒的分析信息,然后在浏览器中输入127.0.0.1:4001,打开分析CPU结果的 Web 视图。该视图包含执行的火焰图和更多可以帮助您诊断性能瓶颈的视图。https://www.brendangregg.com/flamegraphs.html

您还可以通过此端点收集其他运行时信息。例如:

  • goroutine

curl -G "http://127.0.0.1:6061/debug/pprof/goroutine" > goroutine.profilego tool trace -http 127.0.0.1:4001 goroutine.profile
  • trace(调用链)

curl -G "http://127.0.0.1:6061/debug/pprof/trace?seconds=3" > trace.profilego tool trace -http 127.0.0.1:4001 trace.profile
  • heap(内存)

curl -G "http://127.0.0.1:6061/debug/pprof/heap" > heap.profilego tool pprof -http 127.0.0.1:4001 heap.profile

要了解如何分析运行时信息,请参阅 Go 的诊断文档。(https://golang.org/doc/diagnostics)

 

内存申请火焰图:

HelloFunc,申请了845MB的内存。

优化后火焰图如下:

基本上看不到特别消耗内存的地方,总共才申请8MB左右。

(二)性能优化

性能优化的手段一般有:

1.优化GC,减少小对象申请

2.去掉无用对象的内存申请

3.缓存区内容一次分配足够大小空间,并适当复用

4.高并发的任务处理使用goroutine池

5.减少[]byte与string之间转换,尽量采用[]byte字符串处理

 

总结

我们已成功向openGemini内核添加了 hello 函数,并且完成了单元测试、集成测试、性能分析与优化,至此一个完整的开发流程就体验完毕了。在绝大部分企业级项目中都是遵循此开发节奏的。相信此篇文章能作为开发数据库项目的入门篇章,为开发者日后更加复杂的功能开发上带来一点点帮助


openGemini官网:http://www.openGemini.org

openGemini开源地址:https://github.com/openGemini

openGemini公众号:

欢迎关注~ 诚邀你加入 openGemini 社区,共建、共治、共享未来!

展开阅读全文
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部