diff --git a/src/dbnode/persist/fs/commitlog/read_write_prop_test.go b/src/dbnode/persist/fs/commitlog/read_write_prop_test.go index 220811d7eb..5870995c1b 100644 --- a/src/dbnode/persist/fs/commitlog/read_write_prop_test.go +++ b/src/dbnode/persist/fs/commitlog/read_write_prop_test.go @@ -33,6 +33,8 @@ import ( "time" "github.com/m3db/m3/src/dbnode/ts" + "github.com/m3db/m3/src/x/os" + xtest "github.com/m3db/m3/src/x/test" "github.com/m3db/m3x/context" "github.com/m3db/m3x/ident" xtime "github.com/m3db/m3x/time" @@ -121,24 +123,49 @@ func TestCommitLogReadWrite(t *testing.T) { require.Equal(t, len(writes), i) } +// TestCommitLogPropTest property tests the commitlog by performing various +// operations (Open, Write, Close) in various orders, and then ensuring that +// all the data can be read back. In addition, in some runs it will arbitrarily +// (based on a randomly generated probability) corrupt any bytes written to disk by +// the commitlog to ensure that the commitlog reader is resilient to arbitrarily +// corrupted files and will not deadlock / panic. func TestCommitLogPropTest(t *testing.T) { + // Temporarily reduce size of buffered channels to increase chance of + // catching deadlock issues. 
+ var ( + oldDecoderInBufChanSize = decoderInBufChanSize + oldDecoderOutBufChanSize = decoderOutBufChanSize + ) + defer func() { + decoderInBufChanSize = oldDecoderInBufChanSize + decoderOutBufChanSize = oldDecoderOutBufChanSize + }() + decoderInBufChanSize = 0 + decoderOutBufChanSize = 0 + basePath, err := ioutil.TempDir("", "commit-log-tests") require.NoError(t, err) defer os.RemoveAll(basePath) - parameters := gopter.DefaultTestParameters() - parameters.MinSuccessfulTests = 8 + var ( + seed = time.Now().Unix() + parameters = gopter.DefaultTestParametersWithSeed(seed) + reporter = gopter.NewFormatedReporter(true, 160, os.Stdout) + ) + parameters.MinSuccessfulTests = 20 properties := gopter.NewProperties(parameters) - comms := clCommandFunctor(basePath, t) + comms := clCommandFunctor(t, basePath, seed) properties.Property("CommitLog System", commands.Prop(comms)) - properties.TestingRun(t) + if !properties.Run(reporter) { + t.Errorf("failed with initial seed: %d", seed) + } } // clCommandFunctor is a var which implements the command.Commands interface, // i.e. 
is responsible for creating/destroying the system under test and generating // commands and initial states (clState) -func clCommandFunctor(basePath string, t *testing.T) *commands.ProtoCommands { +func clCommandFunctor(t *testing.T, basePath string, seed int64) *commands.ProtoCommands { return &commands.ProtoCommands{ NewSystemUnderTestFunc: func(initialState commands.State) commands.SystemUnderTest { return initialState @@ -150,7 +177,7 @@ func clCommandFunctor(basePath string, t *testing.T) *commands.ProtoCommands { } os.RemoveAll(state.opts.FilesystemOptions().FilePathPrefix()) }, - InitialStateGen: genState(basePath, t), + InitialStateGen: genState(t, basePath, seed), InitialPreConditionFunc: func(state commands.State) bool { if state == nil { return false @@ -178,6 +205,20 @@ var genOpenCommand = gen.Const(&commands.ProtoCommand{ if err != nil { return err } + + if s.shouldCorrupt { + cLog := s.cLog.(*commitLog) + cLog.newCommitLogWriterFn = func(flushFn flushFn, opts Options) commitLogWriter { + wIface := newCommitLogWriter(flushFn, opts) + w := wIface.(*writer) + w.chunkWriter = newCorruptingChunkWriter( + w.chunkWriter.(*fsChunkWriter), + s.corruptionProbability, + s.seed, + ) + return w + } + } return s.cLog.Open() }, NextStateFunc: func(state commands.State) commands.State { @@ -230,8 +271,8 @@ var genCloseCommand = gen.Const(&commands.ProtoCommand{ }, }) -var genWriteBehindCommand = genWrite(). - Map(func(w generatedWrite) commands.Command { +var genWriteBehindCommand = gen.SliceOfN(10, genWrite()). + Map(func(writes []generatedWrite) commands.Command { return &commands.ProtoCommand{ Name: "WriteBehind", PreConditionFunc: func(state commands.State) bool { @@ -241,11 +282,19 @@ var genWriteBehindCommand = genWrite(). 
s := q.(*clState) ctx := context.NewContext() defer ctx.Close() - return s.cLog.Write(ctx, w.series, w.datapoint, w.unit, w.annotation) + + for _, w := range writes { + err := s.cLog.Write(ctx, w.series, w.datapoint, w.unit, w.annotation) + if err != nil { + return err + } + } + + return nil }, NextStateFunc: func(state commands.State) commands.State { s := state.(*clState) - s.pendingWrites = append(s.pendingWrites, w) + s.pendingWrites = append(s.pendingWrites, writes...) return s }, PostConditionFunc: func(state commands.State, result commands.Result) *gopter.PropResult { @@ -267,27 +316,46 @@ type clState struct { open bool cLog CommitLog pendingWrites []generatedWrite + // Whether the test should corrupt the commit log. + shouldCorrupt bool + // If the test should corrupt the commit log, what is the probability of + // corruption for any given write. + corruptionProbability float64 + // Seed for use with all RNGs so that runs are reproducible. + seed int64 } // generator for commit log write -func genState(basePath string, t *testing.T) gopter.Gen { - return gen.Identifier(). +func genState(t *testing.T, basePath string, seed int64) gopter.Gen { + return gopter.CombineGens(gen.Identifier(), gen.Bool(), gen.Float64Range(0, 1)). 
MapResult(func(r *gopter.GenResult) *gopter.GenResult { iface, ok := r.Retrieve() if !ok { return gopter.NewEmptyResult(reflect.PtrTo(reflect.TypeOf(clState{}))) } - p, ok := iface.(string) + p, ok := iface.([]interface{}) if !ok { return gopter.NewEmptyResult(reflect.PtrTo(reflect.TypeOf(clState{}))) } - initPath := path.Join(basePath, p) - result := newInitState(initPath, t) + + var ( + initPath = path.Join(basePath, p[0].(string)) + shouldCorrupt = p[1].(bool) + corruptionProbability = p[2].(float64) + result = newInitState( + t, initPath, shouldCorrupt, corruptionProbability, seed) + ) return gopter.NewGenResult(result, gopter.NoShrinker) }) } -func newInitState(dir string, t *testing.T) *clState { +func newInitState( + t *testing.T, + dir string, + shouldCorrupt bool, + corruptionProbability float64, + seed int64, +) *clState { opts := NewOptions(). SetStrategy(StrategyWriteBehind). SetFlushInterval(defaultTestFlushInterval). @@ -298,8 +366,11 @@ func newInitState(dir string, t *testing.T) *clState { fsOpts := opts.FilesystemOptions().SetFilePathPrefix(dir) opts = opts.SetFilesystemOptions(fsOpts) return &clState{ - basePath: dir, - opts: opts, + basePath: dir, + opts: opts, + shouldCorrupt: shouldCorrupt, + corruptionProbability: corruptionProbability, + seed: seed, } } @@ -312,10 +383,15 @@ func (s *clState) writesArePresent(writes ...generatedWrite) error { } iter, err := NewIterator(iterOpts) if err != nil { + if s.shouldCorrupt { + return nil + } return err } defer iter.Close() + + count := 0 for iter.Next() { series, datapoint, unit, annotation := iter.Current() idString := series.ID.String() @@ -330,9 +406,15 @@ func (s *clState) writesArePresent(writes ...generatedWrite) error { unit: unit, annotation: annotation, } + count++ } + + if s.shouldCorrupt { + return nil + } + if err := iter.Err(); err != nil { - return err + return fmt.Errorf("failed after reading %d datapoints: %v", count, err) } missingErr := fmt.Errorf("writesOnDisk: %+v, writes: %+v", 
writesOnDisk, writes) @@ -433,3 +515,40 @@ func uniqueID(ns, s string) uint64 { nsMap[s] = idx return idx } + +// corruptingChunkWriter implements the chunkWriter interface and can corrupt all writes issued +// to it based on a configurable probability. +type corruptingChunkWriter struct { + chunkWriter *fsChunkWriter + corruptionProbability float64 + seed int64 +} + +func newCorruptingChunkWriter( + chunkWriter *fsChunkWriter, + corruptionProbability float64, + seed int64, +) chunkWriter { + return &corruptingChunkWriter{ + chunkWriter: chunkWriter, + corruptionProbability: corruptionProbability, + seed: seed, + } +} + +func (c *corruptingChunkWriter) reset(f xos.File) { + c.chunkWriter.fd = xtest.NewCorruptingFile( + f, c.corruptionProbability, c.seed) +} + +func (c *corruptingChunkWriter) Write(p []byte) (int, error) { + return c.chunkWriter.Write(p) +} + +func (c *corruptingChunkWriter) close() error { + return c.chunkWriter.close() +} + +func (c *corruptingChunkWriter) isOpen() bool { + return c.chunkWriter.isOpen() +} diff --git a/src/dbnode/persist/fs/commitlog/reader.go b/src/dbnode/persist/fs/commitlog/reader.go index 5cb785c1be..2084520c8f 100644 --- a/src/dbnode/persist/fs/commitlog/reader.go +++ b/src/dbnode/persist/fs/commitlog/reader.go @@ -40,9 +40,12 @@ import ( xtime "github.com/m3db/m3x/time" ) -const defaultDecodeEntryBufSize = 1024 -const decoderInBufChanSize = 1000 -const decoderOutBufChanSize = 1000 +var ( + // var instead of const so we can modify them in tests. 
+ defaultDecodeEntryBufSize = 1024 + decoderInBufChanSize = 1000 + decoderOutBufChanSize = 1000 +) var ( emptyLogInfo schema.LogInfo @@ -250,7 +253,7 @@ func (r *reader) readLoop() { } r.decoderQueues[0] <- decoderArg{ - bytes: data, + bytes: nil, err: err, } continue @@ -383,7 +386,9 @@ func (r *reader) decoderLoop(inBuf <-chan decoderArg, outBuf chan<- readResponse } func (r *reader) handleDecoderLoopIterationEnd(arg decoderArg, outBuf chan<- readResponse, response readResponse, err error) { - arg.bufPool <- arg.bytes + if arg.bytes != nil { + arg.bufPool <- arg.bytes + } if outBuf != nil { response.resultErr = err outBuf <- response diff --git a/src/dbnode/persist/fs/commitlog/writer.go b/src/dbnode/persist/fs/commitlog/writer.go index 4e8dd73e2c..133f54267b 100644 --- a/src/dbnode/persist/fs/commitlog/writer.go +++ b/src/dbnode/persist/fs/commitlog/writer.go @@ -24,6 +24,7 @@ import ( "bufio" "encoding/binary" "errors" + "io" "os" "time" @@ -34,6 +35,7 @@ import ( "github.com/m3db/m3/src/dbnode/persist/fs/msgpack" "github.com/m3db/m3/src/dbnode/persist/schema" "github.com/m3db/m3/src/dbnode/ts" + "github.com/m3db/m3/src/x/os" "github.com/m3db/m3/src/x/serialize" "github.com/m3db/m3x/ident" xtime "github.com/m3db/m3x/time" @@ -80,6 +82,14 @@ type commitLogWriter interface { Close() error } +type chunkWriter interface { + io.Writer + + reset(f xos.File) + close() error + isOpen() bool +} + type flushFn func(err error) type writer struct { @@ -89,7 +99,7 @@ type writer struct { nowFn clock.NowFn start time.Time duration time.Duration - chunkWriter *chunkWriter + chunkWriter chunkWriter chunkReserveHeader []byte buffer *bufio.Writer sizeBuffer []byte @@ -151,7 +161,7 @@ func (w *writer) Open(start time.Time, duration time.Duration) error { return err } - w.chunkWriter.fd = fd + w.chunkWriter.reset(fd) w.buffer.Reset(w.chunkWriter) if err := w.write(w.logEncoder.Bytes()); err != nil { w.Close() @@ -164,7 +174,7 @@ func (w *writer) Open(start time.Time, duration 
time.Duration) error { } func (w *writer) isOpen() bool { - return w.chunkWriter.fd != nil + return w.chunkWriter.isOpen() } func (w *writer) Write( @@ -245,11 +255,11 @@ func (w *writer) Close() error { if err := w.Flush(); err != nil { return err } - if err := w.chunkWriter.fd.Close(); err != nil { + if err := w.chunkWriter.close(); err != nil { return err } - w.chunkWriter.fd = nil + w.chunkWriter.reset(nil) w.start = timeZero w.duration = 0 w.seen.ClearAll() @@ -277,22 +287,34 @@ func (w *writer) write(data []byte) error { return err } -type chunkWriter struct { - fd *os.File +type fsChunkWriter struct { + fd xos.File flushFn flushFn buff []byte fsync bool } -func newChunkWriter(flushFn flushFn, fsync bool) *chunkWriter { - return &chunkWriter{ +func newChunkWriter(flushFn flushFn, fsync bool) chunkWriter { + return &fsChunkWriter{ flushFn: flushFn, buff: make([]byte, chunkHeaderLen), fsync: fsync, } } -func (w *chunkWriter) Write(p []byte) (int, error) { +func (w *fsChunkWriter) reset(f xos.File) { + w.fd = f +} + +func (w *fsChunkWriter) close() error { + return w.fd.Close() +} + +func (w *fsChunkWriter) isOpen() bool { + return w.fd != nil +} + +func (w *fsChunkWriter) Write(p []byte) (int, error) { size := len(p) sizeStart, sizeEnd := diff --git a/src/x/os/file.go b/src/x/os/file.go new file mode 100644 index 0000000000..397781190a --- /dev/null +++ b/src/x/os/file.go @@ -0,0 +1,31 @@ +// Copyright (c) 2018 Uber Technologies, Inc. 
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package xos + +// File is the interface implemented by *os.File. +type File interface { + // Write bytes to the file descriptor. + Write(p []byte) (int, error) + // Sync fsyncs the file descriptor ensuring all writes have made it to disk. + Sync() error + // Close the file descriptor. + Close() error +} diff --git a/src/x/test/file.go b/src/x/test/file.go new file mode 100644 index 0000000000..34459ee8d7 --- /dev/null +++ b/src/x/test/file.go @@ -0,0 +1,79 @@ +// Copyright (c) 2018 Uber Technologies, Inc. 
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package xtest + +import ( + "math" + "math/rand" + + "github.com/m3db/m3/src/x/os" +) + +// corruptingFile implements the xos.File interface and can corrupt all writes issued +// to it based on a configurable probability. +type corruptingFile struct { + fd xos.File + corruptionProbability float64 + rng *rand.Rand +} + +// NewCorruptingFile creates a new corrupting file. +func NewCorruptingFile( + fd xos.File, + corruptionProbability float64, + seed int64, +) xos.File { + return &corruptingFile{ + fd: fd, + corruptionProbability: corruptionProbability, + rng: rand.New(rand.NewSource(seed)), + } +} + +// Write to the underlying file with a chance of corrupting it. 
+func (c *corruptingFile) Write(p []byte) (int, error) { + threshold := uint64(c.corruptionProbability * float64(math.MaxUint64)) + if c.rng.Uint64() <= threshold { + var ( + byteStart int + byteOffset int + ) + if len(p) > 1 { + byteStart = rand.Intn(len(p) - 1) + byteOffset = rand.Intn(len(p) - 1 - byteStart) + } + + if byteStart >= 0 && byteStart+byteOffset < len(p) { + copy(p[byteStart:byteStart+byteOffset], make([]byte, byteOffset)) + } + } + return c.fd.Write(p) +} + +// Sync fsyncs the underlying file. +func (c *corruptingFile) Sync() error { + return c.fd.Sync() +} + +// Close the underlying file. +func (c *corruptingFile) Close() error { + return c.fd.Close() +}