挖掘网络库中冷门而有趣的小知识

在维护网络库时,总能遇到一些没太大用处,但是很有意思的小知识,细细碎碎又不成体系,记录一下

异步的 epoll 使用

  • 2015.5.22 整理:

    epoll 下 LT 和 ET 的处理都是大致相同的

    • LT 模式

      读 buff 有数据 / 写 buff 有空间,就触发

    • ET 模式

      读 buff 有数据,且数据减少或调用 epoll_mod 时 / 写 buff 空间增加或调用 epoll_mod 时,才触发

    LT 模式例子:

    https://www.cnblogs.com/lojunren/p/3856290.html

    https://github.com/hurley25/ANet

    https://juejin.im/post/5ab3c5acf265da2380598efa

    https://www.zhihu.com/question/22840801

    https://blog.codingnow.com/2012/04/mread.html

    在 ET 模式中,需要主动把数据读完或者写满:

    • 读处理是一直 read

      返回 - 1,检查 errno,如果是 EAGAIN 那么不再读(缓冲区读完),如果是其他那么说明连接出错,进行报错然后也不再读。

      返回 0,说明对端关闭

      返回大于 0,成功读到数据

    • 写处理是一直 write,直到数据写完

      返回 - 1,检查 errno,如果是 EAGAIN 那么不再写(缓冲区写完),如果是其他那么说明连接出错,进行报错然后也不再写。

      返回大于 0,成功写数据

在使用 tcp 时,内核的 tcp 上存在读写缓冲区,上层 app 通过这个缓冲区来和实际的网络进行通信

app <=> 内核tcp <=> network

当读缓冲区有数据时,epoll 就会通知 READ 就绪事件,让上层 app 去读,当应用一直不去读,就会导致接收窗口为 0,通知发送方不要再发送了

当写缓冲区空闲时,epoll 就会通知 WRITE 就绪事件,让上层 app 去写,当写满缓冲区,存在几个可能:

  • 缓冲区不够大
  • 本地拥塞窗口限制
  • 就是刚才说的,对端应用层读太慢,接收窗口限制

因此当建立一条链接后,写事件总是就绪的,可以直接写入

tcp read < 0 处理细节

在 muduo 库中,< 0 直接被无视,根据 https://github.com/chenshuo/muduo/issues/314 的回答

read () 如果由于对方 RST 而返回 -1,那么这个 fd 会保持 readable 状态,下一次 read () 会返回 0,然后就走正常关闭连接的流程了。

但是在其他库中 < 0 并且排除掉 EAGAIN 会直接关闭描述符

因此我认为 muduo 库应该是偷懒了,确实这种写法会简洁一些,但是性能也变差了

close fd 时 epoll 是否需要 remove

根据 Is it necessary to deregister a socket from epoll before closing it?

中提到 man epoll 中 Q6 写到

Q6 Will closing a file descriptor cause it to be removed from all epoll sets automatically?

Q6 关闭文件描述符会自动从所有 epoll 集中移除吗?

A6 Yes, but be aware of the following point. A file descriptor is a reference to an open file description (see open(2)). Whenever a descriptor is duplicated via dup(2), dup2(2), fcntl(2) F_DUPFD, or fork(2), a new file descriptor referring to the same open file description is created. An open file description continues to exist until all file descriptors referring to it have been closed. A file descriptor is removed from an epoll set only after all the file descriptors referring to the underlying open file description have been closed (or before if the descriptor is explicitly removed using epoll_ctl(2) EPOLL_CTL_DEL). This means that even after a file descriptor that is part of an epoll set has been closed, events may be reported for that file descriptor if other file descriptors referring to the same underlying file description remain open.

A6 会,但是需要注意以下几点。文件描述符是指向打开的文件描述(参见 open (2))的引用。每当通过 dup (2)、dup2 (2)、fcntl (2) F_DUPFD 或 fork (2) 复制描述符时,会创建一个新的文件描述符,指向同一个打开的文件描述。打开的文件描述将继续存在,直到指向它的所有文件描述符都被关闭。只有在所有关联到底层打开文件描述的文件描述符被关闭(或者在这之前明确地使用 epoll_ctl (2) EPOLL_CTL_DEL 移除)后,文件描述符才会从 epoll 集中移除。这意味着即使 epoll 集中的某个文件描述符已经被关闭,只要仍有其他文件描述符指向同一个底层的文件描述并保持打开,那么可能仍会报告该文件描述符的事件。

TcpConnection 为什么要用智能指针管理

muduo 库的书中 4.7 节提到

