Python多进程编程

纸上得来终觉浅,绝知此事要躬行。

Python多进程编程


1. 多进程编程

由于全局解释锁(GIL)的问题,多线程并不能充分利用多核处理器,如果是一个CPU计算型的任务,应该使用多进程(multiprocessing)模块。虽然两者的工作方式并不相同,但是接口却非常相似。使用多进程模块,给每个进程赋予了单独的Python解释器,这样就规避了全局解释锁带来的问题。

1.1 多进程的使用方式

我们这里只介绍一下基本的使用方式。

  • 目标函数不需要传递参数
from multiprocessing import Process

def worker():
    print('Worker')

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = Process(target=worker)
        jobs.append(p)
        p.start()
$ python multiprocessing.py
Worker
Worker
Worker
Worker
Worker
  • 目标函数可传入参数
from multiprocessing import Process

def worker(num):
    print(f'Worker: {num}')

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()
$ python multiprocessing.py
Worker: 0
Worker: 1
Worker: 2
Worker: 3
Worker: 4

1.2 守护和非守护线程

类似于多线程一样也是可以设置守护进程的,这个守护进程也是可以一直运行且不影响主程序的结束。如果主程序结束了,会随着主程序一起结束。

  • 可以看到如下的守护进程,主程序结束了还没有运行完成,随着主程序一起结束了。
from time import sleep
from multiprocessing import Process, current_process


def daemon():
    p = current_process()
    print(f'Starting: {p.name} {p.pid}')
    sleep(2)
    print('Exiting :', p.name, p.pid)


def non_daemon():
    p = current_process()
    print(f'Starting: {p.name} {p.pid}')
    print('Exiting :', p.name, p.pid)


if __name__ == '__main__':
    d =Process(name='daemon', target=daemon, daemon=True)
    n =Process(name='non-daemon', target=non_daemon)
    d.start()
    sleep(1)
    n.start()
$ python multiprocessing.py
Starting: daemon 41295
Starting: non-daemon 41296
Exiting : non-daemon 41296
  • 当然也是可以设置join方法设置超时参数的,让主程序等待守护进程执行完毕再结束程序。
from time import sleep
from multiprocessing import Process, current_process


def daemon():
    p = current_process()
    print(f'Starting: {p.name} {p.pid}')
    sleep(2)
    print('Exiting :', p.name, p.pid)


def non_daemon():
    p = current_process()
    print(f'Starting: {p.name} {p.pid}')
    print('Exiting :', p.name, p.pid)


if __name__ == '__main__':
    d = Process(name='daemon', target=daemon, daemon=True)
    n = Process(name='non-daemon', target=non_daemon)

    d.start()
    sleep(1)
    n.start()

    d.join()
    n.join()
$ python multiprocessing.py
Starting: daemon 39312
Starting: non-daemon 39318
Exiting : non-daemon 39318
Exiting : daemon 39312
  • join方法不带参数的情况下,默认为None,表示一直会阻塞下去。
from time import sleep
from multiprocessing import Process, current_process


def daemon():
    p = current_process()
    print(f'Starting: {p.name} {p.pid}')
    sleep(2)
    print('Exiting :', p.name, p.pid)


def non_daemon():
    p = current_process()
    print(f'Starting: {p.name} {p.pid}')
    print('Exiting :', p.name, p.pid)


if __name__ == '__main__':
    d = Process(name='daemon', target=daemon, daemon=True)
    n = Process(name='non-daemon', target=non_daemon)

    d.start()
    sleep(1)
    n.start()

    d.join(1)
    
    print('d.is_alive()', d.is_alive())
    n.join()
$ python multiprocessing.py
Starting: daemon 41297
Starting: non-daemon 41298
Exiting : non-daemon 41298
d.is_alive() True

2. 同步机制

multiprocessingLockConditionEventRLockSemaphore等同步原语和threading模块的API风格是一样的,用法也类似,就不展开了。

>>> dir(multiprocessing)
['Array', 'Value',,
 'context', 'cpu_count', 'current_process'
 'Condition', 'Event', 'Lock', 'RLock', 'Semaphore',
 'Queue', 'SimpleQueue', 'JoinableQueue',
 'Manager', 'managers',
 'Pipe', 'Process', 'Pool']

