万豪棋牌官网
  咨询电话:15146302749

万豪国际手机

Python并行编程(十三):进程池和mpi4py模块

1、基本概念

      多进程库提供了Pool类来实现简单的多进程任务。Pool类有以下方法:

      - apply():直到得到结果之前一直阻塞。

      - apply_async():这是apply()方法的一个变体,返回的是一个result对象。这是一个异步的操作,在所有的子类执行之前不会锁住主进程。

      - map():这是内置的map函数的并行版本,在得到结果之前一直阻塞,此方法将可迭代的数据的每一个元素作为进程池的一个任务来执行。

      - map_async():这是map的一个变体,返回一个result对象。如果指定了回调函数,回调函数应该是callable的,并且只接受一个参数。当result准备好时,会自动调用回调函数,除非调用失败。回调函数应该立即完成,否则,持有result的进程将被阻塞。

 

2、测试用例

      创建四个进程池,然后使用map方法进行一个简单的计算。

import multiprocessingdef function_square(data): result = data * data return resultif __name__ == "__main__": inputs = list(range(100)) pool = multiprocessing.Pool(processes=4) pool_outputs = pool.map(function_square, inputs) pool.close() pool.join() print("pool: ", pool_outputs)

      pool.map方法将一些独立的任务提交给进程池。pool.map和内置map的执行结果相同,但pool.map是通过多个并行进程计算的。

 

3、mpi4py模块

      Python提供了很多MPI模块写并行程序。其中mpi4py在MPI-1/2顶层构建,提供了面向对象的接口,紧跟C++绑定的MPI-2。MPI是C语言用户可以无需学习新的接口就可以使用这个库。

      此模块包含的主要的应用:

      - 点对点通讯

      - 集体通讯

      - 拓扑

4、安装mpi4py

      安装mpich:https://www.microsoft.com/en-us/download/confirmation.aspx?id=56727

      下载并安装msmpisetup.exe

       安装完成后安装目录如下:

       

      将bin目录添加到系统环境中:

      

      用cmd输入并显示如下即为安装成功

      

      安装mpi4py

      pip install mpi4py

      MPI测试用例

from mpi4py import MPIdef mpi_test(rank): print("I am rank %s" %rank)if __name__ == "__main__": comm = MPI.COMM_WORLD rank = comm.Get_rank() mpi_test(rank) print("Hello world from process", rank)

      使用mpi运行文件

     

      在MPI中,并行程序中不同进程用一个非负整数来区别,如果我们有P个进程,那么rank会从0到P-1分配。

      MPI拿到rank的函数如下:rank = comm.Get_rank()

      这个函数返回调用它的进程的rank,comm叫做交流者,用于区别不同的进程集合:comm = MPI.COMM_WORLD

 5、MPI点对点通讯

      MPI提供的最实用的一个特性是点对点通讯。两个不同的进程之间可以通过点对点通讯交换数据:一个进程是接收者,一个进程是发送者。

      Python的mpi4py通过下面两个函数提供了点对点通讯功能:

      - Comm.Send(data, process_destination):通过它在交流组中的排名来区分发送给不同进程的数据。

      - Comm.Recv(process_source):接收来自源进程的数据,也是通过在交流组中的排名来分分的。

      Comm变量表示交流着,定义了可以互相通讯的进程组:

      comm  = MKPI.COMM_WORLD

      交换信息测试用例: 

from mpi4py import MPIcomm = MPI.COMM_WORLDrank = comm.rankprint("My rank is :",rank)if rank == 0: data = 10000000 destination_process = 4 comm.send(data, dest=destination_process) print("sending data %s to process %d" %(data, destination_process))if rank == 1: destination_process = 8 data = "hello,I am rank 1" comm.send(data, dest=destination_process) print("sending data %s to process %d" %(data, destination_process))if rank == 4: data = comm.recv(source=0) print("data received is = %s" %data)if rank == 8: data1 = comm.recv(source=1) print("data received is = %s" %data1)

      运行结果:

      

      通过mpiexec -n 9运行9个互相通讯的进程,使用rank的值来区分每个进程。

      整个过程分为两部分,发送者发送数据,接收者接收数据,二者必须都指定发送方/接收方,source=为指定发送者。如果有发送的数据没有被接收,程序会阻塞。

      comm.send()和comm.recv()函数都是阻塞的函数,他们会一直阻塞调用者,直到数据使用完成,同时在MPI中,有两种方式发送和接收数据:

      - buffer模式

      - 同步模式

      在buffer模式中,只要需要发送的数据被拷贝到buffer中,执行权就会交回到主程序,此时数据并非已经发送/接收完成。在同步模式中,只有函数真正的结束发送/接收任务之后才会返回。

 

