2025年3月31日 星期一 乙巳(蛇)年 正月初一 设为首页 加入收藏
rss
您当前的位置:首页 > 计算机 > 编程开发 > Python

SQLAlchemy实战使用总结(通俗易懂)!!!

时间:06-18来源:作者:点击数:28

简介

SQLAlchemy是用Python编程语言开发的一个开源项目。它提供了SQL工具包和ORM(对象关系映射)工具,使用MIT许可证发行。

SQLAlchemy最初在2006年2月发行,发行后便很快的成为Python社区中最广泛使用的ORM工具之一,丝毫不亚于Django自带的ORM框架。

SQLAlchemy采用简单的Python语言,提供高效和高性能的数据库访问,实现了完整的企业级持久模型。它的理念是,SQL数据库的量级和性能比对象集合重要,而对象集合的抽象又重要于表和行。


以上都是比较官方的说法,帮助大家了解一下SQLAlchemy。了解了解就可。

基本用法

安装

安装sqlalchemy

  • pip3 install sqlalchemy
  • pip3 install pymysql

本文使用MySQL作为数据库,使用pymysql作为驱动,因此需要安装pymysql

连接数据库

配置信息

在连接数据库前,需要使用到一些配置信息,然后把它们组合成满足以下条件的字符串:

  • dialect+driver://username:password@host:port/database

例:mysql+pymysql://root:root@192.168.105.110:30306/testDataBase

建议将配置信息放到你的配置文件中,如config.py

  • dialect:数据库,如:sqlite、mysql、oracle等
  • driver:数据库驱动,用于连接数据库的,本文使用pymysql
  • username:用户名
  • password:密码
  • host:IP地址
  • port:端口
  • database:数据库

创建引擎并连接数据库

  • from sqlalchemy import create_engine
  • from config import DB_URI
  • engine = create_engine(DB_URI) # 创建引擎
  • conn = engine.connect() # 连接
  • result = conn.execute('SELECT 1') # 执行SQL
  • print(result.fetchone())
  • conn.close() # 关闭连接

创建ORM模型并映射到数据建库中

  • from sqlalchemy.ext.declarative import declarative_base
  • from sqlalchemy import create_engine, Column, Integer, String
  • from sqlalchemy.orm import sessionmaker
  • from config import DB_URI
  • engine = create_engine(DB_URI)
  • Base = declarative_base(engine) # SQLORM基类
  • session = sessionmaker(engine)() # 构建session对象
  • class Student(Base):
  • __tablename__ = 'student' # 表名
  • id = Column(Integer, primary_key=True, autoincrement=True)
  • name = Column(String(50))
  • age = Column(Integer)
  • sex = Column(String(10))
  • Base.metadata.create_all() # 将模型映射到数据库中

执行上面代码,将会在数据库中生成对应的映射表student。

增删改查(CRUD)操作

新增和修改的时候可能会遇到类型错误问题,可以参考下面的类型转换。

新增

注意:主要看使用的方法以及传入的参数,像开发session以及命名是什么,用自己框架的获取session对象就可以。

单条新增:
  • # 1.首先导入之间做好的ORM 对象 User
  • from my_create_table import User
  • # 2.使用Users ORM模型创建一条数据
  • user1 = User(name="wang")
  • # 在db_session会话中添加一条 UserORM模型创建的数据
  • db_session.add(user1)
  • # 使用 db_session 会话提交 , 这里的提交是指将db_session中的所有指令一次性提交
  • db_session.commit()
批量新增:

方法一:(最常用,但是执行时间不算快)

  • # 方法一:
  • user2 = User(name="张三")
  • user3 = User(name="王五")
  • db_session.add(user2)
  • db_session.add(user3)
  • db_session.commit()
  • # 之前说过commit是将db_session中的所有指令一次性提交,现在的db_session中至少有两条指令user2和user3
  • db_session.close()
  • #关闭会话

方法二:

  • # 方法二:
  • user_list = [
  • User(name="wang1"),
  • User(name="wang2"),
  • User(name="wang3")
  • ]
  • db_session.add_all(user_list)
  • db_session.commit()
  • db_session.close()

方法三:(执行时间最短排名:4)

  • db_session.bulk_save_objects(
  • [
  • Customer(name="NAME " + str(i))
  • for i in xrange(min(10000, n1))
  • ]
  • )
  • db_session.commit()

方法四:(执行时间最短排名:3)

  • db_session.bulk_insert_mappings(
  • Customer,
  • [
  • dict(name="NAME " + str(i))
  • for i in xrange(min(10000, n1))
  • ]
  • )
  • db_session.commit()

