Skip to content

Commit

Permalink
Concurrent indexing.
Browse files Browse the repository at this point in the history
  • Loading branch information
notbdu committed Feb 13, 2020
1 parent 8ffa7be commit 619d427
Show file tree
Hide file tree
Showing 7 changed files with 271 additions and 56 deletions.
1 change: 1 addition & 0 deletions src/dbnode/storage/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -748,6 +748,7 @@ func (i *nsIndex) Flush(

builderOpts := i.opts.IndexOptions().SegmentBuilderOptions()
builder, err := builder.NewBuilderFromDocuments(builderOpts)
if err != nil {
	return err
}
// Register Close only after construction has succeeded: deferring it before
// the error check would invoke Close on a nil builder on the failure path.
defer builder.Close()
Expand Down
22 changes: 12 additions & 10 deletions src/dbnode/storage/index/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -704,18 +704,20 @@ func (b *block) cleanupForegroundCompactWithLock() {
b.foregroundSegments = nil

// Free compactor resources.
if b.compact.foregroundCompactor == nil {
return
if b.compact.foregroundCompactor != nil {
if err := b.compact.foregroundCompactor.Close(); err != nil {
instrument.EmitAndLogInvariantViolation(b.iopts, func(l *zap.Logger) {
l.Error("error closing index block foreground compactor", zap.Error(err))
})
}
b.compact.foregroundCompactor = nil
}

if err := b.compact.foregroundCompactor.Close(); err != nil {
instrument.EmitAndLogInvariantViolation(b.iopts, func(l *zap.Logger) {
l.Error("error closing index block foreground compactor", zap.Error(err))
})
// Free segment builder resources.
if b.compact.segmentBuilder != nil {
b.compact.segmentBuilder.Close()
b.compact.segmentBuilder = nil
}

b.compact.foregroundCompactor = nil
b.compact.segmentBuilder = nil
}

func (b *block) executorWithRLock() (search.Executor, error) {
Expand Down Expand Up @@ -1482,7 +1484,7 @@ func (b *block) writeBatchErrorInvalidState(state blockState) error {

// blockCompact has several lazily allocated compaction components.
type blockCompact struct {
segmentBuilder segment.DocumentsBuilder
segmentBuilder segment.CloseableDocumentsBuilder
foregroundCompactor *compaction.Compactor
backgroundCompactor *compaction.Compactor
compactingForeground bool
Expand Down
13 changes: 13 additions & 0 deletions src/m3ninx/index/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"bytes"
"errors"
"fmt"
"sync"

"github.com/m3db/m3/src/m3ninx/doc"
)
Expand Down Expand Up @@ -79,6 +80,8 @@ func NewBatch(docs []doc.Document, opts ...BatchOption) Batch {
// BatchPartialError indicates an error was encountered inserting some documents in a batch.
// It is not safe for concurrent use, with one exception: AddWithLock acquires
// the embedded mutex around its append, so concurrent AddWithLock calls are
// serialized against each other.
// NOTE(review): the other methods do not appear to take the lock, so they
// should not run concurrently with AddWithLock — confirm against the full file.
type BatchPartialError struct {
// Mutex is embedded so that AddWithLock can call e.Lock / e.Unlock directly.
sync.Mutex

// errs accumulates the per-document errors recorded for the batch.
errs []BatchError
}

Expand Down Expand Up @@ -138,6 +141,16 @@ func (e *BatchPartialError) Add(err BatchError) {
e.errs = append(e.errs, err)
}

// AddWithLock records err on e while holding e's mutex, which lets several
// goroutines report failures for the same batch. An entry whose Err is nil
// is dropped without being recorded.
func (e *BatchPartialError) AddWithLock(err BatchError) {
	e.Lock()
	if err.Err != nil {
		e.errs = append(e.errs, err)
	}
	e.Unlock()
}

// Errs returns the errors with the indexes of the documents in the batch
// which were not indexed.
func (e *BatchPartialError) Errs() []BatchError {
Expand Down
141 changes: 97 additions & 44 deletions src/m3ninx/index/segment/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,39 @@ package builder
import (
"errors"
"fmt"
"runtime"
"sync"

"github.com/m3db/m3/src/m3ninx/doc"
"github.com/m3db/m3/src/m3ninx/index"
"github.com/m3db/m3/src/m3ninx/index/segment"
"github.com/m3db/m3/src/m3ninx/postings"
"github.com/m3db/m3/src/m3ninx/util"
"go.uber.org/atomic"

"github.com/cespare/xxhash"
)

var (
errDocNotFound = errors.New("doc not found")
)

const (
// indexQueueSize is the buffer capacity of each index job queue
// (2 << 7 == 256 jobs).
// Slightly buffer the work to avoid blocking main thread.
indexQueueSize = 2 << 7
)

// indexJob is one unit of work handed to an index worker: a single field of a
// single document that must be posted into the terms for that field.
type indexJob struct {
// id is the postings ID under which the field value is posted.
id postings.ID
// field is the document field (name and value) to index.
field doc.Field

// idx is the index reported in the BatchError if posting the field fails.
idx int
// batchErr collects failures for the batch; workers append to it through
// AddWithLock.
batchErr *index.BatchPartialError
}

type builder struct {
sync.Mutex

opts Options
newUUIDFn util.NewUUIDFn

Expand All @@ -46,18 +66,22 @@ type builder struct {
idSet *IDsMap
fields *fieldsMap
uniqueFields [][]byte

wg sync.WaitGroup
indexQueues []chan indexJob
closed *atomic.Bool
}

// NewBuilderFromDocuments returns a builder from documents. It is
// not thread safe and is optimized for insertion speed and a
// final build step when documents are indexed. It starts one indexing
// goroutine per CPU, so callers must Close the builder to stop them.
func NewBuilderFromDocuments(opts Options) (segment.DocumentsBuilder, error) {
return &builder{
func NewBuilderFromDocuments(opts Options) (segment.CloseableDocumentsBuilder, error) {
concurrency := runtime.NumCPU()
b := &builder{
opts: opts,
newUUIDFn: opts.NewUUIDFn(),
batchSizeOne: index.Batch{
Docs: make([]doc.Document, 1),
AllowPartialUpdates: false,
Docs: make([]doc.Document, 1),
},
idSet: NewIDsMap(IDsMapOptions{
InitialSize: opts.InitialCapacity(),
Expand All @@ -66,7 +90,17 @@ func NewBuilderFromDocuments(opts Options) (segment.DocumentsBuilder, error) {
InitialSize: opts.InitialCapacity(),
}),
uniqueFields: make([][]byte, 0, opts.InitialCapacity()),
}, nil
indexQueues: make([]chan indexJob, 0, concurrency),
closed: atomic.NewBool(false),
}

for i := 0; i < concurrency; i++ {
indexQueue := make(chan indexJob, indexQueueSize)
b.indexQueues = append(b.indexQueues, indexQueue)
go b.indexWorker(indexQueue)
}

return b, nil
}

func (b *builder) Reset(offset postings.ID) {
Expand Down Expand Up @@ -113,9 +147,6 @@ func (b *builder) InsertBatch(batch index.Batch) error {
for i, d := range batch.Docs {
// Validate doc
if err := d.Validate(); err != nil {
if !batch.AllowPartialUpdates {
return err
}
batchErr.Add(index.BatchError{Err: err, Idx: i})
continue
}
Expand All @@ -124,9 +155,6 @@ func (b *builder) InsertBatch(batch index.Batch) error {
if !d.HasID() {
id, err := b.newUUIDFn()
if err != nil {
if !batch.AllowPartialUpdates {
return err
}
batchErr.Add(index.BatchError{Err: err, Idx: i})
continue
}
Expand All @@ -139,9 +167,6 @@ func (b *builder) InsertBatch(batch index.Batch) error {

// Avoid duplicates.
if _, ok := b.idSet.Get(d.ID); ok {
if !batch.AllowPartialUpdates {
return index.ErrDuplicateID
}
batchErr.Add(index.BatchError{Err: index.ErrDuplicateID, Idx: i})
continue
}
Expand All @@ -158,50 +183,70 @@ func (b *builder) InsertBatch(batch index.Batch) error {

// Index the terms.
for _, f := range d.Fields {
if err := b.index(postings.ID(postingsListID), f); err != nil {
if !batch.AllowPartialUpdates {
return err
}
batchErr.Add(index.BatchError{Err: err, Idx: i})
}
b.index(postings.ID(postingsListID), f, i, batchErr)
}
if err := b.index(postings.ID(postingsListID), doc.Field{
b.index(postings.ID(postingsListID), doc.Field{
Name: doc.IDReservedFieldName,
Value: d.ID,
}); err != nil {
if !batch.AllowPartialUpdates {
return err
}
batchErr.Add(index.BatchError{Err: err, Idx: i})
}
}, i, batchErr)
}

// Wait for all the concurrent indexing jobs to finish.
b.wg.Wait()

if !batchErr.IsEmpty() {
return batchErr
}
return nil
}

func (b *builder) index(id postings.ID, f doc.Field) error {
terms, ok := b.fields.Get(f.Name)
if !ok {
terms = newTerms(b.opts)
b.fields.SetUnsafe(f.Name, terms, fieldsMapSetUnsafeOptions{
NoCopyKey: true,
NoFinalizeKey: true,
})
func (b *builder) index(
id postings.ID,
f doc.Field,
i int,
batchErr *index.BatchPartialError,
) {
// Do-nothing if we are already closed to avoid send on closed panics.
if !b.closed.Load() {
return
}

// If empty field, track insertion of this key into the fields
// collection for correct response when retrieving all fields.
newField := terms.size() == 0
if err := terms.post(f.Value, id); err != nil {
return err
b.wg.Add(1)
// NB(bodu): To avoid locking inside of the terms, we shard the work
// by field name.
shard := xxhash.Sum64(f.Name) % uint64(len(b.indexQueues))
b.indexQueues[shard] <- indexJob{
id: id,
field: f,
idx: i,
batchErr: batchErr,
}
if newField {
b.uniqueFields = append(b.uniqueFields, f.Name)
}

// indexWorker consumes jobs from indexQueue until the queue is closed. For
// each job it looks up (or lazily creates) the terms for the job's field,
// posts the field value under the job's postings ID, and marks the job done
// on b.wg. Posting failures are reported through the job's batchErr rather
// than returned.
func (b *builder) indexWorker(indexQueue chan indexJob) {
	for job := range indexQueue {
		// The fields map is shared by every worker, so the lookup and the
		// lazy insert are done under the builder lock.
		// NOTE(review): assumes fieldsMap is not safe for concurrent
		// Get/SetUnsafe — confirm against its implementation.
		b.Lock()
		terms, ok := b.fields.Get(job.field.Name)
		if !ok {
			terms = newTerms(b.opts)
			b.fields.SetUnsafe(job.field.Name, terms, fieldsMapSetUnsafeOptions{
				NoCopyKey:     true,
				NoFinalizeKey: true,
			})
		}
		b.Unlock()

		// If empty field, track insertion of this key into the fields
		// collection for correct response when retrieving all fields.
		newField := terms.size() == 0
		// NB(bodu): Bulk of the cpu time during insertion is spent inside of terms.post().
		// Jobs are sharded by field name, so the terms of a given field are
		// only ever posted to by one worker and need no lock here.
		if err := terms.post(job.field.Value, job.id); err != nil {
			job.batchErr.AddWithLock(index.BatchError{Err: err, Idx: job.idx})
		}
		if newField {
			// uniqueFields is shared across workers.
			b.Lock()
			b.uniqueFields = append(b.uniqueFields, job.field.Name)
			b.Unlock()
		}
		b.wg.Done()
	}
}

func (b *builder) AllDocs() (index.IDDocIterator, error) {
Expand Down Expand Up @@ -247,3 +292,11 @@ func (b *builder) Terms(field []byte) (segment.TermsIterator, error) {

return newTermsIter(terms.uniqueTerms), nil
}

// Close marks the builder as closed and closes every index queue, which lets
// the indexWorker goroutines drain any remaining jobs and exit. It is
// idempotent: only the first call closes the queues, later calls are no-ops.
// It always returns nil.
func (b *builder) Close() error {
	// Swap returns the previous value; if it was already true the queues have
	// been closed before, and closing a channel twice would panic.
	if b.closed.Swap(true) {
		return nil
	}
	for _, q := range b.indexQueues {
		close(q)
	}
	return nil
}
2 changes: 2 additions & 0 deletions src/m3ninx/index/segment/builder/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ var (

func TestBuilderFields(t *testing.T) {
builder, err := NewBuilderFromDocuments(testOptions)
require.NoError(t, err)
// Defer Close only once the builder is known to be valid.
defer builder.Close()

for i := 0; i < 10; i++ {
Expand Down Expand Up @@ -105,6 +106,7 @@ func TestBuilderFields(t *testing.T) {

func TestBuilderTerms(t *testing.T) {
builder, err := NewBuilderFromDocuments(testOptions)
require.NoError(t, err)
// Defer Close only once the builder is known to be valid.
defer builder.Close()

for i := 0; i < 10; i++ {
Expand Down
Loading

0 comments on commit 619d427

Please sign in to comment.