From 9f176c8d747f560db08c6a442d020b2100585724 Mon Sep 17 00:00:00 2001 From: Ryan Allen Date: Fri, 4 Sep 2020 17:49:08 -0400 Subject: [PATCH] [query] Series iterators processor (#2512) --- .../client/reader_slice_of_slices_iterator.go | 10 +++- src/dbnode/encoding/encoding_mock.go | 8 +-- src/dbnode/encoding/iterator_test.go | 4 ++ src/dbnode/encoding/multi_reader_iterator.go | 4 ++ src/dbnode/encoding/series_iterator_test.go | 9 ++-- src/dbnode/encoding/types.go | 9 ++-- src/dbnode/storage/index/types.go | 1 - ...e_of_slices_from_block_readers_iterator.go | 12 ++++- ...slices_from_block_readers_iterator_test.go | 26 +++++++++ src/dbnode/x/xio/types.go | 4 ++ src/query/remote/client.go | 2 +- src/query/remote/compressed_codecs.go | 53 +++++++++++-------- src/query/remote/compressed_codecs_test.go | 22 +++++--- src/query/storage/m3/storage.go | 33 +++++++++--- src/query/storage/m3/storage_test.go | 5 +- src/query/test/test_series_iterator.go | 1 - src/query/tracepoint/tracepoint.go | 3 ++ src/query/ts/m3db/options.go | 11 ++++ src/query/ts/m3db/types.go | 11 ++++ 19 files changed, 173 insertions(+), 55 deletions(-) diff --git a/src/dbnode/client/reader_slice_of_slices_iterator.go b/src/dbnode/client/reader_slice_of_slices_iterator.go index 624925f1d2..d6e6f78289 100644 --- a/src/dbnode/client/reader_slice_of_slices_iterator.go +++ b/src/dbnode/client/reader_slice_of_slices_iterator.go @@ -189,7 +189,7 @@ func (it *readerSliceOfSlicesIterator) Close() { func (it *readerSliceOfSlicesIterator) Reset(segments []*rpc.Segments) { it.segments = segments - it.idx = -1 + it.resetIndex() it.closed = false } @@ -204,3 +204,11 @@ func (it *readerSliceOfSlicesIterator) Size() (int, error) { } return size, nil } + +func (it *readerSliceOfSlicesIterator) Rewind() { + it.resetIndex() +} + +func (it *readerSliceOfSlicesIterator) resetIndex() { + it.idx = -1 +} diff --git a/src/dbnode/encoding/encoding_mock.go b/src/dbnode/encoding/encoding_mock.go index fa61de0683..9d779fa176 100644 --- a/src/dbnode/encoding/encoding_mock.go +++ b/src/dbnode/encoding/encoding_mock.go @@ -1293,18 +1293,18 @@ func (m *MockSeriesIteratorConsolidator) EXPECT() *MockSeriesIteratorConsolidato } // ConsolidateReplicas mocks base method -func (m *MockSeriesIteratorConsolidator) ConsolidateReplicas(arg0 []MultiReaderIterator) ([]MultiReaderIterator, error) { +func (m *MockSeriesIteratorConsolidator) ConsolidateReplicas(replicas []MultiReaderIterator) ([]MultiReaderIterator, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ConsolidateReplicas", arg0) + ret := m.ctrl.Call(m, "ConsolidateReplicas", replicas) ret0, _ := ret[0].([]MultiReaderIterator) ret1, _ := ret[1].(error) return ret0, ret1 } // ConsolidateReplicas indicates an expected call of ConsolidateReplicas -func (mr *MockSeriesIteratorConsolidatorMockRecorder) ConsolidateReplicas(arg0 interface{}) *gomock.Call { +func (mr *MockSeriesIteratorConsolidatorMockRecorder) ConsolidateReplicas(replicas interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ConsolidateReplicas", reflect.TypeOf((*MockSeriesIteratorConsolidator)(nil).ConsolidateReplicas), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ConsolidateReplicas", reflect.TypeOf((*MockSeriesIteratorConsolidator)(nil).ConsolidateReplicas), replicas) } // MockSeriesIterators is a mock of SeriesIterators interface diff --git a/src/dbnode/encoding/iterator_test.go b/src/dbnode/encoding/iterator_test.go index d34eaf20d5..6df695374d 100644 --- 
a/src/dbnode/encoding/iterator_test.go +++ b/src/dbnode/encoding/iterator_test.go @@ -198,6 +198,10 @@ func (it *testReaderSliceOfSlicesIterator) Size() (int, error) { return 0, nil } +func (it *testReaderSliceOfSlicesIterator) Rewind() { + it.idx = -1 +} + func (it *testReaderSliceOfSlicesIterator) arrayIdx() int { idx := it.idx if idx == -1 { diff --git a/src/dbnode/encoding/multi_reader_iterator.go b/src/dbnode/encoding/multi_reader_iterator.go index 77e61b8034..c3e986f641 100644 --- a/src/dbnode/encoding/multi_reader_iterator.go +++ b/src/dbnode/encoding/multi_reader_iterator.go @@ -246,3 +246,7 @@ func (it *singleSlicesOfSlicesIterator) Size() (int, error) { } return size, nil } + +func (it *singleSlicesOfSlicesIterator) Rewind() { + it.firstNext = true +} diff --git a/src/dbnode/encoding/series_iterator_test.go b/src/dbnode/encoding/series_iterator_test.go index 85f831ea15..ea60f8b30a 100644 --- a/src/dbnode/encoding/series_iterator_test.go +++ b/src/dbnode/encoding/series_iterator_test.go @@ -235,12 +235,13 @@ func TestSeriesIteratorSetIterateEqualTimestampStrategy(t *testing.T) { DefaultIterateEqualTimestampStrategy) } -type testConsolidator struct { +type testSeriesConsolidator struct { iters []MultiReaderIterator } -func (c *testConsolidator) ConsolidateReplicas( - _ []MultiReaderIterator) ([]MultiReaderIterator, error) { +func (c *testSeriesConsolidator) ConsolidateReplicas( + _ []MultiReaderIterator, +) ([]MultiReaderIterator, error) { return c.iters, nil } @@ -259,7 +260,7 @@ func TestSeriesIteratorSetSeriesIteratorConsolidator(t *testing.T) { newIter.EXPECT().Current().Return(ts.Datapoint{}, xtime.Second, nil).Times(2) iter.iters.setFilter(0, 1) - consolidator := &testConsolidator{iters: []MultiReaderIterator{newIter}} + consolidator := &testSeriesConsolidator{iters: []MultiReaderIterator{newIter}} oldIter := NewMockMultiReaderIterator(ctrl) oldIters := []MultiReaderIterator{oldIter} iter.multiReaderIters = oldIters diff --git a/src/dbnode/encoding/types.go b/src/dbnode/encoding/types.go index 2ef8c17729..0b88c32b8e 100644 --- a/src/dbnode/encoding/types.go +++ b/src/dbnode/encoding/types.go @@ -29,7 +29,7 @@ import ( "github.com/m3db/m3/src/dbnode/x/xio" "github.com/m3db/m3/src/dbnode/x/xpool" "github.com/m3db/m3/src/x/checked" - "github.com/m3db/m3/src/x/context" + xcontext "github.com/m3db/m3/src/x/context" "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/pool" "github.com/m3db/m3/src/x/serialize" @@ -55,7 +55,7 @@ type Encoder interface { // passed to this method is closed, so to avoid not returning the // encoder's buffer back to the pool when it is completed be sure to call // close on the context eventually. - Stream(ctx context.Context) (xio.SegmentReader, bool) + Stream(ctx xcontext.Context) (xio.SegmentReader, bool) // NumEncoded returns the number of encoded datapoints. NumEncoded() int @@ -280,11 +280,10 @@ type SeriesIteratorStats struct { ApproximateSizeInBytes int } -// SeriesIteratorConsolidator optionally defines methods to consolidate newly -// reset series iterators. +// SeriesIteratorConsolidator optionally defines methods to consolidate series iterators. type SeriesIteratorConsolidator interface { // ConsolidateReplicas consolidates MultiReaderIterator slices. - ConsolidateReplicas([]MultiReaderIterator) ([]MultiReaderIterator, error) + ConsolidateReplicas(replicas []MultiReaderIterator) ([]MultiReaderIterator, error) } // SeriesIteratorOptions is a set of options for using a series iterator. 
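For context on the SeriesIteratorConsolidator contract just above: an implementation only needs to map one slice of MultiReaderIterators to another. A minimal pass-through sketch (hypothetical, not part of this patch) that satisfies the interface from another package:

package example

import "github.com/m3db/m3/src/dbnode/encoding"

// passthroughConsolidator satisfies encoding.SeriesIteratorConsolidator by
// returning the replica iterators unchanged.
type passthroughConsolidator struct{}

func (passthroughConsolidator) ConsolidateReplicas(
	replicas []encoding.MultiReaderIterator,
) ([]encoding.MultiReaderIterator, error) {
	// A real consolidator could merge or filter replicas here; this sketch
	// hands them back as-is, which also makes it a useful test double
	// (compare testSeriesConsolidator in series_iterator_test.go above).
	return replicas, nil
}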
diff --git a/src/dbnode/storage/index/types.go b/src/dbnode/storage/index/types.go index 69ed50bb32..4a667f57a6 100644 --- a/src/dbnode/storage/index/types.go +++ b/src/dbnode/storage/index/types.go @@ -85,7 +85,6 @@ type QueryOptions struct { SeriesLimit int DocsLimit int RequireExhaustive bool - IterationOptions IterationOptions } // IterationOptions enables users to specify iteration preferences. diff --git a/src/dbnode/x/xio/reader_slice_of_slices_from_block_readers_iterator.go b/src/dbnode/x/xio/reader_slice_of_slices_from_block_readers_iterator.go index 1914d24448..9df35045c3 100644 --- a/src/dbnode/x/xio/reader_slice_of_slices_from_block_readers_iterator.go +++ b/src/dbnode/x/xio/reader_slice_of_slices_from_block_readers_iterator.go @@ -41,7 +41,7 @@ func NewReaderSliceOfSlicesFromBlockReadersIterator( } func (it *readerSliceOfSlicesIterator) Next() bool { - if !(it.idx+1 < it.len) { + if it.idx >= it.len-1 { return false } it.idx++ @@ -71,7 +71,7 @@ func (it *readerSliceOfSlicesIterator) CurrentReaderAt(idx int) BlockReader { func (it *readerSliceOfSlicesIterator) Reset(blocks [][]BlockReader) { it.blocks = blocks - it.idx = -1 + it.resetIndex() it.len = len(blocks) it.closed = false } @@ -104,3 +104,11 @@ func (it *readerSliceOfSlicesIterator) Size() (int, error) { } return size, nil } + +func (it *readerSliceOfSlicesIterator) Rewind() { + it.resetIndex() +} + +func (it *readerSliceOfSlicesIterator) resetIndex() { + it.idx = -1 +} diff --git a/src/dbnode/x/xio/reader_slice_of_slices_from_block_readers_iterator_test.go b/src/dbnode/x/xio/reader_slice_of_slices_from_block_readers_iterator_test.go index 49b4dc6f50..21a1a82edf 100644 --- a/src/dbnode/x/xio/reader_slice_of_slices_from_block_readers_iterator_test.go +++ b/src/dbnode/x/xio/reader_slice_of_slices_from_block_readers_iterator_test.go @@ -42,6 +42,32 @@ func TestReaderSliceOfSlicesFromBlockReadersIterator(t *testing.T) { } iter := NewReaderSliceOfSlicesFromBlockReadersIterator(readers) + validateIterReaders(t, iter, readers) +} + +func TestRewind(t *testing.T) { + // Build block readers backed by null segment readers. + all := make([]BlockReader, 6) + for i := range all { + all[i] = BlockReader{ + SegmentReader: nullSegmentReader{}, + } + } + + readers := [][]BlockReader{ + all[:3], + all[3:4], + all[4:], + } + + iter := NewReaderSliceOfSlicesFromBlockReadersIterator(readers) + validateIterReaders(t, iter, readers) + + iter.Rewind() + validateIterReaders(t, iter, readers) +} + +func validateIterReaders(t *testing.T, iter ReaderSliceOfSlicesFromBlockReadersIterator, readers [][]BlockReader) { for i := range readers { assert.True(t, iter.Next()) l, _, _ := iter.CurrentReaders() diff --git a/src/dbnode/x/xio/types.go b/src/dbnode/x/xio/types.go index cf06d9af63..09c18dca6d 100644 --- a/src/dbnode/x/xio/types.go +++ b/src/dbnode/x/xio/types.go @@ -86,6 +86,10 @@ type ReaderSliceOfSlicesIterator interface { // Size gives the size of bytes in this iterator. Size() (int, error) + + // Rewind returns the iterator to the beginning. + // This operation is invalid if any of the block readers have been read.
+ Rewind() } // ReaderSliceOfSlicesFromBlockReadersIterator is an iterator diff --git a/src/query/remote/client.go b/src/query/remote/client.go index e5cf2d8fe6..e7fc702471 100644 --- a/src/query/remote/client.go +++ b/src/query/remote/client.go @@ -170,7 +170,7 @@ func (c *grpcClient) fetchRaw( receivedMeta := decodeResultMetadata(result.GetMeta()) meta = meta.CombineMetadata(receivedMeta) - iters, err := decodeCompressedFetchResponse(result, pools) + iters, err := DecodeCompressedFetchResponse(result, pools) if err != nil { return fetchResult, err } diff --git a/src/query/remote/compressed_codecs.go b/src/query/remote/compressed_codecs.go index 766004ae98..ef466b1426 100644 --- a/src/query/remote/compressed_codecs.go +++ b/src/query/remote/compressed_codecs.go @@ -138,23 +138,24 @@ func buildTags(tagIter ident.TagIterator, iterPools encoding.IteratorPools) ([]b return nil, errors.ErrCannotEncodeCompressedTags } -/* -Builds compressed rpc series from a SeriesIterator -SeriesIterator is the top level iterator returned by m3db -This SeriesIterator contains MultiReaderIterators, each representing a single -replica. Each MultiReaderIterator has a ReaderSliceOfSlicesIterator where each -step through the iterator exposes a slice of underlying BlockReaders. Each -BlockReader contains the run time encoded bytes that represent the series. - -SeriesIterator also has a TagIterator representing the tags associated with it. - -This function transforms a SeriesIterator into a protobuf representation to be -able to send it across the wire without needing to expand the series. -*/ -func compressedSeriesFromSeriesIterator( +// CompressedSeriesFromSeriesIterator builds a compressed rpc series from a SeriesIterator. +// SeriesIterator is the top-level iterator returned by m3db. +func CompressedSeriesFromSeriesIterator( it encoding.SeriesIterator, iterPools encoding.IteratorPools, ) (*rpc.Series, error) { + // This SeriesIterator contains MultiReaderIterators, each representing a single + // replica. Each MultiReaderIterator has a ReaderSliceOfSlicesIterator where each + // step through the iterator exposes a slice of underlying BlockReaders. Each + // BlockReader contains the runtime-encoded bytes that represent the series. + // + // SeriesIterator also has a TagIterator representing the tags associated with it. + // + // This function transforms a SeriesIterator into a protobuf representation to be + // able to send it across the wire without needing to expand the series. + // + // The SeriesIterator's readers and tags are rewound after encoding so that the + // iterator remains usable by the caller. replicas, err := it.Replicas() if err != nil { return nil, err } @@ -169,19 +170,29 @@ func compressedSeriesFromSeriesIterator( if err != nil { return nil, err } - replicaSegments = append(replicaSegments, segments) } - compressedReplicas = append(compressedReplicas, &rpc.M3CompressedValuesReplica{ + // Rewind the reader state back to the beginning so that it can be re-iterated by the caller. + // These multi-readers are queued up via ResetSliceOfSlices so that the first Current + // index is set, and therefore we must also call an initial Next here to match that state. + // This behavior is not obvious, so we should later change ResetSliceOfSlices to not do this + // initial Next move and assert that all iters start with Current as nil.
+ readers.Rewind() + readers.Next() + + r := &rpc.M3CompressedValuesReplica{ Segments: replicaSegments, - }) + } + compressedReplicas = append(compressedReplicas, r) } start := xtime.ToNanoseconds(it.Start()) end := xtime.ToNanoseconds(it.End()) - tags, err := buildTags(it.Tags(), iterPools) + itTags := it.Tags() + defer itTags.Rewind() + tags, err := buildTags(itTags, iterPools) if err != nil { return nil, err } @@ -209,7 +220,7 @@ func encodeToCompressedSeries( iters := results.SeriesIterators() seriesList := make([]*rpc.Series, 0, len(iters)) for _, iter := range iters { - series, err := compressedSeriesFromSeriesIterator(iter, iterPools) + series, err := CompressedSeriesFromSeriesIterator(iter, iterPools) if err != nil { return nil, err } @@ -397,9 +408,9 @@ func seriesIteratorFromCompressedSeries( return seriesIter, nil } -// decodeCompressedFetchResponse decodes compressed fetch +// DecodeCompressedFetchResponse decodes compressed fetch // response to seriesIterators. -func decodeCompressedFetchResponse( +func DecodeCompressedFetchResponse( fetchResult *rpc.FetchResponse, iteratorPools encoding.IteratorPools, ) (encoding.SeriesIterators, error) { diff --git a/src/query/remote/compressed_codecs_test.go b/src/query/remote/compressed_codecs_test.go index 122eb5ac82..14b22437eb 100644 --- a/src/query/remote/compressed_codecs_test.go +++ b/src/query/remote/compressed_codecs_test.go @@ -179,14 +179,14 @@ func verifyCompressedSeries(t *testing.T, s *rpc.Series) { func TestConversionToCompressedData(t *testing.T) { it := buildTestSeriesIterator(t) - series, err := compressedSeriesFromSeriesIterator(it, nil) + series, err := CompressedSeriesFromSeriesIterator(it, nil) require.Error(t, err) require.Nil(t, series) } func TestSeriesConversionFromCompressedData(t *testing.T) { it := buildTestSeriesIterator(t) - series, err := compressedSeriesFromSeriesIterator(it, nil) + series, err := CompressedSeriesFromSeriesIterator(it, nil) require.Error(t, err) require.Nil(t, series) } @@ -194,7 +194,7 @@ func TestSeriesConversionFromCompressedData(t *testing.T) { func TestSeriesConversionFromCompressedDataWithIteratorPool(t *testing.T) { it := buildTestSeriesIterator(t) ip := test.MakeMockIteratorPool() - series, err := compressedSeriesFromSeriesIterator(it, ip) + series, err := CompressedSeriesFromSeriesIterator(it, ip) require.NoError(t, err) verifyCompressedSeries(t, series) @@ -291,7 +291,7 @@ func TestDecodeCompressedFetchResultWithIteratorPool(t *testing.T) { Series: compressed, } - revertedIters, err := decodeCompressedFetchResponse(fetchResult, ip) + revertedIters, err := DecodeCompressedFetchResponse(fetchResult, ip) require.NoError(t, err) revertedIterList := revertedIters.Iters() require.Len(t, revertedIterList, 2) @@ -299,7 +299,7 @@ func TestDecodeCompressedFetchResultWithIteratorPool(t *testing.T) { validateSeries(t, seriesIterator) } - revertedIters, err = decodeCompressedFetchResponse(fetchResult, ip) + revertedIters, err = DecodeCompressedFetchResponse(fetchResult, ip) require.NoError(t, err) revertedIterList = revertedIters.Iters() require.Len(t, revertedIterList, 2) @@ -329,5 +329,15 @@ func TestConversionDoesNotCloseSeriesIterator(t *testing.T) { mockIter.EXPECT().Namespace().Return(ident.StringID("")).Times(1) mockIter.EXPECT().ID().Return(ident.StringID("")).Times(1) - compressedSeriesFromSeriesIterator(mockIter, nil) + CompressedSeriesFromSeriesIterator(mockIter, nil) +} + +func TestIterablePostCompression(t *testing.T) { + it := buildTestSeriesIterator(t) + ip := 
test.MakeMockIteratorPool() + series, err := CompressedSeriesFromSeriesIterator(it, ip) + require.NoError(t, err) + require.NotNil(t, series) + + validateSeries(t, it) } diff --git a/src/query/storage/m3/storage.go b/src/query/storage/m3/storage.go index 7f4e3c25a9..e3a9084a71 100644 --- a/src/query/storage/m3/storage.go +++ b/src/query/storage/m3/storage.go @@ -28,6 +28,7 @@ import ( "time" "github.com/m3db/m3/src/dbnode/client" + "github.com/m3db/m3/src/dbnode/storage/index" "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/cost" "github.com/m3db/m3/src/query/errors" @@ -91,7 +92,8 @@ func (s *m3storage) FetchProm( query *storage.FetchQuery, options *storage.FetchOptions, ) (storage.PromResult, error) { - accumulator, err := s.fetchCompressed(ctx, query, options) + queryOptions := storage.FetchOptionsToM3Options(options, query) + accumulator, err := s.fetchCompressed(ctx, query, options, queryOptions) if err != nil { return storage.PromResult{}, err } @@ -184,7 +186,8 @@ func (s *m3storage) FetchCompressed( query *storage.FetchQuery, options *storage.FetchOptions, ) (consolidators.SeriesFetchResult, Cleanup, error) { - accumulator, err := s.fetchCompressed(ctx, query, options) + queryOptions := storage.FetchOptionsToM3Options(options, query) + accumulator, err := s.fetchCompressed(ctx, query, options, queryOptions) if err != nil { return consolidators.SeriesFetchResult{ Metadata: block.NewResultMetadata(), @@ -197,6 +200,24 @@ func (s *m3storage) FetchCompressed( return result, noop, err } + if processor := s.opts.SeriesIteratorProcessor(); processor != nil { + _, span, sampled := xcontext.StartSampledTraceSpan(ctx, + tracepoint.FetchCompressedInspectSeries) + iters := result.SeriesIterators() + if err := processor.InspectSeries(ctx, iters); err != nil { + s.logger.Error("error inspecting series", zap.Error(err)) + } + if sampled { + span.LogFields( + log.String("query", query.Raw), + log.String("start", query.Start.String()), + log.String("end", query.End.String()), + log.String("interval", query.Interval.String()), + ) + } + span.Finish() + } + if options.IncludeResolution { resolutions := make([]int64, 0, len(attrs)) for _, attr := range attrs { @@ -214,6 +235,7 @@ func (s *m3storage) fetchCompressed( ctx context.Context, query *storage.FetchQuery, options *storage.FetchOptions, + queryOptions index.QueryOptions, ) (consolidators.MultiFetchResult, error) { if err := options.BlockType.Validate(); err != nil { // This is an invariant error; should not be able to get to here. 
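To illustrate the InspectSeries hook wired into FetchCompressed above: a SeriesIteratorProcessor only needs to examine the series iterators without consuming them, since iterating them at this point would exhaust the data before the query result is built. A hypothetical sketch (the countingProcessor name and behavior are illustrative, not part of this patch):

package example

import (
	"context"

	"github.com/m3db/m3/src/dbnode/encoding"
)

// countingProcessor counts the series it is shown; a real processor might
// emit metrics or annotate series metadata instead.
type countingProcessor struct {
	seen int
}

func (p *countingProcessor) InspectSeries(
	ctx context.Context,
	seriesIterators []encoding.SeriesIterator,
) error {
	// Inspect only; do not call Next on the iterators.
	p.seen += len(seriesIterators)
	return nil
}

A processor like this would be registered once via the option added below, e.g. opts = opts.SetSeriesIteratorProcessor(&countingProcessor{}).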
@@ -273,10 +295,7 @@ func (s *m3storage) fetchCompressed( } } - var ( - opts = storage.FetchOptionsToM3Options(options, query) - wg sync.WaitGroup - ) + var wg sync.WaitGroup if len(namespaces) == 0 { return nil, errNoNamespacesConfigured } @@ -300,7 +319,7 @@ func (s *m3storage) fetchCompressed( session := namespace.Session() namespaceID := namespace.NamespaceID() - iters, metadata, err := session.FetchTagged(namespaceID, m3query, opts) + iters, metadata, err := session.FetchTagged(namespaceID, m3query, queryOptions) if err == nil && sampled { span.LogFields( log.String("namespace", namespaceID.String()), diff --git a/src/query/storage/m3/storage_test.go b/src/query/storage/m3/storage_test.go index df7cb53342..3fcb2ae847 100644 --- a/src/query/storage/m3/storage_test.go +++ b/src/query/storage/m3/storage_test.go @@ -789,11 +789,12 @@ func TestInvalidBlockTypes(t *testing.T) { s, err := NewStorage(nil, opts, instrument.NewOptions()) require.NoError(t, err) + query := &storage.FetchQuery{} fetchOpts := &storage.FetchOptions{BlockType: models.TypeDecodedBlock} - _, err = s.FetchBlocks(context.TODO(), nil, fetchOpts) + _, err = s.FetchBlocks(context.TODO(), query, fetchOpts) assert.Error(t, err) fetchOpts.BlockType = models.TypeMultiBlock - _, err = s.FetchBlocks(context.TODO(), nil, fetchOpts) + _, err = s.FetchBlocks(context.TODO(), query, fetchOpts) assert.Error(t, err) } diff --git a/src/query/test/test_series_iterator.go b/src/query/test/test_series_iterator.go index 3e51be81c2..a2adb81b66 100644 --- a/src/query/test/test_series_iterator.go +++ b/src/query/test/test_series_iterator.go @@ -137,7 +137,6 @@ func buildReplica() (encoding.MultiReaderIterator, error) { {mergedReader}, unmergedReaders, }) - multiReader.ResetSliceOfSlices(sliceOfSlicesIter, nil) return multiReader, nil } diff --git a/src/query/tracepoint/tracepoint.go b/src/query/tracepoint/tracepoint.go index 99c0499b55..cdcd02b316 100644 --- a/src/query/tracepoint/tracepoint.go +++ b/src/query/tracepoint/tracepoint.go @@ -31,6 +31,9 @@ const ( // FetchCompressedFetchTagged is for the call to FetchTagged in fetchCompressed. FetchCompressedFetchTagged = "m3.m3storage.fetchCompressed.FetchTagged" + // FetchCompressedInspectSeries is for the call to InspectSeries in FetchCompressed. + FetchCompressedInspectSeries = "m3.m3storage.FetchCompressed.InspectSeries" + // SearchCompressedFetchTaggedIDs is for the call to FetchTaggedIDs in SearchCompressed. 
SearchCompressedFetchTaggedIDs = "m3.m3storage.SearchCompressed.FetchTaggedIDs" diff --git a/src/query/ts/m3db/options.go b/src/query/ts/m3db/options.go index a9e93e9da6..57bd13a666 100644 --- a/src/query/ts/m3db/options.go +++ b/src/query/ts/m3db/options.go @@ -61,6 +61,7 @@ type encodedBlockOptions struct { readWorkerPools xsync.PooledWorkerPool writeWorkerPools xsync.PooledWorkerPool queryConsolidatorMatchOptions queryconsolidator.MatchOptions + seriesIteratorProcessor SeriesIteratorProcessor batchingFn IteratorBatchingFn adminOptions []client.CustomAdminOption instrumented bool @@ -207,6 +208,16 @@ func (o *encodedBlockOptions) SeriesConsolidationMatchOptions() queryconsolidato return o.queryConsolidatorMatchOptions } +func (o *encodedBlockOptions) SetSeriesIteratorProcessor(p SeriesIteratorProcessor) Options { + opts := *o + opts.seriesIteratorProcessor = p + return &opts +} + +func (o *encodedBlockOptions) SeriesIteratorProcessor() SeriesIteratorProcessor { + return o.seriesIteratorProcessor +} + func (o *encodedBlockOptions) SetIteratorBatchingFn(fn IteratorBatchingFn) Options { opts := *o opts.batchingFn = fn diff --git a/src/query/ts/m3db/types.go b/src/query/ts/m3db/types.go index d93ac1f18a..f0c9407ec0 100644 --- a/src/query/ts/m3db/types.go +++ b/src/query/ts/m3db/types.go @@ -21,6 +21,7 @@ package m3db import ( + "context" "time" "github.com/m3db/m3/src/dbnode/client" @@ -80,6 +81,10 @@ type Options interface { SetSeriesConsolidationMatchOptions(value queryconsolidator.MatchOptions) Options // SeriesConsolidationMatchOptions returns series consolidation options. SeriesConsolidationMatchOptions() queryconsolidator.MatchOptions + // SetSeriesIteratorProcessor sets the series iterator processor. + SetSeriesIteratorProcessor(SeriesIteratorProcessor) Options + // SeriesIteratorProcessor returns the series iterator processor. + SeriesIteratorProcessor() SeriesIteratorProcessor // SetIteratorBatchingFn sets the batching function for the converter. SetIteratorBatchingFn(IteratorBatchingFn) Options // IteratorBatchingFn returns the batching function for the converter. @@ -96,6 +101,12 @@ type Options interface { Validate() error } +// SeriesIteratorProcessor optionally defines methods to process series iterators. +type SeriesIteratorProcessor interface { + // InspectSeries inspects SeriesIterator slices. + InspectSeries(ctx context.Context, seriesIterators []encoding.SeriesIterator) error +} + // IteratorBatchingFn determines how the iterator is split into batches. type IteratorBatchingFn func( concurrency int,