(nginx源码系列七)--nginx进程管理分析

原创内容,转载请注明出处

Posted by Weakyon Blog on May 19, 2015

由于twemproxy的多进程改造计划,所以对教科书一般的nginx的管理方式做了分析

nginx的-s指令可以对进程执行多个命令。

这些命令由主进程接受信号然后在循环中对状态进行判断,然后通过unix socket或者信号的方式传达给子进程。

从代码里面可以看到如下状态。

for()
{
    if (delay) {}
    sigsuspend();
    ngx_time_update();
    if (ngx_reap) {} //需要重启工作进程
    if (!live && (ngx_terminate || ngx_quit)) {} //工作进程都结束,自己也设定了结束,那么退出
    if (ngx_terminate) {} //强制退出状态
    if (ngx_quit) {} //优雅退出状态
    if (ngx_reconfigure) {} //配置热加载
    if (ngx_restart) {} //重启(配置不变)
    if (ngx_reopen) {} //重新打开日志
    if (ngx_change_binary) {} //二进制文件改变
    if (ngx_noaccept) {} //不再接受新连接
}

状态是通过信号处理函数来判断的。信号处理函数对主从做了不同的处理。

void
ngx_signal_handler(int signo)
{
    ...
    ngx_time_sigsafe_update();
    switch (ngx_process) {
    case NGX_PROCESS_MASTER:
    case NGX_PROCESS_SINGLE:
        switch (signo) {
        case ngx_signal_value(NGX_SHUTDOWN_SIGNAL):
        case ngx_signal_value(NGX_TERMINATE_SIGNAL):
        case SIGINT:
        case ngx_signal_value(NGX_NOACCEPT_SIGNAL):
        case ngx_signal_value(NGX_RECONFIGURE_SIGNAL):
        case ngx_signal_value(NGX_REOPEN_SIGNAL):
        case ngx_signal_value(NGX_CHANGEBIN_SIGNAL):
        case SIGALRM:
        case SIGIO:
        case SIGCHLD:
        }
        break;
    case NGX_PROCESS_WORKER:
    case NGX_PROCESS_HELPER:
        switch (signo) {
        case ngx_signal_value(NGX_NOACCEPT_SIGNAL):
        case ngx_signal_value(NGX_SHUTDOWN_SIGNAL):
        case ngx_signal_value(NGX_TERMINATE_SIGNAL):
        case SIGINT:
        case ngx_signal_value(NGX_REOPEN_SIGNAL):
        case ngx_signal_value(NGX_RECONFIGURE_SIGNAL):
        case ngx_signal_value(NGX_CHANGEBIN_SIGNAL):
        case SIGIO:
        }
        break;
    }
    if (signo == SIGCHLD) {
        ngx_process_get_status();
    }
}

ngx_process是一个全局变量,工作进程被fork以后这个值会改为NGX_PROCESS_WORKER

xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx

NGX_SHUTDOWN_SIGNAL:        ngx_quit = 1            ngx_quit = 1
NGX_TERMINATE_SIGNAL:       ngx_terminate = 1       ngx_terminate = 1
SIGINT:                     ngx_terminate = 1       ngx_terminate = 1
NGX_NOACCEPT_SIGNAL:        ngx_noaccept = 1        ngx_debug_quit = 1
NGX_RECONFIGURE_SIGNAL:     ngx_reconfigure = 1 
NGX_REOPEN_SIGNAL:          ngx_reopen = 1          ngx_reopen = 1
NGX_CHANGEBIN_SIGNAL:       ngx_change_binary = 1
SIGALRM:                    ngx_sigalrm = 1
SIGIO:                      ngx_sigio = 1
SIGCHLD:                    ngx_reap = 1

从的信号处理函数是主的一个子集,有一些信号只有主才能处理

可以看到这种模式提供了不少操作选项,这里最常用的几个(工作进程重新拉起,退出,配置热加载)看下


工作进程重新拉起

