Java中RabbitMQ队列实现RPC详解

 更新时间:2023年08月29日 08:29:27   作者:轻尘×  
这篇文章主要介绍了Java中RabbitMQ队列实现RPC详解,在本教程中,我们将使用RabbitMQ构建一个RPC系统:一个客户端和一个RPC服务器,我们将创建一个返回斐波那契数字的模拟RPC服务,,需要的朋友可以参考下

RabbitMQ实现RPC

如果我们需要在远程计算机上运行一个函数并等待结果,这种模式通常被称为远程过程调用或RPC。

在本教程中,我们将使用RabbitMQ构建一个RPC系统:

  1. 一个客户端和一个RPC服务器。
  2. 我们将创建一个返回斐波那契数字的模拟RPC服务。

整个过程示意图如下:

这里写图片描述

客户端将请求发送至rpc_queue(我们定义的消息队列),然后等待响应;服务端获取请求,并处理请求,然后将请求结果返回给队列,客户端得知请求被响应后获取结果。

在结果被响应之前,客户端是被阻塞的,主线程会等待RPC响应

如果每个RPC请求都创建一个回调队列。这是非常低效,我们创建一个单一的客户端回调队列。

这引发了一个新的问题,在该队列中收到回复时,不清楚回复属于哪个请求。这就需要用到 correlationId属性。

我们为没有请求设置唯一的correlationId值。

然后,当我们在回调队列中收到一条消息时,我们将获取这个值,将响应与请求的进行correlationId匹配。

如果我们一致就是我们需要的结果,否则就不是。

客户端代RPCClient

代码如下:

package com.adtec.rabbitmq;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;
public class RPCClient {
    private Connection connection;
    private Channel channel;
    private String requestQueueName = "rpc_queue";
    private String replyQueueName;
    public RPCClient() throws IOException, TimeoutException {
        //建立一个连接和一个通道,并为回调声明一个唯一的'回调'队列
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        connection = factory.newConnection();
        channel = connection.createChannel();
        //定义一个临时变量的接受队列名    
        replyQueueName = channel.queueDeclare().getQueue();
    }
    //发送RPC请求  
    public String call(String message) throws IOException, InterruptedException {
         //生成一个唯一的字符串作为回调队列的编号
        String corrId = UUID.randomUUID().toString();
        //发送请求消息,消息使用了两个属性:replyto和correlationId
        //服务端根据replyto返回结果,客户端根据correlationId判断响应是不是给自己的
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName)
                .build();
        //发布一个消息,requestQueueName路由规则
        channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
        //由于我们的消费者交易处理是在单独的线程中进行的,因此我们需要在响应到达之前暂停主线程。
        //这里我们创建的 容量为1的阻塞队列ArrayBlockingQueue,因为我们只需要等待一个响应。
        final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);
        // String basicConsume(String queue, boolean autoAck, Consumer callback)
        channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                    byte[] body) throws IOException {
                //检查它的correlationId是否是我们所要找的那个
                if (properties.getCorrelationId().equals(corrId)) {
                    //如果是,则响应BlockingQueue
                    response.offer(new String(body, "UTF-8"));
                }
            }
        });
        return response.take();
    }
    public void close() throws IOException {
        connection.close();
    }
    public static void main(String[] argv) {
        RPCClient fibonacciRpc = null;
        String response = null;
        try {
            fibonacciRpc = new RPCClient();
            System.out.println(" [x] Requesting fib(30)");
            response = fibonacciRpc.call("30");
            System.out.println(" [.] Got '" + response + "'");
        } catch (IOException | TimeoutException | InterruptedException e) {
            e.printStackTrace();
        } finally {
            if (fibonacciRpc != null) {
                try {
                    fibonacciRpc.close();
                } catch (IOException _ignore) {
                }
            }
        }
    }
}

上面的代码中用到了阻塞队列ArrayBlockingQueue

服务端代RPCServer

代码如下:

package rabbitmq;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RPCServer {
    private static final String RPC_QUEUE_NAME = "rpc_queue";
    //具体处理方法
    private static int fib(int n) {
        if (n == 0)
            return 0;
        if (n == 1)
            return 1;
        return fib(n - 1) + fib(n - 2);
    }
    public static void main(String[] argv) {
         //建立连接、通道,并声明队列 
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = null;
        try {
            connection = factory.newConnection();
            final Channel channel = connection.createChannel();
            channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
            channel.basicQos(1);
            System.out.println(" [x] Awaiting RPC requests");
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                        byte[] body) throws IOException {
                    AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
                            .correlationId(properties.getCorrelationId()).build();
                    String response = "";
                    try {
                        String message = new String(body, "UTF-8");
                        int n = Integer.parseInt(message);
                        System.out.println(" [.] fib(" + message + ")");
                        response += fib(n);
                    } catch (RuntimeException e) {
                        System.out.println(" [.] " + e.toString());
                    } finally {
                        // 返回处理结果队列
                        channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));
                        //  确认消息,已经收到后面参数 multiple:是否批量.true:将一次性确认所有小于envelope.getDeliveryTag()的消息。
                        channel.basicAck(envelope.getDeliveryTag(), false);
                        // RabbitMq consumer worker thread notifies the RPC
                        // server owner thread
                        synchronized (this) {
                            this.notify();
                        }
                    }
                }
            };
            //取消自动确认
            boolean autoAck = false ;
            channel.basicConsume(RPC_QUEUE_NAME, autoAck, consumer);
            // Wait and be prepared to consume the message from RPC client.
            while (true) {
                synchronized (consumer) {
                    try {
                        consumer.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        } finally {
            if (connection != null)
                try {
                    connection.close();
                } catch (IOException _ignore) {
                }
        }
    }
}

测试时先运行服务端,再运行客户端 为了方便观察结果,最好将客户端和服务端在不同workspace实现

客户端结果

这里写图片描述

服务端结果

这里写图片描述

到此这篇关于Java中RabbitMQ队列实现RPC详解的文章就介绍到这了,更多相关RabbitMQ实现RPC内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • MyBatis+MySQL 返回插入的主键ID的方法

    MyBatis+MySQL 返回插入的主键ID的方法

    本篇文章主要介绍了MyBatis+MySQL 返回插入的主键ID的方法,具有一定的参考价值,感兴趣的小伙伴们可以参考一下。
    2017-04-04
  • mybatis-plus处理blob字段的完整示例代码

    mybatis-plus处理blob字段的完整示例代码

    在Spring Boot项目中使用MyBatis-Plus处理longblob字段时,我们可以按照本文的步骤进行操作,假设 longblob 存储的是字符串数据,本文给大家提供完整示例代码,感兴趣的朋友参考下
    2023-12-12
  • Java线程之程安全与不安全代码示例

    Java线程之程安全与不安全代码示例

    这篇文章主要介绍了Java线程之程安全与不安全代码示例,还是比较不错的,这里分享给大家,供需要的朋友参考。
    2017-11-11
  • Skywalking改成适配阿里云等带Http Basic的Elasticsearch服务

    Skywalking改成适配阿里云等带Http Basic的Elasticsearch服务

    这篇文章主要介绍了改造Skywalking支持阿里云等带Http Basic的Elasticsearch服务
    2022-02-02
  • 使用WebSocket实现即时通讯(一个群聊的聊天室)

    使用WebSocket实现即时通讯(一个群聊的聊天室)

    这篇文章主要为大家详细介绍了使用WebSocket实现即使通讯,实现一个群聊的聊天室,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2018-03-03
  • java中串行流和并行流区别小结

    java中串行流和并行流区别小结

    串行流和并行流是两种处理流操作的方式,串行流适用于小数据量且简单的数据处理,并行流则适用于大规模数据处理和需要并行计算的场景,能够利用多线程并行处理,选择使用哪种流取决于数据量大小、处理复杂度和是否需要并行计算,下面就来具体介绍一下两者的区别
    2024-09-09
  • Java数组队列概念与用法实例分析

    Java数组队列概念与用法实例分析

    这篇文章主要介绍了Java数组队列概念与用法,结合实例形式分析了Java数组队列相关概念、原理、用法及操作注意事项,需要的朋友可以参考下
    2020-03-03
  • maven如何查看jar的pom引入来源

    maven如何查看jar的pom引入来源

    这篇文章主要介绍了maven查看jar的pom引入来源,本文通过图文并茂的形式给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2023-07-07
  • Java注解Annotaton详解

    Java注解Annotaton详解

    Java 注解(Annotation)又称 Java 标注,是 JDK5.0 引入的一种注释机制,文中给大家介绍了三种基本的Annotaton,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧
    2022-05-05
  • java中CopyOnWriteArrayList源码解析

    java中CopyOnWriteArrayList源码解析

    为了将读取的性能发挥到极致,jdk中提供了CopyOnWriteArrayList类,下面这篇文章主要给大家介绍了关于java中CopyOnWriteArrayList源码解析的相关资料,文中通过实例代码介绍的非常详细,需要的朋友可以参考下
    2022-02-02

最新评论