假设这样一个业务场景,知道一个邮箱地址,要查询这个地址所属的用户,第一个办法是用连接多个 filter()
来查询。
for u, a in session.query(User, Address).\ filter(User.id==Address.user_id).\ filter(Address.email_address=='jack@google.com').\ all(): print(u) print(a) # 执行结果jackjack@google.com
更简便的方法是使用 join()
方法:
u = session.query(User).join(Address).\ filter(Address.email_address=='jack@google.com').\ one() print(u)# 执行结果jack
Query.join()
知道如何在 User
和 Address
之间进行连接,因为我们设定了外键。假如我们没有指定外键,比如这样:
class User(Base): __tablename__ = 'users' id = Column(Integer, primary_key=True) name = Column(String(50)) fullname = Column(String(50)) password = Column(String(12))class Address(Base): __tablename__ = 'addresses' id = Column(Integer, primary_key=True) email_address = Column(String, nullable=False) user_id = Column(Integer)
我们可以用下面方法来让 join
生效:
query.join(Address, User.id==Address.user_id) # explicit conditionquery.join(User.addresses) # specify relationship from left to rightquery.join(Address, User.addresses) # same, with explicit targetquery.join('addresses') # same, using a string
例子:
session.query(User).\ join(Address, User.id==Address.user_id).\ filter(Address.email_address=='jack@google.com').all()
现在需要查询每个用户所拥有的邮箱地址数量,思路是先对 addresses 表按用户 ID 分组,统计各组数量,这样我们得到一张新表;然后用 JOIN 连接新表和 users 两个表,在这里,我们应该使用 LEFT OUTER JOIN,因为使用 INTER JOIN 所得出的新表只包含两表的交集。
from sqlalchemy.sql import func stmt = session.query(Address.user_id, func.count('*').\ label('address_count')).\ group_by(Address.user_id).subquery() for u, count in session.query(User, stmt.c.address_count).\ outerjoin(stmt, User.id==stmt.c.user_id).order_by(User.id): print(u, count)# 执行结果ed Nonewendy Nonemary Nonefred Nonejack 2
如果上面的暂时看不懂,我们先来看看第一个 stmt 的情况。
from sqlalchemy.sql import func stmt = session.query(Address.user_id, func.count('*').\ label('address_count')).\ group_by(Address.user_id).all() for i in stmt: print(i) # 执行结果(5, 2)
可以理解成 group_by()
方法生成了一张新的表,该表有两列,第一列是 user_id ,第二列是该 user_id 所拥有的 addresses 的数量,这个值由 func()
跟着的方法产生,我们可以使用 c()
方法来访问这个值。
from sqlalchemy.sql import func stmt = session.query(Address.user_id, func.count('*').\ label('address_count')).\ group_by(Address.user_id).subquery() q = session.query(User, stmt.c.address_count).\ outerjoin(stmt, User.id==stmt.c.user_id).order_by(User.id).all() for i in q: print(i) # 执行结果(ed, None)(wendy, None)(mary, None)(fred, None)(jack, 2)
如果不用 outerjoin()
而使用 join()
,就等于使用 SQL 中的 INTER JOIN,所得出的表只为两者交集,不会包含 None 值的列。
from sqlalchemy.sql import func stmt = session.query(Address.user_id, func.count('*').\ label('address_count')).\ group_by(Address.user_id).subquery() q = session.query(User, stmt.c.address_count).\ join(stmt, User.id==stmt.c.user_id).order_by(User.id).all() for i in q: print(i)# 执行结果(jack, 2)
SQLAlchemy 使用 aliased()
方法表示别名,当我们需要把同一张表连接多次的时候,常常需要用到别名。
from sqlalchemy.orm import aliased # 把 Address 表分别设置别名adalias1 = aliased(Address)adalias2 = aliased(Address) for username, email1, email2 in \ session.query(User.name, adalias1.email_address, adalias2.email_address).\ join(adalias1, User.addresses).\ join(adalias2, User.addresses).\ filter(adalias1.email_address=='jack@google.com').\ filter(adalias2.email_address=='j25@yahoo.com'): print(username, email1, email2) # 执行结果jack jack@google.com j25@yahoo.com
上述代码查询同时拥有两个名为:"jack@google.com" 和 "j25@yahoo.com" 邮箱地址的用户。
别名也可以在子查询里使用:
from sqlalchemy.orm import aliased stmt = session.query(Address).\ filter(Address.email_address != 'j25@yahoo.com').\ subquery() adalias = aliased(Address, stmt) for user, address in session.query(User, adalias).\ join(adalias, User.addresses): print(user) print(address)# 执行结果jackjack@google.com
EXISTS 关键字可以在某些场景替代 JOIN 的使用。
from sqlalchemy.sql import exists stmt = exists().where(Address.user_id==User.id) for name, in session.query(User.name).filter(stmt): print(name) # 执行结果jack
使用 any()
方法也能得到同意的效果:
for name, in session.query(User.name).\ filter(User.addresses.any()): print(name)
使用 any()
方法时也可加上查询条件:
for name, in session.query(User.name).\ filter(User.addresses.any(Address.email_address.like('%google%'))): print(name)
使用 has()
方法也能起到 JOIN 的作用:
session.query(Address).filter(~Address.user.has(User.name=='jack')).all()
注意:这里的 ~
符号是 “不” 的意思。
1. 等于、不等于
query = session.query(Address)jack = session.query(User).filter(User.name == 'jack').one() # 筛选 user 为 jack 的邮箱query.filter(Address.user == jack) # 筛选 user 不为 jack 的邮箱query.filter(Address.user != jack)
2. 为空、不为空
# 筛选 user 为空的邮箱query.filter(Address.user == None) # 筛选 user 不为空的邮箱query.filter(Address.user != None)
3. 包含
query = session.query(User)address = session.query(Address).filter(Address.id == 1).one() # 筛选包含某地址的用户query.filter(User.addresses.contains(address))
作者:SingleDiego
链接:https://www.jianshu.com/p/bf9f98479026
來源:简书
简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。
Copyright © 2021 .长沙麦涛网络科技有限公司 All rights reserved.
湘ICP备20015126号-2
联系我们