Page 2 of 61

ElasticSearch数据迁移reindex命令

起因

一次需求中涉及两个索引,其中一个是本次新建的索引,需要将原始ES索引中的部分数据导入到当前新建的索引中。

分析

对于新所以的数据初始化有两个思路,因为我们的数据都是在HIVE中清洗完成之后再打到ElasticSearch的,所以可以从HIVE中捞取对应的数据重新打到ElasticSearch即可。但是这里有一个局限性的问题,因为HIVE表数量巨大,我们只保留的近7天的清洗结果,如果按照需求的时间跨度则需要重新清洗近3个月的数据,之后再将数据打到ES。

但是这样的话又会有另外一个问题,HIVE表数据在清洗过程中使用到的数据是会经常变动的,比如某个商品最近几天是挂靠在A品类下,但是上个月因为做活动是挂靠在B品类下的,这就导致事后隔了一段时间之后重新清洗的结果无法反应出当时这条数据正确的关联关系的情况。如果产品产品认可当前方案,那么也不失为可行方案之一。

那么,既然我们在原始的索引中有这部分数据,所以自然使用reindex命令来完成这一操作就最为方便了

代码

POST _reindex?wait_for_completion=false&slices=5&refresh
{
  "source": {
    "index": "SOURCE_ES_INDEX",
    "type": "SOURCE_ES_TYPE",
    "query": {
      "range": {
        "last_modify_time": {
          "gte": "2023-01-01 00:00:00",
          "lte": "2023-07-24 23:59:59"
        }
      }
    }
  },
  "dest": {
    "index": "TARGET_ES_INDEX",
    "type": "TARGET_ES_TYPE"
  }
}

简单说下这条语句,公司生产环境使用的ES版本为5.4.2的,所以除了index参数外,还需要有对应的type参数,但是Elasticsearch在 7.X版本中去除了type的概念,所以如果你的ES版本较新的话,可以不用这个type参数,即:

Continue reading

基于AOP的Redis缓存注解功能开发设计[3],缓存更新、删除、过期

前文

在完成了前面两篇文章(基于AOP的Redis缓存注解功能开发设计[2],JexlEngine自定义缓存Key)中的基础功能的开发之后,我们可以再进一步完成缓存更新、缓存删除、缓存过期这几个接口。从而适应在具体的业务开发过程中遇到的各种对缓存数据操作的需求。

因为前面对具体的Redis的缓存的操作和实现逻辑已经做了详细说明,所以在这三个功能注解的开发过程中只做简单的逻辑阐述,不在细究每一个方法的具体含义

缓存更新

缓存更新的意思就是,当我们在更新数据库中的某条数据之后,我们需要另外也把Redis中作为缓存的数据也同步时更新下,这里其实就会涉及到我们之前提到的缓存一致性问题,这个话题要讲的话可以展开来讲很长一段,我们不在这里做过多讨论,可以自行再去搜索一番。

我在这里需要提供一个新的注解@RedisPut,和PointCut来实现我们的功能即可,代码直接加到原来的切面类中。切面类中需要实现的逻辑就是根据方法执行的结果,直接更新Redis中的缓存数据,和之前@RedisCache的区别就是这里不需要去Redis中判断是否已经存在缓存数据。

Continue reading

基于AOP的Redis缓存注解功能开发设计[2],JexlEngine自定义缓存Key

那么在经历了前面两片文章《基于AOP的Redis缓存注解功能开发设计》和《JexlEngine表达式引擎的简单使用》之后,我们便可以将JexlEngine表达式引擎在AOP缓存中使用起来,通过表达式引擎的定义来自定义每个缓存的Key或者每组类型的缓存的Key,从而达到在不同的代码逻辑中增删改有相同组别或者类型关联的缓存数据的需求

如何指定生成Key规则

在之前的Redis缓存注解一文中,我们定义的注解中有个Key变量

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface RedisCache {
    String value();

    String key() default "";
}

