Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: avoid blocking the command queue for scans with limits #31904

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 59 additions & 15 deletions pkg/storage/command_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/interval"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)

// A CommandQueue maintains an interval tree of keys or key ranges for
Expand Down Expand Up @@ -81,11 +82,12 @@ type CommandQueue struct {
}

type cmd struct {
id int64
key interval.Range
readOnly bool
timestamp hlc.Timestamp
debugInfo summaryWriter
id int64
key interval.Range
readOnly bool
preEvaluate bool // collects pre-requisites and dependents, instead of waiting
timestamp hlc.Timestamp
debugInfo summaryWriter

buffered bool // is this cmd buffered in readsBuffer
expanded bool // have the children been added
Expand All @@ -101,7 +103,15 @@ type cmd struct {
// in the prereq slice to avoid duplicates.
prereqIDs map[int64]struct{}

pending chan struct{} // closed when complete
// If pre-evaluating, collect commands which overlap. Because
// potentially multiple dependent commands will modify
// overlappingCmds, we need to protect it with a mutex.
mu struct {
syncutil.Mutex
overlappingCmds []*cmd
}

pending chan struct{} // closed when complete, or if pre-evaluating
}

// A summaryWriter is capable of writing a summary about itself. It is typically
Expand Down Expand Up @@ -270,13 +280,23 @@ func (c *cmd) ResolvePendingPrereq() {
}
}
for _, newPre := range *pre.prereqs {
if newPre == nil {
continue
}
if _, ok := c.prereqIDs[newPre.id]; !ok {
*c.prereqs = append(*c.prereqs, newPre)
c.prereqIDs[newPre.id] = struct{}{}
}
}
}

// If the prereq is pre-evaluating, let it know that this command overlaps.
if pre.preEvaluate {
pre.mu.Lock()
pre.mu.overlappingCmds = append(pre.mu.overlappingCmds, c)
pre.mu.Unlock()
}

// Truncate the command's prerequisite list so that it no longer includes
// the first prerequisite. Before doing so, nil out prefix of slice to allow
// GC of the first command. Without this, large chunks of the dependency
Expand All @@ -289,7 +309,7 @@ func (c *cmd) ResolvePendingPrereq() {
delete(c.prereqIDs, pre.id)
}

// OptimisticallyResolvePrereqs removes all prerequisite in the cmd's prereq
// OptimisticallyResolvePrereqs removes all prerequisites in the cmd's prereq
// slice that have already finished without blocking on pending commands.
// Prerequisite commands that are still pending or that were canceled are left
// in the prereq slice.
Expand All @@ -313,6 +333,20 @@ func (c *cmd) OptimisticallyResolvePrereqs() {
(*c.prereqs) = (*c.prereqs)[:j]
}

// Overlaps returns whether the supplied range overlaps with any span
// in the command.
func (c *cmd) Overlaps(o interval.Range) bool {
if len(c.children) == 0 {
return interval.ExclusiveOverlapper.Overlap(o, c.key)
}
for i := range c.children {
if interval.ExclusiveOverlapper.Overlap(o, c.children[i].key) {
return true
}
}
return false
}

// NewCommandQueue returns a new command queue. The boolean specifies whether
// to enable the covering span optimization. With this optimization, whenever
// a command consisting of multiple spans is added, a covering span is computed
Expand Down Expand Up @@ -428,8 +462,8 @@ func (cq *CommandQueue) flushReadsBuffer() {
// are at earlier timestamps than pending writes, and writes to proceed if they are
// at later timestamps than pending reads.
func (cq *CommandQueue) getPrereqs(
readOnly bool, timestamp hlc.Timestamp, spans []roachpb.Span,
) (prereqs []*cmd) {
readOnly, preEvaluate bool, timestamp hlc.Timestamp, spans []roachpb.Span,
) (prereqs []*cmd, overlappingCmds []*cmd) {
prepareSpans(spans)

addPrereq := func(prereq *cmd) {
Expand Down Expand Up @@ -465,6 +499,9 @@ func (cq *CommandQueue) getPrereqs(
if overlapCount := int64(len(overlaps)); overlapCount > cq.localMetrics.maxOverlapsSeen {
cq.localMetrics.maxOverlapsSeen = overlapCount
}
if preEvaluate {
overlappingCmds = append(overlappingCmds, overlaps...)
}

// Sort overlapping commands by command ID and iterate from latest to earliest,
// adding the commands' ranges to the RangeGroup to determine gating keyspace
Expand Down Expand Up @@ -633,7 +670,7 @@ func (cq *CommandQueue) getPrereqs(
cq.wRg.Clear()
cq.rwRg.Clear()
}
return prereqs
return prereqs, overlappingCmds
}

// getOverlaps returns a slice of values which overlap the specified
Expand Down Expand Up @@ -724,7 +761,11 @@ func (o *overlapHeap) PopOverlap() *cmd {
//
// Returns a nil `cmd` when no spans are given.
func (cq *CommandQueue) add(
readOnly bool, timestamp hlc.Timestamp, prereqs []*cmd, spans []roachpb.Span,
readOnly, preEvaluate bool,
timestamp hlc.Timestamp,
prereqs,
overlappingCmds []*cmd,
spans []roachpb.Span,
) *cmd {
if len(spans) == 0 {
return nil
Expand Down Expand Up @@ -766,9 +807,13 @@ func (cq *CommandQueue) add(
cmd.id = cq.nextID()
cmd.key = coveringSpan.AsRange()
cmd.readOnly = readOnly
cmd.preEvaluate = preEvaluate
cmd.timestamp = timestamp
cmd.prereqsBuf = prereqs
cmd.prereqs = &cmd.prereqsBuf
cmd.mu.Lock()
cmd.mu.overlappingCmds = overlappingCmds
cmd.mu.Unlock()

cmd.expanded = false
if len(spans) > 1 {
Expand Down Expand Up @@ -928,19 +973,18 @@ func (cq *CommandQueue) GetSnapshot() CommandQueueSnapshot {

func (cqs CommandQueueSnapshot) addCommandsFromTree(tree interval.Tree) {
tree.Do(func(item interval.Interface) (done bool) {
currentCmd := item.(*cmd)
cqs.addCommand(*currentCmd)
cqs.addCommand(item.(*cmd))
return false
})
}

// addCommand adds all leaf commands to the snapshot. This is done by
// either adding the given command if it's a leaf, or recursively calling
// itself on the given command's children.
func (cqs CommandQueueSnapshot) addCommand(command cmd) {
func (cqs CommandQueueSnapshot) addCommand(command *cmd) {
if len(command.children) > 0 {
for i := range command.children {
cqs.addCommand(command.children[i])
cqs.addCommand(&command.children[i])
}
return
}
Expand Down
Loading