Skip to content

Commit

Permalink
[dbnode] Add shard to ID batch (#2764)
Browse files Browse the repository at this point in the history
  • Loading branch information
arnikola authored Nov 25, 2020
1 parent c1b9d47 commit 95bb87f
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 126 deletions.
24 changes: 2 additions & 22 deletions src/dbnode/storage/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -1029,8 +1029,8 @@ func (d *db) WideQuery(
streamedWideEntries := make([]block.StreamedWideEntry, 0, batchSize)
indexChecksumProcessor := func(batch *ident.IDBatch) error {
streamedWideEntries = streamedWideEntries[:0]
for _, id := range batch.IDs {
streamedWideEntry, err := d.fetchWideEntries(ctx, n, id, start)
for _, shardID := range batch.ShardIDs {
streamedWideEntry, err := n.FetchWideEntry(ctx, shardID.ID, start)
if err != nil {
return err
}
Expand Down Expand Up @@ -1058,26 +1058,6 @@ func (d *db) WideQuery(
return collectedChecksums, nil
}

// fetchWideEntries retrieves the streamed wide entry for id at start from the
// given namespace, wrapping the lookup in a sampled DBWideEntry trace span.
func (d *db) fetchWideEntries(
	ctx context.Context,
	ns databaseNamespace,
	id ident.ID,
	start time.Time,
) (block.StreamedWideEntry, error) {
	tracedCtx, span, isSampled := ctx.StartSampledTraceSpan(tracepoint.DBWideEntry)
	if isSampled {
		// Fields are only built when the span is actually sampled.
		nsField := opentracinglog.String("namespace", ns.ID().String())
		idField := opentracinglog.String("id", id.String())
		startField := xopentracing.Time("start", start)
		span.LogFields(nsField, idField, startField)
	}
	defer span.Finish()

	// The namespace lookup runs under the traced context returned above.
	return ns.FetchWideEntry(tracedCtx, id, start)
}

func (d *db) FetchBlocks(
ctx context.Context,
namespace ident.ID,
Expand Down
59 changes: 5 additions & 54 deletions src/dbnode/storage/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ import (
"github.com/m3db/m3/src/dbnode/ts"
"github.com/m3db/m3/src/dbnode/ts/writes"
xmetrics "github.com/m3db/m3/src/dbnode/x/metrics"
"github.com/m3db/m3/src/dbnode/x/xio"
"github.com/m3db/m3/src/m3ninx/idx"
xclock "github.com/m3db/m3/src/x/clock"
"github.com/m3db/m3/src/x/context"
Expand Down Expand Up @@ -310,57 +309,6 @@ func TestDatabaseWideQueryNamespaceNonExistent(t *testing.T) {
require.True(t, dberrors.IsUnknownNamespaceError(err))
}

// TestDatabaseWideEntry verifies that fetchWideEntries hands back the streamed
// wide entry produced by the namespace unchanged, both when the retrieved
// entry carries an ID and when it does not.
func TestDatabaseWideEntry(t *testing.T) {
	ctrl := xtest.NewController(t)
	defer ctrl.Finish()

	ctx := context.NewContext()
	defer ctx.Close()

	d, mapCh, _ := defaultTestDatabase(t, ctrl, Bootstrapped)
	defer func() {
		close(mapCh)
	}()

	nsID := ident.StringID("testns1")
	seriesID := ident.StringID("bar")
	end := time.Now()
	start := end.Add(-time.Hour)

	// First namespace fetch yields an entry that has an ID.
	indexChecksumWithID := block.NewMockStreamedWideEntry(ctrl)
	indexChecksumWithID.EXPECT().RetrieveWideEntry().
		Return(
			xio.WideEntry{
				ID:               ident.StringID("foo"),
				MetadataChecksum: 5,
			}, nil)
	mockNamespace := NewMockdatabaseNamespace(ctrl)
	mockNamespace.EXPECT().FetchWideEntry(ctx, seriesID, start).
		Return(indexChecksumWithID, nil)

	// Second namespace fetch yields an entry with no ID set.
	indexChecksumWithoutID := block.NewMockStreamedWideEntry(ctrl)
	indexChecksumWithoutID.EXPECT().RetrieveWideEntry().
		Return(xio.WideEntry{MetadataChecksum: 7}, nil)
	mockNamespace.EXPECT().FetchWideEntry(ctx, seriesID, start).
		Return(indexChecksumWithoutID, nil)
	d.namespaces.Set(nsID, mockNamespace)

	res, err := d.fetchWideEntries(ctx, mockNamespace, seriesID, start)
	require.NoError(t, err)
	checksum, err := res.RetrieveWideEntry()
	require.NoError(t, err)
	assert.Equal(t, "foo", checksum.ID.String())
	assert.Equal(t, 5, int(checksum.MetadataChecksum))

	res, err = d.fetchWideEntries(ctx, mockNamespace, seriesID, start)
	require.NoError(t, err)
	checksum, err = res.RetrieveWideEntry()
	// A single error check suffices here; the original repeated it twice.
	require.NoError(t, err)
	assert.Nil(t, checksum.ID)
	assert.Equal(t, 7, int(checksum.MetadataChecksum))
}

func TestDatabaseFetchBlocksNamespaceNonExistent(t *testing.T) {
ctrl := xtest.NewController(t)
defer ctrl.Finish()
Expand Down Expand Up @@ -965,7 +913,6 @@ func TestWideQuery(t *testing.T) {
}

exSpans := []string{
tracepoint.DBWideEntry,
tracepoint.DBWideQuery,
tracepoint.DBWideQuery,
"root",
Expand Down Expand Up @@ -1027,7 +974,11 @@ func testWideFunction(t *testing.T, testFn wideQueryTestFn, exSpans []string) {
assert.Equal(t, opts.BatchSize, wideOpts.BatchSize)
assert.Equal(t, opts.ShardsQueried, shards)
go func() {
batch := &ident.IDBatch{IDs: []ident.ID{ident.StringID("foo")}}
batch := &ident.IDBatch{
ShardIDs: []ident.ShardID{
{ID: ident.StringID("foo")},
},
}
batch.ReadyForProcessing()
collector <- batch
close(collector)
Expand Down
53 changes: 28 additions & 25 deletions src/dbnode/storage/index/wide_query_results.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type wideResults struct {
idPool ident.Pool

closed bool
idsOverflow []ident.ID
idsOverflow []ident.ShardID
batch *ident.IDBatch
batchCh chan<- *ident.IDBatch
batchSize int
Expand Down Expand Up @@ -76,17 +76,13 @@ func NewWideQueryResults(
nsID: namespaceID,
idPool: idPool,
batchSize: batchSize,
idsOverflow: make([]ident.ID, 0, batchSize),
idsOverflow: make([]ident.ShardID, 0, batchSize),
batch: &ident.IDBatch{
IDs: make([]ident.ID, 0, batchSize),
ShardIDs: make([]ident.ShardID, 0, batchSize),
},
batchCh: collector,
shards: opts.ShardsQueried,
}

if len(opts.ShardsQueried) > 0 {
// Only apply filter if there are shards to filter against.
results.shardFilter = shardFilter
batchCh: collector,
shardFilter: shardFilter,
shards: opts.ShardsQueried,
}

return results
Expand Down Expand Up @@ -118,7 +114,7 @@ func (r *wideResults) AddDocuments(batch []doc.Document) (int, int, error) {
return size, totalDocsCount, err
}

release := len(r.batch.IDs) == r.batchSize
release := len(r.batch.ShardIDs) == r.batchSize
if release {
r.releaseAndWaitWithLock()
r.releaseOverflowWithLock()
Expand All @@ -145,10 +141,15 @@ func (r *wideResults) addDocumentWithLock(d doc.Document) error {

var tsID ident.ID = ident.BytesID(d.ID)

// Need to apply filter if set first.
if r.shardFilter != nil {
documentShard, documentShardOwned := r.shardFilter(tsID)
if !documentShardOwned {
// node is no longer responsible for this document's shard.
return nil
}

if len(r.shards) > 0 {
// Need to apply filter if shard set provided.
filteringShard := r.shards[r.shardIdx]
documentShard, documentShardOwned := r.shardFilter(tsID)
// NB: Check to see if shard is exceeded first (to short circuit earlier if
// the current shard is not owned by this node, but shard exceeds filtered).
if filteringShard > documentShard {
Expand All @@ -173,19 +174,21 @@ func (r *wideResults) addDocumentWithLock(d doc.Document) error {
return nil
}
}

if !documentShardOwned {
return nil
}
}

r.size++
r.totalDocsCount++
if len(r.batch.IDs) < r.batchSize {
r.batch.IDs = append(r.batch.IDs, tsID)

shardID := ident.ShardID{
Shard: documentShard,
ID: tsID,
}

if len(r.batch.ShardIDs) < r.batchSize {
r.batch.ShardIDs = append(r.batch.ShardIDs, shardID)
} else {
// NB: Add any IDs overflowing the batch size to the overflow slice.
r.idsOverflow = append(r.idsOverflow, tsID)
r.idsOverflow = append(r.idsOverflow, shardID)
}

return nil
Expand Down Expand Up @@ -225,19 +228,19 @@ func (r *wideResults) Finalize() {
}

func (r *wideResults) releaseAndWaitWithLock() {
if r.closed || len(r.batch.IDs) == 0 {
if r.closed || len(r.batch.ShardIDs) == 0 {
return
}

r.batch.ReadyForProcessing()
r.batchCh <- r.batch
r.batch.WaitUntilProcessed()
r.batch.IDs = r.batch.IDs[:0]
r.batch.ShardIDs = r.batch.ShardIDs[:0]
r.size = len(r.idsOverflow)
}

func (r *wideResults) releaseOverflowWithLock() {
if len(r.batch.IDs) != 0 {
if len(r.batch.ShardIDs) != 0 {
// If still some IDs in the batch, noop. Theoretically this should not
// happen, since releaseAndWaitWithLock should be called before
// releaseOverflowWithLock, which should drain the channel.
Expand All @@ -264,7 +267,7 @@ func (r *wideResults) releaseOverflowWithLock() {
}

// NB: move overflow IDs to the batch itself.
r.batch.IDs = append(r.batch.IDs, r.idsOverflow[0:size]...)
r.batch.ShardIDs = append(r.batch.ShardIDs, r.idsOverflow[0:size]...)
copy(r.idsOverflow, r.idsOverflow[size:])
r.idsOverflow = r.idsOverflow[:overflow-size]
if incomplete {
Expand Down
41 changes: 29 additions & 12 deletions src/dbnode/storage/index/wide_query_results_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,16 +93,22 @@ func buildExpected(t *testing.T, docs [][]doc.Document) [][]string {
func drainAndCheckBatches(
t *testing.T,
expected [][]string,
checkShard bool,
batchCh <-chan *ident.IDBatch,
doneCh chan<- struct{},
) {
go func() {
i := 0
var exShard uint32
for batch := range batchCh {
batchStr := make([]string, 0, len(batch.IDs))
for _, id := range batch.IDs {
batchStr = append(batchStr, id.String())
id.Finalize()
batchStr := make([]string, 0, len(batch.ShardIDs))
for _, shardID := range batch.ShardIDs {
batchStr = append(batchStr, shardID.ID.String())
shardID.ID.Finalize()
if checkShard {
assert.Equal(t, exShard, shardID.Shard)
}
exShard++
}

batch.Processed()
Expand All @@ -119,6 +125,13 @@ func drainAndCheckBatches(
}()
}

// testFilterFn is a shard filter for tests: it parses the number following the
// "foo" prefix of id and returns it as the shard, always reporting the shard
// as owned. The test fails immediately if the suffix is not an integer.
func testFilterFn(t *testing.T, id ident.ID) (uint32, bool) {
	suffix := strings.TrimPrefix(id.String(), "foo")
	shard, err := strconv.Atoi(suffix)
	require.NoError(t, err)
	// Ownership is always reported as true here; a caller that needs an
	// unowned shard must override the flag itself.
	return uint32(shard), true
}

func TestWideSeriesResults(t *testing.T) {
var (
max = 31
Expand All @@ -138,13 +151,18 @@ func TestWideSeriesResults(t *testing.T) {
docs := buildDocs(documentCount, docBatchSize)
// NB: expected should have docs split by `batchSize`
expected := buildExpected(t, buildDocs(documentCount, batchSize))
drainAndCheckBatches(t, expected, batchCh, doneCh)
drainAndCheckBatches(t, expected, true, batchCh, doneCh)

wideQueryOptions, err := NewWideQueryOptions(
now, batchSize, blockSize, nil, IterationOptions{})

require.NoError(t, err)
wideRes := NewWideQueryResults(testNs, testIDPool, nil, batchCh, wideQueryOptions)

filter := func(id ident.ID) (uint32, bool) {
return testFilterFn(t, id)
}

wideRes := NewWideQueryResults(testNs, testIDPool, filter, batchCh, wideQueryOptions)
var runningSize, batchDocCount int
for _, docBatch := range docs {
size, docsCount, err := wideRes.AddDocuments(docBatch)
Expand Down Expand Up @@ -176,7 +194,7 @@ func TestWideSeriesResultsWithShardFilter(t *testing.T) {
// This shard is part of the shard set being queried, but it will be
// flagged as not being owned by this node in the filter function, thus
// it should not appear in the result.
notOwnedShard = 7
notOwnedShard = uint32(7)

// After reading the last queried shard, add documents should
// short-circuit future shards by returning shard exhausted error.
Expand All @@ -192,16 +210,15 @@ func TestWideSeriesResultsWithShardFilter(t *testing.T) {

docs := buildDocs(documentCount, docBatchSize)
// NB: feed these in out of order to ensure WideQueryOptions sorts them.
shards := []uint32{21, uint32(notOwnedShard), 41, uint32(lastQueriedShard), 1}
shards := []uint32{21, notOwnedShard, 41, uint32(lastQueriedShard), 1}
expected := [][]string{{"foo1", "foo21", "foo41"}, {"foo42"}}
drainAndCheckBatches(t, expected, batchCh, doneCh)
drainAndCheckBatches(t, expected, false, batchCh, doneCh)

wideQueryOptions, err := NewWideQueryOptions(
now, batchSize, blockSize, shards, IterationOptions{})
require.NoError(t, err)
filter := func(id ident.ID) (uint32, bool) {
i, err := strconv.Atoi(strings.TrimPrefix(id.String(), "foo"))
require.NoError(t, err)
i, _ := testFilterFn(t, id)
// mark this shard as not owned by this node; should not appear in result
owned := i != notOwnedShard
return uint32(i), owned
Expand All @@ -216,7 +233,7 @@ func TestWideSeriesResultsWithShardFilter(t *testing.T) {
minInclusive := i * docBatchSize
maxExclusive := minInclusive + docBatchSize
for _, shard := range shards {
if int(shard) == notOwnedShard {
if shard == notOwnedShard {
continue
}

Expand Down
6 changes: 3 additions & 3 deletions src/dbnode/storage/index_queue_forward_write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,9 @@ func TestNamespaceForwardIndexWideQuery(t *testing.T) {
go func() {
i := 0
for b := range collector {
batchStr := make([]string, 0, len(b.IDs))
for _, id := range b.IDs {
batchStr = append(batchStr, id.String())
batchStr := make([]string, 0, len(b.ShardIDs))
for _, shardID := range b.ShardIDs {
batchStr = append(batchStr, shardID.ID.String())
}

withinIndex := i < len(expectedBatchIDs)
Expand Down
8 changes: 4 additions & 4 deletions src/dbnode/storage/index_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,9 +417,9 @@ func TestNamespaceIndexInsertWideQuery(t *testing.T) {
go func() {
i := 0
for b := range collector {
batchStr := make([]string, 0, len(b.IDs))
for _, id := range b.IDs {
batchStr = append(batchStr, id.String())
batchStr := make([]string, 0, len(b.ShardIDs))
for _, shardIDs := range b.ShardIDs {
batchStr = append(batchStr, shardIDs.ID.String())
}

withinIndex := i < len(expectedBatchIDs)
Expand Down Expand Up @@ -465,8 +465,8 @@ func TestNamespaceIndexInsertWideQueryFilteredByShard(t *testing.T) {
go func() {
i := 0
for b := range collector {
assert.Equal(t, 0, len(b.ShardIDs))
b.Processed()
fmt.Println(b.IDs)
i++
}
assert.Equal(t, 0, i)
Expand Down
3 changes: 0 additions & 3 deletions src/dbnode/tracepoint/tracepoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,6 @@ const (
// DBWriteBatch is the operation name for the db WriteBatch path.
DBWriteBatch = "storage.db.WriteBatch"

// DBWideEntry is the operation name for the tchannelthrift WideEntry path.
DBWideEntry = "storage.db.WideEntry"

// DBFetchMismatch is the operation name for the tchannelthrift DBFetchMismatch path.
DBFetchMismatch = "storage.db.FetchMismatch"

Expand Down
Loading

0 comments on commit 95bb87f

Please sign in to comment.