From 8c557d37f8ec9d89804618a58e52d359fd155596 Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Sun, 28 Jun 2020 10:48:57 -0400 Subject: [PATCH 1/7] [dbnode] Negate possibility of segment rotation returning query error during point in time rotation --- src/dbnode/storage/index/block.go | 6 +- src/dbnode/storage/index/block_test.go | 24 ++- src/dbnode/storage/index/mutable_segments.go | 11 ++ src/m3ninx/index/segment/fst/segment.go | 144 ++++++++++++++++-- .../index/segment/fst/writer_reader_test.go | 92 ++++++++++- 5 files changed, 244 insertions(+), 33 deletions(-) diff --git a/src/dbnode/storage/index/block.go b/src/dbnode/storage/index/block.go index 7789aece49..2a6759ea2e 100644 --- a/src/dbnode/storage/index/block.go +++ b/src/dbnode/storage/index/block.go @@ -131,7 +131,7 @@ type block struct { coldMutableSegments []*mutableSegments shardRangesSegmentsByVolumeType shardRangesSegmentsByVolumeType newFieldsAndTermsIteratorFn newFieldsAndTermsIteratorFn - newExecutorFn newExecutorFn + newExecutorWithRLockFn newExecutorFn blockStart time.Time blockEnd time.Time blockSize time.Duration @@ -234,7 +234,7 @@ func NewBlock( queryStats: opts.QueryStats(), } b.newFieldsAndTermsIteratorFn = newFieldsAndTermsIterator - b.newExecutorFn = b.executorWithRLock + b.newExecutorWithRLockFn = b.executorWithRLock return b, nil } @@ -423,7 +423,7 @@ func (b *block) queryWithSpan( return false, ErrUnableToQueryBlockClosed } - exec, err := b.newExecutorFn() + exec, err := b.newExecutorWithRLockFn() if err != nil { return false, err } diff --git a/src/dbnode/storage/index/block_test.go b/src/dbnode/storage/index/block_test.go index 0f1b5ad8e0..53cb68a0f0 100644 --- a/src/dbnode/storage/index/block_test.go +++ b/src/dbnode/storage/index/block_test.go @@ -393,9 +393,7 @@ func TestBlockQueryExecutorError(t *testing.T) { b, ok := blk.(*block) require.True(t, ok) - b.newExecutorFn = func() (search.Executor, error) { - b.RLock() // ensures we call newExecutorFn with RLock, or this would deadlock - defer b.RUnlock() + b.newExecutorWithRLockFn = func() (search.Executor, error) { return nil, fmt.Errorf("random-err") } @@ -479,7 +477,7 @@ func TestBlockMockQueryExecutorExecError(t *testing.T) { // dIter:= doc.NewMockIterator(ctrl) exec := search.NewMockExecutor(ctrl) - b.newExecutorFn = func() (search.Executor, error) { + b.newExecutorWithRLockFn = func() (search.Executor, error) { return exec, nil } gomock.InOrder( @@ -504,7 +502,7 @@ func TestBlockMockQueryExecutorExecIterErr(t *testing.T) { require.True(t, ok) exec := search.NewMockExecutor(ctrl) - b.newExecutorFn = func() (search.Executor, error) { + b.newExecutorWithRLockFn = func() (search.Executor, error) { return exec, nil } @@ -544,7 +542,7 @@ func TestBlockMockQueryExecutorExecLimit(t *testing.T) { require.True(t, ok) exec := search.NewMockExecutor(ctrl) - b.newExecutorFn = func() (search.Executor, error) { + b.newExecutorWithRLockFn = func() (search.Executor, error) { return exec, nil } @@ -594,7 +592,7 @@ func TestBlockMockQueryExecutorExecIterCloseErr(t *testing.T) { require.True(t, ok) exec := search.NewMockExecutor(ctrl) - b.newExecutorFn = func() (search.Executor, error) { + b.newExecutorWithRLockFn = func() (search.Executor, error) { return exec, nil } @@ -632,7 +630,7 @@ func TestBlockMockQuerySeriesLimitNonExhaustive(t *testing.T) { require.True(t, ok) exec := search.NewMockExecutor(ctrl) - b.newExecutorFn = func() (search.Executor, error) { + b.newExecutorWithRLockFn = func() (search.Executor, error) { return exec, nil } @@ -681,7 +679,7 @@ func 
TestBlockMockQuerySeriesLimitExhaustive(t *testing.T) { require.True(t, ok) exec := search.NewMockExecutor(ctrl) - b.newExecutorFn = func() (search.Executor, error) { + b.newExecutorWithRLockFn = func() (search.Executor, error) { return exec, nil } @@ -732,7 +730,7 @@ func TestBlockMockQueryDocsLimitNonExhaustive(t *testing.T) { require.True(t, ok) exec := search.NewMockExecutor(ctrl) - b.newExecutorFn = func() (search.Executor, error) { + b.newExecutorWithRLockFn = func() (search.Executor, error) { return exec, nil } @@ -781,7 +779,7 @@ func TestBlockMockQueryDocsLimitExhaustive(t *testing.T) { require.True(t, ok) exec := search.NewMockExecutor(ctrl) - b.newExecutorFn = func() (search.Executor, error) { + b.newExecutorWithRLockFn = func() (search.Executor, error) { return exec, nil } @@ -833,7 +831,7 @@ func TestBlockMockQueryMergeResultsMapLimit(t *testing.T) { require.NoError(t, b.Seal()) exec := search.NewMockExecutor(ctrl) - b.newExecutorFn = func() (search.Executor, error) { + b.newExecutorWithRLockFn = func() (search.Executor, error) { return exec, nil } @@ -885,7 +883,7 @@ func TestBlockMockQueryMergeResultsDupeID(t *testing.T) { require.True(t, ok) exec := search.NewMockExecutor(ctrl) - b.newExecutorFn = func() (search.Executor, error) { + b.newExecutorWithRLockFn = func() (search.Executor, error) { return exec, nil } diff --git a/src/dbnode/storage/index/mutable_segments.go b/src/dbnode/storage/index/mutable_segments.go index 3a2669b84c..5d01ee939a 100644 --- a/src/dbnode/storage/index/mutable_segments.go +++ b/src/dbnode/storage/index/mutable_segments.go @@ -439,6 +439,17 @@ func (m *mutableSegments) backgroundCompactWithTask( return err } + // Add a read through cache for repeated expensive queries against + // background compacted segments since they can live for quite some + // time and accrue a large set of documents. + if immSeg, ok := compacted.(segment.ImmutableSegment); ok { + var ( + plCache = m.opts.PostingsListCache() + readThroughOpts = m.opts.ReadThroughSegmentOptions() + ) + compacted = NewReadThroughSegment(immSeg, plCache, readThroughOpts) + } + // Rotate out the replaced frozen segments and add the compacted one. m.Lock() defer m.Unlock() diff --git a/src/m3ninx/index/segment/fst/segment.go b/src/m3ninx/index/segment/fst/segment.go index 783d95e348..ea27fd739e 100644 --- a/src/m3ninx/index/segment/fst/segment.go +++ b/src/m3ninx/index/segment/fst/segment.go @@ -47,6 +47,7 @@ import ( var ( errReaderClosed = errors.New("segment is closed") + errReaderFinalized = errors.New("segment is finalized") errReaderNilRegexp = errors.New("nil regexp provided") errUnsupportedMajorVersion = errors.New("unsupported major version") errDocumentsDataUnset = errors.New("documents data bytes are not set") @@ -181,6 +182,7 @@ type fsSegment struct { sync.RWMutex ctx context.Context closed bool + finalized bool fieldsFST *vellum.FST docsDataReader *docs.DataReader docsIndexReader *docs.IndexReader @@ -194,6 +196,12 @@ type fsSegment struct { } func (r *fsSegment) SegmentData(ctx context.Context) (SegmentData, error) { + r.RLock() + defer r.RUnlock() + if r.closed { + return SegmentData{}, errReaderClosed + } + // NB(r): Ensure that we do not release, mmaps, etc // until all readers have been closed. 
r.ctx.DependsOn(ctx) @@ -274,10 +282,13 @@ func (r *fsSegment) Close() error { } func (r *fsSegment) Finalize() { + r.Lock() r.fieldsFST.Close() if r.data.Closer != nil { r.data.Closer.Close() } + r.finalized = true + r.Unlock() } func (r *fsSegment) FieldsIterable() sgmt.FieldsIterable { @@ -390,9 +401,21 @@ func (r *fsSegment) MatchField(field []byte) (postings.List, error) { if r.closed { return nil, errReaderClosed } + return r.matchFieldWithRLockNotClosedMaybeFinalized(field) +} + +func (r *fsSegment) matchFieldWithRLockNotClosedMaybeFinalized( + field []byte, +) (postings.List, error) { + // NB(r): Not closed, but could be finalized (i.e. closed segment reader) + // calling match field after this segment is finalized. + if r.finalized { + return nil, errReaderFinalized + } + if !r.data.Version.supportsFieldPostingsList() { // i.e. don't have the field level postings list, so fall back to regexp - return r.matchRegexpWithRLock(field, index.DotStarCompiledRegex()) + return r.matchRegexpWithRLockNotClosedMaybeFinalized(field, index.DotStarCompiledRegex()) } termsFSTOffset, exists, err := r.fieldsFST.Get(field) @@ -424,6 +447,17 @@ func (r *fsSegment) MatchTerm(field []byte, term []byte) (postings.List, error) if r.closed { return nil, errReaderClosed } + return r.matchTermWithRLockNotClosedMaybeFinalized(field, term) +} + +func (r *fsSegment) matchTermWithRLockNotClosedMaybeFinalized( + field, term []byte, +) (postings.List, error) { + // NB(r): Not closed, but could be finalized (i.e. closed segment reader) + // calling match field after this segment is finalized. + if r.finalized { + return nil, errReaderFinalized + } termsFST, exists, err := r.retrieveTermsFSTWithRLock(field) if err != nil { @@ -460,17 +494,27 @@ func (r *fsSegment) MatchTerm(field []byte, term []byte) (postings.List, error) return pl, nil } -func (r *fsSegment) MatchRegexp(field []byte, compiled index.CompiledRegex) (postings.List, error) { +func (r *fsSegment) MatchRegexp( + field []byte, + compiled index.CompiledRegex, +) (postings.List, error) { r.RLock() - pl, err := r.matchRegexpWithRLock(field, compiled) - r.RUnlock() - return pl, err -} - -func (r *fsSegment) matchRegexpWithRLock(field []byte, compiled index.CompiledRegex) (postings.List, error) { + defer r.RUnlock() if r.closed { return nil, errReaderClosed } + return r.matchRegexpWithRLockNotClosedMaybeFinalized(field, compiled) +} + +func (r *fsSegment) matchRegexpWithRLockNotClosedMaybeFinalized( + field []byte, + compiled index.CompiledRegex, +) (postings.List, error) { + // NB(r): Not closed, but could be finalized (i.e. closed segment reader) + // calling match field after this segment is finalized. + if r.finalized { + return nil, errReaderFinalized + } re := compiled.FST if re == nil { @@ -539,6 +583,15 @@ func (r *fsSegment) MatchAll() (postings.MutableList, error) { if r.closed { return nil, errReaderClosed } + return r.matchAllWithRLockNotClosedMaybeFinalized() +} + +func (r *fsSegment) matchAllWithRLockNotClosedMaybeFinalized() (postings.MutableList, error) { + // NB(r): Not closed, but could be finalized (i.e. closed segment reader) + // calling match field after this segment is finalized.
+ if r.finalized { + return nil, errReaderFinalized + } pl := r.opts.PostingsListPool().Get() err := pl.AddRange(r.startInclusive, r.endExclusive) @@ -555,6 +608,15 @@ func (r *fsSegment) Doc(id postings.ID) (doc.Document, error) { if r.closed { return doc.Document{}, errReaderClosed } + return r.docWithRLockNotClosedMaybeFinalized(id) +} + +func (r *fsSegment) docWithRLockNotClosedMaybeFinalized(id postings.ID) (doc.Document, error) { + // NB(r): Not closed, but could be finalized (i.e. closed segment reader) + // calling match field after this segment is finalized. + if r.finalized { + return doc.Document{}, errReaderFinalized + } // If using docs slice reader, return from the in memory slice reader if r.docsSliceReader != nil { @@ -575,6 +637,15 @@ func (r *fsSegment) Docs(pl postings.List) (doc.Iterator, error) { if r.closed { return nil, errReaderClosed } + return r.docsWithRLockNotClosedMaybeFinalized(pl) +} + +func (r *fsSegment) docsWithRLockNotClosedMaybeFinalized(pl postings.List) (doc.Iterator, error) { + // NB(r): Not closed, but could be finalized (i.e. closed segment reader) + // calling match field after this segment is finalized. + if r.finalized { + return nil, errReaderFinalized + } return index.NewIDDocIterator(r, pl.Iterator()), nil } @@ -585,6 +656,16 @@ func (r *fsSegment) AllDocs() (index.IDDocIterator, error) { if r.closed { return nil, errReaderClosed } + return r.allDocsWithRLockNotClosedMaybeFinalized() +} + +func (r *fsSegment) allDocsWithRLockNotClosedMaybeFinalized() (index.IDDocIterator, error) { + // NB(r): Not closed, but could be finalized (i.e. closed segment reader) + // calling match field after this segment is finalized. + if r.finalized { + return nil, errReaderFinalized + } + pi := postings.NewRangeIterator(r.startInclusive, r.endExclusive) return index.NewIDDocIterator(r, pi), nil } @@ -771,7 +852,11 @@ func (sr *fsSegmentReader) MatchField(field []byte) (postings.List, error) { sr.RUnlock() return nil, errReaderClosed } - pl, err := sr.fsSegment.MatchField(field) + // NB(r): We are allowed to call match field after Close called on + // the segment but not after it is finalized. + sr.fsSegment.RLock() + pl, err := sr.fsSegment.matchFieldWithRLockNotClosedMaybeFinalized(field) + sr.fsSegment.RUnlock() sr.RUnlock() return pl, err } @@ -782,18 +867,29 @@ func (sr *fsSegmentReader) MatchTerm(field []byte, term []byte) (postings.List, sr.RUnlock() return nil, errReaderClosed } - pl, err := sr.fsSegment.MatchTerm(field, term) + // NB(r): We are allowed to call match field after Close called on + // the segment but not after it is finalized. + sr.fsSegment.RLock() + pl, err := sr.fsSegment.matchTermWithRLockNotClosedMaybeFinalized(field, term) + sr.fsSegment.RUnlock() sr.RUnlock() return pl, err } -func (sr *fsSegmentReader) MatchRegexp(field []byte, compiled index.CompiledRegex) (postings.List, error) { +func (sr *fsSegmentReader) MatchRegexp( + field []byte, + compiled index.CompiledRegex, +) (postings.List, error) { sr.RLock() if sr.closed { sr.RUnlock() return nil, errReaderClosed } - pl, err := sr.fsSegment.MatchRegexp(field, compiled) + // NB(r): We are allowed to call match field after Close called on + // the segment but not after it is finalized. 
+ sr.fsSegment.RLock() + pl, err := sr.fsSegment.matchRegexpWithRLockNotClosedMaybeFinalized(field, compiled) + sr.fsSegment.RUnlock() sr.RUnlock() return pl, err } @@ -804,7 +900,11 @@ func (sr *fsSegmentReader) MatchAll() (postings.MutableList, error) { sr.RUnlock() return nil, errReaderClosed } - pl, err := sr.fsSegment.MatchAll() + // NB(r): We are allowed to call match field after Close called on + // the segment but not after it is finalized. + sr.fsSegment.RLock() + pl, err := sr.fsSegment.matchAllWithRLockNotClosedMaybeFinalized() + sr.fsSegment.RUnlock() sr.RUnlock() return pl, err } @@ -815,7 +915,11 @@ func (sr *fsSegmentReader) Doc(id postings.ID) (doc.Document, error) { sr.RUnlock() return doc.Document{}, errReaderClosed } - pl, err := sr.fsSegment.Doc(id) + // NB(r): We are allowed to call match field after Close called on + // the segment but not after it is finalized. + sr.fsSegment.RLock() + pl, err := sr.fsSegment.docWithRLockNotClosedMaybeFinalized(id) + sr.fsSegment.RUnlock() sr.RUnlock() return pl, err } @@ -826,7 +930,11 @@ func (sr *fsSegmentReader) Docs(pl postings.List) (doc.Iterator, error) { sr.RUnlock() return nil, errReaderClosed } - iter, err := sr.fsSegment.Docs(pl) + // NB(r): We are allowed to call match field after Close called on + // the segment but not after it is finalized. + sr.fsSegment.RLock() + iter, err := sr.fsSegment.docsWithRLockNotClosedMaybeFinalized(pl) + sr.fsSegment.RUnlock() sr.RUnlock() return iter, err } @@ -837,7 +945,11 @@ func (sr *fsSegmentReader) AllDocs() (index.IDDocIterator, error) { sr.RUnlock() return nil, errReaderClosed } - iter, err := sr.fsSegment.AllDocs() + // NB(r): We are allowed to call match field after Close called on + // the segment but not after it is finalized. + sr.fsSegment.RLock() + iter, err := sr.fsSegment.allDocsWithRLockNotClosedMaybeFinalized() + sr.fsSegment.RUnlock() sr.RUnlock() return iter, err } diff --git a/src/m3ninx/index/segment/fst/writer_reader_test.go b/src/m3ninx/index/segment/fst/writer_reader_test.go index 52bcb605bc..9449dc1078 100644 --- a/src/m3ninx/index/segment/fst/writer_reader_test.go +++ b/src/m3ninx/index/segment/fst/writer_reader_test.go @@ -476,7 +476,6 @@ func TestFieldsEqualsParallel(t *testing.T) { func TestPostingsListLifecycleSimple(t *testing.T) { _, fstSeg := newTestSegments(t, fewTestDocuments) - require.NoError(t, fstSeg.Close()) _, err := fstSeg.FieldsIterable().Fields() @@ -498,6 +497,55 @@ func TestPostingsListReaderLifecycle(t *testing.T) { require.NoError(t, err) } +func TestSegmentReaderValidUntilClose(t *testing.T) { + _, fstSeg := newTestSegments(t, fewTestDocuments) + + reader, err := fstSeg.Reader() + require.NoError(t, err) + + // Close segment early, expect reader still valid until close. + err = fstSeg.Close() + require.NoError(t, err) + + // Make sure all methods allow for calls until the reader is closed. 
+ var ( + list postings.List + ) + list, err = reader.MatchField([]byte("fruit")) + require.NoError(t, err) + assertPostingsList(t, list, []postings.ID{0, 1, 2}) + + list, err = reader.MatchTerm([]byte("color"), []byte("yellow")) + require.NoError(t, err) + assertPostingsList(t, list, []postings.ID{0, 2}) + + re, err := index.CompileRegex([]byte("^.*apple$")) + require.NoError(t, err) + list, err = reader.MatchRegexp([]byte("fruit"), re) + require.NoError(t, err) + assertPostingsList(t, list, []postings.ID{1, 2}) + + list, err = reader.MatchAll() + require.NoError(t, err) + assertPostingsList(t, list, []postings.ID{0, 1, 2}) + + _, err = reader.Doc(0) + require.NoError(t, err) + + _, err = reader.Docs(list) + require.NoError(t, err) + + _, err = reader.AllDocs() + require.NoError(t, err) + + // Now close. + require.NoError(t, reader.Close()) + + // Make sure reader now starts returning errors. + _, err = reader.MatchTerm([]byte("color"), []byte("yellow")) + require.Error(t, err) +} + func newTestSegments(t *testing.T, docs []doc.Document) (memSeg sgmt.MutableSegment, fstSeg sgmt.Segment) { s := newTestMemSegment(t) for _, d := range docs { @@ -535,6 +583,48 @@ func assertDocsEqual(t *testing.T, a, b doc.Iterator) { } } +func assertPostingsList(t *testing.T, l postings.List, exp []postings.ID) { + it := l.Iterator() + + defer func() { + require.False(t, it.Next(), "should exhaust just once") + require.NoError(t, it.Err(), "should not complete with error") + require.NoError(t, it.Close(), "should not encounter error on close") + }() + + match := make(map[postings.ID]struct{}, len(exp)) + for _, v := range exp { + match[v] = struct{}{} + } + + for it.Next() { + curr := it.Current() + + _, ok := match[curr] + if !ok { + require.Fail(t, + fmt.Sprintf("expected %d, not found in postings iter", curr)) + return + } + + delete(match, curr) + } + + if len(match) == 0 { + // Success. 
+ return + } + + remaining := make([]int, 0, len(match)) + for id := range match { + remaining = append(remaining, int(id)) + } + + msg := fmt.Sprintf("unmatched expected IDs %v, not found in postings iter", + remaining) + require.Fail(t, msg) +} + func collectDocs(iter doc.Iterator) ([]doc.Document, error) { var docs []doc.Document for iter.Next() { From 8840fd2fcd0e43c89ae73405e7f8ab0f063cc5fd Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Sun, 28 Jun 2020 12:20:57 -0400 Subject: [PATCH 2/7] Fix flakey test and add integration test for high concurrent queries during indexing --- src/dbnode/integration/index_helpers.go | 30 +++++++++-- ...index_single_node_high_concurrency_test.go | 51 +++++++++++++++++++ src/dbnode/storage/stats/query_stats_test.go | 23 ++++++--- 3 files changed, 92 insertions(+), 12 deletions(-) diff --git a/src/dbnode/integration/index_helpers.go b/src/dbnode/integration/index_helpers.go index 944028fc1d..e66f380060 100644 --- a/src/dbnode/integration/index_helpers.go +++ b/src/dbnode/integration/index_helpers.go @@ -171,25 +171,45 @@ func genIDTags(i int, j int, numTags int, opts ...genIDTagsOption) (ident.ID, id } func isIndexed(t *testing.T, s client.Session, ns ident.ID, id ident.ID, tags ident.TagIterator) bool { + result, err := isIndexedChecked(t, s, ns, id, tags) + if err != nil { + return false + } + return result +} + +func isIndexedChecked(t *testing.T, s client.Session, ns ident.ID, id ident.ID, tags ident.TagIterator) (bool, error) { q := newQuery(t, tags) iter, _, err := s.FetchTaggedIDs(ns, index.Query{Query: q}, index.QueryOptions{ StartInclusive: time.Now(), EndExclusive: time.Now(), SeriesLimit: 10}) if err != nil { - return false + return false, err } + + defer iter.Finalize() + if !iter.Next() { - return false + return false, nil } + cuNs, cuID, cuTag := iter.Current() + if err := iter.Err(); err != nil { + return false, fmt.Errorf("iter err: %v", err) + } + if ns.String() != cuNs.String() { - return false + return false, fmt.Errorf("namespace not matched") } if id.String() != cuID.String() { - return false + return false, fmt.Errorf("id not matched") } - return ident.NewTagIterMatcher(tags).Matches(cuTag) + if !ident.NewTagIterMatcher(tags).Matches(cuTag) { + return false, fmt.Errorf("tags did not match") + } + + return true, nil } func newQuery(t *testing.T, tags ident.TagIterator) idx.Query { diff --git a/src/dbnode/integration/index_single_node_high_concurrency_test.go b/src/dbnode/integration/index_single_node_high_concurrency_test.go index c75884632d..47e7c70bab 100644 --- a/src/dbnode/integration/index_single_node_high_concurrency_test.go +++ b/src/dbnode/integration/index_single_node_high_concurrency_test.go @@ -24,6 +24,7 @@ package integration import ( "fmt" + "math/rand" "sync" "testing" "time" @@ -81,6 +82,20 @@ func TestIndexSingleNodeHighConcurrencyFewTagsHighCardinalitySkipWrites(t *testi }) } +func TestIndexSingleNodeHighConcurrencyFewTagsHighCardinalityQueryDuringWrites(t *testing.T) { + if testing.Short() { + t.SkipNow() // Just skip if we're doing a short run + } + + testIndexSingleNodeHighConcurrency(t, testIndexHighConcurrencyOptions{ + concurrencyEnqueueWorker: 8, + concurrencyWrites: 5000, + enqueuePerWorker: 10000, + numTags: 2, + concurrencyQueryDuringWrites: 8, + }) +} + type testIndexHighConcurrencyOptions struct { concurrencyEnqueueWorker int concurrencyWrites int @@ -89,6 +104,9 @@ type testIndexHighConcurrencyOptions struct { // skipWrites will mix in skipped to make sure // it doesn't interrupt the regular 
real-time ingestion pipeline. skipWrites bool + // concurrencyQueryDuringWrites will issue queries while we + // are performing writes. + concurrencyQueryDuringWrites int } func testIndexSingleNodeHighConcurrency( @@ -188,7 +206,40 @@ func testIndexSingleNodeHighConcurrency( }() } + // If concurrent query load enabled while writing also hit with queries. + queryConcDuringWritesCloseCh := make(chan struct{}, 1) + if opts.concurrencyQueryDuringWrites == 0 { + log.Info("no concurrent queries during writes configured") + } else { + log.Info("starting concurrent queries during writes", + zap.Int("concurrency", opts.concurrencyQueryDuringWrites)) + for i := 0; i < opts.concurrencyQueryDuringWrites; i++ { + go func() { + src := rand.NewSource(int64(i)) + rng := rand.New(src) + for { + select { + case <-queryConcDuringWritesCloseCh: + return + default: + randI := rng.Intn(opts.concurrencyEnqueueWorker) + randJ := opts.enqueuePerWorker + id, tags := genIDTags(randI, randJ, opts.numTags) + _, err := isIndexedChecked(t, session, md.ID(), id, tags) + if err != nil { + if n := numTotalErrors.Inc(); n < 10 { + // Log the first 10 errors for visibility but not flood. + log.Error("sampled query error", zap.Error(err)) + } + } + } + } + }() + } + } + wg.Wait() + close(queryConcDuringWritesCloseCh) require.Equal(t, int(0), int(numTotalErrors.Load())) diff --git a/src/dbnode/storage/stats/query_stats_test.go b/src/dbnode/storage/stats/query_stats_test.go index aa6f051a82..db746c7094 100644 --- a/src/dbnode/storage/stats/query_stats_test.go +++ b/src/dbnode/storage/stats/query_stats_test.go @@ -25,6 +25,8 @@ import ( "testing" "time" + xclock "github.com/m3db/m3/src/x/clock" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -64,31 +66,38 @@ func TestUpdateTracker(t *testing.T) { err := queryStats.Update(3) require.NoError(t, err) - verifyStats(t, tracker, 3, 3) + verifyStats(t, tracker, 3, 3) err = queryStats.Update(2) require.NoError(t, err) - verifyStats(t, tracker, 2, 5) + verifyStats(t, tracker, 2, 5) } func TestPeriodicallyResetRecentDocs(t *testing.T) { tracker := &testQueryStatsTracker{lookback: time.Millisecond} queryStats := NewQueryStats(tracker) - defer queryStats.Stop() err := queryStats.Update(1) require.NoError(t, err) - verifyStats(t, tracker, 1, 1) + verifyStats(t, tracker, 1, 1) queryStats.Start() + defer queryStats.Stop() time.Sleep(tracker.lookback * 2) - verifyStats(t, tracker, 0, 0) + success := xclock.WaitUntil(func() bool { + return statsEqual(tracker.StatsValues(), 0, 0) + }, 10*time.Second) + require.True(t, success, "did not eventually reset") } func verifyStats(t *testing.T, tracker *testQueryStatsTracker, expectedNew int64, expectedRecent int64) { values := tracker.StatsValues() - assert.Equal(t, expectedNew, values.NewDocs) - assert.Equal(t, expectedRecent, values.RecentDocs) + assert.True(t, statsEqual(values, expectedNew, expectedRecent)) +} + +func statsEqual(values QueryStatsValues, expectedNew int64, expectedRecent int64) bool { + return expectedNew == values.NewDocs && + expectedRecent == values.RecentDocs } From 9e2c0bbf5d9d0d47c4343506458d850ed5374ad1 Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Sun, 28 Jun 2020 12:22:42 -0400 Subject: [PATCH 3/7] Correctly search for random metric --- .../integration/index_single_node_high_concurrency_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dbnode/integration/index_single_node_high_concurrency_test.go 
b/src/dbnode/integration/index_single_node_high_concurrency_test.go index 47e7c70bab..6545747bca 100644 --- a/src/dbnode/integration/index_single_node_high_concurrency_test.go +++ b/src/dbnode/integration/index_single_node_high_concurrency_test.go @@ -223,7 +223,7 @@ func testIndexSingleNodeHighConcurrency( return default: randI := rng.Intn(opts.concurrencyEnqueueWorker) - randJ := opts.enqueuePerWorker + randJ := rng.Intn(opts.enqueuePerWorker) id, tags := genIDTags(randI, randJ, opts.numTags) _, err := isIndexedChecked(t, session, md.ID(), id, tags) if err != nil { From e9998ecddf3b3fef7208a1126983c485ff55744a Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Sun, 28 Jun 2020 12:35:34 -0400 Subject: [PATCH 4/7] Better testing of the compaction case --- ...index_single_node_high_concurrency_test.go | 105 +++++++++++------- 1 file changed, 62 insertions(+), 43 deletions(-) diff --git a/src/dbnode/integration/index_single_node_high_concurrency_test.go b/src/dbnode/integration/index_single_node_high_concurrency_test.go index 6545747bca..5b3d057604 100644 --- a/src/dbnode/integration/index_single_node_high_concurrency_test.go +++ b/src/dbnode/integration/index_single_node_high_concurrency_test.go @@ -90,9 +90,10 @@ func TestIndexSingleNodeHighConcurrencyFewTagsHighCardinalityQueryDuringWrites(t testIndexSingleNodeHighConcurrency(t, testIndexHighConcurrencyOptions{ concurrencyEnqueueWorker: 8, concurrencyWrites: 5000, - enqueuePerWorker: 10000, + enqueuePerWorker: 100000, numTags: 2, - concurrencyQueryDuringWrites: 8, + concurrencyQueryDuringWrites: 16, + skipVerify: true, }) } @@ -101,12 +102,19 @@ type testIndexHighConcurrencyOptions struct { concurrencyWrites int enqueuePerWorker int numTags int + // skipWrites will mix in skipped to make sure // it doesn't interrupt the regular real-time ingestion pipeline. skipWrites bool + // concurrencyQueryDuringWrites will issue queries while we // are performing writes. concurrencyQueryDuringWrites int + + // skipVerify will skip verifying the actual series were indexed + // which is useful if just sanity checking can write/read concurrently + // without issue/errors and the stats look good. + skipVerify bool } func testIndexSingleNodeHighConcurrency( @@ -208,6 +216,7 @@ func testIndexSingleNodeHighConcurrency( // If concurrent query load enabled while writing also hit with queries. queryConcDuringWritesCloseCh := make(chan struct{}, 1) + numTotalQueryErrors := atomic.NewUint32(0) if opts.concurrencyQueryDuringWrites == 0 { log.Info("no concurrent queries during writes configured") } else { @@ -227,7 +236,7 @@ func testIndexSingleNodeHighConcurrency( id, tags := genIDTags(randI, randJ, opts.numTags) _, err := isIndexedChecked(t, session, md.ID(), id, tags) if err != nil { - if n := numTotalErrors.Inc(); n < 10 { + if n := numTotalQueryErrors.Inc(); n < 10 { // Log the first 10 errors for visibility but not flood. log.Error("sampled query error", zap.Error(err)) } @@ -238,9 +247,10 @@ func testIndexSingleNodeHighConcurrency( } } + // Wait for writes to at least be enqueued. wg.Wait() - close(queryConcDuringWritesCloseCh) + // Check no write errors. require.Equal(t, int(0), int(numTotalErrors.Load())) log.Info("test data written", @@ -275,49 +285,58 @@ func testIndexSingleNodeHighConcurrency( assert.True(t, indexProcess, fmt.Sprintf("expected to index %d but processed %d", expectNumIndex, value)) - // Now check all of them are individually indexed. 
- var ( - fetchWg sync.WaitGroup - notIndexedErrs []error - notIndexedLock sync.Mutex - ) - for i := 0; i < opts.concurrencyEnqueueWorker; i++ { - fetchWg.Add(1) - i := i - go func() { - defer fetchWg.Done() + // Allow concurrent query during writes to finish. + close(queryConcDuringWritesCloseCh) - for j := 0; j < opts.enqueuePerWorker; j++ { - if opts.skipWrites && j%2 == 0 { - continue // not meant to be indexed. - } + // Check no query errors. + require.Equal(t, int(0), int(numTotalErrors.Load())) - j := j - fetchWg.Add(1) - workerPool.Go(func() { - defer fetchWg.Done() - - id, tags := genIDTags(i, j, opts.numTags) - indexed := xclock.WaitUntil(func() bool { - found := isIndexed(t, session, md.ID(), id, tags) - return found - }, 30*time.Second) - if !indexed { - err := fmt.Errorf("not indexed series: i=%d, j=%d", i, j) - notIndexedLock.Lock() - notIndexedErrs = append(notIndexedErrs, err) - notIndexedLock.Unlock() + if !opts.skipVerify { + log.Info("data indexing each series visible start") + // Now check all of them are individually indexed. + var ( + fetchWg sync.WaitGroup + notIndexedErrs []error + notIndexedLock sync.Mutex + ) + for i := 0; i < opts.concurrencyEnqueueWorker; i++ { + fetchWg.Add(1) + i := i + go func() { + defer fetchWg.Done() + + for j := 0; j < opts.enqueuePerWorker; j++ { + if opts.skipWrites && j%2 == 0 { + continue // not meant to be indexed. } - }) - } - }() + + j := j + fetchWg.Add(1) + workerPool.Go(func() { + defer fetchWg.Done() + + id, tags := genIDTags(i, j, opts.numTags) + indexed := xclock.WaitUntil(func() bool { + found := isIndexed(t, session, md.ID(), id, tags) + return found + }, 30*time.Second) + if !indexed { + err := fmt.Errorf("not indexed series: i=%d, j=%d", i, j) + notIndexedLock.Lock() + notIndexedErrs = append(notIndexedErrs, err) + notIndexedLock.Unlock() + } + }) + } + }() + } + fetchWg.Wait() + log.Info("data indexing verify done", + zap.Int("notIndexed", len(notIndexedErrs)), + zap.Duration("took", time.Since(start))) + require.Equal(t, 0, len(notIndexedErrs), + fmt.Sprintf("not indexed errors: %v", notIndexedErrs[:min(5, len(notIndexedErrs))])) } - fetchWg.Wait() - log.Info("data indexing verify done", - zap.Int("notIndexed", len(notIndexedErrs)), - zap.Duration("took", time.Since(start))) - require.Equal(t, 0, len(notIndexedErrs), - fmt.Sprintf("not indexed errors: %v", notIndexedErrs[:min(5, len(notIndexedErrs))])) // Make sure attempted total indexing = skipped + written. counters = testSetup.Scope().Snapshot().Counters() From 3a724800893e3824d3ada67b3899c54f69cea399 Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Sun, 28 Jun 2020 17:16:58 -0400 Subject: [PATCH 5/7] Fix not checking for segment being closed when iterating --- src/dbnode/storage/index/block.go | 2 +- src/m3ninx/index/segment/fst/segment.go | 61 ++++++++----------------- 2 files changed, 21 insertions(+), 42 deletions(-) diff --git a/src/dbnode/storage/index/block.go b/src/dbnode/storage/index/block.go index 2a6759ea2e..1c1a7cd603 100644 --- a/src/dbnode/storage/index/block.go +++ b/src/dbnode/storage/index/block.go @@ -513,8 +513,8 @@ func (b *block) queryWithSpan( } func (b *block) closeExecutorAsync(exec search.Executor) { - // Note: This only happens if closing the readers isn't clean. if err := exec.Close(); err != nil { + // Note: This only happens if closing the readers isn't clean. 
b.logger.Error("could not close search exec", zap.Error(err)) } } diff --git a/src/m3ninx/index/segment/fst/segment.go b/src/m3ninx/index/segment/fst/segment.go index ea27fd739e..ce05d25b70 100644 --- a/src/m3ninx/index/segment/fst/segment.go +++ b/src/m3ninx/index/segment/fst/segment.go @@ -637,17 +637,20 @@ func (r *fsSegment) Docs(pl postings.List) (doc.Iterator, error) { if r.closed { return nil, errReaderClosed } - return r.docsWithRLockNotClosedMaybeFinalized(pl) + return r.docsWithRLockNotClosedMaybeFinalized(r, pl) } -func (r *fsSegment) docsWithRLockNotClosedMaybeFinalized(pl postings.List) (doc.Iterator, error) { +func (r *fsSegment) docsWithRLockNotClosedMaybeFinalized( + retriever index.DocRetriever, + pl postings.List, +) (doc.Iterator, error) { // NB(r): Not closed, but could be finalized (i.e. closed segment reader) // calling match field after this segment is finalized. if r.finalized { return nil, errReaderFinalized } - return index.NewIDDocIterator(r, pl.Iterator()), nil + return index.NewIDDocIterator(retriever, pl.Iterator()), nil } func (r *fsSegment) AllDocs() (index.IDDocIterator, error) { @@ -656,10 +659,12 @@ func (r *fsSegment) AllDocs() (index.IDDocIterator, error) { if r.closed { return nil, errReaderClosed } - return r.allDocsWithRLockNotClosedMaybeFinalized() + return r.allDocsWithRLockNotClosedMaybeFinalized(r) } -func (r *fsSegment) allDocsWithRLockNotClosedMaybeFinalized() (index.IDDocIterator, error) { +func (r *fsSegment) allDocsWithRLockNotClosedMaybeFinalized( + retriever index.DocRetriever, +) (index.IDDocIterator, error) { // NB(r): Not closed, but could be finalized (i.e. closed segment reader) // calling match field after this segment is finalized. if r.finalized { @@ -667,7 +672,7 @@ func (r *fsSegment) allDocsWithRLockNotClosedMaybeFinalized() (index.IDDocIterat } pi := postings.NewRangeIterator(r.startInclusive, r.endExclusive) - return index.NewIDDocIterator(r, pi), nil + return index.NewIDDocIterator(retriever, pi), nil } func (r *fsSegment) retrievePostingsListWithRLock(postingsOffset uint64) (postings.List, error) { @@ -828,10 +833,11 @@ func (r *fsSegment) retrieveBytesWithRLock(base []byte, offset uint64) ([]byte, return base[payloadStart:payloadEnd], nil } -var _ index.Reader = &fsSegmentReader{} +var _ index.Reader = (*fsSegmentReader)(nil) +// fsSegmentReader is not thread safe for use and relies on the underlying +// segment for synchronization. type fsSegmentReader struct { - sync.RWMutex closed bool ctx context.Context fsSegment *fsSegment @@ -847,32 +853,24 @@ func newReader( } func (sr *fsSegmentReader) MatchField(field []byte) (postings.List, error) { - sr.RLock() if sr.closed { - sr.RUnlock() - return nil, errReaderClosed } // NB(r): We are allowed to call match field after Close called on // the segment but not after it is finalized. sr.fsSegment.RLock() pl, err := sr.fsSegment.matchFieldWithRLockNotClosedMaybeFinalized(field) sr.fsSegment.RUnlock() - sr.RUnlock() return pl, err } func (sr *fsSegmentReader) MatchTerm(field []byte, term []byte) (postings.List, error) { - sr.RLock() if sr.closed { - sr.RUnlock() - return nil, errReaderClosed } // NB(r): We are allowed to call match field after Close called on // the segment but not after it is finalized. 
sr.fsSegment.RLock() pl, err := sr.fsSegment.matchTermWithRLockNotClosedMaybeFinalized(field, term) sr.fsSegment.RUnlock() - sr.RUnlock() return pl, err } @@ -880,88 +878,69 @@ func (sr *fsSegmentReader) MatchRegexp( field []byte, compiled index.CompiledRegex, ) (postings.List, error) { - sr.RLock() if sr.closed { - sr.RUnlock() - return nil, errReaderClosed } // NB(r): We are allowed to call match field after Close called on // the segment but not after it is finalized. sr.fsSegment.RLock() pl, err := sr.fsSegment.matchRegexpWithRLockNotClosedMaybeFinalized(field, compiled) sr.fsSegment.RUnlock() - sr.RUnlock() return pl, err } func (sr *fsSegmentReader) MatchAll() (postings.MutableList, error) { - sr.RLock() if sr.closed { - sr.RUnlock() - return nil, errReaderClosed } // NB(r): We are allowed to call match field after Close called on // the segment but not after it is finalized. sr.fsSegment.RLock() pl, err := sr.fsSegment.matchAllWithRLockNotClosedMaybeFinalized() sr.fsSegment.RUnlock() - sr.RUnlock() return pl, err } func (sr *fsSegmentReader) Doc(id postings.ID) (doc.Document, error) { - sr.RLock() if sr.closed { - sr.RUnlock() - return doc.Document{}, errReaderClosed } // NB(r): We are allowed to call match field after Close called on // the segment but not after it is finalized. sr.fsSegment.RLock() pl, err := sr.fsSegment.docWithRLockNotClosedMaybeFinalized(id) sr.fsSegment.RUnlock() - sr.RUnlock() return pl, err } func (sr *fsSegmentReader) Docs(pl postings.List) (doc.Iterator, error) { - sr.RLock() if sr.closed { - sr.RUnlock() - return nil, errReaderClosed } // NB(r): We are allowed to call match field after Close called on // the segment but not after it is finalized. + // Also make sure the doc retriever is the reader not the segment so that + // is closed check is not performed and only the is finalized check. sr.fsSegment.RLock() - iter, err := sr.fsSegment.docsWithRLockNotClosedMaybeFinalized(pl) + iter, err := sr.fsSegment.docsWithRLockNotClosedMaybeFinalized(sr, pl) sr.fsSegment.RUnlock() - sr.RUnlock() return iter, err } func (sr *fsSegmentReader) AllDocs() (index.IDDocIterator, error) { - sr.RLock() if sr.closed { - sr.RUnlock() - return nil, errReaderClosed } // NB(r): We are allowed to call match field after Close called on // the segment but not after it is finalized. + // Also make sure the doc retriever is the reader not the segment so that + // is closed check is not performed and only the is finalized check. sr.fsSegment.RLock() - iter, err := sr.fsSegment.allDocsWithRLockNotClosedMaybeFinalized() + iter, err := sr.fsSegment.allDocsWithRLockNotClosedMaybeFinalized(sr) sr.fsSegment.RUnlock() - sr.RUnlock() return iter, err } func (sr *fsSegmentReader) Close() error { - sr.Lock() if sr.closed { - sr.Unlock() return errReaderClosed } sr.closed = true - sr.Unlock() // Close the context so that segment doesn't need to track this any longer. 
sr.ctx.Close() return nil From f30bf085b7d7da1aa24a7dbaa7e4390a3bed7ee7 Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Sun, 28 Jun 2020 17:27:06 -0400 Subject: [PATCH 6/7] Address feedback --- src/m3ninx/index/segment/fst/segment.go | 51 ++++++++++++++----------- 1 file changed, 29 insertions(+), 22 deletions(-) diff --git a/src/m3ninx/index/segment/fst/segment.go b/src/m3ninx/index/segment/fst/segment.go index ce05d25b70..f5786532d1 100644 --- a/src/m3ninx/index/segment/fst/segment.go +++ b/src/m3ninx/index/segment/fst/segment.go @@ -401,10 +401,10 @@ func (r *fsSegment) MatchField(field []byte) (postings.List, error) { if r.closed { return nil, errReaderClosed } - return r.matchFieldWithRLockNotClosedMaybeFinalized(field) + return r.matchFieldNotClosedMaybeFinalizedWithRLock(field) } -func (r *fsSegment) matchFieldWithRLockNotClosedMaybeFinalized( +func (r *fsSegment) matchFieldNotClosedMaybeFinalizedWithRLock( field []byte, ) (postings.List, error) { // NB(r): Not closed, but could be finalized (i.e. closed segment reader) @@ -415,7 +415,7 @@ func (r *fsSegment) matchFieldWithRLockNotClosedMaybeFinalized( if !r.data.Version.supportsFieldPostingsList() { // i.e. don't have the field level postings list, so fall back to regexp - return r.matchRegexpWithRLockNotClosedMaybeFinalized(field, index.DotStarCompiledRegex()) + return r.matchRegexpNotClosedMaybeFinalizedWithRLock(field, index.DotStarCompiledRegex()) } termsFSTOffset, exists, err := r.fieldsFST.Get(field) @@ -447,10 +447,10 @@ func (r *fsSegment) MatchTerm(field []byte, term []byte) (postings.List, error) if r.closed { return nil, errReaderClosed } - return r.matchTermWithRLockNotClosedMaybeFinalized(field, term) + return r.matchTermNotClosedMaybeFinalizedWithRLock(field, term) } -func (r *fsSegment) matchTermWithRLockNotClosedMaybeFinalized( +func (r *fsSegment) matchTermNotClosedMaybeFinalizedWithRLock( field, term []byte, ) (postings.List, error) { // NB(r): Not closed, but could be finalized (i.e. closed segment reader) @@ -503,10 +503,10 @@ func (r *fsSegment) MatchRegexp( if r.closed { return nil, errReaderClosed } - return r.matchRegexpWithRLockNotClosedMaybeFinalized(field, compiled) + return r.matchRegexpNotClosedMaybeFinalizedWithRLock(field, compiled) } -func (r *fsSegment) matchRegexpWithRLockNotClosedMaybeFinalized( +func (r *fsSegment) matchRegexpNotClosedMaybeFinalizedWithRLock( field []byte, compiled index.CompiledRegex, ) (postings.List, error) { @@ -583,10 +583,10 @@ func (r *fsSegment) MatchAll() (postings.MutableList, error) { if r.closed { return nil, errReaderClosed } - return r.matchAllWithRLockNotClosedMaybeFinalized() + return r.matchAllNotClosedMaybeFinalizedWithRLock() } -func (r *fsSegment) matchAllWithRLockNotClosedMaybeFinalized() (postings.MutableList, error) { +func (r *fsSegment) matchAllNotClosedMaybeFinalizedWithRLock() (postings.MutableList, error) { // NB(r): Not closed, but could be finalized (i.e. closed segment reader) // calling match field after this segment is finalized. if r.finalized { @@ -608,10 +608,10 @@ func (r *fsSegment) Doc(id postings.ID) (doc.Document, error) { if r.closed { return doc.Document{}, errReaderClosed } - return r.docWithRLockNotClosedMaybeFinalized(id) + return r.docNotClosedMaybeFinalizedWithRLock(id) } -func (r *fsSegment) docWithRLockNotClosedMaybeFinalized(id postings.ID) (doc.Document, error) { +func (r *fsSegment) docNotClosedMaybeFinalizedWithRLock(id postings.ID) (doc.Document, error) { // NB(r): Not closed, but could be finalized (i.e. 
closed segment reader) // calling match field after this segment is finalized. if r.finalized { @@ -637,10 +637,10 @@ func (r *fsSegment) Docs(pl postings.List) (doc.Iterator, error) { if r.closed { return nil, errReaderClosed } - return r.docsWithRLockNotClosedMaybeFinalized(r, pl) + return r.docsNotClosedMaybeFinalizedWithRLock(r, pl) } -func (r *fsSegment) docsWithRLockNotClosedMaybeFinalized( +func (r *fsSegment) docsNotClosedMaybeFinalizedWithRLock( retriever index.DocRetriever, pl postings.List, ) (doc.Iterator, error) { @@ -659,10 +659,10 @@ func (r *fsSegment) AllDocs() (index.IDDocIterator, error) { if r.closed { return nil, errReaderClosed } - return r.allDocsWithRLockNotClosedMaybeFinalized(r) + return r.allDocsNotClosedMaybeFinalizedWithRLock(r) } -func (r *fsSegment) allDocsWithRLockNotClosedMaybeFinalized( +func (r *fsSegment) allDocsNotClosedMaybeFinalizedWithRLock( retriever index.DocRetriever, ) (index.IDDocIterator, error) { // NB(r): Not closed, but could be finalized (i.e. closed segment reader) @@ -854,22 +854,24 @@ func newReader( func (sr *fsSegmentReader) MatchField(field []byte) (postings.List, error) { if sr.closed { + return nil, errReaderClosed } // NB(r): We are allowed to call match field after Close called on // the segment but not after it is finalized. sr.fsSegment.RLock() - pl, err := sr.fsSegment.matchFieldWithRLockNotClosedMaybeFinalized(field) + pl, err := sr.fsSegment.matchFieldNotClosedMaybeFinalizedWithRLock(field) sr.fsSegment.RUnlock() return pl, err } func (sr *fsSegmentReader) MatchTerm(field []byte, term []byte) (postings.List, error) { if sr.closed { + return nil, errReaderClosed } // NB(r): We are allowed to call match field after Close called on // the segment but not after it is finalized. sr.fsSegment.RLock() - pl, err := sr.fsSegment.matchTermWithRLockNotClosedMaybeFinalized(field, term) + pl, err := sr.fsSegment.matchTermNotClosedMaybeFinalizedWithRLock(field, term) sr.fsSegment.RUnlock() return pl, err } @@ -879,59 +881,64 @@ func (sr *fsSegmentReader) MatchRegexp( compiled index.CompiledRegex, ) (postings.List, error) { if sr.closed { + return nil, errReaderClosed } // NB(r): We are allowed to call match field after Close called on // the segment but not after it is finalized. sr.fsSegment.RLock() - pl, err := sr.fsSegment.matchRegexpWithRLockNotClosedMaybeFinalized(field, compiled) + pl, err := sr.fsSegment.matchRegexpNotClosedMaybeFinalizedWithRLock(field, compiled) sr.fsSegment.RUnlock() return pl, err } func (sr *fsSegmentReader) MatchAll() (postings.MutableList, error) { if sr.closed { + return nil, errReaderClosed } // NB(r): We are allowed to call match field after Close called on // the segment but not after it is finalized. sr.fsSegment.RLock() - pl, err := sr.fsSegment.matchAllWithRLockNotClosedMaybeFinalized() + pl, err := sr.fsSegment.matchAllNotClosedMaybeFinalizedWithRLock() sr.fsSegment.RUnlock() return pl, err } func (sr *fsSegmentReader) Doc(id postings.ID) (doc.Document, error) { if sr.closed { + return doc.Document{}, errReaderClosed } // NB(r): We are allowed to call match field after Close called on // the segment but not after it is finalized. 
sr.fsSegment.RLock() - pl, err := sr.fsSegment.docWithRLockNotClosedMaybeFinalized(id) + pl, err := sr.fsSegment.docNotClosedMaybeFinalizedWithRLock(id) sr.fsSegment.RUnlock() return pl, err } func (sr *fsSegmentReader) Docs(pl postings.List) (doc.Iterator, error) { if sr.closed { + return nil, errReaderClosed } // NB(r): We are allowed to call match field after Close called on // the segment but not after it is finalized. // Also make sure the doc retriever is the reader not the segment so that // is closed check is not performed and only the is finalized check. sr.fsSegment.RLock() - iter, err := sr.fsSegment.docsWithRLockNotClosedMaybeFinalized(sr, pl) + iter, err := sr.fsSegment.docsNotClosedMaybeFinalizedWithRLock(sr, pl) sr.fsSegment.RUnlock() return iter, err } func (sr *fsSegmentReader) AllDocs() (index.IDDocIterator, error) { if sr.closed { + return nil, errReaderClosed } // NB(r): We are allowed to call match field after Close called on // the segment but not after it is finalized. // Also make sure the doc retriever is the reader not the segment so that // is closed check is not performed and only the is finalized check. sr.fsSegment.RLock() - iter, err := sr.fsSegment.allDocsWithRLockNotClosedMaybeFinalized(sr) + iter, err := sr.fsSegment.allDocsNotClosedMaybeFinalizedWithRLock(sr) sr.fsSegment.RUnlock() return iter, err } From e8e0aa45828ddcc3b9f7adf1cd2868147df710be Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Sun, 28 Jun 2020 17:38:58 -0400 Subject: [PATCH 7/7] Add test for returned iterators valid --- .../index/segment/fst/writer_reader_test.go | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/src/m3ninx/index/segment/fst/writer_reader_test.go b/src/m3ninx/index/segment/fst/writer_reader_test.go index 9449dc1078..68ea96780e 100644 --- a/src/m3ninx/index/segment/fst/writer_reader_test.go +++ b/src/m3ninx/index/segment/fst/writer_reader_test.go @@ -538,6 +538,28 @@ func TestSegmentReaderValidUntilClose(t *testing.T) { _, err = reader.AllDocs() require.NoError(t, err) + // Test returned iterators also work + re, err = index.CompileRegex([]byte("^.*apple$")) + require.NoError(t, err) + list, err = reader.MatchRegexp([]byte("fruit"), re) + require.NoError(t, err) + iter, err := reader.Docs(list) + require.NoError(t, err) + var docs int + for iter.Next() { + docs++ + var fruitField doc.Field + for _, field := range iter.Current().Fields { + if bytes.Equal(field.Name, []byte("fruit")) { + fruitField = field + break + } + } + require.True(t, bytes.HasSuffix(fruitField.Value, []byte("apple"))) + } + require.NoError(t, iter.Err()) + require.NoError(t, iter.Close()) + // Now close. require.NoError(t, reader.Close())
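
The following is a minimal, self-contained Go sketch of the closed-versus-finalized lifecycle these patches enforce; the type and method names are illustrative stand-ins, not the actual m3ninx segment and reader types. The point being demonstrated: Close stops new queries at the segment level but leaves already-open readers usable, and only Finalize (once all reader contexts are closed) releases the backing data and makes every access return an error.

// Illustrative lifecycle sketch only: simplified names, not the m3ninx types.
package main

import (
	"errors"
	"fmt"
	"sync"
)

var (
	errClosed    = errors.New("segment is closed")
	errFinalized = errors.New("segment is finalized")
)

// segment separates Close (no new queries through the segment itself,
// existing readers keep working) from Finalize (backing data released,
// every access must fail).
type segment struct {
	mu        sync.RWMutex
	closed    bool
	finalized bool
	data      map[string][]int // stand-in for the FST and postings data
}

// matchTermWithRLock assumes the caller holds mu for reading and has
// already decided the closed check does not apply; it only guards
// against the segment having been finalized.
func (s *segment) matchTermWithRLock(term string) ([]int, error) {
	if s.finalized {
		return nil, errFinalized
	}
	return s.data[term], nil
}

// MatchTerm is the segment-level entry point and rejects closed segments.
func (s *segment) MatchTerm(term string) ([]int, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	if s.closed {
		return nil, errClosed
	}
	return s.matchTermWithRLock(term)
}

func (s *segment) Close() {
	s.mu.Lock()
	s.closed = true
	s.mu.Unlock()
}

func (s *segment) Finalize() {
	s.mu.Lock()
	s.finalized = true
	s.data = nil
	s.mu.Unlock()
}

// reader skips the closed check so it remains valid after segment.Close,
// but it still fails once the segment is finalized.
type reader struct {
	seg *segment
}

func (r *reader) MatchTerm(term string) ([]int, error) {
	r.seg.mu.RLock()
	defer r.seg.mu.RUnlock()
	return r.seg.matchTermWithRLock(term)
}

func main() {
	seg := &segment{data: map[string][]int{"yellow": {0, 2}}}
	rd := &reader{seg: seg}

	seg.Close()
	fmt.Println(rd.MatchTerm("yellow")) // [0 2] <nil>: reader still valid after Close

	seg.Finalize()
	fmt.Println(rd.MatchTerm("yellow")) // [] segment is finalized
}

In the actual patches this split is what lets fsSegmentReader keep serving queries (and drop its own mutex in patch 5) while fsSegment.Finalize takes the write lock before releasing the FST and mmapped data.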