SpringBoot基于Disruptor实现高效的消息队列 

 更新时间:2024年02月22日 09:04:48   作者:wx59bcc77095d22  
Disruptor是一个开源的Java框架,它被设计用于在生产者-消费者问题上获得尽量高的吞吐量和尽量低的延迟,本文主要介绍了SpringBoot基于Disruptor实现高效的消息队列 ,具有一定的参考价值,感兴趣的可以了解一下

一、前言

Disruptor是一个开源的Java框架,它被设计用于在生产者-消费者问题上获得尽量高的吞吐量和尽量低的延迟,从功能上来看Disruptor是实现了队列的功能,而且是一个有界队列。那么它的应用场景自然就是“生产者-消费者”模型的应用场合了。Disruptor 是在内存中以队列的方式去实现的,而且是无锁的。这也是 Disruptor 为什么高效的原因。

二、SpringBoot整合Disruptor

1.添加依赖

<!--Disruptor-->
<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.4.4</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>

2.创建消息体实体

package com.example.aopdemo.disruptor;

import lombok.Data;

/**
 * @author qx
 * @date 2024/2/21
 * @des 消息体
 */
@Data
public class MessageModel {

    private String message;

}

3.创建事件工厂类

package com.example.aopdemo.disruptor;

import com.lmax.disruptor.EventFactory;

/**
 * @author qx
 * @date 2024/2/21
 * @des 事件工厂类
 */
public class MessageEventFactory implements EventFactory<MessageModel> {
    @Override
    public MessageModel newInstance() {
        return new MessageModel();
    }
}

4.创建消费者

package com.example.aopdemo.disruptor;

import com.lmax.disruptor.EventHandler;
import lombok.extern.slf4j.Slf4j;

/**
 * @author qx
 * @date 2024/2/21
 * @des 消息消费者
 */
@Slf4j
public class MessageEventHandler implements EventHandler<MessageModel> {
    @Override
    public void onEvent(MessageModel messageModel, long sequence, boolean endOfBatch) {
        log.info("消费者获取消息:{}", messageModel);
    }
}

5.构造BeanManager

package com.example.aopdemo.disruptor;

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

/**
 * @author qx
 * @date 2024/2/21
 * @des
 */
@Component
public class BeanManager implements ApplicationContextAware {

    private static ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        BeanManager.applicationContext = applicationContext;
    }

    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }

    public static Object getBean(String name) {
        return applicationContext.getBean(name);
    }

    public static <T> T getBean(Class<T> clazz) {
        return applicationContext.getBean(clazz);
    }
}

6.创建消息管理器

package com.example.aopdemo.disruptor;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author qx
 * @date 2024/2/21
 * @des 事件管理器
 */
@Configuration
public class MessageManager {

    @Bean("messageModel")
    public RingBuffer<MessageModel> messageModelRingBuffer() {
        // 定义线程池
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        // 指定事件工厂
        MessageEventFactory factory = new MessageEventFactory();

        // 指定ringbuffer字节大小,必须为2的N次方(能将求模运算转为位运算提高效率),否则将影响效率
        int bufferSize = 1024 * 256;

        //单线程模式,获取额外的性能
        Disruptor<MessageModel> disruptor = new Disruptor<>(factory, bufferSize, executorService, ProducerType.SINGLE, new BlockingWaitStrategy());

        //设置事件业务处理器---消费者
        disruptor.handleEventsWith(new MessageEventHandler());

        //启动disruptor线程
        disruptor.start();

        //获取ringbuffer环,用于接取生产者生产的事件
        return disruptor.getRingBuffer();
    }

}

7.创建生产者

package com.example.aopdemo.disruptor;

import com.lmax.disruptor.RingBuffer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * @author qx
 * @date 2024/2/21
 * @des 生产者
 */
@Service
@Slf4j
public class DisruptorService {

    @Autowired
    private RingBuffer<MessageModel> messageModelRingBuffer;

