有点长。本来应该分3个部分分别是AOP、Redis配置、限流原理及实现来写的,写的时候没收住,一股脑全顺着写下来了

什么是 AOP

(1)面向切面编程(方面),利用 AOP 可以对业务逻辑的各个部分进行隔离,从而使得业务逻辑各部分之间的耦合度降低,提高程序的可重用性,同时提高了开发的效率。

(2)通俗描述:不通过修改源代码方式,在主干功能里面添加新功能

准备工作

详细的原理、底层实现的内容这里不多提,只谈实际使用。

添加maven依赖,具体version需要自己判断,我这边本地起的应用的SpringBoot2.4的

<dependency>
    <groupId>org.aspectj</groupId>
    <artifactId>aspectjrt</artifactId>
    <version>1.9.4</version>
</dependency>
<dependency>
    <groupId>org.aspectj</groupId>
    <artifactId>aspectjweaver</artifactId>
    <version>1.9.4</version>
</dependency>

创建一个我们需要使用的的注解

@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface ControlStrategy {
    String level() default "6";
    String threshold() default "";
    String strategy() default "";
    String key() default "";
    String switchKey() default "";
}

以及对应的切面类

@Component
@Aspect
public class ControlStrategyAspect {
}

简单说下这两部分

首先,注解部分上的元注解@Target({ElementType.METHOD, ElementType.TYPE}),表示当前注解可以作用于方法和接口或类上,@Retention(RetentionPolicy.RUNTIME)表示作用于运行时,@Inherited表示该注解对添加了该注解的类的子类同样生效

https://www.liaoxuefeng.com/wiki/1252599548343744/1265102803921888

ControlStrategyAspect类上添加了@Aspect注解,接下来在这里面完成相关逻辑。

切面类的实现

首先是切面类里的各式注解

@Pointcut(value = "execution(* com.cheungq.demo.controller.*.*(..))")
public void pointCut(){}

@Before("pointCut()")
@Order(1)
public void before(JoinPoint point){
    System.out.println("----before 1----");
}

@Before("pointCut()")
@Order(2)
public void beforeOrder2(JoinPoint point){
    System.out.println("----before 2----");
}

@After(value = "pointCut()")
public void after(){
    System.out.println("----after----");
}

@AfterReturning(value = "pointCut()")
public void afterReturning(){
    System.out.println("----afterReturning----");
}

@AfterThrowing(value = "pointCut()")
public void afterThrowing(){
    System.out.println("----afterThrowing----");
}

@SneakyThrows
@Around(value = "pointCut()")
public String around(ProceedingJoinPoint point){
    System.out.println("----around before proceed----");
    String str = (String) point.proceed();
    System.out.println("----around after proceed----");
    return str;
}

@Pointcut 切入点声明注解,以及所有的通知注解都可以通过 value 属性或者 pointcut 属性指定切入点表达式。切入点表达式通过 execution 函数匹配连接点,语法:

execution([方法修饰符] 返回类型 包名.类名.方法名(参数类型) [异常类型])
  • 访问修饰符可以省略;
  • 返回值类型、包名、类名、方法名可以使用星号*代表任意;
  • 包名与类名之间一个点.代表当前包下的类,两个点..表示当前包及其子包下的类;
  • 参数列表可以使用两个点..表示任意个数,任意类型的参数列表;

切入点表达式的写法比较灵活,比如:* 号表示任意一个,.. 表示任意多个,还可以使用 &&、||、! 进行逻辑运算,不过实际开发中通常用不到那么多花里胡哨的,掌握以下几种就基本够用了。

比如上面的@Pointcut(value = "execution(* com.cheungq.demo.controller.*.*(..))")表示com.cheungq.demo.controller包下的任意类中的任意方法,参数类型不限。具体的匹配规则可以自行另外搜索了解,包括@annotation()、@within()、@target()、@args()等多种方式。如果你的controller目录下内容比较混乱,不全都是Controller类,那么可以使用execution(* com.cheungq.demo.controller.*Controller.*(..))这样的方式来匹配,即任意以Controller字符为结尾的类

不过为了配合前面创建的注解这边需要修改下,使用注解类型的切入点定义

