docker启动rabbitmq以及使用方式详解

 更新时间:2022年08月04日 11:34:00   作者:Maackia  
RabbitMQ是一个由erlang开发的消息队列,下面这篇文章主要给大家介绍了关于docker启动rabbitmq以及使用的相关资料,文中通过图文介绍的非常详细,需要的朋友可以参考下

搜索rabbitmq镜像

docker search rabbitmq:management

下载镜像

docker pull rabbitmq:management

启动容器

docker run -d --hostname localhost --name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:management

打印容器

docker logs rabbitmq

访问RabbitMQ Management

http://localhost:15672

账户密码默认:guest

编写生产者类

package com.xun.rabbitmqdemo.example;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    private final static String QUEUE_NAME = "hello";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();
        /**
         * 生成一个queue队列
         * 1、队列名称 QUEUE_NAME
         * 2、队列里面的消息是否持久化(默认消息存储在内存中)
         * 3、该队列是否只供一个Consumer消费 是否共享 设置为true可以多个消费者消费
         * 4、是否自动删除 最后一个消费者断开连接后 该队列是否自动删除
         * 5、其他参数
         */
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        String message = "Hello world!";
        /**
         * 发送一个消息
         * 1、发送到哪个exchange交换机
         * 2、路由的key
         * 3、其他的参数信息
         * 4、消息体
         */
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
        System.out.println(" [x] Sent '"+message+"'");

        channel.close();
        connection.close();
    }
}

运行该方法,可以看到控制台的打印

name=hello的队列收到Message

消费者

package com.xun.rabbitmqdemo.example;

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Receiver {
    private final static String QUEUE_NAME = "hello";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setConnectionTimeout(600000);//milliseconds
        factory.setRequestedHeartbeat(60);//seconds
        factory.setHandshakeTimeout(6000);//milliseconds
        factory.setRequestedChannelMax(5);
        factory.setNetworkRecoveryInterval(500);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        System.out.println("Waiting for messages. ");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            }
        };
        channel.basicConsume(QUEUE_NAME,true,consumer);
    }
}

工作队列

RabbitMqUtils工具类

package com.xun.rabbitmqdemo.utils;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitMqUtils {
    public static Channel getChannel() throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        return channel;
    }
}

启动2个工作线程

package com.xun.rabbitmqdemo.workQueue;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xun.rabbitmqdemo.utils.RabbitMqUtils;

public class Work01 {
    private static final String QUEUE_NAME = "hello";
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback = (consumerTag,delivery)->{
            String receivedMessage = new String(delivery.getBody());
            System.out.println("接收消息:"+receivedMessage);
        };
        CancelCallback cancelCallback = (consumerTag)->{
            System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
        };
        System.out.println("C1 消费者启动等待消费....");
        /**
         * 消费者消费消息
         * 1、消费哪个队列
         * 2、消费成功后是否自动应答
         * 3、消费的接口回调
         * 4、消费未成功的接口回调
         */
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}
package com.xun.rabbitmqdemo.workQueue;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xun.rabbitmqdemo.utils.RabbitMqUtils;

public class Work02 {
    private static final String QUEUE_NAME = "hello";
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback = (consumerTag,delivery)->{
            String receivedMessage = new String(delivery.getBody());
            System.out.println("接收消息:"+receivedMessage);
        };
        CancelCallback cancelCallback = (consumerTag)->{
            System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
        };
        System.out.println("C2 消费者启动等待消费....");
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}

启动工作线程

启动发送线程

package com.xun.rabbitmqdemo.workQueue;

import com.rabbitmq.client.Channel;
import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
import java.util.Scanner;

public class Task01 {
    private static final String QUEUE_NAME = "hello";
    public static void main(String[] args) throws Exception{
        try(Channel channel= RabbitMqUtils.getChannel();){
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            //从控制台接收消息
            Scanner scanner = new Scanner(System.in);
            while(scanner.hasNext()){
                String message = scanner.next();
                channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
                System.out.println("发送消息完成:"+message);
            }
        }
    }
}

启动发送线程,此时发送线程等待键盘输入

发送4个消息

可以看到2个工作线程按照顺序分别接收message。

消息应答机制

rabbitmq将message发送给消费者后,就会将该消息标记为删除。

但消费者在处理message过程中宕机,会导致消息的丢失。

因此需要设置手动应答。

生产者

import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
import java.util.Scanner;

public class Task02 {
    private static final String TASK_QUEUE_NAME = "ack_queue";
    public static void main(String[] args) throws Exception{
        try(Channel channel = RabbitMqUtils.getChannel()){
            channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);
            Scanner scanner = new Scanner(System.in);
            System.out.println("请输入信息");
            while(scanner.hasNext()){
                String message = scanner.nextLine();
                channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes());
                System.out.println("生产者task02发出消息"+ message);
            }
        }
    }
}

消费者

package com.xun.rabbitmqdemo.workQueue;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
import com.xun.rabbitmqdemo.utils.SleepUtils;

