Skip to content

Commit

Permalink
db: document memTable.writerRefs subtlety
Browse files Browse the repository at this point in the history
Previously it was not obvious why memTable.availBytes() could reset the
memtable's reserved size when memTable.writerRefs() == 1. In #3557, a code
reading lead us to believe there was a race where competing writers could
improperly double reserve the tail of the memtable's arena. However, in the
code reading we missed that the mutable memtable always has a writerRefs()≥1,
which is why a writerRefs()==1 indicates there are no concurrent applications
to the memtable.

Additionally, add a unit test that exercises the race of two batches competing
for the tail of the memtable arena.

Close #3557.
  • Loading branch information
jbowens committed May 3, 2024
1 parent 16c5635 commit 94cfeb2
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 4 deletions.
30 changes: 27 additions & 3 deletions data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,24 @@ func writeRangeKeys(b io.Writer, iter *Iterator) {
}
}

func parseValue(s string) []byte {
if strings.HasPrefix(s, "<rand-bytes=") {
s = strings.TrimPrefix(s, "<rand-bytes=")
s = strings.TrimSuffix(s, ">")
n, err := strconv.Atoi(s)
if err != nil {
panic(err)
}
b := make([]byte, n)
rnd := rand.New(rand.NewSource(int64(n)))
if _, err := rnd.Read(b); err != nil {
panic(err)
}
return b
}
return []byte(s)
}

func runBatchDefineCmd(d *datadriven.TestData, b *Batch) error {
for _, line := range strings.Split(d.Input, "\n") {
parts := strings.Fields(line)
Expand All @@ -408,7 +426,7 @@ func runBatchDefineCmd(d *datadriven.TestData, b *Batch) error {
if len(parts) != 3 {
return errors.Errorf("%s expects 2 arguments", parts[0])
}
err = b.Set([]byte(parts[1]), []byte(parts[2]), nil)
err = b.Set([]byte(parts[1]), parseValue(parts[2]), nil)
case "del":
if len(parts) != 2 {
return errors.Errorf("%s expects 1 argument", parts[0])
Expand Down Expand Up @@ -438,14 +456,14 @@ func runBatchDefineCmd(d *datadriven.TestData, b *Batch) error {
if len(parts) != 3 {
return errors.Errorf("%s expects 2 arguments", parts[0])
}
err = b.Merge([]byte(parts[1]), []byte(parts[2]), nil)
err = b.Merge([]byte(parts[1]), parseValue(parts[2]), nil)
case "range-key-set":
if len(parts) < 4 || len(parts) > 5 {
return errors.Errorf("%s expects 3 or 4 arguments", parts[0])
}
var val []byte
if len(parts) == 5 {
val = []byte(parts[4])
val = parseValue(parts[4])
}
err = b.RangeKeySet(
[]byte(parts[1]),
Expand Down Expand Up @@ -795,6 +813,12 @@ func runDBDefineCmdReuseFS(td *datadriven.TestData, opts *Options) (*DB, error)
}
levelMaxBytes[level] = size
}
case "memtable-size":
memTableSize, err := strconv.ParseUint(arg.Vals[0], 10, 64)
if err != nil {
return nil, err
}
opts.MemTableSize = memTableSize
case "auto-compactions":
switch arg.Vals[0] {
case "off":
Expand Down
20 changes: 19 additions & 1 deletion db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2180,9 +2180,27 @@ func TestDeterminism(t *testing.T) {
d.opts.DisableAutomaticCompactions = true
return ""
})
case "memtable-info":
return addStep(td, func(td *datadriven.TestData) string {
d.commit.mu.Lock()
defer d.commit.mu.Unlock()
d.mu.Lock()
defer d.mu.Unlock()
var buf bytes.Buffer
fmt.Fprintf(&buf, "flushable queue: %d entries\n", len(d.mu.mem.queue))
fmt.Fprintf(&buf, "mutable:\n")
fmt.Fprintf(&buf, " alloced: %d\n", d.mu.mem.mutable.totalBytes())
if td.HasArg("reserved") {
fmt.Fprintf(&buf, " reserved: %d\n", d.mu.mem.mutable.reserved)
}
if td.HasArg("in-use") {
fmt.Fprintf(&buf, " in-use: %d\n", d.mu.mem.mutable.inuseBytes())
}
return buf.String()
})
case "run":
var mkfs func() vfs.FS = func() vfs.FS { return vfs.NewMem() }
var beforeStep func()
var beforeStep func() = func() {}
for _, cmdArg := range td.CmdArgs {
switch cmdArg.Key {
case "io-latency", "step-latency":
Expand Down
6 changes: 6 additions & 0 deletions mem_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,12 @@ func (m *memTable) containsRangeKeys() bool {
func (m *memTable) availBytes() uint32 {
a := m.skl.Arena()
if m.writerRefs.Load() == 1 {
// Note that one ref is maintained as long as the memtable is the
// current mutable memtable, so when evaluating whether the current
// mutable memtable has sufficient space for committing a batch, it is
// guaranteed that m.writerRefs() >= 1. This means a writerRefs() of 1
// indicates there are no other concurrent apply operations.
//
// If there are no other concurrent apply operations, we can update the
// reserved bytes setting to accurately reflect how many bytes of been
// allocated vs the over-estimation present in memTableEntrySize.
Expand Down
52 changes: 52 additions & 0 deletions testdata/determinism
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,55 @@ run io-latency=(.1,100µs) step-latency=(.2,5ms) count=10
sequential( 0:define 1:build 2:build parallel( 3:batch 4:batch 5:flush 6:maybe-compact 7:batch 8:ingest-and-excise 9:ingest-and-excise ) )
----
ok

# Run a test with two batches committing in parallel, competing for the
# remaining bytes of the mutable memtable. Only one of the batches can fit in
# the remaining bytes, and the other must rotate the memtable.

reset
----

define memtable-size=65536
----
0:define

memtable-info reserved in-use
----
flushable queue: 1 entries
mutable:
alloced: 65536
reserved: 1123
in-use: 0
1:memtable-info

batch
set a <rand-bytes=30000>
----
2:batch

batch
set b <rand-bytes=30000>
----
3:batch

memtable-info
----
flushable queue: 1 entries
mutable:
alloced: 65536
4:memtable-info

batch
set c <rand-bytes=3000>
----
5:batch

batch
set d <rand-bytes=3000>
----
6:batch

run count=100
sequential( 0:define 1:memtable-info 2:batch 3:batch 4:memtable-info parallel( 5:batch 6:batch ) )
----
ok

0 comments on commit 94cfeb2

Please sign in to comment.