一文带你了解 Pulsar Broker 负载均衡

2022/12/20 21:00
阅读数 155

作者简介

赵亚军,StreamNative Solution Engineer。

导语

Apache Pulsar 采用了计算和存储分离的分布式架构,其友好的弹性伸缩能力颇受广大用户欢迎。Pulsar 除了存储层的分片架构规避了历史数据平衡的缺陷,计算层 Broker 的负载均衡模块也在弹性伸缩过程以及资源调节中发挥着重要的作用。本文将带大家揭开 Pulsar Broker 负载均衡的神秘面纱。

原理介绍

负载均衡对于一个分布式系统而言是不可或缺的重要模块,Apache Pulsar 实现了 Broker 无需手工干预的动态负载均衡,得益于 Pulsar 在设计之初从架构上分离了存储层和计算层,可以更加灵活地分配 Broker 与 Topic 的映射关系。Pulsar Broker 节点本身并不会存储任何状态信息,其消息持久化保存到 BookKeeper 上,而 Broker 之间用于协调状态信息和 Topic 相关的元数据信息存储在 Metadata Store 中,这些特性让 Pulsar Broker 成为无状态服务。当多个 Broker 之间进行负载均衡时,无需将 Topic 中的消息从一个 Broker 复制到另一个 Broker。

对于分布式消息队列而言,当某个 Broker 节点出现高负载并且整体负载分布不均时能够进行负载平衡,进行负载均衡的最小单位一般是 Topic/Partition。Pulsar 也采用了这样的思路,不过若想支持百万级 Topic,如果按照单个 Topic/Partition 来调度,势必会为负载均衡带来非常大的计算压力,Pulsar 采用了聚合分组(Bundle)的管理方式,负载均衡的最小单位设计为 Bundle,多个 Topic/Partition 分配到同一个 Bundle 内,减少了用于负载计算的数据规模,从而提升了支持 Topic 的数量级。

Pulsar 提供了“开箱即用”的负载管理功能。Broker 中的负载均衡由org.apache.pulsar.broker.loadbalance模块实现,当 Broker 出现负载超载,Loadbalance 触发均衡流程,通过智能策略进行自动 unload bundle,整体的处理过程如下:

1.LoadManager 模块依据负载指标、负载策略和触发门限来获取符合要求的 Broker。
2.Loadbalance 模块遍历该 Broker 内的所有 Bundle,从而获得被卸载的 Bundle 列表。
3.根据 Bundle 名称得到对应的 Namespace 名称。
4.使用管理 API 卸载 Bundle。

限于篇幅本文不打算介绍 Bundle 与 Topic/Partition 的映射关系,只介绍基于负载的自动 unload bundle,主要包括在 Broker 负载均衡中用到的衡量指标、负载均衡策略、Bundle 卸载流程等,最后进行了验证测试。

负载衡量指标

触发 loadbalance 的前提是集群中 Broker 节点出现负载超过阀值的情况,包括 CPU 使用率、堆内/堆外内存使用率、Broker 的带宽 IN/带宽 OUT、Broker 上 Bundle 的负载等。

负载信息

Broker 实时负载信息

Broker 的负载信息在 ZooKeeper 中保存,路径为/loadbalance/brokers/[broker host:port],保存的 Broker 实时负载数据样例如下:

{

//以下为broker的链接信息
"webServiceUrl":"<http://localhost:8080>",
"pulsarServiceUrl":"pulsar://localhost:6650",
"persistentTopicsEnabled":true,
"nonPersistentTopicsEnabled":true,

//以下为Broker最新的系统资源使用量
"cpu":{
       "usage":0.0,
       "limit":0.0
       },
"memory":{
      "usage":0.0,
      "limit":0.0
       },
"directMemory":{
     "usage":0.0,
     "limit":0.0
       },
"bandwidthIn":{
     "usage":0.0,
     "limit":0.0
       },
"bandwidthOut":{
     "usage":0.0,
     "limit":0.0
       },


//以下为Broker上所有Bundle用量的统计
"msgThroughputIn":0.0,
"msgThroughputOut":0.0,
"msgRateIn":0.0,
"msgRateOut":0.0,
"lastUpdate":1668497351981,

//以下为Broker上的详细信息
"lastStats":{},
"numTopics":0,
"numBundles":0,
"numConsumers":0,
"numProducers":0,

//以下为其他信息
"Bundles":[],
"lastBundleGains":[],
"lastBundleLosses":[],
"Protocols":{},
"advertisedListeners":{},
"bundleStats":{},
"maxResourceUsage":0.0,
"loadReportType":"LocalBrokerData"
}

