一文教你玩转 TDengine 3.0.4.0 重要特性 Python UDF

原创
2023/05/30 17:30
阅读数 111
AI总结






TDengine 3.0.4.0 发布了一个重要特性:支持用 Python 语言编写的自定义函数(UDF)。这个特性极大节省了 UDF 开发的时间成本。作为时序大数据处理平台,不支持 Python UDF 显然是不完整的。UDF 在实现自己业务中特有的逻辑时非常有用,比如量化交易场景计算自研的交易信号。本文内容由浅入深包括 4 个示例程序:

1

定义一个只接收一个整数的标量函数:输入 n, 输出 ln(n^2 + 1)。

2

定义一个接收 n 个整数的标量函数, 输入 (x1, x2, ..., xn), 输出每个值和它们的序号的乘积的和:x1 + 2 * x2 + ... + n * xn。

3

定义一个标量函数,输入一个时间戳,输出距离这个时间最近的下一个周日。完成这个函数要用到第三方库 moment。我们在这个示例中讲解使用第三方库的注意事项。

4

定义一个聚合函数,计算某一列最大值和最小值的差,  也就是实现 TDengien 内置的 spread 函数。

同时也包含大量实用的 debug 技巧。
本文假设你用的是 Linux 系统,且已安装好了 TDengine 3.0.4.0+ 和 Python 3.x。






示例一

最简单的 UDF

编写一个只接收一个整数的 UDF 函数:输入 n, 输出 ln(n^2 + 1)。
首先编写一个 Python 文件,存在系统某个目录,比如 /root/udf/myfun.py 内容如下:
from math import log
def init(): pass
def destroy(): pass
def process(block): rows, _ = block.shape() return [log(block.data(i, 0) ** 2 + 1) for i in range(rows)]
这个文件包含 3 个函数, init 和 destroy 都是空函数,它们是 UDF 的生命周期函数,即使什么都不做也要定义。最关键的是 process 函数, 它接受一个数据块,这个数据块对象有两个方法
  1. shape() 返回数据块的行数和列数
  2. data(i, j) 返回 i 行 j 列的数据
标量函数的 process 方法传入的数据块有多少行,就需要返回多少个数据。上述代码中我们忽略的列数,因为我们只想对每行的第一个数做计算。
接下来我们在时序数据库(Time Series Database) TDengine 中创建对应的 UDF 函数,执行下面语句:
create function myfun as '/root/udf/myfun.py' outputtype double language 'Python'
taos> create function myfun as '/root/udf/myfun.py' outputtype double language 'Python';Create OK, 0 row(s) affected (0.005202s)
看起来很顺利,接下来 show 一下系统中所有的自定义函数,确认创建成功:
taos> show functions;              name              |================================= myfun                          |Query OK, 1 row(s) in set (0.005767s)
接下来就来测试一下这个函数,测试之前先执行下面的 SQL 命令,制造些测试数据:
create database test;create table t(ts timestamp, v1 int, v2 int, v3 int);insert into t values('2023-05-01 12:13:14', 1, 2, 3);insert into t values('2023-05-03 08:09:10', 2, 3, 4);insert into t values('2023-05-10 07:06:05', 3, 4, 5);
测试 myfun 函数:
taos> select myfun(v1, v2) from t;
DB error: udf function execution failure (0.011088s)
不幸的是执行失败了,什么原因呢?
查看 udfd 进程的日志: /var/log/taos/udfd.log 发现以下错误信息:
05/24 22:46:28.733545 01665799 UDF ERROR can not load library libtaospyudf.so. error: operation not permitted05/24 22:46:28.733561 01665799 UDF ERROR can not load python plugin. lib path libtaospyudf.so
错误很明确:没有加载到 Python 插件 libtaospyudf.so, 看官方文档原来是要先安装 taospyudf 这个 Python 包。于是:
pip3 install taospyudf
安装过程会编译 C++ 源码,因此系统上要有 cmake 和 gcc。编译生成的 libtaospyudf.so 文件自动会被复制到 /usr/local/lib/ 目录,因此如果是非 root 用户,安装时需加 sudo。安装完可以检查这个目录是否有了这个文件:
root@slave11 ~/udf $ ls -l /usr/local/lib/libtaos*-rw-r--r-- 1 root root 671344 May 24 22:54 /usr/local/lib/libtaospyudf.so
这时再去执行 SQL 测试 UDF,会发现报同样的错误,原因是新安装的共享库还未生效,还需执行命令:
ldconfig
此时再去测试 UDF,终于成功了:
taos> select myfun(v1) from t;         myfun(v1)         |============================               0.693147181 |               1.609437912 |               2.302585093 |
至此,我们完成了第一个 UDF 😊,并学会了简单的 debug 方法。