原本我们在使用的时候,value()用来指定在Redis中的Key,而hashKey则用注解作用在的方法的所有参数组合之后的Array来担当。现在我们把注解中的key()用上,用来填写对应的hashKey的生成规则,填写的内容则是一个JexlEngine的表达式。下面,写个Demo。

@RedisCache(value = "dict_cache", key = "paramStr+'_'+paramInteger+'_'+chars[0]+'_'+paramInt+'_'+paramDto.getCodeParam()")
public Result<DictionaryInfo> cacheParamExample(String paramStr, Integer paramInteger, ParamDto paramDto, char[] chars, int paramInt, Integer integer, double v){
    DictionaryInfo dictionaryInfo = new DictionaryInfo();
    dictionaryInfo.setDictName("DIC");
    dictionaryInfo.setDictCode("CODE");
    return new Result<>(dictionaryInfo);
}

表达式写得有点夸张,不过主要是做个展示,这里也不会用到太复杂的JexlEngine表达式,其根本在于用几个给予的对象拼接一个字符串,不会涉及到JexlEngine提供的各种复杂的功能。这边更新下之前文章中切面类RedisCacheAspect的代码

Continue reading

ElasticSearch Nested数组嵌套对象的更新插入操作

在ES的日常使用中,需要使用到Nested结构存储数个同级的子节点数据,例如一条主订单下的N条子订单的数据。

新增更新操作

现在,假设我们在ES中有这样一条数据

PUT /celebrities/_doc/114
{
    "user" : "Kun",
    "post_date" : "2009-11-15T14:12:12",
    "message" : "Idol who has been practicing for two and a half years",
    "skills":[
        {
            "name":"sing",
            "skill_level":"A"
        },
        {
            "name":"jump",
            "skill_level":"S"
        },
        {
            "name":"rap",
            "skill_level":"SS"
        }
    ]
}

我们需要往skills的Nested数组中添加一个新的节点,节点的name为“consecutive five whips”,则可以这么写

POST /celebrities/_doc/114
{
    "script": {
        "source": "if (ctx._source.skills == null) {List ls = new ArrayList();ls.add(params.skill);ctx._source.skills = ls;} else {ctx._source.skills.add(params.skill);}",
        "lang": "painless",
        "params": {
            "skill": {
                "name": "consecutive five whips",
                "skill_level": "SSS"
            }
        }
    }
}

得到返回结果,表明执行成功

{
    "_index": "celebrities",
    "_id": "114",
    "_version": 6,
    "result": "updated",
    "_shards": {
        "total": 2,
        "successful": 1,
        "failed": 0
    },
    "_seq_no": 6,
    "_primary_term": 1
}

通过代码可以简单的看出逻辑,如果_source.skills为null,则创建一个新的ArrayList,并将参数中的skill节点的内容放进去,最后赋值给_source.skills,反之如果不为null的话,则直接往里加入当前的skill字段的内容。

if (ctx._source.skills == null) {
    List ls = new ArrayList();
    ls.add(params.skill);
    ctx._source.skills = ls;
} else {
    ctx._source.skills.add(params.skill);
}

要先判断是否为null,否则无法调用add方法,并会抛出一个异常

"caused_by": {
                "type": "null_pointer_exception",
                "reason": "cannot access method/field [add] from a null def reference"
}
Continue reading

ElasticSearch使用_delete_by_query删除大批数据,及409 Conflict版本冲突问题处理

起因

生产环境一个最近新上了一个清理ES历史数据的定时任务,执行_delete_by_query语句。在检查系统异常日志的时候,发现这块功能报了数个异常,elastic: Error 409 (Conflict)。于是带着这个这个问题展开探索一番

代码中使用的方法就是最基本DSL根据查询条件进行删除的语句

POST /ES_INDEX/_delete_by_query
{
  "query": {
    "term": {
      "data_date": "${dataDate}"
    }
  }
}

