【技术干货】代码示例:使用 Apache Flink 连接 TDengine

原创
2022/05/25 17:30
阅读数 221
小 T 导读:想用 Flink 对接 TDengine?保姆级教程来了。

0、前言


TDengine 是由涛思数据开发并开源的一款高性能、分布式、支持 SQL 的时序数据库(Time-Series Database)。


除了核心的时序数据库功能外,TDengine 还提供缓存、数据订阅、流式计算等大数据平台所需要的系列功能。但是很多小伙伴出于架构的考虑,还是需要将数据导出到 Apache Flink、Apache Spark 等平台进行计算分析。


为了帮助大家对接,我们特别推出了保姆级课程,包学包会。


1、技术实现


Apache Flink 提供了 SourceFunction 和 SinkFunction,用来提供 Flink 和外部数据源的连接,其中 SouceFunction 为从数据源读取数据,SinkFunction 为将数据写入数据源。


与此同时,Flink 提供了 RichSourceFunction 和 RichSinkFunction 这两个类(继承自 AbstractRichFunction),提供了额外的初始化(open(Configuration))和销毁方法(close())。


通过重写这两个方法,可以避免每次读写数据时都重新建立连接。


2、代码实现


完整源码:https://github.com/liuyq-617/TD-Flink(需复制到浏览器打开)

代码逻辑:

1) 自定义类 SourceFromTDengine

用途:数据源连接,数据读取

package com.taosdata.flink;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.functions.source.RichSourceFunction;import com.taosdata.model.Sensor;import java.sql.*;import java.util.Properties;
public class SourceFromTDengine extends RichSourceFunction<Sensor> { Statement statement; private Connection connection; private String property; public SourceFromTDengine(){ super(); }
@Override public void open(Configuration parameters) throws Exception { super.open(parameters); String driver = "com.taosdata.jdbc.rs.RestfulDriver"; String host = "u05"; String username = "root"; String password = "taosdata"; String prop = System.getProperty("java.library.path"); Logger LOG = LoggerFactory.getLogger(SourceFromTDengine.class); LOG.info("java.library.path:{}", prop); System.out.println(prop); Class.forName( driver ); Properties properties = new Properties(); connection = DriverManager.getConnection("jdbc:TAOS-RS://" + host + ":6041/tt" + "?user=root&password=taosdata" , properties); statement = connection.createStatement(); }
@Override public void close() throws Exception { super.close(); if (connection != null) { connection.close(); } if (statement != null) { statement.close(); } }
@Override public void run(SourceContext<Sensor> sourceContext) throws Exception { try { String sql = "select * from tt.meters"; ResultSet resultSet = statement.executeQuery(sql); while (resultSet.next()) { Sensor sensor = new Sensor( resultSet.getLong(1), resultSet.getInt( "vol" ), resultSet.getFloat( "current" ), resultSet.getString( "location" ).trim()); sourceContext.collect( sensor ); } } catch (Exception e) { e.printStackTrace(); } }
@Override public void cancel() {
}}


2) 自定义类 SinkToTDengine

用途:数据源连接,数据写入

SinkToTDengine  

package com.taosdata.flink;
import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;import com.taosdata.model.Sensor;import java.sql.*;import java.util.Properties;

public class SinkToTDengine extends RichSinkFunction<Sensor> { Statement statement; private Connection connection;
@Override public void open(Configuration parameters) throws Exception { super.open(parameters); String driver = "com.taosdata.jdbc.rs.RestfulDriver"; String host = "TAOS-FQDN"; String username = "root"; String password = "taosdata"; String prop = System.getProperty("java.library.path"); System.out.println(prop); Class.forName( driver ); Properties properties = new Properties(); connection = DriverManager.getConnection("jdbc:TAOS-RS://" + host + ":6041/tt" + "?user=root&password=taosdata" , properties); statement = connection.createStatement();
}
@Override public void close() throws Exception { super.close(); if (connection != null) { connection.close(); } if (statement != null) { statement.close(); } }
@Override public void invoke(Sensor sensor, Context context) throws Exception { try { String sql = String.format("insert into sinktest.%s using sinktest.meters tags('%s') values(%d,%d,%f)", sensor.getLocation(), sensor.getLocation(), sensor.getTs(), sensor.getVal(), sensor.getCurrent() ); statement.executeUpdate(sql);
} catch (Exception e) { e.printStackTrace(); } }}