6、避免死锁

      mpi4py没有提供特定的功能来解决这种情况,但是提供了一些程序员必须遵守的规则来避免死锁的问题。

      出现死锁的情况:

      

from mpi4py import MPIcomm = MPI.COMM_WORLDrank = comm.rankprint("my rank is :",rank)if rank == 1: data_send = "a" destination_process = 5 source_process = 5 data_received = comm.recv(source=source_process) comm.send(data_send, dest=destination_process) print("sending data %s to process %d" %(data_send, destination_process)) print("data received is = %s" %data_received)if rank == 5: data_send = "b" destination_process = 1 source_process = 1 data_received = comm.recv(source=source_process) comm.send(data_send, dest=destination_process) print("sending data %s to process %d" % (data_send, destination_process)) print("data received is = %s" % data_received)

      运行结果:

      

      进程1和进程5产生阻塞,程序阻塞。

      此时两个进程都在等待对方,发生阻塞,因为recv和send都是阻塞的,两个函数都先使用的recv,所以调用者都在等待他们完成。所以讲上述代码改为如下即可解决阻塞:

from mpi4py import MPIcomm = MPI.COMM_WORLDrank = comm.rankprint("my rank is :",rank)if rank == 1: data_send = "a" destination_process = 5 source_process = 5 comm.send(data_send, dest=destination_process) data_received = comm.recv(source=source_process) print("sending data %s to process %d" %(data_send, destination_process)) print("data received is = %s" %data_received)if rank == 5: data_send = "b" destination_process = 1 source_process = 1 data_received = comm.recv(source=source_process) comm.send(data_send, dest=destination_process) print("sending data %s to process %d" % (data_send, destination_process)) print("data received is = %s" % data_received)

      将其中一个函数的recv和send顺序调换。

      运行结果:

      

      也可通过Sendrecv函数解决,代码如下:

from mpi4py import MPIcomm = MPI.COMM_WORLDrank = comm.rankprint("my rank is :",rank)if rank == 1: data_send = "a" destination_process = 5 source_process = 5 # comm.send(data_send, dest=destination_process) # data_received = comm.recv(source=source_process) data_received = comm.sendrecv(data_send, dest=destination_process, source=source_process) print("sending data %s to process %d" %(data_send, destination_process)) print("data received is = %s" %data_received)if rank == 5: data_send = "b" destination_process = 1 source_process = 1 # data_received = comm.recv(source=source_process) # comm.send(data_send, dest=destination_process) data_received = comm.sendrecv(data_send, dest=destination_process, source=source_process) print("sending data %s to process %d" % (data_send, destination_process)) print("data received is = %s" % data_received)

      运行结果:

      

7、集体通讯:Broadcast

      在并行代码的开发中,会经常需要在多个进程间共享某个变量运行时的值,或操作多个进程提供的变量。MPI库提供了在多个进程之间交换信息的方法,将所有进程变成通讯者的这种方法叫做集体交流。因此,一个集体交流通常是2个以上的进程,也可以称为广播——一个进程将消息发送给其他进程。mpi4py模块通过以下方式提供广播的功能:

buf = comm.bcast(data_to_share, rank_of_root_process)

      这个函数将root消息中包含的信息发送给属于comm通讯组其他的进程,每个进程必须通过相同的root和comm来调用它。

      

      测试代码:

from mpi4py import MPIcomm = MPI.COMM_WORLDrank = comm.Get_rank()if rank == 0: variable_to_share = 100else: variable_to_share = Nonevariable_to_share = comm.bcast(variable_to_share, root=0)print("process = %d variable shared = %d" %(rank, variable_to_share))

      运行结果:

      

      rank等于0的root进程初始化了一个变量,variable_to_share,值为100,然后声明了一个广播variable_to_share = comm.bcast(variable_to_share, root=0)

      这个变量将通过通讯组发送给其他进程。

      集体通讯允许组中的多个进程同时进行数据交流。在mpi4py模块中,只提供了阻塞版本的集体通讯(阻塞调用者,直到缓存中的数据全部安全发送。)

      广泛应用的集体通讯应该是:

            - 组中的进程提供通讯的屏障

            - 通讯方式包括:

                  - 将一个进程的数据广播到组中其他进程中

                  - 从其他进程收集数据发给一个进程

                  - 从一个进程散播数据到其他进程中

            - 减少操作

 8、集体通讯:Scatter

      scatter函数和广播很像,但是不同的是comm.bcast将相同的数据发送给所有在监听的进程,comm.scatter可以将数据放在数据中,发送给不同的进程。

      

      comm.scatter函数接收一个array,根据进程的rank将其中的元素发给不同的进程,第一个元素发送给进程0,第二个元素发给进程1,以此类推。

      测试用例:

from mpi4py import MPIcomm = MPI.COMM_WORLDrank = comm.Get_rank()# array_to_share = ["a","b","c","d","e","f","g","h","i","j"]if rank == 0: array_to_share = [0,1,2,3,4,5,6,7,8,9]else: array_to_share = Nonerecvbuf = comm.scatter(array_to_share, root=0)print("Process = %d recvbuf = %s" %(rank, recvbuf))

      执行结果:

      

      注意:列表中的元素个数,需要个进程保持一致。否则会出现如下错误。

      

 9、集体通讯:gather

      gather函数基本上是反向的scatter,即收集所有进程发送到root进程数据。方法如下:

recvbuf = comm.gather(sendbuf, rank_of_root_process)

      sendbuf是要发送的数据,rank_of_root_process代表要接收数据的进程。

      

      测试用例:

from mpi4py import MPIcomm = MPI.COMM_WORLDsize = comm.Get_size()# print(size)rank = comm.Get_rank()data = "process %s" %rank# print("start %s"%data)data = comm.gather(data, root=0)# print(data)if rank == 0: print("rank = %s receiving data to other process" %rank) for i in range(1, size): #data[i] = (i+1) ** 2 value = data[i] print("process %s receiving %s from process %s" %(rank, value, i)) # print(data)

      执行结果:

      

 10、使用Alltoall通讯

      Alltoall集体通讯结合了scatter和gather的功能。在mpi4py中,有以下三类的Alltoall集体通讯。

      - comm.Alltoall(sendbuf, recvbuf);

      - comm.Alltoallv(sendbuf, recvbuf);

      - comm.Alltoallw(sendbuf, recvbuf);

      Alltoall测试用例:

from mpi4py import MPIimport numpycomm = MPI.COMM_WORLDsize = comm.Get_size()rank = comm.Get_rank()a_size = 1# print("numpy arange: %s" %numpy.arange(size, dtype=int))senddata = (rank+1)*numpy.arange(size, dtype=int)recvdata = numpy.empty(size * a_size, dtype=int)print("senddata is %s , recvdata is %s" %(senddata, recvdata))# print("Recvdata is %s: , numpy.empty is %s" %(recvdata, numpy.empty(size * a_size, dtype=int)))comm.Alltoall(senddata, recvdata)print("process %s sending %s, receiving %s" %(rank, senddata, recvdata))

      运行结果:

      

      comm.alltoall方法将task j的sendbuf的第j个对象拷贝到task i中,recvbuf的第j个对象,一一对应。发送过程如图:

      

      可以将左右两个方格看做xy轴,结果一一对应,如左图的(0,0)对应的值为0,其对应的有图的值为右图的(0,0)也为0。左图的3,4对应的值为16,右图(4,3)也为16。

      P0包含的数据[0 1 2 3 4],它将值0赋值给自己,1传给进程P1,2传给进程P2,3传给进程P3,以此类推。

      相同的P1的数据为[0 2 4 6 8] , 它将0传给P0,2传给P1,4传给P2,以此类推。

      All-to-all定制通讯也叫全部交换,这种操作经常用于各种并发算法中,比如快速傅里叶变换,矩阵变换,样本排序以及一些数据库的 Join 操作。

 11、简化操作

      同comm.gather一样,comm.reduce接收一个数组,每一个元素是一个进程的输入,然后返回一个数组,每一个元素是进程的输出,返回给root进程。输出的元素包含了简化的结果。

      简化定义如下:comm.Reduce(sendbuf, recvbuf, rank_of_root_process, op = type_of_reduction_operation)

      这里需要注意的是,参数op和comm.gather不同,它代表你想应用在数据上的操作,mpi4py模块代表定义了一系列的简化操作,包括:

      - MPI.MAX:返回最大的元素

      - MPI.MIN:返回最小的元素

      - MPI.SUM:对所有的元素相加

      - MPI.PROD:对所有元素相乘

      - MPI.LAND:对所有元素进行逻辑操作

      - MPI.MAXLOC:返回最大值,以及拥有它的进程

      - MPI.MINLOC:返回最小值,以及拥有它的进程

      测试用例:

import numpy as npfrom mpi4py import MPIcomm = MPI.COMM_WORLDsize = comm.sizerank = comm.rankarray_size = 3recvdata = np.zeros(array_size, dtype=np.int)senddata = (rank+1)*np.arange(size, dtype=np.int)print("+++++++++++++%s+++++++++++++%s++++++++++++" %(recvdata, senddata))print("Process %s sending %s" %(rank, senddata))comm.Reduce(senddata, recvdata, root=0, op=MPI.SUM)print("on task %s, after Reduce: data = %s" %(rank, recvdata))

      执行结果:

      

      MPI.SUM为求和操作,过程如下:

      

      简化操作将每个task的第i个元素相加,然后放回到P0进程(root进程)的第i个元素中。

, 1, 0, 9);