Skip to content

Commit

Permalink
Fix Ostream panic + regression test
Browse files Browse the repository at this point in the history
  • Loading branch information
Richard Artoul committed Mar 8, 2019
1 parent 2454a92 commit 794654c
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 27 deletions.
4 changes: 2 additions & 2 deletions src/dbnode/encoding/m3tsz/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,7 @@ func (enc *encoder) segment(resType resultType) ts.Segment {
// of the encoder data.
var head checked.Bytes
buffer, pos := enc.os.Rawbytes()
lastByte := buffer.Bytes()[length-1]
lastByte := buffer[length-1]
if resType == byRefResultType {
// Take ref from the ostream
head = enc.os.Discard()
Expand All @@ -622,7 +622,7 @@ func (enc *encoder) segment(resType resultType) ts.Segment {
defer head.DecRef()

// Copy up to last byte
head.AppendAll(buffer.Bytes()[:length-1])
head.AppendAll(buffer[:length-1])
}

// Take a shared ref to a known good tail
Expand Down
18 changes: 9 additions & 9 deletions src/dbnode/encoding/m3tsz/encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestWriteDeltaOfDeltaTimeUnitUnchanged(t *testing.T) {
encoder.Reset(testStartTime, 0)
encoder.writeDeltaOfDeltaTimeUnitUnchanged(0, input.delta, input.timeUnit)
b, p := encoder.os.Rawbytes()
require.Equal(t, input.expectedBytes, b.Bytes())
require.Equal(t, input.expectedBytes, b)
require.Equal(t, input.expectedPos, p)
}
}
Expand All @@ -90,7 +90,7 @@ func TestWriteDeltaOfDeltaTimeUnitChanged(t *testing.T) {
encoder.Reset(testStartTime, 0)
encoder.writeDeltaOfDeltaTimeUnitChanged(0, input.delta)
b, p := encoder.os.Rawbytes()
require.Equal(t, input.expectedBytes, b.Bytes())
require.Equal(t, input.expectedBytes, b)
require.Equal(t, input.expectedPos, p)
}
}
Expand All @@ -111,7 +111,7 @@ func TestWriteValue(t *testing.T) {
encoder.Reset(testStartTime, 0)
encoder.writeXOR(input.previousXOR, input.currentXOR)
b, p := encoder.os.Rawbytes()
require.Equal(t, input.expectedBytes, b.Bytes())
require.Equal(t, input.expectedBytes, b)
require.Equal(t, input.expectedPos, p)
}
}
Expand Down Expand Up @@ -144,7 +144,7 @@ func TestWriteAnnotation(t *testing.T) {
encoder.Reset(testStartTime, 0)
encoder.writeAnnotation(input.annotation)
b, p := encoder.os.Rawbytes()
require.Equal(t, input.expectedBytes, b.Bytes())
require.Equal(t, input.expectedBytes, b)
require.Equal(t, input.expectedPos, p)
}
}
Expand Down Expand Up @@ -192,7 +192,7 @@ func TestWriteTimeUnit(t *testing.T) {
encoder.tu = xtime.None
assert.Equal(t, input.expectedResult, encoder.writeTimeUnit(input.timeUnit))
b, p := encoder.os.Rawbytes()
assert.Equal(t, input.expectedBytes, b.Bytes())
assert.Equal(t, input.expectedBytes, b)
assert.Equal(t, input.expectedPos, p)
}
}
Expand Down Expand Up @@ -229,7 +229,7 @@ func TestEncodeNoAnnotation(t *testing.T) {
}

b, p := encoder.os.Rawbytes()
require.Equal(t, expectedBuffer, b.Bytes())
require.Equal(t, expectedBuffer, b)
require.Equal(t, 6, p)
}

Expand Down Expand Up @@ -262,7 +262,7 @@ func TestEncodeWithAnnotation(t *testing.T) {
}