@Pointcut(value = "@annotation(com.cheungq.demo.annotation.ControlStrategy)")

当然如果两个文件在同一个层级下,也可以直接写

@Pointcut(value = "@annotation(ControlStrategy)")

当然也可以不定义Pointcut,而是直接写上对应的匹配规则,比如这样

@Before("@annotation(ControlStrategy)")

另外关于多个重复定义的问题,在上面的代码中我加入了@Order注解,我们找个方法添加上注解,并在方法中打印文本“ControllerFunction”,并执行下,可以看到如下输出内容

----around before proceed----
----before 1----
----before 2----
----ControllerFunction-----
----afterReturning----
----after----
----around after proceed----

如果在调用方法中抛出异常了,则

----around before proceed----
----before 1----
----before 2----
----ControllerFunction-----
----afterThrowing----
----after----

根据输出的内容,我们也就能够了解各自方法的执行顺序了

另外也可以换个方式来写,如下

@Before("@annotation(annotation)")
public void before(JoinPoint point, ControlStrategy annotation){
    System.out.println("----before----");
}

@annotation()中的值和 第二个注解参数的值相同,这样的好处是我们可以直接在其中方便的获取到注解内容。比如这样,我在对应方法上写了注解

@ControlStrategy(threshold = "threshold" , strategy = "strategy", key = "kkkey", switchKey = "switchKey")

此处打上断点之后可以可看到如下,获取到了在注解上定义的内容

当然,如果是原来那样的代码也是可以获取到注解内容的,相关代码

@Pointcut(value = "@annotation(ControlStrategy)")
public void pointCut(){}

@Before("pointCut()")
public void before(JoinPoint point){
    System.out.println("----before----");
    //获取切入的 Method
    MethodSignature joinPointObject = (MethodSignature) point.getSignature();
    Method method = joinPointObject.getMethod();
    boolean isControlStrategyAnnotation = method.isAnnotationPresent(ControlStrategy.class);
    ControlStrategy controlStrategy;
    if (isControlStrategyAnnotation) {
        controlStrategy = method.getAnnotation(ControlStrategy.class);
        System.out.println(controlStrategy.toString());
    } else {
        // 如果方法上没有注解,则搜索类上是否有注解
        controlStrategy = AnnotationUtils.findAnnotation(joinPointObject.getMethod().getDeclaringClass(), ControlStrategy.class);
        if (controlStrategy == null) {
            System.out.println("没有注解");
        }else{
            System.out.println(controlStrategy.toString());
        }
    }
}

基本逻辑就是,通过切入点找到当前对应的方法,再通过当前方法,查找对应的类上的注解内容,最终找到我们在注解中定义的内容,如图可以看到

因为这里是通过注解设置切入点的,所以这里必然能获取到注解内容。如果切入点是一开始所写的通过package的路径定义的

@Pointcut(value = "execution(* com.cheungq.demo.controller.*.*(..))")
public void pointCut(){}

那么就有可能会发生找不到注解的情况。

当然除了上面这种还有其他不少实现AOP的方式,比如实现org.aopalliance.intercept.MethodInterceptor接口,实现自己的invoke方法等,这里不另外再做展开介绍。

如何接入Redis

集群环境下,常用手段当然还是使用Redis来做限流的,那么下面第一步,配置Redis客户端。首先是maven包依赖,添加如下依赖,并选择自己合适的版本

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>3.3.0</version>
</dependency>
<dependency>
    <groupId>org.springframework.data</groupId>
    <artifactId>spring-data-redis</artifactId>
    <version>2.4.1</version>
</dependency>

配置redis服务服务相关参数,在application.yml中配置对应信息,如果你使用的是.properties配置文件,可以自行修改对应格式

spring:
  redis:
    host: 127.0.0.1
    port: 6379
    password:
    database: 0
    jedis:
      pool:
        max-idle: 0
        max-active: 32
        max-wait: 100
        min-idle: 4

添加RedisConfig配置,需要注意下StringRedisTemplate和RedisTemplate的区别,这里不多做说明,有兴趣可以另外自行查阅下

@Configuration
public class RedisConfig {