示例一改进

异常处理

上面的 myfun 虽然测试测试通过了,但是有两个缺点:
  • 这个标量函数只接受 1 列数据作为输入,如果用户传入了多列也不会抛异常。我们期望改成:如果用户输入多列,则提醒用户输入错误,这个函数只接收 1 个参数。
taos> select myfun(v1, v2) from t;       myfun(v1, v2)       |============================               0.693147181 |               1.609437912 |               2.302585093 |
‍‍‍‍‍
没有处理 null 值。我们期望如果输入有 null,则会抛异常终止执行。

没有处理 null 值。我们期望如果输入有 null,则会抛异常终止执行。

  • 没有处理 null 值, 如果用户输入了 null 值则会抛异常终止执行。我们期望改成:如果输入是 null,则输出也是 null, 不影响后续执行。
因此 process 函数改进如下:
def process(block):    rows, cols = block.shape()    if cols > 1:        raise Exception(f"require 1 parameter but given {cols}")    return [ None if block.data(i, 0) is None else log(block.data(i, 0) ** 2 + 1) for i in range(rows)]
然后执行下面的语句更新已有的 UDF:
create or replace function myfun as '/root/udf/myfun.py' outputtype double language 'Python';
再传入 myfun 两个参数,就会执行失败了:
taos> select myfun(v1, v2) from t;
DB error: udf function execution failure (0.014643s)
但遗憾的是我们自定义的异常信息没有展示给用户,而是在插件的日志文件 /var/log/taos/taospyudf.log  中:
2023-05-24 23:21:06.790 ERROR [1666188] [doPyUdfScalarProc@507] call pyUdfScalar proc function. context 0x7faade26d180. error: Exception: require 1 parameter but given 2
At: /var/lib/taos//.udf/myfun_3_1884e1281d9.py(12): process
至此,我们学会了如何更新 UDF,并查看 UDF 输出的错误日志。
(注:如果 UDF 更新后未生效,可以重启 taosd 试试,TDengine 3.0.5.0 及以后的版本会确保不重启 UDF 更新就能生效)

示例二

接收 n 个参数的 UDF

编写一个 UDF:输入(x1, x2, ..., xn), 输出每个值和它们的序号的乘积的和:1 *  x1 + 2 * x2 + ... + n * xn。如果 x1 至 xn 中包含 null,则结果为 null。
这个示例与示例一的区别是,可以接受任意多列作为输入,且要处理每一列的值。编写 UDF 文件 /root/udf/nsum.py:
def init():    pass

def destroy(): pass

