"""
Metadata bases specific to each session (subclasses represent tables affected by the
same session -- i.e. base.metadata.create_all()/drop_all()/upgrade()).
Each class used as part of a session needs to be initialized directly.
"""
__author__ = "Elisha Yadgaran"
import logging
from sqlalchemy import DDL, MetaData, event
from .base_sqlalchemy import BaseSQLAlchemy
# Module-level logger, named after this module
LOGGER = logging.getLogger(__name__)
class SimplemlCoreSqlalchemy(BaseSQLAlchemy):
    """
    Shared metadata for all tables that live in the main schema
    """

    # Uses main (public) schema
    # NOTE(review): no class attributes are visible after this comment in this
    # view -- confirm the metadata assignment it describes is present.
class BinaryStorageSqlalchemy(BaseSQLAlchemy):
    """
    Shared metadata for all tables that live in the binary storage schema
    """

    # Store binary data in its own schema
    # NOTE(review): `metadata` (used by the decorator below) has to be assigned
    # in this class body before the decorator is evaluated; that assignment is
    # not visible in this view -- confirm it is present.
    @event.listens_for(metadata, "before_create", propagate=True)
    def _receive_before_create(target, connection, **kwargs):
        """
        Listen for ``before_create`` and create the binary storage schema if
        it does not exist yet

        Registered as an event listener, so it is invoked with the event
        arguments and takes no ``self``.

        :param target: object the event fired for; its ``schema`` attribute
            names the schema to create
        :param connection: connection the DDL statement is executed on
        :param kwargs: additional event keyword arguments (unused)
        :raises NotImplementedError: for any dialect other than postgresql
        """
        # SQLite supports schemas as "Attached Databases"
        # https://www.sqlite.org/lang_attach.html
        if connection.dialect.name == "postgresql":
            LOGGER.debug("Issuing create schema if not exists")
            # Execute through the connection: DDL(...).execute(connection) is
            # legacy as of SQLAlchemy 1.4 and no longer available in 2.0
            connection.execute(
                DDL("""CREATE SCHEMA IF NOT EXISTS "{}";""".format(target.schema))
            )
        # elif connection.dialect.name == 'sqlite':
        #     raise NotImplementedError('SQLite does not support multiple schemas right now')
        #     TODO: Figure out a mechanism to pass in a dynamic schema so testing
        #     doesnt mess with existing ones
        #     DDL('''ATTACH DATABASE "{}";'''.format(target.schema))
        else:
            raise NotImplementedError(
                "Schemas not supported on {dialect}".format(
                    dialect=connection.dialect.name
                )
            )
class DatasetStorageSqlalchemy(BaseSQLAlchemy):
    """
    Shared metadata for all tables that live in the dataset storage schema
    """

    # Use different schemas/databases for storage optionality (dataframes are big in size)
    # NOTE(review): `metadata` (used by the decorator below) has to be assigned
    # in this class body before the decorator is evaluated; that assignment is
    # not visible in this view -- confirm it is present.
    @event.listens_for(metadata, "before_create", propagate=True)
    def _receive_before_create(target, connection, **kwargs):
        """
        Listen for ``before_create`` and create the dataset storage schema if
        it does not exist yet

        Registered as an event listener, so it is invoked with the event
        arguments and takes no ``self``.

        :param target: object the event fired for; its ``schema`` attribute
            names the schema to create
        :param connection: connection the DDL statement is executed on
        :param kwargs: additional event keyword arguments (unused)
        :raises NotImplementedError: for any dialect other than postgresql
        """
        # SQLite supports schemas as "Attached Databases"
        # https://www.sqlite.org/lang_attach.html
        if connection.dialect.name == "postgresql":
            LOGGER.debug("Issuing create schema if not exists")
            # Execute through the connection: DDL(...).execute(connection) is
            # legacy as of SQLAlchemy 1.4 and no longer available in 2.0
            connection.execute(
                DDL("""CREATE SCHEMA IF NOT EXISTS "{}";""".format(target.schema))
            )
        # elif connection.dialect.name == 'sqlite':
        #     raise NotImplementedError('SQLite does not support multiple schemas right now')
        #     TODO: Figure out a mechanism to pass in a dynamic schema so testing
        #     doesnt mess with existing ones
        #     DDL('''ATTACH DATABASE "{}";'''.format(target.schema))
        else:
            raise NotImplementedError(
                "Schemas not supported on {dialect}".format(
                    dialect=connection.dialect.name
                )
            )