Dubbo服务暴露与注册

原创
2019/06/17 06:43
阅读数 3.7K

        前面的文章中,我们讲解了Dubbo是如何进行配置的属性的初始化的,并且讲到,Dubbo最终会将所有的属性参数都封装为一个URL对象,从而以这个URL对象为基准传递参数。本文则主要讲解Dubbo是如何基于URL对象进行服务的暴露与注册的。

        首先需要说明的一点是,服务的暴露与注册是两个不同的概念。在Dubbo中,微服务之间的交互默认是通过Netty进行的,而服务之间的通信是基于TCP以全双工的方式进行的。那么也就是说,每个服务都会存在一个ip和port。所谓的服务暴露就是指根据配置将当前服务使用Netty绑定一个本地的端口号(对于消费者而言,则是尝试连接目标服务的ip和端口)。至于注册,由于微服务架构中对于新添加的服务,需要一定的机制来通知消费者,有新的服务可用,或者对于某些下线的服务,也需要通知消费者,将这个已经下线的服务给移除。Dubbo中服务的注册与发现默认是委托给zookeeper来进行的。本文主要讲解服务的暴露与注册的整体实现结构,至于服务暴露和注册时所需要注意的详细细节,则在后面的文章中进行讲解。

1. 服务的暴露

        服务的暴露的入口位置主要在RegistryProtocol.export()方法中,该方法首先会进行服务的暴露,然后会进行服务的注册。如下是该方法的源码:

@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
  // 获取服务注册相关的配置数据
  URL registryUrl = getRegistryUrl(originInvoker);
  // 获取provider相关的配置数据
  URL providerUrl = getProviderUrl(originInvoker);

  // 对provider的部分配置信息进行覆盖,重写的工作主要是委托给Configurator进行,
  // 这里OverrideListener的作用主要是在当前服务的配置信息发生更改时,对原有的配置进行重写,
  // 并且会判断是否需要对当前的服务进行重新暴露
  final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
  final OverrideListener overrideSubscribeListener = 
    new OverrideListener(overrideSubscribeUrl, originInvoker);
  overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
  providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
  
  // 进行服务的本地暴露,本质上就是根据配置使用Netty绑定本地的某个端口,从而完成服务暴露工作
  final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

  // 根据配置获取对应的Registry对象,常见的有ZookeeperRegistry和RedisRegistry,默认使用的是
  // ZookeeperRegistry,本文则以Zookeeper为例进行讲解
  final Registry registry = getRegistry(originInvoker);
  final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
  // 将当前的Invoker对象注册到一个全局的providerInvokers中进行缓存,
  // 该Map对象保存了所有的已经暴露了的服务
  ProviderInvokerWrapper<T> providerInvokerWrapper = 
    ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, 
      registeredProviderUrl);

  // 除非主动配置不进行注册,那么这里将会返回true
  boolean register = registeredProviderUrl.getParameter("register", true);
  if (register) {
    // 进行服务注册的代码,主要是通过Zookeeper的客户端CuratorFramework进行服务的注册
    register(registryUrl, registeredProviderUrl);
    // 将当前Invoker标识为已经注册完成
    providerInvokerWrapper.setReg(true);
  }

  // 注册配置被更改的监听事件,将配置被更改时将会触发相应的listener
  registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

  // 设置相关的URL对象,并且使用DestroyableExporter对exporter进行封装返回
  exporter.setRegisterUrl(registeredProviderUrl);
  exporter.setSubscribeUrl(overrideSubscribeUrl);
  return new DestroyableExporter<>(exporter);
}

        上面的代码中,主要完成了三部分的工作:

  • 将服务与本地的某个端口号进行绑定,从而实现服务暴露的功能;
  • 根据配置得到一个服务注册对象Registry,然后对其进行注册;
  • 创建一个配置被重写的监听器,并且注册该监听器,从而实现配置被重写时能够动态的使用新的配置进行服务的配置。

        对于服务的暴露,主要是在doLocalExport()方法中,我们继续阅读其源码:

private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, 
      URL providerUrl) {
  // 获取当前Invoker对应的key,默认为group/interface/version的格式
  String key = getCacheKey(originInvoker);

  // 这一段代码看起来比较复杂,其实本质上还是protocol.export()方法的调用,该方法就是进行服务暴露的代码,
  // 而ExporterChangeableWrapper的主要作用则是进行unexport()时的一些清理工作
  return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
    Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
    return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), 
        originInvoker);
  });
}

        doLocalExport()方法的实现比较简单,主要的导出工作还是委托给了protocol.export()方法进行,这里的protocol的类型为DubboProtocol,这里我们直接看其export()方法:

@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
  URL url = invoker.getUrl();
  
  String key = serviceKey(url);
  DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
  exporterMap.put(key, exporter);

  // 这里主要是构建Stub的事件分发器,该分发器用于在消费者端进行Stub事件的分发
  Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, 
      Constants.DEFAULT_STUB_EVENT);
  Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
  if (isStubSupportEvent && !isCallbackservice) {
    String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
    if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
      if (logger.isWarnEnabled()) {
        logger.warn(new IllegalStateException("consumer [" 
            + url.getParameter(Constants.INTERFACE_KEY) 
            + "], has set stubproxy support event ,but no stub methods founded."));
      }
    } else {
      stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
    }
  }

  // 开启服务
  openServer(url);
  // 该方法的主要作用是对序列化进行优化,其会获取配置的实现了SerializationOptimizer接口的配置类,
  // 然后通过其getSerializableClasses()方法获取序列化类,通过这些类来进行序列化的优化
  optimizeSerialization(url);

  return exporter;
}

        export()方法主要做了三件事:a. 注册stub事件分发器;b. 开启服务;c. 注册序列化优化器类。这里openServer()方法是用于开启服务的,我们继续阅读其源码:

private void openServer(URL url) {
  String key = url.getAddress();
  boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
  // 这里采用双检查法来判断对应于当前服务的server是否已经创建,如果没有创建,
  // 则创建一个新的,并且缓存起来
  if (isServer) {
    ExchangeServer server = serverMap.get(key);
    if (server == null) {
      synchronized (this) {
        server = serverMap.get(key);
        if (server == null) {
          // 创建并缓存新服务
          serverMap.put(key, createServer(url));
        }
      }
    } else {
      server.reset(url);
    }
  }
}

private ExchangeServer createServer(URL url) {
  url = URLBuilder.from(url)
    .addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
    .addParameterIfAbsent(Constants.HEARTBEAT_KEY, 
         String.valueOf(Constants.DEFAULT_HEARTBEAT))
    .addParameter(Constants.CODEC_KEY, DubboCodec.NAME)
    .build();
  
  // 获取所使用的server类型,默认为netty
  String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
  if (str != null && str.length() > 0 
      && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
    throw new RpcException("Unsupported server type: " + str + ", url: " + url);
  }

  // 通过Exchangers.bind()方法进行服务的绑定
  ExchangeServer server;
  try {
    server = Exchangers.bind(url, requestHandler);
  } catch (RemotingException e) {
    throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
  }

  // 获取client参数所指定的值,该值指定了当前client所使用的传输层服务,比如netty或mina。
  // 然后判断当前SPI所提供的传输层服务是否包含所指定的服务类型,如果不包含,则抛出异常
  str = url.getParameter(Constants.CLIENT_KEY);
  if (str != null && str.length() > 0) {
    Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class)
      .getSupportedExtensions();
    if (!supportedTypes.contains(str)) {
      throw new RpcException("Unsupported client type: " + str);
    }
  }

  return server;
}

        上面的代码主要是创建ExchangeServer的,使用双检查来检测是否已经存在了对应的服务,如果不存在,则通过Exchangers.bind()方法进行创建。这里最终会将bind()方法的调用委托给HeaderExchanger.bind()方法进行。需要注意的是,上面的代码中传入了一个requestHandler的参数,这是一个ExchangeHandler类型的对象,其主要作用是获取并且调用Invoker,以得到最终的调用结果,这些Handler的作用,我们将在后面的文章中进行讲解,本文主要讲解服务的暴露与注册的过程。下面我们继续阅读HeaderExchanger.bind()方法的源码:

@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
  return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(
    new HeaderExchangeHandler(handler))));
}

        这里的bind()方法主要是创建了三个Handler,并且最后一个Handler将传入的ExchangeHandler包裹起来了。相信读者朋友应该很快就能认识到,这里使用的是责任链模式,这几个handler通过统一的构造函数将下一个handler的实例注入到当前handler中。其实我们也就能够理解,最终通过netty进行的调用过程就是基于这些责任链的。这里我们主要看Transporters.bind()方法的实现原理:

public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
  if (url == null) {
    throw new IllegalArgumentException("url == null");
  }
  if (handlers == null || handlers.length == 0) {
    throw new IllegalArgumentException("handlers == null");
  }
  
  // 判断传入的Handler是否只有一个,如果只有一个,则直接使用该handler,如果存在多个,
  // 则使用ChannelHandlerDispatcher将这些handler包裹起来进行分发
  ChannelHandler handler;
  if (handlers.length == 1) {
    handler = handlers[0];
  } else {
    handler = new ChannelHandlerDispatcher(handlers);
  }
  
  // 通过配置指定的Transporter进行服务的绑定,这里默认使用的是NettyTransporter
  return getTransporter().bind(url, handler);
}

