基于RocketMQ实现分布式事务的方法
背景
在一个微服务架构的项目中,一个业务操作可能涉及到多个服务,这些服务往往是独立部署,构成一个个独立的系统。这种分布式的系统架构往往面临着分布式事务的问题。为了保证系统数据的一致性,我们需要确保这些服务中的操作要么全部成功,要么全部失败。通过使用RocketMQ实现分布式事务,我们可以协调这些服务的操作,保证数据的一致性。
功能原理
RocketMQ的分布式事务消息功能,在普通消息基础上,支持二阶段的提交。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。
整个事务消息的详细交互流程如下图所示:
1、生产者将消息发送至RocketMQ服务端。
2、RocketMQ服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息被标记为"暂不能投递",这种状态下的消息即为半事务消息。
3、生产者开始执行本地事务逻辑。
4、生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:
二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
5、在断网或者是生产者应用重启的特殊情况下,若服务端未收到生产者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者集群中任一生产者实例发起消息回查。
6、生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
7、生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。
注意问题
消息类型事务消息仅支持在MessageType为Transaction的主题使用,即事务消息只能发送至类型为事务消息的主题中。
消息消费RocketMQ事务消息保证生产者本地事务和下游消息发送事务的一致性,但不保证消息消费结果和上游事务的一致性。因此需要下游业务自行保证消息正确处理,建议消费端做好消费重试。
中间状态RocketMQ事务消息一致性为最终一致性,即在消息提交到下游消费端处理完成之前,下游和上游事务之间的状态会不一致。因此,事务消息仅适合能接受异步执行的场景。
事务超时RocketMQ事务消息的生命周期存在超时机制,即半事务消息被生产者发送服务端后,如果在指定时间内服务端无法确认提交或者回滚状态,则消息默认会被回滚。
示例代码
以下为RocketMQ 4.x版本事务消息示例代码,
import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.TransactionListener; import org.apache.rocketmq.client.producer.TransactionMQProducer; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; import java.util.concurrent.*; public class RocketMqTransactionDemo { public static void main(String[] args) throws Exception { // 创建事务消息生产者 TransactionMQProducer producer = new TransactionMQProducer("transaction_producer"); producer.setNamesrvAddr("127.0.0.1:9876"); // 设置事务监听器 TransactionListener transactionListener = new MyTransactionListener(); producer.setTransactionListener(transactionListener); // 设置事务回查的线程池,可以不必设置,如果不设置也会默认生成一个 ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue <Runnable> (2000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("client-transaction-msg-check-thread"); return thread; } }); producer.setExecutorService(executorService); // 启动生产者 producer.start(); // 发送事务消息 Message message = new Message("transaction_topic", "test_tag", "test_key", "Hello RocketMQ".getBytes()); producer.sendMessageInTransaction(message, null); // 关闭生产者 producer.shutdown(); } } /** * 事务监听器 */ class MyTransactionListener implements TransactionListener { @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { // 执行本地事务操作 System.out.println("执行本地事务操作,消息内容:" + new String(msg.getBody())); return LocalTransactionState.COMMIT_MESSAGE; // 提交事务,允许消费者消费该消息 // return LocalTransactionState.ROLLBACK_MESSAGE;// 回滚事务,消息将被丢弃不允许消费。 // return LocalTransactionState.UNKNOW;// 暂时无法判断状态,等待固定时间以后Broker端根据回查规则向生产者进行消息回查。 } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { // 检查本地事务状态 System.out.println("检查本地事务状态,消息内容:" + new String(msg.getBody())); return LocalTransactionState.COMMIT_MESSAGE; } }
代码解释:
1、事务消息的生产者使用TransactionMQProducer
创建。
2、MyTransactionListener
作为事务监听器,实现了接口TransactionListener
,该接口有两个方法,分别是:
executeLocalTransaction
:
半事务消息发送成功后,执行本地事务的方法,具体执行完本地事务后,可以在该方法中返回以下三种状态:
LocalTransactionState.COMMIT_MESSAGE: 提交事务,允许消费者消费该消息。
LocalTransactionState.ROLLBACK_MESSAGE: 回滚事务,消息将被丢弃不允许消费。
LocalTransactionState.UNKNOW: 暂时无法判断状态,等待固定时间以后RocketMQ服务端根据回查规则向生产者进行消息回查。checkLocalTransaction
:
二次确认消息没有收到,RocketMQ服务端回查生产者端事务结果的方法。回查规则:本地事务执行完成后,若RocketMQ服务端收到的本地事务返回状态为LocalTransactionState.UNKNOW,或生产者应用退出导致本地事务未提交任何状态。则RocketMQ服务端会向消息生产者发起事务回查,第一次回查后仍未获取到事务状态,则之后每隔一段时间会再次回查。
到此这篇关于基于RocketMQ实现分布式事务的文章就介绍到这了,更多相关RocketMQ分布式事务内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
Java报错:ClassCastException问题解决方法
异常是程序中的一些错误,但并不是所有的错误都是异常,并且错误有时候是可以避免的,下面这篇文章主要给大家介绍了关于Java报错:ClassCastException问题解决方法,需要的朋友可以参考下2024-07-07@Controller、@RestController注解区别详解
这篇文章主要介绍了@Controller、@RestController注解区别详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧2019-10-10SpringBoot整合RabbitMQ处理死信队列和延迟队列
这篇文章将通过示例为大家详细介绍SpringBoot整合RabbitMQ时如何处理死信队列和延迟队列,文中的示例代码讲解详细,需要的可以参考一下2022-05-05
最新评论