Python实现将MongoDB中的数据导入到MySQL

 更新时间:2023年05月04日 14:11:31   作者:小小鸟爱吃辣条  
这篇文章主要为大家详细介绍了如何通过Python封装一个将 MongoDB 中的数据导入到 MySQL 中的 Python 工具类 MongoToMysql,感兴趣的可以了解一下

本文主要介绍了一个将 MongoDB 中的数据导入到 MySQL 中的 Python 工具类 MongoToMysql。该工具类实现了获取 MongoDB 数据类型、创建 MySQL 表结构以及将数据从 MongoDB 推送到 MySQL 等功能。

通过该工具类,用户可以轻松地将 MongoDB 中的数据导入到 MySQL 中,实现数据的转移和使用。

使用该工具类,用户需要传入相应的参数,包括 MongoDB 的连接信息,MySQL 的连接信息,以及表名、是否设置最大长度、批处理大小和表描述等信息。具体使用可以参考代码中的注释。

实现代码

import pymysql
from loguru import logger

class MongoToMysql:
    def __init__(self, mongo_host, mongo_port, mongo_db, mongo_collection, mysql_host, mysql_port, mysql_user,
                 mysql_password, mysql_db,table_name=None,set_max_length=False,batch_size=10000,table_description=''):
        self.mongo_host = mongo_host
        self.mongo_port = mongo_port
        self.mongo_db = mongo_db
        self.mongo_collection = mongo_collection
        self.mysql_host = mysql_host
        self.mysql_port = mysql_port
        self.mysql_user = mysql_user
        self.mysql_password = mysql_password
        self.mysql_db = mysql_db
        self.table_name = table_name
        self.set_max_length = set_max_length
        self.batch_size = batch_size
        self.table_description = table_description
        self.data_types = self.get_mongo_data_types()
        self.create_mysql_table(self.data_types,set_max_length= self.set_max_length,table_description=self.table_description)
        self.push_data_to_mysql(self.batch_size)

    def get_mongo_data_types(self):
        logger.debug('正在获取mongo中字段的类型!')
        client = pymongo.MongoClient(host=self.mongo_host, port=self.mongo_port)
        db = client[self.mongo_db]
        collection = db[self.mongo_collection]
        data_types = {}
        for field in collection.find_one().keys():
            data_types[field] = type(collection.find_one()[field]).__name__
        return data_types

    def check_mysql_table_exists(self):
        logger.debug('检查是否存在该表,有则删之!')
        conn = pymysql.connect(host=self.mysql_host, port=self.mysql_port, user=self.mysql_user,
                               password=self.mysql_password, db=self.mysql_db)
        cursor = conn.cursor()
        sql = f"DROP TABLE IF EXISTS {self.mongo_collection}"
        cursor.execute(sql)
        conn.commit()
        conn.close()

    def get_max_length(self, field):
        logger.debug(f'正在获取字段 {field} 最大长度......')
        client = pymongo.MongoClient(host=self.mongo_host, port=self.mongo_port)
        db = client[self.mongo_db]
        collection = db[self.mongo_collection]
        max_length = 0
        for item in collection.find({},{field:1,'_id':0}):
            value = item.get(field)
            if isinstance(value, str) and len(value) > max_length:
                max_length = len(value)
        return max_length

    def create_mysql_table(self, data_types,set_max_length,table_description):
        logger.debug('正在mysql中创建表结构!')
        self.check_mysql_table_exists()
        conn = pymysql.connect(host=self.mysql_host, port=self.mysql_port, user=self.mysql_user,
                               password=self.mysql_password, db=self.mysql_db)
        cursor = conn.cursor()
        if self.table_name:
            table_name = self.table_name
        else:
            table_name = self.mongo_collection
        fields = []
        for field, data_type in data_types.items():
            if data_type == 'int':
                fields.append(f"{field} INT")
            elif data_type == 'float':
                fields.append(f"{field} FLOAT")
            elif data_type == 'bool':
                fields.append(f"{field} BOOLEAN")
            else:
                if set_max_length:
                    fields.append(f"{field} TEXT)")
                else:
                    max_length = self.get_max_length(field)
                    fields.append(f"{field} VARCHAR({max_length + 200})")
        fields_str = ','.join(fields)
        sql = f"CREATE TABLE {table_name} (id INT PRIMARY KEY AUTO_INCREMENT,{fields_str}) COMMENT='{table_description}'"
        cursor.execute(sql)
        conn.commit()
        conn.close()


    def push_data_to_mysql(self, batch_size=10000):
        logger.debug('--- 正在准备从mongo中每次推送10000条数据到mysql ----')
        client = pymongo.MongoClient(host=self.mongo_host, port=self.mongo_port)
        db = client[self.mongo_db]
        collection = db[self.mongo_collection]
        conn = pymysql.connect(host=self.mysql_host, port=self.mysql_port, user=self.mysql_user,
                               password=self.mysql_password, db=self.mysql_db)
        cursor = conn.cursor()
        if self.table_name:
            table_name = self.table_name
        else:
            table_name = self.mongo_collection
        # table_name = self.mongo_collection
        data = []
        count = 0
        for item in collection.find():
            count += 1
            row = []
            for field, data_type in self.data_types.items():
                value = item.get(field)
                if value is None:
                    row.append(None)
                elif data_type == 'int':
                    row.append(str(item.get(field, 0)))
                elif data_type == 'float':
                    row.append(str(item.get(field, 0.0)))
                elif data_type == 'bool':
                    row.append(str(item.get(field, False)))
                else:
                    row.append(str(item.get(field, '')))
            data.append(row)
            if count % batch_size == 0:
                placeholders = ','.join(['%s'] * len(data[0]))
                sql = f"INSERT INTO {table_name} VALUES (NULL,{placeholders})"
                cursor.executemany(sql, data)
                conn.commit()
                data = []
                logger.debug(f'--- 已完成推送:{count} 条数据! ----')
        if data:
            placeholders = ','.join(['%s'] * len(data[0]))
            sql = f"INSERT INTO {table_name} VALUES (NULL,{placeholders})"
            cursor.executemany(sql, data)
            conn.commit()
            logger.debug(f'--- 已完成推送:{count} 条数据! ----')
        conn.close()


