nginx模块开发的一些经验

原创内容,转载请注明出处

Posted by Weakyon Blog on April 13, 2015

fastdfs有nginx模块,在上面做一些缩略图缓存的功能。我当时总是觉得lua-nginx这种方式太重了,因此就入了nginx模块开发这个坑。

nginx模块开发的资料网上还是比较多的,《nginx模块开发与架构解析》这一本就不错,推荐入手实体书。我看PDF看的有点烦躁,因为很多东西要来回对照着看。

很多基础东西书里写的很细,就不谈了,就说说自己开发时候的过程和遇到的坑吧。

nginx模块是存在http module和http filter module的。

http module依次处理,最后输出内容,输出的内容再由http filter module依次过滤内容输出。

我的缩略图方案就是request->fastdfs-nginx-module->nginx-image-filter-module这个顺序进行的

因此cache模块应当改写在image-filter内

look up cache没什么好看的,中规中矩的本地目录检索。看下save cache吧。

一。save cache的AIO实现

nginx本身因为不支持文件的上传,因此没有设计异步的write方式。只有异步的read方式。

把异步write融入nginx本身的框架废了不少手脚。

  
extern int            ngx_eventfd;
extern aio_context_t  ngx_aio_ctx;

static void ngx_file_aio_event_handler(ngx_event_t *ev);

static int
io_submit(aio_context_t ctx, long n, struct iocb **paiocb)
{
    return syscall(SYS_io_submit, ctx, n, paiocb);
}

ssize_t
ngx_file_aio_write(ngx_file_t *file, u_char *buf, size_t size, off_t offset,
    ngx_pool_t *pool)
{
    ngx_err_t         err;
    struct iocb      *piocb[1];
    ngx_event_t      *ev;
    ngx_event_aio_t  *aio;

    if (!ngx_file_aio) {
        return ngx_write_file(file, buf, size, offset);
    }

    aio = file->aio;

    if (aio == NULL) {
        aio = ngx_pcalloc(pool, sizeof(ngx_event_aio_t));
        if (aio == NULL) {
            return NGX_ERROR;
        }

        aio->file = file;
        aio->fd = file->fd;
        aio->event.data = aio;
        aio->event.ready = 1;
        aio->event.log = file->log;
        file->aio = aio;
    }

    ev = &aio->event;

    if (!ev->ready) {
        ngx_log_error(NGX_LOG_ALERT, file->log, 0,
                      "second aio post for \"%V\"", &file->name);
        return NGX_AGAIN;
    }

    ngx_log_debug4(NGX_LOG_DEBUG_CORE, file->log, 0,
                   "aio complete:%d @%O:%z %V",
                   ev->complete, offset, size, &file->name);

    if (ev->complete) {
        ev->active = 0;
        ev->complete = 0;

        if (aio->res >= 0) {
            ngx_set_errno(0);
            return aio->res;
        }

        ngx_set_errno(-aio->res);

        ngx_log_error(NGX_LOG_CRIT, file->log, ngx_errno,
                      "aio write \"%s\" failed", file->name.data);

        return NGX_ERROR;
    }

    ngx_memzero(&aio->aiocb, sizeof(struct iocb));

    aio->aiocb.aio_data = (uint64_t) (uintptr_t) ev;
    aio->aiocb.aio_lio_opcode = IOCB_CMD_PWRITE;
    aio->aiocb.aio_fildes = file->fd;
    aio->aiocb.aio_buf = (uint64_t) (uintptr_t) buf;
    aio->aiocb.aio_nbytes = size;
    aio->aiocb.aio_offset = offset;
    aio->aiocb.aio_flags = IOCB_FLAG_RESFD;
    aio->aiocb.aio_resfd = ngx_eventfd;

    ev->handler = ngx_file_aio_event_handler;

    piocb[0] = &aio->aiocb;

    if (io_submit(ngx_aio_ctx, 1, piocb) == 1) {
        ev->active = 1;
        ev->ready = 0;
        ev->complete = 0;

        return NGX_AGAIN;
    }

    err = ngx_errno;

    if (err == NGX_EAGAIN) {
        return ngx_write_file(file, buf, size, offset);
    }

    ngx_log_error(NGX_LOG_CRIT, file->log, err,
                  "io_submit(\"%V\") failed", &file->name);

    if (err == NGX_ENOSYS) {
        ngx_file_aio = 0;
        return ngx_write_file(file, buf, size, offset);
    }

    return NGX_ERROR;
}


