diff --git a/pkg/cli/debug.go b/pkg/cli/debug.go index 180d4e3fcee4..060e02934ff9 100644 --- a/pkg/cli/debug.go +++ b/pkg/cli/debug.go @@ -1195,9 +1195,8 @@ var debugIntentCount = &cobra.Command{ Use: "intent-count ", Short: "return a count of intents in directory", Long: ` -Returns a count of interleaved and separated intents in the store directory. -Used to investigate stores with lots of unresolved intents, or to confirm -if the migration away from interleaved intents was successful. +Returns a count of intents in the store directory. Used to investigate stores +with lots of unresolved intents. `, Args: cobra.MinimumNArgs(1), RunE: runDebugIntentCount, @@ -1214,7 +1213,7 @@ func runDebugIntentCount(cmd *cobra.Command, args []string) error { } defer db.Close() - var interleavedIntentCount, separatedIntentCount int + var intentCount int var keysCount uint64 var wg sync.WaitGroup closer := make(chan bool) @@ -1239,48 +1238,29 @@ func runDebugIntentCount(cmd *cobra.Command, args []string) error { }) iter := db.NewEngineIterator(storage.IterOptions{ - LowerBound: roachpb.KeyMin, - UpperBound: roachpb.KeyMax, + LowerBound: keys.LockTableSingleKeyStart, + UpperBound: keys.LockTableSingleKeyEnd, }) defer iter.Close() - valid, err := iter.SeekEngineKeyGE(storage.EngineKey{Key: roachpb.KeyMin}) - var meta enginepb.MVCCMetadata - for ; valid && err == nil; valid, err = iter.NextEngineKey() { + seekKey := storage.EngineKey{Key: keys.LockTableSingleKeyStart} + + var valid bool + for valid, err = iter.SeekEngineKeyGE(seekKey); valid && err == nil; valid, err = iter.NextEngineKey() { key, err := iter.EngineKey() if err != nil { return err } atomic.AddUint64(&keysCount, 1) if key.IsLockTableKey() { - separatedIntentCount++ - continue - } - if !key.IsMVCCKey() { - continue - } - mvccKey, err := key.ToMVCCKey() - if err != nil { - return err - } - if !mvccKey.Timestamp.IsEmpty() { - continue - } - val := iter.UnsafeValue() - if err := protoutil.Unmarshal(val, &meta); err != nil { - return err - } - if meta.IsInline() { - continue + intentCount++ } - interleavedIntentCount++ } if err != nil { return err } close(closer) wg.Wait() - fmt.Printf("interleaved intents: %d\nseparated intents: %d\n", - interleavedIntentCount, separatedIntentCount) + fmt.Printf("intents: %d\n", intentCount) return nil } diff --git a/pkg/kv/batch.go b/pkg/kv/batch.go index 06c21ca2a1d5..158aea704091 100644 --- a/pkg/kv/batch.go +++ b/pkg/kv/batch.go @@ -279,7 +279,6 @@ func (b *Batch) fillResults(ctx context.Context) { case *roachpb.MigrateRequest: case *roachpb.QueryResolvedTimestampRequest: case *roachpb.BarrierRequest: - case *roachpb.ScanInterleavedIntentsRequest: default: if result.Err == nil { result.Err = errors.Errorf("unsupported reply: %T for %T", @@ -881,27 +880,6 @@ func (b *Batch) queryResolvedTimestamp(s, e interface{}) { b.initResult(1, 0, notRaw, nil) } -func (b *Batch) scanInterleavedIntents(s, e interface{}) { - begin, err := marshalKey(s) - if err != nil { - b.initResult(0, 0, notRaw, err) - return - } - end, err := marshalKey(e) - if err != nil { - b.initResult(0, 0, notRaw, err) - return - } - req := &roachpb.ScanInterleavedIntentsRequest{ - RequestHeader: roachpb.RequestHeader{ - Key: begin, - EndKey: end, - }, - } - b.appendReqs(req) - b.initResult(1, 0, notRaw, nil) -} - func (b *Batch) barrier(s, e interface{}) { begin, err := marshalKey(s) if err != nil { diff --git a/pkg/kv/db.go b/pkg/kv/db.go index c45d71fa99bd..644ed1cf823f 100644 --- a/pkg/kv/db.go +++ b/pkg/kv/db.go @@ -783,30 +783,6 @@ func (db *DB) QueryResolvedTimestamp( return r.ResolvedTS, nil } -// ScanInterleavedIntents is a command that returns all interleaved intents -// encountered in the request span. A resume span is returned if the entirety -// of the request span was not scanned. -func (db *DB) ScanInterleavedIntents( - ctx context.Context, begin, end interface{}, ts hlc.Timestamp, -) ([]roachpb.Intent, *roachpb.Span, error) { - b := &Batch{Header: roachpb.Header{Timestamp: ts}} - b.scanInterleavedIntents(begin, end) - result, err := getOneResult(db.Run(ctx, b), b) - if err != nil { - return nil, nil, err - } - responses := b.response.Responses - if len(responses) == 0 { - return nil, nil, errors.Errorf("unexpected empty response for ScanInterleavedIntents") - } - resp, ok := responses[0].GetInner().(*roachpb.ScanInterleavedIntentsResponse) - if !ok { - return nil, nil, errors.Errorf("unexpected response of type %T for ScanInterleavedIntents", - responses[0].GetInner()) - } - return resp.Intents, result.ResumeSpan, nil -} - // Barrier is a command that waits for conflicting operations such as earlier // writes on the specified key range to finish. func (db *DB) Barrier(ctx context.Context, begin, end interface{}) (hlc.Timestamp, error) { diff --git a/pkg/kv/kvserver/batcheval/BUILD.bazel b/pkg/kv/kvserver/batcheval/BUILD.bazel index e9daf4480e6a..1c5328479297 100644 --- a/pkg/kv/kvserver/batcheval/BUILD.bazel +++ b/pkg/kv/kvserver/batcheval/BUILD.bazel @@ -42,7 +42,6 @@ go_library( "cmd_reverse_scan.go", "cmd_revert_range.go", "cmd_scan.go", - "cmd_scan_interleaved_intents.go", "cmd_subsume.go", "cmd_truncate_log.go", "command.go", diff --git a/pkg/kv/kvserver/batcheval/cmd_scan_interleaved_intents.go b/pkg/kv/kvserver/batcheval/cmd_scan_interleaved_intents.go deleted file mode 100644 index a9cff2183020..000000000000 --- a/pkg/kv/kvserver/batcheval/cmd_scan_interleaved_intents.go +++ /dev/null @@ -1,111 +0,0 @@ -// Copyright 2021 The Cockroach Authors. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -package batcheval - -import ( - "context" - "time" - - "github.com/cockroachdb/cockroach/pkg/keys" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" - "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/storage" - "github.com/cockroachdb/cockroach/pkg/storage/enginepb" - "github.com/cockroachdb/cockroach/pkg/util/protoutil" - "github.com/cockroachdb/errors" -) - -func init() { - RegisterReadOnlyCommand(roachpb.ScanInterleavedIntents, declareKeysScanInterleavedIntents, ScanInterleavedIntents) -} - -func declareKeysScanInterleavedIntents( - rs ImmutableRangeState, - _ *roachpb.Header, - _ roachpb.Request, - latchSpans, _ *spanset.SpanSet, - _ time.Duration, -) { - latchSpans.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeDescriptorKey(rs.GetStartKey())}) -} - -// ScanInterleavedIntents returns intents encountered in the provided span. -// These intents are then resolved in the separated intents migration, the -// usual caller for this request. -func ScanInterleavedIntents( - ctx context.Context, reader storage.Reader, cArgs CommandArgs, response roachpb.Response, -) (result.Result, error) { - req := cArgs.Args.(*roachpb.ScanInterleavedIntentsRequest) - resp := response.(*roachpb.ScanInterleavedIntentsResponse) - - // Put a limit on memory usage by scanning for at least maxIntentCount - // intents or maxIntentBytes in intent values, whichever is reached first, - // then returning those. - const maxIntentCount = 1000 - const maxIntentBytes = 1 << 20 // 1MB - iter := reader.NewEngineIterator(storage.IterOptions{ - LowerBound: req.Key, - UpperBound: req.EndKey, - }) - defer iter.Close() - valid, err := iter.SeekEngineKeyGE(storage.EngineKey{Key: req.Key}) - intentCount := 0 - intentBytes := 0 - - for ; valid && err == nil; valid, err = iter.NextEngineKey() { - key, err := iter.EngineKey() - if err != nil { - return result.Result{}, err - } - if !key.IsMVCCKey() { - // This should never happen, as the only non-MVCC keys are lock table - // keys and those are in the local keyspace. Return an error. - return result.Result{}, errors.New("encountered non-MVCC key during lock table migration") - } - mvccKey, err := key.ToMVCCKey() - if err != nil { - return result.Result{}, err - } - if !mvccKey.Timestamp.IsEmpty() { - // Versioned value - not an intent. - // - // TODO(bilal): Explore seeking here in case there are keys with lots of - // versioned values. - continue - } - - val := iter.Value() - meta := enginepb.MVCCMetadata{} - if err := protoutil.Unmarshal(val, &meta); err != nil { - return result.Result{}, err - } - if meta.IsInline() { - // Inlined value - not an intent. - continue - } - - if intentCount >= maxIntentCount || intentBytes >= maxIntentBytes { - // Batch limit reached - cut short this batch here. This kv - // will be added to txnIntents on the next iteration of the outer loop. - resp.ResumeSpan = &roachpb.Span{ - Key: mvccKey.Key, - EndKey: req.EndKey, - } - break - } - resp.Intents = append(resp.Intents, roachpb.MakeIntent(meta.Txn, mvccKey.Key)) - intentCount++ - intentBytes += len(val) - } - - return result.Result{}, nil -} diff --git a/pkg/kv/kvserver/batcheval/eval_context.go b/pkg/kv/kvserver/batcheval/eval_context.go index 277e01c02c43..49799483353f 100644 --- a/pkg/kv/kvserver/batcheval/eval_context.go +++ b/pkg/kv/kvserver/batcheval/eval_context.go @@ -39,8 +39,7 @@ type Limiters struct { // rangefeeds in the "catch-up" state across the store. The "catch-up" state // is a temporary state at the beginning of a rangefeed which is expensive // because it uses an engine iterator. - ConcurrentRangefeedIters limit.ConcurrentRequestLimiter - ConcurrentScanInterleavedIntents limit.ConcurrentRequestLimiter + ConcurrentRangefeedIters limit.ConcurrentRequestLimiter } // EvalContext is the interface through which command evaluation accesses the diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 8a9a528fb98b..17297ddee744 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -174,17 +174,6 @@ var concurrentRangefeedItersLimit = settings.RegisterIntSetting( settings.PositiveInt, ) -// concurrentscanInterleavedIntentsLimit is the number of concurrent -// ScanInterleavedIntents requests that will be run on a store. Used as part -// of pre-evaluation throttling. -var concurrentscanInterleavedIntentsLimit = settings.RegisterIntSetting( - settings.TenantWritable, - "kv.migration.concurrent_scan_interleaved_intents", - "number of scan interleaved intents requests a store will handle concurrently before queueing", - 1, - settings.PositiveInt, -) - // Minimum time interval between system config updates which will lead to // enqueuing replicas. var queueAdditionOnSystemConfigUpdateRate = settings.RegisterFloatSetting( @@ -1278,11 +1267,6 @@ func NewStore( s.limiters.ConcurrentExportRequests = limit.MakeConcurrentRequestLimiter( "exportRequestLimiter", int(ExportRequestsLimit.Get(&cfg.Settings.SV)), ) - s.limiters.ConcurrentScanInterleavedIntents = limit.MakeConcurrentRequestLimiter( - "scanInterleavedIntentsLimiter", int(concurrentscanInterleavedIntentsLimit.Get(&cfg.Settings.SV))) - concurrentscanInterleavedIntentsLimit.SetOnChange(&cfg.Settings.SV, func(ctx context.Context) { - s.limiters.ConcurrentScanInterleavedIntents.SetLimit(int(concurrentscanInterleavedIntentsLimit.Get(&cfg.Settings.SV))) - }) // The snapshot storage is usually empty at this point since it is cleared // after each snapshot application, except when the node crashed right before diff --git a/pkg/kv/kvserver/store_send.go b/pkg/kv/kvserver/store_send.go index 13e028e28d61..ec30becb76a6 100644 --- a/pkg/kv/kvserver/store_send.go +++ b/pkg/kv/kvserver/store_send.go @@ -376,19 +376,6 @@ func (s *Store) maybeThrottleBatch( } return res, nil - case *roachpb.ScanInterleavedIntentsRequest: - before := timeutil.Now() - res, err := s.limiters.ConcurrentScanInterleavedIntents.Begin(ctx) - if err != nil { - return nil, err - } - - waited := timeutil.Since(before) - if waited > time.Second { - log.Infof(ctx, "ScanInterleavedIntents request was delayed by %v", waited) - } - return res, nil - default: return nil, nil } diff --git a/pkg/roachpb/api.go b/pkg/roachpb/api.go index 20b6627309d6..f72eb145be31 100644 --- a/pkg/roachpb/api.go +++ b/pkg/roachpb/api.go @@ -523,20 +523,6 @@ func (r *BarrierResponse) combine(c combinable) error { var _ combinable = &BarrierResponse{} -// combine implements the combinable interface. -func (r *ScanInterleavedIntentsResponse) combine(c combinable) error { - otherR := c.(*ScanInterleavedIntentsResponse) - if r != nil { - if err := r.ResponseHeader.combine(otherR.Header()); err != nil { - return err - } - r.Intents = append(r.Intents, otherR.Intents...) - } - return nil -} - -var _ combinable = &ScanInterleavedIntentsResponse{} - // combine implements the combinable interface. func (r *QueryLocksResponse) combine(c combinable) error { otherR := c.(*QueryLocksResponse) @@ -801,9 +787,6 @@ func (*AdminVerifyProtectedTimestampRequest) Method() Method { return AdminVerif // Method implements the Request interface. func (*QueryResolvedTimestampRequest) Method() Method { return QueryResolvedTimestamp } -// Method implements the Request interface. -func (*ScanInterleavedIntentsRequest) Method() Method { return ScanInterleavedIntents } - // Method implements the Request interface. func (*BarrierRequest) Method() Method { return Barrier } @@ -1086,12 +1069,6 @@ func (r *QueryResolvedTimestampRequest) ShallowCopy() Request { return &shallowCopy } -// ShallowCopy implements the Request interface. -func (r *ScanInterleavedIntentsRequest) ShallowCopy() Request { - shallowCopy := *r - return &shallowCopy -} - // ShallowCopy implements the Request interface. func (r *BarrierRequest) ShallowCopy() Request { shallowCopy := *r @@ -1469,9 +1446,8 @@ func (*RangeStatsRequest) flags() flag { return isRead } func (*QueryResolvedTimestampRequest) flags() flag { return isRead | isRange | requiresClosedTSOlderThanStorageSnapshot } -func (*ScanInterleavedIntentsRequest) flags() flag { return isRead | isRange } -func (*BarrierRequest) flags() flag { return isWrite | isRange } -func (*IsSpanEmptyRequest) flags() flag { return isRead | isRange } +func (*BarrierRequest) flags() flag { return isWrite | isRange } +func (*IsSpanEmptyRequest) flags() flag { return isRead | isRange } // IsParallelCommit returns whether the EndTxn request is attempting to perform // a parallel commit. See txn_interceptor_committer.go for a discussion about diff --git a/pkg/roachpb/api.proto b/pkg/roachpb/api.proto index 9cfe01a2ce66..0152fa6a6714 100644 --- a/pkg/roachpb/api.proto +++ b/pkg/roachpb/api.proto @@ -2075,23 +2075,6 @@ message QueryResolvedTimestampResponse { (gogoproto.nullable) = false, (gogoproto.customname) = "ResolvedTS"]; } -// ScanInterleavedIntentsRequest is the request for a ScanInterleavedIntents operation. -// This is a read-only operation that returns all interleaved (non-separated) -// intents found over the request range. -message ScanInterleavedIntentsRequest { - RequestHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true]; -} - -// ScanInterleavedIntentsResponse is the response to a ScanInterleavedIntents operation. -message ScanInterleavedIntentsResponse { - ResponseHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true]; - - // The intents encountered in the part of the request span that was - // evaluated. A resume span is set in the response header if the entirety of - // the request span was not evaluated. - repeated Intent intents = 3 [(gogoproto.nullable) = false]; -} - // BarrierRequest is the request for a Barrier operation. This goes through Raft // and has the purpose of waiting until all conflicting in-flight operations on // this range have completed, without blocking any new operations. @@ -2160,12 +2143,11 @@ message RequestUnion { AdminVerifyProtectedTimestampRequest admin_verify_protected_timestamp = 49; MigrateRequest migrate = 50; QueryResolvedTimestampRequest query_resolved_timestamp = 51; - ScanInterleavedIntentsRequest scan_interleaved_intents = 52; BarrierRequest barrier = 53; ProbeRequest probe = 54; IsSpanEmptyRequest is_span_empty = 56; } - reserved 8, 15, 23, 25, 27, 31, 34; + reserved 8, 15, 23, 25, 27, 31, 34, 52; } // A ResponseUnion contains exactly one of the responses. @@ -2216,12 +2198,11 @@ message ResponseUnion { AdminVerifyProtectedTimestampResponse admin_verify_protected_timestamp = 49; MigrateResponse migrate = 50; QueryResolvedTimestampResponse query_resolved_timestamp = 51; - ScanInterleavedIntentsResponse scan_interleaved_intents = 52; BarrierResponse barrier = 53; ProbeResponse probe = 54; IsSpanEmptyResponse is_span_empty = 56; } - reserved 8, 15, 23, 25, 27, 28, 31, 34; + reserved 8, 15, 23, 25, 27, 28, 31, 34, 52; } // A Header is attached to a BatchRequest, encapsulating routing and auxiliary diff --git a/pkg/roachpb/batch_generated.go b/pkg/roachpb/batch_generated.go index 378a046fd99f..de7080127570 100644 --- a/pkg/roachpb/batch_generated.go +++ b/pkg/roachpb/batch_generated.go @@ -176,8 +176,6 @@ func (ru RequestUnion) GetInner() Request { return t.Migrate case *RequestUnion_QueryResolvedTimestamp: return t.QueryResolvedTimestamp - case *RequestUnion_ScanInterleavedIntents: - return t.ScanInterleavedIntents case *RequestUnion_Barrier: return t.Barrier case *RequestUnion_Probe: @@ -280,8 +278,6 @@ func (ru ResponseUnion) GetInner() Response { return t.Migrate case *ResponseUnion_QueryResolvedTimestamp: return t.QueryResolvedTimestamp - case *ResponseUnion_ScanInterleavedIntents: - return t.ScanInterleavedIntents case *ResponseUnion_Barrier: return t.Barrier case *ResponseUnion_Probe: @@ -465,8 +461,6 @@ func (ru *RequestUnion) MustSetInner(r Request) { union = &RequestUnion_Migrate{t} case *QueryResolvedTimestampRequest: union = &RequestUnion_QueryResolvedTimestamp{t} - case *ScanInterleavedIntentsRequest: - union = &RequestUnion_ScanInterleavedIntents{t} case *BarrierRequest: union = &RequestUnion_Barrier{t} case *ProbeRequest: @@ -572,8 +566,6 @@ func (ru *ResponseUnion) MustSetInner(r Response) { union = &ResponseUnion_Migrate{t} case *QueryResolvedTimestampResponse: union = &ResponseUnion_QueryResolvedTimestamp{t} - case *ScanInterleavedIntentsResponse: - union = &ResponseUnion_ScanInterleavedIntents{t} case *BarrierResponse: union = &ResponseUnion_Barrier{t} case *ProbeResponse: @@ -586,7 +578,7 @@ func (ru *ResponseUnion) MustSetInner(r Response) { ru.Value = union } -type reqCounts [49]int32 +type reqCounts [48]int32 // getReqCounts returns the number of times each // request type appears in the batch. @@ -684,14 +676,12 @@ func (ba *BatchRequest) getReqCounts() reqCounts { counts[43]++ case *RequestUnion_QueryResolvedTimestamp: counts[44]++ - case *RequestUnion_ScanInterleavedIntents: - counts[45]++ case *RequestUnion_Barrier: - counts[46]++ + counts[45]++ case *RequestUnion_Probe: - counts[47]++ + counts[46]++ case *RequestUnion_IsSpanEmpty: - counts[48]++ + counts[47]++ default: panic(fmt.Sprintf("unsupported request: %+v", ru)) } @@ -745,7 +735,6 @@ var requestNames = []string{ "AdmVerifyProtectedTimestamp", "Migrate", "QueryResolvedTimestamp", - "ScanInterleavedIntents", "Barrier", "Probe", "IsSpanEmpty", @@ -964,10 +953,6 @@ type queryResolvedTimestampResponseAlloc struct { union ResponseUnion_QueryResolvedTimestamp resp QueryResolvedTimestampResponse } -type scanInterleavedIntentsResponseAlloc struct { - union ResponseUnion_ScanInterleavedIntents - resp ScanInterleavedIntentsResponse -} type barrierResponseAlloc struct { union ResponseUnion_Barrier resp BarrierResponse @@ -1035,10 +1020,9 @@ func (ba *BatchRequest) CreateReply() *BatchResponse { var buf42 []adminVerifyProtectedTimestampResponseAlloc var buf43 []migrateResponseAlloc var buf44 []queryResolvedTimestampResponseAlloc - var buf45 []scanInterleavedIntentsResponseAlloc - var buf46 []barrierResponseAlloc - var buf47 []probeResponseAlloc - var buf48 []isSpanEmptyResponseAlloc + var buf45 []barrierResponseAlloc + var buf46 []probeResponseAlloc + var buf47 []isSpanEmptyResponseAlloc for i, r := range ba.Requests { switch r.GetValue().(type) { @@ -1357,34 +1341,27 @@ func (ba *BatchRequest) CreateReply() *BatchResponse { buf44[0].union.QueryResolvedTimestamp = &buf44[0].resp br.Responses[i].Value = &buf44[0].union buf44 = buf44[1:] - case *RequestUnion_ScanInterleavedIntents: + case *RequestUnion_Barrier: if buf45 == nil { - buf45 = make([]scanInterleavedIntentsResponseAlloc, counts[45]) + buf45 = make([]barrierResponseAlloc, counts[45]) } - buf45[0].union.ScanInterleavedIntents = &buf45[0].resp + buf45[0].union.Barrier = &buf45[0].resp br.Responses[i].Value = &buf45[0].union buf45 = buf45[1:] - case *RequestUnion_Barrier: + case *RequestUnion_Probe: if buf46 == nil { - buf46 = make([]barrierResponseAlloc, counts[46]) + buf46 = make([]probeResponseAlloc, counts[46]) } - buf46[0].union.Barrier = &buf46[0].resp + buf46[0].union.Probe = &buf46[0].resp br.Responses[i].Value = &buf46[0].union buf46 = buf46[1:] - case *RequestUnion_Probe: + case *RequestUnion_IsSpanEmpty: if buf47 == nil { - buf47 = make([]probeResponseAlloc, counts[47]) + buf47 = make([]isSpanEmptyResponseAlloc, counts[47]) } - buf47[0].union.Probe = &buf47[0].resp + buf47[0].union.IsSpanEmpty = &buf47[0].resp br.Responses[i].Value = &buf47[0].union buf47 = buf47[1:] - case *RequestUnion_IsSpanEmpty: - if buf48 == nil { - buf48 = make([]isSpanEmptyResponseAlloc, counts[48]) - } - buf48[0].union.IsSpanEmpty = &buf48[0].resp - br.Responses[i].Value = &buf48[0].union - buf48 = buf48[1:] default: panic(fmt.Sprintf("unsupported request: %+v", r)) } @@ -1485,8 +1462,6 @@ func CreateRequest(method Method) Request { return &MigrateRequest{} case QueryResolvedTimestamp: return &QueryResolvedTimestampRequest{} - case ScanInterleavedIntents: - return &ScanInterleavedIntentsRequest{} case Barrier: return &BarrierRequest{} case Probe: diff --git a/pkg/roachpb/method.go b/pkg/roachpb/method.go index abf87e68b479..421aae2e9f21 100644 --- a/pkg/roachpb/method.go +++ b/pkg/roachpb/method.go @@ -164,9 +164,6 @@ const ( // QueryResolvedTimestamp requests the resolved timestamp of the key span it // is issued over. QueryResolvedTimestamp - // ScanInterleavedIntents is a command to return interleaved intents - // encountered over a key range. - ScanInterleavedIntents // Barrier is a command that ensures all conflicting in-flight operations on // this range before this command have finished by the time it returns. It // does not block new operations that started after this command's evaluation. diff --git a/pkg/roachpb/method_string.go b/pkg/roachpb/method_string.go index bff55781d521..64947031722b 100644 --- a/pkg/roachpb/method_string.go +++ b/pkg/roachpb/method_string.go @@ -54,16 +54,15 @@ func _() { _ = x[RangeStats-43] _ = x[AdminVerifyProtectedTimestamp-44] _ = x[QueryResolvedTimestamp-45] - _ = x[ScanInterleavedIntents-46] - _ = x[Barrier-47] - _ = x[Probe-48] - _ = x[IsSpanEmpty-49] - _ = x[NumMethods-50] + _ = x[Barrier-46] + _ = x[Probe-47] + _ = x[IsSpanEmpty-48] + _ = x[NumMethods-49] } -const _Method_name = "GetPutConditionalPutIncrementDeleteDeleteRangeClearRangeRevertRangeScanReverseScanEndTxnAdminSplitAdminUnsplitAdminMergeAdminTransferLeaseAdminChangeReplicasAdminRelocateRangeHeartbeatTxnGCPushTxnRecoverTxnQueryLocksQueryTxnQueryIntentResolveIntentResolveIntentRangeMergeTruncateLogRequestLeaseTransferLeaseLeaseInfoComputeChecksumCheckConsistencyInitPutWriteBatchExportAdminScatterAddSSTableMigrateRecomputeStatsRefreshRefreshRangeSubsumeRangeStatsAdminVerifyProtectedTimestampQueryResolvedTimestampScanInterleavedIntentsBarrierProbeIsSpanEmptyNumMethods" +const _Method_name = "GetPutConditionalPutIncrementDeleteDeleteRangeClearRangeRevertRangeScanReverseScanEndTxnAdminSplitAdminUnsplitAdminMergeAdminTransferLeaseAdminChangeReplicasAdminRelocateRangeHeartbeatTxnGCPushTxnRecoverTxnQueryLocksQueryTxnQueryIntentResolveIntentResolveIntentRangeMergeTruncateLogRequestLeaseTransferLeaseLeaseInfoComputeChecksumCheckConsistencyInitPutWriteBatchExportAdminScatterAddSSTableMigrateRecomputeStatsRefreshRefreshRangeSubsumeRangeStatsAdminVerifyProtectedTimestampQueryResolvedTimestampBarrierProbeIsSpanEmptyNumMethods" -var _Method_index = [...]uint16{0, 3, 6, 20, 29, 35, 46, 56, 67, 71, 82, 88, 98, 110, 120, 138, 157, 175, 187, 189, 196, 206, 216, 224, 235, 248, 266, 271, 282, 294, 307, 316, 331, 347, 354, 364, 370, 382, 392, 399, 413, 420, 432, 439, 449, 478, 500, 522, 529, 534, 545, 555} +var _Method_index = [...]uint16{0, 3, 6, 20, 29, 35, 46, 56, 67, 71, 82, 88, 98, 110, 120, 138, 157, 175, 187, 189, 196, 206, 216, 224, 235, 248, 266, 271, 282, 294, 307, 316, 331, 347, 354, 364, 370, 382, 392, 399, 413, 420, 432, 439, 449, 478, 500, 507, 512, 523, 533} func (i Method) String() string { if i < 0 || i >= Method(len(_Method_index)-1) { diff --git a/pkg/sql/schemachanger/sctest/BUILD.bazel b/pkg/sql/schemachanger/sctest/BUILD.bazel index bbb6d95b85dd..c655619de81a 100644 --- a/pkg/sql/schemachanger/sctest/BUILD.bazel +++ b/pkg/sql/schemachanger/sctest/BUILD.bazel @@ -36,7 +36,9 @@ go_library( "//pkg/testutils/serverutils", "//pkg/testutils/skip", "//pkg/testutils/sqlutils", + "//pkg/util/log", "//pkg/util/randutil", + "//pkg/util/syncutil", "@com_github_cockroachdb_cockroach_go_v2//crdb", "@com_github_cockroachdb_datadriven//:datadriven", "@com_github_cockroachdb_errors//:errors", diff --git a/pkg/sql/schemachanger/sctest/end_to_end.go b/pkg/sql/schemachanger/sctest/end_to_end.go index 3d473dc14da0..4ea548d14106 100644 --- a/pkg/sql/schemachanger/sctest/end_to_end.go +++ b/pkg/sql/schemachanger/sctest/end_to_end.go @@ -44,7 +44,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/datadriven" + "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" ) @@ -61,7 +64,7 @@ func SingleNodeCluster(t *testing.T, knobs *scexec.TestingKnobs) (*gosql.DB, fun DisableDefaultTestTenant: true, Knobs: base.TestingKnobs{ SQLDeclarativeSchemaChanger: knobs, - JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), + JobsTestingKnobs: newJobsKnobs(), SQLExecutor: &sql.ExecutorTestingKnobs{ UseTransactionalDescIDGenerator: true, }, @@ -72,6 +75,39 @@ func SingleNodeCluster(t *testing.T, knobs *scexec.TestingKnobs) (*gosql.DB, fun } } +// newJobsKnobs constructs jobs.TestingKnobs for the end-to-end tests. +func newJobsKnobs() *jobs.TestingKnobs { + jobKnobs := jobs.NewTestingKnobsWithShortIntervals() + + // We want to force the process of marking the job as successful + // to fail sometimes. This will ensure that the schema change job + // is idempotent. + var injectedFailures = struct { + syncutil.Mutex + m map[jobspb.JobID]struct{} + }{ + m: make(map[jobspb.JobID]struct{}), + } + jobKnobs.BeforeUpdate = func(orig, updated jobs.JobMetadata) error { + sc := orig.Payload.GetNewSchemaChange() + if sc == nil { + return nil + } + if orig.Status != jobs.StatusRunning || updated.Status != jobs.StatusSucceeded { + return nil + } + injectedFailures.Lock() + defer injectedFailures.Unlock() + if _, ok := injectedFailures.m[orig.ID]; !ok { + injectedFailures.m[orig.ID] = struct{}{} + log.Infof(context.Background(), "injecting failure while marking job succeeded") + return errors.New("injected failure when marking succeeded") + } + return nil + } + return jobKnobs +} + // EndToEndSideEffects is a data-driven test runner that executes DDL statements in the // declarative schema changer injected with test dependencies and compares the // accumulated side effects logs with expected results from the data-driven diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go index 1a9ab6d2bad4..ff63abda4135 100644 --- a/pkg/ts/catalog/chart_catalog.go +++ b/pkg/ts/catalog/chart_catalog.go @@ -356,7 +356,6 @@ var charts = []sectionDescription{ "rpc.method.reversescan.recv", "rpc.method.revertrange.recv", "rpc.method.scan.recv", - "rpc.method.scaninterleavedintents.recv", "rpc.method.subsume.recv", "rpc.method.transferlease.recv", "rpc.method.truncatelog.recv", @@ -410,7 +409,6 @@ var charts = []sectionDescription{ "distsender.rpc.reversescan.sent", "distsender.rpc.revertrange.sent", "distsender.rpc.scan.sent", - "distsender.rpc.scaninterleavedintents.sent", "distsender.rpc.subsume.sent", "distsender.rpc.transferlease.sent", "distsender.rpc.probe.sent", diff --git a/pkg/upgrade/upgrades/testdata/separated_intents b/pkg/upgrade/upgrades/testdata/separated_intents deleted file mode 100644 index 5787e058fb87..000000000000 --- a/pkg/upgrade/upgrades/testdata/separated_intents +++ /dev/null @@ -1,442 +0,0 @@ - -# Simple case - no resume spans (unlimited intents returned per scan), -# one range, no range local keys. - -reset ----- - -add-range id=3 key=a endkey=g - a 1 - b 1 - c 1 - d 2 - e 2 - f 2 ----- - -run-migration ----- -ok - -count-calls ----- -barrier: 2 -scanInterleavedIntents: 2 - -pushed-txns ----- -1 -2 - -resolved-intents ----- -"a" -"b" -"c" -"d" -"e" -"f" - -reset ----- - -# Same case as above, but with use of resume spans. There should be 4 -# scanInterleavedIntents requests, 3 for the six range replicates keys, and 1 -# for range local keys (which there are none of). - -add-range id=3 key=a endkey=g - a 1 - b 1 - c 1 - d 2 - e 2 - f 2 ----- - -set-max-intent-count -2 ----- - -run-migration ----- -ok - -count-calls ----- -barrier: 2 -scanInterleavedIntents: 4 - -pushed-txns ----- -1 -1 -2 -2 - -resolved-intents ----- -"a" -"b" -"c" -"d" -"e" -"f" - -reset ----- - -# Add range local keys, while maintaining the same limit of two intents returned -# per scanInterleavedIntents call. There should be 5 calls to -# scanInterleavedIntents in all - 3 for replicated keys, 2 for range local keys. - -add-range id=3 key=a endkey=g - a 1 - b 1 - c 1 - d 2 - e 2 - f 2 -local - a 4 - b 3 - c 3 ----- - -set-max-intent-count -2 ----- - -run-migration ----- -ok - -count-calls ----- -barrier: 2 -scanInterleavedIntents: 5 - -pushed-txns ----- -1 -1 -2 -2 -3 -3 -4 - -resolved-intents ----- -/Local/Range"a"/QueueLastProcessed/"a" -/Local/Range"a"/QueueLastProcessed/"b" -/Local/Range"a"/QueueLastProcessed/"c" -"a" -"b" -"c" -"d" -"e" -"f" - -reset ----- - -# Case where one scanInterleavedIntent request is fired per intent. There are 15 -# keys across 2 ranges, so there should be 4 barrier calls (2x per range), and 16 -# scanInterleavedIntents calls (1 per intent, 1 for the empty range local keys -# of range 2). - -add-range id=1 key=a endkey=g - a 1 - b 1 - c 1 - d 2 - e 2 - f 2 -local - a 4 - b 3 - c 3 ----- - -add-range id=2 key=g endkey=k - g 5 - gg 6 - h 7 - hh 8 - i 9 - ii 10 ----- - -set-max-intent-count -1 ----- - -run-migration ----- -ok - -count-calls ----- -barrier: 4 -scanInterleavedIntents: 16 - -pushed-txns ----- -1 -1 -1 -2 -2 -2 -3 -3 -4 -5 -6 -7 -8 -9 -10 - -resolved-intents ----- -/Local/Range"a"/QueueLastProcessed/"a" -/Local/Range"a"/QueueLastProcessed/"b" -/Local/Range"a"/QueueLastProcessed/"c" -"a" -"b" -"c" -"d" -"e" -"f" -"g" -"gg" -"h" -"hh" -"i" -"ii" - -reset ----- - -# Same case as above, but with no resume span usage. - -add-range id=1 key=a endkey=g - a 1 - b 1 - c 1 - d 2 - e 2 - f 2 -local - a 4 - b 3 - c 3 ----- - -add-range id=2 key=g endkey=k - g 5 - gg 6 - h 7 - hh 8 - i 9 - ii 10 ----- - -set-max-intent-count -0 ----- - -run-migration ----- -ok - -count-calls ----- -barrier: 4 -scanInterleavedIntents: 4 - -pushed-txns ----- -1 -2 -3 -4 -5 -6 -7 -8 -9 -10 - -resolved-intents ----- -/Local/Range"a"/QueueLastProcessed/"a" -/Local/Range"a"/QueueLastProcessed/"b" -/Local/Range"a"/QueueLastProcessed/"c" -"a" -"b" -"c" -"d" -"e" -"f" -"g" -"gg" -"h" -"hh" -"i" -"ii" - -reset ----- - -# Take the case above, and add an error injection rate of 33% (1/3). The number -# of calls will go up, but the end result (pushed txns, resolved intents) will -# be the same. count-calls isn't added here as the count itself could be -# possibly non-deterministic. - -add-range id=1 key=a endkey=g - a 1 - b 1 - c 1 - d 2 - e 2 - f 2 -local - a 4 - b 3 - c 3 ----- - -add-range id=2 key=g endkey=k - g 5 - gg 6 - h 7 - hh 8 - i 9 - ii 10 ----- - -error-per-n-calls -3 ----- - -run-migration ----- -ok - - -pushed-txns ----- -1 -2 -3 -4 -5 -6 -7 -8 -9 -10 - -resolved-intents ----- -/Local/Range"a"/QueueLastProcessed/"a" -/Local/Range"a"/QueueLastProcessed/"b" -/Local/Range"a"/QueueLastProcessed/"c" -"a" -"b" -"c" -"d" -"e" -"f" -"g" -"gg" -"h" -"hh" -"i" -"ii" - -reset ----- - -# Take the case above, and add an error injection rate of 100% (1/1). The -# migration should error out and quit gracefully. - -add-range id=1 key=a endkey=g - a 1 - b 1 - c 1 - d 2 - e 2 - f 2 -local - a 4 - b 3 - c 3 ----- - -add-range id=2 key=g endkey=k - g 5 - gg 6 - h 7 - hh 8 - i 9 - ii 10 ----- - -error-per-n-calls -1 ----- - -run-migration ----- -error when invoking Barrier command: injected - -pushed-txns ----- - -resolved-intents ----- - -reset ----- - -# Test a range containing timeseries keys only. Its range-local keys should be -# migrated, but not its global keys. - -add-range id=1 key=/tsd/a endkey=/tsd/g - /tsd/a 1 - /tsd/b 1 - /tsd/c 1 - /tsd/d 2 - /tsd/e 2 - /tsd/f 2 -local - a 4 - b 3 - c 3 ----- - -run-migration ----- -ok - -pushed-txns ----- -3 -4 - -resolved-intents ----- -/Local/Range/System/tsd/"a"/QueueLastProcessed/"a" -/Local/Range/System/tsd/"a"/QueueLastProcessed/"b" -/Local/Range/System/tsd/"a"/QueueLastProcessed/"c" - -count-calls ----- -barrier: 1 -scanInterleavedIntents: 1 - -reset -----