You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

16 KiB

11 通过程序并行计算避免CPU资源浪费

你好,我是尹会生。

在我为运营工作提供技术咨询的时候遇到过这样一个场景这场运营活动需要在电脑和手机端的多个不同应用程序同时推送产品宣传图片和视频。这些大量的图片需要有不同的格式和尺寸视频也需要根据不同的App截取不同的时长。

如果这类需要大量计算的多个任务成为你的日常工作,会花费你不少的时间和精力。不过别担心,我们可以通过程序并行计算,来提升任务效率。

不过你可能会说用Python自动化执行也可以提高计算效率啊那为什么还要学习并行计算呢

要知道Python默认的自动化只能利用CPU的一个逻辑核心如果采用并行计算那就能够最大化地利用CPU资源从而成倍提升大量计算的任务效率。接下来我就详细分析一下并行计算的高效之处。

为什么要进行并行计算

还是我在开头提出的运营工作的场景。如果你从这类任务消耗计算机主要资源的角度去考虑,会发现这类需求有两个共同的特点。

第一它们都需要进行大量的计算而计算主要是通过CPU来实现的。CPU的硬件指标上有两个和计算效率最相关的概念分别是主频和多核。

主频决定CPU处理任务的快慢多核决定处理的时候是否可以并行运行。这和生活中超市的收银一样收银员的工作效率和超市开放了多少个收银台的通道都决定了你能否以最快的速度购买到你想要买的商品。

第二,这些任务往往都需要运行相同的程序,但是程序的参数却需要根据不同的需求进行调整。

虽然咱们可以使用Python自动化执行这些程序从而减少手动操作时间但是我们还可以利用CPU的多核特性让程序尽可能并行执行发挥CPU的全部计算能力提高运行效率。

那么接下来我就来教你怎样利用Python的多进程库来实现程序的并行计算以及怎么提高并行计算的效率。

怎样实现并行计算

要想实现程序的并行计算需要使用到标准库中的multiprocessing多进程库。你可能会问进程是什么呢

进程,是计算机用来描述程序运行状态的名词。一个进程在运行时需要消耗一定的资源包括CPU的时间、内存、设备I/O等。如果两个进程互相独立在同一个任务处理过程中没有前后依赖关系那你可以利用multiprocessing库同时运行多个进程这样就能成倍地减少多个任务执行的总时间。

接下来我就以计算1-100的平方为例看看怎么使用multiprocessing实现并行计算。代码如下

from multiprocessing import Pool

# 计算平方
def f(x):
    return x*x

with Pool(8) as p:
    # 并行计算
    res = p.map(f, range(1, 101))
    print(f'计算平方的结果是:{res}')

在这段代码中我通过Pool包的map()函数来求1到100平方计算由于每次计算平方的过程和下一次计算没有直接关联我就可以使用并行的方式进行计算提高计算效率。

为了让map()函数能够实现并行计算我们必须在使用它之前通过Pool()包为它指定并行计算的进程数量设置要执行的函数名称f以及f()函数所需参数。那么接下来我就带你学习一下我是怎样使用with语句来设置函数的参数并正确执行map()函数的。

首先来看最关键的map()函数它是Pool包实现并行计算的函数之一。在代码中我为map()函数赋值了f和range()函数两个参数。

第一个参数是函数对象。

函数对象会作为map()函数创建进程以后即将执行的主要任务。因此由于这里的含义是指定f对象将要被创建的进程执行而不是将f()函数执行的结果作为新的进程执行所以第一个参数必须使用函数对象f而不能使用f()函数。

第二个参数要求必须是可迭代的对象。

例如我在代码中需要为f函数传递参数为1-100的整数就可以使用range()函数产生1到100的整数并直接返回因为它的返回值就是可迭代对象。

如果参数不是数字就可以采用列表、元组、字典等支持迭代的数据类型代替range()函数作为f()函数的参数。举个例子如果你需要并行调整多个视频的时长就可以采用字典存储路径和要调整的视频时长并把这个字典作为map()函数的第二个参数map()函数会为字典的每个键值对创建一个进程来并行处理它们。

接下来是map()函数中的三个主要部分,我来分析一下它们各自在并行计算中的功能。

第一with语句。这是我们在第七讲学习怎么使用Python打开文件之后第二次用到with语句了。

和文件操作类似,进程打开后也需要妥善关闭,但是进程关闭属于较为底层的操作,如果你不进行操作系统层面的程序设计,是不需要对关闭进程的函数进行修改的,因为使用默认关闭进程的行为,就能满足编写并行计算的需求。

因此multiprocessing库对Pool包支持了比较友好的进程打开和关闭方式即with语句。也就是说multiprocessing库把对进程的操作写在with语句块中而with语句就会自动处理进程的打开和关闭这样在实现并行计算的代码中你就不用学习进程的基本操作也能减轻你学习并发程序的负担。

