Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
124195: roachtest: add order validation to CDC Kafka roachtests r=rharding6373 a=andyyang890

This patch adds order validation to CDC Kafka roachtests so that we
can build more confidence in our ordering guarantees. It can be enabled
for a roachtest either by directly setting the `validateOrder` flag on a
`kafkaManager` before creating consumers, or indirectly by setting the
`validateOrder` flag on `kafkaFeedArgs` for tests that use `cdcTester`.

Informs cockroachdb#124148

Release note: None

Co-authored-by: Andy Yang <[email protected]>
  • Loading branch information
craig[bot] and andyyang890 committed May 17, 2024
2 parents 590c61f + 3428726 commit d8073b5
Show file tree
Hide file tree
Showing 4 changed files with 228 additions and 82 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/cdctest/nemeses.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func RunNemesis(
if err != nil {
return nil, err
}
ns.v = MakeCountValidator(Validators{
ns.v = NewCountValidator(Validators{
NewOrderValidator(`foo`),
baV,
fprintV,
Expand Down
28 changes: 15 additions & 13 deletions pkg/ccl/changefeedccl/cdctest/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,11 @@ func (v *orderValidator) NoteRow(partition string, key, value string, updated hl
})
seen := timestampsIdx < len(timestampValueTuples) &&
timestampValueTuples[timestampsIdx].ts == updated
if seen {
return nil
}

if !seen && len(timestampValueTuples) > 0 &&
if len(timestampValueTuples) > 0 &&
updated.Less(timestampValueTuples[len(timestampValueTuples)-1].ts) {
v.failures = append(v.failures, fmt.Sprintf(
`topic %s partition %s: saw new row timestamp %s after %s was seen`,
Expand All @@ -152,21 +155,20 @@ func (v *orderValidator) NoteRow(partition string, key, value string, updated hl
timestampValueTuples[len(timestampValueTuples)-1].ts.AsOfSystemTime(),
))
}
if !seen && updated.Less(v.resolved[partition]) {
latestResolved := v.resolved[partition]
if updated.Less(latestResolved) {
v.failures = append(v.failures, fmt.Sprintf(
`topic %s partition %s: saw new row timestamp %s after %s was resolved`,
v.topic, partition, updated.AsOfSystemTime(), v.resolved[partition].AsOfSystemTime(),
v.topic, partition, updated.AsOfSystemTime(), latestResolved.AsOfSystemTime(),
))
}

if !seen {
v.keyTimestampAndValues[key] = append(
append(timestampValueTuples[:timestampsIdx], timestampValue{
ts: updated,
value: value,
}),
timestampValueTuples[timestampsIdx:]...)
}
v.keyTimestampAndValues[key] = append(
append(timestampValueTuples[:timestampsIdx], timestampValue{
ts: updated,
value: value,
}),
timestampValueTuples[timestampsIdx:]...)
return nil
}

Expand Down Expand Up @@ -675,8 +677,8 @@ type CountValidator struct {
rowsSinceResolved int
}

// MakeCountValidator returns a CountValidator wrapping the given Validator.
func MakeCountValidator(v Validator) *CountValidator {
// NewCountValidator returns a CountValidator wrapping the given Validator.
func NewCountValidator(v Validator) *CountValidator {
return &CountValidator{v: v}
}

Expand Down
Loading

0 comments on commit d8073b5

Please sign in to comment.