Python多进程并发与同步机制超详细讲解
在《多线程与同步》中介绍了多线程及存在的问题,而通过使用多进程而非线程可有效地绕过全局解释器锁。 因此,通过multiprocessing模块可充分地利用多核CPU的资源。
多进程
多进程是通过multiprocessing包来实现的,multiprocessing.Process对象(和多线程的threading.Thread类似)用来创建一个进程对象:
- 在类UNIX平台上,需要对每个Process对象调用join()方法 (实际上等同于wait)避免其成为僵尸进程。
- multiprocessing提供了threading包中没有的IPC(比如Pipe和Queue),效率上更高。应优先考虑Pipe和Queue,避免使用Lock/Event/Semaphore/Condition等同步方式。
- 多进程应尽量避免共享资源。必要时可以通过共享内存和Manager的方法来共享资源。
僵尸进程
在unix或unix-like系统中,当一个子进程退出后,它就会变成一个僵尸进程,如果父进程没有通过wait系统调用来读取这个子进程的退出状态的话,这个子进程就会一直维持僵尸进程状态(占据部分系统资源,无法释放)。
要清除僵尸进程,有:
结束父进程(一般是主进程):当父进程退出的时候僵尸进程也会被随之清除。
读取子进程退出状态:如通过multiprocessing.Process产出的进程可以:
- 调用join()来等待子进程的方法来(内部会wait子进程);
- 在父进程中处理SIGCHLD信号:在处理程序中调用wait系统调用或者直接设置为SIG_IGN来清除僵尸进程;
把进程变成孤儿进程,这样进程就会自动交由init进程来自动处理。
通过设定signal.signal(signal.SIGCHLD, signal.SIG_IGN)
或join进程可避免僵尸进程的产生
def zombieProc(): print("zombie running") time.sleep(5) print("zombie exit") if __name__ == '__main__': signal.signal(signal.SIGCHLD, signal.SIG_IGN) proc = multiprocessing.Process(target=zombieProc) proc.start() # proc.join() time.sleep(30)
Process类
Process([group [, target [, name [, args [, kwargs]]]]])
,实例化得到的对象,表示一个子进程任务:
- group参数未使用,值始终为None;
- target表示调用对象,即子进程要执行的任务;
- args表示调用对象的位置参数元组,args=(1, ‘test’, [‘one’]);
- kwargs表示调用对象的字典参数,kwargs={‘name’:‘mike’,‘age’:18};
- name为子进程的名称;
Process类的属性与方法:
- start():启动进程,并执行其run方法;
- run():进程启动时运行的方法,继承Process类时必须要实现方法;
- terminate():强制终止进程,不会进行任何清理操作(若p创建了子进程,则子进程就成了僵尸进程);如进程还持有锁等,那么也不会被释放,进而导致死锁;
- is_alive():返回进程是否在运行状态;
- join([timeout]):等待进程终止;
- daemon:默认值为False,如果设为True,代表为守护进程(当父进程终止时,随之终止;并且不能创建自己的新进程),必须在start()之前设置;
- name:进程的名称;
- pid/ident:进程的pid;
- exitcode:进程在运行时为None、如果为–N,表示被信号N结束;
- authkey:进程的身份验证码(默认是由os.urandom()随机生成的32字符的字符串),在涉及网络连接的底层进程间通信时提供安全性;
也可通过os.getpid()
获取进程的PID,os.getppid()
获取父进程的PID。
函数方式
通过Process类直接运行函数:
def simpleRoutine(name, delay): print(f"routine {name} starting...") time.sleep(delay) print(f"routine {name} finished") if __name__ == '__main__': thrOne = multiprocessing.Process(target=simpleRoutine, args=("First", 1)) thrTwo = multiprocessing.Process(target=simpleRoutine, args=("Two", 2)) thrOne.start() thrTwo.start() thrOne.join() thrTwo.join()
继承方式
通过继承Process类,并实现run方法来启动进程:
class SimpleProcess(multiprocessing.Process): def __init__(self, name, delay): super().__init__() self.name = name self.delay = delay def run(self): print(f"Process {self.name} starting...") time.sleep(self.delay) print(f"Process {self.name} finished") if __name__ == '__main__': thrOne = SimpleProcess("First", 2) thrTwo = SimpleProcess("Second", 1) thrOne.start() thrTwo.start() thrOne.join() thrTwo.join()
同步机制
进程间同步与线程间同步类似(只是所有对象都在multiprocessing模块中):
- Lock/Rlock: 通过acquire()和release()来获取与释放锁;
- Event: 事件信号,通过set()和clear()来设定与清楚信号;通过wait()来等待信号;
- Condition: 条件变量;通过wait()用来等待条件,通过notify/notify_all()来通知等待此条件的进程(等待与通知前,都需先持有锁);
- Semaphore: 信号量;维护一个计数器;
- Barrier: 屏障;只有等待进程数量达到要求数量,才会同时开始执行屏障保护后的代码。
屏障示例:
def waitBarrier(name, barr: multiprocessing.Barrier): print(f"{name} waiting for open") try: barr.wait() print(f"{name} running") time.sleep(2) except multiprocessing.BrokenBarrierError: print(f"{name} exception") print(f"{name} finished") def openFun(): # 屏障满足条件时,执行一次 print("barrier opened") if __name__ == '__main__': signal = multiprocessing.Barrier(5, openFun) for i in range(10): multiprocessing.Process(target=waitBarrier, args=(i, signal)).start() time.sleep(1)
当第5个进程启动时,前面5个进程会同时开始执行(openFun函数会执行一次);当第10个进程启动时,后面5个进程会同时开始执行一次(openFun函数又会执行一次)。
状态管理Managers
Managers提供了一种创建由多进程(包括跨机器间进程共享)共享的数据的方式:
multiprocessing.Manager()
返回一个SyncManager对象;此对象对应着一个管理者子进程(manager process)以及代理(其他子进程使用);- 它确保当某一进程修改了共享对象之后,其他进程中的共享对象也会得到更新;
- 其支持的类型有:list、dict、Namespace、Lock、RLock、Semaphore、BoundedSemaphore、Condition、Event、Queue、Value和Array。
多进程进共享字典与列表(每个进程中都能看到其他进程修改过的内容)
def worker(dictContext: dict, lstContext: list, name): pid = os.getpid() dictContext[name] = pid lstContext.append(pid) print(f"{name} worker: {lstContext}") def managerContext(): mgr = multiprocessing.Manager() multiprocessing.managers dictContext = mgr.dict() lstContext = mgr.list() jobs = [multiprocessing.Process(target=worker, args=(dictContext, lstContext, i)) for i in range(10)] for j in jobs: j.start() for j in jobs: j.join() print('Results:', dictContext)
到此这篇关于Python多进程并发与同步机制超详细讲解的文章就介绍到这了,更多相关Python多进程并发内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
使用fiddler抓包工具Python requests报错:ValueError: check_h
这篇文章主要介绍了使用fiddler抓包工具Python requests报错:ValueError: check_hostname requires server_hostname的解决,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教2023-12-12
最新评论