RocketMQ消息队列实现随机消息发送当做七夕礼物

 更新时间:2022年08月22日 14:57:13   作者:奔跑的毛球  
这篇文章主要为大家介绍了RocketMQ消息队列实现随机消息发送当做七夕礼物,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

正文

都在过情人节,前端的小哥哥们给女朋友画个页面,美美的,写个chrome插件,好看的,俺们后端程序员咋办。

我给媳妇写首诗,哈哈

我决定,把想对媳妇说的,今天发送到一个MQ里边,然后在七夕当天,打开消费者,将这一段话给俺媳妇看。你看,这就是我好久前对你说的话,这就是我们后端程序员的浪漫。当然也可以多发送几个,到时候跟根据topic控制到底发什么,哈哈。

这里首先得用顺序消息,当然,消息过期时间得设置的长一点。

1 下载并启动RocketMQ

点击下载,这是个windows版本的。

下载完成解压后长这样:

然后后还需要配置环境变量

这个时候就可以进入到RocketMQ的bin目录启动MQ了

1.1 首先启动name server

start mqnamesrv.cmd

1.2 然后启动Broker

start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

这个时候就启动成功了

2 生产者

需要注意的是,消息必须是顺序消息,不然发送的消息顺序就乱了。一首情诗顺序乱了,读不下去,岂不是很尴尬。

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import java.util.List;
public class RocketMQOrderProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        String[] tags = new String[]{"TagA", "TagC", "TagD"};
        //读取文件
        List<String> messages = getMessages();
        for (int i = 0; i < messages.size(); i++) {
            String body = messages.get(i);
            Message msg = new Message("topic_luke", tags[i % tags.length], "KEY" + i, body.getBytes());
            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    Long id = ((Integer)arg).longValue();
                    long index = id % mqs.size();
                    return mqs.get((int) index);
                }
            }, i);
        }
        producer.shutdown();
    }
    static List<String> getMessages() throws IOException {
        String temp = null;
        File f = new File("E:\Code\online-taxi-three\demo\luke.txt");
        InputStreamReader read = new InputStreamReader(new FileInputStream(f));
        ArrayList readList = new ArrayList();
        BufferedReader reader = new BufferedReader(read);
        while ((temp = reader.readLine()) != null && !"".equals(temp)) {
            readList.add(temp);
        }
        return readList;
    }
}

3 消费者

这里需要注意的是setConsumeThreadMaxsetConsumeThreadMin都需要设置成1,单线程取消息这样就可以通过sleep控制消息的显示速度,不然并发取消息就很快显示完了。不够唯美。

import lombok.SneakyThrows;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class RockerMQConsumer {
    public static void main(String[] args) throws Exception {
        //实例化消息消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_luke");
        //指定nameserver地址
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setConsumeThreadMax(1);
        consumer.setConsumeThreadMin(1);
        consumer.setPullBatchSize(1);
        //订阅topic
        consumer.subscribe("topic_luke","*");
        // 注册回调实现类来处理从broker拉取回来的消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @SneakyThrows
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(new String(msg.getBody()));
                    TimeUnit.SECONDS.sleep(3);
                }
                // 标记该消息已经被成功消费
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动消费者实例
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

发送的内容在这里自由编写哈,路径和文件名能对上就行,谢谢观看,最近突发奇想,把技术编成故事讲出来,会不会比较受欢迎呢。

以上就是RocketMQ消息队列实现随机消息发送当做七夕礼物的详细内容,更多关于RocketMQ消息队列随机消息的资料请关注脚本之家其它相关文章!

相关文章

  • springboot如何获取application.yml里值的方法

    springboot如何获取application.yml里值的方法

    这篇文章主要介绍了springboot如何获取application.yml里的值,文章围绕主题相关自资料展开详细的内容介绍,具有一定的参考价值,需要的小伙伴可以参考一下
    2022-04-04
  • 详解Java使用JMH进行基准性能测试

    详解Java使用JMH进行基准性能测试

    本文主要介绍了Java使用JMH进行基准性能测试,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-11-11
  • 教你通过B+Tree平衡多叉树理解InnoDB引擎的聚集和非聚集索引

    教你通过B+Tree平衡多叉树理解InnoDB引擎的聚集和非聚集索引

    大家都知道B+Tree是从二叉树演化而来,在这之前我们来先了解二叉树、平衡二叉树、平衡多叉树,这篇文章主要介绍了通过B+Tree平衡多叉树理解InnoDB引擎的聚集和非聚集索引,需要的朋友可以参考下
    2022-01-01
  • Java数组转换为List的四种方式

    Java数组转换为List的四种方式

    这篇文章主要介绍了Java开发技巧数组转List的四种方式总结,每种方式结合实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2023-09-09
  • Java8默认方法Default Methods原理及实例详解

    Java8默认方法Default Methods原理及实例详解

    这篇文章主要介绍了Java8默认方法Default Methods原理及实例详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-01-01
  • Java实现的连续奇数(n+2*x)是合数的算法题暴力算法

    Java实现的连续奇数(n+2*x)是合数的算法题暴力算法

    这篇文章主要介绍了Java实现的连续奇数(n+2*x)是合数的算法题暴力算法,本文包含运算结果和实现代码,需要的朋友可以参考下
    2014-09-09
  • java实现简单的英文文本单词翻译器功能示例

    java实现简单的英文文本单词翻译器功能示例

    这篇文章主要介绍了java实现简单的英文文本单词翻译器功能,涉及java文件读取、字符串分割、遍历、判断等相关操作技巧,需要的朋友可以参考下
    2017-10-10
  • java字符串日期类Date和Calendar相互转化及相关常用方法

    java字符串日期类Date和Calendar相互转化及相关常用方法

    Java语言的Calendar(日历),Date(日期),和DateFormat(日期格式)组成了Java标准的一个基本但是非常重要的部分,下面这篇文章主要给大家介绍了关于java字符串日期类Date和Calendar相互转化及相关常用方法的相关资料,需要的朋友可以参考下
    2023-12-12
  • SpringBoot中的multipartResolver上传文件配置

    SpringBoot中的multipartResolver上传文件配置

    这篇文章主要介绍了SpringBoot中的multipartResolver上传文件配置,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-10-10
  • vue+springboot项目上传部署tomcat的方法实现

    vue+springboot项目上传部署tomcat的方法实现

    本文主要介绍了vue+springboot项目上传部署tomcat的方法实现,包括环境准备、配置调整以及部署步骤,文中通过图文及示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2024-01-01

最新评论