2.1 信号量 - Semaphore

  • 信号量同步基于内部的计数器,每调用一次acquire计数器就会减1,表示获取了一个锁。每调用一次release计数器就会加1,表示释放了这个锁。当计数器为0的时候,acquire的调用就会被阻塞。
import time
import random
import multiprocessing


sema = multiprocessing.Semaphore(3)


def limit_run(seam, num):
    t = multiprocessing.current_process()
    with seam:
        print(f'[{t.pid}]: {num} is acquire sema.')
        sleep_time = random.random() * 2
        time.sleep(sleep_time)
    print(f'[{t.pid}]: {num} is release sema.')


process = []
for num in range(1, 6):
    p = multiprocessing.Process(name="limit_run", target=limit_run, args=(sema, num))
    process.append(p)
    p.start()

for p in process:
    p.join()
$ python multiprocessing.py
[82222]: 1 is acquire sema.
[82223]: 2 is acquire sema.
[82224]: 3 is acquire sema.
[82223]: 2 is release sema.
[82225]: 4 is acquire sema.
[82224]: 3 is release sema.
[82226]: 5 is acquire sema.
[82222]: 1 is release sema.
[82225]: 4 is release sema.
[82226]: 5 is release sema.

2.2 锁 - Lock

  • 互斥锁,即相对于信号量值为1Semaphore,表示同一时刻只能有一个线程来访问这个资源。但是使用了锁会损失一定的性能,因为需要其他线程等待锁的释放。

不加锁的示例

import time
import multiprocessing as mp
from multiprocessing.sharedctypes import Value


value = Value('i', 0)


def getlock():
    global value
    new = value.value + 1
    time.sleep(0.001)
    value.value = new


process = []
for i in range(100):
    p = mp.Process(name="getlock", target=getlock)
    p.start()
    process.append(p)


for p in process:
    p.join()
print(value.value)
$ python multiprocessing.py
91

加了锁的示例

import time
import multiprocessing as mp
from multiprocessing.sharedctypes import Value


lock = mp.Lock()
value = Value('i', 0)


def getlock():
    global value
    with lock:
        new = value.value + 1
        time.sleep(0.001)
        value.value = new


process = []
for i in range(100):
    p = mp.Process(name="getlock", target=getlock)
    p.start()
    process.append(p)


for p in process:
    p.join()
print(value.value)
$ python multiprocessing.py
100

2.3 可重入锁 - RLock

  • 可重入锁就是acquire方法能够不被阻塞的被一个线程重复执行多次,但是需要注意的是release需要调用和acquire相同的次数才能够释放锁。
import multiprocessing

lock = multiprocessing.RLock()
print('First try: ', lock.acquire())
print('Second try: ', lock.acquire(0))
$ python multiprocessing.py
First try: True
Second try: True

2.4 条件 - Condition

  • 接收条件,一个线程等待某种特定的条件,而另一个线程会发出满足这个特定条件的信号。这个同步机制最好的示例说明就是「生产者-消费者」模型。
import time
import multiprocessing as mp


cond = mp.Condition()


def cusumer(cond):
    t = mp.current_process()
    with cond:
        cond.wait()
        print('{t.name}: consumer is start...'.format(t=t))


def product(cond):
    t = mp.current_process()
    with cond:
        print('{t.name}: producer is start...'.format(t=t))
        cond.notify_all()


process = []
for num in range(2):
    c = mp.Process(name='cusumer', target=cusumer, args=(cond,))
    c.start()
    process.append(c)

time.sleep(2)
for num in range(1):
    p = mp.Process(name='product', target=product, args=(cond,))
    p.start()
    process.append(p)

for t in process:
    t.join()
$ python multiprocessing.py
product: producer is start...
cusumer: consumer is start...
cusumer: consumer is start...

2.5 事件 - Event

  • 事件模型,一个线程等待某种特定的条件,而另一个线程会发出满足这个特定条件的信号,最好的示例说明也是「生产者-消费者」模型。事件和条件是不同,在Condition条件中一个条件发出之后,所有接受这个条件的子线程都会处理,但是在Event事件中则是谁接收到谁来处理。
