作者: CheungQ

一次Maven引用版本冲突的NoClassDefFoundError问题的排查记录

起因

前几日正常版本发布后,经过发布验证,一个不在本次发布功能范围中的文件导出功能不能正常执行,具体表现为,任务启动后一直卡在执行中状态,无法完成且不会自动异常中断,就这么一直卡在执行中的状态,同时后续更多导出任务仍然在排队持续积压。接下来就是在众多业务部门狂轰滥炸的问询之下,顶住压力进行排查处理。

排查过程

首先初步排查确认故障范围,首先第一步想到的是去异常日志监管平台和统计平台查看有无明显的新增异常,翻了一圈无果,这就很奇怪了。接下来我只能去根据功能逻辑的特点去逐排查。同事去申请下载完整的近期应用日志,这个要走流程,很慢,就先不管他了。

交代下这个下载服务功能的基础逻辑,然后我们根据不同部分验证来排查确认。

该下载服务根据平台用户提交的下载请求创建导出任务,再有任务调度系统来定时执行启动导出任务执行。导出任务的执行是一个基本的生产者消费者模型,启动后分两部分,一部分为生产者线程,根据用户提交的查询参数去对应的ES索引或者MYSQL表查询数据。另外一方面启动消费者线程来消费这些数据,写入到一个或者多个csv文件中,消费完毕后将文件提交到OSS对象存储服务中,得到对应的文件下载地址。之后标记导出任务完成,并更新下载地址到导出任务信息中。那么就对这些步骤分别排查验证。

逐步排查第一步,任务状态能从排队中更新为执行中状态,且后续任务能保持在排队中状态,说明任务调度系统到当前执行导出任务的应用直接的通信应该是正常的。

第二步,任务启动后会首先查询待导出总数据量,经过测试,导出任务的该字段能正确更新,说明一方面具体的执行线程能正确查询到对应ES/MYSQL中的数据,另一方面也能反过来正确更新导出任务中的总数据量字段,说明这两个路径之间是正常。那么如果任务发生异常中断了的话,按照代码逻辑应当能正确进入Exception处理分支,去更新导出任务的状态为失败。到这里合理的怀疑是不是任务哪边进入了死循环不能中断跳出

第三步,去到最终的OSS服务器目录上查看下,确认没有对应的文件上传,到这里怀疑是不是上传OSS的时候网络问题导致文件上传非常慢从而卡在了这里,任务一直无法结束

第四步,回到我们的应用服务器,简单申请个权限看下对应的应用服务的临时文件夹,确认下文件是否正确生成了,从而确认第三步的怀疑是否正确。但是在应用服务器上的临时文件目录看了下后发现,对应的csv文件生成了,但是文件大小是0,是个空文件(看不到文件内容,涉及敏感信息管理,查看文件内容需要另外申请权限)。

最初是想看下是不是有什么异常数据导致写入文件失败的,但是一看只生成了空文件,那么可以确认应该是往文件写入的这部分代码的问题了。这样也就排除掉前面第三步的向OSS上传问题猜测。

Continue reading

一个开发中用到的报文压缩实现

之前开发中用到的一个报文压缩的实现方案,简单在这里单独提出来写一下,分为Java和JS两版代码。

Java版代码是服务端使用,用来在各服务端之间发送接收报文使用。JS版是在前端页面上用来查看或者调试接口报文使用的。

Java代码,压缩和解压缩方法


/**
 * 
 * 功能描述:字符串压缩 <br>
 * 将字符串压缩
 *
 * @param str 待压缩的字符串
 * @return 压缩后的字符串
 */
@SuppressWarnings("restriction")
public static String gzip(String str) {
    // 创建字符流
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    GZIPOutputStream gzip = null;
    try {
        // 对字符串进行压缩并写入压缩流
        gzip = new GZIPOutputStream(out);
        gzip.write(str.getBytes());
    } catch (IOException e) {
        String errMsg = e.getMessage();
        logger.error(errMsg);
    } finally {
        if (gzip != null) {
            try {
                gzip.close();
            } catch (IOException e) {
                String errMsg = e.getMessage();
                logger.error(errMsg);
            }
        }
    }
    // 返回压缩后的字符串
    return new sun.misc.BASE64Encoder().encode(out.toByteArray());
}

