消息传递并行编程环境 王彦棡 2010年5月
MPI 123456789123456789123456789…… 111111111 22222222 33333333 44444444 55555555 …… 串行 并行
MPI OPENMP MPI
MPI MPI是一种实现手段
进程 单个进程(process) 进程与程序相联,程序一旦在操作系统中运行即成为进程。进程拥有独立的执行环境(内存、寄存器、程序计数器等),是操作系统中独立存在的可执行的基本程序单位 串行应用程序编译形成的可执行代码,分为“指令”和“数据”两个部分,并在程序执行时“独立地申请和占有”内存空间,且所有计算均局限于该内存空间。 进程1 进程2 内存
进程 单机内多个进程 多个进程可同时存在于单机内同一操作系统。操作系统负责调度分时共享处理机资源(CPU、内存、存储、外设等) 进程间相互独立(内存空间不相交)。在操作系统调度下各自独立地运行,例如多个串行应用程序在同一台计算机运行 进程间可以相互交换信息。例如数据交换、同步等待,消息是这些交换信息的基本单位,消息传递是指这些信息在进程间的相互交换,是实现进程间通信的唯一方式
包含于通过网络联接的不同处理器的多个进程 进程独立存在,并位于不同的处理器,由各自独立的操作系统调度,享有独立的CPU和内存资源 进程间相互信息交换,可依靠消息传递 最基本的消息传递操作包括发送消息send、接受消息receive、进程同步barrier、归约reduction等
线程 将一个进程分解成两个部分 : 一部分由其资源特征构成,仍称之为进程 一部分由其执行特征构成,称之为线程 进程可由单个线程来执行 进程也可由多个线程来并行执行 多个线程将共享该进程的所有资源特征
线程 单进程多线程执行示意图
OpenMP OpenMP应用编程接口API是在共享存储体系结构上的一个编程模型 包含编译制导(Compiler Directive)、运行库例程(Runtime Library)和环境变量(Environment Variables) 支持增量并行化(Incremental Parallelization)
OpenMP 什么是OpenMP OpenMP不包含的性质 应用编程接口API(Application Programming Interface ) 由三个基本API部分(编译指令、运行部分和环境变量)构成 是C/C++ 和Fortan等的应用编程接口 已经被大多数计算机硬件和软件厂家所标准化 OpenMP不包含的性质 不是建立在分布式存储系统上的 不是在所有的环境下都是一样的 不是能保证让多数共享存储器均能有效的利用
基于线程的并行编程模型(Programming Model) OpenMP使用Fork-Join并行执行模型
OpenMP int main(int argc, char* argv[]) { #pragma omp parallel for for (int i = 0; i < 10; i++ ) { printf("i = %d\n", i); } return 0; }
什么是MPI (Message Passing Interface) 是函数库规范,而不是并行语言;操作如同库函数调用 是一种标准和规范,而非某个对它的具体实现(MPICH等),与编程语言无关 是一种消息传递编程模型,并成为这类编程模型的代表 What is the message? DATA+ENVELOPE MPI的目标 较高的通信性能 较好的程序可移植性 强大的功能
消息传递平台MPI MPI程序编译与运行 程序编译 C: %mpicc -o mpiprog mpisrc.c Fortran 77: %mpif77 -o mpiprog mpisrc.f 程序运行 %mpirun -np 4 mpiprog 程序执行过程中不能动态改变进程的个数 申请的进程数np与实际处理器个数无关
例子 头文件 相关变量声明 程序开始 程序体计算与通信 程序结束 #include "mpi.h" int main(int argc, char ** argv) { int myid, numprocs; int namelen; char processor_name[MPI_MAX_PROCESSOR_NAME]; MPI_Init(&argc,&argv); MPI_Comm_rank(MPI_COMM_WORLD,&myid); MPI_Comm_size(MPI_COMM_WORLD,&numprocs); MPI_Get_processor_name(processor_name,&namelen); fprintf(stderr,"Hello World! Process %d of %d on %s\n", myid, numprocs, processor_name); MPI_Finalize(); } 头文件 相关变量声明 程序开始 程序体计算与通信 程序结束
MPI基础知识 MPI重要概念 MPI函数一般形式 MPI原始数据类型 MPI程序基本结构 MPI几个基本函数 并行编程模式
MPI重要概念 进程组(process group) 通信器(communicator) 指MPI 程序的全部进程集合的一个有序子集且进程组中每个进程被赋于一个在该组中唯一的序号(rank), 用于在该组中标识该进程。序号的取值范围是[0,进程数- 1] 通信器(communicator) 理解为一类进程的集合即一个进程组,且在该进程组,进程间可以相互通信 任何MPI通信函数均必须在某个通信器内发生 MPI系统提供省缺的通信器MPI_COMM_WORLD 组内通信器和组间通信器
MPI重要概念 进程序号(rank) 消息(message) MPI 程序中的进程由进程组或通信器序号唯一确定, 序号相对于进程组或通信器而言(假设np个处理器,标号0…np-1) 同一个进程在不同的进程组或通信器中可以有不同的序号,进程的序号是在进程组或通信器被创建时赋予的 MPI 系统提供了一个特殊的进程序号MPI_PROC_NULL,它代表空进程(不存在的进程), 与MPI_PROC_NULL 间的通信实际上没有任何作用 消息(message) 分为数据(data)和包装(envelope)两个部分 包装由接收进程序号/发送进程序号、消息标号和通信器三部分组成;数据包含用户将要传递的内容
MPI函数一般形式 C: error = MPI_Xxxxx(parameter,...); MPI_Xxxxx(parameter,...); 整型错误码由函数值返回 除MPI_Wtime() 和MPI_Wtick()外, 所有MPI 的C 函数均返回一个整型错误码。成功时返回MPI_SUCCESS,其他错误代码依赖于执行 Fortran: CALL MPI_XXXXX(parameter,...,IERROR) 整型错误码由函数的参数返回 除MPI_WTIME() 和MPI_WTICK()外为子函数程序(function), Fortran77的所有MPI过程都是Fortran77的子例行程序(subroutine)
MPI原始数据类型 MPI_BYTE 一个字节 MPI_PACKED 打包数据
MPI原始数据类型
MPI几个基本函数 MPI_Init MPI_Initialized MPI_Comm_size MPI_Comm_rank MPI_Finalize MPI_Abort MPI_Get_processor_name MPI_Get_version MPI_Wtime
MPI几个基本函数 初始化 MPI 系统 C: int MPI_Init(int *argc, char *argv[]) Fortran 77: MPI_INIT(IERROR) INTEGER IERROR 通常为第一个调用的MPI函数,除 MPI_Initialized 外 在C接口中,MPI系统通过argc和argv得到命令行参数,并 且会把MPI系统专用的参数删除,留下用户的解释参数
获取通信器的进程数和进程在通信器中的标号 MPI几个基本函数 获取通信器的进程数和进程在通信器中的标号 C: int MPI_Comm_size(MPI_Comm comm, int *size) int MPI_Comm_rank(MPI_Comm comm, int *rank) Fortran 77: MPI_COMM_SIZE(COMM, SIZE, IERROR) MPI_COMM_RANK(COMM, RANK, IERROR) INTEGER COMM, SIZE, RANK, IERROR
MPI几个基本函数 退出 MPI 系统 C: int MPI_Finalize(void) Fortran 77: MPI_FINALIZE(IERROR) 每个进程都必须调用,使用后不准许调用任何MPI函数 若不执行MPI退出函数,进程可能被悬挂 用户在调用该函数前,应确保非阻塞通讯结束
MPI几个基本函数 异常终止MPI程序 C: int MPI_Abort(MPI_Comm comm, int errorcode) Fortran 77: MPI_ABORT(COMM, ERRORCODE, IERROR) INTEGER COMM, ERRORCODE, IERROR 在出现了致命错误而希望异常终止MPI程序时执行 MPI系统会设法终止comm通信器中所有进程 输入整型参数errorcode,将被作为进程的退出码返回给系统
MPI几个基本函数 获取处理器的名称 C: MPI_Get_processor_name(char *name, int *resultlen) Fortran 77: MPI_GET_PROCESSOR_NAME(NAME, RESULTLEN, IERR) 在返回的name中存储所在处理器的名称 resultlen存放返回名字所占字节 应提供参数name不少于MPI_MAX_PRCESSOR_NAME 个字节的存储空间
MPI几个基本函数 获取墙上时间 C: double MPI_Wtime(void) Fortran 77: DOUBLE PRECISION MPI_WTIME() 返回调用时刻的墙上时间,用浮点数表示秒数 经常用来计算程序运行时间
Sample :Hello World - C C+MPI #include “mpi.h” #include <stdio.h> #include <math.h> void main(int argc, char *argv[ ]) { int myid, numprocs, namelen; char processor_name[MPI_MAX_PROCESSOR_NAME]; MPI_Init(&argc,&argv); MPI_Comm_rank(MPI_COMM_WORLD,&myid); MPI_Comm_size(MPI_COMM_WORLD,&numprocs); MPI_Get_processor_name(processor_name,&namelen); printf("Hello World! Process %d of %d on %s\n",myid, numprocs, processor_name); MPI_Finalize(); }
Sample :Hello World - Fortran Fortran+MPI program main include 'mpif.h‘ character * (MPI_MAX_PROCESSOR_NAME)processor_name integer myid, numprocs, namelen, rc, ierr call MPI_INIT( ierr ) call MPI_COMM_RANK( MPI_COMM_WORLD, myid, ierr ) call MPI_COMM_SIZE( MPI_COMM_WORLD, numprocs, ierr ) call MPI_GET_PROCESSOR_NAME(processor_name, namelen, ierr) write(*,*) 'Hello World! Process ',myid,' of ',numprocs,' on ', processor_name call MPI_FINALIZE(ierr) end
Sample :Hello World 单处理器(tp5)运行4个进程 4个处理器(tp1,tp2,tp3,tp4)分别运行4个进程
Sample :Hello World
点对点通信
MPI系统的通信方式都建立在点对点通信之上 定义 1 2 3 4 5 destination source communicator 两个进程之间的通信 源进程发送消息到目标进程 目标进程接受消息 通信发生在同一个通信器内 进程通过其在通信器内的标号表示 MPI系统的通信方式都建立在点对点通信之上
阻塞式点对点通信 阻塞式消息发送 C: int MPI_Send(void *buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm) Fortran 77: MPI_SEND(BUF, COUNT, DATATYPE, DEST, TAG, COMM, IERROR) count 不是字节数,而是指定数据类型的个数 datatype可是原始数据类型,或为用户自定义类型 dest 取值范围是 0~np-1,或MPI_PROC_NULL (np是comm中的进程总数) tag 取值范围是 0~MPI_TAG_UB,用来区分消息
阻塞式点对点通信 阻塞式消息接收 C: int MPI_Recv(void *buf, int count, MPI_Datatype datatype, int source, int tag, MPI_Comm comm, MPI_Status *status) Fortran 77: MPI_RECV(BUF, COUNT, DATATYPE, SOURCE, TAG, COMM, STATUS, IERROR) count是接受缓存区的大小,表示接受上界,具体接 受长度可用MPI_Get_count 获得 source 取值范围是 0~np-1,或MPI_PROC_NULL和 MPI_ANY_SOURCE tag 取值范围是 0~MPI_TAG_UB,或MPI_ANY_TAG
阻塞式点对点通信 消息(message)
阻塞式点对点通信 标准阻塞式通信 是否对发送数据进行缓存,由MPI系统决定,而非程序员 阻塞:发送成功,意味(1)消息成功发送;(2)或者消息被缓存 接收成功,意味消息已被成功接收
阻塞式点对点通信 消息传递成功 发送进程需指定一个有效的目标接收进程 接收进程需指定一个有效的源发送进程 接收和发送消息的进程要在同一个通信器内 接收和发送消息的 tag 要相同 接收缓存区要足够大
例子
其他点对点通信 捆绑发送和接收 捆绑发送和接收,收发使用同一缓存区 缓存消息发送函数 同步消息发送函数 就绪消息发送函数 MPI_Bsend MPI_SENDRECV(SENDBUFF,SENDCOUNT,SENDTYPE,DEST, SENDTAG, RECVBUFF,RECVCOUNT,RECVTYPE,SOURCE,RECVTAG, COMM, STATUS, IERR) 捆绑发送和接收,收发使用同一缓存区 MPI_SENDRECV_REPLACE(BUFF,COUNT,DATATYPE,DEST,SENDTAG, SOURCE,RECVTAG,COMM,STATUS,IERR) 缓存消息发送函数 同步消息发送函数 就绪消息发送函数 MPI_Bsend MPI_Ssend MPI_Rsend
非阻塞式点对点通信 阻塞式通信与非阻塞式通信 通信类型 函数返回 对数据区操作 特性 阻塞式通信 非阻塞式通信 阻塞型函数需要等待 指定操作完成返回 或所涉及操作的数据 要被MPI系统缓存安全 备份后返回 函数返回后,对数 据区操作是安全的 程序设计相对 简单 使用不当容易 造成死锁 非阻塞式通信 调用后立刻返回,实 际操作在MPI后台执行 需调用函数等待或查 询操作的完成情况 函数返回后,即操 作数据区不安全。 可能与后台正进行 的操作冲突 可以实现计算 与通信的重叠 程序设计相对 复杂
非阻塞式点对点通信
阻塞型与非阻塞型通信函数
聚合通信
定义 communicator 5 2 1 3 4 一个通信器的所有进程参与,所有进程都调用聚合通信函数 1 2 3 4 5 communicator 一个通信器的所有进程参与,所有进程都调用聚合通信函数 MPI系统保证聚合通信函数与点对点调用不会混淆 聚合通信不需要消息标号 聚合通信函数都为阻塞式函数 聚合通信的功能:通信、同步、计算等
三种通信方式 一对多 多对一 多对多
聚合函数列表 MPI_Barrier MPI_Bcast MPI_Gather/MPI_Gatherv MPI_Allgather/MPI_Allgatherv MPI_Scatter/MPI_Scatterv MPI_Alltoall/MPI_Alltoallv MPI_Reduce/MPI_Allreduce/MPI_Reduce_scatter MPI_Scan
该函数用于进程同步,即一个进程调用该函数后需等待通信器内所有进程调用该函数后返回
√ Sample - Fortran …… CALL MPI_COMM_RANK(COMM,RANK,IERR) IF(RANK.EQ.0) THEN CALL WORK0(……) ELSE CALL WORK1(……) CALL MPI_BARRIER(COMM,IERR) CALL WORK2(……) …… CALL MPI_COMM_RANK(COMM,R ANK,IERR) IF(RANK.EQ.0) THEN CALL WORK0(……) CALL MPI_BARRIER(COMM,IERR) ELSE CALL WORK1(……) CALL WORK2(……) √
广播
广播
广播 通信器中root进程将自己buffer内的数据发给通信器内所有进程 非root进程用自己的buffer接收数据
Sample - C #include<mpi.h> int main (int argc, char *argv[]) { int rank; double param; MPI_Init(&argc, &argv); MPI_Comm_rank(MPI_COMM_WORLD,&rank); if(rank==5) param=23.0; MPI_Bcast(¶m,1,MPI_DOUBLE,5,MPI_COMM_WORLD); printf("P:%d after broadcast parameter is %f\n",rank,param); MPI_Finalize(); } Program Output P:0 after broadcast parameter is 23.000000 P:6 after broadcast parameter is 23.000000 P:5 after broadcast parameter is 23.000000 P:2 after broadcast parameter is 23.000000 P:3 after broadcast parameter is 23.000000 P:7 after broadcast parameter is 23.000000 P:1 after broadcast parameter is 23.000000 P:4 after broadcast parameter is 23.000000
收集&散发
收集 (MPI_Gather) ROOT
收集(MPI_Gather) 所有进程(包括根进程)将sendbuf的数据传输给根进程;根进程按着进程号顺序依次接收到recvbuf 发送与接收的数据类型相同;sendcount和recvcount相同 非根进程接收消息缓冲区被忽略,但需要提供
散发(MPI_Scatter)
散发(MPI_Scatter) 根进程有np个数据块,每块包含sendcount个类型为sendtype的数据;根进程将这些数据块按着进程号顺序依次散发到各个进程(包含根进程)的recvbuf 发送与接收的数据类型相同;sendcount和recvcount相同 非根进程发送消息缓冲区被忽略,但需要提供
Sample - C聚合通信 根进程向所有进程次序分发1个数组元素 #include <mpi.h> int main (int argc, char *argv[]) { int rank,size,i,j; double param[400],mine; int sndcnt,revcnt; MPI_Init(&argc, &argv); MPI_Comm_rank(MPI_COMM_WORLD,&rank); MPI_Comm_size(MPI_COMM_WORLD,&size); revcnt=1; if(rank==3) for(i=0;i<size;i++) param[i]=23.0+i; sndcnt=1; } MPI_Scatter(param,sndcnt,MPI_DOUBLE,&mine,revcnt,MPI_DOUBLE,3,MPI_COMM_ WORLD); printf("P:%d mine is %f\n",rank,mine); MPI_Finalize(); 根进程向所有进程次序分发1个数组元素 Program Output P:0 mine is 23.000000 P:1 mine is 24.000000 P:2 mine is 25.000000 P:3 mine is 26.000000
归约
归约(MPI_Reduce) 各进程提供数据(sendbuf,count,datatype) 归约结果存放在root进程的缓冲区recvbuf
归约
归约
归约
Sample - C (1.000000,0) (2.000000,1)…(16.000000,15) 数对的归约操作 #include <mpi.h> /* Run with 16 processes */ int main (int argc, char *argv[]) { int rank, root=7; struct double value; int rank; } in, out; MPI_Init(&argc, &argv); MPI_Comm_rank(MPI_COMM_WORLD,&rank); in.value=rank+1; in.rank=rank; MPI_Reduce(&in,&out,1,MPI_DOUBLE_INT,MPI_MAXLOC,root,MPI_COMM_WORLD); if(rank==root) printf("P :%d max=%lf at rank %d\n",rank,out.value,out.rank); MPI_Reduce(&in,&out,1,MPI_DOUBLE_INT,MPI_MINLOC,root,MPI_COMM_WORLD); if(rank==root) printf("P :%d min=%lf at rank %d\n",rank,out.value,out.rank); MPI_Finalize(); } 数对的归约操作 (1.000000,0) (2.000000,1)…(16.000000,15) Program Output P:7 max = 16.000000 at rank 15 P:7 min = 1.000000 at rank 0
Sample – Fortran C Run with 8 processes Program Output PROGRAM MaxMin C Run with 8 processes INCLUDE 'mpif.h' INTEGER err, rank, size integer in(2),out(2) CALL MPI_INIT(err) CALL MPI_COMM_RANK(MPI_WORLD_COMM,rank,err) CALL MPI_COMM_SIZE(MPI_WORLD_COMM,size,err) in(1)=rank+1 in(2)=rank call MPI_REDUCE(in,out,1,MPI_2INTEGER,MPI_MAXLOC, 7,MPI_COMM_WORLD,err) if(rank.eq.7) print *,"P:",rank," max=",out(1)," at rank ",out(2) call MPI_REDUCE(in,out,1,MPI_2INTEGER,MPI_MINLOC, 2,MPI_COMM_WORLD,err) if(rank.eq.2) print *,"P:",rank," min=",out(1)," at rank ",out(2) CALL MPI_FINALIZE(err) END Program Output P:2 min=1 at rank 0 P:7 max=8 at rank 7