diff --git a/pkg/cacheutil/async_op.go b/pkg/cacheutil/async_op.go index 524d2fdba2..f03f1d0827 100644 --- a/pkg/cacheutil/async_op.go +++ b/pkg/cacheutil/async_op.go @@ -5,9 +5,15 @@ package cacheutil import ( "sync" + + "github.com/pkg/errors" +) + +var ( + ErrAsyncBufferFull = errors.New("the async buffer is full") ) -type asyncOperationProcessor struct { +type AsyncOperationProcessor struct { // Channel used to notify internal goroutines when they should quit. stop chan struct{} @@ -18,8 +24,9 @@ type asyncOperationProcessor struct { workers sync.WaitGroup } -func newAsyncOperationProcessor(bufferSize, concurrency int) *asyncOperationProcessor { - p := &asyncOperationProcessor{ +// NewAsyncOperationProcessor creates an async processor with given bufferSize and concurrency. +func NewAsyncOperationProcessor(bufferSize, concurrency int) *AsyncOperationProcessor { + p := &AsyncOperationProcessor{ stop: make(chan struct{}, 1), asyncQueue: make(chan func(), bufferSize), } @@ -32,14 +39,14 @@ func newAsyncOperationProcessor(bufferSize, concurrency int) *asyncOperationProc return p } -func (p *asyncOperationProcessor) Stop() { +func (p *AsyncOperationProcessor) Stop() { close(p.stop) // Wait until all workers have terminated. p.workers.Wait() } -func (p *asyncOperationProcessor) asyncQueueProcessLoop() { +func (p *AsyncOperationProcessor) asyncQueueProcessLoop() { defer p.workers.Done() for { @@ -52,11 +59,12 @@ func (p *asyncOperationProcessor) asyncQueueProcessLoop() { } } -func (p *asyncOperationProcessor) enqueueAsync(op func()) error { +// EnqueueAsync enqueues op to async queue. If enqueue failed, ErrAsyncBufferFull is returned. +func (p *AsyncOperationProcessor) EnqueueAsync(op func()) error { select { case p.asyncQueue <- op: return nil default: - return errMemcachedAsyncBufferFull + return ErrAsyncBufferFull } } diff --git a/pkg/cacheutil/memcached_client.go b/pkg/cacheutil/memcached_client.go index 55d07d9d5a..29caaed02b 100644 --- a/pkg/cacheutil/memcached_client.go +++ b/pkg/cacheutil/memcached_client.go @@ -40,7 +40,6 @@ const ( ) var ( - errMemcachedAsyncBufferFull = errors.New("the async buffer is full") errMemcachedConfigNoAddrs = errors.New("no memcached addresses provided") errMemcachedDNSUpdateIntervalNotPositive = errors.New("DNS provider update interval must be positive") errMemcachedMaxAsyncConcurrencyNotPositive = errors.New("max async concurrency must be positive") @@ -195,7 +194,7 @@ type memcachedClient struct { duration *prometheus.HistogramVec dataSize *prometheus.HistogramVec - p *asyncOperationProcessor + p *AsyncOperationProcessor } // AddressProvider performs node address resolution given a list of clusters. @@ -278,7 +277,7 @@ func newMemcachedClient( config.MaxGetMultiConcurrency, gate.Gets, ), - p: newAsyncOperationProcessor(config.MaxAsyncBufferSize, config.MaxAsyncConcurrency), + p: NewAsyncOperationProcessor(config.MaxAsyncBufferSize, config.MaxAsyncConcurrency), } c.clientInfo = promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{ @@ -372,7 +371,7 @@ func (c *memcachedClient) SetAsync(key string, value []byte, ttl time.Duration) return nil } - err := c.p.enqueueAsync(func() { + err := c.p.EnqueueAsync(func() { start := time.Now() c.operations.WithLabelValues(opSet).Inc() @@ -400,7 +399,7 @@ func (c *memcachedClient) SetAsync(key string, value []byte, ttl time.Duration) c.duration.WithLabelValues(opSet).Observe(time.Since(start).Seconds()) }) - if errors.Is(err, errMemcachedAsyncBufferFull) { + if errors.Is(err, ErrAsyncBufferFull) { c.skipped.WithLabelValues(opSet, reasonAsyncBufferFull).Inc() level.Debug(c.logger).Log("msg", "failed to store item to memcached because the async buffer is full", "err", err, "size", len(c.p.asyncQueue)) return nil diff --git a/pkg/cacheutil/redis_client.go b/pkg/cacheutil/redis_client.go index 1ef3755ca7..032ed2942d 100644 --- a/pkg/cacheutil/redis_client.go +++ b/pkg/cacheutil/redis_client.go @@ -149,7 +149,7 @@ type RedisClient struct { durationSetMulti prometheus.Observer durationGetMulti prometheus.Observer - p *asyncOperationProcessor + p *AsyncOperationProcessor } // NewRedisClient makes a new RedisClient. @@ -221,7 +221,7 @@ func NewRedisClientWithConfig(logger log.Logger, name string, config RedisClient client: client, config: config, logger: logger, - p: newAsyncOperationProcessor(config.MaxAsyncBufferSize, config.MaxAsyncConcurrency), + p: NewAsyncOperationProcessor(config.MaxAsyncBufferSize, config.MaxAsyncConcurrency), getMultiGate: gate.New( extprom.WrapRegistererWithPrefix("thanos_redis_getmulti_", reg), config.MaxGetMultiConcurrency, @@ -247,7 +247,7 @@ func NewRedisClientWithConfig(logger log.Logger, name string, config RedisClient // SetAsync implement RemoteCacheClient. func (c *RedisClient) SetAsync(key string, value []byte, ttl time.Duration) error { - return c.p.enqueueAsync(func() { + return c.p.EnqueueAsync(func() { start := time.Now() if err := c.client.Do(context.Background(), c.client.B().Set().Key(key).Value(rueidis.BinaryString(value)).ExSeconds(int64(ttl.Seconds())).Build()).Error(); err != nil { level.Warn(c.logger).Log("msg", "failed to set item into redis", "err", err, "key", key, "value_size", len(value))