文档章节

AsyncHttpClient源码分析-基于Netty的连接池实现

 墨径_猿猴
发布于 2017/04/06 23:46
字数 4717
阅读 492
收藏 38
点赞 2
评论 0

原文地址:asynchttpclient源码分析-基于Netty的连接池实现


最近项目重构,有了个机会更多接触一个有别于HttpAsyncClient的异步网络框架AsyncHttpClient,是个基于Netty的支持Http(s)或WebSocket协议的客户端。这东西有很多有趣的地方,特别是连接池的实现简单易懂,没有apache.hc的连接池实现那么蛋疼。如果想深入了解Netty用法的话,这是个不错的案例,很值得深究!


这里暂且仅关注连接池的实现,部分涉及Netty的channel输入输出处理、哈希轮定时器算法、事件轮询方式的区别,又或者信号量的使用等等,以后有机会会单独拿出来详解。

由于是基Netty的实现的,所以连接池实际上就是对channel的管理控制,有趣的是整个管理只用到了信号量+一个定时检测器,略微复杂的也就定时检测的逻辑,其实现方式简单且很好理解,不像httpclient里各种队列各种信号量难以理解。

先上一个简单的例子,事实上使用起来也不复杂。

public class HttpTest {
    static AsyncHttpClient asyncHttpClient = Dsl
            //实例化所有池和检测器
            .asyncHttpClient(
                    Dsl.config()
                    .setMaxConnections(500)
                    .setMaxConnectionsPerHost(50)
                    .setPooledConnectionIdleTimeout(6000)
                    .setConnectionTtl(500)
                    .setIoThreadsCount(100)
                    .setConnectTimeout(60000)
                    .setUseNativeTransport(
                            System.getProperty("os.name").toLowerCase().indexOf("linux") > 0));

    public static void main(String[] args) throws Exception {
        List<Param> params = new ArrayList<>();
        params.add(new Param("keyfrom", "XXX"));

        asyncHttpClient
                .prepareGet("http://fanyi.youdao.com/openapi.do")
                .addQueryParams(params)
                //这里进入发送请求阶段
                .execute()
                .toCompletableFuture()
                //超时报错,或请求异常,做容错处理,抛出一个Response
                .exceptionally(t -> {
                    return new Response() {...};
                })
                .thenAccept(rep -> System.out.println("RESPONSE BODY" + rep.getResponseBody()));
    }
}

先看看DefaultAsyncHttpClientConfig类的配置参数,这里只列出本文所需要的参数。有一点值得提一下,如果想了解Java怎么像clojure或者scala一样创建不可变对象,可以看看这个类的写法。

// timeouts
//连接超时
private final int connectTimeout;
//请求超时
private final int requestTimeout;
//读取超时,含于请求时间
private final int readTimeout;
//关闭Client前的静默时间
private final int shutdownQuietPeriod;
//关闭超时
private final int shutdownTimeout;

// keep-alive
private final boolean keepAlive;
//连接池空闲时间
private final int pooledConnectionIdleTimeout;
//定时清理空闲连接的时间
private final int connectionPoolCleanerPeriod;
//连接存活时间
private final int connectionTtl;
//最大连接数
private final int maxConnections;
//每个路由的最大连接数
private final int maxConnectionsPerHost;
//用于channel超时处理
private final ChannelPool channelPool;
private final KeepAliveStrategy keepAliveStrategy;


// internals
private final String threadPoolName;
private final int httpClientCodecMaxInitialLineLength;
private final int httpClientCodecMaxHeaderSize;
private final int httpClientCodecMaxChunkSize;
private final int chunkedFileChunkSize;
private final int webSocketMaxBufferSize;
private final int webSocketMaxFrameSize;
private final Map<ChannelOption<Object>, Object> channelOptions;
//时间轮询组类型
private final EventLoopGroup eventLoopGroup;
//是否用epoll,仅linux系统支持
private final boolean useNativeTransport;
//用于Timeout处理,建议用默认Netty的HashedWheelTimer
private final Timer nettyTimer;
private final ThreadFactory threadFactory;
private final AdditionalChannelInitializer httpAdditionalChannelInitializer;
private final AdditionalChannelInitializer wsAdditionalChannelInitializer;
private final ResponseBodyPartFactory responseBodyPartFactory;
//其实就是EventLoopGroup指定的线程数
private final int ioThreadsCount;

就从这里开始,开头主要实例化ChannelManagerNettyRequestSender以及Timer三个重要组件,NettyRequestSender用于发送请求以及向ChannelManager索取channel使用权,Timer则负责另外两个组件给他的检测任务。

public final class Dsl {
  public static AsyncHttpClient asyncHttpClient(DefaultAsyncHttpClientConfig.Builder configBuilder) {
    //默认客户端
    return new DefaultAsyncHttpClient(configBuilder.build());
  }
  //...
  //...
}

public class DefaultAsyncHttpClient implements AsyncHttpClient {
  private final AsyncHttpClientConfig config;
  private final AtomicBoolean closed = new AtomicBoolean(false);
  //Netty初始化的起点,Bootstrap与Channel池管理都在这里声明
  private final ChannelManager channelManager;
  //发送请求,以及向ChannelManager索取channel的使用权
  private final NettyRequestSender requestSender;
  private final boolean allowStopNettyTimer;
  //定时做超时处理
  private final Timer nettyTimer;

