Merge branch main into colin/explode
Colin Ho authored and Colin Ho committed Oct 23, 2024
2 parents f7777cb + 0727dc1 commit 945a0c7
Showing 45 changed files with 1,441 additions and 637 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/python-package.yml
@@ -582,6 +582,12 @@ jobs:
run: |
uv pip install -r requirements-dev.txt dist/${{ env.package-name }}-*x86_64*.whl --force-reinstall
rm -rf daft
- name: Install ODBC Driver 18 for SQL Server
run: |
curl https://packages.microsoft.com/keys/microsoft.asc | sudo apt-key add -
sudo add-apt-repository https://packages.microsoft.com/ubuntu/$(lsb_release -rs)/prod
sudo apt-get update
sudo ACCEPT_EULA=Y apt-get install -y msodbcsql18
- name: Spin up services
run: |
pushd ./tests/integration/sql/docker-compose/
4 changes: 2 additions & 2 deletions daft/dataframe/dataframe.py
@@ -1659,8 +1659,8 @@ def repartition(self, num: Optional[int], *partition_by: ColumnInputType) -> "Da
def into_partitions(self, num: int) -> "DataFrame":
"""Splits or coalesces DataFrame to ``num`` partitions. Order is preserved.
No rebalancing is done; the minimum number of splits or merges are applied.
(i.e. if there are 2 partitions, and change it into 3, this function will just split the bigger one)
This will naively greedily split partitions in a round-robin fashion to hit the targeted number of partitions.
The number of rows/size in a given partition is not taken into account during the splitting.
Example:
>>> import daft
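To make the new wording concrete, here is a minimal, self-contained Python sketch (not Daft's actual code) of the round-robin assignment the docstring describes: output partitions are assigned to input partitions purely by index, so row counts play no role in deciding which partition gets split.

from __future__ import annotations

# Illustrative sketch only: assign a target partition count to input partitions
# round-robin, ignoring how many rows each input partition holds.
def naive_split_counts(num_inputs: int, num_outputs: int) -> list[int]:
    base, extra = divmod(num_outputs, num_inputs)
    # The first `extra` inputs are split into one additional piece each.
    return [base + 1 if i < extra else base for i in range(num_inputs)]

# Two input partitions -> three output partitions: the first input is split in
# two, even if the second input happens to hold far more rows.
assert naive_split_counts(2, 3) == [2, 1]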
19 changes: 19 additions & 0 deletions daft/delta_lake/delta_lake_scan.py
@@ -17,6 +17,7 @@
ScanTask,
StorageConfig,
)
from daft.io.aws_config import boto3_client_from_s3_config
from daft.io.object_store_options import io_config_to_storage_options
from daft.io.scan import PartitionField, ScanOperator
from daft.logical.schema import Schema
@@ -43,6 +44,24 @@ def __init__(
deltalake_sdk_io_config = storage_config.config.io_config
scheme = urlparse(table_uri).scheme
if scheme == "s3" or scheme == "s3a":
# Try to get region from boto3
if deltalake_sdk_io_config.s3.region_name is None:
from botocore.exceptions import BotoCoreError

try:
client = boto3_client_from_s3_config("s3", deltalake_sdk_io_config.s3)
response = client.get_bucket_location(Bucket=urlparse(table_uri).netloc)
except BotoCoreError as e:
logger.warning(
"Failed to get the S3 bucket region using existing storage config, will attempt to get it from the environment instead. Error from boto3: %s",
e,
)
else:
deltalake_sdk_io_config = deltalake_sdk_io_config.replace(
s3=deltalake_sdk_io_config.s3.replace(region_name=response["LocationConstraint"])
)

# Try to get config from the environment
if any([deltalake_sdk_io_config.s3.key_id is None, deltalake_sdk_io_config.s3.region_name is None]):
try:
s3_config_from_env = S3Config.from_env()
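As a standalone illustration of the region lookup added above, the sketch below (hypothetical helper name; assumes boto3 is installed and AWS credentials are discoverable from the environment) shows the underlying boto3 call and its failure mode.

from typing import Optional

import boto3
from botocore.exceptions import BotoCoreError

def lookup_bucket_region(bucket: str) -> Optional[str]:
    # Mirrors the call used above: ask S3 where the bucket lives.
    try:
        response = boto3.client("s3").get_bucket_location(Bucket=bucket)
    except BotoCoreError:
        # Fall back to region discovery from the environment, as the scan does.
        return None
    # Note: S3 reports buckets in us-east-1 with a LocationConstraint of None.
    return response["LocationConstraint"]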
30 changes: 30 additions & 0 deletions daft/execution/execution_step.py
@@ -1044,3 +1044,33 @@ def run_partial_metadata(self, input_metadatas: list[PartialPartitionMetadata])
)

return results


@dataclass(frozen=True)
class FanoutEvenSlices(FanoutInstruction):
def run(self, inputs: list[MicroPartition]) -> list[MicroPartition]:
[input] = inputs
results = []

input_length = len(input)
num_outputs = self.num_outputs()

chunk_size, remainder = divmod(input_length, num_outputs)
ptr = 0
for output_idx in range(self.num_outputs()):
end = ptr + chunk_size + 1 if output_idx < remainder else ptr + chunk_size
results.append(input.slice(ptr, end))
ptr = end
assert ptr == input_length

return results

def run_partial_metadata(self, input_metadatas: list[PartialPartitionMetadata]) -> list[PartialPartitionMetadata]:
# TODO: Derive this based on the ratios of num rows
return [
PartialPartitionMetadata(
num_rows=None,
size_bytes=None,
)
for _ in range(self._num_outputs)
]
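The boundary arithmetic in FanoutEvenSlices.run can be checked in isolation; a small sketch (plain Python, no Daft types) of the same divmod logic:

from __future__ import annotations

# Same chunk_size/remainder arithmetic as above, applied to plain row counts:
# the first `remainder` outputs each receive one extra row.
def even_slice_bounds(num_rows: int, num_outputs: int) -> list[tuple[int, int]]:
    chunk_size, remainder = divmod(num_rows, num_outputs)
    bounds, start = [], 0
    for i in range(num_outputs):
        end = start + chunk_size + (1 if i < remainder else 0)
        bounds.append((start, end))
        start = end
    return bounds

# 10 rows fanned out to 3 outputs -> slices [0, 4), [4, 7), [7, 10).
assert even_slice_bounds(10, 3) == [(0, 4), (4, 7), (7, 10)]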
73 changes: 17 additions & 56 deletions daft/execution/physical_plan.py
@@ -250,10 +250,6 @@ def actor_pool_project(
actor_pool_name = f"{stateful_udf_names}-stage={stage_id}"

# Keep track of materializations of the children tasks
#
# Our goal here is to saturate the actors, and so we need a sufficient number of completed child tasks to do so. However
# we do not want too many child tasks to be running (potentially starving our actors) and hence place an upper bound of `num_actors * 2`
child_materializations_buffer_len = num_actors * 2
child_materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque()

# Keep track of materializations of the actor_pool tasks
@@ -313,8 +309,8 @@ def actor_pool_project(
if len(child_materializations) > 0 or len(actor_pool_materializations) > 0:
yield None

# If there is capacity in the pipeline, attempt to schedule child work
elif len(child_materializations) < child_materializations_buffer_len:
# Attempt to schedule child work
else:
try:
child_step = next(child_plan)
except StopIteration:
@@ -326,10 +322,6 @@
child_materializations.append(child_step)
yield child_step

# Otherwise, indicate that we need to wait for work to complete
else:
yield None


def monotonically_increasing_id(
child_plan: InProgressPhysicalPlan[PartitionT], column_name: str
@@ -1351,61 +1343,30 @@ def split(
num_input_partitions: int,
num_output_partitions: int,
) -> InProgressPhysicalPlan[PartitionT]:
"""Repartition the child_plan into more partitions by splitting partitions only. Preserves order."""
"""Repartition the child_plan into more partitions by splitting partitions only. Preserves order.
This performs a naive split, which might lead to data skews but does not require a full materialization of
input partitions when performing the split.
"""
assert (
num_output_partitions >= num_input_partitions
), f"Cannot split from {num_input_partitions} to {num_output_partitions}."

# Materialize the input partitions so we can see the number of rows and try to split evenly.
# Splitting evenly is fairly important if this operation is to be used for parallelism.
# (optimization TODO: don't materialize if num_rows is already available in physical plan metadata.)
materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque()
stage_id = next(stage_id_counter)
base_splits_per_partition, num_partitions_with_extra_output = divmod(num_output_partitions, num_input_partitions)

input_partition_idx = 0
for step in child_plan:
if isinstance(step, PartitionTaskBuilder):
step = step.finalize_partition_task_single_output(stage_id=stage_id)
materializations.append(step)
yield step

while any(not _.done() for _ in materializations):
logger.debug("split_to blocked on completion of all sources: %s", materializations)
yield None

splits_per_partition = deque([1 for _ in materializations])
num_splits_to_apply = num_output_partitions - num_input_partitions

# Split by rows for now.
# In the future, maybe parameterize to allow alternatively splitting by size.
rows_by_partitions = [task.partition_metadata().num_rows for task in materializations]

# Calculate how to spread the required splits across all the partitions.
# Iteratively apply a split and update how many rows would be in the resulting partitions.
# After this loop, splits_per_partition has the final number of splits to apply to each partition.
rows_after_splitting = [float(_) for _ in rows_by_partitions]
for _ in range(num_splits_to_apply):
_, split_at = max((rows, index) for (index, rows) in enumerate(rows_after_splitting))
splits_per_partition[split_at] += 1
rows_after_splitting[split_at] = float(rows_by_partitions[split_at] / splits_per_partition[split_at])

# Emit the split partitions.
for task, num_out, num_rows in zip(consume_deque(materializations), splits_per_partition, rows_by_partitions):
if num_out == 1:
yield PartitionTaskBuilder[PartitionT](
inputs=[task.partition()],
partial_metadatas=[task.partition_metadata()],
resource_request=ResourceRequest(memory_bytes=task.partition_metadata().size_bytes),
num_out = (
base_splits_per_partition + 1
if input_partition_idx < num_partitions_with_extra_output
else base_splits_per_partition
)
step = step.add_instruction(instruction=execution_step.FanoutEvenSlices(_num_outputs=num_out))
input_partition_idx += 1
yield step
else:
boundaries = [math.ceil(num_rows * i / num_out) for i in range(num_out + 1)]
starts, ends = boundaries[:-1], boundaries[1:]
yield PartitionTaskBuilder[PartitionT](
inputs=[task.partition()],
partial_metadatas=[task.partition_metadata()],
resource_request=ResourceRequest(memory_bytes=task.partition_metadata().size_bytes),
).add_instruction(
instruction=execution_step.FanoutSlices(_num_outputs=num_out, slices=list(zip(starts, ends)))
)
yield step


def coalesce(
17 changes: 0 additions & 17 deletions daft/io/__init__.py
@@ -1,7 +1,5 @@
from __future__ import annotations

import sys

from daft.daft import (
AzureConfig,
GCSConfig,
@@ -21,21 +19,6 @@
from daft.io.catalog import DataCatalogTable, DataCatalogType
from daft.io.file_path import from_glob_path


def _set_linux_cert_paths():
import os
import ssl

paths = ssl.get_default_verify_paths()
if paths.cafile:
os.environ[paths.openssl_cafile_env] = paths.openssl_cafile
if paths.capath:
os.environ[paths.openssl_capath_env] = paths.openssl_capath


if sys.platform == "linux":
_set_linux_cert_paths()

__all__ = [
"read_csv",
"read_json",
21 changes: 21 additions & 0 deletions daft/io/aws_config.py
@@ -0,0 +1,21 @@
from typing import TYPE_CHECKING

from daft.daft import S3Config

if TYPE_CHECKING:
import boto3


def boto3_client_from_s3_config(service: str, s3_config: S3Config) -> "boto3.client":
import boto3

return boto3.client(
service,
region_name=s3_config.region_name,
use_ssl=s3_config.use_ssl,
verify=s3_config.verify_ssl,
endpoint_url=s3_config.endpoint_url,
aws_access_key_id=s3_config.key_id,
aws_secret_access_key=s3_config.access_key,
aws_session_token=s3_config.session_token,
)
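A hypothetical usage sketch for the new helper (the S3Config keyword arguments shown are placeholders; whatever is set on the config is forwarded to boto3):

from daft.daft import S3Config
from daft.io.aws_config import boto3_client_from_s3_config

# Placeholder credentials and region, purely for illustration.
s3_config = S3Config(region_name="us-west-2", key_id="AKIA...", access_key="...")

# The same config can back clients for different AWS services, e.g. S3 or Glue.
s3 = boto3_client_from_s3_config("s3", s3_config)
glue = boto3_client_from_s3_config("glue", s3_config)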
15 changes: 2 additions & 13 deletions daft/io/catalog.py
@@ -5,6 +5,7 @@
from typing import Optional

from daft.daft import IOConfig
from daft.io.aws_config import boto3_client_from_s3_config


class DataCatalogType(Enum):
@@ -42,20 +43,8 @@ def table_uri(self, io_config: IOConfig) -> str:
"""
if self.catalog == DataCatalogType.GLUE:
# Use boto3 to get the table from AWS Glue Data Catalog.
import boto3
glue = boto3_client_from_s3_config("glue", io_config.s3)

s3_config = io_config.s3

glue = boto3.client(
"glue",
region_name=s3_config.region_name,
use_ssl=s3_config.use_ssl,
verify=s3_config.verify_ssl,
endpoint_url=s3_config.endpoint_url,
aws_access_key_id=s3_config.key_id,
aws_secret_access_key=s3_config.access_key,
aws_session_token=s3_config.session_token,
)
if self.catalog_id is not None:
# Allow cross account access, table.catalog_id should be the target account id
glue_table = glue.get_table(
8 changes: 6 additions & 2 deletions daft/sql/sql_scan.py
@@ -161,14 +161,18 @@ def _get_num_rows(self) -> int:

def _attempt_partition_bounds_read(self, num_scan_tasks: int) -> tuple[Any, PartitionBoundStrategy]:
try:
# Try to get percentiles using percentile_cont
# Try to get percentiles using percentile_disc.
# Favor percentile_disc over percentile_cont because we want exact values to do <= and >= comparisons.
percentiles = [i / num_scan_tasks for i in range(num_scan_tasks + 1)]
# Use the OVER clause for SQL Server
over_clause = "OVER ()" if self.conn.dialect in ["mssql", "tsql"] else ""
percentile_sql = self.conn.construct_sql_query(
self.sql,
projection=[
f"percentile_disc({percentile}) WITHIN GROUP (ORDER BY {self._partition_col}) AS bound_{i}"
f"percentile_disc({percentile}) WITHIN GROUP (ORDER BY {self._partition_col}) {over_clause} AS bound_{i}"
for i, percentile in enumerate(percentiles)
],
limit=1,
)
pa_table = self.conn.execute_sql_query(percentile_sql)
return pa_table, PartitionBoundStrategy.PERCENTILE
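For reference, a sketch of the projection the loop above builds (column name and task count are hypothetical; the final SQL is assembled by construct_sql_query):

# Shape of the percentile projection for num_scan_tasks = 2 against SQL Server.
num_scan_tasks = 2
partition_col = "id"  # hypothetical partitioning column
over_clause = "OVER ()"  # empty string for dialects other than mssql/tsql
percentiles = [i / num_scan_tasks for i in range(num_scan_tasks + 1)]  # [0.0, 0.5, 1.0]
projection = [
    f"percentile_disc({p}) WITHIN GROUP (ORDER BY {partition_col}) {over_clause} AS bound_{i}"
    for i, p in enumerate(percentiles)
]
# -> ["percentile_disc(0.0) WITHIN GROUP (ORDER BY id) OVER () AS bound_0", ...]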
1 change: 1 addition & 0 deletions requirements-dev.txt
@@ -66,6 +66,7 @@ trino[sqlalchemy]==0.328.0; python_version >= '3.8'
PyMySQL==1.1.0; python_version >= '3.8'
psycopg2-binary==2.9.9; python_version >= '3.8'
sqlglot==23.3.0; python_version >= '3.8'
pyodbc==5.1.0; python_version >= '3.8'

# AWS
s3fs==2023.12.0; python_version >= '3.8'
5 changes: 1 addition & 4 deletions src/arrow2/src/array/growable/primitive.rs
@@ -1,10 +1,7 @@
use std::sync::Arc;

use crate::{
array::{Array, PrimitiveArray},
bitmap::MutableBitmap,
datatypes::DataType,
types::NativeType,
array::{Array, PrimitiveArray}, bitmap::MutableBitmap, datatypes::DataType, types::NativeType
};

use super::{
2 changes: 2 additions & 0 deletions src/daft-core/src/prelude.rs
@@ -2,6 +2,8 @@
//!
//! This module re-exports commonly used items from the Daft core library.
// Re-export arrow2 bitmap
pub use arrow2::bitmap;
// Re-export core series structures
pub use daft_schema::schema::{Schema, SchemaRef};

16 changes: 2 additions & 14 deletions src/daft-dsl/src/expr/mod.rs
@@ -990,21 +990,9 @@ impl Expr {
to_sql_inner(inner, buffer)?;
write!(buffer, ") IS NOT NULL")
}
Expr::IfElse {
if_true,
if_false,
predicate,
} => {
write!(buffer, "CASE WHEN ")?;
to_sql_inner(predicate, buffer)?;
write!(buffer, " THEN ")?;
to_sql_inner(if_true, buffer)?;
write!(buffer, " ELSE ")?;
to_sql_inner(if_false, buffer)?;
write!(buffer, " END")
}
// TODO: Implement SQL translations for these expressions if possible
Expr::Agg(..)
Expr::IfElse { .. }
| Expr::Agg(..)
| Expr::Cast(..)
| Expr::IsIn(..)
| Expr::Between(..)
(Diffs for the remaining changed files are not shown.)

0 comments on commit 945a0c7