水一篇,之前写了es分页查询的几种方法,ElasticSearch 分页查询方法from&size,scroll,及search_after浅析,起因是1月份的需求要做一个ES索引内数据聚合导出的需求。

这个ES索引每天有100万左右新增数据,而需求是每月根据不同的条件做聚合统计,并导出数据到csv文件。之前的同事做过一个类似的功能,但是性能很差,基本思路如下,可以简单分析下。

  1. java的job接收导出查询的参数,创建导出执行任务,启动两个线程,一个es查询线程,一个写csv线程
  2. java调用EsClient,进行search_after查询,每一页查询的结果保存到java内存中,并判断累计的结果数量,大于一个设定的数值之后则根据导出时候设定的参数进行聚合,保存到一个Map中
  3. 分页查询结束后,写csv线程读取聚合后的结果写入csv文件

首先,启动两个线程,一个读,一个等待读之后写csv这个其实是不合理的,在所有数据聚合完成之前,写线程就一直处于等待状态。但是这个模式我改动不了,这是目前项目代码里集成的导出任务jar包里的内容,我没法动它,其他很多导出功能也用的这一套代码,所以这边就只能先这样。(也不是说jar包里的逻辑我在外面就没法修改)

接下来,现有的功能是用的search_after查询的,排序字段是子订单id,是无序的。所以只能支持单线程操作,即只能用一个读线程(ReadThread),一页一页的往后翻页查询。且原来的逻辑中有点bug,大于一个设定的数值之后则根据导出时候设定的参数进行聚合。假定这个数值为5万,那么当不断地翻页某次聚合之后总结果数已经大于5万了,那么之后每次进行翻页查询都要把所有数据重新聚合一遍。最终实测,某次导出了300万数据进行聚合,导出任务总共执行超过了6个小时。

根据以上情况,我重新设计了这次的导出功能。

首先想到的其实是ES自带的聚合统计查询的功能,但是经过实际测试,并不可行,首先因为我们的ES索引是分多个shard的,最终统计出来的结果只能是一个前几位top结果,或者近似值。且统计量比较大,耗时比较久,目前java开发框架不支持等原因,只好放弃这个方向的方案,所以本文题目为(不是),ES的聚合查询相关细节,回头再开一片文章来记录

回到既有方案的思路,我对他做了改进设计

  1. ES查询使用scroll slice查询,这样支持多线程读取,能大大加快读取速度,且因为业务场景是统计导出,并不需要实时的感知索引中数据的变化,用scroll足矣
  2. 因为使用了scroll slice,所以在导出任务中创建了多个读取线程
  3. 同时维护一个ConcurrentHashMap,每个读取线程读取到一页数据后,把读取到的数据根据指定规则在ConcurrentHashMap上进行聚合,此处使用Map#merge,稍后细讲
  4. 为了解决导出框架中ReadThread和WriteThread同时创建执行,但是我们的所有ReadThread在执行结束前又不能进行写文件操作的问题。我另外在导出任务中维护一个AtomicInteger,初始化对应值为读取线程的个数
  5. 每当一个ReadThread读取结束了,则对当前AtomicInteger执行decrementAndGet操作,如果结果为0则表明当前ReadThread是最后一个读完的线程,此时所有数据已经聚合完成了,下面可以开始进行写操作了。
  6. 当前这个最后读完的ReadThread则最后再执行一步,把聚合完成的数据丢入写数据队列,由写线程消费写入csv文件。这个消费队列为ArrayBlockingQueue,生产者使用java.util.concurrent.ArrayBlockingQueue#put方法将数据塞入,当队列已满时,会进行等待,而不是抛出异常

关于聚合部分,直接调用java.util.concurrent.ConcurrentHashMap#merge方法

从方法的注释中,或者直接读源码的话都可以看出ConcurrentHashMap中重写了原来Map类中定义的merge方法。重写后的方法目前可以保证在并发模式下的原子性。在此之前,其实想过另外一个方法,基本代码如下

@Test
public void cc() throws InterruptedException {
    ConcurrentHashMap<String,TestDto> concurrentHashMap = new ConcurrentHashMap<>();
    int size = 1000000;
    Thread thread1 = new Thread(()->{
        for (int i = 0; i < size; i++) {
            TestDto testDto = new TestDto();
            concurrentHashMap.putIfAbsent(""+i,testDto);
            concurrentHashMap.get(""+i).getAtomicInteger().addAndGet(2);
        }
    });
    Thread thread2 = new Thread(()->{
        for (int i = 0; i < size; i++) {
            TestDto testDto = new TestDto();
            concurrentHashMap.putIfAbsent(""+i,testDto);
            concurrentHashMap.get(""+i).getAtomicInteger().addAndGet(2);
        }
    });
    thread1.start();
    thread2.start();
    Thread.sleep(4000);
    for (Map.Entry<String, TestDto> entry : concurrentHashMap.entrySet()) {
        if (concurrentHashMap.get(entry.getKey()).getAtomicInteger().intValue() != 4){
            System.out.println("err");
        }
    }

    System.out.println(1);
}

先在ConcurrentHashMap上创建对象,且这个对象上我们需要聚合统计的字段需要为Atomic型的。之后再获取对象,操作这个Atomic型字段进行增减。

这样的逻辑来实现本身没有什么问题,先putIfAbsent一个空的待初始化的对象,此处在源码中putVal方法里对应填入值的代码块有synchronized关键字保证原子性,所以不会出现两个线程同时往同一个key调用putIfAbsent插入对象成功的情况。

