月度归档: 2023年2月

Redis结合AOP实现限流注解的开发

有点长。本来应该分3个部分分别是AOP、Redis配置、限流原理及实现来写的,写的时候没收住,一股脑全顺着写下来了

什么是 AOP

(1)面向切面编程(方面),利用 AOP 可以对业务逻辑的各个部分进行隔离,从而使得业务逻辑各部分之间的耦合度降低,提高程序的可重用性,同时提高了开发的效率。

(2)通俗描述:不通过修改源代码方式,在主干功能里面添加新功能

准备工作

详细的原理、底层实现的内容这里不多提,只谈实际使用。

添加maven依赖,具体version需要自己判断,我这边本地起的应用的SpringBoot2.4的

<dependency>
    <groupId>org.aspectj</groupId>
    <artifactId>aspectjrt</artifactId>
    <version>1.9.4</version>
</dependency>
<dependency>
    <groupId>org.aspectj</groupId>
    <artifactId>aspectjweaver</artifactId>
    <version>1.9.4</version>
</dependency>

创建一个我们需要使用的的注解

@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface ControlStrategy {
    String level() default "6";
    String threshold() default "";
    String strategy() default "";
    String key() default "";
    String switchKey() default "";
}

以及对应的切面类

@Component
@Aspect
public class ControlStrategyAspect {
}

简单说下这两部分

首先,注解部分上的元注解@Target({ElementType.METHOD, ElementType.TYPE}),表示当前注解可以作用于方法和接口或类上,@Retention(RetentionPolicy.RUNTIME)表示作用于运行时,@Inherited表示该注解对添加了该注解的类的子类同样生效

Continue reading

一次Mysql使用order by和limit分页查询的陈年旧坑

  • 起因背景

本次开发是在一个旧功能的增加新的逻辑,其中有这么一个消息重发的模块。在正常的Kafka消息推送过程中,因为异常、数据条件暂时无法确定是否能够下发的情况下,把这条消息暂时存到一张Mysql表中,并使用定时任务每隔一段时间扫一下这张表,把消息重新尝试下发Kafka。如果发送失败,则把这条消息的重试次数加1。如果发送成功,则把这条消息的从重试表中删除。

  • 问题现象

因为这是一个上线了几年的功能了,且这次开发的需求是在下发的时候进行的调整,所以一开始对这块并没有太多在意。直到昨天测试同学给我讲了他发现的一个现象,执行一次重试任务之后,有些数据的重试次数加了不止1次,但是有些数据的重试次数却没有增加。那么疑点就来了,根据测试提供的充实任务入口名称,我找到了这段已经在生产环境跑了好几年的代码,初步看了一遍基本定位到问题所在。

基本逻辑是这样的

  1. 根据分页参数分页从Mysql捞取待重试数据,需要查询已重试次数小于一个设定值的,比如5次
  2. 将待重试数据调用推送Kafka方法尝试进行推送,这之中包括一系列验证逻辑
  3. 推送成功的数据从表中清除
  4. 推送失败的数据在表中更新重试次数加1
Continue reading

ElasticSearch聚合查询使用过程中遇到的一点问题

再续一下前面两篇相关的内容,ElasticSearch 分页查询方法from&size,scroll,及search_after浅析es聚合导出(不是),Map#merge方法使用

在开发那个聚合导出功能的时候,最先考虑到的其实不是scroll扫数据并内存计算,之前了解过ElasticSeach是可以支持聚合计算查询的,但是没有实际在大批量数据的索引中使用过,所以再开发功能的设计阶段进行了测试之后发现并不能满足我们功能开发的需求,遂放弃了。网上使用aggs进行ElasticSearch聚合查询的介绍文章也很多,于是写下本文做一点记录,和给别人提供一点掉坑提醒

官方文档地址:https://www.elastic.co/guide/cn/elasticsearch/guide/current/aggregations.html

详尽的使用方式你也可以再这里面找到

下面是坑记录

  • 字段类型选择

因为开发不规范、对ElasticSearch不了解等原因,原本功能开发的时候建的ES索引中对应订单价格、运费等字段的字段类型全部都是keyword类型的。而这次要做聚合统计的话,要对价格字段进行聚合统计执行类似如下语句的时候

GET /your_es_index/your_es_type/_search
{
  "size":0,
  "aggs":{
    "sum_claims":{
      "sum":{
        "field":"cost_of_claims"
      }
    }
  }
}

会得到一个这样的错误信息

Expected numeric type on field [cost_of_claims], but got [keyword]"}

基本意思就是cost_of_claims字段类型为keyword,但是进行聚合统计运算的时候期望cost_of_claims字段为numeric型的。也就是说明ES的聚合统计运算只能作用于numeric型字段上。那么遇到问题解决问题,接下来就是想要尝试将cost_of_claims变更为numeric型字段这件事上。

在进行avg和sum等Metrics Aggregations(度量聚合)的时候报错的,进行数学维度的计算,期望得到一个数字字段,但是得到的是一个String的keyword类型,若把cost_of_claims.keyword换成cost_of_claims,则又会报另一个错误。则需要在index数据之前进行mapping配置,将该字段的值定义为numeric,或者开启mapping的Numeric detection(数字探测)

但是这时候遇到个问题,在es中,是不支持更改现有字段的映射或字段类型的,如果我们非得需要更改字段的类型,怎么办,数据迁移,重建索引,建立我们想要的正确的映射规则。接下来新建一个相同结构的索引,对应字段修改为numeric的类型,然后把老索引的数据reindex过来。同时,reindex之后把老索引删除,并给新索引添加一个别名和老索引的名字一样,这样就可以完整切换过来,且不用修改现有业务代码中和老索引相关的内容。

Continue reading

es聚合导出(不是),Map#merge方法使用

水一篇,之前写了es分页查询的几种方法,ElasticSearch 分页查询方法from&size,scroll,及search_after浅析,起因是1月份的需求要做一个ES索引内数据聚合导出的需求。

这个ES索引每天有100万左右新增数据,而需求是每月根据不同的条件做聚合统计,并导出数据到csv文件。之前的同事做过一个类似的功能,但是性能很差,基本思路如下,可以简单分析下。

  1. java的job接收导出查询的参数,创建导出执行任务,启动两个线程,一个es查询线程,一个写csv线程
  2. java调用EsClient,进行search_after查询,每一页查询的结果保存到java内存中,并判断累计的结果数量,大于一个设定的数值之后则根据导出时候设定的参数进行聚合,保存到一个Map中
  3. 分页查询结束后,写csv线程读取聚合后的结果写入csv文件

首先,启动两个线程,一个读,一个等待读之后写csv这个其实是不合理的,在所有数据聚合完成之前,写线程就一直处于等待状态。但是这个模式我改动不了,这是目前项目代码里集成的导出任务jar包里的内容,我没法动它,其他很多导出功能也用的这一套代码,所以这边就只能先这样。(也不是说jar包里的逻辑我在外面就没法修改)

接下来,现有的功能是用的search_after查询的,排序字段是子订单id,是无序的。所以只能支持单线程操作,即只能用一个读线程(ReadThread),一页一页的往后翻页查询。且原来的逻辑中有点bug,大于一个设定的数值之后则根据导出时候设定的参数进行聚合。假定这个数值为5万,那么当不断地翻页某次聚合之后总结果数已经大于5万了,那么之后每次进行翻页查询都要把所有数据重新聚合一遍。最终实测,某次导出了300万数据进行聚合,导出任务总共执行超过了6个小时。

根据以上情况,我重新设计了这次的导出功能。

Continue reading