ZooKeeper C client: source code analysis and usage notes

The raw ZooKeeper library is cumbersome to use directly. I suggest using my wrapper around the ZooKeeper C library, which lets you think less about ZooKeeper's own internal logic.

Link:

ylibzkevent


While tracking down the CLOSE_WAIT leak in the previous article I skimmed the ZooKeeper C source, so this post gives a rough walkthrough of its flow.

The ZooKeeper C client comes in an mt library and an st library (multi-threaded and single-threaded); normally you work with the multi-threaded one.

The multi-threaded library runs three threads: the main thread, the IO thread, and the completion thread.

The main thread is whatever thread calls the API; the IO thread handles the network traffic; responses to asynchronous requests, watch notifications and the like are handed by the IO thread to the completion thread, which completes them asynchronously.
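
For orientation, here is a minimal sketch of driving the mt library. The header path and server address are assumptions of this example, and error handling is omitted:

#include <zookeeper/zookeeper.h>
#include <stdio.h>

/* runs on the completion thread, not on the thread that called zookeeper_init */
static void my_watcher(zhandle_t *zh, int type, int state, const char *path, void *ctx)
{
    printf("event type=%d state=%d path=%s\n", type, state, path ? path : "");
}

int main(void)
{
    /* zookeeper_init starts the IO thread and the completion thread, then returns at once */
    zhandle_t *zh = zookeeper_init("127.0.0.1:2181", my_watcher, 30000, 0, 0, 0);
    if (!zh)
        return 1;
    /* ... issue zoo_* calls from the main (API) thread ... */
    zookeeper_close(zh);
    return 0;
}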

Main thread

First, let's see how zookeeper_init creates the IO thread and the completion thread.

zookeeper_init-->adaptor_init-->start_threads
void start_threads(zhandle_t* zh)
{
...
api_prolog(zh);
rc=pthread_create(&adaptor->io, 0, do_io, zh);
rc=pthread_create(&adaptor->completion, 0, do_completion, zh);
wait_for_others(zh);
api_epilog(zh, 0);
}

IO thread

do_io is the main logic of the IO thread.

void *do_io(void *v)
{
zhandle_t *zh = (zhandle_t*)v;
struct pollfd fds[2];
struct adaptor_threads *adaptor_threads = zh->adaptor_priv;

api_prolog(zh);
notify_thread_ready(zh);
LOG_DEBUG(("started IO thread"));
fds[0].fd=adaptor_threads->self_pipe[0];
fds[0].events=POLLIN;
while(!zh->close_requested) {
struct timeval tv;
int fd;
int interest;
int timeout;
int maxfd=1;
int rc;

zookeeper_interest(zh, &fd, &interest, &tv);
//add the fd (re)established in zookeeper_interest to the poll set, then sleep until the timeout expires or we are woken up
if (fd != -1) {
fds[1].fd=fd;
fds[1].events=(interest&ZOOKEEPER_READ)?POLLIN:0;
fds[1].events|=(interest&ZOOKEEPER_WRITE)?POLLOUT:0;
maxfd=2;
}
timeout=tv.tv_sec * 1000 + (tv.tv_usec/1000);

poll(fds,maxfd,timeout);
if (fd != -1) {
interest=(fds[1].revents&POLLIN)?ZOOKEEPER_READ:0;
interest|=((fds[1].revents&POLLOUT)||(fds[1].revents&POLLHUP))?ZOOKEEPER_WRITE:0;
}
if(fds[0].revents&POLLIN){
// flush the pipe
char b[128];
while(read(adaptor_threads->self_pipe[0],b,sizeof(b))==sizeof(b)){}
}
// dispatch zookeeper events
rc = zookeeper_process(zh, interest);
// check the current state of the zhandle and terminate
// if it is_unrecoverable()
if(is_unrecoverable(zh))
break;
}
api_epilog(zh, 0);
LOG_DEBUG(("IO thread terminated"));
return 0;
}

As you can see, the do_io thread is a loop multiplexed with poll; it mainly watches zh->fd and adaptor_threads->self_pipe[0].

zh->fd is the descriptor used to talk to the server, and self_pipe is what wakes this thread up (wakeup_io_thread).

Each iteration starts by calling zookeeper_interest, which checks the connection to the server and sets the wait time used by poll.

Then poll sleeps until it is woken up by the server or by the main thread; once woken, zookeeper_process does the actual work.
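
The self_pipe trick deserves a quick illustration. Here is a minimal, generic sketch of the pattern; the names wake_pipe, wake_io_thread and drain_wakeups are mine, not from the ZooKeeper source:

#include <unistd.h>

static int wake_pipe[2];              /* created once with pipe(wake_pipe); [0] is polled, [1] is written */

/* called from the main thread, e.g. right after queueing a request to send */
static void wake_io_thread(void)
{
    char c = 0;
    write(wake_pipe[1], &c, 1);       /* makes the poll() in the IO loop return immediately */
}

/* called by the IO thread when POLLIN fires on wake_pipe[0] */
static void drain_wakeups(void)
{
    char buf[128];
    while (read(wake_pipe[0], buf, sizeof(buf)) == sizeof(buf))
        ;                             /* keep reading until the pipe is drained */
}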

Now let's look at what zookeeper_interest actually does.

do_io->zookeeper_interest
int zookeeper_interest(zhandle_t *zh, int *fd, int *interest,
struct timeval *tv)
{
struct timeval now;
if(zh==0 || fd==0 ||interest==0 || tv==0)
return ZBADARGUMENTS;
if (is_unrecoverable(zh))
return ZINVALIDSTATE;
gettimeofday(&now, 0);
//if a deadline was set, warn about how far past it we are
if(zh->next_deadline.tv_sec!=0 || zh->next_deadline.tv_usec!=0){
int time_left = calculate_interval(&zh->next_deadline, &now);
if (time_left > 10)
LOG_WARN(("Exceeded deadline by %dms", time_left));
}
//bump zh's reference count, then initialize the output variables
api_prolog(zh);
*fd = zh->fd;
*interest = 0;
tv->tv_sec = 0;
tv->tv_usec = 0;
//no connection yet
if (*fd == -1) {
if (zh->connect_index == zh->addrs_count) {
/* Wait a bit before trying again so that we don't spin */
zh->connect_index = 0;
}else {
//establish a new connection and disable the Nagle algorithm
int rc;
int enable_tcp_nodelay = 1;
int ssoresult;

zh->fd = socket(zh->addrs[zh->connect_index].ss_family, SOCK_STREAM, 0);
if (zh->fd < 0) {
return api_epilog(zh,handle_socket_error_msg(zh,__LINE__,
ZSYSTEMERROR, "socket() call failed"));
}
ssoresult = setsockopt(zh->fd, IPPROTO_TCP, TCP_NODELAY, &enable_tcp_nodelay, sizeof(enable_tcp_nodelay));
if (ssoresult != 0) {
LOG_WARN(("Unable to set TCP_NODELAY, operation latency may be effected"));
}
fcntl(zh->fd, F_SETFL, O_NONBLOCK|fcntl(zh->fd, F_GETFL, 0));
{
rc = connect(zh->fd, (struct sockaddr*) &zh->addrs[zh->connect_index], sizeof(struct sockaddr_in));
}
//the socket is non-blocking, so if the connect has not finished yet, set the state to connecting
if (rc == -1) {
/* we are handling the non-blocking connect according to
* the description in section 16.3 "Non-blocking connect"
* in UNIX Network Programming vol 1, 3rd edition */
if (errno == EWOULDBLOCK || errno == EINPROGRESS)
zh->state = ZOO_CONNECTING_STATE;
else
return api_epilog(zh,handle_socket_error_msg(zh,__LINE__,
ZCONNECTIONLOSS,"connect() call failed"));
} else {
//otherwise call prime_connection, which does the handshake with the ZooKeeper server; we look at this function later
if((rc=prime_connection(zh))!=0)
return api_epilog(zh,rc);

LOG_INFO(("Initiated connection to server [%s]",
format_endpoint_info(&zh->addrs[zh->connect_index])));
}
}
//set the initial poll wait to one third of the configured recv_timeout
*fd = zh->fd;
*tv = get_timeval(zh->recv_timeout/3);
zh->last_recv = now;
zh->last_send = now;
zh->last_ping = now;
}
//the connection exists
if (zh->fd != -1) {
//compute how long it has been since we last received and last sent data
int idle_recv = calculate_interval(&zh->last_recv, &now);
int idle_send = calculate_interval(&zh->last_send, &now);
int recv_to = zh->recv_timeout*2/3 - idle_recv;
int send_to = zh->recv_timeout/3;
// have we exceeded the receive timeout threshold?
//if we have been idle on recv for 2/3 of the timeout, consider it timed out and close the connection
if (recv_to <= 0) {
// We gotta cut our losses and connect to someone else
errno = ETIMEDOUT;
*interest=0;
*tv = get_timeval(0);
return api_epilog(zh,handle_socket_error_msg(zh,
__LINE__,ZOPERATIONTIMEOUT,
"connection to %s timed out (exceeded timeout by %dms)",
format_endpoint_info(&zh->addrs[zh->connect_index]),
-recv_to));

}
//if we have been idle on send for 1/3 of the timeout,
//send a ping (send_ping->adaptor_send_queue->flush_send_queue->send_buffer->zookeeper_send->send)
// We only allow 1/3 of our timeout time to expire before sending
// a PING
if (zh->state==ZOO_CONNECTED_STATE) {
send_to = zh->recv_timeout/3 - idle_send;
if (send_to <= 0) {
if (zh->sent_requests.head==0) {
int rc=send_ping(zh);
if (rc < 0){
LOG_ERROR(("failed to send PING request (zk retcode=%d)",rc));
return api_epilog(zh,rc);
}
}
send_to = zh->recv_timeout/3;
}
}
// choose the lesser value as the timeout
//compute the poll wait time and next_deadline
*tv = get_timeval(recv_to < send_to? recv_to:send_to);
zh->next_deadline.tv_sec = now.tv_sec + tv->tv_sec;
zh->next_deadline.tv_usec = now.tv_usec + tv->tv_usec;
if (zh->next_deadline.tv_usec > 1000000) {
zh->next_deadline.tv_sec += zh->next_deadline.tv_usec / 1000000;
zh->next_deadline.tv_usec = zh->next_deadline.tv_usec % 1000000;
}
//the client is always interested in read events (e.g. the ping response)
*interest = ZOOKEEPER_READ;
/* we are interested in a write if we are connected and have something
* to send, or we are waiting for a connect to finish. */
//if zh->to_send has pending data, also watch for write events
if ((zh->to_send.head && (zh->state == ZOO_CONNECTED_STATE))
|| zh->state == ZOO_CONNECTING_STATE) {
*interest |= ZOOKEEPER_WRITE;
}
}
return api_epilog(zh,ZOK);
}

So zookeeper_interest decides how long poll will sleep and whether the client is currently interested in read or write events.
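
To make the timeout arithmetic concrete, here is a small sketch with made-up numbers (a 30000 ms session timeout and some example idle times; the values are purely illustrative):

#include <stdio.h>

int main(void)
{
    int recv_timeout = 30000;                         /* example negotiated session timeout, ms */
    int idle_recv = 5000, idle_send = 12000;          /* example idle times since last recv/send, ms */

    int recv_to = recv_timeout * 2 / 3 - idle_recv;   /* 15000 ms left before the connection is declared dead */
    int send_to = recv_timeout / 3 - idle_send;       /* -2000 ms: a ping is already overdue */

    if (send_to <= 0) {
        /* zookeeper_interest would call send_ping() here, then reset the send deadline */
        send_to = recv_timeout / 3;                   /* back to 10000 ms */
    }

    int poll_timeout = recv_to < send_to ? recv_to : send_to;
    printf("poll will sleep at most %d ms\n", poll_timeout);  /* 10000 */
    return 0;
}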

Next, let's look at zookeeper_process.

zookeeper_process calls a very important function, check_events, so let's look at that first.

do_io->zookeeper_process->check_events
static int check_events(zhandle_t *zh, int events)
{
if (zh->fd == -1)
return ZINVALIDSTATE;
//a write event while in ZOO_CONNECTING_STATE means the non-blocking connect has just completed
if ((events&ZOOKEEPER_WRITE)&&(zh->state == ZOO_CONNECTING_STATE)) {
int rc, error;
socklen_t len = sizeof(error);
rc = getsockopt(zh->fd, SOL_SOCKET, SO_ERROR, &error, &len);
/* the description in section 16.4 "Non-blocking connect"
* in UNIX Network Programming vol 1, 3rd edition, points out
* that sometimes the error is in errno and sometimes in error */
if (rc < 0 || error) {
if (rc == 0)
errno = error;
return handle_socket_error_msg(zh, __LINE__,ZCONNECTIONLOSS,
"server refused to accept the client");
}
//enter prime_connection
if((rc=prime_connection(zh))!=0)
return rc;
LOG_INFO(("initiated connection to server [%s]",
format_endpoint_info(&zh->addrs[zh->connect_index])));
return ZOK;
}

Once the connection is established it enters prime_connection, so let's look at that function.

do_io->zookeeper_process->check_events->prime_connection
static int prime_connection(zhandle_t *zh)
{
int rc;
/*this is the size of buffer to serialize req into*/
char buffer_req[HANDSHAKE_REQ_SIZE];
int len = sizeof(buffer_req);
int hlen = 0;
struct connect_req req;
//fill in the handshake request: protocol version, sessionId, passwd, etc.
req.protocolVersion = 0;
req.sessionId = zh->client_id.client_id;
req.passwd_len = sizeof(req.passwd);
memcpy(req.passwd, zh->client_id.passwd, sizeof(zh->client_id.passwd));
req.timeOut = zh->recv_timeout;
req.lastZxidSeen = zh->last_zxid;
hlen = htonl(len);
/* We are running fast and loose here, but this string should fit in the initial buffer! */
//send the header length and then the whole packet?? not sure why this is not done asynchronously
rc=zookeeper_send(zh->fd, &hlen, sizeof(len));
serialize_prime_connect(&req, buffer_req);
rc=rc<0 ? rc : zookeeper_send(zh->fd, buffer_req, len);
if (rc<0) {
return handle_socket_error_msg(zh, __LINE__, ZCONNECTIONLOSS,
"failed to send a handshake packet: %s", strerror(errno));
}
//transition from the CONNECTING state to the ASSOCIATING state
zh->state = ZOO_ASSOCIATING_STATE;

//point input_buffer at primer_buffer
zh->input_buffer = &zh->primer_buffer;
/* This seems a bit weird to to set the offset to 4, but we already have a
* length, so we skip reading the length (and allocating the buffer) by
* saying that we are already at offset 4 */
//set the current input_buffer offset to 4
zh->input_buffer->curr_offset = 4;

return ZOK;
}

Back to the check_events function we were in.

do_io->zookeeper_process->check_events
static int check_events(zhandle_t *zh, int events)
{
...
//if the to_send list is non-empty and a write event fired, flush_send_queue actually sends the data
if (zh->to_send.head && (events&ZOOKEEPER_WRITE)) {
/* make the flush call non-blocking by specifying a 0 timeout */
int rc=flush_send_queue(zh,0);
if (rc < 0)
return handle_socket_error_msg(zh,__LINE__,ZCONNECTIONLOSS,
"failed while flushing send queue");
}
//a read event fired
if (events&ZOOKEEPER_READ) {
int rc;
if (zh->input_buffer == 0) {
zh->input_buffer = allocate_buffer(0,0);
}

//read data: first read the length header, then allocate a buffer of that size for the response
rc = recv_buffer(zh->fd, zh->input_buffer);
if (rc < 0) {
return handle_socket_error_msg(zh, __LINE__,ZCONNECTIONLOSS,
"failed while receiving a server response");
}
if (rc > 0) {
//if input_buffer is not primer_buffer this is not a freshly established connection, so queue the buffer on to_process to be handled later
gettimeofday(&zh->last_recv, 0);
if (zh->input_buffer != &zh->primer_buffer) {
queue_buffer(&zh->to_process, zh->input_buffer, 0);
//freshly established connection: handle the handshake response
} else {
int64_t oldid,newid;
//deserialize
deserialize_prime_response(&zh->primer_storage, zh->primer_buffer.buffer);
/* We are processing the primer_buffer, so we need to finish
* the connection handshake */
oldid = zh->client_id.client_id;
newid = zh->primer_storage.sessionId;
//compare the client's client_id with the server's sessionId; if they differ the session has expired and an expiration event is raised
if (oldid != 0 && oldid != newid) {
zh->state = ZOO_EXPIRED_SESSION_STATE;
errno = ESTALE;
return handle_socket_error_msg(zh,__LINE__,ZSESSIONEXPIRED,
"sessionId=%#llx has expired.",oldid);
} else {
//adopt the server's sessionId as the client id
zh->recv_timeout = zh->primer_storage.timeOut;
zh->client_id.client_id = newid;

memcpy(zh->client_id.passwd, &zh->primer_storage.passwd,
sizeof(zh->client_id.passwd));
//move to ZOO_CONNECTED_STATE
zh->state = ZOO_CONNECTED_STATE;
LOG_INFO(("session establishment complete on server [%s], sessionId=%#llx, negotiated timeout=%d",
format_endpoint_info(&zh->addrs[zh->connect_index]),
newid, zh->recv_timeout));
/* we want the auth to be sent for, but since both call push to front
we need to call send_watch_set first */
//send the auth info and the watch set
send_set_watches(zh);
/* send the authentication packet now */
send_auth_info(zh);
LOG_DEBUG(("Calling a watcher for a ZOO_SESSION_EVENT and the state=ZOO_CONNECTED_STATE"));
zh->input_buffer = 0; // just in case the watcher calls zookeeper_process() again
PROCESS_SESSION_EVENT(zh, ZOO_CONNECTED_STATE);
}
}
zh->input_buffer = 0;
} else {
// zookeeper_process was called but there was nothing to read
// from the socket
return ZNOTHING;
}
}
return ZOK;
}

send_set_watches is crucial: it collects all the watches the client already holds and sends them to the server, which compares them against its own state.

If anything differs, the server sends the corresponding events to the client. This prevents notifications from being lost when the heartbeat times out but the session does not, since reconnection means a different TCP connection.

This function guarantees that any node change during the lifetime of the session still triggers the watch callbacks.

do_io->zookeeper_process->check_events->send_set_watches
static int send_set_watches(zhandle_t *zh)
{
struct oarchive *oa;
struct RequestHeader h = { STRUCT_INITIALIZER(xid , SET_WATCHES_XID), STRUCT_INITIALIZER(type , ZOO_SETWATCHES_OP)};
struct SetWatches req;
int rc;

req.relativeZxid = zh->last_zxid;
req.dataWatches.data = collect_keys(zh->active_node_watchers, (int*)&req.dataWatches.count);
req.existWatches.data = collect_keys(zh->active_exist_watchers, (int*)&req.existWatches.count);
req.childWatches.data = collect_keys(zh->active_child_watchers, (int*)&req.childWatches.count);

// return if there are no pending watches
if (!req.dataWatches.count && !req.existWatches.count &&
!req.childWatches.count) {
free_key_list(req.dataWatches.data, req.dataWatches.count);
free_key_list(req.existWatches.data, req.existWatches.count);
free_key_list(req.childWatches.data, req.childWatches.count);
return ZOK;
}


oa = create_buffer_oarchive();
rc = serialize_RequestHeader(oa, "header", &h);
rc = rc < 0 ? rc : serialize_SetWatches(oa, "req", &req);
/* add this buffer to the head of the send queue */
rc = rc < 0 ? rc : queue_front_buffer_bytes(&zh->to_send, get_buffer(oa),
get_buffer_len(oa));
/* We queued the buffer, so don't free it */
close_buffer_oarchive(&oa, 0);
free_key_list(req.dataWatches.data, req.dataWatches.count);
free_key_list(req.existWatches.data, req.existWatches.count);
free_key_list(req.childWatches.data, req.childWatches.count);
LOG_DEBUG(("Sending set watches request to %s",format_current_endpoint_info(zh)));
return (rc < 0)?ZMARSHALLINGERROR:ZOK;
}
do_io->zookeeper_process->check_events->queue_session_event
// IO thread queues session events to be processed by the completion thread
static int queue_session_event(zhandle_t *zh, int state)
{
int rc;
struct WatcherEvent evt = { ZOO_SESSION_EVENT, state, "" };
struct ReplyHeader hdr = { WATCHER_EVENT_XID, 0, 0 };
struct oarchive *oa;
completion_list_t *cptr;

if ((oa=create_buffer_oarchive())==NULL) {
LOG_ERROR(("out of memory"));
goto error;
}
rc = serialize_ReplyHeader(oa, "hdr", &hdr);
rc = rc<0?rc: serialize_WatcherEvent(oa, "event", &evt);
if(rc<0){
close_buffer_oarchive(&oa, 1);
goto error;
}
cptr = create_completion_entry(WATCHER_EVENT_XID,-1,0,0,0,0);
cptr->buffer = allocate_buffer(get_buffer(oa), get_buffer_len(oa));
cptr->buffer->curr_offset = get_buffer_len(oa);
if (!cptr->buffer) {
free(cptr);
close_buffer_oarchive(&oa, 1);
goto error;
}
/* We queued the buffer, so don't free it */
close_buffer_oarchive(&oa, 0);
cptr->c.watcher_result = collectWatchers(zh, ZOO_SESSION_EVENT, "");
//queue the watcher result on the completions_to_process list so the completion thread invokes it
queue_completion(&zh->completions_to_process, cptr, 0);
if (process_async(zh->outstanding_sync)) {
process_completions(zh);
}
return ZOK;
error:
errno=ENOMEM;
return ZSYSTEMERROR;
}

With check_events covered, back to zookeeper_process.

int zookeeper_process(zhandle_t *zh, int events)
{
buffer_list_t *bptr;
int rc;

if (zh==NULL)
return ZBADARGUMENTS;
if (is_unrecoverable(zh))
return ZINVALIDSTATE;
api_prolog(zh);
IF_DEBUG(checkResponseLatency(zh));
rc = check_events(zh, events);
if (rc!=ZOK)
return api_epilog(zh, rc);

IF_DEBUG(isSocketReadable(zh));

//if the to_process list is non-empty, i.e. we received data from the server, parse it
while (rc >= 0 && (bptr=dequeue_buffer(&zh->to_process))) {
struct ReplyHeader hdr;
struct iarchive *ia = create_buffer_iarchive(
bptr->buffer, bptr->curr_offset);
deserialize_ReplyHeader(ia, "hdr", &hdr);
if (hdr.zxid > 0) {
zh->last_zxid = hdr.zxid;
} else {
// fprintf(stderr, "Got %#x for %#x\n", hdr.zxid, hdr.xid);
}
//WATCHER_EVENT_XID: build a watcher completion and queue it on completions_to_process for the completion thread to handle
if (hdr.xid == WATCHER_EVENT_XID) {
struct WatcherEvent evt;
int type = 0;
char *path = NULL;
completion_list_t *c = NULL;

LOG_DEBUG(("Processing WATCHER_EVENT"));

deserialize_WatcherEvent(ia, "event", &evt);
type = evt.type;
path = evt.path;
/* We are doing a notification, so there is no pending request */
c = create_completion_entry(WATCHER_EVENT_XID,-1,0,0,0,0);
c->buffer = bptr;
c->c.watcher_result = collectWatchers(zh, type, path);

// We cannot free until now, otherwise path will become invalid
deallocate_WatcherEvent(&evt);
queue_completion(&zh->completions_to_process, c, 0);
} else if (hdr.xid == SET_WATCHES_XID) {
//SET_WATCHES_XID: nothing to do
LOG_DEBUG(("Processing SET_WATCHES"));
free_buffer(bptr);
//AUTH_XID (authentication): handled inline via auth_completion_func instead of going through the completion thread
} else if (hdr.xid == AUTH_XID){
LOG_DEBUG(("Processing AUTH_XID"));

/* special handling for the AUTH response as it may come back
* out-of-band */
auth_completion_func(hdr.err,zh);
free_buffer(bptr);
/* authentication completion may change the connection state to
* unrecoverable */
if(is_unrecoverable(zh)){
handle_error(zh, ZAUTHFAILED);
close_buffer_iarchive(&ia);
return api_epilog(zh, ZAUTHFAILED);
}
} else {
int rc = hdr.err;
/* Find the request corresponding to the response */
completion_list_t *cptr = dequeue_completion(&zh->sent_requests);

/* [ZOOKEEPER-804] Don't assert if zookeeper_close has been called. */
if (zh->close_requested == 1 && cptr == NULL) {
LOG_DEBUG(("Completion queue has been cleared by zookeeper_close()"));
close_buffer_iarchive(&ia);
return api_epilog(zh,ZINVALIDSTATE);
}
assert(cptr);
/* The requests are going to come back in order */
if (cptr->xid != hdr.xid) {
LOG_DEBUG(("Processing unexpected or out-of-order response!"));

// received unexpected (or out-of-order) response
close_buffer_iarchive(&ia);
free_buffer(bptr);
// put the completion back on the queue (so it gets properly
// signaled and deallocated) and disconnect from the server
queue_completion(&zh->sent_requests,cptr,1);
return handle_socket_error_msg(zh, __LINE__,ZRUNTIMEINCONSISTENCY,
"unexpected server response: expected %#x, but received %#x",
hdr.xid,cptr->xid);
}

activateWatcher(zh, cptr->watcher, rc);

//asynchronous request
if (cptr->c.void_result != SYNCHRONOUS_MARKER) {
//for a ping response, just log the round trip; nothing else to do
if(hdr.xid == PING_XID){
int elapsed = 0;
struct timeval now;
gettimeofday(&now, 0);
elapsed = calculate_interval(&zh->last_ping, &now);
LOG_DEBUG(("Got ping response in %d ms", elapsed));

// Nothing to do with a ping response
free_buffer(bptr);
destroy_completion_entry(cptr);
} else {
LOG_DEBUG(("Queueing asynchronous response"));
//anything else is handed to the completion thread
cptr->buffer = bptr;
queue_completion(&zh->completions_to_process, cptr, 0);
}
} else {
//synchronous request: complete it right here
struct sync_completion
*sc = (struct sync_completion*)cptr->data;
sc->rc = rc;

process_sync_completion(cptr, sc, ia, zh);

notify_sync_completion(sc);
free_buffer(bptr);
zh->outstanding_sync--;
destroy_completion_entry(cptr);
}
}

close_buffer_iarchive(&ia);

}
if (process_async(zh->outstanding_sync)) {
process_completions(zh);
}
return api_epilog(zh,ZOK);
}

Summary

There are five states in total:

const int ZOO_EXPIRED_SESSION_STATE = EXPIRED_SESSION_STATE_DEF;
const int ZOO_AUTH_FAILED_STATE = AUTH_FAILED_STATE_DEF;
const int ZOO_CONNECTING_STATE = CONNECTING_STATE_DEF;
const int ZOO_ASSOCIATING_STATE = ASSOCIATING_STATE_DEF;
const int ZOO_CONNECTED_STATE = CONNECTED_STATE_DEF;

A rough diagram of the state transitions:

                 connect() returns -1 and errno == EWOULDBLOCK
notconnected -------------------------------------------------> connecting
                                                                     |
                                                                     | prime_connection
                                                                     v
expired_session <---------------------------------------------- associating -----> connected -----> auth_failed
                         different session id

There are four scenarios:

1. When zookeeper_init first connects, a SESSION_EVENT watcher fires with state CONNECTED.

2. When a watched node changes, the corresponding WATCHER_EVENT watcher is processed.

3. When the client loses contact with ZooKeeper for longer than the heartbeat timeout but shorter than the session timeout:

When the old connection drops, a SESSION_EVENT watcher fires with state CONNECTING; once the client reconnects, a SESSION_EVENT watcher fires with state CONNECTED.

If a watched node changed during that window, the corresponding WATCHER_EVENT watcher is still processed.

4. If the disconnection outlasts the session timeout:

When the old connection drops, a SESSION_EVENT watcher fires with state CONNECTING; once the client reconnects, a SESSION_EVENT watcher fires with state EXPIRED_SESSION.

At that point you must reconnect with zookeeper_init yourself and re-register your watchers, which brings you back to scenario 1, i.e. a SESSION_EVENT watcher with state CONNECTED.


The correct way to use the zookeeper_mt library is roughly this:

Register a global watcher with zookeeper_init, along these lines:

static void watcher(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx)
{
if(type == ZOO_SESSION_EVENT)
{
if (state == ZOO_CONNECTED_STATE)
{
log_info("connected zookeeper");
//this state fires both on the first successful connect and on a reconnect after expiry,
//so check whether it was caused by an expired session; if so, the watches must be set again:
//for every path registered before, trigger the call explicitly once more
//(e.g. if it was previously watched via get_children, call get_children once again now)
//otherwise node changes that happened while the session was expired would be missed
if(reconnection_flag) {
reconnection_flag = 0;
...
//re-register the watch events on the paths
}
}
else if(state == ZOO_AUTH_FAILED_STATE)
{
log_error("Authentication failure. Shutting down...");
zookeeper_close(zh);
}
else if(state == ZOO_EXPIRED_SESSION_STATE)
{
log_error("Session expired. Shutting down...");
//the session has expired; set the reconnection flag
reconnection_flag = 1;
//close the old zookeeper handle and try to reconnect with zookeeper_init
zookeeper_close(zh);
... = zookeeper_init(...);
}
}
else
{
//dispatch on the event type and path to the appropriate handling logic
...
}
}
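
For the "re-register the watch events" step above, re-arming a child watch after a reconnect might look like the following sketch. zoo_wget_children is the standard API; the path "/my/app/nodes" is just a placeholder:

//sketch only: "/my/app/nodes" is a placeholder path
static void rearm_watches(zhandle_t *zh)
{
    struct String_vector children;
    //setting the watch again also returns the current children, so the caller can
    //diff them against its cached view and catch changes missed while the session was expired
    int rc = zoo_wget_children(zh, "/my/app/nodes", watcher, NULL, &children);
    if (rc == ZOK)
    {
        //... process the current children ...
        deallocate_String_vector(&children);
    }
}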

Note that the watcher runs on a background thread, so any variables it shares with the main thread must be protected with a mutex.
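
A minimal sketch of such a guard (the flag name service_ready is illustrative):

#include <pthread.h>

static pthread_mutex_t state_lock = PTHREAD_MUTEX_INITIALIZER;
static int service_ready = 0;   //example flag shared between the watcher and the main thread

//called from the watcher (completion thread)
static void set_ready(int ready)
{
    pthread_mutex_lock(&state_lock);
    service_ready = ready;
    pthread_mutex_unlock(&state_lock);
}

//called from the main thread
static int is_ready(void)
{
    pthread_mutex_lock(&state_lock);
    int ready = service_ready;
    pthread_mutex_unlock(&state_lock);
    return ready;
}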

A ZooKeeper client has to be built around event notification, and multithreading is indeed the simpler way to get there.

But as a developer, having to handle all of these corner cases is painful, which is why I think the ZooKeeper C wrapper mentioned at the start of this article is simpler and more convenient.