
Queue #100924 (Draft)

miretskiy wants to merge 2 commits into master
Conversation

@miretskiy (Contributor) commented Apr 7, 2023

pkg/util: Add generic queue.Queue[T] container

Add a generic queue.Queue[T] container.
Supports the Push() operation, and both FIFO
and LIFO consumption (PopFront(), Pop()).

This implementation allocates chunks of []T to
amortize memory allocations.
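
A minimal sketch of what a chunked Queue[T] along these lines could look like; the fixed chunk size, field names, and omission of the LIFO Pop() path are simplifications of my own for illustration, not the PR's actual code (the PR grows its chunk size with the queue length):

```go
// Hypothetical sketch only; details are illustrative.
package queue

const chunkSize = 128

type chunk[T any] struct {
	items      [chunkSize]T
	head, tail int       // read/write positions within this chunk
	next       *chunk[T] // chunks form a singly linked list
}

// Queue holds elements in chunks of T to amortize allocation/GC cost.
type Queue[T any] struct {
	front, back *chunk[T]
	len         int
}

// Push appends v, allocating a new chunk only when the back chunk is full.
func (q *Queue[T]) Push(v T) {
	if q.back == nil || q.back.tail == chunkSize {
		c := &chunk[T]{}
		if q.back == nil {
			q.front = c
		} else {
			q.back.next = c
		}
		q.back = c
	}
	q.back.items[q.back.tail] = v
	q.back.tail++
	q.len++
}

// PopFront removes and returns the oldest element (FIFO).
func (q *Queue[T]) PopFront() (T, bool) {
	var zero T
	if q.len == 0 {
		return zero, false
	}
	c := q.front
	v := c.items[c.head]
	c.items[c.head] = zero // release the element for GC
	c.head++
	q.len--
	if c.head == c.tail {
		q.front = c.next // drop the exhausted chunk
		if q.front == nil {
			q.back = nil
		}
	}
	return v, true
}
```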

pkg/util: Add concurrent package

The concurrent package implements various primitives
around asynchronous execution.

concurrent.Executor defines an executor that can
execute functions.

Of course, Go has a perfectly good go func() mechanism
to execute concurrent code. However, sometimes the caller
wants to avoid spinning up many goroutines in short bursts.
Doing so tends to negatively impact the Go runtime and cause
spikes in latency. concurrent.NewWorkQueue implements
a mechanism whereby the caller may create a work queue --
a queue of closures -- that will run on a bounded
number of worker goroutines.
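
For illustration, here is the bounded-worker idea in plain Go using a fixed-size channel. This is only the general pattern the description refers to; the actual concurrent package presumably backs its work queue with the chunked queue above, and its constructor and method names may differ:

```go
package main

import "sync"

func main() {
	const numWorkers = 4
	work := make(chan func(), 1024) // queue of closures

	// A bounded number of long-lived workers drain the queue, instead of
	// spawning one goroutine per task in a burst.
	var wg sync.WaitGroup
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for fn := range work {
				fn()
			}
		}()
	}

	// Producers enqueue closures.
	for i := 0; i < 100; i++ {
		i := i
		work <- func() { _ = i * i } // stand-in for real work
	}

	close(work) // no more work; workers exit once drained
	wg.Wait()
}
```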

Epic: None

Release note: None

@cockroach-teamcity (Member):

This change is Reviewable

Yevgeniy Miretskiy added 2 commits April 7, 2023 17:14
Add a generic `queue.Queue[T]` container.
Supports the `Push()` operation, and both FIFO
and LIFO consumption (`PopFront()`, `Pop()`).

This implementation allocates chunks of []T to
amortize memory allocations.

Release note: None

The `concurrent` package implements various primitives
around asynchronous execution.

`concurrent.Executor` defines an executor that can
execute functions.

Of course, Go has a perfectly good `go func()` mechanism
to execute concurrent code.  However, sometimes the caller
wants to avoid spinning up many goroutines in short bursts.
Doing so tends to negatively impact the Go runtime and cause
spikes in latency.  `concurrent.NewWorkQueue` implements
a mechanism whereby the caller may create a work queue --
a queue of closures -- that will run on a bounded
number of worker goroutines.

Release note: None
@erikgrinaker (Contributor):

A generic high-performance FIFO/LIFO queue with amortized allocation makes sense. We have another such implementation in the Raft scheduler that it might make sense to combine with this (we'll likely remove the prioritization there shortly in #101023):

// rangeIDQueue is a chunked queue of range IDs. Instead of a separate list
// element for every range ID, it uses a rangeIDChunk to hold many range IDs,
// amortizing the allocation/GC cost. Using a chunk queue avoids any copying
// that would occur if a slice were used (the copying would occur on slice
// reallocation).
//
// The queue has a naive understanding of priority and fairness. For the most
// part, it implements a FIFO queueing policy with no prioritization of some
// ranges over others. However, the queue can be configured with up to one
// high-priority range, which will always be placed at the front when added.
type rangeIDQueue struct {

I'm not so sure about the executor. I feel like there's always enough variation and subtlety in how we want to do concurrency in various components, and it's trivial to spin up a few goroutines as needed. I also feel like we already have too many different utility functions/frameworks for doing stuff like this. If we have more than three places where it makes sense to reuse exactly this, and we can consolidate some other utility functions, then sure, let's do it, but otherwise I'd be inclined to implement custom workers as needed and instead make them easy to construct.

In any case, this seems a bit overkill for the mux rangefeed shutdown case, since there's only a single worker and performance/allocations don't really matter afaict.

@miretskiy (Contributor, Author):

@erikgrinaker thanks for the review; do you want me to split up this draft and just send the queue portion for "official" review?

@erikgrinaker (Contributor):

Sure.

@pav-kv (Collaborator) commented Apr 11, 2023

@miretskiy Do you have a specific use-case in mind for this queue? Let's compare it with a standard Go slice, which can also be used like a queue: enqueue is a simple slice append, dequeue/pop is `item, slice = slice[0], slice[1:]`.

Specifically, if this implementation is better in some sense, let's have benchmarks?

The standard slice is also allocation-efficient: there are O(log N) allocations for N appends. It's also space-efficient: max 2x overhead. Copy-efficient: amortised O(1) copies per element. But it's not thread-safe.

So there should be selling points. Are we aiming to implement a thread-safe/lock-free queue? Which also does not copy entries around, so pointers to its elements stay valid across appends?

Are we basically implementing a var-size channel?
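
Spelled out, the slice-as-queue pattern being compared against (a runnable sketch):

```go
package main

import "fmt"

func main() {
	// enqueue: plain append; O(log N) reallocations over N appends,
	// amortized O(1) copies per element, at most ~2x space overhead.
	var q []int
	for i := 0; i < 5; i++ {
		q = append(q, i)
	}

	// dequeue (FIFO): constant time, but the backing array is not shrunk,
	// so dequeued elements remain reachable until a later append reallocates.
	var item int
	item, q = q[0], q[1:]
	fmt.Println(item, q) // 0 [1 2 3 4]

	// pop (LIFO) from the same slice:
	item, q = q[len(q)-1], q[:len(q)-1]
	fmt.Println(item, q) // 4 [1 2 3]
}
```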

@miretskiy (Contributor, Author):

> @miretskiy Do you have a specific use-case in mind for this queue? Let's compare it with a standard Go slice, which can also be used like a queue: enqueue is a simple slice append, dequeue/pop is `item, slice = slice[0], slice[1:]`.

Sure; and in the above case, the underlying slice array might stick around for a while.
Do many pops like this, and you will wind up with mysterious memory that can't be GCed.
Of course, there are tricks you can use -- copy the tail of the queue eventually -- but then,
I guess, a slice is not that easy to use. Anyway, if this is not used in the muxrf PR, this
will probably remain a draft PR until more use cases arise.

> Specifically, if this implementation is better in some sense, let's have benchmarks?

> The standard slice is also allocation-efficient: there are O(log N) allocations for N appends. It's also space-efficient: max 2x overhead. Copy-efficient: amortised O(1) copies per element. But it's not thread-safe.

> So there should be selling points. Are we aiming to implement a thread-safe queue? Which also does not copy entries around, so pointers to its elements stay valid across appends?

Well, that's at least what the other PR tried to do -- build on top of that queue to provide a more or less efficient, thread-safe producer/consumer queue.

@pav-kv (Collaborator) commented Apr 11, 2023

Also, if we're supporting both LIFO/FIFO, should we name it Deque? https://en.wikipedia.org/wiki/Double-ended_queue

@pav-kv (Collaborator) commented Apr 11, 2023

> Sure; and in the above case, the underlying slice array might stick around for a while.

Yep. It will be released next time an append reallocates. But I agree the lifetime of an element would be slightly less controlled. This could be worked around as you say: we could sometimes force the reallocation.

With the chunked queue this is true too. An element stays in the chunk until the chunk is released, which generally can happen after a while too (when dequeues pass the chunk border).
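
One way that workaround could look: track a head index and copy the live tail into a fresh slice once the dead prefix grows large. The type name and threshold below are arbitrary choices for this sketch:

```go
// sliceq is an illustrative sketch, not part of the PR.
package sliceq

// sliceQueue is a slice-backed FIFO that bounds retained memory by
// occasionally forcing a reallocation.
type sliceQueue[T any] struct {
	buf  []T
	head int // index of the next element to dequeue
}

func (q *sliceQueue[T]) enqueue(v T) { q.buf = append(q.buf, v) }

func (q *sliceQueue[T]) dequeue() (T, bool) {
	var zero T
	if q.head == len(q.buf) {
		return zero, false
	}
	v := q.buf[q.head]
	q.buf[q.head] = zero // drop the reference so the element can be GCed
	q.head++
	// Force a reallocation once more than half of the backing array is dead,
	// letting the old array be garbage collected.
	if q.head > 32 && q.head > len(q.buf)/2 {
		q.buf = append([]T(nil), q.buf[q.head:]...)
		q.head = 0
	}
	return v, true
}
```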

@erikgrinaker (Contributor):

The reason for the rangeIDQueue in the Raft scheduler is precisely to avoid this copying on reallocations.

// rangeIDQueue is a chunked queue of range IDs. Instead of a separate list
// element for every range ID, it uses a rangeIDChunk to hold many range IDs,
// amortizing the allocation/GC cost. Using a chunk queue avoids any copying
// that would occur if a slice were used (the copying would occur on slice
// reallocation).

Those and the chunked reallocations (as opposed to ever-growing ones) are the main advantages over a simple slice. I'd only really consider a chunk queue in performance-critical high-throughput hot paths like the Raft scheduler.

@miretskiy (Contributor, Author):

> Those and the chunked reallocations (as opposed to ever-growing ones) are the main advantages over a simple slice. I'd only really consider a chunk queue in performance-critical high-throughput hot paths like the Raft scheduler.

Can't really put CDC in the same realm as Raft, but we had to switch to chunked allocation for the
same reason: many events arriving caused many small allocations, as opposed to chunked allocations.

@pav-kv (Collaborator) commented Apr 11, 2023

Note that the raft queue has a fixed chunk size. In this PR the chunk size increases depending on the queue length (with min and max limits on the chunk size). This policy could be made a parameter, or we could use the same for simplicity.

@erikgrinaker would there be a benefit in making this queue [semi-]lock-free? Currently the raft one is used under a lock, right? But the lock is really only needed for allocating / linking in a new chunk. Within a chunk, appends/pops could be lock-free using some compare-and-swaps.
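
A rough sketch of the "lock-free within a chunk" idea for the multi-producer append path. The type, field names, and chunk size are made up for illustration; a complete implementation would also need a per-slot "ready" marker so a consumer never reads a slot before the producer's write lands:

```go
package concurrentsketch

import "sync/atomic"

// mpChunk lets multiple producers claim slots without a lock; the queue lock
// would only be needed when a full chunk forces linking in a new one.
type mpChunk[T any] struct {
	buf   [256]T
	write atomic.Int64 // next slot to claim
}

// tryPush reserves a slot with an atomic add and writes into it. It returns
// false when the chunk is full, in which case the caller takes the queue
// lock and links in a fresh chunk.
func (c *mpChunk[T]) tryPush(v T) bool {
	idx := c.write.Add(1) - 1
	if idx >= int64(len(c.buf)) {
		return false
	}
	c.buf[idx] = v
	return true
}
```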

@pav-kv (Collaborator) commented Apr 11, 2023

Also, the raft queue seems to be multi-producer/single-consumer? This could also be optimized for.

The complicating bit is that the raft queue is integrated with the scheduler. The lock is used both for the queue, and for the scheduler bits. So, to take advantage of a lock-free queue, we would need to untangle the scheduler a bit.

Maybe there is no large benefit in deduping the enqueues (the invariant in the raft scheduler is that a range ID is present in the queue only once). We could make it a bit more best-effort/at-least-once, if the "less locking" benefit outweighs the memory savings. We could probably still retain exactly-once though, with some workarounds.

@miretskiy (Contributor, Author):

> Note that the raft queue has a fixed chunk size. In this PR the chunk size increases depending on the queue length (with min and max limits on the chunk size). This policy could be made a parameter, or we could use the same for simplicity.

> @erikgrinaker would there be a benefit in making this queue [semi-]lock-free? Currently the raft one is used under a lock, right? But the lock is really only needed for allocating / linking in a new chunk. Within a chunk, appends/pops could be lock-free using some compare-and-swaps.

I can certainly make it lock-free. As for the variable chunk size, well, that's just trying to be
a bit fancy: smaller chunks for a smaller queue.

@pav-kv (Collaborator) commented Apr 11, 2023

Also, the raft scheduler has 2 kinds of queues:

  • raftReceiveQueue, which is per-range. It is multi-producer/single-consumer (the consumer is the range under raftMu?).
  • The scheduling queue that keeps the "dirty" ranges. Also multi-producer/single-consumer (the consumer is the scheduler shard).

Seemingly, we could use a generic chunked lock-free queue for both cases.

@erikgrinaker (Contributor):

> The complicating bit is that the raft queue is integrated with the scheduler. The lock is used both for the queue, and for the scheduler bits. So, to take advantage of a lock-free queue, we would need to untangle the scheduler a bit.

Yeah, this is going to require a larger restructuring of the scheduler. Could try some quick experiments just to see how large the gains would be (if any). Want to write up an issue?

@pav-kv (Collaborator) commented Apr 11, 2023

@erikgrinaker Yeah, I can write it up. I'd also be up for experimenting, to get a feel for this approach, as it might end up useful in the rangefeeds work.

@miretskiy (Contributor, Author):

If you don't mind, I'll spend some time trying to make this a lock-free deque, to see how it performs.

