深入理解OkHttp3(3):Connections

原创
2017/04/25 13:45
阅读数 482

img

OkHttp3的网络连接创建和数据传输由责任链网络层的ConnectInterceptorCallServerInterceptor完成。ConnectInterceptor为请求创建与服务器网络连接,CallServerInterceptor负责网络数据读写的具体操作。

img

1. StreamAllocation

StreamAllocation类似中介者模式,协调Connections、Stream和Call三者之间的关系。每个Call在Application层RetryAndFollowUpInterceptor实例化一个StreamAllocation

相同Address(相同的Host与端口)可以共用相同的连接RealConnection。

  1. StreamAllocation通过Address,从连接池ConnectionPools中取出有效的RealConnection,与远程服务器建立Socket连接。
  2. 在处理响应结束后或出现网络异常时,释放Socket连接。
  3. 每个RealConnection都持有对StreamAllocation的弱引用,用于连接闲置状态的判断。

查找可用连接

private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
      boolean connectionRetryEnabled) throws IOException {
    Route selectedRoute;
    synchronized (connectionPool) {
      // prediction check
      ...
      // 尝试使用已分配的连接
      ...
      // 尝试从连接池获取
      Internal.instance.get(connectionPool, address, this);
      ...
      selectedRoute = route;
    }

    // 未路由寻址时,查找可用路由
    ...

    // 立即创建RealConnection
    RealConnection result;
    // 并发
    synchronized (connectionPool) {
      route = selectedRoute;
      refusedStreamCount = 0;
      result = new RealConnection(connectionPool, selectedRoute);
      ...
      if (canceled) throw new IOException("Canceled");
    }

    // 进行TCP和TLS握手,建立与服务器连接通道
    result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled);
    // 路由可用,从失败路由表中移除
    routeDatabase().connected(result.route());

    Socket socket = null;
    synchronized (connectionPool) {
      // 添加到连接池,如果其他并发线程已创建同个多路复用的连接,则丢弃当前的连接
      // 并释放Socket资源
      ...
    }
    ...
    return result;
  }

2. ConnectionPool

Okhttp3封装了网络连接的逻辑,通过RealConnection建立Socket连接。因为建立Socket网络连接会增加网络延迟,尤其是对于应用程序客户端的频繁的网络请求,需要建立HTTP和HTTP/2的重用策略,降低网络连接代理性能损耗。

ConnectionPool针对于相同Address的请求复用同一个connection,,维护RealConnection类似循环队列形式的双端队列。 ConnectionPool默认的最大空闲连接数为5,最大的空闲时间为5分钟。

public ConnectionPool() {
    this(5, 5, TimeUnit.MINUTES);
  }

ConnectionPool使用ArrayDeque来维护池内的连接。ArrayDeque大多数操作时间复杂度为O(1)。对于remove、removeFirstOccurrence、removeLastOccurrence、contains和iterator.remove()方法,时间复杂度为O(n),线性复杂度。双端队列中的元素可以从两端弹出,插入和删除操作限定在队列的两边进行。

2.1. 循环数组

private final Deque<RealConnection> connections = new ArrayDeque<>();

ArrayDeque是双端队列的数组的实现,ArrayDeque是一个可变大小的循环数组,没有长度大小限制,非线程安全,因此不支持多线程并发,需要对数组的操作需要加上同步锁。 它的默认长度是16。

public ArrayDeque() {
        elements = new Object[16];
    }

设定指定长度数组时,则初始长度为比选择的数大的最小2的指数的整数。

private void allocateElements(int numElements) {
        int initialCapacity = MIN_INITIAL_CAPACITY;
        // 移位操作将最高位以下的为置为1,正好比
        if (numElements >= initialCapacity) {
            initialCapacity = numElements;
            initialCapacity |= (initialCapacity >>>  1);
            initialCapacity |= (initialCapacity >>>  2);
            initialCapacity |= (initialCapacity >>>  4);
            initialCapacity |= (initialCapacity >>>  8);
            initialCapacity |= (initialCapacity >>> 16);
            initialCapacity++; 
            ...
        }
        elements = new Object[initialCapacity];
    }
  1. 添加连接
