Jaqen's Blog

OkHttp 源码分析(三):连接机制

字数统计: 2.1k阅读时长: 10 min
2019/03/27 Share

前面两篇文章分别介绍了 OkHttp 的请求流程和缓存机制,最后这篇文章介绍 OkHttp 的连接机制,作为 OkHttp 源码分析的收尾。

建议将 OkHttp 的源码下载下来,使用 IDEA 编辑器可以直接打开阅读。我这边也将最新版的源码下载下来,进行了注释说明,有需要的可以直接从 这里 下载查看。

创建连接

OkHttp 连接的创建是通过 StreamAllocation 对象统筹完成。

它主要用来管理两个角色:

  • RealConnection:真正建立连接的对象,利用 Socket 建立连接。
  • ConnectionPool:连接池,用来管理和复用连接。

StreamAllocation 是在 RetryAndFollowUpInterceptor 中被创建,此时并未发起连接。

1
2
3
4
// RetryAndFollowUpInterceptor .intercept()

StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
createAddress(request.url()), call, eventListener, callStackTrace);

真正的连接是在处理完 Header 和缓存之后,调用 ConnectInterceptor 进行的。

1
2
3
4
5
6
7
8
// ConnectInterceptor.intercept()

StreamAllocation streamAllocation = realChain.streamAllocation();

// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
RealConnection connection = streamAllocation.connection();

这里创建了两个对象:

  • HttpCodec:用来编码 http request 和解码 http response
  • RealConnection:上文介绍了。

调用 streamAllocation 的 newStream 方法经过一系列判断最终会走到 findConnection 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
boolean foundPooledConnection = false;
RealConnection result = null;
Route selectedRoute = null;
Connection releasedConnection;
Socket toClose;
synchronized (connectionPool) {
if (released) throw new IllegalStateException("released");
if (codec != null) throw new IllegalStateException("codec != null");
if (canceled) throw new IOException("Canceled");


// 1、尝试使用已分配的连接
releasedConnection = this.connection;
toClose = releaseIfNoNewStreams();
if (this.connection != null) {
// 当前连接可用.
result = this.connection;
releasedConnection = null;
}
if (!reportedAcquired) {
// If the connection was never reported acquired, don't report it as released!
releasedConnection = null;
}

// 2、尝试从连接池中获取一个连接
if (result == null) {
// Attempt to get a connection from the pool.
Internal.instance.acquire(connectionPool, address, this, null);
if (connection != null) {
foundPooledConnection = true;
result = connection;
} else {
selectedRoute = route;
}
}
}
closeQuietly(toClose);

if (releasedConnection != null) {
eventListener.connectionReleased(call, releasedConnection);
}
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result);
}
if (result != null) {
// 如果从连接池中获取到了一个连接,就将其返回.
return result;
}

// If we need a route selection, make one. This is a blocking operation.
boolean newRouteSelection = false;
if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
newRouteSelection = true;
routeSelection = routeSelector.next();
}


synchronized (connectionPool) {
if (canceled) throw new IOException("Canceled");

if (newRouteSelection) {
// Now that we have a set of IP addresses, make another attempt at getting a connection from
// the pool. This could match due to connection coalescing.
// 根据一系列的 IP地址从连接池中获取一个链接
List<Route> routes = routeSelection.getAll();
for (int i = 0, size = routes.size(); i < size; i++) {
Route route = routes.get(i);
// 从连接池中获取一个连接
Internal.instance.acquire(connectionPool, address, this, route);
if (connection != null) {
foundPooledConnection = true;
result = connection;
this.route = route;
break;
}
}
}

// 3、如果连接池中没有可用连接,则创建一个
if (!foundPooledConnection) {
if (selectedRoute == null) {
selectedRoute = routeSelection.next();
}

// Create a connection and assign it to this allocation immediately. This makes it possible
// for an asynchronous cancel() to interrupt the handshake we're about to do.
route = selectedRoute;
refusedStreamCount = 0;
result = new RealConnection(connectionPool, selectedRoute);
acquire(result, false);
}
}

// If we found a pooled connection on the 2nd time around, we're done.
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result);
return result;
}

//4、 开始TCP以及TLS握手操作
result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
connectionRetryEnabled, call, eventListener);
routeDatabase().connected(result.route());

Socket socket = null;
synchronized (connectionPool) {
reportedAcquired = true;

// 5、将新创建的连接,放在连接池中.
Internal.instance.put(connectionPool, result);

// If another multiplexed connection to the same address was created concurrently, then
// release this connection and acquire that one.
if (result.isMultiplexed()) {
socket = Internal.instance.deduplicate(connectionPool, address, this);
result = connection;
}
}
closeQuietly(socket);

eventListener.connectionAcquired(call, result);
return result;
}

整个流程是:

  • 1、判断当前的连接是否可以使用:输入输出流没有关闭,Socket 未关闭等
  • 2、如果当前连接不可用,尝试从连接池中获取一个可用连接
  • 3、如果连接池中没有可用连接,则创建一个连接
  • 4、开始 TCP 连接以及 TLS 握手操作
  • 5、将新创建的连接加入到连接池中

连接池

网络请求时频繁地进行 Socket 连接和断开 Socket 非常消耗网络资源和浪费时间,连接复用可以提升网络访问的效率。这里就引入了连接池的概念。

OKHttp 的连接池由 ConnectionPool 实现。