方法五:(执行时间最短排名:2)

  • engine = create_engine(dbname, echo=False)
  • engine.execute(
  • Customer.__table__.insert(),
  • [{"name": 'NAME ' + str(i)} for i in xrange(n)]
  • ) ##==> engine.execute('insert into ttable (name) values ("NAME"), ("NAME2")')

方法六:(执行时间最短排名:1)

  • def init_sqlite3(dbname):
  • conn = sqlite3.connect(dbname)
  • c = conn.cursor()
  • c.execute("DROP TABLE IF EXISTS customer")
  • c.execute(
  • "CREATE TABLE customer (id INTEGER NOT NULL, "
  • "name VARCHAR(255), PRIMARY KEY(id))")
  • conn.commit()
  • return conn
  • def test_sqlite3(n=100000, dbname='sqlite3.db'):
  • conn = init_sqlite3(dbname)
  • c = conn.cursor()
  • t0 = time.time()
  • for i in xrange(n):
  • row = ('NAME ' + str(i),)
  • c.execute("INSERT INTO customer (name) VALUES (?)", row)
  • conn.commit()

删除

  • # 删除
  • class_info = db_session.query(ClassTable).filter(ClassTable.name=="OldBoyS1").first()
  • db_session.query(Student).filter(Student.class_id == class_info.id).delete()
  • db_session.commit()
  • db_session.close()
  • user = UserSample()
  • db.session.query(UserSample).filter_by(id=user.id).delete()
  • db.session.commit()
  • db.session.close()

修改

单条更新:
  • # UPDATE user SET name="NBDragon" WHERE id=20 更新一条数据
  • # 语法是这样的 :
  • # 使用 db_session 执行User表 query(User) 筛选 User.id = 20 的数据 filter(User.id == 20)
  • # 将name字段的值改为NBDragon update({"name":"NBDragon"})
  • res = db_session.query(User).filter(User.id == 20).update({"name":"NBDragon"})
  • print(res) # 1 res就是我们当前这句更新语句所更新的行数
  • # 注意注意注意
  • # 这里一定要将db_session中的执行语句进行提交,因为你这是要对数据中的数据进行操作
  • # 数据库中 增 改 删 都是操作,也就是说执行以上三种操作的时候一定要commit
  • db_session.commit()
  • db_session.close()
  • #关闭会话
批量更新:
  • # 更新多条
  • res = db_session.query(User).filter(User.id <= 20).update({"name":"NBDragon"})
  • print(res) # 6 res就是我们当前这句更新语句所更新的行数
  • db_session.commit()
  • db_session.close()
  • #关闭会话

查询

查询所有数据
  • # 查询所有数据
  • res = db_session.query(User).all()
指定查询列
  • # query的时候我们不在使用User ORM对象,而是使用User.name来对内容进行选取
  • user_list = db_session.query(User.name).all()
获取返回数据的第一行
  • # 获取返回数据的第一行
  • res = db_session.query(User).first()
查询数据 指定查询数据列 加入别名
  • # 查询数据 指定查询数据列 加入别名
  • res = db_session.query(User.name.label('username'), User.id).first()
原生SQL查询
  • #原生SQL查询
  • res = db_session.query(User).from_statement(text("SELECT * FROM User where name=:name")).params(name='wang').all()
表达式筛选条件
  • # 表达式筛选条件
  • res = db_session.query(User).filter(User.name == "wang").all()
原生SQL筛选条件
  • # 原生SQL筛选条件
  • r4 = db_session.query(User).filter_by(name='wang').all()
  • r5 = db_session.query(User).filter_by(name='wang').first()
筛选条件格式
  • # 筛选条件格式
  • # filter:
  • user_list = db_session.query(User).filter(User.name == "wang").all()
  • user_list = db_session.query(User).filter(User.name == "wang").first()
  • # filter_by:
  • user_list = db_session.query(User).filter_by(name="wang").all()
  • user_list = db_session.query(User).filter_by(name="wang").first()
字符串匹配方式筛选条件 并使用 order_by进行排序
  • # 字符串匹配方式筛选条件 并使用 order_by进行排序
  • res = db_session.query(User).filter(text("id<:value and name=:name")).params(value=224, name='wang').order_by(User.id).all()
and or使用
  • # and or
  • from sqlalchemy.sql import and_ , or_
  • res = db_session.query(User).filter(and_(User.id > 3, User.name == 'wang')).all()
  • ret = db_session.query(User).filter(or_(User.id < 2, User.name == 'wang')).all()
复杂查询
  • # 复杂查询
  • from sqlalchemy.sql import text
  • user_list = db_session.query(User).filter(text("id<:value and name=:name")).params(value=3,name="wang")
