Skip to content

Commit

Permalink
go/worker/storage: Add configurable limits for storage operations
Browse files Browse the repository at this point in the history
  • Loading branch information
kostko committed Jan 30, 2020
1 parent 8b63486 commit e563f39
Show file tree
Hide file tree
Showing 16 changed files with 181 additions and 16 deletions.
1 change: 1 addition & 0 deletions .changelog/1914.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go/worker/storage: Add configurable limits for storage operations.
6 changes: 5 additions & 1 deletion go/genesis/genesis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,11 @@ func TestGenesisSanityCheck(t *testing.T) {
MaxBatchSizeBytes: 1,
},
Storage: registry.StorageParameters{
GroupSize: 1,
GroupSize: 1,
MaxApplyWriteLogEntries: 100_000,
MaxApplyOps: 2,
MaxMergeRoots: 8,
MaxMergeOps: 2,
},
AdmissionPolicy: registry.RuntimeAdmissionPolicy{
AnyNode: &registry.AnyNodeRuntimeAdmissionPolicy{},
Expand Down
8 changes: 7 additions & 1 deletion go/oasis-net-runner/fixtures/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,13 @@ func NewDefaultFixture() (*oasis.NetworkFixture, error) {
MaxBatchSizeBytes: 1000,
BatchFlushTimeout: 20 * time.Second,
},
Storage: registry.StorageParameters{GroupSize: 1},
Storage: registry.StorageParameters{
GroupSize: 1,
MaxApplyWriteLogEntries: 100_000,
MaxApplyOps: 2,
MaxMergeRoots: 8,
MaxMergeOps: 2,
},
AdmissionPolicy: registry.RuntimeAdmissionPolicy{
AnyNode: &registry.AnyNodeRuntimeAdmissionPolicy{},
},
Expand Down
32 changes: 23 additions & 9 deletions go/oasis-node/cmd/registry/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,11 @@ const (
CfgMergeRoundTimeout = "runtime.merge.round_timeout"

// Storage committee flags.
CfgStorageGroupSize = "runtime.storage.group_size"
CfgStorageGroupSize = "runtime.storage.group_size"
CfgStorageMaxApplyWriteLogEntries = "runtime.storage.max_apply_write_log_entries"
CfgStorageMaxApplyOps = "runtime.storage.max_apply_ops"
CfgStorageMaxMergeRoots = "runtime.storage.max_merge_roots"
CfgStorageMaxMergeOps = "runtime.storage.max_merge_ops"

// Transaction scheduler flags.
CfgTxnSchedulerGroupSize = "runtime.txn_scheduler.group_size"
Expand Down Expand Up @@ -356,25 +360,31 @@ func runtimeFromFlags() (*registry.Runtime, signature.Signer, error) {
},
KeyManager: kmID,
Executor: registry.ExecutorParameters{
GroupSize: uint64(viper.GetInt64(CfgExecutorGroupSize)),
GroupBackupSize: uint64(viper.GetInt64(CfgExecutorGroupBackupSize)),
AllowedStragglers: uint64(viper.GetInt64(CfgExecutorAllowedStragglers)),
GroupSize: viper.GetUint64(CfgExecutorGroupSize),
GroupBackupSize: viper.GetUint64(CfgExecutorGroupBackupSize),
AllowedStragglers: viper.GetUint64(CfgExecutorAllowedStragglers),
RoundTimeout: viper.GetDuration(CfgExecutorRoundTimeout),
},
Merge: registry.MergeParameters{
GroupSize: uint64(viper.GetInt64(CfgMergeGroupSize)),
GroupBackupSize: uint64(viper.GetInt64(CfgMergeGroupBackupSize)),
AllowedStragglers: uint64(viper.GetInt64(CfgMergeAllowedStragglers)),
GroupSize: viper.GetUint64(CfgMergeGroupSize),
GroupBackupSize: viper.GetUint64(CfgMergeGroupBackupSize),
AllowedStragglers: viper.GetUint64(CfgMergeAllowedStragglers),
RoundTimeout: viper.GetDuration(CfgMergeRoundTimeout),
},
TxnScheduler: registry.TxnSchedulerParameters{
GroupSize: uint64(viper.GetInt64(CfgTxnSchedulerGroupSize)),
GroupSize: viper.GetUint64(CfgTxnSchedulerGroupSize),
Algorithm: viper.GetString(CfgTxnSchedulerAlgorithm),
BatchFlushTimeout: viper.GetDuration(CfgTxnSchedulerBatchFlushTimeout),
MaxBatchSize: viper.GetUint64(CfgTxnSchedulerMaxBatchSize),
MaxBatchSizeBytes: uint64(viper.GetSizeInBytes(CfgTxnSchedulerMaxBatchSizeBytes)),
},
Storage: registry.StorageParameters{GroupSize: uint64(viper.GetInt64(CfgStorageGroupSize))},
Storage: registry.StorageParameters{
GroupSize: viper.GetUint64(CfgStorageGroupSize),
MaxApplyWriteLogEntries: viper.GetUint64(CfgStorageMaxApplyWriteLogEntries),
MaxApplyOps: viper.GetUint64(CfgStorageMaxApplyOps),
MaxMergeRoots: viper.GetUint64(CfgStorageMaxMergeRoots),
MaxMergeOps: viper.GetUint64(CfgStorageMaxMergeOps),
},
}
if teeHardware == node.TEEHardwareIntelSGX {
var vi registry.VersionInfoIntelSGX
Expand Down Expand Up @@ -503,6 +513,10 @@ func init() {

// Init Storage committee flags.
runtimeFlags.Uint64(CfgStorageGroupSize, 1, "Number of storage nodes for the runtime")
runtimeFlags.Uint64(CfgStorageMaxApplyWriteLogEntries, 100_000, "Maximum number of write log entries")
runtimeFlags.Uint64(CfgStorageMaxApplyOps, 2, "Maximum number of apply operations in a batch")
runtimeFlags.Uint64(CfgStorageMaxMergeRoots, 8, "Maximum number of merge roots")
runtimeFlags.Uint64(CfgStorageMaxMergeOps, 2, "Maximum number of merge operations in a batch")

// Init Admission policy flags.
runtimeFlags.String(CfgAdmissionPolicy, "", "What type of node admission policy to have")
Expand Down
8 changes: 7 additions & 1 deletion go/oasis-node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,13 @@ var (
MaxBatchSizeBytes: 1000,
BatchFlushTimeout: 20 * time.Second,
},
Storage: registry.StorageParameters{GroupSize: 1},
Storage: registry.StorageParameters{
GroupSize: 1,
MaxApplyWriteLogEntries: 100_000,
MaxApplyOps: 2,
MaxMergeRoots: 8,
MaxMergeOps: 2,
},
AdmissionPolicy: registry.RuntimeAdmissionPolicy{
AnyNode: &registry.AnyNodeRuntimeAdmissionPolicy{},
},
Expand Down
4 changes: 4 additions & 0 deletions go/oasis-test-runner/oasis/cli/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ func (r *RegistryHelpers) GenerateRegisterRuntimeTx(
"--"+cmdRegRt.CfgMergeAllowedStragglers, strconv.FormatUint(runtime.Merge.AllowedStragglers, 10),
"--"+cmdRegRt.CfgMergeRoundTimeout, runtime.Merge.RoundTimeout.String(),
"--"+cmdRegRt.CfgStorageGroupSize, strconv.FormatUint(runtime.Storage.GroupSize, 10),
"--"+cmdRegRt.CfgStorageMaxApplyWriteLogEntries, strconv.FormatUint(runtime.Storage.MaxApplyWriteLogEntries, 10),
"--"+cmdRegRt.CfgStorageMaxApplyOps, strconv.FormatUint(runtime.Storage.MaxApplyOps, 10),
"--"+cmdRegRt.CfgStorageMaxMergeRoots, strconv.FormatUint(runtime.Storage.MaxMergeRoots, 10),
"--"+cmdRegRt.CfgStorageMaxMergeOps, strconv.FormatUint(runtime.Storage.MaxMergeOps, 10),
"--"+cmdRegRt.CfgTxnSchedulerGroupSize, strconv.FormatUint(runtime.TxnScheduler.GroupSize, 10),
"--"+cmdRegRt.CfgTxnSchedulerAlgorithm, runtime.TxnScheduler.Algorithm,
"--"+cmdRegRt.CfgTxnSchedulerBatchFlushTimeout, runtime.TxnScheduler.BatchFlushTimeout.String(),
Expand Down
4 changes: 4 additions & 0 deletions go/oasis-test-runner/oasis/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ func (net *Network) NewRuntime(cfg *RuntimeCfg) (*Runtime, error) {
"--" + cmdRegRt.CfgTxnSchedulerAlgorithm, cfg.TxnScheduler.Algorithm,
"--" + cmdRegRt.CfgTxnSchedulerBatchFlushTimeout, cfg.TxnScheduler.BatchFlushTimeout.String(),
"--" + cmdRegRt.CfgStorageGroupSize, strconv.FormatUint(cfg.Storage.GroupSize, 10),
"--" + cmdRegRt.CfgStorageMaxApplyWriteLogEntries, strconv.FormatUint(cfg.Storage.MaxApplyWriteLogEntries, 10),
"--" + cmdRegRt.CfgStorageMaxApplyOps, strconv.FormatUint(cfg.Storage.MaxApplyOps, 10),
"--" + cmdRegRt.CfgStorageMaxMergeRoots, strconv.FormatUint(cfg.Storage.MaxMergeRoots, 10),
"--" + cmdRegRt.CfgStorageMaxMergeOps, strconv.FormatUint(cfg.Storage.MaxMergeOps, 10),
}...)

if cfg.GenesisState != "" {
Expand Down
8 changes: 7 additions & 1 deletion go/oasis-test-runner/scenario/e2e/basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,13 @@ func (sc *basicImpl) Fixture() (*oasis.NetworkFixture, error) {
MaxBatchSizeBytes: 1000,
BatchFlushTimeout: 1 * time.Second,
},
Storage: registry.StorageParameters{GroupSize: 2},
Storage: registry.StorageParameters{
GroupSize: 2,
MaxApplyWriteLogEntries: 100_000,
MaxApplyOps: 2,
MaxMergeRoots: 8,
MaxMergeOps: 2,
},
AdmissionPolicy: registry.RuntimeAdmissionPolicy{
AnyNode: &registry.AnyNodeRuntimeAdmissionPolicy{},
},
Expand Down
8 changes: 7 additions & 1 deletion go/oasis-test-runner/scenario/e2e/multiple_runtimes.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,13 @@ func (mr *multipleRuntimesImpl) Fixture() (*oasis.NetworkFixture, error) {
MaxBatchSizeBytes: 1000,
BatchFlushTimeout: 1 * time.Second,
},
Storage: registry.StorageParameters{GroupSize: 2},
Storage: registry.StorageParameters{
GroupSize: 2,
MaxApplyWriteLogEntries: 100_000,
MaxApplyOps: 2,
MaxMergeRoots: 8,
MaxMergeOps: 2,
},
AdmissionPolicy: registry.RuntimeAdmissionPolicy{
AnyNode: &registry.AnyNodeRuntimeAdmissionPolicy{},
},
Expand Down
6 changes: 5 additions & 1 deletion go/oasis-test-runner/scenario/e2e/registry_cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,11 @@ func (r *registryCLIImpl) testRuntime(childEnv *env.Env, cli *cli.Helpers) error
MaxBatchSizeBytes: 13,
},
Storage: registry.StorageParameters{
GroupSize: 9,
GroupSize: 9,
MaxApplyWriteLogEntries: 10,
MaxApplyOps: 11,
MaxMergeRoots: 12,
MaxMergeOps: 13,
},
AdmissionPolicy: registry.RuntimeAdmissionPolicy{
EntityWhitelist: &registry.EntityWhitelistRuntimeAdmissionPolicy{
Expand Down
24 changes: 24 additions & 0 deletions go/registry/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1073,6 +1073,30 @@ func VerifyRegisterRuntimeArgs(
)
return nil, fmt.Errorf("%w: storage group too small", ErrInvalidArgument)
}
if rt.Storage.MaxApplyWriteLogEntries < 10 {
logger.Error("RegisterRuntime: storage MaxApplyWriteLogEntries parameter too small",
"runtime", rt,
)
return nil, fmt.Errorf("%w: storage MaxApplyWriteLogEntries parameter too small", ErrInvalidArgument)
}
if rt.Storage.MaxApplyOps < 2 {
logger.Error("RegisterRuntime: storage MaxApplyOps parameter too small",
"runtime", rt,
)
return nil, fmt.Errorf("%w: storage MaxApplyOps parameter too small", ErrInvalidArgument)
}
if rt.Storage.MaxMergeRoots == 0 {
logger.Error("RegisterRuntime: storage MaxMergeRoots parameter too small",
"runtime", rt,
)
return nil, fmt.Errorf("%w: storage MaxMergeRoots parameter too small", ErrInvalidArgument)
}
if rt.Storage.MaxMergeOps < 2 {
logger.Error("RegisterRuntime: storage MaxMergeOps parameter too small",
"runtime", rt,
)
return nil, fmt.Errorf("%w: storage MaxMergeOps parameter too small", ErrInvalidArgument)
}

if rt.ID.IsKeyManager() {
logger.Error("RegisterRuntime: runtime ID flag mismatch",
Expand Down
13 changes: 13 additions & 0 deletions go/registry/api/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,19 @@ type TxnSchedulerParameters struct {
type StorageParameters struct {
// GroupSize is the size of the storage group.
GroupSize uint64 `json:"group_size"`

// MaxApplyWriteLogEntries is the maximum number of write log entries when performing an Apply
// operation.
MaxApplyWriteLogEntries uint64 `json:"max_apply_write_log_entries"`

// MaxApplyOps is the maximum number of apply operations in a batch.
MaxApplyOps uint64 `json:"max_apply_ops"`

// MaxMergeRoots is the maximum number of merge roots.
MaxMergeRoots uint64 `json:"max_merge_roots"`

// MaxApplyOps configures the maximum number of merge operations in a batch.
MaxMergeOps uint64 `json:"max_merge_ops"`
}

// AnyNodeRuntimeAdmissionPolicy allows any node to register.
Expand Down
8 changes: 7 additions & 1 deletion go/registry/tests/tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -1233,7 +1233,13 @@ func NewTestRuntime(seed []byte, entity *TestEntity, isKeyManager bool) (*TestRu
MaxBatchSize: 1,
MaxBatchSizeBytes: 1000,
},
Storage: api.StorageParameters{GroupSize: 3},
Storage: api.StorageParameters{
GroupSize: 3,
MaxApplyWriteLogEntries: 100_000,
MaxApplyOps: 2,
MaxMergeRoots: 8,
MaxMergeOps: 2,
},
AdmissionPolicy: api.RuntimeAdmissionPolicy{
AnyNode: &api.AnyNodeRuntimeAdmissionPolicy{},
},
Expand Down
2 changes: 2 additions & 0 deletions go/storage/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ var (
// ErrNoMergeRoots is the error returned when no other roots are passed
// to the Merge operation.
ErrNoMergeRoots = errors.New(ModuleName, 5, "storage: no roots to merge")
// ErrLimitReached means that a configured limit has been reached.
ErrLimitReached = errors.New(ModuleName, 6, "storage: limit reached")

// The following errors are reimports from NodeDB.

Expand Down
64 changes: 64 additions & 0 deletions go/worker/storage/service_external.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@ package storage
import (
"context"
"errors"
"fmt"

"github.com/oasislabs/oasis-core/go/common"
"github.com/oasislabs/oasis-core/go/common/grpc/auth"
"github.com/oasislabs/oasis-core/go/common/grpc/policy"
registry "github.com/oasislabs/oasis-core/go/registry/api"
"github.com/oasislabs/oasis-core/go/storage/api"
)

Expand Down Expand Up @@ -37,6 +40,19 @@ func (s *storageService) ensureInitialized(ctx context.Context) error {
}
}

func (s *storageService) getConfig(ctx context.Context, ns common.Namespace) (*registry.StorageParameters, error) {
rt, err := s.w.commonWorker.RuntimeRegistry.GetRuntime(ns)
if err != nil {
return nil, fmt.Errorf("storage: failed to get runtime %s: %w", ns, err)
}

rtDesc, err := rt.RegistryDescriptor(ctx)
if err != nil {
return nil, fmt.Errorf("storage: failed to get runtime %s configuration: %w", ns, err)
}
return &rtDesc.Storage, nil
}

func (s *storageService) SyncGet(ctx context.Context, request *api.GetRequest) (*api.ProofResponse, error) {
if err := s.ensureInitialized(ctx); err != nil {
return nil, err
Expand Down Expand Up @@ -66,6 +82,15 @@ func (s *storageService) Apply(ctx context.Context, request *api.ApplyRequest) (
return nil, errDebugRejectUpdates
}

// Limit maximum number of entries in a write log.
cfg, err := s.getConfig(ctx, request.Namespace)
if err != nil {
return nil, err
}
if uint64(len(request.WriteLog)) > cfg.MaxApplyWriteLogEntries {
return nil, api.ErrLimitReached
}

return s.storage.Apply(ctx, request)
}

Expand All @@ -77,6 +102,21 @@ func (s *storageService) ApplyBatch(ctx context.Context, request *api.ApplyBatch
return nil, errDebugRejectUpdates
}

// Limit maximum number of operations in a batch.
cfg, err := s.getConfig(ctx, request.Namespace)
if err != nil {
return nil, err
}
if uint64(len(request.Ops)) > cfg.MaxApplyOps {
return nil, api.ErrLimitReached
}
// Limit maximum number of entries in a write log.
for _, op := range request.Ops {
if uint64(len(op.WriteLog)) > cfg.MaxApplyWriteLogEntries {
return nil, api.ErrLimitReached
}
}

return s.storage.ApplyBatch(ctx, request)
}

Expand All @@ -88,6 +128,15 @@ func (s *storageService) Merge(ctx context.Context, request *api.MergeRequest) (
return nil, errDebugRejectUpdates
}

// Limit maximum number of roots to merge.
cfg, err := s.getConfig(ctx, request.Namespace)
if err != nil {
return nil, err
}
if uint64(len(request.Others)) > cfg.MaxMergeRoots {
return nil, api.ErrLimitReached
}

return s.storage.Merge(ctx, request)
}

Expand All @@ -99,6 +148,21 @@ func (s *storageService) MergeBatch(ctx context.Context, request *api.MergeBatch
return nil, errDebugRejectUpdates
}

// Limit maximum number of operations in a batch.
cfg, err := s.getConfig(ctx, request.Namespace)
if err != nil {
return nil, err
}
if uint64(len(request.Ops)) > cfg.MaxMergeOps {
return nil, api.ErrLimitReached
}
// Limit maximum number of roots to merge.
for _, op := range request.Ops {
if uint64(len(op.Others)) > cfg.MaxMergeRoots {
return nil, api.ErrLimitReached
}
}

return s.storage.MergeBatch(ctx, request)
}

Expand Down
1 change: 1 addition & 0 deletions go/worker/storage/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ func (s *Worker) initGenesis(gen *genesis.Document) error {
func init() {
Flags.Bool(CfgWorkerEnabled, false, "Enable storage worker")
Flags.Uint(cfgWorkerFetcherCount, 4, "Number of concurrent storage diff fetchers")

Flags.Bool(CfgWorkerDebugIgnoreApply, false, "Ignore Apply operations (for debugging purposes)")
_ = Flags.MarkHidden(CfgWorkerDebugIgnoreApply)

Expand Down

0 comments on commit e563f39

Please sign in to comment.