This repository has been archived by the owner on Oct 2, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 16
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Update DataFusion version to 28.0.0 (#41)
* Update DataFusion version * update example
- Loading branch information
Showing
12 changed files
with
926 additions
and
772 deletions.
There are no files selected for viewing
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,78 @@ | ||
mod reader; | ||
mod writer; | ||
|
||
use arrow::record_batch::RecordBatch; | ||
use datafusion::arrow; | ||
use datafusion::arrow::datatypes::SchemaRef; | ||
use datafusion::common::Result; | ||
use datafusion::physical_plan::{RecordBatchStream, SendableRecordBatchStream}; | ||
use futures::Stream; | ||
pub use reader::RayShuffleReaderExec; | ||
use std::pin::Pin; | ||
use std::task::{Context, Poll}; | ||
use tokio::macros::support::thread_rng_n; | ||
pub use writer::RayShuffleWriterExec; | ||
|
||
/// CombinedRecordBatchStream can be used to combine a Vec of SendableRecordBatchStreams into one | ||
pub struct CombinedRecordBatchStream { | ||
/// Schema wrapped by Arc | ||
schema: SchemaRef, | ||
/// Stream entries | ||
entries: Vec<SendableRecordBatchStream>, | ||
} | ||
|
||
impl CombinedRecordBatchStream { | ||
/// Create an CombinedRecordBatchStream | ||
pub fn new(schema: SchemaRef, entries: Vec<SendableRecordBatchStream>) -> Self { | ||
Self { schema, entries } | ||
} | ||
} | ||
|
||
impl RecordBatchStream for CombinedRecordBatchStream { | ||
fn schema(&self) -> SchemaRef { | ||
self.schema.clone() | ||
} | ||
} | ||
|
||
impl Stream for CombinedRecordBatchStream { | ||
type Item = Result<RecordBatch>; | ||
|
||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | ||
use Poll::*; | ||
|
||
let start = thread_rng_n(self.entries.len() as u32) as usize; | ||
let mut idx = start; | ||
|
||
for _ in 0..self.entries.len() { | ||
let stream = self.entries.get_mut(idx).unwrap(); | ||
|
||
match Pin::new(stream).poll_next(cx) { | ||
Ready(Some(val)) => return Ready(Some(val)), | ||
Ready(None) => { | ||
// Remove the entry | ||
self.entries.swap_remove(idx); | ||
|
||
// Check if this was the last entry, if so the cursor needs | ||
// to wrap | ||
if idx == self.entries.len() { | ||
idx = 0; | ||
} else if idx < start && start <= self.entries.len() { | ||
// The stream being swapped into the current index has | ||
// already been polled, so skip it. | ||
idx = idx.wrapping_add(1) % self.entries.len(); | ||
} | ||
} | ||
Pending => { | ||
idx = idx.wrapping_add(1) % self.entries.len(); | ||
} | ||
} | ||
} | ||
|
||
// If the map is empty, then the stream is complete. | ||
if self.entries.is_empty() { | ||
Ready(None) | ||
} else { | ||
Pending | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.