    @Bean
    public RedisConnectionFactory connectionFactory(@Value("${spring.redis.jedis.pool.max-idle}") Integer maxIdle,
                                                    @Value("${spring.redis.jedis.pool.max-active}") Integer maxActive,
                                                    @Value("${spring.redis.jedis.pool.max-wait}") Integer maxWait,
                                                    @Value("${spring.redis.jedis.pool.min-idle}") Integer minIdle,
                                                    @Value("${spring.redis.host}") String host,
                                                    @Value("${spring.redis.port}") Integer port,
                                                    @Value("${spring.redis.password}") String password,
                                                    @Value("${spring.redis.database}") Integer database) {
        RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration();
        redisStandaloneConfiguration.setHostName(host);
        redisStandaloneConfiguration.setPort(port);
        redisStandaloneConfiguration.setDatabase(database);
        redisStandaloneConfiguration.setPassword(RedisPassword.of(password));
        // 连接池配置
        JedisPoolConfig poolConfig = new JedisPoolConfig();
        poolConfig.setMaxIdle(maxIdle);
        poolConfig.setMaxTotal(maxActive);
        poolConfig.setMaxWaitMillis(maxWait);
        poolConfig.setMinIdle(minIdle);

        JedisClientConfiguration.JedisPoolingClientConfigurationBuilder jedisClientConfigurationBuilder = (JedisClientConfiguration.JedisPoolingClientConfigurationBuilder) JedisClientConfiguration.builder();
        jedisClientConfigurationBuilder.poolConfig(poolConfig);
        JedisClientConfiguration jedisClientConfiguration = jedisClientConfigurationBuilder.build();
        return new JedisConnectionFactory(redisStandaloneConfiguration, jedisClientConfiguration);
    }

    @Bean
    public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) {
        StringRedisTemplate stringRedisTemplate = new StringRedisTemplate();
        stringRedisTemplate.setConnectionFactory(redisConnectionFactory);
        return stringRedisTemplate;
    }
}

接下来,我们在controller中添加对应代码

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

以及在方法中调用

stringRedisTemplate.opsForValue().set("javaRedis","Hello!~~");

测试下,调用方法后,可以看到Redis中成功写入了当前信息,说明Redis客户端已经配置完成了。

基础准备已经完全就绪,接下来就是限流的相关实现了。

限流实现原理

限流的基本原理大家都知道的,基本的分为漏桶令牌桶两种方式。基本原理这边简单说两句,不做过多的深入说明。漏桶即预先设置一个固定容量的池子或容器,每次接到新的请求就往里加1,加满为止,如果容器中已满,则拒绝掉当前这次的网络请求,大多情况下需要配合队列类型的数据结构使用。令牌桶则相反,服务器以固定的速率生成令牌,不能是当前令牌总数超出预设数量值,每当有请求过来的时候则取出一个令牌,如果没能取到令牌则拒绝掉当前请求。

  • 令牌桶简单实现

我们以每秒为一个桶,根据当前秒获得一个redis的key,每当来一个请求之后,往桶中数量加1,如果结果数量超过了上限,则拒绝当前请求。

看代码,在原来的切面类中新增一个方法,作为漏桶的简单实现。

private boolean simpleTokenBucket(String key){
    //当前秒数
    long current = System.currentTimeMillis()/1000;
    key += ":"+current;
    if (!stringRedisTemplate.opsForValue().setIfAbsent(key,"1",60, TimeUnit.SECONDS)){
        //初始化key失败,说明key已经存在,则需要用+1来判断
        return stringRedisTemplate.opsForValue().increment(key,1) <= LIMIT_REQUEST_COUNT;
    }
    return true;
}

setIfAbsent方法可以保证如果当前key不存在则初始设置为1,同时给这个key设置超时时间60秒,这里是为了回头查看redis内容方便,时间才设置为了60秒,实际上的操作中可以设置为1-2秒超时即可。简单看下源码,通过打断点跟踪,我们可以看到setIfAbsent方法最终向Redis发送了如下命令

SET SimpleController:queryByPage:1678246024 1 nx ex 60

从 Redis 2.6.12 版本开始, SET 命令的行为可以通过一系列参数来修改

EX second :设置键的过期时间为 second 秒。 SET key value EX second 效果等同于 SETEX key second value 。
PX millisecond :设置键的过期时间为 millisecond 毫秒。 SET key value PX millisecond 效果等同于 PSETEX key millisecond value 。
NX :只在键不存在时,才对键进行设置操作。 SET key value NX 效果等同于 SETNX key value 。
XX :只在键已经存在时,才对键进行设置操作。

所以当想要执行将redis的值初始化为1的时候,如果失败,则说明已经有其他线程初始化过了,则需要再进行一次increment操作。为什么一开始要调用setIfAbsent初始化,因为每一秒都会生成一个新的RedisKey,时间长了之后会产生大量的Key需要及时清理,如果能让这个Key自动Expire就是最好的了,但是计数加1的方法increment并不能自带Expire参数,所以第一个数用setIfAbsent方法来初始化创建,同时带上Expire参数让其自动超时。

对于Key的部分,这里我们通过以下方法来构造Redis上使用的key。仅作为展示,实际开发中因为同名或者其他诸多原因,生成的Key会有同名的情况,后面我们在说不同方法使用同Key的时候会有相关内容提到。

String[] path = method.getDeclaringClass().getName().split("\\.");
String key = path[path.length-1]+":"+method.getName();

那么在执行中,我们可以拿到Key字符串为“SimpleController:queryByPage”,之后再在@Before注解方法中添加判断,如果调用了simpleLeakyBucket之后返回true,则继续执行,如果返回false则抛出异常不去执行Controller中的业务代码。

执行JMeter,跑个并发试试效果,设置200个线程并发,循环次数永远,结果过滤掉失败的,可以看到吞吐量在100个出头每秒,即每秒只有100个请求得到的正常响应处理。因为我们在上面的漏桶算法的方法中给LIMIT_REQUEST_COUNT这个上限值的赋值是100。

而在Redis中我们可以看到如下结果,可以看到当前的这一秒内接收到了1000个请求

虽然不重要,不过我们可以再稍微做一点点小小的调整优化,先查询一遍Redis内的值再做处理,顺便改下LIMIT_REQUEST_COUNT为150再验证下。执行一下JMeter看下结果,redis查询结果注意null值判断。

private boolean simpleTokenBucket(String key){
    //当前秒数
    long current = System.currentTimeMillis()/1000;
    key += ":"+current;
    String cnt = stringRedisTemplate.opsForValue().get(key);
    if (null != cnt && Long.parseLong(cnt) >= LIMIT_REQUEST_COUNT){
        return false;
    }
    if (!stringRedisTemplate.opsForValue().setIfAbsent(key,"1",60, TimeUnit.SECONDS)){
        //初始化key失败,说明key已经存在,则需要用+1来判断
        return stringRedisTemplate.opsForValue().increment(key,1) <= LIMIT_REQUEST_COUNT;
    }
    return true;
}

可以看到redis中的值就少了很多无意义的increase操作的结果了

而JMeter上测出来的并发结果也是在149/sec,说明限流150个每秒确实生效了

这里的把这个方案归为令牌桶是因为我们把每一个key上LIMIT_REQUEST_COUNT个请求限制当做是生成令牌的操作,如果不依赖多个key的生成只使用一个key的话,则需要另外有一个线程每隔一段时间将这个key的值设置为LIMIT_REQUEST_COUNT,同时每个请求过来对当前key执行decrease操作。这就需要额外的一个线程来定时发令牌,且另外一个问题,我们不方便扩展key,在这里的案例中,我们的key是根据类名、方法名动态生成的,如果需要额外线程定时来发令牌,则这个线程需要先感知的当前key的存在,实属有点点麻烦。不过这个方案还是有问题的

  1. 实际的限流要求情况会复杂很多,也不可能都是按照秒级限流,也可能是毫秒或者奇葩的非整数的情况,比如限制5秒内17个请求这种的。面对这种问题可以对时间取模,又或者把key的精度从秒再进一步到毫秒。这一个问题还不算难解决
  2. 再一个问题,这里的时间区间仍然是我们人为的划分的。这句话怎么理解,我来举个例子,假设我们限流要求是5秒内限制10个请求,按照当前的方案来说是会发生这样的情况的,即在0-3秒的时候,没有请求过来,第4秒的时候也就是第一个key的最后一秒之内来了10个请求,都能成功处理。之后第5秒的时候,也就是第二个key的时间区间内的第一秒同时又来了10个请求,但是因为此时落在了第二个区间上了,这10个请求依旧能够成功响应。而此时实际的情况是在第4到第5这两秒钟内一共处理了20个请求,并不符合一开始要求5秒内限制10个请求的要求。从全局整体上来看平均下来每一秒确实没有超过150个请求的上限,但是放到细节中,取时间段上的任意一个1秒钟时长的时间范围内,就不能保证不超过150个了。
  3. 还有一个问题,在上面的Redis的操作代码虽然每一个操作都是原子的,但是每一个指令之间依旧是相互独立的。也就是说虽然我第一次调用Redis查询当前数量结果是null,或者当前结果小于LIMIT_REQUEST_COUNT,但是当执行后面步骤setIfAbsent的时候这个key其实可能已经有值了,执行increment的时候key的值可能已经大于LIMIT_REQUEST_COUNT了。这也是为什么Redis对应key上的数量不等于LIMIT_REQUEST_COUNT的原因。这个问题可以通过Lua脚本来解决,将多个redis命令写成lua脚本让redis执行,整个脚本在redis内执行将是原子性的,不会被其他的redis指令中途插入执行。后面也分一段讲下,先往下看。
  • 漏桶实现

带着上面几个问题,我们再来看一段漏桶限流的实现,同样的我们在切面类中新增一个方法,在这个方法中使用到了Redis 的有序集合(sorted set)的数据结构

private boolean leakyBucket(String key){
    long current = System.currentTimeMillis();
    long before = current - (LIMIT_TIME_RANGE * 1000);
    ZSetOperations<String, String> operations = stringRedisTemplate.opsForZSet();
    Long cnt = operations.count(key,before,current);
    if (null != cnt && cnt >= LIMIT_REQUEST_COUNT){
        return false;
    }
    return operations.add(key, UUID.randomUUID().toString(),current);
}

有序集合(sorted set)是string类型元素的集合,且不允许重复的成员。不同的是每个元素都会关联一个double类型的分数。redis正是通过分数来为集合中的成员进行从小到大的排序。有序集合的成员是唯一的,但分数(score)却可以重复。而在有序集合中是根据分数从小到大排列的。

