网站建设知识
Nginx源码分析 - 主流程篇 - 多进程的惊群和进程负载均衡处理
2025-07-22 10:00  点击:0

Linux2.6版本之前还存在对于socket的accept的惊群现象。之后的版本已经解决掉了这个问题。

惊群是指多个进程/线程在等待同一资源时,每当资源可用,所有的进程/线程都来竞争资源的现象。

Nginx采用的是多进程的模式。假设Linux系统是2.6版本以前,当有一个客户端要连到Nginx服务器上,Nginx的N个进程都会去监听socket的accept的,如果全部的N个进程都对这个客户端的socket连接进行了监听,就会造成资源的竞争甚至数据的错乱。我们要保证的是,一个链接在Nginx的一个进程上处理,包括accept和read/write事件。

Nginx解决惊群和进程负载均衡处理的要点

1. Nginx的N个进程会争抢文件锁,当只有拿到文件锁的进程,才能处理accept的事件。

2. 没有拿到文件锁的进程,只能处理当前连接对象的read事件

3. 当单个进程总的connection连接数达到总数的7/8的时候,就不会再接收新的accpet事件。

4. 如果拿到锁的进程能很快处理完accpet,而没拿到锁的一直在等待(等待时延:ngx_accept_mutex_delay),容易造成进程忙的很忙,空的很空

具体的实现

ngx_process_events_and_timers 进程事件分发器

此方法为进程实现的核心函数。主要作用:事件分发;惊群处理;简单的负载均衡。

负载均衡:

1. 当事件配置初始化的时候,会设置一个全局变量:ngx_accept_disabled = ngx_cycle->connection_n / 8 - ngx_cycle->free_connection_n;

2. 当ngx_accept_disabled为正数的时候,connection达到连接总数的7/8的时候,就不再处理新的连接accept事件,只处理当前连接的read事件

惊群处理:

1. 通过ngx_trylock_accept_mutex争抢文件锁,拿到文件锁的,才可以处理accept事件。

2. ngx_accept_mutex_held是拿到锁的一个标志,当拿到锁了,flags会被设置成NGX_POST_EVENTS,这个标志会在事件处理函数ngx_process_events中将所有事件(accept和read)放入对应的ngx_posted_accept_events和ngx_posted_events队列中进行延后处理。

3. 当没有拿到锁,调用事件处理函数ngx_process_events的时候,可以明确都是read的事件,所以可以直接调用事件ev->handler方法回调处理。

4. 拿到锁的进程,接下来会优先处理ngx_posted_accept_events队列上的accept事件,处理函数:ngx_event_process_posted

5. 处理完accept事件后,就将文件锁释放

6. 接下来处理ngx_posted_events队列上的read事件,处理函数:ngx_event_process_posted

