Skip to content

Commit

Permalink
[dbnode] Large tile aggregation improvements (#2668)
Browse files Browse the repository at this point in the history
* [dbnode] Large tile aggregation improvements

* Fix mocking

* Log block start and volume on error

* Infra for reverse index sharing

* StorageOptions.AfterNamespaceCreatedFn

* Adjust test timing

* Do not fail on fs.ErrCheckpointFileNotFound

* mockgen

* Improve handling of shared reverse index

* Improve Shard.AggregateTiles UT coverage

* Remove unused reverseIndex from test

* Address review feedback (partial)

* Rearrange imports
  • Loading branch information
linasm authored Oct 1, 2020
1 parent d5fbe4b commit f996e2d
Show file tree
Hide file tree
Showing 17 changed files with 578 additions and 232 deletions.
206 changes: 19 additions & 187 deletions src/dbnode/integration/large_tiles_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ package integration

import (
"io"
"strconv"
"testing"
"time"

Expand All @@ -35,11 +34,8 @@ import (
"github.com/m3db/m3/src/dbnode/ts"
xmetrics "github.com/m3db/m3/src/dbnode/x/metrics"
xclock "github.com/m3db/m3/src/x/clock"
"github.com/m3db/m3/src/x/context"
"github.com/m3db/m3/src/x/ident"
"github.com/m3db/m3/src/x/instrument"
"github.com/m3db/m3/src/x/pool"
"github.com/m3db/m3/src/x/serialize"
xtime "github.com/m3db/m3/src/x/time"

"github.com/stretchr/testify/assert"
Expand All @@ -49,10 +45,8 @@ import (
)

var (
blockSize = 2 * time.Hour
blockSizeT = 24 * time.Hour
indexBlockSize = 2 * blockSize
indexBlockSizeT = 2 * blockSizeT
blockSize = 2 * time.Hour
blockSizeT = 24 * time.Hour
)

func TestReadAggregateWrite(t *testing.T) {
Expand All @@ -74,21 +68,21 @@ func TestReadAggregateWrite(t *testing.T) {
nowFn := testSetup.NowFn()

// Write test data.
dpTimeStart := nowFn().Truncate(indexBlockSizeT).Add(-2 * indexBlockSizeT)
dpTimeStart := nowFn().Truncate(blockSizeT).Add(-blockSizeT)
dpTime := dpTimeStart
err = session.WriteTagged(srcNs.ID(), ident.StringID("aab"),
ident.MustNewTagStringsIterator("__name__", "cpu", "job", "job1"),
dpTime, 15, xtime.Second, nil)

testDataPointsCount := 60.0
for a := 0.0; a < testDataPointsCount; a++ {
testDataPointsCount := 60
for a := 0; a < testDataPointsCount; a++ {
if a < 10 {
dpTime = dpTime.Add(10 * time.Minute)
continue
}
err = session.WriteTagged(srcNs.ID(), ident.StringID("foo"),
ident.MustNewTagStringsIterator("__name__", "cpu", "job", "job1"),
dpTime, 42.1+a, xtime.Second, nil)
dpTime, 42.1+float64(a), xtime.Second, nil)
require.NoError(t, err)
dpTime = dpTime.Add(10 * time.Minute)
}
Expand Down Expand Up @@ -121,14 +115,6 @@ func TestReadAggregateWrite(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, int64(10), processedTileCount)

require.True(t, xclock.WaitUntil(func() bool {
counters := reporter.Counters()
writeErrorsCount, _ := counters["database.writeAggData.errors"]
require.Equal(t, int64(0), writeErrorsCount)
seriesWritesCount, _ := counters["dbshard.large-tiles-writes"]
return seriesWritesCount >= 2
}, time.Second*10))

log.Info("validating aggregated data")
expectedDps := []ts.Datapoint{
{Timestamp: dpTimeStart.Add(110 * time.Minute), Value: 53.1},
Expand Down Expand Up @@ -156,101 +142,6 @@ func TestReadAggregateWrite(t *testing.T) {
expectedDps)
}

var (
iterationCount = 10
testSeriesCount = 5000
testDataPointsCount = int(blockSizeT.Hours()) * 100
)

//func TestAggregationAndQueryingAtHighConcurrency(t *testing.T) {
// testSetup, srcNs, trgNs, reporter, closer := setupServer(t)
// storageOpts := testSetup.StorageOpts()
// log := storageOpts.InstrumentOptions().Logger()
//
// // Stop the server.
// defer func() {
// require.NoError(t, testSetup.StopServer())
// log.Debug("server is now down")
// testSetup.Close()
// _ = closer.Close()
// }()
//
// nowFn := testSetup.NowFn()
// dpTimeStart := nowFn().Truncate(indexBlockSizeT).Add(-2 * indexBlockSizeT)
// writeTestData(t, testSetup, log, reporter, dpTimeStart, srcNs.ID())
//
// aggOpts, err := storage.NewAggregateTilesOptions(
// dpTimeStart, dpTimeStart.Add(blockSizeT),
// 10*time.Minute, false)
// require.NoError(t, err)
//
// log.Info("Starting aggregation loop")
// start := time.Now()
//
// inProgress := atomic.NewBool(true)
// var wg sync.WaitGroup
// for b := 0; b < 4; b++ {
//
// wg.Add(1)
//
// go func() {
// defer wg.Done()
//
// query := index.Query{
// Query: idx.NewTermQuery([]byte("job"), []byte("job1"))}
//
// for inProgress.Load() {
// session, err := testSetup.M3DBClient().NewSession()
// require.NoError(t, err)
// result, _, err := session.FetchTagged(srcNs.ID(), query,
// index.QueryOptions{
// StartInclusive: dpTimeStart.Add(-blockSizeT),
// EndExclusive: nowFn(),
// })
// session.Close()
// if err != nil {
// require.NoError(t, err)
// return
// }
// require.Equal(t, testSeriesCount, len(result.Iters()))
//
// result.Close()
// time.Sleep(time.Millisecond)
// }
// }()
// }
//
// var expectedPoints = int64(testDataPointsCount * testSeriesCount / 100 * 6)
// for a := 0; a < iterationCount; a++ {
// ctx := storageOpts.ContextPool().Get()
// processedTileCount, err := testSetup.DB().AggregateTiles(ctx, srcNs.ID(), trgNs.ID(), aggOpts)
// ctx.BlockingClose()
// if err != nil {
// require.NoError(t, err)
// }
// require.Equal(t, processedTileCount, expectedPoints)
// }
// log.Info("Finished aggregation", zap.Duration("took", time.Since(start)))
//
// inProgress.Toggle()
// wg.Wait()
// log.Info("Finished parallel querying")
//
// counters := reporter.Counters()
// writeErrorsCount, _ := counters["database.writeAggData.errors"]
// require.Equal(t, int64(0), writeErrorsCount)
// seriesWritesCount, _ := counters["dbshard.large-tiles-writes"]
// require.Equal(t, int64(testSeriesCount*iterationCount), seriesWritesCount)
//
// session, err := testSetup.M3DBClient().NewSession()
// require.NoError(t, err)
// _, err = session.Fetch(srcNs.ID(),
// ident.StringID("foo"+strconv.Itoa(50)),
// dpTimeStart, dpTimeStart.Add(blockSizeT))
// session.Close()
// require.NoError(t, err)
//}

func fetchAndValidate(
t *testing.T,
session client.Session,
Expand All @@ -277,17 +168,18 @@ func fetchAndValidate(
func setupServer(t *testing.T) (TestSetup, namespace.Metadata, namespace.Metadata, xmetrics.TestStatsReporter, io.Closer) {
var (
rOpts = retention.NewOptions().SetRetentionPeriod(500 * blockSize).SetBlockSize(blockSize)
rOptsT = retention.NewOptions().SetRetentionPeriod(100 * blockSize).SetBlockSize(blockSizeT)
idxOpts = namespace.NewIndexOptions().SetEnabled(true).SetBlockSize(indexBlockSize)
idxOptsT = namespace.NewIndexOptions().SetEnabled(false).SetBlockSize(indexBlockSizeT)
rOptsT = retention.NewOptions().SetRetentionPeriod(100 * blockSize).SetBlockSize(blockSizeT).SetBufferPast(0)
idxOpts = namespace.NewIndexOptions().SetEnabled(true).SetBlockSize(blockSize)
idxOptsT = namespace.NewIndexOptions().SetEnabled(true).SetBlockSize(blockSizeT)
nsOpts = namespace.NewOptions().
SetRetentionOptions(rOpts).
SetIndexOptions(idxOpts).
SetColdWritesEnabled(true)
nsOptsT = namespace.NewOptions().
SetRetentionOptions(rOptsT).
SetIndexOptions(idxOptsT).
SetColdWritesEnabled(true)
SetIndexOptions(idxOptsT)

fixedNow = time.Now().Truncate(blockSizeT)
)

srcNs, err := namespace.NewMetadata(testNamespaces[0], nsOpts)
Expand All @@ -299,81 +191,21 @@ func setupServer(t *testing.T) (TestSetup, namespace.Metadata, namespace.Metadat
SetNamespaces([]namespace.Metadata{srcNs, trgNs}).
SetWriteNewSeriesAsync(true).
SetNumShards(1).
SetFetchRequestTimeout(time.Second * 30)
SetFetchRequestTimeout(time.Second * 30).
SetNowFn(func() time.Time {
return fixedNow
})

testSetup := newTestSetupWithCommitLogAndFilesystemBootstrapper(t, testOpts)
reporter := xmetrics.NewTestStatsReporter(xmetrics.NewTestStatsReporterOptions())
scope, closer := tally.NewRootScope(
tally.ScopeOptions{Reporter: reporter}, time.Millisecond)
testSetup.SetStorageOpts(testSetup.StorageOpts().SetInstrumentOptions(
instrument.NewOptions().SetMetricsScope(scope)))
storageOpts := testSetup.StorageOpts().
SetInstrumentOptions(instrument.NewOptions().SetMetricsScope(scope))
testSetup.SetStorageOpts(storageOpts)

// Start the server.
require.NoError(t, testSetup.StartServer())

return testSetup, srcNs, trgNs, reporter, closer
}

func writeTestData(
t *testing.T, testSetup TestSetup, log *zap.Logger,
reporter xmetrics.TestStatsReporter,
dpTimeStart time.Time, ns ident.ID,
) {
dpTime := dpTimeStart

testTagEncodingPool := serialize.NewTagEncoderPool(serialize.NewTagEncoderOptions(),
pool.NewObjectPoolOptions().SetSize(1))
testTagEncodingPool.Init()
encoder := testTagEncodingPool.Get()
tagsIter := ident.MustNewTagStringsIterator("__name__", "cpu", "job", "job1")
err := encoder.Encode(tagsIter)
require.NoError(t, err)

encodedTags, ok := encoder.Data()
require.True(t, ok)
encodedTagsBytes := encodedTags.Bytes()

start := time.Now()
for a := 0; a < testDataPointsCount; a++ {
i := 0
batchWriter, err := testSetup.DB().BatchWriter(ns, testDataPointsCount)
require.NoError(t, err)

for b := 0; b < testSeriesCount; b++ {
tagsIter.Rewind()
err := batchWriter.AddTagged(i,
ident.StringID("foo"+strconv.Itoa(b)),
tagsIter, encodedTagsBytes,
dpTime, 42.1+float64(a), xtime.Second, nil)
require.NoError(t, err)
i++
}
for r := 0; r < 3; r++ {
err = testSetup.DB().WriteTaggedBatch(context.NewContext(), ns, batchWriter, nil)
if err != nil && err.Error() == "commit log queue is full" {
time.Sleep(time.Second)
continue
}
break
}
require.NoError(t, err)

dpTime = dpTime.Add(time.Minute)
}

log.Info("test data written", zap.Duration("took", time.Since(start)))

log.Info("waiting till data is cold flushed")
start = time.Now()
flushed := xclock.WaitUntil(func() bool {
counters := reporter.Counters()
flushes, _ := counters["database.flushIndex.success"]
writes, _ := counters["database.series.cold-writes"]
successFlushes, _ := counters["database.flushColdData.success"]
return flushes >= 1 &&
int(writes) >= testDataPointsCount*testSeriesCount &&
successFlushes >= 4
}, time.Minute)
require.True(t, flushed)
log.Info("verified data has been cold flushed", zap.Duration("took", time.Since(start)))
}
1 change: 1 addition & 0 deletions src/dbnode/server/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,5 @@ type StorageOptions struct {
OnColdFlush storage.OnColdFlush
ForceColdWritesEnabled bool
TChanNodeServerFn node.NewTChanNodeServerFn
NamespaceHooks storage.NamespaceHooks
}
4 changes: 4 additions & 0 deletions src/dbnode/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -826,6 +826,10 @@ func Run(runOpts RunOptions) {
opts = opts.SetOnColdFlush(runOpts.StorageOptions.OnColdFlush)
}

if runOpts.StorageOptions.NamespaceHooks != nil {
opts = opts.SetNamespaceHooks(runOpts.StorageOptions.NamespaceHooks)
}

// Set bootstrap options - We need to create a topology map provider from the
// same topology that will be passed to the cluster so that when we make
// bootstrapping decisions they are in sync with the clustered database
Expand Down
14 changes: 13 additions & 1 deletion src/dbnode/storage/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,8 @@ func (d *db) logNamespaceUpdate(removes []ident.ID, adds, updates []namespace.Me
}

func (d *db) addNamespacesWithLock(namespaces []namespace.Metadata) error {
createdNamespaces := make([]databaseNamespace, 0, len(namespaces))

for _, n := range namespaces {
// ensure namespace doesn't exist
_, ok := d.namespaces.Get(n.ID())
Expand All @@ -382,7 +384,17 @@ func (d *db) addNamespacesWithLock(namespaces []namespace.Metadata) error {
return err
}
d.namespaces.Set(n.ID(), newNs)
createdNamespaces = append(createdNamespaces, newNs)
}

hooks := d.Options().NamespaceHooks()
for _, ns := range createdNamespaces {
err := hooks.OnCreatedNamespace(ns, d.namespaces.Get)
if err != nil {
return err
}
}

return nil
}

Expand Down Expand Up @@ -1047,7 +1059,7 @@ func (d *db) BootstrapState() DatabaseBootstrapState {
d.RLock()
for _, n := range d.namespaces.Iter() {
ns := n.Value()
nsBootstrapStates[ns.ID().String()] = ns.BootstrapState()
nsBootstrapStates[ns.ID().String()] = ns.ShardBootstrapState()
}
d.RUnlock()

Expand Down
10 changes: 5 additions & 5 deletions src/dbnode/storage/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -785,7 +785,7 @@ func testDatabaseNamespaceIndexFunctions(t *testing.T, commitlogEnabled bool) {

ns.EXPECT().OwnedShards().Return([]databaseShard{}).AnyTimes()
ns.EXPECT().Tick(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
ns.EXPECT().BootstrapState().Return(ShardBootstrapStates{}).AnyTimes()
ns.EXPECT().ShardBootstrapState().Return(ShardBootstrapStates{}).AnyTimes()
ns.EXPECT().Options().Return(nsOptions).AnyTimes()
require.NoError(t, d.Open())

Expand Down Expand Up @@ -969,7 +969,7 @@ func testDatabaseWriteBatch(t *testing.T,

ns.EXPECT().OwnedShards().Return([]databaseShard{}).AnyTimes()
ns.EXPECT().Tick(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
ns.EXPECT().BootstrapState().Return(ShardBootstrapStates{}).AnyTimes()
ns.EXPECT().ShardBootstrapState().Return(ShardBootstrapStates{}).AnyTimes()
ns.EXPECT().Options().Return(nsOptions).AnyTimes()
ns.EXPECT().Close().Return(nil).Times(1)
require.NoError(t, d.Open())
Expand Down Expand Up @@ -1115,11 +1115,11 @@ func TestDatabaseBootstrapState(t *testing.T) {
}()

ns1 := dbAddNewMockNamespace(ctrl, d, "testns1")
ns1.EXPECT().BootstrapState().Return(ShardBootstrapStates{
ns1.EXPECT().ShardBootstrapState().Return(ShardBootstrapStates{
1: Bootstrapping,
})
ns2 := dbAddNewMockNamespace(ctrl, d, "testns2")
ns2.EXPECT().BootstrapState().Return(ShardBootstrapStates{
ns2.EXPECT().ShardBootstrapState().Return(ShardBootstrapStates{
2: Bootstrapped,
})

Expand Down Expand Up @@ -1199,7 +1199,7 @@ func TestUpdateBatchWriterBasedOnShardResults(t *testing.T) {
SetWritesToCommitLog(false)
ns.EXPECT().OwnedShards().Return([]databaseShard{}).AnyTimes()
ns.EXPECT().Tick(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
ns.EXPECT().BootstrapState().Return(ShardBootstrapStates{}).AnyTimes()
ns.EXPECT().ShardBootstrapState().Return(ShardBootstrapStates{}).AnyTimes()
ns.EXPECT().Options().Return(nsOptions).AnyTimes()
ns.EXPECT().Close().Return(nil).Times(1)
require.NoError(t, d.Open())
Expand Down
Loading

0 comments on commit f996e2d

Please sign in to comment.