diff --git a/pkg/ccl/storageccl/engineccl/BUILD.bazel b/pkg/ccl/storageccl/engineccl/BUILD.bazel index 21c5e75b146e..1c66dec6382d 100644 --- a/pkg/ccl/storageccl/engineccl/BUILD.bazel +++ b/pkg/ccl/storageccl/engineccl/BUILD.bazel @@ -6,19 +6,28 @@ go_library( "ctr_stream.go", "encrypted_fs.go", "pebble_key_manager.go", + "shared_storage.go", ], importpath = "github.com/cockroachdb/cockroach/pkg/ccl/storageccl/engineccl", visibility = ["//visibility:public"], deps = [ "//pkg/ccl/baseccl", "//pkg/ccl/storageccl/engineccl/enginepbccl", + "//pkg/ccl/utilccl", + "//pkg/kv/kvserver/rditer", + "//pkg/roachpb", + "//pkg/settings/cluster", "//pkg/storage", "//pkg/storage/enginepb", "//pkg/util/log", "//pkg/util/protoutil", "//pkg/util/syncutil", + "//pkg/util/uuid", "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_errors//oserror", + "@com_github_cockroachdb_pebble//:pebble", + "@com_github_cockroachdb_pebble//objstorage/remote", + "@com_github_cockroachdb_pebble//rangekey", "@com_github_cockroachdb_pebble//vfs", "@com_github_cockroachdb_pebble//vfs/atomicfs", "@com_github_gogo_protobuf//proto", diff --git a/pkg/ccl/storageccl/engineccl/shared_storage.go b/pkg/ccl/storageccl/engineccl/shared_storage.go new file mode 100644 index 000000000000..5ec392ab6602 --- /dev/null +++ b/pkg/ccl/storageccl/engineccl/shared_storage.go @@ -0,0 +1,74 @@ +// Copyright 2023 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package engineccl + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/cockroachdb/pebble" + "github.com/cockroachdb/pebble/objstorage/remote" + "github.com/cockroachdb/pebble/rangekey" +) + +func configureForSharedStorage(opts *pebble.Options, remoteStorage remote.Storage) error { + opts.Experimental.RemoteStorage = remote.MakeSimpleFactory(map[remote.Locator]remote.Storage{ + "": remoteStorage, + }) + opts.Experimental.CreateOnShared = remote.CreateOnSharedLower + opts.Experimental.CreateOnSharedLocator = "" + return nil +} + +// iterateReplicaKeySpansShared iterates over the range's user key span, +// skipping any keys present in shared files. It calls the appropriate visitor +// function for the type of key visited, namely, point keys, range deletes and +// range keys. Shared files that are skipped during this iteration are also +// surfaced through a dedicated visitor. Note that this method only iterates +// over a range's user key span; IterateReplicaKeySpans must be called to +// iterate over the other key spans. +// +// Must use a reader with consistent iterators. +func iterateReplicaKeySpansShared( + ctx context.Context, + desc *roachpb.RangeDescriptor, + st *cluster.Settings, + clusterID uuid.UUID, + reader storage.Reader, + visitPoint func(key *pebble.InternalKey, val pebble.LazyValue, info pebble.IteratorLevel) error, + visitRangeDel func(start, end []byte, seqNum uint64) error, + visitRangeKey func(start, end []byte, keys []rangekey.Key) error, + visitSharedFile func(sst *pebble.SharedSSTMeta) error, +) error { + if !reader.ConsistentIterators() { + panic("reader must provide consistent iterators") + } + if err := utilccl.CheckEnterpriseEnabled(st, clusterID, "disaggregated shared storage"); err != nil { + // NB: ScanInternal returns ErrInvalidSkipSharedIteration if we can't do + // a skip-shared iteration. Return the same error here so the caller can + // fall back to regular, non-shared snapshots. + return pebble.ErrInvalidSkipSharedIteration + } + spans := rditer.Select(desc.RangeID, rditer.SelectOpts{ + ReplicatedSpansFilter: rditer.ReplicatedSpansUserOnly, + ReplicatedBySpan: desc.RSpan(), + }) + span := spans[0] + return reader.ScanInternal(ctx, span.Key, span.EndKey, visitPoint, visitRangeDel, visitRangeKey, visitSharedFile) +} + +func init() { + storage.ConfigureForSharedStorage = configureForSharedStorage + rditer.IterateReplicaKeySpansShared = iterateReplicaKeySpansShared +} diff --git a/pkg/kv/kvserver/raft_transport.go b/pkg/kv/kvserver/raft_transport.go index b7296e44e40a..672aafe27b21 100644 --- a/pkg/kv/kvserver/raft_transport.go +++ b/pkg/kv/kvserver/raft_transport.go @@ -39,6 +39,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "go.etcd.io/raft/v3/raftpb" "google.golang.org/grpc" @@ -1117,6 +1118,7 @@ func (t *RaftTransport) dropFlowTokensForDisconnectedNodes() { // of the snapshot. func (t *RaftTransport) SendSnapshot( ctx context.Context, + clusterID uuid.UUID, storePool *storepool.StorePool, header kvserverpb.SnapshotRequest_Header, snap *OutgoingSnapshot, @@ -1141,7 +1143,7 @@ func (t *RaftTransport) SendSnapshot( log.Warningf(ctx, "failed to close snapshot stream: %+v", err) } }() - return sendSnapshot(ctx, t.st, t.tracer, stream, storePool, header, snap, newWriteBatch, sent, recordBytesSent) + return sendSnapshot(ctx, clusterID, t.st, t.tracer, stream, storePool, header, snap, newWriteBatch, sent, recordBytesSent) } // DelegateSnapshot sends a DelegateSnapshotRequest to a remote store diff --git a/pkg/kv/kvserver/rditer/BUILD.bazel b/pkg/kv/kvserver/rditer/BUILD.bazel index 23cf0b373fc9..35c4fe2f1e28 100644 --- a/pkg/kv/kvserver/rditer/BUILD.bazel +++ b/pkg/kv/kvserver/rditer/BUILD.bazel @@ -13,9 +13,11 @@ go_library( "//pkg/keys", "//pkg/kv/kvserver/spanset", "//pkg/roachpb", + "//pkg/settings/cluster", "//pkg/storage", "//pkg/storage/enginepb", "//pkg/util/iterutil", + "//pkg/util/uuid", "@com_github_cockroachdb_pebble//:pebble", "@com_github_cockroachdb_pebble//rangekey", ], diff --git a/pkg/kv/kvserver/rditer/replica_data_iter.go b/pkg/kv/kvserver/rditer/replica_data_iter.go index 247dc66d9802..91b7270edd74 100644 --- a/pkg/kv/kvserver/rditer/replica_data_iter.go +++ b/pkg/kv/kvserver/rditer/replica_data_iter.go @@ -16,8 +16,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util/iterutil" + "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/pebble" "github.com/cockroachdb/pebble/rangekey" ) @@ -429,34 +431,20 @@ func IterateReplicaKeySpans( return nil } -// IterateReplicaKeySpansShared iterates over the range's user key span, -// skipping any keys present in shared files. It calls the appropriate visitor -// function for the type of key visited, namely, point keys, range deletes and -// range keys. Shared files that are skipped during this iteration are also -// surfaced through a dedicated visitor. Note that this method only iterates -// over a range's user key span; IterateReplicaKeySpans must be called to -// iterate over the other key spans. -// -// Must use a reader with consistent iterators. -func IterateReplicaKeySpansShared( +// IterateReplicaKeySpansShared is a shared-replicate version of +// IterateReplicaKeySpans. See definitions of this method for how it is +// implemented. +var IterateReplicaKeySpansShared func( ctx context.Context, desc *roachpb.RangeDescriptor, + st *cluster.Settings, + clusterID uuid.UUID, reader storage.Reader, visitPoint func(key *pebble.InternalKey, val pebble.LazyValue, info pebble.IteratorLevel) error, visitRangeDel func(start, end []byte, seqNum uint64) error, visitRangeKey func(start, end []byte, keys []rangekey.Key) error, visitSharedFile func(sst *pebble.SharedSSTMeta) error, -) error { - if !reader.ConsistentIterators() { - panic("reader must provide consistent iterators") - } - spans := Select(desc.RangeID, SelectOpts{ - ReplicatedSpansFilter: ReplicatedSpansUserOnly, - ReplicatedBySpan: desc.RSpan(), - }) - span := spans[0] - return reader.ScanInternal(ctx, span.Key, span.EndKey, visitPoint, visitRangeDel, visitRangeKey, visitSharedFile) -} +) error // IterateOptions instructs how points and ranges should be presented to visitor // and if iterators should be visited in forward or reverse order. diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index 75ea8ca228cf..222c96d6213d 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -3210,6 +3210,7 @@ func (r *Replica) followerSendSnapshot( ctx, "send-snapshot", sendSnapshotTimeout, func(ctx context.Context) error { resp, err := r.store.cfg.Transport.SendSnapshot( ctx, + r.store.ClusterID(), r.store.cfg.StorePool, header, snap, diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go index fa9bfc25135b..4984031a6b61 100644 --- a/pkg/kv/kvserver/store_snapshot.go +++ b/pkg/kv/kvserver/store_snapshot.go @@ -126,8 +126,9 @@ type kvBatchSnapshotStrategy struct { // before flushing to disk. Only used on the receiver side. sstChunkSize int64 // Only used on the receiver side. - scratch *SSTSnapshotStorageScratch - st *cluster.Settings + scratch *SSTSnapshotStorageScratch + st *cluster.Settings + clusterID uuid.UUID } // multiSSTWriter is a wrapper around an SSTWriter and SSTSnapshotStorageScratch @@ -769,7 +770,7 @@ func (kvSS *kvBatchSnapshotStrategy) Send( // If snapshots containing shared files are allowed, and this range is a // non-system range, take advantage of shared storage to minimize the amount // of data we're iterating on and sending over the network. - sharedReplicate := header.SharedReplicate + sharedReplicate := header.SharedReplicate && rditer.IterateReplicaKeySpansShared != nil replicatedFilter := rditer.ReplicatedSpansAll if sharedReplicate { replicatedFilter = rditer.ReplicatedSpansExcludeUser @@ -837,7 +838,7 @@ func (kvSS *kvBatchSnapshotStrategy) Send( var valBuf []byte if sharedReplicate { - err := rditer.IterateReplicaKeySpansShared(ctx, snap.State.Desc, snap.EngineSnap, func(key *pebble.InternalKey, value pebble.LazyValue, _ pebble.IteratorLevel) error { + err := rditer.IterateReplicaKeySpansShared(ctx, snap.State.Desc, kvSS.st, kvSS.clusterID, snap.EngineSnap, func(key *pebble.InternalKey, value pebble.LazyValue, _ pebble.IteratorLevel) error { kvs++ if b == nil { b = kvSS.newWriteBatch() @@ -1375,6 +1376,7 @@ func (s *Store) receiveSnapshot( scratch: s.sstSnapshotStorage.NewScratchSpace(header.State.Desc.RangeID, snapUUID), sstChunkSize: snapshotSSTWriteSyncRate.Get(&s.cfg.Settings.SV), st: s.ClusterSettings(), + clusterID: s.ClusterID(), } defer ss.Close(ctx) @@ -1629,6 +1631,7 @@ var snapshotSSTWriteSyncRate = settings.RegisterByteSizeSetting( // snapshot to the replica specified in the input. func SendEmptySnapshot( ctx context.Context, + clusterID uuid.UUID, st *cluster.Settings, tracer *tracing.Tracer, cc *grpc.ClientConn, @@ -1751,6 +1754,7 @@ func SendEmptySnapshot( if _, err := sendSnapshot( ctx, + clusterID, st, tracer, stream, @@ -1774,6 +1778,7 @@ func (n noopStorePool) Throttle(storepool.ThrottleReason, string, roachpb.StoreI // sendSnapshot sends an outgoing snapshot via a pre-opened GRPC stream. func sendSnapshot( ctx context.Context, + clusterID uuid.UUID, st *cluster.Settings, tracer *tracing.Tracer, stream outgoingSnapshotStream, @@ -1844,6 +1849,7 @@ func sendSnapshot( limiter: limiter, newWriteBatch: newWriteBatch, st: st, + clusterID: clusterID, } // Record timings for snapshot send if kv.trace.snapshot.enable_threshold is enabled diff --git a/pkg/kv/kvserver/store_test.go b/pkg/kv/kvserver/store_test.go index 058abec270ca..f85782c98c6a 100644 --- a/pkg/kv/kvserver/store_test.go +++ b/pkg/kv/kvserver/store_test.go @@ -3127,7 +3127,7 @@ func TestSendSnapshotThrottling(t *testing.T) { expectedErr := errors.New("") c := fakeSnapshotStream{nil, expectedErr} _, err := sendSnapshot( - ctx, st, tr, c, sp, header, nil /* snap */, newBatch, nil /* sent */, nil, /* recordBytesSent */ + ctx, uuid.MakeV4(), st, tr, c, sp, header, nil /* snap */, newBatch, nil /* sent */, nil, /* recordBytesSent */ ) if sp.failedThrottles != 1 { t.Fatalf("expected 1 failed throttle, but found %d", sp.failedThrottles) @@ -3146,7 +3146,7 @@ func TestSendSnapshotThrottling(t *testing.T) { } c := fakeSnapshotStream{resp, nil} _, err := sendSnapshot( - ctx, st, tr, c, sp, header, nil /* snap */, newBatch, nil /* sent */, nil, /* recordBytesSent */ + ctx, uuid.MakeV4(), st, tr, c, sp, header, nil /* snap */, newBatch, nil /* sent */, nil, /* recordBytesSent */ ) if sp.failedThrottles != 1 { t.Fatalf("expected 1 failed throttle, but found %d", sp.failedThrottles) diff --git a/pkg/server/node.go b/pkg/server/node.go index 4c7aa181592d..012c91ff4591 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -1990,6 +1990,7 @@ func (n *Node) ResetQuorum( // replicas from this fresh snapshot. if err := kvserver.SendEmptySnapshot( ctx, + n.clusterID.Get(), n.storeCfg.Settings, n.storeCfg.Tracer(), conn, diff --git a/pkg/storage/pebble.go b/pkg/storage/pebble.go index d502ea35dd73..3d6686e4a9c2 100644 --- a/pkg/storage/pebble.go +++ b/pkg/storage/pebble.go @@ -996,6 +996,10 @@ func ResolveEncryptedEnvOptions( return fileRegistry, env, nil } +// ConfigureForSharedStorage is used to configure a pebble Options for shared +// storage. +var ConfigureForSharedStorage func(opts *pebble.Options, storage remote.Storage) error + // NewPebble creates a new Pebble instance, at the specified path. // Do not use directly (except in test); use Open instead. // @@ -1225,11 +1229,12 @@ func NewPebble(ctx context.Context, cfg PebbleConfig) (p *Pebble, err error) { // in it is needed for CRDB to function properly. if cfg.SharedStorage != nil { esWrapper := &externalStorageWrapper{p: p, es: cfg.SharedStorage, ctx: ctx} - opts.Experimental.RemoteStorage = remote.MakeSimpleFactory(map[remote.Locator]remote.Storage{ - "": esWrapper, - }) - opts.Experimental.CreateOnShared = remote.CreateOnSharedLower - opts.Experimental.CreateOnSharedLocator = "" + if ConfigureForSharedStorage == nil { + return nil, errors.New("shared storage requires CCL features") + } + if err := ConfigureForSharedStorage(opts, esWrapper); err != nil { + return nil, errors.Wrap(err, "error when configuring shared storage") + } } else { if cfg.RemoteStorageFactory != nil { opts.Experimental.RemoteStorage = remoteStorageAdaptor{p: p, ctx: ctx, factory: cfg.RemoteStorageFactory}