一、首先第一步,我们先来编写数据库配置文件,在test001keshanchu下创建目录和文件test_db\config.ini,内容如下
[DATABASE]
host = 10.182.27.158
port = 3306
user = root
passwd = 123456wt
database = xttest_new
dbchar = utf8
table = interface_test
二、第二步,我们写一个从config.ini配置文件中读取我们想要信息的readConfig.py文件,在test_db下创建readConfig.py文件
import os, configparser
path = os.path.split(os.path.realpath(__file__))[0]#得到readConfig.py文件的上级目录C:\Users\songlihui\PycharmProjects\test001keshanchu\test_db
config_path = os.path.join(path, 'config.ini')#得到配置文件目录,配置文件目录为path下的\config.ini
config = configparser.ConfigParser()#调用配置文件读取
config.read(config_path, encoding='utf-8')
class ReadConfig():
def get_mysql(self, name):
value = config.get('DATABASE', name)#通过config.get拿到配置文件中DATABASE的name的对应值
return value
if __name__ == '__main__':
print('path值为:', path)#测试path内容
print('config_path', config_path)#打印输出config_path测试内容是否正确
print('通过config.get拿到配置文件中DATABASE的host的对应值:', ReadConfig().get_mysql('host'))#通过上面的ReadConfig().get_mysql方法获取配置文件中DATABASE的'host'的对应值为10.182.27.158
运行此文件,输入内容为
三、编写我们的数据库连接池文件,在test_db创建configDB.py
# -*- coding: UTF-8 -*-
"""
1、执行带参数的SQL时,请先用sql语句指定需要输入的条件列表,然后再用tuple/list进行条件批配
2、在格式SQL中不需要使用引号指定数据类型,系统会根据输入参数自动识别
3、在输入的值中不需要使用转意函数,系统会自动处理
"""
import MySQLdb
from DBUtils.PooledDB import PooledDB
from test_db import readConfig
config = readConfig.ReadConfig()#实例化
"""
Config是一些数据库的配置文件,通过调用我们写的readConfig来获取配置文件中对应值
"""
host = config.get_mysql('host')
port = int(config.get_mysql('port'))
user = config.get_mysql('user')
passwd = config.get_mysql('passwd')
database = config.get_mysql('database')
dbchar = config.get_mysql('dbchar')
class Mysql(object):
"""
MYSQL数据库对象,负责产生数据库连接 , 此类中的连接采用连接池实现获取连接对象:conn = Mysql.getConn()
释放连接对象;conn.close()或del conn
"""
# 连接池对象
__pool = None
def __init__(self):
# 数据库构造函数,从连接池中取出连接,并生成操作游标
self._conn = Mysql.__getConn()
self._cursor = self._conn.cursor()
@staticmethod
def __getConn():
"""
@summary: 静态方法,从连接池中取出连接
@return MySQLdb.connection
"""
if Mysql.__pool is None:
__pool = PooledDB(creator=MySQLdb, mincached=1, maxcached=20, host=host, port=port, user=user, passwd=passwd, db=database)
return __pool.connection()
def getAll(self, sql, param=None):
"""
@summary: 执行查询,并取出所有结果集
@param sql:查询SQL,如果有查询条件,请只指定条件列表,并将条件值使用参数[param]传递进来
@param param: 可选参数,条件列表值(元组/列表)
@return: result list(字典对象)/boolean 查询到的结果集
"""
if param is None:
count = self._cursor.execute(sql)
else:
count = self._cursor.execute(sql, param)
if count > 0:
result = self._cursor.fetchall()
else:
result = False
return result
def getOne(self, sql, param=None):
"""
@summary: 执行查询,并取出第一条
@param sql:查询SQL,如果有查询条件,请只指定条件列表,并将条件值使用参数[param]传递进来
@param param: 可选参数,条件列表值(元组/列表)
@return: result list/boolean 查询到的结果集
"""
if param is None:
count = self._cursor.execute(sql)
else:
count = self._cursor.execute(sql, param)
if count > 0:
result = self._cursor.fetchone()
else:
result = False
return result
def getMany(self, sql, num, param=None):
"""
@summary: 执行查询,并取出num条结果
@param sql:查询SQL,如果有查询条件,请只指定条件列表,并将条件值使用参数[param]传递进来
@param num:取得的结果条数
@param param: 可选参数,条件列表值(元组/列表)
@return: result list/boolean 查询到的结果集
"""
if param is None:
count = self._cursor.execute(sql)
else:
count = self._cursor.execute(sql, param)
if count > 0:
result = self._cursor.fetchmany(num)
else:
result = False
return result
def insertOne(self, sql, value):
"""
@summary: 向数据表插入一条记录
@param sql:要插入的SQL格式
@param value:要插入的记录数据tuple/list
@return: insertId 受影响的行数
"""
self._cursor.execute(sql, value)
return self.__getInsertId()
def insertMany(self, sql, values):
"""
@summary: 向数据表插入多条记录
@param sql:要插入的SQL格式
@param values:要插入的记录数据tuple(tuple)/list[list]
@return: count 受影响的行数
"""
count = self._cursor.executemany(sql, values)
return count
def __getInsertId(self):
"""
获取当前连接最后一次插入操作生成的id,如果没有则为0
"""
self._cursor.execute("SELECT @@IDENTITY AS id")
result = self._cursor.fetchall()
return result[0]['id']
def __query(self, sql, param=None):
if param is None:
count = self._cursor.execute(sql)
else:
count = self._cursor.execute(sql, param)
return count
def update(self, sql, param=None):
"""
@summary: 更新数据表记录
@param sql: SQL格式及条件,使用(%s,%s)
@param param: 要更新的 值 tuple/list
@return: count 受影响的行数
"""
return self.__query(sql, param)
def delete(self, sql, param=None):
"""
@summary: 删除数据表记录
@param sql: SQL格式及条件,使用(%s,%s)
@param param: 要删除的条件 值 tuple/list
@return: count 受影响的行数
"""
return self.__query(sql, param)
def begin(self):
"""
@summary: 开启事务
"""
self._conn.autocommit(0)
def end(self, option='commit'):
"""
@summary: 结束事务
"""
if option == 'commit':
self._conn.commit()
else:
self._conn.rollback()
def dispose(self, isEnd=1):
"""
@summary: 释放连接池资源
"""
if isEnd == 1:
self.end('commit')
else:
self.end('rollback');
self._cursor.close()
self._conn.close()
if __name__ == '__main__':
print(host, port, user, passwd, database)
四、测试验证在test_db下创建test_sql.py
# coding:utf-8
import test_db.configDB
mysql = test_db.configDB.Mysql()
sqlAll = "select * from utest"#sql语句,具体根据实际情况填写真实信息
result = mysql.getAll(sqlAll, None)
if result:
print("get all")
for row in result:
print(row[0], row[1])
mysql.dispose()#释放连接池资源
五、运行结果
注意事项:
config.ini的配置项根据自己的环境填写对应的正确数据,sqlAll = "select * from utest"#sql语句,具体根据实际情况填写真实信息