ElasticSearch使用script更新文档失败问题排查

起因

起因是一个批量根据es文档id更新指定字段的功能,经过上线后使用反馈,经常性的偶发文档无法更新的情况

处理

找到相关代码,自行写个demo代码批量跑下试下,原来的代码大概示意如下

public String len10() throws Exception{
    Random random = new Random();
    String[] ls = new String[500];
    for (int i = 0; i < 500; i++) {
        int finalI = i;
        Runnable callable = new Runnable() {
            @Override
            public void run() {
                String str = String.valueOf(random.nextInt(40)+10);
                String str2 = String.valueOf(random.nextInt(40)+10);
                String str3 = String.valueOf(random.nextInt(40)+10);
                String action = "/XXXXXXX_index/XXXXXXX_type/_update_by_query";
                String dateTime = "20"+str+"-01-12 23:"+str2+":"+str3;
                String script = "{\n" +
                        "  \"script\": {\n" +
                        "    \"inline\": \"ctx._source.modify_time='"+dateTime+"'\"\n" +
                        "  },\n" +
                        "  \"query\": {\n" +
                        "    \"bool\": {\n" +
                        "      \"filter\": [{\n" +
                        "        \"term\": {\n" +
                        "          \"id\": \"2\"\n" +
                        "        }\n" +
                        "      }]\n" +
                        "    }\n" +
                        "  }\n" +
                        "}"
                        ;
                try {
                    String post = esRestClient.performRequest("POST", action, script);
                    System.out.println(post);
                    ls[finalI] = post;
                } catch (IOException e) {
                    e.printStackTrace();
                    ls[finalI] = e.getMessage();
                }
            }
        };
        Thread thread = new Thread( callable);
        thread.start();
    }
    Thread.sleep(50000);
    return "";
}

大意就是起500个线程,更新索引中指定文档id为2的文档的modify_time字段,通过script来更新。

执行之后其实就可以看到大量异常信息了

Continue reading

从一个最基础的生产者消费者模型开始构建离线下载服务

基础版生产者消费者模型

这里我们构建一个最基本的生产者消费者模型,多个Producer线程往队列中写入数据,一个Consumer线程从队列中按顺序取出数据。基本代码如下

public class Consumer extends Thread {

    private final AtomicInteger producerCnt;

    private ArrayBlockingQueue<String> queue;

