Skip to content

Commit

Permalink
db: return error if manual compaction start >= end
Browse files Browse the repository at this point in the history
We use the user keys to construct InternalKeys that are
(start, InternalKeySeqNumMax, InternalKeyKindMax) and
(end, 0, 0)
so it is not meaningful to have start >= end since it
represents an empty interval. The current behavior
did actual work because of how the memtable overlap
code is written.

This came up in the code review for
cockroachdb/cockroach#57709
  • Loading branch information
sumeerbhola committed Dec 21, 2020
1 parent 4b1164f commit 4c4e043
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 15 deletions.
26 changes: 24 additions & 2 deletions compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2369,7 +2369,10 @@ func TestCompactFlushQueuedMemTable(t *testing.T) {
}
}

require.NoError(t, d.Compact([]byte("a"), []byte("a")))
require.NoError(t, d.Compact([]byte("a"), []byte("a\x00")))
d.mu.Lock()
require.Equal(t, 1, len(d.mu.mem.queue))
d.mu.Unlock()
require.NoError(t, d.Close())
}

Expand All @@ -2389,13 +2392,21 @@ func TestCompactFlushQueuedLargeBatch(t *testing.T) {
// queued but not automatically flushed.
d.mu.Lock()
d.largeBatchThreshold = d.opts.MemTableSize / 8
require.Equal(t, 1, len(d.mu.mem.queue))
d.mu.Unlock()

// Set a record with a large value. This will be transformed into a large
// batch and placed in the flushable queue.
require.NoError(t, d.Set([]byte("a"), bytes.Repeat([]byte("v"), d.largeBatchThreshold), nil))
d.mu.Lock()
require.Greater(t, len(d.mu.mem.queue), 1)
d.mu.Unlock()

require.NoError(t, d.Compact([]byte("a"), []byte("a\x00")))
d.mu.Lock()
require.Equal(t, 1, len(d.mu.mem.queue))
d.mu.Unlock()

require.NoError(t, d.Compact([]byte("a"), []byte("a")))
require.NoError(t, d.Close())
}

Expand Down Expand Up @@ -2483,3 +2494,14 @@ func TestAdjustGrandparentOverlapBytesForFlush(t *testing.T) {
})
}
}