if __name__ == '__main__':
    """MySQL"""
    mongo_host = '127.0.0.1'
    mongo_port = 27017
    mongo_db = 'db_name'
    mongo_collection = 'collection_name'

    """MongoDB"""
    mysql_host = '127.0.0.1'
    mysql_port = 3306
    mysql_user = 'root'
    mysql_password = '123456'
    mysql_db = 'mysql_db'

    table_description = ''  # 表描述

    mongo_to_mysql = MongoToMysql(mongo_host, mongo_port, mongo_db, mongo_collection, mysql_host, mysql_port,
                                  mysql_user, mysql_password, mysql_db,table_description=table_description)


    #
    # table_name = None  # 默认为None 则MySQL中的表名跟Mongo保持一致
    # set_max_length = False # 默认为False 根据mongo中字段最大长度 加200 来设置字段的VARCHART长度 , 否则定义TEXT类型
    # batch_size = 10000   # 控制每次插入数据量的大小
    # table_description = '' # 表描述
    # mongo_to_mysql = MongoToMysql(mongo_host, mongo_port, mongo_db, mongo_collection, mysql_host, mysql_port,
    #                               mysql_user, mysql_password, mysql_db,table_name,set_max_length,batch_size,table_description)

代码使用了 PyMongo、PyMySQL 和 Loguru 等 Python 库,并封装了一个 MongoToMysql 类。在类的初始化时,会自动获取 MongoDB 中字段的类型,并根据字段类型创建 MySQL 表结构。在将数据从 MongoDB 推送到 MySQL 时,还可以控制每次插入数据量的大小,以避免一次性插入大量数据造成系统崩溃或性能下降。

需要注意的是,在创建 MySQL 表结构时,如果用户选择了设置最大长度,则会创建 TEXT 类型的字段,否则会根据 MongoDB 中字段的最大长度加上200来设置 VARCHAR 类型的字段长度。

总之,本文介绍的 MongoToMysql 工具类非常方便实用,对于需要将 MongoDB 数据迁移到 MySQL 的用户来说,是一种很好的解决方案。

到此这篇关于Python实现将MongoDB中的数据导入到MySQL的文章就介绍到这了,更多相关Python MongoDB数据导入到MySQL内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 教你使用Python根据模板批量生成docx文档

    教你使用Python根据模板批量生成docx文档

    这篇文章主要介绍了教你使用Python根据模板批量生成docx文档,文中有非常详细的代码示例,对正在学习python的小伙伴们有很好地帮助,需要的朋友可以参考下
    2021-05-05
  • Python中单、双下划线的区别总结

    Python中单、双下划线的区别总结

    这篇文章主要给大家介绍了关于Python中单、双下划线区别的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧。
    2017-12-12
  • Python调用PC摄像头实现扫描二维码

    Python调用PC摄像头实现扫描二维码

    PC摄像机扫描二维码的应用场景很广泛,可以应用于各种需要快速扫描、识别和管理的场景,本文就来具体讲讲如何用Python实现这一功能吧
    2023-05-05
  • python使用xlsx和pandas处理Excel表格的操作步骤

    python使用xlsx和pandas处理Excel表格的操作步骤

    python的神器pandas库就可以非常方便地处理excel,csv,矩阵,表格 等数据,下面这篇文章主要给大家介绍了关于python使用xlsx和pandas处理Excel表格的操作步骤,文中通过图文介绍的非常详细,需要的朋友可以参考下
    2023-01-01
  • 详解Python Matplot中文显示完美解决方案

    详解Python Matplot中文显示完美解决方案

    这篇文章主要介绍了Python Matplot中文显示完美解决方案,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-03-03
  • 详解Python如何优雅的重试

    详解Python如何优雅的重试

    这篇文章主要为大家介绍了Python如何优雅的重试详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-07-07
  • Python xpath,JsonPath,bs4的基本使用

    Python xpath,JsonPath,bs4的基本使用

    这篇文章主要介绍了Python xpath,JsonPath,bs4的基本使用,文章围绕主题展开详细的内容介绍,具有一定的参考价值,感兴趣的小伙伴可以参考一下
    2022-07-07
  • python实现决策树、随机森林的简单原理

    python实现决策树、随机森林的简单原理

    这篇文章主要为大家详细介绍了python实现决策树、随机森林的简单原理,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2018-03-03
  • Python中方法定义与方法调用举例详解

    Python中方法定义与方法调用举例详解

    在Python中,方法调用是编写程序时经常会涉及到的一个重要概念,下面这篇文章主要给大家介绍了关于Python中方法定义与方法调用的相关资料,文章通过代码介绍的非常详细,需要的朋友可以参考下
    2024-06-06
  • Python实现的绘制三维双螺旋线图形功能示例

    Python实现的绘制三维双螺旋线图形功能示例

    这篇文章主要介绍了Python实现的绘制三维双螺旋线图形功能,结合实例形式分析了Python使用matplotlib、numpy模块进行数值运算及图形绘制相关操作技巧,需要的朋友可以参考下
    2018-06-06

最新评论