Rocketmq事务消息之半消息详解

 更新时间:2023年09月08日 09:26:39   作者:澄风  
这篇文章主要介绍了Rocketmq事务消息之半消息详解,RocketMQ的事务消息支持在业务逻辑与发送消息之间提供事务保证,RocketMQ通过两阶段的方式提供事务消息的支持,需要的朋友可以参考下

什么是半消息(事务消息)

当我们在业务逻辑中发送消息时,消息与业务的事务之间难以保证一致性,如果业务代码出现异常,如果已发送的消息无法回滚,则很会出现数据不一致的情况,RocketMQ的事务消息支持在业务逻辑与发送消息之间提供事务保证,RocketMQ通过两阶段的方式提供事务消息的支持。

首需要注意的是 事务消息(半消息) 仅仅只是保证本地事务和MQ消息发送形成整体的 原子性 ,而投递到MQ服务器后,并无法保证消费者一定能消费成功!

  • 事务消息 :MQ 提供类似 X/Open XA 的分布事务功能,通过 MQ 事务消息能达到分布式事务的最终一致。
  • 半消息 :暂不能投递的消息,发送方已经将消息成功发送到了 MQ 服务端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半消息。
  • 半消息回查 :由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,MQ 服务端通过扫描发现某条消息长期处于“半消息”时,需要主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该过程即消息回查。

极端情况:是否任何情况下MQ的事务性消息都可以保证双方的最终一致性?答案是否定的。

考虑上面提到的异常情况“情况2:MQ发送方在执行完本地事务之后commit之前异常退出”。

在这种情况下如果如果MQ发送方由于运维上的失误长时间不重启MQ发送方,那么MQ在多次回查不成功之后将会丢弃该消息。

最终分布式事务的双方是不能达到最终一致性了。当然这个回查的最大值可以通过修改broker的参数transactionCheckMax来调整。但是过大的transactionCheckMax参数将会导致MQ堆积过多的半包消息,从而危害MQ的稳定性,是个需要权衡的参数。

半消息事务实现流程

在这里插入图片描述

流程:

1.发送方向 MQ 服务端发送事务消息;

2.MQ Server 将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功,此时消息为半消息。

3.发送方开始执行本地事务逻辑。

4.发送方根据本地事务执行结果向 MQ Server 提交二次确认(Commit 或是 Rollback),MQ Server 收到 Commit 状态则将半消息标记为可投递,订阅方最终将收到该消息;MQ Server 收到 Rollback 状态则删除

半消息,订阅方将不会接受该消息。

5.在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达 MQ Server,经过固定时间后 MQ Server 将对该消息发起消息回查。

6.发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。

7.发送方根据检查得到的本地事务的最终状态再次提交二次确认,MQ Server 仍按照步骤4对半消息进行操作。

半消息的应用场景

注册系统注册的流程中,用户入口在网页注册系统,通知系统在邮件系统,两个系统之间的数据需要保持最终一致。

普通消息处理

如上所述,注册系统和邮件通知系统之间通过消息队列进行异步处理。注册系统将注册信息写入注册系统之后,发送一条注册成功的消息到消息队列RocketMQ版,邮件通知系统订阅消息队列RocketMQ版的注册消息,做相应的业务处理,发送注册成功或者失败的邮件。

普通消息处理

在这里插入图片描述

流程说明如下:

1.注册系统发起注册。

2.注册系统向消息队列RocketMQ版发送注册消息成功与否的消息。

2.1 消息发送成功,进入3。

2.2 消息发送失败,导致邮件通知系统未收到消息队列RocketMQ版发送的注册成功与否的消息,而无法发送邮件,最终邮件通知系统和注册系统之间的状态数据不一致,流程结束。

3.邮件通知系统收到消息队列RocketMQ版的注册成功消息。

4.邮件通知系统发送注册成功邮件给用户。

在这样的情况下,虽然实现了系统间的解耦,上游系统不需要关心下游系统的业务处理结果;但是数据一致性不好处理,如何保证邮件通知系统状态与注册系统状态的最终一致。

事务消息处理

此时,需要利用消息队列RocketMQ版所提供的事务消息来实现系统间的状态数据一致性。

