Skip to content

Commit

Permalink
Shard unique fields and add custom 2d sort.
Browse files Browse the repository at this point in the history
  • Loading branch information
notbdu committed Feb 13, 2020
1 parent 619d427 commit 6a95d08
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 25 deletions.
46 changes: 31 additions & 15 deletions src/m3ninx/index/segment/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type indexJob struct {
id postings.ID
field doc.Field

shard int
idx int
batchErr *index.BatchPartialError
}
Expand All @@ -65,7 +66,7 @@ type builder struct {
docs []doc.Document
idSet *IDsMap
fields *fieldsMap
uniqueFields [][]byte
uniqueFields [][][]byte

wg sync.WaitGroup
indexQueues []chan indexJob
Expand All @@ -89,7 +90,7 @@ func NewBuilderFromDocuments(opts Options) (segment.CloseableDocumentsBuilder, e
fields: newFieldsMap(fieldsMapOptions{
InitialSize: opts.InitialCapacity(),
}),
uniqueFields: make([][]byte, 0, opts.InitialCapacity()),
uniqueFields: make([][][]byte, 0, concurrency),
indexQueues: make([]chan indexJob, 0, concurrency),
closed: atomic.NewBool(false),
}
Expand All @@ -98,6 +99,14 @@ func NewBuilderFromDocuments(opts Options) (segment.CloseableDocumentsBuilder, e
indexQueue := make(chan indexJob, indexQueueSize)
b.indexQueues = append(b.indexQueues, indexQueue)
go b.indexWorker(indexQueue)

// Give each shard a fraction of the configured initial capacity.
shardInitialCapcity := opts.InitialCapacity()
if shardInitialCapcity > 0 {
shardInitialCapcity /= concurrency
}
shardUniqueFields := make([][]byte, 0, shardInitialCapcity)
b.uniqueFields = append(b.uniqueFields, shardUniqueFields)
}

return b, nil
Expand All @@ -122,10 +131,12 @@ func (b *builder) Reset(offset postings.ID) {
}

// Reset the unique fields slice
for i := range b.uniqueFields {
b.uniqueFields[i] = nil
for i, shardUniqueFields := range b.uniqueFields {
for i := range shardUniqueFields {
shardUniqueFields[i] = nil
}
b.uniqueFields[i] = shardUniqueFields[:0]
}
b.uniqueFields = b.uniqueFields[:0]
}

func (b *builder) Insert(d doc.Document) ([]byte, error) {
Expand Down Expand Up @@ -207,16 +218,17 @@ func (b *builder) index(
batchErr *index.BatchPartialError,
) {
// Do-nothing if we are already closed to avoid send on closed panics.
if !b.closed.Load() {
if b.closed.Load() {
return
}
b.wg.Add(1)
// NB(bodu): To avoid locking inside of the terms, we shard the work
// by field name.
shard := xxhash.Sum64(f.Name) % uint64(len(b.indexQueues))
shard := int(xxhash.Sum64(f.Name) % uint64(len(b.indexQueues)))
b.indexQueues[shard] <- indexJob{
id: id,
field: f,
shard: shard,
idx: i,
batchErr: batchErr,
}
Expand All @@ -226,11 +238,17 @@ func (b *builder) indexWorker(indexQueue chan indexJob) {
for job := range indexQueue {
terms, ok := b.fields.Get(job.field.Name)
if !ok {
terms = newTerms(b.opts)
b.fields.SetUnsafe(job.field.Name, terms, fieldsMapSetUnsafeOptions{
NoCopyKey: true,
NoFinalizeKey: true,
})
b.Lock()
// NB(bodu): Check again within the lock to make sure we aren't making concurrent map writes.
terms, ok = b.fields.Get(job.field.Name)
if !ok {
terms = newTerms(b.opts)
b.fields.SetUnsafe(job.field.Name, terms, fieldsMapSetUnsafeOptions{
NoCopyKey: true,
NoFinalizeKey: true,
})
}
b.Unlock()
}

// If empty field, track insertion of this key into the fields
Expand All @@ -241,9 +259,7 @@ func (b *builder) indexWorker(indexQueue chan indexJob) {
job.batchErr.AddWithLock(index.BatchError{Err: err, Idx: job.idx})
}
if newField {
b.Lock()
b.uniqueFields = append(b.uniqueFields, job.field.Name)
b.Unlock()
b.uniqueFields[job.shard] = append(b.uniqueFields[job.shard], job.field.Name)
}
b.wg.Done()
}
Expand Down
69 changes: 59 additions & 10 deletions src/m3ninx/index/segment/builder/bytes_slice_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,29 +25,32 @@ import (
"sort"

"github.com/m3db/m3/src/m3ninx/index/segment"

"github.com/twotwotwo/sorts"
)

// OrderedBytesSliceIter is a new ordered bytes slice iterator.
type OrderedBytesSliceIter struct {
err error
done bool

currentIdx int
current []byte
backingSlice [][]byte
currentIdx int
current []byte
backingSlices *sortableSliceOfSliceOfByteSlices
}

var _ segment.FieldsIterator = &OrderedBytesSliceIter{}

// NewOrderedBytesSliceIter sorts a slice of bytes and then
// returns an iterator over them.
func NewOrderedBytesSliceIter(
maybeUnorderedSlice [][]byte,
maybeUnorderedSlices [][][]byte,
) *OrderedBytesSliceIter {
sortSliceOfByteSlices(maybeUnorderedSlice)
sortable := &sortableSliceOfSliceOfByteSlices{data: maybeUnorderedSlices}
sorts.ByBytes(sortable)
return &OrderedBytesSliceIter{
currentIdx: -1,
backingSlice: maybeUnorderedSlice,
currentIdx: -1,
backingSlices: sortable,
}
}

Expand All @@ -57,11 +60,12 @@ func (b *OrderedBytesSliceIter) Next() bool {
return false
}
b.currentIdx++
if b.currentIdx >= len(b.backingSlice) {
if b.currentIdx >= b.backingSlices.Len() {
b.done = true
return false
}
b.current = b.backingSlice[b.currentIdx]
iOuter, iInner := b.backingSlices.getIndices(b.currentIdx)
b.current = b.backingSlices.data[iOuter][iInner]
return true
}

Expand All @@ -77,7 +81,7 @@ func (b *OrderedBytesSliceIter) Err() error {

// Len returns the length of the slice.
func (b *OrderedBytesSliceIter) Len() int {
return len(b.backingSlice)
return b.backingSlices.Len()
}

// Close releases resources.
Expand All @@ -86,6 +90,51 @@ func (b *OrderedBytesSliceIter) Close() error {
return nil
}

type sortableSliceOfSliceOfByteSlices struct {
data [][][]byte
length *int
}

func (s *sortableSliceOfSliceOfByteSlices) Len() int {
if s.length != nil {
return *s.length
}

totalLen := 0
for _, innerSlice := range s.data {
totalLen += len(innerSlice)
}
s.length = &totalLen

return *s.length
}

func (s *sortableSliceOfSliceOfByteSlices) Less(i, j int) bool {
iOuter, iInner := s.getIndices(i)
jOuter, jInner := s.getIndices(j)
return bytes.Compare(s.data[iOuter][iInner], s.data[jOuter][jInner]) < 0
}

func (s *sortableSliceOfSliceOfByteSlices) Swap(i, j int) {
iOuter, iInner := s.getIndices(i)
jOuter, jInner := s.getIndices(j)
s.data[iOuter][iInner], s.data[jOuter][jInner] = s.data[jOuter][jInner], s.data[iOuter][iInner]
}

func (s *sortableSliceOfSliceOfByteSlices) Key(i int) []byte {
iOuter, iInner := s.getIndices(i)
return s.data[iOuter][iInner]
}

func (s *sortableSliceOfSliceOfByteSlices) getIndices(idx int) (int, int) {
currentSliceIdx := 0
for idx >= len(s.data[currentSliceIdx]) {
idx -= len(s.data[currentSliceIdx])
currentSliceIdx++
}
return currentSliceIdx, idx
}

func sortSliceOfByteSlices(b [][]byte) {
sort.Slice(b, func(i, j int) bool {
return bytes.Compare(b[i], b[j]) < 0
Expand Down

0 comments on commit 6a95d08

Please sign in to comment.