void ngx_process_events_and_timers(ngx_cycle_t *cycle) {ngx_uint_t flags;ngx_msec_t timer, delta;if (ngx_timer_resolution) {timer = NGX_TIMER_INFINITE;flags = 0;} else {timer = ngx_event_find_timer();flags = NGX_UPDATE_TIME;#if (NGX_WIN32)if (timer == NGX_TIMER_INFINITE || timer > 500) {timer = 500;}#endif}if (ngx_use_accept_mutex) {if (ngx_accept_disabled > 0) {ngx_accept_disabled--;} else {if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {return;}if (ngx_accept_mutex_held) {flags |= NGX_POST_EVENTS;} else {if (timer == NGX_TIMER_INFINITE|| timer > ngx_accept_mutex_delay) {timer = ngx_accept_mutex_delay;}}}}delta = ngx_current_msec;(void) ngx_process_events(cycle, timer, flags);delta = ngx_current_msec - delta;ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,"timer delta: %M", delta);ngx_event_process_posted(cycle, &ngx_posted_accept_events);if (ngx_accept_mutex_held) {ngx_shmtx_unlock(&ngx_accept_mutex);}if (delta) {ngx_event_expire_timers();}ngx_event_process_posted(cycle, &ngx_posted_events);}

ngx_trylock_accept_mutex 获取accept锁

1. ngx_accept_mutex_held是拿到锁的唯一标识的全局变量。

2. 当拿到锁,则调用ngx_enable_accept_events,将新的connection加入event事件上

3. 如果没有拿到锁,则调用ngx_disable_accept_events。

ngx_int_t ngx_trylock_accept_mutex(ngx_cycle_t *cycle) {if (ngx_shmtx_trylock(&ngx_accept_mutex)) {ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0,"accept mutex locked");if (ngx_accept_mutex_held && ngx_accept_events == 0) {return NGX_OK;}if (ngx_enable_accept_events(cycle) == NGX_ERROR) {ngx_shmtx_unlock(&ngx_accept_mutex);return NGX_ERROR;}ngx_accept_events = 0;ngx_accept_mutex_held = 1;return NGX_OK;}ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,"accept mutex lock failed: %ui", ngx_accept_mutex_held);if (ngx_accept_mutex_held) {if (ngx_disable_accept_events(cycle, 0) == NGX_ERROR) {return NGX_ERROR;}ngx_accept_mutex_held = 0;}return NGX_OK;}

ngx_enable_accept_events 和 ngx_disable_accept_events

static ngx_int_t ngx_enable_accept_events(ngx_cycle_t *cycle) {ngx_uint_t i;ngx_listening_t *ls;ngx_connection_t *c;ls = cycle->listening.elts;for (i = 0; i < cycle->listening.nelts; i++) {c = ls[i].connection;if (c == NULL || c->read->active) {continue;}if (ngx_add_event(c->read, NGX_READ_EVENT, 0) == NGX_ERROR) {return NGX_ERROR;}}return NGX_OK;}static ngx_int_t ngx_disable_accept_events(ngx_cycle_t *cycle, ngx_uint_t all) {ngx_uint_t i;ngx_listening_t *ls;ngx_connection_t *c;ls = cycle->listening.elts;for (i = 0; i < cycle->listening.nelts; i++) {c = ls[i].connection;if (c == NULL || !c->read->active) {continue;}#if (NGX_HAVE_REUSEPORT)if (ls[i].reuseport && !all) {continue;}#endifif (ngx_del_event(c->read, NGX_READ_EVENT,NGX_DISABLE_EVENT) == NGX_ERROR) {return NGX_ERROR;}}return NGX_OK;}

ngx_event_process_posted 事件队列处理

对ngx_posted_accept_events或ngx_posted_events队列上的accept/read事件进行回调处理。

void ngx_event_process_posted(ngx_cycle_t *cycle, ngx_queue_t *posted) {ngx_queue_t *q;ngx_event_t *ev;while (!ngx_queue_empty(posted)) {q = ngx_queue_head(posted);ev = ngx_queue_data(q, ngx_event_t, queue);ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,"posted event %p", ev);ngx_delete_posted_event(ev);ev->handler(ev);}}

ngx_process_events 事件的核心处理函数

这个方法,我们主要看epoll模型下的ngx_epoll_process_events方法(ngx_epoll_module.c)

1. 如果抢到了锁,则会将accpet/read事件放到队列上延后处理。

2. 没有抢到锁的进程都是处理当前连接的read事件,所以直接进行处理。

                if ((revents & EPOLLIN) && rev->active) {#if (NGX_HAVE_EPOLLRDHUP)            if (revents & EPOLLRDHUP) {                rev->pending_eof = 1;            }            rev->available = 1;#endif            rev->ready = 1;                        if (flags & NGX_POST_EVENTS) {                queue = rev->accept ? &ngx_posted_accept_events                                    : &ngx_posted_events;                ngx_post_event(rev, queue);            } else {                            rev->handler(rev);            }        }

下一章节开始,我们会进入Nginx的事件模块。