PHP網絡處理模塊FPM源碼分析
一個請求從瀏覽器到達PHP腳本執行中間有個必要模塊是網絡處理模塊,FPM是這個模塊的一部分,配合fastcgi協議實現對請求的從監聽到轉發到PHP處理,并將結果返回這條流程。
FPM采用多進程模型,就是創建一個master進程,在master進程中創建并監聽socket,然后fork多個子進程,然后子進程各自accept請求,子進程在啟動后阻塞在accept上,有請求到達后開始讀取請求 數據,讀取完成后開始處理然后再返回,在這期間是不會接收其它請求的,也就是說fpm的子進程同時只能響應 一個請求,只有把這個請求處理完成后才會accept下一個請求,這是一種同步阻塞的模型。master進程負責管理子進程,監聽子進程的狀態,控制子進程的數量。master進程與worker進程之間通過共享變量同步信息。
從main函數開始int main(int argc, char *argv[]){ zend_signal_startup(); // 將全局變量sapi_module設置為cgi_sapi_module sapi_startup(&cgi_sapi_module); fcgi_init(); // 獲取命令行參數,其中php-fpm -D、-i等參數都是在這里被解析出來的 // ... cgi_sapi_module.startup(&cgi_sapi_module); fpm_init(argc, argv, fpm_config ? fpm_config : CGIG(fpm_config), fpm_prefix, fpm_pid, test_conf, php_allow_to_run_as_root, force_daemon, force_stderr); // master進程會在這一步死循環,后面的流程都是子進程在執行。 fcgi_fd = fpm_run(&max_requests); fcgi_fd = fpm_run(&max_requests); request = fpm_init_request(fcgi_fd); // accept請求 // ....}main()函數展現了這個fpm運行完整的框架,可見整個fpm主要分為三個部分:
1、運行前的fpm_init();2、運行函數fpm_run();3、子進程accept請求處理。FPM中的事件監聽機制在詳細了解fpm工作過程前,我們要先了解fpm中的事件機制。在fpm中事件的監聽默認使用kqueue來實現,關于kqueue的介紹可以看看我之前整理的這篇文章kqueue用法簡介。
// fpm中的事件結構體struct fpm_event_s { // 事件的句柄 int fd; // 下一次觸發的事件 struct timeval timeout; // 頻率:多久執行一次 struct timeval frequency; // 事件觸發時調用的函數 void (*callback)(struct fpm_event_s *, short, void *); void *arg;// 調用callback時的參數 // FPM_EV_READ:讀;FPM_EV_TIMEOUT:;FPM_EV_PERSIST:;FPM_EV_EDGE:; int flags; int index;// 在fd句柄數組中的索引 // 事件的類型 FPM_EV_READ:讀;FPM_EV_TIMEOUT:計時器;FPM_EV_PERSIST:;FPM_EV_EDGE:; short which;};// 事件隊列typedef struct fpm_event_queue_s { struct fpm_event_queue_s *prev; struct fpm_event_queue_s *next; struct fpm_event_s *ev;} fpm_event_queue;以fpm_run()中master進程注冊的一個sp[0]的可讀事件為例:
void fpm_event_loop(int err){ static struct fpm_event_s signal_fd_event; // 創建一個事件:管道sp[0]可讀時觸發 fpm_event_set(&signal_fd_event, fpm_signals_get_fd(), FPM_EV_READ, &fpm_got_signal, NULL); // 將事件添加進queue fpm_event_add(&signal_fd_event, 0); // 處理定時器等邏輯 // 以阻塞的方式獲取事件 // module->wait()是一個接口定義的方法簽名,下面展示kqueue的實現 ret = module->wait(fpm_event_queue_fd, timeout);}int fpm_event_add(struct fpm_event_s *ev, unsigned long int frequency){ // ... // 如果事件是觸發事件則之間添加進queue中 // 對于定時器事件先根據事件的frequency設置事件的觸發頻率和下一次觸發的事件 if (fpm_event_queue_add(&fpm_event_queue_timer, ev) != 0) {return -1; } return 0;}static int fpm_event_queue_add(struct fpm_event_queue_s **queue, struct fpm_event_s *ev){ // ... // 構建并將當前事件插入事件隊列queue中 if (*queue == fpm_event_queue_fd && module->add) {// module->add(ev)是一個接口定義的方法簽名,下面展示kqueue的實現module->add(ev); } return 0;}// kqueue關于添加事件到kqueue的實現static int fpm_event_kqueue_add(struct fpm_event_s *ev) /* {{{ */{ struct kevent k; int flags = EV_ADD; if (ev->flags & FPM_EV_EDGE) { flags = flags | EV_CLEAR; } EV_SET(&k, ev->fd, EVFILT_READ, flags, 0, 0, (void *)ev); if (kevent(kfd, &k, 1, NULL, 0, NULL) < 0) {zlog(ZLOG_ERROR, 'kevent: unable to add event');return -1; } /* mark the event as registered */ ev->index = ev->fd; return 0;}FPM中關于kqueue的實現
// kqueue關于從kqueue中監聽事件的實現static int fpm_event_kqueue_wait(struct fpm_event_queue_s *queue, unsigned long int timeout) /* {{{ */{ struct timespec t; int ret, i; /* ensure we have a clean kevents before calling kevent() */ memset(kevents, 0, sizeof(struct kevent) * nkevents); /* convert ms to timespec struct */ t.tv_sec = timeout / 1000; t.tv_nsec = (timeout % 1000) * 1000 * 1000; /* wait for incoming event or timeout */ ret = kevent(kfd, NULL, 0, kevents, nkevents, &t); if (ret == -1) {/* trigger error unless signal interrupt */if (errno != EINTR) { zlog(ZLOG_WARNING, 'epoll_wait() returns %d', errno); return -1;} } /* fire triggered events */ for (i = 0; i < ret; i++) {if (kevents[i].udata) { struct fpm_event_s *ev = (struct fpm_event_s *)kevents[i].udata; fpm_event_fire(ev); /* sanity check */ if (fpm_globals.parent_pid != getpid()) {return -2; }} } return ret;}fpm_initfpm_init()負責啟動前的初始化工作,包括注冊各個模塊的銷毀時用于清理變量的callback。下面只介紹幾個重要的init。
fpm_conf_init_main負責解析php-fpm.conf配置文件,分配worker pool內存結構并保存到全局變量fpm_worker_all_pools中,各worker pool配置解析到 fpm_worker_pool_s->config 中。
所謂worker pool 是fpm可以同時監聽多個端口,每個端口對應一個worker pool。
fpm_scoreboard_init_main為每個worker pool分配一個fpm_scoreboard_s結構的內存空間scoreboard,用于記錄worker進程運行信息。
// fpm_scoreboard_s 結構struct fpm_scoreboard_s { union {atomic_t lock;char dummy[16]; }; char pool[32]; int pm; // 進程的管理方式 static、dynamic、ondemand time_t start_epoch; int idle;// 空閑的worker進程數 int active;// 繁忙的worker進程數 int active_max; // 最大繁忙進程數 unsigned long int requests; unsigned int max_children_reached; int lq; int lq_max; unsigned int lq_len; unsigned int nprocs; int free_proc; unsigned long int slow_rq; struct fpm_scoreboard_proc_s *procs[];};fpm_signals_init_mainfpm注冊自己的信號量,并設置監聽函數的處理邏輯。
int fpm_signals_init_main() /* {{{ */{ struct sigaction act; // 創建一個全雙工套接字 // 全雙工的套接字是一個可以讀、寫的socket通道[0]和[1],每個進程固定一個管道。 // 寫數據時:管道不滿不會被阻塞;讀數據時:管道里沒有數據會阻塞(可設置) // 向sp[0]寫入數據時,sp[0]的讀取將會被阻塞,sp[1]的寫管道會被阻塞,sp[1]中此時讀取sp[0]的數據 if (0 > socketpair(AF_UNIX, SOCK_STREAM, 0, sp)) {zlog(ZLOG_SYSERROR, 'failed to init signals: socketpair()');return -1; } if (0 > fd_set_blocked(sp[0], 0) || 0 > fd_set_blocked(sp[1], 0)) {zlog(ZLOG_SYSERROR, 'failed to init signals: fd_set_blocked()');return -1; } if (0 > fcntl(sp[0], F_SETFD, FD_CLOEXEC) || 0 > fcntl(sp[1], F_SETFD, FD_CLOEXEC)) {zlog(ZLOG_SYSERROR, 'falied to init signals: fcntl(F_SETFD, FD_CLOEXEC)');return -1; } memset(&act, 0, sizeof(act)); act.sa_handler = sig_handler; // 監聽到信號調用這個函數 sigfillset(&act.sa_mask); if (0 > sigaction(SIGTERM, &act, 0) ||0 > sigaction(SIGINT, &act, 0) ||0 > sigaction(SIGUSR1, &act, 0) ||0 > sigaction(SIGUSR2, &act, 0) ||0 > sigaction(SIGCHLD, &act, 0) ||0 > sigaction(SIGQUIT, &act, 0)) {zlog(ZLOG_SYSERROR, 'failed to init signals: sigaction()');return -1; } return 0;}// 所有信號共用同一個處理函數static void sig_handler(int signo) /* {{{ */{ static const char sig_chars[NSIG + 1] = {[SIGTERM] = 'T',[SIGINT] = 'I',[SIGUSR1] = '1',[SIGUSR2] = '2',[SIGQUIT] = 'Q',[SIGCHLD] = 'C' }; char s; int saved_errno; if (fpm_globals.parent_pid != getpid()) {return; } saved_errno = errno; s = sig_chars[signo]; zend_quiet_write(sp[1], &s, sizeof(s)); // 將信息對應的字節寫進管道sp[1]端,此時sp[1]端的讀數據會阻塞;數據可以從sp[0]端讀取 errno = saved_errno;}fpm_sockets_init_main每個worker pool 開啟一個socket套接字。
fpm_event_init_main這里啟動master的事件管理器。用于管理IO、定時事件,其中IO事件通過kqueue、epoll、 poll、select等管理,定時事件就是定時器,一定時間后觸發某個事件。同樣,我們以kqueue的實現為例看下源碼。
int fpm_event_init_main(){ // ... if (module->init(max) < 0) {zlog(ZLOG_ERROR, 'Unable to initialize the event module %s', module->name);return -1; } // ...}// max用于指定kqueue事件數組的大小static int fpm_event_kqueue_init(int max) /* {{{ */{ if (max < 1) {return 0; } kfd = kqueue(); if (kfd < 0) {zlog(ZLOG_ERROR, 'kqueue: unable to initialize');return -1; } kevents = malloc(sizeof(struct kevent) * max); if (!kevents) {zlog(ZLOG_ERROR, 'epoll: unable to allocate %d events', max);return -1; } memset(kevents, 0, sizeof(struct kevent) * max); nkevents = max; return 0;}fpm_runfpm_init到此結束,下面進入fpm_run階段,在這個階段master進程會根據配置fork出多個子進程然后master進程會進入fpm_event_loop(0)函數,并在這個函數內部死循環,也就是說master進程將不再執行后面的代碼,后面的邏輯全部是子進程執行的操作。
master進程在fpm_event_loop里通過管道sp來監聽子進程的各個事件,同時也要處理自身產生的一些事件、定時器等任務,來響應的管理子進程。內部的邏輯在介紹事件監聽機制時已經詳細說過。
int fpm_run(int *max_requests) /* {{{ */{ struct fpm_worker_pool_s *wp; /* create initial children in all pools */ for (wp = fpm_worker_all_pools; wp; wp = wp->next) {int is_parent;is_parent = fpm_children_create_initial(wp);if (!is_parent) { goto run_child;} } /* run event loop forever */ fpm_event_loop(0);run_child: /* only workers reach this point */ fpm_cleanups_run(FPM_CLEANUP_CHILD); *max_requests = fpm_globals.max_requests; return fpm_globals.listening_socket;}子進程處理請求回到main函數,fpm_run后面的邏輯都是子進程在運行。首先會初始化一個fpm的request結構的變量,然后子進程會阻塞在fcgi_accept_request(request)函數上等待請求。關于fcgi_accept_request函數就是死循環一個socket編程的accept函數來接收請求,并將請求數據全部取出。
...// 初始化requestrequest = fpm_init_request(fcgi_fd);zend_first_try { // accept接收請求 while (EXPECTED(fcgi_accept_request(request) >= 0)) {init_request_info();fpm_request_info();if (UNEXPECTED(php_request_startup() == FAILURE)) { // ...}if (UNEXPECTED(fpm_status_handle_request())) { goto fastcgi_request_done;}...// 打開配置文件中DOCUMENT_ROOT設置的腳本if (UNEXPECTED(php_fopen_primary_script(&file_handle) == FAILURE)) { ...}fpm_request_executing();// 執行腳本php_execute_script(&file_handle);... } // 銷毀請求request fcgi_destroy_request(request); // fcgi退出 fcgi_shutdown(); if (cgi_sapi_module.php_ini_path_override) {free(cgi_sapi_module.php_ini_path_override); } if (cgi_sapi_module.ini_entries) {free(cgi_sapi_module.ini_entries); }} zend_catch { ...} zend_end_try();以上就是PHP網絡處理模塊FPM源碼分析的詳細內容,更多關于PHP網絡處理模塊FPM的資料請關注好吧啦網其它相關文章!