From 7fc4ba491bd6175c023616ec538b4812badfb6b5 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Wed, 4 Dec 2019 13:08:32 -0500 Subject: [PATCH 1/4] fix writer (almost) infinite loop --- src/dbnode/persist/fs/read_write_test.go | 12 +++---- src/dbnode/persist/fs/write.go | 5 +++ src/dbnode/persist/fs/write_test.go | 40 ++++++++++++++++++++++++ 3 files changed, 51 insertions(+), 6 deletions(-) create mode 100644 src/dbnode/persist/fs/write_test.go diff --git a/src/dbnode/persist/fs/read_write_test.go b/src/dbnode/persist/fs/read_write_test.go index 5c5e1889cc..edc10276be 100644 --- a/src/dbnode/persist/fs/read_write_test.go +++ b/src/dbnode/persist/fs/read_write_test.go @@ -494,12 +494,6 @@ func TestWriterOnlyWritesNonNilBytes(t *testing.T) { filePathPrefix := filepath.Join(dir, "") defer os.RemoveAll(dir) - checkedBytes := func(b []byte) checked.Bytes { - r := checked.NewBytes(b, nil) - r.IncRef() - return r - } - w := newTestWriter(t, filePathPrefix) writerOpts := DataWriterOpenOptions{ BlockSize: testBlockSize, @@ -526,3 +520,9 @@ func TestWriterOnlyWritesNonNilBytes(t *testing.T) { {"foo", nil, []byte{1, 2, 3, 4, 5, 6}}, }) } + +func checkedBytes(b []byte) checked.Bytes { + r := checked.NewBytes(b, nil) + r.IncRef() + return r +} diff --git a/src/dbnode/persist/fs/write.go b/src/dbnode/persist/fs/write.go index e3f3f1882e..84e9d31800 100644 --- a/src/dbnode/persist/fs/write.go +++ b/src/dbnode/persist/fs/write.go @@ -161,6 +161,11 @@ func (w *writer) Open(opts DataWriterOpenOptions) error { w.currIdx = 0 w.currOffset = 0 w.err = nil + // This happens after writing the previous set of files index files, however, do it + // again to ensure they get cleared even if there was a premature error writing out the + // previous set of files which would have prevented them from being cleared. + w.indexEntries.releaseRefs() + w.indexEntries = w.indexEntries[:0] var ( shardDir string diff --git a/src/dbnode/persist/fs/write_test.go b/src/dbnode/persist/fs/write_test.go new file mode 100644 index 0000000000..f3ff68a3d6 --- /dev/null +++ b/src/dbnode/persist/fs/write_test.go @@ -0,0 +1,40 @@ +package fs + +import ( + "os" + "path/filepath" + "testing" + "time" + + "github.com/m3db/m3/src/dbnode/persist" + "github.com/m3db/m3/src/x/ident" + "github.com/stretchr/testify/require" +) + +func TestWriteReuseAfterError(t *testing.T) { + dir := createTempDir(t) + filePathPrefix := filepath.Join(dir, "") + defer os.RemoveAll(dir) + + seriesID := ident.StringID("series1") + w := newTestWriter(t, filePathPrefix) + writerOpts := DataWriterOpenOptions{ + Identifier: FileSetFileIdentifier{ + Namespace: testNs1ID, + Shard: 0, + BlockStart: time.Now().Truncate(time.Hour), + VolumeIndex: 0, + }, + BlockSize: time.Hour, + FileSetType: persist.FileSetFlushType, + } + data := checkedBytes([]byte{1, 2, 3}) + + require.NoError(t, w.Open(writerOpts)) + require.NoError(t, w.Write(seriesID, ident.Tags{}, data, 0)) + require.NoError(t, w.Write(seriesID, ident.Tags{}, data, 0)) + require.Error(t, w.Close()) + + require.NoError(t, w.Open(writerOpts)) + require.NoError(t, w.Close()) +} From a0a9bd779159604ee606250e2350225bf4b8c564 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Wed, 4 Dec 2019 13:33:31 -0500 Subject: [PATCH 2/4] update test --- src/dbnode/persist/fs/write_test.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/dbnode/persist/fs/write_test.go b/src/dbnode/persist/fs/write_test.go index f3ff68a3d6..fdeb84dc6d 100644 --- a/src/dbnode/persist/fs/write_test.go +++ b/src/dbnode/persist/fs/write_test.go @@ -11,6 +11,13 @@ import ( "github.com/stretchr/testify/require" ) +// TestWriteReuseAfterError was added as a regression test after it was +// discovered that reusing a fileset writer after a called to Close() had +// returned an error could make the fileset writer end up in a near infinite +// loop when it was reused to write out a completely indepedent set of files. +// +// This test verifies that the fix works as expected and prevents regressions +// of the issue. func TestWriteReuseAfterError(t *testing.T) { dir := createTempDir(t) filePathPrefix := filepath.Join(dir, "") From 5199fdaa2539012de3a801e0665d3b6da393f264 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Wed, 4 Dec 2019 13:58:48 -0500 Subject: [PATCH 3/4] add license --- src/dbnode/persist/fs/write_test.go | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/src/dbnode/persist/fs/write_test.go b/src/dbnode/persist/fs/write_test.go index fdeb84dc6d..2c8d718f1b 100644 --- a/src/dbnode/persist/fs/write_test.go +++ b/src/dbnode/persist/fs/write_test.go @@ -1,3 +1,23 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + package fs import ( From d3a99320f3e73de395a667b5e7db1b158c9360bd Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Thu, 5 Dec 2019 09:21:01 -0500 Subject: [PATCH 4/4] factor our reset helper method --- src/dbnode/persist/fs/write.go | 31 +++++++++++++++++-------------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/src/dbnode/persist/fs/write.go b/src/dbnode/persist/fs/write.go index 84e9d31800..cb05a82c3b 100644 --- a/src/dbnode/persist/fs/write.go +++ b/src/dbnode/persist/fs/write.go @@ -152,20 +152,7 @@ func (w *writer) Open(opts DataWriterOpenOptions) error { blockStart = opts.Identifier.BlockStart volumeIndex = opts.Identifier.VolumeIndex ) - - w.blockSize = opts.BlockSize - w.start = blockStart - w.volumeIndex = volumeIndex - w.snapshotTime = opts.Snapshot.SnapshotTime - w.snapshotID = opts.Snapshot.SnapshotID - w.currIdx = 0 - w.currOffset = 0 - w.err = nil - // This happens after writing the previous set of files index files, however, do it - // again to ensure they get cleared even if there was a premature error writing out the - // previous set of files which would have prevented them from being cleared. - w.indexEntries.releaseRefs() - w.indexEntries = w.indexEntries[:0] + w.reset(opts) var ( shardDir string @@ -234,6 +221,22 @@ func (w *writer) Open(opts DataWriterOpenOptions) error { return nil } +func (w *writer) reset(opts DataWriterOpenOptions) { + w.blockSize = opts.BlockSize + w.start = opts.Identifier.BlockStart + w.volumeIndex = opts.Identifier.VolumeIndex + w.snapshotTime = opts.Snapshot.SnapshotTime + w.snapshotID = opts.Snapshot.SnapshotID + w.currIdx = 0 + w.currOffset = 0 + w.err = nil + // This happens after writing the previous set of files index files, however, do it + // again to ensure they get cleared even if there was a premature error writing out the + // previous set of files which would have prevented them from being cleared. + w.indexEntries.releaseRefs() + w.indexEntries = w.indexEntries[:0] +} + func (w *writer) writeData(data []byte) error { if len(data) == 0 { return nil