Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
132443: crosscluster/logical: add KV write path metrics r=msbutler a=stevendanna

It would be nice to know how often we are hitting these particular errors.

Epic: none
Release note: None

133092: roachtest: disable 23.1 -> 23.2 testing for follower reads r=arulajmani a=andrewbaptist

After introducing shared-process testing of follower reads, the tests follower-reads/mixed-version/* have been flaky. This commit starts testing from 23.2 to make sure that any other failures are caught and triaged correctly.

Epic: none
Fixes: #133000
Fixes: #132999
Fixes: #129546
Fixes: #129167

Release note: None

133273: ui: bump cluster-ui to 24.3.0-prerelease.4 r=xinhaoz a=xinhaoz

This commit bumps cluster-ui version to 24.3.0-prerelease.4 and moves crdb-protobuf-client pkg from dependencies back to dev dependencies in an attempt to resolve build issues when exporting the package.

Epic: none

Release note: None

Co-authored-by: Steven Danna <[email protected]>
Co-authored-by: Andrew Baptist <[email protected]>
Co-authored-by: Xin Hao Zhang <[email protected]>
  • Loading branch information
4 people committed Oct 23, 2024
4 parents 1c9e30f + 96c82bf + 30a21f3 + 77c735e commit 9e6a644
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 19 deletions.
2 changes: 2 additions & 0 deletions docs/generated/metrics/metrics.html
Original file line number Diff line number Diff line change
Expand Up @@ -1492,6 +1492,8 @@
<tr><td>APPLICATION</td><td>logical_replication.events_initial_success</td><td>Successful applications of an incoming row update</td><td>Failures</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.events_retry_failure</td><td>Failed re-attempts to apply a row update</td><td>Failures</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.events_retry_success</td><td>Row update events applied after one or more retries</td><td>Failures</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.kv.update_too_old</td><td>Total number of updates that were not applied because they were too old</td><td>Events</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.kv.value_refreshes</td><td>Total number of batches that refreshed the previous value</td><td>Events</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.logical_bytes</td><td>Logical bytes (sum of keys + values) received by all replication jobs</td><td>Bytes</td><td>COUNTER</td><td>BYTES</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.replan_count</td><td>Total number of dist sql replanning events</td><td>Events</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.replicated_time_by_label</td><td>Replicated time of the logical replication stream by label</td><td>Seconds</td><td>COUNTER</td><td>SECONDS</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,8 @@ func (lrw *logicalReplicationWriterProcessor) flushBuffer(
flushTime := timeutil.Since(preFlushTime).Nanoseconds()
lrw.debug.RecordFlushComplete(flushTime, int64(len(kvs)), stats.processed.bytes)

lrw.metrics.KVUpdateTooOld.Inc(stats.kvWriteTooOld)
lrw.metrics.KVValueRefreshes.Inc(stats.kvWriteValueRefreshes)
lrw.metrics.AppliedRowUpdates.Inc(stats.processed.success)
lrw.metrics.DLQedRowUpdates.Inc(stats.processed.dlq)
if l := lrw.spec.MetricsLabel; l != "" {
Expand Down Expand Up @@ -834,17 +836,15 @@ func (lrw *logicalReplicationWriterProcessor) flushChunk(
stats.notProcessed.bytes += int64(batch[i].Size())
}
} else {
stats.optimisticInsertConflicts += singleStats.optimisticInsertConflicts
stats.kvWriteFallbacks += singleStats.kvWriteFallbacks
stats.batchStats.Add(singleStats)
batch[i] = streampb.StreamEvent_KV{}
stats.processed.success++
stats.processed.bytes += int64(batch[i].Size())
}
}
}
} else {
stats.optimisticInsertConflicts += s.optimisticInsertConflicts
stats.kvWriteFallbacks += s.kvWriteFallbacks
stats.batchStats.Add(s)
stats.processed.success += int64(len(batch))
// Clear the event to indicate successful application.
for i := range batch {
Expand Down Expand Up @@ -919,16 +919,25 @@ func (lrw *logicalReplicationWriterProcessor) dlq(

// batchStats holds the counters returned by a row processor's ProcessRow
// call; callers fold them into the stats for the enclosing flush.
type batchStats struct {
optimisticInsertConflicts int64
kvWriteFallbacks int64
// kvWriteTooOld counts updates that were skipped (treated as a no-op, not
// an error) because the condition-failed error reported that the incoming
// origin timestamp was older than the existing one.
kvWriteTooOld int64
// kvWriteValueRefreshes counts the times a write was retried using the
// actual value carried by the condition-failed error as the new previous
// value.
kvWriteValueRefreshes int64
}

// Add accumulates the counters from o into b in place.
func (b *batchStats) Add(o batchStats) {
b.optimisticInsertConflicts += o.optimisticInsertConflicts
b.kvWriteTooOld += o.kvWriteTooOld
b.kvWriteValueRefreshes += o.kvWriteValueRefreshes
}

// flushStats aggregates the outcome of flushing a set of events. It is
// combined across chunks via Add and then reported to metrics by the flush
// path.
type flushStats struct {
// processed tracks events that were handled: success counts events that
// were applied, dlq counts events reported as DLQed row updates, and bytes
// is accumulated from the events' sizes.
processed struct {
success, dlq, bytes int64
}
// notProcessed tracks the count and size in bytes of events that were not
// handled by the flush.
notProcessed struct {
count, bytes int64
}
optimisticInsertConflicts, kvWriteFallbacks int64

// batchStats is embedded so the per-batch counters returned by the row
// processors are accumulated alongside the flush-level counters.
batchStats
}

func (b *flushStats) Add(o flushStats) {
Expand All @@ -937,8 +946,7 @@ func (b *flushStats) Add(o flushStats) {
b.processed.bytes += o.processed.bytes
b.notProcessed.count += o.notProcessed.count
b.notProcessed.bytes += o.notProcessed.bytes
b.optimisticInsertConflicts += o.optimisticInsertConflicts
b.kvWriteFallbacks += o.kvWriteFallbacks
b.batchStats.Add(o.batchStats)
}

type BatchHandler interface {
Expand Down
12 changes: 8 additions & 4 deletions pkg/ccl/crosscluster/logical/lww_kv_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,11 @@ func (p *kvRowProcessor) ProcessRow(
return batchStats{}, err
}

if err := p.processParsedRow(ctx, txn, row, keyValue, prevValue, 0); err != nil {
return batchStats{}, err
var s batchStats
if err := p.processParsedRow(ctx, txn, row, keyValue, prevValue, &s, 0); err != nil {
return s, err
}
return batchStats{}, nil
return s, nil

}

Expand All @@ -123,6 +124,7 @@ func (p *kvRowProcessor) processParsedRow(
row cdcevent.Row,
k roachpb.KeyValue,
prevValue roachpb.Value,
s *batchStats,
refreshCount int,
) error {
dstTableID, ok := p.dstBySrc[row.TableID]
Expand Down Expand Up @@ -152,6 +154,7 @@ func (p *kvRowProcessor) processParsedRow(
// loser. We ignore the error and move onto the next row we have
// to process.
if condErr.OriginTimestampOlderThan.IsSet() {
s.kvWriteTooOld++
return nil
}
// If HadNewerOriginTimestamp is true, it implies that the row we
Expand All @@ -175,11 +178,12 @@ func (p *kvRowProcessor) processParsedRow(
if refreshCount > maxRefreshCount {
return errors.Wrapf(err, "max refresh count (%d) reached", maxRefreshCount)
}
s.kvWriteValueRefreshes++
var refreshedValue roachpb.Value
if condErr.ActualValue != nil {
refreshedValue = *condErr.ActualValue
}
return p.processParsedRow(ctx, txn, row, k, refreshedValue, refreshCount+1)
return p.processParsedRow(ctx, txn, row, k, refreshedValue, s, refreshCount+1)
}
}
return err
Expand Down
21 changes: 19 additions & 2 deletions pkg/ccl/crosscluster/logical/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,18 @@ var (
Measurement: "Events",
Unit: metric.Unit_COUNT,
}
metaKVUpdateTooOld = metric.Metadata{
Name: "logical_replication.kv.update_too_old",
Help: "Total number of updates that were not applied because they were too old",
Measurement: "Events",
Unit: metric.Unit_COUNT,
}
metaKVValueRefreshes = metric.Metadata{
Name: "logical_replication.kv.value_refreshes",
Help: "Total number of batches that refreshed the previous value",
Measurement: "Events",
Unit: metric.Unit_COUNT,
}

// Labeled metrics.
metaLabeledReplicatedTime = metric.Metadata{
Expand Down Expand Up @@ -174,6 +186,8 @@ type Metrics struct {
// a specific way.
CheckpointEvents *metric.Counter
ReplanCount *metric.Counter
KVValueRefreshes *metric.Counter
KVUpdateTooOld *metric.Counter

// Labeled export-only metrics.
LabeledReplicatedTime *metric.GaugeVec
Expand Down Expand Up @@ -213,8 +227,11 @@ func MakeMetrics(histogramWindow time.Duration) metric.Struct {
InitialApplyFailures: metric.NewCounter(metaInitialApplyFailures),
RetriedApplySuccesses: metric.NewCounter(metaRetriedApplySuccesses),
RetriedApplyFailures: metric.NewCounter(metaRetriedApplyFailures),
CheckpointEvents: metric.NewCounter(metaCheckpointEvents),
ReplanCount: metric.NewCounter(metaDistSQLReplanCount),

CheckpointEvents: metric.NewCounter(metaCheckpointEvents),
ReplanCount: metric.NewCounter(metaDistSQLReplanCount),
KVUpdateTooOld: metric.NewCounter(metaKVUpdateTooOld),
KVValueRefreshes: metric.NewCounter(metaKVValueRefreshes),

// Labeled export-only metrics.
LabeledReplicatedTime: metric.NewExportedGaugeVec(metaLabeledReplicatedTime, []string{"label"}),
Expand Down
7 changes: 7 additions & 0 deletions pkg/cmd/roachtest/tests/follower_reads.go
Original file line number Diff line number Diff line change
Expand Up @@ -1008,6 +1008,13 @@ func runFollowerReadsMixedVersionGlobalTableTest(
// Use a longer upgrade timeout to give the migrations enough time to finish
// considering the cross-region latency.
mixedversion.UpgradeTimeout(60*time.Minute),

// This test is flaky when upgrading from v23.1 to v23.2 for follower
// reads in shared-process deployments. There were a number of changes
// to tenant health checks since then which appear to have addressed
// this issue.
mixedversion.MinimumSupportedVersion("v23.2.0"),

// This test does not currently work with shared-process
// deployments (#129167), so we do not run it in separate-process
// mode either to reduce noise. We should reevaluate once the test
Expand Down
6 changes: 3 additions & 3 deletions pkg/ui/pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions pkg/ui/workspaces/cluster-ui/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@cockroachlabs/cluster-ui",
"version": "24.3.0-prerelease.3",
"version": "24.3.0-prerelease.4",
"description": "Cluster UI is a library of large features shared between CockroachDB and CockroachCloud",
"repository": {
"type": "git",
Expand Down Expand Up @@ -33,7 +33,6 @@
"dependencies": {
"@ant-design/icons": "^5.3.6",
"@babel/runtime": "^7.12.13",
"@cockroachlabs/crdb-protobuf-client": "workspace:../db-console/src/js",
"@cockroachlabs/design-tokens": "0.4.5",
"@cockroachlabs/icons": "0.5.2",
"@cockroachlabs/ui-components": "0.4.3",
Expand Down Expand Up @@ -90,6 +89,7 @@
"@babel/preset-typescript": "^7.8.0",
"@bazel/typescript": "5.5.0",
"@bazel/worker": "5.5.0",
"@cockroachlabs/crdb-protobuf-client": "workspace:../db-console/src/js",
"@cockroachlabs/eslint-config": "1.0.7",
"@cockroachlabs/eslint-plugin-crdb": "workspace:../eslint-plugin-crdb",
"@storybook/addon-actions": "^6.5.16",
Expand Down

0 comments on commit 9e6a644

Please sign in to comment.