SQLAlchemy 动态切换数据源

原创
2019/04/24 19:29
阅读数 2.2K
from sqlalchemy import Column, String, create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
from sqlalchemy.ext.declarative import declarative_base

# 创建对象的基类:
from werkzeug.local import LocalProxy

Base = declarative_base()


# 定义User对象:
class User(Base):
    # 表的名字:
    __tablename__ = 'user'

    # 表的结构:
    id = Column(String(20), primary_key=True)
    name = Column(String(20))


# # 初始化数据库连接:
# engine = create_engine('sqlite:///test1.db')
# # 创建DBSession类型:
# DBSession = sessionmaker(bind=engine)
db_flag = 1


class SessionManager(object):
    def __init__(self, base_uri=None, **kwargs):
        self.session = None
        self.base_uri = base_uri
        self.kwargs = kwargs

    def get_session(self):
        if db_flag == 1:
            db_name = 'db1.db'
        else:
            db_name = 'db2.db'

        if self.session:
            if self.session.name == db_name:
                return self.session
            else:
                self.session.remove()
                self.session = None

        if not self.session:
            engine = create_engine(self.base_uri % db_name, **self.kwargs)
            Base.metadata.create_all(engine)
            db_session = scoped_session(sessionmaker(bind=engine))
            db_session.name = db_name
            self.session = db_session
        return db_session


session_manager = SessionManager(base_uri='sqlite:///%s', pool_recycle=3600)
db_session = LocalProxy(session_manager.get_session)


def save_user():
    # 创建session对象:
    # session = DBSession()
    # # 创建新User对象:
    # new_user = User(id='5', name='Bob')
    # # 添加到session:
    # session.add(new_user)
    # # 提交即保存到数据库:
    # session.commit()
    # # 关闭session:
    # session.close()
    new_user = User(id='1', name='Zhanghai')
    db_session.add(new_user)
    db_session.commit()
    db_session.close()


# def get_user():
#     session = DBSession()
#     one = session.query(User).filter(User.id == 5).one()
#     print(one.name)
#

def get_user():
    one = db_session.query(User).filter(User.id == 1).one()
    print(one.name)


if __name__ == '__main__':
    # save_user()
    get_user()

 

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部