Skip to content

Commit

Permalink
✅ create internal/worker/queue test (#606)
Browse files Browse the repository at this point in the history
* ✅ create internal/worker/queue test

Signed-off-by: vankichi <[email protected]>

* ✅ add comment

Signed-off-by: vankichi <[email protected]>

* ✅ fix

Signed-off-by: vankichi <[email protected]>

* ✅ fix spell

Signed-off-by: vankichi <[email protected]>

* ✅ fix

Signed-off-by: vankichi <[email protected]>

* ✅ fix

Signed-off-by: vankichi <[email protected]>

* ✅ fix

Signed-off-by: vankichi <[email protected]>

* ✅ fix

Signed-off-by: vankichi <[email protected]>

* ✅ fix

Signed-off-by: vankichi <[email protected]>

* ✅ fix

Signed-off-by: vankichi <[email protected]>
  • Loading branch information
vankichi authored Aug 6, 2020
1 parent 9cd5fc9 commit 5698abc
Show file tree
Hide file tree
Showing 2 changed files with 534 additions and 316 deletions.
12 changes: 12 additions & 0 deletions internal/worker/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/vdaas/vald/internal/safety"
)

// Queue represents the interface of queue.
type Queue interface {
Start(ctx context.Context) (<-chan error, error)
Push(ctx context.Context, job JobFunc) error
Expand All @@ -45,6 +46,7 @@ type queue struct {
running atomic.Value
}

// NewQueue returns Queue if no error is occurred.
func NewQueue(opts ...QueueOption) (Queue, error) {
q := new(queue)
for _, opt := range append(defaultQueueOpts, opts...) {
Expand All @@ -62,13 +64,17 @@ func NewQueue(opts ...QueueOption) (Queue, error) {
return q, nil
}

// Start starts execute queueing if queue is not runnnig.
// If queue is already reunning, it returns error.
// It returns the error channel that the queueing job return.
func (q *queue) Start(ctx context.Context) (<-chan error, error) {
if q.isRunning() {
return nil, errors.ErrQueueIsAlreadyRunning()
}

ech := make(chan error, 1)
q.eg.Go(safety.RecoverFunc(func() (err error) {
defer close(ech)
defer close(q.outCh)
defer close(q.inCh)
defer q.running.Store(false)
Expand Down Expand Up @@ -102,10 +108,13 @@ func (q *queue) Start(ctx context.Context) (<-chan error, error) {
return ech, nil
}

// isRunning returns true when queue is already running or false when queue is not running.
func (q *queue) isRunning() bool {
return q.running.Load().(bool)
}

// Push sends JobFunc to channel, which will be used for stock JobFunc, when JobFunc is not nil and queue is running.
// If JobFunc is nil or queue is not running, Push returns error.
func (q *queue) Push(ctx context.Context, job JobFunc) error {
if job == nil {
return errors.ErrJobFuncIsNil()
Expand All @@ -123,6 +132,8 @@ func (q *queue) Push(ctx context.Context, job JobFunc) error {
}
}

// Pop returns (JobFunc, nil) if the channnel, which will be used for queuing job, contains JobFunc.
// It returns (nil ,error) if it failed to pop from the job queue.
func (q *queue) Pop(ctx context.Context) (JobFunc, error) {
return q.pop(ctx, q.Len())
}
Expand All @@ -149,6 +160,7 @@ func (q *queue) pop(ctx context.Context, retry uint64) (JobFunc, error) {
return q.pop(ctx, retry)
}

// Len returns the length of queue.
func (q *queue) Len() uint64 {
return q.qLen.Load().(uint64)
}
Loading

0 comments on commit 5698abc

Please sign in to comment.