技术探究|深入解析 Apache Pulsar Key_Shared 订阅模式原理与最佳实践

2022/11/29 18:30
阅读数 1K

作者简介

赵亚军,StreamNative Solution Engineer

导语

Apache Pulsar 统一了消息和流两类场景。除了计算存储分离和分片存储,Pulsar 还提供了多种订阅模式[1]来满足用户多样的业务场景。如果你正在寻找局部顺序消费并突破 Partition 并发数约束的方案,Key_Shared 模式可以帮助你,下面给大家详细剖析 Key_Shared 订阅模式。

顺序消费场景

很多业务场景对顺序消费有非常强的诉求,特别是流和 CDC(Change Data Capture,变化数据捕获)场景,例如在大型网站的点击事件分析过程中,既要同时处理大量的个人点击事件,又要捕获到每个人的点击顺序,从而分析出用户的点击行为。

目前大部分 MQ 支持两种顺序模式,一种全局有序,要求 Topic 只能有一个 Partition,对生产和消费的并行度有较大的限制;另一种是局部有序,保证 Message 中 Key 的有序生产和消费,例如订单 ID 或者用户 ID,这也是业务场景使用最多的一种方式。Kafka 和 RocketMQ 采用的是这种形式,通过将相同的消息 Key 路由到相同的 Partition 中,单个 Partition 的消息只能被同一个 Consumer 消费。但是在消息量非常大的情况下,系统会出现性能瓶颈,因为相同消费组的 Consumer 个数受限于 Partition 的个数。

Pulsar 的 Key_Shared 模式可以很好解决这个问题,Consumer 的消息按照 Key 分配,因此 Key 分散度越高,Consumer 的并发度越高,接下来详细介绍其工作原理。

Key_Shared 模式原理剖析

本文主要介绍 Pulsar 在 Key_Shared 模式下的消费分发机制。Broker 内部的 Dispatcher 组件负责消费的消息分发和流量控制。Pulsar 支持的 4 种订阅模式也是通过 Dispatcher 来控制的,其中 Key_Shared 模式下 Dispatcher 重要的功能是把相同 Key 的消息分配给相同的 Consumer,从而保证相同 Key 的消息有序。

KEY 分配机制

如果预先不知道有哪些 Key 的存在,可以将 Key 进行哈希后得到一个 HashCode,再将 HashCode 映射到一个指定范围 Key Range,然后 Dispatcher 根据 Key Range 对消息进行分配。下面介绍如何计算 Hash 和分配消息。

Hash 计算

Dispatcher 首先获取消息的 OrderingKey。如果 OrderingKey 为空,再尝试获取设置的 Key 字段进行 Hash 运算,默认使用 MurmurHash 算法计算出 HashCode,具体方法如下:

public int makeHash(byte[] b) {
   return org.apache.pulsar.common.util.Murmur3_32Hash.getInstance().makeHash(b) & Integer.MAX_VALUE;
}

Consumer 分配

Consumer 分配由StickyKeyConsumerSelector接口定义,应具备以下几点能力:

  1. 1. 增加/删除 Consumer 对应的动作,以及删除对应的内部 Hash 映射等;

  2. 2. 获取所有的 Consumer 对应的 Range;

  3. 3. 根据计算出的 Key 的 Hash 值,选出对应的 Consumer。

Pulsar 提供了多种分配模式实现,下面分别进行介绍。

1. Hash Range Auto Split 算法

Auto Split 是 Pulsar 最早支持的一种模式,每次增加一个 Consumer 时会寻找一个 Consumer 对应的最大 Range,然后将该 Range 一分为二。例如, Key Range 为 65536 , 第一个 Consumer 订阅后直接分配 [0, 65536) (consumer-1),如果再增加一个 Consumer 则平均分配成 [0, 32768)(consumer-1)和 [32768, 65536)(consumer-2)。如果减少一个 Consumer,则同相邻的 Range 进行合并。

public synchronized void addConsumer(Consumer consumer) throws ConsumerAssignException {
   if (rangeMap.isEmpty()) {
       rangeMap.put(rangeSize, consumer);
       consumerRange.put(consumer, rangeSize);
   } else {
       splitRange(findBiggestRange(), consumer);
   }
}
public synchronized void removeConsumer(Consumer consumer) {
   Integer removeRange = consumerRange.remove(consumer);
   if (removeRange != null) {
       if (removeRange == rangeSize && rangeMap.size() > 1) {
           Map.Entry<Integer, Consumer> lowerEntry = rangeMap.lowerEntry(removeRange);
           rangeMap.put(removeRange, lowerEntry.getValue());
           rangeMap.remove(lowerEntry.getKey());
           consumerRange.put(lowerEntry.getValue(), removeRange);
       } else {
           rangeMap.remove(removeRange);
       }
   }
}

