Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver: remove ScanInterleavedIntents method #89441

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 11 additions & 31 deletions pkg/cli/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -1195,9 +1195,8 @@ var debugIntentCount = &cobra.Command{
Use: "intent-count <store directory>",
Short: "return a count of intents in directory",
Long: `
Returns a count of interleaved and separated intents in the store directory.
Used to investigate stores with lots of unresolved intents, or to confirm
if the migration away from interleaved intents was successful.
Returns a count of intents in the store directory. Used to investigate stores
with lots of unresolved intents.
`,
Args: cobra.MinimumNArgs(1),
RunE: runDebugIntentCount,
Expand All @@ -1214,7 +1213,7 @@ func runDebugIntentCount(cmd *cobra.Command, args []string) error {
}
defer db.Close()

var interleavedIntentCount, separatedIntentCount int
var intentCount int
var keysCount uint64
var wg sync.WaitGroup
closer := make(chan bool)
Expand All @@ -1239,48 +1238,29 @@ func runDebugIntentCount(cmd *cobra.Command, args []string) error {
})

iter := db.NewEngineIterator(storage.IterOptions{
LowerBound: roachpb.KeyMin,
UpperBound: roachpb.KeyMax,
LowerBound: keys.LockTableSingleKeyStart,
UpperBound: keys.LockTableSingleKeyEnd,
})
defer iter.Close()
valid, err := iter.SeekEngineKeyGE(storage.EngineKey{Key: roachpb.KeyMin})
var meta enginepb.MVCCMetadata
for ; valid && err == nil; valid, err = iter.NextEngineKey() {
seekKey := storage.EngineKey{Key: keys.LockTableSingleKeyStart}

var valid bool
for valid, err = iter.SeekEngineKeyGE(seekKey); valid && err == nil; valid, err = iter.NextEngineKey() {
key, err := iter.EngineKey()
if err != nil {
return err
}
atomic.AddUint64(&keysCount, 1)
if key.IsLockTableKey() {
separatedIntentCount++
continue
}
if !key.IsMVCCKey() {
continue
}
mvccKey, err := key.ToMVCCKey()
if err != nil {
return err
}
if !mvccKey.Timestamp.IsEmpty() {
continue
}
val := iter.UnsafeValue()
if err := protoutil.Unmarshal(val, &meta); err != nil {
return err
}
if meta.IsInline() {
continue
intentCount++
}
interleavedIntentCount++
}
if err != nil {
return err
}
close(closer)
wg.Wait()
fmt.Printf("interleaved intents: %d\nseparated intents: %d\n",
interleavedIntentCount, separatedIntentCount)
fmt.Printf("intents: %d\n", intentCount)
return nil
}

