From 120fe60ce8255fdce9f97848d82d1dafaa099009 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Mon, 29 Apr 2019 11:59:55 -0400 Subject: [PATCH] Open new commitlogs async (#1576) --- docs/code_assets/README.md | 5 + docs/code_assets/commitlog/queue.monopic | Bin 0 -> 1625 bytes src/dbnode/persist/fs/commitlog/README.md | 74 +++++ src/dbnode/persist/fs/commitlog/commit_log.go | 275 +++++++++++++----- .../fs/commitlog/commit_log_conc_test.go | 2 +- .../persist/fs/commitlog/commit_log_test.go | 87 +++--- src/dbnode/persist/fs/commitlog/files.go | 2 +- .../fs/commitlog/read_write_prop_test.go | 22 +- src/dbnode/persist/fs/files.go | 4 +- src/dbnode/persist/fs/files_test.go | 4 +- 10 files changed, 342 insertions(+), 133 deletions(-) create mode 100644 docs/code_assets/README.md create mode 100644 docs/code_assets/commitlog/queue.monopic create mode 100644 src/dbnode/persist/fs/commitlog/README.md diff --git a/docs/code_assets/README.md b/docs/code_assets/README.md new file mode 100644 index 0000000000..cf8eb44f2d --- /dev/null +++ b/docs/code_assets/README.md @@ -0,0 +1,5 @@ +# Code Assets + +## Overview + +This directory contains assets for READMEs and documentation in the source code. Some assets are kept separate from the source code in order to avoid bloating it with large / binary files. \ No newline at end of file diff --git a/docs/code_assets/commitlog/queue.monopic b/docs/code_assets/commitlog/queue.monopic new file mode 100644 index 0000000000000000000000000000000000000000..d63fc881b76f8b96ae523033b0cd6b00a4c186be GIT binary patch literal 1625 zcmV-f2B!J{O;1iwP)S1pABzY8000000u$|8&5ol+5PlVlQzEev{R{ZGd&ni1D2E(& zMiz#)O$NgR@a#@jBjsuGV0n^sH#7zeGsbp#HS%6OZKkQNs{ZQhs%n3;vgMy5EDLM? zO$|S|t{Rokk75x=)*N}Z6=mTf7b|P7Cm_qV%J{2D^ed(KS}wvYEk#;d^UwEo_2e;& zQzi5B{mYA8$z^K#V^+jvoTV*!+_#37h~m5uQlw*B8xcecrIA#}ZmH(fqOp>k#HmdO@+DtDtL6m(t5Sj*Cu@F&Q zww~2)SjdTKq@I`gPP7zeTAKV)*@A0bit;&;Uy0NpkJqtW+H!11c{x~X#nf8z;jeP? zjLGM1Cn_IT{-svq==<7dX;ubmqF(FJ#ltCil|+EDF=$|Vn;o$vD>8B0YY(Kzow*S-RFchzORYA%^PNY}e0$QRWFZTl)$ zs0-3?ljSXkVil1cIf~PjoF4QKcfJR9WrO_hFPU^(so^%C{gF$Me+TJwlG0qxaqAjrEx=j_{I(OCUyQ0P#k(-`2yo$xf$U028a(tR~u0p@qe|}v z*D8v+>8EkiPvfSa=9+$*dn5fc_mcfI7pR(Y;c80`S1RxmYbsxsGLA_HU4GK3;L?#( zfqCbi3VJVw;PncW56G|`1$;0!8YZw|R&1Xi-HGiOsabY7z3EYCeWPv8!8+$Mmu7}F z3|G{D4VxG;L556_xo$)Hx~-RH_Ku_e;nQru=owp+@~0qu3W^yT$WZT11#m-|IvvZ@ zx!G54HkMPsl)bmgl$4!w_>`TqDJi>mLjXdfrJf0AYLG+;WX;!IWm3(@u2A#QNtgBL zeP7l)U{`MV$CcY#?l`Wv!0h14?V8mEHm5!|G=WV3V{;T>dK=XR=9j#3BNw=%z|IAP zAO==6U<@W2xnRwDg1|&$3MZPwl&jPMy8Hv`-(wG~sY%CfadZxKUU!ngU6U)xLGmcG z2T>&=6|WZJuSL4sE=8^#jCAV6^#rQLS2=0P6)?p>M?p!bx&Nl3vnQf zF>;cn>|JMzj9oZe{FH6CahYUm)q4FsPS=U}Ak#q^DI%)UrF_<;P`B8aKEgr&{qyh9 zsU&x-PN{g+M3R|46Elq_W;#vGw3?WI8;K-G7fd7>8f%U^=N?MbJB`o@P$t4hEY+3B zS(f_mnwbfOV&>UzJFyervwA1~xz+o*q7V3ADwZ0n%y34HC$Yv^U;OtTX2Y? X`la%>qS9~HlYa6)`^K+sJ3IgYVALR7 literal 0 HcmV?d00001 diff --git a/src/dbnode/persist/fs/commitlog/README.md b/src/dbnode/persist/fs/commitlog/README.md new file mode 100644 index 0000000000..d2bfc41697 --- /dev/null +++ b/src/dbnode/persist/fs/commitlog/README.md @@ -0,0 +1,74 @@ +# Overview + +The commitlog package contains all the code for reading and writing commitlog files. + +# Implementation + +## Writer + +The commitlog writer consists of two main components: + +1. The `CommitLog` itself which contains all of the public APIs as well as the logic for managing concurrency, rotating files, and inspecting the status of the commitlog (via the `ActiveLogs` and `QueueLength` APIs). +2. The `CommitLogWriter` which contains all of the logic for opening/closing files and writing bytes to disk. + +### Commitlog + +At a high-level, the `CommitLog` handles writes by inserting them into a queue (buffered channel) in batches and then dequeing the batches in a single-threaded manner and writing all of the individual writes for a batch into the commitlog file one at a time. + +#### Synchronization + +The primary synchronization that happens in the commitlog is via the queue (buffered channel). Many goroutines will insert pending writes into the queue concurrently via the `Write()` and `WriteBatch()` APIs, however, only a single goroutine will pull items off the queue and write them to disk. + +``` +┌──────────────────────────┐ +│Goroutine 1: WriteBatch() ├─────┐ ┌───────────────────────────────────┐ ┌───────────────────────────────────┐ +└──────────────────────────┘ │ │ │ │ │ + │ │ │ │ │ + │ │ │ │ │ +┌──────────────────────────┐ │ ┌─────────────────────────────────────┐ │ Commitlog │ │ Writer │ +│Goroutine 2: WriteBatch() │─────┼──────▶│ Commitlog Queue ├──────────▶│ │─────────▶│ │ +└──────────────────────────┘ │ └─────────────────────────────────────┘ │ Single-threaded Writer Goroutine │ │ Write to disk │ + │ │ │ │ │ + │ │ │ │ │ +┌──────────────────────────┐ │ │ │ │ │ +│Goroutine 3: WriteBatch() ├─────┘ └───────────────────────────────────┘ └───────────────────────────────────┘ +└──────────────────────────┘ +``` +*The monopic file that was used to generate the diagram above is located at `docs/code_assets/commitlog/queue.monopic`.* + +Since there is only one goroutine pulling items off of the queue, any state that it alone manages can remain unsynchronized since no other goroutines will interact with it. + +In addition to the queue, the commitlog has two other forms of synchronization: + +1. The `closedState` lock which is an RWLock. An RLock is held for the duration of any operation during which the commitlog must remain open. +2. The `flushState` lock. The scope of this lock is very narrow as its only used to protect access to the `lastFlushAt` field. + +#### Rotating Files + +Rotating commitlog files is initiated by the `RotateLogs()` API so that callers can control when this occurs. The `CommitLog` itself will never rotate files on its own without the `RotateLogs()` API being called. + +The commitlog files are not rotated immediately when the `RotateLogs()` method is called because that would require a lot of complicated and expensive synchronization with the `CommitLogWriter` goroutine. Instead, a `rotateLogsEventType` is pushed into the queue and when the single-threaded writer goroutine pulls this event off of the channel it will rotate the commitlog files (since it has exclusive access to them) and then invoke a callback function which notifies the `RotateLogs()` method call (which has been blocked this whole time) to complete and return success to the caller. + +While the `CommitLog` only writes to a single file at once, it maintains two open writers at all times so that they can be "hot-swapped" when the commitlog files need to be rotated. This allows the single-threaded writer goroutine to continue uninterrupted by syscalls and I/O during rotation events which in turn prevents the queue from backing up. Otherwise, rotation events could block the writer for so long (while it waited for a new file to be created) that it caused the queue to back up significantly. + +When a rotation event occurs, instead of waiting for a new file to be opened, the `CommitLog` writer goroutine will swap the primary and secondary `CommitLogWriter` such that the secondary `CommitLogWriter` (which has an empty file) becomes the primary and vice versa. This allows the `CommitLog` writer goroutine to continue writing uninterrupted. + +In the meantime, a goroutine is started in the background that is responsible for resetting the now secondary (formerly primary) `CommitLogWriter` by closing it (which will flush any pending / buffered writes to disk) and re-opening it (which will create a new empty commitlog file in anticipation of the next rotation event). + +Finally, the next time the `CommitLog` attempts to rotate its commitlogs it will need to use the associated `sync.WaitGroup` to ensure that the previously spawned background goroutine has completed resetting the secondary `CommitLogWriter` before it attempts a new hot-swap. + +### Handling Errors + +The current implementation will panic if any I/O errors are ever encountered while writing bytes to disk or opening/closing files. In the future a "commitlog failure policy" similar to [Cassandra's "stop"](https://github.com/apache/cassandra/blob/6dfc1e7eeba539774784dfd650d3e1de6785c938/conf/cassandra.yaml#L232) may be introduced. + +# Testing + +The commitlog package is tested via: + +1. Standard unit tests +2. Property tests +3. Concurrency tests + +# File Format + +See `/docs/m3db/architecture/commitlogs.md`. \ No newline at end of file diff --git a/src/dbnode/persist/fs/commitlog/commit_log.go b/src/dbnode/persist/fs/commitlog/commit_log.go index 74069be016..3df38972ce 100644 --- a/src/dbnode/persist/fs/commitlog/commit_log.go +++ b/src/dbnode/persist/fs/commitlog/commit_log.go @@ -29,8 +29,10 @@ import ( "github.com/m3db/m3/src/dbnode/clock" "github.com/m3db/m3/src/dbnode/persist" + "github.com/m3db/m3/src/dbnode/persist/fs" "github.com/m3db/m3/src/dbnode/ts" "github.com/m3db/m3/src/x/context" + xerrors "github.com/m3db/m3/src/x/errors" xtime "github.com/m3db/m3/src/x/time" "github.com/uber-go/tally" @@ -43,6 +45,8 @@ var ( ErrCommitLogQueueFull = errors.New("commit log queue is full") errCommitLogClosed = errors.New("commit log is closed") + + zeroFile = persist.CommitLogFile{} ) type newCommitLogWriterFn func( @@ -86,9 +90,8 @@ type commitLog struct { // being accessed. closeErr chan error - writes chan commitLogWrite - pendingFlushFns []callbackFn - maxQueueSize int64 + writes chan commitLogWrite + maxQueueSize int64 opts Options nowFn clock.NowFn @@ -124,8 +127,43 @@ func (f *flushState) getLastFlushAt() time.Time { } type writerState struct { - writer commitLogWriter - activeFile *persist.CommitLogFile + // See "Rotating Files" section of README.md for an explanation of how the + // primary and secondary fields are used during commitlog rotation. + primary asyncResettableWriter + secondary asyncResettableWriter + // Convenience slice that is used to simplify code whenever an operation needs + // to be performed on both primary and secondary writers. Note that order is not + // maintained (I.E primary may be index 1 and secondary index 0) so the slice can + // only be used when the order of operations does not matter. + writers []commitLogWriter + activeFiles persist.CommitLogFiles +} + +type asyncResettableWriter struct { + // Backreference to commitlog for the purpose of calling onFlush(). + commitlog *commitLog + // The commitlog writer is single-threaded, so normally the commitLogWriter can be + // accessed without synchronization. However, since the secondaryWriter is reset by + // a background goroutine, a waitgroup is used to ensure that the previous background + // reset has completed before attempting to access the secondary writers state and/or + // begin a new hot-swap. + *sync.WaitGroup + writer commitLogWriter + // Each writer maintains its own slice of pending flushFns because each writer will get + // flushed independently. This is important for maintaining correctness in code paths + // that care about durability, particularly during commitlog rotations. + // + // For example, imagine a call to WriteWait() occurs and the pending write is buffered + // in commitlog 1, but not yet flushed. Subsequently, a call to RotateLogs() occurs causing + // commitlog 1 to be (asynchronously) reset and commitlog 2 to become the new primary. Once + // the asynchronous Close and flush of commitlog 1 completes, only pending flushFns associated + // with commitlog 1 should be called as the writer associated with commitlog 2 may not have been + // flushed at all yet. + pendingFlushFns []callbackFn +} + +func (w *asyncResettableWriter) onFlush(err error) { + w.commitlog.onFlush(w, err) } type closedState struct { @@ -165,7 +203,7 @@ type callbackResult struct { } type activeLogsCallbackResult struct { - file *persist.CommitLogFile + files persist.CommitLogFiles } type rotateLogsResult struct { @@ -221,8 +259,16 @@ func NewCommitLog(opts Options) (CommitLog, error) { log: iopts.Logger(), newCommitLogWriterFn: newCommitLogWriter, writes: make(chan commitLogWrite, opts.BacklogQueueChannelSize()), - maxQueueSize: int64(opts.BacklogQueueSize()), - closeErr: make(chan error), + writerState: writerState{ + primary: asyncResettableWriter{ + WaitGroup: &sync.WaitGroup{}, + }, + secondary: asyncResettableWriter{ + WaitGroup: &sync.WaitGroup{}, + }, + }, + maxQueueSize: int64(opts.BacklogQueueSize()), + closeErr: make(chan error), metrics: commitLogMetrics{ numWritesInQueue: scope.Gauge("writes.queued"), queueLength: scope.Gauge("writes.queue-length"), @@ -235,6 +281,9 @@ func NewCommitLog(opts Options) (CommitLog, error) { flushDone: scope.Counter("writes.flush-done"), }, } + // Setup backreferences for onFlush(). + commitLog.writerState.primary.commitlog = commitLog + commitLog.writerState.secondary.commitlog = commitLog switch opts.Strategy() { case StrategyWriteWait: @@ -251,20 +300,18 @@ func (l *commitLog) Open() error { defer l.closedState.Unlock() // Open the buffered commit log writer - if _, err := l.openWriter(); err != nil { + if _, _, err := l.openWriters(); err != nil { return err } // Sync the info header to ensure we can write to disk and make sure that we can at least // read the info about the commitlog file later. - if err := l.writerState.writer.Flush(true); err != nil { - return err + for _, writer := range l.writerState.writers { + if err := writer.Flush(true); err != nil { + return err + } } - // NB(r): In the future we can introduce a commit log failure policy - // similar to Cassandra's "stop", for example see: - // https://github.com/apache/cassandra/blob/6dfc1e7eeba539774784dfd650d3e1de6785c938/conf/cassandra.yaml#L232 - // Right now it is a large amount of coordination to implement something similar. l.commitLogFailFn = func(err error) { l.log.Fatal("fatal commit log error", zap.Error(err)) } @@ -306,9 +353,7 @@ func (l *commitLog) ActiveLogs() (persist.CommitLogFiles, error) { return } - if result.file != nil { - files = append(files, *result.file) - } + files = result.files }, } @@ -326,7 +371,7 @@ func (l *commitLog) RotateLogs() (persist.CommitLogFile, error) { defer l.closedState.RUnlock() if l.closedState.closed { - return persist.CommitLogFile{}, errCommitLogClosed + return zeroFile, errCommitLogClosed } var ( @@ -349,7 +394,7 @@ func (l *commitLog) RotateLogs() (persist.CommitLogFile, error) { wg.Wait() if err != nil { - return persist.CommitLogFile{}, err + return zeroFile, err } return file, nil @@ -409,7 +454,7 @@ func (l *commitLog) write() { for write := range l.writes { if write.eventType == flushEventType { - l.writerState.writer.Flush(false) + l.writerState.primary.writer.Flush(false) continue } @@ -418,7 +463,7 @@ func (l *commitLog) write() { eventType: write.eventType, err: nil, activeLogs: activeLogsCallbackResult{ - file: l.writerState.activeFile, + files: l.writerState.activeFiles, }, }) continue @@ -426,12 +471,13 @@ func (l *commitLog) write() { // For writes requiring acks add to pending acks if write.eventType == writeEventType && write.callbackFn != nil { - l.pendingFlushFns = append(l.pendingFlushFns, write.callbackFn) + l.writerState.primary.pendingFlushFns = append( + l.writerState.primary.pendingFlushFns, write.callbackFn) } isRotateLogsEvent := write.eventType == rotateLogsEventType if isRotateLogsEvent { - file, err := l.openWriter() + primaryFile, _, err := l.openWriters() if err != nil { l.metrics.errors.Inc(1) l.metrics.openErrors.Inc(1) @@ -442,19 +488,15 @@ func (l *commitLog) write() { } } - if isRotateLogsEvent { - write.callbackFn(callbackResult{ - eventType: write.eventType, - err: err, - rotateLogs: rotateLogsResult{ - file: file, - }, - }) - } + write.callbackFn(callbackResult{ + eventType: write.eventType, + err: err, + rotateLogs: rotateLogsResult{ + file: primaryFile, + }, + }) - if err != nil || isRotateLogsEvent { - continue - } + continue } var ( @@ -488,7 +530,7 @@ func (l *commitLog) write() { } write := writeBatch.Write - err := l.writerState.writer.Write(write.Series, + err := l.writerState.primary.writer.Write(write.Series, write.Datapoint, write.Unit, write.Annotation) if err != nil { l.handleWriteErr(err) @@ -506,13 +548,28 @@ func (l *commitLog) write() { l.metrics.success.Inc(numWritesSuccess) } - writer := l.writerState.writer - l.writerState.writer = nil + // Ensure that there is no active background goroutine in the middle of reseting + // the secondary writer / modifying its state. + l.waitForSecondaryWriterAsyncResetComplete() - l.closeErr <- writer.Close() + var multiErr xerrors.MultiError + for i, writer := range l.writerState.writers { + if writer == nil { + // Can be nil in the case where the background goroutine spawned in openWriters + // encountered an error trying to re-open it. + continue + } + + multiErr = multiErr.Add(writer.Close()) + l.writerState.writers[i] = nil + } + l.writerState.primary.writer = nil + l.writerState.secondary.writer = nil + + l.closeErr <- multiErr.FinalError() } -func (l *commitLog) onFlush(err error) { +func (l *commitLog) onFlush(writer *asyncResettableWriter, err error) { l.flushState.setLastFlushAt(l.nowFn()) if err != nil { @@ -525,50 +582,134 @@ func (l *commitLog) onFlush(err error) { } } - // onFlush only ever called by "write()" and "openWriter" or - // before "write()" begins on "Open()" and there are no other - // accessors of "pendingFlushFns" so it is safe to read and mutate - // without a lock here - if len(l.pendingFlushFns) == 0 { + // onFlush will never be called concurrently. The flushFn for the primaryWriter + // will only ever be called synchronously by the single-threaded writer goroutine + // and the flushFn for the secondaryWriter will only be called by the asynchronous + // goroutine (created by the single-threaded writer) when it calls Close() on the + // secondary (previously primary due to a hot-swap) writer during the reset. + // + // Note that both the primary and secondar's flushFn may be called during calls to + // Open() on the commitlog, but this takes place before the single-threaded writer + // is spawned which precludes it from occurring concurrently with either of the + // scenarios described above. + if len(writer.pendingFlushFns) == 0 { l.metrics.flushDone.Inc(1) return } - for i := range l.pendingFlushFns { - l.pendingFlushFns[i](callbackResult{ + for i := range writer.pendingFlushFns { + writer.pendingFlushFns[i](callbackResult{ eventType: flushEventType, err: err, }) - l.pendingFlushFns[i] = nil + writer.pendingFlushFns[i] = nil } - l.pendingFlushFns = l.pendingFlushFns[:0] + writer.pendingFlushFns = writer.pendingFlushFns[:0] l.metrics.flushDone.Inc(1) } // writerState lock must be held for the duration of this function call. -func (l *commitLog) openWriter() (persist.CommitLogFile, error) { - if l.writerState.writer != nil { - if err := l.writerState.writer.Close(); err != nil { - l.metrics.closeErrors.Inc(1) - l.log.Error("failed to close commit log", zap.Error(err)) - - // If we failed to close then create a new commit log writer - l.writerState.writer = nil +func (l *commitLog) openWriters() (persist.CommitLogFile, persist.CommitLogFile, error) { + // Ensure that the previous asynchronous reset of the secondary writer (if any) + // has completed before attempting to start a new one and/or modify the writerState + // in any way. + l.waitForSecondaryWriterAsyncResetComplete() + + if l.writerState.primary.writer == nil || l.writerState.secondary.writer == nil { + if l.writerState.primary.writer != nil { + // Make sure to close and flush any remaining data before creating a new writer if the + // primary (which contains data) is not nil. + if err := l.writerState.primary.writer.Close(); err != nil { + return zeroFile, zeroFile, err + } } - } - if l.writerState.writer == nil { - l.writerState.writer = l.newCommitLogWriterFn(l.onFlush, l.opts) - } + if l.writerState.secondary.writer != nil { + // Ignore errors because the secondary file doesn't have any data. + l.writerState.secondary.writer.Close() + } - file, err := l.writerState.writer.Open() - if err != nil { - return persist.CommitLogFile{}, err + // If either of the commitlog writers is nil then open both of them synchronously. Under + // normal circumstances this will only occur when the commitlog is first opened. Although + // it can also happen if something goes wrong during the asynchronous reset of the secondary + // writer in which case this path will try again, but synchronously this time. + l.writerState.primary.writer = l.newCommitLogWriterFn(l.writerState.primary.onFlush, l.opts) + l.writerState.secondary.writer = l.newCommitLogWriterFn(l.writerState.secondary.onFlush, l.opts) + + primaryFile, err := l.writerState.primary.writer.Open() + if err != nil { + return zeroFile, zeroFile, err + } + + secondaryFile, err := l.writerState.secondary.writer.Open() + if err != nil { + return zeroFile, zeroFile, err + } + + l.writerState.activeFiles = persist.CommitLogFiles{primaryFile, secondaryFile} + l.writerState.writers = []commitLogWriter{ + l.writerState.primary.writer, + l.writerState.secondary.writer} + + return primaryFile, secondaryFile, nil } - l.writerState.activeFile = &file + // Swap the primary and secondary writers so that the secondary becomes primary and vice versa. + // This consumes the standby secondary writer, but a new one will be prepared asynchronously by + // resetting the formerly primary writer. + l.writerState.primary, l.writerState.secondary = l.writerState.secondary, l.writerState.primary + l.startSecondaryWriterAsyncReset() - return file, nil + var ( + // Determine the persist.CommitLogFile for the not-yet-created secondary file so that the + // ActiveLogs() API returns the correct values even before the asynchronous reset completes. + primaryFile = l.writerState.activeFiles[1] + fsPrefix = l.opts.FilesystemOptions().FilePathPrefix() + nextIndex = primaryFile.Index + 1 + secondaryFile = persist.CommitLogFile{ + FilePath: fs.CommitLogFilePath(fsPrefix, int(nextIndex)), + Index: nextIndex, + } + ) + files := persist.CommitLogFiles{primaryFile, secondaryFile} + l.writerState.activeFiles = files + + return primaryFile, secondaryFile, nil +} + +func (l *commitLog) startSecondaryWriterAsyncReset() { + l.writerState.secondary.Add(1) + + go func() { + var err error + defer func() { + if err != nil { + // Set to nil so that the next call to openWriters() will attempt + // to try and create a new writer. + l.writerState.secondary.writer = nil + + l.metrics.errors.Inc(1) + l.metrics.openErrors.Inc(1) + } + + l.writerState.secondary.Done() + }() + + if err = l.writerState.secondary.writer.Close(); err != nil { + l.commitLogFailFn(err) + return + } + + _, err = l.writerState.secondary.writer.Open() + if err != nil { + l.commitLogFailFn(err) + return + } + }() +} + +func (l *commitLog) waitForSecondaryWriterAsyncResetComplete() { + l.writerState.secondary.Wait() } func (l *commitLog) Write( diff --git a/src/dbnode/persist/fs/commitlog/commit_log_conc_test.go b/src/dbnode/persist/fs/commitlog/commit_log_conc_test.go index 529058427c..213808714a 100644 --- a/src/dbnode/persist/fs/commitlog/commit_log_conc_test.go +++ b/src/dbnode/persist/fs/commitlog/commit_log_conc_test.go @@ -109,7 +109,7 @@ func TestCommitLogActiveLogsConcurrency(t *testing.T) { if err != nil { panic(err) } - require.Equal(t, 1, len(logs)) + require.Equal(t, 2, len(logs)) if logs[0].FilePath != lastSeenFile { lastSeenFile = logs[0].FilePath numFilesSeen++ diff --git a/src/dbnode/persist/fs/commitlog/commit_log_test.go b/src/dbnode/persist/fs/commitlog/commit_log_test.go index e48b454ebd..3ae894195c 100644 --- a/src/dbnode/persist/fs/commitlog/commit_log_test.go +++ b/src/dbnode/persist/fs/commitlog/commit_log_test.go @@ -209,7 +209,7 @@ func newTestCommitLog(t *testing.T, opts Options) *commitLog { fsopts := opts.FilesystemOptions() files, err := fs.SortedCommitLogFiles(fs.CommitLogsDirPath(fsopts.FilePathPrefix())) require.NoError(t, err) - require.True(t, len(files) == 1) + require.True(t, len(files) == 2) return commitLog } @@ -258,9 +258,13 @@ func writeCommitLogs( err := commitLog.Write(ctx, series, datapoint, write.u, write.a) if write.expectedErr != nil { - require.True(t, strings.Contains(fmt.Sprintf("%v", err), fmt.Sprintf("%v", write.expectedErr))) + if !strings.Contains(fmt.Sprintf("%v", err), fmt.Sprintf("%v", write.expectedErr)) { + panic(fmt.Sprintf("unexpected error: %v", err)) + } } else { - require.NoError(t, err) + if err != nil { + panic(err) + } } }() } @@ -273,26 +277,6 @@ func writeCommitLogs( return &wg } -func flushUntilDone(l *commitLog, wg *sync.WaitGroup) { - done := uint64(0) - blockWg := sync.WaitGroup{} - blockWg.Add(1) - go func() { - for atomic.LoadUint64(&done) == 0 { - l.writes <- commitLogWrite{eventType: flushEventType} - time.Sleep(time.Millisecond) - } - blockWg.Done() - }() - - go func() { - wg.Wait() - atomic.StoreUint64(&done, 1) - }() - - blockWg.Wait() -} - type seriesTestWritesAndReadPosition struct { writes []testWrite readPosition int @@ -387,7 +371,8 @@ func TestReadCommitLogMissingMetadata(t *testing.T) { // Replace bitset in writer with one that configurably returns true or false // depending on the series commitLog := newTestCommitLog(t, opts) - writer := commitLog.writerState.writer.(*writer) + primary := commitLog.writerState.primary.writer.(*writer) + secondary := commitLog.writerState.secondary.writer.(*writer) bitSet := bitset.NewBitSet(0) @@ -407,7 +392,8 @@ func TestReadCommitLogMissingMetadata(t *testing.T) { bitSet.Set(uint(i)) } } - writer.seen = bitSet + primary.seen = bitSet + secondary.seen = bitSet // Generate fake writes for each of the series writes := []testWrite{} @@ -472,7 +458,7 @@ func TestCommitLogReaderIsNotReusable(t *testing.T) { fsopts := opts.FilesystemOptions() files, err := fs.SortedCommitLogFiles(fs.CommitLogsDirPath(fsopts.FilePathPrefix())) require.NoError(t, err) - require.Equal(t, 1, len(files)) + require.Equal(t, 2, len(files)) // Assert commitlog cannot be opened more than once reader := newCommitLogReader(opts, ReadAllSeriesPredicate()) @@ -510,9 +496,7 @@ func TestCommitLogIteratorUsesPredicateFilterForNonCorruptFiles(t *testing.T) { // Rotate frequently to ensure we're generating multiple files. _, err := commitLog.RotateLogs() require.NoError(t, err) - wg := writeCommitLogs(t, scope, commitLog, []testWrite{write}) - // Flush until finished, this is required as timed flusher not active when clock is mocked. - flushUntilDone(commitLog, wg) + writeCommitLogs(t, scope, commitLog, []testWrite{write}) } // Close the commit log and consequently flush. @@ -522,7 +506,7 @@ func TestCommitLogIteratorUsesPredicateFilterForNonCorruptFiles(t *testing.T) { fsopts := opts.FilesystemOptions() files, err := fs.SortedCommitLogFiles(fs.CommitLogsDirPath(fsopts.FilePathPrefix())) require.NoError(t, err) - require.Equal(t, 4, len(files)) + require.Equal(t, 5, len(files)) // This predicate should eliminate the first commitlog file. commitLogPredicate := func(f FileFilterInfo) bool { @@ -539,10 +523,10 @@ func TestCommitLogIteratorUsesPredicateFilterForNonCorruptFiles(t *testing.T) { } iter, corruptFiles, err := NewIterator(iterOpts) require.NoError(t, err) - require.Equal(t, 0, len(corruptFiles)) + require.True(t, len(corruptFiles) <= 1) iterStruct := iter.(*iterator) - require.Equal(t, 3, len(iterStruct.files)) + require.True(t, len(iterStruct.files) >= 4) } func TestCommitLogIteratorUsesPredicateFilterForCorruptFiles(t *testing.T) { @@ -561,7 +545,7 @@ func TestCommitLogIteratorUsesPredicateFilterForCorruptFiles(t *testing.T) { fsopts := opts.FilesystemOptions() files, err := fs.SortedCommitLogFiles(fs.CommitLogsDirPath(fsopts.FilePathPrefix())) require.NoError(t, err) - require.Equal(t, 1, len(files)) + require.Equal(t, 2, len(files)) // Write out a corrupt commitlog file. nextCommitlogFilePath, _, err := NextFile(opts) @@ -573,7 +557,7 @@ func TestCommitLogIteratorUsesPredicateFilterForCorruptFiles(t *testing.T) { // Make sure the corrupt file is visibile. files, err = fs.SortedCommitLogFiles(fs.CommitLogsDirPath(fsopts.FilePathPrefix())) require.NoError(t, err) - require.Equal(t, 2, len(files)) + require.Equal(t, 3, len(files)) // Assert that the corrupt file is returned from the iterator. iterOpts := IteratorOpts{ @@ -586,7 +570,7 @@ func TestCommitLogIteratorUsesPredicateFilterForCorruptFiles(t *testing.T) { require.Equal(t, 1, len(corruptFiles)) iterStruct := iter.(*iterator) - require.Equal(t, 1, len(iterStruct.files)) + require.Equal(t, 2, len(iterStruct.files)) // Assert that the iterator ignores the corrupt file given an appropriate predicate. ignoreCorruptPredicate := func(f FileFilterInfo) bool { @@ -603,7 +587,7 @@ func TestCommitLogIteratorUsesPredicateFilterForCorruptFiles(t *testing.T) { require.Equal(t, 0, len(corruptFiles)) iterStruct = iter.(*iterator) - require.Equal(t, 1, len(iterStruct.files)) + require.Equal(t, 2, len(iterStruct.files)) } func TestCommitLogWriteBehind(t *testing.T) { @@ -747,7 +731,7 @@ func TestCommitLogFailOnWriteError(t *testing.T) { } writer.flushFn = func(bool) error { - commitLog.onFlush(nil) + commitLog.writerState.primary.onFlush(nil) return nil } @@ -789,14 +773,14 @@ func TestCommitLogFailOnOpenError(t *testing.T) { var opens int64 writer.openFn = func() (persist.CommitLogFile, error) { - if atomic.AddInt64(&opens, 1) >= 2 { + if atomic.AddInt64(&opens, 1) >= 3 { return persist.CommitLogFile{}, fmt.Errorf("an error") } return persist.CommitLogFile{}, nil } writer.flushFn = func(bool) error { - commitLog.onFlush(nil) + commitLog.writerState.primary.onFlush(nil) return nil } @@ -821,6 +805,9 @@ func TestCommitLogFailOnOpenError(t *testing.T) { commitLog.RotateLogs() wg.Wait() + // Secondary writer open is async so wait for it to complete before asserting + // that it failed. + commitLog.waitForSecondaryWriterAsyncResetComplete() // Check stats errors, ok := snapshotCounterValue(scope, "commitlog.writes.errors") @@ -846,9 +833,9 @@ func TestCommitLogFailOnFlushError(t *testing.T) { var flushes int64 writer.flushFn = func(bool) error { if atomic.AddInt64(&flushes, 1) >= 2 { - commitLog.onFlush(fmt.Errorf("an error")) + commitLog.writerState.primary.onFlush(fmt.Errorf("an error")) } else { - commitLog.onFlush(nil) + commitLog.writerState.primary.onFlush(nil) } return nil } @@ -875,11 +862,11 @@ func TestCommitLogFailOnFlushError(t *testing.T) { // Check stats errors, ok := snapshotCounterValue(scope, "commitlog.writes.errors") require.True(t, ok) - require.Equal(t, int64(1), errors.Value()) + require.Equal(t, int64(2), errors.Value()) flushErrors, ok := snapshotCounterValue(scope, "commitlog.writes.flush-errors") require.True(t, ok) - require.Equal(t, int64(1), flushErrors.Value()) + require.Equal(t, int64(2), flushErrors.Value()) } func TestCommitLogActiveLogs(t *testing.T) { @@ -903,7 +890,7 @@ func TestCommitLogActiveLogs(t *testing.T) { logs, err := commitLog.ActiveLogs() require.NoError(t, err) - require.Equal(t, 1, len(logs)) + require.Equal(t, 2, len(logs)) // Close the commit log and consequently flush require.NoError(t, commitLog.Close()) @@ -938,22 +925,24 @@ func TestCommitLogRotateLogs(t *testing.T) { clock.Add(write.t.Sub(clock.Now())) // Write entry. - wg := writeCommitLogs(t, scope, commitLog, []testWrite{write}) + writeCommitLogs(t, scope, commitLog, []testWrite{write}) file, err := commitLog.RotateLogs() require.NoError(t, err) require.Equal(t, file.Index, int64(i+1)) require.Contains(t, file.FilePath, "commitlog-0") - - // Flush until finished, this is required as timed flusher not active when clock is mocked - flushUntilDone(commitLog, wg) } + // Secondary writer open is async so wait for it to complete so that its safe to assert + // on the number of files that should be on disk otherwise test will flake depending + // on whether or not the async open completed in time. + commitLog.waitForSecondaryWriterAsyncResetComplete() + // Ensure files present for each call to RotateLogs(). fsopts := opts.FilesystemOptions() files, err := fs.SortedCommitLogFiles(fs.CommitLogsDirPath(fsopts.FilePathPrefix())) require.NoError(t, err) - require.Equal(t, len(files), len(writes)+1) // +1 to account for the initial file + require.Equal(t, len(writes)+2, len(files)) // +2 to account for the initial files. // Close and consequently flush. require.NoError(t, commitLog.Close()) diff --git a/src/dbnode/persist/fs/commitlog/files.go b/src/dbnode/persist/fs/commitlog/files.go index 816b3c2bc4..abbbe7a1cd 100644 --- a/src/dbnode/persist/fs/commitlog/files.go +++ b/src/dbnode/persist/fs/commitlog/files.go @@ -60,7 +60,7 @@ func NextFile(opts Options) (string, int, error) { // for any information, we just list all the files in a directory and then // read their encoded heads to obtain information about them), so in the future // we can just get rid of this. - filePath = fs.CommitLogFilePath(prefix, timeNone, newIndex) + filePath = fs.CommitLogFilePath(prefix, newIndex) ) exists, err := fs.FileExists(filePath) if err != nil { diff --git a/src/dbnode/persist/fs/commitlog/read_write_prop_test.go b/src/dbnode/persist/fs/commitlog/read_write_prop_test.go index 9602b2c373..909e9ab28d 100644 --- a/src/dbnode/persist/fs/commitlog/read_write_prop_test.go +++ b/src/dbnode/persist/fs/commitlog/read_write_prop_test.go @@ -34,10 +34,10 @@ import ( "time" "github.com/m3db/m3/src/dbnode/ts" - "github.com/m3db/m3/src/x/os" - xtest "github.com/m3db/m3/src/x/test" "github.com/m3db/m3/src/x/context" "github.com/m3db/m3/src/x/ident" + xos "github.com/m3db/m3/src/x/os" + xtest "github.com/m3db/m3/src/x/test" xtime "github.com/m3db/m3/src/x/time" "github.com/leanovate/gopter" @@ -128,7 +128,7 @@ func TestCommitLogReadWrite(t *testing.T) { // TestCommitLogPropTest property tests the commitlog by performing various // operations (Open, Write, Close) in various orders, and then ensuring that // all the data can be read back. In addition, in some runs it will arbitrarily -// (based on a randomly generate probability) corrupt any bytes written to disk by +// (based on a randomly generated probability) corrupt any bytes written to disk by // the commitlog to ensure that the commitlog reader is resilient to arbitrarily // corrupted files and will not deadlock / panic. func TestCommitLogPropTest(t *testing.T) { @@ -244,7 +244,7 @@ var genOpenCommand = gen.Const(&commands.ProtoCommand{ return &gopter.PropResult{Status: gopter.PropTrue} } return &gopter.PropResult{ - Status: gopter.PropFalse, + Status: gopter.PropError, Error: result.(error), } }, @@ -319,7 +319,7 @@ var genWriteBehindCommand = gen.SliceOfN(10, genWrite()). return &gopter.PropResult{Status: gopter.PropTrue} } return &gopter.PropResult{ - Status: gopter.PropFalse, + Status: gopter.PropError, Error: result.(error), } }, @@ -350,8 +350,8 @@ var genActiveLogsCommand = gen.Const(&commands.ProtoCommand{ return err } - if len(logs) != 1 { - return fmt.Errorf("ActiveLogs did not return exactly one log file: %v", logs) + if len(logs) != 2 { + return fmt.Errorf("ActiveLogs did not return exactly two log files: %v", logs) } return nil @@ -365,7 +365,7 @@ var genActiveLogsCommand = gen.Const(&commands.ProtoCommand{ return &gopter.PropResult{Status: gopter.PropTrue} } return &gopter.PropResult{ - Status: gopter.PropFalse, + Status: gopter.PropError, Error: result.(error), } }, @@ -406,7 +406,7 @@ var genRotateLogsCommand = gen.Const(&commands.ProtoCommand{ return &gopter.PropResult{Status: gopter.PropTrue} } return &gopter.PropResult{ - Status: gopter.PropFalse, + Status: gopter.PropError, Error: result.(error), } }, @@ -473,7 +473,7 @@ func newInitState( opts: opts, shouldCorrupt: shouldCorrupt, corruptionProbability: corruptionProbability, - seed: seed, + seed: seed, } } @@ -637,7 +637,7 @@ func newCorruptingChunkWriter( return &corruptingChunkWriter{ chunkWriter: chunkWriter, corruptionProbability: corruptionProbability, - seed: seed, + seed: seed, } } diff --git a/src/dbnode/persist/fs/files.go b/src/dbnode/persist/fs/files.go index 7566946b70..ca7abf4935 100644 --- a/src/dbnode/persist/fs/files.go +++ b/src/dbnode/persist/fs/files.go @@ -1362,9 +1362,9 @@ func OpenWritable(filePath string, perm os.FileMode) (*os.File, error) { } // CommitLogFilePath returns the path for a commitlog file. -func CommitLogFilePath(prefix string, start time.Time, index int) string { +func CommitLogFilePath(prefix string, index int) string { var ( - entry = fmt.Sprintf("%d%s%d", start.UnixNano(), separator, index) + entry = fmt.Sprintf("%d%s%d", 0, separator, index) fileName = fmt.Sprintf("%s%s%s%s", commitLogFilePrefix, separator, entry, fileSuffix) filePath = path.Join(CommitLogsDirPath(prefix), fileName) ) diff --git a/src/dbnode/persist/fs/files_test.go b/src/dbnode/persist/fs/files_test.go index 14b5e1f05a..4e5c23eb20 100644 --- a/src/dbnode/persist/fs/files_test.go +++ b/src/dbnode/persist/fs/files_test.go @@ -1078,7 +1078,7 @@ func TestSnapshotFileSnapshotTimeAndIDNotSnapshot(t *testing.T) { func TestCommitLogFilePath(t *testing.T) { expected := "/var/lib/m3db/commitlogs/commitlog-0-1.db" - actual := CommitLogFilePath("/var/lib/m3db", time.Unix(0, 0), 1) + actual := CommitLogFilePath("/var/lib/m3db", 1) require.Equal(t, expected, actual) } @@ -1231,7 +1231,7 @@ func createCommitLogFiles(t *testing.T, iter int) string { commitLogsDir := path.Join(dir, commitLogsDirName) assert.NoError(t, os.Mkdir(commitLogsDir, 0755)) for i := 0; i < iter; i++ { - filePath := CommitLogFilePath(dir, time.Unix(0, 0), i) + filePath := CommitLogFilePath(dir, i) fd, err := os.Create(filePath) assert.NoError(t, err) assert.NoError(t, fd.Close())