zookeeper c 客户端源码分析以及使用注意点

原创内容,转载请注明出处

Posted by Weakyon Blog on November 21, 2015

zookeeper的库用起来很麻烦,建议使用我封装的zookeeper c库,能考虑更少的zookeeper本身的逻辑

链接:

ylibzkevent


在上一篇文章查CLOSE_WAIT泄漏问题的时候稍微看了下zookeeper c源码,大致对他的流程来做一个分析。

zookeeper的C客户端分为mt库和st库(多线程和单线程),一般操作都是以多线程库为主。

多线程库分为三个线程,主线程,io线程和completion线程

主线程就是调用API的线程,io线程负责网络通信,对异步请求和watch响应等,IO线程会发给completion线程,由completion线程异步完成

主进程

首先看下zookeeper_init是如何创建io线程和completion线程的

  
zookeeper_init-->adaptor_init-->start_threads
void start_threads(zhandle_t* zh)
{
    ...
    api_prolog(zh);
    rc=pthread_create(&adaptor->io, 0, do_io, zh);
    rc=pthread_create(&adaptor->completion, 0, do_completion, zh);
    wait_for_others(zh);
    api_epilog(zh, 0);
}

io进程

do_io就是io进程的主要逻辑了

  
void *do_io(void *v)
{
    zhandle_t *zh = (zhandle_t*)v;
    struct pollfd fds[2];
    struct adaptor_threads *adaptor_threads = zh->adaptor_priv;

    api_prolog(zh);
    notify_thread_ready(zh);
    LOG_DEBUG(("started IO thread"));
    fds[0].fd=adaptor_threads->self_pipe[0];
    fds[0].events=POLLIN;
    while(!zh->close_requested) {
        struct timeval tv;
        int fd;
        int interest;
        int timeout;
        int maxfd=1;
        int rc;

        zookeeper_interest(zh, &fd, &interest, &tv);
        //将zookeeper_intereset里建立的新描述符加入监听事件里,睡眠一定的超时时间直到被唤醒
        if (fd != -1) {
            fds[1].fd=fd;
            fds[1].events=(interest&ZOOKEEPER_READ)?POLLIN:0;
            fds[1].events|=(interest&ZOOKEEPER_WRITE)?POLLOUT:0;
            maxfd=2;
        }
        timeout=tv.tv_sec * 1000 + (tv.tv_usec/1000);

        poll(fds,maxfd,timeout);
        if (fd != -1) {
            interest=(fds[1].revents&POLLIN)?ZOOKEEPER_READ:0;
            interest|=((fds[1].revents&POLLOUT)||(fds[1].revents&POLLHUP))?ZOOKEEPER_WRITE:0;
        }
        if(fds[0].revents&POLLIN){
            // flush the pipe
            char b[128];
            while(read(adaptor_threads->self_pipe[0],b,sizeof(b))==sizeof(b)){}
        }
        // dispatch zookeeper events
        rc = zookeeper_process(zh, interest);
        // check the current state of the zhandle and terminate 
        // if it is_unrecoverable()
        if(is_unrecoverable(zh))
            break;
    }
    api_epilog(zh, 0);
    LOG_DEBUG(("IO thread terminated"));
    return 0;
}

可以看到,do_io线程是一个使用poll多路复用的循环,主要是监视zh->fd和adaptor_threads->self_pipe[0]。

zh->fd是和服务端通信的描述符,self_pipe是用来唤醒这个线程的(wakeup_io_thread)

循环开始使用zookeeper_interest对服务端的网络进行检查,并且设置了poll中的等待时间

而后poll会进行睡眠等待被服务端或者主进程唤醒,唤醒后执行zookeeper_process来做具体的操作。

