(nginx源码系列七)--nginx进程管理分析

由于twemproxy的多进程改造计划,所以对教科书一般的nginx的管理方式做了分析

nginx的-s指令可以对进程执行多个命令。

这些命令由主进程接受信号然后在循环中对状态进行判断,然后通过unix socket或者信号的方式传达给子进程。

从代码里面可以看到如下状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
for()
{
if (delay) {}
sigsuspend();
ngx_time_update();
if (ngx_reap) {} //需要重启工作进程
if (!live && (ngx_terminate || ngx_quit)) {} //工作进程都结束,自己也设定了结束,那么退出
if (ngx_terminate) {} //强制退出状态
if (ngx_quit) {} //优雅退出状态
if (ngx_reconfigure) {} //配置热加载
if (ngx_restart) {} //重启(配置不变)
if (ngx_reopen) {} //重新打开日志
if (ngx_change_binary) {} //二进制文件改变
if (ngx_noaccept) {} //不再接受新连接
}

状态是通过信号处理函数来判断的。信号处理函数对主从做了不同的处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
void
ngx_signal_handler(int signo)
{
...
ngx_time_sigsafe_update();
switch (ngx_process) {
case NGX_PROCESS_MASTER:
case NGX_PROCESS_SINGLE:
switch (signo) {
case ngx_signal_value(NGX_SHUTDOWN_SIGNAL):
case ngx_signal_value(NGX_TERMINATE_SIGNAL):
case SIGINT:
case ngx_signal_value(NGX_NOACCEPT_SIGNAL):
case ngx_signal_value(NGX_RECONFIGURE_SIGNAL):
case ngx_signal_value(NGX_REOPEN_SIGNAL):
case ngx_signal_value(NGX_CHANGEBIN_SIGNAL):
case SIGALRM:
case SIGIO:
case SIGCHLD:
}
break;
case NGX_PROCESS_WORKER:
case NGX_PROCESS_HELPER:
switch (signo) {
case ngx_signal_value(NGX_NOACCEPT_SIGNAL):
case ngx_signal_value(NGX_SHUTDOWN_SIGNAL):
case ngx_signal_value(NGX_TERMINATE_SIGNAL):
case SIGINT:
case ngx_signal_value(NGX_REOPEN_SIGNAL):
case ngx_signal_value(NGX_RECONFIGURE_SIGNAL):
case ngx_signal_value(NGX_CHANGEBIN_SIGNAL):
case SIGIO:
}
break;
}
if (signo == SIGCHLD) {
ngx_process_get_status();
}
}

ngx_process是一个全局变量,工作进程被fork以后这个值会改为NGX_PROCESS_WORKER

1
2
3
4
5
6
7
8
9
10
11
12
xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx主xxxxxxxxxxxxxxxxxxxxx从xxxxxxxxxxxx

NGX_SHUTDOWN_SIGNAL: ngx_quit = 1 ngx_quit = 1
NGX_TERMINATE_SIGNAL: ngx_terminate = 1 ngx_terminate = 1
SIGINT: ngx_terminate = 1 ngx_terminate = 1
NGX_NOACCEPT_SIGNAL: ngx_noaccept = 1 ngx_debug_quit = 1
NGX_RECONFIGURE_SIGNAL: ngx_reconfigure = 1
NGX_REOPEN_SIGNAL: ngx_reopen = 1 ngx_reopen = 1
NGX_CHANGEBIN_SIGNAL: ngx_change_binary = 1
SIGALRM: ngx_sigalrm = 1
SIGIO: ngx_sigio = 1
SIGCHLD: ngx_reap = 1

从的信号处理函数是主的一个子集,有一些信号只有主才能处理

可以看到这种模式提供了不少操作选项,这里最常用的几个(工作进程重新拉起,退出,配置热加载)看下


工作进程重新拉起

由于第三方模块的不稳定等等原因导致工作进程异常退出,这时候nginx就需要把这个进程重新拉起了

从上文的信号处理函数可以看到,主进程收到SIGCHLD后,将ngx_reap设为1,并且进入ngx_process_get_status函数

这个函数会收集子进程的信息

子进程有5个字段来标识现在的状态:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
typedef struct {
ngx_pid_t pid;
int status;
ngx_socket_t channel[2];

ngx_spawn_proc_pt proc;
void *data;
char *name;

unsigned respawn:1; //是否需要被重新拉起
unsigned just_spawn:1; //是否是刚被拉起的进程
unsigned detached:1; //似乎是调试用的,在代码段并没有看到这个值可能被使用
unsigned exiting:1;//父进程向unix socket发送信息给子进程后将这个字段设为1
unsigned exited:1;//在ngx_process_get_status发现waitpid触发的pid和这个对象的pid相同时,设为1
} ngx_process_t;