void put(RealConnection connection) {
    ...
    // 是否需要清理连接池
    ...
    connections.add(connection);
  }
  1. 查找连接
// 使用迭代来查找连接池连接
connections.iterator()
  1. 清理连接
// 如果返回true , 表示连接已经从连接池移除,需要立刻关闭
  boolean connectionBecameIdle(RealConnection connection) {
    assert (Thread.holdsLock(this));
    if (connection.noNewStreams || maxIdleConnections == 0) {
      connections.remove(connection);
      return true;
    } else {
      // 唤醒清理线程,因为可能超过了最大空闲连接数
      notifyAll();
      return false;
    }
  }

2.2. 连接清理

内置的清理线程会定时对连接池检查,清理空闲连接。根据空闲连接的最大空闲时间和连接数,判断执行清理操作。

private final Runnable cleanupRunnable = new Runnable() {
    @Override public void run() {
      while (true) {
        long waitNanos = cleanup(System.nanoTime());
        // waitNanos等于-1表示无空闲或在用的连接
        ...
        // 如果有空闲或在用连接则定时启动清理线程
        ...
      }
    }
  };
  
  /**
  * 维护连接池,清理超过最大空闲存活时间或数量的空闲连接。
  * 返回下次调用这个方法的纳秒大小的睡眠时间。返回-1表示无须再次执行。
  */
  long cleanup(long now) {
    int inUseConnectionCount = 0;  // 在用连接数
    int idleConnectionCount = 0;   // 空闲连接数
    RealConnection longestIdleConnection = null; // 临时最大空闲连接
    long longestIdleDurationNs = Long.MIN_VALUE; // 最大空闲持续时间

    ...
    synchronized (this) {
      for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
        RealConnection connection = i.next();
        // 迭代查找最大空闲时间的连接,记录空闲连接和在用连接数
        ...
      }

      if (longestIdleDurationNs >= this.keepAliveDurationNs
          || idleConnectionCount > this.maxIdleConnections) {
        // 超过最大空闲连接时间或空闲数据超过最大空闲连接数,从连接池移除连接
        connections.remove(longestIdleConnection);
      } else if (idleConnectionCount > 0) {
        // 有空闲连接但是未超标,返回下次执行清理的时间间隔
        return keepAliveDurationNs - longestIdleDurationNs;
      } else if (inUseConnectionCount > 0) {
        // 所有连接都在使用,返回最大连接时间
        return keepAliveDurationNs;
      } else {
        // 无空闲和在用时间
        cleanupRunning = false;
        return -1;
      }
    }
    ...
    // 立即清理
    return 0;
  }

除了自动清理,还支持手动清理空闲的连接

public void evictAll() {
    List<RealConnection> evictedConnections = new ArrayList<>();
    synchronized (this) {
      for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
        RealConnection connection = i.next();
        if (connection.allocations.isEmpty()) {
          // 如果处于空闲状态,直接移除
          connection.noNewStreams = true;
          evictedConnections.add(connection);
          i.remove();
        }
      }
    }

    for (RealConnection connection : evictedConnections) {
      // 循环关闭连接
    }
  }

2.3. Address

连接池通过Address来判断是否有可复用的连接。依据服务器地址、端口号判断是否相同的Address

@Address.java 
  @Override public boolean equals(Object other) {
    if (other instanceof Address) {
      Address that = (Address) other;
      return this.url.equals(that.url) && ...;
    }
    return false;
  }

ConnectionPools中不存在对应的Address的RealConnection则须进行路由寻址,初始化RealConnection,执行网络建立和传输操作。

3. RealConnection

初始化一个连接需要ConnectionPoolRoute两个参数

public RealConnection(ConnectionPool connectionPool, Route route) {
    this.connectionPool = connectionPool;
    this.route = route;
  }

