Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEAT] Streaming physical writes for native executor #2992

Merged
merged 18 commits into from
Oct 31, 2024
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ daft-local-execution = {path = "src/daft-local-execution", default-features = fa
daft-micropartition = {path = "src/daft-micropartition", default-features = false}
daft-minhash = {path = "src/daft-minhash", default-features = false}
daft-parquet = {path = "src/daft-parquet", default-features = false}
daft-physical-plan = {path = "src/daft-physical-plan", default-features = false}
daft-plan = {path = "src/daft-plan", default-features = false}
daft-scan = {path = "src/daft-scan", default-features = false}
daft-scheduler = {path = "src/daft-scheduler", default-features = false}
Expand Down Expand Up @@ -47,6 +48,7 @@ python = [
"daft-json/python",
"daft-micropartition/python",
"daft-parquet/python",
"daft-physical-plan/python",
"daft-plan/python",
"daft-scan/python",
"daft-scheduler/python",
Expand Down
135 changes: 135 additions & 0 deletions daft/io/writer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
import uuid
from typing import Dict, Optional, Union

Check warning on line 2 in daft/io/writer.py

View check run for this annotation

Codecov / codecov/patch

daft/io/writer.py#L1-L2

Added lines #L1 - L2 were not covered by tests

from daft.daft import IOConfig, PyTable
from daft.dependencies import pa, pacsv, pq
from daft.filesystem import (

Check warning on line 6 in daft/io/writer.py

View check run for this annotation

Codecov / codecov/patch

daft/io/writer.py#L4-L6

Added lines #L4 - L6 were not covered by tests
_resolve_paths_and_filesystem,
canonicalize_protocol,
get_protocol_from_path,
)
from daft.series import Series
from daft.table.micropartition import MicroPartition
from daft.table.table import Table

Check warning on line 13 in daft/io/writer.py

View check run for this annotation

Codecov / codecov/patch

daft/io/writer.py#L11-L13

Added lines #L11 - L13 were not covered by tests


def partition_values_to_str_mapping(
    partition_values: Table,
) -> Dict[str, Optional[str]]:
    """Map each partition column name to the string form of its value.

    Assumes `partition_values` holds a single row (one partition) — TODO confirm
    with callers; only row 0 of each column is read.

    Args:
        partition_values: Single-row table of partition key columns.

    Returns:
        Mapping of column name -> stringified value, or None where the
        partition value is null (callers substitute their own fallback,
        e.g. `partition_string_mapping_to_postfix`).
    """
    null_part = Series.from_pylist([None])
    partition_strings: Dict[str, Optional[str]] = {}

    for name in partition_values.column_names():
        column = partition_values.get_column(name)
        string_values = column._to_str_values()
        # Keep nulls as None (not the string "None") so downstream code can
        # distinguish a genuinely-null partition value.
        null_filled = column.is_null().if_else(null_part, string_values)
        partition_strings[name] = null_filled.to_pylist()[0]

    return partition_strings

Check warning on line 30 in daft/io/writer.py

View check run for this annotation

Codecov / codecov/patch

daft/io/writer.py#L30

Added line #L30 was not covered by tests


def partition_string_mapping_to_postfix(
    partition_strings: Dict[str, Optional[str]],
    default_partition_fallback: str,
) -> str:
    """Build a hive-style directory postfix ("k1=v1/k2=v2/...") from partition strings.

    Args:
        partition_strings: Mapping of partition column name -> string value,
            where None marks a null partition value.
        default_partition_fallback: Substitute used for None values
            (e.g. "__HIVE_DEFAULT_PARTITION__").

    Returns:
        The joined postfix; empty string when there are no partition columns.
    """
    postfix = "/".join(
        f"{key}={value if value is not None else default_partition_fallback}"
        for key, value in partition_strings.items()
    )
    return postfix

Check warning on line 40 in daft/io/writer.py

View check run for this annotation

Codecov / codecov/patch

daft/io/writer.py#L40

Added line #L40 was not covered by tests


class FileWriterBase:
    """Base class for streaming single-file writers (Parquet/CSV).

    A writer instance is single-use and non-rotating: it targets exactly one
    file (named from a fresh UUID plus ``file_idx``) with one fixed set of
    partition values, and must not be written to after ``close()``.
    """

    def __init__(
        self,
        root_dir: str,
        file_idx: int,
        file_format: str,
        partition_values: Optional[PyTable] = None,
        compression: Optional[str] = None,
        io_config: Optional[IOConfig] = None,
        default_partition_fallback: str = "__HIVE_DEFAULT_PARTITION__",
    ):
        [self.resolved_path], self.fs = _resolve_paths_and_filesystem(root_dir, io_config=io_config)
        protocol = get_protocol_from_path(root_dir)
        canonicalized_protocol = canonicalize_protocol(protocol)
        is_local_fs = canonicalized_protocol == "file"

        self.file_name = f"{uuid.uuid4()}-{file_idx}.{file_format}"
        self.partition_values = Table._from_pytable(partition_values) if partition_values is not None else None
        if self.partition_values is not None:
            # Hive-style layout: nest the file under "k=v" directories derived
            # from the partition values, with a fallback for nulls.
            partition_strings = partition_values_to_str_mapping(self.partition_values)
            postfix = partition_string_mapping_to_postfix(partition_strings, default_partition_fallback)
            self.dir_path = f"{self.resolved_path}/{postfix}"
        else:
            self.dir_path = f"{self.resolved_path}"

        self.full_path = f"{self.dir_path}/{self.file_name}"
        if is_local_fs:
            # Only local filesystems need the directory created up front;
            # object stores create "directories" implicitly on write.
            self.fs.create_dir(self.dir_path, recursive=True)

        self.compression = compression if compression is not None else "none"
        # Underlying pyarrow writer, created lazily on the first write() so the
        # schema can come from the first incoming micropartition.
        self.current_writer: Optional[Union[pq.ParquetWriter, pacsv.CSVWriter]] = None
        # Guards against use-after-close: this writer does not rotate files.
        self.is_closed = False

    def _create_writer(self, schema: pa.Schema):
        """Construct the underlying pyarrow writer for ``full_path``.

        Subclasses implement this for their specific file format.
        """
        raise NotImplementedError("Subclasses must implement this method.")

    def write(self, table: MicroPartition):
        """Append a micropartition to the file, opening the writer on first call.

        Raises:
            AssertionError: If called after ``close()``.
        """
        assert not self.is_closed, "Cannot write to a closed FileWriterBase"
        if self.current_writer is None:
            self.current_writer = self._create_writer(table.schema().to_pyarrow_schema())
        self.current_writer.write_table(table.to_arrow())

    def close(self) -> PyTable:
        """Close the file and return a single-row metadata table.

        The returned table contains the written file path plus one column per
        partition value. The writer must not be used again after closing.
        """
        if self.current_writer is not None:
            self.current_writer.close()
        self.is_closed = True

        metadata = {"path": Series.from_pylist([self.full_path])}
        if self.partition_values is not None:
            for col_name in self.partition_values.column_names():
                metadata[col_name] = self.partition_values.get_column(col_name)
        return Table.from_pydict(metadata)._table

Check warning on line 91 in daft/io/writer.py

View check run for this annotation

Codecov / codecov/patch

daft/io/writer.py#L87-L91

Added lines #L87 - L91 were not covered by tests

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also have a separate `finalize` method, rather than overloading `close` to both finish the current file and start the next one.

Copy link
Contributor Author

@colin-ho colin-ho Oct 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was actually intending for these Python writers to be non-rotating, i.e. no writing after closing. Each should be constructed with a unique file_idx (used for file-name generation) and a unique set of partition_values.

I will add assertions and some comments to document this behaviour


class ParquetFileWriter(FileWriterBase):
    """Single-use writer that streams micropartitions into one Parquet file."""

    def __init__(
        self,
        root_dir: str,
        file_idx: int,
        partition_values: Optional[PyTable] = None,
        compression: str = "none",
        io_config: Optional[IOConfig] = None,
    ):
        super().__init__(
            root_dir,
            file_idx,
            "parquet",
            partition_values,
            compression,
            io_config,
        )

    def _create_writer(self, schema: pa.Schema) -> pq.ParquetWriter:
        """Open a pyarrow ParquetWriter targeting this writer's full path."""
        writer = pq.ParquetWriter(
            self.full_path,
            schema,
            compression=self.compression,
            use_compliant_nested_type=False,
            filesystem=self.fs,
        )
        return writer


class CSVFileWriter(FileWriterBase):
    """Single-use writer that streams micropartitions into one CSV file."""

    def __init__(
        self,
        root_dir: str,
        file_idx: int,
        partition_values: Optional[PyTable] = None,
        io_config: Optional[IOConfig] = None,
    ):
        super().__init__(root_dir, file_idx, "csv", partition_values=partition_values, io_config=io_config)

    def _create_writer(self, schema: pa.Schema) -> pacsv.CSVWriter:
        """Open a pyarrow CSVWriter targeting this writer's full path."""
        return pacsv.CSVWriter(self.full_path, schema)
13 changes: 12 additions & 1 deletion src/daft-core/src/utils/identity_hash_set.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::hash::{BuildHasherDefault, Hasher};
use std::hash::{BuildHasherDefault, Hash, Hasher};

pub type IdentityBuildHasher = BuildHasherDefault<IdentityHasher>;

Expand Down Expand Up @@ -27,3 +27,14 @@
self.hash = i;
}
}

