译文推荐|ScyllaDB + Pulsar:分布式数据库与事件流的结合

2022/08/10 19:17
阅读数 306

本文翻译自 Raouf Chebri 发布在 Medium 上的《Integrating a distributed database with event streaming》一文。校对与整理:StreamNative

译者简介

张建威,Shopee 商家服务团队后台开发工程师,致力于云原生、微服务、分布式架构等方向,热爱开源。

本文整理自 ScyllaDB 和 StreamNative 联合举办的《Distributed Data Systems Masterclass》活动上的分享,我和 AWS 的 Maheedhar Gunturu 以及 StreamNative 的 Tim Spann 一起参加了这次活动,感觉很有意思。

什么是事件流

事件是发生的事情或者事实。在微服务架构中,我们可以想象一个事件影响多个服务并生成一系列交互。连续不断的事件和数据称为事件流。

数据流平台把这些工具整合在一起,目的是将静态的数据转换为在组织中可以自由流动的数据 -- Ben Stopford《Designing Event-Driven Systems(设计事件驱动系统)》。

像 Apache Kafka 和 Apache Pulsar 等数据流平台负责管理数据流,并把它们传递到正确的数据系统或服务。数据流平台将由生产者产生的数据传递给消费者,或者从一个数据源导出数据并导入到另一个数据源。

适用的场景包括数据迁移、实时分析或者查询型数据库等。

本文将演示如何使用 Python 创建一个简单的生产者服务,用于生产消息到 Apache Pulsar 中的主题。我们将会使用 ScyllaDB,并创建一个 Sink 连接器用于将消息导出到该数据库中。

使用 ScyllaDB 和 Pulsar 构建低延迟应用

使用 ScyllaDB 和 Apache Pulsar 旨在将分布式数据系统结合,以提供低延迟和高吞吐的能力。

下图来源于 《2022 Pulsar 与 Kafka 基准测试》白皮书,它展示了 Apache Pulsar 的 P99 操作耗时只有几毫秒。ScyllaDB 是一个分布式 NoSQL 数据库,它使用 Seastar 框架实现了操作的稳定低延迟以及百万 QPS 的高性能。

内置的连接器

如上所述,像 Pulsar 和 Kafka 等数据流平台可用于数据迁移。用于向数据流平台导入数据的系统称为“Source(数据源)”,接收数据流平台导出数据的系统称为“Sink(数据池)”。

作为开发者,你可以根据需要创建自己的连接器。然而,如果你不想创建自己的连接器以避免维护多余的代码,可以选择 Pulsar 开箱即用的连接器,支持了几乎所有的数据系统,例如 PostgreSQL、Kafka、Kinesis 和 Redis 等。

在接下来的示例中,我们将会使用 Apache Cassandra 的 Sink 连接器来组合 ScyllaDB 和 Pulsar。

组合 ScyllaDB 和 Pulsar

在这个例子中,我们将会使用内置的 Cassandra Sink 连接器。请注意,ScyllaDB 和 Cassandra 是 API 兼容的。然而,和 ScyllaDB 的 Kafka 连接器[1]不同,该 Cassandra 连接器不支持分片感知(Not shared-aware)。

更多关于每核一个分片(Shard-per-core)架构的介绍请参考此页面[2]

接下来我们开始使用 Apache Cassandra Sink 连接器组合 ScyllaDB 和 Pulsar。我们将分三步来完成。

步骤 1:准备数据库

执行以下命令使用 Docker 创建一个本地 ScyllaDB 集群:

docker run --name scylla -d scylladb/scylla

请注意,在本示例中我们创建了单节点集群。ScyllaDB 是一个分布式数据库,为了实现高可用建议创建一个至少包含三个节点的集群。

docker run 命令会在后台启动一个新的 Docker 实例,名为 scylla,用于运行 ScyllaDB 服务器。

执行以下命令确认集群已经正常运行:

docker exec -it scylla nodetool status

预期输出:

Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address     Load       Tokens       Owns (effective)  Host ID                               Rack
UN  172.17.0.2  103.67 KiB  256          100.0%            af0e4b2f-84e0-4f0b-bb14-bd5f9070ff26  rack1

集群运行之后,我们将会使用 CQLSH 连接至数据库并创建键空间和表。

docker exec -it scylla cqlsh

使用以下命令创建键空间:

CREATE KEYSPACE pulsar_keyspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor':1};

使用 pulsar_keyspace 并创建表 pulsar_table

CREATE TABLE pulsar_table (key text PRIMARY KEY, col text);

