一个socket长连接

长连接会存在以下两种情况:

  1. 一个客户端连接服务器以后,如果长期没有和服务器有数据来往,可能会被防火墙程序关闭连接,有时候我们并不想要被关闭连接。要求必须保持客户端与服务器之间的连接正常,就是我们通常所说的保活
  2. 通常情况下,服务器与某个客户端一般不是位于同一个网络,其之间可能经过数个路由器和交换机,如果其中某个必经路由器或者交换器出现了故障,并且一段时间内没有恢复,导致这之间的链路不再畅通,而此时服务器与客户端之间也没有数据进行交换,由于TCP连接是状态机,对于这种情况,无论是客户端或者服务器都无法感知与对方的连接是否正常,这类连接我们一般称之为死链

根据上面的分析,可以看到,心跳检测一般有两个作用:

  • 保活
  • 检测死链

TCP keepalive选项

熟悉socket编程的读者可能会熟悉下面的这个方法:

1
2
3
4
5
public void setKeepAlive(boolean on) throws SocketException {
if (isClosed())
throw new SocketException("Socket is closed");
getImpl().setOption(SocketOptions.SO_KEEPALIVE, Boolean.valueOf(on));
}

setKeepAlive字面意思是保持活着,这个方法也确实是提供用来保持一个TCP连接的,但是为什么我们在设计一个通信系统的时候往往不会直接使用这个方法呢,而是自己实现一个保活机制,不推荐使用SO_KEEPALIVE为什么呢?

我们通过了解TCPKeepAlive的原理,来找到这个问题的答案。

TCP内嵌有心跳包,以服务端为例,当server检测到超过一定时间(2小时)没有数据传输,那么会向client端发送一个keepalive packet,此时client端有三种反应:

  1. client端连接正常,返回一个ACK。server端收到ACK后重置计时器,在2小时后在发送探测。如果2小时内连接上有数据传输,那么在该时间的基础上向后推延2小时发送探测包
  2. 客户端异常关闭,或网络断开。client无响应,server收不到ACK,在一定时间(75秒)后重发keepalive packet, 并且重发一定次数(9次)
  3. 客户端曾经崩溃,但已经重启。server收到的探测响应是一个复位(reset),server端终止连接

注意:两个小时才会发一次。也就是说,在没有实际数据通信的时候,我把网线拔了,你的应用程序要经过两个小时才会知道。

应用层的心跳包机制设计

由于keepalive选项需要为每个连接中的socket开启,这不一定是必须的,可能会产生大量无意义的带宽浪费,且keepalive选项不能与应用层很好地交互,因此一般实际的服务开发中,还是建议读者在应用层设计自己的心跳包机制。那么如何设计呢?

假定现在有一对已经连接的socket,在以下情况发生时候,socket将不再可用:

  1. 某一端关闭是socket:主动关闭的一方会发送FIN,通知对方要关闭TCP连接。在这种情况下,另一端如果去读socket,将会读到EoF(End of File),于是我们知道对方关闭了socket
  2. 应用程序奔溃:此时socket会由内核关闭,结果跟情况1一样
  3. 系统奔溃:这时候系统是来不及发送FIN的,因为它已经跪了。此时对方无法得知这一情况。对方在尝试读取数据时,最后会返回read time out;如果写数据,则是host unreachable之类的错误(如果没有对socket进行读写,两边都不知道发生了事故)
  4. 电缆被挖断、网线被拔:跟情况3差不多,如果没有对socket进行读写,两边都不知道发生了事故。跟情况3不同的是,如果我们把网线接回去,socket依旧可以正常使用

在上面的几种情形中,有一个共同点就是,只要去读、写socket,只要socket连接不正常,我们就能够知道。基于这一点,要实现一个socket长连接,我们需要做的就是不断地给对方写数据,然后读取对方的数据,也就是所谓的心跳。只要心还在跳,socket就是活的。写数据的间隔,需要根据实际的应用需求来决定。

心跳包不是实际的业务数据,根据通信协议的不同,需要做不同的处理。比方说,我们使用JSON进行通信,那么,可以为协议包加一个type字段,表面这个JSON是心跳还是业务数据:

1
2
3
4
{
"msgType": 0, // 0 表示心跳 // 1 表示真实的通信数据
// ...
}

需要注意的是:一般是客户端主动给服务器端发送心跳包,服务器端做心跳检测决定是否断开连接。而不是反过来,从客户端的角度来说,客户端为了让自己得到服务器端的正常服务有必要主动和服务器保持连接状态正常,而服务器端不会局限于某个特定的客户端,如果客户端不能主动和其保持连接,那么就会主动回收与该客户端的连接。当然,服务器端在收到客户端的心跳包时应该给客户端一个心跳应答。

