Add Flight SQL Federation example #3
At a glance, I think the flightsql executor code looks great actually! I have also found it unnatural to keep the execute method sync, and I've seen discussion about this decision somewhere before (possibly on the DataFusion Discord). I'll try to dig that up or, if not, start a discussion at the DataFusion level to document more clearly which parts are sync or async and why. I plan to try this out in more detail later today.
I did find some old informal discussion of async vs. sync. I think the relevant bit for this PR is:
/// ## Lazily (async) create a Stream
///
/// If you need to create the return `Stream` using an `async` function,
/// you can do so by flattening the result:
///
/// ```
/// # use std::sync::Arc;
/// # use arrow_array::RecordBatch;
/// # use arrow_schema::SchemaRef;
/// # use futures::TryStreamExt;
/// # use datafusion_common::Result;
/// # use datafusion_execution::{SendableRecordBatchStream, TaskContext};
/// # use datafusion_physical_plan::memory::MemoryStream;
/// # use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
/// struct MyPlan {
///     schema: SchemaRef,
/// }
///
/// /// async function that returns a stream
/// async fn get_batch_stream() -> Result<SendableRecordBatchStream> {
///     todo!()
/// }
///
/// impl MyPlan {
///     fn execute(
///         &self,
///         partition: usize,
///         context: Arc<TaskContext>
///     ) -> Result<SendableRecordBatchStream> {
///         // A future that yields a stream
///         let fut = get_batch_stream();
///         // Use TryStreamExt::try_flatten to flatten the stream of streams
///         let stream = futures::stream::once(fut).try_flatten();
///         Ok(Box::pin(RecordBatchStreamAdapter::new(self.schema.clone(), stream)))
///     }
/// }
/// ```
This is going to apply to most execution plans in a federation context, since creating the stream will require an async call just to set up the stream in the first place. The recommendation to simplify the code vs. a relatively complex
I took a stab at following the advice to use Is there any way to synchronously determine the expected schema coming back from the |
This ran for me locally without issue. The built-in FlightSQL server is great and will be helpful for setting up tests in the future.
I think calling `block_on` for initial connection setup and to obtain the `Schema` of the `RecordBatchStream` should be OK. We may be able to simplify the code a little bit if there were a way to synchronously compute the `Schema` we expect to come back from the stream, but that can be a future concern.
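To make the `block_on` idea concrete, here is a minimal std-only sketch, not the code from this PR. Everything in it is a hypothetical stand-in: `Schema`, `RemoteStream`, `open_remote_stream`, and the hand-rolled `block_on` are illustration only, and a real executor would use its runtime's blocking helper and the Arrow/DataFusion types instead.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Wakes the thread that is parked inside `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Hand-rolled `block_on` (assumption: stands in for a runtime-provided one).
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            // Sleep until the waker unparks this thread, then poll again.
            Poll::Pending => thread::park(),
        }
    }
}

/// Hypothetical stand-in for an Arrow schema: just the column names.
#[derive(Debug, Clone, PartialEq)]
struct Schema(Vec<String>);

/// Hypothetical remote result stream; its schema is known only after setup.
struct RemoteStream {
    schema: Schema,
    rows: Vec<i64>,
}

/// Hypothetical async setup call (connect, send the query, read the schema).
async fn open_remote_stream(sql: &str) -> Result<RemoteStream, String> {
    if sql.is_empty() {
        return Err("empty query".to_string());
    }
    Ok(RemoteStream {
        schema: Schema(vec!["a".to_string()]),
        rows: vec![1, 2, 3],
    })
}

/// Synchronous `execute`: block only for setup, then return schema and stream.
fn execute(sql: &str) -> Result<(Schema, RemoteStream), String> {
    let stream = block_on(open_remote_stream(sql))?;
    Ok((stream.schema.clone(), stream))
}
```

The trade-off is that the calling thread is blocked for the duration of the setup call. Calling a blocking helper from inside an async runtime's worker thread can stall that worker and, in some runtimes, panic, so this only suits the short initial handshake and not the streaming itself.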
For posterity, here is the relevant ASF Slack discussion on the sync vs. async issue: https://the-asf.slack.com/archives/C04RJ0C85UZ/p1698514242205729 and the relevant DataFusion issue: apache/datafusion#8010
…fusion-contrib#3) * Fix the table scan rewrite to properly rewrite column relations * Handle more expressions for table scan rewrites
I added a Flight SQL example.
cc @devinjdangelo since you showed interest. I do feel `FlightSQLExecutor.execute` is somewhat complicated, making me wonder if keeping it async is warranted.
The `FlightSqlService` is rather basic now. It may be possible to add extension points for things like authentication so it becomes usable as a library.