diff --git a/src/dbnode/persist/fs/read_write_test.go b/src/dbnode/persist/fs/read_write_test.go index 5c5e1889cc..edc10276be 100644 --- a/src/dbnode/persist/fs/read_write_test.go +++ b/src/dbnode/persist/fs/read_write_test.go @@ -494,12 +494,6 @@ func TestWriterOnlyWritesNonNilBytes(t *testing.T) { filePathPrefix := filepath.Join(dir, "") defer os.RemoveAll(dir) - checkedBytes := func(b []byte) checked.Bytes { - r := checked.NewBytes(b, nil) - r.IncRef() - return r - } - w := newTestWriter(t, filePathPrefix) writerOpts := DataWriterOpenOptions{ BlockSize: testBlockSize, @@ -526,3 +520,9 @@ func TestWriterOnlyWritesNonNilBytes(t *testing.T) { {"foo", nil, []byte{1, 2, 3, 4, 5, 6}}, }) } + +func checkedBytes(b []byte) checked.Bytes { + r := checked.NewBytes(b, nil) + r.IncRef() + return r +} diff --git a/src/dbnode/persist/fs/write.go b/src/dbnode/persist/fs/write.go index e3f3f1882e..cb05a82c3b 100644 --- a/src/dbnode/persist/fs/write.go +++ b/src/dbnode/persist/fs/write.go @@ -152,15 +152,7 @@ func (w *writer) Open(opts DataWriterOpenOptions) error { blockStart = opts.Identifier.BlockStart volumeIndex = opts.Identifier.VolumeIndex ) - - w.blockSize = opts.BlockSize - w.start = blockStart - w.volumeIndex = volumeIndex - w.snapshotTime = opts.Snapshot.SnapshotTime - w.snapshotID = opts.Snapshot.SnapshotID - w.currIdx = 0 - w.currOffset = 0 - w.err = nil + w.reset(opts) var ( shardDir string @@ -229,6 +221,22 @@ func (w *writer) Open(opts DataWriterOpenOptions) error { return nil } +func (w *writer) reset(opts DataWriterOpenOptions) { + w.blockSize = opts.BlockSize + w.start = opts.Identifier.BlockStart + w.volumeIndex = opts.Identifier.VolumeIndex + w.snapshotTime = opts.Snapshot.SnapshotTime + w.snapshotID = opts.Snapshot.SnapshotID + w.currIdx = 0 + w.currOffset = 0 + w.err = nil + // This happens after writing the previous set of files index files, however, do it + // again to ensure they get cleared even if there was a premature error writing out the + // previous set of files which would have prevented them from being cleared. + w.indexEntries.releaseRefs() + w.indexEntries = w.indexEntries[:0] +} + func (w *writer) writeData(data []byte) error { if len(data) == 0 { return nil diff --git a/src/dbnode/persist/fs/write_test.go b/src/dbnode/persist/fs/write_test.go new file mode 100644 index 0000000000..2c8d718f1b --- /dev/null +++ b/src/dbnode/persist/fs/write_test.go @@ -0,0 +1,67 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package fs + +import ( + "os" + "path/filepath" + "testing" + "time" + + "github.com/m3db/m3/src/dbnode/persist" + "github.com/m3db/m3/src/x/ident" + "github.com/stretchr/testify/require" +) + +// TestWriteReuseAfterError was added as a regression test after it was +// discovered that reusing a fileset writer after a called to Close() had +// returned an error could make the fileset writer end up in a near infinite +// loop when it was reused to write out a completely indepedent set of files. +// +// This test verifies that the fix works as expected and prevents regressions +// of the issue. +func TestWriteReuseAfterError(t *testing.T) { + dir := createTempDir(t) + filePathPrefix := filepath.Join(dir, "") + defer os.RemoveAll(dir) + + seriesID := ident.StringID("series1") + w := newTestWriter(t, filePathPrefix) + writerOpts := DataWriterOpenOptions{ + Identifier: FileSetFileIdentifier{ + Namespace: testNs1ID, + Shard: 0, + BlockStart: time.Now().Truncate(time.Hour), + VolumeIndex: 0, + }, + BlockSize: time.Hour, + FileSetType: persist.FileSetFlushType, + } + data := checkedBytes([]byte{1, 2, 3}) + + require.NoError(t, w.Open(writerOpts)) + require.NoError(t, w.Write(seriesID, ident.Tags{}, data, 0)) + require.NoError(t, w.Write(seriesID, ident.Tags{}, data, 0)) + require.Error(t, w.Close()) + + require.NoError(t, w.Open(writerOpts)) + require.NoError(t, w.Close()) +}