func TestCompactionInvalidBounds(t *testing.T) {
db, err := Open("", &Options{
FS: vfs.NewMem(),
})
require.NoError(t, err)
defer db.Close()
require.NoError(t, db.Compact([]byte("a"), []byte("b")))
require.Error(t, db.Compact([]byte("a"), []byte("a")))
require.Error(t, db.Compact([]byte("b"), []byte("a")))
}
5 changes: 4 additions & 1 deletion db.go
Original file line number Diff line number Diff line change
Expand Up @@ -938,7 +938,10 @@ func (d *DB) Compact(
if d.opts.ReadOnly {
return ErrReadOnly
}

if d.cmp(start, end) >= 0 {
return errors.Errorf("Compact start %s is not less than end %s",
d.opts.Comparer.FormatKey(start), d.opts.Comparer.FormatKey(end))
}
iStart := base.MakeInternalKey(start, InternalKeySeqNumMax, InternalKeyKindMax)
iEnd := base.MakeInternalKey(end, 0, 0)
meta := []*fileMetadata{{Smallest: iStart, Largest: iEnd}}
Expand Down
2 changes: 1 addition & 1 deletion error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func TestErrors(t *testing.T) {
if err := d.Flush(); err != nil {
return err
}
if err := d.Compact(nil, nil); err != nil {
if err := d.Compact(nil, []byte("\xff")); err != nil {
return err
}

Expand Down
4 changes: 2 additions & 2 deletions open_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ func TestOpenReadOnly(t *testing.T) {
require.NoError(t, err)

// Verify various write operations fail in read-only mode.
require.EqualValues(t, ErrReadOnly, d.Compact(nil, nil))
require.EqualValues(t, ErrReadOnly, d.Compact(nil, []byte("\xff")))
require.EqualValues(t, ErrReadOnly, d.Flush())
require.EqualValues(t, ErrReadOnly, func() error { _, err := d.AsyncFlush(); return err }())

Expand Down Expand Up @@ -672,7 +672,7 @@ func TestOpenWALReplayReadOnlySeqNums(t *testing.T) {
// written to the MANIFEST. This produces a MANIFEST where the `logSeqNum`
// is greater than the sequence numbers contained in the
// `minUnflushedLogNum` log file
require.NoError(t, d.Compact([]byte("a"), []byte("a")))
require.NoError(t, d.Compact([]byte("a"), []byte("a\x00")))

// While the MANIFEST is still in this state, copy all the files in the
// database to a new directory.
Expand Down
18 changes: 9 additions & 9 deletions range_del_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,15 +203,15 @@ func TestRangeDelCompactionTruncation(t *testing.T) {
require.NoError(t, d.DeleteRange([]byte("a"), []byte("d"), nil))

// Compact to produce the L1 tables.
require.NoError(t, d.Compact([]byte("c"), []byte("c")))
require.NoError(t, d.Compact([]byte("c"), []byte("c\x00")))
expectLSM(`
1:
000008:[a#3,RANGEDEL-b#72057594037927935,RANGEDEL]
000009:[b#3,RANGEDEL-d#72057594037927935,RANGEDEL]
`)

// Compact again to move one of the tables to L2.
require.NoError(t, d.Compact([]byte("c"), []byte("c")))
require.NoError(t, d.Compact([]byte("c"), []byte("c\x00")))
expectLSM(`
1:
000008:[a#3,RANGEDEL-b#72057594037927935,RANGEDEL]
Expand Down Expand Up @@ -260,7 +260,7 @@ func TestRangeDelCompactionTruncation(t *testing.T) {
// containing "c" will be compacted again with the L2 table creating two
// tables in L2. Lastly, the L2 table containing "c" will be compacted
// creating the L3 table.
require.NoError(t, d.Compact([]byte("c"), []byte("c")))
require.NoError(t, d.Compact([]byte("c"), []byte("c\x00")))
expectLSM(`
1:
000012:[a#3,RANGEDEL-b#72057594037927935,RANGEDEL]
Expand Down Expand Up @@ -329,15 +329,15 @@ func TestRangeDelCompactionTruncation2(t *testing.T) {
require.NoError(t, d.DeleteRange([]byte("a"), []byte("d"), nil))

// Compact to produce the L1 tables.
require.NoError(t, d.Compact([]byte("b"), []byte("b")))
require.NoError(t, d.Compact([]byte("b"), []byte("b\x00")))
expectLSM(`
6:
000008:[a#3,RANGEDEL-b#2,SET]
000009:[b#1,RANGEDEL-d#72057594037927935,RANGEDEL]
`)

require.NoError(t, d.Set([]byte("c"), bytes.Repeat([]byte("d"), 100), nil))
require.NoError(t, d.Compact([]byte("c"), []byte("c")))
require.NoError(t, d.Compact([]byte("c"), []byte("c\x00")))
expectLSM(`
6:
000012:[a#3,RANGEDEL-b#2,SET]
Expand Down Expand Up @@ -404,7 +404,7 @@ func TestRangeDelCompactionTruncation3(t *testing.T) {

// Compact a few times to move the tables down to L3.
for i := 0; i < 3; i++ {
require.NoError(t, d.Compact([]byte("b"), []byte("b")))
require.NoError(t, d.Compact([]byte("b"), []byte("b\x00")))
}
expectLSM(`
3:
Expand All @@ -414,15 +414,15 @@ func TestRangeDelCompactionTruncation3(t *testing.T) {

require.NoError(t, d.Set([]byte("c"), bytes.Repeat([]byte("d"), 100), nil))

require.NoError(t, d.Compact([]byte("c"), []byte("c")))
require.NoError(t, d.Compact([]byte("c"), []byte("c\x00")))
expectLSM(`
4:
000020:[a#3,RANGEDEL-b#2,SET]
000021:[b#1,RANGEDEL-c#72057594037927935,RANGEDEL]
000022:[c#4,SET-d#72057594037927935,RANGEDEL]
`)

require.NoError(t, d.Compact([]byte("c"), []byte("c")))
require.NoError(t, d.Compact([]byte("c"), []byte("c\x00")))
expectLSM(`
5:
000023:[a#3,RANGEDEL-b#2,SET]
Expand All @@ -434,7 +434,7 @@ func TestRangeDelCompactionTruncation3(t *testing.T) {
t.Fatalf("expected not found, but found %v", err)
}

require.NoError(t, d.Compact([]byte("a"), []byte("a")))
require.NoError(t, d.Compact([]byte("a"), []byte("a\x00")))
expectLSM(`
5:
000025:[c#4,SET-d#72057594037927935,RANGEDEL]
Expand Down

0 comments on commit 4c4e043

Please sign in to comment.