python多进程控制学习小结

 更新时间:2018年10月31日 09:27:44   作者:青云  
这篇文章主要介绍了python多进程控制学习小结,想要充分利用多核CPU资源,Python中大部分情况下都需要使用多进程,Python中提供了multiprocessing这个包实现多进程。感兴趣的小伙伴们可以参考一下

前言:

python多进程,经常在使用,却没有怎么系统的学习过,官网上面讲得比较细,结合自己的学习,整理记录下官网:https://docs.python.org/3/library/multiprocessing.html

multiprocessing简介

multiprocessing是python自带的多进程模块,可以大批量的生成进程,在服务器为多核CPU时效果更好,类似于threading模块。相对于多线程,多进程由于独享内存空间,更稳定安全,在运维里面做些批量操作时,多进程有更多适用的场景

multiprocessing包提供了本地和远程两种并发操作,有效的避开了使用子进程而不是全局解释锁的线程,因此,multiprocessing可以有效利用到多核处理

Process类

在multiporcessing中,通过Process类对象来批量产生进程,使用start()方法来启动这个进程

1.语法

multiprocessing.Process(group=None,target=None,name=None,args=(),kwargs={},*)

  • group: 这个参数一般为空,它只是为了兼容threading.Tread
  • target: 这个参数就是通过run()可调用对象的方法,默认为空,表示没有方法被调用
  • name: 表示进程名
  • args: 传给target调用方法的tuple(元组)参数
  • kwargs: 传给target调用方法的dict(字典)参数

2.Process类的方法及对象

run()
该方法是进程的运行过程,可以在子类中重写此方法,一般也很少去重构

start()
启动进程,每个进程对象都必须被该方法调用

join([timeout])
等待进程终止,再往下执行,可以设置超时时间

name
可以获取进程名字,多个进程也可以是相同的名字

is_alive()
返回进程是否还存活,True or False,进程存活是指start()开始到子进程终止

daemon
守护进程的标记,一个布尔值,在start()之后设置该值,表示是否后台运行
注意:如果设置了后台运行,那么后台程序不运行再创建子进程

pid
可以获取进程ID

exitcode
子进程退出时的值,如果进程还没有终止,值将是None,如果是负值,表示子进程被终止

terminate()
终止进程,如果是Windows,则使用terminateprocess(),该方法对已经退出和结束的进程,将不会执行

以下为一个简单的例子:

#-*- coding:utf8 -*- 
import multiprocessing
import time

def work(x):
  time.sleep(1)
  print time.ctime(),'这是子进程[{0}]...'.format(x)

if __name__ == '__main__':
  for i in range(5):
    p = multiprocessing.Process(target=work,args=(i,))
    print '启动进程数:{0}'.format(i)
    p.start()
    p.deamon = True

当然也可以显示每个进程的ID

#-*- coding:utf8 -*- 
import multiprocessing
import time
import os

def work(x):
  time.sleep(1)
  ppid = os.getppid()
  pid = os.getpid()
  print time.ctime(),'这是子进程[{0},父进程:{1},子进程:{2}]...'.format(x,ppid,pid)

if __name__ == '__main__':
  for i in range(5):
    p = multiprocessing.Process(target=work,args=(i,))
    print '启动进程数:{0}'.format(i)
    p.start()
    p.deamon = True

但在实际使用的过程中,并不只是并发完就可以了,比如,有30个任务,由于服务器资源有限,每次并发5个任务,这里还涉及到30个任务怎么获取的问题,另外并发的进程任务执行时间很难保证一致,尤其是需要时间的任务,可能并发5个任务,有3个已经执行完了,2个还需要很长时间执行,总不能等到这两个进程执行完了,再继续执行后面的任务,因此进程控制就在此有了使用场景,可以利用Process的方法和一些multiprocessing的包,类等结合使用

进程控制及通信常用类

一、Queue类

类似于python自带的Queue.Queue,主要用在比较小的队列上面

语法:

multiprocessing.Queue([maxsize])

类方法:

qsize()
返回队列的大致大小,因为多进程或者多线程一直在消耗队列,因此该数据不一定正确

empty()
判断队列是否为空,如果是,则返回True,否则False

full()
判断队列是否已满,如果是,则返回True,否则False

put(obj[, block[, timeout]])
将对象放入队列,可选参数block为True,timeout为None

get()
从队列取出对象

#-*- coding:utf8 -*-
from multiprocessing import Process, Queue

def f(q):
  q.put([42,None,'hi'])

if __name__ == '__main__':
  q = Queue()
  p = Process(target=f, args=(q,))
  p.start()
  print q.get() #打印内容: [42,None,'hi']
  p.join()

二、Pipe类

pipe()函数返回一对对象的连接,可以为进程间传输消息,在打印一些日志、进程控制上面有一些用处,Pip()对象返回两个对象connection,代表两个通道,每个connection对象都有send()和recv()方法,需要注意的是两个或以上的进程同时读取或者写入同一管道,可能会导致数据混乱,测试了下,是直接覆盖了。另外,返回的两个connection,如果一个是send()数据,那么另外一个就只能recv()接收数据了

#-*- coding:utf8 -*-
from multiprocessing import Process, Pipe
import time
def f(conn,i):
  print '[{0}]已经执行到子进程:{1}'.format(time.ctime(),i)
  time.sleep(1)
  w = "[{0}]hi,this is :{1}".format(time.ctime(),i)
  conn.send(w)
  conn.close()

