Python封装数据库连接池详解

 更新时间:2022年06月20日 15:56:47   作者:傲娇的喵酱  
这篇文章主要介绍了Python封装数据库连接池详解,文章围绕主题相关内容展开详细的内容介绍,具有一定的参考价值,需要的小伙伴可以参考一下

前言:

线程安全问题:当2个线程同时用到线程池时,会同时创建2个线程池。如果多个线程,错开用到线程池,就只会创建一个线程池,会共用一个线程池。我用的注解方式的单例模式,感觉就是这个注解的单例方式,解决了多线程问题,但是没解决线程安全问题,需要优化这个单例模式。

主要通过 PooledDB 模块实现。

一、数据库封装

1.1数据库基本配置

db_config.py

# -*- coding: UTF-8 -*-
import pymysql
 
# 数据库信息
DB_TEST_HOST = "127.0.0.1"
DB_TEST_PORT = 3308
DB_TEST_DBNAME = "bt"
DB_TEST_USER = "root"
DB_TEST_PASSWORD = "123456"
 
# 数据库连接编码
DB_CHARSET = "utf8"
# mincached : 启动时开启的闲置连接数量(缺省值 0 开始时不创建连接)
DB_MIN_CACHED = 5
# maxcached : 连接池中允许的闲置的最多连接数量(缺省值 0 代表不闲置连接池大小)
DB_MAX_CACHED = 0
# maxshared : 共享连接数允许的最大数量(缺省值 0 代表所有连接都是专用的)如果达到了最大数量,被请求为共享的连接将会被共享使用
DB_MAX_SHARED = 5
# maxconnecyions : 创建连接池的最大数量(缺省值 0 代表不限制)
DB_MAX_CONNECYIONS = 300
# blocking : 设置在连接池达到最大数量时的行为(缺省值 0 或 False 代表返回一个错误<toMany......> 其他代表阻塞直到连接数减少,连接被分配)
DB_BLOCKING = True
# maxusage : 单个连接的最大允许复用次数(缺省值 0 或 False 代表不限制的复用).当达到最大数时,连接会自动重新连接(关闭和重新打开)
DB_MAX_USAGE = 0
# setsession : 一个可选的SQL命令列表用于准备每个会话,如["set datestyle to german", ...]
DB_SET_SESSION = None

# creator : 使用连接数据库的模块
DB_CREATOR = pymysql

设置连接池最大最小为5个。则启动连接池时,就会建立5个连接。

1.2 编写单例模式注解

singleton.py

#单例模式函数,用来修饰类
def singleton(cls,*args,**kw):
    instances = {}
    def _singleton():
        if cls not in instances:
            instances[cls] = cls(*args,**kw)
        return instances[cls]
    return _singleton

1.3 构建连接池

db_dbutils_init.py

from dbutils.pooled_db import PooledDB
import db_config as config
# import random

from singleton import singleton
"""
@功能:创建数据库连接池
"""

class MyConnectionPool(object):
    # 私有属性
    # 能通过对象直接访问,但是可以在本类内部访问;
    __pool = None
 
    # def __init__(self):
    #     self.conn = self.__getConn()
    #     self.cursor = self.conn.cursor()
 
    # 创建数据库连接conn和游标cursor
    def __enter__(self):
        self.conn = self.__getconn()
        self.cursor = self.conn.cursor()
 
    # 创建数据库连接池
    def __getconn(self):
        if self.__pool is None:
            # i = random.randint(1, 100)
            # print("创建线程池的数量"+str(i))
            self.__pool = PooledDB(
                creator=config.DB_CREATOR,
                mincached=config.DB_MIN_CACHED,
                maxcached=config.DB_MAX_CACHED,
                maxshared=config.DB_MAX_SHARED,
                maxconnections=config.DB_MAX_CONNECYIONS,
                blocking=config.DB_BLOCKING,
                maxusage=config.DB_MAX_USAGE,
                setsession=config.DB_SET_SESSION,
                host=config.DB_TEST_HOST,
                port=config.DB_TEST_PORT,
                user=config.DB_TEST_USER,
                passwd=config.DB_TEST_PASSWORD,
                db=config.DB_TEST_DBNAME,
                use_unicode=False,
                charset=config.DB_CHARSET
            )
        return self.__pool.connection()
 
    # 释放连接池资源
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.cursor.close()
        self.conn.close()
    # 关闭连接归还给链接池
    # def close(self):
    #     self.cursor.close()
    #     self.conn.close()
 
    # 从连接池中取出一个连接
    def getconn(self):
        conn = self.__getconn()
        cursor = conn.cursor()
        return cursor, conn
