首页 > 代码库 > sqlalchemy操作

sqlalchemy操作

Sqlalchemy ORM操作归类

#简单查询

#注意User是一个类对象,user_table是数据库中的表

#session = sessionmaker() #创建了一个自定义了的 Session类

 

1.      session.query(User).all()

session.query(User).first()# 记录不存在时,first()会返回 None

session.query(User).one()#用于获取所有元素,如果没有获得结果或者返回了多个结果,则会产生一个 error

session.query(User).count()#计数

session.query(User).scalar()#返回列表和标量

2.      session.query(User).limit(2).all()#最多返回两条数据

session.query(User).offset(3).all()#从第4条数据开始

session.query(User).order_by(User.name).all()#排序

session.query(User).order_by(‘name‘).all()

session.query(User).order_by(User.name.desc()).all()

session.query(User).order_by(‘name desc‘).all()

3.      filter/ filter_by过滤

equals:

session.query(User).filter(User.name == ‘ed‘)

not equals:

session.query(User).filter(User.name != ‘ed‘)

LIKE:

session.query(User).filter(User.name.like(‘%ed%‘))

IN:

session.query(User).filter(User.name.in_([‘ed‘, ‘wendy‘, ‘jack‘]))

# 可以和 query 对象协同工作:

session.query(User).filter(User.name.in_(session.query(User.name).filter(User.name.like(‘%ed%‘))))

NOT IN:

session.query(User).filter(~User.name.in_([‘ed‘, ‘wendy‘, ‘jack‘]))

IS NULL:

session.query(User).filter(User.name == None)

IS NOT NULL:

filter(User.name != None)

AND:

from sqlalchemy import and_

session.query(User).filter(and_(User.name == ‘ed‘, User.fullname ==‘Ed Jones‘))

 

# 或者连续调用 filter()/filter_by()两次

session.query(User).filter(User.name == ‘ed‘).filter(User.fullname== ‘Ed Jones‘)

OR:

from sqlalchemy import or_

session.query(User).filter(or_(User.name == ‘ed‘, User.name ==‘wendy‘))

match:

session.query(User).filter(User.name.match(‘wendy‘))

match 的参数内容由数据库后台指定。

4.      聚合查询

session.query(func.count(‘*‘)).select_from(User).scalar()

session.query(func.count(‘1‘)).select_from(User).scalar()

session.query(func.count(User.id)).scalar()

session.query(func.count(‘*‘)).filter(User.id> 0).scalar() # filter() 中包含 User,因此不需要指定表

session.query(func.count(‘*‘)).filter(User.name== ‘a‘).limit(1).scalar() == 1 # 可以用 limit() 限制 count() 的返回数

session.query(func.sum(User.id)).scalar()

session.query(func.now()).scalar()# func 后可以跟任意函数名,只要该数据库支持

session.query(func.current_timestamp()).scalar()

session.query(func.md5(User.name)).filter(User.id== 1).scalar()

5.      更多查询操作

#多条件查询

print(session.query(User).filter(and_(User.name.like("user),User.fullname.like("first%"))).all()

---相当于SQL语句--->

select * from user_table where user_table.name like %user anduser_table.fullname like first%

 

print(session.query(User).filter(or_(User.name.like("user%),User.password!= None)).all()

---相当于SQL语句--->

select * from user_table where user_table.name = user% oruser_table.password != none

 

#sql过滤

print(session(User).filter("id>:id").params(id=1).all()

---相当于SQL语句--->

select * from user_table where user_table.id > 1

 

#关联查询

print(session.query(User,Address).filter(User.id ==Address.user_id).all()

---相当于SQL语句--->

select * from user_table,address_table where user_table.id ==address.user_id

 

print (session.query(User).jion(User.address).all()

---相当于SQL语句--->

********************

print (session.query(User).outerjoin(User.address).all())

*******************

 

#子查询

stmt = session.query(Address.user_id,func.count(‘*‘).label("address_count").groupby(Address.user_id).subquery()

print(session.query(User,stmt.c.address_count).outjion((stmt,User.id ==stmt.c.user_id.order_by(User_id).all()

 

#exits

print (session.query(User).filter(exists().where(Address.user_id ==User.id)))

print (session.query(User).filter(User.addresses.any()))

 

其他《增删改》

query.filter(User.id == 1).update({User.name: ‘c‘})
user = query.get(1)
print user.name

user.name = ‘d‘
session.flush() # 写数据库,但并不提交
print query.get(1).name

session.delete(user)
session.flush()
print query.get(1)

session.rollback()
print query.get(1).name
query.filter(User.id == 1).delete()
session.commit()
print query.get(1)

 

sqlalchemy操作