-
Notifications
You must be signed in to change notification settings - Fork 454
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[m3db] Check bloom filter before stream request allocation #3103
Merged
Merged
Changes from 2 commits
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -89,12 +89,13 @@ const ( | |
type blockRetriever struct { | ||
sync.RWMutex | ||
|
||
opts BlockRetrieverOptions | ||
fsOpts Options | ||
logger *zap.Logger | ||
queryLimits limits.QueryLimits | ||
bytesReadLimit limits.LookbackLimit | ||
seriesReadCount tally.Counter | ||
opts BlockRetrieverOptions | ||
fsOpts Options | ||
logger *zap.Logger | ||
queryLimits limits.QueryLimits | ||
bytesReadLimit limits.LookbackLimit | ||
seriesReadCount tally.Counter | ||
seriesBloomFilterMisses tally.Counter | ||
|
||
newSeekerMgrFn newSeekerMgrFn | ||
|
||
|
@@ -126,18 +127,19 @@ func NewBlockRetriever( | |
scope := fsOpts.InstrumentOptions().MetricsScope().SubScope("retriever") | ||
|
||
return &blockRetriever{ | ||
opts: opts, | ||
fsOpts: fsOpts, | ||
logger: fsOpts.InstrumentOptions().Logger(), | ||
queryLimits: opts.QueryLimits(), | ||
bytesReadLimit: opts.QueryLimits().BytesReadLimit(), | ||
seriesReadCount: scope.Counter("series-read"), | ||
newSeekerMgrFn: NewSeekerManager, | ||
reqPool: opts.RetrieveRequestPool(), | ||
bytesPool: opts.BytesPool(), | ||
idPool: opts.IdentifierPool(), | ||
status: blockRetrieverNotOpen, | ||
notifyFetch: make(chan struct{}, 1), | ||
opts: opts, | ||
fsOpts: fsOpts, | ||
logger: fsOpts.InstrumentOptions().Logger(), | ||
queryLimits: opts.QueryLimits(), | ||
bytesReadLimit: opts.QueryLimits().BytesReadLimit(), | ||
seriesReadCount: scope.Counter("series-read"), | ||
seriesBloomFilterMisses: scope.Counter("series-bloom-filter-misses"), | ||
newSeekerMgrFn: NewSeekerManager, | ||
reqPool: opts.RetrieveRequestPool(), | ||
bytesPool: opts.BytesPool(), | ||
idPool: opts.IdentifierPool(), | ||
status: blockRetrieverNotOpen, | ||
notifyFetch: make(chan struct{}, 1), | ||
// We just close this channel when the fetchLoops should shutdown, so no | ||
// buffering is required | ||
fetchLoopsShouldShutdownCh: make(chan struct{}), | ||
|
@@ -560,6 +562,33 @@ func (r *blockRetriever) fetchBatch( | |
} | ||
} | ||
|
||
func (r *blockRetriever) seriesPresentInBloomFilter( | ||
id ident.ID, | ||
shard uint32, | ||
startTime time.Time, | ||
) (bool, error) { | ||
// Capture variable and RLock() because this slice can be modified in the | ||
// Open() method | ||
r.RLock() | ||
// This should never happen unless caller tries to use Stream() before Open() | ||
if r.seekerMgr == nil { | ||
r.RUnlock() | ||
return false, errNoSeekerMgr | ||
} | ||
r.RUnlock() | ||
|
||
idExists, err := r.seekerMgr.Test(id, shard, startTime) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
This should be the captured variable, no?
r.RLock()
seekerMgr := r.seekerMgr
r.RUnlock()
if r.seekerMgr == nil {
return false, errNoSeekerMgr
}
idExists, err := seekerMgr.Test(id, shard, startTime)
As per the comment:
// Capture variable and RLock() because this slice can be modified in the
// Open() method
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. |
||
if err != nil { | ||
return false, err | ||
} | ||
|
||
if !idExists { | ||
r.seriesBloomFilterMisses.Inc(1) | ||
} | ||
|
||
return idExists, nil | ||
} | ||
|
||
// streamRequest returns a bool indicating if the ID was found, and any errors. | ||
func (r *blockRetriever) streamRequest( | ||
ctx context.Context, | ||
|
@@ -568,11 +597,11 @@ func (r *blockRetriever) streamRequest( | |
id ident.ID, | ||
startTime time.Time, | ||
nsCtx namespace.Context, | ||
) (bool, error) { | ||
) error { | ||
req.resultWg.Add(1) | ||
r.seriesReadCount.Inc(1) | ||
if err := r.queryLimits.DiskSeriesReadLimit().Inc(1, req.source); err != nil { | ||
return false, err | ||
return err | ||
} | ||
req.shard = shard | ||
|
||
|
@@ -592,29 +621,9 @@ func (r *blockRetriever) streamRequest( | |
// Ensure to finalize at the end of request | ||
ctx.RegisterFinalizer(req) | ||
|
||
// Capture variable and RLock() because this slice can be modified in the | ||
// Open() method | ||
r.RLock() | ||
// This should never happen unless caller tries to use Stream() before Open() | ||
if r.seekerMgr == nil { | ||
r.RUnlock() | ||
return false, errNoSeekerMgr | ||
} | ||
r.RUnlock() | ||
|
||
idExists, err := r.seekerMgr.Test(id, shard, startTime) | ||
if err != nil { | ||
return false, err | ||
} | ||
|
||
// If the ID is not in the seeker's bloom filter, then it's definitely not on | ||
// disk and we can return immediately. | ||
if !idExists { | ||
return false, nil | ||
} | ||
reqs, err := r.shardRequests(shard) | ||
if err != nil { | ||
return false, err | ||
return err | ||
} | ||
|
||
reqs.Lock() | ||
|
@@ -633,7 +642,7 @@ func (r *blockRetriever) streamRequest( | |
// the data. This means that even though we're returning nil for error | ||
// here, the caller may still encounter an error when they attempt to | ||
// read the data. | ||
return true, nil | ||
return nil | ||
} | ||
|
||
func (r *blockRetriever) Stream( | ||
|
@@ -644,6 +653,16 @@ func (r *blockRetriever) Stream( | |
onRetrieve block.OnRetrieveBlock, | ||
nsCtx namespace.Context, | ||
) (xio.BlockReader, error) { | ||
found, err := r.seriesPresentInBloomFilter(id, shard, startTime) | ||
if err != nil { | ||
return xio.EmptyBlockReader, err | ||
} | ||
// If the ID is not in the seeker's bloom filter, then it's definitely not on | ||
// disk and we can return immediately. | ||
if !found { | ||
return xio.EmptyBlockReader, nil | ||
} | ||
|
||
req := r.reqPool.Get() | ||
req.onRetrieve = onRetrieve | ||
req.streamReqType = streamDataReq | ||
|
@@ -655,18 +674,12 @@ func (r *blockRetriever) Stream( | |
} | ||
} | ||
|
||
found, err := r.streamRequest(ctx, req, shard, id, startTime, nsCtx) | ||
err = r.streamRequest(ctx, req, shard, id, startTime, nsCtx) | ||
if err != nil { | ||
req.resultWg.Done() | ||
return xio.EmptyBlockReader, err | ||
} | ||
|
||
if !found { | ||
req.onRetrieved(ts.Segment{}, namespace.Context{}) | ||
req.success = true | ||
req.onDone() | ||
} | ||
|
||
// The request may not have completed yet, but it has an internal | ||
// waitgroup which the caller will have to wait for before retrieving | ||
// the data. This means that even though we're returning nil for error | ||
|
@@ -683,22 +696,26 @@ func (r *blockRetriever) StreamWideEntry( | |
filter schema.WideEntryFilter, | ||
nsCtx namespace.Context, | ||
) (block.StreamedWideEntry, error) { | ||
found, err := r.seriesPresentInBloomFilter(id, shard, startTime) | ||
if err != nil { | ||
return block.EmptyStreamedWideEntry, err | ||
} | ||
// If the ID is not in the seeker's bloom filter, then it's definitely not on | ||
// disk and we can return immediately. | ||
if !found { | ||
return block.EmptyStreamedWideEntry, nil | ||
} | ||
|
||
req := r.reqPool.Get() | ||
req.streamReqType = streamWideEntryReq | ||
req.wideFilter = filter | ||
|
||
found, err := r.streamRequest(ctx, req, shard, id, startTime, nsCtx) | ||
err = r.streamRequest(ctx, req, shard, id, startTime, nsCtx) | ||
if err != nil { | ||
req.resultWg.Done() | ||
return block.EmptyStreamedWideEntry, err | ||
} | ||
|
||
if !found { | ||
req.wideEntry = xio.WideEntry{} | ||
req.success = true | ||
req.onDone() | ||
} | ||
|
||
// The request may not have completed yet, but it has an internal | ||
// waitgroup which the caller will have to wait for before retrieving | ||
// the data. This means that even though we're returning nil for error | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While you're here, you could kill `seriesReadCount`. It's redundant with the `total` count metric in the limit.