    public void sayMessage(String message) {
        // 获取下一个Event槽的下标
        long sequence = messageModelRingBuffer.next();
        try {
            // 填充数据
            MessageModel messageModel = messageModelRingBuffer.get(sequence);
            messageModel.setMessage(message);
            log.info("往消息队列中添加消息:{}", messageModel);
        } catch (Exception e) {
            log.error("failed to add event to messageModelRingBuffer for : e = {},{}", e, e.getMessage());
        } finally {
            //发布Event,激活观察者去消费,将sequence传递给改消费者
            //注意最后的publish方法必须放在finally中以确保必须得到调用;如果某个请求的sequence未被提交将会堵塞后续的发布操作或者其他的producer
            messageModelRingBuffer.publish(sequence);
        }

    }

}

8.创建测试类

package com.example.aopdemo.controller;

import com.example.aopdemo.disruptor.DisruptorService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author qx
 * @date 2024/2/21
 * @des Disruptor测试
 */
@RestController
public class DisruptorController {

    @Autowired
    private DisruptorService disruptorService;

    @GetMapping("/disruptor")
    public String disruptorTest(String message) {
        disruptorService.sayMessage(message);
        return "发送消息成功";
    }
}

9.测试

启动程序,在浏览器访问请求连接进行测试。

我们在控制台上可以获取到消息的发送和接收信息。

2024-02-21 15:22:16.059  INFO 6788 --- [nio-8080-exec-1] c.e.aopdemo.disruptor.DisruptorService   : 往消息队列中添加消息:MessageModel(message=hello)
2024-02-21 15:22:16.060  INFO 6788 --- [pool-1-thread-1] c.e.a.disruptor.MessageEventHandler      : 消费者获取消息:MessageModel(message=hello)

到此这篇关于SpringBoot基于Disruptor实现高效的消息队列 的文章就介绍到这了,更多相关SpringBoot Disruptor消息队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • springboot中如何配置LocalDateTime JSON返回时间戳

    springboot中如何配置LocalDateTime JSON返回时间戳

    这篇文章主要介绍了springboot中如何配置LocalDateTime JSON返回时间戳问题。具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-06-06
  • 利用Java读取Word表格中文本和图片的方法实例

    利用Java读取Word表格中文本和图片的方法实例

    这篇文章主要给大家介绍了关于如何利用Java读取Word表格中文本和图片的相关资料,主要利用的是free spire.doc.jar 包,文中通过图文介绍的非常详细,需要的朋友可以参考下
    2021-07-07
  • Tomcat处理请求的线程模型详解

    Tomcat处理请求的线程模型详解

    这篇文章主要为大家详细介绍了Tomcat处理请求的线程模型,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下,希望能够给你带来帮助
    2022-03-03
  • Spring整合MyBatis图示过程解析

    Spring整合MyBatis图示过程解析

    这篇文章主要介绍了Spring整合MyBatis图示过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-11-11
  • Java基础详解之集合框架工具Collections

    Java基础详解之集合框架工具Collections

    这篇文章主要介绍了Java基础详解之集合框架工具Collections,文中有非常详细的代码示例,对正在学习java的小伙伴们有很好地帮助,需要的朋友可以参考下
    2021-04-04
  • 解决SpringBoot web项目启动后立即关闭的问题

    解决SpringBoot web项目启动后立即关闭的问题

    这篇文章主要介绍了解决SpringBoot web项目启动后立即关闭的问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-08-08
  • springboot3.0整合mybatis-flex实现逆向工程的示例代码

    springboot3.0整合mybatis-flex实现逆向工程的示例代码

    逆向工程先创建数据库表,由框架负责根据数据库表,自动生成mybatis所要执行的代码,本文就来介绍一下springboot mybatis-flex逆向工程,感兴趣的可以了解一下
    2024-06-06
  • 面试官:详细谈谈Java对象的4种引用方式

    面试官:详细谈谈Java对象的4种引用方式

    这篇文章主要给大家介绍了java面试官常会问到的,关于Java对象的4种引用方式的相关资料,文中通过示例代码介绍的非常详细,对大家学习或者使用Java具有一定的参考学习价值,需要的朋友们下面来一起学习学习吧
    2019-05-05
  • JAVA 根据设置的概率生成随机数的方法

    JAVA 根据设置的概率生成随机数的方法

    本篇文章主要介绍了JAVA 根据设置的概率生成随机数的方法,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-08-08
  • java中Map和List初始化的N种方法总结

    java中Map和List初始化的N种方法总结

    这篇文章主要介绍了java中Map和List初始化的N种方法总结,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-03-03

最新评论