From c50b525d7475d353f3876c8c93c0904cb1cc6fb8 Mon Sep 17 00:00:00 2001 From: Jernej Kos Date: Fri, 7 Aug 2020 11:44:09 +0200 Subject: [PATCH] go/worker/compute/merge: Drop support for multiple committees Since there is currently no transaction scheduler implementation which would support multiple committees, there is no sense in the merge node to try to support such cases as it could be a source of bugs. Additionally it results in extra round trips to storage nodes due to the Merge operation which in case of a single committee does not do anything. The merge node is also the only client for the Merge* storage operations, so they can just be removed in order to reduce the exposed API surface. --- go/genesis/genesis_test.go | 2 +- go/oasis-net-runner/fixtures/default.go | 2 +- go/oasis-node/cmd/debug/byzantine/merge.go | 39 +----- go/oasis-node/cmd/debug/byzantine/storage.go | 28 ---- .../debug/txsource/workload/registration.go | 2 +- go/oasis-node/cmd/registry/runtime/runtime.go | 2 +- go/oasis-node/node_test.go | 2 +- .../scenario/e2e/runtime/multiple_runtimes.go | 2 +- .../scenario/e2e/runtime/runtime.go | 2 +- go/registry/tests/tester.go | 2 +- go/roothash/api/commitment/pool.go | 4 +- go/runtime/registry/storage_router.go | 16 --- go/storage/api/api.go | 43 +----- go/storage/api/grpc.go | 96 -------------- go/storage/api/root_cache.go | 56 -------- go/storage/client/client.go | 24 ---- go/storage/database/database.go | 32 ----- go/storage/metrics.go | 28 ---- go/storage/tests/tester.go | 108 --------------- go/worker/compute/merge/committee/node.go | 124 ++++-------------- go/worker/compute/merge/committee/state.go | 2 +- go/worker/storage/committee/policy.go | 7 +- go/worker/storage/service_external.go | 46 ------- 23 files changed, 48 insertions(+), 621 deletions(-) diff --git a/go/genesis/genesis_test.go b/go/genesis/genesis_test.go index eaeb2568cc0..294690a8a8f 100644 --- a/go/genesis/genesis_test.go +++ b/go/genesis/genesis_test.go @@ -219,7 +219,7 @@ func TestGenesisSanityCheck(t *testing.T) { MinWriteReplication: 1, MaxApplyWriteLogEntries: 100_000, MaxApplyOps: 2, - MaxMergeRoots: 8, + MaxMergeRoots: 1, MaxMergeOps: 2, }, AdmissionPolicy: registry.RuntimeAdmissionPolicy{ diff --git a/go/oasis-net-runner/fixtures/default.go b/go/oasis-net-runner/fixtures/default.go index 441b3a73032..f45327078fb 100644 --- a/go/oasis-net-runner/fixtures/default.go +++ b/go/oasis-net-runner/fixtures/default.go @@ -106,7 +106,7 @@ func newDefaultFixture() (*oasis.NetworkFixture, error) { MinWriteReplication: 1, MaxApplyWriteLogEntries: 100_000, MaxApplyOps: 2, - MaxMergeRoots: 8, + MaxMergeRoots: 1, MaxMergeOps: 2, }, AdmissionPolicy: registry.RuntimeAdmissionPolicy{ diff --git a/go/oasis-node/cmd/debug/byzantine/merge.go b/go/oasis-node/cmd/debug/byzantine/merge.go index 16e08d0ae15..d24019bf874 100644 --- a/go/oasis-node/cmd/debug/byzantine/merge.go +++ b/go/oasis-node/cmd/debug/byzantine/merge.go @@ -6,21 +6,18 @@ import ( "github.com/oasisprotocol/oasis-core/go/common" "github.com/oasisprotocol/oasis-core/go/common/crypto/hash" - "github.com/oasisprotocol/oasis-core/go/common/crypto/signature" "github.com/oasisprotocol/oasis-core/go/common/identity" consensus "github.com/oasisprotocol/oasis-core/go/consensus/api" "github.com/oasisprotocol/oasis-core/go/roothash/api/block" "github.com/oasisprotocol/oasis-core/go/roothash/api/commitment" - storage "github.com/oasisprotocol/oasis-core/go/storage/api" ) type mergeBatchContext struct { currentBlock *block.Block commitments []*commitment.OpenExecutorCommitment - storageReceipts []*storage.Receipt - newBlock *block.Block - commit *commitment.MergeCommitment + newBlock *block.Block + commit *commitment.MergeCommitment } func newMergeBatchContext() *mergeBatchContext { @@ -86,36 +83,14 @@ func (mbc *mergeBatchContext) process(ctx context.Context, hnss []*honestNodeSto } } - var emptyRoot hash.Hash - emptyRoot.Empty() - - var err error - mbc.storageReceipts, err = storageBroadcastMergeBatch(ctx, hnss, mbc.currentBlock.Header.Namespace, mbc.currentBlock.Header.Round, []storage.MergeOp{ - { - Base: emptyRoot, - Others: ioRoots, - }, - { - Base: mbc.currentBlock.Header.StateRoot, - Others: stateRoots, - }, - }) - if err != nil { - return fmt.Errorf("storage broadcast merge batch: %w", err) - } - - var firstReceiptBody storage.ReceiptBody - if err := mbc.storageReceipts[0].Open(&firstReceiptBody); err != nil { - return fmt.Errorf("storage receipt Open: %w", err) - } - var signatures []signature.Signature - for _, receipt := range mbc.storageReceipts { - signatures = append(signatures, receipt.Signature) + if len(collectedCommittees) != 1 { + return fmt.Errorf("multiple committees not supported: %d", len(collectedCommittees)) } + signatures := mbc.commitments[0].Body.StorageSignatures mbc.newBlock = block.NewEmptyBlock(mbc.currentBlock, 0, block.Normal) - mbc.newBlock.Header.IORoot = firstReceiptBody.Roots[0] - mbc.newBlock.Header.StateRoot = firstReceiptBody.Roots[1] + mbc.newBlock.Header.IORoot = ioRoots[0] + mbc.newBlock.Header.StateRoot = stateRoots[0] mbc.newBlock.Header.Messages = messages mbc.newBlock.Header.StorageSignatures = signatures diff --git a/go/oasis-node/cmd/debug/byzantine/storage.go b/go/oasis-node/cmd/debug/byzantine/storage.go index 3129ecc75f5..f8de1011e81 100644 --- a/go/oasis-node/cmd/debug/byzantine/storage.go +++ b/go/oasis-node/cmd/debug/byzantine/storage.go @@ -106,14 +106,6 @@ func (hns *honestNodeStorage) ApplyBatch(ctx context.Context, request *storage.A return hns.client.ApplyBatch(ctx, request) } -func (hns *honestNodeStorage) Merge(ctx context.Context, request *storage.MergeRequest) ([]*storage.Receipt, error) { - return hns.client.Merge(ctx, request) -} - -func (hns *honestNodeStorage) MergeBatch(ctx context.Context, request *storage.MergeBatchRequest) ([]*storage.Receipt, error) { - return hns.client.MergeBatch(ctx, request) -} - func (hns *honestNodeStorage) GetDiff(ctx context.Context, request *storage.GetDiffRequest) (storage.WriteLogIterator, error) { return hns.client.GetDiff(ctx, request) } @@ -176,23 +168,3 @@ func storageBroadcastApplyBatch( return receipts, nil } - -func storageBroadcastMergeBatch( - ctx context.Context, - hnss []*honestNodeStorage, - ns common.Namespace, - round uint64, - ops []storage.MergeOp, -) ([]*storage.Receipt, error) { - var receipts []*storage.Receipt - for _, hns := range hnss { - r, err := hns.MergeBatch(ctx, &storage.MergeBatchRequest{Namespace: ns, Round: round, Ops: ops}) - if err != nil { - return receipts, fmt.Errorf("honest node storage MergeBatch %s: %w", hns.nodeID, err) - } - - receipts = append(receipts, r...) - } - - return receipts, nil -} diff --git a/go/oasis-node/cmd/debug/txsource/workload/registration.go b/go/oasis-node/cmd/debug/txsource/workload/registration.go index 8d08bb10cd0..d68090b7735 100644 --- a/go/oasis-node/cmd/debug/txsource/workload/registration.go +++ b/go/oasis-node/cmd/debug/txsource/workload/registration.go @@ -68,7 +68,7 @@ func getRuntime(entityID signature.PublicKey, id common.Namespace) *registry.Run MinWriteReplication: 1, MaxApplyWriteLogEntries: 100_000, MaxApplyOps: 2, - MaxMergeRoots: 8, + MaxMergeRoots: 1, MaxMergeOps: 2, }, AdmissionPolicy: registry.RuntimeAdmissionPolicy{ diff --git a/go/oasis-node/cmd/registry/runtime/runtime.go b/go/oasis-node/cmd/registry/runtime/runtime.go index 3b6d76ddaa4..a7080ffaa0e 100644 --- a/go/oasis-node/cmd/registry/runtime/runtime.go +++ b/go/oasis-node/cmd/registry/runtime/runtime.go @@ -571,7 +571,7 @@ func init() { runtimeFlags.Uint64(CfgStorageMinWriteReplication, 1, "Minimum required storage write replication") runtimeFlags.Uint64(CfgStorageMaxApplyWriteLogEntries, 100_000, "Maximum number of write log entries") runtimeFlags.Uint64(CfgStorageMaxApplyOps, 2, "Maximum number of apply operations in a batch") - runtimeFlags.Uint64(CfgStorageMaxMergeRoots, 8, "Maximum number of merge roots") + runtimeFlags.Uint64(CfgStorageMaxMergeRoots, 1, "Maximum number of merge roots") runtimeFlags.Uint64(CfgStorageMaxMergeOps, 2, "Maximum number of merge operations in a batch") runtimeFlags.Uint64(CfgStorageCheckpointInterval, 0, "Storage checkpoint interval (in rounds)") runtimeFlags.Uint64(CfgStorageCheckpointNumKept, 0, "Number of storage checkpoints to keep") diff --git a/go/oasis-node/node_test.go b/go/oasis-node/node_test.go index c0056775db6..9b9dc46d2e9 100644 --- a/go/oasis-node/node_test.go +++ b/go/oasis-node/node_test.go @@ -106,7 +106,7 @@ var ( MinWriteReplication: 1, MaxApplyWriteLogEntries: 100_000, MaxApplyOps: 2, - MaxMergeRoots: 8, + MaxMergeRoots: 1, MaxMergeOps: 2, }, AdmissionPolicy: registry.RuntimeAdmissionPolicy{ diff --git a/go/oasis-test-runner/scenario/e2e/runtime/multiple_runtimes.go b/go/oasis-test-runner/scenario/e2e/runtime/multiple_runtimes.go index 2d0e89a9ab8..a11c1c3a4a1 100644 --- a/go/oasis-test-runner/scenario/e2e/runtime/multiple_runtimes.go +++ b/go/oasis-test-runner/scenario/e2e/runtime/multiple_runtimes.go @@ -107,7 +107,7 @@ func (sc *multipleRuntimesImpl) Fixture() (*oasis.NetworkFixture, error) { MinWriteReplication: 1, MaxApplyWriteLogEntries: 100_000, MaxApplyOps: 2, - MaxMergeRoots: 8, + MaxMergeRoots: 1, MaxMergeOps: 2, }, AdmissionPolicy: registry.RuntimeAdmissionPolicy{ diff --git a/go/oasis-test-runner/scenario/e2e/runtime/runtime.go b/go/oasis-test-runner/scenario/e2e/runtime/runtime.go index 69e32dafff0..579514d58b0 100644 --- a/go/oasis-test-runner/scenario/e2e/runtime/runtime.go +++ b/go/oasis-test-runner/scenario/e2e/runtime/runtime.go @@ -180,7 +180,7 @@ func (sc *runtimeImpl) Fixture() (*oasis.NetworkFixture, error) { MinWriteReplication: 2, MaxApplyWriteLogEntries: 100_000, MaxApplyOps: 2, - MaxMergeRoots: 8, + MaxMergeRoots: 1, MaxMergeOps: 2, }, AdmissionPolicy: registry.RuntimeAdmissionPolicy{ diff --git a/go/registry/tests/tester.go b/go/registry/tests/tester.go index 6e3cfc3052a..5ed7a9fd43b 100644 --- a/go/registry/tests/tester.go +++ b/go/registry/tests/tester.go @@ -1629,7 +1629,7 @@ func NewTestRuntime(seed []byte, ent *TestEntity, isKeyManager bool) (*TestRunti MinWriteReplication: 3, MaxApplyWriteLogEntries: 100_000, MaxApplyOps: 2, - MaxMergeRoots: 8, + MaxMergeRoots: 1, MaxMergeOps: 2, }, AdmissionPolicy: api.RuntimeAdmissionPolicy{ diff --git a/go/roothash/api/commitment/pool.go b/go/roothash/api/commitment/pool.go index 0fdb5947734..50d62c2a275 100644 --- a/go/roothash/api/commitment/pool.go +++ b/go/roothash/api/commitment/pool.go @@ -764,10 +764,10 @@ func (m *MultiPool) CheckEnoughCommitments() error { } // GetExecutorCommitments returns a list of executor commitments in the pool. -func (m *MultiPool) GetExecutorCommitments() (result []ExecutorCommitment) { +func (m *MultiPool) GetOpenExecutorCommitments() (result []OpenExecutorCommitment) { for _, p := range m.Committees { for _, c := range p.ExecuteCommitments { - result = append(result, c.ExecutorCommitment) + result = append(result, c) } } return diff --git a/go/runtime/registry/storage_router.go b/go/runtime/registry/storage_router.go index aa6306349f7..c5b423ef9f8 100644 --- a/go/runtime/registry/storage_router.go +++ b/go/runtime/registry/storage_router.go @@ -59,22 +59,6 @@ func (sr *storageRouter) ApplyBatch(ctx context.Context, request *api.ApplyBatch return rt.Storage().ApplyBatch(ctx, request) } -func (sr *storageRouter) Merge(ctx context.Context, request *api.MergeRequest) ([]*api.Receipt, error) { - rt, err := sr.getRuntime(request.Namespace) - if err != nil { - return nil, err - } - return rt.Storage().Merge(ctx, request) -} - -func (sr *storageRouter) MergeBatch(ctx context.Context, request *api.MergeBatchRequest) ([]*api.Receipt, error) { - rt, err := sr.getRuntime(request.Namespace) - if err != nil { - return nil, err - } - return rt.Storage().MergeBatch(ctx, request) -} - func (sr *storageRouter) GetDiff(ctx context.Context, request *api.GetDiffRequest) (api.WriteLogIterator, error) { rt, err := sr.getRuntime(request.StartRoot.Namespace) if err != nil { diff --git a/go/storage/api/api.go b/go/storage/api/api.go index 0b197f91b27..ae126ed50be 100644 --- a/go/storage/api/api.go +++ b/go/storage/api/api.go @@ -38,11 +38,8 @@ var ( // ErrUnsupported is the error returned when the called method is not // supported by the given backend. ErrUnsupported = errors.New(ModuleName, 4, "storage: method not supported by backend") - // ErrNoMergeRoots is the error returned when no other roots are passed - // to the Merge operation. - ErrNoMergeRoots = errors.New(ModuleName, 5, "storage: no roots to merge") // ErrLimitReached means that a configured limit has been reached. - ErrLimitReached = errors.New(ModuleName, 6, "storage: limit reached") + ErrLimitReached = errors.New(ModuleName, 5, "storage: limit reached") // The following errors are reimports from NodeDB. @@ -240,14 +237,6 @@ type ApplyOp struct { WriteLog WriteLog `json:"writelog"` } -// MergeOps is a merge operation within a batch of merge operations. -type MergeOp struct { - // Base is the base root for the merge. - Base hash.Hash `json:"base"` - // Others is a list of roots derived from base that should be merged. - Others []hash.Hash `json:"others"` -} - // ApplyRequest is an Apply request. type ApplyRequest struct { Namespace common.Namespace `json:"namespace"` @@ -265,21 +254,6 @@ type ApplyBatchRequest struct { Ops []ApplyOp `json:"ops"` } -// MergeRequest is a Merge request. -type MergeRequest struct { - Namespace common.Namespace `json:"namespace"` - Round uint64 `json:"round"` - Base hash.Hash `json:"base"` - Others []hash.Hash `json:"others"` -} - -// MergeBatchRequest is a MergeBatch request. -type MergeBatchRequest struct { - Namespace common.Namespace `json:"namespace"` - Round uint64 `json:"round"` - Ops []MergeOp `json:"ops"` -} - // SyncOptions are the sync options. type SyncOptions struct { OffsetKey []byte `json:"offset_key"` @@ -317,21 +291,6 @@ type Backend interface { // See Apply for more details. ApplyBatch(ctx context.Context, request *ApplyBatchRequest) ([]*Receipt, error) - // TODO: Add proof. - // Merge performs a 3-way merge operation between the specified - // roots and returns a receipt for the merged root. - // - // Round is the round of the base root while all other roots are - // expected to be in the next round. - Merge(ctx context.Context, request *MergeRequest) ([]*Receipt, error) - - // TODO: Add proof. - // MergeBatch performs multiple sets of merge operations and returns - // a single receipt covering all merged roots. - // - // See Merge for more details. - MergeBatch(ctx context.Context, request *MergeBatchRequest) ([]*Receipt, error) - // GetDiff returns an iterator of write log entries that must be applied // to get from the first given root to the second one. GetDiff(ctx context.Context, request *GetDiffRequest) (WriteLogIterator, error) diff --git a/go/storage/api/grpc.go b/go/storage/api/grpc.go index 026ba515dce..68da3e7dc9e 100644 --- a/go/storage/api/grpc.go +++ b/go/storage/api/grpc.go @@ -52,32 +52,6 @@ var ( return true, nil }) - // MethodMerge is the Merge method. - MethodMerge = ServiceName.NewMethod("Merge", MergeRequest{}). - WithNamespaceExtractor(func(ctx context.Context, req interface{}) (common.Namespace, error) { - r, ok := req.(*MergeRequest) - if !ok { - return common.Namespace{}, errInvalidRequestType - } - return r.Namespace, nil - }). - WithAccessControl(func(ctx context.Context, req interface{}) (bool, error) { - return true, nil - }) - - // MethodMergeBatch is the MergeBatch method. - MethodMergeBatch = ServiceName.NewMethod("MergeBatch", MergeBatchRequest{}). - WithNamespaceExtractor(func(ctx context.Context, req interface{}) (common.Namespace, error) { - r, ok := req.(*MergeBatchRequest) - if !ok { - return common.Namespace{}, errInvalidRequestType - } - return r.Namespace, nil - }). - WithAccessControl(func(ctx context.Context, req interface{}) (bool, error) { - return true, nil - }) - // MethodGetDiff is the GetDiff method. MethodGetDiff = ServiceName.NewMethod("GetDiff", GetDiffRequest{}). WithNamespaceExtractor(func(ctx context.Context, req interface{}) (common.Namespace, error) { @@ -142,14 +116,6 @@ var ( MethodName: MethodApplyBatch.ShortName(), Handler: handlerApplyBatch, }, - { - MethodName: MethodMerge.ShortName(), - Handler: handlerMerge, - }, - { - MethodName: MethodMergeBatch.ShortName(), - Handler: handlerMergeBatch, - }, { MethodName: MethodGetCheckpoints.ShortName(), Handler: handlerGetCheckpoints, @@ -285,52 +251,6 @@ func handlerApplyBatch( // nolint: golint return interceptor(ctx, &req, info, handler) } -func handlerMerge( // nolint: golint - srv interface{}, - ctx context.Context, - dec func(interface{}) error, - interceptor grpc.UnaryServerInterceptor, -) (interface{}, error) { - var req MergeRequest - if err := dec(&req); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(Backend).Merge(ctx, &req) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: MethodMerge.FullName(), - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(Backend).Merge(ctx, req.(*MergeRequest)) - } - return interceptor(ctx, &req, info, handler) -} - -func handlerMergeBatch( // nolint: golint - srv interface{}, - ctx context.Context, - dec func(interface{}) error, - interceptor grpc.UnaryServerInterceptor, -) (interface{}, error) { - var req MergeBatchRequest - if err := dec(&req); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(Backend).MergeBatch(ctx, &req) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: MethodMergeBatch.FullName(), - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(Backend).MergeBatch(ctx, req.(*MergeBatchRequest)) - } - return interceptor(ctx, &req, info, handler) -} - func handlerGetCheckpoints( // nolint: golint srv interface{}, ctx context.Context, @@ -489,22 +409,6 @@ func (c *storageClient) ApplyBatch(ctx context.Context, request *ApplyBatchReque return rsp, nil } -func (c *storageClient) Merge(ctx context.Context, request *MergeRequest) ([]*Receipt, error) { - var rsp []*Receipt - if err := c.conn.Invoke(ctx, MethodMerge.FullName(), request, &rsp); err != nil { - return nil, err - } - return rsp, nil -} - -func (c *storageClient) MergeBatch(ctx context.Context, request *MergeBatchRequest) ([]*Receipt, error) { - var rsp []*Receipt - if err := c.conn.Invoke(ctx, MethodMergeBatch.FullName(), request, &rsp); err != nil { - return nil, err - } - return rsp, nil -} - func (c *storageClient) GetCheckpoints(ctx context.Context, request *checkpoint.GetCheckpointsRequest) ([]*checkpoint.Metadata, error) { var rsp []*checkpoint.Metadata if err := c.conn.Invoke(ctx, MethodGetCheckpoints.FullName(), request, &rsp); err != nil { diff --git a/go/storage/api/root_cache.go b/go/storage/api/root_cache.go index ad2c0be9c20..9e6695e70f4 100644 --- a/go/storage/api/root_cache.go +++ b/go/storage/api/root_cache.go @@ -33,62 +33,6 @@ func (rc *RootCache) GetTree(ctx context.Context, root Root) (mkvs.Tree, error) return mkvs.NewWithRoot(rc.remoteSyncer, rc.localDB, root, rc.persistEverything), nil } -// Merge performs a 3-way merge operation between the specified roots and returns -// a receipt for the merged root. -func (rc *RootCache) Merge( - ctx context.Context, - ns common.Namespace, - version uint64, - base hash.Hash, - others []hash.Hash, -) (*hash.Hash, error) { - if len(others) == 0 { - // No other roots passed, no reason to call the operation. - return nil, ErrNoMergeRoots - } - - // Make sure that all roots exist in storage before doing any work. - if !rc.localDB.HasRoot(Root{Namespace: ns, Version: version, Hash: base}) { - return nil, ErrRootNotFound - } - for _, rootHash := range others { - if !rc.localDB.HasRoot(Root{Namespace: ns, Version: version + 1, Hash: rootHash}) { - return nil, ErrRootNotFound - } - } - - if len(others) == 1 { - // Fast path: nothing to merge, just return the only root. - return &others[0], nil - } - - // Start with the first root. - // TODO: WithStorageProof. - tree := mkvs.NewWithRoot(nil, rc.localDB, Root{Namespace: ns, Version: version + 1, Hash: others[0]}) - defer tree.Close() - - // Apply operations from all roots. - baseRoot := Root{Namespace: ns, Version: version, Hash: base} - for _, rootHash := range others[1:] { - it, err := rc.localDB.GetWriteLog(ctx, baseRoot, Root{Namespace: ns, Version: version + 1, Hash: rootHash}) - if err != nil { - return nil, fmt.Errorf("storage/rootcache: failed to read write log: %w", err) - } - - if err = tree.ApplyWriteLog(ctx, it); err != nil { - return nil, fmt.Errorf("storage/rootcache: failed to apply write log: %w", err) - } - } - - var mergedRoot hash.Hash - var err error - if _, mergedRoot, err = tree.Commit(ctx, ns, version+1); err != nil { - return nil, fmt.Errorf("storage/rootcache: failed to commit write log: %w", err) - } - - return &mergedRoot, nil -} - // Apply applies the write log, bypassing the apply operation iff the new root // already is in the node database. func (rc *RootCache) Apply( diff --git a/go/storage/client/client.go b/go/storage/client/client.go index a7cdb3451ca..b1d48aee2b8 100644 --- a/go/storage/client/client.go +++ b/go/storage/client/client.go @@ -280,30 +280,6 @@ func (b *storageClientBackend) ApplyBatch(ctx context.Context, request *api.Appl ) } -func (b *storageClientBackend) Merge(ctx context.Context, request *api.MergeRequest) ([]*api.Receipt, error) { - return b.writeWithClient( - ctx, - request.Namespace, - request.Round+1, - func(ctx context.Context, c api.Backend, node *node.Node) (interface{}, error) { - return c.Merge(ctx, request) - }, - nil, - ) -} - -func (b *storageClientBackend) MergeBatch(ctx context.Context, request *api.MergeBatchRequest) ([]*api.Receipt, error) { - return b.writeWithClient( - ctx, - request.Namespace, - request.Round+1, - func(ctx context.Context, c api.Backend, node *node.Node) (interface{}, error) { - return c.MergeBatch(ctx, request) - }, - nil, - ) -} - func (b *storageClientBackend) readWithClient( ctx context.Context, ns common.Namespace, diff --git a/go/storage/database/database.go b/go/storage/database/database.go index 7f1a75832fb..73fc4106746 100644 --- a/go/storage/database/database.go +++ b/go/storage/database/database.go @@ -138,38 +138,6 @@ func (ba *databaseBackend) ApplyBatch(ctx context.Context, request *api.ApplyBat return []*api.Receipt{receipt}, err } -func (ba *databaseBackend) Merge(ctx context.Context, request *api.MergeRequest) ([]*api.Receipt, error) { - if ba.readOnly { - return nil, fmt.Errorf("storage/database: failed to Merge: %w", api.ErrReadOnly) - } - - newRoot, err := ba.rootCache.Merge(ctx, request.Namespace, request.Round, request.Base, request.Others) - if err != nil { - return nil, fmt.Errorf("storage/database: failed to Merge: %w", err) - } - - receipt, err := api.SignReceipt(ba.signer, request.Namespace, request.Round+1, []hash.Hash{*newRoot}) - return []*api.Receipt{receipt}, err -} - -func (ba *databaseBackend) MergeBatch(ctx context.Context, request *api.MergeBatchRequest) ([]*api.Receipt, error) { - if ba.readOnly { - return nil, fmt.Errorf("storage/database: failed to MergeBatch: %w", api.ErrReadOnly) - } - - newRoots := make([]hash.Hash, 0, len(request.Ops)) - for _, op := range request.Ops { - newRoot, err := ba.rootCache.Merge(ctx, request.Namespace, request.Round, op.Base, op.Others) - if err != nil { - return nil, fmt.Errorf("storage/database: failed to Merge, op: %w", err) - } - newRoots = append(newRoots, *newRoot) - } - - receipt, err := api.SignReceipt(ba.signer, request.Namespace, request.Round+1, newRoots) - return []*api.Receipt{receipt}, err -} - func (ba *databaseBackend) Cleanup() { ba.nodedb.Close() } diff --git a/go/storage/metrics.go b/go/storage/metrics.go index 1b1b06b12e2..8477d159b92 100644 --- a/go/storage/metrics.go +++ b/go/storage/metrics.go @@ -51,8 +51,6 @@ var ( labelApply = prometheus.Labels{"call": "apply"} labelApplyBatch = prometheus.Labels{"call": "apply_batch"} - labelMerge = prometheus.Labels{"call": "merge"} - labelMergeBatch = prometheus.Labels{"call": "merge_batch"} labelSyncGet = prometheus.Labels{"call": "sync_get"} labelSyncGetPrefixes = prometheus.Labels{"call": "sync_get_prefixes"} labelSyncIterate = prometheus.Labels{"call": "sync_iterate"} @@ -114,32 +112,6 @@ func (w *metricsWrapper) ApplyBatch(ctx context.Context, request *api.ApplyBatch return receipts, err } -func (w *metricsWrapper) Merge(ctx context.Context, request *api.MergeRequest) ([]*api.Receipt, error) { - start := time.Now() - receipts, err := w.Backend.Merge(ctx, request) - storageLatency.With(labelMerge).Observe(time.Since(start).Seconds()) - if err != nil { - storageFailures.With(labelMerge).Inc() - return nil, err - } - - storageCalls.With(labelMerge).Inc() - return receipts, err -} - -func (w *metricsWrapper) MergeBatch(ctx context.Context, request *api.MergeBatchRequest) ([]*api.Receipt, error) { - start := time.Now() - receipts, err := w.Backend.MergeBatch(ctx, request) - storageLatency.With(labelMergeBatch).Observe(time.Since(start).Seconds()) - if err != nil { - storageFailures.With(labelMergeBatch).Inc() - return nil, err - } - - storageCalls.With(labelMergeBatch).Inc() - return receipts, err -} - func (w *metricsWrapper) SyncGet(ctx context.Context, request *api.GetRequest) (*api.ProofResponse, error) { start := time.Now() res, err := w.Backend.SyncGet(ctx, request) diff --git a/go/storage/tests/tester.go b/go/storage/tests/tester.go index e7f2f749ef5..d49f2b71fbf 100644 --- a/go/storage/tests/tester.go +++ b/go/storage/tests/tester.go @@ -17,7 +17,6 @@ import ( "github.com/oasisprotocol/oasis-core/go/storage/api" "github.com/oasisprotocol/oasis-core/go/storage/mkvs" "github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint" - "github.com/oasisprotocol/oasis-core/go/storage/mkvs/writelog" ) var testValues = [][]byte{ @@ -92,9 +91,6 @@ func StorageImplementationTests(t *testing.T, localBackend api.LocalBackend, bac t.Run("Basic", func(t *testing.T) { testBasic(t, localBackend, backend, namespace, round) }) - t.Run("Merge", func(t *testing.T) { - testMerge(t, backend, namespace, round) - }) } func testBasic(t *testing.T, localBackend api.LocalBackend, backend api.Backend, namespace common.Namespace, round uint64) { @@ -263,107 +259,3 @@ func testBasic(t *testing.T, localBackend api.LocalBackend, backend api.Backend, require.Equal(t, cp.Chunks[0], hb.Build(), "GetCheckpointChunk must return correct chunk") }) } - -func testMerge(t *testing.T, backend api.Backend, namespace common.Namespace, round uint64) { - ctx := context.Background() - - writeLogs := []api.WriteLog{ - // Base root. - { - api.LogEntry{Key: []byte("foo"), Value: []byte("i am base")}, - }, - // First root. - { - api.LogEntry{Key: []byte("first"), Value: []byte("i am first root")}, - }, - // Second root. - { - api.LogEntry{Key: []byte("second"), Value: []byte("i am second root")}, - }, - // Third root. - { - api.LogEntry{Key: []byte("third"), Value: []byte("i am third root")}, - }, - } - - // Create all roots. - var roots []hash.Hash - for idx, writeLog := range writeLogs { - var dstRound uint64 - var baseRoot hash.Hash - if idx == 0 { - baseRoot.Empty() - dstRound = round - } else { - baseRoot = roots[0] - dstRound = round + 1 - } - - // Generate expected root hash. - tree := mkvs.NewWithRoot(backend, nil, api.Root{Namespace: namespace, Version: dstRound, Hash: baseRoot}) - defer tree.Close() - err := tree.ApplyWriteLog(ctx, writelog.NewStaticIterator(writeLog)) - require.NoError(t, err, "ApplyWriteLog") - var root hash.Hash - _, root, err = tree.Commit(ctx, namespace, dstRound) - require.NoError(t, err, "Commit") - - // Apply to storage backend. - _, err = backend.Apply(ctx, &api.ApplyRequest{ - Namespace: namespace, - SrcRound: round, - SrcRoot: baseRoot, - DstRound: dstRound, - DstRoot: root, - WriteLog: writeLog, - }) - require.NoError(t, err, "Apply") - - roots = append(roots, root) - } - - // Try to merge with only specifying the base. - _, err := backend.Merge(ctx, &api.MergeRequest{Namespace: namespace, Round: round, Base: roots[0]}) - require.Error(t, err, "Merge without other roots should return an error") - - // Try to merge with only specifying the base and first root. - receipts, err := backend.Merge(ctx, &api.MergeRequest{Namespace: namespace, Round: round, Base: roots[0], Others: roots[1:2]}) - require.NoError(t, err, "Merge") - require.NotNil(t, receipts, "Merge should return receipts") - - for _, receipt := range receipts { - var receiptBody api.ReceiptBody - err = receipt.Open(&receiptBody) - require.NoError(t, err, "receipt.Open") - require.Len(t, receiptBody.Roots, 1, "receipt should contain 1 root") - require.EqualValues(t, roots[1], receiptBody.Roots[0], "merged root should be equal to the only other root") - } - - // Try to merge with specifying the base and all three roots. - receipts, err = backend.Merge(ctx, &api.MergeRequest{Namespace: namespace, Round: round, Base: roots[0], Others: roots[1:]}) - require.NoError(t, err, "Merge") - require.NotNil(t, receipts, "Merge should return receipts") - - var mergedRoot hash.Hash - for _, receipt := range receipts { - var receiptBody api.ReceiptBody - err = receipt.Open(&receiptBody) - require.NoError(t, err, "receipt.Open") - require.Len(t, receiptBody.Roots, 1, "receipt should contain 1 root") - - mergedRoot = receiptBody.Roots[0] - } - - // Make sure that the merged root is the same as applying all write logs against - // the base root. - tree := mkvs.NewWithRoot(backend, nil, api.Root{Namespace: namespace, Version: round, Hash: roots[0]}) - defer tree.Close() - for _, writeLog := range writeLogs[1:] { - err = tree.ApplyWriteLog(ctx, writelog.NewStaticIterator(writeLog)) - require.NoError(t, err, "ApplyWriteLog") - } - _, expectedRoot, err := tree.Commit(ctx, namespace, round+1) - require.NoError(t, err, "Commit") - - require.Equal(t, expectedRoot, mergedRoot, "merged root should match expected root") -} diff --git a/go/worker/compute/merge/committee/node.go b/go/worker/compute/merge/committee/node.go index 206215db314..26b2e888e84 100644 --- a/go/worker/compute/merge/committee/node.go +++ b/go/worker/compute/merge/committee/node.go @@ -22,8 +22,6 @@ import ( "github.com/oasisprotocol/oasis-core/go/roothash/api/block" "github.com/oasisprotocol/oasis-core/go/roothash/api/commitment" runtimeCommittee "github.com/oasisprotocol/oasis-core/go/runtime/committee" - scheduler "github.com/oasisprotocol/oasis-core/go/scheduler/api" - storage "github.com/oasisprotocol/oasis-core/go/storage/api" workerCommon "github.com/oasisprotocol/oasis-core/go/worker/common" "github.com/oasisprotocol/oasis-core/go/worker/common/committee" "github.com/oasisprotocol/oasis-core/go/worker/common/p2p" @@ -432,7 +430,7 @@ func (n *Node) tryFinalizeResultsLocked(pool *commitment.Pool, didTimeout bool) n.logger.Info("have valid commitments from all committees, merging") - commitments := state.pool.GetExecutorCommitments() + commitments := state.pool.GetOpenExecutorCommitments() if epoch.IsMergeBackupWorker() && state.pendingEvent == nil { // Backup workers only perform merge after receiving a discrepancy event. @@ -445,12 +443,10 @@ func (n *Node) tryFinalizeResultsLocked(pool *commitment.Pool, didTimeout bool) } // Guarded by n.commonNode.CrossNode. -func (n *Node) startMergeLocked(commitments []commitment.ExecutorCommitment, results []*commitment.ComputeResultsHeader) { +func (n *Node) startMergeLocked(commitments []commitment.OpenExecutorCommitment, results []*commitment.ComputeResultsHeader) { doneCh := make(chan *commitment.MergeBody, 1) ctx, cancel := context.WithCancel(n.roundCtx) - epoch := n.commonNode.Group.GetEpochSnapshot() - // Create empty block based on previous block while we hold the lock. prevBlk := n.commonNode.CurrentBlock blk := block.NewEmptyBlock(prevBlk, 0, block.Normal) @@ -466,102 +462,38 @@ func (n *Node) startMergeLocked(commitments []commitment.ExecutorCommitment, res ctx, cancel = context.WithTimeout(ctx, n.commonCfg.StorageCommitTimeout) defer cancel() - var ioRoots, stateRoots []hash.Hash - var messages []*block.Message - for _, result := range results { - ioRoots = append(ioRoots, result.IORoot) - stateRoots = append(stateRoots, result.StateRoot) - - // Merge roothash messages. - // The rule is that at most one result can have sent roothash messages. - if len(result.Messages) > 0 { - if messages != nil { - n.logger.Error("multiple committees sent roothash messages") - return + var mergeBody commitment.MergeBody + switch len(results) { + case 1: + // Optimize the case where there is only a single committee -- there is nothing to merge + // so we can avoid a round trip to the storage nodes which already have the roots. + blk.Header.Messages = results[0].Messages + blk.Header.IORoot = results[0].IORoot + blk.Header.StateRoot = results[0].StateRoot + + // Collect all distinct storage signatures. + storageSigSet := make(map[signature.PublicKey]bool) + for _, ec := range commitments { + mergeBody.ExecutorCommits = append(mergeBody.ExecutorCommits, ec.ExecutorCommitment) + + for _, s := range ec.Body.StorageSignatures { + if storageSigSet[s.PublicKey] { + continue + } + storageSigSet[s.PublicKey] = true + blk.Header.StorageSignatures = append(blk.Header.StorageSignatures, s) } - messages = result.Messages } - } - var emptyRoot hash.Hash - emptyRoot.Empty() - - // NOTE: Order is important for verifying the receipt. - mergeOps := []storage.MergeOp{ - // I/O root. - { - Base: emptyRoot, - Others: ioRoots, - }, - // State root. - { - Base: prevBlk.Header.StateRoot, - Others: stateRoots, - }, - } - - receipts, err := n.commonNode.Storage.MergeBatch(ctx, &storage.MergeBatchRequest{ - Namespace: prevBlk.Header.Namespace, - Round: prevBlk.Header.Round, - Ops: mergeOps, - }) - if err != nil { - n.logger.Error("failed to merge", - "err", err, - ) - return - } - - signatures := []signature.Signature{} - for idx, receipt := range receipts { - var receiptBody storage.ReceiptBody - if err = receipt.Open(&receiptBody); err != nil { - n.logger.Error("failed to open receipt", - "receipt", receipt, - "err", err, - ) - return - } - - // Make sure that all merged roots from all storage nodes are the same. - ioRoot := receiptBody.Roots[0] - stateRoot := receiptBody.Roots[1] - if idx == 0 { - blk.Header.IORoot = ioRoot - blk.Header.StateRoot = stateRoot - } else if !blk.Header.IORoot.Equal(&ioRoot) || !blk.Header.StateRoot.Equal(&stateRoot) { - n.logger.Error("storage nodes returned different merge roots", - "first_io_root", blk.Header.IORoot, - "io_root", ioRoot, - "first_state_root", blk.Header.StateRoot, - "state_root", stateRoot, - ) - inconsistentMergeRootCount.With(n.getMetricLabels()).Inc() - return - } - - if err = blk.Header.VerifyStorageReceipt(&receiptBody); err != nil { - n.logger.Error("failed to validate receipt body", - "receipt body", receiptBody, - "err", err, - ) - return - } - signatures = append(signatures, receipt.Signature) - } - if err := epoch.VerifyCommitteeSignatures(scheduler.KindStorage, signatures); err != nil { - n.logger.Error("failed to validate receipt signer", - "err", err, - ) + mergeBody.Header = blk.Header + default: + // Multiple committees, we need to perform a storage merge operation. + n.logger.Error("merge from multiple committees not supported") return } - blk.Header.Messages = messages - blk.Header.StorageSignatures = signatures - doneCh <- &commitment.MergeBody{ - ExecutorCommits: commitments, - Header: blk.Header, - } + // Submit the merge result. + doneCh <- &mergeBody }() } diff --git a/go/worker/compute/merge/committee/state.go b/go/worker/compute/merge/committee/state.go index 4a452927b6f..4df3b8707eb 100644 --- a/go/worker/compute/merge/committee/state.go +++ b/go/worker/compute/merge/committee/state.go @@ -115,7 +115,7 @@ func (s StateWaitingForResults) String() string { // StateWaitingForEvent is the waiting for event state. type StateWaitingForEvent struct { - commitments []commitment.ExecutorCommitment + commitments []commitment.OpenExecutorCommitment results []*commitment.ComputeResultsHeader } diff --git a/go/worker/storage/committee/policy.go b/go/worker/storage/committee/policy.go index 2ef01d54c1c..10285d413db 100644 --- a/go/worker/storage/committee/policy.go +++ b/go/worker/storage/committee/policy.go @@ -22,10 +22,7 @@ var ( }, } mergeCommitteePolicy = &committee.AccessPolicy{ - Actions: []accessctl.Action{ - accessctl.Action(api.MethodMerge.FullName()), - accessctl.Action(api.MethodMergeBatch.FullName()), - }, + Actions: []accessctl.Action{}, } // NOTE: GetDiff/GetCheckpoint* need to be accessible to all storage nodes, // not just the ones in the current storage committee so that new nodes can @@ -44,8 +41,6 @@ var ( accessctl.Action(api.MethodGetCheckpointChunk.FullName()), accessctl.Action(api.MethodApply.FullName()), accessctl.Action(api.MethodApplyBatch.FullName()), - accessctl.Action(api.MethodMerge.FullName()), - accessctl.Action(api.MethodMergeBatch.FullName()), }, } ) diff --git a/go/worker/storage/service_external.go b/go/worker/storage/service_external.go index fbc19fb208c..43e5908afa1 100644 --- a/go/worker/storage/service_external.go +++ b/go/worker/storage/service_external.go @@ -122,52 +122,6 @@ func (s *storageService) ApplyBatch(ctx context.Context, request *api.ApplyBatch return s.storage.ApplyBatch(ctx, request) } -func (s *storageService) Merge(ctx context.Context, request *api.MergeRequest) ([]*api.Receipt, error) { - if err := s.ensureInitialized(ctx); err != nil { - return nil, err - } - if s.debugRejectUpdates { - return nil, errDebugRejectUpdates - } - - // Limit maximum number of roots to merge. - cfg, err := s.getConfig(ctx, request.Namespace) - if err != nil { - return nil, err - } - if uint64(len(request.Others)) > cfg.MaxMergeRoots { - return nil, api.ErrLimitReached - } - - return s.storage.Merge(ctx, request) -} - -func (s *storageService) MergeBatch(ctx context.Context, request *api.MergeBatchRequest) ([]*api.Receipt, error) { - if err := s.ensureInitialized(ctx); err != nil { - return nil, err - } - if s.debugRejectUpdates { - return nil, errDebugRejectUpdates - } - - // Limit maximum number of operations in a batch. - cfg, err := s.getConfig(ctx, request.Namespace) - if err != nil { - return nil, err - } - if uint64(len(request.Ops)) > cfg.MaxMergeOps { - return nil, api.ErrLimitReached - } - // Limit maximum number of roots to merge. - for _, op := range request.Ops { - if uint64(len(op.Others)) > cfg.MaxMergeRoots { - return nil, api.ErrLimitReached - } - } - - return s.storage.MergeBatch(ctx, request) -} - func (s *storageService) GetDiff(ctx context.Context, request *api.GetDiffRequest) (api.WriteLogIterator, error) { if err := s.ensureInitialized(ctx); err != nil { return nil, err