Commit
129434: crosscluster/logical: only use kv writer for inserts for now r=dt a=dt

See inline comment.

Co-authored-by: David Taylor <[email protected]>
craig[bot] and dt committed Aug 22, 2024
2 parents 86856d0 + a6d81e9 commit db79d36
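Context for the change below: the direct KV write path relies on conditional-put (CPut) semantics, where a write commits only if the bytes currently stored at the key exactly match a caller-supplied expected value, and a nil expectation means the key must be absent. A minimal sketch of that contract against a hypothetical in-memory store (not the real kv client API):

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
)

// miniKV is a hypothetical in-memory store used only to illustrate
// conditional-put semantics; it is not CockroachDB's kv client.
type miniKV struct{ data map[string][]byte }

var errConditionFailed = errors.New("cput: stored value did not match expected value")

// CPut writes value at key only if the stored bytes exactly equal expValue;
// a nil expValue means the key must not exist yet (i.e. an insert).
func (s *miniKV) CPut(key string, value, expValue []byte) error {
	existing, ok := s.data[key]
	if expValue == nil {
		if ok {
			return errConditionFailed
		}
	} else if !ok || !bytes.Equal(existing, expValue) {
		return errConditionFailed
	}
	s.data[key] = value
	return nil
}

func main() {
	s := &miniKV{data: map[string][]byte{}}
	// Inserts need no prior value: expect "key absent" and write.
	fmt.Println(s.CPut("k", []byte("v1"), nil)) // <nil>
	// Updates must reproduce the stored bytes exactly; if any part of the
	// value (such as an embedded origin timestamp) is unknown, the expected
	// value cannot be reconstructed and the condition fails.
	fmt.Println(s.CPut("k", []byte("v2"), []byte("guess"))) // condition failed
}
```

Inserts only need the "key absent" expectation, which is why they can keep using the direct path while updates, per the inline comment in lww_kv_processor.go below, cannot.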
Showing 4 changed files with 26 additions and 13 deletions.
2 changes: 1 addition & 1 deletion docs/generated/metrics/metrics.html
@@ -1369,7 +1369,7 @@
<tr><td>APPLICATION</td><td>logical_replication.flush_bytes</td><td>Number of bytes in a given flush</td><td>Logical bytes</td><td>HISTOGRAM</td><td>BYTES</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.flush_hist_nanos</td><td>Time spent flushing messages across all replication streams</td><td>Nanoseconds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.flush_row_count</td><td>Number of rows in a given flush</td><td>Rows</td><td>HISTOGRAM</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
-<tr><td>APPLICATION</td><td>logical_replication.kv_write_failure_count</td><td>Total number of times the kv write path encountered an error resolved by writing via SQL instead</td><td>Events</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
+<tr><td>APPLICATION</td><td>logical_replication.kv_write_fallback_count</td><td>Total number of times the kv write path could not handle a row update and fell back to SQL instead</td><td>Events</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.logical_bytes</td><td>Logical bytes (sum of keys + values) received by all replication jobs</td><td>Bytes</td><td>COUNTER</td><td>BYTES</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.optimistic_insert_conflict_count</td><td>Total number of times the optimistic insert encountered a conflict</td><td>Events</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.replan_count</td><td>Total number of dist sql replanning events</td><td>Events</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
12 changes: 6 additions & 6 deletions pkg/ccl/crosscluster/logical/logical_replication_writer_processor.go
@@ -648,7 +648,7 @@ func (lrw *logicalReplicationWriterProcessor) flushBuffer(
}
perChunkStats[worker] = s
lrw.metrics.OptimisticInsertConflictCount.Inc(s.optimisticInsertConflicts)
-lrw.metrics.KVWriteFailureCount.Inc(s.kvWriteFailures)
+lrw.metrics.KVWriteFallbackCount.Inc(s.kvWriteFallbacks)
return nil
})
}
@@ -794,7 +794,7 @@ func (lrw *logicalReplicationWriterProcessor) flushChunk(
}
} else {
stats.optimisticInsertConflicts += singleStats.optimisticInsertConflicts
-stats.kvWriteFailures += singleStats.kvWriteFailures
+stats.kvWriteFallbacks += singleStats.kvWriteFallbacks
batch[i] = streampb.StreamEvent_KV{}
stats.processed.success++
stats.processed.bytes += int64(batch[i].Size())
@@ -803,7 +803,7 @@
}
} else {
stats.optimisticInsertConflicts += s.optimisticInsertConflicts
-stats.kvWriteFailures += s.kvWriteFailures
+stats.kvWriteFallbacks += s.kvWriteFallbacks
stats.processed.success += int64(len(batch))
// Clear the event to indicate successful application.
for i := range batch {
@@ -873,7 +873,7 @@ func (lrw *logicalReplicationWriterProcessor) dlq(

type batchStats struct {
optimisticInsertConflicts int64
-kvWriteFailures int64
+kvWriteFallbacks int64
}
type flushStats struct {
processed struct {
@@ -882,7 +882,7 @@ type flushStats struct {
notProcessed struct {
count, bytes int64
}
-optimisticInsertConflicts, kvWriteFailures int64
+optimisticInsertConflicts, kvWriteFallbacks int64
}

func (b *flushStats) Add(o flushStats) {
Expand All @@ -892,7 +892,7 @@ func (b *flushStats) Add(o flushStats) {
b.notProcessed.count += o.notProcessed.count
b.notProcessed.bytes += o.notProcessed.bytes
b.optimisticInsertConflicts += o.optimisticInsertConflicts
-b.kvWriteFailures += o.kvWriteFailures
+b.kvWriteFallbacks += o.kvWriteFallbacks
}

type BatchHandler interface {
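The renamed fields above flow upward through two layers: each worker chunk produces a batchStats, flushBuffer folds those into a flushStats, and the totals feed the job-level counters. A condensed sketch of that aggregation, with hypothetical simplified types rather than the processor's real ones:

```go
package main

import "fmt"

// Hypothetical, pared-down versions of the stats structs in the diff.
type batchStats struct {
	optimisticInsertConflicts, kvWriteFallbacks int64
}

type flushStats struct {
	optimisticInsertConflicts, kvWriteFallbacks int64
}

// add folds one worker chunk's stats into the flush-level totals,
// mirroring flushStats.Add above.
func (f *flushStats) add(b batchStats) {
	f.optimisticInsertConflicts += b.optimisticInsertConflicts
	f.kvWriteFallbacks += b.kvWriteFallbacks
}

func main() {
	var flush flushStats
	// Pretend two worker chunks flushed; one fell back to SQL twice.
	chunks := []batchStats{
		{kvWriteFallbacks: 2},
		{optimisticInsertConflicts: 1},
	}
	for _, s := range chunks {
		flush.add(s)
	}
	// These totals are what flushBuffer feeds into the job's counters.
	fmt.Printf("%+v\n", flush) // {optimisticInsertConflicts:1 kvWriteFallbacks:2}
}
```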
15 changes: 14 additions & 1 deletion pkg/ccl/crosscluster/logical/lww_kv_processor.go
@@ -112,10 +112,23 @@ func (p *kvRowProcessor) ProcessRow(
}
p.lastRow = row

+// TODO(dt, ssd): the rangefeed prev value does not include its mvcc ts, which
+// is a problem for us if we want to use CPut to replace the old row with the
+// new row, because our local version of the old row is likely to have the
+// remote version's mvcc timestamp in its origin ts column, i.e. in the value.
+// Without knowing the remote previous row's ts, we cannot exactly reconstruct
+// the value of our local row to put in the expected value for a CPut.
+// Instead, for now, we just don't use the direct CPut for anything other than
+// inserts. If/when we have a LDR-flavor CPut (or if we move the TS out and
+// decide that equal values negate LWW) we can remove this.
+if prevValue.IsPresent() {
+return p.fallback.processParsedRow(ctx, txn, row, keyValue.Key, prevValue)
+}
+
if err := p.processParsedRow(ctx, txn, row, keyValue, prevValue); err != nil {
stats, err := p.fallback.processParsedRow(ctx, txn, row, keyValue.Key, prevValue)
if err == nil {
-stats.kvWriteFailures += 1
+stats.kvWriteFallbacks += 1
}
return stats, err
}
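Putting the hunk above together: updates, which arrive with a previous value, now bypass the KV writer entirely, while inserts still try it first and fall back on error, incrementing the new counter. A condensed, runnable sketch of that control flow, using hypothetical simplified types and signatures rather than the processor's real ones:

```go
package main

import "fmt"

// Hypothetical, pared-down stand-ins for the processor's real types.
type batchStats struct{ kvWriteFallbacks int64 }

type value struct{ raw []byte }

func (v value) IsPresent() bool { return v.raw != nil }

type sqlWriter struct{}

func (sqlWriter) processParsedRow(key string, prev value) (batchStats, error) {
	return batchStats{}, nil // pretend the SQL path succeeds
}

type kvRowProcessor struct {
	fallback   sqlWriter
	kvWriteErr error // simulates the outcome of the direct KV attempt
}

func (p *kvRowProcessor) processRow(key string, prev value) (batchStats, error) {
	// Updates carry a previous value whose exact local bytes cannot be
	// reconstructed (the rangefeed prev value omits the mvcc ts that the
	// local row embeds in its origin ts column), so they go straight to
	// the SQL writer without attempting a direct KV write.
	if prev.IsPresent() {
		return p.fallback.processParsedRow(key, prev)
	}
	// Inserts try the direct KV path first; on error, fall back to SQL and
	// count the fallback so it surfaces in kv_write_fallback_count.
	if p.kvWriteErr != nil {
		stats, err := p.fallback.processParsedRow(key, prev)
		if err == nil {
			stats.kvWriteFallbacks++
		}
		return stats, err
	}
	return batchStats{}, nil
}

func main() {
	p := &kvRowProcessor{kvWriteErr: fmt.Errorf("unexpected existing key")}
	stats, err := p.processRow("k", value{}) // an insert whose KV write fails
	fmt.Println(stats.kvWriteFallbacks, err) // 1 <nil>
}
```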
10 changes: 5 additions & 5 deletions pkg/ccl/crosscluster/logical/metrics.go
@@ -143,9 +143,9 @@
Measurement: "Events",
Unit: metric.Unit_COUNT,
}
-metaKVWriteFailureCount = metric.Metadata{
-Name: "logical_replication.kv_write_failure_count",
-Help: "Total number of times the kv write path encountered an error resolved by writing via SQL instead",
+metaKVWriteFallbackCount = metric.Metadata{
+Name: "logical_replication.kv_write_fallback_count",
+Help: "Total number of times the kv write path could not handle a row update and fell back to SQL instead",
Measurement: "Events",
Unit: metric.Unit_COUNT,
}
@@ -191,7 +191,7 @@ type Metrics struct {
StreamBatchBytesHist metric.IHistogram
StreamBatchNanosHist metric.IHistogram
OptimisticInsertConflictCount *metric.Counter
-KVWriteFailureCount *metric.Counter
+KVWriteFallbackCount *metric.Counter
ReplanCount *metric.Counter
}

@@ -247,7 +247,7 @@ func MakeMetrics(histogramWindow time.Duration) metric.Struct {
BucketConfig: metric.IOLatencyBuckets,
}),
OptimisticInsertConflictCount: metric.NewCounter(metaOptimisticInsertConflictCount),
-KVWriteFailureCount: metric.NewCounter(metaKVWriteFailureCount),
+KVWriteFallbackCount: metric.NewCounter(metaKVWriteFallbackCount),
ReplanCount: metric.NewCounter(metaDistSQLReplanCount),
}
}
