首页 > 数据库 > 一文解读消息中间件RabbitMQ实现简单的RPC服务(图文+源码)
2021
02-07

一文解读消息中间件RabbitMQ实现简单的RPC服务(图文+源码)

attachments-2021-02-Pwh92zM6601e368c993de.png


RPC(Remote Procedure Call, 远程过程调用),是一种计算机通信协议。对于两台机器而言,就是 A 服务器上的应用程序调用 B 服务器上的函数或者方法,由于不在同一个内存空间或机器上运行,因此需要借助于网络通信。


1. RPC 框架

我们首先通过一张图理解 RPC 的工作流程:

attachments-2021-02-o0G9ZrBM601e36a0dfecc.jpg

因此,实现一个最简单的 RPC 服务,只需要 Client、Server 和 Network,本文就是利用消息中间件 RabbitMQ 作为 Network 载体传输信息,实现简单的 RPC 服务。简单原理可如下图所示:

attachments-2021-02-rvci9qL6601e36a7da0e5.jpg

即:当 Client 发送 RPC 请求时,Client 端是消息生产者,Server 端是消息消费者;当 Server 返回结果时,Server 端是消息生产者,Client 是消息消费者;发送和返回使用不同的队列。

接下来我们通过代码,详细展示一个计算斐波那契数列的 RPC 服务。


2. RPCServer 实现

2.1 Server 初始化

1. `/**` 2. `* 队列名、交换机名、路由键` 3. `*/` 4. `private static final String EXCHANGE_NAME = "rpc_exchange";` 5. `private static final String QUEUE_NAME = "request_rpc_queue";` 6. `private static final String ROUTING_KEY = "rpc_routing_key";` 7. `private Connection connection = null;` 8. `private Channel channel = null;` 9. `private QueueingConsumer consumer = null;` 10. `/**` 11. `* Server的构造函数` 12. `*/` 13. `private RPCServer() {` 14. `try {` 15. `//创建链接` 16. `ConnectionFactory factory = new ConnectionFactory();` 17. `factory.setHost(Config.HOST);` 18. `factory.setPort(Config.PORT);` 19. `factory.setUsername(Config.USER);` 20. `factory.setPassword(Config.PASSWORD);` 21. `connection = factory.newConnection();` 22. `//创建信道` 23. `channel = connection.createChannel();` 24. `//设置AMQP的通信结构` 25. `channel.exchangeDeclare(EXCHANGE_NAME, "direct");` 26. `channel.queueDeclare(QUEUE_NAME, false, false, false, null);` 27. `channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);` 28. `//设置消费者` 29. `consumer = new QueueingConsumer(channel);` 30. `channel.basicConsume(QUEUE_NAME, false, QUEUE_NAME, consumer);` 31. `} catch (Exception e) {` 32. `LOG.error("build connection failed!", e);` 33. `}` 34. `}` 

初始化就是声明 RabbitMQ 的链接工厂、链接、信道、队列、交换机等等,并做了绑定,由此构成了 AMQP 的通信结构。

2.2 监听队列并反馈

1.  `/**` 2.  `* 开启server` 3.  `*/` 4.  `private void startServer() {` 5.  `try {` 6.  `LOG.info("Waiting for RPC calls.....");` 7.  `while (true) {` 8.  `//获得文本消息` 9.  `QueueingConsumer.Delivery delivery = consumer.nextDelivery();` 10.  `BasicProperties props = delivery.getProperties();` 11.  `//返回消息的属性` 12.  `BasicProperties replyProps = new BasicProperties.Builder()` 13.  `.correlationId(props.getCorrelationId())` 14.  `.build();` 15.  `long receiveTime = System.currentTimeMillis();` 16.  `JSONObject json = new JSONObject();` 17.  `try {` 18.  `String message = new String(delivery.getBody(), "UTF-8");` 19.  `int n = Integer.parseInt(message);` 20.  `LOG.info("Got a request: fib(" + message + ")");` 21.  `json.put("status", "success");` 22.  `json.put("result", fib(n));` 23.  `} catch (Exception e) {` 24.  `json.put("status", "fail");` 25.  `json.put("reason", "Not a Number!");` 26.  `LOG.error("receive message failed!", e);` 27.  `} finally {` 28.  `long responseTime = System.currentTimeMillis();` 29.  `json.put("calculateTime", (responseTime - receiveTime));` 30.  `channel.basicPublish("", props.getReplyTo(), replyProps, json.toString().getBytes("UTF-8"));` 31.  `channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);` 32.  `}` 33.  `}` 34.  `} catch (Exception e) {` 35.  `LOG.error("server failed!", e);` 36.  `} finally {` 37.  `if (connection != null) {` 38.  `try {` 39.  `connection.close();` 40.  `} catch (Exception e) {` 41.  `LOG.error("close failed!", e);` 42.  `}` 43.  `}` 44.  `}` 45.  `}`

