文档章节

基于JMS实现RPC功能

ksfzhaohui
 ksfzhaohui
发布于 2017/06/13 22:25
字数 1497
阅读 40
收藏 0
点赞 0
评论 0

前言
JMS的点对点消息传送模式允许JMS客户端通过队列(queue)这个虚拟通道来同步和异步发送、接收消息;点对点消息传送模式既支持异步“即发即弃”消息传送方式,又支持同步请求/应答消息传送方式;点对点消息传送模式支持负载均衡,它允许多个接收者监听同一队列,并以此来分配负载;所以完全可以以JMS的点对点消息传送模式实现一套RPC框架。

准备
jdk:jdk1.7.0_80
jms:apache-activemq-5.10.0
serialize:protostuff

整体结构
整个实现包括:rpc-jms-common,rpc-jms-client,rpc-jms-server;
以及测试模块:rpc-jms-test-api,rpc-jms-test-client,rpc-jms-test-server
整体如下图所示:

rpc-jms-common:公共模块
rpc-jms-client:给客户端使用的支持库
rpc-jms-server:给服务端使用的支持库

客户端实现
客户端其实核心思想都是动态代理,生成的代理类将接口名称,版本信息,方法名,参数类型,参数值等信息封装成RpcRequest,然后使用protostuff对其进行序列化操作,将序列化后的二进制数据放入BytesMessage中,发送给消息队列;然后等待消息的返回,从BytesMessage中获取二进制数据,使用protostuff进行反序列化操作;最后将结果或者异常展示给用户,具体代码如下:

public class RpcClient {
 
    private static final Logger LOGGER = LoggerFactory.getLogger(RpcClient.class);
 
    private QueueConnection qConnection;
    private QueueSession qSession;
    private Queue requestQ;
    private Queue responseQ;
 
    public RpcClient(String rpcFactory, String rpcRequest, String rpcResponse) {
        try {
            Context ctx = new InitialContext();
            QueueConnectionFactory factory = (QueueConnectionFactory) ctx.lookup(rpcFactory);
            qConnection = factory.createQueueConnection();
            qSession = qConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            requestQ = (Queue) ctx.lookup(rpcRequest);
            responseQ = (Queue) ctx.lookup(rpcResponse);
            qConnection.start();
        } catch (Exception e) {
            LOGGER.error("init rpcproxy error", e);
        }
    }
 
    public <T> T create(final Class<?> interfaceClass) {
        return create(interfaceClass, "");
    }
 
    @SuppressWarnings("unchecked")
    public <T> T create(final Class<?> interfaceClass, final String serviceVersion) {
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[] { interfaceClass },
                new InvocationHandler() {
 
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        RpcRequest request = new RpcRequest();
                        request.setRequestId(UUID.randomUUID().toString());
                        request.setInterfaceName(method.getDeclaringClass().getName());
                        request.setServiceVersion(serviceVersion);
                        request.setMethodName(method.getName());
                        request.setParameterTypes(method.getParameterTypes());
                        request.setParameters(args);
 
                        BytesMessage requestMessage = qSession.createBytesMessage();
                        requestMessage.writeBytes(SerializationUtil.serialize(request));
                        requestMessage.setJMSReplyTo(responseQ);
                        QueueSender qsender = qSession.createSender(requestQ);
                        qsender.send(requestMessage);
 
                        String filter = "JMSCorrelationID = '" + requestMessage.getJMSMessageID() + "'";
                        QueueReceiver qReceiver = qSession.createReceiver(responseQ, filter);
                        BytesMessage responseMessage = (BytesMessage) qReceiver.receive(30000);
                        byte messByte[] = new byte[(int) responseMessage.getBodyLength()];
                        responseMessage.readBytes(messByte);
                        RpcResponse rpcResponse = SerializationUtil.deserialize(messByte, RpcResponse.class);
 
                        if (rpcResponse.hasException()) {
                            throw rpcResponse.getException();
                        } else {
                            return rpcResponse.getResult();
                        }
                    }
                });
    }
}