计算出的 hash 值可能会很大,为了便于后续处理,规范 Hash 值落到 0 ~ 65536 区间,所以对 hash 值按照 65536 取模计算处理,具体处理如下:

int hash = org.apache.pulsar.common.util.Murmur3_32Hash.getInstance().makeHash(b) & Integer.MAX_VALUE;
int slot = hash % rangeSize;

举例:

下图显示了调度的过程,图中有 3 种颜色的区块,分别代表消费者 C1,C2 和 C3。横轴代表时间,纵轴代表哈希范围值。

  1. 1. 在 0 时刻,创建此订阅,并添加第一个消费者 C1,所有哈希范围 (0 - 1) 由消费者 C1 提供服务。

  2. 2. 在 T1 时刻,一个新的消费者 C2 加入,哈希范围 (0 - 0.5) 仍然由 C1 服务,而另一半哈希范围 (0.5 - 1) 由新消费者 C2 服务。

  3. 3. 在 T2 时刻,新的消费者 C3 加入,寻找最大的哈希范围。由于范围相同,则任意选择一个消费者如 C2,拆分并共享 C2 的哈希范围。哈希范围 (0.5 - 0.75)仍由 C2 提供,而 C3 提供哈希范围 (0.75 - 1) 。

  4. 4. 在 T3 时刻,C1 关闭,因为 C1 与 C2 相邻,所以它的哈希范围将分配给 C2, C2 将服务 (0 - 0.75)。

  5. 5. 在 T4 时刻,C2 关闭,其哈希范围分配给 C3,C3 将服务于整个范围(0 - 1)。

该算法虽简单,但有一定的缺陷,每个消费者可能分配的数据不均匀,如从 T2 到 T3 时刻,消费者 C1 的数据量大于消费者 C2 和 C3。为了改进该算法,引入了 Consistent Hash 算法。

2. Consistent Hash Auto Split 算法

为了能够让 Consumer 尽量均匀消费,采用 Consistent Hash 算法 ,构成一个 Hash 环(0, Integer.MAX_VALUE)。当增加一个 Consumer 时,Hash 环上会增加若干个(见下面subscriptionKeySharedConsistentHashingReplicaPoints参数配置,默认为 100 个)虚拟节点;当再增加一个 Consumer 时,则再增加若干个虚拟节点,从而调度数据在各个 Consumer 之间达到均衡。虚拟节点按照 Consumer Name 拼接上虚拟节点个数下标得出具体的 Hash 环位置,关键代码如下:

public void addConsumer(Consumer consumer) throws ConsumerAssignException {
   rwLock.writeLock().lock();
   try {
       // Insert multiple points on the hash ring for every consumer
       // The points are deterministically added based on the hash of the consumer name
       for (int i = 0; i < numberOfPoints; i++) {
           String key = consumer.consumerName() + i;
           int hash = Murmur3_32Hash.getInstance().makeHash(key.getBytes());
           hashRing.compute(hash, (k, v) -> {
               if (v == null) {
                   return Lists.newArrayList(consumer);
               } else {
                   if (!v.contains(consumer)) {
                       v.add(consumer);
                       v.sort(Comparator.comparing(Consumer::consumerName, String::compareTo));
                   }
                   return v;
               }
           });
       }
   } finally {
       rwLock.writeLock().unlock();
   }
}
public void removeConsumer(Consumer consumer) {
   rwLock.writeLock().lock();
   try {
       // Remove all the points that were added for this consumer
       for (int i = 0; i < numberOfPoints; i++) {
           String key = consumer.consumerName() + i;
           int hash = Murmur3_32Hash.getInstance().makeHash(key.getBytes());
           hashRing.compute(hash, (k, v) -> {
               if (v == null) {
                   return null;
               } else {
                   v.removeIf(c -> c.equals(consumer));
                   if (v.isEmpty()) {
                       v = null;
                   }
                   return v;
               }
           });
       }
   } finally {
       rwLock.writeLock().unlock();
   }
}

举例:

当增加消费者 C1 时,增加若干个 C1 的虚拟节点;继续增加消费者 C2 时,再增加若干个 C2 的虚拟节点。

当关闭消费者 C2 时,直接移除所有 C2 的虚拟节点。

