Guava限流器

概述

Guava 是由 Google 开源的一个 Java 工具库,其中包含了很多实用工具类。RateLimiter 是 Guava 提供的一个限流器,用于限制代码的执行速率。它基于令牌桶算法实现,适用于控制请求速率、限制并发等场景

RateLimiter 提供了两种限流模式:

  1. 平滑突发限流(SmoothBursty):适用于允许短时间内的突发请求,然后平滑地限制速率
  2. 平滑预热限流(SmoothWarmingUp):适用于系统需要一段时间预热后达到稳定速率的场景

具体实现

引入依赖

在 Maven 的 pom.xml 文件中引入 Guava 依赖:

1
2
3
4
5
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>30.1-jre</version>
</dependency>

配置限流器

在 Spring Boot 应用中,可以使用 RateLimiter 来配置限流器。例如,在一个服务类中定义限流器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import com.google.common.util.concurrent.RateLimiter;
import org.springframework.stereotype.Service;

@Service
public class MyService {

// 每秒发放2个令牌
private final RateLimiter rateLimiter = RateLimiter.create(2.0);

public void performOperation() {
// 获取令牌,阻塞直到获取到令牌
rateLimiter.acquire();

// 执行被限流的操作
System.out.println("Operation performed at " + System.currentTimeMillis());
}
}

在控制器中使用

在 Spring Boot 控制器中调用限流服务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class MyController {

@Autowired
private MyService myService;

@GetMapping("/limitedOperation")
public String limitedOperation() {
myService.performOperation();
return "Operation performed";
}
}

通过上述配置,当 /limitedOperation 接口被频繁访问时,每秒最多只会执行两次 performOperation 方法。

限流器的核心方法:

  • acquire() :获取一个令牌, 改方法会阻塞直到获取到这一个令牌, 返回值为获取到这个令牌花费的时间
  • acquire(int permits) :获取指定数量的令牌, 该方法也会阻塞, 返回值为获取到这 N 个令牌花费的时间
  • tryAcquire() :判断时候能获取到令牌, 如果不能获取立即返回 false
  • tryAcquire(int permits): 获取指定数量的令牌, 如果不能获取立即返回 false
  • tryAcquire(long timeout, TimeUnit unit): 判断能否在指定时间内获取到令牌, 如果不能获取立即返回 false
  • tryAcquire(int permits, long timeout, TimeUnit unit) :同上

