Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[write] Transaction append data API #393

Merged

Conversation

zachschuermann
Copy link
Collaborator

@zachschuermann zachschuermann commented Oct 14, 2024

This PR is the second (of two) major pieces for supporting simple blind appends. It implements:

  1. new Transaction APIs for appending data to delta tables:
    a. get_write_context() to get a WriteContext to pass to the data path which includes all information needed to write: target directory, snapshot schema, transformation expression, and (future: columns to collect stats on)
    b. add_write_metadata(impl EngineData) to add metadata about a write to the transaction along with a new static method transaction::get_write_metadata_schema to provide the expected schema of this engine data.
    c. new machinery in 'commit' method to commit new Add actions for each row of write_metadata from the API above.
  2. new default engine capabilities for using the default engine to write parquet data (to append to tables):
    a. parquet handler can now write_parquet_file(EngineData)
    b. usage example in write.rs tests for now
  3. new append tests in the write.rs integration test suite

Details and some follow-ups:

  • the parquet writing (similar to JSON) currently just buffers everything into memory before issuing one big PUT. we should make this smarter: single PUT for small data and MultipartUpload for larger data. tracking in default JSON/parquet handler write implementations should stream to object store #418
  • schema enforcement is done at the data layer. this means it is up to the engine to call the expression evaluation and we expect this to fail if the output schema is incorrect (see test_append_invalid_schema in write.rs integration test). we may want to change this in the future to eagerly error based on the engine providing a schema up front at metadata time (transaction creation time)

based on #370
resolves #390

@zachschuermann zachschuermann self-assigned this Oct 14, 2024
@github-actions github-actions bot added the breaking-change Change that will require a version bump label Oct 14, 2024
@zachschuermann zachschuermann changed the title [wip][write stage0] Transaction append data API [write stage0] Transaction append data API Oct 28, 2024
Copy link

codecov bot commented Oct 29, 2024

Codecov Report

Attention: Patch coverage is 93.57542% with 23 lines in your changes missing coverage. Please review.

Project coverage is 78.88%. Comparing base (4466509) to head (796f2c4).
Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
kernel/src/engine/default/parquet.rs 91.51% 6 Missing and 13 partials ⚠️
kernel/src/engine/arrow_utils.rs 33.33% 0 Missing and 2 partials ⚠️
kernel/src/engine/default/mod.rs 92.00% 1 Missing and 1 partial ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #393      +/-   ##
==========================================
+ Coverage   78.41%   78.88%   +0.46%     
==========================================
  Files          55       55              
  Lines       11806    12157     +351     
  Branches    11806    12157     +351     
==========================================
+ Hits         9258     9590     +332     
- Misses       2041     2047       +6     
- Partials      507      520      +13     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@zachschuermann zachschuermann marked this pull request as ready for review October 29, 2024 04:19
@zachschuermann
Copy link
Collaborator Author

note: did a sanity check and we can read partitioned table with delta-spark (v3.2.0)

>>> spark.read.format("delta").load("/users/zach.schuermann/desktop/kernel_tests/test_table").show()
24/10/29 12:00:49 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
+------+---------+
|number|partition|
+------+---------+
|     1|        a|
|     2|        a|
|     3|        a|
|     4|        b|
|     5|        b|
|     6|        b|
+------+---------+

Comment on lines 112 to 114
pub fn get_parquet_handler(&self) -> Arc<DefaultParquetHandler<E>> {
self.parquet.clone()
}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't particularly like this but we need a way to get back the actual DefaultParquetHandler instead of the type-erased Arc<dyn ParquetHandler>. I think we can do it a couple ways: either like this, or we implement as_any to allow us to just cast the types. thoughts?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this because we want to be able to call write_parquet_files? Surely that should be part of the ParquetHandler api spec?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup! we originally didn't want to impose that requirement on engines since they should elect to just write parquet files however they want. if we added it to ParquetHandler then we would be requiring an API that we (kernel) don't actually use (only the default engine). That being said, we will likely need a write_parquet_files API for writing checkpoints so we could just introduce the requirement here and use it in the default engine but only really require it in the future once we are writing checkpoints

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mmm okay. yeah the other options are a bit... messy, downcasting is not trivial.

