

- Which table engine should we use
- How Flink writes into ClickHouse
- Why querying ClickHouse is 1-2 minutes slower than querying ES
- Whether to write to the distributed table or the local table
- Why only one shard shows high CPU usage
- How to locate the SQL that is consuming CPU: with so many slow SQLs, how do I know which one is the culprit
- Having found the slow SQL, how to optimize it
- How to withstand high concurrency and keep ClickHouse available
- Log family: for small data volumes (under about one million rows); no index support, so range queries are inefficient.
- Integration family: mainly for importing external data into ClickHouse, or operating directly on external data from within ClickHouse; supports Kafka, HDFS, JDBC, MySQL, and more.
- Special family: mostly engines tailored to specific scenarios, e.g. Memory keeps data in RAM (lost on restart, but extremely fast to query) and File uses a local file directly as storage.
- MergeTree family: the MergeTree family itself has many engine variants. MergeTree, the most basic engine of the family, provides primary-key indexing, data partitioning, data replication, and data sampling, and supports very large write volumes; the other engines in the family each extend MergeTree with their own strengths.
3.1 MergeTree
CREATE TABLE test_MergeTree (
    orderNo String,
    number Int16,
    createTime DateTime,
    updateTime DateTime
) ENGINE = MergeTree()
PARTITION BY createTime
ORDER BY (orderNo)
PRIMARY KEY (orderNo);
insert into test_MergeTree values('1', '20', '2021-01-01 00:00:00', '2021-01-01 00:00:00');
insert into test_MergeTree values('1', '30', '2021-01-01 00:00:00', '2021-01-01 01:00:00');
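Since both inserts above share the same orderNo, a quick check confirms that MergeTree treats the primary key only as an index, not as a uniqueness constraint (expected output sketched as comments; row order may vary):
-- both rows survive: no deduplication happens on write
select * from test_MergeTree;
-- 1, 20, 2021-01-01 00:00:00, 2021-01-01 00:00:00
-- 1, 30, 2021-01-01 00:00:00, 2021-01-01 01:00:00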




3.2 ReplacingMergeTree
- If the ver column is not specified, the last inserted row among rows with the same primary key is kept.
- If the ver column is specified, deduplication keeps the row with the largest ver value, regardless of insert order; the example below designates the version column as ver.
CREATE TABLE test_ReplacingMergeTree (
    orderNo String,
    version Int16,
    number Int16,
    createTime DateTime,
    updateTime DateTime
) ENGINE = ReplacingMergeTree(version)
PARTITION BY createTime
ORDER BY (orderNo)
PRIMARY KEY (orderNo);
insert into test_ReplacingMergeTree values('1', 1, '20', '2021-01-01 00:00:00', '2021-01-01 00:00:00'); -- 1)
insert into test_ReplacingMergeTree values('1', 2, '30', '2021-01-01 00:00:00', '2021-01-01 01:00:00'); -- 2)
insert into test_ReplacingMergeTree values('1', 3, '30', '2021-01-02 00:00:00', '2021-01-01 01:00:00'); -- 3)
-- deduplicate with final
select * from test_ReplacingMergeTree final;
-- deduplicate with argMax
select argMax(orderNo, version) as orderNo, argMax(number, version) as number, argMax(createTime, version), argMax(updateTime, version) from test_ReplacingMergeTree;
- Plain query: results are not deduplicated; physical data is not deduplicated (parts not yet merged).
- final query: results are deduplicated; physical data is not deduplicated (parts not yet merged).
- argMax query: results are deduplicated; physical data is not deduplicated (parts not yet merged).



- Uses the primary key as the uniqueness key for identifying duplicates, while still allowing rows with the same primary key to be inserted.
- Deduplication is triggered when parts are merged, but merge timing is nondeterministic, so queries may still see duplicates; the data is eventually deduplicated. A merge can be forced with optimize (see the sketch after this list), but that causes heavy reads and writes and is not recommended in production.
- Duplicates are removed per data partition: when a partition's parts merge, duplicates within that partition are removed; duplicates across partitions are not.
- Queries can deduplicate via final or argMax; both return correct results whether or not the data has been merged.
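A minimal sketch of the manual merge mentioned above (again, not for production use because of the heavy IO it triggers):
-- force-merge the parts so duplicates are physically removed
optimize table test_ReplacingMergeTree final;
-- after the merge, even a plain select returns deduplicated rows
select * from test_ReplacingMergeTree;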
- Plain select: for offline queries with loose freshness requirements, rely on ClickHouse's automatic merging, but make sure all rows of one business record land in the same data partition, and for distributed tables in the same shard as well. This is the most efficient and most resource-friendly way to query.
- final query: for real-time queries. final deduplicates locally, so rows with the same primary key must land on the same shard, though not necessarily in the same data partition. It comes second in efficiency: it costs somewhat more than a plain select, but if the where conditions hit the primary index, secondary indexes, and partition column well, its performance is entirely usable.
- argMax query: also usable for real-time queries, with the loosest requirements of all; it deduplicates no matter how the data is laid out. But because of how it is implemented it is far less efficient and much more resource-hungry, so it is not recommended. Section 9.4.3 compares it against final with benchmark data.
3.3 CollapsingMergeTree / VersionedCollapsingMergeTree

- If there is at least one more sign=1 row than sign=-1 rows, the last sign=1 row is kept.
- If there is at least one more sign=-1 row than sign=1 rows, the first sign=-1 row is kept.
- If the sign=1 and sign=-1 counts are equal and the last row has sign=1, the first sign=-1 row and the last sign=1 row are kept.
- If the counts are equal and the last row has sign=-1, nothing is kept.
- In all other cases ClickHouse does not raise an error but logs a warning, and the query result is then nondeterministic and unpredictable.
- Force a merge with optimize; as before, forced merges are extremely inefficient and resource-intensive, so this is not recommended in production.
- Rewrite the query using group by together with the signed sign column (sketched below). This adds some coding overhead.
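A minimal sketch of the group by + sign rewrite; the table definition here is an assumption that mirrors the earlier examples with an added sign column:
CREATE TABLE test_CollapsingMergeTree (
    orderNo String,
    number Int16,
    sign Int8,
    createTime DateTime
) ENGINE = CollapsingMergeTree(sign)
PARTITION BY createTime
ORDER BY (orderNo);
-- cancel the old state with a sign = -1 row, then write the new state with sign = 1
insert into test_CollapsingMergeTree values ('1', 20, 1, '2021-01-01 00:00:00');
insert into test_CollapsingMergeTree values ('1', 20, -1, '2021-01-01 00:00:00'), ('1', 30, 1, '2021-01-01 00:00:00');
-- the rewrite: multiply by sign so uncollapsed row pairs cancel out
select orderNo, sum(number * sign) as number
from test_CollapsingMergeTree
group by orderNo
having sum(sign) > 0;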

3.4 Table engine summary

4.1 Flink version issues
- Before 1.11, the artifact was named flink-jdbc.
- From 1.11 onward (inclusive), it is flink-connector-jdbc.

4.2 Building the ClickHouse Sink
/**
 * Build the sink.
 * @param clusterPrefix ClickHouse database (cluster) name, used as the config key prefix
 * @param sql           insert statement with placeholders, e.g. insert into demo (id, name) values (?, ?)
 */
public static SinkFunction getSink(String clusterPrefix, String sql) {
    String clusterUrl = LoadPropertiesUtil.appInfoProcessMap.get(clusterPrefix + CLUSTER_URL);
    String clusterUsername = LoadPropertiesUtil.appInfoProcessMap.get(clusterPrefix + CLUSTER_USER_NAME);
    String clusterPassword = LoadPropertiesUtil.appInfoProcessMap.get(clusterPrefix + CLUSTER_PASSWORD);
    return JdbcSink.sink(sql, new CkSinkBuilder<>(),
            new JdbcExecutionOptions.Builder().withBatchSize(200000).build(),
            new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                    .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
                    .withUrl(clusterUrl)
                    .withUsername(clusterUsername)
                    .withPassword(clusterPassword)
                    .build());
}
- sql: the insert statement with placeholders, e.g. insert into demo (id, name) values (?, ?).
- new CkSinkBuilder<>(): an implementation of the org.apache.flink.connector.jdbc.JdbcStatementBuilder interface; it maps records from the stream onto java.sql.PreparedStatement to build the PreparedStatement. Details omitted here.
- Third argument: the Flink sink's execution (batching) strategy.
- Fourth argument: the JDBC driver, connection URL, username, and password.

To use it, just call addSink on the DataStream, as sketched below.
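For illustration only, a hedged usage sketch; env, OrderSource, and the Order record type are placeholders, not code from this project:
// attach the sink to any DataStream whose records CkSinkBuilder can map to the SQL placeholders
DataStream<Order> stream = env.addSource(new OrderSource());
stream.addSink(getSink("wms", "insert into demo (id, name) values (?, ?)"));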


- Step 1: 1000 rows are written to the distributed table, which routes them by the sharding rule; say 300 rows are assigned to S1, 200 to S2, and 500 to S3.
- Step 2: the client delivers all 1000 rows to S1; the 300 rows belonging to S1 are written straight to disk, while the rows destined for S2 and S3 are written into temporary directories on S1.
- Step 3: S2 and S3 receive the change notification from ZooKeeper, each generates a task to pull its shard's data from the corresponding temporary directory on S1, puts the task on a queue, and pulls the data over to itself at some later point.
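For reference, the distributed table itself is only a routing layer over the local tables; a sketch of its definition, assuming a cluster named default and the local table wms.wms_order_sku_local used later in this article:
CREATE TABLE wms.wms_order_sku ON CLUSTER default
AS wms.wms_order_sku_local
ENGINE = Distributed(default, wms, wms_order_sku_local, rand());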
7.1 Uneven data distribution causes high CPU on some nodes

7.2 A merge on one node causes high CPU on that node



SELECT
ifNull(sum(t1.unTrackQty), 0) AS unTrackQty
FROM
wms.wms_order_sku_local AS t1 FINAL
PREWHERE t1.shipmentOrderCreateTime > '2021-11-17 11:00:00'
AND t1.shipmentOrderCreateTime <= '2021-11-18 11:00:00'
AND t1.gridStationNo = 'WG0000514'
AND t1.warehouseNo NOT IN ('wms-6-979', 'wms-6-978', '6_979', '6_978')
AND t1.orderType = '10'
WHERE
t1.ckDeliveryTaskStatus = '3'


- Node 7-0: scanned 4 parts, 940 thousand rows in total, in 0.089 s.
- Node 7-4: scanned 2 parts, one of them holding 4.91 million rows, 5.02 million rows in total, in 0.439 s.


7.3 Physical machine failure

8.1 Locating frequently executed SQL with Grafana


8.2 High scanned rows / high memory usage: analysis with query_log_all

-- create a distributed table over query_log
CREATE TABLE IF NOT EXISTS system.query_log_all
ON CLUSTER default
AS system.query_log
ENGINE = Distributed(sht_ck_cluster_pro, system, query_log, rand());
-- analysis query
select
    -- execution count
    count(),
    -- average query duration
    avg(query_duration_ms) avgTime,
    -- average rows read per execution
    floor(avg(read_rows)) avgRow,
    -- average data read per execution, in MB
    floor(avg(read_bytes) / 1048576) avgMB,
    -- one concrete query instance
    any(query),
    -- query text with the where clause stripped, used as the group by key
    substring(query, positionCaseInsensitive(query, 'select'), positionCaseInsensitive(query, 'from')) as queryLimit
from system.query_log_all -- or system.query_log for a single node
where event_date = '2022-01-21'
  and type = 2
group by queryLimit
order by avgRow desc;
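Since the section title also calls out high memory usage, a variant of the same analysis ranked by memory is sketched below; memory_usage is another column of system.query_log:
-- top queries by average peak memory
select count(), floor(avg(memory_usage) / 1048576) avgMemMB, any(query)
from system.query_log_all
where event_date = '2022-01-21' and type = 2
group by substring(query, positionCaseInsensitive(query, 'select'), positionCaseInsensitive(query, 'from'))
order by avgMemMB desc
limit 10;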

9.1 Slow-query analysis with service logs
select
ifNull(sum(interceptLackQty), 0) as interceptLackQty
from wms.wms_order_sku_local final
prewhere productionEndTime = '2022-02-17 08:00:00'
and orderType = '10'
where shipmentOrderDetailDeleted = '0'
and ckContainerDetailDeleted = '0'
clickhouse-client -h <host> --port <port> --user <user> --password <password> --send_logs_level=trace
[2022.02.17 21:21:54.036317 [ 618 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} <Debug> executeQuery: (from 11.77.96.163:35988, user: bjwangjiangbo) select ifNull(sum(interceptLackQty), 0) as interceptLackQty from wms.wms_order_sku_local final prewhere productionEndTime = '2022-02-17 08:00:00' and orderType = '10' where shipmentOrderDetailDeleted = '0' and ckContainerDetailDeleted = '0' ]
[2022.02.17 21:21:54.037876 [ 618 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} <Trace> ContextAccess (bjwangjiangbo): Access granted: SELECT(orderType, interceptLackQty, productionEndTime, shipmentOrderDetailDeleted, ckContainerDetailDeleted) ON wms.wms_order_sku_local ]
[2022.02.17 21:21:54.038239 [ 618 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} <Debug> wms.wms_order_sku_local (SelectExecutor): Key condition: unknown, unknown, and, unknown, unknown, and, and, unknown, unknown, and, and ]
[2022.02.17 21:21:54.038271 [ 618 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} <Debug> wms.wms_order_sku_local (SelectExecutor): MinMax index condition: unknown, unknown, and, unknown, unknown, and, and, unknown, unknown, and, and ]
[2022.02.17 21:21:54.038399 [ 1340 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} <Trace> wms.wms_order_sku_local (SelectExecutor): Not using primary index on part 202101_0_0_0_3 ]
[2022.02.17 21:21:54.038475 [ 1407 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} <Trace> wms.wms_order_sku_local (SelectExecutor): Not using primary index on part 202103_0_17_2_22 ]
[2022.02.17 21:21:54.038491 [ 111 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} <Trace> wms.wms_order_sku_local (SelectExecutor): Not using primary index on part 202103_18_20_1_22 ]
.................................. (several lines omitted: checking, partition by partition, whether an index was used) ..................................
[2022.02.17 21:21:54.039041 [ 1205 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} <Trace> wms.wms_order_sku_local (SelectExecutor): Not using primary index on part 202202_1723330_1723365_7 ]
[2022.02.17 21:21:54.039054 [ 159 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} <Trace> wms.wms_order_sku_local (SelectExecutor): Not using primary index on part 202202_1723367_1723367_0 ]
[2022.02.17 21:21:54.038928 [ 248 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} <Trace> wms.wms_order_sku_local (SelectExecutor): Not using primary index on part 202201_3675258_3700711_1054 ]
[2022.02.17 21:21:54.039355 [ 618 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} <Debug> wms.wms_order_sku_local (SelectExecutor): Selected 47 parts by date, 47 parts by key, 9471 marks by primary key, 9471 marks to read from 47 ranges ]
[2022.02.17 21:21:54.039495 [ 618 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} <Trace> MergeTreeSelectProcessor: Reading 1 ranges from part 202101_0_0_0_3, approx. 65536 rows starting from 0 ]
[2022.02.17 21:21:54.039583 [ 618 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} <Trace> MergeTreeSelectProcessor: Reading 1 ranges from part 202101_1_1_0_3, approx. 16384 rows starting from 0 ]
[2022.02.17 21:21:54.040291 [ 618 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} <Trace> MergeTreeSelectProcessor: Reading 1 ranges from part 202102_0_2_1_4, approx. 146850 rows starting from 0 ]
.................................. (several lines omitted: row counts read from each partition) ..................................
[2022.02.17 21:21:54.043538 [ 618 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} <Trace> MergeTreeSelectProcessor: Reading 1 ranges from part 202202_1723330_1723365_7, approx. 24576 rows starting from 0 ]
[2022.02.17 21:21:54.043604 [ 618 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} <Trace> MergeTreeSelectProcessor: Reading 1 ranges from part 202202_1723366_1723366_0, approx. 8192 rows starting from 0 ]
[2022.02.17 21:21:54.043677 [ 618 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} <Trace> MergeTreeSelectProcessor: Reading 1 ranges from part 202202_1723367_1723367_0, approx. 8192 rows starting from 0 ]
.................................. (data reading finished; aggregation begins) ..................................
[2022.02.17 21:21:54.047880 [ 618 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} <Trace> InterpreterSelectQuery: FetchColumns -> Complete ]
[2022.02.17 21:21:54.263500 [ 1377 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} <Trace> AggregatingTransform: Aggregating ]
[2022.02.17 21:21:54.263680 [ 1439 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} <Trace> Aggregator: Aggregation method: without_key ]
.................................. (several lines omitted: aggregation after reading finishes) ..................................
[2022.02.17 21:21:54.263840 [ 156 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} <Trace> AggregatingTransform: Aggregated. 12298 to 1 rows (from 36.03 KiB) in 0.215046273 sec. (57187.69187876137 rows/sec., 167.54 KiB/sec.) ]
[2022.02.17 21:21:54.264283 [ 377 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} <Trace> AggregatingTransform: Aggregated. 12176 to 1 rows (from 35.67 KiB) in 0.215476999 sec. (56507.191284950095 rows/sec., 165.55 KiB/sec.) ]
[2022.02.17 21:21:54.264307 [ 377 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} <Trace> Aggregator: Merging aggregated data ]
.................................. (aggregation finished; final result returned) ..................................
┌─interceptLackQty─┐
│ 563 │
└──────────────────┘
.................................. (processing time, throughput, and summary info) ..................................
[2022.02.17 21:21:54.265490 [ 618 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} <Information> executeQuery: Read 73645604 rows, 1.20 GiB in 0.229100749 sec., 321455099 rows/sec., 5.22 GiB/sec. ]
[2022.02.17 21:21:54.265551 [ 618 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} <Debug> MemoryTracker: Peak memory usage (for query): 60.37 MiB. ]
1 rows in set. Elapsed: 0.267 sec. Processed 73.65 million rows, 1.28 GB (276.03 million rows/s., 4.81 GB/s.)

- The primary key index was not used, causing a full table scan.
- The partition index was not used, causing a full table scan.



9.2 Table schema optimizations
9.2.1 Avoid Nullable types where possible
CREATE TABLE test_Nullable (
    orderNo String,
    number Nullable(Int16),
    createTime DateTime
) ENGINE = MergeTree()
PARTITION BY createTime
ORDER BY (orderNo)
PRIMARY KEY (orderNo);
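A sketch of the alternative: give the column a sentinel default instead of making it Nullable (the sentinel value -1 is an assumption; pick one that cannot collide with real data):
CREATE TABLE test_not_nullable (
    orderNo String,
    number Int16 DEFAULT -1, -- -1 stands in for "no value"
    createTime DateTime
) ENGINE = MergeTree()
PARTITION BY createTime
ORDER BY (orderNo)
PRIMARY KEY (orderNo);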

9.2.2 Partition granularity
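The partition expression decides the granularity. As a sketch, month-level versus day-level partitioning differs only in this clause, trading fewer, larger parts against finer pruning:
CREATE TABLE test_partition (
    orderNo String,
    createTime DateTime
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(createTime) -- month-level; use toYYYYMMDD(createTime) for day-level
ORDER BY (orderNo);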
9.2.3 Choose a suitable sharding rule for distributed tables
9.3 Performance testing to compare optimization results
clickhouse-benchmark -c 1 -h <host> --port <port> --user <user> --password <password> <<< "<SQL statement>"

9.4 Query optimizations
9.4.1 Conditional aggregate functions to reduce scanned rows
-- inbound quantity
select sum(qty) from table_1 final prewhere type = 'inbound' and dt = '2021-01-01';
-- valid outbound order count
select count(distinct orderNo) from table_1 final prewhere type = 'outbound' and dt = '2021-01-01' where status = '1';
-- recheck quantity
select sum(qty) from table_1 final prewhere type = 'check' and dt = '2021-01-01';

-- the three queries above merged into a single scan
select
    sumIf(qty, type = 'inbound'), -- inbound quantity
    countIf(distinct orderNo, type = 'outbound' and status = '1'), -- valid outbound order count
    sumIf(qty, type = 'check') -- recheck quantity
from table_1 final
prewhere dt = '2021-01-01';
9.4.2 Secondary indexes (data-skipping indexes)
A data-skipping index works like this: after a data part is divided into granules (index_granularity rows each, set at table creation), groups of granularity_value granules are combined into larger blocks and index information is written per block. That lets where filters skip over large stretches of irrelevant data and reduces how much a SELECT has to read.
CREATE TABLE table_name
(
u64 UInt64,
i32 Int32,
s String,
...
INDEX a (u64 * i32, s) TYPE minmax GRANULARITY 3,
INDEX b (u64 * length(s)) TYPE set(1000) GRANULARITY 4
) ENGINE = MergeTree()
...
SELECT count() FROM table WHERE s < 'z'
SELECT count() FROM table WHERE u64 * i32 == 10 AND u64 * length(s) >= 1234
- minmax: per index-granularity block, stores the min and max of the specified expression; helps skip blocks that cannot match equality and range predicates, reducing IO.
- set(max_rows): per block, stores the set of distinct values of the specified expression; used to quickly test whether an equality predicate can match the block, reducing IO.
- ngrambf_v1(n, size_of_bloom_filter_in_bytes, number_of_hash_functions, random_seed): splits strings into ngrams and builds a Bloom filter; speeds up equality, like, and in predicates.
- tokenbf_v1(size_of_bloom_filter_in_bytes, number_of_hash_functions, random_seed): like ngrambf_v1, except it tokenizes on punctuation instead of using ngrams.
- bloom_filter([false_positive]): builds a Bloom filter over the specified columns; accelerates equality, like, and in predicates.
Alter table wms.wms_order_sku_local ON cluster default ADD INDEX belongProvinceCode_idx belongProvinceCode TYPE set(0) GRANULARITY 5;
Alter table wms.wms_order_sku_local ON cluster default ADD INDEX productionEndTime_idx productionEndTime TYPE minmax GRANULARITY 5;
-- generate the MATERIALIZE statement for every existing data partition
select concat('alter table wms.wms_order_sku_local on cluster default ', 'MATERIALIZE INDEX productionEndTime_idx in PARTITION ' || partition_id || ';')
from system.parts
where database = 'wms' and table = 'wms_order_sku_local'
group by partition_id;
-- run every MATERIALIZE statement produced above to rebuild the index for historical partitions
9.4.3 final instead of argMax for deduplication
-- final approach
select count(distinct groupOrderCode), sum(arriveNum), count(distinct sku)
from tms.group_order final
prewhere siteCode = 'WG0001544' and createTime >= '2022-03-14 22:00:00' and createTime <= '2022-03-15 22:00:00'
where arriveNum > 0 and test <> '1';
-- argMax approach
select count(distinct groupOrderCode), sum(arriveNumTemp), count(distinct sku)
from (
    select
        argMax(groupOrderCode, version) as groupOrderCode,
        argMax(arriveNum, version) as arriveNumTemp,
        argMax(sku, version) as sku
    from tms.group_order
    prewhere siteCode = 'WG0001544' and createTime >= '2022-03-14 22:00:00' and createTime <= '2022-03-15 22:00:00'
    where arriveNum > 0 and test <> '1'
    group by docId
);

9.4.4 prewhere instead of where
-- conventional approach
select count(distinct orderNo) from table_1 final
where type = 'outbound' and status = '1' and dt = '2021-01-01';
-- prewhere approach
select count(distinct orderNo) from table_1 final
prewhere type = 'outbound' and dt = '2021-01-01'
where status = '1';

-- Query 1: where with status=1; cannot hit the row docId:123_1
select count(distinct orderNo) from table_1 final
where type = 'outbound' and dt = '2021-01-01' and status = '1';
-- Query 2: where with status=2; does find the row docId:123_1
select count(distinct orderNo) from table_1 final
where type = 'outbound' and dt = '2021-01-01' and status = '2';
-- Query 3: wrong approach, the mutable field status placed in prewhere
select count(distinct orderNo) from table_1 final
prewhere type = 'outbound' and dt = '2021-01-01' and status = '1';
-- Query 4: correct prewhere usage, the mutable field status stays in where
select count(distinct orderNo) from table_1 final
prewhere type = 'outbound' and dt = '2021-01-01'
where status = '1';
9.4.5 Column pruning and partition pruning
Column pruning means reading only the columns you actually need rather than select *, while partition pruning means reading only the partitions you need, by constraining the query range on the partition column.
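Both pruning styles in one sketch, reusing the test_MergeTree table from section 3.1:
-- column pruning: read only the needed columns instead of select *
select orderNo, number from test_MergeTree
-- partition pruning: constrain the partition column so only matching partitions are read
where createTime >= '2021-01-01 00:00:00' and createTime < '2021-01-02 00:00:00';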
9.4.6 Ordering of where and group by columns
-- table definition
create table group_order_local
(
    docId String,
    version UInt64,
    siteCode String,
    groupOrderCode String,
    sku String,
    ... non-essential columns omitted ...
    createTime DateTime
) engine =
ReplicatedReplacingMergeTree('/clickhouse/tms/group_order/{shard}', '{replica}', version)
PARTITION BY toYYYYMM(createTime)
ORDER BY (siteCode, groupOrderCode, sku);
-- query 1
select count(distinct groupOrderCode) groupOrderQty, ifNull(sum(arriveNum), 0) arriveNumSum, count(distinct sku) skuQty
from tms.group_order final
prewhere createTime >= '2021-09-14 22:00:00' and createTime <= '2021-09-15 22:00:00' and siteCode = 'WG0000709'
where arriveNum > 0 and test <> '1';
-- query 2 (prewhere conditions reordered to lead with the sort-key column siteCode)
select count(distinct groupOrderCode) groupOrderQty, ifNull(sum(arriveNum), 0) arriveNumSum, count(distinct sku) skuQty
from tms.group_order final
prewhere siteCode = 'WG0000709' and createTime >= '2021-09-14 22:00:00' and createTime <= '2021-09-15 22:00:00'
where arriveNum > 0 and test <> '1';



3) Run the queries in an asynchronous task and land the aggregated metrics in ES; the application then reads the aggregated results from ES.
4) Materialized views, which solve this class of problem through pre-aggregation; however, they do not fit our business scenario.
This article is shared from the WeChat official account JD Cloud Developers (JDT_Developers).