在该方法中使用了一个无限循环,每次处理一条消息。通过调用消费者对象的 nextDelivery 方法来获得 RabbitMQ 队列的最新一条消息。同时通过 getProperties 获取到消息中的反馈信息属性,用于标记客户端 Client 的属性。然后计算斐波那契数列的结果。
最后通过 basicAck 使用消息信封向 RabbitMQ 确认了该消息。

到这里就实现了计算斐波那契数列 RPC 服务的 Server 端。


3. RPCClient 实现

3.1 初始化 CLient

1. `/**` 2. `* 消息请求的队列名、交换机名、路由键` 3. `*/` 4. `private static final String EXCHANGE_NAME = "rpc_exchange";` 5. `private static final String QUEUE_NAME = "request_rpc_queue";` 6. `private static final String ROUTING_KEY = "rpc_routing_key";` 7. `/**` 8. `* 消息返回的队列名、交换机名、路由键` 9. `*/` 10. `private static final String RESPONSE_QUEUE = "response_rpc_queue";` 11. `private static final String RESPONSE_ROUTING_KEY = "response_rpc_routing_key";` 12. `/**` 13. `* RabbitMQ的实体` 14. `*/` 15. `private Connection connection = null;` 16. `private Channel channel = null;` 17. `private QueueingConsumer consumer = null;` 18. `/**` 19. `* 构造客户端` 20. `* @throws Exception` 21. `*/` 22. `private RPCClient() throws Exception {` 23. `ConnectionFactory factory = new ConnectionFactory();` 24. `factory.setHost(Config.HOST);` 25. `factory.setPort(Config.PORT);` 26. `factory.setUsername(Config.USER);` 27. `factory.setPassword(Config.PASSWORD);` 28. `connection = factory.newConnection();` 29. `channel = connection.createChannel();` 30. `channel.exchangeDeclare(EXCHANGE_NAME, "direct");` 31. `channel.queueDeclare(QUEUE_NAME, false, false, false, null);` 32. `channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);` 33. `channel.queueDeclare(RESPONSE_QUEUE, false, false, false, null);` 34. `channel.queueBind(RESPONSE_QUEUE, EXCHANGE_NAME, RESPONSE_ROUTING_KEY);` 35. `consumer = new QueueingConsumer(channel);` 36. `channel.basicConsume(RESPONSE_QUEUE, true, consumer);` 37. `}`

这里声明 AMQP 结构体的方式和 Server 端类似,只不过 Client 端需要多声明一个队列,用于 RPC 的 response。

3.2 发送 / 接收消息

1. `/**` 2. `* 请求server` 3. `* @param message` 4. `* @return` 5. `* @throws Exception` 6. `*/` 7. `private String requestMessage(String message) throws Exception {` 8. `String response = null;` 9. `String corrId = UUID.randomUUID().toString();` 10. `BasicProperties props = new BasicProperties.Builder().correlationId(corrId).replyTo(RESPONSE_QUEUE).build();` 11. `channel.basicPublish("", QUEUE_NAME, props, message.getBytes("UTF-8"));` 12. `while (true) {` 13. `QueueingConsumer.Delivery delivery = consumer.nextDelivery();` 14. `if (delivery.getProperties().getCorrelationId().equals(corrId)) {` 15. `response = new String(delivery.getBody(),"UTF-8");` 16. `break;` 17. `}` 18. `}` 19. `return response;` 20. `}` 

BasicProperties 用于存储你请求消息的属性,这里我设置了 correlationId 和 replyTo 属性,用于 Server 端的返回识别。


4. 运行测试

Client 端发送:

attachments-2021-02-Hjh5WTcy601e36cd4ae69.png

Server 端接收并处理:

attachments-2021-02-NS5lLHJe601e36d3852d1.png

Client 收到计算结果:

attachments-2021-02-sqf2gJMi601e36d9b0916.png

由于我运行 RabbitMQ 的服务器是租用的阿里云的,差不多传输时延在 60ms 左右,如果把 RPC 服务和消息中间件同机房部署的话延时基本上就在 ms 级别。


5. FAQ

5.1 说明

需要体验完整的过程,你需要如下环境:

1. `JDK1.6以上 + Maven+ RabbitMQ`

5.2 源代码

完整代码代码请戳:github:https://github.com/chadwick521/rabbitmqdemo

其中 Server 的代码在:

1. `rpc.RPCServer`

Client 端的代码位置:

1. `rpc.RPCClient`

以上内容就是关于基于消息中间件 RabbitMQ 实现简单的 RPC 服务的全部内容了,谢谢你阅读到了这里!

扫码芷若 获取免费视频学习资料

编程学习

查 看2019高级编程视频教程免费获取