can we make it pub(crate) or does that not work?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unfortunately doesn't work - could use developer-visibility but that's the best i could come up with for hiding it

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like other engines will face a similar issue, and even our own default engine already hit it previously with trait EngineData (see its as_any and into_any methods). We also know it will be a Big Deal for FFI, where we need the ability to downcast not merely to a concrete rust struct, but to a native engine's C/C++ struct.

I suspect it's time to generalize any-casting capability and bake it into all these engine traits, i.e. #450?

@@ -52,7 +52,7 @@ impl JsonHandler for SyncJsonHandler {
fn write_json_file(
&self,
path: &Url,
data: Box<dyn Iterator<Item = Box<dyn EngineData>> + Send>,
data: Box<dyn Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send + '_>,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

curious of what people think of this API change. if we don't propagate results to here then we end up in a weird situation of either having to traverse everything to filter out Err cases or just unwap stuff? is there a better way to handle?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are we trying to achieve? Seems like it'll still be an issue that we might write some data and then fail. Without checking the whole thing for Err cases I'm not sure how you can avoid that?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have things like expression evaluation happening at every chunk of the iterator (e.g. before we get here we do the transformation from 'write_metadata' to actions) which might fail. Thus we have to 'hold on' to those Results and propagate them through to this API. And yea not sure if there's a way around that other than something that scans the whole thing ahead of time?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But are you going to somehow recover from those errors? Why not just propagate earlier?

To make sure I'm getting it. This would for example propagate any errors in generate_commit_info down here, rather than early returning from commit() when generate_commit_info failed, right? Early fail in commit actually seems better, so I'm not seeing the benefit here.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But are you going to somehow recover from those errors? Why not just propagate earlier?

probably not? but unsure how to propagate early without traversing the whole iterator.

To make sure I'm getting it. This would for example propagate any errors in generate_commit_info down here, rather than early returning from commit() when generate_commit_info failed, right? Early fail in commit actually seems better, so I'm not seeing the benefit here.

since generate_commit_info just always appends the first batch and we could fail early I'll use generate_adds as an example: we lazily generate the adds to produce an impl Iterator<Item = DeltaResult<Box<dyn EngineData>>>. that is, generate_adds takes an iterator of write_metadata and produces an iterator of result (as engine data). The result is injected here since we do some fallible work like expression evaluation to transform into the actions. Then, we are left with handing this iterator of results to the write_json API unless we want to traverse the whole iterator prior to passing it off to check that one of the batches didn't fail.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ahh, finally I get it. Yeah, we want to let the errors propagate until we need their result. This makes sense. Thanks for bearing with me :)

@@ -56,14 +81,14 @@ impl Transaction {
read_snapshot: snapshot.into(),
operation: None,
commit_info: None,
write_metadata: vec![],
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note we make write_metadata a vec so that we can keep all the batches to traverse multiple times in the case we need to retry the commit. one alternative here is to just require the engine to do the concatenation and hand us one big batch of write metadata

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Java kernel requires engine to hand over an iterator, potentially multiple times if there are retries. That way it's engine's job to deal with memory management and spilling on big commits.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea I think we could have them pass something that's impl IntoIterator?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

opened #465 to have some more discussion and take this as a follow-up :)

@zachschuermann zachschuermann requested review from nicklan and scovich and removed request for nicklan October 29, 2024 23:29
Copy link
Collaborator

@nicklan nicklan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

flushing some initial comments.

Comment on lines 112 to 114
pub fn get_parquet_handler(&self) -> Arc<DefaultParquetHandler<E>> {
self.parquet.clone()
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this because we want to be able to call write_parquet_files? Surely that should be part of the ParquetHandler api spec?

kernel/src/engine/default/parquet.rs Show resolved Hide resolved
let batch: Box<_> = ArrowEngineData::try_from_engine_data(data)?;
let record_batch = batch.record_batch();

let mut buffer = vec![];
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It sucks that object_store doesn't have any notion of taking a "stream of bytes" and instead wants everything as a single chunk.

We should probably at least file a tracking issue to see if we can improve this somehow.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a streaming write to an object store would have to take the form of a multipart upload which object_store does support with the MultipartUpload trait and perhaps we can just implement that in some async way to stream our data? also ran across WriteMultipart but I'll do some more digging.

I've already got #418 open for doing streaming JSON puts, and i've just gone ahead and expanded that issue to be streaming JSON/parquet since I think it will generally be the same

Copy link
Collaborator

@scovich scovich Oct 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That WriteMultipart does indeed seem relevant. Looks like it's up to us to sort out simple PUT vs. MPU tho (similar to how arrow parquet reads are super low-level, I guess).

Parquet will be trickier because somebody (engine or default engine) has to encode+compress the bytes that will eventually be written. Parquet is designed to be streamed (row groups, and footer comes last), but I don't know how well arrow-parquet supports it for default client? There's e.g. SerializedFileWriter that uses Write, so I guess we'd have to manually tie that in with the object store machinery (WriteMultipart doesn't seem to impl Write). Our Write impl would have to decide PUT vs. MPU dynamically based on data size being written, I suppose.

Also: I hope there's some easy way to write out arrow data as parquet? From my quick skim, it looks like we'd be wrangling individual row groups and column chunks.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep gonna punt on this for now and optimize in a follow-up.

Also: I hope there's some easy way to write out arrow data as parquet? From my quick skim, it looks like we'd be wrangling individual row groups and column chunks.

we are just using parquet::arrow::arrow_writer::ArrowWriter here! (rather straightforward to use to write record batches - not sure if that's what you were asking)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without disagreeing with anything above, I also meant that object_store should be able to just do a "normal" put on anything that implements std::io::Read.

It's going to write it all as the body of the request out to the network/disk, so it should be able to just stream the bytes rather than needing them in one continuous chunk.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah yea got it! I can create an issue for this since it's seeming different than the full 'stream to multipart' one?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, too much time invested in my aside here. It would be an issue on object_store which we could look at, but probably not now, and probably not something to make an issue for in this repo.

@@ -52,7 +52,7 @@ impl JsonHandler for SyncJsonHandler {
fn write_json_file(
&self,
path: &Url,
data: Box<dyn Iterator<Item = Box<dyn EngineData>> + Send>,
data: Box<dyn Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send + '_>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are we trying to achieve? Seems like it'll still be an issue that we might write some data and then fail. Without checking the whole thing for Err cases I'm not sure how you can avoid that?

kernel/src/transaction.rs Outdated Show resolved Hide resolved
@nicklan nicklan requested a review from azdavis October 30, 2024 22:24
Comment on lines 112 to 114
pub fn get_parquet_handler(&self) -> Arc<DefaultParquetHandler<E>> {
self.parquet.clone()
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mmm okay. yeah the other options are a bit... messy, downcasting is not trivial.

can we make it pub(crate) or does that not work?

kernel/tests/write.rs Outdated Show resolved Hide resolved
/// Metadata of a parquet file, currently just includes the file metadata but will expand to
/// include file statistics and other metadata in the future.
#[derive(Debug)]
pub struct ParquetMetadata {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggest giving this a different name to avoid confusion with the parquet crate's metadata

I wanted to suggest just using their types, since there is overlap, but this feels more like maybe ParquetWriteMetadata or something, i.e. metadata relating to the write rather than the parquet file itself.

Also, all the stuff that goes into WRITE_METADATA_SCHEMA is buried in the FileMeta. I'd prefer to have a struct that could #[derive(Schema)] for the metadata than have the schema manually declared. AFAICT you're not using the fact that this is a FileMeta anywhere, and are just using it as a container inside a container.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup - good point! originally, I just had ParquetMetadata with path, size, modification_time and just realized that was the same as FileMeta so tried to unify there. It sounds like instead you're thinking of just flattening those fields under a ParquetWriteMetadata struct that will live in the kernel (instead of just in default engine) and that uses #[derive(Schema)] to get the 'write metadata schema' instead of the separate schema definition.

That seems to be more elegant thanks! :)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, thinking about this more, we have two steps to this data flow:

  1. engine collecting the data returned from each parquet file written:
  • path
  • size
  • last_modified
  1. engine communicating to kernel the data required for each write:
  • path
  • size
  • last_modified
  • partition_values
  • data_change

In (2) we add partition_values and data_change, which wouldn't necessarily be inherent to the parquet write but layers on more information about that file (e.g. engine just wrote this parquet file corresponding to partition_col = part1)

I had introduced ParquetMetadata to house (1) which is purely an engine concept. Then, in kernel I had WRITE_METADATA_SCHEMA which specified the schema for (2).

Thinking more about the suggestion, maybe we need a rename of (1) and then maybe change (2) to be a struct that does #[derive(Schema)]? I'm thinking now about reasons for struct vs. just the schema definition like I have but wanted to post this comment to add some more context and then we can discuss more :)

kernel/src/engine/default/parquet.rs Outdated Show resolved Hide resolved
kernel/src/transaction.rs Outdated Show resolved Hide resolved
kernel/src/transaction.rs Outdated Show resolved Hide resolved
kernel/src/transaction.rs Outdated Show resolved Hide resolved
kernel/src/transaction.rs Outdated Show resolved Hide resolved
kernel/src/transaction.rs Outdated Show resolved Hide resolved
kernel/tests/write.rs Outdated Show resolved Hide resolved
@zachschuermann zachschuermann changed the title [write stage0] Transaction append data API [write] Transaction append data API Nov 1, 2024
Copy link
Collaborator

@scovich scovich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Flushing comments... hopefully can finish soon

Comment on lines 112 to 114
pub fn get_parquet_handler(&self) -> Arc<DefaultParquetHandler<E>> {
self.parquet.clone()
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like other engines will face a similar issue, and even our own default engine already hit it previously with trait EngineData (see its as_any and into_any methods). We also know it will be a Big Deal for FFI, where we need the ability to downcast not merely to a concrete rust struct, but to a native engine's C/C++ struct.

I suspect it's time to generalize any-casting capability and bake it into all these engine traits, i.e. #450?

kernel/src/engine/default/parquet.rs Outdated Show resolved Hide resolved
}

// convert ParquetMetadata into a record batch which matches the 'write_metadata' schema
fn as_record_batch(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really want a single-row version of this? Or is it better to take a slice of some kind?
Also, why are partition values etc. provided externally, rather than part of self?

I could totally buy the argument that partition values and data_change are a "bulk" concept... but in that case we should definitely have the record batch generator take a slice of Self, along with those other bits of information.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: I don't see any call sites for write_parquet_file, so it's hard to guess all the implications of my suggestion. But I would think any reasonable engine implementation will work with iterators or sequences of files, and that we could inject the as_record_batch calls there?

The fact that write_parquet_file is async is a strong indication that we expect it to be part of a lazy iterator of some sort, no? If nothing else, some default engine code somewhere will have to wait on that async call to complete, since the public kernel API surface is all sync.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

write_parquet_file is called from write_parquet here

I think this design is a consequence of write_parquet only taking one file at a time, and that kernel has to go into EngineData at that point. If you could pass multiple files to write_parquet then you'd write them all and build one record batch with all the metadata. But since we do it one at a time, and kernel has to "save" each metadata so it can loop over it in the commit, we have to do this one row approach.

What we really want here (i think) is just an "opaque" type that allows being turned into EngineData, potentially in a batch.

So ParquetMetadata would become a type at the kernel level, not just in the engine, but it's just a marker. And then kernel saves up all the ParquetMetadatas (whatever the engine wants to return for them) and then calls into_engine_data(metadatas) (another new kernel level api), where the engine can turn all the metadata at once into its EngineData.

Given the scope of that, if we like it, maybe we stick with this and tackle that as a follow-up?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea and for context on the parquet file writing - I think we likely do want to support writing multiple files at once - java kernel does something similar taking an Iterator of column batches to write. I've aimed to derisk this whole thing by implementing this just within our default engine (no write parquet file engine API) and we can expand the API later?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and yea @nicklan I haven't thought about that sort of trait-based API (that would be a trait that needs into_engine_data(..) right?) that sounds good and yea probably try to do simple here and i'll make an issue to follow up?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm okay with that. Note that the trait probably wouldn't have into_engine_data, but rather something in the engine would need an into_engine_data(Vec<TheNewTrait>) (or maybe iter). You need a collection of them all at once

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just opened #466 since I'm on a roll and love making new issues

kernel/src/engine/default/parquet.rs Show resolved Hide resolved
Comment on lines +84 to +86
let size: i64 = (*size)
.try_into()
.map_err(|_| Error::generic("Failed to convert parquet metadata 'size' to i64"))?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aside: I almost wonder if FileMeta should just use size: i64 instead of size: usize? At least that way the burden of casting is paid only once, by whoever creates it, rather than (potentially) on every read scattered through the code base?

We're anyway nowhere near hitting any meaningful object size limits, so the cast should never fail:

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about u64? a little safer?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But then we're back to not having a schema type for it...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh right haha. doing i64

kernel/src/engine/default/parquet.rs Outdated Show resolved Hide resolved
Copy link
Collaborator

@nicklan nicklan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getting close, just a couple more things I think :)

}

// convert ParquetMetadata into a record batch which matches the 'write_metadata' schema
fn as_record_batch(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm okay with that. Note that the trait probably wouldn't have into_engine_data, but rather something in the engine would need an into_engine_data(Vec<TheNewTrait>) (or maybe iter). You need a collection of them all at once

let batch: Box<_> = ArrowEngineData::try_from_engine_data(data)?;
let record_batch = batch.record_batch();

let mut buffer = vec![];
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, too much time invested in my aside here. It would be an issue on object_store which we could look at, but probably not now, and probably not something to make an issue for in this repo.

@@ -52,7 +52,7 @@ impl JsonHandler for SyncJsonHandler {
fn write_json_file(
&self,
path: &Url,
data: Box<dyn Iterator<Item = Box<dyn EngineData>> + Send>,
data: Box<dyn Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send + '_>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ahh, finally I get it. Yeah, we want to let the errors propagate until we need their result. This makes sense. Thanks for bearing with me :)

kernel/tests/write.rs Show resolved Hide resolved
kernel/tests/write.rs Outdated Show resolved Hide resolved
kernel/tests/write.rs Outdated Show resolved Hide resolved
.put(&Path::from(path.path()), buffer.into())
.await?;

let metadata = self.store.head(&Path::from(path.path())).await?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want any kind of "todo" here? We could avoid this and just assume size will be correct and use current time as modification time. That would be an optimization people might want, so maybe we could have an option for it?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea good call! i'll make an issue for follow up

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

kernel/tests/write.rs Outdated Show resolved Hide resolved
kernel/tests/write.rs Show resolved Hide resolved
kernel/tests/write.rs Outdated Show resolved Hide resolved
kernel/tests/write.rs Show resolved Hide resolved
path: &str,
new_value: serde_json::Value,
) -> Result<(), Box<dyn std::error::Error>> {
let mut path_string = path.replace(".", "/");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

curious, why not just take a json pointer string as the path?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

eh just used to the field1.field2 syntax so was trying to make it easier on future usage (instead of having to write /field1/field2

happy to change if the latter seems better

Copy link
Collaborator

@nicklan nicklan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cool thanks! exciting to get write support! I'll review a bit more, but approving now as I think this is okay to merge and we can tackle improvements as follow-ups.

Copy link
Collaborator

@scovich scovich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking good, other than a bunch of nits to fix before merge.

kernel/src/transaction.rs Show resolved Hide resolved
Comment on lines 147 to 159
Expression::struct_from(self.read_snapshot.schema().fields().filter_map(|f| {
if self
.read_snapshot
.metadata()
.partition_columns
.contains(f.name())
{
None
} else {
let col_name = ColumnName::new([f.name()]);
Some(col_name.into())
}
}))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Expression::struct_from(self.read_snapshot.schema().fields().filter_map(|f| {
if self
.read_snapshot
.metadata()
.partition_columns
.contains(f.name())
{
None
} else {
let col_name = ColumnName::new([f.name()]);
Some(col_name.into())
}
}))
let partition_columns = self.read_snapshot.metadata().partition_columns;
Expression::struct_from(self.read_snapshot.schema().fields().filter_map(|f| {
(!partition_columns.contains(f.name()).then(|| Expression::column([f.name()])
}))

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But it might be cleaner to just do a filter+map pair?

Suggested change
Expression::struct_from(self.read_snapshot.schema().fields().filter_map(|f| {
if self
.read_snapshot
.metadata()
.partition_columns
.contains(f.name())
{
None
} else {
let col_name = ColumnName::new([f.name()]);
Some(col_name.into())
}
}))
let partition_columns = &self.read_snapshot.metadata().partition_columns;
let fields = self.read_snapshot.schema().fields();
let fields = fields
.filter(|f| !partition_columns.contains(f.name()))
.map(|f| Expression::column([f.name()]));
Expression::struct_from(fields)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

think I have to clone partition columns and I just did a filter_map version of your second suggestion - lmk what you think :)

let logical_to_physical = self.generate_logical_to_physical();
WriteContext::new(
target_dir.clone(),
Arc::new(snapshot_schema.clone()),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aside: Should the snapshot's schema be an arc? Wide tables could lead to expensive clone...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea I think that makes sense. opened #463 for more discussion :)

/// multiple times to add multiple batches.
///
/// The expected schema for `write_metadata` is given by [`get_write_metadata_schema`].
pub fn add_write_metadata(&mut self, write_metadata: Box<dyn EngineData>) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I originally thought that most of our API used Arc<dyn EngineData> instead of box... but a quick search says we only have one Arc -- the transaction's commit_info. Maybe we should fix that other one?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh interesting - I'll have that as another little follow-up: #464

@@ -56,14 +81,14 @@ impl Transaction {
read_snapshot: snapshot.into(),
operation: None,
commit_info: None,
write_metadata: vec![],
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Java kernel requires engine to hand over an iterator, potentially multiple times if there are retries. That way it's engine's job to deal with memory management and spilling on big commits.

kernel/src/engine/default/parquet.rs Outdated Show resolved Hide resolved
Comment on lines 165 to 166
let write_metadata = parquet_metadata.as_record_batch(&partition_values, data_change)?;
Ok(write_metadata)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit

Suggested change
let write_metadata = parquet_metadata.as_record_batch(&partition_values, data_change)?;
Ok(write_metadata)
Ok(parquet_metadata.as_record_batch(&partition_values, data_change)?)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea actually just did

parquet_metadata.as_record_batch(&partition_values, data_change)

Comment on lines +189 to +192
fn generate_adds<'a>(
engine: &dyn Engine,
write_metadata: impl Iterator<Item = &'a dyn EngineData> + Send + 'a,
) -> impl Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send + 'a {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting... this says the items returned by the iterator do not outlive the iterator itself. I would have expected something like

Suggested change
fn generate_adds<'a>(
engine: &dyn Engine,
write_metadata: impl Iterator<Item = &'a dyn EngineData> + Send + 'a,
) -> impl Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send + 'a {
fn generate_adds<'i, 'a: 'i>(
engine: &dyn Engine,
write_metadata: impl Iterator<Item = &'a dyn EngineData> + Send + 'i,
) -> impl Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send + 'i {

(but I guess if it's not causing any lifetime issues as-is, then there's no need to change it)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i suppose the existing semantics imply that it's okay to have the 'items returned by the iterator not outliving the iterator itself'?

@@ -251,3 +270,382 @@ async fn test_invalid_commit_info() -> Result<(), Box<dyn std::error::Error>> {
));
Ok(())
}

// check that the timestamps in commit_info and add actions are within 10s of SystemTime::now()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we care, sorry?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this was just a rough sanity check that we aren't wayy off. (otherwise we don't check the time field at all)

@nicklan actually suggested this and it caught a bug where one of the file's timestamps was accidentally in seconds instead of milliseconds!

Comment on lines +374 to +376
let mut parsed_commits: Vec<_> = Deserializer::from_slice(&commit1.bytes().await?)
.into_iter::<serde_json::Value>()
.try_collect()?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Does this work, out of curiosity?

Suggested change
let mut parsed_commits: Vec<_> = Deserializer::from_slice(&commit1.bytes().await?)
.into_iter::<serde_json::Value>()
.try_collect()?;
let commit1 = commit1.bytes().await?;
let mut parsed_commits: Vec<serde_json::Value> = Deserializer::from_slice(&commit1)
.into_iter()
.try_collect()?;

(but actually it's harder to read that than the turbofish, so meh)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it does work! but yea i agree original is more readable to me?

@zachschuermann zachschuermann merged commit d8329ae into delta-incubator:main Nov 8, 2024
17 checks passed
@zachschuermann zachschuermann deleted the transaction-data-path branch November 8, 2024 00:28
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
breaking-change Change that will require a version bump
Projects
None yet
Development

Successfully merging this pull request may close these issues.

append transaction data path
3 participants