Skip to content

Commit

Permalink
[wip] cli,server,storage: migration story + review comments
Browse files Browse the repository at this point in the history
- Include lastIndex in raft engine
- Address backwards compatibility and preserve safety for clusters running
  multiple versions
- Introduce --transitioning flag
- Address review comments
  • Loading branch information
irfansharif committed Jul 10, 2017
1 parent 5fafa43 commit f21ad2f
Show file tree
Hide file tree
Showing 22 changed files with 560 additions and 451 deletions.
9 changes: 9 additions & 0 deletions pkg/cli/cliflags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,15 @@ strongly discouraged for production usage and should never be used on
a public network without combining it with --host.`,
}

// TransitioningMode is the FlagInfo for the "transitioning" flag, whose
// description asks for startup in transitioning mode.
TransitioningMode = FlagInfo{
Name: "transitioning",
// TODO(irfansharif): Flesh this section out to better convey the
// intended usage pattern.
Description: `
Start in transitioning mode.
`,
}

// KeySize, CertificateLifetime, AllowKeyReuse, and OverwriteFiles are used for
// certificate generation functions.
KeySize = FlagInfo{
Expand Down
10 changes: 10 additions & 0 deletions pkg/cli/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,16 @@ func init() {
// We share the default with the ClientInsecure flag.
boolFlag(f, &serverInsecure, cliflags.ServerInsecure, baseCfg.Insecure)

// Transitioning mode should be off by default: unless the flag is
// specified, we run the new major version. Only when it is explicitly
// specified do we run in the slower transitioning mode, which is intended
// to interoperate across major versions.
//
// TODO(irfansharif): Change the default to false. It is temporarily set
// to true as a stand-in while acceptance tests get updated to use this
// flag when testing version upgrades/rollbacks.
boolFlag(f, &serverCfg.TransitioningMode, cliflags.TransitioningMode, true)

// Certificates directory. Use a server-specific flag and value to ignore environment
// variables, but share the same default.
stringFlag(f, &serverSSLCertsDir, cliflags.ServerCertsDir, base.DefaultCertsDirectory)
Expand Down
86 changes: 55 additions & 31 deletions pkg/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,37 @@ type Config struct {
// it is ready.
PIDFile string

// We define two modes of operation during migrations across subsequent major
// versions. Changes introduced in a new major version can either be enabled or
// run under transitioning mode (this corresponds to a cluster running mixed
// versions, think rolling upgrades).
//
// Consider the example where we introduced a dedicated RocksDB instance for
// raft data, where the following modes were used. Briefly, the major version
// with this feature stored raft data (log entries, HardState, etc.) in a
// new, dedicated RocksDB instance, whereas the prior version stored it in the
// same instance that stored all user-level keys.
// - The 'transitioning' mode corresponded to storing raft data on both engines
// interoperably in order to facilitate rolling migrations.
// - The 'enabled' mode corresponded to storing raft data only in the dedicated
// raft engine, and using the base engine for everything else (as before).
// See usages of transitioningRaftStorage in pkg/storage/store.go for an
// example of how this mode was used.
//
// NB: It should be safe to transition from the old version to transitioning
// mode and then from transitioning to enabled (once all the nodes in the
// cluster are in transitioning mode). Likewise, to facilitate rollbacks, it
// should be safe to transition from enabled to transitioning mode and from
// transitioning to the old version (again, once all the nodes in the cluster
// are in transitioning mode). The intended design here is that version
// upgrades and rollbacks actually go through a sequence of two rolling
// restarts with nodes running in transitioning mode performing sub-optimally
// so as to maintain interoperability.
//
// TransitioningMode is the switch to indicate that we're running in
// transitioning mode.
TransitioningMode bool

enginesCreated bool
}

Expand Down Expand Up @@ -436,15 +467,15 @@ func (e *Engines) Close() {
*e = nil
}

// CreateEngines creates Engines based on the specs in cfg.Stores.
// CreateEngines creates engines based on the specs in cfg.Stores. For each
// store we create two engines, one for raft data (log entries, HardState,
// etc.) and one for everything else.
func (cfg *Config) CreateEngines(ctx context.Context) (Engines, Engines, error) {
var engines Engines
defer engines.Close()

var raftEngines Engines
if storage.TransitioningRaftStorage || storage.EnabledRaftStorage {
defer raftEngines.Close()
}
defer raftEngines.Close()

if cfg.enginesCreated {
return Engines{}, Engines{}, errors.Errorf("engines already created")
Expand Down Expand Up @@ -486,18 +517,12 @@ func (cfg *Config) CreateEngines(ctx context.Context) (Engines, Engines, error)
}
details = append(details, fmt.Sprintf("store %d: in-memory, size %s",
i, humanizeutil.IBytes(sizeInBytes)))
var engSize int64
if storage.TransitioningRaftStorage || storage.EnabledRaftStorage {
engSize = (9 * sizeInBytes) / 10
}
engSize := (9 * sizeInBytes) / 10
eng := engine.NewInMem(spec.Attributes, engSize)
raftEng := eng
if storage.TransitioningRaftStorage || storage.EnabledRaftStorage {
// TODO(irfansharif): For now we initialize the raft engine with 10%
// of the total size specified; this can/should be determined via
// user-specified flags.
raftEng = engine.NewInMem(spec.Attributes, sizeInBytes-engSize)
}
// TODO(irfansharif): For now we initialize the raft engine with 10%
// of the total size specified; this can/should be determined via
// user-specified flags.
raftEng := engine.NewInMem(spec.Attributes, sizeInBytes-engSize)

engines = append(engines, eng)
raftEngines = append(raftEngines, raftEng)
Expand Down Expand Up @@ -527,21 +552,18 @@ func (cfg *Config) CreateEngines(ctx context.Context) (Engines, Engines, error)
return Engines{}, Engines{}, err
}

raftEng := eng
if storage.TransitioningRaftStorage || storage.EnabledRaftStorage {
// TODO(irfansharif): TBD on max open files. For now we also
// use the same shared cache. It's worth exploring if there's
// performance gain to be had using a dedicated cache instead.
raftEng, err = engine.NewRocksDB(
spec.Attributes,
filepath.Join(spec.Path, raftEngineSubDir),
cache,
sizeInBytes,
engine.DefaultMaxOpenFiles,
)
if err != nil {
return Engines{}, Engines{}, err
}
// TODO(irfansharif): TBD on max open files. For now we also
// use the same shared cache. It's worth exploring if there's a
// performance gain to be had using a dedicated cache instead.
raftEng, err := engine.NewRocksDB(
spec.Attributes,
filepath.Join(spec.Path, raftEngineSubDir),
cache,
sizeInBytes,
engine.DefaultMaxOpenFiles,
)
if err != nil {
return Engines{}, Engines{}, err
}

engines = append(engines, eng)
Expand All @@ -555,8 +577,10 @@ func (cfg *Config) CreateEngines(ctx context.Context) (Engines, Engines, error)
log.Info(ctx, s)
}
enginesCopy := engines
engines = nil
raftEnginesCopy := raftEngines
// Neutralize the preceding deferred {engines,raftEngines}.Close(). See the
// comment on Engines.Close() for this pattern.
engines = nil
raftEngines = nil
return enginesCopy, raftEnginesCopy, nil
}
Expand Down
9 changes: 2 additions & 7 deletions pkg/server/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/gossip/resolver"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand All @@ -41,10 +40,8 @@ func TestParseInitNodeAttributes(t *testing.T) {
if err != nil {
t.Fatalf("Failed to initialize stores: %s", err)
}
if storage.TransitioningRaftStorage || storage.EnabledRaftStorage {
defer raftEngines.Close()
}
defer engines.Close()
defer raftEngines.Close()
if err := cfg.InitNode(); err != nil {
t.Fatalf("Failed to initialize node: %s", err)
}
Expand All @@ -66,9 +63,7 @@ func TestParseJoinUsingAddrs(t *testing.T) {
t.Fatalf("Failed to initialize stores: %s", err)
}
defer engines.Close()
if storage.TransitioningRaftStorage || storage.EnabledRaftStorage {
defer raftEngines.Close()
}
defer raftEngines.Close()
if err := cfg.InitNode(); err != nil {
t.Fatalf("Failed to initialize node: %s", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ func (n *Node) start(
n.startComputePeriodicMetrics(n.stopper, n.storeCfg.MetricsSampleInterval)
n.startGossip(n.stopper)

log.Infof(ctx, "%s: started with %v engine(s), %v raft engines and attributes %v", n, engines, raftEngines, attrs.Attrs)
log.Infof(ctx, "%s: started with %v engine(s), %v raft engine(s) and attributes %v", n, engines, raftEngines, attrs.Attrs)
return nil
}

Expand Down
62 changes: 19 additions & 43 deletions pkg/server/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,8 @@ func TestBootstrapCluster(t *testing.T) {
defer stopper.Stop(context.TODO())
e := engine.NewInMem(roachpb.Attributes{}, 1<<20)
stopper.AddCloser(e)
re := e
if storage.TransitioningRaftStorage || storage.EnabledRaftStorage {
re = engine.NewInMem(roachpb.Attributes{}, 1<<20)
stopper.AddCloser(re)
}
re := engine.NewInMem(roachpb.Attributes{}, 1<<20)
stopper.AddCloser(re)
if _, err := bootstrapCluster(
storage.StoreConfig{}, []engine.Engine{e}, []engine.Engine{re}, kv.MakeTxnMetrics(metric.TestSampleInterval),
); err != nil {
Expand Down Expand Up @@ -231,10 +228,7 @@ func TestBootstrapCluster(t *testing.T) {
func TestBootstrapNewStore(t *testing.T) {
defer leaktest.AfterTest(t)()
e := engine.NewInMem(roachpb.Attributes{}, 1<<20)
re := e
if storage.TransitioningRaftStorage || storage.EnabledRaftStorage {
re = engine.NewInMem(roachpb.Attributes{}, 1<<20)
}
re := engine.NewInMem(roachpb.Attributes{}, 1<<20)
if _, err := bootstrapCluster(
storage.StoreConfig{}, []engine.Engine{e}, []engine.Engine{re}, kv.MakeTxnMetrics(metric.TestSampleInterval),
); err != nil {
Expand All @@ -249,15 +243,12 @@ func TestBootstrapNewStore(t *testing.T) {
})
defer engines.Close()

raftEngines := engines
if storage.TransitioningRaftStorage || storage.EnabledRaftStorage {
raftEngines = Engines([]engine.Engine{
re,
engine.NewInMem(roachpb.Attributes{}, 1<<20),
engine.NewInMem(roachpb.Attributes{}, 1<<20),
})
defer raftEngines.Close()
}
raftEngines := Engines([]engine.Engine{
re,
engine.NewInMem(roachpb.Attributes{}, 1<<20),
engine.NewInMem(roachpb.Attributes{}, 1<<20),
})
defer raftEngines.Close()
_, _, node, stopper := createAndStartTestNode(
util.TestAddr,
engines,
Expand Down Expand Up @@ -299,11 +290,8 @@ func TestNodeJoin(t *testing.T) {
e := engine.NewInMem(roachpb.Attributes{}, 1<<20)
engineStopper.AddCloser(e)

re := e
if storage.TransitioningRaftStorage || storage.EnabledRaftStorage {
re = engine.NewInMem(roachpb.Attributes{}, 1<<20)
engineStopper.AddCloser(re)
}
re := engine.NewInMem(roachpb.Attributes{}, 1<<20)
engineStopper.AddCloser(re)

if _, err := bootstrapCluster(
storage.StoreConfig{}, []engine.Engine{e}, []engine.Engine{re}, kv.MakeTxnMetrics(metric.TestSampleInterval),
Expand All @@ -328,11 +316,8 @@ func TestNodeJoin(t *testing.T) {
e2 := engine.NewInMem(roachpb.Attributes{}, 1<<20)
engineStopper.AddCloser(e2)

re2 := e2
if storage.TransitioningRaftStorage || storage.EnabledRaftStorage {
re2 = engine.NewInMem(roachpb.Attributes{}, 1<<20)
engineStopper.AddCloser(re2)
}
re2 := engine.NewInMem(roachpb.Attributes{}, 1<<20)
engineStopper.AddCloser(re2)

engines2 := []engine.Engine{e2}
raftEngines2 := []engine.Engine{re2}
Expand Down Expand Up @@ -384,11 +369,8 @@ func TestNodeJoinSelf(t *testing.T) {
e := engine.NewInMem(roachpb.Attributes{}, 1<<20)
defer e.Close()

re := e
if storage.TransitioningRaftStorage || storage.EnabledRaftStorage {
re = engine.NewInMem(roachpb.Attributes{}, 1<<20)
defer re.Close()
}
re := engine.NewInMem(roachpb.Attributes{}, 1<<20)
defer re.Close()

engines := []engine.Engine{e}
raftEngines := []engine.Engine{re}
Expand All @@ -407,11 +389,8 @@ func TestCorruptedClusterID(t *testing.T) {

e := engine.NewInMem(roachpb.Attributes{}, 1<<20)
defer e.Close()
re := e
if storage.TransitioningRaftStorage || storage.EnabledRaftStorage {
re = engine.NewInMem(roachpb.Attributes{}, 1<<20)
defer re.Close()
}
re := engine.NewInMem(roachpb.Attributes{}, 1<<20)
defer re.Close()
if _, err := bootstrapCluster(
storage.StoreConfig{}, []engine.Engine{e}, []engine.Engine{re}, kv.MakeTxnMetrics(metric.TestSampleInterval),
); err != nil {
Expand Down Expand Up @@ -744,11 +723,8 @@ func TestStartNodeWithLocality(t *testing.T) {
e := engine.NewInMem(roachpb.Attributes{}, 1<<20)
defer e.Close()

re := e
if storage.TransitioningRaftStorage || storage.EnabledRaftStorage {
re = engine.NewInMem(roachpb.Attributes{}, 1<<20)
defer re.Close()
}
re := engine.NewInMem(roachpb.Attributes{}, 1<<20)
defer re.Close()
if _, err := bootstrapCluster(
storage.StoreConfig{}, []engine.Engine{e}, []engine.Engine{re}, kv.MakeTxnMetrics(metric.TestSampleInterval),
); err != nil {
Expand Down
6 changes: 3 additions & 3 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,8 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
)
s.db = client.NewDB(s.txnCoordSender, s.clock)

storage.SetTransitioningMode(s.cfg.TransitioningMode)

active, renewal := storage.NodeLivenessDurations(
storage.RaftElectionTimeout(s.cfg.RaftTickInterval, s.cfg.RaftElectionTimeoutTicks))
s.nodeLiveness = storage.NewNodeLiveness(
Expand Down Expand Up @@ -680,9 +682,7 @@ func (s *Server) Start(ctx context.Context) error {
return errors.Wrap(err, "failed to create {raft,}engines")
}
s.stopper.AddCloser(&s.engines)
if storage.TransitioningRaftStorage || storage.EnabledRaftStorage {
s.stopper.AddCloser(&s.raftEngines)
}
s.stopper.AddCloser(&s.raftEngines)

// We might have to sleep a bit to protect against this node producing non-
// monotonic timestamps. Before restarting, its clock might have been driven
Expand Down
2 changes: 2 additions & 0 deletions pkg/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,8 @@ func TestSystemConfigGossip(t *testing.T) {
func TestOfficializeAddr(t *testing.T) {
defer leaktest.AfterTest(t)()

t.Skip("TODO(irfansharif): Remove this, local hostname problem")

host, err := os.Hostname()
if err != nil {
t.Fatal(err)
Expand Down
14 changes: 4 additions & 10 deletions pkg/storage/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,8 @@ func TestStoreRecoverFromEngine(t *testing.T) {
defer engineStopper.Stop(context.TODO())
eng := engine.NewInMem(roachpb.Attributes{}, 1<<20)
engineStopper.AddCloser(eng)
raftEng := eng
if storage.TransitioningRaftStorage || storage.EnabledRaftStorage {
raftEng = engine.NewInMem(roachpb.Attributes{}, 1<<20)
engineStopper.AddCloser(raftEng)
}
raftEng := engine.NewInMem(roachpb.Attributes{}, 1<<20)
engineStopper.AddCloser(raftEng)
var rangeID2 roachpb.RangeID

get := func(store *storage.Store, rangeID roachpb.RangeID, key roachpb.Key) int64 {
Expand Down Expand Up @@ -173,11 +170,8 @@ func TestStoreRecoverWithErrors(t *testing.T) {
storeCfg.TestingKnobs.DisableSplitQueue = true
eng := engine.NewInMem(roachpb.Attributes{}, 1<<20)
defer eng.Close()
raftEng := eng
if storage.TransitioningRaftStorage || storage.EnabledRaftStorage {
raftEng = engine.NewInMem(roachpb.Attributes{}, 10<<20)
defer raftEng.Close()
}
raftEng := engine.NewInMem(roachpb.Attributes{}, 10<<20)
defer raftEng.Close()

numIncrements := 0

Expand Down
7 changes: 2 additions & 5 deletions pkg/storage/client_replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,11 +215,8 @@ func TestTxnPutOutOfOrder(t *testing.T) {
}
eng := engine.NewInMem(roachpb.Attributes{}, 10<<20)
stopper.AddCloser(eng)
raftEng := eng
if storage.TransitioningRaftStorage || storage.EnabledRaftStorage {
raftEng = engine.NewInMem(roachpb.Attributes{}, 10<<20)
stopper.AddCloser(raftEng)
}
raftEng := engine.NewInMem(roachpb.Attributes{}, 10<<20)
stopper.AddCloser(raftEng)
store := createTestStoreWithEngine(t,
eng,
raftEng,
Expand Down
1 change: 0 additions & 1 deletion pkg/storage/client_split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1225,7 +1225,6 @@ func TestSplitSnapshotRace_SplitWins(t *testing.T) {
// split, so it still has a conflicting range.
func TestSplitSnapshotRace_SnapshotWins(t *testing.T) {
defer leaktest.AfterTest(t)()
t.Skip()
runSetupSplitSnapshotRace(t, func(mtc *multiTestContext, leftKey, rightKey roachpb.Key) {
// Bring the right range up first.
for i := 3; i <= 5; i++ {
Expand Down
Loading

0 comments on commit f21ad2f

Please sign in to comment.