作者: CheungQ

一个开发中用到的报文压缩实现

之前开发中用到的一个报文压缩的实现方案,简单在这里单独提出来写一下,分为Java和JS两版代码。

Java版代码是服务端使用,用来在各服务端之间发送接收报文使用。JS版是在前端页面上用来查看或者调试接口报文使用的。

Java代码,压缩和解压缩方法


/**
 * 
 * 功能描述:字符串压缩 <br>
 * 将字符串压缩
 *
 * @param str 待压缩的字符串
 * @return 压缩后的字符串
 */
@SuppressWarnings("restriction")
public static String gzip(String str) {
    // 创建字符流
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    GZIPOutputStream gzip = null;
    try {
        // 对字符串进行压缩并写入压缩流
        gzip = new GZIPOutputStream(out);
        gzip.write(str.getBytes());
    } catch (IOException e) {
        String errMsg = e.getMessage();
        logger.error(errMsg);
    } finally {
        if (gzip != null) {
            try {
                gzip.close();
            } catch (IOException e) {
                String errMsg = e.getMessage();
                logger.error(errMsg);
            }
        }
    }
    // 返回压缩后的字符串
    return new sun.misc.BASE64Encoder().encode(out.toByteArray());
}

/**
 * 
 * 功能描述: 字符串解压<br>
 * 将字符串解压
 *
 * @param str 待解压的字符串
 * @return 解压后的字符串
 * @throws Exception
 */
@SuppressWarnings("restriction")
public static String gunzip(String str) {
    // 校验压缩数据
    if (str == null) {
        return null;
    }

    // 创建读取流
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    ByteArrayInputStream in = null;
    // 解压缩流
    GZIPInputStream ginzip = null;
    byte[] compressed = null;
    // 原始字符串
    String decompressed = null;

    try {
        // 进行解压缩操作
        compressed = new sun.misc.BASE64Decoder().decodeBuffer(str);
        in = new ByteArrayInputStream(compressed);
        ginzip = new GZIPInputStream(in);

        byte[] buffer = new byte[BYTE_SIZE];
        int offset = -1;
        while ((offset = ginzip.read(buffer)) != -1) {
            out.write(buffer, 0, offset);
        }
        decompressed = out.toString();
    } catch (IOException e) {
        String errMsg = e.getMessage();
        logger.error(errMsg);
        logger.error("解析压缩字符串异常", e);
    } finally {
        if (ginzip != null) {
            try {
                ginzip.close();
            } catch (IOException e) {
                String errMsg = e.getMessage();
                logger.error(errMsg);
            }
        }
        if (in != null) {
            try {
                in.close();
            } catch (IOException e) {
                String errMsg = e.getMessage();
                logger.error(errMsg);
            }
        }
        try {
            out.close();
        } catch (IOException e) {
            String errMsg = e.getMessage();
            logger.error(errMsg);
        }
    }
    // 返回原始字符串
    return decompressed;
}

JavaScript代码,js的压缩解压缩需要调用到pako包的内容,可以在https://www.bootcdn.cn/pako/ 上找到需要的版本引用,或者简单点直接把需要min版本代码复制到你需要的页面里, https://cdn.bootcdn.net/ajax/libs/pako/2.0.4/pako.min.js,另外也用到浏览器自带的base64加密解密的方法btoa和atob

Continue reading

一次序列化与反序列化引发的BUG排查

BUG现场

这是一份若干年前的历史代码,当时的同学写这份代码的时候设计思路是这样的,我复现这份BUG现场的代码如下

要点1 有一个用来传输数据的DTO,大致如下

@Data
public class CategoryBrandBu implements Serializable {

    protected static final Map<String,String> BRAND_RELATION_MAP = new HashMap<>();
    public Map<String,String> getRelationMap(){
        return BRAND_RELATION_MAP;
    }
    private Long id;
    private String brandCode;
    private String brandName;

    public String getBrandName() {
        if (StringUtils.isBlank(brandCode)){
            return StringUtils.EMPTY;
        }
        return BRAND_RELATION_MAP.getOrDefault(brandCode,StringUtils.EMPTY);
    }
}

DTO中声明了一个静态常量BRAND_RELATION_MAP的HashMap,用来保存某个不常更新的表中的映射信息,表中的数据基本只有几十条,所以放在这用来获取品牌名称

要点2. 再另外起一个定时job,定时来刷新这个HashMap中的映射关系数据,代码大意如下

@Configuration
public class ScheduledConfig {

    @Scheduled(fixedDelay = 5000)
    public void updateMap(){
        Random random = new Random();
        int i = random.nextInt(1000);
        System.out.println("updated: "+i);
        CategoryBrandBu dto = new CategoryBrandBu();
        dto.getRelationMap().put("ABC", String.valueOf(i));
    }
}

其中代码先new CategoryBrandBu()getRelationMap(),再put的操作虽然看起来比较挫,不过嗯、至少确实还是有用的(原来的代码就是这样),这个不是重点,另外fixedDelay = 5000是我特地调整的5秒刷新一次。