这个函数会对exited和respawn做处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
static void
ngx_process_get_status(void)
{
int one = 0;

for ( ;; ) {
pid = waitpid(-1, &status, WNOHANG);
if (pid == 0) {
return;//如果全部子进程都退出了,那么退出
}
if (pid == -1) {
if (err == NGX_EINTR) {
continue;//如果waitpid调用被其他系统调用打断了,那么继续调用
}
if (err == NGX_ECHILD && one) {
continue;
}
...//输出报错信息然后退出
return;
}
one = 1;
//遍历所有的进程,将退出的进程的exited状态设为1
for (i = 0; i < ngx_last_process; i++) {
if (ngx_processes[i].pid == pid) {
ngx_processes[i].status = status;
ngx_processes[i].exited = 1;
break;
}
}
if (WTERMSIG(status)) {
ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, 0,
"%s %P exited on signal %d",
process, pid, WTERMSIG(status));
} else {
ngx_log_error(NGX_LOG_NOTICE, ngx_cycle->log, 0,
"%s %P exited with code %d",
process, pid, WEXITSTATUS(status));
}
if (WEXITSTATUS(status) == 2 && ngx_processes[i].respawn) {
ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, 0,
"%s %P exited with fatal code %d "
"and cannot be respawned",
process, pid, WEXITSTATUS(status));
ngx_processes[i].respawn = 0;
}
ngx_unlock_mutexes(pid);//释放负载均衡锁
}
}

这里有三个关于信号的知识点

1 为啥这里要用while( (pid = waitpid(-1,&stat,WNOHANG)) > 0)的方式

这是因为SIGRTMIN以前的也就是从SIGHUP到SIGSYS的1-31个信号都是不可靠的。这和UNIX系统的早期实现有关系,出于历史原因这些老的信号现在并没有被修改成可排队的。

不可靠信号的意思是信号并不会排队,如果进程处理速度很低,而同时发生了多个信号,就有可能只能接收到一个信号。

这是因为在内核里用一个位来表示一个不可靠信号的触发,所以它无法保存多个同一类型不可靠信号的触发

比如一个进程创建了100个子进程,然后在外部用kill同时把所有的子进程都杀掉,那么父进程并不能收到100个SIGCHLD信号。

这里有篇文章有个实现信号丢失的情况的一个例子

Linux可靠信号和不可靠信号

所以这里要循环执行无阻塞的waitpid判断返回值来处理可能出现的多个SIGCHLD信号发生。

另外默认只有当信号处理函数运行完以后,这个位才会被重置,所以信号处理函数不会被同一个信号打断。(但是sigaction里的有个叫SA_NODEFER可以改变这一点,使得信号处理函数可以被同一个信号多次打断)

2 one变量的用途

如果已经wait到一个pid了,那么即使再次调用的时候返回-1那也没事了,可以直接退出。

能catch到另外一个pid当然是赚的,catch不到那也不亏。

如果这里返回-1直接报错,即使成功调用也将会100%报出no such process

3 W开头这几个宏的用法

WTERMSIG(status)取得子进程因信号而中止的信号代码,一般会先用WIFSIGNALED 来判断后才使用此宏。

WEXITSTATUS(status)取得子进程exit()返回的结束代码,一般会先用WIFEXITED 来判断是否正常结束才能使用此宏。

除了这些还有几个同一系列的宏:

WIFEXITED(status)如果子进程正常结束则为非0值。

WIFSIGNALED(status)如果子进程是因为信号而结束则此宏值为真。

WIFSTOPPED(status)如果子进程处于暂停执行情况则此宏值为真。一般只有使用WUNTRACED 时才会有此情况。

WSTOPSIG(status)取得引发子进程暂停的信号代码,一般会先用WIFSTOPPED 来判断后才使用此宏。


另外这里还有一个变量被设置了ngx_processes[i].respawn

当一个进程是以exit(2)为结尾的时候,respawn字段被设置为0,也就是不会被重启