import time
import random
import multiprocessing as mp


event = mp.Event()
manager = mp.Manager()


def consumer(event, q):
    t = mp.current_process()
    while True:
        event_is_set = event.wait(10)
        if event_is_set:
            try:
                integer = q.pop()
                print(f'{integer} popped from list by {t.name}')
                event.clear()
            except IndexError:
                pass


def producer(event, q):
    t = mp.current_process()
    while True:
        integer = random.randint(10, 100)
        q.append(integer)
        print(f'{integer} appended to list by {t.name}')
        event.set()
        time.sleep(1)


threads = []
q = manager.list()

for name in ('consumer1', 'consumer2'):
    c = mp.Process(name=name, target=consumer, args=(event, q))
    print(f'{name} is starting...')
    c.start()
    threads.append(c)

for name in ('producer',):
    p = mp.Process(name=name, target=producer, args=(event, q))
    print(f'{name} is starting...')
    p.start()
    threads.append(p)

for t in threads:
    t.join()
$ python multiprocessing.py
consumer1 is starting...
consumer2 is starting...
producer is starting...
70 appended to list by producer
70 popped from list by consumer2
59 appended to list by producer
59 popped from list by consumer1
40 appended to list by producer
40 popped from list by consumer2
88 appended to list by producer
88 popped from list by consumer2
......

3. 进程间共享状态

多进程模块中提供了进程间共享状态的方案,有三种方案,分别是QueueArrayManager。而共享的意思就是可以在多个进程之前共享数据,如我在一个进程中修改了对应的值,在另一个进程中立马就可以看到修改之后的结果了。在使用多进程的过程中,最好不要使用共享资源,因为普通的全局变量是不能被子进程所共享的,只有通过Multiprocessing组件构造的数据结构可以被共享。

共享方式 对应的函数 适用范围
Queue 使用Multiprocessing.Queue类 只适用于Process类
sharedctypes 使用Multiprocessing.sharedctypes类 只适用于Process类
Manager 使用Multiprocessing.Manager类 可以适用于Pool类

3.1 内存共享 - sharedctypes

内存共享主要是靠多进程模块中sharedctypesValueArray实现的。

  • 常见的共享类型
    • 可以使用缩写也可以使用全称
In [1]: from multiprocessing.sharedctypes import typecode_to_type

In [2]: typecode_to_type
Out[2]:
{'B': ctypes.c_ubyte,
 'H': ctypes.c_ushort,
 'I': ctypes.c_uint,
 'L': ctypes.c_ulong,
 'b': ctypes.c_byte,
 'c': ctypes.c_char,
 'd': ctypes.c_double,
 'f': ctypes.c_float,
 'h': ctypes.c_short,
 'i': ctypes.c_int,
 'l': ctypes.c_long,
 'u': ctypes.c_wchar}
  • 共享内存实例说明
    • 可以看到我们通过modify这个函数对传入的值进行了修改,并且还可以给ValueArray传递lock参数来决定是否带锁,默认为不带锁。这样就实现了进程之间共享状态了。
    • 需要注意的是并不是能够用于typecode_to_type列出来的这些类型,只要是ctypes里面定义的都是可以使用。
from multiprocessing import Process, Lock
from multiprocessing.sharedctypes import Value, Array
from ctypes import Structure, c_bool, c_double

lock = Lock()

class Point(Structure):
    _fields_ = [('x', c_double), ('y', c_double)]

def modify(n, b, s, arr, A):
    n.value **= 2
    b.value = True
    s.value = s.value.upper()
    arr[0] = 10
    for a in A:
        a.x **= 2
        a.y **= 2

n = Value('i', 7)
b = Value(c_bool, False, lock=False)
s = Array('c', b'hello world', lock=lock)
arr = Array('i', range(5), lock=True)
A = Array(Point, [(1.875, -6.25), (-5.75, 2.0)], lock=lock)

p = Process(target=modify, args=(n, b, s, arr, A))
p.start()
p.join()