要点3. 接下来一个至关重要的东西,当前Application中提供了对外服务RPC(某自研RPC框架)接口用于查询相关数据,其中有一个接口的返回值中就用到了当前涉及的对象CategoryBrandBu。

RPC基础服务类中将请求参数序列化之后,发送到目标Application中。目标Application接收到请求之后,将请求信息解析之后调用对应bean实例的对应方法,且同时反序列化对应的请求参数为对应方法的java参数对象。这里这序列化和反序列化中用的fastjson(能用,不过也不是很高明的样子)。

BUG表现情况

我们的@Scheduled定时任务无论怎样刷新CategoryBrandBu类中静态成员变量Map的信息,外部应用实例(B)通过RPC访问过来的之后得到的结果永远都是第一次初始化之后得到Map中的值(其实也不是第一次初始化的值,后面我再说到)

举个栗子来说明下,首先当我们的Application(A)启动之后,当前静态成员变量上Map被初始化成了如下

(A)->AAA
(B)->BBB
(C)->CCC

此时外部的应用实例(B)通过RPC访问到当前Application(A)的对应接口之后,得到了这个DTO序列化的结果,其中包含已经被序列化了的BRAND_RELATION_MAP,而我们对应外部应用实例(B)的系统也恰巧用了同样的DTO的java类文件,于是在外部应用实例(B)接收到序列化的返回结果同时,也接收到了序列化的BRAND_RELATION_MAP,外部应用实例(B)对数据进行反序列化,同时也对外部应用实例(B)中的CategoryBrandBu类中的静态成员变量BRAND_RELATION_MAP进行赋值。

Continue reading

ElasticSearch使用script更新文档失败问题排查

起因

起因是一个批量根据es文档id更新指定字段的功能,经过上线后使用反馈,经常性的偶发文档无法更新的情况

处理

找到相关代码,自行写个demo代码批量跑下试下,原来的代码大概示意如下

public String len10() throws Exception{
    Random random = new Random();
    String[] ls = new String[500];
    for (int i = 0; i < 500; i++) {
        int finalI = i;
        Runnable callable = new Runnable() {
            @Override
            public void run() {
                String str = String.valueOf(random.nextInt(40)+10);
                String str2 = String.valueOf(random.nextInt(40)+10);
                String str3 = String.valueOf(random.nextInt(40)+10);
                String action = "/XXXXXXX_index/XXXXXXX_type/_update_by_query";
                String dateTime = "20"+str+"-01-12 23:"+str2+":"+str3;
                String script = "{\n" +
                        "  \"script\": {\n" +
                        "    \"inline\": \"ctx._source.modify_time='"+dateTime+"'\"\n" +
                        "  },\n" +
                        "  \"query\": {\n" +
                        "    \"bool\": {\n" +
                        "      \"filter\": [{\n" +
                        "        \"term\": {\n" +
                        "          \"id\": \"2\"\n" +
                        "        }\n" +
                        "      }]\n" +
                        "    }\n" +
                        "  }\n" +
                        "}"
                        ;
                try {
                    String post = esRestClient.performRequest("POST", action, script);
                    System.out.println(post);
                    ls[finalI] = post;
                } catch (IOException e) {
                    e.printStackTrace();
                    ls[finalI] = e.getMessage();
                }
            }
        };
        Thread thread = new Thread( callable);
        thread.start();
    }
    Thread.sleep(50000);
    return "";
}

大意就是起500个线程,更新索引中指定文档id为2的文档的modify_time字段,通过script来更新。

执行之后其实就可以看到大量异常信息了

Continue reading

从一个最基础的生产者消费者模型开始构建离线下载服务

基础版生产者消费者模型

这里我们构建一个最基本的生产者消费者模型,多个Producer线程往队列中写入数据,一个Consumer线程从队列中按顺序取出数据。基本代码如下

public class Consumer extends Thread {

    private final AtomicInteger producerCnt;

    private ArrayBlockingQueue<String> queue;