Broker 平均负载信息

Broker 的平均负载信息在 ZooKeeper 中保存,路径为/loadbalance/broker-time-average/[broker host:port],保存的 Broker 平均负载数据样例如下:

{
    "shortTermMsgThroughputIn": 4.866669527654777,
    "shortTermMsgThroughputOut": 5.533336591887468,
    "shortTermMsgRateIn": 0.06666670585828466,
    "shortTermMsgRateOut": 0.06666670592635504,
    "longTermMsgThroughputIn": 4.865849676427839,
    "longTermMsgThroughputOut": 5.532400443487092,
    "longTermMsgRateIn": 0.06665547501955964,
    "longTermMsgRateOut": 0.06665542702996446
}

Bundle 负载信息

Bundle 的历史负载信息在 ZooKeeper 中保存,路径为/loadbalance/bundle-data/[tenant]/[namespace]/[bundle],保存的 Bundle 负载数据样例如下:

{
  "shortTermData" : {
    "maxSamples" : 10,
    "numSamples" : 6,
    "msgThroughputIn" : 0.0,
    "msgThroughputOut" : 8.911721910539358,
    "msgRateIn" : 0.0,
    "msgRateOut" : 0.15913789125963138
  },
  "longTermData" : {
    "maxSamples" : 1000,
    "numSamples" : 6,
    "msgThroughputIn" : 0.0,
    "msgThroughputOut" : 8.911721910539358,
    "msgRateIn" : 0.0,
    "msgRateOut" : 0.15913789125963138
  },
  "topics" : 1
}

负载上报

Broker 实时负载信息上报

Pulsar 有专门的定时调度线程池 loadManagerExecutor,用于每个 Broker 按照loadBalancerHostUsageCheckIntervalMinutes的频率收集本地 Broker 的负载信息,不断刷新本地缓存并上报当前 Broker 的负载信息。每次执行时,先刷新本地负载信息的 Cache,当有负载变化比例超过loadBalancerReportUpdateThresholdPercentage配置值,或距离上次更新大于loadBalancerReportUpdateMaxIntervalMinutes配置值时,触发负载信息上报到 ZooKeeper。

具体代码如下(ModularLoadManagerImpl.java 中):

// Determine if the broker data requires an update by delegating to the update condition.
private boolean needBrokerDataUpdate() {
   final long updateMaxIntervalMillis = TimeUnit.MINUTES       .toMillis(conf.getLoadBalancerReportUpdateMaxIntervalMinutes());
   ……
   if (timeSinceLastReportWrittenToStore > updateMaxIntervalMillis) {
       log.info("Writing local data to metadata store because time since last" + " update exceeded threshold of {} minutes",               conf.getLoadBalancerReportUpdateMaxIntervalMinutes());
       // Always update after surpassing the maximum interval.
       return true;
   }
   ……
   if (maxChange > conf.getLoadBalancerReportUpdateThresholdPercentage()) {
       log.info("Writing local data to metadata store because maximum change {}% exceeded threshold {}%; " + "time since last report written is {} seconds", maxChange,
conf.getLoadBalancerReportUpdateThresholdPercentage(),timeSinceLastReportWrittenToStore / 1000.0);
       return true;
   }
   return false;
}

以上参数配置如下(本篇所有配置均在broker.conf中,下同):

# Percentage of change to trigger load report update
loadBalancerReportUpdateThresholdPercentage=10

# maximum interval to update load report
loadBalancerReportUpdateMaxIntervalMinutes=15

# Frequency of report to collect
loadBalancerHostUsageCheckIntervalMinutes=1

Broker 平均负载信息上报