# 获取连接池,实例化
@singleton
def get_my_connection():
    return MyConnectionPool()

1.4 封装Python操作MYSQL的代码

mysqlhelper.py

import time
from db_dbutils_init import get_my_connection
"""执行语句查询有结果返回结果没有返回0;增/删/改返回变更数据条数,没有返回0"""
class MySqLHelper(object):
    def __init__(self):
        self.db = get_my_connection()  # 从数据池中获取连接
    #
    # def __new__(cls, *args, **kwargs):
    #     if not hasattr(cls, 'inst'):  # 单例
    #         cls.inst = super(MySqLHelper, cls).__new__(cls, *args, **kwargs)
    #     return cls.inst
 
    # 封装执行命令
    def execute(self, sql, param=None, autoclose=False):
        """
        【主要判断是否有参数和是否执行完就释放连接】
        :param sql: 字符串类型,sql语句
        :param param: sql语句中要替换的参数"select %s from tab where id=%s" 其中的%s就是参数
        :param autoclose: 是否关闭连接
        :return: 返回连接conn和游标cursor
        """
        cursor, conn = self.db.getconn()  # 从连接池获取连接
        count = 0
        try:
            # count : 为改变的数据条数
            if param:
                count = cursor.execute(sql, param)
            else:
                count = cursor.execute(sql)
            conn.commit()
            if autoclose:
                self.close(cursor, conn)
        except Exception as e:
            pass
        return cursor, conn, count
 
    # 释放连接
    def close(self, cursor, conn):
        """释放连接归还给连接池"""
        cursor.close()
        conn.close()
 
    # 查询所有
    def selectall(self, sql, param=None):
        cursor = None
        conn = None
        count = None
        try:
            cursor, conn, count = self.execute(sql, param)
            res = cursor.fetchall()
            return res
        except Exception as e:
            print(e)
            self.close(cursor, conn)
            return count
 
    # 查询单条
    def selectone(self, sql, param=None):
        cursor = None
        conn = None
        count = None
        try:
            cursor, conn, count = self.execute(sql, param)
            res = cursor.fetchone()
            self.close(cursor, conn)
            return res
        except Exception as e:
            print("error_msg:", e.args)
            self.close(cursor, conn)
            return count
 
    # 增加
    def insertone(self, sql, param):
        cursor = None
        conn = None
        count = None
        try:
            cursor, conn, count = self.execute(sql, param)
            # _id = cursor.lastrowid()  # 获取当前插入数据的主键id,该id应该为自动生成为好
            conn.commit()
            self.close(cursor, conn)
            return count
        except Exception as e:
            print(e)
            conn.rollback()
            self.close(cursor, conn)
            return count
 
    # 增加多行
    def insertmany(self, sql, param):
        """
        :param sql:
        :param param: 必须是元组或列表[(),()]或((),())
        :return:
        """
        cursor, conn, count = self.db.getconn()
        try:
            cursor.executemany(sql, param)
            conn.commit()
            return count
        except Exception as e:
            print(e)
            conn.rollback()
            self.close(cursor, conn)
            return count
 
    # 删除
    def delete(self, sql, param=None):
        cursor = None
        conn = None
        count = None
        try:
            cursor, conn, count = self.execute(sql, param)
            self.close(cursor, conn)
            return count
        except Exception as e:
            print(e)
            conn.rollback()
            self.close(cursor, conn)
            return count
 
    # 更新
    def update(self, sql, param=None):
        cursor = None
        conn = None
        count = None
        try:
            cursor, conn, count = self.execute(sql, param)
            conn.commit()
            self.close(cursor, conn)
            return count
        except Exception as e:
            print(e)
            conn.rollback()
            self.close(cursor, conn)
            return count
# if __name__ == '__main__':
#     db = MySqLHelper()
#     sql = "SELECT SLEEP(10)"
#     db.execute(sql)
#     time.sleep(20)
 
 
    # TODO 查询单条
    # sql1 = 'select * from userinfo where name=%s'
    # args = 'python'
    # ret = db.selectone(sql=sql1, param=args)
    # print(ret)  # (None, b'python', b'123456', b'0')
 
    # TODO 增加单条
    # sql2 = 'insert into hotel_urls(cname,hname,cid,hid,url) values(%s,%s,%s,%s,%s)'
    # ret = db.insertone(sql2, ('1', '2', '1', '2', '2'))
    # print(ret)
 
    # TODO 增加多条
    # sql3 = 'insert into userinfo (name,password) VALUES (%s,%s)'
    # li = li = [
    #     ('分省', '123'),
    #     ('到达','456')
    # ]
    # ret = db.insertmany(sql3,li)
    # print(ret)
 
    # TODO 删除
    # sql4 = 'delete from  userinfo WHERE name=%s'
    # args = 'xxxx'
    # ret = db.delete(sql4, args)
    # print(ret)
 
    # TODO 更新
    # sql5 = r'update userinfo set password=%s WHERE name LIKE %s'
    # args = ('993333993', '%old%')
    # ret = db.update(sql5, args)
    # print(ret)

