SpringBoot集成RabbitMQ的方法(死信队列)

 更新时间:2019年05月01日 11:21:27   作者:小揪揪  
这篇文章主要介绍了SpringBoot集成RabbitMQ的方法(死信队列),文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

介绍

死信队列:没有被及时消费的消息存放的队列,消息没有被及时消费有以下几点原因:
1.有消息被拒绝(basic.reject/ basic.nack)并且requeue=false
2.队列达到最大长度
3.消息TTL过期

场景

1.小时进入初始队列,等待30分钟后进入5分钟队列
2.消息等待5分钟后进入执行队列
3.执行失败后重新回到5分钟队列
4.失败5次后,消息进入2小时队列
5.消息等待2小时进入执行队列
6.失败5次后,将消息丢弃或做其他处理

使用

安装MQ

使用docker方式安装,选择带mangement的版本

docker pull rabbitmq:management
docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management

访问 localhost: 15672,默认账号密码guest/guest

项目配置

(1)创建springboot项目
(2)在application.properties配置文件中配置mq连接信息

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

(3)队列配置

package com.df.ps.mq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.factory.annotation.Autowire;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class MqConfig {

  //time
  @Value("${spring.df.buffered.min:120}")
  private int springdfBufferedTime;

  @Value("${spring.df.high-buffered.min:5}")
  private int springdfHighBufferedTime;

  @Value("${spring.df.low-buffered.min:120}")
  private int springdfLowBufferedTime;

  // 30min Buffered Queue
  @Value("${spring.df.queue:spring-df-buffered-queue}")
  private String springdfBufferedQueue;

  @Value("${spring.df.topic:spring-df-buffered-topic}")
  private String springdfBufferedTopic;

  @Value("${spring.df.route:spring-df-buffered-route}")
  private String springdfBufferedRouteKey;

  // 5M Buffered Queue
  @Value("${spring.df.high-buffered.queue:spring-df-high-buffered-queue}")
  private String springdfHighBufferedQueue;

  @Value("${spring.df.high-buffered.topic:spring-df-high-buffered-topic}")
  private String springdfHighBufferedTopic;

  @Value("${spring.df.high-buffered.route:spring-df-high-buffered-route}")
  private String springdfHighBufferedRouteKey;

  // High Queue
  @Value("${spring.df.high.queue:spring-df-high-queue}")
  private String springdfHighQueue;

  @Value("${spring.df.high.topic:spring-df-high-topic}")
  private String springdfHighTopic;

  @Value("${spring.df.high.route:spring-df-high-route}")
  private String springdfHighRouteKey;

  // 2H Low Buffered Queue
  @Value("${spring.df.low-buffered.queue:spring-df-low-buffered-queue}")
  private String springdfLowBufferedQueue;

  @Value("${spring.df.low-buffered.topic:spring-df-low-buffered-topic}")
  private String springdfLowBufferedTopic;

  @Value("${spring.df.low-buffered.route:spring-df-low-buffered-route}")
  private String springdfLowBufferedRouteKey;

  // Low Queue
  @Value("${spring.df.low.queue:spring-df-low-queue}")
  private String springdfLowQueue;

  @Value("${spring.df.low.topic:spring-df-low-topic}")
  private String springdfLowTopic;

  @Value("${spring.df.low.route:spring-df-low-route}")
  private String springdfLowRouteKey;


  @Bean(autowire = Autowire.BY_NAME, value = "springdfBufferedQueue")
  Queue springdfBufferedQueue() {
    int bufferedTime = 1000 * 60 * springdfBufferedTime;
    return createBufferedQueue(springdfBufferedQueue, springdfHighBufferedTopic, springdfHighBufferedRouteKey, bufferedTime);
  }

  @Bean(autowire = Autowire.BY_NAME, value = "springdfHighBufferedQueue")
  Queue springdfHighBufferedQueue() {
    int highBufferedTime = 1000 * 60 * springdfHighBufferedTime;
    return createBufferedQueue(springdfHighBufferedQueue, springdfHighTopic, springdfHighRouteKey, highBufferedTime);
  }

  @Bean(autowire = Autowire.BY_NAME, value = "springdfHighQueue")
  Queue springdfHighQueue() {
    return new Queue(springdfHighQueue, true);
  }

  @Bean(autowire = Autowire.BY_NAME, value = "springdfLowBufferedQueue")
  Queue springdfLowBufferedQueue() {
    int lowBufferedTime = 1000 * 60 * springdfLowBufferedTime;
    return createBufferedQueue(springdfLowBufferedQueue, springdfLowTopic, springdfLowRouteKey, lowBufferedTime);
  }

  @Bean(autowire = Autowire.BY_NAME, value = "springdfLowQueue")
  Queue springdfLowQueue() {
    return new Queue(springdfLowQueue, true);
  }


  @Bean(autowire = Autowire.BY_NAME, value = "springdfBufferedTopic")
  TopicExchange springdfBufferedTopic() {
    return new TopicExchange(springdfBufferedTopic);
  }

  @Bean
  Binding springBuffereddf(Queue springdfBufferedQueue, TopicExchange springdfBufferedTopic) {
    return BindingBuilder.bind(springdfBufferedQueue).to(springdfBufferedTopic).with(springdfBufferedRouteKey);
  }


  @Bean(autowire = Autowire.BY_NAME, value = "springdfHighBufferedTopic")
  TopicExchange springdfHighBufferedTopic() {
    return new TopicExchange(springdfHighBufferedTopic);
  }

  @Bean
  Binding springHighBuffereddf(Queue springdfHighBufferedQueue, TopicExchange springdfHighBufferedTopic) {
    return BindingBuilder.bind(springdfHighBufferedQueue).to(springdfHighBufferedTopic).with(springdfHighBufferedRouteKey);
  }

  @Bean(autowire = Autowire.BY_NAME, value = "springdfHighTopic")
  TopicExchange springdfHighTopic() {
    return new TopicExchange(springdfHighTopic);
  }

  @Bean
  Binding springHighdf(Queue springdfHighQueue, TopicExchange springdfHighTopic) {
    return BindingBuilder.bind(springdfHighQueue).to(springdfHighTopic).with(springdfHighRouteKey);
  }

  @Bean(autowire = Autowire.BY_NAME, value = "springdfLowBufferedTopic")
  TopicExchange springdfLowBufferedTopic() {
    return new TopicExchange(springdfLowBufferedTopic);
  }

  @Bean
  Binding springLowBuffereddf(Queue springdfLowBufferedQueue, TopicExchange springdfLowBufferedTopic) {
    return BindingBuilder.bind(springdfLowBufferedQueue).to(springdfLowBufferedTopic).with(springdfLowBufferedRouteKey);
  }

  @Bean(autowire = Autowire.BY_NAME, value = "springdfLowTopic")
  TopicExchange springdfLowTopic() {
    return new TopicExchange(springdfLowTopic);
  }

  @Bean
  Binding springLowdf(Queue springdfLowQueue, TopicExchange springdfLowTopic) {
    return BindingBuilder.bind(springdfLowQueue).to(springdfLowTopic).with(springdfLowRouteKey);
  }


  @Bean
  SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
                       MessageListenerAdapter listenerAdapter) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames(springdfHighQueue, springdfLowQueue);
    container.setMessageListener(listenerAdapter);
    return container;
  }

  @Bean
  MessageListenerAdapter listenerAdapter(IntegrationReceiver receiver) {


    MessageListenerAdapter adapter = new MessageListenerAdapter(receiver);
    adapter.setDefaultListenerMethod("receive");
    Map<String, String> queueOrTagToMethodName = new HashMap<>();
    queueOrTagToMethodName.put(springdfHighQueue, "springdfHighReceive");
    queueOrTagToMethodName.put(springdfLowQueue, "springdfLowReceive");
    adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
    return adapter;

  }


  private Queue createBufferedQueue(String queueName, String topic, String routeKey, int bufferedTime) {
    Map<String, Object> args = new HashMap<>();
    args.put("x-dead-letter-exchange", topic);
    args.put("x-dead-letter-routing-key", routeKey);
    args.put("x-message-ttl", bufferedTime);
    // 是否持久化
    boolean durable = true;
    // 仅创建者可以使用的私有队列,断开后自动删除
    boolean exclusive = false;
    // 当所有消费客户端连接断开后,是否自动删除队列
    boolean autoDelete = false;

    return new Queue(queueName, durable, exclusive, autoDelete, args);
  }
}