/// Pairing of a row index with its precomputed hash value.
pub struct IndexHash {
    pub idx: u64,
    pub hash: u64,
}

impl Hash for IndexHash {
    fn hash<H: Hasher>(&self, state: &mut H) {
        // Feed only the precomputed `hash` to the hasher; `idx` is
        // deliberately excluded so entries with equal hashes collide
        // regardless of which row they refer to.
        state.write_u64(self.hash);
    }
}
1 change: 1 addition & 0 deletions src/daft-local-execution/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ daft-scan = {path = "../daft-scan", default-features = false}
daft-table = {path = "../daft-table", default-features = false}
futures = {workspace = true}
indexmap = {workspace = true}
itertools = {workspace = true}
lazy_static = {workspace = true}
log = {workspace = true}
num-format = "0.4.4"
Expand Down
97 changes: 97 additions & 0 deletions src/daft-local-execution/src/buffer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
use std::{cmp::Ordering::*, collections::VecDeque, sync::Arc};

use common_error::DaftResult;
use daft_micropartition::MicroPartition;

// A buffer that accumulates morsels until a threshold is reached
pub struct RowBasedBuffer {
    // Pending morsels, in arrival order.
    pub buffer: VecDeque<Arc<MicroPartition>>,
    // Total row count currently buffered across all morsels.
    pub curr_len: usize,
    // Row count at which `pop_enough` starts emitting threshold-sized chunks.
    pub threshold: usize,
}