  public DefaultAsyncHttpClient(AsyncHttpClientConfig config) {

    this.config = config;

    allowStopNettyTimer = config.getNettyTimer() == null;
    //默认使用HashedWheelTimer
    nettyTimer = allowStopNettyTimer ? newNettyTimer() :config.getNettyTimer();
    //下面两个是重点!!!
    channelManager = new ChannelManager(config, nettyTimer);
    requestSender = new NettyRequestSender(config, channelManager,nettyTimer, new AsyncHttpClientState(closed));
    //给Bootstraps配置解析器,以及处理接收服务端发送的消息的处理器
    channelManager.configureBootstraps(requestSender);
  }
  private Timer newNettyTimer() {
    HashedWheelTimer timer = new HashedWheelTimer();
    timer.start();
    return timer;
  }
}

这里是重中之重,网络配置、连接池、IO线程池、轮询方式都是在这儿产生的。

public class ChannelManager {
  private final AsyncHttpClientConfig config;
  private final EventLoopGroup eventLoopGroup;
  private final boolean allowReleaseEventLoopGroup;
  private final Bootstrap httpBootstrap;
  private final Bootstrap wsBootstrap;
  private final long handshakeTimeout;
  private final IOException tooManyConnections;
  private final IOException tooManyConnectionsPerHost;

  //定时清理不符合标准的channel
  private final ChannelPool channelPool;
  //netty自带的用于管理channel的管理器
  private final ChannelGroup openChannels;
  private final ConcurrentHashMap<Channel, Object> channelId2PartitionKey = new ConcurrentHashMap<>();
  //是否开启最大总连接数
  private final boolean maxTotalConnectionsEnabled;
  //最大连接数
  private final Semaphore freeChannels;
  //是否开启每个路由最大连接数
  private final boolean maxConnectionsPerHostEnabled;
  //每个路由最大连接数
  private final ConcurrentHashMap<Object, Semaphore> freeChannelsPerHost = new ConcurrentHashMap<>();

  private AsyncHttpClientHandler wsHandler;

  public ChannelManager(final AsyncHttpClientConfig config, Timer nettyTimer) {

    this.config = config;
    //忽略一小段关于ssl的
    //ChannelPool是用于检测已经实例化的channel的健康状况,如果不合格会直接close掉
    ChannelPool channelPool = config.getChannelPool();
    if (channelPool == null) {
        if (config.isKeepAlive()) {
            //这是默认使用的,事实上多数场景不需要我们自己实现
            channelPool = new DefaultChannelPool(config, nettyTimer);
        } else {
            channelPool = NoopChannelPool.INSTANCE;
        }
    }
    this.channelPool = channelPool;

    tooManyConnections = trimStackTrace(new TooManyConnectionsException(config.getMaxConnections()));
    tooManyConnectionsPerHost = trimStackTrace(new TooManyConnectionsPerHostException(config.getMaxConnectionsPerHost()));
    maxTotalConnectionsEnabled = config.getMaxConnections() > 0;
    maxConnectionsPerHostEnabled = config.getMaxConnectionsPerHost() > 0;

    if (maxTotalConnectionsEnabled || maxConnectionsPerHostEnabled) {
        //管理已经被实例化的channel
        openChannels = new DefaultChannelGroup("asyncHttpClient", GlobalEventExecutor.INSTANCE) {
            //重写删除方法,因为删除channel时,需要释放信号量
            @Override
            public boolean remove(Object o) {
                boolean removed = super.remove(o);
                if (removed) {
                    //释放总连接池的信号量
                    if (maxTotalConnectionsEnabled)
                        freeChannels.release();
                    //释放路由连接池的信号量
                    if (maxConnectionsPerHostEnabled) {
                        Object partitionKey = channelId2PartitionKey.remove(Channel.class.cast(o));
                        if (partitionKey != null) {
                            Semaphore hostFreeChannels = freeChannelsPerHost.get(partitionKey);
                            if (hostFreeChannels != null)
                                hostFreeChannels.release();
                        }
                    }
                }
                return removed;
            }
        };
        //信号量数为最大连接数
        freeChannels = new Semaphore(config.getMaxConnections());
    } else {
        openChannels = new DefaultChannelGroup("asyncHttpClient", GlobalEventExecutor.INSTANCE);
        freeChannels = null;
    }

    handshakeTimeout = config.getHandshakeTimeout();

    ThreadFactory threadFactory = config.getThreadFactory() != null ? config.getThreadFactory() : new DefaultThreadFactory(config.getThreadPoolName());
    allowReleaseEventLoopGroup = config.getEventLoopGroup() == null;
    ChannelFactory<? extends Channel> channelFactory;
    if (allowReleaseEventLoopGroup) {
        //这个只能在linux下使用
        if (config.isUseNativeTransport()) {
            eventLoopGroup = newEpollEventLoopGroup(config.getIoThreadsCount(), threadFactory);
            channelFactory = getEpollSocketChannelFactory();
        } else {
            //通常默认走这个!
            eventLoopGroup = new NioEventLoopGroup(config.getIoThreadsCount(), //瞧!IO线程数就是时间轮询的线程数
              threadFactory);
            channelFactory = NioSocketChannelFactory.INSTANCE;
        }
    } else {
        //...
    }
    //用于http请求的bootstrap
    httpBootstrap = newBootstrap(channelFactory, eventLoopGroup, config);
    //用于WebSocket请求的bootstrap
    wsBootstrap = newBootstrap(channelFactory, eventLoopGroup, config);

    httpBootstrap.option(ChannelOption.AUTO_READ, false);
  }
}

实例化完ChannelManager后,就轮到请求发送器,这里先看看所需要的参数,具体执行的方法在后面说。

public final class NettyRequestSender {
  private final AsyncHttpClientConfig config;
  private final ChannelManager channelManager;
  private final Timer nettyTimer;
  private final AsyncHttpClientState clientState;
  private final NettyRequestFactory requestFactory;

