diff --git a/integrations/grafana/m3db_dashboard.json b/integrations/grafana/m3db_dashboard.json index 021c2a6663..48075d16f1 100644 --- a/integrations/grafana/m3db_dashboard.json +++ b/integrations/grafana/m3db_dashboard.json @@ -5126,11 +5126,11 @@ "steppedLine": false, "targets": [ { - "expr": "sum(build_information{}) by (revision)", + "expr": "sum(build_information{}) by (build_version, revision)", "format": "time_series", "intervalFactor": 1, "key": 0.5783520603949805, - "legendFormat": "{{revision}}", + "legendFormat": "{{build_version}} ({{revision}})", "refId": "A" } ], diff --git a/src/dbnode/integration/bootstrap_after_buffer_rotation_regression_test.go b/src/dbnode/integration/bootstrap_after_buffer_rotation_regression_test.go index 17a38e70ee..77c778e8e3 100644 --- a/src/dbnode/integration/bootstrap_after_buffer_rotation_regression_test.go +++ b/src/dbnode/integration/bootstrap_after_buffer_rotation_regression_test.go @@ -34,6 +34,7 @@ import ( "github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper" bcl "github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper/commitlog" "github.com/m3db/m3/src/dbnode/ts" + "github.com/m3db/m3/src/x/context" "github.com/m3db/m3/src/x/ident" xtime "github.com/m3db/m3/src/x/time" @@ -122,6 +123,7 @@ func TestBootstrapAfterBufferRotation(t *testing.T) { test := newTestBootstrapperSource(testBootstrapperSourceOptions{ read: func( + ctx context.Context, namespaces bootstrap.Namespaces, ) (bootstrap.NamespaceResults, error) { <-signalCh @@ -131,7 +133,7 @@ func TestBootstrapAfterBufferRotation(t *testing.T) { if err != nil { return bootstrap.NamespaceResults{}, err } - return bs.Bootstrap(namespaces) + return bs.Bootstrap(ctx, namespaces) }, }, bootstrapOpts, bs) diff --git a/src/dbnode/integration/bootstrap_before_buffer_rotation_no_tick_regression_test.go b/src/dbnode/integration/bootstrap_before_buffer_rotation_no_tick_regression_test.go index 80ef12e351..83badce1de 100644 --- 
a/src/dbnode/integration/bootstrap_before_buffer_rotation_no_tick_regression_test.go +++ b/src/dbnode/integration/bootstrap_before_buffer_rotation_no_tick_regression_test.go @@ -34,6 +34,7 @@ import ( "github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper" bcl "github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper/commitlog" "github.com/m3db/m3/src/dbnode/ts" + "github.com/m3db/m3/src/x/context" "github.com/m3db/m3/src/x/ident" xtime "github.com/m3db/m3/src/x/time" @@ -136,6 +137,7 @@ func TestBootstrapBeforeBufferRotationNoTick(t *testing.T) { test := newTestBootstrapperSource(testBootstrapperSourceOptions{ read: func( + ctx context.Context, namespaces bootstrap.Namespaces, ) (bootstrap.NamespaceResults, error) { <-signalCh @@ -145,7 +147,7 @@ func TestBootstrapBeforeBufferRotationNoTick(t *testing.T) { if err != nil { return bootstrap.NamespaceResults{}, err } - return bs.Bootstrap(namespaces) + return bs.Bootstrap(ctx, namespaces) }, }, bootstrapOpts, bs) diff --git a/src/dbnode/integration/bootstrap_helpers.go b/src/dbnode/integration/bootstrap_helpers.go index f11a4a3ac2..58b4477373 100644 --- a/src/dbnode/integration/bootstrap_helpers.go +++ b/src/dbnode/integration/bootstrap_helpers.go @@ -33,6 +33,7 @@ import ( "github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper" bcl "github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper/commitlog" "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" + "github.com/m3db/m3/src/x/context" "github.com/stretchr/testify/require" ) @@ -63,7 +64,7 @@ func newTestBootstrapperSource( if opts.read != nil { src.read = opts.read } else { - src.read = func(namespaces bootstrap.Namespaces) (bootstrap.NamespaceResults, error) { + src.read = func(ctx context.Context, namespaces bootstrap.Namespaces) (bootstrap.NamespaceResults, error) { return bootstrap.NewNamespaceResults(namespaces), nil } } @@ -100,7 +101,7 @@ type testBootstrapper struct { type testBootstrapperSourceOptions struct { availableData 
func(namespace.Metadata, result.ShardTimeRanges, bootstrap.RunOptions) (result.ShardTimeRanges, error) availableIndex func(namespace.Metadata, result.ShardTimeRanges, bootstrap.RunOptions) (result.ShardTimeRanges, error) - read func(namespaces bootstrap.Namespaces) (bootstrap.NamespaceResults, error) + read func(ctx context.Context, namespaces bootstrap.Namespaces) (bootstrap.NamespaceResults, error) } var _ bootstrap.Source = &testBootstrapperSource{} @@ -108,7 +109,7 @@ var _ bootstrap.Source = &testBootstrapperSource{} type testBootstrapperSource struct { availableData func(namespace.Metadata, result.ShardTimeRanges, bootstrap.RunOptions) (result.ShardTimeRanges, error) availableIndex func(namespace.Metadata, result.ShardTimeRanges, bootstrap.RunOptions) (result.ShardTimeRanges, error) - read func(namespaces bootstrap.Namespaces) (bootstrap.NamespaceResults, error) + read func(ctx context.Context, namespaces bootstrap.Namespaces) (bootstrap.NamespaceResults, error) } func (t testBootstrapperSource) AvailableData( @@ -128,9 +129,10 @@ func (t testBootstrapperSource) AvailableIndex( } func (t testBootstrapperSource) Read( + ctx context.Context, namespaces bootstrap.Namespaces, ) (bootstrap.NamespaceResults, error) { - return t.read(namespaces) + return t.read(ctx, namespaces) } func (t testBootstrapperSource) String() string { diff --git a/src/dbnode/persist/fs/persist_manager.go b/src/dbnode/persist/fs/persist_manager.go index b54ac365ec..61563cd608 100644 --- a/src/dbnode/persist/fs/persist_manager.go +++ b/src/dbnode/persist/fs/persist_manager.go @@ -200,7 +200,7 @@ func NewPersistManager(opts Options) (persist.Manager, error) { func (pm *persistManager) reset() { pm.status = persistManagerIdle - pm.start = timeZero + pm.start = timeZero pm.count = 0 pm.bytesWritten = 0 pm.worked = 0 diff --git a/src/dbnode/storage/bootstrap.go b/src/dbnode/storage/bootstrap.go index b58ff4730c..866f4bc11c 100644 --- a/src/dbnode/storage/bootstrap.go +++ 
b/src/dbnode/storage/bootstrap.go @@ -28,6 +28,7 @@ import ( "github.com/m3db/m3/src/dbnode/clock" "github.com/m3db/m3/src/dbnode/storage/bootstrap" + "github.com/m3db/m3/src/x/context" xerrors "github.com/m3db/m3/src/x/errors" "github.com/m3db/m3/src/x/instrument" @@ -215,6 +216,9 @@ type bootstrapNamespace struct { } func (m *bootstrapManager) bootstrap() error { + ctx := context.NewContext() + defer ctx.Close() + // NB(r): construct new instance of the bootstrap process to avoid // state being kept around by bootstrappers. process, err := m.processProvider.Provide() @@ -256,7 +260,7 @@ func (m *bootstrapManager) bootstrap() error { i, namespace := i, namespace prepareWg.Add(1) go func() { - shards, err := namespace.PrepareBootstrap() + shards, err := namespace.PrepareBootstrap(ctx) prepareLock.Lock() defer func() { @@ -338,7 +342,7 @@ func (m *bootstrapManager) bootstrap() error { m.log.Info("bootstrap started", logFields...) // Run the bootstrap. - bootstrapResult, err := process.Run(start, targets) + bootstrapResult, err := process.Run(ctx, start, targets) bootstrapDuration := m.nowFn().Sub(start) m.bootstrapDuration.Record(bootstrapDuration) @@ -369,7 +373,7 @@ func (m *bootstrapManager) bootstrap() error { return err } - if err := namespace.Bootstrap(result); err != nil { + if err := namespace.Bootstrap(ctx, result); err != nil { m.log.Info("bootstrap error", append(logFields, []zapcore.Field{ zap.String("namespace", id.String()), zap.Error(err), diff --git a/src/dbnode/storage/bootstrap/bootstrap_mock.go b/src/dbnode/storage/bootstrap/bootstrap_mock.go index 374b951130..d1d8c47423 100644 --- a/src/dbnode/storage/bootstrap/bootstrap_mock.go +++ b/src/dbnode/storage/bootstrap/bootstrap_mock.go @@ -31,6 +31,7 @@ import ( "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/m3db/m3/src/dbnode/topology" + "github.com/m3db/m3/src/x/context" "github.com/m3db/m3/src/x/ident" "github.com/golang/mock/gomock" 
@@ -124,18 +125,18 @@ func (m *MockProcess) EXPECT() *MockProcessMockRecorder { } // Run mocks base method -func (m *MockProcess) Run(start time.Time, namespaces []ProcessNamespace) (NamespaceResults, error) { +func (m *MockProcess) Run(ctx context.Context, start time.Time, namespaces []ProcessNamespace) (NamespaceResults, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Run", start, namespaces) + ret := m.ctrl.Call(m, "Run", ctx, start, namespaces) ret0, _ := ret[0].(NamespaceResults) ret1, _ := ret[1].(error) return ret0, ret1 } // Run indicates an expected call of Run -func (mr *MockProcessMockRecorder) Run(start, namespaces interface{}) *gomock.Call { +func (mr *MockProcessMockRecorder) Run(ctx, start, namespaces interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockProcess)(nil).Run), start, namespaces) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockProcess)(nil).Run), ctx, start, namespaces) } // MockNamespaceDataAccumulator is a mock of NamespaceDataAccumulator interface @@ -525,18 +526,18 @@ func (mr *MockBootstrapperMockRecorder) String() *gomock.Call { } // Bootstrap mocks base method -func (m *MockBootstrapper) Bootstrap(namespaces Namespaces) (NamespaceResults, error) { +func (m *MockBootstrapper) Bootstrap(ctx context.Context, namespaces Namespaces) (NamespaceResults, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Bootstrap", namespaces) + ret := m.ctrl.Call(m, "Bootstrap", ctx, namespaces) ret0, _ := ret[0].(NamespaceResults) ret1, _ := ret[1].(error) return ret0, ret1 } // Bootstrap indicates an expected call of Bootstrap -func (mr *MockBootstrapperMockRecorder) Bootstrap(namespaces interface{}) *gomock.Call { +func (mr *MockBootstrapperMockRecorder) Bootstrap(ctx, namespaces interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Bootstrap", 
reflect.TypeOf((*MockBootstrapper)(nil).Bootstrap), namespaces) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Bootstrap", reflect.TypeOf((*MockBootstrapper)(nil).Bootstrap), ctx, namespaces) } // MockSource is a mock of Source interface @@ -593,16 +594,16 @@ func (mr *MockSourceMockRecorder) AvailableIndex(ns, shardsTimeRanges, opts inte } // Read mocks base method -func (m *MockSource) Read(namespaces Namespaces) (NamespaceResults, error) { +func (m *MockSource) Read(ctx context.Context, namespaces Namespaces) (NamespaceResults, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Read", namespaces) + ret := m.ctrl.Call(m, "Read", ctx, namespaces) ret0, _ := ret[0].(NamespaceResults) ret1, _ := ret[1].(error) return ret0, ret1 } // Read indicates an expected call of Read -func (mr *MockSourceMockRecorder) Read(namespaces interface{}) *gomock.Call { +func (mr *MockSourceMockRecorder) Read(ctx, namespaces interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Read", reflect.TypeOf((*MockSource)(nil).Read), namespaces) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Read", reflect.TypeOf((*MockSource)(nil).Read), ctx, namespaces) } diff --git a/src/dbnode/storage/bootstrap/bootstrapper/base.go b/src/dbnode/storage/bootstrap/bootstrapper/base.go index 0ae3c0cd58..5d968a9237 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/base.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/base.go @@ -25,6 +25,7 @@ import ( "github.com/m3db/m3/src/dbnode/storage/bootstrap" "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" + "github.com/m3db/m3/src/x/context" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -75,6 +76,7 @@ func (b baseBootstrapper) String() string { } func (b baseBootstrapper) Bootstrap( + ctx context.Context, namespaces bootstrap.Namespaces, ) (bootstrap.NamespaceResults, error) { logFields := []zapcore.Field{ @@ -135,7 +137,7 @@ func (b baseBootstrapper) Bootstrap( 
b.log.Info("bootstrap from source started", logFields...) // Run the bootstrap source. - currResults, err := b.src.Read(curr) + currResults, err := b.src.Read(ctx, curr) logFields = append(logFields, zap.Duration("took", nowFn().Sub(begin))) if err != nil { @@ -164,7 +166,7 @@ func (b baseBootstrapper) Bootstrap( // If there are some time ranges the current bootstrapper could not fulfill, // that we can attempt then pass it along to the next bootstrapper. if next.Namespaces.Len() > 0 { - nextResults, err := b.next.Bootstrap(next) + nextResults, err := b.next.Bootstrap(ctx, next) if err != nil { return bootstrap.NamespaceResults{}, err } diff --git a/src/dbnode/storage/bootstrap/bootstrapper/base_test.go b/src/dbnode/storage/bootstrap/bootstrapper/base_test.go index fcd9fb131e..62f99afb7a 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/base_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/base_test.go @@ -27,6 +27,7 @@ import ( "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/storage/bootstrap" "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" + "github.com/m3db/m3/src/x/context" "github.com/m3db/m3/src/x/ident" xtime "github.com/m3db/m3/src/x/time" @@ -146,8 +147,12 @@ func testBaseBootstrapperEmptyRange(t *testing.T, withIndex bool) { defer tester.Finish() matcher := bootstrap.NamespaceMatcher{Namespaces: tester.Namespaces} - src.EXPECT().Read(matcher).DoAndReturn( - func(namespaces bootstrap.Namespaces) (bootstrap.NamespaceResults, error) { + src.EXPECT(). + Read(gomock.Any(), matcher). 
+ DoAndReturn(func( + ctx context.Context, + namespaces bootstrap.Namespaces, + ) (bootstrap.NamespaceResults, error) { return nsResults, nil }) @@ -190,8 +195,12 @@ func testBaseBootstrapperCurrentNoUnfulfilled(t *testing.T, withIndex bool) { defer tester.Finish() matcher := bootstrap.NamespaceMatcher{Namespaces: tester.Namespaces} - src.EXPECT().Read(matcher).DoAndReturn( - func(namespaces bootstrap.Namespaces) (bootstrap.NamespaceResults, error) { + src.EXPECT(). + Read(gomock.Any(), matcher). + DoAndReturn(func( + ctx context.Context, + namespaces bootstrap.Namespaces, + ) (bootstrap.NamespaceResults, error) { return nsResults, nil }) @@ -236,8 +245,8 @@ func testBaseBootstrapperCurrentSomeUnfulfilled(t *testing.T, withIndex bool) { defer tester.Finish() matcher := bootstrap.NamespaceMatcher{Namespaces: tester.Namespaces} - src.EXPECT().Read(matcher).Return(currResult, nil) - next.EXPECT().Bootstrap(matcher).Return(nextResult, nil) + src.EXPECT().Read(gomock.Any(), matcher).Return(currResult, nil) + next.EXPECT().Bootstrap(gomock.Any(), matcher).Return(nextResult, nil) tester.TestBootstrapWith(base) tester.TestUnfulfilledForNamespaceIsEmpty(testNs) @@ -270,8 +279,8 @@ func testBasebootstrapperNext( emptyResult := testEmptyResult(testNs) nextResult := testResult(testNs, withIndex, testShard, nextUnfulfilled) matcher := bootstrap.NamespaceMatcher{Namespaces: tester.Namespaces} - src.EXPECT().Read(matcher).Return(emptyResult, nil) - next.EXPECT().Bootstrap(matcher).Return(nextResult, nil) + src.EXPECT().Read(gomock.Any(), matcher).Return(emptyResult, nil) + next.EXPECT().Bootstrap(gomock.Any(), matcher).Return(nextResult, nil) tester.TestBootstrapWith(base) diff --git a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go index a594de9e5b..0ece888afb 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go 
@@ -37,6 +37,7 @@ import ( "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/m3db/m3/src/dbnode/storage/series" "github.com/m3db/m3/src/dbnode/topology" + "github.com/m3db/m3/src/dbnode/tracepoint" "github.com/m3db/m3/src/dbnode/ts" "github.com/m3db/m3/src/x/checked" "github.com/m3db/m3/src/x/context" @@ -168,8 +169,12 @@ type readNamespaceResult struct { // TODO(rartoul): Make this take the SnapshotMetadata files into account to reduce the // number of commitlogs / snapshots that we need to read. func (s *commitLogSource) Read( + ctx context.Context, namespaces bootstrap.Namespaces, ) (bootstrap.NamespaceResults, error) { + ctx, span, _ := ctx.StartSampledTraceSpan(tracepoint.BootstrapperCommitLogSourceRead) + defer span.Finish() + timeRangesEmpty := true for _, elem := range namespaces.Namespaces.Iter() { namespace := elem.Value() @@ -202,6 +207,8 @@ func (s *commitLogSource) Read( startSnapshotsRead := s.nowFn() s.log.Info("read snapshots start") + span.LogEvent("read_snapshots_start") + for _, elem := range namespaceIter { ns := elem.Value() accumulator := ns.DataAccumulator @@ -252,6 +259,7 @@ func (s *commitLogSource) Read( s.log.Info("read snapshots done", zap.Duration("took", s.nowFn().Sub(startSnapshotsRead))) + span.LogEvent("read_snapshots_done") // Setup the series accumulator pipeline. 
var ( @@ -294,17 +302,19 @@ func (s *commitLogSource) Read( startCommitLogsRead = s.nowFn() ) s.log.Info("read commit logs start") + span.LogEvent("read_commitlogs_start") defer func() { datapointsRead := 0 for _, worker := range workers { datapointsRead += worker.datapointsRead } - s.log.Info("read finished", + s.log.Info("read commit logs done", zap.Stringer("took", s.nowFn().Sub(startCommitLogsRead)), zap.Int("datapointsRead", datapointsRead), zap.Int("datapointsSkippedNotBootstrappingNamespace", datapointsSkippedNotBootstrappingNamespace), zap.Int("datapointsSkippedNotBootstrappingShard", datapointsSkippedNotBootstrappingShard), zap.Int("datapointsSkippedShardNoLongerOwned", datapointsSkippedShardNoLongerOwned)) + span.LogEvent("read_commitlogs_done") }() iter, corruptFiles, err := s.newIteratorFn(iterOpts) diff --git a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_data_test.go b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_data_test.go index 86afc0539c..b51069f090 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_data_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_data_test.go @@ -104,7 +104,10 @@ func TestReadErrorOnNewIteratorError(t *testing.T) { tester := bootstrap.BuildNamespacesTester(t, testDefaultRunOpts, target, md) defer tester.Finish() - res, err := src.Read(tester.Namespaces) + ctx := context.NewContext() + defer ctx.Close() + + res, err := src.Read(ctx, tester.Namespaces) require.Error(t, err) require.Nil(t, res.Results) tester.EnsureNoLoadedBlocks() diff --git a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_index_test.go b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_index_test.go index 294ba2d2c6..eaa5e62511 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_index_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_index_test.go @@ -32,6 +32,7 @@ import ( 
"github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/m3db/m3/src/dbnode/ts" idxpersist "github.com/m3db/m3/src/m3ninx/persist" + "github.com/m3db/m3/src/x/context" "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/pool" "github.com/m3db/m3/src/x/serialize" @@ -412,7 +413,10 @@ func TestBootstrapIndexFailsForDecodedTags(t *testing.T) { tester := bootstrap.BuildNamespacesTester(t, testDefaultRunOpts, targetRanges, md1) defer tester.Finish() - _, err = src.Read(tester.Namespaces) + ctx := context.NewContext() + defer ctx.Close() + + _, err = src.Read(ctx, tester.Namespaces) require.Error(t, err) tester.EnsureNoLoadedBlocks() diff --git a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_prop_test.go b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_prop_test.go index 529e5bd7db..19c680ab8e 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_prop_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_prop_test.go @@ -370,7 +370,10 @@ func TestCommitLogSourcePropCorrectlyBootstrapsFromCommitlog(t *testing.T) { runOpts := testDefaultRunOpts.SetInitialTopologyState(initialTopoState) tester := bootstrap.BuildNamespacesTester(t, runOpts, shardTimeRanges, nsMeta) - bootstrapResults, err := source.Bootstrap(tester.Namespaces) + ctx := context.NewContext() + defer ctx.Close() + + bootstrapResults, err := source.Bootstrap(ctx, tester.Namespaces) if err != nil { return false, err } diff --git a/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go b/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go index a3a7c8adac..eb576066ca 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go @@ -36,11 +36,13 @@ import ( "github.com/m3db/m3/src/dbnode/storage/index" "github.com/m3db/m3/src/dbnode/storage/index/convert" "github.com/m3db/m3/src/dbnode/storage/series" + "github.com/m3db/m3/src/dbnode/tracepoint" 
"github.com/m3db/m3/src/dbnode/ts" "github.com/m3db/m3/src/m3ninx/doc" "github.com/m3db/m3/src/m3ninx/index/segment" idxpersist "github.com/m3db/m3/src/m3ninx/persist" "github.com/m3db/m3/src/x/checked" + "github.com/m3db/m3/src/x/context" "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/instrument" "github.com/m3db/m3/src/x/pool" @@ -129,8 +131,12 @@ func (s *fileSystemSource) AvailableIndex( } func (s *fileSystemSource) Read( + ctx context.Context, namespaces bootstrap.Namespaces, ) (bootstrap.NamespaceResults, error) { + ctx, span, _ := ctx.StartSampledTraceSpan(tracepoint.BootstrapperFilesystemSourceRead) + defer span.Finish() + results := bootstrap.NamespaceResults{ Results: bootstrap.NewNamespaceResultsMap(bootstrap.NamespaceResultsMapOptions{}), } @@ -151,6 +157,7 @@ func (s *fileSystemSource) Read( } s.log.Info("bootstrapping time series data start", dataLogFields...) + span.LogEvent("bootstrap_data_start") for _, elem := range namespaces.Namespaces.Iter() { namespace := elem.Value() md := namespace.Metadata @@ -170,9 +177,11 @@ func (s *fileSystemSource) Read( } s.log.Info("bootstrapping time series data success", append(dataLogFields, zap.Duration("took", nowFn().Sub(start)))...) 
+ span.LogEvent("bootstrap_data_done") start = nowFn() s.log.Info("bootstrapping index metadata start") + span.LogEvent("bootstrap_index_start") for _, elem := range namespaces.Namespaces.Iter() { namespace := elem.Value() md := namespace.Metadata @@ -202,6 +211,7 @@ func (s *fileSystemSource) Read( } s.log.Info("bootstrapping index metadata success", zap.Stringer("took", nowFn().Sub(start))) + span.LogEvent("bootstrap_index_done") return results, nil } diff --git a/src/dbnode/storage/bootstrap/bootstrapper/noop.go b/src/dbnode/storage/bootstrap/bootstrapper/noop.go index 24aad52fd5..e8e2aaf8f1 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/noop.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/noop.go @@ -24,6 +24,7 @@ import ( "fmt" "github.com/m3db/m3/src/dbnode/storage/bootstrap" + "github.com/m3db/m3/src/x/context" ) const ( @@ -61,6 +62,7 @@ func (noop noOpNoneBootstrapper) String() string { } func (noop noOpNoneBootstrapper) Bootstrap( + ctx context.Context, namespaces bootstrap.Namespaces, ) (bootstrap.NamespaceResults, error) { results := bootstrap.NewNamespaceResults(namespaces) @@ -117,6 +119,7 @@ func (noop noOpAllBootstrapper) String() string { } func (noop noOpAllBootstrapper) Bootstrap( + ctx context.Context, namespaces bootstrap.Namespaces, ) (bootstrap.NamespaceResults, error) { return bootstrap.NewNamespaceResults(namespaces), nil diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go index 344c357ef5..ba78bf3a31 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go @@ -39,8 +39,10 @@ import ( "github.com/m3db/m3/src/dbnode/storage/index/convert" "github.com/m3db/m3/src/dbnode/storage/series" "github.com/m3db/m3/src/dbnode/topology" + "github.com/m3db/m3/src/dbnode/tracepoint" "github.com/m3db/m3/src/m3ninx/doc" idxpersist "github.com/m3db/m3/src/m3ninx/persist" + 
"github.com/m3db/m3/src/x/context" "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/instrument" @@ -114,8 +116,12 @@ func (s *peersSource) AvailableIndex( } func (s *peersSource) Read( + ctx context.Context, namespaces bootstrap.Namespaces, ) (bootstrap.NamespaceResults, error) { + ctx, span, _ := ctx.StartSampledTraceSpan(tracepoint.BootstrapperPeersSourceRead) + defer span.Finish() + results := bootstrap.NamespaceResults{ Results: bootstrap.NewNamespaceResultsMap(bootstrap.NamespaceResultsMapOptions{}), } @@ -125,6 +131,7 @@ func (s *peersSource) Read( nowFn := s.opts.ResultOptions().ClockOptions().NowFn() start := nowFn() s.log.Info("bootstrapping time series data start") + span.LogEvent("bootstrap_data_start") for _, elem := range namespaces.Namespaces.Iter() { namespace := elem.Value() md := namespace.Metadata @@ -144,6 +151,7 @@ func (s *peersSource) Read( } s.log.Info("bootstrapping time series data success", zap.Duration("took", nowFn().Sub(start))) + span.LogEvent("bootstrap_data_done") alloc := s.opts.ResultOptions().IndexDocumentsBuilderAllocator() segBuilder, err := alloc() @@ -154,6 +162,7 @@ func (s *peersSource) Read( start = nowFn() s.log.Info("bootstrapping index metadata start") + span.LogEvent("bootstrap_index_start") for _, elem := range namespaces.Namespaces.Iter() { namespace := elem.Value() md := namespace.Metadata @@ -186,6 +195,7 @@ func (s *peersSource) Read( } s.log.Info("bootstrapping index metadata success", zap.Duration("took", nowFn().Sub(start))) + span.LogEvent("bootstrap_index_done") return results, nil } diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source_data_test.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source_data_test.go index a86744543a..5000ec7b10 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source_data_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source_data_test.go @@ -42,6 +42,7 @@ import ( "github.com/m3db/m3/src/dbnode/x/xio" 
"github.com/m3db/m3/src/m3ninx/index/segment/fst" "github.com/m3db/m3/src/x/checked" + "github.com/m3db/m3/src/x/context" "github.com/m3db/m3/src/x/ident" xtest "github.com/m3db/m3/src/x/test" xtime "github.com/m3db/m3/src/x/time" @@ -186,7 +187,11 @@ func TestPeersSourceReturnsErrorForAdminSession(t *testing.T) { tester := bootstrap.BuildNamespacesTester(t, testDefaultRunOpts, target, nsMetadata) defer tester.Finish() - _, err = src.Read(tester.Namespaces) + + ctx := context.NewContext() + defer ctx.Close() + + _, err = src.Read(ctx, tester.Namespaces) require.Error(t, err) assert.Equal(t, expectedErr, err) tester.EnsureNoLoadedBlocks() diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source_test.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source_test.go index a9f1e14b9c..02348cb804 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source_test.go @@ -32,6 +32,7 @@ import ( "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/m3db/m3/src/dbnode/topology" tu "github.com/m3db/m3/src/dbnode/topology/testutil" + "github.com/m3db/m3/src/x/context" xtime "github.com/m3db/m3/src/x/time" "github.com/golang/mock/gomock" @@ -199,7 +200,11 @@ func TestPeersSourceReturnsErrorIfUnknownPersistenceFileSetType(t *testing.T) { runOpts := testRunOptsWithPersist.SetPersistConfig(bootstrap.PersistConfig{Enabled: true, FileSetType: 999}) tester := bootstrap.BuildNamespacesTester(t, runOpts, target, testNsMd) defer tester.Finish() - _, err = src.Read(tester.Namespaces) + + ctx := context.NewContext() + defer ctx.Close() + + _, err = src.Read(ctx, tester.Namespaces) require.Error(t, err) require.True(t, strings.Contains(err.Error(), "unknown persist config fileset file type")) tester.EnsureNoLoadedBlocks() diff --git a/src/dbnode/storage/bootstrap/bootstrapper/uninitialized/source.go b/src/dbnode/storage/bootstrap/bootstrapper/uninitialized/source.go index 
ce0c83b930..1493f189b1 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/uninitialized/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/uninitialized/source.go @@ -28,6 +28,8 @@ import ( "github.com/m3db/m3/src/dbnode/storage/bootstrap" "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/m3db/m3/src/dbnode/topology" + "github.com/m3db/m3/src/dbnode/tracepoint" + "github.com/m3db/m3/src/x/context" ) // The purpose of the unitializedSource is to succeed bootstraps for any @@ -136,8 +138,12 @@ func (s *uninitializedTopologySource) availability( } func (s *uninitializedTopologySource) Read( + ctx context.Context, namespaces bootstrap.Namespaces, ) (bootstrap.NamespaceResults, error) { + ctx, span, _ := ctx.StartSampledTraceSpan(tracepoint.BootstrapperUninitializedSourceRead) + defer span.Finish() + results := bootstrap.NamespaceResults{ Results: bootstrap.NewNamespaceResultsMap(bootstrap.NamespaceResultsMapOptions{}), } diff --git a/src/dbnode/storage/bootstrap/noop.go b/src/dbnode/storage/bootstrap/noop.go index 297e7a4615..dd928f8701 100644 --- a/src/dbnode/storage/bootstrap/noop.go +++ b/src/dbnode/storage/bootstrap/noop.go @@ -22,6 +22,8 @@ package bootstrap import ( "time" + + "github.com/m3db/m3/src/x/context" ) type noOpBootstrapProcessProvider struct{} @@ -45,6 +47,7 @@ func (b noOpBootstrapProcessProvider) Provide() (Process, error) { type noOpBootstrapProcess struct{} func (b noOpBootstrapProcess) Run( + ctx context.Context, start time.Time, namespaces []ProcessNamespace, ) (NamespaceResults, error) { diff --git a/src/dbnode/storage/bootstrap/process.go b/src/dbnode/storage/bootstrap/process.go index 688d82c2b0..e25cc94ed6 100644 --- a/src/dbnode/storage/bootstrap/process.go +++ b/src/dbnode/storage/bootstrap/process.go @@ -31,8 +31,11 @@ import ( "github.com/m3db/m3/src/dbnode/retention" "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/m3db/m3/src/dbnode/topology" + "github.com/m3db/m3/src/dbnode/tracepoint" 
+ "github.com/m3db/m3/src/x/context" xtime "github.com/m3db/m3/src/x/time" + "github.com/opentracing/opentracing-go/log" "go.uber.org/zap" "go.uber.org/zap/zapcore" ) @@ -151,6 +154,7 @@ type bootstrapProcess struct { } func (b bootstrapProcess) Run( + ctx context.Context, at time.Time, namespaces []ProcessNamespace, ) (NamespaceResults, error) { @@ -209,42 +213,71 @@ func (b bootstrapProcess) Run( namespacesRunFirst, namespacesRunSecond, } { - for _, entry := range namespaces.Namespaces.Iter() { - namespace := entry.Value() - logFields := b.logFields(namespace.Metadata, namespace.Shards, - namespace.DataTargetRange.Range, namespace.IndexTargetRange.Range) - b.logBootstrapRun(logFields) - } - - begin := b.nowFn() - res, err := b.bootstrapper.Bootstrap(namespaces) - took := b.nowFn().Sub(begin) + res, err := b.runPass(ctx, namespaces) if err != nil { - b.log.Error("bootstrap process error", - zap.Duration("took", took), - zap.Error(err)) return NamespaceResults{}, err } - for _, entry := range namespaces.Namespaces.Iter() { - namespace := entry.Value() - nsID := namespace.Metadata.ID() + bootstrapResult = MergeNamespaceResults(bootstrapResult, res) + } - result, ok := res.Results.Get(nsID) - if !ok { - return NamespaceResults{}, - fmt.Errorf("result missing for namespace: %v", nsID.String()) - } + return bootstrapResult, nil +} + +func (b bootstrapProcess) runPass( + ctx context.Context, + namespaces Namespaces, +) (NamespaceResults, error) { + ctx, span, sampled := ctx.StartSampledTraceSpan(tracepoint.BootstrapProcessRun) + defer span.Finish() + + i := 0 + for _, entry := range namespaces.Namespaces.Iter() { + ns := entry.Value() + idx := i + i++ + + if sampled { + ext := fmt.Sprintf("[%d]", idx) + span.LogFields( + log.String("namespace"+ext, ns.Metadata.ID().String()), + log.Int("shards"+ext, len(ns.Shards)), + log.String("dataRange"+ext, ns.DataTargetRange.Range.String()), + log.String("indexRange"+ext, ns.IndexTargetRange.Range.String()), + ) + } + + logFields 
:= b.logFields(ns.Metadata, ns.Shards, + ns.DataTargetRange.Range, ns.IndexTargetRange.Range) + b.logBootstrapRun(logFields) + } + + begin := b.nowFn() + res, err := b.bootstrapper.Bootstrap(ctx, namespaces) + took := b.nowFn().Sub(begin) + if err != nil { + b.log.Error("bootstrap process error", + zap.Duration("took", took), + zap.Error(err)) + return NamespaceResults{}, err + } + + for _, entry := range namespaces.Namespaces.Iter() { + namespace := entry.Value() + nsID := namespace.Metadata.ID() - logFields := b.logFields(namespace.Metadata, namespace.Shards, - namespace.DataTargetRange.Range, namespace.IndexTargetRange.Range) - b.logBootstrapResult(result, logFields, took) + result, ok := res.Results.Get(nsID) + if !ok { + return NamespaceResults{}, + fmt.Errorf("result missing for namespace: %v", nsID.String()) } - bootstrapResult = MergeNamespaceResults(bootstrapResult, res) + logFields := b.logFields(namespace.Metadata, namespace.Shards, + namespace.DataTargetRange.Range, namespace.IndexTargetRange.Range) + b.logBootstrapResult(result, logFields, took) } - return bootstrapResult, nil + return res, nil } func (b bootstrapProcess) logFields( diff --git a/src/dbnode/storage/bootstrap/types.go b/src/dbnode/storage/bootstrap/types.go index 2a42a4643f..d18abb558d 100644 --- a/src/dbnode/storage/bootstrap/types.go +++ b/src/dbnode/storage/bootstrap/types.go @@ -29,6 +29,7 @@ import ( "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/m3db/m3/src/dbnode/storage/series" "github.com/m3db/m3/src/dbnode/topology" + "github.com/m3db/m3/src/x/context" xerrors "github.com/m3db/m3/src/x/errors" "github.com/m3db/m3/src/x/ident" xtime "github.com/m3db/m3/src/x/time" @@ -54,7 +55,11 @@ type ProcessProvider interface { // with the mindset that it will always be set to default values from the constructor. type Process interface { // Run runs the bootstrap process, returning the bootstrap result and any error encountered. 
- Run(start time.Time, namespaces []ProcessNamespace) (NamespaceResults, error) + Run( + ctx context.Context, + start time.Time, + namespaces []ProcessNamespace, + ) (NamespaceResults, error) } // ProcessNamespace is a namespace to pass to the bootstrap process. @@ -375,7 +380,7 @@ type Bootstrapper interface { // A bootstrapper should only return an error should it want to entirely // cancel the bootstrapping of the node, i.e. non-recoverable situation // like not being able to read from the filesystem. - Bootstrap(namespaces Namespaces) (NamespaceResults, error) + Bootstrap(ctx context.Context, namespaces Namespaces) (NamespaceResults, error) } // Source represents a bootstrap source. Note that a source can and will be reused so @@ -401,5 +406,5 @@ type Source interface { // A bootstrapper source should only return an error should it want to // entirely cancel the bootstrapping of the node, i.e. non-recoverable // situation like not being able to read from the filesystem. - Read(namespaces Namespaces) (NamespaceResults, error) + Read(ctx context.Context, namespaces Namespaces) (NamespaceResults, error) } diff --git a/src/dbnode/storage/bootstrap/util.go b/src/dbnode/storage/bootstrap/util.go index 9422c8e84a..2fcff46e3e 100644 --- a/src/dbnode/storage/bootstrap/util.go +++ b/src/dbnode/storage/bootstrap/util.go @@ -536,7 +536,9 @@ func (nt *NamespacesTester) ResultForNamespace(id ident.ID) NamespaceResult { // TestBootstrapWith bootstraps the current Namespaces with the // provided bootstrapper. func (nt *NamespacesTester) TestBootstrapWith(b Bootstrapper) { - res, err := b.Bootstrap(nt.Namespaces) + ctx := context.NewContext() + defer ctx.Close() + res, err := b.Bootstrap(ctx, nt.Namespaces) assert.NoError(nt.t, err) nt.Results = res } @@ -544,7 +546,9 @@ func (nt *NamespacesTester) TestBootstrapWith(b Bootstrapper) { // TestReadWith reads the current Namespaces with the // provided bootstrap source. 
func (nt *NamespacesTester) TestReadWith(s Source) { - res, err := s.Read(nt.Namespaces) + ctx := context.NewContext() + defer ctx.Close() + res, err := s.Read(ctx, nt.Namespaces) require.NoError(nt.t, err) nt.Results = res } diff --git a/src/dbnode/storage/bootstrap_test.go b/src/dbnode/storage/bootstrap_test.go index 8bb2aa7158..0ccc5f448b 100644 --- a/src/dbnode/storage/bootstrap_test.go +++ b/src/dbnode/storage/bootstrap_test.go @@ -28,6 +28,7 @@ import ( "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/storage/bootstrap" + "github.com/m3db/m3/src/x/context" "github.com/m3db/m3/src/x/ident" "github.com/golang/mock/gomock" @@ -65,13 +66,13 @@ func TestDatabaseBootstrapWithBootstrapError(t *testing.T) { bsm.sleepFn = func(time.Duration) {} gomock.InOrder( - ns.EXPECT().PrepareBootstrap().Return([]databaseShard{}, nil), + ns.EXPECT().PrepareBootstrap(gomock.Any()).Return([]databaseShard{}, nil), ns.EXPECT().Metadata().Return(meta), ns.EXPECT().ID().Return(id), ns.EXPECT(). - Bootstrap(gomock.Any()). + Bootstrap(gomock.Any(), gomock.Any()). Return(fmt.Errorf("an error")). - Do(func(bootstrapResult bootstrap.NamespaceResult) { + Do(func(ctx context.Context, bootstrapResult bootstrap.NamespaceResult) { // After returning an error, make sure we don't re-enqueue. bsm.bootstrapFn = func() error { return nil @@ -79,6 +80,9 @@ func TestDatabaseBootstrapWithBootstrapError(t *testing.T) { }), ) + ctx := context.NewContext() + defer ctx.Close() + result, err := bsm.Bootstrap() require.NoError(t, err) @@ -110,13 +114,13 @@ func TestDatabaseBootstrapSubsequentCallsQueued(t *testing.T) { var wg sync.WaitGroup wg.Add(1) - ns.EXPECT().PrepareBootstrap().Return([]databaseShard{}, nil).AnyTimes() + ns.EXPECT().PrepareBootstrap(gomock.Any()).Return([]databaseShard{}, nil).AnyTimes() ns.EXPECT().Metadata().Return(meta).AnyTimes() ns.EXPECT(). - Bootstrap(gomock.Any()). + Bootstrap(gomock.Any(), gomock.Any()). Return(nil). 
- Do(func(arg0 interface{}) { + Do(func(arg0, arg1 interface{}) { defer wg.Done() // Enqueue the second bootstrap @@ -129,7 +133,7 @@ func TestDatabaseBootstrapSubsequentCallsQueued(t *testing.T) { bsm.RUnlock() // Expect the second bootstrap call - ns.EXPECT().Bootstrap(gomock.Any()).Return(nil) + ns.EXPECT().Bootstrap(gomock.Any(), gomock.Any()).Return(nil) }) ns.EXPECT(). ID(). diff --git a/src/dbnode/storage/namespace.go b/src/dbnode/storage/namespace.go index 318c58c64c..03c13a10ed 100644 --- a/src/dbnode/storage/namespace.go +++ b/src/dbnode/storage/namespace.go @@ -760,7 +760,14 @@ func (n *dbNamespace) AggregateQuery( return res, err } -func (n *dbNamespace) PrepareBootstrap() ([]databaseShard, error) { +func (n *dbNamespace) PrepareBootstrap(ctx context.Context) ([]databaseShard, error) { + ctx, span, sampled := ctx.StartSampledTraceSpan(tracepoint.NSPrepareBootstrap) + defer span.Finish() + + if sampled { + span.LogFields(opentracinglog.String("namespace", n.id.String())) + } + var ( wg sync.WaitGroup multiErrLock sync.Mutex @@ -773,7 +780,7 @@ func (n *dbNamespace) PrepareBootstrap() ([]databaseShard, error) { go func() { defer wg.Done() - err := shard.PrepareBootstrap() + err := shard.PrepareBootstrap(ctx) if err != nil { multiErrLock.Lock() multiErr = multiErr.Add(err) @@ -847,8 +854,16 @@ func (n *dbNamespace) FetchBlocksMetadataV2( } func (n *dbNamespace) Bootstrap( + ctx context.Context, bootstrapResult bootstrap.NamespaceResult, ) error { + ctx, span, sampled := ctx.StartSampledTraceSpan(tracepoint.NSBootstrap) + defer span.Finish() + + if sampled { + span.LogFields(opentracinglog.String("namespace", n.id.String())) + } + callStart := n.nowFn() n.Lock() @@ -923,7 +938,7 @@ func (n *dbNamespace) Bootstrap( wg.Add(1) shard := shard workers.Go(func() { - err := shard.Bootstrap() + err := shard.Bootstrap(ctx) mutex.Lock() multiErr = multiErr.Add(err) diff --git a/src/dbnode/storage/namespace_test.go b/src/dbnode/storage/namespace_test.go index 
6065a589c5..b851b06505 100644 --- a/src/dbnode/storage/namespace_test.go +++ b/src/dbnode/storage/namespace_test.go @@ -323,16 +323,25 @@ func TestNamespaceFetchBlocksShardOwned(t *testing.T) { func TestNamespaceBootstrapBootstrapping(t *testing.T) { ns, closer := newTestNamespace(t) defer closer() + ns.bootstrapState = Bootstrapping - require.Equal(t, - errNamespaceIsBootstrapping, ns.Bootstrap(bootstrap.NamespaceResult{})) + + ctx := context.NewContext() + defer ctx.Close() + + err := ns.Bootstrap(ctx, bootstrap.NamespaceResult{}) + require.Equal(t, errNamespaceIsBootstrapping, err) } func TestNamespaceBootstrapDontNeedBootstrap(t *testing.T) { ns, closer := newTestNamespaceWithIDOpts(t, defaultTestNs1ID, namespace.NewOptions().SetBootstrapEnabled(false)) defer closer() - require.NoError(t, ns.Bootstrap(bootstrap.NamespaceResult{})) + + ctx := context.NewContext() + defer ctx.Close() + + require.NoError(t, ns.Bootstrap(ctx, bootstrap.NamespaceResult{})) require.Equal(t, Bootstrapped, ns.bootstrapState) } @@ -350,7 +359,7 @@ func TestNamespaceBootstrapAllShards(t *testing.T) { shard := NewMockdatabaseShard(ctrl) shard.EXPECT().IsBootstrapped().Return(false) shard.EXPECT().ID().Return(shardID) - shard.EXPECT().Bootstrap().Return(errs[i]) + shard.EXPECT().Bootstrap(gomock.Any()).Return(errs[i]) ns.shards[testShardIDs[i].ID()] = shard shardIDs = append(shardIDs, shardID) } @@ -360,7 +369,10 @@ func TestNamespaceBootstrapAllShards(t *testing.T) { Shards: shardIDs, } - require.Equal(t, "foo", ns.Bootstrap(nsResult).Error()) + ctx := context.NewContext() + defer ctx.Close() + + require.Equal(t, "foo", ns.Bootstrap(ctx, nsResult).Error()) require.Equal(t, BootstrapNotStarted, ns.bootstrapState) } @@ -392,7 +404,7 @@ func TestNamespaceBootstrapOnlyNonBootstrappedShards(t *testing.T) { shard := NewMockdatabaseShard(ctrl) shard.EXPECT().IsBootstrapped().Return(false) shard.EXPECT().ID().Return(testShard.ID()) - shard.EXPECT().Bootstrap().Return(nil) + 
shard.EXPECT().Bootstrap(gomock.Any()).Return(nil) ns.shards[testShard.ID()] = shard shardIDs = append(shardIDs, testShard.ID()) } @@ -410,7 +422,10 @@ func TestNamespaceBootstrapOnlyNonBootstrappedShards(t *testing.T) { Shards: shardIDs, } - require.Error(t, ns.Bootstrap(nsResult)) + ctx := context.NewContext() + defer ctx.Close() + + require.Error(t, ns.Bootstrap(ctx, nsResult)) require.Equal(t, BootstrapNotStarted, ns.bootstrapState) } @@ -1181,15 +1196,19 @@ func TestNamespaceTicksIndex(t *testing.T) { idx := NewMockNamespaceIndex(ctrl) ns, closer := newTestNamespaceWithIndex(t, idx) defer closer() + + ctx := context.NewContext() + defer ctx.Close() + for _, s := range ns.shards { if s != nil { - s.Bootstrap() + s.Bootstrap(ctx) } } - ctx := context.NewCancellable() - idx.EXPECT().Tick(ctx, gomock.Any()).Return(namespaceIndexTickResult{}, nil) - err := ns.Tick(ctx, time.Now()) + cancel := context.NewCancellable() + idx.EXPECT().Tick(cancel, gomock.Any()).Return(namespaceIndexTickResult{}, nil) + err := ns.Tick(cancel, time.Now()) require.NoError(t, err) } diff --git a/src/dbnode/storage/shard.go b/src/dbnode/storage/shard.go index 5e5bf1564b..c9d03bff87 100644 --- a/src/dbnode/storage/shard.go +++ b/src/dbnode/storage/shard.go @@ -43,6 +43,7 @@ import ( "github.com/m3db/m3/src/dbnode/storage/repair" "github.com/m3db/m3/src/dbnode/storage/series" "github.com/m3db/m3/src/dbnode/storage/series/lookup" + "github.com/m3db/m3/src/dbnode/tracepoint" "github.com/m3db/m3/src/dbnode/ts" "github.com/m3db/m3/src/dbnode/x/xio" "github.com/m3db/m3/src/m3ninx/doc" @@ -55,6 +56,7 @@ import ( xtime "github.com/m3db/m3/src/x/time" "github.com/gogo/protobuf/proto" + "github.com/opentracing/opentracing-go/log" "github.com/uber-go/tally" "go.uber.org/zap" ) @@ -1851,7 +1853,14 @@ func (s *dbShard) FetchBlocksMetadataV2( return result, nil, nil } -func (s *dbShard) PrepareBootstrap() error { +func (s *dbShard) PrepareBootstrap(ctx context.Context) error { + ctx, span, sampled := 
ctx.StartSampledTraceSpan(tracepoint.ShardPrepareBootstrap) + defer span.Finish() + + if sampled { + span.LogFields(log.Int("shard", int(s.shard))) + } + // Iterate flushed time ranges to determine which blocks are retrievable. // NB(r): This must be done before bootstrap since during bootstrapping // series will load blocks into series with series.LoadBlock(...) which @@ -1915,7 +1924,14 @@ func (s *dbShard) UpdateFlushStates() { } } -func (s *dbShard) Bootstrap() error { +func (s *dbShard) Bootstrap(ctx context.Context) error { + ctx, span, sampled := ctx.StartSampledTraceSpan(tracepoint.ShardBootstrap) + defer span.Finish() + + if sampled { + span.LogFields(log.Int("shard", int(s.shard))) + } + s.Lock() if s.bootstrapState == Bootstrapped { s.Unlock() @@ -1931,7 +1947,7 @@ func (s *dbShard) Bootstrap() error { multiErr := xerrors.NewMultiError() // Initialize the flush states if we haven't called prepare bootstrap. - if err := s.PrepareBootstrap(); err != nil { + if err := s.PrepareBootstrap(ctx); err != nil { multiErr = multiErr.Add(err) } diff --git a/src/dbnode/storage/shard_race_prop_test.go b/src/dbnode/storage/shard_race_prop_test.go index 29ad1002cf..b6d33f2dee 100644 --- a/src/dbnode/storage/shard_race_prop_test.go +++ b/src/dbnode/storage/shard_race_prop_test.go @@ -273,7 +273,10 @@ func TestShardTickBootstrapWriteRace(t *testing.T) { wg.Done() } - assert.NoError(t, shard.Bootstrap()) + ctx := context.NewContext() + defer ctx.Close() + + assert.NoError(t, shard.Bootstrap(ctx)) for _, id := range writeIDs { id := id go func() { diff --git a/src/dbnode/storage/shard_test.go b/src/dbnode/storage/shard_test.go index 094b9d34e7..3d0f4ba240 100644 --- a/src/dbnode/storage/shard_test.go +++ b/src/dbnode/storage/shard_test.go @@ -127,8 +127,12 @@ func TestShardBootstrapState(t *testing.T) { opts := DefaultTestOptions() s := testDatabaseShard(t, opts) defer s.Close() - require.NoError(t, s.Bootstrap()) - require.Error(t, s.Bootstrap()) + + ctx := 
context.NewContext() + defer ctx.Close() + + require.NoError(t, s.Bootstrap(ctx)) + require.Error(t, s.Bootstrap(ctx)) } func TestShardFlushStateNotStarted(t *testing.T) { @@ -154,7 +158,11 @@ func TestShardFlushStateNotStarted(t *testing.T) { s := testDatabaseShard(t, opts) defer s.Close() - s.Bootstrap() + + ctx := context.NewContext() + defer ctx.Close() + + s.Bootstrap(ctx) notStarted := fileOpState{WarmStatus: fileOpNotStarted} for st := earliest; !st.After(latest); st = st.Add(ropts.BlockSize()) { @@ -221,7 +229,10 @@ func TestShardBootstrapWithFlushVersion(t *testing.T) { require.NoError(t, writer.Close()) } - err = s.Bootstrap() + ctx := context.NewContext() + defer ctx.Close() + + err = s.Bootstrap(ctx) require.NoError(t, err) require.Equal(t, Bootstrapped, s.bootstrapState) @@ -282,7 +293,10 @@ func TestShardBootstrapWithFlushVersionNoCleanUp(t *testing.T) { require.NoError(t, writer.Close()) } - err = s.Bootstrap() + ctx := context.NewContext() + defer ctx.Close() + + err = s.Bootstrap(ctx) require.NoError(t, err) require.Equal(t, Bootstrapped, s.bootstrapState) @@ -321,7 +335,10 @@ func TestShardBootstrapWithCacheShardIndices(t *testing.T) { mockRetriever.EXPECT().CacheShardIndices([]uint32{s.ID()}).Return(nil) mockRetrieverMgr.EXPECT().Retriever(s.namespace).Return(mockRetriever, nil) - err = s.Bootstrap() + ctx := context.NewContext() + defer ctx.Close() + + err = s.Bootstrap(ctx) require.NoError(t, err) require.Equal(t, Bootstrapped, s.bootstrapState) } @@ -368,7 +385,11 @@ func testShardLoadLimit(t *testing.T, limit int64, shouldReturnError bool) { sr.AddBlock(ident.StringID("bar"), barTags, blocks[1]) seriesMap := sr.AllSeries() - require.NoError(t, s.Bootstrap()) + + ctx := context.NewContext() + defer ctx.Close() + + require.NoError(t, s.Bootstrap(ctx)) // First load will never trigger the limit. 
require.NoError(t, s.LoadBlocks(seriesMap)) @@ -388,7 +409,12 @@ func TestShardFlushSeriesFlushError(t *testing.T) { s := testDatabaseShard(t, DefaultTestOptions()) defer s.Close() - s.Bootstrap() + + ctx := context.NewContext() + defer ctx.Close() + + s.Bootstrap(ctx) + s.flushState.statesByTime[xtime.ToUnixNano(blockStart)] = fileOpState{ WarmStatus: fileOpFailed, NumFailures: 1, @@ -457,9 +483,15 @@ func TestShardFlushSeriesFlushSuccess(t *testing.T) { } opts := DefaultTestOptions() opts = opts.SetClockOptions(opts.ClockOptions().SetNowFn(nowFn)) + s := testDatabaseShard(t, opts) defer s.Close() - s.Bootstrap() + + ctx := context.NewContext() + defer ctx.Close() + + s.Bootstrap(ctx) + s.flushState.statesByTime[xtime.ToUnixNano(blockStart)] = fileOpState{ WarmStatus: fileOpFailed, NumFailures: 1, @@ -548,7 +580,11 @@ func TestShardColdFlush(t *testing.T) { blockSize := opts.SeriesOptions().RetentionOptions().BlockSize() shard := testDatabaseShard(t, opts) - require.NoError(t, shard.Bootstrap()) + + ctx := context.NewContext() + defer ctx.Close() + + require.NoError(t, shard.Bootstrap(ctx)) shard.newMergerFn = newMergerTestFn shard.newFSMergeWithMemFn = newFSMergeWithMemTestFn @@ -628,7 +664,12 @@ func TestShardColdFlushNoMergeIfNothingDirty(t *testing.T) { opts = opts.SetClockOptions(opts.ClockOptions().SetNowFn(nowFn)) blockSize := opts.SeriesOptions().RetentionOptions().BlockSize() shard := testDatabaseShard(t, opts) - require.NoError(t, shard.Bootstrap()) + + ctx := context.NewContext() + defer ctx.Close() + + require.NoError(t, shard.Bootstrap(ctx)) + shard.newMergerFn = newMergerTestFn shard.newFSMergeWithMemFn = newFSMergeWithMemTestFn @@ -882,8 +923,11 @@ func TestShardTick(t *testing.T) { sleepPerSeries := time.Microsecond + ctx := context.NewContext() + defer ctx.Close() + shard := testDatabaseShard(t, opts) - shard.Bootstrap() + shard.Bootstrap(ctx) shard.SetRuntimeOptions(runtime.NewOptions(). SetTickPerSeriesSleepDuration(sleepPerSeries). 
SetTickSeriesBatchSize(1)) @@ -907,9 +951,6 @@ func TestShardTick(t *testing.T) { setNow(nowFn().Add(t)) } - ctx := context.NewContext() - defer ctx.Close() - writeShardAndVerify(ctx, t, shard, "foo", nowFn(), 1.0, true, 0) // same time, different value should write writeShardAndVerify(ctx, t, shard, "foo", nowFn(), 2.0, true, 0) @@ -1051,8 +1092,11 @@ func testShardWriteAsync(t *testing.T, writes []testWrite) { sleepPerSeries := time.Microsecond + ctx := context.NewContext() + defer ctx.Close() + shard := testDatabaseShard(t, opts) - shard.Bootstrap() + shard.Bootstrap(ctx) shard.SetRuntimeOptions(runtime.NewOptions(). SetWriteNewSeriesAsync(true). SetTickPerSeriesSleepDuration(sleepPerSeries). @@ -1077,9 +1121,6 @@ func testShardWriteAsync(t *testing.T, writes []testWrite) { setNow(nowFn().Add(t)) } - ctx := context.NewContext() - defer ctx.Close() - for _, write := range writes { shard.Write(ctx, ident.StringID(write.id), nowFn(), write.value, write.unit, write.annotation, series.WriteOptions{}) } @@ -1109,7 +1150,11 @@ func TestShardTickRace(t *testing.T) { opts := DefaultTestOptions() shard := testDatabaseShard(t, opts) defer shard.Close() - shard.Bootstrap() + + ctx := context.NewContext() + defer ctx.Close() + + shard.Bootstrap(ctx) addTestSeries(shard, ident.StringID("foo")) var wg sync.WaitGroup @@ -1137,8 +1182,13 @@ func TestShardTickRace(t *testing.T) { // we had while trying to purge as a concurrent read. func TestShardTickCleanupSmallBatchSize(t *testing.T) { opts := DefaultTestOptions() + + ctx := context.NewContext() + defer ctx.Close() + shard := testDatabaseShard(t, opts) - shard.Bootstrap() + shard.Bootstrap(ctx) + addTestSeries(shard, ident.StringID("foo")) shard.Tick(context.NewNoOpCanncellable(), time.Now(), namespace.Context{}) require.Equal(t, 0, shard.lookup.Len()) @@ -1160,8 +1210,11 @@ func TestShardReturnsErrorForConcurrentTicks(t *testing.T) { SetCommitLogOptions(opts.CommitLogOptions(). 
SetFilesystemOptions(fsOpts)) + ctx := context.NewContext() + defer ctx.Close() + shard := testDatabaseShard(t, opts) - shard.Bootstrap() + shard.Bootstrap(ctx) shard.currRuntimeOptions.tickSleepSeriesBatchSize = 1 shard.currRuntimeOptions.tickSleepPerSeries = time.Millisecond @@ -1512,7 +1565,11 @@ func TestShardReadEncodedCachesSeriesWithRecentlyReadPolicy(t *testing.T) { shard := testDatabaseShard(t, opts) defer shard.Close() - require.NoError(t, shard.Bootstrap()) + + ctx := context.NewContext() + defer ctx.Close() + + require.NoError(t, shard.Bootstrap(ctx)) ropts := shard.seriesOpts.RetentionOptions() end := opts.ClockOptions().NowFn()().Truncate(ropts.BlockSize()) @@ -1538,9 +1595,6 @@ func TestShardReadEncodedCachesSeriesWithRecentlyReadPolicy(t *testing.T) { blockReaders = append(blockReaders, block) } - ctx := opts.ContextPool().Get() - defer ctx.Close() - mid := start.Add(ropts.BlockSize()) retriever.EXPECT(). diff --git a/src/dbnode/storage/storage_mock.go b/src/dbnode/storage/storage_mock.go index 045905eec8..0e983f71c9 100644 --- a/src/dbnode/storage/storage_mock.go +++ b/src/dbnode/storage/storage_mock.go @@ -1346,32 +1346,32 @@ func (mr *MockdatabaseNamespaceMockRecorder) FetchBlocksMetadataV2(ctx, shardID, } // PrepareBootstrap mocks base method -func (m *MockdatabaseNamespace) PrepareBootstrap() ([]databaseShard, error) { +func (m *MockdatabaseNamespace) PrepareBootstrap(ctx context.Context) ([]databaseShard, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "PrepareBootstrap") + ret := m.ctrl.Call(m, "PrepareBootstrap", ctx) ret0, _ := ret[0].([]databaseShard) ret1, _ := ret[1].(error) return ret0, ret1 } // PrepareBootstrap indicates an expected call of PrepareBootstrap -func (mr *MockdatabaseNamespaceMockRecorder) PrepareBootstrap() *gomock.Call { +func (mr *MockdatabaseNamespaceMockRecorder) PrepareBootstrap(ctx interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PrepareBootstrap", 
reflect.TypeOf((*MockdatabaseNamespace)(nil).PrepareBootstrap)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PrepareBootstrap", reflect.TypeOf((*MockdatabaseNamespace)(nil).PrepareBootstrap), ctx) } // Bootstrap mocks base method -func (m *MockdatabaseNamespace) Bootstrap(bootstrapResult bootstrap.NamespaceResult) error { +func (m *MockdatabaseNamespace) Bootstrap(ctx context.Context, bootstrapResult bootstrap.NamespaceResult) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Bootstrap", bootstrapResult) + ret := m.ctrl.Call(m, "Bootstrap", ctx, bootstrapResult) ret0, _ := ret[0].(error) return ret0 } // Bootstrap indicates an expected call of Bootstrap -func (mr *MockdatabaseNamespaceMockRecorder) Bootstrap(bootstrapResult interface{}) *gomock.Call { +func (mr *MockdatabaseNamespaceMockRecorder) Bootstrap(ctx, bootstrapResult interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Bootstrap", reflect.TypeOf((*MockdatabaseNamespace)(nil).Bootstrap), bootstrapResult) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Bootstrap", reflect.TypeOf((*MockdatabaseNamespace)(nil).Bootstrap), ctx, bootstrapResult) } // WarmFlush mocks base method @@ -1812,31 +1812,31 @@ func (mr *MockdatabaseShardMockRecorder) FetchBlocksMetadataV2(ctx, start, end, } // PrepareBootstrap mocks base method -func (m *MockdatabaseShard) PrepareBootstrap() error { +func (m *MockdatabaseShard) PrepareBootstrap(ctx context.Context) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "PrepareBootstrap") + ret := m.ctrl.Call(m, "PrepareBootstrap", ctx) ret0, _ := ret[0].(error) return ret0 } // PrepareBootstrap indicates an expected call of PrepareBootstrap -func (mr *MockdatabaseShardMockRecorder) PrepareBootstrap() *gomock.Call { +func (mr *MockdatabaseShardMockRecorder) PrepareBootstrap(ctx interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PrepareBootstrap", 
reflect.TypeOf((*MockdatabaseShard)(nil).PrepareBootstrap)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PrepareBootstrap", reflect.TypeOf((*MockdatabaseShard)(nil).PrepareBootstrap), ctx) } // Bootstrap mocks base method -func (m *MockdatabaseShard) Bootstrap() error { +func (m *MockdatabaseShard) Bootstrap(ctx context.Context) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Bootstrap") + ret := m.ctrl.Call(m, "Bootstrap", ctx) ret0, _ := ret[0].(error) return ret0 } // Bootstrap indicates an expected call of Bootstrap -func (mr *MockdatabaseShardMockRecorder) Bootstrap() *gomock.Call { +func (mr *MockdatabaseShardMockRecorder) Bootstrap(ctx interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Bootstrap", reflect.TypeOf((*MockdatabaseShard)(nil).Bootstrap)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Bootstrap", reflect.TypeOf((*MockdatabaseShard)(nil).Bootstrap), ctx) } // UpdateFlushStates mocks base method diff --git a/src/dbnode/storage/types.go b/src/dbnode/storage/types.go index 5ce4052244..2aabe3d35a 100644 --- a/src/dbnode/storage/types.go +++ b/src/dbnode/storage/types.go @@ -345,10 +345,10 @@ type databaseNamespace interface { // PrepareBootstrap prepares the namespace for bootstrapping by ensuring // it's shards know which flushed files reside on disk, so that calls // to series.LoadBlock(...) will succeed. - PrepareBootstrap() ([]databaseShard, error) + PrepareBootstrap(ctx context.Context) ([]databaseShard, error) // Bootstrap marks shards as bootstrapped for the namespace. - Bootstrap(bootstrapResult bootstrap.NamespaceResult) error + Bootstrap(ctx context.Context, bootstrapResult bootstrap.NamespaceResult) error // WarmFlush flushes in-memory WarmWrites. 
WarmFlush(blockStart time.Time, flush persist.FlushPreparer) error @@ -499,11 +499,11 @@ type databaseShard interface { // PrepareBootstrap prepares the shard for bootstrapping by ensuring // it knows which flushed files reside on disk. - PrepareBootstrap() error + PrepareBootstrap(ctx context.Context) error // Bootstrap bootstraps the shard after all provided data // has been loaded using LoadBootstrapBlocks. - Bootstrap() error + Bootstrap(ctx context.Context) error // UpdateFlushStates updates all the flush states for the current shard // by checking the file volumes that exist on disk at a point in time. diff --git a/src/dbnode/tracepoint/tracepoint.go b/src/dbnode/tracepoint/tracepoint.go index c1f6461d2a..97d2410eda 100644 --- a/src/dbnode/tracepoint/tracepoint.go +++ b/src/dbnode/tracepoint/tracepoint.go @@ -67,6 +67,18 @@ const ( // NSQueryIDs is the operation name for the dbNamespace QueryIDs path. NSQueryIDs = "storage.dbNamespace.QueryIDs" + // NSPrepareBootstrap is the operation name for the dbNamespace PrepareBootstrap path. + NSPrepareBootstrap = "storage.dbNamespace.PrepareBootstrap" + + // NSBootstrap is the operation name for the dbNamespace Bootstrap path. + NSBootstrap = "storage.dbNamespace.Bootstrap" + + // ShardPrepareBootstrap is the operation name for the dbShard PrepareBootstrap path. + ShardPrepareBootstrap = "storage.dbShard.PrepareBootstrap" + + // ShardBootstrap is the operation name for the dbShard Bootstrap path. + ShardBootstrap = "storage.dbShard.Bootstrap" + // NSIdxQuery is the operation name for the nsIndex Query path. NSIdxQuery = "storage.nsIndex.Query" @@ -87,4 +99,19 @@ const ( // BlockAggregate is the operation name for the index block aggregate path. BlockAggregate = "storage/index.block.Aggregate" + + // BootstrapProcessRun is the operation name for the bootstrap process Run path. 
+ BootstrapProcessRun = "bootstrap.bootstrapProcess.Run" + + // BootstrapperUninitializedSourceRead is the operation for the uninitializedTopologySource Read path. + BootstrapperUninitializedSourceRead = "bootstrapper.uninitialized.uninitializedTopologySource.Read" + + // BootstrapperCommitLogSourceRead is the operation for the commit log Read path. + BootstrapperCommitLogSourceRead = "bootstrapper.commitlog.commitLogSource.Read" + + // BootstrapperPeersSourceRead is the operation for the peers Read path. + BootstrapperPeersSourceRead = "bootstrapper.peers.peersSource.Read" + + // BootstrapperFilesystemSourceRead is the operation for the filesystem Read path. + BootstrapperFilesystemSourceRead = "bootstrapper.fs.filesystemSource.Read" )