查询语句
  • # 查询语句
  • data = db_session.execute('select * from t_user where name=:value', params={"value": 'abc'})
  • row = data.fetchone() # 取第一条
  • print(row.id) # 取主键
  • print(row.name) # 取字段
  • rows = data.fetchall() # 取所有数据
  • for row in rows:
  • print(row.id, row.name)
  • print(data.rowcount) # 取条数
排序
  • # 排序 :
  • user_list = db_session.query(User).order_by(User.id).all()
  • user_list = db_session.query(User).order_by(User.id.desc()).all()
通配符
  • # 通配符
  • ret = db_session.query(User).filter(User.name.like('e%')).all()
  • ret = db_session.query(User).filter(~User.name.like('e%')).all()
限制(切片)
  • # 限制
  • ret = db_session.query(User)[1:2]
分组
  • from sqlalchemy.sql import func
  • ret = db_session.query(User).group_by(User.extra).all()
  • ret = db_session.query(
  • func.max(User.id),
  • func.sum(User.id),
  • func.min(User.id)).group_by(User.name).all()
  • ret = db_session.query(
  • func.max(User.id),
  • func.sum(User.id),
  • func.min(User.id)).group_by(User.name).having(func.min(User.id) >2).all()
between in使用
  • # between使用
  • ret = session.query(User).filter(User.id.between(1, 3), User.name == 'wang').all() # between 大于1小于3
  • # in使用
  • ret = session.query(User).filter(User.id.in_([1,3,4])).all() # in_([1,3,4]) 只查询id等于1,3,4
  • ret = session.query(User).filter(~User.id.in_([1,3,4])).all() # ~xxxx.in_([1,3,4]) 查询不等于1,3,4
  • ret = session.query(User).filter(User.id.in_(session.query(User.id).filter_by(name='wang'))).all() 子查询

类型转换

dict_to_object

  • def dict_to_object(dict_data, obj):
  • dic2class(dict_data, obj)
  • def dic2class(py_data, obj):
  • for name in [name for name in dir(obj) if not name.startswith('_')]:
  • if name not in py_data:
  • setattr(obj, name, None)
  • else:
  • value = getattr(obj, name)
  • setattr(obj, name, set_value(value, py_data[name]))
  • def set_value(value, py_data):
  • if str(type(value)).__contains__('.'):
  • # value 为自定义类
  • dic2class(py_data, value)
  • elif str(type(value)) == "<class 'list'>":
  • # value为列表
  • if value.__len__() == 0:
  • # value列表中没有元素,无法确认类型
  • value = py_data
  • else:
  • # value列表中有元素,以第一个元素类型为准
  • child_value_type = type(value[0])
  • value.clear()
  • for child_py_data in py_data:
  • child_value = child_value_type()
  • child_value = set_value(child_value, child_py_data)
  • value.append(child_value)
  • else:
  • value = py_data
  • return value

db_tuple_to_dict

  • def db_tuple_to_dict(resultproxy):
  • d, a = {}, []
  • for rowproxy in resultproxy:
  • for column, value in rowproxy.items():
  • d = {**d, **{column: value}}
  • a.append(d)
  • return a

model_to_dict

  • def model_to_dict(obj):
  • dic = {}
  • dic_columns = obj.__table__.columns
  • # 保证都是字符串和数字
  • types = [str, int, float, bool]
  • # 注意,obj.__dict__会在commit后被作为过期对象清空dict,所以保险的办法还是用columns
  • for k, tmp in dic_columns.items():
  • # k=nick,tmp=usergroup.nick
  • v = getattr(obj, k, None)
  • if v != None:
  • dic[k] = str(v) if v and type(v) not in types else v
  • return dic

res_copy_model_to_dest(修改的时候会用到)

  • def res_copy_model_to_dest(res, dest):
  • dic_columns = res.__table__.columns
  • # 保证都是字符串和数字
  • types = [str, int, float, bool, bytes]
  • for k, tmp in dic_columns.items():
  • v = getattr(res, k, None)
  • value = str(v) if v and type(v) not in types else v
  • print("key:", k, " v:", value)
  • if v is not None:
  • setattr(dest, k, value)

例:

  • test = TestModel()
  • db_test = db.session.query(TestModel).get(test.id)
  • res_copy_model_to_dest(test, db_test)
  • db.session.commit()
  • db.session.close()

在项目中用到的东西,分享一波

方便获取更多学习、工作、生活信息请关注本站微信公众号城东书院 微信服务号城东书院 微信订阅号
推荐内容
相关内容
栏目更新
栏目热门