由于第三方模块的不稳定等等原因导致工作进程异常退出,这时候nginx就需要把这个进程重新拉起了

从上文的信号处理函数可以看到,主进程收到SIGCHLD后,将ngx_reap设为1,并且进入ngx_process_get_status函数

这个函数会收集子进程的信息

子进程有5个字段来标识现在的状态:

typedef struct {
    ngx_pid_t           pid;
    int                 status;
    ngx_socket_t        channel[2];

    ngx_spawn_proc_pt   proc;
    void               *data;
    char               *name;

    unsigned            respawn:1; //是否需要被重新拉起
    unsigned            just_spawn:1; //是否是刚被拉起的进程
    unsigned            detached:1; //似乎是调试用的,在代码段并没有看到这个值可能被使用
    unsigned            exiting:1;//父进程向unix socket发送信息给子进程后将这个字段设为1
    unsigned            exited:1;//在ngx_process_get_status发现waitpid触发的pid和这个对象的pid相同时,设为1
} ngx_process_t;

这个函数会对exited和respawn做处理

static void
ngx_process_get_status(void)
{
    int one = 0;

    for ( ;; ) { 
        pid = waitpid(-1, &status, WNOHANG);
        if (pid == 0) {
            return;//如果全部子进程都退出了,那么退出
        }   
        if (pid == -1) {
            if (err == NGX_EINTR) {
                continue;//如果waitpid调用被其他系统调用打断了,那么继续调用
            }   
            if (err == NGX_ECHILD && one) {
                continue;
            }
            ...//输出报错信息然后退出
            return;
        }
        one = 1;
        //遍历所有的进程,将退出的进程的exited状态设为1
        for (i = 0; i < ngx_last_process; i++) {
            if (ngx_processes[i].pid == pid) {
                ngx_processes[i].status = status;
                ngx_processes[i].exited = 1;
                break;
            }
        }
        if (WTERMSIG(status)) {
            ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, 0,
                          "%s %P exited on signal %d",
                          process, pid, WTERMSIG(status));
        } else {
            ngx_log_error(NGX_LOG_NOTICE, ngx_cycle->log, 0,
                          "%s %P exited with code %d",
                          process, pid, WEXITSTATUS(status));
        }
        if (WEXITSTATUS(status) == 2 && ngx_processes[i].respawn) {
            ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, 0,
                          "%s %P exited with fatal code %d "
                          "and cannot be respawned",
                          process, pid, WEXITSTATUS(status));
            ngx_processes[i].respawn = 0;
        }
        ngx_unlock_mutexes(pid);//释放负载均衡锁
    }
}

这里有三个关于信号的知识点

1 为啥这里要用while( (pid = waitpid(-1,&stat,WNOHANG)) > 0)的方式

这是因为SIGRTMIN以前的也就是从SIGHUP到SIGSYS的1-31个信号都是不可靠的。这和UNIX系统的早期实现有关系,出于历史原因这些老的信号现在并没有被修改成可排队的。

不可靠信号的意思是信号并不会排队,如果进程处理速度很低,而同时发生了多个信号,就有可能只能接收到一个信号。

这是因为在内核里用一个位来表示一个不可靠信号的触发,所以它无法保存多个同一类型不可靠信号的触发

比如一个进程创建了100个子进程,然后在外部用kill同时把所有的子进程都杀掉,那么父进程并不能收到100个SIGCHLD信号。

这里有篇文章有个实现信号丢失的情况的一个例子

Linux可靠信号和不可靠信号

所以这里要循环执行无阻塞的waitpid判断返回值来处理可能出现的多个SIGCHLD信号发生。

另外默认只有当信号处理函数运行完以后,这个位才会被重置,所以信号处理函数不会被同一个信号打断。(但是sigaction里的有个叫SA_NODEFER可以改变这一点,使得信号处理函数可以被同一个信号多次打断)

2 one变量的用途

如果已经wait到一个pid了,那么即使再次调用的时候返回-1那也没事了,可以直接退出。