在了解了with语句可以操作进程的打开和关闭后我们来看代码中我是怎么使用with语句的。

我在代码中使用了“ with Pool(8) as p ”这条语句这里的Pool()类是多进程库支持的进程池功能它的作用是指定一个多进程的程序最多能够并行执行的进程数量。它的参数“8”表示map()函数最多同时运行8个进程。

剩下两个部分是range()函数和f()函数。

range()函数的作用是产生1-100的整数这些整数会在每次创建新的进程时依次作为f()函数的参数并赋值。而f()函数得到参数后会把计算结果返回给map()函数。当f()函数处理完所有的参数后map()函数还会返回一个列表作为运行的结果,并进行输出。

以上就是实现并行计算的主要过程。

如何提高并行计算的效率

我们除了需要掌握并行计算的基本方法外,还可以继续提升并行计算的效率。所以在程序中还有两个地方需要优化。

一个是为并行程序自动指定并行度。在并行计算的基本方法中我使用了手动指定并行度的方式来指定进程最多能够运行多少个。不过手动指定的并行度并不能适合所有的电脑因此就需要根据计算机的CPU核数设置合理的并行度。而且每台计算机的CPU资源是固定不变的那么设置合理的进程数量能让你的并行计算任务充分利用CPU资源。

另一个是统计程序运行的时间。当你对并行计算的数量做了修改后那程序是否对计算效率起到了提升效果呢就还需要更精确的测量这样才能得到更准确的结果。所以我们还需要使用Python统计出程序执行过程一共消耗了多长的时间。

我们先来看怎么自动设置适合你的电脑的并行度。

为并行程序自动指定并行度

计算类的任务包括数字计算、数据统计、视频的编解码等都属于计算密集型应用它们最大的资源开销就是CPU时间的消耗设置的并行度过大或过小都不能达到最好的运行效率。

  • 如果并行度设置过小比如运行的进程数量小于逻辑CPU的数量就会造成部分逻辑CPU因为无法被充分利用而处于闲置状态。
  • 如果并行度设置过大由于现代的操作系统为了保证每个进程都能公平得到CPU资源所以会造成CPU把时间大量消耗在进程切换上。那么并行度设置过大会导致CPU还未完成一个进程的处理时就得切换至下一个进程进行处理而多进程之间来回切换也会消耗CPU时间造成CPU资源的浪费。

那并行度该怎么设置才合理呢通常情况下我们会把并行度设置为逻辑CPU数量的两倍。不过假如计算任务达到小时级别这类任务需要长时间占用CPU资源为了减少切换任务时的开销我建议计算的并行度和逻辑CPU数量保持相等。

这就又有一个问题了该怎么获得计算机的逻辑CPU个数呢Windows可以通过任务管理器获得MacOS可以通过活动监视器获得。如果你希望取得逻辑CPU的个数之后可以根据它的数量自动设置创建进程的数量那么可以通过安装第三方包psutils利用其中的cpu_count()函数取得逻辑CPU个数。

我把并行度自动设置为当前逻辑CPU两倍的代码写在下面供你参考。

from multiprocessing import Pool,Queue
import os
import psutil

# 逻辑cpu个数
count = psutil.cpu_count()

# 定义一个队列用于存储进程id
queue = Queue()

# 用于计算平方和将运行函数的进程id写入队列
def f(x):
    queue.put(os.getpid())
    return x*x

with Pool(count*2) as p:
    # 并行计算
    res = p.map(f, range(1, 101))
    print(f'计算平方的结果是:{res}')

# 并行计算用到的进程id
pids = set()
while not queue.empty():
    pids.add(queue.get())
    
print(f'用到的进程id是: {pids}')

在代码中,我使用了 psutil.cpu_count() 函数来获取逻辑CPU的个数它把“count*2”作为参数传递给Pool()类并以逻辑CPU两倍作为最大创建进程数量从而计算1-100的平方。

这里有两点需要你注意。第一,psutils是process and system utilities的缩写所以它除了获取逻辑CPU数量外还可以获取内存、硬盘、网络连接等和操作系统相关的信息。如果你在工作中需要取得操作系统的运行状态就可以采用psutils包。

第二psutils是第三方库因此在Windows上你需要通过cmd命令行执行pip3 install psutil安装后才能释放psutils包否则会出现模块无法找到的错误。

由于map()函数的第二个参数可能会被传入不可迭代对象这时有可能会导致只运行了一个进程因此我就在多进程执行过程中增加了记录进程ID的功能。而在这一功能中我使用的是**os库、队列库和集合数据类型******按照下面三个步骤来实现对所有创建的进程ID的统计。

