[dbnode] Concurrent time series indexing within a single batch #2146
Merged
Changes from all commits (11 commits):
dbaeab1  Concurrent indexing. (notbdu)
9662f7a  Shard unique fields and add custom 2d sort. (notbdu)
c93ab02  Shard fields map and remove locks for concurrency. (notbdu)
81692ed  Add header comment. (notbdu)
8be4138  Address PR comments. (notbdu)
97d56ff  Remove wg on builder. (notbdu)
f8a84d8  Need to defer inside of a function or the Close gets evaluated immedi… (notbdu)
77ffa29  Address PR comments. (notbdu)
6eab81a  Merge branch 'master' into bdu/concurrent-index (notbdu)
bbb3a84  Update src/m3ninx/index/segment/builder/builder.go (notbdu)
ac934c2  Address PR comments. (notbdu)
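The commit list above sketches the approach: replace the builder's single fields map with per-shard maps and per-shard work queues, and pick a shard by hashing the field name so that each worker owns its slice of state and no locks are needed around the terms maps. A minimal, self-contained illustration of that fan-out pattern follows; it is not the PR's code (it uses the standard library's hash/fnv where the builder uses xxhash, and toy job/queue types), just the shape of the idea behind the diff below.

package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

// job mirrors the shape of the PR's indexJob: an id, a field to index,
// and a WaitGroup used to signal completion back to the producer.
type job struct {
	wg    *sync.WaitGroup
	id    int
	field string
}

// shardFor hashes a field name onto one of numShards queues so that the
// same field always lands on the same worker.
func shardFor(field string, numShards int) int {
	h := fnv.New64a()
	h.Write([]byte(field))
	return int(h.Sum64() % uint64(numShards))
}

func main() {
	const concurrency = 4
	queues := make([]chan job, 0, concurrency)
	for i := 0; i < concurrency; i++ {
		q := make(chan job, 1024) // buffered so the producer rarely blocks
		queues = append(queues, q)
		go func(shard int, q chan job) {
			for j := range q {
				// All jobs for a given field name land on this shard,
				// so no per-field locking is needed here.
				fmt.Printf("shard %d indexed field %q for id %d\n", shard, j.field, j.id)
				j.wg.Done()
			}
		}(i, q)
	}

	var wg sync.WaitGroup
	for id, field := range []string{"city", "host", "city", "service"} {
		wg.Add(1)
		queues[shardFor(field, concurrency)] <- job{wg: &wg, id: id, field: field}
	}
	wg.Wait() // block until every queued job has been indexed

	for _, q := range queues {
		close(q)
	}
}

Because a given field name always hashes to the same queue, only one worker ever touches that field's terms map, which is what lets the builder drop its internal locks.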
src/m3ninx/index/segment/builder/builder.go
@@ -23,18 +23,43 @@ package builder
 import (
     "errors"
     "fmt"
+    "sync"

     "github.com/m3db/m3/src/m3ninx/doc"
     "github.com/m3db/m3/src/m3ninx/index"
     "github.com/m3db/m3/src/m3ninx/index/segment"
     "github.com/m3db/m3/src/m3ninx/postings"
     "github.com/m3db/m3/src/m3ninx/util"
+
+    "github.com/cespare/xxhash"
 )

 var (
     errDocNotFound = errors.New("doc not found")
+    errClosed      = errors.New("builder closed")
 )
+
+const (
+    // Slightly buffer the work to avoid blocking main thread.
+    indexQueueSize = 2 << 9 // 1024
+)
+
+type indexJob struct {
+    wg *sync.WaitGroup
+
+    id    postings.ID
+    field doc.Field
+
+    shard    int
+    idx      int
+    batchErr *index.BatchPartialError
+}
+
+type builderStatus struct {
+    sync.RWMutex
+    closed bool
+}

 type builder struct {
     opts      Options
     newUUIDFn util.NewUUIDFn
@@ -44,29 +69,47 @@ type builder struct {
     batchSizeOne index.Batch
     docs         []doc.Document
     idSet        *IDsMap
-    fields       *fieldsMap
-    uniqueFields [][]byte
+    fields       *shardedFieldsMap
+    uniqueFields [][][]byte
+
+    indexQueues []chan indexJob
+    status      builderStatus
 }

 // NewBuilderFromDocuments returns a builder from documents, it is
 // not thread safe and is optimized for insertion speed and a
 // final build step when documents are indexed.
-func NewBuilderFromDocuments(opts Options) (segment.DocumentsBuilder, error) {
-    return &builder{
+func NewBuilderFromDocuments(opts Options) (segment.CloseableDocumentsBuilder, error) {
+    concurrency := opts.Concurrency()
+    b := &builder{
         opts:      opts,
         newUUIDFn: opts.NewUUIDFn(),
         batchSizeOne: index.Batch{
-            Docs:                make([]doc.Document, 1),
-            AllowPartialUpdates: false,
+            Docs: make([]doc.Document, 1),
         },
         idSet: NewIDsMap(IDsMapOptions{
             InitialSize: opts.InitialCapacity(),
         }),
-        fields: newFieldsMap(fieldsMapOptions{
-            InitialSize: opts.InitialCapacity(),
-        }),
-        uniqueFields: make([][]byte, 0, opts.InitialCapacity()),
-    }, nil
+        uniqueFields: make([][][]byte, 0, concurrency),
+        indexQueues:  make([]chan indexJob, 0, concurrency),
+    }
+
+    for i := 0; i < concurrency; i++ {
+        indexQueue := make(chan indexJob, indexQueueSize)
+        b.indexQueues = append(b.indexQueues, indexQueue)
+        go b.indexWorker(indexQueue)
+
+        // Give each shard a fraction of the configured initial capacity.
+        shardInitialCapacity := opts.InitialCapacity()
+        if shardInitialCapacity > 0 {
+            shardInitialCapacity /= concurrency
+        }
+        shardUniqueFields := make([][]byte, 0, shardInitialCapacity)
+        b.uniqueFields = append(b.uniqueFields, shardUniqueFields)
+        b.fields = newShardedFieldsMap(concurrency, shardInitialCapacity)
+    }
+
+    return b, nil
 }

 func (b *builder) Reset(offset postings.ID) {
@@ -83,15 +126,15 @@ func (b *builder) Reset(offset postings.ID) {
     b.idSet.Reset()

     // Keep fields around, just reset the terms set for each one.
-    for _, entry := range b.fields.Iter() {
-        entry.Value().reset()
-    }
+    b.fields.ResetTermsSets()

     // Reset the unique fields slice
-    for i := range b.uniqueFields {
-        b.uniqueFields[i] = nil
+    for i, shardUniqueFields := range b.uniqueFields {
+        for i := range shardUniqueFields {
+            shardUniqueFields[i] = nil
+        }
+        b.uniqueFields[i] = shardUniqueFields[:0]
     }
-    b.uniqueFields = b.uniqueFields[:0]
 }

 func (b *builder) Insert(d doc.Document) ([]byte, error) {
@@ -107,15 +150,20 @@ func (b *builder) Insert(d doc.Document) ([]byte, error) {
 }

 func (b *builder) InsertBatch(batch index.Batch) error {
+    b.status.RLock()
+    defer b.status.RUnlock()
+
+    if b.status.closed {
+        return errClosed
+    }
+
     // NB(r): This is all kept in a single method to make the
     // insertion path fast.
+    var wg sync.WaitGroup
     batchErr := index.NewBatchPartialError()
     for i, d := range batch.Docs {
         // Validate doc
         if err := d.Validate(); err != nil {
-            if !batch.AllowPartialUpdates {
-                return err
-            }
             batchErr.Add(index.BatchError{Err: err, Idx: i})
             continue
         }
@@ -124,9 +172,6 @@
         if !d.HasID() {
             id, err := b.newUUIDFn()
             if err != nil {
-                if !batch.AllowPartialUpdates {
-                    return err
-                }
                 batchErr.Add(index.BatchError{Err: err, Idx: i})
                 continue
             }
@@ -139,9 +184,6 @@

         // Avoid duplicates.
         if _, ok := b.idSet.Get(d.ID); ok {
-            if !batch.AllowPartialUpdates {
-                return index.ErrDuplicateID
-            }
             batchErr.Add(index.BatchError{Err: index.ErrDuplicateID, Idx: i})
             continue
         }
@@ -158,50 +200,73 @@

         // Index the terms.
         for _, f := range d.Fields {
-            if err := b.index(postings.ID(postingsListID), f); err != nil {
-                if !batch.AllowPartialUpdates {
-                    return err
-                }
-                batchErr.Add(index.BatchError{Err: err, Idx: i})
-            }
+            b.index(&wg, postings.ID(postingsListID), f, i, batchErr)
         }
-        if err := b.index(postings.ID(postingsListID), doc.Field{
+        b.index(&wg, postings.ID(postingsListID), doc.Field{
             Name:  doc.IDReservedFieldName,
             Value: d.ID,
-        }); err != nil {
-            if !batch.AllowPartialUpdates {
-                return err
-            }
-            batchErr.Add(index.BatchError{Err: err, Idx: i})
-        }
+        }, i, batchErr)
     }

+    // Wait for all the concurrent indexing jobs to finish.
+    wg.Wait()
+
     if !batchErr.IsEmpty() {
         return batchErr
     }
     return nil
 }

-func (b *builder) index(id postings.ID, f doc.Field) error {
-    terms, ok := b.fields.Get(f.Name)
-    if !ok {
-        terms = newTerms(b.opts)
-        b.fields.SetUnsafe(f.Name, terms, fieldsMapSetUnsafeOptions{
-            NoCopyKey:     true,
-            NoFinalizeKey: true,
-        })
+func (b *builder) index(
+    wg *sync.WaitGroup,
+    id postings.ID,
+    f doc.Field,
+    i int,
+    batchErr *index.BatchPartialError,
+) {
+    wg.Add(1)
+    // NB(bodu): To avoid locking inside of the terms, we shard the work
+    // by field name.
+    shard := b.calculateShard(f.Name)
+    b.indexQueues[shard] <- indexJob{
+        wg:       wg,
+        id:       id,
+        field:    f,
+        shard:    shard,
+        idx:      i,
+        batchErr: batchErr,
+    }
+}

-    // If empty field, track insertion of this key into the fields
-    // collection for correct response when retrieving all fields.
-    newField := terms.size() == 0
-    if err := terms.post(f.Value, id); err != nil {
-        return err
-    }
-    if newField {
-        b.uniqueFields = append(b.uniqueFields, f.Name)
+func (b *builder) indexWorker(indexQueue chan indexJob) {
+    for job := range indexQueue {
+        terms, ok := b.fields.ShardedGet(job.shard, job.field.Name)
+        if !ok {
+            // NB(bodu): Check again within the lock to make sure we aren't making concurrent map writes.
+            terms = newTerms(b.opts)
+            b.fields.ShardedSetUnsafe(job.shard, job.field.Name, terms, fieldsMapSetUnsafeOptions{
+                NoCopyKey:     true,
+                NoFinalizeKey: true,
+            })
+        }
+
+        // If empty field, track insertion of this key into the fields
+        // collection for correct response when retrieving all fields.
+        newField := terms.size() == 0
+        // NB(bodu): Bulk of the cpu time during insertion is spent inside of terms.post().
+        err := terms.post(job.field.Value, job.id)
+        if err != nil {
+            job.batchErr.AddWithLock(index.BatchError{Err: err, Idx: job.idx})
+        }
+        if err == nil && newField {
+            b.uniqueFields[job.shard] = append(b.uniqueFields[job.shard], job.field.Name)
(inline review comment) Should this happen if the post failed?
(reply) Ah, good call, moving around the logic here.
+        }
+        job.wg.Done()
+    }
-    return nil
 }

+func (b *builder) calculateShard(field []byte) int {
+    return int(xxhash.Sum64(field) % uint64(len(b.indexQueues)))
+}

 func (b *builder) AllDocs() (index.IDDocIterator, error) {
@@ -236,7 +301,7 @@ func (b *builder) Fields() (segment.FieldsIterator, error) {
 }

 func (b *builder) Terms(field []byte) (segment.TermsIterator, error) {
-    terms, ok := b.fields.Get(field)
+    terms, ok := b.fields.ShardedGet(b.calculateShard(field), field)
     if !ok {
         return nil, fmt.Errorf("field not found: %s", string(field))
     }
@@ -247,3 +312,13 @@

     return newTermsIter(terms.uniqueTerms), nil
 }
+
+func (b *builder) Close() error {
+    b.status.Lock()
+    defer b.status.Unlock()
+    for _, q := range b.indexQueues {
+        close(q)
+    }
+    b.status.closed = true
+    return nil
+}
(inline review comment) Good catch.
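To round out the picture, here is a rough usage sketch of the builder surface after this change. NewBuilderFromDocuments, InsertBatch, Close, index.Batch, doc.Document and doc.Field all appear in the diff above; the NewOptions constructor is an assumption made for illustration and may not match the builder package's actual options API.

package main

import (
	"log"

	"github.com/m3db/m3/src/m3ninx/doc"
	"github.com/m3db/m3/src/m3ninx/index"
	"github.com/m3db/m3/src/m3ninx/index/segment/builder"
)

func main() {
	// Assumption: an options constructor along these lines exists in the
	// builder package; consult its options file for the real name and
	// setters (e.g. concurrency and initial capacity).
	opts := builder.NewOptions()

	// NewBuilderFromDocuments now returns a CloseableDocumentsBuilder,
	// so callers are expected to Close it to stop the index workers.
	b, err := builder.NewBuilderFromDocuments(opts)
	if err != nil {
		log.Fatal(err)
	}
	defer b.Close()

	batch := index.Batch{
		AllowPartialUpdates: true,
		Docs: []doc.Document{
			{
				ID: []byte("series-1"),
				Fields: []doc.Field{
					{Name: []byte("city"), Value: []byte("new_york")},
				},
			},
		},
	}

	// InsertBatch fans each field out to the sharded index queues and
	// waits for every queued job before returning any partial errors.
	if err := b.InsertBatch(batch); err != nil {
		log.Fatal(err)
	}
}

The notable behavioral change for callers is the Close call: the builder now owns worker goroutines and channels, so it must be closed to shut the per-shard index queues down.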