在该模式下,Broker 根据 Key 的 Hash 值在 Hash 环上找到最近的虚拟节点即可选择 Consumer。

public Consumer select(int hash) {
   rwLock.readLock().lock();
   try {
       if (hashRing.isEmpty()) {
           return null;
       }

       List<Consumer> consumerList;
       Map.Entry<Integer, List<Consumer>> ceilingEntry = hashRing.ceilingEntry(hash);
       if (ceilingEntry != null) {
           consumerList =  ceilingEntry.getValue();
       } else {
           consumerList = hashRing.firstEntry().getValue();
       }

       return consumerList.get(hash % consumerList.size());
   } finally {
       rwLock.readLock().unlock();
   }
}

3. Hash Range Exclusive Sticky Key 算法

Hash Range Exclusive 算法允许客户自己设置 Hash 的 Range,当服务端在选择消息投递时,会根据计算的 Hash 值,投递到指定的 Hash Range 的 Consumer 中。

举例:

消费者 C1、C2、C3、C4 在订阅时会指定各自的 Range,在消息分发时使用提前设定好的 Hash 范围。

在以上三种算法中,前两种不需要手动设定 Hash 范围,第三种 Hash Range Exclusive 算法需要 Consumer 在订阅时设定自己的 Hash 范围,这种适合于已知 Key 或者固定 Key 的情况。如果以上三种还不能满足用户的实际需求,可以使用的StickyKeyConsumerSelector扩展接口实现自己的调度算法。

最佳实践

Broker 配置

在 conf/broker.conf 中配置如下,默认为 Consistent Hash Auto Split。

# Enable Key_Shared subscription (default is enabled)
# @deprecated since 2.8.0 subscriptionTypesEnabled is preferred over subscriptionKeySharedEnable.
subscriptionKeySharedEnable=true

# On KeyShared subscriptions, with default AUTO_SPLIT mode, use splitting ranges or
# consistent hashing to reassign keys to new consumers
subscriptionKeySharedUseConsistentHashing=true

# On KeyShared subscriptions, number of points in the consistent-hashing ring.
# The higher the number, the more equal the assignment of keys to consumers
subscriptionKeySharedConsistentHashingReplicaPoints=100

生产端配置

如果使用了 Batch(默认开启),一个 Batch 中会有多个 Key 的消息,分发时只会取第一条消息的 Key,这样会出现同一个 Key 被不同的 Consumer 消费情况。为了避免这种情况发生,需要在生产时设置BatcherBuilder.KEY_BASED参数 :

Producer<byte[]> producer = pulsarClient.newProducer()
       .topic("persistent://tenant/namespace/topic")
       // 让相同的key在一个批次里面
       .batcherBuilder(BatcherBuilder.KEY_BASED)
       .create();

消费端配置

在构建消费者时,需要指定订阅模式为 key_Shared,然后根据场景设置 KeySharedPolicy,可以配置 KeySharedMode.AUTO_SPLIT(默认),或KeySharedMode.STICKY,从而选择不同的算法模式。在KeySharedMode.AUTO_SPLIT模式中,通过 Broker 端的配置subscriptionKeySharedUseConsistentHashing=true选择 Consistent Hash 算法(默认),或通过 Broker 端的配置subscriptionKeySharedUseConsistentHashing=false选择 Hash Range Auto Split 算法,具体见上面 Broker 侧的配置。

  • • AUTO_SPLIT

     Consumer<String> consumer = client.newConsumer(Schema.STRING)
       .topic(topicName)
       .subscriptionName(subscriptionName)
       // 声明消费模式为Key_Shared模式
       .subscriptionType(SubscriptionType.Key_Shared)
       .keySharedPolicy(KeySharedMode.AUTO_SPLIT)
  .subscribe();
  • • STICKY

Consumer<byte[]> consumer = client.newConsumer()
       .topic(topicName)
       .subscriptionName(subscriptionName)
       .subscriptionType(SubscriptionType.Key_Shared)
.keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(List<Range> ranges))
.subscribe();

Consistent Hash Auto Split 验证

为了让大家更清晰地认识 Key-Shared 的分发过程,我们下面使用默认配置 Consistent Hash Auto Split 算法进行验证,观察在添加消费者的情况下,系统如何按照 Key 把消息分发给对应 Consumer。测试步骤如下:

1.创建一个 Topic:

bin/pulsar-admin topics create public/default/my-topic

2.打开 2 个客户端 client1 和 client2 分别启动 2 个消费者,使用 Key_Shared 模式订阅该 Topic:

./bin/pulsar-client consume public/default/my-topic -s sss -n 0 -t Key_Shared
./bin/pulsar-client consume public/default/my-topic -s sss -n 0 -t Key_Shared

3.打开 1 个客户端启动生产,向该 Topic 中生产带有 Key 的数据:

./bin/pulsar-client produce public/default/my-topic -m ddd0 -n 1 -k key_0
./bin/pulsar-client produce public/default/my-topic -m ddd1 -n 1 -k key_1
./bin/pulsar-client produce public/default/my-topic -m ddd2 -n 1 -k key_2
./bin/pulsar-client produce public/default/my-topic -m ddd3 -n 1 -k key_3
./bin/pulsar-client produce public/default/my-topic -m ddd4 -n 1 -k key_0
./bin/pulsar-client produce public/default/my-topic -m ddd5 -n 1 -k key_1
./bin/pulsar-client produce public/default/my-topic -m ddd6 -n 1 -k key_2
./bin/pulsar-client produce public/default/my-topic -m ddd7 -n 1 -k key_3
./bin/pulsar-client produce public/default/my-topic -m ddd8 -n 1 -k key_4
./bin/pulsar-client produce public/default/my-topic -m ddd9 -n 1 -k key_5

4.查看 key_0、key_1、key_2、key_3、key_4 和 key_5 对应的 Hash 值(Murmur 算法):

import org.apache.pulsar.common.util.Murmur3_32Hash;

public class Main {
    public static void main(String[] args) {
              for (int i = 0; i < 6; i++) {
                    String key = "key_" + i ;
                System.out.println("key_"+ i + ":" + Murmur3_32Hash.getInstance().makeHash(key.getBytes()));
            }
        }
}

pom.xml 里面加载包:

<dependencies>
     <dependency>
        <groupId>io.streamnative</groupId>
        <artifactId>pulsar-common</artifactId>
        <version>2.10.1</version>
     </dependency>
 </dependencies>

计算出 key_0、key_1、key_2、key_3、key_4 和 key_5 对应的 Hash 值(Murmur 算法)为:

  • • key_0:1357352656

  • • key_1:1707756238

  • • key_2:394882454

  • • key_3:935407534

  • • key_4:5294831

  • • key_5:169972206


  • 5.使用命令查看两个消费者对应的 Hash 范围:

# bin/pulsar-admin topics stats public/default/my-topic

