Apache Pulsar 的 Golang admin 库简介

2023/07/13 08:40
阅读数 66

Apache Pulsar 是一种高度可扩展和可靠的消息传递系统,正在得到开发者的广泛关注。Pulsar 提供了许多功能和优势,使其成为现代数据流应用程序的热门选择。然而,管理 Pulsar 集群可能是一个复杂的任务,这就是 StreamNative 创建 pulsar-admin-go 库的原因。

pulsar-admin-go 是一个 Go 库,为开发人员提供了统一的 API 集合,可编程地管理 Pulsar 集群。它使自动化任务变得轻松,并将 Pulsar 管理无缝集成到您的应用程序中。

了解 Apache Pulsar 的同学几乎都听说过 StreamNative 这家开源商业化公司,由 Apache Pulsar 创始团队创立,致力于 Apache Pulsar 项目的推广和社区的构建,主要构建 cloud-native event streaming 的平台,帮助企业更容易地处理实时数据。

在这篇博客中,我们将更详细地介绍 pulsar-admin-go 库的功能、优势和高级用法。我们也将提供逐步说明的安装和使用库的指南。

I. pulsar-admin-go 库概述

pulsar-admin-go 库为主题、分区和订阅提供了一系列有用的管理功能。

  • • 主题:创建、删除、获取主题元数据和列出主题。

  • • 分区:向主题添加和删除分区。

  • • 订阅:创建和删除订阅、获取现有订阅的元数据和列出订阅。

  • • 主题统计信息:通过主题统计信息列出主题的生产者和消费者。

  • • 集群(Clusters):获取集群元数据,列出集群,以及设置和更新集群属性。

pulsar-admin-go 库为开发人员提供了多个优点:

  1. 1. 统一的 Go API:开发人员可以使用统一的 Go API 操作 Pulsar 资源。通过抽象底层的 Pulsar 管理 HTTP 操作,简化了 Pulsar 管理任务。

  2. 2. 开发简化:使用 pulsar-admin-go 库可以无缝集成其他管理工具,如 “terrafrom-provider-pulsar”,“pulsar-resources-operator” 以及 “pulsarctl” 等。

  3. 3. 依赖管理改进:更容易控制 Go 模块依赖和软件版本。

II. 安装 pulsar-admin-go 库

要使用 pulsar-admin-go 库,您需要 Go 版本 1.18 或更高版本,并启用 Go 模块。为安装库,请运行以下命令:

go get github.com/streamnative/pulsar-admin-go

III. 基本用法

下面是使用 pulsar-admin-go 库的一些基本示例。

要连接到 Pulsar 集群,您可以使用 ServiceURL 或 Auth Token 创建 Admin 客户端。

使用 ServiceURL 创建一个 Admin 客户端。

package main

import (
    "github.com/streamnative/pulsar-admin-go"
)

func main() {
    cfg := &pulsaradmin.Config{
        WebServiceURL: "http://localhost:8080",
    }
    admin, err := pulsaradmin.NewClient(cfg)
    if err != nil {
        panic(err)
    }
}

使用身份验证令牌创建管理客户端。

import (
    "github.com/streamnative/pulsar-admin-go"
)

func main() {
    cfg := &pulsaradmin.Config{
        // 使用 JWT 令牌进行身份验证,请注意分配具有 Pulsar 管理员角色的令牌
        // https://pulsar.apache.org/docs/2.11.x/security-jwt/#configure-jwt-authentication-in-pulsar-clients
        Token: "eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJKb2UifQ.ipevRNuRP6HflG8cFKnmUPtypruRC4fb1DWtoLL62SY",
    }
    admin, err := pulsaradmin.NewClient(cfg)
    if err != nil {
        panic(err)
    }
}

创建租户、命名空间和主题。

import (
    "github.com/streamnative/pulsar-admin-go"
    "github.com/streamnative/pulsar-admin-go/pkg/utils"
)

