RocketMQ事务消息机制详解

 更新时间:2024年01月11日 09:28:50   作者:智由静生  
这篇文章主要介绍了RocketMQ事务消息机制详解,RocketMQ服务端将消息持久化之后,向发送方返回Ack确认消息已经发送成功,由于消息为半事务消息,在未收到生产者对该消息的二次确认前,此消息被标记成"暂不能投递"状态,需要的朋友可以参考下

RocketMQ事务消息

RocketMQ提供了事务消息,通过事务消息就能达到分布式事务的最终一致,从而实现了可靠消息服务。

一、事务消息的实现步骤

事务消息发送步骤:

1. 发送方将半事务消息发送至RocketMQ服务端。

2. RocketMQ服务端将消息持久化之后,向发送方返回Ack确认消息已经发送成功。由于消息为半事务消息,在未收到生产者对该消息的二次确认前,此消息被标记成“暂不能投递”状态。

3. 发送方开始执行本地事务逻辑。

4. 发送方根据本地事务执行结果向服务端提交二次确认(Commit 或是 Rollback),服务端收到Commit 状态则将半事务消息标记为可投递,订阅方最终将收到该消息;服务端收到 Rollback 状态则删除半事务消息,订阅方将不会接受该消息。

事务消息回查步骤:

1. 在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查。

2. 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。 3. 发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行操作。

二、程序实现

事务消息处理类需要继承RocketMQLocalTransactionListener类。该类的executeLocalTransaction方法负责在接到RocketMQ服务端的Ack确认消息后执行本地方法,也就是事务消息发送步骤中的步骤3。该类的checkLocalTransaction方法负责,在断网或者是应用重启的特殊情况下,执行RocketMQ服务端的消息回查,也就是事务消息回查步骤中的步骤2。

此外,要使该类生效,还需要加@RocketMQTransactionListener注解。这里有个要特别注意的地方。在2.1.0版本前,这个注解有一个属性txProducerGroup,可以用多个@RocketMQTransactionListener来监听不同的txProducerGroup来发送不同类型的事务消息到topic。但是现在在一个项目中,如果你在一个project中写了多个@RocketMQTransactionListener,项目将不能启动,启动会报错。产生这个问题的原因据说是,当使用RocketMQTemplate并发的执行事务时,非常容易出现"illegal state"的异常,原因是一个TransactionProducer在执行事务时不能被共享。所以,必须使用同一个TransactionMQProducer来发送所有类型的事务消息。当然同理也就必须使用一个侦听器处理所有的消息了。

既然必须使用同一个TransactionMQProducer,对于比较大的应用,业务场景很多,就会造成混乱。这里我给出一个方案抛砖引玉。TransactionMQProducer在发送消息时,是可以传递参数对象和指定消息头的。可以把要执行的本地方法的bean名和方法名放进去。

//发送半事务消息
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
		topicAndTag,
		MessageBuilder.withPayload(msg)
			.setHeader(Constants.TX_ID_HEADER_NAME, msg.getTxId())
			.setHeader(Constants.CHECK_BEAN_ID_HEADER_NAME, def.getCheckBeanId())
			.setHeader(Constants.BIZ_ID_HEADER_NAME, msg.getBizId())
			.build(),
		def
);

其中def就是参数对象,可以自定义对象,这里是我自定义的TransactionMsgDefinationDto类,可以把想传递的信息放进去,最重要的是要执行的本地方法的bean名和方法名和方法执行参数:executeBeanId(bean名)、executeBeanMethod(方法名)、executeBeanParams(方法执行参数)。该对象可以传给RocketMQLocalTransactionListener的executeLocalTransaction方法,然后通过反射执行。

@Override
	public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
		try {
			//保存消息记录
			String body = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
			JSONObject jsonBody = JSONObject.parseObject(body);
			BaseMsgDto dto = JSONObject.toJavaObject(jsonBody, BaseMsgDto.class);//(BaseMsgDto)msg.getPayload();
			TransactionMsgDefinationDto def = (TransactionMsgDefinationDto)arg;
			ProducerLog producerLog = BeanCopyUtils.copyProperties(def, ProducerLog::new);
			String[] tags = def.getMsgTags();
			if(tags !=null && tags.length > 0) {
				StringBuilder tag = new StringBuilder();
				for(int i = 0; i<tags.length; i++) {
					tag.append(tags[0]);
					if(i != tags.length-1) {
						tag.append("||");
					}
				}
				producerLog.setMsgTag(tag.toString());
			}
			producerLog.setBizId(dto.getBizId());
			producerLog.setTxId(dto.getTxId());
			producerLog.setBizType(dto.getBizType());
			producerLog.setGroupName(dto.getProducerGroup());
			producerLog.setMsgBody(body);
			producerLogService.save(producerLog);
			//执行事务方法
			SpringUtil.invokeBeanMethod(def.getExecuteBeanId(), def.getExecuteBeanMethod(), def.getExecuteBeanParams());
			return RocketMQLocalTransactionState.COMMIT;
		} catch (Exception e) {
			logger.error("发生错误:", e);
			return RocketMQLocalTransactionState.UNKNOWN;
		}
	}

