Python大批量写入数据(百万级别)的方法
背景
现有一个百万行数据的csv格式文件,需要在两分钟之内存入数据库。
方案
方案一:多线程+协程+异步MySql方案二:多线程+MySql批量插入
代码
1,先通过pandas读取所有csv数据存入列表。
2,设置N个线程,将一百万数据均分为N份,以start,end传递给线程以切片的方法读取区间数据(建议为16个线程)
3,方案二 线程内以 executemany 方法批量插入所有数据。
4,方案一 线程内使用异步事件循环遍历所有数据异步插入。
5,方案一纯属没事找事型。
方案二
import threading import pandas as pd import asyncio import time import aiomysql import pymysql data=[] error_data=[] def run(start,end): global data global error_data print("start"+threading.current_thread().name) print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))) mysdb = getDb("*", *, "*", "*", "*") cursor = mysdb.cursor() sql = """insert into *_*_* values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)""" cursor.executemany(sql,data[start:end]) mysdb.commit() mysdb.close() print("end" + threading.current_thread().name) print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))) def csv_file_read_use_pd(csvFile): csv_result = pd.read_csv(csvFile,encoding="utf-16",sep='\t') csv_result = csv_result.fillna(value="None") result = csv_result.values.tolist() return result class MyDataBase: def __init__(self,host=None,port=None,username=None,password=None,database=None): self.db = pymysql.connect(host=host,port=port,user=username,password=password,database=database) def close(self): self.db.close() def getDb(host,port,username,password,database): MyDb = MyDataBase(host, port, username, password,database) return MyDb.db def main(csvFile): global data #获取全局对象 csv全量数据 #读取所有的数据 将所有数据均分成 thread_lens 份 分发给 thread_lens 个线程去执行 thread_lens=20 csv_result=csv_file_read_use_pd(csvFile) day = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) for item in csv_result: item.insert(0,day) data=csv_result thread_exe_count_list=[] #线程需要执行的区间 csv_lens=len(csv_result) avg = csv_lens // thread_lens remainder=csv_lens % thread_lens # 0,27517 27517,55,034 nowIndex=0 for i in range(thread_lens): temp=[nowIndex,nowIndex+avg] nowIndex=nowIndex+avg thread_exe_count_list.append(temp) thread_exe_count_list[-1:][0][1]+=remainder #余数分给最后一个线程 # print(thread_exe_count_list) #th(thread_exe_count_list[0][0],thread_exe_count_list[0][1]) for i in range(thread_lens): sub_thread = threading.Thread(target=run,args=(thread_exe_count_list[i][0],thread_exe_count_list[i][1],)) sub_thread.start() sub_thread.join() time.sleep(3) if __name__=="__main__": #csv_file_read_use_pd("分公司箱型箱量.csv") main("分公司箱型箱量.csv")
方案一
import threading import pandas as pd import asyncio import time import aiomysql data=[] error_data=[] async def async_basic(loop,start,end): global data global error_data print("start"+threading.current_thread().name) print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))) conn = await aiomysql.connect( host="*", port=*, user="*", password="*", db="*", loop=loop ) day = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) sql = """insert into **** values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)""" async with conn.cursor() as cursor: for item in data[start:end]: params=[day] params.extend(item) try: x=await cursor.execute(sql,params) if x==0: error_data.append(item) print(threading.current_thread().name+" result "+str(x)) except Exception as e: print(e) error_data.append(item) time.sleep(10) pass await conn.close() #await conn.commit() #关闭连接池 # pool.close() # await pool.wait_closed() print("end" + threading.current_thread().name) print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))) def csv_file_read_use_pd(csvFile): csv_result = pd.read_csv(csvFile,encoding="utf-16",sep='\t') csv_result = csv_result.fillna(value="None") result = csv_result.values.tolist() return result def th(start,end): loop = asyncio.new_event_loop() loop.run_until_complete(async_basic(loop,start,end)) def main(csvFile): global data #获取全局对象 csv全量数据 #读取所有的数据 将所有数据均分成 thread_lens 份 分发给 thread_lens 个线程去执行 thread_lens=20 csv_result=csv_file_read_use_pd(csvFile) data=csv_result thread_exe_count_list=[] #线程需要执行的区间 csv_lens=len(csv_result) avg = csv_lens // thread_lens remainder=csv_lens % thread_lens # 0,27517 27517,55,034 nowIndex=0 for i in range(thread_lens): temp=[nowIndex,nowIndex+avg] nowIndex=nowIndex+avg thread_exe_count_list.append(temp) thread_exe_count_list[-1:][0][1]+=remainder #余数分给最后一个线程 print(thread_exe_count_list) #th(thread_exe_count_list[0][0],thread_exe_count_list[0][1]) for i in range(thread_lens): sub_thread = threading.Thread(target=th,args=(thread_exe_count_list[i][0],thread_exe_count_list[i][1],)) sub_thread.start() time.sleep(3) if __name__=="__main__": #csv_file_read_use_pd("分公司箱型箱量.csv") main("分公司箱型箱量.csv")
总结
到此这篇关于Python大批量写入数据的文章就介绍到这了,更多相关Python大批量写入数据内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
Python实现滑动平均(Moving Average)的例子
今天小编就为大家分享一篇Python实现滑动平均(Moving Average)的例子,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧2019-08-08浅谈Python由__dict__和dir()引发的一些思考
这篇文章主要介绍了浅谈Python由__dict__和dir()引发的一些思考,具有一定参考价值,需要的朋友可以了解下。2017-10-10Python之time模块的时间戳,时间字符串格式化与转换方法(13位时间戳)
今天小编就为大家分享一篇Python之time模块的时间戳,时间字符串格式化与转换方法(13位时间戳),具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧2019-08-08
最新评论