本文翻译自 Raouf Chebri 发布在 Medium 上的《Integrating a distributed database with event streaming》一文。校对与整理:StreamNative
译者简介
张建威,Shopee 商家服务团队后台开发工程师,致力于云原生、微服务、分布式架构等方向,热爱开源。
本文整理自 ScyllaDB 和 StreamNative 联合举办的《Distributed Data Systems Masterclass》活动上的分享,我和 AWS 的 Maheedhar Gunturu 以及 StreamNative 的 Tim Spann 一起参加了这次活动,感觉很有意思。
什么是事件流
事件是发生的事情或者事实。在微服务架构中,我们可以想象一个事件影响多个服务并生成一系列交互。连续不断的事件和数据称为事件流。
数据流平台把这些工具整合在一起,目的是将静态的数据转换为在组织中可以自由流动的数据 -- Ben Stopford《Designing Event-Driven Systems(设计事件驱动系统)》。
像 Apache Kafka 和 Apache Pulsar 等数据流平台负责管理数据流,并把它们传递到正确的数据系统或服务。数据流平台将由生产者产生的数据传递给消费者,或者从一个数据源导出数据并导入到另一个数据源。
适用的场景包括数据迁移、实时分析或者查询型数据库等。
本文将演示如何使用 Python 创建一个简单的生产者服务,用于生产消息到 Apache Pulsar 中的主题。我们将会使用 ScyllaDB,并创建一个 Sink 连接器用于将消息导出到该数据库中。
使用 ScyllaDB 和 Pulsar 构建低延迟应用
使用 ScyllaDB 和 Apache Pulsar 旨在将分布式数据系统结合,以提供低延迟和高吞吐的能力。
下图来源于 《2022 Pulsar 与 Kafka 基准测试》白皮书,它展示了 Apache Pulsar 的 P99 操作耗时只有几毫秒。ScyllaDB 是一个分布式 NoSQL 数据库,它使用 Seastar 框架实现了操作的稳定低延迟以及百万 QPS 的高性能。
内置的连接器
如上所述,像 Pulsar 和 Kafka 等数据流平台可用于数据迁移。用于向数据流平台导入数据的系统称为“Source(数据源)”,接收数据流平台导出数据的系统称为“Sink(数据池)”。
作为开发者,你可以根据需要创建自己的连接器。然而,如果你不想创建自己的连接器以避免维护多余的代码,可以选择 Pulsar 开箱即用的连接器,支持了几乎所有的数据系统,例如 PostgreSQL、Kafka、Kinesis 和 Redis 等。
在接下来的示例中,我们将会使用 Apache Cassandra 的 Sink 连接器来组合 ScyllaDB 和 Pulsar。
组合 ScyllaDB 和 Pulsar
在这个例子中,我们将会使用内置的 Cassandra Sink 连接器。请注意,ScyllaDB 和 Cassandra 是 API 兼容的。然而,和 ScyllaDB 的 Kafka 连接器[1]不同,该 Cassandra 连接器不支持分片感知(Not shared-aware)。
更多关于每核一个分片(Shard-per-core)架构的介绍请参考此页面[2]。
接下来我们开始使用 Apache Cassandra Sink 连接器组合 ScyllaDB 和 Pulsar。我们将分三步来完成。
步骤 1:准备数据库
执行以下命令使用 Docker 创建一个本地 ScyllaDB 集群:
docker run --name scylla -d scylladb/scylla
请注意,在本示例中我们创建了单节点集群。ScyllaDB 是一个分布式数据库,为了实现高可用建议创建一个至少包含三个节点的集群。
docker run
命令会在后台启动一个新的 Docker 实例,名为 scylla
,用于运行 ScyllaDB 服务器。
执行以下命令确认集群已经正常运行:
docker exec -it scylla nodetool status
预期输出:
Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
-- Address Load Tokens Owns (effective) Host ID Rack
UN 172.17.0.2 103.67 KiB 256 100.0% af0e4b2f-84e0-4f0b-bb14-bd5f9070ff26 rack1
集群运行之后,我们将会使用 CQLSH 连接至数据库并创建键空间和表。
docker exec -it scylla cqlsh
使用以下命令创建键空间:
CREATE KEYSPACE pulsar_keyspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor':1};
使用 pulsar_keyspace
并创建表 pulsar_table
:
CREATE TABLE pulsar_table (key text PRIMARY KEY, col text);
pulsar_table
非常简单。它有一个键和列都是 text
类型。
完成 Scylla 课程[3]了解更多有关 ScyllaDB 的知识。
步骤 2:创建 Sink 连接器
在这一步,我们将会使用 Docker 创建一个 Pulsar 实例并配置 Sink 连接器。执行以下命令使用 Docker 运行 Pulsar。
$ docker run -it \
-p 6650:6650 \
-p 8080:8080 \
--mount source=pulsardata,target=/pulsar/data \
--mount source=pulsarconf,target=/pulsar/conf \
apachepulsar/pulsar:2.7.0 \
bin/pulsar standalone
执行以下命令列出运行中的容器:
$ docker ps -a
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
3b027303f6e6 apachepulsar/pulsar:2.7.0 "bin/pulsar standalo…" 11 minutes ago Up 11 minutes 0.0.0.0:6650->6650/tcp, 0.0.0.0:8080->8080/tcp gallant_mahavira
使用以下命令连接至该 Pulsar 实例:
docker exec -it <container id> /bin/bash
首先,我们需要创建配置文件:
"configs" :
{
“roots”: “localhost:9042”,
“keyspace”: “pulsar_keyspace”,
“columnFamily”: “pulsar_table”,
“keyname”: “key”,
“columnName”: “col”
}
在这个例子中我将会使用 JSON,根据需要你也可以使用 YAML。
configs:
roots: “localhost:9042”
keyspace: “pulsar_keyspace”
columnFamily: “pulsar_table”
keyname: “key”
columnName: “col”
创建该 JSON 文件:
touch scylla-sink.json
json="{"configs":{"roots":"localhost:9042","keyspace":"pulsar_keyspace","columnFamily":"pulsar_table","keyname":"key","columnName":"col"}}"
echo $json > scylla-sink.json
执行以下命令创建 Sink:
bin/pulsar-admin sinks create \
— tenant public \
— namespace default \
— name scylla-sink \
— sink-type cassandra \
— sink-config-file scylla-sink.yml \
— inputs test_scylla
以上命令会创建一个 Sink 连接器,它将把所有写入到 test_scylla
主题中的消息写入到我们之前创建的 ScyllaDB 集群中。注意我们设置了 sink-type
为 cassandra
。
步骤 3:使用 Python 进行测试
在以下 producer.py
文件的代码中,我们使用 Client
类连接至 Pulsar 实例。之后,我们为主题 test_scylla
创建一个生产者,并向该主题发送 10 条消息。
import pulsar
client = pulsar.Client("pulsar://localhost:6650")
producer = client.create_producer(
"persistent://public/default/test_scylla"
)
for i in range(10):
producer.send(
(‘Hello-%d’ % i).encode(‘utf-8’),
properties=None,
partition_key="my-key-{}".format(i)
)
client.close()
你可以使用以下命令执行上面的代码:
python3 producer.py
现在让我们看一下数据库:
docker exec -it scylla cqlsh
select * from pulsar_keyspace.pulsar_table;
预期结果:
key | col
---------+---------
my-key-7 | Hello-7
my-key-2 | Hello-2
my-key-8 | Hello-8
my-key-0 | Hello-0
my-key-3 | Hello-3
my-key-1 | Hello-1
my-key-5 | Hello-5
my-key-6 | Hello-6
my-key-4 | Hello-4
my-key-9 | Hello-9
总结
ScyllaDB 和 Apache Pulsar 都是拥有低延迟操作的分布式数据系统。以上示例展示了如何使用 Pulsar 内置的 Cassandra 连接器,并将它和 ScyllaDB 组合作为一个 Sink。
与 Kafka 的 ScyllaDB 连接器不同的是,内置的 Cassandra 连接器不支持分片感知。尽管该连接器兼容 ScyllaDB,但是它并没有为和 ScyllaDB 高效协作而进行优化。
引用链接
[1]
ScyllaDB 的 Kafka 连接器: https://docs.scylladb.com/using-scylla/integrations/sink-kafka-connector/[2]
此页面: https://www.scylladb.com/product/technology/shard-per-core-architecture/[3]
Scylla 课程: https://university.scylladb.com/
▼ 关注「Apache Pulsar」,获取干货与动态 ▼
👇🏻 加入 Apache Pulsar 中文交流群 👇🏻
点击「阅读原文」,给 Pulsar 点赞!
本文分享自微信公众号 - ApachePulsar(ApachePulsar)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。