能catch到另外一个pid当然是赚的,catch不到那也不亏。

如果这里返回-1直接报错,即使成功调用也将会100%报出no such process

3 W开头这几个宏的用法

WTERMSIG(status)取得子进程因信号而中止的信号代码,一般会先用WIFSIGNALED 来判断后才使用此宏。

WEXITSTATUS(status)取得子进程exit()返回的结束代码,一般会先用WIFEXITED 来判断是否正常结束才能使用此宏。

除了这些还有几个同一系列的宏:

WIFEXITED(status)如果子进程正常结束则为非0值。

WIFSIGNALED(status)如果子进程是因为信号而结束则此宏值为真。

WIFSTOPPED(status)如果子进程处于暂停执行情况则此宏值为真。一般只有使用WUNTRACED 时才会有此情况。

WSTOPSIG(status)取得引发子进程暂停的信号代码,一般会先用WIFSTOPPED 来判断后才使用此宏。


另外这里还有一个变量被设置了ngx_processes[i].respawn

当一个进程是以exit(2)为结尾的时候,respawn字段被设置为0,也就是不会被重启

这里信号的处理也就告一段落了,然后就是主循环的reap段

for(;;){
    sigsuspend();
    if (ngx_reap) {
            ngx_reap = 0; 
            live = ngx_reap_children(cycle);
    }
}
static ngx_uint_t
ngx_reap_children(ngx_cycle_t *cycle)
{
    ngx_int_t         i, n;
    ngx_uint_t        live;
    ngx_channel_t     ch;
    ngx_core_conf_t  *ccf;

    ch.command = NGX_CMD_CLOSE_CHANNEL;
    ch.fd = -1;

    live = 0;
    for (i = 0; i < ngx_last_process; i++) {
        if (ngx_processes[i].pid == -1) {
            continue;
        }
        if (ngx_processes[i].exited) {

            //进程退出了,由于子进程之间也有unix socket(虽然nginx目前没有使用这些),那么告诉其他进程我这个管道废了
            if (!ngx_processes[i].detached) {
                ngx_close_channel(ngx_processes[i].channel, cycle->log);

                ngx_processes[i].channel[0] = -1;
                ngx_processes[i].channel[1] = -1;

                ch.pid = ngx_processes[i].pid;
                ch.slot = i;

                for (n = 0; n < ngx_last_process; n++) {
                    if (ngx_processes[n].exited
                        || ngx_processes[n].pid == -1
                        || ngx_processes[n].channel[0] == -1)
                    {
                        continue;
                    }

                    ngx_log_debug3(NGX_LOG_DEBUG_CORE, cycle->log, 0,
                                   "pass close channel s:%i pid:%P to:%P",
                                   ch.slot, ch.pid, ngx_processes[n].pid);

                    /* TODO: NGX_AGAIN */

                    ngx_write_channel(ngx_processes[n].channel[0],
                                      &ch, sizeof(ngx_channel_t), cycle->log);
                }
            }
            //判断是否需要重新拉起
            if (ngx_processes[i].respawn
                && !ngx_processes[i].exiting
                && !ngx_terminate
                && !ngx_quit)
            {
                if (ngx_spawn_process(cycle, ngx_processes[i].proc,
                                      ngx_processes[i].data,
                                      ngx_processes[i].name, i)
                    == NGX_INVALID_PID)
                {
                    ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
                                  "could not respawn %s",
                                  ngx_processes[i].name);
                    continue;
                }
                //拉起后重新建立和其他子进程的管道
                ch.command = NGX_CMD_OPEN_CHANNEL;
                ch.pid = ngx_processes[ngx_process_slot].pid;
                ch.slot = ngx_process_slot;
                ch.fd = ngx_processes[ngx_process_slot].channel[0];

                ngx_pass_open_channel(cycle, &ch);

                live = 1;

                continue;
            }
            //如果没重新拉起,当前这个是process最大值,那么把这个值减一
            if (i == ngx_last_process - 1) {
                ngx_last_process--;

            } else {
                ngx_processes[i].pid = -1;
            }

        } else if (ngx_processes[i].exiting || !ngx_processes[i].detached) {
            live = 1;
        }
    }

    return live;
}

