远程过程调用(Remote Proceddure call【RPC】)
(本实例都是使用的Net的客户端,使用C#编写) 在中,我们学习了如何使用工作队列在多个工作实例之间分配耗时的任务。 但是,如果我们需要在远程计算机上运行功能并等待结果怎么办? 那是一个不同的故事。 此模式通常称为远程过程调用或RPC。 在本教程中,我们将使用RabbitMQ构建一个RPC系统:一个客户机和一个可扩展的RPC服务器。 由于我们没有任何值得分发的耗时任务,我们将创建一个返回斐波纳契数字的虚拟RPC服务。1、客户端接口【Client Interface】 为了说明如何使用RPC服务,我们将创建一个简单的客户端类。 它将公开一个名为call的方法,该方法发送RPC请求并阻塞,直到接收到答案:var rpcClient = new RPCClient();Console.WriteLine(" [x] Requesting fib(30)");var response = rpcClient.Call("30");Console.WriteLine(" [.] Got '{0}'", response);rpcClient.Close();
var corrId = Guid.NewGuid().ToString();var props = channel.CreateBasicProperties();props.ReplyTo = replyQueueName;props.CorrelationId = corrId;var messageBytes = Encoding.UTF8.GetBytes(message);channel.BasicPublish(exchange: "", routingKey: "rpc_queue", basicProperties: props, body: messageBytes);// ... then code to read a response message from the callback_queue ...
private static int fib(int n) { if (n == 0 || n == 1) return n; return fib(n - 1) + fib(n - 2); }
1 using System; 2 using RabbitMQ.Client; 3 using RabbitMQ.Client.Events; 4 using System.Text; 5 6 class RPCServer 7 { 8 public static void Main() 9 {10 var factory = new ConnectionFactory() { HostName = "localhost" };11 using (var connection = factory.CreateConnection())12 using (var channel = connection.CreateModel())13 {14 channel.QueueDeclare(queue: "rpc_queue", durable: false,15 exclusive: false, autoDelete: false, arguments: null);16 channel.BasicQos(0, 1, false);17 var consumer = new EventingBasicConsumer(channel);18 channel.BasicConsume(queue: "rpc_queue",19 noAck: false, consumer: consumer);20 Console.WriteLine(" [x] Awaiting RPC requests");21 22 consumer.Received += (model, ea) =>23 {24 string response = null;25 26 var body = ea.Body;27 var props = ea.BasicProperties;28 var replyProps = channel.CreateBasicProperties();29 replyProps.CorrelationId = props.CorrelationId;30 31 try32 {33 var message = Encoding.UTF8.GetString(body);34 int n = int.Parse(message);35 Console.WriteLine(" [.] fib({0})", message);36 response = fib(n).ToString();37 }38 catch (Exception e)39 {40 Console.WriteLine(" [.] " + e.Message);41 response = "";42 }43 finally44 {45 var responseBytes = Encoding.UTF8.GetBytes(response);46 channel.BasicPublish(exchange: "", routingKey: props.ReplyTo,47 basicProperties: replyProps, body: responseBytes);48 channel.BasicAck(deliveryTag: ea.DeliveryTag,49 multiple: false);50 }51 };52 53 Console.WriteLine(" Press [enter] to exit.");54 Console.ReadLine();55 }56 }57 58 ///59 60 /// Assumes only valid positive integer input.61 /// Don't expect this one to work for big numbers, and it's62 /// probably the slowest recursive implementation possible.63 ///64 65 private static int fib(int n)66 {67 if (n == 0 || n == 1)68 {69 return n;70 }71 72 return fib(n - 1) + fib(n - 2);73 }74 }
1 using System; 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Text; 5 using System.Threading.Tasks; 6 using RabbitMQ.Client; 7 using RabbitMQ.Client.Events; 8 9 class RPCClient10 {11 private IConnection connection;12 private IModel channel;13 private string replyQueueName;14 private QueueingBasicConsumer consumer;15 16 public RPCClient()17 {18 var factory = new ConnectionFactory() { HostName = "localhost" };19 connection = factory.CreateConnection();20 channel = connection.CreateModel();21 replyQueueName = channel.QueueDeclare().QueueName;22 consumer = new QueueingBasicConsumer(channel);23 channel.BasicConsume(queue: replyQueueName,24 noAck: true,25 consumer: consumer);26 }27 28 public string Call(string message)29 {30 var corrId = Guid.NewGuid().ToString();31 var props = channel.CreateBasicProperties();32 props.ReplyTo = replyQueueName;33 props.CorrelationId = corrId;34 35 var messageBytes = Encoding.UTF8.GetBytes(message);36 channel.BasicPublish(exchange: "",37 routingKey: "rpc_queue",38 basicProperties: props,39 body: messageBytes);40 41 while(true)42 {43 var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();44 if(ea.BasicProperties.CorrelationId == corrId)45 {46 return Encoding.UTF8.GetString(ea.Body);47 }48 }49 }50 51 public void Close()52 {53 connection.Close();54 }55 }56 57 class RPC58 {59 public static void Main()60 {61 var rpcClient = new RPCClient();62 63 Console.WriteLine(" [x] Requesting fib(30)");64 var response = rpcClient.Call("30");65 Console.WriteLine(" [.] Got '{0}'", response);66 67 rpcClient.Close();68 }69 }
var rpcClient = new RPCClient();Console.WriteLine(" [x] Requesting fib(30)");var response = rpcClient.Call("30");Console.WriteLine(" [.] Got '{0}'", response);rpcClient.Close();
cd RPCServerdotnet run# => [x] Awaiting RPC requests
要请求运行客户端的fibonacci号码:
cd RPCClientdotnet run# => [x] Requesting fib(30)