Skip to content

Commit

Permalink
rangefeed: assert intent commits above resolved timestamp
Browse files Browse the repository at this point in the history
Epic: none
Release note: None
  • Loading branch information
erikgrinaker committed Jan 16, 2024
1 parent 512cd29 commit 94ce567
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 13 deletions.
14 changes: 12 additions & 2 deletions pkg/kv/kvserver/rangefeed/resolved_timestamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@ package rangefeed

import (
"bytes"
"context"
"fmt"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/container/heap"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand Down Expand Up @@ -81,11 +84,13 @@ type resolvedTimestamp struct {
closedTS hlc.Timestamp
resolvedTS hlc.Timestamp
intentQ unresolvedIntentQueue
settings *cluster.Settings
}

func makeResolvedTimestamp() resolvedTimestamp {
func makeResolvedTimestamp(st *cluster.Settings) resolvedTimestamp {
return resolvedTimestamp{
intentQ: makeUnresolvedIntentQueue(),
intentQ: makeUnresolvedIntentQueue(),
settings: st,
}
}

Expand Down Expand Up @@ -155,6 +160,11 @@ func (rts *resolvedTimestamp) consumeLogicalOp(op enginepb.MVCCLogicalOp) bool {
return rts.intentQ.UpdateTS(t.TxnID, t.Timestamp)

case *enginepb.MVCCCommitIntentOp:
// This assertion can be violated in mixed-version clusters, so gate it
// on 24.1. See: https://github.com/cockroachdb/cockroach/issues/104309
if rts.settings.Version.IsActive(context.Background(), clusterversion.V24_1Start) {
rts.assertOpAboveRTS(op, t.Timestamp)
}
return rts.intentQ.DecrRef(t.TxnID, t.Timestamp)

case *enginepb.MVCCAbortIntentOp:
Expand Down
21 changes: 11 additions & 10 deletions pkg/kv/kvserver/rangefeed/resolved_timestamp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -179,7 +180,7 @@ func TestUnresolvedIntentQueue(t *testing.T) {

func TestResolvedTimestamp(t *testing.T) {
defer leaktest.AfterTest(t)()
rts := makeResolvedTimestamp()
rts := makeResolvedTimestamp(cluster.MakeTestingClusterSettings())
rts.Init()

// Test empty resolved timestamp.
Expand Down Expand Up @@ -351,7 +352,7 @@ func TestResolvedTimestamp(t *testing.T) {

func TestResolvedTimestampNoClosedTimestamp(t *testing.T) {
defer leaktest.AfterTest(t)()
rts := makeResolvedTimestamp()
rts := makeResolvedTimestamp(cluster.MakeTestingClusterSettings())
rts.Init()

// Add a value. No closed timestamp so no resolved timestamp.
Expand Down Expand Up @@ -389,7 +390,7 @@ func TestResolvedTimestampNoClosedTimestamp(t *testing.T) {

func TestResolvedTimestampNoIntents(t *testing.T) {
defer leaktest.AfterTest(t)()
rts := makeResolvedTimestamp()
rts := makeResolvedTimestamp(cluster.MakeTestingClusterSettings())
rts.Init()

// Set a closed timestamp. Resolved timestamp advances.
Expand Down Expand Up @@ -422,7 +423,7 @@ func TestResolvedTimestampInit(t *testing.T) {
defer leaktest.AfterTest(t)()

t.Run("CT Before Init", func(t *testing.T) {
rts := makeResolvedTimestamp()
rts := makeResolvedTimestamp(cluster.MakeTestingClusterSettings())

// Set a closed timestamp. Not initialized so no resolved timestamp.
fwd := rts.ForwardClosedTS(hlc.Timestamp{WallTime: 5})
Expand All @@ -435,7 +436,7 @@ func TestResolvedTimestampInit(t *testing.T) {
require.Equal(t, hlc.Timestamp{WallTime: 5}, rts.Get())
})
t.Run("No CT Before Init", func(t *testing.T) {
rts := makeResolvedTimestamp()
rts := makeResolvedTimestamp(cluster.MakeTestingClusterSettings())

// Add an intent. Not initialized so no resolved timestamp.
txn1 := uuid.MakeV4()
Expand All @@ -449,7 +450,7 @@ func TestResolvedTimestampInit(t *testing.T) {
require.Equal(t, hlc.Timestamp{}, rts.Get())
})
t.Run("Write Before Init", func(t *testing.T) {
rts := makeResolvedTimestamp()
rts := makeResolvedTimestamp(cluster.MakeTestingClusterSettings())

// Add an intent. Not initialized so no resolved timestamp.
txn1 := uuid.MakeV4()
Expand All @@ -468,7 +469,7 @@ func TestResolvedTimestampInit(t *testing.T) {
require.Equal(t, hlc.Timestamp{WallTime: 2}, rts.Get())
})
t.Run("Abort + Write Before Init", func(t *testing.T) {
rts := makeResolvedTimestamp()
rts := makeResolvedTimestamp(cluster.MakeTestingClusterSettings())

// Abort an intent. Not initialized so no resolved timestamp.
txn1 := uuid.MakeV4()
Expand Down Expand Up @@ -500,7 +501,7 @@ func TestResolvedTimestampInit(t *testing.T) {
require.Equal(t, hlc.Timestamp{WallTime: 5}, rts.Get())
})
t.Run("Abort Before Init, No Write", func(t *testing.T) {
rts := makeResolvedTimestamp()
rts := makeResolvedTimestamp(cluster.MakeTestingClusterSettings())

// Abort an intent. Not initialized so no resolved timestamp.
txn1 := uuid.MakeV4()
Expand All @@ -516,7 +517,7 @@ func TestResolvedTimestampInit(t *testing.T) {

func TestResolvedTimestampTxnAborted(t *testing.T) {
defer leaktest.AfterTest(t)()
rts := makeResolvedTimestamp()
rts := makeResolvedTimestamp(cluster.MakeTestingClusterSettings())
rts.Init()

// Set a closed timestamp. Resolved timestamp advances.
Expand Down Expand Up @@ -572,7 +573,7 @@ func TestResolvedTimestampTxnAborted(t *testing.T) {
func TestClosedTimestampLogicalPart(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
rts := makeResolvedTimestamp()
rts := makeResolvedTimestamp(cluster.MakeTestingClusterSettings())
rts.Init()

// Set a new closed timestamp. Resolved timestamp advances.
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/rangefeed/scheduled_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func NewScheduledProcessor(cfg Config) *ScheduledProcessor {
Config: cfg,
scheduler: cfg.Scheduler.NewClientScheduler(),
reg: makeRegistry(cfg.Metrics),
rts: makeResolvedTimestamp(),
rts: makeResolvedTimestamp(cfg.Settings),
processCtx: cfg.AmbientContext.AnnotateCtx(context.Background()),

requestQueue: make(chan request, 20),
Expand Down

0 comments on commit 94ce567

Please sign in to comment.