print n.value
print b.value
print s.value
print arr[:]
print [(a.x, a.y) for a in A]
$ python multiprocessing.py
49
Trueb
'HELLO WORLD'
[10, 1, 2, 3, 4]
[(3.515625, 39.0625), (33.0625, 4.0)]

3.2 服务器进程 - Manager

一个multiprocessingManager对象会控制了服务器进程,其他进程可以通过代理的方式来访问这个服务的进程。

  • [1] 常见的共享方式
共享方式
Namespace 创建一个可分享的命名空间
Value/Array 和上面共享ctypes对象的方式一样
dict/list 创建一个可分享的dict/list并支持对应方法
Condition/Event/Lock/Queue/Semaphore 创建一个可分享的对应同步原语的对象
>>> dir(multiprocessing.Manager())
['Namespace', 'Array', 'Value',
 'dict', 'list',
 'Condition', 'Event', 'Lock', 'RLock', 'Semaphore',
 'Queue', 'JoinableQueue', 'Pool',
 'address', 'connect', 'get_server', 'register',
 'start', 'join', 'shutdown']
  • 共享方式实例说明
    • 创建了一个叫做ns的可分享的命名空间,里面有一个元素是a
    • 创建了一个叫做lproxy的可分享的列表,里面有一个元素是a
    • 创建了一个叫做dproxy的可分享的列表,里面有一个元素是b
    • 其中,p.pid是可以拿到这个进程的对应系统进程的名称的,之后再通过modify这个函数对原有的值进行改变,输出结果。
from multiprocessing import Manager, Process

def modify(ns, lproxy, dproxy):
    ns.a **= 2
    lproxy.extend(['b', 'c'])
    dproxy['b'] = 0

manager = Manager()
ns = manager.Namespace()
ns.a = 1
lproxy = manager.list()
lproxy.append('a')
dproxy = manager.dict()
dproxy['b'] = 2

p = Process(target=modify, args=(ns, lproxy, dproxy))p.start()
print(f'PID: {p.pid}') p.join()
print(ns.a)
print(lproxy)
print(dproxy)
$ python multiprocessing.py
PID: 45121
1
['a', 'b', 'c']
{'b': 0}
  • [2] 分布式的进程间通信
    • 即我们这里使用C/S架构,服务端有一个分享的列表和一个get_list的一个方法来获取分享的列表的值。之后将服务跑起来,客户端就可以使用服务端的get_list来查看和修改分享的列表的值了。
    • 注意在客户端注册的时候,并没有使用这个callable的参数,请多多留意一下哈。
    • 可以通过多进程模块的这个功能做一个分布式的作业调度系统或者服务之间的心跳监测等等,要多多发掘其中的潜能。
# 服务端代码
from multiprocessing.managers import BaseManager

host = '127.0.0.1'
port = 9030
authkey = b'secret'

shared_list = []


class RemoteManager(BaseManager):
    pass


RemoteManager.register('get_list', callable=lambda: shared_list)
mgr = RemoteManager(address=(host, port), authkey=authkey)
server = mgr.get_server()
server.serve_forever()
$ python remote_server.py
...
# 客户端代码
from multiprocessing.managers import BaseManager

host = '127.0.0.1'
port = 9030
authkey = b'secret'


class RemoteManager(BaseManager):
    pass


RemoteManager.register('get_list')
mgr = RemoteManager(address=(host, port), authkey=authkey)
mgr.connect()

l = mgr.get_list()
print(l)
l.append(1)
print(mgr.get_list())
$ python local-client.py
[]
[1]

4. 进程池

任务的执行周期决定了CPU核数和任务的分配算法,使用多进程(Pool)是非常灵活且保障效率的方案。

4.1 队列 - Queue

多线程中有Queue模块来实现队列,而多进程中也包含了Queue这个类,它是

  • 使用两个队列,一个用于存储完成的任务,另一个存储任务完成之后的结果。其中,JoinableQueue里面有jointask_done方法,而Queue里面什么它们的。
  • 在存储完成的任务队列中使用JoinableQueue是以为提供了task_done方法能够标识该任务已经完成,通知其不要一直join阻塞着。
