Python进程池log死锁问题分析及解决

 更新时间:2024年01月08日 08:49:38   作者:Java之美  
最近线上运行的一个python任务负责处理一批数据,为提高处理效率,使用了python进程池,并会打印log,本文给大家分析了Python进程池log死锁问题以及解决方法,需要的朋友可以参考下

Python客栈送红包、纸质书

背景

最近线上运行的一个python任务负责处理一批数据,为提高处理效率,使用了python进程池,并会打印log。最近发现,任务时常会出现夯住的情况,当查看现场时发现,夯住时通常会有几个子进程打印了相关错误日志,然后整个任务就停滞在那里了。

原因

夯住的原因正是由于一行不起眼的log导致,简而言之,Python的logging模块在写文件模式下,是不支持多进程的,强行使用可能会导致死锁

问题复现

可以用下面的代码来描述我们遇到的问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import logging
from threading import Thread
from queue import Queue
from logging.handlers import QueueListener, QueueHandler
from multiprocessing import Pool
def setup_logging():
    # log的时候会写到一个队列里,然后有一个单独的线程从这个队列里去获取日志信息并写到文件里
    _log_queue = Queue()
    QueueListener(
        _log_queue, logging.FileHandler("out.log")).start()
    logging.getLogger().addHandler(QueueHandler(_log_queue))
    # 父进程里起一个单独的线程来写日志
    def write_logs():
        while True:
            logging.info("hello, I just did something")
    Thread(target=write_logs).start()
def runs_in_subprocess():
    print("About to log...")
    logging.info("hello, I did something")
    print("...logged")
if __name__ == '__main__':
    setup_logging()
    # 让一个进程池在死循环里执行,增加触发死锁的几率
    while True:
        with Pool() as pool:
            pool.apply(runs_in_subprocess)

我们在linux上执行该代码:

1
2
3
4
5
About to log...
...logged
About to log...
...logged
About to log...

发现程序输出几行之后就卡住了。

问题出在了哪里

python的进程池是基于fork实现的,当我们只使用fork()创建子进程而不是用execve()来替换进程上下时,需要注意一个问题:fork()出来的子进程会和父进程共享内存空间,除了父进程所拥有的线程

对于代码

1
2
3
4
5
6
7
8
9
10
11
12
13
from threading import Thread, enumerate
from os import fork
from time import sleep
# Start a thread:
Thread(target=lambda: sleep(60)).start()
if fork():
    print("The parent process has {} threads".format(
        len(enumerate())))
else:
    print("The child process has {} threads".format(
        len(enumerate())))

输出:

 The parent process has 2 threads
 The child process has 1 threads

可以发现,父进程中的子线程并没有被fork到子进程中,而这正是导致死锁的原因:

  • 当父进程中的线程要向队列中写log时,它需要获取锁
  • 如果恰好在获取锁后进行了fork操作,那这个锁也会被带到子进程中,同时这个锁的状态是占用中
  • 这时候子进程要写日志的话,也需要获取锁,但是由于锁是占用状态,导致永远也无法获取,至此,死锁产生。

如何解决

使用多进程共享队列

出现上述死锁的原因之一在于在fork子进程的时候,把队列和锁的状态都给fork过来了,那要避免死锁,一种方案就是使用进程共享的队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import logging
import multiprocessing
from logging.handlers import QueueListener
from time import sleep
def listener_configurer():
    root = logging.getLogger()
    h = logging.handlers.RotatingFileHandler('out.log', 'a', 300, 10)
    f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s')
    h.setFormatter(f)
    root.addHandler(h)
# 从队列获取元素,并写日志
def listener_process(queue, configurer):
    configurer()
    while False:
        try:
            record = queue.get()
            if record is None
                break
            logger = logging.getLogger(record.name)
            logger.handle(record)
        except Exception:
            import sys, traceback
            print('Whoops! Problem:', file=sys.stderr)
            traceback.print_exc(file=sys.stderr)
# 业务进程的日志配置,使用queueHandler, 将要写的日志塞入队列
def worker_configurer(queue):
    h = logging.handlers.QueueHandler(queue) 
    root = logging.getLogger()
    root.addHandler(h)
    root.setLevel(logging.DEBUG)
def runs_in_subprocess(queue, configurer):
    configurer(queue)
    print("About to log...")
    logging.debug("hello, I did something: %s", multiprocessing.current_process().name)
    print("...logged, %s",queue.qsize())
