文档章节

Guava RateLimiter限流源码解析和实例应用

算法之名
 算法之名
发布于 05/26 21:26
字数 2905
阅读 262
收藏 1

在开发高并发系统时有三把利器用来保护系统:缓存、降级和限流

  • 缓存 缓存的目的是提升系统访问速度和增大系统处理容量
  • 降级 降级是当服务出现问题或者影响到核心流程时,需要暂时屏蔽掉,待高峰或者问题解决后再打开
  • 限流 限流的目的是通过对并发访问/请求进行限速,或者对一个时间窗口内的请求进行限速来保护系统,一旦达到限制速率则可以拒绝服务、排队或等待、降级等处理

常用的限流算法

漏桶算法

漏桶算法思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水,当水流入速度过大会直接溢出,可以看出漏桶算法能强行限制数据的传输速率。

 

令牌桶算法

对于很多应用场景来说,除了要求能够限制数据的平均传输速率外,还要求允许某种程度的突发传输。这时候漏桶算法可能就不合适了,令牌桶算法更为适合。如图所示,令牌桶算法的原理是系统会以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。

 

RateLimiter使用以及源码解析

Google开源工具包Guava提供了限流工具类RateLimiter,该类基于令牌桶算法实现流量限制,使用十分方便,而且十分高效。

Guava有两种限流模式,一种为稳定模式(SmoothBursty:令牌生成速度恒定),一种为渐进模式(SmoothWarmingUp:令牌生成速度缓慢提升直到维持在一个稳定值) 两种模式实现思路类似,主要区别在等待时间的计算上,本篇重点介绍SmoothBursty

public static RateLimiter create(double permitsPerSecond) {
  /*
   * 默认的RateLimiter配置可以保存最多一秒钟的未使用许可证
   */
  return create(SleepingStopwatch.createFromSystemTimer(), permitsPerSecond);
}

RateLimiter是一个抽象类,SmoothBursty是其子类SmoothRateLimiter的子类,其两个构造参数含义如下

  • SleepingStopwatch:guava中的一个时钟类实例,会通过这个来计算时间及令牌
  • maxBurstSeconds:官方解释,在ReteLimiter未使用时,最多保存几秒的令牌,默认是1
@VisibleForTesting
static RateLimiter create(SleepingStopwatch stopwatch, double permitsPerSecond) {
  RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
  //根据每秒向桶中放入令牌的数量来设置当前存储令牌数
  rateLimiter.setRate(permitsPerSecond);
  return rateLimiter;
}
public final void setRate(double permitsPerSecond) {
  //如果每秒向桶中放入令牌的数量(permitsPerSecond)大于0且为数字,通过检查,否则抛出参数异常
  checkArgument(
      permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive");
  //对每个线程进行互斥,建立互斥对象的锁定
  synchronized (mutex()) {
    //由各项参数更新当前存储令牌数
    doSetRate(permitsPerSecond, stopwatch.readMicros());
  }
}
public static void checkArgument(boolean expression, @Nullable Object errorMessage) {
  if (!expression) {
    throw new IllegalArgumentException(String.valueOf(errorMessage));
  }
}
private volatile Object mutexDoNotUseDirectly; //线程安全的互斥对象
private Object mutex() {
  Object mutex = mutexDoNotUseDirectly;
  if (mutex == null) {
    synchronized (this) {
      mutex = mutexDoNotUseDirectly;
      if (mutex == null) {
        mutexDoNotUseDirectly = mutex = new Object();
      }
    }
  }
  return mutex;
}

在SmoothBursty中

@Override
final void doSetRate(double permitsPerSecond, long nowMicros) {
  //若当前时间晚于nextFreeTicketMicros,则计算该段时间内可以生成多少令牌,将生成的令牌加入令牌桶中并更新数据 
  resync(nowMicros);
  //更新添加1个令牌的时间间隔(单位微妙)为1000000微妙(1秒)除以每秒放入令牌桶中的数量
  double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
  this.stableIntervalMicros = stableIntervalMicros;
  //将令牌桶中可以存储令牌的时间参数加上更新当前可以存储的令牌数
  doSetRate(permitsPerSecond, stableIntervalMicros);
}
private long nextFreeTicketMicros = 0L; //下一次请求可以获取令牌的起始时间
double storedPermits; //当前存储令牌数
double maxPermits; //最大存储令牌数 = maxBurstSeconds * stableIntervalMicros
double stableIntervalMicros; //添加令牌时间间隔 = SECONDS.toMicros(1L) / permitsPerSecond;(1秒/每秒的令牌数)
final double maxBurstSeconds; //在RateLimiter未使用时,最多存储几秒的令牌
private void resync(long nowMicros) {
  //如果当前时间大于下一次请求可以获取令牌的起始时间
  if (nowMicros > nextFreeTicketMicros) {
    //比较最大存储令牌数和当前存储的令牌数加上现在要增加的令牌数的大小,小的那个赋给当年存储令牌数,即增加令牌数与当前令牌数之和不能大于最大令牌数
    storedPermits = min(maxPermits,
        storedPermits + (nowMicros - nextFreeTicketMicros) / stableIntervalMicros);
    //将当前时间赋给下一次请求可以获取的起始时间
    nextFreeTicketMicros = nowMicros;
  }
}
@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
  //将最大存储令牌数存入临时副本
  double oldMaxPermits = this.maxPermits;
  //更新最大存储令牌数为存放令牌的秒数乘以每秒向桶中放入的令牌数
  maxPermits = maxBurstSeconds * permitsPerSecond;
  //如果最大存储令牌数的临时副本为正无穷大
  if (oldMaxPermits == Double.POSITIVE_INFINITY) {
    //更新当前存储令牌数为最大存储令牌数
    storedPermits = maxPermits;
  } else { //如果最大存储令牌数的临时副本不为正无穷大
    //如果最大存储令牌数的临时副本为0,则更新当前存储令牌数为0,否则
    //更新当前存储令牌数为当前存储令牌数乘以最大存储令牌数除以最大存储令牌数的临时副本数
    storedPermits = (oldMaxPermits == 0.0)
        ? 0.0 // initial state
        : storedPermits * maxPermits / oldMaxPermits;
  }
}