二、连接池测试

修改 db_dbutils_init.py 文件,在创建连接池def __getconn(self):方法下,加一个打印随机数,方便将来我们定位是否时单例的线程池。

 修改后的db_dbutils_init.py 文件:

from dbutils.pooled_db import PooledDB
import db_config as config
import random
from singleton import singleton
 
"""
@功能:创建数据库连接池
"""
class MyConnectionPool(object):
    # 私有属性
    # 能通过对象直接访问,但是可以在本类内部访问;
    __pool = None
 
    # def __init__(self):
    #     self.conn = self.__getConn()
    #     self.cursor = self.conn.cursor()
 
    # 创建数据库连接conn和游标cursor
    def __enter__(self):
        self.conn = self.__getconn()
        self.cursor = self.conn.cursor()
 
    # 创建数据库连接池
    def __getconn(self):
        if self.__pool is None:
            i = random.randint(1, 100)
            print("线程池的随机数"+str(i))
            self.__pool = PooledDB(
                creator=config.DB_CREATOR,
                mincached=config.DB_MIN_CACHED,
                maxcached=config.DB_MAX_CACHED,
                maxshared=config.DB_MAX_SHARED,
                maxconnections=config.DB_MAX_CONNECYIONS,
                blocking=config.DB_BLOCKING,
                maxusage=config.DB_MAX_USAGE,
                setsession=config.DB_SET_SESSION,
                host=config.DB_TEST_HOST,
                port=config.DB_TEST_PORT,
                user=config.DB_TEST_USER,
                passwd=config.DB_TEST_PASSWORD,
                db=config.DB_TEST_DBNAME,
                use_unicode=False,
                charset=config.DB_CHARSET
            )
        return self.__pool.connection()
 
    # 释放连接池资源
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.cursor.close()
        self.conn.close()
 
    # 关闭连接归还给链接池
    # def close(self):
    #     self.cursor.close()
    #     self.conn.close()
 
    # 从连接池中取出一个连接
    def getconn(self):
        conn = self.__getconn()
        cursor = conn.cursor()
        return cursor, conn
 # 获取连接池,实例化
@singleton
def get_my_connection():
    return MyConnectionPool()

开始测试:

场景一:同一个实例,执行2次sql

from mysqlhelper import MySqLHelper
import time
 
if __name__ == '__main__':
    sql = "SELECT SLEEP(10)"
    sql1 = "SELECT SLEEP(15)"
 
    db = MySqLHelper()
    db.execute(sql)
    db.execute(sql1)
    time.sleep(20)

在数据库中,使用 show processlist;

show processlist;

当执行第一个sql时。数据库连接显示。

当执行第二个sql时。数据库连接显示:

 当执行完sql,程序sleep时。数据库连接显示:

程序打印结果:

线程池的随机数43

由以上可以得出结论:

线程池启动后,生成了5个连接。执行第一个sql时,使用了1个连接。执行完第一个sql后,使用了另外1个连接。 这是一个线性的,线程池中一共5个连接,但是每次执行,只使用了其中一个。

有个疑问,连接池如果不支持并发是不是就毫无意义?

如上,虽然开了线程池5个连接,但是每次执行sql,只用到了一个连接。那为何不设置线程池大小为1呢?设置线程池大小的意义何在呢?(如果在非并发的场景下,是不是设置大小无意义?)

相比于不用线程池的优点:

如果不用线程池,则每次执行一个sql都要创建、断开连接。 像我们这样使用连接池,不用反复创建、断开连接,拿现成的连接直接用就好了。

场景二:依次创建2个实例,各自执行sql

from mysqlhelper import MySqLHelper
import time
 
if __name__ == '__main__':
    db = MySqLHelper()
    db1 = MySqLHelper()
    sql = "SELECT SLEEP(10)"
    sql1 = "SELECT SLEEP(15)"
    db.execute(sql)
    db1.execute(sql1)
    time.sleep(20)

