Remove ObjectStore from FileStream (#4533) (#4601)
* Remove ObjectStore from FileStream (#4533)

* Fix avro
tustvold authored Dec 14, 2022
1 parent a5cf577 commit 84d3ae8
Showing 5 changed files with 36 additions and 76 deletions.
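
At a glance: `FileOpener::open` no longer receives an `ObjectStore` argument, and `FileStream::new` no longer takes a `TaskContext`. Each format's `ExecutionPlan::execute` now resolves the store from the runtime environment and hands it to its opener up front. Condensed from the `file_stream.rs` diff below, the trait change looks like this:

```rust
// Before: FileStream resolved the ObjectStore from the TaskContext and
// threaded it into every opener call.
fn open(
    &self,
    store: Arc<dyn ObjectStore>,
    file_meta: FileMeta,
) -> Result<FileOpenFuture>;

// After: each opener captures its own Arc<dyn ObjectStore> when it is
// built in `execute`, so `open` only needs the file metadata.
fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture>;
```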
23 changes: 10 additions & 13 deletions datafusion/core/src/physical_plan/file_format/avro.rs
```diff
@@ -105,20 +105,20 @@ impl ExecutionPlan for AvroExec {
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
         use super::file_stream::FileStream;
+        let object_store = context
+            .runtime_env()
+            .object_store(&self.base_config.object_store_url)?;
+
         let config = Arc::new(private::AvroConfig {
             schema: Arc::clone(&self.base_config.file_schema),
             batch_size: context.session_config().batch_size(),
             projection: self.base_config.projected_file_column_names(),
+            object_store,
         });
         let opener = private::AvroOpener { config };
 
-        let stream = FileStream::new(
-            &self.base_config,
-            partition,
-            context,
-            opener,
-            self.metrics.clone(),
-        )?;
+        let stream =
+            FileStream::new(&self.base_config, partition, opener, self.metrics.clone())?;
         Ok(Box::pin(stream))
     }
@@ -157,6 +157,7 @@ mod private {
         pub schema: SchemaRef,
         pub batch_size: usize,
         pub projection: Option<Vec<String>>,
+        pub object_store: Arc<dyn ObjectStore>,
     }
 
     impl AvroConfig {
@@ -178,14 +179,10 @@ mod private {
     }
 
     impl FileOpener for AvroOpener {
-        fn open(
-            &self,
-            store: Arc<dyn ObjectStore>,
-            file_meta: FileMeta,
-        ) -> Result<FileOpenFuture> {
+        fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
             let config = self.config.clone();
             Ok(Box::pin(async move {
-                match store.get(file_meta.location()).await? {
+                match config.object_store.get(file_meta.location()).await? {
                     GetResult::File(file, _) => {
                         let reader = config.open(file)?;
                         Ok(futures::stream::iter(reader).boxed())
```
23 changes: 10 additions & 13 deletions datafusion/core/src/physical_plan/file_format/csv.rs
```diff
@@ -131,25 +131,25 @@ impl ExecutionPlan for CsvExec {
         partition: usize,
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
+        let object_store = context
+            .runtime_env()
+            .object_store(&self.base_config.object_store_url)?;
+
         let config = Arc::new(CsvConfig {
             batch_size: context.session_config().batch_size(),
             file_schema: Arc::clone(&self.base_config.file_schema),
             file_projection: self.base_config.file_column_projection_indices(),
             has_header: self.has_header,
             delimiter: self.delimiter,
+            object_store,
         });
 
         let opener = CsvOpener {
             config,
             file_compression_type: self.file_compression_type.to_owned(),
         };
-        let stream = FileStream::new(
-            &self.base_config,
-            partition,
-            context,
-            opener,
-            self.metrics.clone(),
-        )?;
+        let stream =
+            FileStream::new(&self.base_config, partition, opener, self.metrics.clone())?;
         Ok(Box::pin(stream) as SendableRecordBatchStream)
     }
@@ -184,6 +184,7 @@ struct CsvConfig {
     file_projection: Option<Vec<usize>>,
     has_header: bool,
     delimiter: u8,
+    object_store: Arc<dyn ObjectStore>,
 }
 
 impl CsvConfig {
@@ -208,15 +209,11 @@ struct CsvOpener {
 }
 
 impl FileOpener for CsvOpener {
-    fn open(
-        &self,
-        store: Arc<dyn ObjectStore>,
-        file_meta: FileMeta,
-    ) -> Result<FileOpenFuture> {
+    fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
         let config = self.config.clone();
         let file_compression_type = self.file_compression_type.to_owned();
         Ok(Box::pin(async move {
-            match store.get(file_meta.location()).await? {
+            match config.object_store.get(file_meta.location()).await? {
                 GetResult::File(file, _) => {
                     let decoder = file_compression_type.convert_read(file)?;
                     Ok(futures::stream::iter(config.open(decoder, true)).boxed())
```
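
For context, the `GetResult` these openers match on comes from the `object_store` crate (0.5.x at the time of this commit): a get either returns a handle to a local `std::fs::File` or a stream of byte chunks from a remote store. A minimal sketch of that branching, with the format-specific decoding elided — `open_location` is illustrative, not a DataFusion API:

```rust
use std::sync::Arc;

use futures::StreamExt;
use object_store::{path::Path, GetResult, ObjectStore};

// Illustrative only: the two arms the Avro/CSV openers above branch on,
// assuming object_store 0.5.x where GetResult is File or Stream.
async fn open_location(
    store: Arc<dyn ObjectStore>,
    location: &Path,
) -> object_store::Result<()> {
    match store.get(location).await? {
        // Local filesystem: a std::fs::File suitable for a synchronous
        // reader (e.g. `config.open(file)?` in the Avro opener).
        GetResult::File(file, _path) => {
            drop(file); // format-specific sync decoding would go here
        }
        // Remote store: decode incrementally from a stream of Bytes.
        GetResult::Stream(mut stream) => {
            while let Some(chunk) = stream.next().await {
                let _bytes = chunk?; // feed each chunk to the decoder
            }
        }
    }
    Ok(())
}
```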
38 changes: 6 additions & 32 deletions datafusion/core/src/physical_plan/file_format/file_stream.rs
```diff
@@ -23,22 +23,18 @@
 use std::collections::VecDeque;
 use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 use std::time::Instant;
 
 use arrow::datatypes::SchemaRef;
 use arrow::{error::Result as ArrowResult, record_batch::RecordBatch};
+use datafusion_common::ScalarValue;
 use futures::future::BoxFuture;
 use futures::stream::BoxStream;
 use futures::{ready, FutureExt, Stream, StreamExt};
-use object_store::ObjectStore;
-
-use datafusion_common::ScalarValue;
 
 use crate::datasource::listing::PartitionedFile;
 use crate::error::Result;
-use crate::execution::context::TaskContext;
 use crate::physical_plan::file_format::{
     FileMeta, FileScanConfig, PartitionColumnProjector,
 };
@@ -56,11 +52,7 @@ pub type FileOpenFuture =
 pub trait FileOpener: Unpin {
     /// Asynchronously open the specified file and return a stream
     /// of [`RecordBatch`]
-    fn open(
-        &self,
-        store: Arc<dyn ObjectStore>,
-        file_meta: FileMeta,
-    ) -> Result<FileOpenFuture>;
+    fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture>;
 }
 
 /// A stream that iterates record batch by record batch, file over file.
@@ -79,8 +71,6 @@ pub struct FileStream<F: FileOpener> {
     file_reader: F,
     /// The partition column projector
     pc_projector: PartitionColumnProjector,
-    /// the store from which to source the files.
-    object_store: Arc<dyn ObjectStore>,
     /// The stream state
     state: FileStreamState,
     /// File stream specific metrics
@@ -175,7 +165,6 @@ impl<F: FileOpener> FileStream<F> {
     pub fn new(
         config: &FileScanConfig,
         partition: usize,
-        context: Arc<TaskContext>,
         file_reader: F,
         metrics: ExecutionPlanMetricsSet,
     ) -> Result<Self> {
@@ -191,17 +180,12 @@ impl<F: FileOpener> FileStream<F> {
 
         let files = config.file_groups[partition].clone();
 
-        let object_store = context
-            .runtime_env()
-            .object_store(&config.object_store_url)?;
-
         Ok(Self {
             file_iter: files.into(),
             projected_schema,
             remain: config.limit,
             file_reader,
             pc_projector,
-            object_store,
             state: FileStreamState::Idle,
             file_stream_metrics: FileStreamMetrics::new(&metrics, partition),
             baseline_metrics: BaselineMetrics::new(&metrics, partition),
@@ -228,7 +212,7 @@ impl<F: FileOpener> FileStream<F> {
 
             self.file_stream_metrics.time_opening.start();
 
-            match self.file_reader.open(self.object_store.clone(), file_meta) {
+            match self.file_reader.open(file_meta) {
                 Ok(future) => {
                     self.state = FileStreamState::Open {
                         future,
@@ -339,11 +323,7 @@ mod tests {
     }
 
    impl FileOpener for TestOpener {
-        fn open(
-            &self,
-            _store: Arc<dyn ObjectStore>,
-            _file_meta: FileMeta,
-        ) -> Result<FileOpenFuture> {
+        fn open(&self, _file_meta: FileMeta) -> Result<FileOpenFuture> {
            let iterator = self.records.clone().into_iter().map(Ok);
            let stream = futures::stream::iter(iterator).boxed();
            Ok(futures::future::ready(Ok(stream)).boxed())
@@ -375,14 +355,8 @@ mod tests {
             output_ordering: None,
         };
 
-        let file_stream = FileStream::new(
-            &config,
-            0,
-            ctx.task_ctx(),
-            reader,
-            ExecutionPlanMetricsSet::new(),
-        )
-        .unwrap();
+        let file_stream =
+            FileStream::new(&config, 0, reader, ExecutionPlanMetricsSet::new()).unwrap();
 
         file_stream
             .map(|b| b.expect("No error expected in stream"))
```
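
Custom implementors of `FileOpener` follow the same pattern as the built-in openers after this change: store the `Arc<dyn ObjectStore>` on the opener at construction time, then clone it into the returned future. A minimal sketch under the new signature — `MyOpener` is hypothetical and the `use` paths are approximate, per the module layout in this commit:

```rust
use std::sync::Arc;

use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
use futures::StreamExt;
use object_store::ObjectStore;

// Paths approximate: these types live under
// datafusion/core/src/physical_plan/file_format in this commit.
use datafusion::error::Result;
use datafusion::physical_plan::file_format::{FileMeta, FileOpenFuture, FileOpener};

// Hypothetical opener: it owns its store instead of receiving one from
// FileStream on every call to `open`.
struct MyOpener {
    object_store: Arc<dyn ObjectStore>,
}

impl FileOpener for MyOpener {
    fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
        // Clone the Arc so the 'static future can own it.
        let store = self.object_store.clone();
        Ok(Box::pin(async move {
            let _get = store.get(file_meta.location()).await?;
            // Decoding elided: this sketch yields no batches.
            let batches: Vec<ArrowResult<RecordBatch>> = vec![];
            Ok(futures::stream::iter(batches).boxed())
        }))
    }
}

// Wiring inside ExecutionPlan::execute, mirroring the diffs above:
//   let object_store = context
//       .runtime_env()
//       .object_store(&self.base_config.object_store_url)?;
//   let opener = MyOpener { object_store };
//   let stream =
//       FileStream::new(&self.base_config, partition, opener, self.metrics.clone())?;
```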
21 changes: 9 additions & 12 deletions datafusion/core/src/physical_plan/file_format/json.rs
```diff
@@ -119,19 +119,18 @@ impl ExecutionPlan for NdJsonExec {
             options
         };
 
+        let object_store = context
+            .runtime_env()
+            .object_store(&self.base_config.object_store_url)?;
         let opener = JsonOpener {
             file_schema,
             options,
             file_compression_type: self.file_compression_type.to_owned(),
+            object_store,
         };
 
-        let stream = FileStream::new(
-            &self.base_config,
-            partition,
-            context,
-            opener,
-            self.metrics.clone(),
-        )?;
+        let stream =
+            FileStream::new(&self.base_config, partition, opener, self.metrics.clone())?;
 
         Ok(Box::pin(stream) as SendableRecordBatchStream)
     }
@@ -162,16 +161,14 @@ struct JsonOpener {
     options: DecoderOptions,
     file_schema: SchemaRef,
     file_compression_type: FileCompressionType,
+    object_store: Arc<dyn ObjectStore>,
 }
 
 impl FileOpener for JsonOpener {
-    fn open(
-        &self,
-        store: Arc<dyn ObjectStore>,
-        file_meta: FileMeta,
-    ) -> Result<FileOpenFuture> {
+    fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
         let options = self.options.clone();
         let schema = self.file_schema.clone();
+        let store = self.object_store.clone();
         let file_compression_type = self.file_compression_type.to_owned();
         Ok(Box::pin(async move {
             match store.get(file_meta.location()).await? {
```
7 changes: 1 addition & 6 deletions datafusion/core/src/physical_plan/file_format/parquet.rs
```diff
@@ -320,7 +320,6 @@ impl ExecutionPlan for ParquetExec {
         let stream = FileStream::new(
             &self.base_config,
             partition_index,
-            ctx,
             opener,
             self.metrics.clone(),
         )?;
@@ -406,11 +405,7 @@ struct ParquetOpener {
 }
 
 impl FileOpener for ParquetOpener {
-    fn open(
-        &self,
-        _: Arc<dyn ObjectStore>,
-        file_meta: FileMeta,
-    ) -> Result<FileOpenFuture> {
+    fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
        let file_range = file_meta.range.clone();
 
        let file_metrics = ParquetFileMetrics::new(
```