.......
"consumers" : [ {
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "bytesOutCounter" : 0,
        "msgOutCounter" : 0,
        "msgRateRedeliver" : 0.0,
        "messageAckRate" : 0.0,
        "chunkedMessageRate" : 0.0,
        "consumerName" : "a3bb8",
        "availablePermits" : 1000,
        "unackedMessages" : 0,
        "avgMessagesPerEntry" : 0,
        "blockedConsumerOnUnackedMsgs" : false,
        "readPositionWhenJoining" : "586:1",
        "lastAckedTimestamp" : 0,
        "lastConsumedTimestamp" : 0,
        "keyHashRanges" : [ "[16805448, 20608426]", "[54578741, 68863915]", "[76422233, 83222150]", "[83222151, 90736492]", "[97500499, 100585724]", "[100585725, 102439644]", "[102439645, 114694318]", "[182398317, 206910968]", "[221693427, 232746075]", "[232746076, 233338758]", "[233338759, 238979448]", "[238979449, 240386781]", "[240386782, 247272893]", "[247272894, 263267586]", "[271375720, 281760196]", "[281760197, 291161786]", "[291161787, 305473261]", "[306517642, 310453102]", "[328085730, 338507017]", "[338507018, 436082947]", "[474846641, 477579121]", "[482932255, 490036780]", "[490036781, 507918968]", "[509937104, 524950821]", "[524950822, 528336434]", "[589913378, 594160828]", "[598576884, 604764791]", "[607712290, 621295290]", "[631717125, 634927795]", "[634927796, 645076839]", "[645076840, 656054895]", "[722815169, 733545319]", "[736795470, 737213856]", "[741321094, 745486320]", "[764529763, 767446137]", "[784167976, 801470754]", "[884232847, 892183756]", "[892183757, 894423554]", "[903702501, 906217526]", "[909588124, 927780491]", "[927780492, 946932619]", "[946932620, 959038283]", "[959038284, 962063284]", "[962063285, 965373565]", "[1009865111, 1010173971]", "[1065057304, 1067584345]", "[1086484200, 1087166187]", "[1128480462, 1129578045]", "[1129578046, 1132239227]", "[1133854628, 1137222770]", "[1201258182, 1216803372]", "[1219797465, 1220220885]", "[1221922169, 1237991301]", "[1241339185, 1242646663]", "[1271179139, 1271269943]", "[1275618291, 1301472903]", "[1301472904, 1311167619]", "[1311167620, 1329117505]", "[1329117506, 1351442734]", "[1351535319, 1354000380]", "[1354000381, 1376914746]", "[1389261062, 1413213686]", "[1413213687, 1440590348]", "[1440590349, 1450699235]", "[1450699236, 1469155482]", "[1469155483, 1481376964]", "[1485685477, 1500783951]", "[1504594740, 1512173150]", "[1583564716, 1610151271]", "[1610190246, 1622368305]", "[1622368306, 1622943623]", "[1622943624, 1659089827]", "[1659089828, 1660944671]", "[1660944672, 1668947249]", "[1668947250, 1672036533]", "[1678263678, 1709492430]", "[1709492431, 1725691538]", "[1725691539, 1726701807]", "[1726701808, 1753389657]", "[1753389658, 1760835861]", "[1760835862, 1766682501]", "[1766682502, 1769589667]", "[1787898035, 1809779198]", "[1841542849, 1856412381]", "[1886169528, 1888229422]", "[1888229423, 1888903969]", "[1888903970, 1921529385]", "[1921529386, 1925603432]", "[1944637891, 1965635723]", "[1977424056, 1979304212]", "[2027849881, 2034155881]", "[2035986786, 2043489420]", "[2043489421, 2071950871]", "[2082274179, 2093086998]", "[2097973463, 2117087664]", "[2117556125, 2120110138]", "[2129238253, 2130127992]", "[2130127993, 2131414593]", "[2131414594, 2145350685]", "[2145350686, 2146458985]" ],
        "metadata" : { },
        "connectedSince" : "2022-11-09T06:10:35.748111Z",
        "address" : "/172.17.0.30:54510"
      }, {
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "bytesOutCounter" : 0,
        "msgOutCounter" : 0,
        "msgRateRedeliver" : 0.0,
        "messageAckRate" : 0.0,
        "chunkedMessageRate" : 0.0,
        "consumerName" : "175ec",
        "availablePermits" : 1000,
        "unackedMessages" : 0,
        "avgMessagesPerEntry" : 0,
        "blockedConsumerOnUnackedMsgs" : false,
        "readPositionWhenJoining" : "586:1",
        "lastAckedTimestamp" : 0,
        "lastConsumedTimestamp" : 0,
        "keyHashRanges" : [ "[0, 1335068]", "[1335069, 12537484]", "[12537485, 16805447]", "[20608427, 28786042]", "[28786043, 51185877]", "[51185878, 54578740]", "[68863916, 76422232]", "[90736493, 94686111]", "[94686112, 97500498]", "[114694319, 123459322]", "[123459323, 182398316]", "[206910969, 221693426]", "[263267587, 271375719]", "[305473262, 306517641]", "[310453103, 328085729]", "[436082948, 440064984]", "[440064985, 458719770]", "[458719771, 470272568]", "[470272569, 474846640]", "[477579122, 482932254]", "[507918969, 509937103]", "[528336435, 553116107]", "[553116108, 554251820]", "[554251821, 561250902]", "[561250903, 589913377]", "[594160829, 597736591]", "[597736592, 598576883]", "[604764792, 607712289]", "[621295291, 631669816]", "[631669817, 631717124]", "[656054896, 657760136]", "[657760137, 660799921]", "[660799922, 671440027]", "[671440028, 696953898]", "[696953899, 722815168]", "[733545320, 736795469]", "[737213857, 741321093]", "[745486321, 748154183]", "[748154184, 764529762]", "[767446138, 783093176]", "[783093177, 784167975]", "[801470755, 805894094]", "[805894095, 814818374]", "[814818375, 823510106]", "[823510107, 831642901]", "[831642902, 852288782]", "[852288783, 884232846]", "[894423555, 903702500]", "[906217527, 909588123]", "[965373566, 1009151638]", "[1009151639, 1009865110]", "[1010173972, 1063308791]", "[1063308792, 1065057303]", "[1067584346, 1086484199]", "[1087166188, 1121182174]", "[1121182175, 1126621310]", "[1126621311, 1128480461]", "[1132239228, 1133854627]", "[1137222771, 1163436250]", "[1163436251, 1201258181]", "[1216803373, 1219797464]", "[1220220886, 1221922168]", "[1237991302, 1241339184]", "[1242646664, 1243813658]", "[1243813659, 1258396999]", "[1258397000, 1271179138]", "[1271269944, 1271443249]", "[1271443250, 1275618290]", "[1351442735, 1351535318]", "[1376914747, 1378694645]", "[1378694646, 1389261061]", "[1481376965, 1483496275]", "[1483496276, 1485685476]", "[1500783952, 1504594739]", "[1512173151, 1518023271]", "[1518023272, 1539718671]", "[1539718672, 1555002271]", "[1555002272, 1567668899]", "[1567668900, 1583564715]", "[1610151272, 1610190245]", "[1672036534, 1677645988]", "[1677645989, 1678263677]", "[1769589668, 1783633752]", "[1783633753, 1786718002]", "[1786718003, 1787898034]", "[1809779199, 1816679446]", "[1816679447, 1832412623]", "[1832412624, 1841542848]", "[1856412382, 1857772684]", "[1857772685, 1886169527]", "[1925603433, 1944637890]", "[1965635724, 1975839669]", "[1975839670, 1977424055]", "[1979304213, 2027849880]", "[2034155882, 2035986785]", "[2071950872, 2076021421]", "[2076021422, 2082274178]", "[2093086999, 2097973462]", "[2117087665, 2117556124]", "[2120110139, 2129238252]" ],

6.根据上面的 Hash 值和查询到 Topic 里对应的 Hash 范围,预期对应 Key 的消息会按照 Hash 值落入对应的 Hash 范围,从而分配给两个 Consumer:

  • • key_0 的 hash 值为“1357352656”,命中消费者 1 的 Hash 范围"[1354000381, 1376914746]";

  • • key_1 的 hash 值为“1707756238”,命中消费者 1 的 Hash 范围"[1678263678, 1709492430]”;

  • • key_2 的 hash 值为“394882454”,命中消费者 1 的 Hash 范围"[338507018, 436082947]";

  • • key_3 的 hash 值为“935407534”,命中消费者 1 的 Hash 范围"[927780492, 946932619]";

  • • key_4 的 hash 值为“5294831,命中消费者 2 的 Hash 范围"[51185878, 54578740]”;

  • • key_5 的 hash 值为“169972206",命中消费者 2 的 Hash 范围“[123459323, 182398316]"。


  • 7.实际测试结果如下图。测试结果与预期一致,其中消费者 1 消费了 key_0、key_1、key_2 和 key_3:

消费者 2 消费了 key_4 和 key_5:

以上测试结果符合理论预期。

Q&A

Q: 在 Key_Shared 模式下,某条消息的业务逻辑处理失败怎么处理?
A: 业务逻辑处理失败时,发送 NACK 就会按照原来的 Key_Shared 模式发送到之前的消费端。但是这样无法保证顺序消费,如果要严格保证消费顺序,业务侧只能反复重试。

Q: 在 Key_Shared 模式下,新加入的 Consumer 在 Auto Split 的时候能保证顺序消费吗?
A:在 Auto Split 模式下,新加入 Consumer 会等待当前消费者消费完缓存内的消息,再进行重新分配,分担原来 Hash 范围,相同 Key 的数据可能落到不同的 Hash 范围内,即不同的消费者上,所以新数据依然能保证顺序消费。

Q: 在 Key_Shared 模式下,扩容 Partition 能否保证顺序?
A: 扩容 Partition 后,Key 与 Partition 的关系发生了变化,扩容过程消息的顺序无法保障,后续的数据保持有序。

Q: 在 Key_Shared 模式下,messageListener 如何消费消息,能否保证顺序?
A: MessageListener 和 receive 本质上并没有差别,主要在于控制粒度,receive 更灵活,messageListener 代码更简洁。以上 Key_Shared 规则同样适用,messageListener 最终由 listenerThreads 线程池执行,一个 Key Range 最终会绑定到一个线程上,因此能够保证顺序。

参考链接

  • • https://pulsar.apache.org/docs/next/concepts-messaging#subscriptions

  • • https://github.com/apache/pulsar/wiki/PIP-34%3A-Add-new-subscribe-type-Key_shared

  • • https://www.bilibili.com/video/BV1ta411o79L/

引用链接

[1] 模式: https://pulsar.apache.org/docs/2.10.x/concepts-messaging/#subscription-types


点击「阅读原文」,查看 Apache Pulsar 资料精选!

本文分享自微信公众号 - StreamNative(StreamNative)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部