文档章节

RabbitMQ (5) 远程过程调用(RPC)

玄影镜心
 玄影镜心
发布于 2016/12/14 17:24
字数 583
阅读 52
收藏 1

原理简述:客户端向服务端定义好的Queue发送消息,其中携带的消息就是服务端将要调用的方法的参数 ,并使用Propertis告诉服务端将结果返回到指定的Queue。

服务端:

package sunf.rabbitMQTest;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

/**
 *RabbitMq实现RPC 服务端-测试
 * 
 
 * 
 *1、首先客户端发送一个reply_to和corrention_id的请求,发布到RPC队列中;
 *2、服务器端处理这个请求,并把处理结果发布到一个回调Queue,此Queue的名称应当与reply_to的名称一致
 *3、客户端从回调Queue中得到先前corrention_id设定的值的处理结果。
 *---------------------------------------------
 *deliveryMode:持久化属性
 *contentType:编码
 *replyTo:指定一个回调队列
 *correlationId:消息id
 *---------------------------------------------
 *
 */
public class RPCServer {

    /**
     * 自定义的RPC队列名称
     */
    public static final String RPC_QUEUE_NAME = RabbitMQConstants.RPC_QUEUE_NAME;

    /**
     * 自定义接口方法
     * @param arg
     * @return
     */
    public static String interfaceMethod(String param) {
        return "hello " + param;
    }

    public static void main(String[] args) throws Exception {
        ConnectionFactory connFac = new ConnectionFactory();
        connFac.setHost(RabbitMQConstants.VHOST_IP);
        Connection conn = connFac.newConnection();
        Channel channel = conn.createChannel();

        channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(RPC_QUEUE_NAME, false, consumer);

        while (true) {
            System.out.println("服务端等待接收消息....");
            QueueingConsumer.Delivery deliver = consumer.nextDelivery();
            System.out.println("服务端成功收到消息....");
            BasicProperties props = deliver.getProperties();
            //处理消息,此时得到的消息应该是需要调用方法的参数列表
            //如果有多个参数,服务端制定传入参数的统一格式以便处理。
            String message = new String(deliver.getBody(), "UTF-8");
            String responseMsg = interfaceMethod(message);
            //获取从客户端请求到的correlationId
            String correlationId = props.getCorrelationId();
            //绑定correlationId;
           /* Builder builder = new BasicProperties.Builder();
            builder.correlationId(correlationId);
            BasicProperties responseProps = builder.build();*/
            BasicProperties responseProps = new BasicProperties
                                                .Builder()
                                                .correlationId(correlationId)
                                                .replyTo(RPC_QUEUE_NAME)
                                                .build();
            
            //将结果返回到客户端Queue
            channel.basicPublish("", props.getReplyTo(), responseProps, responseMsg.getBytes("UTF-8"));

            //向客户端确认消息
            channel.basicAck(deliver.getEnvelope().getDeliveryTag(), false);
            System.out.println("服务端返回消息完成..");
        }

    }
}

客户端: 

package sunf.rabbitMQTest;

import java.util.UUID;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

/**
 * RabbitMq实现RPC 客户端-测试
 * 
 *
 */
public class RPCClient {

    public static final String RPC_QUEUE_NAME = RabbitMQConstants.RPC_QUEUE_NAME;

    public static void main(String[] args) throws Exception {
        ConnectionFactory connFac = new ConnectionFactory();
        connFac.setHost(RabbitMQConstants.VHOST_IP);
        Connection conn = connFac.newConnection();
        Channel channel = conn.createChannel();

        //响应QueueName ,服务端将会把要返回的信息发送到该Queue
        String responseQueue = channel.queueDeclare().getQueue();
        //假定一个UUID作为correlationId
        String correlationId = UUID.randomUUID().toString();

        BasicProperties props = new BasicProperties
                                    .Builder()
                                    .replyTo(responseQueue)
                                    .correlationId(correlationId)
                                    .build();

        String message = "西安";
        channel.basicPublish("", RPC_QUEUE_NAME, props, message.getBytes("UTF-8"));

        QueueingConsumer consumer = new QueueingConsumer(channel);

        channel.basicConsume(responseQueue, consumer);

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            if (delivery.getProperties().getCorrelationId().equals(correlationId)) {
                String result = new String(delivery.getBody());
                System.out.println(result);
            }

        }
    }

}

 

 

