From dbaeab130090852c5343af5de2d3c7fef06d6182 Mon Sep 17 00:00:00 2001 From: Bo Du Date: Thu, 13 Feb 2020 17:27:10 -0500 Subject: [PATCH 01/10] Concurrent indexing. --- src/dbnode/storage/index.go | 1 + src/dbnode/storage/index/block.go | 22 +-- src/m3ninx/index/batch.go | 13 ++ src/m3ninx/index/segment/builder/builder.go | 141 ++++++++++++------ .../index/segment/builder/builder_test.go | 2 + src/m3ninx/index/segment/segment_mock.go | 139 ++++++++++++++++- src/m3ninx/index/segment/types.go | 9 +- 7 files changed, 271 insertions(+), 56 deletions(-) diff --git a/src/dbnode/storage/index.go b/src/dbnode/storage/index.go index e2a0d3aafa..bae698a096 100644 --- a/src/dbnode/storage/index.go +++ b/src/dbnode/storage/index.go @@ -748,6 +748,7 @@ func (i *nsIndex) Flush( builderOpts := i.opts.IndexOptions().SegmentBuilderOptions() builder, err := builder.NewBuilderFromDocuments(builderOpts) + defer builder.Close() if err != nil { return err } diff --git a/src/dbnode/storage/index/block.go b/src/dbnode/storage/index/block.go index 324fc8ab37..e156e9b66f 100644 --- a/src/dbnode/storage/index/block.go +++ b/src/dbnode/storage/index/block.go @@ -706,18 +706,20 @@ func (b *block) cleanupForegroundCompactWithLock() { b.foregroundSegments = nil // Free compactor resources. - if b.compact.foregroundCompactor == nil { - return + if b.compact.foregroundCompactor != nil { + if err := b.compact.foregroundCompactor.Close(); err != nil { + instrument.EmitAndLogInvariantViolation(b.iopts, func(l *zap.Logger) { + l.Error("error closing index block foreground compactor", zap.Error(err)) + }) + } + b.compact.foregroundCompactor = nil } - if err := b.compact.foregroundCompactor.Close(); err != nil { - instrument.EmitAndLogInvariantViolation(b.iopts, func(l *zap.Logger) { - l.Error("error closing index block foreground compactor", zap.Error(err)) - }) + // Free segment builder resources. + if b.compact.segmentBuilder != nil { + b.compact.segmentBuilder.Close() + b.compact.segmentBuilder = nil } - - b.compact.foregroundCompactor = nil - b.compact.segmentBuilder = nil } func (b *block) executorWithRLock() (search.Executor, error) { @@ -1484,7 +1486,7 @@ func (b *block) writeBatchErrorInvalidState(state blockState) error { // blockCompact has several lazily allocated compaction components. type blockCompact struct { - segmentBuilder segment.DocumentsBuilder + segmentBuilder segment.CloseableDocumentsBuilder foregroundCompactor *compaction.Compactor backgroundCompactor *compaction.Compactor compactingForeground bool diff --git a/src/m3ninx/index/batch.go b/src/m3ninx/index/batch.go index 82803a6349..32aa68d7ee 100644 --- a/src/m3ninx/index/batch.go +++ b/src/m3ninx/index/batch.go @@ -24,6 +24,7 @@ import ( "bytes" "errors" "fmt" + "sync" "github.com/m3db/m3/src/m3ninx/doc" ) @@ -79,6 +80,8 @@ func NewBatch(docs []doc.Document, opts ...BatchOption) Batch { // BatchPartialError indicates an error was encountered inserting some documents in a batch. // It is not safe for concurrent use. type BatchPartialError struct { + sync.Mutex + errs []BatchError } @@ -138,6 +141,16 @@ func (e *BatchPartialError) Add(err BatchError) { e.errs = append(e.errs, err) } +// AddWithLock adds an error to e with a lock. Any nil errors are ignored. +func (e *BatchPartialError) AddWithLock(err BatchError) { + e.Lock() + defer e.Unlock() + if err.Err == nil { + return + } + e.errs = append(e.errs, err) +} + // Errs returns the errors with the indexes of the documents in the batch // which were not indexed. 
func (e *BatchPartialError) Errs() []BatchError { diff --git a/src/m3ninx/index/segment/builder/builder.go b/src/m3ninx/index/segment/builder/builder.go index 8367c5f6cf..81876f8798 100644 --- a/src/m3ninx/index/segment/builder/builder.go +++ b/src/m3ninx/index/segment/builder/builder.go @@ -23,19 +23,39 @@ package builder import ( "errors" "fmt" + "runtime" + "sync" "github.com/m3db/m3/src/m3ninx/doc" "github.com/m3db/m3/src/m3ninx/index" "github.com/m3db/m3/src/m3ninx/index/segment" "github.com/m3db/m3/src/m3ninx/postings" "github.com/m3db/m3/src/m3ninx/util" + "go.uber.org/atomic" + + "github.com/cespare/xxhash" ) var ( errDocNotFound = errors.New("doc not found") ) +const ( + // Slightly buffer the work to avoid blocking main thread. + indexQueueSize = 2 << 7 +) + +type indexJob struct { + id postings.ID + field doc.Field + + idx int + batchErr *index.BatchPartialError +} + type builder struct { + sync.Mutex + opts Options newUUIDFn util.NewUUIDFn @@ -46,18 +66,22 @@ type builder struct { idSet *IDsMap fields *fieldsMap uniqueFields [][]byte + + wg sync.WaitGroup + indexQueues []chan indexJob + closed *atomic.Bool } // NewBuilderFromDocuments returns a builder from documents, it is // not thread safe and is optimized for insertion speed and a // final build step when documents are indexed. -func NewBuilderFromDocuments(opts Options) (segment.DocumentsBuilder, error) { - return &builder{ +func NewBuilderFromDocuments(opts Options) (segment.CloseableDocumentsBuilder, error) { + concurrency := runtime.NumCPU() + b := &builder{ opts: opts, newUUIDFn: opts.NewUUIDFn(), batchSizeOne: index.Batch{ - Docs: make([]doc.Document, 1), - AllowPartialUpdates: false, + Docs: make([]doc.Document, 1), }, idSet: NewIDsMap(IDsMapOptions{ InitialSize: opts.InitialCapacity(), @@ -66,7 +90,17 @@ func NewBuilderFromDocuments(opts Options) (segment.DocumentsBuilder, error) { InitialSize: opts.InitialCapacity(), }), uniqueFields: make([][]byte, 0, opts.InitialCapacity()), - }, nil + indexQueues: make([]chan indexJob, 0, concurrency), + closed: atomic.NewBool(false), + } + + for i := 0; i < concurrency; i++ { + indexQueue := make(chan indexJob, indexQueueSize) + b.indexQueues = append(b.indexQueues, indexQueue) + go b.indexWorker(indexQueue) + } + + return b, nil } func (b *builder) Reset(offset postings.ID) { @@ -113,9 +147,6 @@ func (b *builder) InsertBatch(batch index.Batch) error { for i, d := range batch.Docs { // Validate doc if err := d.Validate(); err != nil { - if !batch.AllowPartialUpdates { - return err - } batchErr.Add(index.BatchError{Err: err, Idx: i}) continue } @@ -124,9 +155,6 @@ func (b *builder) InsertBatch(batch index.Batch) error { if !d.HasID() { id, err := b.newUUIDFn() if err != nil { - if !batch.AllowPartialUpdates { - return err - } batchErr.Add(index.BatchError{Err: err, Idx: i}) continue } @@ -139,9 +167,6 @@ func (b *builder) InsertBatch(batch index.Batch) error { // Avoid duplicates. if _, ok := b.idSet.Get(d.ID); ok { - if !batch.AllowPartialUpdates { - return index.ErrDuplicateID - } batchErr.Add(index.BatchError{Err: index.ErrDuplicateID, Idx: i}) continue } @@ -158,50 +183,70 @@ func (b *builder) InsertBatch(batch index.Batch) error { // Index the terms. 
for _, f := range d.Fields { - if err := b.index(postings.ID(postingsListID), f); err != nil { - if !batch.AllowPartialUpdates { - return err - } - batchErr.Add(index.BatchError{Err: err, Idx: i}) - } + b.index(postings.ID(postingsListID), f, i, batchErr) } - if err := b.index(postings.ID(postingsListID), doc.Field{ + b.index(postings.ID(postingsListID), doc.Field{ Name: doc.IDReservedFieldName, Value: d.ID, - }); err != nil { - if !batch.AllowPartialUpdates { - return err - } - batchErr.Add(index.BatchError{Err: err, Idx: i}) - } + }, i, batchErr) } + // Wait for all the concurrent indexing jobs to finish. + b.wg.Wait() + if !batchErr.IsEmpty() { return batchErr } return nil } -func (b *builder) index(id postings.ID, f doc.Field) error { - terms, ok := b.fields.Get(f.Name) - if !ok { - terms = newTerms(b.opts) - b.fields.SetUnsafe(f.Name, terms, fieldsMapSetUnsafeOptions{ - NoCopyKey: true, - NoFinalizeKey: true, - }) +func (b *builder) index( + id postings.ID, + f doc.Field, + i int, + batchErr *index.BatchPartialError, +) { + // Do-nothing if we are already closed to avoid send on closed panics. + if !b.closed.Load() { + return } - - // If empty field, track insertion of this key into the fields - // collection for correct response when retrieving all fields. - newField := terms.size() == 0 - if err := terms.post(f.Value, id); err != nil { - return err + b.wg.Add(1) + // NB(bodu): To avoid locking inside of the terms, we shard the work + // by field name. + shard := xxhash.Sum64(f.Name) % uint64(len(b.indexQueues)) + b.indexQueues[shard] <- indexJob{ + id: id, + field: f, + idx: i, + batchErr: batchErr, } - if newField { - b.uniqueFields = append(b.uniqueFields, f.Name) +} + +func (b *builder) indexWorker(indexQueue chan indexJob) { + for job := range indexQueue { + terms, ok := b.fields.Get(job.field.Name) + if !ok { + terms = newTerms(b.opts) + b.fields.SetUnsafe(job.field.Name, terms, fieldsMapSetUnsafeOptions{ + NoCopyKey: true, + NoFinalizeKey: true, + }) + } + + // If empty field, track insertion of this key into the fields + // collection for correct response when retrieving all fields. + newField := terms.size() == 0 + // NB(bodu): Bulk of the cpu time during insertion is spent inside of terms.post(). 
+ if err := terms.post(job.field.Value, job.id); err != nil { + job.batchErr.AddWithLock(index.BatchError{Err: err, Idx: job.idx}) + } + if newField { + b.Lock() + b.uniqueFields = append(b.uniqueFields, job.field.Name) + b.Unlock() + } + b.wg.Done() } - return nil } func (b *builder) AllDocs() (index.IDDocIterator, error) { @@ -247,3 +292,11 @@ func (b *builder) Terms(field []byte) (segment.TermsIterator, error) { return newTermsIter(terms.uniqueTerms), nil } + +func (b *builder) Close() error { + b.closed.Store(true) + for _, q := range b.indexQueues { + close(q) + } + return nil +} diff --git a/src/m3ninx/index/segment/builder/builder_test.go b/src/m3ninx/index/segment/builder/builder_test.go index 2449db5e72..035afdfb08 100644 --- a/src/m3ninx/index/segment/builder/builder_test.go +++ b/src/m3ninx/index/segment/builder/builder_test.go @@ -78,6 +78,7 @@ var ( func TestBuilderFields(t *testing.T) { builder, err := NewBuilderFromDocuments(testOptions) + defer builder.Close() require.NoError(t, err) for i := 0; i < 10; i++ { @@ -105,6 +106,7 @@ func TestBuilderFields(t *testing.T) { func TestBuilderTerms(t *testing.T) { builder, err := NewBuilderFromDocuments(testOptions) + defer builder.Close() require.NoError(t, err) for i := 0; i < 10; i++ { diff --git a/src/m3ninx/index/segment/segment_mock.go b/src/m3ninx/index/segment/segment_mock.go index 3325e03523..4102918076 100644 --- a/src/m3ninx/index/segment/segment_mock.go +++ b/src/m3ninx/index/segment/segment_mock.go @@ -1,7 +1,7 @@ // Code generated by MockGen. DO NOT EDIT. // Source: github.com/m3db/m3/src/m3ninx/index/segment/types.go -// Copyright (c) 2019 Uber Technologies, Inc. +// Copyright (c) 2020 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal @@ -1093,6 +1093,143 @@ func (mr *MockDocumentsBuilderMockRecorder) InsertBatch(b interface{}) *gomock.C return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InsertBatch", reflect.TypeOf((*MockDocumentsBuilder)(nil).InsertBatch), b) } +// MockCloseableDocumentsBuilder is a mock of CloseableDocumentsBuilder interface +type MockCloseableDocumentsBuilder struct { + ctrl *gomock.Controller + recorder *MockCloseableDocumentsBuilderMockRecorder +} + +// MockCloseableDocumentsBuilderMockRecorder is the mock recorder for MockCloseableDocumentsBuilder +type MockCloseableDocumentsBuilderMockRecorder struct { + mock *MockCloseableDocumentsBuilder +} + +// NewMockCloseableDocumentsBuilder creates a new mock instance +func NewMockCloseableDocumentsBuilder(ctrl *gomock.Controller) *MockCloseableDocumentsBuilder { + mock := &MockCloseableDocumentsBuilder{ctrl: ctrl} + mock.recorder = &MockCloseableDocumentsBuilderMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockCloseableDocumentsBuilder) EXPECT() *MockCloseableDocumentsBuilderMockRecorder { + return m.recorder +} + +// Fields mocks base method +func (m *MockCloseableDocumentsBuilder) Fields() (FieldsIterator, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Fields") + ret0, _ := ret[0].(FieldsIterator) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Fields indicates an expected call of Fields +func (mr *MockCloseableDocumentsBuilderMockRecorder) Fields() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Fields", reflect.TypeOf((*MockCloseableDocumentsBuilder)(nil).Fields)) +} 
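
NB: the generated mocks here are mechanical; the substance of PATCH 01 is the builder change above. The pattern is a fixed pool of worker goroutines, one buffered queue per worker, with each doc.Field routed to a worker by hashing the field name, so all postings for a given field are applied by a single goroutine and per-field state needs no lock on the hot path. Below is a minimal, self-contained sketch of that routing pattern. It is illustrative only, not the m3ninx API: it uses the standard library's hash/fnv in place of the patch's github.com/cespare/xxhash dependency, and every name in it is hypothetical.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

// job mirrors the shape of the patch's indexJob: a payload plus the
// caller's WaitGroup, so a batch insert can block until every routed
// job has been applied.
type job struct {
	wg    *sync.WaitGroup
	field string
}

// shardedIndexer routes each field name to a fixed worker goroutine by
// hash, so per-shard state is only ever touched by one goroutine.
type shardedIndexer struct {
	queues []chan job
	counts []map[string]int // per-shard state owned by its worker
}

func newShardedIndexer(concurrency int) *shardedIndexer {
	s := &shardedIndexer{
		queues: make([]chan job, concurrency),
		counts: make([]map[string]int, concurrency),
	}
	for i := range s.queues {
		s.queues[i] = make(chan job, 64) // buffered, like indexQueueSize
		s.counts[i] = make(map[string]int)
		go s.worker(i)
	}
	return s
}

// shard plays the role of xxhash.Sum64(f.Name) % len(indexQueues).
func (s *shardedIndexer) shard(field string) int {
	h := fnv.New64a()
	h.Write([]byte(field))
	return int(h.Sum64() % uint64(len(s.queues)))
}

func (s *shardedIndexer) index(wg *sync.WaitGroup, field string) {
	wg.Add(1)
	s.queues[s.shard(field)] <- job{wg: wg, field: field}
}

func (s *shardedIndexer) worker(shard int) {
	for j := range s.queues[shard] {
		s.counts[shard][j.field]++ // safe: single writer per shard
		j.wg.Done()
	}
}

func main() {
	idx := newShardedIndexer(4)
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		idx.index(&wg, fmt.Sprintf("field-%d", i%8))
	}
	wg.Wait() // the same barrier InsertBatch uses before returning
	total := 0
	for _, m := range idx.counts {
		for _, n := range m {
			total += n
		}
	}
	fmt.Println("indexed:", total) // indexed: 100
}
```

As in the patch, the submitter's WaitGroup is the only cross-shard synchronization on the insert path; a real implementation would also close the queues on Close, which is exactly what PATCH 01's builder.Close does.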
+ +// Terms mocks base method +func (m *MockCloseableDocumentsBuilder) Terms(field []byte) (TermsIterator, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Terms", field) + ret0, _ := ret[0].(TermsIterator) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Terms indicates an expected call of Terms +func (mr *MockCloseableDocumentsBuilderMockRecorder) Terms(field interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Terms", reflect.TypeOf((*MockCloseableDocumentsBuilder)(nil).Terms), field) +} + +// Reset mocks base method +func (m *MockCloseableDocumentsBuilder) Reset(offset postings.ID) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Reset", offset) +} + +// Reset indicates an expected call of Reset +func (mr *MockCloseableDocumentsBuilderMockRecorder) Reset(offset interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Reset", reflect.TypeOf((*MockCloseableDocumentsBuilder)(nil).Reset), offset) +} + +// Docs mocks base method +func (m *MockCloseableDocumentsBuilder) Docs() []doc.Document { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Docs") + ret0, _ := ret[0].([]doc.Document) + return ret0 +} + +// Docs indicates an expected call of Docs +func (mr *MockCloseableDocumentsBuilderMockRecorder) Docs() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Docs", reflect.TypeOf((*MockCloseableDocumentsBuilder)(nil).Docs)) +} + +// AllDocs mocks base method +func (m *MockCloseableDocumentsBuilder) AllDocs() (index.IDDocIterator, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AllDocs") + ret0, _ := ret[0].(index.IDDocIterator) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// AllDocs indicates an expected call of AllDocs +func (mr *MockCloseableDocumentsBuilderMockRecorder) AllDocs() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AllDocs", reflect.TypeOf((*MockCloseableDocumentsBuilder)(nil).AllDocs)) +} + +// Insert mocks base method +func (m *MockCloseableDocumentsBuilder) Insert(d doc.Document) ([]byte, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Insert", d) + ret0, _ := ret[0].([]byte) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Insert indicates an expected call of Insert +func (mr *MockCloseableDocumentsBuilderMockRecorder) Insert(d interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Insert", reflect.TypeOf((*MockCloseableDocumentsBuilder)(nil).Insert), d) +} + +// InsertBatch mocks base method +func (m *MockCloseableDocumentsBuilder) InsertBatch(b index.Batch) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "InsertBatch", b) + ret0, _ := ret[0].(error) + return ret0 +} + +// InsertBatch indicates an expected call of InsertBatch +func (mr *MockCloseableDocumentsBuilderMockRecorder) InsertBatch(b interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InsertBatch", reflect.TypeOf((*MockCloseableDocumentsBuilder)(nil).InsertBatch), b) +} + +// Close mocks base method +func (m *MockCloseableDocumentsBuilder) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close +func (mr *MockCloseableDocumentsBuilderMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", 
reflect.TypeOf((*MockCloseableDocumentsBuilder)(nil).Close)) +} + // MockSegmentsBuilder is a mock of SegmentsBuilder interface type MockSegmentsBuilder struct { ctrl *gomock.Controller diff --git a/src/m3ninx/index/segment/types.go b/src/m3ninx/index/segment/types.go index c80bc3a8c1..48bcf5b3c8 100644 --- a/src/m3ninx/index/segment/types.go +++ b/src/m3ninx/index/segment/types.go @@ -167,12 +167,19 @@ type Builder interface { AllDocs() (index.IDDocIterator, error) } -// DocumentsBuilder is a builder is written documents to. +// DocumentsBuilder is a builder that has documents written to it. type DocumentsBuilder interface { Builder index.Writer } +// CloseableDocumentsBuilder is a builder that has documents written to it and has freeable resources. +type CloseableDocumentsBuilder interface { + DocumentsBuilder + + Close() error +} + // SegmentsBuilder is a builder that is built from segments. type SegmentsBuilder interface { Builder From 9662f7abed45bc40e39b5cc030fa96563437220e Mon Sep 17 00:00:00 2001 From: Bo Du Date: Thu, 13 Feb 2020 18:35:38 -0500 Subject: [PATCH 02/10] Shard unique fields and add custom 2d sort. --- src/m3ninx/index/segment/builder/builder.go | 46 +++++++++---- .../index/segment/builder/bytes_slice_iter.go | 69 ++++++++++++++++--- 2 files changed, 90 insertions(+), 25 deletions(-) diff --git a/src/m3ninx/index/segment/builder/builder.go b/src/m3ninx/index/segment/builder/builder.go index 81876f8798..e01d4846ab 100644 --- a/src/m3ninx/index/segment/builder/builder.go +++ b/src/m3ninx/index/segment/builder/builder.go @@ -49,6 +49,7 @@ type indexJob struct { id postings.ID field doc.Field + shard int idx int batchErr *index.BatchPartialError } @@ -65,7 +66,7 @@ type builder struct { docs []doc.Document idSet *IDsMap fields *fieldsMap - uniqueFields [][]byte + uniqueFields [][][]byte wg sync.WaitGroup indexQueues []chan indexJob @@ -89,7 +90,7 @@ func NewBuilderFromDocuments(opts Options) (segment.CloseableDocumentsBuilder, e fields: newFieldsMap(fieldsMapOptions{ InitialSize: opts.InitialCapacity(), }), - uniqueFields: make([][]byte, 0, opts.InitialCapacity()), + uniqueFields: make([][][]byte, 0, concurrency), indexQueues: make([]chan indexJob, 0, concurrency), closed: atomic.NewBool(false), } @@ -98,6 +99,14 @@ func NewBuilderFromDocuments(opts Options) (segment.CloseableDocumentsBuilder, e indexQueue := make(chan indexJob, indexQueueSize) b.indexQueues = append(b.indexQueues, indexQueue) go b.indexWorker(indexQueue) + + // Give each shard a fraction of the configured initial capacity. + shardInitialCapcity := opts.InitialCapacity() + if shardInitialCapcity > 0 { + shardInitialCapcity /= concurrency + } + shardUniqueFields := make([][]byte, 0, shardInitialCapcity) + b.uniqueFields = append(b.uniqueFields, shardUniqueFields) } return b, nil @@ -122,10 +131,12 @@ func (b *builder) Reset(offset postings.ID) { } // Reset the unique fields slice - for i := range b.uniqueFields { - b.uniqueFields[i] = nil + for i, shardUniqueFields := range b.uniqueFields { + for i := range shardUniqueFields { + shardUniqueFields[i] = nil + } + b.uniqueFields[i] = shardUniqueFields[:0] } - b.uniqueFields = b.uniqueFields[:0] } func (b *builder) Insert(d doc.Document) ([]byte, error) { @@ -207,16 +218,17 @@ func (b *builder) index( batchErr *index.BatchPartialError, ) { // Do-nothing if we are already closed to avoid send on closed panics. - if !b.closed.Load() { + if b.closed.Load() { return } b.wg.Add(1) // NB(bodu): To avoid locking inside of the terms, we shard the work // by field name. 
- shard := xxhash.Sum64(f.Name) % uint64(len(b.indexQueues)) + shard := int(xxhash.Sum64(f.Name) % uint64(len(b.indexQueues))) b.indexQueues[shard] <- indexJob{ id: id, field: f, + shard: shard, idx: i, batchErr: batchErr, } @@ -226,11 +238,17 @@ func (b *builder) indexWorker(indexQueue chan indexJob) { for job := range indexQueue { terms, ok := b.fields.Get(job.field.Name) if !ok { - terms = newTerms(b.opts) - b.fields.SetUnsafe(job.field.Name, terms, fieldsMapSetUnsafeOptions{ - NoCopyKey: true, - NoFinalizeKey: true, - }) + b.Lock() + // NB(bodu): Check again within the lock to make sure we aren't making concurrent map writes. + terms, ok = b.fields.Get(job.field.Name) + if !ok { + terms = newTerms(b.opts) + b.fields.SetUnsafe(job.field.Name, terms, fieldsMapSetUnsafeOptions{ + NoCopyKey: true, + NoFinalizeKey: true, + }) + } + b.Unlock() } // If empty field, track insertion of this key into the fields @@ -241,9 +259,7 @@ func (b *builder) indexWorker(indexQueue chan indexJob) { job.batchErr.AddWithLock(index.BatchError{Err: err, Idx: job.idx}) } if newField { - b.Lock() - b.uniqueFields = append(b.uniqueFields, job.field.Name) - b.Unlock() + b.uniqueFields[job.shard] = append(b.uniqueFields[job.shard], job.field.Name) } b.wg.Done() } diff --git a/src/m3ninx/index/segment/builder/bytes_slice_iter.go b/src/m3ninx/index/segment/builder/bytes_slice_iter.go index 336f1e0e1a..67a10324f1 100644 --- a/src/m3ninx/index/segment/builder/bytes_slice_iter.go +++ b/src/m3ninx/index/segment/builder/bytes_slice_iter.go @@ -25,6 +25,8 @@ import ( "sort" "github.com/m3db/m3/src/m3ninx/index/segment" + + "github.com/twotwotwo/sorts" ) // OrderedBytesSliceIter is a new ordered bytes slice iterator. @@ -32,9 +34,9 @@ type OrderedBytesSliceIter struct { err error done bool - currentIdx int - current []byte - backingSlice [][]byte + currentIdx int + current []byte + backingSlices *sortableSliceOfSliceOfByteSlices } var _ segment.FieldsIterator = &OrderedBytesSliceIter{} @@ -42,12 +44,13 @@ var _ segment.FieldsIterator = &OrderedBytesSliceIter{} // NewOrderedBytesSliceIter sorts a slice of bytes and then // returns an iterator over them. func NewOrderedBytesSliceIter( - maybeUnorderedSlice [][]byte, + maybeUnorderedSlices [][][]byte, ) *OrderedBytesSliceIter { - sortSliceOfByteSlices(maybeUnorderedSlice) + sortable := &sortableSliceOfSliceOfByteSlices{data: maybeUnorderedSlices} + sorts.ByBytes(sortable) return &OrderedBytesSliceIter{ - currentIdx: -1, - backingSlice: maybeUnorderedSlice, + currentIdx: -1, + backingSlices: sortable, } } @@ -57,11 +60,12 @@ func (b *OrderedBytesSliceIter) Next() bool { return false } b.currentIdx++ - if b.currentIdx >= len(b.backingSlice) { + if b.currentIdx >= b.backingSlices.Len() { b.done = true return false } - b.current = b.backingSlice[b.currentIdx] + iOuter, iInner := b.backingSlices.getIndices(b.currentIdx) + b.current = b.backingSlices.data[iOuter][iInner] return true } @@ -77,7 +81,7 @@ func (b *OrderedBytesSliceIter) Err() error { // Len returns the length of the slice. func (b *OrderedBytesSliceIter) Len() int { - return len(b.backingSlice) + return b.backingSlices.Len() } // Close releases resources. 
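
NB: the hunk that follows adds the "custom 2d sort" from this patch's subject. uniqueFields is now one slice of field names per shard, and the sortable wrapper presents those per-shard slices as a single logically flat sequence by translating a flat index into (outer, inner) coordinates. Below is a standalone sketch of the same idea using only the standard library's sort.Sort for self-containment; the patch itself uses github.com/twotwotwo/sorts.ByBytes, which takes the same Len/Less/Swap interface plus an extra Key method so it can radix-sort on the byte keys. Names here are illustrative, not the patch's types.

```go
package main

import (
	"bytes"
	"fmt"
	"sort"
)

// flat2D exposes a [][][]byte (per-shard slices of field names) as one
// flat sortable collection, mirroring sortableSliceOfSliceOfByteSlices.
type flat2D struct {
	data   [][][]byte
	length int // memoized total length, computed on first Len call
}

func (s *flat2D) Len() int {
	if s.length == 0 {
		for _, inner := range s.data {
			s.length += len(inner)
		}
	}
	return s.length
}

// at translates a flat index into (outer, inner) coordinates by walking
// the per-shard lengths — the same linear scan as the patch's getIndices.
func (s *flat2D) at(idx int) (int, int) {
	outer := 0
	for idx >= len(s.data[outer]) {
		idx -= len(s.data[outer])
		outer++
	}
	return outer, idx
}

func (s *flat2D) Less(i, j int) bool {
	io, ii := s.at(i)
	jo, ji := s.at(j)
	return bytes.Compare(s.data[io][ii], s.data[jo][ji]) < 0
}

func (s *flat2D) Swap(i, j int) {
	io, ii := s.at(i)
	jo, ji := s.at(j)
	s.data[io][ii], s.data[jo][ji] = s.data[jo][ji], s.data[io][ii]
}

func main() {
	// Two "shards" of unique field names, unsorted across shards.
	f := &flat2D{data: [][][]byte{
		{[]byte("host"), []byte("az")},
		{[]byte("service"), []byte("env")},
	}}
	sort.Sort(f)
	// Iterate in globally sorted order, as OrderedBytesSliceIter does.
	for i := 0; i < f.Len(); i++ {
		o, in := f.at(i)
		fmt.Printf("%s ", f.data[o][in])
	}
	fmt.Println() // az env host service
}
```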
@@ -86,6 +90,51 @@ func (b *OrderedBytesSliceIter) Close() error { return nil } +type sortableSliceOfSliceOfByteSlices struct { + data [][][]byte + length *int +} + +func (s *sortableSliceOfSliceOfByteSlices) Len() int { + if s.length != nil { + return *s.length + } + + totalLen := 0 + for _, innerSlice := range s.data { + totalLen += len(innerSlice) + } + s.length = &totalLen + + return *s.length +} + +func (s *sortableSliceOfSliceOfByteSlices) Less(i, j int) bool { + iOuter, iInner := s.getIndices(i) + jOuter, jInner := s.getIndices(j) + return bytes.Compare(s.data[iOuter][iInner], s.data[jOuter][jInner]) < 0 +} + +func (s *sortableSliceOfSliceOfByteSlices) Swap(i, j int) { + iOuter, iInner := s.getIndices(i) + jOuter, jInner := s.getIndices(j) + s.data[iOuter][iInner], s.data[jOuter][jInner] = s.data[jOuter][jInner], s.data[iOuter][iInner] +} + +func (s *sortableSliceOfSliceOfByteSlices) Key(i int) []byte { + iOuter, iInner := s.getIndices(i) + return s.data[iOuter][iInner] +} + +func (s *sortableSliceOfSliceOfByteSlices) getIndices(idx int) (int, int) { + currentSliceIdx := 0 + for idx >= len(s.data[currentSliceIdx]) { + idx -= len(s.data[currentSliceIdx]) + currentSliceIdx++ + } + return currentSliceIdx, idx +} + func sortSliceOfByteSlices(b [][]byte) { sort.Slice(b, func(i, j int) bool { return bytes.Compare(b[i], b[j]) < 0 From c93ab0230be7ee71ebddc18391052d6193917676 Mon Sep 17 00:00:00 2001 From: Bo Du Date: Fri, 14 Feb 2020 18:07:03 -0500 Subject: [PATCH 03/10] Shard fields map and remove locks for concurrency. --- src/m3ninx/index/segment/builder/builder.go | 29 +++------- .../segment/builder/sharded_fields_map.go | 55 +++++++++++++++++++ 2 files changed, 64 insertions(+), 20 deletions(-) create mode 100644 src/m3ninx/index/segment/builder/sharded_fields_map.go diff --git a/src/m3ninx/index/segment/builder/builder.go b/src/m3ninx/index/segment/builder/builder.go index e01d4846ab..2ae2a4a5e3 100644 --- a/src/m3ninx/index/segment/builder/builder.go +++ b/src/m3ninx/index/segment/builder/builder.go @@ -55,8 +55,6 @@ type indexJob struct { } type builder struct { - sync.Mutex - opts Options newUUIDFn util.NewUUIDFn @@ -65,7 +63,7 @@ type builder struct { batchSizeOne index.Batch docs []doc.Document idSet *IDsMap - fields *fieldsMap + fields *shardedFieldsMap uniqueFields [][][]byte wg sync.WaitGroup @@ -87,9 +85,6 @@ func NewBuilderFromDocuments(opts Options) (segment.CloseableDocumentsBuilder, e idSet: NewIDsMap(IDsMapOptions{ InitialSize: opts.InitialCapacity(), }), - fields: newFieldsMap(fieldsMapOptions{ - InitialSize: opts.InitialCapacity(), - }), uniqueFields: make([][][]byte, 0, concurrency), indexQueues: make([]chan indexJob, 0, concurrency), closed: atomic.NewBool(false), @@ -107,6 +102,7 @@ func NewBuilderFromDocuments(opts Options) (segment.CloseableDocumentsBuilder, e } shardUniqueFields := make([][]byte, 0, shardInitialCapcity) b.uniqueFields = append(b.uniqueFields, shardUniqueFields) + b.fields = newShardedFieldsMap(concurrency, shardInitialCapcity) } return b, nil @@ -126,9 +122,7 @@ func (b *builder) Reset(offset postings.ID) { b.idSet.Reset() // Keep fields around, just reset the terms set for each one. 
- for _, entry := range b.fields.Iter() { - entry.Value().reset() - } + b.fields.ResetTermsSets() // Reset the unique fields slice for i, shardUniqueFields := range b.uniqueFields { @@ -236,19 +230,14 @@ func (b *builder) index( func (b *builder) indexWorker(indexQueue chan indexJob) { for job := range indexQueue { - terms, ok := b.fields.Get(job.field.Name) + terms, ok := b.fields.ShardedGet(job.shard, job.field.Name) if !ok { - b.Lock() // NB(bodu): Check again within the lock to make sure we aren't making concurrent map writes. - terms, ok = b.fields.Get(job.field.Name) - if !ok { - terms = newTerms(b.opts) - b.fields.SetUnsafe(job.field.Name, terms, fieldsMapSetUnsafeOptions{ - NoCopyKey: true, - NoFinalizeKey: true, - }) - } - b.Unlock() + terms = newTerms(b.opts) + b.fields.ShardedSetUnsafe(job.shard, job.field.Name, terms, fieldsMapSetUnsafeOptions{ + NoCopyKey: true, + NoFinalizeKey: true, + }) } // If empty field, track insertion of this key into the fields diff --git a/src/m3ninx/index/segment/builder/sharded_fields_map.go b/src/m3ninx/index/segment/builder/sharded_fields_map.go new file mode 100644 index 0000000000..f7f3528555 --- /dev/null +++ b/src/m3ninx/index/segment/builder/sharded_fields_map.go @@ -0,0 +1,55 @@ +package builder + +type shardedFieldsMap struct { + data []*fieldsMap +} + +func newShardedFieldsMap( + numShards int, + shardInitialCapacity int, +) *shardedFieldsMap { + data := make([]*fieldsMap, 0, numShards) + for i := 0; i < numShards; i++ { + data = append(data, newFieldsMap(fieldsMapOptions{ + InitialSize: shardInitialCapacity, + })) + } + return &shardedFieldsMap{ + data: data, + } +} + +func (s *shardedFieldsMap) Get(k []byte) (*terms, bool) { + for _, fieldMap := range s.data { + t, found := fieldMap.Get(k) + if found { + return t, found + } + } + return nil, false +} + +func (s *shardedFieldsMap) ShardedGet( + shard int, + k []byte, +) (*terms, bool) { + return s.data[shard].Get(k) +} + +func (s *shardedFieldsMap) ShardedSetUnsafe( + shard int, + k []byte, + v *terms, + opts fieldsMapSetUnsafeOptions, +) { + s.data[shard].SetUnsafe(k, v, opts) +} + +// ResetTerms keeps fields around but resets the terms set for each one. +func (s *shardedFieldsMap) ResetTermsSets() { + for _, fieldMap := range s.data { + for _, entry := range fieldMap.Iter() { + entry.Value().reset() + } + } +} From 81692ede4802e63fd0852addacfe447494679ba6 Mon Sep 17 00:00:00 2001 From: Bo Du Date: Wed, 19 Feb 2020 11:48:40 -0500 Subject: [PATCH 04/10] Add header comment. --- .../segment/builder/sharded_fields_map.go | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/src/m3ninx/index/segment/builder/sharded_fields_map.go b/src/m3ninx/index/segment/builder/sharded_fields_map.go index f7f3528555..4a520fa65b 100644 --- a/src/m3ninx/index/segment/builder/sharded_fields_map.go +++ b/src/m3ninx/index/segment/builder/sharded_fields_map.go @@ -1,3 +1,23 @@ +// Copyright (c) 2020 Uber Technologies, Inc. 
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + package builder type shardedFieldsMap struct { From 8be41387b53f3b0306603c03cf00e1a0e074e60f Mon Sep 17 00:00:00 2001 From: Bo Du Date: Mon, 24 Feb 2020 17:43:00 -0500 Subject: [PATCH 05/10] Address PR comments. --- src/m3ninx/index/batch.go | 4 +-- src/m3ninx/index/segment/builder/builder.go | 28 +++++++++++-------- .../index/segment/builder/builder_test.go | 4 +-- .../index/segment/builder/bytes_slice_iter.go | 26 ++++++++--------- src/m3ninx/index/segment/builder/options.go | 24 ++++++++++++++++ .../segment/builder/sharded_fields_map.go | 5 ++-- 6 files changed, 60 insertions(+), 31 deletions(-) diff --git a/src/m3ninx/index/batch.go b/src/m3ninx/index/batch.go index 32aa68d7ee..a22145610c 100644 --- a/src/m3ninx/index/batch.go +++ b/src/m3ninx/index/batch.go @@ -143,12 +143,12 @@ func (e *BatchPartialError) Add(err BatchError) { // AddWithLock adds an error to e with a lock. Any nil errors are ignored. func (e *BatchPartialError) AddWithLock(err BatchError) { - e.Lock() - defer e.Unlock() if err.Err == nil { return } + e.Lock() e.errs = append(e.errs, err) + e.Unlock() } // Errs returns the errors with the indexes of the documents in the batch diff --git a/src/m3ninx/index/segment/builder/builder.go b/src/m3ninx/index/segment/builder/builder.go index 2ae2a4a5e3..d5c2fb71c4 100644 --- a/src/m3ninx/index/segment/builder/builder.go +++ b/src/m3ninx/index/segment/builder/builder.go @@ -23,7 +23,6 @@ package builder import ( "errors" "fmt" - "runtime" "sync" "github.com/m3db/m3/src/m3ninx/doc" @@ -42,10 +41,12 @@ var ( const ( // Slightly buffer the work to avoid blocking main thread. - indexQueueSize = 2 << 7 + indexQueueSize = 2 << 9 // 1024 ) type indexJob struct { + wg *sync.WaitGroup + id postings.ID field doc.Field @@ -75,7 +76,7 @@ type builder struct { // not thread safe and is optimized for insertion speed and a // final build step when documents are indexed. func NewBuilderFromDocuments(opts Options) (segment.CloseableDocumentsBuilder, error) { - concurrency := runtime.NumCPU() + concurrency := opts.Concurrency() b := &builder{ opts: opts, newUUIDFn: opts.NewUUIDFn(), @@ -148,6 +149,7 @@ func (b *builder) Insert(d doc.Document) ([]byte, error) { func (b *builder) InsertBatch(batch index.Batch) error { // NB(r): This is all kept in a single method to make the // insertion path fast. 
+ var wg sync.WaitGroup batchErr := index.NewBatchPartialError() for i, d := range batch.Docs { // Validate doc @@ -188,9 +190,9 @@ func (b *builder) InsertBatch(batch index.Batch) error { // Index the terms. for _, f := range d.Fields { - b.index(postings.ID(postingsListID), f, i, batchErr) + b.index(&wg, postings.ID(postingsListID), f, i, batchErr) } - b.index(postings.ID(postingsListID), doc.Field{ + b.index(&wg, postings.ID(postingsListID), doc.Field{ Name: doc.IDReservedFieldName, Value: d.ID, }, i, batchErr) @@ -206,6 +208,7 @@ func (b *builder) InsertBatch(batch index.Batch) error { } func (b *builder) index( + wg *sync.WaitGroup, id postings.ID, f doc.Field, i int, @@ -215,11 +218,12 @@ func (b *builder) index( if b.closed.Load() { return } - b.wg.Add(1) + wg.Add(1) // NB(bodu): To avoid locking inside of the terms, we shard the work // by field name. shard := int(xxhash.Sum64(f.Name) % uint64(len(b.indexQueues))) b.indexQueues[shard] <- indexJob{ + wg: wg, id: id, field: f, shard: shard, @@ -244,13 +248,15 @@ func (b *builder) indexWorker(indexQueue chan indexJob) { // collection for correct response when retrieving all fields. newField := terms.size() == 0 // NB(bodu): Bulk of the cpu time during insertion is spent inside of terms.post(). - if err := terms.post(job.field.Value, job.id); err != nil { + err := terms.post(job.field.Value, job.id) + if err == nil { + if newField { + b.uniqueFields[job.shard] = append(b.uniqueFields[job.shard], job.field.Name) + } + } else { job.batchErr.AddWithLock(index.BatchError{Err: err, Idx: job.idx}) } - if newField { - b.uniqueFields[job.shard] = append(b.uniqueFields[job.shard], job.field.Name) - } - b.wg.Done() + job.wg.Done() } } diff --git a/src/m3ninx/index/segment/builder/builder_test.go b/src/m3ninx/index/segment/builder/builder_test.go index 035afdfb08..c5913829a2 100644 --- a/src/m3ninx/index/segment/builder/builder_test.go +++ b/src/m3ninx/index/segment/builder/builder_test.go @@ -78,8 +78,8 @@ var ( func TestBuilderFields(t *testing.T) { builder, err := NewBuilderFromDocuments(testOptions) - defer builder.Close() require.NoError(t, err) + defer require.NoError(t, builder.Close()) for i := 0; i < 10; i++ { builder.Reset(0) @@ -106,8 +106,8 @@ func TestBuilderFields(t *testing.T) { func TestBuilderTerms(t *testing.T) { builder, err := NewBuilderFromDocuments(testOptions) - defer builder.Close() require.NoError(t, err) + defer require.NoError(t, builder.Close()) for i := 0; i < 10; i++ { builder.Reset(0) diff --git a/src/m3ninx/index/segment/builder/bytes_slice_iter.go b/src/m3ninx/index/segment/builder/bytes_slice_iter.go index 67a10324f1..d516cfc285 100644 --- a/src/m3ninx/index/segment/builder/bytes_slice_iter.go +++ b/src/m3ninx/index/segment/builder/bytes_slice_iter.go @@ -36,7 +36,7 @@ type OrderedBytesSliceIter struct { currentIdx int current []byte - backingSlices *sortableSliceOfSliceOfByteSlices + backingSlices *sortableSliceOfSliceOfByteSlicesAsc } var _ segment.FieldsIterator = &OrderedBytesSliceIter{} @@ -46,7 +46,7 @@ var _ segment.FieldsIterator = &OrderedBytesSliceIter{} func NewOrderedBytesSliceIter( maybeUnorderedSlices [][][]byte, ) *OrderedBytesSliceIter { - sortable := &sortableSliceOfSliceOfByteSlices{data: maybeUnorderedSlices} + sortable := &sortableSliceOfSliceOfByteSlicesAsc{data: maybeUnorderedSlices} sorts.ByBytes(sortable) return &OrderedBytesSliceIter{ currentIdx: -1, @@ -90,43 +90,43 @@ func (b *OrderedBytesSliceIter) Close() error { return nil } -type sortableSliceOfSliceOfByteSlices struct { +type 
sortableSliceOfSliceOfByteSlicesAsc struct { data [][][]byte - length *int + length int } -func (s *sortableSliceOfSliceOfByteSlices) Len() int { - if s.length != nil { - return *s.length +func (s *sortableSliceOfSliceOfByteSlicesAsc) Len() int { + if s.length > 0 { + return s.length } totalLen := 0 for _, innerSlice := range s.data { totalLen += len(innerSlice) } - s.length = &totalLen + s.length = totalLen - return *s.length + return s.length } -func (s *sortableSliceOfSliceOfByteSlices) Less(i, j int) bool { +func (s *sortableSliceOfSliceOfByteSlicesAsc) Less(i, j int) bool { iOuter, iInner := s.getIndices(i) jOuter, jInner := s.getIndices(j) return bytes.Compare(s.data[iOuter][iInner], s.data[jOuter][jInner]) < 0 } -func (s *sortableSliceOfSliceOfByteSlices) Swap(i, j int) { +func (s *sortableSliceOfSliceOfByteSlicesAsc) Swap(i, j int) { iOuter, iInner := s.getIndices(i) jOuter, jInner := s.getIndices(j) s.data[iOuter][iInner], s.data[jOuter][jInner] = s.data[jOuter][jInner], s.data[iOuter][iInner] } -func (s *sortableSliceOfSliceOfByteSlices) Key(i int) []byte { +func (s *sortableSliceOfSliceOfByteSlicesAsc) Key(i int) []byte { iOuter, iInner := s.getIndices(i) return s.data[iOuter][iInner] } -func (s *sortableSliceOfSliceOfByteSlices) getIndices(idx int) (int, int) { +func (s *sortableSliceOfSliceOfByteSlicesAsc) getIndices(idx int) (int, int) { currentSliceIdx := 0 for idx >= len(s.data[currentSliceIdx]) { idx -= len(s.data[currentSliceIdx]) diff --git a/src/m3ninx/index/segment/builder/options.go b/src/m3ninx/index/segment/builder/options.go index d96800c209..e2b69b0719 100644 --- a/src/m3ninx/index/segment/builder/options.go +++ b/src/m3ninx/index/segment/builder/options.go @@ -21,6 +21,8 @@ package builder import ( + "runtime" + "github.com/m3db/m3/src/m3ninx/postings" "github.com/m3db/m3/src/m3ninx/postings/roaring" "github.com/m3db/m3/src/m3ninx/util" @@ -30,6 +32,10 @@ const ( defaultInitialCapacity = 128 ) +var ( + defaultConcurrency = runtime.NumCPU() +) + // Options is a collection of options for segment building. type Options interface { // SetNewUUIDFn sets the function used to generate new UUIDs. @@ -49,12 +55,19 @@ type Options interface { // PostingsListPool returns the postings list pool. PostingsListPool() postings.Pool + + // SetConcurrency sets the indexing concurrency. + SetConcurrency(value int) Options + + // Concurrency returns the indexing concurrency. + Concurrency() int } type opts struct { newUUIDFn util.NewUUIDFn initialCapacity int postingsPool postings.Pool + concurrency int } // NewOptions returns new options. 
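
NB: with the concurrency option wired up in this file, callers choose the indexing parallelism at construction time. Below is a sketch of intended usage, assuming the m3 module layout from the import paths in this series; error handling is abbreviated and the document literal is a minimal example, not real data. The default, per defaultConcurrency above, is runtime.NumCPU().

```go
package main

import (
	"fmt"

	"github.com/m3db/m3/src/m3ninx/doc"
	"github.com/m3db/m3/src/m3ninx/index"
	"github.com/m3db/m3/src/m3ninx/index/segment/builder"
)

func main() {
	// Route indexing work across 4 queues/workers.
	opts := builder.NewOptions().SetConcurrency(4)

	b, err := builder.NewBuilderFromDocuments(opts)
	if err != nil {
		panic(err)
	}
	// The builder now owns goroutines and queues, so it must be closed.
	defer func() {
		if err := b.Close(); err != nil {
			panic(err)
		}
	}()

	batch := index.NewBatch([]doc.Document{
		{
			ID: []byte("doc-1"),
			Fields: []doc.Field{
				{Name: []byte("city"), Value: []byte("sf")},
			},
		},
	})
	// Per-document failures come back as an *index.BatchPartialError.
	if err := b.InsertBatch(batch); err != nil {
		fmt.Println("partial failure:", err)
	}
}
```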
@@ -63,6 +76,7 @@ func NewOptions() Options { newUUIDFn: util.NewUUID, initialCapacity: defaultInitialCapacity, postingsPool: postings.NewPool(nil, roaring.NewPostingsList), + concurrency: defaultConcurrency, } } @@ -95,3 +109,13 @@ func (o *opts) SetPostingsListPool(v postings.Pool) Options { func (o *opts) PostingsListPool() postings.Pool { return o.postingsPool } + +func (o *opts) SetConcurrency(v int) Options { + opts := *o + opts.concurrency = v + return &opts +} + +func (o *opts) Concurrency() int { + return o.concurrency +} diff --git a/src/m3ninx/index/segment/builder/sharded_fields_map.go b/src/m3ninx/index/segment/builder/sharded_fields_map.go index 4a520fa65b..a493939a15 100644 --- a/src/m3ninx/index/segment/builder/sharded_fields_map.go +++ b/src/m3ninx/index/segment/builder/sharded_fields_map.go @@ -41,8 +41,7 @@ func newShardedFieldsMap( func (s *shardedFieldsMap) Get(k []byte) (*terms, bool) { for _, fieldMap := range s.data { - t, found := fieldMap.Get(k) - if found { + if t, found := fieldMap.Get(k); found { return t, found } } @@ -65,7 +64,7 @@ func (s *shardedFieldsMap) ShardedSetUnsafe( s.data[shard].SetUnsafe(k, v, opts) } -// ResetTerms keeps fields around but resets the terms set for each one. +// ResetTermsSets keeps fields around but resets the terms set for each one. func (s *shardedFieldsMap) ResetTermsSets() { for _, fieldMap := range s.data { for _, entry := range fieldMap.Iter() { From 97d56ff6dc981b5fc19b2ef0a6b73f10e0b759df Mon Sep 17 00:00:00 2001 From: Bo Du Date: Wed, 26 Feb 2020 17:40:48 -0500 Subject: [PATCH 06/10] Remove wg on builder. --- src/m3ninx/index/segment/builder/builder.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/m3ninx/index/segment/builder/builder.go b/src/m3ninx/index/segment/builder/builder.go index d5c2fb71c4..891b28e7c8 100644 --- a/src/m3ninx/index/segment/builder/builder.go +++ b/src/m3ninx/index/segment/builder/builder.go @@ -67,7 +67,6 @@ type builder struct { fields *shardedFieldsMap uniqueFields [][][]byte - wg sync.WaitGroup indexQueues []chan indexJob closed *atomic.Bool } @@ -199,7 +198,7 @@ func (b *builder) InsertBatch(batch index.Batch) error { } // Wait for all the concurrent indexing jobs to finish. - b.wg.Wait() + wg.Wait() if !batchErr.IsEmpty() { return batchErr From f8a84d800f9740a2ddedb825a0f0eb78b5b617e0 Mon Sep 17 00:00:00 2001 From: Bo Du Date: Thu, 27 Feb 2020 11:12:16 -0500 Subject: [PATCH 07/10] Need to defer inside of a function or the Close gets evaluated immediately. 
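
The underlying rule: a deferred call's arguments are evaluated at the point of the defer statement, so `defer require.NoError(t, builder.Close())` calls Close() immediately and only defers the assertion. Wrapping the call in a closure defers the Close() as well. A minimal illustration of the difference (plain Go, testify not required; names are illustrative):

```go
package main

import "fmt"

func report(msg string) { fmt.Println("deferred report:", msg) }

func status() string {
	fmt.Println("status() evaluated")
	return "ok"
}

func main() {
	// WRONG for cleanup: status() runs now, at the defer statement;
	// only report(...) itself is deferred to function exit.
	defer report(status())

	// RIGHT: wrapping in a closure defers the status() call too.
	defer func() { report(status()) }()

	fmt.Println("body runs")
}

// Output:
// status() evaluated
// body runs
// status() evaluated
// deferred report: ok
// deferred report: ok
```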
--- src/m3ninx/index/segment/builder/builder_test.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/m3ninx/index/segment/builder/builder_test.go b/src/m3ninx/index/segment/builder/builder_test.go index c5913829a2..cba14e86d7 100644 --- a/src/m3ninx/index/segment/builder/builder_test.go +++ b/src/m3ninx/index/segment/builder/builder_test.go @@ -79,7 +79,9 @@ var ( func TestBuilderFields(t *testing.T) { builder, err := NewBuilderFromDocuments(testOptions) require.NoError(t, err) - defer require.NoError(t, builder.Close()) + defer func() { + require.NoError(t, builder.Close()) + }() for i := 0; i < 10; i++ { builder.Reset(0) @@ -107,7 +109,9 @@ func TestBuilderFields(t *testing.T) { func TestBuilderTerms(t *testing.T) { builder, err := NewBuilderFromDocuments(testOptions) require.NoError(t, err) - defer require.NoError(t, builder.Close()) + defer func() { + require.NoError(t, builder.Close()) + }() for i := 0; i < 10; i++ { builder.Reset(0) From 77ffa29670535459e5dd0f126a2164424ac5ad58 Mon Sep 17 00:00:00 2001 From: Bo Du Date: Fri, 28 Feb 2020 16:40:53 -0500 Subject: [PATCH 08/10] Address PR comments. --- src/dbnode/storage/index.go | 2 +- src/dbnode/storage/index/block.go | 6 +++- src/m3ninx/index/segment/builder/builder.go | 29 +++++++++++++------ .../segment/builder/sharded_fields_map.go | 9 ------ 4 files changed, 26 insertions(+), 20 deletions(-) diff --git a/src/dbnode/storage/index.go b/src/dbnode/storage/index.go index bae698a096..97bb398a11 100644 --- a/src/dbnode/storage/index.go +++ b/src/dbnode/storage/index.go @@ -748,10 +748,10 @@ func (i *nsIndex) Flush( builderOpts := i.opts.IndexOptions().SegmentBuilderOptions() builder, err := builder.NewBuilderFromDocuments(builderOpts) - defer builder.Close() if err != nil { return err } + defer builder.Close() var evicted int for _, block := range flushable { diff --git a/src/dbnode/storage/index/block.go b/src/dbnode/storage/index/block.go index e156e9b66f..b0e90f13d5 100644 --- a/src/dbnode/storage/index/block.go +++ b/src/dbnode/storage/index/block.go @@ -717,7 +717,11 @@ func (b *block) cleanupForegroundCompactWithLock() { // Free segment builder resources. 
if b.compact.segmentBuilder != nil { - b.compact.segmentBuilder.Close() + if err := b.compact.segmentBuilder.Close(); err != nil { + instrument.EmitAndLogInvariantViolation(b.iopts, func(l *zap.Logger) { + l.Error("error closing index block segment builder", zap.Error(err)) + }) + } b.compact.segmentBuilder = nil } } diff --git a/src/m3ninx/index/segment/builder/builder.go b/src/m3ninx/index/segment/builder/builder.go index 891b28e7c8..5ab62e7f41 100644 --- a/src/m3ninx/index/segment/builder/builder.go +++ b/src/m3ninx/index/segment/builder/builder.go @@ -30,13 +30,13 @@ import ( "github.com/m3db/m3/src/m3ninx/index/segment" "github.com/m3db/m3/src/m3ninx/postings" "github.com/m3db/m3/src/m3ninx/util" - "go.uber.org/atomic" "github.com/cespare/xxhash" ) var ( errDocNotFound = errors.New("doc not found") + errClosed = errors.New("builder closed") ) const ( @@ -55,6 +55,11 @@ type indexJob struct { batchErr *index.BatchPartialError } +type builderStatus struct { + sync.RWMutex + closed bool +} + type builder struct { opts Options newUUIDFn util.NewUUIDFn @@ -68,7 +73,7 @@ type builder struct { uniqueFields [][][]byte indexQueues []chan indexJob - closed *atomic.Bool + status builderStatus } // NewBuilderFromDocuments returns a builder from documents, it is @@ -87,7 +92,6 @@ func NewBuilderFromDocuments(opts Options) (segment.CloseableDocumentsBuilder, e }), uniqueFields: make([][][]byte, 0, concurrency), indexQueues: make([]chan indexJob, 0, concurrency), - closed: atomic.NewBool(false), } for i := 0; i < concurrency; i++ { @@ -146,6 +150,13 @@ func (b *builder) Insert(d doc.Document) ([]byte, error) { } func (b *builder) InsertBatch(batch index.Batch) error { + b.status.RLock() + defer b.status.RUnlock() + + if b.status.closed { + return errClosed + } + // NB(r): This is all kept in a single method to make the // insertion path fast. var wg sync.WaitGroup @@ -213,10 +224,6 @@ func (b *builder) index( i int, batchErr *index.BatchPartialError, ) { - // Do-nothing if we are already closed to avoid send on closed panics. - if b.closed.Load() { - return - } wg.Add(1) // NB(bodu): To avoid locking inside of the terms, we shard the work // by field name. @@ -291,7 +298,9 @@ func (b *builder) Fields() (segment.FieldsIterator, error) { } func (b *builder) Terms(field []byte) (segment.TermsIterator, error) { - terms, ok := b.fields.Get(field) + // NB(bodu): The # of indexQueues and field map shards are equal. 
+ shard := int(xxhash.Sum64(field) % uint64(len(b.indexQueues))) + terms, ok := b.fields.ShardedGet(shard, field) if !ok { return nil, fmt.Errorf("field not found: %s", string(field)) } @@ -304,9 +313,11 @@ func (b *builder) Terms(field []byte) (segment.TermsIterator, error) { } func (b *builder) Close() error { - b.closed.Store(true) + b.status.Lock() + defer b.status.Unlock() for _, q := range b.indexQueues { close(q) } + b.status.closed = true return nil } diff --git a/src/m3ninx/index/segment/builder/sharded_fields_map.go b/src/m3ninx/index/segment/builder/sharded_fields_map.go index a493939a15..ffa452c94c 100644 --- a/src/m3ninx/index/segment/builder/sharded_fields_map.go +++ b/src/m3ninx/index/segment/builder/sharded_fields_map.go @@ -39,15 +39,6 @@ func newShardedFieldsMap( } } -func (s *shardedFieldsMap) Get(k []byte) (*terms, bool) { - for _, fieldMap := range s.data { - if t, found := fieldMap.Get(k); found { - return t, found - } - } - return nil, false -} - func (s *shardedFieldsMap) ShardedGet( shard int, k []byte, From bbb3a8420971b583ab632d35ae46947e1a83e91d Mon Sep 17 00:00:00 2001 From: Bo Du Date: Wed, 4 Mar 2020 19:13:51 -0500 Subject: [PATCH 09/10] Update src/m3ninx/index/segment/builder/builder.go Co-Authored-By: Rob Skillington --- src/m3ninx/index/segment/builder/builder.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/m3ninx/index/segment/builder/builder.go b/src/m3ninx/index/segment/builder/builder.go index 5ab62e7f41..93161934df 100644 --- a/src/m3ninx/index/segment/builder/builder.go +++ b/src/m3ninx/index/segment/builder/builder.go @@ -255,13 +255,12 @@ func (b *builder) indexWorker(indexQueue chan indexJob) { newField := terms.size() == 0 // NB(bodu): Bulk of the cpu time during insertion is spent inside of terms.post(). err := terms.post(job.field.Value, job.id) - if err == nil { - if newField { - b.uniqueFields[job.shard] = append(b.uniqueFields[job.shard], job.field.Name) - } - } else { + if err != nil { job.batchErr.AddWithLock(index.BatchError{Err: err, Idx: job.idx}) } + if err == nil && newField { + b.uniqueFields[job.shard] = append(b.uniqueFields[job.shard], job.field.Name) + } job.wg.Done() } } From ac934c22b705547333f385fe4507ccb42f8d8cba Mon Sep 17 00:00:00 2001 From: Bo Du Date: Wed, 4 Mar 2020 19:38:59 -0500 Subject: [PATCH 10/10] Address PR comments. --- src/m3ninx/index/segment/builder/builder.go | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/src/m3ninx/index/segment/builder/builder.go b/src/m3ninx/index/segment/builder/builder.go index 93161934df..a7895d8808 100644 --- a/src/m3ninx/index/segment/builder/builder.go +++ b/src/m3ninx/index/segment/builder/builder.go @@ -100,13 +100,13 @@ func NewBuilderFromDocuments(opts Options) (segment.CloseableDocumentsBuilder, e go b.indexWorker(indexQueue) // Give each shard a fraction of the configured initial capacity. 
- shardInitialCapcity := opts.InitialCapacity() - if shardInitialCapcity > 0 { - shardInitialCapcity /= concurrency + shardInitialCapacity := opts.InitialCapacity() + if shardInitialCapacity > 0 { + shardInitialCapacity /= concurrency } - shardUniqueFields := make([][]byte, 0, shardInitialCapcity) + shardUniqueFields := make([][]byte, 0, shardInitialCapacity) b.uniqueFields = append(b.uniqueFields, shardUniqueFields) - b.fields = newShardedFieldsMap(concurrency, shardInitialCapcity) + b.fields = newShardedFieldsMap(concurrency, shardInitialCapacity) } return b, nil @@ -227,7 +227,7 @@ func (b *builder) index( wg.Add(1) // NB(bodu): To avoid locking inside of the terms, we shard the work // by field name. - shard := int(xxhash.Sum64(f.Name) % uint64(len(b.indexQueues))) + shard := b.calculateShard(f.Name) b.indexQueues[shard] <- indexJob{ wg: wg, id: id, @@ -265,6 +265,10 @@ func (b *builder) indexWorker(indexQueue chan indexJob) { } } +func (b *builder) calculateShard(field []byte) int { + return int(xxhash.Sum64(field) % uint64(len(b.indexQueues))) +} + func (b *builder) AllDocs() (index.IDDocIterator, error) { rangeIter := postings.NewRangeIterator(b.offset, b.offset+postings.ID(len(b.docs))) @@ -297,9 +301,7 @@ func (b *builder) Fields() (segment.FieldsIterator, error) { } func (b *builder) Terms(field []byte) (segment.TermsIterator, error) { - // NB(bodu): The # of indexQueues and field map shards are equal. - shard := int(xxhash.Sum64(field) % uint64(len(b.indexQueues))) - terms, ok := b.fields.ShardedGet(shard, field) + terms, ok := b.fields.ShardedGet(b.calculateShard(field), field) if !ok { return nil, fmt.Errorf("field not found: %s", string(field)) }
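
NB, a closing note on the series: the per-document error handling depends on index.BatchPartialError.AddWithLock, and PATCH 05 deliberately moved the nil check ahead of the Lock so the common success path never touches the mutex. The distilled pattern, as a runnable sketch with hypothetical names:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// partialError mirrors the shape of index.BatchPartialError after this
// series: a mutex-guarded slice that concurrent index workers append to.
type partialError struct {
	sync.Mutex
	errs []error
}

// addWithLock follows PATCH 05's ordering: reject nil before taking the
// lock, so successful jobs never contend on the mutex.
func (p *partialError) addWithLock(err error) {
	if err == nil {
		return
	}
	p.Lock()
	p.errs = append(p.errs, err)
	p.Unlock()
}

func main() {
	var (
		pe partialError
		wg sync.WaitGroup
	)
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			var err error
			if i%3 == 0 {
				err = errors.New("post failed")
			}
			pe.addWithLock(err) // nil errors return before locking
		}(i)
	}
	wg.Wait()
	fmt.Println("errors:", len(pe.errs)) // errors: 3 (i = 0, 3, 6)
}
```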