diff --git a/lib/dl_core/dl_core/connectors/base/connector.py b/lib/dl_core/dl_core/connectors/base/connector.py index 8b948bb61..262d94b6a 100644 --- a/lib/dl_core/dl_core/connectors/base/connector.py +++ b/lib/dl_core/dl_core/connectors/base/connector.py @@ -23,6 +23,10 @@ DefaultConnectionLifecycleManager, ) from dl_core.connectors.base.query_compiler import QueryCompiler +from dl_core.connectors.base.schema_migration import ( + ConnectionSchemaMigration, + DefaultConnectionSchemaMigration, +) from dl_core.connectors.settings.primitives import ConnectorSettingsDefinition from dl_core.data_source.base import DataSource from dl_core.data_source_spec.base import DataSourceSpec @@ -65,6 +69,7 @@ class CoreConnectionDefinition(abc.ABC): sync_conn_executor_cls: ClassVar[Optional[Type[ConnExecutorBase]]] = None async_conn_executor_cls: ClassVar[Optional[Type[AsyncConnExecutorBase]]] = None lifecycle_manager_cls: ClassVar[Type[ConnectionLifecycleManager]] = DefaultConnectionLifecycleManager + schema_migration_cls: ClassVar[Type[ConnectionSchemaMigration]] = DefaultConnectionSchemaMigration dialect_string: ClassVar[str] data_source_migrator_cls: ClassVar[Type[DataSourceMigrator]] = DefaultDataSourceMigrator settings_definition: ClassVar[Optional[Type[ConnectorSettingsDefinition]]] = None diff --git a/lib/dl_core/dl_core/connectors/base/registrator.py b/lib/dl_core/dl_core/connectors/base/registrator.py index 40402dbbc..50a3f5465 100644 --- a/lib/dl_core/dl_core/connectors/base/registrator.py +++ b/lib/dl_core/dl_core/connectors/base/registrator.py @@ -67,6 +67,7 @@ def register_connection_definition( allow_ct_override=True, conn_type=conn_def.conn_type, lifecycle_manager_cls=conn_def.lifecycle_manager_cls, + schema_migration_cls=conn_def.schema_migration_cls, ) register_connection_backend_type(conn_type=conn_def.conn_type, backend_type=backend_type) register_connection_schema(conn_cls=conn_def.connection_cls, schema_cls=conn_def.us_storage_schema_cls) # type: ignore # 2024-01-30 # TODO: Argument "schema_cls" to "register_connection_schema" has incompatible type "type[Schema] | None"; expected "type[Schema]" [arg-type] diff --git a/lib/dl_core/dl_core/connectors/base/schema_migration.py b/lib/dl_core/dl_core/connectors/base/schema_migration.py new file mode 100644 index 000000000..a2170d1d7 --- /dev/null +++ b/lib/dl_core/dl_core/connectors/base/schema_migration.py @@ -0,0 +1,16 @@ +from dl_core.us_manager.schema_migration.base import ( + BaseEntrySchemaMigration, + Migration, +) + + +class ConnectionSchemaMigration(BaseEntrySchemaMigration): + ... + + +class DefaultConnectionSchemaMigration(ConnectionSchemaMigration): + @property + def migrations(self) -> list[Migration]: + migrations = [] + migrations.extend(super().migrations) + return migrations diff --git a/lib/dl_core/dl_core/us_connection.py b/lib/dl_core/dl_core/us_connection.py index 2387b604f..d1d04e3fe 100644 --- a/lib/dl_core/dl_core/us_connection.py +++ b/lib/dl_core/dl_core/us_connection.py @@ -7,6 +7,10 @@ ConnectionLifecycleManager, DefaultConnectionLifecycleManager, ) +from dl_core.connectors.base.schema_migration import ( + ConnectionSchemaMigration, + DefaultConnectionSchemaMigration, +) from dl_core.us_connection_base import ( ConnectionBase, UnknownConnection, @@ -19,6 +23,9 @@ CONNECTION_LIFECYCLE_MGR_CLASSES: dict[ConnectionType, Type[ConnectionLifecycleManager]] = { ConnectionType.unknown: DefaultConnectionLifecycleManager, } +CONNECTION_SHEMA_MIGRATION_CLASSES: dict[str, Type[ConnectionSchemaMigration]] = { + ConnectionType.unknown.name: DefaultConnectionSchemaMigration, +} def get_connection_class(conn_type: ConnectionType) -> Type[ConnectionBase]: @@ -31,10 +38,16 @@ def get_lifecycle_manager_cls(conn_type: ConnectionType) -> Type[ConnectionLifec return CONNECTION_LIFECYCLE_MGR_CLASSES[conn_type] +def get_schema_migration_cls(conn_type_name: str) -> Type[ConnectionSchemaMigration]: + """Return class for given connection type""" + return CONNECTION_SHEMA_MIGRATION_CLASSES[conn_type_name] + + def register_connection_class( new_conn_cls: Type[ConnectionBase], conn_type: ConnectionType, lifecycle_manager_cls: Type[ConnectionLifecycleManager] = DefaultConnectionLifecycleManager, + schema_migration_cls: Type[ConnectionSchemaMigration] = DefaultConnectionSchemaMigration, allow_ct_override: bool = False, ) -> None: if conn_type is None: @@ -49,3 +62,4 @@ def register_connection_class( CONNECTION_TYPES[conn_type] = new_conn_cls CONNECTION_LIFECYCLE_MGR_CLASSES[conn_type] = lifecycle_manager_cls + CONNECTION_SHEMA_MIGRATION_CLASSES[conn_type.name] = schema_migration_cls diff --git a/lib/dl_core/dl_core/us_manager/schema_migration/base.py b/lib/dl_core/dl_core/us_manager/schema_migration/base.py index 779c7551d..47d3d4687 100644 --- a/lib/dl_core/dl_core/us_manager/schema_migration/base.py +++ b/lib/dl_core/dl_core/us_manager/schema_migration/base.py @@ -9,9 +9,7 @@ import attr from typing_extensions import Self -from dl_api_commons.base_models import RequestContextInfo from dl_app_tools.profiling_base import generic_profiler -from dl_core.services_registry import ServicesRegistry LOGGER = logging.getLogger(__name__) @@ -42,8 +40,6 @@ def migrate(self, entry: dict) -> dict: @attr.s class BaseEntrySchemaMigration: - bi_context: RequestContextInfo | None = attr.ib(default=None) - services_registry: ServicesRegistry | None = attr.ib(default=None) strict_migration: bool = attr.ib(default=False) @property diff --git a/lib/dl_core/dl_core/us_manager/schema_migration/dataset.py b/lib/dl_core/dl_core/us_manager/schema_migration/dataset.py new file mode 100644 index 000000000..65be745c5 --- /dev/null +++ b/lib/dl_core/dl_core/us_manager/schema_migration/dataset.py @@ -0,0 +1,12 @@ +from dl_core.us_manager.schema_migration.base import ( + BaseEntrySchemaMigration, + Migration, +) + + +class DatasetSchemaMigration(BaseEntrySchemaMigration): + @property + def migrations(self) -> list[Migration]: + migrations = [] + migrations.extend(super().migrations) + return migrations diff --git a/lib/dl_core/dl_core/us_manager/schema_migration/factory.py b/lib/dl_core/dl_core/us_manager/schema_migration/factory.py new file mode 100644 index 000000000..414381ff0 --- /dev/null +++ b/lib/dl_core/dl_core/us_manager/schema_migration/factory.py @@ -0,0 +1,28 @@ +from dl_core.us_connection import get_schema_migration_cls +from dl_core.us_manager.schema_migration.base import BaseEntrySchemaMigration +from dl_core.us_manager.schema_migration.dataset import DatasetSchemaMigration +from dl_core.us_manager.schema_migration.factory_base import EntrySchemaMigrationFactoryBase + + +class DummyEntrySchemaMigrationFactory(EntrySchemaMigrationFactoryBase): + def get_schema_migration( + self, + entry_scope: str, + entry_type: str, + ) -> BaseEntrySchemaMigration: + return BaseEntrySchemaMigration() + + +class DefaultEntrySchemaMigrationFactory(EntrySchemaMigrationFactoryBase): + def get_schema_migration( + self, + entry_scope: str, + entry_type: str, + ) -> BaseEntrySchemaMigration: + if entry_scope == "dataset": + return DatasetSchemaMigration() + elif entry_scope == "connection": + schema_migration_cls = get_schema_migration_cls(conn_type_name=entry_type) + return schema_migration_cls() + else: + return BaseEntrySchemaMigration() diff --git a/lib/dl_core/dl_core/us_manager/schema_migration/factory_base.py b/lib/dl_core/dl_core/us_manager/schema_migration/factory_base.py new file mode 100644 index 000000000..0a3bd59d7 --- /dev/null +++ b/lib/dl_core/dl_core/us_manager/schema_migration/factory_base.py @@ -0,0 +1,13 @@ +import abc + +from dl_core.us_manager.schema_migration.base import BaseEntrySchemaMigration + + +class EntrySchemaMigrationFactoryBase(abc.ABC): + @abc.abstractmethod + def get_schema_migration( + self, + entry_scope: str, + entry_type: str, + ) -> BaseEntrySchemaMigration: + pass diff --git a/lib/dl_core/dl_core/us_manager/us_manager.py b/lib/dl_core/dl_core/us_manager/us_manager.py index a91184a07..8d9ff408c 100644 --- a/lib/dl_core/dl_core/us_manager/us_manager.py +++ b/lib/dl_core/dl_core/us_manager/us_manager.py @@ -54,6 +54,8 @@ ) from dl_core.us_manager.crypto.main import CryptoController from dl_core.us_manager.local_cache import USEntryBuffer +from dl_core.us_manager.schema_migration.factory import DefaultEntrySchemaMigrationFactory +from dl_core.us_manager.schema_migration.factory_base import EntrySchemaMigrationFactoryBase from dl_core.us_manager.storage_schemas.connection_schema_registry import MAP_TYPE_TO_SCHEMA_MAP_TYPE_TO_SCHEMA from dl_core.us_manager.storage_schemas.dataset import DatasetStorageSchema from dl_core.us_manager.us_entry_serializer import ( @@ -100,6 +102,7 @@ class USManagerBase: _us_client: UStorageClientBase _fake_us_client: FakeUSClient _lifecycle_manager_factory: EntryLifecycleManagerFactoryBase + _schema_migration_factory: EntrySchemaMigrationFactoryBase def __init__( self, @@ -110,6 +113,7 @@ def __init__( us_auth_context: USAuthContextBase, services_registry: ServicesRegistry, lifecycle_manager_factory: Optional[EntryLifecycleManagerFactoryBase] = None, + schema_migration_factory: Optional[EntrySchemaMigrationFactoryBase] = None, ): # TODO FIX: Try to connect it together to eliminate possible divergence if services_registry is not None: @@ -144,6 +148,10 @@ def __init__( assert lifecycle_manager_factory is not None self._lifecycle_manager_factory = lifecycle_manager_factory + schema_migration_factory = schema_migration_factory or DefaultEntrySchemaMigrationFactory() + assert schema_migration_factory is not None + self._schema_migration_factory = schema_migration_factory + def get_entry_buffer(self) -> USEntryBuffer: return self._loaded_entries diff --git a/lib/dl_core/dl_core/us_manager/us_manager_async.py b/lib/dl_core/dl_core/us_manager/us_manager_async.py index b4d1a86a6..9e6629d12 100644 --- a/lib/dl_core/dl_core/us_manager/us_manager_async.py +++ b/lib/dl_core/dl_core/us_manager/us_manager_async.py @@ -32,6 +32,7 @@ BrokenUSLink, BrokenUSLinkErrorKind, ) +from dl_core.us_manager.schema_migration.factory_base import EntrySchemaMigrationFactoryBase from dl_core.us_manager.us_manager import USManagerBase from dl_utils.aio import shield_wait_for_complete @@ -60,6 +61,7 @@ def __init__( crypto_keys_config: Optional[CryptoKeysConfig] = None, us_api_prefix: Optional[str] = None, lifecycle_manager_factory: Optional[EntryLifecycleManagerFactoryBase] = None, + schema_migration_factory: Optional[EntrySchemaMigrationFactoryBase] = None, ): self._us_client = UStorageClientAIO( host=us_base_url, @@ -80,6 +82,7 @@ def __init__( us_auth_context=us_auth_context, services_registry=services_registry, lifecycle_manager_factory=lifecycle_manager_factory, + schema_migration_factory=schema_migration_factory, ) @property diff --git a/lib/dl_core/dl_core/us_manager/us_manager_sync.py b/lib/dl_core/dl_core/us_manager/us_manager_sync.py index 6bbb9c369..544b1e46f 100644 --- a/lib/dl_core/dl_core/us_manager/us_manager_sync.py +++ b/lib/dl_core/dl_core/us_manager/us_manager_sync.py @@ -36,6 +36,7 @@ BrokenUSLink, BrokenUSLinkErrorKind, ) +from dl_core.us_manager.schema_migration.factory_base import EntrySchemaMigrationFactoryBase from dl_core.us_manager.us_manager import USManagerBase from dl_utils.aio import await_sync @@ -64,6 +65,7 @@ def __init__( # caches_redis: Optional[aioredis.Redis] = None, request_timeout_sec: int = 30, # WARNING: unused. lifecycle_manager_factory: Optional[EntryLifecycleManagerFactoryBase] = None, + schema_migration_factory: Optional[EntrySchemaMigrationFactoryBase] = None, ): super().__init__( bi_context=bi_context, @@ -73,6 +75,7 @@ def __init__( us_auth_context=us_auth_context, services_registry=services_registry, lifecycle_manager_factory=lifecycle_manager_factory, + schema_migration_factory=schema_migration_factory, ) self._us_client = self._create_us_client()