diff --git a/src/dbnode/storage/database.go b/src/dbnode/storage/database.go index 5f6ae35ea2..5d0a645d12 100644 --- a/src/dbnode/storage/database.go +++ b/src/dbnode/storage/database.go @@ -1029,8 +1029,8 @@ func (d *db) WideQuery( streamedWideEntries := make([]block.StreamedWideEntry, 0, batchSize) indexChecksumProcessor := func(batch *ident.IDBatch) error { streamedWideEntries = streamedWideEntries[:0] - for _, id := range batch.IDs { - streamedWideEntry, err := d.fetchWideEntries(ctx, n, id, start) + for _, shardID := range batch.ShardIDs { + streamedWideEntry, err := n.FetchWideEntry(ctx, shardID.ID, start) if err != nil { return err } @@ -1058,26 +1058,6 @@ func (d *db) WideQuery( return collectedChecksums, nil } -func (d *db) fetchWideEntries( - ctx context.Context, - ns databaseNamespace, - id ident.ID, - start time.Time, -) (block.StreamedWideEntry, error) { - ctx, sp, sampled := ctx.StartSampledTraceSpan(tracepoint.DBWideEntry) - if sampled { - sp.LogFields( - opentracinglog.String("namespace", ns.ID().String()), - opentracinglog.String("id", id.String()), - xopentracing.Time("start", start), - ) - } - - defer sp.Finish() - - return ns.FetchWideEntry(ctx, id, start) -} - func (d *db) FetchBlocks( ctx context.Context, namespace ident.ID, diff --git a/src/dbnode/storage/database_test.go b/src/dbnode/storage/database_test.go index 88d3af5540..a843400f67 100644 --- a/src/dbnode/storage/database_test.go +++ b/src/dbnode/storage/database_test.go @@ -44,7 +44,6 @@ import ( "github.com/m3db/m3/src/dbnode/ts" "github.com/m3db/m3/src/dbnode/ts/writes" xmetrics "github.com/m3db/m3/src/dbnode/x/metrics" - "github.com/m3db/m3/src/dbnode/x/xio" "github.com/m3db/m3/src/m3ninx/idx" xclock "github.com/m3db/m3/src/x/clock" "github.com/m3db/m3/src/x/context" @@ -310,57 +309,6 @@ func TestDatabaseWideQueryNamespaceNonExistent(t *testing.T) { require.True(t, dberrors.IsUnknownNamespaceError(err)) } -func TestDatabaseWideEntry(t *testing.T) { - ctrl := xtest.NewController(t) - defer ctrl.Finish() - - ctx := context.NewContext() - defer ctx.Close() - - d, mapCh, _ := defaultTestDatabase(t, ctrl, Bootstrapped) - defer func() { - close(mapCh) - }() - - nsID := ident.StringID("testns1") - seriesID := ident.StringID("bar") - end := time.Now() - start := end.Add(-time.Hour) - - indexChecksumWithID := block.NewMockStreamedWideEntry(ctrl) - indexChecksumWithID.EXPECT().RetrieveWideEntry(). - Return( - xio.WideEntry{ - ID: ident.StringID("foo"), - MetadataChecksum: 5, - }, nil) - mockNamespace := NewMockdatabaseNamespace(ctrl) - mockNamespace.EXPECT().FetchWideEntry(ctx, seriesID, start). - Return(indexChecksumWithID, nil) - - indexChecksumWithoutID := block.NewMockStreamedWideEntry(ctrl) - indexChecksumWithoutID.EXPECT().RetrieveWideEntry(). - Return(xio.WideEntry{MetadataChecksum: 7}, nil) - mockNamespace.EXPECT().FetchWideEntry(ctx, seriesID, start). - Return(indexChecksumWithoutID, nil) - d.namespaces.Set(nsID, mockNamespace) - - res, err := d.fetchWideEntries(ctx, mockNamespace, seriesID, start) - require.NoError(t, err) - checksum, err := res.RetrieveWideEntry() - require.NoError(t, err) - assert.Equal(t, "foo", checksum.ID.String()) - assert.Equal(t, 5, int(checksum.MetadataChecksum)) - - res, err = d.fetchWideEntries(ctx, mockNamespace, seriesID, start) - require.NoError(t, err) - checksum, err = res.RetrieveWideEntry() - require.NoError(t, err) - require.NoError(t, err) - assert.Nil(t, checksum.ID) - assert.Equal(t, 7, int(checksum.MetadataChecksum)) -} - func TestDatabaseFetchBlocksNamespaceNonExistent(t *testing.T) { ctrl := xtest.NewController(t) defer ctrl.Finish() @@ -965,7 +913,6 @@ func TestWideQuery(t *testing.T) { } exSpans := []string{ - tracepoint.DBWideEntry, tracepoint.DBWideQuery, tracepoint.DBWideQuery, "root", @@ -1027,7 +974,11 @@ func testWideFunction(t *testing.T, testFn wideQueryTestFn, exSpans []string) { assert.Equal(t, opts.BatchSize, wideOpts.BatchSize) assert.Equal(t, opts.ShardsQueried, shards) go func() { - batch := &ident.IDBatch{IDs: []ident.ID{ident.StringID("foo")}} + batch := &ident.IDBatch{ + ShardIDs: []ident.ShardID{ + {ID: ident.StringID("foo")}, + }, + } batch.ReadyForProcessing() collector <- batch close(collector) diff --git a/src/dbnode/storage/index/wide_query_results.go b/src/dbnode/storage/index/wide_query_results.go index 92903b2643..442fd158cb 100644 --- a/src/dbnode/storage/index/wide_query_results.go +++ b/src/dbnode/storage/index/wide_query_results.go @@ -45,7 +45,7 @@ type wideResults struct { idPool ident.Pool closed bool - idsOverflow []ident.ID + idsOverflow []ident.ShardID batch *ident.IDBatch batchCh chan<- *ident.IDBatch batchSize int @@ -76,17 +76,13 @@ func NewWideQueryResults( nsID: namespaceID, idPool: idPool, batchSize: batchSize, - idsOverflow: make([]ident.ID, 0, batchSize), + idsOverflow: make([]ident.ShardID, 0, batchSize), batch: &ident.IDBatch{ - IDs: make([]ident.ID, 0, batchSize), + ShardIDs: make([]ident.ShardID, 0, batchSize), }, - batchCh: collector, - shards: opts.ShardsQueried, - } - - if len(opts.ShardsQueried) > 0 { - // Only apply filter if there are shards to filter against. - results.shardFilter = shardFilter + batchCh: collector, + shardFilter: shardFilter, + shards: opts.ShardsQueried, } return results @@ -118,7 +114,7 @@ func (r *wideResults) AddDocuments(batch []doc.Document) (int, int, error) { return size, totalDocsCount, err } - release := len(r.batch.IDs) == r.batchSize + release := len(r.batch.ShardIDs) == r.batchSize if release { r.releaseAndWaitWithLock() r.releaseOverflowWithLock() @@ -145,10 +141,15 @@ func (r *wideResults) addDocumentWithLock(d doc.Document) error { var tsID ident.ID = ident.BytesID(d.ID) - // Need to apply filter if set first. - if r.shardFilter != nil { + documentShard, documentShardOwned := r.shardFilter(tsID) + if !documentShardOwned { + // node is no longer responsible for this document's shard. + return nil + } + + if len(r.shards) > 0 { + // Need to apply filter if shard set provided. filteringShard := r.shards[r.shardIdx] - documentShard, documentShardOwned := r.shardFilter(tsID) // NB: Check to see if shard is exceeded first (to short circuit earlier if // the current shard is not owned by this node, but shard exceeds filtered). if filteringShard > documentShard { @@ -173,19 +174,21 @@ func (r *wideResults) addDocumentWithLock(d doc.Document) error { return nil } } - - if !documentShardOwned { - return nil - } } r.size++ r.totalDocsCount++ - if len(r.batch.IDs) < r.batchSize { - r.batch.IDs = append(r.batch.IDs, tsID) + + shardID := ident.ShardID{ + Shard: documentShard, + ID: tsID, + } + + if len(r.batch.ShardIDs) < r.batchSize { + r.batch.ShardIDs = append(r.batch.ShardIDs, shardID) } else { // NB: Add any IDs overflowing the batch size to the overflow slice. - r.idsOverflow = append(r.idsOverflow, tsID) + r.idsOverflow = append(r.idsOverflow, shardID) } return nil @@ -225,19 +228,19 @@ func (r *wideResults) Finalize() { } func (r *wideResults) releaseAndWaitWithLock() { - if r.closed || len(r.batch.IDs) == 0 { + if r.closed || len(r.batch.ShardIDs) == 0 { return } r.batch.ReadyForProcessing() r.batchCh <- r.batch r.batch.WaitUntilProcessed() - r.batch.IDs = r.batch.IDs[:0] + r.batch.ShardIDs = r.batch.ShardIDs[:0] r.size = len(r.idsOverflow) } func (r *wideResults) releaseOverflowWithLock() { - if len(r.batch.IDs) != 0 { + if len(r.batch.ShardIDs) != 0 { // If still some IDs in the batch, noop. Theoretically this should not // happen, since releaseAndWaitWithLock should be called before // releaseOverflowWithLock, which should drain the channel. @@ -264,7 +267,7 @@ func (r *wideResults) releaseOverflowWithLock() { } // NB: move overflow IDs to the batch itself. - r.batch.IDs = append(r.batch.IDs, r.idsOverflow[0:size]...) + r.batch.ShardIDs = append(r.batch.ShardIDs, r.idsOverflow[0:size]...) copy(r.idsOverflow, r.idsOverflow[size:]) r.idsOverflow = r.idsOverflow[:overflow-size] if incomplete { diff --git a/src/dbnode/storage/index/wide_query_results_test.go b/src/dbnode/storage/index/wide_query_results_test.go index b0279e8bd5..414e24de01 100644 --- a/src/dbnode/storage/index/wide_query_results_test.go +++ b/src/dbnode/storage/index/wide_query_results_test.go @@ -93,16 +93,22 @@ func buildExpected(t *testing.T, docs [][]doc.Document) [][]string { func drainAndCheckBatches( t *testing.T, expected [][]string, + checkShard bool, batchCh <-chan *ident.IDBatch, doneCh chan<- struct{}, ) { go func() { i := 0 + var exShard uint32 for batch := range batchCh { - batchStr := make([]string, 0, len(batch.IDs)) - for _, id := range batch.IDs { - batchStr = append(batchStr, id.String()) - id.Finalize() + batchStr := make([]string, 0, len(batch.ShardIDs)) + for _, shardID := range batch.ShardIDs { + batchStr = append(batchStr, shardID.ID.String()) + shardID.ID.Finalize() + if checkShard { + assert.Equal(t, exShard, shardID.Shard) + } + exShard++ } batch.Processed() @@ -119,6 +125,13 @@ func drainAndCheckBatches( }() } +func testFilterFn(t *testing.T, id ident.ID) (uint32, bool) { + i, err := strconv.Atoi(strings.TrimPrefix(id.String(), "foo")) + require.NoError(t, err) + // mark this shard as not owned by this node; should not appear in result + return uint32(i), true +} + func TestWideSeriesResults(t *testing.T) { var ( max = 31 @@ -138,13 +151,18 @@ func TestWideSeriesResults(t *testing.T) { docs := buildDocs(documentCount, docBatchSize) // NB: expected should have docs split by `batchSize` expected := buildExpected(t, buildDocs(documentCount, batchSize)) - drainAndCheckBatches(t, expected, batchCh, doneCh) + drainAndCheckBatches(t, expected, true, batchCh, doneCh) wideQueryOptions, err := NewWideQueryOptions( now, batchSize, blockSize, nil, IterationOptions{}) require.NoError(t, err) - wideRes := NewWideQueryResults(testNs, testIDPool, nil, batchCh, wideQueryOptions) + + filter := func(id ident.ID) (uint32, bool) { + return testFilterFn(t, id) + } + + wideRes := NewWideQueryResults(testNs, testIDPool, filter, batchCh, wideQueryOptions) var runningSize, batchDocCount int for _, docBatch := range docs { size, docsCount, err := wideRes.AddDocuments(docBatch) @@ -176,7 +194,7 @@ func TestWideSeriesResultsWithShardFilter(t *testing.T) { // This shard is part of the shard set being queried, but it will be // flagged as not being owned by this node in the filter function, thus // it should not appear in the result. - notOwnedShard = 7 + notOwnedShard = uint32(7) // After reading the last queried shard, add documents should // short-circuit future shards by returning shard exhausted error. @@ -192,16 +210,15 @@ func TestWideSeriesResultsWithShardFilter(t *testing.T) { docs := buildDocs(documentCount, docBatchSize) // NB: feed these in out of order to ensure WideQueryOptions sorts them. - shards := []uint32{21, uint32(notOwnedShard), 41, uint32(lastQueriedShard), 1} + shards := []uint32{21, notOwnedShard, 41, uint32(lastQueriedShard), 1} expected := [][]string{{"foo1", "foo21", "foo41"}, {"foo42"}} - drainAndCheckBatches(t, expected, batchCh, doneCh) + drainAndCheckBatches(t, expected, false, batchCh, doneCh) wideQueryOptions, err := NewWideQueryOptions( now, batchSize, blockSize, shards, IterationOptions{}) require.NoError(t, err) filter := func(id ident.ID) (uint32, bool) { - i, err := strconv.Atoi(strings.TrimPrefix(id.String(), "foo")) - require.NoError(t, err) + i, _ := testFilterFn(t, id) // mark this shard as not owned by this node; should not appear in result owned := i != notOwnedShard return uint32(i), owned @@ -216,7 +233,7 @@ func TestWideSeriesResultsWithShardFilter(t *testing.T) { minInclusive := i * docBatchSize maxExclusive := minInclusive + docBatchSize for _, shard := range shards { - if int(shard) == notOwnedShard { + if shard == notOwnedShard { continue } diff --git a/src/dbnode/storage/index_queue_forward_write_test.go b/src/dbnode/storage/index_queue_forward_write_test.go index 8b72db6771..82a7dcb298 100644 --- a/src/dbnode/storage/index_queue_forward_write_test.go +++ b/src/dbnode/storage/index_queue_forward_write_test.go @@ -234,9 +234,9 @@ func TestNamespaceForwardIndexWideQuery(t *testing.T) { go func() { i := 0 for b := range collector { - batchStr := make([]string, 0, len(b.IDs)) - for _, id := range b.IDs { - batchStr = append(batchStr, id.String()) + batchStr := make([]string, 0, len(b.ShardIDs)) + for _, shardID := range b.ShardIDs { + batchStr = append(batchStr, shardID.ID.String()) } withinIndex := i < len(expectedBatchIDs) diff --git a/src/dbnode/storage/index_queue_test.go b/src/dbnode/storage/index_queue_test.go index 9960da5beb..04c663d655 100644 --- a/src/dbnode/storage/index_queue_test.go +++ b/src/dbnode/storage/index_queue_test.go @@ -417,9 +417,9 @@ func TestNamespaceIndexInsertWideQuery(t *testing.T) { go func() { i := 0 for b := range collector { - batchStr := make([]string, 0, len(b.IDs)) - for _, id := range b.IDs { - batchStr = append(batchStr, id.String()) + batchStr := make([]string, 0, len(b.ShardIDs)) + for _, shardIDs := range b.ShardIDs { + batchStr = append(batchStr, shardIDs.ID.String()) } withinIndex := i < len(expectedBatchIDs) @@ -465,8 +465,8 @@ func TestNamespaceIndexInsertWideQueryFilteredByShard(t *testing.T) { go func() { i := 0 for b := range collector { + assert.Equal(t, 0, len(b.ShardIDs)) b.Processed() - fmt.Println(b.IDs) i++ } assert.Equal(t, 0, i) diff --git a/src/dbnode/tracepoint/tracepoint.go b/src/dbnode/tracepoint/tracepoint.go index 69eefcfe97..a81fb5ba15 100644 --- a/src/dbnode/tracepoint/tracepoint.go +++ b/src/dbnode/tracepoint/tracepoint.go @@ -73,9 +73,6 @@ const ( // DBWriteBatch is the operation name for the db WriteBatch path. DBWriteBatch = "storage.db.WriteBatch" - // DBWideEntry is the operation name for the tchannelthrift WideEntry path. - DBWideEntry = "storage.db.WideEntry" - // DBFetchMismatch is the operation name for the tchannelthrift DBFetchMismatch path. DBFetchMismatch = "storage.db.FetchMismatch" diff --git a/src/x/ident/types.go b/src/x/ident/types.go index b2689f7dab..ed6a0348ed 100644 --- a/src/x/ident/types.go +++ b/src/x/ident/types.go @@ -312,12 +312,20 @@ func (t Tags) Equal(other Tags) bool { return true } -// IDBatch is a batch of IDs that is consumed asynchronously. +// ShardID is a tuple of series ID and the shard it belongs to. +type ShardID struct { + // Shard is the shard the series belongs to. + Shard uint32 + // ID is the series ID. + ID ID +} + +// IDBatch is a batch of ShardIDs that is consumed asynchronously. type IDBatch struct { wg sync.WaitGroup - // IDs are the IDs for the batch. - IDs []ID + // ShardIDs are the ShardIDs for the batch. + ShardIDs []ShardID } // ReadyForProcessing indicates this batch is ready for processing.