  public NettyRequestSender(AsyncHttpClientConfig config,//
            ChannelManager channelManager,//
            Timer nettyTimer,//
            AsyncHttpClientState clientState) {
      this.config = config;
      this.channelManager = channelManager;
      this.nettyTimer = nettyTimer;
      this.clientState = clientState;
      requestFactory = new NettyRequestFactory(config);
  }
}

再回来看看ChannelManager构造方法中使用的工厂方法newBootstrap(channelFactory, eventLoopGroup, config),这是支持整个ahc运作的代码,对于用Netty实现网络客户端来说,这个配置很有参考价值,所以也贴上来一起观赏!

private Bootstrap newBootstrap(ChannelFactory<? extends Channel> channelFactory, EventLoopGroup eventLoopGroup, AsyncHttpClientConfig config) {
    @SuppressWarnings("deprecation")
    Bootstrap bootstrap = new Bootstrap().channelFactory(channelFactory)
            //客户端只有worker线程池,ServerBootstrap则需要boss和worker
            .group(eventLoopGroup)
            //设置内存分配器,我的理解是关于堆内存模型的,可用于对Netty的优化
            .option(ChannelOption.ALLOCATOR, config.getAllocator() != null ? config.getAllocator() : ByteBufAllocator.DEFAULT)
            //是否使用tcp的Nagle算法,文件传输可以选择使用
            .option(ChannelOption.TCP_NODELAY, config.isTcpNoDelay())
            //重复使用本地地址端口
            .option(ChannelOption.SO_REUSEADDR, config.isSoReuseAddress())//
            .option(ChannelOption.AUTO_CLOSE, false);
    if (config.getConnectTimeout() > 0) {
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getConnectTimeout());
    }
    if (config.getSoLinger() >= 0) {
        //就是一个设置延迟关闭时间的参数,用于保证数据发送完成
        bootstrap.option(ChannelOption.SO_LINGER, config.getSoLinger());
    }
    if (config.getSoSndBuf() >= 0) {
        bootstrap.option(ChannelOption.SO_SNDBUF, config.getSoSndBuf());
    }
    if (config.getSoRcvBuf() >= 0) {
        bootstrap.option(ChannelOption.SO_RCVBUF, config.getSoRcvBuf());
    }
    //自定义配置
    for (Entry<ChannelOption<Object>, Object> entry : config.getChannelOptions().entrySet()) {
        bootstrap.option(entry.getKey(), entry.getValue());
    }
    return bootstrap;
}

//下面则是管道的配置
public void configureBootstraps(NettyRequestSender requestSender) {
    //ahc自定义的ChannelInboundHandler,异步方式获取服务端返回的数据
    //我们自己获取数据后的核心业务逻辑,也在这里开始
    final AsyncHttpClientHandler httpHandler = new HttpHandler(config, this, requestSender);
    wsHandler = new WebSocketHandler(config, this, requestSender);
    final NoopHandler pinnedEntry = new NoopHandler();

    httpBootstrap.handler(new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline()//
                    .addLast(PINNED_ENTRY, pinnedEntry)//
                    .addLast(HTTP_CLIENT_CODEC, newHttpClientCodec())//
                    .addLast(INFLATER_HANDLER, newHttpContentDecompressor())//
                    .addLast(CHUNKED_WRITER_HANDLER, new ChunkedWriteHandler())//
                    .addLast(AHC_HTTP_HANDLER, httpHandler);
            if (config.getHttpAdditionalChannelInitializer() != null)
                config.getHttpAdditionalChannelInitializer().initChannel(ch);
        }
    });

    wsBootstrap.handler(new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline()//
                    .addLast(PINNED_ENTRY, pinnedEntry)//
                    .addLast(HTTP_CLIENT_CODEC, newHttpClientCodec())//
                    .addLast(AHC_WS_HANDLER, wsHandler);
            if (config.getWsAdditionalChannelInitializer() != null)
                config.getWsAdditionalChannelInitializer().initChannel(ch);
        }
    });
}

一切工作准备就绪,现在可以请求了!怎么构建请求就不打算讲了,可以自行阅读RequestBuilderBase类。执行execute()方法,正式开始请求,往下看DefaultAsyncHttpClient.executeRequest()怎么创建连接的。

public class BoundRequestBuilder extends RequestBuilderBase<BoundRequestBuilder> {
  private final AsyncHttpClient client;
  public ListenableFuture<Response> execute() {
      return client.executeRequest(build(), new AsyncCompletionHandlerBase());
  }
}

public class DefaultAsyncHttpClient implements AsyncHttpClient {
  @Override
  public <T> ListenableFuture<T> executeRequest(Request request, AsyncHandler<T> handler) {
      if (config.getRequestFilters().isEmpty()) {
          return execute(request, handler);
      } else {
        //不考虑设置请求过滤器的情况
      }
  }
  private <T> ListenableFuture<T> execute(Request request, final AsyncHandler<T> asyncHandler) {
      try {
          //把请求参数,和读取数据后的回调一同塞给请求发送器
          return requestSender.sendRequest(request, asyncHandler, null, false);
      } catch (Exception e) {
          asyncHandler.onThrowable(e);
          return new ListenableFuture.CompletedFailure<>(e);
      }
  }
}

OK~~上面列出NettyRequestSender需要什么参数,现在再来看看怎么做的? 下面的方法中,重点关注sendRequestWithNewChannel,它包括了如何新建channel、连接,抢占信号量