# 这个示例很重要,它更够帮助我们理解队列

import time
from random import random
from multiprocessing import Process, JoinableQueue, Queue


tasks_queue = JoinableQueue()
results_queue = Queue()


def double(n):
    return n * 2


def producer(in_queue):
    while 1:
        wt = random()
        time.sleep(wt)
        in_queue.put((double, wt))
        if wt > 0.9:
            in_queue.put(None)
            print('stop producer')
            break


def consumer(in_queue, out_queue):
    while 1:
        task = in_queue.get()
        if task is None:
            break
        func, arg = task
        result = func(arg)
        in_queue.task_done()
        out_queue.put(result)

processes = []

p = Process(target=producer, args=(tasks_queue,))
p.start()
processes.append(p)

p = Process(target=consumer, args=(tasks_queue, results_queue))
p.start()
processes.append(p)

tasks_queue.join()

for p in processes:
    p.join()

while 1:
    if results_queue.empty():
        break
    result = results_queue.get()
    print(f'Result: {result}')
$ python multiprocessing.py
stop producer
Result: 1.5603713848691385
Result: 0.9995048352324905
Result: 0.5281936405729699
Result: 1.9964631043908454

4.2 进程池 - Pool

多进程模块中已经为我们封装好了进程池,方便我们进行多进程编程。

  • 装饰器 lru_cache 非常适合把耗时的函数执行结果保存起来,避免传入相同参数时重复计算。
  • 这里 map 方法和平时我们使用的基本一样,只不过是这里提供了多进程的支持,提高执行效率。
from functools import lru_cache
from multiprocessing import Pool

@lru_cache(maxsize=None)
def fib(n):
    if n < 2:
        return n
    return fib(n-1) + fib(n-2)

pool = Pool(2)
pool.map(fib, [35] * 2)

4.3 dummy

在开源项目代码中,很多人都使用了 dummy 这个模块。虽然定义在多进程模块里面,但是以相同 API 实现的多线程模块。它提供一种兼容的方式,这样在多线程/多进程之间切换非常方便。

  • 使用技巧
    • 有时候,我们拿不准这个项目到底是使用多进程(CPU)还是多线程(I/O)来处理任务,且没有其它不能选择多进程方式的因素,都统一直接上多进程模式。
    • 处理这种情况还有一个简单暴力的方法,就是使用 dummy 模块实现兼容模式,看测试结果到底哪个更好就采用哪个。
进程类型 使用方式 注意事项
多进程池 from multiprocessing import Pool 绑定一个cpu核心
多线程进程池 from multiprocessing.dummy import Pool 运行于多个cpu核心
from multiprocessing import Pool
from multiprocessing.dummy import Pool

5. 参考文档

介绍一下常用的多进程模块的使用和对应的属性和方法。

5.1 Process

  • 构造方法
    • Process(group, target, name, args, kwargs, daemon)
    • group: 线程组,目前还没有实现
    • target: 要执行的方法
    • name: 指定进程名称
    • args/kwargs: 要传入方法的参数
    • daemon: 是否为守护进程
  • 属性方法
    • start(): 启动Process进程,每个进程对象最多必须调用一次
    • run(): 启动Process进程,可以在子类中覆盖此方法
    • join(): 阻塞当前上下文环境,直到调用此方法的进程终止或到达指定的超时时间
    • is_alive(): 返回进程是否在运行
    • terminate(): 不管任务是否完成,立即停止工作进程
  • 实例方法
    • name: 进程名字
    • pid: 进程号
    • daemon: 和线程的setDeamon功能一样
    • exitcode: 进程在运行时为None、如果为–N,表示被信号N结束
# 继承Process类且重构run函数
import time
from multiprocessing import Process

class MyProcess(Process):
    def __init__(self, arg):
        super(MyProcess, self).__init__()
        self.arg = arg

    def run(self):
        print 'nMask', self.arg
        time.sleep(1)

process = []
for i in range(10):
    p = MyProcess(i)
    process.append(p)
    p.start()
for p in process:
    p.join()

