From 2716ede2ff35433536a7d4b4415f0de7d8e866a6 Mon Sep 17 00:00:00 2001
From: Raphael Taylor-Davies <r.taylordavies@googlemail.com>
Date: Tue, 24 May 2022 08:57:50 +0100
Subject: [PATCH] Review feedback

---
 parquet/src/arrow/array_reader/list_array.rs |   2 +-
 parquet/src/arrow/arrow_reader.rs            |   4 +-
 parquet/src/arrow/arrow_writer.rs            |  21 +-
 parquet/src/arrow/schema.rs                  |   4 +-
 parquet/src/column/mod.rs                    |  22 +-
 parquet/src/file/mod.rs                      |  14 +-
 parquet/src/file/writer.rs                   | 258 +++++++------------
 parquet/src/record/record_writer.rs          |   6 +-
 parquet/src/util/cursor.rs                   |  72 +++++-
 parquet/src/util/io.rs                       |  72 ++++++
 parquet/tests/boolean_writer.rs              |   2 -
 parquet_derive/src/lib.rs                    |   4 +-
 parquet_derive_test/src/lib.rs               |  12 +-
 13 files changed, 268 insertions(+), 225 deletions(-)

diff --git a/parquet/src/arrow/array_reader/list_array.rs b/parquet/src/arrow/array_reader/list_array.rs
index 2d199f69ea29..808f815e6b42 100644
--- a/parquet/src/arrow/array_reader/list_array.rs
+++ b/parquet/src/arrow/array_reader/list_array.rs
@@ -564,7 +564,7 @@ mod tests {
             .set_max_row_group_size(200)
             .build();
 
-        let mut writer = ArrowWriter::try_new(
+        let writer = ArrowWriter::try_new(
            file.try_clone().unwrap(),
            Arc::new(arrow_schema),
            Some(props),
diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader.rs
index 3f6dbea855e4..34a14f3725f7 100644
--- a/parquet/src/arrow/arrow_reader.rs
+++ b/parquet/src/arrow/arrow_reader.rs
@@ -270,7 +270,7 @@ mod tests {
     use crate::errors::Result;
     use crate::file::properties::{WriterProperties, WriterVersion};
     use crate::file::reader::{FileReader, SerializedFileReader};
-    use crate::file::writer::{FileWriter, SerializedFileWriter};
+    use crate::file::writer::SerializedFileWriter;
     use crate::schema::parser::parse_message_type;
     use crate::schema::types::{Type, TypePtr};
     use crate::util::cursor::SliceableCursor;
@@ -952,7 +952,7 @@ mod tests {
         writer.close()
     }
 
-    fn get_test_reader(file_name: &str) -> Arc<dyn FileReader> {
+    fn get_test_reader(file_name: &str) -> Arc<SerializedFileReader<File>> {
         let file = get_test_file(file_name);
 
         let reader =
diff --git a/parquet/src/arrow/arrow_writer.rs b/parquet/src/arrow/arrow_writer.rs
index a9e9a42bed7e..e23f9f6ced2f 100644
--- a/parquet/src/arrow/arrow_writer.rs
+++ b/parquet/src/arrow/arrow_writer.rs
@@ -36,11 +36,8 @@ use super::schema::{
 use crate::column::writer::ColumnWriter;
 use crate::errors::{ParquetError, Result};
 use crate::file::properties::WriterProperties;
-use crate::file::writer::SerializedColumnWriter;
-use crate::{
-    data_type::*,
-    file::writer::{FileWriter, RowGroupWriter, SerializedFileWriter},
-};
+use crate::file::writer::{SerializedColumnWriter, SerializedRowGroupWriter};
+use crate::{data_type::*, file::writer::SerializedFileWriter};
 
 /// Arrow writer
 ///
@@ -187,7 +184,7 @@ impl<W: Write> ArrowWriter<W> {
                 })
                 .collect();
 
-            write_leaves(row_group_writer.as_mut(), &arrays, &mut levels)?;
+            write_leaves(&mut row_group_writer, &arrays, &mut levels)?;
         }
 
         row_group_writer.close().unwrap();
@@ -197,7 +194,7 @@ impl<W: Write> ArrowWriter<W> {
     }
 
     /// Close and finalize the underlying Parquet writer
-    pub fn close(&mut self) -> Result<parquet_format::FileMetaData> {
+    pub fn close(mut self) -> Result<parquet_format::FileMetaData> {
         self.flush()?;
         self.writer.close()
     }
@@ -205,17 +202,17 @@ impl<W: Write> ArrowWriter<W> {
 
 /// Convenience method to get the next ColumnWriter from the RowGroupWriter
 #[inline]
-fn get_col_writer(
-    row_group_writer: &mut dyn RowGroupWriter,
-) -> Result<SerializedColumnWriter<'_>> {
+fn get_col_writer<'a, W: Write>(
+    row_group_writer: &'a mut SerializedRowGroupWriter<'_, W>,
+) -> Result<SerializedColumnWriter<'a>> {
     let col_writer = row_group_writer
         .next_column()?
.expect("Unable to get column writer"); Ok(col_writer) } -fn write_leaves( - row_group_writer: &mut dyn RowGroupWriter, +fn write_leaves( + row_group_writer: &mut SerializedRowGroupWriter<'_, W>, arrays: &[ArrayRef], levels: &mut [Vec], ) -> Result<()> { diff --git a/parquet/src/arrow/schema.rs b/parquet/src/arrow/schema.rs index 820aa7e7a817..5416e4078538 100644 --- a/parquet/src/arrow/schema.rs +++ b/parquet/src/arrow/schema.rs @@ -1591,7 +1591,7 @@ mod tests { // write to an empty parquet file so that schema is serialized let file = tempfile::tempfile().unwrap(); - let mut writer = ArrowWriter::try_new( + let writer = ArrowWriter::try_new( file.try_clone().unwrap(), Arc::new(schema.clone()), None, @@ -1660,7 +1660,7 @@ mod tests { // write to an empty parquet file so that schema is serialized let file = tempfile::tempfile().unwrap(); - let mut writer = ArrowWriter::try_new( + let writer = ArrowWriter::try_new( file.try_clone().unwrap(), Arc::new(schema.clone()), None, diff --git a/parquet/src/column/mod.rs b/parquet/src/column/mod.rs index e068db3c2265..93a4f00d2eef 100644 --- a/parquet/src/column/mod.rs +++ b/parquet/src/column/mod.rs @@ -44,7 +44,7 @@ //! file::{ //! properties::WriterProperties, //! reader::{FileReader, SerializedFileReader}, -//! writer::{FileWriter, SerializedFileWriter}, +//! writer::SerializedFileWriter, //! }, //! schema::parser::parse_message_type, //! }; @@ -66,17 +66,17 @@ //! let props = Arc::new(WriterProperties::builder().build()); //! let file = fs::File::create(path).unwrap(); //! let mut writer = SerializedFileWriter::new(file, schema, props).unwrap(); -//! { -//! let mut row_group_writer = writer.next_row_group().unwrap(); -//! while let Some(mut col_writer) = row_group_writer.next_column().unwrap() { -//! col_writer -//! .typed::() -//! .write_batch(&[1, 2, 3], Some(&[3, 3, 3, 2, 2]), Some(&[0, 1, 0, 1, 1])) -//! .unwrap(); -//! col_writer.close().unwrap(); -//! } -//! row_group_writer.close().unwrap(); +//! +//! let mut row_group_writer = writer.next_row_group().unwrap(); +//! while let Some(mut col_writer) = row_group_writer.next_column().unwrap() { +//! col_writer +//! .typed::() +//! .write_batch(&[1, 2, 3], Some(&[3, 3, 3, 2, 2]), Some(&[0, 1, 0, 1, 1])) +//! .unwrap(); +//! col_writer.close().unwrap(); //! } +//! row_group_writer.close().unwrap(); +//! //! writer.close().unwrap(); //! //! // Reading data using column reader API. diff --git a/parquet/src/file/mod.rs b/parquet/src/file/mod.rs index a9d79e1a2ed0..d293dc7731ad 100644 --- a/parquet/src/file/mod.rs +++ b/parquet/src/file/mod.rs @@ -32,7 +32,7 @@ //! use parquet::{ //! file::{ //! properties::WriterProperties, -//! writer::{FileWriter, SerializedFileWriter}, +//! writer::SerializedFileWriter, //! }, //! schema::parser::parse_message_type, //! }; @@ -48,14 +48,12 @@ //! let props = Arc::new(WriterProperties::builder().build()); //! let file = fs::File::create(&path).unwrap(); //! let mut writer = SerializedFileWriter::new(file, schema, props).unwrap(); -//! { -//! let mut row_group_writer = writer.next_row_group().unwrap(); -//! while let Some(mut col_writer) = row_group_writer.next_column().unwrap() { -//! // ... write values to a column writer -//! col_writer.close().unwrap() -//! } -//! row_group_writer.close().unwrap(); +//! let mut row_group_writer = writer.next_row_group().unwrap(); +//! while let Some(mut col_writer) = row_group_writer.next_column().unwrap() { +//! // ... write values to a column writer +//! col_writer.close().unwrap() //! } +//! 
 //! writer.close().unwrap();
 //!
 //! let bytes = fs::read(&path).unwrap();
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index 8e9e0824ad64..646550dcb6be 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -37,6 +37,7 @@ use crate::file::{
     statistics::to_thrift as statistics_to_thrift, FOOTER_SIZE, PARQUET_MAGIC,
 };
 use crate::schema::types::{self, SchemaDescPtr, SchemaDescriptor, TypePtr};
+use crate::util::io::TryClone;
 
 /// A wrapper around a [`Write`] that keeps track of the number
 /// of bytes that have been written
@@ -72,62 +73,6 @@ impl<W: Write> Write for TrackedWrite<W> {
     }
 }
 
-// ----------------------------------------------------------------------
-// APIs for file & row group writers
-
-/// Parquet file writer API.
-/// Provides methods to write row groups sequentially.
-///
-/// The main workflow should be as following:
-/// - Create file writer, this will open a new file and potentially write some metadata.
-/// - Request a new row group writer by calling `next_row_group`.
-/// - Once finished writing row group, close row group writer by passing it into
-///   `close_row_group` method - this will finalise row group metadata and update metrics.
-/// - Write subsequent row groups, if necessary.
-/// - After all row groups have been written, close the file writer using `close` method.
-pub trait FileWriter {
-    /// Creates new row group from this file writer.
-    /// In case of IO error or Thrift error, returns `Err`.
-    ///
-    /// There is no limit on a number of row groups in a file; however, row groups have
-    /// to be written sequentially. Every time the next row group is requested, the
-    /// previous row group must be finalised and closed using `RowGroupWriter::close` method.
-    fn next_row_group(&mut self) -> Result<Box<dyn RowGroupWriter + '_>>;
-
-    /// Closes and finalises file writer, returning the file metadata.
-    ///
-    /// All row groups must be appended before this method is called.
-    /// No writes are allowed after this point.
-    ///
-    /// Can be called multiple times. It is up to implementation to either result in
-    /// no-op, or return an `Err` for subsequent calls.
-    fn close(&mut self) -> Result<parquet_format::FileMetaData>;
-}
-
-/// Parquet row group writer API.
-/// Provides methods to access column writers in an iterator-like fashion, order is
-/// guaranteed to match the order of schema leaves (column descriptors).
-///
-/// All columns should be written sequentially; the main workflow is:
-/// - Request the next column using `next_column` method - this will return `None` if no
-///   more columns are available to write.
-/// - Once done writing a column, close column writer with `close`
-/// - Once all columns have been written, close row group writer with `close` method -
-///   it will return row group metadata and is no-op on already closed row group.
-pub trait RowGroupWriter {
-    /// Returns the next column writer, if available; otherwise returns `None`.
-    /// In case of any IO error or Thrift error, or if row group writer has already been
-    /// closed returns `Err`.
-    fn next_column(&mut self) -> Result<Option<SerializedColumnWriter<'_>>>;
-
-    /// Closes this row group writer and returns row group metadata.
-    /// After calling this method row group writer must not be used.
-    ///
-    /// Can be called multiple times. In subsequent calls will result in no-op and return
-    /// already created row group metadata.
-    fn close(&mut self) -> Result<RowGroupMetaDataPtr>;
-}
-
 /// Callback invoked on closing a column chunk, arguments are:
 ///
 /// - the number of bytes written
@@ -142,11 +87,23 @@ pub type OnCloseColumnChunk<'a> =
 /// - the row group metadata
 pub type OnCloseRowGroup<'a> = Box<dyn FnOnce(RowGroupMetaDataPtr) -> Result<()> + 'a>;
 
+#[deprecated = "use std::io::Write"]
+pub trait ParquetWriter: Write + std::io::Seek + TryClone {}
+#[allow(deprecated)]
+impl<T: Write + std::io::Seek + TryClone> ParquetWriter for T {}
+
 // ----------------------------------------------------------------------
 // Serialized impl for file & row group writers
 
-/// A serialized implementation for Parquet [`FileWriter`].
-/// See documentation on file writer for more information.
+/// Parquet file writer API.
+/// Provides methods to write row groups sequentially.
+///
+/// The main workflow is as follows:
+/// - Create a file writer; this will open a new file and potentially write some metadata.
+/// - Request a new row group writer by calling `next_row_group`.
+/// - Once finished writing a row group, close it by calling `close`.
+/// - Write subsequent row groups, if necessary.
+/// - After all row groups have been written, close the file writer using the `close` method.
 pub struct SerializedFileWriter<W: Write> {
     buf: TrackedWrite<W>,
     schema: TypePtr,
@@ -154,7 +111,6 @@ impl<W: Write> SerializedFileWriter<W> {
     props: WriterPropertiesPtr,
     row_groups: Vec<RowGroupMetaDataPtr>,
     row_group_index: usize,
-    is_closed: bool,
 }
 
 impl<W: Write> SerializedFileWriter<W> {
@@ -169,10 +125,47 @@ impl<W: Write> SerializedFileWriter<W> {
             props: properties,
             row_groups: vec![],
             row_group_index: 0,
-            is_closed: false,
         })
     }
 
+    /// Creates new row group from this file writer.
+    /// In case of IO error or Thrift error, returns `Err`.
+    ///
+    /// There is no limit on the number of row groups in a file; however, row groups have
+    /// to be written sequentially. Every time the next row group is requested, the
+    /// previous row group must be finalised and closed using `SerializedRowGroupWriter::close`.
+    pub fn next_row_group(&mut self) -> Result<SerializedRowGroupWriter<'_, W>> {
+        self.assert_previous_writer_closed()?;
+        self.row_group_index += 1;
+
+        let row_groups = &mut self.row_groups;
+        let on_close = |metadata| {
+            row_groups.push(metadata);
+            Ok(())
+        };
+
+        let row_group_writer = SerializedRowGroupWriter::new(
+            self.descr.clone(),
+            self.props.clone(),
+            &mut self.buf,
+            Some(Box::new(on_close)),
+        );
+        Ok(row_group_writer)
+    }
+
+    /// Closes and finalises file writer, returning the file metadata.
+    ///
+    /// All row groups must be appended before this method is called.
+    /// No writes are allowed after this point.
+    ///
+    /// This method consumes the writer, so it can only be called once; it
+    /// returns an `Err` if the previous row group writer was not closed.
+    pub fn close(mut self) -> Result<parquet_format::FileMetaData> {
+        self.assert_previous_writer_closed()?;
+        let metadata = self.write_metadata()?;
+        Ok(metadata)
+    }
+
     /// Writes magic bytes at the beginning of the file.
     fn start_file(buf: &mut TrackedWrite<W>) -> Result<()> {
         buf.write_all(&PARQUET_MAGIC)?;
@@ -220,15 +213,6 @@ impl<W: Write> SerializedFileWriter<W> {
         Ok(file_metadata)
     }
 
-    #[inline]
-    fn assert_not_closed(&self) -> Result<()> {
-        if self.is_closed {
-            Err(general_err!("File writer is closed"))
-        } else {
-            Ok(())
-        }
-    }
-
     #[inline]
     fn assert_previous_writer_closed(&self) -> Result<()> {
         if self.row_group_index != self.row_groups.len() {
@@ -239,40 +223,16 @@
     }
 }
 
-impl<W: Write> FileWriter for SerializedFileWriter<W> {
-    #[inline]
-    fn next_row_group(&mut self) -> Result<Box<dyn RowGroupWriter + '_>> {
-        self.assert_not_closed()?;
-        self.assert_previous_writer_closed()?;
-        self.row_group_index += 1;
-
-        let row_groups = &mut self.row_groups;
-        let on_close = |metadata| {
-            row_groups.push(metadata);
-            Ok(())
-        };
-
-        let row_group_writer = SerializedRowGroupWriter::new(
-            self.descr.clone(),
-            self.props.clone(),
-            &mut self.buf,
-            Some(Box::new(on_close)),
-        );
-        Ok(Box::new(row_group_writer))
-    }
-
-    fn close(&mut self) -> Result<parquet_format::FileMetaData> {
-        self.assert_not_closed()?;
-        self.assert_previous_writer_closed()?;
-        let metadata = self.write_metadata()?;
-        self.is_closed = true;
-        Ok(metadata)
-    }
-}
-
-/// A serialized implementation for Parquet [`RowGroupWriter`].
-/// Coordinates writing of a row group with column writers.
-/// See documentation on row group writer for more information.
+/// Parquet row group writer API.
+/// Provides methods to access column writers in an iterator-like fashion; order is
+/// guaranteed to match the order of schema leaves (column descriptors).
+///
+/// All columns should be written sequentially; the main workflow is:
+/// - Request the next column using the `next_column` method - this will return `None`
+///   if no more columns are available to write.
+/// - Once done writing a column, close the column writer with `close`.
+/// - Once all columns have been written, close the row group writer with the `close`
+///   method - it consumes the writer and returns the row group metadata.
 pub struct SerializedRowGroupWriter<'a, W: Write> {
     descr: SchemaDescPtr,
     props: WriterPropertiesPtr,
@@ -312,28 +272,10 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
         }
     }
 
-    #[inline]
-    fn assert_closed(&self) -> Result<()> {
-        if self.row_group_metadata.is_some() {
-            Err(general_err!("Row group writer is closed"))
-        } else {
-            Ok(())
-        }
-    }
-
-    #[inline]
-    fn assert_previous_writer_closed(&self) -> Result<()> {
-        if self.column_index != self.column_chunks.len() {
-            Err(general_err!("Previous column writer was not closed"))
-        } else {
-            Ok(())
-        }
-    }
-}
-
-impl<'a, W: Write> RowGroupWriter for SerializedRowGroupWriter<'a, W> {
-    fn next_column(&mut self) -> Result<Option<SerializedColumnWriter<'_>>> {
-        self.assert_closed()?;
+    /// Returns the next column writer, if available; otherwise returns `None`.
+    /// In case of any IO error or Thrift error, or if the previous column writer
+    /// was not closed, returns `Err`.
+    pub fn next_column(&mut self) -> Result<Option<SerializedColumnWriter<'_>>> {
         self.assert_previous_writer_closed()?;
 
         if self.column_index >= self.descr.num_columns() {
@@ -376,7 +318,12 @@ impl<'a, W: Write> RowGroupWriter for SerializedRowGroupWriter<'a, W> {
         )))
     }
 
-    fn close(&mut self) -> Result<RowGroupMetaDataPtr> {
+    /// Closes this row group writer and returns row group metadata.
+    /// This method consumes the writer, so the row group can only be
+    /// closed once.
+    ///
+    /// Returns an `Err` if the previous column writer was not closed.
+    pub fn close(mut self) -> Result<RowGroupMetaDataPtr> {
         if self.row_group_metadata.is_none() {
             self.assert_previous_writer_closed()?;
 
@@ -398,6 +345,15 @@
         let metadata = self.row_group_metadata.as_ref().unwrap().clone();
         Ok(metadata)
     }
+
+    #[inline]
+    fn assert_previous_writer_closed(&self) -> Result<()> {
+        if self.column_index != self.column_chunks.len() {
+            Err(general_err!("Previous column writer was not closed"))
+        } else {
+            Ok(())
+        }
+    }
 }
 
 /// A wrapper around a [`ColumnWriter`] that invokes a callback on [`Self::close`]
@@ -599,48 +555,6 @@ mod tests {
     use crate::record::RowAccessor;
     use crate::util::memory::ByteBufferPtr;
 
-    #[test]
-    fn test_file_writer_error_after_close() {
-        let file = tempfile::tempfile().unwrap();
-        let schema = Arc::new(types::Type::group_type_builder("schema").build().unwrap());
-        let props = Arc::new(WriterProperties::builder().build());
-        let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
-        writer.close().unwrap();
-        {
-            let res = writer.next_row_group();
-            assert!(res.is_err());
-            if let Err(err) = res {
-                assert_eq!(format!("{}", err), "Parquet error: File writer is closed");
-            }
-        }
-        {
-            let res = writer.close();
-            assert!(res.is_err());
-            if let Err(err) = res {
-                assert_eq!(format!("{}", err), "Parquet error: File writer is closed");
-            }
-        }
-    }
-
-    #[test]
-    fn test_row_group_writer_error_after_close() {
-        let file = tempfile::tempfile().unwrap();
-        let schema = Arc::new(types::Type::group_type_builder("schema").build().unwrap());
-        let props = Arc::new(WriterProperties::builder().build());
-        let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
-        let mut row_group_writer = writer.next_row_group().unwrap();
-        row_group_writer.close().unwrap();
-
-        let res = row_group_writer.next_column();
-        assert!(res.is_err());
-        if let Err(err) = res {
-            assert_eq!(
-                format!("{}", err),
-                "Parquet error: Row group writer is closed"
-            );
-        }
-    }
-
     #[test]
     fn test_row_group_writer_error_not_all_columns_written() {
         let file = tempfile::tempfile().unwrap();
@@ -656,7 +570,7 @@ mod tests {
         );
         let props = Arc::new(WriterProperties::builder().build());
         let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
-        let mut row_group_writer = writer.next_row_group().unwrap();
+        let row_group_writer = writer.next_row_group().unwrap();
         let res = row_group_writer.close();
         assert!(res.is_err());
         if let Err(err) = res {
@@ -728,7 +642,7 @@ mod tests {
             .unwrap(),
         );
         let props = Arc::new(WriterProperties::builder().build());
-        let mut writer =
+        let writer =
             SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
         writer.close().unwrap();
 
@@ -758,7 +672,7 @@ mod tests {
             )]))
             .build(),
         );
-        let mut writer =
+        let writer =
             SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
         writer.close().unwrap();
 
@@ -804,7 +718,7 @@ mod tests {
             .set_writer_version(WriterVersion::PARQUET_2_0)
             .build(),
         );
-        let mut writer =
+        let writer =
             SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
         writer.close().unwrap();
 
diff --git a/parquet/src/record/record_writer.rs b/parquet/src/record/record_writer.rs
index fb8bfbb79aff..fe803a7ff4ef 100644
--- a/parquet/src/record/record_writer.rs
+++ b/parquet/src/record/record_writer.rs
@@ -18,12 +18,12 @@
 use crate::schema::types::TypePtr;
 
 use super::super::errors::ParquetError;
-use super::super::file::writer::RowGroupWriter;
+use super::super::file::writer::SerializedRowGroupWriter;
 
 pub trait RecordWriter<T> {
-    fn write_to_row_group(
+    fn write_to_row_group<W: std::io::Write>(
         &self,
-        row_group_writer: &mut dyn RowGroupWriter,
+        row_group_writer: &mut SerializedRowGroupWriter<W>,
     ) -> Result<(), ParquetError>;
 
     /// Generated schema
diff --git a/parquet/src/util/cursor.rs b/parquet/src/util/cursor.rs
index a3b5ceccd763..ff7067fcbcad 100644
--- a/parquet/src/util/cursor.rs
+++ b/parquet/src/util/cursor.rs
@@ -15,8 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::io::{self, Error, ErrorKind, Read, Seek, SeekFrom};
-use std::sync::Arc;
+use crate::util::io::TryClone;
+use std::io::{self, Cursor, Error, ErrorKind, Read, Seek, SeekFrom, Write};
+use std::sync::{Arc, Mutex};
 use std::{cmp, fmt};
 
 /// This is object to use if your file is already in memory.
@@ -131,6 +132,73 @@ impl Seek for SliceableCursor {
     }
 }
 
+/// Use this type to write Parquet to memory rather than a file.
+#[deprecated = "use Vec<u8> instead"]
+#[derive(Debug, Default, Clone)]
+pub struct InMemoryWriteableCursor {
+    buffer: Arc<Mutex<Cursor<Vec<u8>>>>,
+}
+
+#[allow(deprecated)]
+impl InMemoryWriteableCursor {
+    /// Consume this instance and return the underlying buffer, as long as there are no
+    /// other references to this instance.
+    pub fn into_inner(self) -> Option<Vec<u8>> {
+        Arc::try_unwrap(self.buffer)
+            .ok()
+            .and_then(|mutex| mutex.into_inner().ok())
+            .map(|cursor| cursor.into_inner())
+    }
+
+    /// Returns a clone of the underlying buffer
+    pub fn data(&self) -> Vec<u8> {
+        let inner = self.buffer.lock().unwrap();
+        inner.get_ref().to_vec()
+    }
+
+    /// Returns the length of the underlying buffer
+    pub fn len(&self) -> usize {
+        let inner = self.buffer.lock().unwrap();
+        inner.get_ref().len()
+    }
+
+    /// Returns true if the underlying buffer contains no elements
+    pub fn is_empty(&self) -> bool {
+        let inner = self.buffer.lock().unwrap();
+        inner.get_ref().is_empty()
+    }
+}
+
+#[allow(deprecated)]
+impl TryClone for InMemoryWriteableCursor {
+    fn try_clone(&self) -> std::io::Result<Self> {
+        Ok(Self {
+            buffer: self.buffer.clone(),
+        })
+    }
+}
+
+#[allow(deprecated)]
+impl Write for InMemoryWriteableCursor {
+    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
+        let mut inner = self.buffer.lock().unwrap();
+        inner.write(buf)
+    }
+
+    fn flush(&mut self) -> std::io::Result<()> {
+        let mut inner = self.buffer.lock().unwrap();
+        inner.flush()
+    }
+}
+
+#[allow(deprecated)]
+impl Seek for InMemoryWriteableCursor {
+    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
+        let mut inner = self.buffer.lock().unwrap();
+        inner.seek(pos)
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/parquet/src/util/io.rs b/parquet/src/util/io.rs
index f982ca0328c2..a7b5e73074c6 100644
--- a/parquet/src/util/io.rs
+++ b/parquet/src/util/io.rs
@@ -18,6 +18,8 @@
 use std::{cell::RefCell, cmp, fmt, io::*};
 
 use crate::file::reader::Length;
+#[allow(deprecated)]
+use crate::file::writer::ParquetWriter;
 
 const DEFAULT_BUF_SIZE: usize = 8 * 1024;
 
@@ -153,6 +155,52 @@ impl<R: ParquetReader> Length for FileSource<R> {
         self.end - self.start
     }
 }
+
+/// Struct that represents `File` output stream with position tracking.
+/// Used as a sink in file writer.
+#[deprecated = "use TrackedWrite instead"]
+#[allow(deprecated)]
+pub struct FileSink<W: ParquetWriter> {
+    buf: BufWriter<W>,
+    // This is not necessarily position in the underlying file,
+    // but rather current position in the sink.
+    pos: u64,
+}
+
+#[allow(deprecated)]
+impl<W: ParquetWriter> FileSink<W> {
+    /// Creates new file sink.
+    /// Position is set to the file's current position.
+    pub fn new(buf: &W) -> Self {
+        let mut owned_buf = buf.try_clone().unwrap();
+        let pos = owned_buf.seek(SeekFrom::Current(0)).unwrap();
+        Self {
+            buf: BufWriter::new(owned_buf),
+            pos,
+        }
+    }
+}
+
+#[allow(deprecated)]
+impl<W: ParquetWriter> Write for FileSink<W> {
+    fn write(&mut self, buf: &[u8]) -> Result<usize> {
+        let num_bytes = self.buf.write(buf)?;
+        self.pos += num_bytes as u64;
+        Ok(num_bytes)
+    }
+
+    fn flush(&mut self) -> Result<()> {
+        self.buf.flush()
+    }
+}
+
+#[allow(deprecated)]
+impl<W: ParquetWriter> Position for FileSink<W> {
+    fn pos(&self) -> u64 {
+        self.pos
+    }
+}
 
 // Position implementation for Cursor to use in various tests.
 impl<'a> Position for Cursor<&'a mut Vec<u8>> {
     fn pos(&self) -> u64 {
@@ -229,6 +277,30 @@ mod tests {
         assert_eq!(buf, vec![b'P', b'A', b'R', b'1']);
     }
 
+    #[test]
+    #[allow(deprecated)]
+    fn test_io_write_with_pos() {
+        let mut file = tempfile::tempfile().unwrap();
+        file.write_all(&[b'a', b'b', b'c']).unwrap();
+
+        // Write into sink
+        let mut sink = FileSink::new(&file);
+        assert_eq!(sink.pos(), 3);
+
+        sink.write_all(&[b'd', b'e', b'f', b'g']).unwrap();
+        assert_eq!(sink.pos(), 7);
+
+        sink.flush().unwrap();
+        assert_eq!(sink.pos(), file.seek(SeekFrom::Current(0)).unwrap());
+
+        // Read data using file chunk
+        let mut res = vec![0u8; 7];
+        let mut chunk =
+            FileSource::new(&file, 0, file.metadata().unwrap().len() as usize);
+        chunk.read_exact(&mut res[..]).unwrap();
+        assert_eq!(res, vec![b'a', b'b', b'c', b'd', b'e', b'f', b'g']);
+    }
+
     #[test]
     fn test_io_large_read() {
         // Generate repeated 'abcdef' pattern and write it into a file
diff --git a/parquet/tests/boolean_writer.rs b/parquet/tests/boolean_writer.rs
index 20884295e6fe..dc2eccfbf3c3 100644
--- a/parquet/tests/boolean_writer.rs
+++ b/parquet/tests/boolean_writer.rs
@@ -19,7 +19,6 @@ use parquet::data_type::BoolType;
 use parquet::file::properties::WriterProperties;
 use parquet::file::reader::FileReader;
 use parquet::file::serialized_reader::SerializedFileReader;
-use parquet::file::writer::FileWriter;
 use parquet::file::writer::SerializedFileWriter;
 use parquet::schema::parser::parse_message_type;
 use std::fs;
@@ -62,7 +61,6 @@ fn it_writes_data_without_hanging() {
         }
         let rg_md = row_group_writer.close().expect("close row group");
         println!("total rows written: {}", rg_md.num_rows());
-        row_group_writer.close().expect("close row groups");
     }
     writer.close().expect("close writer");
 
diff --git a/parquet_derive/src/lib.rs b/parquet_derive/src/lib.rs
index 9a10e55616f1..fc7af20ca3f1 100644
--- a/parquet_derive/src/lib.rs
+++ b/parquet_derive/src/lib.rs
@@ -98,9 +98,9 @@ pub fn parquet_record_writer(input: proc_macro::TokenStream) -> proc_macro::Toke
     (quote! {
         impl #generics RecordWriter<#derived_for #generics> for &[#derived_for #generics] {
-            fn write_to_row_group(
+            fn write_to_row_group<W: std::io::Write>(
                 &self,
-                row_group_writer: &mut dyn parquet::file::writer::RowGroupWriter
+                row_group_writer: &mut parquet::file::writer::SerializedRowGroupWriter<'_, W>
             ) -> Result<(), parquet::errors::ParquetError> {
                 let mut row_group_writer = row_group_writer;
                 let records = &self; // Used by all the writer snippets to be more clear
diff --git a/parquet_derive_test/src/lib.rs b/parquet_derive_test/src/lib.rs
index 94aa32a9c8de..0222ac7b3e8c 100644
--- a/parquet_derive_test/src/lib.rs
+++ b/parquet_derive_test/src/lib.rs
@@ -56,7 +56,7 @@ mod tests {
     use parquet::{
         file::{
             properties::WriterProperties,
-            writer::{FileWriter, SerializedFileWriter},
+            writer::SerializedFileWriter,
         },
         schema::parser::parse_message_type,
     };
@@ -131,13 +131,9 @@ mod tests {
         let mut writer =
             SerializedFileWriter::new(file, generated_schema, props).unwrap();
 
-        {
-            let mut row_group = writer.next_row_group().unwrap();
-            drs.as_slice()
-                .write_to_row_group(row_group.as_mut())
-                .unwrap();
-            row_group.close().unwrap();
-        }
+        let mut row_group = writer.next_row_group().unwrap();
+        drs.as_slice().write_to_row_group(&mut row_group).unwrap();
+        row_group.close().unwrap();
 
         writer.close().unwrap();
     }
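
---

Editor's note - not part of the original patch. For reviewers who want to try the new
surface, below is a minimal sketch of the write path after this change. It assumes the
parquet crate as of this commit; the schema string, column values, and variable names
are illustrative only. Because `SerializedFileWriter` is now generic over plain
`std::io::Write` (rather than the deprecated `ParquetWriter`, which also required
`Seek` and `TryClone`), an ordinary `Vec<u8>` works as a sink where
`InMemoryWriteableCursor` was previously needed:

use std::sync::Arc;

use parquet::data_type::Int32Type;
use parquet::file::properties::WriterProperties;
use parquet::file::writer::SerializedFileWriter;
use parquet::schema::parser::parse_message_type;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // A single required INT32 column; any schema would do here.
    let schema = Arc::new(parse_message_type(
        "message schema { REQUIRED INT32 b; }",
    )?);
    let props = Arc::new(WriterProperties::builder().build());

    // Any `W: Write` is accepted as the sink, including an in-memory buffer.
    let buffer: Vec<u8> = Vec::new();
    let mut writer = SerializedFileWriter::new(buffer, schema, props)?;

    let mut row_group_writer = writer.next_row_group()?;
    while let Some(mut col_writer) = row_group_writer.next_column()? {
        col_writer
            .typed::<Int32Type>()
            .write_batch(&[1, 2, 3], None, None)?;
        // Each column writer must be closed before requesting the next one.
        col_writer.close()?;
    }
    row_group_writer.close()?;

    // `close` now consumes the writer, so the use-after-close cases that the
    // removed `assert_not_closed` tests exercised become compile-time errors.
    writer.close()?;
    Ok(())
}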