C HAPTER 12 非同步 I/O A SYNCHRONOUS I NPUT /O UTPUT
C ONTENTS 12.1 I/O 的特性 12.2 四種 I/O 的類型 12.3 非同步 I/O 相關函數 12.4 非同步 I/O 程式範例
C ONTENTS 12.1 I/O 的特性 12.2 四種 I/O 的類型 12.3 非同步 I/O 相關函數 12.4 非同步 I/O 程式範例
I/O 的特性 (1/3) 作業系統提供了不同特性的 I/O ,方便使用者依據應用需求採用 主要可以從兩種特性分類 裝置層級:阻塞與非阻塞 (Blocking vs. Non-Blocking) 系統層級:同步與非同步 (Synchronous vs. Asynchronous) OS I/O Device 系統層級 裝置層級
I/O 的特性 (2/3) :裝置層級分類 阻塞 ( B LOCKING ) VS. 非阻塞 ( N ON - B LOCKING ) I/O 阻塞輸出入 (Blocking I/O) 在啟動 I/O 後,應用程式只能等待 I/O 裝置完成處理,而不能繼續往下執行 亦稱為 Polling I/O 裝置例子:紅外線遙控器 生活範例:打電話時,在受話方接通前撥電話方僅能等待 非阻塞輸出入 (Non-Blocking I/O) 在啟動 I/O 後,應用程式可以繼續執行與該次 I/O 資料無關之其他指令,不需 要等待 I/O 裝置完成 I/O 處理 亦稱為 Interrupt-Driven I/O 裝置例子:網路裝置 生活範例:寄發電子郵件時,不需要等對方收信即可寄出 Device-Level
同步輸出入 ( Synchronous I/O ) 應用程式啟動 I/O 系統程序之後,此程序會在作業系統核心完成 I/O 後才返回應用程式。 Linux 系統程序: read(), write() 生活範例:排隊結帳 ( 真正付款是當下 ) 非同步輸出入 ( Asynchronous I/O ) 應用程式啟動 I/O 系統程序之後,作業系統核心將不等 I/O 工作完成直接返回應用程式。 Linux 系統程序: aio_read(), aio_write() 生活範例:刷卡繳費 ( 真正付款是事後 ) I/O 的特性 (3/3): 系統層級分類 同步 ( S YNCHRONOUS ) VS. 非同步 ( A SYNCHRONOUS ) I/O 系統程序: System Call System-Level
C ONTENTS 12.1 I/O 的特性 12.2 四種 I/O 的類型 12.3 非同步 I/O 相關函數 12.4 非同步 I/O 程式範例
四種 I/O 的類型 (1/5) 組合前述兩種特性 ( 裝置與系統 ) 可以將 I/O 分類為四種: 同步阻塞 (Synchronous Blocking) 同步非阻塞 (Synchronous Non-blocking) 非同步阻塞 (Asynchronous Blocking) 非同步非阻塞 (Asynchronous Non-blocking) 本單元重點
四種 I/O 的類型 (2/5) :同步阻塞 I/O 同步阻塞 (Synchronous Blocking) 系統程序會等 I/O 已經全部完成才回傳 生活範例:非常固執的人,一定要得到全部想要的才罷休 Application Kernel Device read() No data ready system call Poll until device ready Data ready, Copy data to user space Data transfer to kernel Copy complete Return OK Process blocks in call to read() Process data
四種 I/O 的類型 (3/5) :同步非阻塞 I/O 同步非阻塞 (Synchronous Non-blocking) 系統程序在取得當下可以取得的資料後,立刻回傳,亦即不等全部資料傳輸完畢 生活範例:不貪心的人,有多少算多少,不一定要全部 Application Kernel Device read() No data ready system call Check device status Data ready, Copy data to user space Data transfer to kernel Copy complete Return OK Process repeatedly calls read(), until OK Process data EAGAIN read() system call EAGAIN No data ready Device ready read() system call
四種 I/O 的類型 (4/5) :非同步阻塞 I/O 非同步阻塞 (Asynchronous Blocking) 系統程序僅將 I/O 需求提出 ( 不等任何資料 ) ,資料的傳輸由 select, poll, epoll 等 阻塞的方式完成。又稱為 I/O Multiplexing ,主要應用於多重通道 生活例子:有權某的人,一開始不奢望,但是逐漸獲得全部想要的 Application Kernel Device aio_read() No data ready system call Select/Poll until device ready Data ready, Copy data to user space Data transfer to kernel Copy complete Deliver signal Process continues executing Signal handler process data return
四種 I/O 的類型 (5/5) :非同步非阻塞 I/O 非同步非阻塞 (Asynchronous Non-blocking) 系統程序僅將 I/O 需求提出 ( 不等任何資料 ) ,當資料備齊作業系統核心會用事後通知的方 式,例如 signal 、 callback function 等,使應用程式知道該次 I/O 已有資料可使用。 生活例子:主管級人物,吩咐屬下完成某項任務。屬下完成任務後,回報進度。 Application Kernel Device aio_read() No data ready system call Data ready, Copy data to user space Data transfer to kernel Copy complete Deliver signal Process continues executing Signal handler process data return Check device status Device ready
C ONTENTS 12.1 I/O 的特性 12.2 四種 I/O 的類型 12.3 非同步 I/O 相關函數 12.4 非同步 I/O 程式範例
非同步 I/O (1/7) A SYNCHRONOUS NON - BLOCKING I/O (AIO) 優點 因為 I/O 裝置比 CPU 慢很多,所以 AIO 可有効加速應用程式執行效率 缺點 容易出錯,由其資源衝突,因此需要使用 Mutual Exclusion, Semaphores 等機制保護資源 需要設計回調函數 ( callback function ) 、指定 I/O 完成通知用 Signal 等方式進行回應 類型 Process, Polling, Select/poll loops, Signals (Linux, BSD), Callback functions (Mac OS), completion queues/ports (Solaris) 主要可以分為: Polling 和 Interrupt-Driven ( 上述方法均可使用這兩種模擬 ) 範例 POSIX AIO API: aio_read, aio_write, aio_fsync, aio_error, aio_return, aio_suspend, aio_cancel, lio_listio
非同步 I/O (2/7) : 非同步 I/O 控制區塊 (AIO C ONTROL B LOCK, AIOCB) POSIX AIO 相關 API 會使用以下資料結構進行非同步 I/O #include struct aiocb { /* The order of these fields is implementation-dependent */ int aio_fildes; /* File descriptor */ off_t aio_offset; /* File offset */ volatile void *aio_buf; /* Location of buffer */ size_t aio_nbytes; /* Length of transfer */ int aio_reqprio; /* Request priority */ struct sigevent aio_sigevent;/* Notification method */ int aio_lio_opcode; /* Operation to be performed; lio_listio() only*/ /* Various implementation-internal fields not shown */ };
非同步 I/O(3/7) : 相關函數 使用 AIO 時 相關函數庫必須符合 POSIX-RTS Realtime Extension (POSIX 的即時擴充標準 ) 需要 include 表頭檔 aio.h aio_read() 、 aio_write() 、 aio_fsync() functions int aio_read(struct aiocb *aiocbp); int aio_write(struct aiocb *aiocbp); int aio_fsync(int op, struct aiocb *aciocbp); // op: O_SYNC or O_DSYNC (fsync or fdatasync) Enqueue a read/write/sync request. Asynchronous analog of POSIX I/O API read()/write()/fync(). aio_error() function int aio_error(const struct aiocb *aiocbp); returns the error status of an enqueued AIO request 0 表示 aiocbp 指定的非同步 I/O 操作請求完成 EINPROGRESS 表示 aiocbp 指定的非同步 I/O 操作請求正在處理中 ECANCELED 表示 aiocbp 指定的非同步 I/O 操作請求已經取消 一個正整數表示發生錯誤,此值與 read/write 回傳的錯誤值一樣 可用來監控非同 步 I/O 是否完成 可使非同步 I/O 排隊等待處理 ( 獨、寫、同步 )
aio_return () function ssize_t aio_return(struct aiocb * aiocbp ); returns the final return status for the asynchronous I/O request with control block pointed to by aiocbp 返回值相當於同步 I/O 中,read/write/fysnc/fdatasync 的返回值 在 AIO 尚未完成前 aio_return 的回傳值沒有意義,所以必須先用 aio_error() 函數確認 AIO 已完成 在 aio_error() 回傳非 EINPROGRESS 的回傳值後, aio_return() 只能被呼叫一次 aio_suspend () function int aio_suspend(const struct aiocb *const aiocb_list[], int nitems, const struct timespec *timeout) suspends the calling thread until one of the following occurs One or more of the asynchronous I/O requests in the list aiocb_list has completed A signal is delivered timeout is not NULL and the specified time interval has passed 非同步 I/O(4/7) 相關函數
非同步 I/O (5/7) 相關函數 aio_cancel () function int aio_cancel(int fd, struct aiocb *aiocbp) Attempt to cancel outstanding I/O requests on a specified file descriptor fd If fd is NULL, all AIO requests are canceled Return Values AIO_CANCELED: All requests were successfully canceled. AIO_NOTCANCELED: At least one of the requests specified was not canceled. AIO_ALLDONE: All requests had already been completed before the call. On successful cancellation, aio_error() returns ECANCELED aio_return() returns 1 If cancellation is unsuccessful, aio_error() returns EINPROGRESS aio_return() should not be called.
非同步 I/O (6/7) 相關函數 lio_listio () function int lio_listio(int mode, struct aiocb *const aiocb_list[], int nitems, struct sigevent *sevp); Initiates a list of I/O operations specified by aiocb_list. Parameter mode: LIO_WAIT: Call blocks until all AIO complete. LIO_NOWAIT: Queues AIO and returns aio_lio_opcode in aiocb_list specified the AIO to be intiated LIO_READ: Initiate a read operation. LIO_WRITE: Initiate a write operation. LIO_NOP: Ignore this control block. Return values 0: All AIO operations successfully queued (LIO_NOWAIT) or completed (LIO_WAIT).
非同步 I/O (7/7) 如何用 AIO 相關函數啟動與完成 AIO ? 使用 aio_read() or aio_write() 進行排隊等待處理 單一 AIO 主動檢查:使用 aio_error() 監控 AIO 是否完成 ( 直至非 EINPROGRESS 為止 ) 被動等通知:使用 aio_suspend() 等待通知 AIO 完成 確定完成後:使用 aio_return() 瞭解完成後的狀況 ( 傳輸資料量或錯誤等 ) 取消:使用 aio_cancel() 試著取消某個已經等帶處理或進行中的 AIO 多個 AIO 建構 aiocb_list[] ,包含各個 AIO 的資訊 ( 檔案、動作等 ) 使用 lio_listio() 函數將一整組的 AIO 進行排隊等待處理 需選擇 LIO_WAIT or LIO_NOWAIT 的方式
C ONTENTS 12.1 I/O 的特性 12.2 四種 I/O 的類型 12.3 非同步 I/O 相關函數 12.4 非同步 I/O 程式範例
非同步 I/O 程式範例 (1/12) 功能說明: 開啟命令列上參數中的每個檔案描述符 (file descriptor) 使用 aio_read () 將排入每個開啟檔案的非同步讀取 當有 I/O 完成,將以非同步號誌 (signal) 通知 使用 aio_error () 監控所有非同步 I/O 的進度 當所有 I/O 完畢,使用 aio_return () 取得所有 I/O 的結束狀態 當程式收到 SIGQUIT 號誌時,使用 aio_cancel () 取消所有尚未完成的 I/O
非同步 I/O 程式範例 (2/12) 程式可能的執行結果 $./a.out /dev/stdin /dev/stdin opened /dev/stdin on descriptor 3 opened /dev/stdin on descriptor 4 aio_error(): for request 0 (descriptor 3): In progress for request 1 (descriptor 4): In progress abc I/O completion signal received aio_error(): for request 0 (descriptor 3): I/O succeeded for request 1 (descriptor 4): In progress aio_error(): for request 1 (descriptor 4): In progress x I/O completion signal received aio_error(): for request 1 (descriptor 4): I/O succeeded All I/O requests completed aio_return(): for request 0 (descriptor 3): 4 for request 1 (descriptor 4): 2 檔案 3 的 資料輸入 檔案 4 的 資料輸入 監控所有 I/O 完成 開啟檔案 取得 I/O 完成狀態
非同步 I/O 程式範例 (3/12) 程式碼:前言 #include #define BUF_SIZE 20 /* Size of buffers for read operations */ #define errExit(msg) do { perror(msg); exit(EXIT_FAILURE); } while (0) #define errMsg(msg) do { perror(msg); } while (0)
非同步 I/O 程式範例 (4/12) : 程式碼: I/O 狀態資料結構定義 struct ioRequest { /* Application-defined structure for tracking I/O requests */ int reqNum; int status; struct aiocb *aiocbp; };
非同步 I/O 程式範例 (5/12) : 程式碼:號誌處理器與資料定義 static volatile sig_atomic_t gotSIGQUIT = 0; /* On delivery of SIGQUIT, we attempt to cancel all outstanding I/O requests */ static void /* Handler for SIGQUIT */ quitHandler(int sig) { gotSIGQUIT = 1; } #define IO_SIGNAL SIGUSR1 /* Signal used to notify I/O completion */ static void /* Handler for I/O completion signal */ aioSigHandler(int sig, siginfo_t *si, void *ucontext) { write(STDOUT_FILENO, "I/O completion signal received\n", 31); /* The corresponding ioRequest structure would be available as struct ioRequest *ioReq = si->si_value.sival_ptr; and the file descriptor would then be available via ioReq->aiocbp->aio_fildes */ }
非同步 I/O 程式範例 (6/12) : 程式碼:主程式初始化 int main(int argc, char *argv[]) { struct ioRequest *ioList; struct aiocb *aiocbList; struct sigaction sa; int s, j; int numReqs; /* Total number of queued I/O requests */ int openReqs; /* Number of I/O requests still in progress */ if (argc < 2) { fprintf(stderr, "Usage: %s...\n", argv[0]); exit(EXIT_FAILURE); } numReqs = argc 1;
非同步 I/O 程式範例 (7/12) : 程式碼:主程式中資料陣列定義 /* Allocate our arrays */ ioList = calloc(numReqs, sizeof(struct ioRequest)); if (ioList == NULL) errExit("calloc"); aiocbList = calloc(numReqs, sizeof(struct aiocb)); if (aiocbList == NULL) errExit("calloc");
非同步 I/O 程式範例 (8/12) : 程式碼:主程式中號誌處理器註冊 /* Establish handlers for SIGQUIT and the I/O completion signal */ sa.sa_flags = SA_RESTART; sigemptyset(&sa.sa_mask); sa.sa_handler = quitHandler; if (sigaction(SIGQUIT, &sa, NULL) == 1) errExit("sigaction"); sa.sa_flags = SA_RESTART | SA_SIGINFO; sa.sa_sigaction = aioSigHandler; if (sigaction(IO_SIGNAL, &sa, NULL) == -1) errExit("sigaction");
非同步 I/O 程式範例 (9/12) : 程式碼:主程式中開啟檔案、非同步讀取 ( 使用 AIO _ READ ) /* Open each file specified on the command line, and queue a read request on the resulting file descriptor */ for (j = 0; j < numReqs; j++) { ioList[j].reqNum = j; ioList[j].status = EINPROGRESS; ioList[j].aiocbp = &aiocbList[j]; ioList[j].aiocbp->aio_fildes = open(argv[j + 1], O_RDONLY); if (ioList[j].aiocbp->aio_fildes == -1) errExit("open"); printf("opened %s on descriptor %d\n", argv[j + 1], ioList[j].aiocbp->aio_fildes); ioList[j].aiocbp->aio_buf = malloc(BUF_SIZE); if (ioList[j].aiocbp->aio_buf == NULL) errExit("malloc"); ioList[j].aiocbp->aio_nbytes = BUF_SIZE; ioList[j].aiocbp->aio_reqprio = 0; ioList[j].aiocbp->aio_offset = 0; ioList[j].aiocbp->aio_sigevent.sigev_notify = SIGEV_SIGNAL; ioList[j].aiocbp->aio_sigevent.sigev_signo = IO_SIGNAL; ioList[j].aiocbp->aio_sigevent.sigev_value.sival_ptr = &ioList[j]; s = aio_read (ioList[j].aiocbp); if (s == -1) errExit("aio_read"); }
非同步 I/O 程式範例 (10/12) : 程式碼:主程式中取消 I/O 處理 ( 使用 AIO _ CANCEL ) openReqs = numReqs; /* Loop, monitoring status of I/O requests */ while (openReqs > 0) { sleep(3); /* Delay between each monitoring step */ if (gotSIGQUIT) { /* On receipt of SIGQUIT, attempt to cancel each of the outstanding I/O requests, and display status returned from the cancellation requests */ printf("got SIGQUIT; canceling I/O requests: \n"); for (j = 0; j < numReqs; j++) { if (ioList[j].status == EINPROGRESS) { printf(“ Request %d on descriptor %d:”, j, ioList[j].aiocbp->aio_fildes); s = aio_cancel (ioList[j].aiocbp->aio_fildes, ioList[j].aiocbp); if (s == AIO_CANCELED) printf("I/O canceled\n"); else if (s == AIO_NOTCANCELED) printf("I/O not canceled\n"); else if (s == AIO_ALLDONE) printf("I/O all done\n"); else errMsg("aio_cancel"); } gotSIGQUIT = 0; }
非同步 I/O 程式範例 (11/12) : 程式碼:主程式中監控 I/O 處理進度 ( 使用 AIO _ ERROR ) /* Check the status of each I/O request that is still in progress */ printf("aio_error():\n"); for (j = 0; j < numReqs; j++) { if (ioList[j].status == EINPROGRESS) { printf(" for request %d (descriptor %d): ", j, ioList[j].aiocbp->aio_fildes); ioList[j].status = aio_error(ioList[j].aiocbp); switch (ioList[j].status) { case 0: printf("I/O succeeded\n"); break; case EINPROGRESS: printf("In progress\n"); break; case ECANCELED: printf("Canceled\n"); break; default: errMsg("aio_error"); break; } if (ioList[j].status != EINPROGRESS) openReqs--; }
非同步 I/O 程式範例 (12/12) : 程式碼:主程式中取得 I/O 完成狀態 ( 使用 AIO _ RETURN ) printf("All I/O requests completed\n"); /* Check status return of all I/O requests */ printf("aio_return():\n"); for (j = 0; j < numReqs; j++) { ssize_t s; s = aio_return (ioList[j].aiocbp); printf(" for request %d (descriptor %d): %ld\n", j, ioList[j].aiocbp->aio_fildes, (long) s); } exit(EXIT_SUCCESS); }
R EFERENCES For different types of POSIX I/O (in Chinese) For POSIX Asynchronous I/O (Linux man pages in English)
T HE E ND