文档章节

dubbo 之remotting (1)

hyssop
 hyssop
发布于 2016/08/21 13:44
字数 1688
阅读 86
收藏 0

dubbo rpc篇中我们介绍了duboo在服务器段是如何将服务层层包装成invoker,通过协议工厂代理成exchanger发布消息出去的。我们又介绍了客户端是如何将接口动态封装成wrapper,通过协议类反向走代理链,找到真正的服务的。rpc文没有详细的将通信过程,本文将基于dubbo-remoting包具体讲解服务是怎么发布出去的。 #服务器端 上文中我们说到服务器端通过在类DubboProtocol中将Invoker转成Exporter,通过dubbo协议发布url到注册中心。接下来我从这里开始去发现服务器端并建立连接。 DubboProtocol里面的export方法将invoker转换成为了exporter对象。并调用 openServer(url);发布服务

serverMap.put(key, createServer(url));

#1、使用netty协议建立连接 ###1.1、在url里面放置netty连接需要的参数。 (channel.readonly.sent,heartbeat、server、codec) ###1.2、调用 Exchangers.bind(url, requestHandler)建立服务器实例 这个方法做了两件事情 首先:通过ExtendLoader找到url指定的协议对应的Transporter(默认是NettyTransporter)。 其次:调用Transporter的bind(URL url, ChannelHandler listener)方法新建一个对应协议连接,并将实例服务实例返回。

 public Server bind(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyServer(url, listener);
    }
    
     public NettyServer(URL url, ChannelHandler handler) throws RemotingException{
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    }

Exchangers.bind(url, requestHandler)的调用比较复杂。 首先是这个Exchangers类的bind方法首先要根据url配置路由到真正的实现类

 getExchanger(url).bind(url, handler);
 
  public static Exchanger getExchanger(URL url) {
        String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
        return getExchanger(type);
    }
       public static Exchanger getExchanger(String type) {
        return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
    }

最终找到了类 HeaderExchanger 调用该类的bind(url, handler)方法。

 public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

该方法首先要包装Handler,将hander包装在编码处理器中。 然后,Transporters.bind方法。该方法也是有多步组成:

1、调用getTransporter获得Transporter适配器。

####2、调用适配器的bind方法时候会 感应url信息去找到真正的Transporter实现类(默认是netty)。 ####3、到真正的Transporter实现类中调用bind方法。

  public Server bind(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyServer(url, listener);
    }
      public NettyServer(URL url, ChannelHandler handler) throws RemotingException{
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    }

虽然表面上这里只是一个初始化的过程,其实不然,nettyServer在初始化之后通过调用doOpen()方法还建立了连接。 至此我们也修改下上文的错误,invoker并没有发布到远程,这里只是简单的进行了netty协议的链接。 接下来的内容就是netty包的内容。后文细讲。 至此,dubbo的远程连接就到这里了,我们来屡屡这个思路。首先从dubboProtocol那儿开始讲一个叫requestHandler的实例层层包装给了dubbo的handler。

#客户端 上文中我们说到客户端最后是调用DubboInvoker的doInvoker方法去发现真正的执行类。接下来从这里出发,发现客户端在远程调用的过程都做了什么。 首先会针对请求做一系列判断,是否是异步,是否是本地服务请求,会根据不同请求类型做不同处理。如果是远程请求,那么调用方法

currentClient.request(inv, timeout).get()

首先从clients里面获取ReferenceCountExchangeClient实例,然后调用该实例的request方法。

public ResponseFuture request(Object request, int timeout) throws RemotingException {
        return client.request(request, timeout);
    }

上面的client指的是 HeaderExchangeClient,她得request方法是要继续查找通道发送请求。

     public ResponseFuture request(Object request) throws RemotingException {
        return channel.request(request);
    }

找到了 HeaderExchangeChannel,它将invoker信息包装成了一个request,去找真正的通道发送。

      public ResponseFuture request(Object request, int timeout) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }
        // 创建一个请求,将版本号,invoker信息包装成数据发送给远程。
        Request req = new Request();
        req.setVersion("2.0.0");
        req.setTwoWay(true);
        req.setData(request);
        DefaultFuture future = new DefaultFuture(channel, req, timeout);
        try{
            channel.send(req);
        }catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }

最终找到了nettyClient,调用子类的send方法发送信息。 发送完信息,服务器端会通过netty接收到该请求。 首先、nettyHandler的方法获得该请求。 然后、调用HeaderExchangeHandler的messageReceived继续往后传递。 传递过程中汇调用requestHandler的两个逻辑。 一个是连接请求。和客户端建立连接。 一个是处理请求调用reply方法处理请求获得结果。该方法里面会调用invoke方法执行具体的逻辑

 public void connected(Channel channel) throws RemotingException {
            invoke(channel, Constants.ON_CONNECT_KEY);
        }
 public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
   return invoker.invoke(inv);
 }

找到代理类的doInvoke方法去执行


    public Result invoke(Invocation invocation) throws RpcException {
        try {
            return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()));
        } catch (InvocationTargetException e) {
            return new RpcResult(e.getTargetException());
        } catch (Throwable e) {
            throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

比如找到javasistProxyFactory则这样执行

   public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO Wrapper类不能正确处理带$的类名
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName, 
                                      Class<?>[] parameterTypes, 
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }

回到上文中我们交代了这个wrapper实例的强大,我们截取他getSize 方法就会发现这时候调用的就是实现类的getSize方法了,返回的结果就是实际的结果了。 最后让我们再来仔细看看请求处理器requestHandler 该匿名类主要包括几个方面的方法 1、接收到信息如何处理 如果是服务端,自然是要调用代理类找实现类。如果是客户端,那应该是应该讲结果返回给相应调用。 2、连接请求调用onconnect事务处理逻辑进行连接处理。 3、断开连接的请求调用ondisconnect处理逻辑继续断开连接处理。

 private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
        
        public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                Invocation inv = (Invocation) message;
                Invoker<?> invoker = getInvoker(channel, inv);
                //如果是callback 需要处理高版本调用低版本的问题
                if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))){
                    String methodsStr = invoker.getUrl().getParameters().get("methods");
                    boolean hasMethod = false;
                    if (methodsStr == null || methodsStr.indexOf(",") == -1){
                        hasMethod = inv.getMethodName().equals(methodsStr);
                    } else {
                        String[] methods = methodsStr.split(",");
                        for (String method : methods){
                            if (inv.getMethodName().equals(method)){
                                hasMethod = true;
                                break;
                            }
                        }
                    }
                    if (!hasMethod){
                        logger.warn(new IllegalStateException("The methodName "+inv.getMethodName()+" not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) +" ,invocation is :"+inv );
                        return null;
                    }
                }
                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                return invoker.invoke(inv);
            }
            throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
        }

        @Override
        public void received(Channel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                reply((ExchangeChannel) channel, message);
            } else {
                super.received(channel, message);
            }
        }

        @Override
        public void connected(Channel channel) throws RemotingException {
            invoke(channel, Constants.ON_CONNECT_KEY);
        }

        @Override
        public void disconnected(Channel channel) throws RemotingException {
            if(logger.isInfoEnabled()){
                logger.info("disconected from "+ channel.getRemoteAddress() + ",url:" + channel.getUrl());
            }
            invoke(channel, Constants.ON_DISCONNECT_KEY);
        }
        
        private void invoke(Channel channel, String methodKey) {
            Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);
            if (invocation != null) {
                try {
                    received(channel, invocation);
                } catch (Throwable t) {
                    logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);
                }
            }
        }

© 著作权归作者所有

上一篇: dubbo remoting(2)
下一篇: 线程池
hyssop
粉丝 20
博文 102
码字总数 111521
作品 0
昌平
程序员
私信 提问
分布式服务-DUBBOX(二):dubbo-amdin、dubbo-monitor-simple使用

1、本章内容 会用到Zookeeper(对服务注册和发现)、dubbo-admin、dubbo-monitor-simple部署 2、Zookeeper 在conf/zoo.cfg,其配置默认端口clientPort=2181。 在bin/zkServer.cmd,启动服务。...

warlockssss
2016/11/28
137
0
dubbo并发控制&连接控制&线程池控制

一、dubbo 并发控制: 1、服务端并发限制 限制 com.foo.BarService 的每个方法,服务器端并发执行(或占用线程池线程数)不能超过 10 个: <dubbo:service interface="com.foo.BarService" ...

满风
01/23
192
0
centos 7下安装配置dubbo-admin

1、安装好tomcat之后, 下载并且编译最新的dubbo-admin 如图: 1、修改WEB-INF中的dubbo.properties dubbo.registry.address=zookeeper://192.168.1.87:2181//zookeeper的地址 dubbo.admin.......

yunlielai
2017/04/28
0
0
185.dubbo 后台管理系统

效果及目的 效果: 目的:查看 管理服务 2. 启动要求 (1)项目是dubbo (2)jdk 1.7 (3) dubbo的war要与zookeeper在同一台服务上 3. 安装zookeeper 要求:先安装jdk (1.7的) https://my.os...

Lucky_Me
2018/09/13
161
0
dubbo rpc

dubbo中rpc通信算是重头戏。本文基于dubbo-rpc-api dubbo-rpc-default两个包重点了解下dubbo rpc的实现原理。 1、代码示例 既然要说原理,首先就是把实现了rpc的代码示例放在这里。 将其“肢...

hyssop
2016/08/19
384
1

没有更多内容

加载失败,请刷新页面

加载更多

手写RPC框架指北另送贴心注释代码一套

Angular8正式发布了,Java13再过几个月也要发布了,技术迭代这么快,框架的复杂度越来越大,但是原理是基本不变的。所以沉下心看清代码本质很重要,这次给大家带来的是手写RPC框架。 完整代码...

全菜工程师小辉
18分钟前
3
0
【Java】开发收货

简介 谨以此篇,记载开发过程中的一些tips。 编译器 【Shift + F6】可实现变量的联动修改。

Areya
35分钟前
5
0
DOM官方定义

DOM Document Object Model 文档对象模型 DOM的官方定义:W3C的DOM,可以使程序或者脚本(JS或AS\JScript),动态的访问或者操作文档的内容、结构、样式。 DOM只是一个标准,操作网页的标准。...

前端老手
40分钟前
6
0
IT兄弟连 HTML5教程 HTML5的学习线路图 第一阶段学习网页制作

学习HTML5技术可并不是简单学会几个新增的标签而已,HTML5现在可以说是前端所有技术的代名词。需要学习的语言和工具不仅多,对于刚接触他们的新人会感觉很乱。另外,前端开发也会细分很多个开...

老码农的一亩三分地
42分钟前
6
0
可见性有序性,Happens-before来搞定

写在前面 上一篇文章并发 Bug 之源有三,请睁大眼睛看清它们 谈到了可见性/原子性/有序性三个问题,这些问题通常违背我们的直觉和思考模式,也就导致了很多并发 Bug 为了解决 CPU,内存,IO ...

tan日拱一兵
57分钟前
4
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部