ActiveMQ中consumer的消息确认机制详解

 更新时间:2023年10月10日 10:36:35   作者:程序员阿红  
这篇文章主要介绍了ActiveMQ中consumer的消息确认机制详解,对于broker而言,只有接收到确认指令,才会认为消息被正确的接收或者处理成功了,InforSuiteMQ提供以下几种Consumer与Broker之间的消息确认方式,需要的朋友可以参考下

1. Consumer消息确认机制

简单讲就是消息被Consumer接收后,Consumer将在何时确认消息。

对于broker而言,只有接收到确认指令,才会认为消息被正确的接收或者处理成功了。InforSuiteMQ提供以下几种Consumer与Broker之间的消息确认方式。

(1)AUTO_ACKNOWLEDGE = 1 自动确认

(2)CLIENT_ACKNOWLEDGE = 2 客户端手动确认

(3)DUPS_OK_ACKNOWLEDGE = 3 自动批量确认

(4)SESSION_TRANSACTED = 0 事务提交并确认

(5)INDIVIDUAL_ACKNOWLEDGE = 4 单条消息确认

前四种是JMS API中提供的客户端ACK_MODE。第五种是InforSuiteMQ自定义补充的一种ACK_MODE。

Consumer有两种消息消费方式:同步消费consumer.receive()和异步消费MessageListener,这两种方式下,消息确认机制也是不同的。同一Consumer中,不可同时使用这两种消费方式。

同步调用时,在消息从receive方法返回之前,就已经调用了ACK;因此如果Client端没有处理成功,此消息将丢失(可能重发,与ACK_MODE有关)。

异步调用时,消息的确认是在onMessage方法返回之后,如果onMessage方法异常,会导致消息重发。

2.消息确认方式详解

2.1自动确认

AUTO_ACKNOWLEDGE : 自动确认,这就意味着消息的确认时机将有consumer择机确认。

使用开发者必须明确知道"择机确认"的具体时机,否则将有可能导致消息的丢失,或者消息的重复接收。

2.2客户端确认

CLIENT_ACKNOWLEDGE : 客户端手动确认,开发者需要自己择机确认。客户端手动确认时机有以下三种:

(1) message.acknowledge():确认当前session中所有consumer中尚未ACK的消息;

(2) InforSuiteMQSession.acknowledge():确认当前session中所有consumer中尚未ACK的消息;

(3) InforSuiteMQMessageConsumer.acknowledege():确认当前consumer中那些尚未确认的消息。

2.3自动批量确认

DUPS_OK_ACKNOWLEDGE : 自动批量确认,也是一种自动确实方式,使用方法与AUTO_ACKNOWLEDGE相同,具有“批量”和“延迟”的确认特点。

该模式下,当Consumer故障重启后,那些尚未被ACK确认的消息会重新发送过来,这就意味着消息可能重复。

2.4事务确认

  • SESSION_TRANSACTED:事务提交并确认。当session使用事务时,调用此确认方式。在事务开启之后和session.commit()之前,所有消费的消息,要么全部正常确认,要么全部redelivery。
  • 当session.commit方法异常时,开发者通常是调用session.rollback()回滚事务(事实上开发者不调用也没有问题),开发这个可以在事务开始之后的任何时机调用rollback(),rollback意味着当前事务的结束,事务中所有的消息都将被重发。调用session.rollback()而导致消息重发,都会导致message.redeliveryCounter计数器增加,最终都会受限于brokerUrl中配置的"jms.redeliveryPolicy.maximumRedeliveries",如果rollback的次数过多,而达到重发次数的上限时,消息将会被DLQ(dead letter)。

2.5单条消息确认

INDIVIDUAL_ACKNOWLEDGE : 单条消息确认。此确认方式与客户端确认方式使用CLIENT_ACKNOWLEDGE几乎一样,当消息消费成功之后,调用message.acknowledege来确认此消息(单条),而CLIENT_ACKNOWLEDGE模式,调用message.acknowledge()方法将导致整个session中所有消息被确认(批量确认)。

3.客户端确认使用场景解析

Consumer使用MessageListener异步监听队列消息,并将消息插入到数据中。

消息确认方式为客户端单条消息确认,消息插入数据库成功,调用message.acknowledege()来确认此消息(单条),消息插入数据库失败,调用session.recover()将消息返回的队列中重新发送。

