java实现请求缓冲合并的示例代码
业务背景:
我们对外提供了一个rest接口给第三方业务进行调用,但是由于第三方框架限制,导致会发送大量相似无效请求,例如:接口入参json包含两个字段,createBy和receiverList,完整的入参json示例如下:
{ "createBy": "aa", "receiverList": [ "bb", "cc" ] }
实际第三方业务会进行多次调用接口,每次传递的数据可能如下:
{ "createBy": "aa", "receiverList": [ "bb" ] } 或者 { "createBy": "aa", "receiverList": [ "cc" ] } 或者 { "createBy": "bb", "receiverList": [ "cc" ] } 或者 { "createBy": "aa", "receiverList": [ "bb", "cc" ] }
所有需要对第三方业务传递过来的数据进行缓冲合并处理,减轻真正的后台服务的压力。
代码实现
package com.demo; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.concurrent.CustomizableThreadFactory; import org.springframework.stereotype.Component; import javax.annotation.PreDestroy; import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; /** * Description: 请求合并管理类 */ @Slf4j @Component public class RequestMerger { // 线程池核心线程数 private final int corePoolSize = 200; // 任务执行超时时间,单位:毫秒 private final int timeout = 5 * 60 * 1000; // 队列,队列长度为Integer.MAX_VALUE private final LinkedBlockingQueue<String> requestQueue = new LinkedBlockingQueue<>(); // 定时器,所有任务共用线程池 private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(corePoolSize, new CustomizableThreadFactory("schedule-executor-")); // 是否关闭标志 private final AtomicBoolean isShutdown = new AtomicBoolean(false); /** * 构造函数,用于初始化请求合并器。 * * @param batchSize 每次合并的最大请求数量。 * @param delayMillis 合并请求的周期间隔,单位为毫秒。 */ public RequestMerger(int batchSize, long delayMillis) { // 启动定时器,定期合并请求,延迟delayMillis后开始,之后每隔delayMillis执行一次 scheduler.scheduleAtFixedRate(() -> { if (!isShutdown.get()) { List<String> batch = new ArrayList<>(batchSize); int drainedCount = requestQueue.drainTo(batch, batchSize); log.info("==>scheduler,drainedCount:{},nowQueueCount:{}", drainedCount, requestQueue.size()); if (!batch.isEmpty()) { // 异步执行任务,防止业务执行时间过长导致业务整体延迟过大 scheduler.submit(() -> { sendRequestBatch(batch); }); } } }, delayMillis, delayMillis, TimeUnit.MILLISECONDS); } /** * 发送请求批次的方法。 * * @param batch 请求批次。 */ private void sendRequestBatch(List<String> batch) { Future<?> future = scheduler.submit(() -> { try { // 在这里实现你的请求发送逻辑 // 可以使用HTTP客户端库(如Apache HttpClient或OkHttp)来发送请求 // ... System.out.println("Sending batch of " + batch.size() + " requests"); } catch (Exception e) { // 异常处理逻辑 System.err.println("Error sending requests: " + e.getMessage()); } }); // 尝试获取任务结果,如果超过超时时间则抛出TimeoutException异常,进行取消任务 try { // 超时时间,单位:毫秒 future.get(timeout, TimeUnit.MILLISECONDS); } catch (TimeoutException | ExecutionException e) { // 超时或执行异常时取消任务 future.cancel(true); } catch (Exception e) { log.error("==>任务执行异常", e); // 任务执行异常 future.cancel(true); } } /** * 在对象销毁前执行的关闭操作。 * 该方法从请求队列中拉取所有未处理的请求,并将它们批量发送。 * 无参数和返回值。 */ @PreDestroy public void shutdown() { isShutdown.set(true); List<String> batch = new ArrayList<>(); // 获取请求队列中的剩余所有请求 int drainedCount = requestQueue.drainTo(batch); log.info("==>shutdown,drainedCount:{},nowQueueCount:{}", drainedCount, requestQueue.size()); // 批量发送收集到的剩余请求 sendRequestBatch(batch); // 关闭定时执行器 scheduler.shutdown(); try { if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) { log.error("Scheduler did not terminate gracefully within 60 seconds, force shutting down."); scheduler.shutdownNow(); } } catch (InterruptedException e) { log.warn("Interrupted during scheduler termination, force shutting down."); scheduler.shutdownNow(); Thread.currentThread().interrupt(); } } /** * 向请求队列中添加一个请求。如果服务未关闭,则直接添加到请求队列中; * 如果服务已关闭,则将该请求作为一批请求发送。 * * @param request 要添加的请求字符串。 */ public void addRequest(String request) throws InterruptedException { // 检查服务是否已关闭 if (!isShutdown.get()) { // 未关闭,直接添加到请求队列 requestQueue.put(request); } else { // 已关闭,将当前请求作为一批发送 List<String> batch = new ArrayList<>(); batch.add(request); sendRequestBatch(batch); } } }
参考资料
注意:此代码容易导致数据丢失。例如:调用add方法将10个元素放入队列,但是真正获取到9个元素。
造成原因:FlushThread#add()中使用offer方法将数据放入队列,如果此时队列已满,返回值为false,实际数据未进入队列,需要额外对数据进行处理。
修改建议:调大队列长度,并且将offer方法改为put方法,保证数据最终进入队列。
到此这篇关于java实现请求缓冲合并的文章就介绍到这了,更多相关java请求缓冲合并内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
MyBatis insert语句返回主键和selectKey标签方式
这篇文章主要介绍了MyBatis insert语句返回主键和selectKey标签方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教2021-09-09详解如何在Java中重写equals()和hashCode()方法
在 Java 中,equals() 和 hashCode() 方法是 Object 类中定义的重要方法,它们用于比较对象的相等性以及计算对象的哈希值,本文将详细介绍如何在 Java 中重写 equals() 和 hashCode() 方法,并讨论其最佳实践,需要的朋友可以参考下2024-08-08
最新评论