diff --git a/pkg/kv/kvserver/rangefeed/BUILD.bazel b/pkg/kv/kvserver/rangefeed/BUILD.bazel index 26820fc9d802..34a59e2dd238 100644 --- a/pkg/kv/kvserver/rangefeed/BUILD.bazel +++ b/pkg/kv/kvserver/rangefeed/BUILD.bazel @@ -5,6 +5,7 @@ go_library( srcs = [ "budget.go", "catchup_scan.go", + "event_size.go", "filter.go", "metrics.go", "processor.go", @@ -56,6 +57,7 @@ go_test( "budget_test.go", "catchup_scan_bench_test.go", "catchup_scan_test.go", + "event_size_test.go", "processor_test.go", "registry_test.go", "resolved_timestamp_test.go", @@ -71,11 +73,16 @@ go_test( "//pkg/kv/kvserver/concurrency/lock", "//pkg/roachpb", "//pkg/settings/cluster", + "//pkg/sql/randgen", + "//pkg/sql/rowenc/keyside", + "//pkg/sql/sem/tree", + "//pkg/sql/types", "//pkg/storage", "//pkg/storage/enginepb", "//pkg/storage/fs", "//pkg/testutils", "//pkg/testutils/skip", + "//pkg/testutils/storageutils", "//pkg/util", "//pkg/util/encoding", "//pkg/util/future", diff --git a/pkg/kv/kvserver/rangefeed/event_size.go b/pkg/kv/kvserver/rangefeed/event_size.go new file mode 100644 index 000000000000..5050008406ee --- /dev/null +++ b/pkg/kv/kvserver/rangefeed/event_size.go @@ -0,0 +1,269 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package rangefeed + +import ( + "context" + "unsafe" + + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/enginepb" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/uuid" +) + +const ( + mvccLogicalOp = int64(unsafe.Sizeof(enginepb.MVCCLogicalOp{})) + mvccWriteValueOp = int64(unsafe.Sizeof(enginepb.MVCCWriteValueOp{})) + mvccDeleteRangeOp = int64(unsafe.Sizeof(enginepb.MVCCDeleteRangeOp{})) + mvccWriteIntentOp = int64(unsafe.Sizeof(enginepb.MVCCWriteIntentOp{})) + mvccUpdateIntentOp = int64(unsafe.Sizeof(enginepb.MVCCUpdateIntentOp{})) + mvccCommitIntentOp = int64(unsafe.Sizeof(enginepb.MVCCCommitIntentOp{})) + mvccAbortIntentOp = int64(unsafe.Sizeof(enginepb.MVCCAbortIntentOp{})) + mvccAbortTxnOp = int64(unsafe.Sizeof(enginepb.MVCCAbortTxnOp{})) + + eventPtrOverhead = int64(unsafe.Sizeof(&event{})) + eventOverhead = int64(unsafe.Sizeof(&event{})) + int64(unsafe.Sizeof(event{})) + + sstEventOverhead = int64(unsafe.Sizeof(sstEvent{})) + syncEventOverhead = int64(unsafe.Sizeof(syncEvent{})) + + // futureEventBaseOverhead accounts for the base struct overhead of + // sharedEvent{} and its pointer. Each sharedEvent contains a + // *kvpb.RangeFeedEvent and *SharedBudgetAllocation. futureEventBaseOverhead + // also accounts for the underlying base struct memory of RangeFeedEvent and + // SharedBudgetAllocation. Underlying data for SharedBudgetAllocation includes + // a pointer to the FeedBudget, but that points to the same data structure + // across all rangefeeds, so we opted out in the calculation. 
+	sharedEventPtrOverhead  = int64(unsafe.Sizeof(&sharedEvent{}))
+	sharedEventOverhead     = int64(unsafe.Sizeof(sharedEvent{}))
+	rangeFeedEventOverhead  = int64(unsafe.Sizeof(kvpb.RangeFeedEvent{}))
+	allocEventOverhead      = int64(unsafe.Sizeof(SharedBudgetAllocation{}))
+	futureEventBaseOverhead = sharedEventPtrOverhead + sharedEventOverhead + rangeFeedEventOverhead + allocEventOverhead
+
+	rangefeedValueOverhead       = int64(unsafe.Sizeof(kvpb.RangeFeedValue{}))
+	rangefeedDeleteRangeOverhead = int64(unsafe.Sizeof(kvpb.RangeFeedDeleteRange{}))
+	rangefeedCheckpointOverhead  = int64(unsafe.Sizeof(kvpb.RangeFeedCheckpoint{}))
+	rangefeedSSTTableOverhead    = int64(unsafe.Sizeof(kvpb.RangeFeedSSTable{}))
+)
+
+// rangefeedSSTTableOpMemUsage accounts for the entire memory usage of a new
+// RangeFeedSSTable event; no memory for the future event has been accounted
+// for before this call.
+func rangefeedSSTTableOpMemUsage(data []byte, startKey, endKey roachpb.Key) int64 {
+	// The pointer to RangeFeedSSTable has already been accounted for in
+	// futureEventBaseOverhead as part of the base struct overhead of
+	// RangeFeedEvent. rangefeedSSTTableOverhead covers the memory usage of the
+	// underlying RangeFeedSSTable base struct.
+	memUsage := futureEventBaseOverhead + rangefeedSSTTableOverhead
+
+	// RangeFeedSSTable has Data, Span{startKey, endKey}, and WriteTS. Only Data,
+	// startKey, and endKey have underlying memory usage in []byte. The timestamp
+	// and the other fixed-size fields have no underlying data and are already
+	// accounted for in rangefeedSSTTableOverhead.
+	memUsage += int64(cap(data))
+	memUsage += int64(cap(startKey))
+	memUsage += int64(cap(endKey))
+	return memUsage
+}
+
+// rangefeedCheckpointOpMemUsage accounts for the entire memory usage of a new
+// RangeFeedCheckpoint event; no memory for the future event has been accounted
+// for before this call.
+func rangefeedCheckpointOpMemUsage() int64 {
+	// The pointer to RangeFeedCheckpoint has already been accounted for in
+	// futureEventBaseOverhead as part of the base struct overhead of
+	// RangeFeedEvent. rangefeedCheckpointOverhead covers the memory usage of
+	// the underlying RangeFeedCheckpoint base struct.
+
+	// RangeFeedCheckpoint has Span{p.Span} and Timestamp{rts.Get()}. The
+	// timestamp is already accounted for in rangefeedCheckpointOverhead. We
+	// ignore the bytes under checkpoint.span here since it comes from p.Span,
+	// which always points at the same underlying data.
+	return futureEventBaseOverhead + rangefeedCheckpointOverhead
+}
+
+// The pointer to the MVCCWriteValueOp was already accounted for via
+// mvccLogicalOp in the caller. writeValueOpMemUsage accounts for the memory
+// usage of MVCCWriteValueOp.
+func writeValueOpMemUsage(key roachpb.Key, value, prevValue []byte) int64 {
+	// MVCCWriteValueOp has Key, Timestamp, Value, PrevValue, and
+	// OmitInRangefeeds. Only key, value, and prevValue have underlying memory
+	// usage in []byte. Timestamp and OmitInRangefeeds have no underlying data
+	// and are already covered by mvccWriteValueOp.
+	currMemUsage := mvccWriteValueOp
+	currMemUsage += int64(cap(key))
+	currMemUsage += int64(cap(value))
+	currMemUsage += int64(cap(prevValue))
+	return currMemUsage
+}
+
+// The pointer to the MVCCDeleteRangeOp was already accounted for via
+// mvccLogicalOp in the caller. deleteRangeOpMemUsage accounts for the memory
+// usage of MVCCDeleteRangeOp.
+func deleteRangeOpMemUsage(startKey, endKey roachpb.Key) int64 {
+	// MVCCDeleteRangeOp has StartKey, EndKey, and Timestamp. Only StartKey and
+	// EndKey have underlying memory usage in []byte. Timestamp has no underlying
+	// data and is already covered by mvccDeleteRangeOp.
+	currMemUsage := mvccDeleteRangeOp
+	currMemUsage += int64(cap(startKey))
+	currMemUsage += int64(cap(endKey))
+	return currMemUsage
+}
+
+// The pointer to the MVCCWriteIntentOp was already accounted for via
+// mvccLogicalOp in the caller. writeIntentOpMemUsage accounts for the memory
+// usage of MVCCWriteIntentOp.
+func writeIntentOpMemUsage(txnID uuid.UUID, txnKey []byte) int64 {
+	// MVCCWriteIntentOp has TxnID, TxnKey, TxnIsoLevel, TxnMinTimestamp, and
+	// Timestamp. Only TxnID and TxnKey have underlying memory usage in []byte.
+	// TxnIsoLevel, TxnMinTimestamp, and Timestamp have no underlying data and
+	// are already covered by mvccWriteIntentOp.
+	currMemUsage := mvccWriteIntentOp
+	currMemUsage += int64(cap(txnID))
+	currMemUsage += int64(cap(txnKey))
+	return currMemUsage
+}
+
+// The pointer to the MVCCUpdateIntentOp was already accounted for via
+// mvccLogicalOp in the caller. updateIntentOpMemUsage accounts for the memory
+// usage of MVCCUpdateIntentOp.
+func updateIntentOpMemUsage(txnID uuid.UUID) int64 {
+	// MVCCUpdateIntentOp has TxnID and Timestamp. Only TxnID has underlying
+	// memory usage in []byte. Timestamp has no underlying data and is already
+	// covered by mvccUpdateIntentOp.
+	currMemUsage := mvccUpdateIntentOp
+	currMemUsage += int64(cap(txnID))
+	return currMemUsage
+}
+
+// The pointer to the MVCCCommitIntentOp was already accounted for via
+// mvccLogicalOp in the caller. commitIntentOpMemUsage accounts for the memory
+// usage of MVCCCommitIntentOp.
+func commitIntentOpMemUsage(txnID uuid.UUID, key []byte, value []byte, prevValue []byte) int64 {
+	// MVCCCommitIntentOp has TxnID, Key, Timestamp, Value, PrevValue, and
+	// OmitInRangefeeds. Only TxnID, Key, Value, and PrevValue have underlying
+	// memory usage in []byte. Timestamp and OmitInRangefeeds have no underlying
+	// data and are already covered by mvccCommitIntentOp.
+	currMemUsage := mvccCommitIntentOp
+	currMemUsage += int64(cap(txnID))
+	currMemUsage += int64(cap(key))
+	currMemUsage += int64(cap(value))
+	currMemUsage += int64(cap(prevValue))
+	return currMemUsage
+}
+
+// The pointer to the MVCCAbortIntentOp was already accounted for via
+// mvccLogicalOp in the caller. abortIntentOpMemUsage accounts for the memory
+// usage of MVCCAbortIntentOp.
+func abortIntentOpMemUsage(txnID uuid.UUID) int64 {
+	// MVCCAbortIntentOp has TxnID, which has underlying memory usage in []byte.
+	currMemUsage := mvccAbortIntentOp
+	currMemUsage += int64(cap(txnID))
+	return currMemUsage
+}
+
+// The pointer to the MVCCAbortTxnOp was already accounted for via mvccLogicalOp
+// in the caller. abortTxnOpMemUsage accounts for the memory usage of
+// MVCCAbortTxnOp.
+func abortTxnOpMemUsage(txnID uuid.UUID) int64 {
+	// MVCCAbortTxnOp has TxnID, which has underlying memory usage in []byte.
+	currMemUsage := mvccAbortTxnOp
+	currMemUsage += int64(cap(txnID))
+	return currMemUsage
+}
+
+// eventOverhead accounts for the base struct of event{}, which only includes
+// the pointer to syncEvent. currMemUsage accounts for the base struct memory
+// of syncEvent{} and its underlying memory.
+func (sync syncEvent) currMemUsage() int64 {
+	// syncEvent has a channel and a pointer to testRegCatchupSpan.
+	// testRegCatchupSpan is never set in production, and the value sent on the
+	// channel is always struct{}, so the memory is already covered by
+	// syncEventOverhead.
+	return syncEventOverhead
+}
+
+// currMemUsage returns the current memory usage of the opsEvent, including the
+// base struct overhead and the underlying memory it points to.
+func (ops opsEvent) currMemUsage() int64 {
+	// eventOverhead already accounts for the slice header of opsEvent
+	// ([]enginepb.MVCCLogicalOp). Each of the cap(ops) elements in the backing
+	// array adds an MVCCLogicalOp overhead.
+	currMemUsage := mvccLogicalOp * int64(cap(ops))
+	for _, op := range ops {
+		switch t := op.GetValue().(type) {
+		// For each op, the pointer to the op is already accounted for in
+		// mvccLogicalOp. We now account for the underlying memory usage inside
+		// the op below.
+		case *enginepb.MVCCWriteValueOp:
+			currMemUsage += writeValueOpMemUsage(t.Key, t.Value, t.PrevValue)
+		case *enginepb.MVCCDeleteRangeOp:
+			currMemUsage += deleteRangeOpMemUsage(t.StartKey, t.EndKey)
+		case *enginepb.MVCCWriteIntentOp:
+			currMemUsage += writeIntentOpMemUsage(t.TxnID, t.TxnKey)
+		case *enginepb.MVCCUpdateIntentOp:
+			currMemUsage += updateIntentOpMemUsage(t.TxnID)
+		case *enginepb.MVCCCommitIntentOp:
+			currMemUsage += commitIntentOpMemUsage(t.TxnID, t.Key, t.Value, t.PrevValue)
+		case *enginepb.MVCCAbortIntentOp:
+			currMemUsage += abortIntentOpMemUsage(t.TxnID)
+		case *enginepb.MVCCAbortTxnOp:
+			currMemUsage += abortTxnOpMemUsage(t.TxnID)
+		default:
+			log.Fatalf(context.Background(), "unknown logical op %T", t)
+		}
+	}
+	// For each op, a checkpoint may or may not be published depending on whether
+	// the operation caused the resolved timestamp to advance. Since that is very
+	// rare, we disregard it to avoid the complexity.
+	return currMemUsage
+}
+
+// MemUsage estimates the total memory usage of the event, including its
+// underlying data, in bytes.
+func (e *event) MemUsage() int64 {
+	if e == nil {
+		// For a nil event, only eventPtrOverhead is accounted.
+		return eventPtrOverhead
+	}
+	// The pointer to e is what is sent on p.eventC; each pointer is &event{} and
+	// points at an underlying event{}.
+	switch {
+	case e.ops != nil:
+		// For logical ops events, the current memory usage is usually larger than
+		// that of the rangefeed events they publish. Note that we assume ops cause
+		// no checkpoint events, since those are rare, to avoid the complexity.
+		return eventOverhead + e.ops.currMemUsage()
+	case !e.ct.IsEmpty():
+		// For a ct event, the rangefeed checkpoint event it publishes usually
+		// takes more memory than the event itself. Note that we assume the
+		// checkpoint event will be published; if it is not, the memory is released
+		// soon after p.ConsumeEvent returns.
+		return rangefeedCheckpointOpMemUsage()
+	case bool(e.initRTS):
+		// For an initRTS event, the rangefeed checkpoint event it publishes
+		// usually takes more memory than the event itself. Note that we assume the
+		// checkpoint event will be published; if it is not, the memory is released
+		// soon after p.ConsumeEvent returns.
+		return rangefeedCheckpointOpMemUsage()
+	case e.sst != nil:
+		// For an sst event, the rangefeed event usually takes more memory than the
+		// current memory usage.
+		return rangefeedSSTTableOpMemUsage(e.sst.data, e.sst.span.Key, e.sst.span.EndKey)
+	case e.sync != nil:
+		// For a sync event, no rangefeed events will be published.
+		return eventOverhead + e.sync.currMemUsage()
+	default:
+		log.Fatalf(context.Background(), "missing event variant: %+v", e)
+	}
+	// For an empty event, only eventOverhead is accounted.
+ return eventOverhead +} diff --git a/pkg/kv/kvserver/rangefeed/event_size_test.go b/pkg/kv/kvserver/rangefeed/event_size_test.go new file mode 100644 index 000000000000..a86232f7e133 --- /dev/null +++ b/pkg/kv/kvserver/rangefeed/event_size_test.go @@ -0,0 +1,586 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package rangefeed + +import ( + "math/rand" + "testing" + + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/randgen" + "github.com/cockroachdb/cockroach/pkg/sql/rowenc/keyside" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/storage/enginepb" + "github.com/cockroachdb/cockroach/pkg/testutils/storageutils" + "github.com/cockroachdb/cockroach/pkg/util/encoding" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/randutil" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/stretchr/testify/require" +) + +type kvs = storageutils.KVs + +var ( + pointKV = storageutils.PointKV + rangeKV = storageutils.RangeKV +) +var ( + testKey = roachpb.Key("/db1") + testTxnID = uuid.MakeV4() + testIsolationLevel = isolation.Serializable + testSpan = roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("z")} + testTs = hlc.Timestamp{WallTime: 1} + testStartKey = roachpb.Key("a") + testEndKey = roachpb.Key("z") + testValue = []byte("1") + testSSTKVs = kvs{ + pointKV("a", 1, "1"), + pointKV("b", 1, "2"), + pointKV("c", 1, "3"), + rangeKV("d", "e", 1, ""), + } +) + +type testData struct { + numOfLogicalOps int + kvs []interface{} + span roachpb.Span + key roachpb.Key + timestamp hlc.Timestamp + value []byte + startKey, endKey roachpb.Key + txnID uuid.UUID + txnKey []byte + txnIsoLevel isolation.Level + txnMinTimestamp hlc.Timestamp + omitInRangefeeds bool +} + +func generateRandomizedTs(rand *rand.Rand) hlc.Timestamp { + // Avoid generating zero timestamp which will equal to an empty event. 
+ return hlc.Timestamp{WallTime: int64(rand.Intn(100)) + 1} +} + +func generateRandomizedBytes(rand *rand.Rand) []byte { + const tableID = 42 + dataTypes := []*types.T{types.String, types.Int, types.Decimal, types.Bytes, types.Bool, types.Date, types.Timestamp, types.Float} + randType := dataTypes[rand.Intn(len(dataTypes))] + + key, err := keyside.Encode( + keys.SystemSQLCodec.TablePrefix(tableID), + randgen.RandDatumSimple(rand, randType), + encoding.Ascending, + ) + if err != nil { + panic(err) + } + return key +} + +func generateStartAndEndKey(rand *rand.Rand) (roachpb.Key, roachpb.Key) { + start := rand.Intn(2 << 20) + end := start + rand.Intn(2<<20) + startDatum := tree.NewDInt(tree.DInt(start)) + endDatum := tree.NewDInt(tree.DInt(end)) + const tableID = 42 + + startKey, err := keyside.Encode( + keys.SystemSQLCodec.TablePrefix(tableID), + startDatum, + encoding.Ascending, + ) + if err != nil { + panic(err) + } + + endKey, err := keyside.Encode( + keys.SystemSQLCodec.TablePrefix(tableID), + endDatum, + encoding.Ascending, + ) + if err != nil { + panic(err) + } + return startKey, endKey +} + +func generateRandomizedTxnId(rand *rand.Rand) uuid.UUID { + var txnID uuid.UUID + n := rand.Intn(100) + if n == 0 { + // rand.Intn(0) panics + n = 1 + } + i := rand.Intn(n) + txnID.DeterministicV4(uint64(i), uint64(n)) + return txnID +} + +func generateRandomizedSpan(rand *rand.Rand) roachpb.RSpan { + startKey, endKey := generateStartAndEndKey(rand) + return roachpb.RSpan{ + Key: roachpb.RKey(startKey), + EndKey: roachpb.RKey(endKey), + } +} + +func generateRandomTestData(rand *rand.Rand) testData { + startKey, endkey := generateStartAndEndKey(rand) + return testData{ + numOfLogicalOps: rand.Intn(100), + kvs: testSSTKVs, + span: generateRandomizedSpan(rand).AsRawSpanWithNoLocals(), + key: generateRandomizedBytes(rand), + timestamp: generateRandomizedTs(rand), + value: generateRandomizedBytes(rand), + startKey: startKey, + endKey: endkey, + txnID: generateRandomizedTxnId(rand), + txnKey: generateRandomizedBytes(rand), + txnIsoLevel: isolation.Levels()[rand.Intn(len(isolation.Levels()))], + txnMinTimestamp: generateRandomizedTs(rand), + omitInRangefeeds: rand.Intn(2) == 1, + } +} + +func writeValueOpEvent( + key roachpb.Key, timestamp hlc.Timestamp, value []byte, +) ( + op enginepb.MVCCLogicalOp, + futureEvent kvpb.RangeFeedEvent, + expectedMemUsage int64, + expectedFutureMemUsage int64, +) { + op = writeValueOpWithKV(key, timestamp, value) + futureEvent.MustSetValue(&kvpb.RangeFeedValue{ + Key: key, + Value: roachpb.Value{ + RawBytes: value, + Timestamp: timestamp, + }, + }) + expectedMemUsage += mvccWriteValueOp + int64(cap(key)) + int64(cap(value)) + expectedFutureMemUsage += futureEventBaseOverhead + rangefeedValueOverhead + int64(cap(key)) + int64(cap(value)) + return op, futureEvent, expectedMemUsage, expectedFutureMemUsage +} + +func deleteRangeOpEvent( + startKey, endKey roachpb.Key, timestamp hlc.Timestamp, +) ( + op enginepb.MVCCLogicalOp, + futureEvent kvpb.RangeFeedEvent, + expectedMemUsage int64, + expectedFutureMemUsage int64, +) { + op = deleteRangeOp(startKey, endKey, timestamp) + futureEvent.MustSetValue(&kvpb.RangeFeedDeleteRange{ + Span: roachpb.Span{Key: startKey, EndKey: endKey}, + Timestamp: testTs, + }) + expectedMemUsage += mvccDeleteRangeOp + int64(cap(startKey)) + int64(cap(endKey)) + expectedFutureMemUsage += futureEventBaseOverhead + rangefeedDeleteRangeOverhead + int64(cap(startKey)) + int64(cap(endKey)) + return op, futureEvent, expectedMemUsage, expectedFutureMemUsage 
+} + +func writeIntentOpEvent( + txnID uuid.UUID, + txnKey []byte, + txnIsoLevel isolation.Level, + txnMinTimestamp hlc.Timestamp, + timestamp hlc.Timestamp, +) ( + op enginepb.MVCCLogicalOp, + futureEvent kvpb.RangeFeedEvent, + expectedMemUsage int64, + expectedFutureMemUsage int64, +) { + op = writeIntentOpWithDetails(txnID, txnKey, txnIsoLevel, txnMinTimestamp, timestamp) + expectedMemUsage += mvccWriteIntentOp + int64(cap(txnID)) + int64(cap(txnKey)) + // No future event to publish. + return op, futureEvent, expectedMemUsage, 0 +} + +func updateIntentOpEvent( + txnID uuid.UUID, timestamp hlc.Timestamp, +) ( + op enginepb.MVCCLogicalOp, + futureEvent kvpb.RangeFeedEvent, + expectedMemUsage int64, + expectedFutureMemUsage int64, +) { + op = updateIntentOp(txnID, timestamp) + expectedMemUsage += mvccUpdateIntentOp + int64(cap(txnID)) + // No future event to publish. + return op, futureEvent, expectedMemUsage, 0 +} + +func commitIntentOpEvent( + txnID uuid.UUID, + key roachpb.Key, + timestamp hlc.Timestamp, + value, prevValue []byte, + omitInRangefeeds bool, +) ( + op enginepb.MVCCLogicalOp, + futureEvent kvpb.RangeFeedEvent, + expectedMemUsage int64, + expectedFutureMemUsage int64, +) { + op = commitIntentOpWithKV(txnID, key, timestamp, value, omitInRangefeeds) + + futureEvent.MustSetValue(&kvpb.RangeFeedValue{ + Key: key, + Value: roachpb.Value{ + RawBytes: value, + Timestamp: timestamp, + }, + }) + + expectedMemUsage += mvccCommitIntentOp + int64(cap(txnID)) + int64(cap(key)) + int64(cap(value)) + int64(cap(prevValue)) + expectedFutureMemUsage += futureEventBaseOverhead + rangefeedValueOverhead + int64(cap(key)) + int64(cap(value)) + return op, futureEvent, expectedMemUsage, expectedFutureMemUsage +} + +func abortIntentOpEvent( + txnID uuid.UUID, +) ( + op enginepb.MVCCLogicalOp, + futureEvent kvpb.RangeFeedEvent, + expectedMemUsage int64, + expectedFutureMemUsage int64, +) { + op = abortIntentOp(txnID) + expectedMemUsage += mvccAbortIntentOp + int64(cap(txnID)) + // No future event to publish. + return op, futureEvent, expectedMemUsage, 0 +} + +func abortTxnOpEvent( + txnID uuid.UUID, +) ( + op enginepb.MVCCLogicalOp, + futureEvent kvpb.RangeFeedEvent, + expectedMemUsage int64, + expectedFutureMemUsage int64, +) { + op = abortTxnOp(txnID) + expectedMemUsage += mvccAbortTxnOp + int64(cap(txnID)) + // No future event to publish. 
+ return op, futureEvent, expectedMemUsage, 0 +} + +func generateLogicalOpEvents( + data testData, +) (ev event, expectedMemUsage int64, expectedFutureMemUsage int64) { + var ops []enginepb.MVCCLogicalOp + var mem, futureMem int64 + expectedMemUsage += eventOverhead + for i := 0; i < data.numOfLogicalOps; i++ { + var op enginepb.MVCCLogicalOp + switch i % 7 { + case 0: + op, _, mem, futureMem = writeValueOpEvent(data.key, data.timestamp, data.value) + case 1: + op, _, mem, futureMem = deleteRangeOpEvent(data.startKey, data.endKey, data.timestamp) + case 2: + op, _, mem, futureMem = writeIntentOpEvent(data.txnID, data.txnKey, data.txnIsoLevel, data.txnMinTimestamp, data.timestamp) + case 3: + op, _, mem, futureMem = updateIntentOpEvent(data.txnID, data.timestamp) + case 4: + op, _, mem, futureMem = commitIntentOpEvent(data.txnID, data.key, data.timestamp, data.value, nil, data.omitInRangefeeds) + case 5: + op, _, mem, futureMem = abortIntentOpEvent(data.txnID) + case 6: + op, _, mem, futureMem = abortTxnOpEvent(data.txnID) + } + ops = append(ops, op) + expectedMemUsage += mem + expectedFutureMemUsage += futureMem + } + ev = event{ops: ops} + expectedMemUsage += mvccLogicalOp * int64(cap(ev.ops)) + return ev, expectedMemUsage, expectedFutureMemUsage +} + +func generateOneLogicalOpEvent( + typesOfOps string, data testData, +) ( + ev event, + futureEvent kvpb.RangeFeedEvent, + expectedCurrMemUsage int64, + expectedFutureMemUsage int64, +) { + var op enginepb.MVCCLogicalOp + var mem, futureMem int64 + switch typesOfOps { + case "write_value": + op, futureEvent, mem, futureMem = writeValueOpEvent(data.key, data.timestamp, data.value) + case "delete_range": + op, futureEvent, mem, futureMem = deleteRangeOpEvent(data.startKey, data.endKey, data.timestamp) + case "write_intent": + op, futureEvent, mem, futureMem = writeIntentOpEvent(data.txnID, data.txnKey, data.txnIsoLevel, data.txnMinTimestamp, data.timestamp) + case "update_intent": + op, futureEvent, mem, futureMem = updateIntentOpEvent(data.txnID, data.timestamp) + case "commit_intent": + op, futureEvent, mem, futureMem = commitIntentOpEvent(data.txnID, data.key, data.timestamp, data.value, nil, data.omitInRangefeeds) + case "abort_intent": + op, futureEvent, mem, futureMem = abortIntentOpEvent(data.txnID) + case "abort_txn": + op, futureEvent, mem, futureMem = abortTxnOpEvent(data.txnID) + } + + ev = event{ + ops: []enginepb.MVCCLogicalOp{op}, + } + expectedCurrMemUsage += eventOverhead + mem + mvccLogicalOp + expectedFutureMemUsage += futureMem + return ev, futureEvent, expectedCurrMemUsage, expectedFutureMemUsage +} + +func generateCtEvent( + data testData, +) ( + ev event, + futureEvent kvpb.RangeFeedEvent, + expectedMemUsage int64, + expectedFutureMemUsage int64, +) { + ev = event{ + ct: ctEvent{ + Timestamp: data.timestamp, + }, + } + expectedMemUsage += eventOverhead + // Publish checkpoint event. + expectedFutureMemUsage += futureEventBaseOverhead + rangefeedCheckpointOverhead + return ev, futureEvent, expectedMemUsage, expectedFutureMemUsage +} + +func generateInitRTSEvent() ( + ev event, + futureEvent kvpb.RangeFeedEvent, + expectedMemUsage int64, + expectedFutureMemUsage int64, +) { + ev = event{ + initRTS: true, + } + expectedMemUsage += eventOverhead + // Publish checkpoint event. 
+ expectedFutureMemUsage += futureEventBaseOverhead + rangefeedCheckpointOverhead + return ev, futureEvent, expectedMemUsage, expectedFutureMemUsage +} + +func generateSSTEvent( + t *testing.T, data testData, st *cluster.Settings, +) ( + ev event, + futureEvent kvpb.RangeFeedEvent, + expectedMemUsage int64, + expectedFutureMemUsage int64, +) { + sst, _, _ := storageutils.MakeSST(t, st, data.kvs) + ev = event{ + sst: &sstEvent{ + data: sst, + span: data.span, + ts: data.timestamp, + }, + } + + futureEvent.MustSetValue(&kvpb.RangeFeedSSTable{ + Data: sst, + Span: data.span, + WriteTS: data.timestamp, + }) + expectedMemUsage += eventOverhead + sstEventOverhead + int64(cap(sst)+cap(data.span.Key)+cap(data.span.EndKey)) + expectedFutureMemUsage += futureEventBaseOverhead + rangefeedSSTTableOverhead + int64(cap(sst)+cap(data.span.Key)+cap(data.span.EndKey)) + return ev, futureEvent, expectedMemUsage, expectedFutureMemUsage +} + +func generateSyncEvent() ( + ev event, + futureEvent kvpb.RangeFeedEvent, + expectedMemUsage int64, + expectedFutureMemUsage int64, +) { + ev = event{ + sync: &syncEvent{c: make(chan struct{})}, + } + expectedMemUsage += eventOverhead + syncEventOverhead + return +} + +func generateRandomizedEventAndSend( + rand *rand.Rand, +) (ev event, expectedMemUsage int64, randomlyChosenEvent string) { + // Opt out sst event since it requires testing.T to call + // storageutils.MakeSST(t, st, data.kvs) + typesOfEvents := []string{"logicalsOps", "ct", "initRTS", "sync"} + randomlyChosenEvent = typesOfEvents[rand.Intn(len(typesOfEvents))] + data := generateRandomTestData(rand) + switch randomlyChosenEvent { + case "logicalsOps": + e, mem, _ := generateLogicalOpEvents(data) + ev = e + expectedMemUsage = mem + case "ct": + e, _, _, futureEvent := generateCtEvent(data) + ev = e + expectedMemUsage = futureEvent + case "initRTS": + e, _, _, futureEvent := generateInitRTSEvent() + ev = e + expectedMemUsage = futureEvent + case "sync": + e, _, mem, _ := generateSyncEvent() + ev = e + expectedMemUsage = mem + } + return ev, expectedMemUsage, randomlyChosenEvent +} + +func generateStaticTestdata() testData { + return testData{ + kvs: testSSTKVs, + span: testSpan, + key: testKey, + timestamp: testTs, + value: testValue, + startKey: testStartKey, + endKey: testEndKey, + txnID: testTxnID, + txnKey: testKey, + txnIsoLevel: testIsolationLevel, + txnMinTimestamp: testTs, + omitInRangefeeds: false, + } +} + +func TestBasicEventSizeCalculation(t *testing.T) { + st := cluster.MakeTestingClusterSettings() + data := generateStaticTestdata() + + t.Run("write_value event", func(t *testing.T) { + ev, _, expectedCurrMemUsage, _ := generateOneLogicalOpEvent("write_value", data) + mem := ev.MemUsage() + require.Equal(t, expectedCurrMemUsage, mem) + }) + + t.Run("delete_range event", func(t *testing.T) { + ev, _, expectedCurrMemUsage, _ := generateOneLogicalOpEvent("delete_range", data) + mem := ev.MemUsage() + require.Equal(t, expectedCurrMemUsage, mem) + }) + + t.Run("write_intent event", func(t *testing.T) { + ev, _, expectedCurrMemUsage, _ := generateOneLogicalOpEvent("write_intent", data) + mem := ev.MemUsage() + require.Equal(t, expectedCurrMemUsage, mem) + }) + + t.Run("update_intent event", func(t *testing.T) { + ev, _, expectedCurrMemUsage, _ := generateOneLogicalOpEvent("update_intent", data) + mem := ev.MemUsage() + require.Equal(t, expectedCurrMemUsage, mem) + }) + + t.Run("commit_intent event", func(t *testing.T) { + ev, _, expectedCurrMemUsage, _ := generateOneLogicalOpEvent("commit_intent", data) 
+ mem := ev.MemUsage() + require.Equal(t, expectedCurrMemUsage, mem) + }) + + t.Run("abort_intent event", func(t *testing.T) { + ev, _, expectedCurrMemUsage, _ := generateOneLogicalOpEvent("abort_intent", data) + mem := ev.MemUsage() + require.Equal(t, expectedCurrMemUsage, mem) + }) + + t.Run("abort_txn event", func(t *testing.T) { + ev, _, expectedCurrMemUsage, _ := generateOneLogicalOpEvent("abort_txn", data) + mem := ev.MemUsage() + require.Equal(t, expectedCurrMemUsage, mem) + }) + + t.Run("ct event", func(t *testing.T) { + ev, _, _, expectedFutureMemUsage := generateCtEvent(data) + mem := ev.MemUsage() + require.Equal(t, expectedFutureMemUsage, mem) + }) + + t.Run("initRTS event", func(t *testing.T) { + generateOneLogicalOpEvent("write_intent", data) + ev, _, _, expectedFutureMemUsage := generateInitRTSEvent() + mem := ev.MemUsage() + require.Equal(t, expectedFutureMemUsage, mem) + }) + + t.Run("sst event", func(t *testing.T) { + ev, _, _, expectedFutureMemUsage := generateSSTEvent(t, data, st) + mem := ev.MemUsage() + require.Equal(t, expectedFutureMemUsage, mem) + }) + + t.Run("sync event", func(t *testing.T) { + ev, _, expectedCurrMemUsage, _ := generateSyncEvent() + mem := ev.MemUsage() + require.Equal(t, expectedCurrMemUsage, mem) + }) +} + +// BenchmarkMemoryAccounting benchmarks the memory accounting of the event +// struct. +func BenchmarkMemoryAccounting(b *testing.B) { + b.Run("memory_calculation", func(b *testing.B) { + b.ReportAllocs() + rand, _ := randutil.NewTestRand() + events := []event{} + type res struct { + memUsage int64 + chosenEvent string + } + expectedRes := []res{} + for i := 0; i < 20; i++ { + ev, mem, chosenEvent := generateRandomizedEventAndSend(rand) + expectedRes = append(expectedRes, res{ + memUsage: mem, + chosenEvent: chosenEvent, + }) + events = append(events, ev) + } + + // Reset the timer without the cost of generating the events. + b.ResetTimer() + + for _, ev := range events { + ev.MemUsage() + } + + b.StopTimer() + + totalMemUsageSum := int64(0) + for i := 0; i < 20; i++ { + b.Logf("event %d: %+v\n", i+1, events[i]) + b.Logf("chosen event: %s\n", expectedRes[i].chosenEvent) + memUsage := events[i].MemUsage() + require.Equal(b, expectedRes[i].memUsage, memUsage) + totalMemUsageSum += memUsage + } + b.Logf("total memory usage: %d\n", totalMemUsageSum) + }) +} diff --git a/pkg/kv/kvserver/rangefeed/processor.go b/pkg/kv/kvserver/rangefeed/processor.go index cc4e408a9c05..3a55763e97d6 100644 --- a/pkg/kv/kvserver/rangefeed/processor.go +++ b/pkg/kv/kvserver/rangefeed/processor.go @@ -291,19 +291,3 @@ type syncEvent struct { // should be called from underneath a stopper task to ensure that the // engine has not been closed. type IntentScannerConstructor func() IntentScanner - -// calculateDateEventSize returns estimated size of the event that contain actual -// data. We only account for logical ops and sst's. Those events come from raft -// and are budgeted. Other events come from processor jobs and update timestamps -// we don't take them into account as they are supposed to be small and to avoid -// complexity of having multiple producers getting from budget. 
-func calculateDateEventSize(e event) int64 { - var size int64 - for _, op := range e.ops { - size += int64(op.Size()) - } - if e.sst != nil { - size += int64(len(e.sst.data)) - } - return size -} diff --git a/pkg/kv/kvserver/rangefeed/processor_test.go b/pkg/kv/kvserver/rangefeed/processor_test.go index 5fa6b7039bb8..47123cba8977 100644 --- a/pkg/kv/kvserver/rangefeed/processor_test.go +++ b/pkg/kv/kvserver/rangefeed/processor_test.go @@ -122,6 +122,14 @@ func abortTxnOp(txnID uuid.UUID) enginepb.MVCCLogicalOp { }) } +func deleteRangeOp(startKey, endKey roachpb.Key, timestamp hlc.Timestamp) enginepb.MVCCLogicalOp { + return makeLogicalOp(&enginepb.MVCCDeleteRangeOp{ + StartKey: startKey, + EndKey: endKey, + Timestamp: timestamp, + }) +} + func makeRangeFeedEvent(val interface{}) *kvpb.RangeFeedEvent { var event kvpb.RangeFeedEvent event.MustSetValue(val) @@ -810,7 +818,7 @@ func TestProcessorMemoryBudgetExceeded(t *testing.T) { // TestProcessorMemoryBudgetReleased that memory budget is correctly released. func TestProcessorMemoryBudgetReleased(t *testing.T) { defer leaktest.AfterTest(t)() - fb := newTestBudget(40) + fb := newTestBudget(250) p, h, stopper := newTestProcessor(t, withBudget(fb), withChanTimeout(15*time.Minute)) ctx := context.Background() defer stopper.Stop(ctx) diff --git a/pkg/kv/kvserver/rangefeed/scheduled_processor.go b/pkg/kv/kvserver/rangefeed/scheduled_processor.go index f4a295d0ca8c..4ff7ecf5800d 100644 --- a/pkg/kv/kvserver/rangefeed/scheduled_processor.go +++ b/pkg/kv/kvserver/rangefeed/scheduled_processor.go @@ -128,7 +128,7 @@ func (p *ScheduledProcessor) Start( return err } } else { - p.initResolvedTS(p.taskCtx) + p.initResolvedTS(p.taskCtx, nil) } p.Metrics.RangeFeedProcessorsScheduler.Inc(1) @@ -448,7 +448,7 @@ func (p *ScheduledProcessor) enqueueEventInternal( // inserting value into channel. var alloc *SharedBudgetAllocation if p.MemBudget != nil { - size := calculateDateEventSize(e) + size := e.MemUsage() if size > 0 { var err error // First we will try non-blocking fast path to allocate memory budget. @@ -638,9 +638,9 @@ func (p *ScheduledProcessor) consumeEvent(ctx context.Context, e *event) { case e.ops != nil: p.consumeLogicalOps(ctx, e.ops, e.alloc) case !e.ct.IsEmpty(): - p.forwardClosedTS(ctx, e.ct.Timestamp) + p.forwardClosedTS(ctx, e.ct.Timestamp, e.alloc) case bool(e.initRTS): - p.initResolvedTS(ctx) + p.initResolvedTS(ctx, e.alloc) case e.sst != nil: p.consumeSSTable(ctx, e.sst.data, e.sst.span, e.sst.ts, e.alloc) case e.sync != nil: @@ -699,7 +699,7 @@ func (p *ScheduledProcessor) consumeLogicalOps( // Determine whether the operation caused the resolved timestamp to // move forward. If so, publish a RangeFeedCheckpoint notification. 
if p.rts.ConsumeLogicalOp(ctx, op) { - p.publishCheckpoint(ctx) + p.publishCheckpoint(ctx, nil) } } } @@ -714,15 +714,17 @@ func (p *ScheduledProcessor) consumeSSTable( p.publishSSTable(ctx, sst, sstSpan, sstWTS, alloc) } -func (p *ScheduledProcessor) forwardClosedTS(ctx context.Context, newClosedTS hlc.Timestamp) { +func (p *ScheduledProcessor) forwardClosedTS( + ctx context.Context, newClosedTS hlc.Timestamp, alloc *SharedBudgetAllocation, +) { if p.rts.ForwardClosedTS(ctx, newClosedTS) { - p.publishCheckpoint(ctx) + p.publishCheckpoint(ctx, alloc) } } -func (p *ScheduledProcessor) initResolvedTS(ctx context.Context) { +func (p *ScheduledProcessor) initResolvedTS(ctx context.Context, alloc *SharedBudgetAllocation) { if p.rts.Init(ctx) { - p.publishCheckpoint(ctx) + p.publishCheckpoint(ctx, alloc) } } @@ -795,12 +797,12 @@ func (p *ScheduledProcessor) publishSSTable( }, false /* omitInRangefeeds */, alloc) } -func (p *ScheduledProcessor) publishCheckpoint(ctx context.Context) { +func (p *ScheduledProcessor) publishCheckpoint(ctx context.Context, alloc *SharedBudgetAllocation) { // TODO(nvanbenschoten): persist resolvedTimestamp. Give Processor a client.DB. // TODO(nvanbenschoten): rate limit these? send them periodically? event := p.newCheckpointEvent() - p.reg.PublishToOverlapping(ctx, all, event, false /* omitInRangefeeds */, nil) + p.reg.PublishToOverlapping(ctx, all, event, false /* omitInRangefeeds */, alloc) } func (p *ScheduledProcessor) newCheckpointEvent() *kvpb.RangeFeedEvent {
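For readers new to this pattern, the accounting in event_size.go reduces to two ingredients: a fixed per-struct overhead captured once as a constant via unsafe.Sizeof, plus the capacity of every byte slice the struct owns. The following standalone sketch shows the same idea in isolation; exampleOp, exampleOpOverhead, and exampleOpMemUsage are invented for illustration and are not part of the CockroachDB code.

package main

import (
	"fmt"
	"unsafe"
)

// exampleOp is a stand-in for a logical op such as MVCCWriteValueOp: a few
// fixed-size fields plus byte slices that own their backing arrays.
type exampleOp struct {
	Key, Value []byte
	WallTime   int64
}

// Fixed per-struct overhead, mirroring constants like mvccWriteValueOp that
// event_size.go derives from unsafe.Sizeof at compile time.
const exampleOpOverhead = int64(unsafe.Sizeof(exampleOp{}))

// exampleOpMemUsage has the same shape as writeValueOpMemUsage: the struct
// overhead plus the capacity of every owned []byte.
func exampleOpMemUsage(op exampleOp) int64 {
	return exampleOpOverhead + int64(cap(op.Key)) + int64(cap(op.Value))
}

func main() {
	op := exampleOp{Key: []byte("a"), Value: make([]byte, 0, 128)}
	// cap, not len, is what matters: the 128-byte backing array stays resident
	// even though the Value slice is currently empty.
	fmt.Println(exampleOpMemUsage(op))
}

MemUsage then applies this per variant: for logical ops and sync events it charges the event's own footprint, while for ct, initRTS, and sst events it charges the estimated footprint of the RangeFeedEvent that will be published.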
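On the processor side, the switch from calculateDateEventSize to e.MemUsage() leaves the call site's shape unchanged: the event is sized first, the budget is charged, and only then is the event enqueued. Below is a minimal sketch of that flow under a toy byte budget; byteBudget, tryAllocate, and tryEnqueue are invented names for this note and are not the FeedBudget or ScheduledProcessor API.

// byteBudget is a toy stand-in for the rangefeed feed budget.
type byteBudget struct{ remaining int64 }

// tryAllocate reserves n bytes if they fit, mirroring the non-blocking fast
// path the processor attempts first.
func (b *byteBudget) tryAllocate(n int64) bool {
	if n > b.remaining {
		return false
	}
	b.remaining -= n
	return true
}

// tryEnqueue mirrors the order of operations in enqueueEventInternal after
// this change: estimate the event size, reserve it from the budget, and only
// then hand the event to the consumer.
func tryEnqueue(b *byteBudget, eventC chan<- int64, size int64) bool {
	if size > 0 && !b.tryAllocate(size) {
		return false // over budget; the caller must back off or fail the feed
	}
	select {
	case eventC <- size:
		return true
	default:
		// Channel full; the real processor handles this case differently, this
		// sketch simply gives up.
		return false
	}
}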