在非阻塞网络编程中,我们常常要面临这样一种场景:从某个 TCP 连接 A 收到了一个 request,程序开始处理这个 request; 处理可能要花一定的时间,为了避免耽误(阻塞)处理其他 request,程序记住了发来 request 的 TCP 连接,在某个线程池中处理这个请求;在处理完之后,会把 response 发回 TCP 连接 A。但是,在处理 request 的过程中,客户端断开了 TCP 连接 A,而另一个客户端刚好创建了新连接 B。我们的程序不能只记住 TCP 连接 A 的文件描述符,而应该持有封装 socket 连接的 TcpConnection 对象,保证在处理 request 期间 TCP 连接 A 的文件描述符不会被关闭。或者持有 TcpConnection 对象的弱引用 (weak_ptr),这样能知道 socket 连接在处理 request 期间是否已经关闭了,fd=8 的文件描述符到底是 “前世” 还是 “今生”。

否则的话,l 旧的 TCP 连接 A 一断开,TcpConnection 对象销毁,关闭了旧的文件描述符(RAI),而且新连接 B 的 socket 文件描述符有可能等于之前断开的 TCP 连接(这是完全可能的,POSIX 要求每次新建文件描述符时选取当前最小的可用的整数)。当程序处理完旧连接的 request 时,就有可能把 response ' 发给新的 TCP 连接 B,造成串话。

为了应对这种情况,防止访问失效的对象或者发生网络串话,muduo 使用 shared_ptr 来管理 TcpConnection 的生命期。这是唯一一个采用引用计数方式管理生命期的对象。如果不用 shared_ptr,我想不出其他安全且高效的办法来管理多线程网络服务端程序中的并发连接。

书中提到的例子是 TcpConnection 被转发到了一个业务线程池中去完成,因此引入了多线程问题,需要用智能指针来解决生命周期问题,这个概念其实不好理解,muduo 库书中的实现本身是单线程的

我在引入协程的时候,发现用协程视角去解读更好

1
2
3
4
5
6
7
CoTcpConnection::Read(...) {
auto curCoro = CoroutineScheduler::currentCoroScheduler()->currentCoro();
connection_->AsyncRead(len, [curCoro](...) {
CoroutineScheduler::currentCoroScheduler()->resume(curCoro);
});
CoroutineScheduler::currentCoroScheduler()->suspend();
}

从协程角度来看,在主协程 AsyncRead 读到 < 0 的时候,就会要求 Server 或者 Client 删除持有的 TcpConnection

但是这个时候另外一个协程还持有了 TcpConnection,如果 Server 和 Client 不使用智能指针直接析构,会导致其他协程 core 掉

为什么 Muduo TcpConnection 没有提供 close,而只提供 shutdown

这么做是为了收发数据的完整性。

TCP 是一个全双工协议,同一个文件描述符既可读又可写, shutdownWrite () 关闭了 “写” 方向的连接,保留了 “读” 方向,这称为 TCP half-close。如果直接 close (socket_fd),那么 socket_fd 就不能读或写了。

用 shutdown 而不用 close 的效果是,如果对方已经发送了数据,这些数据还 “在路上”,那么 muduo 不会漏收这些数据。换句话说,muduo 在 TCP 这一层面解决了 “当你打算关闭网络连接的时候,如何得知对方有没有发了一些数据而你还没有收到?” 这一问题。当然,这个问题也可以在上面的协议层解决,双方商量好不再互发数据,就可以直接断开连接。

等于说 muduo 把 “主动关闭连接” 这件事情分成两步来做,如果要主动关闭连接,它会先关本地 “写” 端,等对方关闭之后,再关本地 “读” 端。练习:阅读代码,回答 “如果被动关闭连接,muduo 的行为如何?” 提示:muduo 在 read () 返回 0 的时候会回调 connection callback,这样客户代码就知道对方断开连接了。

Muduo 这种关闭连接的方式对对方也有要求,那就是对方 read () 到 0 字节之后会主动关闭连接(无论 shutdownWrite () 还是 close ()),一般的网络程序都会这样,不是什么问题。当然,这么做有一个潜在的安全漏洞,万一对方故意不不关,那么 muduo 的连接就一直半开着,消耗系统资源。

完整的流程是:我们发完了数据,于是 shutdownWrite,发送 TCP FIN 分节,对方会读到 0 字节,然后对方通常会关闭连接,这样 muduo 会读到 0 字节,然后 muduo 关闭连接。(思考题,在 shutdown () 之后,muduo 回调 connection callback 的时间间隔大约是一个 round-trip time,为什么?)

另外,如果有必要,对方可以在 read () 返回 0 之后继续发送数据,这是直接利用了 half-close TCP 连接。muduo 会收到这些数据,通过 message callback 通知客户代码。

那么 muduo 什么时候真正 close socket 呢?在 TcpConnection 对象析构的时候。TcpConnection 持有一个 Socket 对象,Socket 是一个 RAII handler,它的析构函数会 close (sockfd_)。这样,如果发生 TcpConnection 对象泄漏,那么我们从 /proc/pid/fd/ 就能找到没有关闭的文件描述符,便于查错。