首先,使用os库的getpid()函数获取进程ID。

由于map()函数会根据Pool()类的参数事先创建好指定数量的进程而每次运行f()函数都在创建好的进程中执行所以我就采用os库的getpid()函数取得运行f()函数进程的唯一标识这就是使用os库的用途。

接下来,使用队列库存储每次运行进程的ID

为了把每次运行的进程ID存到一个对象中我使用了multiprocessing库的队列包。因为在多进程的程序中不能采用标准数据类型来传递数据所以multiprocessing库还提供了方便进程间通信的对象——Queue队列。

map()函数每执行一次f()函数我就把进程ID作为队列的put()函数的参数并把进程ID放入队Queue中直到所有的f()函数执行完成队列里就会记录每次执行的进程ID信息。

最后,使用集合数据类型存储本次f()函数运行的所有进程ID

为了实现这一功能我需要通过while循环结构根据队列不为空的条件把队列中的进程ID使用get()函数取出来放入pids变量中。

pids变量是集合数据类型集合是一个无序的不重复元素序列需要使用set()创建。你可以把集合当作一个只有键没有值的字典来记忆,它的特点是集合里的元素不能重复。

由于f()函数会多次在一个进程中执行因此在队列中会记录重复的进程ID我把进程ID从队列中取出后放入集合数据类型中自己就不用编写程序自动把重复的进程ID去掉了。而且通过对集合pids中的进程ID进行输出可以看到进程ID的数量刚好和Pool()类指定的并行进程数量相等。

这种用法是我经常在进行多进程程序调试的一种简单用法,我还会把它们的结果写入文件保存,以便程序出现异常执行结果时,可以根据调试的信息进行问题的定位。

统计程序运行的时间

我们除了需要掌握判断程序的并行度外还可以统计并行计算比顺序计算节省了多少时间。那么再遇到相同场景的时候你可以选择并行方式来运行程序提高工作效率。接下来我来教你怎样统计Python程序运行的时间。

在Python中我们可以利用time库的time()函数,来记录当前时间的功能。

  • 首先需要在统计时间代码的前后各增加一次time.time()函数并把它们统计时间的结果存放在time1、time2两个不同的变量中。
  • 然后再把两个变量相减,这样就能取得程序的运行时间了。

我把核心实现代码写在下面供你参考。

# 并行计算时间统计 
with Pool(4) as p:
    # 并行计算
    time1 = time.time()
    res = p.map(f, range(1, 10001))
    time2 = time.time()
    # print(f'计算平方的结果是:{res}')

print(str(time2-time1))


# 串行计算时间统计
list1 = []

time1 = time.time()
for i in range(1, 10001):
    list1.append(f(i))
time2 = time.time()

print(str(time2-time1))

在这段代码中通过time1和time2的时间差就可以得到程序运行的时间了那么根据运行时间我们可以把并行程序和串行程序执行时间的性能进行对比。

这里你需要注意由于计算平方的CPU开销较小比较难体现并行计算的优势你就可以采用并行访问网页或其他CPU开销较高的程序这样会让两个程序的时间差别更加明显。

总结

在最后,我来为你总结一下实现并行计算的基本方法和三个注意事项。

通过multiprocessing的Pool包可以实现基于进程的并行计算功能Pool包的map()函数会根据Pool包指定的进程数量实现并行运行。这里还有三点需要你注意

  1. 作为map()函数的第一个参数你需要传递函数对象f不能传递函数的调用f()形式,这是初学者实现并行任务最容易出现的错误。
  2. 为了让并行度更适合你的电脑应该根据逻辑CPU的个数设置并行度并根据运行时间来对并行数量进一步优化。
  3. 实现并行计算任务的程序除了使用多进程模型外还可以使用多线程模型。多进程的并行计算更适用于计算密集型应用即程序运行过程中主要为计算类CPU开销大的程序多线程模型适合I/O密集型的应用例如:通过互联网进行批量网页访问和下载。如果你想将多进程的并发模型改为多线程的并发模型只需在导入库的时候将“multiprocessing”改为“multiprocessing.dummy”就能实现多线程并行访问网页。我将多进程和多线程两种方式导入库的代码贴在下方供你参考。
# 多进程模型
from multiprocessing import Pool

# 多线程模型
from multiprocessing.dummy import Pool

# multiprocessing.dummy的Pool用法和multiprocessing库相同

我把这节课的相关代码放在了GitHub上,你可以自行查找、学习。

思考题

我为你留一道思考题有一个软件包requests可以通过requests.get('http://www.baidu.com').text 方式访问一个网站并能够得到网页的源代码。假设我为你提供了几十个需要访问的网站你是如何实现这些网站的并行访问的你又能否通过Python对比出逐个访问网页的时间是并行访问的几倍吗