这里信号的处理也就告一段落了,然后就是主循环的reap段

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
for(;;){
sigsuspend();
if (ngx_reap) {
ngx_reap = 0;
live = ngx_reap_children(cycle);
}
}
static ngx_uint_t
ngx_reap_children(ngx_cycle_t *cycle)
{
ngx_int_t i, n;
ngx_uint_t live;
ngx_channel_t ch;
ngx_core_conf_t *ccf;

ch.command = NGX_CMD_CLOSE_CHANNEL;
ch.fd = -1;

live = 0;
for (i = 0; i < ngx_last_process; i++) {
if (ngx_processes[i].pid == -1) {
continue;
}
if (ngx_processes[i].exited) {

//进程退出了,由于子进程之间也有unix socket(虽然nginx目前没有使用这些),那么告诉其他进程我这个管道废了
if (!ngx_processes[i].detached) {
ngx_close_channel(ngx_processes[i].channel, cycle->log);

ngx_processes[i].channel[0] = -1;
ngx_processes[i].channel[1] = -1;

ch.pid = ngx_processes[i].pid;
ch.slot = i;

for (n = 0; n < ngx_last_process; n++) {
if (ngx_processes[n].exited
|| ngx_processes[n].pid == -1
|| ngx_processes[n].channel[0] == -1)
{
continue;
}

ngx_log_debug3(NGX_LOG_DEBUG_CORE, cycle->log, 0,
"pass close channel s:%i pid:%P to:%P",
ch.slot, ch.pid, ngx_processes[n].pid);

/* TODO: NGX_AGAIN */

ngx_write_channel(ngx_processes[n].channel[0],
&ch, sizeof(ngx_channel_t), cycle->log);
}
}
//判断是否需要重新拉起
if (ngx_processes[i].respawn
&& !ngx_processes[i].exiting
&& !ngx_terminate
&& !ngx_quit)
{
if (ngx_spawn_process(cycle, ngx_processes[i].proc,
ngx_processes[i].data,
ngx_processes[i].name, i)
== NGX_INVALID_PID)
{
ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
"could not respawn %s",
ngx_processes[i].name);
continue;
}
//拉起后重新建立和其他子进程的管道
ch.command = NGX_CMD_OPEN_CHANNEL;
ch.pid = ngx_processes[ngx_process_slot].pid;
ch.slot = ngx_process_slot;
ch.fd = ngx_processes[ngx_process_slot].channel[0];

ngx_pass_open_channel(cycle, &ch);

live = 1;

continue;
}
//如果没重新拉起,当前这个是process最大值,那么把这个值减一
if (i == ngx_last_process - 1) {
ngx_last_process--;

} else {
ngx_processes[i].pid = -1;
}

} else if (ngx_processes[i].exiting || !ngx_processes[i].detached) {
live = 1;
}
}

return live;
}

通过判断是否还存在exiting的进程来判断live值,如果没有exiting状态的进程,并且存在ngx_terminate或者ngx_quit那么说明主进程可以退出了

这里可以看到只有当exited = 1,respawn = 1,exiting = 0的进程才会被重新拉起

一般来说挂了的进程都会exited = 1,respawn = 1,那么这里就是主要判断exiting值了

如果exiting为1,那么就不会被拉起,否则会被拉起

exiting值是哪儿可能会改变呢?这就是退出的内容了


退出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
if (ngx_terminate) {
//设置延时时间
if (delay == 0) {
delay = 50;
}
if (sigio) {
sigio--;
continue;
}
sigio = ccf->worker_processes + 2 /* cache processes */;

//延迟太久了,只好强制干掉你们了
if (delay > 1000) {
ngx_signal_worker_processes(cycle, SIGKILL);
} else {
ngx_signal_worker_processes(cycle,
ngx_signal_value(NGX_TERMINATE_SIGNAL));
}
}
if (ngx_quit) {
ngx_signal_worker_processes(cycle,
ngx_signal_value(NGX_SHUTDOWN_SIGNAL));

ls = cycle->listening.elts;
for (n = 0; n < cycle->listening.nelts; n++) {
if (ngx_close_socket(ls[n].fd) == -1) {
ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_socket_errno,
ngx_close_socket_n " %V failed",
&ls[n].addr_text);
}
}
cycle->listening.nelts = 0;
}

可以看到退出的实现都依赖ngx_signal_worker_processes

