首页 > 代码库 > sqlalchemy操作
sqlalchemy操作
Sqlalchemy ORM操作归类
#简单查询
#注意User是一个类对象,user_table是数据库中的表
#session = sessionmaker() #创建了一个自定义了的 Session类
1. session.query(User).all()
session.query(User).first()# 记录不存在时,first()会返回 None
session.query(User).one()#用于获取所有元素,如果没有获得结果或者返回了多个结果,则会产生一个 error
session.query(User).count()#计数
session.query(User).scalar()#返回列表和标量
2. session.query(User).limit(2).all()#最多返回两条数据
session.query(User).offset(3).all()#从第4条数据开始
session.query(User).order_by(User.name).all()#排序
session.query(User).order_by(‘name‘).all()
session.query(User).order_by(User.name.desc()).all()
session.query(User).order_by(‘name desc‘).all()
3. filter/ filter_by过滤
equals:
session.query(User).filter(User.name == ‘ed‘)
not equals:
session.query(User).filter(User.name != ‘ed‘)
LIKE:
session.query(User).filter(User.name.like(‘%ed%‘))
IN:
session.query(User).filter(User.name.in_([‘ed‘, ‘wendy‘, ‘jack‘]))
# 可以和 query 对象协同工作:
session.query(User).filter(User.name.in_(session.query(User.name).filter(User.name.like(‘%ed%‘))))
NOT IN:
session.query(User).filter(~User.name.in_([‘ed‘, ‘wendy‘, ‘jack‘]))
IS NULL:
session.query(User).filter(User.name == None)
IS NOT NULL:
filter(User.name != None)
AND:
from sqlalchemy import and_
session.query(User).filter(and_(User.name == ‘ed‘, User.fullname ==‘Ed Jones‘))
# 或者连续调用 filter()/filter_by()两次
session.query(User).filter(User.name == ‘ed‘).filter(User.fullname== ‘Ed Jones‘)
OR:
from sqlalchemy import or_
session.query(User).filter(or_(User.name == ‘ed‘, User.name ==‘wendy‘))
match:
session.query(User).filter(User.name.match(‘wendy‘))
match 的参数内容由数据库后台指定。
4. 聚合查询
session.query(func.count(‘*‘)).select_from(User).scalar()
session.query(func.count(‘1‘)).select_from(User).scalar()
session.query(func.count(User.id)).scalar()
session.query(func.count(‘*‘)).filter(User.id> 0).scalar() # filter() 中包含 User,因此不需要指定表
session.query(func.count(‘*‘)).filter(User.name== ‘a‘).limit(1).scalar() == 1 # 可以用 limit() 限制 count() 的返回数
session.query(func.sum(User.id)).scalar()
session.query(func.now()).scalar()# func 后可以跟任意函数名,只要该数据库支持
session.query(func.current_timestamp()).scalar()
session.query(func.md5(User.name)).filter(User.id== 1).scalar()
5. 更多查询操作
#多条件查询
print(session.query(User).filter(and_(User.name.like("user),User.fullname.like("first%"))).all()
---相当于SQL语句--->
select * from user_table where user_table.name like %user anduser_table.fullname like first%
print(session.query(User).filter(or_(User.name.like("user%),User.password!= None)).all()
---相当于SQL语句--->
select * from user_table where user_table.name = user% oruser_table.password != none
#sql过滤
print(session(User).filter("id>:id").params(id=1).all()
---相当于SQL语句--->
select * from user_table where user_table.id > 1
#关联查询
print(session.query(User,Address).filter(User.id ==Address.user_id).all()
---相当于SQL语句--->
select * from user_table,address_table where user_table.id ==address.user_id
print (session.query(User).jion(User.address).all()
---相当于SQL语句--->
********************
print (session.query(User).outerjoin(User.address).all())
*******************
#子查询
stmt = session.query(Address.user_id,func.count(‘*‘).label("address_count").groupby(Address.user_id).subquery()
print(session.query(User,stmt.c.address_count).outjion((stmt,User.id ==stmt.c.user_id.order_by(User_id).all()
#exits
print (session.query(User).filter(exists().where(Address.user_id ==User.id)))
print (session.query(User).filter(User.addresses.any()))
其他《增删改》
query.filter(User.id == 1).update({User.name: ‘c‘})
user = query.get(1)
print user.name
user.name = ‘d‘
session.flush() # 写数据库,但并不提交
print query.get(1).name
session.delete(user)
session.flush()
print query.get(1)
session.rollback()
print query.get(1).name
query.filter(User.id == 1).delete()
session.commit()
print query.get(1)
sqlalchemy操作