Skip to content

Commit

Permalink
Fix tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
notbdu committed Jul 1, 2020
1 parent e287b8f commit 0affb08
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 202 deletions.
2 changes: 1 addition & 1 deletion src/dbnode/persist/fs/index_lookup_prop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func calculateExpectedChecksum(t *testing.T, filePath string) uint32 {

func writeTestSummariesData(w DataFileSetWriter, writes []generatedWrite) error {
for _, write := range writes {
metadata := persist.NewMetadataSeriesIDAndTags(write.id, write.tags,
metadata := persist.NewMetadataFromIDAndTags(write.id, write.tags,
persist.MetadataOptions{})
err := w.Write(metadata, write.data, write.checksum)
if err != nil {
Expand Down
13 changes: 10 additions & 3 deletions src/dbnode/persist/fs/retriever_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (

"github.com/m3db/m3/src/cluster/shard"
"github.com/m3db/m3/src/dbnode/digest"
"github.com/m3db/m3/src/dbnode/persist"
"github.com/m3db/m3/src/dbnode/sharding"
"github.com/m3db/m3/src/dbnode/storage/block"
"github.com/m3db/m3/src/dbnode/storage/index/convert"
Expand Down Expand Up @@ -299,7 +300,9 @@ func testBlockRetrieverHighConcurrentSeeks(t *testing.T, shouldCacheShardIndices
}

tags := testTagsFromIDAndVolume(id.String(), volume)
err := w.Write(id, tags, data, digest.Checksum(data.Bytes()))
metadata := persist.NewMetadataFromIDAndTags(id, tags,
persist.MetadataOptions{})
err := w.Write(metadata, data, digest.Checksum(data.Bytes()))
require.NoError(t, err)
}
closer()
Expand Down Expand Up @@ -558,7 +561,9 @@ func TestBlockRetrieverIDDoesNotExist(t *testing.T) {
data := checked.NewBytes([]byte("Hello world!"), nil)
data.IncRef()
defer data.DecRef()
err = w.Write(ident.StringID("exists"), ident.Tags{}, data, digest.Checksum(data.Bytes()))
metadata := persist.NewMetadataFromIDAndTags(ident.StringID("exists"), ident.Tags{},
persist.MetadataOptions{})
err = w.Write(metadata, data, digest.Checksum(data.Bytes()))
assert.NoError(t, err)
closer()

Expand Down Expand Up @@ -626,7 +631,9 @@ func TestBlockRetrieverOnlyCreatesTagItersIfTagsExists(t *testing.T) {
data.IncRef()
defer data.DecRef()

err = w.Write(ident.StringID(write.id), write.tags, data, digest.Checksum(data.Bytes()))
metadata := persist.NewMetadataFromIDAndTags(ident.StringID(write.id), write.tags,
persist.MetadataOptions{})
err = w.Write(metadata, data, digest.Checksum(data.Bytes()))
require.NoError(t, err)
}
closer()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
package commitlog

import (
"fmt"
"testing"
"time"

Expand All @@ -31,8 +30,6 @@ import (
"github.com/m3db/m3/src/dbnode/storage/bootstrap"
"github.com/m3db/m3/src/dbnode/storage/bootstrap/result"
"github.com/m3db/m3/src/dbnode/ts"
idxpersist "github.com/m3db/m3/src/m3ninx/persist"
"github.com/m3db/m3/src/x/context"
"github.com/m3db/m3/src/x/ident"
"github.com/m3db/m3/src/x/pool"
"github.com/m3db/m3/src/x/serialize"
Expand Down Expand Up @@ -114,20 +111,20 @@ func TestBootstrapIndex(t *testing.T) {
ID: ident.StringID("baz"), EncodedTags: bazTags}
// Make sure we can handle series that don't have tags.
untagged := ts.Series{UniqueIndex: 3, Namespace: testNamespaceID,
Shard: shardn(5), ID: ident.StringID("untagged"), Tags: ident.Tags{}}
Shard: shardn(5), ID: ident.StringID("untagged")}
// Make sure we skip series that are not within the bootstrap range.
outOfRange := ts.Series{UniqueIndex: 4, Namespace: testNamespaceID,
Shard: shardn(3), ID: ident.StringID("outOfRange"), Tags: ident.Tags{}}
Shard: shardn(3), ID: ident.StringID("outOfRange")}
// Make sure we skip and dont panic on writes for shards that are higher than the maximum we're trying to bootstrap.
shardTooHigh := ts.Series{UniqueIndex: 5, Namespace: testNamespaceID,
Shard: shardn(100), ID: ident.StringID("shardTooHigh"), Tags: ident.Tags{}}
Shard: shardn(100), ID: ident.StringID("shardTooHigh")}
// Make sure we skip series for shards that have no requested bootstrap ranges. The shard for this write needs
// to be less than the highest shard we actually plan to bootstrap.
noShardBootstrapRange := ts.Series{UniqueIndex: 6, Namespace: testNamespaceID,
Shard: shardn(4), ID: ident.StringID("noShardBootstrapRange"), Tags: ident.Tags{}}
Shard: shardn(4), ID: ident.StringID("noShardBootstrapRange")}
// Make sure it handles multiple namespaces
someOtherNamespace := ts.Series{UniqueIndex: 7, Namespace: testNamespaceID2,
Shard: shardn(0), ID: ident.StringID("series_OtherNamespace"), Tags: ident.Tags{}}
Shard: shardn(0), ID: ident.StringID("series_OtherNamespace")}

valuesNs := testValues{
{foo, start, 1.0, xtime.Second, nil},
Expand Down Expand Up @@ -233,192 +230,3 @@ func TestBootstrapIndexEmptyShardTimeRanges(t *testing.T) {
tester.EnsureNoLoadedBlocks()
tester.EnsureNoWrites()
}

func verifyIndexResultsAreCorrect(
values testValues,
seriesNotToExpect map[string]struct{},
indexResults result.IndexResults,
indexBlockSize time.Duration,
) error {
expectedIndexBlocks := map[xtime.UnixNano]map[string]map[string]string{}
for _, value := range values {
if _, shouldNotExpect := seriesNotToExpect[value.s.ID.String()]; shouldNotExpect {
continue
}

indexBlockStart := value.t.Truncate(indexBlockSize)
expectedSeries, ok := expectedIndexBlocks[xtime.ToUnixNano(indexBlockStart)]
if !ok {
expectedSeries = map[string]map[string]string{}
expectedIndexBlocks[xtime.ToUnixNano(indexBlockStart)] = expectedSeries
}

seriesID := string(value.s.ID.Bytes())

existingTags, ok := expectedSeries[seriesID]
if !ok {
existingTags = map[string]string{}
expectedSeries[seriesID] = existingTags
}
for _, tag := range value.s.Tags.Values() {
existingTags[tag.Name.String()] = tag.Value.String()
}
}

for indexBlockStart, expectedSeries := range expectedIndexBlocks {
indexBlockByVolumeType, ok := indexResults[indexBlockStart]
if !ok {
return fmt.Errorf("missing index block: %v", indexBlockStart.ToTime().String())
}
indexBlock, ok := indexBlockByVolumeType.GetBlock(idxpersist.DefaultIndexVolumeType)
if !ok {
return fmt.Errorf("missing index block: %v", indexBlockStart.ToTime().String())
}

if indexBlock.Fulfilled().IsEmpty() {
return fmt.Errorf("index-block %v fulfilled is empty", indexBlockStart)
}

for _, seg := range indexBlock.Segments() {
reader, err := seg.Reader()
if err != nil {
return err
}

docs, err := reader.AllDocs()
if err != nil {
return err
}

seenSeries := map[string]struct{}{}
for docs.Next() {
curr := docs.Current()

_, ok := seenSeries[string(curr.ID)]
if ok {
return fmt.Errorf(
"saw duplicate series: %v for block %v",
string(curr.ID), indexBlockStart.ToTime().String())
}
seenSeries[string(curr.ID)] = struct{}{}

expectedTags := expectedSeries[string(curr.ID)]
matchingTags := map[string]struct{}{}
for _, tag := range curr.Fields {
if _, ok := matchingTags[string(tag.Name)]; ok {
return fmt.Errorf("saw duplicate tag: %v for id: %v", tag.Name, string(curr.ID))
}
matchingTags[string(tag.Name)] = struct{}{}

tagValue, ok := expectedTags[string(tag.Name)]
if !ok {
return fmt.Errorf("saw unexpected tag: %v for id: %v", tag.Name, string(curr.ID))
}

if tagValue != string(tag.Value) {
return fmt.Errorf(
"tag values for series: %v do not match. Expected: %v but got: %v",
curr.ID, tagValue, string(tag.Value),
)
}
}

if len(expectedTags) != len(matchingTags) {
return fmt.Errorf(
"number of tags for series: %v do not match. Expected: %v, but got: %v",
string(curr.ID), len(expectedTags), len(matchingTags),
)
}
}

if docs.Err() != nil {
return docs.Err()
}

if err := docs.Close(); err != nil {
return err
}

if len(expectedSeries) != len(seenSeries) {
return fmt.Errorf(
"expected %v series, but got %v series", len(expectedSeries), len(seenSeries))
}
}
}

return nil
}

func TestBootstrapIndexFailsForDecodedTags(t *testing.T) {
var (
opts = testDefaultOpts
src = newCommitLogSource(opts, fs.Inspection{}).(*commitLogSource)
dataBlockSize = 2 * time.Hour
indexBlockSize = 4 * time.Hour
namespaceOptions = namespaceOptions.
SetRetentionOptions(
namespaceOptions.
RetentionOptions().
SetBlockSize(dataBlockSize),
).
SetIndexOptions(
namespaceOptions.
IndexOptions().
SetBlockSize(indexBlockSize).
SetEnabled(true),
)
)
md1, err := namespace.NewMetadata(testNamespaceID, namespaceOptions)
require.NoError(t, err)

now := time.Now()
start := now.Truncate(indexBlockSize)

fooTags := ident.NewTags(ident.StringTag("city", "ny"))

shardn := func(n int) uint32 { return uint32(n) }
foo := ts.Series{UniqueIndex: 0, Namespace: testNamespaceID, Shard: shardn(0),
ID: ident.StringID("foo"), Tags: fooTags}

values := testValues{
{foo, start, 1.0, xtime.Second, nil},
}

src.newIteratorFn = func(
_ commitlog.IteratorOpts,
) (commitlog.Iterator, []commitlog.ErrorWithPath, error) {
return newTestCommitLogIterator(values, nil), nil, nil
}

ranges := xtime.NewRanges(
xtime.Range{Start: start, End: start.Add(dataBlockSize)},
xtime.Range{Start: start.Add(dataBlockSize), End: start.Add(2 * dataBlockSize)},
xtime.Range{Start: start.Add(2 * dataBlockSize), End: start.Add(3 * dataBlockSize)})

// Don't include ranges for shard 4 as thats how we're testing the noShardBootstrapRange series.
targetRanges := result.NewShardTimeRanges().Set(
shardn(0),
ranges,
).Set(
shardn(1),
ranges,
).Set(
shardn(2),
ranges,
).Set(
shardn(5),
ranges,
)

tester := bootstrap.BuildNamespacesTester(t, testDefaultRunOpts, targetRanges, md1)
defer tester.Finish()

ctx := context.NewContext()
defer ctx.Close()

_, err = src.Read(ctx, tester.Namespaces)
require.Error(t, err)

tester.EnsureNoLoadedBlocks()
tester.EnsureNoWrites()
}
2 changes: 1 addition & 1 deletion src/dbnode/storage/index_query_concurrent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ func testNamespaceIndexHighConcurrentQueries(
for _, entry := range results.Results.Map().Iter() {
id := entry.Key().String()

doc, err := convert.FromMetricIter(entry.Key(), entry.Value())
doc, err := convert.FromSeriesIDAndTagIter(entry.Key(), entry.Value())
require.NoError(t, err)
if err != nil {
continue // this will fail the test anyway, but don't want to panic
Expand Down

0 comments on commit 0affb08

Please sign in to comment.