我们再来看一下RateLimiter的tryAcquire方法

public boolean tryAcquire(long timeout, TimeUnit unit) {
  //尝试在timeout时间内获取令牌,如果可以则挂起(睡眠)等待相应时间并返回true,否则立即返回false 
  return tryAcquire(1, timeout, unit);
}
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
  //取等待时间的微妙数与0比较取大值赋给超时时间
  long timeoutMicros = max(unit.toMicros(timeout), 0);
  //如果检查时间>0,通过检查,此处为1
  checkPermits(permits);
  long microsToWait;
  //建立互斥对象加锁互斥
  synchronized (mutex()) {
    //获取当前时间
    long nowMicros = stopwatch.readMicros();
    //如果下一次请求可以获取令牌的起始时间减去等待时间大于当前时间
    if (!canAcquire(nowMicros, timeoutMicros)) {
      return false; //返回false
    } else { //如果下一次请求可以获取令牌的起始时间减去等待时间小于等于当前时间
      //获取下一次请求可以获取令牌的起始时间减去当前时间的值与0之间的大值并刷新各参数(下一次请求可以获取令牌的起始时间、当前存储令牌数)
      microsToWait = reserveAndGetWaitLength(permits, nowMicros);
    }
  }
  //线程休眠microsToWait时间
  stopwatch.sleepMicrosUninterruptibly(microsToWait);
  //返回true
  return true;
}
private static int checkPermits(int permits) {
  checkArgument(permits > 0, "Requested permits (%s) must be positive", permits);
  return permits;
}
final Stopwatch stopwatch = Stopwatch.createStarted();
@Override
long readMicros() {
  return stopwatch.elapsed(MICROSECONDS);
}
private boolean canAcquire(long nowMicros, long timeoutMicros) {
  //返回下一次请求可以获取令牌的起始时间减去等待时间是否小于等于当前时间
  return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros;
}
final long reserveAndGetWaitLength(int permits, long nowMicros) {
  //获取下一次请求可以获取令牌的起始时间并更新各参数(下一次请求可以获取令牌的起始时间、当前存储令牌数long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
  //返回下一次请求可以获取令牌的起始时间减去当前时间的值与0之间的大值
  return max(momentAvailable - nowMicros, 0);
}
@Override
void sleepMicrosUninterruptibly(long micros) {
  if (micros > 0) {
    Uninterruptibles.sleepUninterruptibly(micros, MICROSECONDS);
  }
}

在SmoothBursty中

@Override
final long queryEarliestAvailable(long nowMicros) {
  //返回下一次请求可以获取令牌的起始时间
  return nextFreeTicketMicros;
}
@Override
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
  //若当前时间晚于nextFreeTicketMicros,则计算该段时间内可以生成多少令牌,将生成的令牌加入令牌桶中并更新数据
  resync(nowMicros);
  //获取下一次请求可以获取令牌的起始时间
  long returnValue = nextFreeTicketMicros;
  //在允许的请求数(这里为1)和当前存储令牌数间取小值赋给允许消费的存储令牌数(storedPermitsToSpend)
  double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
  //将允许的请求数减去允许消费的存储令牌数赋给允许刷新数(freshPermits)
  double freshPermits = requiredPermits - storedPermitsToSpend;
  //将允许刷新数乘以添加令牌时间间隔赋给等待微妙数(waitMicros)
  long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
      + (long) (freshPermits * stableIntervalMicros);
  //更新下一次请求可以获取令牌的起始时间为下一次请求可以获取令牌的起始时间加上等待微妙数
  this.nextFreeTicketMicros = nextFreeTicketMicros + waitMicros;
  //更新当前存储令牌数为当前存储令牌数减去允许消费的存储令牌数
  this.storedPermits -= storedPermitsToSpend;
  return returnValue;
}
@Override
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
  return 0L;
}

