Skip to content

Commit

Permalink
Adds unbuffered worker (#13)
Browse files Browse the repository at this point in the history
* Adds unbuffered worker

* remove interface method
  • Loading branch information
aidenwallis authored Aug 22, 2022
1 parent a107145 commit 120b323
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 1 deletion.
3 changes: 2 additions & 1 deletion worker/buffered_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type BufferedPool interface {
// compile time assertion
var _ BufferedPool = (*BufferedPoolImpl)(nil)

// PoolImpl is an implementation that is compatible with Pool
// BufferedPoolImpl is an implementation that is compatible with Pool
type BufferedPoolImpl struct {
closeOnce sync.Once

Expand All @@ -31,6 +31,7 @@ type BufferedPoolImpl struct {
WaitGroup sync.WaitGroup
}

// NewBufferedPool creates a new instance of BufferedPoolImpl
func NewBufferedPool(workers, maxQueue int) *BufferedPoolImpl {
w := &BufferedPoolImpl{
Tasks: make(chan func(), maxQueue),
Expand Down
43 changes: 43 additions & 0 deletions worker/unbuffered_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package worker

import "sync"

// UnbufferedPool is an interface which implements BufferedPoolImpl, allowing you to mock out the pool for tests.
type UnbufferedPool interface {
// Do enqueues a task and blocks the goroutine until it's enqueued.
Do(func())

// Wait blocks the goroutine until all tasks are complete.
Wait()
}

// compile time assertion
var _ UnbufferedPool = (*UnbufferedPoolImpl)(nil)

// UnbufferedPoolImpl is an implementation that is compatible with UnbufferedPool.
//
// The main purpose of this tool is to let you ensure all goroutines have been closed before exiting your app, you can
// pass an instance of this around everywhere instead.
type UnbufferedPoolImpl struct {
// WaitGroup tracks how many running/queued tasks there are, we expose Wait() so you can wait until all tasks are complete.
WaitGroup sync.WaitGroup
}

// NewUnbufferedPool creates a new instance of BufferedPoolImpl
func NewUnbufferedPool() *UnbufferedPoolImpl {
return &UnbufferedPoolImpl{}
}

// Do increments the wait group and invokes the goroutine, then decrements it.
func (w *UnbufferedPoolImpl) Do(cb func()) {
w.WaitGroup.Add(1)
go func(cb func()) {
cb()
w.WaitGroup.Done()
}(cb)
}

// Wait blocks the goroutine until all tasks are complete.
func (w *UnbufferedPoolImpl) Wait() {
w.WaitGroup.Wait()
}
32 changes: 32 additions & 0 deletions worker/unbuffered_pool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package worker_test

import (
"sync"
"testing"

"github.com/aidenwallis/go-utils/worker"
"github.com/stretchr/testify/assert"
)

func TestUnbufferedPool(t *testing.T) {
t.Parallel()

p := worker.NewUnbufferedPool()

var (
m sync.Mutex
v int
)

for i := 0; i < 10; i++ {
p.Do(func() {
m.Lock()
v++
m.Unlock()
})
}

p.Wait()

assert.Equal(t, 10, v)
}

0 comments on commit 120b323

Please sign in to comment.