diff --git a/go.mod b/go.mod index 07e2f34d1b..1a322658bf 100644 --- a/go.mod +++ b/go.mod @@ -99,6 +99,7 @@ require ( github.com/stretchr/testify v1.6.1 github.com/subosito/gotenv v1.2.1-0.20190917103637-de67a6614a4d // indirect github.com/twotwotwo/sorts v0.0.0-20160814051341-bf5c1f2b8553 + github.com/uber-go/atomic v0.0.0-00010101000000-000000000000 github.com/uber-go/tally v3.3.13+incompatible github.com/uber/jaeger-client-go v2.25.0+incompatible github.com/uber/jaeger-lib v2.2.0+incompatible diff --git a/go.sum b/go.sum index 3aac57d087..84c0e13b6d 100644 --- a/go.sum +++ b/go.sum @@ -813,6 +813,8 @@ github.com/twmb/murmur3 v1.1.4 h1:NnlAxelwOgdQDmYuV0T/K+tpDQ/8wdsDVOGmvUqBOCw= github.com/twmb/murmur3 v1.1.4/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ= github.com/twotwotwo/sorts v0.0.0-20160814051341-bf5c1f2b8553 h1:DRC1ubdb3ZmyyIeCSTxjZIQAnpLPfKVgYrLETQuOPjo= github.com/twotwotwo/sorts v0.0.0-20160814051341-bf5c1f2b8553/go.mod h1:Rj7Csq/tZ/egz+Ltc2IVpsA5309AmSMEswjkTZmq2Xc= +github.com/uber-go/atomic v1.4.0 h1:yOuPqEq4ovnhEjpHmfFwsqBXDYbQeT6Nb0bwD6XnD5o= +github.com/uber-go/atomic v1.4.0/go.mod h1:/Ct5t2lcmbJ4OSe/waGBoaVvVqtO0bmtfVNex1PFV8g= github.com/uber-go/tally v3.3.13+incompatible h1:5ic2UsDwjcWsw9jvEdWEE2XsmGCLMTt5Ukg4d74fed4= github.com/uber-go/tally v3.3.13+incompatible/go.mod h1:YDTIBxdXyOU/sCWilKB4bgyufu1cEi0jdVnRdxvjnmU= github.com/uber/jaeger-client-go v2.25.0+incompatible h1:IxcNZ7WRY1Y3G4poYlx24szfsn/3LvK9QHCq9oQw8+U= diff --git a/src/dbnode/integration/options.go b/src/dbnode/integration/options.go index 1e76698136..29e3ffbc1a 100644 --- a/src/dbnode/integration/options.go +++ b/src/dbnode/integration/options.go @@ -81,6 +81,9 @@ const ( // defaultWriteNewSeriesAsync inserts, and index series' synchronously by default. defaultWriteNewSeriesAsync = false + + // defaultReportInterval is the default time interval of reporting metrics within the system. + defaultReportInterval = time.Second ) var ( @@ -283,6 +286,12 @@ type TestOptions interface { // NowFn returns the now fn. NowFn() func() time.Time + + // SetReportInterval sets the time between reporting metrics within the system. + SetReportInterval(value time.Duration) TestOptions + + // ReportInterval returns the time between reporting metrics within the system. + ReportInterval() time.Duration } type options struct { @@ -316,6 +325,7 @@ type options struct { protoEncoding bool assertEqual assertTestDataEqual nowFn func() time.Time + reportInterval time.Duration } // NewTestOptions returns a new set of integration test options. @@ -349,6 +359,7 @@ func NewTestOptions(t *testing.T) TestOptions { useTChannelClientForWriting: defaultUseTChannelClientForWriting, useTChannelClientForTruncation: defaultUseTChannelClientForTruncation, writeNewSeriesAsync: defaultWriteNewSeriesAsync, + reportInterval: defaultReportInterval, } } @@ -382,6 +393,7 @@ func (o *options) SetID(value string) TestOptions { func (o *options) ID() string { return o.id } + func (o *options) SetTickMinimumInterval(value time.Duration) TestOptions { opts := *o opts.tickMinimumInterval = value @@ -653,3 +665,13 @@ func (o *options) SetNowFn(value func() time.Time) TestOptions { func (o *options) NowFn() func() time.Time { return o.nowFn } + +func (o *options) SetReportInterval(value time.Duration) TestOptions { + opts := *o + opts.reportInterval = value + return &opts +} + +func (o *options) ReportInterval() time.Duration { + return o.reportInterval +} diff --git a/src/dbnode/integration/run_custom_background_process_test.go b/src/dbnode/integration/run_custom_background_process_test.go new file mode 100644 index 0000000000..ae1316334b --- /dev/null +++ b/src/dbnode/integration/run_custom_background_process_test.go @@ -0,0 +1,98 @@ +// +build integration + +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package integration + +import ( + "testing" + "time" + + "github.com/m3db/m3/src/dbnode/storage" + xclock "github.com/m3db/m3/src/x/clock" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber-go/atomic" +) + +type testBackgroundProcess struct { + executed, reported, stopped atomic.Int32 +} + +func (p *testBackgroundProcess) run() { + xclock.WaitUntil(func() bool { + return p.reported.Load() > 0 + }, time.Minute) + p.executed.Inc() + xclock.WaitUntil(func() bool { + return p.stopped.Load() > 0 + }, time.Minute) +} + +func (p *testBackgroundProcess) Start() { + go p.run() +} + +func (p *testBackgroundProcess) Stop() { + p.stopped.Inc() + return +} + +func (p *testBackgroundProcess) Report() { + p.reported.Inc() + return +} + +func TestRunCustomBackgroundProcess(t *testing.T) { + testOpts := NewTestOptions(t).SetReportInterval(time.Millisecond) + testSetup := newTestSetupWithCommitLogAndFilesystemBootstrapper(t, testOpts) + defer testSetup.Close() + + backgroundProcess := &testBackgroundProcess{} + + storageOpts := testSetup.StorageOpts(). + SetBackgroundProcessFns([]storage.NewBackgroundProcessFn{ + func(storage.Database, storage.Options) (storage.BackgroundProcess, error) { + return backgroundProcess, nil + }}) + testSetup.SetStorageOpts(storageOpts) + + log := storageOpts.InstrumentOptions().Logger() + + // Start the server. + require.NoError(t, testSetup.StartServer()) + + // Stop the server. + defer func() { + require.NoError(t, testSetup.StopServer()) + log.Debug("server is now down") + assert.Equal(t, int32(1), backgroundProcess.stopped.Load(), "failed to stop") + }() + + log.Info("waiting for the custom background process to execute") + xclock.WaitUntil(func() bool { + return backgroundProcess.executed.Load() > 0 + }, time.Minute) + + assert.Equal(t, int32(1), backgroundProcess.executed.Load(), "failed to execute") + assert.True(t, backgroundProcess.reported.Load() > 0, "failed to report") +} diff --git a/src/dbnode/integration/setup.go b/src/dbnode/integration/setup.go index f2f580c601..c10a5fa0f1 100644 --- a/src/dbnode/integration/setup.go +++ b/src/dbnode/integration/setup.go @@ -450,6 +450,9 @@ func NewTestSetup( storageOpts = storageOpts.SetDatabaseBlockOptions(blockOpts) } + storageOpts = storageOpts.SetInstrumentOptions( + storageOpts.InstrumentOptions().SetReportInterval(opts.ReportInterval())) + // Set debugging options if environment vars set if debugFilePrefix := os.Getenv("TEST_DEBUG_FILE_PREFIX"); debugFilePrefix != "" { opts = opts.SetVerifySeriesDebugFilePathPrefix(debugFilePrefix) diff --git a/src/dbnode/server/options.go b/src/dbnode/server/options.go index 38508ad948..fa2cc58c2a 100644 --- a/src/dbnode/server/options.go +++ b/src/dbnode/server/options.go @@ -30,5 +30,6 @@ type StorageOptions struct { OnColdFlush storage.OnColdFlush ForceColdWritesEnabled bool TChanNodeServerFn node.NewTChanNodeServerFn + BackgroundProcessFns []storage.NewBackgroundProcessFn NamespaceHooks storage.NamespaceHooks } diff --git a/src/dbnode/server/server.go b/src/dbnode/server/server.go index b265850f8e..109a780e17 100644 --- a/src/dbnode/server/server.go +++ b/src/dbnode/server/server.go @@ -826,6 +826,8 @@ func Run(runOpts RunOptions) { opts = opts.SetOnColdFlush(runOpts.StorageOptions.OnColdFlush) } + opts = opts.SetBackgroundProcessFns(append(opts.BackgroundProcessFns(), runOpts.StorageOptions.BackgroundProcessFns...)) + if runOpts.StorageOptions.NamespaceHooks != nil { opts = opts.SetNamespaceHooks(runOpts.StorageOptions.NamespaceHooks) } diff --git a/src/dbnode/storage/database.go b/src/dbnode/storage/database.go index ff76b06123..d46355e398 100644 --- a/src/dbnode/storage/database.go +++ b/src/dbnode/storage/database.go @@ -103,6 +103,7 @@ type db struct { state databaseState mediator databaseMediator + repairer databaseRepairer created uint64 bootstraps int @@ -229,12 +230,34 @@ func NewDatabase( zap.Error(err)) } - mediator, err := newMediator( + d.mediator, err = newMediator( d, commitLog, opts.SetInstrumentOptions(databaseIOpts)) if err != nil { return nil, err } - d.mediator = mediator + + d.repairer = newNoopDatabaseRepairer() + if opts.RepairEnabled() { + d.repairer, err = newDatabaseRepairer(d, opts) + if err != nil { + return nil, err + } + err = d.mediator.RegisterBackgroundProcess(d.repairer) + if err != nil { + return nil, err + } + } + + for _, fn := range opts.BackgroundProcessFns() { + process, err := fn(d, opts) + if err != nil { + return nil, err + } + err = d.mediator.RegisterBackgroundProcess(process) + if err != nil { + return nil, err + } + } return d, nil } @@ -1036,7 +1059,7 @@ func (d *db) IsBootstrappedAndDurable() bool { } func (d *db) Repair() error { - return d.mediator.Repair() + return d.repairer.Repair() } func (d *db) Truncate(namespace ident.ID) (int64, error) { diff --git a/src/dbnode/storage/mediator.go b/src/dbnode/storage/mediator.go index f1462a82d2..b0afb18896 100644 --- a/src/dbnode/storage/mediator.go +++ b/src/dbnode/storage/mediator.go @@ -39,8 +39,7 @@ type ( ) const ( - fileOpCheckInterval = time.Second - fileSystemProcessesCheckInterval = 100 * time.Millisecond + fileOpCheckInterval = time.Second mediatorNotOpen mediatorState = iota mediatorOpen @@ -51,7 +50,6 @@ var ( errMediatorAlreadyOpen = errors.New("mediator is already open") errMediatorNotOpen = errors.New("mediator is not open") errMediatorAlreadyClosed = errors.New("mediator is already closed") - errMediatorTimeBarrierAlreadyWaiting = errors.New("mediator time barrier already has a waiter") errMediatorTimeTriedToProgressBackwards = errors.New("mediator time tried to progress backwards") ) @@ -78,7 +76,6 @@ type mediator struct { databaseFileSystemManager databaseColdFlushManager databaseTickManager - databaseRepairer opts Options nowFn clock.NowFn @@ -88,6 +85,7 @@ type mediator struct { mediatorTimeBarrier mediatorTimeBarrier closedCh chan struct{} tickInterval time.Duration + backgroundProcesses []BackgroundProcess } // TODO(r): Consider renaming "databaseMediator" to "databaseCoordinator" @@ -123,19 +121,23 @@ func newMediator(database database, commitlog commitlog.CommitLog, opts Options) cfm := newColdFlushManager(database, pm, opts) d.databaseColdFlushManager = cfm - d.databaseRepairer = newNoopDatabaseRepairer() - if opts.RepairEnabled() { - d.databaseRepairer, err = newDatabaseRepairer(database, opts) - if err != nil { - return nil, err - } - } - d.databaseTickManager = newTickManager(database, opts) d.databaseBootstrapManager = newBootstrapManager(database, d, opts) return d, nil } +func (m *mediator) RegisterBackgroundProcess(process BackgroundProcess) error { + m.Lock() + defer m.Unlock() + + if m.state != mediatorNotOpen { + return errMediatorAlreadyOpen + } + + m.backgroundProcesses = append(m.backgroundProcesses, process) + return nil +} + func (m *mediator) Open() error { m.Lock() defer m.Unlock() @@ -143,11 +145,16 @@ func (m *mediator) Open() error { return errMediatorAlreadyOpen } m.state = mediatorOpen + go m.reportLoop() go m.ongoingFileSystemProcesses() go m.ongoingColdFlushProcesses() go m.ongoingTick() - m.databaseRepairer.Start() + + for _, process := range m.backgroundProcesses { + process.Start() + } + return nil } @@ -175,9 +182,12 @@ func (m *mediator) EnableFileOps() { func (m *mediator) Report() { m.databaseBootstrapManager.Report() - m.databaseRepairer.Report() m.databaseFileSystemManager.Report() m.databaseColdFlushManager.Report() + + for _, process := range m.backgroundProcesses { + process.Report() + } } func (m *mediator) Close() error { @@ -191,7 +201,11 @@ func (m *mediator) Close() error { } m.state = mediatorClosed close(m.closedCh) - m.databaseRepairer.Stop() + + for _, process := range m.backgroundProcesses { + process.Stop() + } + return nil } diff --git a/src/dbnode/storage/mediator_test.go b/src/dbnode/storage/mediator_test.go index 14056d4fca..53807529fe 100644 --- a/src/dbnode/storage/mediator_test.go +++ b/src/dbnode/storage/mediator_test.go @@ -24,8 +24,11 @@ import ( "testing" "time" + xclock "github.com/m3db/m3/src/x/clock" + "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" + "go.uber.org/atomic" ) func TestDatabaseMediatorOpenClose(t *testing.T) { @@ -38,19 +41,43 @@ func TestDatabaseMediatorOpenClose(t *testing.T) { SetBootstrapProcessProvider(nil). SetClockOptions(opts.ClockOptions().SetNowFn(func() time.Time { return now - })) + })). + SetInstrumentOptions(opts.InstrumentOptions().SetReportInterval(time.Millisecond)) db := NewMockdatabase(ctrl) db.EXPECT().Options().Return(opts).AnyTimes() db.EXPECT().OwnedNamespaces().Return(nil, nil).AnyTimes() db.EXPECT().BootstrapState().Return(DatabaseBootstrapState{}).AnyTimes() + db.EXPECT().IsBootstrappedAndDurable().Return(true).AnyTimes() m, err := newMediator(db, nil, opts) require.NoError(t, err) + var started, reported atomic.Bool + + backgroundProcess := NewMockBackgroundProcess(ctrl) + backgroundProcess.EXPECT().Report().Do(func() { + reported.Store(true) + }).AnyTimes() + gomock.InOrder( + backgroundProcess.EXPECT().Start().Do(func() { + started.Store(true) + }), + backgroundProcess.EXPECT().Stop(), + ) + + require.NoError(t, m.RegisterBackgroundProcess(backgroundProcess)) + require.Equal(t, errMediatorNotOpen, m.Close()) require.NoError(t, m.Open()) require.Equal(t, errMediatorAlreadyOpen, m.Open()) + require.Equal(t, errMediatorAlreadyOpen, m.RegisterBackgroundProcess(backgroundProcess)) + + xclock.WaitUntil(func() bool { + return started.Load() && reported.Load() + }, time.Second) + require.True(t, started.Load(), "failed to start") + require.True(t, reported.Load(), "failed to report") require.NoError(t, m.Close()) require.Equal(t, errMediatorAlreadyClosed, m.Close()) diff --git a/src/dbnode/storage/options.go b/src/dbnode/storage/options.go index 2fdcf9b0b8..26f54e8484 100644 --- a/src/dbnode/storage/options.go +++ b/src/dbnode/storage/options.go @@ -166,6 +166,7 @@ type options struct { doNotIndexWithFieldsMap map[string]string namespaceRuntimeOptsMgrRegistry namespace.RuntimeOptionsManagerRegistry mediatorTickInterval time.Duration + newBackgroundProcessFns []NewBackgroundProcessFn namespaceHooks NamespaceHooks } @@ -815,6 +816,16 @@ func (o *options) MediatorTickInterval() time.Duration { return o.mediatorTickInterval } +func (o *options) SetBackgroundProcessFns(fns []NewBackgroundProcessFn) Options { + opts := *o + opts.newBackgroundProcessFns = fns + return &opts +} + +func (o *options) BackgroundProcessFns() []NewBackgroundProcessFn { + return o.newBackgroundProcessFns +} + func (o *options) SetNamespaceHooks(value NamespaceHooks) Options { opts := *o opts.namespaceHooks = value @@ -827,7 +838,7 @@ func (o *options) NamespaceHooks() NamespaceHooks { type noOpColdFlush struct{} -func (n *noOpColdFlush) ColdFlushNamespace(ns Namespace) (OnColdFlushNamespace, error) { +func (n *noOpColdFlush) ColdFlushNamespace(Namespace) (OnColdFlushNamespace, error) { return &persist.NoOpColdFlushNamespace{}, nil } diff --git a/src/dbnode/storage/storage_mock.go b/src/dbnode/storage/storage_mock.go index 0c5a146517..2747462ad3 100644 --- a/src/dbnode/storage/storage_mock.go +++ b/src/dbnode/storage/storage_mock.go @@ -3019,6 +3019,65 @@ func (mr *MockdatabaseShardRepairerMockRecorder) Repair(ctx, nsCtx, nsMeta, tr, return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Repair", reflect.TypeOf((*MockdatabaseShardRepairer)(nil).Repair), ctx, nsCtx, nsMeta, tr, shard) } +// MockBackgroundProcess is a mock of BackgroundProcess interface +type MockBackgroundProcess struct { + ctrl *gomock.Controller + recorder *MockBackgroundProcessMockRecorder +} + +// MockBackgroundProcessMockRecorder is the mock recorder for MockBackgroundProcess +type MockBackgroundProcessMockRecorder struct { + mock *MockBackgroundProcess +} + +// NewMockBackgroundProcess creates a new mock instance +func NewMockBackgroundProcess(ctrl *gomock.Controller) *MockBackgroundProcess { + mock := &MockBackgroundProcess{ctrl: ctrl} + mock.recorder = &MockBackgroundProcessMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockBackgroundProcess) EXPECT() *MockBackgroundProcessMockRecorder { + return m.recorder +} + +// Start mocks base method +func (m *MockBackgroundProcess) Start() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Start") +} + +// Start indicates an expected call of Start +func (mr *MockBackgroundProcessMockRecorder) Start() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockBackgroundProcess)(nil).Start)) +} + +// Stop mocks base method +func (m *MockBackgroundProcess) Stop() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Stop") +} + +// Stop indicates an expected call of Stop +func (mr *MockBackgroundProcessMockRecorder) Stop() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockBackgroundProcess)(nil).Stop)) +} + +// Report mocks base method +func (m *MockBackgroundProcess) Report() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Report") +} + +// Report indicates an expected call of Report +func (mr *MockBackgroundProcessMockRecorder) Report() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Report", reflect.TypeOf((*MockBackgroundProcess)(nil).Report)) +} + // MockdatabaseRepairer is a mock of databaseRepairer interface type MockdatabaseRepairer struct { ctrl *gomock.Controller @@ -3066,6 +3125,18 @@ func (mr *MockdatabaseRepairerMockRecorder) Stop() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockdatabaseRepairer)(nil).Stop)) } +// Report mocks base method +func (m *MockdatabaseRepairer) Report() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Report") +} + +// Report indicates an expected call of Report +func (mr *MockdatabaseRepairerMockRecorder) Report() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Report", reflect.TypeOf((*MockdatabaseRepairer)(nil).Report)) +} + // Repair mocks base method func (m *MockdatabaseRepairer) Repair() error { m.ctrl.T.Helper() @@ -3080,18 +3151,6 @@ func (mr *MockdatabaseRepairerMockRecorder) Repair() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Repair", reflect.TypeOf((*MockdatabaseRepairer)(nil).Repair)) } -// Report mocks base method -func (m *MockdatabaseRepairer) Report() { - m.ctrl.T.Helper() - m.ctrl.Call(m, "Report") -} - -// Report indicates an expected call of Report -func (mr *MockdatabaseRepairerMockRecorder) Report() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Report", reflect.TypeOf((*MockdatabaseRepairer)(nil).Report)) -} - // MockdatabaseTickManager is a mock of databaseTickManager interface type MockdatabaseTickManager struct { ctrl *gomock.Controller @@ -3166,6 +3225,20 @@ func (mr *MockdatabaseMediatorMockRecorder) Open() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Open", reflect.TypeOf((*MockdatabaseMediator)(nil).Open)) } +// RegisterBackgroundProcess mocks base method +func (m *MockdatabaseMediator) RegisterBackgroundProcess(process BackgroundProcess) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RegisterBackgroundProcess", process) + ret0, _ := ret[0].(error) + return ret0 +} + +// RegisterBackgroundProcess indicates an expected call of RegisterBackgroundProcess +func (mr *MockdatabaseMediatorMockRecorder) RegisterBackgroundProcess(process interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RegisterBackgroundProcess", reflect.TypeOf((*MockdatabaseMediator)(nil).RegisterBackgroundProcess), process) +} + // IsBootstrapped mocks base method func (m *MockdatabaseMediator) IsBootstrapped() bool { m.ctrl.T.Helper() @@ -3248,20 +3321,6 @@ func (mr *MockdatabaseMediatorMockRecorder) Tick(forceType, startTime interface{ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Tick", reflect.TypeOf((*MockdatabaseMediator)(nil).Tick), forceType, startTime) } -// Repair mocks base method -func (m *MockdatabaseMediator) Repair() error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Repair") - ret0, _ := ret[0].(error) - return ret0 -} - -// Repair indicates an expected call of Repair -func (mr *MockdatabaseMediatorMockRecorder) Repair() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Repair", reflect.TypeOf((*MockdatabaseMediator)(nil).Repair)) -} - // Close mocks base method func (m *MockdatabaseMediator) Close() error { m.ctrl.T.Helper() @@ -4619,6 +4678,34 @@ func (mr *MockOptionsMockRecorder) MediatorTickInterval() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MediatorTickInterval", reflect.TypeOf((*MockOptions)(nil).MediatorTickInterval)) } +// SetBackgroundProcessFns mocks base method +func (m *MockOptions) SetBackgroundProcessFns(arg0 []NewBackgroundProcessFn) Options { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetBackgroundProcessFns", arg0) + ret0, _ := ret[0].(Options) + return ret0 +} + +// SetBackgroundProcessFns indicates an expected call of SetBackgroundProcessFns +func (mr *MockOptionsMockRecorder) SetBackgroundProcessFns(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetBackgroundProcessFns", reflect.TypeOf((*MockOptions)(nil).SetBackgroundProcessFns), arg0) +} + +// BackgroundProcessFns mocks base method +func (m *MockOptions) BackgroundProcessFns() []NewBackgroundProcessFn { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "BackgroundProcessFns") + ret0, _ := ret[0].([]NewBackgroundProcessFn) + return ret0 +} + +// BackgroundProcessFns indicates an expected call of BackgroundProcessFns +func (mr *MockOptionsMockRecorder) BackgroundProcessFns() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BackgroundProcessFns", reflect.TypeOf((*MockOptions)(nil).BackgroundProcessFns)) +} + // SetNamespaceHooks mocks base method func (m *MockOptions) SetNamespaceHooks(hooks NamespaceHooks) Options { m.ctrl.T.Helper() diff --git a/src/dbnode/storage/types.go b/src/dbnode/storage/types.go index 044f133286..004503d005 100644 --- a/src/dbnode/storage/types.go +++ b/src/dbnode/storage/types.go @@ -874,21 +874,26 @@ type databaseShardRepairer interface { ) (repair.MetadataComparisonResult, error) } -// databaseRepairer repairs in-memory database data. -type databaseRepairer interface { - // Start starts the repair process. +// BackgroundProcess is a background process that is run by the database. +type BackgroundProcess interface { + // Start launches the BackgroundProcess to run asynchronously. Start() - // Stop stops the repair process. + // Stop stops the BackgroundProcess. Stop() - // Repair repairs in-memory data. - Repair() error - // Report reports runtime information. Report() } +// databaseRepairer repairs in-memory database data. +type databaseRepairer interface { + BackgroundProcess + + // Repair repairs in-memory data. + Repair() error +} + // databaseTickManager performs periodic ticking. type databaseTickManager interface { // Tick performs maintenance operations, restarting the current @@ -902,6 +907,9 @@ type databaseMediator interface { // Open opens the mediator. Open() error + // RegisterBackgroundProcess registers a BackgroundProcess to be executed by the mediator. Must happen before Open(). + RegisterBackgroundProcess(process BackgroundProcess) error + // IsBootstrapped returns whether the database is bootstrapped. IsBootstrapped() bool @@ -921,9 +929,6 @@ type databaseMediator interface { // Tick performs a tick. Tick(forceType forceType, startTime time.Time) error - // Repair repairs the database. - Repair() error - // Close closes the mediator. Close() error @@ -1215,12 +1220,18 @@ type Options interface { // NamespaceRuntimeOptionsManagerRegistry returns the namespace runtime options manager. NamespaceRuntimeOptionsManagerRegistry() namespace.RuntimeOptionsManagerRegistry - // SetMediatorTickInterval sets the ticking interval for the medidator. + // SetMediatorTickInterval sets the ticking interval for the mediator. SetMediatorTickInterval(value time.Duration) Options // MediatorTickInterval returns the ticking interval for the mediator. MediatorTickInterval() time.Duration + // SetBackgroundProcessFns sets the list of functions that create background processes for the database. + SetBackgroundProcessFns([]NewBackgroundProcessFn) Options + + // BackgroundProcessFns returns the list of functions that create background processes for the database. + BackgroundProcessFns() []NewBackgroundProcessFn + // SetNamespaceHooks sets the NamespaceHooks. SetNamespaceHooks(hooks NamespaceHooks) Options @@ -1285,6 +1296,9 @@ type newFSMergeWithMemFn func( dirtySeriesToWrite map[xtime.UnixNano]*idList, ) fs.MergeWith +// NewBackgroundProcessFn is a function that creates and returns a new BackgroundProcess. +type NewBackgroundProcessFn func(Database, Options) (BackgroundProcess, error) + // AggregateTilesOptions is the options for large tile aggregation. type AggregateTilesOptions struct { // Start and End specify the aggregation window. diff --git a/src/x/instrument/types.go b/src/x/instrument/types.go index 98fab9b969..4f7099cd6f 100644 --- a/src/x/instrument/types.go +++ b/src/x/instrument/types.go @@ -62,13 +62,13 @@ type Options interface { // when building timers from timer options. SetTimerOptions(value TimerOptions) Options - // SetTimerOptions returns the metrics timer options to used + // TimerOptions returns the metrics timer options to used // when building timers from timer options. TimerOptions() TimerOptions - // ReportInterval sets the time between reporting metrics within the system. + // SetReportInterval sets the time between reporting metrics within the system. SetReportInterval(time.Duration) Options - // GetReportInterval returns the time between reporting metrics within the system. + // ReportInterval returns the time between reporting metrics within the system. ReportInterval() time.Duration }