Python多线程如何同时处理多个文件

 更新时间:2023年08月14日 11:35:23   作者:DS..  
这篇文章主要介绍了Python多线程如何同时处理多个文件问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教

Python多线程同时处理多个文件

在需要对大量文件进行相同的操作时,逐个遍历是非常耗费时间的。这时,我们可以借助于Python的多线程操作来大大提高处理效率,减少处理时间。

问题背景

比如说,我们现在需要从一个文件夹下面读取出所有的视频,然后对每个视频进行逐帧处理。

由于对视频逐帧处理本身就是比较耗时的任务,如果按照串行的方式顺序处理每个视频文件是效率非常低的,此时,一种比较容易的解决方案是使用Python的多线程进行处理。

定义通用处理函数

并发适合于处理相似的任务。例如我们需要对视频中的每一帧进行处理,处理函数接受的是一个视频名称的列表,对列表内的视频顺序处理,那么我们可以构建以下处理函数:

import cv2
def func(video_names):
    for video_name in video_names:
        cap = cv2.VideoCapture(video_name)
        while True:
            ret, frame = cap.read()
            if ret:
                # process temp frame
            else:
                break

这样,我们只需要将待处理的视频名称按照开启的线程数,划分成多个子列表,就可以进行并发批量处理了。

多线程 Thread

Python中的多线程是通过threading库实现的,除此之外,multiprocessing也可以实现并发处理,二者之间是存在一定差异的。这里我们使用threading来实现同时处理多个不同的文件。

import threading
# video_names_list = [part_names_1_list, part_names_2_list, ..., part_names_k_list]
for part_video in video_names_list:
    thread = threading.Thread(target=func, args=([part_video]))
    thread.start()

在这里,首先将要处理的文件名称列表划分成若干个子列表,然后对每一个子列表开启一个线程进行处理。

Python多线程文件操作

使用python 将在csv文件中的一百万条网址,写入mongo数据库中,这里使用多线程进行操作。

直接贴上代码,如下:

import os
import threading  #导入进程
import csv
import time
from Mongo_cache import MongoCache 
import win32com.client
import winsound
NUM_THREAD = 5
COUNT = 0
lock = threading.Lock()
cache = MongoCache()     #数据库连接初始化
def worker():
"""
func: 从csv文件中读取数据,并将数据返回
"""
    for path in os.listdir(os.getcwd()):
        #print("当前工作目录", path)
        file_name = path.split('.')
        #print(file_name)
        if file_name[-1] == 'csv':
            #print("地址是:",path)
            file = open(path)
            data = csv.reader(file)
            return data
        else:
            pass
def save_info(data,i, num_retries=2):
"""
func: 将数据保存
"""
    global COUNT
    global lock
    global cache
    for _, website in data:
        try:
            lock.acquire()
            #print("线程{}》》》{}正在运行".format(threading.current_thread().name, i))
            item = {'website':website}
            cache(item)
            COUNT += 1
        except:
            if num_retries > 0:
                save_info(data, i, num_retries-1)
        finally:
            lock.release()
def main():
"""
启动线程
"""
    print("start working")
    print("working...")
    data = worker()
    threads = []   #设置主线程
    for i in range(NUM_THREAD):
        t = threading.Thread(target=save_info, args=(data, i))
        threads.append(t)
    for i in range(NUM_THREAD):
        threads[i].start()
    for i in range(NUM_THREAD):
        threads[i].join()
    print("all was done!")
if __name__ == '__main__':
    s_time = time.time()
    main()
    e_time = time.time()
    print("总的信息条数:", COUNT)
    print("总耗时:", e_time-s_time)
    speak = win32com.client.Dispatch('SAPI.SPVOICE')
    speak.Speak("早上好,eric,程序结束!")

数据存储模块