b, p := encoder.os.Rawbytes()
require.Equal(t, expectedBuffer, b.Bytes())
require.Equal(t, expectedBuffer, b)
require.Equal(t, 4, p)

expectedBytes := []byte{
Expand Down Expand Up @@ -373,7 +373,7 @@ func TestEncoderResets(t *testing.T) {
require.Equal(t, 0, enc.os.Len())
require.Equal(t, nil, enc.Stream())
b, _ := enc.os.Rawbytes()
require.Equal(t, []byte{}, b.Bytes())
require.Equal(t, []byte{}, b)

enc.Encode(ts.Datapoint{now, 13}, xtime.Second, nil)
require.True(t, enc.os.Len() > 0)
Expand All @@ -382,7 +382,7 @@ func TestEncoderResets(t *testing.T) {
require.Equal(t, 0, enc.os.Len())
require.Equal(t, nil, enc.Stream())
b, _ = enc.os.Rawbytes()
require.Equal(t, []byte{}, b.Bytes())
require.Equal(t, []byte{}, b)
}

func TestEncoderNumEncoded(t *testing.T) {
Expand Down
18 changes: 10 additions & 8 deletions src/dbnode/encoding/ostream.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,15 @@ const (

// Ostream encapsulates a writable stream.
type ostream struct {
// We want to use a checked.Bytes when exposing or returning the buffer
// We want to use a checked.Bytes when transferring ownership of the buffer
// of the ostream. Unfortunately, the accounting overhead of going through
// the checked.Bytes for every write is massive. As a result, we store both
// the rawBuffer that backs the checked.Bytes AND the checked.Bytes themselves
// in this struct.
//
// That way, whenever we're writing to the buffer we can avoid the cost accounting
// overhead entirely, but when the data needs to be exposed outside of this datastructure
// or ownership of the data needs to be transferred, then we use the checked.Bytes, which
// is when the accounting really matters anyways.
// overhead entirely, but when the data needs to be transferred to another owner
// we use the checked.Bytes, which is when the accounting really matters anyways.
//
// The rawBuffer and checked.Bytes may get out of sync as the rawBuffer is written to,
// but thats fine because we perform a "repair" by resetting the checked.Bytes underlying
Expand Down Expand Up @@ -232,10 +231,13 @@ func (os *ostream) Reset(buffer checked.Bytes) {
}
}

// Rawbytes returns the Ostream's raw bytes
func (os *ostream) Rawbytes() (checked.Bytes, int) {
os.repairCheckedBytes()
return os.checked, os.pos
// Rawbytes returns the Ostream's raw bytes. Note that this does not transfer ownership
// of the data and bypasses the checked.Bytes accounting so callers should:
// 1. Only use the returned slice as a "read-only" snapshot of the data in a context
//    where the caller has at least a read lock on the ostream itself, since the
//    backing rawBuffer may be mutated by concurrent writes.
// 2. Use this function with care.
func (os *ostream) Rawbytes() ([]byte, int) {
return os.rawBuffer, os.pos
}

// repairCheckedBytes makes sure that the checked.Bytes wraps the rawBuffer as
Expand Down
13 changes: 7 additions & 6 deletions src/dbnode/encoding/ostream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
)

var (
nilBytes []byte
testBytesPool = newTestCheckedBytesPool()
)

Expand Down Expand Up @@ -63,8 +64,8 @@ func testWriteBits(t *testing.T, o OStream) {
for _, input := range inputs {
os.WriteBits(input.value, input.numBits)
require.Equal(t, input.expectedBytes, os.rawBuffer)
checked, _ := os.Rawbytes()
require.Equal(t, input.expectedBytes, checked.Bytes())
b, _ := os.Rawbytes()
require.Equal(t, input.expectedBytes, b)
require.Equal(t, input.expectedPos, os.pos)
}
require.False(t, os.Empty())
Expand All @@ -85,8 +86,8 @@ func testWriteBytes(t *testing.T, o OStream) {

require.Equal(t, rawBytes, os.rawBuffer)

checked, _ := os.Rawbytes()
require.Equal(t, rawBytes, checked.Bytes())
b, _ := os.Rawbytes()
require.Equal(t, rawBytes, b)

require.Equal(t, 8, os.pos)
}
Expand All @@ -108,8 +109,8 @@ func testResetOStream(t *testing.T, o OStream) {
require.Equal(t, 0, os.Len())
require.Equal(t, 0, os.pos)

checked, _ := os.Rawbytes()
require.Equal(t, nil, checked)
b, _ := os.Rawbytes()
require.Equal(t, nilBytes, b)
}

func BenchmarkWriteBytes(b *testing.B) {
Expand Down
3 changes: 2 additions & 1 deletion src/dbnode/encoding/scheme.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,8 @@ func newMarkerEncodingScheme(
tmp := NewOStream(checked.NewBytes(nil, nil), false, nil)
tmp.WriteBits(uint64(i)>>uint(8-pos), pos)
WriteSpecialMarker(tmp, scheme, endOfStream)
tail, _ := tmp.Rawbytes()
rawBytes, _ := tmp.Rawbytes()
tail := checked.NewBytes(rawBytes, nil)
scheme.tails[i][j] = tail
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/dbnode/encoding/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ type OStream interface {
WriteBytes(bytes []byte)
Reset(buffer checked.Bytes)
Discard() checked.Bytes
Rawbytes() (checked.Bytes, int)
Rawbytes() ([]byte, int)
}

// EncoderPool provides a pool for encoders
Expand Down
50 changes: 50 additions & 0 deletions src/dbnode/storage/series/series_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"errors"
"fmt"
"io"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -849,6 +850,55 @@ func TestSeriesWriteReadFromTheSameBucket(t *testing.T) {
require.Equal(t, 3, len(values))
}

// TestSeriesWriteReadParallel is a regression test that was added to capture panics that might
// arise when many parallel writes and reads are happening at the same time.
// One writer goroutine issues sequential writes while numWorkers reader
// goroutines concurrently call ReadEncoded; the test passes if no goroutine
// panics before wg.Wait() returns.
func TestSeriesWriteReadParallel(t *testing.T) {
var (
// Number of concurrent reader goroutines.
numWorkers = 100
// Number of write (and read) iterations each goroutine performs.
numStepsPerWorker = 100
opts = newSeriesTestOptions()
curr = time.Now()
series = NewDatabaseSeries(ident.StringID("foo"), ident.Tags{}, opts).(*dbSeries)
)

// Series must be bootstrapped before it will accept writes.
_, err := series.Bootstrap(nil)
assert.NoError(t, err)

ctx := context.NewContext()
defer ctx.Close()

wg := sync.WaitGroup{}
wg.Add(1)
// Single writer goroutine: each write lands at a distinct nanosecond offset
// so every datapoint is accepted as a new value.
go func() {
for i := 0; i < numStepsPerWorker; i++ {
wasWritten, err := series.Write(
ctx, curr.Add(time.Duration(i)*time.Nanosecond), float64(i), xtime.Second, nil)
if err != nil {
// panic rather than t.Fatal: FailNow must not be called from a
// goroutine other than the one running the test function.
panic(err)
}
if !wasWritten {
panic("write failed")
}
}
wg.Done()
}()

// Outer loop so that reads are competing with other reads, not just writes.
for j := 0; j < numWorkers; j++ {
wg.Add(1)
go func() {
for i := 0; i < numStepsPerWorker; i++ {
// Read a window that covers all datapoints written above.
_, err := series.ReadEncoded(ctx, curr.Add(-5*time.Minute), curr.Add(time.Minute))
if err != nil {
// See note above on panic vs. t.Fatal in helper goroutines.
panic(err)
}
}
wg.Done()
}()
}
// Block until the writer and all readers have finished.
wg.Wait()
}

func TestSeriesCloseNonCacheLRUPolicy(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand Down

0 comments on commit 794654c

Please sign in to comment.