通过判断是否还存在exiting的进程来判断live值,如果没有exiting状态的进程,并且存在ngx_terminate或者ngx_quit那么说明主进程可以退出了

这里可以看到只有当exited = 1,respawn = 1,exiting = 0的进程才会被重新拉起

一般来说挂了的进程都会exited = 1,respawn = 1,那么这里就是主要判断exiting值了

如果exiting为1,那么就不会被拉起,否则会被拉起

exiting值是哪儿可能会改变呢?这就是退出的内容了


退出

if (ngx_terminate) {
    //设置延时时间
    if (delay == 0) { 
           delay = 50;
       }    
       if (sigio) {
           sigio--;
           continue;
       }    
       sigio = ccf->worker_processes + 2 /* cache processes */;

       //延迟太久了,只好强制干掉你们了
       if (delay > 1000) {
           ngx_signal_worker_processes(cycle, SIGKILL);
       } else {
           ngx_signal_worker_processes(cycle,
                                  ngx_signal_value(NGX_TERMINATE_SIGNAL));
       }
}
if (ngx_quit) {
    ngx_signal_worker_processes(cycle,
                                   ngx_signal_value(NGX_SHUTDOWN_SIGNAL));

    ls = cycle->listening.elts;
    for (n = 0; n < cycle->listening.nelts; n++) {
       if (ngx_close_socket(ls[n].fd) == -1) {
           ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_socket_errno,
                         ngx_close_socket_n " %V failed",
                         &ls[n].addr_text);
       }    
    }    
    cycle->listening.nelts = 0; 
}

可以看到退出的实现都依赖ngx_signal_worker_processes

那么看看ngx_signal_worker_processes吧

static void
ngx_signal_worker_processes(ngx_cycle_t *cycle, int signo)
{
    ngx_int_t      i;
    ngx_err_t      err;
    ngx_channel_t  ch;

    //把主进程的信号翻译成要发给子进程CMD
    switch (signo) {

    case ngx_signal_value(NGX_SHUTDOWN_SIGNAL):
        ch.command = NGX_CMD_QUIT;
        break;

    case ngx_signal_value(NGX_TERMINATE_SIGNAL):
        ch.command = NGX_CMD_TERMINATE;
        break;

    case ngx_signal_value(NGX_REOPEN_SIGNAL):
        ch.command = NGX_CMD_REOPEN;
        break;

    default:
        ch.command = 0;
    }

    ch.fd = -1;

    for (i = 0; i < ngx_last_process; i++) {

        if (ngx_processes[i].detached || ngx_processes[i].pid == -1) {
            continue;
        }
        //如果进程刚被重新加载,那么跳过
        if (ngx_processes[i].just_spawn) {
            ngx_processes[i].just_spawn = 0;
            continue;
        }
        //如果已经设置为exiting状态那么跳过
        if (ngx_processes[i].exiting
            && signo == ngx_signal_value(NGX_SHUTDOWN_SIGNAL))
        {
            continue;
        }
        
        //向unix socket发送命令给子进程
        if (ch.command) {
            if (ngx_write_channel(ngx_processes[i].channel[0],
                                  &ch, sizeof(ngx_channel_t), cycle->log)
                == NGX_OK)
            {
                if (signo != ngx_signal_value(NGX_REOPEN_SIGNAL)) {
                    ngx_processes[i].exiting = 1;
                }
                continue;
            }
        }

        ngx_log_debug2(NGX_LOG_DEBUG_CORE, cycle->log, 0,
                       "kill (%P, %d)" , ngx_processes[i].pid, signo);

        //向子进程发信号
        if (kill(ngx_processes[i].pid, signo) == -1) {
            err = ngx_errno;
            ngx_log_error(NGX_LOG_ALERT, cycle->log, err,
                          "kill(%P, %d) failed", ngx_processes[i].pid, signo);

            if (err == NGX_ESRCH) {
                ngx_processes[i].exited = 1;
                ngx_processes[i].exiting = 0;
                ngx_reap = 1;
            }

            continue;
        }
        //信号发完设置exiting为1
        if (signo != ngx_signal_value(NGX_REOPEN_SIGNAL)) {
            ngx_processes[i].exiting = 1;
        }
    }
}

