OkHttp3源码解析(三)——连接池复用

本文基于OkHttp3的3.11.0版本

implementation 'com.squareup.okhttp3:okhttp:3.11.0'

我们已经分析了OkHttp3的拦截器链和缓存策略,今天我们再来看看OkHttp3的连接池复用。

客户端和服务器建立socket连接需要经历TCP的三次握手和四次挥手,是一种比较消耗资源的动作。Http中有一种keepAlive connections的机制,在和客户端通信结束以后可以保持连接指定的时间。OkHttp3支持5个并发socket连接,默认的keepAlive时间为5分钟。下面我们来看看OkHttp3是怎么实现连接池复用的。

OkHttp3的连接池--ConnectionPool

public final class ConnectionPool {
    
    //线程池,用于执行清理空闲连接
    private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
      Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
      new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));
    //最大的空闲socket连接数
    private final int maxIdleConnections;
    //socket的keepAlive时间
    private final long keepAliveDurationNs;
    
    private final Deque<RealConnection> connections = new ArrayDeque<>();
    final RouteDatabase routeDatabase = new RouteDatabase();
    boolean cleanupRunning;
}

ConnectionPool里的几个重要变量:

(1)executor线程池,类似于CachedThreadPool,用于执行清理空闲连接的任务。

(2)Deque双向队列,同时具有队列和栈的性质,经常在缓存中被使用,里面维护的RealConnection是socket物理连接的包装

(3)RouteDatabase,用来记录连接失败的路线名单

下面看看ConnectionPool的构造函数

public ConnectionPool() {
    this(5, 5, TimeUnit.MINUTES);
}

public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
    this.maxIdleConnections = maxIdleConnections;
    this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);

    // Put a floor on the keep alive duration, otherwise cleanup will spin loop.
    if (keepAliveDuration <= 0) {
      throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration);
    }
}

从构造函数中可以看出,ConnectionPool的默认空闲连接数为5个,keepAlive时间为5分钟。ConnectionPool是什么时候被创建的呢?是在OkHttpClient的builder中:

public static final class Builder {
    ...
    ConnectionPool connectionPool;
    ...
    public Builder() {
        ...
        connectionPool = new ConnectionPool();
        ...
    }
    
    //我们也可以定制连接池
    public Builder connectionPool(ConnectionPool connectionPool) {
        if (connectionPool == null) throw new NullPointerException("connectionPool == null");
        this.connectionPool = connectionPool;
        return this;
    }
}

缓存操作:添加、获取、回收连接

(1)从缓存中获取连接

//ConnectionPool.class
@Nullable 
RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
    assert (Thread.holdsLock(this));
    for (RealConnection connection : connections) {
      if (connection.isEligible(address, route)) {
        streamAllocation.acquire(connection, true);
        return connection;
      }
    }
    return null;
}

获取连接的逻辑比较简单,就遍历连接池里的连接connections,然后用RealConnection的isEligible方法找到符合条件的连接,如果有符合条件的连接则复用。需要注意的是,这里还调用了streamAllocation的acquire方法。acquire方法的作用是对RealConnection引用的streamAllocation进行计数,OkHttp3是通过RealConnection的StreamAllocation的引用计数是否为0来实现自动回收连接的。

//StreamAllocation.class
public void acquire(RealConnection connection, boolean reportedAcquired) {
    assert (Thread.holdsLock(connectionPool));
    if (this.connection != null) throw new IllegalStateException();

    this.connection = connection;
    this.reportedAcquired = reportedAcquired;
    connection.allocations.add(new StreamAllocationReference(this, callStackTrace));
}

public static final class StreamAllocationReference extends WeakReference<StreamAllocation> {

    public final Object callStackTrace;

    StreamAllocationReference(StreamAllocation referent, Object callStackTrace) {
      super(referent);
      this.callStackTrace = callStackTrace;
    }
}
//RealConnection.class
public final List<Reference<StreamAllocation>> allocations = new ArrayList<>();

每一个RealConnection中都有一个allocations变量,用于记录对于StreamAllocation的引用。StreamAllocation中包装有HttpCodec,而HttpCodec里面封装有Request和Response读写Socket的抽象。每一个请求Request通过Http来请求数据时都需要通过StreamAllocation来获取HttpCodec,从而读取响应结果,而每一个StreamAllocation都是和一个RealConnection绑定的,因为只有通过RealConnection才能建立socket连接。所以StreamAllocation可以说是RealConnection、HttpCodec和请求之间的桥梁。

当然同样的StreamAllocation还有一个release方法,用于移除计数,也就是将当前的StreamAllocation的引用从对应的RealConnection的引用列表中移除。

private void release(RealConnection connection) {
    for (int i = 0, size = connection.allocations.size(); i < size; i++) {
      Reference<StreamAllocation> reference = connection.allocations.get(i);
      if (reference.get() == this) {
        connection.allocations.remove(i);
        return;
      }
    }
    throw new IllegalStateException();
}

(2)向缓存中添加连接

//ConnectionPool.class
void put(RealConnection connection) {
    assert (Thread.holdsLock(this));
    if (!cleanupRunning) {
      cleanupRunning = true;
      executor.execute(cleanupRunnable);
    }
    connections.add(connection);
  }

