来源 https://zhuanlan.zhihu.com/p/455171966


目录
收起
估算pi
找素数

算法在并行环境中的性能通常由必须共享的状态数量来决定。如果多个Python进程之间没有互相通信(可以认为没有共享状态),那么通信开销并不大;如果每一个进程都需要和其他所有Python进程之间通信,那么通信开销会比较大,事情的处理会越来越慢,最终使整体性能下降。

Python用于单核,实现有效的并行化会比较困难。当我们使用n个核心来解决问题时,我们的速度理论上可以达到原来的n倍。所以为了达到更好的加速,这篇文章会用两个例子探讨使用多核的方法。

multiprocessing模块能让我们使用基于进程和基于线程的并行处理,在队列上共享任务和在进程间共享数据:

  • 进程或池对象并行化一个任务。
  • 哑元模块在线程池中并行化一个I/O任务。
  • 队列共享捎带的工作。
  • 在并行工作者之间共享状态(如字节、原生数据类型、字典和列表)。

先做一些名词上的解释,在multiprocessing中:

  • 进程:一个当前进程的派生(forked)拷贝,创建了一个新的进程标识符,并且任务在操作系统中以一个独立的子进程运行。
  • 池:包装了进程或线程。在一个方便的工作者线程池中共享一块工作并返回聚合的结果。
  • 队列:一个先进先出(FIFP)的队列,允许多个生产者和消费者。
  • 管理者:一个单向或双向的在两个进程间的通信渠道。
  • ctypes:允许在进程派生(forked)后,在父子进程间共享原生数据类型(例如,整型数、浮点数和字节数)。
  • 同步原语:锁和信号量在进程间同步控制流。

看例子。

估算pi

我们使用蒙特卡洛办法估算pi。我们将10000000个随机点放入长为2的正方形中,如果点的坐标满足:

则表示点就落入到单位圆中,反之则落到圆外。由此我们可以估算出3位小数可靠的pi值。

使用Python写一个循环来实现这一过程:

import random

#用循环估计pi
def calculate_pi(nbr_estimates):
    sum_trials = 0
    for step in range(int(nbr_estimates)):
        x = random.uniform(0,1)
        y = random.uniform(0,1)
        in_unit_circle = x*x + y*y <= 1.0
        sum_trials += in_unit_circle
    return sum_trials

使用multiprocessing导入进程池,我们创建一个包含nbr_estimates 的列表,被工作者的数量整除。它会将参数送给每一个工作者。执行后,我们会收到相同数量的返回结果,把这些结果累加起来去估算单位圆内的点数量。

if __name__ == "__main__":
    from multiprocessing import Pool
import time
    nbr_samples = 1e8
    parallel_blocks = 4#进程数量
    pool = Pool(processes = parallel_blocks)#使用multiprocessing.Pool创建进程池
    nbr_samples_per_worker = nbr_samples/parallel_blocks
    print("Making {} samples per work".format(nbr_samples_per_worker))
    nbr_trials_per_process = [nbr_samples_per_worker]*parallel_blocks
    start = time.time()
    #map是同步阻塞执行,即上一个子进程结束后才能进行下一个子进程。
    #iterable可迭代类型,将iterable中每个元素作为参数应用到func函数中,返回list
    nbr_in_unit_circles = pool.map(func = calculate_pi,iterable = nbr_trials_per_process)
    pi_estimate = sum(nbr_in_unit_circles)*4/nbr_samples
    print("Estimated pi",pi_estimate)
print("Delta: ",time.time()-start)

下表列出了随着使用进程数与运行时间的关系:

进程数 1 2 4 6 8 16
Samples per work 1e8 5e7 2.5e7 1.66e7 1.25e7 6.25e6
时间 49.227 24.015 12.145 9.426 8.440 7.460