再之后下一步,两个线程获取到key上的对象,并对需要的Atomic字段执行addAndGet等操作,底层中由native方法compareAndSwapInt保证原子性。所以整个这个过程是线程安全的。实际跑了几次上面的代码,没有问题

不过这么写还是不够优雅,我其实一直忽略了Map接口中提供的merge方法java.util.Map#merge(K key, V value, BiFunction remappingFunction)。简单说一下这个方法,这个方法用来向Map中某个key上的对象中合并另一个对象,对这两个对象进行合并操作之后并更新到这个key上。如果合并的结果是null,则移除这个key。

If the specified key is not already associated with a value or is associated with null, associates it with the given non-null value. Otherwise, replaces the associated value with the results of the given remapping function, or removes if the result is {@code null}. This method may be of use when combining multiple mapped values for a key.

Map接口中的merge方法的注释中描述了这个方法执行的基本逻辑。先读取oldValue,判断如果oldValue为null,则newValue则为传入的value,否则执行传入的合并方法并将加过赋值给newValue。

如果newValue为空则删除当前key,不为空则则将newValue关联上key,替代掉原来的oldValue

V oldValue = map.get(key);
V newValue = (oldValue == null) ? value :
             remappingFunction.apply(oldValue, value);
if (newValue == null)
    map.remove(key);
else
    map.put(key, newValue);

但是基础的Map接口中定义的方法并不能保证并发状态下的线程安全,官方注解中有如下说明。这个默认实现不保证同步性或原子性,任何保证原子性的Map实现类都应当重写此方法。

The default implementation makes no guarantees about synchronization or atomicity properties of this method. Any implementation providing atomicity guarantees must override this method and document its concurrency properties. In particular, all implementations of subinterface {@link java.util.concurrent.ConcurrentMap} must document whether the function is applied once atomically only if the value is not present.

于是我才将目光移到了ConcurrentHashMap#merge这里。ConcurrentHashMap中重写了该方法,并写了如下注释说明,包括基本的逻辑介绍,以及特地提到了一点整个方法调用以原子方式执行(The entire method invocation is performed atomically.)。所以在当前这个需求场景下,多线程数据合并操作中完全就可以用这个merge方法来处理,简介优雅且保证原子性。

 If the specified key is not already associated with a (non-null) value, associates it with the given value. Otherwise, replaces the value with the results of the given remapping function, or removes if {@code null}. The entire method invocation is performed atomically.  Some attempted update operations on this map by other threads may be blocked while computation is in progress, so the computation should be short and simple, and must not attempt to update any other mappings of this Map.

如果仔细阅读源码,也可以发现,源码中把Map中的value读取、替换以及合并方法的执行用synchronized关键字修饰起来了,所以才能保证原子性。但同时因为这个合并方法是调用方输入的,为了保证锁粒度的尽量小,方法注解中也特意说明了合并计算的逻辑应当尽量简短,且不能更新其他key上的对象。

另外一个题外话,Map的key上的值从链表转TreeMap过程并不在这个锁中执行,而是由另外的锁控制,这里也可以清楚看到,当链表长度大于等于8的时候才会执行

if (binCount != 0) {
    if (binCount >= TREEIFY_THRESHOLD)
        treeifyBin(tab, i);
    break;
}

所以基于ConcurrentHashMap#merge方法,我们重新写下上面的demo,具体业务中的代码就不贴出来了。同时做了一点小小的调整,原来的Thread.sleep(4000);的不确定性改为了用CountDownLatch代替

@Test
public void cc2() throws InterruptedException {
    ConcurrentHashMap<Integer, TestDto> concurrentHashMap = new ConcurrentHashMap<>();
    int size = 20000000;
    CountDownLatch countDownLatch = new CountDownLatch(2);
    Thread thread1 = new Thread(() -> {
        for (int i = 0; i < size; i++) {
            TestDto testDto = new TestDto();
            testDto.setNormalInteger(1);
            merge(concurrentHashMap, i, testDto);
        }
        countDownLatch.countDown();
    });
    Thread thread2 = new Thread(() -> {
        for (int i = 0; i < size; i++) {
            TestDto testDto = new TestDto();
            testDto.setNormalInteger(1);
            merge(concurrentHashMap, i, testDto);
        }
        countDownLatch.countDown();
    });
    thread1.start();
    thread2.start();
    countDownLatch.await();
    for (Map.Entry<Integer, TestDto> entry : concurrentHashMap.entrySet()) {
        if (concurrentHashMap.get(entry.getKey()).getNormalInteger() != 2) {
            System.out.println("err");
        }
    }
    System.out.println(1);
}

private void merge(ConcurrentHashMap<Integer, TestDto> concurrentHashMap, int i, TestDto testDto) {
    concurrentHashMap.merge(i, testDto, (testDto1, testDto2) -> {
        testDto1.setNormalInteger(testDto1.getNormalInteger() + 1);
        return testDto1;
    });
}

执行几次,验证结果无误。

生产环境上线实际执行的业务代码的结果。上面提到原来的功能统计300万条数据一共花费了6个小时,而新代码上线当晚跑了一下870万条数据,共计耗时7分多钟,经过手工统计验证结果无误,效果拔群确实完美!~

另外我在本地也执行过,顺便关注了下控制台的输出内容,新的多线程的整个业务执行过程中,大部分的时间花费在了写导出文件,然后上传到文件服务器这部分了,原本最耗时的单线程读es部分已经执行得非常快了。