一、首先第一步,我们先来编写数据库配置文件,在test001keshanchu下创建目录和文件test_db\config.ini,内容如下
- [DATABASE]
- host = 10.182.27.158
- port = 3306
- user = root
- passwd = 123456wt
- database = xttest_new
- dbchar = utf8
- table = interface_test
-
二、第二步,我们写一个从config.ini配置文件中读取我们想要信息的readConfig.py文件,在test_db下创建readConfig.py文件
- import os, configparser
-
- path = os.path.split(os.path.realpath(__file__))[0]#得到readConfig.py文件的上级目录C:\Users\songlihui\PycharmProjects\test001keshanchu\test_db
- config_path = os.path.join(path, 'config.ini')#得到配置文件目录,配置文件目录为path下的\config.ini
- config = configparser.ConfigParser()#调用配置文件读取
- config.read(config_path, encoding='utf-8')
-
- class ReadConfig():
-
- def get_mysql(self, name):
- value = config.get('DATABASE', name)#通过config.get拿到配置文件中DATABASE的name的对应值
- return value
-
-
- if __name__ == '__main__':
- print('path值为:', path)#测试path内容
- print('config_path', config_path)#打印输出config_path测试内容是否正确
- print('通过config.get拿到配置文件中DATABASE的host的对应值:', ReadConfig().get_mysql('host'))#通过上面的ReadConfig().get_mysql方法获取配置文件中DATABASE的'host'的对应值为10.182.27.158
-
运行此文件,输入内容为
三、编写我们的数据库连接池文件,在test_db创建configDB.py
- # -*- coding: UTF-8 -*-
- """
- 1、执行带参数的SQL时,请先用sql语句指定需要输入的条件列表,然后再用tuple/list进行条件批配
- 2、在格式SQL中不需要使用引号指定数据类型,系统会根据输入参数自动识别
- 3、在输入的值中不需要使用转意函数,系统会自动处理
- """
- import MySQLdb
- from DBUtils.PooledDB import PooledDB
- from test_db import readConfig
-
- config = readConfig.ReadConfig()#实例化
-
- """
- Config是一些数据库的配置文件,通过调用我们写的readConfig来获取配置文件中对应值
- """
- host = config.get_mysql('host')
- port = int(config.get_mysql('port'))
- user = config.get_mysql('user')
- passwd = config.get_mysql('passwd')
- database = config.get_mysql('database')
- dbchar = config.get_mysql('dbchar')
-
-
- class Mysql(object):
- """
- MYSQL数据库对象,负责产生数据库连接 , 此类中的连接采用连接池实现获取连接对象:conn = Mysql.getConn()
- 释放连接对象;conn.close()或del conn
- """
- # 连接池对象
- __pool = None
-
- def __init__(self):
- # 数据库构造函数,从连接池中取出连接,并生成操作游标
- self._conn = Mysql.__getConn()
- self._cursor = self._conn.cursor()
-
- @staticmethod
- def __getConn():
- """
- @summary: 静态方法,从连接池中取出连接
- @return MySQLdb.connection
- """
- if Mysql.__pool is None:
- __pool = PooledDB(creator=MySQLdb, mincached=1, maxcached=20, host=host, port=port, user=user, passwd=passwd, db=database)
- return __pool.connection()
-
- def getAll(self, sql, param=None):
- """
- @summary: 执行查询,并取出所有结果集
- @param sql:查询SQL,如果有查询条件,请只指定条件列表,并将条件值使用参数[param]传递进来
- @param param: 可选参数,条件列表值(元组/列表)
- @return: result list(字典对象)/boolean 查询到的结果集
- """
- if param is None:
- count = self._cursor.execute(sql)
- else:
- count = self._cursor.execute(sql, param)
- if count > 0:
- result = self._cursor.fetchall()
- else:
- result = False
- return result
-
- def getOne(self, sql, param=None):
- """
- @summary: 执行查询,并取出第一条
- @param sql:查询SQL,如果有查询条件,请只指定条件列表,并将条件值使用参数[param]传递进来
- @param param: 可选参数,条件列表值(元组/列表)
- @return: result list/boolean 查询到的结果集
- """
- if param is None:
- count = self._cursor.execute(sql)
- else:
- count = self._cursor.execute(sql, param)
- if count > 0:
- result = self._cursor.fetchone()
- else:
- result = False
- return result
-
- def getMany(self, sql, num, param=None):
- """
- @summary: 执行查询,并取出num条结果
- @param sql:查询SQL,如果有查询条件,请只指定条件列表,并将条件值使用参数[param]传递进来
- @param num:取得的结果条数
- @param param: 可选参数,条件列表值(元组/列表)
- @return: result list/boolean 查询到的结果集
- """
- if param is None:
- count = self._cursor.execute(sql)
- else:
- count = self._cursor.execute(sql, param)
- if count > 0:
- result = self._cursor.fetchmany(num)
- else:
- result = False
- return result
-
- def insertOne(self, sql, value):
- """
- @summary: 向数据表插入一条记录
- @param sql:要插入的SQL格式
- @param value:要插入的记录数据tuple/list
- @return: insertId 受影响的行数
- """
- self._cursor.execute(sql, value)
- return self.__getInsertId()
-
- def insertMany(self, sql, values):
- """
- @summary: 向数据表插入多条记录
- @param sql:要插入的SQL格式
- @param values:要插入的记录数据tuple(tuple)/list[list]
- @return: count 受影响的行数
- """
- count = self._cursor.executemany(sql, values)
- return count
-
- def __getInsertId(self):
- """
- 获取当前连接最后一次插入操作生成的id,如果没有则为0
- """
- self._cursor.execute("SELECT @@IDENTITY AS id")
- result = self._cursor.fetchall()
- return result[0]['id']
-
- def __query(self, sql, param=None):
- if param is None:
- count = self._cursor.execute(sql)
- else:
- count = self._cursor.execute(sql, param)
- return count
-
- def update(self, sql, param=None):
- """
- @summary: 更新数据表记录
- @param sql: SQL格式及条件,使用(%s,%s)
- @param param: 要更新的 值 tuple/list
- @return: count 受影响的行数
- """
- return self.__query(sql, param)
-
- def delete(self, sql, param=None):
- """
- @summary: 删除数据表记录
- @param sql: SQL格式及条件,使用(%s,%s)
- @param param: 要删除的条件 值 tuple/list
- @return: count 受影响的行数
- """
- return self.__query(sql, param)
-
- def begin(self):
- """
- @summary: 开启事务
- """
- self._conn.autocommit(0)
-
- def end(self, option='commit'):
- """
- @summary: 结束事务
- """
- if option == 'commit':
- self._conn.commit()
- else:
- self._conn.rollback()
-
- def dispose(self, isEnd=1):
- """
- @summary: 释放连接池资源
- """
- if isEnd == 1:
- self.end('commit')
- else:
- self.end('rollback');
- self._cursor.close()
- self._conn.close()
-
- if __name__ == '__main__':
- print(host, port, user, passwd, database)
四、测试验证在test_db下创建test_sql.py
- # coding:utf-8
- import test_db.configDB
-
- mysql = test_db.configDB.Mysql()
- sqlAll = "select * from utest"#sql语句,具体根据实际情况填写真实信息
- result = mysql.getAll(sqlAll, None)
- if result:
- print("get all")
- for row in result:
- print(row[0], row[1])
- mysql.dispose()#释放连接池资源
五、运行结果
注意事项:
config.ini的配置项根据自己的环境填写对应的正确数据,sqlAll = "select * from utest"#sql语句,具体根据实际情况填写真实信息