diff --git a/pkg/cli/cliflags/flags.go b/pkg/cli/cliflags/flags.go index f4df53607802..70ffa9f61248 100644 --- a/pkg/cli/cliflags/flags.go +++ b/pkg/cli/cliflags/flags.go @@ -270,6 +270,15 @@ strongly discouraged for production usage and should never be used on a public network without combining it with --host.`, } + TransitioningMode = FlagInfo{ + Name: "transitioning", + // TODO(irfansharif): Flesh this section out to better convey the + // intended usage pattern. + Description: ` +Start in transitioning mode. + `, + } + // KeySize, CertificateLifetime, AllowKeyReuse, and OverwriteFiles are used for // certificate generation functions. KeySize = FlagInfo{ diff --git a/pkg/cli/flags.go b/pkg/cli/flags.go index b2fb9b4679c9..8d993809c32f 100644 --- a/pkg/cli/flags.go +++ b/pkg/cli/flags.go @@ -258,6 +258,16 @@ func init() { // We share the default with the ClientInsecure flag. boolFlag(f, &serverInsecure, cliflags.ServerInsecure, baseCfg.Insecure) + // The default transitioning mode should be false, unless specified we + // will run the new major version. It's only when explicitly specified + // that we run in the slower transitioning mode, intended to work + // interoperably across major versions. + // + // TODO(irfansharif): Set to false by default, set to true temporarily + // as a stand-in while acceptance tests get updated to use this flag + // when testing version upgrades/rollbacks. + boolFlag(f, &serverCfg.TransitioningMode, cliflags.TransitioningMode, true) + // Certificates directory. Use a server-specific flag and value to ignore environment // variables, but share the same default. stringFlag(f, &serverSSLCertsDir, cliflags.ServerCertsDir, base.DefaultCertsDirectory) diff --git a/pkg/server/config.go b/pkg/server/config.go index 0fdd71776498..c7e5caaa7c13 100644 --- a/pkg/server/config.go +++ b/pkg/server/config.go @@ -236,6 +236,37 @@ type Config struct { // it is ready. PIDFile string + // We define two modes of operation during migrations across subsequent major + // versions. Changes introduced in a new major version can either be enabled or + // run under transitioning mode (this corresponds to a cluster running mixed + // versions, think rolling upgrades). + // + // Consider the example where we introduced a dedicated RocksDB instance for + // raft data where the following modes are used. Briefly, the major version + // with this feature stores raft data (log entries, HardState, etc.) in a + // new, dedicated RocksDB instance whereas the version prior stored it in the + // same instance storing all user-level keys. + // - The 'transitioning' mode corresponded to storing raft data on both engines + // interoperably in order to facilitate rolling migrations + // - The 'enabled' mode corresponded to storing raft data only in the dedicated + // raft engine, the base engine for everything else (as before) + // See usages of transitioningRaftStorage in pkg/storage/store.go for an + // example of how mode was used. + // + // NB: It should be safe to transition from the old version to transitioning + // mode and then from transitioning to enabled (once all the nodes in the + // cluster are in transitioning mode). Likewise, to facilitate rollbacks, it + // should be safe to transition from enabled to transitioning mode and from + // transitioning to the old version (again, once all the nodes in the cluster + // are in transitioning mode). 
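As a rough illustration of the transition ordering the comment above describes (and continues below), here is a small standalone Go sketch; Mode and validStep are illustrative names and not part of this patch. Every hop passes through the transitioning state, in either direction, which is what makes both upgrades and rollbacks safe.

package main

import "fmt"

// Mode mirrors the states a node can be in while the dedicated raft
// storage change is rolled out: the old binary, the new binary started
// with --transitioning, and the new binary with the feature enabled.
type Mode int

const (
	Old Mode = iota
	Transitioning
	Enabled
)

// validStep reports whether a single rolling restart may move a node
// between two modes: every hop has to pass through Transitioning, in
// either direction, so that mixed-version peers stay interoperable.
func validStep(from, to Mode) bool {
	switch {
	case from == to:
		return true
	case from == Old && to == Transitioning,
		from == Transitioning && to == Enabled,
		from == Enabled && to == Transitioning,
		from == Transitioning && to == Old:
		return true
	}
	return false
}

func main() {
	fmt.Println(validStep(Old, Transitioning))     // true: first rolling restart of an upgrade
	fmt.Println(validStep(Transitioning, Enabled)) // true: second rolling restart
	fmt.Println(validStep(Enabled, Transitioning)) // true: first step of a rollback
	fmt.Println(validStep(Old, Enabled))           // false: skipping transitioning is unsafe
}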
The intended design here is that version + // upgrades and rollbacks actually go through a sequence of two rolling + // restarts with nodes running in transitioning mode performing sub-optimally + // so as to maintain interoperability. + // + // TransitioningMode is the switch to indicate that we're running in + // transitioning mode. + TransitioningMode bool + enginesCreated bool } @@ -436,15 +467,15 @@ func (e *Engines) Close() { *e = nil } -// CreateEngines creates Engines based on the specs in cfg.Stores. +// CreateEngines creates engines based on the specs in cfg.Stores. For each +// store we create two engines, one for raft data (log entries, HardState, +// etc.) and one for everything else. func (cfg *Config) CreateEngines(ctx context.Context) (Engines, Engines, error) { var engines Engines defer engines.Close() var raftEngines Engines - if storage.TransitioningRaftStorage || storage.EnabledRaftStorage { - defer raftEngines.Close() - } + defer raftEngines.Close() if cfg.enginesCreated { return Engines{}, Engines{}, errors.Errorf("engines already created") @@ -486,18 +517,12 @@ func (cfg *Config) CreateEngines(ctx context.Context) (Engines, Engines, error) } details = append(details, fmt.Sprintf("store %d: in-memory, size %s", i, humanizeutil.IBytes(sizeInBytes))) - var engSize int64 - if storage.TransitioningRaftStorage || storage.EnabledRaftStorage { - engSize = (9 * sizeInBytes) / 10 - } + engSize := (9 * sizeInBytes) / 10 eng := engine.NewInMem(spec.Attributes, engSize) - raftEng := eng - if storage.TransitioningRaftStorage || storage.EnabledRaftStorage { - // TODO(irfansharif): For now we specify initialize the raft - // engine with 10% of the total size specified, this can/should - // be determined via user specified flags. - raftEng = engine.NewInMem(spec.Attributes, sizeInBytes-engSize) - } + // TODO(irfansharif): For now we specify initialize the raft + // engine with 10% of the total size specified, this can/should + // be determined via user specified flags. + raftEng := engine.NewInMem(spec.Attributes, sizeInBytes-engSize) engines = append(engines, eng) raftEngines = append(raftEngines, raftEng) @@ -527,21 +552,18 @@ func (cfg *Config) CreateEngines(ctx context.Context) (Engines, Engines, error) return Engines{}, Engines{}, err } - raftEng := eng - if storage.TransitioningRaftStorage || storage.EnabledRaftStorage { - // TODO(irfansharif): TBD on max open files. For now we also - // use the same shared cache. It's worth exploring if there's - // performance gain to be had using a dedicated cache instead. - raftEng, err = engine.NewRocksDB( - spec.Attributes, - filepath.Join(spec.Path, raftEngineSubDir), - cache, - sizeInBytes, - engine.DefaultMaxOpenFiles, - ) - if err != nil { - return Engines{}, Engines{}, err - } + // TODO(irfansharif): TBD on max open files. For now we also + // use the same shared cache. It's worth exploring if there's a + // performance gain to be had using a dedicated cache instead. + raftEng, err := engine.NewRocksDB( + spec.Attributes, + filepath.Join(spec.Path, raftEngineSubDir), + cache, + sizeInBytes, + engine.DefaultMaxOpenFiles, + ) + if err != nil { + return Engines{}, Engines{}, err } engines = append(engines, eng) @@ -555,8 +577,10 @@ func (cfg *Config) CreateEngines(ctx context.Context) (Engines, Engines, error) log.Info(ctx, s) } enginesCopy := engines - engines = nil raftEnginesCopy := raftEngines + // Neutralize the preceding deferred {engines,raftEngines}.Close(). See the + // comment on Engines.Close() for this pattern. 
+ engines = nil raftEngines = nil return enginesCopy, raftEnginesCopy, nil } diff --git a/pkg/server/config_test.go b/pkg/server/config_test.go index 24384109a46e..efad746b297f 100644 --- a/pkg/server/config_test.go +++ b/pkg/server/config_test.go @@ -26,7 +26,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/gossip/resolver" - "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/envutil" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -41,10 +40,8 @@ func TestParseInitNodeAttributes(t *testing.T) { if err != nil { t.Fatalf("Failed to initialize stores: %s", err) } - if storage.TransitioningRaftStorage || storage.EnabledRaftStorage { - defer raftEngines.Close() - } defer engines.Close() + defer raftEngines.Close() if err := cfg.InitNode(); err != nil { t.Fatalf("Failed to initialize node: %s", err) } @@ -66,9 +63,7 @@ func TestParseJoinUsingAddrs(t *testing.T) { t.Fatalf("Failed to initialize stores: %s", err) } defer engines.Close() - if storage.TransitioningRaftStorage || storage.EnabledRaftStorage { - defer raftEngines.Close() - } + defer raftEngines.Close() if err := cfg.InitNode(); err != nil { t.Fatalf("Failed to initialize node: %s", err) } diff --git a/pkg/server/node.go b/pkg/server/node.go index acf64d1c8516..5d0084863b40 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -386,7 +386,7 @@ func (n *Node) start( n.startComputePeriodicMetrics(n.stopper, n.storeCfg.MetricsSampleInterval) n.startGossip(n.stopper) - log.Infof(ctx, "%s: started with %v engine(s), %v raft engines and attributes %v", n, engines, raftEngines, attrs.Attrs) + log.Infof(ctx, "%s: started with %v engine(s), %v raft engine(s) and attributes %v", n, engines, raftEngines, attrs.Attrs) return nil } diff --git a/pkg/server/node_test.go b/pkg/server/node_test.go index b1ea6e920f17..ae2245851039 100644 --- a/pkg/server/node_test.go +++ b/pkg/server/node_test.go @@ -185,11 +185,8 @@ func TestBootstrapCluster(t *testing.T) { defer stopper.Stop(context.TODO()) e := engine.NewInMem(roachpb.Attributes{}, 1<<20) stopper.AddCloser(e) - re := e - if storage.TransitioningRaftStorage || storage.EnabledRaftStorage { - re = engine.NewInMem(roachpb.Attributes{}, 1<<20) - stopper.AddCloser(re) - } + re := engine.NewInMem(roachpb.Attributes{}, 1<<20) + stopper.AddCloser(re) if _, err := bootstrapCluster( storage.StoreConfig{}, []engine.Engine{e}, []engine.Engine{re}, kv.MakeTxnMetrics(metric.TestSampleInterval), ); err != nil { @@ -231,10 +228,7 @@ func TestBootstrapCluster(t *testing.T) { func TestBootstrapNewStore(t *testing.T) { defer leaktest.AfterTest(t)() e := engine.NewInMem(roachpb.Attributes{}, 1<<20) - re := e - if storage.TransitioningRaftStorage || storage.EnabledRaftStorage { - re = engine.NewInMem(roachpb.Attributes{}, 1<<20) - } + re := engine.NewInMem(roachpb.Attributes{}, 1<<20) if _, err := bootstrapCluster( storage.StoreConfig{}, []engine.Engine{e}, []engine.Engine{re}, kv.MakeTxnMetrics(metric.TestSampleInterval), ); err != nil { @@ -249,15 +243,12 @@ func TestBootstrapNewStore(t *testing.T) { }) defer engines.Close() - raftEngines := engines - if storage.TransitioningRaftStorage || storage.EnabledRaftStorage { - raftEngines = Engines([]engine.Engine{ - re, - engine.NewInMem(roachpb.Attributes{}, 1<<20), - engine.NewInMem(roachpb.Attributes{}, 1<<20), - }) - defer raftEngines.Close() - } + raftEngines := Engines([]engine.Engine{ + re, + 
engine.NewInMem(roachpb.Attributes{}, 1<<20), + engine.NewInMem(roachpb.Attributes{}, 1<<20), + }) + defer raftEngines.Close() _, _, node, stopper := createAndStartTestNode( util.TestAddr, engines, @@ -299,11 +290,8 @@ func TestNodeJoin(t *testing.T) { e := engine.NewInMem(roachpb.Attributes{}, 1<<20) engineStopper.AddCloser(e) - re := e - if storage.TransitioningRaftStorage || storage.EnabledRaftStorage { - re = engine.NewInMem(roachpb.Attributes{}, 1<<20) - engineStopper.AddCloser(re) - } + re := engine.NewInMem(roachpb.Attributes{}, 1<<20) + engineStopper.AddCloser(re) if _, err := bootstrapCluster( storage.StoreConfig{}, []engine.Engine{e}, []engine.Engine{re}, kv.MakeTxnMetrics(metric.TestSampleInterval), @@ -328,11 +316,8 @@ func TestNodeJoin(t *testing.T) { e2 := engine.NewInMem(roachpb.Attributes{}, 1<<20) engineStopper.AddCloser(e2) - re2 := e2 - if storage.TransitioningRaftStorage || storage.EnabledRaftStorage { - re2 = engine.NewInMem(roachpb.Attributes{}, 1<<20) - engineStopper.AddCloser(re2) - } + re2 := engine.NewInMem(roachpb.Attributes{}, 1<<20) + engineStopper.AddCloser(re2) engines2 := []engine.Engine{e2} raftEngines2 := []engine.Engine{re2} @@ -384,11 +369,8 @@ func TestNodeJoinSelf(t *testing.T) { e := engine.NewInMem(roachpb.Attributes{}, 1<<20) defer e.Close() - re := e - if storage.TransitioningRaftStorage || storage.EnabledRaftStorage { - re = engine.NewInMem(roachpb.Attributes{}, 1<<20) - defer re.Close() - } + re := engine.NewInMem(roachpb.Attributes{}, 1<<20) + defer re.Close() engines := []engine.Engine{e} raftEngines := []engine.Engine{re} @@ -407,11 +389,8 @@ func TestCorruptedClusterID(t *testing.T) { e := engine.NewInMem(roachpb.Attributes{}, 1<<20) defer e.Close() - re := e - if storage.TransitioningRaftStorage || storage.EnabledRaftStorage { - re = engine.NewInMem(roachpb.Attributes{}, 1<<20) - defer re.Close() - } + re := engine.NewInMem(roachpb.Attributes{}, 1<<20) + defer re.Close() if _, err := bootstrapCluster( storage.StoreConfig{}, []engine.Engine{e}, []engine.Engine{re}, kv.MakeTxnMetrics(metric.TestSampleInterval), ); err != nil { @@ -744,11 +723,8 @@ func TestStartNodeWithLocality(t *testing.T) { e := engine.NewInMem(roachpb.Attributes{}, 1<<20) defer e.Close() - re := e - if storage.TransitioningRaftStorage || storage.EnabledRaftStorage { - re = engine.NewInMem(roachpb.Attributes{}, 1<<20) - defer re.Close() - } + re := engine.NewInMem(roachpb.Attributes{}, 1<<20) + defer re.Close() if _, err := bootstrapCluster( storage.StoreConfig{}, []engine.Engine{e}, []engine.Engine{re}, kv.MakeTxnMetrics(metric.TestSampleInterval), ); err != nil { diff --git a/pkg/server/server.go b/pkg/server/server.go index ea717ebac9b7..21d6a4f2688e 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -228,6 +228,8 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { ) s.db = client.NewDB(s.txnCoordSender, s.clock) + storage.SetTransitioningMode(s.cfg.TransitioningMode) + active, renewal := storage.NodeLivenessDurations( storage.RaftElectionTimeout(s.cfg.RaftTickInterval, s.cfg.RaftElectionTimeoutTicks)) s.nodeLiveness = storage.NewNodeLiveness( @@ -680,9 +682,7 @@ func (s *Server) Start(ctx context.Context) error { return errors.Wrap(err, "failed to create {raft,}engines") } s.stopper.AddCloser(&s.engines) - if storage.TransitioningRaftStorage || storage.EnabledRaftStorage { - s.stopper.AddCloser(&s.raftEngines) - } + s.stopper.AddCloser(&s.raftEngines) // We might have to sleep a bit to protect against this node producing non- // 
monotonic timestamps. Before restarting, its clock might have been driven diff --git a/pkg/server/server_test.go b/pkg/server/server_test.go index 259d8ee5cd60..976f220b5774 100644 --- a/pkg/server/server_test.go +++ b/pkg/server/server_test.go @@ -437,6 +437,8 @@ func TestSystemConfigGossip(t *testing.T) { func TestOfficializeAddr(t *testing.T) { defer leaktest.AfterTest(t)() + t.Skip("TODO(irfansharif): Remove this, local hostname problem") + host, err := os.Hostname() if err != nil { t.Fatal(err) diff --git a/pkg/storage/client_raft_test.go b/pkg/storage/client_raft_test.go index 10a30ab348c6..f7f4d8da67aa 100644 --- a/pkg/storage/client_raft_test.go +++ b/pkg/storage/client_raft_test.go @@ -76,11 +76,8 @@ func TestStoreRecoverFromEngine(t *testing.T) { defer engineStopper.Stop(context.TODO()) eng := engine.NewInMem(roachpb.Attributes{}, 1<<20) engineStopper.AddCloser(eng) - raftEng := eng - if storage.TransitioningRaftStorage || storage.EnabledRaftStorage { - raftEng = engine.NewInMem(roachpb.Attributes{}, 1<<20) - engineStopper.AddCloser(raftEng) - } + raftEng := engine.NewInMem(roachpb.Attributes{}, 1<<20) + engineStopper.AddCloser(raftEng) var rangeID2 roachpb.RangeID get := func(store *storage.Store, rangeID roachpb.RangeID, key roachpb.Key) int64 { @@ -173,11 +170,8 @@ func TestStoreRecoverWithErrors(t *testing.T) { storeCfg.TestingKnobs.DisableSplitQueue = true eng := engine.NewInMem(roachpb.Attributes{}, 1<<20) defer eng.Close() - raftEng := eng - if storage.TransitioningRaftStorage || storage.EnabledRaftStorage { - raftEng = engine.NewInMem(roachpb.Attributes{}, 10<<20) - defer raftEng.Close() - } + raftEng := engine.NewInMem(roachpb.Attributes{}, 10<<20) + defer raftEng.Close() numIncrements := 0 diff --git a/pkg/storage/client_replica_test.go b/pkg/storage/client_replica_test.go index ffb7ca974a45..d7d5de21e583 100644 --- a/pkg/storage/client_replica_test.go +++ b/pkg/storage/client_replica_test.go @@ -215,11 +215,8 @@ func TestTxnPutOutOfOrder(t *testing.T) { } eng := engine.NewInMem(roachpb.Attributes{}, 10<<20) stopper.AddCloser(eng) - raftEng := eng - if storage.TransitioningRaftStorage || storage.EnabledRaftStorage { - raftEng = engine.NewInMem(roachpb.Attributes{}, 10<<20) - stopper.AddCloser(raftEng) - } + raftEng := engine.NewInMem(roachpb.Attributes{}, 10<<20) + stopper.AddCloser(raftEng) store := createTestStoreWithEngine(t, eng, raftEng, diff --git a/pkg/storage/client_split_test.go b/pkg/storage/client_split_test.go index 1ec68f53fbac..708f700584af 100644 --- a/pkg/storage/client_split_test.go +++ b/pkg/storage/client_split_test.go @@ -1225,7 +1225,6 @@ func TestSplitSnapshotRace_SplitWins(t *testing.T) { // split, so it still has a conflicting range. func TestSplitSnapshotRace_SnapshotWins(t *testing.T) { defer leaktest.AfterTest(t)() - t.Skip() runSetupSplitSnapshotRace(t, func(mtc *multiTestContext, leftKey, rightKey roachpb.Key) { // Bring the right range up first. 
for i := 3; i <= 5; i++ { diff --git a/pkg/storage/client_test.go b/pkg/storage/client_test.go index af73a585638f..e2d62184322a 100644 --- a/pkg/storage/client_test.go +++ b/pkg/storage/client_test.go @@ -94,12 +94,8 @@ func createTestStoreWithConfig( ) *storage.Store { eng := engine.NewInMem(roachpb.Attributes{}, 10<<20) stopper.AddCloser(eng) - - var raftEng engine.Engine = eng - if storage.TransitioningRaftStorage || storage.EnabledRaftStorage { - raftEng = engine.NewInMem(roachpb.Attributes{}, 10<<20) - stopper.AddCloser(raftEng) - } + raftEng := engine.NewInMem(roachpb.Attributes{}, 10<<20) + stopper.AddCloser(raftEng) return createTestStoreWithEngine(t, eng, raftEng, true, storeCfg, stopper) } @@ -697,11 +693,7 @@ func (m *multiTestContext) addStore(idx int) { var needBootstrap bool if len(m.engines) > idx { eng = m.engines[idx] - - raftEng = eng - if storage.TransitioningRaftStorage || storage.EnabledRaftStorage { - raftEng = m.raftEngines[idx] - } + raftEng = m.raftEngines[idx] } else { engineStopper := stop.NewStopper() m.engineStoppers = append(m.engineStoppers, engineStopper) @@ -709,15 +701,12 @@ func (m *multiTestContext) addStore(idx int) { engineStopper.AddCloser(eng) m.engines = append(m.engines, eng) - raftEng = eng - if storage.TransitioningRaftStorage || storage.EnabledRaftStorage { - raftEngineStopper := stop.NewStopper() - m.raftEngineStoppers = append(m.raftEngineStoppers, raftEngineStopper) - raftEng = engine.NewInMem(roachpb.Attributes{}, 1<<20) - raftEngineStopper.AddCloser(raftEng) - } - + raftEngineStopper := stop.NewStopper() + m.raftEngineStoppers = append(m.raftEngineStoppers, raftEngineStopper) + raftEng = engine.NewInMem(roachpb.Attributes{}, 1<<20) + raftEngineStopper.AddCloser(raftEng) m.raftEngines = append(m.raftEngines, raftEng) + needBootstrap = true } grpcServer := rpc.NewServer(m.rpcContext) @@ -1001,7 +990,6 @@ func (m *multiTestContext) changeReplicasLocked( ) (roachpb.ReplicaID, error) { ctx := context.TODO() startKey := m.findStartKeyLocked(rangeID) - log.Infof(context.TODO(), "skey: %v", startKey) // Perform a consistent read to get the updated range descriptor (as // opposed to just going to one of the stores), to make sure we have diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go index 7765f8f9be97..a8d074e3a183 100644 --- a/pkg/storage/replica.go +++ b/pkg/storage/replica.go @@ -652,7 +652,7 @@ func (r *Replica) initRaftMuLockedReplicaMuLocked( } r.rangeStr.store(0, r.mu.state.Desc) - r.mu.lastIndex, err = r.mu.stateLoader.loadLastIndex(ctx, r.store.Engine()) + r.mu.lastIndex, err = r.mu.stateLoader.loadLastIndex(ctx, r.store.Engine(), r.store.RaftEngine()) if err != nil { return err } @@ -702,11 +702,8 @@ func (r *Replica) destroyDataRaftMuLocked( batch := r.store.Engine().NewWriteOnlyBatch() defer batch.Close() - raftBatch := batch - if TransitioningRaftStorage || EnabledRaftStorage { - raftBatch = r.store.RaftEngine().NewWriteOnlyBatch() - defer raftBatch.Close() - } + raftBatch := r.store.RaftEngine().NewWriteOnlyBatch() + defer raftBatch.Close() // NB: this uses the local descriptor instead of the consistent one to match // the data on disk. 
@@ -729,10 +726,8 @@ func (r *Replica) destroyDataRaftMuLocked( return err } - if TransitioningRaftStorage || EnabledRaftStorage { - if err := raftBatch.Commit(false); err != nil { - return err - } + if err := raftBatch.Commit(true); err != nil { + return err } commitTime := timeutil.Now() @@ -2338,11 +2333,8 @@ func (r *Replica) executeReadOnlyBatch( readOnlyEng := r.store.Engine().NewReadOnly() defer readOnlyEng.Close() - readOnlyRaftEng := readOnlyEng - if TransitioningRaftStorage || EnabledRaftStorage { - readOnlyRaftEng = r.store.RaftEngine().NewReadOnly() - defer readOnlyRaftEng.Close() - } + readOnlyRaftEng := r.store.RaftEngine().NewReadOnly() + defer readOnlyRaftEng.Close() br, result, pErr = evaluateBatch(ctx, storagebase.CmdIDKey(""), readOnlyEng, readOnlyRaftEng, rec, nil, ba) if intents := result.Local.detachIntents(); len(intents) > 0 { @@ -3164,7 +3156,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked( return stats, err } - if lastIndex, err = r.raftMu.stateLoader.loadLastIndex(ctx, r.store.Engine()); err != nil { + if lastIndex, err = r.raftMu.stateLoader.loadLastIndex(ctx, r.store.Engine(), r.store.RaftEngine()); err != nil { return stats, err } // We refresh pending commands after applying a snapshot because this @@ -3184,19 +3176,13 @@ func (r *Replica) handleRaftReadyRaftMuLocked( // which passes the reads through to the underlying DB. batch := r.store.Engine().NewWriteOnlyBatch() defer batch.Close() - - raftBatch := batch + raftBatch := r.store.RaftEngine().NewWriteOnlyBatch() + defer raftBatch.Close() // We know that all of the writes from here forward will be to distinct keys. - writer := batch.Distinct() + distinctBatch := batch.Distinct() prevLastIndex := lastIndex - writerRaft := writer - - if TransitioningRaftStorage || EnabledRaftStorage { - raftBatch = r.store.RaftEngine().NewWriteOnlyBatch() - defer raftBatch.Close() - writerRaft = raftBatch.Distinct() - } + distinctRaftBatch := raftBatch.Distinct() if len(rd.Entries) > 0 { // All of the entries are appended to distinct keys, returning a new @@ -3207,8 +3193,8 @@ func (r *Replica) handleRaftReadyRaftMuLocked( } if lastIndex, raftLogSize, err = r.append( ctx, - writer, - writerRaft, + distinctBatch, + distinctRaftBatch, lastIndex, raftLogSize, thinEntries, @@ -3218,28 +3204,26 @@ func (r *Replica) handleRaftReadyRaftMuLocked( } if !raft.IsEmptyHardState(rd.HardState) { - if TransitioningRaftStorage { - if err := r.raftMu.stateLoader.setHardState(ctx, writer, rd.HardState); err != nil { + if err := r.raftMu.stateLoader.setHardState(ctx, distinctRaftBatch, rd.HardState); err != nil { + return stats, err + } + if transitioningRaftStorage { + if err := r.raftMu.stateLoader.setHardState(ctx, distinctBatch, rd.HardState); err != nil { return stats, err } } - if err := r.raftMu.stateLoader.setHardState(ctx, writerRaft, rd.HardState); err != nil { - return stats, err - } - } - writer.Close() - if TransitioningRaftStorage || EnabledRaftStorage { - writerRaft.Close() } + distinctBatch.Close() + distinctRaftBatch.Close() // Synchronously commit the batch with the Raft log entries and Raft hard // state as we're promising not to lose this data. 
start := timeutil.Now() - if err := batch.Commit(syncRaftLog.Get() && rd.MustSync); err != nil { + if err := raftBatch.Commit(syncRaftLog.Get() && rd.MustSync); err != nil { return stats, err } - if TransitioningRaftStorage || EnabledRaftStorage { - if err := raftBatch.Commit(syncRaftLog.Get() && rd.MustSync); err != nil { + if transitioningRaftStorage { + if err := batch.Commit(syncRaftLog.Get() && rd.MustSync); err != nil { return stats, err } } @@ -4339,9 +4323,13 @@ func (r *Replica) acquireMergeLock( } // applyRaftCommand applies a raft command from the replicated log to the -// underlying state machine (i.e. the engine). When the state machine can not be -// updated, an error (which is likely a ReplicaCorruptionError) is returned and -// must be handled by the caller. +// underlying state machine, for most commands this only concerns the base +// engine. For the few commands that modify raft data (e.g. +// TruncateLogRequests truncating raft log entries) we apply these changes to +// the dedicated raft engine. +// +// When the state machine can not be updated, an error (which is likely a +// ReplicaCorruptionError) is returned and must be handled by the caller. func (r *Replica) applyRaftCommand( ctx context.Context, idKey storagebase.CmdIDKey, @@ -4368,32 +4356,97 @@ func (r *Replica) applyRaftCommand( oldRaftAppliedIndex, rResult.State.RaftAppliedIndex))) } - batch := r.store.Engine().NewWriteOnlyBatch() - defer batch.Close() - raftBatch := batch - if TransitioningRaftStorage || EnabledRaftStorage { - raftBatch = r.store.RaftEngine().NewWriteOnlyBatch() - defer raftBatch.Close() + // What if upstream of raft we're running an older version without the + // dedicated raft storage engine changes? Raft data (log entries, HardState + // last index, etc.) are still being propagated via WriteBatch.Data (intended + // to be applied to the single base engine). If we're in + // transitioningRaftStorage mode we need to ensure that this raft data is + // also copied over to the new engine. + // We can tell we're running an older version upstream by checking if + // WriteBatch.RaftData == nil. + preRaftStorageVersionUpstream := writeBatch != nil && writeBatch.RaftData == nil + if !transitioningRaftStorage && preRaftStorageVersionUpstream { + // Sanity check: We're only allowed to move past transitioning mode + // once all the nodes in the cluster are in transitioning mode. See + // #16809. + panic("mixed version cluster running without transitioning mode") + } + + var batch engine.Batch + if transitioningRaftStorage && preRaftStorageVersionUpstream { + // We will need to copy over raft data from this batch. + batch = r.store.Engine().NewBatch() + } else { + batch = r.store.Engine().NewWriteOnlyBatch() } + defer batch.Close() + + raftBatch := r.store.RaftEngine().NewWriteOnlyBatch() + defer raftBatch.Close() if writeBatch != nil { if err := batch.ApplyBatchRepr(writeBatch.Data, false); err != nil { return enginepb.MVCCStats{}, roachpb.NewError(NewReplicaCorruptionError( - errors.Wrap(err, "unable to apply WriteBatch"))) - } - - if TransitioningRaftStorage || EnabledRaftStorage { - // TODO(irfansharif): Is it ever the case that we have an empty - // WriteBatch.RaftData but non-empty WriteBatch.Data? If - // so we could/should avoid initializing/operating on batchRaft. - // What if upstream we have an older version without these changes? 
- // Raft data is still being propagated via WriteBatch.Data, if - // we're in TransitioningRaftStorage mode we should ensure that - // data (log entries and HardState) is copied over to the new - // engine. + errors.Wrap(err, "unable to apply WriteBatch.Data to eng"))) + } + + // TODO(irfansharif): For most commands we do not have any + // modifications pertaining to the raft engine, we only do for the + // subset of commands that modify raft data through raft (such as log + // entries truncated via TruncateLogRequests). Naively one would assume + // that in these cases WriteBatch.RaftData == nil but is in fact a + // zeroed byte slice. Initializing raft engine batches and committing + // this, albeit asynchronously and despite being a no-op, has a + // measurable cost. We should detect this 'empty' WriteBatch.RaftData + // and avoid doing as much work as possible. + + // We do the dumb thing here and simply copy over all of the relevant + // key ranges, not just the delta introduced in the current WriteBatch. + // We do this as we do not expect to be in transitioningRaftStorage + // mode for very long. + if transitioningRaftStorage && preRaftStorageVersionUpstream { + distinctRaftBatch := raftBatch.Distinct() + for _, keyRange := range makeRaftEngineKeyRanges(r.RangeID) { + scanFunc := func(kv roachpb.KeyValue) (bool, error) { + err := engine.MVCCBlindPut( + ctx, distinctRaftBatch, nil, + kv.Key, + hlc.Timestamp{}, + kv.Value, + nil, /* txn */ + ) + if err != nil { + return true, err + } + return false, nil + } + _, err := engine.MVCCIterate( + ctx, r.store.Engine(), + keyRange.start.Key, + keyRange.end.Key, + hlc.Timestamp{}, + true, /* consistent */ + nil, /* txn */ + false, /* !reverse */ + scanFunc, + ) + if err != nil { + return enginepb.MVCCStats{}, roachpb.NewError(NewReplicaCorruptionError( + errors.Wrap(err, "unable to migrate cross versions"))) + } + } + distinctRaftBatch.Close() + } + if writeBatch.RaftData != nil { if err := raftBatch.ApplyBatchRepr(writeBatch.RaftData, false); err != nil { return enginepb.MVCCStats{}, roachpb.NewError(NewReplicaCorruptionError( - errors.Wrap(err, "unable to apply WriteBatch"))) + errors.Wrap(err, "unable to apply WriteBatch.RaftData to raft eng"))) + } + if transitioningRaftStorage { + if err := batch.ApplyBatchRepr(writeBatch.RaftData, false); err != nil { + return enginepb.MVCCStats{}, roachpb.NewError(NewReplicaCorruptionError( + errors.Wrap(err, "unable to apply WriteBatch.RaftData to eng"))) + } } } } @@ -4432,16 +4485,66 @@ func (r *Replica) applyRaftCommand( writer.Close() start := timeutil.Now() + // Previously we never explicitly synchronized raft command applications to + // disk. This was fine as far as correctness was concerned and was so for + // obvious performance reasons. Now that we store raft log entries in it's + // own dedicated engine we need to be more careful when dealing with log + // truncation requests (rResult.RaftLogDelta != nil is an easy check for + // this). + // Consider the following scenario, entirely plausible, if we did not + // synchronize to disk here: + // - Log truncation requests delete log entries from the raft engine + // (unsynchronized) + // - The new TruncatedState, still stored in the base engine at the time of + // writing, is written out (unsynchronized) + // - RocksDB carries out compactions for the raft engine thus effectively + // persisting the deletion of log entries. 
At this point the updated + // TruncatedState is still only memory-resident. + // If we crash at this point, on restart we will fail to load the updated + // TruncatedState and fall back to an older version (if any). Attempting to + // replay the logs from this point on is no longer possible; we've deleted + // the entries. + // + // Naively one would assume the underlying problem here is the fact that + // TruncatedState is stored in the base engine, but there's another issue. + // In addition to ensuring that we synchronize TruncatedState and the log + // entry deletions at the same time, we need to ensure that the application of + // a truncated log entry has been persisted before persisting the + // deletion of the very same log entry, i.e. for Put(k,v) the base RocksDB + // engine must have synced (k,v) before truncating the operation's + // corresponding log entry. Consider what happens if we don't: + // - We receive a Put(k,v) command; downstream of raft we don't synchronize + // the base engine after updating its state (for obvious performance + // reasons, we don't need to for correctness as we can simply replay from + // the log in the event we crash) + // - A log truncation request deletes the log entry corresponding to the earlier + // Put(k,v) command; this is synchronized for the reasons above + // Note that the base engine's updated state is still only memory-resident. + // If we crash at this point, on restart we will replay the remaining raft + // log entries. Because the log entry corresponding to the Put(k,v) command + // has been deleted, we never apply this change (the earlier modification + // was never persisted), thus effectively losing writes. Simply syncing both + // engines during log truncations prevents this from occurring. + // + // TODO(irfansharif): There's still good reason to move TruncatedState to + // the dedicated raft engine; see discussion in #16809. + // TODO(irfansharif): This is a hotspot; log truncations happen fairly + // often and here we flush our base engine to disk. We need to re-evaluate how + // aggressively we truncate the raft log in light of this, possibly moving + // it downstream of raft. + // + // NB: Though exceedingly unlikely in practice, it's important that we + // flush the base engine prior to the raft engine. If we did so in the + // opposite order and crashed immediately after flushing the raft engine + // and right before the base, we would have the same issue as above. isLogTruncationRequest := rResult.RaftLogDelta != nil if err := batch.Commit(isLogTruncationRequest); err != nil { return enginepb.MVCCStats{}, roachpb.NewError(NewReplicaCorruptionError( errors.Wrap(err, "could not commit batch"))) } - if TransitioningRaftStorage || EnabledRaftStorage { - if err := raftBatch.Commit(true); err != nil { - return enginepb.MVCCStats{}, roachpb.NewError(NewReplicaCorruptionError( - errors.Wrap(err, "could not commit raft batch"))) - } + if err := raftBatch.Commit(isLogTruncationRequest); err != nil { + return enginepb.MVCCStats{}, roachpb.NewError(NewReplicaCorruptionError( + errors.Wrap(err, "could not commit raft batch"))) } elapsed := timeutil.Since(start) r.store.metrics.RaftCommandCommitLatency.RecordValue(elapsed.Nanoseconds()) @@ -4498,9 +4601,7 @@ func (r *Replica) evaluateProposalInner( // a WriteBatch to signal to the caller that we fail-fast this // proposal.
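The crash-ordering argument in the applyRaftCommand comment above can be condensed into a toy model. This is a standalone sketch and not code from the patch; it only shows why the base engine must be durable before the log truncation that drops the corresponding entry is.

package main

import "fmt"

// crashAfterTruncation models a crash that happens right after a raft log
// truncation has been made durable. The applied Put lives in the base
// engine; its log entry lives in the raft engine and was synced on append.
func crashAfterTruncation(baseSyncedBeforeTruncation bool) (valueDurable, entryDurable bool) {
	valueDurable = false // Put("a", "1") was applied but not synced
	entryDurable = true  // the corresponding log entry was synced on append

	if baseSyncedBeforeTruncation {
		valueDurable = true // base engine flushed before the truncation commits
	}
	entryDurable = false // the truncation (or a later compaction) is now durable

	// Crash: only durable state survives. With neither the value nor the
	// entry to replay, the write is lost.
	return valueDurable, entryDurable
}

func main() {
	for _, syncFirst := range []bool{false, true} {
		v, e := crashAfterTruncation(syncFirst)
		fmt.Printf("base synced before truncation=%v -> value durable=%v, entry replayable=%v\n",
			syncFirst, v, e)
	}
}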
batch.Close() - if TransitioningRaftStorage || EnabledRaftStorage { - raftBatch.Close() - } + raftBatch.Close() batch, raftBatch = nil, nil // Restore the original txn's Writing bool if pd.Err specifies // a transaction. @@ -4520,14 +4621,13 @@ func (r *Replica) evaluateProposalInner( Data: batch.Repr(), RaftData: raftBatch.Repr(), } + // TODO(tschottdorf): could keep this open and commit as the proposal // applies, saving work on the proposer. Take care to discard batches // properly whenever the command leaves `r.mu.proposals` without coming // back. batch.Close() - if TransitioningRaftStorage || EnabledRaftStorage { - raftBatch.Close() - } + raftBatch.Close() return result } @@ -4599,10 +4699,7 @@ func (r *Replica) evaluateTxnWriteBatch( // If all writes occurred at the intended timestamp, we've succeeded on the fast path. batch := r.store.Engine().NewBatch() - raftBatch := batch - if TransitioningRaftStorage || EnabledRaftStorage { - raftBatch = r.store.RaftEngine().NewBatch() - } + raftBatch := r.store.RaftEngine().NewBatch() if raceEnabled && spans != nil { batch = makeSpanSetBatch(batch, spans) raftBatch = makeSpanSetBatch(raftBatch, spans) @@ -4620,12 +4717,8 @@ func (r *Replica) evaluateTxnWriteBatch( clonedTxn.Status = roachpb.ABORTED batch.Close() batch = r.store.Engine().NewBatch() - if TransitioningRaftStorage || EnabledRaftStorage { - raftBatch.Close() - raftBatch = r.store.RaftEngine().NewBatch() - } else { - raftBatch = batch - } + raftBatch.Close() + raftBatch = r.store.RaftEngine().NewBatch() ms = enginepb.MVCCStats{} } else { // Run commit trigger manually. @@ -4646,9 +4739,7 @@ func (r *Replica) evaluateTxnWriteBatch( } batch.Close() - if TransitioningRaftStorage || EnabledRaftStorage { - raftBatch.Close() - } + raftBatch.Close() ms = enginepb.MVCCStats{} // Handle the case of a required one phase commit transaction. @@ -4666,10 +4757,7 @@ func (r *Replica) evaluateTxnWriteBatch( } batch := r.store.Engine().NewBatch() - raftBatch := batch - if TransitioningRaftStorage || EnabledRaftStorage { - raftBatch = r.store.RaftEngine().NewBatch() - } + raftBatch := r.store.RaftEngine().NewBatch() if raceEnabled && spans != nil { batch = makeSpanSetBatch(batch, spans) raftBatch = makeSpanSetBatch(raftBatch, spans) diff --git a/pkg/storage/replica_command.go b/pkg/storage/replica_command.go index 929b93f7c89e..f036416dd9bd 100644 --- a/pkg/storage/replica_command.go +++ b/pkg/storage/replica_command.go @@ -1895,10 +1895,9 @@ func evalTruncateLog( hlc.Timestamp{}, nil /* txn */, true /* returnKeys */); err != nil { return EvalResult{}, err } - - if TransitioningRaftStorage { + if transitioningRaftStorage { // We pass in a nil MVCCStats so to not account for this delta in - // RaftLogSize. In TransitioningRaftStorage mode log truncations are + // RaftLogSize. In transitioningRaftStorage mode log truncations are // based entirely on the size of the raft log stored in the raft // specific RocksDB instance. 
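The mirrored MVCCDeleteRange that follows, like the mirrored puts in Replica.append later in this diff, is an instance of the transitioning-mode double-write pattern: always write to the dedicated raft engine, and mirror into the base engine only while transitioning. A minimal standalone sketch, with maps standing in for engines and illustrative names:

package main

import "fmt"

type kv map[string]string

// putRaftKey writes to the dedicated raft engine and, only while in
// transitioning mode, mirrors the write into the base engine so an old
// binary (or a rollback) still finds the raft data it expects.
func putRaftKey(raftEng, baseEng kv, transitioning bool, k, v string) {
	raftEng[k] = v
	if transitioning {
		baseEng[k] = v
	}
}

// deleteRaftKey mirrors deletions (e.g. log truncations) the same way.
func deleteRaftKey(raftEng, baseEng kv, transitioning bool, k string) {
	delete(raftEng, k)
	if transitioning {
		delete(baseEng, k)
	}
}

func main() {
	raftEng, baseEng := kv{}, kv{}
	putRaftKey(raftEng, baseEng, true, "r1/log/5", "entry-5")
	fmt.Println("mirrored:", baseEng["r1/log/5"] == raftEng["r1/log/5"])
	deleteRaftKey(raftEng, baseEng, true, "r1/log/5")
	fmt.Println("both cleared:", len(raftEng) == 0 && len(baseEng) == 0)
}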
if _, _, _, err := engine.MVCCDeleteRange(ctx, batch, nil, start, end, math.MaxInt64, /* max */ @@ -3297,11 +3296,9 @@ func mergeTrigger( if _, _, _, err := engine.MVCCDeleteRange(ctx, batch, nil, localRangeIDKeyPrefix, localRangeIDKeyPrefix.PrefixEnd(), math.MaxInt64, hlc.Timestamp{}, nil, false); err != nil { return EvalResult{}, errors.Errorf("cannot remove range metadata %s", err) } - if TransitioningRaftStorage || EnabledRaftStorage { - localRangeIDUnreplicatedPrefix := keys.MakeRangeIDUnreplicatedPrefix(rightRangeID) + for _, keyRange := range makeRaftEngineKeyRanges(rightRangeID) { if _, _, _, err := engine.MVCCDeleteRange(ctx, raftBatch, nil, - localRangeIDUnreplicatedPrefix, localRangeIDUnreplicatedPrefix.PrefixEnd(), - math.MaxInt64, hlc.Timestamp{}, nil, false); err != nil { + keyRange.start.Key, keyRange.end.Key, math.MaxInt64, hlc.Timestamp{}, nil, false); err != nil { return EvalResult{}, errors.Errorf("cannot remove range metadata %s", err) } } diff --git a/pkg/storage/replica_data_iter.go b/pkg/storage/replica_data_iter.go index de74caba2a3f..1d1c1d0fb9b1 100644 --- a/pkg/storage/replica_data_iter.go +++ b/pkg/storage/replica_data_iter.go @@ -51,22 +51,30 @@ func makeReplicatedKeyRanges(d *roachpb.RangeDescriptor) []keyRange { return makeReplicaKeyRanges(d, keys.MakeRangeIDReplicatedPrefix) } -// makeRaftEngineKeyRanges returns two key ranges, one for the HardState and -// one for the raft log entries associated for the given range descriptor. -func makeRaftEngineKeyRanges(d *roachpb.RangeDescriptor) []keyRange { - hskey := keys.RaftHardStateKey(d.RangeID) - rlpkey := keys.RaftLogPrefix(d.RangeID) +// makeRaftEngineKeyRanges returns the three key ranges that are stored in the +// dedicated raft engine for the given Range. These include: +// - the HardState key range +// - the RaftLastIndex key range +// - the raft log entries associated for the given range descriptor +func makeRaftEngineKeyRanges(rangeID roachpb.RangeID) []keyRange { + hskey := keys.RaftHardStateKey(rangeID) + rlpkey := keys.RaftLogPrefix(rangeID) + likey := keys.RaftLastIndexKey(rangeID) + return []keyRange{ { start: engine.MakeMVCCMetadataKey(hskey), end: engine.MakeMVCCMetadataKey(hskey.PrefixEnd()), }, + { + start: engine.MakeMVCCMetadataKey(likey), + end: engine.MakeMVCCMetadataKey(likey.PrefixEnd()), + }, { start: engine.MakeMVCCMetadataKey(rlpkey), end: engine.MakeMVCCMetadataKey(rlpkey.PrefixEnd()), }, } - } // makeReplicaKeyRanges returns a slice of 3 key ranges. The last key range in diff --git a/pkg/storage/replica_raftstorage.go b/pkg/storage/replica_raftstorage.go index fc5b782f0236..34270ea0884c 100644 --- a/pkg/storage/replica_raftstorage.go +++ b/pkg/storage/replica_raftstorage.go @@ -60,9 +60,9 @@ var _ raft.Storage = (*replicaRaftStorage)(nil) // InitialState requires that r.mu is held. func (r *replicaRaftStorage) InitialState() (raftpb.HardState, raftpb.ConfState, error) { ctx := r.AnnotateCtx(context.TODO()) - // For uninitialized ranges, membership is unknown at this point. hs, err := r.mu.stateLoader.loadHardState(ctx, r.store.RaftEngine()) if raft.IsEmptyHardState(hs) || err != nil { + // For uninitialized ranges, membership is unknown at this point. 
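For reference, the shape of makeRaftEngineKeyRanges above, sketched with plain strings in a standalone program; the key layout shown is illustrative only, the real keys come from pkg/keys and engine.MakeMVCCMetadataKey.

package main

import "fmt"

type keyRange struct{ start, end string }

// prefixEnd is a crude stand-in for Key.PrefixEnd().
func prefixEnd(k string) string { return k + "\xff" }

// raftEngineKeyRanges mirrors the shape of makeRaftEngineKeyRanges: per
// range, the dedicated raft engine owns the HardState, the last index,
// and the raft log prefix.
func raftEngineKeyRanges(rangeID int64) []keyRange {
	hs := fmt.Sprintf("/Local/RangeID/%d/u/RaftHardState", rangeID)
	li := fmt.Sprintf("/Local/RangeID/%d/u/RaftLastIndex", rangeID)
	logPrefix := fmt.Sprintf("/Local/RangeID/%d/u/RaftLog/", rangeID)
	out := make([]keyRange, 0, 3)
	for _, k := range []string{hs, li, logPrefix} {
		out = append(out, keyRange{start: k, end: prefixEnd(k)})
	}
	return out
}

func main() {
	for _, kr := range raftEngineKeyRanges(7) {
		fmt.Printf("[%s, %s)\n", kr.start, kr.end)
	}
}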
return raftpb.HardState{}, raftpb.ConfState{}, err } var cs raftpb.ConfState @@ -80,11 +80,8 @@ func (r *replicaRaftStorage) InitialState() (raftpb.HardState, raftpb.ConfState, func (r *replicaRaftStorage) Entries(lo, hi, maxBytes uint64) ([]raftpb.Entry, error) { snap := r.store.NewSnapshot() defer snap.Close() - raftEngSnap := snap - if TransitioningRaftStorage || EnabledRaftStorage { - raftEngSnap = r.store.NewRaftEngineSnapshot() - defer raftEngSnap.Close() - } + raftEngSnap := r.store.NewRaftEngineSnapshot() + defer raftEngSnap.Close() ctx := r.AnnotateCtx(context.TODO()) return entries(ctx, snap, raftEngSnap, r.RangeID, r.store.raftEntryCache, r.raftMu.sideloaded, lo, hi, maxBytes) } @@ -193,8 +190,7 @@ func entries( } // Was the missing index after the last index? - // TODO(irfansharif): Explore writing last index to raft engine. - lastIndex, err := loadLastIndex(ctx, e, rangeID) + lastIndex, err := loadLastIndex(ctx, e, re, rangeID) if err != nil { return nil, err } @@ -246,11 +242,8 @@ func iterateEntries( func (r *replicaRaftStorage) Term(i uint64) (uint64, error) { snap := r.store.NewSnapshot() defer snap.Close() - raftEngSnap := snap - if TransitioningRaftStorage || EnabledRaftStorage { - raftEngSnap = r.store.NewRaftEngineSnapshot() - defer raftEngSnap.Close() - } + raftEngSnap := r.store.NewRaftEngineSnapshot() + defer raftEngSnap.Close() ctx := r.AnnotateCtx(context.TODO()) return term(ctx, snap, raftEngSnap, r.RangeID, r.store.raftEntryCache, i) } @@ -387,10 +380,7 @@ func (r *Replica) GetSnapshot(ctx context.Context, snapType string) (*OutgoingSn defer sp.Finish() snap := r.store.NewSnapshot() log.Eventf(ctx, "new engine snapshot for replica %s", r) - raftEngSnap := snap - if TransitioningRaftStorage || EnabledRaftStorage { - raftEngSnap = r.store.NewRaftEngineSnapshot() - } + raftEngSnap := r.store.NewRaftEngineSnapshot() // Delegate to a static function to make sure that we do not depend // on any indirect calls to r.store.Engine() (or other in-memory @@ -436,9 +426,7 @@ type OutgoingSnapshot struct { func (s *OutgoingSnapshot) Close() { s.Iter.Close() s.EngineSnap.Close() - if TransitioningRaftStorage || EnabledRaftStorage { - s.RaftEngineSnap.Close() - } + s.RaftEngineSnap.Close() } // IncomingSnapshot contains the data for an incoming streaming snapshot message. @@ -560,28 +548,32 @@ func (r *Replica) append( return 0, 0, err } value.InitChecksum(key) - var err error if ent.Index > prevLastIndex { - err = engine.MVCCBlindPut(ctx, raftBatch, &diff, key, hlc.Timestamp{}, value, nil /* txn */) + if err := engine.MVCCBlindPut(ctx, raftBatch, &diff, key, hlc.Timestamp{}, + value, nil /* txn */); err != nil { + return 0, 0, err + } } else { - err = engine.MVCCPut(ctx, raftBatch, &diff, key, hlc.Timestamp{}, value, nil /* txn */) - } - if err != nil { - return 0, 0, err + if err := engine.MVCCPut(ctx, raftBatch, &diff, key, hlc.Timestamp{}, value, + nil /* txn */); err != nil { + return 0, 0, err + } } - if TransitioningRaftStorage { - var err error + if transitioningRaftStorage { if ent.Index > prevLastIndex { // We pass in a nil MVCCStats so to not account for this delta - // in raftLogSize. In TransitioningRaftStorage mode log truncations + // in raftLogSize. In transitioningRaftStorage mode log truncations // are based entirely on the size of the raft log stored in the // raft specific RocksDB instance. 
- err = engine.MVCCBlindPut(ctx, batch, nil, key, hlc.Timestamp{}, value, nil /* txn */) + if err := engine.MVCCBlindPut(ctx, batch, nil, key, hlc.Timestamp{}, + value, nil /* txn */); err != nil { + return 0, 0, err + } } else { - err = engine.MVCCPut(ctx, batch, nil, key, hlc.Timestamp{}, value, nil /* txn */) - } - if err != nil { - return 0, 0, err + if err := engine.MVCCPut(ctx, batch, nil, key, hlc.Timestamp{}, + value, nil /* txn */); err != nil { + return 0, 0, err + } } } } @@ -591,24 +583,32 @@ func (r *Replica) append( for i := lastIndex + 1; i <= prevLastIndex; i++ { // Note that the caller is in charge of deleting any sideloaded payloads // (which they must only do *after* the batch has committed). - err := engine.MVCCDelete(ctx, raftBatch, &diff, r.raftMu.stateLoader.RaftLogKey(i), - hlc.Timestamp{}, nil /* txn */) - if err != nil { + if err := engine.MVCCDelete(ctx, raftBatch, &diff, r.raftMu.stateLoader.RaftLogKey(i), + hlc.Timestamp{}, nil /* txn */); err != nil { return 0, 0, err } - if TransitioningRaftStorage { - err := engine.MVCCDelete(ctx, batch, nil, r.raftMu.stateLoader.RaftLogKey(i), - hlc.Timestamp{}, nil /* txn */) - if err != nil { + if transitioningRaftStorage { + // We pass in a nil MVCCStats so to not account for this delta in + // RaftLogSize. In transitioningRaftStorage mode log truncations are + // based entirely on the size of the raft log stored in the raft + // specific RocksDB instance. + if err := engine.MVCCDelete(ctx, batch, nil, r.raftMu.stateLoader.RaftLogKey(i), + hlc.Timestamp{}, nil /* txn */); err != nil { return 0, 0, err } } } - if err := r.raftMu.stateLoader.setLastIndex(ctx, batch, lastIndex); err != nil { + if err := r.raftMu.stateLoader.setLastIndex(ctx, raftBatch, lastIndex); err != nil { return 0, 0, err } + if transitioningRaftStorage { + if err := r.raftMu.stateLoader.setLastIndex(ctx, batch, lastIndex); err != nil { + return 0, 0, err + } + } + raftLogSize := prevRaftLogSize + diff.SysBytes return lastIndex, raftLogSize, nil @@ -661,7 +661,9 @@ func clearRangeData( const metadataRanges = 2 for i, keyRange := range makeAllKeyRanges(desc) { // The metadata ranges have a relatively small number of keys making usage - // of range tombstones (as created by ClearRange) a pessimization. + // of range tombstones (as created by ClearRange) a pessimization, we + // use ClearIterRange instead. This also holds true for when we clear + // raft specific range data (log entries, HardState, etc.). var err error if i < metadataRanges { err = batch.ClearIterRange(iter, keyRange.start, keyRange.end) @@ -673,18 +675,16 @@ func clearRangeData( } } - if TransitioningRaftStorage || EnabledRaftStorage { - raftIter := raftEng.NewIterator(false) - defer raftIter.Close() + raftIter := raftEng.NewIterator(false) + defer raftIter.Close() - for _, keyRange := range makeRaftEngineKeyRanges(desc) { - // The metadata ranges have a relatively small number of keys making usage - // of range tombstones (as created by ClearRange) a pessimization. - if err := raftBatch.ClearIterRange(raftIter, keyRange.start, keyRange.end); err != nil { - return err - } + for _, keyRange := range makeRaftEngineKeyRanges(desc.RangeID) { + if err := raftBatch.ClearIterRange(raftIter, keyRange.start, keyRange.end); err != nil { + return err } - + // We do not need to explicitly clear out raft data out of the base + // engine when in transitioningRaftStorage mode as we already do so + // above when clearing out all key ranges. 
} return nil } @@ -759,11 +759,8 @@ func (r *Replica) applySnapshot( // reads from the batch. batch := r.store.Engine().NewWriteOnlyBatch() defer batch.Close() - raftBatch := batch - if TransitioningRaftStorage || EnabledRaftStorage { - raftBatch = r.store.RaftEngine().NewWriteOnlyBatch() - defer raftBatch.Close() - } + raftBatch := r.store.RaftEngine().NewWriteOnlyBatch() + defer raftBatch.Close() // Delete everything in the range and recreate it from the snapshot. // We need to delete any old Raft log entries here because any log entries @@ -784,10 +781,7 @@ func (r *Replica) applySnapshot( // The log entries are all written to distinct keys so we can use a // distinct batch. distinctBatch := batch.Distinct() - distinctBatchRaft := distinctBatch - if TransitioningRaftStorage || EnabledRaftStorage { - distinctBatchRaft = raftBatch.Distinct() - } + distinctRaftBatch := raftBatch.Distinct() stats.batch = timeutil.Now() logEntries := make([]raftpb.Entry, len(inSnap.LogEntries)) @@ -804,7 +798,7 @@ func (r *Replica) applySnapshot( _, raftLogSize, err = r.append( ctx, distinctBatch, - distinctBatchRaft, + distinctRaftBatch, 0, raftLogSize, thinEntries, @@ -821,22 +815,20 @@ func (r *Replica) applySnapshot( // say it isn't going to accept a snapshot which is identical to the current // state? if !raft.IsEmptyHardState(hs) { - if TransitioningRaftStorage { + if err := r.raftMu.stateLoader.setHardState(ctx, distinctRaftBatch, hs); err != nil { + return errors.Wrapf(err, "unable to persist HardState %+v", &hs) + } + if transitioningRaftStorage { if err := r.raftMu.stateLoader.setHardState(ctx, distinctBatch, hs); err != nil { return errors.Wrapf(err, "unable to persist HardState %+v", &hs) } } - if err := r.raftMu.stateLoader.setHardState(ctx, distinctBatchRaft, hs); err != nil { - return errors.Wrapf(err, "unable to persist HardState %+v", &hs) - } } // We need to close the distinct batch and start using the normal batch for // the read below. distinctBatch.Close() - if TransitioningRaftStorage || EnabledRaftStorage { - distinctBatchRaft.Close() - } + distinctRaftBatch.Close() // As outlined above, last and applied index are the same after applying // the snapshot (i.e. the snapshot has no uncommitted tail). @@ -844,15 +836,37 @@ func (r *Replica) applySnapshot( log.Fatalf(ctx, "snapshot RaftAppliedIndex %d doesn't match its metadata index %d", s.RaftAppliedIndex, snap.Metadata.Index) } + // We need to write out the lastIndex to the dedicated raft engine. This + // isn't explicitly serialized upstream when constructing the snapshot but + // we can reconstruct it here from the log entries and the TruncatedState. + var lastIndex uint64 + if len(thinEntries) != 0 { + lastIndex = thinEntries[len(thinEntries)-1].Index + } else { + // The log is empty, we fall back to the TruncatedState to determine + // what is effectively the lastIndex. + lastEnt := s.TruncatedState + lastIndex = lastEnt.Index + } - // We've written Raft log entries, so we need to sync the WAL. - if err := batch.Commit(syncRaftLog.Get()); err != nil { + r.raftMu.stateLoader.setLastIndex(ctx, raftBatch, lastIndex) + if transitioningRaftStorage { + r.raftMu.stateLoader.setLastIndex(ctx, batch, lastIndex) + } + + // We've written Raft log entries and the HardState, so we need to sync the WAL. 
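The lastIndex reconstruction a few lines above reduces to a small helper. A standalone sketch, with entry standing in for raftpb.Entry, showing the two cases:

package main

import "fmt"

// entry stands in for raftpb.Entry; only the index matters here.
type entry struct{ Index uint64 }

// lastIndexFromSnapshot mirrors the reconstruction above: the snapshot does
// not carry lastIndex explicitly, so it is the index of the last log entry
// shipped with the snapshot, or the truncated state's index when the
// shipped log is empty.
func lastIndexFromSnapshot(entries []entry, truncatedIndex uint64) uint64 {
	if len(entries) != 0 {
		return entries[len(entries)-1].Index
	}
	return truncatedIndex
}

func main() {
	fmt.Println(lastIndexFromSnapshot([]entry{{Index: 11}, {Index: 12}, {Index: 13}}, 10)) // 13
	fmt.Println(lastIndexFromSnapshot(nil, 10))                                            // 10
}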
+ if err := raftBatch.Commit(syncRaftLog.Get()); err != nil { return err } - if TransitioningRaftStorage || EnabledRaftStorage { - if err := raftBatch.Commit(syncRaftLog.Get()); err != nil { - return err - } + // We need to flush out the writes to the base engine, consider what could + // happen if we didn't: + // - We receive an incoming snapshot with the log entries and all keys + // - The log entries are persisted to disk, the TruncatedState (addressed + // to the base engine) isn't + // If we crash at this point we restart with a trivial TruncatedState and + // have no way to bridge the gap. + if err := batch.Commit(syncRaftLog.Get()); err != nil { + return err } stats.commit = timeutil.Now() diff --git a/pkg/storage/replica_state.go b/pkg/storage/replica_state.go index e79913614a2d..7136814e1b6e 100644 --- a/pkg/storage/replica_state.go +++ b/pkg/storage/replica_state.go @@ -362,17 +362,17 @@ func (rsl replicaStateLoader) setMVCCStats( // updated through Raft. func loadLastIndex( - ctx context.Context, reader engine.Reader, rangeID roachpb.RangeID, + ctx context.Context, reader, raftEngReader engine.Reader, rangeID roachpb.RangeID, ) (uint64, error) { rsl := makeReplicaStateLoader(rangeID) - return rsl.loadLastIndex(ctx, reader) + return rsl.loadLastIndex(ctx, reader, raftEngReader) } func (rsl replicaStateLoader) loadLastIndex( - ctx context.Context, reader engine.Reader, + ctx context.Context, reader, raftEngReader engine.Reader, ) (uint64, error) { var lastIndex uint64 - v, _, err := engine.MVCCGet(ctx, reader, rsl.RaftLastIndexKey(), + v, _, err := engine.MVCCGet(ctx, raftEngReader, rsl.RaftLastIndexKey(), hlc.Timestamp{}, true /* consistent */, nil) if err != nil { return 0, err @@ -552,18 +552,23 @@ func writeInitialState( return enginepb.MVCCStats{}, err } - if TransitioningRaftStorage { + if err := rsl.synthesizeHardState(ctx, raftEng, s, oldHS); err != nil { + return enginepb.MVCCStats{}, err + } + if transitioningRaftStorage { if err := rsl.synthesizeHardState(ctx, eng, s, oldHS); err != nil { return enginepb.MVCCStats{}, err } } - if err := rsl.synthesizeHardState(ctx, raftEng, s, oldHS); err != nil { - return enginepb.MVCCStats{}, err - } - if err := rsl.setLastIndex(ctx, eng, s.TruncatedState.Index); err != nil { + if err := rsl.setLastIndex(ctx, raftEng, s.TruncatedState.Index); err != nil { return enginepb.MVCCStats{}, err } + if transitioningRaftStorage { + if err := rsl.setLastIndex(ctx, eng, s.TruncatedState.Index); err != nil { + return enginepb.MVCCStats{}, err + } + } return newMS, nil } diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go index feca1f3fc0cb..bc32ef4e6ae7 100644 --- a/pkg/storage/replica_test.go +++ b/pkg/storage/replica_test.go @@ -156,11 +156,8 @@ func (tc *testContext) StartWithStoreConfig(t testing.TB, stopper *stop.Stopper, stopper.AddCloser(tc.engine) } if tc.raftEngine == nil { - tc.raftEngine = tc.engine - if TransitioningRaftStorage || EnabledRaftStorage { - tc.raftEngine = engine.NewInMem(roachpb.Attributes{Attrs: []string{"mem", "raft"}}, 1<<20) - stopper.AddCloser(tc.raftEngine) - } + tc.raftEngine = engine.NewInMem(roachpb.Attributes{Attrs: []string{"mem", "raft"}}, 1<<20) + stopper.AddCloser(tc.raftEngine) } if tc.transport == nil { @@ -6084,13 +6081,11 @@ func TestEntries(t *testing.T) { repl.mu.Unlock() // Case 24: add a gap to the indexes. 
- if DisabledRaftStorage || TransitioningRaftStorage { - if err := engine.MVCCDelete(context.Background(), tc.store.Engine(), nil, keys.RaftLogKey(rangeID, indexes[6]), hlc.Timestamp{}, nil); err != nil { - t.Fatal(err) - } + if err := engine.MVCCDelete(context.Background(), tc.store.RaftEngine(), nil, keys.RaftLogKey(rangeID, indexes[6]), hlc.Timestamp{}, nil); err != nil { + t.Fatal(err) } - if TransitioningRaftStorage || EnabledRaftStorage { - if err := engine.MVCCDelete(context.Background(), tc.store.RaftEngine(), nil, keys.RaftLogKey(rangeID, indexes[6]), hlc.Timestamp{}, nil); err != nil { + if transitioningRaftStorage { + if err := engine.MVCCDelete(context.Background(), tc.store.Engine(), nil, keys.RaftLogKey(rangeID, indexes[6]), hlc.Timestamp{}, nil); err != nil { t.Fatal(err) } } @@ -7464,9 +7459,7 @@ func TestReplicaEvaluationNotTxnMutation(t *testing.T) { batch, raftBatch, _, _, _, pErr := tc.repl.evaluateTxnWriteBatch(ctx, makeIDKey(), ba, nil) defer batch.Close() - if TransitioningRaftStorage || EnabledRaftStorage { - defer raftBatch.Close() - } + defer raftBatch.Close() if pErr != nil { t.Fatal(pErr) } diff --git a/pkg/storage/store.go b/pkg/storage/store.go index e5e5e2d38a5d..3cbda61ee849 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -128,56 +128,14 @@ var storeSchedulerConcurrency = envutil.EnvOrDefaultInt( var enablePreVote = envutil.EnvOrDefaultBool( "COCKROACH_ENABLE_PREVOTE", false) -// We define three modes of operation during migrations across subsequent major -// versions. Changes introduced in a new major version can either be DISABLED, -// ENABLED or run under TRANSITION mode (this corresponds to a cluster running -// mixed versions, think rolling upgrades). -// -// Consider the example where we introduced a dedicated RocksDB instance for -// raft data where the following modes are used. Briefly, the major version -// with this feature stored raft data (log entries and raft HardState) in a -// new, dedicated RocksDB instance whereas the version prior stored it in the -// same instance storing all user-level keys. -// - DISABLED corresponded to using a single engine for both raft and the -// user-level keys, as before -// - TRANSITIONING corresponded to storing raft data on both engines -// interoperably in order to facilitate rolling migrations -// - ENABLED corresponded to storing raft data only in the dedicated raft -// engine -// -// NB: It should be safe to transition from DISABLED to TRANSITIONING and -// from TRANSITIONING to ENABLED (once all the nodes in the cluster are in -// TRANSITIONING mode). Likewise, to facilitate rollbacks, it should be safe to -// transition from ENABLED to TRANSITIONING and from TRANSITIONING to DISABLED -// (again, once all the nodes in the cluster are in TRANSITIONING mode). -const ( - DISABLED = "DISABLED" - TRANSITIONING = "TRANSITIONING" - ENABLED = "ENABLED" -) - -// TODO(irfansharif): Changing this to a cluster setting instead makes it -// easier to transition between TransitioningRaftStorage mode to -// EnabledRaftStorage mode via a user command, lest we restart all the nodes -// again. -var raftStorageMode = envutil.EnvOrDefaultString( - "COCKROACH_DEDICATED_RAFT_STORAGE", - TRANSITIONING, -) - -// DisabledRaftStorage mode preserves the behavior prior to the dedicated raft -// storage engine changes thus using a single RocksDB instance for both raft -// and user-level KV data. 
-var DisabledRaftStorage = raftStorageMode == DISABLED - -// TransitioningRaftStorage mode uses both RocksDB instances for raft data +// transitioningRaftStorage mode uses both RocksDB instances for raft data // interoperably, the raft specific and the regular instance. // We use this mode to facilitate rolling upgrades in the following manner: // - When a node restarts, it undertakes an offline store-level migration first // by copying over all existing raft data (log entries + HardState) into the new // dedicated raft engine // - Nodes will be restarted to run in this mode, they will be able to -// communicate with nodes without these changes transparently and it does so +// communicate with nodes without these changes transparently. They will do so // by constructing WriteBatches with raft data changes addressed to the // original RocksDB instance downstream of raft (as was the case before, see // WriteBatch.Data) in addition to the new instance (see WriteBatch.RaftData) @@ -189,12 +147,14 @@ var DisabledRaftStorage = raftStorageMode == DISABLED // // NB: When in the transitioning mode, even though we store raft data on both // engines, we only serve reads from the new one. -var TransitioningRaftStorage = raftStorageMode == TRANSITIONING +// +var transitioningRaftStorage bool -// EnabledRaftStorage mode enables the use of a dedicated RocksDB instance for -// raft data. Raft log entries and the HardState are stored on this instance -// alone. -var EnabledRaftStorage = raftStorageMode == ENABLED +// SetTransitioningMode sets the appropriate boolean flags for changes that use +// an explicit transitioning state during migrations. +func SetTransitioningMode(transitioning bool) { + transitioningRaftStorage = transitioning +} // RaftElectionTimeout returns the raft election timeout, as computed // from the specified tick interval and number of election timeout @@ -1244,22 +1204,26 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error { now := s.cfg.Clock.Now() s.startedAt = now.WallTime + s.mu.Lock() + + if err := s.runStoreLevelMigrationsLocked(ctx); err != nil { + // We were unable to run the necessary store level migrations, + // panicking seems appropriate. + panic(err) + } + // Iterate over all range descriptors, ignoring uncommitted versions // (consistent=false). Uncommitted intents which have been abandoned // due to a split crashing halfway will simply be resolved on the // next split attempt. They can otherwise be ignored. - s.mu.Lock() - + // // TODO(peter): While we have to iterate to find the replica descriptors - // serially, we can perform the migrations and replica creation + // serially, we can perform some migrations and replica creation // concurrently. Note that while we can perform this initialization - // concurrently, all of the initialization must be performed before we start - // listening for Raft messages and starting the process Raft loop. + // concurrently, all of the initialization must be performed before we + // start listening for Raft messages and starting the process Raft loop. err = IterateRangeDescriptors(ctx, s.engine, func(desc roachpb.RangeDescriptor) (bool, error) { - // TODO(irfansharif): Will need to copy over hard state + log - // entries for each range if running in transitioning mode and we - // were on an old cockroach version before. 
 			if !desc.IsInitialized() {
 				return false, errors.Errorf("found uninitialized RangeDescriptor: %+v", desc)
 			}
@@ -1665,10 +1629,6 @@ func checkEnginesEmpty(ctx context.Context, eng, raftEng engine.Engine) error {
 		return errors.Errorf("engine belongs to store %s, contains %s", ident, keyVals)
 	}
 
-	if DisabledRaftStorage {
-		return nil
-	}
-
 	kvs, err = engine.Scan(
 		raftEng,
 		engine.MakeMVCCMetadataKey(roachpb.Key(roachpb.RKeyMin)),
@@ -1812,11 +1772,8 @@ func (s *Store) BootstrapRange(initialValues []roachpb.KeyValue) error {
 	batch := s.engine.NewBatch()
 	defer batch.Close()
-	raftBatch := batch
-	if TransitioningRaftStorage || EnabledRaftStorage {
-		raftBatch = s.raftEngine.NewBatch()
-		defer raftBatch.Close()
-	}
+	raftBatch := s.raftEngine.NewBatch()
+	defer raftBatch.Close()
 	ms := &enginepb.MVCCStats{}
 	now := s.cfg.Clock.Now()
 	ctx := context.Background()
 
@@ -1863,10 +1820,8 @@ func (s *Store) BootstrapRange(initialValues []roachpb.KeyValue) error {
 	}
 	*ms = updatedMS
 
-	if TransitioningRaftStorage || EnabledRaftStorage {
-		if err := raftBatch.Commit(true /* sync */); err != nil {
-			return err
-		}
+	if err := raftBatch.Commit(true /* sync */); err != nil {
+		return err
 	}
 	return batch.Commit(true /* sync */)
 }
@@ -2408,8 +2363,11 @@ func (s *Store) Attrs() roachpb.Attributes {
 	return s.engine.Attrs()
 }
 
-// Capacity returns the capacity of the underlying storage engine. Note that
+// Capacity returns the capacity of the underlying storage engines. Note that
 // this does not include reservations.
+// TODO(irfansharif): Currently we merge the capacities of both underlying
+// storage engines, the base and the dedicated raft one. Ideally we would
+// report them with more granularity.
 func (s *Store) Capacity() (roachpb.StoreCapacity, error) {
 	capacity, err := s.engine.Capacity()
 	if err != nil {
@@ -2420,15 +2378,14 @@ func (s *Store) Capacity() (roachpb.StoreCapacity, error) {
 	capacity.LeaseCount = int32(s.LeaseCount())
 	capacity.WritesPerSecond = s.WritesPerSecond()
 
-	if TransitioningRaftStorage || EnabledRaftStorage {
-		raftEngCapacity, err := s.raftEngine.Capacity()
-		if err != nil {
-			return roachpb.StoreCapacity{}, err
-		}
-
-		capacity.Capacity += raftEngCapacity.Capacity
-		capacity.Available += raftEngCapacity.Available
+	raftEngCapacity, err := s.raftEngine.Capacity()
+	if err != nil {
+		return roachpb.StoreCapacity{}, err
 	}
+
+	capacity.Capacity += raftEngCapacity.Capacity
+	capacity.Available += raftEngCapacity.Available
+
 	return capacity, nil
 }
 
@@ -4063,6 +4020,83 @@ func (s *Store) canApplySnapshotLocked(
 	return placeholder, nil
 }
 
+// runStoreLevelMigrationsLocked runs store-level migrations needed by the
+// dedicated raft engine changes.
+func (s *Store) runStoreLevelMigrationsLocked(ctx context.Context) error {
+	if transitioningRaftStorage {
+		// To see why we need a store-level migration here, consider that prior
+		// to introducing the dedicated raft storage engine, all our raft data
+		// (log entries, HardState, etc.) was stored on the single base engine.
+		// We need to explicitly copy over this data to the new engine before
+		// proceeding; in transitioning mode (as is the case here) we read raft
+		// data from the new engine.
+		// transitioningRaftStorage mode, however, is not only for version
+		// upgrades (older versions migrating to the new); it is also the
+		// intermediate mode for rollbacks, where we have raft data stored
+		// exclusively in the dedicated engine and need to maintain it in the
+		// base engine for interoperability.
+		// To this end we take the simple approach here and check one of the
+		// engines to see if it has any raft data, then simply copy it all over
+		// to the other (as opposed to explicitly detecting whether we're
+		// migrating from an old version to the new or vice versa).
+		batch := s.engine.NewBatch()
+		raftBatch := s.raftEngine.NewBatch()
+		// to is the engine we'll be copying raft data to, from is the engine
+		// we'll be copying raft data from.
+		to, from := raftBatch, batch
+		{
+			err := IterateRangeDescriptors(ctx, s.engine,
+				func(desc roachpb.RangeDescriptor) (bool, error) {
+					// We arbitrarily check whether the HardState is present
+					// in the 'to' engine, and if so we swap it with 'from'.
+					val, _, err := engine.MVCCGet(ctx, to, keys.RaftHardStateKey(desc.RangeID),
+						hlc.Timestamp{}, true /* consistent */, nil /* txn */)
+					if err != nil {
+						return true, err
+					}
+					if val != nil {
+						to, from = from, to
+					}
+					// We need only check raft data for a single range
+					// descriptor, so we can terminate the iteration immediately.
+					return true, nil
+				})
+			if err != nil {
+				return err
+			}
+		}
+		err := IterateRangeDescriptors(ctx, s.engine,
+			func(desc roachpb.RangeDescriptor) (bool, error) {
+				for _, keyRange := range makeRaftEngineKeyRanges(desc.RangeID) {
+					scanFunc := func(kv roachpb.KeyValue) (bool, error) {
+						if err := engine.MVCCBlindPut(ctx, to, nil, kv.Key, hlc.Timestamp{},
+							kv.Value, nil /* txn */); err != nil {
+							return true, err
+						}
+						return false, nil
+					}
+					_, err := engine.MVCCIterate(ctx, from, keyRange.start.Key, keyRange.end.Key,
+						hlc.Timestamp{}, true /* consistent */, nil /* txn */, false /* !reverse */, scanFunc)
+					if err != nil {
+						return true, err
+					}
+				}
+
+				return false, nil
+			})
+		if err != nil {
+			return err
+		}
+
+		if err := to.Commit(true /* sync */); err != nil {
+			return err
+		}
+		raftBatch.Close()
+		batch.Close()
+	}
+	return nil
+}
+
 func (s *Store) updateCapacityGauges() error {
 	desc, err := s.Descriptor()
 	if err != nil {
@@ -4254,13 +4288,11 @@ func (s *Store) ComputeMetrics(ctx context.Context, tick int) error {
 	}
 	s.metrics.updateRocksDBStats(*stats)
 
-	if TransitioningRaftStorage || EnabledRaftStorage {
-		stats, err := s.raftEngine.GetStats()
-		if err != nil {
-			return err
-		}
-		s.metrics.updateRocksDBStats(*stats)
+	raftEngStats, err := s.raftEngine.GetStats()
+	if err != nil {
+		return err
 	}
+	s.metrics.updateRocksDBStats(*raftEngStats)
 
 	// If we're using RocksDB, log the sstable overview.
 	if rocksdb, ok := s.engine.(*engine.RocksDB); ok {
@@ -4274,17 +4306,15 @@ func (s *Store) ComputeMetrics(ctx context.Context, tick int) error {
 			log.Info(ctx, rocksdb.GetCompactionStats())
 		}
 	}
-	if TransitioningRaftStorage || EnabledRaftStorage {
-		if rocksdb, ok := s.raftEngine.(*engine.RocksDB); ok {
-			sstables := rocksdb.GetSSTables()
-			s.metrics.RdbNumSSTables.Update(int64(sstables.Len()))
-			readAmp := sstables.ReadAmplification()
-			s.metrics.RdbReadAmplification.Update(int64(readAmp))
-			// Log this metric infrequently.
-			if tick%60 == 0 /* every 10m */ {
-				log.Infof(ctx, "sstables (raft eng read amplification = %d):\n%s", readAmp, sstables)
-				log.Infof(ctx, rocksdb.GetCompactionStats())
-			}
+	if rocksdb, ok := s.raftEngine.(*engine.RocksDB); ok {
+		sstables := rocksdb.GetSSTables()
+		s.metrics.RdbNumSSTables.Update(int64(sstables.Len()))
+		readAmp := sstables.ReadAmplification()
+		s.metrics.RdbReadAmplification.Update(int64(readAmp))
+		// Log this metric infrequently.
+ if tick%60 == 0 /* every 10m */ { + log.Infof(ctx, "sstables (raft eng read amplification = %d):\n%s", readAmp, sstables) + log.Infof(ctx, rocksdb.GetCompactionStats()) } } return nil diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go index 5cc8ed292ac9..19b30965ec8e 100644 --- a/pkg/storage/store_test.go +++ b/pkg/storage/store_test.go @@ -139,11 +139,8 @@ func createTestStoreWithoutStart(t testing.TB, stopper *stop.Stopper, cfg *Store eng := engine.NewInMem(roachpb.Attributes{}, 10<<20) stopper.AddCloser(eng) - raftEng := eng - if TransitioningRaftStorage || EnabledRaftStorage { - raftEng = engine.NewInMem(roachpb.Attributes{}, 10<<20) - stopper.AddCloser(raftEng) - } + raftEng := engine.NewInMem(roachpb.Attributes{}, 10<<20) + stopper.AddCloser(raftEng) cfg.Transport = NewDummyRaftTransport() sender := &testSender{} @@ -194,11 +191,8 @@ func TestStoreInitAndBootstrap(t *testing.T) { eng := engine.NewInMem(roachpb.Attributes{}, 1<<20) stopper.AddCloser(eng) - var raftEng engine.Engine = eng - if TransitioningRaftStorage || EnabledRaftStorage { - raftEng = engine.NewInMem(roachpb.Attributes{}, 1<<20) - stopper.AddCloser(raftEng) - } + raftEng := engine.NewInMem(roachpb.Attributes{}, 1<<20) + stopper.AddCloser(raftEng) cfg.Transport = NewDummyRaftTransport() @@ -261,14 +255,11 @@ func TestBootstrapOfNonEmptyStore(t *testing.T) { defer leaktest.AfterTest(t)() stopper := stop.NewStopper() defer stopper.Stop(context.TODO()) + eng := engine.NewInMem(roachpb.Attributes{}, 1<<20) stopper.AddCloser(eng) - - var raftEng engine.Engine = eng - if TransitioningRaftStorage || EnabledRaftStorage { - raftEng = engine.NewInMem(roachpb.Attributes{}, 1<<20) - stopper.AddCloser(raftEng) - } + raftEng := engine.NewInMem(roachpb.Attributes{}, 1<<20) + stopper.AddCloser(raftEng) // Put some random garbage into the engine. 
if err := eng.Put(engine.MakeMVCCMetadataKey(roachpb.Key("foo")), []byte("bar")); err != nil { diff --git a/pkg/storage/stores_test.go b/pkg/storage/stores_test.go index 2471c3f58aff..177c60b780c4 100644 --- a/pkg/storage/stores_test.go +++ b/pkg/storage/stores_test.go @@ -151,11 +151,8 @@ func TestStoresLookupReplica(t *testing.T) { for i, rng := range ranges { e[i] = engine.NewInMem(roachpb.Attributes{}, 1<<20) stopper.AddCloser(e[i]) - re[i] = e[i] - if TransitioningRaftStorage || EnabledRaftStorage { - re[i] = engine.NewInMem(roachpb.Attributes{}, 1<<20) - stopper.AddCloser(re[i]) - } + re[i] = engine.NewInMem(roachpb.Attributes{}, 1<<20) + stopper.AddCloser(re[i]) cfg.Transport = NewDummyRaftTransport() s[i] = NewStore(cfg, e[i], re[i], &roachpb.NodeDescriptor{NodeID: 1}) s[i].Ident.StoreID = rng.storeID @@ -247,11 +244,8 @@ func createStores(count int, t *testing.T) (*hlc.ManualClock, []*Store, *Stores, cfg.Transport = NewDummyRaftTransport() eng := engine.NewInMem(roachpb.Attributes{}, 1<<20) stopper.AddCloser(eng) - raftEng := eng - if TransitioningRaftStorage || EnabledRaftStorage { - raftEng = engine.NewInMem(roachpb.Attributes{}, 1<<20) - stopper.AddCloser(raftEng) - } + raftEng := engine.NewInMem(roachpb.Attributes{}, 1<<20) + stopper.AddCloser(raftEng) s := NewStore(cfg, eng, raftEng, &roachpb.NodeDescriptor{NodeID: 1}) storeIDAlloc++ s.Ident.StoreID = storeIDAlloc diff --git a/pkg/testutils/localtestcluster/local_test_cluster.go b/pkg/testutils/localtestcluster/local_test_cluster.go index c948c2d8e2c5..2a400182b68f 100644 --- a/pkg/testutils/localtestcluster/local_test_cluster.go +++ b/pkg/testutils/localtestcluster/local_test_cluster.go @@ -101,12 +101,8 @@ func (ltc *LocalTestCluster) Start(t testing.TB, baseCtx *base.Config, initSende ltc.Gossip = gossip.New(ambient, nc, rpcContext, server, ltc.Stopper, metric.NewRegistry()) ltc.Eng = engine.NewInMem(roachpb.Attributes{}, 50<<20) ltc.Stopper.AddCloser(ltc.Eng) - ltc.RaftEng = ltc.Eng - - if storage.TransitioningRaftStorage || storage.EnabledRaftStorage { - ltc.RaftEng = engine.NewInMem(roachpb.Attributes{}, 50<<20) - ltc.Stopper.AddCloser(ltc.RaftEng) - } + ltc.RaftEng = engine.NewInMem(roachpb.Attributes{}, 50<<20) + ltc.Stopper.AddCloser(ltc.RaftEng) ltc.Stores = storage.NewStores(ambient, ltc.Clock)
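To tie the pieces together, here is a minimal sketch of the write discipline that transitioning mode implies, as described in the store.go comments above: raft data always goes to the dedicated raft engine, and while transitioning it is mirrored to the base engine so that nodes still reading raft data from it observe the same state. The helper below is hypothetical, assumes it lives in pkg/storage, and is not part of this diff.

// putRaftKey is a hypothetical illustration (package storage assumed, not
// part of this diff) of the transitioning-mode write path: raft data is
// always written to the dedicated raft engine, and mirrored to the base
// engine only while transitioning so that nodes on the previous version,
// which still read raft data from the base engine, observe the same state.
func putRaftKey(
	ctx context.Context, eng, raftEng engine.ReadWriter, key roachpb.Key, value roachpb.Value,
) error {
	if err := engine.MVCCPut(
		ctx, raftEng, nil /* ms */, key, hlc.Timestamp{}, value, nil /* txn */,
	); err != nil {
		return err
	}
	if transitioningRaftStorage {
		return engine.MVCCPut(ctx, eng, nil /* ms */, key, hlc.Timestamp{}, value, nil /* txn */)
	}
	return nil
}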