本文转载自:http://blog.csdn.net/ysw1132/article/details/51698002

玄影镜心
粉丝 9
博文 102
码字总数 53030
作品 0
西安
高级程序员
私信 提问
深入理解RabbitMQ消息队列的使用-张明阳-专题视频课程

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/a2011480169/article/details/83581214 深入理解RabbitMQ消息队列的使用—126人已学习 课程介绍 RabbitMQ使用...

安静的技术控
2018/06/21
0
0
【急】rabbitmq rpc 服务端如何知道客户端需要调用远程服务端的哪个方法?

rabbitmq rpc 服务端如何知道客户端调用需要调用哪个方法? 如果用RPC实现远程调用的话,通常会指定调用远程的方法,比如A需要反问远程B的ask()方法。 当调用过去之后肯定去执行ask方法了。...

菜太咸
2018/06/01
968
0
使用 RabbitMQ 实现 RPC

背景知识 RabbitMQ RabbitMQ 是基于 AMQP 协议实现的一个消息队列(Message Queue),Message Queue 是一个典型的生产者/消费者模式。生产者发布消息,消费者消费消息,生产者和消费者之间是...

Java干货分享
2018/10/26
427
0
python操作rabbitmq操作数据

一、RabbitMQ 消息队列介绍 RabbitMQ也是消息队列,那RabbitMQ和之前python的Queue有什么区别么? 1 2 3 4 5 6 7 8 2)Install RabbitMQ Server 1 2 3)use RabbitMQ Server 1 2 2、基本示例...

数据架构师
2018/08/27
0
0
RabbitMQ高级指南:从配置、使用到高可用集群搭建

作者介绍 章为忠,随变科技.net架构师。致力于电商领域的开发与架构设计工作,拥有丰富的电商网站架构搭建经验。博客:http://www.cnblogs.com/zhangweizhong/。 本文大纲: 1. RabbitMQ简介...

章为忠
2017/04/27
0
0

没有更多内容

加载失败,请刷新页面

加载更多

EDI 电子数据交换全解指南

EDI(Electronic Data Interchange,电子数据交换)技术使得企业与企业(B2B)实现通信自动化,帮助交易伙伴和组织更快更好地完成更多工作,并消除了人工操作带来的错误。从零售商到制造商、物...

EDI知行软件
今天
3
0
CentOS7的LVM动态扩容

# 问题 CentOS7上面的磁盘空间有点紧张,需要扩容。 解决 查询当前磁盘状态 [root@xxx ~]# lsblkNAME MAJ:MIN RM SIZE RO TYPE MOUNTPOINTfd0 2:0 1 4K ...

亚林瓜子
今天
5
0
Kafka 0.8 Producer (0.9以前版本适用)

Kafka旧版本producer由scala编写,0.9以后已经废除 示例代码如下: import kafka.producer.KeyedMessage;import kafka.javaapi.producer.Producer;import kafka.producer.ProducerConfig;......

实时计算
今天
5
0
Giraph源码分析(八)—— 统计每个SuperStep中参与计算的顶点数目

作者|白松 目的:科研中,需要分析在每次迭代过程中参与计算的顶点数目,来进一步优化系统。比如,在SSSP的compute()方法最后一行,都会把当前顶点voteToHalt,即变为InActive状态。所以每次...

数澜科技
今天
6
0
Navicat 快捷键

操作 结果 ctrl+q 打开查询窗口 ctrl+/ 注释sql语句 ctrl+shift +/ 解除注释 ctrl+r 运行查询窗口的sql语句 ctrl+shift+r 只运行选中的sql语句 F6 打开一个mysql命令行窗口 ctrl+l 删除一行 ...

低至一折起
今天
13
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部