Skip to content

Commit

Permalink
[apache#5729] feat(client-python): Add distribution expression in Pyt…
Browse files Browse the repository at this point in the history
…hon client (apache#5833)

### What changes were proposed in this pull request?
Implement distributions expression in python client, add unit test.

### Why are the changes needed?
We need to support the  distributions expressions in python client

Fix: apache#5729

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Need to pass all unit tests.

---------

Co-authored-by: Xun <[email protected]>
  • Loading branch information
2 people authored and Abyss-lord committed Dec 13, 2024
1 parent 6e8a650 commit c48c7ac
Show file tree
Hide file tree
Showing 10 changed files with 381 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from abc import abstractmethod
from typing import List

from gravitino.api.expressions.distributions.strategy import Strategy
from gravitino.api.expressions.expression import Expression


class Distribution(Expression):
"""
An interface that defines how data is distributed across partitions.
"""

@abstractmethod
def strategy(self) -> Strategy:
"""Return the distribution strategy name."""

@abstractmethod
def number(self) -> int:
"""Return The number of buckets/distribution. For example, if the distribution strategy is HASH
and the number is 10, then the data is distributed across 10 buckets."""

@abstractmethod
def expressions(self) -> List[Expression]:
"""Return The expressions passed to the distribution function."""

def children(self) -> List[Expression]:
"""
Returns the child expressions.
"""
return self.expressions()

def equals(self, other: "Distribution") -> bool:
"""
Indicates whether some other object is "equal to" this one.
Args:
other (Distribution): The reference distribution object with which to compare.
Returns:
bool: True if this object is the same as the other; False otherwise.
"""
if other is None:
return False

return (
self.strategy() == other.strategy()
and self.number() == other.number()
and self.expressions() == other.expressions()
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import List

from gravitino.api.expressions.distributions.strategy import Strategy
from gravitino.api.expressions.distributions.distribution import Distribution
from gravitino.api.expressions.expression import Expression
from gravitino.api.expressions.named_reference import NamedReference


class DistributionImpl(Distribution):
_strategy: Strategy
_number: int
_expressions: List[Expression]

def __init__(self, strategy: Strategy, number: int, expressions: List[Expression]):
self._strategy = strategy
self._number = number
self._expressions = expressions

def strategy(self) -> Strategy:
return self._strategy

def number(self) -> int:
return self._number

def expressions(self) -> List[Expression]:
return self._expressions

def __str__(self) -> str:
return f"DistributionImpl(strategy={self._strategy}, number={self._number}, expressions={self._expressions})"

def __eq__(self, other: object) -> bool:
if not isinstance(other, DistributionImpl):
return False
return (
self._strategy == other.strategy()
and self._number == other.number()
and self._expressions == other.expressions()
)

def __hash__(self) -> int:
return hash((self._strategy, self._number, tuple(self._expressions)))


class Distributions:
NONE: Distribution = DistributionImpl(Strategy.NONE, 0, Expression.EMPTY_EXPRESSION)
"""NONE is used to indicate that there is no distribution."""
HASH: Distribution = DistributionImpl(Strategy.HASH, 0, Expression.EMPTY_EXPRESSION)
"""List bucketing strategy hash, TODO: #1505 Separate the bucket number from the Distribution."""
RANGE: Distribution = DistributionImpl(
Strategy.RANGE, 0, Expression.EMPTY_EXPRESSION
)
"""List bucketing strategy range, TODO: #1505 Separate the bucket number from the Distribution."""

@staticmethod
def even(number: int, *expressions: Expression) -> Distribution:
"""
Create a distribution by evenly distributing the data across the number of buckets.
:param number: The number of buckets.
:param expressions: The expressions to distribute by.
:return: The created even distribution.
"""
return DistributionImpl(Strategy.EVEN, number, list(expressions))

@staticmethod
def hash(number: int, *expressions: Expression) -> Distribution:
"""
Create a distribution by hashing the data across the number of buckets.
:param number: The number of buckets.
:param expressions: The expressions to distribute by.
:return: The created hash distribution.
"""
return DistributionImpl(Strategy.HASH, number, list(expressions))

@staticmethod
def of(strategy: Strategy, number: int, *expressions: Expression) -> Distribution:
"""
Create a distribution by the given strategy.
:param strategy: The strategy to use.
:param number: The number of buckets.
:param expressions: The expressions to distribute by.
:return: The created distribution.
"""
return DistributionImpl(strategy, number, list(expressions))

@staticmethod
def fields(
strategy: Strategy, number: int, *field_names: List[str]
) -> Distribution:
"""
Create a distribution on columns. Like distribute by (a) or (a, b), for complex like
distributing by (func(a), b) or (func(a), func(b)), please use DistributionImpl.Builder to create.
NOTE: a, b, c are column names.
SQL syntax: distribute by hash(a, b) buckets 5
fields(Strategy.HASH, 5, ["a"], ["b"])
SQL syntax: distribute by hash(a, b, c) buckets 10
fields(Strategy.HASH, 10, ["a"], ["b"], ["c"])
SQL syntax: distribute by EVEN(a) buckets 128
fields(Strategy.EVEN, 128, ["a"])
:param strategy: The strategy to use.
:param number: The number of buckets.
:param field_names: The field names to distribute by.
:return: The created distribution.
"""
expressions = [NamedReference.field(name) for name in field_names]
return Distributions.of(strategy, number, *expressions)
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from enum import Enum


class Strategy(Enum):
"""
An enum that defines the distribution strategy.
The following strategies are supported:
- NONE: No distribution strategy, depends on the underlying system's allocation.
- HASH: Uses the hash value of the expression to distribute data.
- RANGE: Uses the specified range of the expression to distribute data.
- EVEN: Distributes data evenly across partitions.
"""

NONE = "NONE"
HASH = "HASH"
RANGE = "RANGE"
EVEN = "EVEN"

@staticmethod
def get_by_name(name: str) -> "Strategy":
upper_name = name.upper()
if upper_name == "NONE":
return Strategy.NONE
elif upper_name == "HASH":
return Strategy.HASH
elif upper_name == "RANGE":
return Strategy.RANGE
elif upper_name in {"EVEN", "RANDOM"}:
return Strategy.EVEN
else:
raise ValueError(
f"Invalid distribution strategy: {name}. Valid values are: {[s.value for s in Strategy]}"
)
10 changes: 5 additions & 5 deletions clients/client-python/gravitino/api/expressions/expression.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

from __future__ import annotations
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, List

if TYPE_CHECKING:
from gravitino.api.expressions.named_reference import NamedReference
Expand All @@ -26,23 +26,23 @@
class Expression(ABC):
"""Base class of the public logical expression API."""

EMPTY_EXPRESSION: list[Expression] = []
EMPTY_EXPRESSION: List[Expression] = []
"""
`EMPTY_EXPRESSION` is only used as an input when the default `children` method builds the result.
"""

EMPTY_NAMED_REFERENCE: list[NamedReference] = []
EMPTY_NAMED_REFERENCE: List[NamedReference] = []
"""
`EMPTY_NAMED_REFERENCE` is only used as an input when the default `references` method builds
the result array to avoid repeatedly allocating an empty array.
"""

@abstractmethod
def children(self) -> list[Expression]:
def children(self) -> List[Expression]:
"""Returns a list of the children of this node. Children should not change."""
pass

def references(self) -> list[NamedReference]:
def references(self) -> List[NamedReference]:
"""Returns a list of fields or columns that are referenced by this expression."""

ref_set: set[NamedReference] = set()
Expand Down
16 changes: 16 additions & 0 deletions clients/client-python/tests/unittests/rel/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
Loading

0 comments on commit c48c7ac

Please sign in to comment.