多核结构与程序设计 杨全胜 http://www.njyangqs.com/ 东南大学成贤学院计算机系
使用Windows Threads编程 内容 Windows threads库 使用Windows thread API 同步
Windows threads库 Win32 API是内核与应用程序之间的接口。 它是一个能够被应用程序调用的系统函数包。 MFC是微软基本函数类库,它把Win32 API 封装成类。 .NET框架包含公共语言运行库(CLR)和框架类 库(FCL)。.NET基本类中的 System.Threading命名空间提供支持线程的 大量类和接口。
使用Windows Threads编程 内容 Windows threads库 使用Windows thread API 同步
使用Windows thread API Win32* HANDLE类型 每个Windows Object对象都被以HANDLE(句柄)类型的变量引用。 指向内核的Object(对象) Thread, process, file, event, mutex, semaphore等. Object创建函数返回HANDLE Object通过它的handle(句柄)来控制 不要直接操作object,应利用句柄使用各类函数
使用Windows thread API 创建Win32* 线程 HANDLE CreateThread( LPSECURITY_ATTRIBUTES lpThreadAttributes, SIZE_T dwStackSize, LPTHREAD_START_ROUTINE lpStartAddress, LPVOID lpParameter, DWORD dwCreationFlags, LPDWORD lpThreadId ) ; lpThreadAttributes: 新线程的安全属性 dwStackSize: 新线程的栈大小 lpStartAddress : 一个指向线程函数以及启动线程后在代码段的起始地址的指针 pvParam : 传递到线程函数的参数 dwCreationFlags : 创建标志(0或CREATE_SUSPENDED). lpThreadId :[输出] 指向创建函数返回的线程标识的变量的指针 This is the function prototype for CreateThread(), which creates a Windows thread that begins execution of the function provided as the third parameter. The CreateThread() function returns a HANDLE that is used as a reference to the newly created thread. The following parameters are used in the CreateThread() function: lpThreadAttributes: Every kernel object has security attributes. Therefore, the first parameter, ThreadAttributes, allows the programmer to specify the security attributes of the thread. This parameter is a pointer to a SECURITY_ATTRIBUTES structure, and it can be NULL. A null value sets up default security for the object. The security of the object mainly decides which processes can access and manipulate the object. dwStackSize: The StackSize parameter allows the user to specify the amount of stack space that needs to be reserved for the thread. Windows provide the threads with a default stack size by setting the parameter to zero. The default value of the stack size is 1 megabyte. However, you can create different stack sizes within threads to optimize memory usage. lpStartAddress: The third parameter, StartAddress, is a pointer to the function and the start address of a thread in the code after its launch. It points to the global function on which the thread will begin its execution. In response, the operating system returns a HANDLE to identify that thread. This implies that StartAddress specifies a function pointer to the actual code that the thread runs. lpParameter: The fourth parameter is a pointer and a single, 32-bit parameter value that is passed to the thread. The thread function can accept a single LPVOID parameter. The parameter is passed as an argument to the StartAddress. If no parameters are accepted by the thread function, this value can be NULL. If the thread function accepts more than one parameter, you can encapsulate all the parameters into a single structure. Then, you can send a pointer to the structure as the fourth parameter of the CreateThread() function call and decompose the structure into its components as the first thing done in the threaded version of the function. dwCreationFlags: The fifth parameter specifies various configuration options and allows you to control the creation of the thread, such as starting as a suspended thread. The default (using ‘0’ for this parameter) is to start the thread as soon as it has been created by the system. To create a thread that is suspended after creation, use the defined constant CREATE_SUSPENDED. lpThreadId: ThreadId is the unique number assigned to the thread at creation. This is the variable that receives the thread’s identifier. It is an out parameter, and the value assigned will be unique among all the running threads. If the CreateThread() function fails to create a thread, it will return NULL. You can find the reason for the failure by calling the GetLastError() function. It is recommended that you check error codes from all Windows API calls. After successfully creating a thread, the CreateThread() function returns the HANDLE of the new thread. After you create the thread and receive its HANDLE, you can set the thread’s priority. You can use SetThreadpriority() function to set the priority value for the specified thread. This value, together with the priority class of the thread's process, determines the thread's base priority level. Alternative functions are available to create threads within the Microsoft C library. The CreateThread() function provides a flexible and an easy-to-use mechanism for creating threads in Windows applications. However, the CreateThread() function does not perform per thread initialization of C runtime data blocks and variables. Therefore, you cannot use the CreateThread() function in any application that uses the C runtime library. However, Microsoft provides two additional methods, _beginthread and _beginthreadex(). Avoid using _beginthread method because it does not include security flags or attributes and does not return a thread ID. Arguments for _beginthreadex() are the same as the CreateThread() function. Therefore, it is recommended that you use _beginthread() function for writing applications that use the C run time library.
使用Windows thread API LPTHREAD_START_ROUTINE参数 CreateThread() 希望该参数指向一个全局函数(线程函数),该函数: 返回DWORD 调用的类型是 WINAPI 只有一个参数:LPVOID (void *) 线程函数: DWORD WINAPI ThreadFunc (LPVOID lpvThreadParm);
使用Windows thread API 使用明确的线程 明确标识出可以并行执行的代码部分 将这些代码封装到函数中 如果代码已经是一个函数,需要写一个驱动函数来协调多个线程的工作。 增加CreateThread( ) 调用来指派线程到执行函数
使用Windows thread API 管理线程 设置优先权 线程优先权 = 进程优先权 + 线程相对优先权 Bool SetThreadPriority (HANDLE hThread , int nPriority) 为指定的线程设置优先权值 hThread: 要修改优选权值的线程的句柄 nPriority: 为线程设置的优先权值
使用Windows thread API 管理线程 挂起和恢复线程 如果线程中的挂起计数器 (suspend count)=0, 线程被执行. 如果它>0, 调度器将不会调度这个线程 DWORD SuspendThread(HANDLE hThread); 挂起指定的线程 hThread: 要挂起的线程的句柄. DWORD ResumeThread(HANDLE hThread); 递减一个线程的suspend count. 当suspend count减到0,恢复线程的执行 hThread: 要被恢复的线程的句柄
使用Windows thread API 管理线程 等待线程 等某一个object (thread) DWORD WaitForSingleObject( HANDLE hHandle, DWORD dwMilliseconds ); 当指定的object 是可用状态(比如线程退出)或者时间间隔已经过去,该函数返回,否则等待 hHandle: object的句柄. dwMilliseconds: 毫秒级的时间间隔 Thread handles left lying around take up memory space. Continuously creating threads without cleaning up the handles when done, could lead to a memory leak. Another possibility would be that the maximum number of threads that can be supported within a process will be reached and no more new threads will be able to be created. Clean up after your threads!! Examples within this talk don’t use this since they are so small and the exit of the process will do the resource reclamation automatically.
使用Windows thread API 管理线程 等待线程 等多个objects (线程) DWORD WaitForMultipleObjects( DWORD nCount, const HANDLE* lpHandles, BOOL bWaitAll, DWORD dwMilliseconds ); 当指定的多个object中任一个进入可用状态(线程退出)或者时间间隔已过,函数返回,否者等待 nCount: 数组指针lpHandles中的object句柄个数. lpHandles: 指向一个object句柄的数组的指针. bWaitAll: 如果该参数是TRUE, 则只有lpHandles数组中所有object都可用了函数才返回,如果是FALSE,则只要任意一个object状态是可用,就返回。 dwMilliseconds: 毫秒级的时间间隔.
使用Windows thread API 管理线程 退出一个线程 ExitThread(DWORD dwExitCode) 结束一个线程. dwExitCode: 给调用线程的退出码 TerminateThread(HANDLE hThread, DWORD dwExitCode) 终止一个线程. hThread: 要终止的线程的句柄. dwExitCode:给调用线程的退出码 ExitThread always kill the thread it called. TerminateThread can kill all kind of Thread. BOOL GetExitCodeThread( HANDLE hThread, LPDWORD lpExitCode ); 获取指定线程的终止状态. hThread: 线程的句柄 lpExitCode: 接收线程终止状态的变量
使用Windows thread API 管理线程 杀死线程 释放操作系统资源 在程序完成之前清理线程 进程退出的时候做这件事 CloseHandle(HANDLE hObject); 关闭一个打开的object句柄 hObject: Handle to an open object.
使用Windows thread API 下面这个程序首先创建两个线程,当输入为1时,执行线程,否则挂起线程 #include <windows.h> #include <iostream> using namespace std; DWORD WINAPI FunOne(LPVOID param){ while(true) { Sleep(1000); cout<<"hello! "; } return 0; DWORD WINAPI FunTwo(LPVOID param){ cout<<"world! ";
使用Windows thread API int main(int argc, char* argv[]) { int input=0; HANDLE hand1=CreateThread (NULL, 0, FunOne, (void*)&input, CREATE_SUSPENDED, NULL); HANDLE hand2=CreateThread (NULL, 0, FunTwo, (void*)&input, CREATE_SUSPENDED, NULL); while(true){ cin>>input; if(input==1) ResumeThread(hand1); ResumeThread(hand2); } else SuspendThread(hand1); SuspendThread(hand2); }; TerminateThread(hand1,1); TerminateThread(hand2,1); return 0;
使用Windows thread API 案例: 创建线程 会发生什么情况? #include <stdio.h> #include <windows.h> DWORD WINAPI helloFunc(LPVOID arg) { printf(“Hello Thread”); return 0; } Main() { HANDLE hThread = CreateThread(NULL, 0, helloFunc, NULL, 0, NULL); The independent task in this example is to print the phrase Hello Thread encapsulated into the helloFunc() function. The DWORD WINAPI declaration specifies that this function can be mapped to a thread when you call the CreateThread() function. When the program starts execution, the main (process) thread begins execution of the main() function. The main thread creates a child thread that will execute the helloFunc() function. After creation of the helloFunc thread, the main thread encounters the program termination, which will destroy all the child threads. If the created thread does not have enough time to print Hello Thread before the main thread exits, nothing will be printed from execution of this application. The main thread holds the process information. When the process terminates, all the threads terminate. Therefore, to ensure that the desired printing is accomplished, the main thread must wait for the created thread to finish before it exits. Therefore, to avoid writing applications that spawn threads and end before any useful work begins; you need a mechanism to wait for threads to finish their processing. Question for Discussion: What are the possible outcomes for the above code? Answer: Two possible outcomes: Message Hello Thread is printed on screen. Nothing printed on screen. This outcome is more likely that previous. Main thread is the process and when the process ends, all threads are cancelled, too. Thus, if the CreateThread() call returns before the operating system has had the time to set up the thread and begin execution, the thread will die a premature death when the process ends. 会发生什么情况?
使用Windows thread API 案例解释 主线程是进程 当进程结束,所有线程结束 需要有办法能等待线程结束
使用Windows thread API 等待Windows*线程 不是个好方法! #include <stdio.h> #include <windows.h> BOOL threadDone = FALSE; DWORD WINAPI helloFunc(LPVOID arg) { printf(“Hello Thread”); threadDone = TRUE; return 0; } Main() { HANDLE hThread = CreateThread(NULL, 0, helloFunc, NULL, 0, NULL); While (!threadDone); // wasted cycles Discuss the execution of the code. Waiting for a thread by using a while statement is not a good idea because you use a lot of processor resources in the entire process. Consider the method of making the master thread wait and prevent it from exiting the process before the spawned thread executes. While the message “Hello Thread” will eventually be printed, the main thread is in a spin-wait until the value of threadDone is changed by the CreatedThread() function. At this point, the thread has completed all the required computation and you have not lost any work if the ending process kills the thread rather than by natural causes of the RETURN. However, if running on a single processor system or with Hyper-Threading Technology, the main thread may be spinning its wheels, soaking up thousands or millions of CPU cycles before the operating system swaps it out and allows the created thread to execute. Therefore, this is not a good idea. Waiting for a thread this way is not recommended because of the possibility to waste resources of the processor. When waiting for another thread to terminate, consider the following two cases: Wait until the thread terminates. Wait a predefined amount of time for thread termination. The time for which the waiting thread must wait for a thread to terminate depends on the purpose of waiting and the condition whether there is other computation that can be done instead of being blocked waiting. The next section discusses the API functions that Windows has to wait on thread termination and other kernel objects. Questions for Discussion: What are the potential race conditions in this code? Answer: The race condition in this case is that the main thread can easily terminate before the CreateThread() can actually has chance to do too many things. In addition, another potential race condition is that I/O in general is not thread safe. How can you modify the main function as a method of waiting for thread to finish? Answers: In the above example, you can try including getch() in the main function as a method of waiting for thread to finish. In other words, to avoid writing applications that spawn threads and then end before any useful work has the chance to begin, you need some mechanism to wait for threads to finish their processing. 不是个好方法!
使用Windows thread API 等待线程 等待单个object (线程) 调用线程等待(阻塞)直到: 不用CPU时钟 时间到 返回码通常会指出这一点 线程退出(句柄被释放) 用INFINITE来等待直到线程结束 不用CPU时钟 DWORD WaitForSingleObject( HANDLE hHandle, DWORD dwMilliseconds );
使用Windows thread API 等待多个线程 等待多达64个objects (线程) 等待所有: bWaitAll =TRUE 等待部分: bWaitAll = FALSE 返回的值是找到的第一个数组索引 DWORD WaitForMultipleObjects( DWORD nCount, const HANDLE* lpHandles, // array BOOL bWaitAll, // wait for one or all DWORD dwMilliseconds );
使用Windows thread API WaitFor*函数注意事项 句柄作为参数 适用于不同类型的对象(objects) Signaled Non-signaled 行为被句柄指明的object所定义 线程: signaled意味着终止
使用Windows thread API 案例: 多线程 #include <stdio.h> #include <windows.h> const int numThread = 4; DWORD WINAPI helloFunc(LPVOID arg) { printf(“Hello Thread”); return 0; } Main() { HANDLE hThread[numThread]; for (int i = 0; i < numThread; i++) hThread[i] = CreateThread(NULL, 0, helloFunc, NULL, 0, NULL); WaitForMultipleObjects(numThreads, hThread, TRUE, INFINITE);
使用Windows thread API 怎么打印线程号? ? =>
使用Windows thread API 这样对吗?为什么? #include <stdio.h> #include <windows.h> const int numThread = 4; DWORD WINAPI helloFunc(LPVOID pArg) { int* p = (int *)pArg; int mynum = *p; printf("Hello Thread %d\n",mynum); return 0; } main() { HANDLE hThread[numThreads]; for (int i = 0; i < numThreads; i++) hThread[i] = CreateThread(NULL, 0, helloFunc, &i, 0, NULL ); Details Problem is passing *address* of “i”; value of “i” is changing and will likely be different when thread is allowed to run than when CreateThread was called. Transition Quote “Let’s see how this plays out when threads are running in parallel.”
使用Windows thread API Hello线程的时间轴 Time main Thread 0 Thread 1 T0 i = 0 --- ---- T1 create(&i) T2 i++ (i == 1) launch T3 p = pArg T4 i++ (i == 2) myNum = *p myNum = 2 T5 print(2) T6 i++ (i == 3) exit myNum = 3 Run through the time steps of the table. Again this is just an example to emphasize timing issues. Notice that both threads (0, 1) get the same value for “num” because they both de-reference the same address for “myNum”. It very possible in this scenario that thread0 gets a value of myNum = 1, but thread1 will get a value of num = 2. But this is still incorrect; correct would be for the threads to get values of 0 and 1, respectively. Mention that this is called a “data race” when more than one thread accesses the same variable.
使用Windows Threads编程 内容 Windows threads库 使用Windows thread API 同步
同步 线程化的问题 线程化问题 死锁 活锁 粒度 负载均衡 数据竞争 多线程应用的线程化问题
同步 竞争条件 多个线程对相同变量的并发访问 并发程序中最常见的错误 并不是所有时候都表现出来 读/写冲突 写/写冲突 并发程序中最常见的错误 并不是所有时候都表现出来 可以用Intel® Thread Checker来检测执行时候的数据竞争 SMT-Simultaneous Multi-Threading 同时多线程技术 CMP-Chip Multi-Threading 芯片多线程技术
同步 如何避免数据竞争 把变量的作用域限制在线程区域内 用临界区控制共享访问 变量在线程函数内声明 在线程栈中分配 TLS (Thread Local Storage,线程局部存储) 用临界区控制共享访问 互斥和同步 锁、信号量、事件、临界段、互斥量…
同步 解决办法 – “局部” 存储 DWORD WINAPI helloFunc(LPVOID pArg) { int mynum = *((int *)pArg); printf("Hello Thread %d\n",mynum); return 0; } // from main int tNum[numThreads]; for (int i = 0; i < numThreads; i++){ tNum[i] = i; hThread[i] = CreateThread(NULL, 0, helloFunc, &tNum[i], 0, NULL ); WaitForMultipleObjects(numThreads, hThread, TRUE, INFINITE);
同步 Win32* 的互斥量(Mutexes) Win32互斥量是由句柄指向的核心对象(Kernel object) 当其可用的时候标记为“Signaled” 除线程外,还在进程之间共享 操作: CreateMutex(…) // 产生一个新的 WaitForSingleObject(…) // 等待并上锁 WaitForMultipleObjects(…) //等待多个 ReleaseMutex(…) // 开锁 There is a Win32 MUTEX kernel object. CreateMutex returns HANDLE of mutex. Use WaitForSingleObject to Lock mutex. If mutex is not locked, mutex handle will be in signaled state. When WaitFor function returns, mutex handle is in non-signaled state and thread is considered to hold the mutex. Unlock the mutext with ReleaseMutex. If needed, a mutex can be established that is shared between multiple processes.
同步 Win32* 的互斥量(Mutexes) HANDLE WINAPI CreateMutex( LPSECURITY_ATTRIBUTES lpMutexAttributes, BOOL bInitialOwner, LPCSTR lpName); // text name for object 创建或打开一个命名或无名的互斥量mutex lpMutexAttributes: 指向决定返回的句柄是否能够被子女继承的 SECURITY_ATTRIBUTES 结构的指针. bInitialOwner: 如果该值为TRUE 并且调用者创建了互斥量mutex, 调用的线程将获得互斥量mutex对象的初始所有权。 lpName: 到NULL结束字符串的指针,该字符串为mutex对象命名。 BOOL ReleaseMutex( HANDLE hMutex ); 释放指定互斥量mutex对象的所有权。 hMutex: mutex对象的句柄.
案例:mutex DWORD WINAPI ThreadFunc1(PVOID pvParam ) { HANDLE *phMutex = (HANDLE *) pvParam; for(int i=1;i<=100;i++) { WaitForSingleObject( *phMutex, INFINITE); cout<<“Thread1 output;”<<i<<endl; ReleaseMutex(*phMutex); } return 0; main() { HANDLE hMutex=CreateMutex(NULL,FALSE,”DisplayMutex”); HANDLE ThreadHandl1=CraeteThread(NULL,0,ThreadFun1,&hMutex,0,NULL); HANDLE ThreadHandl2=CraeteThread(NULL,0,ThreadFun2,&hMutex,0,NULL); HANDLE ThreadHandle[2] ={ThreadHandl1,ThreadHandl2} WaitForMultipleObjects(NUMTHREADS, hThread, TRUE, INFINITE); CloseHandle(hMutex); return 0; DWORD WINAPI ThreadFunc2(…) { HANDLE *phMutex = (HANDLE *) pvParam; … } DWORD WINAPI ThreadFunc2(PVOID pvParam ) { HANDLE *phMutex = (HANDLE *) pvParam; for(int i=1;i<=100;i++) { WaitForSingleObject( *phMutex, INFINITE); cout<<“Thread2 output;”<<i<<endl; ReleaseMutex(*phMutex); } return 0;
同步 Win32* 临界区(Critical Section) Win32临界区是轻量级的,进程内只互斥 Win32临界区是最有用和最常用的 新类型 创建和撤销操作 CRITICAL_SECTION cs ; void InitializeCriticalSection( LPCRITICAL_SECTION lpCriticalSection ); Better mutex mechanism. New data type used to declare objects. Critical Sections must be initialized before first use. “SpinCount” has been tuned for use on HT systems. (There should be more details on this later.) 初始化一个critical section对象. lpCriticalSection: 指向critical section对象的指针. void DeleteCriticalSection( LPCRITICAL_SECTION lpCriticalSection ); 释放所有的被无主critical section对象使用的资源. lpCriticalSection:指向critical section对象的指针.
同步 Win32* 临界区 进入临界区 退出临界区 当有另一个线程在临界区中时阻塞 当没有线程在临界区的时候返回 必须从获得临界区使用权的线程中退出临界区 退出临界区 void EnterCriticalSection( LPCRITICAL_SECTION lpCriticalSection ); void LeaveCriticalSection( LPCRITICAL_SECTION lpCriticalSection ); Protect critical sections (access of shared,modifiable data) of code with Enter* and LeaveCriiticalSection
同时 案例: Critical Section #define NUMTHREADS 4 CRITICAL_SECTION g_cs; // why does this have to be global? int g_sum = 0; DWORD WINAPI threadFunc(LPVOID arg ) { int mySum = bigComputation(); EnterCriticalSection(&g_cs); g_sum += mySum; // threads access one at a time LeaveCriticalSection(&g_cs); return 0; } main() { HANDLE hThread[NUMTHREADS]; InitializeCriticalSection(&g_cs); for (int i = 0; i < NUMTHREADS; i++) hThread[i] = CreateThread(NULL,0,threadFunc,NULL,0,NULL); WaitForMultipleObjects(NUMTHREADS, hThread, TRUE, INFINITE); DeleteCriticalSection(&g_cs); Question for Discussion: Can you imagine why not just put bigComputation() declared into critical section? Answer: bigComputation() is not declared inside the critical section because the thread will exclude all the other threads from running their own, independent calls to bigComputation(). This will make the code serial.
同步 数值积分案例 4.0 4.0 dx = f(x) = (1+x2) (1+x2) 1 4.0 (1+x2) f(x) = 数值积分案例 static long num_steps=100000; double step, pi; void main() { int i; double x, sum = 0.0; step = 1.0/(double) num_steps; //delta x for (i=0; i< num_steps; i++){ x = (i+0.5)*step; //每个矩形中间点 sum = sum + 4.0/(1.0 + x*x); // 计算矩形中间点的Y值的和 } pi = step * sum; //所有矩形的面积,近似曲线内图形的面积 printf(“Pi = %f\n”,pi); 4.0 2.0 Questions for Discussion: Where is the bulk of the work found in the code? Where should we focus attention if we were to thread this code? Answers: The for-loop. Now ask the participants to thread this serial code. This is a serial version of the source code. It uses a “sum” variable that could give a clue to an efficient solution, that is, local partial sum variable that is updated each loop iteration. This code is small and efficient in serial, but will challenge the students to come up with an efficient solution. Of course, efficiency is not one of the goals with such a short module. Getting the answer correct with multiple threads is enough of a goal for this. static long num_steps=100000; double step, pi; void main() { int i; double x, sum = 0.0; step = 1.0/(double) num_steps; delta x for (i=0; i< num_steps; i++){ x = (i+0.5)*step; 分别是0.5delta x, 1.5delta x …… 既每个矩形的中间点 sum = sum + 4.0/(1.0 + x*x); 4.0/(1.0 + x*x);是每个矩形中间点的Y值, sum是算所有Y值的和 } pi = step * sum; 所有y值的和乘以delta x就是所有矩形的面积,近似曲线内图形的面积 printf(“Pi = %f\n”,pi); 0.0 X 1.0
同步 案例: 计算 用Win32线程来并行化数值积分代码 如何在线程间划分循环迭代? 什么变量能够局部化? static long num_steps=1000000; double step, pi; void main() { int i; double x, sum = 0.0; step = 1.0/(double) num_steps; for (i=0; i< num_steps; i++){ x = (i+0.5)*step; sum = sum + 4.0/(1.0 + x*x); } pi = step * sum; printf("Pi = %12.9f\n",pi); 用Win32线程来并行化数值积分代码 如何在线程间划分循环迭代? 什么变量能够局部化? 什么变量必须对所有线程都是可见的? Details This is a serial version of the source code. It usse a “sum” variable that could give a clue to an efficient solution (i.e., local partial sum variable that is updated each loop iteration). This code is small and efficient in serial, but will challenge the students to come up with an efficient solution. Of course, efficiency is not one of the goals with such a short module. Getting the answer correct with multiple threads is enough of a goal for this.
同步 案例: 计算 const int gNumSteps = 100000; const int gNumThreads = 4; double gStep = 0.0; double gPi = 0.0; CRITICAL_SECTION gCS; Details This is a serial version of the source code. It usse a “sum” variable that could give a clue to an efficient solution (i.e., local partial sum variable that is updated each loop iteration). This code is small and efficient in serial, but will challenge the students to come up with an efficient solution. Of course, efficiency is not one of the goals with such a short module. Getting the answer correct with multiple threads is enough of a goal for this. 40 40
同步 案例: 计算 main() { HANDLE threadHandles[gNumThreads]; int tNum[gNumThreads]; InitializeCriticalSection(&gCS); gStep = 1.0 / gNumSteps; for ( int i=0; i<gNumThreads; ++i ) tNum[i] = i; threadHandles[i] = CreateThread( NULL, 0, threadFunction, (LPVOID)&tNum[i], 0, NULL); } WaitForMultipleObjects(gNumThreads, threadHandles, TRUE, INFINITE); DeleteCriticalSection(&gCS); printf("%12.9f\n", gPi ); } 案例: 计算 Details This is a serial version of the source code. It usse a “sum” variable that could give a clue to an efficient solution (i.e., local partial sum variable that is updated each loop iteration). This code is small and efficient in serial, but will challenge the students to come up with an efficient solution. Of course, efficiency is not one of the goals with such a short module. Getting the answer correct with multiple threads is enough of a goal for this. 41 41
同步 案例: 计算 DWORD WINAPI threadFunction(LPVOID pArg) { int myNum = *((int *)pArg); double partialSum = 0.0, x; // local to each thread for ( int i=myNum; i<gNumSteps; i+=gNumThreads ) { x = (i + 0.5f) / gNumSteps; partialSum += 4.0f / (1.0f + x*x); } EnterCriticalSection(&gCS); gPi += partialSum * gStep; LeaveCriticalSection(&gCS); return 0; 同步 案例: 计算 Details This is a serial version of the source code. It usse a “sum” variable that could give a clue to an efficient solution (i.e., local partial sum variable that is updated each loop iteration). This code is small and efficient in serial, but will challenge the students to come up with an efficient solution. Of course, efficiency is not one of the goals with such a short module. Getting the answer correct with multiple threads is enough of a goal for this. 42 42
同步 Win32* 信号量(Semaphores) 信号量是一个同步对象: 信号量上的两个操作 用来允许多个线程进入临界区. 用来记录可用资源的计数值(count). 在1968年被Edsger Dijkstra形式化. 信号量上的两个操作 Wait [P(s)]: 线程等待直到s > 0, 然后 s = s-1 Post [V(s)]: s = s + 1 当count > 0,信号量进入signaled状态 From Wikipedia (“Semaphore (programming),” revised 07 FEB 06): P and V are initialisms of Dutch words. The explanation for V is simple, it means "verhoog," or increase. Several explanations have been given for P in Dutch ("passeer" for pass, "probeer" for try, "pakken" for grab), but all these are incorrect. In fact Dijkstra intended P to stand for a made-up word "prolaag,"[1] short for "probeer te verlagen," or try-and-decrease.[2][3] This confusion stems from the unfortunate characteristic of the Dutch language that the words for increase and decrease both begin with the letter V, and the words spelled out in full would be impossibly confusing for non-Dutch speakers.
同步 创建Win32*信号量 IMaximumCount的值必须大于0 lInitialCount的值必须是: 大于等于0 不能超出值域范围 HANDLE CreateSemaphore( LPSECURITY_ATTRIBUTES lpSemaphoreAttributes, LONG lInitialCount, // 初始化信号量对象的计数值 count LONG lMaximumCount, // count的最大值 LPCTSTR lpName ); // 对象的文本名称
同步 Wait和Post操作 用WaitForSingleObject 来等待信号量 增加信号量(Post操作) 如果 count = 0, 线程等待 如果count > 0,则将count-1并返回 增加信号量(Post操作) 将信号量count + cReleaseCount 通过lpPreviousCount返回count的上一个值 BOOL ReleaseSemaphore( HANDLE hSemaphore, LONG cReleaseCount, LPLONG lpPreviousCount ); WaitForSingleObject will block the calling thread if the semaphore count is equal to zero. Once another thread call ReleaseSemaphore to increment the count above zero, the WaitForSingleObject will decrement the count by 1 and return, allowing the calling thread to proceed. If more than one thread is waiting on the same semaphore, the number of thread that can resume execution will be the lesser of the number of threads waiting and the value of the semaphore count. IF the count is already greater than zero when called by a thread, the count is decremented and the thread does not block. ReleaseSemaphore is able to increment the count by a value other than 1. If the count of the semaphore would be greater than the initialized maximum, the call will fail and return FALSE; no adjustment to the count will be made. Since other threads may affect the count of the semaphore, use the lpPreviousCount return value with caution.
同步 信号量的使用 控制规模有限的数据结构的访问 控制有限资源的访问 在一定范围内限制活动线程的数目 队列、栈、双端队列 用count来计算可用的元素 控制有限资源的访问 文件描述符、磁带驱动器等… 在一定范围内限制活动线程的数目 二值信号量[0,1]能作为互斥量mutex Implementing a queue with an array will limit the number of elements that can reside in the queue at any time. Initialize the semaphore with the maximum queue length. Use WAIT before placing something new on the queue. If the queue is full, this will block the thread attempting to place an item in the queue. Use POST when removing something off the queue; which will decrement the count and allow a waiting thread to proceed with placing an item in the queue. WHAT ABOUT EMPTY QUEUE? May use a semaphore to also keep track of the number of items in a queue. Thus, to enter something on queue, a thread must WAIT on full semaphore, deposit item, POST to empty queue semaphore. To remove an item, a thread must WAIT on empty queue semaphore, remove item, POST to full semaphore. IF performance will be affected by too many threads being active (perhaps on HT system?), a semaphore can be used to limit the number of threads executing within a given region of code. Binary semaphore can replace mutex and CRITICAL_SECTION, but there are potential problems that are pointed out on next slide.
同步 使用信号量注意: 信号量没有所有权 任何线程都可以释放信号量,而不仅是等待中最后的线程。 没有无约束的信号量的概念 用好的编程习惯来避免这种情况 没有无约束的信号量的概念 如果线程在post之前终止,信号量的增加运算可能会丢失。 死锁 分布式共享结构 Details Like a lock object, semaphores should be programmed with matching Wait and Post operations, by the same thread. This is good programming practice. However, this practice can lead to problems if a thread terminates before the Post operation, since the count will be off. Deadlock or lack of performance may result. Background If a thread terminates while holding a Mutex, the Mutex can be locked by another thread. The return code from the WaitFor* call will indicate that the Mutex was abandoned. Even though the Semaphore is also a kernel object, there is no facility to recognize an abandoned Semaphore.
同步 案例1: 信号量当互斥量 主线程打开输入文件,等线程终止。 线程将 从输入文件读一行 计算在一行中所有的5个字符的单词
同步 案例1: 主程序 HANDLE hSem1, hSem2; FILE *fd; int fiveLetterCount = 0; main() { HANDLE hThread[NUMTHREAD]; hSem1 = CreateSemaphore(NULL,1,1,NULL); // Binary semphore hSem2 = CreateSemaphore(NULL,1,1,NULL); // Binary semphore fd = fopen(“InFile”, “r”); // open file for read for (int i = 0; i < NUMTHREAD; i++) hThread[i] = CreateThread(NULL, 0,CountFives,NULL,0,NULL); WaitForMultipleObjects(NUMTHREAD, hThread, TRUE, INFINITE); fclose(fd); printf(“Numbers of five letter words is %d\n”, fiveLetterCount); }
同步 案例1: 信号量 DWORD WINAPI CountFives(LPVOID arg) { BOOL bDone = FALSE; char inLine[132]; int lCount = 0; while (!bDone) { WaitForSingleObject(hSem1, INFINITE); // access to input bDone = (GetNextLine(fd, inLine) == EOF); ReleaseSemaphore(hSem1, 1, NULL); if (!bDone) if (lCount = GetFiveLetterWordCount(inLine)) { WaitForSingleObject(hSem2, INFINITE); // update global fiveLetterCount += lCount; ReleaseSemaphore(hSem2, 1, NULL); }
同步 案例2: 信号量当互斥量 主线程打开输入文件,等线程终止。 线程将 从输入文件读一行 计算在一行中所有的单词个数,奇数字母单词个数,偶数字母单词个数
同步 案例2: 变量定义与读一行 FILE *fd; int TotalEvenWords = 0; Int TotalOddWords = 0, TotalWords = 0; const int NUMTHREADS = 4; HANDLE hSem1, hSem2; int GetNextLine(FILE *f, char *Line) { if (fgets(Line, 132, f)==NULL) if (feof(f))return EOF; else return 1; }
同步 案例2: 主程序 main() { HANDLE hThread[NUMTHREADS]; hSem1 = CreateSemaphore(NULL, 1, 1, NULL); // Binary semaphore hSem2 = CreateSemaphore(NULL, 1, 1, NULL); // Binary semaphore fd = fopen("InFile1.txt", "r"); // Open file for read for (int i = 0; i < NUMTHREADS; i++) hThread[i] = CreateThread(NULL,0,CountWords,NULL,0,NULL); WaitForMultipleObjects(NUMTHREADS, hThread, TRUE, INFINITE); fclose(fd); printf("Total Words = %8d\n\n", TotalWords); printf("Total Even Words = %7d\nTotal Odd Words = %7d\n", TotalEvenWords, TotalOddWords); } 53 53
同步 案例2: 信号量 DWORD WINAPI CountWords(LPVOID arg) { BOOL bDone = FALSE ; char inLine[132]; int lCount = 0; while (!bDone) { WaitForSingleObject(hSem1, INFINITE); // access to input bDone = (GetNextLine(fd, inLine) == EOF); ReleaseSemaphore(hSem1, 1, NULL); if (!bDone){ lCount = GetWordAndLetterCount(inLine) ; WaitForSingleObject(hSem2, INFINITE); // update global TotalOddWords += (lCount % 100); lCount /= 100; TotalEvenWords += (lCount % 100); lCount /= 100; TotalWords += lCount; ReleaseSemaphore(hSem2, 1, NULL); } } return 0;
同步 int GetWordAndLetterCount(char *Line) { int Word_Count = 0, Letter_Count = 0; int lTotalEvenWords = 0, lTotalOddWords = 0; for (int i=0;i<132;i++) { if ((Line[i]!=' ')&&(Line[i]!=0)&&(Line[i]!='\n')) Letter_Count++; else { if (Letter_Count % 2) { lTotalOddWords++; Word_Count++; Letter_Count = 0; } lTotalEvenWords++; Word_Count++; Letter_Count = 0; if (Line[i]==0) break; return (Word_Count*10000 + lTotalEvenWords*100 + lTotalOddWords); // encode three return values 55
同步 Win32*事件(Events) 用来发信号给其他线程说明有些事件发生: 线程用WaitFor*函数等待信号 两类事件 数据可用,消息准备好,计算结束等 线程用WaitFor*函数等待信号 两类事件 自动重置 这类事件得到通知时,等待该事件的所有线程均变为可调度线程。 人工重置 这类事件得到通知时,等待该事件的线程中只有一个线程变为可调度线程。 An event can be used to notify one or more threads about an event. Threads can wait for a single event using WaitForSingleObject or what for many events (or one of many events) with WaitForMultipleObjects. Types of Events are described in next slide
同步 如果bManualReset被设置为: 如果bInitialState被设置为: TRUE: 人工重设事件 FALSE: 自动重设事件 HANDLE CreateEvent( LPSECURITY_ATTRIBUTES lpEventAttributes, BOOL bManualReset BOOL bInitialState, LPCTSTR lpName ); 如果bManualReset被设置为: TRUE: 人工重设事件 FALSE: 自动重设事件 如果bInitialState被设置为: TRUE: 事件开始于signaled状态 FALSE: 事件开始于unsignaled状态
同步 事件的设置与复位 设置一个事件到signaled状态 复位人工状态事件 脉冲事件 BOOL SetEvent( HANDLE hEvent ); BOOL ResetEvent( HANDLE hEvent ); SetEvent为设置事件对象为有信号状态;而PulseEvent也是将指定的事件设为有信号状态,不同的是如果是一个人工重设事件,正在等候事件的、被挂起的所有线程都会进入活动状态,函数随后将事件设回,并返回;如果是一个自动重设事件,则正在等候事件的、被挂起的单个线程会进入活动状态,事件随后设回无信号,并且函数返回。 也就是说在自动重置模式下PulseEvent和SetEvent的作用没有什么区别,但在手动模式下PulseEvent就有明显的不同,可以比较容易的控制程序是单步走,还是连续走。如果让循环按要求执行一次就用PulseEvent,如果想让循环连续不停的运转就用SetEvent,在要求停止的地方发个ResetEvent就OK了。 BOOL PulseEvent( HANDLE hEvent );
同步 使用事件 考虑下面一个线程代码的例子:使用事件查找一个线程 HANDLE hObj [2]; // 0 is event, 1 is thread DWORD WINAPI threadFunc(LPVOID arg) { BOOL bFound = bigFind(); if (bFound) SetEvent(hObj[0]); // signal data was found bigFound(); } moreBigStuff(); return 0; In this example, the main thread will create a thread to search for some item. If the thread finds the item, it sends a signal to the main thread. The main thread will perform some other computation and then check on the progress of the searching thread. The main thread will print a message out if the item is found and will print another message upon termination of the searching thread. When the function is executed, it performs a search by using the bigFind() function. If the search is successful, the event is signaled and the found item is processed. If the item is not found, the event is not signaled and a different type of processing is done before the thread terminates.
同步 主函数 考虑主程序的前一半: hObj[0] = CreateEvent(NULL, FALSE, FALSE, NULL); . . . hObj[0] = CreateEvent(NULL, FALSE, FALSE, NULL); hObj[1] = CreateThread(NULL,0,threadFunc,NULL,0,NULL); /* Do some other work while thread executes search */ DWORD waitRet = WaitForMultipleObjects(2, hObj, FALSE, INFINITE); switch(waitRet) { case WAIT_OBJECT_0: // event signaled printf ("found it!\n"); WaitForSingleObject(hObj[1], INFINITE) ; // fall through case WAIT_OBJECT_0+1: // thread signaled printf ("thread done\n"); break ; default: printf (“wait error: ret %u\n", waitRet); } examine a portion of the main() routine. Notice that the event is created and the thread is launched. Each HANDLE is assigned into separate elements of the hObj array. After the searching thread is started, the main thread might continue with some other execution. Eventually the main thread needs to determine whether the searching thread has been successful in its search. Within the call to the WaitForMultipleObjects() function, the third parameter, the WaitForAllObjects function has the value FALSE. This will cause the main thread to wait for any of the two HANDLES to become signaled. The lowest indexed HANDLE found to be signaled will trigger the waiting thread to be released. The return code will indicate which HANDLE was found in the signaled state.
同步 主函数(继续) 考虑主程序的后半部分: switch(waitRet) { . . . hObj[0] = CreateEvent(NULL, FALSE, FALSE, NULL); hObj[1] = CreateThread(NULL,0,threadFunc,NULL,0,NULL); /* Do some other work while thread executes search */ DWORD waitRet = WaitForMultipleObjects(2, hObj, FALSE, INFINITE); switch(waitRet) { case WAIT_OBJECT_0: // event signaled printf ("found it!\n"); WaitForSingleObject(hObj[1], INFINITE) ; // fall through case WAIT_OBJECT_0+1: // thread signaled printf ("thread done\n"); break ; default: printf (“wait error: ret %u\n", waitRet); } The switch statement interprets the return code from the WaitForMultipleObjects call. If the item was found, WAIT_OBJECT_0 will be returned as the lowest indexed HANDLE that was signaled. After printing the message, the code waits on thread termination by using the WaitForSingleObject() function and falls through to the print for that event. If the item was not found, the thread termination will be the HANDLE that releases the main thread and only the thread termination message is printed. Questions for Discussion: What happens if the item was found and the search thread has terminated before the main thread calls the WaitForMultipleObjects() function? What changes will the code logic need if the order of the event and thread HANDLE were reversed in the hObj array?