Skip to content

Commit

Permalink
exposes asyncOperationProcessor (thanos-io#6908)
Browse files Browse the repository at this point in the history
expose struct



fix lint

Signed-off-by: Ben Ye <[email protected]>
  • Loading branch information
yeya24 authored and douglascamata committed Dec 13, 2023
1 parent 5a51d4e commit 0d21207
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 15 deletions.
22 changes: 15 additions & 7 deletions pkg/cacheutil/async_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,15 @@ package cacheutil

import (
"sync"

"github.com/pkg/errors"
)

var (
ErrAsyncBufferFull = errors.New("the async buffer is full")
)

type asyncOperationProcessor struct {
type AsyncOperationProcessor struct {
// Channel used to notify internal goroutines when they should quit.
stop chan struct{}

Expand All @@ -18,8 +24,9 @@ type asyncOperationProcessor struct {
workers sync.WaitGroup
}

func newAsyncOperationProcessor(bufferSize, concurrency int) *asyncOperationProcessor {
p := &asyncOperationProcessor{
// NewAsyncOperationProcessor creates an async processor with given bufferSize and concurrency.
func NewAsyncOperationProcessor(bufferSize, concurrency int) *AsyncOperationProcessor {
p := &AsyncOperationProcessor{
stop: make(chan struct{}, 1),
asyncQueue: make(chan func(), bufferSize),
}
Expand All @@ -32,14 +39,14 @@ func newAsyncOperationProcessor(bufferSize, concurrency int) *asyncOperationProc
return p
}

func (p *asyncOperationProcessor) Stop() {
func (p *AsyncOperationProcessor) Stop() {
close(p.stop)

// Wait until all workers have terminated.
p.workers.Wait()
}

func (p *asyncOperationProcessor) asyncQueueProcessLoop() {
func (p *AsyncOperationProcessor) asyncQueueProcessLoop() {
defer p.workers.Done()

for {
Expand All @@ -52,11 +59,12 @@ func (p *asyncOperationProcessor) asyncQueueProcessLoop() {
}
}

func (p *asyncOperationProcessor) enqueueAsync(op func()) error {
// EnqueueAsync enqueues op to async queue. If enqueue failed, ErrAsyncBufferFull is returned.
func (p *AsyncOperationProcessor) EnqueueAsync(op func()) error {
select {
case p.asyncQueue <- op:
return nil
default:
return errMemcachedAsyncBufferFull
return ErrAsyncBufferFull
}
}
9 changes: 4 additions & 5 deletions pkg/cacheutil/memcached_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ const (
)

var (
errMemcachedAsyncBufferFull = errors.New("the async buffer is full")
errMemcachedConfigNoAddrs = errors.New("no memcached addresses provided")
errMemcachedDNSUpdateIntervalNotPositive = errors.New("DNS provider update interval must be positive")
errMemcachedMaxAsyncConcurrencyNotPositive = errors.New("max async concurrency must be positive")
Expand Down Expand Up @@ -195,7 +194,7 @@ type memcachedClient struct {
duration *prometheus.HistogramVec
dataSize *prometheus.HistogramVec

p *asyncOperationProcessor
p *AsyncOperationProcessor
}

// AddressProvider performs node address resolution given a list of clusters.
Expand Down Expand Up @@ -278,7 +277,7 @@ func newMemcachedClient(
config.MaxGetMultiConcurrency,
gate.Gets,
),
p: newAsyncOperationProcessor(config.MaxAsyncBufferSize, config.MaxAsyncConcurrency),
p: NewAsyncOperationProcessor(config.MaxAsyncBufferSize, config.MaxAsyncConcurrency),
}

c.clientInfo = promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{
Expand Down Expand Up @@ -372,7 +371,7 @@ func (c *memcachedClient) SetAsync(key string, value []byte, ttl time.Duration)
return nil
}

err := c.p.enqueueAsync(func() {
err := c.p.EnqueueAsync(func() {
start := time.Now()
c.operations.WithLabelValues(opSet).Inc()

Expand Down Expand Up @@ -400,7 +399,7 @@ func (c *memcachedClient) SetAsync(key string, value []byte, ttl time.Duration)
c.duration.WithLabelValues(opSet).Observe(time.Since(start).Seconds())
})

if errors.Is(err, errMemcachedAsyncBufferFull) {
if errors.Is(err, ErrAsyncBufferFull) {
c.skipped.WithLabelValues(opSet, reasonAsyncBufferFull).Inc()
level.Debug(c.logger).Log("msg", "failed to store item to memcached because the async buffer is full", "err", err, "size", len(c.p.asyncQueue))
return nil
Expand Down
6 changes: 3 additions & 3 deletions pkg/cacheutil/redis_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ type RedisClient struct {
durationSetMulti prometheus.Observer
durationGetMulti prometheus.Observer

p *asyncOperationProcessor
p *AsyncOperationProcessor
}

// NewRedisClient makes a new RedisClient.
Expand Down Expand Up @@ -221,7 +221,7 @@ func NewRedisClientWithConfig(logger log.Logger, name string, config RedisClient
client: client,
config: config,
logger: logger,
p: newAsyncOperationProcessor(config.MaxAsyncBufferSize, config.MaxAsyncConcurrency),
p: NewAsyncOperationProcessor(config.MaxAsyncBufferSize, config.MaxAsyncConcurrency),
getMultiGate: gate.New(
extprom.WrapRegistererWithPrefix("thanos_redis_getmulti_", reg),
config.MaxGetMultiConcurrency,
Expand All @@ -247,7 +247,7 @@ func NewRedisClientWithConfig(logger log.Logger, name string, config RedisClient

// SetAsync implement RemoteCacheClient.
func (c *RedisClient) SetAsync(key string, value []byte, ttl time.Duration) error {
return c.p.enqueueAsync(func() {
return c.p.EnqueueAsync(func() {
start := time.Now()
if err := c.client.Do(context.Background(), c.client.B().Set().Key(key).Value(rueidis.BinaryString(value)).ExSeconds(int64(ttl.Seconds())).Build()).Error(); err != nil {
level.Warn(c.logger).Log("msg", "failed to set item into redis", "err", err, "key", key, "value_size", len(value))
Expand Down

0 comments on commit 0d21207

Please sign in to comment.