static void
ngx_file_aio_event_handler(ngx_event_t *ev)
{
    ngx_event_aio_t  *aio;

    aio = ev->data;

    ngx_log_debug2(NGX_LOG_DEBUG_CORE, ev->log, 0,
                   "aio event handler fd:%d %V", aio->fd, &aio->file->name);

    aio->handler(ev);
}

struct image_cache_s {
	u_char file_name[128];
	int name_len;
	ngx_file_t file;
	u_char *buf;
	ngx_log_t log;
	ngx_open_file_t log_file;
	ngx_event_aio_t aio;
};

typedef struct image_cache_s image_cache_t;

static void
ngx_http_save_cache_file_handler(ngx_event_t *ev)
{
	image_cache_t *im;
	u_char new_name[128];
	im = ((ngx_event_aio_t *)ev->data)->data;
	
	if(ngx_close_file(im->file.fd) == NGX_FILE_ERROR){
		ngx_log_error(NGX_LOG_ALERT,ev->log,ngx_errno,ngx_close_file_n " \"%s\" failed",im->file_name);
	}
	ngx_memzero(new_name,128);
	ngx_memcpy(new_name,im->file_name,im->name_len);
	if(ngx_rename_file(im->file_name,new_name) == NGX_FILE_ERROR){
		ngx_log_error(NGX_LOG_ERR, ev->log, ngx_errno, "\"%s\"",im->file_name);
	}
	
	ngx_log_error(NGX_LOG_ERR, ev->log, 0, "\"%s\",\"%s\"",im->file_name,new_name);
	free(im->buf);
    free(im);
}

static ngx_int_t
ngx_http_save_cache_file(ngx_http_request_t *r, ngx_chain_t *in,ngx_chain_t *out)
{
	u_char *uri_file_name;
	size_t file_len;
	int rand_stamp;
	image_cache_t *im;
	ngx_int_t result;

	im = calloc(1,sizeof(image_cache_t));
	if(im == NULL){
		ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "\"%d\" :can't malloc space",__LINE__);
		return NGX_ERROR;
	}

	/*uri_start plus 12 than point to file_name*/
	im->name_len = r->uri_end - r->uri_start - 12;
	uri_file_name = r->uri_start + 12;

	/*doesn't exist temp file*/
	do{
		rand_stamp = rand()%1000000;
		snprintf((char *)im->file_name,128,"/tmp/image_cache/%.*s-%d",im->name_len,uri_file_name,rand_stamp);
	}while(access((char *)im->file_name,F_OK) == 0);
	/*strlen("/tmp/image_cache/") = 17*/
	//ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "\"%s\"",im->file_name);
	im->name_len += 17;

	im->file.fd = ngx_open_file(im->file_name, NGX_FILE_WRONLY|NGX_FILE_NONBLOCK, NGX_FILE_CREATE_OR_OPEN,NGX_FILE_DEFAULT_ACCESS);

	if(im->file.fd <= 0){
		ngx_log_error(NGX_LOG_ERR, r->connection->log, ngx_errno, "can't open file \"%s\"",im->file_name);
		free(im);
		return NGX_ERROR;
	}
	file_len = out->buf->last - out->buf->pos;
	im->buf = malloc(file_len);
	if(im->buf == NULL){
		free(im);
		ngx_log_error(NGX_LOG_ERR, r->connection->log, ngx_errno, "\"%d\" :can't malloc space",__LINE__);
		return NGX_ERROR;
	}
	ngx_memcpy(im->buf,out->buf->pos,file_len);

    im->file.aio = &im->aio;

	im->log.file = &im->log_file;
	im->log.log_level = NGX_LOG_DEBUG;
	im->log_file.fd = r->connection->log->file->fd;

    im->file.aio->file = &im->file;
    im->file.aio->fd = im->file.fd;
	im->file.aio->event.data = &im->aio;
    im->file.aio->event.ready = 1;
	im->file.aio->data = im;
	im->file.log = &im->log;
    im->file.aio->event.log = im->file.log;
