Java中RabbitMQ队列实现RPC详解
RabbitMQ实现RPC
如果我们需要在远程计算机上运行一个函数并等待结果,这种模式通常被称为远程过程调用或RPC。
在本教程中,我们将使用RabbitMQ构建一个RPC系统:
- 一个客户端和一个RPC服务器。
- 我们将创建一个返回斐波那契数字的模拟RPC服务。
整个过程示意图如下:
客户端将请求发送至rpc_queue(我们定义的消息队列),然后等待响应;服务端获取请求,并处理请求,然后将请求结果返回给队列,客户端得知请求被响应后获取结果。
在结果被响应之前,客户端是被阻塞的,主线程会等待RPC响应
如果每个RPC请求都创建一个回调队列。这是非常低效,我们创建一个单一的客户端回调队列。
这引发了一个新的问题,在该队列中收到回复时,不清楚回复属于哪个请求。这就需要用到 correlationId属性。
我们为没有请求设置唯一的correlationId值。
然后,当我们在回调队列中收到一条消息时,我们将获取这个值,将响应与请求的进行correlationId匹配。
如果我们一致就是我们需要的结果,否则就不是。
客户端代RPCClient
代码如下:
package com.adtec.rabbitmq; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Envelope; import java.io.IOException; import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeoutException; public class RPCClient { private Connection connection; private Channel channel; private String requestQueueName = "rpc_queue"; private String replyQueueName; public RPCClient() throws IOException, TimeoutException { //建立一个连接和一个通道,并为回调声明一个唯一的'回调'队列 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); //定义一个临时变量的接受队列名 replyQueueName = channel.queueDeclare().getQueue(); } //发送RPC请求 public String call(String message) throws IOException, InterruptedException { //生成一个唯一的字符串作为回调队列的编号 String corrId = UUID.randomUUID().toString(); //发送请求消息,消息使用了两个属性:replyto和correlationId //服务端根据replyto返回结果,客户端根据correlationId判断响应是不是给自己的 AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName) .build(); //发布一个消息,requestQueueName路由规则 channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8")); //由于我们的消费者交易处理是在单独的线程中进行的,因此我们需要在响应到达之前暂停主线程。 //这里我们创建的 容量为1的阻塞队列ArrayBlockingQueue,因为我们只需要等待一个响应。 final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1); // String basicConsume(String queue, boolean autoAck, Consumer callback) channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //检查它的correlationId是否是我们所要找的那个 if (properties.getCorrelationId().equals(corrId)) { //如果是,则响应BlockingQueue response.offer(new String(body, "UTF-8")); } } }); return response.take(); } public void close() throws IOException { connection.close(); } public static void main(String[] argv) { RPCClient fibonacciRpc = null; String response = null; try { fibonacciRpc = new RPCClient(); System.out.println(" [x] Requesting fib(30)"); response = fibonacciRpc.call("30"); System.out.println(" [.] Got '" + response + "'"); } catch (IOException | TimeoutException | InterruptedException e) { e.printStackTrace(); } finally { if (fibonacciRpc != null) { try { fibonacciRpc.close(); } catch (IOException _ignore) { } } } } }
上面的代码中用到了阻塞队列ArrayBlockingQueue
服务端代RPCServer
代码如下:
package rabbitmq; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Envelope; import java.io.IOException; import java.util.concurrent.TimeoutException; public class RPCServer { private static final String RPC_QUEUE_NAME = "rpc_queue"; //具体处理方法 private static int fib(int n) { if (n == 0) return 0; if (n == 1) return 1; return fib(n - 1) + fib(n - 2); } public static void main(String[] argv) { //建立连接、通道,并声明队列 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = null; try { connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); channel.basicQos(1); System.out.println(" [x] Awaiting RPC requests"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder() .correlationId(properties.getCorrelationId()).build(); String response = ""; try { String message = new String(body, "UTF-8"); int n = Integer.parseInt(message); System.out.println(" [.] fib(" + message + ")"); response += fib(n); } catch (RuntimeException e) { System.out.println(" [.] " + e.toString()); } finally { // 返回处理结果队列 channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("UTF-8")); // 确认消息,已经收到后面参数 multiple:是否批量.true:将一次性确认所有小于envelope.getDeliveryTag()的消息。 channel.basicAck(envelope.getDeliveryTag(), false); // RabbitMq consumer worker thread notifies the RPC // server owner thread synchronized (this) { this.notify(); } } } }; //取消自动确认 boolean autoAck = false ; channel.basicConsume(RPC_QUEUE_NAME, autoAck, consumer); // Wait and be prepared to consume the message from RPC client. while (true) { synchronized (consumer) { try { consumer.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } catch (IOException | TimeoutException e) { e.printStackTrace(); } finally { if (connection != null) try { connection.close(); } catch (IOException _ignore) { } } } }
测试时先运行服务端,再运行客户端 为了方便观察结果,最好将客户端和服务端在不同workspace实现
客户端结果
服务端结果
到此这篇关于Java中RabbitMQ队列实现RPC详解的文章就介绍到这了,更多相关RabbitMQ实现RPC内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
Skywalking改成适配阿里云等带Http Basic的Elasticsearch服务
这篇文章主要介绍了改造Skywalking支持阿里云等带Http Basic的Elasticsearch服务2022-02-02
最新评论