添加连接之前会先调用线程池执行清理空闲连接的任务,也就是回收空闲的连接。

(3)空闲连接的回收

private final Runnable cleanupRunnable = new Runnable() {
    @Override public void run() {
      while (true) {
        long waitNanos = cleanup(System.nanoTime());
        if (waitNanos == -1) return;
        if (waitNanos > 0) {
          long waitMillis = waitNanos / 1000000L;
          waitNanos -= (waitMillis * 1000000L);
          synchronized (ConnectionPool.this) {
            try {
              ConnectionPool.this.wait(waitMillis, (int) waitNanos);
            } catch (InterruptedException ignored) {
            }
          }
        }
      }
    }
};

cleanupRunnable中执行清理任务是通过cleanup方法来完成,cleanup方法会返回下次需要清理的间隔时间,然后会调用wait方法释放锁和时间片。等时间到了就再次进行清理。下面看看具体的清理逻辑:

long cleanup(long now) {
    //记录活跃的连接数
    int inUseConnectionCount = 0;
    //记录空闲的连接数
    int idleConnectionCount = 0;
    //空闲时间最长的连接
    RealConnection longestIdleConnection = null;
    long longestIdleDurationNs = Long.MIN_VALUE;

    synchronized (this) {
      for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
        RealConnection connection = i.next();

        //判断连接是否在使用,也就是通过StreamAllocation的引用计数来判断
        //返回值大于0说明正在被使用
        if (pruneAndGetAllocationCount(connection, now) > 0) {
            //活跃的连接数+1
            inUseConnectionCount++;
            continue;
        }
        //说明是空闲连接,所以空闲连接数+1
        idleConnectionCount++;

        //找出了空闲时间最长的连接,准备移除
        long idleDurationNs = now - connection.idleAtNanos;
        if (idleDurationNs > longestIdleDurationNs) {
          longestIdleDurationNs = idleDurationNs;
          longestIdleConnection = connection;
        }
      }

      if (longestIdleDurationNs >= this.keepAliveDurationNs
          || idleConnectionCount > this.maxIdleConnections) {
        //如果空闲时间最长的连接的空闲时间超过了5分钟
        //或是空闲的连接数超过了限制,就移除
        connections.remove(longestIdleConnection);
      } else if (idleConnectionCount > 0) {
        //如果存在空闲连接但是还没有超过5分钟
        //就返回剩下的时间,便于下次进行清理
        return keepAliveDurationNs - longestIdleDurationNs;
      } else if (inUseConnectionCount > 0) {
        //如果没有空闲的连接,那就等5分钟后再尝试清理
        return keepAliveDurationNs;
      } else {
        //当前没有任何连接,就返回-1,跳出循环
        cleanupRunning = false;
        return -1;
      }
    }

    closeQuietly(longestIdleConnection.socket());

    // Cleanup again immediately.
    return 0;
}

下面我们看看判断连接是否是活跃连接的pruneAndGetAllocationCount方法

private int pruneAndGetAllocationCount(RealConnection connection, long now) {
    List<Reference<StreamAllocation>> references = connection.allocations;
    for (int i = 0; i < references.size(); ) {
        Reference<StreamAllocation> reference = references.get(i);
    
        //如果存在引用,就说明是活跃连接,就继续看下一个StreamAllocation
        if (reference.get() != null) {
            i++;
            continue;
        }

      // We've discovered a leaked allocation. This is an application bug.
      //发现泄漏的引用,会打印日志
        StreamAllocation.StreamAllocationReference streamAllocRef =
            (StreamAllocation.StreamAllocationReference) reference;
        String message = "A connection to " + connection.route().address().url()
            + " was leaked. Did you forget to close a response body?";
        Platform.get().logCloseableLeak(message, streamAllocRef.callStackTrace);
        
        //如果没有引用,就移除
        references.remove(i);
        connection.noNewStreams = true;

        //如果列表为空,就说明此连接上没有StreamAllocation引用了,就返回0,表示是空闲的连接
        if (references.isEmpty()) {
            connection.idleAtNanos = now - keepAliveDurationNs;
            return 0;
        }
    }
    //遍历结束后,返回引用的数量,说明当前连接是活跃连接
    return references.size();
}

至此我们就分析完OkHttp3的连接池复用了。

总结

(1)OkHttp3中支持5个并发socket连接,默认的keepAlive时间为5分钟,当然我们可以在构建OkHttpClient时设置不同的值。

(2)OkHttp3通过Deque<RealConnection>来存储连接,通过put、get等操作来管理连接。

(3)OkHttp3通过每个连接的引用计数对象StreamAllocation的计数来回收空闲的连接,向连接池添加新的连接时会触发执行清理空闲连接的任务。清理空闲连接的任务通过线程池来执行。

最后编辑于
?著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,172评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,346评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事?!?“怎么了?”我有些...
    开封第一讲书人阅读 159,788评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,299评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,409评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,467评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,476评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,262评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,699评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,994评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,167评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,827评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,499评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,149评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,387评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,028评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,055评论 2 352

推荐阅读更多精彩内容