Skip to content

Commit

Permalink
[query] Series iterators processor (#2512)
Browse files Browse the repository at this point in the history
  • Loading branch information
rallen090 authored Sep 4, 2020
1 parent c2b2272 commit 9f176c8
Show file tree
Hide file tree
Showing 19 changed files with 173 additions and 55 deletions.
10 changes: 9 additions & 1 deletion src/dbnode/client/reader_slice_of_slices_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (it *readerSliceOfSlicesIterator) Close() {

func (it *readerSliceOfSlicesIterator) Reset(segments []*rpc.Segments) {
it.segments = segments
it.idx = -1
it.resetIndex()
it.closed = false
}

Expand All @@ -204,3 +204,11 @@ func (it *readerSliceOfSlicesIterator) Size() (int, error) {
}
return size, nil
}

// Rewind returns the iterator to the beginning so the segments can be
// iterated over again. Only the index is reset; the underlying segment
// data is left untouched.
func (it *readerSliceOfSlicesIterator) Rewind() {
	it.resetIndex()
}

// resetIndex moves the iteration index back to its initial position (-1),
// i.e. before the first element — the same state Reset establishes.
func (it *readerSliceOfSlicesIterator) resetIndex() {
	it.idx = -1
}
8 changes: 4 additions & 4 deletions src/dbnode/encoding/encoding_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions src/dbnode/encoding/iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,10 @@ func (it *testReaderSliceOfSlicesIterator) Size() (int, error) {
return 0, nil
}

// Rewind resets the test iterator to before the first element (-1) so it
// can be iterated again.
func (it *testReaderSliceOfSlicesIterator) Rewind() {
	it.idx = -1
}

func (it *testReaderSliceOfSlicesIterator) arrayIdx() int {
idx := it.idx
if idx == -1 {
Expand Down
4 changes: 4 additions & 0 deletions src/dbnode/encoding/multi_reader_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,3 +246,7 @@ func (it *singleSlicesOfSlicesIterator) Size() (int, error) {
}
return size, nil
}

// Rewind returns the iterator to the beginning by marking the next
// advance as the first one again (the field name suggests firstNext
// gates the initial Next call).
func (it *singleSlicesOfSlicesIterator) Rewind() {
	it.firstNext = true
}
9 changes: 5 additions & 4 deletions src/dbnode/encoding/series_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,12 +235,13 @@ func TestSeriesIteratorSetIterateEqualTimestampStrategy(t *testing.T) {
DefaultIterateEqualTimestampStrategy)
}

type testConsolidator struct {
type testSeriesConsolidator struct {
iters []MultiReaderIterator
}

func (c *testConsolidator) ConsolidateReplicas(
_ []MultiReaderIterator) ([]MultiReaderIterator, error) {
func (c *testSeriesConsolidator) ConsolidateReplicas(
_ []MultiReaderIterator,
) ([]MultiReaderIterator, error) {
return c.iters, nil
}

Expand All @@ -259,7 +260,7 @@ func TestSeriesIteratorSetSeriesIteratorConsolidator(t *testing.T) {
newIter.EXPECT().Current().Return(ts.Datapoint{}, xtime.Second, nil).Times(2)

iter.iters.setFilter(0, 1)
consolidator := &testConsolidator{iters: []MultiReaderIterator{newIter}}
consolidator := &testSeriesConsolidator{iters: []MultiReaderIterator{newIter}}
oldIter := NewMockMultiReaderIterator(ctrl)
oldIters := []MultiReaderIterator{oldIter}
iter.multiReaderIters = oldIters
Expand Down
9 changes: 4 additions & 5 deletions src/dbnode/encoding/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"github.com/m3db/m3/src/dbnode/x/xio"
"github.com/m3db/m3/src/dbnode/x/xpool"
"github.com/m3db/m3/src/x/checked"
"github.com/m3db/m3/src/x/context"
xcontext "github.com/m3db/m3/src/x/context"
"github.com/m3db/m3/src/x/ident"
"github.com/m3db/m3/src/x/pool"
"github.com/m3db/m3/src/x/serialize"
Expand All @@ -55,7 +55,7 @@ type Encoder interface {
// passed to this method is closed, so to avoid not returning the
// encoder's buffer back to the pool when it is completed be sure to call
// close on the context eventually.
Stream(ctx context.Context) (xio.SegmentReader, bool)
Stream(ctx xcontext.Context) (xio.SegmentReader, bool)

// NumEncoded returns the number of encoded datapoints.
NumEncoded() int
Expand Down Expand Up @@ -280,11 +280,10 @@ type SeriesIteratorStats struct {
ApproximateSizeInBytes int
}

// SeriesIteratorConsolidator optionally defines methods to consolidate newly
// reset series iterators.
// SeriesIteratorConsolidator optionally defines methods to consolidate series iterators.
type SeriesIteratorConsolidator interface {
// ConsolidateReplicas consolidates MultiReaderIterator slices.
ConsolidateReplicas([]MultiReaderIterator) ([]MultiReaderIterator, error)
ConsolidateReplicas(replicas []MultiReaderIterator) ([]MultiReaderIterator, error)
}

// SeriesIteratorOptions is a set of options for using a series iterator.
Expand Down
1 change: 0 additions & 1 deletion src/dbnode/storage/index/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ type QueryOptions struct {
SeriesLimit int
DocsLimit int
RequireExhaustive bool
IterationOptions IterationOptions
}

// IterationOptions enables users to specify iteration preferences.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func NewReaderSliceOfSlicesFromBlockReadersIterator(
}

func (it *readerSliceOfSlicesIterator) Next() bool {
if !(it.idx+1 < it.len) {
if it.idx >= it.len-1 {
return false
}
it.idx++
Expand Down Expand Up @@ -71,7 +71,7 @@ func (it *readerSliceOfSlicesIterator) CurrentReaderAt(idx int) BlockReader {

func (it *readerSliceOfSlicesIterator) Reset(blocks [][]BlockReader) {
it.blocks = blocks
it.idx = -1
it.resetIndex()
it.len = len(blocks)
it.closed = false
}
Expand Down Expand Up @@ -104,3 +104,11 @@ func (it *readerSliceOfSlicesIterator) Size() (int, error) {
}
return size, nil
}

// Rewind returns the iterator to the beginning so the block readers can
// be iterated over again. Only the index is reset; the blocks themselves
// are left untouched.
func (it *readerSliceOfSlicesIterator) Rewind() {
	it.resetIndex()
}

// resetIndex moves the iteration index back to its initial position (-1),
// i.e. before the first element — the same state Reset establishes.
func (it *readerSliceOfSlicesIterator) resetIndex() {
	it.idx = -1
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,32 @@ func TestReaderSliceOfSlicesFromBlockReadersIterator(t *testing.T) {
}

iter := NewReaderSliceOfSlicesFromBlockReadersIterator(readers)
validateIterReaders(t, iter, readers)
}

// TestRewind verifies that a ReaderSliceOfSlicesFromBlockReadersIterator can
// be fully consumed, rewound, and then consumed a second time yielding the
// same readers.
func TestRewind(t *testing.T) {
	// Build each reader in place. The original version declared a..f, copied
	// them (still zero-valued) into a second slice, and then initialized the
	// copies — so the readers actually passed to the iterator never received
	// their nullSegmentReader. Constructing readers directly avoids that.
	newBlockReader := func() BlockReader {
		return BlockReader{
			SegmentReader: nullSegmentReader{},
		}
	}

	readers := [][]BlockReader{
		{newBlockReader(), newBlockReader(), newBlockReader()},
		{newBlockReader()},
		{newBlockReader(), newBlockReader()},
	}

	iter := NewReaderSliceOfSlicesFromBlockReadersIterator(readers)
	validateIterReaders(t, iter, readers)

	// Rewinding must permit an identical second pass over all readers.
	iter.Rewind()
	validateIterReaders(t, iter, readers)
}

func validateIterReaders(t *testing.T, iter ReaderSliceOfSlicesFromBlockReadersIterator, readers [][]BlockReader) {
for i := range readers {
assert.True(t, iter.Next())
l, _, _ := iter.CurrentReaders()
Expand Down
4 changes: 4 additions & 0 deletions src/dbnode/x/xio/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ type ReaderSliceOfSlicesIterator interface {

// Size gives the size of bytes in this iterator.
Size() (int, error)

// Rewind returns the iterator to the beginning.
// This operation is invalid if any of the block readers have been read.
Rewind()
}

// ReaderSliceOfSlicesFromBlockReadersIterator is an iterator
Expand Down
2 changes: 1 addition & 1 deletion src/query/remote/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func (c *grpcClient) fetchRaw(

receivedMeta := decodeResultMetadata(result.GetMeta())
meta = meta.CombineMetadata(receivedMeta)
iters, err := decodeCompressedFetchResponse(result, pools)
iters, err := DecodeCompressedFetchResponse(result, pools)
if err != nil {
return fetchResult, err
}
Expand Down
53 changes: 32 additions & 21 deletions src/query/remote/compressed_codecs.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,23 +138,24 @@ func buildTags(tagIter ident.TagIterator, iterPools encoding.IteratorPools) ([]b
return nil, errors.ErrCannotEncodeCompressedTags
}

/*
Builds compressed rpc series from a SeriesIterator
SeriesIterator is the top level iterator returned by m3db
This SeriesIterator contains MultiReaderIterators, each representing a single
replica. Each MultiReaderIterator has a ReaderSliceOfSlicesIterator where each
step through the iterator exposes a slice of underlying BlockReaders. Each
BlockReader contains the run time encoded bytes that represent the series.
SeriesIterator also has a TagIterator representing the tags associated with it.
This function transforms a SeriesIterator into a protobuf representation to be
able to send it across the wire without needing to expand the series.
*/
func compressedSeriesFromSeriesIterator(
// CompressedSeriesFromSeriesIterator builds compressed rpc series from a SeriesIterator
// SeriesIterator is the top level iterator returned by m3db
func CompressedSeriesFromSeriesIterator(
it encoding.SeriesIterator,
iterPools encoding.IteratorPools,
) (*rpc.Series, error) {
// This SeriesIterator contains MultiReaderIterators, each representing a single
// replica. Each MultiReaderIterator has a ReaderSliceOfSlicesIterator where each
// step through the iterator exposes a slice of underlying BlockReaders. Each
// BlockReader contains the run time encoded bytes that represent the series.
//
// SeriesIterator also has a TagIterator representing the tags associated with it.
//
// This function transforms a SeriesIterator into a protobuf representation to be
// able to send it across the wire without needing to expand the series.
//
// If reset argument is true, the SeriesIterator readers will be reset so it can
// be iterated again. If false, the SeriesIterator will no longer be useable.
replicas, err := it.Replicas()
if err != nil {
return nil, err
Expand All @@ -169,19 +170,29 @@ func compressedSeriesFromSeriesIterator(
if err != nil {
return nil, err
}

replicaSegments = append(replicaSegments, segments)
}

compressedReplicas = append(compressedReplicas, &rpc.M3CompressedValuesReplica{
// Rewind the reader state back to the beginning so that it can be re-iterated by the caller.
// These multi-readers are queued up via ResetSliceOfSlices so that the first Current
// index is set, and therefore we must also call an initial Next here to match that state.
// This behavior is not obvious so we should later change ResetSliceOfSlices to not do this
// initial Next move and assert that all iters start w/ Current as nil.
readers.Rewind()
readers.Next()

r := &rpc.M3CompressedValuesReplica{
Segments: replicaSegments,
})
}
compressedReplicas = append(compressedReplicas, r)
}

start := xtime.ToNanoseconds(it.Start())
end := xtime.ToNanoseconds(it.End())

tags, err := buildTags(it.Tags(), iterPools)
itTags := it.Tags()
defer itTags.Rewind()
tags, err := buildTags(itTags, iterPools)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -209,7 +220,7 @@ func encodeToCompressedSeries(
iters := results.SeriesIterators()
seriesList := make([]*rpc.Series, 0, len(iters))
for _, iter := range iters {
series, err := compressedSeriesFromSeriesIterator(iter, iterPools)
series, err := CompressedSeriesFromSeriesIterator(iter, iterPools)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -397,9 +408,9 @@ func seriesIteratorFromCompressedSeries(
return seriesIter, nil
}

// decodeCompressedFetchResponse decodes compressed fetch
// DecodeCompressedFetchResponse decodes compressed fetch
// response to seriesIterators.
func decodeCompressedFetchResponse(
func DecodeCompressedFetchResponse(
fetchResult *rpc.FetchResponse,
iteratorPools encoding.IteratorPools,
) (encoding.SeriesIterators, error) {
Expand Down
22 changes: 16 additions & 6 deletions src/query/remote/compressed_codecs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,22 +179,22 @@ func verifyCompressedSeries(t *testing.T, s *rpc.Series) {

// TestConversionToCompressedData ensures that converting a series iterator
// without providing iterator pools returns an error and no series.
func TestConversionToCompressedData(t *testing.T) {
	it := buildTestSeriesIterator(t)
	series, err := CompressedSeriesFromSeriesIterator(it, nil)
	require.Error(t, err)
	require.Nil(t, series)
}

// TestSeriesConversionFromCompressedData checks that conversion with nil
// iterator pools fails and produces no series.
// NOTE(review): this body is identical to TestConversionToCompressedData;
// presumably it was meant to also exercise decoding the compressed data
// back into a series — confirm intent.
func TestSeriesConversionFromCompressedData(t *testing.T) {
	it := buildTestSeriesIterator(t)
	series, err := CompressedSeriesFromSeriesIterator(it, nil)
	require.Error(t, err)
	require.Nil(t, series)
}

func TestSeriesConversionFromCompressedDataWithIteratorPool(t *testing.T) {
it := buildTestSeriesIterator(t)
ip := test.MakeMockIteratorPool()
series, err := compressedSeriesFromSeriesIterator(it, ip)
series, err := CompressedSeriesFromSeriesIterator(it, ip)

require.NoError(t, err)
verifyCompressedSeries(t, series)
Expand Down Expand Up @@ -291,15 +291,15 @@ func TestDecodeCompressedFetchResultWithIteratorPool(t *testing.T) {
Series: compressed,
}

revertedIters, err := decodeCompressedFetchResponse(fetchResult, ip)
revertedIters, err := DecodeCompressedFetchResponse(fetchResult, ip)
require.NoError(t, err)
revertedIterList := revertedIters.Iters()
require.Len(t, revertedIterList, 2)
for _, seriesIterator := range revertedIterList {
validateSeries(t, seriesIterator)
}

revertedIters, err = decodeCompressedFetchResponse(fetchResult, ip)
revertedIters, err = DecodeCompressedFetchResponse(fetchResult, ip)
require.NoError(t, err)
revertedIterList = revertedIters.Iters()
require.Len(t, revertedIterList, 2)
Expand Down Expand Up @@ -329,5 +329,15 @@ func TestConversionDoesNotCloseSeriesIterator(t *testing.T) {
mockIter.EXPECT().Namespace().Return(ident.StringID("")).Times(1)
mockIter.EXPECT().ID().Return(ident.StringID("")).Times(1)

compressedSeriesFromSeriesIterator(mockIter, nil)
CompressedSeriesFromSeriesIterator(mockIter, nil)
}

// TestIterablePostCompression verifies that compressing a series iterator does
// not consume it: after a successful CompressedSeriesFromSeriesIterator call
// the same iterator must still validate as a full series.
func TestIterablePostCompression(t *testing.T) {
	it := buildTestSeriesIterator(t)
	ip := test.MakeMockIteratorPool()
	series, err := CompressedSeriesFromSeriesIterator(it, ip)
	require.NoError(t, err)
	require.NotNil(t, series)

	// The iterator remains usable after conversion.
	validateSeries(t, it)
}
Loading

0 comments on commit 9f176c8

Please sign in to comment.