Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve commitlog bootstrapping reliability in the face of sudden failures #1065

Merged
merged 15 commits into from
Oct 11, 2018
145 changes: 126 additions & 19 deletions src/dbnode/persist/fs/commitlog/read_write_prop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (
"time"

"github.com/m3db/m3/src/dbnode/ts"
"github.com/m3db/m3/src/x/os"
xtest "github.com/m3db/m3/src/x/test"
"github.com/m3db/m3x/context"
"github.com/m3db/m3x/ident"
xtime "github.com/m3db/m3x/time"
Expand Down Expand Up @@ -121,24 +123,49 @@ func TestCommitLogReadWrite(t *testing.T) {
require.Equal(t, len(writes), i)
}

// TestCommitLogPropTest property tests the commitlog by performing various
// operations (Open, Write, Close) in various orders, and then ensuring that
// all the data can be read back. In addition, in some runs it will arbitrarily
// (based on a randomly generated probability) corrupt any bytes written to disk by
// the commitlog to ensure that the commitlog reader is resilient to arbitrarily
// corrupted files and will not deadlock / panic.
func TestCommitLogPropTest(t *testing.T) {
// Temporarily reduce size of buffered channels to increase chance of
// catching deadlock issues.
var (
oldDecoderInBufChanSize = decoderInBufChanSize
oldDecoderOutBufChanSize = decoderOutBufChanSize
)
defer func() {
decoderInBufChanSize = oldDecoderInBufChanSize
decoderOutBufChanSize = oldDecoderOutBufChanSize
}()
decoderInBufChanSize = 0
decoderOutBufChanSize = 0

basePath, err := ioutil.TempDir("", "commit-log-tests")
require.NoError(t, err)
defer os.RemoveAll(basePath)

parameters := gopter.DefaultTestParameters()
parameters.MinSuccessfulTests = 8
var (
seed = time.Now().Unix()
parameters = gopter.DefaultTestParametersWithSeed(seed)
reporter = gopter.NewFormatedReporter(true, 160, os.Stdout)
)
parameters.MinSuccessfulTests = 20
properties := gopter.NewProperties(parameters)

comms := clCommandFunctor(basePath, t)
comms := clCommandFunctor(t, basePath, seed)
properties.Property("CommitLog System", commands.Prop(comms))
properties.TestingRun(t)
if !properties.Run(reporter) {
t.Errorf("failed with initial seed: %d", seed)
}
}

// clCommandFunctor returns a *commands.ProtoCommands, which implements the
// commands.Commands interface, i.e. is responsible for creating/destroying the
// system under test and generating commands and initial states (clState).
func clCommandFunctor(basePath string, t *testing.T) *commands.ProtoCommands {
func clCommandFunctor(t *testing.T, basePath string, seed int64) *commands.ProtoCommands {
return &commands.ProtoCommands{
NewSystemUnderTestFunc: func(initialState commands.State) commands.SystemUnderTest {
return initialState
Expand All @@ -150,7 +177,7 @@ func clCommandFunctor(basePath string, t *testing.T) *commands.ProtoCommands {
}
os.RemoveAll(state.opts.FilesystemOptions().FilePathPrefix())
},
InitialStateGen: genState(basePath, t),
InitialStateGen: genState(t, basePath, seed),
InitialPreConditionFunc: func(state commands.State) bool {
if state == nil {
return false
Expand Down Expand Up @@ -178,6 +205,20 @@ var genOpenCommand = gen.Const(&commands.ProtoCommand{
if err != nil {
return err
}

if s.shouldCorrupt {
cLog := s.cLog.(*commitLog)
cLog.newCommitLogWriterFn = func(flushFn flushFn, opts Options) commitLogWriter {
wIface := newCommitLogWriter(flushFn, opts)
w := wIface.(*writer)
w.chunkWriter = &corruptingChunkWriter{
richardartoul marked this conversation as resolved.
Show resolved Hide resolved
chunkWriter: w.chunkWriter.(*fsChunkWriter),
corruptionProbability: s.corruptionProbability,
seed: s.seed,
}
return w
}
}
return s.cLog.Open()
},
NextStateFunc: func(state commands.State) commands.State {
Expand Down Expand Up @@ -230,8 +271,8 @@ var genCloseCommand = gen.Const(&commands.ProtoCommand{
},
})

var genWriteBehindCommand = genWrite().
Map(func(w generatedWrite) commands.Command {
var genWriteBehindCommand = gen.SliceOfN(10, genWrite()).
Map(func(writes []generatedWrite) commands.Command {
return &commands.ProtoCommand{
Name: "WriteBehind",
PreConditionFunc: func(state commands.State) bool {
Expand All @@ -241,11 +282,19 @@ var genWriteBehindCommand = genWrite().
s := q.(*clState)
ctx := context.NewContext()
defer ctx.Close()
return s.cLog.Write(ctx, w.series, w.datapoint, w.unit, w.annotation)

for _, w := range writes {
err := s.cLog.Write(ctx, w.series, w.datapoint, w.unit, w.annotation)
if err != nil {
return err
}
}

return nil
},
NextStateFunc: func(state commands.State) commands.State {
s := state.(*clState)
s.pendingWrites = append(s.pendingWrites, w)
s.pendingWrites = append(s.pendingWrites, writes...)
return s
},
PostConditionFunc: func(state commands.State, result commands.Result) *gopter.PropResult {
Expand All @@ -267,27 +316,46 @@ type clState struct {
open bool
cLog CommitLog
pendingWrites []generatedWrite
// Whether the test should corrupt the commit log.
shouldCorrupt bool
// If the test should corrupt the commit log, what is the probability of
// corruption for any given write.
corruptionProbability float64
// Seed for use with all RNGs so that runs are reproducible.
seed int64
}

// genState returns a generator for the initial commit log test state (clState).
func genState(basePath string, t *testing.T) gopter.Gen {
return gen.Identifier().
func genState(t *testing.T, basePath string, seed int64) gopter.Gen {
return gopter.CombineGens(gen.Identifier(), gen.Bool(), gen.Float64Range(0, 1)).
MapResult(func(r *gopter.GenResult) *gopter.GenResult {
iface, ok := r.Retrieve()
if !ok {
return gopter.NewEmptyResult(reflect.PtrTo(reflect.TypeOf(clState{})))
}
p, ok := iface.(string)
p, ok := iface.([]interface{})
if !ok {
return gopter.NewEmptyResult(reflect.PtrTo(reflect.TypeOf(clState{})))
}
initPath := path.Join(basePath, p)
result := newInitState(initPath, t)

var (
initPath = path.Join(basePath, p[0].(string))
shouldCorrupt = p[1].(bool)
corruptionProbability = p[2].(float64)
result = newInitState(
t, initPath, shouldCorrupt, corruptionProbability, seed)
)
return gopter.NewGenResult(result, gopter.NoShrinker)
})
}

func newInitState(dir string, t *testing.T) *clState {
func newInitState(
t *testing.T,
dir string,
shouldCorrupt bool,
corruptionProbability float64,
seed int64,
) *clState {
opts := NewOptions().
SetStrategy(StrategyWriteBehind).
SetFlushInterval(defaultTestFlushInterval).
Expand All @@ -298,8 +366,11 @@ func newInitState(dir string, t *testing.T) *clState {
fsOpts := opts.FilesystemOptions().SetFilePathPrefix(dir)
opts = opts.SetFilesystemOptions(fsOpts)
return &clState{
basePath: dir,
opts: opts,
basePath: dir,
opts: opts,
shouldCorrupt: shouldCorrupt,
corruptionProbability: corruptionProbability,
seed: seed,
}
}

Expand All @@ -312,10 +383,15 @@ func (s *clState) writesArePresent(writes ...generatedWrite) error {
}
iter, err := NewIterator(iterOpts)
if err != nil {
if s.shouldCorrupt {
return nil
}
return err
}

defer iter.Close()

count := 0
for iter.Next() {
series, datapoint, unit, annotation := iter.Current()
idString := series.ID.String()
Expand All @@ -330,9 +406,15 @@ func (s *clState) writesArePresent(writes ...generatedWrite) error {
unit: unit,
annotation: annotation,
}
count++
}

if s.shouldCorrupt {
return nil
}

if err := iter.Err(); err != nil {
return err
return fmt.Errorf("failed after reading %d datapoints: %v", count, err)
}

missingErr := fmt.Errorf("writesOnDisk: %+v, writes: %+v", writesOnDisk, writes)
Expand Down Expand Up @@ -433,3 +515,28 @@ func uniqueID(ns, s string) uint64 {
nsMap[s] = idx
return idx
}

// corruptingChunkWriter implements the chunkWriter interface and can corrupt all writes issued
// to it based on a configurable probability.
type corruptingChunkWriter struct {
richardartoul marked this conversation as resolved.
Show resolved Hide resolved
chunkWriter *fsChunkWriter
corruptionProbability float64
seed int64
}

// reset wraps f in a corrupting file descriptor, built from the configured
// corruption probability and seed, and installs it as the fd of the wrapped
// fsChunkWriter.
func (c *corruptingChunkWriter) reset(f xos.File) {
	inner := c.chunkWriter
	inner.fd = xtest.NewCorruptingFD(f, c.corruptionProbability, c.seed)
}

// Write forwards p to the wrapped fsChunkWriter and returns its result
// unchanged.
func (c *corruptingChunkWriter) Write(p []byte) (int, error) {
	written, err := c.chunkWriter.Write(p)
	return written, err
}

// close closes the wrapped fsChunkWriter and returns whatever error it reports.
func (c *corruptingChunkWriter) close() error {
	err := c.chunkWriter.close()
	return err
}

// isOpen reports whether the wrapped fsChunkWriter considers itself open.
func (c *corruptingChunkWriter) isOpen() bool {
	open := c.chunkWriter.isOpen()
	return open
}
15 changes: 10 additions & 5 deletions src/dbnode/persist/fs/commitlog/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,12 @@ import (
xtime "github.com/m3db/m3x/time"
)

const defaultDecodeEntryBufSize = 1024
const decoderInBufChanSize = 1000
const decoderOutBufChanSize = 1000
var (
// var instead of const so we can modify them in tests.
defaultDecodeEntryBufSize = 1024
decoderInBufChanSize = 1000
decoderOutBufChanSize = 1000
)

var (
emptyLogInfo schema.LogInfo
Expand Down Expand Up @@ -250,7 +253,7 @@ func (r *reader) readLoop() {
}

r.decoderQueues[0] <- decoderArg{
bytes: data,
bytes: nil,
err: err,
}
continue
Expand Down Expand Up @@ -383,7 +386,9 @@ func (r *reader) decoderLoop(inBuf <-chan decoderArg, outBuf chan<- readResponse
}

func (r *reader) handleDecoderLoopIterationEnd(arg decoderArg, outBuf chan<- readResponse, response readResponse, err error) {
arg.bufPool <- arg.bytes
if arg.bytes != nil {
arg.bufPool <- arg.bytes
}
if outBuf != nil {
response.resultErr = err
outBuf <- response
Expand Down
Loading