db: genericize rangeDel{Cache,Frags} into keySpan{Cache,Frags} #1356

Merged 1 commit on Nov 1, 2021
100 changes: 60 additions & 40 deletions mem_table.go
@@ -76,7 +76,7 @@ type memTable struct {
// applied. The memtable cannot be flushed to disk until the writer refs
// drops to zero.
writerRefs int32
tombstones rangeTombstoneCache
tombstones keySpanCache
// The current logSeqNum at the time the memtable was created. This is
// guaranteed to be less than or equal to any seqnum stored in the memtable.
logSeqNum uint64
@@ -116,6 +116,11 @@ func newMemTable(opts memTableOptions) *memTable {
writerRefs: 1,
logSeqNum: opts.logSeqNum,
}
m.tombstones = keySpanCache{
cmp: m.cmp,
formatKey: m.formatKey,
skl: &m.rangeDelSkl,
}

if m.arenaBuf == nil {
m.arenaBuf = make([]byte, opts.size)
@@ -236,7 +241,7 @@ func (m *memTable) newFlushIter(o *IterOptions, bytesFlushed *uint64) internalIt
}

func (m *memTable) newRangeDelIter(*IterOptions) internalIterator {
tombstones := m.tombstones.get(m)
tombstones := m.tombstones.get()
if tombstones == nil {
return nil
}
@@ -271,69 +276,84 @@ func (m *memTable) empty() bool {
return m.skl.Size() == memTableEmptySize
}

// A rangeTombstoneFrags holds a set of fragmented range tombstones generated
// at a particular "sequence number" for a memtable. Rather than use actual
// sequence numbers, this cache uses a count of the number of range tombstones
// in the memTable. Note that the count of range tombstones in a memTable only
// ever increases, which provides a monotonically increasing sequence.
type rangeTombstoneFrags struct {
count uint32
once sync.Once
tombstones []keyspan.Span
// A keySpanFrags holds a set of fragmented keyspan.Spans with a particular key
// kind at a particular moment for a memtable.
//
// When a new span of a particular kind is added to the memtable, it may overlap
// with other spans of the same kind. Instead of performing the fragmentation
// whenever an iterator requires it, fragments are cached within a keySpanCache
// type. The keySpanCache uses keySpanFrags to hold the cached fragmented spans.
//
// The count of keys (and keys of any given kind) in a memtable only
// monotonically increases. The count of key spans of a particular kind is used
// as a stand-in for a 'sequence number'. A keySpanFrags represents the
// fragmented state of the memtable's keys of a given kind at the moment while
// there existed `count` keys of that kind in the memtable.
//
// It's currently only used to contain fragmented range deletion tombstones.
type keySpanFrags struct {
count uint32
once sync.Once
spans []keyspan.Span
}

// get retrieves the fragmented tombstones, populating them if necessary. Note
// that the populated tombstone fragments may be built from more than f.count
// memTable range tombstones, but that is ok for correctness. All we're
// requiring is that the memTable contains at least f.count range
// tombstones. This situation can occur if there are multiple concurrent
// additions of range tombstones and a concurrent reader. The reader can load a
// tombstoneFrags and populate it even though is has been invalidated
// (i.e. replaced with a newer tombstoneFrags).
func (f *rangeTombstoneFrags) get(m *memTable) []keyspan.Span {
// get retrieves the fragmented spans, populating them if necessary. Note that
// the populated span fragments may be built from more than f.count memTable
// spans, but that is ok for correctness. All we're requiring is that the
// memTable contains at least f.count keys of the configured kind. This
// situation can occur if there are multiple concurrent additions of the key
// kind and a concurrent reader. The reader can load a keySpanFrags and populate
// it even though it has been invalidated (i.e. replaced with a newer
// keySpanFrags).
func (f *keySpanFrags) get(
skl *arenaskl.Skiplist, cmp Compare, formatKey base.FormatKey,
) []keyspan.Span {
f.once.Do(func() {
frag := &keyspan.Fragmenter{
Cmp: m.cmp,
Format: m.formatKey,
Cmp: cmp,
Format: formatKey,
Emit: func(fragmented []keyspan.Span) {
f.tombstones = append(f.tombstones, fragmented...)
f.spans = append(f.spans, fragmented...)
},
}
it := m.rangeDelSkl.NewIter(nil, nil)
it := skl.NewIter(nil, nil)
for key, val := it.First(); key != nil; key, val = it.Next() {
frag.Add(*key, val)
}
frag.Finish()
})
return f.tombstones
return f.spans
}

// A rangeTombstoneCache is used to cache a set of fragmented tombstones. The
// cache is invalidated whenever a tombstone is added to a memTable, and
// populated when empty when a range-del iterator is created.
type rangeTombstoneCache struct {
count uint32
frags unsafe.Pointer
// A keySpanCache is used to cache a set of fragmented spans. The cache is
// invalidated whenever a key of the same kind is added to a memTable, and
// populated when empty when a span iterator of that key kind is created.
type keySpanCache struct {
count uint32
frags unsafe.Pointer
cmp Compare
formatKey base.FormatKey
skl *arenaskl.Skiplist
}

// Invalidate the current set of cached tombstones, indicating the number of
// tombstones that were added.
func (c *rangeTombstoneCache) invalidate(count uint32) {
// Invalidate the current set of cached spans, indicating the number of
// spans that were added.
func (c *keySpanCache) invalidate(count uint32) {
newCount := atomic.AddUint32(&c.count, count)
var frags *rangeTombstoneFrags
var frags *keySpanFrags

for {
oldPtr := atomic.LoadPointer(&c.frags)
if oldPtr != nil {
oldFrags := (*rangeTombstoneFrags)(oldPtr)
oldFrags := (*keySpanFrags)(oldPtr)
if oldFrags.count >= newCount {
// Someone else invalidated the cache before us and their invalidation
// subsumes ours.
break
}
}
if frags == nil {
frags = &rangeTombstoneFrags{count: newCount}
frags = &keySpanFrags{count: newCount}
}
if atomic.CompareAndSwapPointer(&c.frags, oldPtr, unsafe.Pointer(frags)) {
// We successfully invalidated the cache.
@@ -343,10 +363,10 @@ func (c *rangeTombstoneCache) invalidate(count uint32) {
}
}

func (c *rangeTombstoneCache) get(m *memTable) []keyspan.Span {
frags := (*rangeTombstoneFrags)(atomic.LoadPointer(&c.frags))
func (c *keySpanCache) get() []keyspan.Span {
frags := (*keySpanFrags)(atomic.LoadPointer(&c.frags))
if frags == nil {
return nil
}
return frags.get(m)
return frags.get(c.skl, c.cmp, c.formatKey)
}