WIP: Compiling, but the test I introduced is still failing
rtyler committed Mar 25, 2023
1 parent ea321f3 commit 53a4fe2
Showing 5 changed files with 75 additions and 48 deletions.
37 changes: 31 additions & 6 deletions rust/src/checkpoints.rs
@@ -2,7 +2,7 @@

use arrow::datatypes::Schema as ArrowSchema;
use arrow::error::ArrowError;
use arrow::json::reader::{Decoder, DecoderOptions};
use arrow::json::RawReaderBuilder;
use chrono::{DateTime, Datelike, Duration, Utc};
use futures::StreamExt;
use lazy_static::lazy_static;
@@ -14,6 +14,7 @@ use regex::Regex;
use serde_json::Value;
use std::collections::HashMap;
use std::convert::TryFrom;
use std::io::Write;
use std::iter::Iterator;
use std::ops::Add;

@@ -72,6 +73,13 @@ pub enum CheckpointError {
#[from]
source: serde_json::Error,
},
/// Passthrough error returned when doing std::io operations
#[error("std::io::Error: {source}")]
Io {
/// The source std::io::Error
#[from]
source: std::io::Error,
},
}

/// The record batch size for checkpoint parquet file
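The `Io` variant added above leans on thiserror's `#[from]` attribute, so any `std::io::Error` raised while buffering the checkpoint JSON can be propagated with `?`. A minimal sketch of just that pattern, reduced to the one variant (the `write_buffer` helper is hypothetical, for illustration only):

```rust
use std::io::Write;

use thiserror::Error;

#[derive(Error, Debug)]
pub enum CheckpointError {
    /// Passthrough error returned when doing std::io operations
    #[error("std::io::Error: {source}")]
    Io {
        /// The source std::io::Error
        #[from]
        source: std::io::Error,
    },
}

// Because of #[from], `?` converts std::io::Error into CheckpointError::Io
// without an explicit map_err.
fn write_buffer(buf: &mut Vec<u8>) -> Result<(), CheckpointError> {
    buf.write_all(b"{\"some\":\"json\"}")?;
    Ok(())
}
```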
@@ -332,7 +340,7 @@ fn parquet_bytes_from_state(state: &DeltaTableState) -> Result<bytes::Bytes, CheckpointError>
}

// protocol
let mut jsons = std::iter::once(action::Action::protocol(action::Protocol {
let jsons = std::iter::once(action::Action::protocol(action::Protocol {
min_reader_version: state.min_reader_version(),
min_writer_version: state.min_writer_version(),
}))
@@ -382,11 +390,28 @@ fn parquet_bytes_from_state(state: &DeltaTableState) -> Result<bytes::Bytes, CheckpointError>
// Write the Checkpoint parquet file.
let mut bytes = vec![];
let mut writer = ArrowWriter::try_new(&mut bytes, arrow_schema.clone(), None)?;
let options = DecoderOptions::new().with_batch_size(CHECKPOINT_RECORD_BATCH_SIZE);
let decoder = Decoder::new(arrow_schema, options);
while let Some(batch) = decoder.next_batch(&mut jsons)? {
writer.write(&batch)?;
let mut decoder = RawReaderBuilder::new(arrow_schema)
.with_batch_size(CHECKPOINT_RECORD_BATCH_SIZE)
.build_decoder()?;

let mut buf = vec![];
for res in jsons {
let json = res?;
buf.write_all(serde_json::to_string(&json)?.as_bytes())?;
}
let mut consumed = 0;

loop {
// Feed the decoder only the bytes it has not consumed yet
let read_bytes = decoder.decode(&buf[consumed..])?;
consumed += read_bytes;
if let Some(batch) = decoder.flush()? {
writer.write(&batch)?;
}
if consumed == buf.len() {
break;
}
}

let _ = writer.close()?;
debug!("Finished writing checkpoint parquet buffer.");

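The loop above replaces arrow's removed `Decoder`/`DecoderOptions` JSON reader with the streaming `RawReaderBuilder` decoder, which consumes raw JSON bytes and emits `RecordBatch`es via `decode`/`flush`. A self-contained sketch of that pattern, assuming the arrow version this commit builds against (schema, batch size, and sample data are illustrative):

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::json::RawReaderBuilder;
use arrow::record_batch::RecordBatch;

/// Decode a buffer of newline-delimited JSON objects into RecordBatches
/// using arrow's streaming JSON decoder.
fn decode_json(
    buf: &[u8],
    schema: Arc<Schema>,
    batch_size: usize,
) -> Result<Vec<RecordBatch>, ArrowError> {
    let mut decoder = RawReaderBuilder::new(schema)
        .with_batch_size(batch_size)
        .build_decoder()?;

    let mut batches = vec![];
    let mut consumed = 0;
    while consumed < buf.len() {
        // decode() reads at most `batch_size` rows and reports how many bytes
        // it took, so feed it only the not-yet-consumed remainder.
        consumed += decoder.decode(&buf[consumed..])?;
        // flush() yields whatever rows have been buffered so far.
        if let Some(batch) = decoder.flush()? {
            batches.push(batch);
        }
    }
    Ok(batches)
}

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Utf8, true),
        Field::new("value", DataType::Int64, true),
    ]));
    let json = br#"{"id":"a","value":1}
{"id":"b","value":2}"#;
    for batch in decode_json(json, schema, 1)? {
        println!("decoded {} row(s)", batch.num_rows());
    }
    Ok(())
}
```

Note that `decode` always starts from the beginning of the slice it is handed, which is why the sketch (and the checkpoint loop above) advances by the consumed offset on each call.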
12 changes: 6 additions & 6 deletions rust/src/storage/config.rs
@@ -4,7 +4,7 @@ use super::utils::str_is_truthy;
use crate::{DeltaResult, DeltaTableError};
use object_store::memory::InMemory;
use object_store::path::Path;
use object_store::prefix::PrefixObjectStore;
use object_store::prefix::PrefixStore;
use object_store::DynObjectStore;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
@@ -111,14 +111,14 @@ pub(crate) enum ObjectStoreImpl {
impl ObjectStoreImpl {
pub(crate) fn into_prefix(self, prefix: Path) -> Arc<DynObjectStore> {
match self {
ObjectStoreImpl::Local(store) => Arc::new(PrefixObjectStore::new(store, prefix)),
ObjectStoreImpl::InMemory(store) => Arc::new(PrefixObjectStore::new(store, prefix)),
ObjectStoreImpl::Local(store) => Arc::new(PrefixStore::new(store, prefix)),
ObjectStoreImpl::InMemory(store) => Arc::new(PrefixStore::new(store, prefix)),
#[cfg(feature = "azure")]
ObjectStoreImpl::Azure(store) => Arc::new(PrefixObjectStore::new(store, prefix)),
ObjectStoreImpl::Azure(store) => Arc::new(PrefixStore::new(store, prefix)),
#[cfg(any(feature = "s3", feature = "s3-native-tls"))]
ObjectStoreImpl::S3(store) => Arc::new(PrefixObjectStore::new(store, prefix)),
ObjectStoreImpl::S3(store) => Arc::new(PrefixStore::new(store, prefix)),
#[cfg(feature = "gcs")]
ObjectStoreImpl::Google(store) => Arc::new(PrefixObjectStore::new(store, prefix)),
ObjectStoreImpl::Google(store) => Arc::new(PrefixStore::new(store, prefix)),
}
}

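The `config.rs` change is purely the upstream rename of `PrefixObjectStore` to `PrefixStore` in the `object_store` crate. For reference, a minimal sketch of how the wrapper is built around any backend (the in-memory store and prefix value are illustrative):

```rust
use std::sync::Arc;

use object_store::memory::InMemory;
use object_store::path::Path;
use object_store::prefix::PrefixStore;
use object_store::DynObjectStore;

/// Wrap a backend so every key it sees is resolved under `prefix`,
/// mirroring what `ObjectStoreImpl::into_prefix` does per backend.
fn prefixed_store(prefix: &str) -> Arc<DynObjectStore> {
    let inner = InMemory::new();
    Arc::new(PrefixStore::new(inner, Path::from(prefix)))
}

fn main() {
    let store = prefixed_store("tables/my_table");
    println!("prefixed store: {store}");
}
```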
2 changes: 1 addition & 1 deletion rust/src/writer/json.rs
@@ -473,7 +473,7 @@ mod tests {
let data = serde_json::json!(
{
"id" : "A",
"value": "test",
"value": 42,
"modified": "2021-02-01"
}
);
43 changes: 15 additions & 28 deletions rust/src/writer/record_batch.rs
@@ -416,6 +416,7 @@ mod tests {
test_utils::{create_initialized_table, get_record_batch},
utils::PartitionPath,
};
use arrow::json::RawReaderBuilder;
use std::path::Path;

#[tokio::test]
@@ -456,7 +457,6 @@
#[tokio::test]
async fn test_divide_record_batch_with_map_single_partition() {
use crate::{action::Protocol, SchemaTypeStruct};
use arrow::json::reader::{Decoder, DecoderOptions};

let mut table = crate::writer::test_utils::create_bare_table();
let partition_cols = vec!["modified".to_string()];
@@ -504,40 +504,27 @@
.await
.unwrap();

let data_as_json = r#"[
let buf = r#"
{"id" : "0xdeadbeef", "value" : 42, "modified" : "2021-02-01",
"metadata" : {"some-key" : "some-value"}},
{"id" : "0xdeadcaf", "value" : 3, "modified" : "2021-02-02",
"metadata" : {"some-key" : "some-value"}}
]"#;

let deser_data: Vec<serde_json::Value> =
serde_json::from_str(data_as_json).expect("Failed to deserialize test record");
let mut data_iterable = deser_data.iter().map(|v| Ok(v.to_owned()));
/*
* This works
use arrow::datatypes::{DataType, Field};
let entries = DataType::Struct(vec![
Field::new("key", DataType::Utf8, false),
Field::new("value", DataType::Utf8, true),
]);
let schema = Arc::new(ArrowSchema::new(vec![
Field::new("id", DataType::Utf8, true),
Field::new("value", DataType::Int32, true),
Field::new("modified", DataType::Utf8, true),
Field::new("metadata", DataType::Map(
Box::new(Field::new("entries", entries, true)), false), true),
]));
*/
{"id" : "0xdeadcaf", "value" : 3, "modified" : "2021-02-02",
"metadata" : {"some-key" : "some-value"}}"#;

let schema: ArrowSchema =
<ArrowSchema as TryFrom<&Schema>>::try_from(&delta_schema).unwrap();
let options = DecoderOptions::new();
let decoder = Decoder::new(Arc::new(schema), options);
let batch = decoder.next_batch(&mut data_iterable).unwrap().unwrap();

let mut writer = RecordBatchWriter::for_table(&table).unwrap();
// Using a batch size of two since the buf above only has two records
let mut decoder = RawReaderBuilder::new(Arc::new(schema))
.with_batch_size(2)
.build_decoder()
.expect("Failed to build decoder");

decoder
.decode(buf.as_bytes())
.expect("Failed to deserialize the JSON in the buffer");
let batch = decoder.flush().expect("Failed to flush").unwrap();

let mut writer = RecordBatchWriter::for_table(&table).unwrap();
let partitions = writer.divide_by_partition_values(&batch).unwrap();

let expected_keys = vec![
29 changes: 22 additions & 7 deletions rust/src/writer/utils.rs
@@ -5,7 +5,7 @@ use std::io::Write;
use std::sync::Arc;

use crate::writer::DeltaWriterError;
use crate::DeltaTableError;
use crate::{DeltaResult, DeltaTableError};

use arrow::array::{
as_boolean_array, as_generic_binary_array, as_primitive_array, as_string_array, Array,
@@ -16,7 +16,7 @@ use arrow::datatypes::{
TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, UInt16Type, UInt32Type,
UInt64Type, UInt8Type,
};
use arrow::json::reader::{Decoder, DecoderOptions};
use arrow::json::RawReaderBuilder;
use arrow::record_batch::*;
use object_store::path::Path;
use parking_lot::RwLock;
@@ -108,12 +108,27 @@ pub(crate) fn next_data_path(
pub fn record_batch_from_message(
arrow_schema: Arc<ArrowSchema>,
message_buffer: &[Value],
) -> Result<RecordBatch, DeltaTableError> {
let mut value_iter = message_buffer.iter().map(|j| Ok(j.to_owned()));
let options = DecoderOptions::new().with_batch_size(message_buffer.len());
let decoder = Decoder::new(arrow_schema, options);
) -> DeltaResult<RecordBatch> {
let mut buf = vec![];
for message in message_buffer {
buf.write_all(
serde_json::to_string(&message)
// Unsure why From<serde_json::Error> is not working for DeltaTableError
.map_err(|e| DeltaTableError::GenericError {
source: Box::new(e),
})?
.as_bytes(),
)?;
}
// Use the number of messages as the batch size so the decoder yields a
// single RecordBatch covering the whole buffer
let mut decoder = RawReaderBuilder::new(arrow_schema)
.with_batch_size(message_buffer.len())
.build_decoder()?;

let _read_bytes = decoder.decode(&buf)?;
decoder
.next_batch(&mut value_iter)?
.flush()?
.ok_or_else(|| DeltaWriterError::EmptyRecordBatch.into())
}
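For reference, a hypothetical call site for the reworked `record_batch_from_message` (the schema, field names, and module path are assumptions for illustration; the function itself lives in `rust/src/writer/utils.rs`):

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
use serde_json::json;

// Assumes this runs somewhere inside the deltalake crate where
// writer::utils is visible.
use crate::writer::utils::record_batch_from_message;
use crate::DeltaResult;

fn build_batch() -> DeltaResult<()> {
    let schema = Arc::new(ArrowSchema::new(vec![
        Field::new("id", DataType::Utf8, true),
        Field::new("value", DataType::Int64, true),
    ]));
    // Each message becomes one row; the batch size equals the slice length,
    // so a single RecordBatch covers the whole buffer.
    let messages = vec![
        json!({"id": "a", "value": 1}),
        json!({"id": "b", "value": 2}),
    ];
    let batch = record_batch_from_message(schema, &messages)?;
    assert_eq!(batch.num_rows(), 2);
    Ok(())
}
```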