小 T 导读:TDengine Kafka Connector(https://docs.taosdata.com/third-party/kafka/) 在 TDengine 的官方文档上放出来已经有一段时间了,我们也收到了一些开发者的反馈。文档中的教程使用 Confluent 平台(集成了 Kafka)演示了如何使用 Source Connector 和 Sink Connector,但是很多开发者在生产环境中并没有使用 Confluent,所以为方便大家,本文将使用独立部署的 Kafka 来演示。
· 什么是 Kafka?
· 什么是 Kafka Connect? 为什么使用 Kafka Connect?
TDengine Sink Connector 的实现原理
· 支持的数据格式
db.schemaless=line
value.converter=org.apache.kafka.connect.storage.StringConverter
· 如何指定 Consumer 的参数?
consumer.override.max.poll.records=3000
· 如何控制写入线程数?
TDengine Sink Connector 使用示例
· 环境准备
第一步:安装 Kafka
wget https://dlcdn.apache.org/kafka/3.2.0/kafka_2.13-3.2.0.tgz
tar -xzf kafka_2.13-3.2.0.tgz
export KAFKA_HOME=/home/bding/kafka_2.13-3.2.0
export PATH=$PATH:$KAFKA_HOME/bin
source .bash_profile
第二步:配置 Kafka
cd kafka_2.13-3.2.0/config/
vi connect-standalone.properties
plugin.path=/home/bding/connectors
vi connect-log4j.properties
log4j.logger.com.taosdata.kafka.connect.sink=DEBUG
第三步:编译并安装插件
git clone git@github.com:taosdata/kafka-connect-tdengine.git
cd kafka-connect-tdengine
mvn clean package
unzip -d ~/connectors target/components/packages/taosdata-kafka-connect-tdengine-*.zip
第四步:启动 ZooKeeper Server 和 Kafka Server
zookeeper-server-start.sh -daemon $KAFKA_HOME/config/zookeeper.properties
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
第五步:创建 topic
kafka-topics.sh --create --topic meters --partitions 1 --bootstrap-server localhost:9092
第六步:生成测试数据
#!/usr/bin/python3
import random
import sys
topic = sys.argv[1]
count = int(sys.argv[2])
start_ts = 1648432611249000000
location = ["SanFrancisco", "LosAngeles", "SanDiego"]
for i in range(count):
ts = start_ts + i
row = f"{topic},location={location[i % 3]},groupid=2 current={random.random() * 10},voltage={random.randint(100, 300)},phase={random.random()} {ts}"
print(row)
python3 gen-data.py meters 10000 | kafka-console-producer.sh --broker-list localhost:9092 --topic meters
第七步:启动 Kafka Connect
name=TDengineSinkConnector
connector.class=com.taosdata.kafka.connect.sink.TDengineSinkConnector
tasks.max=1
topics=meters
connection.url=jdbc:TAOS://127.0.0.1:6030
connection.user=root
connection.password=taosdata
connection.database=power
db.schemaless=line
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
connect-standalone.sh -daemon $KAFKA_HOME/config/connect-standalone.properties sink-test.properties
第八步:检查 TDengine 中的数据
[ ]$ taos
Welcome to the TDengine shell from Linux, Client Version:2.6.0.4
Copyright (c) 2022 by TAOS Data, Inc. All rights reserved.
taos> select count(*) from power.meters;
count(*) |
========================
10000 |
TDengine Sink Connector 性能测试
· 测试流程
-
将 topic 的分区数作为脚本的第 1 个参数, 同时配置 tasks.max,使其等于分区数。这样我们可以控制每次测试使用的写入线程数。 -
将生成测试数据的条数作为脚本的第 2 个参数,用来控制每次测试同步的数据量。 启动测试前清空所有数据,测试结束后停止 Connect、Kafka 和 ZooKeeper。
每次测试都先写数据到 Kafka,然后再启动 Connect 同步数据到 TDengine,这样做可以把同步数据的压力全部集中到 Sink 插件这边。我们统计 Sink Connector 从接收到第一批数据到接收到最后一批数据之间的时间,作为同步数据的总耗时。
if [ $# -lt 2 ];then
echo "Usage: ./run-test.sh <num_of_partitions> <total_records>"
exit 0
fi
echo "---------------------------TEST STARTED---------------------------------------"
echo clean data and logs
taos -s "DROP DATABASE IF EXISTS power"
rm -rf /tmp/kafka-logs /tmp/zookeeper
rm -f $KAFKA_HOME/logs/connect.log
np=$1 # number of partitions
total=$2 # number of records
echo number of partitions is $np, number of recordes is $total.
echo start zookeeper
zookeeper-server-start.sh -daemon $KAFKA_HOME/config/zookeeper.properties
echo start kafka
sleep 3
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
sleep 5
echo create topic
kafka-topics.sh --create --topic meters --partitions $np --bootstrap-server localhost:9092
kafka-topics.sh --describe --topic meters --bootstrap-server localhost:9092
echo generate test data
python3 gen-data.py meters $total | kafka-console-producer.sh --broker-list localhost:9092 --topic meters
echo alter connector configuration setting tasks.max=$np
sed -i "s/tasks.max=.*/tasks.max=${np}/" sink-test.properties
echo start kafka connect
connect-standalone.sh -daemon $KAFKA_HOME/config/connect-standalone.properties sink-test.properties
echo -e "\e[1;31m open another console to monitor connect.log. press enter when no more data received.\e[0m"
read
echo stop connect
jps | grep ConnectStandalone | awk '{print $1}' | xargs kill
echo stop kafka server
kafka-server-stop.sh
echo stop zookeeper
zookeeper-server-stop.sh
# extract timestamps of receiving the first batch of data and the last batch of data
grep "records" $KAFKA_HOME/logs/connect.log | grep meters- > tmp.log
start_time=`cat tmp.log | grep -Eo "[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}" | head -1`
stop_time=`cat tmp.log | grep -Eo "[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}" | tail -1`
echo "--------------------------TEST FINISHED------------------------------------"
echo "| records | partitions | start time | stop time |"
echo "|---------|------------|------------|-----------|"
echo "| $total | $np | $start_time | $stop_time |"
./run-test.sh 1 1000000

[bding@vm95 ~]$ cd kafka_2.13-3.2.0/logs/
[bding@vm95 logs]$ tail -f connect.log
[2022-06-21 17:39:00,176] DEBUG [TDengineSinkConnector|task-0] Received 500 records. First record kafka coordinates:(meters-0-314496). Writing them to the database... (com.taosdata.kafka.connect.sink.TDengineSinkTask:101)
[2022-06-21 17:39:00,180] DEBUG [TDengineSinkConnector|task-0] Received 500 records. First record kafka coordinates:(meters-0-314996). Writing them to the database... (com.taosdata.kafka.connect.sink.TDengineSinkTask:101)
· 测试结果
写入速度与数据量和线程数的关系表

写入速度与数据量和线程数的关系图

从上图可以看出,相同数据量,线程越多写入速度越快。当使用单线程写入时,每秒能写入大概 10 万以上。当使用 5 个线程写入时,每秒写入大概 35 万左右。当使用10 个线程时,每秒能写入55 万左右。
写入速度比较平稳,与总数据量关系不大。
kafka-logs]$ du -h ./ -d 1
125M ./meters-8
149M ./meters-7
119M ./meters-9
138M ./meters-4
110M ./meters-3
158M ./meters-6
131M ./meters-5
105M ./meters-0
113M ./meters-2
99M ./meters-1
附录
· 测试程序
· 测试环境
✨\\\\٩( 'ω' )و ////✨
首届「TDengine 开发者大会」来啦!!
1⃣️ 一直被万众期待的 TDengine 3.0 将正式和大家见面
2⃣️ 基础软件领域知名大咖、知名投资人现场演讲
3⃣️ 各行各业 TDengine 典型用户现身说法
4⃣️ 现场惊喜不断,精美礼品拿到手软
❗️点击上方图片了解大会详情,读到最后有惊喜,快人一步获🉐️大会入场券🎫
👇 点击阅读原文,了解体验 TDengine!
本文分享自微信公众号 - TDengine(taosdata_news)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。