文档章节

dubbo remoting(2)

hyssop
 hyssop
发布于 2016/08/23 18:26
字数 1624
阅读 50
收藏 0

上文我们说了客户端和服务端如何通过自己的逻辑包装和处理实现类。本文将从类之间的关系继续梳理客户端和服务器端之间请求处理方面额逻辑。将包含以下几个方面
1、客户端获取连接 2、客户端的请求逻辑 3、服务端的处理客户端请求 #客户端获取连接 这个currentClient的由来是在获取invoker的时候我们获取了客户端实例

DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);

getClients方法会建立连接

initClient(URL url)
  client = Exchangers.connect(url ,requestHandler);

最终会找到HeaderExchanger的方法

  public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

在Transporters里面会使用url找到真正的实现类NettyTransporter

public Client connect(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyClient(url, listener);
    }

NettyClent实例化如下

 public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException{
        super(url, wrapChannelHandler(url, handler));
    }

首先,NettyClient会调用父类AbstractClient的wrapChannelHandler(url, handler)。



  protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler){
        url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);
        url = url.addParameterIfAbsent(Constants.THREADPOOL_KEY, Constants.DEFAULT_CLIENT_THREADPOOL);
        return ChannelHandlers.wrap(handler, url);
    }

ChannelHandlers最终会返回一个包装器渠道处理器。

 protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
        return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                                        .getAdaptiveExtension().dispatch(handler, url)));
    }

然后,nettyClient调用父类实例化方法,首先获得url中请求参数信息。然后调用 doOpen()、connect()方法建立dubbo连接。具体的实现逻辑在NettyClient的doOpen和doConnect方法中。之后的逻辑涉及到netty,这里不细说。

#客户端的请求逻辑 上文我们说到客户端首先要将我们请求的接口和url封装成一个invoker。 然后使用JavassistProxyFactory或者JdkProxyFactory将invoker做成代理。 接下来我们将继续讲解客户端那些事儿。 生成代理只是客户端数据准备的一部分。接下来客户端要想真正获得结果,还要继续储备数据。这些数据可以分为两类,请求处理需要的参数,和类方法调用的参数。 ##请求处理参数 请求包括很多种有远程请求,有本地请求。dubbo客户端首先是感应了请求类型做了一些处理。

  if (invoker.getUrl().hasParameter(Constants.STUB_KEY)) {
                        stub = serviceType.getName() + "Stub";
                    } else {
                        stub = serviceType.getName() + "Local";
                    }

不同类型 的请求会在请求接口类名称哪里加一个后缀。 接下来就是实例化代理了

     proxy = (T) constructor.newInstance(new Object[] {proxy});

然后在url里面添加了一些参数,这些参数就是为后期处理请求而准备的。

 if (url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT)){
                            url = url.addParameter(Constants.STUB_EVENT_METHODS_KEY, StringUtils.join(Wrapper.getWrapper(proxy.getClass()).getDeclaredMethodNames(), ","));
                            url = url.addParameter(Constants.IS_SERVER_KEY, Boolean.FALSE.toString());

这里很明显有一个是否是服务器请求的标志,is_server_key.这个字段被设置为false了。 这得注意的是强大的动态包装器又生成了代理类的实例,并将该实例额所有方法名称以逗号连接,并放到了dubbo.stub.event.methods字段中。 接下来就是调用以下抽象方法

  export(proxy, (Class)invoker.getInterface(), url);

该抽象方法使用ExtendLoader最终找到了具体的实现类(以dubbo协议类举例)

DubboProtocol
 <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;

从该类的输入输出类型可以看出,该方法是将invoker转换为exporter的过程。dubboProtocol该方法的实现会判断该请求是否是服务器端请求,如果是,则会打开一次连接,如果不是则将exporter实例放置到容器中。

  exporterMap.put(key, exporter);

#客户端发送请求 当我们在客户端去请求类方法的时候客户端的请求过程如下: 首先,会从代理里面找到代理InvokerInvocationHandler,调用它的invoke方法。 然后会包装请求方法和参数为一个RpcInvocation实例

 invoker.invoke(new RpcInvocation(method, args)).recreate();

然后会走过滤器链,走每一个过滤器逻辑 首先是ProtocolFilterWrapper包装器,找到接下来的过滤器 然后是ConsumerContextFilter,将invoker、invocation等信息放置到上下文,将将invoker放置到invocation中。

 public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        RpcContext.getContext()
                .setInvoker(invoker)
                .setInvocation(invocation)
                .setLocalAddress(NetUtils.getLocalHost(), 0)
                .setRemoteAddress(invoker.getUrl().getHost(), 
                                  invoker.getUrl().getPort());
        if (invocation instanceof RpcInvocation) {
            ((RpcInvocation)invocation).setInvoker(invoker);
        }
        try {
            return invoker.invoke(invocation);
        } finally {
            RpcContext.getContext().clearAttachments();
        }
    }

最终通过AbstractInvoker调到了dubboInvoker中 dubboInvoker里面的doInvoke方法首先获得客户端,然后根据请求类型(同步、异步、本地调用)分别做处理。这里我们只交代同步请求。

(Result) currentClient.request(inv, timeout).get();