def process(block): rows, cols = block.shape() result = [] for i in range(rows): total = 0 for j in range(cols): v = block.data(i, j) if v is None: total = None break total += (j + 1) * block.data(i, j) result.append(total) return result
创建 UDF:
create function nsum as '/root/udf/nsum.py' outputtype double language 'Python';
测试:
taos> insert into t values('2023-05-25 09:09:15', 6, null, 8);Insert OK, 1 row(s) affected (0.003675s)
taos> select ts, v1, v2, v3, nsum(v1, v2, v3) from t; ts | v1 | v2 | v3 | nsum(v1, v2, v3) |================================================================================================ 2023-05-01 12:13:14.000 | 1 | 2 | 3 | 14.000000000 | 2023-05-03 08:09:10.000 | 2 | 3 | 4 | 20.000000000 | 2023-05-10 07:06:05.000 | 3 | 4 | 5 | 26.000000000 | 2023-05-25 09:09:15.000 | 6 | NULL | 8 | NULL |Query OK, 4 row(s) in set (0.010653s)

示例三

使用第三方库

编写一个 UDF,输入一个时间戳,输出距离这个时间最近的下一个周日。比如今天是 2023-05-25, 则下一个周日是 2023-05-28。
完成这个函数要用到第三方库 momen。先安装这个库:
pip3 install moment
然后编写 UDF 文件 /root/udf/nextsunday.py
import moment

def init(): pass

def destroy(): pass

def process(block): rows, cols = block.shape() if cols > 1: raise Exception("require only 1 parameter") if not type(block.data(0, 0)) is int: raise Exception("type error") return [moment.unix(block.data(i, 0)).replace(weekday=7).format('YYYY-MM-DD') for i in range(rows)]
UDF 框架会将 TDengine 的 timestamp 类型映射为 Python 的 int 类型,所以这个函数只接受一个表示毫秒数的整数。process 方法先做参数检查,然后用 moment 包替换时间的星期为星期日,最后格式化输出。输出的字符串长度是固定的10个字符长,因此可以这样创建 UDF 函数:
create function nextsunday as '/root/udf/nextsunday.py' outputtype binary(10) language 'Python';
此时测试函数,如果你是用 systemctl 启动的 taosd,肯定会遇到错误:
taos> select ts, nextsunday(ts) from t;
DB error: udf function execution failure (1.123615s)
 tail -20 taospyudf.log  2023-05-25 11:42 tail -20 taospyudf.log  2023-05-25 11:42:34.541 ERROR [1679419] [PyUdf::PyUdf@217] py udf load module failure. error ModuleNotFoundError: No module named 'moment':34.541 ERROR [1679419] [PyUdf::PyUdf@217] py udf load module failure. error ModuleNotFoundError: No module named 'moment'
这是因为 “moment” 所在位置不在 python udf 插件默认的库搜索路径中。怎么确认这一点呢?通过以下命令搜索 taospyudf.log:
grep 'sys path' taospyudf.log  | tail -1
2023-05-25 10:58:48.554 INFO  [1679419] [doPyOpen@592] python sys path: ['', '/lib/python38.zip', '/lib/python3.8', '/lib/python3.8/lib-dynload', '/lib/python3/dist-packages', '/var/lib/taos//.udf']
发现 python udf 插件默认搜索的第三方库安装路径是:/lib/python3/dist-packages,而 moment 默认安装到了 /usr/local/lib/python3.8/dist-packages。下面我们修改 python udf 插件默认的库搜索路径,把当前 python 解释器默认使用的库路径全部加进去。
先打开 python3 命令行,查看当前的 sys.path
>>> import sys>>> ":".join(sys.path)'/usr/lib/python3.8:/usr/lib/python3.8/lib-dynload:/usr/local/lib/python3.8/dist-packages:/usr/lib/python3/dist-packages'
复制上面脚本的输出的字符串,然后编辑 /var/taos/taos.cfg 加入以下配置:
UdfdLdLibPath /usr/lib/python3.8:/usr/lib/python3.8/lib-dynload:/usr/local/lib/python3.8/dist-packages:/usr/lib/python3/dist-packages
保存后执行 systemctl restart taosd, 再测试就不报错了
taos> select ts, nextsunday(ts) from t;           ts            | nextsunday(ts) |=========================================== 2023-05-01 12:13:14.000 | 2023-05-07     | 2023-05-03 08:09:10.000 | 2023-05-07     | 2023-05-10 07:06:05.000 | 2023-05-14     | 2023-05-25 09:09:15.000 | 2023-05-28     |Query OK, 4 row(s) in set (1.011474s)

