Pulsar Sink 入门指南

原创
2020/03/24 20:35
阅读数 165


阅读本文需要约 20 分钟。


Apache Pulsar 是一个分布式的发布-订阅消息系统,sink 是 Pulsar 的一个组件,用于将数据导入至其他系统。本文介绍了 sink 的功能,并演示了如何创建与使用 JDBC sink 与 MySQL 进行连接。


Sink 命令


Create

创建 sink


$ bin/pulsar-admin sink create <options>


常用参数

  • -a,--archive : 指定 sink 的 NAR 包

  • --classname : 指定 sink 的类名称

  • -i,--inputs : 指定 sink 的 topic,多个 topic 用逗号隔开

  • --name : 指定 sink 的名称

  • --namespace : 指定 sink 的命名空间

  • --parallelism : 指定 sink 的并发数

  • --sink-config-file : 指定 sink 的 yaml 配置文件

  • --tenant : 指定 sink 的租户


Update

更新 sink


$ bin/pulsar-admin sink update <options>


常用参数

  • -a,--archive : 指定 sink 的 NAR 包

  • --classname : 指定 sink 的类名称

  • -i,--inputs : 指定 sink 的 topic,多个 topic 用逗号隔开

  • --name : 指定 sink 的名称

  • --namespace : 指定 sink 的命名空间

  • --parallelism : 指定 sink 的并发数

  • --sink-config-file : 指定 sink 的 yaml 配置文件

  • --tenant : 指定 sink 的租户


Delete

删除 sink


$ bin/pulsar-admin sink delete <options>


常用参数

  • --name : 指定 sink 的名称

  • --namespace : 指定 sink 的命名空间

  • --tenant : 指定 sink 的租户


List

显示所有 sink


$ bin/pulsar-admin sink list <options>


常用参数

  • --namespace : 指定 sink 的命名空间

  • --tenant : 指定 sink 的租户


Get

显示 sink 的信息


$ bin/pulsar-admin sink get <options>


常用参数

  • --name : 指定 sink 的名称

  • --namespace : 指定 sink 的命名空间

  • --tenant : 指定 sink 的租户


Status

显示 sink 的状态


$ bin/pulsar-admin sink status <options>


常用参数

  • --instance-id :  指定 sink 的实例 ID

    如果未指定,则获取所有实例的状态

  • --name : 指定 sink 的名称

  • --namespace : 指定 sink 的命名空间

  • --tenant : 指定 sink 的租户


Stop

停止 sink


$ bin/pulsar-admin sink stop <options>


常用参数

  • --instance-id :  指定 sink 的实例 ID

    如果未指定,则停止所有实例的状态

  • --name : 指定 sink 的名称

  • --namespace : 指定 sink 的命名空间

  • --tenant : 指定 sink 的租户


Start

启动 sink


$ bin/pulsar-admin sink start <options>


常用参数

  • --instance-id :  指定 sink 的实例 ID

    如果未指定,则启动所有实例

  • --name : 指定 sink 的名称

  • --namespace : 指定 sink 的命名空间

  • --tenant : 指定 sink 的租户


Restart

重启 sink


$ bin/pulsar-admin sink restart <options>


常用参数

  • --instance-id :  指定 sink 的实例 ID

    如果未指定,则获取所有实例的状态

  • --name : 指定 sink 的名称

  • --namespace : 指定 sink 的命名空间

  • --tenant : 指定 sink 的租户


Localrun

本地运行


在本地运行一个 Pulsar IO sink connector,方便调试。


$ bin/pulsar-admin sink localrun <options>


常用参数

  • -a,--archive : 指定 source 的 NAR 包

  • --classname : 指定 sink 的类名称

  • -i,--inputs : 指定 sink 的 topic,多个 topic 用逗号隔开

  • --name : 指定 sink 的名称

  • --namespace : 指定 sink 的命名空间

  • --parallelism : 指定 sink 的并发数

  • --sink-config-file : 指定 sink 的 yaml 配置文件

  • --tenant : 指定 sink 的租户


环境搭建


本示例创建JDBC sink,并使用 JDBC sink 与 MySQL 进行连接。


准备工作


