Skip to content

Commit

Permalink
rangefeed: assert intent commits above resolved timestamp
Browse files Browse the repository at this point in the history
Epic: none
Release note: None
  • Loading branch information
erikgrinaker committed Jan 30, 2024
1 parent f68bef8 commit 9a33cbc
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 58 deletions.
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/rangefeed/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -847,7 +847,7 @@ func (p *LegacyProcessor) consumeLogicalOps(

// Determine whether the operation caused the resolved timestamp to
// move forward. If so, publish a RangeFeedCheckpoint notification.
if p.rts.ConsumeLogicalOp(op) {
if p.rts.ConsumeLogicalOp(ctx, op) {
p.publishCheckpoint(ctx)
}
}
Expand Down
40 changes: 31 additions & 9 deletions pkg/kv/kvserver/rangefeed/resolved_timestamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@ package rangefeed

import (
"bytes"
"context"
"fmt"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/container/heap"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
)
Expand Down Expand Up @@ -129,32 +131,40 @@ func (rts *resolvedTimestamp) ForwardClosedTS(newClosedTS hlc.Timestamp) bool {
// operation within its range of tracked keys. This allows the structure to
// update its internal intent tracking to reflect the change. The method returns
// whether this caused the resolved timestamp to move forward.
func (rts *resolvedTimestamp) ConsumeLogicalOp(op enginepb.MVCCLogicalOp) bool {
if rts.consumeLogicalOp(op) {
func (rts *resolvedTimestamp) ConsumeLogicalOp(
ctx context.Context, op enginepb.MVCCLogicalOp,
) bool {
if rts.consumeLogicalOp(ctx, op) {
return rts.recompute()
}
rts.assertNoChange()
return false
}

func (rts *resolvedTimestamp) consumeLogicalOp(op enginepb.MVCCLogicalOp) bool {
func (rts *resolvedTimestamp) consumeLogicalOp(
ctx context.Context, op enginepb.MVCCLogicalOp,
) bool {
switch t := op.GetValue().(type) {
case *enginepb.MVCCWriteValueOp:
rts.assertOpAboveRTS(op, t.Timestamp)
rts.assertOpAboveRTS(ctx, op, t.Timestamp, true /* fatal */)
return false

case *enginepb.MVCCDeleteRangeOp:
rts.assertOpAboveRTS(op, t.Timestamp)
rts.assertOpAboveRTS(ctx, op, t.Timestamp, true /* fatal */)
return false

case *enginepb.MVCCWriteIntentOp:
rts.assertOpAboveRTS(op, t.Timestamp)
rts.assertOpAboveRTS(ctx, op, t.Timestamp, true /* fatal */)
return rts.intentQ.IncRef(t.TxnID, t.TxnKey, t.TxnIsoLevel, t.TxnMinTimestamp, t.Timestamp)

case *enginepb.MVCCUpdateIntentOp:
return rts.intentQ.UpdateTS(t.TxnID, t.Timestamp)

case *enginepb.MVCCCommitIntentOp:
// This assertion can be violated in mixed-version clusters prior
// to 24.1, so make it non-fatal for now. See:
// https://github.com/cockroachdb/cockroach/issues/104309
rts.assertOpAboveRTS(ctx, op, t.Timestamp, false /* fatal */)
return rts.intentQ.DecrRef(t.TxnID, t.Timestamp)

case *enginepb.MVCCAbortIntentOp:
Expand Down Expand Up @@ -265,10 +275,22 @@ func (rts *resolvedTimestamp) assertNoChange() {
// assertOpAboveTimestamp asserts that this operation is at a larger timestamp
// than the current resolved timestamp. A violation of this assertion would
// indicate a failure of the closed timestamp mechanism.
func (rts *resolvedTimestamp) assertOpAboveRTS(op enginepb.MVCCLogicalOp, opTS hlc.Timestamp) {
func (rts *resolvedTimestamp) assertOpAboveRTS(
ctx context.Context, op enginepb.MVCCLogicalOp, opTS hlc.Timestamp, fatal bool,
) {
if opTS.LessEq(rts.resolvedTS) {
panic(fmt.Sprintf("resolved timestamp %s equal to or above timestamp of operation %v",
rts.resolvedTS, op))
// NB: MVCCLogicalOp.String() is only implemented for pointer receiver.
// We shadow the variable to avoid it escaping to the heap.
op := op
err := errors.AssertionFailedf(
"resolved timestamp %s equal to or above timestamp of operation %v", rts.resolvedTS, &op)
if fatal {
// TODO(erikgrinaker): use log.Fatalf. Panic for now, since tests expect
// it and to minimize code churn for backports.
panic(err)
} else {
log.Errorf(ctx, "%v", err)
}
}
}

Expand Down
Loading

0 comments on commit 9a33cbc

Please sign in to comment.