在这里插入图片描述

事务消息 流程说明如下:

1.注册系统向消息队列RocketMQ版发送半事务消息。

1.1 半事务消息发送成功,进入2。

1.2 半事务消息发送失败,注册系统不进行注册,流程结束。

说明 最终注册系统与邮件通知系统数据一致。

2.注册系统开始注册。

2.1 注册成功,进入3.1。

2.2 注册失败,进入3.2。

3.注册系统向消息队列RocketMQ版发送半消息状态。

3.1 提交半事务消息,产生注册成功消息,进入4。

3.2 回滚半事务消息,未产生注册成功消息,流程结束。

说明 最终注册系统与邮件通知系统数据一致。

4.邮件通知系统接收消息队列RocketMQ版的注册成功消息。

5.邮件通知系统发送注册成功邮件。

说明 最终注册系统与邮件通知系统数据一致。

关于分布式事务消息的更多详细内容,请参见事务消息。

这一段是摘抄子阿里云的rocketmq文档介绍,大概的意思就是我发消息的动作要和一个本地事务进行绑定,我如果发消息失败那么你本地事务也不应该执行,我本地事务执行失败,那么消息也不应该发。要保证上下游系统的数据是最终一致的,保证消息和本地事务一定是原子性的。

实践

在这里插入图片描述

1.模拟一个接口发送事务消息

@GetMapping("/sendMessage")
public String sendMessage(String cron) {
    //TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction
    //        (String.format("%s:%s", "FINORDER_MQ_TOPIC_KEY", "FINORDER_PAY_TAG_KEY"), MessageBuilder.withPayload("{\"name\": \"ok\"}").build(), "");
    Message<String> mqMessage = MessageBuilder
            .withPayload("OK 啦!")
            .setHeader("key", "ALLENS") // ①
            .build();
    TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction("FINORDER_MQ_TOPIC_KEY:FINORDER_PAY_TAG_KEY" /* ② */, mqMessage , "");
    return "ok";
}

① 设置消息头,这事务监听里边可以通过这个header来区分是哪一个事务,假如单个微服务有多个事务消息就可以用这个来区分。

② TOPIC + groupid 用":"来分割

创建事务监听器

