Commit 2e0342b
Fixed volumeIndex increment
gediminasgu committed Sep 30, 2020
1 parent 43ff200 commit 2e0342b
Showing 2 changed files with 55 additions and 43 deletions.
12 changes: 9 additions & 3 deletions src/dbnode/integration/large_tiles_test.go
@@ -134,6 +134,12 @@ func TestReadAggregateWrite(t *testing.T) {
    }, time.Second*10))

    log.Info("validating aggregated data")
+
+   flushState, err := testSetup.DB().FlushState(trgNs.ID(), 0, dpTimeStart)
+   require.NoError(t, err)
+   require.Equal(t, 1, flushState.ColdVersionRetrievable)
+   require.Equal(t, 1, flushState.ColdVersionFlushed)
+
    expectedDps := []ts.Datapoint{
        {Timestamp: dpTimeStart.Add(110 * time.Minute), Value: 53.1},
        {Timestamp: dpTimeStart.Add(170 * time.Minute), Value: 59.1},
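For context, the new assertions above read the cold flush state for shard 0 of the target namespace once. Below is a minimal sketch of the same check written as a poll, for setups where the cold flush completes asynchronously; the FlushState call and field names are taken from the added lines, while the polling loop, deadline, and sleep interval are hypothetical additions, not part of this commit:

    // Hypothetical polling variant of the flush-state assertions above,
    // usable inside the same test: retry for up to ~10s instead of asserting once.
    deadline := time.Now().Add(10 * time.Second)
    for {
        flushState, err := testSetup.DB().FlushState(trgNs.ID(), 0, dpTimeStart)
        require.NoError(t, err)
        if flushState.ColdVersionRetrievable == 1 && flushState.ColdVersionFlushed == 1 {
            break
        }
        if time.Now().After(deadline) {
            require.FailNow(t, "cold flush state never reached version 1")
        }
        time.Sleep(100 * time.Millisecond)
    }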
@@ -285,9 +291,9 @@ func setupServer(t *testing.T) (TestSetup, namespace.Metadata, namespace.Metadat
        idxOpts = namespace.NewIndexOptions().SetEnabled(true).SetBlockSize(indexBlockSize)
        idxOptsT = namespace.NewIndexOptions().SetEnabled(false).SetBlockSize(indexBlockSizeT)
        nsOpts = namespace.NewOptions().
-           SetRetentionOptions(rOpts).
-           SetIndexOptions(idxOpts).
-           SetColdWritesEnabled(true)
+           SetRetentionOptions(rOpts).
+           SetIndexOptions(idxOpts).
+           SetColdWritesEnabled(true)
        nsOptsT = namespace.NewOptions().
            SetRetentionOptions(rOptsT).
            SetIndexOptions(idxOptsT).
86 changes: 46 additions & 40 deletions src/dbnode/storage/shard.go
@@ -2852,11 +2852,9 @@ READER:
        // Notify all block leasers that a new volume for the namespace/shard/blockstart
        // has been created. This will block until all leasers have relinquished their
        // leases.
-       _, err = s.opts.BlockLeaseManager().UpdateOpenLeases(block.LeaseDescriptor{
-           Namespace: s.namespace.ID(),
-           Shard: s.ID(),
-           BlockStart: opts.Start,
-       }, block.LeaseState{Volume: nextVolume})
+       if err = s.writingIsFinished(opts.Start, nextVolume); err != nil {
+           multiErr = multiErr.Add(err)
+       }
    }

    closed = true
@@ -2907,6 +2905,48 @@ func (s *dbShard) logFlushResult(r dbShardFlushResult) {
    )
}

+func (s *dbShard) writingIsFinished(startTime time.Time, nextVersion int) error {
+   // After writing the full block successfully update the ColdVersionFlushed number. This will
+   // allow the SeekerManager to open a lease on the latest version of the fileset files because
+   // the BlockLeaseVerifier will check the ColdVersionFlushed value, but the buffer only looks at
+   // ColdVersionRetrievable so a concurrent tick will not yet cause the blocks in memory to be
+   // evicted (which is the desired behavior because we haven't updated the open leases yet which
+   // means the newly written data is not available for querying via the SeekerManager yet.)
+   s.setFlushStateColdVersionFlushed(startTime, nextVersion)
+
+   // Notify all block leasers that a new volume for the namespace/shard/blockstart
+   // has been created. This will block until all leasers have relinquished their
+   // leases.
+   _, err := s.opts.BlockLeaseManager().UpdateOpenLeases(block.LeaseDescriptor{
+       Namespace: s.namespace.ID(),
+       Shard: s.ID(),
+       BlockStart: startTime,
+   }, block.LeaseState{Volume: nextVersion})
+   // After writing the full block successfully **and** propagating the new lease to the
+   // BlockLeaseManager, update the ColdVersionRetrievable in the flush state. Once this function
+   // completes concurrent ticks will be able to evict the data from memory that was just flushed
+   // (which is now safe to do since the SeekerManager has been notified of the presence of new
+   // files).
+   //
+   // NB(rartoul): Ideally the ColdVersionRetrievable would only be updated if the call to UpdateOpenLeases
+   // succeeded, but that would allow the ColdVersionRetrievable and ColdVersionFlushed numbers to drift
+   // which would increase the complexity of the code to address a situation that is probably not
+   // recoverable (failure to UpdateOpenLeases is an invariant violated error).
+   s.setFlushStateColdVersionRetrievable(startTime, nextVersion)
+   if err != nil {
+       instrument.EmitAndLogInvariantViolation(s.opts.InstrumentOptions(), func(l *zap.Logger) {
+           l.With(
+               zap.String("namespace", s.namespace.ID().String()),
+               zap.Uint32("shard", s.ID()),
+               zap.Time("blockStart", startTime),
+               zap.Int("nextVersion", nextVersion),
+           ).Error("failed to update open leases after updating flush state cold version")
+       })
+       return err
+   }
+   return nil
+}
+
type shardColdFlushDone struct {
    startTime time.Time
    nextVersion int
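The comments in the new writingIsFinished helper describe a strict ordering: bump ColdVersionFlushed first (so the SeekerManager may open the new fileset), then notify block leasers via UpdateOpenLeases (which blocks), and only then bump ColdVersionRetrievable (so ticks may evict the in-memory data). Below is a stripped-down, self-contained sketch of that sequence; flushVersions and leaseNotifier are hypothetical stand-ins for the shard flush state and the BlockLeaseManager, not the actual m3 types:

    package coldflushsketch

    import (
        "fmt"
        "time"
    )

    // leaseNotifier is a hypothetical stand-in for the BlockLeaseManager:
    // UpdateOpenLeases blocks until every leaser has picked up the new volume.
    type leaseNotifier interface {
        UpdateOpenLeases(blockStart time.Time, volume int) error
    }

    // flushVersions is a hypothetical stand-in for the per-block flush state.
    type flushVersions struct {
        coldVersionFlushed     int
        coldVersionRetrievable int
    }

    // writingIsFinished mirrors the ordering of the real helper:
    //  1. mark the new volume as flushed so seekers may open it,
    //  2. notify leasers of the new volume (blocking),
    //  3. mark it retrievable so ticks may evict the in-memory data.
    func writingIsFinished(fv *flushVersions, ln leaseNotifier, blockStart time.Time, nextVersion int) error {
        fv.coldVersionFlushed = nextVersion

        err := ln.UpdateOpenLeases(blockStart, nextVersion)

        // Retrievable is bumped even if the lease update failed, so the two
        // version numbers never drift (matching the NB(rartoul) note above).
        fv.coldVersionRetrievable = nextVersion
        if err != nil {
            return fmt.Errorf("update open leases: %w", err)
        }
        return nil
    }

The real helper additionally logs an invariant violation before returning the error, since a failed UpdateOpenLeases is treated as unrecoverable.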
@@ -2928,44 +2968,10 @@ func (s shardColdFlush) Done() error {
            multiErr = multiErr.Add(err)
            continue
        }
-       // After writing the full block successfully update the ColdVersionFlushed number. This will
-       // allow the SeekerManager to open a lease on the latest version of the fileset files because
-       // the BlockLeaseVerifier will check the ColdVersionFlushed value, but the buffer only looks at
-       // ColdVersionRetrievable so a concurrent tick will not yet cause the blocks in memory to be
-       // evicted (which is the desired behavior because we haven't updated the open leases yet which
-       // means the newly written data is not available for querying via the SeekerManager yet.)
-       s.shard.setFlushStateColdVersionFlushed(startTime, nextVersion)
-
-       // Notify all block leasers that a new volume for the namespace/shard/blockstart
-       // has been created. This will block until all leasers have relinquished their
-       // leases.
-       _, err := s.shard.opts.BlockLeaseManager().UpdateOpenLeases(block.LeaseDescriptor{
-           Namespace: s.shard.namespace.ID(),
-           Shard: s.shard.ID(),
-           BlockStart: startTime,
-       }, block.LeaseState{Volume: nextVersion})
-       // After writing the full block successfully **and** propagating the new lease to the
-       // BlockLeaseManager, update the ColdVersionRetrievable in the flush state. Once this function
-       // completes concurrent ticks will be able to evict the data from memory that was just flushed
-       // (which is now safe to do since the SeekerManager has been notified of the presence of new
-       // files).
-       //
-       // NB(rartoul): Ideally the ColdVersionRetrievable would only be updated if the call to UpdateOpenLeases
-       // succeeded, but that would allow the ColdVersionRetrievable and ColdVersionFlushed numbers to drift
-       // which would increase the complexity of the code to address a situation that is probably not
-       // recoverable (failure to UpdateOpenLeases is an invariant violated error).
-       s.shard.setFlushStateColdVersionRetrievable(startTime, nextVersion)
+       err := s.shard.writingIsFinished(startTime, nextVersion)
        if err != nil {
-           instrument.EmitAndLogInvariantViolation(s.shard.opts.InstrumentOptions(), func(l *zap.Logger) {
-               l.With(
-                   zap.String("namespace", s.shard.namespace.ID().String()),
-                   zap.Uint32("shard", s.shard.ID()),
-                   zap.Time("blockStart", startTime),
-                   zap.Int("nextVersion", nextVersion),
-               ).Error("failed to update open leases after updating flush state cold version")
-           })
            multiErr = multiErr.Add(err)
            continue
        }
    }
    return multiErr.FinalError()
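With the helper extracted, both call sites (the aggregate-tiles path in the first shard.go hunk and the cold flush Done() path here) share the same finish-writing sequence and simply fold any error into the shard's multi-error. A small, hypothetical sketch of that accumulation pattern follows; flushedBlock and errors.Join stand in for the loop state and multiErr.FinalError(), and are not the actual m3 code:

    package coldflushsketch

    import (
        "errors"
        "time"
    )

    // flushedBlock is a hypothetical pairing of a block start with the volume
    // version that was just cold-flushed for it.
    type flushedBlock struct {
        start   time.Time
        version int
    }

    // finishAll sketches the accumulation pattern in shardColdFlush.Done():
    // every flushed block start is finished independently, failures are collected,
    // and a combined error is returned at the end instead of aborting early.
    func finishAll(finish func(time.Time, int) error, blocks []flushedBlock) error {
        var errs []error
        for _, b := range blocks {
            if err := finish(b.start, b.version); err != nil {
                // Record the failure but keep finishing the remaining block starts.
                errs = append(errs, err)
            }
        }
        return errors.Join(errs...)
    }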