那么下面先看下zookeeper_interest的具体操作

  
do_io->zookeeper_interest
int zookeeper_interest(zhandle_t *zh, int *fd, int *interest,
     struct timeval *tv)
{
    struct timeval now;
    if(zh==0 || fd==0 ||interest==0 || tv==0)
        return ZBADARGUMENTS;
    if (is_unrecoverable(zh))
        return ZINVALIDSTATE;
    gettimeofday(&now, 0);
    //如果存在deadline时间,那么打印超过了deadline时间多久
    if(zh->next_deadline.tv_sec!=0 || zh->next_deadline.tv_usec!=0){
        int time_left = calculate_interval(&zh->next_deadline, &now);
        if (time_left > 10)
            LOG_WARN(("Exceeded deadline by %dms", time_left));
    }
    //增加zh的引用计数,而后对变量进行初始化
    api_prolog(zh);
    *fd = zh->fd; 
    *interest = 0;
    tv->tv_sec = 0;
    tv->tv_usec = 0;
    //如果连接不存在
    if (*fd == -1) {
        if (zh->connect_index == zh->addrs_count) {
            /* Wait a bit before trying again so that we don't spin */
            zh->connect_index = 0;
        }else {
            //重新建立连接,并且关nagle算法
            int rc;
            int enable_tcp_nodelay = 1;
            int ssoresult;

            zh->fd = socket(zh->addrs[zh->connect_index].ss_family, SOCK_STREAM, 0);
            if (zh->fd < 0) {
                return api_epilog(zh,handle_socket_error_msg(zh,__LINE__,
                                                             ZSYSTEMERROR, "socket() call failed"));
            }
            ssoresult = setsockopt(zh->fd, IPPROTO_TCP, TCP_NODELAY, &enable_tcp_nodelay, sizeof(enable_tcp_nodelay));
            if (ssoresult != 0) {
                LOG_WARN(("Unable to set TCP_NODELAY, operation latency may be effected"));
            }
            fcntl(zh->fd, F_SETFL, O_NONBLOCK|fcntl(zh->fd, F_GETFL, 0));
            {
                rc = connect(zh->fd, (struct sockaddr*) &zh->addrs[zh->connect_index], sizeof(struct sockaddr_in));
            }
            //因为是非阻塞socket,所以没连上的时候此时把状态设置为connecting
            if (rc == -1) {
                /* we are handling the non-blocking connect according to
                 * the description in section 16.3 "Non-blocking connect"
                 * in UNIX Network Programming vol 1, 3rd edition */
                if (errno == EWOULDBLOCK || errno == EINPROGRESS)
                    zh->state = ZOO_CONNECTING_STATE;
                else
                    return api_epilog(zh,handle_socket_error_msg(zh,__LINE__,
                            ZCONNECTIONLOSS,"connect() call failed"));
            } else {
            //否则调用prime_connection,对zk服务端做一些握手验证等等,这个函数后面再看
                if((rc=prime_connection(zh))!=0)
                    return api_epilog(zh,rc);

                LOG_INFO(("Initiated connection to server [%s]",
                        format_endpoint_info(&zh->addrs[zh->connect_index])));
            }
        }
        //设置初始的poll等待时间为传入的recv_timeout的三分之一
        *fd = zh->fd;
        *tv = get_timeval(zh->recv_timeout/3);
        zh->last_recv = now;
        zh->last_send = now;
        zh->last_ping = now;
    }
    //如果连接建立
    if (zh->fd != -1) {
        //计算有多久没有recv和send数据了
        int idle_recv = calculate_interval(&zh->last_recv, &now);
        int idle_send = calculate_interval(&zh->last_send, &now);
        int recv_to = zh->recv_timeout*2/3 - idle_recv;
        int send_to = zh->recv_timeout/3;
        // have we exceeded the receive timeout threshold?
        //如果空闲recv时间达到2/3*timeout,那么超时,关闭连接
        if (recv_to <= 0) {
            // We gotta cut our losses and connect to someone else
         errno = ETIMEDOUT;
        *interest=0;
            *tv = get_timeval(0);
            return api_epilog(zh,handle_socket_error_msg(zh,
                    __LINE__,ZOPERATIONTIMEOUT,
                    "connection to %s timed out (exceeded timeout by %dms)",
                    format_endpoint_info(&zh->addrs[zh->connect_index]),
                    -recv_to));

        }
        //如果空闲send时间达到1/3*timeout
        //那么发送ping(send_ping->adaptor_send_queue->flush_send_queue->send_buffer->zookeeper_send->send)
        // We only allow 1/3 of our timeout time to expire before sending
        // a PING
        if (zh->state==ZOO_CONNECTED_STATE) {
            send_to = zh->recv_timeout/3 - idle_send;
            if (send_to <= 0) {
                if (zh->sent_requests.head==0) {
                    int rc=send_ping(zh);
                    if (rc < 0){
                        LOG_ERROR(("failed to send PING request (zk retcode=%d)",rc));
                        return api_epilog(zh,rc);
                    }
                }
                send_to = zh->recv_timeout/3;
            }
        }
        // choose the lesser value as the timeout
        //计算poll的等待时间next_deadline
        *tv = get_timeval(recv_to < send_to? recv_to:send_to);
        zh->next_deadline.tv_sec = now.tv_sec + tv->tv_sec;
        zh->next_deadline.tv_usec = now.tv_usec + tv->tv_usec;
        if (zh->next_deadline.tv_usec > 1000000) {
            zh->next_deadline.tv_sec += zh->next_deadline.tv_usec / 1000000;
            zh->next_deadline.tv_usec = zh->next_deadline.tv_usec % 1000000;
        }
        //标识目前客户端对read事件(ping)感兴趣
        *interest = ZOOKEEPER_READ;
        /* we are interested in a write if we are connected and have something
         * to send, or we are waiting for a connect to finish. */
        //如果zh->to_send有内容那么也关注write事件
        if ((zh->to_send.head && (zh->state == ZOO_CONNECTED_STATE))
        || zh->state == ZOO_CONNECTING_STATE) {
            *interest |= ZOOKEEPER_WRITE;
        }
    }
    return api_epilog(zh,ZOK);
}

