Query Acceleration for a Metrics Platform Based on StarRocks

Original · 06/24 17:48

# Background

The metrics management platform divides metrics by query type into materialized-table metrics and ad-hoc query metrics.

  • Materialized-table metrics: multiple result tables can be generated for different dimension choices (a daily task writes into each result table); when the metric is queried, the most suitable result table is matched automatically based on the query conditions.

  • Ad-hoc query metrics: no result tables are produced; each retrieval dynamically generates SQL from the metric's computation rule and the query conditions and runs it against the metric's source table.

An example: suppose there is an order detail table order_info with the following schema.

```sql
CREATE TABLE order_info (
    order_id varchar(64) NOT NULL COMMENT "order id",
    pt varchar(12) NOT NULL COMMENT "date partition",
    user_id varchar(64) NOT NULL COMMENT "user id",
    price double NULL COMMENT "order amount",
    project_id int(11) NOT NULL COMMENT "product id",
    channel varchar(64) NULL COMMENT "channel"
) ENGINE=OLAP
PRIMARY KEY(order_id, pt)
PARTITION BY (pt)
DISTRIBUTED BY HASH(order_id)
PROPERTIES (
    "replication_num" = "3",
    "in_memory" = "false",
    "enable_persistent_index" = "true",
    "replicated_storage" = "true",
    "compression" = "LZ4"
);
```

## Building Metrics

(1) Create a model: the example uses a single table, so no joins need to be added; price is chosen as the measure column and user_id, project_id, and channel as dimension columns.

(2) Create an atomic metric: sales amount, computed as sum(price), with all of the model's dimensions available.

(3) Build materialized-table derived metrics: daily sales amount, computed as sum(price), with channel (daily sales by channel) and project_id (daily sales by product) selected as the materialized dimensions.

```sql
-- Daily sales by channel
CREATE TABLE sum_price_day_channel AS
SELECT sum(price) AS sum_price_day, channel, '{pt}'
FROM order_info WHERE pt = '{pt}' GROUP BY channel;

-- Daily sales by product
CREATE TABLE sum_price_day_project AS
SELECT sum(price) AS sum_price_day, project_id, '{pt}'
FROM order_info WHERE pt = '{pt}' GROUP BY project_id;
```

(4) Build an ad-hoc derived metric: daily sales amount, computed as sum(price), with channel and project_id selectable as dimensions.

## Querying Metrics

(1) Query by dimension channel with 20250101 <= pt <= 20250105:

a. Ad-hoc query: SQL is generated on the fly.

```sql
SELECT sum(price) AS sum_price_day, channel, pt
FROM order_info
WHERE pt >= '20250101' AND pt <= '20250105'
GROUP BY channel, pt;
```

b. Materialized-table query: used when sum_price_day_channel contains every requested date; otherwise the data is fetched with the ad-hoc SQL instead.

```sql
-- When sum_price_day_channel covers all queried dates
SELECT sum_price_day, channel, pt
FROM sum_price_day_channel
WHERE pt >= '20250101' AND pt <= '20250105';
```

(2) Query by dimensions channel and project_id with 20250101 <= pt <= 20250105:

Because no materialized result table contains both channel and project_id, the query falls back to the ad-hoc path:

```sql
SELECT sum(price) AS sum_price_day, channel, project_id, pt
FROM order_info
WHERE pt >= '20250101' AND pt <= '20250105'
GROUP BY channel, project_id, pt;
```

# StarRocks Materialized Views

## Synchronous Materialized Views

### Limitations

  • Only a single base table is supported.

  • It is essentially an index on the base table rather than a physical table.

### Syntax

```sql
CREATE MATERIALIZED VIEW [IF NOT EXISTS] [database.]<mv_name>
[COMMENT ""]
[PROPERTIES ("key"="value", ...)]
AS
<query_statement>
```
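As an illustrative sketch (not part of the original platform), a synchronous materialized view that pre-aggregates daily sales per channel on order_info could look like the following; note that synchronous materialized views are subject to table-model restrictions, so this assumes the base table uses a model that permits them:

```sql
-- Hypothetical rollup-style synchronous MV on order_info.
-- Acts as an aggregate index: queries grouping price by pt and
-- channel can be served from it transparently.
CREATE MATERIALIZED VIEW sum_price_by_channel
AS
SELECT pt, channel, sum(price)
FROM order_info
GROUP BY pt, channel;
```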

## Asynchronous Materialized Views

For asynchronous materialized views whose base tables are in the default_catalog, StarRocks guarantees strong consistency between the rewritten query and the original query by excluding any materialized view whose data is out of sync with its base tables. For materialized views built on External Catalog tables, results may differ from querying the base tables directly because of the asynchronous refresh mechanism.

### Limitations

  • List partitioning is not supported: an asynchronous materialized view cannot use it, nor be built on a base table that uses it.

  • Query rewriting only supports cardinality-preserving joins (the result set has no more rows than either input table).

  • Query rewriting does not support grouping sets, grouping sets with rollup, or grouping sets with cube.

  • Partitioned materialized views only support range partitioning.

### Syntax

```sql
CREATE MATERIALIZED VIEW [IF NOT EXISTS] [database.]<mv_name>
[COMMENT ""]
-- At least one of distribution_desc and refresh_scheme must be specified.
-- distribution_desc
[DISTRIBUTED BY HASH(<bucket_key>[,<bucket_key2> ...]) [BUCKETS <bucket_number>]]
-- refresh_desc
[REFRESH
-- refresh_moment
    [IMMEDIATE | DEFERRED]
-- refresh_scheme
    [ASYNC | ASYNC [START (<start_time>)] EVERY (INTERVAL <refresh_interval>) | MANUAL]]
-- partition_expression
[PARTITION BY {<date_column> | date_trunc(fmt, <date_column>)}]
-- order_by_expression
[ORDER BY (<sort_key>)]
[PROPERTIES ("key"="value", ...)]
AS <query_statement>
```

## Manually Refreshing a Materialized View

```sql
-- Trigger a refresh task asynchronously.
REFRESH MATERIALIZED VIEW <mv_name>;

-- Trigger a refresh task and wait for it to finish.
REFRESH MATERIALIZED VIEW <mv_name> WITH SYNC MODE;
```
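To check whether an asynchronous refresh has finished, recent StarRocks versions expose task-run metadata; a sketch (column availability may vary by version):

```sql
-- Inspect the status of recent materialized view refresh task runs.
SELECT task_name, state, error_message
FROM information_schema.task_runs
ORDER BY create_time DESC
LIMIT 10;
```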

# Query Acceleration

## Option 1: Accelerating Ad-hoc Metrics with StarRocks Materialized Views

During query rewriting, StarRocks checks whether the pre-computed results of an existing materialized view can be reused to serve the query; if not, the query goes to the original table, preserving data consistency.

(1) Create an asynchronous materialized view based on the atomic metric.

```sql
CREATE MATERIALIZED VIEW sum_price_view
REFRESH ASYNC START('2025-05-01 09:00:00') EVERY (INTERVAL 1 DAY)
AS
SELECT sum(price), user_id, project_id, channel, pt
FROM order_info
GROUP BY user_id, project_id, channel, pt;
```

(2) Query by dimension channel with 20250101 <= pt <= 20250105.

The ad-hoc query generates this SQL:

```sql
SELECT sum(price) AS sum_price_day, channel, pt
FROM order_info
WHERE pt >= '20250101' AND pt <= '20250105'
GROUP BY channel, pt;
```

Because the sum_price_view materialized view exists, StarRocks rewrites the query to:

```sql
SELECT sum(price) AS sum_price_day, channel, pt
FROM sum_price_view
WHERE pt >= '20250101' AND pt <= '20250105'
GROUP BY channel, pt;
```

thereby achieving query acceleration.
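Whether the rewrite actually happened can be checked with EXPLAIN: if the plan scans sum_price_view instead of order_info, the query was rewritten. A sketch:

```sql
-- Look for sum_price_view in the scan node of the plan output.
EXPLAIN
SELECT sum(price) AS sum_price_day, channel, pt
FROM order_info
WHERE pt >= '20250101' AND pt <= '20250105'
GROUP BY channel, pt;
```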

## Option 2: Accelerating Materialized-Table Metrics with StarRocks Materialized Views

Only the result table with the largest dimension set is actually generated; the other result tables are replaced by materialized views built on top of that fullest table.

(1) As in Option 1, create a materialized view based on the atomic metric.

(2) Create the result table covering all selected dimensions, using time-based range partitioning.

```sql
-- The partition column uses a date type, partitioned with a time-function expression.
-- The primary key is changed to an auto-increment bigint.
CREATE TABLE IF NOT EXISTS sum_price_day_channel_project_id (
    pk bigint AUTO_INCREMENT,
    pt datetime,
    sum_price_day DOUBLE,
    channel string,
    project_id int(11)
)
PRIMARY KEY (pk, pt)
PARTITION BY date_trunc('day', pt)
DISTRIBUTED BY HASH(pk)
PROPERTIES ("enable_persistent_index" = "true");
```

```sql
-- Create asynchronous partitioned materialized views on the all-dimension result table.
-- The partition column pt must appear in the view's SELECT list and GROUP BY.
CREATE MATERIALIZED VIEW sum_price_day_channel_view
REFRESH ASYNC
PARTITION BY pt
AS
SELECT pt, sum(sum_price_day) AS sum_price_day, channel
FROM sum_price_day_channel_project_id
GROUP BY pt, channel;

CREATE MATERIALIZED VIEW sum_price_day_project_view
REFRESH ASYNC
PARTITION BY pt
AS
SELECT pt, sum(sum_price_day) AS sum_price_day, project_id
FROM sum_price_day_channel_project_id
GROUP BY pt, project_id;
```

(3) The materialized-table metric task SQL. Thanks to the views' automatic refresh mechanism, queries on sum_price_day_channel_view and sum_price_day_project_view stay consistent with sum_price_day_channel_project_id, and query rewriting is supported.

```sql
INSERT OVERWRITE sum_price_day_channel_project_id PARTITION(pt='20250501') (pt, sum_price_day, channel, project_id)
SELECT str2date('20250501', '%Y%m%d'), idx.sum_price_day, idx.channel, idx.project_id
FROM (
    SELECT sum(price) AS sum_price_day, channel, project_id
    FROM order_info
    WHERE pt = '20250501'
    GROUP BY channel, project_id
) idx;
```

These steps reduce the number of result-table loads, speeding up the daily task, and simplify the retrieval SQL; combined with StarRocks query rewriting, they improve query performance.

## Option 3: Other Optimizations

Dictionary encoding: convert string columns to integer columns via dictionary tables to improve efficiency.

Sorted streaming aggregate: exploit the sort key to speed up GROUP BY.

Colocate Join: specifying the "colocate_with" = "group_name" property keeps rows with the same distribution key on the same group of BE nodes, reducing inter-node data transfer and improving join performance.
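A minimal sketch of the Colocate Join setup (table and group names are illustrative, not from the original platform; colocated tables must share the same bucketing key types, bucket count, and replica count):

```sql
-- Both tables join on user_id and share one colocation group,
-- so matching buckets land on the same BE nodes.
CREATE TABLE orders_fact (
    user_id varchar(64),
    price double
)
DUPLICATE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 8
PROPERTIES ("colocate_with" = "user_group");

CREATE TABLE user_dim (
    user_id varchar(64),
    city varchar(64)
)
DUPLICATE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 8
PROPERTIES ("colocate_with" = "user_group");

-- This join can now execute locally on each BE without a shuffle.
SELECT d.city, sum(f.price)
FROM orders_fact f JOIN user_dim d ON f.user_id = d.user_id
GROUP BY d.city;
```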

(1) Create the dictionary tables and load data.

```sql
CREATE TABLE channel_dict (
    channel STRING,
    channel_int BIGINT AUTO_INCREMENT
)
PRIMARY KEY (channel)
DISTRIBUTED BY HASH(channel)
PROPERTIES("replicated_storage" = "true");

CREATE TABLE order_id_dict (
    order_id STRING,
    order_id_int BIGINT AUTO_INCREMENT
)
PRIMARY KEY (order_id)
DISTRIBUTED BY HASH(order_id)
PROPERTIES("replicated_storage" = "true");

CREATE TABLE user_id_dict (
    user_id STRING,
    user_id_int BIGINT AUTO_INCREMENT
)
PRIMARY KEY (user_id)
DISTRIBUTED BY HASH(user_id)
PROPERTIES("replicated_storage" = "true");

-- Load data
INSERT INTO channel_dict(channel) SELECT DISTINCT channel FROM order_info;
INSERT INTO order_id_dict(order_id) SELECT DISTINCT order_id FROM order_info;
INSERT INTO user_id_dict(user_id) SELECT DISTINCT user_id FROM order_info;
```

(2) Create a result table that includes the integer id columns, then load data.

```sql
CREATE TABLE order_info_integer (
    order_id varchar(64) NOT NULL COMMENT "order id",
    pt varchar(12) NOT NULL COMMENT "date partition",
    user_id varchar(64) NOT NULL COMMENT "user id",
    price double NULL COMMENT "order amount",
    project_id int(11) NOT NULL COMMENT "product id",
    channel varchar(64) NULL COMMENT "channel",
    -- These are generated columns configured with dict_mapping: during loading,
    -- their values are fetched automatically from the dictionary tables above.
    -- Deduplication and JOIN queries can then work directly on these columns.
    channel_int BIGINT AS dict_mapping('channel_dict', channel),
    order_id_int BIGINT AS dict_mapping('order_id_dict', order_id),
    user_id_int BIGINT AS dict_mapping('user_id_dict', user_id)
) ENGINE=OLAP
PRIMARY KEY(order_id, pt)
PARTITION BY (pt)
DISTRIBUTED BY HASH(order_id)
PROPERTIES (
    "replication_num" = "3",
    "in_memory" = "false",
    "enable_persistent_index" = "true",
    "replicated_storage" = "true",
    "compression" = "LZ4"
);

INSERT INTO order_info_integer(order_id, pt, user_id, price, project_id, channel)
SELECT order_id, pt, user_id, price, project_id, channel FROM order_info;
```

(3) Result tables store, and subsequent joins use, the integer columns, which speeds up join queries.

This option produces dictionary data, and queries must consult the dictionary tables for id conversion, which adds some overhead; it suits scenarios where joins are frequent.
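For example (a sketch assuming the tables above; 'app_store' is a hypothetical channel value), a string filter can be translated through the dictionary once so the scan touches only integer columns:

```sql
-- Filter by a string channel value while scanning only the integer column.
SELECT pt, sum(price) AS sum_price
FROM order_info_integer
WHERE channel_int = dict_mapping('channel_dict', 'app_store')
GROUP BY pt;
```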

This article was published via OpenWrite, a multi-platform blogging tool.