5.2 Pool

  • 构造方法
    • Pool(processes, initializer, initargs, maxtasksperchild, context)
    • processes: 工作进程的数量,不指定则使用os.cpu_count()返回的数量
    • maxtasksperchild: 子进程的最大任务数,用于新老交替不断更新
    • context: 可用于指定用于启动工作进程的上下文
  • 实例方法
    • map(): 阻塞;内置函数map的并行处理
    • map_async(): 非阻塞;可指定callback函数
    • apply: 阻塞;使用参数args和关键字参数kwds调用func方法
    • apply_async: 非阻塞;可指定callback函数
    • join(): 阻塞主进程等待子进程的退出,在close或terminate之后使用
    • close(): 关闭pool使其不在接受新的任务
    • terminate(): 关闭pool结束工作进程不在处理未完成的任务
# 同步进程池 => 阻塞
# 会一个接着一个的执行,输出结果为 num: 2 sleep: 2

import time
from multiprocessing import Pool

def worker(num):
    print(f'num: {num}', end=' ')
    time.sleep(num)
    print(f'sleep: {num}')

pool = Pool(processes=10)
for num in range(100):
    pool.apply(worker, args=(num,))
pool.join()
pool.close()
# 异步进程池 => 非阻塞
# 不等待,直接并发执行100子进程

from multiprocessing import Pool

def worker(num):
    print(f'num: {num}')

pool = Pool(processes=10)
for num in range(100):
    pool.apply_async(worker, args=(num,))
pool.close()
pool.join()

5.3 Queue

  • 多进程编程中常常需进程间进行消息传递,在不使用同步原语的情况下,可以使用管道队列
  • 队列Queue实现queue.Queue的所有方法,除了task_done()join()方法。
  • 队列SimpleQueue是一种简化的Queue类型,非常接近锁定的Pipe
  • 队列JoinableQueue是另外具有task_done()join()方法的队列。
消息传递 作用 多进程模型
管道 用于两个进程之间的连接 Pipe
队列 允许多个生产者和消费者 Queue(先进先出)、JoinableQueue(先进后出)

[1] Queue

  • 构造方法
    • multiprocessing.Queue(maxsize)
    • maxsize: 设置队列数量
  • 实例方法
    • qsize(): 返回队列大致大小;不可靠
    • empty(): 如果队列为空,返回True值;不可靠
    • full(): 如果队列已满,返回True值;不可靠
    • put(): 将obj放入队列
    • put_nowait(): 相当于put(obj, False)
    • get(): 从队列中删除并返回项目
    • get_nowait(): 相当于get(False)
    • close(): 指示当前进程不会在此队列上放置更多数据

[2] SimpleQueue

  • 构造方法
    • multiprocessing.SimpleQueue
    • 没有任何参数可以使用
  • 实例方法
    • put():item放入队列
    • get(): 从队列中删除并返回项目
    • empty(): 如果队列为空,返回True

[3] JoinableQueue

  • 构造方法
    • multiprocessing.JoinableQueue(maxsize)
    • maxsize: 设置队列数量
  • 实例方法
    • join(): 阻塞,直到队列中的所有项目都被获取和处理
    • task_done(): 指示以前入队的任务已完成

5.4 常用函数

  • [1] cpu_count
    • 构造方法: multiprocessing.cpu_count()
    • 函数含义: 返回系统中的CPU数
  • [2] current_process
    • 构造方法: multiprocessing.current_process()
    • 函数含义: 返回与当前进程对应的Process对象
  • [3] get_start_method
    • 构造方法: multiprocessing.get_start_method()
    • 函数含义: 获取当前系统的启动进程的方法名称
  • [4] set_start_method
    • 构造方法: multiprocessing.set_start_method()
    • 函数含义: 设置当前系统的启动进程的方法名称
  • [5] get_all_start_methods
    • 构造方法: multiprocessing.get_all_start_methods()
    • 函数含义: 返回可用的启动进程的方法名称
  • [6] set_executable
    • 构造方法: multiprocessing.set_executable()
    • 函数含义: 设置解释器在启动子进程时要使用的路径