muduo 在 read () 返回 0 的时候会回调 connection callback,然后把 TcpConnection 的引用计数减一,如果 TcpConnection 的引用计数降到零,它就会析构了。

一般的服务不会这么实现,是因为协议上已经约定了 client 在 write,read 后 close 的时机

另外一个原因是,在 rpc 服务来说,会复用 tcp 连接,无序的传输多个 rpc 请求的包,这种情况下一般不需要关闭连接,除非出错了需要整个 close 连接

没有 Connect 或者 Bind 的 socket,可以触发 epoll_wait

在分析 tars 源码的时候,发现在通知网络线程的时候,对没有实际连接到任何端点的描述符,有时候是 EPOLLIN,有时候是 EPOLLOUT,然后不需要读写。这样居然能唤醒网络线程?

这就超出我知识盲区了,我在自己的 rpc 框架中写过类似代码,但是是把管道加入了 epoll 中,对其读写来唤醒的

根据 tc_epoller.h 写了一下测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#include <sys/types.h>
#include <sys/socket.h>
#include <thread>
#include <unistd.h>
#include <iostream>

#include "tc_epoller.h"

using namespace std;

int main() {
tars::TC_Epoller ep;
ep.create(1000);

auto sock = socket(AF_INET, SOCK_STREAM, 0);

std::thread t([&]() {
int num = ep.wait(-1);
for (int i = 0; i < num; ++i) {
const epoll_event& ev = ep.get(i);
cout << ev.data.u64 << endl;
cout << ev.events << endl;
}
});

sleep(1);

ep.add(sock, 100, EPOLLIN);

t.join();
}

编译运行

1
2
3
4
root:~/tars/epoll_test# g++ -std=c++11 -g -Wall main.cpp tc_epoller.cpp -o out.exe -lpthread
root:~/tars/epoll_test# ./out.exe
100
16

确实被唤醒了,读到了写入的数据 100,唤醒的事件是 16,也就是 EPOLLHUP。

查了一下资料,即使对于阻塞模式下也确实如此 Why am I getting the EPOLLHUP event on a brand new socket

See my other comment with the pastebin code. An uninitialized (i.e. before connect / listen) socket always seems to cause EPOLLHUP (at least when in blocking mode)

嗯,又学到了一手

非阻塞 Connect 必须通过 getsockopt 来检查是否连接成功吗

一些知名的开源库,客户端在 connect () 后总是通过 getsockopt () 来检查是否连接成功

例如 redis

src/socket.c#L104 入口是为了连接在 connSocketConnect 为 ae_handler 回调创建 epoll 事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static int connSocketConnect(connection *conn, const char *addr, int port, const char *src_addr,
ConnectionCallbackFunc connect_handler) {
int fd = anetTcpNonBlockBestEffortBindConnect(NULL,addr,port,src_addr);
if (fd == -1) {
conn->state = CONN_STATE_ERROR;
conn->last_errno = errno;
return C_ERR;
}

conn->fd = fd;
conn->state = CONN_STATE_CONNECTING;

conn->conn_handler = connect_handler;
aeCreateFileEvent(server.el, conn->fd, AE_WRITABLE,
conn->type->ae_handler, conn);

return C_OK;
}

src/socket.c#L402 ae_handler 回调会设置成 connSocketEventHandler

1
.ae_handler = connSocketEventHandler,

