Skip to content

Commit

Permalink
PR response
Browse files Browse the repository at this point in the history
  • Loading branch information
arnikola committed Dec 10, 2019
1 parent 78247db commit 33bb32c
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 143 deletions.
2 changes: 1 addition & 1 deletion src/query/block/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ func (b *ucContainerBlock) MultiSeriesIter(
batch, err := bl.MultiSeriesIter(concurrency)
if err != nil {
// NB: do not have to set the iterator error here, since not all
// contained blocks necessarily allow mutli series iteration.
// contained blocks necessarily allow multi series iteration.
return nil, err
}

Expand Down
38 changes: 13 additions & 25 deletions src/query/storage/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/ts"
xcost "github.com/m3db/m3/src/x/cost"
xerrors "github.com/m3db/m3/src/x/errors"
xsync "github.com/m3db/m3/src/x/sync"
xtime "github.com/m3db/m3/src/x/time"
)
Expand Down Expand Up @@ -313,45 +314,32 @@ func decompressConcurrently(
metadata block.ResultMetadata,
tagOptions models.TagOptions,
) (*FetchResult, error) {
seriesList := make([]*ts.Series, len(iters))
errorCh := make(chan error, 1)
done := make(chan struct{})
stopped := func() bool {
select {
case <-done:
return true
default:
return false
}
}
var (
seriesList = make([]*ts.Series, len(iters))

wg sync.WaitGroup
multiErr xerrors.MultiError
mu sync.Mutex
)

var wg sync.WaitGroup
for i, iter := range iters {
i, iter := i, iter
wg.Add(1)
readWorkerPool.Go(func() {
defer wg.Done()
if stopped() {
return
}

series, err := iteratorToTsSeries(iter, enforcer, tagOptions)
if err != nil {
// Return the first error that is encountered.
select {
case errorCh <- err:
close(done)
default:
}
return
mu.Lock()
multiErr = multiErr.Add(err)
mu.Unlock()
}

seriesList[i] = series
})
}

wg.Wait()
close(errorCh)
if err := <-errorCh; err != nil {
if err := multiErr.LastError(); err != nil {
return nil, err
}

Expand Down
94 changes: 4 additions & 90 deletions src/query/storage/converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ import (
"github.com/m3db/m3/src/query/test/seriesiter"
"github.com/m3db/m3/src/query/ts"
xcost "github.com/m3db/m3/src/x/cost"
"github.com/m3db/m3/src/x/ident"
xsync "github.com/m3db/m3/src/x/sync"
xtest "github.com/m3db/m3/src/x/test"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -110,7 +110,7 @@ func verifyExpandSeries(
}

func testExpandSeries(t *testing.T, ex bool, pools xsync.PooledWorkerPool) {
ctrl := gomock.NewController(t)
ctrl := xtest.NewController(t)

for i := 0; i < 100; i++ {
verifyExpandSeries(t, ctrl, i, ex, pools)
Expand Down Expand Up @@ -138,92 +138,6 @@ func TestExpandSeriesSmallValidPools(t *testing.T) {
testExpandSeries(t, true, pool)
}

func TestFailingExpandSeriesValidPools(t *testing.T) {
var (
numValidSeries = 8
numValues = 2
poolSize = 2
numUncalled = 10
)
pool, err := xsync.NewPooledWorkerPool(poolSize,
xsync.NewPooledWorkerPoolOptions())
require.NoError(t, err)
pool.Init()
ctrl := gomock.NewController(t)

iters := seriesiter.NewMockSeriesIterSlice(ctrl,
seriesiter.NewMockValidTagGenerator(ctrl), numValidSeries, numValues)
// Add poolSize + 1 failing iterators; there can be slight timing
// inconsistencies which can sometimes cause failures in this test
// as one of the `uncalled` iterators gets unexpectedly used.
// This is not a big issue in practice, as all it means is one further
// iterator is expanded before erroring out.
for i := 0; i < poolSize+1; i++ {
invalidIter := encoding.NewMockSeriesIterator(ctrl)
invalidIter.EXPECT().ID().Return(ident.StringID("foo")).Times(1)

tags := ident.NewMockTagIterator(ctrl)
tags.EXPECT().Next().Return(false).MaxTimes(1)
tags.EXPECT().Remaining().Return(0).MaxTimes(1)
tags.EXPECT().Err().Return(errors.New("error")).MaxTimes(1)
invalidIter.EXPECT().Tags().Return(tags).MaxTimes(1)

iters = append(iters, invalidIter)
}

for i := 0; i < numUncalled; i++ {
uncalledIter := encoding.NewMockSeriesIterator(ctrl)
iters = append(iters, uncalledIter)
}

mockIters := encoding.NewMockSeriesIterators(ctrl)
mockIters.EXPECT().Iters().Return(iters).Times(1)
mockIters.EXPECT().Len().Return(len(iters)).Times(1)
mockIters.EXPECT().Close().Times(1)
enforcer := cost.NewMockChainedEnforcer(ctrl)
enforcer.EXPECT().Add(xcost.Cost(2)).Times(numValidSeries)

result, err := SeriesIteratorsToFetchResult(mockIters, pool, true,
block.NewResultMetadata(), enforcer, nil)
require.Nil(t, result)
require.EqualError(t, err, "error")
}

func TestOverLimit(t *testing.T) {
var (
numValidSeries = 8
numValues = 2
poolSize = 2
numUncalled = 10
)
pool, err := xsync.NewPooledWorkerPool(poolSize,
xsync.NewPooledWorkerPoolOptions())
require.NoError(t, err)
pool.Init()
ctrl := gomock.NewController(t)

iters := seriesiter.NewMockSeriesIterSlice(ctrl,
seriesiter.NewMockValidTagGenerator(ctrl), numValidSeries+poolSize+1, numValues)
for i := 0; i < numUncalled; i++ {
uncalledIter := encoding.NewMockSeriesIterator(ctrl)
iters = append(iters, uncalledIter)
}

mockIters := encoding.NewMockSeriesIterators(ctrl)
mockIters.EXPECT().Iters().Return(iters).Times(1)
mockIters.EXPECT().Len().Return(len(iters)).Times(1)
mockIters.EXPECT().Close().Times(1)
enforcer := cost.NewMockChainedEnforcer(ctrl)
enforcer.EXPECT().Add(xcost.Cost(2)).Times(numValidSeries)
enforcer.EXPECT().Add(xcost.Cost(2)).
Return(xcost.Report{Error: errors.New("error")}).MinTimes(1)

result, err := SeriesIteratorsToFetchResult(mockIters, pool, true,
block.NewResultMetadata(), enforcer, nil)
require.Nil(t, result)
require.EqualError(t, err, "error")
}

var (
name = []byte("foo")
value = []byte("bar")
Expand Down Expand Up @@ -325,7 +239,7 @@ var (

func TestIteratorToTsSeries(t *testing.T) {
t.Run("errors on iterator error", func(t *testing.T) {
ctrl := gomock.NewController(t)
ctrl := xtest.NewController(t)
mockIter := encoding.NewMockSeriesIterator(ctrl)

expectedErr := errors.New("expected")
Expand All @@ -343,7 +257,7 @@ func TestIteratorToTsSeries(t *testing.T) {
}

func TestFetchResultToPromResult(t *testing.T) {
ctrl := gomock.NewController(t)
ctrl := xtest.NewController(t)
defer ctrl.Finish()

now := time.Now()
Expand Down
43 changes: 17 additions & 26 deletions src/query/storage/prom_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,24 @@ import (
"github.com/m3db/m3/src/query/generated/proto/prompb"
"github.com/m3db/m3/src/query/models"
xcost "github.com/m3db/m3/src/x/cost"
xerrors "github.com/m3db/m3/src/x/errors"
"github.com/m3db/m3/src/x/ident"
xsync "github.com/m3db/m3/src/x/sync"
)

func cloneBytes(b []byte) []byte {
return append(make([]byte, 0, len(b)), b...)
}

func tagIteratorToLabels(
identTags ident.TagIterator,
) ([]prompb.Label, error) {
labels := make([]prompb.Label, 0, identTags.Remaining())
for identTags.Next() {
identTag := identTags.Current()
labels = append(labels, prompb.Label{
Name: identTag.Name.Bytes(),
Value: identTag.Value.Bytes(),
Name: cloneBytes(identTag.Name.Bytes()),
Value: cloneBytes(identTag.Value.Bytes()),
})
}

Expand Down Expand Up @@ -123,45 +128,31 @@ func toPromConcurrently(
tagOptions models.TagOptions,
) (PromResult, error) {
seriesList := make([]*prompb.TimeSeries, len(iters))
errorCh := make(chan error, 1)
done := make(chan struct{})
stopped := func() bool {
select {
case <-done:
return true
default:
return false
}
}

var wg sync.WaitGroup
var (
wg sync.WaitGroup
multiErr xerrors.MultiError
mu sync.Mutex
)

for i, iter := range iters {
i, iter := i, iter
wg.Add(1)
readWorkerPool.Go(func() {
defer wg.Done()
if stopped() {
return
}

series, err := iteratorToPromResult(iter, enforcer, tagOptions)
if err != nil {
// Return the first error that is encountered.
select {
case errorCh <- err:
close(done)
default:
}
return
mu.Lock()
multiErr = multiErr.Add(err)
mu.Unlock()
}

seriesList[i] = series
})
}

wg.Wait()
close(errorCh)
if err := <-errorCh; err != nil {
if err := multiErr.LastError(); err != nil {
return PromResult{}, err
}

Expand Down
56 changes: 55 additions & 1 deletion src/query/storage/prom_converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ import (
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/test/seriesiter"
"github.com/m3db/m3/src/query/ts"
"github.com/m3db/m3/src/x/checked"
xcost "github.com/m3db/m3/src/x/cost"
"github.com/m3db/m3/src/x/ident"
"github.com/m3db/m3/src/x/pool"
xsync "github.com/m3db/m3/src/x/sync"

"github.com/golang/mock/gomock"
Expand Down Expand Up @@ -82,7 +85,7 @@ func verifyExpandPromSeries(
func testExpandPromSeries(t *testing.T, ex bool, pools xsync.PooledWorkerPool) {
ctrl := gomock.NewController(t)

for i := 0; i < 100; i++ {
for i := 0; i < 10; i++ {
verifyExpandPromSeries(t, ctrl, i, ex, pools)
}
}
Expand Down Expand Up @@ -166,3 +169,54 @@ func TestIteratorsToPromResult(t *testing.T) {

assert.Equal(t, expected, result)
}

// overwrite overwrites existing tags with `!!!` literals.
type overwrite func()

func setupTags(name, value string) (ident.Tags, overwrite) {
var buckets = []pool.Bucket{{Capacity: 100, Count: 2}}
bytesPool := pool.NewCheckedBytesPool(buckets, nil,
func(sizes []pool.Bucket) pool.BytesPool {
return pool.NewBytesPool(sizes, nil)
})

bytesPool.Init()
getFromPool := func(id string) checked.Bytes {
pID := bytesPool.Get(len(id))
pID.IncRef()
pID.AppendAll([]byte(id))
pID.DecRef()
return pID
}

idPool := ident.NewPool(bytesPool, ident.PoolOptions{})
tags := idPool.Tags()
tags.Append(idPool.BinaryTag(getFromPool(name), getFromPool(value)))

overwrite := func() {
tags.Finalize()
getFromPool("!!!")
getFromPool("!!!")
}

return tags, overwrite
}

func TestTagIteratorToLabels(t *testing.T) {
name := "foo"
value := "bar"
tags, overwrite := setupTags(name, value)
tagIter := ident.NewTagsIterator(tags)
labels, err := tagIteratorToLabels(tagIter)
require.NoError(t, err)

verifyTags := func() {
require.Equal(t, 1, len(labels))
assert.Equal(t, name, string(labels[0].GetName()))
assert.Equal(t, value, string(labels[0].GetValue()))
}

verifyTags()
overwrite()
verifyTags()
}

0 comments on commit 33bb32c

Please sign in to comment.