#if (NGX_HAVE_AIO_SENDFILE)
    im->file.aio->last_offset = -1;
#endif
	im->file.aio->handler = ngx_http_save_cache_file_handler;
	if((result = ngx_file_aio_write(&im->file,im->buf,file_len,0,r->pool)) != NGX_AGAIN){
		free(im->buf);
		free(im);
		return result;
	}
	return NGX_OK;
}

首先会检索是否存在temp file,这是因为存储时不是直接存储原名,而是存储原名+一定后缀的temp file

而后创建相应的文件结构填充数据。然后创建AIO结构,并且调用改写自ngx_file_aio_read的ngx_file_aio_write函数,并且绑定异步IO完成后的回调函数ngx_http_save_cache_file_handler

就算是大功告成了。

而这个异步IO方式真的完美融入了nginx的框架了吗?来探寻下nginx的异步IO框架吧。

ngx_epoll_init() -> ngx_epoll_aio_init()

  
static void
ngx_epoll_aio_init(ngx_cycle_t *cycle, ngx_epoll_conf_t *epcf)
{
    int                 n;
    struct epoll_event  ee;

    ngx_eventfd = syscall(SYS_eventfd, 0);
...
    n = 1;

    if (ioctl(ngx_eventfd, FIONBIO, &n) == -1) {
...

    if (io_setup(epcf->aio_requests, &ngx_aio_ctx) == -1) {
...

    ngx_eventfd_event.data = &ngx_eventfd_conn;
    ngx_eventfd_event.handler = ngx_epoll_eventfd_handler;
    ngx_eventfd_event.log = cycle->log;
    ngx_eventfd_event.active = 1;
    ngx_eventfd_conn.fd = ngx_eventfd;
    ngx_eventfd_conn.read = &ngx_eventfd_event;
    ngx_eventfd_conn.log = cycle->log;

    ee.events = EPOLLIN|EPOLLET;
    ee.data.ptr = &ngx_eventfd_conn;

    if (epoll_ctl(ep, EPOLL_CTL_ADD, ngx_eventfd, &ee) != -1) {
        return;
    }

这里创建一个ngx_eventfd的变量,创建和初始化了ngx_eventfd_event,ngx_eventfd,ngx_eventfd_conn和ngx_aio_ctx这些全局变量。

而后把代表aio的描述符ngx_eventfd加入epoll,就完成了epoll和native aio的绑定啦。

当实际使用aio write的时候,使用了ngx_eventfd这个全局变量进行了操作,当描述符活跃时就能触发epoll进行异步IO的操作。

例如上文的:

  
ssize_t
ngx_file_aio_writed(ngx_file_t *file, u_char *buf, size_t size, off_t offset,
    ngx_pool_t *pool)
{
...
    aio->aiocb.aio_flags = IOCB_FLAG_RESFD;
    aio->aiocb.aio_resfd = ngx_eventfd;
 
    ev->handler = ngx_file_aio_event_handler;

然后再来看看触发epoll的过程

  
static ngx_int_t
ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags)
{
...
    events = epoll_wait(ep, event_list, (int) nevents, timer);
...
    for (i = 0; i < events; i++) {
        c = event_list[i].data.ptr;
 
        instance = (uintptr_t) c & 1;
        c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1);
 
        rev = c->read;
...
        if ((revents & EPOLLIN) && rev->active) {
...
                rev->handler(rev);
...
        }
...

aio的描述符被触发后最终会进入rev->handler(rev)

也就是ngx_epoll_eventfd_handler函数内

  
static void
ngx_epoll_eventfd_handler(ngx_event_t *ev)
{
...
    n = read(ngx_eventfd, &ready, 8);
...
    ts.tv_sec = 0;
    ts.tv_nsec = 0;
 
    while (ready) {
 
        events = io_getevents(ngx_aio_ctx, 1, 64, event, &ts);
...
        if (events > 0) {
            ready -= events;
 
            for (i = 0; i < events; i++) {
...
                e = (ngx_event_t *) (uintptr_t) event[i].data;
 
                e->complete = 1;
                e->active = 0;
                e->ready = 1;
 
                aio = e->data;
                aio->res = event[i].res;
 
                ngx_post_event(e, &ngx_posted_events);
            }
 
            continue;
        }
...

在这里会使用AIO的调用获取一共触发了多少AIO事件,而后把这些事件加入post事件队列,这个队列在epoll的下一轮时会被操作

  
void
ngx_process_events_and_timers(ngx_cycle_t *cycle)
{
...
    if (ngx_posted_events) {
        if (ngx_threaded) {
            ngx_wakeup_worker_thread(cycle);
 
        } else {
            ngx_event_process_posted(cycle, &ngx_posted_events);
        }
    }
}
 
void
ngx_event_process_posted(ngx_cycle_t *cycle,
    ngx_thread_volatile ngx_event_t **posted)
{
    ngx_event_t  *ev;
 
    for ( ;; ) {
 
        ev = (ngx_event_t *) *posted;
...
        ngx_delete_posted_event(ev);
 
        ev->handler(ev);
    }
}

这里的ev->handler(ev);所调用的就是上文中的ev->handler = ngx_file_aio_event_handler;

也就是

  
static void
ngx_file_aio_event_handler(ngx_event_t *ev)
{
    ngx_event_aio_t  *aio;
 
    aio = ev->data;
 
    ngx_log_debug2(NGX_LOG_DEBUG_CORE, ev->log, 0,
                   "aio event handler fd:%d %V", aio->fd, &aio->file->name);
 
    aio->handler(ev);
}

这个aio->handler(ev);回调函数的逻辑是自己设计的,一般是用作数据的清理的

我是这么定义的im->file.aio->handler = ngx_http_save_cache_file_handler;

做了一些末尾的清理操作。

另外说个无关的,如果是AIO_READ,那么这里的aio操作是已经被写死的

ngx_http_copy_aio_event_handler() -> ngx_http_request_handler() -> ngx_http_writer() -> ngx_http_output_filter() -> ngx_http_top_body_filter()

这样一个流程,有兴趣的同学可以跟着看下

至此,AIO的初始化,结合epoll的处理以及最后的清尾操作都已经分析完毕。

可以看到我的这个AIO_WRITE机制很完美的融入了nginx的全流程中。

二。nginx的http模块的协作

lookup cache不仅仅应该出现在filter模块里,应当在fastdfs的nginx模块之前就拦截下已经存在在缓存的文件请求。

所以应当是这样的处理顺序

lookup cache -> fastdfs nginx -> image filter look up cache -> image filter save cache

这就带来一个问题,这还得从http的介入阶段说起

大部分的http模块都在NGX_HTTP_CONTENT_PHASE阶段介入(一共是11个阶段)

该阶段提供两种介入方式

方法一:与其他10个阶段一样,在必定会调用的postconfiguration方法向全局的ngx_http_core_main_conf_t结构体phases[NGX_HTTP_CONTENT_PHASE]动态数组添加ngx_htpp_hadler_pt处理方法,这种处理方式将应用全部HTTP请求,无视你的SERVER LOC配置。

方法二:通过设置ngx_http_core_loc_conf_t的handler指针来实现,这样可以匹配location段。

fastdfs的nginx模块是使用的方法二,看起来是不错,但是拓展开发的时候是存在坑的。

那就是ngx_htpp_core_loc_conf_t的handler指针不是数组,所以第二种方法只能用一个ngx_http_hander方法,也就是不能和其他http模块一起协作了。

所以不得已改成了方法一的形式,但是方法一的缺点也很明显,他会应用所有的HTTP请求。

包括/web_status和CDN的/do_not_delete

由fastdfs的nginx模块操作时就直接返回错误400了

所以只好在fastdfs nginx的handler入口加了一句

  
if(memcmp(r->uri_start,"/g",2) != 0){
	return NGX_DECLINED;
}

当判断不是/g的url请求时,直接跳过交给下一个http模块处理。

这样就完美运行了

三。fastdfs nginx的IP映射模块

首先在ngx_http_fastdfs_commands中加入命令

  
{ ngx_string("ngx_fdfs_org_ip"),
      NGX_HTTP_LOC_CONF|NGX_CONF_ANY,
      ngx_conf_set_org_ip,
      NGX_HTTP_LOC_CONF_OFFSET,
      0,   
      NULL },

{ ngx_string("ngx_fdfs_chg_ip"),
      NGX_HTTP_LOC_CONF|NGX_CONF_ANY,
      ngx_conf_set_chg_ip,
      NGX_HTTP_LOC_CONF_OFFSET,
      0,   
      NULL },

然后添加实现命令

  
static char* ngx_conf_set_org_ip(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
    ngx_http_fastdfs_loc_conf_t *mycf = conf;
    ngx_array_t *org_ip = cf->args;
    mycf->org_ip = ngx_array_create(cf->pool,org_ip->nelts,sizeof(ngx_str_t));

    ngx_uint_t array_seq = 0;
    for(;array_seq < org_ip->nelts;++array_seq) {
        memcpy((ngx_str_t *)mycf->org_ip->elts + array_seq,(ngx_str_t *)org_ip->elts + array_seq,sizeof(ngx_str_t));
    }
    mycf->org_ip->nelts = org_ip->nelts;

    return NGX_CONF_OK;
}

static char* ngx_conf_set_chg_ip(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
    ngx_http_fastdfs_loc_conf_t *mycf = conf;
    ngx_array_t *chg_ip = cf->args;
    mycf->chg_ip = ngx_array_create(cf->pool,chg_ip->nelts,sizeof(ngx_str_t));

    ngx_uint_t array_seq = 0;
    for(;array_seq < chg_ip->nelts;++array_seq) {
        memcpy((ngx_str_t *)mycf->chg_ip->elts + array_seq,(ngx_str_t *)chg_ip->elts + array_seq,sizeof(ngx_str_t));
    }
    mycf->chg_ip->nelts = chg_ip->nelts;

    return NGX_CONF_OK;
}

实现命令将读出的cf->args数组进行数据拷贝,因为cf->args数组在初始化完毕后会被销毁。光是指向这个指针是无效的,这个我也踩了个坑才发现。

merge_ptr和init_ptr我就不贴了

然后就是IP映射的代码

  
static void ngx_http_fastdfs_set_ctx_dest_ip_addr(ngx_http_fastdfs_loc_conf_t *plcf,ngx_http_fastdfs_proxy_ctx_t *ctx,const char *dest_ip_addr)
{
    if(plcf->org_ip != NGX_CONF_UNSET_PTR && plcf->chg_ip != NGX_CONF_UNSET_PTR) {
        ngx_str_t *org_ip = plcf->org_ip->elts;
        ngx_str_t *chg_ip = plcf->chg_ip->elts;
        ngx_uint_t array_seq = 1;
        while(array_seq < plcf->org_ip->nelts) {
            if(memcmp(org_ip[array_seq].data,dest_ip_addr,org_ip[array_seq].len) == 0){
                strncpy(ctx->dest_ip_addr,(const char *)chg_ip[array_seq].data,chg_ip[array_seq].len);
                return ;
            }
            ++array_seq;
        }
    }
    strcpy(ctx->dest_ip_addr, dest_ip_addr);
}

非常简单,子串匹配罢了。

这样一个IP映射模块就完成了。

未完待续

13 Apr 2015