roachtest: add mixed version testing for CDC #85277
Conversation
This test fails consistently on …
This might be catching a change in decimal serialization somewhere? Or possibly even a failure of roundtrippability within a single version in our JSON encoding of decimals. I don't think we have a lot of testing that exercises that, even in the nemesis. Would it be easy to A: see if the failures still happen if you have the fingerprint validator ignore the decimal columns, and/or B: see if the failures still happen if both versions being tested are the same commit on master?
Force-pushed from 3546df6 to b7e4ca0.
const (
	// how many resolved timestamps to wait for before considering the
	// system to be working as intended.
	requestedResolved = 20
What's the significance of 20?
Chosen empirically based on how long it takes to reach 20 resolved timestamps. With 20, the test typically spends non-negligible time in all stages of the test. It's a little arbitrary, but that's how other CDC tests operate and I was trying to use something that is known to work here. Open to changing if you have suggestions.
This seems very reasonable. I wonder if we should also set the minimum duration between subsequent "resolved timestamps" events, e.g., resolved='30s'. The rationale is to prevent the case where each resolved timestamp corresponds to exactly one change; i.e., ideally we want to test more than 20 changes in total.
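For illustration, a hedged sketch of creating a changefeed with such a minimum resolved interval; the connection string and Kafka URI are placeholders, not values from this PR:

```go
package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq"
)

func main() {
	// Placeholder connection string; assumes a locally running CockroachDB node.
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// resolved='30s' asks the changefeed to emit resolved timestamps at most
	// once every 30 seconds, so each resolved message covers a batch of
	// changes rather than (potentially) a single one.
	if _, err := db.Exec(
		`CREATE CHANGEFEED FOR TABLE tpcc.warehouse INTO 'kafka://localhost:9092' WITH updated, resolved = '30s'`,
	); err != nil {
		log.Fatal(err)
	}
}
```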
Not sure yet about the use of this; using the resolved option by itself sometimes hides issues -- it forces emitting a resolved message and kind of acts as a force-flush... (Just typing this in since I'm doing a top-to-bottom review.)
}

tpcc.install(ctx, u.c)
time.Sleep(2 * time.Second)
This seems flaky; what if install takes 5 seconds? Will the run step time out?
That's not the purpose of the Sleep call; install is synchronous already. This was taken from cockroach/pkg/cmd/roachtest/tests/cdc.go, lines 171 to 173 at 47d34c9:

// TODO(dan,ajwerner): sleeping momentarily before running the workload
// mitigates errors like "error in newOrder: missing stock row" from tpcc.
time.Sleep(2 * time.Second)
I should package this into a function to be reused (maybe install itself). But first I need to validate that it indeed fails without the Sleep call. I'll do that once I have a version of this roachtest running reliably.
Yep, that TODO and error message look quite dated (circa 2018). The install imports the fixtures (i.e., pre-computed tpcc tables). I can't think of a reason why any sleep would be needed, since by the time install returns, all the tables and their rows will have been propagated down to storage via AddSSTable. It would be great to verify this and remove it.
Maybe @ajwerner could add some historical context?
I think sleep should be dropped.
Dropped -- it's indeed not necessary.
}

// setupVerifier creates a CDC validator to validate that a changefeed
// created on the `tpcc.warehouse` table is able to re-create the
Why warehouse instead of, say, newOrder? After tpcc.install, the warehouse table is never updated, whereas newOrder is updated with the highest probability. The default weights for each updated table are here: https://github.com/cockroachdb/cockroach/blob/master/pkg/workload/tpcc/tpcc.go#L193
Interestingly, I tried using new_order as it's the only table that doesn't have a decimal column, following @HonoreDB's suggestion. For some reason, however, the test timed out when I did that: it wouldn't reach 20 resolved timestamps within the allotted 30 minutes. I didn't spend too much time debugging that, but I'll investigate.
Interesting; definitely worth debugging this further. This might be indicative of an actual issue.
+1 -- definitely interesting; new_order is also substantially larger and more interesting than warehouse
Changed to use new_order. After a few tweaks, it works well now.
// validatorVerify checks if the validator has found any issues at the
// time the function is called.
func (cmvt *cdcMixedVersionTester) validatorVerify() versionStep {
Doesn't read like a proper verb or phrase... maybe rename to validate or assertValid?
Renamed to assertValid.
@HonoreDB thanks for the suggestions! I'm still trying to figure some things out, but here are some things I've observed so far:
Interesting idea about the decimal columns; unfortunately, it doesn't seem to be the source of the fingerprint errors. I tried two things: using the […]. AFAIK, it's not possible to ignore columns when computing the fingerprint.
This also doesn't seem to have an impact on the rate of fingerprint failures I'm seeing. What does seem to have a noticeable impact, however, is if I don't stop and re-start binaries in the test, but instead just let the workload and changefeed run undisturbed.
Mostly nits; I won't have time to do another pass before my vaca...
// the CDC target (table). We're running the tpcc workload in this
// test; verifying that the changefeed on the `warehouse` table
// works is sufficient for our purposes
target = "tpcc.warehouse"
probably should be a constant or a config struct to be passed into these methods...
ctx           context.Context
crdbNodes     option.NodeListOption
workloadNodes option.NodeListOption
kafkaNodes    option.NodeListOption
Do we want to restrict this to only work with kafka?
I'm open to supporting other sinks. Do we have tests that pull changes from other sinks at the moment? I can't seem to find anything, but maybe I missed something.
partitionStr := strconv.Itoa(int(m.Partition))

// errors in the calls to the validator below could lead to
// transient failures if the cluster is upgrading, so we retry
// for 1 minute.
How often do these errors happen? If an upgrade is happening, does it mean that we'll be spending up to 1 minute per row?
Not per row, because we're not processing rows concurrently. These transient errors can last more than a minute though; I had to increase the timeout here to 2 minutes.
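A minimal sketch of the retry pattern under discussion, using the retry.ForDuration helper that already appears in this diff; the op callback is a hypothetical stand-in for the actual validator call:

```go
package tests

import (
	"time"

	"github.com/cockroachdb/cockroach/pkg/util/retry"
)

// withValidatorRetry retries a validator operation for up to 2 minutes,
// absorbing transient errors while nodes restart during an upgrade. The
// operation is re-run until it returns nil or the duration elapses, at
// which point the last error is returned.
func withValidatorRetry(op func() error) error {
	return retry.ForDuration(2*time.Minute, op)
}
```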
newVersionUpgradeTest(c,
	uploadAndStartFromCheckpointFixture(tester.crdbNodes, predecessorVersion),
	tester.setupVerifier(1),
	tester.installAndStartTPCCWorkload("15m"),
nit: have this method take time.Duration?
Done.
// NB: at this point, cluster and binary version equal predecessorVersion,
// and auto-upgrades are on.
preventAutoUpgradeStep(1),
tester.createChangeFeed(1),
What's the significance of the argument? Perhaps put a /* comment */ after it?
Fixed.
tester.createChangeFeed(1),

// let the workload run in the old version for a while
sleep,
sleep has negative connotations in tests (at least, I think it's bad). Can we rename this? executeWorkload, runWorkloadForAWhile -- really anything but sleep.
Renamed.
// allow cluster version to update
allowAutoUpgradeStep(1),
waitForUpgradeStep(tester.crdbNodes),
Should the workload run for a bit after this step?
I already included steps in the test to allow for the workload to run. After this point, the verifier has already received 20 resolved timestamps. What value would letting the workload run longer bring, if the validator has already terminated?
		return nil
	}
} else {
	if err := retry.ForDuration(1*time.Minute, func() error {
Okay, I haven't proved it locally yet but I think this right here is the source of the flakes. The FingerprintValidator's NoteResolved method is neither atomic nor idempotent, so if it hits an error when trying to write to a restarting database, it will end up in an inconsistent state. It has an in-memory buffer of consumed events to be applied (populated by NoteRow), and iterates over it by doing:
row := v.buffer[0]
v.buffer = v.buffer[1:]
if err := v.applyRowUpdate(row); err != nil {
return err
}
So if the attempt to write a row to the scratch table in applyRowUpdate fails, we'll never actually retry the write with that row.
A simple solution might be to move the retry logic down into the actual query execution? Or we could make the validator methods more ACID.
Just changing FingerprintValidator's NoteResolved to only modify the validator state when it's about to return a nil error might do it, but I haven't audited all the validator methods, there might be one or two similar bugs there.
That's indeed a very possible reason for this, let me run some tests. 👍
This does seem to fix the issue of the fingerprint validator failing in this test, good catch @HonoreDB!
First commit in this PR shifts the mutation on the validator to after the applyRowUpdate call (which, afaict, is the only call in the validator that writes to a table). I haven't seen fingerprint mismatch errors after the change.
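A minimal, self-contained sketch of that reordering; the types are simplified stand-ins, since the real validator buffers decoded row updates and writes them to a scratch table:

```go
package main

import "fmt"

type validator struct {
	buffer []string // simplified stand-in for buffered row updates
}

// applyRowUpdate is a stand-in for the write to the scratch table.
func (v *validator) applyRowUpdate(row string) error {
	fmt.Println("applied", row)
	return nil
}

// drainBuffer dequeues a row only after applyRowUpdate succeeds, so a
// failed write leaves the row at buffer[0] to be retried, instead of
// being dropped as in the original loop.
func (v *validator) drainBuffer() error {
	for len(v.buffer) > 0 {
		if err := v.applyRowUpdate(v.buffer[0]); err != nil {
			return err // buffer untouched; a retry re-applies this row
		}
		v.buffer = v.buffer[1:] // mutate state only on success
	}
	return nil
}

func main() {
	v := &validator{buffer: []string{"row-1", "row-2"}}
	_ = v.drainBuffer()
}
```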
Force-pushed from b7e4ca0 to b17d38d.
This is ready for another look. In terms of flakiness, I feel this is at a stage where it's good to be merged. I do see flakiness, but at a much lower rate: my estimate is ~1%. The issue I see is a timeout: we never see 20 resolved timestamps in the 30 minutes we give this test. I'll investigate some more, but I think there's value in running this daily even with this flake (let me know if you think otherwise).
LGTM pending addressing others' comments.
pkg/cmd/roachtest/tests/mixed_version_cdc.go, line 34 at r1 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
Not sure yet about the use of this; using the resolved option by itself sometimes hides issues -- it forces emitting a resolved message and kind of acts as a force-flush... (Just typing this in since I'm doing a top-to-bottom review.)
I think we need the resolved option in order to make the fingerprint validator work, and since it only really validates when it sees a resolved timestamp, measuring progress in terms of a minimum number of resolved messages does mostly make sense to me. Wish there were a way to not flake though. Would it be possible to say something like "we need to spend at least 2 minutes in each step of the test and get at least 5 resolved messages"?
pkg/cmd/roachtest/tests/mixed_version_cdc.go, line 65 at r1 (raw file):
Previously, renatolabs (Renato Costa) wrote…
I'm open to supporting other sinks. Do we have tests that pull changes from other sinks at the moment? I can't seem to find anything, but maybe I missed something.
I don't think we have roachtests that read anything other than Kafka. Some of the other sinks are getting actually integrated with in unit tests (implementations in https://github.com/cockroachdb/cockroach/blob/master/pkg/ccl/changefeedccl/testfeed_test.go) -- nodelocal, experimental-sql, and sinkless changefeeds don't require any mocking so we run them more or less for real and pull from them. Supporting them in roachtests is definitely outside the scope of this PR but we should consider working on that at some point.
Force-pushed from b17d38d to d875e48.
a discussion (no related file):
I changed the approach in this PR a little bit; instead of the previous sleep calls to let the workload run in different states and binary/cluster version combinations, we now wait for a specific number of resolved timestamp messages in each state before moving on (currently set to 5).
In the process of making this change, I realized that using the tpcc workload is not ideal; I was not getting updates to the new_order table fast enough for the test to finish under 30 minutes. I switched to using the bank workload, which has only one table and all updates are applied to that table. With this change, the test is able to make progress a lot faster.
pkg/cmd/roachtest/tests/mixed_version_cdc.go, line 34 at r1 (raw file):
I think we need the resolved option in order to make the fingerprint validator work
Why do you say that? The code as it was prior to the updates I just pushed was not specifying an interval for resolved, and the fingerprint validator still worked.
Wish there were a way to not flake though.
I noticed that the flake happens not because normal executions of this test are dangerously close to the timeout (they generally finish in under 20 minutes); instead, when the timeout happens, it seems that for some reason no more resolved timestamps are observed past a certain point. What additional logging or context could we add to the test to make it easier for the CDC team to debug failures if/when this test fails after merge?
Would it be possible to say something like "we need to spend at least 2 minutes in each step of the test and get at least 5 resolved messages"?
I like this idea. I just pushed a change where we'll wait for 5 resolved timestamps in each stage of the test, instead of the previous "sleep for a while to let the workload run" approach.
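A sketch of what such a waiting step could look like, assuming the Kafka consumer signals each resolved timestamp on a channel (names are illustrative, not the PR's exact code):

```go
package main

import (
	"fmt"
	"time"
)

// waitForResolvedTimestamps blocks until `count` resolved timestamps have
// been signaled on resolvedCh, or fails when the timeout elapses first.
func waitForResolvedTimestamps(resolvedCh <-chan struct{}, count int, timeout time.Duration) error {
	deadline := time.After(timeout)
	for seen := 0; seen < count; {
		select {
		case <-resolvedCh:
			seen++
		case <-deadline:
			return fmt.Errorf("only saw %d of %d resolved timestamps before timeout", seen, count)
		}
	}
	return nil
}

func main() {
	ch := make(chan struct{}, 5)
	for i := 0; i < 5; i++ {
		ch <- struct{}{}
	}
	fmt.Println(waitForResolvedTimestamps(ch, 5, time.Second)) // <nil>
}
```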
Force-pushed from d875e48 to 5f6b52a.
This looks great! I only left a few minor comments.
As I was reading this, I seemed to recall that during the initial brainstorming we thought it might be feasible to extend the existing runVersionUpgrade with the cdc steps. I now see why that isn't straightforward. First, the boilerplate is rather cumbersome; it'd be great if we could share most of it with the unit test. Second, the non-declarative nature of all the steps makes it difficult to mix elaborate state transitions and validations. As a follow-up to this PR, we should think about how this would map into @tbg's "layered testing" framework.
pkg/cmd/roachtest/tests/cdc.go, line 1836 at r5 (raw file):

}
kafka.install(ctx)
Curious if this ever flakes?
pkg/cmd/roachtest/tests/mixed_version_cdc.go, line 131 at r5 (raw file):

// installAndStartWorkload starts a bank workload. This step does not
// block; if an error is found while running the workload, the test
A bit fuzzy about the error comment. Below we start the workload with --tolerate-errors, so it will continue to run even when transactions time out or get rejected. Is it actually required that we run with --tolerate-errors? I'd expect the cluster to be available and making progress at every step.
pkg/cmd/roachtest/tests/mixed_version_cdc.go, line 159 at r5 (raw file):

defer cmvt.timestampsResolved.Unlock()
cmvt.timestampsResolved.C = make(chan struct{})
Nit: atomic.Value has Load/Store, which wouldn't require the lock.
pkg/cmd/roachtest/tests/mixed_version_cdc.go, line 229 at r5 (raw file):

if err != nil {
	t.Fatal(err)
	return nil
Nit: control doesn't reach here, but return err is more consistent. A really dumb linter might be foiled :)
pkg/cmd/roachtest/tests/mixed_version_cdc.go, line 235 at r5 (raw file):

// errors in the calls to the validator below could lead to
// transient failures if the cluster is upgrading, so we retry
// for 1 minute.
It's now 2 minutes; maybe just kill the last line of the comment?
pkg/cmd/roachtest/tests/mixed_version_cdc.go, line 308 at r5 (raw file):

tester := newCDCMixedVersionTester(ctx, t, c)
tester.StartKafka(t, c)
Nit: maybe this bootstrapping step could also go inside newCDCMixedVersionTester, where the cockroach and workload binaries are bootstrapped?
pkg/cmd/roachtest/tests/mixed_version_cdc.go, line 314 at r5 (raw file):

// test (to do things like change settings, create a changefeed,
// etc)
sqlNode := tester.crdbNodes[0]
This is usually an anti-pattern because we've actually caught bugs in the sql optimizer which manifest only when a specific node was used. Since we're not decommissioning any nodes during this test, only upgrading/downgrading, we'd expect each node to be reasonably available, right? Could we then round-robin sqlNode across the steps?
Force-pushed from 09f4770 to ce9f5ee.
Force-pushed from ce9f5ee to 3525705.
One final change I pushed here was to move the retrying logic out of the roachtest and into the fingerprint validator itself (adding a new field to the struct to indicate if we should retry operations). The reasoning is that there was more state in the validator than just the buffer I changed initially; while that change did drastically reduce the probability of a fingerprint mismatch, I was able to still see a mismatch by running it enough times. I'm more confident that the validator's state will stay consistent this way.
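A rough sketch of what moving the retry policy into the validator could look like; the field and method names are illustrative, not the PR's exact API:

```go
package cdctest

import (
	"time"

	"github.com/cockroachdb/cockroach/pkg/util/retry"
)

// fingerprintValidatorSketch illustrates the new field described above:
// when retryDuration is non-zero, every DB operation the validator issues
// goes through maybeRetry, so its in-memory state cannot run ahead of a
// write that failed transiently during a node restart.
type fingerprintValidatorSketch struct {
	retryDuration time.Duration
}

func (v *fingerprintValidatorSketch) maybeRetry(op func() error) error {
	if v.retryDuration == 0 {
		return op() // retries disabled, e.g., in unit tests
	}
	return retry.ForDuration(v.retryDuration, op)
}
```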
pkg/cmd/roachtest/tests/cdc.go, line 1836 at r5 (raw file):
Previously, srosenberg (Stan Rosenberg) wrote…
Curious if this ever flakes?
I believe it's something that could flake, but with very low probability. At this point I've run this test 100s of times while iterating on it, and I haven't seen this step fail so far.
pkg/cmd/roachtest/tests/mixed_version_cdc.go, line 131 at r5 (raw file):
Previously, srosenberg (Stan Rosenberg) wrote…
A bit fuzzy about the error comment. Below we start the workload with --tolerate-errors, so it will continue to run even when transactions time out or get rejected. Is it actually required that we run with --tolerate-errors? I'd expect the cluster to be available and making progress at every step.
Yeah, I meant errors starting the workload, or errors other than what --tolerate-errors captures. Since it's confusing, I removed that part of the comment. We need --tolerate-errors, otherwise workload will fail when the node it's connected to is stopped.
pkg/cmd/roachtest/tests/mixed_version_cdc.go, line 159 at r5 (raw file):
Previously, srosenberg (Stan Rosenberg) wrote…
Nit: atomic.Value has Load/Store, which wouldn't require the lock.
This would work well, but you can't store nil in an atomic.Value, so it's not straightforward to indicate that "we don't have a channel to write to" using that type.
pkg/cmd/roachtest/tests/mixed_version_cdc.go, line 229 at r5 (raw file):
Previously, srosenberg (Stan Rosenberg) wrote…
Nit: control doesn't reach here, but return err is more consistent. A really dumb linter might be foiled :)
Good catch. I wasn't returning the error in other spots, so I removed the return statement here.
pkg/cmd/roachtest/tests/mixed_version_cdc.go, line 235 at r5 (raw file):
Previously, srosenberg (Stan Rosenberg) wrote…
It's now 2 minutes; maybe just kill the last line of the comment?
Yep, removed.
pkg/cmd/roachtest/tests/mixed_version_cdc.go, line 314 at r5 (raw file):
Previously, srosenberg (Stan Rosenberg) wrote…
This is usually an anti-pattern because we've actually caught bugs in the sql optimizer which manifest only when a specific node was used. Since we're not decommissioning any nodes during this test, only upgrading/downgrading, we'd expect each node to be reasonably available, right? Could we then round-robin sqlNode across the steps?
Good point. Changed it to use a random SQL node for each operation.
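A sketch of that change, with nodeList standing in for roachtest's option.NodeListOption:

```go
package main

import (
	"fmt"
	"math/rand"
)

// nodeList is a simplified stand-in for option.NodeListOption.
type nodeList []int

// randNode picks one node at random, so each step in the test talks to a
// different node instead of always node 1.
func (n nodeList) randNode() int {
	return n[rand.Intn(len(n))]
}

func main() {
	crdbNodes := nodeList{1, 2, 3, 4}
	fmt.Println("running SQL against node", crdbNodes.randNode())
}
```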
bors r=srosenberg
TFTR!
Build succeeded.
This commit adds the `cdc/mixed-versions` roachtest. It builds on top of the existing infrastructure for mixed version testing in order to create a test scenario that reproduces the issue observed by DoorDash when upgrading from 21.2 to 22.1.0.
Following the pattern present in other mixed version tests, this test starts the cluster at the previous version, upgrades the binaries to the current version, rolls them back, and then finally performs the binary upgrade again, allowing the upgrade to finalize.
This roachtest uses the `FingerprintValidator` infrastructure used in CDC unit tests (and also in the currently skipped `cdc/bank` roachtest). This validator gives us strong guarantees that the events produced by a changefeed are correct even in the face of ongoing upgrades.
This roachtest fails on v22.1.0; once the upgrade to v22.1 is finalized, the Kafka consumer won't see further updates, and the test eventually times out.
Release justification: test-only changes.
Release note: None.