diff --git a/docs/generated/metrics/metrics.html b/docs/generated/metrics/metrics.html
index 375da030239f..8f9954263de6 100644
--- a/docs/generated/metrics/metrics.html
+++ b/docs/generated/metrics/metrics.html
@@ -1369,7 +1369,7 @@
APPLICATION | logical_replication.flush_bytes | Number of bytes in a given flush | Logical bytes | HISTOGRAM | BYTES | AVG | NONE |
APPLICATION | logical_replication.flush_hist_nanos | Time spent flushing messages across all replication streams | Nanoseconds | HISTOGRAM | NANOSECONDS | AVG | NONE |
APPLICATION | logical_replication.flush_row_count | Number of rows in a given flush | Rows | HISTOGRAM | COUNT | AVG | NONE |
-APPLICATION | logical_replication.kv_write_failure_count | Total number of times the kv write path encountered an error resolved by writing via SQL instead | Events | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
+APPLICATION | logical_replication.kv_write_fallback_count | Total number of times the kv write path could not handle a row update and fell back to SQL instead | Events | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
APPLICATION | logical_replication.logical_bytes | Logical bytes (sum of keys + values) received by all replication jobs | Bytes | COUNTER | BYTES | AVG | NON_NEGATIVE_DERIVATIVE |
APPLICATION | logical_replication.optimistic_insert_conflict_count | Total number of times the optimistic insert encountered a conflict | Events | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
APPLICATION | logical_replication.replan_count | Total number of dist sql replanning events | Events | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
diff --git a/pkg/ccl/crosscluster/logical/logical_replication_writer_processor.go b/pkg/ccl/crosscluster/logical/logical_replication_writer_processor.go
index 88a1412733d8..02d507556231 100644
--- a/pkg/ccl/crosscluster/logical/logical_replication_writer_processor.go
+++ b/pkg/ccl/crosscluster/logical/logical_replication_writer_processor.go
@@ -648,7 +648,7 @@ func (lrw *logicalReplicationWriterProcessor) flushBuffer(
}
perChunkStats[worker] = s
lrw.metrics.OptimisticInsertConflictCount.Inc(s.optimisticInsertConflicts)
- lrw.metrics.KVWriteFailureCount.Inc(s.kvWriteFailures)
+ lrw.metrics.KVWriteFallbackCount.Inc(s.kvWriteFallbacks)
return nil
})
}
@@ -794,7 +794,7 @@ func (lrw *logicalReplicationWriterProcessor) flushChunk(
}
} else {
stats.optimisticInsertConflicts += singleStats.optimisticInsertConflicts
- stats.kvWriteFailures += singleStats.kvWriteFailures
+ stats.kvWriteFallbacks += singleStats.kvWriteFallbacks
batch[i] = streampb.StreamEvent_KV{}
stats.processed.success++
stats.processed.bytes += int64(batch[i].Size())
@@ -803,7 +803,7 @@ func (lrw *logicalReplicationWriterProcessor) flushChunk(
}
} else {
stats.optimisticInsertConflicts += s.optimisticInsertConflicts
- stats.kvWriteFailures += s.kvWriteFailures
+ stats.kvWriteFallbacks += s.kvWriteFallbacks
stats.processed.success += int64(len(batch))
// Clear the event to indicate successful application.
for i := range batch {
@@ -873,7 +873,7 @@ func (lrw *logicalReplicationWriterProcessor) dlq(
type batchStats struct {
optimisticInsertConflicts int64
- kvWriteFailures int64
+ kvWriteFallbacks int64
}
type flushStats struct {
processed struct {
@@ -882,7 +882,7 @@ type flushStats struct {
notProcessed struct {
count, bytes int64
}
- optimisticInsertConflicts, kvWriteFailures int64
+ optimisticInsertConflicts, kvWriteFallbacks int64
}
func (b *flushStats) Add(o flushStats) {
@@ -892,7 +892,7 @@ func (b *flushStats) Add(o flushStats) {
b.notProcessed.count += o.notProcessed.count
b.notProcessed.bytes += o.notProcessed.bytes
b.optimisticInsertConflicts += o.optimisticInsertConflicts
- b.kvWriteFailures += o.kvWriteFailures
+ b.kvWriteFallbacks += o.kvWriteFallbacks
}
type BatchHandler interface {
diff --git a/pkg/ccl/crosscluster/logical/lww_kv_processor.go b/pkg/ccl/crosscluster/logical/lww_kv_processor.go
index f614b38cddf0..f91f08b80498 100644
--- a/pkg/ccl/crosscluster/logical/lww_kv_processor.go
+++ b/pkg/ccl/crosscluster/logical/lww_kv_processor.go
@@ -112,10 +112,23 @@ func (p *kvRowProcessor) ProcessRow(
}
p.lastRow = row
+ // TODO(dt, ssd): the rangefeed prev value does not include its mvcc ts, which
+ // is a problem for us if we want to use CPut to replace the old row with the
+ // new row, because our local version of the old row is likely to have the
+ // remote version's mvcc timestamp in its origin ts column, i.e. in the value.
+ // Without knowing the remote previous row's ts, we cannot exactly reconstruct
+ // the value of our local row to put in the expected value for a CPut.
+ // Instead, for now, we just don't use the direct CPut for anything other than
+ // inserts. If/when we have a LDR-flavor CPut (or if we move the TS out and
+ // decide that equal values negate LWW) we can remove this.
+ if prevValue.IsPresent() {
+ return p.fallback.processParsedRow(ctx, txn, row, keyValue.Key, prevValue)
+ }
+
if err := p.processParsedRow(ctx, txn, row, keyValue, prevValue); err != nil {
stats, err := p.fallback.processParsedRow(ctx, txn, row, keyValue.Key, prevValue)
if err == nil {
- stats.kvWriteFailures += 1
+ stats.kvWriteFallbacks += 1
}
return stats, err
}
diff --git a/pkg/ccl/crosscluster/logical/metrics.go b/pkg/ccl/crosscluster/logical/metrics.go
index 7bf4ab20d34d..f964dd2b8bc9 100644
--- a/pkg/ccl/crosscluster/logical/metrics.go
+++ b/pkg/ccl/crosscluster/logical/metrics.go
@@ -143,9 +143,9 @@ var (
Measurement: "Events",
Unit: metric.Unit_COUNT,
}
- metaKVWriteFailureCount = metric.Metadata{
- Name: "logical_replication.kv_write_failure_count",
- Help: "Total number of times the kv write path encountered an error resolved by writing via SQL instead",
+ metaKVWriteFallbackCount = metric.Metadata{
+ Name: "logical_replication.kv_write_fallback_count",
+ Help: "Total number of times the kv write path could not handle a row update and fell back to SQL instead",
Measurement: "Events",
Unit: metric.Unit_COUNT,
}
@@ -191,7 +191,7 @@ type Metrics struct {
StreamBatchBytesHist metric.IHistogram
StreamBatchNanosHist metric.IHistogram
OptimisticInsertConflictCount *metric.Counter
- KVWriteFailureCount *metric.Counter
+ KVWriteFallbackCount *metric.Counter
ReplanCount *metric.Counter
}
@@ -247,7 +247,7 @@ func MakeMetrics(histogramWindow time.Duration) metric.Struct {
BucketConfig: metric.IOLatencyBuckets,
}),
OptimisticInsertConflictCount: metric.NewCounter(metaOptimisticInsertConflictCount),
- KVWriteFailureCount: metric.NewCounter(metaKVWriteFailureCount),
+ KVWriteFallbackCount: metric.NewCounter(metaKVWriteFallbackCount),
ReplanCount: metric.NewCounter(metaDistSQLReplanCount),
}
}