diff --git a/src/m3ninx/index/segment/builder/builder.go b/src/m3ninx/index/segment/builder/builder.go index e01d4846ab..2ae2a4a5e3 100644 --- a/src/m3ninx/index/segment/builder/builder.go +++ b/src/m3ninx/index/segment/builder/builder.go @@ -55,8 +55,6 @@ type indexJob struct { } type builder struct { - sync.Mutex - opts Options newUUIDFn util.NewUUIDFn @@ -65,7 +63,7 @@ type builder struct { batchSizeOne index.Batch docs []doc.Document idSet *IDsMap - fields *fieldsMap + fields *shardedFieldsMap uniqueFields [][][]byte wg sync.WaitGroup @@ -87,9 +85,6 @@ func NewBuilderFromDocuments(opts Options) (segment.CloseableDocumentsBuilder, e idSet: NewIDsMap(IDsMapOptions{ InitialSize: opts.InitialCapacity(), }), - fields: newFieldsMap(fieldsMapOptions{ - InitialSize: opts.InitialCapacity(), - }), uniqueFields: make([][][]byte, 0, concurrency), indexQueues: make([]chan indexJob, 0, concurrency), closed: atomic.NewBool(false), @@ -107,6 +102,7 @@ func NewBuilderFromDocuments(opts Options) (segment.CloseableDocumentsBuilder, e } shardUniqueFields := make([][]byte, 0, shardInitialCapcity) b.uniqueFields = append(b.uniqueFields, shardUniqueFields) + b.fields = newShardedFieldsMap(concurrency, shardInitialCapcity) } return b, nil @@ -126,9 +122,7 @@ func (b *builder) Reset(offset postings.ID) { b.idSet.Reset() // Keep fields around, just reset the terms set for each one. - for _, entry := range b.fields.Iter() { - entry.Value().reset() - } + b.fields.ResetTermsSets() // Reset the unique fields slice for i, shardUniqueFields := range b.uniqueFields { @@ -236,19 +230,14 @@ func (b *builder) index( func (b *builder) indexWorker(indexQueue chan indexJob) { for job := range indexQueue { - terms, ok := b.fields.Get(job.field.Name) + terms, ok := b.fields.ShardedGet(job.shard, job.field.Name) if !ok { - b.Lock() // NB(bodu): Check again within the lock to make sure we aren't making concurrent map writes. 
- terms, ok = b.fields.Get(job.field.Name) - if !ok { - terms = newTerms(b.opts) - b.fields.SetUnsafe(job.field.Name, terms, fieldsMapSetUnsafeOptions{ - NoCopyKey: true, - NoFinalizeKey: true, - }) - } - b.Unlock() + terms = newTerms(b.opts) + b.fields.ShardedSetUnsafe(job.shard, job.field.Name, terms, fieldsMapSetUnsafeOptions{ + NoCopyKey: true, + NoFinalizeKey: true, + }) } // If empty field, track insertion of this key into the fields diff --git a/src/m3ninx/index/segment/builder/sharded_fields_map.go b/src/m3ninx/index/segment/builder/sharded_fields_map.go new file mode 100644 index 0000000000..f7f3528555 --- /dev/null +++ b/src/m3ninx/index/segment/builder/sharded_fields_map.go @@ -0,0 +1,55 @@ +package builder + +type shardedFieldsMap struct { + data []*fieldsMap +} + +func newShardedFieldsMap( + numShards int, + shardInitialCapacity int, +) *shardedFieldsMap { + data := make([]*fieldsMap, 0, numShards) + for i := 0; i < numShards; i++ { + data = append(data, newFieldsMap(fieldsMapOptions{ + InitialSize: shardInitialCapacity, + })) + } + return &shardedFieldsMap{ + data: data, + } +} + +func (s *shardedFieldsMap) Get(k []byte) (*terms, bool) { + for _, fieldMap := range s.data { + t, found := fieldMap.Get(k) + if found { + return t, found + } + } + return nil, false +} + +func (s *shardedFieldsMap) ShardedGet( + shard int, + k []byte, +) (*terms, bool) { + return s.data[shard].Get(k) +} + +func (s *shardedFieldsMap) ShardedSetUnsafe( + shard int, + k []byte, + v *terms, + opts fieldsMapSetUnsafeOptions, +) { + s.data[shard].SetUnsafe(k, v, opts) +} + +// ResetTermsSets keeps fields around but resets the terms set for each one. +func (s *shardedFieldsMap) ResetTermsSets() { + for _, fieldMap := range s.data { + for _, entry := range fieldMap.Iter() { + entry.Value().reset() + } + } +}