persist: add some unit tests #12826
src/persist-client/src/read.rs
Outdated
return Err(Since(self.since.clone()));
}
let mut machine = self.machine.clone();
let () = machine.listen(&as_of).await?;
this change hangs the open_loop benchmark because it creates listeners as_of zero at startup and immediately awaits them before any writes have come in. any ideas how to proceed? everything I've come up with has been awful.
materialize/src/persist-client/examples/open_loop.rs
Lines 441 to 443 in a0ec852
let listen = reader
.listen(Antichain::from_elem(0))
.await
Maybe this is an early sign that the previous behaviour of listen() was more ergonomic? (See my other comment)
Nice optimization! 🙌
Regarding the change in semantics of listen(): what's the reasoning behind that? It doesn't seem to be necessary for the optimization, and the test for the optimization could also be written with the old semantics. I'm asking because it seemed natural to me that snapshot() would block, because it does "get me the entire data up to as_of" while listen() felt more like an async stream where creating the stream at any legal as_of would not block but then updates would only trickle in once they are available.
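To make the distinction concrete, here is a toy, synchronous sketch of the two behaviours. Everything in it (`Shard`, `ToyListener`, the `upper` field, returning `None` to stand in for "would block") is a hypothetical model, not the persist-client API, and it assumes a listener yields updates strictly after its as_of.

```rust
// Toy model only: `Shard` and `ToyListener` are hypothetical stand-ins.
#[derive(Debug, Default)]
struct Shard {
    // (timestamp, value) updates written so far.
    updates: Vec<(u64, String)>,
    // All timestamps strictly less than `upper` are complete.
    upper: u64,
}

impl Shard {
    fn append(&mut self, ts: u64, value: &str, new_upper: u64) {
        self.updates.push((ts, value.to_string()));
        self.upper = new_upper;
    }

    // A snapshot needs all data up to and including `as_of`, so it is only
    // ready once `upper` has passed `as_of`. `None` models "would block".
    fn snapshot(&self, as_of: u64) -> Option<Vec<(u64, String)>> {
        if as_of < self.upper {
            Some(
                self.updates
                    .iter()
                    .filter(|(ts, _)| *ts <= as_of)
                    .cloned()
                    .collect(),
            )
        } else {
            None
        }
    }

    // Creating a listener never waits for data to arrive.
    fn listen(&self, as_of: u64) -> ToyListener {
        ToyListener { frontier: as_of }
    }
}

struct ToyListener {
    // Updates with timestamps greater than `frontier` are still to be emitted.
    frontier: u64,
}

impl ToyListener {
    // Returns whatever has become available since the last call, possibly nothing.
    fn next(&mut self, shard: &Shard) -> Vec<(u64, String)> {
        let out: Vec<(u64, String)> = shard
            .updates
            .iter()
            .filter(|(ts, _)| *ts > self.frontier && *ts < shard.upper)
            .cloned()
            .collect();
        if shard.upper > 0 {
            self.frontier = self.frontier.max(shard.upper - 1);
        }
        out
    }
}
```

In this model, `listen(0)` on an empty shard succeeds immediately and its first `next` returns nothing, while `snapshot(0)` reports "not ready" until a write advances `upper`.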
src/persist-client/src/lib.rs
Outdated
"{:?}",
client.open::<Vec<u8>, String, u64, i64>(shard_id).await
),
"Err(CodecMismatch { requested: (\"Vec<u8>\", \"String\", \"u64\", \"i64\"), actual: (\"String\", \"String\", \"u64\", \"i64\") })"
Why are you using string comparison instead of something like:
assert_eq!(
client
.open::<Vec<u8>, String, u64, i64>(shard_id)
.await
.unwrap_err(),
InvalidUsage::CodecMismatch {
requested: tpe("Vec<u8>", "String", "u64", "i64",),
actual: tpe("String", "String", "u64", "i64",),
}
);
Where tpe() is a helper I made up. Plus I had to add #[cfg_attr(test, derive(PartialEq, Eq))] on InvalidUsage.
The strings seem somewhat hard to maintain, but there probably is a good reason. 😅
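For reference, a self-contained sketch of the two assertion styles side by side. The `InvalidUsage` enum, the `Types` alias, `tpe()` and `open()` below are simplified, hypothetical stand-ins written for this comment, not the real persist-client definitions.

```rust
// Hypothetical stand-ins; only the shape of the comparison matters here.
type Types = (String, String, String, String);

// The suggestion above would restrict this to tests with
// `#[cfg_attr(test, derive(PartialEq, Eq))]`; it is derived unconditionally
// here so the sketch compiles on its own.
#[derive(Debug, PartialEq, Eq)]
enum InvalidUsage {
    CodecMismatch { requested: Types, actual: Types },
}

// Made-up helper that builds the 4-tuple of type names.
fn tpe(k: &str, v: &str, t: &str, d: &str) -> Types {
    (k.to_string(), v.to_string(), t.to_string(), d.to_string())
}

// Stand-in for opening a shard that was registered with String keys.
fn open(requested: Types) -> Result<(), InvalidUsage> {
    let actual = tpe("String", "String", "u64", "i64");
    if requested == actual {
        Ok(())
    } else {
        Err(InvalidUsage::CodecMismatch { requested, actual })
    }
}

// Structured comparison: survives changes to the Debug formatting.
fn check_structured() {
    let err = open(tpe("Vec<u8>", "String", "u64", "i64")).unwrap_err();
    assert_eq!(
        err,
        InvalidUsage::CodecMismatch {
            requested: tpe("Vec<u8>", "String", "u64", "i64"),
            actual: tpe("String", "String", "u64", "i64"),
        }
    );
}

// String comparison: pins the exact message a consumer would see.
fn check_message() {
    let err = open(tpe("Vec<u8>", "String", "u64", "i64")).unwrap_err();
    assert!(format!("{:?}", err).starts_with("CodecMismatch {"));
}
```

The structured form fails with a field-by-field diff, while the string form also guards the rendered message, so which one fits depends on what the test is meant to protect.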
I usually lean toward matching on error message in tests because that's often how they're consumed in production. unwrap_err is a good idea, I simply forgot it existed :). I'll switch to that!
Regarding the change in semantics of listen(): what's the reasoning behind that? It doesn't seem to be necessary for the optimization, and the test for the optimization could also be written with the old semantics. I'm asking because it seemed natural to me that snapshot() would block, because it does "get me the entire data up to as_of" while listen() felt more like an async stream where creating the stream at any legal as_of would not block but then updates would only trickle in once they are available.
I'm convinced!
First two commits look good to me, third commit I'm still reading and it's taking me a bit to internalize because I'm slow this morning but don't let that block merging!
src/persist-client/src/lib.rs
Outdated
"Err(CodecMismatch { requested: (\"String\", \"String\", \"i64\", \"i64\"), actual: (\"String\", \"String\", \"u64\", \"i64\") })"
);
// We can't test the D param mismatch currently because i64 is literally
// the only type that implements both Codec64 and Semigroup right now.
hmmm this is really surprising to me because Semigroup/Monoid should be implemented for the unsigned integers and i guess it just never was a pressing need
opened TimelyDataflow/differential-dataflow#368 which once it merges and we bump differential should let us test the diff param mismatch
oh because u64 will implement Semigroup now? nice!
yup!
done, thanks for the timely fix!
ready for another look!
@@ -644,6 +643,11 @@ mod tests {
let mut snapshot = read.expect_snapshot(2).await;
let mut listen = read.expect_listen(0).await;

// Manually advance the listener's machine so that it has the latest
I don't love this! some other options:
- snapshot currently clones the machine so that the methods can be &self instead of &mut self but I don't think that's super important. removing that clone would happen to make this unnecessary because the listen would inherit the state from the snapshot call
- change the listen call to try fetching updated state once if it's not immediately serveable
- dunno something else?
looking right now! sorry about dropping this!
I think snapshot mutating the reader makes the most sense to me! intuitively, having a mechanism whereby a reader changes after snapshotting seems to make sense, and it seems like we're doing
// Hack: Keep this method `&self` instead of `&mut self` by cloning the
// cached copy of the state, updating it, and throwing it away
// afterward.
purely as a means to keep that method &self, which i think might have the rationale that it's more like the expectation for the api? I don't know of any stronger reason, and given all of that, I feel like &mut self is a fine way forward!
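A rough sketch of the trade-off being discussed, under heavy simplification: `Machine` here is just a sequence number, `fetch_latest` takes the "latest" value as an argument instead of talking to consensus, and `Reader` and both `snapshot_*` methods are hypothetical names chosen for the illustration.

```rust
// Hypothetical, simplified stand-ins for the reader and its cached state.
#[derive(Clone, Debug, PartialEq)]
struct Machine {
    seqno: u64,
}

impl Machine {
    // Pretend to pull newer state from consensus.
    fn fetch_latest(&mut self, latest: u64) {
        self.seqno = self.seqno.max(latest);
    }
}

struct Reader {
    machine: Machine,
}

impl Reader {
    // `&self` variant: clone the cached state, update the clone, throw it away.
    // The reader's own cached state stays stale.
    fn snapshot_cloning(&self, latest: u64) -> u64 {
        let mut machine = self.machine.clone();
        machine.fetch_latest(latest);
        machine.seqno
    }

    // `&mut self` variant: the reader keeps the refreshed state.
    fn snapshot_mut(&mut self, latest: u64) -> u64 {
        self.machine.fetch_latest(latest);
        self.machine.seqno
    }

    // A listener starts from whatever state the reader has cached right now.
    fn listen(&self) -> Machine {
        self.machine.clone()
    }
}
```

With the cloning variant a listener created after the snapshot still starts from the old state; with the `&mut self` variant it inherits the refreshed one, which is the behaviour the first bullet above relies on.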
When I was rewriting this test locally, to fit with the previous (and now unchanged!) semantics of listen, I changed this to first do one next() call on listen, asserted against that. Then made it unreliable, and then fetched the rest of the listen events. Also slightly awkward, but doesn't require calling internal methods or changing signatures. 🤷♂️
I think it makes a lot of sense to make snapshot &mut and remove the machine clone (and we should consider doing that independently), but after reverting my change to also make listen wait for as_of to be available, it seems pretty subtle for the test to rely on the fact that we call snapshot first. went with aljoscha's suggestion
either of you want to take another look at this? if not, I'll resolve these conflicts (and fix my lint issue) and merge
I had a nit and a comment. But I think this is good to merge!
src/persist-client/src/error.rs
Outdated
/// An update was not beyond the expected lower of the batch
UpdateNotBeyondLower {
/// An update was not at or beyond the expected lower of the batch
UpdateNotAtOrBeyondLower {
super nit: I think timely (and Frank) already understand "beyond" as "not less than", which means "at or greater", in layman's terms.
TIL (and I confirmed frank shares this interpretation)! reverted
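For anyone else who trips over the terminology, a tiny sketch of "beyond" for totally ordered u64 timestamps. The `beyond` function is a hypothetical helper written for this comment, with the frontier modelled as a plain slice rather than timely's `Antichain`.

```rust
// Hypothetical helper: a time is "beyond" a frontier if some element of the
// frontier is less than or equal to it, i.e. the time is at or after the
// frontier, not strictly after it.
fn beyond(lower: &[u64], time: u64) -> bool {
    lower.iter().any(|l| *l <= time)
}

fn demo() {
    let lower = [3];
    assert!(beyond(&lower, 3)); // equal to the lower bound counts as beyond
    assert!(beyond(&lower, 4));
    assert!(!beyond(&lower, 2));
}
```

So an update at exactly `lower` is already "beyond" it, which is why the shorter `UpdateNotBeyondLower` name carries the same meaning.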
@@ -200,29 +200,48 @@ where
}
}

pub async fn listen(&self, as_of: &Antichain<T>) -> Result<Self, Since<T>> {
This is super nit, but: it feels a bit weird to have these methods on Machine and State, because ReadHandle doesn't really have to call them, they're just an additional layer of verification/assertion. We could maybe put that in a comment here or maybe call this verify_listen or sth.
did both!
This was started in 12685, but the last one was left as a TODO to avoid making 12216 rebase. Now that 12216 is in, finish this work.
TFTRs!!
This adds a performance optimization where a Listener doesn't fetch the latest Consensus state if the one it currently has can serve the next request. A similar thing already was true of SnapshotIter, so also included is a test that covers both.
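The idea behind the optimization can be sketched roughly as follows. `Consensus`, `CachingListener`, and the fetch counter are hypothetical simplifications for illustration (state is reduced to a single `upper`), not the actual persist-client types.

```rust
// Hypothetical stand-in for the shared state store; counts how often it is read.
struct Consensus {
    upper: u64,
    fetches: u32,
}

impl Consensus {
    fn fetch(&mut self) -> u64 {
        self.fetches += 1;
        self.upper
    }
}

// Hypothetical listener that caches the last upper it saw.
struct CachingListener {
    cached_upper: u64,
    frontier: u64,
}

impl CachingListener {
    // Advances by one timestamp if possible. Consensus is only consulted when
    // the cached state cannot serve the request.
    fn next(&mut self, consensus: &mut Consensus) -> Option<u64> {
        if self.frontier >= self.cached_upper {
            // Cached state is exhausted: refresh it.
            self.cached_upper = consensus.fetch();
        }
        if self.frontier < self.cached_upper {
            self.frontier += 1;
            Some(self.frontier)
        } else {
            None
        }
    }
}
```

In this sketch, two consecutive `next` calls against an upper of 2 cost a single fetch; a second fetch only happens once the cached state is used up.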
See commits for details. The first commit isn't a test, but it was small so I snuck it in.
Motivation
Testing
Release notes
This PR includes the following user-facing behavior changes: