Skip to content

Commit

Permalink
Remove as visitors import (apache#567)
Browse files Browse the repository at this point in the history
  • Loading branch information
Fokko authored Apr 1, 2024
1 parent 7fcdb8d commit 1cdd63f
Showing 1 changed file with 11 additions and 10 deletions.
21 changes: 11 additions & 10 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
from typing_extensions import Annotated

import pyiceberg.expressions.parser as parser
import pyiceberg.expressions.visitors as visitors
from pyiceberg.exceptions import CommitFailedException, ResolveError, ValidationError
from pyiceberg.expressions import (
AlwaysTrue,
Expand All @@ -56,6 +55,12 @@
EqualTo,
Reference,
)
from pyiceberg.expressions.visitors import (
_InclusiveMetricsEvaluator,
expression_evaluator,
inclusive_projection,
manifest_evaluator,
)
from pyiceberg.io import FileIO, load_file_io
from pyiceberg.manifest import (
POSITIONAL_DELETE_SCHEMA,
Expand Down Expand Up @@ -1445,9 +1450,7 @@ def _match_deletes_to_data_file(data_entry: ManifestEntry, positional_delete_ent
relevant_entries = positional_delete_entries[positional_delete_entries.bisect_right(data_entry) :]

if len(relevant_entries) > 0:
evaluator = visitors._InclusiveMetricsEvaluator(
POSITIONAL_DELETE_SCHEMA, EqualTo("file_path", data_entry.data_file.file_path)
)
evaluator = _InclusiveMetricsEvaluator(POSITIONAL_DELETE_SCHEMA, EqualTo("file_path", data_entry.data_file.file_path))
return {
positional_delete_entry.data_file
for positional_delete_entry in relevant_entries
Expand All @@ -1471,7 +1474,7 @@ def __init__(
super().__init__(table, row_filter, selected_fields, case_sensitive, snapshot_id, options, limit)

def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
project = visitors.inclusive_projection(self.table.schema(), self.table.specs()[spec_id])
project = inclusive_projection(self.table.schema(), self.table.specs()[spec_id])
return project(self.row_filter)

@cached_property
Expand All @@ -1480,7 +1483,7 @@ def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:

def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]:
spec = self.table.specs()[spec_id]
return visitors.manifest_evaluator(spec, self.table.schema(), self.partition_filters[spec_id], self.case_sensitive)
return manifest_evaluator(spec, self.table.schema(), self.partition_filters[spec_id], self.case_sensitive)

def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]:
spec = self.table.specs()[spec_id]
Expand All @@ -1491,9 +1494,7 @@ def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]
# The lambda created here is run in multiple threads.
# So we avoid creating _EvaluatorExpression methods bound to a single
# shared instance across multiple threads.
return lambda data_file: visitors.expression_evaluator(partition_schema, partition_expr, self.case_sensitive)(
data_file.partition
)
return lambda data_file: expression_evaluator(partition_schema, partition_expr, self.case_sensitive)(data_file.partition)

def _check_sequence_number(self, min_data_sequence_number: int, manifest: ManifestFile) -> bool:
"""Ensure that no manifests are loaded that contain deletes that are older than the data.
Expand Down Expand Up @@ -1538,7 +1539,7 @@ def plan_files(self) -> Iterable[FileScanTask]:
# this filter depends on the partition spec used to write the manifest file

partition_evaluators: Dict[int, Callable[[DataFile], bool]] = KeyDefaultDict(self._build_partition_evaluator)
metrics_evaluator = visitors._InclusiveMetricsEvaluator(
metrics_evaluator = _InclusiveMetricsEvaluator(
self.table.schema(), self.row_filter, self.case_sensitive, self.options.get("include_empty_files") == "true"
).eval

Expand Down

0 comments on commit 1cdd63f

Please sign in to comment.