根据这一特性,我们给接收到的每一个请求分配一个UUID,并用当前的时间作为他的分数(score),于是我们就可以将所有的请求按照时间顺序在集合中排列。并调用redis的Zcount 指令(https://www.redis.net.cn/order/3611.html)根据时间区间参数得到指定时间区间内的请求数量集合。如果这个时间区间内的请求数量已经超过上限了,则拒绝请求,否则向集合中插入一条UUID和对应时间的作为score的记录。最终我们跑下JMeter,看下测试结果,并发可以限制在150左右

同时Redis的key上写入了3630条数据,JMeter的200个线程,每个线程100次请求一共是发出了20000个请求

然后就是说说这个方案的问题

  1. 这里的时间是通过System.currentTimeMillis()方法从JVM获取的,这依赖于本地机器的时间,集群环境下多个机器之间的时间可能有偏差,这可能会导致时间较快的机器能接收到的请求会比时间晚的机器多,甚至时间晚的机器根本没有机会有请求可以通过
  2. 即时我们假定问题1的缺点可以有办法解决,可以做到所有服务的时间都是完全同步的,但是从应用服务发送请求到Redis这之间的并发依然会有时间先后顺序,仍然可能导致早获得时间的应用服务的请求比晚获得时间的应用服务更晚连上Redis向漏桶添加数据。如果是单实例情况下,可以给当前漏桶方法上添加synchronized锁来解决这个问题。
  3. count调用获得了时间区间内的请求之后,如果判定没有超过限流上限,在调用add往集合写入数据之前的这段时间之内仍然会有其他线程发来请求往集合写入数据。此时有可能对应时间区间内的请求数量已经超过上限了。
  4. 对应集合Key中的数据随着时间的积累,数据量会越来越大,需要另外有线程来即时清理过早的数据。清理的线程可以尝试清理当前时间往前减5秒或者10秒之前的数据。这个时间间隔不能过短也不宜过长,假设清理前1秒之前的数据,有可能流量请求过来耗费时间过长,在count的时候发现对应时间区间内的请求记录已经被删除了。但是如果过长会导致Key中大量数据积压最终降低性能。

Lua脚本(一步一坑)

那么为了再解决上面遇到的一些问题,我们需要再借助Lua脚本来实现部分逻辑。Redis使用单个Lua解释器去运行所有脚本,并且Redis也保证脚本会以原子性(atomic)的方式执行。当某个脚本正在运行的时候,不会有其他脚本或 Redis命令被执行。这样我们把上面用的逻辑用Lua脚本来执行的时候,因为脚本本身是有原子性的,则可以解决掉之前遇到的大部分问题。

首先尝试写一个简单的Lua脚本测试运行下。在原来的Controller方法中添加如下代码片段

String luaScriptStr = "local c" +
        "\nc = redis.call('set',KEYS[1],ARGV[1])"+
        "\nreturn c;";
RedisScript<Number> script = new DefaultRedisScript<>(luaScriptStr);
List<String> keys = new ArrayList<>();
keys.add("luaTestKey");
stringRedisTemplate.execute(script,keys,"Hello Lua Redis,I'm CheungQ");

执行之后运行查看效果,确认Lua脚本正确执行了那么就开始用Lua脚本重写上面的逻辑。

现学现卖,之前也没太详细研究过Lua脚本写法

另外可以单独配置Lua脚本文件通过,DefaultRedisScript.setScriptSource方法指定脚本文件路径的方式引入,这样不用把Lua代码混杂在Java代码之中,看起来也更加整洁,需要注意的是execute方法的args参数默认应当使用String型的,即时是数字参数也应当转成String之后传入,否则会报出threw exception [Request processing failed; nested exception is java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String] with root cause的异常

DefaultRedisScript<Long> script = new DefaultRedisScript<>();
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("lua/leaky_bucket.lua")));
script.setResultType(Long.class);
List<String> keys = new ArrayList<>();
keys.add("ZSET_KEY");
Long number = stringRedisTemplate.execute(script,keys,"1000000","10",UUID.randomUUID().toString());

按照这样的思路,我们将上面的漏桶算法的逻辑用Lua脚本重写写一遍,如下

local t = redis.call('TIME')
local current = t[1] * 1000000 + t[2]
local cnt = redis.call('ZCOUNT', KEYS[1], current - tonumber(ARGV[1]), current)
if cnt >= tonumber(ARGV[2]) then
    return false
else
    return redis.call('ZADD', KEYS[1], ARGV[3], current)
end

redis.call(‘TIME’)获得的两个值,第一个字符串是当前时间(以 UNIX 时间戳格式表示),而第二个字符串是当前这一秒钟已经逝去的微秒数。所以整体转换成微秒需要将秒数乘1_000_000转换一下。一切看起来都很完美,启动执行,报异常

Write commands not allowed after non deterministic commands

翻译过来就是说, 写命令不被允许出现在非确定性命令的后面。这里又涉及到Redis命令的另一个知识点。 Redis的不同命令拥有不同的属性,如是否是只读命令,是否是管理员命令等,一个命令可以拥有多个属性,可以参看下https://blog.csdn.net/wtyvhreal/article/details/43193591这篇文章。直接原因就是当一个脚本执行了拥有REDIS_CMD_RANDOM属性的命令后,就不能执行拥有REDIS_CMD_WRITE属性的命令了。而在这里我获取时间调用的redis.call('TIME')就是其中之一。