Leader Broker 会通过 ZooKeeper 的 Watcher 感知到每个 Broker 的实时数据更新,然后从 ZooKeeper 中读取并保存在 Leader 的缓存中,接着通过定时任务在内存中为每个 Broker 计算短期和长期的历史负载数据(TimeAverageBrokerData),最后更新到 ZooKeeper 中。其中短期负载信息应用于被动的快速决策,长期负载信息应用于稳定状态下的决策。短期和长期的计算方法见官方介绍[1],上报的频率见如下配置:

# Interval to flush dynamic resource quota to ZooKeeper
loadBalancerResourceQuotaUpdateIntervalMinutes=15

上报频率的代码在 PulsarService 类的 startLeaderElectionService()方法中。
上报动作的代码在 ModularLoadManagerImpl 类的 writeBundleDataOnZooKeeper()方法中。

Bundle 历史负载信息上报

Bundle 的历史负载信息(TimeAverageMessageData)获取、计算和上报频率与上面 Broker 的一致。

均衡策略

均衡策略是 Broker 依据负载信息判定哪个 Broker 上哪些 Bundle 需要被卸载的方法,通过对 Bundle 的卸载达到整个集群的整体负载更加均衡。当前 Pulsar Broker 支持三种负载策略,分别是 OverloadShedder、ThresholdShedder 和 UniformLoadShedder。下面分别进行介绍。
涉及负载均衡的全局参数配置如下(在broker.conf中):

# Enable load balancer
loadBalancerEnabled=true

# Enable/disable automatic bundle unloading for load-shedding
loadBalancerSheddingEnabled=true

# Load shedding interval. Broker periodically checks whether some traffic should be offload from
# some over-loaded broker to other under-loaded brokers
loadBalancerSheddingIntervalMinutes=1

# Prevent the same topics to be shed and moved to other broker more than once within this timeframe
loadBalancerSheddingGracePeriodMinutes=30

OverloadShedder

OverloadShedder 策略是根据单个 Broker 的负载使用率来判断是否超负荷,这种策略的目的是给高负载的 Broker 降低负载。 

原理:

当某个 Broker 的 CPU、网络带宽 IN/OUT、内存和 Direct 内存中任何一个指标的当前负载使用率超过了loadBalancerBrokerOverloadedThresholdPercentage设置的比例,将在该 Broker 上卸载一个 Bundle。

被卸载的最小吞吐量 =(当前负载使用率 - loadBalancerBrokerOverloadedThresholdPercentage / 100.0 + 5%)*(该 Broker 当前吞吐量 IN + 该 Broker 当前吞吐量 OUT)。

遍历该 Broker 上所有 Bundle,获取被卸载的 Bundle 列表,只有满足如下条件的 Bundle 才会被卸载:

  • • 当前 Broker 上至少有两个 Bundle,并且至少有一个 Bundle 在loadBalancerSheddingGracePeriodMinutes设置的最小时间内没有被卸载过。这个参数防止在这段时间内某个 Bundle 被多次卸载并转移到其他 Broker 上。

  • • 被卸载的 Bundle 是消息速率最大的且最近未被卸载过。 实现代码在 OverloadShedder 类的findBundlesForUnloading()方法中。在 Bundle 的选择过程中要对 Bundle 按照 shortTermData 中 msgThroughputIn 和 msgThroughputOut 之和从大到小排序,卸载前面 N 个最大的 Bundle 吞吐,直到满足被卸载的最小吞吐量。

在这个策略中,只有在某个 Broker 超负载时才会卸载 Bundle,总体上确保 Broker 少卸载 Bundle,从而保障系统的平稳运行。如果在生产环境使用该策略,需要观察多个节点负载是否均匀分布,如果均匀分布,则会出现多个节点几乎同时到达门限而导致负载反复被调整的情况。

配置:

在 Pulsar 2.10.0 版本之前,默认策略为 OverloadShedder。

# load shedding strategy, support OverloadShedder and ThresholdShedder, default is ThresholdShedder since 2.10.0
loadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.impl.OverloadShedder

在该策略下的门限配置:

# Usage threshold to determine a broker as over-loaded
loadBalancerBrokerOverloadedThresholdPercentage=85

ThresholdShedder

ThresholdShedder 策略是根据 Broker 的负载与所有Broker的平均负载 + 门限 比较来判断是否超负荷。这种策略的目的是让集群中的每个 Broker 的负载达到最平衡的状态。