这里逻辑很简单,会使用两种通知方式发送给子进程

首先用unix socket,这种方式比较优雅,不会打断子进程的系统调用等等

如果失败了会使用信号

unix socket和信号中有一个成功了就会把exiting设置为1,上文讨论过,那么随后就不会被主进程重新拉起

还要提一点,如果信号都失败了,也就是kill发生了NGX_ESRCH(NO SUCH PROCESS)

那么exiting会被设置为0,也就是随后会被主进程重新拉起

这是为了防止出现异常的情况,子进程挂了但是主进程并不知道,虽然这种情况几率很小?

我并不确定,因为我认为是不可能发生的,谁可以留言解答下这个问题。

然后正如上面讨论的,当所有的pid不为-1的进程的exiting状态都不为1时,说明主进程的小弟们都撤了,主进程此时将会退出

配置热加载

由于上面的部分把大部分使用到的函数都分析完了,所以配置热加载分析起来就很简单了

if (ngx_reconfigure) {
    ngx_reconfigure = 0;

    ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "reconfiguring");

    //重新初始化cycle结构(包括配置文件重新加载)
    cycle = ngx_init_cycle(cycle);
    if (cycle == NULL) {
        cycle = (ngx_cycle_t *) ngx_cycle;
        continue;
    }
    
    ngx_cycle = cycle;
    //获取配置,并且重新起新进程
    ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx,
                                          ngx_core_module);
    ngx_start_worker_processes(cycle, ccf->worker_processes,
                              NGX_PROCESS_JUST_RESPAWN);
    ngx_start_cache_manager_processes(cycle, 1);

    /* allow new processes to start */
    ngx_msleep(100);

    //把老的进程全部干掉
    live = 1;
    ngx_signal_worker_processes(cycle,
                               ngx_signal_value(NGX_SHUTDOWN_SIGNAL));
}

重新加载并不是我一开始想象的只是修改本身配置的数据结构,这可能是防止出现一些一致性问题

另外这里重启的进程是用NGX_PROCESS_JUST_RESPAWN启动的,这些进程的just_spawn值为1

因此ngx_signal_worker_processes并不会把这些刚启动的也干掉,但是会把just_spawn重新值为0

总结

nginx的进程管理逻辑思路还是很清晰的

使用了respawn,just_spawn,exiting,exited字段来维护进程的状态

在重新拉起进程的时候,会判断respawn = 1,exiting = 0,exited = 1就重新拉起

在退出的时候,由于exiting被设为1,所以不会被重新拉起

在热加载时,按照新配置启动新的进程,随后给这些进程设定标记just_spawn,然后把没有just_spawn标记的旧进程全部干掉

这个思路很严谨,我将会把他使用在twemproxy的多进程改造计划中


twemproxy的多进程框架已经搭好,中间遇到一个很囧的地方,定位了一天这个BUG,记录下来以供参考。

那就是主进程使用了zookeeper客户端,开出了线程。

此时nginx这个处理方式就会遇到问题,信号处理函数可能运行在zk的线程中,sigsuspend并不会被触发。也就不会运行for循环的flag判断的逻辑了。

这里我的解决方法是在zoo_init之前pthread_setmask阻塞信号,新开的线程会继承这些阻塞,然后zoo_init运行完以后用pthread_setmask取消这些信号阻塞。

这样就和nginx的处理逻辑完全一样了。

19 May 2015