Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve raft write performance by utilizing FSM Batching #7527

Merged
merged 16 commits into from
Oct 14, 2019
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,15 @@ require (
github.com/hashicorp/go-memdb v1.0.2
github.com/hashicorp/go-msgpack v0.5.5
github.com/hashicorp/go-multierror v1.0.0
github.com/hashicorp/go-raftchunking v0.6.2
github.com/hashicorp/go-raftchunking v0.6.3-0.20191002164813-7e9e8525653a
github.com/hashicorp/go-rootcerts v1.0.1
github.com/hashicorp/go-sockaddr v1.0.2
github.com/hashicorp/go-syslog v1.0.0
github.com/hashicorp/go-uuid v1.0.1
github.com/hashicorp/golang-lru v0.5.3
github.com/hashicorp/hcl v1.0.0
github.com/hashicorp/nomad/api v0.0.0-20190412184103-1c38ced33adf
github.com/hashicorp/raft v1.1.1
github.com/hashicorp/raft v1.1.2-0.20191002163536-9c6bd3e3eb17
github.com/hashicorp/raft-snapshot v1.0.2-0.20190827162939-8117efcc5aab
github.com/hashicorp/vault-plugin-auth-alicloud v0.5.2-0.20190814210027-93970f08f2ec
github.com/hashicorp/vault-plugin-auth-azure v0.5.2-0.20190814210035-08e00d801115
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 h1:mXoPYz/Ul5HYE
github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k=
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY=
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4=
github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps=
github.com/boombuler/barcode v1.0.0 h1:s1TvRnXwL2xJRaccrdcBQMZxq6X7DvsMogtmJeHDdrc=
github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8=
Expand Down Expand Up @@ -291,6 +292,8 @@ github.com/hashicorp/go-plugin v1.0.1 h1:4OtAfUGbnKC6yS48p0CtMX2oFYtzFZVv6rok3cR
github.com/hashicorp/go-plugin v1.0.1/go.mod h1:++UyYGoz3o5w9ZzAdZxtQKrWWP+iqPBn3cQptSMzBuY=
github.com/hashicorp/go-raftchunking v0.6.2 h1:imj6CVkwXj6VzgXZQvzS+fSrkbFCzlJ2t00F3PacnuU=
github.com/hashicorp/go-raftchunking v0.6.2/go.mod h1:cGlg3JtDy7qy6c/3Bu660Mic1JF+7lWqIwCFSb08fX0=
github.com/hashicorp/go-raftchunking v0.6.3-0.20191002164813-7e9e8525653a h1:FmnBDwGwlTgugDGbVxwV8UavqSMACbGrUpfc98yFLR4=
github.com/hashicorp/go-raftchunking v0.6.3-0.20191002164813-7e9e8525653a/go.mod h1:xbXnmKqX9/+RhPkJ4zrEx4738HacP72aaUPlT2RZ4sU=
github.com/hashicorp/go-retryablehttp v0.5.3 h1:QlWt0KvWT0lq8MFppF9tsJGF+ynG7ztc2KIPhzRGk7s=
github.com/hashicorp/go-retryablehttp v0.5.3/go.mod h1:9B5zBasrRhHXnJnui7y6sL7es7NDiJgTc6Er0maI1Xs=
github.com/hashicorp/go-retryablehttp v0.5.4 h1:1BZvpawXoJCWX6pNtow9+rpEj+3itIlutiqnntI6jOE=
Expand Down Expand Up @@ -330,6 +333,8 @@ github.com/hashicorp/nomad/api v0.0.0-20190412184103-1c38ced33adf/go.mod h1:BDng
github.com/hashicorp/raft v1.0.1/go.mod h1:DVSAWItjLjTOkVbSpWQ0j0kUADIvDaCtBxIcbNAQLkI=
github.com/hashicorp/raft v1.1.1 h1:HJr7UE1x/JrJSc9Oy6aDBHtNHUUBHjcQjTgvUVihoZs=
github.com/hashicorp/raft v1.1.1/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8=
github.com/hashicorp/raft v1.1.2-0.20191002163536-9c6bd3e3eb17 h1:p+2EISNdFCnD9R+B4xCiqSn429MCFtvM41aHJDJ6qW4=
github.com/hashicorp/raft v1.1.2-0.20191002163536-9c6bd3e3eb17/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8=
github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea/go.mod h1:pNv7Wc3ycL6F5oOWn+tPGo2gWD4a5X+yp/ntwdKLjRk=
github.com/hashicorp/raft-snapshot v1.0.2-0.20190827162939-8117efcc5aab h1:WzGMwlO1DvaC93SvVOBOKtn+nXGEDXapyJuaRV3/VaY=
github.com/hashicorp/raft-snapshot v1.0.2-0.20190827162939-8117efcc5aab/go.mod h1:5sL9eUn72lH5DzsFIJ9jaysITbHksSSszImWSOTC8Ic=
Expand Down
217 changes: 100 additions & 117 deletions physical/raft/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ var (
var _ physical.Backend = (*FSM)(nil)
var _ physical.Transactional = (*FSM)(nil)
var _ raft.FSM = (*FSM)(nil)
var _ raft.ConfigurationStore = (*FSM)(nil)
var _ raft.BatchingFSM = (*FSM)(nil)

type restoreCallback func(context.Context) error

Expand All @@ -75,7 +75,6 @@ type FSM struct {
l sync.RWMutex
path string
logger log.Logger
permitPool *physical.PermitPool
noopRestore bool

db *bolt.DB
Expand All @@ -88,7 +87,7 @@ type FSM struct {
// additional state in the backend.
storeLatestState bool

chunker *raftchunking.ChunkingConfigurationStore
chunker *raftchunking.ChunkingBatchingFSM
}

// NewFSM constructs a FSM using the given directory
Expand Down Expand Up @@ -159,9 +158,8 @@ func NewFSM(conf map[string]string, logger log.Logger) (*FSM, error) {
}

f := &FSM{
path: conf["path"],
logger: logger,
permitPool: physical.NewPermitPool(physical.DefaultParallelOperations),
path: conf["path"],
logger: logger,

db: boltDB,
latestTerm: latestTerm,
Expand All @@ -170,7 +168,7 @@ func NewFSM(conf map[string]string, logger log.Logger) (*FSM, error) {
storeLatestState: storeLatestState,
}

f.chunker = raftchunking.NewChunkingConfigurationStore(f, &FSMChunkStorage{
f.chunker = raftchunking.NewChunkingBatchingFSM(f, &FSMChunkStorage{
f: f,
ctx: context.Background(),
})
Expand Down Expand Up @@ -245,9 +243,6 @@ func (f *FSM) witnessSnapshot(index, term, configurationIndex uint64, configurat
func (f *FSM) Delete(ctx context.Context, path string) error {
defer metrics.MeasureSince([]string{"raft", "delete"}, time.Now())

f.permitPool.Acquire()
defer f.permitPool.Release()

f.l.RLock()
defer f.l.RUnlock()

Expand All @@ -260,9 +255,6 @@ func (f *FSM) Delete(ctx context.Context, path string) error {
func (f *FSM) DeletePrefix(ctx context.Context, prefix string) error {
defer metrics.MeasureSince([]string{"raft", "delete_prefix"}, time.Now())

f.permitPool.Acquire()
defer f.permitPool.Release()

f.l.RLock()
defer f.l.RUnlock()

Expand All @@ -287,9 +279,6 @@ func (f *FSM) DeletePrefix(ctx context.Context, prefix string) error {
func (f *FSM) Get(ctx context.Context, path string) (*physical.Entry, error) {
defer metrics.MeasureSince([]string{"raft", "get"}, time.Now())

f.permitPool.Acquire()
defer f.permitPool.Release()

f.l.RLock()
defer f.l.RUnlock()

Expand Down Expand Up @@ -324,9 +313,6 @@ func (f *FSM) Get(ctx context.Context, path string) (*physical.Entry, error) {
func (f *FSM) Put(ctx context.Context, entry *physical.Entry) error {
defer metrics.MeasureSince([]string{"raft", "put"}, time.Now())

f.permitPool.Acquire()
defer f.permitPool.Release()

f.l.RLock()
defer f.l.RUnlock()

Expand All @@ -340,9 +326,6 @@ func (f *FSM) Put(ctx context.Context, entry *physical.Entry) error {
func (f *FSM) List(ctx context.Context, prefix string) ([]string, error) {
defer metrics.MeasureSince([]string{"raft", "list"}, time.Now())

f.permitPool.Acquire()
defer f.permitPool.Release()

f.l.RLock()
defer f.l.RUnlock()

Expand Down Expand Up @@ -374,9 +357,6 @@ func (f *FSM) List(ctx context.Context, prefix string) ([]string, error) {
// Transaction writes all the operations in the provided transaction to the bolt
// file.
func (f *FSM) Transaction(ctx context.Context, txns []*physical.TxnEntry) error {
f.permitPool.Acquire()
defer f.permitPool.Release()

f.l.RLock()
defer f.l.RUnlock()

Expand Down Expand Up @@ -404,57 +384,99 @@ func (f *FSM) Transaction(ctx context.Context, txns []*physical.TxnEntry) error
return err
}

// Apply will apply a log value to the FSM. This is called from the raft
// ApplyBatch will apply a set of logs to the FSM. This is called from the raft
// library.
func (f *FSM) Apply(log *raft.Log) interface{} {
command := &LogData{}
err := proto.Unmarshal(log.Data, command)
if err != nil {
f.logger.Error("error proto unmarshaling log data", "error", err)
panic("error proto unmarshaling log data")
}
func (f *FSM) ApplyBatch(logs []*raft.Log) []interface{} {
if len(logs) == 0 {
return []interface{}{}
}

// Do the unmarshalling first so we don't hold locks
var latestConfiguration *ConfigurationValue
commands := make([]interface{}, 0, len(logs))
for _, log := range logs {
switch log.Type {
case raft.LogCommand:
command := &LogData{}
err := proto.Unmarshal(log.Data, command)
if err != nil {
f.logger.Error("error proto unmarshaling log data", "error", err)
panic(len(log.Data))
michelvocks marked this conversation as resolved.
Show resolved Hide resolved
briankassouf marked this conversation as resolved.
Show resolved Hide resolved
panic("error proto unmarshaling log data")
}
commands = append(commands, command)
case raft.LogConfiguration:
configuration := raft.DecodeConfiguration(log.Data)
config := raftConfigurationToProtoConfiguration(log.Index, configuration)

f.l.RLock()
defer f.l.RUnlock()
commands = append(commands, config)

// Update the latest configuration the fsm has received; we will
// store this after it has been committed to storage.
latestConfiguration = config

default:
panic(fmt.Sprintf("got unexpected log type: %d", log.Type))
}
}

// Only advance latest pointer if this log has a higher index value than
// what we have seen in the past.
var logIndex []byte
var err error
latestIndex, _ := f.LatestState()
if latestIndex.Index < log.Index {
lastLog := logs[len(logs)-1]
if latestIndex.Index < lastLog.Index {
logIndex, err = proto.Marshal(&IndexValue{
Term: log.Term,
Index: log.Index,
Term: lastLog.Term,
Index: lastLog.Index,
})
if err != nil {
f.logger.Error("unable to marshal latest index", "error", err)
panic("unable to marshal latest index")
}
}

f.l.RLock()
defer f.l.RUnlock()

err = f.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(dataBucketName)
for _, op := range command.Operations {
var err error
switch op.OpType {
case putOp:
err = b.Put([]byte(op.Key), op.Value)
case deleteOp:
err = b.Delete([]byte(op.Key))
case restoreCallbackOp:
if f.restoreCb != nil {
// Kick off the restore callback function in a go routine
go f.restoreCb(context.Background())
for _, commandRaw := range commands {
switch command := commandRaw.(type) {
case *LogData:
for _, op := range command.Operations {
var err error
switch op.OpType {
case putOp:
err = b.Put([]byte(op.Key), op.Value)
case deleteOp:
err = b.Delete([]byte(op.Key))
case restoreCallbackOp:
if f.restoreCb != nil {
// Kick off the restore callback function in a go routine
go f.restoreCb(context.Background())
}
default:
return fmt.Errorf("%q is not a supported transaction operation", op.OpType)
}
if err != nil {
return err
}
}

case *ConfigurationValue:
b := tx.Bucket(configBucketName)
configBytes, err := proto.Marshal(command)
if err != nil {
return err
}
if err := b.Put(latestConfigKey, configBytes); err != nil {
return err
}
default:
return fmt.Errorf("%q is not a supported transaction operation", op.OpType)
}
if err != nil {
return err
}
}

// TODO: benchmark so we can know how much time this adds
if f.storeLatestState && len(logIndex) > 0 {
b := tx.Bucket(configBucketName)
err = b.Put(latestIndexKey, logIndex)
Expand All @@ -472,13 +494,32 @@ func (f *FSM) Apply(log *raft.Log) interface{} {

// If we advanced the latest value, update the in-memory representation too.
if len(logIndex) > 0 {
atomic.StoreUint64(f.latestTerm, log.Term)
atomic.StoreUint64(f.latestIndex, log.Index)
atomic.StoreUint64(f.latestTerm, lastLog.Term)
atomic.StoreUint64(f.latestIndex, lastLog.Index)
}

// If one or more configuration changes were processed, store the latest one.
if latestConfiguration != nil {
f.latestConfig.Store(latestConfiguration)
}

return &FSMApplyResponse{
Success: true,
// Build the responses. The logs array is used here to esnure we reply to
briankassouf marked this conversation as resolved.
Show resolved Hide resolved
// all command values; even if they are not of the types we expect. This
// should future proof this function from more log types being provided.
resp := make([]interface{}, len(logs))
for i := range logs {
resp[i] = &FSMApplyResponse{
Success: true,
}
}

return resp
}

// Apply will apply a log value to the FSM. This is called from the raft
// library.
func (f *FSM) Apply(log *raft.Log) interface{} {
return f.ApplyBatch([]*raft.Log{log})[0]
}

type writeErrorCloser interface {
Expand Down Expand Up @@ -609,61 +650,6 @@ func (s *noopSnapshotter) Persist(sink raft.SnapshotSink) error {
// Release doesn't do anything.
func (s *noopSnapshotter) Release() {}

// StoreConfig satisfies the raft.ConfigurationStore interface and persists the
// latest raft server configuration to the bolt file.
func (f *FSM) StoreConfiguration(index uint64, configuration raft.Configuration) {
f.l.RLock()
defer f.l.RUnlock()

var indexBytes []byte
latestIndex, _ := f.LatestState()
// Only write the new index if we are advancing the pointer
if index > latestIndex.Index {
latestIndex.Index = index

var err error
indexBytes, err = proto.Marshal(latestIndex)
if err != nil {
f.logger.Error("unable to marshal latest index", "error", err)
panic(fmt.Sprintf("unable to marshal latest index: %v", err))
}
}

protoConfig := raftConfigurationToProtoConfiguration(index, configuration)
configBytes, err := proto.Marshal(protoConfig)
if err != nil {
f.logger.Error("unable to marshal config", "error", err)
panic(fmt.Sprintf("unable to marshal config: %v", err))
}

if f.storeLatestState {
err = f.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(configBucketName)
err := b.Put(latestConfigKey, configBytes)
if err != nil {
return err
}

// TODO: benchmark so we can know how much time this adds
if len(indexBytes) > 0 {
err = b.Put(latestIndexKey, indexBytes)
if err != nil {
return err
}
}

return nil
})
if err != nil {
f.logger.Error("unable to store latest configuration", "error", err)
panic(fmt.Sprintf("unable to store latest configuration: %v", err))
}
}

f.witnessIndex(latestIndex)
f.latestConfig.Store(protoConfig)
}

// raftConfigurationToProtoConfiguration converts a raft configuration object to
// a proto value.
func raftConfigurationToProtoConfiguration(index uint64, configuration raft.Configuration) *ConfigurationValue {
Expand Down Expand Up @@ -722,9 +708,6 @@ func (f *FSMChunkStorage) StoreChunk(chunk *raftchunking.ChunkInfo) (bool, error
Value: b,
}

f.f.permitPool.Acquire()
defer f.f.permitPool.Release()

f.f.l.RLock()
defer f.f.l.RUnlock()

Expand Down
Loading