diff --git a/src/dbnode/integration/query_cancellation_client_test.go b/src/dbnode/integration/query_cancellation_client_test.go
index 261c219ce6..1b54c93959 100644
--- a/src/dbnode/integration/query_cancellation_client_test.go
+++ b/src/dbnode/integration/query_cancellation_client_test.go
@@ -179,6 +179,7 @@ type mockWorkerPool struct {
 	hook func(ctx context.Context)
 }
 
+
 func newMockWorkerPool() *mockWorkerPool {
 	return &mockWorkerPool{}
 }
@@ -216,3 +217,7 @@ func (p *mockWorkerPool) GoWithContext(ctx context.Context, work xsync.Work) boo
 	go func() { work() }()
 	return true
 }
+
+func (p *mockWorkerPool) FastContextCheck(batchSize int) xsync.PooledWorkerPool {
+	return p
+}
diff --git a/src/query/remote/client.go b/src/query/remote/client.go
index e4c6e0ba7d..9a32596cd9 100644
--- a/src/query/remote/client.go
+++ b/src/query/remote/client.go
@@ -243,6 +243,7 @@ func (c *grpcClient) FetchProm(
 	}
 
 	return storage.SeriesIteratorsToPromResult(
+		ctx,
 		result,
 		c.opts.ReadWorkerPool(),
 		c.opts.TagOptions())
diff --git a/src/query/storage/m3/storage.go b/src/query/storage/m3/storage.go
index dadf40fa17..be1a719e2a 100644
--- a/src/query/storage/m3/storage.go
+++ b/src/query/storage/m3/storage.go
@@ -116,6 +116,7 @@ func (s *m3storage) FetchProm(
 	result.Metadata.Resolutions = resolutions
 
 	fetchResult, err := storage.SeriesIteratorsToPromResult(
+		ctx,
 		result,
 		s.opts.ReadWorkerPool(),
 		s.opts.TagOptions(),
diff --git a/src/query/storage/prom_converter.go b/src/query/storage/prom_converter.go
index f0b3869773..437a844e62 100644
--- a/src/query/storage/prom_converter.go
+++ b/src/query/storage/prom_converter.go
@@ -21,6 +21,7 @@
 package storage
 
 import (
+	"context"
 	"sync"
 
 	"github.com/m3db/m3/src/dbnode/encoding"
@@ -88,6 +89,7 @@ func toPromSequentially(
 }
 
 func toPromConcurrently(
+	ctx context.Context,
 	fetchResult consolidators.SeriesFetchResult,
 	readWorkerPool xsync.PooledWorkerPool,
 	tagOptions models.TagOptions,
@@ -101,6 +103,7 @@ func toPromConcurrently(
 		mu         sync.Mutex
 	)
 
+	fastWorkerPool := readWorkerPool.FastContextCheck(100)
 	for i := 0; i < count; i++ {
 		i := i
 		iter, tags, err := fetchResult.IterTagsAtIndex(i, tagOptions)
@@ -109,7 +112,7 @@ func toPromConcurrently(
 		}
 
 		wg.Add(1)
-		readWorkerPool.Go(func() {
+		available := fastWorkerPool.GoWithContext(ctx, func() {
 			defer wg.Done()
 			series, err := iteratorToPromResult(iter, tags, tagOptions)
 			if err != nil {
@@ -120,6 +123,13 @@ func toPromConcurrently(
 
 			seriesList[i] = series
 		})
+		if !available {
+			wg.Done()
+			mu.Lock()
+			multiErr = multiErr.Add(ctx.Err())
+			mu.Unlock()
+			break
+		}
 	}
 
 	wg.Wait()
@@ -143,6 +153,7 @@ func toPromConcurrently(
 }
 
 func seriesIteratorsToPromResult(
+	ctx context.Context,
 	fetchResult consolidators.SeriesFetchResult,
 	readWorkerPool xsync.PooledWorkerPool,
 	tagOptions models.TagOptions,
@@ -151,12 +162,13 @@ func seriesIteratorsToPromResult(
 		return toPromSequentially(fetchResult, tagOptions)
 	}
 
-	return toPromConcurrently(fetchResult, readWorkerPool, tagOptions)
+	return toPromConcurrently(ctx, fetchResult, readWorkerPool, tagOptions)
 }
 
 // SeriesIteratorsToPromResult converts raw series iterators directly to a
 // Prometheus-compatible result.
 func SeriesIteratorsToPromResult(
+	ctx context.Context,
 	fetchResult consolidators.SeriesFetchResult,
 	readWorkerPool xsync.PooledWorkerPool,
 	tagOptions models.TagOptions,
@@ -166,7 +178,7 @@ func SeriesIteratorsToPromResult(
 		return PromResult{}, err
 	}
 
-	promResult, err := seriesIteratorsToPromResult(fetchResult,
+	promResult, err := seriesIteratorsToPromResult(ctx, fetchResult,
 		readWorkerPool, tagOptions)
 	promResult.Metadata = fetchResult.Metadata
 	return promResult, err
diff --git a/src/query/storage/prom_converter_test.go b/src/query/storage/prom_converter_test.go
index 0f74a23f11..ff28525745 100644
--- a/src/query/storage/prom_converter_test.go
+++ b/src/query/storage/prom_converter_test.go
@@ -21,6 +21,7 @@
 package storage
 
 import (
+	"context"
 	"testing"
 	"time"
 
@@ -77,10 +78,10 @@ func verifyExpandPromSeries(
 	fetchResult.Metadata = block.ResultMetadata{
 		Exhaustive: ex,
 		LocalOnly:  true,
-		Warnings:   []block.Warning{block.Warning{Name: "foo", Message: "bar"}},
+		Warnings:   []block.Warning{{Name: "foo", Message: "bar"}},
 	}
 
-	results, err := SeriesIteratorsToPromResult(fetchResult, pools, nil)
+	results, err := SeriesIteratorsToPromResult(context.Background(), fetchResult, pools, nil)
 	assert.NoError(t, err)
 	require.NotNil(t, results)
 
@@ -91,7 +92,7 @@ func verifyExpandPromSeries(
 	require.Equal(t, "foo_bar", results.Metadata.Warnings[0].Header())
 	require.Equal(t, len(ts), num)
 	expectedTags := []prompb.Label{
-		prompb.Label{
+		{
 			Name:  []byte("foo"),
 			Value: []byte("bar"),
 		},
@@ -112,6 +113,24 @@ func testExpandPromSeries(t *testing.T, ex bool, pools xsync.PooledWorkerPool) {
 	}
 }
 
+func TestContextCanceled(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	cancel()
+
+	pool, err := xsync.NewPooledWorkerPool(100, xsync.NewPooledWorkerPoolOptions())
+	require.NoError(t, err)
+	pool.Init()
+
+	iters := seriesiter.NewMockSeriesIters(ctrl, ident.Tag{}, 1, 2)
+	fetchResult := fr(t, iters, makeTag("foo", "bar", 1)...)
+	_, err = SeriesIteratorsToPromResult(ctx, fetchResult, pool, nil)
+	require.Error(t, err)
+	require.Contains(t, err.Error(), "context canceled")
+}
+
 func TestExpandPromSeriesNilPools(t *testing.T) {
 	testExpandPromSeries(t, false, nil)
 	testExpandPromSeries(t, true, nil)
@@ -165,7 +184,7 @@ func TestIteratorsToPromResult(t *testing.T) {
 	result := FetchResultToPromResult(r, false)
 	expected := &prompb.QueryResult{
 		Timeseries: []*prompb.TimeSeries{
-			&prompb.TimeSeries{
+			{
 				Labels:  []prompb.Label{{Name: []byte("c"), Value: []byte("d")}},
 				Samples: []prompb.Sample{{Timestamp: promNow, Value: 1}},
 			},
@@ -178,11 +197,11 @@ func TestIteratorsToPromResult(t *testing.T) {
 	result = FetchResultToPromResult(r, true)
 	expected = &prompb.QueryResult{
 		Timeseries: []*prompb.TimeSeries{
-			&prompb.TimeSeries{
+			{
 				Labels:  []prompb.Label{{Name: []byte("a"), Value: []byte("b")}},
 				Samples: []prompb.Sample{},
 			},
-			&prompb.TimeSeries{
+			{
 				Labels:  []prompb.Label{{Name: []byte("c"), Value: []byte("d")}},
 				Samples: []prompb.Sample{{Timestamp: promNow, Value: 1}},
 			},
@@ -196,7 +215,7 @@ func TestIteratorsToPromResult(t *testing.T) {
 type overwrite func()
 
 func setupTags(name, value string) (ident.Tags, overwrite) {
-	var buckets = []pool.Bucket{{Capacity: 100, Count: 2}}
+	buckets := []pool.Bucket{{Capacity: 100, Count: 2}}
 	bytesPool := pool.NewCheckedBytesPool(buckets, nil,
 		func(sizes []pool.Bucket) pool.BytesPool {
 			return pool.NewBytesPool(sizes, nil)
@@ -302,7 +321,7 @@ func TestDecodeIteratorsWithEmptySeries(t *testing.T) {
 	}
 
 	opts := models.NewTagOptions()
-	res, err := SeriesIteratorsToPromResult(buildIters(), nil, opts)
+	res, err := SeriesIteratorsToPromResult(context.Background(), buildIters(), nil, opts)
 	require.NoError(t, err)
 	verifyResult(t, res)
 
@@ -310,7 +329,7 @@ func TestDecodeIteratorsWithEmptySeries(t *testing.T) {
 	require.NoError(t, err)
 	pool.Init()
 
-	res, err = SeriesIteratorsToPromResult(buildIters(), pool, opts)
+	res, err = SeriesIteratorsToPromResult(context.Background(), buildIters(), pool, opts)
 	require.NoError(t, err)
 	verifyResult(t, res)
 }
diff --git a/src/x/sync/fast_pooled_worker_pool.go b/src/x/sync/fast_pooled_worker_pool.go
new file mode 100644
index 0000000000..ccbefd4e9b
--- /dev/null
+++ b/src/x/sync/fast_pooled_worker_pool.go
@@ -0,0 +1,62 @@
+// Copyright (c) 2021 Uber Technologies, Inc.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+package sync
+
+import (
+	"context"
+	"time"
+)
+
+type fastPooledWorkerPool struct {
+	workerPool PooledWorkerPool
+	batchSize  int
+	count      int
+}
+
+var _ PooledWorkerPool = &fastPooledWorkerPool{}
+
+func (f *fastPooledWorkerPool) Init() {
+	f.workerPool.Init()
+}
+
+func (f *fastPooledWorkerPool) Go(work Work) {
+	f.workerPool.Go(work)
+}
+
+func (f *fastPooledWorkerPool) GoWithTimeout(work Work, timeout time.Duration) bool {
+	return f.workerPool.GoWithTimeout(work, timeout)
+}
+
+func (f *fastPooledWorkerPool) GoWithContext(ctx context.Context, work Work) bool {
+	f.count++
+	if f.count == 1 {
+		return f.workerPool.GoWithContext(ctx, work)
+	}
+	if f.count == f.batchSize {
+		f.count = 0
+	}
+	f.workerPool.Go(work)
+	return true
+}
+
+func (f *fastPooledWorkerPool) FastContextCheck(batchSize int) PooledWorkerPool {
+	return &fastPooledWorkerPool{workerPool: f.workerPool, batchSize: batchSize}
+}
diff --git a/src/x/sync/fast_worker_pool.go b/src/x/sync/fast_worker_pool.go
new file mode 100644
index 0000000000..78b610599a
--- /dev/null
+++ b/src/x/sync/fast_worker_pool.go
@@ -0,0 +1,74 @@
+// Copyright (c) 2021 Uber Technologies, Inc.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+package sync
+
+import (
+	"time"
+
+	"github.com/m3db/m3/src/x/context"
+)
+
+type fastWorkerPool struct {
+	workerPool WorkerPool
+	batchSize  int
+	count      int
+}
+
+var _ WorkerPool = &fastWorkerPool{}
+
+func (p *fastWorkerPool) Init() {
+	p.workerPool.Init()
+}
+
+func (p *fastWorkerPool) Go(work Work) {
+	p.workerPool.Go(work)
+}
+
+func (p *fastWorkerPool) GoInstrument(work Work) ScheduleResult {
+	return p.workerPool.GoInstrument(work)
+}
+
+func (p *fastWorkerPool) GoIfAvailable(work Work) bool {
+	return p.workerPool.GoIfAvailable(work)
+}
+
+func (p *fastWorkerPool) GoWithTimeout(work Work, timeout time.Duration) bool {
+	return p.workerPool.GoWithTimeout(work, timeout)
+}
+
+func (p *fastWorkerPool) GoWithTimeoutInstrument(work Work, timeout time.Duration) ScheduleResult {
+	return p.workerPool.GoWithTimeoutInstrument(work, timeout)
+}
+
+func (p *fastWorkerPool) GoWithContext(ctx context.Context, work Work) ScheduleResult {
+	p.count++
+	if p.count == 1 {
+		return p.workerPool.GoWithContext(ctx, work)
+	}
+	if p.count == p.batchSize {
+		p.count = 0
+	}
+	return p.workerPool.GoInstrument(work)
+}
+
+func (p *fastWorkerPool) FastContextCheck(batchSize int) WorkerPool {
+	return &fastWorkerPool{workerPool: p, batchSize: batchSize}
+}
diff --git a/src/x/sync/pooled_worker_pool.go b/src/x/sync/pooled_worker_pool.go
index 463d06bf2f..b9250709eb 100644
--- a/src/x/sync/pooled_worker_pool.go
+++ b/src/x/sync/pooled_worker_pool.go
@@ -99,6 +99,10 @@ func (p *pooledWorkerPool) GoWithContext(ctx context.Context, work Work) bool {
 	return p.work(maybeContext{ctx: ctx}, work, 0)
 }
 
+func (p *pooledWorkerPool) FastContextCheck(batchSize int) PooledWorkerPool {
+	return &fastPooledWorkerPool{workerPool: p, batchSize: batchSize}
+}
+
 // maybeContext works around the linter about optionally
 // passing the context for scenarios where we don't want to use
 // context in the APIs.
diff --git a/src/x/sync/pooled_worker_pool_test.go b/src/x/sync/pooled_worker_pool_test.go
index a7db7b8910..eb8bcc91e1 100644
--- a/src/x/sync/pooled_worker_pool_test.go
+++ b/src/x/sync/pooled_worker_pool_test.go
@@ -228,3 +228,20 @@ func TestPooledWorkerPoolSizeTooSmall(t *testing.T) {
 	_, err := NewPooledWorkerPool(0, NewPooledWorkerPoolOptions())
 	require.Error(t, err)
 }
+
+func TestPooledWorkerFast(t *testing.T) {
+	wp, err := NewPooledWorkerPool(1, NewPooledWorkerPoolOptions())
+	require.NoError(t, err)
+	wp.Init()
+
+	fast := wp.FastContextCheck(3)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	cancel()
+
+	require.False(t, fast.GoWithContext(ctx, func() {}))
+	require.True(t, fast.GoWithContext(ctx, func() {}))
+	require.True(t, fast.GoWithContext(ctx, func() {}))
+	require.False(t, fast.GoWithContext(ctx, func() {}))
+	require.True(t, fast.GoWithContext(ctx, func() {}))
+}
diff --git a/src/x/sync/types.go b/src/x/sync/types.go
index c7200a114b..6fa797a6cd 100644
--- a/src/x/sync/types.go
+++ b/src/x/sync/types.go
@@ -70,6 +70,13 @@ type PooledWorkerPool interface {
 	// GoWithContext waits until a worker is available or the provided ctx is
 	// canceled.
 	GoWithContext(ctx gocontext.Context, work Work) bool
+
+	// FastContextCheck returns a wrapper worker pool that only checks the context deadline every batchSize calls.
+	// This is useful for tight looping code that wants to amortize the cost of the ctx deadline check over batchSize
+	// iterations.
+	// This should only be used for code that can guarantee the wait time for a worker is low since if the ctx is not
+	// checked the calling goroutine blocks waiting for a worker.
+	FastContextCheck(batchSize int) PooledWorkerPool
 }
 
 // NewPooledWorkerOptions is a set of new instrument worker pool options.
@@ -105,6 +112,13 @@ type WorkerPool interface {
 
 	// GoWithContext waits until a worker is available or the provided ctx is canceled.
 	GoWithContext(ctx context.Context, work Work) ScheduleResult
+
+	// FastContextCheck returns a wrapper worker pool that only checks the context deadline every batchSize calls.
+	// This is useful for tight looping code that wants to amortize the cost of the ctx deadline check over batchSize
+	// iterations.
+	// This should only be used for code that can guarantee the wait time for a worker is low since if the ctx is not
+	// checked the calling goroutine blocks waiting for a worker.
+	FastContextCheck(batchSize int) WorkerPool
 }
 
 // ScheduleResult is the result of scheduling a goroutine in the worker pool.
diff --git a/src/x/sync/worker_pool.go b/src/x/sync/worker_pool.go
index 14790fc4a0..93fad1ec3a 100644
--- a/src/x/sync/worker_pool.go
+++ b/src/x/sync/worker_pool.go
@@ -134,3 +134,7 @@ func (p *workerPool) GoWithContext(ctx context.Context, work Work) ScheduleResul
 		return ScheduleResult{Available: false, WaitTime: time.Since(start)}
 	}
 }
+
+func (p *workerPool) FastContextCheck(batchSize int) WorkerPool {
+	return &fastWorkerPool{workerPool: p, batchSize: batchSize}
+}
diff --git a/src/x/sync/worker_pool_test.go b/src/x/sync/worker_pool_test.go
index fc7e3a1760..047b671918 100644
--- a/src/x/sync/worker_pool_test.go
+++ b/src/x/sync/worker_pool_test.go
@@ -141,3 +141,20 @@ func TestGoWithContext(t *testing.T) {
 	result = wp.GoWithContext(ctx, func() {})
 	require.False(t, result.Available)
 }
+
+func TestFast(t *testing.T) {
+	wp := NewWorkerPool(1)
+	wp.Init()
+
+	fast := wp.FastContextCheck(3)
+
+	goctx, cancel := stdctx.WithCancel(stdctx.Background())
+	cancel()
+	ctx := context.NewWithGoContext(goctx)
+
+	require.False(t, fast.GoWithContext(ctx, func() {}).Available)
+	require.True(t, fast.GoWithContext(ctx, func() {}).Available)
+	require.True(t, fast.GoWithContext(ctx, func() {}).Available)
+	require.False(t, fast.GoWithContext(ctx, func() {}).Available)
+	require.True(t, fast.GoWithContext(ctx, func() {}).Available)
+}
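Illustrative usage (not part of the patch above): a minimal sketch of how a caller could combine the new FastContextCheck wrapper with GoWithContext so a tight scheduling loop stops once the query context is canceled, mirroring the pattern the patch adds to toPromConcurrently. The batch size of 100 matches the value hard-coded in prom_converter.go; the pool size of 10, the loop bound, and the placeholder work function are arbitrary assumptions for this sketch.

package main

import (
	"context"
	"fmt"

	xsync "github.com/m3db/m3/src/x/sync"
)

func main() {
	// Pool size is arbitrary for this sketch.
	pool, err := xsync.NewPooledWorkerPool(10, xsync.NewPooledWorkerPoolOptions())
	if err != nil {
		panic(err)
	}
	pool.Init()

	// Only the first enqueue of each batch of 100 pays for the ctx check;
	// the remaining calls go straight to the underlying pool, amortizing
	// the cost of the deadline check across the batch.
	fast := pool.FastContextCheck(100)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	for i := 0; i < 1000; i++ {
		if !fast.GoWithContext(ctx, func() { /* per-item work would go here */ }) {
			// ctx was canceled before a worker became available; stop scheduling.
			fmt.Println("stopping early:", ctx.Err())
			break
		}
	}
}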