Skip to content

Commit

Permalink
[dbnode] Skip bootstrapping shards from aggregation
Browse files Browse the repository at this point in the history
  • Loading branch information
linasm committed Mar 29, 2021
1 parent 9eacaa7 commit a6a9b99
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 0 deletions.
9 changes: 9 additions & 0 deletions src/dbnode/storage/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -1766,6 +1766,7 @@ func (n *dbNamespace) aggregateTiles(
processedTileCount int64
aggregationSuccess bool
)

defer func() {
if aggregationSuccess {
return
Expand All @@ -1776,7 +1777,15 @@ func (n *dbNamespace) aggregateTiles(
zap.Stringer("sourceNs", sourceNs.ID()), zap.Error(err))
}
}()

for _, targetShard := range n.OwnedShards() {
if !targetShard.IsBootstrapped() {
n.log.
With(zap.Uint32("shard", targetShard.ID())).
Debug("skipping aggregateTiles due to shard not bootstrapped")
continue
}

shardProcessedTileCount, err := targetShard.AggregateTiles(
ctx, sourceNs, n, targetShard.ID(), onColdFlushNs, opts)

Expand Down
53 changes: 53 additions & 0 deletions src/dbnode/storage/namespace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1629,6 +1629,9 @@ func TestNamespaceAggregateTiles(t *testing.T) {
targetNs.shards[0] = targetShard0
targetNs.shards[1] = targetShard1

targetShard0.EXPECT().IsBootstrapped().Return(true)
targetShard1.EXPECT().IsBootstrapped().Return(true)

targetShard0.EXPECT().ID().Return(shard0ID)
targetShard1.EXPECT().ID().Return(shard1ID)

Expand All @@ -1646,6 +1649,56 @@ func TestNamespaceAggregateTiles(t *testing.T) {
assert.Equal(t, int64(3+2), processedTileCount)
}

func TestNamespaceAggregateTilesShipBootstrappingShards(t *testing.T) {
ctrl := xtest.NewController(t)
defer ctrl.Finish()

ctx := context.NewBackground()
defer ctx.Close()

var (
sourceNsID = ident.StringID("source")
targetNsID = ident.StringID("target")
sourceBlockSize = time.Hour
targetBlockSize = 2 * time.Hour
start = time.Now().Truncate(targetBlockSize)
shard0ID = uint32(10)
shard1ID = uint32(20)
insOpts = instrument.NewOptions()
)

opts, err := NewAggregateTilesOptions(start, start.Add(targetBlockSize), time.Second, targetNsID, insOpts)
require.NoError(t, err)

sourceNs, sourceCloser := newTestNamespaceWithIDOpts(t, sourceNsID, namespace.NewOptions())
defer sourceCloser()
sourceNs.bootstrapState = Bootstrapped
sourceRetentionOpts := sourceNs.nopts.RetentionOptions().SetBlockSize(sourceBlockSize)
sourceNs.nopts = sourceNs.nopts.SetRetentionOptions(sourceRetentionOpts)

targetNs, targetCloser := newTestNamespaceWithIDOpts(t, targetNsID, namespace.NewOptions())
defer targetCloser()
targetNs.bootstrapState = Bootstrapped
targetRetentionOpts := targetNs.nopts.RetentionOptions().SetBlockSize(targetBlockSize)
targetNs.nopts = targetNs.nopts.SetColdWritesEnabled(true).SetRetentionOptions(targetRetentionOpts)

targetShard0 := NewMockdatabaseShard(ctrl)
targetShard1 := NewMockdatabaseShard(ctrl)
targetNs.shards[0] = targetShard0
targetNs.shards[1] = targetShard1

targetShard0.EXPECT().IsBootstrapped().Return(false)
targetShard1.EXPECT().IsBootstrapped().Return(false)

targetShard0.EXPECT().ID().Return(shard0ID)
targetShard1.EXPECT().ID().Return(shard1ID)

processedTileCount, err := targetNs.AggregateTiles(ctx, sourceNs, opts)

require.NoError(t, err)
assert.Zero(t, processedTileCount)
}

func waitForStats(
reporter xmetrics.TestStatsReporter,
check func(xmetrics.TestStatsReporter) bool,
Expand Down

0 comments on commit a6a9b99

Please sign in to comment.