public <T> ListenableFuture<T> sendRequest(final Request request,//
            final AsyncHandler<T> asyncHandler,//
            NettyResponseFuture<T> future,//
            boolean performingNextRequest) {
    //...
    ProxyServer proxyServer = getProxyServer(config, request);
    //使用SSL代理或者ws
    if (proxyServer != null && (request.getUri().isSecured() || request.getUri().isWebSocket()) && !isConnectDone(request, future))
        //暂时忽略另外两个创建连接的方式
    else
        //我们的例子用的是GET,所以执行该方法
        return sendRequestWithCertainForceConnect(request, asyncHandler, future, performingNextRequest, proxyServer, false);
}

private <T> ListenableFuture<T> sendRequestWithCertainForceConnect(//
            Request request,//
            AsyncHandler<T> asyncHandler,//
            NettyResponseFuture<T> future,//注意,这时候传进来是null
            boolean performingNextRequest,//
            ProxyServer proxyServer,//
            boolean forceConnect) {
    //把所有请求信息保证在一个响应回调对象里
    NettyResponseFuture<T> newFuture = newNettyRequestAndResponseFuture(request, asyncHandler, future, proxyServer, forceConnect);
    //这里视图根据这个请求去拿去channel,过程有点漫长,回头再来解释
    Channel channel = getOpenChannel(future, request, proxyServer, asyncHandler);

    if (Channels.isChannelValid(channel))
        return sendRequestWithOpenChannel(request, proxyServer, newFuture, asyncHandler, channel);
    else
        return sendRequestWithNewChannel(request, proxyServer, newFuture, asyncHandler, performingNextRequest);
}
private Channel getOpenChannel(NettyResponseFuture<?> future, Request request, ProxyServer proxyServer, AsyncHandler<?> asyncHandler) {
    //future并没有channel,对于什么时候channel是可复用的,一直没搞明白,所以我基本默认每次都要新建一个channel
    if (future != null && future.isReuseChannel() && Channels.isChannelValid(future.channel()))
        return future.channel();
    //视图在channelManager中找到可用对象
    else
        return pollPooledChannel(request, proxyServer, asyncHandler);
}

private <T> ListenableFuture<T> sendRequestWithOpenChannel(Request request, ProxyServer proxy, NettyResponseFuture<T> future, AsyncHandler<T> asyncHandler, Channel channel) {
    if (asyncHandler instanceof AsyncHandlerExtensions)
        AsyncHandlerExtensions.class.cast(asyncHandler).onConnectionPooled(channel);
    //启动请求超时,在writeRequest中,会启动读取超时
    TimeoutsHolder timeoutsHolder = scheduleRequestTimeout(future);
    timeoutsHolder.initRemoteAddress((InetSocketAddress) channel.remoteAddress());
    future.setChannelState(ChannelState.POOLED);
    future.attachChannel(channel, false);

    Channels.setAttribute(channel, future);
    if (Channels.isChannelValid(channel)) {
        writeRequest(future, channel);
    } else {
        handleUnexpectedClosedChannel(channel, future);
    }
    return future;
}
//把这里当作一个请求连接的开始
private <T> ListenableFuture<T> sendRequestWithNewChannel(//
            Request request,//
            ProxyServer proxy,//
            NettyResponseFuture<T> future,//
            AsyncHandler<T> asyncHandler,//
            boolean performingNextRequest) {

    Realm realm = future.getRealm();
    Realm proxyRealm = future.getProxyRealm();
    //...
    //为做连接做准备
    Bootstrap bootstrap = channelManager.getBootstrap(request.getUri(), proxy);
    //用于索取channel
    Object partitionKey = future.getPartitionKey();
    final boolean acquireChannelLock = !performingNextRequest;

    try {
        //抢占信号量
        if (acquireChannelLock) {
            channelManager.acquireChannelLock(partitionKey);
        }
    } catch (Throwable t) {
        abort(null, future, getCause(t));
        return future;
    }
    //开启请求超时定时器
    scheduleRequestTimeout(future);
    //域名解析
    RequestHostnameResolver.INSTANCE.resolve(request, proxy, asyncHandler)//
            .addListener(new SimpleFutureListener<List<InetSocketAddress>>() {
                @Override
                //域名解析后得到的IP地址列表
                protected void onSuccess(List<InetSocketAddress> addresses) {
                    NettyConnectListener<T> connectListener = new NettyConnectListener<>(future, NettyRequestSender.this, channelManager, acquireChannelLock, partitionKey);
                    //不要怀疑!这里开始连接了!!!
                    NettyChannelConnector connector = new NettyChannelConnector(request.getLocalAddress(), addresses, asyncHandler, clientState, config);
                    if (!future.isDone()) {
                        connector.connect(bootstrap, connectListener);
                    } else if (acquireChannelLock) {
                        //如果future已经完成,则释放信号量
                        channelManager.releaseChannelLock(partitionKey);
                    }
                }
                @Override
                protected void onFailure(Throwable cause) {
                    //失败,释放信号
                    if (acquireChannelLock) {
                        channelManager.releaseChannelLock(partitionKey);
                    }
                    abort(null, future, getCause(cause));
                }
            });
    return future;
}

NettyChannelConnector负责对远程IP创建连接,一旦连接成功,NettyConnectListener就会调用requestSender向服务端发送数据。

public class NettyChannelConnector {
  public void connect(final Bootstrap bootstrap, final NettyConnectListener<?> connectListener) {
      //获取DNS后的IP地址
      final InetSocketAddress remoteAddress = remoteAddresses.get(i);
      if (asyncHandlerExtensions != null)
          asyncHandlerExtensions.onTcpConnectAttempt(remoteAddress);
      try {
          connect0(bootstrap, connectListener, remoteAddress);
      } catch (RejectedExecutionException e) {
          if (clientState.isClosed()) {
              connectListener.onFailure(null, e);
          }
      }
  }

