Skip to content

Commit

Permalink
[dbnode] Avoid loading blocks in memory for namespaces with snapshots…
Browse files Browse the repository at this point in the history
… disabled during bootstrapping (#3919)

* Use `persist.FileSetFlushType` instead of `persist.FileSetSnapshotType` for the second target data and index ranges if the namespace being bootstrapped is read-only. Because of this change, bootstrappers won't keep second-range blocks in memory for read-only namespaces (`shouldPersist` will evaluate to true for them).

* Inline `namespace.ReadOnly` instead of declaring it as var.

* Use `snapshotEnabled` instead of `namespace.ReadOnly`.

* Removed unused field.

* Updated unit test names to align with snapshotEnabled.
  • Loading branch information
soundvibe authored Nov 17, 2021
1 parent a4f7af1 commit e2b4a8a
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 46 deletions.
83 changes: 46 additions & 37 deletions src/dbnode/storage/bootstrap/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/persist"
"github.com/m3db/m3/src/dbnode/persist/fs"
"github.com/m3db/m3/src/dbnode/retention"
"github.com/m3db/m3/src/dbnode/storage/bootstrap/result"
"github.com/m3db/m3/src/dbnode/topology"
"github.com/m3db/m3/src/dbnode/tracepoint"
Expand Down Expand Up @@ -188,14 +187,16 @@ func (b bootstrapProcess) Run(
}
namespaceDetails := make([]NamespaceDetails, 0, len(namespaces))
for _, namespace := range namespaces {
ropts := namespace.Metadata.Options().RetentionOptions()
idxopts := namespace.Metadata.Options().IndexOptions()
dataRanges := b.targetRangesForData(at, ropts)
indexRanges := b.targetRangesForIndex(at, ropts, idxopts)
firstRanges := b.newShardTimeRanges(
dataRanges.firstRangeWithPersistTrue.Range,
namespace.Shards,
var (
nsOpts = namespace.Metadata.Options()
dataRanges = b.targetRangesForData(at, nsOpts)
indexRanges = b.targetRangesForIndex(at, nsOpts)
firstRanges = b.newShardTimeRanges(
dataRanges.firstRangeWithPersistTrue.Range,
namespace.Shards,
)
)

namespacesRunFirst.Namespaces.Set(namespace.Metadata.ID(), Namespace{
Metadata: namespace.Metadata,
Shards: namespace.Shards,
Expand All @@ -215,23 +216,23 @@ func (b bootstrapProcess) Run(
},
})
secondRanges := b.newShardTimeRanges(
dataRanges.secondRangeWithPersistFalse.Range, namespace.Shards)
dataRanges.secondRange.Range, namespace.Shards)
namespacesRunSecond.Namespaces.Set(namespace.Metadata.ID(), Namespace{
Metadata: namespace.Metadata,
Shards: namespace.Shards,
DataAccumulator: namespace.DataAccumulator,
Hooks: namespace.Hooks,
DataTargetRange: dataRanges.secondRangeWithPersistFalse,
IndexTargetRange: indexRanges.secondRangeWithPersistFalse,
DataTargetRange: dataRanges.secondRange,
IndexTargetRange: indexRanges.secondRange,
DataRunOptions: NamespaceRunOptions{
ShardTimeRanges: secondRanges.Copy(),
TargetShardTimeRanges: secondRanges.Copy(),
RunOptions: dataRanges.secondRangeWithPersistFalse.RunOptions,
RunOptions: dataRanges.secondRange.RunOptions,
},
IndexRunOptions: NamespaceRunOptions{
ShardTimeRanges: secondRanges.Copy(),
TargetShardTimeRanges: secondRanges.Copy(),
RunOptions: indexRanges.secondRangeWithPersistFalse.RunOptions,
RunOptions: indexRanges.secondRange.RunOptions,
},
})
namespaceDetails = append(namespaceDetails, NamespaceDetails{
Expand All @@ -247,12 +248,12 @@ func (b bootstrapProcess) Run(
return NamespaceResults{}, err
}

bootstrapResult := NewNamespaceResults(namespacesRunFirst)
for _, namespaces := range []Namespaces{
namespacesRunFirst,
namespacesRunSecond,
} {

var (
bootstrapResult = NewNamespaceResults(namespacesRunFirst)
namespacesToRun = []Namespaces{namespacesRunFirst, namespacesRunSecond}
lastRunIndex = len(namespacesToRun) - 1
)
for runIndex, namespaces := range namespacesToRun {
for _, entry := range namespaces.Namespaces.Iter() {
ns := entry.Value()

Expand All @@ -266,24 +267,22 @@ func (b bootstrapProcess) Run(
continue
}

// Check if snapshot-type ranges have advanced while bootstrapping previous ranges.
// If yes, return an error to force a retry
if persistConf := ns.DataRunOptions.RunOptions.PersistConfig(); persistConf.Enabled &&
persistConf.FileSetType == persist.FileSetSnapshotType {
// If last run, check if ranges have advanced while bootstrapping previous ranges.
// If yes, return an error to force a retry.
if runIndex == lastRunIndex {
var (
now = xtime.ToUnixNano(b.nowFn())
nsOptions = ns.Metadata.Options()
upToDateDataRanges = b.targetRangesForData(now, nsOptions.RetentionOptions())
upToDateDataRanges = b.targetRangesForData(now, nsOptions)
)
// Only checking data ranges. Since index blocks can only be a multiple of
// data block size, the ranges for index could advance only if data ranges
// have advanced, too (while opposite is not necessarily true)
if !upToDateDataRanges.secondRangeWithPersistFalse.Range.Equal(ns.DataTargetRange.Range) {
upToDateIndexRanges := b.targetRangesForIndex(now, nsOptions.RetentionOptions(),
nsOptions.IndexOptions())
if !upToDateDataRanges.secondRange.Range.Equal(ns.DataTargetRange.Range) {
upToDateIndexRanges := b.targetRangesForIndex(now, nsOptions)
fields := b.logFields(ns.Metadata, ns.Shards,
upToDateDataRanges.secondRangeWithPersistFalse.Range,
upToDateIndexRanges.secondRangeWithPersistFalse.Range)
upToDateDataRanges.secondRange.Range,
upToDateIndexRanges.secondRange.Range)
b.log.Error("time ranges of snapshot-type blocks advanced", fields...)
return NamespaceResults{}, ErrFileSetSnapshotTypeRangeAdvanced
}
Expand Down Expand Up @@ -444,28 +443,31 @@ func (b bootstrapProcess) logBootstrapResult(

func (b bootstrapProcess) targetRangesForData(
at xtime.UnixNano,
ropts retention.Options,
nsOpts namespace.Options,
) targetRangesResult {
ropts := nsOpts.RetentionOptions()
return b.targetRanges(at, targetRangesOptions{
retentionPeriod: ropts.RetentionPeriod(),
futureRetentionPeriod: ropts.FutureRetentionPeriod(),
blockSize: ropts.BlockSize(),
bufferPast: ropts.BufferPast(),
bufferFuture: ropts.BufferFuture(),
snapshotEnabled: nsOpts.SnapshotEnabled(),
})
}

func (b bootstrapProcess) targetRangesForIndex(
at xtime.UnixNano,
ropts retention.Options,
idxopts namespace.IndexOptions,
nsOpts namespace.Options,
) targetRangesResult {
ropts := nsOpts.RetentionOptions()
return b.targetRanges(at, targetRangesOptions{
retentionPeriod: ropts.RetentionPeriod(),
futureRetentionPeriod: ropts.FutureRetentionPeriod(),
blockSize: idxopts.BlockSize(),
blockSize: nsOpts.IndexOptions().BlockSize(),
bufferPast: ropts.BufferPast(),
bufferFuture: ropts.BufferFuture(),
snapshotEnabled: nsOpts.SnapshotEnabled(),
})
}

Expand All @@ -475,11 +477,12 @@ type targetRangesOptions struct {
blockSize time.Duration
bufferPast time.Duration
bufferFuture time.Duration
snapshotEnabled bool
}

type targetRangesResult struct {
firstRangeWithPersistTrue TargetRange
secondRangeWithPersistFalse TargetRange
firstRangeWithPersistTrue TargetRange
secondRange TargetRange
}

func (b bootstrapProcess) targetRanges(
Expand All @@ -499,6 +502,12 @@ func (b bootstrapProcess) targetRanges(
Truncate(opts.blockSize).
Add(opts.blockSize)

secondRangeFilesetType := persist.FileSetSnapshotType
if !opts.snapshotEnabled {
// NB: If snapshots are disabled for a namespace, we want to use flush type.
secondRangeFilesetType = persist.FileSetFlushType
}

// NB(r): We want the large initial time range bootstrapped to
// bootstrap with persistence so we don't keep the full raw
// data in process until we finish bootstrapping which could
Expand All @@ -514,15 +523,15 @@ func (b bootstrapProcess) targetRanges(
FileSetType: persist.FileSetFlushType,
}),
},
secondRangeWithPersistFalse: TargetRange{
secondRange: TargetRange{
Range: xtime.Range{Start: midPoint, End: cutover},
RunOptions: b.newRunOptions().SetPersistConfig(PersistConfig{
Enabled: true,
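// When snapshots are enabled these blocks are still active, so we'll
// have to keep them in memory, but we want to snapshot them as we
// receive them so that once bootstrapping completes we can still
// recover from just the commit log bootstrapper. When snapshots are
// disabled the flush type is used instead, so the blocks are not
// kept in memory.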
FileSetType: persist.FileSetSnapshotType,
FileSetType: secondRangeFilesetType,
}),
},
}
Expand Down
33 changes: 33 additions & 0 deletions src/dbnode/storage/bootstrap/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"github.com/m3db/m3/src/cluster/shard"
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/persist"
"github.com/m3db/m3/src/dbnode/persist/fs"
"github.com/m3db/m3/src/dbnode/retention"
"github.com/m3db/m3/src/dbnode/sharding"
Expand Down Expand Up @@ -135,3 +136,35 @@ func TestBootstrapProcessRunActiveBlockAdvanced(t *testing.T) {
})
}
}

// TestTargetRangesFileSetTypeForSnapshotDisabledNamespace checks that, for a
// namespace with snapshots disabled, both the data and the index target
// ranges use the flush fileset type for their second range.
func TestTargetRangesFileSetTypeForSnapshotDisabledNamespace(t *testing.T) {
	var (
		process = bootstrapProcess{processOpts: NewProcessOptions()}
		opts    = namespace.NewOptions().SetSnapshotEnabled(false)
	)

	// Compute both results up front (data first, then index) and verify
	// them in the same order.
	results := []targetRangesResult{
		process.targetRangesForData(xtime.Now(), opts),
		process.targetRangesForIndex(xtime.Now(), opts),
	}
	for _, ranges := range results {
		requireFilesetTypes(t, ranges, persist.FileSetFlushType)
	}
}

// TestTargetRangesFileSetTypeForSnapshotEnabledNamespace checks that, for a
// namespace with snapshots enabled, both the data and the index target
// ranges use the snapshot fileset type for their second range.
func TestTargetRangesFileSetTypeForSnapshotEnabledNamespace(t *testing.T) {
	var (
		process = bootstrapProcess{processOpts: NewProcessOptions()}
		opts    = namespace.NewOptions().SetSnapshotEnabled(true)
	)

	// Compute both results up front (data first, then index) and verify
	// them in the same order.
	results := []targetRangesResult{
		process.targetRangesForData(xtime.Now(), opts),
		process.targetRangesForIndex(xtime.Now(), opts),
	}
	for _, ranges := range results {
		requireFilesetTypes(t, ranges, persist.FileSetSnapshotType)
	}
}

// requireFilesetTypes asserts the persist configuration of both target ranges
// in the given result:
//   - the first range must have persistence enabled with the flush fileset type;
//   - the second range must have persistence enabled with expectedSecond as
//     its fileset type.
//
// It is marked as a test helper so that a failed assertion is reported at the
// calling test's line rather than inside this function.
func requireFilesetTypes(
	t *testing.T,
	ranges targetRangesResult,
	expectedSecond persist.FileSetType,
) {
	t.Helper()

	persistConfigFirstRange := ranges.firstRangeWithPersistTrue.RunOptions.PersistConfig()
	require.True(t, persistConfigFirstRange.Enabled)
	require.Equal(t, persist.FileSetFlushType, persistConfigFirstRange.FileSetType)

	persistConfigSecondRange := ranges.secondRange.RunOptions.PersistConfig()
	require.True(t, persistConfigSecondRange.Enabled)
	require.Equal(t, expectedSecond, persistConfigSecondRange.FileSetType)
}
15 changes: 6 additions & 9 deletions src/dbnode/storage/bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,14 @@ import (
"testing"
"time"

"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/storage/bootstrap"
"github.com/m3db/m3/src/x/context"
"github.com/m3db/m3/src/x/ident"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/storage/bootstrap"
"github.com/m3db/m3/src/x/context"
"github.com/m3db/m3/src/x/ident"
xtest "github.com/m3db/m3/src/x/test"
)

Expand Down Expand Up @@ -141,9 +140,8 @@ func TestDatabaseBootstrapSubsequentCallsQueued(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)

ns.EXPECT().PrepareBootstrap(gomock.Any()).Return([]databaseShard{}, nil).AnyTimes()
ns.EXPECT().Metadata().Return(meta).AnyTimes()

ns.EXPECT().PrepareBootstrap(gomock.Any()).Return([]databaseShard{}, nil).Times(2)
ns.EXPECT().Metadata().Return(meta).Times(2)
ns.EXPECT().
Bootstrap(gomock.Any(), gomock.Any()).
Return(nil).
Expand Down Expand Up @@ -216,7 +214,6 @@ func TestDatabaseBootstrapBootstrapHooks(t *testing.T) {

ns.EXPECT().PrepareBootstrap(gomock.Any()).Return(shards, nil).AnyTimes()
ns.EXPECT().Metadata().Return(meta).AnyTimes()

ns.EXPECT().
Bootstrap(gomock.Any(), gomock.Any()).
Return(nil).
Expand Down

0 comments on commit e2b4a8a

Please sign in to comment.