基于RocketMQ实现分布式事务的方法

 更新时间:2024年03月12日 09:33:41   作者:Y..  
了保证系统数据的一致性,我们需要确保这些服务中的操作要么全部成功,要么全部失败,通过使用RocketMQ实现分布式事务,我们可以协调这些服务的操作,保证数据的一致性,这篇文章主要介绍了基于RocketMQ实现分布式事务,需要的朋友可以参考下

背景

在一个微服务架构的项目中,一个业务操作可能涉及到多个服务,这些服务往往是独立部署,构成一个个独立的系统。这种分布式的系统架构往往面临着分布式事务的问题。为了保证系统数据的一致性,我们需要确保这些服务中的操作要么全部成功,要么全部失败。通过使用RocketMQ实现分布式事务,我们可以协调这些服务的操作,保证数据的一致性。

功能原理

RocketMQ的分布式事务消息功能,在普通消息基础上,支持二阶段的提交。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。

整个事务消息的详细交互流程如下图所示:

1、生产者将消息发送至RocketMQ服务端。

2、RocketMQ服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息被标记为"暂不能投递",这种状态下的消息即为半事务消息。

3、生产者开始执行本地事务逻辑。

4、生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:

  • 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。

  • 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。

5、在断网或者是生产者应用重启的特殊情况下,若服务端未收到生产者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者集群中任一生产者实例发起消息回查。

6、生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。

7、生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。

注意问题

消息类型事务消息仅支持在MessageType为Transaction的主题使用,即事务消息只能发送至类型为事务消息的主题中。

消息消费RocketMQ事务消息保证生产者本地事务和下游消息发送事务的一致性,但不保证消息消费结果和上游事务的一致性。因此需要下游业务自行保证消息正确处理,建议消费端做好消费重试。

中间状态RocketMQ事务消息一致性为最终一致性,即在消息提交到下游消费端处理完成之前,下游和上游事务之间的状态会不一致。因此,事务消息仅适合能接受异步执行的场景。

事务超时RocketMQ事务消息的生命周期存在超时机制,即半事务消息被生产者发送服务端后,如果在指定时间内服务端无法确认提交或者回滚状态,则消息默认会被回滚。

示例代码

以下为RocketMQ 4.x版本事务消息示例代码,

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.concurrent.*;

public class RocketMqTransactionDemo {
	public static void main(String[] args) throws Exception {
		// 创建事务消息生产者
		TransactionMQProducer producer = new TransactionMQProducer("transaction_producer");
		producer.setNamesrvAddr("127.0.0.1:9876");

		// 设置事务监听器
		TransactionListener transactionListener = new MyTransactionListener();
		producer.setTransactionListener(transactionListener);

		// 设置事务回查的线程池,可以不必设置,如果不设置也会默认生成一个
		ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue <Runnable> (2000), new ThreadFactory() {
			@Override
			public Thread newThread(Runnable r) {
				Thread thread = new Thread(r);
				thread.setName("client-transaction-msg-check-thread");
				return thread;
			}
		});
		producer.setExecutorService(executorService);

		// 启动生产者
		producer.start();

		// 发送事务消息
		Message message = new Message("transaction_topic", "test_tag", "test_key", "Hello RocketMQ".getBytes());
		producer.sendMessageInTransaction(message, null);

		// 关闭生产者
		producer.shutdown();
	}
}

/**
 * 事务监听器
 */
class MyTransactionListener implements TransactionListener {
	@Override
	public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
		// 执行本地事务操作
		System.out.println("执行本地事务操作,消息内容:" + new String(msg.getBody()));
		return LocalTransactionState.COMMIT_MESSAGE; // 提交事务,允许消费者消费该消息
		// return LocalTransactionState.ROLLBACK_MESSAGE;// 回滚事务,消息将被丢弃不允许消费。
		// return LocalTransactionState.UNKNOW;// 暂时无法判断状态,等待固定时间以后Broker端根据回查规则向生产者进行消息回查。
	}

	@Override
	public LocalTransactionState checkLocalTransaction(MessageExt msg) {
		// 检查本地事务状态
		System.out.println("检查本地事务状态,消息内容:" + new String(msg.getBody()));
		return LocalTransactionState.COMMIT_MESSAGE;
	}
}

