文档章节

聊聊Guava的RateLimiter

go4it
 go4it
发布于 2018/08/31 14:56
字数 1336
阅读 80
收藏 1

本文主要研究一下Guava的RateLimiter

RateLimiter

guava-26.0-jre-sources.jar!/com/google/common/util/concurrent/RateLimiter.java

@Beta
@GwtIncompatible
public abstract class RateLimiter {

	//......
 /**
   * Acquires the given number of permits from this {@code RateLimiter}, blocking until the request
   * can be granted. Tells the amount of time slept, if any.
   *
   * @param permits the number of permits to acquire
   * @return time spent sleeping to enforce rate, in seconds; 0.0 if not rate-limited
   * @throws IllegalArgumentException if the requested number of permits is negative or zero
   * @since 16.0 (present in 13.0 with {@code void} return type})
   */
  @CanIgnoreReturnValue
  public double acquire(int permits) {
    long microsToWait = reserve(permits);
    stopwatch.sleepMicrosUninterruptibly(microsToWait);
    return 1.0 * microsToWait / SECONDS.toMicros(1L);
  }

  /**
   * Reserves the given number of permits from this {@code RateLimiter} for future use, returning
   * the number of microseconds until the reservation can be consumed.
   *
   * @return time in microseconds to wait until the resource can be acquired, never negative
   */
  final long reserve(int permits) {
    checkPermits(permits);
    synchronized (mutex()) {
      return reserveAndGetWaitLength(permits, stopwatch.readMicros());
    }
  }

  private static void checkPermits(int permits) {
    checkArgument(permits > 0, "Requested permits (%s) must be positive", permits);
  }

  /**
   * Reserves next ticket and returns the wait time that the caller must wait for.
   *
   * @return the required wait time, never negative
   */
  final long reserveAndGetWaitLength(int permits, long nowMicros) {
    long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
    return max(momentAvailable - nowMicros, 0);
  }

  public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
    long timeoutMicros = max(unit.toMicros(timeout), 0);
    checkPermits(permits);
    long microsToWait;
    synchronized (mutex()) {
      long nowMicros = stopwatch.readMicros();
      if (!canAcquire(nowMicros, timeoutMicros)) {
        return false;
      } else {
        microsToWait = reserveAndGetWaitLength(permits, nowMicros);
      }
    }
    stopwatch.sleepMicrosUninterruptibly(microsToWait);
    return true;
  }

  private boolean canAcquire(long nowMicros, long timeoutMicros) {
    return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros;
  }

  /**
   * Reserves next ticket and returns the wait time that the caller must wait for.
   *
   * @return the required wait time, never negative
   */
  final long reserveAndGetWaitLength(int permits, long nowMicros) {
    long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
    return max(momentAvailable - nowMicros, 0);
  }

  /**
   * Returns the earliest time that permits are available (with one caveat).
   *
   * @return the time that permits are available, or, if permits are available immediately, an
   *     arbitrary past or present time
   */
  abstract long queryEarliestAvailable(long nowMicros);

  /**
   * Reserves the requested number of permits and returns the time that those permits can be used
   * (with one caveat).
   *
   * @return the time that the permits may be used, or, if the permits may be used immediately, an
   *     arbitrary past or present time
   */
  abstract long reserveEarliestAvailable(int permits, long nowMicros);

  //......
}
  • 这里主要看acquire以及tryAcquire方法
  • acquire主要依赖reserve方法,先调用reserveAndGetWaitLength,最后是调用reserveEarliestAvailable方法
  • tryAcquire也会调用reserveAndGetWaitLength,最后也是调用reserveEarliestAvailable方法
  • reserveEarliestAvailable是抽象方法,由子类去实现

SmoothRateLimiter

guava-26.0-jre-sources.jar!/com/google/common/util/concurrent/SmoothRateLimiter.java

@GwtIncompatible
abstract class SmoothRateLimiter extends RateLimiter {
  //......
  @Override
  final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
    resync(nowMicros);
    long returnValue = nextFreeTicketMicros;
    double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
    double freshPermits = requiredPermits - storedPermitsToSpend;
    long waitMicros =
        storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
            + (long) (freshPermits * stableIntervalMicros);