import pickle
import zlib
from bson.binary import Binary 
from datetime import datetime, timedelta
from pymongo import MongoClient
import time
class MongoCache(object):
    def __init__(self, client=None, expires=timedelta(days=30)):
        self.client = MongoClient('localhost', 27017) if client is None else client
        self.db = self.client.cache
        #self.db.webpage.create_index('timestamp', expireAfterSeconds=expires.total_seconds())  #设置自动删除时间
    def __call__(self,url):
        self.db.webpage.insert(url)
        #print("保存成功")
    def __contains__(self,url):
        try:
            self[url]
        except KeyError:
            return False
        else:
            return True
    def __getitem__(self, url):
        record = self.db.webpage.find_one({'_id':url})
        if record:
            return pickle.loads(zlib.decompress(record['result']))
        else:
            raise KeyError(url + 'dose not exist')
    def __setitem__(self, url, result):
        record = {'result': Binary(zlib.compress(pickle.dumps(result))), 'timestamp':datetime.utcnow()}
        self.db.webpage.update({'_id':url},{'$set':record},upsert=True)
    def clear(self):
        self.db.webpage.drop()

将一百万条网址从csv文件保存到数据库所花费的时间为

start working
working...
all was done!
总的信息条数: 1000000
总耗时: 427.4034459590912

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • QT5 Designer 打不开的问题及解决方法

    QT5 Designer 打不开的问题及解决方法

    这篇文章主要介绍了QT5 Designer 打不开的问题及解决方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-08-08
  • Python基于多线程实现ping扫描功能示例

    Python基于多线程实现ping扫描功能示例

    这篇文章主要介绍了Python基于多线程实现ping扫描功能,结合实例形式分析了Python多线程与进程相关模块调用操作技巧,需要的朋友可以参考下
    2018-07-07
  • postman发送文件请求并以python服务接收方式

    postman发送文件请求并以python服务接收方式

    这篇文章主要介绍了postman发送文件请求并以python服务接收方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-07-07
  • numba CUDA报错的问题解决

    numba CUDA报错的问题解决

    本文主要介绍了numba CUDA报错的问题解决,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-02-02
  • Pandas库中iloc[]函数的使用方法

    Pandas库中iloc[]函数的使用方法

    在数据分析过程中,很多时候需要从数据表中提取出相应的数据,而这么做的前提是需要先“索引”出这一部分数据,下面这篇文章主要给大家介绍了关于Pandas库中iloc[]函数的使用方法,需要的朋友可以参考下
    2023-01-01
  • pytorch部署到jupyter中的问题及解决方案

    pytorch部署到jupyter中的问题及解决方案

    这篇文章主要介绍了pytorch部署到jupyter中,在这里需要注意我再输入的时候出现了一些无法定位的提示,但是我的电脑没有影响使用jupyter,还是可以使用jupyter并且可以import torch,本文给大家讲解的非常详细,需要的朋友参考下吧
    2022-05-05
  • 使用python编写监听端

    使用python编写监听端

    这篇文章主要为大家详细介绍了使用python编写监听端,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2018-04-04
  • Python 实现 WebSocket 通信的过程详解

    Python 实现 WebSocket 通信的过程详解

    WebSocket是一种在Web应用程序中实现双向通信的协议,与传统的HTTP请求-响应模型不同,WebSocket允许服务器主动向客户端推送数据,实现实时性和互动性,这篇文章主要介绍了Python 实现 WebSocket 通信的过程详解,需要的朋友可以参考下
    2024-06-06
  • python使用OpenCV实现多目标跟踪

    python使用OpenCV实现多目标跟踪

    这篇文章主要介绍了python使用OpenCV实现多目标跟踪,如何在OpenCV中使用MultiTracker类实现多目标跟踪API。在深入了解详细信息之前,请查看下面列出的关于目标跟踪的帖子,以了解在OpenCV中实现的单个目标跟踪器的基础知识,需要的朋友可以参考一下
    2022-04-04
  • python 出现SyntaxError: non-keyword arg after keyword arg错误解决办法

    python 出现SyntaxError: non-keyword arg after keyword arg错误解决办

    这篇文章主要介绍了python 出现SyntaxError: non-keyword arg after keyword arg错误解决办法的相关资料,需要的朋友可以参考下
    2017-02-02

最新评论