而有这个规则的原因是在主从架构架构中,或者选用了AOF持久化方的情况下,当我们执行一条脚本的时候默认情况下, 整个Lua脚本的内容会进行复制。就比如我们这里写的例子, 我们的脚本包含TIME命令,获取系统时间, 这个命令在不同时间执行之后返回的值肯定不一样, 如果在主从复制和AOF追加时,直接复制整个脚本内容, 执行时没办法保证数据的一致性的。当然Redis的设计者也考虑到了这个问题,提供了redis.replicate_commands()这条指令,将这条指令添加到脚本之前,则可以切换到命令复制模式。在脚本第一次行执行这个函数,redis 会将修改数据的命令收集起来,然后用 MULTI/EXEC包裹起来,这种方式称为script effects replication,这个类似于 mysql 中的基于行的复制模式,将非纯函数的值计算出来,用来持久化和主从复制。

正常来说到这里应该就没啥问题了,但是,总是有但是的,添加了redis.replicate_commands()指令之后并没有解决问题,而是得到了一个新的异常信息

ERR Error running script (call to f_dda5d5cebde8b31f848df69c2a8fd4a01dd492f0):
@user_script:1: user_script:1: attempt to call field 'replicate_commands' (a nil value) ; nested exception is redis.clients.jedis.exceptions.JedisDataException: ERR Error running script (call to f_dda5d5cebde8b31f848df69c2a8fd4a01dd492f0): 
@user_script:1: user_script:1: attempt to call field 'replicate_commands' (a nil value) ] with root cause

很明显,我现在所使用的Redis版本并不支持这个功能(我司基于Redis2.x的某个版本自研开发的),这里只能退而求其次调用者传入时间戳来解决这个问题。

修改Lua脚本

local current = tonumber(ARGV[1])
local cnt = redis.call('ZCOUNT', KEYS[1], tonumber(ARGV[2]), current)
if cnt >= tonumber(ARGV[3]) then
    return false
else
    return redis.call('ZADD', KEYS[1], current, ARGV[4]) == 1
end

修改原来的漏桶方法

private boolean leakyBucket(String key) {
    DefaultRedisScript<Boolean> script = new DefaultRedisScript<>();
    script.setScriptSource(new ResourceScriptSource(new ClassPathResource("lua/leaky_bucket.lua")));
    script.setResultType(Boolean.class);
    List<String> keys = new ArrayList<>();
    keys.add(key);
    long current = System.currentTimeMillis();
    long before = current - (LIMIT_TIME_RANGE * 1000);
    return stringRedisTemplate.execute(script, keys,
            String.valueOf(current),
            String.valueOf(before),
            String.valueOf(LIMIT_REQUEST_COUNT),
            UUID.randomUUID().toString()
    );
}

执行JMeter测试,看下结果,最终稳定在170左右

再到redis的Key随便找一组时间区间数下,比如找到如下这么一条记录的时间,末尾是8001毫秒的,行号是22036

再继续往下翻,找比1秒内最后一条,也就是时间末尾是比9001小的数据中最大的那条,可以翻到下面这条数据,末尾数是8996,行号是22188

那么这一秒种之内总共通过的请求条数应当为22188-22036 + 1= 153,但是为什么还是会超出呢?对整个过程经过分析确认,依然还是因为带着不同微秒参数的redis请求到达redis的时间快慢不同,导致带着早些时间的请求比带着晚些时间的请求晚执行造成的。

简单举个例子,假设我限制了每5秒4个并发的请求,目前在第1、3、4、5、7秒上各有一个请求。

目前从任意一段上来看都是符合限流要求的,然而当此时来了一个新的请求,标记了自己的时间是第六秒的话就出现问题了。

当这个请求在Redis内进行处理的时候,不管是从第6秒往前看,还是往后看,都能保证5秒内最多有4个请求,但是如果不是从第6秒,而是从之前已经写进来的第7秒再往前看,此时最近5秒钟之内已经有了5个请求了。所以问题最终还是回到了,保证不同请求时间能够按照时间顺序有序到达Redis内处理校验,不解决这个问题的话只能是近似的保证限流效果

所以,我自己本地再部署一个3.2版本的Redis来测试验证下。还是使用上面之前写的Lua脚本的逻辑,在Lua脚本内获取当前时间,再次启动JMeter,200个线程,不限请求次数持续不断的请求了30000个请求之后,并发数的统计稳定在了149.x – 150.x之间

这就已经能精确的做到的任意时间区间内都能实现150个并发的限流效果了。同时对Lua脚本做了一点点修改