/**
 * 
 * 功能描述: 字符串解压<br>
 * 将字符串解压
 *
 * @param str 待解压的字符串
 * @return 解压后的字符串
 * @throws Exception
 */
@SuppressWarnings("restriction")
public static String gunzip(String str) {
    // 校验压缩数据
    if (str == null) {
        return null;
    }

    // 创建读取流
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    ByteArrayInputStream in = null;
    // 解压缩流
    GZIPInputStream ginzip = null;
    byte[] compressed = null;
    // 原始字符串
    String decompressed = null;

    try {
        // 进行解压缩操作
        compressed = new sun.misc.BASE64Decoder().decodeBuffer(str);
        in = new ByteArrayInputStream(compressed);
        ginzip = new GZIPInputStream(in);

        byte[] buffer = new byte[BYTE_SIZE];
        int offset = -1;
        while ((offset = ginzip.read(buffer)) != -1) {
            out.write(buffer, 0, offset);
        }
        decompressed = out.toString();
    } catch (IOException e) {
        String errMsg = e.getMessage();
        logger.error(errMsg);
        logger.error("解析压缩字符串异常", e);
    } finally {
        if (ginzip != null) {
            try {
                ginzip.close();
            } catch (IOException e) {
                String errMsg = e.getMessage();
                logger.error(errMsg);
            }
        }
        if (in != null) {
            try {
                in.close();
            } catch (IOException e) {
                String errMsg = e.getMessage();
                logger.error(errMsg);
            }
        }
        try {
            out.close();
        } catch (IOException e) {
            String errMsg = e.getMessage();
            logger.error(errMsg);
        }
    }
    // 返回原始字符串
    return decompressed;
}

JavaScript代码,js的压缩解压缩需要调用到pako包的内容,可以在https://www.bootcdn.cn/pako/ 上找到需要的版本引用,或者简单点直接把需要min版本代码复制到你需要的页面里, https://cdn.bootcdn.net/ajax/libs/pako/2.0.4/pako.min.js,另外也用到浏览器自带的base64加密解密的方法btoa和atob

Continue reading

一次序列化与反序列化引发的BUG排查

BUG现场

这是一份若干年前的历史代码,当时的同学写这份代码的时候设计思路是这样的,我复现这份BUG现场的代码如下

要点1 有一个用来传输数据的DTO,大致如下

@Data
public class CategoryBrandBu implements Serializable {

    protected static final Map<String,String> BRAND_RELATION_MAP = new HashMap<>();
    public Map<String,String> getRelationMap(){
        return BRAND_RELATION_MAP;
    }
    private Long id;
    private String brandCode;
    private String brandName;

    public String getBrandName() {
        if (StringUtils.isBlank(brandCode)){
            return StringUtils.EMPTY;
        }
        return BRAND_RELATION_MAP.getOrDefault(brandCode,StringUtils.EMPTY);
    }
}

DTO中声明了一个静态常量BRAND_RELATION_MAP的HashMap,用来保存某个不常更新的表中的映射信息,表中的数据基本只有几十条,所以放在这用来获取品牌名称

要点2. 再另外起一个定时job,定时来刷新这个HashMap中的映射关系数据,代码大意如下

@Configuration
public class ScheduledConfig {

    @Scheduled(fixedDelay = 5000)
    public void updateMap(){
        Random random = new Random();
        int i = random.nextInt(1000);
        System.out.println("updated: "+i);
        CategoryBrandBu dto = new CategoryBrandBu();
        dto.getRelationMap().put("ABC", String.valueOf(i));
    }
}

其中代码先new CategoryBrandBu()getRelationMap(),再put的操作虽然看起来比较挫,不过嗯、至少确实还是有用的(原来的代码就是这样),这个不是重点,另外fixedDelay = 5000是我特地调整的5秒刷新一次。

要点3. 接下来一个至关重要的东西,当前Application中提供了对外服务RPC(某自研RPC框架)接口用于查询相关数据,其中有一个接口的返回值中就用到了当前涉及的对象CategoryBrandBu。