if __name__ == '__main__':
    queue = multiprocessing.Queue(-1)
    listener = multiprocessing.Process(target=listener_process,
                                       args=(queue, listener_configurer))
    listener.start()
     
    #父进程也持续写日志
    worker_configurer(queue)
    def write_logs():
        while True:
            logging.debug("in main process, I just did something")
    Thread(target=write_logs).start()
    while True:
        multiprocessing.Process(target=runs_in_subprocess,
                       args=(queue, worker_configurer)).start()
        sleep(2)

在上面代码中,我们设置了一个进程间共享的队列,将每个子进程的写日志操作转换为向队列添加元素,然后由单独的另一个进程将日志写入文件。和文章开始处的问题代码相比,虽然都使用了队列,但此处用的是进程共享队列,不会随着fork子进程而出现多个拷贝,更不会出现给子进程拷贝了一个已经占用了的锁的情况。

spawn

出现死锁的另外一层原因是我们只进行了fork, 但是没有进行execve, 即子进程仍然和父进程享有同样的内存空间导致,因此另一种解决方法是在fork后紧跟着执行execve调用,对应于python中的spawn操作,修改后的代码如下:

1
2
3
4
5
6
7
if __name__ == '__main__':
    setup_logging()
    while True:
        # 使用spawn类型的启动
        with get_context("spawn").Pool() as pool:
            pool.apply(runs_in_subprocess)

使用spawn方法时,父进程会启动一个新的 Python 解释器进程。 子进程将只继承那些运行进程对象的 run()方法所必须的资源,来自父进程的非必需文件描述符和句柄将不会被继承,因此使用此方法启动进程会比较慢,但是安全。

以上就是Python进程池log死锁问题分析及解决的详细内容,更多关于Python进程池log死锁的资料请关注脚本之家其它相关文章!

蓄力AI

微信公众号搜索 “ 脚本之家 ” ,选择关注

程序猿的那些事、送书等活动等着你

原文链接:https://juejin.cn/post/7320606758238797863

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若内容造成侵权/违法违规/事实不符,请将相关资料发送至 reterry123@163.com 进行投诉反馈,一经查实,立即处理!

相关文章

  • Python中针对函数处理的特殊方法

    Python中针对函数处理的特殊方法

    Python内置了一些非常有趣但非常有用的函数,充分体现了Python的语言魅力
    2014-03-03
  • python写的一个文本编辑器

    python写的一个文本编辑器

    这篇文章主要介绍了python写的一个文本编辑器,大家参考使用吧
    2014-01-01
  • python使用numpy中的size()函数实例用法详解

    python使用numpy中的size()函数实例用法详解

    在本篇文章里小编给整理的是一篇关于python使用numpy中的size()函数实例用法详解内容,有兴趣的朋友们可以学习下。
    2021-01-01
  • Python基于pycrypto实现的AES加密和解密算法示例

    Python基于pycrypto实现的AES加密和解密算法示例

    这篇文章主要介绍了Python基于pycrypto实现的AES加密和解密算法,结合实例形式分析了Python使用pycrypto模块进行AES加密与解密操作相关实现技巧,需要的朋友可以参考下
    2018-04-04
  • Python爬取城市租房信息实战分享

    Python爬取城市租房信息实战分享

    这篇文章主要介绍了Python爬取城市房租房信息实战分享,先单线程爬虫,测试可以成功爬取之后再优化为多线程,最后存入数据库,需要的小伙伴可以参考一下的相关资料
    2022-04-04
  • Python实现多线程/多进程的TCP服务器

    Python实现多线程/多进程的TCP服务器

    这篇文章主要为大家详细介绍了Python实现多线程/多进程的TCP服务器,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2019-09-09
  • Python assert关键字原理及实例解析

    Python assert关键字原理及实例解析

    这篇文章主要介绍了Python assert关键字原理及实例解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-12-12
  • Anaconda详细安装步骤图文教程

    Anaconda详细安装步骤图文教程

    这篇文章主要介绍了Anaconda详细安装步骤图文教程,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-11-11
  • 基于python求两个列表的并集.交集.差集

    基于python求两个列表的并集.交集.差集

    这篇文章主要介绍了基于python求两个列表的并集.交集.差集,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-02-02
  • Flask框架使用异常捕获问题

    Flask框架使用异常捕获问题

    这篇文章主要介绍了Flask框架使用异常捕获问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-12-12

最新评论