Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dbnode] Negate possibility of point in time segment rotation returning query error #2432

6 changes: 3 additions & 3 deletions src/dbnode/storage/index/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ type block struct {
coldMutableSegments []*mutableSegments
shardRangesSegmentsByVolumeType shardRangesSegmentsByVolumeType
newFieldsAndTermsIteratorFn newFieldsAndTermsIteratorFn
newExecutorFn newExecutorFn
newExecutorWithRLockFn newExecutorFn
blockStart time.Time
blockEnd time.Time
blockSize time.Duration
Expand Down Expand Up @@ -234,7 +234,7 @@ func NewBlock(
queryStats: opts.QueryStats(),
}
b.newFieldsAndTermsIteratorFn = newFieldsAndTermsIterator
b.newExecutorFn = b.executorWithRLock
b.newExecutorWithRLockFn = b.executorWithRLock

return b, nil
}
Expand Down Expand Up @@ -423,7 +423,7 @@ func (b *block) queryWithSpan(
return false, ErrUnableToQueryBlockClosed
}

exec, err := b.newExecutorFn()
exec, err := b.newExecutorWithRLockFn()
if err != nil {
return false, err
}
Expand Down
24 changes: 11 additions & 13 deletions src/dbnode/storage/index/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,9 +393,7 @@ func TestBlockQueryExecutorError(t *testing.T) {
b, ok := blk.(*block)
require.True(t, ok)

b.newExecutorFn = func() (search.Executor, error) {
b.RLock() // ensures we call newExecutorFn with RLock, or this would deadlock
defer b.RUnlock()
b.newExecutorWithRLockFn = func() (search.Executor, error) {
return nil, fmt.Errorf("random-err")
}

Expand Down Expand Up @@ -479,7 +477,7 @@ func TestBlockMockQueryExecutorExecError(t *testing.T) {

// dIter:= doc.NewMockIterator(ctrl)
exec := search.NewMockExecutor(ctrl)
b.newExecutorFn = func() (search.Executor, error) {
b.newExecutorWithRLockFn = func() (search.Executor, error) {
return exec, nil
}
gomock.InOrder(
Expand All @@ -504,7 +502,7 @@ func TestBlockMockQueryExecutorExecIterErr(t *testing.T) {
require.True(t, ok)

exec := search.NewMockExecutor(ctrl)
b.newExecutorFn = func() (search.Executor, error) {
b.newExecutorWithRLockFn = func() (search.Executor, error) {
return exec, nil
}

Expand Down Expand Up @@ -544,7 +542,7 @@ func TestBlockMockQueryExecutorExecLimit(t *testing.T) {
require.True(t, ok)

exec := search.NewMockExecutor(ctrl)
b.newExecutorFn = func() (search.Executor, error) {
b.newExecutorWithRLockFn = func() (search.Executor, error) {
return exec, nil
}

Expand Down Expand Up @@ -594,7 +592,7 @@ func TestBlockMockQueryExecutorExecIterCloseErr(t *testing.T) {
require.True(t, ok)

exec := search.NewMockExecutor(ctrl)
b.newExecutorFn = func() (search.Executor, error) {
b.newExecutorWithRLockFn = func() (search.Executor, error) {
return exec, nil
}

Expand Down Expand Up @@ -632,7 +630,7 @@ func TestBlockMockQuerySeriesLimitNonExhaustive(t *testing.T) {
require.True(t, ok)

exec := search.NewMockExecutor(ctrl)
b.newExecutorFn = func() (search.Executor, error) {
b.newExecutorWithRLockFn = func() (search.Executor, error) {
return exec, nil
}

Expand Down Expand Up @@ -681,7 +679,7 @@ func TestBlockMockQuerySeriesLimitExhaustive(t *testing.T) {
require.True(t, ok)

exec := search.NewMockExecutor(ctrl)
b.newExecutorFn = func() (search.Executor, error) {
b.newExecutorWithRLockFn = func() (search.Executor, error) {
return exec, nil
}

Expand Down Expand Up @@ -732,7 +730,7 @@ func TestBlockMockQueryDocsLimitNonExhaustive(t *testing.T) {
require.True(t, ok)

exec := search.NewMockExecutor(ctrl)
b.newExecutorFn = func() (search.Executor, error) {
b.newExecutorWithRLockFn = func() (search.Executor, error) {
return exec, nil
}

Expand Down Expand Up @@ -781,7 +779,7 @@ func TestBlockMockQueryDocsLimitExhaustive(t *testing.T) {
require.True(t, ok)

exec := search.NewMockExecutor(ctrl)
b.newExecutorFn = func() (search.Executor, error) {
b.newExecutorWithRLockFn = func() (search.Executor, error) {
return exec, nil
}

Expand Down Expand Up @@ -833,7 +831,7 @@ func TestBlockMockQueryMergeResultsMapLimit(t *testing.T) {
require.NoError(t, b.Seal())

exec := search.NewMockExecutor(ctrl)
b.newExecutorFn = func() (search.Executor, error) {
b.newExecutorWithRLockFn = func() (search.Executor, error) {
return exec, nil
}

Expand Down Expand Up @@ -885,7 +883,7 @@ func TestBlockMockQueryMergeResultsDupeID(t *testing.T) {
require.True(t, ok)

exec := search.NewMockExecutor(ctrl)
b.newExecutorFn = func() (search.Executor, error) {
b.newExecutorWithRLockFn = func() (search.Executor, error) {
return exec, nil
}

Expand Down
11 changes: 11 additions & 0 deletions src/dbnode/storage/index/mutable_segments.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,17 @@ func (m *mutableSegments) backgroundCompactWithTask(
return err
}

// Add a read through cache for repeated expensive queries against
// background compacted segments since they can live for quite some
// time and accrue a large set of documents.
if immSeg, ok := compacted.(segment.ImmutableSegment); ok {
var (
plCache = m.opts.PostingsListCache()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it fine for this to share the same cache or should we generate a new cache?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sharing the cache is best since they can be reused and doesn't matter if it's reused across background segments or on disk segments.

readThroughOpts = m.opts.ReadThroughSegmentOptions()
)
compacted = NewReadThroughSegment(immSeg, plCache, readThroughOpts)
}

// Rotate out the replaced frozen segments and add the compacted one.
m.Lock()
defer m.Unlock()
Expand Down
Loading