RpcClient被创建的时候就和jms建立了连接,相关jms的配置信息在测试部分讲解。

其中封装请求类RpcRequest代码如下:

public class RpcRequest {
 
    private String requestId;
    private String interfaceName;
    private String serviceVersion;
    private String methodName;
    private Class<?>[] parameterTypes;
    private Object[] parameters;
    //省略get和set方法
}

封装响应类RpcResponse代码如下:

public class RpcResponse{
 
    private String requestId;
    private Exception exception;
    private Object result;
    //省略get和set方法
}

服务端实现
服务器端首先加载所有需要对外提供服务的类对象;然后监听消息队列,从BytesMessage中获取二进制数据,通过protostuff反序列化为RpcRequest对象;最后通过反射的方式调用服务类,获取的结果封装成RpcResponse,然后序列化二进制写入BytesMessage中,发送给客户端,具体代码如下:

public class RpcServer implements ApplicationContextAware, InitializingBean {
 
    private static final Logger LOGGER = LoggerFactory.getLogger(RpcServer.class);
 
    private QueueConnection qConnection;
    private QueueSession qSession;
    private Queue requestQ;
 
    private String rpcFactory;
    private String rpcRequest;
 
    /**
     * 存放 服务名 与 服务对象 之间的映射关系
     */
    private Map<String, Object> serviceMap = new HashMap<String, Object>();
 
    public RpcServer(String rpcFactory, String rpcRequest) {
        this.rpcFactory = rpcFactory;
        this.rpcRequest = rpcRequest;
    }
 
    @Override
    public void afterPropertiesSet() throws Exception {
        try {
            Context ctx = new InitialContext();
            QueueConnectionFactory factory = (QueueConnectionFactory) ctx.lookup(rpcFactory);
            qConnection = factory.createQueueConnection();
            qSession = qConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            requestQ = (Queue) ctx.lookup(rpcRequest);
            qConnection.start();
 
            QueueReceiver receiver = qSession.createReceiver(requestQ);
            receiver.setMessageListener(new RpcMessageListener(qSession, serviceMap));
 
            LOGGER.info("ready receiver message");
        } catch (Exception e) {
            if (qConnection != null) {
                try {
                    qConnection.close();
                } catch (JMSException e1) {
                }
            }
            LOGGER.error("server start error", e);
        }
    }
 
    @Override
    public void setApplicationContext(ApplicationContext ctx) throws BeansException {
        Map<String, Object> serviceBeanMap = ctx.getBeansWithAnnotation(RpcService.class);
        if (serviceBeanMap != null && serviceBeanMap.size() > 0) {
            for (Object serviceBean : serviceBeanMap.values()) {
                RpcService rpcService = serviceBean.getClass().getAnnotation(RpcService.class);
                String serviceName = rpcService.value().getName();
                String serviceVersion = rpcService.version();
                if (serviceVersion != null && !serviceVersion.equals("")) {
                    serviceName += "-" + serviceVersion;
                }
                serviceMap.put(serviceName, serviceBean);
            }
        }
    }
 
}

消息监听器类RpcMessageListener:

public class RpcMessageListener implements MessageListener {
 
    private static final Logger LOGGER = LoggerFactory.getLogger(RpcMessageListener.class);
 
    private QueueSession qSession;
    /**
     * 存放 服务名 与 服务对象 之间的映射关系
     */
    private Map<String, Object> serviceMap = new HashMap<String, Object>();
 
    public RpcMessageListener(QueueSession qSession, Map<String, Object> serviceMap) {
        this.qSession = qSession;
        this.serviceMap = serviceMap;
    }
 