pulsar_table 非常简单。它有一个键和列都是 text 类型。
完成 Scylla 课程[3]了解更多有关 ScyllaDB 的知识。

步骤 2:创建 Sink 连接器

在这一步,我们将会使用 Docker 创建一个 Pulsar 实例并配置 Sink 连接器。执行以下命令使用 Docker 运行 Pulsar。

$ docker run -it \
-p 6650:6650 \
-p 8080:8080 \
--mount source=pulsardata,target=/pulsar/data \
--mount source=pulsarconf,target=/pulsar/conf \
apachepulsar/pulsar:2.7.0 \
bin/pulsar standalone

执行以下命令列出运行中的容器:

$ docker  ps -a
CONTAINER ID   IMAGE                       COMMAND                  CREATED          STATUS          PORTS                                            NAMES
3b027303f6e6   apachepulsar/pulsar:2.7.0   "bin/pulsar standalo…"   11 minutes ago   Up 11 minutes   0.0.0.0:6650->6650/tcp, 0.0.0.0:8080->8080/tcp   gallant_mahavira

使用以下命令连接至该 Pulsar 实例:

docker exec -it <container id> /bin/bash

首先,我们需要创建配置文件:

"configs" :
{
  “roots”: “localhost:9042”,
  “keyspace”: “pulsar_keyspace”,
  “columnFamily”: “pulsar_table”,
  “keyname”: “key”,
  “columnName”: “col”
}

在这个例子中我将会使用 JSON,根据需要你也可以使用 YAML。

configs:
  roots: “localhost:9042”
  keyspace: “pulsar_keyspace”
  columnFamily: “pulsar_table”
  keyname: “key”
  columnName: “col”

创建该 JSON 文件:

touch scylla-sink.json
json="{"configs":{"roots":"localhost:9042","keyspace":"pulsar_keyspace","columnFamily":"pulsar_table","keyname":"key","columnName":"col"}}"
echo $json > scylla-sink.json

执行以下命令创建 Sink:

bin/pulsar-admin sinks create \
  — tenant public \
  — namespace default \
  — name scylla-sink \
  — sink-type cassandra \
  — sink-config-file scylla-sink.yml \
  — inputs test_scylla

以上命令会创建一个 Sink 连接器,它将把所有写入到 test_scylla 主题中的消息写入到我们之前创建的 ScyllaDB 集群中。注意我们设置了 sink-type 为 cassandra

步骤 3:使用 Python 进行测试

在以下 producer.py 文件的代码中,我们使用 Client 类连接至 Pulsar 实例。之后,我们为主题 test_scylla 创建一个生产者,并向该主题发送 10 条消息。

import pulsar
client = pulsar.Client("pulsar://localhost:6650")
producer = client.create_producer(
  "persistent://public/default/test_scylla"
)
for i in range(10):
  producer.send(
    (‘Hello-%d’ % i).encode(‘utf-8’), 
    properties=None,
    partition_key="my-key-{}".format(i)
  )
client.close()

你可以使用以下命令执行上面的代码:

python3 producer.py

现在让我们看一下数据库:

docker exec -it scylla cqlsh
select * from pulsar_keyspace.pulsar_table;

预期结果:

key      | col
---------+---------
my-key-7 | Hello-7
my-key-2 | Hello-2
my-key-8 | Hello-8
my-key-0 | Hello-0
my-key-3 | Hello-3
my-key-1 | Hello-1
my-key-5 | Hello-5
my-key-6 | Hello-6
my-key-4 | Hello-4
my-key-9 | Hello-9

总结

ScyllaDB 和 Apache Pulsar 都是拥有低延迟操作的分布式数据系统。以上示例展示了如何使用 Pulsar 内置的 Cassandra 连接器,并将它和 ScyllaDB 组合作为一个 Sink。

与 Kafka 的 ScyllaDB 连接器不同的是,内置的 Cassandra 连接器不支持分片感知。尽管该连接器兼容 ScyllaDB,但是它并没有为和 ScyllaDB 高效协作而进行优化。

引用链接

[1] ScyllaDB 的 Kafka 连接器: https://docs.scylladb.com/using-scylla/integrations/sink-kafka-connector/
[2] 此页面: https://www.scylladb.com/product/technology/shard-per-core-architecture/
[3] Scylla 课程: https://university.scylladb.com/


关注「Apache Pulsar」,获取干货与动态

👇🏻 加入 Apache Pulsar 中文交流群 👇🏻

点击「阅读原文」,给 Pulsar 点赞!

本文分享自微信公众号 - ApachePulsar(ApachePulsar)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
0 评论
0 收藏
0
分享
返回顶部
顶部