ZooKeeper C client: source code analysis and usage notes

The stock ZooKeeper library is cumbersome to use directly; I recommend the ZooKeeper C wrapper I wrote, which lets you deal with far less of ZooKeeper's own logic.

Link:

ylibzkevent


While chasing the CLOSE_WAIT leak in the previous article, I skimmed the ZooKeeper C client source; this post walks through its overall flow.

The ZooKeeper C client ships as an mt library and an st library (multi-threaded and single-threaded); the multi-threaded library is the one you normally use.

The multi-threaded library runs three threads: the main thread, the IO thread, and the completion thread.

The main thread is whatever thread calls the API. The IO thread handles network communication; responses to asynchronous requests, watch notifications, and the like are handed off by the IO thread to the completion thread, which completes them asynchronously.
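From the application's point of view only the main thread is visible. As a minimal sketch (the server address and timeout are placeholders), creating a handle looks like this; zookeeper_init itself spawns the other two threads:

#include <zookeeper/zookeeper.h>   /* link against -lzookeeper_mt */

static void my_watcher(zhandle_t *zh, int type, int state,
                       const char *path, void *ctx)
{
    /* delivered on the completion thread, not on the caller's thread */
}

int main(void)
{
    /* zookeeper_init starts the IO and completion threads internally */
    zhandle_t *zh = zookeeper_init("127.0.0.1:2181", my_watcher,
                                   30000 /* recv_timeout, ms */, 0, 0, 0);
    if (zh == NULL)
        return 1;
    /* ... issue zoo_* calls from here, the "main thread" ... */
    zookeeper_close(zh);    /* stops and joins the two helper threads */
    return 0;
}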

Main thread

First, let's see how zookeeper_init creates the IO thread and the completion thread.

zookeeper_init-->adaptor_init-->start_threads
void start_threads(zhandle_t* zh)
{
    ...
    api_prolog(zh);
    rc=pthread_create(&adaptor->io, 0, do_io, zh);
    rc=pthread_create(&adaptor->completion, 0, do_completion, zh);
    wait_for_others(zh);
    api_epilog(zh, 0);
}

IO thread

do_io contains the main logic of the IO thread:

void *do_io(void *v)
{
    zhandle_t *zh = (zhandle_t*)v;
    struct pollfd fds[2];
    struct adaptor_threads *adaptor_threads = zh->adaptor_priv;

    api_prolog(zh);
    notify_thread_ready(zh);
    LOG_DEBUG(("started IO thread"));
    fds[0].fd=adaptor_threads->self_pipe[0];
    fds[0].events=POLLIN;
    while(!zh->close_requested) {
        struct timeval tv;
        int fd;
        int interest;
        int timeout;
        int maxfd=1;
        int rc;

        zookeeper_interest(zh, &fd, &interest, &tv);
        // add the descriptor (re)established inside zookeeper_interest to the
        // poll set, then sleep until the timeout expires or we are woken up
        if (fd != -1) {
            fds[1].fd=fd;
            fds[1].events=(interest&ZOOKEEPER_READ)?POLLIN:0;
            fds[1].events|=(interest&ZOOKEEPER_WRITE)?POLLOUT:0;
            maxfd=2;
        }
        timeout=tv.tv_sec * 1000 + (tv.tv_usec/1000);

        poll(fds,maxfd,timeout);
        if (fd != -1) {
            interest=(fds[1].revents&POLLIN)?ZOOKEEPER_READ:0;
            interest|=((fds[1].revents&POLLOUT)||(fds[1].revents&POLLHUP))?ZOOKEEPER_WRITE:0;
        }
        if(fds[0].revents&POLLIN){
            // flush the pipe
            char b[128];
            while(read(adaptor_threads->self_pipe[0],b,sizeof(b))==sizeof(b)){}
        }
        // dispatch zookeeper events
        rc = zookeeper_process(zh, interest);
        // check the current state of the zhandle and terminate
        // if it is_unrecoverable()
        if(is_unrecoverable(zh))
            break;
    }
    api_epilog(zh, 0);
    LOG_DEBUG(("IO thread terminated"));
    return 0;
}

As you can see, the do_io thread is a loop multiplexed with poll, watching two descriptors: zh->fd and adaptor_threads->self_pipe[0].

zh->fd is the descriptor used to talk to the server; self_pipe exists so that other threads can wake this one up (see wakeup_io_thread).
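The wakeup is the classic self-pipe trick: a writer pokes one byte into the pipe, which makes poll return. A minimal standalone sketch of the pattern (this illustrates the idea, not the library's exact wakeup_io_thread code):

#include <poll.h>
#include <unistd.h>

static int self_pipe[2];   /* created once with pipe(self_pipe) */

/* called from another thread to interrupt a blocking poll() */
static void wakeup_io_thread_sketch(void)
{
    char c = 0;
    (void)write(self_pipe[1], &c, 1);   /* makes self_pipe[0] readable */
}

static void io_loop_iteration_sketch(void)
{
    struct pollfd pfd = { .fd = self_pipe[0], .events = POLLIN };
    poll(&pfd, 1, 10000);               /* returns early on wakeup */
    if (pfd.revents & POLLIN) {
        char buf[128];                  /* drain so the next poll blocks */
        while (read(self_pipe[0], buf, sizeof(buf)) == sizeof(buf)) {}
    }
}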

Each iteration begins with zookeeper_interest, which checks the state of the connection to the server and computes the poll timeout.

poll then sleeps until it is woken by the server or by the main thread, after which zookeeper_process does the actual work.

Let's look at zookeeper_interest first.

do_io->zookeeper_interest
int zookeeper_interest(zhandle_t *zh, int *fd, int *interest,
        struct timeval *tv)
{
    struct timeval now;
    if(zh==0 || fd==0 ||interest==0 || tv==0)
        return ZBADARGUMENTS;
    if (is_unrecoverable(zh))
        return ZINVALIDSTATE;
    gettimeofday(&now, 0);
    // if a deadline was set, log how far past it we are
    if(zh->next_deadline.tv_sec!=0 || zh->next_deadline.tv_usec!=0){
        int time_left = calculate_interval(&zh->next_deadline, &now);
        if (time_left > 10)
            LOG_WARN(("Exceeded deadline by %dms", time_left));
    }
    // bump zh's reference count, then initialize the out parameters
    api_prolog(zh);
    *fd = zh->fd;
    *interest = 0;
    tv->tv_sec = 0;
    tv->tv_usec = 0;
    // no connection yet
    if (*fd == -1) {
        if (zh->connect_index == zh->addrs_count) {
            /* Wait a bit before trying again so that we don't spin */
            zh->connect_index = 0;
        }else {
            // (re)establish the connection, disabling Nagle's algorithm
            int rc;
            int enable_tcp_nodelay = 1;
            int ssoresult;

            zh->fd = socket(zh->addrs[zh->connect_index].ss_family, SOCK_STREAM, 0);
            if (zh->fd < 0) {
                return api_epilog(zh,handle_socket_error_msg(zh,__LINE__,
                        ZSYSTEMERROR, "socket() call failed"));
            }
            ssoresult = setsockopt(zh->fd, IPPROTO_TCP, TCP_NODELAY, &enable_tcp_nodelay, sizeof(enable_tcp_nodelay));
            if (ssoresult != 0) {
                LOG_WARN(("Unable to set TCP_NODELAY, operation latency may be effected"));
            }
            fcntl(zh->fd, F_SETFL, O_NONBLOCK|fcntl(zh->fd, F_GETFL, 0));
            {
                rc = connect(zh->fd, (struct sockaddr*) &zh->addrs[zh->connect_index], sizeof(struct sockaddr_in));
            }
            // the socket is non-blocking, so if the connect has not completed
            // yet, mark the handle as connecting
            if (rc == -1) {
                /* we are handling the non-blocking connect according to
                 * the description in section 16.3 "Non-blocking connect"
                 * in UNIX Network Programming vol 1, 3rd edition */
                if (errno == EWOULDBLOCK || errno == EINPROGRESS)
                    zh->state = ZOO_CONNECTING_STATE;
                else
                    return api_epilog(zh,handle_socket_error_msg(zh,__LINE__,
                            ZCONNECTIONLOSS,"connect() call failed"));
            } else {
                // connect() succeeded immediately: call prime_connection to
                // perform the handshake with the server (covered below)
                if((rc=prime_connection(zh))!=0)
                    return api_epilog(zh,rc);

                LOG_INFO(("Initiated connection to server [%s]",
                        format_endpoint_info(&zh->addrs[zh->connect_index])));
            }
        }
        // the initial poll timeout is one third of the configured recv_timeout
        *fd = zh->fd;
        *tv = get_timeval(zh->recv_timeout/3);
        zh->last_recv = now;
        zh->last_send = now;
        zh->last_ping = now;
    }
    // connection established
    if (zh->fd != -1) {
        // how long since we last received / sent anything
        int idle_recv = calculate_interval(&zh->last_recv, &now);
        int idle_send = calculate_interval(&zh->last_send, &now);
        int recv_to = zh->recv_timeout*2/3 - idle_recv;
        int send_to = zh->recv_timeout/3;
        // have we exceeded the receive timeout threshold?
        // if recv has been idle for 2/3 of the timeout, give up and close
        // the connection
        if (recv_to <= 0) {
            // We gotta cut our losses and connect to someone else
            errno = ETIMEDOUT;
            *interest=0;
            *tv = get_timeval(0);
            return api_epilog(zh,handle_socket_error_msg(zh,
                    __LINE__,ZOPERATIONTIMEOUT,
                    "connection to %s timed out (exceeded timeout by %dms)",
                    format_endpoint_info(&zh->addrs[zh->connect_index]),
                    -recv_to));

        }
        // if send has been idle for 1/3 of the timeout, send a PING
        // (send_ping->adaptor_send_queue->flush_send_queue->send_buffer->zookeeper_send->send)
        // We only allow 1/3 of our timeout time to expire before sending
        // a PING
        if (zh->state==ZOO_CONNECTED_STATE) {
            send_to = zh->recv_timeout/3 - idle_send;
            if (send_to <= 0) {
                if (zh->sent_requests.head==0) {
                    int rc=send_ping(zh);
                    if (rc < 0){
                        LOG_ERROR(("failed to send PING request (zk retcode=%d)",rc));
                        return api_epilog(zh,rc);
                    }
                }
                send_to = zh->recv_timeout/3;
            }
        }
        // choose the lesser value as the timeout
        // this poll timeout also determines next_deadline
        *tv = get_timeval(recv_to < send_to? recv_to:send_to);
        zh->next_deadline.tv_sec = now.tv_sec + tv->tv_sec;
        zh->next_deadline.tv_usec = now.tv_usec + tv->tv_usec;
        if (zh->next_deadline.tv_usec > 1000000) {
            zh->next_deadline.tv_sec += zh->next_deadline.tv_usec / 1000000;
            zh->next_deadline.tv_usec = zh->next_deadline.tv_usec % 1000000;
        }
        // the client is always interested in read events (e.g. ping replies)
        *interest = ZOOKEEPER_READ;
        /* we are interested in a write if we are connected and have something
         * to send, or we are waiting for a connect to finish. */
        // if zh->to_send has queued buffers, also watch for write readiness
        if ((zh->to_send.head && (zh->state == ZOO_CONNECTED_STATE))
                || zh->state == ZOO_CONNECTING_STATE) {
            *interest |= ZOOKEEPER_WRITE;
        }
    }
    return api_epilog(zh,ZOK);
}

So zookeeper_interest decides how long poll will sleep, and whether the client is currently interested in read or write readiness.
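To make the arithmetic concrete, here is the timeout math from above with an assumed recv_timeout of 30 seconds (illustration only):

/* illustration only, with an assumed recv_timeout of 30000 ms */
int recv_timeout = 30000;
int ping_after   = recv_timeout / 3;      /* 10000 ms idle send -> PING sent          */
int dead_after   = recv_timeout * 2 / 3;  /* 20000 ms idle recv -> ZOPERATIONTIMEOUT  */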

Now let's look at zookeeper_process.

zookeeper_process is built around one important helper, check_events, so we'll examine that first.

do_io->zookeeper_process->check_events
static int check_events(zhandle_t *zh, int events)
{
    if (zh->fd == -1)
        return ZINVALIDSTATE;
    // a write event while in ZOO_CONNECTING_STATE means the non-blocking
    // connect has just completed
    if ((events&ZOOKEEPER_WRITE)&&(zh->state == ZOO_CONNECTING_STATE)) {
        int rc, error;
        socklen_t len = sizeof(error);
        rc = getsockopt(zh->fd, SOL_SOCKET, SO_ERROR, &error, &len);
        /* the description in section 16.4 "Non-blocking connect"
         * in UNIX Network Programming vol 1, 3rd edition, points out
         * that sometimes the error is in errno and sometimes in error */
        if (rc < 0 || error) {
            if (rc == 0)
                errno = error;
            return handle_socket_error_msg(zh, __LINE__,ZCONNECTIONLOSS,
                    "server refused to accept the client");
        }
        // hand off to prime_connection
        if((rc=prime_connection(zh))!=0)
            return rc;
        LOG_INFO(("initiated connection to server [%s]",
                format_endpoint_info(&zh->addrs[zh->connect_index])));
        return ZOK;
    }

Once the connection completes we enter prime_connection; here is that function:

do_io->zookeeper_process->check_events->prime_connection
static int prime_connection(zhandle_t *zh)
{
    int rc;
    /*this is the size of buffer to serialize req into*/
    char buffer_req[HANDSHAKE_REQ_SIZE];
    int len = sizeof(buffer_req);
    int hlen = 0;
    struct connect_req req;
    // fill in the handshake request: protocol version, sessionId, passwd, etc.
    req.protocolVersion = 0;
    req.sessionId = zh->client_id.client_id;
    req.passwd_len = sizeof(req.passwd);
    memcpy(req.passwd, zh->client_id.passwd, sizeof(zh->client_id.passwd));
    req.timeOut = zh->recv_timeout;
    req.lastZxidSeen = zh->last_zxid;
    hlen = htonl(len);
    /* We are running fast and loose here, but this string should fit in the initial buffer! */
    // send the length prefix, then the whole packet.
    // (Why is this synchronous rather than queued like everything else?
    // I haven't figured that out.)
    rc=zookeeper_send(zh->fd, &hlen, sizeof(len));
    serialize_prime_connect(&req, buffer_req);
    rc=rc<0 ? rc : zookeeper_send(zh->fd, buffer_req, len);
    if (rc<0) {
        return handle_socket_error_msg(zh, __LINE__, ZCONNECTIONLOSS,
                "failed to send a handshake packet: %s", strerror(errno));
    }
    // transition from CONNECTING to ASSOCIATING
    zh->state = ZOO_ASSOCIATING_STATE;

    // point input_buffer at primer_buffer
    zh->input_buffer = &zh->primer_buffer;
    /* This seems a bit weird to to set the offset to 4, but we already have a
     * length, so we skip reading the length (and allocating the buffer) by
     * saying that we are already at offset 4 */
    // set the current input_buffer offset to 4
    zh->input_buffer->curr_offset = 4;

    return ZOK;
}
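Note the framing: every packet on the wire is a 4-byte big-endian length followed by the serialized body, which is also why the code above can pre-set the read offset to 4. A tiny sketch of that framing (frame_packet is a hypothetical helper, not library code):

#include <arpa/inet.h>
#include <stdint.h>
#include <string.h>

/* hypothetical helper: frame a payload the way ZooKeeper does on the wire */
static size_t frame_packet(char *out, const void *payload, uint32_t len)
{
    uint32_t hlen = htonl(len);          /* 4-byte big-endian length prefix */
    memcpy(out, &hlen, sizeof(hlen));
    memcpy(out + sizeof(hlen), payload, len);
    return sizeof(hlen) + len;
}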

Now back to the rest of the check_events function:

do_io->zookeeper_process->check_events
static int check_events(zhandle_t *zh, int events)
{
    ...
    // if the to_send list is non-empty and a write event fired,
    // flush_send_queue actually puts the data on the wire
    if (zh->to_send.head && (events&ZOOKEEPER_WRITE)) {
        /* make the flush call non-blocking by specifying a 0 timeout */
        int rc=flush_send_queue(zh,0);
        if (rc < 0)
            return handle_socket_error_msg(zh,__LINE__,ZCONNECTIONLOSS,
                    "failed while flushing send queue");
    }
    // a read event fired
    if (events&ZOOKEEPER_READ) {
        int rc;
        if (zh->input_buffer == 0) {
            zh->input_buffer = allocate_buffer(0,0);
        }

        // read data: first the 4-byte length header, then allocate a buffer
        // of the advertised size for the body
        rc = recv_buffer(zh->fd, zh->input_buffer);
        if (rc < 0) {
            return handle_socket_error_msg(zh, __LINE__,ZCONNECTIONLOSS,
                    "failed while receiving a server response");
        }
        if (rc > 0) {
            gettimeofday(&zh->last_recv, 0);
            // if input_buffer is not primer_buffer this is not a fresh
            // connection: queue the buffer on to_process for later handling
            if (zh->input_buffer != &zh->primer_buffer) {
                queue_buffer(&zh->to_process, zh->input_buffer, 0);
            // freshly established connection
            } else {
                int64_t oldid,newid;
                //deserialize
                deserialize_prime_response(&zh->primer_storage, zh->primer_buffer.buffer);
                /* We are processing the primer_buffer, so we need to finish
                 * the connection handshake */
                oldid = zh->client_id.client_id;
                newid = zh->primer_storage.sessionId;
                // compare the client's client_id with the server's sessionId;
                // if they differ the session has expired, which raises an
                // expiration event
                if (oldid != 0 && oldid != newid) {
                    zh->state = ZOO_EXPIRED_SESSION_STATE;
                    errno = ESTALE;
                    return handle_socket_error_msg(zh,__LINE__,ZSESSIONEXPIRED,
                            "sessionId=%#llx has expired.",oldid);
                } else {
                    // adopt the server's sessionId as our client id
                    zh->recv_timeout = zh->primer_storage.timeOut;
                    zh->client_id.client_id = newid;

                    memcpy(zh->client_id.passwd, &zh->primer_storage.passwd,
                            sizeof(zh->client_id.passwd));
                    // enter ZOO_CONNECTED_STATE
                    zh->state = ZOO_CONNECTED_STATE;
                    LOG_INFO(("session establishment complete on server [%s], sessionId=%#llx, negotiated timeout=%d",
                            format_endpoint_info(&zh->addrs[zh->connect_index]),
                            newid, zh->recv_timeout));
                    /* we want the auth to be sent for, but since both call push to front
                       we need to call send_watch_set first */
                    // send the AUTH and WATCH information
                    send_set_watches(zh);
                    /* send the authentication packet now */
                    send_auth_info(zh);
                    LOG_DEBUG(("Calling a watcher for a ZOO_SESSION_EVENT and the state=ZOO_CONNECTED_STATE"));
                    zh->input_buffer = 0; // just in case the watcher calls zookeeper_process() again
                    PROCESS_SESSION_EVENT(zh, ZOO_CONNECTED_STATE);
                }
            }
            zh->input_buffer = 0;
        } else {
            // zookeeper_process was called but there was nothing to read
            // from the socket
            return ZNOTHING;
        }
    }
    return ZOK;
}

send_set_watches is crucial: it collects every watch the client currently holds and sends them all to the server, which compares them against its own state and fires events back for anything that differs. This protects against lost notifications when the heartbeat times out but the session does not, because the replacement TCP connection would otherwise know nothing about changes that happened in between.

This function guarantees that any node change during the lifetime of the session still triggers the watch callback.

do_io->zookeeper_process->check_events->send_set_watches
static int send_set_watches(zhandle_t *zh)
{
    struct oarchive *oa;
    struct RequestHeader h = { STRUCT_INITIALIZER(xid , SET_WATCHES_XID), STRUCT_INITIALIZER(type , ZOO_SETWATCHES_OP)};
    struct SetWatches req;
    int rc;

    req.relativeZxid = zh->last_zxid;
    req.dataWatches.data = collect_keys(zh->active_node_watchers, (int*)&req.dataWatches.count);
    req.existWatches.data = collect_keys(zh->active_exist_watchers, (int*)&req.existWatches.count);
    req.childWatches.data = collect_keys(zh->active_child_watchers, (int*)&req.childWatches.count);

    // return if there are no pending watches
    if (!req.dataWatches.count && !req.existWatches.count &&
            !req.childWatches.count) {
        free_key_list(req.dataWatches.data, req.dataWatches.count);
        free_key_list(req.existWatches.data, req.existWatches.count);
        free_key_list(req.childWatches.data, req.childWatches.count);
        return ZOK;
    }

    oa = create_buffer_oarchive();
    rc = serialize_RequestHeader(oa, "header", &h);
    rc = rc < 0 ? rc : serialize_SetWatches(oa, "req", &req);
    /* add this buffer to the head of the send queue */
    rc = rc < 0 ? rc : queue_front_buffer_bytes(&zh->to_send, get_buffer(oa),
            get_buffer_len(oa));
    /* We queued the buffer, so don't free it */
    close_buffer_oarchive(&oa, 0);
    free_key_list(req.dataWatches.data, req.dataWatches.count);
    free_key_list(req.existWatches.data, req.existWatches.count);
    free_key_list(req.childWatches.data, req.childWatches.count);
    LOG_DEBUG(("Sending set watches request to %s",format_current_endpoint_info(zh)));
    return (rc < 0)?ZMARSHALLINGERROR:ZOK;
}
do_io->zookeeper_process->check_events->queue_session_event
// IO thread queues session events to be processed by the completion thread
static int queue_session_event(zhandle_t *zh, int state)
{
    int rc;
    struct WatcherEvent evt = { ZOO_SESSION_EVENT, state, "" };
    struct ReplyHeader hdr = { WATCHER_EVENT_XID, 0, 0 };
    struct oarchive *oa;
    completion_list_t *cptr;

    if ((oa=create_buffer_oarchive())==NULL) {
        LOG_ERROR(("out of memory"));
        goto error;
    }
    rc = serialize_ReplyHeader(oa, "hdr", &hdr);
    rc = rc<0?rc: serialize_WatcherEvent(oa, "event", &evt);
    if(rc<0){
        close_buffer_oarchive(&oa, 1);
        goto error;
    }
    cptr = create_completion_entry(WATCHER_EVENT_XID,-1,0,0,0,0);
    cptr->buffer = allocate_buffer(get_buffer(oa), get_buffer_len(oa));
    cptr->buffer->curr_offset = get_buffer_len(oa);
    if (!cptr->buffer) {
        free(cptr);
        close_buffer_oarchive(&oa, 1);
        goto error;
    }
    /* We queued the buffer, so don't free it */
    close_buffer_oarchive(&oa, 0);
    cptr->c.watcher_result = collectWatchers(zh, ZOO_SESSION_EVENT, "");
    // queue the watcher result on completions_to_process; the completion
    // thread will invoke it
    queue_completion(&zh->completions_to_process, cptr, 0);
    if (process_async(zh->outstanding_sync)) {
        process_completions(zh);
    }
    return ZOK;
error:
    errno=ENOMEM;
    return ZSYSTEMERROR;
}

That covers check_events; back to zookeeper_process:

int zookeeper_process(zhandle_t *zh, int events)
{
    buffer_list_t *bptr;
    int rc;

    if (zh==NULL)
        return ZBADARGUMENTS;
    if (is_unrecoverable(zh))
        return ZINVALIDSTATE;
    api_prolog(zh);
    IF_DEBUG(checkResponseLatency(zh));
    rc = check_events(zh, events);
    if (rc!=ZOK)
        return api_epilog(zh, rc);

    IF_DEBUG(isSocketReadable(zh));

    // if the to_process list is non-empty (i.e. we received data from the
    // server), parse each buffer
    while (rc >= 0 && (bptr=dequeue_buffer(&zh->to_process))) {
        struct ReplyHeader hdr;
        struct iarchive *ia = create_buffer_iarchive(
                bptr->buffer, bptr->curr_offset);
        deserialize_ReplyHeader(ia, "hdr", &hdr);
        if (hdr.zxid > 0) {
            zh->last_zxid = hdr.zxid;
        } else {
            // fprintf(stderr, "Got %#x for %#x\n", hdr.zxid, hdr.xid);
        }
        // WATCHER_EVENT_XID: build a completion entry and queue it on
        // completions_to_process for the completion thread to run asynchronously
        if (hdr.xid == WATCHER_EVENT_XID) {
            struct WatcherEvent evt;
            int type = 0;
            char *path = NULL;
            completion_list_t *c = NULL;

            LOG_DEBUG(("Processing WATCHER_EVENT"));

            deserialize_WatcherEvent(ia, "event", &evt);
            type = evt.type;
            path = evt.path;
            /* We are doing a notification, so there is no pending request */
            c = create_completion_entry(WATCHER_EVENT_XID,-1,0,0,0,0);
            c->buffer = bptr;
            c->c.watcher_result = collectWatchers(zh, type, path);

            // We cannot free until now, otherwise path will become invalid
            deallocate_WatcherEvent(&evt);
            queue_completion(&zh->completions_to_process, c, 0);
        } else if (hdr.xid == SET_WATCHES_XID) {
            // SET_WATCHES_XID: nothing to do
            LOG_DEBUG(("Processing SET_WATCHES"));
            free_buffer(bptr);
        // AUTH_XID: handled right here via auth_completion_func rather than
        // on the completion thread
        } else if (hdr.xid == AUTH_XID){
            LOG_DEBUG(("Processing AUTH_XID"));

            /* special handling for the AUTH response as it may come back
             * out-of-band */
            auth_completion_func(hdr.err,zh);
            free_buffer(bptr);
            /* authentication completion may change the connection state to
             * unrecoverable */
            if(is_unrecoverable(zh)){
                handle_error(zh, ZAUTHFAILED);
                close_buffer_iarchive(&ia);
                return api_epilog(zh, ZAUTHFAILED);
            }
        } else {
            int rc = hdr.err;
            /* Find the request corresponding to the response */
            completion_list_t *cptr = dequeue_completion(&zh->sent_requests);

            /* [ZOOKEEPER-804] Don't assert if zookeeper_close has been called. */
            if (zh->close_requested == 1 && cptr == NULL) {
                LOG_DEBUG(("Completion queue has been cleared by zookeeper_close()"));
                close_buffer_iarchive(&ia);
                return api_epilog(zh,ZINVALIDSTATE);
            }
            assert(cptr);
            /* The requests are going to come back in order */
            if (cptr->xid != hdr.xid) {
                LOG_DEBUG(("Processing unexpected or out-of-order response!"));

                // received unexpected (or out-of-order) response
                close_buffer_iarchive(&ia);
                free_buffer(bptr);
                // put the completion back on the queue (so it gets properly
                // signaled and deallocated) and disconnect from the server
                queue_completion(&zh->sent_requests,cptr,1);
                return handle_socket_error_msg(zh, __LINE__,ZRUNTIMEINCONSISTENCY,
                        "unexpected server response: expected %#x, but received %#x",
                        hdr.xid,cptr->xid);
            }

            activateWatcher(zh, cptr->watcher, rc);

            // asynchronous request
            if (cptr->c.void_result != SYNCHRONOUS_MARKER) {
                // a PING response needs no completion; just log the round trip
                if(hdr.xid == PING_XID){
                    int elapsed = 0;
                    struct timeval now;
                    gettimeofday(&now, 0);
                    elapsed = calculate_interval(&zh->last_ping, &now);
                    LOG_DEBUG(("Got ping response in %d ms", elapsed));

                    // Nothing to do with a ping response
                    free_buffer(bptr);
                    destroy_completion_entry(cptr);
                } else {
                    LOG_DEBUG(("Queueing asynchronous response"));
                    // anything else is handed to the completion thread
                    cptr->buffer = bptr;
                    queue_completion(&zh->completions_to_process, cptr, 0);
                }
            } else {
                // synchronous request: complete it right here
                struct sync_completion
                    *sc = (struct sync_completion*)cptr->data;
                sc->rc = rc;

                process_sync_completion(cptr, sc, ia, zh);

                notify_sync_completion(sc);
                free_buffer(bptr);
                zh->outstanding_sync--;
                destroy_completion_entry(cptr);
            }
        }

        close_buffer_iarchive(&ia);

    }
    if (process_async(zh->outstanding_sync)) {
        process_completions(zh);
    }
    return api_epilog(zh,ZOK);
}

Summary

There are five states in total:

const int ZOO_EXPIRED_SESSION_STATE = EXPIRED_SESSION_STATE_DEF;
const int ZOO_AUTH_FAILED_STATE = AUTH_FAILED_STATE_DEF;
const int ZOO_CONNECTING_STATE = CONNECTING_STATE_DEF;
const int ZOO_ASSOCIATING_STATE = ASSOCIATING_STATE_DEF;
const int ZOO_CONNECTED_STATE = CONNECTED_STATE_DEF;

Here is a rough diagram of the state transitions:

                connect() returns -1 and errno == EWOULDBLOCK
not_connected ----------------------------------------------> connecting
                                                                  |
                                                                  | prime_connection
                                                                  v
expired_session <--------------------------------------- associating ----------> connected ----------> auth_failed
                      different session id
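Since these states are plain const ints (so they cannot serve as switch-case labels), a small if-chain helper is convenient for logging them; this is a hypothetical helper, not part of the library:

/* hypothetical helper for logging; not part of the zookeeper API */
static const char *state_to_string(int state)
{
    if (state == ZOO_CONNECTING_STATE)      return "CONNECTING";
    if (state == ZOO_ASSOCIATING_STATE)     return "ASSOCIATING";
    if (state == ZOO_CONNECTED_STATE)       return "CONNECTED";
    if (state == ZOO_EXPIRED_SESSION_STATE) return "EXPIRED_SESSION";
    if (state == ZOO_AUTH_FAILED_STATE)     return "AUTH_FAILED";
    return "UNKNOWN";
}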

There are four scenarios:

1 When zookeeper_init first connects, a SESSION_EVENT watcher fires with state CONNECTED.

2 When a watched node changes, the corresponding WATCHER_EVENT watcher is processed.

3 When the outage between the client and ZooKeeper lasts longer than the heartbeat interval but shorter than the session timeout:

when the old connection drops, a SESSION_EVENT watcher fires with state CONNECTING; once reconnected, a SESSION_EVENT watcher fires with state CONNECTED.

If a watched node changed in the meantime, the corresponding WATCHER_EVENT watcher is also processed.

4 When the outage lasts longer than the session timeout:

when the old connection drops, a SESSION_EVENT watcher fires with state CONNECTING; on reconnection, a SESSION_EVENT watcher fires with state EXPIRED_SESSION.

At that point you must reconnect with zookeeper_init yourself and re-register your watchers, which puts you back in scenario 1: a SESSION_EVENT watcher fires with state CONNECTED.
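Re-registering after an expiry means touching every watched path again. Assuming a child watch originally set with zoo_wget_children, re-arming it might look like this sketch (the path and handle are placeholders):

/* hypothetical re-registration after a session expiry */
struct String_vector children;
int rc = zoo_wget_children(zh, "/my/watched/path",
                           watcher /* the same global watcher */, NULL,
                           &children);
if (rc == ZOK) {
    /* re-read the children so no change during the outage is missed */
    deallocate_String_vector(&children);
}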


The correct way to use the zookeeper_mt library is as follows.

Register a global watcher with zookeeper_init, like this:

static void watcher(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx)
{
    if(type == ZOO_SESSION_EVENT)
    {
        if (state == ZOO_CONNECTED_STATE)
        {
            log_info("connected zookeeper");
            // Both the first successful connect and a reconnect after expiry
            // land here, so check whether we arrived via a session timeout.
            // If so, the watches must be re-registered: explicitly touch every
            // previously registered path (e.g. if it was watched via
            // get_children, call get_children once more now) so that node
            // changes during the session outage are not silently lost.
            if(reconnection_flag) {
                reconnection_flag = 0;
                ...
                // re-arm the watches on our paths
            }
        }
        else if(state == ZOO_AUTH_FAILED_STATE)
        {
            log_error("Authentication failure. Shutting down...");
            zookeeper_close(zh);
        }
        else if(state == ZOO_EXPIRED_SESSION_STATE)
        {
            log_error("Session expired. Shutting down...");
            // the session expired; set the reconnect flag
            reconnection_flag = 1;
            // close the old zookeeper handle and retry with zookeeper_init
            zookeeper_close(zh);
            ... = zookeeper_init(...);
        }
    }
    else
    {
        // inspect the event type and path and dispatch to the appropriate handler
        ...
    }
}

Note that the watcher runs on a background thread, so any variables it shares with the main thread must be protected by a mutex.
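For instance, if the watcher updates a flag or a cached node count that the main thread reads, a plain pthread mutex is enough; a minimal sketch with placeholder names:

#include <pthread.h>

/* hypothetical state shared between the watcher and the main thread */
static pthread_mutex_t state_lock = PTHREAD_MUTEX_INITIALIZER;
static int node_count;                   /* written by the watcher thread */

static void on_children_changed(int n)   /* called from the watcher */
{
    pthread_mutex_lock(&state_lock);
    node_count = n;
    pthread_mutex_unlock(&state_lock);
}

static int read_node_count(void)         /* called from the main thread */
{
    pthread_mutex_lock(&state_lock);
    int n = node_count;
    pthread_mutex_unlock(&state_lock);
    return n;
}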

A ZooKeeper client has to be built as an event-notification mechanism, and multi-threading really is the simplest way to do that.

Still, keeping all of these details straight is a real pain for application developers, which is why I think the ZooKeeper C wrapper mentioned at the start of this post is the simpler and more convenient option.