[dbnode] Concurrent time series indexing within a single batch #2146

Merged · 11 commits · Mar 5, 2020
1 change: 1 addition & 0 deletions src/dbnode/storage/index.go
@@ -751,6 +751,7 @@ func (i *nsIndex) Flush(
if err != nil {
return err
}
defer builder.Close()
Review comment (Collaborator): Good catch.


var evicted int
for _, block := range flushable {
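Because the builder now owns per-shard index queues and worker goroutines (see the builder.go changes further down), the call site in Flush has to close it on every return path. A minimal, self-contained sketch of that defer pattern, using invented placeholder types rather than the real dbnode ones:

package main

import (
	"errors"
	"fmt"
)

// closeableBuilder stands in for the segment builder acquired in Flush; the
// real builder additionally owns per-shard queues and worker goroutines.
type closeableBuilder struct{}

func (b *closeableBuilder) Close() error { return nil }

func newCloseableBuilder(fail bool) (*closeableBuilder, error) {
	if fail {
		return nil, errors.New("could not allocate builder")
	}
	return &closeableBuilder{}, nil
}

// flush mirrors the shape of the hunk above: once the builder is successfully
// created, defer guarantees Close runs on every subsequent return path.
func flush(fail bool) error {
	builder, err := newCloseableBuilder(fail)
	if err != nil {
		return err
	}
	defer builder.Close()

	// ... flush each block using the builder ...
	return nil
}

func main() {
	fmt.Println(flush(false), flush(true))
}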
26 changes: 16 additions & 10 deletions src/dbnode/storage/index/block.go
@@ -706,18 +706,24 @@ func (b *block) cleanupForegroundCompactWithLock() {
b.foregroundSegments = nil

// Free compactor resources.
if b.compact.foregroundCompactor == nil {
return
if b.compact.foregroundCompactor != nil {
if err := b.compact.foregroundCompactor.Close(); err != nil {
instrument.EmitAndLogInvariantViolation(b.iopts, func(l *zap.Logger) {
l.Error("error closing index block foreground compactor", zap.Error(err))
})
}
b.compact.foregroundCompactor = nil
}

if err := b.compact.foregroundCompactor.Close(); err != nil {
instrument.EmitAndLogInvariantViolation(b.iopts, func(l *zap.Logger) {
l.Error("error closing index block foreground compactor", zap.Error(err))
})
// Free segment builder resources.
if b.compact.segmentBuilder != nil {
if err := b.compact.segmentBuilder.Close(); err != nil {
instrument.EmitAndLogInvariantViolation(b.iopts, func(l *zap.Logger) {
l.Error("error closing index block segment builder", zap.Error(err))
})
}
b.compact.segmentBuilder = nil
}

b.compact.foregroundCompactor = nil
b.compact.segmentBuilder = nil
}

func (b *block) executorWithRLock() (search.Executor, error) {
@@ -1484,7 +1490,7 @@ func (b *block) writeBatchErrorInvalidState(state blockState) error {

// blockCompact has several lazily allocated compaction components.
type blockCompact struct {
segmentBuilder segment.DocumentsBuilder
segmentBuilder segment.CloseableDocumentsBuilder
foregroundCompactor *compaction.Compactor
backgroundCompactor *compaction.Compactor
compactingForeground bool
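blockCompact now stores a segment.CloseableDocumentsBuilder instead of a segment.DocumentsBuilder. The interface itself is not shown in this diff; presumably it is the documents builder plus a Close() error method, which is what lets cleanupForegroundCompactWithLock above tear it down. A hedged sketch of that nil-guard teardown, with placeholder types standing in for the real m3ninx ones:

package main

import (
	"errors"
	"fmt"
)

// closeable stands in for segment.CloseableDocumentsBuilder and
// compaction.Compactor: lazily allocated components that must be closed.
type closeable interface {
	Close() error
}

type fakeComponent struct{ closed bool }

func (f *fakeComponent) Close() error {
	if f.closed {
		return errors.New("already closed")
	}
	f.closed = true
	return nil
}

type compactState struct {
	foregroundCompactor closeable
	segmentBuilder      closeable
}

// cleanup mirrors cleanupForegroundCompactWithLock: each component is closed
// only if it was allocated, then the reference is dropped so the next
// compaction reallocates it.
func (c *compactState) cleanup() {
	if c.foregroundCompactor != nil {
		if err := c.foregroundCompactor.Close(); err != nil {
			fmt.Println("error closing foreground compactor:", err)
		}
		c.foregroundCompactor = nil
	}
	if c.segmentBuilder != nil {
		if err := c.segmentBuilder.Close(); err != nil {
			fmt.Println("error closing segment builder:", err)
		}
		c.segmentBuilder = nil
	}
}

func main() {
	c := &compactState{
		foregroundCompactor: &fakeComponent{},
		segmentBuilder:      &fakeComponent{},
	}
	c.cleanup()
	c.cleanup() // safe to call again: both references are already nil
	fmt.Println("cleaned up")
}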
13 changes: 13 additions & 0 deletions src/m3ninx/index/batch.go
@@ -24,6 +24,7 @@ import (
"bytes"
"errors"
"fmt"
"sync"

"github.com/m3db/m3/src/m3ninx/doc"
)
@@ -79,6 +80,8 @@ func NewBatch(docs []doc.Document, opts ...BatchOption) Batch {
// BatchPartialError indicates an error was encountered inserting some documents in a batch.
// It is not safe for concurrent use.
type BatchPartialError struct {
sync.Mutex

errs []BatchError
}

@@ -138,6 +141,16 @@ func (e *BatchPartialError) Add(err BatchError) {
e.errs = append(e.errs, err)
}

// AddWithLock adds an error to e with a lock. Any nil errors are ignored.
func (e *BatchPartialError) AddWithLock(err BatchError) {
if err.Err == nil {
return
}
e.Lock()
e.errs = append(e.errs, err)
e.Unlock()
}

// Errs returns the errors with the indexes of the documents in the batch
// which were not indexed.
func (e *BatchPartialError) Errs() []BatchError {
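With the sync.Mutex now embedded in BatchPartialError and the new AddWithLock helper, concurrent index workers can collect per-document failures into a single shared error. A small usage sketch, assuming the m3ninx module is on the module path; NewBatchPartialError, BatchError, AddWithLock, IsEmpty, and Errs are the calls visible in this diff, while the goroutine loop and failure condition are fabricated for illustration:

package main

import (
	"errors"
	"fmt"
	"sync"

	"github.com/m3db/m3/src/m3ninx/index"
)

func main() {
	batchErr := index.NewBatchPartialError()

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(idx int) {
			defer wg.Done()
			// Nil errors are ignored, so workers can call this unconditionally.
			var err error
			if idx%2 == 0 {
				err = errors.New("failed to index document")
			}
			batchErr.AddWithLock(index.BatchError{Err: err, Idx: idx})
		}(i)
	}
	wg.Wait()

	if !batchErr.IsEmpty() {
		for _, e := range batchErr.Errs() {
			fmt.Printf("doc %d: %v\n", e.Idx, e.Err)
		}
	}
}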
188 changes: 131 additions & 57 deletions src/m3ninx/index/segment/builder/builder.go
@@ -23,18 +23,43 @@ package builder
import (
"errors"
"fmt"
"sync"

"github.com/m3db/m3/src/m3ninx/doc"
"github.com/m3db/m3/src/m3ninx/index"
"github.com/m3db/m3/src/m3ninx/index/segment"
"github.com/m3db/m3/src/m3ninx/postings"
"github.com/m3db/m3/src/m3ninx/util"

"github.com/cespare/xxhash"
)

var (
errDocNotFound = errors.New("doc not found")
errClosed = errors.New("builder closed")
)

const (
// Slightly buffer the work to avoid blocking main thread.
indexQueueSize = 2 << 9 // 1024
)

type indexJob struct {
wg *sync.WaitGroup

id postings.ID
field doc.Field

shard int
idx int
batchErr *index.BatchPartialError
}

type builderStatus struct {
sync.RWMutex
closed bool
}

type builder struct {
opts Options
newUUIDFn util.NewUUIDFn
@@ -44,29 +69,47 @@ type builder struct {
batchSizeOne index.Batch
docs []doc.Document
idSet *IDsMap
fields *fieldsMap
uniqueFields [][]byte
fields *shardedFieldsMap
uniqueFields [][][]byte

indexQueues []chan indexJob
status builderStatus
}

// NewBuilderFromDocuments returns a builder from documents, it is
// not thread safe and is optimized for insertion speed and a
// final build step when documents are indexed.
func NewBuilderFromDocuments(opts Options) (segment.DocumentsBuilder, error) {
return &builder{
func NewBuilderFromDocuments(opts Options) (segment.CloseableDocumentsBuilder, error) {
concurrency := opts.Concurrency()
b := &builder{
opts: opts,
newUUIDFn: opts.NewUUIDFn(),
batchSizeOne: index.Batch{
Docs: make([]doc.Document, 1),
AllowPartialUpdates: false,
Docs: make([]doc.Document, 1),
},
idSet: NewIDsMap(IDsMapOptions{
InitialSize: opts.InitialCapacity(),
}),
fields: newFieldsMap(fieldsMapOptions{
InitialSize: opts.InitialCapacity(),
}),
uniqueFields: make([][]byte, 0, opts.InitialCapacity()),
}, nil
uniqueFields: make([][][]byte, 0, concurrency),
indexQueues: make([]chan indexJob, 0, concurrency),
}

for i := 0; i < concurrency; i++ {
indexQueue := make(chan indexJob, indexQueueSize)
b.indexQueues = append(b.indexQueues, indexQueue)
go b.indexWorker(indexQueue)

// Give each shard a fraction of the configured initial capacity.
shardInitialCapcity := opts.InitialCapacity()
Review comment (Collaborator): shardInitialCapcity should be shardInitialCapacity?

if shardInitialCapcity > 0 {
shardInitialCapcity /= concurrency
}
shardUniqueFields := make([][]byte, 0, shardInitialCapcity)
Review comment (Collaborator): This may be more performant if you allocate the entire [][]byte as a full block, then index into it; i.e.

shardUniqueFields := make([][]byte, 0, concurrency * shardInitialCapcity)
for i := 0; i < concurrency; i++ {
 //...
 b.uniqueFields = append(b.uniqueFields, shardUniqueFields[i*concurrency:(i+1)*concurrency])
 // ...
}

Reply (@notbdu, Contributor Author, Feb 24, 2020): Wouldn't be able to grow a shard in this case. Or it would be difficult to do so.

b.uniqueFields = append(b.uniqueFields, shardUniqueFields)
b.fields = newShardedFieldsMap(concurrency, shardInitialCapcity)
Review comment (Collaborator): May be possible to bulk-allocate these similarly to b.uniqueFields?

Reply (@notbdu, Contributor Author): Same comment as before. Growing a shard would be very painful.

}

return b, nil
}

func (b *builder) Reset(offset postings.ID) {
@@ -83,15 +126,15 @@ func (b *builder) Reset(offset postings.ID) {
b.idSet.Reset()

// Keep fields around, just reset the terms set for each one.
for _, entry := range b.fields.Iter() {
entry.Value().reset()
}
b.fields.ResetTermsSets()

// Reset the unique fields slice
for i := range b.uniqueFields {
b.uniqueFields[i] = nil
for i, shardUniqueFields := range b.uniqueFields {
for i := range shardUniqueFields {
shardUniqueFields[i] = nil
}
b.uniqueFields[i] = shardUniqueFields[:0]
}
b.uniqueFields = b.uniqueFields[:0]
}

func (b *builder) Insert(d doc.Document) ([]byte, error) {
@@ -107,15 +150,20 @@ func (b *builder) Insert(d doc.Document) ([]byte, error) {
}

func (b *builder) InsertBatch(batch index.Batch) error {
b.status.RLock()
defer b.status.RUnlock()

if b.status.closed {
return errClosed
}

// NB(r): This is all kept in a single method to make the
// insertion path fast.
var wg sync.WaitGroup
batchErr := index.NewBatchPartialError()
for i, d := range batch.Docs {
// Validate doc
if err := d.Validate(); err != nil {
if !batch.AllowPartialUpdates {
return err
}
batchErr.Add(index.BatchError{Err: err, Idx: i})
continue
}
@@ -124,9 +172,6 @@ func (b *builder) InsertBatch(batch index.Batch) error {
if !d.HasID() {
id, err := b.newUUIDFn()
if err != nil {
if !batch.AllowPartialUpdates {
return err
}
batchErr.Add(index.BatchError{Err: err, Idx: i})
continue
}
@@ -139,9 +184,6 @@ func (b *builder) InsertBatch(batch index.Batch) error {

// Avoid duplicates.
if _, ok := b.idSet.Get(d.ID); ok {
if !batch.AllowPartialUpdates {
return index.ErrDuplicateID
}
batchErr.Add(index.BatchError{Err: index.ErrDuplicateID, Idx: i})
continue
}
@@ -158,50 +200,70 @@ func (b *builder) InsertBatch(batch index.Batch) error {

// Index the terms.
for _, f := range d.Fields {
if err := b.index(postings.ID(postingsListID), f); err != nil {
if !batch.AllowPartialUpdates {
return err
}
batchErr.Add(index.BatchError{Err: err, Idx: i})
}
b.index(&wg, postings.ID(postingsListID), f, i, batchErr)
}
if err := b.index(postings.ID(postingsListID), doc.Field{
b.index(&wg, postings.ID(postingsListID), doc.Field{
Name: doc.IDReservedFieldName,
Value: d.ID,
}); err != nil {
if !batch.AllowPartialUpdates {
return err
}
batchErr.Add(index.BatchError{Err: err, Idx: i})
}
}, i, batchErr)
}

// Wait for all the concurrent indexing jobs to finish.
wg.Wait()

if !batchErr.IsEmpty() {
return batchErr
}
return nil
}

func (b *builder) index(id postings.ID, f doc.Field) error {
terms, ok := b.fields.Get(f.Name)
if !ok {
terms = newTerms(b.opts)
b.fields.SetUnsafe(f.Name, terms, fieldsMapSetUnsafeOptions{
NoCopyKey: true,
NoFinalizeKey: true,
})
func (b *builder) index(
wg *sync.WaitGroup,
id postings.ID,
f doc.Field,
i int,
batchErr *index.BatchPartialError,
) {
wg.Add(1)
// NB(bodu): To avoid locking inside of the terms, we shard the work
// by field name.
shard := int(xxhash.Sum64(f.Name) % uint64(len(b.indexQueues)))
b.indexQueues[shard] <- indexJob{
wg: wg,
id: id,
field: f,
shard: shard,
idx: i,
batchErr: batchErr,
}
}

// If empty field, track insertion of this key into the fields
// collection for correct response when retrieving all fields.
newField := terms.size() == 0
if err := terms.post(f.Value, id); err != nil {
return err
}
if newField {
b.uniqueFields = append(b.uniqueFields, f.Name)
func (b *builder) indexWorker(indexQueue chan indexJob) {
for job := range indexQueue {
terms, ok := b.fields.ShardedGet(job.shard, job.field.Name)
if !ok {
// NB(bodu): Check again within the lock to make sure we aren't making concurrent map writes.
terms = newTerms(b.opts)
b.fields.ShardedSetUnsafe(job.shard, job.field.Name, terms, fieldsMapSetUnsafeOptions{
NoCopyKey: true,
NoFinalizeKey: true,
})
}

// If empty field, track insertion of this key into the fields
// collection for correct response when retrieving all fields.
newField := terms.size() == 0
// NB(bodu): Bulk of the cpu time during insertion is spent inside of terms.post().
err := terms.post(job.field.Value, job.id)
if err == nil {
if newField {
b.uniqueFields[job.shard] = append(b.uniqueFields[job.shard], job.field.Name)
}
} else {
job.batchErr.AddWithLock(index.BatchError{Err: err, Idx: job.idx})
}
job.wg.Done()
}
return nil
}

func (b *builder) AllDocs() (index.IDDocIterator, error) {
@@ -236,7 +298,9 @@ func (b *builder) Fields() (segment.FieldsIterator, error) {
}

func (b *builder) Terms(field []byte) (segment.TermsIterator, error) {
terms, ok := b.fields.Get(field)
// NB(bodu): The # of indexQueues and field map shards are equal.
shard := int(xxhash.Sum64(field) % uint64(len(b.indexQueues)))
Review comment (@robskillington, Collaborator, Mar 4, 2020): Can we make this a method on the builder to reuse this code? I see it replicated on this line and line 230, i.e.

func (b *builder) shardForField(field []byte) {
  return int(xxhash.Sum64(field) % uint64(len(b.indexQueues)))
}

terms, ok := b.fields.ShardedGet(shard, field)
if !ok {
return nil, fmt.Errorf("field not found: %s", string(field))
}
@@ -247,3 +311,13 @@ func (b *builder) Terms(field []byte) (segment.TermsIterator, error) {

return newTermsIter(terms.uniqueTerms), nil
}

func (b *builder) Close() error {
b.status.Lock()
defer b.status.Unlock()
for _, q := range b.indexQueues {
close(q)
}
b.status.closed = true
return nil
}
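The central technique in this file — hash a field name to a shard, push an indexJob onto that shard's buffered channel, and let exactly one worker goroutine own each shard's map so the terms structures never need per-entry locks — can be shown in isolation. The sketch below is a simplified stand-in, not the m3ninx builder: shardedIndexer and its per-shard map of field name to values are invented for illustration, and only the queueing and single-owner-per-shard structure mirrors the code above.

package main

import (
	"fmt"
	"sync"

	"github.com/cespare/xxhash"
)

type indexJob struct {
	wg    *sync.WaitGroup
	name  []byte
	value []byte
}

type shardedIndexer struct {
	queues []chan indexJob
	shards []map[string][]string // one map per shard, touched only by that shard's worker
}

func newShardedIndexer(concurrency int) *shardedIndexer {
	s := &shardedIndexer{
		queues: make([]chan indexJob, 0, concurrency),
		shards: make([]map[string][]string, concurrency),
	}
	for i := 0; i < concurrency; i++ {
		q := make(chan indexJob, 1024) // buffered, analogous to indexQueueSize above
		s.queues = append(s.queues, q)
		s.shards[i] = make(map[string][]string)
		go s.worker(i, q)
	}
	return s
}

// shardFor plays the role of the shardForField helper suggested in review:
// the same hash picks both the queue a write goes to and the shard a read consults.
func (s *shardedIndexer) shardFor(name []byte) int {
	return int(xxhash.Sum64(name) % uint64(len(s.queues)))
}

func (s *shardedIndexer) index(wg *sync.WaitGroup, name, value []byte) {
	wg.Add(1)
	s.queues[s.shardFor(name)] <- indexJob{wg: wg, name: name, value: value}
}

func (s *shardedIndexer) worker(shard int, q chan indexJob) {
	for job := range q {
		// Only this goroutine writes to shards[shard], so no lock is required.
		key := string(job.name)
		s.shards[shard][key] = append(s.shards[shard][key], string(job.value))
		job.wg.Done()
	}
}

func (s *shardedIndexer) terms(name []byte) []string {
	return s.shards[s.shardFor(name)][string(name)]
}

func (s *shardedIndexer) close() {
	for _, q := range s.queues {
		close(q)
	}
}

func main() {
	idx := newShardedIndexer(4)

	var wg sync.WaitGroup
	idx.index(&wg, []byte("city"), []byte("nyc"))
	idx.index(&wg, []byte("city"), []byte("sf"))
	idx.index(&wg, []byte("host"), []byte("a1"))
	wg.Wait() // all queued jobs are processed before reading

	fmt.Println(idx.terms([]byte("city"))) // [nyc sf]
	idx.close()
}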
6 changes: 6 additions & 0 deletions src/m3ninx/index/segment/builder/builder_test.go
@@ -79,6 +79,9 @@ var (
func TestBuilderFields(t *testing.T) {
builder, err := NewBuilderFromDocuments(testOptions)
require.NoError(t, err)
defer func() {
require.NoError(t, builder.Close())
}()

for i := 0; i < 10; i++ {
builder.Reset(0)
@@ -106,6 +109,9 @@ func TestBuilderFields(t *testing.T) {
func TestBuilderTerms(t *testing.T) {
builder, err := NewBuilderFromDocuments(testOptions)
require.NoError(t, err)
defer func() {
require.NoError(t, builder.Close())
}()

for i := 0; i < 10; i++ {
builder.Reset(0)