    this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
    this.storedPermits -= storedPermitsToSpend;
    return returnValue;
  }

  /** Updates {@code storedPermits} and {@code nextFreeTicketMicros} based on the current time. */
  void resync(long nowMicros) {
    // if nextFreeTicket is in the past, resync to now
    if (nowMicros > nextFreeTicketMicros) {
      double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
      storedPermits = min(maxPermits, storedPermits + newPermits);
      nextFreeTicketMicros = nowMicros;
    }
  }

  /**
   * Translates a specified portion of our currently stored permits which we want to spend/acquire,
   * into a throttling time. Conceptually, this evaluates the integral of the underlying function we
   * use, for the range of [(storedPermits - permitsToTake), storedPermits].
   *
   * <p>This always holds: {@code 0 <= permitsToTake <= storedPermits}
   */
  abstract long storedPermitsToWaitTime(double storedPermits, double permitsToTake);

  /**
   * Returns the number of microseconds during cool down that we have to wait to get a new permit.
   */
  abstract double coolDownIntervalMicros();

  //......
}
  • SmoothRateLimiter是RateLimiter的抽象子类,是平滑限流实现类的抽象父类
  • 这里首先调用resync方法(用于处理根据速率添加token的逻辑),然后再去计算permits扣减以及等待时间的计算
  • 这里调用了两个抽象方法,分别是coolDownIntervalMicros以及storedPermitsToWaitTime

SmoothRateLimiter的两个子类

SmoothRateLimiter有两个内部静态子类,分别是SmoothBursty以及SmoothWarmingUp

SmoothBursty

  /**
   * This implements a "bursty" RateLimiter, where storedPermits are translated to zero throttling.
   * The maximum number of permits that can be saved (when the RateLimiter is unused) is defined in
   * terms of time, in this sense: if a RateLimiter is 2qps, and this time is specified as 10
   * seconds, we can save up to 2 * 10 = 20 permits.
   */
  static final class SmoothBursty extends SmoothRateLimiter {
    /** The work (permits) of how many seconds can be saved up if this RateLimiter is unused? */
    final double maxBurstSeconds;

    SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) {
      super(stopwatch);
      this.maxBurstSeconds = maxBurstSeconds;
    }

    @Override
    void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
      double oldMaxPermits = this.maxPermits;
      maxPermits = maxBurstSeconds * permitsPerSecond;
      if (oldMaxPermits == Double.POSITIVE_INFINITY) {
        // if we don't special-case this, we would get storedPermits == NaN, below
        storedPermits = maxPermits;
      } else {
        storedPermits =
            (oldMaxPermits == 0.0)
                ? 0.0 // initial state
                : storedPermits * maxPermits / oldMaxPermits;
      }
    }

    @Override
    long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
      return 0L;
    }

    @Override
    double coolDownIntervalMicros() {
      return stableIntervalMicros;
    }
  }
  • SmoothBursty是一个zero throttling的"bursty" RateLimiter
  • coolDownIntervalMicros返回的是stableIntervalMicros,而storedPermitsToWaitTime返回的为0

SmoothWarmingUp

  static final class SmoothWarmingUp extends SmoothRateLimiter {
    private final long warmupPeriodMicros;
    /**
     * The slope of the line from the stable interval (when permits == 0), to the cold interval
     * (when permits == maxPermits)
     */
    private double slope;

    private double thresholdPermits;
    private double coldFactor;

    SmoothWarmingUp(
        SleepingStopwatch stopwatch, long warmupPeriod, TimeUnit timeUnit, double coldFactor) {
      super(stopwatch);
      this.warmupPeriodMicros = timeUnit.toMicros(warmupPeriod);
      this.coldFactor = coldFactor;
    }

    @Override
    void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
      double oldMaxPermits = maxPermits;
      double coldIntervalMicros = stableIntervalMicros * coldFactor;
      thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros;
      maxPermits =
          thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros);
      slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits);
      if (oldMaxPermits == Double.POSITIVE_INFINITY) {
        // if we don't special-case this, we would get storedPermits == NaN, below
        storedPermits = 0.0;
      } else {
        storedPermits =
            (oldMaxPermits == 0.0)
                ? maxPermits // initial state is cold
                : storedPermits * maxPermits / oldMaxPermits;
      }
    }

    @Override
    long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
      double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
      long micros = 0;
      // measuring the integral on the right part of the function (the climbing line)
      if (availablePermitsAboveThreshold > 0.0) {
        double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);
        // TODO(cpovirk): Figure out a good name for this variable.
        double length =
            permitsToTime(availablePermitsAboveThreshold)
                + permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake);
        micros = (long) (permitsAboveThresholdToTake * length / 2.0);
        permitsToTake -= permitsAboveThresholdToTake;
      }
      // measuring the integral on the left part of the function (the horizontal line)
      micros += (long) (stableIntervalMicros * permitsToTake);
      return micros;
    }

    private double permitsToTime(double permits) {
      return stableIntervalMicros + permits * slope;
    }

    @Override
    double coolDownIntervalMicros() {
      return warmupPeriodMicros / maxPermits;
    }
  }
  • coolDownIntervalMicros返回的是warmupPeriodMicros / maxPermits,而storedPermitsToWaitTime的计算相对复杂一些
  • SmoothBursty是基于token bucket算法,允许一定量的bursty流量,但是有些场景需要bursty流量更平滑些,这就需要使用SmoothWarmingUp
  • SmoothWarmingUp有一个warmup period,为thresholdPermits到maxPermits的这段范围
   * <pre>
   *          ^ throttling
   *          |
   *    cold  +                  /
   * interval |                 /.
   *          |                / .
   *          |               /  .   ← "warmup period" is the area of the trapezoid between
   *          |              /   .     thresholdPermits and maxPermits
   *          |             /    .
   *          |            /     .
   *          |           /      .
   *   stable +----------/  WARM .
   * interval |          .   UP  .
   *          |          . PERIOD.
   *          |          .       .
   *        0 +----------+-------+--------------→ storedPermits
   *          0 thresholdPermits maxPermits
   * </pre>