redis.replicate_commands()
local t = redis.call('TIME')
local current = t[1] * 1000000 + t[2]
local before = current - tonumber(ARGV[1])
redis.call('ZREMRANGEBYSCORE', KEYS[1], 0, before-1)
local cnt = redis.call('ZCOUNT', KEYS[1], before, current)
if cnt >= tonumber(ARGV[2]) then
    return false
else
    return redis.call('ZADD', KEYS[1], current, ARGV[3]) == 1
end

添加了ZREMRANGEBYSCORE语句,因为时间有序性得到了保证,超过当前时间之前的数据也已经完全没有保留的必要了。Redis中可以永远保证是最近的150个请求信息

再贴下Java部分

private static final long LIMIT_TIME_RANGE = 1;
private static final long LIMIT_REQUEST_COUNT = 150;

private boolean leakyBucket(String key) {
    DefaultRedisScript<Boolean> script = new DefaultRedisScript<>();
    script.setScriptSource(new ResourceScriptSource(new ClassPathResource("lua/leaky_bucket.lua")));
    script.setResultType(Boolean.class);
    List<String> keys = new ArrayList<>();
    keys.add(key);
    return stringRedisTemplate.execute(script, keys,
            String.valueOf(LIMIT_TIME_RANGE * 1000000),
            String.valueOf(LIMIT_REQUEST_COUNT),
            UUID.randomUUID().toString()
    );
}

使用

文章最后,我们再回到一开始写AOP的时候创建的注解类上看下,在这个类中我定义了几个属性

@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface ControlStrategy {
    String level() default "6";
    String threshold() default "";
    String strategy() default "";
    String key() default "";
    String switchKey() default "";
}

这样,在具体开发的时候可以根据在注解中填写的属性不同实现不同的具体控制的功能

  1. switchKey:我们可以在不同的使用注解的地方定义同一个或者多组switchKey的值,在切面类中根据当前调用方法注解上的switchKey到自己系统中的配置项或者字典管理模块之类的地方去获取判断当前接口是否需要限流,从而实现所有加了注解的接口限流开关操作,或者根据类别分组实现开关
  2. key:在注解中手动定义限流操作在Redis上使用的Key的值,这样可以实现根据业务或者系统模块区别多个方法接口使用同一个key,统一合并限流的效果。例如某商品列表、商品搜索、商品详情等相关接口统一使用一个key
  3. threshold:类似于我在上面代码中所写的LIMIT_REQUEST_COUNT值,用来控制请求量达到的上限。
  4. strategy&level:策略类型和接口等级。这边用来预留做一些定制特性的效果,同样的结合系统的配置项模块自定义一些效果。例如要在某个特定时间内,对某些特定level的接口进行限流,一般情况下可以默认不管即可。

文末补充:

需要注意的是,这里所说的限流是指限制一定时间内能够接入的请求的数量,这是不同于同一时间内在处理的请求的数量是两个不同的概念。如果说想要实现后者这种控制任意时刻在JVM中执行的线程的数量不能超过某个上限的话,在切面类中应当是这样写的

@SneakyThrows
@Around(value = "pointCut()")
public Object around(ProceedingJoinPoint joinPoint) {
    MethodSignature joinPointObject = (MethodSignature) joinPoint.getSignature();
    Method method = joinPointObject.getMethod();
    String[] path = method.getDeclaringClass().getName().split("\\.");
    String key = path[path.length - 1] + ":" + method.getName();
    if (stringRedisTemplate.opsForValue().increment(key) > LIMIT_REQUEST_COUNT){
        stringRedisTemplate.opsForValue().decrement(key);
        return "too many connections";
    }
    try {
        return joinPoint.proceed();
    }finally {
        stringRedisTemplate.opsForValue().decrement(key);
    }
}

先给对应key尝试加1,并根据返回的结果判断有多少正在执行的线程,如果超过上限了,则再次请求redis,减掉自己刚刚加的1。如果没有超过上限,则调用proceed()执行业务代码。执行完成后,无论是正常执行结束还是中途抛异常中断结束了,都应当在后面的finally的代码块中请求redis减去自己一开始加上的1。