原理:

  1. 1. 根据每个 Broker 的 LoadData 获取集群 Broker 的平均负载,计算方式为: 集群 Broker 的平均负载 = 每个集群的负载求和/Broker 数 其中,每个 Broker 的负载计算如下: 每个 Broker 的负载 = (历史资源使用率 * 历史资源使用率占比 loadBalancerHistoryResourcePercentage + 当前资源使用率 * (1 - 历史资源使用率占比 loadBalancerHistoryResourcePercentage) 当前资源使用率取各个资源里面的最大值,计算如下: Broker 的当前资源使用率 = Max(CPU * CPU 权重,Heap 内存 * Heap 内存权重,Direct 内存 * Direct 内存权重,BandwithIn * BandwithIn 权重,BandwithOut * BandwithOut 权重) 被卸载的最小吞吐量 =(当前负载使用率 - loadBalancerBrokerThresholdShedderPercentage/ 100.0 + 5%)*(该 Broker 当前吞吐量 IN + 该 Broker 当前吞吐量 OUT)。

  2. 2. 遍历所有 Broker,如果 Broker 的资源使用率 > (集群 Broker 的平均负载+负载门限 loadBalancerBrokerThresholdShedderPercentage),则该 Broker 被加入卸载备选列表中。

  3. 3. 遍历备选 Broker 中所有的 Bundle,获取被卸载的 Bundle 列表,当满足以下条件时才会被卸载,且一次只卸载一个 Bundle。

  • • 当前需要卸载的吞吐率大于最小的 bundle 吞吐率门限 loadBalancerBundleUnloadMinThroughputThreshold,默认超过 10MB 才能触发卸载,防止小流量的 Bundle 被反复卸载。

  • • 当前 Broker 上至少有两个 Bundle,且当前 Bundle 最近没有被卸载过。

实现代码在 ThresholdShedder 类的 findBundlesForUnloading()方法中,在 Bundle 的选择过程中要对 Bundle 按照 shortTermData 中 msgThroughputIn 和 msgThroughputOut 之和从大到小排序,卸载前面 N 个最大的 Bundle 吞吐,直到满足被卸载的最小吞吐量。

配置:

从 Pulsar 2.10.0 版本开始,默认策略为 ThresholdShedder。配置如下:

# load shedding strategy, support OverloadShedder and ThresholdShedder, default is ThresholdShedder since 2.10.0
loadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder

如上所述,在该策略下,满足 Broker 的资源使用超过所有 Broker 平均资源的使用率+门限的条件时会触发 Bundle 的卸载,门限的配置如下:

# The broker resource usage threshold.
# When the broker resource usage is greater than the pulsar cluster average resource usage,
# the threshold shedder will be triggered to offload bundles from the broker.
# It only takes effect in the ThresholdShedder strategy.
loadBalancerBrokerThresholdShedderPercentage=10

其他几个影响计算的参数分别为:历史资源使用率的占比、带宽 IN 的计算权重、带宽 OUT 的计算权重、CPU 使用计算权重、Heap 内存使用计算权重和 Direct 内存使用计算权重,最后一个参数是为了避免 Bundle 被反复卸载增加的 Bundle 最小吞吐门限。

# When calculating new resource usage, the history usage accounts for.
# It only takes effect in the ThresholdShedder strategy.
loadBalancerHistoryResourcePercentage=0.9

# The BandWithIn usage weight when calculating new resource usage.
# It only takes effect in the ThresholdShedder strategy.
loadBalancerBandwithInResourceWeight=1.0

# The BandWithOut usage weight when calculating new resource usage.
# It only takes effect in the ThresholdShedder strategy.
loadBalancerBandwithOutResourceWeight=1.0

# The CPU usage weight when calculating new resource usage.
# It only takes effect in the ThresholdShedder strategy.
loadBalancerCPUResourceWeight=1.0

# The heap memory usage weight when calculating new resource usage.
# It only takes effect in the ThresholdShedder strategy.
loadBalancerMemoryResourceWeight=1.0

# The direct memory usage weight when calculating new resource usage.
# It only takes effect in the ThresholdShedder strategy.
loadBalancerDirectMemoryResourceWeight=1.0

# Bundle unload minimum throughput threshold (MB), avoiding bundle unload frequently.
# It only takes effect in the ThresholdShedder strategy.
loadBalancerBundleUnloadMinThroughputThreshold=10

UniformLoadShedder

UniformLoadShedder 策略是根据 msgRate 最高的 Broker 和 msgRate 最低的 Broker 之间的比率,最大吞吐量与最小吞吐量之比,与配置的门限进行对比来判断是否超负荷。该策略倾向于在所有 Broker 之间均匀分配负载。该策略只关注 msgRate 和吞吐量指标。

原理:

该策略基本实现思路如下:

  1. 1. 找出需要卸载 Bundle 的 Broker。 检查 msgRate 最高的 Broker 和 msgRate 最低的 Broker 之间的比率和最大吞吐量与最小吞吐量之比。如果 msgRate 的比率高于配置的阈值 loadBalancerMsgRateDifferenceShedderThreshold 或吞吐量之间的比例高于配置的 loadBalancerMsgThroughputMultiplierDifferenceShedderThreshold,则需要对 Bundle 进行卸载。满足上述条件的最大 msgRate 或者最大吞吐量的 Broker 为需要卸载 Bundle 的 Broker。

msgRate 比率和吞吐量比例的计算方法:
msgRate 比率=(最大 msgRate - 最小 msgRate)/ 最小 msgRate * 100
吞吐量比例=最大吞吐量/ 最小吞吐量
其中:
msgRate=MsgRateIn() + MsgRateOut();
throughput = MsgThroughputIn()+MsgThroughputOut();

实现代码在UniformLoadShedder.findBundlesForUnloading()中,部分代码如下:

double msgRateDifferencePercentage = ((maxMsgRate.getValue() - minMsgRate.getValue()) * 100)/ (minMsgRate.getValue());

double msgThroughputDifferenceRate = maxThroughputRate.getValue() / minThroughputRate.getValue();

// if the threshold matches then find out how much load needs to be unloaded by considering number of msgRate and throughput.

boolean isMsgRateThresholdExceeded = conf.getLoadBalancerMsgRateDifferenceShedderThreshold() > 0 && msgRateDifferencePercentage > conf.getLoadBalancerMsgRateDifferenceShedderThreshold();

boolean isMsgThroughputThresholdExceeded = conf.getLoadBalancerMsgThroughputMultiplierDifferenceShedderThreshold() > 0 && msgThroughputDifferenceRate > conf.getLoadBalancerMsgThroughputMultiplierDifferenceShedderThreshold();

if (isMsgRateThresholdExceeded || isMsgThroughputThresholdExceeded) {
   if (log.isDebugEnabled()) {
log.debug( "Found bundles for uniform load balancing. "
+ "overloaded broker {} with (msgRate,throughput)= ({},{}) "
+ "and underloaded broker {} with (msgRate,throughput)= ({},{})",
overloadedBroker.getValue(), maxMsgRate.getValue(), maxThroughputRate.getValue(),
underloadedBroker.getValue(), minMsgRate.getValue(), minThroughputRate.getValue());
   }
  1. 1. 找出需要卸载的 Bundle,在 Bundle 的选择过程中要对 Bundle 按照 shortTermData 中 msgRate 和吞吐量进行从大到小排序。如果 msgRate 超过门限,按照 msgRateIn 和 msgRateOut 之和进行排序;如果吞吐量超过门限,按照 ThroughputIn 和 msgThroughputOut 之和进行排序,卸载前面 N 个最大的 Bundle 的 msgRate 或吞吐量,直到满足被卸载的最小 msgRate 或吞吐量。当满足以下条件时该 Bundle 将被卸载:

  • • 当前 Broker 上至少有两个 Bundle,且当前 bundle 最近没有被卸载过。

  • • msgRate 最小卸载量为=(maxMsgRate - minMsgRate) * 0.2,需要大于 1000 条,或者吞吐量最小卸载量=(maxThroughputRate - minThroughputRate)*0.2 ,需要大于 1MB。其中 1000 条和 1MB 当前在代码中设置。

配置:

要使用该 UniformLoadShedder 策略,需在调度策略中配置如下:

loadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.impl.UniformLoadShedder

门限配置如下:

  1. 1. Broker 的最大消息吞吐率大于 Broker 的最小消息吞吐率的比率((Max-Min)/Min)超过比率门限 loadBalancerMsgRateDifferenceShedderThreshold。

# Message-rate percentage threshold between highest and least loaded brokers for
# uniform load shedding. (eg: broker1 with 50K msgRate and broker2 with 30K msgRate
# will have 66% msgRate difference and load balancer can unload bundles from broker-1
# to broker-2)
loadBalancerMsgRateDifferenceShedderThreshold=50
  1. 1. Broker 的最大消息吞吐率和 Broker 的最小吞吐率的倍数(Max/Min)超过倍数门限 loadBalancerMsgThroughputMultiplierDifferenceShedderThreshold。

# Message-throughput threshold between highest and least loaded brokers for
# uniform load shedding. (eg: broker1 with 450MB msgRate and broker2 with 100MB msgRate
# will have 4.5 times msgThroughout difference and load balancer can unload bundles
# from broker-1 to broker-2)
loadBalancerMsgThroughputMultiplierDifferenceShedderThreshold=4

Bundle 卸载

从以上步骤中得到被卸载的 Bundle 列表后,很容易根据 Bundle 的名称得到对应的 Namespace 名称,因为 Bundle 的名称中含有 Namespace 信息,如:property/public/default/0x00000000_0xFFFFFFFF。 最后使用管理流 REST API 卸载 Bundle,从而完成了一次完整的负载均衡过程。

部分实现代码(ModularLoadManagerImpl.java)如下:

bundlesToUnload.asMap().forEach((broker, bundles) -> {
   bundles.forEach(bundle -> {
       final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle);  //get namespace name from bundle.
       final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
       if (!shouldAntiAffinityNamespaceUnload(namespaceName, bundleRange, broker)) {
           return;
       }

       log.info("[{}] Unloading bundle: {} from broker {}",strategy.getClass().getSimpleName(), bundle, broker);
       try {
           pulsar.getAdminClient().namespaces().unloadNamespaceBundle(namespaceName, bundleRange);       //unload bundle using REST API.
           loadData.getRecentlyUnloadedBundles().put(bundle, System.currentTimeMillis());
       } catch (PulsarServerException | PulsarAdminException e) {
           log.warn("Error when trying to perform load shedding on {} for broker {}", bundle, broker, e);
       }
   });

实测

为了更容易理解前两种负载均衡在 Broker 中的实现过程,下面分别进行测试。

OverloadShedder

测试环境:

K8s 部署,3 个 Broker 节点,在 Toolset 中通过 Pulsar-perf 发起 10000 条/s 的生产消息任务。

配置(broker.conf):

loadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.impl.OverloadShedder
loadBalancerBrokerOverloadedThresholdPercentage=10        #配置较低是为了更容易触发均衡。

日志:

  1. 1. 从日志中可以看出 Broker0、Broker1 和 Broker2 的 Memory 负载分别为 49.24%、69.15%和 73.88%,都超过了负载门限配置 10%,全部满足卸载的条件;

  2. 2. 再对 Broker 上的 Bundle 进行一一判断,从日志上看,Broker0 和 Broker2 都只有一个 Bundle,最后满足条件的 Bundle 为 Broker1 上的 public/default/0x20000000_0x30000000,把该 Bundle 从 Broker1 上卸载到其他 Broker 上。 详细日志如下:

2022-11-18T10:44:22,366+0000 [pulsar-load-manager-1-1] INFO  org.apache.pulsar.broker.loadbalance.impl.OverloadShedder - Attempting to shed load on test1-sn-platform-broker-2.test1-sn-platform-broker-headless.pulsar1.svc.cluster.local:8080, which has resource usage 73.88626337051392% above threshold 10.0% -- Offloading at least 8.293027558988957 MByte/s of traffic (cpu: 0.75%, memory: 73.89%, directMemory: 4.69%, bandwidthIn: 0.00%, bandwidthOut: 0.00%)
2022-11-18T10:44:22,366+0000 [pulsar-load-manager-1-1] WARN  org.apache.pulsar.broker.loadbalance.impl.OverloadShedder - HIGH USAGE WARNING : Sole namespace bundle public/default/0xc0000000_0xd0000000 is overloading broker test1-sn-platform-broker-2.test1-sn-platform-broker-headless.pulsar1.svc.cluster.local:8080. No Load Shedding will be done on this broker
2022-11-18T10:44:22,366+0000 [pulsar-load-manager-1-1] INFO  org.apache.pulsar.broker.loadbalance.impl.OverloadShedder - Attempting to shed load on test1-sn-platform-broker-1.test1-sn-platform-broker-headless.pulsar1.svc.cluster.local:8080, which has resource usage 69.15156841278076% above threshold 10.0% -- Offloading at least 15.313356030795111 MByte/s of traffic (cpu: 5.20%, memory: 69.15%, directMemory: 3.13%, bandwidthIn: 0.00%, bandwidthOut: 0.00%)
2022-11-18T10:44:22,366+0000 [pulsar-load-manager-1-1] INFO  org.apache.pulsar.broker.loadbalance.impl.OverloadShedder - Attempting to shed load on test1-sn-platform-broker-0.test1-sn-platform-broker-headless.pulsar1.svc.cluster.local:8080, which has resource usage 49.24582540988922% above threshold 10.0% -- Offloading at least 0.0 MByte/s of traffic (cpu: 0.69%, memory: 49.25%, directMemory: 3.13%, bandwidthIn: 0.00%, bandwidthOut: 0.00%)
2022-11-18T10:44:22,366+0000 [pulsar-load-manager-1-1] WARN  org.apache.pulsar.broker.loadbalance.impl.OverloadShedder - HIGH USAGE WARNING : Sole namespace bundle my-tenant1/my-ns1/0x80000000_0xc0000000 is overloading broker test1-sn-platform-broker-0.test1-sn-platform-broker-headless.pulsar1.svc.cluster.local:8080. No Load Shedding will be done on this broker

2022-11-18T10:44:22,366+0000 [pulsar-load-manager-1-1] INFO  org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl - [OverloadShedder] Unloading bundle: public/default/0x20000000_0x30000000 from broker test1-sn-platform-broker-1.test1-sn-platform-broker-headless.pulsar1.svc.cluster.local:8080
2022-11-18T10:44:22,368+0000 [pulsar-web-37-4] INFO  org.apache.pulsar.broker.admin.impl.NamespacesBase - [null] Unloading namespace bundle public/default/0x20000000_0x30000000
2022-11-18T10:44:22,370+0000 [pulsar-web-37-4] INFO  org.eclipse.jetty.server.RequestLog - 172.17.0.49 - - [18/Nov/2022:10:44:22 +0000] "PUT /admin/v2/namespaces/public/default/0x20000000_0x30000000/unload HTTP/1.1" 307 0 "-" "Pulsar-Java-v2.9.3.7" 2

从以上日志打印可以看出首先是三个 Broker 的负载超过了配置的门限值,然后选择满足条件的 Bundle,最后卸载该 Bundle。

监控:

从下图可以看到,在箭头指示时刻发生了 Bundle 从 Broker1 卸载。

ThresholdShedder

测试环境: K8s 部署,3 个 Broker 节点,在 toolset 中通过 pulsar-perf 发起 10000 条/s 的生产消息任务。

配置(broker.conf):

loadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder
loadBalancerBrokerThresholdShedderPercentage=3    #为了更容易触发
loadBalancerBundleUnloadMinThroughputThreshold=0.1    #为了更容易触发
loadBalancerHistoryResourcePercentage=0     #为了简化计算

日志:

  1. 1. 从日志中可以看出 Broker0、Broker1 和 Broker2 的当前负载分别为 38%、51%和 36%,平均为 42%,触发门限配置为 3%,3 个 Broker 当前使用率-平均使用率 分别为:-4%、9%和-6%,所以只有 Broker1 超过了负载门限配置 3%,满足卸载的条件;

  2. 2. 卸载的最小数据量=(当前资源的吞吐 IN+当前资源的吞吐 OUT)*(Broker 的当前使用率-平均使用率-门限+ 5%的冗余),日志显示为 0.1143MB/s。

  3. 3. 最后满足卸载条件的 Bundle 是 Broker1 上的 public/default/0xd0000000_0xe0000000。

2022-11-19T15:35:01,971+0000 [pulsar-load-manager-1-1] INFO  org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder - test1-sn-platform-broker-2.test1-sn-platform-broker-headless.pulsar1.svc.cluster.local:8080 broker load: historyUsage=56%, resourceUsage=36%
2022-11-19T15:35:01,971+0000 [pulsar-load-manager-1-1] INFO  org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder - test1-sn-platform-broker-1.test1-sn-platform-broker-headless.pulsar1.svc.cluster.local:8080 broker load: historyUsage=50%, resourceUsage=51%
2022-11-19T15:35:01,971+0000 [pulsar-load-manager-1-1] INFO  org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder - test1-sn-platform-broker-0.test1-sn-platform-broker-headless.pulsar1.svc.cluster.local:8080 broker load: historyUsage=49%, resourceUsage=38%
2022-11-19T15:35:01,971+0000 [pulsar-load-manager-1-1] INFO  org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder - brokers' resource avgUsage:42%
2022-11-19T15:35:01,971+0000 [pulsar-load-manager-1-1] INFO  org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder - [test1-sn-platform-broker-2.test1-sn-platform-broker-headless.pulsar1.svc.cluster.local:8080] broker is not overloaded, ignoring at this point, currentUsage:36%
2022-11-19T15:35:01,971+0000 [pulsar-load-manager-1-1] INFO  org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder - Attempting to shed load on test1-sn-platform-broker-1.test1-sn-platform-broker-headless.pulsar1.svc.cluster.local:8080, which has max resource usage above avgUsage  and threshold 0.5125840759277344% > 0.4212801106770833% + 0.03% -- Offloading at least 0.11431703679843176 MByte/s of traffic, left throughput 0.9127536209359574 MByte/s
2022-11-19T15:35:01,971+0000 [pulsar-load-manager-1-1] INFO  org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder - [test1-sn-platform-broker-0.test1-sn-platform-broker-headless.pulsar1.svc.cluster.local:8080] broker is not overloaded, ignoring at this point, currentUsage:38%
2022-11-19T15:35:01,971+0000 [pulsar-load-manager-1-1] INFO  org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl - [ThresholdShedder] Unloading bundle: public/default/0xd0000000_0xe0000000 from broker test1-sn-platform-broker-1.test1-sn-platform-broker-headless.pulsar1.svc.cluster.local:8080
2022-11-19T15:35:01,976+0000 [pulsar-web-37-4] INFO  org.apache.pulsar.broker.admin.impl.NamespacesBase - [null] Unloading namespace bundle public/default/0xd0000000_0xe0000000
2022-11-19T15:35:01,980+0000 [pulsar-web-37-4] INFO  org.eclipse.jetty.server.RequestLog - 172.17.0.35 - - [19/Nov/2022:15:35:01 +0000] "PUT /admin/v2/namespaces/public/default/0xd0000000_0xe0000000/unload HTTP/1.1" 307 0 "-" "Pulsar-Java-v2.9.3.7" 4

监控:

从下图可以看出,在箭头指示时刻发生了 Bundle 从 Broker1 卸载。

上面对 OverloadShedder 和 ThresholdShedder 两种负载均衡策略进行了测试,我们人为配置较低的门限便于触发负载均衡,从日志的打印可以看出,pulsar-load-manager 会根据检测到的负载跟配置的负载门限进行对比,从而选择出负载比较高的 Broker 和 Broker 中的 Bundle,最后通过管理流 admin 进行 Bundle 的卸载,达到多个 Broker 之间负载均衡的目的。

总结

本文只是从理论上对 Pulsar Broker 的负载均衡进行了阐述,而且仅限于基于负载均衡的 Bundle 自动卸载,在我们实际生产中,需要多观察实际的资源负载情况,根据实际情况选择合适的均衡策略,对门限进行合理的配置和优化,从而达到负载均衡的最佳效果。

参考:

  1. 1. https://pulsar.apache.org/docs/next/develop-load-manager

  2. 2. https://pulsar.apache.org/docs/next/administration-load-balance

  3. 3. 《深入解析 Apache Pulsar》林琳著,电子工业出版社。

引用链接

[1] 官方介绍: https://pulsar.apache.org/docs/next/develop-load-manager/#bundle-data

本文分享自微信公众号 - StreamNative(StreamNative)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部