主要涉及如下几个公式

coldInterval = coldFactor * stableInterval.
thresholdPermits = 0.5 * warmupPeriod / stableInterval
maxPermits = thresholdPermits + 2 * warmupPeriod / (stableInterval + coldInterval)
  • coldFactor默认是3
  • stableInterval代码以毫秒计算,即stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond

小结

  • Guava的RateLimiter(SmoothRateLimiter)基于token bucket算法实现,具体有两个实现类,分别是SmoothBursty以及SmoothWarmingUp
  • SmoothBursty初始化的storedPermits为0,可以支持burst到maxPermits
  • SmoothWarmingUp初始化的storedPermits为maxPermits(thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros)),也支持burst,但是总体相对平滑

doc

© 著作权归作者所有

go4it
粉丝 89
博文 1101
码字总数 1038833
作品 0
深圳
私信 提问
服务流量控制及限流

服务流量控制及限流 青蜂侠2017-07-2821 阅读 服务控制 工作上在开发类似 淘宝开放平台 的点评到综的开放平台,需要针对不同的API做流量控制,后期还需要针对不同属性的服务商(上线状态,部...

青蜂侠
2017/07/28
0
0
【Guava】使用Guava的RateLimiter做限流

一、常见的限流算法 目前常用的限流算法有两个:漏桶算法和令牌桶算法。 1.漏桶算法 漏桶算法的原理比较简单,请求进入到漏桶中,漏桶以一定的速率漏水。当请求过多时,水直接溢出。可以看出...

大海201506
2018/09/19
174
0
JFinalFilter扩展问题

@JFinal 你好,想跟你请教个问题:项目使用JFinal2.2,要使用Google的Guava的RateLimiter加一个访问限流器,但JFinalFilter没办法扩展,请问有什么建议。 RateLimiter的一个使用方式参考...

DeanHere
2016/11/07
179
1
Guava RateLimiter 学习

Google开源工具包Guava提供了限流工具类RateLimiter,该类基于令牌桶算法(Token Bucket)来完成限流 RateLimiter 从概念上来讲,速率限制器会在可配置的速率下分配许可证。如果必要的话,每个a...

谢随安
04/01
0
0
Guava RateLimiter限流源码解析和实例应用

在开发高并发系统时有三把利器用来保护系统:缓存、降级和限流 缓存的目的是提升系统访问速度和增大系统处理容量 降级是当服务出现问题或者影响到核心流程时,需要暂时屏蔽掉,待高峰或者问题...

算法之名
05/26
278
0

没有更多内容

加载失败,请刷新页面

加载更多

SpringBoot 集成MongoDB

一、MongoDB 简介 MongoDB 如今是最流行的 NoSQL 数据库,被广泛应用于各行各业中,很多创业公司数据库选型就直接使用了 MongoDB,但对于大部分公司,使用 MongoDB 的场景是做大规模数据查询...

zw965
12分钟前
7
0
使用 Envoy 和 AdGuard Home 阻挡烦人的广告

> 原文链接:使用 Envoy 和 AdGuard Home 阻挡烦人的广告 通常我们使用网络时,宽带运营商会为我们分配一个 DNS 服务器。这个 DNS 通常是最快的,距离最近的服务器,但会有很多问题,比如: ...

米开朗基杨
45分钟前
13
0
springboot之全局处理异常封装

springboot之全局处理异常封装 简介 在项目中经常出现系统异常的情况,比如NullPointerException等等。如果默认未处理的情况下,springboot会响应默认的错误提示,这样对用户体验不是友好,系...

Purgeyao
56分钟前
22
0
cookie

cookie: n. 饼干;小甜点 为什么会引入Cookie(在客户端保持http状态) 因为http协议是一种无状态协议,web服务器本身不能识别出哪些请求是同一个服务器发送的,浏览器的每一次请求都是独立...

五公里
今天
23
0
PHP常用函数

<?php/** * 获取客户端IP * @return [string] [description] */function getClientIp() { $ip = NULL; if (isset($_SERVER['HTTP_X_FORWARDED_FOR'])) { $arr = explode('......

半缘修道半缘君丶
今天
14
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部