Skip to content

Commit

Permalink
Merge #76780
Browse files Browse the repository at this point in the history
76780: clusterversion,storage: support Pebblev2 table format r=erikgrinaker a=nicktrav

The `Pebblev2` SSTable format adds support for range keys. Add two new
cluster versions to provide the upgrade path - the first version for
bumping the store, the second for use as a feature gate.

Rework the table format inference for new SSTable writers to take a more
conservative approach. By default, assume version `RocksDBv2`, and only
bump up to a newer version if a cluster supports it. Previously a newer
version was assumed and bumped down if the cluster didn't support it.

Update `storage.MakeBackupSSTWriter` to take in a context and cluster
settings which can be used to infer which table version to use. Enable
range keys for backups and ingest, if the cluster supports it.

Release note: None

Co-authored-by: Nick Travers <[email protected]>
  • Loading branch information
craig[bot] and nicktrav committed Feb 21, 2022
2 parents 5c5b1e3 + 9ecc619 commit cc1e630
Show file tree
Hide file tree
Showing 12 changed files with 73 additions and 30 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -181,4 +181,4 @@ trace.debug.enable boolean false if set, traces for recent requests can be seen
trace.jaeger.agent string the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as <host>:<port>. If no port is specified, 6381 will be used.
trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.
version version 21.2-70 set the active cluster version in the format '<major>.<minor>'
version version 21.2-74 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,6 @@
<tr><td><code>trace.jaeger.agent</code></td><td>string</td><td><code></code></td><td>the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as <host>:<port>. If no port is specified, 6381 will be used.</td></tr>
<tr><td><code>trace.opentelemetry.collector</code></td><td>string</td><td><code></code></td><td>address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.2-70</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.2-74</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -785,7 +785,7 @@ func (s *sstSink) open(ctx context.Context) error {
}
}
s.out = w
s.sst = storage.MakeBackupSSTWriter(s.out)
s.sst = storage.MakeBackupSSTWriter(ctx, s.dest.Settings(), s.out)

return nil
}
Expand Down
11 changes: 8 additions & 3 deletions pkg/ccl/backupccl/restore_data_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,11 +201,13 @@ func runTestIngest(t *testing.T, init func(*cluster.Settings)) {
keySlice = append(keySlice, key)
}

ctx := context.Background()
cs := cluster.MakeTestingClusterSettings()
writeSST := func(t *testing.T, offsets []int) string {
path := strconv.FormatInt(hlc.UnixNano(), 10)

sstFile := &storage.MemFile{}
sst := storage.MakeBackupSSTWriter(sstFile)
sst := storage.MakeBackupSSTWriter(ctx, cs, sstFile)
defer sst.Close()
ts := hlc.NewClock(hlc.UnixNano, time.Nanosecond).Now()
value := roachpb.MakeValueFromString("bar")
Expand Down Expand Up @@ -248,8 +250,11 @@ func runTestIngest(t *testing.T, init func(*cluster.Settings)) {
},
}}

ctx := context.Background()
args := base.TestServerArgs{Knobs: knobs, ExternalIODir: dir}
args := base.TestServerArgs{
Knobs: knobs,
ExternalIODir: dir,
Settings: cs,
}
// TODO(dan): This currently doesn't work with AddSSTable on in-memory
// stores because RocksDB's InMemoryEnv doesn't support NewRandomRWFile
// (which breaks the global-seqno rewrite used when the added sstable
Expand Down
19 changes: 19 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,16 @@ const (
// EnableLeaseHolderRemoval enables removing a leaseholder and transferring the lease
// during joint configuration, including to VOTER_INCOMING replicas.
EnableLeaseHolderRemoval
// EnsurePebbleFormatVersionRangeKeys is the first step of a two-part
// migration that bumps Pebble's format major version to a version that
// supports range keys.
EnsurePebbleFormatVersionRangeKeys
// EnablePebbleFormatVersionRangeKeys is the second of a two-part migration
// and is used as the feature gate for use of range keys. Any node at this
// version is guaranteed to reside in a cluster where all nodes support range
// keys at the Pebble layer.
EnablePebbleFormatVersionRangeKeys

// *************************************************
// Step (1): Add new versions here.
// Do not add new versions to a patch release.
Expand Down Expand Up @@ -466,6 +476,15 @@ var versionsSingleton = keyedVersions{
Key: EnableLeaseHolderRemoval,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 70},
},
{
Key: EnsurePebbleFormatVersionRangeKeys,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 72},
},
{
Key: EnablePebbleFormatVersionRangeKeys,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 74},
},

// *************************************************
// Step (2): Add new versions here.
// Do not add new versions to a patch release.
Expand Down
6 changes: 4 additions & 2 deletions pkg/clusterversion/key_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1055,7 +1055,7 @@ func runTestDBAddSSTable(
value.InitChecksum([]byte("foo"))

sstFile := &storage.MemFile{}
w := storage.MakeBackupSSTWriter(sstFile)
w := storage.MakeBackupSSTWriter(ctx, cs, sstFile)
defer w.Close()
require.NoError(t, w.Put(key, value.RawBytes))
require.NoError(t, w.Finish())
Expand Down
4 changes: 3 additions & 1 deletion pkg/storage/bench_pebble_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"testing"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand All @@ -32,7 +33,8 @@ func setupMVCCPebble(b testing.TB, dir string) Engine {
peb, err := Open(
context.Background(),
Filesystem(dir),
CacheSize(testCacheSize))
CacheSize(testCacheSize),
Settings(cluster.MakeTestingClusterSettings()))
if err != nil {
b.Fatalf("could not create new pebble instance at %s: %+v", dir, err)
}
Expand Down
15 changes: 9 additions & 6 deletions pkg/storage/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -934,7 +934,7 @@ func (p *Pebble) ExportMVCCToSst(
) (roachpb.BulkOpSummary, roachpb.Key, hlc.Timestamp, error) {
r := wrapReader(p)
// Doing defer r.Free() does not inline.
summary, k, err := pebbleExportToSst(ctx, r, exportOptions, dest)
summary, k, err := pebbleExportToSst(ctx, p.settings, r, exportOptions, dest)
r.Free()
return summary, k.Key, k.Timestamp, err
}
Expand Down Expand Up @@ -1602,6 +1602,10 @@ func (p *Pebble) SetMinVersion(version roachpb.Version) error {
formatVers := pebble.FormatMostCompatible
// Cases are ordered from newer to older versions.
switch {
case !version.Less(clusterversion.ByKey(clusterversion.EnsurePebbleFormatVersionRangeKeys)):
if formatVers < pebble.FormatRangeKeys {
formatVers = pebble.FormatRangeKeys
}
case !version.Less(clusterversion.ByKey(clusterversion.PebbleFormatBlockPropertyCollector)):
if formatVers < pebble.FormatBlockPropertyCollector {
formatVers = pebble.FormatBlockPropertyCollector
Expand Down Expand Up @@ -1707,7 +1711,7 @@ func (p *pebbleReadOnly) ExportMVCCToSst(
) (roachpb.BulkOpSummary, roachpb.Key, hlc.Timestamp, error) {
r := wrapReader(p)
// Doing defer r.Free() does not inline.
summary, k, err := pebbleExportToSst(ctx, r, exportOptions, dest)
summary, k, err := pebbleExportToSst(ctx, p.parent.settings, r, exportOptions, dest)
r.Free()
return summary, k.Key, k.Timestamp, err
}
Expand Down Expand Up @@ -1964,7 +1968,7 @@ func (p *pebbleSnapshot) ExportMVCCToSst(
) (roachpb.BulkOpSummary, roachpb.Key, hlc.Timestamp, error) {
r := wrapReader(p)
// Doing defer r.Free() does not inline.
summary, k, err := pebbleExportToSst(ctx, r, exportOptions, dest)
summary, k, err := pebbleExportToSst(ctx, p.settings, r, exportOptions, dest)
r.Free()
return summary, k.Key, k.Timestamp, err
}
Expand Down Expand Up @@ -2085,13 +2089,12 @@ func (e *ExceedMaxSizeError) Error() string {
}

func pebbleExportToSst(
ctx context.Context, reader Reader, options ExportOptions, dest io.Writer,
ctx context.Context, cs *cluster.Settings, reader Reader, options ExportOptions, dest io.Writer,
) (roachpb.BulkOpSummary, MVCCKey, error) {
var span *tracing.Span
ctx, span = tracing.ChildSpan(ctx, "pebbleExportToSst")
_ = ctx // ctx is currently unused, but this new ctx should be used below in the future.
defer span.Finish()
sstWriter := MakeBackupSSTWriter(dest)
sstWriter := MakeBackupSSTWriter(ctx, cs, dest)
defer sstWriter.Close()

var rows RowCounter
Expand Down
5 changes: 3 additions & 2 deletions pkg/storage/sst_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ func TestCheckSSTConflictsMaxIntents(t *testing.T) {
}

// Create SST with keys equal to intents at txn2TS.
cs := cluster.MakeTestingClusterSettings()
sstFile := &MemFile{}
sstWriter := MakeBackupSSTWriter(sstFile)
sstWriter := MakeBackupSSTWriter(context.Background(), cs, sstFile)
defer sstWriter.Close()
for _, k := range intents {
key := MVCCKey{Key: roachpb.Key(k), Timestamp: txn2TS}
Expand All @@ -64,7 +65,7 @@ func TestCheckSSTConflictsMaxIntents(t *testing.T) {
for _, engineImpl := range mvccEngineImpls {
t.Run(engineImpl.name, func(t *testing.T) {
ctx := context.Background()
engine := engineImpl.create()
engine := engineImpl.create(Settings(cs))
defer engine.Close()

// Write some committed keys and intents at txn1TS.
Expand Down
33 changes: 22 additions & 11 deletions pkg/storage/sst_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,26 +56,37 @@ func (noopSyncCloser) Close() error {

// MakeIngestionWriterOptions returns writer options suitable for writing SSTs
// that will subsequently be ingested (e.g. with AddSSTable).
func MakeIngestionWriterOptions(ctx context.Context, st *cluster.Settings) sstable.WriterOptions {
opts := DefaultPebbleOptions().MakeWriterOptions(0, sstable.TableFormatPebblev1)
// Only enable block properties if this cluster version support it.
// NB: we check for the _second_ of the two cluster versions. The first is
// used as a barrier for the major format version bump in the store. Nodes
// that are at the second version are guaranteed by the cluster migration
// framework to have already bumped their store major format versions to a
// sufficient version by the first.
if !st.Version.IsActive(ctx, clusterversion.EnablePebbleFormatVersionBlockProperties) {
func MakeIngestionWriterOptions(ctx context.Context, cs *cluster.Settings) sstable.WriterOptions {
// By default, take a conservative approach and assume we don't have newer
// table features available. Upgrade to an appropriate version only if the
// cluster supports it.
format := sstable.TableFormatRocksDBv2
// Cases are ordered from newer to older versions.
switch {
case cs.Version.IsActive(ctx, clusterversion.EnablePebbleFormatVersionRangeKeys):
format = sstable.TableFormatPebblev2 // Range keys.
case cs.Version.IsActive(ctx, clusterversion.EnablePebbleFormatVersionBlockProperties):
format = sstable.TableFormatPebblev1 // Block properties.
}
opts := DefaultPebbleOptions().MakeWriterOptions(0, format)
if format < sstable.TableFormatPebblev1 {
// Block properties aren't available at this version. Disable collection.
opts.BlockPropertyCollectors = nil
opts.TableFormat = sstable.TableFormatRocksDBv2
}
opts.MergerName = "nullptr"
return opts
}

// MakeBackupSSTWriter creates a new SSTWriter tailored for backup SSTs which
// are typically only ever iterated in their entirety.
func MakeBackupSSTWriter(f io.Writer) SSTWriter {
func MakeBackupSSTWriter(ctx context.Context, cs *cluster.Settings, f io.Writer) SSTWriter {
// By default, take a conservative approach and assume we don't have newer
// table features available. Upgrade to an appropriate version only if the
// cluster supports it.
opts := DefaultPebbleOptions().MakeWriterOptions(0, sstable.TableFormatRocksDBv2)
if cs.Version.IsActive(ctx, clusterversion.EnablePebbleFormatVersionRangeKeys) {
opts.TableFormat = sstable.TableFormatPebblev2 // Range keys.
}
// Don't need BlockPropertyCollectors for backups.
opts.BlockPropertyCollectors = nil

Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/sst_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func makePebbleSST(t testing.TB, kvs []storage.MVCCKeyValue, ingestion bool) []b
if ingestion {
w = storage.MakeIngestionSSTWriter(ctx, st, f)
} else {
w = storage.MakeBackupSSTWriter(f)
w = storage.MakeBackupSSTWriter(ctx, st, f)
}
defer w.Close()

Expand Down

0 comments on commit cc1e630

Please sign in to comment.