PostgreSQL PipelineDB插件实现数据条数汇总

原创
2020/10/16 11:46
阅读数 1.6K

中文教程: https://pipelinedb-doc-cn.readthedocs.io/zh_CN/latest/introduction.html

PipelineDB是一个用于在时序数据上持续执行SQL查询的高性能PostgreSQL插件。SQL查询的输出被持久化到普通的表中,可以像其它的表或视图一样进行查询。可以认为持续查询的结果是一个高吞吐量并且快速更新的物化视图。PipelineDB可以在某些任务场景下表现得十分优秀,若超出这个范畴,可能就会同其它数据处理系统一样面临一些问题。

客户端  快速开始 部分包含了一些PipelineDB的操作示例。

PipelineDB的长处

PipelineDB被设计用来在精简后的流式数据集上进行SQL查询。比如:概要和聚合;基于滑动时间窗口的性能计算;文本索引和过滤;空间信息过滤等。通过减少流数据的输入,PipelineDB可以显著地减少持久化到磁盘中的数据量,因为之后聚合后的结果被存储下来原始数据(foreign table)一旦被需要它的持续查询读取后就会被销毁。

大多数写入到PipelineDB中的数据可以被视为 虚拟数据。数据虚拟化是PipelineDB设计的精髓,凭着这种设计,PipelineDB可以只占用较小的硬件资源实现高效的大数据量处理。

PipelineDB的目标是消除许多数据传输中的ETL过程。原始数据流式写入PipelineDB,被已声明的流式查询实时地转换和提取,这使得它在将成型的输出加载到数据库前不必周期性地处理颗粒数据,前提是这些处理过程可以通过SQL定义。

在PipelineDB的设计理念中,实用性是第一要素,这也是我们将其包装为PostgreSQL插件的原因。所有数据存储和处理都交由PostgreSQL:一个极其稳定、成熟以及运用广泛的数据库。此外,PipelineDB兼容活跃的PostgreSQL生态中的所有工具。我们没有为PipelineDB设计特有的语法甚至是客户端,因为它可以很好地兼容任何基于PostgreSQL开发的库。

PipelineDB的短板

鉴于流查询需要一些 先验知识,PipelineDB不是一个特定的数据仓库。流式查询的输出可能在特定方式下被访问,所有写入到PipelineDB中的原始数据都不会被持久化因为它们都会在读取后被销毁。此外,如果流式计算不能以SQL的形式表达,PipelineDB可能并不是一个合适的选择!

流聚合

PipelineDB最核心的追求之一就是 促进高性能的连续聚合,所以聚合函数毫无疑问是PipelineDB的核心功能。连续聚合在大多数通用场景是非常有用的,它使得PipelineDB中持久化的数据始终与写入的数据保持同步。它可以通过一定的硬件实现稳定和高吞度量的服务。

连续聚合是随着流视图新 event 的生成实时 增量更新的。对于如 count  sum 之类的简单聚合,我们很容易理解结果是如何增量更新的–将新值累加到已有结果上而已。

但对更加复杂的聚合而言,如 avg, stddev, percentile_cont 等,需要更优化的架构来支持高效的增量更新,PipelineDB在内部自动实现了这些复杂的逻辑。

下面是所有PipelineDB支持的聚合函数的说明。有一些函数与标准的聚合函数有略微的差别以高效处理源源不断的流式数据,文中已经标注出了这部分函数的区别。

聚合支持

PipelineDB的设计初衷之一就是 提供简便、高性能的流式聚合,PostgreSQL的聚合函数在 流视图 中是完全支持的(还存在少数几个罕见的异常)。除了这一大票标准的聚合,PipelineDB也为时序流数据处理添加了一些特有 流聚合 算法。

查看 流聚合 部分了解更多PipelineDB的实用特性。

Docker测试环境准备

pipelineDB镜像继承自Postgresql数据库镜像,实现的时候需要设置下端口和用户密码

 docker run -d -p 5432:5432 -e POSTGRES_PASSWORD=postgres pipelinedb/pipelinedb-postgresql-11

PipelineDB流表、流视图、流转换

流(foreign table)

流是一种允许客户端将时序数据写入 流视图 的抽象管道。流里面的一行数据(或者简单称作 event),与数据表中的行数据是很相似的,并且二者的写入也是完全一致的。然而,流和数据表的语义是完全不同的。

换言之,event 只会在被所有的 流视图 消费完之前“存在”于流中。即使这样,用户仍然不能直接从流中 SELECT 数据。流唯一的作用就是充当 流视图 的输入。

流在PipelineDB中是作为 pipelinedb 外部服务 管理下的 外部表 存在的。创建外部表的语法跟创建普通的PostgreSQL数据表类似:

CREATE FOREIGN TABLE stream_name ( [
   { column_name data_type [ COLLATE collation ] } [, ... ]
] )
SERVER pipelinedb;

流视图

流视图(continuous view)是PipelineDB的基础概念抽象。流视图跟普通的数据库视图非常相似,但它是将流和表中的数据组合后作为输入并进行实时增量更新。流数据一旦被流视图读取后就会被销毁,流数据不会存储在任何地方。只有诸如 SELECT * FROM that_view 查询返回的结果才会被持久化,也就是说,流视图可以被视为高吞吐量、实时的物化视图。

