RocketMQ事务消息图文示例讲解
更新时间:2022年12月27日 16:10:11 作者:一个双子座的Java攻城狮
RocketMQ事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ的事务消息提供类似 X/Open XA 的分布式事务功能,通过事务消息能达到分布式事务的最终一致
RocketMQ 也允许我们像mysql 一样发送具有事务特征的消息
MQ 的事务流程(本地代码正常执行)
MQ 的消息补偿过程(当本地代码执行失败时)
MQ 消息的三种状态
- 提交状态:允许进入队列,此消息与非事务消息无区别
- 回滚状态:不允许进入队列,此消息等同于未发送过
- 中间状态:完成了 half 消息的发送,未对 MQ 进行二次状态确认(未知状态)
注意:事务消息仅与生产者有关,与消费者无关
生产者代码(提交状态、回滚状态):
public class Producer { public static void main(String[] args) throws Exception{ //事务消息使用的生产者是TransactionMQProducer TransactionMQProducer producer = new TransactionMQProducer("group1"); producer.setNamesrvAddr("192.168.23.127:9876"); //添加本地事务对应的监听 producer.setTransactionListener(new TransactionListener() { //正常事务过程 @Override public LocalTransactionState executeLocalTransaction(Message message, Object o) { // 此处写本地事务处理业务 // 如果成功,消息改为提交,如果失败改为 回滚,如果是多线程处理状态未知,就提交为未知等待事务补偿过程 //事务提交状态 return LocalTransactionState.COMMIT_MESSAGE;// 类似于msql 的 commit //return LocalTransactionState.ROLLBACK_MESSAGE;回滚状态 } //事务补偿过程 @Override public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { return null; } }); producer.start(); Message msg = new Message("topic8",("事务消息:hello rocketmq ").getBytes("UTF-8")); SendResult result = producer.sendMessageInTransaction(msg,null); System.out.println("返回结果:"+result); producer.shutdown(); } }
生产者(中间状态):
public class Producer { public static void main(String[] args) throws Exception{ //事务消息使用的生产者是TransactionMQProducer TransactionMQProducer producer = new TransactionMQProducer("group1"); producer.setNamesrvAddr("192.168.23.127:9876"); //添加本地事务对应的监听 producer.setTransactionListener(new TransactionListener() { //正常事务过程 @Override public LocalTransactionState executeLocalTransaction(Message message, Object o) { return LocalTransactionState.UNKNOW; } //事务补偿过程 @Override public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { System.out.println("事务补偿过程执行"); return LocalTransactionState.COMMIT_MESSAGE; } }); producer.start(); Message msg = new Message("topic8",("事务消息:hello rocketmq ").getBytes("UTF-8")); SendResult result = producer.sendMessageInTransaction(msg,null); System.out.println("返回结果:"+result); //事务补偿过程必须保障服务器在运行过程中,否则将无法进行正常的事务补偿 //producer.shutdown(); } }
到此这篇关于RocketMQ事务消息图文示例讲解的文章就介绍到这了,更多相关RocketMQ事务消息内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
SpringBoot中@ConfigurationProperties实现配置自动绑定的方法
本文主要介绍了SpringBoot中@ConfigurationProperties实现配置自动绑定的方法,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下2022-02-02Springboot如何根据docx填充生成word文件并导出pdf
这篇文章主要介绍了Springboot如何根据docx填充生成word文件并导出pdf问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教2024-08-08Jenkins初级应用之Invoke Phing targets插件配置
这篇文章主要为大家介绍了Jenkins初级应用之Invoke Phing targets的插件配置,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步早日升职加薪<BR>2022-04-04
最新评论