StreamNative 宣布 Spring for Apache Pulsar 正式发布

2022/12/21 19:00
阅读数 84

译者信息

王中兴,就职于 ebay 消息中间件团队,社区昵称 alphawang。

我们很高兴地宣布 Spring for Apache Pulsar[1] 的第一个里程碑版本已发布,现在你可以在 Spring 应用程序里直接利用 Apache Pulsar 的强大能力。

我们先来看一下 Apache Pulsar 与 Spring 集成带来的好处,然后通过示例应用程序讲解如何进行集成。

为什么使用 Spring for Apache Pulsar

Spring 是当今最流行的 Java 框架,它帮助开发人员快速、安全、轻松地创建生产就绪的应用程序。它是一个灵活的框架,提供开箱即用的默认设置以提高开发效率,还支持针对随时出现的需求进行定制。这使其成为构建云原生应用程序时的理想选择。

Apache Pulsar 是一个云原生流和消息平台,使组织能够在弹性云环境中构建可扩展、可靠的应用程序。它结合了传统消息系统以及发布订阅系统的最佳特性。在 Pulsar 的多层架构中,每一层都是可扩展、分布式的,并且与其他层解耦。计算和存储的分离支持独立地扩展每一层。

Pulsar 和 Spring 结合在一起,可以让你轻松快速地构建可扩展、稳健的数据应用程序。将 Pulsar 与 Spring 微服务相整合,可以进一步实现与其他语言编写的服务的无缝互操作。Spring for Apache Pulsar 提供了一个与 Pulsar 交互的工具包。从模板到监听器和自动配置,所有你喜欢的 Spring 概念现在都可以与 Pulsar 一起使用。如果你正在使用 Spring for Kafka 或 Spring AMQP,那么将 Pulsar 融入到你的现有架构中则非常简单。Spring for Pulsar 采用了相同的概念,使用起来不会有陌生的感觉。

如何使用 Spring for Apache Pulsar

我们将构建一个示例应用程序,通过消费注册数据来提醒客户成功团队有新客户。Spring 将运行我们的应用程序并提供配置,而 Pulsar 则作为消息总线来路由我们的数据。

该示例程序的完整源代码参考 GitHub 仓库[2] 。

观看如下演示视频,了解 Spring for Apache Pulsar 的实际应用: 

先决条件

示例应用程序使用 Maven 和 Java 17 运行。要开始使用 Spring for Apache Pulsar,首先需要将其作为依赖项添加到我们的 Spring 项目中。