流转换

流转换可以在不存储时序的情况下对其进行实时转换。由于数据不存储数据,所以流转换不支持聚合操作。转换后的数据既可以作为另一个流的输入,也可以写入到外部数据存储中。

--表

SELECT * FROM pipelinedb."_exec_lock";

SELECT * FROM pipelinedb.combine;

SELECT * FROM pipelinedb.cont_query;

SELECT * FROM pipelinedb.stream;

---视图

SELECT * FROM pipelinedb.db_stats;

SELECT * FROM pipelinedb.proc_query_stats;

SELECT * FROM pipelinedb.proc_stats;

SELECT * FROM pipelinedb.query_stats;

SELECT * FROM pipelinedb.stream_readers;

SELECT * FROM pipelinedb.stream_stats;

SELECT * FROM pipelinedb."transforms";

SELECT * FROM pipelinedb.views;

示例编写

下面是一个用户业务数据行汇总的流式视图查询(设计到组织机构和用户):

--创建事件流
drop FOREIGN table event_stream;
CREATE FOREIGN TABLE event_stream (i_orgid integer,c_orgbh varchar, i_userid integer,num integer) SERVER pipelinedb;

--创建视图
drop view org_event_stream_v ;
CREATE VIEW org_event_stream_v AS SELECT c_orgbh,sum(num) total FROM event_stream group by c_orgbh;

--创建视图
drop view org_event_like_stream_v ;
CREATE VIEW org_event_like_stream_v AS SELECT concat(c_orgbh,'%'),sum(num) total FROM event_stream where c_orgbh like concat(c_orgbh,'%') group by concat(c_orgbh,'%');

--创建视图
drop view user_event_stream_v;
CREATE VIEW user_event_stream_v AS SELECT i_userid,sum(num) total FROM event_stream group by i_userid;

--批量保存
INSERT INTO event_stream (i_orgid, c_orgbh,i_userid,num) values
(1,1000,1,1),(1,1000,1,1),
(2,1001,2,1),(2,1001,2,1),(2,1001,2,1),(2,1001,2,1),
(3,1002,3,1),(3,1003,3,1),(3,1002,3,1),(3,1003,3,1),(3,1002,3,1),(3,1003,3,1);

--批量保存
INSERT INTO event_stream (i_orgid, c_orgbh,i_userid,num) values
(4,10000001,4,1),(4,10000001,4,1),
(5,10010001,5,1),(5,10010001,5,1),(5,10010001,5,1),(5,10010001,5,1),
(6,10020001,6,1),(6,10020001,6,1),(6,10020001,6,1),(6,10020001,6,1),(6,10020001,6,1),(6,10020001,6,1);

--删除记录标记
INSERT INTO event_stream (i_orgid, c_orgbh,i_userid,num) values (4,10000001,4,-1);

select * from org_event_stream_v;

select * from  org_event_like_stream_v;

select * from user_event_stream_v;

分析:

  • 组织机构汇总只能汇总到具体的用户单位下,需要程序递归组织机构汇总(like汇总无效)
  • 用户的可以直接实现统计数据的加减(新增+1、删除-1,只需要在行记录进行标记)
  • 处理优势:不需要用户SQL去汇总计算,只需要汇总计算逻辑组织数据即可 

组织机构模糊查询再汇总:

select '1000%',sum(total) from  org_event_stream_v where c_orgbh like '1000%'
union all
select '1001%',sum(total) from  org_event_stream_v where c_orgbh like '1001%'
union all
select '1002%',sum(total) from  org_event_stream_v where c_orgbh like '1002%';

注:上面肯定有更好的方法来解决这类聚合,比如流聚合等等,有待深入研究!

Java客户端调用方法

下面是官方客户端方法,参考如何调用:

import java.util.Properties;
import java.sql.*;

public class Example {

  static final String HOST = "localhost";
  static final String DATABASE = "test";
  static final String USER = "user";

  public static void main(String[] args) throws SQLException {

    // Connect to "test" database on port 5432
    String url = "jdbc:postgresql://" + HOST + ":5432/" + DATABASE;
    ResultSet  rs;
    Properties props = new Properties();

    props.setProperty("user", USER);
    Connection conn = DriverManager.getConnection(url, props);

    Statement stmt = conn.createStatement();
    stmt.executeUpdate(
      "CREATE FOREIGN TABLE stream (x integer) SERVER pipelinedb");
    stmt.executeUpdate(
      "CREATE VIEW v WITH (action=materialize) AS SELECT x::integer, COUNT(*) FROM stream GROUP BY x");

    for (int i=0; i<100000; i++)
    {
      // 10 unique groupings
      int x = i % 10;

      // INSERT INTO stream (x) VALUES (x)
      stmt.addBatch("INSERT INTO stream (x) VALUES (" + Integer.toString(x) + ")");
    }

    stmt.executeBatch();

    rs = stmt.executeQuery("SELECT * FROM v");
    while (rs.next())
    {
      int id = rs.getInt("x");
      int count = rs.getInt("count");

      System.out.println(id + " = " + count);
    }

    // Clean up
    stmt.executeUpdate("DROP VIEW v");
    conn.close();
  }
}

 

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部