    @Override
    public void onMessage(Message message) {
        try {
            LOGGER.info("receiver message : " + message.getJMSMessageID());
            RpcResponse response = new RpcResponse();
            BytesMessage responeByte = qSession.createBytesMessage();
            responeByte.setJMSCorrelationID(message.getJMSMessageID());
            QueueSender sender = qSession.createSender((Queue) message.getJMSReplyTo());
            try {
                BytesMessage byteMessage = (BytesMessage) message;
                byte messByte[] = new byte[(int) byteMessage.getBodyLength()];
                byteMessage.readBytes(messByte);
                RpcRequest rpcRequest = SerializationUtil.deserialize(messByte, RpcRequest.class);
 
                response.setRequestId(rpcRequest.getRequestId());
 
                String serviceName = rpcRequest.getInterfaceName();
                String serviceVersion = rpcRequest.getServiceVersion();
                if (serviceVersion != null && !serviceVersion.equals("")) {
                    serviceName += "-" + serviceVersion;
                }
                Object serviceBean = serviceMap.get(serviceName);
                if (serviceBean == null) {
                    throw new RuntimeException(String.format("can not find service bean by key: %s", serviceName));
                }
 
                Class<?> serviceClass = serviceBean.getClass();
                String methodName = rpcRequest.getMethodName();
                Class<?>[] parameterTypes = rpcRequest.getParameterTypes();
                Object[] parameters = rpcRequest.getParameters();
 
                Method method = serviceClass.getMethod(methodName, parameterTypes);
                method.setAccessible(true);
                Object result = method.invoke(serviceBean, parameters);
                response.setResult(result);
            } catch (Exception e) {
                response.setException(e);
                LOGGER.error("onMessage error", e);
            }
            responeByte.writeBytes(SerializationUtil.serialize(response));
            sender.send(responeByte);
        } catch (Exception e) {
            LOGGER.error("send message error", e);
        }
    }
 
}

测试
1.rpc-jms-test-api:接口模块,被rpc-jms-test-client和rpc-jms-test-server共同使用
IHelloService类:

public interface IHelloService {
 
    String hello(String name);
}

2.rpc-jms-test-server:服务器端测试模块,依赖rpc-jms-server
jms相关信息配置在jndi.properties中,如下所示:

java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://localhost:61616
java.naming.security.principal=system
java.naming.security.credentials=manager
 
connectionFactoryNames=RPCFactory
 
queue.RPCRequest=jms.RPCRequest
queue.RPCResponse=jms.RPCResponse

服务器端主要依赖的spring-server.xml配置文件,主要用于实例化RpcServer

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context.xsd">
 
    <context:component-scan base-package="zh.rpc.jms.test.server.impl" />
 
    <context:property-placeholder location="classpath:jms.properties" />
 
    <bean id="rpcServer" class="zh.rpc.jms.server.RpcServer">
        <constructor-arg name="rpcFactory" value="${rpc.rpc_factory}" />
        <constructor-arg name="rpcRequest" value="${rpc.rpc_request}" />
    </bean>
</beans>

IHelloService的具体实现类:

@RpcService(IHelloService.class)
public class HelloServiceImpl implements IHelloService {
 
    @Override
    public String hello(String name) {
        return "REQ+" + name;
    }
}

3.rpc-jms-test-client:客户端测试模块,依赖rpc-jms-client
客户端同样需要连接消息队列,所以也提供了配置文件jndi.properties;
客户端主要依赖的spring-client.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context.xsd">
 
    <context:property-placeholder location="classpath:jms.properties" />
 
    <bean id="rpcProxy" class="zh.rpc.jms.client.RpcClient">
        <constructor-arg name="rpcFactory" value="${rpc.rpc_factory}" />
        <constructor-arg name="rpcRequest" value="${rpc.rpc_request}" />
        <constructor-arg name="rpcResponse" value="${rpc.rpc_response}" />
    </bean>
</beans>

客户端测试类:

public class ClientTest {
 
    private static ApplicationContext context;
 
    public static void main(String[] args) throws Exception {
        context = new ClassPathXmlApplicationContext("spring-client.xml");
        RpcClient rpcProxy = context.getBean(RpcClient.class);
 
        IHelloService helloService = rpcProxy.create(IHelloService.class);
        String result = helloService.hello("World");
        System.out.println(result);
        System.exit(0);
    }
}