从单进程到6个线程,每增加一倍的线程,速度也会相应提升近一倍。当达到8进程和16进程时,速度提升越来越小。这可能是因为我是在六核CPU的linux系统上运行的,6个超线程几乎榨干了CPU的所有性能,几乎已经最大化利用6个CPU了。

现在我们使用一个进程中的多线程(将from multiprocessing import Pool改成from multiprocessing.dummy import Pool就能得到线程版本):

if __name__ == "__main__":
    from multiprocessing.dummy import Pool
    import time
    nbr_samples = 1e8
    parallel_blocks = 2#线程数量
    pool = Pool(processes = parallel_blocks)
    nbr_samples_per_worker = nbr_samples/parallel_blocks
    print("Making {} samples per work".format(nbr_samples_per_worker))
    nbr_trials_per_process = [nbr_samples_per_worker]*parallel_blocks
    start = time.time()
    nbr_in_unit_circles = pool.map(func = calculate_pi,iterable = nbr_trials_per_process)
    pi_estimate = sum(nbr_in_unit_circles)*4/nbr_samples
    print("Estimated pi",pi_estimate)
    print("Delta: ",time.time()-start)

线程数 1 2 4 6 8 16
Samples per work 1e8 5e7 2.5e7 1.66e7 1.25e7 6.25e6
时间 48.551 48.724 53.873 56.326 57.118 58.30

多进程中运行了一定数量的Python进程,每个进程都有自己的内存空间等,所以没有GIL竞争。而对一个进程的多线程,会存在GIL竞争而造成额外开销,于是代码运行会变慢。

GIL竞争图。红色表示线程在重复设法获得GIL但失败了;绿色表示一个运行中的线程;白色表示线程闲置。Python 中的多线程对于I/O 密集型任务有优势,但是对CPU 密集型问题则没有优势。

此外,每当一个线程被唤醒并设法获得GIL时,就使用了系统资源。如果一个线程忙碌,那么其他线程将重复不断地唤醒并设法获取GIL,这些重复造成开销越来越大。当然,这是多线程运行于多个CPU上的问题,多线程的单核CPU没有没有GIL竞争。

我们知道,numpy因在RAM的连续块以低层次创建和操控相同的对象类型而更快,对缓存更加友好。我们使用numpy来解决上面的问题,查看速度提升。使用numpy估算pi(矢量化版本):

import numpy as np

def calculate_pi(nbr_samples):
    np.random.seed()#在每个新进程中为 numpy 设置随机种子,否则folk将意味着它们都共享相同的状态k将意味着它们都共享相同的状态k将意味着它们都共享相同的状态k将意味着它们都共享相同的状态
    xs = np.random.uniform(0, 1, nbr_samples)
    ys = np.random.uniform(0, 1, nbr_samples)
    estimate_inside_quarter_unit_circle = (xs * xs + ys * ys) <= 1
    nbr_trials_in_quarter_unit_circle = np.sum(estimate_inside_quarter_unit_circle)
return nbr_trials_in_quarter_unit_circle
进程版本(包括串行和多进程)运行:
if __name__ == "__main__":
    from multiprocessing import Pool
    import time
    nbr_samples = 1e8
    parallel_blocks = 1#进程数量
    pool = Pool(processes = parallel_blocks)
    nbr_samples_per_worker = nbr_samples/parallel_blocks
    print("Making {} samples per work".format(nbr_samples_per_worker))
    nbr_trials_per_process = [int(nbr_samples_per_worker)]*parallel_blocks
    start = time.time()
    nbr_in_unit_circles = pool.map(func = calculate_pi,iterable = nbr_trials_per_process)
    pi_estimate = sum(nbr_in_unit_circles)*4/nbr_samples
    print("Estimated pi",pi_estimate)
print("Delta: ",time.time()-start)

结果:

进程数 1 2 4 6 8 16
Samples per work 1e8 5e7 2.5e7 1.66e7 1.25e7 6.25e6
时间 11.382 4.133 1.220 0.701 0.643 0.549