public class Work03 {
    private static final String ACK_QUEUE_NAME = "ack_queue";
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("Work03 等待接收消息处理时间较短");
        DeliverCallback deliverCallback = (consumerTag,delivery)->{
            String message = new String(delivery.getBody());
            SleepUtils.sleep(1);
            System.out.println("接收到消息:"+message);
            /**
             * 1、消息的标记tag
             * 2、是否批量应答
             */
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        };
        CancelCallback cancelCallback = (consumerTag)->{
            System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
        };
        //采用手动应答
        boolean autoAck = false;
        channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
    }
}
package com.xun.rabbitmqdemo.workQueue;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
import com.xun.rabbitmqdemo.utils.SleepUtils;

public class Work04 {
    private static final String ACK_QUEUE_NAME = "ack_queue";
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("Work04 等待接收消息处理时间较长");
        DeliverCallback deliverCallback = (consumerTag,delivery)->{
            String message = new String(delivery.getBody());
            SleepUtils.sleep(30);
            System.out.println("接收到消息:"+message);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        };
        CancelCallback cancelCallback = (consumerTag)->{
            System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
        };
        //采用手动应答
        boolean autoAck = false;
        channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
    }
}

工具类SleepUtils

package com.xun.rabbitmqdemo.utils;
public class SleepUtils {
    public static void sleep(int second){
        try{
            Thread.sleep(1000*second);
        }catch (InterruptedException _ignored){
            Thread.currentThread().interrupt();
        }
    }
}

模拟

work04等待30s后发出ack

在work04处理message时手动停止线程,可以看到message:dd被rabbitmq交给了work03

不公平分发

上面的轮询分发,生产者依次向消费者按顺序发送消息,但当消费者A处理速度很快,而消费者B处理速度很慢时,这种分发策略显然是不合理的。
不公平分发:

int prefetchCount = 1;
channel.basicQos(prefetchCount);

通过此配置,当消费者未处理完当前消息,rabbitmq会优先将该message分发给空闲消费者。

总结 

到此这篇关于docker启动rabbitmq以及使用的文章就介绍到这了,更多相关docker启动rabbitmq及使用内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Docker安装部署Mysql8的过程(以作数据持久化)

    Docker安装部署Mysql8的过程(以作数据持久化)

    这篇文章主要介绍了Docker安装部署Mysql8(以作数据持久化),首先创建容器并进行持久化处理,接着配置远程连接并尝试,本文结合实例代码给大家介绍的非常详细,需要的朋友可以参考下
    2022-09-09
  • centos修改docker网络配置方法分享

    centos修改docker网络配置方法分享

    本文给大家分享的是centos修改docker网络配置的方法,非常的实用,有需要的小伙伴可以参考下
    2017-03-03
  • 在Docker中的ubuntu中安装Python3和Pip的问题

    在Docker中的ubuntu中安装Python3和Pip的问题

    这篇文章主要介绍了在Docker中的ubuntu中安装Python3和Pip的问题,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-02-02
  • Docker容器输入汉字触发自动补全问题

    Docker容器输入汉字触发自动补全问题

    Linux系统中,当终端出现中文乱码通常是由于缺少中文字符集和字体,安装中文字体和设置合适的字符集zh_CN.utf8,可以有效解决这一问题,首先,通过查看系统支持的字符集了解是否支持中文,然后安装相应字符集,其次,查看并设置当前系统字符集
    2024-10-10
  • 使用Docker部署SpringBoot项目的实现方法

    使用Docker部署SpringBoot项目的实现方法

    这篇文章主要介绍了使用Docker部署SpringBoot项目的实现方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-01-01
  • 详解Docker下使用Elasticsearch可视化Kibana

    详解Docker下使用Elasticsearch可视化Kibana

    本篇文章主要介绍了详解Docker下使用Elasticsearch可视化Kibana,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2018-04-04
  • 如何使用docker搭建ELK分布式日志同步方案

    如何使用docker搭建ELK分布式日志同步方案

    ELK作为业界最常用日志同步方案,我们今天尝试一下使用docker快速搭建一套ELK方案,ELK使用国内加速源拉取的镜像比较旧,有条件的朋友可以拉取官网的源,感兴趣的朋友跟随小编一起看看吧
    2024-07-07
  • 使用Docker搭建minio的详细图文教程

    使用Docker搭建minio的详细图文教程

    本文介绍了Docker配置镜像源的方法,以及如何在Docker中拉取镜像和创建运行容器,详细说明了使用MinIO创建Bucket、设置AccessKey、安装和配置Cpolar以及SpringBoot集成MinIO的步骤,需要的朋友可以参考下
    2024-10-10
  • docker CPU限制的实现

    docker CPU限制的实现

    这篇文章主要介绍了docker CPU限制的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-11-11
  • 局域网内部署 Docker Registry(推荐)

    局域网内部署 Docker Registry(推荐)

    本文将从创建单机的 Docker Registry 开始,逐步完成局域网内可用的 Docker Registry 的创建,并重点解释如何使用 IP 地址访问 Registry 的方法
    2017-05-05

最新评论