Flask-SQLAlchemy使用
简介
扩展Flask-SQLAlchemy集成了SQLAlchemy,它简化了连接数据库服务器、管理数据库会话等各类工作,同时也是一个ORM框架。
安装
pip install flask-sqlalchemy
在Flask环境下使用
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
db = SQLAlchemy()
db.init_app(app)
如果未配置SQLALCHEMY_DATABASE_URI,则程序无法连接到数据库服务器,直接执行则会出错。
连接数据库服务器
DBMS通常会提供一个数据库服务器在操作系统上。要连接数据库服务器。首先要为我们的程序制定一个URI。数据库URI是一个包含了各种属性的字符串。
常用的数据库连接URI
- PostgreSQL : postgresql+<数据库驱动>://username:password@host:port/databasename
- MySQL : mysql+<数据库驱动>://username:password@host:port/databasename
- Oracle : oracle+<数据库驱动>://username:password@host:port/databasename
- SQLite(UNIX) : sqlite:////dir/sqlite.db
- SQLite(Windows) : r'sqlite:///dir\sqlite.db'
在Flask-SQLAlchemy中数据库连接uri通过SQLALCHEMY_DATABASE_URI进行配置。
向Flask框架中添加配置:
# app.config.update({'SQLALCHEMY_DATABASE_URI': 'mysql://root:wangke0310@127.0.0.1:3306/test'})
or
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:wangke0310@127.0.0.1:3306/test'
配置警告:SQLALCHEMY_TRACK_MODIFICATIONS
SQLALCHEMY_TRACK_MODIFICATIONS配置变量决定是否追踪对象的修改,这用于Flask-SQLAlchemy的事件通知系统。这个配置键的值默认为None,如果没有特殊要求,可以将其设置为False以关闭警告信息:
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
定义模型类
用来映射到数据表的Python类通常被称为一个数据库模型(Model)一个数据库模型类对应数据库中的一张表。
使用SQLAlchemy定义模型类
Base = declarative_base()
class User(Base):
__tablename__ = 'tab_users'
id = Column(Integer, primary_key=True)
name = Column(String(20))
fullname = Column(String(20))
password = Column(String(20))
在使用SQLAlchemy定义模型类时我们需要去继承一个由declarative_base()方法创建的一个基类。
使用Flask-SQLAlchemy定义模型类
db = SQLAlchemy(app)
当我们通过Flask-SQLAlchemy来定义模型类时则不再需要向上面一样手动去通过declarative_base()方法去获取一个基类。我们只需要创建一个SQLAlchemy的实例即可,其内部会自动帮助我们创建。
SQLAlchemy类的构造方法:
def __init__(self, app=None, use_native_unicode=True, session_options=None,
metadata=None, query_class=BaseQuery, model_class=Model):
self.use_native_unicode = use_native_unicode
self.Query = query_class
self.session = self.create_scoped_session(session_options)
self.Model = self.make_declarative_base(model_class, metadata)
self._engine_lock = Lock()
self.app = app
_include_sqlalchemy(self, query_class)
if app is not None:
self.init_app(app)
因此模型类直接继承db.Model即可:
from app import db
Base = declarative_base()
class User(db.model):
__tablename__ = 'tab_users'
id = Column(Integer, primary_key=True)
name = Column(String(20))
fullname = Column(String(20))
password = Column(String(20))
类名到表名的生成规则
类名到表名的转换关系如下:
Message -> message [单个单词转换为小写]
ProposalCategory -> proposal_category [多个单词转换为小写并使用下划线分割每一个单词]
也可以通过{%raw%} tablename{%endraw%} 属性来自定义类所对应数据库中表的表名。
class Campus(Base):
__tablename__ = 'haha'
id = Column(Integer, autoincrement=True, primary_key=True)
name = Column(String(24), nullable=False)
address = Column(String(254), nullable=True)
常用字段类型
- Integer 整数
- String 字符串,指定length来设置所存放字符的最大长度
- Text 较长的Unicode文本
- Date 日期,存储Python的datetime.date对象
- Time 时间,存储Python的datetime.time对象
- DateTime 日期时间,存储Python的datetime对象
- Interval 时间间隔,存储Python的datetime.timedelta
- Float 浮点数
- Boolean 布尔值
- PickleType 存储Pickle序列化的Python对象
- LargeBinary 存储任意二进制文件
常用字段参数
下面为实例化字段类时常用的字段参数:
- primary_key : 布尔类型,如果设为True则该字段为主键
- unique : 布尔类型,如果设为True则该字段不允许出现重复值
- index : 布尔类型,如果设为True则为该字字段创建索引,以提高查询效率
- nullable : 布尔类型,确定该字段是否可为空,默认为True
- default : 为当前字段设置一个默认值
类属性名到数据库表字段名生成规则
字段名默认为类属性名,也可以通过字段类构造方法的第一个参数指定,或者使用关键字name。
通过字段类构造方法第一个参数指定:
name = Column('name', String(24), nullable=False)
通过使用关关键字name指定:
name = Column(String(24), nullable=False, name='new_name')
创建表
可以通过调用db.create_all()来把已经定义好的模型类生成数据库中的表。
需要注意有一下几点:
-
如果模型类是单独定义在别的模块中的,在调用db.create_all()前要导入相应的模块。确保其被加载到了内存。
-
执行db.create_all()需要上下文环境,可以通过下面的方式创建一个上下文环境:
with app.app_context():
db.create_all()
- 使用db.create_all()只会创建和模型类相对应的表,而不会创建数据库。因此在调用此方法前需要确保要操作的数据库已经被创建。
数据库和表一旦被创建模型类的改动就不会自动作用到实际的表中。比如在模型类中添加或删除了一个字段,修改字段的名称和类型。这时再次调用create_all()也不会更新表结构。如果想要数据库中的表结构能够生效则需要先调用db.drop_all()删除数据库中的所有表,再调用db.create_all()才会生效。
在开发时,以删除表再重建的方式更新数据库简单直接,但是明显的缺陷就会丢掉已的数据。在生产环境中肯定是不以这样的。我们可以通过flask-migrate来帮我们实现数据库的迁移,数据库迁移工具可以在不破坏数据的情况下更新数据库表结构。
flask-migrate
扩展flask-migrate集成了Alembic(和SQLAlchemy是同一个作者)它提供了一些flask命令来简化我们的工作。
将flask-migrate集成到我们的项目非常简单:
migrate = Migrate(app, db)
实例化Migrate类并传递两个参数即可。
- 参数1: Flask实例对象
- 参数2: SQLAlchemy实例对象
配置好之后就可以通过flask命令还来执行数据库迁移工作。在使用命令前要确保FLASK_APP配置正确,否则命令无法执行。
数据库迁移命令
- 创建迁移环境 : flask db init
- 生成迁移脚本 : flask db migrate -m "add column remark"
- 更新数据库 : flask db upgrade
- 回退到上一个数据库版本 : flask db downgrade
简单数据库操作
下面关于简单数据库操作使用Campus类进行演示:
class Campus(Base):
id = Column(Integer, autoincrement=True, primary_key=True)
# 校区名
name = Column(String(24), nullable=False)
# 校区地址
address = Column(String(254), nullable=True)
查询操作
一个完整的查询应该遵循下面的模式:
<模型类>.query.<过滤方法>.<查询方法>
或者:
query(<模型类>).<过滤方法>.<查询方法>
all() 查询所有的地址列表
list = Campus.query.all()
or
list = db.session.query(Campus).query.all()
first() 查询地址id为1的行
campus = Campus.query.filter(Campus.id == 1).first()
count() 查询记录总数
count = Campus.query.count()
常用查询方法:
- all() 返回包含所有查询记录的列表
- first() 返回查询的第一条记录,如果未找到返回None
- first_or_404() 返回查询的第一条记录,如果未找到则返回404响应给客户端
- one() 返回查询到的第一条记录,且整个查询到的记录数只允许为1,否则抛出错误
- one_or_none() 返回查询到的第一条记录,如果查询到的记录数不为1,则返回None
- get(ident) 传入主键作为参数,返回指定主键所对应的记录,如果未找到则返回None
- get_or_404(ident) 传入主键作为参数,返回指定主键所对应的记录,如果未找到返回一个404响应给客户端
- count() 返回查询结果的数量
- paginate() 返回一个paginate对象,可以对记录进行分页处理
常用的过滤方法
- filter() 使用指定的规则过滤,返回新的查询对象
- filter_by() 使用指定的规则过滤(以关键字表达式的形式),返回新产生的查询对象
- order_by() 根据指定的规则对查询到的结果进行排序,返回新产生的查询对象
list = Campus.query.order_by(Campus.id.desc()).all()
- limit(limit) 使用指定的值限制原查询记录返回的数量,返回新产生的查询对象
- group_by() 根据指定条件对记录进行分组,返回新产生的查询对象
- offset(offset) 使用指定的值偏移原查询结果,返回新产生的查询对象
聚合操作
需要从: from sqlalchemy.sql import func 引入
- func.max() 根据参数指定的列取所有记录中最大的那个
max_id = db.session.query(func.max(Campus.id)).first()
- func.sum() 对指定的列执行累加操作
sum_id = db.session.query(func.sum(Campus.id)).first()
- func.min() 根据参数指定的列取所有记录中最小的那个
min_id = db.session.query(func.min(Campus.id)).first()
filter()常用的过滤操作:
- == :
filter(Campus.id == 1)
- != :
filter(Campus.id != 1)
- LIKE :
filter(Campus.address.like('%汊河%'))
- IN :
filter(Campus.name.in_(['mingjia', 'zhihu']))
- NOT IN :
filter(~Campus.name.in_(['mingjia', 'zhihu']))
- AND :
# 1.使用and_
filter(and_(Campus.id == 1, Campus.name == 'zhihu'))
#2. 直接在filter中添加多个表达式使用逗号分隔
filter(Campus.id == 1, Campus.name == 'zhihu')
#3. 叠加使用对个filter或filter_by方法
filter(Campus.id == 1).filter(Campus.name == 'zhihu')
- OR :
filter(or_(Campus.id == 1, Campus.name == 'mingjia' ))
filter与filter_by的区别
通过filter_by可以直接使用 关键字表达式 来指定过滤规则,而filter只需要通多传递参数的形式来指定过滤规则。
使用filter和filter_by实现同样的查询功能:
filter:
campus = Campus.query.filter(Campus.id == 1).first()
filter_by:
campus = Campus.query.filter_by(id = 1).first()
更新操作
直接赋值给模型类的字段属性就可以完成更新的操作
campus = Campus.query.get(1)
campus.name = '四望亭校区'
db.session.commit()
注意: 只有要插入新的记录或者将现有的记录添加到会话中才需要使用add,单纯的更新记录时只需要为属性赋新值,并提交修改。
删除操作
campus = Campus.query.filter_by(id > 2).delete()
db.session.commit()
高级数据库操作
执行原生SQL
# 查询
cursor = db.session.execute('select *from student')
print(cursor.fetchall())
# 插入
db.session.execute('insert into student(name) values(:value)', params={'value': 'wangk1e'})
db.session.commit()
join查询
data = db.session.query(Student).join(Guardian, isouter = True).all()
由于Student模型和Guardian模型之间存在主外键关系,因此可以不用指定任何条件就可以实现连接查询。
join中的isouter参数用于指定两张连接时是采用外连接还是内连接。
也可以手动指定两个模型之间的关联条件:
data = db.session.query(Student).join(Guardian, Student.guardian_id == Guardian.id, isouter = False).all()
多个模型关联查询
query = db.session.query(Student).join(Student.guardian, isouter=True) \
.join(Student.local_school, isouter=True) \
.join(Student.course, isouter=True) \
.join(Student.category, isouter=True) \
.order_by(sort)
内连接和外链接的区别
-
内连接: 从左表中取出一条记录去和右表中的所有记录根据条件进行匹配,只有匹配得上才会保留内容。也就是两张表根据某个条件取交集。
-
外连接: 从左表中取出一条记录去和右表中的所有记录根据条件进行匹配,无论是否匹配得上都会保留内容。不能匹配的其他表中的内容为null。
示例1
q = session.query(Address).select_from(User).\
join(User.addresses).\
filter(User.name == 'ed')
Which will produce SQL similar to::
SELECT address.* FROM user
JOIN address ON user.id=address.user_id
WHERE user.name = :name_1
示例2
address_subq = session.query(Address).\
filter(Address.email_address == 'ed@foo.com').\
subquery()
q = session.query(User).join(address_subq, User.addresses)
Producing SQL similar to::
SELECT user.* FROM user
JOIN (
SELECT address.id AS id,
address.user_id AS user_id,
address.email_address AS email_address
FROM address
WHERE address.email_address = :email_address_1
) AS anon_1 ON user.id = anon_1.user_id
示例3
a_alias = aliased(Address)
q = session.query(User).\
join(User.addresses).\
join(a_alias, User.addresses).\
filter(Address.email_address=='ed@foo.com').\
filter(a_alias.email_address=='ed@bar.com')
Where above, the generated SQL would be similar to::
SELECT user.* FROM user
JOIN address ON user.id = address.user_id
JOIN address AS address_1 ON user.id=address_1.user_id
WHERE address.email_address = :email_address_1
AND address_1.email_address = :email_address_2