feat(ingest): delta-lake - extract table history into operation aspect (#5277)

Co-authored-by: Shirshanka Das <[email protected]>
MugdhaHardikar-GSLab and shirshanka authored Aug 7, 2022
1 parent da0258c commit b32a072
Showing 11 changed files with 828 additions and 107 deletions.
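For context, the new version_history_lookback option added in this commit caps how many entries of a Delta table's transaction history are emitted as dataset operation aspects. Below is a minimal sketch of a recipe that exercises it through DataHub's Python Pipeline API; the table path and the console sink are illustrative assumptions, not part of this commit:

from datahub.ingestion.run.pipeline import Pipeline

# Hypothetical recipe: ingest a local Delta table and turn its five most
# recent history entries into operation aspects (-1 would mean "no limit").
pipeline = Pipeline.create(
    {
        "source": {
            "type": "delta-lake",
            "config": {
                "base_path": "/tmp/my-delta-table",  # assumed example path
                "version_history_lookback": 5,
            },
        },
        "sink": {"type": "console"},  # assumed sink, for demonstration only
    }
)
pipeline.run()
pipeline.raise_from_status()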
@@ -59,6 +59,10 @@ class Config:
         default=AllowDenyPattern.allow_all(),
         description="regex patterns for tables to filter in ingestion.",
     )
+    version_history_lookback: Optional[int] = Field(
+        default=1,
+        description="Number of previous version histories to be ingested. Defaults to 1. If set to -1 all version history will be ingested.",
+    )
 
     s3: Optional[S3] = Field()
@@ -72,14 +76,20 @@ def is_s3(self):
     def get_complete_path(self):
         return self._complete_path
 
+    @pydantic.validator("version_history_lookback")
+    def negative_version_history_implies_no_limit(cls, v):
+        if v and v < 0:
+            return None
+        return v
+
     @pydantic.root_validator()
     def validate_config(cls, values: Dict) -> Dict[str, Any]:
-        values["_is_s3"] = is_s3_uri(values["base_path"])
+        values["_is_s3"] = is_s3_uri(values.get("base_path", ""))
         if values["_is_s3"]:
-            if values["s3"] is None:
+            if values.get("s3") is None:
                 raise ValueError("s3 config must be set for s3 path")
-        values["_complete_path"] = values["base_path"]
-        if values["relative_path"] is not None:
+        values["_complete_path"] = values.get("base_path")
+        if values.get("relative_path") is not None:
             values[
                 "_complete_path"
             ] = f"{values['_complete_path'].rstrip('/')}/{values['relative_path'].lstrip('/')}"
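The validator added above maps any negative lookback to None, which the deltalake library's DeltaTable.history(limit=...) interprets as "return the entire version history". A quick illustration of that behavior (the table path is an assumed example):

from deltalake import DeltaTable

dt = DeltaTable("/tmp/my-delta-table")  # assumed example path
newest = dt.history(limit=1)       # only the most recent history entry
everything = dt.history(limit=None)  # full history, i.e. the lookback=-1 case
print(newest[0]["operation"], len(everything))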
@@ -24,11 +24,7 @@ def read_delta_table(
 
     except PyDeltaTableError as e:
         if "Not a Delta table" not in str(e):
-            import pdb
-
-            pdb.set_trace()
             raise e
 
     return delta_table
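This hunk removes a stray pdb debugging hook, leaving the contract: a path that is not a Delta table yields None, while any other PyDeltaTableError propagates. A sketch of that behavior under stated assumptions; the real function's signature is truncated in the hunk above, and the PyDeltaTableError import location varies across deltalake releases:

from typing import Optional

# Assumption: PyDeltaTableError is importable from the top-level package in
# the deltalake versions this commit targets.
from deltalake import DeltaTable, PyDeltaTableError


def read_delta_table_sketch(path: str) -> Optional[DeltaTable]:
    """Hypothetical simplification of read_delta_table."""
    try:
        return DeltaTable(path)
    except PyDeltaTableError as e:
        # Anything other than "this path is not a Delta table" is a real error.
        if "Not a Delta table" not in str(e):
            raise e
    return None  # non-Delta paths are skipped silently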
@@ -1,5 +1,7 @@
 import logging
 import os
+import time
+from datetime import datetime
 from typing import Callable, Iterable, List
 
 from deltalake import DeltaTable
@@ -8,6 +10,7 @@
     make_data_platform_urn,
     make_dataset_urn_with_platform_instance,
 )
+from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.ingestion.api.common import PipelineContext, WorkUnit
 from datahub.ingestion.api.decorators import (
     SourceCapability,
@@ -41,8 +44,11 @@
     SchemaMetadata,
 )
 from datahub.metadata.schema_classes import (
+    ChangeTypeClass,
     DatasetPropertiesClass,
     NullTypeClass,
+    OperationClass,
+    OperationTypeClass,
     OtherSchemaClass,
 )
 from datahub.telemetry import telemetry
@@ -54,6 +60,19 @@
     "platform",
 ]
 
+OPERATION_STATEMENT_TYPES = {
+    "INSERT": OperationTypeClass.INSERT,
+    "UPDATE": OperationTypeClass.UPDATE,
+    "DELETE": OperationTypeClass.DELETE,
+    "MERGE": OperationTypeClass.UPDATE,
+    "CREATE": OperationTypeClass.CREATE,
+    "CREATE_TABLE_AS_SELECT": OperationTypeClass.CREATE,
+    "CREATE_SCHEMA": OperationTypeClass.CREATE,
+    "DROP_TABLE": OperationTypeClass.DROP,
+    "REPLACE TABLE AS SELECT": OperationTypeClass.UPDATE,
+    "COPY INTO": OperationTypeClass.UPDATE,
+}
+
 
 @platform_name("Delta Lake", id="delta-lake")
 @config_class(DeltaLakeSourceConfig)
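Delta statements outside the OPERATION_STATEMENT_TYPES map added above (for example OPTIMIZE-style maintenance operations) fall through to OperationTypeClass.CUSTOM, with the raw statement name preserved as the custom operation type. A small sketch of that lookup contract, assuming the dict from the hunk above is in scope:

from datahub.metadata.schema_classes import OperationTypeClass

# Mirrors the fallback logic in the source below.
def classify(operation: str):
    statement_type = OPERATION_STATEMENT_TYPES.get(operation, OperationTypeClass.CUSTOM)
    custom_type = operation if statement_type == OperationTypeClass.CUSTOM else None
    return statement_type, custom_type

assert classify("MERGE") == (OperationTypeClass.UPDATE, None)
assert classify("OPTIMIZE") == (OperationTypeClass.CUSTOM, "OPTIMIZE")  # unmapped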
@@ -122,6 +141,58 @@ def get_fields(self, delta_table: DeltaTable) -> List[SchemaField]:
 
         return fields
 
+    def _create_operation_aspect_wu(
+        self, delta_table: DeltaTable, dataset_urn: str
+    ) -> Iterable[MetadataWorkUnit]:
+        for hist in delta_table.history(
+            limit=self.source_config.version_history_lookback
+        ):
+
+            # History schema picked up from https://docs.delta.io/latest/delta-utility.html#retrieve-delta-table-history
+            reported_time: int = int(time.time() * 1000)
+            last_updated_timestamp: int = hist["timestamp"]
+            statement_type = OPERATION_STATEMENT_TYPES.get(
+                hist.get("operation"), OperationTypeClass.CUSTOM
+            )
+            custom_type = (
+                hist.get("operation")
+                if statement_type == OperationTypeClass.CUSTOM
+                else None
+            )
+
+            operation_custom_properties = dict()
+            for key, val in hist.items():
+                if val is not None:
+                    if isinstance(val, dict):
+                        for k, v in val.items():
+                            if v is not None:
+                                operation_custom_properties[f"{key}_{k}"] = str(v)
+                    else:
+                        operation_custom_properties[key] = str(val)
+            operation_custom_properties.pop("timestamp", None)
+            operation_custom_properties.pop("operation", None)
+            operation_aspect = OperationClass(
+                timestampMillis=reported_time,
+                lastUpdatedTimestamp=last_updated_timestamp,
+                operationType=statement_type,
+                customOperationType=custom_type,
+                customProperties=operation_custom_properties,
+            )
+
+            mcp = MetadataChangeProposalWrapper(
+                entityType="dataset",
+                aspectName="operation",
+                changeType=ChangeTypeClass.UPSERT,
+                entityUrn=dataset_urn,
+                aspect=operation_aspect,
+            )
+            operational_wu = MetadataWorkUnit(
+                id=f"{datetime.fromtimestamp(last_updated_timestamp / 1000).isoformat()}-operation-aspect-{dataset_urn}",
+                mcp=mcp,
+            )
+            self.report.report_workunit(operational_wu)
+            yield operational_wu
+
     def ingest_table(
         self, delta_table: DeltaTable, path: str
     ) -> Iterable[MetadataWorkUnit]:
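To make the history-flattening loop in _create_operation_aspect_wu concrete: a history entry, as documented at docs.delta.io, is a dict whose operationParameters and operationMetrics values are nested dicts, and the loop prefixes nested keys with their parent key. A standalone sketch with a made-up history entry (all values are illustrative, not real output):

# Illustrative history entry, shaped like the Delta table history docs describe.
hist = {
    "timestamp": 1659855600000,
    "operation": "MERGE",
    "operationParameters": {"predicate": "id = s.id"},
    "operationMetrics": {"numTargetRowsUpdated": "42"},
    "readVersion": 6,
}

flat = {}
for key, val in hist.items():
    if val is not None:
        if isinstance(val, dict):
            for k, v in val.items():
                if v is not None:
                    flat[f"{key}_{k}"] = str(v)  # e.g. "operationParameters_predicate"
        else:
            flat[key] = str(val)
flat.pop("timestamp", None)  # carried separately as lastUpdatedTimestamp
flat.pop("operation", None)  # carried as operationType / customOperationType
print(flat)
# {'operationParameters_predicate': 'id = s.id',
#  'operationMetrics_numTargetRowsUpdated': '42', 'readVersion': '6'}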
@@ -164,11 +235,6 @@ def ingest_table(
             "version": str(delta_table.version()),
             "location": self.source_config.get_complete_path(),
         }
-        customProperties.update(delta_table.history()[-1])
-        customProperties["version_creation_time"] = customProperties["timestamp"]
-        del customProperties["timestamp"]
-        for key in customProperties.keys():
-            customProperties[key] = str(customProperties[key])
 
         dataset_properties = DatasetPropertiesClass(
             description=delta_table.metadata().description,
@@ -221,6 +287,8 @@ def ingest_table(
         self.report.report_workunit(wu)
         yield wu
 
+        yield from self._create_operation_aspect_wu(delta_table, dataset_urn)
+
     def process_folder(
         self, path: str, get_folders: Callable[[str], Iterable[str]]
     ) -> Iterable[MetadataWorkUnit]:
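For a sense of what the wiring above produces, an equivalent operation aspect can also be emitted standalone through DataHub's REST emitter. A hedged sketch, where the server URL, dataset urn, and timestamps are assumed placeholders:

import time

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
    ChangeTypeClass,
    OperationClass,
    OperationTypeClass,
)

emitter = DatahubRestEmitter(gms_server="http://localhost:8080")  # assumed server
mcp = MetadataChangeProposalWrapper(
    entityType="dataset",
    aspectName="operation",
    changeType=ChangeTypeClass.UPSERT,
    # Assumed example urn for illustration only.
    entityUrn="urn:li:dataset:(urn:li:dataPlatform:delta-lake,demo.table,PROD)",
    aspect=OperationClass(
        timestampMillis=int(time.time() * 1000),
        lastUpdatedTimestamp=int(time.time() * 1000),
        operationType=OperationTypeClass.INSERT,
    ),
)
emitter.emit_mcp(mcp)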