可以看到zookeeper_interest决定了poll会睡眠多久,并且决定了zookeeper目前是对read还是write感兴趣

下面来看看zookeeper_process

zookeeper_process中有个很重要的函数check_events,先看看他

  
do_io->zookeeper_process->check_events
static int check_events(zhandle_t *zh, int events)
{
    if (zh->fd == -1)
        return ZINVALIDSTATE;
    //如果描述符触发写事件,并且状态为ZOO_CONNECTING_STATE状态,说明刚刚建立连接
    if ((events&ZOOKEEPER_WRITE)&&(zh->state == ZOO_CONNECTING_STATE)) {
        int rc, error;
        socklen_t len = sizeof(error);
        rc = getsockopt(zh->fd, SOL_SOCKET, SO_ERROR, &error, &len);
        /* the description in section 16.4 "Non-blocking connect"
         * in UNIX Network Programming vol 1, 3rd edition, points out
         * that sometimes the error is in errno and sometimes in error */
        if (rc < 0 || error) {
            if (rc == 0)
                errno = error;
            return handle_socket_error_msg(zh, __LINE__,ZCONNECTIONLOSS,
                "server refused to accept the client");
        }
        //进入prime_connecttion
        if((rc=prime_connection(zh))!=0)
            return rc;
        LOG_INFO(("initiated connection to server [%s]",
                format_endpoint_info(&zh->addrs[zh->connect_index])));
        return ZOK;
    }

连接建立后会进入prime_connection,看看这个函数

  
do_io->zookeeper_process->check_events->prime_connection
static int prime_connection(zhandle_t *zh) 
{
    int rc;
    /*this is the size of buffer to serialize req into*/
    char buffer_req[HANDSHAKE_REQ_SIZE];
    int len = sizeof(buffer_req);
    int hlen = 0; 
    struct connect_req req; 
    //设置协议req的version,sessionId,passwd等等
    req.protocolVersion = 0; 
    req.sessionId = zh->client_id.client_id;
    req.passwd_len = sizeof(req.passwd);
    memcpy(req.passwd, zh->client_id.passwd, sizeof(zh->client_id.passwd));
    req.timeOut = zh->recv_timeout;
    req.lastZxidSeen = zh->last_zxid;
    hlen = htonl(len);
    /* We are running fast and loose here, but this string should fit in the initial buffer! */
    //发送协议头长度,以及整个包??这里为啥不是异步的,没看懂
    rc=zookeeper_send(zh->fd, &hlen, sizeof(len));
    serialize_prime_connect(&req, buffer_req);
    rc=rc<0 ? rc : zookeeper_send(zh->fd, buffer_req, len);
    if (rc<0) {
        return handle_socket_error_msg(zh, __LINE__, ZCONNECTIONLOSS,
                "failed to send a handshake packet: %s", strerror(errno));
    }    
    //从CONNECTING状态转换为ASSOCIATING状态
    zh->state = ZOO_ASSOCIATING_STATE;

    //将input_buffer标记为primer_buffer
    zh->input_buffer = &zh->primer_buffer;
    /* This seems a bit weird to to set the offset to 4, but we already have a
     * length, so we skip reading the length (and allocating the buffer) by
     * saying that we are already at offset 4 */
    //设置已经当前input_buffer便宜为4
    zh->input_buffer->curr_offset = 4; 

    return ZOK; 
}

继续刚才的check_events函数

do_io->zookeeper_process->check_events
static int check_events(zhandle_t *zh, int events)
{
    ...
    //如果to_send链表有内容,且触发事件为写事件,那么flush_send_queue,此时数据会被发出
    if (zh->to_send.head && (events&ZOOKEEPER_WRITE)) {
        /* make the flush call non-blocking by specifying a 0 timeout */
        int rc=flush_send_queue(zh,0);
        if (rc < 0)
            return handle_socket_error_msg(zh,__LINE__,ZCONNECTIONLOSS,
                "failed while flushing send queue");
    }
    //如果触发事件为读
    if (events&ZOOKEEPER_READ) {
        int rc;
        if (zh->input_buffer == 0) {
            zh->input_buffer = allocate_buffer(0,0);
        }

        //读取数据,读取一个文件头大小后申请响应的大小
        rc = recv_buffer(zh->fd, zh->input_buffer);
        if (rc < 0) {
            return handle_socket_error_msg(zh, __LINE__,ZCONNECTIONLOSS,
                "failed while receiving a server response");
        }
        if (rc > 0) {
            //如果input_buffer不为primer_buffer,说明不是刚建立的连接,那么把input_buffer的内容放入to_process链表等待被处理
            gettimeofday(&zh->last_recv, 0);
            if (zh->input_buffer != &zh->primer_buffer) {
                queue_buffer(&zh->to_process, zh->input_buffer, 0);
            //如果是刚建立的连接
            } else  {
                int64_t oldid,newid;
                //deserialize
                deserialize_prime_response(&zh->primer_storage, zh->primer_buffer.buffer);
                /* We are processing the primer_buffer, so we need to finish
                 * the connection handshake */
                oldid = zh->client_id.client_id;
                newid = zh->primer_storage.sessionId;
                //把客户端的client_id和服务端的sessionid做比较,如果不同说明会话已经过期,此时会触发一个过期事件
                if (oldid != 0 && oldid != newid) {
                    zh->state = ZOO_EXPIRED_SESSION_STATE;
                    errno = ESTALE;
                    return handle_socket_error_msg(zh,__LINE__,ZSESSIONEXPIRED,
                            "sessionId=%#llx has expired.",oldid);
                } else {
                    //把客户端id设置为服务端的session_id
                    zh->recv_timeout = zh->primer_storage.timeOut;
                    zh->client_id.client_id = newid;

                    memcpy(zh->client_id.passwd, &zh->primer_storage.passwd,
                           sizeof(zh->client_id.passwd));
                    //进入ZOO_CONNECTED_STATE状态
                    zh->state = ZOO_CONNECTED_STATE;
                    LOG_INFO(("session establishment complete on server [%s], sessionId=%#llx, negotiated timeout=%d",
                              format_endpoint_info(&zh->addrs[zh->connect_index]),
                              newid, zh->recv_timeout));
                    /* we want the auth to be sent for, but since both call push to front
                       we need to call send_watch_set first */
                    //发送AUTH和WATCH信息
                    send_set_watches(zh);
                    /* send the authentication packet now */
                    send_auth_info(zh);
                    LOG_DEBUG(("Calling a watcher for a ZOO_SESSION_EVENT and the state=ZOO_CONNECTED_STATE"));
                    zh->input_buffer = 0; // just in case the watcher calls zookeeper_process() again
                    PROCESS_SESSION_EVENT(zh, ZOO_CONNECTED_STATE);
                }
            }
            zh->input_buffer = 0;
        } else {
            // zookeeper_process was called but there was nothing to read
            // from the socket
            return ZNOTHING;
        }
    }
    return ZOK;
}

这个send_set_watches很关键,把自己已有的watch信息全部收集起来发给服务端,服务端会把这些信息和自己的信息做比对

如果有不同会发送给客户端事件,这是为了防止心跳超时后,会话没有超时时,因为不是同一个TCP连接导致信息丢失。

这个函数能保证在会话期间的任何节点变化,都能触发watch函数的调用

do_io->zookeeper_process->check_events->send_set_watches
static int send_set_watches(zhandle_t *zh)
{
    struct oarchive *oa;
    struct RequestHeader h = { STRUCT_INITIALIZER(xid , SET_WATCHES_XID), STRUCT_INITIALIZER(type , ZOO_SETWATCHES_OP)};
    struct SetWatches req;
    int rc;

    req.relativeZxid = zh->last_zxid;
    req.dataWatches.data = collect_keys(zh->active_node_watchers, (int*)&req.dataWatches.count);
    req.existWatches.data = collect_keys(zh->active_exist_watchers, (int*)&req.existWatches.count);
    req.childWatches.data = collect_keys(zh->active_child_watchers, (int*)&req.childWatches.count);

    // return if there are no pending watches
    if (!req.dataWatches.count && !req.existWatches.count &&
        !req.childWatches.count) {
        free_key_list(req.dataWatches.data, req.dataWatches.count);
        free_key_list(req.existWatches.data, req.existWatches.count);
        free_key_list(req.childWatches.data, req.childWatches.count);
        return ZOK;
    }


    oa = create_buffer_oarchive();
    rc = serialize_RequestHeader(oa, "header", &h);
    rc = rc < 0 ? rc : serialize_SetWatches(oa, "req", &req);
    /* add this buffer to the head of the send queue */
    rc = rc < 0 ? rc : queue_front_buffer_bytes(&zh->to_send, get_buffer(oa),
            get_buffer_len(oa));
    /* We queued the buffer, so don't free it */
    close_buffer_oarchive(&oa, 0);
    free_key_list(req.dataWatches.data, req.dataWatches.count);
    free_key_list(req.existWatches.data, req.existWatches.count);
    free_key_list(req.childWatches.data, req.childWatches.count);
    LOG_DEBUG(("Sending set watches request to %s",format_current_endpoint_info(zh)));
    return (rc < 0)?ZMARSHALLINGERROR:ZOK;
}
do_io->zookeeper_process->check_events->queue_session_event
// IO thread queues session events to be processed by the completion thread
static int queue_session_event(zhandle_t *zh, int state)
{
    int rc;
    struct WatcherEvent evt = { ZOO_SESSION_EVENT, state, "" };
    struct ReplyHeader hdr = { WATCHER_EVENT_XID, 0, 0 };
    struct oarchive *oa;
    completion_list_t *cptr;

    if ((oa=create_buffer_oarchive())==NULL) {
        LOG_ERROR(("out of memory"));
        goto error;
    }
    rc = serialize_ReplyHeader(oa, "hdr", &hdr);
    rc = rc<0?rc: serialize_WatcherEvent(oa, "event", &evt);
    if(rc<0){
        close_buffer_oarchive(&oa, 1);
        goto error;
    }
    cptr = create_completion_entry(WATCHER_EVENT_XID,-1,0,0,0,0);
    cptr->buffer = allocate_buffer(get_buffer(oa), get_buffer_len(oa));
    cptr->buffer->curr_offset = get_buffer_len(oa);
    if (!cptr->buffer) {
        free(cptr);
        close_buffer_oarchive(&oa, 1);
        goto error;
    }
    /* We queued the buffer, so don't free it */
    close_buffer_oarchive(&oa, 0);
    cptr->c.watcher_result = collectWatchers(zh, ZOO_SESSION_EVENT, "");
    //将watcher的内容放入completions_to_process链表内,由completion线程进行调用
    queue_completion(&zh->completions_to_process, cptr, 0);
    if (process_async(zh->outstanding_sync)) {
        process_completions(zh);
    }
    return ZOK;
error:
    errno=ENOMEM;
    return ZSYSTEMERROR;
}

check_events看完了,回到zookeeper_process

  
nt zookeeper_process(zhandle_t *zh, int events)
{
    buffer_list_t *bptr;
    int rc;

    if (zh==NULL)
        return ZBADARGUMENTS;
    if (is_unrecoverable(zh))
        return ZINVALIDSTATE;
    api_prolog(zh);
    IF_DEBUG(checkResponseLatency(zh));
    rc = check_events(zh, events);
    if (rc!=ZOK)
        return api_epilog(zh, rc);

    IF_DEBUG(isSocketReadable(zh));

    //如果to_process链表有内容,也就是接受到服务器的信息,那么解析出来
    while (rc >= 0 && (bptr=dequeue_buffer(&zh->to_process))) {
        struct ReplyHeader hdr;
        struct iarchive *ia = create_buffer_iarchive(
                                    bptr->buffer, bptr->curr_offset);
        deserialize_ReplyHeader(ia, "hdr", &hdr);
        if (hdr.zxid > 0) {
            zh->last_zxid = hdr.zxid;
        } else {
            // fprintf(stderr, "Got %#x for %#x\n", hdr.zxid, hdr.xid);
        }
        //如果类型是WATCHER_EVENT_XID,那么创建一个WATCHER_EVENT_XID的watch放入completions_to_process队列让异步completions队列去处理
        if (hdr.xid == WATCHER_EVENT_XID) {
            struct WatcherEvent evt;
            int type = 0;
            char *path = NULL;
            completion_list_t *c = NULL;

            LOG_DEBUG(("Processing WATCHER_EVENT"));

            deserialize_WatcherEvent(ia, "event", &evt);
            type = evt.type;
            path = evt.path;
            /* We are doing a notification, so there is no pending request */
            c = create_completion_entry(WATCHER_EVENT_XID,-1,0,0,0,0);
            c->buffer = bptr;
            c->c.watcher_result = collectWatchers(zh, type, path);

            // We cannot free until now, otherwise path will become invalid
            deallocate_WatcherEvent(&evt);
            queue_completion(&zh->completions_to_process, c, 0);
        } else if (hdr.xid == SET_WATCHES_XID) {
        //如果类型是SET_WATCHES_XID,那么不处理
            LOG_DEBUG(("Processing SET_WATCHES"));
            free_buffer(bptr);
        //如果类型是验证,那么直接自己调用auth_completion_func来处理(不从completions线程去处理)
        } else if (hdr.xid == AUTH_XID){
            LOG_DEBUG(("Processing AUTH_XID"));

            /* special handling for the AUTH response as it may come back
             * out-of-band */
            auth_completion_func(hdr.err,zh);
            free_buffer(bptr);
            /* authentication completion may change the connection state to
             * unrecoverable */
            if(is_unrecoverable(zh)){
                handle_error(zh, ZAUTHFAILED);
                close_buffer_iarchive(&ia);
                return api_epilog(zh, ZAUTHFAILED);
            }
        } else {
            int rc = hdr.err;
            /* Find the request corresponding to the response */
            completion_list_t *cptr = dequeue_completion(&zh->sent_requests);

            /* [ZOOKEEPER-804] Don't assert if zookeeper_close has been called. */
            if (zh->close_requested == 1 && cptr == NULL) {
                LOG_DEBUG(("Completion queue has been cleared by zookeeper_close()"));
                close_buffer_iarchive(&ia);
                return api_epilog(zh,ZINVALIDSTATE);
            }
            assert(cptr);
            /* The requests are going to come back in order */
            if (cptr->xid != hdr.xid) {
                LOG_DEBUG(("Processing unexpected or out-of-order response!"));

                // received unexpected (or out-of-order) response
                close_buffer_iarchive(&ia);
                free_buffer(bptr);
                // put the completion back on the queue (so it gets properly
                // signaled and deallocated) and disconnect from the server
                queue_completion(&zh->sent_requests,cptr,1);
                return handle_socket_error_msg(zh, __LINE__,ZRUNTIMEINCONSISTENCY,
                        "unexpected server response: expected %#x, but received %#x",
                        hdr.xid,cptr->xid);
            }

            activateWatcher(zh, cptr->watcher, rc);

            //如果是异步请求
            if (cptr->c.void_result != SYNCHRONOUS_MARKER) {
                //如果是ping消息,那么直接更新last_ping即可
                if(hdr.xid == PING_XID){
                    int elapsed = 0;
                    struct timeval now;
                    gettimeofday(&now, 0);
                    elapsed = calculate_interval(&zh->last_ping, &now);
                    LOG_DEBUG(("Got ping response in %d ms", elapsed));

                    // Nothing to do with a ping response
                    free_buffer(bptr);
                    destroy_completion_entry(cptr);
                } else {
                    LOG_DEBUG(("Queueing asynchronous response"));
                    //如果其他的,那么发给completion线程去处理
                    cptr->buffer = bptr;
                    queue_completion(&zh->completions_to_process, cptr, 0);
                }
            } else {
                //同步消息,那么自己来处理
                struct sync_completion
                        *sc = (struct sync_completion*)cptr->data;
                sc->rc = rc;

                process_sync_completion(cptr, sc, ia, zh);

                notify_sync_completion(sc);
                free_buffer(bptr);
                zh->outstanding_sync--;
                destroy_completion_entry(cptr);
            }
        }

        close_buffer_iarchive(&ia);

    }
    if (process_async(zh->outstanding_sync)) {
        process_completions(zh);
    }
    return api_epilog(zh,ZOK);
}

总结

一共5种状态

  
const int ZOO_EXPIRED_SESSION_STATE = EXPIRED_SESSION_STATE_DEF;
const int ZOO_AUTH_FAILED_STATE = AUTH_FAILED_STATE_DEF;
const int ZOO_CONNECTING_STATE = CONNECTING_STATE_DEF;
const int ZOO_ASSOCIATING_STATE = ASSOCIATING_STATE_DEF;
const int ZOO_CONNECTED_STATE = CONNECTED_STATE_DEF;

对它的状态转换做一个示意图

  
                connect返回-1errnr == EWOULDBLOCK
notconnected ------------------------------------------ connecting--------------------------|
                                                            |                               |
                                                            | prime_connection              |
                         different session id               |                               |
expired_session --------------------------------------  associating ------------------- connected ---------auth_failed

有如下四种情景:

1 使用zookeeper_init刚连上时,会触发SESSION_EVENT且状态为CONNECTED的watcher

2 如果发生了节点变化,会Process相应的WATCHER_EVENT的watcher

3 当client和zookeeper超时时间大于心跳时间且小于会话时间时

原先的连接断线后会触发SESSION_EVENT且状态为CONNECTING的watcher,当重新连上后会触发SESSION_EVENT且状态为CONNECTED的watcher

如果在这段时间内监视的节点发生变化,还会Process相应的WATCHER_EVENT的watcher

4 如果超时时间大于会话时间时

原先的连接断线后会触发SESSION_EVENT且状态为CONNECTING的watch,当重新连上后会触发SESSION_EVENT且状态为EXPIRED_SESSION的watcher

此时需要自己重新使用zookeeper_init连接,并且重新注册自己的watcher,这时重新进入情景1,即为触发SESSION_EVENT且状态为CONNECTED的watcher


应该这样正确使用zookeeper_mt库:

使用zookeeper_init注册一个全局watcher如下:

  
static void watcher(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx)
{
    if(type == ZOO_SESSION_EVENT)
    {   
        if (state == ZOO_CONNECTED_STATE)
        {   
            log_info("connected zookeeper");
            //第一次正常连接和超时重连都会触发该状态,所以要判断是否是超时引起的该状态
            //如果是会话超时引起的,需要重新设置观察器
            //对之前注册的每个路径,都需要显式的触发一次
            //(例如原来是调用的get_children监视,那么现在需要调用get_children获取一次)
            //防止因为和zk的会话超时,导致这段时间内的节点变化监视丢失 
            if(reconnection_flag) {
                reconnection_flag = 0;
                ...
                //重新设置对路径的观察事件
            }
        }   
        else if(state == ZOO_AUTH_FAILED_STATE)
        {   
            log_error("Authentication failure. Shutting down...");
            zookeeper_close(zh);
        }   
        else if(state == ZOO_EXPIRED_SESSION_STATE)
        {   
            log_error("Session expired. Shutting down...");
            //超时会话过期,设置重连标记位
            reconnection_flag = 1;
            //关闭原来的zookeeper handle,并且用zookeeper_init尝试重连
            zookeeper_close(zh);
            ... = zookeeper_init(...);
        }   
    }   
    else 
    {
        //判断事件和路径,分发给不同的调用逻辑
        ...
    }
}

要注意的是,watcher是后台线程,因此对某些和主线程共享的变量,需要添加互斥锁

zookeeper的客户端必须做成事件通知机制,多线程确实是一个比较简单的方案

但是对于开发而言,如果要考虑那么多方方面面,确实会很蛋疼,我认为用本文开始提到的zookeeper c库会更加简单方便

21 Nov 2015