放在消息头header中的数据可以传递给RocketMQLocalTransactionListener的checkLocalTransaction方法,然后同样通过反射执行。

@Override
	public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
		try {
			String txId = (String)msg.getHeaders().get(Constants.TX_ID_HEADER_NAME); 
			String checkBeanId = (String)msg.getHeaders().get(Constants.CHECK_BEAN_ID_HEADER_NAME);
			Long bizId = Long.parseLong((String)msg.getHeaders().get(Constants.BIZ_ID_HEADER_NAME));
			//执行检查方法
			Boolean ret = (Boolean)SpringUtil.invokeBeanMethod(checkBeanId, "check", new Object[]{bizId, txId});
			if(ret.booleanValue())
				return RocketMQLocalTransactionState.COMMIT;
			else
				return RocketMQLocalTransactionState.ROLLBACK;
		} catch (Exception e) {
			logger.error("发生错误:", e);
			return RocketMQLocalTransactionState.UNKNOWN;
		}
	}

到此这篇关于RocketMQ事务消息机制详解的文章就介绍到这了,更多相关RocketMQ事务消息内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 解决Springboot2.1.x配置Activiti7单独数据源问题

    解决Springboot2.1.x配置Activiti7单独数据源问题

    这篇文章主要介绍了Springboot2.1.x配置Activiti7单独数据源问题,非常不错,具有一定的参考借鉴价值,需要的朋友可以参考下
    2019-09-09
  • Java异常处理深入理解

    Java异常处理深入理解

    这篇文章主要介绍了java项目常用异常处理汇总,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2021-07-07
  • Java 8 的异步编程利器 CompletableFuture的实例详解

    Java 8 的异步编程利器 CompletableFuture的实例详解

    这篇文章主要介绍了Java 8 的异步编程利器 CompletableFuture 详解,本文通过一个例子给大家介绍下Java 8  CompletableFuture异步编程的相关知识,需要的朋友可以参考下
    2022-03-03
  • spring自定义注解实现拦截器的实现方法

    spring自定义注解实现拦截器的实现方法

    本篇文章主要介绍了spring自定义注解实现拦截器的实现方法,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-08-08
  • SpringBoot 中使用 Validation 校验参数的方法详解

    SpringBoot 中使用 Validation 校验参数的方法详解

    Validation 是用于检查程序代码中参数的有效性的框架,作为 Spring 框架中的一个参数校验工具,集成在 spring-context 包中,这篇文章主要介绍了SpringBoot 中使用 Validation 校验参数,需要的朋友可以参考下
    2022-05-05
  • Spring框架JavaMailSender发送邮件工具类详解

    Spring框架JavaMailSender发送邮件工具类详解

    这篇文章主要为大家详细介绍了Spring框架JavaMailSender发送邮件工具类,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2019-04-04
  • Java多线程中的Phaser详解

    Java多线程中的Phaser详解

    这篇文章主要介绍了Java多线程中的Phaser详解,Pahser是一个可以重复使用的同步屏障,Phaser是按照不同阶段执行线程的,它本身维护着一个叫 phase 的成员变量代表当前执行的阶段,需要的朋友可以参考下
    2023-11-11
  • Spring注解驱动之BeanPostProcessor后置处理器讲解

    Spring注解驱动之BeanPostProcessor后置处理器讲解

    这篇文章主要介绍了Spring注解驱动之BeanPostProcessor后置处理器讲解,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-09-09
  • 剖析SpringCloud Feign中所隐藏的坑

    剖析SpringCloud Feign中所隐藏的坑

    这篇文章主要为大家介绍了剖析SpringCloud Feign中所隐藏的坑示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-08-08
  • java使用反射访问成员变量的值示例

    java使用反射访问成员变量的值示例

    这篇文章主要介绍了java使用反射访问成员变量的值,结合实例形式分析了java基于反射机制操作类成员变量相关实现技巧,需要的朋友可以参考下
    2019-07-07

最新评论