3) 自定义类 Sensor

用途:定义数据结构,用来接受数据

package com.taosdata.model;
public class Sensor {
public long ts; public int val; public float current; public String location;
public Sensor() {
}
public Sensor(long ts, int val, float current, String location) { this.ts = ts; this.val = val; this.current = current; this.location = location; }
public long getTs() { return ts; }
public void setTs(long ts) { this.ts = ts; }
public int getVal() { return val; }
public void setVal(int val) { this.val = val; }
public float getCurrent() { return current; }
public void setCurrent(float current) { this.current = current; }
public String getLocation() { return location; }
public void setLocation(String location) { this.location = location; }
@Override public String toString() { return "Sensor{" + "ts=" + ts + ", val=" + val + ", current=" + current + ", location='" + location + '\'' + '}'; }}


4) 主程序类 ReadFromTDengine

用途:调用 Flink 进行读取和写入数据

package com.taosdata;
import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.streaming.api.datastream.DataStream;import com.taosdata.model.Sensor;
import org.slf4j.LoggerFactory;import org.slf4j.Logger;
public class ReadFromTDengine { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<Sensor> SensorList = env.addSource( new com.taosdata.flink.SourceFromTDengine() ); SensorList.print(); SensorList.addSink( new com.taosdata.flink.SinkToTDengine() ); env.execute();
}}


3、简单测试 RESTful 接口


1) 环境准备:

    a) Flink 安装&启动:

  • wget https://dlcdn.apache.org/flink/flink-1.14.3/flink-1.14.3-bin-scala_2.12.tgz (需复制到浏览器打开)
  • tar zxf flink-1.14.3-bin-scala_2.12.tgz -C /usr/local
  • /usr/local/flink-1.14.3/bin/start-cluster.sh


    b) TDengine Database 环境准备:

  • 创建原始数据: 
    • create database tt;

    • create table `meters` (`ts` TIMESTAMP,`vol` INT,`current` FLOAT) TAGS (`location` BINARY(20));

    • insert into beijing using meters tags('beijing') values(now,220,30.2);

  • 创建目标数据库表: 
    • create database sinktest;

    • create table `meters` (`ts` TIMESTAMP,`vol` INT,`current` FLOAT) TAGS (`location` BINARY(20));


    2) 打包编译:

    源码位置:https://github.com/liuyq-617/TD-Flink (需复制到浏览器打开)
    mvn clean package

    3) 程序启动:

    flink run target/test-flink-1.0-SNAPSHOT-dist.jar
    • 读取数据 
      • vi log/flink-root-taskexecutor-0-xxxxx.out 
      • 查看到数据打印:Sensor{ts=1645166073101, val=220, current=5.7, location='beijing'}
    • 写入数据 
      • 已经创建了beijing 子表
        • show sinktest.tables; 
      • select * from sinktest.beijing; 
        • 可以查询到刚插入的数据


    4、使用 JNI 方式


    举一反三的小伙伴此时已经猜到,只要把 JDBC URL 修改一下就可以了。

    但是 Flink 每次分派作业时都在使用一个新的 ClassLoader,而我们在计算节点上就会得到“Native library already loaded in another classloader”错误。

    为了避免此问题,可以将 JDBC 的 jar 包放到 Flink 的 lib 目录下,不去调用 dist 包就可以了。

    • cp taos-jdbcdriver-2.0.37-dist.jar /usr/local/flink-1.14.3/lib
    • flink run target/test-flink-1.0-SNAPSHOT.jar


    5、小结

    通过在项目中引入 SourceFromTDengine 和 SinkToTDengine 两个类,即可完成在 Flink 中对 TDengine 的读写操作。后面我们会有文章介绍 Spark 和 TDengine 的对接。

    注:文中使用的是 JDBC 的 RESTful 接口,这样就不用在 Flink 的节点安装 TDengine,JNI 方式需要在 Flink 节点安装 TDengine Database 的客户端。


    👇 点击阅读原文,了解体验 TDengine!

    本文分享自微信公众号 - TDengine(taosdata_news)。
    如有侵权,请联系 support@oschina.cn 删除。
    本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

    展开阅读全文
    加载中
    点击引领话题📣 发布并加入讨论🔥
    0 评论
    0 收藏
    0
    分享
    返回顶部
    顶部