Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ParquetObjectReader::with_runtime #6612

Merged
merged 7 commits into from
Nov 2, 2024

Conversation

itsjunetime
Copy link
Contributor

Which issue does this PR close?

Closes #6248

What changes are included in this PR?

This PR works on top of #6249 to add a test for the new with_runtime fn, as well as changing the signature of the spawn function slightly to avoid an extra re-boxing when a runtime is set.

This also fixes a few things that clippy was complaining about.

Rationale for this change

See #6248 for the API addition.

With regard to the test, I felt like this is really the only test we'd want for this feature - we just want to make sure that the runtime is actually being used by ParquetObjectReader. We can't make any guarantees about how it actually performs or would work if there's another runtime being used for CPU-bound operations, so all we really want to test is if it is used.

Are there any user-facing changes?

No

@github-actions github-actions bot added parquet Changes to the parquet crate arrow Changes to the arrow crate labels Oct 21, 2024
#[tokio::test]
// We need to mark this with the `target_has_atomic` because the spawned_tasks_count() fn is
// only available for that cfg
#[cfg(all(target_has_atomic = "64", tokio_unstable))]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we could instead create a runtime with IO / blocking threads disabled and use that to determine that the IO was spawned to a different runtime?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that would work. I'm not certain why, but ParquetObjectReader seems to work fine regardless of whether or not IO is 'enabled' on its runtime. I was able to change the tests so they don't rely on tokio_unstable anymore and (I think) still show what we want them to show, so I'll push that in a minute.

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @itsjunetime -- this is looking very good. I think we need to also move get_metadata to spawn

Otherwise I have a few other suggestions, but this one is looking very close

parquet/src/arrow/async_reader/store.rs Outdated Show resolved Hide resolved
parquet/src/arrow/async_reader/store.rs Show resolved Hide resolved
parquet/src/arrow/async_reader/store.rs Show resolved Hide resolved
parquet/src/arrow/async_reader/store.rs Show resolved Hide resolved
parquet/src/arrow/async_reader/store.rs Show resolved Hide resolved

assert_ne!(current_id, other_id);

tokio::runtime::Handle::current().spawn_blocking(move || drop(rt));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you also add unit tests for each of the three APIs in ParquetObjectReader that spawn is used?

  • get_bytes
  • get_byte_ranges
  • get_metadata?

itsjunetime and others added 2 commits October 29, 2024 10:57
- Remove outdated comment about target_has_atomic
- Add test to verify reader fails when spawned on a shutdown runtime
Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me -- thank you @itsjunetime

@@ -107,6 +107,13 @@ impl From<str::Utf8Error> for ParquetError {
}
}

#[cfg(test)]
impl From<std::convert::Infallible> for ParquetError {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a nice improvement too. Thank you. Maybe it is worth adding publically as well

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure about this, the whole point of infallible is that it can't be constructed and so doesn't need to be handled

Copy link
Contributor

@alamb alamb Oct 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, it can't be constructed, but it often does need to be "handled" (aka to transform a Result<.., Infallible> to Result<.., Error> type expected by an API)

I don't feel strongly about this particular code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aka to transform a Result<.., Infallible> to Result<.., Error> type expected by an API)

Right but this is a little funky, because it then makes code look more fallible than it is. Often you can use an infallible version of the API, i.e. into() instead of try_into(), but sometimes you do have to either unwrap() or let _ = ...

FWIW Rust 1.82 gives us a very nice way to handle this, but I'm not sure whether our MSRV policy covers tests.

let Ok(value) = expression();

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed in 8d24cd7

let current_id = std::thread::current().id();

let other_id = reader
.spawn(|_, _| async move { Ok::<_, Infallible>(std::thread::current().id()) }.boxed())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
.spawn(|_, _| async move { Ok::<_, Infallible>(std::thread::current().id()) }.boxed())
.spawn(|_, _| async move { Ok::<_, ParquetError>(std::thread::current().id()) }.boxed())

Would remove the need for the std::convert::Infallible conversion

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had this repo checked out and in the editor, so I just made this change to accelerate getting this PR in in 8d24cd7

It results in a nice simplification

@alamb alamb merged commit 22bc772 into apache:master Nov 2, 2024
26 checks passed
@alamb
Copy link
Contributor

alamb commented Nov 2, 2024

Thanks again @itsjunetime and @tustvold

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
arrow Changes to the arrow crate parquet Changes to the parquet crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add ParquetObjectReader::with_runtime
3 participants