RocketMQ事务消息机制详解
RocketMQ事务消息
RocketMQ提供了事务消息,通过事务消息就能达到分布式事务的最终一致,从而实现了可靠消息服务。
一、事务消息的实现步骤
事务消息发送步骤:
1. 发送方将半事务消息发送至RocketMQ服务端。
2. RocketMQ服务端将消息持久化之后,向发送方返回Ack确认消息已经发送成功。由于消息为半事务消息,在未收到生产者对该消息的二次确认前,此消息被标记成“暂不能投递”状态。
3. 发送方开始执行本地事务逻辑。
4. 发送方根据本地事务执行结果向服务端提交二次确认(Commit 或是 Rollback),服务端收到Commit 状态则将半事务消息标记为可投递,订阅方最终将收到该消息;服务端收到 Rollback 状态则删除半事务消息,订阅方将不会接受该消息。
事务消息回查步骤:
1. 在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查。
2. 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。 3. 发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行操作。
二、程序实现
事务消息处理类需要继承RocketMQLocalTransactionListener类。该类的executeLocalTransaction方法负责在接到RocketMQ服务端的Ack确认消息后执行本地方法,也就是事务消息发送步骤中的步骤3。该类的checkLocalTransaction方法负责,在断网或者是应用重启的特殊情况下,执行RocketMQ服务端的消息回查,也就是事务消息回查步骤中的步骤2。
此外,要使该类生效,还需要加@RocketMQTransactionListener注解。这里有个要特别注意的地方。在2.1.0版本前,这个注解有一个属性txProducerGroup,可以用多个@RocketMQTransactionListener来监听不同的txProducerGroup来发送不同类型的事务消息到topic。但是现在在一个项目中,如果你在一个project中写了多个@RocketMQTransactionListener,项目将不能启动,启动会报错。产生这个问题的原因据说是,当使用RocketMQTemplate并发的执行事务时,非常容易出现"illegal state"的异常,原因是一个TransactionProducer在执行事务时不能被共享。所以,必须使用同一个TransactionMQProducer来发送所有类型的事务消息。当然同理也就必须使用一个侦听器处理所有的消息了。
既然必须使用同一个TransactionMQProducer,对于比较大的应用,业务场景很多,就会造成混乱。这里我给出一个方案抛砖引玉。TransactionMQProducer在发送消息时,是可以传递参数对象和指定消息头的。可以把要执行的本地方法的bean名和方法名放进去。
//发送半事务消息 TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction( topicAndTag, MessageBuilder.withPayload(msg) .setHeader(Constants.TX_ID_HEADER_NAME, msg.getTxId()) .setHeader(Constants.CHECK_BEAN_ID_HEADER_NAME, def.getCheckBeanId()) .setHeader(Constants.BIZ_ID_HEADER_NAME, msg.getBizId()) .build(), def );
其中def就是参数对象,可以自定义对象,这里是我自定义的TransactionMsgDefinationDto类,可以把想传递的信息放进去,最重要的是要执行的本地方法的bean名和方法名和方法执行参数:executeBeanId(bean名)、executeBeanMethod(方法名)、executeBeanParams(方法执行参数)。该对象可以传给RocketMQLocalTransactionListener的executeLocalTransaction方法,然后通过反射执行。
@Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { try { //保存消息记录 String body = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8); JSONObject jsonBody = JSONObject.parseObject(body); BaseMsgDto dto = JSONObject.toJavaObject(jsonBody, BaseMsgDto.class);//(BaseMsgDto)msg.getPayload(); TransactionMsgDefinationDto def = (TransactionMsgDefinationDto)arg; ProducerLog producerLog = BeanCopyUtils.copyProperties(def, ProducerLog::new); String[] tags = def.getMsgTags(); if(tags !=null && tags.length > 0) { StringBuilder tag = new StringBuilder(); for(int i = 0; i<tags.length; i++) { tag.append(tags[0]); if(i != tags.length-1) { tag.append("||"); } } producerLog.setMsgTag(tag.toString()); } producerLog.setBizId(dto.getBizId()); producerLog.setTxId(dto.getTxId()); producerLog.setBizType(dto.getBizType()); producerLog.setGroupName(dto.getProducerGroup()); producerLog.setMsgBody(body); producerLogService.save(producerLog); //执行事务方法 SpringUtil.invokeBeanMethod(def.getExecuteBeanId(), def.getExecuteBeanMethod(), def.getExecuteBeanParams()); return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { logger.error("发生错误:", e); return RocketMQLocalTransactionState.UNKNOWN; } }
放在消息头header中的数据可以传递给RocketMQLocalTransactionListener的checkLocalTransaction方法,然后同样通过反射执行。
@Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { try { String txId = (String)msg.getHeaders().get(Constants.TX_ID_HEADER_NAME); String checkBeanId = (String)msg.getHeaders().get(Constants.CHECK_BEAN_ID_HEADER_NAME); Long bizId = Long.parseLong((String)msg.getHeaders().get(Constants.BIZ_ID_HEADER_NAME)); //执行检查方法 Boolean ret = (Boolean)SpringUtil.invokeBeanMethod(checkBeanId, "check", new Object[]{bizId, txId}); if(ret.booleanValue()) return RocketMQLocalTransactionState.COMMIT; else return RocketMQLocalTransactionState.ROLLBACK; } catch (Exception e) { logger.error("发生错误:", e); return RocketMQLocalTransactionState.UNKNOWN; } }
到此这篇关于RocketMQ事务消息机制详解的文章就介绍到这了,更多相关RocketMQ事务消息内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
解决Springboot2.1.x配置Activiti7单独数据源问题
这篇文章主要介绍了Springboot2.1.x配置Activiti7单独数据源问题,非常不错,具有一定的参考借鉴价值,需要的朋友可以参考下2019-09-09Java 8 的异步编程利器 CompletableFuture的实例详解
这篇文章主要介绍了Java 8 的异步编程利器 CompletableFuture 详解,本文通过一个例子给大家介绍下Java 8 CompletableFuture异步编程的相关知识,需要的朋友可以参考下2022-03-03SpringBoot 中使用 Validation 校验参数的方法详解
Validation 是用于检查程序代码中参数的有效性的框架,作为 Spring 框架中的一个参数校验工具,集成在 spring-context 包中,这篇文章主要介绍了SpringBoot 中使用 Validation 校验参数,需要的朋友可以参考下2022-05-05Spring框架JavaMailSender发送邮件工具类详解
这篇文章主要为大家详细介绍了Spring框架JavaMailSender发送邮件工具类,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下2019-04-04Spring注解驱动之BeanPostProcessor后置处理器讲解
这篇文章主要介绍了Spring注解驱动之BeanPostProcessor后置处理器讲解,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教2022-09-09
最新评论