RPC基础服务类中将请求参数序列化之后,发送到目标Application中。目标Application接收到请求之后,将请求信息解析之后调用对应bean实例的对应方法,且同时反序列化对应的请求参数为对应方法的java参数对象。这里这序列化和反序列化中用的fastjson(能用,不过也不是很高明的样子)。

BUG表现情况

我们的@Scheduled定时任务无论怎样刷新CategoryBrandBu类中静态成员变量Map的信息,外部应用实例(B)通过RPC访问过来的之后得到的结果永远都是第一次初始化之后得到Map中的值(其实也不是第一次初始化的值,后面我再说到)

举个栗子来说明下,首先当我们的Application(A)启动之后,当前静态成员变量上Map被初始化成了如下

(A)->AAA
(B)->BBB
(C)->CCC

此时外部的应用实例(B)通过RPC访问到当前Application(A)的对应接口之后,得到了这个DTO序列化的结果,其中包含已经被序列化了的BRAND_RELATION_MAP,而我们对应外部应用实例(B)的系统也恰巧用了同样的DTO的java类文件,于是在外部应用实例(B)接收到序列化的返回结果同时,也接收到了序列化的BRAND_RELATION_MAP,外部应用实例(B)对数据进行反序列化,同时也对外部应用实例(B)中的CategoryBrandBu类中的静态成员变量BRAND_RELATION_MAP进行赋值。

Continue reading

ElasticSearch使用script更新文档失败问题排查

起因

起因是一个批量根据es文档id更新指定字段的功能,经过上线后使用反馈,经常性的偶发文档无法更新的情况

处理

找到相关代码,自行写个demo代码批量跑下试下,原来的代码大概示意如下

public String len10() throws Exception{
    Random random = new Random();
    String[] ls = new String[500];
    for (int i = 0; i < 500; i++) {
        int finalI = i;
        Runnable callable = new Runnable() {
            @Override
            public void run() {
                String str = String.valueOf(random.nextInt(40)+10);
                String str2 = String.valueOf(random.nextInt(40)+10);
                String str3 = String.valueOf(random.nextInt(40)+10);
                String action = "/XXXXXXX_index/XXXXXXX_type/_update_by_query";
                String dateTime = "20"+str+"-01-12 23:"+str2+":"+str3;
                String script = "{\n" +
                        "  \"script\": {\n" +
                        "    \"inline\": \"ctx._source.modify_time='"+dateTime+"'\"\n" +
                        "  },\n" +
                        "  \"query\": {\n" +
                        "    \"bool\": {\n" +
                        "      \"filter\": [{\n" +
                        "        \"term\": {\n" +
                        "          \"id\": \"2\"\n" +
                        "        }\n" +
                        "      }]\n" +
                        "    }\n" +
                        "  }\n" +
                        "}"
                        ;
                try {
                    String post = esRestClient.performRequest("POST", action, script);
                    System.out.println(post);
                    ls[finalI] = post;
                } catch (IOException e) {
                    e.printStackTrace();
                    ls[finalI] = e.getMessage();
                }
            }
        };
        Thread thread = new Thread( callable);
        thread.start();
    }
    Thread.sleep(50000);
    return "";
}

大意就是起500个线程,更新索引中指定文档id为2的文档的modify_time字段,通过script来更新。

执行之后其实就可以看到大量异常信息了

Continue reading

从一个最基础的生产者消费者模型开始构建离线下载服务

基础版生产者消费者模型

这里我们构建一个最基本的生产者消费者模型,多个Producer线程往队列中写入数据,一个Consumer线程从队列中按顺序取出数据。基本代码如下

public class Consumer extends Thread {

    private final AtomicInteger producerCnt;

    private ArrayBlockingQueue<String> queue;