if __name__ == '__main__':
  reader = []
  parent_conn, child_conn = Pipe()
  for i in range(4):
    p = Process(target=f, args=(child_conn,i))
    p.start()
    reader.append(parent_conn)
    p.deamon=True

  # 等待所有子进程跑完
  time.sleep(3)
  print '\n[{0}]下面打印child_conn向parent_conn传输的信息:'.format(time.ctime())
  for i in reader:
    print i.recv()

输出为:

三、Value,Array

在进行并发编程时,应尽量避免使用共享状态,因为多进程同时修改数据会导致数据破坏。但如果确实需要在多进程间共享数据,multiprocessing也提供了方法Value、Array

from multiprocessing import Process, Value, Array

def f(n, a):
  n.value = 3.1415927
  for i in range(len(a)):
    a[i] = -a[i]

if __name__ == '__main__':
  num = Value('d',0.0)
  arr = Array('i', range(10))

  p = Process(target=f, args=(num, arr))
  p.start()
  p.join()

  print num.value
  print arr[:]

*print
3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]*

四、Manager进程管理模块

Manager类管理进程使用得较多,它返回对象可以操控子进程,并且支持很多类型的操作,如: list, dict, Namespace、lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value, Array,因此使用Manager基本上就够了

from multiprocessing import Process, Manager

def f(d, l):
  d[1] = '1'
  d['2'] = 2
  d[0.25] = None
  l.reverse()

if __name__ == '__main__':
  with Manager() as manager:
    d = manager.dict()
    l = manager.list(range(10))

    p = Process(target=f, args=(d, l))
    p.start()
    p.join() #等待进程结束后往下执行
    print d,'\n',l

输出:
{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

可以看到,跟共享数据一样的效果,大部分管理进程的方法都集成到了Manager()模块了

五、对多进程控制的应用实例

  #-*- coding:utf8 -*-
  from multiprocessing import Process, Queue
  import time
  
  def work(pname,q):
    time.sleep(1)
    print_some = "{0}|this is process: {1}".format(time.ctime(),pname)
    print print_some
    q.put(pname)
  
  if __name__ == '__main__':
    p_manag_num = 2 # 进程并发控制数量2
    # 并发的进程名
    q_process = ['process_1','process_2','process_3','process_4','process_5']
    q_a = Queue() # 将进程名放入队列
    q_b = Queue() # 将q_a的进程名放往q_b进程,由子进程完成
  
    for i in q_process:
      q_a.put(i)
  
    p_list = [] # 完成的进程队列
    while not q_a.empty():
      if len(p_list) <= 2:
        pname=q_a.get()
        p = Process(target=work, args=(pname,q_b))
        p.start()
        p_list.append(p)
        print pname
  
      for p in p_list:
        if not p.is_alive():
          p_list.remove(p)
  
    # 等待5秒,预估执行完后看队列通信信息
    # 当然也可以循环判断队列里面的进程是否执行完成
    time.sleep(5)
    print '打印p_b队列:'
    while not q_b.empty():
      print q_b.get()

执行结果:

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

相关文章

  • 解决Python3 struct报错argument for 's' must be a bytes object

    解决Python3 struct报错argument for 's'&

    这篇文章主要为大家介绍了解决Python3 struct报错argument for 's' must be a bytes object方法详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-08-08
  • TensorFlow和keras中GPU使用的设置操作

    TensorFlow和keras中GPU使用的设置操作

    这篇文章主要介绍了TensorFlow和keras中GPU使用的设置操作,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-05-05
  • Python中threading库实现线程锁与释放锁

    Python中threading库实现线程锁与释放锁

    threading用于提供线程相关的操作,为了保证安全的访问一个资源对象,我们需要创建锁。那么Python线程锁与释放锁如何实现,感兴趣的小伙伴们可以参考一下
    2021-05-05
  • Python命令行解析器argparse详解

    Python命令行解析器argparse详解

    大家好,本篇文章主要讲的是Python命令行解析器argparse详解,感兴趣的同学赶快来看一看吧,对你有帮助的话记得收藏一下,方便下次浏览
    2022-01-01
  • windows+vscode安装paddleOCR运行环境的步骤

    windows+vscode安装paddleOCR运行环境的步骤

    这篇文章主要介绍了windows+vscode安装paddleOCR运行环境,本文通过图文并茂的形式给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-11-11
  • Python hashlib模块用法实例分析

    Python hashlib模块用法实例分析

    这篇文章主要介绍了Python hashlib模块用法,结合实例形式分析了Python使用hash模块进行md5、sha1、sha224、sha256、sha512等加密运算相关操作技巧与注意事项,需要的朋友可以参考下
    2018-06-06
  • windows上安装Anaconda和python的教程详解

    windows上安装Anaconda和python的教程详解

    本文主要给大家介绍windows上安装Anaconda和python的教程详解,非常不错,具有参考借鉴价值,需要的朋友参考下
    2017-03-03
  • Python各种扩展名区别点整理

    Python各种扩展名区别点整理

    在本篇文章里小编给大家整理的是关于Python各种扩展名区别点整理,需要的朋友们可以学习下。
    2020-02-02
  • 详解python:time模块用法

    详解python:time模块用法

    这篇文章主要介绍了python:time模块用法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-03-03
  • django drf框架中的user验证以及JWT拓展的介绍

    django drf框架中的user验证以及JWT拓展的介绍

    这篇文章主要介绍了django drf框架中的user验证以及JWT拓展的介绍,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-08-08

最新评论