AOP方式简化

  1. 加入AOP依赖

    1
    2
    3
    4
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-aop</artifactId>
    </dependency>
  2. 自定义限流注解

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.METHOD})
    @Documented
    public @interface Limit {
    /**
    * 资源的key,唯一
    * 作用:不同的接口,不同的流量控制
    */
    String key() default "";

    /**
    * 最多的访问限制次数
    */
    double permitsPerSecond () ;

    /**
    * 获取令牌最大等待时间
    */
    long timeout();

    /**
    * 获取令牌最大等待时间,单位(例:分钟/秒/毫秒) 默认:毫秒
    */
    TimeUnit timeunit() default TimeUnit.MILLISECONDS;

    /**
    * 得不到令牌的提示语
    */
    String msg() default "系统繁忙,请稍后再试.";
    }
  3. 使用AOP切面拦截限流注解

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    @Slf4j
    @Aspect
    @Component
    public class LimitAop {
    /**
    * 不同的接口,不同的流量控制
    * map的key为 Limiter.key
    */
    private final Map<String, RateLimiter> limitMap = Maps.newConcurrentMap();

    @Around("@annotation(com.wht.guavalimit.Limit)")
    public Object around(ProceedingJoinPoint joinPoint) throws Throwable{
    MethodSignature signature = (MethodSignature) joinPoint.getSignature();
    Method method = signature.getMethod();
    //拿limit的注解
    Limit limit = method.getAnnotation(Limit.class);
    if (limit != null) {
    //key作用:不同的接口,不同的流量控制
    String key=limit.key();
    RateLimiter rateLimiter = null;
    //验证缓存是否有命中key
    if (!limitMap.containsKey(key)) {
    // 创建令牌桶
    rateLimiter = RateLimiter.create(limit.permitsPerSecond());
    limitMap.put(key, rateLimiter);
    log.info("新建了令牌桶={},容量={}",key,limit.permitsPerSecond());
    }
    rateLimiter = limitMap.get(key);
    // 拿令牌
    boolean acquire = rateLimiter.tryAcquire(limit.timeout(), limit.timeunit());
    // 拿不到命令,直接返回异常提示
    if (!acquire) {
    log.debug("令牌桶={},获取令牌失败",key);
    this.responseFail(limit.msg());
    return null;
    }
    }
    return joinPoint.proceed();
    }

    /**
    * 直接向前端抛出异常
    * @param msg 提示信息
    */
    private void responseFail(String msg) {
    HttpServletResponse response=((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getResponse();
    ResultData<Object> resultData = ResultData.fail(ReturnCode.LIMIT_ERROR.getCode(), msg);
    WebUtils.writeJson(response,resultData);
    }
    }
  4. 配置注解:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    @Slf4j
    @RestController
    @RequestMapping("/limit")
    public class LimitController {

    @GetMapping("/test2")
    @Limit(key = "limit2", permitsPerSecond = 1, timeout = 500, timeunit = TimeUnit.MILLISECONDS,msg = "当前排队人数较多,请稍后再试!")
    public String limit2() {
    log.info("令牌桶limit2获取令牌成功");
    return "ok";
    }


    @GetMapping("/test3")
    @Limit(key = "limit3", permitsPerSecond = 2, timeout = 500, timeunit = TimeUnit.MILLISECONDS,msg = "系统繁忙,请稍后再试!")
    public String limit3() {
    log.info("令牌桶limit3获取令牌成功");
    return "ok";
    }
    }

底层原理

Guava 的 RateLimiter 基于令牌桶算法实现。下面是对 RateLimiter 核心实现的简要分析:

  1. RateLimiter 类

    RateLimiter 是一个抽象类,其核心方法包括 acquire(), tryAcquire()setRate() 等。实际的限流实现由其子类 SmoothRateLimiter 完成。

  2. SmoothRateLimiter 类

    SmoothRateLimiter 通过令牌桶算法来控制令牌的发放速率。其关键字段包括:

    • storedPermits: 当前桶中存储的令牌数量。
    • maxPermits: 桶中允许存储的最大令牌数量。
    • nextFreeTicketMicros: 下一个令牌可用的时间戳(微秒)。
  3. 令牌发放逻辑

    SmoothRateLimiter 通过 resync 方法更新令牌桶的状态:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    // 并不是定时放令牌,而是在拿令牌的时候判断时间段内能放多少令牌
    void resync(long nowMicros) {
    if (nowMicros > nextFreeTicketMicros) {
    // 计算新的令牌数
    long newPermits = (nowMicros - nextFreeTicketMicros) / stableIntervalMicros;
    storedPermits = Math.min(maxPermits, storedPermits + newPermits);
    // 记录下一次能获取令牌的时间
    nextFreeTicketMicros = nowMicros;
    }
    }
  4. 获取令牌

    acquire 方法用于获取令牌并执行限流操作:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    public double acquire(int permits) {
    long microsToWait = reserve(permits);
    sleeper.sleepUninterruptibly(microsToWait, TimeUnit.MICROSECONDS);
    return 1.0 * microsToWait / TimeUnit.SECONDS.toMicros(1L);
    }

    private long reserve(int permits) {
    checkPermits(permits);
    long nowMicros = readSafeMicros();
    synchronized (mutex) {
    // 方法通过计算当前时间与下一个令牌可用时间的差值来确定需要等待的时间,从而实现限流
    long microsToWait = reserveAndGetWaitLength(permits, nowMicros);
    return microsToWait;
    }
    }

    final long reserveAndGetWaitLength(int permits, long nowMicros) {
    //将数据透传,拿到最早可预订的时间,如果预订时间在未来时间,返回一个大于0的值为等待时间
    long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
    return max(momentAvailable - nowMicros, 0);
    }


    final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
    //放令牌
    resync(nowMicros);
    //返回下次发放令牌时间,如果这个时间大于当前时间,在调用的上层会sleep
    long returnValue = nextFreeTicketMicros;
    //拿到此次花费的令牌
    double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
    // 如果令牌不够,这里就会大于0,下面就会得出一个等待时间,可以支持预支令牌,让其他获取等待
    double freshPermits = requiredPermits - storedPermitsToSpend;
    long waitMicros =
    storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
    + (long) (freshPermits * stableIntervalMicros);
    //将下次发放令牌的时间加上等待时间
    this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
    this.storedPermits -= storedPermitsToSpend;
    return returnValue;
    }