func main() {
    cfg := &pulsaradmin.Config{}
    admin, err := pulsaradmin.NewClient(cfg)
    if err != nil {
        panic(err)
    }

    // 创建新租户
    admin.Tenants().Create(utils.TenantData{
        Name: "new-tenant",
    })

    // 创建新命名空间
    admin.Namespaces().CreateNamespace("new-tenant/new-namespace")

    // 创建拥有 3 个分区的新主题
    topic, _ := utils.GetTopicName("new-tenant/new-namespace/new-topic")
    admin.Topics().Create(*topic, 3)
}

IV. 高级用法

以下是使用 pulsar-admin-go 库的高级示例。

配置地理复制。

import (
    "github.com/streamnative/pulsar-admin-go"
    "github.com/streamnative/pulsar-admin-go/pkg/utils"
)

func main() {
    cfg := &pulsaradmin.Config{}
    admin, err := pulsaradmin.NewClient(cfg)
    if err != nil {
        panic(err)
    }

    // 部署两个 pulsar 集群,并将数组 clusters 设置为已部署 pulsar 集群的名称
    // https://pulsar.apache.org/docs/2.11.x/install-deploy-upgrade-landing/
    // https://pulsar.apache.org/docs/2.11.x/administration-geo/
    clusters := []string{"us-west""us-east"}

    admin.Tenants().Create(utils.TenantData{
        Name:            "geo-tenant",
        AllowedClusters: clusters,
    })

    admin.Namespaces().CreateNamespace("geo-tenant/geo-ns")
    admin.Namespaces().SetNamespaceReplicationClusters("geo-tenant/geo-ns", clusters)
}

配置命名空间和主题的权限。

import (
   "github.com/streamnative/pulsar-admin-go"
   "github.com/streamnative/pulsar-admin-go/pkg/utils"
)

func main() {
   cfg := &pulsar

admin.Config{}
   admin, err := pulsaradmin.NewClient(cfg)
   if err != nil {
      panic(err)
   }

   // 为命名空间 public/default 配置管理权限
   ns, _ := utils.GetNamespaceName("public/default")
   admin.Namespaces().GrantNamespacePermission(*ns, "admin", []utils.AuthAction{"produce""consume"})

   // 为主题 public/default/admin 配置管理权限
   tp, _ := utils.GetTopicName("public/default/admin")
   admin.Topics().GrantPermission(*tp, "admin", []utils.AuthAction{"produce""consume"})
}

配置命名空间的保留策略。

import (
    "fmt"

    "github.com/streamnative/pulsar-admin-go"
    "github.com/streamnative/pulsar-admin-go/pkg/utils"
)

func main() {
    cfg := &pulsaradmin.Config{}
    admin, err := pulsaradmin.NewClient(cfg)
    if err != nil {
        panic(err)
    }

    // Create a new tenant
    admin.Tenants().Create(utils.TenantData{
        Name: "new-tenant",
    })

    // Create a new namespace
    admin.Namespaces().CreateNamespace("new-tenant/new-namespace")

    // Set the retention policy for this namespace
    admin.Namespaces().SetRetention("new-tenant/new-namespace", utils.RetentionPolicie{RetentionSizeInMB: 10240, RetentionTimeInMinutes: 180})

    // Get the retention policy for this namespace
    fmt.Println(admin.Namespaces().GetRetention("new-tenant/new-namespace"))
}

VI. 结论

pulsar-admin-go 库是使用 Go 方便管理 Apache Pulsar 集群的一种方法。该库提供了一组直观的接口,使您能够轻松地执行各种任务。此库允许您自动化 Pulsar 管理任务,并将其集成到您的应用程序中。通过使用 pulsar-admin-go,管理 Pulsar 集群变得更加容易和高效,使您能够充分发挥这个强大的消息系统的优势。

VII. 更多资源

查看 GitHub 仓库[1]使用文档[2]以及贡献指南[3],加入 Apache Pulsar 社区,共同建设这个令人兴奋的项目,参与消息和流处理的未来发展~

引用链接

[1] GitHub 仓库: https://github.com/streamnative/pulsar-admin-go
[2] 使用文档: https://pkg.go.dev/github.com/streamnative/pulsar-admin-go@v0.1.0/pkg/admin/auth
[3] 贡献指南: https://pulsar.apache.org/contribute/


本文分享自微信公众号 - StreamNative(StreamNative)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
0 评论
0 收藏
0
分享
返回顶部
顶部