示例四

定义聚合函数

编写一个聚合函数,计算某一列最大值和最小值的差。
聚合函数与标量函数的区别是:标量函数是多行输入对应多个输出,聚合函数是多行输入对应一个输出。聚合函数的执行过程有点像经典的 map-reduce 框架的执行过程,框架把数据分成若干块,每个 mapper 处理一个块,reducer 再把 mapper 的结果做聚合。
不一样的地方在于,对于 TDengine Python UDF 中的 reduce 函数既有 map 的功能又有 reduce 的功能。reduce 函数接受两个参数:一个是自己要处理的数据,一个是别的任务执行 reduce 函数的处理结果。如下面的示例 /root/udf/myspread.py:
import ioimport mathimport pickle
LOG_FILE: io.TextIOBase = None

def init(): global LOG_FILE LOG_FILE = open("/var/log/taos/spread.log", "wt") log("init function myspead success")

def log(o): LOG_FILE.write(str(o) + '\n')

def destroy(): log("close log file: spread.log") LOG_FILE.close()

def start(): return pickle.dumps((-math.inf, math.inf))

def reduce(block, buf): max_number, min_number = pickle.loads(buf) log(f"initial max_number={max_number}, min_number={min_number}") rows, _ = block.shape() for i in range(rows): v = block.data(i, 0) if v > max_number: log(f"max_number={v}") max_number = v if v < min_number: log(f"min_number={v}") min_number = v return pickle.dumps((max_number, min_number))

def finish(buf): max_number, min_number = pickle.loads(buf) return max_number - min_number
在这个示例中我们不光定义了一个聚合函数,还添加记录执行日志的功能,讲解如下:
  • init 函数不再是空函数,而是打开了一个文件用于写执行日志
  • log 函数是记录日志的工具,自动将传入的对象转成字符串,加换行符输出
  • destroy 函数用来在执行结束关闭文件
  • start 返回了初始的 buffer,用来存聚合函数的中间结果,我们把最大值初始化为负无穷大,最小值初始化为正无穷大
  • reduce 处理每个数据块并聚合结果
  • finish 函数将最终的 buffer 转换成最终的输出
执行下面的 SQL语句创建对应的 UDF:
create or replace aggregate function myspread as '/root/udf/myspread.py' outputtype double bufsize 128 language 'Python';
这个 SQL 语句与创建标量函数的 SQL 语句有两个重要区别:
  1. 增加了 aggregate 关键字
  2. 增加了 bufsize 关键字,用来指定存储中间结果的内存大小,这个数值可以大于实际使用的数值。本例中间结果是两个浮点数组成的 tuple,序列化后实际占用大小只有 32 个字节,但指定的 bufsize 是128,可以用 python 命令行打印实际占用的字节数
>>> len(pickle.dumps((12345.6789, 23456789.9877)))32
测试这个函数,可以看到 myspread 的输出结果和内置的 spread 函数的输出结果是一致的。
taos> select myspread(v1) from t;       myspread(v1)        |============================               5.000000000 |Query OK, 1 row(s) in set (0.013486s)
taos> select spread(v1) from t; spread(v1) |============================ 5.000000000 |Query OK, 1 row(s) in set (0.005501s)
最后,查看我们自己打印的执行日志,从日志可以看出,reduce 函数被执行了 3 次。执行过程中 max 值被更新了 4 次, min 值只被更新 1 次。
root@slave11 /var/log/taos $ cat spread.loginit function myspead successinitial max_number=-inf, min_number=infmax_number=1min_number=1initial max_number=1, min_number=1max_number=2max_number=3initial max_number=3, min_number=1max_number=6close log file: spread.log
通过这个示例,我们学会了如何定义聚合函数,并打印自定义的日志信息。