src/socket.c#L257 在 epoll 触发 EPOLLOUT 时,通过 anetGetError 检查连接状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
static void connSocketEventHandler(struct aeEventLoop *el, int fd, void *clientData, int mask)
{
UNUSED(el);
UNUSED(fd);
connection *conn = clientData;

if (conn->state == CONN_STATE_CONNECTING &&
(mask & AE_WRITABLE) && conn->conn_handler) {

int conn_error = anetGetError(conn->fd);
if (conn_error) {
conn->last_errno = conn_error;
conn->state = CONN_STATE_ERROR;
} else {
conn->state = CONN_STATE_CONNECTED;
}

src/anet.c#L65

1
2
3
4
5
6
7
8
int anetGetError(int fd) {
int sockerr = 0;
socklen_t errlen = sizeof(sockerr);

if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1)
sockerr = errno;
return sockerr;
}

例如 tars

在 epoll_wait () 中触发 EPOLLOUT,这个事件分配给 handleOutputImp (),在这里发送请求 doRequest (),在正式发送请求之前会通过 checkConnect () 来检查是否连接上

util/src/tc_transceiver.cpp#L286

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void TC_Transceiver::checkConnect()
{
//检查连接是否有错误
if (isConnecting())
{
int iVal = 0;
SOCKET_LEN_TYPE iLen = static_cast<SOCKET_LEN_TYPE>(sizeof(int));
int ret = ::getsockopt(_fd, SOL_SOCKET, SO_ERROR, reinterpret_cast<char*>(&iVal), &iLen);

if (ret < 0 || iVal)
{
string err = TC_Exception::parseError(iVal);
THROW_ERROR(TC_Transceiver_Exception, CR_Connect, "connect " + _desc + " error:" + err);
}

例如 nginx

ngx_http_upstream_test_connectngx_http_upstream_send_requestngx_http_upstream_send_request_handlerepollngx_event_connect_peerngx_http_upstream_connectngx_http_upstream_test_connectngx_http_upstream_send_requestngx_http_upstream_send_request_handlerepollngx_event_connect_peerngx_http_upstream_connectloop[epoll事件循环]alt[rc == NGX_ERROR][rc == NGX_OK,一次就connect成功][rc == NGX_AGAIN]rc = ngx_event_connect_peer(&u->peer)1s = ngx_socket(pc->sockaddr->sa_family, type, 0)bind(s, pc->local->sockaddr, pc->local->socklen)rc = connect(s, pc->sockaddr, pc->socklen)2return rc3u->write_event_handler = ngx_http_upstream_send_request_handler4失败,直接返回NGX_HTTP_INTERNAL_SERVER_ERRORngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR)5ngx_http_upstream_send_request(r, u, 1)6ngx_http_upstream_test_connect(c)7getsockopt(c->fd, SOL_SOCKET, SO_ERROR, (void *) &err, &len)8return9return10加入超时等待epoll触发ngx_add_timer(c->write, u->conf->connect_timeout)11触发写事件12ngx_http_upstream_send_request(r, u, 1)13ngx_http_upstream_test_connect(c)14getsockopt(c->fd, SOL_SOCKET, SO_ERROR, (void *) &err, &len)15return16return17
View on mermaid.live

src/http/ngx_http_upstream.c#L1508

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
static void
ngx_http_upstream_connect(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
...
//connect封装
rc = ngx_event_connect_peer(&u->peer);
...
//设置epoll写事件回调
u->write_event_handler = ngx_http_upstream_send_request_handler;
u->read_event_handler = ngx_http_upstream_process_header;
...
//如果写失败,直接返回NGX_HTTP_INTERNAL_SERVER_ERROR
if (rc == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
...
//如果返回重试,那么进入epoll等待触发读写回调
if (rc == NGX_AGAIN) {
ngx_add_timer(c->write, u->conf->connect_timeout);
return;
}
...
//如果返回成功,进入ngx_http_upstream_send_request下一流程
ngx_http_upstream_send_request(r, u, 1);
}

src/event/ngx_event_connect.c#L21

1
2
3
4
5
6
7
8
9
10
11
12
ngx_int_t
ngx_event_connect_peer(ngx_peer_connection_t *pc)
{
s = ngx_socket(pc->sockaddr->sa_family, type, 0);
//绑定本地配置地址
if (bind(s, pc->local->sockaddr, pc->local->socklen) == -1) {
ngx_log_error(NGX_LOG_CRIT, pc->log, ngx_socket_errno,
"bind(%V) failed", &pc->local->name);
goto failed;
}
rc = connect(s, pc->sockaddr, pc->socklen);
}

src/http/ngx_http_upstream.c#L2292

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
static void
ngx_http_upstream_send_request_handler(ngx_http_request_t *r,
ngx_http_upstream_t *u)
{
ngx_connection_t *c;

c = u->peer.connection;

ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"http upstream send request handler");

if (c->write->timedout) {
ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_TIMEOUT);
return;
}

if (u->header_sent && !u->conf->preserve_output) {
u->write_event_handler = ngx_http_upstream_dummy_handler;

(void) ngx_handle_write_event(c->write, 0);

return;
}

ngx_http_upstream_send_request(r, u, 1);
}

src/http/ngx_http_upstream.c#L2059

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
static void
ngx_http_upstream_send_request(ngx_http_request_t *r, ngx_http_upstream_t *u,
ngx_uint_t do_write)
{
ngx_int_t rc;
ngx_connection_t *c;

c = u->peer.connection;

ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
"http upstream send request");

//统计连接时间
if (u->state->connect_time == (ngx_msec_t) -1) {
u->state->connect_time = ngx_current_msec - u->start_time;
}

//如果没有发送成功过request(request_header + request_body)
//那么通过ngx_http_upstream_test_connect检查连接
if (!u->request_sent && ngx_http_upstream_test_connect(c) != NGX_OK) {
ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
return;
}

c->log->action = "sending request to upstream";

rc = ngx_http_upstream_send_request_body(r, u, do_write);

...

src/http/ngx_http_upstream.c#L2743

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
static ngx_int_t
ngx_http_upstream_test_connect(ngx_connection_t *c)
{
int err;
socklen_t len;

{
err = 0;
len = sizeof(int);

/*
* BSDs and Linux return 0 and set a pending error in err
* Solaris returns -1 and sets errno
*/

if (getsockopt(c->fd, SOL_SOCKET, SO_ERROR, (void *) &err, &len)
== -1)
{
err = ngx_socket_errno;
}

if (err) {
c->log->action = "connecting to upstream";
(void) ngx_connection_error(c, err, "connect() failed");
return NGX_ERROR;
}
}

return NGX_OK;
}

测试 demo

由于开源项目都是这么实现的,所以我认为 getsockopt () 是必须项目

在没有 connect () 成功时,有可能产生 EPOLLOUT 的事件,需要通过 getsockopt () 检查一下有没有连接成功,才能 write,否则直接 write 会导致失败

但是同事写的一个简单的 http 客户端,connect 成功以后没有 getsockopt () 检查,也运行的很正常

因此我写了一个 demo 测试一下

python_http_server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from http.server import BaseHTTPRequestHandler, HTTPServer

class MyHTTPRequestHandler(BaseHTTPRequestHandler):
def do_GET(self):
self.send_response(200)
self.send_header('Content-type','text/html')
self.end_headers()
self.wfile.write(bytes("Hello", "utf8"))
return

def run():
server_address = ('', 8000)
httpd = HTTPServer(server_address, MyHTTPRequestHandler)
print('Starting http server...')
httpd.serve_forever()

run()

8000 端口添加 1200ms 延迟

1
2
3
4
5
6
7
8
9
10
11
12
13
#	在lo设备上添加了一个新的队列规则(qdisc),类型为prio。prio类型的队列规则创建了一个具有3个带宽队列的优先级队列规则
# root handle 1:指定这个队列规则是该设备的根队列规则,并且分配给它一个handle编号为1
~# tc qdisc add dev lo root handle 1: prio

# 创建的prio队列规则下添加了一个过滤器,它使用u32选择器匹配目标端口为8000的IP数据包
# parent 1:0说明这个过滤器附加于handle 1:队列规则上
# match ip dport 8000 0xffff指定匹配目标端口8000的包,0xffff是端口匹配的掩码
# flowid 2:1将匹配成功的数据包重定向到2:1这个handle的队列规则处理。
~# tc filter add dev lo parent 1:0 protocol ip prio 1 u32 match ip dport 8000 0xffff flowid 2:1

# 在prio队列规则(handle为1:1)的第1个带宽队列下创建了一个netem(网络模拟)队列规则(handle编号2:),并为其设置了1200毫秒的延迟
# netem用于模拟网络属性,如延迟、丢包、抖动等。在这个示例中,它将1200毫秒的延迟添加到所有被过滤器策略选择并且流向2:1队列的数据包上。
~# tc qdisc add dev lo parent 1:1 handle 2: netem delay 1200ms

基于 epoll 的简单 http 客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
//simple_epoll.cpp
#include <ostream>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <fcntl.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <errno.h>
#include <iostream>
#include <chrono>
#include <ctime>
#include <sstream>
#include <iomanip>
#include <thread>
#include <signal.h>

std::string getCurrentTimeString() {
using namespace std::chrono;

system_clock::time_point tp = system_clock::now();
std::time_t current_time = system_clock::to_time_t(tp);
milliseconds msec = duration_cast<milliseconds>(tp.time_since_epoch()) % 1000;
std::tm* t = std::localtime(&current_time);
std::stringstream ss;
ss << std::put_time(t, "%Y-%m-%d %H:%M:%S");
ss << '.' << msec.count();

return ss.str();
}

#define LOG std::cout << getCurrentTimeString() << " " << __FILE__ << ":" << __LINE__ << " "

class Connection {
public:
Connection() : fd_(socket(AF_INET, SOCK_STREAM, 0)) {
set_nonblock(fd_);
}
~Connection() {
close();
}
int fd() const { return fd_; }
std::string connect(const std::string& host, int port) {
auto [errMsg, addr] = parseAddr(host, port);
if (!errMsg.empty()) {
return errMsg;
}
if (::connect(fd_, &addr, sizeof(addr)) < 0 && errno != EINPROGRESS) {
return strerror(errno);
}
return "";
}
void close() {
if (fd_ != -1) {
::close(fd_);
fd_ = -1;
}
}
virtual void read() = 0;
virtual void write() = 0;
virtual void error(const std::string &errMsg) = 0;
std::string getErr() {
std::string errMsg;
int error;
socklen_t len = sizeof(error);
getsockopt(fd_, SOL_SOCKET, SO_ERROR, &error, &len);
errMsg = strerror(error);
return errMsg;
}
private:
void set_nonblock(int fd) {
int flags = fcntl(fd, F_GETFL, 0);
fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}
std::pair<std::string, struct sockaddr> parseAddr(const std::string& ip,
int port) {
std::pair<std::string, struct sockaddr> result;
auto& [errMsg, addr] = result;
struct sockaddr_in* p = (struct sockaddr_in*)&addr;
struct in_addr stSinAddr;
int iRet = inet_pton(AF_INET, ip.c_str(), &(stSinAddr));
if (iRet < 0) {
errMsg = "inet_pton error";
return result;
}
if (iRet == 0) {
struct hostent stHostent;
struct hostent* pstHostent;
char buf[2048] = "\0";
int iError;

gethostbyname_r(ip.c_str(), &stHostent, buf, sizeof(buf),
&pstHostent, &iError);
if (pstHostent == NULL) {
errMsg =
"gethostbyname_r error" + std::string(hstrerror(iError));
return result;
}
stSinAddr = *(struct in_addr*)pstHostent->h_addr;
}

bzero(p, sizeof(*p));
p->sin_family = AF_INET;
p->sin_port = htons(port);
p->sin_addr = stSinAddr;

return result;
}
int fd_ = -1;
};

class Epoller {
public:
int add(Connection &connection, uint32_t event) {
struct epoll_event ev;
ev.data.u64 = reinterpret_cast<uint64_t>(&connection);
ev.events = event | EPOLLET;

return epoll_ctl(epollFd_, EPOLL_CTL_ADD, connection.fd(), &ev);
}
void run() {
while (1) {
int nfds = epoll_wait(epollFd_, events_, MAX_EVENTS, 100);
for (int n = 0; n < nfds; ++n) {
auto event = events_[n];
Connection *conection = reinterpret_cast<Connection*>(event.data.u64);
if (event.events & (EPOLLERR | EPOLLHUP)) {
auto errMsg = conection->getErr();
conection->error(errMsg);
conection->close();
continue;
}
if (event.events & EPOLLIN) {
conection->read();
}
if (event.events & EPOLLOUT) {
conection->write();
}
}
}
}
private:
int epollFd_ = epoll_create1(0);
static constexpr int MAX_EVENTS = 5;
struct epoll_event events_[MAX_EVENTS];
};

class HttpConnection : public Connection {
public:
HttpConnection() {
memset(buf_, 0, sizeof(buf_));
}
private:
void read() {
LOG << "EPOLLIN" << std::endl;
int ret = ::read(fd(), buf_ + readed, sizeof(buf_) - readed);
if (ret > 0) {
readed += ret;
if (strstr(buf_, "\r\n\r\n") != NULL) {
LOG << buf_ << std::endl;
close();
}
}
}
void write() {
LOG << "EPOLLOUT" << std::endl;
const char* req = "GET / HTTP/1.0\r\n\r\n";
if (wrote >= strlen(req)) {
return;
}
int ret = ::write(fd(), req + wrote, strlen(req) - wrote);
if (ret > 0) {
wrote += ret;
}
}
void error(const std::string &errMsg) {
LOG << "EPOLLERR | EPOLLHUP | " << errMsg << std::endl;
}
uint32_t wrote = 0;
int readed = 0;
char buf_[102400];
};

int main() {
signal(SIGPIPE, SIG_IGN);
signal(SIGHUP, SIG_IGN);
HttpConnection connection;
LOG << "connecting" << std::endl;
auto errMsg = connection.connect("127.0.0.1", 8000);
if (!errMsg.empty()) {
LOG << "connect error: " << errMsg << std::endl;
return EXIT_FAILURE;
}

Epoller epoller;
if (epoller.add(connection, EPOLLIN | EPOLLOUT)) {
LOG << "epoll_ctl error: " << strerror(errno) << std::endl;
return EXIT_FAILURE;
}

std::thread t([&]() {
epoller.run();
});
t.detach();

sleep(5);

return EXIT_SUCCESS;
}
1
~# g++ -std=c++17 -g -Wall simple_epoll.cpp -o out.exe -lpthread

测试连接失败

1
2
3
~# ./out.exe
2023-12-02 17:20:01.602 /root/cpp_test/simple_epoll.cpp:191 connecting
2023-12-02 17:20:02.802 /root/cpp_test/simple_epoll.cpp:180 EPOLLERR | EPOLLHUP | Connection refused

在 connect () 调用 1200ms 以后,由于对端拒绝触发 EPOLLERR 或 EPOLLHUP,报错 Connection refused

中间不会产生任何的虚假唤醒

测试连接成功

1
2
3
4
5
6
7
8
9
10
11
~# ./out.exe
2023-12-02 17:22:01.176 /root/cpp_test/simple_epoll.cpp:191 connecting
2023-12-02 17:22:02.376 /root/cpp_test/simple_epoll.cpp:169 EPOLLOUT
2023-12-02 17:22:03.577 /root/cpp_test/simple_epoll.cpp:158 EPOLLIN
2023-12-02 17:22:03.577 /root/cpp_test/simple_epoll.cpp:163 HTTP/1.0 200 OK
Server: BaseHTTP/0.6 Python/3.8.12
Date: Sat, 02 Dec 2023 09:22:03 GMT
Content-type: text/html

Hello
2023-12-02 17:22:03.577 /root/cpp_test/simple_epoll.cpp:169 EPOLLOUT

在 connect () 调用 1200ms 以后,触发 EPOLLOUT 事件,在这中间也不会触发任何事件,导致 read () 或者 write () 失败

结论

只要正确的使用 epoll,那么 EPOLLOUT 事件是只有当描述符可写的时候才会触发的,因此 getsockopt () 检查是否出错,来判断是否连接成功并不是必须的

事实上,如果不正确的使用 epoll,那么即使 getsockopt () 检查出没有错误,也不意味着连接成功,见下面这个例子

golang 的 bug

epoll bug with connect

net: connect after polling initialization

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20161 1406175892.180009 epoll_wait(4,  <unfinished ...>

20159 1406175892.180165 epoll_ctl(4, EPOLL_CTL_ADD, 34, {EPOLLIN|EPOLLOUT|EPOLLET|0x2000, {u32=3362912440, u64=140405843818680}}) = 0

20161 1406175892.180246 <... epoll_wait resumed> {{EPOLLOUT|EPOLLHUP, {u32=3362912440, u64=140405843818680}}}, 128, 4294967295) = 1

20159 1406175892.180290 connect(34, {sa_family=AF_INET, sin_port=htons(5678), sin_addr=inet_addr("x.x.x.x")}, 16 <unfinished ...>

20161 1406175892.180329 epoll_wait(4, <unfinished ...>

20159 1406175892.180359 <... connect resumed> ) = -1 EINPROGRESS (Operation now in progress)

20159 1406175892.180376 getsockopt(34, SOL_SOCKET, SO_ERROR, [0], [4]) = 0

20159 1406175892.180745 write(34, "hi", 2) = -1 EAGAIN (Resource temporarily unavailable)

20161 1406175892.193698 epoll_wait(4, {{EPOLLIN|EPOLLOUT|EPOLLERR|EPOLLHUP|0x2000, {u32=3362912440, u64=140405843818680}}, {EPOLLIN|EPOLLOUT|EPOLLERR|EPOLLHUP|0x2000, {u32=3362912088, u64=140405843818328}}, {EPOLLIN|EPOLLOUT|EPOLLERR|EPOLLHUP|0x2000, {u32=3362918640, u64=140405843824880}}, {EPOLLIN|EPOLLOUT|EPOLLERR|EPOLLHUP|0x2000, {u32=3362911736, u64=140405843817976}}, {EPOLLIN|EPOLLOUT|EPOLLERR|EPOLLHUP|0x2000, {u32=3362917056, u64=140405843823296}}, {EPOLLIN|EPOLLOUT|EPOLLERR|EPOLLHUP|0x2000, {u32=3362917936, u64=140405843824176}}, {EPOLLIN|EPOLLOUT|EPOLLERR|EPOLLHUP|0x2000, {u32=3362915432, u64=140405843821672}}, {EPOLLIN|EPOLLOUT|EPOLLERR|EPOLLHUP|0x2000, {u32=3362922160, u64=140405843828400}}, {EPOLLIN|EPOLLOUT|EPOLLERR|EPOLLHUP|0x2000, {u32=3362916880, u64=140405843823120}}, {EPOLLIN|EPOLLOUT|EPOLLERR|EPOLLHUP|0x2000, {u32=3362918112, u64=140405843824352}}, {EPOLLIN|EPOLLOUT|EPOLLERR|EPOLLHUP|0x2000, {u32=3362911560, u64=140405843817800}}, {EPOLLIN|EPOLLOUT|EPOLLERR|EPOLLHUP|0x2000, {u32=3362912616, u64=140405843818856}}, {EPOLLIN|EPOLLOUT|EPOLLERR|EPOLLHUP|0x2000, {u32=3362918288, u64=140405843824528}}, {EPOLLIN|EPOLLOUT|EPOLLERR|EPOLLHUP|0x2000, {u32=3362921632, u64=140405843827872}}, {EPOLLIN|EPOLLOUT|EPOLLERR|EPOLLHUP|0x2000, {u32=3362913672, u64=140405843819912}}, {EPOLLIN|EPOLLOUT|EPOLLERR|EPOLLHUP|0x2000, {u32=3362919168, u64=140405843825408}}, {EPOLLIN|EPOLLOUT|EPOLLERR|EPOLLHUP|0x2000, {u32=3362912792, u64=140405843819032}}}, 128, 0) = 17

20161 1406175892.196158 write(34, "hi", 2) = -1 ECONNREFUSED (Connection refused)

goalng 的这个版本在 connect () 之前,就把描述符加入了 epoll,导致另外一个线程的 epoll_wait () 被虚假唤醒

在 getsockopt () 检查了没有任何错误以后(此时还在连接中,对端还未拒绝),就开始 write () 了(还在连接中因此返回了 EAGAIN)

由于下一次 write () 不会再用 getsockopt () 检查错误,因此导致返回了 "Connection refused"

常见连接状态

tcp 的关闭连接已经是常识了,如果不太了解可以在这篇博文补充基本知识

很容易遇到的两个状态就是 CLOSE_WAIT 和 TIME_WAIT

CLOSE_WAIT 和 FIN_WAIT2

前者从状态图就能看出来,CLOSE_WAIT 是收到 fin 以后,应用层没有正确处理,调用 close,导致没有发出 FIN 包,是程序 bug 导致的

对应的 FIN_WAIT2 就是主动发起方的状态,可以通过修改 /proc/sys/net/ipv4/tcp_fin_timeout 的值来设定这个状态的超时时间

TIME_WAIT

由于只要关闭连接就会出现,默认会持续 60 秒,这可能是最常见的状态了,当服务器上有大量连接,就很容易因为太多的 TIME_WAIT 导致没有端口可用

有两种办法可以解决:SO_REUSEADDRSO_LINGER

SO_REUSEADDR

SO_REUSEADDR 和 SO_REUSEPORT 可以重用这个地址和端口,这其实很危险

所以一般只有服务端 accept 的端口才会使用,这是因为服务端停止会 close 掉所有的连接,此时会造成很多的 TIME_WAIT 状态

必须使用 SO_REUSEADDR 和 SO_REUSEPORT 才能让服务端立刻重启

SO_LINGER

SO_LINGER 要复杂很多,用法如下

1
2
3
4
5
6
7
8
/*
struct linger {
int l_onoff;
int l_linger;
};
*/
struct linger ling;
setsockopt(sockfd, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling));

可以看到,linger 存在两个属性,虽然是 int,其实是 bool 的语义,含义如下:

l_onoff l_linger closesocket 行为 发送队列 底层行为 备注
忽略 立即返回 保持直至发送完成 系统接管套接字并保证将数据发送至对端 默认行为
非零 立即返回 立即放弃 直接发送 RST 包,自身立即复位,不用经过 2MSL 状态
非零 非零 阻塞直到 l_linger 时间超时或数据发送完成 在超时时间段内保持尝试发送,若超时则立即放弃 超时则同第二种情况,若发送完成则皆大欢喜 套接字必须设置为阻塞

由于第三行是只有阻塞的描述符才可以使用的,所以除去默认行为以外,只有第二行的参数可以使用

在这种情况下,close 会直接发送 rst,让描述符直接进入 CLOSED 的最终状态,不会再产生 TIME_WAIT

当没有数据想要发送时,直接发送 RST 就不会导致任何问题

对于 ping-pong 的一请求一回答的场合,客户端需要主动关闭连接就可以使用这种方式

服务端一般而言不会主动关闭连接,但是会在长连接空闲过久时关闭,此时也可以使用这种方式

Delay ACK 和 Nagle 算法

这篇描述的更细致:再多来点 TCP 吧:Delay ACK 和 Nagle 算法

我做一个总结

Delay ACK

Delay ACK 假设如果收到一个包,那么应用层会需要对这个包做出回应,等一小段时间(默认 200ms),应用层写入数据以后再一起返回,直到超时了,才回复 ACK

这是通过在 TCP segment 设置 ACK 标记来实现的(FIN 也可以同时设置这个标记,使得四次挥手实际上只需要三次)

关闭 Delay Ack:

1
2
int flag = 1;
setsockopt(sockfd, IPPROTO_TCP, TCP_QUICKACK, (char *)&flag, sizeof(flag));

Delay Ack 默认就是关闭的

Nagle 算法

Nagle 算法是为了解决每次发送一点内容就立刻发送的话,20 字节的 IP 头和 20 字节的 TCP 头太浪费的问题

简单来说,就是如果要发送的内容足够一个 MSS 了,就立即发送。否则,每次收到对方的 ACK 才发送下一次数据

关闭办法:

1
2
int flag = 1;
setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag));

这样可以关闭

Nagle 踩坑

当在广域网使用 Nagle 时,很容易因为过长的 rtt 等待对方的 ACK 导致吞吐量下降

我朋友在跨国专线中使用 kafka 客户端时发现每个 tcp 包都间隔很久才发送

根据 librdkafka 的文档发现

socket.nagle.disable 这个选项默认值是 false,也就是默认开启了 nagle 算法

网上也能看到对这个选项的讨论

Optimizing Kafka producers for latency

甚至还有提交 pull request 尝试修复这个问题的 Disable Nagle algorithm by default

但是最终没有合进主分支

Delay ACK + Nagle 必踩坑

当 A 使用了 Nagle 算法,B 打开了 Delay ACK,

A 的一大串内容的最后一段不到一个 MSS 包,需要等待对方 ACK 才发送时

B 只是个接收方,不想发送任何内容,直到 B 的 Delay ACK 超时,这一次请求才完成