Pass a context to the query worker pool (#3350)
* Pass a context to the query worker pool

This ensures requests that have already timed out don't stick around
waiting for a worker to pick them up.

* Use std ctx

* Add fast worker pools to amortize ctx checking

The fast wrappers let a caller amortize the cost of checking the
context's Done channel over batchSize iterations.
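For illustration, a minimal sketch of how a caller might use the new wrapper — assuming the xsync alias for github.com/m3db/m3/src/x/sync used elsewhere in this repo; the pool size, batch size, timeout, and task body are invented for the example, not values from this commit:

package main

import (
	"context"
	"fmt"
	"time"

	xsync "github.com/m3db/m3/src/x/sync"
)

func main() {
	// Build and start a pooled worker pool, as the tests in this commit do.
	pool, err := xsync.NewPooledWorkerPool(4, xsync.NewPooledWorkerPoolOptions())
	if err != nil {
		panic(err)
	}
	pool.Init()

	// Wrap the pool so ctx.Done() is consulted on only one call out of
	// every 100, amortizing the select over the batch.
	fast := pool.FastContextCheck(100)

	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	for i := 0; i < 1000; i++ {
		i := i
		if !fast.GoWithContext(ctx, func() { fmt.Println("task", i) }) {
			// The context is already done: stop scheduling rather than
			// leaving work queued behind a busy worker.
			break
		}
	}
}

The returned bool mirrors pooledWorkerPool.GoWithContext: false means the context's Done channel fired before the task was handed off, so the caller can bail out early, as toPromConcurrently does below.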
ryanhall07 authored Mar 31, 2021
1 parent cc58917 commit 1843061
Showing 12 changed files with 242 additions and 12 deletions.
5 changes: 5 additions & 0 deletions src/dbnode/integration/query_cancellation_client_test.go
@@ -179,6 +179,7 @@ type mockWorkerPool struct {
hook func(ctx context.Context)
}


func newMockWorkerPool() *mockWorkerPool {
return &mockWorkerPool{}
}
@@ -216,3 +217,7 @@ func (p *mockWorkerPool) GoWithContext(ctx context.Context, work xsync.Work) bool {
go func() { work() }()
return true
}

func (p *mockWorkerPool) FastContextCheck(batchSize int) xsync.PooledWorkerPool {
return p
}
1 change: 1 addition & 0 deletions src/query/remote/client.go
@@ -243,6 +243,7 @@ func (c *grpcClient) FetchProm(
}

return storage.SeriesIteratorsToPromResult(
ctx,
result,
c.opts.ReadWorkerPool(),
c.opts.TagOptions())
1 change: 1 addition & 0 deletions src/query/storage/m3/storage.go
@@ -116,6 +116,7 @@ func (s *m3storage) FetchProm(

result.Metadata.Resolutions = resolutions
fetchResult, err := storage.SeriesIteratorsToPromResult(
ctx,
result,
s.opts.ReadWorkerPool(),
s.opts.TagOptions(),
18 changes: 15 additions & 3 deletions src/query/storage/prom_converter.go
@@ -21,6 +21,7 @@
package storage

import (
"context"
"sync"

"github.com/m3db/m3/src/dbnode/encoding"
@@ -88,6 +89,7 @@ func toPromSequentially(
}

func toPromConcurrently(
ctx context.Context,
fetchResult consolidators.SeriesFetchResult,
readWorkerPool xsync.PooledWorkerPool,
tagOptions models.TagOptions,
@@ -101,6 +103,7 @@
mu sync.Mutex
)

fastWorkerPool := readWorkerPool.FastContextCheck(100)
for i := 0; i < count; i++ {
i := i
iter, tags, err := fetchResult.IterTagsAtIndex(i, tagOptions)
@@ -109,7 +112,7 @@
}

wg.Add(1)
readWorkerPool.Go(func() {
available := fastWorkerPool.GoWithContext(ctx, func() {
defer wg.Done()
series, err := iteratorToPromResult(iter, tags, tagOptions)
if err != nil {
@@ -120,6 +123,13 @@

seriesList[i] = series
})
if !available {
wg.Done()
mu.Lock()
multiErr = multiErr.Add(ctx.Err())
mu.Unlock()
break
}
}

wg.Wait()
@@ -143,6 +153,7 @@
}

func seriesIteratorsToPromResult(
ctx context.Context,
fetchResult consolidators.SeriesFetchResult,
readWorkerPool xsync.PooledWorkerPool,
tagOptions models.TagOptions,
@@ -151,12 +162,13 @@
return toPromSequentially(fetchResult, tagOptions)
}

return toPromConcurrently(fetchResult, readWorkerPool, tagOptions)
return toPromConcurrently(ctx, fetchResult, readWorkerPool, tagOptions)
}

// SeriesIteratorsToPromResult converts raw series iterators directly to a
// Prometheus-compatible result.
func SeriesIteratorsToPromResult(
ctx context.Context,
fetchResult consolidators.SeriesFetchResult,
readWorkerPool xsync.PooledWorkerPool,
tagOptions models.TagOptions,
@@ -166,7 +178,7 @@
return PromResult{}, err
}

promResult, err := seriesIteratorsToPromResult(fetchResult,
promResult, err := seriesIteratorsToPromResult(ctx, fetchResult,
readWorkerPool, tagOptions)
promResult.Metadata = fetchResult.Metadata
return promResult, err
37 changes: 28 additions & 9 deletions src/query/storage/prom_converter_test.go
@@ -21,6 +21,7 @@
package storage

import (
"context"
"testing"
"time"

@@ -77,10 +78,10 @@ func verifyExpandPromSeries(
fetchResult.Metadata = block.ResultMetadata{
Exhaustive: ex,
LocalOnly: true,
Warnings: []block.Warning{block.Warning{Name: "foo", Message: "bar"}},
Warnings: []block.Warning{{Name: "foo", Message: "bar"}},
}

results, err := SeriesIteratorsToPromResult(fetchResult, pools, nil)
results, err := SeriesIteratorsToPromResult(context.Background(), fetchResult, pools, nil)
assert.NoError(t, err)

require.NotNil(t, results)
@@ -91,7 +92,7 @@
require.Equal(t, "foo_bar", results.Metadata.Warnings[0].Header())
require.Equal(t, len(ts), num)
expectedTags := []prompb.Label{
prompb.Label{
{
Name: []byte("foo"),
Value: []byte("bar"),
},
@@ -112,6 +113,24 @@ func testExpandPromSeries(t *testing.T, ex bool, pools xsync.PooledWorkerPool) {
}
}

func TestContextCanceled(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

ctx, cancel := context.WithCancel(context.Background())
cancel()

pool, err := xsync.NewPooledWorkerPool(100, xsync.NewPooledWorkerPoolOptions())
require.NoError(t, err)
pool.Init()

iters := seriesiter.NewMockSeriesIters(ctrl, ident.Tag{}, 1, 2)
fetchResult := fr(t, iters, makeTag("foo", "bar", 1)...)
_, err = SeriesIteratorsToPromResult(ctx, fetchResult, pool, nil)
require.Error(t, err)
require.Contains(t, err.Error(), "context canceled")
}

func TestExpandPromSeriesNilPools(t *testing.T) {
testExpandPromSeries(t, false, nil)
testExpandPromSeries(t, true, nil)
@@ -165,7 +184,7 @@ func TestIteratorsToPromResult(t *testing.T) {
result := FetchResultToPromResult(r, false)
expected := &prompb.QueryResult{
Timeseries: []*prompb.TimeSeries{
&prompb.TimeSeries{
{
Labels: []prompb.Label{{Name: []byte("c"), Value: []byte("d")}},
Samples: []prompb.Sample{{Timestamp: promNow, Value: 1}},
},
@@ -178,11 +197,11 @@
result = FetchResultToPromResult(r, true)
expected = &prompb.QueryResult{
Timeseries: []*prompb.TimeSeries{
&prompb.TimeSeries{
{
Labels: []prompb.Label{{Name: []byte("a"), Value: []byte("b")}},
Samples: []prompb.Sample{},
},
&prompb.TimeSeries{
{
Labels: []prompb.Label{{Name: []byte("c"), Value: []byte("d")}},
Samples: []prompb.Sample{{Timestamp: promNow, Value: 1}},
},
@@ -196,7 +215,7 @@
type overwrite func()

func setupTags(name, value string) (ident.Tags, overwrite) {
var buckets = []pool.Bucket{{Capacity: 100, Count: 2}}
buckets := []pool.Bucket{{Capacity: 100, Count: 2}}
bytesPool := pool.NewCheckedBytesPool(buckets, nil,
func(sizes []pool.Bucket) pool.BytesPool {
return pool.NewBytesPool(sizes, nil)
@@ -302,15 +321,15 @@ func TestDecodeIteratorsWithEmptySeries(t *testing.T) {
}

opts := models.NewTagOptions()
res, err := SeriesIteratorsToPromResult(buildIters(), nil, opts)
res, err := SeriesIteratorsToPromResult(context.Background(), buildIters(), nil, opts)
require.NoError(t, err)
verifyResult(t, res)

pool, err := xsync.NewPooledWorkerPool(10, xsync.NewPooledWorkerPoolOptions())
require.NoError(t, err)
pool.Init()

res, err = SeriesIteratorsToPromResult(buildIters(), pool, opts)
res, err = SeriesIteratorsToPromResult(context.Background(), buildIters(), pool, opts)
require.NoError(t, err)
verifyResult(t, res)
}
62 changes: 62 additions & 0 deletions src/x/sync/fast_pooled_worker_pool.go
@@ -0,0 +1,62 @@
// Copyright (c) 2021 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package sync

import (
"context"
"time"
)

type fastPooledWorkerPool struct {
workerPool PooledWorkerPool
batchSize int
count int
}

var _ PooledWorkerPool = &fastPooledWorkerPool{}

func (f *fastPooledWorkerPool) Init() {
f.workerPool.Init()
}

func (f *fastPooledWorkerPool) Go(work Work) {
f.workerPool.Go(work)
}

func (f *fastPooledWorkerPool) GoWithTimeout(work Work, timeout time.Duration) bool {
return f.workerPool.GoWithTimeout(work, timeout)
}

func (f *fastPooledWorkerPool) GoWithContext(ctx context.Context, work Work) bool {
f.count++
if f.count == 1 {
return f.workerPool.GoWithContext(ctx, work)
}
if f.count == f.batchSize {
f.count = 0
}
f.workerPool.Go(work)
return true
}

func (f *fastPooledWorkerPool) FastContextCheck(batchSize int) PooledWorkerPool {
return &fastPooledWorkerPool{workerPool: f.workerPool, batchSize: batchSize}
}
74 changes: 74 additions & 0 deletions src/x/sync/fast_worker_pool.go
@@ -0,0 +1,74 @@
// Copyright (c) 2021 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package sync

import (
"time"

"github.com/m3db/m3/src/x/context"
)

type fastWorkerPool struct {
workerPool WorkerPool
batchSize int
count int
}

var _ WorkerPool = &fastWorkerPool{}

func (p *fastWorkerPool) Init() {
p.workerPool.Init()
}

func (p *fastWorkerPool) Go(work Work) {
p.workerPool.Go(work)
}

func (p *fastWorkerPool) GoInstrument(work Work) ScheduleResult {
return p.workerPool.GoInstrument(work)
}

func (p *fastWorkerPool) GoIfAvailable(work Work) bool {
return p.workerPool.GoIfAvailable(work)
}

func (p *fastWorkerPool) GoWithTimeout(work Work, timeout time.Duration) bool {
return p.workerPool.GoWithTimeout(work, timeout)
}

func (p *fastWorkerPool) GoWithTimeoutInstrument(work Work, timeout time.Duration) ScheduleResult {
return p.workerPool.GoWithTimeoutInstrument(work, timeout)
}

func (p *fastWorkerPool) GoWithContext(ctx context.Context, work Work) ScheduleResult {
p.count++
if p.count == 1 {
return p.workerPool.GoWithContext(ctx, work)
}
if p.count == p.batchSize {
p.count = 0
}
return p.workerPool.GoInstrument(work)
}

func (p *fastWorkerPool) FastContextCheck(batchSize int) WorkerPool {
return &fastWorkerPool{workerPool: p.workerPool, batchSize: batchSize}
}
4 changes: 4 additions & 0 deletions src/x/sync/pooled_worker_pool.go
@@ -99,6 +99,10 @@ func (p *pooledWorkerPool) GoWithContext(ctx context.Context, work Work) bool {
return p.work(maybeContext{ctx: ctx}, work, 0)
}

func (p *pooledWorkerPool) FastContextCheck(batchSize int) PooledWorkerPool {
return &fastPooledWorkerPool{workerPool: p, batchSize: batchSize}
}

// maybeContext works around the linter about optionally
// passing the context for scenarios where we don't want to use
// context in the APIs.
17 changes: 17 additions & 0 deletions src/x/sync/pooled_worker_pool_test.go
@@ -228,3 +228,20 @@ func TestPooledWorkerPoolSizeTooSmall(t *testing.T) {
_, err := NewPooledWorkerPool(0, NewPooledWorkerPoolOptions())
require.Error(t, err)
}

func TestPooledWorkerFast(t *testing.T) {
wp, err := NewPooledWorkerPool(1, NewPooledWorkerPoolOptions())
require.NoError(t, err)
wp.Init()

fast := wp.FastContextCheck(3)

ctx, cancel := context.WithCancel(context.Background())
cancel()

require.False(t, fast.GoWithContext(ctx, func() {}))
require.True(t, fast.GoWithContext(ctx, func() {}))
require.True(t, fast.GoWithContext(ctx, func() {}))
require.False(t, fast.GoWithContext(ctx, func() {}))
require.True(t, fast.GoWithContext(ctx, func() {}))
}
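To see why these assertions hold: with batchSize 3 and an already-canceled context, the wrapper's counter runs 1, 2, 3 (reset to 0), 1, 2 across the five calls. Only the calls where the counter hits 1 — the first and fourth — consult the context and return false; the second, third, and fifth skip the check and hand the work straight to the pool.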