diff --git a/pkg/kv/kvserver/rangefeed/BUILD.bazel b/pkg/kv/kvserver/rangefeed/BUILD.bazel index e91de4b69de4..a747e871aeff 100644 --- a/pkg/kv/kvserver/rangefeed/BUILD.bazel +++ b/pkg/kv/kvserver/rangefeed/BUILD.bazel @@ -5,6 +5,7 @@ go_library( srcs = [ "budget.go", "catchup_scan.go", + "event_size.go", "filter.go", "metrics.go", "processor.go", @@ -56,6 +57,7 @@ go_test( "budget_test.go", "catchup_scan_bench_test.go", "catchup_scan_test.go", + "event_size_test.go", "processor_test.go", "registry_test.go", "resolved_timestamp_test.go", @@ -76,6 +78,7 @@ go_test( "//pkg/storage/enginepb", "//pkg/testutils", "//pkg/testutils/skip", + "//pkg/testutils/storageutils", "//pkg/util", "//pkg/util/encoding", "//pkg/util/future", diff --git a/pkg/kv/kvserver/rangefeed/budget.go b/pkg/kv/kvserver/rangefeed/budget.go index 6dc92415acdb..b1a6d676138a 100644 --- a/pkg/kv/kvserver/rangefeed/budget.go +++ b/pkg/kv/kvserver/rangefeed/budget.go @@ -148,6 +148,7 @@ func NewFeedBudget(budget *mon.BoundAccount, limit int64, settings *settings.Val stopC: make(chan interface{}), limit: limit, settings: settings, + // TODO: add fields here to cache computed span sizes. } f.mu.memBudget = budget return f diff --git a/pkg/kv/kvserver/rangefeed/event_size.go b/pkg/kv/kvserver/rangefeed/event_size.go new file mode 100644 index 000000000000..ea1330e2d26d --- /dev/null +++ b/pkg/kv/kvserver/rangefeed/event_size.go @@ -0,0 +1,242 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt.
+ +package rangefeed + +import ( + "context" + "unsafe" + + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/enginepb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" +) + +const ( + mvccLogicalOp = int64(unsafe.Sizeof(enginepb.MVCCLogicalOp{})) + mvccWriteValueOp = int64(unsafe.Sizeof(enginepb.MVCCWriteValueOp{})) + mvccDeleteRangeOp = int64(unsafe.Sizeof(enginepb.MVCCDeleteRangeOp{})) + mvccWriteIntentOp = int64(unsafe.Sizeof(enginepb.MVCCWriteIntentOp{})) + mvccUpdateIntentOp = int64(unsafe.Sizeof(enginepb.MVCCUpdateIntentOp{})) + mvccCommitIntentOp = int64(unsafe.Sizeof(enginepb.MVCCCommitIntentOp{})) + mvccAbortIntentOp = int64(unsafe.Sizeof(enginepb.MVCCAbortIntentOp{})) + mvccAbortTxnOp = int64(unsafe.Sizeof(enginepb.MVCCAbortTxnOp{})) +) + +const ( + eventOverhead = int64(unsafe.Sizeof(&event{})) + int64(unsafe.Sizeof(event{})) +) + +const ( + sharedEventPtrOverhead = int64(unsafe.Sizeof(&sharedEvent{})) + sharedEventOverhead = int64(unsafe.Sizeof(sharedEvent{})) + rangeFeedEventOverhead = int64(unsafe.Sizeof(kvpb.RangeFeedEvent{})) + allocEventOverhead = int64(unsafe.Sizeof(SharedBudgetAllocation{})) + feedBudgetOverhead = int64(unsafe.Sizeof(FeedBudget{})) + futureEventBaseOverhead = sharedEventPtrOverhead + sharedEventOverhead + rangeFeedEventOverhead + allocEventOverhead + feedBudgetOverhead +) + +const ( + rangefeedValueOverhead = int64(unsafe.Sizeof(kvpb.RangeFeedValue{})) + rangefeedDeleteRangeOverhead = int64(unsafe.Sizeof(kvpb.RangeFeedDeleteRange{})) + rangefeedCheckpointOverhead = int64(unsafe.Sizeof(kvpb.RangeFeedCheckpoint{})) + rangefeedSSTTableOverhead = int64(unsafe.Sizeof(kvpb.RangeFeedSSTable{})) +) + +const ( + sstEventOverhead = int64(unsafe.Sizeof(sstEvent{})) + syncEventOverhead = int64(unsafe.Sizeof(syncEvent{})) +) + +func deleteRangeFeedEvent( + startKey, endKey roachpb.Key, timestamp hlc.Timestamp, +) kvpb.RangeFeedEvent { + span := roachpb.Span{Key: startKey, EndKey: endKey} + var event kvpb.RangeFeedEvent + event.MustSetValue(&kvpb.RangeFeedDeleteRange{ + Span: span, + Timestamp: timestamp, + }) + return event +} + +func valueRangeFeedEvent( + key roachpb.Key, timestamp hlc.Timestamp, value, prevValue []byte, +) kvpb.RangeFeedEvent { + var prevVal roachpb.Value + if prevValue != nil { + prevVal.RawBytes = prevValue + } + var event kvpb.RangeFeedEvent + event.MustSetValue(&kvpb.RangeFeedValue{ + Key: key, + Value: roachpb.Value{ + RawBytes: value, + Timestamp: timestamp, + }, + PrevValue: prevVal, + }) + return event +} + +func checkpointRangeFeedEvent(span roachpb.RSpan, ts resolvedTimestamp) kvpb.RangeFeedEvent { + var event kvpb.RangeFeedEvent + event.MustSetValue(&kvpb.RangeFeedCheckpoint{ + Span: span.AsRawSpanWithNoLocals(), + ResolvedTS: ts.Get(), + }) + return event +} + +func sstRangeFeedEvent(sst []byte, sstSpan roachpb.Span, sstWTS hlc.Timestamp) kvpb.RangeFeedEvent { + var event kvpb.RangeFeedEvent + event.MustSetValue(&kvpb.RangeFeedSSTable{ + Data: sst, + Span: sstSpan, + WriteTS: sstWTS, + }) + return event +} + +func (ct ctEvent) futureMemUsage(span roachpb.RSpan, rts resolvedTimestamp) int64 { + if rts.ForwardClosedTS(context.Background(), ct.Timestamp) { + return rangefeedEventMemUsage(checkpointRangeFeedEvent(span, rts)) + } + return 0 +} + +func (ct ctEvent) memUsage() int64 { + return 0 +} + +func (initRTS initRTSEvent) memUsage() int64 { + return 0 +} + +func (initRTS initRTSEvent) futureMemUsage(span roachpb.RSpan, rts resolvedTimestamp) 
int64 { + // A checkpoint is published only if rts.Init succeeds, so this cost may or may not materialize. + if rts.Init(context.Background()) { + return rangefeedEventMemUsage(checkpointRangeFeedEvent(span, rts)) + } + return 0 +} + +func (sst sstEvent) memUsage() int64 { + return sstEventOverhead + int64(cap(sst.data)+cap(sst.span.Key)+cap(sst.span.EndKey)) +} + +func (sst sstEvent) futureMemUsage() int64 { + return rangefeedEventMemUsage(sstRangeFeedEvent(sst.data, sst.span, sst.ts)) +} + +func (sync syncEvent) memUsage() int64 { + return syncEventOverhead +} + +func (sync syncEvent) futureMemUsage() int64 { + return 0 +} + +func (ops opsEvent) futureMemUsage( + ctx context.Context, span roachpb.RSpan, rts resolvedTimestamp, +) int64 { + futureMemUsage := int64(0) + for _, op := range ops { + switch t := op.GetValue().(type) { + case *enginepb.MVCCWriteValueOp: + futureMemUsage += rangefeedEventMemUsage(valueRangeFeedEvent(t.Key, t.Timestamp, t.Value, t.PrevValue)) + case *enginepb.MVCCDeleteRangeOp: + futureMemUsage += rangefeedEventMemUsage(deleteRangeFeedEvent(t.StartKey, t.EndKey, t.Timestamp)) + case *enginepb.MVCCCommitIntentOp: + futureMemUsage += rangefeedEventMemUsage(valueRangeFeedEvent(t.Key, t.Timestamp, t.Value, t.PrevValue)) + } + if rts.ConsumeLogicalOp(ctx, op) { + futureMemUsage += rangefeedEventMemUsage(checkpointRangeFeedEvent(span, rts)) + } + } + return futureMemUsage +} + +func (ops opsEvent) memUsage() int64 { + currMemUsage := mvccLogicalOp * int64(cap(ops)) + for _, op := range ops { + currMemUsage += int64(op.Size()) + switch op.GetValue().(type) { + case *enginepb.MVCCWriteValueOp: + currMemUsage += mvccWriteValueOp + case *enginepb.MVCCDeleteRangeOp: + currMemUsage += mvccDeleteRangeOp + case *enginepb.MVCCWriteIntentOp: + currMemUsage += mvccWriteIntentOp + case *enginepb.MVCCUpdateIntentOp: + currMemUsage += mvccUpdateIntentOp + case *enginepb.MVCCCommitIntentOp: + currMemUsage += mvccCommitIntentOp + case *enginepb.MVCCAbortIntentOp: + currMemUsage += mvccAbortIntentOp + case *enginepb.MVCCAbortTxnOp: + currMemUsage += mvccAbortTxnOp + } + } + return currMemUsage +} + +func rangefeedEventMemUsage(re kvpb.RangeFeedEvent) int64 { + memUsage := futureEventBaseOverhead + switch re.GetValue().(type) { + case *kvpb.RangeFeedValue: + memUsage += rangefeedValueOverhead + int64(re.Size()) + case *kvpb.RangeFeedDeleteRange: + memUsage += rangefeedDeleteRangeOverhead + int64(re.Size()) + case *kvpb.RangeFeedSSTable: + memUsage += rangefeedSSTTableOverhead + int64(re.Size()) + case *kvpb.RangeFeedCheckpoint: + memUsage += rangefeedCheckpointOverhead + int64(re.Size()) + } + return memUsage +} + +func (e *event) MemUsage() int64 { + currMemUsage := eventOverhead + switch { + case e.ops != nil: + currMemUsage += e.ops.memUsage() + case !e.ct.IsEmpty(): + currMemUsage += e.ct.memUsage() + case bool(e.initRTS): + currMemUsage += e.initRTS.memUsage() + case e.sst != nil: + currMemUsage += e.sst.memUsage() + case e.sync != nil: + currMemUsage += e.sync.memUsage() + } + return currMemUsage +} + +func (e *event) FutureMemUsage( + ctx context.Context, span roachpb.RSpan, rts resolvedTimestamp, +) int64 { + switch { + case e.ops != nil: + return e.ops.futureMemUsage(ctx, span, rts) + case !e.ct.IsEmpty(): + return e.ct.futureMemUsage(span, rts) + case bool(e.initRTS): + // No additional current memory usage. + // A checkpoint may be published; we over-account for it now and release the budget as soon as we know it is not needed.
+ return e.initRTS.futureMemUsage(span, rts) + case e.sst != nil: + return e.sst.futureMemUsage() + case e.sync != nil: + return e.sync.futureMemUsage() + } + return 0 +} diff --git a/pkg/kv/kvserver/rangefeed/event_size_test.go b/pkg/kv/kvserver/rangefeed/event_size_test.go new file mode 100644 index 000000000000..8145b349da0d --- /dev/null +++ b/pkg/kv/kvserver/rangefeed/event_size_test.go @@ -0,0 +1,442 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package rangefeed + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/storage/enginepb" + "github.com/cockroachdb/cockroach/pkg/testutils/storageutils" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/stretchr/testify/require" +) + +type kvs = storageutils.KVs + +var ( + pointKV = storageutils.PointKV + rangeKV = storageutils.RangeKV +) +var ( + testKey = roachpb.Key("/db1") + testTxnID = uuid.MakeV4() + testIsolationLevel = isolation.Serializable + testTxnTS = hlc.Timestamp{WallTime: 10, Logical: 4} + testNewClosedTs = hlc.Timestamp{WallTime: 15} + testRSpan = roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("z")} + testSpan = roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("z")} + testTs = hlc.Timestamp{WallTime: 1} + testStartKey = roachpb.Key("a") + testEndKey = roachpb.Key("z") + testValue = []byte("1") + sstKVs = kvs{ + pointKV("a", 1, "1"), + pointKV("b", 1, "2"), + pointKV("c", 1, "3"), + rangeKV("d", "e", 1, ""), + } +) + +func writeValueOpEvent() ( + op enginepb.MVCCLogicalOp, + expectedMemUsage int64, + expectedFutureMemUsage int64, +) { + op = makeLogicalOp(&enginepb.MVCCWriteValueOp{ + Key: testKey, + Timestamp: testTxnTS, + }) + var futureEvent kvpb.RangeFeedEvent + futureEvent.MustSetValue(&kvpb.RangeFeedValue{ + Key: testKey, + Value: roachpb.Value{ + Timestamp: testTxnTS, + }, + }) + expectedMemUsage += mvccLogicalOp + mvccWriteValueOp + int64(op.Size()) + expectedFutureMemUsage += futureEventBaseOverhead + rangefeedValueOverhead + int64(futureEvent.Size()) + return op, expectedMemUsage, expectedFutureMemUsage +} + +func deleteRangeOpEvent() ( + op enginepb.MVCCLogicalOp, + expectedMemUsage int64, + expectedFutureMemUsage int64, +) { + op = makeLogicalOp(&enginepb.MVCCDeleteRangeOp{ + StartKey: testStartKey, + EndKey: testEndKey, + Timestamp: testTs, + }) + var futureEvent kvpb.RangeFeedEvent + futureEvent.MustSetValue(&kvpb.RangeFeedDeleteRange{ + Span: roachpb.Span{Key: testStartKey, EndKey: testEndKey}, + Timestamp: testTs, + }) + expectedMemUsage += mvccLogicalOp + mvccDeleteRangeOp + int64(op.Size()) + expectedFutureMemUsage += futureEventBaseOverhead + rangefeedDeleteRangeOverhead + int64(futureEvent.Size()) + return op, expectedMemUsage, expectedFutureMemUsage +} + +func writeIntentOpEvent() ( + op enginepb.MVCCLogicalOp, + expectedMemUsage int64, + expectedFutureMemUsage int64, +) { + op = makeLogicalOp(&enginepb.MVCCWriteIntentOp{ + TxnID: testTxnID, + TxnKey: testKey, + TxnIsoLevel: testIsolationLevel, + Timestamp: testTxnTS, + }) + expectedMemUsage += mvccLogicalOp + mvccWriteIntentOp + int64(op.Size()) + // no future event to publish + return op, expectedMemUsage, 0 +} + +func updateIntentOpEvent() ( + op enginepb.MVCCLogicalOp, + expectedMemUsage int64, + expectedFutureMemUsage int64, +) { + op = makeLogicalOp(&enginepb.MVCCUpdateIntentOp{ + TxnID: testTxnID, + Timestamp: hlc.Timestamp{Logical: 3}, + }) + expectedMemUsage += mvccLogicalOp + mvccUpdateIntentOp + int64(op.Size()) + // no future event to publish + return op, expectedMemUsage, 0 +} + +func commitIntentOpEvent() ( + op enginepb.MVCCLogicalOp, + expectedMemUsage int64, + expectedFutureMemUsage int64, +) { + op = makeLogicalOp(&enginepb.MVCCCommitIntentOp{ + Key: testKey, + Timestamp: hlc.Timestamp{Logical: 4}, + PrevValue: testValue, + }) + + var prevVal roachpb.Value + if op.CommitIntent.PrevValue != nil { + prevVal.RawBytes = op.CommitIntent.PrevValue + } + var futureEvent kvpb.RangeFeedEvent + futureEvent.MustSetValue(&kvpb.RangeFeedValue{ + Key: testKey, + Value: roachpb.Value{ + Timestamp: op.CommitIntent.Timestamp, + }, + PrevValue: prevVal, + }) + + expectedMemUsage += mvccLogicalOp + mvccCommitIntentOp + int64(op.Size()) + expectedFutureMemUsage += futureEventBaseOverhead + rangefeedValueOverhead + int64(futureEvent.Size()) + return op, expectedMemUsage, expectedFutureMemUsage +} + +func abortIntentOpEvent() ( + op enginepb.MVCCLogicalOp, + expectedMemUsage int64, + expectedFutureMemUsage int64, +) { + op = makeLogicalOp(&enginepb.MVCCAbortIntentOp{ + TxnID: testTxnID, + }) + expectedMemUsage += mvccLogicalOp + mvccAbortIntentOp + int64(op.Size()) + // no future event to publish + return op, expectedMemUsage, 0 +} + +func abortTxnOpEvent() ( + op enginepb.MVCCLogicalOp, + expectedMemUsage int64, + expectedFutureMemUsage int64, +) { + op = makeLogicalOp(&enginepb.MVCCAbortTxnOp{ + TxnID: testTxnID, + }) + expectedMemUsage += mvccLogicalOp + mvccAbortTxnOp + int64(op.Size()) + // no future event to publish + return op, expectedMemUsage, 0 +} + +func generateLogicalOpEvent( + typesOfOps string, span roachpb.RSpan, rts resolvedTimestamp, +) (ev event, expectedMemUsage int64, expectedFutureMemUsage int64) { + var op enginepb.MVCCLogicalOp + var mem, futureMem int64 + switch typesOfOps { + case "write_value": + op, mem, futureMem = writeValueOpEvent() + case "delete_range": + op, mem, futureMem = deleteRangeOpEvent() + case "write_intent": + op, mem, futureMem = writeIntentOpEvent() + case "update_intent": + op, mem, futureMem = updateIntentOpEvent() + case "commit_intent": + op, mem, futureMem = commitIntentOpEvent() + case "abort_intent": + op, mem, futureMem = abortIntentOpEvent() + case "abort_txn": + op, mem, futureMem = abortTxnOpEvent() + } + + ev = event{ + ops: []enginepb.MVCCLogicalOp{op}, + } + expectedMemUsage += eventOverhead + mem + expectedFutureMemUsage += futureMem + if rts.ConsumeLogicalOp(context.Background(), op) { + ce := checkpointEvent(span, rts) + expectedFutureMemUsage += futureEventBaseOverhead + rangefeedCheckpointOverhead + int64(ce.Size()) + } + return +} + +func checkpointEvent(span roachpb.RSpan, rts resolvedTimestamp) kvpb.RangeFeedEvent { + var event kvpb.RangeFeedEvent + event.MustSetValue(&kvpb.RangeFeedCheckpoint{ + Span: span.AsRawSpanWithNoLocals(), + ResolvedTS: rts.Get(), + }) + return event +}
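The fixtures above exercise the unsafe.Sizeof-based accounting in event_size.go. For reference, a minimal self-contained sketch of that technique (the example type and names are illustrative, not part of the change):

package main

import (
	"fmt"
	"unsafe"
)

// example stands in for a queued event struct; only its shape matters.
type example struct {
	key []byte
	ts  int64
}

// exampleOverhead is the fixed struct size, computed at compile time.
// unsafe.Sizeof does not follow slices or pointers, so variable-length
// payloads must be added per instance.
const exampleOverhead = int64(unsafe.Sizeof(example{}))

func memUsage(e example) int64 {
	// cap, not len: the backing array is what was actually allocated.
	return exampleOverhead + int64(cap(e.key))
}

func main() {
	e := example{key: make([]byte, 8, 16), ts: 1}
	fmt.Println(memUsage(e)) // exampleOverhead + 16
}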
+ +func generateCtEvent( + span roachpb.RSpan, rts resolvedTimestamp, ctEventTimestamp hlc.Timestamp, +) (ev event, expectedMemUsage int64, expectedFutureMemUsage int64) { + ev = event{ + ct: ctEvent{ + Timestamp: ctEventTimestamp, + }, + } + expectedMemUsage += eventOverhead + if rts.ForwardClosedTS(context.Background(), ctEventTimestamp) { + ce := checkpointEvent(span, rts) + expectedFutureMemUsage += futureEventBaseOverhead + rangefeedCheckpointOverhead + int64(ce.Size()) + } + return +} + +func generateInitRTSEvent( + span roachpb.RSpan, rts resolvedTimestamp, +) (ev event, expectedMemUsage int64, expectedFutureMemUsage int64) { + ev = event{ + initRTS: true, + } + expectedMemUsage += eventOverhead + if rts.Init(context.Background()) { + ce := checkpointEvent(span, rts) + expectedFutureMemUsage += futureEventBaseOverhead + rangefeedCheckpointOverhead + int64(ce.Size()) + } + return +} + +func generateSSTEvent( + data []byte, +) (ev event, expectedMemUsage int64, expectedFutureMemUsage int64) { + ev = event{ + sst: &sstEvent{ + data: data, + span: testSpan, + ts: testTs, + }, + } + + var futureEvent kvpb.RangeFeedEvent + futureEvent.MustSetValue(&kvpb.RangeFeedSSTable{ + Data: data, + Span: testSpan, + WriteTS: testTs, + }) + expectedMemUsage += eventOverhead + sstEventOverhead + int64(cap(data)+cap(testSpan.Key)+cap(testSpan.EndKey)) + expectedFutureMemUsage += futureEventBaseOverhead + rangefeedSSTTableOverhead + int64(futureEvent.Size()) + return +} + +func generateSyncEvent() (ev event, expectedMemUsage int64, expectedFutureMemUsage int64) { + ev = event{ + sync: &syncEvent{c: make(chan struct{})}, + } + expectedMemUsage += eventOverhead + syncEventOverhead + return +} + +func TestBasicEventSizeCalculation(t *testing.T) { + ctx := context.Background() + st := cluster.MakeTestingClusterSettings() + + t.Run("empty_event", func(t *testing.T) { + rts := makeResolvedTimestamp(st) + rts.Init(ctx) + ev := event{} + require.Equal(t, eventOverhead, ev.MemUsage()) + require.Equal(t, int64(0), ev.FutureMemUsage(ctx, testRSpan, rts)) + }) + + t.Run("write_value event", func(t *testing.T) { + rts := makeResolvedTimestamp(st) + rts.Init(ctx) + ev, expectedMemUsage, expectedFutureMemUsage := generateLogicalOpEvent("write_value", testRSpan, rts) + require.Equal(t, expectedMemUsage, ev.MemUsage()) + require.Equal(t, expectedFutureMemUsage, ev.FutureMemUsage(ctx, testRSpan, rts)) + }) + + t.Run("delete_range event", func(t *testing.T) { + rts := makeResolvedTimestamp(st) + rts.Init(ctx) + ev, expectedMemUsage, expectedFutureMemUsage := generateLogicalOpEvent("delete_range", testRSpan, rts) + require.Equal(t, expectedMemUsage, ev.MemUsage()) + require.Equal(t, expectedFutureMemUsage, ev.FutureMemUsage(ctx, testRSpan, rts)) + }) + + t.Run("write_intent event", func(t *testing.T) { + rts := makeResolvedTimestamp(st) + rts.Init(ctx) + ev, expectedMemUsage, expectedFutureMemUsage := generateLogicalOpEvent("write_intent", testRSpan, rts) + require.Equal(t, expectedMemUsage, ev.MemUsage()) + require.Equal(t, expectedFutureMemUsage, ev.FutureMemUsage(ctx, testRSpan, rts)) + }) + + t.Run("update_intent event", func(t *testing.T) { + rts := makeResolvedTimestamp(st) + rts.Init(ctx) + ev, expectedMemUsage, expectedFutureMemUsage := generateLogicalOpEvent("update_intent", testRSpan, rts) + require.Equal(t, expectedMemUsage, ev.MemUsage()) + require.Equal(t, expectedFutureMemUsage, ev.FutureMemUsage(ctx, testRSpan, rts)) + }) + + t.Run("commit_intent event", func(t *testing.T) { + rts := makeResolvedTimestamp(st) 
+ rts.Init(ctx) + ev, expectedMemUsage, expectedFutureMemUsage := generateLogicalOpEvent("commit_intent", testRSpan, rts) + require.Equal(t, expectedMemUsage, ev.MemUsage()) + require.Equal(t, expectedFutureMemUsage, ev.FutureMemUsage(ctx, testRSpan, rts)) + }) + + t.Run("abort_intent event", func(t *testing.T) { + rts := makeResolvedTimestamp(st) + rts.Init(ctx) + ev, expectedMemUsage, expectedFutureMemUsage := generateLogicalOpEvent("abort_intent", testRSpan, rts) + require.Equal(t, expectedMemUsage, ev.MemUsage()) + require.Equal(t, expectedFutureMemUsage, ev.FutureMemUsage(ctx, testRSpan, rts)) + }) + + t.Run("abort_txn event", func(t *testing.T) { + rts := makeResolvedTimestamp(st) + rts.Init(ctx) + ev, expectedMemUsage, expectedFutureMemUsage := generateLogicalOpEvent("abort_txn", testRSpan, rts) + require.Equal(t, expectedMemUsage, ev.MemUsage()) + require.Equal(t, expectedFutureMemUsage, ev.FutureMemUsage(ctx, testRSpan, rts)) + }) + + t.Run("ct event", func(t *testing.T) { + rts := makeResolvedTimestamp(st) + rts.Init(ctx) + ev, expectedMemUsage, expectedFutureMemUsage := generateCtEvent(testRSpan, rts, testNewClosedTs) + require.Equal(t, expectedMemUsage, ev.MemUsage()) + require.Equal(t, expectedFutureMemUsage, ev.FutureMemUsage(ctx, testRSpan, rts)) + }) + + t.Run("initRTS event", func(t *testing.T) { + rts := makeResolvedTimestamp(st) + rts.Init(ctx) + generateLogicalOpEvent("write_intent", testRSpan, rts) + ev, expectedMemUsage, expectedFutureMemUsage := generateInitRTSEvent(testRSpan, rts) + require.Equal(t, expectedMemUsage, ev.MemUsage()) + require.Equal(t, expectedFutureMemUsage, ev.FutureMemUsage(ctx, testRSpan, rts)) + }) + + t.Run("sst event", func(t *testing.T) { + rts := makeResolvedTimestamp(st) + rts.Init(ctx) + sst, _, _ := storageutils.MakeSST(t, st, sstKVs) + ev, expectedMemUsage, expectedFutureMemUsage := generateSSTEvent(sst) + require.Equal(t, expectedMemUsage, ev.MemUsage()) + require.Equal(t, expectedFutureMemUsage, ev.FutureMemUsage(ctx, testRSpan, rts)) + }) + + t.Run("sync event", func(t *testing.T) { + rts := makeResolvedTimestamp(st) + rts.Init(ctx) + ev, expectedMemUsage, expectedFutureMemUsage := generateSyncEvent() + require.Equal(t, expectedMemUsage, ev.MemUsage()) + require.Equal(t, expectedFutureMemUsage, ev.FutureMemUsage(ctx, testRSpan, rts)) + }) +} + +func TestEventSeriesSizeCalculation(t *testing.T) { + ctx := context.Background() + st := cluster.MakeTestingClusterSettings() + + t.Run("series of events", func(t *testing.T) { + events := []event{} + expectedMemUsage, expectedFutureMemUsage := int64(0), int64(0) + rts := makeResolvedTimestamp(st) + rts.Init(ctx) + + // Set a new closed timestamp. Resolved timestamp advances. + ev, memUsage, futureMemUsage := generateCtEvent(testRSpan, rts, hlc.Timestamp{WallTime: 5}) + // We pass by value when calling generateCtEvent so that testRSpan and rts + // are not modified by the function. Assert that. + require.Equal(t, hlc.Timestamp{}, rts.Get()) + rts.ForwardClosedTS(ctx, hlc.Timestamp{WallTime: 5}) + expectedMemUsage += memUsage + expectedFutureMemUsage += futureMemUsage + events = append(events, ev) + + // Add an intent for a new transaction. 
+ ev, memUsage, futureMemUsage = generateLogicalOpEvent("write_intent", testRSpan, rts) + require.Equal(t, 1, len(ev.ops)) + rts.ConsumeLogicalOp(ctx, ev.ops[0]) + require.Equal(t, hlc.Timestamp{WallTime: 5}, rts.Get()) + expectedMemUsage += memUsage + expectedFutureMemUsage += futureMemUsage + events = append(events, ev) + + // Set a new closed timestamp. The open intent holds the resolved ts below the new closed ts. + ev, memUsage, futureMemUsage = generateCtEvent(testRSpan, rts, hlc.Timestamp{WallTime: 15}) + rts.ForwardClosedTS(ctx, hlc.Timestamp{WallTime: 15}) + require.True(t, rts.Get().Less(hlc.Timestamp{WallTime: 15})) + expectedMemUsage += memUsage + expectedFutureMemUsage += futureMemUsage + events = append(events, ev) + + // Abort the transaction. Force checkpoint event in ConsumeLogicalOp. + ev, memUsage, futureMemUsage = generateLogicalOpEvent("abort_txn", testRSpan, rts) + require.Equal(t, 1, len(ev.ops)) + rts.ConsumeLogicalOp(ctx, ev.ops[0]) + require.Equal(t, hlc.Timestamp{WallTime: 15}, rts.Get()) + expectedMemUsage += memUsage + expectedFutureMemUsage += futureMemUsage + events = append(events, ev) + + actualMemUsage := int64(0) + actualFutureMemUsage := int64(0) + for _, e := range events { + actualMemUsage += e.MemUsage() + actualFutureMemUsage += e.FutureMemUsage(ctx, testRSpan, rts) + } + + require.Equal(t, expectedMemUsage, actualMemUsage) + require.Equal(t, expectedFutureMemUsage, actualFutureMemUsage) + }) +} diff --git a/pkg/kv/kvserver/rangefeed/processor.go b/pkg/kv/kvserver/rangefeed/processor.go index cc4e408a9c05..3a55763e97d6 100644 --- a/pkg/kv/kvserver/rangefeed/processor.go +++ b/pkg/kv/kvserver/rangefeed/processor.go @@ -291,19 +291,3 @@ type syncEvent struct { // should be called from underneath a stopper task to ensure that the // engine has not been closed. type IntentScannerConstructor func() IntentScanner - -// calculateDateEventSize returns estimated size of the event that contain actual -// data. We only account for logical ops and sst's. Those events come from raft -// and are budgeted. Other events come from processor jobs and update timestamps -// we don't take them into account as they are supposed to be small and to avoid -// complexity of having multiple producers getting from budget. -func calculateDateEventSize(e event) int64 { - var size int64 - for _, op := range e.ops { - size += int64(op.Size()) - } - if e.sst != nil { - size += int64(len(e.sst.data)) - } - return size -} diff --git a/pkg/kv/kvserver/rangefeed/scheduled_processor.go b/pkg/kv/kvserver/rangefeed/scheduled_processor.go index f4a295d0ca8c..f85a63642629 100644 --- a/pkg/kv/kvserver/rangefeed/scheduled_processor.go +++ b/pkg/kv/kvserver/rangefeed/scheduled_processor.go @@ -128,7 +128,7 @@ func (p *ScheduledProcessor) Start( return err } } else { - p.initResolvedTS(p.taskCtx) + p.initResolvedTS(p.taskCtx, nil) } p.Metrics.RangeFeedProcessorsScheduler.Inc(1) @@ -448,7 +448,7 @@ func (p *ScheduledProcessor) enqueueEventInternal( // inserting value into channel. var alloc *SharedBudgetAllocation if p.MemBudget != nil { - size := calculateDateEventSize(e) + size := max(e.MemUsage(), e.FutureMemUsage(ctx, p.Span, p.rts)) if size > 0 { var err error // First we will try non-blocking fast path to allocate memory budget.
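The allocation call that follows that comment is elided by this hunk. A hedged sketch of the two-phase pattern it refers to, assuming FeedBudget's TryGet and WaitAndGet keep their signatures from budget.go:

// Fast path: try to take `size` bytes from the shared budget without blocking.
alloc, err = p.MemBudget.TryGet(ctx, size)
if err != nil {
	// Slow path: wait (bounded by the processor's push timeout) for other
	// consumers to release budget, then retry the allocation once.
	alloc, err = p.MemBudget.WaitAndGet(ctx, size)
}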
@@ -638,9 +638,9 @@ func (p *ScheduledProcessor) consumeEvent(ctx context.Context, e *event) { case e.ops != nil: p.consumeLogicalOps(ctx, e.ops, e.alloc) case !e.ct.IsEmpty(): - p.forwardClosedTS(ctx, e.ct.Timestamp) + p.forwardClosedTS(ctx, e.ct.Timestamp, e.alloc) case bool(e.initRTS): - p.initResolvedTS(ctx) + p.initResolvedTS(ctx, e.alloc) case e.sst != nil: p.consumeSSTable(ctx, e.sst.data, e.sst.span, e.sst.ts, e.alloc) case e.sync != nil: @@ -699,7 +699,8 @@ func (p *ScheduledProcessor) consumeLogicalOps( // Determine whether the operation caused the resolved timestamp to // move forward. If so, publish a RangeFeedCheckpoint notification. if p.rts.ConsumeLogicalOp(ctx, op) { - p.publishCheckpoint(ctx) + // The checkpoint published here is charged to the triggering event's allocation; we knowingly over-account. TODO: consider a separate budget token for publishCheckpoint. + p.publishCheckpoint(ctx, alloc) } } } @@ -714,15 +715,17 @@ func (p *ScheduledProcessor) consumeSSTable( p.publishSSTable(ctx, sst, sstSpan, sstWTS, alloc) } -func (p *ScheduledProcessor) forwardClosedTS(ctx context.Context, newClosedTS hlc.Timestamp) { +func (p *ScheduledProcessor) forwardClosedTS( + ctx context.Context, newClosedTS hlc.Timestamp, alloc *SharedBudgetAllocation, +) { if p.rts.ForwardClosedTS(ctx, newClosedTS) { - p.publishCheckpoint(ctx) + p.publishCheckpoint(ctx, alloc) } } -func (p *ScheduledProcessor) initResolvedTS(ctx context.Context) { +func (p *ScheduledProcessor) initResolvedTS(ctx context.Context, alloc *SharedBudgetAllocation) { if p.rts.Init(ctx) { - p.publishCheckpoint(ctx) + p.publishCheckpoint(ctx, alloc) } } @@ -786,21 +789,21 @@ func (p *ScheduledProcessor) publishSSTable( if sstWTS.IsEmpty() { log.Fatalf(ctx, "received SSTable without write timestamp") } - p.reg.PublishToOverlapping(ctx, sstSpan, &kvpb.RangeFeedEvent{ - SST: &kvpb.RangeFeedSSTable{ - Data: sst, - Span: sstSpan, - WriteTS: sstWTS, - }, - }, false /* omitInRangefeeds */, alloc) + var event kvpb.RangeFeedEvent + event.MustSetValue(&kvpb.RangeFeedSSTable{ + Data: sst, + Span: sstSpan, + WriteTS: sstWTS, + }) + p.reg.PublishToOverlapping(ctx, sstSpan, &event, false /* omitInRangefeeds */, alloc) } -func (p *ScheduledProcessor) publishCheckpoint(ctx context.Context) { +func (p *ScheduledProcessor) publishCheckpoint(ctx context.Context, alloc *SharedBudgetAllocation) { // TODO(nvanbenschoten): persist resolvedTimestamp. Give Processor a client.DB. // TODO(nvanbenschoten): rate limit these? send them periodically? event := p.newCheckpointEvent() - p.reg.PublishToOverlapping(ctx, all, event, false /* omitInRangefeeds */, nil) + p.reg.PublishToOverlapping(ctx, all, event, false /* omitInRangefeeds */, alloc) } func (p *ScheduledProcessor) newCheckpointEvent() *kvpb.RangeFeedEvent {
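Taken together, enqueueEventInternal now budgets each event at the larger of its queued footprint and the footprint of the RangeFeedEvents it may fan out into, and that allocation travels with checkpoint publication. A usage sketch of the new exported pair, following the test fixtures above (makeLogicalOp, resolvedTimestamp, and testRSpan are package internals; the harness is hypothetical):

// Build a single-op event and estimate both sides of its cost.
ev := event{ops: []enginepb.MVCCLogicalOp{
	makeLogicalOp(&enginepb.MVCCWriteValueOp{
		Key:       roachpb.Key("k"),
		Timestamp: hlc.Timestamp{WallTime: 1},
	}),
}}
rts := makeResolvedTimestamp(cluster.MakeTestingClusterSettings())
rts.Init(ctx)
cur := ev.MemUsage()                          // footprint while queued
fut := ev.FutureMemUsage(ctx, testRSpan, rts) // footprint once published
size := max(cur, fut)                         // what the budget is charged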