比使用纯Python版本快了很多。

使用线程版本:

if __name__ == "__main__":
    from multiprocessing.dummy import Pool
    import time
    nbr_samples = 1e8
    parallel_blocks = 1#进程数量
    pool = Pool(processes = parallel_blocks)
    nbr_samples_per_worker = nbr_samples/parallel_blocks
    print("Making {} samples per work".format(nbr_samples_per_worker))
    nbr_trials_per_process = [int(nbr_samples_per_worker)]*parallel_blocks
    start = time.time()
    nbr_in_unit_circles = pool.map(func = calculate_pi,iterable = nbr_trials_per_process)
    pi_estimate = sum(nbr_in_unit_circles)*4/nbr_samples
    print("Estimated pi",pi_estimate)
print("Delta: ",time.time()-start)

得到结果:

线程数 1 2 4 6 8 16
Samples per work 1e8 5e7 2.5e7 1.66e7 1.25e7 6.25e6
时间 9.414 2.169 1.991 1.943 1.912 1.976

同样比纯Python快了很多,并且线程越多,速度基本会越快。通过工作于GIL之外,numpy能够用多线程达到更快的速度。

找素数

找素数是与估算pi不同的问题,因为工作负载会因数值位置而变,并且每个数字检查都有不可预测的复杂度。我们会使用调制队列来使用计算资源,并用简单的办法有效使用资源。使用Python寻找素数:

import math

def check_prime(n):
    if n % 2 == 0:
        return False
    start = 3
    end = int(math.sqrt(n))+1
    for i in range(start , end, 2):
        if n % i == 0:
            return False
return True

当我们给进程池分配工作时,我们可以指定要给每个工作者传递的工作量。我们可以均匀地划分所有工作并力求一次传递完,或者我们也可以创建很多工作块,当CPU空闲时就把它们传递出去。

chunksize指定了工作块的大小。更大的工作块意味着更少的通信开销,而更小的工作块意味着资源分配更灵活。对找素数来说,一个单独的工作划片是一个由check_prime检测的数字n。例如chunksize是8就意味着每一个进程处理一列8个整数,同时处理一列。

为了探究工作块的影响,我们固定了进程数而改变chunksize:

if __name__ == "__main__":
    from multiprocessing import Pool
    import time
    n = 1e8
    parallel_blocks = 6#进程数量
    pool = Pool(processes = parallel_blocks)
    nbr_list = [int(n+i) for i in range(1000000)]
    chunksize = 2#工作块的划分
    start = time.time()
    nbr_in_unit_circles = pool.map(func = check_prime,iterable = nbr_list,chunksize = chunksize)
    print("Delta: ",time.time()-start)

我们可以看到从1(每个任务是一个单独的工作划片)到64(每个任务是一列64个数字)之间变化的chunksize的效果:

chunksize 1 2 4 8 16 32 64 128
时间 31.625 15.052 7.517 4.834 3.967 3.592 4.099 3.775

尽管有小的任务能带来高的灵活性,但也强加了最大的通信开销。所有的CPU 会有效地得以利用,但是当每一个任务和处理结果都经过一个单独的通信管道传输时,这个单独的信道就变成了一个瓶颈。

把chunksize 翻2倍,任务就以两倍快的速度得到了解决,因为在通信管道上有更少的竞争,但过大的chunksize有可能会让时间变长,比如让chunkszie=50000,那么两个CPU会有空闲,从而造成低效。因此错配工作负载和可利用资源也会导致低效

这种情况下的优化是让任务数量和CPU数量一致,即默认的chunksize。

如上图所示,错配会导致低效。1到5个工作块都会使一部分CPU无法利用,6个工作块时采用上所有资源。这时如果增加了第7个工作块,那么资源会再次利用不足,因为6个CPU会工作在他们的块上,接着一个CPU将运行计算第7个块。

原创文章,转载请注明出处:http://124.221.219.47/article/891445/