在执行_delete_by_query期间,为了删除匹配到的所有文档,多个搜索请求是按顺序执行的。每次找到一批文档时,将会执行相应的批处理请求来删除找到的全部文档。如果搜索或者批处理请求被拒绝,_delete_by_query根据默认策略对被拒绝的请求进行重试(最多10次)。达到最大重试次数后,会造成_delete_by_query请求中止,并且会在failures字段中响应 所有的故障。已经删除的仍会执行。换句话说,该过程没有回滚,只有中断。

原因

Elasticsearch 会在删除文档前使用查询时获取的 internal 版本号和当前文档版本号进行对比。如果在查询和删除操作之间文档发生更改,则会导致版本冲突并且删除操作失败,报 elastic: Error 409 (Conflict) 错误。

Continue reading

JexlEngine表达式引擎的简单使用

JEXL 是一个表达式语言引擎,可以在应用程序或框架中实现动态和脚本功能。先写一个简单的例子,最直观的感受下。

依赖

首先我们是引入依赖包,添加pom文件中的dependency

<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-jexl3</artifactId>
    <version>3.1</version>
</dependency>

基础用法

再新建一个测试用的对象类

public class TestJexlObj {
    private int numA;
    private int numB;

    //get.set方法省略
}

之后我们来写一份测试逻辑用的代码

@Test
public void test() {
    TestJexlObj testObj = new TestJexlObj();
    testObj.setNumA(12);
    testObj.setNumB(9);
    JexlEngine jexl = new Engine();
    JexlExpression expression = jexl.createExpression("'numA is '+testObj.getNumA()+' numB is '+testObj.getNumB()");
    JexlContext jc = new MapContext();
    jc.set("testObj", testObj);
    String str = String.valueOf(expression.evaluate(jc));
    System.out.println(str);
    JexlExpression expression2 = jexl.createExpression("'numA + numB = '+(testObj.getNumA()+testObj.getNumB())");
    String str2 = String.valueOf(expression2.evaluate(jc));
    System.out.println(str2);
}

执行后我们可以看到如下结果

以及一份多行构建java对象的表达式作为Script执行的示例代码

Continue reading

基于AOP的Redis缓存注解功能开发设计

在完成的前面的《Redis结合AOP实现限流注解的开发》之后,文中提到了一个配置管理或者字典管理的功能模块,作为一个对外提供快速查询获取配置信息的功能模块,其数据信息本身可以保存的Mysql数据库中。但如果每次查询都要从Mysql中获取数据则又非常影响接口效率和速度,故需要将对应信息在Redis中做一份缓存,帮助快速查询获取。

基础前置

需要的基础前置知识包括《Redis结合AOP实现限流注解的开发》AOP相关,《Java对象序列化常用操作-Protostuff》Java对象的序列化和反序列化,可以前往对应文章查看。

基础思路是这样的

首先我们需要定一个作用于方法上的注解比如@RedisCache(题外话,Spring自身也也有个类似功能的注解@Cacheable,回头可以再开一篇写下相关的使用方法,本文着重是自己开发一套这样的功能)。之后通过一个RedisCacheAspect类,使用@Around注解实现对添加了@RedisCache注解的方法的代理,并将方法的入参,执行结果序列化之后保存到Redis的一个Hash表中去。当下次请求过来的时候,先根据请求参数去Redis的Hash表中查询一下,如果存在,则将查到的信息反序列化为所要查询的对象直接返回给方法调用者。

代码实现

首先是注解类,我们需要新建一个这样的RedisCache注解类

/**
 * @author CheungQ
 */
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface RedisCache {
    String value();

    String key() default "";
}

之后,我们在自行实现一个切面类,用来定义切面,并实现切面上的功能。使用@Around注解,并在@RedisCache注解作用的方法前查询Redis中是否有缓存,如果有,则返回结果,如果没有则执行@RedisCache作用的方法,得到方法执行结果,并将结果缓存到Redis,待下次请求的时候查询获取。基本逻辑就是这样,具体的可以看下代码中的实现,这里用到了上面提到的前几篇文章中的相关技术细节。代码如下

Continue reading