那么看看ngx_signal_worker_processes吧

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
static void
ngx_signal_worker_processes(ngx_cycle_t *cycle, int signo)
{
ngx_int_t i;
ngx_err_t err;
ngx_channel_t ch;

//把主进程的信号翻译成要发给子进程CMD
switch (signo) {

case ngx_signal_value(NGX_SHUTDOWN_SIGNAL):
ch.command = NGX_CMD_QUIT;
break;

case ngx_signal_value(NGX_TERMINATE_SIGNAL):
ch.command = NGX_CMD_TERMINATE;
break;

case ngx_signal_value(NGX_REOPEN_SIGNAL):
ch.command = NGX_CMD_REOPEN;
break;

default:
ch.command = 0;
}

ch.fd = -1;

for (i = 0; i < ngx_last_process; i++) {

if (ngx_processes[i].detached || ngx_processes[i].pid == -1) {
continue;
}
//如果进程刚被重新加载,那么跳过
if (ngx_processes[i].just_spawn) {
ngx_processes[i].just_spawn = 0;
continue;
}
//如果已经设置为exiting状态那么跳过
if (ngx_processes[i].exiting
&& signo == ngx_signal_value(NGX_SHUTDOWN_SIGNAL))
{
continue;
}

//向unix socket发送命令给子进程
if (ch.command) {
if (ngx_write_channel(ngx_processes[i].channel[0],
&ch, sizeof(ngx_channel_t), cycle->log)
== NGX_OK)
{
if (signo != ngx_signal_value(NGX_REOPEN_SIGNAL)) {
ngx_processes[i].exiting = 1;
}
continue;
}
}

ngx_log_debug2(NGX_LOG_DEBUG_CORE, cycle->log, 0,
"kill (%P, %d)" , ngx_processes[i].pid, signo);

//向子进程发信号
if (kill(ngx_processes[i].pid, signo) == -1) {
err = ngx_errno;
ngx_log_error(NGX_LOG_ALERT, cycle->log, err,
"kill(%P, %d) failed", ngx_processes[i].pid, signo);

if (err == NGX_ESRCH) {
ngx_processes[i].exited = 1;
ngx_processes[i].exiting = 0;
ngx_reap = 1;
}

continue;
}
//信号发完设置exiting为1
if (signo != ngx_signal_value(NGX_REOPEN_SIGNAL)) {
ngx_processes[i].exiting = 1;
}
}
}

这里逻辑很简单,会使用两种通知方式发送给子进程

首先用unix socket,这种方式比较优雅,不会打断子进程的系统调用等等

如果失败了会使用信号

unix socket和信号中有一个成功了就会把exiting设置为1,上文讨论过,那么随后就不会被主进程重新拉起

还要提一点,如果信号都失败了,也就是kill发生了NGX_ESRCH(NO SUCH PROCESS)

那么exiting会被设置为0,也就是随后会被主进程重新拉起

这是为了防止出现异常的情况,子进程挂了但是主进程并不知道,虽然这种情况几率很小?

我并不确定,因为我认为是不可能发生的,谁可以留言解答下这个问题。

然后正如上面讨论的,当所有的pid不为-1的进程的exiting状态都不为1时,说明主进程的小弟们都撤了,主进程此时将会退出

配置热加载

由于上面的部分把大部分使用到的函数都分析完了,所以配置热加载分析起来就很简单了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
if (ngx_reconfigure) {
ngx_reconfigure = 0;

ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "reconfiguring");

//重新初始化cycle结构(包括配置文件重新加载)
cycle = ngx_init_cycle(cycle);
if (cycle == NULL) {
cycle = (ngx_cycle_t *) ngx_cycle;
continue;
}

ngx_cycle = cycle;
//获取配置,并且重新起新进程
ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx,
ngx_core_module);
ngx_start_worker_processes(cycle, ccf->worker_processes,
NGX_PROCESS_JUST_RESPAWN);
ngx_start_cache_manager_processes(cycle, 1);

/* allow new processes to start */
ngx_msleep(100);

//把老的进程全部干掉
live = 1;
ngx_signal_worker_processes(cycle,
ngx_signal_value(NGX_SHUTDOWN_SIGNAL));
}

重新加载并不是我一开始想象的只是修改本身配置的数据结构,这可能是防止出现一些一致性问题

另外这里重启的进程是用NGX_PROCESS_JUST_RESPAWN启动的,这些进程的just_spawn值为1

因此ngx_signal_worker_processes并不会把这些刚启动的也干掉,但是会把just_spawn重新值为0

总结

nginx的进程管理逻辑思路还是很清晰的

使用了respawn,just_spawn,exiting,exited字段来维护进程的状态

在重新拉起进程的时候,会判断respawn = 1,exiting = 0,exited = 1就重新拉起

在退出的时候,由于exiting被设为1,所以不会被重新拉起

在热加载时,按照新配置启动新的进程,随后给这些进程设定标记just_spawn,然后把没有just_spawn标记的旧进程全部干掉

这个思路很严谨,我将会把他使用在twemproxy的多进程改造计划中


twemproxy的多进程框架已经搭好,中间遇到一个很囧的地方,定位了一天这个BUG,记录下来以供参考。

那就是主进程使用了zookeeper客户端,开出了线程。

此时nginx这个处理方式就会遇到问题,信号处理函数可能运行在zk的线程中,sigsuspend并不会被触发。也就不会运行for循环的flag判断的逻辑了。

这里我的解决方法是在zoo_init之前pthread_setmask阻塞信号,新开的线程会继承这些阻塞,然后zoo_init运行完以后用pthread_setmask取消这些信号阻塞。

这样就和nginx的处理逻辑完全一样了。