消费者配置

package com.df.ps.mq;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;

import java.util.Map;

public class MqReceiver {

  private static Logger logger = LoggerFactory.getLogger(MqReceiver.class);

  @Value("${high-retry:5}")
  private int highRetry;

  @Value("${low-retry:5}")
  private int lowRetry;

  @Value("${spring.df.high-buffered.topic:spring-df-high-buffered-topic}")
  private String springdfHighBufferedTopic;

  @Value("${spring.df.high-buffered.route:spring-df-high-buffered-route}")
  private String springdfHighBufferedRouteKey;

  @Value("${spring.df.low-buffered.topic:spring-df-low-buffered-topic}")
  private String springdfLowBufferedTopic;

  @Value("${spring.df.low-buffered.route:spring-df-low-buffered-route}")
  private String springdfLowBufferedRouteKey;

  private final RabbitTemplate rabbitTemplate;
  @Autowired
  public MqReceiver(RabbitTemplate rabbitTemplate) {
    this.rabbitTemplate = rabbitTemplate;
  }

  public void receive(Object message) {
    if (logger.isInfoEnabled()) {
      logger.info("default receiver: " + message);
    }
  }

  /**
   * 消息从初始队列进入5分钟的高速缓冲队列
   * @param message
   */
  public void highReceiver(Object message){
    ObjectMapper mapper = new ObjectMapper();
    Map msg = mapper.convertValue(message, Map.class);

    try{
      logger.info("这里做消息处理...");
    }catch (Exception e){
      int times = msg.get("times") == null ? 0 : (int) msg.get("times");
      if (times < highRetry) {
        msg.put("times", times + 1);
        rabbitTemplate.convertAndSend(springdfHighBufferedTopic,springdfHighBufferedRouteKey,message);
      } else {
        msg.put("times", 0);
        rabbitTemplate.convertAndSend(springdfLowBufferedTopic,springdfLowBufferedRouteKey,message);
      }
    }
  }

