changefeedccl: roachtest refactor and initial-scan-only
Changefeed roachtests were set up to run a workload for a specific
duration and then quit, which made it difficult to run an
`initial_scan_only` test that terminates upon job success.

We as a team have also noticed a greater need to test and observe
changefeeds running in production against real sinks to catch issues we
are unable to mock or observe from simple unit tests. This is currently
a notable hassle as one has to set up each individual sink and run them,
ensure the changefeed is pointing to the right URI, and then be able to
monitor the metrics of this long running process.

This change refactors the cdcBasicTest into distinct pieces that are
then put together in a test.  This allows for easier experimentation
with live tests, allowing us to spin up a cluster and a workload, run
one or more changefeeds on it, set up a poller to print out job
details, have an accessible grafana URL to view metrics, and wait for
some completion condition.

Changing the specialized `runCDCKafkaAuth`, `runCDCBank`, and
`runCDCSchemaRegistry` functions was left out of scope for this first
big change.

The main APIs involved in basic roachtests are now:
- `newCDCTester`: This creates a tester struct to run the rest of the
  APIs and initializes the database
- `tester.runTPCCWorkload(tpccArgs)`: Starts a TPCC workload from the
  last node in the cluster
- `tester.runLedgerWorkload(ledgerArgs)`: Starts a Ledger workload from
  the last node in the cluster
- `tester.newChangefeed(feedArgs)`: Starts a new changefeed on the
  cluster and returns a `changefeedJob` object
- `tester.runFeedLatencyVerifier(changefeedJob, latencyTargets)`: starts
  a routine that monitors the changefeed latency until the tester is
  `Close`'d
- `tester.waitForWorkload`: waits for a workload started by
  `setupAndRunWorkload` to complete its duration
- `changefeedJob.waitForCompletion`: waits for a changefeed to complete
  (either success or failure)
- `tester.startCRDBChaos`: This starts a Chaos routine that periodically
  shuts nodes down and brings them back up
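
As a rough illustration of how these pieces are meant to compose, here is
a minimal, self-contained sketch. It does not use the real roachtest
types: `cdcTester`, `changefeedJob`, their method signatures, and the
`null://` sink URI are simplified stand-ins chosen for this example, so
the actual code in the test package will differ.

```go
package main

import (
	"fmt"
	"time"
)

// cdcTester and changefeedJob are hypothetical stand-ins for the roachtest
// types of the same name. The real signatures may differ.
type cdcTester struct {
	workloadDone chan struct{}
	steps        []string
}

type changefeedJob struct {
	result chan error
}

func newCDCTester() *cdcTester {
	return &cdcTester{steps: []string{"tester created"}}
}

// runTPCCWorkload simulates a workload that runs for a fixed duration.
func (ct *cdcTester) runTPCCWorkload(d time.Duration) {
	ct.workloadDone = make(chan struct{})
	ct.steps = append(ct.steps, "workload started")
	go func() {
		time.Sleep(d)
		close(ct.workloadDone)
	}()
}

// newChangefeed simulates creating a changefeed job that succeeds at once.
func (ct *cdcTester) newChangefeed(sinkURI string) *changefeedJob {
	ct.steps = append(ct.steps, "changefeed started: "+sinkURI)
	job := &changefeedJob{result: make(chan error, 1)}
	job.result <- nil
	return job
}

// waitForWorkload blocks until the simulated workload duration has elapsed.
func (ct *cdcTester) waitForWorkload() {
	<-ct.workloadDone
	ct.steps = append(ct.steps, "workload finished")
}

// waitForCompletion blocks until the job reports success (nil) or failure.
func (j *changefeedJob) waitForCompletion() error { return <-j.result }

func (ct *cdcTester) Close() { ct.steps = append(ct.steps, "tester closed") }

// runSketch strings the pieces together the way a test body might.
func runSketch() ([]string, error) {
	ct := newCDCTester()
	ct.runTPCCWorkload(10 * time.Millisecond)
	job := ct.newChangefeed("null://") // placeholder sink URI
	if err := job.waitForCompletion(); err != nil {
		ct.Close()
		return ct.steps, err
	}
	ct.steps = append(ct.steps, "changefeed completed")
	ct.waitForWorkload()
	ct.Close()
	return ct.steps, nil
}

func main() {
	steps, err := runSketch()
	for _, s := range steps {
		fmt.Println(s)
	}
	if err != nil {
		fmt.Println("error:", err)
	}
}
```

The point of the shape is that each test chooses its own completion
condition: a duration-based test waits on the workload, while an
`initial_scan_only` test would wait only on the job.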

APIs that are going to be more useful for experimentation are:
- `tester.startGrafana`: Sets up a grafana instance on the last node of
  the cluster and prints out a link to a grafana dashboard with some
  basic changefeed metrics
- `changefeedJob.runFeedPoller(ctx, stopper, onInfo)`: runs a given
  callback every second with the changefeed info

Roachtests can be run locally with the `--local` flag or on an existing
cluster without destroying it afterwards with `--cluster="my-cluster"
--debug`.

Ex: After adding a new test (let's say "cdc/my-test") to the registerCDC
function you can keep running

```
./dev build cockroach --cross # if changes made to crdb
./dev build roachtest         # if changes made to the test

./bin/roachtest run cdc/my-test --cluster="my-cluster" --debug
```

as you try out different changes or options. If you want to try a set of
steps against different versions of the app you could download those
binaries and use the --cockroach="path-to-binary" flag to test against
those instead.

If you want to set up a large TPCC database on a cluster and reuse it
for tests this can be done with roachtest's --wipe and --skip-init
flags.

Release note: None
samiskin committed Nov 23, 2022
1 parent e6b30e1 commit 46c75a5
Showing 5 changed files with 619 additions and 345 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -608,6 +608,7 @@ func (ca *changeAggregator) emitResolved(batch jobspb.ResolvedSpans) error {
rowenc.EncDatum{Datum: tree.DNull}, // key
rowenc.EncDatum{Datum: tree.DNull}, // value
})
ca.metrics.ResolvedMessages.Inc(1)

ca.recentKVCount = 0
return nil