diff --git a/metadata-ingestion/src/datahub/ingestion/source/state/checkpoint.py b/metadata-ingestion/src/datahub/ingestion/source/state/checkpoint.py index b33d061e1a5b5..97f2876ff4023 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/state/checkpoint.py +++ b/metadata-ingestion/src/datahub/ingestion/source/state/checkpoint.py @@ -99,7 +99,6 @@ class Checkpoint(Generic[StateType]): pipeline_name: str platform_instance_id: str run_id: str - config: ConfigModel state: StateType @classmethod @@ -107,20 +106,10 @@ def create_from_checkpoint_aspect( cls, job_name: str, checkpoint_aspect: Optional[DatahubIngestionCheckpointClass], - config_class: Type[ConfigModel], state_class: Type[StateType], ) -> Optional["Checkpoint"]: if checkpoint_aspect is None: return None - try: - # Construct the config - config_as_dict = json.loads(checkpoint_aspect.config) - config_obj = config_class.parse_obj(config_as_dict) - except Exception as e: - # Failure to load config is probably okay...config structure has changed. - logger.warning( - "Failed to construct checkpoint's config from checkpoint aspect. %s", e - ) else: try: if checkpoint_aspect.state.serde == "utf-8": @@ -153,7 +142,6 @@ def create_from_checkpoint_aspect( pipeline_name=checkpoint_aspect.pipelineName, platform_instance_id=checkpoint_aspect.platformInstanceId, run_id=checkpoint_aspect.runId, - config=config_obj, state=state_obj, ) logger.info( @@ -230,7 +218,7 @@ def to_checkpoint_aspect( pipelineName=self.pipeline_name, platformInstanceId=self.platform_instance_id, runId=self.run_id, - config=self.config.json(), + config="", state=checkpoint_state, ) return checkpoint_aspect diff --git a/metadata-ingestion/src/datahub/ingestion/source/state/redundant_run_skip_handler.py b/metadata-ingestion/src/datahub/ingestion/source/state/redundant_run_skip_handler.py index cf58be727fa52..e8d0f6017b211 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/state/redundant_run_skip_handler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/state/redundant_run_skip_handler.py @@ -3,7 +3,6 @@ import pydantic -from datahub.configuration.common import ConfigModel from datahub.ingestion.api.ingestion_job_state_provider import JobId from datahub.ingestion.source.state.checkpoint import Checkpoint from datahub.ingestion.source.state.stateful_ingestion_base import ( @@ -42,17 +41,14 @@ class RedundantRunSkipHandler( def __init__( self, source: StatefulIngestionSourceBase, - config: Optional[StatefulIngestionConfigBase], + config: StatefulIngestionConfigBase[StatefulRedundantRunSkipConfig], pipeline_name: Optional[str], run_id: str, ): self.source = source - self.config = config - self.stateful_ingestion_config = ( - cast(StatefulRedundantRunSkipConfig, self.config.stateful_ingestion) - if self.config - else None - ) + self.stateful_ingestion_config: Optional[ + StatefulRedundantRunSkipConfig + ] = config.stateful_ingestion self.pipeline_name = pipeline_name self.run_id = run_id self.checkpointing_enabled: bool = source.is_stateful_ingestion_configured() @@ -100,14 +96,12 @@ def create_checkpoint(self) -> Optional[Checkpoint[BaseUsageCheckpointState]]: if not self.is_checkpointing_enabled() or self._ignore_new_state(): return None - assert self.config is not None assert self.pipeline_name is not None return Checkpoint( job_name=self.job_id, pipeline_name=self.pipeline_name, platform_instance_id=self.source.get_platform_instance_id(), run_id=self.run_id, - config=cast(ConfigModel, self.config), state=BaseUsageCheckpointState( begin_timestamp_millis=self.INVALID_TIMESTAMP_VALUE, end_timestamp_millis=self.INVALID_TIMESTAMP_VALUE, diff --git a/metadata-ingestion/src/datahub/ingestion/source/state/stale_entity_removal_handler.py b/metadata-ingestion/src/datahub/ingestion/source/state/stale_entity_removal_handler.py index 9e6d60e0e6b52..1f5475a34b93a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/state/stale_entity_removal_handler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/state/stale_entity_removal_handler.py @@ -5,7 +5,6 @@ import pydantic -from datahub.configuration.common import ConfigModel from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.ingestion_job_state_provider import JobId from datahub.ingestion.api.workunit import MetadataWorkUnit @@ -140,21 +139,18 @@ class hierarchies. def __init__( self, source: StatefulIngestionSourceBase, - config: Optional[StatefulIngestionConfigBase], + config: StatefulIngestionConfigBase[StatefulStaleMetadataRemovalConfig], state_type_class: Type[StaleEntityCheckpointStateBase], pipeline_name: Optional[str], run_id: str, ): - self.config = config self.source = source self.state_type_class = state_type_class self.pipeline_name = pipeline_name self.run_id = run_id - self.stateful_ingestion_config = ( - cast(StatefulStaleMetadataRemovalConfig, self.config.stateful_ingestion) - if self.config - else None - ) + self.stateful_ingestion_config: Optional[ + StatefulStaleMetadataRemovalConfig + ] = config.stateful_ingestion self.checkpointing_enabled: bool = ( True if ( @@ -217,7 +213,6 @@ def create_checkpoint(self) -> Optional[Checkpoint]: pipeline_name=self.pipeline_name, platform_instance_id=self.source.get_platform_instance_id(), run_id=self.run_id, - config=cast(ConfigModel, self.config), state=self.state_type_class(), ) return None diff --git a/metadata-ingestion/src/datahub/ingestion/source/state/stateful_ingestion_base.py b/metadata-ingestion/src/datahub/ingestion/source/state/stateful_ingestion_base.py index 1f930a6381370..4241205e75476 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/state/stateful_ingestion_base.py +++ b/metadata-ingestion/src/datahub/ingestion/source/state/stateful_ingestion_base.py @@ -116,7 +116,6 @@ def __init__( ) -> None: super().__init__(ctx) self.stateful_ingestion_config = config.stateful_ingestion - self.source_config_type = type(config) self.last_checkpoints: Dict[JobId, Optional[Checkpoint]] = {} self.cur_checkpoints: Dict[JobId, Optional[Checkpoint]] = {} self.run_summaries_to_report: Dict[JobId, DatahubIngestionRunSummaryClass] = {} @@ -246,7 +245,6 @@ def _get_last_checkpoint( last_checkpoint = Checkpoint[StateType].create_from_checkpoint_aspect( job_name=job_id, checkpoint_aspect=last_checkpoint_aspect, - config_class=self.source_config_type, state_class=checkpoint_state_class, ) return last_checkpoint diff --git a/metadata-ingestion/tests/integration/dbt/test_dbt.py b/metadata-ingestion/tests/integration/dbt/test_dbt.py index 74477245b728f..935c34134eb43 100644 --- a/metadata-ingestion/tests/integration/dbt/test_dbt.py +++ b/metadata-ingestion/tests/integration/dbt/test_dbt.py @@ -454,7 +454,6 @@ def get_fake_base_sql_alchemy_checkpoint_state( pipeline_name=dbt_source.ctx.pipeline_name, platform_instance_id=dbt_source.get_platform_instance_id(), run_id=dbt_source.ctx.run_id, - config=dbt_source.config, state=sql_state, ) diff --git a/metadata-ingestion/tests/integration/kafka/test_kafka_state.py b/metadata-ingestion/tests/integration/kafka/test_kafka_state.py index 26b919a4047b3..7136365c8e6a4 100644 --- a/metadata-ingestion/tests/integration/kafka/test_kafka_state.py +++ b/metadata-ingestion/tests/integration/kafka/test_kafka_state.py @@ -184,10 +184,7 @@ def test_kafka_ingest_with_stateful( == f"urn:li:dataset:(urn:li:dataPlatform:kafka,{platform_instance}.{kafka_ctx.topics[0]},PROD)" ) - # 4. Checkpoint configuration should be the same. - assert checkpoint1.config == checkpoint2.config - - # 5. Validate that all providers have committed successfully. + # 4. Validate that all providers have committed successfully. # NOTE: The following validation asserts for presence of state as well # and validates reporting. validate_all_providers_have_committed_successfully( diff --git a/metadata-ingestion/tests/unit/stateful_ingestion/provider/test_datahub_ingestion_checkpointing_provider.py b/metadata-ingestion/tests/unit/stateful_ingestion/provider/test_datahub_ingestion_checkpointing_provider.py index 5062b41677d73..bb19ab077b563 100644 --- a/metadata-ingestion/tests/unit/stateful_ingestion/provider/test_datahub_ingestion_checkpointing_provider.py +++ b/metadata-ingestion/tests/unit/stateful_ingestion/provider/test_datahub_ingestion_checkpointing_provider.py @@ -14,7 +14,6 @@ JobId, JobStateKey, ) -from datahub.ingestion.source.sql.postgres import PostgresConfig from datahub.ingestion.source.state.checkpoint import Checkpoint from datahub.ingestion.source.state.sql_common_state import ( BaseSQLAlchemyCheckpointState, @@ -124,7 +123,6 @@ def test_provider(self): pipeline_name=self.pipeline_name, platform_instance_id=self.platform_instance_id, run_id=self.run_id, - config=PostgresConfig(host_port="localhost:5432"), state=job1_state_obj, ) # Job2 - Checkpoint with a BaseUsageCheckpointState state @@ -136,7 +134,6 @@ def test_provider(self): pipeline_name=self.pipeline_name, platform_instance_id=self.platform_instance_id, run_id=self.run_id, - config=PostgresConfig(host_port="localhost:5432"), state=job2_state_obj, ) @@ -171,7 +168,6 @@ def test_provider(self): job_name=self.job_names[0], checkpoint_aspect=last_state[self.job_names[0]], state_class=type(job1_state_obj), - config_class=type(job1_checkpoint.config), ) self.assertEqual(job1_last_checkpoint, job1_checkpoint) @@ -180,6 +176,5 @@ def test_provider(self): job_name=self.job_names[1], checkpoint_aspect=last_state[self.job_names[1]], state_class=type(job2_state_obj), - config_class=type(job2_checkpoint.config), ) self.assertEqual(job2_last_checkpoint, job2_checkpoint) diff --git a/metadata-ingestion/tests/unit/stateful_ingestion/state/test_checkpoint.py b/metadata-ingestion/tests/unit/stateful_ingestion/state/test_checkpoint.py index b07eb9dca3d60..f5ad0a84110b8 100644 --- a/metadata-ingestion/tests/unit/stateful_ingestion/state/test_checkpoint.py +++ b/metadata-ingestion/tests/unit/stateful_ingestion/state/test_checkpoint.py @@ -5,8 +5,6 @@ import pytest from datahub.emitter.mce_builder import make_dataset_urn -from datahub.ingestion.source.sql.postgres import PostgresConfig -from datahub.ingestion.source.sql.sql_common import BasicSQLAlchemyConfig from datahub.ingestion.source.state.checkpoint import Checkpoint, CheckpointStateBase from datahub.ingestion.source.state.sql_common_state import ( BaseSQLAlchemyCheckpointState, @@ -22,7 +20,6 @@ test_platform_instance_id: str = "test_platform_instance_1" test_job_name: str = "test_job_1" test_run_id: str = "test_run_1" -test_source_config: BasicSQLAlchemyConfig = PostgresConfig(host_port="test_host:1234") def _assert_checkpoint_deserialization( @@ -34,7 +31,7 @@ def _assert_checkpoint_deserialization( timestampMillis=int(datetime.utcnow().timestamp() * 1000), pipelineName=test_pipeline_name, platformInstanceId=test_platform_instance_id, - config=test_source_config.json(), + config="", state=serialized_checkpoint_state, runId=test_run_id, ) @@ -44,7 +41,6 @@ def _assert_checkpoint_deserialization( job_name=test_job_name, checkpoint_aspect=checkpoint_aspect, state_class=type(expected_checkpoint_state), - config_class=PostgresConfig, ) expected_checkpoint_obj = Checkpoint( @@ -52,7 +48,6 @@ def _assert_checkpoint_deserialization( pipeline_name=test_pipeline_name, platform_instance_id=test_platform_instance_id, run_id=test_run_id, - config=test_source_config, state=expected_checkpoint_state, ) assert checkpoint_obj == expected_checkpoint_obj @@ -127,7 +122,6 @@ def test_serde_idempotence(state_obj): pipeline_name=test_pipeline_name, platform_instance_id=test_platform_instance_id, run_id=test_run_id, - config=test_source_config, state=state_obj, ) @@ -142,7 +136,6 @@ def test_serde_idempotence(state_obj): job_name=test_job_name, checkpoint_aspect=checkpoint_aspect, state_class=type(state_obj), - config_class=PostgresConfig, ) assert orig_checkpoint_obj == serde_checkpoint_obj