第一个实例db,执行sql。线程池启动了5个连接

第二个实例db1,执行sql:

 程序睡眠时,一共5个线程池:

 打印结果:

结果证明:

虽然我们依次创建了2个实例,但是(1)创建线程池的打印结果,只打印1次,且从始至终,线程池一共只启动了5个连接,且连接的id没有发生改变,说明一直是这5个连接。

证明,我们虽然创建了2个实例,但是这2个实例其实是一个实例。(单例模式是生效的)

场景三:启动2个线程,但是线程在创建连接池实例时,有时间间隔

import threading
from mysqlhelper import MySqLHelper
import time
def sl1():
    time.sleep(2)
    db = MySqLHelper()
    sql = "SELECT SLEEP(6)"
    db.execute(sql)
 
def sl2():
    time.sleep(4)
    db = MySqLHelper()
    sql = "SELECT SLEEP(15)"
    db.execute(sql)
if __name__ == '__main__':
    threads = []
    t1 = threading.Thread(target=sl1)
    threads.append(t1)
    t2 = threading.Thread(target=sl2)
    threads.append(t2)
 
    for t in threads:
        t.setDaemon(True)
        t.start()
    time.sleep(20)

2个线程间隔了2秒。

观察数据库的连接数量:

打印结果:

在并发执行2个sql时,共用了这5个连接,且打印结果只打印了一次,说明虽然并发创建了2次实例,但真正只创建了一个连接池。

场景四:启动2个线程,线程在创建连接池实例时,没有时间间隔

import threading
from mysqlhelper import MySqLHelper
import time

if __name__ == '__main__':
    db = MySqLHelper()
    sql = "SELECT SLEEP(6)"
    sql1 = "SELECT SLEEP(15)"
    threads = []
    t1 = threading.Thread(target=db.execute, args=(sql,))
    threads.append(t1)
    t2 = threading.Thread(target=db.execute, args=(sql1,))
    threads.append(t2)
 
    for t in threads:
        t.setDaemon(True)
        t.start()
    time.sleep(20)

观察数据库连接 :

 打印结果:

结果表明:

终端打印了2次,数据库建立了10个连接,说明创建了2个线程池。这样的单例模式,存在线程安全问题。

到此这篇关于Python封装数据库连接池详解的文章就介绍到这了,更多相关Python连接池内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Django REST framework 视图和路由详解

    Django REST framework 视图和路由详解

    这篇文章主要介绍了Django REST framework 视图和路由详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-07-07
  • Python函数的作用域及内置函数详解

    Python函数的作用域及内置函数详解

    这篇文章主要介绍了python函数的作用域及内置函数详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2021-09-09
  • Python Matplotlib绘制箱线图boxplot()函数详解

    Python Matplotlib绘制箱线图boxplot()函数详解

    箱线图一般用来展现数据的分布(如上下四分位值、中位数等),同时也可以用箱线图来反映数据的异常情况,下面这篇文章主要给大家介绍了关于Python Matplotlib绘制箱线图boxplot()函数的相关资料,需要的朋友可以参考下
    2022-07-07
  • python requests实现上传excel数据流

    python requests实现上传excel数据流

    这篇文章主要介绍了python requests实现上传excel数据流,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-02-02
  • Python语言开发高并发爬虫示例探讨

    Python语言开发高并发爬虫示例探讨

    这篇文章主要为大家介绍了Python语言开发高并发爬虫示例探讨,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-12-12
  • python提取word文件中的所有图片

    python提取word文件中的所有图片

    办公中,偶尔会碰到一种情况,需要提取word文档中的图片,决定写这样一款工具自动提取图片,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-05-05
  • Python中文件操作简明介绍

    Python中文件操作简明介绍

    这篇文章主要介绍了Python中文件操作简明介绍,本文讲解了打开文件、读取方法、写入方法、文件内移动、文件迭代、关闭文件、截取文件等内容,并给出了一个完整操作实例,需要的朋友可以参考下
    2015-04-04
  • Python之list对应元素求和的方法

    Python之list对应元素求和的方法

    今天小编就为大家分享一篇Python之list对应元素求和的方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2018-06-06
  • python实现可视化动态CPU性能监控

    python实现可视化动态CPU性能监控

    这篇文章主要为大家详细介绍了python可视化动态CPU性能监控,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2018-06-06
  • selenium中BasicAuth认证弹框处理

    selenium中BasicAuth认证弹框处理

    本文主要介绍了selenium中BasicAuth认证弹框处理,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2022-07-07

最新评论