[Pulsar-源码] Pulsar client初始化

07/01 10:32
阅读数 298

文章目录
Pulsar client初始化过程
Producer 初始化
Pulsar client初始化过程
初始化Pulsar Producer和Consumer都需要先初始化Pulsar client。示例:

PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://127.0.0.1:6650")
        .build();
1
2
3
PulsarClient.builder()

会创建一个ClientBuilderImpl一个实例, 并用一个 ClientConfigurationData实例来初始化conf对象;

.serviceUrl("pulsar://127.0.0.1:6650")

为conf对象设置serviceUrl参数,并判断是否使用了TLS

.build()

首先简要serviceUrl参数和serviceUrlProvider,两者只能存在一个

使用conf初始化PulsarClientImpl

初始化EventLoopGroup:(EpollEventLoopGroup或者NioEventLoopGroup),这个过程会new一个客户端线程工厂

初始化ConnectionPool:初始化bootstrap(client) ,设置channel的一些参数,并且指定PulsarChannelInitializer, channel初始化时,会给channel设置handler,其中包括一个ClientCnx,主要用来处理broker的结果响应;最后会初始化一个DnsNameResolver

初始化producer、consumer以及request的ID生成器

初始化externalExecutorProvider,初始化LookupService, 这个过程中会初始化PulsarServiceNameResolver, 并且在 PulsarServiceNameResolver中解析、记录url的相关信息;

初始化producers和consumers两个map,将状态设置为OPEN

至此,Pulsarclient初始化完毕。

Producer 初始化
向pulsar生产数据,需要首先初始化一个producer,

final Producer<byte[]> producer = client.newProducer()
                    .topic(topic)
                    .maxPendingMessages(5000)
                    .enableBatching(true)
                    .create();
1
2
3
4
5
过程比较简单,

初始化一个ProducerBuilderImpl

设置 topic 参数

设置最大缓存的Message数量

设置是否支持批量发送

创建Producer

设置消息路由模式,包括SinglePartition, RoundRobinPartition和CustomPartition, 使用 ``CustomPartition`时,需要实现router
异步创建producer,首先需要获取topic的元数据信息,这是需要创建连接,调用关系如下图所示: 最终会调用bootstrap.connect()方法创建netty连接。netty连接创建完成之后,为 netty channel 添加closeFuture的清理逻辑,然后创建ClientCnx对象,并设置remoteaddress等属性信息。
创建连接完成之后,会创建newPartitionMetadataRequest并且携带topic作为参数,然后发送newPartitionMetadataRequest到服务端。服务端会返回对应的分区数量信息。
获取到topic的分区信息之后,根据分区数量创建PartitionedProducerImpl或者ProducerImpl。
至此,producer创建完毕。
————————————————
 

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部