Improve how M3DB handles data durability during topology changes #1183

Merged Dec 14, 2018 (58 commits)

Changes from 53 commits

Commits
6ef03a3
Add clarifying comment
Nov 15, 2018
ff2de89
Add durably persisted method to database and default to snapshot enab…
Nov 15, 2018
bb703b1
Refactor peers bootstrapper to not incrementally snapshot
Nov 15, 2018
cd52034
wip
Nov 15, 2018
98259a5
Remove debug prints
Nov 16, 2018
a78c997
wip
Nov 16, 2018
c84140c
Improve integration test
Nov 16, 2018
e211867
remove debug log
Nov 16, 2018
24774e8
Clean up peer bootstrapper more
Nov 16, 2018
53d2e27
Clean up server properly
Nov 16, 2018
eb08aea
Fix typo
Nov 16, 2018
e77a2f9
wip
Nov 16, 2018
0677a2a
Add unit tests for IsBootstrapped and IsBootstrappedAndDurable
Nov 16, 2018
fbfdd6f
Update flushmanager test with lastsuccessfulsnapshotstarttime
Nov 16, 2018
f9652fb
Update clustered database
Nov 16, 2018
a8b0a3d
Update test
Nov 16, 2018
e0110b9
remove print
Nov 16, 2018
e42468d
Fix comment
Nov 16, 2018
0684d5c
fix comment
Nov 16, 2018
a651bcf
fix comments
Nov 16, 2018
68f54c9
fix comment
Nov 16, 2018
ca1c3a7
fix lint issue
Nov 16, 2018
5d09229
WIP bullshit
Nov 16, 2018
71f4f81
Fix broken logic and improve integration test
Nov 16, 2018
79232da
Add comment
Nov 16, 2018
e124a5a
Refactor guard statements
Nov 19, 2018
d473e88
Improve tests and comments
Nov 19, 2018
7a6700d
remove unused struct field
Nov 19, 2018
4bf1283
set shardSetAssignedAt on instantiation
Nov 19, 2018
7692578
Update comment
Nov 19, 2018
a9d82cd
Update comment
Nov 19, 2018
bf8c623
Comment
Nov 19, 2018
211955b
Delete commented out code
Nov 19, 2018
35c645d
Delete invalid test
Nov 19, 2018
1a18570
Add test for snapshot minimum interval
Nov 19, 2018
9313876
fix comments
Nov 19, 2018
da65182
Make zero snapshotminimum interval work
Nov 19, 2018
053b706
fix broken unit tests
Nov 19, 2018
343d9a3
Change default min snapshot interval to 5 seconds
Nov 19, 2018
7ca3da6
Change default min snapshot interval to 10- seconds
Nov 19, 2018
176edb9
regen mocks
Nov 19, 2018
db44926
Fix merge conflict issue
Nov 19, 2018
e0b8541
Fix scripts to wait for shards to be marked available
Nov 19, 2018
8cbc5a7
Only make undurable if received new shards
Nov 19, 2018
ed99b59
Add unit test
Nov 19, 2018
b359a10
Simplify test
Nov 19, 2018
dffcbaa
fix comments
Nov 19, 2018
9f6b13a
Fix flaky test
Nov 19, 2018
e6167ba
fix broke ntest
Nov 19, 2018
b472abc
Fix broken test
Nov 20, 2018
4431196
Shrink defaultTickMinimumInterval to 5 seconds
Nov 20, 2018
fc42969
Add debug logs
Nov 20, 2018
48bfa8a
Improve debug logs
Nov 20, 2018
bd1eb3b
bump default tick min interval to 10 seconds
Nov 20, 2018
19918e7
Address feedback
Nov 21, 2018
ca02bfe
address nit feedback
Dec 13, 2018
bc798ea
Update comment
Dec 14, 2018
8cfaefb
fix comment
Dec 14, 2018
6 changes: 6 additions & 0 deletions scripts/development/m3_stack/start_m3.sh
@@ -2,6 +2,8 @@

set -xe

source $GOPATH/src/github.com/m3db/m3/scripts/docker-integration-tests/common.sh

DOCKER_ARGS="-d --renew-anon-volumes"
if [[ "$FORCE_BUILD" = true ]] ; then
DOCKER_ARGS="--build -d --renew-anon-volumes"
@@ -146,6 +148,10 @@ echo "Validating topology"
[ "$(curl -sSf localhost:7201/api/v1/placement | jq .placement.instances.m3db_seed.id)" == '"m3db_seed"' ]
echo "Done validating topology"

echo "Sleeping until shards are marked as available"
ATTEMPTS=10 TIMEOUT=1 retry_with_backoff \
'[ "$(curl -sSf 0.0.0.0:7201/api/v1/placement | grep -c INITIALIZING)" -eq 0 ]'

if [[ "$AGGREGATOR_PIPELINE" = true ]]; then
curl -vvvsSf -X POST localhost:7201/api/v1/services/m3aggregator/placement/init -d '{
"num_shards": 64,
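The same "wait until no shard is INITIALIZING" check is added to the docker integration test scripts below. For anyone driving these checks from Go rather than bash, here is a minimal sketch of the equivalent poll against the coordinator's placement endpoint; the URL and the INITIALIZING marker come straight from the scripts, while the function name, attempt count, and backoff are illustrative assumptions.

```go
package example

import (
	"fmt"
	"io"
	"net/http"
	"strings"
	"time"
)

// waitForShardsAvailable polls the coordinator placement endpoint until no
// shard is reported as INITIALIZING, mirroring the retry_with_backoff loop
// in the scripts above. Attempt count and backoff are illustrative.
func waitForShardsAvailable(placementURL string, attempts int, backoff time.Duration) error {
	for i := 0; i < attempts; i++ {
		resp, err := http.Get(placementURL)
		if err == nil {
			body, readErr := io.ReadAll(resp.Body)
			resp.Body.Close()
			if readErr == nil && !strings.Contains(string(body), "INITIALIZING") {
				return nil
			}
		}
		time.Sleep(backoff)
	}
	return fmt.Errorf("shards still INITIALIZING after %d attempts", attempts)
}

// Example: waitForShardsAvailable("http://localhost:7201/api/v1/placement", 10, time.Second)
```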
4 changes: 4 additions & 0 deletions scripts/docker-integration-tests/prometheus/test.sh
@@ -103,6 +103,10 @@ echo "Sleep until bootstrapped"
ATTEMPTS=6 TIMEOUT=2 retry_with_backoff \
'[ "$(curl -sSf 0.0.0.0:9002/health | jq .bootstrapped)" == true ]'

echo "Sleep until shards are marked as available"
ATTEMPTS=10 TIMEOUT=1 retry_with_backoff \
'[ "$(curl -sSf 0.0.0.0:7201/api/v1/placement | grep -c INITIALIZING)" -eq 0 ]'

echo "Start Prometheus containers"
docker-compose -f ${COMPOSE_FILE} up -d prometheus01

6 changes: 5 additions & 1 deletion scripts/docker-integration-tests/simple/test.sh
@@ -35,7 +35,7 @@ curl -vvvsSf -X POST 0.0.0.0:7201/api/v1/namespace -d '{
"flushEnabled": true,
"writesToCommitLog": true,
"cleanupEnabled": true,
"snapshotEnabled": false,
"snapshotEnabled": true,
"repairEnabled": false,
"retentionOptions": {
"retentionPeriodNanos": 172800000000000,
@@ -81,6 +81,10 @@ echo "Sleep until bootstrapped"
ATTEMPTS=6 TIMEOUT=2 retry_with_backoff \
'[ "$(curl -sSf 0.0.0.0:9002/health | jq .bootstrapped)" == true ]'

echo "Sleep until shards are marked as available"
ATTEMPTS=10 TIMEOUT=1 retry_with_backoff \
'[ "$(curl -sSf 0.0.0.0:7201/api/v1/placement | grep -c INITIALIZING)" -eq 0 ]'

echo "Write data"
curl -vvvsS -X POST 0.0.0.0:9003/writetagged -d '{
"namespace": "default",
10 changes: 8 additions & 2 deletions src/dbnode/client/errors.go
@@ -29,7 +29,7 @@ import (
xerrors "github.com/m3db/m3x/errors"
)

// IsInternalServerError determines if the error is an internal server error
// IsInternalServerError determines if the error is an internal server error.
func IsInternalServerError(err error) bool {
for err != nil {
if e, ok := err.(*rpc.Error); ok && tterrors.IsInternalError(e) {
@@ -40,7 +40,7 @@ func IsBadRequestError(err error) bool {
return false
}

// IsBadRequestError determines if the error is a bad request error
// IsBadRequestError determines if the error is a bad request error.
func IsBadRequestError(err error) bool {
for err != nil {
if e, ok := err.(*rpc.Error); ok && tterrors.IsBadRequestError(e) {
@@ -54,6 +54,12 @@ func IsBadRequestError(err error) bool {
return false
}

// IsConsistencyResultError determines if the error is a consistency result error.
func IsConsistencyResultError(err error) bool {
_, ok := err.(consistencyResultErr)
return ok
}

// NumResponded returns how many nodes responded for a given error
func NumResponded(err error) int {
for err != nil {
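The new IsConsistencyResultError helper is what the integration test further down uses to tolerate partially acknowledged writes while shards change hands. Below is a hedged sketch of how calling code might classify client errors with it; only the two Is*Error helpers come from this package, the wrapper function and retry policy are illustrative.

```go
package example

import (
	"github.com/m3db/m3/src/dbnode/client"
)

// classifyWriteError is a sketch of how a caller might treat errors returned
// by the M3DB client while a placement change is in flight: consistency
// result errors (not enough replicas acknowledged the write, e.g. because a
// node is joining and another is leaving) are retryable, bad request errors
// are not. The policy itself is illustrative, not part of this PR.
func classifyWriteError(err error) (retryable bool, fatal error) {
	switch {
	case err == nil:
		return false, nil
	case client.IsConsistencyResultError(err):
		return true, nil
	case client.IsBadRequestError(err):
		return false, err
	default:
		return false, err
	}
}
```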
111 changes: 92 additions & 19 deletions src/dbnode/integration/cluster_add_one_node_test.go
@@ -23,12 +23,14 @@
package integration

import (
"fmt"
"strconv"
"testing"
"time"

"github.com/m3db/m3/src/cluster/services"
"github.com/m3db/m3/src/cluster/shard"
"github.com/m3db/m3/src/dbnode/client"
"github.com/m3db/m3/src/dbnode/integration/fake"
"github.com/m3db/m3/src/dbnode/integration/generate"
"github.com/m3db/m3/src/dbnode/retention"
@@ -56,7 +58,7 @@ func testClusterAddOneNode(t *testing.T, verifyCommitlogCanBootstrapAfterNodeJoi
t.SkipNow()
}

// Test setups
// Test setups.
log := xlog.SimpleLogger

namesp, err := namespace.NewMetadata(testNamespaces[0],
@@ -68,7 +70,11 @@ SetBufferFuture(2*time.Minute)))
SetBufferFuture(2*time.Minute)))
require.NoError(t, err)
opts := newTestOptions(t).
SetNamespaces([]namespace.Metadata{namesp})
SetNamespaces([]namespace.Metadata{namesp}).
// Prevent snapshotting from happening too frequently to allow for the
// possibility of a snapshot occurring after the shard set is assigned,
// but not after the node finishes bootstrapping.
SetMinimumSnapshotInterval(5 * time.Second)

minShard := uint32(0)
maxShard := uint32(opts.NumShards()) - uint32(1)
@@ -120,7 +126,7 @@ func testClusterAddOneNode(t *testing.T, verifyCommitlogCanBootstrapAfterNodeJoi
setups, closeFn := newDefaultBootstrappableTestSetups(t, opts, setupOpts)
defer closeFn()

// Write test data for first node
// Write test data for first node.
topo, err := topoInit.Init()
require.NoError(t, err)
ids := []idShard{}
@@ -149,20 +155,23 @@ func testClusterAddOneNode(t *testing.T, verifyCommitlogCanBootstrapAfterNodeJoi
}

for _, id := range ids {
// Verify IDs will map to halves of the shard space
// Verify IDs will map to halves of the shard space.
require.Equal(t, id.shard, shardSet.Lookup(ident.StringID(id.str)))
}

now := setups[0].getNowFn()
blockSize := namesp.Options().RetentionOptions().BlockSize()
seriesMaps := generate.BlocksByStart([]generate.BlockConfig{
{IDs: []string{ids[0].str, ids[1].str}, NumPoints: 180, Start: now.Add(-blockSize)},
{IDs: []string{ids[0].str, ids[2].str}, NumPoints: 90, Start: now},
})
var (
now = setups[0].getNowFn()
blockStart = now
blockSize = namesp.Options().RetentionOptions().BlockSize()
seriesMaps = generate.BlocksByStart([]generate.BlockConfig{
{IDs: []string{ids[0].str, ids[1].str}, NumPoints: 180, Start: blockStart.Add(-blockSize)},
{IDs: []string{ids[0].str, ids[2].str}, NumPoints: 90, Start: blockStart},
})
)
err = writeTestDataToDisk(namesp, setups[0], seriesMaps)
require.NoError(t, err)

// Prepare verification of data on nodes
// Prepare verification of data on nodes.
expectedSeriesMaps := make([]map[xtime.UnixNano]generate.SeriesBlock, 2)
expectedSeriesIDs := make([]map[string]struct{}, 2)
for i := range expectedSeriesMaps {
@@ -194,30 +203,80 @@ func testClusterAddOneNode(t *testing.T, verifyCommitlogCanBootstrapAfterNodeJoi
require.Equal(t, 2, len(expectedSeriesIDs[0]))
require.Equal(t, 1, len(expectedSeriesIDs[1]))

// Start the first server with filesystem bootstrapper
// Start the first server with filesystem bootstrapper.
require.NoError(t, setups[0].startServer())

// Start the last server with peers and filesystem bootstrappers, no shards
// are assigned at first
// are assigned at first.
require.NoError(t, setups[1].startServer())
log.Debug("servers are now up")

// Stop the servers at test completion
// Stop the servers on test completion.
defer func() {
log.Debug("servers closing")
setups.parallel(func(s *testSetup) {
require.NoError(t, s.stopServer())
})
log.Debug("servers are now down")
}()

// Bootstrap the new shards
// Bootstrap the new shards.
log.Debug("resharding to initialize shards on second node")
svc.SetInstances(instances.add)
svcs.NotifyServiceUpdate("m3db")
waitUntilHasBootstrappedShardsExactly(setups[1].db, testutil.Uint32Range(midShard+1, maxShard))
go func() {
for {
time.Sleep(time.Second)
for _, setup := range setups {
now = now.Add(time.Second)
setup.setNowFn(now)
}
}
}()

// Generate some new data that will be written to the node while peer streaming is taking place
// to make sure that the data that is streamed in and the data that is received while streaming
// is going on are both handled correctly. In addition, this will ensure that we hold onto both
// sets of data durably after topology changes and that the node can be properly bootstrapped
// from just the filesystem and commitlog in a later portion of the test.
seriesToWriteDuringPeerStreaming := []string{
[Review thread]
Collaborator: Should we just add this as a separate test? Seems like we're repurposing this one and changing it quite substantially?
Contributor (author): There are two separate tests; they just call into this shared codepath. I think it's fine: if I separated them out completely they'd be almost complete copy-pasta of each other, and the existing test benefits from this additional check as well (even if you don't verify the commitlog behavior, if you're doing a node add you probably want to make sure the joining node keeps track of the data it receives from its peer as well as all the data it's receiving while actually joining).
Collaborator: I see, yeah, makes sense.

"series_after_bootstrap1",
"series_after_bootstrap2",
}
// Ensure that the new series we're going to write belong to the host that is peer
// streaming data.
for _, seriesName := range seriesToWriteDuringPeerStreaming {
shard := shardSet.Lookup(ident.StringID(seriesName))
require.True(t, shard > midShard,
fmt.Sprintf("series: %s does not shard to second host", seriesName))
}
seriesReceivedDuringPeerStreaming := generate.BlocksByStart([]generate.BlockConfig{
{IDs: seriesToWriteDuringPeerStreaming, NumPoints: 90, Start: blockStart},
})
// Merge the newly generated series into the expected series map.
for blockStart, series := range seriesReceivedDuringPeerStreaming {
expectedSeriesMaps[1][blockStart] = append(expectedSeriesMaps[1][blockStart], series...)
}

// Spin up a background goroutine to issue the writes to the node while its streaming data
// from its peer.
doneWritingWhilePeerStreaming := make(chan struct{})
go func() {
for _, testData := range seriesReceivedDuringPeerStreaming {
err := setups[1].writeBatch(namesp.ID(), testData)
// We expect consistency errors because we're only running with
// R.F = 2 and one node is leaving and one node is joining for
// each of the shards that is changing hands.
if !client.IsConsistencyResultError(err) {
[Review thread]
Collaborator: Hm, maybe it's better to make it just RF=3?
Contributor (author): I honestly just didn't do it because it would probably take a few hours to rewrite all the sharding logic and fix any little issues that crop up, and it doesn't really make the test any better. Can change it if you feel strongly.
Collaborator: Np, that's fine.

panic(err)
}
}
doneWritingWhilePeerStreaming <- struct{}{}
}()

waitUntilHasBootstrappedShardsExactly(setups[1].db, testutil.Uint32Range(midShard+1, maxShard))
<-doneWritingWhilePeerStreaming
log.Debug("waiting for shards to be marked initialized")

allMarkedAvailable := func(
fakePlacementService fake.M3ClusterPlacementService,
instanceID string,
Expand Down Expand Up @@ -265,9 +324,23 @@ func testClusterAddOneNode(t *testing.T, verifyCommitlogCanBootstrapAfterNodeJoi

if verifyCommitlogCanBootstrapAfterNodeJoin {
// Verify that the node that joined the cluster can immediately bootstrap
// the data it streamed from its peers from the commit log as soon as
// the bootstrapping process completes.
// the data it streamed from its peers from the commitlog / snapshots as
// soon as all the shards have been marked as available (i.e. as soon as
// the placement change is considered "complete").
//
// In addition, verify that any data that was received during the same block
// as the streamed data (i.e. while peer streaming) is also present and
// bootstrappable from the commitlog bootstrapper.

// Reset the topology initializer as the M3DB session will have closed it.
require.NoError(t, setups[1].stopServer())
topoOpts := topology.NewDynamicOptions().
SetConfigServiceClient(fake.NewM3ClusterClient(svcs, nil))
topoInit := topology.NewDynamicInitializer(topoOpts)
setups[1].topoInit = topoInit

// Start the server that performed peer streaming with only the filesystem and
// commitlog bootstrapper and make sure it has all the expected data.
startServerWithNewInspection(t, opts, setups[1])
verifySeriesMaps(t, setups[1], namesp.ID(), expectedSeriesMaps[1])
}
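Regarding the review thread above about splitting the test: the shared-codepath shape the author describes would look roughly like the sketch below, with two thin wrappers driving testClusterAddOneNode and only the second exercising the commitlog re-bootstrap. Only testClusterAddOneNode and its verifyCommitlogCanBootstrapAfterNodeJoin flag appear in this diff; the wrapper names are assumptions.

```go
package integration

import "testing"

// Two thin entry points over the shared helper; only the second verifies that
// the joined node can re-bootstrap from the filesystem and commitlog after
// the topology change completes. Wrapper names are illustrative.
func TestClusterAddOneNode(t *testing.T) {
	testClusterAddOneNode(t, false)
}

func TestClusterAddOneNodeCommitlogBootstrap(t *testing.T) {
	testClusterAddOneNode(t, true)
}
```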
6 changes: 5 additions & 1 deletion src/dbnode/integration/options.go
@@ -57,6 +57,9 @@ const (
// defaultTickMinimumInterval is the default minimum tick interval.
defaultTickMinimumInterval = 1 * time.Second

// defaultMinimimumSnapshotInterval is the default minimum snapshot interval.
defaultMinimimumSnapshotInterval = 1 * time.Second

// defaultUseTChannelClientForReading determines whether we use the tchannel client for reading by default.
defaultUseTChannelClientForReading = true

@@ -264,7 +267,7 @@ type testOptions interface {
// SetMinimumSnapshotInterval sets the minimum interval between snapshots.
SetMinimumSnapshotInterval(value time.Duration) testOptions

// MinimumSnapshotInterval returns the minimum interval between snapshots
// MinimumSnapshotInterval returns the minimum interval between snapshots.
MinimumSnapshotInterval() time.Duration
}

@@ -325,6 +328,7 @@ func newTestOptions(t *testing.T) testOptions {
writeConsistencyLevel: defaultWriteConsistencyLevel,
numShards: defaultNumShards,
maxWiredBlocks: defaultMaxWiredBlocks,
minimumSnapshotInterval: defaultMinimimumSnapshotInterval,
useTChannelClientForReading: defaultUseTChannelClientForReading,
useTChannelClientForWriting: defaultUseTChannelClientForWriting,
useTChannelClientForTruncation: defaultUseTChannelClientForTruncation,
4 changes: 2 additions & 2 deletions src/dbnode/integration/setup.go
@@ -447,12 +447,12 @@ func (ts *testSetup) startServer() error {

topo, err := ts.topoInit.Init()
if err != nil {
return err
return fmt.Errorf("error initializing topology: %v", err)
}

topoWatch, err := topo.Watch()
if err != nil {
return err
return fmt.Errorf("error watching topology: %v", err)
}

ts.db, err = cluster.NewDatabase(ts.hostID, topo, topoWatch, ts.storageOpts)
2 changes: 1 addition & 1 deletion src/dbnode/runtime/runtime_options.go
@@ -48,7 +48,7 @@ const (
defaultWriteNewSeriesLimitPerShardPerSecond = 0
defaultTickSeriesBatchSize = 512
defaultTickPerSeriesSleepDuration = 100 * time.Microsecond
defaultTickMinimumInterval = time.Minute
defaultTickMinimumInterval = 5 * time.Second
defaultMaxWiredBlocks = uint(1 << 18) // 262,144
)

25 changes: 16 additions & 9 deletions src/dbnode/storage/bootstrap.go
@@ -23,6 +23,7 @@ package storage
import (
"errors"
"sync"
"time"

"github.com/m3db/m3/src/dbnode/clock"
"github.com/m3db/m3/src/dbnode/storage/bootstrap"
@@ -58,15 +59,16 @@ var (
type bootstrapManager struct {
sync.RWMutex

database database
mediator databaseMediator
opts Options
log xlog.Logger
nowFn clock.NowFn
processProvider bootstrap.ProcessProvider
state BootstrapState
hasPending bool
status tally.Gauge
database database
mediator databaseMediator
opts Options
log xlog.Logger
nowFn clock.NowFn
processProvider bootstrap.ProcessProvider
state BootstrapState
hasPending bool
status tally.Gauge
lastBootstrapCompletionTime time.Time
}

func newBootstrapManager(
Expand All @@ -93,6 +95,10 @@ func (m *bootstrapManager) IsBootstrapped() bool {
return state == Bootstrapped
}

func (m *bootstrapManager) LastBootstrapCompletionTime() (time.Time, bool) {
return m.lastBootstrapCompletionTime, !m.lastBootstrapCompletionTime.IsZero()
}

func (m *bootstrapManager) Bootstrap() error {
m.Lock()
switch m.state {
@@ -148,6 +154,7 @@ func (m *bootstrapManager) Bootstrap() error {
// on its own course so that the load of ticking and flushing is more spread out
// across the cluster.

m.lastBootstrapCompletionTime = m.nowFn()
return multiErr.FinalError()
}

Expand Down