From 7213076d0a26bc44709a6a47faf20c78c066cc2d Mon Sep 17 00:00:00 2001 From: Paul Banks Date: Tue, 23 Jan 2024 10:45:47 +0000 Subject: [PATCH 1/3] Add support for larger transactions in Raft --- physical/consul/consul.go | 18 ++++++-- physical/consul/consul_test.go | 4 ++ physical/raft/raft.go | 71 +++++++++++++++++++++++++++-- physical/raft/raft_test.go | 81 ++++++++++++++++++++++++++++++++++ physical/raft/testing.go | 20 +++++++-- sdk/physical/cache.go | 12 +++++ sdk/physical/cache_test.go | 54 +++++++++++++++++++++++ sdk/physical/encoding.go | 11 +++++ sdk/physical/encoding_test.go | 51 +++++++++++++++++++++ sdk/physical/error.go | 11 +++++ sdk/physical/error_test.go | 53 ++++++++++++++++++++++ sdk/physical/inmem/inmem.go | 57 +++++++++++++++++++++--- sdk/physical/latency.go | 11 +++++ sdk/physical/latency_test.go | 53 ++++++++++++++++++++++ sdk/physical/testing.go | 40 +++++++++++++++++ sdk/physical/transactions.go | 29 ++++++++++++ 16 files changed, 558 insertions(+), 18 deletions(-) create mode 100644 sdk/physical/cache_test.go create mode 100644 sdk/physical/encoding_test.go create mode 100644 sdk/physical/error_test.go create mode 100644 sdk/physical/latency_test.go diff --git a/physical/consul/consul.go b/physical/consul/consul.go index d7c5e6505e11..dec3717a0207 100644 --- a/physical/consul/consul.go +++ b/physical/consul/consul.go @@ -41,10 +41,11 @@ const ( // Verify ConsulBackend satisfies the correct interfaces var ( - _ physical.Backend = (*ConsulBackend)(nil) - _ physical.FencingHABackend = (*ConsulBackend)(nil) - _ physical.Lock = (*ConsulLock)(nil) - _ physical.Transactional = (*ConsulBackend)(nil) + _ physical.Backend = (*ConsulBackend)(nil) + _ physical.FencingHABackend = (*ConsulBackend)(nil) + _ physical.Lock = (*ConsulLock)(nil) + _ physical.Transactional = (*ConsulBackend)(nil) + _ physical.TransactionalLimits = (*ConsulBackend)(nil) GetInTxnDisabledError = errors.New("get operations inside transactions are disabled in consul backend") ) @@ 
-430,6 +431,15 @@ func (c *ConsulBackend) makeApiTxn(txn *physical.TxnEntry) (*api.TxnOp, error) { return &api.TxnOp{KV: op}, nil } +func (c *ConsulBackend) TransactionLimits() (int, int) { + // Note that even for modern Consul versions that support 128 entries per txn, + // we have an effective limit of 64 write operations because the other 64 are + // used for undo log read operations. We also reserve 1 for a check-session + // operation to prevent split brain so the most we allow WAL to put in a batch + // is 63. + return 63, 128 * 1024 +} + // Put is used to insert or update an entry func (c *ConsulBackend) Put(ctx context.Context, entry *physical.Entry) error { txns := []*physical.TxnEntry{ diff --git a/physical/consul/consul_test.go b/physical/consul/consul_test.go index b01ddd210363..bf1d809afdde 100644 --- a/physical/consul/consul_test.go +++ b/physical/consul/consul_test.go @@ -159,6 +159,10 @@ func TestConsul_newConsulBackend(t *testing.T) { // if test.max_parallel != cap(c.permitPool) { // t.Errorf("bad: %v != %v", test.max_parallel, cap(c.permitPool)) // } + + maxEntries, maxBytes := be.(physical.TransactionalLimits).TransactionLimits() + require.Equal(t, 63, maxEntries) + require.Equal(t, 128*1024, maxBytes) } } diff --git a/physical/raft/raft.go b/physical/raft/raft.go index e937697777ad..86b6b3958fec 100644 --- a/physical/raft/raft.go +++ b/physical/raft/raft.go @@ -52,16 +52,36 @@ const ( // EnvVaultRaftNonVoter is used to override the non_voter config option, telling Vault to join as a non-voter (i.e. read replica). EnvVaultRaftNonVoter = "VAULT_RAFT_RETRY_JOIN_AS_NON_VOTER" raftNonVoterConfigKey = "retry_join_as_non_voter" + + // EnvVaultRaftMaxBatchEntries is used to override the default maxBatchEntries + // limit. + EnvVaultRaftMaxBatchEntries = "VAULT_RAFT_MAX_BATCH_ENTRIES" + + // EnvVaultRaftMaxBatchSizeBytes is used to override the default maxBatchSize + // limit. 
+ EnvVaultRaftMaxBatchSizeBytes = "VAULT_RAFT_MAX_BATCH_SIZE_BYTES" + + // defaultMaxBatchEntries is the default maxBatchEntries limit. This was + // derived from performance testing. It is effectively high enough never to be + // a real limit for realistic Vault operation sizes and the size limit + // provides the actual limit since that amount of data stored is more relevant + // than the specific number of operations. + defaultMaxBatchEntries = 4096 + + // defaultMaxBatchSize is the default maxBatchSize limit. This was derived + // from performance testing. + defaultMaxBatchSize = 128 * 1024 ) var getMmapFlags = func(string) int { return 0 } // Verify RaftBackend satisfies the correct interfaces var ( - _ physical.Backend = (*RaftBackend)(nil) - _ physical.Transactional = (*RaftBackend)(nil) - _ physical.HABackend = (*RaftBackend)(nil) - _ physical.Lock = (*RaftLock)(nil) + _ physical.Backend = (*RaftBackend)(nil) + _ physical.Transactional = (*RaftBackend)(nil) + _ physical.TransactionalLimits = (*RaftBackend)(nil) + _ physical.HABackend = (*RaftBackend)(nil) + _ physical.Lock = (*RaftLock)(nil) ) var ( @@ -141,6 +161,17 @@ type RaftBackend struct { // performance. maxEntrySize uint64 + // maxBatchEntries is the number of operation entries in each batch. It is set + // by default to a value we've tested to work well but may be overridden by + // Environment variable VAULT_RAFT_MAX_BATCH_ENTRIES. + maxBatchEntries int + + // maxBatchSize is the maximum combined key and value size of operation + // entries in each batch. It is set by default to a value we've tested to work + // well but may be overridden by Environment variable + // VAULT_RAFT_MAX_BATCH_SIZE_BYTES. + maxBatchSize int + // autopilot is the instance of raft-autopilot library implementation of the // autopilot features. This will be instantiated in both leader and followers. // However, only active node will have a "running" autopilot. 
@@ -339,6 +370,30 @@ func (c *ClusterAddrBridge) ServerAddr(id raft.ServerID) (raft.ServerAddress, er return "", fmt.Errorf("could not find cluster addr for id=%s", id) } +func batchLimitsFromEnv(logger log.Logger) (int, int) { + maxBatchEntries := defaultMaxBatchEntries + if envVal := os.Getenv(EnvVaultRaftMaxBatchEntries); envVal != "" { + if i, err := strconv.Atoi(envVal); err == nil && i > 0 { + maxBatchEntries = i + } else { + logger.Warn("failed to parse VAULT_RAFT_MAX_BATCH_ENTRIES as an integer > 0. Using default value.", + "env_val", envVal, "default_used", maxBatchEntries) + } + } + + maxBatchSize := defaultMaxBatchSize + if envVal := os.Getenv(EnvVaultRaftMaxBatchSizeBytes); envVal != "" { + if i, err := strconv.Atoi(envVal); err == nil && i > 0 { + maxBatchSize = i + } else { + logger.Warn("failed to parse VAULT_RAFT_MAX_BATCH_SIZE_BYTES as an integer > 0. Using default value.", + "env_val", envVal, "default_used", maxBatchSize) + } + } + + return maxBatchEntries, maxBatchSize +} + // NewRaftBackend constructs a RaftBackend using the given directory func NewRaftBackend(conf map[string]string, logger log.Logger) (physical.Backend, error) { path := os.Getenv(EnvVaultRaftPath) @@ -531,6 +586,8 @@ func NewRaftBackend(conf map[string]string, logger log.Logger) (physical.Backend return nil, fmt.Errorf("setting %s to true is only valid if at least one retry_join stanza is specified", raftNonVoterConfigKey) } + maxBatchEntries, maxBatchSize := batchLimitsFromEnv(logger) + return &RaftBackend{ logger: logger, fsm: fsm, @@ -543,6 +600,8 @@ func NewRaftBackend(conf map[string]string, logger log.Logger) (physical.Backend localID: localID, permitPool: physical.NewPermitPool(physical.DefaultParallelOperations), maxEntrySize: maxEntrySize, + maxBatchEntries: maxBatchEntries, + maxBatchSize: maxBatchSize, followerHeartbeatTicker: time.NewTicker(time.Second), autopilotReconcileInterval: reconcileInterval, autopilotUpdateInterval: updateInterval, @@ -1674,6 +1733,10 @@ 
func (b *RaftBackend) Transaction(ctx context.Context, txns []*physical.TxnEntry return err } +func (b *RaftBackend) TransactionLimits() (int, int) { + return b.maxBatchEntries, b.maxBatchSize +} + // applyLog will take a given log command and apply it to the raft log. applyLog // doesn't return until the log has been applied to a quorum of servers and is // persisted to the local FSM. Caller should hold the backend's read lock. diff --git a/physical/raft/raft_test.go b/physical/raft/raft_test.go index e8166ad16412..a1b8ea4bc1ee 100644 --- a/physical/raft/raft_test.go +++ b/physical/raft/raft_test.go @@ -28,6 +28,7 @@ import ( "github.com/hashicorp/raft" "github.com/hashicorp/vault/sdk/helper/jsonutil" "github.com/hashicorp/vault/sdk/physical" + "github.com/stretchr/testify/require" ) func connectPeers(nodes ...*RaftBackend) { @@ -634,6 +635,86 @@ func TestRaft_TransactionalBackend_ThreeNode(t *testing.T) { compareFSMs(t, raft1.fsm, raft3.fsm) } +func TestRaft_TransactionalLimitsEnvOverride(t *testing.T) { + tc := []struct { + name string + envEntries string + envSize string + wantEntries int + wantSize int + wantLog string + }{ + { + name: "defaults", + wantEntries: defaultMaxBatchEntries, + wantSize: defaultMaxBatchSize, + }, + { + name: "valid env", + envEntries: "123", + envSize: "456", + wantEntries: 123, + wantSize: 456, + }, + { + name: "invalid entries", + envEntries: "not-a-number", + envSize: "100", + wantEntries: defaultMaxBatchEntries, + wantSize: 100, + wantLog: "failed to parse VAULT_RAFT_MAX_BATCH_ENTRIES", + }, + { + name: "invalid entries", + envEntries: "100", + envSize: "asdasdsasd", + wantEntries: 100, + wantSize: defaultMaxBatchSize, + wantLog: "failed to parse VAULT_RAFT_MAX_BATCH_SIZE_BYTES", + }, + { + name: "zero entries", + envEntries: "0", + envSize: "100", + wantEntries: defaultMaxBatchEntries, + wantSize: 100, + wantLog: "failed to parse VAULT_RAFT_MAX_BATCH_ENTRIES as an integer > 0", + }, + { + name: "zero size", + envEntries: "100", 
+ envSize: "0", + wantEntries: 100, + wantSize: defaultMaxBatchSize, + wantLog: "failed to parse VAULT_RAFT_MAX_BATCH_SIZE_BYTES as an integer > 0", + }, + } + + for _, tt := range tc { + t.Run(tt.name, func(t *testing.T) { + // Set the env vars within this test + if tt.envEntries != "" { + t.Setenv(EnvVaultRaftMaxBatchEntries, tt.envEntries) + } + if tt.envSize != "" { + t.Setenv(EnvVaultRaftMaxBatchSizeBytes, tt.envSize) + } + + var logBuf bytes.Buffer + raft1, dir := GetRaftWithLogOutput(t, false, true, &logBuf) + defer os.RemoveAll(dir) + + e, s := raft1.TransactionLimits() + + require.Equal(t, tt.wantEntries, e) + require.Equal(t, tt.wantSize, s) + if tt.wantLog != "" { + require.Contains(t, logBuf.String(), tt.wantLog) + } + }) + } +} + func TestRaft_Backend_Performance(t *testing.T) { b, dir := GetRaft(t, true, false) defer os.RemoveAll(dir) diff --git a/physical/raft/testing.go b/physical/raft/testing.go index 632924950081..16f74ff1696b 100644 --- a/physical/raft/testing.go +++ b/physical/raft/testing.go @@ -6,6 +6,7 @@ package raft import ( "context" "fmt" + "io" "io/ioutil" "testing" @@ -20,18 +21,29 @@ func GetRaft(t testing.TB, bootstrap bool, noStoreState bool) (*RaftBackend, str } t.Logf("raft dir: %s", raftDir) - return getRaftWithDir(t, bootstrap, noStoreState, raftDir) + return getRaftWithDirAndLogOutput(t, bootstrap, noStoreState, raftDir, nil) } -func getRaftWithDir(t testing.TB, bootstrap bool, noStoreState bool, raftDir string) (*RaftBackend, string) { +func GetRaftWithLogOutput(t testing.TB, bootstrap bool, noStoreState bool, logOutput io.Writer) (*RaftBackend, string) { + raftDir, err := ioutil.TempDir("", "vault-raft-") + if err != nil { + t.Fatal(err) + } + t.Logf("raft dir: %s", raftDir) + + return getRaftWithDirAndLogOutput(t, bootstrap, noStoreState, raftDir, logOutput) +} + +func getRaftWithDirAndLogOutput(t testing.TB, bootstrap bool, noStoreState bool, raftDir string, logOutput io.Writer) (*RaftBackend, string) { id, err := 
uuid.GenerateUUID() if err != nil { t.Fatal(err) } logger := hclog.New(&hclog.LoggerOptions{ - Name: fmt.Sprintf("raft-%s", id), - Level: hclog.Trace, + Name: fmt.Sprintf("raft-%s", id), + Level: hclog.Trace, + Output: logOutput, }) logger.Info("raft dir", "dir", raftDir) diff --git a/sdk/physical/cache.go b/sdk/physical/cache.go index 874d6c529dea..3816609e2bcd 100644 --- a/sdk/physical/cache.go +++ b/sdk/physical/cache.go @@ -86,6 +86,7 @@ var ( _ ToggleablePurgemonster = (*TransactionalCache)(nil) _ Backend = (*Cache)(nil) _ Transactional = (*TransactionalCache)(nil) + _ TransactionalLimits = (*TransactionalCache)(nil) ) // NewCache returns a physical cache of the given size. @@ -271,3 +272,14 @@ func (c *TransactionalCache) Transaction(ctx context.Context, txns []*TxnEntry) return nil } + +// TransactionLimits implements physical.TransactionalLimits +func (c *TransactionalCache) TransactionLimits() (int, int) { + if tl, ok := c.Transactional.(TransactionalLimits); ok { + return tl.TransactionLimits() + } + // We don't have any specific limits of our own so return zeros to signal that + // the caller should use whatever reasonable defaults it would if it used a + // non-TransactionalLimits backend. + return 0, 0 +} diff --git a/sdk/physical/cache_test.go b/sdk/physical/cache_test.go new file mode 100644 index 000000000000..7e9bf3232a04 --- /dev/null +++ b/sdk/physical/cache_test.go @@ -0,0 +1,54 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + +package physical + +import ( + "testing" + + "github.com/hashicorp/go-hclog" + "github.com/stretchr/testify/require" +) + +func TestTransactionalCache_TransactionLimits(t *testing.T) { + tc := []struct { + name string + be Backend + wantEntries int + wantSize int + }{ + { + name: "non-transactionlimits backend", + be: &TestTransactionalNonLimitBackend{}, + + // Should return zeros to let the implementor choose defaults. 
+ wantEntries: 0, + wantSize: 0, + }, + { + name: "transactionlimits backend", + be: &TestTransactionalLimitBackend{ + MaxEntries: 123, + MaxSize: 345, + }, + + // Should return underlying limits + wantEntries: 123, + wantSize: 345, + }, + } + + for _, tt := range tc { + t.Run(tt.name, func(t *testing.T) { + logger := hclog.NewNullLogger() + + be := NewTransactionalCache(tt.be, 1024, logger, nil) + + // Call the TransactionLimits method + maxEntries, maxBytes := be.TransactionLimits() + + require.Equal(t, tt.wantEntries, maxEntries) + require.Equal(t, tt.wantSize, maxBytes) + }) + } +} diff --git a/sdk/physical/encoding.go b/sdk/physical/encoding.go index 49e00ae6ace5..af581207f9cb 100644 --- a/sdk/physical/encoding.go +++ b/sdk/physical/encoding.go @@ -98,6 +98,17 @@ func (e *TransactionalStorageEncoding) Transaction(ctx context.Context, txns []* return e.Transactional.Transaction(ctx, txns) } +// TransactionLimits implements physical.TransactionalLimits +func (e *TransactionalStorageEncoding) TransactionLimits() (int, int) { + if tl, ok := e.Transactional.(TransactionalLimits); ok { + return tl.TransactionLimits() + } + // We don't have any specific limits of our own so return zeros to signal that + // the caller should use whatever reasonable defaults it would if it used a + // non-TransactionalLimits backend. + return 0, 0 +} + func (e *StorageEncoding) Purge(ctx context.Context) { if purgeable, ok := e.Backend.(ToggleablePurgemonster); ok { purgeable.Purge(ctx) diff --git a/sdk/physical/encoding_test.go b/sdk/physical/encoding_test.go new file mode 100644 index 000000000000..e4d9cceaa417 --- /dev/null +++ b/sdk/physical/encoding_test.go @@ -0,0 +1,51 @@ +// Copyright (c) HashiCorp, Inc. 
+// SPDX-License-Identifier: MPL-2.0 + +package physical + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestTransactionalStorageEncoding_TransactionLimits(t *testing.T) { + tc := []struct { + name string + be Backend + wantEntries int + wantSize int + }{ + { + name: "non-transactionlimits backend", + be: &TestTransactionalNonLimitBackend{}, + + // Should return zeros to let the implementor choose defaults. + wantEntries: 0, + wantSize: 0, + }, + { + name: "transactionlimits backend", + be: &TestTransactionalLimitBackend{ + MaxEntries: 123, + MaxSize: 345, + }, + + // Should return underlying limits + wantEntries: 123, + wantSize: 345, + }, + } + + for _, tt := range tc { + t.Run(tt.name, func(t *testing.T) { + be := NewStorageEncoding(tt.be).(TransactionalLimits) + + // Call the TransactionLimits method + maxEntries, maxBytes := be.TransactionLimits() + + require.Equal(t, tt.wantEntries, maxEntries) + require.Equal(t, tt.wantSize, maxBytes) + }) + } +} diff --git a/sdk/physical/error.go b/sdk/physical/error.go index 4af7b7d639fc..aa7418fd7893 100644 --- a/sdk/physical/error.go +++ b/sdk/physical/error.go @@ -111,3 +111,14 @@ func (e *TransactionalErrorInjector) Transaction(ctx context.Context, txns []*Tx } return e.Transactional.Transaction(ctx, txns) } + +// TransactionLimits implements physical.TransactionalLimits +func (e *TransactionalErrorInjector) TransactionLimits() (int, int) { + if tl, ok := e.Transactional.(TransactionalLimits); ok { + return tl.TransactionLimits() + } + // We don't have any specific limits of our own so return zeros to signal that + // the caller should use whatever reasonable defaults it would if it used a + // non-TransactionalLimits backend. + return 0, 0 +} diff --git a/sdk/physical/error_test.go b/sdk/physical/error_test.go new file mode 100644 index 000000000000..779cd1bc1c98 --- /dev/null +++ b/sdk/physical/error_test.go @@ -0,0 +1,53 @@ +// Copyright (c) HashiCorp, Inc. 
+// SPDX-License-Identifier: MPL-2.0 + +package physical + +import ( + "testing" + + "github.com/hashicorp/go-hclog" + "github.com/stretchr/testify/require" +) + +func TestTransactionalErrorInjector_TransactionLimits(t *testing.T) { + tc := []struct { + name string + be Backend + wantEntries int + wantSize int + }{ + { + name: "non-transactionlimits backend", + be: &TestTransactionalNonLimitBackend{}, + + // Should return zeros to let the implementor choose defaults. + wantEntries: 0, + wantSize: 0, + }, + { + name: "transactionlimits backend", + be: &TestTransactionalLimitBackend{ + MaxEntries: 123, + MaxSize: 345, + }, + + // Should return underlying limits + wantEntries: 123, + wantSize: 345, + }, + } + + for _, tt := range tc { + t.Run(tt.name, func(t *testing.T) { + logger := hclog.NewNullLogger() + + injector := NewTransactionalErrorInjector(tt.be, 0, logger) + + maxEntries, maxBytes := injector.TransactionLimits() + + require.Equal(t, tt.wantEntries, maxEntries) + require.Equal(t, tt.wantSize, maxBytes) + }) + } +} diff --git a/sdk/physical/inmem/inmem.go b/sdk/physical/inmem/inmem.go index e4fa1f69ba23..e30294f1e35f 100644 --- a/sdk/physical/inmem/inmem.go +++ b/sdk/physical/inmem/inmem.go @@ -16,16 +16,18 @@ import ( "github.com/armon/go-radix" log "github.com/hashicorp/go-hclog" "github.com/hashicorp/vault/sdk/physical" + uberAtomic "go.uber.org/atomic" ) // Verify interfaces are satisfied var ( - _ physical.Backend = (*InmemBackend)(nil) - _ physical.HABackend = (*InmemHABackend)(nil) - _ physical.HABackend = (*TransactionalInmemHABackend)(nil) - _ physical.Lock = (*InmemLock)(nil) - _ physical.Transactional = (*TransactionalInmemBackend)(nil) - _ physical.Transactional = (*TransactionalInmemHABackend)(nil) + _ physical.Backend = (*InmemBackend)(nil) + _ physical.HABackend = (*InmemHABackend)(nil) + _ physical.HABackend = (*TransactionalInmemHABackend)(nil) + _ physical.Lock = (*InmemLock)(nil) + _ physical.Transactional = 
(*TransactionalInmemBackend)(nil) + _ physical.Transactional = (*TransactionalInmemHABackend)(nil) + _ physical.TransactionalLimits = (*TransactionalInmemBackend)(nil) ) var ( @@ -55,6 +57,16 @@ type InmemBackend struct { type TransactionalInmemBackend struct { InmemBackend + + // Using Uber atomic because our SemGrep rules don't like the old pointer + // trick we used above any more even though it's fine. The newer sync/atomic + // types are almost the same, but lack ways to initialize them cleanly in New* + // functions so sticking with what SemGrep likes for now. + maxBatchEntries *uberAtomic.Int32 + maxBatchSize *uberAtomic.Int32 + + largestBatchLen *uberAtomic.Uint64 + largestBatchSize *uberAtomic.Uint64 } // NewInmem constructs a new in-memory backend @@ -109,6 +121,11 @@ func NewTransactionalInmem(conf map[string]string, logger log.Logger) (physical. logOps: os.Getenv("VAULT_INMEM_LOG_ALL_OPS") != "", maxValueSize: maxValueSize, }, + + maxBatchEntries: uberAtomic.NewInt32(64), + maxBatchSize: uberAtomic.NewInt32(128 * 1024), + largestBatchLen: uberAtomic.NewUint64(0), + largestBatchSize: uberAtomic.NewUint64(0), }, nil } @@ -303,11 +320,39 @@ func (t *TransactionalInmemBackend) Transaction(ctx context.Context, txns []*phy defer t.Unlock() failGetInTxn := atomic.LoadUint32(t.failGetInTxn) + size := uint64(0) for _, t := range txns { + // We use 2x key length to match the logic in WALBackend.persistWALs + // presumably this is attempting to account for some amount of encoding + // overhead. 
+ size += uint64(2*len(t.Entry.Key) + len(t.Entry.Value)) if t.Operation == physical.GetOperation && failGetInTxn != 0 { return GetInTxnDisabledError } } + if size > t.largestBatchSize.Load() { + t.largestBatchSize.Store(size) + } + if len(txns) > int(t.largestBatchLen.Load()) { + t.largestBatchLen.Store(uint64(len(txns))) + } + return physical.GenericTransactionHandler(ctx, t, txns) } + +func (t *TransactionalInmemBackend) SetMaxBatchEntries(entries int) { + t.maxBatchEntries.Store(int32(entries)) +} + +func (t *TransactionalInmemBackend) SetMaxBatchSize(entries int) { + t.maxBatchSize.Store(int32(entries)) +} + +func (t *TransactionalInmemBackend) TransactionLimits() (int, int) { + return int(t.maxBatchEntries.Load()), int(t.maxBatchSize.Load()) +} + +func (t *TransactionalInmemBackend) BatchStats() (maxEntries uint64, maxSize uint64) { + return t.largestBatchLen.Load(), t.largestBatchSize.Load() +} diff --git a/sdk/physical/latency.go b/sdk/physical/latency.go index f4cced5270b1..56d045e30264 100644 --- a/sdk/physical/latency.go +++ b/sdk/physical/latency.go @@ -117,3 +117,14 @@ func (l *TransactionalLatencyInjector) Transaction(ctx context.Context, txns []* l.addLatency() return l.Transactional.Transaction(ctx, txns) } + +// TransactionLimits implements physical.TransactionalLimits +func (l *TransactionalLatencyInjector) TransactionLimits() (int, int) { + if tl, ok := l.Transactional.(TransactionalLimits); ok { + return tl.TransactionLimits() + } + // We don't have any specific limits of our own so return zeros to signal that + // the caller should use whatever reasonable defaults it would if it used a + // non-TransactionalLimits backend. + return 0, 0 +} diff --git a/sdk/physical/latency_test.go b/sdk/physical/latency_test.go new file mode 100644 index 000000000000..2585a04e0f12 --- /dev/null +++ b/sdk/physical/latency_test.go @@ -0,0 +1,53 @@ +// Copyright (c) HashiCorp, Inc. 
+// SPDX-License-Identifier: MPL-2.0 + +package physical + +import ( + "testing" + + "github.com/hashicorp/go-hclog" + "github.com/stretchr/testify/require" +) + +func TestTransactionalLatencyInjector_TransactionLimits(t *testing.T) { + tc := []struct { + name string + be Backend + wantEntries int + wantSize int + }{ + { + name: "non-transactionlimits backend", + be: &TestTransactionalNonLimitBackend{}, + + // Should return zeros to let the implementor choose defaults. + wantEntries: 0, + wantSize: 0, + }, + { + name: "transactionlimits backend", + be: &TestTransactionalLimitBackend{ + MaxEntries: 123, + MaxSize: 345, + }, + + // Should return underlying limits + wantEntries: 123, + wantSize: 345, + }, + } + + for _, tt := range tc { + t.Run(tt.name, func(t *testing.T) { + logger := hclog.NewNullLogger() + + injector := NewTransactionalLatencyInjector(tt.be, 0, 0, logger) + + maxEntries, maxBytes := injector.TransactionLimits() + + require.Equal(t, tt.wantEntries, maxEntries) + require.Equal(t, tt.wantSize, maxBytes) + }) + } +} diff --git a/sdk/physical/testing.go b/sdk/physical/testing.go index b80f3697ea26..1114f34a5195 100644 --- a/sdk/physical/testing.go +++ b/sdk/physical/testing.go @@ -518,3 +518,43 @@ func SetupTestingTransactions(t testing.TB, b Backend) []*TxnEntry { return txns } + +// Several tests across packages have to test logic with a few variations of +// transactional backends. Make some suitable for testing limits support that +// can be re-used. 
+ +type TestTransactionalNonLimitBackend struct{} + +var _ Transactional = (*TestTransactionalNonLimitBackend)(nil) + +func (b *TestTransactionalNonLimitBackend) Put(ctx context.Context, entry *Entry) error { + return nil +} + +func (b *TestTransactionalNonLimitBackend) Get(ctx context.Context, key string) (*Entry, error) { + return nil, nil +} + +func (b *TestTransactionalNonLimitBackend) Delete(ctx context.Context, key string) error { + return nil +} + +func (b *TestTransactionalNonLimitBackend) List(ctx context.Context, prefix string) ([]string, error) { + return nil, nil +} + +func (b *TestTransactionalNonLimitBackend) Transaction(ctx context.Context, txns []*TxnEntry) error { + return nil +} + +type TestTransactionalLimitBackend struct { + TestTransactionalNonLimitBackend + + MaxEntries, MaxSize int +} + +var _ TransactionalLimits = (*TestTransactionalLimitBackend)(nil) + +func (b *TestTransactionalLimitBackend) TransactionLimits() (int, int) { + return b.MaxEntries, b.MaxSize +} diff --git a/sdk/physical/transactions.go b/sdk/physical/transactions.go index 8d4e33321e2c..de91689ffc57 100644 --- a/sdk/physical/transactions.go +++ b/sdk/physical/transactions.go @@ -34,6 +34,35 @@ type TransactionalBackend interface { Transactional } +// TransactionalLimits SHOULD be implemented by all TransactionalBackend +// implementations. It is separate for backwards compatibility reasons since +// this is a public SDK module. If a TransactionalBackend does not implement +// this, the historic default limits of 63 entries and 128kb (based on Consul's +// limits) are used by replication internals when encoding batches of +// transactions. +type TransactionalLimits interface { + TransactionalBackend + + // TransactionLimits must return the limits of how large each transaction may + // be. The limits returned indicate how many individual operation entries are + // supported in total and an overall size limit on the contents of each + // transaction if applicable. 
Vault will deduct any meta-operations it needs + // to add from the maxEntries given. maxSize will be compared against the sum + // of the key and value sizes for all operations in a transaction. The backend + // should provide a reasonable margin of safety for any overhead it may have + // while encoding, for example Consul's encoded transaction in JSON must fit + // in the configured max transaction size so it must leave adequate room for + // JSON encoding overhead on top of the raw key and value sizes. + // + // If zero is returned for either value, the replication internals will use + // historic reasonable defaults. This allows middleware implementations such + // as cache layers to either pass through to the underlying backend if it + // implements this interface, or to return zeros to indicate that the + // implementer should apply whatever defaults it would use if the middleware + // were not present. + TransactionLimits() (maxEntries int, maxSize int) +} + type PseudoTransactional interface { // An internal function should do no locking or permit pool acquisition. // Depending on the backend and if it natively supports transactions, these From 50320f421308adfc8f6b1b5263da2ba32b851f14 Mon Sep 17 00:00:00 2001 From: Paul Banks Date: Tue, 23 Jan 2024 10:53:42 +0000 Subject: [PATCH 2/3] Add CHANGELOG --- changelog/24991.txt | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 changelog/24991.txt diff --git a/changelog/24991.txt b/changelog/24991.txt new file mode 100644 index 000000000000..28df55379bee --- /dev/null +++ b/changelog/24991.txt @@ -0,0 +1,3 @@ +```release-note:improvement +storage/raft: Add support for larger transactions when using raft storage. 
+``` From baee8dc5ca6a6432a8c10555d69e0eaeb43a8868 Mon Sep 17 00:00:00 2001 From: Paul Banks Date: Tue, 23 Jan 2024 11:23:17 +0000 Subject: [PATCH 3/3] Appease the new lint rules --- physical/raft/raft_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/physical/raft/raft_test.go b/physical/raft/raft_test.go index a1b8ea4bc1ee..c634d1d8c97d 100644 --- a/physical/raft/raft_test.go +++ b/physical/raft/raft_test.go @@ -635,6 +635,8 @@ func TestRaft_TransactionalBackend_ThreeNode(t *testing.T) { compareFSMs(t, raft1.fsm, raft3.fsm) } +// TestRaft_TransactionalLimitsEnvOverride ensures the ENV var overrides for +// transaction size limits are plumbed through as expected. func TestRaft_TransactionalLimitsEnvOverride(t *testing.T) { tc := []struct { name string