  private void connect0(Bootstrap bootstrap, final NettyConnectListener<?> connectListener, InetSocketAddress remoteAddress) {
      bootstrap.connect(remoteAddress, localAddress)//
              .addListener(new SimpleChannelFutureListener() {
                  @Override
                  public void onSuccess(Channel channel) {
                      if (asyncHandlerExtensions != null) {
                          asyncHandlerExtensions.onTcpConnectSuccess(remoteAddress, channel);
                      }
                      //如果有设置连接的存活时间,则初始化channelId,在ChannelPool中自检有用到
                      if (connectionTtlEnabled) {
                          Channels.initChannelId(channel);
                      }
                      connectListener.onSuccess(channel, remoteAddress);
                  }
                  @Override
                  public void onFailure(Channel channel, Throwable t) {
                      if (asyncHandlerExtensions != null)
                          asyncHandlerExtensions.onTcpConnectFailure(remoteAddress, t);
                      //如果连接失败,则尝试连接下一个IP
                      boolean retry = pickNextRemoteAddress();
                      if (retry)
                          NettyChannelConnector.this.connect(bootstrap, connectListener);
                      else
                          connectListener.onFailure(channel, t);
                  }
              });
  }
}

连接成功,就来到这里,拿到channel,准备向服务器发送数据!

public final class NettyConnectListener<T> {
  public void onSuccess(Channel channel, InetSocketAddress remoteAddress) {

      Channels.setInactiveToken(channel);
      TimeoutsHolder timeoutsHolder = future.getTimeoutsHolder();
      if (futureIsAlreadyCancelled(channel)) {
          return;
      }
      Request request = future.getTargetRequest();
      Uri uri = request.getUri();
      timeoutsHolder.initRemoteAddress(remoteAddress);

      if (future.getProxyServer() == null && uri.isSecured()) {
        //直接无视
      } else {
          writeRequest(channel);
      }
  }

  private void writeRequest(Channel channel) {
      if (futureIsAlreadyCancelled(channel)) {
          return;
      }
      //在这设置属性,在读取服务器数据的httphandler里面有用到
      Channels.setAttribute(channel, future);
      //注册到ChannelGroup中
      channelManager.registerOpenChannel(channel, partitionKey);
      //设置为不复用channel
      future.attachChannel(channel, false);
      //发送请求数据
      //这个方法就不贴上来了,没什么意思
      //方法里最后将启动读取超时scheduleReadTimeout(future);意味将进入HttpHandler读取服务端数据
      requestSender.writeRequest(future, channel);
  }
}

读取数据一切顺利后,就会走下面这个私有方法,将channel送入channelpool里,等待生命的结束!

public final class HttpHandler extends AsyncHttpClientHandler {
  private void finishUpdate(final NettyResponseFuture<?> future, Channel channel, boolean expectOtherChunks) throws IOException {
      future.cancelTimeouts();
      boolean keepAlive = future.isKeepAlive();
      //这里继续读取后面的数据块,最后channel被设置了回调,依然调用下面的tryToOfferChannelToPool方法
      if (expectOtherChunks && keepAlive)
          channelManager.drainChannelAndOffer(channel, future);
      else
          channelManager.tryToOfferChannelToPool(channel, future.getAsyncHandler(), keepAlive, future.getPartitionKey());
      try {
          future.done();
      } catch (Exception t) {}
  }
}

tryToOfferChannelToPoolChannelManager的方法,主要将依然活跃的channel送入生命倒数器中,还记得connectionTtl么,这个参数在这就起作用了!

public final void tryToOfferChannelToPool(Channel channel, AsyncHandler<?> asyncHandler, boolean keepAlive, Object partitionKey) {
    //长连接,或者依然活跃的
    if (channel.isActive() && keepAlive) {
        //丢弃被设置的属性
        Channels.setDiscard(channel);
        if (asyncHandler instanceof AsyncHandlerExtensions)
            AsyncHandlerExtensions.class.cast(asyncHandler).onConnectionOffer(channel);
            //尝试塞进pool里
        if (channelPool.offer(channel, partitionKey)) {
            if (maxConnectionsPerHostEnabled)
                //我没明白这个映射到底是干嘛用的
                channelId2PartitionKey.putIfAbsent(channel, partitionKey);
        } else {
          //被pool驳回,就直接关闭掉!!
            closeChannel(channel);
        }
    } else {
      //已经死亡或者不是长连接,直接关闭!!
        closeChannel(channel);
    }
}

<font color=#f28080>到这里,关于channel已经接近尾声了,细心的童鞋可能发现,信号量呢?!不用释放么?!其实在关闭channel的时候,已经释放了,这是因为 ChannelGroup 的作用,在将channel注册(add方法)到group的时候,已经在其上面加了关闭的监听器,一旦close就执行remove,实例化 ChannelGroup 时已经将remove(channel)重写,可以倒回去看是不是已经释放了信号量,也可以看看 ChannelGroup 源码是不是在add时候添加了监听器。</font>

不过,这里只是接近尾声,没意味就结束了,还有存活的channel被塞到 ChannelPool 进行生命的倒计时。