Route 在创建RealConnection实例前,执行路由寻址操作,找到可用的路由。出现异常,则返回到应用层进行重试处理。

3.1. 路由寻址

路由寻址为指定Address初始化Route实例。

public Route next() throws IOException {
    // Compute the next route to attempt.
    if (!hasNextInetSocketAddress()) {
      // 未进行寻址
      if (!hasNextProxy()) {
        // 无可用代理,尝试之前连接失败的路由
        ...
        return nextPostponed();
      }
      // 代理
      lastProxy = nextProxy();
    }
    lastInetSocketAddress = nextInetSocketAddress();

    Route route = new Route(address, lastProxy, lastInetSocketAddress);
    if (routeDatabase.shouldPostpone(route)) {
      // 如果路由曾失败过,添加到待用的路由表中
      postponedRoutes.add(route);
      // 继续查找合适的路由
      return next();
    }

    return route;
  }

代理设置,配置连接的host和端口号。

private void resetNextInetSocketAddress(Proxy proxy) throws IOException {
    // Clear the addresses. Necessary if getAllByName() below throws!
    inetSocketAddresses = new ArrayList<>();

    String socketHost;
    int socketPort;
    if (proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.SOCKS) {
      // 无代理
      socketHost = address.url().host();
      socketPort = address.url().port();
    } else {
      // 代理
      ....
    }

    // 端口号检查
    ...
    if (proxy.type() == Proxy.Type.SOCKS) {
      ...
    } else {
      // Dns寻址
      List<InetAddress> addresses = address.dns().lookup(socketHost);
      for (int i = 0, size = addresses.size(); i < size; i++) {
        InetAddress inetAddress = addresses.get(i);
        inetSocketAddresses.add(new InetSocketAddress(inetAddress, socketPort));
      }
    }
    ...
  }

路由寻址成功后,初始化RealConnection并添加到连接池中,建立Socket连接

3.2. Socket连接

OkHttp3支持HTTP与HTTP/2。HTTP/2需要进行TLS的握手流程。

@StreamAllocation.java
public void connect(
      int connectTimeout, int readTimeout, int writeTimeout, boolean connectionRetryEnabled) {
    if (protocol != null) throw new IllegalStateException("already connected");

    RouteException routeException = null;
    List<ConnectionSpec> connectionSpecs = route.address().connectionSpecs();
    ConnectionSpecSelector connectionSpecSelector = new ConnectionSpecSelector(connectionSpecs);

    if (route.address().sslSocketFactory() == null) {
      if (!connectionSpecs.contains(ConnectionSpec.CLEARTEXT)) {
        throw new RouteException(new UnknownServiceException(
            "CLEARTEXT communication not enabled for client"));
      }
      String host = route.address().url().host();
      if (!Platform.get().isCleartextTrafficPermitted(host)) {
        throw new RouteException(new UnknownServiceException(
            "CLEARTEXT communication to " + host + " not permitted by network security policy"));
      }
    }

    while (true) {
      try {
        if (route.requiresTunnel()) {
          // 使用隧道在不兼容的网络上传输数据,或在不安全网络上提供安全路径
          connectTunnel(connectTimeout, readTimeout, writeTimeout);
        } else {
          // 直接建立socket连接
          connectSocket(connectTimeout, readTimeout);
        }
        // 协议连接方式,HTTP/1或者HTTP/2
        establishProtocol(connectionSpecSelector);
        break;
      } catch (IOException e) {
        ...
      }
    }

    if (http2Connection != null) {
      synchronized (connectionPool) {
        allocationLimit = http2Connection.maxConcurrentStreams();
      }
    }
  }

TLS 握手流程

img

建立Socket连接成功之后,客户端就可以与服务端进行通讯了。

展开阅读全文
打赏
0
1 收藏
分享
加载中
更多评论
打赏
0 评论
1 收藏
0
分享
返回顶部
顶部