Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve commitlog bootstrapping reliability in the face of sudden failures #1065

Merged
merged 15 commits into from
Oct 11, 2018
110 changes: 97 additions & 13 deletions src/dbnode/persist/fs/commitlog/read_write_prop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ import (
"testing"
"time"

"github.com/m3db/m3/src/dbnode/persist/fs"
"github.com/m3db/m3/src/dbnode/ts"
xtest "github.com/m3db/m3/src/x/test"
"github.com/m3db/m3x/context"
"github.com/m3db/m3x/ident"
xtime "github.com/m3db/m3x/time"
Expand Down Expand Up @@ -122,12 +124,25 @@ func TestCommitLogReadWrite(t *testing.T) {
}

func TestCommitLogPropTest(t *testing.T) {
// Temporarily reduce size of buffered channels to increase chance of
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/change/chance/

// catching deadlock issues.
var (
oldDecoderInBufChanSize = decoderInBufChanSize
oldDecoderOutBufChanSize = decoderOutBufChanSize
)
defer func() {
decoderInBufChanSize = oldDecoderInBufChanSize
decoderOutBufChanSize = oldDecoderOutBufChanSize
}()
decoderInBufChanSize = 0
decoderOutBufChanSize = 0

basePath, err := ioutil.TempDir("", "commit-log-tests")
require.NoError(t, err)
defer os.RemoveAll(basePath)

parameters := gopter.DefaultTestParameters()
parameters.MinSuccessfulTests = 8
parameters.MinSuccessfulTests = 20
properties := gopter.NewProperties(parameters)

comms := clCommandFunctor(basePath, t)
Expand Down Expand Up @@ -178,6 +193,19 @@ var genOpenCommand = gen.Const(&commands.ProtoCommand{
if err != nil {
return err
}

if s.shouldCorrupt {
cLog := s.cLog.(*commitLog)
cLog.newCommitLogWriterFn = func(flushFn flushFn, opts Options) commitLogWriter {
wIface := newCommitLogWriter(flushFn, opts)
w := wIface.(*writer)
w.chunkWriter = &corruptingChunkWriter{
richardartoul marked this conversation as resolved.
Show resolved Hide resolved
chunkWriter: w.chunkWriter.(*chunkWriter),
corruptionProbability: s.corruptionProbability,
}
return w
}
}
return s.cLog.Open()
},
NextStateFunc: func(state commands.State) commands.State {
Expand Down Expand Up @@ -230,8 +258,8 @@ var genCloseCommand = gen.Const(&commands.ProtoCommand{
},
})

var genWriteBehindCommand = genWrite().
Map(func(w generatedWrite) commands.Command {
var genWriteBehindCommand = gen.SliceOfN(10, genWrite()).
Map(func(writes []generatedWrite) commands.Command {
return &commands.ProtoCommand{
Name: "WriteBehind",
PreConditionFunc: func(state commands.State) bool {
Expand All @@ -241,11 +269,19 @@ var genWriteBehindCommand = genWrite().
s := q.(*clState)
ctx := context.NewContext()
defer ctx.Close()
return s.cLog.Write(ctx, w.series, w.datapoint, w.unit, w.annotation)

for _, w := range writes {
err := s.cLog.Write(ctx, w.series, w.datapoint, w.unit, w.annotation)
if err != nil {
return err
}
}

return nil
},
NextStateFunc: func(state commands.State) commands.State {
s := state.(*clState)
s.pendingWrites = append(s.pendingWrites, w)
s.pendingWrites = append(s.pendingWrites, writes...)
return s
},
PostConditionFunc: func(state commands.State, result commands.Result) *gopter.PropResult {
Expand All @@ -267,27 +303,38 @@ type clState struct {
open bool
cLog CommitLog
pendingWrites []generatedWrite
// Whether the test should corrupt the commit log.
shouldCorrupt bool
// If the test should corrupt the commit log, what is the probability of
// corruption for any given write.
corruptionProbability float64
}

// generator for commit log write
func genState(basePath string, t *testing.T) gopter.Gen {
return gen.Identifier().
return gopter.CombineGens(gen.Identifier(), gen.Bool(), gen.Float64Range(0, 1)).
MapResult(func(r *gopter.GenResult) *gopter.GenResult {
iface, ok := r.Retrieve()
if !ok {
return gopter.NewEmptyResult(reflect.PtrTo(reflect.TypeOf(clState{})))
}
p, ok := iface.(string)
p, ok := iface.([]interface{})
if !ok {
return gopter.NewEmptyResult(reflect.PtrTo(reflect.TypeOf(clState{})))
}
initPath := path.Join(basePath, p)
result := newInitState(initPath, t)

var (
initPath = path.Join(basePath, p[0].(string))
shouldCorrupt = p[1].(bool)
corruptionProbability = p[2].(float64)
result = newInitState(t, initPath, shouldCorrupt, corruptionProbability)
)
return gopter.NewGenResult(result, gopter.NoShrinker)
})
}

func newInitState(dir string, t *testing.T) *clState {
func newInitState(
t *testing.T, dir string, shouldCorrupt bool, corruptionProbability float64) *clState {
opts := NewOptions().
SetStrategy(StrategyWriteBehind).
SetFlushInterval(defaultTestFlushInterval).
Expand All @@ -298,8 +345,10 @@ func newInitState(dir string, t *testing.T) *clState {
fsOpts := opts.FilesystemOptions().SetFilePathPrefix(dir)
opts = opts.SetFilesystemOptions(fsOpts)
return &clState{
basePath: dir,
opts: opts,
basePath: dir,
opts: opts,
shouldCorrupt: shouldCorrupt,
corruptionProbability: corruptionProbability,
}
}

Expand All @@ -312,10 +361,15 @@ func (s *clState) writesArePresent(writes ...generatedWrite) error {
}
iter, err := NewIterator(iterOpts)
if err != nil {
if s.shouldCorrupt {
return nil
}
return err
}

defer iter.Close()

count := 0
for iter.Next() {
series, datapoint, unit, annotation := iter.Current()
idString := series.ID.String()
Expand All @@ -330,9 +384,15 @@ func (s *clState) writesArePresent(writes ...generatedWrite) error {
unit: unit,
annotation: annotation,
}
count++
}

if s.shouldCorrupt {
return nil
}

if err := iter.Err(); err != nil {
return err
return fmt.Errorf("failed after reading %d datapoints: %v", count, err)
}

missingErr := fmt.Errorf("writesOnDisk: %+v, writes: %+v", writesOnDisk, writes)
Expand Down Expand Up @@ -433,3 +493,27 @@ func uniqueID(ns, s string) uint64 {
nsMap[s] = idx
return idx
}

// corruptingChunkWriter implements the chunkWriterIface interface and can corrupt all writes issued
// to it based on a configurable probability.
type corruptingChunkWriter struct {
richardartoul marked this conversation as resolved.
Show resolved Hide resolved
chunkWriter *chunkWriter
corruptionProbability float64
}

// reset points the wrapped chunkWriter at f. A non-nil f is first wrapped with
// xtest.NewCorruptingFD using c.corruptionProbability, so that writes going
// through the descriptor can be corrupted.
//
// A nil f is forwarded as-is rather than wrapped: the commit log writer calls
// reset(nil) on Close to mark the chunk writer as closed, and isOpen() is
// defined as "fd != nil". Wrapping nil would presumably store a non-nil FD
// (NOTE(review): confirm NewCorruptingFD never returns nil), leaving the
// writer reporting itself as open after it was closed.
func (c *corruptingChunkWriter) reset(f fs.FD) {
	if f == nil {
		c.chunkWriter.fd = nil
		return
	}
	c.chunkWriter.fd = xtest.NewCorruptingFD(
		f, c.corruptionProbability)
}

// Write hands p to the wrapped chunkWriter untouched and returns whatever it
// returns. This method does not modify p itself; the corrupting FD is
// installed on the wrapped writer by reset.
func (c *corruptingChunkWriter) Write(p []byte) (int, error) {
	n, err := c.chunkWriter.Write(p)
	return n, err
}

// close delegates to the wrapped chunkWriter's close and returns its error.
func (c *corruptingChunkWriter) close() error {
	err := c.chunkWriter.close()
	return err
}

// isOpen reports whether the wrapped chunkWriter considers itself open.
func (c *corruptingChunkWriter) isOpen() bool {
	open := c.chunkWriter.isOpen()
	return open
}
15 changes: 10 additions & 5 deletions src/dbnode/persist/fs/commitlog/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,12 @@ import (
xtime "github.com/m3db/m3x/time"
)

const defaultDecodeEntryBufSize = 1024
const decoderInBufChanSize = 1000
const decoderOutBufChanSize = 1000
var (
// var instead of const so we can modify them in tests.
defaultDecodeEntryBufSize = 1024
decoderInBufChanSize = 1000
decoderOutBufChanSize = 1000
)

var (
emptyLogInfo schema.LogInfo
Expand Down Expand Up @@ -250,7 +253,7 @@ func (r *reader) readLoop() {
}

r.decoderQueues[0] <- decoderArg{
bytes: data,
bytes: nil,
err: err,
}
continue
Expand Down Expand Up @@ -383,7 +386,9 @@ func (r *reader) decoderLoop(inBuf <-chan decoderArg, outBuf chan<- readResponse
}

func (r *reader) handleDecoderLoopIterationEnd(arg decoderArg, outBuf chan<- readResponse, response readResponse, err error) {
arg.bufPool <- arg.bytes
if arg.bytes != nil {
arg.bufPool <- arg.bytes
}
if outBuf != nil {
response.resultErr = err
outBuf <- response
Expand Down
31 changes: 25 additions & 6 deletions src/dbnode/persist/fs/commitlog/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,13 @@ type commitLogWriter interface {
Close() error
}

type chunkWriterIface interface {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

naming nit: not a fan of the Iface - how about calling the interface chunkWriter and the usual struct fsChunkWriter?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah sure, I didn't like this either

reset(f fs.FD)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: use uniform casing in method names

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some have to be public to implement other interfaces, other methods should probably just be private...but I guess it's a private interface so it doesn't matter

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

embed the interfaces rather than copying methods if you can help it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that worked actually, cheers

Write(p []byte) (int, error)
close() error
isOpen() bool
}

type flushFn func(err error)

type writer struct {
Expand All @@ -89,7 +96,7 @@ type writer struct {
nowFn clock.NowFn
start time.Time
duration time.Duration
chunkWriter *chunkWriter
chunkWriter chunkWriterIface
chunkReserveHeader []byte
buffer *bufio.Writer
sizeBuffer []byte
Expand Down Expand Up @@ -151,7 +158,7 @@ func (w *writer) Open(start time.Time, duration time.Duration) error {
return err
}

w.chunkWriter.fd = fd
w.chunkWriter.reset(fd)
w.buffer.Reset(w.chunkWriter)
if err := w.write(w.logEncoder.Bytes()); err != nil {
w.Close()
Expand All @@ -164,7 +171,7 @@ func (w *writer) Open(start time.Time, duration time.Duration) error {
}

func (w *writer) isOpen() bool {
return w.chunkWriter.fd != nil
return w.chunkWriter.isOpen()
}

func (w *writer) Write(
Expand Down Expand Up @@ -245,11 +252,11 @@ func (w *writer) Close() error {
if err := w.Flush(); err != nil {
return err
}
if err := w.chunkWriter.fd.Close(); err != nil {
if err := w.chunkWriter.close(); err != nil {
return err
}

w.chunkWriter.fd = nil
w.chunkWriter.reset(nil)
w.start = timeZero
w.duration = 0
w.seen.ClearAll()
Expand Down Expand Up @@ -278,7 +285,7 @@ func (w *writer) write(data []byte) error {
}

type chunkWriter struct {
fd *os.File
fd fs.FD
flushFn flushFn
buff []byte
fsync bool
Expand All @@ -292,6 +299,18 @@ func newChunkWriter(flushFn flushFn, fsync bool) *chunkWriter {
}
}

// reset replaces the file descriptor this chunkWriter operates on with f.
// Passing nil clears the descriptor, after which isOpen() reports false.
func (w *chunkWriter) reset(f fs.FD) {
w.fd = f
}

// close closes the current file descriptor and returns the error, if any,
// from doing so. It does not clear the descriptor; callers follow up with
// reset(nil) for that.
//
// When no descriptor is set (isOpen() is false) this is a no-op that returns
// nil. fd is an interface value, so invoking Close on it while nil would
// panic rather than return an error.
func (w *chunkWriter) close() error {
	if w.fd == nil {
		return nil
	}
	return w.fd.Close()
}

// isOpen reports whether a file descriptor is currently set on this
// chunkWriter.
func (w *chunkWriter) isOpen() bool {
	hasFD := w.fd != nil
	return hasFD
}

func (w *chunkWriter) Write(p []byte) (int, error) {
size := len(p)

Expand Down
10 changes: 10 additions & 0 deletions src/dbnode/persist/fs/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,3 +460,13 @@ type BlockRetrieverOptions interface {
// IdentifierPool returns the identifierPool
IdentifierPool() ident.Pool
}

// FD is the interface implemented by *os.File.
type FD interface {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: move this to m3/src/x/os and call it File instead of FD

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good call

// Write bytes to the file descriptor.
Write(p []byte) (int, error)
// Sync fsyncs the file descriptor ensuring all writes have made it to disk.
Sync() error
// Close the file descriptor.
Close() error
}
Loading