public final class DefaultChannelPool implements ChannelPool {
  private final ConcurrentHashMap<Object, ConcurrentLinkedDeque<IdleChannel>> partitions = new ConcurrentHashMap<>();
  private final ConcurrentHashMap<ChannelId, ChannelCreation> channelId2Creation;
  private final AtomicBoolean isClosed = new AtomicBoolean(false);
  private final Timer nettyTimer;
  private final int connectionTtl;
  private final boolean connectionTtlEnabled;
  private final int maxIdleTime;
  private final boolean maxIdleTimeEnabled;
  private final long cleanerPeriod;
  private final PoolLeaseStrategy poolLeaseStrategy;

  public DefaultChannelPool(int maxIdleTime,//
            int connectionTtl,//
            PoolLeaseStrategy poolLeaseStrategy,//
            Timer nettyTimer,//
            int cleanerPeriod) {
      this.maxIdleTime = maxIdleTime;
      this.connectionTtl = connectionTtl;
      connectionTtlEnabled = connectionTtl > 0;
      channelId2Creation = connectionTtlEnabled ? new ConcurrentHashMap<>() : null;
      this.nettyTimer = nettyTimer;
      maxIdleTimeEnabled = maxIdleTime > 0;
      this.poolLeaseStrategy = poolLeaseStrategy;
      //在cleanerPeriod清理周期时间、connectionTtl连接存活时间、maxIdleTime最大空闲时间中选择最小的
      this.cleanerPeriod = Math.min(cleanerPeriod定时清理周期, Math.min(connectionTtlEnabled ? connectionTtl : Integer.MAX_VALUE, maxIdleTimeEnabled ? maxIdleTime : Integer.MAX_VALUE));
      //如果开启了连接存活时间,或者最大空闲时间,则实例化空闲channel检测
      if (connectionTtlEnabled || maxIdleTimeEnabled)
          scheduleNewIdleChannelDetector(new IdleChannelDetector());
  }

  private void scheduleNewIdleChannelDetector(TimerTask task) {
      nettyTimer.newTimeout(task, cleanerPeriod, TimeUnit.MILLISECONDS);
  }

  private final class IdleChannelDetector implements TimerTask {
      //挖出已经不满足条件的channel
      private List<IdleChannel> expiredChannels(ConcurrentLinkedDeque<IdleChannel> partition, long now) {
          List<IdleChannel> idleTimeoutChannels = null;
          for (IdleChannel idleChannel : partition) {
              //空闲时间是否过期
              boolean isIdleTimeoutExpired = isIdleTimeoutExpired(idleChannel, now);
              //channel是否还活跃
              boolean isRemotelyClosed = isRemotelyClosed(idleChannel.channel);
              //存活时间是否过期
              boolean isTtlExpired = isTtlExpired(idleChannel.channel, now);
              //满足其中一个条件,加入即将被关闭的channel队列
              if (isIdleTimeoutExpired || isRemotelyClosed || isTtlExpired) {
                  if (idleTimeoutChannels == null)
                      idleTimeoutChannels = new ArrayList<>(1);
                  idleTimeoutChannels.add(idleChannel);
              }
          }
          return idleTimeoutChannels != null ? idleTimeoutChannels : Collections.<IdleChannel> emptyList();
      }
      //关闭expiredChannels筛选出来的队列,并返回一个已被close的channel队列
      private List<IdleChannel> closeChannels(List<IdleChannel> candidates) {
          List<IdleChannel> closedChannels = null;
          for (int i = 0; i < candidates.size(); i++) {
              IdleChannel idleChannel = candidates.get(i);
              //如果未被占有,则直接close;如果中间出现有被占有的channel,实例化closedChannels,并将之前被close的channel塞进其中
              if (idleChannel.takeOwnership()) {
                  close(idleChannel.channel);
                  if (closedChannels != null) {
                      closedChannels.add(idleChannel);
                  }
              } //注意,这里只会被执行一次,closedChannels被实例化后不会再执行
              else if (closedChannels == null) {
                  closedChannels = new ArrayList<>(candidates.size());
                  for (int j = 0; j < i; j++)
                      closedChannels.add(candidates.get(j));
              }
          }
          //如果closedChannels为null,代表已经关闭candidates所有channel,原封不动返回
          //如果closedChannels非null,代表被占用的channel没有close并继续存活在candidates,所以返回被close了的channel队列closedChannels
          return closedChannels != null ? closedChannels : candidates;
      }
      public void run(Timeout timeout) throws Exception {
          if (isClosed.get())
              return;
          //检测器的启动时间
          long start = unpreciseMillisTime();
          int closedCount = 0;
          int totalCount = 0;
          //遍历每个路由的被塞到ChannelPool的channel队列
          for (ConcurrentLinkedDeque<IdleChannel> partition : partitions.values()) {
              List<IdleChannel> closedChannels = closeChannels(expiredChannels(partition, start));
              //非空且开启了连接存活时间的channel且被close的channel,全部从channelId2Creation和partition中去除
              if (!closedChannels.isEmpty()) {
                  if (connectionTtlEnabled) {
                      for (IdleChannel closedChannel : closedChannels)
                          channelId2Creation.remove(channelId(closedChannel.channel));
                  }
                  partition.removeAll(closedChannels);
                  closedCount += closedChannels.size();
              }
          }
          //退出并继续下一轮检测
          scheduleNewIdleChannelDetector(timeout.task());
      }
  }

  //存放空闲channel
  private static final class IdleChannel {
      final Channel channel;
      final long start;
      final AtomicBoolean owned = new AtomicBoolean(false);
      IdleChannel(Channel channel, long start) {
          this.channel = assertNotNull(channel, "channel");
          this.start = start;
      }
      public boolean takeOwnership() {
          return owned.compareAndSet(false, true);
      }
      @Override
      public boolean equals(Object o) {...}
      @Override
      public int hashCode() {...}
  }
  //存放channel的创建时间
  private static final class ChannelCreation {
      final long creationTime;
      final Object partitionKey;
      ChannelCreation(long creationTime, Object partitionKey) {
          this.creationTime = creationTime;
          this.partitionKey = partitionKey;
      }
  }
}

