Skip to content

Commit

Permalink
storage: use size-carrying point tombstones
Browse files Browse the repository at this point in the history
Adapt calls to ClearMVCC, ClearUnversioned, ClearEngineKey and ClearIntent to
write a new kind of point tombstone exposed by Pebble that carries the size of
the deleted kv pair. These sizes are used by Pebble to produce more accurate
estimations of space amplification within the database and pick more productive
compactions.

In point-tombstone/hetereogeneous-value-sizes roachtest runs, the kv database's
approximate disk size is reduced to 2.3 GiB by the end of the run.

Epic: CRDB-25405
Release note (performance improvement): Improve the time to disk space
reclamation when deleting rows. Previously, in scenarios where rows had large
variations in row size, it was possible for disk space to not be reclaimed
after MVCC garbage collection deleted the rows.
  • Loading branch information
jbowens committed Jun 20, 2023
1 parent 41494d1 commit 5012c4c
Show file tree
Hide file tree
Showing 25 changed files with 354 additions and 113 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -300,4 +300,4 @@ trace.opentelemetry.collector string address of an OpenTelemetry trace collecto
trace.snapshot.rate duration 0s if non-zero, interval at which background trace snapshots are captured tenant-rw
trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https://<ui>/#/debug/tracez tenant-rw
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used. tenant-rw
version version 1000023.1-10 set the active cluster version in the format '<major>.<minor>' tenant-rw
version version 1000023.1-14 set the active cluster version in the format '<major>.<minor>' tenant-rw
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,6 @@
<tr><td><div id="setting-trace-snapshot-rate" class="anchored"><code>trace.snapshot.rate</code></div></td><td>duration</td><td><code>0s</code></td><td>if non-zero, interval at which background trace snapshots are captured</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-trace-span-registry-enabled" class="anchored"><code>trace.span_registry.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>if set, ongoing traces can be seen at https://&lt;ui&gt;/#/debug/tracez</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-trace-zipkin-collector" class="anchored"><code>trace.zipkin.collector</code></div></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as &lt;host&gt;:&lt;port&gt;. If no port is specified, 9411 will be used.</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-version" class="anchored"><code>version</code></div></td><td>version</td><td><code>1000023.1-10</code></td><td>set the active cluster version in the format &#39;&lt;major&gt;.&lt;minor&gt;&#39;</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-version" class="anchored"><code>version</code></div></td><td>version</td><td><code>1000023.1-14</code></td><td>set the active cluster version in the format &#39;&lt;major&gt;.&lt;minor&gt;&#39;</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
</tbody>
</table>
19 changes: 19 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,17 @@ const (
// that (optionally) embed below-raft admission data.
V23_2_UseACRaftEntryEntryEncodings

// V23_2_PebbleFormatDeleteSizedAndObsolete upgrades Pebble's format major
// version to FormatDeleteSizedAndObsolete, allowing use of a new sstable
// format version Pebblev4. This version has two improvements:
// a) It allows the use of DELSIZED point tombstones.
// b) It encodes the obsolence of keys in a key-kind bit.
V23_2_PebbleFormatDeleteSizedAndObsolete

// V23_2_UseSizedPebblePointTombstones enables the use of Pebble's new
// DeleteSized operations.
V23_2_UseSizedPebblePointTombstones

// *************************************************
// Step (1) Add new versions here.
// Do not add new versions to a patch release.
Expand Down Expand Up @@ -943,6 +954,14 @@ var rawVersionsSingleton = keyedVersions{
Key: V23_2_UseACRaftEntryEntryEncodings,
Version: roachpb.Version{Major: 23, Minor: 1, Internal: 10},
},
{
Key: V23_2_PebbleFormatDeleteSizedAndObsolete,
Version: roachpb.Version{Major: 23, Minor: 1, Internal: 12},
},
{
Key: V23_2_UseSizedPebblePointTombstones,
Version: roachpb.Version{Major: 23, Minor: 1, Internal: 14},
},

// *************************************************
// Step (2): Add new versions here.
Expand Down
10 changes: 2 additions & 8 deletions pkg/cmd/roachtest/tests/tombstones.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,6 @@ import (
// registerPointTombstone registers the point tombstone test.
func registerPointTombstone(r registry.Registry) {
r.Add(registry.TestSpec{
Skip: "pebble#2340",
SkipDetails: "This roachtest is implemented ahead of implementing and using " +
"pebble#2340 within Cockroach. Currently, this roachtest fails through " +
"a timeout because the disk space corresponding to the large KVs is " +
"never reclaimed. Once pebble#2340 is integrated into Cockroach, we " +
"expect this to begin passing, and we can un-skip it.",
Name: "point-tombstone/heterogeneous-value-sizes",
Owner: registry.OwnerStorage,
Cluster: r.MakeClusterSpec(4),
Expand Down Expand Up @@ -136,7 +130,7 @@ func registerPointTombstone(r registry.Registry) {
require.LessOrEqual(t, statsAfterDeletes.livePercentage, 0.10)

// Wait for garbage collection to delete the non-live data.
targetSize := uint64(2 << 30) /* 2 GB */
targetSize := uint64(3 << 30) /* 3 GiB */
t.Status("waiting for garbage collection and compaction to reduce on-disk size to ", humanize.IBytes(targetSize))
m = c.NewMonitor(ctx, c.Range(1, 3))
m.Go(func(ctx context.Context) error {
Expand Down Expand Up @@ -172,7 +166,7 @@ type tableSizeInfo struct {
}

func (info tableSizeInfo) String() string {
return fmt.Sprintf("databaseID: %d, tableID: %d, rangeCount: %d, approxDiskBytes: %s, liveBytes: %s, totalBytes: %s, livePercentage: %.1f",
return fmt.Sprintf("databaseID: %d, tableID: %d, rangeCount: %d, approxDiskBytes: %s, liveBytes: %s, totalBytes: %s, livePercentage: %.2f",
info.databaseID,
info.tableID,
info.rangeCount,
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/batch_spanset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestSpanSetBatchBoundaries(t *testing.T) {
}

t.Run("writes before range", func(t *testing.T) {
if err := batch.ClearUnversioned(outsideKey.Key); !isWriteSpanErr(err) {
if err := batch.ClearUnversioned(outsideKey.Key, storage.ClearOptions{}); !isWriteSpanErr(err) {
t.Errorf("ClearUnversioned: unexpected error %v", err)
}
if err := batch.ClearRawRange(outsideKey.Key, outsideKey2.Key, true, true); !isWriteSpanErr(err) {
Expand All @@ -93,7 +93,7 @@ func TestSpanSetBatchBoundaries(t *testing.T) {
})

t.Run("writes after range", func(t *testing.T) {
if err := batch.ClearUnversioned(outsideKey3.Key); !isWriteSpanErr(err) {
if err := batch.ClearUnversioned(outsideKey3.Key, storage.ClearOptions{}); !isWriteSpanErr(err) {
t.Errorf("ClearUnversioned: unexpected error %v", err)
}
if err := batch.ClearRawRange(insideKey2.Key, outsideKey4.Key, true, true); !isWriteSpanErr(err) {
Expand Down Expand Up @@ -303,7 +303,7 @@ func TestSpanSetBatchTimestamps(t *testing.T) {
}

for _, batch := range []storage.Batch{batchBefore, batchNonMVCC} {
if err := batch.ClearUnversioned(wkey.Key); !isWriteSpanErr(err) {
if err := batch.ClearUnversioned(wkey.Key, storage.ClearOptions{}); !isWriteSpanErr(err) {
t.Errorf("ClearUnversioned: unexpected error %v", err)
}
{
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/kvstorage/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ func LoadAndReconcileReplicas(ctx context.Context, eng storage.Engine) ([]Replic
// TODO(tbg): if clearRangeData were in this package we could destroy more
// effectively even if for some reason we had in the past written state
// other than the HardState here (not supposed to happen, but still).
if err := eng.ClearUnversioned(logstore.NewStateLoader(repl.RangeID).RaftHardStateKey()); err != nil {
if err := eng.ClearUnversioned(logstore.NewStateLoader(repl.RangeID).RaftHardStateKey(), storage.ClearOptions{}); err != nil {
return nil, errors.Wrapf(err, "removing HardState for r%d", repl.RangeID)
}
log.Eventf(ctx, "removed legacy uninitialized replica for r%s", repl.RangeID)
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/loqrecovery/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func RegisterOfflineRecoveryEvents(
continue
}
if removeEvent {
if err := readWriter.ClearUnversioned(iter.UnsafeKey().Key); err != nil {
if err := readWriter.ClearUnversioned(iter.UnsafeKey().Key, storage.ClearOptions{}); err != nil {
processingErrors = errors.CombineErrors(processingErrors, errors.Wrapf(
err, "failed to delete replica recovery record at key %s", iter.UnsafeKey()))
continue
Expand Down
5 changes: 4 additions & 1 deletion pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -2479,7 +2479,10 @@ func handleTruncatedStateBelowRaftPreApply(
// avoid allocating when constructing Raft log keys (16 bytes).
prefix := prefixBuf.RaftLogPrefix()
for idx := currentTruncatedState.Index + 1; idx <= suggestedTruncatedState.Index; idx++ {
if err := readWriter.ClearUnversioned(keys.RaftLogKeyFromPrefix(prefix, idx)); err != nil {
if err := readWriter.ClearUnversioned(
keys.RaftLogKeyFromPrefix(prefix, idx),
storage.ClearOptions{},
); err != nil {
return false, errors.Wrapf(err, "unable to clear truncated Raft entries for %+v at index %d",
suggestedTruncatedState, idx)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1824,7 +1824,7 @@ func TestOptimizePuts(t *testing.T) {
require.NoError(t, tc.engine.ClearMVCCRangeKey(storage.MVCCRangeKey{
StartKey: c.exKey, EndKey: c.exEndKey, Timestamp: hlc.MinTimestamp}))
} else if c.exKey != nil {
require.NoError(t, tc.engine.ClearUnversioned(c.exKey))
require.NoError(t, tc.engine.ClearUnversioned(c.exKey, storage.ClearOptions{}))
}
}
}
Expand Down
21 changes: 13 additions & 8 deletions pkg/kv/kvserver/spanset/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,11 @@ func (i *EngineIterator) Value() ([]byte, error) {
return i.i.Value()
}

// ValueLen is part of the storage.EngineIterator interface.
func (i *EngineIterator) ValueLen() int {
return i.i.ValueLen()
}

// UnsafeRawEngineKey is part of the storage.EngineIterator interface.
func (i *EngineIterator) UnsafeRawEngineKey() []byte {
return i.i.UnsafeRawEngineKey()
Expand Down Expand Up @@ -522,34 +527,34 @@ func (s spanSetWriter) checkAllowed(key roachpb.Key) error {
return nil
}

func (s spanSetWriter) ClearMVCC(key storage.MVCCKey) error {
func (s spanSetWriter) ClearMVCC(key storage.MVCCKey, opts storage.ClearOptions) error {
if err := s.checkAllowed(key.Key); err != nil {
return err
}
return s.w.ClearMVCC(key)
return s.w.ClearMVCC(key, opts)
}

func (s spanSetWriter) ClearUnversioned(key roachpb.Key) error {
func (s spanSetWriter) ClearUnversioned(key roachpb.Key, opts storage.ClearOptions) error {
if err := s.checkAllowed(key); err != nil {
return err
}
return s.w.ClearUnversioned(key)
return s.w.ClearUnversioned(key, opts)
}

func (s spanSetWriter) ClearIntent(
key roachpb.Key, txnDidNotUpdateMeta bool, txnUUID uuid.UUID,
key roachpb.Key, txnDidNotUpdateMeta bool, txnUUID uuid.UUID, opts storage.ClearOptions,
) error {
if err := s.checkAllowed(key); err != nil {
return err
}
return s.w.ClearIntent(key, txnDidNotUpdateMeta, txnUUID)
return s.w.ClearIntent(key, txnDidNotUpdateMeta, txnUUID, opts)
}

func (s spanSetWriter) ClearEngineKey(key storage.EngineKey) error {
func (s spanSetWriter) ClearEngineKey(key storage.EngineKey, opts storage.ClearOptions) error {
if err := s.spans.CheckAllowed(SpanReadWrite, roachpb.Span{Key: key.Key}); err != nil {
return err
}
return s.w.ClearEngineKey(key)
return s.w.ClearEngineKey(key, opts)
}

func (s spanSetWriter) SingleClearEngineKey(key storage.EngineKey) error {
Expand Down
49 changes: 39 additions & 10 deletions pkg/storage/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package storage
import (
"bytes"
"context"
"encoding/binary"
"fmt"
"reflect"
"strconv"
Expand Down Expand Up @@ -78,7 +79,7 @@ func testBatchBasics(t *testing.T, writeOnly bool, commit func(e Engine, b Write

// Write an engine value to be deleted.
require.NoError(t, e.PutUnversioned(mvccKey("b").Key, []byte("value")))
require.NoError(t, b.ClearUnversioned(mvccKey("b").Key))
require.NoError(t, b.ClearUnversioned(mvccKey("b").Key, ClearOptions{}))

// Write an engine value to be merged.
require.NoError(t, e.PutUnversioned(mvccKey("c").Key, appender("foo")))
Expand All @@ -91,12 +92,25 @@ func testBatchBasics(t *testing.T, writeOnly bool, commit func(e Engine, b Write
require.NoError(t, e.PutUnversioned(mvccKey("d").Key, []byte("before")))
require.NoError(t, b.SingleClearEngineKey(EngineKey{Key: mvccKey("d").Key}))

// Write a MVCC value to be deleted with a known value size.
keyF := mvccKey("f")
keyF.Timestamp.WallTime = 1
valueF := MVCCValue{Value: roachpb.Value{RawBytes: []byte("fvalue")}}
encodedValueF, err := EncodeMVCCValue(valueF)
require.NoError(t, err)
require.NoError(t, e.PutMVCC(keyF, valueF))
require.NoError(t, b.ClearMVCC(keyF, ClearOptions{
ValueSizeKnown: true,
ValueSize: uint32(len(encodedValueF)),
}))

// Check all keys are in initial state (nothing from batch has gone
// through to engine until commit).
expValues := []MVCCKeyValue{
{Key: mvccKey("b"), Value: []byte("value")},
{Key: mvccKey("c"), Value: appender("foo")},
{Key: mvccKey("d"), Value: []byte("before")},
{Key: keyF, Value: encodedValueF},
}
kvs, err := Scan(e, localMax, roachpb.KeyMax, 0)
require.NoError(t, err)
Expand Down Expand Up @@ -199,7 +213,7 @@ func TestReadOnlyBasics(t *testing.T) {
// For a read-only ReadWriter, all Writer methods should panic.
failureTestCases := []func(){
func() { _ = ro.ApplyBatchRepr(nil, false) },
func() { _ = ro.ClearUnversioned(a.Key) },
func() { _ = ro.ClearUnversioned(a.Key, ClearOptions{}) },
func() { _ = ro.SingleClearEngineKey(EngineKey{Key: a.Key}) },
func() { _ = ro.ClearRawRange(a.Key, a.Key, true, true) },
func() { _ = ro.Merge(a, nil) },
Expand All @@ -215,7 +229,7 @@ func TestReadOnlyBasics(t *testing.T) {
if err := e.PutUnversioned(mvccKey("b").Key, []byte("value")); err != nil {
t.Fatal(err)
}
if err := e.ClearUnversioned(mvccKey("b").Key); err != nil {
if err := e.ClearUnversioned(mvccKey("b").Key, ClearOptions{}); err != nil {
t.Fatal(err)
}
if err := e.PutUnversioned(mvccKey("c").Key, appender("foo")); err != nil {
Expand Down Expand Up @@ -249,13 +263,17 @@ func TestReadOnlyBasics(t *testing.T) {
func TestBatchRepr(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
// Disable metamorphism in the value-encoding; our asserts include the
// length of the encoded value to test delete-sized.

DisableMetamorphicSimpleValueEncoding(t)
testBatchBasics(t, false /* writeOnly */, func(e Engine, b WriteBatch) error {
repr := b.Repr()

r, err := NewBatchReader(repr)
require.NoError(t, err)

const expectedCount = 5
const expectedCount = 6
require.Equal(t, expectedCount, r.Count())
count, err := BatchCount(repr)
require.NoError(t, err)
Expand All @@ -267,6 +285,9 @@ func TestBatchRepr(t *testing.T) {
switch r.KeyKind() {
case pebble.InternalKeyKindDelete:
ops = append(ops, fmt.Sprintf("delete(%s)", string(r.Key())))
case pebble.InternalKeyKindDeleteSized:
v, _ := binary.Uvarint(r.Value())
ops = append(ops, fmt.Sprintf("delete-sized(%s,%d)", string(r.Key()), v))
case pebble.InternalKeyKindSet:
ops = append(ops, fmt.Sprintf("put(%s,%s)", string(r.Key()), string(r.Value())))
case pebble.InternalKeyKindMerge:
Expand All @@ -287,6 +308,7 @@ func TestBatchRepr(t *testing.T) {
"merge(c\x00)",
"put(e\x00,)",
"single_delete(d\x00)",
"delete-sized(f\x00\x00\x00\x00\x00\x00\x00\x00\x01\t,17)",
}
require.Equal(t, expOps, ops)

Expand Down Expand Up @@ -383,7 +405,7 @@ func TestBatchGet(t *testing.T) {
if err := b.PutUnversioned(mvccKey("a").Key, []byte("value")); err != nil {
t.Fatal(err)
}
if err := b.ClearUnversioned(mvccKey("b").Key); err != nil {
if err := b.ClearUnversioned(mvccKey("b").Key, ClearOptions{}); err != nil {
t.Fatal(err)
}
if err := b.Merge(mvccKey("c"), appender("bar")); err != nil {
Expand Down Expand Up @@ -435,7 +457,7 @@ func TestBatchMerge(t *testing.T) {
if err := b.PutUnversioned(mvccKey("a").Key, appender("a-value")); err != nil {
t.Fatal(err)
}
if err := b.ClearUnversioned(mvccKey("b").Key); err != nil {
if err := b.ClearUnversioned(mvccKey("b").Key, ClearOptions{}); err != nil {
t.Fatal(err)
}
if err := b.Merge(mvccKey("c"), appender("c-value")); err != nil {
Expand Down Expand Up @@ -578,7 +600,10 @@ func TestBatchScanWithDelete(t *testing.T) {
if err := e.PutUnversioned(mvccKey("a").Key, []byte("value")); err != nil {
t.Fatal(err)
}
if err := b.ClearUnversioned(mvccKey("a").Key); err != nil {
if err := b.ClearUnversioned(mvccKey("a").Key, ClearOptions{
ValueSizeKnown: true,
ValueSize: uint32(len("value")),
}); err != nil {
t.Fatal(err)
}
kvs, err := Scan(b, localMax, roachpb.KeyMax, 0)
Expand Down Expand Up @@ -611,7 +636,10 @@ func TestBatchScanMaxWithDeleted(t *testing.T) {
t.Fatal(err)
}
// Now, delete "a" in batch.
if err := b.ClearUnversioned(mvccKey("a").Key); err != nil {
if err := b.ClearUnversioned(mvccKey("a").Key, ClearOptions{
ValueSizeKnown: true,
ValueSize: uint32(len("value1")),
}); err != nil {
t.Fatal(err)
}
// A scan with max=1 should scan "b".
Expand Down Expand Up @@ -862,7 +890,8 @@ func TestDecodeKey(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

e, err := Open(context.Background(), InMemory(), cluster.MakeClusterSettings(), CacheSize(1<<20 /* 1 MiB */))
e, err := Open(context.Background(), InMemory(),
cluster.MakeTestingClusterSettings(), CacheSize(1<<20 /* 1 MiB */))
assert.NoError(t, err)
defer e.Close()

Expand Down Expand Up @@ -924,7 +953,7 @@ func TestBatchReader(t *testing.T) {
require.NoError(t, b.PutEngineRangeKey(roachpb.Key("rangeFrom"), roachpb.Key("rangeTo"), []byte{7}, []byte("engineRangeKey")))

// Clear some already empty keys.
require.NoError(t, b.ClearMVCC(pointKey("mvccKey", 9)))
require.NoError(t, b.ClearMVCC(pointKey("mvccKey", 9), ClearOptions{}))
require.NoError(t, b.ClearMVCCRangeKey(rangeKey("rangeFrom", "rangeTo", 9)))
require.NoError(t, b.ClearRawRange(roachpb.Key("clearFrom"), roachpb.Key("clearTo"), true, true))

Expand Down
Loading

0 comments on commit 5012c4c

Please sign in to comment.