    @Override
    public void run() {
        try {
            int cntVal = 0;
            while (producerCnt.intValue() != 0 || !queue.isEmpty()) {
                String pollVal = queue.poll(1, TimeUnit.SECONDS);
                System.out.println("poll a value: "+pollVal);
                cntVal++;
            }
            System.out.println("finished, total count: "+cntVal);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public Consumer(ArrayBlockingQueue<String> queue, AtomicInteger cnt) {
        this.queue = queue;
        this.producerCnt = cnt;
    }
}
public class Producer extends Thread {

    private ArrayBlockingQueue<String> queue;

    private AtomicInteger producerCnt;

    @Override
    public void run() {
        String threadName = Thread.currentThread().getName();
        try {
            for (int i = 0; i < 100; i++) {
                queue.put(threadName + "     " + i);
            }
            System.out.println("Producer finished:"+threadName);
            producerCnt.decrementAndGet();
        } catch (InterruptedException e) {
            e.printStackTrace();
            producerCnt.decrementAndGet();
        }
    }

    public Producer(ArrayBlockingQueue<String> queue, AtomicInteger cnt) {
        this.queue = queue;
        this.producerCnt = cnt;
    }
}
Continue reading

最近写HIVE SQL的一点笔记【4】

分组求金额占比,以如下数据为例

tid为主订单号,oid为子订单号,每个子订单有自己的订单金额,字段为fee。那么我们要求出每个子单的金额在主单的总金额的占比。

可以使用 OVER开窗函数 配合SUM求和函数来处理

SUM(FEE) OVER (PARTITION BY TID) AS SUM_FEE

来根据TID求的每个主订单的金额总和,之后再用各自的子订单金额除以主订单金额得到占比就行

SELECT TID,
       OID,
       FEE,
       SUM(FEE) OVER (PARTITION BY TID) AS SUM_FEE,
       ROUND(FEE/SUM(FEE)) OVER (PARTITION BY TID) AS FEE_RATIO
FROM (
         SELECT '101' as tid, '10001' as oid, 33.20 as fee
         UNION ALL
         SELECT '101' as tid, '10002' as oid, 13.65 as fee
         UNION ALL
         SELECT '101' as tid, '10003' as oid, 16.10 as fee
         UNION ALL
         SELECT '201' as tid, '20001' as oid, 6.70 as fee
         UNION ALL
         SELECT '201' as tid, '20002' as oid, 1.21 as fee
         UNION ALL
         SELECT '301' as tid, '30001' as oid, 118.22 as fee
         UNION ALL
         SELECT '401' as tid, '40001' as oid, 208.03 as fee
         UNION ALL
         SELECT '401' as tid, '40002' as oid, 119.90 as fee
         UNION ALL
         SELECT '401' as tid, '40003' as oid, 5.50 as fee
         UNION ALL
         SELECT '401' as tid, '40004' as oid, 24.80 as fee
     ) A

其他可以配合OVER开窗函数使用的函数有 AVG,MIN,MAX

Continue reading

最近写HIVE SQL的一点笔记【3】

一个最近的开发需求,导出某ES索引上某嵌套字段值

promotion_details.promotion_name

符合以“官方立减”文字结尾的所有数据导出,因为es索引的结构的关系,目前es索引结构的原因,数据需要处理下之后再导出,目前es索引数据的结构大致如下

{
  "_index": "my_elasticsearch_index",
  "_type": "order",
  "_id": "3602478673110456764",
  "_version": 1699331966001,
  "found": true,
  "_source": {
    "adjust_fee": "0.00",
    "alipay_point": "0",
    "available_confirm_fee": "0.00",
    "buyer_alipay_no": "****",
    ......
    "oms_orders": [
      {
        "cart_item_no": "3602478673111456764",
        "order_item_id": "210100024285392788",
        ......
      },
      ......
    ],
    "promotion_details": [
      {
        "promotion_name": "2023天猫双11抢先购官方立减",
        ......
      },
    ],
    "sub_orders": [
      {
        "oid": "3602478673111456764",
        ......
      },
      ......
    ],
    "tid": "3602478673110456764",
    "trade_from": "WAP,WAP",
    "type": "fixed",
    ......
  }
}

tid为主订单单号、sub_orders.oid为子单单号、oms_orders.cart_item_no对应等于sub_orders.oid,有一条oid子单记录就对应有一条cart_item_no记录。

Continue reading

ElasticSearch数据迁移reindex命令

起因

一次需求中涉及两个索引,其中一个是本次新建的索引,需要将原始ES索引中的部分数据导入到当前新建的索引中。

分析

对于新所以的数据初始化有两个思路,因为我们的数据都是在HIVE中清洗完成之后再打到ElasticSearch的,所以可以从HIVE中捞取对应的数据重新打到ElasticSearch即可。但是这里有一个局限性的问题,因为HIVE表数量巨大,我们只保留的近7天的清洗结果,如果按照需求的时间跨度则需要重新清洗近3个月的数据,之后再将数据打到ES。

但是这样的话又会有另外一个问题,HIVE表数据在清洗过程中使用到的数据是会经常变动的,比如某个商品最近几天是挂靠在A品类下,但是上个月因为做活动是挂靠在B品类下的,这就导致事后隔了一段时间之后重新清洗的结果无法反应出当时这条数据正确的关联关系的情况。如果产品产品认可当前方案,那么也不失为可行方案之一。

那么,既然我们在原始的索引中有这部分数据,所以自然使用reindex命令来完成这一操作就最为方便了

代码

POST _reindex?wait_for_completion=false&slices=5&refresh
{
  "source": {
    "index": "SOURCE_ES_INDEX",
    "type": "SOURCE_ES_TYPE",
    "query": {
      "range": {
        "last_modify_time": {
          "gte": "2023-01-01 00:00:00",
          "lte": "2023-07-24 23:59:59"
        }
      }
    }
  },
  "dest": {
    "index": "TARGET_ES_INDEX",
    "type": "TARGET_ES_TYPE"
  }
}

简单说下这条语句,公司生产环境使用的ES版本为5.4.2的,所以除了index参数外,还需要有对应的type参数,但是Elasticsearch在 7.X版本中去除了type的概念,所以如果你的ES版本较新的话,可以不用这个type参数,即:

Continue reading