这里才是channel的终结!!!

channel被终结了,但有些还存活的channel还在请求的路上,还有很重要的两点没说到,就是 请求超时读取超时。 每个NettyResponseFuture都持有一个TimeoutsHolder来计算 requestTimeoutreadTimeout 是否过期。在ResponseFuture获取连接后,以及获取成功向服务器发送数据后,都会分别启动请求超时和读取超时两个定时器。通过阅读源码,可以发现 requestTimeout 其实是包括了 readTimeout,如果请求剩余时间小于读取超时时间时,startReadTimeout是不会启动readTimeout定时器的。下面只贴上TimeoutsHolder的部分源码,RequestTimeoutTimerTaskReadTimeoutTimerTask可以自行阅读。

<font color=#f28080>对于这两个参数,需要说明一点就是,一旦超时过期,channel和future都会被close掉,如果读超设置比请超长则是无意义的,只会以requestTimeout为准。</font>

public class TimeoutsHolder {
  private final AtomicBoolean cancelled = new AtomicBoolean();

  private final Timer nettyTimer;
  private final NettyRequestSender requestSender;
  private final long requestTimeoutMillisTime;
  private final int readTimeoutValue;

  private volatile NettyResponseFuture<?> nettyResponseFuture;
  public final Timeout requestTimeout;
  public volatile Timeout readTimeout;

  public TimeoutsHolder(Timer nettyTimer, NettyResponseFuture<?> nettyResponseFuture, NettyRequestSender requestSender, AsyncHttpClientConfig config) {
      this.nettyTimer = nettyTimer;
      this.nettyResponseFuture = nettyResponseFuture;
      this.requestSender = requestSender;
      this.readTimeoutValue = config.getReadTimeout();
      int requestTimeoutInMs = nettyResponseFuture.getTargetRequest().getRequestTimeout();
      //每个请求都可以独立设置请求超时时间
      if (requestTimeoutInMs == 0) {
          requestTimeoutInMs = config.getRequestTimeout();
      }
      if (requestTimeoutInMs != -1) {
          //请求的到期时间,启动请求超时定时器
          requestTimeoutMillisTime = unpreciseMillisTime() + requestTimeoutInMs;
          requestTimeout = newTimeout(new RequestTimeoutTimerTask(nettyResponseFuture, requestSender, this, requestTimeoutInMs), requestTimeoutInMs);
      } else {
          requestTimeoutMillisTime = -1L;
          requestTimeout = null;
      }
  }

  public void startReadTimeout() {
      if (readTimeoutValue != -1) {
          startReadTimeout(null);
      }
  }
  void startReadTimeout(ReadTimeoutTimerTask task) {
      //如果requestTimeout不为null,或者requestTimeout还没有过期并且读取超时时间<请求剩余时间
      if (requestTimeout == null || (!requestTimeout.isExpired() && readTimeoutValue < (requestTimeoutMillisTime - unpreciseMillisTime()))) {
          if (task == null) {
              task = new ReadTimeoutTimerTask(nettyResponseFuture, requestSender, this, readTimeoutValue);
          }
          Timeout readTimeout = newTimeout(task, readTimeoutValue);
          this.readTimeout = readTimeout;
      } else if (task != null) {
          task.clean();
      }
  }
}

最后最后最后。。。

来总结一下ahc的连接池实现,很明显的一点整个过程都是对Channel的管理,而且对于连接的抢占则使用了Semaphore,这再方便不过了!!!对于信号量的释放,Netty的ChannelGroup有很大的功劳,它提供了最优雅的方式关闭channel并且释放信号量。除此之外,一堆的超时限制任务需要一个定时任务容器执行,Netty又提供了一个在面对大量任务依然稳坐泰山的HashedWheelTimer,有机会专门来说说这一个。还有就是DefaultChannelPool对存活时间的检测,实在是通俗易懂,而且基于前面说的几点,实现起来也相当方便。

如果遇到基于netty的网络编程开发,对于连接资源的管理ahc确实提供了一套不错的思路,不仅对客户端,服务端也是可以试一试的!

© 著作权归作者所有

共有 人打赏支持
粉丝 2
博文 1
码字总数 4717
作品 0
广州
程序员
servlet接收post方式传参,但参数全部为null

app使用asynchttpclient调用tomcat(版本7) servlet,post方式,servlet接收到的参数全部为null,这种情况偶尔就会出现,参数大小是没有限制的,请问这是什么原因...

hhl3136 ⋅ 04/21 ⋅ 0

android-async-http框架库使用基础

开源项目链接 android-async-http仓库:git clone https://github.com/loopj/android-async-http android-async-http主页:http://loopj.com/android-async-http/ 背景知识 开始使用分析前还......

xiahuawuyu ⋅ 2015/12/22 ⋅ 0

Netty 源码分析之 三 我就是大名鼎鼎的 EventLoop(一)

目录 源码之下无秘密 ── 做最好的 Netty 源码分析教程 Netty 源码分析之 番外篇 Java NIO 的前生今世 Java NIO 的前生今世 之一 简介 Java NIO 的前生今世 之二 NIO Channel 小结 Java NIO...

永顺 ⋅ 2017/11/29 ⋅ 0

netty高性能浅析

最近在看内部的rpc框架,感觉rpc最重要的就是高效的网络传输,所以学习netty的使用和原理极其重要,于是把netty的基础再总结一下。 首先我们先来看一下netty中的概念,熟悉了基本概念后再来进...

