[dbnode] Large tiles aggregation flow v2 (#2643)
* Refactor and cleanup

* Refactor interfaces to more closely match design

* Update frame iterator to read from an encoding.ReaderIterator

* Removing unnecessary files

* az

* Add utility to apply tile calculations on data.

* test fix

* Added concurrency

* Concurrency logging

* [dbnode] A noop AggregateTiles thrift RPC

* Add AggregateTilesRequest.rangeType

* sourceNameSpace / targetNameSpace

* Drop AggregateTilesRequest.shardId

* A partial implementation of AggregateTiles

* Open DataFileSetReader and iterate through it

* Decompress the data read

* Add explicit FileSetType

* [dbnode] Add OrderedByIndex option for DataFileSetReader.Open

* Remove dbShard.TagsFromSeriesID

* Regenerate mocks

* Unit tests

* Mockgen

* Fix test

* Resurrect rpc_mock.go

* Remove accidentally committed files

* Trigger build

* Add step parameter

* Write aggregated data to other namespace

* Fix tests

* Introduced AggregateTilesOptions

* Minor improvements

* Cleanup

* PR response

* Add headers

* Remove unnecessary stuff.

* [dbnode] Integrate arrow iterators into tile aggregation

* Fix close error after EOF

* Can already close the SeriesBlockIterator

* Update to use concurrent iteration and prefer single metadata

* [dbnode] Cross block series reader

* Assert on OrderedByIndex

* Tests

* Mocks

* Don't test just the happy path

* Compute and validate block time frames

* [dbnode] Integration test for large tiles (#2478)

* [dbnode] Create a virtual reverse index for a computed namespace

* Return processedBlockCount from AggregateTiles

* Improve error handling

* Validate AggregateTilesOptions

* Unnest read locks

* Use default instead of constant

* Fix test

* minor refactoring

* Addressed review feedback

* Legal stuff

* Refactor recorder

* Allow using flat buffers rather than arrow

* [dbnode] persist manager for large tiles

* revert of .ci

* minor

* Adding better comparisons for arrow vs flat

* Some fixes for query_data_files

* An option to read from all shards

* Fix large_tiles_test

* Fix TestDatabaseAggregateTiles

* Read data ordered by index

* Generate mocks

* Fix TestAggregateTiles

* Group Read() results by id

* Remodel CrossBlockReader as an Iterator

* Mockgen

* Erase slice contents before draining them

* Resolve merge conflicts

* Align with master

* Integrate CrossBlockReader

* Make a defensive copy of dataFileSetReaders

* avoid panics

* Improve TestNamespaceAggregateTiles

* Added TODO on TestAggregateTiles

* Align query_data_files

* Mockgen

* Added cross block iterator to be able to read multiple BlockRecords.
Also removed concurrency from tile iterators and cleaned up utility

* Add HandleCounterResets to AggregateTilesOptions

* Additional tests and cleanup.

* [dbnode] Large Tiles fs.writer experimental implementation

* Implement DownsampleCounterResets

* Improve readability

* Use pointer arguments to get the results

* Reduce code duplication

* Refine comments

* Remove dependency on SeriesBlockFrame

* [dbnode] Add OrderedByIndex option for DataFileSetReader.Open (#2465)

* [dbnode] Cross-block series reader (#2481)

* Fix build

* Integrate DownsampleCounterResets

* Introduce DownsampledValue struct

* Update DownsampleCounterResets integration

* Preallocate capacity for downsampledValues

* Large tiles parallel indexing.

* Checkpoint fixed

* Successful write/fetch with some hardcoded values.

* Some FIXME solved

* [dbnode] AggregateTiles RPC - minimal E2E flow (#2466)

* minor fixes

* codegen fix

* Address feedback from PR 2477

* TestShardAggregateTiles using 2 readers

* Fix large_tiles_test.go

* integration test fix

* Bug fix and test

* [large tiles] Fix refcounting in CrossBlockReader

* Workaround for negative reference count

* Integration test fix

* [large-tiles] Try detect double finalize

* [dbnode] Large tiles concurrency test

* Batch writer is used for faster writes

* race fix

* Fix compilation error

* Comment out some noise

* Fix data races on time field (bootstrapManager, flushManager)

* Fix misplaced wd.Add

* Close context used for aggregation (in test code)

* Close encoders during large tile writes

* removing some debug code

* Close series in test code

* Move series.Close() after err check (test code)

* Update to series frame API

* Additional tests

* PR

* work on integ test

* tags close

* [large-tiles] Fix management of pooled objects

* Fix query_data_files tool

* Use mock checked bytes in cross_block_reader_test.go

* query test

* Query in place of fetch

* test fix

* Bug reproduced

* Heavier concurrency test

* Fix session related races in large_tiles_test.go

* Fix string conversion

* Use mocks for pooled objects in TestShardAggregateTiles

* Add build integration tag

* test fix

* minor refactoring

* increased the amount of series to 5k

* Remove a noop

* Log the finish of namespace aggregation

* some minor refactorings

* Streaming aggregation, reusing resources for each series

* Do not allocate minHeapEntries

* Cleanup

* Fix query_data_files

* Fix build

* go.sum

* Align with StreamingWriter API changes

* Add FIXME WRT segment.Tail finalizing

* Exclude query_data_files

* Exclude counter_resets_downsampler.go

* Remove arrow related code

* Cleanup

* Use explicit EncodedTags type

* Rename processedBlockCount to processedTileCount

* Fix build

* Exclude read_index_ids changes

* Address review feedback

* Increase fetch timeout to stabilize TestAggregationAndQueryingAtHighConcurrency

* Abort the writer in case of any error during aggregation

Co-authored-by: Artem <[email protected]>
Co-authored-by: Gediminas <[email protected]>
3 people authored Sep 25, 2020
1 parent ac530fa commit aefca00
Showing 32 changed files with 829 additions and 443 deletions.
3 changes: 1 addition & 2 deletions src/dbnode/encoding/m3tsz/encoder.go
@@ -372,8 +372,7 @@ func (enc *encoder) Discard() ts.Segment {
return segment
}

-// DiscardReset does the same thing as Discard except it also resets the encoder
-// for reuse.
+// DiscardReset does the same thing as Discard except it does not close the encoder but resets it for reuse.
func (enc *encoder) DiscardReset(start time.Time, capacity int, descr namespace.SchemaDescr) ts.Segment {
segment := enc.segmentTakeOwnership()
enc.Reset(start, capacity, descr)
9 changes: 6 additions & 3 deletions src/dbnode/encoding/tile/series_block_iterator.go
@@ -21,7 +21,10 @@
package tile

import (
"time"

"github.com/m3db/m3/src/dbnode/persist/fs"
"github.com/m3db/m3/src/dbnode/ts"
"github.com/m3db/m3/src/x/ident"
xtime "github.com/m3db/m3/src/x/time"
)
@@ -32,12 +35,12 @@ type seriesBlockIter struct {
err error
exhausted bool

-step xtime.UnixNano
+step time.Duration
start xtime.UnixNano

iter SeriesFrameIterator
blockIter fs.CrossBlockIterator
-encodedTags []byte
+encodedTags ts.EncodedTags
id ident.ID
}

@@ -75,7 +78,7 @@ func (b *seriesBlockIter) Next() bool {
return true
}

-func (b *seriesBlockIter) Current() (SeriesFrameIterator, ident.ID, []byte) {
+func (b *seriesBlockIter) Current() (SeriesFrameIterator, ident.ID, ts.EncodedTags) {
return b.iter, b.id, b.encodedTags
}

4 changes: 2 additions & 2 deletions src/dbnode/encoding/tile/series_block_iterator_test.go
@@ -56,14 +56,14 @@ func TestSeriesBlockIterator(t *testing.T) {
iterPool.EXPECT().Get().Return(it)

opts := Options{
-FrameSize: xtime.UnixNano(100),
+FrameSize: time.Duration(100),
Start: xtime.UnixNano(0),
ReaderIteratorPool: iterPool,
}

reader := fs.NewMockCrossBlockReader(ctrl)
reader.EXPECT().Next().Return(true)
-tags := []byte("encoded tags")
+tags := ts.EncodedTags("encoded tags")
records := []fs.BlockRecord{{Data: []byte("block_record")}}
reader.EXPECT().Current().Return(ident.StringID("foobar"), tags, records)
reader.EXPECT().Next().Return(false)
11 changes: 6 additions & 5 deletions src/dbnode/encoding/tile/series_frame_iterator.go
@@ -23,6 +23,7 @@ package tile
import (
"errors"
"fmt"
"time"

"github.com/m3db/m3/src/dbnode/persist/fs"
xtime "github.com/m3db/m3/src/x/time"
@@ -37,7 +38,7 @@ type seriesFrameIter struct {

iter fs.CrossBlockIterator

-frameStep xtime.UnixNano
+frameStep time.Duration
frameStart xtime.UnixNano
}

@@ -50,7 +51,7 @@ func newSeriesFrameIterator(recorder *recorder) SeriesFrameIterator {

func (b *seriesFrameIter) Reset(
start xtime.UnixNano,
-frameStep xtime.UnixNano,
+frameStep time.Duration,
iter fs.CrossBlockIterator,
) error {
if frameStep <= 0 {
@@ -64,7 +65,7 @@ func (b *seriesFrameIter) Reset(
b.started = false
b.frameStart = start
b.frameStep = frameStep
-b.curr.reset(start, start+frameStep)
+b.curr.reset(start, start+xtime.UnixNano(frameStep))

return nil
}
@@ -93,10 +94,10 @@ func (b *seriesFrameIter) Next() bool {
return false
}
} else {
-b.curr.reset(b.frameStart, b.frameStart+b.frameStep)
+b.curr.reset(b.frameStart, b.frameStart+xtime.UnixNano(b.frameStep))
}

-cutover := b.frameStart + b.frameStep
+cutover := b.frameStart + xtime.UnixNano(b.frameStep)
b.curr.FrameStartInclusive = b.frameStart
b.curr.FrameEndExclusive = cutover
b.frameStart = cutover
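For context, a minimal sketch of the frame arithmetic after this change: the step is now a plain time.Duration and is converted to xtime.UnixNano only where boundaries are computed. The frameBounds helper below is illustrative, not part of this commit.

package example

import (
	"time"

	xtime "github.com/m3db/m3/src/x/time"
)

// frameBounds returns the half-open window [lo, hi) covered by the given frame index.
func frameBounds(start xtime.UnixNano, step time.Duration, frame int) (lo, hi xtime.UnixNano) {
	lo = start + xtime.UnixNano(frame)*xtime.UnixNano(step)
	hi = lo + xtime.UnixNano(step)
	return lo, hi
}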
2 changes: 1 addition & 1 deletion src/dbnode/encoding/tile/series_frame_iterator_test.go
@@ -166,7 +166,7 @@ func TestSeriesFrameIterator(t *testing.T) {
iter := newSequentialIterator(ctrl, start, stepSize, numPoints)
require.NoError(t, it.Reset(
xtime.ToUnixNano(start),
-xtime.UnixNano(tt.frameSize),
+tt.frameSize,
iter,
))

8 changes: 5 additions & 3 deletions src/dbnode/encoding/tile/types.go
@@ -21,6 +21,8 @@
package tile

import (
"time"

"github.com/m3db/m3/src/dbnode/encoding"
"github.com/m3db/m3/src/dbnode/persist/fs"
"github.com/m3db/m3/src/dbnode/ts"
@@ -69,7 +71,7 @@ type SeriesFrameIterator interface {
// Reset resets the series frame iterator.
Reset(
start xtime.UnixNano,
-step xtime.UnixNano,
+step time.Duration,
iter fs.CrossBlockIterator,
) error
}
@@ -84,13 +86,13 @@ type SeriesBlockIterator interface {
// Close closes the iterator.
Close() error
// Current returns the next set of series frame iterators.
-Current() (SeriesFrameIterator, ident.ID, []byte)
+Current() (SeriesFrameIterator, ident.ID, ts.EncodedTags)
}

// Options are series block iterator options.
type Options struct {
// FrameSize is the frame size in nanos.
-FrameSize xtime.UnixNano
+FrameSize time.Duration
// Start is the start time for the iterator in nanos from epoch.
Start xtime.UnixNano
// EncodingOpts are options for the encoder.
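To illustrate how these pieces fit together, a hedged sketch of driving the updated iterator API follows. The Options fields and the Next/Current/Close methods come from the interfaces above; NewSeriesBlockIterator, the frame-level accessors, and the omitted pool options are assumptions rather than part of this diff.

package example

import (
	"time"

	"github.com/m3db/m3/src/dbnode/encoding/tile"
	"github.com/m3db/m3/src/dbnode/persist/fs"
	xtime "github.com/m3db/m3/src/x/time"
)

// aggregateBlock walks every series exposed by a cross-block reader, one tile-sized frame at a time.
func aggregateBlock(reader fs.CrossBlockReader, blockStart time.Time) error {
	opts := tile.Options{
		FrameSize: 5 * time.Minute,              // tile width, now a plain duration
		Start:     xtime.ToUnixNano(blockStart), // block start in nanos from epoch
		// ReaderIteratorPool and EncodingOpts omitted for brevity.
	}
	iter, err := tile.NewSeriesBlockIterator(reader, opts) // assumed constructor
	if err != nil {
		return err
	}
	defer iter.Close()

	for iter.Next() {
		frameIter, seriesID, encodedTags := iter.Current()
		_ = seriesID    // ident.ID of the series being aggregated
		_ = encodedTags // ts.EncodedTags carried through to the target namespace
		for frameIter.Next() { // assumed frame-level iteration
			_ = frameIter.Current() // one frame per FrameSize window
		}
	}
	return nil
}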
6 changes: 3 additions & 3 deletions src/dbnode/encoding/types.go
@@ -77,13 +77,13 @@ type Encoder interface {
// Reset sets up the schema for schema-aware encoders such as proto encoders.
Reset(t time.Time, capacity int, schema namespace.SchemaDescr)

-// Close closes the encoder and if pooled will return to the pool.
+// Close closes the encoder and if pooled will return it to the pool.
Close()

-// Discard will take ownership of the encoder data and if pooled will return to the pool.
+// Discard will take ownership of the encoder data and if pooled will return the encoder to the pool.
Discard() ts.Segment

-// DiscardReset will take ownership of the encoder data and reset the encoder for use.
+// DiscardReset will take ownership of the encoder data and reset the encoder for reuse.
// DiscardReset sets up the schema for schema-aware encoders such as proto encoders.
DiscardReset(t time.Time, capacity int, schema namespace.SchemaDescr) ts.Segment
}
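The comment fixes above pin down the ownership semantics. A brief sketch of the intended usage pattern follows; the writeTiles helper and persist callback are hypothetical, and only the Encoder methods come from the interface in this diff.

package example

import (
	"time"

	"github.com/m3db/m3/src/dbnode/encoding"
	"github.com/m3db/m3/src/dbnode/ts"
)

// writeTiles reuses a single encoder across many tiles: DiscardReset hands back
// the encoded segment without closing the encoder, so the same instance can be
// reset for the next tile; Close runs once at the end and returns the encoder
// to its pool if it is pooled.
func writeTiles(enc encoding.Encoder, starts []time.Time, persist func(ts.Segment) error) error {
	defer enc.Close()
	for _, start := range starts {
		// ... encode one tile's worth of datapoints into enc ...

		segment := enc.DiscardReset(start, 0, nil) // take the data, keep the encoder
		if err := persist(segment); err != nil {
			return err
		}
	}
	return nil
}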
2 changes: 1 addition & 1 deletion src/dbnode/generated/thrift/rpc.thrift
@@ -505,7 +505,7 @@ struct AggregateTilesRequest {
}

struct AggregateTilesResult {
-1: required i64 processedBlockCount
+1: required i64 processedTileCount
}

struct DebugProfileStartRequest {
28 changes: 14 additions & 14 deletions src/dbnode/generated/thrift/rpc/rpc.go
@@ -13247,24 +13247,24 @@ func (p *AggregateTilesRequest) String() string {
}

// Attributes:
-// - ProcessedBlockCount
+// - ProcessedTileCount
type AggregateTilesResult_ struct {
-ProcessedBlockCount int64 `thrift:"processedBlockCount,1,required" db:"processedBlockCount" json:"processedBlockCount"`
+ProcessedTileCount int64 `thrift:"processedTileCount,1,required" db:"processedTileCount" json:"processedTileCount"`
}

func NewAggregateTilesResult_() *AggregateTilesResult_ {
return &AggregateTilesResult_{}
}

-func (p *AggregateTilesResult_) GetProcessedBlockCount() int64 {
-return p.ProcessedBlockCount
+func (p *AggregateTilesResult_) GetProcessedTileCount() int64 {
+return p.ProcessedTileCount
}
func (p *AggregateTilesResult_) Read(iprot thrift.TProtocol) error {
if _, err := iprot.ReadStructBegin(); err != nil {
return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err)
}

-var issetProcessedBlockCount bool = false
+var issetProcessedTileCount bool = false

for {
_, fieldTypeId, fieldId, err := iprot.ReadFieldBegin()
@@ -13279,7 +13279,7 @@ func (p *AggregateTilesResult_) Read(iprot thrift.TProtocol) error {
if err := p.ReadField1(iprot); err != nil {
return err
}
-issetProcessedBlockCount = true
+issetProcessedTileCount = true
default:
if err := iprot.Skip(fieldTypeId); err != nil {
return err
@@ -13292,8 +13292,8 @@ func (p *AggregateTilesResult_) Read(iprot thrift.TProtocol) error {
if err := iprot.ReadStructEnd(); err != nil {
return thrift.PrependError(fmt.Sprintf("%T read struct end error: ", p), err)
}
-if !issetProcessedBlockCount {
-return thrift.NewTProtocolExceptionWithType(thrift.INVALID_DATA, fmt.Errorf("Required field ProcessedBlockCount is not set"))
+if !issetProcessedTileCount {
+return thrift.NewTProtocolExceptionWithType(thrift.INVALID_DATA, fmt.Errorf("Required field ProcessedTileCount is not set"))
}
return nil
}
@@ -13302,7 +13302,7 @@ func (p *AggregateTilesResult_) ReadField1(iprot thrift.TProtocol) error {
if v, err := iprot.ReadI64(); err != nil {
return thrift.PrependError("error reading field 1: ", err)
} else {
-p.ProcessedBlockCount = v
+p.ProcessedTileCount = v
}
return nil
}
@@ -13326,14 +13326,14 @@ func (p *AggregateTilesResult_) Write(oprot thrift.TProtocol) error {
}

func (p *AggregateTilesResult_) writeField1(oprot thrift.TProtocol) (err error) {
-if err := oprot.WriteFieldBegin("processedBlockCount", thrift.I64, 1); err != nil {
-return thrift.PrependError(fmt.Sprintf("%T write field begin error 1:processedBlockCount: ", p), err)
+if err := oprot.WriteFieldBegin("processedTileCount", thrift.I64, 1); err != nil {
+return thrift.PrependError(fmt.Sprintf("%T write field begin error 1:processedTileCount: ", p), err)
}
-if err := oprot.WriteI64(int64(p.ProcessedBlockCount)); err != nil {
-return thrift.PrependError(fmt.Sprintf("%T.processedBlockCount (1) field write error: ", p), err)
+if err := oprot.WriteI64(int64(p.ProcessedTileCount)); err != nil {
+return thrift.PrependError(fmt.Sprintf("%T.processedTileCount (1) field write error: ", p), err)
}
if err := oprot.WriteFieldEnd(); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write field end error 1:processedBlockCount: ", p), err)
return thrift.PrependError(fmt.Sprintf("%T write field end error 1:processedTileCount: ", p), err)
}
return err
}
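For completeness, a small sketch of populating the regenerated result type on the server side; the toRPCResult helper is hypothetical, and only the rpc types come from this diff.

package example

import "github.com/m3db/m3/src/dbnode/generated/thrift/rpc"

// toRPCResult wraps the processed tile count in the thrift response struct.
func toRPCResult(processedTileCount int64) *rpc.AggregateTilesResult_ {
	res := rpc.NewAggregateTilesResult_()
	res.ProcessedTileCount = processedTileCount
	return res
}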