Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming CLI support #8651

Merged
merged 8 commits into from
Dec 28, 2023
Merged

Streaming CLI support #8651

merged 8 commits into from
Dec 28, 2023

Conversation

berkaysynnada
Copy link
Contributor

@berkaysynnada berkaysynnada commented Dec 25, 2023

Which issue does this PR close?

Part of #4285

Rationale for this change

Unbounded tables can now be displayed in the CLI using any format except PrintFormat::Table. Row limit is ignored while printing unbounded tables.

What changes are included in this PR?

When the PhysicalPlan of a query is unbounded, the stream is printed until it is exhausted. I have attempted to avoid creating a writer for each iteration but was unsuccessful.

For unspecified format options, the batch format defaults to Table as before, and the stream format is set to CSV. If the Table format is explicitly given, unbounded queries will produce an error.

Are these changes tested?

Are there any user-facing changes?

@github-actions github-actions bot added the core Core DataFusion crate label Dec 25, 2023
@ozankabak
Copy link
Contributor

@alamb can you please take a look? This intends to give streaming display support to CLI when processing unbounded sources

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for this PR @berkaysynnada. I think it adds features that don't currently exist on main and thus is a step forward.

I do think there is a bit of replicated code that could be avoided, but I don't think that is required. I left some suggestions on how to improve the code for your consideration, though I think we make them as follow on PRs as well.

Unbounded tables can now be displayed in the CLI using any format except PrintFormat::Table. Row limit is ignored while printing unbounded tables.

In general this is a great idea.

@@ -159,14 +191,41 @@ impl PrintFormat {
}
Ok(())
}

pub fn print_stream(&self, batch: &RecordBatch, with_header: bool) -> Result<()> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function is almost a copy/paste of print_batch -- as you say in your comments, the only difference is i Self::Table. I think it would be possible to reduce replication by changing all this code to be in terms of the output stream rather than creating strings

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Specifically, I think you could write the functions in terms of std::io::Write and then pass in https://doc.rust-lang.org/std/io/fn.stdout.html to avoid having to both copy the code and the code could avoid copying strings

Self::Tsv => {
println!("{}", print_stream_with_sep(batch, b'\t', with_header)?)
}
Self::Table => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another thought here is that when printing streams, it could just print out a new Table for each batch (rather than buffering all batches).

query_start_time: Instant,
) -> Result<()> {
let mut row_count = 0_usize;
let mut with_header = true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if there is any way to test this behavior (specifically that the header is only printed out for the first batch) so that it isn't broken in subsequent refactorings

@berkaysynnada
Copy link
Contributor Author

Thank you for this PR @berkaysynnada. I think it adds features that don't currently exist on main and thus is a step forward.

I do think there is a bit of replicated code that could be avoided, but I don't think that is required. I left some suggestions on how to improve the code for your consideration, though I think we make them as follow on PRs as well.

Unbounded tables can now be displayed in the CLI using any format except PrintFormat::Table. Row limit is ignored while printing unbounded tables.

In general this is a great idea.

Thank you for your feedback. I will try to implement your suggestions, and also add a test by looking at existing examples.

@berkaysynnada
Copy link
Contributor Author

berkaysynnada commented Dec 27, 2023

@alamb Can you review my changes removing the duplication when you have time? I have tried the possible scenarios and I think the display looks good. Thanks

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

THanks @berkaysynnada -- I don't really understand some of the test changes. My suggestion is to revert them if possible, and then we can merge this PR and iterate on the implementation in future PRs

}
}

Ok(())
}

impl PrintFormat {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't have to be for this PR, but I think this code could be simplified by putting all the state management into some object

Perhaps something like this (would have to also have a field for the table being built up)

pub struct OutputState {
  format: PrintFormat,
  with_header: bool, 
  maxrows: MaxRows,
  rows_printed: usize,
  seen_header: bool
}

And then it would be possible to avoid having to collect all the batches prior to printing, with a main loop like

// configure output state ...
let state = OutputState::new()
  .with_header(...);

while let Some(batch) = plan.next().await? {
  // formats other than table this would print out the batch
  state.print_batch(batch)
}

// for table format, flush any buffered tables
state.finsh()

Copy link
Contributor

@ozankabak ozankabak left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just went over this and made sure to keep the tests unchanged (w.r.t. expected results). I think it is much better now. @alamb, PTAL and let's merge this if it looks good to you. And then let's do the encapsulation you suggested in a follow-on 🚀

@berkaysynnada
Copy link
Contributor Author

In the current version, format_batches_with_maxrows() needs to add newlines after the batch in order to display timing info or following batch at the new line. My last commit fixes that. Consequently format_batches_with_maxrows() tests now includes a newline at the end of batches.

@@ -244,7 +290,7 @@ mod tests {
"| 1 |",
"| 2 |",
"| 3 |",
"+---+",
"+---+\n",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI, this is not a real change in the behavior being tested. Before this PR, we used to append the final newline "outside" at the final print out stage. With this PR, since we operate on writers instead of building strings, the final newline is here.

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM -- let's get this in and we can iterate

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants