storage: use size-carrying point tombstones #104539

Merged (1 commit) on Jun 27, 2023
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
@@ -301,4 +301,4 @@ trace.opentelemetry.collector string address of an OpenTelemetry trace collecto
trace.snapshot.rate duration 0s if non-zero, interval at which background trace snapshots are captured tenant-rw
trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https://<ui>/#/debug/tracez tenant-rw
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used. tenant-rw
version version 1000023.1-10 set the active cluster version in the format '<major>.<minor>' tenant-rw
version version 1000023.1-14 set the active cluster version in the format '<major>.<minor>' tenant-rw
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
@@ -255,6 +255,6 @@
<tr><td><div id="setting-trace-snapshot-rate" class="anchored"><code>trace.snapshot.rate</code></div></td><td>duration</td><td><code>0s</code></td><td>if non-zero, interval at which background trace snapshots are captured</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-trace-span-registry-enabled" class="anchored"><code>trace.span_registry.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>if set, ongoing traces can be seen at https://&lt;ui&gt;/#/debug/tracez</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-trace-zipkin-collector" class="anchored"><code>trace.zipkin.collector</code></div></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as &lt;host&gt;:&lt;port&gt;. If no port is specified, 9411 will be used.</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-version" class="anchored"><code>version</code></div></td><td>version</td><td><code>1000023.1-10</code></td><td>set the active cluster version in the format &#39;&lt;major&gt;.&lt;minor&gt;&#39;</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-version" class="anchored"><code>version</code></div></td><td>version</td><td><code>1000023.1-14</code></td><td>set the active cluster version in the format &#39;&lt;major&gt;.&lt;minor&gt;&#39;</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
</tbody>
</table>
19 changes: 19 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
@@ -542,6 +542,17 @@ const (
// that (optionally) embed below-raft admission data.
V23_2_UseACRaftEntryEntryEncodings

// V23_2_PebbleFormatDeleteSizedAndObsolete upgrades Pebble's format major
// version to FormatDeleteSizedAndObsolete, allowing use of a new sstable
// format version Pebblev4. This version has two improvements:
// a) It allows the use of DELSIZED point tombstones.
// b) It encodes the obsolescence of keys in a key-kind bit.
V23_2_PebbleFormatDeleteSizedAndObsolete

// V23_2_UseSizedPebblePointTombstones enables the use of Pebble's new
// DeleteSized operations.
V23_2_UseSizedPebblePointTombstones

// *************************************************
// Step (1) Add new versions here.
// Do not add new versions to a patch release.
@@ -943,6 +954,14 @@ var rawVersionsSingleton = keyedVersions{
Key: V23_2_UseACRaftEntryEntryEncodings,
Version: roachpb.Version{Major: 23, Minor: 1, Internal: 10},
},
{
Key: V23_2_PebbleFormatDeleteSizedAndObsolete,
Version: roachpb.Version{Major: 23, Minor: 1, Internal: 12},
},
{
Key: V23_2_UseSizedPebblePointTombstones,
Version: roachpb.Version{Major: 23, Minor: 1, Internal: 14},
},

// *************************************************
// Step (2): Add new versions here.
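
The two version keys above stage the migration: the first ratchets Pebble's format major version so DELSIZED tombstones and the obsolete-key bit are understood on disk, and the second lets writers actually emit them. As a hedged illustration (the helper name and its placement are not part of this diff), a caller would gate on the second key roughly like this:

```go
package storage // illustrative placement only

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/clusterversion"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
)

// canUseSizedTombstones is a hypothetical helper: it reports whether the
// cluster has finalized V23_2_UseSizedPebblePointTombstones, i.e. whether
// every store already runs the FormatDeleteSizedAndObsolete format major
// version and can interpret size-carrying point tombstones.
func canUseSizedTombstones(ctx context.Context, st *cluster.Settings) bool {
	return st.Version.IsActive(ctx, clusterversion.V23_2_UseSizedPebblePointTombstones)
}
```
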
10 changes: 2 additions & 8 deletions pkg/cmd/roachtest/tests/tombstones.go
@@ -29,12 +29,6 @@ import (
// registerPointTombstone registers the point tombstone test.
func registerPointTombstone(r registry.Registry) {
r.Add(registry.TestSpec{
Skip: "pebble#2340",
SkipDetails: "This roachtest is implemented ahead of implementing and using " +
"pebble#2340 within Cockroach. Currently, this roachtest fails through " +
"a timeout because the disk space corresponding to the large KVs is " +
"never reclaimed. Once pebble#2340 is integrated into Cockroach, we " +
"expect this to begin passing, and we can un-skip it.",
Name: "point-tombstone/heterogeneous-value-sizes",
Owner: registry.OwnerStorage,
Cluster: r.MakeClusterSpec(4),
@@ -136,7 +130,7 @@ func registerPointTombstone(r registry.Registry) {
require.LessOrEqual(t, statsAfterDeletes.livePercentage, 0.10)

// Wait for garbage collection to delete the non-live data.
targetSize := uint64(2 << 30) /* 2 GB */
targetSize := uint64(3 << 30) /* 3 GiB */
t.Status("waiting for garbage collection and compaction to reduce on-disk size to ", humanize.IBytes(targetSize))
m = c.NewMonitor(ctx, c.Range(1, 3))
m.Go(func(ctx context.Context) error {
@@ -172,7 +166,7 @@ type tableSizeInfo struct {
}

func (info tableSizeInfo) String() string {
return fmt.Sprintf("databaseID: %d, tableID: %d, rangeCount: %d, approxDiskBytes: %s, liveBytes: %s, totalBytes: %s, livePercentage: %.1f",
return fmt.Sprintf("databaseID: %d, tableID: %d, rangeCount: %d, approxDiskBytes: %s, liveBytes: %s, totalBytes: %s, livePercentage: %.2f",
info.databaseID,
info.tableID,
info.rangeCount,
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/batch_spanset_test.go
@@ -72,7 +72,7 @@ func TestSpanSetBatchBoundaries(t *testing.T) {
}

t.Run("writes before range", func(t *testing.T) {
if err := batch.ClearUnversioned(outsideKey.Key); !isWriteSpanErr(err) {
if err := batch.ClearUnversioned(outsideKey.Key, storage.ClearOptions{}); !isWriteSpanErr(err) {
t.Errorf("ClearUnversioned: unexpected error %v", err)
}
if err := batch.ClearRawRange(outsideKey.Key, outsideKey2.Key, true, true); !isWriteSpanErr(err) {
@@ -93,7 +93,7 @@ func TestSpanSetBatchBoundaries(t *testing.T) {
})

t.Run("writes after range", func(t *testing.T) {
if err := batch.ClearUnversioned(outsideKey3.Key); !isWriteSpanErr(err) {
if err := batch.ClearUnversioned(outsideKey3.Key, storage.ClearOptions{}); !isWriteSpanErr(err) {
t.Errorf("ClearUnversioned: unexpected error %v", err)
}
if err := batch.ClearRawRange(insideKey2.Key, outsideKey4.Key, true, true); !isWriteSpanErr(err) {
@@ -303,7 +303,7 @@ func TestSpanSetBatchTimestamps(t *testing.T) {
}

for _, batch := range []storage.Batch{batchBefore, batchNonMVCC} {
if err := batch.ClearUnversioned(wkey.Key); !isWriteSpanErr(err) {
if err := batch.ClearUnversioned(wkey.Key, storage.ClearOptions{}); !isWriteSpanErr(err) {
t.Errorf("ClearUnversioned: unexpected error %v", err)
}
{
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/kvstorage/init.go
@@ -504,7 +504,7 @@ func LoadAndReconcileReplicas(ctx context.Context, eng storage.Engine) ([]Replic
// TODO(tbg): if clearRangeData were in this package we could destroy more
// effectively even if for some reason we had in the past written state
// other than the HardState here (not supposed to happen, but still).
if err := eng.ClearUnversioned(logstore.NewStateLoader(repl.RangeID).RaftHardStateKey()); err != nil {
if err := eng.ClearUnversioned(logstore.NewStateLoader(repl.RangeID).RaftHardStateKey(), storage.ClearOptions{}); err != nil {
return nil, errors.Wrapf(err, "removing HardState for r%d", repl.RangeID)
}
log.Eventf(ctx, "removed legacy uninitialized replica for r%s", repl.RangeID)
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/loqrecovery/record.go
@@ -107,7 +107,7 @@ func RegisterOfflineRecoveryEvents(
continue
}
if removeEvent {
if err := readWriter.ClearUnversioned(iter.UnsafeKey().Key); err != nil {
if err := readWriter.ClearUnversioned(iter.UnsafeKey().Key, storage.ClearOptions{}); err != nil {
processingErrors = errors.CombineErrors(processingErrors, errors.Wrapf(
err, "failed to delete replica recovery record at key %s", iter.UnsafeKey()))
continue
5 changes: 4 additions & 1 deletion pkg/kv/kvserver/replica_raft.go
@@ -2479,7 +2479,10 @@ func handleTruncatedStateBelowRaftPreApply(
// avoid allocating when constructing Raft log keys (16 bytes).
prefix := prefixBuf.RaftLogPrefix()
for idx := currentTruncatedState.Index + 1; idx <= suggestedTruncatedState.Index; idx++ {
if err := readWriter.ClearUnversioned(keys.RaftLogKeyFromPrefix(prefix, idx)); err != nil {
if err := readWriter.ClearUnversioned(
keys.RaftLogKeyFromPrefix(prefix, idx),
storage.ClearOptions{},
); err != nil {
return false, errors.Wrapf(err, "unable to clear truncated Raft entries for %+v at index %d",
suggestedTruncatedState, idx)
}
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_test.go
@@ -1824,7 +1824,7 @@ func TestOptimizePuts(t *testing.T) {
require.NoError(t, tc.engine.ClearMVCCRangeKey(storage.MVCCRangeKey{
StartKey: c.exKey, EndKey: c.exEndKey, Timestamp: hlc.MinTimestamp}))
} else if c.exKey != nil {
require.NoError(t, tc.engine.ClearUnversioned(c.exKey))
require.NoError(t, tc.engine.ClearUnversioned(c.exKey, storage.ClearOptions{}))
}
}
}
21 changes: 13 additions & 8 deletions pkg/kv/kvserver/spanset/batch.go
@@ -415,6 +415,11 @@ func (i *EngineIterator) Value() ([]byte, error) {
return i.i.Value()
}

// ValueLen is part of the storage.EngineIterator interface.
func (i *EngineIterator) ValueLen() int {
return i.i.ValueLen()
}

// UnsafeRawEngineKey is part of the storage.EngineIterator interface.
func (i *EngineIterator) UnsafeRawEngineKey() []byte {
return i.i.UnsafeRawEngineKey()
@@ -522,34 +527,34 @@ func (s spanSetWriter) checkAllowed(key roachpb.Key) error {
return nil
}

func (s spanSetWriter) ClearMVCC(key storage.MVCCKey) error {
func (s spanSetWriter) ClearMVCC(key storage.MVCCKey, opts storage.ClearOptions) error {
if err := s.checkAllowed(key.Key); err != nil {
return err
}
return s.w.ClearMVCC(key)
return s.w.ClearMVCC(key, opts)
}

func (s spanSetWriter) ClearUnversioned(key roachpb.Key) error {
func (s spanSetWriter) ClearUnversioned(key roachpb.Key, opts storage.ClearOptions) error {
if err := s.checkAllowed(key); err != nil {
return err
}
return s.w.ClearUnversioned(key)
return s.w.ClearUnversioned(key, opts)
}

func (s spanSetWriter) ClearIntent(
key roachpb.Key, txnDidNotUpdateMeta bool, txnUUID uuid.UUID,
key roachpb.Key, txnDidNotUpdateMeta bool, txnUUID uuid.UUID, opts storage.ClearOptions,
) error {
if err := s.checkAllowed(key); err != nil {
return err
}
return s.w.ClearIntent(key, txnDidNotUpdateMeta, txnUUID)
return s.w.ClearIntent(key, txnDidNotUpdateMeta, txnUUID, opts)
}

func (s spanSetWriter) ClearEngineKey(key storage.EngineKey) error {
func (s spanSetWriter) ClearEngineKey(key storage.EngineKey, opts storage.ClearOptions) error {
if err := s.spans.CheckAllowed(SpanReadWrite, roachpb.Span{Key: key.Key}); err != nil {
return err
}
return s.w.ClearEngineKey(key)
return s.w.ClearEngineKey(key, opts)
}

func (s spanSetWriter) SingleClearEngineKey(key storage.EngineKey) error {
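
The signature changes above thread a new storage.ClearOptions argument through every point-clear path, and the new EngineIterator.ValueLen lets callers learn the size of the value they are about to delete without copying it. A minimal sketch of the intended call pattern, with a hypothetical helper name and assuming the ClearOptions fields exercised in the tests below:

```go
package kvserver // illustrative placement only

import "github.com/cockroachdb/cockroach/pkg/storage"

// clearPointKey is a hypothetical wrapper: when the caller has just observed
// the value it is deleting (e.g. via ValueLen on an iterator), it records the
// encoded value size so the engine can emit a size-carrying (DELSIZED)
// tombstone; with sizeKnown=false it degrades to an ordinary DEL.
func clearPointKey(w storage.Writer, key storage.MVCCKey, valueLen uint32, sizeKnown bool) error {
	return w.ClearMVCC(key, storage.ClearOptions{
		ValueSizeKnown: sizeKnown,
		ValueSize:      valueLen,
	})
}
```
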
49 changes: 39 additions & 10 deletions pkg/storage/batch_test.go
@@ -13,6 +13,7 @@ package storage
import (
"bytes"
"context"
"encoding/binary"
"fmt"
"reflect"
"strconv"
@@ -78,7 +79,7 @@ func testBatchBasics(t *testing.T, writeOnly bool, commit func(e Engine, b Write

// Write an engine value to be deleted.
require.NoError(t, e.PutUnversioned(mvccKey("b").Key, []byte("value")))
require.NoError(t, b.ClearUnversioned(mvccKey("b").Key))
require.NoError(t, b.ClearUnversioned(mvccKey("b").Key, ClearOptions{}))

// Write an engine value to be merged.
require.NoError(t, e.PutUnversioned(mvccKey("c").Key, appender("foo")))
@@ -91,12 +92,25 @@ func testBatchBasics(t *testing.T, writeOnly bool, commit func(e Engine, b Write
require.NoError(t, e.PutUnversioned(mvccKey("d").Key, []byte("before")))
require.NoError(t, b.SingleClearEngineKey(EngineKey{Key: mvccKey("d").Key}))

// Write a MVCC value to be deleted with a known value size.
keyF := mvccKey("f")
keyF.Timestamp.WallTime = 1
valueF := MVCCValue{Value: roachpb.Value{RawBytes: []byte("fvalue")}}
encodedValueF, err := EncodeMVCCValue(valueF)
require.NoError(t, err)
require.NoError(t, e.PutMVCC(keyF, valueF))
require.NoError(t, b.ClearMVCC(keyF, ClearOptions{
ValueSizeKnown: true,
ValueSize: uint32(len(encodedValueF)),
}))

// Check all keys are in initial state (nothing from batch has gone
// through to engine until commit).
expValues := []MVCCKeyValue{
{Key: mvccKey("b"), Value: []byte("value")},
{Key: mvccKey("c"), Value: appender("foo")},
{Key: mvccKey("d"), Value: []byte("before")},
{Key: keyF, Value: encodedValueF},
}
kvs, err := Scan(e, localMax, roachpb.KeyMax, 0)
require.NoError(t, err)
@@ -199,7 +213,7 @@ func TestReadOnlyBasics(t *testing.T) {
// For a read-only ReadWriter, all Writer methods should panic.
failureTestCases := []func(){
func() { _ = ro.ApplyBatchRepr(nil, false) },
func() { _ = ro.ClearUnversioned(a.Key) },
func() { _ = ro.ClearUnversioned(a.Key, ClearOptions{}) },
func() { _ = ro.SingleClearEngineKey(EngineKey{Key: a.Key}) },
func() { _ = ro.ClearRawRange(a.Key, a.Key, true, true) },
func() { _ = ro.Merge(a, nil) },
@@ -215,7 +229,7 @@ func TestReadOnlyBasics(t *testing.T) {
if err := e.PutUnversioned(mvccKey("b").Key, []byte("value")); err != nil {
t.Fatal(err)
}
if err := e.ClearUnversioned(mvccKey("b").Key); err != nil {
if err := e.ClearUnversioned(mvccKey("b").Key, ClearOptions{}); err != nil {
t.Fatal(err)
}
if err := e.PutUnversioned(mvccKey("c").Key, appender("foo")); err != nil {
@@ -249,13 +263,17 @@ func TestReadOnlyBasics(t *testing.T) {
func TestBatchRepr(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
// Disable metamorphism in the value-encoding; our asserts include the
// length of the encoded value to test delete-sized.
DisableMetamorphicSimpleValueEncoding(t)
testBatchBasics(t, false /* writeOnly */, func(e Engine, b WriteBatch) error {
repr := b.Repr()

r, err := NewBatchReader(repr)
require.NoError(t, err)

const expectedCount = 5
const expectedCount = 6
require.Equal(t, expectedCount, r.Count())
count, err := BatchCount(repr)
require.NoError(t, err)
@@ -267,6 +285,9 @@ func TestBatchRepr(t *testing.T) {
switch r.KeyKind() {
case pebble.InternalKeyKindDelete:
ops = append(ops, fmt.Sprintf("delete(%s)", string(r.Key())))
case pebble.InternalKeyKindDeleteSized:
v, _ := binary.Uvarint(r.Value())
ops = append(ops, fmt.Sprintf("delete-sized(%s,%d)", string(r.Key()), v))
case pebble.InternalKeyKindSet:
ops = append(ops, fmt.Sprintf("put(%s,%s)", string(r.Key()), string(r.Value())))
case pebble.InternalKeyKindMerge:
@@ -287,6 +308,7 @@ func TestBatchRepr(t *testing.T) {
"merge(c\x00)",
"put(e\x00,)",
"single_delete(d\x00)",
"delete-sized(f\x00\x00\x00\x00\x00\x00\x00\x00\x01\t,17)",
}
require.Equal(t, expOps, ops)

@@ -383,7 +405,7 @@ func TestBatchGet(t *testing.T) {
if err := b.PutUnversioned(mvccKey("a").Key, []byte("value")); err != nil {
t.Fatal(err)
}
if err := b.ClearUnversioned(mvccKey("b").Key); err != nil {
if err := b.ClearUnversioned(mvccKey("b").Key, ClearOptions{}); err != nil {
t.Fatal(err)
}
if err := b.Merge(mvccKey("c"), appender("bar")); err != nil {
@@ -435,7 +457,7 @@ func TestBatchMerge(t *testing.T) {
if err := b.PutUnversioned(mvccKey("a").Key, appender("a-value")); err != nil {
t.Fatal(err)
}
if err := b.ClearUnversioned(mvccKey("b").Key); err != nil {
if err := b.ClearUnversioned(mvccKey("b").Key, ClearOptions{}); err != nil {
t.Fatal(err)
}
if err := b.Merge(mvccKey("c"), appender("c-value")); err != nil {
@@ -578,7 +600,7 @@ func TestBatchScanWithDelete(t *testing.T) {
if err := e.PutUnversioned(mvccKey("a").Key, []byte("value")); err != nil {
t.Fatal(err)
}
if err := b.ClearUnversioned(mvccKey("a").Key); err != nil {
if err := b.ClearUnversioned(mvccKey("a").Key, ClearOptions{
ValueSizeKnown: true,
ValueSize: uint32(len("value")),
}); err != nil {
t.Fatal(err)
}
kvs, err := Scan(b, localMax, roachpb.KeyMax, 0)
@@ -611,7 +636,7 @@ func TestBatchScanMaxWithDeleted(t *testing.T) {
t.Fatal(err)
}
// Now, delete "a" in batch.
if err := b.ClearUnversioned(mvccKey("a").Key); err != nil {
if err := b.ClearUnversioned(mvccKey("a").Key, ClearOptions{
ValueSizeKnown: true,
ValueSize: uint32(len("value1")),
}); err != nil {
t.Fatal(err)
}
// A scan with max=1 should scan "b".
@@ -862,7 +890,7 @@ func TestDecodeKey(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

e, err := Open(context.Background(), InMemory(), cluster.MakeClusterSettings(), CacheSize(1<<20 /* 1 MiB */))
e, err := Open(context.Background(), InMemory(),
cluster.MakeTestingClusterSettings(), CacheSize(1<<20 /* 1 MiB */))
assert.NoError(t, err)
defer e.Close()

@@ -924,7 +953,7 @@ func TestBatchReader(t *testing.T) {
require.NoError(t, b.PutEngineRangeKey(roachpb.Key("rangeFrom"), roachpb.Key("rangeTo"), []byte{7}, []byte("engineRangeKey")))

// Clear some already empty keys.
require.NoError(t, b.ClearMVCC(pointKey("mvccKey", 9)))
require.NoError(t, b.ClearMVCC(pointKey("mvccKey", 9), ClearOptions{}))
require.NoError(t, b.ClearMVCCRangeKey(rangeKey("rangeFrom", "rangeTo", 9)))
require.NoError(t, b.ClearRawRange(roachpb.Key("clearFrom"), roachpb.Key("clearTo"), true, true))

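
A note on the expected repr asserted in TestBatchRepr above: the decoded varint 17 in delete-sized(f\x00…,17) equals the 11-byte encoded MVCC key ("f", the 0x00 sentinel, an 8-byte wall time, and a 1-byte suffix length) plus the 6-byte encoded value "fvalue", which suggests the size hint stored with a DELSIZED batch entry covers key and value together. The snippet below is an illustrative sketch that recomputes the figure; it is not part of the PR and assumes the simple (non-metamorphic) value encoding that the test explicitly pins.

```go
package main // illustrative only

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/storage"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

func main() {
	// Encoded MVCC key "f"@1: "f" + 0x00 sentinel + 8-byte wall time +
	// 1-byte suffix length = 11 bytes.
	key := storage.EncodeMVCCKey(storage.MVCCKey{
		Key:       roachpb.Key("f"),
		Timestamp: hlc.Timestamp{WallTime: 1},
	})
	// Simple MVCCValue encoding of "fvalue" = 6 bytes.
	val, err := storage.EncodeMVCCValue(storage.MVCCValue{
		Value: roachpb.Value{RawBytes: []byte("fvalue")},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(len(key) + len(val)) // 17, the size hint decoded in TestBatchRepr
}
```
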