Skip to content

Commit

Permalink
feat: BI-5955 schema migration factory (#743)
Browse files Browse the repository at this point in the history
* feat: BI-5955 schema migration factory
  • Loading branch information
ForrestGump authored Jan 14, 2025
1 parent b5035a2 commit 230db12
Show file tree
Hide file tree
Showing 11 changed files with 103 additions and 4 deletions.
5 changes: 5 additions & 0 deletions lib/dl_core/dl_core/connectors/base/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
DefaultConnectionLifecycleManager,
)
from dl_core.connectors.base.query_compiler import QueryCompiler
from dl_core.connectors.base.schema_migration import (
ConnectionSchemaMigration,
DefaultConnectionSchemaMigration,
)
from dl_core.connectors.settings.primitives import ConnectorSettingsDefinition
from dl_core.data_source.base import DataSource
from dl_core.data_source_spec.base import DataSourceSpec
Expand Down Expand Up @@ -65,6 +69,7 @@ class CoreConnectionDefinition(abc.ABC):
sync_conn_executor_cls: ClassVar[Optional[Type[ConnExecutorBase]]] = None
async_conn_executor_cls: ClassVar[Optional[Type[AsyncConnExecutorBase]]] = None
lifecycle_manager_cls: ClassVar[Type[ConnectionLifecycleManager]] = DefaultConnectionLifecycleManager
schema_migration_cls: ClassVar[Type[ConnectionSchemaMigration]] = DefaultConnectionSchemaMigration
dialect_string: ClassVar[str]
data_source_migrator_cls: ClassVar[Type[DataSourceMigrator]] = DefaultDataSourceMigrator
settings_definition: ClassVar[Optional[Type[ConnectorSettingsDefinition]]] = None
Expand Down
1 change: 1 addition & 0 deletions lib/dl_core/dl_core/connectors/base/registrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ def register_connection_definition(
allow_ct_override=True,
conn_type=conn_def.conn_type,
lifecycle_manager_cls=conn_def.lifecycle_manager_cls,
schema_migration_cls=conn_def.schema_migration_cls,
)
register_connection_backend_type(conn_type=conn_def.conn_type, backend_type=backend_type)
register_connection_schema(conn_cls=conn_def.connection_cls, schema_cls=conn_def.us_storage_schema_cls) # type: ignore # 2024-01-30 # TODO: Argument "schema_cls" to "register_connection_schema" has incompatible type "type[Schema] | None"; expected "type[Schema]" [arg-type]
Expand Down
16 changes: 16 additions & 0 deletions lib/dl_core/dl_core/connectors/base/schema_migration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from dl_core.us_manager.schema_migration.base import (
BaseEntrySchemaMigration,
Migration,
)


class ConnectionSchemaMigration(BaseEntrySchemaMigration):
    """Base class for connection schema migrations; declares nothing beyond ``BaseEntrySchemaMigration``."""


class DefaultConnectionSchemaMigration(ConnectionSchemaMigration):
    """Connection schema migration that adds no migrations of its own."""

    @property
    def migrations(self) -> list[Migration]:
        """Return a new list containing exactly the migrations provided by the parent class."""
        inherited = super().migrations
        return [*inherited]
