SQLAlchemy 笔记 ORM方式访问数据库

原创
2014/08/29 15:56
阅读数 641

下载安装

看看安装成功了吗

import sqlalchemy
print sqlalchemy.__version__

创建引擎,准备链接数据库

from sqlalchemy import create_engine
engine = create_engine('sqlite:///:memory:', echo=True)


NOTE:

echo=True 代表打印日志和生成的SQL语句,使用的是标准python logging模块,生产环境要设置成False

create_engine()返回一个引擎实例,还并没有链接数据库,当执行查询时才开始链接

几种链接数据库的url写法:



SQLite

相对路径写法
# sqlite://<nohostname>/<path> # where <path> is relative: 
engine = create_engine('sqlite:///foo.db')

绝对路径  4个/
engine = create_engine('sqlite:////absolute/path/to/foo.db')

使用内存数据库 2个/
engine = create_engine('sqlite://')



Postgresql&para;


# default engine = create_engine('postgresql://scott:tiger@localhost/mydatabase')

# psycopg2 engine = create_engine('postgresql+psycopg2://scott:tiger@localhost/mydatabase')

# pg8000 engine = create_engine('postgresql+pg8000://scott:tiger@localhost/mydatabase')

MySQL


# default engine = create_engine('mysql://scott:tiger@localhost/foo')
# mysql-python engine = create_engine('mysql+mysqldb://scott:tiger@localhost/foo')
# MySQL-connector-python engine = create_engine('mysql+mysqlconnector://scott:tiger@localhost/foo')
# OurSQL engine = create_engine('mysql+oursql://scott:tiger@localhost/foo')
解释:


mysql+oursql://<user>:<password>@<host>[:<port>]/<dbname>[?charset=utf-8]


user   用户名

password   密码

host:post   地址端口号

dbname   数据库名字

?charset=utf-8  查询字符集


声明映射:

映射基类 from sqlalchemy.ext.declarative import declarative_base 
 Base = declarative_base()

 

所有的model都继承Base基类

from sqlalchemy import Column, Integer, String   
class User(Base):    
     __tablename__ = 'users'     
    id = Column(Integer, primary_key=True)     
    name = Column(String)      
    fullname = Column(String)     
    password = Column(String)
           
    def __repr__(self):         
        return "<User(name='%s', fullname='%s', password='%s')>" % ( ...  self.name, self.fullname, self.password)


1:使用ORM,一个是继承Base,另一个就是__tablename__  指定数据库表名字
2:需要一个Column  用来当作主键(其实是可以设置没有主键的,不过这里不解释)
3:除了sqlalchemy需要的东西外,这就是一个正常的类,我们可以定义其它的方法,属性啊,就像  def__repr__()

执行创建表,使用我们一开始创建的那个引擎
Base.metadata.create_all(engine) 由于echo=True,你会看到创建表时的SQL语句
CREATE TABLE users (
    id INTEGER NOT NULL, 
    name VARCHAR, 
    fullname VARCHAR, 
    password VARCHAR, 
    PRIMARY KEY (id)
)
2014-08-19 16:24:03,095 INFO sqlalchemy.engine.base.Engine ()
2014-08-19 16:24:03,095 INFO sqlalchemy.engine.base.Engine COMMIT
注意  Column(string())  这个在 SQLite and Postgresql是正确的,但其他的数据库不可以
Column(String(50)) 其他的数据库需要指定长度 
其他字段同理需要注意
另一个注意点   Firebird and Oracle require sequences to generate new primary key identifiers 这两个数据库需要在定义主键的时候需要额外的设置from sqlalchemy import Sequence Column(Integer, Sequence('user_id_seq'), primary_key=True)

创建一个映射实例

ed_user = User(name='ed', fullname='Ed Jones', password='edspassword')

print  ed_user.name

print  ed_user.password

print  ed_user.id

ed
edspassword
None


ed_user.id为什么是None

首先这是一个类,一般一个类会有__init__方法(这个方法在继承base的时候继承过来了),我们在实例化的时候并没有给id 赋值,自然也就是空了。

到这里,其实类只是打算和数据库映射,还没有真正执行那,数据库里是没有任何信息的。


创建会话:

现在才是真正的开始操作数据库了。

The ORM’s “handle” to the database is the Session,这是官网上的,想要用ORM功能处理数据库,得使用session.

from sqlalchemy.orm import sessionmaker  这是一个session工厂

Session = sessionmaker(bind=engine)

session = Session()

When we first set up the application, at the same level as our create_engine() statement, we define a Session class which will serve as a factory for new Session objects:我觉着这几句很重要,建立APP ,创建引擎,获取一个session ,是一气呵成的。

三个问题,何时构造一个session  ,何时提交会话内的数据,何时关闭session?

答案http://docs.sqlalchemy.org/en/rel_0_9/orm/session.html#session-faq-whentocreate


向数据库添加一个新对象

创建一个对象
ed_user = User(name='ed', fullname='Ed Jones', password='edspassword') 加入到会话中,并没有真的插入到数据库,数据库是没有内容的。
session.add(ed_user)

查询。在查询时,会flush一下session,把session中要执行的SQL语句发送到数据库,虽然已经执行SQL语句,
但是只对当前会话有效
经过测试发现这里是开启了事务,直到使用session.commit()显示提交事务才可以。
our_user = session.query(User).filter_by(username='ed').first()

提交事务,修改数据库,再有操作就不属于这个事务里了。
session.commit()


回滚:

session.rollback()

查询

session.query()