代码解释:
1、事务消息的生产者使用TransactionMQProducer创建。
2、MyTransactionListener作为事务监听器,实现了接口TransactionListener,该接口有两个方法,分别是:

  • executeLocalTransaction
    半事务消息发送成功后,执行本地事务的方法,具体执行完本地事务后,可以在该方法中返回以下三种状态:
    LocalTransactionState.COMMIT_MESSAGE: 提交事务,允许消费者消费该消息。
    LocalTransactionState.ROLLBACK_MESSAGE: 回滚事务,消息将被丢弃不允许消费。
    LocalTransactionState.UNKNOW: 暂时无法判断状态,等待固定时间以后RocketMQ服务端根据回查规则向生产者进行消息回查。

  • checkLocalTransaction
    二次确认消息没有收到,RocketMQ服务端回查生产者端事务结果的方法。回查规则:本地事务执行完成后,若RocketMQ服务端收到的本地事务返回状态为LocalTransactionState.UNKNOW,或生产者应用退出导致本地事务未提交任何状态。则RocketMQ服务端会向消息生产者发起事务回查,第一次回查后仍未获取到事务状态,则之后每隔一段时间会再次回查。

到此这篇关于基于RocketMQ实现分布式事务的文章就介绍到这了,更多相关RocketMQ分布式事务内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 解决无法解析javax.servlet的方法

    解决无法解析javax.servlet的方法

    最近在创建一个servlet时,自动生成的代码中出现servlet无法解析的提示,令我无法正常使用servlet里的方法,在对各个步骤进行查看后,发现了问题所在,需要的朋友可以参考下
    2021-05-05
  • JAVA JNI函数的注册过程详细介绍

    JAVA JNI函数的注册过程详细介绍

    这篇文章主要介绍了JAVA JNI函数的注册过程详细介绍的相关资料,需要的朋友可以参考下
    2016-11-11
  • 基于JavaCore文件的深入分析

    基于JavaCore文件的深入分析

    本篇文章介绍了,对JavaCore文件的深入分析。需要的朋友参考下
    2013-05-05
  • Java报错:ClassCastException问题解决方法

    Java报错:ClassCastException问题解决方法

    异常是程序中的一些错误,但并不是所有的错误都是异常,并且错误有时候是可以避免的,下面这篇文章主要给大家介绍了关于Java报错:ClassCastException问题解决方法,需要的朋友可以参考下
    2024-07-07
  • Java中JSON处理工具类使用详解

    Java中JSON处理工具类使用详解

    这篇文章主要为大家详细介绍了Java中JSON处理工具类的使用,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2018-01-01
  • 详细讲解Java抽象类示例

    详细讲解Java抽象类示例

    这篇文章主要介绍了 Java抽象类示例,抽象类通常用于定义一些公共的方法和属性,但是这些方法没有具体的实现,需要的朋友可以参考下
    2023-05-05
  • @Controller、@RestController注解区别详解

    @Controller、@RestController注解区别详解

    这篇文章主要介绍了@Controller、@RestController注解区别详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-10-10
  • SpringBoot整合RabbitMQ处理死信队列和延迟队列

    SpringBoot整合RabbitMQ处理死信队列和延迟队列

    这篇文章将通过示例为大家详细介绍SpringBoot整合RabbitMQ时如何处理死信队列和延迟队列,文中的示例代码讲解详细,需要的可以参考一下
    2022-05-05
  • Java 中图片压缩处理的解决方案

    Java 中图片压缩处理的解决方案

    图片经过base64编码转换后,文件会变大的原因是因为base64编码会将每个3字节的数据转换成4字节的数据,并且在转换的过程中还会添加一些额外的字符,这篇文章主要介绍了Java 中如何对图片进行压缩处理,需要的朋友可以参考下
    2023-09-09
  • 详解springboot整合mongodb

    详解springboot整合mongodb

    本篇文章主要介绍了详解springboot整合mongodb,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-05-05

最新评论