Skip to content

Commit

Permalink
fix(ingest/s3): Fixing container creation when there is no folder in …
Browse files Browse the repository at this point in the history
…path (#10993)
  • Loading branch information
treff7es authored Jul 25, 2024
1 parent dd732d0 commit 71d1cdb
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -160,23 +160,24 @@ def create_container_hierarchy(
)
return

for folder in parent_folder_path.split("/"):
abs_path = folder
if parent_key:
prefix: str = ""
if isinstance(parent_key, BucketKey):
prefix = parent_key.bucket_name
elif isinstance(parent_key, FolderKey):
prefix = parent_key.folder_abs_path
abs_path = prefix + "/" + folder
folder_key = self.gen_folder_key(abs_path)
yield from self.create_emit_containers(
container_key=folder_key,
name=folder,
sub_types=[DatasetContainerSubTypes.FOLDER],
parent_container_key=parent_key,
)
parent_key = folder_key
if parent_folder_path:
for folder in parent_folder_path.split("/"):
abs_path = folder
if parent_key:
prefix: str = ""
if isinstance(parent_key, BucketKey):
prefix = parent_key.bucket_name
elif isinstance(parent_key, FolderKey):
prefix = parent_key.folder_abs_path
abs_path = prefix + "/" + folder
folder_key = self.gen_folder_key(abs_path)
yield from self.create_emit_containers(
container_key=folder_key,
name=folder,
sub_types=[DatasetContainerSubTypes.FOLDER],
parent_container_key=parent_key,
)
parent_key = folder_key

assert parent_key is not None
yield from add_dataset_to_container(parent_key, dataset_urn)
78 changes: 78 additions & 0 deletions metadata-ingestion/tests/unit/s3/test_s3_source.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
from typing import List

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.data_lake_common.data_lake_utils import ContainerWUCreator
from datahub.ingestion.source.data_lake_common.path_spec import PathSpec
from datahub.ingestion.source.s3.source import partitioned_folder_comparator

Expand Down Expand Up @@ -91,3 +96,76 @@ def test_path_spec_dir_allowed():

path = "s3://my-bucket/my-folder/year=2022/month=10/day=10/"
assert path_spec.dir_allowed(path) is False, f"{path} should be denied"


def test_container_generation_without_folders():
cwu = ContainerWUCreator("s3", None, "PROD")
mcps = cwu.create_container_hierarchy(
"s3://my-bucket/my-file.json.gz", "urn:li:dataset:123"
)

def container_properties_filter(x: MetadataWorkUnit) -> bool:
assert isinstance(x.metadata, MetadataChangeProposalWrapper)
return x.metadata.aspectName == "containerProperties"

container_properties: List = list(filter(container_properties_filter, mcps))
assert len(container_properties) == 1
assert container_properties[0].metadata.aspect.customProperties == {
"bucket_name": "my-bucket",
"env": "PROD",
"platform": "s3",
}


def test_container_generation_with_folder():
cwu = ContainerWUCreator("s3", None, "PROD")
mcps = cwu.create_container_hierarchy(
"s3://my-bucket/my-dir/my-file.json.gz", "urn:li:dataset:123"
)

def container_properties_filter(x: MetadataWorkUnit) -> bool:
assert isinstance(x.metadata, MetadataChangeProposalWrapper)
return x.metadata.aspectName == "containerProperties"

container_properties: List = list(filter(container_properties_filter, mcps))
assert len(container_properties) == 2
assert container_properties[0].metadata.aspect.customProperties == {
"bucket_name": "my-bucket",
"env": "PROD",
"platform": "s3",
}
assert container_properties[1].metadata.aspect.customProperties == {
"env": "PROD",
"folder_abs_path": "my-bucket/my-dir",
"platform": "s3",
}


def test_container_generation_with_multiple_folders():
cwu = ContainerWUCreator("s3", None, "PROD")
mcps = cwu.create_container_hierarchy(
"s3://my-bucket/my-dir/my-dir2/my-file.json.gz", "urn:li:dataset:123"
)

def container_properties_filter(x: MetadataWorkUnit) -> bool:
assert isinstance(x.metadata, MetadataChangeProposalWrapper)
return x.metadata.aspectName == "containerProperties"

container_properties: List = list(filter(container_properties_filter, mcps))

assert len(container_properties) == 3
assert container_properties[0].metadata.aspect.customProperties == {
"bucket_name": "my-bucket",
"env": "PROD",
"platform": "s3",
}
assert container_properties[1].metadata.aspect.customProperties == {
"env": "PROD",
"folder_abs_path": "my-bucket/my-dir",
"platform": "s3",
}
assert container_properties[2].metadata.aspect.customProperties == {
"env": "PROD",
"folder_abs_path": "my-bucket/my-dir/my-dir2",
"platform": "s3",
}

0 comments on commit 71d1cdb

Please sign in to comment.