From 89f6fcc7d1bd0a7ead672fa5f9c2b866daccb252 Mon Sep 17 00:00:00 2001
From: Gediminas Guoba
Date: Mon, 5 Oct 2020 21:36:32 +0300
Subject: [PATCH] [largetiles] Fixed volumeIndex increment (#2681)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* Fixed volumeIndex increment

* minor refactoring

Co-authored-by: Linas Medžiūnas
Co-authored-by: Rob Skillington
---
 src/dbnode/integration/large_tiles_test.go |  9 +++
 src/dbnode/storage/shard.go                | 86 ++++++++++++----------
 2 files changed, 55 insertions(+), 40 deletions(-)

diff --git a/src/dbnode/integration/large_tiles_test.go b/src/dbnode/integration/large_tiles_test.go
index b988c6d5ec..b8baeb927a 100644
--- a/src/dbnode/integration/large_tiles_test.go
+++ b/src/dbnode/integration/large_tiles_test.go
@@ -70,6 +70,8 @@ func TestReadAggregateWrite(t *testing.T) {
 	// Write test data.
 	dpTimeStart := nowFn().Truncate(blockSizeT).Add(-blockSizeT)
 	dpTime := dpTimeStart
+	// The "aab" ID is stored to shard 0, the same shard as "foo". Storing both
+	// series to the same shard is important for this test to verify data consistency.
 	err = session.WriteTagged(srcNs.ID(), ident.StringID("aab"),
 		ident.MustNewTagStringsIterator("__name__", "cpu", "job", "job1"),
 		dpTime, 15, xtime.Second, nil)
@@ -116,6 +118,13 @@ func TestReadAggregateWrite(t *testing.T) {
 	assert.Equal(t, int64(10), processedTileCount)
 
 	log.Info("validating aggregated data")
+
+	// Check shard 0 since we wrote both "aab" and "foo" to this shard.
+	flushState, err := testSetup.DB().FlushState(trgNs.ID(), 0, dpTimeStart)
+	require.NoError(t, err)
+	require.Equal(t, 1, flushState.ColdVersionRetrievable)
+	require.Equal(t, 1, flushState.ColdVersionFlushed)
+
 	expectedDps := []ts.Datapoint{
 		{Timestamp: dpTimeStart.Add(110 * time.Minute), Value: 53.1},
 		{Timestamp: dpTimeStart.Add(170 * time.Minute), Value: 59.1},
diff --git a/src/dbnode/storage/shard.go b/src/dbnode/storage/shard.go
index 95a051468f..9897336fec 100644
--- a/src/dbnode/storage/shard.go
+++ b/src/dbnode/storage/shard.go
@@ -2833,11 +2833,9 @@ READER:
 		// Notify all block leasers that a new volume for the namespace/shard/blockstart
 		// has been created. This will block until all leasers have relinquished their
 		// leases.
-		_, err = s.opts.BlockLeaseManager().UpdateOpenLeases(block.LeaseDescriptor{
-			Namespace:  s.namespace.ID(),
-			Shard:      s.ID(),
-			BlockStart: opts.Start,
-		}, block.LeaseState{Volume: nextVolume})
+		if err = s.finishWriting(opts.Start, nextVolume); err != nil {
+			multiErr = multiErr.Add(err)
+		}
 	}
 
 	closed = true
@@ -2888,6 +2886,48 @@ func (s *dbShard) logFlushResult(r dbShardFlushResult) {
 	)
 }
 
+func (s *dbShard) finishWriting(startTime time.Time, nextVersion int) error {
+	// After writing the full block successfully update the ColdVersionFlushed number. This will
+	// allow the SeekerManager to open a lease on the latest version of the fileset files because
+	// the BlockLeaseVerifier will check the ColdVersionFlushed value, but the buffer only looks at
+	// ColdVersionRetrievable so a concurrent tick will not yet cause the blocks in memory to be
+	// evicted (which is the desired behavior because we haven't updated the open leases yet which
+	// means the newly written data is not available for querying via the SeekerManager yet.)
+	s.setFlushStateColdVersionFlushed(startTime, nextVersion)
+
+	// Notify all block leasers that a new volume for the namespace/shard/blockstart
+	// has been created. This will block until all leasers have relinquished their
+	// leases.
+	_, err := s.opts.BlockLeaseManager().UpdateOpenLeases(block.LeaseDescriptor{
+		Namespace:  s.namespace.ID(),
+		Shard:      s.ID(),
+		BlockStart: startTime,
+	}, block.LeaseState{Volume: nextVersion})
+	// After writing the full block successfully **and** propagating the new lease to the
+	// BlockLeaseManager, update the ColdVersionRetrievable in the flush state. Once this function
+	// completes concurrent ticks will be able to evict the data from memory that was just flushed
+	// (which is now safe to do since the SeekerManager has been notified of the presence of new
+	// files).
+	//
+	// NB(rartoul): Ideally the ColdVersionRetrievable would only be updated if the call to UpdateOpenLeases
+	// succeeded, but that would allow the ColdVersionRetrievable and ColdVersionFlushed numbers to drift
+	// which would increase the complexity of the code to address a situation that is probably not
+	// recoverable (failure to UpdateOpenLeases is an invariant violated error).
+	s.setFlushStateColdVersionRetrievable(startTime, nextVersion)
+	if err != nil {
+		instrument.EmitAndLogInvariantViolation(s.opts.InstrumentOptions(), func(l *zap.Logger) {
+			l.With(
+				zap.String("namespace", s.namespace.ID().String()),
+				zap.Uint32("shard", s.ID()),
+				zap.Time("blockStart", startTime),
+				zap.Int("nextVersion", nextVersion),
+			).Error("failed to update open leases after updating flush state cold version")
+		})
+		return err
+	}
+	return nil
+}
+
 type shardColdFlushDone struct {
 	startTime   time.Time
 	nextVersion int
@@ -2909,44 +2949,10 @@ func (s shardColdFlush) Done() error {
 			multiErr = multiErr.Add(err)
 			continue
 		}
-		// After writing the full block successfully update the ColdVersionFlushed number. This will
-		// allow the SeekerManager to open a lease on the latest version of the fileset files because
-		// the BlockLeaseVerifier will check the ColdVersionFlushed value, but the buffer only looks at
-		// ColdVersionRetrievable so a concurrent tick will not yet cause the blocks in memory to be
-		// evicted (which is the desired behavior because we haven't updated the open leases yet which
-		// means the newly written data is not available for querying via the SeekerManager yet.)
-		s.shard.setFlushStateColdVersionFlushed(startTime, nextVersion)
-		// Notify all block leasers that a new volume for the namespace/shard/blockstart
-		// has been created. This will block until all leasers have relinquished their
-		// leases.
-		_, err := s.shard.opts.BlockLeaseManager().UpdateOpenLeases(block.LeaseDescriptor{
-			Namespace:  s.shard.namespace.ID(),
-			Shard:      s.shard.ID(),
-			BlockStart: startTime,
-		}, block.LeaseState{Volume: nextVersion})
-		// After writing the full block successfully **and** propagating the new lease to the
-		// BlockLeaseManager, update the ColdVersionRetrievable in the flush state. Once this function
-		// completes concurrent ticks will be able to evict the data from memory that was just flushed
-		// (which is now safe to do since the SeekerManager has been notified of the presence of new
-		// files).
-		//
-		// NB(rartoul): Ideally the ColdVersionRetrievable would only be updated if the call to UpdateOpenLeases
-		// succeeded, but that would allow the ColdVersionRetrievable and ColdVersionFlushed numbers to drift
-		// which would increase the complexity of the code to address a situation that is probably not
-		// recoverable (failure to UpdateOpenLeases is an invariant violated error).
-		s.shard.setFlushStateColdVersionRetrievable(startTime, nextVersion)
+		err := s.shard.finishWriting(startTime, nextVersion)
 		if err != nil {
-			instrument.EmitAndLogInvariantViolation(s.shard.opts.InstrumentOptions(), func(l *zap.Logger) {
-				l.With(
-					zap.String("namespace", s.shard.namespace.ID().String()),
-					zap.Uint32("shard", s.shard.ID()),
-					zap.Time("blockStart", startTime),
-					zap.Int("nextVersion", nextVersion),
-				).Error("failed to update open leases after updating flush state cold version")
-			})
 			multiErr = multiErr.Add(err)
-			continue
 		}
 	}
 	return multiErr.FinalError()
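
For context, the sequence that the extracted finishWriting helper enforces is: bump ColdVersionFlushed, propagate the new volume to lease holders via UpdateOpenLeases, then bump ColdVersionRetrievable regardless of the lease-update outcome so the two counters never drift. The following is a minimal, self-contained sketch of that ordering; all names here (flushState, leaseManager, shard, noopLeases) are hypothetical stand-ins, not types from this repository.

```go
// Hypothetical sketch (not m3 code): models the ordering enforced by the
// finishWriting helper added in this patch.
package main

import (
	"fmt"
	"time"
)

// flushState mirrors the two cold-version counters referenced in the patch.
type flushState struct {
	ColdVersionFlushed     int
	ColdVersionRetrievable int
}

// leaseManager stands in for the BlockLeaseManager dependency.
type leaseManager interface {
	UpdateOpenLeases(blockStart time.Time, volume int) error
}

type shard struct {
	state  flushState
	leases leaseManager
}

// finishWriting follows the same three steps as the patch's helper:
// flushed version first, then lease propagation, then retrievable version.
func (s *shard) finishWriting(blockStart time.Time, nextVersion int) error {
	// 1. Mark the new volume as flushed so seekers may open a lease on it,
	//    while in-memory data is still considered not-yet-retrievable.
	s.state.ColdVersionFlushed = nextVersion

	// 2. Notify lease holders of the new volume; this blocks until all
	//    leases on the old volume have been relinquished.
	err := s.leases.UpdateOpenLeases(blockStart, nextVersion)

	// 3. Mark the volume retrievable even if the lease update failed, so the
	//    two counters cannot drift (mirrors the NB(rartoul) note in the patch).
	s.state.ColdVersionRetrievable = nextVersion
	if err != nil {
		return fmt.Errorf("update open leases: %w", err)
	}
	return nil
}

type noopLeases struct{}

func (noopLeases) UpdateOpenLeases(time.Time, int) error { return nil }

func main() {
	s := &shard{leases: noopLeases{}}
	if err := s.finishWriting(time.Now().Truncate(2*time.Hour), 1); err != nil {
		fmt.Println("finish writing failed:", err)
		return
	}
	// After a successful flush both counters land on the same version, which
	// is what the new FlushState assertions in the integration test check.
	fmt.Println(s.state.ColdVersionFlushed, s.state.ColdVersionRetrievable) // 1 1
}
```

The value of the refactoring is that both the large-tiles write path and shardColdFlush.Done now run this one sequence instead of two hand-maintained copies, which is what lets the integration test assert ColdVersionRetrievable and ColdVersionFlushed agree after aggregation.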