Skip to content

Commit

Permalink
Merge #89441 #89468
Browse files Browse the repository at this point in the history
89441: kvserver: remove `ScanInterleavedIntents` method r=erikgrinaker a=erikgrinaker

Resolves #89428.

**upgrades: remove separated intents test data**

This test no longer exists, only the datadriven test data.

Release note: None

**kvserver: remove `ScanInterleavedIntents` method**

This was used by the upgrade migration to separated intents in 21.2.
Interleaved intents no longer exist, and there are no callers in 22.2,
so this can safely be removed.

Release note: None
  
**cli: remove interleaved intents from `debug intent-count`**

Interleaved intents no longer exist. Only count separated intents.

Release note: None

89468: sctest: ensure job succeeds when retried upon success r=ajwerner a=ajwerner

We found a bug recently whereby a schema change which was resumed after its last stage would fail if it were resumed again. This could happen if the process of moving the state machine in the jobs subsystem to succeeded fails for whatever reason. This change ensures that we test the failure case.

Release note: None

Co-authored-by: Erik Grinaker <[email protected]>
Co-authored-by: Andrew Werner <[email protected]>
  • Loading branch information
3 people committed Oct 6, 2022
3 parents f4d710c + 5cc2f47 + 6452229 commit 2a1abc8
Show file tree
Hide file tree
Showing 17 changed files with 77 additions and 763 deletions.
42 changes: 11 additions & 31 deletions pkg/cli/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -1195,9 +1195,8 @@ var debugIntentCount = &cobra.Command{
Use: "intent-count <store directory>",
Short: "return a count of intents in directory",
Long: `
Returns a count of interleaved and separated intents in the store directory.
Used to investigate stores with lots of unresolved intents, or to confirm
if the migration away from interleaved intents was successful.
Returns a count of intents in the store directory. Used to investigate stores
with lots of unresolved intents.
`,
Args: cobra.MinimumNArgs(1),
RunE: runDebugIntentCount,
Expand All @@ -1214,7 +1213,7 @@ func runDebugIntentCount(cmd *cobra.Command, args []string) error {
}
defer db.Close()

var interleavedIntentCount, separatedIntentCount int
var intentCount int
var keysCount uint64
var wg sync.WaitGroup
closer := make(chan bool)
Expand All @@ -1239,48 +1238,29 @@ func runDebugIntentCount(cmd *cobra.Command, args []string) error {
})

iter := db.NewEngineIterator(storage.IterOptions{
LowerBound: roachpb.KeyMin,
UpperBound: roachpb.KeyMax,
LowerBound: keys.LockTableSingleKeyStart,
UpperBound: keys.LockTableSingleKeyEnd,
})
defer iter.Close()
valid, err := iter.SeekEngineKeyGE(storage.EngineKey{Key: roachpb.KeyMin})
var meta enginepb.MVCCMetadata
for ; valid && err == nil; valid, err = iter.NextEngineKey() {
seekKey := storage.EngineKey{Key: keys.LockTableSingleKeyStart}

var valid bool
for valid, err = iter.SeekEngineKeyGE(seekKey); valid && err == nil; valid, err = iter.NextEngineKey() {
key, err := iter.EngineKey()
if err != nil {
return err
}
atomic.AddUint64(&keysCount, 1)
if key.IsLockTableKey() {
separatedIntentCount++
continue
}
if !key.IsMVCCKey() {
continue
}
mvccKey, err := key.ToMVCCKey()
if err != nil {
return err
}
if !mvccKey.Timestamp.IsEmpty() {
continue
}
val := iter.UnsafeValue()
if err := protoutil.Unmarshal(val, &meta); err != nil {
return err
}
if meta.IsInline() {
continue
intentCount++
}
interleavedIntentCount++
}
if err != nil {
return err
}
close(closer)
wg.Wait()
fmt.Printf("interleaved intents: %d\nseparated intents: %d\n",
interleavedIntentCount, separatedIntentCount)
fmt.Printf("intents: %d\n", intentCount)
return nil
}

