Skip to content

Commit

Permalink
Shard fields map and remove locks for concurrency.
Browse files Browse the repository at this point in the history
  • Loading branch information
notbdu committed Feb 19, 2020
1 parent b273c4b commit 40aef04
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 20 deletions.
29 changes: 9 additions & 20 deletions src/m3ninx/index/segment/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,6 @@ type indexJob struct {
}

type builder struct {
sync.Mutex

opts Options
newUUIDFn util.NewUUIDFn

Expand All @@ -65,7 +63,7 @@ type builder struct {
batchSizeOne index.Batch
docs []doc.Document
idSet *IDsMap
fields *fieldsMap
fields *shardedFieldsMap
uniqueFields [][][]byte

wg sync.WaitGroup
Expand All @@ -87,9 +85,6 @@ func NewBuilderFromDocuments(opts Options) (segment.CloseableDocumentsBuilder, e
idSet: NewIDsMap(IDsMapOptions{
InitialSize: opts.InitialCapacity(),
}),
fields: newFieldsMap(fieldsMapOptions{
InitialSize: opts.InitialCapacity(),
}),
uniqueFields: make([][][]byte, 0, concurrency),
indexQueues: make([]chan indexJob, 0, concurrency),
closed: atomic.NewBool(false),
Expand All @@ -107,6 +102,7 @@ func NewBuilderFromDocuments(opts Options) (segment.CloseableDocumentsBuilder, e
}
shardUniqueFields := make([][]byte, 0, shardInitialCapcity)
b.uniqueFields = append(b.uniqueFields, shardUniqueFields)
b.fields = newShardedFieldsMap(concurrency, shardInitialCapcity)
}

return b, nil
Expand All @@ -126,9 +122,7 @@ func (b *builder) Reset(offset postings.ID) {
b.idSet.Reset()

// Keep fields around, just reset the terms set for each one.
for _, entry := range b.fields.Iter() {
entry.Value().reset()
}
b.fields.ResetTermsSets()

// Reset the unique fields slice
for i, shardUniqueFields := range b.uniqueFields {
Expand Down Expand Up @@ -236,19 +230,14 @@ func (b *builder) index(

func (b *builder) indexWorker(indexQueue chan indexJob) {
for job := range indexQueue {
terms, ok := b.fields.Get(job.field.Name)
terms, ok := b.fields.ShardedGet(job.shard, job.field.Name)
if !ok {
b.Lock()
// NB(bodu): Check again within the lock to make sure we aren't making concurrent map writes.
terms, ok = b.fields.Get(job.field.Name)
if !ok {
terms = newTerms(b.opts)
b.fields.SetUnsafe(job.field.Name, terms, fieldsMapSetUnsafeOptions{
NoCopyKey: true,
NoFinalizeKey: true,
})
}
b.Unlock()
terms = newTerms(b.opts)
b.fields.ShardedSetUnsafe(job.shard, job.field.Name, terms, fieldsMapSetUnsafeOptions{
NoCopyKey: true,
NoFinalizeKey: true,
})
}

// If empty field, track insertion of this key into the fields
Expand Down
55 changes: 55 additions & 0 deletions src/m3ninx/index/segment/builder/sharded_fields_map.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package builder

// shardedFieldsMap groups several fieldsMap instances, one per shard, so that
// a caller can address a single shard's map directly by its index.
//
// data holds the per-shard maps; the element at index i is the map used for
// shard i by ShardedGet and ShardedSetUnsafe.
//
// NOTE(review): this type performs no locking of its own; presumably each
// shard is only touched by one worker at a time -- confirm against callers.
type shardedFieldsMap struct {
data []*fieldsMap
}

// newShardedFieldsMap builds a shardedFieldsMap containing numShards
// independent fieldsMap instances, each created with shardInitialCapacity as
// its initial size.
func newShardedFieldsMap(
	numShards int,
	shardInitialCapacity int,
) *shardedFieldsMap {
	// Every shard is created from the same options.
	shardOpts := fieldsMapOptions{InitialSize: shardInitialCapacity}

	shards := make([]*fieldsMap, 0, numShards)
	for len(shards) < numShards {
		shards = append(shards, newFieldsMap(shardOpts))
	}

	return &shardedFieldsMap{data: shards}
}

// Get looks k up in each shard's map in shard order and returns the first
// match. It returns (nil, false) when no shard contains k.
func (s *shardedFieldsMap) Get(k []byte) (*terms, bool) {
	for i := range s.data {
		if t, ok := s.data[i].Get(k); ok {
			return t, true
		}
	}
	return nil, false
}

// ShardedGet looks k up only in the map belonging to the given shard.
// The shard index is used as-is, so an out-of-range value panics.
func (s *shardedFieldsMap) ShardedGet(
	shard int,
	k []byte,
) (*terms, bool) {
	shardMap := s.data[shard]
	return shardMap.Get(k)
}

// ShardedSetUnsafe stores v under k in the map belonging to the given shard,
// forwarding opts unchanged to that map's SetUnsafe.
// The shard index is used as-is, so an out-of-range value panics.
func (s *shardedFieldsMap) ShardedSetUnsafe(
	shard int,
	k []byte,
	v *terms,
	opts fieldsMapSetUnsafeOptions,
) {
	shardMap := s.data[shard]
	shardMap.SetUnsafe(k, v, opts)
}

// ResetTermsSets walks every shard and calls reset on the terms value of each
// entry. The field entries themselves stay in their maps.
func (s *shardedFieldsMap) ResetTermsSets() {
	for i := range s.data {
		entries := s.data[i].Iter()
		for _, e := range entries {
			e.Value().reset()
		}
	}
}

0 comments on commit 40aef04

Please sign in to comment.