Skip to content

Commit

Permalink
feat: Add QueuedChannel
Browse files Browse the repository at this point in the history
Allows the user to queue items on a channel that may be blocked waiting
for reads, without having to wait for those read to complete.
  • Loading branch information
LBeernaertProton committed Aug 30, 2022
1 parent 15b3218 commit 5014519
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 0 deletions.
8 changes: 8 additions & 0 deletions internal/queue/ctqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@ func NewCTQueue[T any]() *CTQueue[T] {
}
}

func NewCTQueueWithCapacity[T any](capacity int) *CTQueue[T] {
return &CTQueue[T]{
items: make([]T, 0, capacity),
cond: sync.NewCond(&sync.Mutex{}),
closed: 0,
}
}

func (ctq *CTQueue[T]) Push(val T) bool {
ctq.cond.L.Lock()
defer ctq.cond.L.Unlock()
Expand Down
49 changes: 49 additions & 0 deletions internal/queue/queued_channel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package queue

// QueuedChannel represents a channel on which queued items can be published without having to worry if the reader
// has actually consumed existing items first or if there's no way of knowing ahead of time what the ideal channel
// buffer size should be.
type QueuedChannel[T any] struct {
ch chan T
closeCh chan struct{}
queue *CTQueue[T]
}

func NewQueuedChannel[T any](channelBufferSize int, capacity int) *QueuedChannel[T] {
queue := &QueuedChannel[T]{
ch: make(chan T, channelBufferSize),
queue: NewCTQueueWithCapacity[T](capacity),
closeCh: make(chan struct{}),
}

go func() {
for {
item, ok := queue.queue.Pop()
if !ok {
return
}

select {
case queue.ch <- item:

case <-queue.closeCh:
return
}
}
}()

return queue
}

func (q *QueuedChannel[T]) Queue(items ...T) bool {
return q.queue.PushMany(items...)
}

func (q *QueuedChannel[T]) GetChannel() <-chan T {
return q.ch
}

func (q *QueuedChannel[T]) Close() {
q.queue.Close()
close(q.closeCh)
}

0 comments on commit 5014519

Please sign in to comment.