Add support for larger transactions in Raft #24991

Merged 3 commits on Jan 24, 2024
3 changes: 3 additions & 0 deletions changelog/24991.txt
@@ -0,0 +1,3 @@
```release-note:improvement
storage/raft: Add support for larger transactions when using raft storage.
```
18 changes: 14 additions & 4 deletions physical/consul/consul.go
@@ -41,10 +41,11 @@ const (

// Verify ConsulBackend satisfies the correct interfaces
var (
_ physical.Backend = (*ConsulBackend)(nil)
_ physical.FencingHABackend = (*ConsulBackend)(nil)
_ physical.Lock = (*ConsulLock)(nil)
_ physical.Transactional = (*ConsulBackend)(nil)
_ physical.Backend = (*ConsulBackend)(nil)
_ physical.FencingHABackend = (*ConsulBackend)(nil)
_ physical.Lock = (*ConsulLock)(nil)
_ physical.Transactional = (*ConsulBackend)(nil)
_ physical.TransactionalLimits = (*ConsulBackend)(nil)

GetInTxnDisabledError = errors.New("get operations inside transactions are disabled in consul backend")
)
@@ -430,6 +431,15 @@ func (c *ConsulBackend) makeApiTxn(txn *physical.TxnEntry) (*api.TxnOp, error) {
return &api.TxnOp{KV: op}, nil
}

func (c *ConsulBackend) TransactionLimits() (int, int) {
// Note that even for modern Consul versions that support 128 entries per txn,
// we have an effective limit of 64 write operations because the other 64 are
// used for undo log read operations. We also reserve 1 for a check-session
// operation to prevent split brain, so the most we allow the WAL to put in a
// batch is 63.
return 63, 128 * 1024
}

// Put is used to insert or update an entry
func (c *ConsulBackend) Put(ctx context.Context, entry *physical.Entry) error {
txns := []*physical.TxnEntry{
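The new TransactionalLimits interface lets a backend advertise how many operations and how many combined key and value bytes it will accept in a single transaction; Consul reports (63, 128*1024) above, and the Raft backend below reports configurable values. As a minimal sketch of how a caller might consume those limits (not part of this PR; the package name, helper name, and byte accounting are assumptions), splitting a large batch into chunks that respect both numbers:

```go
package storagehelpers

import physical "github.com/hashicorp/vault/sdk/physical"

// splitTxnBatch splits txns into chunks that respect the limits reported by
// TransactionLimits(): at most maxEntries operations per chunk and at most
// maxBytes of combined key and value data per chunk.
func splitTxnBatch(txns []*physical.TxnEntry, maxEntries, maxBytes int) [][]*physical.TxnEntry {
	var batches [][]*physical.TxnEntry
	var cur []*physical.TxnEntry
	curBytes := 0

	for _, txn := range txns {
		size := len(txn.Entry.Key) + len(txn.Entry.Value)
		// Start a new chunk when adding this entry would exceed either limit.
		if len(cur) > 0 && (len(cur)+1 > maxEntries || curBytes+size > maxBytes) {
			batches = append(batches, cur)
			cur, curBytes = nil, 0
		}
		cur = append(cur, txn)
		curBytes += size
	}
	if len(cur) > 0 {
		batches = append(batches, cur)
	}
	return batches
}
```

Under this accounting a delete operation carries a nil value, so it only counts its key bytes toward the size limit.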
4 changes: 4 additions & 0 deletions physical/consul/consul_test.go
@@ -159,6 +159,10 @@ func TestConsul_newConsulBackend(t *testing.T) {
// if test.max_parallel != cap(c.permitPool) {
// t.Errorf("bad: %v != %v", test.max_parallel, cap(c.permitPool))
// }

maxEntries, maxBytes := be.(physical.TransactionalLimits).TransactionLimits()
require.Equal(t, 63, maxEntries)
require.Equal(t, 128*1024, maxBytes)
}
}

71 changes: 67 additions & 4 deletions physical/raft/raft.go
@@ -52,16 +52,36 @@ const (
// EnvVaultRaftNonVoter is used to override the non_voter config option, telling Vault to join as a non-voter (i.e. read replica).
EnvVaultRaftNonVoter = "VAULT_RAFT_RETRY_JOIN_AS_NON_VOTER"
raftNonVoterConfigKey = "retry_join_as_non_voter"

// EnvVaultRaftMaxBatchEntries is used to override the default maxBatchEntries
// limit.
EnvVaultRaftMaxBatchEntries = "VAULT_RAFT_MAX_BATCH_ENTRIES"

// EnvVaultRaftMaxBatchSizeBytes is used to override the default maxBatchSize
// limit.
EnvVaultRaftMaxBatchSizeBytes = "VAULT_RAFT_MAX_BATCH_SIZE_BYTES"

// defaultMaxBatchEntries is the default maxBatchEntries limit. This was
// derived from performance testing. It is effectively high enough never to be
// a real limit for realistic Vault operation sizes; the size limit provides
// the actual bound since the amount of data stored is more relevant than the
// specific number of operations.
defaultMaxBatchEntries = 4096

// defaultMaxBatchSize is the default maxBatchSize limit. This was derived
// from performance testing.
defaultMaxBatchSize = 128 * 1024
)

var getMmapFlags = func(string) int { return 0 }

// Verify RaftBackend satisfies the correct interfaces
var (
_ physical.Backend = (*RaftBackend)(nil)
_ physical.Transactional = (*RaftBackend)(nil)
_ physical.HABackend = (*RaftBackend)(nil)
_ physical.Lock = (*RaftLock)(nil)
_ physical.Backend = (*RaftBackend)(nil)
_ physical.Transactional = (*RaftBackend)(nil)
_ physical.TransactionalLimits = (*RaftBackend)(nil)
_ physical.HABackend = (*RaftBackend)(nil)
_ physical.Lock = (*RaftLock)(nil)
)

var (
@@ -141,6 +161,17 @@ type RaftBackend struct {
// performance.
maxEntrySize uint64

// maxBatchEntries is the maximum number of operation entries in each batch. It
// is set by default to a value we've tested to work well but may be overridden
// by the environment variable VAULT_RAFT_MAX_BATCH_ENTRIES.
maxBatchEntries int

// maxBatchSize is the maximum combined key and value size of operation
// entries in each batch. It is set by default to a value we've tested to work
// well but may be overridden by the environment variable
// VAULT_RAFT_MAX_BATCH_SIZE_BYTES.
maxBatchSize int

// autopilot is the instance of raft-autopilot library implementation of the
// autopilot features. This will be instantiated in both leader and followers.
// However, only active node will have a "running" autopilot.
@@ -339,6 +370,30 @@ func (c *ClusterAddrBridge) ServerAddr(id raft.ServerID) (raft.ServerAddress, er
return "", fmt.Errorf("could not find cluster addr for id=%s", id)
}

func batchLimitsFromEnv(logger log.Logger) (int, int) {
maxBatchEntries := defaultMaxBatchEntries
if envVal := os.Getenv(EnvVaultRaftMaxBatchEntries); envVal != "" {
if i, err := strconv.Atoi(envVal); err == nil && i > 0 {
maxBatchEntries = i
} else {
logger.Warn("failed to parse VAULT_RAFT_MAX_BATCH_ENTRIES as an integer > 0. Using default value.",
"env_val", envVal, "default_used", maxBatchEntries)
}
}

maxBatchSize := defaultMaxBatchSize
if envVal := os.Getenv(EnvVaultRaftMaxBatchSizeBytes); envVal != "" {
if i, err := strconv.Atoi(envVal); err == nil && i > 0 {
maxBatchSize = i
} else {
logger.Warn("failed to parse VAULT_RAFT_MAX_BATCH_SIZE_BYTES as an integer > 0. Using default value.",
"env_val", envVal, "default_used", maxBatchSize)
}
}

return maxBatchEntries, maxBatchSize
}

// NewRaftBackend constructs a RaftBackend using the given directory
func NewRaftBackend(conf map[string]string, logger log.Logger) (physical.Backend, error) {
path := os.Getenv(EnvVaultRaftPath)
@@ -531,6 +586,8 @@ func NewRaftBackend(conf map[string]string, logger log.Logger) (physical.Backend
return nil, fmt.Errorf("setting %s to true is only valid if at least one retry_join stanza is specified", raftNonVoterConfigKey)
}

maxBatchEntries, maxBatchSize := batchLimitsFromEnv(logger)

return &RaftBackend{
logger: logger,
fsm: fsm,
@@ -543,6 +600,8 @@ func NewRaftBackend(conf map[string]string, logger log.Logger) (physical.Backend
localID: localID,
permitPool: physical.NewPermitPool(physical.DefaultParallelOperations),
maxEntrySize: maxEntrySize,
maxBatchEntries: maxBatchEntries,
maxBatchSize: maxBatchSize,
followerHeartbeatTicker: time.NewTicker(time.Second),
autopilotReconcileInterval: reconcileInterval,
autopilotUpdateInterval: updateInterval,
@@ -1674,6 +1733,10 @@ func (b *RaftBackend) Transaction(ctx context.Context, txns []*physical.TxnEntry
return err
}

func (b *RaftBackend) TransactionLimits() (int, int) {
return b.maxBatchEntries, b.maxBatchSize
}

// applyLog will take a given log command and apply it to the raft log. applyLog
// doesn't return until the log has been applied to a quorum of servers and is
// persisted to the local FSM. Caller should hold the backend's read lock.
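To make the override semantics of batchLimitsFromEnv concrete (a valid integer greater than zero replaces the default; anything else logs a warning and keeps the default), here is a minimal test-style sketch, written as if it lived inside the physical/raft package since the helper and default constants are unexported, and assuming the usual testing, go-hclog, and testify imports. The PR's own TestRaft_TransactionalLimitsEnvOverride below exercises the same behaviour through a real backend.

```go
func TestBatchLimitsFromEnvSketch(t *testing.T) {
	// A valid override for the entry count is applied as-is.
	t.Setenv(EnvVaultRaftMaxBatchEntries, "1024")
	// "0" is not an integer > 0, so the size default is kept and a warning is logged.
	t.Setenv(EnvVaultRaftMaxBatchSizeBytes, "0")

	entries, size := batchLimitsFromEnv(hclog.NewNullLogger())

	require.Equal(t, 1024, entries)
	require.Equal(t, defaultMaxBatchSize, size)
}
```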
83 changes: 83 additions & 0 deletions physical/raft/raft_test.go
@@ -28,6 +28,7 @@ import (
"github.com/hashicorp/raft"
"github.com/hashicorp/vault/sdk/helper/jsonutil"
"github.com/hashicorp/vault/sdk/physical"
"github.com/stretchr/testify/require"
)

func connectPeers(nodes ...*RaftBackend) {
@@ -634,6 +635,88 @@ func TestRaft_TransactionalBackend_ThreeNode(t *testing.T) {
compareFSMs(t, raft1.fsm, raft3.fsm)
}

// TestRaft_TransactionalLimitsEnvOverride ensures the ENV var overrides for
// transaction size limits are plumbed through as expected.
func TestRaft_TransactionalLimitsEnvOverride(t *testing.T) {
tc := []struct {
name string
envEntries string
envSize string
wantEntries int
wantSize int
wantLog string
}{
{
name: "defaults",
wantEntries: defaultMaxBatchEntries,
wantSize: defaultMaxBatchSize,
},
{
name: "valid env",
envEntries: "123",
envSize: "456",
wantEntries: 123,
wantSize: 456,
},
{
name: "invalid entries",
envEntries: "not-a-number",
envSize: "100",
wantEntries: defaultMaxBatchEntries,
wantSize: 100,
wantLog: "failed to parse VAULT_RAFT_MAX_BATCH_ENTRIES",
},
{
name: "invalid size",
envEntries: "100",
envSize: "asdasdsasd",
wantEntries: 100,
wantSize: defaultMaxBatchSize,
wantLog: "failed to parse VAULT_RAFT_MAX_BATCH_SIZE_BYTES",
},
{
name: "zero entries",
envEntries: "0",
envSize: "100",
wantEntries: defaultMaxBatchEntries,
wantSize: 100,
wantLog: "failed to parse VAULT_RAFT_MAX_BATCH_ENTRIES as an integer > 0",
},
{
name: "zero size",
envEntries: "100",
envSize: "0",
wantEntries: 100,
wantSize: defaultMaxBatchSize,
wantLog: "failed to parse VAULT_RAFT_MAX_BATCH_SIZE_BYTES as an integer > 0",
},
}

for _, tt := range tc {
t.Run(tt.name, func(t *testing.T) {
// Set the env vars within this test
if tt.envEntries != "" {
t.Setenv(EnvVaultRaftMaxBatchEntries, tt.envEntries)
}
if tt.envSize != "" {
t.Setenv(EnvVaultRaftMaxBatchSizeBytes, tt.envSize)
}

var logBuf bytes.Buffer
raft1, dir := GetRaftWithLogOutput(t, false, true, &logBuf)
defer os.RemoveAll(dir)

e, s := raft1.TransactionLimits()

require.Equal(t, tt.wantEntries, e)
require.Equal(t, tt.wantSize, s)
if tt.wantLog != "" {
require.Contains(t, logBuf.String(), tt.wantLog)
}
})
}
}

func TestRaft_Backend_Performance(t *testing.T) {
b, dir := GetRaft(t, true, false)
defer os.RemoveAll(dir)
20 changes: 16 additions & 4 deletions physical/raft/testing.go
@@ -6,6 +6,7 @@ package raft
import (
"context"
"fmt"
"io"
"io/ioutil"
"testing"

@@ -20,18 +21,29 @@ func GetRaft(t testing.TB, bootstrap bool, noStoreState bool) (*RaftBackend, str
}
t.Logf("raft dir: %s", raftDir)

return getRaftWithDir(t, bootstrap, noStoreState, raftDir)
return getRaftWithDirAndLogOutput(t, bootstrap, noStoreState, raftDir, nil)
}

func getRaftWithDir(t testing.TB, bootstrap bool, noStoreState bool, raftDir string) (*RaftBackend, string) {
func GetRaftWithLogOutput(t testing.TB, bootstrap bool, noStoreState bool, logOutput io.Writer) (*RaftBackend, string) {
raftDir, err := ioutil.TempDir("", "vault-raft-")
if err != nil {
t.Fatal(err)
}
t.Logf("raft dir: %s", raftDir)

return getRaftWithDirAndLogOutput(t, bootstrap, noStoreState, raftDir, logOutput)
}

func getRaftWithDirAndLogOutput(t testing.TB, bootstrap bool, noStoreState bool, raftDir string, logOutput io.Writer) (*RaftBackend, string) {
id, err := uuid.GenerateUUID()
if err != nil {
t.Fatal(err)
}

logger := hclog.New(&hclog.LoggerOptions{
Name: fmt.Sprintf("raft-%s", id),
Level: hclog.Trace,
Name: fmt.Sprintf("raft-%s", id),
Level: hclog.Trace,
Output: logOutput,
})
logger.Info("raft dir", "dir", raftDir)

12 changes: 12 additions & 0 deletions sdk/physical/cache.go
@@ -86,6 +86,7 @@ var (
_ ToggleablePurgemonster = (*TransactionalCache)(nil)
_ Backend = (*Cache)(nil)
_ Transactional = (*TransactionalCache)(nil)
_ TransactionalLimits = (*TransactionalCache)(nil)
)

// NewCache returns a physical cache of the given size.
@@ -271,3 +272,14 @@ func (c *TransactionalCache) Transaction(ctx context.Context, txns []*TxnEntry)

return nil
}

// TransactionLimits implements physical.TransactionalLimits
func (c *TransactionalCache) TransactionLimits() (int, int) {
if tl, ok := c.Transactional.(TransactionalLimits); ok {
return tl.TransactionLimits()
}
// We don't have any specific limits of our own so return zeros to signal that
// the caller should use whatever reasonable defaults it would if it used a
// non-TransactionalLimits backend.
return 0, 0
}
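Because TransactionalCache has no limits of its own, it forwards whatever the wrapped backend reports and returns (0, 0) when the backend does not implement TransactionalLimits. A minimal sketch of how a consumer might treat those zeros as "no opinion" and fall back to its own defaults (the package, helper name, and default values are assumptions, not part of this PR):

```go
package storagehelpers

import physical "github.com/hashicorp/vault/sdk/physical"

// effectiveLimits prefers backend-reported limits, treating a zero value as
// "no opinion" and substituting the caller's own defaults in that case.
func effectiveLimits(b physical.Backend, defaultEntries, defaultBytes int) (int, int) {
	if tl, ok := b.(physical.TransactionalLimits); ok {
		entries, bytes := tl.TransactionLimits()
		if entries <= 0 {
			entries = defaultEntries
		}
		if bytes <= 0 {
			bytes = defaultBytes
		}
		return entries, bytes
	}
	return defaultEntries, defaultBytes
}
```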
54 changes: 54 additions & 0 deletions sdk/physical/cache_test.go
@@ -0,0 +1,54 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package physical

import (
"testing"

"github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/require"
)

func TestTransactionalCache_TransactionLimits(t *testing.T) {
tc := []struct {
name string
be Backend
wantEntries int
wantSize int
}{
{
name: "non-transactionlimits backend",
be: &TestTransactionalNonLimitBackend{},

// Should return zeros to let the implementor choose defaults.
wantEntries: 0,
wantSize: 0,
},
{
name: "transactionlimits backend",
be: &TestTransactionalLimitBackend{
MaxEntries: 123,
MaxSize: 345,
},

// Should return underlying limits
wantEntries: 123,
wantSize: 345,
},
}

for _, tt := range tc {
t.Run(tt.name, func(t *testing.T) {
logger := hclog.NewNullLogger()

be := NewTransactionalCache(tt.be, 1024, logger, nil)

// Call the TransactionLimits method
maxEntries, maxBytes := be.TransactionLimits()

require.Equal(t, tt.wantEntries, maxEntries)
require.Equal(t, tt.wantSize, maxBytes)
})
}
}