From d7c8bc26a47c476111ca158687addbb181ff9e11 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Fri, 8 Mar 2019 14:24:02 -0500 Subject: [PATCH 1/5] Fix Ostream panic + regression test --- src/dbnode/encoding/m3tsz/encoder.go | 4 +- src/dbnode/encoding/m3tsz/encoder_test.go | 18 ++++---- src/dbnode/encoding/ostream.go | 18 ++++---- src/dbnode/encoding/ostream_test.go | 13 +++--- src/dbnode/encoding/scheme.go | 3 +- src/dbnode/encoding/types.go | 2 +- src/dbnode/storage/series/series_test.go | 50 +++++++++++++++++++++++ 7 files changed, 81 insertions(+), 27 deletions(-) diff --git a/src/dbnode/encoding/m3tsz/encoder.go b/src/dbnode/encoding/m3tsz/encoder.go index 059d85ed55..688e687baa 100644 --- a/src/dbnode/encoding/m3tsz/encoder.go +++ b/src/dbnode/encoding/m3tsz/encoder.go @@ -604,7 +604,7 @@ func (enc *encoder) segment(resType resultType) ts.Segment { // of the encoder data. var head checked.Bytes buffer, pos := enc.os.Rawbytes() - lastByte := buffer.Bytes()[length-1] + lastByte := buffer[length-1] if resType == byRefResultType { // Take ref from the ostream head = enc.os.Discard() @@ -622,7 +622,7 @@ func (enc *encoder) segment(resType resultType) ts.Segment { defer head.DecRef() // Copy up to last byte - head.AppendAll(buffer.Bytes()[:length-1]) + head.AppendAll(buffer[:length-1]) } // Take a shared ref to a known good tail diff --git a/src/dbnode/encoding/m3tsz/encoder_test.go b/src/dbnode/encoding/m3tsz/encoder_test.go index 9fe08b5788..b636c7f7ba 100644 --- a/src/dbnode/encoding/m3tsz/encoder_test.go +++ b/src/dbnode/encoding/m3tsz/encoder_test.go @@ -70,7 +70,7 @@ func TestWriteDeltaOfDeltaTimeUnitUnchanged(t *testing.T) { encoder.Reset(testStartTime, 0) encoder.writeDeltaOfDeltaTimeUnitUnchanged(0, input.delta, input.timeUnit) b, p := encoder.os.Rawbytes() - require.Equal(t, input.expectedBytes, b.Bytes()) + require.Equal(t, input.expectedBytes, b) require.Equal(t, input.expectedPos, p) } } @@ -90,7 +90,7 @@ func TestWriteDeltaOfDeltaTimeUnitChanged(t *testing.T) { encoder.Reset(testStartTime, 0) encoder.writeDeltaOfDeltaTimeUnitChanged(0, input.delta) b, p := encoder.os.Rawbytes() - require.Equal(t, input.expectedBytes, b.Bytes()) + require.Equal(t, input.expectedBytes, b) require.Equal(t, input.expectedPos, p) } } @@ -111,7 +111,7 @@ func TestWriteValue(t *testing.T) { encoder.Reset(testStartTime, 0) encoder.writeXOR(input.previousXOR, input.currentXOR) b, p := encoder.os.Rawbytes() - require.Equal(t, input.expectedBytes, b.Bytes()) + require.Equal(t, input.expectedBytes, b) require.Equal(t, input.expectedPos, p) } } @@ -144,7 +144,7 @@ func TestWriteAnnotation(t *testing.T) { encoder.Reset(testStartTime, 0) encoder.writeAnnotation(input.annotation) b, p := encoder.os.Rawbytes() - require.Equal(t, input.expectedBytes, b.Bytes()) + require.Equal(t, input.expectedBytes, b) require.Equal(t, input.expectedPos, p) } } @@ -192,7 +192,7 @@ func TestWriteTimeUnit(t *testing.T) { encoder.tu = xtime.None assert.Equal(t, input.expectedResult, encoder.writeTimeUnit(input.timeUnit)) b, p := encoder.os.Rawbytes() - assert.Equal(t, input.expectedBytes, b.Bytes()) + assert.Equal(t, input.expectedBytes, b) assert.Equal(t, input.expectedPos, p) } } @@ -229,7 +229,7 @@ func TestEncodeNoAnnotation(t *testing.T) { } b, p := encoder.os.Rawbytes() - require.Equal(t, expectedBuffer, b.Bytes()) + require.Equal(t, expectedBuffer, b) require.Equal(t, 6, p) } @@ -262,7 +262,7 @@ func TestEncodeWithAnnotation(t *testing.T) { } b, p := encoder.os.Rawbytes() - require.Equal(t, expectedBuffer, b.Bytes()) + require.Equal(t, expectedBuffer, b) require.Equal(t, 4, p) expectedBytes := []byte{ @@ -373,7 +373,7 @@ func TestEncoderResets(t *testing.T) { require.Equal(t, 0, enc.os.Len()) require.Equal(t, nil, enc.Stream()) b, _ := enc.os.Rawbytes() - require.Equal(t, []byte{}, b.Bytes()) + require.Equal(t, []byte{}, b) enc.Encode(ts.Datapoint{now, 13}, xtime.Second, nil) require.True(t, enc.os.Len() > 0) @@ -382,7 +382,7 @@ func TestEncoderResets(t *testing.T) { require.Equal(t, 0, enc.os.Len()) require.Equal(t, nil, enc.Stream()) b, _ = enc.os.Rawbytes() - require.Equal(t, []byte{}, b.Bytes()) + require.Equal(t, []byte{}, b) } func TestEncoderNumEncoded(t *testing.T) { diff --git a/src/dbnode/encoding/ostream.go b/src/dbnode/encoding/ostream.go index 2cb3256483..f32c088c6f 100644 --- a/src/dbnode/encoding/ostream.go +++ b/src/dbnode/encoding/ostream.go @@ -31,16 +31,15 @@ const ( // Ostream encapsulates a writable stream. type ostream struct { - // We want to use a checked.Bytes when exposing or returning the buffer + // We want to use a checked.Bytes when transfering ownership of the buffer // of the ostream. Unfortunately, the accounting overhead of going through // the checked.Bytes for every write is massive. As a result, we store both // the rawBuffer that backs the checked.Bytes AND the checked.Bytes themselves // in this struct. // // That way, whenever we're writing to the buffer we can avoid the cost accounting - // overhead entirely, but when the data needs to be exposed outside of this datastructure - // or ownership of the data needs to be transferred, then we use the checked.Bytes, which - // is when the accounting really matters anyways. + // overhead entirely, but when the data needs to be transffered to another owner + // we use the checked.Bytes, which is when the accounting really matters anyways. // // The rawBuffer and checked.Bytes may get out of sync as the rawBuffer is written to, // but thats fine because we perform a "repair" by resetting the checked.Bytes underlying @@ -232,10 +231,13 @@ func (os *ostream) Reset(buffer checked.Bytes) { } } -// Rawbytes returns the Osteam's raw bytes -func (os *ostream) Rawbytes() (checked.Bytes, int) { - os.repairCheckedBytes() - return os.checked, os.pos +// Rawbytes returns the Osteam's raw bytes. Note that this does not transfer ownership +// of the data and bypasses the checked.Bytes accounting so callers should: +// 1. Only use the returned slice as a "read-only" snapshot of the data in a context +// where the caller has at least a read lock on the ostream itself. +// 2. Use this function with care. +func (os *ostream) Rawbytes() ([]byte, int) { + return os.rawBuffer, os.pos } // repairCheckedBytes makes sure that the checked.Bytes wraps the rawBuffer as diff --git a/src/dbnode/encoding/ostream_test.go b/src/dbnode/encoding/ostream_test.go index 493790c217..dff4300747 100644 --- a/src/dbnode/encoding/ostream_test.go +++ b/src/dbnode/encoding/ostream_test.go @@ -29,6 +29,7 @@ import ( ) var ( + nilBytes []byte testBytesPool = newTestCheckedBytesPool() ) @@ -63,8 +64,8 @@ func testWriteBits(t *testing.T, o OStream) { for _, input := range inputs { os.WriteBits(input.value, input.numBits) require.Equal(t, input.expectedBytes, os.rawBuffer) - checked, _ := os.Rawbytes() - require.Equal(t, input.expectedBytes, checked.Bytes()) + b, _ := os.Rawbytes() + require.Equal(t, input.expectedBytes, b) require.Equal(t, input.expectedPos, os.pos) } require.False(t, os.Empty()) @@ -85,8 +86,8 @@ func testWriteBytes(t *testing.T, o OStream) { require.Equal(t, rawBytes, os.rawBuffer) - checked, _ := os.Rawbytes() - require.Equal(t, rawBytes, checked.Bytes()) + b, _ := os.Rawbytes() + require.Equal(t, rawBytes, b) require.Equal(t, 8, os.pos) } @@ -108,8 +109,8 @@ func testResetOStream(t *testing.T, o OStream) { require.Equal(t, 0, os.Len()) require.Equal(t, 0, os.pos) - checked, _ := os.Rawbytes() - require.Equal(t, nil, checked) + b, _ := os.Rawbytes() + require.Equal(t, nilBytes, b) } func BenchmarkWriteBytes(b *testing.B) { diff --git a/src/dbnode/encoding/scheme.go b/src/dbnode/encoding/scheme.go index fca7643a7c..a7aa17d26b 100644 --- a/src/dbnode/encoding/scheme.go +++ b/src/dbnode/encoding/scheme.go @@ -219,7 +219,8 @@ func newMarkerEncodingScheme( tmp := NewOStream(checked.NewBytes(nil, nil), false, nil) tmp.WriteBits(uint64(i)>>uint(8-pos), pos) WriteSpecialMarker(tmp, scheme, endOfStream) - tail, _ := tmp.Rawbytes() + rawBytes, _ := tmp.Rawbytes() + tail := checked.NewBytes(rawBytes, nil) scheme.tails[i][j] = tail } } diff --git a/src/dbnode/encoding/types.go b/src/dbnode/encoding/types.go index 4c634cc2c0..d27a3ad068 100644 --- a/src/dbnode/encoding/types.go +++ b/src/dbnode/encoding/types.go @@ -263,7 +263,7 @@ type OStream interface { WriteBytes(bytes []byte) Reset(buffer checked.Bytes) Discard() checked.Bytes - Rawbytes() (checked.Bytes, int) + Rawbytes() ([]byte, int) } // EncoderPool provides a pool for encoders diff --git a/src/dbnode/storage/series/series_test.go b/src/dbnode/storage/series/series_test.go index d9d82807fb..837efbc8bc 100644 --- a/src/dbnode/storage/series/series_test.go +++ b/src/dbnode/storage/series/series_test.go @@ -24,6 +24,7 @@ import ( "errors" "fmt" "io" + "sync" "testing" "time" @@ -849,6 +850,55 @@ func TestSeriesWriteReadFromTheSameBucket(t *testing.T) { require.Equal(t, 3, len(values)) } +// TestSeriesWriteReadParallel is a regression test that was added to capture panics that might +// arise when many parallel writes and reads are happening at the same time. +func TestSeriesWriteReadParallel(t *testing.T) { + var ( + numWorkers = 100 + numStepsPerWorker = 100 + opts = newSeriesTestOptions() + curr = time.Now() + series = NewDatabaseSeries(ident.StringID("foo"), ident.Tags{}, opts).(*dbSeries) + ) + + _, err := series.Bootstrap(nil) + assert.NoError(t, err) + + ctx := context.NewContext() + defer ctx.Close() + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + for i := 0; i < numStepsPerWorker; i++ { + wasWritten, err := series.Write( + ctx, curr.Add(time.Duration(i)*time.Nanosecond), float64(i), xtime.Second, nil) + if err != nil { + panic(err) + } + if !wasWritten { + panic("write failed") + } + } + wg.Done() + }() + + // Outer loop so that reads are competing with other reads, not just writes. + for j := 0; j < numWorkers; j++ { + wg.Add(1) + go func() { + for i := 0; i < numStepsPerWorker; i++ { + _, err := series.ReadEncoded(ctx, curr.Add(-5*time.Minute), curr.Add(time.Minute)) + if err != nil { + panic(err) + } + } + wg.Done() + }() + } + wg.Wait() +} + func TestSeriesCloseNonCacheLRUPolicy(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() From 4814d9c7e520bd5c2112370a2ab30076b8bad29b Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Fri, 8 Mar 2019 14:59:42 -0500 Subject: [PATCH 2/5] Fix lint issues and regen mocks --- src/dbnode/encoding/encoding_mock.go | 6 +++--- src/dbnode/encoding/ostream.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/dbnode/encoding/encoding_mock.go b/src/dbnode/encoding/encoding_mock.go index 8323d3bcae..9a83239447 100644 --- a/src/dbnode/encoding/encoding_mock.go +++ b/src/dbnode/encoding/encoding_mock.go @@ -1,7 +1,7 @@ // Code generated by MockGen. DO NOT EDIT. // Source: github.com/m3db/m3/src/dbnode/encoding/types.go -// Copyright (c) 2018 Uber Technologies, Inc. +// Copyright (c) 2019 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal @@ -1302,10 +1302,10 @@ func (mr *MockOStreamMockRecorder) Discard() *gomock.Call { } // Rawbytes mocks base method -func (m *MockOStream) Rawbytes() (checked.Bytes, int) { +func (m *MockOStream) Rawbytes() ([]byte, int) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Rawbytes") - ret0, _ := ret[0].(checked.Bytes) + ret0, _ := ret[0].([]byte) ret1, _ := ret[1].(int) return ret0, ret1 } diff --git a/src/dbnode/encoding/ostream.go b/src/dbnode/encoding/ostream.go index f32c088c6f..a518ab3d95 100644 --- a/src/dbnode/encoding/ostream.go +++ b/src/dbnode/encoding/ostream.go @@ -31,7 +31,7 @@ const ( // Ostream encapsulates a writable stream. type ostream struct { - // We want to use a checked.Bytes when transfering ownership of the buffer + // We want to use a checked.Bytes when transferring ownership of the buffer // of the ostream. Unfortunately, the accounting overhead of going through // the checked.Bytes for every write is massive. As a result, we store both // the rawBuffer that backs the checked.Bytes AND the checked.Bytes themselves From 702514c71e71e088b9e8fbed084bb7b28421ff33 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Mon, 11 Mar 2019 09:30:32 -0400 Subject: [PATCH 3/5] update config for parallel test --- src/dbnode/storage/series/series_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dbnode/storage/series/series_test.go b/src/dbnode/storage/series/series_test.go index 837efbc8bc..caa7e11aae 100644 --- a/src/dbnode/storage/series/series_test.go +++ b/src/dbnode/storage/series/series_test.go @@ -855,7 +855,7 @@ func TestSeriesWriteReadFromTheSameBucket(t *testing.T) { func TestSeriesWriteReadParallel(t *testing.T) { var ( numWorkers = 100 - numStepsPerWorker = 100 + numStepsPerWorker = numWorkers * 100 opts = newSeriesTestOptions() curr = time.Now() series = NewDatabaseSeries(ident.StringID("foo"), ident.Tags{}, opts).(*dbSeries) From f2eb6d12072399b23ad4c936f2f1f7834885cc9b Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Mon, 11 Mar 2019 09:35:08 -0400 Subject: [PATCH 4/5] mark parallel test as big --- .../storage/series/series_parallel_test.go | 83 +++++++++++++++++++ src/dbnode/storage/series/series_test.go | 50 ----------- 2 files changed, 83 insertions(+), 50 deletions(-) create mode 100644 src/dbnode/storage/series/series_parallel_test.go diff --git a/src/dbnode/storage/series/series_parallel_test.go b/src/dbnode/storage/series/series_parallel_test.go new file mode 100644 index 0000000000..6266c29fcc --- /dev/null +++ b/src/dbnode/storage/series/series_parallel_test.go @@ -0,0 +1,83 @@ +// +build big +// +// Copyright (c) 2016 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package series + +import ( + "sync" + "testing" + "time" + + "github.com/m3db/m3x/context" + "github.com/m3db/m3x/ident" + xtime "github.com/m3db/m3x/time" + "github.com/stretchr/testify/assert" +) + +// TestSeriesWriteReadParallel is a regression test that was added to capture panics that might +// arise when many parallel writes and reads are happening at the same time. +func TestSeriesWriteReadParallel(t *testing.T) { + var ( + numWorkers = 100 + numStepsPerWorker = numWorkers * 100 + opts = newSeriesTestOptions() + curr = time.Now() + series = NewDatabaseSeries(ident.StringID("foo"), ident.Tags{}, opts).(*dbSeries) + ) + + _, err := series.Bootstrap(nil) + assert.NoError(t, err) + + ctx := context.NewContext() + defer ctx.Close() + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + for i := 0; i < numStepsPerWorker; i++ { + wasWritten, err := series.Write( + ctx, curr.Add(time.Duration(i)*time.Nanosecond), float64(i), xtime.Second, nil) + if err != nil { + panic(err) + } + if !wasWritten { + panic("write failed") + } + } + wg.Done() + }() + + // Outer loop so that reads are competing with other reads, not just writes. + for j := 0; j < numWorkers; j++ { + wg.Add(1) + go func() { + for i := 0; i < numStepsPerWorker; i++ { + _, err := series.ReadEncoded(ctx, curr.Add(-5*time.Minute), curr.Add(time.Minute)) + if err != nil { + panic(err) + } + } + wg.Done() + }() + } + wg.Wait() +} diff --git a/src/dbnode/storage/series/series_test.go b/src/dbnode/storage/series/series_test.go index caa7e11aae..d9d82807fb 100644 --- a/src/dbnode/storage/series/series_test.go +++ b/src/dbnode/storage/series/series_test.go @@ -24,7 +24,6 @@ import ( "errors" "fmt" "io" - "sync" "testing" "time" @@ -850,55 +849,6 @@ func TestSeriesWriteReadFromTheSameBucket(t *testing.T) { require.Equal(t, 3, len(values)) } -// TestSeriesWriteReadParallel is a regression test that was added to capture panics that might -// arise when many parallel writes and reads are happening at the same time. -func TestSeriesWriteReadParallel(t *testing.T) { - var ( - numWorkers = 100 - numStepsPerWorker = numWorkers * 100 - opts = newSeriesTestOptions() - curr = time.Now() - series = NewDatabaseSeries(ident.StringID("foo"), ident.Tags{}, opts).(*dbSeries) - ) - - _, err := series.Bootstrap(nil) - assert.NoError(t, err) - - ctx := context.NewContext() - defer ctx.Close() - - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - for i := 0; i < numStepsPerWorker; i++ { - wasWritten, err := series.Write( - ctx, curr.Add(time.Duration(i)*time.Nanosecond), float64(i), xtime.Second, nil) - if err != nil { - panic(err) - } - if !wasWritten { - panic("write failed") - } - } - wg.Done() - }() - - // Outer loop so that reads are competing with other reads, not just writes. - for j := 0; j < numWorkers; j++ { - wg.Add(1) - go func() { - for i := 0; i < numStepsPerWorker; i++ { - _, err := series.ReadEncoded(ctx, curr.Add(-5*time.Minute), curr.Add(time.Minute)) - if err != nil { - panic(err) - } - } - wg.Done() - }() - } - wg.Wait() -} - func TestSeriesCloseNonCacheLRUPolicy(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() From 809dcc44dd83e30fcf4a8970008ad1cbf9660c29 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Mon, 11 Mar 2019 10:05:02 -0400 Subject: [PATCH 5/5] fix imports --- src/dbnode/storage/series/series_parallel_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/src/dbnode/storage/series/series_parallel_test.go b/src/dbnode/storage/series/series_parallel_test.go index 6266c29fcc..6d8515df6b 100644 --- a/src/dbnode/storage/series/series_parallel_test.go +++ b/src/dbnode/storage/series/series_parallel_test.go @@ -30,6 +30,7 @@ import ( "github.com/m3db/m3x/context" "github.com/m3db/m3x/ident" xtime "github.com/m3db/m3x/time" + "github.com/stretchr/testify/assert" )