kv: Add multi-queue implementation #85640

Merged
merged 1 commit into from
Aug 20, 2022

Conversation

andrewbaptist
Collaborator

@andrewbaptist andrewbaptist commented Aug 4, 2022

A multi-queue allows multiple clients to each add to the queue with
their own priorities. The queue will round-robin between the queues
and prioritize within a queue.

Release note: None

Release justification: Does not affect existing code.
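
For readers skimming the conversation, here is a minimal, self-contained sketch of the idea the description refers to: one priority heap per named sub-queue, with pops round-robining across the sub-queues and taking the highest-priority item within the chosen sub-queue. This is illustrative only and is not the PR's actual code; all names here are made up.

package main

import (
	"container/heap"
	"fmt"
)

// item is a single piece of queued work with a priority.
type item struct {
	name     string
	priority float64
}

// prioHeap is a max-heap of items ordered by priority.
type prioHeap []item

func (h prioHeap) Len() int            { return len(h) }
func (h prioHeap) Less(i, j int) bool  { return h[i].priority > h[j].priority }
func (h prioHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *prioHeap) Push(x interface{}) { *h = append(*h, x.(item)) }
func (h *prioHeap) Pop() interface{} {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

// multiQueue round-robins across named sub-queues and pops the
// highest-priority item within the sub-queue it lands on.
type multiQueue struct {
	order  []string             // round-robin order of sub-queues
	queues map[string]*prioHeap // one heap per sub-queue
	next   int                  // index of the sub-queue to serve next
}

func (m *multiQueue) add(queue, name string, priority float64) {
	if m.queues == nil {
		m.queues = map[string]*prioHeap{}
	}
	h, ok := m.queues[queue]
	if !ok {
		h = &prioHeap{}
		m.queues[queue] = h
		m.order = append(m.order, queue)
	}
	heap.Push(h, item{name: name, priority: priority})
}

// pop returns the next item, skipping empty sub-queues.
func (m *multiQueue) pop() (item, bool) {
	for i := 0; i < len(m.order); i++ {
		q := m.queues[m.order[m.next]]
		m.next = (m.next + 1) % len(m.order)
		if q.Len() > 0 {
			return heap.Pop(q).(item), true
		}
	}
	return item{}, false
}

func main() {
	var m multiQueue
	m.add("replicate", "a", 1)
	m.add("replicate", "b", 5)
	m.add("rebalance", "c", 3)
	for it, ok := m.pop(); ok; it, ok = m.pop() {
		fmt.Println(it.name) // prints b, then c, then a
	}
}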

@cockroach-teamcity
Member

This change is Reviewable

@andrewbaptist andrewbaptist marked this pull request as ready for review August 4, 2022 23:59
@andrewbaptist andrewbaptist requested a review from a team as a code owner August 4, 2022 23:59
@andrewbaptist andrewbaptist force-pushed the receive_queues branch 3 times, most recently from 24332f2 to 696dce9 Compare August 6, 2022 00:04
@andrewbaptist andrewbaptist linked an issue Aug 6, 2022 that may be closed by this pull request
Member

@nvanbenschoten nvanbenschoten left a comment


Reviewed 1 of 3 files at r1, all commit messages.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @andrewbaptist and @shralex)


pkg/kv/kvserver/multi_queue.go line 1 at r1 (raw file):

// Copyright 2018 The Cockroach Authors.

2022


pkg/kv/kvserver/multi_queue.go line 54 at r1 (raw file):

}

// MultiQueue is the main structure that manages fairly round-robins through

"manages fairly round-robins through"

Something is off with this sentence.


pkg/kv/kvserver/multi_queue.go line 88 at r1 (raw file):

// Stop has been called.
func (m *MultiQueue) Start() {
	m.wg.Add(1)

Instead of managing the lifecycle of the queue directly using a waitgroup, we should integrate this with the stopper package. There are plenty of examples of how to do this in pkg/kv/kvserver. For instance, see the raftScheduler.
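
For context, a rough sketch of what the suggested stopper integration could look like, using only the stop.Stopper calls that appear later in this conversation (RunAsyncTask, ShouldQuiesce). The field and task names are hypothetical, and this is not the PR's final code.

// Illustrative only: run the queue's worker under the stopper so it exits
// cleanly when the server quiesces, instead of tracking it with a WaitGroup.
func (m *MultiQueue) Start(ctx context.Context, stopper *stop.Stopper) {
	_ = stopper.RunAsyncTask(ctx, "multi-queue-worker", func(ctx context.Context) {
		for {
			select {
			case <-stopper.ShouldQuiesce():
				return // server is shutting down; stop processing
			case <-m.wakeUp: // hypothetical "work may be available" signal
				m.runReadyTasks()
			}
		}
	})
}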


pkg/kv/kvserver/multi_queue.go line 154 at r1 (raw file):

	heap.Push(&m.outstanding[pos], &t)

	// Notify the MultiQueue that there may be work to do. If it wasn't waiting,

This would be better served by a condition variable that is tied to m.mu. See raftScheduler for an example of how one can be used.
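
For reference, the condition-variable pattern being suggested here is the standard library's sync.Cond; a minimal sketch follows (the enqueueLocked/emptyLocked/popLocked helpers are hypothetical placeholders, not the PR's code).

type MultiQueue struct {
	mu     sync.Mutex
	wakeUp sync.Cond // initialized with wakeUp.L = &mu
	// ...
}

// Producer side: enqueue work under the lock, then signal a waiting consumer.
func (m *MultiQueue) add(t *task) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.enqueueLocked(t)
	m.wakeUp.Signal()
}

// Consumer side: Wait releases mu while blocked and reacquires it on wakeup,
// so the emptiness check is always made under the lock.
func (m *MultiQueue) next() *task {
	m.mu.Lock()
	defer m.mu.Unlock()
	for m.emptyLocked() {
		m.wakeUp.Wait()
	}
	return m.popLocked()
}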


pkg/kv/kvserver/multi_queue.go line 168 at r1 (raw file):

func (m *MultiQueue) TaskDone() {
	// Take my permit back out.
	<-m.concurrencySem

This assumes that the task has already received a notification on its channel. That means that a caller won't be able to call this method on any return paths (e.g. on context cancellation).

However, it may be the case that the caller is canceled concurrently with the notification of its own channel. So these return paths also can't assume the channel was not signaled or we risk leaking concurrency slots.

I think this is why you have the if _, ok := <-m.wakeUp; !ok { logic above, so that if no-one is listening on the channel, you stop sending. But that's also broken, because a task may have registered with the MultiQueue but hasn't yet reached its select statement.

My suggestion is to push the concurrencySem entirely inside of the queue's goroutine. Then use the mutex synchronization on the heap plus each task's channel as the two points of communication. For instance, consider the following structure:

type permit struct{}

type task struct {
	permitC  chan permit
	priority float64
	heapIdx  int // TODO: can be used to remove from heap in O(1). See batchQueue for example.
}

func (m *MultiQueue) Start(stopper *stop.Stopper) {
	for i := 0; i < maxConcurrency; i++ {
		m.concurrencySem <- permit{}
	}

	stopper.RunAsyncTask(..., func() {
		for {
			// Wait for concurrency before choosing task.
			permit := <-m.concurrencySem

			m.mu.Lock()
			for {
				task, ok := tryPop()
				if ok {
					break
				}
				m.cond.Wait()
			}
			m.mu.Unlock()

			task.permitC <- permit
		}
	})
}

func (m *MultiQueue) Cancel(task *task) {
	m.mu.Lock()
	ok := tryRemove(task)
	m.mu.Unlock()
	if !ok {
		// Must have already been popped.
		m.Release(<-task.permitC)
	}
}

func (m *MultiQueue) Release(permit permit) {
	m.concurrencySem <- permit
}

...


func snapshot(ctx context.Context) {
	...
	task := m.Add(name, priority)
	var permit permit
	select {
	case permit = <-task.permitC:
	case <-ctx.Done():
		m.Cancel(task)
		return
	}
	...
	m.Release(permit)
}

Collaborator Author

@andrewbaptist andrewbaptist left a comment


Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @shralex)


pkg/kv/kvserver/multi_queue.go line 1 at r1 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

2022

Done


pkg/kv/kvserver/multi_queue.go line 54 at r1 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

"manages fairly round-robins through"

Something is off with this sentence.

Done


pkg/kv/kvserver/multi_queue.go line 88 at r1 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

Instead of managing the lifecycle of the queue directly using a waitgroup, we should integrate this with the stopper package. There are plenty of examples of how to do this in pkg/kv/kvserver. For instance, see the raftScheduler.

Done, thanks for pointing out stopper (I had noticed it before, but didn't understand what it did)


pkg/kv/kvserver/multi_queue.go line 154 at r1 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

This would be better served by a condition variable that is tied to m.mu. See raftScheduler for an example of how one can be used.

Done


pkg/kv/kvserver/multi_queue.go line 168 at r1 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

This assumes that the task has already received a notification on its channel. That means that a caller won't be able to call this method on any return paths (e.g. on context cancellation).

However, it may be the case that the caller is canceled concurrently with the notification of its own channel. So these return paths also can't assume the channel was not signaled or we risk leaking concurrency slots.

I think this is why you have the if _, ok := <-m.wakeUp; !ok { logic above, so that if no-one is listening on the channel, you stop sending. But that's also broken, because a task may have registered with the MultiQueue but hasn't yet reached its select statement.

My suggestion is to push the concurrencySem entirely inside of the queue's goroutine. Then use the mutex synchronization on the heap plus each task's channel as the two points of communication. For instance, consider the following structure:

type permit struct{}

type task struct {
	permitC  chan permit
	priority float64
	heapIdx  int // TODO: can be used to remove from heap in O(1). See batchQueue for example.
}

func (m *MultiQueue) Start(stopper *stop.Stopper) {
	for i := 0; i < maxConcurrency; i++ {
		m.concurrencySem <- permit{}
	}

	stopper.RunAsyncTask(..., func() {
		for {
			// Wait for concurrency before choosing task.
			permit := <-m.concurrencySem

			m.mu.Lock()
			for {
				task, ok := tryPop()
				if ok {
					break
				}
				m.cond.Wait()
			}
			m.mu.Unlock()

			task.permitC <- permit
		}
	})
}

func (m *MultiQueue) Cancel(task *task) {
	m.mu.Lock()
	ok := tryRemove(task)
	m.mu.Unlock()
	if !ok {
		// Must have already been popped.
		m.Release(<-task.permitC)
	}
}

func (m *MultiQueue) Release(permit permit) {
	m.concurrencySem <- permit
}

...


func snapshot(ctx context.Context) {
	...
	task := m.Add(name, priority)
	var permit permit
	select {
	case permit = <-task.permitC:
	case <-ctx.Done():
		m.Cancel(task)
		return
	}
	...
	m.Release(permit)
}

Done, thanks for all the references - this is much better.

@andrewbaptist andrewbaptist force-pushed the receive_queues branch 4 times, most recently from 7996416 to 9ee4e51 Compare August 16, 2022 18:08
Contributor

@AlexTalks AlexTalks left a comment


Have a bit more to go over here but a couple notes thus far...

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @andrewbaptist, @nvanbenschoten, and @shralex)


pkg/kv/kvserver/multi_queue.go line 25 at r2 (raw file):

// Task represents a piece of work that needs to be done.
// When the task is done, it must be closed so that other tasks can run
type Task struct {

nit: given that this is in kvserver and we have other Task types in subpackages, plus things like stopper.RunAsyncTask(..), perhaps a more specific name would be useful here? Or this could be put in a subpackage of some kind.


pkg/kv/kvserver/multi_queue.go line 34 at r2 (raw file):

// Await returns a permit channel which is used with to wait for the permit to
// become available.
func (t *Task) Await() chan Permit {

nit: since Await is a verb, it sounds like this is going to do the waiting - maybe GetWaitSemaphore or GetWaitChan or something?


pkg/kv/kvserver/multi_queue.go line 38 at r2 (raw file):

}

func (t *Task) String() string {

This can likely also be made a redact.SafeValue, see https://wiki.crdb.io/wiki/spaces/CRDB/pages/1824817806/Log+and+error+redactability
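
As a sketch, assuming redact.SafeValue is the marker interface described on that wiki page (a single no-op SafeValue() method), the change would be roughly:

// Declaring the marker method tells the redaction machinery that Task's
// string form is safe to print unredacted in logs.
var _ redact.SafeValue = (*Task)(nil)

// SafeValue implements redact.SafeValue.
func (t *Task) SafeValue() {}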


pkg/kv/kvserver/multi_queue.go line 136 at r2 (raw file):

		m.mu.Unlock()
	})
	_ = stopper.RunAsyncTask(startCtx, "snapshot-multi-queue", func(ctx context.Context) {

This should probably be created with a particular name, so that it can be %s-multi-queue


pkg/kv/kvserver/multi_queue.go line 211 at r2 (raw file):

// Cancel will cancel a Task that may not have started yet. This is useful if it
// is determined that it is no longer required to run this Task.:w

nit: VIM artifacts? :)

Member

@nvanbenschoten nvanbenschoten left a comment


This is starting to take shape. Nice job!

Reviewed 1 of 3 files at r2, all commit messages.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @andrewbaptist and @shralex)


pkg/kv/kvserver/multi_queue.go line 28 at r2 (raw file):

	permitC  chan Permit
	priority float64
	queueIdx int

nit: as a reader of this code, it was not clear what these two indexes represented. I think it would be clearer if you stored the name string here and then renamed idx to heapIdx.


pkg/kv/kvserver/multi_queue.go line 32 at r2 (raw file):

}

// Await returns a permit channel which is used with to wait for the permit to

"used to wait"


pkg/kv/kvserver/multi_queue.go line 70 at r2 (raw file):

	n := len(old)
	x := old[n-1]
	*h = old[0 : n-1]

nit:

old[n-1] = nil // for GC
*h = old[:n-1]

pkg/kv/kvserver/multi_queue.go line 97 at r2 (raw file):

	mu             syncutil.Mutex
	concurrencySem chan Permit
	nameMapping    map[string]int

I wonder if we should switch from indexing the individual queues on strings to indexing them on integers. These will be passed through a protobuf and I assume we'll want to define an enum that defines the set of different snapshot sources (REPLICATE_QUEUE, STORE_REBALANCER, etc.). That will provide marginally stronger typing in case we ever want to do anything different depending on the source.
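
As a rough illustration of the typed source this comment has in mind (REPLICATE_QUEUE and STORE_REBALANCER are the names mentioned above; the Go constants and the Add call shown here are hypothetical, and the real definition would live in a protobuf enum):

// Hypothetical Go-side mirror of a proto enum identifying the snapshot source.
type SnapshotSource int32

const (
	SnapshotSource_UNKNOWN          SnapshotSource = 0
	SnapshotSource_REPLICATE_QUEUE  SnapshotSource = 1
	SnapshotSource_STORE_REBALANCER SnapshotSource = 2
)

// The queue could then be keyed on the integer value rather than a string:
// task := m.Add(int(SnapshotSource_REPLICATE_QUEUE), priority)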


pkg/kv/kvserver/multi_queue.go line 100 at r2 (raw file):

	lastQueueIndex int
	outstanding    []notifyHeap
	wakeUp         *sync.Cond

nit: this can be a value. Use queue.wakeUp.L = &queue.mu when initializing.

nit: move this up right below mu to make their relationship more clear.


pkg/kv/kvserver/multi_queue.go line 128 at r2 (raw file):

// Stop has been called.
func (m *MultiQueue) Start(startCtx context.Context, stopper *stop.Stopper) {
	_ = stopper.RunAsyncTask(startCtx, "snapshot-multi-queue-quiesce", func(ctx context.Context) {

Do we want to accept a name for the MultiQueue in the constructor? I don't think we should hardcode "snapshot" here.


pkg/kv/kvserver/multi_queue.go line 140 at r2 (raw file):

		for {
			select {
			case <-stopper.ShouldQuiesce():

This structure of waiting in two different places (select, cond.Wait) depending on whether the producer is outpacing the consumer or the consumer is outpacing the producer is a little strange. I think we can avoid it with a structure like the following, which is also a little more idiomatic:

for {
    var p Permit
    var ok bool
    for {
        if m.stopping {
            return
        }
        if !ok {
            p, ok = <-m.concurrencySem
        }
        if ok && m.tryRunNext(p) {
            break
        }
        m.wakeUp.Wait()
    }
}

The only other change that would be needed with this structure is to also m.wakeUp.Signal() when adding to m.concurrencySem in MultiQueue.Release.
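
A small sketch of the Release change mentioned in the last sentence, using the concurrencySem and wakeUp fields named elsewhere in this review (not the PR's exact code):

// Returning a permit also signals the condition variable so the queue
// goroutine re-checks whether it can now run a task.
func (m *MultiQueue) Release(p Permit) {
	m.concurrencySem <- p
	m.wakeUp.Signal()
}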


pkg/kv/kvserver/multi_queue.go line 162 at r2 (raw file):

}

// runNext will run the next task in order based on

nit: this sentence got cut off.


pkg/kv/kvserver/multi_queue.go line 185 at r2 (raw file):

	defer m.mu.Unlock()

	// the mutex starts locked, unlock when we are ready to run

nit: Recall the capitalization and punctuation requirements from the Go style guide.


pkg/kv/kvserver/multi_queue.go line 195 at r2 (raw file):

	newTask := Task{
		priority: priority,
		permitC:  make(chan Permit),

We can give this a capacity of 1, because the sender doesn't need to wait for the receiver. In fact, I think there's actually the potential for a deadlock if it does wait, as the receiver may be calling Cancel at the same time that the caller is holding the mutex and sending in tryRunNext.

Do we need a randomized test that exercises cancellation?


pkg/kv/kvserver/multi_queue.go line 219 at r2 (raw file):

	ok := m.outstanding[task.queueIdx].tryRemove(task)
	if !ok {
		// Assume has already been popped,

This is more than just an assumption. This will block if the task hasn't been given the permit. Maybe we avoid that with:

if p, ok := <-task.permitC; ok {
    m.Release(p)
}

We know we aren't racing and leaking a permit because we hold the lock and the queue goroutine will only send while holding the lock.


pkg/kv/kvserver/multi_queue.go line 224 at r2 (raw file):

}

//Release needs to be called once the Task that was running has completed and

nit: missing spaces at the beginning of each comment line.

Collaborator Author

@andrewbaptist andrewbaptist left a comment


Dismissed @AlexTalks from 2 discussions.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @andrewbaptist, @nvanbenschoten, and @shralex)


pkg/kv/kvserver/multi_queue.go line 25 at r2 (raw file):

Previously, AlexTalks (Alex Sarkesian) wrote…

nit: given that this is in kvserver and we have other Task types in subpackages, plus things like stopper.RunAsyncTask(..), perhaps a more specific name would be useful here? Or this could be put in a subpackage of some kind.

Done, I moved this to kv/kvserver/multiqueue as it makes the dependencies cleaner, and possibly will have other uses. (I'm hoping that reviewable will handle this ok...)


pkg/kv/kvserver/multi_queue.go line 28 at r2 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

nit: as a reader of this code, it was not clear what these two indexes represented. I think it would be clearer if you stored the name string here and then renamed idx to heapIdx.

Done


pkg/kv/kvserver/multi_queue.go line 32 at r2 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

"used to wait"

Done


pkg/kv/kvserver/multi_queue.go line 34 at r2 (raw file):

Previously, AlexTalks (Alex Sarkesian) wrote…

nit: since Await is a verb, it sounds like this is going to do the waiting - maybe GetWaitSemaphore or GetWaitChan or something?

Done


pkg/kv/kvserver/multi_queue.go line 38 at r2 (raw file):

Previously, AlexTalks (Alex Sarkesian) wrote…

This can likely also be made a redact.SafeValue, see https://wiki.crdb.io/wiki/spaces/CRDB/pages/1824817806/Log+and+error+redactability

Done


pkg/kv/kvserver/multi_queue.go line 70 at r2 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

nit:

old[n-1] = nil // for GC
*h = old[:n-1]

Done


pkg/kv/kvserver/multi_queue.go line 97 at r2 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

I wonder if we should switch from indexing the individual queues on strings to indexing them on integers. These will be passed through a protobuf and I assume we'll want to define an enum that defines the set of different snapshot sources (REPLICATE_QUEUE, STORE_REBALANCER, etc.). That will provide marginally stronger typing in case we ever want to do anything different depending on the source.

Working, I bounced back and forth on this idea. The advantage of strong typing is exactly what you describe, but the disadvantage is dealing with version compatibility in the future. The current design is agnostic of its use and allows different splitting (or even dynamically named queues, as long as they are bounded). With an enum some of those things become harder, though it does provide some additional capabilities. This seems like the slightly more flexible model, so I was inclined to keep it this way, but I wanted to check with you first in case you disagree. Moving to a separate package makes this even less tied to a specific usage.


pkg/kv/kvserver/multi_queue.go line 100 at r2 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

nit: this can be a value. Use queue.wakeUp.L = &queue.mu when initializing.

nit: move this up right below mu to make their relationship more clear.

OK, moved, but didn't change to use the .L syntax. It was much less readable to me, and it wasn't clear that the very minor difference in perf would matter. It seems like the codebase has a mix of both styles.


pkg/kv/kvserver/multi_queue.go line 128 at r2 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

Do we want to accept a name for the MultiQueue in the constructor? I don't think we should hardcode "snapshot" here.

Done, passing in a name now


pkg/kv/kvserver/multi_queue.go line 136 at r2 (raw file):

Previously, AlexTalks (Alex Sarkesian) wrote…

This should probably be created with a particular name, so that it can be %s-multi-queue

Done


pkg/kv/kvserver/multi_queue.go line 140 at r2 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

This structure of waiting in two different places (select, cond.Wait) depending on whether the producer is outpacing the consumer or the consumer is outpacing the producer is a little strange. I think we can avoid it with a structure like the following, which is also a little more idiomatic:

for {
    var p Permit
    var ok bool
    for {
        if m.stopping {
            return
        }
        if !ok {
            p, ok = <-m.concurrencySem
        }
        if ok && m.tryRunNext(p) {
            break
        }
        m.wakeUp.Wait()
    }
}

The only other change that would be needed with this structure is to also m.wakeUp.Signal() when adding to m.concurrencySem in MultiQueue.Release.

Working, I wanted to avoid an extra wakeUp signal, but I refactored this to be more idiomatic. Please look at the changes again and see if they are better. The main loop is meant to balance the consumer and producer rates by keeping the permits between 0 and n. The refactor you suggested still has both waits (on select and cond.Wait) in the loop, so I'm not sure I see the advantage. I did change the loop to no longer look at ShouldQuiesce, as it wasn't necessary, and also changed to close the concurrencySem when the stopper goroutine gets a quiesce signal.

for p := range m.concurrencySem {
	for {
		if m.stopping {
			break
		}
		if m.tryRunNext(p) {
			break
		}
		m.wakeUp.Wait()
	}
}

pkg/kv/kvserver/multi_queue.go line 162 at r2 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

nit: this sentence got cut off.

Done


pkg/kv/kvserver/multi_queue.go line 185 at r2 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

nit: Recall the capitalization and punctuation requirements from the Go style guide.

Done


pkg/kv/kvserver/multi_queue.go line 195 at r2 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

We can give this a capacity of 1, because the sender doesn't need to wait for the receiver. In fact, I think there's actually the potential for a deadlock if it does wait, as the receiver may be calling Cancel at the same time that the caller is holding the mutex and sending in tryRunNext.

Do we need a randomized test that exercises cancellation?

Done, I changed the capacity to 1 and expanded the TestMultiQueueStress test to add random cancellation cases. I do not hit any deadlocks even without changing the capacity to 1, but I agree that tryRunNext should never block on giving the permit. I think it would block if the client neither read from the permit channel nor called Cancel, which is undesirable.


pkg/kv/kvserver/multi_queue.go line 211 at r2 (raw file):

Previously, AlexTalks (Alex Sarkesian) wrote…

nit: VIM artifacts? :)

Done


pkg/kv/kvserver/multi_queue.go line 219 at r2 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

This is more than just an assumption. This will block if the task hasn't been given the permit. Maybe we avoid that with:

if p, ok := <-task.permitC; ok {
    m.Release(p)
}

We know we aren't racing and leaking a permit because we hold the lock and the queue goroutine will only send while holding the lock.

Done, thanks for the tip here, I wasn't sure how to make that assumption more concrete, and this definitely helps.


pkg/kv/kvserver/multi_queue.go line 224 at r2 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

nit: missing spaces at the beginning of each comment line.

Done, weird that the formatter doesn't do this automatically.

@andrewbaptist andrewbaptist force-pushed the receive_queues branch 3 times, most recently from c2d9ff7 to e497ab4 Compare August 17, 2022 20:53
Member

@nvanbenschoten nvanbenschoten left a comment


Just a few more comments, then this should be good to go.

Reviewed 1 of 4 files at r3, 2 of 2 files at r4, 2 of 2 files at r6, all commit messages.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @andrewbaptist and @shralex)


pkg/kv/kvserver/multi_queue.go line 97 at r2 (raw file):
I don't think I understand the compatibility concerns with making this an integer. Protobufs allow values to be added to or removed from an enum across versions. In https://developers.google.com/protocol-buffers/docs/proto3#enum, see:

In languages that support open enum types with values outside the range of specified symbols, such as C++ and Go, the unknown enum value is simply stored as its underlying integer representation.

Recipients of a new enum value will not be able to identify it against any of their expected enum values, but the value itself will just be the enum value's integer representation.


pkg/kv/kvserver/multi_queue.go line 140 at r2 (raw file):

The refactor you suggested does still have both the waits (on select and cond.Wait) in the loop, so I'm not sure I see the advantage.

I think what you have is good, but just for the sake of onboarding on Go, recall that p, ok = <-m.concurrencySem is a non-blocking read of the channel, so there was only one wait in that loop.


pkg/kv/kvserver/multiqueue/multi_queue.go line 38 at r6 (raw file):

// GetWaitChan returns a permit channel which is used to wait for the permit to
// become available.

We should note here that the user may not ever receive a notification on this channel if the stopper passed to Start begins to quiesce, so it should also select on stopper.ShouldQuiesce().


pkg/kv/kvserver/multiqueue/multi_queue_test.go line 39 at r6 (raw file):

	queue := NewMultiQueue("test", 1)
	queue.Start(ctx, stopper)
	stopper.Stop(ctx)

In this test and others, it's usually common in tests to see:

stopper := stop.NewStopper()
defer stopper.Stop(ctx)

This avoids the risk of forgetting to stop the stopper.


pkg/kv/kvserver/multiqueue/multi_queue_test.go line 194 at r6 (raw file):

	for i, task := range tasks {
		var found Permit
		testutils.SucceedsWithin(t, func() error {

Why SucceedsWithin instead of SucceedsSoon?


pkg/kv/kvserver/multiqueue/multi_queue_test.go line 195 at r6 (raw file):

		var found Permit
		testutils.SucceedsWithin(t, func() error {
			for j, t2 := range tasks[i+1:] {

I feel like you could use reflect.Select to make this cleaner and avoid the retry loop entirely. Think it's worth giving that a try?
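
For reference, a sketch of what the reflect.Select approach could look like in such a test. The Task/Permit/GetWaitChan names are the ones used in this PR's discussion; treat the exact signatures as placeholders.

// waitForNextPermit blocks until any task's permit channel fires and returns
// the index of the granted task along with the received permit.
func waitForNextPermit(t *testing.T, tasks []*Task) (int, *Permit) {
	t.Helper()
	cases := make([]reflect.SelectCase, len(tasks))
	for i, tsk := range tasks {
		cases[i] = reflect.SelectCase{
			Dir:  reflect.SelectRecv,
			Chan: reflect.ValueOf(tsk.GetWaitChan()),
		}
	}
	chosen, recv, ok := reflect.Select(cases)
	if !ok {
		t.Fatalf("permit channel %d closed unexpectedly", chosen)
	}
	return chosen, recv.Interface().(*Permit)
}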

@andrewbaptist andrewbaptist force-pushed the receive_queues branch 2 times, most recently from feca31d to 4ec8242 Compare August 18, 2022 21:33
Collaborator Author

@andrewbaptist andrewbaptist left a comment


Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @nvanbenschoten and @shralex)


pkg/kv/kvserver/multi_queue.go line 97 at r2 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

I don't think I understand the compatibility concerns with making this an integer. Protobufs allow values to be added to or removed from an enum across versions. In https://developers.google.com/protocol-buffers/docs/proto3#enum, see:

In languages that support open enum types with values outside the range of specified symbols, such as C++ and Go, the unknown enum value is simply stored as its underlying integer representation.

Recipients of a new enum value will not be able to identify it against any of their expected enum values, but the value itself will just be the enum value's integer representation.

Done, I understand what you mean now; I thought you meant to index against an enum directly (not an int). This makes more sense, and I changed it to take an int.


pkg/kv/kvserver/multi_queue.go line 140 at r2 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

The refactor you suggested does still have both the waits (on select and cond.Wait) in the loop, so I'm not sure I see the advantage.

I think what you have is good, but just for the sake of onboarding on Go, recall that p, ok = <-m.concurrencySem is a non-blocking read of the channel, so there was only one wait in that loop.

OK, I didn't think that form of reading from a channel was non-blocking; the doc is somewhat poorly worded here: https://go.dev/ref/spec#Receive_operator and makes it seem like it only returns if closed. I created a test like this:

c := make(chan struct{})
_, ok := <-c
fmt.Printf("%t", ok)

and it never gets to the end. If I change it like this:

c := make(chan struct{})
select {
case _, ok := <-c:
	fmt.Printf("%t", ok)
default:
}
It finishes but doesn't print anything. Is there something I'm doing wrong here?

pkg/kv/kvserver/multiqueue/multi_queue.go line 38 at r6 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

We should note here that the user may not ever receive a notification on this channel if the stopper passed to Start begins to quiesce, so it should also select on stopper.ShouldQuiesce().

Done, see next commit - the stopper has been completely removed.


pkg/kv/kvserver/multiqueue/multi_queue_test.go line 39 at r6 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

In this test and others, it's usually common in tests to see:

stopper := stop.NewStopper()
defer stopper.Stop(ctx)

This avoids the risk of forgetting to stop the stopper.

Done, changed to defer.


pkg/kv/kvserver/multiqueue/multi_queue_test.go line 194 at r6 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

Why SucceedsWithin instead of SucceedsSoon?

Done, I had it that way for debugging some of my tests, but forgot to change it back.


pkg/kv/kvserver/multiqueue/multi_queue_test.go line 195 at r6 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

I feel like you could use reflect.Select to make this cleaner and avoid the retry loop entirely. Think it's worth giving that a try?

OK, I was able to remove the retry loop (it loops once after each release to check every queue). I could probably use reflect here, but it doesn't seem necessary anymore.

Member

@nvanbenschoten nvanbenschoten left a comment


Flushing a few comments, but I'm having trouble with adding new comments to Reviewable. Maybe this will help.

Reviewed 1 of 3 files at r7, all commit messages.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @andrewbaptist, @nvanbenschoten, and @shralex)


pkg/kv/kvserver/multi_queue.go line 97 at r2 (raw file):

Previously, andrewbaptist (Andrew Baptist) wrote…

Done, I understand what you mean now, I thought you meant to index against an enum directly (not an int). This makes more sense and I changed to take an int.

Yeah, this looks better. With that change, do we want to rename references to "name" to "type", or something like that?


pkg/kv/kvserver/multi_queue.go line 140 at r2 (raw file):

Previously, andrewbaptist (Andrew Baptist) wrote…

OK, I didn't think that that form of reading from a channel was non-blocking, the doc is somewhat poorly worded here: https://go.dev/ref/spec#Receive_operator and makes it seem like it only returns if closed. I created a test like this:

c := make(chan struct{})
_, ok := <-c
fmt.Printf("%t", ok)

and it never gets to the end. If I change it like this:

c := make(chan struct{})
select {
case _, ok := <-c:
	fmt.Printf("%t", ok)
default:
}
It finishes but doesn't print anything. Is there something I'm doing wrong here?

Ah sorry, I was wrong about this! You're right that _, ok := <-c is not non-blocking.


pkg/kv/kvserver/multiqueue/multi_queue.go line 196 at r9 (raw file):

			// Only release if the channel is open, and we can get the permit.
			if ok {
				p.valid = false

Do we need to set valid = false here?


pkg/kv/kvserver/multiqueue/multi_queue.go line 204 at r9 (raw file):

		}
	}
	m.tryRunNext()

Is this needed if we didn't receive a permit on our channel (i.e. if we didn't hit the if ok { case)?

Member

@nvanbenschoten nvanbenschoten left a comment


Reviewable is still having trouble on this PR, so flushing two more comments. Otherwise, this looks very good. I'm glad you were able to get rid of the goroutines and the stopper.


// Permit is a token which is returned from a Task.GetWaitChan call.
type Permit struct {
	valid bool
Member

@nvanbenschoten nvanbenschoten Aug 19, 2022


This seems like a good idea, but then let's pass a pointer in through the channel (chan *Permit) so that all interfaces work on *Permit.

// tryRunNext will run the next task in order round-robin through the queues and in
// priority order within a queue. It will return true if it ran a task. The
// MultiQueue.mu lock must be held before calling this func.
func (m *MultiQueue) tryRunNext() {
Member


nit: if this requires the mutex to be held, it's common to call it tryRunNextLocked.

@andrewbaptist andrewbaptist force-pushed the receive_queues branch 3 times, most recently from 0918136 to d1eeb79 Compare August 19, 2022 18:45
Collaborator Author

@andrewbaptist andrewbaptist left a comment


Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @nvanbenschoten and @shralex)


pkg/kv/kvserver/multiqueue/multi_queue.go line 196 at r9 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

Do we need to set valid = false here?

It shouldn't matter in practice whether we set this or not, but the permit is no longer valid after this point (we took it off the channel, so the end user shouldn't get it). It is nice for us to set this to false though, as no one else can safely release this permit.


pkg/kv/kvserver/multiqueue/multi_queue.go line 204 at r9 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

Is this needed if we didn't receive a permit on our channel (i.e. if we didn't hit the if ok { case)?

Done, you are correct - this only needs to happen in the case where we free a permit; moved it up.


pkg/kv/kvserver/multiqueue/multi_queue.go line 119 at r10 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

This seems like a good idea, but then let's pass a pointer in through the channel (chan *Permit) so that all interfaces work on *Permit.

Done, still getting up to speed with the best way to do this, so thanks!


pkg/kv/kvserver/multiqueue/multi_queue.go line 125 at r10 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

nit: if this requires the mutex to be held, it's common to call it tryRunNextLocked.

OK, renamed

Member

@nvanbenschoten nvanbenschoten left a comment


LGTM

case p, ok := <-task.permitC:
	// Only release if the channel is open, and we can get the permit.
	if ok {
		p.valid = false
Member

@nvanbenschoten nvanbenschoten Aug 19, 2022


Consider extracting a releaseLocked method that you can call from here and from Release to avoid repetition. Something like:

func (m *MultiQueue) Release(permit *Permit) {
	if !permit.valid {
		panic("double release of permit")
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	m.releaseLocked(permit)
}

func (m *MultiQueue) releaseLocked(permit *Permit) {
	permit.valid = false
	m.remainingRuns++
	m.tryRunNextLocked()
}


// Len returns the number of additional tasks that can be added without
// queueing. This will return 0 if there is anything queued. This method should
// only be used for testing.
Member


If this method is only for testing then you can actually define it in multi_queue_test.go, because that's in the same package.

@andrewbaptist
Collaborator Author

bors r=nvanbenschoten

@craig
Contributor

craig bot commented Aug 19, 2022

👎 Rejected by code reviews

@andrewbaptist andrewbaptist dismissed AlexTalks’s stale review August 19, 2022 21:35

Addressed all points

A multi-queue allows multiple clients to each add to the queue with
their own priorities. The queue will round-robin between the queues
and prioritize within a queue.

Release note: None

Release justification: Does not change any existing code.
@andrewbaptist
Collaborator Author

bors r=nvanbenschoten

@craig
Contributor

craig bot commented Aug 20, 2022

Build succeeded:

@craig craig bot merged commit 4e2c09d into cockroachdb:master Aug 20, 2022
@andrewbaptist andrewbaptist deleted the receive_queues branch December 15, 2023 21:33
Successfully merging this pull request may close these issues.

kvserver: prioritize replication activity