Apache Pulsar 是一种高度可扩展和可靠的消息传递系统,正在得到开发者的广泛关注。Pulsar 提供了许多功能和优势,使其成为现代数据流应用程序的热门选择。然而,管理 Pulsar 集群可能是一个复杂的任务,这就是 StreamNative 创建 pulsar-admin-go
库的原因。
pulsar-admin-go
是一个 Go 库,为开发人员提供了统一的 API 集合,可编程地管理 Pulsar 集群。它使自动化任务变得轻松,并将 Pulsar 管理无缝集成到您的应用程序中。
了解 Apache Pulsar 的同学几乎都听说过 StreamNative 这家开源商业化公司,由 Apache Pulsar 创始团队创立,致力于 Apache Pulsar 项目的推广和社区的构建,主要构建 cloud-native event streaming 的平台,帮助企业更容易地处理实时数据。
在这篇博客中,我们将更详细地介绍 pulsar-admin-go
库的功能、优势和高级用法。我们也将提供逐步说明的安装和使用库的指南。
I. pulsar-admin-go
库概述
pulsar-admin-go
库为主题、分区和订阅提供了一系列有用的管理功能。
• 主题:创建、删除、获取主题元数据和列出主题。
• 分区:向主题添加和删除分区。
• 订阅:创建和删除订阅、获取现有订阅的元数据和列出订阅。
• 主题统计信息:通过主题统计信息列出主题的生产者和消费者。
• 集群(Clusters):获取集群元数据,列出集群,以及设置和更新集群属性。
pulsar-admin-go
库为开发人员提供了多个优点:
1. 统一的 Go API:开发人员可以使用统一的 Go API 操作 Pulsar 资源。通过抽象底层的 Pulsar 管理 HTTP 操作,简化了 Pulsar 管理任务。
2. 开发简化:使用
pulsar-admin-go
库可以无缝集成其他管理工具,如 “terrafrom-provider-pulsar”,“pulsar-resources-operator” 以及 “pulsarctl” 等。3. 依赖管理改进:更容易控制 Go 模块依赖和软件版本。
II. 安装 pulsar-admin-go
库
要使用 pulsar-admin-go
库,您需要 Go 版本 1.18 或更高版本,并启用 Go 模块。为安装库,请运行以下命令:
go get github.com/streamnative/pulsar-admin-go
III. 基本用法
下面是使用 pulsar-admin-go
库的一些基本示例。
要连接到 Pulsar 集群,您可以使用 ServiceURL 或 Auth Token 创建 Admin 客户端。
使用 ServiceURL 创建一个 Admin 客户端。
package main
import (
"github.com/streamnative/pulsar-admin-go"
)
func main() {
cfg := &pulsaradmin.Config{
WebServiceURL: "http://localhost:8080",
}
admin, err := pulsaradmin.NewClient(cfg)
if err != nil {
panic(err)
}
}
使用身份验证令牌创建管理客户端。
import (
"github.com/streamnative/pulsar-admin-go"
)
func main() {
cfg := &pulsaradmin.Config{
// 使用 JWT 令牌进行身份验证,请注意分配具有 Pulsar 管理员角色的令牌
// https://pulsar.apache.org/docs/2.11.x/security-jwt/#configure-jwt-authentication-in-pulsar-clients
Token: "eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJKb2UifQ.ipevRNuRP6HflG8cFKnmUPtypruRC4fb1DWtoLL62SY",
}
admin, err := pulsaradmin.NewClient(cfg)
if err != nil {
panic(err)
}
}
创建租户、命名空间和主题。
import (
"github.com/streamnative/pulsar-admin-go"
"github.com/streamnative/pulsar-admin-go/pkg/utils"
)
func main() {
cfg := &pulsaradmin.Config{}
admin, err := pulsaradmin.NewClient(cfg)
if err != nil {
panic(err)
}
// 创建新租户
admin.Tenants().Create(utils.TenantData{
Name: "new-tenant",
})
// 创建新命名空间
admin.Namespaces().CreateNamespace("new-tenant/new-namespace")
// 创建拥有 3 个分区的新主题
topic, _ := utils.GetTopicName("new-tenant/new-namespace/new-topic")
admin.Topics().Create(*topic, 3)
}
IV. 高级用法
以下是使用 pulsar-admin-go
库的高级示例。
配置地理复制。
import (
"github.com/streamnative/pulsar-admin-go"
"github.com/streamnative/pulsar-admin-go/pkg/utils"
)
func main() {
cfg := &pulsaradmin.Config{}
admin, err := pulsaradmin.NewClient(cfg)
if err != nil {
panic(err)
}
// 部署两个 pulsar 集群,并将数组 clusters 设置为已部署 pulsar 集群的名称
// https://pulsar.apache.org/docs/2.11.x/install-deploy-upgrade-landing/
// https://pulsar.apache.org/docs/2.11.x/administration-geo/
clusters := []string{"us-west", "us-east"}
admin.Tenants().Create(utils.TenantData{
Name: "geo-tenant",
AllowedClusters: clusters,
})
admin.Namespaces().CreateNamespace("geo-tenant/geo-ns")
admin.Namespaces().SetNamespaceReplicationClusters("geo-tenant/geo-ns", clusters)
}
配置命名空间和主题的权限。
import (
"github.com/streamnative/pulsar-admin-go"
"github.com/streamnative/pulsar-admin-go/pkg/utils"
)
func main() {
cfg := &pulsar
admin.Config{}
admin, err := pulsaradmin.NewClient(cfg)
if err != nil {
panic(err)
}
// 为命名空间 public/default 配置管理权限
ns, _ := utils.GetNamespaceName("public/default")
admin.Namespaces().GrantNamespacePermission(*ns, "admin", []utils.AuthAction{"produce", "consume"})
// 为主题 public/default/admin 配置管理权限
tp, _ := utils.GetTopicName("public/default/admin")
admin.Topics().GrantPermission(*tp, "admin", []utils.AuthAction{"produce", "consume"})
}
配置命名空间的保留策略。
import (
"fmt"
"github.com/streamnative/pulsar-admin-go"
"github.com/streamnative/pulsar-admin-go/pkg/utils"
)
func main() {
cfg := &pulsaradmin.Config{}
admin, err := pulsaradmin.NewClient(cfg)
if err != nil {
panic(err)
}
// Create a new tenant
admin.Tenants().Create(utils.TenantData{
Name: "new-tenant",
})
// Create a new namespace
admin.Namespaces().CreateNamespace("new-tenant/new-namespace")
// Set the retention policy for this namespace
admin.Namespaces().SetRetention("new-tenant/new-namespace", utils.RetentionPolicie{RetentionSizeInMB: 10240, RetentionTimeInMinutes: 180})
// Get the retention policy for this namespace
fmt.Println(admin.Namespaces().GetRetention("new-tenant/new-namespace"))
}
VI. 结论
pulsar-admin-go
库是使用 Go 方便管理 Apache Pulsar 集群的一种方法。该库提供了一组直观的接口,使您能够轻松地执行各种任务。此库允许您自动化 Pulsar 管理任务,并将其集成到您的应用程序中。通过使用 pulsar-admin-go
,管理 Pulsar 集群变得更加容易和高效,使您能够充分发挥这个强大的消息系统的优势。
VII. 更多资源
查看 GitHub 仓库[1],使用文档[2]以及贡献指南[3],加入 Apache Pulsar 社区,共同建设这个令人兴奋的项目,参与消息和流处理的未来发展~
引用链接
[1]
GitHub 仓库: https://github.com/streamnative/pulsar-admin-go[2]
使用文档: https://pkg.go.dev/github.com/streamnative/pulsar-admin-go@v0.1.0/pkg/admin/auth[3]
贡献指南: https://pulsar.apache.org/contribute/
本文分享自微信公众号 - StreamNative(StreamNative)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。