Implement StreamTable and StreamTableProvider (#7994) #8021
Conversation
let execution_start = Instant::now();
write_to_fifo(&file, &header, execution_start, broken_pipe_timeout).unwrap();
for (cnt, line) in enumerate(lines) {
while waiting_lock.load(Ordering::SeqCst) && cnt > wait_until {
I had some difficulty following the various locking, etc... I think the new implementation is broadly equivalent, but I may be missing some nuance
datafusion/core/tests/fifo.rs
) -> Result<u64> {
let config = self.0.clone();
let (sender, mut receiver) = tokio::sync::mpsc::channel::<RecordBatch>(2);
// Note: FIFO Files support poll so this could use AsyncFd
This is an example of the potential benefits of a custom TableProvider: it can use external knowledge about the nature of the files in question.
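As a concrete illustration of that point, here is a minimal, hypothetical sketch (not code from this PR) of how a FIFO-aware provider could await readiness with tokio's AsyncFd instead of blocking a thread. The function name, buffer size, and the assumption that the file is already opened on a FIFO in non-blocking mode are all illustrative.

// Hypothetical sketch: wait for a FIFO to become readable before reading it.
// Assumes `file` is open on a FIFO path in non-blocking mode; names and the
// buffer size are illustrative only.
use std::fs::File;
use std::io::Read;
use tokio::io::unix::AsyncFd;

async fn read_fifo_when_ready(file: File) -> std::io::Result<Vec<u8>> {
    let fd = AsyncFd::new(file)?; // register the FIFO with the tokio reactor
    let mut buf = vec![0u8; 8192];
    loop {
        let mut guard = fd.readable().await?; // resolves once data is available
        match guard.try_io(|inner| inner.get_ref().read(&mut buf)) {
            Ok(Ok(n)) => {
                buf.truncate(n);
                return Ok(buf);
            }
            Ok(Err(e)) => return Err(e),
            Err(_would_block) => continue, // readiness was stale, poll again
        }
    }
}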
@@ -394,8 +491,6 @@ mod unix_test {
a2 INT NOT NULL
)
STORED AS CSV
WITH HEADER ROW
There is no particular reason that CSV headers could not be supported, but it was simpler to just not read/write them for the purposes of this test
Thx @tustvold, I believe that the proof of concept is looking good. My opinion: instead of creating a new TableProvider for FIFO, we could use a …
I believe we can use your implementation of DataSink for FIFO, nicely coded 👍🏻
So this is what I started on implementing, but I wasn't really sure how this interface would materially differ from the existing interfaces used by this PR. In particular I'm struggling to devise an interface that would substantially reduce the amount of boilerplate, whilst still preserving the necessary flexibility. Perhaps I am missing something?
I reviewed this PR and I think it looks really nice -- thank you @tustvold and @metesynnada 🏆
I also filed #8035 with some additional information about PartitionStream
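For readers unfamiliar with PartitionStream, a minimal, hypothetical implementation backed by in-memory batches is sketched below; it is not code from this PR, and the module paths and the futures dependency are assumptions from memory.

// Hypothetical sketch of a PartitionStream yielding pre-built in-memory batches.
// Module paths are assumptions; adjust to the DataFusion version in use.
use std::sync::Arc;

use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
use datafusion::error::DataFusionError;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::stream;

struct InMemoryPartition {
    schema: SchemaRef,
    batches: Vec<RecordBatch>,
}

impl PartitionStream for InMemoryPartition {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        // Wrap a plain iterator of batches so it satisfies the
        // RecordBatchStream contract expected by StreamingTableExec.
        let batches = self
            .batches
            .clone()
            .into_iter()
            .map(Ok::<_, DataFusionError>);
        Box::pin(RecordBatchStreamAdapter::new(
            self.schema.clone(),
            stream::iter(batches),
        ))
    }
}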
To help with this subject, let's break it down into smaller parts. We can begin by focusing on the FIFO table and finding ways to optimize the SQL involved. Additionally, for external tables that are unbounded, we can create FIFO tables with the proper formatting, starting with CSV and AVRO.

When it comes to session state, we keep

/// This is used to create [`TableProvider`] instances for the
/// `CREATE EXTERNAL TABLE ... STORED AS <FORMAT>` for custom file
/// formats other than those built into DataFusion
table_factories: HashMap<String, Arc<dyn TableProviderFactory>>,

which is not scalable. Currently, DataFusion SQL is closely intertwined with the ListingTable approach, which resulted in FIFO support within the ListingTable. To avoid this, we need to be able to use multiple …

TL;DR, this part should change to scale the support:

async fn create_custom_table(
    &self,
    cmd: &CreateExternalTable,
) -> Result<Arc<dyn TableProvider>> {
    let state = self.state.read().clone();
    let file_type = cmd.file_type.to_uppercase();
    let factory =
        &state
            .table_factories
            .get(file_type.as_str())
            .ok_or_else(|| {
                DataFusionError::Execution(format!(
                    "Unable to find factory for {}",
                    cmd.file_type
                ))
            })?;
    let table = (*factory).create(&state, cmd).await?;
    Ok(table)
}
I don't see why you couldn't have a TableProviderFactory that itself delegates out to other TableProviderFactory implementations based on some criteria (see the sketch below)? Perhaps I am misunderstanding what you mean by scalable?
Perhaps it is worth discussing where such functionality should live? In this repo, in downstream codebases, somewhere in between, e.g. datafusion-contrib? Unless I'm mistaken, support for this currently mostly only exists in downstream repos, with no support in DF for creating or managing fifo files?
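To make the delegation idea concrete, a minimal, hypothetical sketch follows (not code from this PR): a factory holding two inner factories and routing on whether the command declares the table unbounded. The DelegatingFactory name is invented, and the module paths and the unbounded field on CreateExternalTable are assumptions about the API of this era.

// Hypothetical sketch of a TableProviderFactory that delegates to other
// factories based on a property of the CREATE EXTERNAL TABLE command.
// Module paths and the `unbounded` field are assumptions, not verified here.
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::datasource::provider::TableProviderFactory;
use datafusion::datasource::TableProvider;
use datafusion::error::Result;
use datafusion::execution::context::SessionState;
use datafusion_expr::CreateExternalTable;

struct DelegatingFactory {
    stream: Arc<dyn TableProviderFactory>,  // e.g. builds FIFO / stream tables
    listing: Arc<dyn TableProviderFactory>, // e.g. builds ListingTable-backed tables
}

#[async_trait]
impl TableProviderFactory for DelegatingFactory {
    async fn create(
        &self,
        state: &SessionState,
        cmd: &CreateExternalTable,
    ) -> Result<Arc<dyn TableProvider>> {
        // Route unbounded tables to the stream factory and everything else to
        // the listing factory; any other criterion (file type, options) works too.
        if cmd.unbounded {
            self.stream.create(state, cmd).await
        } else {
            self.listing.create(state, cmd).await
        }
    }
}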
I've marked this ready for review as I think the feedback is that this is a sound approach, but feel free to correct me if I am wrong
Please do not merge this yet -- AFAICT this breaks use cases including creating external tables reading from FIFO files that used to have built-in support.
We like the idea of having a FIFOTable, separating a single sink into FileSink and a StreamSink, etc. However, we don't seem to agree on the design yet. We will soon share our design proposal with diagrams and API-level specs.
TBC this PR simply updates the fifo tests and makes no changes to ListingTable, which would instead be a follow-on PR. The idea of this PR is to show that removing the functionality from ListingTable, whilst it would still represent a breaking change, would not require major architectural changes to existing workloads, nor would it require introducing new abstractions to DF. If you feel this hasn't been demonstrated, please let me know how I can articulate this better.
We have discussed this today in detail. We will come back with a design/proposal early next week. Let's iterate together to find the best design. We will also help with the implementation when we converge.
Given I think this PR has demonstrated the existing APIs can accommodate streaming use-cases, perhaps we might move forward with this and potentially refine things later? I'm keen to find a way to keep things moving so we don't hold up #8029, and by extension the next DF release, which is waiting on an arrow bug fix (apache/arrow-rs#5050).
I don't think we should merge this without getting consensus / ensuring the synnada team has a reasonable way forward, especially as there are other options (e.g. an arrow patch release) that can unblock the datafusion release. In particular, I think the tradeoffs between different implementation options and their ordering come down to who is going to do the work, and I think there are ways it doesn't all fall on @tustvold. I will try and set up a call / some forum to discuss this and will post notes on the outcome
We anticipate having @metesynnada's design proposal, which aims to embody the discussions in Issue #7994, in a week's time. The proposed design will provide an API for streaming sources that will decouple them from ListingTable. We aim to resolve the situation in approximately two weeks, with the following end state: (1) we will have merged the converged-on design decoupling ListingTable from streaming sources, (2) we will proceed with the upgrade to arrow-rs 49.0.0. In the meantime, we may make a serde-fix-backported 48.1.0 release of arrow-rs so that DataFusion can benefit from this fix.
I think this makes a lot of sense -- thank you @ozankabak. I will prepare a 48.0.1 patch release candidate -- see apache/arrow-rs#5050 (comment)
Marking as draft so we don't accidentally merge this as we work on the updated design
@ozankabak -- @tustvold said he may have some time to help (and would be interested in assisting the design and implementation to help accelerate this process). Is it possible to describe the use case you have in more detail? Given that this particular PR replicates the functionality that already exists in DataFusion as far as we can tell (using the extension APIs), we think we must not fully understand the use case for which you are designing a solution.
I will talk to @metesynnada tomorrow and check with him to see where he is at in terms of the proposal. If he is close to getting something he can share soon, it'd be better to just talk about the proposal and collaborate on it directly. If he is not close to having something publishable, we will explain the use cases we are trying to support beforehand. I will circle back tomorrow.
Thank you, we appreciate it
},
});
}
// Cannot find expression in the projected_schema, stop iterating
This is necessary as the sort key may reference columns not found in the projected schema.
This change matches get_projected_output_ordering
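As a standalone illustration of that rule (a hypothetical helper, not the PR's code): an ordering is only usable as a prefix, so iteration stops at the first sort column missing from the projected schema.

// Hypothetical helper showing the "stop iterating" rule: keep only the leading
// sort columns that exist in the projected schema, since a sort key is only
// meaningful as a prefix.
fn project_ordering(ordering: &[String], projected_columns: &[String]) -> Vec<String> {
    let mut out = Vec::new();
    for col in ordering {
        if projected_columns.iter().any(|c| c == col) {
            out.push(col.clone());
        } else {
            // Cannot find the expression in the projected schema, stop iterating
            break;
        }
    }
    out
}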
fn writer(&self) -> Result<Box<dyn RecordBatchWriter>> {
match &self.encoding {
StreamEncoding::Csv => {
let header = self.header && !self.location.exists();
I wasn't entirely sure about this. CsvSink currently has the following logic, which I've attempted to replicate here:
let serializer: Box<dyn BatchSerializer> = Box::new(if file_size > 0 {
CsvSerializer::new()
.with_builder(inner_clone)
.with_header(false)
} else {
CsvSerializer::new()
.with_builder(inner_clone)
.with_header(options_clone.writer_options.header())
});
@@ -177,7 +190,7 @@ impl TableProvider for StreamTable {
}

fn table_type(&self) -> TableType {
TableType::Temporary
TableType::Base
This is necessary so that Drop behaves correctly
unbounded_file_with_symmetric_join has failed on OS X. I have not looked into whether this is an intermittent failure, but I note that this test has been ignored on main since d6c2233#diff-2f6b05fb3bc4e54422c680dfa47dd8a993cc93fcaa056f678e455cc5d83ae6d0R237. I'm not sure what the history here is: is this a known flaky test that I should keep ignored? Edit: looks to just be flaky
Thanks @tustvold, we will look into this early tomorrow morning and report our findings
The previous locks ensure that the FIFO is read and written in a streaming manner, simulating infinite data streams. This guarantees that each batch is produced after a specific amount of data is written. I plan to submit a corrective commit soon to address hangs caused by asynchronous processing and configuration issues in the current implementation.
Thank you, I've also just pushed a commit that configures the readers to respect the configured batch size, which may help
Yes, it is necessary as well. However, I need to figure out some points on the async runtime. Then I think we can move on.
If you're able to articulate the issue, I may be able to help out here.
CI failures are due to logical merge conflicts - #8187
let config = self.0.clone();
let schema = self.0.schema.clone();
let batch_size = ctx.session_config().batch_size();
let mut builder = RecordBatchReceiverStreamBuilder::new(schema, 2);
I am attempting to eliminate the tokio channel dependency, which is not necessary in theory. However, the dyn RecordBatchReader<Item = Result<arrow_array::RecordBatch, arrow_schema::ArrowError>> does not implement Send and Sync, so creating an unfold directly from it is not possible. Do you have any suggestions on how to achieve this without creating a dedicated stream struct similar to what we are doing in ExecutionPlan.execute()?
The tokio channel is necessary because we're performing blocking file IO, no?
For synchronous IO, using spawn_blocking is the rule of thumb; you are right. We can keep this.
Yeah, it is actually a bug that we don't appear to currently be doing this. I've filed #8188 which I'll follow up with once this is merged in
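For reference, a minimal sketch of the spawn_blocking pattern under discussion (not the PR's implementation; the function name is illustrative): blocking reader IO runs on a dedicated blocking thread and batches are forwarded over a bounded tokio channel.

// Hypothetical sketch: drive a blocking RecordBatch iterator (e.g. a CSV reader
// over a FIFO) on a blocking thread and forward batches through a channel.
use arrow_array::RecordBatch;
use arrow_schema::ArrowError;
use tokio::sync::mpsc;

fn spawn_blocking_reader<R>(mut reader: R) -> mpsc::Receiver<Result<RecordBatch, ArrowError>>
where
    R: Iterator<Item = Result<RecordBatch, ArrowError>> + Send + 'static,
{
    let (tx, rx) = mpsc::channel(2);
    tokio::task::spawn_blocking(move || {
        for batch in &mut reader {
            // blocking_send is fine here: we are on a blocking worker thread
            if tx.blocking_send(batch).is_err() {
                break; // receiver dropped, stop reading
            }
        }
    });
    rx
}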
I think we can merge this once the tests pass.
Thank you for your help getting this over the line. @ozankabak any final comments, or are you happy for me to proceed?
@tustvold, thank you for helping out with the stopgap. @metesynnada will use this as a foundation as he implements the proposal to completion. Feel free to go ahead and merge this after CI passes 🚀
Thank you everyone -- I know there were some communication bumps getting here, but this looks like a really nice design in my mind and sets us up for good things to come.
Thanks again
/// The data encoding for [`StreamTable`]
#[derive(Debug, Clone)]
pub enum StreamEncoding {
👍
@@ -750,7 +750,7 @@ query TT
explain select c1 from t;
----
logical_plan TableScan: t projection=[c1]
physical_plan CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/empty.csv]]}, projection=[c1], infinite_source=true, has_header=true
physical_plan StreamingTableExec: partition_sizes=1, projection=[c1], infinite_source=true
👍
/// The configuration for a [`StreamTable`]
#[derive(Debug)]
pub struct StreamConfig {
Should we note here that this is likely to undergo API changes to support additional use cases (like streaming to/from Kafka)?
I'm not sure this would necessarily change materially; it parallels ListingTableConfig, which is used by ListingTable, but I suppose we shall see
}
}
/// A [`TableProvider`] for a stream source, such as a FIFO file |
Can we maybe also add some comments here about how this is similar to / different from ListingTable, to help new people navigate the code and understand when to use one vs the other?
We could totally do this as a follow on PR as well
I will add some comments in a follow on PR
follow up: #8192
Which issue does this PR close?
Closes #7994
Rationale for this change
See ticket
What changes are included in this PR?
Implements StreamTable and StreamTableProvider as in the design proposal. Also defines a DefaultTableFactory that wires this up as the default for unbounded tables.
Are these changes tested?
Are there any user-facing changes?