本示例在 Mac 系统上进行。在开始之前,需要安装以下依赖:

  • Docker (docker.com)

  • Java (https://www.oracle.com/technetwork/java/javase/downloads/index.html)

  • Maven (https://archive.apache.org/dist/maven/maven-3/)

  • Git (https://www.linode.com/docs/development/version-control/how-to-install-git-on-linux-mac-and-windows/)


开始搭建

搭建步骤总计 6 步。


1. 安装与启动 MySQL


(1)拉取 MySQL 镜像。

$ docker pull mysql:5.7


(2)启动 MySQL。

$ docker run -d -it --rm \
--name pulsar-mysql \
-p 3306:3306 \
-e MYSQL_ROOT_PASSWORD=jdbc \
-e MYSQL_USER=mysqluser \
-e MYSQL_PASSWORD=mysqlpw \
mysql:5.7


提示:

  • -d : 以后台模式运行。

  • -it : 以交互模式运行,并为 docker 分配一个伪输入终端。

  • --rm : docker 停止后,自动删除 docker

  • --name : 指定 docker 名称。 

    本示例指定 docker 名称为 pulsar-mysql。

  • -p : 指定端口。

    本示例指定对外暴露 3306 端口。

  • -e : 指定环境变量。 

    本示例为 MySQL 指定以下信息:

    root 用户的密码为 jdbc

    普通用户的名称为 mysqluser

    普通用户的密码为mysqlpw


(3)验证是否成功启动。 

$ docker logs -f pulsar-mysql


如果出现以下信息,则说明成功启动。

2019-07-29T01:50:05.116660Z 0 [Note] InnoDB: Waiting for purge to start
2019-07-29T01:50:05.168247Z 0 [Note] InnoDB: 5.7.26 started; log sequence number 12363846
2019-07-29T01:50:05.168596Z 0 [Note] InnoDB: Loading buffer pool(s) from /var/lib/mysql/ib_buffer_pool
2019-07-29T01:50:05.168855Z 0 [Note] Plugin 'FEDERATED' is disabled.
2019-07-29T01:50:05.173901Z 0 [Note] InnoDB: Buffer pool(s) load completed at 190729  1:50:05
2019-07-29T01:50:05.174778Z 0 [Note] Found ca.pem, server-cert.pem and server-key.pem in data directory. Trying to enable SSL support using them.
2019-07-29T01:50:05.175045Z 0 [Warning] CA certificate ca.pem is self signed.
2019-07-29T01:50:05.176942Z 0 [Note] Server hostname (bind-address): '*'; port: 3306
2019-07-29T01:50:05.177017Z 0 [Note] IPv6 is available.
2019-07-29T01:50:05.178937Z 0 [Note]   - '::' resolves to '::';
2019-07-29T01:50:05.178998Z 0 [Note] Server socket created on IP: '::'.
2019-07-29T01:50:05.181545Z 0 [Warning] Insecure configuration for --pid-file: Location '/var/run/mysqld' in the path is accessible to all OS users. Consider choosing a different directory.
2019-07-29T01:50:05.192955Z 0 [Note] Event Scheduler: Loaded 0 events
2019-07-29T01:50:05.193401Z 0 [Note] mysqld: ready for connections.
Version: '5.7.26'  socket: '/var/run/mysqld/mysqld.sock'  port: 3306  MySQL Community Server (GPL)


2  创建 MySQL 表


为了简化操作,本示例使用 root 用户和密码进入 docker,再创建数据库和表,方便数据写入。

(1)进入 MySQL。

$ docker exec -it pulsar-mysql /bin/bash
mysql -h localhost -uroot -pjdbc


(2)创建数据库和表。

$ create database test_jdbc;

$
 use test_jdbc;


$
 create table
 if not exists test_jdbc
(
 id INT AUTO_INCREMENT,
 name VARCHAR(255) NOT NULL,
 primary key (id)
)
engine=innodb;


3. 安装与启动 Pulsar

(1)下载并安装 Pulsar。

$ wget https://www.apache.org/dyn/mirrors/mirrors.cgi?action=download&filename=pulsar/pulsar-2.4.0/apache-pulsar-2.4.0-bin.tar.gz

$
 tar -zxvf apache-pulsar-2.4.0-bin.tar.gz


$
 cd apache-pulsar-2.4.0


(2)启动 Pulsar。

$ bin/pulsar standalone -nss


(3)验证是否成功启动。

如果出现以下信息,则说明启动成功。

09:56:22.753 [pulsar-web-44-8] INFO  org.apache.pulsar.broker.web.PulsarWebResource - Successfully validated clusters on tenant [public]
09:56:22.761 [pulsar-web-44-8] INFO  org.apache.pulsar.broker.admin.impl.NamespacesBase - [null] Created namespace public/default
09:56:22.763 [pulsar-web-44-8] INFO  org.eclipse.jetty.server.RequestLog - 192.168.50.140 - - [29/七月/2019:09:56:22 +0800] "PUT /admin/v2/namespaces/public/default HTTP/1.1" 204 0 "-" "Pulsar-Java-v2.4.0" 12
09:56:22.771 [pulsar-web-44-11] INFO  org.apache.pulsar.broker.web.PulsarWebResource - Successfully validated clusters on tenant [public]
09:56:22.777 [pulsar-ordered-OrderedExecutor-1-0-EventThread] INFO  org.apache.pulsar.zookeeper.ZooKeeperDataCache - [State:CONNECTED Timeout:30000 sessionid:0x1003d74007c0003 local:/127.0.0.1:61606 remoteserver:localhost/127.0.0.1:2181 lastZxid:167 xid:42 sent:42 recv:44 queuedpkts:0 pendingresp:0 queuedevents:0] Received ZooKeeper watch event: WatchedEvent state:SyncConnected type:NodeDataChanged path:/admin/policies/public/default
09:56:22.778 [pulsar-web-44-11] INFO  org.apache.pulsar.broker.admin.impl.NamespacesBase - [null] Successfully updated the replication clusters on namespace public/default
09:56:22.779 [pulsar-web-44-11] INFO  org.eclipse.jetty.server.RequestLog - 192.168.50.140 - - [29/七月/2019:09:56:22 +0800] "POST /admin/v2/namespaces/public/default/replication HTTP/1.1" 204 0 "-" "Pulsar-Java-v2.4.0" 12


4. 增加配置文件

(1)创建 mysql-jdbc-sink.yaml 配置文件。

(2)复制以下内容至 mysql-jdbc-sink.yaml 文件。

以下内容指定了 MySQL 的用户名、密码、链接和表名。

configs:
   userName: "root"
   password: "jdbc"
   jdbcUrl: "jdbc:mysql://127.0.0.1:3306/test_jdbc"
   tableName: "test_jdbc"


5. 创建 schema

数据库的表包含 schema 信息,JDBC sink 也支持 schema。

因此,只要构建好 schema,即能直接从 topic 中读取消息,再通过 JDBC 将消息传送至数据库的表,该 schema 与数据库的表一一对应。 

以下示例创建 avro-schema 文件。

{
   "type": "AVRO",
   "schema": "{\"type\":\"record\",\"name\":\"Test\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"]},{\"name\":\"name\",\"type\":[\"null\",\"string\"]}]}",
   "properties": {}
}


提示:

更多关于 AVRO 字段的信息,参阅 AVRO 官网(https://avro.apache.org/docs/1.8.2/) 。

6. 上传 schema,启动 sink

(1)上传 schema 至 test-jdbc topic。

$ bin/pulsar-admin schemas upload test-jdbc -f avro-schema


(2)验证是否上传成功。

$ bin/pulsar-admin schemas get test-jdbc


如果出现以下信息,则说明上传成功。

{
  "name": "test-jdbc",
  "schema": {
    "type": "record",
    "name": "Test",
    "fields": [
      {
        "name": "id",
        "type": [
          "null",
          "int"
        ]
      },
      {
        "name": "name",
        "type": [
          "null",
          "string"
        ]
      }
    ]
  },
  "type": "AVRO",
  "properties": {}
}


(3)启动 sink。

$ bin/pulsar-admin sink localrun \
--archive connectors/pulsar-io-jdbc-2.4.0.nar \
--inputs test-jdbc \
--name mysql-jdbc-sink \
--sink-config-file connectors/mysql-jdbc-sink.yaml \
--parallelism 1


(4)验证是否启动成功。

如果出现以下信息,则说明启动成功。

10:01:20.357 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ClientCnx - [id: 0x21a4b716, L:/127.0.0.1:61690 - R:localhost/127.0.0.1:6650] Connected through proxy to target broker at tengdeMBP:6650
10:01:20.359 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [test-jdbc][public/default/mysql-jdbc-sink] Subscribing to topic on cnx [id: 0x21a4b716, L:/127.0.0.1:61690 - R:localhost/127.0.0.1:6650]
10:01:20.407 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [test-jdbc][public/default/mysql-jdbc-sink] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 0


实践


Create

创建 sink


(1)创建 mysql-jdbc-sink,并指定 nar 文件、topic、名称、yaml 配置文件和并发数。

$ bin/pulsar-admin sink create \
--archive connectors/pulsar-io-jdbc-2.4.0.nar \
--inputs test-jdbc \
--name mysql-jdbc-sink \
--sink-config-file connectors/mysql-jdbc-sink.yaml \
--parallelism 1


(2)如果出现以下信息,则说明创建成功。

Created successfully


List

显示所有 sink


 (1)显示所有 sink。

$ bin/pulsar-admin sink list \
--tenant public \
--namespace default


(2)返回结果显示前文创建的 mysql-jdbc-sink。

[
 "mysql-jdbc-sink"
]


Get

显示 sink 的信息


 (1)显示 sink 的信息。

$ bin/pulsar-admin sink get \
--tenant public \
--namespace default \
--name mysql-jdbc-sink


(2)返回结果显示 mysql-jdbc-sink 的信息,包括租户、命名空间和名称等。

{
 "tenant": "public",
 "namespace": "default",
 "name": "mysql-jdbc-sink",
 "className": "org.apache.pulsar.io.jdbc.JdbcAutoSchemaSink",
 "inputSpecs": {
   "test-jdbc": {
     "isRegexPattern": false
   }
 },
 "configs": {
   "password": "jdbc",
   "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/test_jdbc",
   "userName": "root",
   "tableName": "test_jdbc"
 },
 "parallelism": 1,
 "processingGuarantees": "ATLEAST_ONCE",
 "retainOrdering": false,
 "autoAck": true
}


Status

显示 sink 的状态


 (1)显示 mysql-jdbc-sink 的状态。

$ bin/pulsar-admin sink status \
--tenant public \
--namespace default \
--name mysql-jdbc-sink


(2)返回结果显示 mysql-jdbc-sink 的状态信息,包括实例数量、是否正在运行和 worker ID 等。

{
 "numInstances" : 1,
 "numRunning" : 1,
 "instances" : [ {
   "instanceId" : 0,
   "status" : {
     "running" : true,
     "error" : "",
     "numRestarts" : 0,
     "numReadFromPulsar" : 0,
     "numSystemExceptions" : 0,
     "latestSystemExceptions" : [ ],
     "numSinkExceptions" : 0,
     "latestSinkExceptions" : [ ],
     "numWrittenToSink" : 0,
     "lastReceivedTime" : 0,
     "workerId" : "c-standalone-fw-tengdeMBP.lan-8080"
   }
 } ]
}


Stop

停止 sink


 (1)停止 mysql-jdbc-sink。

$ bin/pulsar-admin sink stop \
--tenant public \
--namespace default \
--name mysql-jdbc-sink \
--instance-id 0


(2)如果出现以下信息,则说明停止成功。

Stopped successfully


Start

启动 sink


 (1)启动 mysql-jdbc-sink。

$ bin/pulsar-admin sink start \
--tenant public \
--namespace default \
--name mysql-jdbc-sink \
--instance-id 0


(2)如果出现以下信息,则说明启动成功。

Started successfully


Restart

重启 sink


 (1)重启 mysql-jdbc-sink。

$ bin/pulsar-admin sink restart \
--tenant public \
--namespace default \
--name mysql-jdbc-sink \
--instance-id 0


(2)如果出现以下信息,则说明重启成功。

Restarted successfully


Update

更新 sink


 (1)将 parallelism 更新至 2。

$ bin/pulsar-admin sink update \
--name mysql-jdbc-sink \
--parallelism 2


(2)如果出现以下信息,则说明更新成功。

Updated successfully


(3)查看 mysql-jdbc-sink 的信息,再次验证更新结果。

$ bin/pulsar-admin sink get \
--tenant public \
--namespace default \
--name mysql-jdbc-sink


(4)Parallelism 为 2,说明已更新成功。

{
 "tenant": "public",
 "namespace": "default",
 "name": "mysql-jdbc-sink",
 "className": "org.apache.pulsar.io.jdbc.JdbcAutoSchemaSink",
 "inputSpecs": {
   "test-jdbc": {
     "isRegexPattern": false
   }
 },
 "configs": {
   "password": "jdbc",
   "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/test_jdbc",
   "userName": "root",
   "tableName": "test_jdbc"
 },
 "parallelism": 2,
 "processingGuarantees": "ATLEAST_ONCE",
 "retainOrdering": false,
 "autoAck": true
}


Delete

删除 sink


 (1)删除 mysql-jdbc-sink。

$ bin/pulsar-admin sink delete \
--tenant public \
--namespace default \
--name mysql-jdbc-sink


(2)如果出现以下信息,则说明删除成功。

Deleted successfully


(3)查看 mysql-jdbc-sink 的信息,再次验证删除结果。

$ bin/pulsar-admin sink get \
--tenant public \
--namespace default \
--name mysql-jdbc-sink


(4)mysql-jdbc-sink 不存在,说明已删除成功。

HTTP 404 Not Found

Reason: Sink mysql-jdbc-sink doesn't exist


Localrun

本地运行


更多关于 localrun 的使用示例,参阅上文 6. 上传 schema,启动 sink 的第 3 步。


总结


 本文介绍了 sink 的功能,并演示了如何创建与使用 JDBC sink 与 MySQL 进行连接



作者 | tuteng

审校 | Anonymitaet

编辑 | Irene



社区福利推荐

HBase 技术社区

Apache HBase技术社区,研究探讨HBase内核原理,源码剖析,周边生态以及实践应用,汇集众多Apache HBase PMC & Committer以及爱好使用者,提供一线Apache HBase企业实战以及Flink集成等资讯。


Apache Kylin

Apache Kylin 公众号,介绍 Kylin 的功能特性、应用案例、经验分享、社区资讯、活动等。开源大数据分布式 OLAP 引擎 Apache Kylin 于 2014 年开源,在 2015 年和 2016 年连续获得 InfoWorld 的 BOSSIE 奖:年度最佳开源大数据工具奖,发展至今在全球已经拥有超过 1000 家企业用户。作为首个被 Apache 软件基金会认证的由中国人主导的顶级开源项目,Kylin 为万亿数据提供亚秒级查询,并可以和现有的 Hadoop/Spark 及 BI 无缝集成。


Ververica

Apache Flink 社区公众号,由 Apache Flink Community China 运营管理,旨在联合国内的 Flink 大 V,向国内宣传和普及 Flink 相关的技术。公众号将持续输出 Flink 最新社区动态,入门教程、Meetup 资讯、应用案例以及源码解析等内容,希望联合社区同学一起推动国内大数据技术发展。


Apache Pulsar

Apache Pulsar 是下一代云原生流数据平台,助力企业快速分析实时数据,激活数据价值,实现 C 位出道。这里是 Pulsar 前沿技术的传播圣地,也是技术爱好者、开发者和终极用户时刻关注的技术平台。我们定时分享 Pulsar 优质内容,包括社区活动、技术文章、用户案例、行业动态和热点话题等,让你全面拥抱 Pulsar 的一手讯息。Apache Pulsar,助力千万企业和技术人开疆拓土、共同成长。



Apache Pulsar 是下一代云原生分布式流数据平台,它源于 Yahoo,2016 年 12 月开源,2018 年 9 月正式成为 Apache 顶级项目,逐渐从单一的消息系统演化成集消息、存储和函数式轻量化计算的流数据平台。在 Apache Pulsar 快速发展的过程中,社区的伙伴们也致力于硅谷以外的布道之旅,在中国社区开始了不平凡的历程。8 月 17 日,来自 Yahoo!Japan、腾讯、智联招聘、EMQ、Apache Fink 和 Apache Pulsar 社区的开源爱好者们将齐聚一堂,共同探讨 Pulsar 的用户案例、最佳实践和 Pulsar 特性等等,包括:

  1. Apache Pulsar at Yahoo! Japan

  2. 智联招聘如何参与社区开发以及 Key_Shared 等近期贡献特性详解

  3. Apache Pulsar 在腾讯计费场景下的实践

  4. Apache Pulsar 在 EMQ 物联网平台产品 ActorCloud 上的应用

  5. Pulsar 2.5.0 事务功能预览

  6. Apache Pulsar 和大数据生态的集成与实践

  7. 监控流系统中的 Flink 状态管理


扫描下图二维码或点击【阅读原文】,报名参加 Pulsar Beijing Meetup

本文分享自微信公众号 - StreamNative(StreamNative)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部