<properties>
    <spring-pulsar.version>0.1.0-M1</spring-pulsar.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.springframework.pulsar</groupId>
        <artifactId>spring-pulsar-spring-boot-starter</artifactId>
        <version>${spring-pulsar.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-configuration-processor</artifactId>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
    </plugins>
</build>
<repositories>
    <repository>
        <id>spring-milestones</id>
        <name>Spring Milestones</name>
        <url>https://repo.spring.io/milestone</url>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
    </repository>
</repositories>
<pluginRepositories>
    <pluginRepository>
        <id>spring-milestones</id>
        <name>Spring Milestones</name>
        <url>https://repo.spring.io/milestone</url>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
    </pluginRepository>
</pluginRepositories>

执行 mvn clean package 进行编译,并执行 mvn spring-boot:run 运行该示例应用程序。

我们还需要一个 Pulsar 集群来运行该程序。我们可以使用本地的 Standalone Pulsar 集群,也可以使用 StreamNative Cloud[3] 提供集群。

连接到 Pulsar

接下来使用 Spring Configuration 配置应用程序连接到 Pulsar。将以下内容添加到 src/main/resources/application.yml。

spring:
  pulsar:
    client:
      service-url: pulsar+ssl://free.o-j8r1u.snio.cloud:6651
      auth-plugin-class-name: org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
      authentication:
        private-key: file:///Users/user/Downloads/o-j8r1u-free.json
        audience: urn:sn:pulsar:o-j8r1u:free
        issuer-url: https://auth.streamnative.cloud/
    producer:
      batching-enabled: false
      topic-name: persistent://public/default/signups-topic

该配置使用 OAuth2 身份验证连接到 StreamNative Cloud。请参照 StreamNative Cloud 文档[4]获取身份验证凭据。我们还关闭了 Pulsar 生产者的批处理消息功能,并设置了其默认主题名称。

生产数据

现在可以开始向我们的集群发送消息了。在本示例中,我们将生成模拟的用户注册数据,并将其不断写入 signups-topic 主题。通过 Spring for Apache Pulsar,直接将 PulsarTemplate 添加到应用程序代码即可发送消息。

@EnableScheduling
@SpringBootApplication
public class SignupApplication {
    private static final Logger log = LoggerFactory.getLogger(SignupApplication.class);
    
    @Autowired private PulsarTemplate<Signup> signupTemplate;
    
    @Autowired private PulsarTemplate<Customer> customerTemplate;
    
    @Scheduled(initialDelay = 5000, fixedRate = 5000)
    void publishSignupData() throws PulsarClientException {
        Signup signup = signupGenerator.generate();
        signupTemplate.setSchema(JSONSchema.of(Signup.class));
        signupTemplate.send(signup);
    }
   …
}

上面的代码创建了一个定时任务来生成模拟的用户注册信息,并将其作为消息发送到我们在 application.yml 中配置的默认主题。我们可以看到,通过配置 PulsarTemplate 来使用 JSON Schema 发送消息非常简单。

消费数据

为了从我们的数据中获取价值,现在要对传入的注册信息进行过滤。如果注册的 tier 是 ENTERPRISE,我们将创建一个新 Customer 对象并向 customer-success 主题发布一条消息。要想消费 signups-topic 主题,我们需要在 SignupApplication 类中添加 PulsarListener 注解。

@PulsarListener(
    subscriptionName = "signup-consumer",
    topics = "signups-topic",
    schemaType = SchemaType.JSON)
void filterSignups(Signup signup) throws PulsarClientException {
    log.info(
        "{} {} ({}) just signed up for {} tier",
        signup.firstName(),
        signup.lastName(),
        signup.companyEmail(),
        signup.signupTier());

    if (signup.signupTier() == SignupTier.ENTERPRISE) {
        Customer customer = Customer.from(signup);
        customerTemplate.setSchema(JSONSchema.of(Customer.class));
        customerTemplate.send("customer-success", customer);
    }
}

PulsarListener 注解配置了一个 Pulsar 消费者来使用给定的 Schema 读取特定主题。在 filterSignups 方法中,我们使用了之前添加的第二个 PulsarTemplate。这次,我们不想向默认主题发送消息,因此我们传入 customer-success 作为要写入的主题名称。

最后,我们的客户成功团队现在可以收到所有新企业客户注册的提醒。要做到这一点,他们只需要使用 Customer Schema 来消费 customer-success 主题即可。

@PulsarListener(
    subscriptionName = "customer-consumer",
    topics = "customer-success",
    schemaType = SchemaType.JSON)
void alertCustomerSuccess(Customer customer) {
    log.info(
        "## Start the onboarding for {} - {} {} ({}) - {} ##",
        customer.companyName(),
        customer.firstName(),
        customer.lastName(),
        customer.phoneNumber(),
        customer.companyEmail());
}

高级特性

Spring for Apache Pulsar 提供了许多高级特性。例如,我们可以使用 ProducerInterceptor 来调试消息的记录。

首先,我们在 Spring 配置类中添加一个 ProducerInterceptor bean。 ProducerInterceptor 实现类仅需要在 Broker 确认后记录消息信息。

@Configuration(proxyBeanMethods = false)
class SignupConfiguration {

  @Bean
  ProducerInterceptor loggingInterceptor() {
    return new LoggingInterceptor();
  }

  static class LoggingInterceptor implements ProducerInterceptor {
    private static final Logger log = LoggerFactory.getLogger(LoggingInterceptor.class);

    @Override
    public void close() {
      // no-op
    }

    @Override
    public boolean eligible(Message message) {
      return true;
    }

    @Override
    public Message beforeSend(Producer producer, Message message) {
      return message;
    }

    @Override
    public void onSendAcknowledgement(
        Producer producer, Message message, MessageId msgId, Throwable exception) {
      log.debug("MessageId: {}, Value: {}", message.getMessageId(), message.getValue());
    }
  }
}

得益于 Spring 自动配置的特性,我们的应用程序将自动配置 LoggingInterceptor 来拦截所有 Pulsar 生产者发送的消息。

接下来在 application.yml 中将日志级别设置为 debug,拦截器即可正常工作。

logging:
  level:
    io.streamnative.example: debug

总结

在这篇博客中,我们探讨了如何使用 Spring for Apache Pulsar 快速构建一个示例应用程序。Spring for Apache Pulsar 集成还提供更多功能,例如订阅类型、批处理和手动确认。对于进一步需要外部系统中数据读写权限的高级应用程序,我们可以添加 Pulsar IO 连接器。如果你有兴趣了解更多 Pulsar IO 连接器的信息,请访问 StreamNative Hub[5]

更多资源

  • • 查看用户注册示例应用程序的完整源代码[6]

  • • 免费试用 StreamNative Cloud[7] 以开始使用 Apache Pulsar

  • • 帮助 Spring for Apache Pulsar[8] 开源项目发展

  • • 查看 Spring for Apache Pulsar[9] 文档

引用链接

[1] Spring for Apache Pulsar: https://docs.spring.io/spring-pulsar/docs/0.1.0-M1/reference/html/
[2] GitHub 仓库: https://github.com/streamnative/examples/tree/master/spring-pulsar
[3] StreamNative Cloud: https://streamnative.io/streamnativecloud/
[4] StreamNative Cloud 文档: https://docs.streamnative.io/cloud/stable/connect/overview
[5] StreamNative Hub: https://hub.streamnative.io/
[6] 完整源代码: https://github.com/streamnative/examples/tree/master/spring-pulsar
[7] StreamNative Cloud: https://streamnative.io/streamnativecloud/
[8] Spring for Apache Pulsar: https://github.com/spring-projects-experimental/spring-pulsar
[9] Spring for Apache Pulsar: https://docs.spring.io/spring-pulsar/docs/0.1.0-M1/reference/html/


本文分享自微信公众号 - StreamNative(StreamNative)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部