Python实现动态生成系统数据库设计到Word文档
背景
经常需要交付一些系统文档而且基本都是word的,其中又有系统数据库介绍模块, 看着数据库里的几百张表于是我开始怀疑人生, 所以咱手写一个
涉及知识
- pymysql 操作数据库
- tkinter GUI图形库
- threading 线程
- queue 阻塞队列
- pandas 数据计算模型(DataFame)
- python-docx 操作word文档库
1、功能介绍
填写相关数据库信息后, 支持生成系统数据库设计到word文档节省写文档时间
- 支持按自定义SQL结果导出
- 支持导出所有表结构信息
- 支持导出数据库表清单
- 支持测试连接
界面如下:
1.1 自定义SQL导出
在文本框内输入sql,然后勾选导出模式为 自定义SQL, 最后点击导出,生成word文档如下。 会将该sq的执行结果导出到word的表格
在自定义SQL模式时, 如果勾选了对所有表执行一遍, 并且配置了模版变量 #{tableName}
. 则会遍历所有表执行一遍该模版SQL, 如下图。逻辑就是会先获取该数据库的所有表,然后遍历执行该模版sql并且替换模版变量#{tableName}
, 最后将每次sql的执行结果导出到word的表格里面。
最终结果:
1.2 导出所有表结构
勾选导出模式为表结构,生成word文档如下。 逻辑是导出数据库的所有的表结构到word的表格。 这个通过自定义SQL模式也可实现,现在不过相当于内置了一些常用的模式。
1.3 导出数据库表清单
勾选导出模式为表清单,生成word文档如下, 就是将数据库包含哪些表生成一份清单导出。
2、 代码实现
import queue import threading import time import traceback from tkinter import Tk, Button, messagebox, Label, Frame, Entry, IntVar, Radiobutton, StringVar, \ constants, filedialog, Text, font, Scrollbar, BooleanVar, Checkbutton from tkinter.font import Font from tkinter.messagebox import showinfo, showerror from tkinter.ttk import Progressbar from enum import Enum import logging from pymysql import OperationalError class MsgEnum(Enum): """ 消息类型枚举 """ START = 0 STOP = 1 EXIT = 3 class InputValue: def __init__(self, host, port, user, password, database, sql,calc_mode,isAllTable): self.host = host self.port = port self.user = user self.password = password self.database = database self.sql = sql self.calc_mode = calc_mode self.isAllTable = isAllTable class GuiTempldate: class Cache: RUNING = False def __init__(self) -> None: # 1、Gui消息队列 self.msg_center = MsgCenter(self) # 2、窗口设置 self.root = Tk() self.root.title("数据库导出工具") # self.root.wm_resizable(False, False) # 设置窗口大小不可拉伸 self.root.geometry('600x700+500+200') # 设置窗口: 宽 x 高 + 窗口位置x坐标 + 窗口位置y坐标 self.root.protocol("WM_DELETE_WINDOW", self.close_event) # 4、初始化各个组件和布局 self.initGui() def initGui(self): # 1- 标签 text_str = """版本: 1.0.0 #author burukeyou #说明: 1) 支持导出数据库所有表结构信息 2)支持导出数据库表清单 3)支持自定义sql导出 """ Label(self.root, text=text_str, justify='left', fg='red').pack(anchor=constants.W) # 4- 文本框 fm02 = Frame(self.root) self.ip_address = StringVar(value="localhost:3306") fm02.pack(anchor=constants.W, fill=constants.X) Label(fm02, text='服务器地址 ').pack(side=constants.LEFT) self.name_btn = Entry(fm02, width=40, textvariable=self.ip_address) self.name_btn.pack(side=constants.RIGHT) # 4- 文本框 fm02 = Frame(self.root) self.user_name = StringVar(value="root") fm02.pack(anchor=constants.W, fill=constants.X) Label(fm02, text='帐号').pack(side=constants.LEFT) self.name_btn = Entry(fm02, width=40, textvariable=self.user_name) self.name_btn.pack(side=constants.RIGHT) # 4- 文本框 fm02 = Frame(self.root) self.password = StringVar(value="123456") fm02.pack(anchor=constants.W, fill=constants.X) Label(fm02, text='密码').pack(side=constants.LEFT) self.name_btn = Entry(fm02, width=40, textvariable=self.password) self.name_btn.pack(side=constants.RIGHT) # 4- 文本框 fm02 = Frame(self.root) self.database = StringVar(value="") fm02.pack(anchor=constants.W, fill=constants.X) Label(fm02, text='数据库').pack(side=constants.LEFT) self.name_btn = Entry(fm02, width=40, textvariable=self.database) self.name_btn.pack(side=constants.RIGHT) # 3- 单选框 self.mode_var = IntVar(value=1) fm01 = Frame(self.root) # , bg='blue' fm01.pack(anchor=constants.W) Label(fm01, text='导出模式').grid(row=0, column=0, sticky='W') Radiobutton(fm01, text='表结构', variable=self.mode_var, value=1).grid(row=0, column=1, sticky='E', padx=40) Radiobutton(fm01, text='表清单', variable=self.mode_var, value=2).grid(row=0, column=2, sticky='E', padx=40) Radiobutton(fm01, text='自定义SQL', variable=self.mode_var, value=3).grid(row=0, column=3, sticky='E', padx=40) # 5- 勾选框 self.is_all_flag = BooleanVar() self.is_all_table_btn = Checkbutton(self.root, variable=self.is_all_flag, text='所有表执行一遍(自定义SQL可使用模版变量#{tableName})', onvalue=True, offvalue=False) self.is_all_table_btn.pack(anchor=constants.W) # 9- 多行文本框 Label(self.root, text='自定义SQL输入').pack(anchor=constants.W) fm22 = Frame(self.root) fm22.pack(anchor=constants.W, fill=constants.X) scroll = Scrollbar(fm22) scroll.pack(side=constants.RIGHT, fill=constants.Y) ft = Font(family='宋体', size=14, weight=font.BOLD) self.text_btn = Text(fm22, height=12, fg="black", font=ft, bg="white", insertbackground="black") self.text_btn.pack(side=constants.LEFT) # text 联动 scroll scroll.config(command=self.text_btn.yview) self.text_btn.config(yscrollcommand=scroll.set) # 进度条 self.progressbar = Progressbar(self.root, length=600, mode="determinate", orient=constants.HORIZONTAL) self.progressbar.pack(anchor=constants.W) self.progressbar['maximum'] = 100 # 10、开始结束按钮 fm06 = Frame(self.root) fm06.pack(side=constants.BOTTOM) self.start_btn = Button(fm06, text="导出", width=6, height=1, command=self.start_event) self.start_btn.grid(row=0, column=0, sticky='W', padx=20, pady=20) self.stop_btn = Button(fm06, text="停止", width=6, height=1, command=self.stop_event) self.stop_btn.grid(row=0, column=1, sticky='E', padx=20, pady=20) self.test_btn = Button(fm06, text="测试连接", width=6, height=1, command=self.test_connection) self.test_btn.grid(row=0, column=2, sticky='E', padx=20, pady=20) def start_event(self): self.msg_center.put(MsgEnum.START) self.Cache.RUNING = True self.main_calc_thread: threading.Thread = threading.Thread(target=start_clac, args=(self,)) self.main_calc_thread.start() def stop_event(self): self.msg_center.put(MsgEnum.STOP) self.Cache.RUNING = False def close_event(self): # 关闭窗口 if self.Cache.RUNING and not messagebox.askokcancel("警告", "任务还在执行中,确定要关闭吗?"): return self.root.destroy() self.msg_center.put(MsgEnum.EXIT) def test_connection(self) -> bool: input_value = self.getInputValue() if not checkParam(input_value): return False conn = None try: conn = connectDB(input_value) showinfo(message="测试连接成功") return True except OperationalError as e: showerror(message="测试连接数据库失败,请确认数据库信息是否正确。 \n" + str(e)) finally: if conn: conn.close() def showUI(self): # 启动消息队列 threading.Thread(target=self.msg_center.mainloop).start() # 这个方法会循环阻塞住,监听gui的事件,得放到主线程最后 self.root.mainloop() def getInputValue(self) -> InputValue: ipaddress: str = self.ip_address.get().strip() if ipaddress == "" or ipaddress.find(":") < 0: showerror(message="请输入正确的ip地址") return ip = ipaddress.split(":")[0].strip() port = ipaddress.split(":")[1].strip() database = self.database.get().strip() user_name = self.user_name.get().strip() password = self.password.get().strip() calc_mode = self.mode_var.get() custom_sql = self.text_btn.get("1.0", "end-1c").strip() isAllTable: bool = self.is_all_flag.get() return InputValue(ip, port, user_name, password,database, custom_sql,calc_mode,isAllTable) def start_clac(gui: GuiTempldate): try: # 实际计算任务 start_demo(gui) finally: # 发布结束事件 gui.stop_event() class MsgCenter: """ 消息队列 主要处理窗口控件消息 """ def __init__(self, obj: GuiTempldate) -> None: self.queue = queue.Queue() self.obj = obj def put(self, msg: Enum): self.queue.put(msg) def mainloop(self): while True: try: # 阻塞获取消息 msg = self.queue.get() print("消费消息: {}".format(msg)) if msg == MsgEnum.START: MsgStrategy.start_strategy(self.obj) elif msg == MsgEnum.STOP: MsgStrategy.stop_strategy(self.obj) elif msg == MsgEnum.EXIT: break else: pass except queue.Empty: traceback.print_exc() class MsgStrategy: @classmethod def start_strategy(cls, gui: GuiTempldate): gui.start_btn.config(state=constants.DISABLED) gui.stop_btn.config(state=constants.NORMAL) # gui.progressbar['value'] = 0 for i in range(100): time.sleep(0.1) gui.progressbar['value'] += 1 if i >= 50: gui.progressbar['value'] = 100 gui.root.update() break gui.root.update() @classmethod def stop_strategy(cls, gui: GuiTempldate): gui.start_btn.config(state=constants.NORMAL) gui.stop_btn.config(state=constants.DISABLED) # ===============================业务逻辑 ====================================================== from typing import List import pymysql import pandas as pd from docx import Document from docx.oxml.ns import nsdecls from docx.oxml import parse_xml from datetime import datetime from docx.table import Table, _Cell def checkParam(input_value: InputValue) -> bool: if input_value.database == "" or input_value.user == "" or input_value.password == "" or input_value.host == "" or input_value.port == "": showerror(message="请填写完整的数据库连接参数") return False return True def connectDB(input_value: InputValue): return pymysql.connect(host=input_value.host, user=input_value.user, password=input_value.password, db=input_value.database, port=int(input_value.port), charset="utf8") def start_demo(gui: GuiTempldate): """ 自定义计算任务 :param gui: gui组件对象 :return: """ input_value = gui.getInputValue() # 校验参数 if not checkParam: return conn = None try: conn: pymysql.Connection = connectDB(input_value) print("建立连接成功") download(conn, input_value) except OperationalError as e: logging.error(e) showerror(message="连接数据库失败,请确认数据库信息是否正确") except Exception as e1: # 打印堆栈信息 e1.with_traceback(None) showerror(message="导出失败") finally: if conn is not None: conn.close() print("关闭连接成功") """ 表结构SQL """ def tableSchemaSql(table_name,dataBase): return f""" select ORDINAL_POSITION 序号, COLUMN_NAME 字段名, COLUMN_TYPE 字段类型, -- DATA_TYPE 字段类型, -- CHARACTER_MAXIMUM_LENGTH 长度, -- IS_NULLABLE 是否为空, case when COLUMN_DEFAULT is null then '无' else COLUMN_DEFAULT end 默认值, COLUMN_COMMENT 字段说明 FROM INFORMATION_SCHEMA.COLUMNS where table_schema ='{dataBase}' and table_name = '{table_name}'; """ """ 数据库表清单SQL """ def databaseTableListSql(database): return f"""select table_name as '表名',TABLE_COMMENT '表中文名称','业务表',TABLE_COMMENT '说明' from information_schema.tables where table_schema='{database}' and LOWER(table_type) ='base table' """ def readAllTableToDataFrame(conn,dataBase) -> dict: # 获取所有表的信息 tables = pd.read_sql_query("SHOW TABLES;", conn) df_map = {} for _, table in tables.iterrows(): table_name = table[0] if table_name is None or table_name == "": continue sql = tableSchemaSql(table_name,dataBase) df = pd.read_sql_query(sql, conn) df = df.reset_index(drop=True) df.index = df.index + 1 df_map[table_name] = df return df_map def readDataBaseTableListToDataFrame(conn, dataBase): df_map = {} sql = databaseTableListSql(dataBase) df = pd.read_sql_query(sql, conn) df = df.reset_index(drop=True) df.index = df.index + 1 df_map["tmp"] = df return df_map def readByCustomSqlToDataFrame(conn, input_value): # sql模版 sql = input_value.sql # 获取所有表的信息 tables = pd.read_sql_query("SHOW TABLES;", conn) df_map = {} for _, table in tables.iterrows(): table_name = table[0] if table_name is None or table_name == "": continue # 替换变量名 #{taleName} tmp_sql = sql.replace("#{tableName}", table_name) df = pd.read_sql_query(tmp_sql, conn) df = df.reset_index(drop=True) df.index = df.index + 1 df_map[table_name] = df return df_map def downloadToWord(df_map, save_path, heading): # 创建Word文档 doc = Document() for table_name in df_map: df = df_map[table_name] # 添加标题 if heading: doc.add_heading(heading, level=3) else: doc.add_heading(f"【{table_name}表】", level=3) # 转成list表格数据 table_data: List = [df.columns.tolist()] + df.values.tolist() # 创建 rows x col 表格 table: Table = doc.add_table(rows=len(table_data), cols=len(table_data[0])) # 设置表格样式 table.style = "Table Grid" # 设置表格字体和大小 #table.style.font.name = "宋体" #table.style.font.size = 5 # 填充表格内容 for i, row in enumerate(table_data): for j, value in enumerate(row): cell: _Cell = table.cell(i, j) cell.text = str(value) # 设置第一行 if i == 0: # 设置文本加粗 for run in cell.paragraphs[0].runs: run.bold = True # 设置第一行背景色为浅灰色D9D9D9 for i, cell in enumerate(table.rows[0].cells): shading_elm_1 = parse_xml(r'<w:shd {} w:fill="D9D9D9"/>'.format(nsdecls('w'))) cell._tc.get_or_add_tcPr().append(shading_elm_1) # 保存Word文档 doc.save(save_path) print("导出" + save_path + "成功") def download(conn, input_value): dataBase = input_value.database calc_mode = input_value.calc_mode custom_sql = input_value.sql # 读取所有表结构到DataFrame heading = None df_map = {} if calc_mode == 1: # 所有表结构 print("执行模式1") df_map = readAllTableToDataFrame(conn, dataBase) elif calc_mode == 2: # 表清单 print("执行模式2") df_map = readDataBaseTableListToDataFrame(conn, dataBase) heading = f"【{dataBase}数据库表清单】" else: # 自定义sql if custom_sql is None or custom_sql == "": showinfo("", "请输入自定义SQL") return # 配置了都执行一遍并且sql模版真中包含#{tableName}变量 if input_value.isAllTable == True and custom_sql.find("#{tableName}") != -1: print("自定义SQL对所有表执行一遍") df_map = readByCustomSqlToDataFrame(conn, input_value) else: df = pd.read_sql_query(custom_sql, conn) df_map["tmp"] = df heading = f"【{dataBase}自定义SQL数据】" # 选择导出位置 now = datetime.now().strftime("%y%m%d%H%M%S") file_name = dataBase + f"数据库数据-{now}.docx" save_path = filedialog.asksaveasfilename(title=u'保存文件', filetypes=[("word文件", ".docx")], initialfile=file_name) print("保存位置目录", save_path) if save_path == "" or save_path is None: return # 导出到word downloadToWord(df_map, save_path, heading) showinfo("", "导出成功") if __name__ == '__main__': gui = GuiTempldate() gui.showUI()
代码说明:
GuiTempldate
: 将所有的图形按钮控件等都封装成该对象, 一个该对象代表一个窗口, 并且封装相关按钮事件MsgCenter
: 消息队列, 每个GuiTempldate
内部都对应一个消息队列, 用作异步,解耦主流程事件防止阻塞只能串行
执行事件, 打个比方因为一般窗口的按钮都是都是可以随便点的,不可能你点完这个按钮后触发了一些事件,然后你必须等这个事件执行完才能操作下一个按钮。 说白类似每次用一个新线程去异步执行, 用消息队列不过是线程池的效果。- 点击导出后实际执行的方法是
start_clac
, 然后里面逻辑比较简单都是导出的相关业务逻辑
以上就是Python实现动态生成系统数据库设计到Word文档的详细内容,更多关于Python生成数据库设计到Word的资料请关注脚本之家其它相关文章!
相关文章
Selenium Webdriver元素定位的八种常用方式(小结)
这篇文章主要介绍了Selenium Webdriver元素定位的八种常用方式(小结),文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧2021-01-01python中的tkinter库弹窗messagebox详解
这篇文章主要介绍了python中的tkinter库弹窗messagebox,包括消息提示框、消息警告框、错误消息框,通过代码给大家介绍的非常详细,需要的朋友可以参考下2021-06-06Python使用mongodb保存爬取豆瓣电影的数据过程解析
这篇文章主要介绍了Python使用mongodb保存爬取豆瓣电影的数据过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下2019-08-08
最新评论