4.测试
首先启动事先准备的activemq,运行bin\win64\activemq.bat即可;
然后启动服务器ServerTest,ready receiver message
最后运行ClientTest,发送消息验证结果,结果如下:

REQ+World

以上只列出了部分较为重要的代码,更多详细的可以参考:https://github.com/ksfzhaohui/rpc-jms.git

© 著作权归作者所有

共有 人打赏支持
ksfzhaohui

ksfzhaohui

粉丝 296
博文 126
码字总数 153828
作品 3
南京
高级程序员
Tomcat,JBoss与JBoss Web

最近接触到应用服务器JBoss,此外JBoss Web与Tomcat也同为web服务器,便查阅资料对三者进行比较,供大家参考。 一、Tomcat Tomcat 服务器是免费开源的Web 应用服务器。支持最新的Servlet 和J...

thinkyoung ⋅ 2014/11/16 ⋅ 0

web项目中web.xml的作用

每个javaEE工程中都有web.xml文件,那么它的作用是什么呢?它是每个web.xml工程都必须的吗? 一个web中可以没有web.xml文件,也就是说,web.xml文件并不是web工程必须的。 web.xml文件是用来...

ChinaHYF ⋅ 04/27 ⋅ 0

JSP 学习总结---学习笔记

什么是JSP 1)为什么说,Servlet是一个动态Web开发技术呢? Servlet是基于服务端的一种动态交互技术, HttpServletRequest表示客户端到服务端的对象 HttpServletResponse表示服务端到客户端的...

知止内明 ⋅ 04/18 ⋅ 0

Servlet的一些细节--学习笔记

Servlet细节 1)浏览器访问的url-pattern只是一个符合格式的任意字符串,以/开头 2)一个Servlet的url-pattern可以是1个或多个,有二种形式; a).xx b)/xx/ 注意:/不能一起直接使用 3)/和....

知止内明 ⋅ 04/17 ⋅ 0

Apache Qpid JMS AMQP 0-x 6.3.1 发布

Apache Qpid JMS AMQP 0-x 6.3.1 已发布。Qpid JMS AMQP 0-x 是兼容 JMS 1.1 的客户端,等同于 AMQP 0-8,0-9,0-9-1 和 0-10。Qpid JMS 是一个使用 Qpid Proton 协议引擎的 Java Message Ser......

淡漠悠然 ⋅ 05/21 ⋅ 0

web.xml中在Servlet中获取context-param和init-param内的参数

web.xml里面可以定义两种参数: 1.application范围内的参数,存放在servletcontext中,在web.xml中配置如下: <context-param> <param-name>context/param</param-name> <param-value>avalib......

村长大神 ⋅ 2014/10/27 ⋅ 0

Java Web(一) Servlet详解!!

一、什么是servlet?     处理请求和发送响应的过程是由一种叫做Servlet的程序来完成的,并且Servlet是为了解决实现动态页面而衍生的东西。理解这个的前提是了解一些http协议的东西,并且...

architect刘源源 ⋅ 05/08 ⋅ 0

【J2EE】之常用的接口和协议

前言 初接触J2EE,会遇到很多之前没有遇到过的术语,下面我们来简单地汇总一下这些接口和协议吧。 各大接口和协议详解 JDBC 1.定义:Java数据库连接 2.地位:用于Java应用程序连接数据库的标...

m18633778874 ⋅ 04/26 ⋅ 0

J2EE进阶(二十四)JBoss Web和 Tomcat的区别

J2EE进阶(二十四)JBoss Web和 Tomcat的区别 在Web2.0的浪潮中,各种页面技术和框架不断涌现,为服务器端的基础架构提出了更高的稳定性和可扩展性的要求。近年来,作为开源中间件的全球领导者...

sunhuaqiang1 ⋅ 05/27 ⋅ 0