    @Override
    public void run() {
        try {
            int cntVal = 0;
            while (producerCnt.intValue() != 0 || !queue.isEmpty()) {
                String pollVal = queue.poll(1, TimeUnit.SECONDS);
                System.out.println("poll a value: "+pollVal);
                cntVal++;
            }
            System.out.println("finished, total count: "+cntVal);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public Consumer(ArrayBlockingQueue<String> queue, AtomicInteger cnt) {
        this.queue = queue;
        this.producerCnt = cnt;
    }
}
public class Producer extends Thread {

    private ArrayBlockingQueue<String> queue;

    private AtomicInteger producerCnt;

    @Override
    public void run() {
        String threadName = Thread.currentThread().getName();
        try {
            for (int i = 0; i < 100; i++) {
                queue.put(threadName + "     " + i);
            }
            System.out.println("Producer finished:"+threadName);
            producerCnt.decrementAndGet();
        } catch (InterruptedException e) {
            e.printStackTrace();
            producerCnt.decrementAndGet();
        }
    }

    public Producer(ArrayBlockingQueue<String> queue, AtomicInteger cnt) {
        this.queue = queue;
        this.producerCnt = cnt;
    }
}
Continue reading

最近写HIVE SQL的一点笔记【4】

分组求金额占比,以如下数据为例

tid为主订单号,oid为子订单号,每个子订单有自己的订单金额,字段为fee。那么我们要求出每个子单的金额在主单的总金额的占比。

可以使用 OVER开窗函数 配合SUM求和函数来处理

SUM(FEE) OVER (PARTITION BY TID) AS SUM_FEE

来根据TID求的每个主订单的金额总和,之后再用各自的子订单金额除以主订单金额得到占比就行

SELECT TID,
       OID,
       FEE,
       SUM(FEE) OVER (PARTITION BY TID) AS SUM_FEE,
       ROUND(FEE/SUM(FEE)) OVER (PARTITION BY TID) AS FEE_RATIO
FROM (
         SELECT '101' as tid, '10001' as oid, 33.20 as fee
         UNION ALL
         SELECT '101' as tid, '10002' as oid, 13.65 as fee
         UNION ALL
         SELECT '101' as tid, '10003' as oid, 16.10 as fee
         UNION ALL
         SELECT '201' as tid, '20001' as oid, 6.70 as fee
         UNION ALL
         SELECT '201' as tid, '20002' as oid, 1.21 as fee
         UNION ALL
         SELECT '301' as tid, '30001' as oid, 118.22 as fee
         UNION ALL
         SELECT '401' as tid, '40001' as oid, 208.03 as fee
         UNION ALL
         SELECT '401' as tid, '40002' as oid, 119.90 as fee
         UNION ALL
         SELECT '401' as tid, '40003' as oid, 5.50 as fee
         UNION ALL
         SELECT '401' as tid, '40004' as oid, 24.80 as fee
     ) A

其他可以配合OVER开窗函数使用的函数有 AVG,MIN,MAX

Continue reading

最近写HIVE SQL的一点笔记【3】

一个最近的开发需求,导出某ES索引上某嵌套字段值

promotion_details.promotion_name

符合以“官方立减”文字结尾的所有数据导出,因为es索引的结构的关系,目前es索引结构的原因,数据需要处理下之后再导出,目前es索引数据的结构大致如下

{
  "_index": "my_elasticsearch_index",
  "_type": "order",
  "_id": "3602478673110456764",
  "_version": 1699331966001,
  "found": true,
  "_source": {
    "adjust_fee": "0.00",
    "alipay_point": "0",
    "available_confirm_fee": "0.00",
    "buyer_alipay_no": "****",
    ......
    "oms_orders": [
      {
        "cart_item_no": "3602478673111456764",
        "order_item_id": "210100024285392788",
        ......
      },
      ......
    ],
    "promotion_details": [
      {
        "promotion_name": "2023天猫双11抢先购官方立减",
        ......
      },
    ],
    "sub_orders": [
      {
        "oid": "3602478673111456764",
        ......
      },
      ......
    ],
    "tid": "3602478673110456764",
    "trade_from": "WAP,WAP",
    "type": "fixed",
    ......
  }
}

tid为主订单单号、sub_orders.oid为子单单号、oms_orders.cart_item_no对应等于sub_orders.oid,有一条oid子单记录就对应有一条cart_item_no记录。

Continue reading