impl RowBasedBuffer {
    /// Create an empty buffer that emits morsels of `threshold` rows.
    pub fn new(threshold: usize) -> Self {
        assert!(threshold > 0);
        Self {
            buffer: VecDeque::new(),
            curr_len: 0,
            threshold,
        }
    }

    // Push a morsel to the buffer
    pub fn push(&mut self, part: Arc<MicroPartition>) {
        self.curr_len += part.len();
        self.buffer.push_back(part);
    }

    // Drain everything buffered into one concatenated MicroPartition and
    // reset the running row count.
    fn concat_and_take(&mut self) -> DaftResult<MicroPartition> {
        let parts = std::mem::take(&mut self.buffer);
        self.curr_len = 0;
        let refs = parts.iter().map(|p| p.as_ref()).collect::<Vec<_>>();
        MicroPartition::concat(&refs)
    }

    // Pop enough morsels that reach the threshold
    // - If the buffer does not yet hold enough rows, return None
    // - If the buffer holds exactly `threshold` rows, return them as one morsel
    // - If the buffer holds more, return a vec of morsels, each sized exactly
    //   to the threshold; any sub-threshold remainder goes back into the buffer
    pub fn pop_enough(&mut self) -> DaftResult<Option<Vec<Arc<MicroPartition>>>> {
        match self.curr_len.cmp(&self.threshold) {
            Less => Ok(None),
            Equal => {
                // Exactly one threshold's worth is buffered: hand it all back,
                // skipping the concat when a single morsel already suffices.
                let part = if self.buffer.len() == 1 {
                    self.curr_len = 0;
                    self.buffer.pop_front().unwrap()
                } else {
                    Arc::new(self.concat_and_take()?)
                };
                Ok(Some(vec![part]))
            }
            Greater => {
                let num_ready_chunks = self.curr_len / self.threshold;
                let concated = self.concat_and_take()?;
                let mut parts_to_return = Vec::with_capacity(num_ready_chunks);
                for chunk_idx in 0..num_ready_chunks {
                    let start = chunk_idx * self.threshold;
                    let chunk = concated.slice(start, start + self.threshold)?;
                    parts_to_return.push(Arc::new(chunk));
                }
                let consumed = num_ready_chunks * self.threshold;
                if consumed < concated.len() {
                    // Stash the sub-threshold remainder for a later pop;
                    // push() restores curr_len for us.
                    let leftover = Arc::new(concated.slice(consumed, concated.len())?);
                    self.push(leftover);
                }
                Ok(Some(parts_to_return))
            }
        }
    }

    // Pop all morsels in the buffer regardless of the threshold
    pub fn pop_all(&mut self) -> DaftResult<Option<Arc<MicroPartition>>> {
        assert!(self.curr_len < self.threshold);
        if self.buffer.is_empty() {
            Ok(None)
        } else {
            Ok(Some(Arc::new(self.concat_and_take()?)))
        }
    }
}
91 changes: 0 additions & 91 deletions src/daft-local-execution/src/intermediate_ops/buffer.rs

This file was deleted.

Loading
Loading