Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: move shared storage instantiation code to CCL #114267

Merged
merged 1 commit into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions pkg/ccl/storageccl/engineccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,28 @@ go_library(
"ctr_stream.go",
"encrypted_fs.go",
"pebble_key_manager.go",
"shared_storage.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/storageccl/engineccl",
visibility = ["//visibility:public"],
deps = [
"//pkg/ccl/baseccl",
"//pkg/ccl/storageccl/engineccl/enginepbccl",
"//pkg/ccl/utilccl",
"//pkg/kv/kvserver/rditer",
"//pkg/roachpb",
"//pkg/settings/cluster",
"//pkg/storage",
"//pkg/storage/enginepb",
"//pkg/util/log",
"//pkg/util/protoutil",
"//pkg/util/syncutil",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_errors//oserror",
"@com_github_cockroachdb_pebble//:pebble",
"@com_github_cockroachdb_pebble//objstorage/remote",
"@com_github_cockroachdb_pebble//rangekey",
"@com_github_cockroachdb_pebble//vfs",
"@com_github_cockroachdb_pebble//vfs/atomicfs",
"@com_github_gogo_protobuf//proto",
Expand Down
74 changes: 74 additions & 0 deletions pkg/ccl/storageccl/engineccl/shared_storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright 2023 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package engineccl

import (
"context"

"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/objstorage/remote"
"github.com/cockroachdb/pebble/rangekey"
)

func configureForSharedStorage(opts *pebble.Options, remoteStorage remote.Storage) error {
opts.Experimental.RemoteStorage = remote.MakeSimpleFactory(map[remote.Locator]remote.Storage{
"": remoteStorage,
})
opts.Experimental.CreateOnShared = remote.CreateOnSharedLower
opts.Experimental.CreateOnSharedLocator = ""
return nil
}

// iterateReplicaKeySpansShared iterates over the range's user key span,
// skipping any keys present in shared files. It calls the appropriate visitor
// function for the type of key visited, namely, point keys, range deletes and
// range keys. Shared files that are skipped during this iteration are also
// surfaced through a dedicated visitor. Note that this method only iterates
// over a range's user key span; IterateReplicaKeySpans must be called to
// iterate over the other key spans.
//
// Must use a reader with consistent iterators.
func iterateReplicaKeySpansShared(
ctx context.Context,
desc *roachpb.RangeDescriptor,
st *cluster.Settings,
clusterID uuid.UUID,
reader storage.Reader,
visitPoint func(key *pebble.InternalKey, val pebble.LazyValue, info pebble.IteratorLevel) error,
visitRangeDel func(start, end []byte, seqNum uint64) error,
visitRangeKey func(start, end []byte, keys []rangekey.Key) error,
visitSharedFile func(sst *pebble.SharedSSTMeta) error,
) error {
if !reader.ConsistentIterators() {
panic("reader must provide consistent iterators")
}
if err := utilccl.CheckEnterpriseEnabled(st, clusterID, "disaggregated shared storage"); err != nil {
// NB: ScanInternal returns ErrInvalidSkipSharedIteration if we can't do
// a skip-shared iteration. Return the same error here so the caller can
// fall back to regular, non-shared snapshots.
return pebble.ErrInvalidSkipSharedIteration
}
spans := rditer.Select(desc.RangeID, rditer.SelectOpts{
ReplicatedSpansFilter: rditer.ReplicatedSpansUserOnly,
ReplicatedBySpan: desc.RSpan(),
})
span := spans[0]
return reader.ScanInternal(ctx, span.Key, span.EndKey, visitPoint, visitRangeDel, visitRangeKey, visitSharedFile)
}

func init() {
storage.ConfigureForSharedStorage = configureForSharedStorage
rditer.IterateReplicaKeySpansShared = iterateReplicaKeySpansShared
}
4 changes: 3 additions & 1 deletion pkg/kv/kvserver/raft_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"go.etcd.io/raft/v3/raftpb"
"google.golang.org/grpc"
Expand Down Expand Up @@ -1117,6 +1118,7 @@ func (t *RaftTransport) dropFlowTokensForDisconnectedNodes() {
// of the snapshot.
func (t *RaftTransport) SendSnapshot(
ctx context.Context,
clusterID uuid.UUID,
storePool *storepool.StorePool,
header kvserverpb.SnapshotRequest_Header,
snap *OutgoingSnapshot,
Expand All @@ -1141,7 +1143,7 @@ func (t *RaftTransport) SendSnapshot(
log.Warningf(ctx, "failed to close snapshot stream: %+v", err)
}
}()
return sendSnapshot(ctx, t.st, t.tracer, stream, storePool, header, snap, newWriteBatch, sent, recordBytesSent)
return sendSnapshot(ctx, clusterID, t.st, t.tracer, stream, storePool, header, snap, newWriteBatch, sent, recordBytesSent)
}

// DelegateSnapshot sends a DelegateSnapshotRequest to a remote store
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/rditer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ go_library(
"//pkg/keys",
"//pkg/kv/kvserver/spanset",
"//pkg/roachpb",
"//pkg/settings/cluster",
"//pkg/storage",
"//pkg/storage/enginepb",
"//pkg/util/iterutil",
"//pkg/util/uuid",
"@com_github_cockroachdb_pebble//:pebble",
"@com_github_cockroachdb_pebble//rangekey",
],
Expand Down
30 changes: 9 additions & 21 deletions pkg/kv/kvserver/rditer/replica_data_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/iterutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/rangekey"
)
Expand Down Expand Up @@ -429,34 +431,20 @@ func IterateReplicaKeySpans(
return nil
}

// IterateReplicaKeySpansShared iterates over the range's user key span,
// skipping any keys present in shared files. It calls the appropriate visitor
// function for the type of key visited, namely, point keys, range deletes and
// range keys. Shared files that are skipped during this iteration are also
// surfaced through a dedicated visitor. Note that this method only iterates
// over a range's user key span; IterateReplicaKeySpans must be called to
// iterate over the other key spans.
//
// Must use a reader with consistent iterators.
func IterateReplicaKeySpansShared(
// IterateReplicaKeySpansShared is a shared-replicate version of
// IterateReplicaKeySpans. See definitions of this method for how it is
// implemented.
var IterateReplicaKeySpansShared func(
ctx context.Context,
desc *roachpb.RangeDescriptor,
st *cluster.Settings,
clusterID uuid.UUID,
reader storage.Reader,
visitPoint func(key *pebble.InternalKey, val pebble.LazyValue, info pebble.IteratorLevel) error,
visitRangeDel func(start, end []byte, seqNum uint64) error,
visitRangeKey func(start, end []byte, keys []rangekey.Key) error,
visitSharedFile func(sst *pebble.SharedSSTMeta) error,
) error {
if !reader.ConsistentIterators() {
panic("reader must provide consistent iterators")
}
spans := Select(desc.RangeID, SelectOpts{
ReplicatedSpansFilter: ReplicatedSpansUserOnly,
ReplicatedBySpan: desc.RSpan(),
})
span := spans[0]
return reader.ScanInternal(ctx, span.Key, span.EndKey, visitPoint, visitRangeDel, visitRangeKey, visitSharedFile)
}
) error

// IterateOptions instructs how points and ranges should be presented to visitor
// and if iterators should be visited in forward or reverse order.
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -3210,6 +3210,7 @@ func (r *Replica) followerSendSnapshot(
ctx, "send-snapshot", sendSnapshotTimeout, func(ctx context.Context) error {
resp, err := r.store.cfg.Transport.SendSnapshot(
ctx,
r.store.ClusterID(),
r.store.cfg.StorePool,
header,
snap,
Expand Down
14 changes: 10 additions & 4 deletions pkg/kv/kvserver/store_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,9 @@ type kvBatchSnapshotStrategy struct {
// before flushing to disk. Only used on the receiver side.
sstChunkSize int64
// Only used on the receiver side.
scratch *SSTSnapshotStorageScratch
st *cluster.Settings
scratch *SSTSnapshotStorageScratch
st *cluster.Settings
clusterID uuid.UUID
}

// multiSSTWriter is a wrapper around an SSTWriter and SSTSnapshotStorageScratch
Expand Down Expand Up @@ -769,7 +770,7 @@ func (kvSS *kvBatchSnapshotStrategy) Send(
// If snapshots containing shared files are allowed, and this range is a
// non-system range, take advantage of shared storage to minimize the amount
// of data we're iterating on and sending over the network.
sharedReplicate := header.SharedReplicate
sharedReplicate := header.SharedReplicate && rditer.IterateReplicaKeySpansShared != nil
replicatedFilter := rditer.ReplicatedSpansAll
if sharedReplicate {
replicatedFilter = rditer.ReplicatedSpansExcludeUser
Expand Down Expand Up @@ -837,7 +838,7 @@ func (kvSS *kvBatchSnapshotStrategy) Send(

var valBuf []byte
if sharedReplicate {
err := rditer.IterateReplicaKeySpansShared(ctx, snap.State.Desc, snap.EngineSnap, func(key *pebble.InternalKey, value pebble.LazyValue, _ pebble.IteratorLevel) error {
err := rditer.IterateReplicaKeySpansShared(ctx, snap.State.Desc, kvSS.st, kvSS.clusterID, snap.EngineSnap, func(key *pebble.InternalKey, value pebble.LazyValue, _ pebble.IteratorLevel) error {
kvs++
if b == nil {
b = kvSS.newWriteBatch()
Expand Down Expand Up @@ -1375,6 +1376,7 @@ func (s *Store) receiveSnapshot(
scratch: s.sstSnapshotStorage.NewScratchSpace(header.State.Desc.RangeID, snapUUID),
sstChunkSize: snapshotSSTWriteSyncRate.Get(&s.cfg.Settings.SV),
st: s.ClusterSettings(),
clusterID: s.ClusterID(),
}
defer ss.Close(ctx)

Expand Down Expand Up @@ -1629,6 +1631,7 @@ var snapshotSSTWriteSyncRate = settings.RegisterByteSizeSetting(
// snapshot to the replica specified in the input.
func SendEmptySnapshot(
ctx context.Context,
clusterID uuid.UUID,
st *cluster.Settings,
tracer *tracing.Tracer,
cc *grpc.ClientConn,
Expand Down Expand Up @@ -1751,6 +1754,7 @@ func SendEmptySnapshot(

if _, err := sendSnapshot(
ctx,
clusterID,
st,
tracer,
stream,
Expand All @@ -1774,6 +1778,7 @@ func (n noopStorePool) Throttle(storepool.ThrottleReason, string, roachpb.StoreI
// sendSnapshot sends an outgoing snapshot via a pre-opened GRPC stream.
func sendSnapshot(
ctx context.Context,
clusterID uuid.UUID,
st *cluster.Settings,
tracer *tracing.Tracer,
stream outgoingSnapshotStream,
Expand Down Expand Up @@ -1844,6 +1849,7 @@ func sendSnapshot(
limiter: limiter,
newWriteBatch: newWriteBatch,
st: st,
clusterID: clusterID,
}

// Record timings for snapshot send if kv.trace.snapshot.enable_threshold is enabled
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3127,7 +3127,7 @@ func TestSendSnapshotThrottling(t *testing.T) {
expectedErr := errors.New("")
c := fakeSnapshotStream{nil, expectedErr}
_, err := sendSnapshot(
ctx, st, tr, c, sp, header, nil /* snap */, newBatch, nil /* sent */, nil, /* recordBytesSent */
ctx, uuid.MakeV4(), st, tr, c, sp, header, nil /* snap */, newBatch, nil /* sent */, nil, /* recordBytesSent */
)
if sp.failedThrottles != 1 {
t.Fatalf("expected 1 failed throttle, but found %d", sp.failedThrottles)
Expand All @@ -3146,7 +3146,7 @@ func TestSendSnapshotThrottling(t *testing.T) {
}
c := fakeSnapshotStream{resp, nil}
_, err := sendSnapshot(
ctx, st, tr, c, sp, header, nil /* snap */, newBatch, nil /* sent */, nil, /* recordBytesSent */
ctx, uuid.MakeV4(), st, tr, c, sp, header, nil /* snap */, newBatch, nil /* sent */, nil, /* recordBytesSent */
)
if sp.failedThrottles != 1 {
t.Fatalf("expected 1 failed throttle, but found %d", sp.failedThrottles)
Expand Down
1 change: 1 addition & 0 deletions pkg/server/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1990,6 +1990,7 @@ func (n *Node) ResetQuorum(
// replicas from this fresh snapshot.
if err := kvserver.SendEmptySnapshot(
ctx,
n.clusterID.Get(),
n.storeCfg.Settings,
n.storeCfg.Tracer(),
conn,
Expand Down
15 changes: 10 additions & 5 deletions pkg/storage/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -996,6 +996,10 @@ func ResolveEncryptedEnvOptions(
return fileRegistry, env, nil
}

// ConfigureForSharedStorage is used to configure a pebble Options for shared
// storage.
var ConfigureForSharedStorage func(opts *pebble.Options, storage remote.Storage) error

// NewPebble creates a new Pebble instance, at the specified path.
// Do not use directly (except in test); use Open instead.
//
Expand Down Expand Up @@ -1225,11 +1229,12 @@ func NewPebble(ctx context.Context, cfg PebbleConfig) (p *Pebble, err error) {
// in it is needed for CRDB to function properly.
if cfg.SharedStorage != nil {
esWrapper := &externalStorageWrapper{p: p, es: cfg.SharedStorage, ctx: ctx}
opts.Experimental.RemoteStorage = remote.MakeSimpleFactory(map[remote.Locator]remote.Storage{
"": esWrapper,
})
opts.Experimental.CreateOnShared = remote.CreateOnSharedLower
opts.Experimental.CreateOnSharedLocator = ""
if ConfigureForSharedStorage == nil {
return nil, errors.New("shared storage requires CCL features")
}
if err := ConfigureForSharedStorage(opts, esWrapper); err != nil {
return nil, errors.Wrap(err, "error when configuring shared storage")
}
} else {
if cfg.RemoteStorageFactory != nil {
opts.Experimental.RemoteStorage = remoteStorageAdaptor{p: p, ctx: ctx, factory: cfg.RemoteStorageFactory}
Expand Down