tomcat和jetty的区别

===========2017年11月3日08:33:07======================== 相同点 1.tomcat与jetty都是一种servlet引擎,他们都支持标准的servlet规范和javaEE规范 不同点 1.架构比较 jetty相比tomcat更为...

anlve ⋅ 05/27 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

Linux中的端口大全

1 被LANA定义的端口 端口 名称 描述 1 tcpmux TCP 端口服务多路复用 5 rje 远程作业入口 7 echo Echo 服务 9 discard 用于连接测试的空服务 11 systat 用于列举连接了的端口的系统状态 13 d...

寰宇01 ⋅ 11分钟前 ⋅ 0

Confluence 6 如何备份存储文件和页面信息

备份的 ZIP 文件包含有 entities.xml,这个 XML 文件包含有 Confluence 的所有页面内容和存储附件的目录。 备份 Zip 文件结构 页面的附件是存储在附件存储目录中的,通过页面和附件 ID 进行识...

honeymose ⋅ 14分钟前 ⋅ 0

【每天一个JQuery特效】根据状态确定是否滑入或滑出被选元素

主要效果: 本文主要采用slideToggle()方法实现以一行代码同时实现以展开或收缩的方式显示或隐藏被选元素。 主要代码如下: <!DOCTYPE html><html><head><meta charset="UTF-8">...

Rhymo-Wu ⋅ 18分钟前 ⋅ 0

度量.net framework 迁移到.net core的工作量

把现有的.net framework程序迁移到.net core上,是一个非常复杂的工作,特别是一些API在两个平台上还不能同时支持。两个类库的差异性,通过人工很难识别全。好在微软的工程师们考虑到了我们顾...

李朝强 ⋅ 23分钟前 ⋅ 0

请不要在“微服务”的狂热中迷失自我!

微服务在过去几年一直是一个非常热门的话题(附录1)。何为“微服务的疯狂”,举个例子: 众所周知,Netflix在DevOps上的表现非常棒。Netfix可以做微服务。因此:如果我做微服务,我也将非常...

harries ⋅ 24分钟前 ⋅ 0

oAuth2 升级Spring Cloud Finchley.RELEASE踩坑分享

背景 6.19号,spring团队发布了期待已久的 Spring Cloud Finchley.RELEASE 版本。 重要变化: 基于Spring Boot 2.0.X 不兼容 Spring Boot 1.5.X 期间踩过几个坑,分享出来给大伙,主要是关于...

冷冷gg ⋅ 54分钟前 ⋅ 0

OSChina 周一乱弹 —— 理发师小姐姐的魔法

Osc乱弹歌单(2018)请戳(这里) 【今日歌曲】 @冰冰棒- :分享田馥甄的单曲《My Love》 《My Love》- 田馥甄 手机党少年们想听歌,请使劲儿戳(这里) @Li-Wang :哎,头发又长了。。。又要...

小小编辑 ⋅ 今天 ⋅ 8

Kafka1.0.X_消费者API详解2

偏移量由消费者管理 kafka Consumer Api还提供了自己存储offset的功能,将offset和data做到原子性,可以让消费具有Exactly Once 的语义,比kafka默认的At-least Once更强大 消费者从指定分区...

特拉仔 ⋅ 今天 ⋅ 0

NEO智能合约之发布和升级(二)

接NEO智能合约之发布和升级(一),我们接下来说说智能合约的升级功能。 一 准备工作 合约的升级需要在合约内预先设置好升级接口,以方便在升级时调用。接下来我们对NEO智能合约之发布和升级...

红烧飞鱼 ⋅ 今天 ⋅ 0

个人博客的运营模式能否学习TMALL天猫质量为上?

心情随笔|个人博客的运营模式能否学习TMALL天猫质量为上? 中国的互联网已经发展了很多年了,记得在十年前,个人博客十分流行,大量的人都在写博客,而且质量还不错,很多高质量的文章都是在...

原创小博客 ⋅ 今天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部