@Service
@RocketMQTransactionListener
@Slf4j
public class TestTransactionListenerImpl implements RocketMQLocalTransactionListener {
	/**
	 * 每次推送消息会执行executeLocalTransaction方法,首先会发送半消息,到这里的时候是执行具体本地业务,
     * 执行成功后手动返回RocketMQLocalTransactionState.COMMIT状态,
     * 这里是保证本地事务执行成功,如果本地事务执行失败则可以返回ROLLBACK进行消息回滚。 此时消息只是被保存到broker,并没有发送到topic中,broker会根据本地返回的状态来决定消息的处理方式。
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        System.out.println("接收到消息:" + msg);
        System.out.println("Header:" + msg.getHeaders().get("key"));
        return RocketMQLocalTransactionState.COMMIT;
    }
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        return RocketMQLocalTransactionState.COMMIT;
    }
}

该监听器的实现有两个方法一个是本地事务的执行方法executeLocalTransaction,一个是本地事务回查方法checkLocalTransaction。 两个方法的返回值类型为RocketMQLocalTransactionState,该枚举有三种:

// COMMIT:即生产者通知Rocket该消息可以消费
RocketMQLocalTransactionState.COMMIT;
// ROLLBACK:即生产者通知Rocket将该消息删除
RocketMQLocalTransactionState.ROLLBACK;
// UNKNOWN:即生产者通知Rocket继续查询该消息的状态
RocketMQLocalTransactionState.UNKNOWN;

对于长时间没有 Commit/Rollback 的事务消息( pending 状态的消息),从服务端发起一次 回查Producer 收到回查消息,检查回查消息对应的 本地事务状态根据本地事务状态,重新 Commit 或者 Rollback。

以上代码中,如果sex是偶数,executeLocalTransaction会抛出异常,本地事务会回滚,半消息状态是UNKNOWN,此时就会启动消息的回查机制,mq会在一定的时间调用checkLocalTransaction方法查询执行状态,根据执行状态来决定是继续回查、删除消息、发送消息。

executeLocalTransaction也可以自己捕获异常,手动回滚事务,返回RocketMQLocalTransactionState.ROLLBACK,这样能减少消息回查。

等消息正常提交,半消息消息会移动到发送指定的TOPIC队里中,这个时候订阅者就可以正常获取消息了。

@Service
@RocketMQMessageListener(consumerGroup = "FINORDER_PAY_TAG_KEY",topic = "FINORDER_MQ_TOPIC_KEY")
@Slf4j
public class MQConsumerService implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println("consumer:" + message);
    }
}

总结

1、首需要注意的是 事务消息(半消息) 仅仅只是保证本地事务和MQ消息发送形成整体的 原子性 ,而投递到MQ服务器后,并无法保证消费者一定能消费成功!

2、发消息的动作要和一个本地事务进行绑定,我如果发消息失败那么你本地事务也不应该执行,我本地事务执行失败,那么消息也不应该发。要保证上下游系统的数据是最终一致的,保证消息和本地事务一定是原子性的。

3、 半消息如果提交成功最终是要入队列的,可以正常的收到消息,这个时候可以认为上游系统的依赖条件肯定是已经执行成功了的。

到此这篇关于Rocketmq事务消息之半消息详解的文章就介绍到这了,更多相关Rocketmq半消息内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 详解java安全编码指南之可见性和原子性

    详解java安全编码指南之可见性和原子性

    java类中会定义很多变量,有类变量也有实例变量,这些变量在访问的过程中,会遇到一些可见性和原子性的问题。这里我们来详细了解一下怎么避免这些问题。
    2021-06-06
  • java中动态代理的实现

    java中动态代理的实现

    本篇文章主要介绍了Java中两种动态代理的实现:jdk动态代理;cglib动态代理。具有一定的参考价值,下面跟着小编一起来看下吧
    2017-01-01
  • Java编程使用Runtime和Process类运行外部程序的方法

    Java编程使用Runtime和Process类运行外部程序的方法

    这篇文章主要介绍了Java编程使用Runtime和Process类运行外部程序的方法,结合实例形式分析了java使用Runtime.getRuntime().exec()方法运行外部程序的常见情况与操作技巧,需要的朋友可以参考下
    2017-08-08
  • Spring Boot 启动注解过程分析

    Spring Boot 启动注解过程分析

    这篇文章主要为大家介绍了Spring Boot 启动注解过程示例分析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-06-06
  • java实现读取带合并单元格的Excel

    java实现读取带合并单元格的Excel

    这篇文章主要为大家详细介绍了java如何实现读取带合并单元格的Excel,文中的示例代码讲解详细, 感兴趣的小伙伴可以跟随小编一起学习一下
    2023-12-12
  • MyBatis框架零基础快速入门案例详解

    MyBatis框架零基础快速入门案例详解

    MyBatis本是apache的一个开源项目iBatis,2010年这个项目由apache software foundation迁移到了google code,并且改名为MyBatis。2013年11月迁移到Github。iBATIS一词来源于“internet”和“abatis”的组合,是一个基于Java的持久层框架
    2022-04-04
  • JAVA十大排序算法之归并排序详解

    JAVA十大排序算法之归并排序详解

    这篇文章主要介绍了java中的归并排序,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-08-08
  • Java动态设置注解值及原理详解

    Java动态设置注解值及原理详解

    这篇文章主要介绍了Java动态设置注解值及原理详解,AnnotationInvocationHandler是注解的代理hander,通过反射获取类的注解时会通过AnnotationInvocationHandler创建代理对象并将数据存储到memberValues里,需要的朋友可以参考下
    2023-11-11
  • Java_异常类(错误和异常,两者的区别介绍)

    Java_异常类(错误和异常,两者的区别介绍)

    下面小编就为大家带来一篇Java_异常类(错误和异常,两者的区别介绍) 。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2016-09-09
  • 在java中实现C#语法里的按引用传递参数的方法

    在java中实现C#语法里的按引用传递参数的方法

    下面小编就为大家带来一篇在java中实现C#语法里的按引用传递参数的方法。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2016-09-09

最新评论