    @Override
    public void run() {
        try {
            int cntVal = 0;
            while (producerCnt.intValue() != 0 || !queue.isEmpty()) {
                String pollVal = queue.poll(1, TimeUnit.SECONDS);
                System.out.println("poll a value: "+pollVal);
                cntVal++;
            }
            System.out.println("finished, total count: "+cntVal);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public Consumer(ArrayBlockingQueue<String> queue, AtomicInteger cnt) {
        this.queue = queue;
        this.producerCnt = cnt;
    }
}
public class Producer extends Thread {

    private ArrayBlockingQueue<String> queue;

    private AtomicInteger producerCnt;

    @Override
    public void run() {
        String threadName = Thread.currentThread().getName();
        try {
            for (int i = 0; i < 100; i++) {
                queue.put(threadName + "     " + i);
            }
            System.out.println("Producer finished:"+threadName);
            producerCnt.decrementAndGet();
        } catch (InterruptedException e) {
            e.printStackTrace();
            producerCnt.decrementAndGet();
        }
    }

    public Producer(ArrayBlockingQueue<String> queue, AtomicInteger cnt) {
        this.queue = queue;
        this.producerCnt = cnt;
    }
}
Continue reading

最近写HIVE SQL的一点笔记【4】

分组求金额占比,以如下数据为例

tid为主订单号,oid为子订单号,每个子订单有自己的订单金额,字段为fee。那么我们要求出每个子单的金额在主单的总金额的占比。

可以使用 OVER开窗函数 配合SUM求和函数来处理

SUM(FEE) OVER (PARTITION BY TID) AS SUM_FEE

来根据TID求的每个主订单的金额总和,之后再用各自的子订单金额除以主订单金额得到占比就行

SELECT TID,
       OID,
       FEE,
       SUM(FEE) OVER (PARTITION BY TID) AS SUM_FEE,
       ROUND(FEE/SUM(FEE)) OVER (PARTITION BY TID) AS FEE_RATIO
FROM (
         SELECT '101' as tid, '10001' as oid, 33.20 as fee
         UNION ALL
         SELECT '101' as tid, '10002' as oid, 13.65 as fee
         UNION ALL
         SELECT '101' as tid, '10003' as oid, 16.10 as fee
         UNION ALL
         SELECT '201' as tid, '20001' as oid, 6.70 as fee
         UNION ALL
         SELECT '201' as tid, '20002' as oid, 1.21 as fee
         UNION ALL
         SELECT '301' as tid, '30001' as oid, 118.22 as fee
         UNION ALL
         SELECT '401' as tid, '40001' as oid, 208.03 as fee
         UNION ALL
         SELECT '401' as tid, '40002' as oid, 119.90 as fee
         UNION ALL
         SELECT '401' as tid, '40003' as oid, 5.50 as fee
         UNION ALL
         SELECT '401' as tid, '40004' as oid, 24.80 as fee
     ) A

其他可以配合OVER开窗函数使用的函数有 AVG,MIN,MAX

Continue reading

最近写HIVE SQL的一点笔记【3】

一个最近的开发需求,导出某ES索引上某嵌套字段值

promotion_details.promotion_name

符合以“官方立减”文字结尾的所有数据导出,因为es索引的结构的关系,目前es索引结构的原因,数据需要处理下之后再导出,目前es索引数据的结构大致如下

{
  "_index": "my_elasticsearch_index",
  "_type": "order",
  "_id": "3602478673110456764",
  "_version": 1699331966001,
  "found": true,
  "_source": {
    "adjust_fee": "0.00",
    "alipay_point": "0",
    "available_confirm_fee": "0.00",
    "buyer_alipay_no": "****",
    ......
    "oms_orders": [
      {
        "cart_item_no": "3602478673111456764",
        "order_item_id": "210100024285392788",
        ......
      },
      ......
    ],
    "promotion_details": [
      {
        "promotion_name": "2023天猫双11抢先购官方立减",
        ......
      },
    ],
    "sub_orders": [
      {
        "oid": "3602478673111456764",
        ......
      },
      ......
    ],
    "tid": "3602478673110456764",
    "trade_from": "WAP,WAP",
    "type": "fixed",
    ......
  }
}

tid为主订单单号、sub_orders.oid为子单单号、oms_orders.cart_item_no对应等于sub_orders.oid,有一条oid子单记录就对应有一条cart_item_no记录。

Continue reading

ElasticSearch数据迁移reindex命令

起因

一次需求中涉及两个索引,其中一个是本次新建的索引,需要将原始ES索引中的部分数据导入到当前新建的索引中。

分析

对于新所以的数据初始化有两个思路,因为我们的数据都是在HIVE中清洗完成之后再打到ElasticSearch的,所以可以从HIVE中捞取对应的数据重新打到ElasticSearch即可。但是这里有一个局限性的问题,因为HIVE表数量巨大,我们只保留的近7天的清洗结果,如果按照需求的时间跨度则需要重新清洗近3个月的数据,之后再将数据打到ES。

但是这样的话又会有另外一个问题,HIVE表数据在清洗过程中使用到的数据是会经常变动的,比如某个商品最近几天是挂靠在A品类下,但是上个月因为做活动是挂靠在B品类下的,这就导致事后隔了一段时间之后重新清洗的结果无法反应出当时这条数据正确的关联关系的情况。如果产品产品认可当前方案,那么也不失为可行方案之一。

那么,既然我们在原始的索引中有这部分数据,所以自然使用reindex命令来完成这一操作就最为方便了

代码

POST _reindex?wait_for_completion=false&slices=5&refresh
{
  "source": {
    "index": "SOURCE_ES_INDEX",
    "type": "SOURCE_ES_TYPE",
    "query": {
      "range": {
        "last_modify_time": {
          "gte": "2023-01-01 00:00:00",
          "lte": "2023-07-24 23:59:59"
        }
      }
    }
  },
  "dest": {
    "index": "TARGET_ES_INDEX",
    "type": "TARGET_ES_TYPE"
  }
}

简单说下这条语句,公司生产环境使用的ES版本为5.4.2的,所以除了index参数外,还需要有对应的type参数,但是Elasticsearch在 7.X版本中去除了type的概念,所以如果你的ES版本较新的话,可以不用这个type参数,即:

Continue reading

基于AOP的Redis缓存注解功能开发设计[3],缓存更新、删除、过期

前文

在完成了前面两篇文章(基于AOP的Redis缓存注解功能开发设计[2],JexlEngine自定义缓存Key)中的基础功能的开发之后,我们可以再进一步完成缓存更新、缓存删除、缓存过期这几个接口。从而适应在具体的业务开发过程中遇到的各种对缓存数据操作的需求。

因为前面对具体的Redis的缓存的操作和实现逻辑已经做了详细说明,所以在这三个功能注解的开发过程中只做简单的逻辑阐述,不在细究每一个方法的具体含义

缓存更新

缓存更新的意思就是,当我们在更新数据库中的某条数据之后,我们需要另外也把Redis中作为缓存的数据也同步时更新下,这里其实就会涉及到我们之前提到的缓存一致性问题,这个话题要讲的话可以展开来讲很长一段,我们不在这里做过多讨论,可以自行再去搜索一番。

我在这里需要提供一个新的注解@RedisPut,和PointCut来实现我们的功能即可,代码直接加到原来的切面类中。切面类中需要实现的逻辑就是根据方法执行的结果,直接更新Redis中的缓存数据,和之前@RedisCache的区别就是这里不需要去Redis中判断是否已经存在缓存数据。

Continue reading

基于AOP的Redis缓存注解功能开发设计[2],JexlEngine自定义缓存Key

那么在经历了前面两片文章《基于AOP的Redis缓存注解功能开发设计》和《JexlEngine表达式引擎的简单使用》之后,我们便可以将JexlEngine表达式引擎在AOP缓存中使用起来,通过表达式引擎的定义来自定义每个缓存的Key或者每组类型的缓存的Key,从而达到在不同的代码逻辑中增删改有相同组别或者类型关联的缓存数据的需求

如何指定生成Key规则

在之前的Redis缓存注解一文中,我们定义的注解中有个Key变量

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface RedisCache {
    String value();

    String key() default "";
}

原本我们在使用的时候,value()用来指定在Redis中的Key,而hashKey则用注解作用在的方法的所有参数组合之后的Array来担当。现在我们把注解中的key()用上,用来填写对应的hashKey的生成规则,填写的内容则是一个JexlEngine的表达式。下面,写个Demo。

@RedisCache(value = "dict_cache", key = "paramStr+'_'+paramInteger+'_'+chars[0]+'_'+paramInt+'_'+paramDto.getCodeParam()")
public Result<DictionaryInfo> cacheParamExample(String paramStr, Integer paramInteger, ParamDto paramDto, char[] chars, int paramInt, Integer integer, double v){
    DictionaryInfo dictionaryInfo = new DictionaryInfo();
    dictionaryInfo.setDictName("DIC");
    dictionaryInfo.setDictCode("CODE");
    return new Result<>(dictionaryInfo);
}

表达式写得有点夸张,不过主要是做个展示,这里也不会用到太复杂的JexlEngine表达式,其根本在于用几个给予的对象拼接一个字符串,不会涉及到JexlEngine提供的各种复杂的功能。这边更新下之前文章中切面类RedisCacheAspect的代码

Continue reading