ConnetionPool 内部维护了一个线程池,负责清理无效的连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public final class ConnectionPool {
private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS, new SynchronousQueue<>(), Util.threadFactory("OkHttp ConnectionPool", true));

void put(RealConnection connection) {
assert (Thread.holdsLock(this));

if (!cleanupRunning) {
cleanupRunning = true;
// 使用线程池执行清理任务
executor.execute(cleanupRunnable);
}
// 将新建的连接插入到双端队列中
connections.add(connection);
}

private final Runnable cleanupRunnable = () -> {
while (true) {
// 清理操作,返回下次需要清理的时间
long waitNanos = cleanup(System.nanoTime());
if (waitNanos == -1) return;
if (waitNanos > 0) {
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
synchronized (ConnectionPool.this) {
try {
ConnectionPool.this.wait(waitMillis, (int) waitNanos);
} catch (InterruptedException ignored) {
}
}
}
}
};
}

ConnectionPool 维护一个线程池用于清理无效的连接,清理任务由 cleanup方法完成,首先执行清理,返回下次需要清理的间隔时间,然后调用 wait 方法释放锁。等到了时间,再次进行清理操作,返回下一次清理的时间,循环往复下去。

具体看一下 cleanup 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
long cleanup(long now) {
int inUseConnectionCount = 0;
int idleConnectionCount = 0;
RealConnection longestIdleConnection = null;
long longestIdleDurationNs = Long.MIN_VALUE;

// Find either a connection to evict, or the time that the next eviction is due.
synchronized (this) {
// 遍历所有的连接
for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
RealConnection connection = i.next();

// 1、连接正在使用,即StreanAllocation的引用数量大于0
if (pruneAndGetAllocationCount(connection, now) > 0) {
inUseConnectionCount++;
continue;
}

idleConnectionCount++;

// If the connection is ready to be evicted, we're done.
// 2、如果找到了一个可以被清理的连接,会尝试去寻找闲置时间最久的连接来释放
long idleDurationNs = now - connection.idleAtNanos;
if (idleDurationNs > longestIdleDurationNs) {
longestIdleDurationNs = idleDurationNs;
longestIdleConnection = connection;
}
}

// maxIdleConnections 表示最大允许的闲置的连接的数量,keepAliveDurationNs表示连接允许存活的最长的时间。
// 默认空闲连接最大数目为5个,keepalive 时间最长为5分钟
// 3、如果空闲连接超过5个或者keepalive时间大于5分钟,则将该连接清理
if (longestIdleDurationNs >= this.keepAliveDurationNs
|| idleConnectionCount > this.maxIdleConnections) {
connections.remove(longestIdleConnection);
} else if (idleConnectionCount > 0) {
// 4、闲置的连接的数量大于0,停顿指定的时间(等会儿会将其清理掉,现在还不是时候)
return keepAliveDurationNs - longestIdleDurationNs;
} else if (inUseConnectionCount > 0) {
// All connections are in use. It'll be at least the keep alive duration 'til we run again.
///5、所有的连接都在使用中,5分钟后再清理
return keepAliveDurationNs;
} else {
// No connections, idle or in use.
//6、没有连接
cleanupRunning = false;
return -1;
}
}

closeQuietly(longestIdleConnection.socket());

// Cleanup again immediately.
return 0;
}

整体流程如下:

  • 1、遍历所有连接,查询每个连接的内部的 StreamAllocation 的引用数量,如果大于 0,表示连接正在使用,无需清理,执行下一次循环。
  • 2、如果找到了一个可以被清理的连接,会尝试去寻找闲置时间最久的连接来释放。
  • 3、如果空闲连接超过 5 个或者 keepalive 时间大于 5 分钟,则将该连接清理。
  • 4、闲置的连接的数量大于 0,返回该连接的到期时间(等会儿会将其清理掉,现在还不是时候)。
  • 5、全部都是活跃连接,5 分钟后再进行清理。
  • 6、没有任何连接,跳出循环。

RealConnection 内有一个 SteamAllocation 虚引用列表,每次创建的 StreamAllocation,都会被添加到这个列表中,如果流关闭后就将 SteamAllocation 对象从该列表中移出去,也正是利用这种计数方式判定一个连接是否为空闲连接。

查询引用计数是在 pruneAndGetAllocationCount 方法中实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private int pruneAndGetAllocationCount(RealConnection connection, long now) {
// 虚引用列表
List<Reference<StreamAllocation>> references = connection.allocations;
// 遍历虚引用列表
for (int i = 0; i < references.size(); ) {
Reference<StreamAllocation> reference = references.get(i);
//如果虚引用StreamAllocation正在被使用,则跳过进行下一次循环
if (reference.get() != null) {
i++;
continue;
}

// We've discovered a leaked allocation. This is an application bug.
StreamAllocation.StreamAllocationReference streamAllocRef =
(StreamAllocation.StreamAllocationReference) reference;
String message = "A connection to " + connection.route().address().url()
+ " was leaked. Did you forget to close a response body?";
Platform.get().logCloseableLeak(message, streamAllocRef.callStackTrace);

// 移除引用
references.remove(i);
connection.noNewStreams = true;

// If this was the last allocation, the connection is eligible for immediate eviction.
if (references.isEmpty()) {
connection.idleAtNanos = now - keepAliveDurationNs;
return 0;
}
}

return references.size();
}

参考

CATALOG
  1. 1. 创建连接
  2. 2. 连接池
  3. 3. 参考