Fix panic introduced by OStream optimization #1437

Merged
merged 5 commits into from
Mar 11, 2019
6 changes: 3 additions & 3 deletions src/dbnode/encoding/encoding_mock.go

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions src/dbnode/encoding/m3tsz/encoder.go
@@ -604,7 +604,7 @@ func (enc *encoder) segment(resType resultType) ts.Segment {
// of the encoder data.
var head checked.Bytes
buffer, pos := enc.os.Rawbytes()
lastByte := buffer.Bytes()[length-1]
lastByte := buffer[length-1]
if resType == byRefResultType {
// Take ref from the ostream
head = enc.os.Discard()
@@ -622,7 +622,7 @@ func (enc *encoder) segment(resType resultType) ts.Segment {
defer head.DecRef()

// Copy up to last byte
head.AppendAll(buffer.Bytes()[:length-1])
head.AppendAll(buffer[:length-1])
}

// Take a shared ref to a known good tail
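Why this call site changes (a hedged reading of the diff rather than anything stated in the PR): the old Rawbytes returned the stream's internal checked.Bytes and, as the ostream.go changes further down show, had to "repair" that wrapper on every call, so even a read-only caller like segment() mutated shared state and went through the checked.Bytes accounting. Under concurrent reads and writes that appears to be the source of the panic this PR fixes; returning the raw []byte and bit position instead gives read-only callers a snapshot that touches no accounting, which is why the .Bytes() indirection disappears here.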
18 changes: 9 additions & 9 deletions src/dbnode/encoding/m3tsz/encoder_test.go
@@ -70,7 +70,7 @@ func TestWriteDeltaOfDeltaTimeUnitUnchanged(t *testing.T) {
encoder.Reset(testStartTime, 0)
encoder.writeDeltaOfDeltaTimeUnitUnchanged(0, input.delta, input.timeUnit)
b, p := encoder.os.Rawbytes()
require.Equal(t, input.expectedBytes, b.Bytes())
require.Equal(t, input.expectedBytes, b)
require.Equal(t, input.expectedPos, p)
}
}
@@ -90,7 +90,7 @@ func TestWriteDeltaOfDeltaTimeUnitChanged(t *testing.T) {
encoder.Reset(testStartTime, 0)
encoder.writeDeltaOfDeltaTimeUnitChanged(0, input.delta)
b, p := encoder.os.Rawbytes()
require.Equal(t, input.expectedBytes, b.Bytes())
require.Equal(t, input.expectedBytes, b)
require.Equal(t, input.expectedPos, p)
}
}
@@ -111,7 +111,7 @@ func TestWriteValue(t *testing.T) {
encoder.Reset(testStartTime, 0)
encoder.writeXOR(input.previousXOR, input.currentXOR)
b, p := encoder.os.Rawbytes()
require.Equal(t, input.expectedBytes, b.Bytes())
require.Equal(t, input.expectedBytes, b)
require.Equal(t, input.expectedPos, p)
}
}
@@ -144,7 +144,7 @@ func TestWriteAnnotation(t *testing.T) {
encoder.Reset(testStartTime, 0)
encoder.writeAnnotation(input.annotation)
b, p := encoder.os.Rawbytes()
require.Equal(t, input.expectedBytes, b.Bytes())
require.Equal(t, input.expectedBytes, b)
require.Equal(t, input.expectedPos, p)
}
}
@@ -192,7 +192,7 @@ func TestWriteTimeUnit(t *testing.T) {
encoder.tu = xtime.None
assert.Equal(t, input.expectedResult, encoder.writeTimeUnit(input.timeUnit))
b, p := encoder.os.Rawbytes()
assert.Equal(t, input.expectedBytes, b.Bytes())
assert.Equal(t, input.expectedBytes, b)
assert.Equal(t, input.expectedPos, p)
}
}
@@ -229,7 +229,7 @@ func TestEncodeNoAnnotation(t *testing.T) {
}

b, p := encoder.os.Rawbytes()
require.Equal(t, expectedBuffer, b.Bytes())
require.Equal(t, expectedBuffer, b)
require.Equal(t, 6, p)
}

@@ -262,7 +262,7 @@ func TestEncodeWithAnnotation(t *testing.T) {
}

b, p := encoder.os.Rawbytes()
require.Equal(t, expectedBuffer, b.Bytes())
require.Equal(t, expectedBuffer, b)
require.Equal(t, 4, p)

expectedBytes := []byte{
@@ -373,7 +373,7 @@ func TestEncoderResets(t *testing.T) {
require.Equal(t, 0, enc.os.Len())
require.Equal(t, nil, enc.Stream())
b, _ := enc.os.Rawbytes()
require.Equal(t, []byte{}, b.Bytes())
require.Equal(t, []byte{}, b)

enc.Encode(ts.Datapoint{now, 13}, xtime.Second, nil)
require.True(t, enc.os.Len() > 0)
@@ -382,7 +382,7 @@
require.Equal(t, 0, enc.os.Len())
require.Equal(t, nil, enc.Stream())
b, _ = enc.os.Rawbytes()
require.Equal(t, []byte{}, b.Bytes())
require.Equal(t, []byte{}, b)
}

func TestEncoderNumEncoded(t *testing.T) {
18 changes: 10 additions & 8 deletions src/dbnode/encoding/ostream.go
@@ -31,16 +31,15 @@ const (

// Ostream encapsulates a writable stream.
type ostream struct {
// We want to use a checked.Bytes when exposing or returning the buffer
// We want to use a checked.Bytes when transferring ownership of the buffer
// of the ostream. Unfortunately, the accounting overhead of going through
// the checked.Bytes for every write is massive. As a result, we store both
// the rawBuffer that backs the checked.Bytes AND the checked.Bytes themselves
// in this struct.
//
// That way, whenever we're writing to the buffer we can avoid the cost accounting
// overhead entirely, but when the data needs to be exposed outside of this datastructure
// or ownership of the data needs to be transferred, then we use the checked.Bytes, which
// is when the accounting really matters anyways.
// overhead entirely, but when the data needs to be transferred to another owner
// we use the checked.Bytes, which is when the accounting really matters anyways.
//
// The rawBuffer and checked.Bytes may get out of sync as the rawBuffer is written to,
// but that's fine because we perform a "repair" by resetting the checked.Bytes underlying
@@ -232,10 +231,13 @@ func (os *ostream) Reset(buffer checked.Bytes) {
}
}

// Rawbytes returns the Osteam's raw bytes
func (os *ostream) Rawbytes() (checked.Bytes, int) {
os.repairCheckedBytes()
return os.checked, os.pos
// Rawbytes returns the Ostream's raw bytes. Note that this does not transfer ownership
// of the data and bypasses the checked.Bytes accounting, so callers should:
// 1. Only use the returned slice as a "read-only" snapshot of the data in a context
// where the caller has at least a read lock on the ostream itself.
// 2. Use this function with care.
func (os *ostream) Rawbytes() ([]byte, int) {
return os.rawBuffer, os.pos
}

// repairCheckedBytes makes sure that the checked.Bytes wraps the rawBuffer as
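The comment block above describes the design that made the optimization possible and that the new Rawbytes signature has to respect: writes touch only the raw []byte, and the checked.Bytes wrapper is brought back in sync ("repaired") only when ownership of the data leaves the ostream. The following self-contained sketch illustrates that pattern; it uses a hypothetical trackedBytes stand-in instead of the real m3x checked.Bytes API, so the names and method set are illustrative assumptions, not the actual implementation.

package main

import "fmt"

// trackedBytes is a hypothetical stand-in for checked.Bytes: a wrapper whose
// accounting (ref counts, finalizers, and so on) costs something on every access.
type trackedBytes struct {
	data []byte
}

func (t *trackedBytes) reset(b []byte) { t.data = b }
func (t *trackedBytes) bytes() []byte  { return t.data }

// ostreamSketch mirrors the idea described above: rawBuffer is the hot-path
// buffer, tracked is the wrapper we only touch when handing the data off.
type ostreamSketch struct {
	rawBuffer []byte
	tracked   *trackedBytes
}

// WriteByte is the hot path: append to the raw slice, no accounting.
func (o *ostreamSketch) WriteByte(b byte) {
	o.rawBuffer = append(o.rawBuffer, b)
}

// Rawbytes mirrors the new API: a read-only view of the raw buffer with no
// ownership transfer; callers treat it as a snapshot and hold whatever lock
// protects the stream.
func (o *ostreamSketch) Rawbytes() []byte {
	return o.rawBuffer
}

// Discard mirrors the ownership-transfer path: only here is the wrapper
// "repaired" to cover everything written so far before being handed off.
func (o *ostreamSketch) Discard() *trackedBytes {
	o.tracked.reset(o.rawBuffer)
	out := o.tracked
	o.rawBuffer, o.tracked = nil, &trackedBytes{}
	return out
}

func main() {
	stream := &ostreamSketch{tracked: &trackedBytes{}}
	stream.WriteByte(0xCA)
	stream.WriteByte(0xFE)
	fmt.Printf("raw view: %x\n", stream.Rawbytes()) // cafe, no accounting touched
	owned := stream.Discard()
	fmt.Printf("owned:    %x\n", owned.bytes())     // cafe, via the tracked wrapper
}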
13 changes: 7 additions & 6 deletions src/dbnode/encoding/ostream_test.go
@@ -29,6 +29,7 @@ import (
)

var (
nilBytes []byte
testBytesPool = newTestCheckedBytesPool()
)

@@ -63,8 +64,8 @@ func testWriteBits(t *testing.T, o OStream) {
for _, input := range inputs {
os.WriteBits(input.value, input.numBits)
require.Equal(t, input.expectedBytes, os.rawBuffer)
checked, _ := os.Rawbytes()
require.Equal(t, input.expectedBytes, checked.Bytes())
b, _ := os.Rawbytes()
require.Equal(t, input.expectedBytes, b)
require.Equal(t, input.expectedPos, os.pos)
}
require.False(t, os.Empty())
@@ -85,8 +86,8 @@ func testWriteBytes(t *testing.T, o OStream) {

require.Equal(t, rawBytes, os.rawBuffer)

checked, _ := os.Rawbytes()
require.Equal(t, rawBytes, checked.Bytes())
b, _ := os.Rawbytes()
require.Equal(t, rawBytes, b)

require.Equal(t, 8, os.pos)
}
@@ -108,8 +109,8 @@ func testResetOStream(t *testing.T, o OStream) {
require.Equal(t, 0, os.Len())
require.Equal(t, 0, os.pos)

checked, _ := os.Rawbytes()
require.Equal(t, nil, checked)
b, _ := os.Rawbytes()
require.Equal(t, nilBytes, b)
}

func BenchmarkWriteBytes(b *testing.B) {
3 changes: 2 additions & 1 deletion src/dbnode/encoding/scheme.go
@@ -219,7 +219,8 @@ func newMarkerEncodingScheme(
tmp := NewOStream(checked.NewBytes(nil, nil), false, nil)
tmp.WriteBits(uint64(i)>>uint(8-pos), pos)
WriteSpecialMarker(tmp, scheme, endOfStream)
tail, _ := tmp.Rawbytes()
rawBytes, _ := tmp.Rawbytes()
tail := checked.NewBytes(rawBytes, nil)
scheme.tails[i][j] = tail
}
}
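This appears to be the one call site in the diff that still needs an owned checked.Bytes, so rather than changing what it does it wraps the raw slice itself. Together with the encoder.go change above, it shows the two caller patterns under the new signature. A minimal usage sketch follows; the import paths and the assumption that NewOStream is usable from outside the encoding package are mine, mirroring the constructor call visible above.

package main

import (
	"fmt"

	"github.com/m3db/m3/src/dbnode/encoding"
	"github.com/m3db/m3x/checked"
)

func main() {
	// Construction mirrors the newMarkerEncodingScheme call site above.
	stream := encoding.NewOStream(checked.NewBytes(nil, nil), false, nil)
	stream.WriteBits(0xCAFE, 16)

	// Pattern 1: read-only access goes straight to the raw slice.
	raw, pos := stream.Rawbytes()
	fmt.Printf("bytes=%x bit pos=%d\n", raw, pos)

	// Pattern 2: when an owned, ref-counted buffer is needed, wrap (or copy)
	// the raw bytes into a checked.Bytes, as the marker-scheme tails do above.
	tail := checked.NewBytes(raw, nil)
	_ = tail
}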
2 changes: 1 addition & 1 deletion src/dbnode/encoding/types.go
@@ -263,7 +263,7 @@ type OStream interface {
WriteBytes(bytes []byte)
Reset(buffer checked.Bytes)
Discard() checked.Bytes
Rawbytes() (checked.Bytes, int)
Rawbytes() ([]byte, int)
}

// EncoderPool provides a pool for encoders
84 changes: 84 additions & 0 deletions src/dbnode/storage/series/series_parallel_test.go
@@ -0,0 +1,84 @@
// +build big
//
// Copyright (c) 2016 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package series

import (
"sync"
"testing"
"time"

"github.com/m3db/m3x/context"
"github.com/m3db/m3x/ident"
xtime "github.com/m3db/m3x/time"

"github.com/stretchr/testify/assert"
)

// TestSeriesWriteReadParallel is a regression test that was added to capture panics that might
// arise when many parallel writes and reads are happening at the same time.
func TestSeriesWriteReadParallel(t *testing.T) {
var (
numWorkers = 100
numStepsPerWorker = numWorkers * 100
opts = newSeriesTestOptions()
curr = time.Now()
series = NewDatabaseSeries(ident.StringID("foo"), ident.Tags{}, opts).(*dbSeries)
)

_, err := series.Bootstrap(nil)
assert.NoError(t, err)

ctx := context.NewContext()
defer ctx.Close()

wg := sync.WaitGroup{}
wg.Add(1)
go func() {
for i := 0; i < numStepsPerWorker; i++ {
wasWritten, err := series.Write(
ctx, curr.Add(time.Duration(i)*time.Nanosecond), float64(i), xtime.Second, nil)
if err != nil {
panic(err)
}
if !wasWritten {
panic("write failed")
}
}
wg.Done()
}()

// Outer loop so that reads are competing with other reads, not just writes.
for j := 0; j < numWorkers; j++ {
wg.Add(1)
go func() {
for i := 0; i < numStepsPerWorker; i++ {
_, err := series.ReadEncoded(ctx, curr.Add(-5*time.Minute), curr.Add(time.Minute))
if err != nil {
panic(err)
}
}
wg.Done()
}()
}
wg.Wait()
}
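Note the big build tag at the top of the file: to run just this regression test locally, something along the lines of go test -tags big -race -run TestSeriesWriteReadParallel ./src/dbnode/storage/series/ should exercise it. The -race flag is my suggestion rather than anything the PR prescribes, but a test whose whole point is concurrent reads and writes is exactly where the race detector earns its keep.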