1
定义一个只接收一个整数的标量函数:输入 n, 输出 ln(n^2 + 1)。
2
定义一个接收 n 个整数的标量函数, 输入 (x1, x2, ..., xn), 输出每个值和它们的序号的乘积的和:x1 + 2 * x2 + ... + n * xn。
3
定义一个标量函数,输入一个时间戳,输出距离这个时间最近的下一个周日。完成这个函数要用到第三方库 moment。我们在这个示例中讲解使用第三方库的注意事项。
4
定义一个聚合函数,计算某一列最大值和最小值的差, 也就是实现 TDengien 内置的 spread 函数。
示例一
最简单的 UDF
from math import log
def init():
pass
def destroy():
pass
def process(block):
rows, _ = block.shape()
return [log(block.data(i, 0) ** 2 + 1) for i in range(rows)]
-
shape() 返回数据块的行数和列数 -
data(i, j) 返回 i 行 j 列的数据
create function myfun as '/root/udf/myfun.py' outputtype double language 'Python'
function myfun as '/root/udf/myfun.py' outputtype double language 'Python'; create
Create OK, 0 row(s) affected (0.005202s)
taos> show functions;
name |
=================================
myfun |
Query OK, 1 row(s) in set (0.005767s)
create database test;
create table t(ts timestamp, v1 int, v2 int, v3 int);
insert into t values('2023-05-01 12:13:14', 1, 2, 3);
insert into t values('2023-05-03 08:09:10', 2, 3, 4);
insert into t values('2023-05-10 07:06:05', 3, 4, 5);
taos> select myfun(v1, v2) from t;
DB error: udf function execution failure (0.011088s)
05/24 22:46:28.733545 01665799 UDF ERROR can not load library libtaospyudf.so. error: operation not permitted
05/24 22:46:28.733561 01665799 UDF ERROR can not load python plugin. lib path libtaospyudf.so
pip3 install taospyudf
root@slave11 ~/udf $ ls -l /usr/local/lib/libtaos*
-rw-r--r-- 1 root root 671344 May 24 22:54 /usr/local/lib/libtaospyudf.so
ldconfig
taos> select myfun(v1) from t;
myfun(v1) |
============================
0.693147181 |
1.609437912 |
2.302585093 |
示例一改进
异常处理
-
这个标量函数只接受 1 列数据作为输入,如果用户传入了多列也不会抛异常。我们期望改成:如果用户输入多列,则提醒用户输入错误,这个函数只接收 1 个参数。
taos> select myfun(v1, v2) from t;
myfun(v1, v2) |
============================
0.693147181 |
1.609437912 |
2.302585093 |
-
没有处理 null 值, 如果用户输入了 null 值则会抛异常终止执行。我们期望改成:如果输入是 null,则输出也是 null, 不影响后续执行。
def process(block):
rows, cols = block.shape()
if cols > 1:
raise Exception(f"require 1 parameter but given {cols}")
return [ None if block.data(i, 0) is None else log(block.data(i, 0) ** 2 + 1) for i in range(rows)]
create or replace function myfun as '/root/udf/myfun.py' outputtype double language 'Python';
taos> select myfun(v1, v2) from t;
DB error: udf function execution failure (0.014643s)
2023-05-24 23:21:06.790 ERROR [1666188] [doPyUdfScalarProc@507] call pyUdfScalar proc function. context 0x7faade26d180. error: Exception: require 1 parameter but given 2
At:
/var/lib/taos//.udf/myfun_3_1884e1281d9.py(12): process
示例二
接收 n 个参数的 UDF
def init():
pass
def destroy():
pass
def process(block):
rows, cols = block.shape()
result = []
for i in range(rows):
total = 0
for j in range(cols):
v = block.data(i, j)
if v is None:
total = None
break
total += (j + 1) * block.data(i, j)
result.append(total)
return result
create function nsum as '/root/udf/nsum.py' outputtype double language 'Python';
taos> insert into t values('2023-05-25 09:09:15', 6, null, 8);
Insert OK, 1 row(s) affected (0.003675s)
taos> select ts, v1, v2, v3, nsum(v1, v2, v3) from t;
ts | v1 | v2 | v3 | nsum(v1, v2, v3) |
================================================================================================
2023-05-01 12:13:14.000 | 1 | 2 | 3 | 14.000000000 |
2023-05-03 08:09:10.000 | 2 | 3 | 4 | 20.000000000 |
2023-05-10 07:06:05.000 | 3 | 4 | 5 | 26.000000000 |
2023-05-25 09:09:15.000 | 6 | NULL | 8 | NULL |
Query OK, 4 row(s) in set (0.010653s)
示例三
使用第三方库
pip3 install moment
import moment
def init():
pass
def destroy():
pass
def process(block):
rows, cols = block.shape()
if cols > 1:
raise Exception("require only 1 parameter")
if not type(block.data(0, 0)) is int:
raise Exception("type error")
return [moment.unix(block.data(i, 0)).replace(weekday=7).format('YYYY-MM-DD')
for i in range(rows)]
create function nextsunday as '/root/udf/nextsunday.py' outputtype binary(10) language 'Python';
taos> select ts, nextsunday(ts) from t;
DB error: udf function execution failure (1.123615s)
tail -20 taospyudf.log 2023-05-25 11:42 tail -20 taospyudf.log
2023-05-25 11:42:34.541 ERROR [1679419] [PyUdf::PyUdf@217] py udf load module failure. error ModuleNotFoundError: No module named 'moment':34.541 ERROR [1679419] [PyUdf::PyUdf@217] py udf load module failure. error ModuleNotFoundError: No module named 'moment'
grep 'sys path' taospyudf.log | tail -1
2023-05-25 10:58:48.554 INFO [1679419] [doPyOpen@592] python sys path: ['', '/lib/python38.zip', '/lib/python3.8', '/lib/python3.8/lib-dynload', '/lib/python3/dist-packages', '/var/lib/taos//.udf']
> import sys
":".join(sys.path) >
'/usr/lib/python3.8:/usr/lib/python3.8/lib-dynload:/usr/local/lib/python3.8/dist-packages:/usr/lib/python3/dist-packages'
UdfdLdLibPath /usr/lib/python3.8:/usr/lib/python3.8/lib-dynload:/usr/local/lib/python3.8/dist-packages:/usr/lib/python3/dist-packages
taos> select ts, nextsunday(ts) from t;
ts | nextsunday(ts) |
===========================================
2023-05-01 12:13:14.000 | 2023-05-07 |
2023-05-03 08:09:10.000 | 2023-05-07 |
2023-05-10 07:06:05.000 | 2023-05-14 |
2023-05-25 09:09:15.000 | 2023-05-28 |
Query OK, 4 row(s) in set (1.011474s)
示例四
定义聚合函数
import io
import math
import pickle
LOG_FILE: io.TextIOBase = None
def init():
global LOG_FILE
LOG_FILE = open("/var/log/taos/spread.log", "wt")
log("init function myspead success")
def log(o):
LOG_FILE.write(str(o) + '\n')
def destroy():
log("close log file: spread.log")
LOG_FILE.close()
def start():
return pickle.dumps((-math.inf, math.inf))
def reduce(block, buf):
max_number, min_number = pickle.loads(buf)
log(f"initial max_number={max_number}, min_number={min_number}")
rows, _ = block.shape()
for i in range(rows):
v = block.data(i, 0)
if v > max_number:
log(f"max_number={v}")
max_number = v
if v < min_number:
log(f"min_number={v}")
min_number = v
return pickle.dumps((max_number, min_number))
def finish(buf):
max_number, min_number = pickle.loads(buf)
return max_number - min_number
-
init 函数不再是空函数,而是打开了一个文件用于写执行日志 -
log 函数是记录日志的工具,自动将传入的对象转成字符串,加换行符输出 -
destroy 函数用来在执行结束关闭文件 -
start 返回了初始的 buffer,用来存聚合函数的中间结果,我们把最大值初始化为负无穷大,最小值初始化为正无穷大 -
reduce 处理每个数据块并聚合结果 -
finish 函数将最终的 buffer 转换成最终的输出
create or replace aggregate function myspread as '/root/udf/myspread.py' outputtype double bufsize 128 language 'Python';
-
增加了 aggregate 关键字 -
增加了 bufsize 关键字,用来指定存储中间结果的内存大小,这个数值可以大于实际使用的数值。本例中间结果是两个浮点数组成的 tuple,序列化后实际占用大小只有 32 个字节,但指定的 bufsize 是128,可以用 python 命令行打印实际占用的字节数
>>> len(pickle.dumps((12345.6789, 23456789.9877)))
32
taos> select myspread(v1) from t;
myspread(v1) |
============================
5.000000000 |
Query OK, 1 row(s) in set (0.013486s)
taos> select spread(v1) from t;
spread(v1) |
============================
5.000000000 |
Query OK, 1 row(s) in set (0.005501s)
root@slave11 /var/log/taos $ cat spread.log
init function myspead success
initial max_number=-inf, min_number=inf
max_number=1
min_number=1
initial max_number=1, min_number=1
max_number=2
max_number=3
initial max_number=3, min_number=1
max_number=6
close log file: spread.log
要点总结
-
创建标量函数的语法
CREATE FUNCTION function_name AS library_path OUTPUTTYPE output_type LANGUAGE 'Python';
-
创建聚合函数的语法
CREATE AGGREGATE FUNCTION function_name library_path OUTPUTTYPE output_type LANGUAGE 'Python';
-
更新 UDF 的语法 更新标量函数
CREATE OR REPLACE FUNCTION function_name AS OUTPUTTYPE int LANGUAGE 'Python';
CREATE OR REPLACE AGGREGATE FUNCTION function_name AS OUTPUTTYPE BUFSIZE buf_size int LANGUAGE 'Python';
-
同名的 UDF 每更新一次,版本号会增加 1。用
select * from ins_functions \G;
-
查看和删除已有的 UDF
SHOW functions;
DROP FUNCTION function_name;
-
安装 taospyudf 动态库
sudo pip3 install taospyudf
-
调试 Python UDF 的两个重要日志文件
-
/var/log/taos/udfdlog.* 这个文件是 UDF 框架的日志。框架负责加载各语言 UDF 的插件,执行 UDF 的生命周期函数 -
/var/log/taos/taospyudf.log 这个文件是 libtaospyudf.so 输出的日志,每个文件最大 50M,最多保留 5 个。
-
定义标量函数最重要是要实现 process 函数,同时必须定义 init 和 destroy 函数即使什么都不做。
def init():
pass
def process(block: datablock) -> tuple[output_type]:
rows, cols = block.shape()
result = []
for i in range(rows):
for j in range(cols):
cell_data = block.data(i, j)
# your logic here
return result
def destroy():
pass
-
定义聚合函数最重要是要实现 start, reduce 和 finish,同样必须定义 init 和 destroy 函数。 start 生成最初结果 buffer,然后输入数据会被分为多个行数据块,对每个数据块 inputs 和当前中间结果 buf 调用 reduce,得到新的中间结果,最后再调用 finish 从中间结果 buf 产生最终输出。 def init():
def destroy():
def start() -> bytes:
def reduce(inputs: datablock, buf: bytes) -> bytes
def finish(buf: bytes) -> output_type:
-
使用第三方 python 库。 使用第三方库需要检查这个库是否安装到了 Python UDF 插件默认的库搜索路径,如果没有需要修改 taos.cfg, 添加 UdfdLdLibPath 配置,库路径用冒号分隔。 -
UDF 内无法通过 print 函数输出日志,需要自己写文件或用 python 内置的 logging 库写文件。
往期推荐
本文分享自微信公众号 - TDengine(taosdata_news)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。