KKys ⋅ 2017/08/30 ⋅ 0

京东的Netty实践,京麦TCP网关长连接容器架构

京麦从 2014 年构建网关,从 HTTP 网关发展到 TCP 网关。在 2016 年重构完成基于 Netty4.x+Protobuf3.x 实现对接 PC 和 App 上下行通信的高可用、高性能、高稳定的 TCP 长连接网关。本文重点...

张松然 ⋅ 01/03 ⋅ 0

Netty 源码分析之 一 揭开 Bootstrap 神秘的红盖头 (客户端)

目录 Netty 源码分析之 番外篇 Java NIO 的前生今世 Java NIO 的前生今世 之一 简介 Java NIO 的前生今世 之二 NIO Channel 小结 Java NIO 的前生今世 之三 NIO Buffer 详解 Java NIO 的前生...

永顺 ⋅ 2017/11/29 ⋅ 0

Netty那点事(一)概述

Netty和Mina是Java世界非常知名的通讯框架。它们都出自同一个作者,Mina诞生略早,属于Apache基金会,而Netty开始在Jboss名下,后来出来自立门户netty.io。关于Mina已有@FrankHui的Mina系列文...

黄亿华 ⋅ 2013/09/21 ⋅ 21

netty文章收藏

1.林峰的关于netty文章 不错,遇到问题反复看了几遍总有启发,还说收藏记录一下。 http://www.infoq.com/cn/netty Netty案例集锦之多线程篇(续)作者 李林锋 发布于 2015年11月25日 9 Nett...

GREKI ⋅ 2016/04/05 ⋅ 0

Netty 源码分析(一):引言和 Java NIO 介绍

为什么要介绍 Netty 如今优秀的开源项目非常多,仅在 Java 服务器端开发领域,优秀的开源项目就不胜枚举。比如从十年前就开始流行到现在依旧十分活跃的 Spring Framework,如今已经发展为一个...

编走编想 ⋅ 2015/10/01 ⋅ 2

Netty干货分享:京东京麦的生产级TCP网关技术实践总结

1、引言 京东的京麦商家后台2014年构建网关,从HTTP网关发展到TCP网关。在2016年重构完成基于Netty4.x+Protobuf3.x实现对接PC和App上下行通信的高可用、高性能、高稳定的TCP长连接网关。 早期...

JackJiang2011 ⋅ 2017/12/01 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

浅谈springboot Web模式下的线程安全问题

我们在@RestController下,一般都是@AutoWired一些Service,由于这些Service都是单例,所以并不存在线程安全问题。 由于Controller本身是单例模式 (非线程安全的), 这意味着每个request过来,...

算法之名 ⋅ 今天 ⋅ 0

知乎Java数据结构

作者:匿名用户 链接:https://www.zhihu.com/question/35947829/answer/66113038 来源:知乎 著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。 感觉知乎上嘲讽题主简...

颖伙虫 ⋅ 今天 ⋅ 0

Confluence 6 恢复一个站点有关使用站点导出为备份的说明

推荐使用生产备份策略。我们推荐你针对你的生产环境中使用的 Confluence 参考 Production Backup Strategy 页面中的内容进行备份和恢复(这个需要你备份你的数据库和 home 目录)。XML 导出备...

honeymose ⋅ 今天 ⋅ 0

JavaScript零基础入门——(九)JavaScript的函数

JavaScript零基础入门——(九)JavaScript的函数 欢迎回到我们的JavaScript零基础入门,上一节课我们了解了有关JS中数组的相关知识点,不知道大家有没有自己去敲一敲,消化一下?这一节课,...

JandenMa ⋅ 今天 ⋅ 0

火狐浏览器各版本下载及插件httprequest

各版本下载地址:http://ftp.mozilla.org/pub/mozilla.org//firefox/releases/ httprequest插件截至57版本可用

xiaoge2016 ⋅ 今天 ⋅ 0

Docker系列教程28-实战:使用Docker Compose运行ELK

原文:http://www.itmuch.com/docker/28-docker-compose-in-action-elk/,转载请说明出处。 ElasticSearch【存储】 Logtash【日志聚合器】 Kibana【界面】 答案: version: '2'services: ...

周立_ITMuch ⋅ 今天 ⋅ 0

使用快嘉sdkg极速搭建接口模拟系统

在具体项目研发过程中,一旦前后端双方约定好接口,前端和app同事就会希望后台同事可以尽快提供可供对接的接口方便调试,而对后台同事来说定好接口还仅是个开始、设计流程,实现业务逻辑,编...

fastjrun ⋅ 今天 ⋅ 0

PXE/KickStart 无人值守安装

导言 作为中小公司的运维,经常会遇到一些机械式的重复工作,例如:有时公司同时上线几十甚至上百台服务器,而且需要我们在短时间内完成系统安装。 常规的办法有什么? 光盘安装系统 ===> 一...

kangvcar ⋅ 昨天 ⋅ 0

使用Puppeteer撸一个爬虫

Puppeteer是什么 puppeteer是谷歌chrome团队官方开发的一个无界面(Headless)chrome工具。Chrome Headless将成为web应用自动化测试的行业标杆。所以我们很有必要来了解一下它。所谓的无头浏...

小草先森 ⋅ 昨天 ⋅ 0

Java Done Right

* 表示难度较大或理论性较强。 ** 表示难度更大或理论性更强。 【Java语言本身】 基础语法,面向对象,顺序编程,并发编程,网络编程,泛型,注解,lambda(Java8),module(Java9),var(...

风华神使 ⋅ 昨天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部