以下为部分代码示例。

final InforBrokerQueueSession session =  (InforBrokerQueueSession) connection.createQueueSession(Boolean.FALSE, InforBrokerSession.INDIVIDUAL_ACKNOWLEDGE);
Destination destination = session.createQueue("myqueue_rz");  		 
InforBrokerMessageConsumer consumer = (InforBrokerMessageConsumer) session.createConsumer(destination);
public void onMessage(Message m) { 		 		
	          TextMessage message =  (TextMessage)m;	 
 	         long begin=System.currentTimeMillis();//用于性能测试,跟踪程序运行时间
 	           try {
 				  System.out.println("message::"+message.getText()); 				 
  	       	       state=insertDB(message.getText());  	
 			   } catch (JMSException e) {
 				   e.printStackTrace();
                   }
 	          long end=System.currentTimeMillis(); //用于性能测试,跟踪程序运行时间
	          long cost=end-begin;
	          System.out.println("数据接收处理总共耗时:"+cost); //用于性能测试,跟踪程序运行时间
 	           if(state.equals("1")){
 	              try {
 	            	 // System.out.println("stat=1"+message.getText());
					    message.acknowledge();
					    //   session.commit();
				} catch (JMSException e) {
					e.printStackTrace();
				}
 	           }else{
 	        	  try {
 	        		  //session.rollback();
 	        		 session.recover();
					System.out.println("getJMSRedelivered():"+m.getJMSRedelivered());
					m.setJMSRedelivered(true); 					System.out.println("m.getJMSDeliveryMode():"+m.getJMSDeliveryMode());
				} catch (JMSException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				} 					
 	           }

到此这篇关于ActiveMQ中consumer的消息确认机制详解的文章就介绍到这了,更多相关consumer的消息确认机制内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 一文详解Spring构造函数推断

    一文详解Spring构造函数推断

    这篇文章主要介绍了Spring构造函数推断自动注入及底层原理详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加
    2023-04-04
  • java实现单链表中的增删改

    java实现单链表中的增删改

    这篇文章主要为大家详细介绍了java实现单链表中的增删改,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2022-05-05
  • SpringCloud之Config配置中心与Redis分布式锁详解

    SpringCloud之Config配置中心与Redis分布式锁详解

    这篇文章主要给大家介绍了SpringCloud Alibaba中Config配置中心,Redis分布式锁,文中有详细的代码示例供大家参考,需要的朋友可以参考阅读
    2023-05-05
  • Java实现简易学籍管理系统

    Java实现简易学籍管理系统

    这篇文章主要为大家详细介绍了Java实现简易学籍管理系统,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2022-02-02
  • Java线程的start方法回调run方法的操作技巧

    Java线程的start方法回调run方法的操作技巧

    面试过程中经常会被面试官问到为什么我们调用start()方法时会执行run()方法,为什么不能直接调用run()方法,问的一头雾水,今天小编给大家介绍下Java线程的start方法回调run方法的操作技巧,需要的朋友参考下吧
    2017-11-11
  • 关于Springboot数据库配置文件明文密码加密解密的问题

    关于Springboot数据库配置文件明文密码加密解密的问题

    这篇文章主要介绍了Springboot数据库配置文件明文密码加密解密的问题,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2022-03-03
  • java获取request中的参数以及java解析URL问号后的参数

    java获取request中的参数以及java解析URL问号后的参数

    这篇文章主要介绍了java获取request中的参数以及java解析URL问号后的参数问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2023-12-12
  • 在es中查询null值的操作方法

    在es中查询null值的操作方法

    在我们向es中写入数据时,有些时候数据写入到es中的是null,或者没有写入这个字段,那么这个时候在es中该如何查询出这种为null的数据呢,本文给大家详细讲解,需要的朋友参考下吧
    2023-02-02
  • Java线程使用同步锁交替执行打印奇数偶数的方法

    Java线程使用同步锁交替执行打印奇数偶数的方法

    这篇文章主要介绍了Java线程使用同步锁交替执行打印奇数偶数的方法。小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2019-01-01
  • Java jvm中Code Cache案例详解

    Java jvm中Code Cache案例详解

    这篇文章主要介绍了Java jvm中Code Cache案例详解,本篇文章通过简要的案例,讲解了该项技术的了解与使用,以下就是详细内容,需要的朋友可以参考下
    2021-08-08

最新评论