首页 > 代码库 > 自动获取数据库表结构

自动获取数据库表结构

Python

将sandman包中的自动获取数据库表结构的部分提取出来

包名auto_get_database

__init__.py

 

from flask import Flask
from flask.ext.sqlalchemy import SQLAlchemy


app = Flask(__name__)
db = SQLAlchemy(app)

 

models.py

 

from decimal import Decimal

from flask import current_app
from flask.ext.admin.contrib.sqla import ModelView

from auto_get_database import app, db


class Model(object):
    __endpoint__ = None

    __tablename__ = None

    __table__ = None

    __from_class__ = None

    @classmethod
    def endpoint(cls):
        endpoint = ‘‘
        if cls.__endpoint__ is not None:
            return cls.__endpoint__
        elif cls.__from_class__ is not None:
            endpoint = cls.__from_class__.__name__.lower()
        else:
            endpoint = cls.__tablename__.lower()
        if not endpoint.endswith(s):
            endpoint += s
        return endpoint


    @classmethod
    def primary_key(cls):
        if cls.__from_class__:
            cls = cls.__from_class__
        return cls.__table__.primary_key.columns.values()[0].name

    def links(self):
        links = []
        for foreign_key in self.__table__.foreign_keys:
            column = foreign_key.column.name
            column_value = getattr(self, column, None)
            if column_value:
                table = foreign_key.column.table.name
                with app.app_context():
                    endpoint = current_app.class_references[table]
                links.append({rel: related, uri: /{}/{}.format(
                    endpoint.__name__, column_value)})
        links.append({rel: self, uri: self.resource_uri()})
        return links

    def as_dict(self, depth=0):
        result_dict = {}
        for column in self.__table__.columns.keys():
            result_dict[column] = getattr(self, column, None)
            if isinstance(result_dict[column], Decimal):
                result_dict[column] = str(result_dict[column])
        result_dict[links] = self.links()
        for foreign_key in self.__table__.foreign_keys:
            column_name = foreign_key.column.name
            column_value = getattr(self, column_name, None)
            if column_value:
                table = foreign_key.column.table.name
                with app.app_context():
                    endpoint = current_app.class_references[table]
                    session = db.session()
                    resource = session.query(endpoint).get(column_value)
                if depth > 0:
                    result_dict.update({
                        rel: endpoint.__name__,
                        endpoint.__name__.lower(): resource.as_dict(depth - 1)
                        })
                else:
                    result_dict[
                        endpoint.__name__.lower() + _url] = /{}/{}.format(
                        endpoint.__name__, column_value)

        result_dict[self] = self.resource_uri()
        return result_dict

    def from_dict(self, dictionary):
        for column in self.__table__.columns.keys():
            value = dictionary.get(column, None)
            if value:
                setattr(self, column, value)

    def replace(self, dictionary):
        for column in self.__table__.columns.keys():
            setattr(self, column, None)
        self.from_dict(dictionary)


    def __str__(self):
        return str(getattr(self, self.primary_key()))


class AdminModelViewWithPK(ModelView):
    column_display_pk = True

 

utils.py

 

"""自动获取数据库表结构"""
import webbrowser
import collections

from sqlalchemy.engine import reflection
from sqlalchemy.ext.declarative import declarative_base, DeferredReflection
from sqlalchemy.orm import relationship
from sqlalchemy.schema import Table

from flask import current_app

from models import Model

from auto_get_database import app, db


def generate_endpoint_classes(db):
    seen_classes = set()
    for cls in current_app.class_references.values():
        seen_classes.add(cls.__tablename__)
    with app.app_context():
        db.metadata.reflect(bind=db.engine)
        for name, table in db.metadata.tables.items():
            if not name in seen_classes:
                seen_classes.add(name)
                cls = type(str(name), (sandman_model, db.Model), {‘__tablename__‘: name})
                register(cls)


def add_pk_if_required(db, table, name):
    db.metadata.reflect(bind=db.engine)
    cls_dict = {‘__tablename__‘: name}
    if not table.primary_key:
        for column in table.columns:
            column.primary_key = True
        Table(name, db.metadata, *table.columns, extend_existing=True)
        cls_dict[‘__table__‘] = table
        db.metadata.create_all(bind=db.engine)

    return type(str(name), (sandman_model, db.Model), cls_dict)


def prepare_relationships(db, known_tables):
    inspector = reflection.Inspector.from_engine(db.engine)
    for cls in set(known_tables.values()):
        for foreign_key in inspector.get_foreign_keys(cls.__tablename__):
            if foreign_key[‘referred_table‘] in known_tables:
                other = known_tables[foreign_key[‘referred_table‘]]
                constrained_column = foreign_key[‘constrained_columns‘]
                if other not in cls.__related_tables__ and cls not in (other.__related_tables__) and other != cls:
                    cls.__related_tables__.add(other)
                    setattr(cls, other.__table__.name, relationship(other.__name__, backref=db.backref(cls.__name__.lower()), foreign_keys=str(cls.__name__) + ‘.‘ + ‘‘.join(constrained_column)))


def register(cls, use_admin=True):
    with app.app_context():
        if getattr(current_app, ‘class_references‘, None) is None:
            current_app.class_references = {}
        if isinstance(cls, (list, tuple)):
            for entry in cls:
                register_internal_data(entry)
                entry.use_admin = use_admin
        else:
            register_internal_data(cls)
            cls.use_admin = use_admin


def register_internal_data(cls):
    with app.app_context():
        if getattr(cls, ‘endpoint‘, None) is None:
            orig_class = cls
            cls = type(‘Sandman‘ + cls.__name__, (cls, Model), {})
            cls.__from_class__ = orig_class
        current_app.class_references[cls.__tablename__] = cls
        current_app.class_references[cls.__name__] = cls
        current_app.class_references[cls.endpoint()] = cls
        if not getattr(cls, ‘__related_tables__‘, None):
            cls.__related_tables__ = set()


def activate(name=‘admin‘):
    with app.app_context():
        app.class_references = collections.OrderedDict()
        generate_endpoint_classes(db)
        prepare_relationships(db, current_app.class_references)


sandman_model = Model
Model = declarative_base(cls=(Model, DeferredReflection))

 

main.py为使用示例

from auto_get_database import app
from utils import activate

from flask import current_app


engine = ‘mssql+pymssql://**:****************@***.***.***.***/*************‘
app.config[‘SQLALCHEMY_DATABASE_URI‘] = engine
activate()

classes = []
with app.app_context():
    classes = set(current_app.class_references.values())

for i in classes:
    print(i.__table__.columns.keys())

 

自动获取数据库表结构