feat(ingest): remove source config from DatahubIngestionCheckpoint (#…
hsheth2 authored Dec 14, 2022
1 parent 8109b8b commit 2f95719
Showing 8 changed files with 11 additions and 52 deletions.
@@ -99,28 +99,17 @@ class Checkpoint(Generic[StateType]):
     pipeline_name: str
     platform_instance_id: str
     run_id: str
-    config: ConfigModel
     state: StateType
 
     @classmethod
     def create_from_checkpoint_aspect(
         cls,
         job_name: str,
         checkpoint_aspect: Optional[DatahubIngestionCheckpointClass],
-        config_class: Type[ConfigModel],
         state_class: Type[StateType],
     ) -> Optional["Checkpoint"]:
         if checkpoint_aspect is None:
             return None
-        try:
-            # Construct the config
-            config_as_dict = json.loads(checkpoint_aspect.config)
-            config_obj = config_class.parse_obj(config_as_dict)
-        except Exception as e:
-            # Failure to load config is probably okay...config structure has changed.
-            logger.warning(
-                "Failed to construct checkpoint's config from checkpoint aspect. %s", e
-            )
-        else:
         try:
             if checkpoint_aspect.state.serde == "utf-8":
@@ -153,7 +142,6 @@ def create_from_checkpoint_aspect(
             pipeline_name=checkpoint_aspect.pipelineName,
             platform_instance_id=checkpoint_aspect.platformInstanceId,
             run_id=checkpoint_aspect.runId,
-            config=config_obj,
             state=state_obj,
         )
         logger.info(
@@ -230,7 +218,7 @@ def to_checkpoint_aspect(
             pipelineName=self.pipeline_name,
             platformInstanceId=self.platform_instance_id,
             runId=self.run_id,
-            config=self.config.json(),
+            config="",
             state=checkpoint_state,
         )
         return checkpoint_aspect
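
An aside on compatibility: the DatahubIngestionCheckpointClass aspect keeps its config field (now always written as ""), and since create_from_checkpoint_aspect no longer parses that field, checkpoints written by older versions deserialize cleanly. A minimal sketch of reading such a legacy aspect; the IngestionCheckpointStateClass name and its formatVersion/serde/payload fields are assumptions about the aspect schema, not shown in this diff:

from datahub.ingestion.source.state.checkpoint import Checkpoint
from datahub.ingestion.source.state.sql_common_state import (
    BaseSQLAlchemyCheckpointState,
)
from datahub.metadata.schema_classes import (
    DatahubIngestionCheckpointClass,
    IngestionCheckpointStateClass,
)

# An aspect as an older version might have written it: `config` carries a
# serialized source config. The factory above never reads that field anymore,
# so legacy checkpoints still load.
legacy_aspect = DatahubIngestionCheckpointClass(
    timestampMillis=1671000000000,
    pipelineName="test_pipeline_1",
    platformInstanceId="test_platform_instance_1",
    runId="test_run_1",
    config='{"host_port": "localhost:5432"}',  # ignored after this commit
    state=IngestionCheckpointStateClass(
        formatVersion="1.0",
        serde="utf-8",
        payload=BaseSQLAlchemyCheckpointState().json().encode("utf-8"),
    ),
)

checkpoint = Checkpoint[BaseSQLAlchemyCheckpointState].create_from_checkpoint_aspect(
    job_name="test_job_1",
    checkpoint_aspect=legacy_aspect,
    state_class=BaseSQLAlchemyCheckpointState,
)
assert checkpoint is not None and checkpoint.run_id == "test_run_1"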
@@ -3,7 +3,6 @@
 
 import pydantic
 
-from datahub.configuration.common import ConfigModel
 from datahub.ingestion.api.ingestion_job_state_provider import JobId
 from datahub.ingestion.source.state.checkpoint import Checkpoint
 from datahub.ingestion.source.state.stateful_ingestion_base import (
@@ -42,17 +41,14 @@ class RedundantRunSkipHandler(
     def __init__(
         self,
         source: StatefulIngestionSourceBase,
-        config: Optional[StatefulIngestionConfigBase],
+        config: StatefulIngestionConfigBase[StatefulRedundantRunSkipConfig],
         pipeline_name: Optional[str],
         run_id: str,
     ):
         self.source = source
         self.config = config
-        self.stateful_ingestion_config = (
-            cast(StatefulRedundantRunSkipConfig, self.config.stateful_ingestion)
-            if self.config
-            else None
-        )
+        self.stateful_ingestion_config: Optional[
+            StatefulRedundantRunSkipConfig
+        ] = config.stateful_ingestion
         self.pipeline_name = pipeline_name
         self.run_id = run_id
         self.checkpointing_enabled: bool = source.is_stateful_ingestion_configured()
@@ -100,14 +96,12 @@ def create_checkpoint(self) -> Optional[Checkpoint[BaseUsageCheckpointState]]:
         if not self.is_checkpointing_enabled() or self._ignore_new_state():
             return None
 
-        assert self.config is not None
         assert self.pipeline_name is not None
         return Checkpoint(
             job_name=self.job_id,
             pipeline_name=self.pipeline_name,
             platform_instance_id=self.source.get_platform_instance_id(),
             run_id=self.run_id,
-            config=cast(ConfigModel, self.config),
             state=BaseUsageCheckpointState(
                 begin_timestamp_millis=self.INVALID_TIMESTAMP_VALUE,
                 end_timestamp_millis=self.INVALID_TIMESTAMP_VALUE,
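
The constructor change above (and the matching one in the stale-entity handler below) relies on StatefulIngestionConfigBase being generic over its stateful_ingestion member, which is what lets the cast() and the None checks disappear. A self-contained sketch of the typing pattern, assuming pydantic v1 generics; the real datahub classes carry more fields:

from typing import Generic, Optional, TypeVar

import pydantic
from pydantic.generics import GenericModel


class StatefulIngestionConfig(pydantic.BaseModel):
    # Simplified stand-in for datahub's StatefulIngestionConfig.
    enabled: bool = False


ConfigT = TypeVar("ConfigT", bound=StatefulIngestionConfig)


class StatefulIngestionConfigBase(GenericModel, Generic[ConfigT]):
    # Generic over the stateful_ingestion member, as this commit makes it.
    stateful_ingestion: Optional[ConfigT] = None


class StatefulRedundantRunSkipConfig(StatefulIngestionConfig):
    pass


config = StatefulIngestionConfigBase[StatefulRedundantRunSkipConfig](
    stateful_ingestion=StatefulRedundantRunSkipConfig(enabled=True)
)

# No cast() needed: the type checker already knows the member's concrete type.
stateful: Optional[StatefulRedundantRunSkipConfig] = config.stateful_ingestion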
@@ -5,7 +5,6 @@
 
 import pydantic
 
-from datahub.configuration.common import ConfigModel
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.ingestion.api.ingestion_job_state_provider import JobId
 from datahub.ingestion.api.workunit import MetadataWorkUnit
@@ -140,21 +139,18 @@ class hierarchies.
     def __init__(
         self,
         source: StatefulIngestionSourceBase,
-        config: Optional[StatefulIngestionConfigBase],
+        config: StatefulIngestionConfigBase[StatefulStaleMetadataRemovalConfig],
         state_type_class: Type[StaleEntityCheckpointStateBase],
         pipeline_name: Optional[str],
         run_id: str,
     ):
         self.config = config
         self.source = source
         self.state_type_class = state_type_class
         self.pipeline_name = pipeline_name
         self.run_id = run_id
-        self.stateful_ingestion_config = (
-            cast(StatefulStaleMetadataRemovalConfig, self.config.stateful_ingestion)
-            if self.config
-            else None
-        )
+        self.stateful_ingestion_config: Optional[
+            StatefulStaleMetadataRemovalConfig
+        ] = config.stateful_ingestion
         self.checkpointing_enabled: bool = (
             True
             if (
@@ -217,7 +213,6 @@ def create_checkpoint(self) -> Optional[Checkpoint]:
                 pipeline_name=self.pipeline_name,
                 platform_instance_id=self.source.get_platform_instance_id(),
                 run_id=self.run_id,
-                config=cast(ConfigModel, self.config),
                 state=self.state_type_class(),
             )
         return None
@@ -116,7 +116,6 @@ def __init__(
     ) -> None:
         super().__init__(ctx)
         self.stateful_ingestion_config = config.stateful_ingestion
-        self.source_config_type = type(config)
         self.last_checkpoints: Dict[JobId, Optional[Checkpoint]] = {}
         self.cur_checkpoints: Dict[JobId, Optional[Checkpoint]] = {}
         self.run_summaries_to_report: Dict[JobId, DatahubIngestionRunSummaryClass] = {}
@@ -246,7 +245,6 @@ def _get_last_checkpoint(
         last_checkpoint = Checkpoint[StateType].create_from_checkpoint_aspect(
             job_name=job_id,
             checkpoint_aspect=last_checkpoint_aspect,
-            config_class=self.source_config_type,
             state_class=checkpoint_state_class,
         )
         return last_checkpoint
1 change: 0 additions & 1 deletion metadata-ingestion/tests/integration/dbt/test_dbt.py
@@ -454,7 +454,6 @@ def get_fake_base_sql_alchemy_checkpoint_state(
         pipeline_name=dbt_source.ctx.pipeline_name,
         platform_instance_id=dbt_source.get_platform_instance_id(),
         run_id=dbt_source.ctx.run_id,
-        config=dbt_source.config,
         state=sql_state,
     )
 
@@ -184,10 +184,7 @@ def test_kafka_ingest_with_stateful(
         == f"urn:li:dataset:(urn:li:dataPlatform:kafka,{platform_instance}.{kafka_ctx.topics[0]},PROD)"
     )
 
-    # 4. Checkpoint configuration should be the same.
-    assert checkpoint1.config == checkpoint2.config
-
-    # 5. Validate that all providers have committed successfully.
+    # 4. Validate that all providers have committed successfully.
     # NOTE: The following validation asserts for presence of state as well
     # and validates reporting.
     validate_all_providers_have_committed_successfully(
@@ -14,7 +14,6 @@
     JobId,
     JobStateKey,
 )
-from datahub.ingestion.source.sql.postgres import PostgresConfig
 from datahub.ingestion.source.state.checkpoint import Checkpoint
 from datahub.ingestion.source.state.sql_common_state import (
     BaseSQLAlchemyCheckpointState,
@@ -124,7 +123,6 @@ def test_provider(self):
             pipeline_name=self.pipeline_name,
             platform_instance_id=self.platform_instance_id,
             run_id=self.run_id,
-            config=PostgresConfig(host_port="localhost:5432"),
             state=job1_state_obj,
         )
         # Job2 - Checkpoint with a BaseUsageCheckpointState state
@@ -136,7 +134,6 @@ def test_provider(self):
             pipeline_name=self.pipeline_name,
             platform_instance_id=self.platform_instance_id,
             run_id=self.run_id,
-            config=PostgresConfig(host_port="localhost:5432"),
             state=job2_state_obj,
         )
 
@@ -171,7 +168,6 @@ def test_provider(self):
             job_name=self.job_names[0],
             checkpoint_aspect=last_state[self.job_names[0]],
             state_class=type(job1_state_obj),
-            config_class=type(job1_checkpoint.config),
         )
         self.assertEqual(job1_last_checkpoint, job1_checkpoint)
 
@@ -180,6 +176,5 @@ def test_provider(self):
             job_name=self.job_names[1],
             checkpoint_aspect=last_state[self.job_names[1]],
             state_class=type(job2_state_obj),
-            config_class=type(job2_checkpoint.config),
         )
         self.assertEqual(job2_last_checkpoint, job2_checkpoint)
@@ -5,8 +5,6 @@
 import pytest
 
 from datahub.emitter.mce_builder import make_dataset_urn
-from datahub.ingestion.source.sql.postgres import PostgresConfig
-from datahub.ingestion.source.sql.sql_common import BasicSQLAlchemyConfig
 from datahub.ingestion.source.state.checkpoint import Checkpoint, CheckpointStateBase
 from datahub.ingestion.source.state.sql_common_state import (
     BaseSQLAlchemyCheckpointState,
@@ -22,7 +20,6 @@
 test_platform_instance_id: str = "test_platform_instance_1"
 test_job_name: str = "test_job_1"
 test_run_id: str = "test_run_1"
-test_source_config: BasicSQLAlchemyConfig = PostgresConfig(host_port="test_host:1234")
 
 
 def _assert_checkpoint_deserialization(
@@ -34,7 +31,7 @@ def _assert_checkpoint_deserialization(
         timestampMillis=int(datetime.utcnow().timestamp() * 1000),
         pipelineName=test_pipeline_name,
         platformInstanceId=test_platform_instance_id,
-        config=test_source_config.json(),
+        config="",
         state=serialized_checkpoint_state,
         runId=test_run_id,
     )
@@ -44,15 +41,13 @@ def _assert_checkpoint_deserialization(
         job_name=test_job_name,
         checkpoint_aspect=checkpoint_aspect,
         state_class=type(expected_checkpoint_state),
-        config_class=PostgresConfig,
     )
 
     expected_checkpoint_obj = Checkpoint(
         job_name=test_job_name,
         pipeline_name=test_pipeline_name,
         platform_instance_id=test_platform_instance_id,
         run_id=test_run_id,
-        config=test_source_config,
         state=expected_checkpoint_state,
     )
     assert checkpoint_obj == expected_checkpoint_obj
@@ -127,7 +122,6 @@ def test_serde_idempotence(state_obj):
         pipeline_name=test_pipeline_name,
         platform_instance_id=test_platform_instance_id,
         run_id=test_run_id,
-        config=test_source_config,
         state=state_obj,
     )
 
@@ -142,7 +136,6 @@ def test_serde_idempotence(state_obj):
         job_name=test_job_name,
         checkpoint_aspect=checkpoint_aspect,
         state_class=type(state_obj),
-        config_class=PostgresConfig,
     )
     assert orig_checkpoint_obj == serde_checkpoint_obj
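
Putting the pieces together, the round trip these tests exercise now looks roughly like the sketch below. The max_allowed_state_size argument is an assumption about to_checkpoint_aspect's signature (it is not shown in this diff), and the pipeline name is illustrative:

from datahub.ingestion.source.state.checkpoint import Checkpoint
from datahub.ingestion.source.state.sql_common_state import (
    BaseSQLAlchemyCheckpointState,
)

# Build a checkpoint without any source config, serialize it to the aspect,
# and deserialize it back.
orig = Checkpoint(
    job_name="test_job_1",
    pipeline_name="test_pipeline_1",
    platform_instance_id="test_platform_instance_1",
    run_id="test_run_1",
    state=BaseSQLAlchemyCheckpointState(),
)
aspect = orig.to_checkpoint_aspect(max_allowed_state_size=2**22)
assert aspect is not None
assert aspect.config == ""  # the aspect field survives, but is always empty now

roundtrip = Checkpoint[BaseSQLAlchemyCheckpointState].create_from_checkpoint_aspect(
    job_name="test_job_1",
    checkpoint_aspect=aspect,
    state_class=BaseSQLAlchemyCheckpointState,
)
assert roundtrip == orig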
