Skip to content

Commit

Permalink
[dbnode] Add tracing for bootstrap process (#2216)
Browse files Browse the repository at this point in the history
  • Loading branch information
robskillington authored Apr 27, 2020
1 parent 7b4303b commit 542d128
Show file tree
Hide file tree
Showing 32 changed files with 406 additions and 142 deletions.
4 changes: 2 additions & 2 deletions integrations/grafana/m3db_dashboard.json
Original file line number Diff line number Diff line change
Expand Up @@ -5126,11 +5126,11 @@
"steppedLine": false,
"targets": [
{
"expr": "sum(build_information{}) by (revision)",
"expr": "sum(build_information{}) by (build_version, revision)",
"format": "time_series",
"intervalFactor": 1,
"key": 0.5783520603949805,
"legendFormat": "{{revision}}",
"legendFormat": "{{build_version}} ({{revision}})",
"refId": "A"
}
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper"
bcl "github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper/commitlog"
"github.com/m3db/m3/src/dbnode/ts"
"github.com/m3db/m3/src/x/context"
"github.com/m3db/m3/src/x/ident"
xtime "github.com/m3db/m3/src/x/time"

Expand Down Expand Up @@ -122,6 +123,7 @@ func TestBootstrapAfterBufferRotation(t *testing.T) {

test := newTestBootstrapperSource(testBootstrapperSourceOptions{
read: func(
ctx context.Context,
namespaces bootstrap.Namespaces,
) (bootstrap.NamespaceResults, error) {
<-signalCh
Expand All @@ -131,7 +133,7 @@ func TestBootstrapAfterBufferRotation(t *testing.T) {
if err != nil {
return bootstrap.NamespaceResults{}, err
}
return bs.Bootstrap(namespaces)
return bs.Bootstrap(ctx, namespaces)
},
}, bootstrapOpts, bs)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper"
bcl "github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper/commitlog"
"github.com/m3db/m3/src/dbnode/ts"
"github.com/m3db/m3/src/x/context"
"github.com/m3db/m3/src/x/ident"
xtime "github.com/m3db/m3/src/x/time"

Expand Down Expand Up @@ -136,6 +137,7 @@ func TestBootstrapBeforeBufferRotationNoTick(t *testing.T) {

test := newTestBootstrapperSource(testBootstrapperSourceOptions{
read: func(
ctx context.Context,
namespaces bootstrap.Namespaces,
) (bootstrap.NamespaceResults, error) {
<-signalCh
Expand All @@ -145,7 +147,7 @@ func TestBootstrapBeforeBufferRotationNoTick(t *testing.T) {
if err != nil {
return bootstrap.NamespaceResults{}, err
}
return bs.Bootstrap(namespaces)
return bs.Bootstrap(ctx, namespaces)
},
}, bootstrapOpts, bs)

Expand Down
10 changes: 6 additions & 4 deletions src/dbnode/integration/bootstrap_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper"
bcl "github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper/commitlog"
"github.com/m3db/m3/src/dbnode/storage/bootstrap/result"
"github.com/m3db/m3/src/x/context"

"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -63,7 +64,7 @@ func newTestBootstrapperSource(
if opts.read != nil {
src.read = opts.read
} else {
src.read = func(namespaces bootstrap.Namespaces) (bootstrap.NamespaceResults, error) {
src.read = func(ctx context.Context, namespaces bootstrap.Namespaces) (bootstrap.NamespaceResults, error) {
return bootstrap.NewNamespaceResults(namespaces), nil
}
}
Expand Down Expand Up @@ -100,15 +101,15 @@ type testBootstrapper struct {
type testBootstrapperSourceOptions struct {
availableData func(namespace.Metadata, result.ShardTimeRanges, bootstrap.RunOptions) (result.ShardTimeRanges, error)
availableIndex func(namespace.Metadata, result.ShardTimeRanges, bootstrap.RunOptions) (result.ShardTimeRanges, error)
read func(namespaces bootstrap.Namespaces) (bootstrap.NamespaceResults, error)
read func(ctx context.Context, namespaces bootstrap.Namespaces) (bootstrap.NamespaceResults, error)
}

var _ bootstrap.Source = &testBootstrapperSource{}

type testBootstrapperSource struct {
availableData func(namespace.Metadata, result.ShardTimeRanges, bootstrap.RunOptions) (result.ShardTimeRanges, error)
availableIndex func(namespace.Metadata, result.ShardTimeRanges, bootstrap.RunOptions) (result.ShardTimeRanges, error)
read func(namespaces bootstrap.Namespaces) (bootstrap.NamespaceResults, error)
read func(ctx context.Context, namespaces bootstrap.Namespaces) (bootstrap.NamespaceResults, error)
}

func (t testBootstrapperSource) AvailableData(
Expand All @@ -128,9 +129,10 @@ func (t testBootstrapperSource) AvailableIndex(
}

func (t testBootstrapperSource) Read(
ctx context.Context,
namespaces bootstrap.Namespaces,
) (bootstrap.NamespaceResults, error) {
return t.read(namespaces)
return t.read(ctx, namespaces)
}

func (t testBootstrapperSource) String() string {
Expand Down
2 changes: 1 addition & 1 deletion src/dbnode/persist/fs/persist_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func NewPersistManager(opts Options) (persist.Manager, error) {

func (pm *persistManager) reset() {
pm.status = persistManagerIdle
pm.start = timeZero
pm.start = timeZero
pm.count = 0
pm.bytesWritten = 0
pm.worked = 0
Expand Down
10 changes: 7 additions & 3 deletions src/dbnode/storage/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/m3db/m3/src/dbnode/clock"
"github.com/m3db/m3/src/dbnode/storage/bootstrap"
"github.com/m3db/m3/src/x/context"
xerrors "github.com/m3db/m3/src/x/errors"
"github.com/m3db/m3/src/x/instrument"

Expand Down Expand Up @@ -215,6 +216,9 @@ type bootstrapNamespace struct {
}

func (m *bootstrapManager) bootstrap() error {
ctx := context.NewContext()
defer ctx.Close()

// NB(r): construct new instance of the bootstrap process to avoid
// state being kept around by bootstrappers.
process, err := m.processProvider.Provide()
Expand Down Expand Up @@ -256,7 +260,7 @@ func (m *bootstrapManager) bootstrap() error {
i, namespace := i, namespace
prepareWg.Add(1)
go func() {
shards, err := namespace.PrepareBootstrap()
shards, err := namespace.PrepareBootstrap(ctx)

prepareLock.Lock()
defer func() {
Expand Down Expand Up @@ -338,7 +342,7 @@ func (m *bootstrapManager) bootstrap() error {
m.log.Info("bootstrap started", logFields...)

// Run the bootstrap.
bootstrapResult, err := process.Run(start, targets)
bootstrapResult, err := process.Run(ctx, start, targets)

bootstrapDuration := m.nowFn().Sub(start)
m.bootstrapDuration.Record(bootstrapDuration)
Expand Down Expand Up @@ -369,7 +373,7 @@ func (m *bootstrapManager) bootstrap() error {
return err
}

if err := namespace.Bootstrap(result); err != nil {
if err := namespace.Bootstrap(ctx, result); err != nil {
m.log.Info("bootstrap error", append(logFields, []zapcore.Field{
zap.String("namespace", id.String()),
zap.Error(err),
Expand Down
25 changes: 13 additions & 12 deletions src/dbnode/storage/bootstrap/bootstrap_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions src/dbnode/storage/bootstrap/bootstrapper/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/m3db/m3/src/dbnode/storage/bootstrap"
"github.com/m3db/m3/src/dbnode/storage/bootstrap/result"
"github.com/m3db/m3/src/x/context"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand Down Expand Up @@ -75,6 +76,7 @@ func (b baseBootstrapper) String() string {
}

func (b baseBootstrapper) Bootstrap(
ctx context.Context,
namespaces bootstrap.Namespaces,
) (bootstrap.NamespaceResults, error) {
logFields := []zapcore.Field{
Expand Down Expand Up @@ -135,7 +137,7 @@ func (b baseBootstrapper) Bootstrap(
b.log.Info("bootstrap from source started", logFields...)

// Run the bootstrap source.
currResults, err := b.src.Read(curr)
currResults, err := b.src.Read(ctx, curr)

logFields = append(logFields, zap.Duration("took", nowFn().Sub(begin)))
if err != nil {
Expand Down Expand Up @@ -164,7 +166,7 @@ func (b baseBootstrapper) Bootstrap(
// If there are some time ranges the current bootstrapper could not fulfill,
// that we can attempt then pass it along to the next bootstrapper.
if next.Namespaces.Len() > 0 {
nextResults, err := b.next.Bootstrap(next)
nextResults, err := b.next.Bootstrap(ctx, next)
if err != nil {
return bootstrap.NamespaceResults{}, err
}
Expand Down
25 changes: 17 additions & 8 deletions src/dbnode/storage/bootstrap/bootstrapper/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/storage/bootstrap"
"github.com/m3db/m3/src/dbnode/storage/bootstrap/result"
"github.com/m3db/m3/src/x/context"
"github.com/m3db/m3/src/x/ident"
xtime "github.com/m3db/m3/src/x/time"

Expand Down Expand Up @@ -146,8 +147,12 @@ func testBaseBootstrapperEmptyRange(t *testing.T, withIndex bool) {
defer tester.Finish()

matcher := bootstrap.NamespaceMatcher{Namespaces: tester.Namespaces}
src.EXPECT().Read(matcher).DoAndReturn(
func(namespaces bootstrap.Namespaces) (bootstrap.NamespaceResults, error) {
src.EXPECT().
Read(gomock.Any(), matcher).
DoAndReturn(func(
ctx context.Context,
namespaces bootstrap.Namespaces,
) (bootstrap.NamespaceResults, error) {
return nsResults, nil
})

Expand Down Expand Up @@ -190,8 +195,12 @@ func testBaseBootstrapperCurrentNoUnfulfilled(t *testing.T, withIndex bool) {
defer tester.Finish()

matcher := bootstrap.NamespaceMatcher{Namespaces: tester.Namespaces}
src.EXPECT().Read(matcher).DoAndReturn(
func(namespaces bootstrap.Namespaces) (bootstrap.NamespaceResults, error) {
src.EXPECT().
Read(gomock.Any(), matcher).
DoAndReturn(func(
ctx context.Context,
namespaces bootstrap.Namespaces,
) (bootstrap.NamespaceResults, error) {
return nsResults, nil
})

Expand Down Expand Up @@ -236,8 +245,8 @@ func testBaseBootstrapperCurrentSomeUnfulfilled(t *testing.T, withIndex bool) {
defer tester.Finish()

matcher := bootstrap.NamespaceMatcher{Namespaces: tester.Namespaces}
src.EXPECT().Read(matcher).Return(currResult, nil)
next.EXPECT().Bootstrap(matcher).Return(nextResult, nil)
src.EXPECT().Read(gomock.Any(), matcher).Return(currResult, nil)
next.EXPECT().Bootstrap(gomock.Any(), matcher).Return(nextResult, nil)

tester.TestBootstrapWith(base)
tester.TestUnfulfilledForNamespaceIsEmpty(testNs)
Expand Down Expand Up @@ -270,8 +279,8 @@ func testBasebootstrapperNext(
emptyResult := testEmptyResult(testNs)
nextResult := testResult(testNs, withIndex, testShard, nextUnfulfilled)
matcher := bootstrap.NamespaceMatcher{Namespaces: tester.Namespaces}
src.EXPECT().Read(matcher).Return(emptyResult, nil)
next.EXPECT().Bootstrap(matcher).Return(nextResult, nil)
src.EXPECT().Read(gomock.Any(), matcher).Return(emptyResult, nil)
next.EXPECT().Bootstrap(gomock.Any(), matcher).Return(nextResult, nil)

tester.TestBootstrapWith(base)

Expand Down
12 changes: 11 additions & 1 deletion src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/m3db/m3/src/dbnode/storage/bootstrap/result"
"github.com/m3db/m3/src/dbnode/storage/series"
"github.com/m3db/m3/src/dbnode/topology"
"github.com/m3db/m3/src/dbnode/tracepoint"
"github.com/m3db/m3/src/dbnode/ts"
"github.com/m3db/m3/src/x/checked"
"github.com/m3db/m3/src/x/context"
Expand Down Expand Up @@ -168,8 +169,12 @@ type readNamespaceResult struct {
// TODO(rartoul): Make this take the SnapshotMetadata files into account to reduce the
// number of commitlogs / snapshots that we need to read.
func (s *commitLogSource) Read(
ctx context.Context,
namespaces bootstrap.Namespaces,
) (bootstrap.NamespaceResults, error) {
ctx, span, _ := ctx.StartSampledTraceSpan(tracepoint.BootstrapperCommitLogSourceRead)
defer span.Finish()

timeRangesEmpty := true
for _, elem := range namespaces.Namespaces.Iter() {
namespace := elem.Value()
Expand Down Expand Up @@ -202,6 +207,8 @@ func (s *commitLogSource) Read(

startSnapshotsRead := s.nowFn()
s.log.Info("read snapshots start")
span.LogEvent("read_snapshots_start")

for _, elem := range namespaceIter {
ns := elem.Value()
accumulator := ns.DataAccumulator
Expand Down Expand Up @@ -252,6 +259,7 @@ func (s *commitLogSource) Read(

s.log.Info("read snapshots done",
zap.Duration("took", s.nowFn().Sub(startSnapshotsRead)))
span.LogEvent("read_snapshots_done")

// Setup the series accumulator pipeline.
var (
Expand Down Expand Up @@ -294,17 +302,19 @@ func (s *commitLogSource) Read(
startCommitLogsRead = s.nowFn()
)
s.log.Info("read commit logs start")
span.LogEvent("read_commitlogs_start")
defer func() {
datapointsRead := 0
for _, worker := range workers {
datapointsRead += worker.datapointsRead
}
s.log.Info("read finished",
s.log.Info("read commit logs done",
zap.Stringer("took", s.nowFn().Sub(startCommitLogsRead)),
zap.Int("datapointsRead", datapointsRead),
zap.Int("datapointsSkippedNotBootstrappingNamespace", datapointsSkippedNotBootstrappingNamespace),
zap.Int("datapointsSkippedNotBootstrappingShard", datapointsSkippedNotBootstrappingShard),
zap.Int("datapointsSkippedShardNoLongerOwned", datapointsSkippedShardNoLongerOwned))
span.LogEvent("read_commitlogs_done")
}()

iter, corruptFiles, err := s.newIteratorFn(iterOpts)
Expand Down
Loading

0 comments on commit 542d128

Please sign in to comment.