Expand Down
22 changes: 0 additions & 22 deletions pkg/kv/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,6 @@ func (b *Batch) fillResults(ctx context.Context) {
case *roachpb.MigrateRequest:
case *roachpb.QueryResolvedTimestampRequest:
case *roachpb.BarrierRequest:
case *roachpb.ScanInterleavedIntentsRequest:
default:
if result.Err == nil {
result.Err = errors.Errorf("unsupported reply: %T for %T",
Expand Down Expand Up @@ -881,27 +880,6 @@ func (b *Batch) queryResolvedTimestamp(s, e interface{}) {
b.initResult(1, 0, notRaw, nil)
}

func (b *Batch) scanInterleavedIntents(s, e interface{}) {
begin, err := marshalKey(s)
if err != nil {
b.initResult(0, 0, notRaw, err)
return
}
end, err := marshalKey(e)
if err != nil {
b.initResult(0, 0, notRaw, err)
return
}
req := &roachpb.ScanInterleavedIntentsRequest{
RequestHeader: roachpb.RequestHeader{
Key: begin,
EndKey: end,
},
}
b.appendReqs(req)
b.initResult(1, 0, notRaw, nil)
}

func (b *Batch) barrier(s, e interface{}) {
begin, err := marshalKey(s)
if err != nil {
Expand Down
24 changes: 0 additions & 24 deletions pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -783,30 +783,6 @@ func (db *DB) QueryResolvedTimestamp(
return r.ResolvedTS, nil
}

// ScanInterleavedIntents is a command that returns all interleaved intents
// encountered in the request span. A resume span is returned if the entirety
// of the request span was not scanned.
func (db *DB) ScanInterleavedIntents(
ctx context.Context, begin, end interface{}, ts hlc.Timestamp,
) ([]roachpb.Intent, *roachpb.Span, error) {
b := &Batch{Header: roachpb.Header{Timestamp: ts}}
b.scanInterleavedIntents(begin, end)
result, err := getOneResult(db.Run(ctx, b), b)
if err != nil {
return nil, nil, err
}
responses := b.response.Responses
if len(responses) == 0 {
return nil, nil, errors.Errorf("unexpected empty response for ScanInterleavedIntents")
}
resp, ok := responses[0].GetInner().(*roachpb.ScanInterleavedIntentsResponse)
if !ok {
return nil, nil, errors.Errorf("unexpected response of type %T for ScanInterleavedIntents",
responses[0].GetInner())
}
return resp.Intents, result.ResumeSpan, nil
}

// Barrier is a command that waits for conflicting operations such as earlier
// writes on the specified key range to finish.
func (db *DB) Barrier(ctx context.Context, begin, end interface{}) (hlc.Timestamp, error) {
Expand Down
1 change: 0 additions & 1 deletion pkg/kv/kvserver/batcheval/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ go_library(
"cmd_reverse_scan.go",
"cmd_revert_range.go",
"cmd_scan.go",
"cmd_scan_interleaved_intents.go",
"cmd_subsume.go",
"cmd_truncate_log.go",
"command.go",
Expand Down
111 changes: 0 additions & 111 deletions pkg/kv/kvserver/batcheval/cmd_scan_interleaved_intents.go

This file was deleted.

3 changes: 1 addition & 2 deletions pkg/kv/kvserver/batcheval/eval_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ type Limiters struct {
// rangefeeds in the "catch-up" state across the store. The "catch-up" state
// is a temporary state at the beginning of a rangefeed which is expensive
// because it uses an engine iterator.
ConcurrentRangefeedIters limit.ConcurrentRequestLimiter
ConcurrentScanInterleavedIntents limit.ConcurrentRequestLimiter
ConcurrentRangefeedIters limit.ConcurrentRequestLimiter
}

// EvalContext is the interface through which command evaluation accesses the
Expand Down
16 changes: 0 additions & 16 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,17 +174,6 @@ var concurrentRangefeedItersLimit = settings.RegisterIntSetting(
settings.PositiveInt,
)

// concurrentscanInterleavedIntentsLimit is the number of concurrent
// ScanInterleavedIntents requests that will be run on a store. Used as part
// of pre-evaluation throttling.
var concurrentscanInterleavedIntentsLimit = settings.RegisterIntSetting(
settings.TenantWritable,
"kv.migration.concurrent_scan_interleaved_intents",
"number of scan interleaved intents requests a store will handle concurrently before queueing",
1,
settings.PositiveInt,
)

// Minimum time interval between system config updates which will lead to
// enqueuing replicas.
var queueAdditionOnSystemConfigUpdateRate = settings.RegisterFloatSetting(
Expand Down Expand Up @@ -1278,11 +1267,6 @@ func NewStore(
s.limiters.ConcurrentExportRequests = limit.MakeConcurrentRequestLimiter(
"exportRequestLimiter", int(ExportRequestsLimit.Get(&cfg.Settings.SV)),
)
s.limiters.ConcurrentScanInterleavedIntents = limit.MakeConcurrentRequestLimiter(
"scanInterleavedIntentsLimiter", int(concurrentscanInterleavedIntentsLimit.Get(&cfg.Settings.SV)))
concurrentscanInterleavedIntentsLimit.SetOnChange(&cfg.Settings.SV, func(ctx context.Context) {
s.limiters.ConcurrentScanInterleavedIntents.SetLimit(int(concurrentscanInterleavedIntentsLimit.Get(&cfg.Settings.SV)))
})

// The snapshot storage is usually empty at this point since it is cleared
// after each snapshot application, except when the node crashed right before
Expand Down
13 changes: 0 additions & 13 deletions pkg/kv/kvserver/store_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,19 +376,6 @@ func (s *Store) maybeThrottleBatch(
}
return res, nil

case *roachpb.ScanInterleavedIntentsRequest:
before := timeutil.Now()
res, err := s.limiters.ConcurrentScanInterleavedIntents.Begin(ctx)
if err != nil {
return nil, err
}

waited := timeutil.Since(before)
if waited > time.Second {
log.Infof(ctx, "ScanInterleavedIntents request was delayed by %v", waited)
}
return res, nil

default:
return nil, nil
}
Expand Down
28 changes: 2 additions & 26 deletions pkg/roachpb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,20 +523,6 @@ func (r *BarrierResponse) combine(c combinable) error {

var _ combinable = &BarrierResponse{}

// combine implements the combinable interface.
func (r *ScanInterleavedIntentsResponse) combine(c combinable) error {
otherR := c.(*ScanInterleavedIntentsResponse)
if r != nil {
if err := r.ResponseHeader.combine(otherR.Header()); err != nil {
return err
}
r.Intents = append(r.Intents, otherR.Intents...)
}
return nil
}

var _ combinable = &ScanInterleavedIntentsResponse{}

// combine implements the combinable interface.
func (r *QueryLocksResponse) combine(c combinable) error {
otherR := c.(*QueryLocksResponse)
Expand Down Expand Up @@ -801,9 +787,6 @@ func (*AdminVerifyProtectedTimestampRequest) Method() Method { return AdminVerif
// Method implements the Request interface.
func (*QueryResolvedTimestampRequest) Method() Method { return QueryResolvedTimestamp }

// Method implements the Request interface.
func (*ScanInterleavedIntentsRequest) Method() Method { return ScanInterleavedIntents }

// Method implements the Request interface.
func (*BarrierRequest) Method() Method { return Barrier }

Expand Down Expand Up @@ -1086,12 +1069,6 @@ func (r *QueryResolvedTimestampRequest) ShallowCopy() Request {
return &shallowCopy
}

// ShallowCopy implements the Request interface.
func (r *ScanInterleavedIntentsRequest) ShallowCopy() Request {
shallowCopy := *r
return &shallowCopy
}

// ShallowCopy implements the Request interface.
func (r *BarrierRequest) ShallowCopy() Request {
shallowCopy := *r
Expand Down Expand Up @@ -1469,9 +1446,8 @@ func (*RangeStatsRequest) flags() flag { return isRead }
func (*QueryResolvedTimestampRequest) flags() flag {
return isRead | isRange | requiresClosedTSOlderThanStorageSnapshot
}
func (*ScanInterleavedIntentsRequest) flags() flag { return isRead | isRange }
func (*BarrierRequest) flags() flag { return isWrite | isRange }
func (*IsSpanEmptyRequest) flags() flag { return isRead | isRange }
func (*BarrierRequest) flags() flag { return isWrite | isRange }
func (*IsSpanEmptyRequest) flags() flag { return isRead | isRange }

// IsParallelCommit returns whether the EndTxn request is attempting to perform
// a parallel commit. See txn_interceptor_committer.go for a discussion about
Expand Down
Loading

0 comments on commit 2a1abc8

Please sign in to comment.