// NettyTransporter
@Override
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
  // 在NettyTransporter中进行服务绑定时,其只是创建了一个NettyServer以返回,但实际上在创建该对象的
  // 过程中,就完成了Netty服务的绑定。需要注意的是,这里的NettyServer并不是Netty所提供的类,而是
  // Dubbo自己封装的一个服务类,其对Netty的服务进行了封装
  return new NettyServer(url, listener);
}

        Transporters.bind()方法主要是将服务的绑定过程交由NettyTransporter进行,而其则是创建了一个NettyServer对象,真正的绑定过程就在创建该对象的过程中。下面我们来看其创建的源码:

// AbstractServer
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
  super(url, handler);
  localAddress = getUrl().toInetSocketAddress();

  // 获取绑定的ip和端口号等信息
  String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
  int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
  if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) 
  {
    bindIp = Constants.ANYHOST_VALUE;
  }
  
  // 在本地绑定指定的ip和端口
  bindAddress = new InetSocketAddress(bindIp, bindPort);
  this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
  this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, 
      Constants.DEFAULT_IDLE_TIMEOUT);
  try {
    // 通过创建的InetSocketAddress对象,将真正的绑定过程交由子类进行
    doOpen();
    if (logger.isInfoEnabled()) {
      logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() 
          + ", export " + getLocalAddress());
    }
  } catch (Throwable t) {
    throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " 
        + getClass().getSimpleName() + " on " + getLocalAddress() + ", cause: " 
        + t.getMessage(), t);
  }

  // 这里的DataStore只是一个本地缓存的数据仓库,主要是对一些大对象进行缓存
  DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class)
      .getDefaultExtension();
  executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, 
      Integer.toString(url.getPort()));
}

// NettyServer
@Override
protected void doOpen() throws Throwable {
  bootstrap = new ServerBootstrap();

  // 这里就进入了创建netty服务的过程,bossGroup指定的线程数为1,因为只有一个channel用于接收客户端请求,
  // 而workerGroup线程数则指定为配置文件所设置的线程数,这些线程主要用于进行请求的处理
  bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
  workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(
    Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
    new DefaultThreadFactory("NettyServerWorker", true));

  // 创建NettyServerHandler,这个handler就是用于处理请求用的handler,但是前面我们也讲到了,
  // Dubbo使用了一个handler的责任链来进行消息的处理,第二个参数this就是这个链的链头。需要注意的是,
  // Netty本身提供的责任链与Dubbo这里使用的责任链是不同的,Dubbo只是使用了Netty的链的一个节点来
  // 处理Dubbo所创建的链,这样Dubbo的链其实是可以在多种服务复用的,比如Mina
  final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
  channels = nettyServerHandler.getChannels();

  // 这里是标准的创建Netty的BootstrapServer的过程
  bootstrap.group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)
    .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
    .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
    .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
    .childHandler(new ChannelInitializer<NioSocketChannel>() {
      @Override
      protected void initChannel(NioSocketChannel ch) throws Exception {
        int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), 
            NettyServer.this);
        ch.pipeline()
          // 添加用于解码的handler
          .addLast("decoder", adapter.getDecoder())
          // 添加用于编码的handler
          .addLast("encoder", adapter.getEncoder())
          // 添加用于进行心跳监测的handler
          .addLast("server-idle-handler", new IdleStateHandler(0, 0, 
              idleTimeout, MILLISECONDS))
          // 将处理请求的handler添加到pipeline中
          .addLast("handler", nettyServerHandler);
      }
    });

  // 进行服务的绑定
  ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
  channelFuture.syncUninterruptibly();
  channel = channelFuture.channel();

}

        上面的代码就是一个标准的使用Netty进行服务绑定的代码,关于Netty的使用,读者朋友可以阅读Netty Reactor模式实现原理详解

2. 服务的注册

        对于服务的注册,前面我们已经讲到,入口主要在RegistryProtocol.export()方法中,而调用入口则是通过其register()方法进行的,这里我们来看一下该方法的调用过程:

public void register(URL registryUrl, URL registeredProviderUrl) {
  // 通过RegistryFactory获取一个Registry对象,该对象的主要作用是进行服务的注册,
  // 这里默认返回的是ZookeeperRegistry
  Registry registry = registryFactory.getRegistry(registryUrl);
  registry.register(registeredProviderUrl);
}

        这里主要是根据配置获取一个Registry对象,我们继续阅读其register()方法的源码:

// FailbackRegistry
@Override
public void register(URL url) {
  // 将当前URL对象保存到已注册的URL对象列表中
  super.register(url);
  // 移除之前注册失败的记录
  removeFailedRegistered(url);
  removeFailedUnregistered(url);
  try {
    // 将真正的注册过程委托给ZookeeperRegistry进行
    doRegister(url);
  } catch (Exception e) {
    Throwable t = e;

    // 下面的过程主要是在注册失败的情况下,将当前URL添加到注册失败的URL列表中
    boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
      && url.getParameter(Constants.CHECK_KEY, true)
      && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
    boolean skipFailback = t instanceof SkipFailbackWrapperException;
    if (check || skipFailback) {
      if (skipFailback) {
        t = t.getCause();
      }
      throw new IllegalStateException("Failed to register " + url + " to registry " 
          + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
    } else {
      logger.error("Failed to register " + url + ", waiting for retry, cause: " 
          + t.getMessage(), t);
    }

    // 将当前URL添加到注册失败的URL列表中
    addFailedRegistered(url);
  }
}

// ZookeeperRegistry
@Override
public void doRegister(URL url) {
  try {
    // 这里是真正的注册过程。需要注意的是这里的zkClient类型为ZookeeperClient,其是Dubbo对
    // 真正使用的CuratorFramework的一个封装
    zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
  } catch (Throwable e) {
    throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() 
        + ", cause: " + e.getMessage(), e);
  }
}

        上面的代码中首先会对一些缓存数据进行清理,并且将当前URL添加到注册的URL列表中,然后将注册过程委托给ZookeeperClient进行。下面我们来看其是如何进行注册的:

@Override
public void create(String path, boolean ephemeral) {
  // 判断创建的是否为临时节点,如果不是临时节点,则判断是否已经存在该节点,如果存在,则直接返回
  if (!ephemeral) {
    if (checkExists(path)) {
      return;
    }
  }
  
  // 对path进行截取,因为最后一个"/"后面是被编码的URL对象,前面则是serviceKey + category
  // 这里的category指定的是provider还是consumer
  int i = path.lastIndexOf('/');
  if (i > 0) {
    // 创建节点,需要注意的是,这里的create()方法进行的是递归调用,这是因为zookeeper创建节点时
    // 只能一级一级的创建,因而其每次都是取"/"前面的一部分来创建,只有当前节点已经存在的情况下,
    // 上面的checkExists()才会为true,而且这里,由于zookeeper规定,除了叶节点以外,其余所有的
    // 节点都必须为非临时节点,因而这里第二个参数传入的是false,这也是前面的if判断能通过的原因
    create(path.substring(0, i), false);
  }
  
  if (ephemeral) {
    // 创建临时节点,具体的创建工作交由子类进行,也就是下面的代码
    createEphemeral(path);
  } else {
    // 创建持久节点,具体的创建工作交由子类进行,也就是下面的代码
    createPersistent(path);
  }
}
@Override
public void createEphemeral(String path) {
  try {
    // 将临时节点的创建工作交由CuratorFramework进行
    client.create().withMode(CreateMode.EPHEMERAL).forPath(path);
  } catch (NodeExistsException e) {
  } catch (Exception e) {
    throw new IllegalStateException(e.getMessage(), e);
  }
}
@Override
public void createPersistent(String path) {
  try {
    // 将持久节点的创建工作交由CuratorFramework进行
    client.create().forPath(path);
  } catch (NodeExistsException e) {
  } catch (Exception e) {
    throw new IllegalStateException(e.getMessage(), e);
  }
}

3. 小结

        本文主要讲解了Dubbo在导出服务时是如何进行服务暴露与注册的,并且具体讲解了如何基于netty进行服务的暴露,和如何基于zookeeper进行服务的注册。

4. 广告

       读者朋友如果觉得本文还不错,可以点击下面的广告链接,这可以为作者带来一定的收入,从而激励作者创作更好的文章,非常感谢!

在项目开发过程中,企业会有很多的任务、需求、缺陷等需要进行管理,CORNERSTONE 提供敏捷、任务、需求、缺陷、测试管理、WIKI、共享文件和日历等功能模块,帮助企业完成团队协作和敏捷开发中的项目管理需求;更有甘特图、看板、思维导图、燃尽图等多维度视图,帮助企业全面把控项目情况。

展开阅读全文
加载中
点击加入讨论🔥(1) 发布并加入讨论🔥
打赏
1 评论
12 收藏
1
分享
返回顶部
顶部