Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support null as a valid primary key value #352

Merged
merged 1 commit into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion deltacat/aws/constants.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import botocore
from typing import Set
from daft.exceptions import DaftTransientError

from deltacat.utils.common import env_integer, env_string


Expand Down
13 changes: 9 additions & 4 deletions deltacat/compute/compactor_v2/utils/primary_key_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@ def _append_sha1_hash_to_table(table: pa.Table, hash_column: pa.Array) -> pa.Tab

result = []
for hash_value in hash_column_np:
assert hash_value is not None, f"Expected non-null primary key"
result.append(hashlib.sha1(hash_value.encode("utf-8")).hexdigest())
if hash_value is None:
result.append(None)
logger.info("A primary key hash is null")
else:
result.append(hashlib.sha1(hash_value.encode("utf-8")).hexdigest())

return sc.append_pk_hash_string_column(table, result)

Expand Down Expand Up @@ -191,7 +194,7 @@ def _generate_pk_hash(table: pa.Table) -> pa.Array:
pk_columns.append(sliced_string_cast(table[pk_name]))

pk_columns.append(PK_DELIMITER)
hash_column = pc.binary_join_element_wise(*pk_columns)
hash_column = pc.binary_join_element_wise(*pk_columns, null_handling="replace")
return hash_column

def _generate_uuid(table: pa.Table) -> pa.Array:
Expand Down Expand Up @@ -345,8 +348,10 @@ def hash_group_index_to_hash_bucket_indices(
return range(hb_group, num_buckets, num_groups)


def pk_digest_to_hash_bucket_index(digest: str, num_buckets: int) -> int:
def pk_digest_to_hash_bucket_index(digest: Optional[str], num_buckets: int) -> int:
    """
    Map a primary-key digest to a hash bucket index in [0, num_buckets).

    A null digest (produced when the primary key itself is null) is always
    routed to bucket 0; any other digest is parsed as a hexadecimal integer
    and reduced modulo ``num_buckets``.
    """
    # Null primary keys deterministically land in bucket 0 so that all
    # null-keyed records are grouped together.
    bucket = 0 if digest is None else int(digest, 16) % num_buckets
    return bucket
Original file line number Diff line number Diff line change
Expand Up @@ -848,6 +848,83 @@ def __iter__(self):
assert_compaction_audit=None,
num_rounds=3,
),
# 4 input deltas (3 upsert, 1 delete delta), 2 rounds requested
# Expect to see a table that aggregates 10 records total
# (12 upserts - 2 deletes (null PK) = 10 records)
# (dropDuplicates = False)
"9-multiple-rounds-delete-deltas-with-null-pk": MultipleRoundsTestCaseParams(
primary_keys={"pk_col_1"},
sort_keys=ZERO_VALUED_SORT_KEY,
partition_keys=[PartitionKey.of("region_id", PartitionKeyType.INT)],
partition_values=["1"],
input_deltas=[
(
pa.Table.from_arrays(
[
pa.array([None, 11, 12, 13]),
pa.array(["a", "b", "c", "d"]),
],
names=["pk_col_1", "col_1"],
),
DeltaType.UPSERT,
None,
),
(
pa.Table.from_arrays(
[
pa.array([14, 15, 16, 17]),
pa.array(["e", "f", "g", "h"]),
],
names=["pk_col_1", "col_1"],
),
DeltaType.UPSERT,
None,
),
(
pa.Table.from_arrays(
[
pa.array([18, 19, 20, 21]),
pa.array(["i", "j", "k", "l"]),
],
names=["pk_col_1", "col_1"],
),
DeltaType.UPSERT,
None,
),
(
pa.Table.from_arrays(
[pa.array([None, 11]), pa.array(["a", "b"])],
names=["pk_col_1", "col_1"],
),
DeltaType.DELETE,
DeleteParameters.of(["pk_col_1", "col_1"]),
),
],
rebase_expected_compact_partition_result=pa.Table.from_arrays(
[
pa.array([i for i in range(12, 22)]),
pa.array(["c", "d", "e", "f", "g", "h", "i", "j", "k", "l"]),
],
names=["pk_col_1", "col_1"],
),
expected_terminal_compact_partition_result=pa.Table.from_arrays(
[
pa.array([i for i in range(12, 22)]),
pa.array(["c", "d", "e", "f", "g", "h", "i", "j", "k", "l"]),
],
names=["pk_col_1", "col_1"],
),
expected_terminal_exception=None,
expected_terminal_exception_message=None,
do_create_placement_group=False,
records_per_compacted_file=DEFAULT_MAX_RECORDS_PER_FILE,
hash_bucket_count=DEFAULT_HASH_BUCKET_COUNT,
read_kwargs_provider=None,
drop_duplicates=False,
skip_enabled_compact_partition_drivers=[CompactorVersion.V1],
assert_compaction_audit=None,
num_rounds=2,
),
}

MULTIPLE_ROUNDS_TEST_CASES = with_compactor_version_func_test_param(
Expand Down
Loading
Loading