要点总结

  • 创建标量函数的语法
    CREATE FUNCTION function_name AS library_path OUTPUTTYPE output_type LANGUAGE 'Python';
    OUTPUTTYPE 对应的是 TDengine 的数据类型,如 TIMESTAMP, BIGINT, VARCHAR(64), 类型映射关系见官方文档:https://docs.taosdata.com/develop/udf/
    • 创建聚合函数的语法
      CREATE AGGREGATE FUNCTION function_name library_path OUTPUTTYPE output_type LANGUAGE 'Python';
      • 更新 UDF 的语法
        更新标量函数
      CREATE OR REPLACE FUNCTION function_name AS OUTPUTTYPE int LANGUAGE 'Python';
        更新聚合函数
      CREATE OR REPLACE AGGREGATE FUNCTION function_name AS OUTPUTTYPE BUFSIZE buf_size int LANGUAGE 'Python';
      注意:如果加了 “AGGREGATE” 关键字,更新之后函数将被当作聚合函数,无论之前是什么类型的函数。相反,如果没有加 “AGGREGATE” 关键字,更新之后的函数将被当作标量函数,无论之前是什么类型的函数。
      • 同名的 UDF 每更新一次,版本号会增加 1。用
      select * from ins_functions \G;
      可查看 UDF 的完整信息,包括 UDF 的源码。
      • 查看和删除已有的 UDF
        SHOW functions;DROP FUNCTION function_name;
        • 安装 taospyudf 动态库
          sudo pip3 install taospyudf
          安装过程会从源码编译出共享库 libtaospyudf.so,因此系统上要有 cmake 和 gcc,编译后这个库会被安装到 /usr/local/lib。安装完别忘了执行命令 ldconfig 更新系统动态链接库。
          • 调试 Python UDF 的两个重要日志文件
            1. /var/log/taos/udfdlog.*  这个文件是 UDF 框架的日志。框架负责加载各语言 UDF 的插件,执行 UDF 的生命周期函数
            2. /var/log/taos/taospyudf.log 这个文件是 libtaospyudf.so 输出的日志,每个文件最大 50M,最多保留 5 个。
          • 定义标量函数最重要是要实现 process 函数,同时必须定义 init 和 destroy 函数即使什么都不做。
            
            
            
          def init():  pass  def process(block: datablock) -> tuple[output_type]:    rows, cols = block.shape()    result = []    for i in range(rows):        for j in range(cols):            cell_data = block.data(i, j)            # your logic here    return result
          def destroy(): pass
            • 定义聚合函数最重要是要实现  start, reduce 和 finish,同样必须定义 init 和 destroy 函数。
              start 生成最初结果 buffer,然后输入数据会被分为多个行数据块,对每个数据块 inputs 和当前中间结果 buf 调用 reduce,得到新的中间结果,最后再调用 finish 从中间结果 buf 产生最终输出。
              def init():def destroy():def start() -> bytes:def reduce(inputs: datablock, buf: bytes) -> bytesdef finish(buf: bytes) -> output_type:
            • 使用第三方 python 库。
              使用第三方库需要检查这个库是否安装到了 Python UDF 插件默认的库搜索路径,如果没有需要修改 taos.cfg, 添加 UdfdLdLibPath 配置,库路径用冒号分隔。
            • UDF 内无法通过 print 函数输出日志,需要自己写文件或用 python 内置的 logging 库写文件。

             往期推荐  



            👇  点击阅读原文,快速体验 TDengine 3.0 !


            本文分享自微信公众号 - TDengine(taosdata_news)。
            如有侵权,请联系 support@oschina.cn 删除。
            本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

            展开阅读全文
            加载中
            点击引领话题📣 发布并加入讨论🔥
            0 评论
            0 收藏
            0
            分享
            AI总结
            返回顶部
            顶部