当前位置: 首页 > news >正文

guava限流器RateLimiter源码详解

限流是保护高并发系统的三把利器之一,另外两个是缓存和降级。

常用的限流算法有滑动窗口、令牌桶和和漏桶等。Guava中的RateLimiter使用的是令牌桶,对比其他,该算法可容忍一定突发流量的速率的限流,通过控制桶的容量、发放令牌的速率,来达到对请求的限制。

 如上图:令牌桶算法会维护⼀个令牌( token )桶,以⼀个恒定的速度往桶⾥放⼊令牌( token ):当令牌桶满了的时候,再向其中放令牌,那么多余的令牌会被丢弃;当想要处理一个请求的时候,需要从令牌桶中取出一个令牌,如果此时令牌桶中没有令牌,那么则拒绝该请求。

1、guava RateLimiter使用

RateLimiter只能通过设置“令牌生成速率”(permits per second)来控制流量,无法显式设置桶的容量(burst size)。桶的容量是隐式管理的,支持以下两种模式:

  1. SmoothBursty(平滑突发):只要桶内有令牌就直接放行,也就是说系统会直接面对这突发的大量请求;这种模式下,桶的最大令牌数=permitsPerSecond (见下文解析)
  2. SmoothWarmingUp(平滑预热):认为系统在闲置一段时间后可能会存在缓存大量失效的情况,把这种情况下的系统的状态称为cool冷却状态,这种情况下系统不能达到最大的性能,如果直接面对大量的请求可能存在系统崩溃的状态,所以应该存在一个预热状态,也就是逐渐增多放行的请求数量,给系统一个缓冲时间。这种模式下,桶的最大令牌数=

1.1)示例:

import com.google.common.util.concurrent.RateLimiter;
import java.util.concurrent.TimeUnit;public class RateLimiterExample {public static void main(String[] args) {// SmoothBursty:允许突发流量RateLimiter burstyLimiter = RateLimiter.create(1.0);//burstyLimiter.setRate(10.0); 重新调整速率for (int i = 0; i < 10; i++) {final int finalI = i;new Thread(() -> {   // 使用burstyLimiterburstyLimiter.acquire();System.out.println("请求 " + finalI + ",时间:" + System.currentTimeMillis());}).start();}// SmoothWarmingUp:预热10秒,令牌生成速率逐渐升高到5 QPSRateLimiter warmingUpLimiter = RateLimiter.create(5.0, 10, TimeUnit.SECONDS);// 使用warmingUpLimiterSystem.out.println("SmoothWarmingUp模式:");for (int i = 0; i < 10; i++) {final int finalI = i;new Thread(() -> {   // 使用burstyLimiterwarmingUpLimiter.acquire();System.out.println("请求 " + i + ",时间:" + System.currentTimeMillis());}).start();}}
}

输出:

SmoothBursty模式:
请求 1,时间:1751290470946
请求 9,时间:1751290471951
请求 7,时间:1751290472950
请求 6,时间:1751290473948
请求 8,时间:1751290474950
请求 5,时间:1751290475948
请求 3,时间:1751290476950
请求 4,时间:1751290477947
请求 2,时间:1751290478947
请求 0,时间:1751290479950

可以看到:guava RateLimiter不保证线程的公平,由于设置的qps=1,所以每个线程打印的时间基本是相差1000.

1.2)api说明:

  • create(double permitsPerSecond):创建一个限流器,参数是每秒允许的请求数(QPS),默认是平滑突发模式。对于平滑突发模式,桶的最大容量=permitsPerSecond。
  • double acquire()/acquire(int permits):请求一个/n个令牌,如果当前没有令牌,则会阻塞直到令牌可用。返回值是获取令牌等待的时间(单位秒)
  • boolean tryAcquire() /tryAcquire(int permits):尝试获取1个/n个令牌,获取不到时立即返回 false,适合非阻塞限流。
  • boolean tryAcquire(int permits, long timeout, TimeUnit unit):获取permits个令牌,最多阻塞等待timeout时间
  • setRate(double permitsPerSecond):设置令牌发放速率。RateLimiter允许在运行中调整速率。

注:RateLimiter 是线程安全的,它会限制所有线程的调用总速率,但它并不保证公平性。

1.3)其他单机限流组件:

  • Resilience4j:功能丰富的限流、熔断、重试库,支持多种限流策略。
  • Bucket4j:基于令牌桶算法的限流库,支持分布式限流。
  • Sentinel(阿里开源):功能强大的流量控制组件,支持多种限流规则。

