twemproxy的多进程改造计划中参考了nginx的设计,然而nginx有一段让我看不太明白。
这篇文章总结的很好
sigsuspend
sigprocmask函数的使用方法
sigsuspend将新的信号集阻塞操作和pause操作组合在一起成为原子操作
做了以下的操作
设置新的mask阻塞当前进程;
收到信号,恢复原先mask;
调用该进程设置的信号处理函数;
待信号处理函数返回后,sigsuspend返回。
现在再来回顾一下我当时疑惑的代码段
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 void ngx_master_process_cycle (ngx_cycle_t *cycle) { sigemptyset(&set ); sigaddset(&set , SIGCHLD); sigaddset(&set , SIGALRM); sigaddset(&set , SIGIO); sigaddset(&set , SIGINT); sigaddset(&set , ngx_signal_value(NGX_RECONFIGURE_SIGNAL)); sigaddset(&set , ngx_signal_value(NGX_REOPEN_SIGNAL)); sigaddset(&set , ngx_signal_value(NGX_NOACCEPT_SIGNAL)); sigaddset(&set , ngx_signal_value(NGX_TERMINATE_SIGNAL)); sigaddset(&set , ngx_signal_value(NGX_SHUTDOWN_SIGNAL)); sigaddset(&set , ngx_signal_value(NGX_CHANGEBIN_SIGNAL)); if (sigprocmask(SIG_BLOCK, &set , NULL ) == -1 ) { ngx_log_error(NGX_LOG_ALERT, cycle->log , ngx_errno, "sigprocmask() failed" ); } sigemptyset(&set ); ngx_start_worker_processes(cycle, ccf->worker_processes, NGX_PROCESS_RESPAWN); for ( ;; ) { sigsuspend(&set ); if (ngx_reconfigure) { ngx_start_worker_processes(cycle, ccf->worker_processes, NGX_PROCESS_RESPAWN); } } }
首先初始化了一个空集set,往里面添加了信号形成阻塞的信号集合,然后运行sigprocmask(SIG_BLOCK,
&set, NULL)阻塞这些信号
临界区(在sigprocmask之后和sigsuspend之前的代码)中如果出现了信号集的信号都会被阻塞
临界区之后运行的sigsuspend(&set)处理的信号集合是个空集
sigsuspend将不会阻塞任何信号,如果在临界区中就发生信号会执行信号函数,否则就等待任何信号的发生,然后往下运行
这里要提一下临界区的代码,主要就是一句ngx_start_worker_processes。
ngx_start_worker_processed中会fork出子进程,每个子进程初始化时会调用ngx_worker_process_init
1 2 3 4 5 6 7 8 9 10 11 static void ngx_worker_process_init (ngx_cycle_t *cycle, ngx_int_t worker) { sigemptyset(&set ); if (sigprocmask(SIG_SETMASK, &set , NULL ) == -1 ) { ngx_log_error(NGX_LOG_ALERT, cycle->log , ngx_errno, "sigprocmask() failed" ); } }
可以看到,在fork出进程后初始化,将继承的阻塞信号集设置为空集,也就是不在阻塞这些信号
这样做是怕fork前信号函数执行,例如在临界区执行reload信号执行函数,导致逻辑复杂化,现在的逻辑就很清晰,必须全部fork完毕以后才能reload。
所以说twemproxy的多进程改造也应该学习这样的过程,把fork作为临界区来处理。
sigsuspend在APUE这本圣经里面有个案例是当作同步来使用的。我觉得这个例子也很好。
我稍微补充了一下内容和书中有一些区别
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <sys/types.h> #include <errno.h> #include <signal.h> static volatile sig_atomic_t sigflag;static sigset_t newmask,oldmask,zeromask;static void printcharacter (const char * str) { const char *ptr; setbuf(stdout ,NULL ); for (ptr=str;*ptr!='\0' ;ptr++) putc(*ptr,stdout ); } static void sig_usr (int signo) { sigflag = 1 ; } void TELL_WAIT (void ) { signal(SIGUSR1,sig_usr); signal(SIGUSR2,sig_usr); sigemptyset(&zeromask); sigemptyset(&newmask); sigaddset(&newmask,SIGUSR1); sigaddset(&newmask,SIGUSR2); sigprocmask(SIG_BLOCK,&newmask,&oldmask); } void TELL_PARENT (pid_t pid) { kill(pid,SIGUSR2); } void TELL_CHILD (pid_t pid) { kill(pid,SIGUSR1); } void WAIT_PARENT (void ) { while (sigflag == 0 ) sigsuspend(&zeromask); sigflag = 0 ; sigprocmask(SIG_SETMASK,&oldmask,NULL ); } void WAIT_CHILD (void ) { while (sigflag == 0 ) sigsuspend(&zeromask); sigflag = 0 ; sigprocmask(SIG_SETMASK,&oldmask,NULL ); } int main () { pid_t pid; TELL_WAIT(); pid = fork(); switch (pid) { case -1 : perror("fork() error" ); exit (-1 ); case 0 : printcharacter("output from child prcess.\n" ); TELL_PARENT(getppid()); WAIT_PARENT(); printcharacter("output2 from child prcess.\n" ); break ; default : WAIT_CHILD(); printcharacter("output from parent prcess.\n" ); TELL_CHILD(pid); } exit (0 ); }
父进程将等待子进程的信号,输出字串,随后发送子进程信号证明自己收到信号。
子进程输出字串,随后发送信号给父进程,然后等待父进程的信号后再次输出。
这样的同步方式使得字串永远按以下顺序输出不会乱序。
1 2 3 4 [root@localhost.localdomain ~]# ./signal_test output from child prcess. output from parent prcess. output2 from child prcess.
这种同步方式是很有启发性的。
例如子进程是父进程的一个阻塞操作的异步操作,父进程通过管道将处理的对象放入子进程的等待处理队列,子进程通过信号来告知父进程,已经全部处理完毕。