并行计算 Parallel Computing 方匡南 厦门大学教授 博士生导师
R是一个统计计算语言,虽然功能非常强大,但是有一个比较严重的缺陷就是计算速度比较慢(这其实是所有高级语言面临的共同问题),尤其当要处理大数据或者需要计算很多循环的时候,R在计算速度上的缺陷就更为明显。
目录 Contents 一、提高R语言的计算速度 二、R语言的并行计算 三、HPC多线程并行计算
Improve computing speed of R 1 提高R语言计算速度 Improve computing speed of R
改进R语言运算速度的方法 最简单的方法就是尽量少使用循环,能用矩阵运输尽量用矩阵来运算(这也是我们在初级教程《R数据分析-方法与案例详解》一书中推荐的)。该方法的好处是相对简单,不需要学习其他语言,但不是所有程序都可以完全简化到矩阵运算,还会使程序的可读性降低。 用C或者Fortran编写核心的运算,然后再用R调用编译这些函数。这种方法是目前运算效率最高的方法,也是很多统计编程大牛常用的方法,比如Stanford统计系那几个大牛(Hastie、Tibshirani、Friedman等)编写的packages就是用的这种方法。但并不是所有人都懂C或者Fortran。而且前期投入成本较大。 利用compiler package提高运算速度。这是一种折中的方法,即将属于解释性语言的R代码提前编译为低级语言的字节代码(byte code)来提升R的运算速度,简单地讲就是将R的程式码编译后再执行,可提高执行的速度。该方法的好处是简单方便,可以很大程度上提升计算速度,
我们这里重点介绍一下compiler 包的运用。compiler包的作者是University of Iowa的Luke Tierney,下面是在R语言中使用compiler包的一个例子,操作环境要求R 2.13以上的版本。 > library ( compiler ) > f <- function ( x ) { + s <- 0 + for ( y in x ) s <- s + y + return ( s ) + } > fc <- cmpfun ( f ) > x <- 1 : 10000000 > system.time ( f ( x ) ) 用户系统流逝 4.757 0.018 4.773 > system.time ( fc ( x ) ) 1.287 0.001 1.288
Parallel Computing of R 2 R语言的并行计算 Parallel Computing of R
上一节介绍了几种常用的提高R计算速度的方法,它们在具体使用时都会存在一些 局限性。其实,还有一种强大的方法用来提高R的计算速度,那就是并行计算。 所谓并行计算,是指同时使用多种计算资源解决计算问题的过程,是提高计算机系 统计算速度和处理能力的一种有效手段。 它的基本思想是用多个处理器来协同求解同一问题,即将被求解的问题分解成若干 个部分,各部分均由一个独立的处理机来并行计算。 并行计算又分为数据并行和任务并行两种,R的并行计算主要通过它内置的任务并 行包来实现,包括parallel包和foreach包,接下来我们会分别介绍这两个包。
并行计算的基本思路 建立M个工作节点,对每个进程做基本的初始化 将所需数据传输到每个工作节点 将任务分割成M份传输到各个工作节点进行计算 重复上述过程进行进一步的计算 关闭工作节点
(一)parallel包 在进行并行计算前,首先需要了解计算机或者服务器的性能。 R提供了detectCores()来查看计算机的逻辑核心数,detectCores ( logical = FALSE ) 来查看计算机的物理核心数。逻辑核心一般会大于其物理核心, 是采用多线程方式对物理核心处理器的性能提升。 当然,该函数只是显示的计算机的可用核心,并不代表执行当前任务时可 用核心数。 parallel包的思路和lapply()函数类似,都是将输入数据分割成几份,然后计 算并把结果整合在一起,只不过,parallel并行计算是用不同的CPU来运算
Parallel包并行计算步骤 设置需要调用的工作节点,如调用两个节点进行并行计算: cl <- makeCluster ( getOption ( "cl.cores" , 2 ) 采用不同的函数进行并行计算 关闭开辟的节点:stopCluster ( cl ) 接下来我们使用4个例子对上述步骤2进行说明,在这之前先运行library ( parallel ) 加载parallel包。
例:16. 1 (1)clusterExport ( cl = NULL , varlist , envir = 例:16.1 (1)clusterExport ( cl = NULL , varlist , envir = .GlobalEnv ),将全局变量传输到各个工作节点上;clusterCall ( cl = NULL , fun , ... ),在各个工作节点上,执行函数fun > cl <- makeCluster ( getOption ( "cl.cores" , 2 ) ) > xx <- 1 : 2 > yy <- 3 : 4 > clusterExport ( cl , c ( "xx" , "yy" ) ) > clusterCall ( cl , function ( z ) xx + yy + sin(z) , pi ) > stopCluster ( cl ) [[1]] [1] 4 6 [[2]]
例:16.1 (2)clusterEvalQ ( cl = NULL , expr ),这个函数与clusterCall功能相似,区别在于clusterCall调用的是函数,而clusterEvalQ调用的是表达。 > clusterEvalQ ( cl , { + # set up each worker. Could also use clusterExport ( ) + library ( boot ) + cd4.rg <- function ( data , mle ) MASS :: mvrnorm ( nrow ( data ) , mle$m , mle$v ) + cd4.mle <- list ( m = colMeans ( cd4 ) , v = var ( cd4 ) ) + } )
例:16. 1 (3)clusterApply ( cl = NULL , x , fun , 例:16.1 (3)clusterApply ( cl = NULL , x , fun , ... ),这个函数表示将向量x的每一个值带入函数fun 进行计算。例如clusterApply ( cl , 1 : 2 , get ( "+" ) , 3 ),cl为集群数,1:2为输入变量值,"+" 为即将进行的计算,3为 "+" 函数的另一参数 > fxy <- function ( x , y ) { + x ^ 2 + sin ( y ) – 1 + } > Z <- clusterApply ( cl , 1 : 10 , fxy , y = 2 ) > z <- unlist ( Z ) > z [1] 0.9092974 3.9092974 8.9092974 15.9092974 24.9092974 35.9092974 48.9092974 [8] 63.9092974 80.9092974 99.9092974
例:16.1 (4) > M <- matrix ( rnorm ( 50000000 ) , 100 , 500000 ) > Mysort <- function ( x ) { + return ( sort ( x ) [ 1 : 10 ] ) + } > do_apply <- function ( M ) { + return ( apply ( M , 2 , mysort ) ) > do_parallel <- function ( M , ncl ) { + cl <- makeCluster ( getOption ( "cl.cores" , ncl ) ) + ans <- parApply ( cl , M , 2 , mysort ) + stopCluster ( cl ) + return ( ans )
例:16.1 (4) > system.time ( ans <- do_apply ( M ) ) 用户系统流逝 70.92 0.28 71.71 > system.time ( ans2 <- do_parallel ( M , 2 ) ) 10.22 5.89 43.43
例:16.1 (5) > set.seed ( 123 ) > clusterCall ( cl , function ( ) rnorm ( 1 ) ) [[1]] [1] 0.4499589 [[2]] [1] -1.50794 [1] 1.010458 [1] 0.9822696
例:16.1 (5) > clusterSetRNGStream ( cl , iseed = 123 ) # 给集群内的随机数的生成设置随机种子 > clusterCall ( cl , function ( ) rnorm ( 1 ) ) [[1]] [1] -0.9685927 [[2]] [1] -0.4094454 > clusterCall (cl , function ( ) rnorm ( 1 ) )
(二)foreach包与doParallel包 doParalle多与foreach包一同使用,doParallel包使foreach并行计算成为可能。foreach的功能类似于for 或者lapply函数,其最大的好处在于代码简单而且容易采用并行方式进行计算。foreach可进行并行或串行计算,为了提示计算机foreach将采用并行计算的方式,在开辟工作节点的基础上,1需声明并行计算将要用到的节点数。声明方式如下: > library ( foreach ) > library ( doParallel ) > cl <- makeCluster ( getOption ( "cl.cores" , 2 ) ) > registerDoParallel ( cl ) …… > stopCluster ( cl )
在进行并行节点数目的声明后,可以采用getDoParWorkers()对目前的工作节点 数进行确定。 接下来我们同样使用几个例子来说明foreach包的基本应用和注意事项。
例:16.2 (1)foreach与for loop的关系 > X <- matrix ( 0 , nr = 10 , nc = 10 ) > for ( i in 1 : 10 ) { + X [ i , ] <- 5 * i – rnorm ( 10 , mean = i ) + } # 改写成foreach为 > X <- foreach ( i = 1 : 10 , .combine = 'rbind' ) %do% { 5 * i – rnorm ( 10 , mean = i ) } # 改写为并行计算格式为 > X <- foreach ( i = 1 : 10 , .combine = 'rbind' ) %dopar% { 5 * i – rnorm ( 10 , mean = i ) }
在例16.2(1)中,我们对矩阵X进行了赋值操作。其中%do%与%dopar%都为 二进制操作,而且%dopar%将原问题分配到不同的工作节点进行并行计算。另 外,有一个需要注意的地方是 .combine的使用。.combine为%do%或%dopar%计 算结果的结合方式。例16.2(1)中的 .combine = rbind表示将各个节点返回的 结果作为行进行合并处理。常见的 .combine的形式有“c”,“cbind”,“rbind” 或者其他函数形式。详细情况见例16.2(2)。
例:16.2 (2) .combine的函数形式 > cfun <- function ( x , y ) { + c ( x [ 1 ] , y [ 2 ] ) + } > foreach ( i = 1 : 3 , .combine = 'cbind' ) %do% { z <- i : ( i + 5 ) } result.1 result.2 result.3 [1,] 1 2 3 [2,] 2 3 4 [3,] 3 4 5 [4,] 4 5 6 [5,] 5 6 7 [6,] 6 7 8 > foreach ( i = 1 : 3 , .combine = 'rbind' ) %do% { z <- i : ( i + 5 ) } [,1] [,2] [,3] [,4] [,5] [,6] result.1 1 2 3 4 5 6 result.2 2 3 4 5 6 7 result.3 3 4 5 6 7 8 > foreach ( i = 1 : 3 , .combine = 'cfun' ) %do% { z <- i : ( i + 5 ) } [1] 1 4
例16. 2(2)中的前两个例子比较容易理解,我们着重解释一下第3个例子。在 第3个例子中, 例16.2(2)中的前两个例子比较容易理解,我们着重解释一下第3个例子。在 第3个例子中,.combine整合结果的方式为首先取result.1的第一个元素与result.2 的第二个元素结合形成向量c ( 1 , 3 ),然后取c ( 1 , 3 ) 的第一个元素1与result.3 的第二个元素4 合并形成 c ( 1 , 4 )。 例16.2(1)和例16.2(2)中用于迭代运算的变量i是一个向量,但foreach的迭 代运算并不局限于向量,可以为列表矩阵等任意一种格式。在进行计算时, foreach函数首先将该变量自动转化为迭代器iterator格式,之后进行迭代运算。 例16.2(3)给出了一个更为一般的例子。
例:16.2 (3) iterator格式的迭代 > library ( iterators ) > foreach ( i2 = iter ( list ( x = 1 : 3 , y = 10 , z = c ( 7 , 8 ) ) ) , .combine = 'c' ) %do% { i2 } [1] 1 2 3 10 7 8 在例16.2(3)中第一次迭代带入运算的是x = 1 : 3,而第二次迭代带入运算的 是y = 10,第三次迭代带入运算的是z = c ( 7 , 8 )。 在很多时候,程序中可能存在多重循环或者条件选择。这时可用%:%进行条件 嵌套,用例16.2(4)和例16.2(5)分别说明条件选择嵌套情况,以及两重循 环嵌套情况。
例:16.2 (4)条件选择嵌套 # a quick sort function > qsort <- function ( x ) { + n <- length ( x ) + if ( n == 0 ) {x + } else { + p <- sample ( n , 1 ) + # 统计比x [ p ] 小的值 + smaller <- foreach ( y = x [ -p ] , .combine = c ) %:% when ( y <= x [ p ] ) %do% y + # 统计比x [ p ] 大的值 + larger <- foreach(y=x[-p], .combine=c) %:% when(y > x [ p ] ) %do% y + c ( qsort ( smaller ) , x [ p ], qsort ( larger ) ) # 采用递归的方法排序。 + } + } > qsort ( runif ( 12) ) # 对随机生成的12个数进行排序
例:16.2 (5)多重循环嵌套 > sim <- function ( x , y ) {10 * x + y} > avc <- 1 : 4 > bvc <- 1 : 4 > x <- matrix ( 0 , length ( avc ) , length ( bvc ) ) > for ( j in 1 : length ( bvc ) ) { + for ( i in 1 : length ( avc ) ) + x [ i , j ] <- sim ( avc [ i ] , bvc [ j ] ) + } # 改写为foreach格式 > x <- foreach ( b = bvc , .combine = 'cbind' ) %:% + foreach ( a = avc , .combine = 'c' ) %do% { + sim ( a , b ) + }
在本节的最后,我们就工作节点等返回节点的顺序做一个说明。 在本节的最后,我们就工作节点等返回节点的顺序做一个说明。.inorder = TRUE可以控制results的产生顺序与迭代变量的顺序相同。当结果顺序并不影响分析目的时,可采用 .inorder = FALSE的设置提高程序的运行效果。
Multithreading Parallel 3 HPC多线程并行计算 Multithreading Parallel
接下来这一节具体说明在计算机集群中如何将R程序并行。具体 的实现步骤包括: 单机DOS执行命令测试 计算机集群执行命令(多进程运行程序)
这个过程有以下的注意事项: 并行计算首先需要在HPC上进行,可能不同地方的HPC执行方 式有些不同,本例以厦门大学经济学院的HPC为例。执行路径 和文件存储路径均应设置在网络路径内,在HPC内显示的是Z 盘 并行计算时,传到各个核心上的参数只能是一个,且取值范围 为正整数
并行计算过程中,有些核心会由于出错而终止某一项任务。在 显示任务失败时,并不是全部都失败而是有部分程序失败了
被传输核心上的程序的输出必须以文件的形式保存而后汇总。 命名需和传输到该核心的参数值相关,方便区分
由于采用的是多进程,尽量避免在程序中嵌套并行计算 如果需要调用R包,需在程序里首先进行安装,且安装时要提 供镜像
接下来我们通过一个例子进行说明,比如要并行名为exp_111.R的程序。 其中,前三行程序为必需的,第1句表示读入dos系统中的操作,第2句是 找到参数位置并读入,第3句将参数转化为数值。之所以Args [ ] 中是6, 和后面的输入有关系。 另外,ncvreg包是需要安装的,如果要安装其他包则有两种途径:(1)由 HPC管理员安装;(2)在library之前加install.packages ( ‘ncvreg’ , repos = ’ http://mirrors.xmu.edu.cn/CRAN/’ )。
> Args <- commandArgs ( ) > T <- Args [ 6 ] > T <- as.numeric ( T ) > library ( ncvreg );library ( MASS ) > source ( 'AIC_CV.R' );source ( 'beta_calculate.R' );source ( 'cross_log.R' ) # 相应的文件AIC_CV.R 等需要提前放在相应路径下。 > nfold <- 2;n <- 10; p <- 50;ro <- 0.5;g <- 5; K <- p / g;gama <- 5 > Lambda1 <- c ( 0.03 , 0.05 , 0.07 , 0.09 , 0.1 , 0.3 ) > Lambda2 <- c ( 0.07 , 0.08 , 0.09 , 0.1 , 0.2 , 0.3 , 0.4 , 0.5 , 0.6 , 0.7 , 0.8 , 0.9 ) > sigma <- matrix ( , g , g ) > for ( ii in 1 : g ) { + for ( jj in ii : g ) { + sigma [ ii , jj ] <- ro ^ abs ( ii - jj ) + sigma [ jj , ii ] <- sigma [ ii , jj ] + } + }
> b <- vector ( ) > b [ 1 ] <- 0 > b [ 2 : 26 ] <- rep ( c ( 1 , 1 , 1 , 1 , 1 ) , each = 5 ) > b [ 27 : ( p + 1 ) ] <- 0 > b0 <- rep ( 0 , (p + 1 ) ) > as.numeric ( Sys.time ( ) ) -> t > set.seed ( ( t - floor ( t ) ) * 1e8 -> seed) > print ( seed ) > set.seed ( T * 200000 ) > X <- matrix ( , n , p ) > for ( i in 1 : K ) { + X [ , ( g * ( i - 1 ) + 1 ) : ( g * i ) ] <- mvrnorm ( n = 100 , rep ( 0 , g ) , sigma ) + } > X <- cbind ( 1 , X ) > prob1 <- 1 / ( 1 + exp ( - X %*% b ) ) > y = rbinom ( n , 1 , prob = prob1 )
给出上述exp_111.R程序之后,下面我们就来详细叙述将它并行的过程。 > A <- diag ( 1 , nc = ( p + 1 ) , nr = ( p + 1) ) > A [ 2 : ( p + 1 ) , 2 : ( p + 1 ) ] <- ( cor ( X [ , 2 : ( p + 1 ) ] ) ) ^ 3 > beta <- AIC_CV ( X, y , b0 , Lambda1 , Lambda2 ) # 执行的函数结果必须保存,保存在“//hpcserver-soe/HPCUserFile$/fanxinyan/”之后的文件里 > write.csv ( beta , paste0 ( '//hpcserver-soe/HPCUserFile$/fanxinyan/R 并行/parallel with R/' , + 'beta' , T , '.csv' ) ) 给出上述exp_111.R程序之后,下面我们就来详细叙述将它并行的过程。
(一)单机DOS执行命令测试 1) 打开DOS界面:开始 → Command Prompt,或开始 → 运行,输入“cmd”, 回车。
(一)单机DOS执行命令测试 2)将路径设置到存放文件的(\\hpcserver-soe\HPCUserFile$\fanxinyan\)Z盘。方 法:
(一)单机DOS执行命令测试 3) 执行R 和exp_111 文件,输入参数值为10 4)检查输出文件
(二)计算机集群执行命令(多进程运行程序) 1) 打开 HPC job Manager。 2)点击新建参数扫描作业。
(二)计算机集群执行命令(多进程运行程序) 3)进入如下配置界面,主要设置红色圈起来的位置。其中起始值和最终值 表示传输参数的取值范围。命令行表示执行的DOS界面下的操作。工作目录 必须设置且按如下格式设置到Z盘。任务名可以自己命名,也可以保留默认。
(二)计算机集群执行命令(多进程运行程序) 4)设置完成后单击“提交”按钮。可以从“我的作业”观察程序运行情况。
谢 谢! Thank You 方匡南 xmufkn@xmu.edu.cn 请扫描加微信