2、RateLimiter实现原理

传统的令牌桶算法中,有一个固定容量的桶,并以稳定的速率生成令牌放在桶中。当有请求时,需要获取令牌才能通过。因为桶的容量固定,令牌满了就不生产了。桶中有充足的令牌时,突发的流量可以直接获取令牌通过。当令牌取光后,后面的请求就得等生成令牌后才能通过。

RateLimiter的令牌桶算法并不是真的有个令牌桶对象,以每秒定时任务式地往桶中生产指定个数的令牌对象。后面介绍RateLimiter到底是如何以非定时任务的方式实现令牌桶算法的?

2.1)RateLimiter如何生成令牌放到桶里?——延迟计算令牌

1)思路:

根据令牌桶算法,桶中的令牌是持续生成存放的,有请求时需要先从桶中拿到令牌才能开始执行。根据这个特点,很容易想到:

开启一个定时任务,由定时任务持续生成令牌。这样的问题在于会极大的消耗系统资源,如,某接口需要分别对每个用户做访问频率限制,假设系统中存在6W用户,则至多需要开启6W个定时任务来维持每个桶中的令牌数,这样的开销是巨大的。

RateLimiter使用延迟计算令牌的方式:通过维护一个“下一次可用许可(令牌)的时间戳”(nextFreeTicketMicros)来控制许可发放节奏,同时允许令牌积累,但积累的最大量是有限制的。主要原理是:允许第一个请求预消费桶中的令牌,下一个请求则需要等待生产“上次预消费令牌数量”的时间。

  1. 在每次acquire请求获取特定数量的令牌是,先按照【当前时间和上一次计算令牌的时间的差值/产生令牌的时间间隔】计算出当前时刻桶内应该有多少个令牌(是一个浮点数),将生成的令牌加入令牌桶中并更新令牌数。这样一来,只需要在获取令牌时计算一次令牌数,而不是定时生产令牌。
  2. 计算得到桶内令牌数后,根据请求的令牌数和当前现有的令牌数,通过一定的规则计算出本次请求需要等待的时间是多少(可以是0),然后让请求线程进行阻塞等待这么多时间。(第一个请求可以预消费——不用等待,但后续请求需要先把上次欠的令牌还完后再消费
  3. 请求线程从acquire请求处苏醒,继续执行后续代码。这样,RateLimiter对这个请求做限流的工作就完成了。

注意:请求的许可数量不会影响请求本身的限流(调用 acquire(1) 和调用 acquire(1000) 会导致完全相同的节流,即使两者有差异),但会影响下一个请求的节流。也就是说,如果一个开销较大的任务到达空闲的 RateLimiter,它会立即被授予许可,但下一个请求将受到额外的节流,从而弥补这个开销较大的任务的开销。

2)说明:
  • RateLimiter实际上有两种限流器实现类:SmoothBursty限流器和SmoothWarmingUp限流器。它们的核心区别其实仅在于第2步中,去计算本次请求令牌需要等待的时间的方式是不同的。具体体现为reserveEarliestAvailable()内调用的计算等待时间的方法有不同的重写。
  • 在第2步中,计算出来的等待时间也可能是0,即不需要等待(请求线程不需要被阻塞),请求线程直接能马上获取到令牌。
  • RateLimiter一定保证请求线程能拿到它想要的全部令牌,尽管拿到这些令牌可能需要等一段时间。这个也是比较创新的一点
3)源码结构:

RateLimiter是一个抽象类,SmoothRateLimiter(本身也是一个抽象类)实现了该抽象类;在SmoothRateLimiter中定义了两个static的子实现类,分别是:SmoothWarmingUp和SmoothBursty。查看SmoothRateLimiter,定义了以下属性:

abstract class SmoothRateLimiter extends RateLimiter {static final class SmoothWarmingUp extends SmoothRateLimiter {}static final class SmoothBursty extends SmoothRateLimiter {}/**当前存储令牌数 The currently stored permits. */double storedPermits;/**最大存储令牌数 The maximum number of stored permits. */double maxPermits;/*** 添加(生成)令牌时间间隔* 无论哪个子类,计算方式都是:stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;*/double stableIntervalMicros;/*** 下一次请求可以获取令牌的起始时间,由于RateLimiter允许一次预消费,上次请求预消费令牌后,下次请求需要等待相应的时间到nextFreeTicketMicros时刻才可以获取令牌*/private long nextFreeTicketMicros = 0L; // could be either in the past or future
}

