Skip to content

Commit

Permalink
Add support for incrementally persisting the active block in the peer…
Browse files Browse the repository at this point in the history
…s bootstrapper as a snapshot (#903)
  • Loading branch information
richardartoul authored Sep 20, 2018
1 parent 4fb1642 commit 2c2f509
Show file tree
Hide file tree
Showing 20 changed files with 397 additions and 175 deletions.
2 changes: 1 addition & 1 deletion src/cmd/tools/verify_commitlogs/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ func main() {
nsID := ident.StringID(namespaceStr)
runOpts := bootstrap.NewRunOptions().
// Dont save intermediate results
SetIncremental(false)
SetPersistConfig(bootstrap.PersistConfig{Enabled: false})
nsMetadata, err := namespace.NewMetadata(nsID, namespace.NewOptions().SetRetentionOptions(retentionOpts))
if err != nil {
log.Fatal(err.Error())
Expand Down
35 changes: 35 additions & 0 deletions src/dbnode/integration/cluster_add_one_node_commitlog_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// +build integration

// Copyright (c) 2018 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package integration

import (
"testing"
)

func TestClusterAddOneNodeCommitlog(t *testing.T) {
if testing.Short() {
t.SkipNow()
}

testClusterAddOneNode(t, true)
}
13 changes: 13 additions & 0 deletions src/dbnode/integration/cluster_add_one_node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ type idShard struct {
}

func TestClusterAddOneNode(t *testing.T) {
testClusterAddOneNode(t, false)
}

func testClusterAddOneNode(t *testing.T, verifyCommitlogCanBootstrapAfterNodeJoin bool) {
if testing.Short() {
t.SkipNow()
}
Expand Down Expand Up @@ -258,4 +262,13 @@ func TestClusterAddOneNode(t *testing.T) {
for i := range setups {
verifySeriesMaps(t, setups[i], namesp.ID(), expectedSeriesMaps[i])
}

if verifyCommitlogCanBootstrapAfterNodeJoin {
// Verify that the node that joined the cluster can immediately bootstrap
// the data it streamed from its peers from the commit log as soon as
// the bootstrapping process completes.
require.NoError(t, setups[1].stopServer())
startServerWithNewInspection(t, opts, setups[1])
verifySeriesMaps(t, setups[1], namesp.ID(), expectedSeriesMaps[1])
}
}
2 changes: 1 addition & 1 deletion src/dbnode/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ func newDefaultBootstrappableTestSetups(
SetAdminClient(adminClient).
SetFetchBlocksMetadataEndpointVersion(setupOpts[i].fetchBlocksMetadataEndpointVersion).
// DatabaseBlockRetrieverManager and PersistManager need to be set or we will never execute
// the incremental path
// the persist bootstrapping path
SetDatabaseBlockRetrieverManager(setup.storageOpts.DatabaseBlockRetrieverManager()).
SetPersistManager(setup.storageOpts.PersistManager()).
SetRuntimeOptionsManager(runtimeOptsMgr)
Expand Down
26 changes: 13 additions & 13 deletions src/dbnode/storage/bootstrap/bootstrap_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/dbnode/storage/bootstrap/bootstrapper/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@ The peers bootstrapper similarly bootstraps all the data from peers that the fil

For the recently read policy the filesystem bootstrapper will simply fulfill the time ranges requested matching without actually loading the series and blocks from the files it discovers. This relies on data been fetched lazily from the filesystem when data is required for a series that does not live on heap.

The peers bootstrapper will bootstrap all time ranges requested, and if performing an incremental bootstrap for a time range will write the data to disk and then remove the results from memory. An incremental bootstrap is used for any data that is immutable at the time that bootstrapping commences. For time ranges that are mutable the series and blocks are returned directly as a result from the bootstrapper.
The peers bootstrapper will bootstrap all time ranges requested, and if performing a bootstrap with persistence enabled for a time range, will write the data to disk and then remove the results from memory. A bootstrap with persistence enabled is used for any data that is immutable at the time that bootstrapping commences. For time ranges that are mutable the peer bootstrapper will still write the data out to disk in a durable manner, but in the form of a snapshot, and the series and blocks will still be returned directly as a result from the bootstrapper. This enables the commit log bootstrapper to recover the data in case the node shuts down before the in-memory data can be flushed.
3 changes: 2 additions & 1 deletion src/dbnode/storage/bootstrap/bootstrapper/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ var (
testNamespaceID = ident.StringID("testNamespace")
testTargetStart = time.Now()
testShard = uint32(0)
testDefaultRunOpts = bootstrap.NewRunOptions().SetIncremental(false)
testDefaultRunOpts = bootstrap.NewRunOptions().
SetPersistConfig(bootstrap.PersistConfig{Enabled: false})
)

func testNsMetadata(t *testing.T) namespace.Metadata {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ import (

var (
testNamespaceID = ident.StringID("testnamespace")
testDefaultRunOpts = bootstrap.NewRunOptions().SetIncremental(true)
testDefaultRunOpts = bootstrap.NewRunOptions()
minCommitLogRetention = 10 * time.Minute
)

Expand Down
29 changes: 18 additions & 11 deletions src/dbnode/storage/bootstrap/bootstrapper/fs/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,15 +220,15 @@ func (s *fileSystemSource) enqueueReaders(
// Close the readers ch if and only if all readers are enqueued
defer close(readersCh)

indexIncrementalBootstrap := run == bootstrapIndexRunType && runOpts.Incremental()
if !indexIncrementalBootstrap {
shouldPersistIndexBootstrap := run == bootstrapIndexRunType && s.shouldPersist(runOpts)
if !shouldPersistIndexBootstrap {
// Normal run, open readers
s.enqueueReadersGroupedByBlockSize(ns, run, runOpts,
shardsTimeRanges, readerPool, readersCh)
return
}

// If the run is an index bootstrap using incremental bootstrapping
// If the run is an index bootstrap with the persist configuration enabled
// then we need to write out the metadata into FSTs that we store on disk,
// to avoid creating any one single huge FST at once we bucket the
// shards into number of buckets
Expand Down Expand Up @@ -594,18 +594,20 @@ func (s *fileSystemSource) loadShardReadersDataIntoShardResult(
}
}

incremental := runOpts.Incremental()
noneRemaining := remainingRanges.IsEmpty()
if run == bootstrapIndexRunType && incremental && noneRemaining {
err := s.incrementalBootstrapIndexSegment(ns, requestedRanges, runResult)
var (
shouldPersist = s.shouldPersist(runOpts)
noneRemaining = remainingRanges.IsEmpty()
)
if run == bootstrapIndexRunType && shouldPersist && noneRemaining {
err := s.persistBootstrapIndexSegment(ns, requestedRanges, runResult)
if err != nil {
iopts := s.opts.ResultOptions().InstrumentOptions()
log := instrument.EmitInvariantViolationAndGetLogger(iopts)
log.WithFields(
xlog.NewField("namespace", ns.ID().String()),
xlog.NewField("requestedRanges", requestedRanges.String()),
xlog.NewField("error", err.Error()),
).Error("incremental fs index bootstrap failed")
).Error("persist fs index bootstrap failed")
}
}

Expand Down Expand Up @@ -755,12 +757,12 @@ func (s *fileSystemSource) readNextEntryAndIndex(
return err
}

func (s *fileSystemSource) incrementalBootstrapIndexSegment(
func (s *fileSystemSource) persistBootstrapIndexSegment(
ns namespace.Metadata,
requestedRanges result.ShardTimeRanges,
runResult *runResult,
) error {
// If we're performing an index run in incremental mode
// If we're performing an index run with persistence enabled
// determine if we covered a full block exactly (which should
// occur since we always group readers by block size)
min, max := requestedRanges.MinMax()
Expand Down Expand Up @@ -839,7 +841,7 @@ func (s *fileSystemSource) incrementalBootstrapIndexSegment(
requireFulfilled.Subtract(fulfilled)
exactStartEnd := min.Equal(blockStart) && max.Equal(blockStart.Add(blockSize))
if !exactStartEnd || !requireFulfilled.IsEmpty() {
return fmt.Errorf("incremental fs index bootstrap invalid ranges to persist: expected=%v, actual=%v, fulfilled=%v",
return fmt.Errorf("persistent fs index bootstrap invalid ranges to persist: expected=%v, actual=%v, fulfilled=%v",
expectedRanges.String(), requestedRanges.String(), fulfilled.String())
}

Expand Down Expand Up @@ -1155,6 +1157,11 @@ func (s *fileSystemSource) bootstrapFromIndexPersistedBlocks(
return res, nil
}

func (s *fileSystemSource) shouldPersist(runOpts bootstrap.RunOptions) bool {
persistConfig := runOpts.PersistConfig()
return persistConfig.Enabled && persistConfig.FileSetType == persist.FileSetFlushType
}

type timeWindowReaders struct {
ranges result.ShardTimeRanges
readers map[shardID]shardReaders
Expand Down
19 changes: 10 additions & 9 deletions src/dbnode/storage/bootstrap/bootstrapper/fs/source_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,16 @@ import (
)

var (
testShard = uint32(0)
testNs1ID = ident.StringID("testNs")
testBlockSize = 2 * time.Hour
testIndexBlockSize = 4 * time.Hour
testStart = time.Now().Truncate(testBlockSize)
testFileMode = os.FileMode(0666)
testDirMode = os.ModeDir | os.FileMode(0755)
testWriterBufferSize = 10
testDefaultRunOpts = bootstrap.NewRunOptions().SetIncremental(false)
testShard = uint32(0)
testNs1ID = ident.StringID("testNs")
testBlockSize = 2 * time.Hour
testIndexBlockSize = 4 * time.Hour
testStart = time.Now().Truncate(testBlockSize)
testFileMode = os.FileMode(0666)
testDirMode = os.ModeDir | os.FileMode(0755)
testWriterBufferSize = 10
testDefaultRunOpts = bootstrap.NewRunOptions().
SetPersistConfig(bootstrap.PersistConfig{Enabled: false})
testDefaultResultOpts = result.NewOptions().SetSeriesCachePolicy(series.CacheAll)
testDefaultOpts = NewOptions().SetResultOptions(testDefaultResultOpts)
)
Expand Down
67 changes: 62 additions & 5 deletions src/dbnode/storage/bootstrap/bootstrapper/fs/source_index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/m3db/m3/src/dbnode/persist"
"github.com/m3db/m3/src/dbnode/persist/fs"
"github.com/m3db/m3/src/dbnode/storage/bootstrap"
"github.com/m3db/m3/src/dbnode/storage/bootstrap/result"
"github.com/m3db/m3/src/dbnode/storage/index/convert"
"github.com/m3db/m3/src/dbnode/storage/namespace"
Expand Down Expand Up @@ -270,7 +271,7 @@ func TestBootstrapIndex(t *testing.T) {
validateGoodTaggedSeries(t, times.start, indexResults)
}

func TestBootstrapIndexIncremental(t *testing.T) {
func TestBootstrapIndexWithPersist(t *testing.T) {
dir := createTempDir(t)
defer os.RemoveAll(dir)

Expand All @@ -284,7 +285,8 @@ func TestBootstrapIndexIncremental(t *testing.T) {
scope := tally.NewTestScope("", nil)
opts = opts.SetInstrumentOptions(opts.InstrumentOptions().SetMetricsScope(scope))

runOpts := testDefaultRunOpts.SetIncremental(true)
runOpts := testDefaultRunOpts.
SetPersistConfig(bootstrap.PersistConfig{Enabled: true})

src := newFileSystemSource(opts).(*fileSystemSource)
res, err := src.ReadIndex(testNsMetadata(t), times.shardTimeRanges,
Expand Down Expand Up @@ -332,7 +334,61 @@ func TestBootstrapIndexIncremental(t *testing.T) {
require.Equal(t, int64(1), counters["fs-bootstrapper.persist-index-blocks-write+"].Value())
}

func TestBootstrapIndexIncrementalPrefersPersistedIndexBlocks(t *testing.T) {
func TestBootstrapIndexIgnoresPersistConfigIfSnapshotType(t *testing.T) {
dir := createTempDir(t)
defer os.RemoveAll(dir)

times := newTestBootstrapIndexTimes(testTimesOptions{
numBlocks: 2,
})

writeTSDBGoodTaggedSeriesDataFiles(t, dir, testNs1ID, times.start)

opts := newTestOptionsWithPersistManager(t, dir)
scope := tally.NewTestScope("", nil)
opts = opts.SetInstrumentOptions(opts.InstrumentOptions().SetMetricsScope(scope))

runOpts := testDefaultRunOpts.
SetPersistConfig(bootstrap.PersistConfig{
Enabled: true,
FileSetType: persist.FileSetSnapshotType,
})

src := newFileSystemSource(opts).(*fileSystemSource)
res, err := src.ReadIndex(testNsMetadata(t), times.shardTimeRanges,
runOpts)
require.NoError(t, err)

// Check that not segments were written out
infoFiles := fs.ReadIndexInfoFiles(src.fsopts.FilePathPrefix(), testNs1ID,
src.fsopts.InfoReaderBufferSize())
require.Equal(t, 0, len(infoFiles))

indexResults := res.IndexResults()

// Check that both segments are mutable
block, ok := indexResults[xtime.ToUnixNano(times.start)]
require.True(t, ok)
require.Equal(t, 1, len(block.Segments()))
_, mutable := block.Segments()[0].(segment.MutableSegment)
require.True(t, mutable)

block, ok = indexResults[xtime.ToUnixNano(times.start.Add(testIndexBlockSize))]
require.True(t, ok)
require.Equal(t, 1, len(block.Segments()))
_, mutable = block.Segments()[0].(segment.MutableSegment)
require.True(t, mutable)

// Validate results
validateGoodTaggedSeries(t, times.start, indexResults)

// Validate that no index blocks were read from disk and that no files were written out
counters := scope.Snapshot().Counters()
require.Equal(t, int64(0), counters["fs-bootstrapper.persist-index-blocks-read+"].Value())
require.Equal(t, int64(0), counters["fs-bootstrapper.persist-index-blocks-write+"].Value())
}

func TestBootstrapIndexWithPersistPrefersPersistedIndexBlocks(t *testing.T) {
dir := createTempDir(t)
defer os.RemoveAll(dir)

Expand All @@ -353,7 +409,8 @@ func TestBootstrapIndexIncrementalPrefersPersistedIndexBlocks(t *testing.T) {
scope := tally.NewTestScope("", nil)
opts = opts.SetInstrumentOptions(opts.InstrumentOptions().SetMetricsScope(scope))

runOpts := testDefaultRunOpts.SetIncremental(true)
runOpts := testDefaultRunOpts.
SetPersistConfig(bootstrap.PersistConfig{Enabled: true})

src := newFileSystemSource(opts).(*fileSystemSource)
res, err := src.ReadIndex(testNsMetadata(t), times.shardTimeRanges,
Expand Down Expand Up @@ -381,7 +438,7 @@ func TestBootstrapIndexIncrementalPrefersPersistedIndexBlocks(t *testing.T) {
validateGoodTaggedSeries(t, times.start, indexResults)

// Validate that read the block and no blocks were written
// (ensure incremental didn't write it back out again)
// (ensure persist config didn't write it back out again)
counters := scope.Snapshot().Counters()
require.Equal(t, int64(1), counters["fs-bootstrapper.persist-index-blocks-read+"].Value())
require.Equal(t, int64(0), counters["fs-bootstrapper.persist-index-blocks-write+"].Value())
Expand Down
4 changes: 2 additions & 2 deletions src/dbnode/storage/bootstrap/bootstrapper/fs/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ type Options interface {
FilesystemOptions() fs.Options

// SetPersistManager sets the persistence manager used to flush blocks
// when performing an incremental bootstrap run.
// when performing a bootstrap run with persistence enabled.
SetPersistManager(value persist.Manager) Options

// PersistManager returns the persistence manager used to flush blocks
// when performing an incremental bootstrap run.
// when performing a bootstrap run with persistence enabled.
PersistManager() persist.Manager

// SetBoostrapDataNumProcessors sets the number of processors for CPU-bound
Expand Down
Loading

0 comments on commit 2c2f509

Please sign in to comment.