Skip to content

Commit

Permalink
go/worker/storage: Add configurable limits for storage operations
Browse files Browse the repository at this point in the history
  • Loading branch information
kostko committed Jan 30, 2020
1 parent 8b63486 commit 677f73e
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 9 deletions.
1 change: 1 addition & 0 deletions .changelog/1914.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go/worker/storage: Add configurable limits for storage operations.
32 changes: 23 additions & 9 deletions go/oasis-node/cmd/registry/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,11 @@ const (
CfgMergeRoundTimeout = "runtime.merge.round_timeout"

// Storage committee flags.
CfgStorageGroupSize = "runtime.storage.group_size"
CfgStorageGroupSize = "runtime.storage.group_size"
CfgStorageMaxApplyWriteLogEntries = "runtime.storage.max_apply_write_log_entries"
CfgStorageMaxApplyOps = "runtime.storage.max_apply_ops"
CfgStorageMaxMergeRoots = "runtime.storage.max_merge_roots"
CfgStorageMaxMergeOps = "runtime.storage.max_merge_ops"

// Transaction scheduler flags.
CfgTxnSchedulerGroupSize = "runtime.txn_scheduler.group_size"
Expand Down Expand Up @@ -356,25 +360,31 @@ func runtimeFromFlags() (*registry.Runtime, signature.Signer, error) {
},
KeyManager: kmID,
Executor: registry.ExecutorParameters{
GroupSize: uint64(viper.GetInt64(CfgExecutorGroupSize)),
GroupBackupSize: uint64(viper.GetInt64(CfgExecutorGroupBackupSize)),
AllowedStragglers: uint64(viper.GetInt64(CfgExecutorAllowedStragglers)),
GroupSize: viper.GetUint64(CfgExecutorGroupSize),
GroupBackupSize: viper.GetUint64(CfgExecutorGroupBackupSize),
AllowedStragglers: viper.GetUint64(CfgExecutorAllowedStragglers),
RoundTimeout: viper.GetDuration(CfgExecutorRoundTimeout),
},
Merge: registry.MergeParameters{
GroupSize: uint64(viper.GetInt64(CfgMergeGroupSize)),
GroupBackupSize: uint64(viper.GetInt64(CfgMergeGroupBackupSize)),
AllowedStragglers: uint64(viper.GetInt64(CfgMergeAllowedStragglers)),
GroupSize: viper.GetUint64(CfgMergeGroupSize),
GroupBackupSize: viper.GetUint64(CfgMergeGroupBackupSize),
AllowedStragglers: viper.GetUint64(CfgMergeAllowedStragglers),
RoundTimeout: viper.GetDuration(CfgMergeRoundTimeout),
},
TxnScheduler: registry.TxnSchedulerParameters{
GroupSize: uint64(viper.GetInt64(CfgTxnSchedulerGroupSize)),
GroupSize: viper.GetUint64(CfgTxnSchedulerGroupSize),
Algorithm: viper.GetString(CfgTxnSchedulerAlgorithm),
BatchFlushTimeout: viper.GetDuration(CfgTxnSchedulerBatchFlushTimeout),
MaxBatchSize: viper.GetUint64(CfgTxnSchedulerMaxBatchSize),
MaxBatchSizeBytes: uint64(viper.GetSizeInBytes(CfgTxnSchedulerMaxBatchSizeBytes)),
},
Storage: registry.StorageParameters{GroupSize: uint64(viper.GetInt64(CfgStorageGroupSize))},
Storage: registry.StorageParameters{
GroupSize: viper.GetUint64(CfgStorageGroupSize),
MaxApplyWriteLogEntries: viper.GetUint64(CfgStorageMaxApplyWriteLogEntries),
MaxApplyOps: viper.GetUint64(CfgStorageMaxApplyOps),
MaxMergeRoots: viper.GetUint64(CfgStorageMaxMergeRoots),
MaxMergeOps: viper.GetUint64(CfgStorageMaxMergeOps),
},
}
if teeHardware == node.TEEHardwareIntelSGX {
var vi registry.VersionInfoIntelSGX
Expand Down Expand Up @@ -503,6 +513,10 @@ func init() {

// Init Storage committee flags.
runtimeFlags.Uint64(CfgStorageGroupSize, 1, "Number of storage nodes for the runtime")
runtimeFlags.Uint64(CfgStorageMaxApplyWriteLogEntries, 100_000, "Maximum number of write log entries")
runtimeFlags.Uint64(CfgStorageMaxApplyOps, 2, "Maximum number of apply operations in a batch")
runtimeFlags.Uint64(CfgStorageMaxMergeRoots, 8, "Maximum number of merge roots")
runtimeFlags.Uint64(CfgStorageMaxMergeOps, 2, "Maximum number of merge operations in a batch")

// Init Admission policy flags.
runtimeFlags.String(CfgAdmissionPolicy, "", "What type of node admission policy to have")
Expand Down
13 changes: 13 additions & 0 deletions go/registry/api/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,19 @@ type TxnSchedulerParameters struct {
type StorageParameters struct {
// GroupSize is the size of the storage group.
GroupSize uint64 `json:"group_size"`

// MaxApplyWriteLogEntries is the maximum number of write log entries when performing an Apply
// operation.
MaxApplyWriteLogEntries uint64 `json:"max_apply_write_log_entries"`

// MaxApplyOps is the maximum number of apply operations in a batch.
MaxApplyOps uint64 `json:"max_apply_ops"`

// MaxMergeRoots is the maximum number of merge roots.
MaxMergeRoots uint64 `json:"max_merge_roots"`

// MaxApplyOps configures the maximum number of merge operations in a batch.
MaxMergeOps uint64 `json:"max_merge_ops"`
}

// AnyNodeRuntimeAdmissionPolicy allows any node to register.
Expand Down
2 changes: 2 additions & 0 deletions go/storage/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ var (
// ErrNoMergeRoots is the error returned when no other roots are passed
// to the Merge operation.
ErrNoMergeRoots = errors.New(ModuleName, 5, "storage: no roots to merge")
// ErrLimitReached means that a configured limit has been reached.
ErrLimitReached = errors.New(ModuleName, 6, "storage: limit reached")

// The following errors are reimports from NodeDB.

Expand Down
64 changes: 64 additions & 0 deletions go/worker/storage/service_external.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@ package storage
import (
"context"
"errors"
"fmt"

"github.com/oasislabs/oasis-core/go/common"
"github.com/oasislabs/oasis-core/go/common/grpc/auth"
"github.com/oasislabs/oasis-core/go/common/grpc/policy"
registry "github.com/oasislabs/oasis-core/go/registry/api"
"github.com/oasislabs/oasis-core/go/storage/api"
)

Expand Down Expand Up @@ -37,6 +40,19 @@ func (s *storageService) ensureInitialized(ctx context.Context) error {
}
}

func (s *storageService) getConfig(ctx context.Context, ns common.Namespace) (*registry.StorageParameters, error) {
rt, err := s.w.commonWorker.RuntimeRegistry.GetRuntime(ns)
if err != nil {
return nil, fmt.Errorf("storage: failed to get runtime %s: %w", ns, err)
}

rtDesc, err := rt.RegistryDescriptor(ctx)
if err != nil {
return nil, fmt.Errorf("storage: failed to get runtime %s configuration: %w", ns, err)
}
return &rtDesc.Storage, nil
}

func (s *storageService) SyncGet(ctx context.Context, request *api.GetRequest) (*api.ProofResponse, error) {
if err := s.ensureInitialized(ctx); err != nil {
return nil, err
Expand Down Expand Up @@ -66,6 +82,15 @@ func (s *storageService) Apply(ctx context.Context, request *api.ApplyRequest) (
return nil, errDebugRejectUpdates
}

// Limit maximum number of entries in a write log.
cfg, err := s.getConfig(ctx, request.Namespace)
if err != nil {
return nil, err
}
if uint64(len(request.WriteLog)) > cfg.MaxApplyWriteLogEntries {
return nil, api.ErrLimitReached
}

return s.storage.Apply(ctx, request)
}

Expand All @@ -77,6 +102,21 @@ func (s *storageService) ApplyBatch(ctx context.Context, request *api.ApplyBatch
return nil, errDebugRejectUpdates
}

// Limit maximum number of operations in a batch.
cfg, err := s.getConfig(ctx, request.Namespace)
if err != nil {
return nil, err
}
if uint64(len(request.Ops)) > cfg.MaxApplyOps {
return nil, api.ErrLimitReached
}
// Limit maximum number of entries in a write log.
for _, op := range request.Ops {
if uint64(len(op.WriteLog)) > cfg.MaxApplyWriteLogEntries {
return nil, api.ErrLimitReached
}
}

return s.storage.ApplyBatch(ctx, request)
}

Expand All @@ -88,6 +128,15 @@ func (s *storageService) Merge(ctx context.Context, request *api.MergeRequest) (
return nil, errDebugRejectUpdates
}

// Limit maximum number of roots to merge.
cfg, err := s.getConfig(ctx, request.Namespace)
if err != nil {
return nil, err
}
if uint64(len(request.Others)) > cfg.MaxMergeRoots {
return nil, api.ErrLimitReached
}

return s.storage.Merge(ctx, request)
}

Expand All @@ -99,6 +148,21 @@ func (s *storageService) MergeBatch(ctx context.Context, request *api.MergeBatch
return nil, errDebugRejectUpdates
}

// Limit maximum number of operations in a batch.
cfg, err := s.getConfig(ctx, request.Namespace)
if err != nil {
return nil, err
}
if uint64(len(request.Ops)) > cfg.MaxMergeOps {
return nil, api.ErrLimitReached
}
// Limit maximum number of roots to merge.
for _, op := range request.Ops {
if uint64(len(op.Others)) > cfg.MaxMergeRoots {
return nil, api.ErrLimitReached
}
}

return s.storage.MergeBatch(ctx, request)
}

Expand Down
1 change: 1 addition & 0 deletions go/worker/storage/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ func (s *Worker) initGenesis(gen *genesis.Document) error {
func init() {
Flags.Bool(CfgWorkerEnabled, false, "Enable storage worker")
Flags.Uint(cfgWorkerFetcherCount, 4, "Number of concurrent storage diff fetchers")

Flags.Bool(CfgWorkerDebugIgnoreApply, false, "Ignore Apply operations (for debugging purposes)")
_ = Flags.MarkHidden(CfgWorkerDebugIgnoreApply)

Expand Down

0 comments on commit 677f73e

Please sign in to comment.