Chunked object reader everywhere #24

Closed · wants to merge 7 commits
ballista/rust/core/src/execution_plans/shuffle_reader.rs (1 addition, 1 deletion)
@@ -201,7 +201,7 @@ fn stats_for_partitions(

async fn fetch_partition(
location: &PartitionLocation,
) -> Result<Pin<Box<dyn RecordBatchStream + Send + Sync>>> {
) -> Result<Pin<Box<dyn RecordBatchStream + Send>>> {
let metadata = &location.executor_meta;
let partition_id = &location.partition_id;
let mut ballista_client =

ballista/rust/core/src/serde/logical_plan/mod.rs (2 additions, 2 deletions)
@@ -850,7 +850,7 @@ mod roundtrip_tests {
use core::panic;
use datafusion::datasource::listing::ListingTable;
use datafusion::datasource::object_store::{
FileMetaStream, ListEntryStream, ObjectReader, ObjectStore, SizedFile,
FileMetaStream, ListEntryStream, ObjectReaderWrapper, ObjectStore, SizedFile,
};
use datafusion::error::DataFusionError;
use datafusion::{
@@ -895,7 +895,7 @@
fn file_reader(
&self,
_file: SizedFile,
) -> datafusion::error::Result<Arc<dyn ObjectReader>> {
) -> datafusion::error::Result<ObjectReaderWrapper> {
Err(DataFusionError::NotImplemented(
"this is only a test object store".to_string(),
))

ballista/rust/core/src/utils.rs (4 additions, 4 deletions)
@@ -59,7 +59,7 @@ use futures::{Stream, StreamExt};
/// Stream data to disk in Arrow IPC format

pub async fn write_stream_to_disk(
stream: &mut Pin<Box<dyn RecordBatchStream + Send + Sync>>,
stream: &mut Pin<Box<dyn RecordBatchStream + Send>>,
path: &str,
disk_write_metric: &metrics::Time,
) -> Result<PartitionStats> {
@@ -98,7 +98,7 @@ pub async fn write_stream_to_disk(
}

pub async fn collect_stream(
stream: &mut Pin<Box<dyn RecordBatchStream + Send + Sync>>,
stream: &mut Pin<Box<dyn RecordBatchStream + Send>>,
) -> Result<Vec<RecordBatch>> {
let mut batches = vec![];
while let Some(batch) = stream.next().await {
@@ -310,13 +310,13 @@ impl<T: 'static + AsLogicalPlan> QueryPlanner for BallistaQueryPlanner<T> {
}

pub struct WrappedStream {
stream: Pin<Box<dyn Stream<Item = ArrowResult<RecordBatch>> + Send + Sync>>,
stream: Pin<Box<dyn Stream<Item = ArrowResult<RecordBatch>> + Send>>,
schema: SchemaRef,
}

impl WrappedStream {
pub fn new(
stream: Pin<Box<dyn Stream<Item = ArrowResult<RecordBatch>> + Send + Sync>>,
stream: Pin<Box<dyn Stream<Item = ArrowResult<RecordBatch>> + Send>>,
schema: SchemaRef,
) -> Self {
Self { stream, schema }

datafusion/src/datasource/file_format/avro.rs (3 additions, 3 deletions)
@@ -27,7 +27,7 @@ use futures::StreamExt;

use super::FileFormat;
use crate::avro_to_arrow::read_avro_schema_from_reader;
use crate::datasource::object_store::{ObjectReader, ObjectReaderStream};
use crate::datasource::object_store::{ObjectReaderStream, ObjectReaderWrapper};
use crate::error::Result;
use crate::logical_plan::Expr;
use crate::physical_plan::file_format::{AvroExec, FileScanConfig};
@@ -49,15 +49,15 @@
async fn infer_schema(&self, mut readers: ObjectReaderStream) -> Result<SchemaRef> {
let mut schemas = vec![];
while let Some(obj_reader) = readers.next().await {
let mut reader = obj_reader?.sync_reader()?;
let mut reader = obj_reader?;
let schema = read_avro_schema_from_reader(&mut reader)?;
schemas.push(schema);
}
let merged_schema = Schema::try_merge(schemas)?;
Ok(Arc::new(merged_schema))
}

async fn infer_stats(&self, _reader: Arc<dyn ObjectReader>) -> Result<Statistics> {
async fn infer_stats(&self, _reader: ObjectReaderWrapper) -> Result<Statistics> {
Ok(Statistics::default())
}


datafusion/src/datasource/file_format/csv.rs (3 additions, 3 deletions)
@@ -26,7 +26,7 @@ use async_trait::async_trait;
use futures::StreamExt;

use super::FileFormat;
use crate::datasource::object_store::{ObjectReader, ObjectReaderStream};
use crate::datasource::object_store::{ObjectReaderStream, ObjectReaderWrapper};
use crate::error::Result;
use crate::logical_plan::Expr;
use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
@@ -98,7 +98,7 @@ impl FileFormat for CsvFormat {
let mut records_to_read = self.schema_infer_max_rec.unwrap_or(std::usize::MAX);

while let Some(obj_reader) = readers.next().await {
let mut reader = obj_reader?.sync_reader()?;
let mut reader = obj_reader?;
let (schema, records_read) = arrow::csv::reader::infer_reader_schema(
&mut reader,
self.delimiter,
@@ -119,7 +119,7 @@
Ok(Arc::new(merged_schema))
}

async fn infer_stats(&self, _reader: Arc<dyn ObjectReader>) -> Result<Statistics> {
async fn infer_stats(&self, _reader: ObjectReaderWrapper) -> Result<Statistics> {
Ok(Statistics::default())
}


datafusion/src/datasource/file_format/json.rs (3 additions, 3 deletions)
@@ -30,7 +30,7 @@ use futures::StreamExt;

use super::FileFormat;
use super::FileScanConfig;
use crate::datasource::object_store::{ObjectReader, ObjectReaderStream};
use crate::datasource::object_store::{ObjectReaderStream, ObjectReaderWrapper};
use crate::error::Result;
use crate::logical_plan::Expr;
use crate::physical_plan::file_format::NdJsonExec;
@@ -64,7 +64,7 @@ impl FileFormat for JsonFormat {
let mut schemas = Vec::new();
let mut records_to_read = self.schema_infer_max_rec.unwrap_or(usize::MAX);
while let Some(obj_reader) = readers.next().await {
let mut reader = BufReader::new(obj_reader?.sync_reader()?);
let mut reader = BufReader::new(obj_reader?);
let iter = ValueIter::new(&mut reader, None);
let schema = infer_json_schema_from_iterator(iter.take_while(|_| {
let should_take = records_to_read > 0;
@@ -81,7 +81,7 @@
Ok(Arc::new(schema))
}

async fn infer_stats(&self, _reader: Arc<dyn ObjectReader>) -> Result<Statistics> {
async fn infer_stats(&self, _reader: ObjectReaderWrapper) -> Result<Statistics> {
Ok(Statistics::default())
}


datafusion/src/datasource/file_format/mod.rs (3 additions, 2 deletions)
@@ -32,9 +32,10 @@ use crate::logical_plan::Expr;
use crate::physical_plan::file_format::FileScanConfig;
use crate::physical_plan::{ExecutionPlan, Statistics};

use crate::datasource::object_store::ObjectReaderWrapper;
use async_trait::async_trait;

use super::object_store::{ObjectReader, ObjectReaderStream};
use super::object_store::ObjectReaderStream;

/// This trait abstracts all the file format specific implementations
/// from the `TableProvider`. This helps code re-utilization across
@@ -53,7 +54,7 @@ pub trait FileFormat: Send + Sync + fmt::Debug {

/// Infer the statistics for the provided object. The cost and accuracy of the
/// estimated statistics might vary greatly between file formats.
async fn infer_stats(&self, reader: Arc<dyn ObjectReader>) -> Result<Statistics>;
async fn infer_stats(&self, reader: ObjectReaderWrapper) -> Result<Statistics>;

/// Take a list of files and convert it to the appropriate executor
/// according to this file format.

datafusion/src/datasource/file_format/parquet.rs (16 additions, 19 deletions)
@@ -18,7 +18,7 @@
//! Parquet format abstractions

use std::any::Any;
use std::io::Read;
use std::io::SeekFrom;
use std::sync::Arc;

use arrow::datatypes::Schema;
@@ -40,7 +40,7 @@ use crate::arrow::array::{
BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array,
};
use crate::arrow::datatypes::{DataType, Field};
use crate::datasource::object_store::{ObjectReader, ObjectReaderStream};
use crate::datasource::object_store::{ObjectReaderStream, ObjectReaderWrapper};
use crate::datasource::{create_max_min_accs, get_col_stats};
use crate::error::DataFusionError;
use crate::error::Result;
@@ -98,7 +98,7 @@ impl FileFormat for ParquetFormat {
Ok(Arc::new(merged_schema))
}

async fn infer_stats(&self, reader: Arc<dyn ObjectReader>) -> Result<Statistics> {
async fn infer_stats(&self, reader: ObjectReaderWrapper) -> Result<Statistics> {
let stats = fetch_statistics(reader)?;
Ok(stats)
}
@@ -268,19 +268,17 @@ fn summarize_min_max(
}

/// Read and parse the schema of the Parquet file at location `path`
fn fetch_schema(object_reader: Arc<dyn ObjectReader>) -> Result<Schema> {
let obj_reader = ChunkObjectReader(object_reader);
let file_reader = Arc::new(SerializedFileReader::new(obj_reader)?);
fn fetch_schema(object_reader: ObjectReaderWrapper) -> Result<Schema> {
let file_reader = Arc::new(SerializedFileReader::new(object_reader)?);
let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
let schema = arrow_reader.get_schema()?;

Ok(schema)
}

/// Read and parse the statistics of the Parquet file at location `path`
fn fetch_statistics(object_reader: Arc<dyn ObjectReader>) -> Result<Statistics> {
let obj_reader = ChunkObjectReader(object_reader);
let file_reader = Arc::new(SerializedFileReader::new(obj_reader)?);
fn fetch_statistics(object_reader: ObjectReaderWrapper) -> Result<Statistics> {
let file_reader = Arc::new(SerializedFileReader::new(object_reader)?);
let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
let schema = arrow_reader.get_schema()?;
let num_fields = schema.fields().len();
@@ -336,22 +334,21 @@ fn fetch_statistics(object_reader: Arc<dyn ObjectReader>) -> Result<Statistics>
Ok(statistics)
}

/// A wrapper around the object reader to make it implement `ChunkReader`
pub struct ChunkObjectReader(pub Arc<dyn ObjectReader>);

impl Length for ChunkObjectReader {
impl Length for ObjectReaderWrapper {
fn len(&self) -> u64 {
self.0.length()
self.0.lock().length()
}
}

impl ChunkReader for ChunkObjectReader {
type T = Box<dyn Read + Send + Sync>;
impl ChunkReader for ObjectReaderWrapper {
type T = Self;

fn get_read(&self, start: u64, length: usize) -> ParquetResult<Self::T> {
self.0
.sync_chunk_reader(start, length)
.map_err(|e| ParquetError::ArrowError(e.to_string()))
let mut r = self.0.lock();
r.seek(SeekFrom::Start(start))
.map_err(|e| ParquetError::ArrowError(e.to_string()))?;
r.set_limit(length);
Ok(self.clone())
}
}
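
Note (not part of the diff hunks above): `ObjectReaderWrapper` itself is defined in `datasource/object_store.rs`, which is not shown in this portion of the diff. Judging from its use above (`self.0.lock()`, `self.clone()`) and the `Arc<Mutex<...>>` construction in `local.rs` below, a rough sketch of the assumed shape follows; the trait bounds, the `set_limit`/`length` methods, and the omission of the async reader method are inferred, not confirmed.

```rust
// Hedged sketch only: the real definition lives in datasource/object_store.rs
// and is not part of this diff; names and bounds are inferred from usage.
use std::io::{Read, Seek, SeekFrom};
use std::sync::Arc;

use parking_lot::Mutex;

/// Assumed trait shape: a seekable reader over one object whose reads can be
/// capped at a byte limit (async methods omitted here).
pub trait ObjectReader: Read + Seek + Send {
    /// Cap the number of bytes subsequent reads may return in total.
    fn set_limit(&mut self, limit: usize);
    /// Total size of the underlying object in bytes.
    fn length(&self) -> u64;
}

/// Cloneable handle sharing a single `ObjectReader` behind a mutex, so the
/// same reader can be handed to parquet's `ChunkReader` API.
#[derive(Clone)]
pub struct ObjectReaderWrapper(pub Arc<Mutex<dyn ObjectReader>>);

impl Read for ObjectReaderWrapper {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        // Delegates to the shared reader; any limit set via `set_limit`
        // bounds how many bytes come back.
        self.0.lock().read(buf)
    }
}

impl Seek for ObjectReaderWrapper {
    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
        self.0.lock().seek(pos)
    }
}
```

Because `parking_lot::Mutex::lock()` returns the guard directly (no poisoning), the `Length` and `ChunkReader` impls above can call `self.0.lock()` without an `unwrap()`.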


datafusion/src/datasource/object_store/local.rs (39 additions, 20 deletions)
@@ -23,9 +23,11 @@ use std::sync::Arc;

use async_trait::async_trait;
use futures::{stream, AsyncRead, StreamExt};
use parking_lot::Mutex;

use crate::datasource::object_store::{
FileMeta, FileMetaStream, ListEntryStream, ObjectReader, ObjectStore,
FileMeta, FileMetaStream, ListEntryStream, ObjectReader, ObjectReaderWrapper,
ObjectStore,
};
use crate::datasource::PartitionedFile;
use crate::error::{DataFusionError, Result};
@@ -55,18 +57,46 @@ impl ObjectStore for LocalFileSystem {
todo!()
}

fn file_reader(&self, file: SizedFile) -> Result<Arc<dyn ObjectReader>> {
Ok(Arc::new(LocalFileReader::new(file)?))
fn file_reader(&self, file: SizedFile) -> Result<ObjectReaderWrapper> {
Ok(ObjectReaderWrapper(Arc::new(Mutex::new(
LocalFileReader::new(file)?,
))))
}
}

struct LocalFileReader {
file: SizedFile,
r: BufReader<File>,
total_size: u64,
limit: usize,
}

impl LocalFileReader {
fn new(file: SizedFile) -> Result<Self> {
Ok(Self { file })
Ok(Self {
r: BufReader::new(File::open(file.path)?),
total_size: file.size,
limit: file.size as usize,
})
}
}

impl Read for LocalFileReader {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
// read from current position to limit
if self.limit > 0 {
let read_len = std::cmp::min(self.limit, buf.len());
let read_len = self.r.read(&mut buf[..read_len])?;
self.limit -= read_len;
Ok(read_len)
} else {
Ok(0)
}
}
}

impl Seek for LocalFileReader {
fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
self.r.seek(pos)
}
}

@@ -82,23 +112,12 @@ impl ObjectReader for LocalFileReader {
)
}

fn sync_chunk_reader(
&self,
start: u64,
length: usize,
) -> Result<Box<dyn Read + Send + Sync>> {
// A new file descriptor is opened for each chunk reader.
// This okay because chunks are usually fairly large.
let mut file = File::open(&self.file.path)?;
file.seek(SeekFrom::Start(start))?;

let file = BufReader::new(file.take(length as u64));

Ok(Box::new(file))
fn set_limit(&mut self, limit: usize) {
self.limit = limit;
}

fn length(&self) -> u64 {
self.file.size
self.total_size
}
}

@@ -167,7 +186,7 @@ pub fn local_object_reader_stream(files: Vec<String>) -> ObjectReaderStream {
}

/// Helper method to convert a file location to a `LocalFileReader`
pub fn local_object_reader(file: String) -> Arc<dyn ObjectReader> {
pub fn local_object_reader(file: String) -> ObjectReaderWrapper {
LocalFileSystem
.file_reader(local_unpartitioned_file(file).file_meta.sized_file)
.expect("File not found")
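
A hypothetical usage sketch (not from this PR): reading one byte range through the wrapper follows the same seek-then-limit pattern the parquet `ChunkReader` impl uses. The `read_range` helper below is invented for illustration; it assumes the `ObjectReader`/`ObjectReaderWrapper` shapes sketched earlier and the import paths that appear in this diff.

```rust
use std::io::{Read, Seek, SeekFrom};

// Assumed re-export paths, matching the imports used in this diff.
use datafusion::datasource::object_store::{ObjectReader, ObjectReaderWrapper};

/// Hypothetical helper: read `length` bytes starting at offset `start`.
fn read_range(
    reader: &ObjectReaderWrapper,
    start: u64,
    length: usize,
) -> std::io::Result<Vec<u8>> {
    {
        // Position the shared reader and cap how many bytes the next reads return.
        let mut inner = reader.0.lock();
        inner.seek(SeekFrom::Start(start))?;
        inner.set_limit(length);
    }
    // The wrapper itself implements `Read`, bounded by the limit set above.
    let mut handle = reader.clone();
    let mut buf = Vec::with_capacity(length);
    handle.read_to_end(&mut buf)?;
    Ok(buf)
}
```

Compared with the removed `sync_chunk_reader`, which opened a fresh file descriptor for every chunk, this design appears to reuse a single reader per object, at the cost of serializing chunk reads behind the mutex.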