diff --git a/src/dbnode/storage/database.go b/src/dbnode/storage/database.go index 971bd92169..f74bf6ba90 100644 --- a/src/dbnode/storage/database.go +++ b/src/dbnode/storage/database.go @@ -1290,7 +1290,7 @@ func (d *db) AggregateTiles( return 0, err } - processedTileCount, err := targetNs.AggregateTiles(sourceNs, opts) + processedTileCount, err := targetNs.AggregateTiles(ctx, sourceNs, opts) if err != nil { d.log.Error("error writing large tiles", zap.String("sourceNs", sourceNsID.String()), diff --git a/src/dbnode/storage/database_test.go b/src/dbnode/storage/database_test.go index d0f669e77a..f65fc30019 100644 --- a/src/dbnode/storage/database_test.go +++ b/src/dbnode/storage/database_test.go @@ -1433,6 +1433,9 @@ func TestDatabaseAggregateTiles(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() + ctx := context.NewContext() + defer ctx.Close() + d, mapCh, _ := defaultTestDatabase(t, ctrl, Bootstrapped) defer func() { close(mapCh) @@ -1441,7 +1444,6 @@ func TestDatabaseAggregateTiles(t *testing.T) { var ( sourceNsID = ident.StringID("source") targetNsID = ident.StringID("target") - ctx = context.NewContext() start = time.Now().Truncate(time.Hour) ) @@ -1451,7 +1453,7 @@ func TestDatabaseAggregateTiles(t *testing.T) { sourceNs := dbAddNewMockNamespace(ctrl, d, sourceNsID.String()) targetNs := dbAddNewMockNamespace(ctrl, d, targetNsID.String()) - targetNs.EXPECT().AggregateTiles(sourceNs, opts).Return(int64(4), nil) + targetNs.EXPECT().AggregateTiles(ctx, sourceNs, opts).Return(int64(4), nil) processedTileCount, err := d.AggregateTiles(ctx, sourceNsID, targetNsID, opts) require.NoError(t, err) diff --git a/src/dbnode/storage/namespace.go b/src/dbnode/storage/namespace.go index c4bdbb4e62..4992f63c3c 100644 --- a/src/dbnode/storage/namespace.go +++ b/src/dbnode/storage/namespace.go @@ -1700,17 +1700,19 @@ func (n *dbNamespace) nsContextWithRLock() namespace.Context { } func (n *dbNamespace) AggregateTiles( + ctx context.Context, sourceNs databaseNamespace, opts AggregateTilesOptions, ) (int64, error) { callStart := n.nowFn() - processedTileCount, err := n.aggregateTiles(sourceNs, opts) + processedTileCount, err := n.aggregateTiles(ctx, sourceNs, opts) n.metrics.aggregateTiles.ReportSuccessOrError(err, n.nowFn().Sub(callStart)) return processedTileCount, err } func (n *dbNamespace) aggregateTiles( + ctx context.Context, sourceNs databaseNamespace, opts AggregateTilesOptions, ) (int64, error) { @@ -1778,7 +1780,7 @@ func (n *dbNamespace) aggregateTiles( } shardProcessedTileCount, err := targetShard.AggregateTiles( - sourceNs.ID(), n, sourceShard.ID(), blockReaders, writer, sourceBlockVolumes, + ctx, sourceNs, n, sourceShard.ID(), blockReaders, writer, sourceBlockVolumes, onColdFlushNs, opts) processedTileCount += shardProcessedTileCount diff --git a/src/dbnode/storage/namespace_test.go b/src/dbnode/storage/namespace_test.go index 5e2e0e9346..01f2f6d798 100644 --- a/src/dbnode/storage/namespace_test.go +++ b/src/dbnode/storage/namespace_test.go @@ -1419,6 +1419,9 @@ func TestNamespaceFlushState(t *testing.T) { } func TestNamespaceAggregateTilesFailUntilBootstrapped(t *testing.T) { + ctx := context.NewContext() + defer ctx.Close() + var ( sourceNsID = ident.StringID("source") targetNsID = ident.StringID("target") @@ -1432,12 +1435,12 @@ func TestNamespaceAggregateTilesFailUntilBootstrapped(t *testing.T) { targetNs, targetCloser := newTestNamespaceWithIDOpts(t, targetNsID, namespace.NewOptions()) defer targetCloser() - _, err := targetNs.AggregateTiles(sourceNs, opts) + _, err := targetNs.AggregateTiles(ctx, sourceNs, opts) require.Equal(t, errNamespaceNotBootstrapped, err) sourceNs.bootstrapState = Bootstrapped - _, err = targetNs.AggregateTiles(sourceNs, opts) + _, err = targetNs.AggregateTiles(ctx, sourceNs, opts) require.Equal(t, errNamespaceNotBootstrapped, err) } @@ -1445,6 +1448,9 @@ func TestNamespaceAggregateTiles(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() + ctx := context.NewContext() + defer ctx.Close() + var ( sourceNsID = ident.StringID("source") targetNsID = ident.StringID("target") @@ -1505,21 +1511,19 @@ func TestNamespaceAggregateTiles(t *testing.T) { sourceBlockVolumes0 := []shardBlockVolume{{start, 5}, {secondSourceBlockStart, 15}} sourceBlockVolumes1 := []shardBlockVolume{{start, 7}, {secondSourceBlockStart, 17}} - sourceNsIDMatcher := ident.NewIDMatcher(sourceNsID.String()) - targetShard0.EXPECT(). AggregateTiles( - sourceNsIDMatcher, targetNs, shard0ID, gomock.Len(2), gomock.Any(), + ctx, sourceNs, targetNs, shard0ID, gomock.Len(2), gomock.Any(), sourceBlockVolumes0, gomock.Any(), opts). Return(int64(3), nil) targetShard1.EXPECT(). AggregateTiles( - sourceNsIDMatcher, targetNs, shard1ID, gomock.Len(2), gomock.Any(), + ctx, sourceNs, targetNs, shard1ID, gomock.Len(2), gomock.Any(), sourceBlockVolumes1, gomock.Any(), opts). Return(int64(2), nil) - processedTileCount, err := targetNs.AggregateTiles(sourceNs, opts) + processedTileCount, err := targetNs.AggregateTiles(ctx, sourceNs, opts) require.NoError(t, err) assert.Equal(t, int64(3+2), processedTileCount) diff --git a/src/dbnode/storage/options.go b/src/dbnode/storage/options.go index 05bf1764e0..9da8bc65f4 100644 --- a/src/dbnode/storage/options.go +++ b/src/dbnode/storage/options.go @@ -942,12 +942,13 @@ func (h *noopNamespaceHooks) OnCreatedNamespace(Namespace, GetNamespaceFn) error type noopTileAggregator struct{} func (a *noopTileAggregator) AggregateTiles( - opts AggregateTilesOptions, - ns Namespace, + ctx context.Context, + sourceNs, targetNs Namespace, shardID uint32, - readers []fs.DataFileSetReader, + blockReaders []fs.DataFileSetReader, writer fs.StreamingWriter, onFlushSeries persist.OnFlushSeries, + opts AggregateTilesOptions, ) (int64, error) { return 0, nil } diff --git a/src/dbnode/storage/shard.go b/src/dbnode/storage/shard.go index 6ccd37a9f8..56d61d776d 100644 --- a/src/dbnode/storage/shard.go +++ b/src/dbnode/storage/shard.go @@ -2662,8 +2662,8 @@ func (s *dbShard) Repair( } func (s *dbShard) AggregateTiles( - sourceNsID ident.ID, - targetNs Namespace, + ctx context.Context, + sourceNs, targetNs Namespace, shardID uint32, blockReaders []fs.DataFileSetReader, writer fs.StreamingWriter, @@ -2687,7 +2687,11 @@ func (s *dbShard) AggregateTiles( } }() - maxEntries := 0 + var ( + sourceNsID = sourceNs.ID() + maxEntries = 0 + ) + for sourceBlockPos, blockReader := range blockReaders { sourceBlockVolume := sourceBlockVolumes[sourceBlockPos] openOpts := fs.DataReaderOpenOptions{ @@ -2742,7 +2746,7 @@ func (s *dbShard) AggregateTiles( var multiErr xerrors.MultiError processedTileCount, err := s.tileAggregator.AggregateTiles( - opts, targetNs, s.ID(), openBlockReaders, writer, onFlushSeries) + ctx, sourceNs, targetNs, s.ID(), openBlockReaders, writer, onFlushSeries, opts) if err != nil { // NB: cannot return on the error here, must finish writing. multiErr = multiErr.Add(err) diff --git a/src/dbnode/storage/shard_test.go b/src/dbnode/storage/shard_test.go index 3a6e948bdf..bceaf17151 100644 --- a/src/dbnode/storage/shard_test.go +++ b/src/dbnode/storage/shard_test.go @@ -1848,6 +1848,9 @@ func TestShardAggregateTiles(t *testing.T) { ctrl := xtest.NewController(t) defer ctrl.Finish() + ctx := context.NewContext() + defer ctx.Close() + var ( sourceBlockSize = time.Hour targetBlockSize = 2 * time.Hour @@ -1869,8 +1872,6 @@ func TestShardAggregateTiles(t *testing.T) { sourceShard := testDatabaseShard(t, testOpts) defer assert.NoError(t, sourceShard.Close()) - sourceNsID := sourceShard.namespace.ID() - reader0, volume0 := getMockReader(ctrl, t, sourceShard, start, true) reader0.EXPECT().Entries().Return(firstSourceBlockEntries) @@ -1903,16 +1904,22 @@ func TestShardAggregateTiles(t *testing.T) { }), writer.EXPECT().Close(), ) - noOpColdFlushNs := &persist.NoOpColdFlushNamespace{} - targetNs := NewMockNamespace(ctrl) + var ( + noOpColdFlushNs = &persist.NoOpColdFlushNamespace{} + sourceNs = NewMockNamespace(ctrl) + targetNs = NewMockNamespace(ctrl) + ) + + sourceNs.EXPECT().ID().Return(sourceShard.namespace.ID()) + aggregator.EXPECT(). - AggregateTiles(opts, targetNs, sourceShard.ID(), gomock.Len(2), writer, - noOpColdFlushNs). + AggregateTiles(ctx, sourceNs, targetNs, sourceShard.ID(), gomock.Len(2), writer, + noOpColdFlushNs, opts). Return(expectedProcessedTileCount, nil) processedTileCount, err := targetShard.AggregateTiles( - sourceNsID, targetNs, sourceShard.ID(), blockReaders, writer, + ctx, sourceNs, targetNs, sourceShard.ID(), blockReaders, writer, sourceBlockVolumes, noOpColdFlushNs, opts) require.NoError(t, err) assert.Equal(t, expectedProcessedTileCount, processedTileCount) @@ -1922,21 +1929,23 @@ func TestShardAggregateTilesVerifySliceLengths(t *testing.T) { ctrl := xtest.NewController(t) defer ctrl.Finish() - var ( - srcNsID = ident.StringID("src") - start = time.Now() - ) + ctx := context.NewContext() + defer ctx.Close() targetShard := testDatabaseShardWithIndexFn(t, DefaultTestOptions(), nil, true) defer assert.NoError(t, targetShard.Close()) - var blockReaders []fs.DataFileSetReader - sourceBlockVolumes := []shardBlockVolume{{start, 0}} - - writer := fs.NewMockStreamingWriter(ctrl) + var ( + start = time.Now() + blockReaders []fs.DataFileSetReader + sourceBlockVolumes = []shardBlockVolume{{start, 0}} + writer = fs.NewMockStreamingWriter(ctrl) + sourceNs = NewMockNamespace(ctrl) + targetNs = NewMockNamespace(ctrl) + ) _, err := targetShard.AggregateTiles( - srcNsID, nil, 1, blockReaders, writer, sourceBlockVolumes, + ctx, sourceNs, targetNs, 1, blockReaders, writer, sourceBlockVolumes, &persist.NoOpColdFlushNamespace{}, AggregateTilesOptions{}) require.EqualError(t, err, "blockReaders and sourceBlockVolumes length mismatch (0 != 1)") } diff --git a/src/dbnode/storage/storage_mock.go b/src/dbnode/storage/storage_mock.go index 27d3afc98a..550fd18483 100644 --- a/src/dbnode/storage/storage_mock.go +++ b/src/dbnode/storage/storage_mock.go @@ -1808,18 +1808,18 @@ func (mr *MockdatabaseNamespaceMockRecorder) WritePendingIndexInserts(pending in } // AggregateTiles mocks base method -func (m *MockdatabaseNamespace) AggregateTiles(sourceNs databaseNamespace, opts AggregateTilesOptions) (int64, error) { +func (m *MockdatabaseNamespace) AggregateTiles(ctx context.Context, sourceNs databaseNamespace, opts AggregateTilesOptions) (int64, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "AggregateTiles", sourceNs, opts) + ret := m.ctrl.Call(m, "AggregateTiles", ctx, sourceNs, opts) ret0, _ := ret[0].(int64) ret1, _ := ret[1].(error) return ret0, ret1 } // AggregateTiles indicates an expected call of AggregateTiles -func (mr *MockdatabaseNamespaceMockRecorder) AggregateTiles(sourceNs, opts interface{}) *gomock.Call { +func (mr *MockdatabaseNamespaceMockRecorder) AggregateTiles(ctx, sourceNs, opts interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AggregateTiles", reflect.TypeOf((*MockdatabaseNamespace)(nil).AggregateTiles), sourceNs, opts) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AggregateTiles", reflect.TypeOf((*MockdatabaseNamespace)(nil).AggregateTiles), ctx, sourceNs, opts) } // ReadableShardAt mocks base method @@ -2331,18 +2331,18 @@ func (mr *MockdatabaseShardMockRecorder) DocRef(id interface{}) *gomock.Call { } // AggregateTiles mocks base method -func (m *MockdatabaseShard) AggregateTiles(sourceNsID ident.ID, targetNs Namespace, shardID uint32, blockReaders []fs.DataFileSetReader, writer fs.StreamingWriter, sourceBlockVolumes []shardBlockVolume, onFlushSeries persist.OnFlushSeries, opts AggregateTilesOptions) (int64, error) { +func (m *MockdatabaseShard) AggregateTiles(ctx context.Context, sourceNs, targetNs Namespace, shardID uint32, blockReaders []fs.DataFileSetReader, writer fs.StreamingWriter, sourceBlockVolumes []shardBlockVolume, onFlushSeries persist.OnFlushSeries, opts AggregateTilesOptions) (int64, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "AggregateTiles", sourceNsID, targetNs, shardID, blockReaders, writer, sourceBlockVolumes, onFlushSeries, opts) + ret := m.ctrl.Call(m, "AggregateTiles", ctx, sourceNs, targetNs, shardID, blockReaders, writer, sourceBlockVolumes, onFlushSeries, opts) ret0, _ := ret[0].(int64) ret1, _ := ret[1].(error) return ret0, ret1 } // AggregateTiles indicates an expected call of AggregateTiles -func (mr *MockdatabaseShardMockRecorder) AggregateTiles(sourceNsID, targetNs, shardID, blockReaders, writer, sourceBlockVolumes, onFlushSeries, opts interface{}) *gomock.Call { +func (mr *MockdatabaseShardMockRecorder) AggregateTiles(ctx, sourceNs, targetNs, shardID, blockReaders, writer, sourceBlockVolumes, onFlushSeries, opts interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AggregateTiles", reflect.TypeOf((*MockdatabaseShard)(nil).AggregateTiles), sourceNsID, targetNs, shardID, blockReaders, writer, sourceBlockVolumes, onFlushSeries, opts) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AggregateTiles", reflect.TypeOf((*MockdatabaseShard)(nil).AggregateTiles), ctx, sourceNs, targetNs, shardID, blockReaders, writer, sourceBlockVolumes, onFlushSeries, opts) } // LatestVolume mocks base method @@ -5273,18 +5273,18 @@ func (m *MockTileAggregator) EXPECT() *MockTileAggregatorMockRecorder { } // AggregateTiles mocks base method -func (m *MockTileAggregator) AggregateTiles(opts AggregateTilesOptions, ns Namespace, shardID uint32, readers []fs.DataFileSetReader, writer fs.StreamingWriter, onFlushSeries persist.OnFlushSeries) (int64, error) { +func (m *MockTileAggregator) AggregateTiles(ctx context.Context, sourceNs, targetNs Namespace, shardID uint32, readers []fs.DataFileSetReader, writer fs.StreamingWriter, onFlushSeries persist.OnFlushSeries, opts AggregateTilesOptions) (int64, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "AggregateTiles", opts, ns, shardID, readers, writer, onFlushSeries) + ret := m.ctrl.Call(m, "AggregateTiles", ctx, sourceNs, targetNs, shardID, readers, writer, onFlushSeries, opts) ret0, _ := ret[0].(int64) ret1, _ := ret[1].(error) return ret0, ret1 } // AggregateTiles indicates an expected call of AggregateTiles -func (mr *MockTileAggregatorMockRecorder) AggregateTiles(opts, ns, shardID, readers, writer, onFlushSeries interface{}) *gomock.Call { +func (mr *MockTileAggregatorMockRecorder) AggregateTiles(ctx, sourceNs, targetNs, shardID, readers, writer, onFlushSeries, opts interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AggregateTiles", reflect.TypeOf((*MockTileAggregator)(nil).AggregateTiles), opts, ns, shardID, readers, writer, onFlushSeries) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AggregateTiles", reflect.TypeOf((*MockTileAggregator)(nil).AggregateTiles), ctx, sourceNs, targetNs, shardID, readers, writer, onFlushSeries, opts) } // MockNamespaceHooks is a mock of NamespaceHooks interface diff --git a/src/dbnode/storage/types.go b/src/dbnode/storage/types.go index 81af6d2e37..e27c25a658 100644 --- a/src/dbnode/storage/types.go +++ b/src/dbnode/storage/types.go @@ -470,7 +470,11 @@ type databaseNamespace interface { WritePendingIndexInserts(pending []writes.PendingIndexInsert) error // AggregateTiles does large tile aggregation from source namespace into this namespace. - AggregateTiles(sourceNs databaseNamespace, opts AggregateTilesOptions) (int64, error) + AggregateTiles( + ctx context.Context, + sourceNs databaseNamespace, + opts AggregateTilesOptions, + ) (int64, error) // ReadableShardAt returns a shard of this namespace by shardID. ReadableShardAt(shardID uint32) (databaseShard, namespace.Context, error) @@ -662,7 +666,8 @@ type databaseShard interface { // AggregateTiles does large tile aggregation from source shards into this shard. AggregateTiles( - sourceNsID ident.ID, + ctx context.Context, + sourceNs Namespace, targetNs Namespace, shardID uint32, blockReaders []fs.DataFileSetReader, @@ -1420,12 +1425,13 @@ type AggregateTilesOptions struct { type TileAggregator interface { // AggregateTiles does tile aggregation. AggregateTiles( - opts AggregateTilesOptions, - ns Namespace, + ctx context.Context, + sourceNs, targetNs Namespace, shardID uint32, readers []fs.DataFileSetReader, writer fs.StreamingWriter, onFlushSeries persist.OnFlushSeries, + opts AggregateTilesOptions, ) (int64, error) }