[largetiles] Fixed volumeIndex increment (#2681)
* Fixed volumeIndex increment

* minor refactoring

Co-authored-by: Linas Medžiūnas <[email protected]>
Co-authored-by: Rob Skillington <[email protected]>
3 people authored Oct 5, 2020
1 parent 1df3eed commit 89f6fcc
Showing 2 changed files with 55 additions and 40 deletions.
9 changes: 9 additions & 0 deletions src/dbnode/integration/large_tiles_test.go
@@ -70,6 +70,8 @@ func TestReadAggregateWrite(t *testing.T) {
// Write test data.
dpTimeStart := nowFn().Truncate(blockSizeT).Add(-blockSizeT)
dpTime := dpTimeStart
// "aab" ID is stored to the same shard 0 same as "foo", this is important
// for a test to store them to the same shard to test data consistency
err = session.WriteTagged(srcNs.ID(), ident.StringID("aab"),
ident.MustNewTagStringsIterator("__name__", "cpu", "job", "job1"),
dpTime, 15, xtime.Second, nil)
@@ -116,6 +118,13 @@ func TestReadAggregateWrite(t *testing.T) {
assert.Equal(t, int64(10), processedTileCount)

log.Info("validating aggregated data")

// check shard 0 as we wrote both aab and foo to this shard.
flushState, err := testSetup.DB().FlushState(trgNs.ID(), 0, dpTimeStart)
require.NoError(t, err)
require.Equal(t, 1, flushState.ColdVersionRetrievable)
require.Equal(t, 1, flushState.ColdVersionFlushed)

expectedDps := []ts.Datapoint{
{Timestamp: dpTimeStart.Add(110 * time.Minute), Value: 53.1},
{Timestamp: dpTimeStart.Add(170 * time.Minute), Value: 59.1},
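
A side note on the shard assertion above: series IDs are hashed to shards, so "aab" and "foo" ending up together in shard 0 is a property of their IDs rather than of write order. The sketch below is only a toy illustration of hash-based shard assignment; it uses a simple FNV-mod scheme rather than m3's actual sharding function, so the shard numbers it prints will not match the test's shard 0.

package main

import (
	"fmt"
	"hash/fnv"
)

// toyShardFor maps a series ID onto one of numShards shards.
// Illustration only: m3 uses its own hashing scheme, so the
// results here are not the shard numbers the test relies on.
func toyShardFor(id string, numShards uint32) uint32 {
	h := fnv.New32a()
	h.Write([]byte(id))
	return h.Sum32() % numShards
}

func main() {
	for _, id := range []string{"aab", "foo"} {
		fmt.Printf("%s -> shard %d\n", id, toyShardFor(id, 1024))
	}
}
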
86 changes: 46 additions & 40 deletions src/dbnode/storage/shard.go
@@ -2833,11 +2833,9 @@ READER:
// Notify all block leasers that a new volume for the namespace/shard/blockstart
// has been created. This will block until all leasers have relinquished their
// leases.
_, err = s.opts.BlockLeaseManager().UpdateOpenLeases(block.LeaseDescriptor{
Namespace: s.namespace.ID(),
Shard: s.ID(),
BlockStart: opts.Start,
}, block.LeaseState{Volume: nextVolume})
if err = s.finishWriting(opts.Start, nextVolume); err != nil {
multiErr = multiErr.Add(err)
}
}

closed = true
@@ -2888,6 +2886,48 @@ func (s *dbShard) logFlushResult(r dbShardFlushResult) {
)
}

func (s *dbShard) finishWriting(startTime time.Time, nextVersion int) error {
// After writing the full block successfully update the ColdVersionFlushed number. This will
// allow the SeekerManager to open a lease on the latest version of the fileset files because
// the BlockLeaseVerifier will check the ColdVersionFlushed value, but the buffer only looks at
// ColdVersionRetrievable so a concurrent tick will not yet cause the blocks in memory to be
// evicted (which is the desired behavior because we haven't updated the open leases yet which
// means the newly written data is not available for querying via the SeekerManager yet.)
s.setFlushStateColdVersionFlushed(startTime, nextVersion)

// Notify all block leasers that a new volume for the namespace/shard/blockstart
// has been created. This will block until all leasers have relinquished their
// leases.
_, err := s.opts.BlockLeaseManager().UpdateOpenLeases(block.LeaseDescriptor{
Namespace: s.namespace.ID(),
Shard: s.ID(),
BlockStart: startTime,
}, block.LeaseState{Volume: nextVersion})
// After writing the full block successfully **and** propagating the new lease to the
// BlockLeaseManager, update the ColdVersionRetrievable in the flush state. Once this function
// completes concurrent ticks will be able to evict the data from memory that was just flushed
// (which is now safe to do since the SeekerManager has been notified of the presence of new
// files).
//
// NB(rartoul): Ideally the ColdVersionRetrievable would only be updated if the call to UpdateOpenLeases
// succeeded, but that would allow the ColdVersionRetrievable and ColdVersionFlushed numbers to drift
// which would increase the complexity of the code to address a situation that is probably not
// recoverable (failure to UpdateOpenLeases is an invariant violated error).
s.setFlushStateColdVersionRetrievable(startTime, nextVersion)
if err != nil {
instrument.EmitAndLogInvariantViolation(s.opts.InstrumentOptions(), func(l *zap.Logger) {
l.With(
zap.String("namespace", s.namespace.ID().String()),
zap.Uint32("shard", s.ID()),
zap.Time("blockStart", startTime),
zap.Int("nextVersion", nextVersion),
).Error("failed to update open leases after updating flush state cold version")
})
return err
}
return nil
}

type shardColdFlushDone struct {
startTime time.Time
nextVersion int
@@ -2909,44 +2949,10 @@ func (s shardColdFlush) Done() error {
multiErr = multiErr.Add(err)
continue
}
// After writing the full block successfully update the ColdVersionFlushed number. This will
// allow the SeekerManager to open a lease on the latest version of the fileset files because
// the BlockLeaseVerifier will check the ColdVersionFlushed value, but the buffer only looks at
// ColdVersionRetrievable so a concurrent tick will not yet cause the blocks in memory to be
// evicted (which is the desired behavior because we haven't updated the open leases yet which
// means the newly written data is not available for querying via the SeekerManager yet.)
s.shard.setFlushStateColdVersionFlushed(startTime, nextVersion)

// Notify all block leasers that a new volume for the namespace/shard/blockstart
// has been created. This will block until all leasers have relinquished their
// leases.
_, err := s.shard.opts.BlockLeaseManager().UpdateOpenLeases(block.LeaseDescriptor{
Namespace: s.shard.namespace.ID(),
Shard: s.shard.ID(),
BlockStart: startTime,
}, block.LeaseState{Volume: nextVersion})
// After writing the full block successfully **and** propagating the new lease to the
// BlockLeaseManager, update the ColdVersionRetrievable in the flush state. Once this function
// completes concurrent ticks will be able to evict the data from memory that was just flushed
// (which is now safe to do since the SeekerManager has been notified of the presence of new
// files).
//
// NB(rartoul): Ideally the ColdVersionRetrievable would only be updated if the call to UpdateOpenLeases
// succeeded, but that would allow the ColdVersionRetrievable and ColdVersionFlushed numbers to drift
// which would increase the complexity of the code to address a situation that is probably not
// recoverable (failure to UpdateOpenLeases is an invariant violated error).
s.shard.setFlushStateColdVersionRetrievable(startTime, nextVersion)
err := s.shard.finishWriting(startTime, nextVersion)
if err != nil {
instrument.EmitAndLogInvariantViolation(s.shard.opts.InstrumentOptions(), func(l *zap.Logger) {
l.With(
zap.String("namespace", s.shard.namespace.ID().String()),
zap.Uint32("shard", s.shard.ID()),
zap.Time("blockStart", startTime),
zap.Int("nextVersion", nextVersion),
).Error("failed to update open leases after updating flush state cold version")
})
multiErr = multiErr.Add(err)
continue
}
}
return multiErr.FinalError()
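
To make the ordering that finishWriting centralizes easier to see in isolation, here is a minimal, self-contained sketch with stand-in types (not m3's): the flushed cold version is bumped first, the new lease is propagated, and only then is the retrievable cold version advanced, so the two counters never drift even when the lease update fails.

package main

import "fmt"

// toyFlushState stands in for the per-block flush state discussed in the
// comments above; it is not m3's type.
type toyFlushState struct {
	ColdVersionRetrievable int
	ColdVersionFlushed     int
}

// finishWritingSketch mirrors only the ordering of the real helper:
// mark the new volume as flushed, propagate the lease, then mark it
// retrievable regardless of the lease error so the counters stay equal.
func finishWritingSketch(fs *toyFlushState, nextVersion int, updateOpenLeases func(volume int) error) error {
	fs.ColdVersionFlushed = nextVersion
	err := updateOpenLeases(nextVersion)
	fs.ColdVersionRetrievable = nextVersion
	return err
}

func main() {
	fs := &toyFlushState{}
	if err := finishWritingSketch(fs, 1, func(volume int) error {
		fmt.Println("open leases updated to volume", volume)
		return nil
	}); err != nil {
		fmt.Println("lease update failed:", err)
	}
	fmt.Printf("flushed=%d retrievable=%d\n", fs.ColdVersionFlushed, fs.ColdVersionRetrievable)
}
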