Expand Down
22 changes: 0 additions & 22 deletions pkg/kv/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,6 @@ func (b *Batch) fillResults(ctx context.Context) {
case *roachpb.MigrateRequest:
case *roachpb.QueryResolvedTimestampRequest:
case *roachpb.BarrierRequest:
case *roachpb.ScanInterleavedIntentsRequest:
default:
if result.Err == nil {
result.Err = errors.Errorf("unsupported reply: %T for %T",
Expand Down Expand Up @@ -881,27 +880,6 @@ func (b *Batch) queryResolvedTimestamp(s, e interface{}) {
b.initResult(1, 0, notRaw, nil)
}

func (b *Batch) scanInterleavedIntents(s, e interface{}) {
begin, err := marshalKey(s)
if err != nil {
b.initResult(0, 0, notRaw, err)
return
}
end, err := marshalKey(e)
if err != nil {
b.initResult(0, 0, notRaw, err)
return
}
req := &roachpb.ScanInterleavedIntentsRequest{
RequestHeader: roachpb.RequestHeader{
Key: begin,
EndKey: end,
},
}
b.appendReqs(req)
b.initResult(1, 0, notRaw, nil)
}

func (b *Batch) barrier(s, e interface{}) {
begin, err := marshalKey(s)
if err != nil {
Expand Down
24 changes: 0 additions & 24 deletions pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -783,30 +783,6 @@ func (db *DB) QueryResolvedTimestamp(
return r.ResolvedTS, nil
}

// ScanInterleavedIntents is a command that returns all interleaved intents
// encountered in the request span. A resume span is returned if the entirety
// of the request span was not scanned.
func (db *DB) ScanInterleavedIntents(
ctx context.Context, begin, end interface{}, ts hlc.Timestamp,
) ([]roachpb.Intent, *roachpb.Span, error) {
b := &Batch{Header: roachpb.Header{Timestamp: ts}}
b.scanInterleavedIntents(begin, end)
result, err := getOneResult(db.Run(ctx, b), b)
if err != nil {
return nil, nil, err
}
responses := b.response.Responses
if len(responses) == 0 {
return nil, nil, errors.Errorf("unexpected empty response for ScanInterleavedIntents")
}
resp, ok := responses[0].GetInner().(*roachpb.ScanInterleavedIntentsResponse)
if !ok {
return nil, nil, errors.Errorf("unexpected response of type %T for ScanInterleavedIntents",
responses[0].GetInner())
}
return resp.Intents, result.ResumeSpan, nil
}

// Barrier is a command that waits for conflicting operations such as earlier
// writes on the specified key range to finish.
func (db *DB) Barrier(ctx context.Context, begin, end interface{}) (hlc.Timestamp, error) {
Expand Down
1 change: 0 additions & 1 deletion pkg/kv/kvserver/batcheval/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ go_library(
"cmd_reverse_scan.go",
"cmd_revert_range.go",
"cmd_scan.go",
"cmd_scan_interleaved_intents.go",
"cmd_subsume.go",
"cmd_truncate_log.go",
"command.go",
Expand Down
111 changes: 0 additions & 111 deletions pkg/kv/kvserver/batcheval/cmd_scan_interleaved_intents.go

This file was deleted.

3 changes: 1 addition & 2 deletions pkg/kv/kvserver/batcheval/eval_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ type Limiters struct {
// rangefeeds in the "catch-up" state across the store. The "catch-up" state
// is a temporary state at the beginning of a rangefeed which is expensive
// because it uses an engine iterator.
ConcurrentRangefeedIters limit.ConcurrentRequestLimiter
ConcurrentScanInterleavedIntents limit.ConcurrentRequestLimiter
ConcurrentRangefeedIters limit.ConcurrentRequestLimiter
}

// EvalContext is the interface through which command evaluation accesses the
Expand Down
16 changes: 0 additions & 16 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,17 +174,6 @@ var concurrentRangefeedItersLimit = settings.RegisterIntSetting(
settings.PositiveInt,
)

// concurrentscanInterleavedIntentsLimit is the number of concurrent
// ScanInterleavedIntents requests that will be run on a store. Used as part
// of pre-evaluation throttling.
var concurrentscanInterleavedIntentsLimit = settings.RegisterIntSetting(
settings.TenantWritable,
"kv.migration.concurrent_scan_interleaved_intents",
"number of scan interleaved intents requests a store will handle concurrently before queueing",
1,
settings.PositiveInt,
)

// Minimum time interval between system config updates which will lead to
// enqueuing replicas.
var queueAdditionOnSystemConfigUpdateRate = settings.RegisterFloatSetting(
Expand Down Expand Up @@ -1278,11 +1267,6 @@ func NewStore(
s.limiters.ConcurrentExportRequests = limit.MakeConcurrentRequestLimiter(
"exportRequestLimiter", int(ExportRequestsLimit.Get(&cfg.Settings.SV)),
)
s.limiters.ConcurrentScanInterleavedIntents = limit.MakeConcurrentRequestLimiter(
"scanInterleavedIntentsLimiter", int(concurrentscanInterleavedIntentsLimit.Get(&cfg.Settings.SV)))
concurrentscanInterleavedIntentsLimit.SetOnChange(&cfg.Settings.SV, func(ctx context.Context) {
s.limiters.ConcurrentScanInterleavedIntents.SetLimit(int(concurrentscanInterleavedIntentsLimit.Get(&cfg.Settings.SV)))
})

// The snapshot storage is usually empty at this point since it is cleared
// after each snapshot application, except when the node crashed right before
Expand Down
13 changes: 0 additions & 13 deletions pkg/kv/kvserver/store_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,19 +376,6 @@ func (s *Store) maybeThrottleBatch(
}
return res, nil

case *roachpb.ScanInterleavedIntentsRequest:
before := timeutil.Now()
res, err := s.limiters.ConcurrentScanInterleavedIntents.Begin(ctx)
if err != nil {
return nil, err
}

waited := timeutil.Since(before)
if waited > time.Second {
log.Infof(ctx, "ScanInterleavedIntents request was delayed by %v", waited)
}
return res, nil

default:
return nil, nil
}
Expand Down
28 changes: 2 additions & 26 deletions pkg/roachpb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,20 +523,6 @@ func (r *BarrierResponse) combine(c combinable) error {

var _ combinable = &BarrierResponse{}

// combine implements the combinable interface.
func (r *ScanInterleavedIntentsResponse) combine(c combinable) error {
otherR := c.(*ScanInterleavedIntentsResponse)
if r != nil {
if err := r.ResponseHeader.combine(otherR.Header()); err != nil {
return err
}
r.Intents = append(r.Intents, otherR.Intents...)
}
return nil
}

var _ combinable = &ScanInterleavedIntentsResponse{}

// combine implements the combinable interface.
func (r *QueryLocksResponse) combine(c combinable) error {
otherR := c.(*QueryLocksResponse)
Expand Down Expand Up @@ -801,9 +787,6 @@ func (*AdminVerifyProtectedTimestampRequest) Method() Method { return AdminVerif
// Method implements the Request interface.
func (*QueryResolvedTimestampRequest) Method() Method { return QueryResolvedTimestamp }

// Method implements the Request interface.
func (*ScanInterleavedIntentsRequest) Method() Method { return ScanInterleavedIntents }

// Method implements the Request interface.
func (*BarrierRequest) Method() Method { return Barrier }

Expand Down Expand Up @@ -1086,12 +1069,6 @@ func (r *QueryResolvedTimestampRequest) ShallowCopy() Request {
return &shallowCopy
}

// ShallowCopy implements the Request interface.
func (r *ScanInterleavedIntentsRequest) ShallowCopy() Request {
shallowCopy := *r
return &shallowCopy
}

// ShallowCopy implements the Request interface.
func (r *BarrierRequest) ShallowCopy() Request {
shallowCopy := *r
Expand Down Expand Up @@ -1469,9 +1446,8 @@ func (*RangeStatsRequest) flags() flag { return isRead }
func (*QueryResolvedTimestampRequest) flags() flag {
return isRead | isRange | requiresClosedTSOlderThanStorageSnapshot
}
func (*ScanInterleavedIntentsRequest) flags() flag { return isRead | isRange }
func (*BarrierRequest) flags() flag { return isWrite | isRange }
func (*IsSpanEmptyRequest) flags() flag { return isRead | isRange }
func (*BarrierRequest) flags() flag { return isWrite | isRange }
func (*IsSpanEmptyRequest) flags() flag { return isRead | isRange }

// IsParallelCommit returns whether the EndTxn request is attempting to perform
// a parallel commit. See txn_interceptor_committer.go for a discussion about
Expand Down
Loading