例子

我们看一个LongLiveSocket类,这个类就是长连接保活的类。也是实现长连接的一个核心类。我们就来看看这个类的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private final Runnable mHeartBeatTask = new Runnable() {
private byte[] mHeartBeat = new byte[0];

@Override
public void run() {
++mSeqNumHeartBeatSent;
// 我们使用长度为 0 的数据作为 heart beat
write(mHeartBeat, new WritingCallback() {
@Override
public void onSuccess() {
// 每隔 HEART_BEAT_INTERVAL_MILLIS 发送一次
mWriterHandler.postDelayed(mHeartBeatTask, HEART_BEAT_INTERVAL_MILLIS);
// At this point, the heart-beat might be received and handled
if (mSeqNumHeartBeatRecv < mSeqNumHeartBeatSent) {
mUIHandler.postDelayed(mHeartBeatTimeoutTask, HEART_BEAT_TIMEOUT_MILLIS);
// double check
if (mSeqNumHeartBeatRecv == mSeqNumHeartBeatSent) {
mUIHandler.removeCallbacks(mHeartBeatTimeoutTask);
}
}
}

@Override
public void onFail(byte[] data, int offset, int len) {
// nop
// write() 方法会处理失败
}
});
}
};

private final Runnable mHeartBeatTimeoutTask = () -> {
Log.e(TAG, "mHeartBeatTimeoutTask#run: heart beat timeout");
closeSocket();
};

可以看出来这个发送心跳频率包的核心方法的实现是:

  1. 有两个心跳包计数器,mSeqNumHeartBeatSent发送心跳包计数器,每次发送一个心跳包,mSeqNumHeartBeatSent+1 。mSeqNumHeartBeatRecv接收的心跳包计数器,每次接收客户端发来的心跳包mSeqNumHeartBeatRecv+1
  2. 这个心跳包的大小是0个字节,rivate byte[] mHeartBeat = new byte[0]
  3. 如果mSeqNumHeartBeatRecv < mSeqNumHeartBeatSent则认为对端断开连接,关闭socket

客户端client代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

public class EchoClient {
private static final String TAG = "EchoClient";

private final LongLiveSocket mLongLiveSocket;

public EchoClient(String host, int port) {
mLongLiveSocket = new LongLiveSocket(
host, port,
// 回调函数
(data, offset, len) -> Log.i(TAG, "EchoClient: received: " + new String(data, offset, len)),
// 发生错误的回调函数,返回 true,所以只要出错,就会一直重连
() -> true);
}

public void send(String msg) {
mLongLiveSocket.write(msg.getBytes(), new LongLiveSocket.WritingCallback() {
@Override
public void onSuccess() {
Log.d(TAG, "onSuccess: ");
}

@Override
public void onFail(byte[] data, int offset, int len) {
Log.w(TAG, "onFail: fail to write: " + new String(data, offset, len));
// 连接成功后,还会发送这个消息
mLongLiveSocket.write(data, offset, len, this);
}
});
}
}

就这样,一个带 socket 长连接的客户端就完成了。剩余代码跟我们这里的主题没有太大关系。

输出如下:

1
2
3
4
5
6
7
8
9
10
11
03:54:55.583 12691-12713/com.example.echo I/LongLiveSocket: readResponse: heart beat received
03:55:00.588 12691-12713/com.example.echo I/LongLiveSocket: readResponse: heart beat received
03:55:05.594 12691-12713/com.example.echo I/LongLiveSocket: readResponse: heart beat received
03:55:09.638 12691-12710/com.example.echo D/EchoClient: onSuccess:
03:55:09.639 12691-12713/com.example.echo I/EchoClient: EchoClient: received: hello
03:55:10.595 12691-12713/com.example.echo I/LongLiveSocket: readResponse: heart beat received
03:55:14.652 12691-12710/com.example.echo D/EchoClient: onSuccess:
03:55:14.654 12691-12713/com.example.echo I/EchoClient: EchoClient: received: echo
03:55:15.596 12691-12713/com.example.echo I/LongLiveSocket: readResponse: heart beat received
03:55:20.597 12691-12713/com.example.echo I/LongLiveSocket: readResponse: heart beat received
03:55:25.602 12691-12713/com.example.echo I/LongLiveSocket: readResponse: heart beat received

巨人的肩膀: