Cleaning up rehashing logic as it is dead code as of now. #166

Merged: 2 commits, Aug 1, 2023
Changes from all commits
57 changes: 0 additions & 57 deletions deltacat/compute/compactor/steps/rehash/rehash_bucket.py

This file was deleted.

48 changes: 0 additions & 48 deletions deltacat/compute/compactor/steps/rehash/rewrite_index.py

This file was deleted.

223 changes: 1 addition & 222 deletions deltacat/compute/compactor/utils/primary_key_index.py
@@ -1,180 +1,21 @@
import json
import logging
from collections import defaultdict
from typing import Any, Callable, Dict, List, Optional, Tuple
from typing import List, Optional, Tuple

import numpy as np
import pyarrow as pa
import ray
import s3fs
from ray.types import ObjectRef

from deltacat import logs
from deltacat.aws import s3u
from deltacat.compute.compactor import (
PrimaryKeyIndexLocator,
PrimaryKeyIndexMeta,
PrimaryKeyIndexVersionLocator,
PrimaryKeyIndexVersionMeta,
PyArrowWriteResult,
RoundCompletionInfo,
)
from deltacat.compute.compactor.steps.rehash import rehash_bucket as rb
from deltacat.compute.compactor.steps.rehash import rewrite_index as ri
from deltacat.compute.compactor.utils import round_completion_file as rcf
from deltacat.compute.compactor.utils import system_columns as sc
from deltacat.constants import PRIMARY_KEY_INDEX_WRITE_BOTO3_CONFIG
from deltacat.storage import Manifest, PartitionLocator
from deltacat.types.media import ContentEncoding, ContentType
from deltacat.types.tables import get_table_slicer, get_table_writer
from deltacat.utils.common import ReadKwargsProvider
from deltacat.utils.ray_utils.concurrency import invoke_parallel
from deltacat.io.object_store import IObjectStore

logger = logs.configure_deltacat_logger(logging.getLogger(__name__))


def rehash(
options_provider: Callable[[int, Any], Dict[str, Any]],
s3_bucket: str,
source_partition_locator: PartitionLocator,
old_rci: RoundCompletionInfo,
new_hash_bucket_count: int,
hash_bucket_index_group_count: int,
records_per_primary_key_index_file: int,
delete_old_primary_key_index: bool,
) -> RoundCompletionInfo:

logger.info(
f"Rehashing primary key index. Old round completion info: "
f"{old_rci}. New hash bucket count: {new_hash_bucket_count}"
)

# collect old primary key index information
old_pki_version_locator = old_rci.primary_key_index_version_locator
old_pkiv_meta = old_pki_version_locator.primary_key_index_version_meta
old_pki_meta = old_pkiv_meta.primary_key_index_meta
old_compacted_partition_locator = old_pki_meta.compacted_partition_locator
if old_pkiv_meta.hash_bucket_count == new_hash_bucket_count:
raise ValueError(
f"Primary key index rehash failed. Old hash bucket "
f"count ({new_hash_bucket_count}) is "
f"equal to new hash bucket count. Partition: "
f"{old_compacted_partition_locator}."
)

# generate a new unique primary key index version locator to rehash into
new_pki_meta = PrimaryKeyIndexMeta.of(
old_compacted_partition_locator,
old_pki_meta.primary_keys,
old_pki_meta.sort_keys,
old_pki_meta.primary_key_index_algorithm_version,
)
new_pki_locator = PrimaryKeyIndexLocator.of(new_pki_meta)
new_pki_version_meta = PrimaryKeyIndexVersionMeta.of(
new_pki_meta,
new_hash_bucket_count,
)
rehashed_pki_version_locator = PrimaryKeyIndexVersionLocator.generate(
new_pki_version_meta
)

# launch a rehash task for each bucket of the old primary key index version
old_hash_bucket_count = old_pkiv_meta.hash_bucket_count
hb_tasks_pending = invoke_parallel(
items=range(old_hash_bucket_count),
ray_task=rb.rehash_bucket,
max_parallelism=None,
options_provider=options_provider,
s3_bucket=s3_bucket,
old_pki_version_locator=old_pki_version_locator,
num_buckets=new_hash_bucket_count,
num_groups=hash_bucket_index_group_count,
)
logger.info(f"Getting {len(hb_tasks_pending)} rehash bucket results...")
hb_results = ray.get([t[0] for t in hb_tasks_pending])
logger.info(f"Got {len(hb_results)} rehash bucket results.")
all_hash_group_idx_to_obj_id = defaultdict(list)
for hash_group_idx_to_obj_id in hb_results:
for hash_group_index, object_id in enumerate(hash_group_idx_to_obj_id):
if object_id:
all_hash_group_idx_to_obj_id[hash_group_index].append(object_id)
hash_group_count = len(all_hash_group_idx_to_obj_id)
logger.info(f"Rehash bucket groups created: {hash_group_count}")

# write primary key index files for each rehashed output bucket
pki_stats_promises = invoke_parallel(
items=all_hash_group_idx_to_obj_id.values(),
ray_task=ri.rewrite_index,
max_parallelism=None,
options_provider=options_provider,
s3_bucket=s3_bucket,
new_primary_key_index_version_locator=rehashed_pki_version_locator,
max_records_per_index_file=records_per_primary_key_index_file,
)
logger.info(f"Getting {len(pki_stats_promises)} rewrite index results...")
pki_stats = ray.get([t[0] for t in pki_stats_promises])
logger.info(f"Got {len(pki_stats)} rewrite index results.")

round_completion_info = RoundCompletionInfo.of(
old_rci.high_watermark,
old_rci.compacted_delta_locator,
old_rci.compacted_pyarrow_write_result,
PyArrowWriteResult.union(pki_stats),
old_rci.sort_keys_bit_width,
rehashed_pki_version_locator,
old_rci.rebase_source_partition_locator,
)
rcf.write_round_completion_file(
s3_bucket,
source_partition_locator,
new_pki_locator.primary_key_index_root_path,
round_completion_info,
)
if delete_old_primary_key_index:
delete_primary_key_index_version(
s3_bucket,
old_pki_version_locator,
)
logger.info(
f"Rehashed primary key index. New round completion info: "
f"{round_completion_info}."
)
return round_completion_info


def download_hash_bucket_entries(
s3_bucket: str,
hash_bucket_index: int,
primary_key_index_version_locator: PrimaryKeyIndexVersionLocator,
file_reader_kwargs_provider: Optional[ReadKwargsProvider] = None,
) -> List[pa.Table]:

pk_index_manifest_s3_url = (
primary_key_index_version_locator.get_pkiv_hb_index_manifest_s3_url(
s3_bucket,
hash_bucket_index,
)
)
result = s3u.download(pk_index_manifest_s3_url, False)
logger.info(
f"Downloading primary key index hash bucket manifest entries: "
f"{pk_index_manifest_s3_url}. Primary key index version "
f"locator: {primary_key_index_version_locator}"
)
pk_index_manifest = Manifest(json.loads(result["Body"].read().decode("utf-8")))
tables = s3u.download_manifest_entries(
pk_index_manifest, file_reader_kwargs_provider=file_reader_kwargs_provider
)
if not tables:
logger.warning(
f"Primary key index manifest is empty at: "
f"{pk_index_manifest_s3_url}. Primary key index version "
f"locator: {primary_key_index_version_locator}"
)
return tables


def delete_primary_key_index_version(
s3_bucket: str, pki_version_locator: PrimaryKeyIndexVersionLocator
) -> None:
@@ -243,65 +84,3 @@ def pk_digest_to_hash_bucket_index(digest, num_buckets: int) -> int:
"""

return int.from_bytes(digest, "big") % num_buckets
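# (Illustrative usage, not part of this PR's diff.) The retained helper above
# reads the digest bytes as a big-endian integer and folds the result into a
# bucket via modulo. A minimal, hypothetical exercise of that arithmetic; the
# sha1 digest choice is an assumption for the example, not something deltacat
# prescribes:
import hashlib

_digest = hashlib.sha1(b"customer-123").digest()
_bucket = int.from_bytes(_digest, "big") % 8  # same math as pk_digest_to_hash_bucket_index
assert 0 <= _bucket < 8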


def write_primary_key_index_files(
table: pa.Table,
primary_key_index_version_locator: PrimaryKeyIndexVersionLocator,
s3_bucket: str,
hb_index: int,
records_per_index_file: int,
) -> PyArrowWriteResult:
"""
Writes primary key index files for the given hash bucket index out to the
specified S3 bucket at the path identified by the given primary key index
version locator. Output is written as 1 or more Parquet files with the
given maximum number of records per file.

TODO(raghumdani): Support writing primary key index to any data catalog
"""
logger.info(
f"Writing primary key index files for hash bucket {hb_index}. "
f"Primary key index version locator: "
f"{primary_key_index_version_locator}."
)
s3_file_system = s3fs.S3FileSystem(
anon=False,
s3_additional_kwargs={
"ContentType": ContentType.PARQUET.value,
"ContentEncoding": ContentEncoding.IDENTITY.value,
},
config_kwargs=PRIMARY_KEY_INDEX_WRITE_BOTO3_CONFIG,
)
pkiv_hb_index_s3_url_base = (
primary_key_index_version_locator.get_pkiv_hb_index_s3_url_base(
s3_bucket, hb_index
)
)
manifest_entries = s3u.upload_sliced_table(
table,
pkiv_hb_index_s3_url_base,
s3_file_system,
records_per_index_file,
get_table_writer(table),
get_table_slicer(table),
)
manifest = Manifest.of(manifest_entries)
pkiv_hb_index_s3_manifest_s3_url = (
primary_key_index_version_locator.get_pkiv_hb_index_manifest_s3_url(
s3_bucket, hb_index
)
)
s3u.upload(pkiv_hb_index_s3_manifest_s3_url, str(json.dumps(manifest)))
result = PyArrowWriteResult.of(
len(manifest_entries),
table.nbytes,
manifest.meta.content_length,
len(table),
)
logger.info(
f"Wrote primary key index files for hash bucket {hb_index}. "
f"Primary key index version locator: "
f"{primary_key_index_version_locator}. Result: {result}"
)
return result
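
The deleted write_primary_key_index_files wrote each hash bucket's index as one or more Parquet files, each capped at records_per_index_file rows, by handing the table to s3u.upload_sliced_table. A rough local sketch of that sliced-write pattern follows; it uses pyarrow.parquet and filesystem paths directly rather than deltacat's S3 helpers, so the function name and paths below are assumptions for illustration only.

import pyarrow as pa
import pyarrow.parquet as pq


def write_sliced_parquet(table: pa.Table, out_dir: str, records_per_file: int) -> list:
    # Split the table into zero-copy slices of at most records_per_file rows
    # and persist each slice as its own Parquet file.
    paths = []
    for i, offset in enumerate(range(0, table.num_rows, records_per_file)):
        chunk = table.slice(offset, records_per_file)
        path = f"{out_dir}/index_{i}.parquet"
        pq.write_table(chunk, path)
        paths.append(path)
    return paths


# Hypothetical usage: three rows with a cap of two rows per file yields two files.
# write_sliced_parquet(pa.table({"pk": ["a", "b", "c"]}), "/tmp/pki", records_per_file=2)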
4 changes: 0 additions & 4 deletions deltacat/constants.py
@@ -49,8 +49,4 @@
# Inflation multiplier from snappy-compressed parquet to pyarrow for all columns.
PYARROW_INFLATION_MULTIPLIER_ALL_COLUMNS = 6

PRIMARY_KEY_INDEX_WRITE_BOTO3_CONFIG = {
"retries": {"max_attempts": 25, "mode": "standard"}
}

MEMORY_TO_HASH_BUCKET_COUNT_RATIO = 0.0512 * BYTES_PER_TEBIBYTE
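
Of the constants that remain, PYARROW_INFLATION_MULTIPLIER_ALL_COLUMNS estimates how much a snappy-compressed Parquet file expands once decoded into PyArrow memory. A back-of-the-envelope illustration of applying such a multiplier; the sizing helper below is an assumption for this example, not a deltacat API:

PYARROW_INFLATION_MULTIPLIER_ALL_COLUMNS = 6  # value kept in deltacat/constants.py


def estimated_pyarrow_bytes(parquet_file_bytes: int) -> int:
    # e.g. a 256 MiB snappy-compressed Parquet file is budgeted at ~1.5 GiB in memory.
    return parquet_file_bytes * PYARROW_INFLATION_MULTIPLIER_ALL_COLUMNS


assert estimated_pyarrow_bytes(256 * 1024**2) == 1536 * 1024**2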