session.query(User) 无任何反应。所以可以断定是个懒查询 
session.query(User).all() 所有的属性值,返回是个列表 
session.query(User.username).all() 查询user一个属性 
session.query(User).order_by(User.id) 无反应,只构造了sql语句 
session.query(User).order_by(User.id).all() 这才是真正的执行查询 下面这个因为有for循环,会去真查询数据库,不然就是构造SQL语句过程, 
for name, fullname in session.query(User.name, User.fullname):     
    print name, fullname 查询条件是具体属性值时,外层列表,里层元组

for row in session.query(User, User.name).all():     
    print row.User, row.name    
    print row.User.id, row.name     
    print row



>>> for row in session.query(User.name.label('name_label')).all():     
        print(row.name_label)

SELECT users.name AS name_label
FROM users
()



>>> from sqlalchemy.orm import aliased
>>> user_alias = aliased(User, name='user_alias')
>>> for row in session.query(user_alias, user_alias.name).all(): 
...    print row.user_alias 

SELECT user_alias.id AS user_alias_id,
        user_alias.name AS user_alias_name,
        user_alias.fullname AS user_alias_fullname,
        user_alias.password AS user_alias_password
FROM users AS user_alias
()



 session.query(User).order_by(User.id)[1:3]


 
for name, in session.query(User.name).(fullname='Ed Jones'):     
    print name 
或者: 
for name, in session.query(User.name).(User.fullname=='Ed Jones'):     
    print name 

可以看出来每次的filter返回一个query()对象,所以可以使用针对query()的方法继续查询
for user in session.query(User).filter(User.name=='ed').filter(User.fullname=='Ed Jones'):     
    print user

常见的过滤器操作符:


query.filter(User.name == 'ed')


query.filter(User.name != 'ed')
  • LIKE: 模糊匹配

query.filter(User.name.like('%ed%'))
  • IN: 多指查找

query.filter(User.name.in_(['ed', 'wendy', 'jack']))
# works with query objects too:
query.filter(User.name.in_(
                session.query(User.name).filter(User.name.like('%ed%'))
            )
)
query.filter(~User.name.in_(['ed', 'wendy', 'jack']))

IS NULL:

query.filter(User.name == None)
# alternatively, if pep8/linters are a concern
query.filter(User.name.is_(None))

query.filter(User.name != None)
# alternatively, if pep8/linters are a concern
query.filter(User.name.isnot(None))
# use and_()from sqlalchemy import and_
query.filter(and_(User.name == 'ed', User.fullname == 'Ed Jones'))

# or send multiple expressions to .filter()
query.filter(User.name == 'ed', User.fullname == 'Ed Jones')

# or chain multiple filter()/filter_by() calls
query.filter(User.name == 'ed').filter(User.fullname == 'Ed Jones')
from sqlalchemy import or_query.filter(or_(User.name == 'ed', User.name == 'wendy'))
query.filter(User.name.match('wendy'))

Note

match() 能不能用取决于你的数据库


返回列表和标量:

all()  返回所有是一个列表

>>> query = session.query(User).filter(User.name.like('%ed')).order_by(User.id)
>>> query.all() 
[<User(name='ed', fullname='Ed Jones', password='f8s7ccs')>,
 <User(name='fred', fullname='Fred Flinstone', password='blah')>]

first() 返回结果集的第一个

>>> query.first() 
<User(name='ed', fullname='Ed Jones', password='f8s7ccs')>

one() 如果不返回一个对象身份或复合行存在于结果,就会报错。意思是只能返回一个对象

>>> from sqlalchemy.orm.exc import MultipleResultsFound
>>> try: 
...     user = query.one()
... except MultipleResultsFound, e:
...     print eMultiple rows were found for one()

查找不到也会报错

>>> from sqlalchemy.orm.exc import NoResultFound
>>> try: 
...     user = query.filter(User.id == 99).one()
... except NoResultFound, e:
...     print eNo row was found for one()

scalar()

这个函数会调用one(),返回找到底一个列数,(这个没试验过,猜测是像index一样吧)

>>> query = session.query(User.id).filter(User.name == 'ed').\
...    order_by(User.id)
>>> query.scalar() 
7


字符串SQL

>>> for user in session.query(User).\
...             filter("id<224").\
...             order_by("id").all() 
...     print user.nameedwendymaryfred

字符串sql参数绑定,使用一个冒号

>>> session.query(User).filter("id<:value and name=:name").\
...     params(value=224, name='fred').order_by(User.id).one()

完全基于字符串的sql声明表达式:

>>> session.query(User).from_statement(
...        "SELECT * FROM users where name=:name").\
...        params(name='ed').all()

[<User(name='ed', fullname='Ed Jones', password='f8s7ccs')>]


>>> session.query("id", "name", "thenumber12").\
...         from_statement("SELECT id, name, 12 as "
...                 "thenumber12 FROM users where name=:name").\
...                 params(name='ed').all()

[(1, u'ed', 12)]



计数:

count()###性能差劲,看显示的SQ就知道性能不好

>>> session.query(User).filter(User.name.like('%ed')).count() 
2

SELECT count(*) AS count_1
FROM (SELECT users.id AS users_id,
                users.name AS users_name,
                users.fullname AS users_fullname,
                users.password AS users_password
FROM users
WHERE users.name LIKE ?) AS anon_1
('%ed',)

另一种计数法

>>> from sqlalchemy import func
>>> session.query(func.count(User.name), User.name).group_by(User.name).all() 
 
SELECT count(users.name) AS count_1, users.name AS users_name
FROM users GROUP BY users.name

[(1, u'ed'), (1, u'fred'), (1, u'mary'), (1, u'wendy')]


实现  select(×) from  table_name  

session.query(func.count('*')).select_from(User).scalar()



展开阅读全文
加载中

作者的其它热门文章

打赏
0
0 收藏
分享
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部
返回顶部
顶部