第三讲 多线程程序设计 课程网站:CourseGrading http://judge. buaa.edu.cn 主讲教师: 赵长海 主讲教师: 赵长海 办公室: 新主楼G910 Email: zch@buaa.edu.cn Spring 2012
本章内容 3.1 线程基础 3.2 线程的基本操作 3.3 多线程的共享变量 3.4 线程同步机制 3.5 多线程信号处理 3.1 线程基础 本章内容 3.2 线程的基本操作 3.3 多线程的共享变量 3.4 线程同步机制 3.5 多线程信号处理 3.6 并发常见问题
3.1 线程基础 一.线程定义 线程可认为是进程内部的执行流,一个进程 内可包括多个线程,一个显著特点是线程间共享地址空间。
理解线程:进程内独立的执行流 进程开始执行时只有一个线程,称为主线程(main thread) 独立执行:线程间互相独立,与进程类似 共享地址空间:共享堆(指针)、数据段(静态变量、全局变量) 、代码段 独立的栈:临时变量可间接共享 Process Threads Within a Process
理解线程:并行的函数 多线程进程 func1 ( ) { .... } func2 ( ) func3 ( )
多线程间没有 严格的父子 关系 理解线程:并发线程执行 线程1 (主线程) 线程2 (对等线程) 线程上下文切换
Linux线程也是进程,与父进程共享地址空间,称为轻量级进程 理解线程:轻量级进程 fork实现 (创建进程) int clone(int (*fn)(void *), void *child_stack, int flags, void *arg, ... /* pid_t *ptid, struct user_desc *tlsctid */ ); pthread_create实现 (创建线程) clone系统调用可以指定子进程 共享父进程的哪一部分进程 上下文。 如果共享虚拟内存,则是线程
二.线程优势与风险 50000次fork() 与pthread_create()的时间比较,单位秒 Platform fork() real user sys Intel 2.8 GHz Xeon 5660 (12cpus/node) 4.4 0.4 4.3 0.7 0.2 0.5 AMD 2.3 GHz Opteron (16cpus/node) 12.5 1.0 1.2 1.3 AMD 2.4 GHz Opteron (8cpus/node) 17.6 2.2 15.7 1.4 0.3 IBM 4.0 GHz POWER6 (8cpus/node) 9.5 0.6 8.8 1.6 0.1 IBM 1.9 GHz POWER5 p5-575 (8cpus/node) 64.2 30.7 27.6 1.7 1.1 IBM 1.5 GHz POWER4 (8cpus/node) 104.5 48.6 47.2 2.1 1.5 INTEL 2.4 GHz Xeon (2 cpus/node) 54.9 20.8 0.9 INTEL 1.4 GHz Itanium2 (4 cpus/node) 54.5 22.2 2.0
线程优势 1) 提高性能(相比进程) 2) 便捷的数据共享(相比进程) 线程创建快:与进程共享资源,因此,创建线程不需要复制整个地址空间 上下文切换快:从同一个进程内的一个线程切换到另一个线程时需要载入的信息比进程少 2) 便捷的数据共享(相比进程) 不必通过内核就可以共享和传递数据。线程间通信比进程间通信高效、方便
线程风险 1) 增加程序复杂性 2) 难于调试 竞态条件、死锁……
3.2 线程的基本操作 一.线程创建 #include <pthread.h> typedef void * (start_routine)(void *); int pthread_create(pthread_t * tid, pthread_attr_t * attr, start_routine* f, void * arg); 创建线程 函数指针 pthread_attr_t* attr:设置线程属性,可以是NULL(传入参数) pthread_t* tid:新创建线程的ID (传出参数) void* arg:函数f的参数,如果要传多个参数,使用结构体指针 start_routine* f:在新线程中运行的函数地址(传入参数)
新线程的创建与执行 main() 主线程 pthread_ create(func) func() 新线程
线程ID pthread_t pthread_self(void); 二.线程标识 #include <pthread.h> pthread_t有可能是无符号整型(linux)、结构体、 指针,为了可移植性,尽量不要打印。
例 void* print(void* str) { pthread_t tid = pthread_self(); if(str != NULL) printf("%u: %s\n", (unsigned int)tid, (char*)str); return NULL; } int main() { pthread_t ht; char str[] = "helloworld"; pthread_create(&ht, NULL, print, str); return 0; } 演示:helloworld 线程ID 线程 属性 入口 函数 入口 函数 参数
pthread_create 进程结束,所有线程终止运行 所有线程并发执行 共享地址空间 原因:主线程(main)结束,调用了exit 与多进程类似,但上下文切换开销小很多; 共享地址空间 共享静态变量、全局变量、文件描述符等;
等待(回收)线程 int pthread_join(pthread_t tid, void **thread_return); 三.线程回收 #include <pthread.h> int pthread_join(pthread_t tid, void **thread_return); 等待(回收)线程 void** thread_return:线程函数返回的 (void *)指针赋给 thread_return所指的地址,可以是NULL(传出参数) pthread_t tid:要等待的线程ID (传入参数) 等待tid线程终止运行,如果该线程已经终止,立即返回; 否则,阻塞直到该线程终止 与wait不同,pthread_joint只能等待一个指定的线程 终止,不能等待任意一个线程终止
例 void* print(void* str) { pthread_t tid = pthread_self(); if(str != NULL) printf("%u: %s\n", (unsigned int)tid, (char*)str); return NULL; } int main() { pthread_t ht; char str[] = "helloworld"; void * tret; pthread_create(&ht, NULL, print, str); pthread_join(ht, &tret); return 0; }
…… 补充知识:指针 线程栈 %ebp(old) func %ebp i &i 返回地址 caller +24 Frame pointer 当前%ebp Stack pointer …… 高地址 补充知识:指针 void caller () { int i = 0; func(&i ); } ptr &ptr +16 void * ptr = NULL; , &ptr , void** ptr void * t_ret = thd_func(); *ptr = t_ret; int * func(int* i ) { *i = 534; return 0; } 汇编代码 movl $534, 24(%ebp), movl %eax, 16(%ebp),
补充知识:指针 如果…… void caller () { typedef voidPtr void* int i ; func(&i ); } typedef voidPtr void* voidPtr ptr = NULL; , &ptr , voidptr* ptr voidptr t_ret = thd_func(); *ptr = t_ret; int * func(int* i ) { *i = 534; return 0; }
补充知识:指针 如果…… void caller () { int* i = malloc(sizeof(int)) ; } i和ptr指向堆内空间 void* ptr = malloc(sizeof(void)); , &ptr , void** ptr void** t_ret = thd_func(); *ptr = t_ret; func(i ); int * func(int* i ) { *i = 534; return 0; }
void pthread_exit(void *rval_ptr); int pthread_cancel(pthread_t tid); 四.线程终止 线程终止的三种方式: (1)从线程函数返回; (2)被同进程内的其它线程取消(pthread_cancel); (3)调用pthread_exit函数; 线程终止 #include <pthread.h> void pthread_exit(void *rval_ptr); int pthread_cancel(pthread_t tid); 从当前线程终止 终止其它线程 pthread_cancel并不会立即终止另一个线程,只是发送了请求,另一个 线程在到达cancellation point(系统调用)的时候才终止。
例 演示:example_exit void * thr_fn1(void *arg) { /*线程函数*/ void * thr_fn1(void *arg) { printf("thread 1 returning\n"); return((void *)1); } /*线程函数*/ void * thr_fn2(void *arg) { printf("thread 2 exiting\n"); pthread_exit((void *)2); } 例 int main(void) { int err; pthread_t tid1, tid2; void *tret; //创建线程 err = pthread_create(&tid1, NULL, thr_fn1, NULL); err = pthread_create(&tid2, NULL, thr_fn2, NULL); //等待线程终止 err = pthread_join(tid1, &tret); printf("thread 1 exit code %d\n", (int)tret); err = pthread_join(tid2, &tret); printf("thread 2 exit code %d\n", (int)tret); exit(0); } 演示:example_exit
Figure 12.14. Cancellation points defined by POSIX.1 accept mq_timedsend putpmsg sigsuspend aio_suspend msgrcv pwrite sigtimedwait clock_nanosleep msgsnd read sigwait close msync readv sigwaitinfo connect nanosleep recv sleep creat open recvfrom system fcntl2 pause recvmsg tcdrain fsync poll select usleep getmsg pread sem_timedwait wait getpmsg pthread_cond_timedwait sem_wait waitid lockf pthread_cond_wait send waitpid mq_receive pthread_join sendmsg write mq_send pthread_testcancel sendto writev mq_timedreceive putmsg sigpause
pthread_cancel 调用风险 需要时学习 如果线程处于临界区的Cancellation points点时,线程取消,此时会引起死锁 需要时学习 …… 演示:example_cancel 线程取消时执行 #include <pthread.h> void pthread_cleanup_push(void (*rtn)(void *), void *arg); void pthread_cleanup_pop(int execute);
void pthread_detach(pthread_t tid); 五.线程分离 线程默认属性 类似僵尸进程 在任何时间点上,线程是 可结合的(joinable) 或者是 分离的(detached)。可结合的线程如果没有被回收,存储资源(栈等)不会被释放;分离的线程不能被回收或者杀死,线程终止时,存储资源自动释放。 线程 分离 分离线程 分离可结合线程tid #include <pthread.h> void pthread_detach(pthread_t tid); 以pthread_self返回值作参数,可以分离自己
例 int main() { while (true) { /*等待请求到来(例如http请求)*/ waitQuest();//伪代码 pthread_create(&ht, NULL, print, str); pthread_detach(&ht); } return 0; 分离线程,不阻塞 int main() { char* str = "helloworld" pthread_create(&ht, NULL, print, str); pthread_join(&ht, NULL); return 0; } 阻塞,等待线程结束
3.3 多线程的共享变量 共享变量 主线程 线程函数(例程) msgs是自动变量(栈内) 演示:example_shared #define N 2 void *thread(void *arg);/*线程函数声明*/ char **ptr; /*全局变量*/ int main(){ int i; pthread_t tid; char *msgs[N]= {“Hello from T1”, “Hello from T2”}; ptr=msg; for(i = 0; i < N; i++) pthread_create(&tid, NULL, thread, (void *)i); pthread_exit(NULL); } void *thread(void *arg) { int myid = (int)arg; static int cnt = 0;/*本地静态变量*/ printf(“[%d]: %s(cnt=%d)\n”, myid, ptr[myid], ++cnt); 主线程 共享变量 msgs是自动变量(栈内) 演示:example_shared 线程函数(例程)
C语言存储类型 全局变量 本地静态变量 本地自动变量 一个变量v是共享的,当且仅当它的一个实例 被一个以上的线程引用。 定义在函数之外的变量。运行时,一个全局变量在整个地址空间只有一个实例,任何线程都可以引用 本地静态变量 定义在函数内部有static属性的变量。和全局变量一样,运行时,在整个地址空间只有一个实例,任何线程都可以引用 本地自动变量 定义在函数内部没有static属性的变量。运行时,每一个线程栈内都有一份本地自动变量实例。其它线程可以间接引用本地自动变量
3.4 线程同步 利用共享变量进行线程间通信, 但容易引起“同步错误” 示例:利用多个线程计数
例 最后计数结果是 演示:example_badcnt void * thread(void *arg) { int i; /*线程函数*/ void * thread(void *arg) { int i; for ( i = 0; i < N; i++) cnt++; return NULL; } #define N 100000 volatile int cnt = 0; /* 计数器*/ void *thread(void *arg); int main() { pthread_t tid1, tid2; /*创建线程并等待它们结束*/ pthread_create(&tid1, NULL, thread, NULL); pthread_create(&tid2, NULL, thread, NULL); pthread_join(tid1, NULL); pthread_join(tid2, NULL); /*打印结果*/ printf("cnt = %d\n", cnt); return 0; } 最后计数结果是 演示:example_badcnt
汇编代码 计数错误原因 for ( i = 0; i < N; i++) cnt++; thread: pushl %ebp movl %esp, %ebp subl $16, %esp movl $0, -4(%ebp) jmp .L4 .L5: movl cnt, %eax addl $1, %eax movl %eax, cnt addl $1, -4(%ebp) .L4: cmpl $99999, -4(%ebp) jle .L5 movl $0, %eax leave ret for ( i = 0; i < N; i++) cnt++; 将cnt(内存)放入寄存器eax 寄存器eax值加1 寄存器eax值存入cnt (内存)
全局变量cnt最后的计数结果可能是? (cnt初始0) 2*N 最大: 最小: 2
互斥 互斥初始化和销毁 int pthread_mutex_init(pthread_mutex_t *mutex, 可以看作一把锁,保护共享资源同一时刻只能被一个线程访问。 互斥 互斥初始化和销毁 #include <pthread.h> int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *mutexattr); 互斥变量类型 int pthread_mutex_destroy(pthread_mutex_t *mutex);
int pthread_mutex_lock(pthread_mutex_t *mutex); 加锁和解锁 #include <pthread.h> int pthread_mutex_lock(pthread_mutex_t *mutex); 如果mutex已经被锁,调用线程阻塞,直到mutex被解锁。 int pthread_mutex_trylock(pthread_mutex_t *mutex); 如果mutex已经被锁,不阻塞,返回EBUSY 。 int pthread_mutex_unlock(pthread_mutex_t *mutex); 解锁
初始化 使用 使用 释放 调用方式 pthread_mutex_t mutex; pthread_mutex_init(&mutex); THREAD A: pthread_mutex_lock(&mutex); cnt++; pthread_mutex_unlock(&mutex); THREAD B: cnt--; pthread_nutex_destroy(&mutex); 使用 使用 释放
初始化 使用 使用 调用方式2 pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; THREAD A: pthread_mutex_lock(&mutex); cnt++; pthread_mutex_unlock(&mutex); THREAD B: cnt--; 使用 使用
例 演示:example_mutex pthread_mutex_t mutex; void * thread(void *arg) { /*线程函数*/ void * thread(void *arg) { int i; for ( i = 0; i < N; i++) { cnt++; } return NULL; #define N 100000 volatile int cnt = 0; /* 计数器*/ void *thread(void *arg); int main() { pthread_t tid1, tid2; /*创建线程并等待它们结束*/ pthread_create(&tid1, NULL, thread, NULL); pthread_create(&tid2, NULL, thread, NULL); pthread_join(tid1, NULL); pthread_join(tid2, NULL); /*打印结果*/ printf("cnt = %d\n", cnt); return 0; } pthread_mutex_lock(&mutex); pthread_mutex_t mutex; pthread_mutex_unlock(&mutex); pthread_mutex_init(&mutex, NULL); pthread_mutex_destroy(&mutex); 演示:example_mutex
int sem_init(sem_t *sem, 0, unsigned int value); 二.信号量(semaphore) 信号量初始值 #include <semaphore.h> int sem_init(sem_t *sem, 0, unsigned int value); 信号量初始化和销毁 信号量变量类型 int sem_destroy(sem_t *sem); 为0,信号量可以被线程共享 非0,信号量可以被进程共享
int sem_wait (sem_t *sem); P和V #include <semaphore.h> int sem_wait (sem_t *sem); 如果信号量的值大于0,那么进行减一的操作,函数立即返回. 如果信号量当前等于0,那么调用就会阻塞,直到信号量大于0 int sem_trywait(sem_t *sem); 如果信号量的值大于0,那么进行减一的操作,函数立即返回. 如果信号量当前等于0,函数立即返回,errno等于EAGAIN int sem_post(sem_t *sem); 信号量的值增加1,阻塞在这个信号量上的一个线程将会被唤醒
例 生产者-消费者 “生产者”线程不断向共享缓冲区写人数据(即生产数据),而“消费者”线程不断从共享缓冲区读出数据(即消费数据);共享缓冲区共有n个;任何时刻只能有一个线程可对共享缓冲区进行操作。
设计要点(阻塞版本): 缓冲区槽满,生产者要阻塞 缓冲区所有槽空,消费者要阻塞 同一时刻只能有一个线程读或者写缓冲区
缓冲区数据结构 缓冲区初始化 b c d front rear buf typedef struct { int *buf; int n; int front; int rear; sem_t mutex; /*protects accesses to buf*/ sem_t nEmpty; /*counts available slots*/ sem_t nStored; /*counts available items*/ } buf_t; front rear buf b c d 1 2 …… n-1 /*初始换缓冲区,创建n个槽的空间*/ void buf_init(buf_t *bp, int n) { bp->buf = calloc(n, sizeof(int)); bp->n = n; bp->front = bp->rear = 0; sem_init(&bp->mutex, 0, 1); sem_init(&bp->nEmpty, 0, n); sem_init(&bp->nStored, 0, 0); } 缓冲区初始化
共享缓冲区读写 演示:example_buffer /*写一项到缓冲区尾部*/ void buf_write(buf_t *bp, int item) { int idx; idx = (bp->rear)++; bp->rear = (bp->rear) % (bp->n); bp->buf[idx] = item; } /*读并删除缓冲区头部数据*/ int buf_read(buf_t *bp) { int item, idx; idx = (bp->front)++; bp->front = (bp->front ) % (bp->n); item = bp->buf[idx]; return item; } sem_wait(&bp->nEmpty); sem_post(&bp-> nEmpty); sem_post(&bp-> nStored); sem_wait(&bp-> nStored); sem_wait(&bp->mutex); sem_post(&bp->mutex);
条件变量(Condition Variables) 三.条件变量 是一种等待某事件发生的一种同步机制。即,线程挂起,直到共享数据上的某些条件得到满足。 主要包括两个部分: 一部分线程等待"条件变量的条件成立"; 一部分线程判断“条件成立”,唤醒等待线程。 条件变量(Condition Variables) 该线程判断“条件成立”,发出唤醒信号 等待被唤醒
int pthread_cond_destroy(pthread_cond_t *cond); #include <pthread.h> int pthread_cond_init(pthread_cond_t *cond, pthread_condattr_t *cond_attr) 条件变量初始化和销毁 条件变量类型 int pthread_cond_destroy(pthread_cond_t *cond); 一般设置为NULL,使用缺省属性
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex); 等待条件成立 #include <pthread.h> int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex); 注意:互斥参数,为什么? int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime); 限时等待
int pthread_cond_signal(pthread_cond_t *cond); 唤醒等待线程 #include <pthread.h> int pthread_cond_signal(pthread_cond_t *cond); 唤醒一个等待(条件成立)线程 int pthread_cond_broadcast(pthread_cond_t *cond); 唤醒所有等待(条件成立)线程
pthead_mutex_t mutex; 调用方式 pthread_mutex_signal为了实现的简单,可能会唤醒多个线程。 pthread_cond_t cond; pthead_mutex_t mutex; 为什么用while,而不用if 主线程: pthread_cond_init(&cond); 保护共享变量 线程 1、2……: pthread_mutex_lock(&mutex); ++ count; pthread_cond_signal(&cond); pthread_mutex_unlock(&mutex); 线程 A、B……: pthread_mutex_lock(&mutex); while (count <= 0) pthread_cond_wait(&cond, &mutex) pthread_mutex_unlock(&mutex); 为什么将mutex作为参数 主线程: pthread_cond_destroy(&cond); 线程挂起前,unlock mutex
调用方式2 pthread_cond_t cond = PTHREAD_COND_INITIALIZER; pthead_mutex_t mutex =PTHREAD_MUTEX_INITIALIZER; 线程 1、2……: pthread_mutex_lock(&mutex); ++ count; pthread_cond_signal(&cond); pthread_mutex_unlock(&mutex); 线程 A、B……: pthread_mutex_lock(&mutex); while (count <= 0) pthread_cond_wait(&cond, &mutex) pthread_mutex_unlock(&mutex);
例 A B C D 简单的消息队列 qtail qhead ^ #define MAXMSGSIZE 128 /*消息最大长度*/ typedef struct smsg_node { struct smsg_node *m_next; char data[MAXMSGSIZE]; /*存放消息*/ int msg_size; /*实际消息长度*/ } smsg_node_t; 例 简单的消息队列 smsg_node_t *qtail = NULL; /*queue tail*/ smsg_node_t *qhead = NULL; /*queue head*/ pthread_cond_t qready = PTHREAD_COND_INITIALIZER; pthread_mutex_t qlock = PTHREAD_MUTEX_INITIALIZER; qtail ^ A B C D qhead
队列操作 演示:example_mq_cond /*写一项到队列尾部*/ /*从队列头部取一个消息*/ void msg_receive(char * msg_ptr, int* msg_size) { smsg_node_t *mp; pthread_mutex_lock(&qlock); while (qhead == NULL) pthread_cond_wait(&qready, &qlock); mp = qhead; qhead = qhead->m_next; if(mp == qtail) qtail = NULL; pthread_mutex_unlock(&qlock); /* now process the message mp */ *msg_size = mp->msg_size; memcpy(msg_ptr, mp->data, mp->msg_size); free(mp); } /*写一项到队列尾部*/ void msg_send(const char *msg_ptr, int msg_size) { smsg_node_t* msg = malloc(…); memcpy(msg->data, msg_ptr, msg_size); msg->msg_size = msg_size; msg->m_next = NULL; pthread_mutex_lock(&qlock); if(qtail != NULL) { qtail->m_next = msg; } qtail = msg; if(qhead == NULL) qhead = msg; pthread_mutex_unlock(&qlock); pthread_cond_signal(&qready);
唤醒丢失问题 什么时候发生唤醒丢失? 1.没有线程正在处在阻塞等待的状态下 调用pthread_cond_signal,必然会唤醒一个线程吗? 唤醒丢失问题 什么时候发生唤醒丢失? 1.没有线程正在处在阻塞等待的状态下 2. 一个线程调用pthread_cond_signal(broadcast) 函数,另一个线程正处在测试条件变量和调用pthread_cond_wait函数之间
四. CAS指令 compare-and-swap指令( CMPXCHG ),同步机制实现的基石。 指令语义:将寄存器内的值与给定值(oldval)比较,当前且仅当相等时,寄存器内的值被赋一个新的值(newval),返回寄存器内存放的旧值(*reg)。 CAS指令: C语言表达CAS指令 int compare_and_swap (int* reg, int oldval, int newval) { int old_reg_val = *reg; if (old_reg_val == oldval) *reg = newval; return old_reg_val; } 三步操作由CAS一条指令完成
如何使用CAS实现互斥 int reg = 1;//对比pthread_mutex_t mutex void mutex_lock() { int compare_and_swap (int* reg, int oldval, int newval) int reg = 1;//对比pthread_mutex_t mutex void mutex_lock() { } int old_reg; do { old_reg = compare_and_swap(®, 1, 0); } while(reg == old_reg); void mutex_unlock() { } compare_and_swap(®, 0, 1);
异步信号由哪一个线程接收 3.5 多线程信号处理 同步信号(synchronous signals): 进程(线程)的某个操作产生的信号称,例如SEGILL、SIGSEGV、SIGFPE等。 同步信号(synchronous signals): 类似用户击键这样的进程外部事件产生的信号叫做异步信号,例如kill命令、ctrl+c产生的信号。 异步信号(asynchronous signals): 异步信号由哪一个线程接收 某线程内产生的同步信号,由谁接收
int sigwait(const sigset_t *set, int *sig); #include <signal.h> int pthread_sigmask(int how, const sigset_t *set, sigset_t *oldset); 线程信号处理函数 类似sigprocmask,阻塞或者取消阻塞信号 向另外一个线程发送信号 int pthread_kill(pthread_t thread, int sig); int sigwait(const sigset_t *set, int *sig); 信号等待函数,挂起线程,直到set集合内的信号到达; sigwait的特别之处:自动取消阻塞set集合内的信号
多线程“异步信号”处理模式 异步信号由哪一个线程接收 如果所有线程都未阻塞该信号,则接收线程不确定,可能是任意线程; 如果只有一个线程未阻塞该信号,则信号将送达该线程。 多线程“异步信号”处理模式
阻塞信号 取消阻塞信号,并等待信号到达 信号处理线程,继承了阻塞的信号 演示: example_asyn_sig int main( ) { void * sig_thread(void *arg) { sigset_t *set = (sigset_t *) arg; int s, sig; for (;;) { sigwait(set, &sig); printf("Signal handling thread got signal %d\n", sig); } 取消阻塞信号,并等待信号到达 int main( ) { pthread_t thread; sigset_t set; /* 主线程阻塞信号,其它线程继承信号阻塞 */ sigemptyset(&set); sigaddset(&set, SIGQUIT); sigaddset(&set, SIGUSR1); pthread_sigmask(SIG_BLOCK, &set, NULL); pthread_create(&thread, NULL, &sig_thread, (void *) &set); /* 主线程继续创建其它线程或者进行其它工作*/ pause(); } 阻塞信号 信号处理线程,继承了阻塞的信号
示例 某线程内产生的同步信号,由谁接收 线程内产生的同步信号(SIGSEGV等)由本线程接收。 捕捉同步信号的用途之一:程序故障时,打印错误信息(堆栈、错误发生的位置),方便调试 示例
注册信号处理函数,处理本线程产生的同步信号 void* sig_thread(void* arg){ int i = 0; int* nullptr = NULL; signal(SIGSEGV, handler); /*generate SIGSEGV signal*/ *nullptr = i; return NULL; } void handler(int sig) { printf("Catch signal %d\n", sig); exit(0); } 注册信号处理函数,处理本线程产生的同步信号 int main(){ pthread_t ht; pthread_create(&ht, NULL, sig_thread, NULL); pthread_join(ht, NULL); return 0; } 演示: example_syn_sig
3.6 并发常见问题 线程安全 竞争 死锁 假共享 线程个数限制
线程安全: 线程不安全函数分类: 一.线程安全(thread safety) 一段代码(函数)被称为线程安全的,当且仅当被多个线程反复调用时,一直产生正确的结果。 线程安全: 线程不安全函数分类:
例 1.不保护共享变量 1. 利用同步操作保护共享变量 extern int errno; int open(const char *path, int oflag, ... ) { …… errno = EACCES; } __thread int errno; 例 1. 利用同步操作保护共享变量 互斥、信号量等 2. 使用线程本地存储(Thread-local storage):同样 的变量名,其实例在不同的线程中位于不同的存储位置 _ _thread关键字(编译器支持)或pthread Thread-Specific Data 3. 原子操作 Linux/Unix的atomic_set等、java的AtomicInteger等
2.共享资源操作未保护 1.尽量在一个线程内进行共享资源的操作 2.其它办法? int function() { char *filename="/etc/config"; FILE *config; if(file_exist(filename)){ config=fopen(filename); fputs(config, ….); } 取消阻塞信号,并等待信号到达 1.尽量在一个线程内进行共享资源的操作 2.其它办法?
例 3.返回指向静态变量的指针 调用时采用“加锁-拷贝(lock-and-copy)” char * getenv(const char *name) { static char envbuf[ARG_MAX]; …… return envbuf; } 调用时采用“加锁-拷贝(lock-and-copy)” char user [64]; pthead_mutex_lock(&mutex); strcpy(user, getenv(“USER”)); Pthead_mutex_lock(&mutex); 例
例 竞争: 二.竞争(race) 当一个程序的正确性依赖于一个线程在另一个线程到达y点之前,到达它的控制流中的x点时,就会发生竞争。 int main() { pthread_t tid[N]; int i; for(i = 0; i < N; i++) pthread_create(&tid[i], NULL, thread, &i); pthread_join(tid[i], NULL); exit(0); } void *thread(void * arg) { int myid = *((int*)arg); printf("Hello from thread %d\n", myid); return NULL; 例 演示: example_race
三.死锁(deadlock) 一组线程被阻塞了,等待一个永远也不会为真的条件。 死锁: 互斥锁加锁顺序规则:如果对于程序中每对互斥锁(s, t),每个同时占用s和t的线程都按照相同的顺序对它们加锁,那么这个程序就是无死锁的。
例 假共享: 四.假共享(false sharing) 变量声明 线程1内 线程0内 分别被两个线程使用的变量,由于存储位置靠得太近,有可能被放到一个cache line(通常64字节)内,此时会引起假共享,严重影响性能。 假共享: struct foo { volatile int x; volatile int y; }; struct foo f; 变量声明 例 int sum_y() { int s = 0; for (int i = 0; i < 1000000; ++i) s += f. y; return s; } 线程1内 void inc_x(){ for (int i = 0; i < 1000000; ++i) ++f. x; } 线程0内
Cache中的数据组成以块为单位,该块称为cache line,典型大小是64~128字节,cache line是从内存读写的最小单位。 x(CPU 0)被修改,y(CPU 1)所在的cache line被置无效 缓存一致性 对于多核(多处理)CPU,如果一个cache line(Li)内的数据被更改, 其它 Cache上Li的副本,都会被标识为无效,需要重新从内存读取。 内存延迟250时钟周期, L1 cache 3个时钟周期
避免假共享 x与y存储位置间填充数据, 确保x与y不处于一个cache line struct foo { volatile int x; volatile int y; }; struct foo f; struct foo { volatile int x; char padding[60]; volatile int y; }; struct foo f; x与y存储位置间填充数据, 确保x与y不处于一个cache line
创建多少个线程最为合适? 性能最高、充分发挥计算能力 四.线程个数限制 1. 所有线程执行计算密集型计算 1. 所有线程执行计算密集型计算 创建线程 == CPU核数, 创建线程过多,会引起频繁的上下文切换 2. 既有计算密集型线程,也有I/O线程 创建线程 == CPU核数 + n(个网络I/O线程) + 1(个磁盘I/O线程) 如果有多块磁盘,可以启用多个磁盘I/O线程,但要保证每个线程读不同磁盘;
本 章 内 容 小 结
2. 线程操作 3. 线程同步 4. 线程信号处理 5. 线程常见问题及解决方法 多 线 程 编 1. 理解线程 共享地址空间 1. 理解线程 并行的函数、轻量级进程 共享地址空间 共享数据段、内存映射段、堆 2. 线程操作 创建线程(pthead_create) 多 线 程 编 回收线程、分离线程 可结合的线程被回收之后才会释放占用的资源 可分离的线程终止运行,占用资源自动释放 线程退出(pthread_exit, pthread_cancel) 3. 线程同步 共享变量 互斥、信号量、条件变量、CAS 4. 线程信号处理 同步信号处理 异步信号处理 5. 线程常见问题及解决方法