在Uninterruptibles中

public static void sleepUninterruptibly(long sleepFor, TimeUnit unit) {
  //定义是否已中断为false
  boolean interrupted = false;
  try {
    //将下一次请求可以获取令牌的起始时间减去当前时间的值转化为纳秒定义为remainingNanos
    long remainingNanos = unit.toNanos(sleepFor);
    //将系统的纳秒值加上该转化值为end
    long end = System.nanoTime() + remainingNanos;
    while (true) {
      try {
        //线程休眠remainingNanos时间
        NANOSECONDS.sleep(remainingNanos);
        return;
      } catch (InterruptedException e) {
        //如果发生中断异常,将是否已中断更新为true
        interrupted = true;
        //更新remainingNanos为end减去系统的纳秒值,并进入下一轮循环
        remainingNanos = end - System.nanoTime();
      }
    }
  } finally {
    //如果发生中断异常
    if (interrupted) {
      //当前线程中断
      Thread.currentThread().interrupt();
    }
  }
}

源码分析就是这些了,现在我们来看一下Guava RateLimiter的应用,在APO中拦截Controller,并进行限流

在pom中添加

<dependency>
   <groupId>com.google.guava</groupId>
   <artifactId>guava</artifactId>
   <version>18.0</version>
</dependency>
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-aop</artifactId>
</dependency>

标签

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface LxRateLimit {
    /**
     *
     * @return
     */
    String value() default "";

    /**
     * 每秒向桶中放入令牌的数量   默认最大即不做限流
     * @return
     */
    double perSecond() default Double.MAX_VALUE;

    /**
     * 获取令牌的等待时间  默认0
     * @return
     */
    int timeOut() default 0;

    /**
     * 超时时间单位
     * @return
     */
    TimeUnit timeOutUnit() default TimeUnit.MILLISECONDS;
}

AOP类

@Slf4j
@Aspect
@Component
public class LxRateLimitAspect {
    private RateLimiter rateLimiter = RateLimiter.create(Double.MAX_VALUE);

    /**
     * 带有指定注解切入
     */
    @ResponseBody
    @Around(value = "@annotation(com.guanjian.annotation.LxRateLimit)")
    public Object aroundNotice(ProceedingJoinPoint pjp) throws Throwable {
        log.info("拦截到了{}方法...", pjp.getSignature().getName());
        Signature signature = pjp.getSignature();
        MethodSignature methodSignature = (MethodSignature)signature;
        //获取目标方法
        Method targetMethod = methodSignature.getMethod();
        if (targetMethod.isAnnotationPresent(LxRateLimit.class)) {
            //获取目标方法的@LxRateLimit注解
            LxRateLimit lxRateLimit = targetMethod.getAnnotation(LxRateLimit.class);
            rateLimiter.setRate(lxRateLimit.perSecond());
            if (!rateLimiter.tryAcquire(lxRateLimit.timeOut(), lxRateLimit.timeOutUnit()))
                return "服务器繁忙,请稍后再试!";
        }
        return pjp.proceed();
    }
}

Controller

@RestController
public class AnnotationTestController {
    @GetMapping("/testannotation")
    @LxRateLimit(perSecond = 2000.0, timeOut = 500) //此处限速为2000qps
    public String testAnnotation() {
        return "get token success";
    }
}

我们先在Controller中将@LxRateLimit(perSecond = 2000.0, timeOut = 500)注释掉

运行Jmeter进行压测

我们启用500线程压测

压测结果

吞吐量为7867.8qps,此时是不限速的

现在我们恢复Controller中的@LxRateLimit(perSecond = 2000.0, timeOut = 500)

吞吐量为2067.7qps

系统日志可以看到大量的拦截

