java微信延迟支付的实现示例
1.需求
下单支付后,支付回调因部分因素不可达,导致订单状态与微信支付状态不一致。此时需要服务端主动查询订单支付状态,进行更改订单状态。
2.实现方式
基于定时任务
每隔30秒启动一次,找出最近10分钟内创建并且未支付的订单,调用微信查单接口核实订单状态。系统记录订单查询的次数,在n次查询之后状态还是未支付成功,则停止后续查询,并调用关单接口关闭订单。
基于延时队列
每隔5秒/30秒/1分钟/3分钟/5分钟/10分钟/30分钟调用查单接口,若查询付款成功,不再执行队列任务,最后一次查询若还是未返回支付成功状态,则停止后续查询,并调用《关单接口》关闭订单。
延时队列实现方式(这里不做详细描述,优缺点自行百度,本文采用redis实现)
- delayqueue
- RabbitMQ
- reids
3.具体代码
新建实体类 DelayQueueJob 与 ScoredSortedItem
/** * @Desecription: 延迟任务 * @Author: yangyu * @Date: 2021/9/10 14:35 */ @Data public class DelayQueueJob implements Serializable { /** * 延迟任务的唯一标识,用于检索任务 */ private long jobId; /** * 任务的执行时间段 */ private List<Long> delayTimeList; /** * 任务的执行次数 */ private Integer exCount = 0; /** * 任务的执行时间单位 */ private TimeUnit timeUnit = TimeUnit.SECONDS; /** * 任务的执行超时时间 */ private long timeout; /** * 订单编号,根据编号查询订单 */ private String orderCode; /** * 任务类型(具体业务类型) */ private Integer topic; /** * 任务状态 0:执行 1:结束 */ private int jobType = 0; }
/** * @Desecription: 延时任务 桶 * @Author: yangyu * @Date: 2021/9/10 14:55 */ @Data @AllArgsConstructor public class ScoredSortedItem implements Serializable { /** * 延迟任务的唯一标识 */ private long jobId; /** * 任务的执行时间 */ private long delayTime; }
定义常量
/** * @Desecription: 延时队列常量 * @Author: yangyu * @Date: 2021/9/10 14:44 */ public class DelayQueueConstant { /* * 延时任务池 * */ public static final String DELAY_QUEUE_JOB_POOL = "delayQueue:delayQueueJobPool:"; /* * 延时桶 * */ public static final String DELAY_BUCKET_KEY_PREFIX = "delayQueue:delayBucket:"; /* * 任务的执行时间段 * */ public static final List<Long> SLOT_DELAY_TIME = Arrays.asList(5L, 30L, 60L, 180L, 300L, 600L, 1800L); /* * 任务的执行固定时间 * */ public static final List<Long> FIXED_DELAY_TIME = Arrays.asList(1800L); }
zset有序队列操作类
/** * @Desecription: 以时间为维度的有序队列zset 操作类,参考redisson_delay_queue_timeout * @Author: yangyu * @Date: 2021/9/10 14:50 */ @Component public class DelayBucket { @Autowired private RedissonClient redissonClient; /** * @Desecription: 添加jobId到延迟任务桶中 * @Param: key * @Param: jobId * @Param: delayTimeList * @Return: * @Author: yangyu * @Date: 2021/9/10 14:52 */ public void addToBucket(String key, Long jobId, List<Long> delayTimeList, TimeUnit timeUnit) { RScoredSortedSet<ScoredSortedItem> scoredSortedSet = redissonClient.getScoredSortedSet(DelayQueueConstant.DELAY_BUCKET_KEY_PREFIX + key); long millis = System.currentTimeMillis(); for (int i = 0; i < delayTimeList.size(); i++) { ScoredSortedItem scoredSortedItem = new ScoredSortedItem(jobId, millis + timeUnit.toMillis(delayTimeList.get(i))); scoredSortedSet.add(scoredSortedItem.getDelayTime(), scoredSortedItem); } } /** * @Desecription: 从延迟任务桶中获取延迟时间最小的 jodId * @Param: jobIdKey * @Return: ScoredSortedItem * @Author: yangyu * @Date: 2021/9/10 15:35 */ public ScoredSortedItem getFromBucket(String key) { RScoredSortedSet<ScoredSortedItem> scoredSortedSet = redissonClient.getScoredSortedSet(DelayQueueConstant.DELAY_BUCKET_KEY_PREFIX + key); if (scoredSortedSet.size() == 0) { return null; } return scoredSortedSet.first(); } /** * @Desecription: 从延迟任务桶中删除 jod * @Param: key * @Param scoredSortedItem * @Return: * @Author: yangyu * @Date: 2021/9/10 14:52 */ public void deleteFormBucket(String key, ScoredSortedItem scoredSortedItem) { RScoredSortedSet<ScoredSortedItem> scoredSortedSet = redissonClient.getScoredSortedSet(DelayQueueConstant.DELAY_BUCKET_KEY_PREFIX + key); scoredSortedSet.remove(scoredSortedItem); } }
延时任务类
/** * @Desecription: 操作延时任务 * @Author: yangyu * @Date: 2021/9/10 14:43 */ @Component public class DelayQueueJobPool { @Autowired private RedissonClient redissonClient; /** * @Desecription: 添加延时队列job * @Param: delayQueueJob * @Author: yangyu * @Date: 2021/9/10 14:43 */ public void addDelayQueueJob(DelayQueueJob delayQueueJob, String key) { RMap<Long, DelayQueueJob> rMap = redissonClient.getMap(DelayQueueConstant.DELAY_QUEUE_JOB_POOL + key); rMap.put(delayQueueJob.getJobId(), delayQueueJob); } /** * @Desecription: 删除延时队列job * @Param: jobId * @Author: yangyu * @Date: 2021/9/10 14:46 */ public void deleteDelayQueueJob(Long jobId, String key) { RMap<Long, DelayQueueJob> rMap = redissonClient.getMap(DelayQueueConstant.DELAY_QUEUE_JOB_POOL + key); rMap.remove(jobId); } /** * @Desecription: 查询延时队列job * @Param: jobId * @Return: * @Author: yangyu * @Date: 2021/9/10 17:04 */ public DelayQueueJob getDelayQueueJob(Long jobId, String key) { RMap<Long, DelayQueueJob> rMap = redissonClient.getMap(DelayQueueConstant.DELAY_QUEUE_JOB_POOL + key); return rMap.get(jobId); } /** * @Desecription: 修改延时队列job * @Param: jobId * @Author: yangyu * @Date: 2021/9/10 14:46 */ /** * @Desecription: 描述方法 * @Param: jobId * @Param: jobId * @Return: * @Author: yangyu * @Date: 2021/9/13 11:09 */ public void updateDelayQueueJob(DelayQueueJob delayQueueJob, String key) { RMap<Long, DelayQueueJob> rMap = redissonClient.getMap(DelayQueueConstant.DELAY_QUEUE_JOB_POOL + key); rMap.replace(delayQueueJob.getJobId(), delayQueueJob); } }
封装api
@Log4j2 @Component public class RedisDelayedQueue { @Autowired private DelayQueueJobPool delayQueueJobPool; @Autowired private DelayBucket delayBucket; /** * @Desecription: 添加延迟任务到延迟队列 * @Param: * @Author: yangyu * @Date: 2021/9/10 14:41 */ public void push(DelayQueueJob delayQueueJob, String simpleName) { delayQueueJobPool.addDelayQueueJob(delayQueueJob, simpleName); delayBucket.addToBucket(simpleName, delayQueueJob.getJobId(), delayQueueJob.getDelayTimeList(), delayQueueJob.getTimeUnit()); } }
队列事件监听接口
/** * @Desecription: 队列事件监听接口 * @Author: yangyu * @Date: 2021/9/10 10:11 */ public interface RedisDelayedQueueListenerService<T> { void invoke(T t); }
两个接口实现类,主要分固定时间和延时时间,具体业务代码就不贴出来了
/** * * @Desecription: 延时队列监听 * @Author: yangyu * @Date: 2021/9/10 14:32 */ @Log4j2 @Service public class DelayedQueueSlotTime implements RedisDelayedQueueListenerService<DelayQueueJob> { @Autowired private WeChatPay chatPay; @Autowired private CrcxPayDetailService crcxPayDetailService; @Autowired private CrcxOrderService crcxOrderService; @Autowired private DelayQueueJobPool delayQueueJobPool; @Override public void invoke(DelayQueueJob delayQueueJob) { log.info("时间段执行:{}", delayQueueJob); // topic == 1 查单延时任务 if (delayQueueJob.getTopic() == 1) { // 1.通过订单编号,调用微信查单接口,查询微信订单支付状态。 try { Map<String, String> queryOrder = chatPay.getQueryOrder(delayQueueJob.getOrderCode()); // 2.若已付款,对比系统订单状态,未付款修改订单状态,付款不做处理, String tradeState = queryOrder.get("tradeState"); if (tradeState.equalsIgnoreCase("SUCCESS")) { CrcxOrder crcxOrder = new CrcxOrder(); crcxOrder.setOrderCode(delayQueueJob.getOrderCode()); crcxOrder = crcxOrderService.getCrcxOrderByCode(crcxOrder); if (crcxOrder.getIsPay() == 0) { // 3.通过订单编号 查 crcx_pay_detail 支付单-明细 CrcxPayDetail crcxPayDetail = new CrcxPayDetail(); crcxPayDetail.setTradeCode(delayQueueJob.getOrderCode()); crcxPayDetail = crcxPayDetailService.getPayDetail(crcxPayDetail); // 4.进行修改订单状态 int update = crcxOrderService.updateOrderStatus(crcxPayDetail, queryOrder); if (update >= 1) { log.info("延时队列:修改订单支付状态成功"); } else { log.error("延时队列:修改订单支付状态失败"); } } // 5.标记任务不需要再进业务逻辑。因为再 zset 中无法通过jobId 或者 订单编号获取对应的List delayQueueJob.setJobType(1); delayQueueJobPool.updateDelayQueueJob(delayQueueJob, this.getClass().getSimpleName()); } // 6.非已付款,不做任何处理,继续执行延时任务,直到执行完毕,或者订单已付款。 } catch (Exception e) { e.printStackTrace(); } } } } /** * * @Desecription: 固定时间队列监听 * @Author: yangyu * @Date: 2021/9/10 13:18 */ @Log4j2 @Service public class DelayedQueueFixedTime implements RedisDelayedQueueListenerService<DelayQueueJob> { @Autowired private CrcxOrderService crcxOrderService; @Override public void invoke(DelayQueueJob delayQueueJob) { log.info("固定时间执行...." + delayQueueJob.getJobId()); if (delayQueueJob != null) { // 查询订单状态是否未支付状态 CrcxOrder crcxOrder = new CrcxOrder(); crcxOrder.setOrderCode(delayQueueJob.getOrderCode()); crcxOrder = crcxOrderService.getCrcxOrderByCode(crcxOrder); if (crcxOrder.getIsPay() == 0) { // 将待支付的订单改为已取消(超时未支付) crcxOrder.setStatusCode(50); int ref = crcxOrderService.orderPaidTimeout(crcxOrder); if (ref == 0) throw new CustomException("自动取消订单失败"); } } } }
初始化队列监听
/** * @Desecription: 初始化队列监听 * @Author: yangyu * @Date: 2021/9/10 9:31 */ @Log4j2 @Component public class RedisDelayedQueueInit implements ApplicationContextAware { @Autowired private DelayBucket delayBucket; @Autowired private DelayQueueJobPool delayQueueJobPool; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { Map<String, RedisDelayedQueueListenerService> map = applicationContext.getBeansOfType(RedisDelayedQueueListenerService.class); for (Map.Entry<String, RedisDelayedQueueListenerService> taskEventListenerEntry : map.entrySet()) { String listenerName = taskEventListenerEntry.getValue().getClass().getSimpleName(); startThread(listenerName, taskEventListenerEntry.getValue()); } } /** * @Desecription: 启动线程获取队列 * @Param: queueName * @Param: redisDelayedQueueListenerService 任务回调监听 * @Return: * @Author: yangyu * @Date: 2021/9/10 11:32 */ private <T> void startThread(String queueName, RedisDelayedQueueListenerService redisDelayedQueueListenerService) { Thread thread = new Thread(() -> { // 有时间误差,一般在一秒左右 log.info("启动监听队列线程:{}", queueName); while (true) { try { ScoredSortedItem item = delayBucket.getFromBucket(queueName); // 没有任务就堵塞 if (item == null) { Thread.sleep(1000); continue; } // 延迟时间没到 if (item.getDelayTime() > System.currentTimeMillis()) { Thread.sleep(1000); continue; } DelayQueueJob delayQueueJob = delayQueueJobPool.getDelayQueueJob(item.getJobId(), queueName); // 延迟任务数据不存在 if (delayQueueJob == null) { log.info("延迟任务数据不存在·····"); delayBucket.deleteFormBucket(queueName, item); Thread.sleep(1000); continue; } // 如果后面任务不需要执行,不走业务逻辑,删除正常执行 if (delayQueueJob.getJobType() == 0) { new Thread(() -> { DelayQueueJob job = new DelayQueueJob(); BeanUtils.copyProperties(delayQueueJob, job); redisDelayedQueueListenerService.invoke(job); }).start(); } Integer exCount = delayQueueJob.getExCount(); if (exCount == 1) { delayQueueJobPool.deleteDelayQueueJob(delayQueueJob.getJobId(), queueName); delayBucket.deleteFormBucket(queueName, item); continue; } delayQueueJob.setExCount(--exCount); delayQueueJobPool.updateDelayQueueJob(delayQueueJob, queueName); delayBucket.deleteFormBucket(queueName, item); } catch (Exception e) { log.error("监听队列线程错误:", e); try { Thread.sleep(10000); } catch (InterruptedException ex) { } } } }); thread.setName(queueName); thread.start(); } }
主要使用到的依赖包
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>3.10.5</version> </dependency>
至此整个redis延时队列就实现完成。
到此这篇关于java微信延迟支付的实现示例的文章就介绍到这了,更多相关java微信延迟支付内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
Spring @DateTimeFormat日期格式化时注解场景分析
这篇文章主要介绍了Spring @DateTimeFormat日期格式化时注解场景分析,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下2023-05-05Intellij IDEA中如何查看maven项目中所有jar包的依赖关系图
这篇文章主要介绍了Intellij IDEA中如何查看maven项目中所有jar包的依赖关系图,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧2020-05-05
最新评论