storage: introduce dedicated raft storage
Implements cockroachdb#16361.

This is a breaking change. To see why, consider that prior to this we
stored all consensus data, in addition to all system metadata and
user-level keys, in the same, single RocksDB instance. Here we introduce
a separate, dedicated instance for raft data (log entries and the
HardState). Cockroach nodes simply restarting with these changes, unless
migrated properly, will fail to find the most recent raft log entries
and HardState in the new RocksDB instance.

Also consider a cluster running mixed versions (nodes with dedicated
raft storage and nodes without): what would the communication between
nodes look like in light of proposer-evaluated KV? Currently we
propagate a storagebase.WriteBatch through raft containing a serialized
representation of a RocksDB write batch; this models the changes to be
made to the single underlying RocksDB instance. For log truncation
requests, where we delete log entries, and/or admin splits, where we
write the initial HardState for newly formed replicas, we need to
similarly propagate a write batch (through raft) addressing the new
RocksDB instance (if the recipient node is one with these changes) or
the original RocksDB instance (if the recipient node is one without
these changes). What if an older-version node is the raft leader and is
therefore the one upstream of raft, propagating storagebase.WriteBatches
with raft data changes but addressed to the original RocksDB instance?
What would rollbacks look like?
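To make the mixed-version question concrete, here is a minimal sketch of
a proposal payload carrying a second serialized batch addressed to the
dedicated raft instance; the type and field names are hypothetical,
illustrative assumptions, not the commit's actual storagebase
definitions:

package storagebase

// WriteBatchSketch is a hypothetical shape for a proposer-evaluated KV
// payload under dedicated raft storage (illustrative only; not the
// actual storagebase.WriteBatch definition).
type WriteBatchSketch struct {
    // Data is the serialized RocksDB write batch addressed to the
    // regular (system metadata and user-level KV) instance, as before
    // this change.
    Data []byte
    // RaftData is the serialized write batch addressed to the dedicated
    // raft instance. Old-version nodes would neither populate nor expect
    // it, and would apply raft changes through Data instead, which is
    // exactly what makes mixed-version clusters and rollbacks tricky.
    RaftData []byte
}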

To this end we introduce three modes of operation: disabled,
transitioningRaftStorage, and enabledRaftStorage (disabled being the old
behavior; enabled is implicit if we're in neither of the other two
modes). We've made it so that it is safe to transition from an older
cockroach version to transitioningRaftStorage, from
transitioningRaftStorage to enabled, and the reverse for rollbacks. The
transition from one mode to the next should only take place once all the
nodes in the cluster are on the same previous mode. The operation mode
is set by an env var,
COCKROACH_DEDICATED_RAFT_STORAGE={DISABLED,TRANSITIONING,ENABLED};
a parsing sketch follows the list below:

- In the old version (disabled mode) we use a single RocksDB instance
  for both raft and user-level KV data.
- In transitioningRaftStorage mode we use both RocksDB instances for
  raft data interoperably: the raft-specific instance and the regular
  one. We use this mode to facilitate rolling upgrades.
- In enabled mode we use the dedicated RocksDB instance for raft data.
  Raft log entries and the HardState are stored on this instance alone.
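A minimal sketch of the package-level switches that the diffs below
consult as storage.TransitioningRaftStorage and
storage.EnabledRaftStorage, assuming they are derived directly from the
env var at process start (the commit's actual parsing may differ):

package storage

import (
    "fmt"
    "os"
)

// TransitioningRaftStorage and EnabledRaftStorage gate the dedicated
// raft engine throughout the storage and server packages. Disabled is
// the implicit default when neither flag is set.
var (
    TransitioningRaftStorage bool
    EnabledRaftStorage       bool
)

func init() {
    switch mode := os.Getenv("COCKROACH_DEDICATED_RAFT_STORAGE"); mode {
    case "", "DISABLED":
        // Old behavior: a single shared RocksDB instance.
    case "TRANSITIONING":
        TransitioningRaftStorage = true
    case "ENABLED":
        EnabledRaftStorage = true
    default:
        panic(fmt.Sprintf("unknown COCKROACH_DEDICATED_RAFT_STORAGE mode: %q", mode))
    }
}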

Most of this commit is careful plumbing of an extra
engine.{Engine,Batch,Reader,Writer,ReadWriter} parameter for wherever we
need to interact with the new RocksDB instance.
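As one concrete instance of that plumbing, consider the
transitioning-mode invariant that raft data must be readable from both
instances. The helper below is an illustrative sketch of that invariant
under simplified engine.Writer usage; putRaftData is not a function from
this commit:

package storage

import "github.com/cockroachdb/cockroach/pkg/storage/engine"

// putRaftData sketches the transitioning-mode invariant: raft data
// always goes to the dedicated engine, and while transitioning it is
// mirrored to the regular engine so that a node rolled back to the old
// single-engine code still finds a complete raft log and HardState. In
// disabled mode the two Writers are the same instance, so the first Put
// suffices.
func putRaftData(eng, raftEng engine.Writer, transitioning bool, key engine.MVCCKey, value []byte) error {
    if err := raftEng.Put(key, value); err != nil {
        return err
    }
    if transitioning {
        // Mirror the write to the regular engine for rollback safety.
        return eng.Put(key, value)
    }
    return nil
}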
irfansharif committed Jul 10, 2017
1 parent 2d9450b commit fb4ce33
Showing 30 changed files with 948 additions and 265 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/storageccl/add_sstable.go
@@ -31,7 +31,7 @@ func init() {
}

func evalAddSSTable(
- ctx context.Context, batch engine.ReadWriter, cArgs storage.CommandArgs, _ roachpb.Response,
+ ctx context.Context, batch, _ engine.ReadWriter, cArgs storage.CommandArgs, _ roachpb.Response,
) (storage.EvalResult, error) {
args := cArgs.Args.(*roachpb.AddSSTableRequest)
h := cArgs.Header
2 changes: 1 addition & 1 deletion pkg/ccl/storageccl/add_sstable_test.go
@@ -188,7 +188,7 @@ func TestAddSSTableMVCCStats(t *testing.T) {
ValCount: 10000,
},
}
- if _, err := evalAddSSTable(ctx, e, cArgs, nil); err != nil {
+ if _, err := evalAddSSTable(ctx, e, nil, cArgs, nil); err != nil {
t.Fatalf("%+v", err)
}

2 changes: 1 addition & 1 deletion pkg/ccl/storageccl/export.go
@@ -98,7 +98,7 @@ func (r *rowCounter) count(key roachpb.Key) error {
// evalExport dumps the requested keys into files of non-overlapping key ranges
// in a format suitable for bulk ingest.
func evalExport(
- ctx context.Context, batch engine.ReadWriter, cArgs storage.CommandArgs, resp roachpb.Response,
+ ctx context.Context, batch, _ engine.ReadWriter, cArgs storage.CommandArgs, resp roachpb.Response,
) (storage.EvalResult, error) {
args := cArgs.Args.(*roachpb.ExportRequest)
h := cArgs.Header
2 changes: 1 addition & 1 deletion pkg/ccl/storageccl/writebatch.go
@@ -33,7 +33,7 @@ func init() {
// data in the affected keyrange is first cleared (not tombstoned), which makes
// this command idempotent.
func evalWriteBatch(
- ctx context.Context, batch engine.ReadWriter, cArgs storage.CommandArgs, _ roachpb.Response,
+ ctx context.Context, batch, _ engine.ReadWriter, cArgs storage.CommandArgs, _ roachpb.Response,
) (storage.EvalResult, error) {

args := cArgs.Args.(*roachpb.WriteBatchRequest)
4 changes: 2 additions & 2 deletions pkg/ccl/storageccl/writebatch_test.go
@@ -150,7 +150,7 @@ func TestWriteBatchMVCCStats(t *testing.T) {
ValCount: 10000,
},
}
- if _, err := evalWriteBatch(ctx, e, cArgs, nil); err != nil {
+ if _, err := evalWriteBatch(ctx, e, nil, cArgs, nil); err != nil {
t.Fatalf("%+v", err)
}

@@ -167,7 +167,7 @@ func TestWriteBatchMVCCStats(t *testing.T) {
}

// Run the same WriteBatch command a second time to test the idempotence.
- if _, err := evalWriteBatch(ctx, e, cArgs, nil); err != nil {
+ if _, err := evalWriteBatch(ctx, e, nil, cArgs, nil); err != nil {
t.Fatalf("%+v", err)
}
if !reflect.DeepEqual(expectedStats, cArgs.Stats) {
66 changes: 54 additions & 12 deletions pkg/server/config.go
@@ -22,6 +22,7 @@ import (
"io/ioutil"
"math"
"net"
"path/filepath"
"runtime"
"strconv"
"strings"
@@ -62,6 +63,7 @@ const (

minimumNetworkFileDescriptors = 256
recommendedNetworkFileDescriptors = 5000
+ raftEngineSubDir = "raft"

productionSettingsWebpage = "please see https://www.cockroachlabs.com/docs/stable/recommended-production-settings.html for more details"
)
@@ -435,12 +437,17 @@ func (e *Engines) Close() {
}

// CreateEngines creates Engines based on the specs in cfg.Stores.
- func (cfg *Config) CreateEngines(ctx context.Context) (Engines, error) {
- engines := Engines(nil)
+ func (cfg *Config) CreateEngines(ctx context.Context) (Engines, Engines, error) {
+ var engines Engines
defer engines.Close()

+ var raftEngines Engines
+ if storage.TransitioningRaftStorage || storage.EnabledRaftStorage {
+ defer raftEngines.Close()
+ }

if cfg.enginesCreated {
- return Engines{}, errors.Errorf("engines already created")
+ return Engines{}, Engines{}, errors.Errorf("engines already created")
}
cfg.enginesCreated = true

@@ -458,7 +465,7 @@ func (cfg *Config) CreateEngines(ctx context.Context) (Engines, error) {
}
openFileLimitPerStore, err := setOpenFileLimit(physicalStores)
if err != nil {
- return Engines{}, err
+ return Engines{}, Engines{}, err
}

skipSizeCheck := cfg.TestingKnobs.Store != nil &&
@@ -469,27 +476,41 @@ func (cfg *Config) CreateEngines(ctx context.Context) (Engines, error) {
if spec.SizePercent > 0 {
sysMem, err := GetTotalMemory(ctx)
if err != nil {
- return Engines{}, errors.Errorf("could not retrieve system memory")
+ return Engines{}, Engines{}, errors.Errorf("could not retrieve system memory")
}
sizeInBytes = int64(float64(sysMem) * spec.SizePercent / 100)
}
if sizeInBytes != 0 && !skipSizeCheck && sizeInBytes < base.MinimumStoreSize {
- return Engines{}, errors.Errorf("%f%% of memory is only %s bytes, which is below the minimum requirement of %s",
+ return Engines{}, Engines{}, errors.Errorf("%f%% of memory is only %s bytes, which is below the minimum requirement of %s",
spec.SizePercent, humanizeutil.IBytes(sizeInBytes), humanizeutil.IBytes(base.MinimumStoreSize))
}
details = append(details, fmt.Sprintf("store %d: in-memory, size %s",
i, humanizeutil.IBytes(sizeInBytes)))
- engines = append(engines, engine.NewInMem(spec.Attributes, sizeInBytes))
+ var engSize int64
+ if storage.TransitioningRaftStorage || storage.EnabledRaftStorage {
+ engSize = (9 * sizeInBytes) / 10
+ }
+ eng := engine.NewInMem(spec.Attributes, engSize)
+ raftEng := eng
+ if storage.TransitioningRaftStorage || storage.EnabledRaftStorage {
+ // TODO(irfansharif): For now we initialize the raft
+ // engine with 10% of the total size specified; this can/should
+ // be determined via user-specified flags.
+ raftEng = engine.NewInMem(spec.Attributes, sizeInBytes-engSize)
+ }
+
+ engines = append(engines, eng)
+ raftEngines = append(raftEngines, raftEng)
} else {
if spec.SizePercent > 0 {
fileSystemUsage := gosigar.FileSystemUsage{}
if err := fileSystemUsage.Get(spec.Path); err != nil {
- return Engines{}, err
+ return Engines{}, Engines{}, err
}
sizeInBytes = int64(float64(fileSystemUsage.Total) * spec.SizePercent / 100)
}
if sizeInBytes != 0 && !skipSizeCheck && sizeInBytes < base.MinimumStoreSize {
- return Engines{}, errors.Errorf("%f%% of %s's total free space is only %s bytes, which is below the minimum requirement of %s",
+ return Engines{}, Engines{}, errors.Errorf("%f%% of %s's total free space is only %s bytes, which is below the minimum requirement of %s",
spec.SizePercent, spec.Path, humanizeutil.IBytes(sizeInBytes), humanizeutil.IBytes(base.MinimumStoreSize))
}

@@ -503,20 +524,41 @@ func (cfg *Config) CreateEngines(ctx context.Context) (Engines, error) {
openFileLimitPerStore,
)
if err != nil {
- return Engines{}, err
+ return Engines{}, Engines{}, err
}

+ raftEng := eng
+ if storage.TransitioningRaftStorage || storage.EnabledRaftStorage {
+ // TODO(irfansharif): TBD on max open files. For now we also
+ // use the same shared cache. It's worth exploring if there's
+ // performance gain to be had using a dedicated cache instead.
+ raftEng, err = engine.NewRocksDB(
+ spec.Attributes,
+ filepath.Join(spec.Path, raftEngineSubDir),
+ cache,
+ sizeInBytes,
+ engine.DefaultMaxOpenFiles,
+ )
+ if err != nil {
+ return Engines{}, Engines{}, err
+ }
+ }

engines = append(engines, eng)
+ raftEngines = append(raftEngines, raftEng)
}
}

log.Infof(ctx, "%d storage engine%s initialized",
log.Infof(ctx, "%d storage {raft,}engine%s initialized",
len(engines), util.Pluralize(int64(len(engines))))
for _, s := range details {
log.Info(ctx, s)
}
enginesCopy := engines
engines = nil
- return enginesCopy, nil
+ raftEnginesCopy := raftEngines
+ raftEngines = nil
+ return enginesCopy, raftEnginesCopy, nil
}

// InitNode parses node attributes and initializes the gossip bootstrap
Expand Down
11 changes: 9 additions & 2 deletions pkg/server/config_test.go
@@ -26,6 +26,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/gossip/resolver"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -36,10 +37,13 @@ func TestParseInitNodeAttributes(t *testing.T) {
cfg := MakeConfig()
cfg.Attrs = "attr1=val1::attr2=val2"
cfg.Stores = base.StoreSpecList{Specs: []base.StoreSpec{{InMemory: true, SizeInBytes: base.MinimumStoreSize * 100}}}
- engines, err := cfg.CreateEngines(context.TODO())
+ engines, raftEngines, err := cfg.CreateEngines(context.TODO())
if err != nil {
t.Fatalf("Failed to initialize stores: %s", err)
}
+ if storage.TransitioningRaftStorage || storage.EnabledRaftStorage {
+ defer raftEngines.Close()
+ }
defer engines.Close()
if err := cfg.InitNode(); err != nil {
t.Fatalf("Failed to initialize node: %s", err)
@@ -57,11 +61,14 @@ func TestParseJoinUsingAddrs(t *testing.T) {
cfg := MakeConfig()
cfg.JoinList = []string{"localhost:12345,,localhost:23456", "localhost:34567"}
cfg.Stores = base.StoreSpecList{Specs: []base.StoreSpec{{InMemory: true, SizeInBytes: base.MinimumStoreSize * 100}}}
- engines, err := cfg.CreateEngines(context.TODO())
+ engines, raftEngines, err := cfg.CreateEngines(context.TODO())
if err != nil {
t.Fatalf("Failed to initialize stores: %s", err)
}
defer engines.Close()
+ if storage.TransitioningRaftStorage || storage.EnabledRaftStorage {
+ defer raftEngines.Close()
+ }
if err := cfg.InitNode(); err != nil {
t.Fatalf("Failed to initialize node: %s", err)
}
32 changes: 21 additions & 11 deletions pkg/server/node.go
@@ -177,8 +177,11 @@ func GetBootstrapSchema() sqlbase.MetadataSchema {
// single range spanning all keys. Initial range lookup metadata is
// populated for the range. Returns the cluster ID.
func bootstrapCluster(
- cfg storage.StoreConfig, engines []engine.Engine, txnMetrics kv.TxnMetrics,
+ cfg storage.StoreConfig, engines, raftEngines []engine.Engine, txnMetrics kv.TxnMetrics,
) (uuid.UUID, error) {
+ if len(engines) != len(raftEngines) {
+ panic(fmt.Sprintf("len(engines) %d != len(raftEngines) %d", len(engines), len(raftEngines)))
+ }
clusterID := uuid.MakeV4()
stopper := stop.NewStopper()
defer stopper.Stop(context.TODO())
@@ -202,7 +205,7 @@ func bootstrapCluster(
sender := kv.NewTxnCoordSender(cfg.AmbientCtx, stores, cfg.Clock, false, stopper, txnMetrics)
cfg.DB = client.NewDB(sender, cfg.Clock)
cfg.Transport = storage.NewDummyRaftTransport()
- for i, eng := range engines {
+ for i := range engines {
sIdent := roachpb.StoreIdent{
ClusterID: clusterID,
NodeID: FirstNodeID,
Expand All @@ -211,7 +214,7 @@ func bootstrapCluster(

// The bootstrapping store will not connect to other nodes so its
// StoreConfig doesn't really matter.
- s := storage.NewStore(cfg, eng, &roachpb.NodeDescriptor{NodeID: FirstNodeID})
+ s := storage.NewStore(cfg, engines[i], raftEngines[i], &roachpb.NodeDescriptor{NodeID: FirstNodeID})

// Bootstrap store to persist the store ident.
if err := s.Bootstrap(sIdent); err != nil {
@@ -347,29 +350,30 @@ func (n *Node) start(
ctx context.Context,
addr net.Addr,
engines []engine.Engine,
+ raftEngines []engine.Engine,
attrs roachpb.Attributes,
locality roachpb.Locality,
canBootstrap bool,
) error {
n.initDescriptor(addr, attrs, locality)

// Initialize stores, including bootstrapping new ones.
- if err := n.initStores(ctx, engines, n.stopper, false); err != nil {
+ if err := n.initStores(ctx, engines, raftEngines, n.stopper, false); err != nil {
if err == errNeedsBootstrap {
if !canBootstrap {
return errCannotJoinSelf
}
n.initialBoot = true
// This node has no initialized stores and no way to connect to
// an existing cluster, so we bootstrap it.
- clusterID, err := bootstrapCluster(n.storeCfg, engines, n.txnMetrics)
+ clusterID, err := bootstrapCluster(n.storeCfg, engines, raftEngines, n.txnMetrics)
if err != nil {
return err
}
log.Infof(ctx, "**** cluster %s has been created", clusterID)
log.Infof(ctx, "**** add additional nodes by specifying --join=%s", addr)
// After bootstrapping, try again to initialize the stores.
- if err := n.initStores(ctx, engines, n.stopper, true); err != nil {
+ if err := n.initStores(ctx, engines, raftEngines, n.stopper, true); err != nil {
return err
}
} else {
@@ -382,7 +386,7 @@ func (n *Node) start(
n.startComputePeriodicMetrics(n.stopper, n.storeCfg.MetricsSampleInterval)
n.startGossip(n.stopper)

log.Infof(ctx, "%s: started with %v engine(s) and attributes %v", n, engines, attrs.Attrs)
log.Infof(ctx, "%s: started with %v engine(s), %v raft engines and attributes %v", n, engines, raftEngines, attrs.Attrs)
return nil
}

@@ -414,16 +418,22 @@ func (n *Node) SetDraining(drain bool) error {
// bootstraps list for initialization once the cluster and node IDs
// have been determined.
func (n *Node) initStores(
- ctx context.Context, engines []engine.Engine, stopper *stop.Stopper, bootstrapped bool,
+ ctx context.Context,
+ engines, raftEngines []engine.Engine,
+ stopper *stop.Stopper,
+ bootstrapped bool,
) error {
+ if len(engines) != len(raftEngines) {
+ panic(fmt.Sprintf("len(engines) %d != len(raftEngines) %d", len(engines), len(raftEngines)))
+ }
var bootstraps []*storage.Store

if len(engines) == 0 {
return errors.Errorf("no engines")
}
- for _, e := range engines {
- s := storage.NewStore(n.storeCfg, e, &n.Descriptor)
- log.Eventf(ctx, "created store for engine: %s", e)
+ for i := range engines {
+ s := storage.NewStore(n.storeCfg, engines[i], raftEngines[i], &n.Descriptor)
+ log.Eventf(ctx, "created store for engine: %s, raft engine: %s", engines[i], raftEngines[i])
if bootstrapped {
s.NotifyBootstrapped()
}