2019-05-26 21:24:33.370  INFO 11092 --- [o-8080-exec-176] com.guanjian.aop.LxRateLimitAspect       : 拦截到了testAnnotation方法...
2019-05-26 21:24:33.370  INFO 11092 --- [io-8080-exec-27] com.guanjian.aop.LxRateLimitAspect       : 拦截到了testAnnotation方法...
2019-05-26 21:24:33.374  INFO 11092 --- [o-8080-exec-128] com.guanjian.aop.LxRateLimitAspect       : 拦截到了testAnnotation方法...
2019-05-26 21:24:33.374  INFO 11092 --- [o-8080-exec-191] com.guanjian.aop.LxRateLimitAspect       : 拦截到了testAnnotation方法...
2019-05-26 21:24:33.374  INFO 11092 --- [io-8080-exec-23] com.guanjian.aop.LxRateLimitAspect       : 拦截到了testAnnotation方法...
2019-05-26 21:24:33.377  INFO 11092 --- [io-8080-exec-36] com.guanjian.aop.LxRateLimitAspect       : 拦截到了testAnnotation方法...
2019-05-26 21:24:33.379  INFO 11092 --- [o-8080-exec-123] com.guanjian.aop.LxRateLimitAspect       : 拦截到了testAnnotation方法...
2019-05-26 21:24:33.379  INFO 11092 --- [io-8080-exec-61] com.guanjian.aop.LxRateLimitAspect       : 拦截到了testAnnotation方法...
2019-05-26 21:24:33.380  INFO 11092 --- [io-8080-exec-19] com.guanjian.aop.LxRateLimitAspect       : 拦截到了testAnnotation方法...
2019-05-26 21:24:33.382  INFO 11092 --- [io-8080-exec-77] com.guanjian.aop.LxRateLimitAspect       : 拦截到了testAnnotation方法...
2019-05-26 21:24:33.384  INFO 11092 --- [io-8080-exec-23] com.guanjian.aop.LxRateLimitAspect       : 拦截到了testAnnotation方法...

© 著作权归作者所有

算法之名
粉丝 48
博文 176
码字总数 262845
作品 0
广州
私信 提问
Spring Cloud 入门教程9、服务限流/API限流(Zuul+RateLimiter)

一、前言 1、什么是RateLimiter、Spring Cloud Zuul RateLimiter? RateLimiter是Google开源的实现了令牌桶算法的限流工具(速率限制器)。http://ifeve.com/guava-ratelimiter/ Spring Clou...

吴伟祥
02/19
445
0
服务流量控制及限流

服务流量控制及限流 青蜂侠2017-07-2821 阅读 服务控制 工作上在开发类似 淘宝开放平台 的点评到综的开放平台,需要针对不同的API做流量控制,后期还需要针对不同属性的服务商(上线状态,部...

青蜂侠
2017/07/28
0
0
接口限流算法:漏桶算法和令牌桶算法

漏桶算法 漏桶可以看作是一个带有常量服务时间的单服务器队列,如果漏桶(包缓存)溢出,那么数据包会被丢弃。这一点和线程池原理是很相似的。 把请求比作是水,水来了都先放进桶里,并以限定...

铁骨铮铮
05/23
230
0
限流分析(Guava RateLimter)

一般系统为了防止服务被调用的QPS超出其承载能力,防止大流量压垮服务器造成雪崩后果,设计时往往会加入限流的逻辑。通过限流,当请求超出系统的服务能力时,系统可以采取拒绝服务/排队等待/...

robin-yao
2016/11/19
2.2K
1
【Guava】使用Guava的RateLimiter做限流

一、常见的限流算法 目前常用的限流算法有两个:漏桶算法和令牌桶算法。 1.漏桶算法 漏桶算法的原理比较简单,请求进入到漏桶中,漏桶以一定的速率漏水。当请求过多时,水直接溢出。可以看出...

大海201506
2018/09/19
172
0

没有更多内容

加载失败,请刷新页面

加载更多

for循环

九九乘法表 示例:for(int i = 1; i <= 9; i++){ for (int j = 1; j <= i; j++) { // 每次开始i循环,j都会重新定义为j=1,然后开始循环计算 System.out.print(j +......

Shutting
4分钟前
1
0
小王子1

一定要帅! 韩国设计师品牌 insgram全世界得网红 韩国潮男穿搭 HM 找到穿衣服最好看的人,跟他比,比他好看。 在兴趣前,不要表现目的性,压力 关系是不热就冷的! 不喜欢压力,不喜欢负责任...

阿锋zxf
22分钟前
5
0
时间戳

1 loadTimeString(ts) { var d = new Date(); if (String(ts).length == 10) { d = new Date(ts * 1000); ......

东方巨人
24分钟前
3
0
Redis Cluster

Redis Cluster 集群 redis集群有以下几种方式 普通一主多从 普通一主多从+哨兵 cluster分片模式 一主多从 搭建方式网上很多,就不多描述了。 这种集群方式,一般master用作写,slave用做读,...

lazy~
24分钟前
4
0
 介绍一款优秀的通用管理权限快速开发框架

这是一套以权限管理为主的轻量化快速开发框架,配置有流程、专业表单、权限、app、企业微信等基础功能模块,在开发通用软件的效率上很有优势。 软件平台常用研发需求分析 《那些年我们一起做...

我想造火箭
41分钟前
12
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部