  /**
   * 消息从5分钟缓冲队列进入2小时缓冲队列
   * @param message
   */
  public void lowReceiver(Object message){
    ObjectMapper mapper = new ObjectMapper();
    Map msg = mapper.convertValue(message, Map.class);
    
    try {
      logger.info("这里做消息处理...");
    }catch (Exception e){
      int times = msg.get("times") == null ? 0 : (int) msg.get("times");
      if (times < lowRetry) {
        rabbitTemplate.convertAndSend(springdfLowBufferedTopic,springdfLowBufferedRouteKey,message);
      }else{
        logger.info("消息无法被消费...");
      }
    } 
  }
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

相关文章

  • SpringBoot实现给属性赋值的两种方式

    SpringBoot实现给属性赋值的两种方式

    在Spring Boot中,配置文件是用来设置应用程序的各种参数和操作模式的重要部分,Spring Boot支持两种主要类型的配置文件:properties文件和YAML 文件,这两种文件都可以用来定义相同的配置,接下来由小编给大家详细的介绍一下这两种方式
    2024-07-07
  • java  基础知识之IO总结

    java 基础知识之IO总结

    这篇文章主要介绍了java 基础知识之IO总结的相关资料,Java中的I/O分为两种类型,一种是顺序读取,一种是随机读取,需要的朋友可以参考下
    2017-03-03
  • 详解SpringBoot如何删除引用jar包中的无用bean

    详解SpringBoot如何删除引用jar包中的无用bean

    为了赶速度和直接将之前多模块的maven项目中的部分模块,直接以jar包的形式引入到新项目中了,虽然省去了不少开发时间,导致项目臃肿,启动很慢。本文将用@ComponentScan注解去实现让项目只加载自己需要的bean,需要的可以参考一下
    2022-06-06
  • SpringBoot2.0 整合 SpringSecurity 框架实现用户权限安全管理方法

    SpringBoot2.0 整合 SpringSecurity 框架实现用户权限安全管理方法

    Spring Security是一个能够为基于Spring的企业应用系统提供声明式的安全访问控制解决方案的安全框架。这篇文章主要介绍了SpringBoot2.0 整合 SpringSecurity 框架,实现用户权限安全管理 ,需要的朋友可以参考下
    2019-07-07
  • Java垃圾回收之复制算法详解

    Java垃圾回收之复制算法详解

    今天小编就为大家分享一篇关于Java垃圾回收之复制算法详解,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧
    2018-10-10
  • SpringBoot开发案例之配置Druid数据库连接池的示例

    SpringBoot开发案例之配置Druid数据库连接池的示例

    本篇文章主要介绍了SpringBoot开发案例之配置Druid数据库连接池的示例,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2018-03-03
  • Java获取当前时间戳案例详解

    Java获取当前时间戳案例详解

    这篇文章主要介绍了Java获取当前时间戳案例详解,本篇文章通过简要的案例,讲解了该项技术的了解与使用,以下就是详细内容,需要的朋友可以参考下
    2021-08-08
  • 关于ElasticSearch的常用增删改查DSL和代码

    关于ElasticSearch的常用增删改查DSL和代码

    这篇文章主要介绍了关于ElasticSearch的常用增删改查DSL和代码,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-04-04
  • 一款不可错过的Java应用诊断利器Arthas

    一款不可错过的Java应用诊断利器Arthas

    Arthas是一款由阿里巴巴开源的Java应用诊断利器,它可以帮助开发人员在运行时对Java应用进行调试和诊断,解决线上问题,本文将简单的描述一下该工具的用法和常用命令,以勾起大家对此工具应用的兴趣
    2023-06-06
  • Java实现FTP文件的上传和下载功能的实例代码

    Java实现FTP文件的上传和下载功能的实例代码

    FTP 是File Transfer Protocol(文件传输协议)的英文简称,而中文简称为“文传协议”。接下来通过本文给大家实例讲解Java实现FTP文件的上传和下载功能,需要的的朋友一起看看吧
    2016-11-11

最新评论