Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

go/worker/storage: Add configurable limits for storage operations #2618

Merged
merged 1 commit into from
Jan 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .changelog/1914.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go/worker/storage: Add configurable limits for storage operations.
6 changes: 5 additions & 1 deletion go/genesis/genesis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,11 @@ func TestGenesisSanityCheck(t *testing.T) {
MaxBatchSizeBytes: 1,
},
Storage: registry.StorageParameters{
GroupSize: 1,
GroupSize: 1,
MaxApplyWriteLogEntries: 100_000,
MaxApplyOps: 2,
MaxMergeRoots: 8,
MaxMergeOps: 2,
},
AdmissionPolicy: registry.RuntimeAdmissionPolicy{
AnyNode: &registry.AnyNodeRuntimeAdmissionPolicy{},
Expand Down
8 changes: 7 additions & 1 deletion go/oasis-net-runner/fixtures/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,13 @@ func NewDefaultFixture() (*oasis.NetworkFixture, error) {
MaxBatchSizeBytes: 1000,
BatchFlushTimeout: 20 * time.Second,
},
Storage: registry.StorageParameters{GroupSize: 1},
Storage: registry.StorageParameters{
GroupSize: 1,
MaxApplyWriteLogEntries: 100_000,
MaxApplyOps: 2,
MaxMergeRoots: 8,
MaxMergeOps: 2,
},
AdmissionPolicy: registry.RuntimeAdmissionPolicy{
AnyNode: &registry.AnyNodeRuntimeAdmissionPolicy{},
},
Expand Down
32 changes: 23 additions & 9 deletions go/oasis-node/cmd/registry/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,11 @@ const (
CfgMergeRoundTimeout = "runtime.merge.round_timeout"

// Storage committee flags.
CfgStorageGroupSize = "runtime.storage.group_size"
CfgStorageGroupSize = "runtime.storage.group_size"
CfgStorageMaxApplyWriteLogEntries = "runtime.storage.max_apply_write_log_entries"
CfgStorageMaxApplyOps = "runtime.storage.max_apply_ops"
CfgStorageMaxMergeRoots = "runtime.storage.max_merge_roots"
CfgStorageMaxMergeOps = "runtime.storage.max_merge_ops"

// Transaction scheduler flags.
CfgTxnSchedulerGroupSize = "runtime.txn_scheduler.group_size"
Expand Down Expand Up @@ -356,25 +360,31 @@ func runtimeFromFlags() (*registry.Runtime, signature.Signer, error) {
},
KeyManager: kmID,
Executor: registry.ExecutorParameters{
GroupSize: uint64(viper.GetInt64(CfgExecutorGroupSize)),
GroupBackupSize: uint64(viper.GetInt64(CfgExecutorGroupBackupSize)),
AllowedStragglers: uint64(viper.GetInt64(CfgExecutorAllowedStragglers)),
GroupSize: viper.GetUint64(CfgExecutorGroupSize),
GroupBackupSize: viper.GetUint64(CfgExecutorGroupBackupSize),
AllowedStragglers: viper.GetUint64(CfgExecutorAllowedStragglers),
RoundTimeout: viper.GetDuration(CfgExecutorRoundTimeout),
},
Merge: registry.MergeParameters{
GroupSize: uint64(viper.GetInt64(CfgMergeGroupSize)),
GroupBackupSize: uint64(viper.GetInt64(CfgMergeGroupBackupSize)),
AllowedStragglers: uint64(viper.GetInt64(CfgMergeAllowedStragglers)),
GroupSize: viper.GetUint64(CfgMergeGroupSize),
GroupBackupSize: viper.GetUint64(CfgMergeGroupBackupSize),
AllowedStragglers: viper.GetUint64(CfgMergeAllowedStragglers),
RoundTimeout: viper.GetDuration(CfgMergeRoundTimeout),
},
TxnScheduler: registry.TxnSchedulerParameters{
GroupSize: uint64(viper.GetInt64(CfgTxnSchedulerGroupSize)),
GroupSize: viper.GetUint64(CfgTxnSchedulerGroupSize),
Algorithm: viper.GetString(CfgTxnSchedulerAlgorithm),
BatchFlushTimeout: viper.GetDuration(CfgTxnSchedulerBatchFlushTimeout),
MaxBatchSize: viper.GetUint64(CfgTxnSchedulerMaxBatchSize),
MaxBatchSizeBytes: uint64(viper.GetSizeInBytes(CfgTxnSchedulerMaxBatchSizeBytes)),
},
Storage: registry.StorageParameters{GroupSize: uint64(viper.GetInt64(CfgStorageGroupSize))},
Storage: registry.StorageParameters{
GroupSize: viper.GetUint64(CfgStorageGroupSize),
MaxApplyWriteLogEntries: viper.GetUint64(CfgStorageMaxApplyWriteLogEntries),
MaxApplyOps: viper.GetUint64(CfgStorageMaxApplyOps),
MaxMergeRoots: viper.GetUint64(CfgStorageMaxMergeRoots),
MaxMergeOps: viper.GetUint64(CfgStorageMaxMergeOps),
},
}
if teeHardware == node.TEEHardwareIntelSGX {
var vi registry.VersionInfoIntelSGX
Expand Down Expand Up @@ -503,6 +513,10 @@ func init() {

// Init Storage committee flags.
runtimeFlags.Uint64(CfgStorageGroupSize, 1, "Number of storage nodes for the runtime")
runtimeFlags.Uint64(CfgStorageMaxApplyWriteLogEntries, 100_000, "Maximum number of write log entries")
runtimeFlags.Uint64(CfgStorageMaxApplyOps, 2, "Maximum number of apply operations in a batch")
runtimeFlags.Uint64(CfgStorageMaxMergeRoots, 8, "Maximum number of merge roots")
runtimeFlags.Uint64(CfgStorageMaxMergeOps, 2, "Maximum number of merge operations in a batch")

// Init Admission policy flags.
runtimeFlags.String(CfgAdmissionPolicy, "", "What type of node admission policy to have")
Expand Down
8 changes: 7 additions & 1 deletion go/oasis-node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,13 @@ var (
MaxBatchSizeBytes: 1000,
BatchFlushTimeout: 20 * time.Second,
},
Storage: registry.StorageParameters{GroupSize: 1},
Storage: registry.StorageParameters{
GroupSize: 1,
MaxApplyWriteLogEntries: 100_000,
MaxApplyOps: 2,
MaxMergeRoots: 8,
MaxMergeOps: 2,
},
AdmissionPolicy: registry.RuntimeAdmissionPolicy{
AnyNode: &registry.AnyNodeRuntimeAdmissionPolicy{},
},
Expand Down
4 changes: 4 additions & 0 deletions go/oasis-test-runner/oasis/cli/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ func (r *RegistryHelpers) GenerateRegisterRuntimeTx(
"--"+cmdRegRt.CfgMergeAllowedStragglers, strconv.FormatUint(runtime.Merge.AllowedStragglers, 10),
"--"+cmdRegRt.CfgMergeRoundTimeout, runtime.Merge.RoundTimeout.String(),
"--"+cmdRegRt.CfgStorageGroupSize, strconv.FormatUint(runtime.Storage.GroupSize, 10),
"--"+cmdRegRt.CfgStorageMaxApplyWriteLogEntries, strconv.FormatUint(runtime.Storage.MaxApplyWriteLogEntries, 10),
"--"+cmdRegRt.CfgStorageMaxApplyOps, strconv.FormatUint(runtime.Storage.MaxApplyOps, 10),
"--"+cmdRegRt.CfgStorageMaxMergeRoots, strconv.FormatUint(runtime.Storage.MaxMergeRoots, 10),
"--"+cmdRegRt.CfgStorageMaxMergeOps, strconv.FormatUint(runtime.Storage.MaxMergeOps, 10),
"--"+cmdRegRt.CfgTxnSchedulerGroupSize, strconv.FormatUint(runtime.TxnScheduler.GroupSize, 10),
"--"+cmdRegRt.CfgTxnSchedulerAlgorithm, runtime.TxnScheduler.Algorithm,
"--"+cmdRegRt.CfgTxnSchedulerBatchFlushTimeout, runtime.TxnScheduler.BatchFlushTimeout.String(),
Expand Down
4 changes: 4 additions & 0 deletions go/oasis-test-runner/oasis/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ func (net *Network) NewRuntime(cfg *RuntimeCfg) (*Runtime, error) {
"--" + cmdRegRt.CfgTxnSchedulerAlgorithm, cfg.TxnScheduler.Algorithm,
"--" + cmdRegRt.CfgTxnSchedulerBatchFlushTimeout, cfg.TxnScheduler.BatchFlushTimeout.String(),
"--" + cmdRegRt.CfgStorageGroupSize, strconv.FormatUint(cfg.Storage.GroupSize, 10),
"--" + cmdRegRt.CfgStorageMaxApplyWriteLogEntries, strconv.FormatUint(cfg.Storage.MaxApplyWriteLogEntries, 10),
"--" + cmdRegRt.CfgStorageMaxApplyOps, strconv.FormatUint(cfg.Storage.MaxApplyOps, 10),
"--" + cmdRegRt.CfgStorageMaxMergeRoots, strconv.FormatUint(cfg.Storage.MaxMergeRoots, 10),
"--" + cmdRegRt.CfgStorageMaxMergeOps, strconv.FormatUint(cfg.Storage.MaxMergeOps, 10),
}...)

if cfg.GenesisState != "" {
Expand Down
8 changes: 7 additions & 1 deletion go/oasis-test-runner/scenario/e2e/basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,13 @@ func (sc *basicImpl) Fixture() (*oasis.NetworkFixture, error) {
MaxBatchSizeBytes: 1000,
BatchFlushTimeout: 1 * time.Second,
},
Storage: registry.StorageParameters{GroupSize: 2},
Storage: registry.StorageParameters{
GroupSize: 2,
MaxApplyWriteLogEntries: 100_000,
MaxApplyOps: 2,
MaxMergeRoots: 8,
MaxMergeOps: 2,
},
AdmissionPolicy: registry.RuntimeAdmissionPolicy{
AnyNode: &registry.AnyNodeRuntimeAdmissionPolicy{},
},
Expand Down
8 changes: 7 additions & 1 deletion go/oasis-test-runner/scenario/e2e/multiple_runtimes.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,13 @@ func (mr *multipleRuntimesImpl) Fixture() (*oasis.NetworkFixture, error) {
MaxBatchSizeBytes: 1000,
BatchFlushTimeout: 1 * time.Second,
},
Storage: registry.StorageParameters{GroupSize: 2},
Storage: registry.StorageParameters{
GroupSize: 2,
MaxApplyWriteLogEntries: 100_000,
MaxApplyOps: 2,
MaxMergeRoots: 8,
MaxMergeOps: 2,
},
AdmissionPolicy: registry.RuntimeAdmissionPolicy{
AnyNode: &registry.AnyNodeRuntimeAdmissionPolicy{},
},
Expand Down
6 changes: 5 additions & 1 deletion go/oasis-test-runner/scenario/e2e/registry_cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,11 @@ func (r *registryCLIImpl) testRuntime(childEnv *env.Env, cli *cli.Helpers) error
MaxBatchSizeBytes: 13,
},
Storage: registry.StorageParameters{
GroupSize: 9,
GroupSize: 9,
MaxApplyWriteLogEntries: 10,
MaxApplyOps: 11,
MaxMergeRoots: 12,
MaxMergeOps: 13,
},
AdmissionPolicy: registry.RuntimeAdmissionPolicy{
EntityWhitelist: &registry.EntityWhitelistRuntimeAdmissionPolicy{
Expand Down
24 changes: 24 additions & 0 deletions go/registry/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1073,6 +1073,30 @@ func VerifyRegisterRuntimeArgs(
)
return nil, fmt.Errorf("%w: storage group too small", ErrInvalidArgument)
}
if rt.Storage.MaxApplyWriteLogEntries < 10 {
logger.Error("RegisterRuntime: storage MaxApplyWriteLogEntries parameter too small",
"runtime", rt,
)
return nil, fmt.Errorf("%w: storage MaxApplyWriteLogEntries parameter too small", ErrInvalidArgument)
}
if rt.Storage.MaxApplyOps < 2 {
logger.Error("RegisterRuntime: storage MaxApplyOps parameter too small",
"runtime", rt,
)
return nil, fmt.Errorf("%w: storage MaxApplyOps parameter too small", ErrInvalidArgument)
}
if rt.Storage.MaxMergeRoots == 0 {
logger.Error("RegisterRuntime: storage MaxMergeRoots parameter too small",
"runtime", rt,
)
return nil, fmt.Errorf("%w: storage MaxMergeRoots parameter too small", ErrInvalidArgument)
}
if rt.Storage.MaxMergeOps < 2 {
logger.Error("RegisterRuntime: storage MaxMergeOps parameter too small",
"runtime", rt,
)
return nil, fmt.Errorf("%w: storage MaxMergeOps parameter too small", ErrInvalidArgument)
}

if rt.ID.IsKeyManager() {
logger.Error("RegisterRuntime: runtime ID flag mismatch",
Expand Down
13 changes: 13 additions & 0 deletions go/registry/api/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,19 @@ type TxnSchedulerParameters struct {
type StorageParameters struct {
// GroupSize is the size of the storage group.
GroupSize uint64 `json:"group_size"`

// MaxApplyWriteLogEntries is the maximum number of write log entries when performing an Apply
// operation.
MaxApplyWriteLogEntries uint64 `json:"max_apply_write_log_entries"`

// MaxApplyOps is the maximum number of apply operations in a batch.
MaxApplyOps uint64 `json:"max_apply_ops"`

// MaxMergeRoots is the maximum number of merge roots.
MaxMergeRoots uint64 `json:"max_merge_roots"`

// MaxApplyOps configures the maximum number of merge operations in a batch.
MaxMergeOps uint64 `json:"max_merge_ops"`
}

// AnyNodeRuntimeAdmissionPolicy allows any node to register.
Expand Down
8 changes: 7 additions & 1 deletion go/registry/tests/tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -1233,7 +1233,13 @@ func NewTestRuntime(seed []byte, entity *TestEntity, isKeyManager bool) (*TestRu
MaxBatchSize: 1,
MaxBatchSizeBytes: 1000,
},
Storage: api.StorageParameters{GroupSize: 3},
Storage: api.StorageParameters{
GroupSize: 3,
MaxApplyWriteLogEntries: 100_000,
MaxApplyOps: 2,
MaxMergeRoots: 8,
MaxMergeOps: 2,
},
AdmissionPolicy: api.RuntimeAdmissionPolicy{
AnyNode: &api.AnyNodeRuntimeAdmissionPolicy{},
},
Expand Down
2 changes: 2 additions & 0 deletions go/storage/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ var (
// ErrNoMergeRoots is the error returned when no other roots are passed
// to the Merge operation.
ErrNoMergeRoots = errors.New(ModuleName, 5, "storage: no roots to merge")
// ErrLimitReached means that a configured limit has been reached.
ErrLimitReached = errors.New(ModuleName, 6, "storage: limit reached")

// The following errors are reimports from NodeDB.

Expand Down
64 changes: 64 additions & 0 deletions go/worker/storage/service_external.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@ package storage
import (
"context"
"errors"
"fmt"

"github.com/oasislabs/oasis-core/go/common"
"github.com/oasislabs/oasis-core/go/common/grpc/auth"
"github.com/oasislabs/oasis-core/go/common/grpc/policy"
registry "github.com/oasislabs/oasis-core/go/registry/api"
"github.com/oasislabs/oasis-core/go/storage/api"
)

Expand Down Expand Up @@ -37,6 +40,19 @@ func (s *storageService) ensureInitialized(ctx context.Context) error {
}
}

func (s *storageService) getConfig(ctx context.Context, ns common.Namespace) (*registry.StorageParameters, error) {
rt, err := s.w.commonWorker.RuntimeRegistry.GetRuntime(ns)
if err != nil {
return nil, fmt.Errorf("storage: failed to get runtime %s: %w", ns, err)
}

rtDesc, err := rt.RegistryDescriptor(ctx)
if err != nil {
return nil, fmt.Errorf("storage: failed to get runtime %s configuration: %w", ns, err)
}
return &rtDesc.Storage, nil
}

func (s *storageService) SyncGet(ctx context.Context, request *api.GetRequest) (*api.ProofResponse, error) {
if err := s.ensureInitialized(ctx); err != nil {
return nil, err
Expand Down Expand Up @@ -66,6 +82,15 @@ func (s *storageService) Apply(ctx context.Context, request *api.ApplyRequest) (
return nil, errDebugRejectUpdates
}

// Limit maximum number of entries in a write log.
cfg, err := s.getConfig(ctx, request.Namespace)
if err != nil {
return nil, err
}
if uint64(len(request.WriteLog)) > cfg.MaxApplyWriteLogEntries {
return nil, api.ErrLimitReached
}

return s.storage.Apply(ctx, request)
}

Expand All @@ -77,6 +102,21 @@ func (s *storageService) ApplyBatch(ctx context.Context, request *api.ApplyBatch
return nil, errDebugRejectUpdates
}

// Limit maximum number of operations in a batch.
cfg, err := s.getConfig(ctx, request.Namespace)
if err != nil {
return nil, err
}
if uint64(len(request.Ops)) > cfg.MaxApplyOps {
return nil, api.ErrLimitReached
}
// Limit maximum number of entries in a write log.
for _, op := range request.Ops {
if uint64(len(op.WriteLog)) > cfg.MaxApplyWriteLogEntries {
return nil, api.ErrLimitReached
}
}

return s.storage.ApplyBatch(ctx, request)
}

Expand All @@ -88,6 +128,15 @@ func (s *storageService) Merge(ctx context.Context, request *api.MergeRequest) (
return nil, errDebugRejectUpdates
}

// Limit maximum number of roots to merge.
cfg, err := s.getConfig(ctx, request.Namespace)
if err != nil {
return nil, err
}
if uint64(len(request.Others)) > cfg.MaxMergeRoots {
return nil, api.ErrLimitReached
}

return s.storage.Merge(ctx, request)
}

Expand All @@ -99,6 +148,21 @@ func (s *storageService) MergeBatch(ctx context.Context, request *api.MergeBatch
return nil, errDebugRejectUpdates
}

// Limit maximum number of operations in a batch.
cfg, err := s.getConfig(ctx, request.Namespace)
if err != nil {
return nil, err
}
if uint64(len(request.Ops)) > cfg.MaxMergeOps {
return nil, api.ErrLimitReached
}
// Limit maximum number of roots to merge.
for _, op := range request.Ops {
if uint64(len(op.Others)) > cfg.MaxMergeRoots {
return nil, api.ErrLimitReached
}
}

return s.storage.MergeBatch(ctx, request)
}

Expand Down
1 change: 1 addition & 0 deletions go/worker/storage/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ func (s *Worker) initGenesis(gen *genesis.Document) error {
func init() {
Flags.Bool(CfgWorkerEnabled, false, "Enable storage worker")
Flags.Uint(cfgWorkerFetcherCount, 4, "Number of concurrent storage diff fetchers")

Flags.Bool(CfgWorkerDebugIgnoreApply, false, "Ignore Apply operations (for debugging purposes)")
_ = Flags.MarkHidden(CfgWorkerDebugIgnoreApply)

Expand Down