14 changes: 14 additions & 0 deletions lib/dl_core/dl_core/us_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
ConnectionLifecycleManager,
DefaultConnectionLifecycleManager,
)
from dl_core.connectors.base.schema_migration import (
ConnectionSchemaMigration,
DefaultConnectionSchemaMigration,
)
from dl_core.us_connection_base import (
ConnectionBase,
UnknownConnection,
Expand All @@ -19,6 +23,9 @@
# Lifecycle manager class per connection type, keyed by the ``ConnectionType`` member itself.
CONNECTION_LIFECYCLE_MGR_CLASSES: dict[ConnectionType, Type[ConnectionLifecycleManager]] = {
ConnectionType.unknown: DefaultConnectionLifecycleManager,
}
# Schema migration class per connection type, keyed by the connection type *name* (a ``str``),
# unlike the lifecycle manager registry above, which is keyed by the enum member.
# NOTE(review): "SHEMA" looks like a misspelling of "SCHEMA"; the name is also referenced by
# ``get_schema_migration_cls`` and ``register_connection_class``, so a rename must update all of them.
CONNECTION_SHEMA_MIGRATION_CLASSES: dict[str, Type[ConnectionSchemaMigration]] = {
ConnectionType.unknown.name: DefaultConnectionSchemaMigration,
}


def get_connection_class(conn_type: ConnectionType) -> Type[ConnectionBase]:
Expand All @@ -31,10 +38,16 @@ def get_lifecycle_manager_cls(conn_type: ConnectionType) -> Type[ConnectionLifec
return CONNECTION_LIFECYCLE_MGR_CLASSES[conn_type]


def get_schema_migration_cls(conn_type_name: str) -> Type[ConnectionSchemaMigration]:
    """
    Return the schema migration class registered for the given connection type name.

    :param conn_type_name: connection type name used as the registry key
    :return: the schema migration class stored under that name
    :raises KeyError: if nothing is registered under ``conn_type_name``
    """
    registered_cls = CONNECTION_SHEMA_MIGRATION_CLASSES[conn_type_name]
    return registered_cls


def register_connection_class(
new_conn_cls: Type[ConnectionBase],
conn_type: ConnectionType,
lifecycle_manager_cls: Type[ConnectionLifecycleManager] = DefaultConnectionLifecycleManager,
schema_migration_cls: Type[ConnectionSchemaMigration] = DefaultConnectionSchemaMigration,
allow_ct_override: bool = False,
) -> None:
if conn_type is None:
Expand All @@ -49,3 +62,4 @@ def register_connection_class(

CONNECTION_TYPES[conn_type] = new_conn_cls
CONNECTION_LIFECYCLE_MGR_CLASSES[conn_type] = lifecycle_manager_cls
CONNECTION_SHEMA_MIGRATION_CLASSES[conn_type.name] = schema_migration_cls
4 changes: 0 additions & 4 deletions lib/dl_core/dl_core/us_manager/schema_migration/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@
import attr
from typing_extensions import Self

from dl_api_commons.base_models import RequestContextInfo
from dl_app_tools.profiling_base import generic_profiler
from dl_core.services_registry import ServicesRegistry


LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -42,8 +40,6 @@ def migrate(self, entry: dict) -> dict:

@attr.s
class BaseEntrySchemaMigration:
bi_context: RequestContextInfo | None = attr.ib(default=None)
services_registry: ServicesRegistry | None = attr.ib(default=None)
strict_migration: bool = attr.ib(default=False)

@property
Expand Down
12 changes: 12 additions & 0 deletions lib/dl_core/dl_core/us_manager/schema_migration/dataset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from dl_core.us_manager.schema_migration.base import (
BaseEntrySchemaMigration,
Migration,
)


class DatasetSchemaMigration(BaseEntrySchemaMigration):
    """Schema migration for dataset entries; currently adds no migrations of its own."""

    @property
    def migrations(self) -> list[Migration]:
        """Return a new list containing exactly the migrations provided by the parent class."""
        inherited = super().migrations
        return [*inherited]
28 changes: 28 additions & 0 deletions lib/dl_core/dl_core/us_manager/schema_migration/factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from dl_core.us_connection import get_schema_migration_cls
from dl_core.us_manager.schema_migration.base import BaseEntrySchemaMigration
from dl_core.us_manager.schema_migration.dataset import DatasetSchemaMigration
from dl_core.us_manager.schema_migration.factory_base import EntrySchemaMigrationFactoryBase


class DummyEntrySchemaMigrationFactory(EntrySchemaMigrationFactoryBase):
    """Factory that ignores its arguments and always produces a plain ``BaseEntrySchemaMigration``."""

    def get_schema_migration(
        self,
        entry_scope: str,
        entry_type: str,
    ) -> BaseEntrySchemaMigration:
        """Return a new ``BaseEntrySchemaMigration`` regardless of ``entry_scope`` and ``entry_type``."""
        migration = BaseEntrySchemaMigration()
        return migration


class DefaultEntrySchemaMigrationFactory(EntrySchemaMigrationFactoryBase):
    """Factory that selects a schema migration by entry scope and, for connections, by entry type."""

    def get_schema_migration(
        self,
        entry_scope: str,
        entry_type: str,
    ) -> BaseEntrySchemaMigration:
        """
        Build the schema migration matching the given entry.

        ``"dataset"`` scope yields a ``DatasetSchemaMigration``; ``"connection"`` scope yields an instance
        of the class that ``get_schema_migration_cls`` returns for ``entry_type``; any other scope yields
        a plain ``BaseEntrySchemaMigration``.
        """
        if entry_scope == "dataset":
            return DatasetSchemaMigration()
        if entry_scope != "connection":
            return BaseEntrySchemaMigration()
        # Connection migrations are resolved per connection type name.
        connection_migration_cls = get_schema_migration_cls(conn_type_name=entry_type)
        return connection_migration_cls()
13 changes: 13 additions & 0 deletions lib/dl_core/dl_core/us_manager/schema_migration/factory_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import abc

from dl_core.us_manager.schema_migration.base import BaseEntrySchemaMigration


class EntrySchemaMigrationFactoryBase(abc.ABC):
    """Abstract interface for factories that produce schema migrations for entries."""

    @abc.abstractmethod
    def get_schema_migration(
        self,
        entry_scope: str,
        entry_type: str,
    ) -> BaseEntrySchemaMigration:
        """Return the schema migration to use for an entry with the given scope and type."""
8 changes: 8 additions & 0 deletions lib/dl_core/dl_core/us_manager/us_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@
)
from dl_core.us_manager.crypto.main import CryptoController
from dl_core.us_manager.local_cache import USEntryBuffer
from dl_core.us_manager.schema_migration.factory import DefaultEntrySchemaMigrationFactory
from dl_core.us_manager.schema_migration.factory_base import EntrySchemaMigrationFactoryBase
from dl_core.us_manager.storage_schemas.connection_schema_registry import MAP_TYPE_TO_SCHEMA_MAP_TYPE_TO_SCHEMA
from dl_core.us_manager.storage_schemas.dataset import DatasetStorageSchema
from dl_core.us_manager.us_entry_serializer import (
Expand Down Expand Up @@ -100,6 +102,7 @@ class USManagerBase:
_us_client: UStorageClientBase
_fake_us_client: FakeUSClient
_lifecycle_manager_factory: EntryLifecycleManagerFactoryBase
_schema_migration_factory: EntrySchemaMigrationFactoryBase

def __init__(
self,
Expand All @@ -110,6 +113,7 @@ def __init__(
us_auth_context: USAuthContextBase,
services_registry: ServicesRegistry,
lifecycle_manager_factory: Optional[EntryLifecycleManagerFactoryBase] = None,
schema_migration_factory: Optional[EntrySchemaMigrationFactoryBase] = None,
):
# TODO FIX: Try to connect it together to eliminate possible divergence
if services_registry is not None:
Expand Down Expand Up @@ -144,6 +148,10 @@ def __init__(
assert lifecycle_manager_factory is not None
self._lifecycle_manager_factory = lifecycle_manager_factory

schema_migration_factory = schema_migration_factory or DefaultEntrySchemaMigrationFactory()
assert schema_migration_factory is not None
self._schema_migration_factory = schema_migration_factory

def get_entry_buffer(self) -> USEntryBuffer:
return self._loaded_entries

Expand Down
3 changes: 3 additions & 0 deletions lib/dl_core/dl_core/us_manager/us_manager_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
BrokenUSLink,
BrokenUSLinkErrorKind,
)
from dl_core.us_manager.schema_migration.factory_base import EntrySchemaMigrationFactoryBase
from dl_core.us_manager.us_manager import USManagerBase
from dl_utils.aio import shield_wait_for_complete

Expand Down Expand Up @@ -60,6 +61,7 @@ def __init__(
crypto_keys_config: Optional[CryptoKeysConfig] = None,
us_api_prefix: Optional[str] = None,
lifecycle_manager_factory: Optional[EntryLifecycleManagerFactoryBase] = None,
schema_migration_factory: Optional[EntrySchemaMigrationFactoryBase] = None,
):
self._us_client = UStorageClientAIO(
host=us_base_url,
Expand All @@ -80,6 +82,7 @@ def __init__(
us_auth_context=us_auth_context,
services_registry=services_registry,
lifecycle_manager_factory=lifecycle_manager_factory,
schema_migration_factory=schema_migration_factory,
)

@property
Expand Down
3 changes: 3 additions & 0 deletions lib/dl_core/dl_core/us_manager/us_manager_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
BrokenUSLink,
BrokenUSLinkErrorKind,
)
from dl_core.us_manager.schema_migration.factory_base import EntrySchemaMigrationFactoryBase
from dl_core.us_manager.us_manager import USManagerBase
from dl_utils.aio import await_sync

Expand Down Expand Up @@ -64,6 +65,7 @@ def __init__(
# caches_redis: Optional[aioredis.Redis] = None,
request_timeout_sec: int = 30, # WARNING: unused.
lifecycle_manager_factory: Optional[EntryLifecycleManagerFactoryBase] = None,
schema_migration_factory: Optional[EntrySchemaMigrationFactoryBase] = None,
):
super().__init__(
bi_context=bi_context,
Expand All @@ -73,6 +75,7 @@ def __init__(
us_auth_context=us_auth_context,
services_registry=services_registry,
lifecycle_manager_factory=lifecycle_manager_factory,
schema_migration_factory=schema_migration_factory,
)
self._us_client = self._create_us_client()

Expand Down

0 comments on commit 230db12

Please sign in to comment.