HeaderExchangeChannel将请求包装成request对象找到客户端通道发送数据。 (插图) #服务端处理请求 刚刚我们说客户端已经将数据发送给服务器。服务器端最终会获得该数据请求。这个处理逻辑在dubboProtocol中的requestHandler实例里。 在建立客户端和服务端的时候,都将其绑定进来

   server = Exchangers.bind(url, requestHandler);
   client = Exchangers.connect(url ,requestHandler);

所以客户端也能够像服务器端一样接受外来请求。 首先我们来看AllChannelHandler.它的received方法创建一个线程池,将获得到的请求包装成为ChannelEventRunnable任务进行处理

 public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

ChannelEnventRunnable的run方法对获得的通道事件分别做处理

 public void run() {
        switch (state) {
            case CONNECTED:
                try{
                    handler.connected(channel);
                }catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
                }
                break;
            case DISCONNECTED:
                try{
                    handler.disconnected(channel);
                }catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
                }
                break;
            case SENT:
                try{
                    handler.sent(channel,message);
                }catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                            + ", message is "+ message,e);
                }
                break;
            case RECEIVED:
                try{
                    handler.received(channel, message);
                }catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                            + ", message is "+ message,e);
                }
                break;
            case CAUGHT:
                try{
                    handler.caught(channel, exception);
                }catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is "+ channel
                            + ", message is: " + message + ", exception is " + exception,e);
                }
                break;
            default:
                logger.warn("unknown state: " + state + ", message is " + message);
        }
    }

这里面的handler类型为HeaderExchangeHandler,它会根据channel获取的信息类型是request还是response分别做处理

 public void received(Channel channel, Object message) throws RemotingException {
        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
            if (message instanceof Request) {
                // handle request.
                Request request = (Request) message;
                if (request.isEvent()) {
                    handlerEvent(channel, request);
                } else {
                    if (request.isTwoWay()) {
                        Response response = handleRequest(exchangeChannel, request);
                        channel.send(response);
                    } else {
                        handler.received(exchangeChannel, request.getData());
                    }
                }
            } else if (message instanceof Response) {
                handleResponse(channel, (Response) message);
            } else if (message instanceof String) {
                if (isClientSide(channel)) {
                    Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                    logger.error(e.getMessage(), e);
                } else {
                    String echo = handler.telnet(channel, (String) message);
                    if (echo != null && echo.length() > 0) {
                        channel.send(echo);
                    }
                }
            } else {
                handler.received(exchangeChannel, message);
            }
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    }

如果是response类型的数据,则获得结果。 如果是request类型的数据,则调用requestHanler里面的reply方法处理请求。

欢迎加入qq群: 122567875 欢迎关注微信公众号:hyssop的后花园

© 著作权归作者所有

上一篇: dubbo remoting(3)
下一篇: dubbo 之remotting (1)
hyssop
粉丝 20
博文 102
码字总数 111521
作品 0
昌平
程序员
私信 提问
dubbo源码学习笔记----整体结构

dubbo核心包 config dubbo-config-api dubbo-config-spring remoting dubbo-remoting-netty dubbo-remoting-netty4 dubbo-remoting-mina dubbo-remoting-grizzly dubbo-remoting-p2p dubbo-......

春哥大魔王的博客
2018/01/13
274
3
聊聊dubbo的ExecutionDispatcher

序 本文主要研究一下dubbo的ExecutionDispatcher ExecutionDispatcher dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/exec......

go4it
09/02
22
0
dubbo Servlet Bridge Server时同时支持hessian和webservice

原生的bubbo在发布hessian和webservice时使用了嵌入jetty的方式开启了两个端口。 而考虑到我们实际的情况,需要使用weblogic发布,并且是使用同一个端口对外提供服务。 所以我们要扩展dubbo...

linan
2015/10/13
860
0
聊聊dubbo的AllDispatcher

序 本文主要研究一下dubbo的AllDispatcher Dispatcher dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/Dispatcher.java Dispatcher接口定义了dis......

go4it
08/31
29
0
Dubbo 升级扩展 --Dubbo-G

Dubbo-G 详细介绍 Dubbo是一个被国内很多互联网公司广泛使用的开源分布式服务框架,即使从国际视野来看应该也是一个非常全面的SOA基础框架。作为一个重要的技术研究课题,在联想电商我们根据...

技术专家
2017/05/26
8.5K
23

没有更多内容

加载失败,请刷新页面

加载更多

利用CSS禁止手机长按出现气泡: 复制、选择等功能

可以用 * ,也可作用于一个div div{  -webkit-touch-callout:none;  /*系统默认菜单被禁用*/  -webkit-user-select:none; /*webkit浏览器*/  -khtml-user-select:none; /*早期浏览...

蓝小驴
54分钟前
9
0
前端的一些雕虫小技,从100%和滚动条说起

1、100%和滚动条 当我们在css中把html和body同时设为100%时,会出现滚动条 html, body { width: 100%; height: 100%; } 原因是html和b...

wphmoon
今天
8
0
电力区块链应用案例【2019】

随着区块链技术的日益普及,出现了大量创业企业尝试使用区块链技术来解决能源与电力行业中存在的问题。在本文中,我们将介绍其中的三个能源区块链项目。 能源行业以价格不透明著称:消费者很...

汇智网教程
今天
12
0
聊聊rocketmq的adjustThreadPoolNumsThreshold

序 本文主要研究一下rocketmq的adjustThreadPoolNumsThreshold DefaultMQPushConsumer rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.ja......

go4it
今天
10
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部