api_prolog(zh); notify_thread_ready(zh); LOG_DEBUG(("started IO thread")); fds[0].fd=adaptor_threads->self_pipe[0]; fds[0].events=POLLIN; while(!zh->close_requested) { structtimevaltv; int fd; int interest; int timeout; int maxfd=1; int rc;
poll(fds,maxfd,timeout); if (fd != -1) { interest=(fds[1].revents&POLLIN)?ZOOKEEPER_READ:0; interest|=((fds[1].revents&POLLOUT)||(fds[1].revents&POLLHUP))?ZOOKEEPER_WRITE:0; } if(fds[0].revents&POLLIN){ // flush the pipe char b[128]; while(read(adaptor_threads->self_pipe[0],b,sizeof(b))==sizeof(b)){} } // dispatch zookeeper events rc = zookeeper_process(zh, interest); // check the current state of the zhandle and terminate // if it is_unrecoverable() if(is_unrecoverable(zh)) break; } api_epilog(zh, 0); LOG_DEBUG(("IO thread terminated")); return0; }
do_io->zookeeper_interest intzookeeper_interest(zhandle_t *zh, int *fd, int *interest, struct timeval *tv) { structtimevalnow; if(zh==0 || fd==0 ||interest==0 || tv==0) return ZBADARGUMENTS; if (is_unrecoverable(zh)) return ZINVALIDSTATE; gettimeofday(&now, 0); //如果存在deadline时间,那么打印超过了deadline时间多久 if(zh->next_deadline.tv_sec!=0 || zh->next_deadline.tv_usec!=0){ int time_left = calculate_interval(&zh->next_deadline, &now); if (time_left > 10) LOG_WARN(("Exceeded deadline by %dms", time_left)); } //增加zh的引用计数,而后对变量进行初始化 api_prolog(zh); *fd = zh->fd; *interest = 0; tv->tv_sec = 0; tv->tv_usec = 0; //如果连接不存在 if (*fd == -1) { if (zh->connect_index == zh->addrs_count) { /* Wait a bit before trying again so that we don't spin */ zh->connect_index = 0; }else { //重新建立连接,并且关nagle算法 int rc; int enable_tcp_nodelay = 1; int ssoresult;
zh->fd = socket(zh->addrs[zh->connect_index].ss_family, SOCK_STREAM, 0); if (zh->fd < 0) { return api_epilog(zh,handle_socket_error_msg(zh,__LINE__, ZSYSTEMERROR, "socket() call failed")); } ssoresult = setsockopt(zh->fd, IPPROTO_TCP, TCP_NODELAY, &enable_tcp_nodelay, sizeof(enable_tcp_nodelay)); if (ssoresult != 0) { LOG_WARN(("Unable to set TCP_NODELAY, operation latency may be effected")); } fcntl(zh->fd, F_SETFL, O_NONBLOCK|fcntl(zh->fd, F_GETFL, 0)); { rc = connect(zh->fd, (struct sockaddr*) &zh->addrs[zh->connect_index], sizeof(struct sockaddr_in)); } //因为是非阻塞socket,所以没连上的时候此时把状态设置为connecting if (rc == -1) { /* we are handling the non-blocking connect according to * the description in section 16.3 "Non-blocking connect" * in UNIX Network Programming vol 1, 3rd edition */ if (errno == EWOULDBLOCK || errno == EINPROGRESS) zh->state = ZOO_CONNECTING_STATE; else return api_epilog(zh,handle_socket_error_msg(zh,__LINE__, ZCONNECTIONLOSS,"connect() call failed")); } else { //否则调用prime_connection,对zk服务端做一些握手验证等等,这个函数后面再看 if((rc=prime_connection(zh))!=0) return api_epilog(zh,rc);
LOG_INFO(("Initiated connection to server [%s]", format_endpoint_info(&zh->addrs[zh->connect_index]))); } } //设置初始的poll等待时间为传入的recv_timeout的三分之一 *fd = zh->fd; *tv = get_timeval(zh->recv_timeout/3); zh->last_recv = now; zh->last_send = now; zh->last_ping = now; } //如果连接建立 if (zh->fd != -1) { //计算有多久没有recv和send数据了 int idle_recv = calculate_interval(&zh->last_recv, &now); int idle_send = calculate_interval(&zh->last_send, &now); int recv_to = zh->recv_timeout*2/3 - idle_recv; int send_to = zh->recv_timeout/3; // have we exceeded the receive timeout threshold? //如果空闲recv时间达到2/3*timeout,那么超时,关闭连接 if (recv_to <= 0) { // We gotta cut our losses and connect to someone else errno = ETIMEDOUT; *interest=0; *tv = get_timeval(0); return api_epilog(zh,handle_socket_error_msg(zh, __LINE__,ZOPERATIONTIMEOUT, "connection to %s timed out (exceeded timeout by %dms)", format_endpoint_info(&zh->addrs[zh->connect_index]), -recv_to));
} //如果空闲send时间达到1/3*timeout //那么发送ping(send_ping->adaptor_send_queue->flush_send_queue->send_buffer->zookeeper_send->send) // We only allow 1/3 of our timeout time to expire before sending // a PING if (zh->state==ZOO_CONNECTED_STATE) { send_to = zh->recv_timeout/3 - idle_send; if (send_to <= 0) { if (zh->sent_requests.head==0) { int rc=send_ping(zh); if (rc < 0){ LOG_ERROR(("failed to send PING request (zk retcode=%d)",rc)); return api_epilog(zh,rc); } } send_to = zh->recv_timeout/3; } } // choose the lesser value as the timeout //计算poll的等待时间next_deadline *tv = get_timeval(recv_to < send_to? recv_to:send_to); zh->next_deadline.tv_sec = now.tv_sec + tv->tv_sec; zh->next_deadline.tv_usec = now.tv_usec + tv->tv_usec; if (zh->next_deadline.tv_usec > 1000000) { zh->next_deadline.tv_sec += zh->next_deadline.tv_usec / 1000000; zh->next_deadline.tv_usec = zh->next_deadline.tv_usec % 1000000; } //标识目前客户端对read事件(ping)感兴趣 *interest = ZOOKEEPER_READ; /* we are interested in a write if we are connected and have something * to send, or we are waiting for a connect to finish. */ //如果zh->to_send有内容那么也关注write事件 if ((zh->to_send.head && (zh->state == ZOO_CONNECTED_STATE)) || zh->state == ZOO_CONNECTING_STATE) { *interest |= ZOOKEEPER_WRITE; } } return api_epilog(zh,ZOK); }
do_io->zookeeper_process->check_events staticintcheck_events(zhandle_t *zh, int events) { if (zh->fd == -1) return ZINVALIDSTATE; //如果描述符触发写事件,并且状态为ZOO_CONNECTING_STATE状态,说明刚刚建立连接 if ((events&ZOOKEEPER_WRITE)&&(zh->state == ZOO_CONNECTING_STATE)) { int rc, error; socklen_t len = sizeof(error); rc = getsockopt(zh->fd, SOL_SOCKET, SO_ERROR, &error, &len); /* the description in section 16.4 "Non-blocking connect" * in UNIX Network Programming vol 1, 3rd edition, points out * that sometimes the error is in errno and sometimes in error */ if (rc < 0 || error) { if (rc == 0) errno = error; return handle_socket_error_msg(zh, __LINE__,ZCONNECTIONLOSS, "server refused to accept the client"); } //进入prime_connecttion if((rc=prime_connection(zh))!=0) return rc; LOG_INFO(("initiated connection to server [%s]", format_endpoint_info(&zh->addrs[zh->connect_index]))); return ZOK; }
do_io->zookeeper_process->check_events->prime_connection staticintprime_connection(zhandle_t *zh) { int rc; /*this is the size of buffer to serialize req into*/ char buffer_req[HANDSHAKE_REQ_SIZE]; int len = sizeof(buffer_req); int hlen = 0; structconnect_reqreq; //设置协议req的version,sessionId,passwd等等 req.protocolVersion = 0; req.sessionId = zh->client_id.client_id; req.passwd_len = sizeof(req.passwd); memcpy(req.passwd, zh->client_id.passwd, sizeof(zh->client_id.passwd)); req.timeOut = zh->recv_timeout; req.lastZxidSeen = zh->last_zxid; hlen = htonl(len); /* We are running fast and loose here, but this string should fit in the initial buffer! */ //发送协议头长度,以及整个包??这里为啥不是异步的,没看懂 rc=zookeeper_send(zh->fd, &hlen, sizeof(len)); serialize_prime_connect(&req, buffer_req); rc=rc<0 ? rc : zookeeper_send(zh->fd, buffer_req, len); if (rc<0) { return handle_socket_error_msg(zh, __LINE__, ZCONNECTIONLOSS, "failed to send a handshake packet: %s", strerror(errno)); } //从CONNECTING状态转换为ASSOCIATING状态 zh->state = ZOO_ASSOCIATING_STATE;
//将input_buffer标记为primer_buffer zh->input_buffer = &zh->primer_buffer; /* This seems a bit weird to to set the offset to 4, but we already have a * length, so we skip reading the length (and allocating the buffer) by * saying that we are already at offset 4 */ //设置已经当前input_buffer便宜为4 zh->input_buffer->curr_offset = 4;
do_io->zookeeper_process->check_events staticintcheck_events(zhandle_t *zh, int events) { ... //如果to_send链表有内容,且触发事件为写事件,那么flush_send_queue,此时数据会被发出 if (zh->to_send.head && (events&ZOOKEEPER_WRITE)) { /* make the flush call non-blocking by specifying a 0 timeout */ int rc=flush_send_queue(zh,0); if (rc < 0) return handle_socket_error_msg(zh,__LINE__,ZCONNECTIONLOSS, "failed while flushing send queue"); } //如果触发事件为读 if (events&ZOOKEEPER_READ) { int rc; if (zh->input_buffer == 0) { zh->input_buffer = allocate_buffer(0,0); }
//读取数据,读取一个文件头大小后申请响应的大小 rc = recv_buffer(zh->fd, zh->input_buffer); if (rc < 0) { return handle_socket_error_msg(zh, __LINE__,ZCONNECTIONLOSS, "failed while receiving a server response"); } if (rc > 0) { //如果input_buffer不为primer_buffer,说明不是刚建立的连接,那么把input_buffer的内容放入to_process链表等待被处理 gettimeofday(&zh->last_recv, 0); if (zh->input_buffer != &zh->primer_buffer) { queue_buffer(&zh->to_process, zh->input_buffer, 0); //如果是刚建立的连接 } else { int64_t oldid,newid; //deserialize deserialize_prime_response(&zh->primer_storage, zh->primer_buffer.buffer); /* We are processing the primer_buffer, so we need to finish * the connection handshake */ oldid = zh->client_id.client_id; newid = zh->primer_storage.sessionId; //把客户端的client_id和服务端的sessionid做比较,如果不同说明会话已经过期,此时会触发一个过期事件 if (oldid != 0 && oldid != newid) { zh->state = ZOO_EXPIRED_SESSION_STATE; errno = ESTALE; return handle_socket_error_msg(zh,__LINE__,ZSESSIONEXPIRED, "sessionId=%#llx has expired.",oldid); } else { //把客户端id设置为服务端的session_id zh->recv_timeout = zh->primer_storage.timeOut; zh->client_id.client_id = newid;
memcpy(zh->client_id.passwd, &zh->primer_storage.passwd, sizeof(zh->client_id.passwd)); //进入ZOO_CONNECTED_STATE状态 zh->state = ZOO_CONNECTED_STATE; LOG_INFO(("session establishment complete on server [%s], sessionId=%#llx, negotiated timeout=%d", format_endpoint_info(&zh->addrs[zh->connect_index]), newid, zh->recv_timeout)); /* we want the auth to be sent for, but since both call push to front we need to call send_watch_set first */ //发送AUTH和WATCH信息 send_set_watches(zh); /* send the authentication packet now */ send_auth_info(zh); LOG_DEBUG(("Calling a watcher for a ZOO_SESSION_EVENT and the state=ZOO_CONNECTED_STATE")); zh->input_buffer = 0; // just in case the watcher calls zookeeper_process() again PROCESS_SESSION_EVENT(zh, ZOO_CONNECTED_STATE); } } zh->input_buffer = 0; } else { // zookeeper_process was called but there was nothing to read // from the socket return ZNOTHING; } } return ZOK; }
// return if there are no pending watches if (!req.dataWatches.count && !req.existWatches.count && !req.childWatches.count) { free_key_list(req.dataWatches.data, req.dataWatches.count); free_key_list(req.existWatches.data, req.existWatches.count); free_key_list(req.childWatches.data, req.childWatches.count); return ZOK; }
oa = create_buffer_oarchive(); rc = serialize_RequestHeader(oa, "header", &h); rc = rc < 0 ? rc : serialize_SetWatches(oa, "req", &req); /* add this buffer to the head of the send queue */ rc = rc < 0 ? rc : queue_front_buffer_bytes(&zh->to_send, get_buffer(oa), get_buffer_len(oa)); /* We queued the buffer, so don't free it */ close_buffer_oarchive(&oa, 0); free_key_list(req.dataWatches.data, req.dataWatches.count); free_key_list(req.existWatches.data, req.existWatches.count); free_key_list(req.childWatches.data, req.childWatches.count); LOG_DEBUG(("Sending set watches request to %s",format_current_endpoint_info(zh))); return (rc < 0)?ZMARSHALLINGERROR:ZOK; }
nt zookeeper_process(zhandle_t *zh, int events) { buffer_list_t *bptr; int rc;
if (zh==NULL) return ZBADARGUMENTS; if (is_unrecoverable(zh)) return ZINVALIDSTATE; api_prolog(zh); IF_DEBUG(checkResponseLatency(zh)); rc = check_events(zh, events); if (rc!=ZOK) return api_epilog(zh, rc);
IF_DEBUG(isSocketReadable(zh));
//如果to_process链表有内容,也就是接受到服务器的信息,那么解析出来 while (rc >= 0 && (bptr=dequeue_buffer(&zh->to_process))) { structReplyHeaderhdr; structiarchive *ia = create_buffer_iarchive( bptr->buffer, bptr->curr_offset); deserialize_ReplyHeader(ia, "hdr", &hdr); if (hdr.zxid > 0) { zh->last_zxid = hdr.zxid; } else { // fprintf(stderr, "Got %#x for %#x\n", hdr.zxid, hdr.xid); } //如果类型是WATCHER_EVENT_XID,那么创建一个WATCHER_EVENT_XID的watch放入completions_to_process队列让异步completions队列去处理 if (hdr.xid == WATCHER_EVENT_XID) { struct WatcherEvent evt; int type = 0; char *path = NULL; completion_list_t *c = NULL;
LOG_DEBUG(("Processing WATCHER_EVENT"));
deserialize_WatcherEvent(ia, "event", &evt); type = evt.type; path = evt.path; /* We are doing a notification, so there is no pending request */ c = create_completion_entry(WATCHER_EVENT_XID,-1,0,0,0,0); c->buffer = bptr; c->c.watcher_result = collectWatchers(zh, type, path);
// We cannot free until now, otherwise path will become invalid deallocate_WatcherEvent(&evt); queue_completion(&zh->completions_to_process, c, 0); } elseif (hdr.xid == SET_WATCHES_XID) { //如果类型是SET_WATCHES_XID,那么不处理 LOG_DEBUG(("Processing SET_WATCHES")); free_buffer(bptr); //如果类型是验证,那么直接自己调用auth_completion_func来处理(不从completions线程去处理) } elseif (hdr.xid == AUTH_XID){ LOG_DEBUG(("Processing AUTH_XID"));
/* special handling for the AUTH response as it may come back * out-of-band */ auth_completion_func(hdr.err,zh); free_buffer(bptr); /* authentication completion may change the connection state to * unrecoverable */ if(is_unrecoverable(zh)){ handle_error(zh, ZAUTHFAILED); close_buffer_iarchive(&ia); return api_epilog(zh, ZAUTHFAILED); } } else { int rc = hdr.err; /* Find the request corresponding to the response */ completion_list_t *cptr = dequeue_completion(&zh->sent_requests);
/* [ZOOKEEPER-804] Don't assert if zookeeper_close has been called. */ if (zh->close_requested == 1 && cptr == NULL) { LOG_DEBUG(("Completion queue has been cleared by zookeeper_close()")); close_buffer_iarchive(&ia); return api_epilog(zh,ZINVALIDSTATE); } assert(cptr); /* The requests are going to come back in order */ if (cptr->xid != hdr.xid) { LOG_DEBUG(("Processing unexpected or out-of-order response!"));
// received unexpected (or out-of-order) response close_buffer_iarchive(&ia); free_buffer(bptr); // put the completion back on the queue (so it gets properly // signaled and deallocated) and disconnect from the server queue_completion(&zh->sent_requests,cptr,1); return handle_socket_error_msg(zh, __LINE__,ZRUNTIMEINCONSISTENCY, "unexpected server response: expected %#x, but received %#x", hdr.xid,cptr->xid); }
activateWatcher(zh, cptr->watcher, rc);
//如果是异步请求 if (cptr->c.void_result != SYNCHRONOUS_MARKER) { //如果是ping消息,那么直接更新last_ping即可 if(hdr.xid == PING_XID){ int elapsed = 0; structtimevalnow; gettimeofday(&now, 0); elapsed = calculate_interval(&zh->last_ping, &now); LOG_DEBUG(("Got ping response in %d ms", elapsed));
// Nothing to do with a ping response free_buffer(bptr); destroy_completion_entry(cptr); } else { LOG_DEBUG(("Queueing asynchronous response")); //如果其他的,那么发给completion线程去处理 cptr->buffer = bptr; queue_completion(&zh->completions_to_process, cptr, 0); } } else { //同步消息,那么自己来处理 struct sync_completion *sc = (struct sync_completion*)cptr->data; sc->rc = rc;