Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove CacheAllMetadata caching policy #1110

Merged
merged 13 commits into from
Oct 22, 2018
12 changes: 0 additions & 12 deletions src/dbnode/storage/block/block_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 0 additions & 4 deletions src/dbnode/storage/block/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,6 @@ type DatabaseBlock interface {
// merged during Stream().
HasMergeTarget() bool

// IsRetrieved returns whether the block is already retrieved. Only
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, shouldn't this be required for some cache policies (e.g. LRU/RecentlyRead)?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I trusted the comment! I believe the other caching policies use IsCachedBlock() and "retrieved" means we have metadata but we may not have the data for it.

Copy link
Contributor

@richardartoul richardartoul Oct 18, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I think IsRetrieved was specific to that policy (you can see the same logic implemented in the IsCachedBlock function. I think we definitely want to get rid of this function and move towards using IsCachedBlock or honestly at this point we can delete IsCachedBlock as well and only use WasRetrievedFromDisk since they will have the exact same logic at that point. The only place we use IsCachedBlock is in series.FetchBlocksMetadata and its just an optimization there (and not a particularly accurate one at that since it doesn't work in the recently_read policy)

Also, the only place where i see IsRetrieved being used outside the context of the CacheAllMetadata policy is in the series buffer:

func (b *dbBufferBucket) merge() (mergeResult, error) {
	if !b.needsMerge() {
		// Save unnecessary work
		return mergeResult{}, nil
	}

	merges := 0
	bopts := b.opts.DatabaseBlockOptions()
	encoder := bopts.EncoderPool().Get()
	encoder.Reset(b.start, bopts.DatabaseBlockAllocSize())

	// If we have to merge bootstrapped from disk during a merge then this
	// can make ticking very slow, ensure to notify this bug
	if len(b.bootstrapped) > 0 {
		unretrieved := 0
		for i := range b.bootstrapped {
			if !b.bootstrapped[i].IsRetrieved() {
				unretrieved++
			}
		}
		if unretrieved > 0 {
			log := b.opts.InstrumentOptions().Logger()
			log.Warnf("buffer merging %d unretrieved blocks", unretrieved)
		}
	}

This is a very weird code path that I don't think we would ever follow at this point. If we want to be extra cautious could replace that IsRetrieved() call with WasReadFromDisk()/IsCachedBlock as an invariant violated.

@robskillington

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah let's remove it, don't think this is plausible now.

// meaningful in the context of the CacheAllMetadata series caching policy.
IsRetrieved() bool

// WasRetrievedFromDisk returns whether the block was retrieved from storage.
WasRetrievedFromDisk() bool

Expand Down
32 changes: 1 addition & 31 deletions src/dbnode/storage/bootstrap/bootstrapper/fs/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,15 +506,6 @@ func (s *fileSystemSource) loadShardReadersDataIntoShardResult(
if shardRetrieverMgr != nil {
shardRetriever = shardRetrieverMgr.ShardRetriever(shard)
}
if seriesCachePolicy == series.CacheAllMetadata && shardRetriever == nil {
s.log.WithFields(
xlog.NewField("has-shard-retriever-mgr", shardRetrieverMgr != nil),
xlog.NewField("has-shard-retriever", shardRetriever != nil),
).Errorf("shard retriever missing for shard: %d", shard)
s.markRunResultErrorsAndUnfulfilled(runResult, requestedRanges,
remainingRanges, timesWithErrors)
return
}
}

for _, r := range readers {
Expand Down Expand Up @@ -558,8 +549,6 @@ func (s *fileSystemSource) loadShardReadersDataIntoShardResult(
switch seriesCachePolicy {
case series.CacheAll:
validateErr = r.Validate()
case series.CacheAllMetadata:
validateErr = r.ValidateMetadata()
default:
err = fmt.Errorf("invalid series cache policy: %s", seriesCachePolicy.String())
}
Expand Down Expand Up @@ -639,15 +628,11 @@ func (s *fileSystemSource) readNextEntryAndRecordBlock(
id ident.ID
tagsIter ident.TagIterator
data checked.Bytes
length int
checksum uint32
err error
)
switch seriesCachePolicy {
case series.CacheAll:
id, tagsIter, data, checksum, err = r.Read()
case series.CacheAllMetadata:
id, tagsIter, length, checksum, err = r.ReadMetadata()
id, tagsIter, data, _, err = r.Read()
default:
err = fmt.Errorf("invalid series cache policy: %s", seriesCachePolicy.String())
}
Expand Down Expand Up @@ -683,13 +668,6 @@ func (s *fileSystemSource) readNextEntryAndRecordBlock(
case series.CacheAll:
seg := ts.NewSegment(data, nil, ts.FinalizeHead)
seriesBlock.Reset(blockStart, blockSize, seg)
case series.CacheAllMetadata:
metadata := block.RetrievableBlockMetadata{
ID: id,
Length: length,
Checksum: checksum,
}
seriesBlock.ResetRetrievable(blockStart, blockSize, shardRetriever, metadata)
default:
return fmt.Errorf("invalid series cache policy: %s", seriesCachePolicy.String())
}
Expand Down Expand Up @@ -914,7 +892,6 @@ func (s *fileSystemSource) read(
runOpts bootstrap.RunOptions,
) (*runResult, error) {
var (
nsID = md.ID()
seriesCachePolicy = s.opts.ResultOptions().SeriesCachePolicy()
blockRetriever block.DatabaseBlockRetriever
res *runResult
Expand Down Expand Up @@ -958,13 +935,6 @@ func (s *fileSystemSource) read(
switch seriesCachePolicy {
case series.CacheAll:
// No checks necessary
case series.CacheAllMetadata:
// Need to check block retriever available
if blockRetriever == nil {
return nil, fmt.Errorf(
"missing block retriever when using series cache metadata for namespace: %s",
nsID.String())
}
default:
// Unless we're caching all series (or all series metadata) in memory, we
// return just the availability of the files we have
Expand Down
29 changes: 4 additions & 25 deletions src/dbnode/storage/bootstrap/bootstrapper/peers/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,11 +343,8 @@ func (s *peersSource) logFetchBootstrapBlocksFromPeersOutcome(
// the series will either be held in memory, or removed from memory once
// flushing has completed.
// Once everything has been flushed to disk then depending on the series
// caching policy the function is either done, or in the case of the
// CacheAllMetadata policy we loop through every series and make every block
// retrievable (so that we can retrieve data for the blocks that we're caching
// the metadata for).
// In addition, if the caching policy is not CacheAll or CacheAllMetadata, then
// caching policy the function is either done.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sentence terminates too early now

// In addition, if the caching policy is not CacheAll, then
// at the end we remove all the series objects from the shard result as well
// (since all their corresponding blocks have been removed anyways) to prevent
// a huge memory spike caused by adding lots of unused series to the Shard
Expand All @@ -364,14 +361,10 @@ func (s *peersSource) flush(
var (
ropts = nsMetadata.Options().RetentionOptions()
blockSize = ropts.BlockSize()
shardRetriever = shardRetrieverMgr.ShardRetriever(shard)
tmpCtx = context.NewContext()
seriesCachePolicy = s.opts.ResultOptions().SeriesCachePolicy()
persistConfig = opts.PersistConfig()
)
if seriesCachePolicy == series.CacheAllMetadata && shardRetriever == nil {
return fmt.Errorf("shard retriever missing for shard: %d", shard)
}

for start := tr.Start; start.Before(tr.End); start = start.Add(blockSize) {
prepareOpts := persist.DataPrepareOptions{
Expand Down Expand Up @@ -440,18 +433,6 @@ func (s *peersSource) flush(
case series.CacheAll:
// Leave the blocks in the shard result, we need to return all blocks
// so we can cache in memory
case series.CacheAllMetadata:
// NB(r): We can now make the flushed blocks retrievable, note that we
// explicitly perform another loop here and lookup the block again
// to avoid a large expensive allocation to hold onto the blocks
// that we just flushed that would have to be pooled.
// We are explicitly trading CPU time here for lower GC pressure.
metadata := block.RetrievableBlockMetadata{
ID: s.ID,
Length: bl.Len(),
Checksum: checksum,
}
bl.ResetRetrievable(start, blockSize, shardRetriever, metadata)
default:
// Not caching the series or metadata in memory so finalize the block,
// better to do this as we loop through to make blocks return to the
Expand Down Expand Up @@ -486,15 +467,13 @@ func (s *peersSource) flush(
}
}

// We only want to retain the series metadata in one of three cases:
// We only want to retain the series metadata in one of two cases:
// 1) CacheAll caching policy (because we're expected to cache everything in memory)
// 2) CacheAllMetadata caching policy (because we're expected to cache all metadata in memory)
// 3) PersistConfig.FileSetType is set to FileSetSnapshotType because that means we're bootstrapping
// 2) PersistConfig.FileSetType is set to FileSetSnapshotType because that means we're bootstrapping
// an active block that we'll want to perform a flush on later, and we're only flushing here for
// the sake of allowing the commit log bootstrapper to be able to recover this data if the node
// goes down in-between this bootstrapper completing and the subsequent flush.
shouldRetainSeriesMetadata := seriesCachePolicy == series.CacheAll ||
seriesCachePolicy == series.CacheAllMetadata ||
persistConfig.FileSetType == persist.FileSetSnapshotType

if !shouldRetainSeriesMetadata {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,6 @@ func TestPeersSourceReturnsFulfilledAndUnfulfilled(t *testing.T) {

func TestPeersSourceRunWithPersist(t *testing.T) {
for _, cachePolicy := range []series.CachePolicy{
series.CacheAllMetadata,
series.CacheRecentlyRead,
} {
ctrl := gomock.NewController(t)
Expand Down Expand Up @@ -406,36 +405,9 @@ func TestPeersSourceRunWithPersist(t *testing.T) {
require.True(t, r.Unfulfilled()[0].IsEmpty())
require.True(t, r.Unfulfilled()[1].IsEmpty())

if cachePolicy == series.CacheAllMetadata {
assert.Equal(t, 2, len(r.ShardResults()))
require.NotNil(t, r.ShardResults()[0])
require.NotNil(t, r.ShardResults()[1])

block, ok := r.ShardResults()[0].BlockAt(ident.StringID("foo"), start)
require.True(t, ok)
fooBlockChecksum, err := fooBlock.Checksum()
require.NoError(t, err)
assertBlockChecksum(t, fooBlockChecksum, block)
assert.False(t, block.IsRetrieved())

block, ok = r.ShardResults()[0].BlockAt(ident.StringID("bar"), start.Add(ropts.BlockSize()))
require.True(t, ok)
barBlockChecksum, err := barBlock.Checksum()
require.NoError(t, err)
assertBlockChecksum(t, barBlockChecksum, block)
assert.False(t, block.IsRetrieved())

block, ok = r.ShardResults()[1].BlockAt(ident.StringID("baz"), start)
require.True(t, ok)
bazBlockChecksum, err := bazBlock.Checksum()
require.NoError(t, err)
assertBlockChecksum(t, bazBlockChecksum, block)
assert.False(t, block.IsRetrieved())
} else {
assert.Equal(t, 0, len(r.ShardResults()))
require.Nil(t, r.ShardResults()[0])
require.Nil(t, r.ShardResults()[1])
}
assert.Equal(t, 0, len(r.ShardResults()))
require.Nil(t, r.ShardResults()[0])
require.Nil(t, r.ShardResults()[1])

assert.Equal(t, map[string]int{
"foo": 1, "bar": 1, "baz": 1,
Expand Down
15 changes: 0 additions & 15 deletions src/dbnode/storage/series/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -769,21 +769,6 @@ func (b *dbBufferBucket) merge() (mergeResult, error) {
encoder := bopts.EncoderPool().Get()
encoder.Reset(b.start, bopts.DatabaseBlockAllocSize())

// If we have to merge bootstrapped from disk during a merge then this
// can make ticking very slow, ensure to notify this bug
if len(b.bootstrapped) > 0 {
unretrieved := 0
for i := range b.bootstrapped {
if !b.bootstrapped[i].IsRetrieved() {
unretrieved++
}
}
if unretrieved > 0 {
log := b.opts.InstrumentOptions().Logger()
log.Warnf("buffer merging %d unretrieved blocks", unretrieved)
}
}

var (
start = b.start
readers = make([]xio.SegmentReader, 0, len(b.encoders)+len(b.bootstrapped))
Expand Down
9 changes: 1 addition & 8 deletions src/dbnode/storage/series/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,6 @@ const (
// which requires loading all into cache on bootstrap and never
// expiring series from memory until expired from retention.
CacheAll
// CacheAllMetadata specifies that all series metadata but not the
// data itself must be cached at all times and the metadata is never
// expired from memory until expired from retention.
// TODO: Remove this once recently read is production grade.
CacheAllMetadata
// CacheRecentlyRead specifies that series that are recently read
// must be cached, configurable by the namespace block expiry after
// not accessed period.
Expand All @@ -59,7 +54,7 @@ const (

// ValidCachePolicies returns the valid series cache policies.
func ValidCachePolicies() []CachePolicy {
return []CachePolicy{CacheNone, CacheAll, CacheAllMetadata, CacheRecentlyRead, CacheLRU}
return []CachePolicy{CacheNone, CacheAll, CacheRecentlyRead, CacheLRU}
}

func (p CachePolicy) String() string {
Expand All @@ -68,8 +63,6 @@ func (p CachePolicy) String() string {
return "none"
case CacheAll:
return "all"
case CacheAllMetadata:
return "all_metadata"
case CacheRecentlyRead:
return "recently_read"
case CacheLRU:
Expand Down
4 changes: 0 additions & 4 deletions src/dbnode/storage/series/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,6 @@ func (r Reader) readersWithBlocksMapAndBuffer(
switch {
case cachePolicy == CacheAll:
// No-op, block metadata should have been in-memory
case cachePolicy == CacheAllMetadata:
// No-op, block metadata should have been in-memory
case r.retriever != nil:
// Try to stream from disk
if r.retriever.IsBlockRetrievable(blockAt) {
Expand Down Expand Up @@ -213,8 +211,6 @@ func (r Reader) fetchBlocksWithBlocksMapAndBuffer(
switch {
case cachePolicy == CacheAll:
// No-op, block metadata should have been in-memory
case cachePolicy == CacheAllMetadata:
// No-op, block metadata should have been in-memory
case r.retriever != nil:
// Try to stream from disk
if r.retriever.IsBlockRetrievable(start) {
Expand Down
35 changes: 3 additions & 32 deletions src/dbnode/storage/series/series.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,12 +202,6 @@ func (s *dbSeries) updateBlocksWithLock() (updateBlocksResult, error) {
continue
}

if cachePolicy == CacheAllMetadata && !currBlock.IsRetrieved() {
// Already unwired
result.UnwiredBlocks++
continue
}

// Potentially unwire
var unwired, shouldUnwire bool
// IsBlockRetrievable makes sure that the block has been flushed. This
Expand All @@ -217,9 +211,6 @@ func (s *dbSeries) updateBlocksWithLock() (updateBlocksResult, error) {
switch cachePolicy {
case CacheNone:
shouldUnwire = true
case CacheAllMetadata:
// Apply RecentlyRead logic (CacheAllMetadata is being removed soon)
fallthrough
case CacheRecentlyRead:
sinceLastRead := now.Sub(currBlock.LastReadTime())
shouldUnwire = sinceLastRead >= wiredTimeout
Expand All @@ -235,29 +226,9 @@ func (s *dbSeries) updateBlocksWithLock() (updateBlocksResult, error) {
}

if shouldUnwire {
switch cachePolicy {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So much cleanup :D

case CacheAllMetadata:
// Keep the metadata but remove contents

// NB(r): Each block needs shared ref to the series ID
// or else each block needs to have a copy of the ID
id := s.id
checksum, err := currBlock.Checksum()
if err != nil {
return result, err
}
metadata := block.RetrievableBlockMetadata{
ID: id,
Length: currBlock.Len(),
Checksum: checksum,
}
currBlock.ResetRetrievable(start, currBlock.BlockSize(), retriever, metadata)
default:
// Remove the block and it will be looked up later
s.blocks.RemoveBlockAt(start)
currBlock.Close()
}

// Remove the block and it will be looked up later
s.blocks.RemoveBlockAt(start)
currBlock.Close()
unwired = true
result.madeUnwiredBlocks++
}
Expand Down
Loading