2.2)请求获取令牌过程

RateLimiter本身是线程安全的,在实际业务中

  • 如果你的限流逻辑是全局的,且需要在整个应用程序中共享同一个RateLimiter实例,可以使用单例模式(例如在spring中设置成一个bean)。
  • 如果每个需要限流的业务逻辑都有自己独立的RateLimiter实例,那么需要多个RateLimiter实例。
1)整体流程:

对于某个RateLimiter实例,多个线程并发调用acquire()方式进行流量控制,内部调用链如下:

  1. reserve :每个线程同步阻塞的方式更新桶令牌、nextFreeTicketMicros,以及计算需要等待的时间
  2. sleepMicrosUninterruptibly:每个线程进行等待阻塞(内部通过sleep实现)

所以,从这里可以看到,令牌的发放通过惰性的方式放在了每次请求中,而不是通过定时器来完成

public abstract class RateLimiter {public double acquire(int permits) {long microsToWait = reserve(permits); //更新桶令牌、nextFreeTicketMicros,以及计算需要等待的时间stopwatch.sleepMicrosUninterruptibly(microsToWait); //执行等待return 1.0 * microsToWait / SECONDS.toMicros(1L); //返回等待时间(转成秒)}final long reserve(int permits) {checkPermits(permits);synchronized (mutex()) {return reserveAndGetWaitLength(permits, stopwatch.readMicros());}}final long reserveAndGetWaitLength(int permits, long nowMicros) {long momentAvailable = reserveEarliestAvailable(permits, nowMicros);return max(momentAvailable - nowMicros, 0);}abstract long reserveEarliestAvailable(int permits, long nowMicros);
}

说明1:在执行等待的时候,是通过Uninterruptibles.sleepUninterruptibly实现的,它本质上是包装了Thread.sleep(),但是会:

  • 忽略中断(不抛出InterruptedException)
  • 确保睡眠完整的时间
  • 最后恢复中断状态

说明2:当并发场景中,多个线程调用acquire方法时,阻塞会发生在两个阶段:

  1. 同步阶段:在synchronized内计算等待时间、更新桶内令牌数和nextFreeTicketMicros(非常短暂)
  2. 等待阶段:通过sleep让请求的线程等待

这样设计好处是:实际等待在同步块外进行,避免持有锁时睡眠。同步块只用于计算等待时间,不用于实际等待,同步块内确保了令牌计算和状态更新的原子性,防止并发问题

 

http://www.lqws.cn/news/589537.html

相关文章:

  • SpringBoot -- 自动配置原理
  • 基于Python的GIS-RS多源数据处理(TIF/SHP/NC/...)【20250630】
  • P1967 [NOIP 2013 提高组] 货车运输
  • Spring生态:云原生与AI的革新突破
  • C++ 快速回顾(五)
  • 编程新手之环境搭建:node python
  • Excel转pdf实现动态数据绑定
  • 「Java案例」计算矩形面积
  • Linux随记(十九)
  • python+uniapp基于微信小程序的食堂菜品查询系统
  • [springboot系列] 探秘JUnit 5: Java单元测试利器
  • Spring 依赖注入:官方推荐方式及最佳实践
  • hono+postgresql+CURD
  • YOLOv13:最新的YOLO目标检测算法
  • Windows11系统中安装docker并配置docker镜像到pycharm中
  • 文旅数字孪生交付生态链:集成-交付-运维“三位一体”,100+案例助力行业数字化转型
  • 腾讯云空间,高性能显卡云,安装xinference报错,pip install 空间不够用了
  • WOLA(Weighted Overlap-Add)方法详解
  • 实战避坑:MyBatis中${}拼接如何优雅又安全?
  • Python 数据分析与机器学习入门 (二):NumPy 核心教程,玩转多维数组
  • Redis 集群
  • SQLite 安装使用教程
  • 长短期记忆网络(LSTM):让神经网络拥有 “持久记忆力” 的神奇魔法
  • 反射,枚举和lambda表达式
  • Bessel位势方程求解步骤
  • 国产化替换中政务行业通用的解决方案是什么?需要注意的事项有哪些?
  • 链表题解——移除链表元素【LeetCode】
  • 基于DSP的边缘检测与图像锐化算法研究与实现
  • ACE之ACE_NonBlocking_Connect_Handler问题分析
  • Vue防抖节流