Skip to content

Commit

Permalink
go/worker/compute/merge: Drop support for multiple committees
Browse files Browse the repository at this point in the history
Since there is currently no transaction scheduler implementation which would
support multiple committees, there is no sense in the merge node to try to
support such cases as it could be a source of bugs. Additionally it results
in extra round trips to storage nodes due to the Merge operation which in
case of a single committee does not do anything.

The merge node is also the only client for the Merge* storage operations, so
they can just be removed in order to reduce the exposed API surface.
  • Loading branch information
kostko committed Aug 7, 2020
1 parent 3a8f9a9 commit c50b525
Show file tree
Hide file tree
Showing 23 changed files with 48 additions and 621 deletions.
2 changes: 1 addition & 1 deletion go/genesis/genesis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func TestGenesisSanityCheck(t *testing.T) {
MinWriteReplication: 1,
MaxApplyWriteLogEntries: 100_000,
MaxApplyOps: 2,
MaxMergeRoots: 8,
MaxMergeRoots: 1,
MaxMergeOps: 2,
},
AdmissionPolicy: registry.RuntimeAdmissionPolicy{
Expand Down
2 changes: 1 addition & 1 deletion go/oasis-net-runner/fixtures/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func newDefaultFixture() (*oasis.NetworkFixture, error) {
MinWriteReplication: 1,
MaxApplyWriteLogEntries: 100_000,
MaxApplyOps: 2,
MaxMergeRoots: 8,
MaxMergeRoots: 1,
MaxMergeOps: 2,
},
AdmissionPolicy: registry.RuntimeAdmissionPolicy{
Expand Down
39 changes: 7 additions & 32 deletions go/oasis-node/cmd/debug/byzantine/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,18 @@ import (

"github.com/oasisprotocol/oasis-core/go/common"
"github.com/oasisprotocol/oasis-core/go/common/crypto/hash"
"github.com/oasisprotocol/oasis-core/go/common/crypto/signature"
"github.com/oasisprotocol/oasis-core/go/common/identity"
consensus "github.com/oasisprotocol/oasis-core/go/consensus/api"
"github.com/oasisprotocol/oasis-core/go/roothash/api/block"
"github.com/oasisprotocol/oasis-core/go/roothash/api/commitment"
storage "github.com/oasisprotocol/oasis-core/go/storage/api"
)

type mergeBatchContext struct {
currentBlock *block.Block
commitments []*commitment.OpenExecutorCommitment

storageReceipts []*storage.Receipt
newBlock *block.Block
commit *commitment.MergeCommitment
newBlock *block.Block
commit *commitment.MergeCommitment
}

func newMergeBatchContext() *mergeBatchContext {
Expand Down Expand Up @@ -86,36 +83,14 @@ func (mbc *mergeBatchContext) process(ctx context.Context, hnss []*honestNodeSto
}
}

var emptyRoot hash.Hash
emptyRoot.Empty()

var err error
mbc.storageReceipts, err = storageBroadcastMergeBatch(ctx, hnss, mbc.currentBlock.Header.Namespace, mbc.currentBlock.Header.Round, []storage.MergeOp{
{
Base: emptyRoot,
Others: ioRoots,
},
{
Base: mbc.currentBlock.Header.StateRoot,
Others: stateRoots,
},
})
if err != nil {
return fmt.Errorf("storage broadcast merge batch: %w", err)
}

var firstReceiptBody storage.ReceiptBody
if err := mbc.storageReceipts[0].Open(&firstReceiptBody); err != nil {
return fmt.Errorf("storage receipt Open: %w", err)
}
var signatures []signature.Signature
for _, receipt := range mbc.storageReceipts {
signatures = append(signatures, receipt.Signature)
if len(collectedCommittees) != 1 {
return fmt.Errorf("multiple committees not supported: %d", len(collectedCommittees))
}
signatures := mbc.commitments[0].Body.StorageSignatures

mbc.newBlock = block.NewEmptyBlock(mbc.currentBlock, 0, block.Normal)
mbc.newBlock.Header.IORoot = firstReceiptBody.Roots[0]
mbc.newBlock.Header.StateRoot = firstReceiptBody.Roots[1]
mbc.newBlock.Header.IORoot = ioRoots[0]
mbc.newBlock.Header.StateRoot = stateRoots[0]
mbc.newBlock.Header.Messages = messages
mbc.newBlock.Header.StorageSignatures = signatures

Expand Down
28 changes: 0 additions & 28 deletions go/oasis-node/cmd/debug/byzantine/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,6 @@ func (hns *honestNodeStorage) ApplyBatch(ctx context.Context, request *storage.A
return hns.client.ApplyBatch(ctx, request)
}

func (hns *honestNodeStorage) Merge(ctx context.Context, request *storage.MergeRequest) ([]*storage.Receipt, error) {
return hns.client.Merge(ctx, request)
}

func (hns *honestNodeStorage) MergeBatch(ctx context.Context, request *storage.MergeBatchRequest) ([]*storage.Receipt, error) {
return hns.client.MergeBatch(ctx, request)
}

func (hns *honestNodeStorage) GetDiff(ctx context.Context, request *storage.GetDiffRequest) (storage.WriteLogIterator, error) {
return hns.client.GetDiff(ctx, request)
}
Expand Down Expand Up @@ -176,23 +168,3 @@ func storageBroadcastApplyBatch(

return receipts, nil
}

func storageBroadcastMergeBatch(
ctx context.Context,
hnss []*honestNodeStorage,
ns common.Namespace,
round uint64,
ops []storage.MergeOp,
) ([]*storage.Receipt, error) {
var receipts []*storage.Receipt
for _, hns := range hnss {
r, err := hns.MergeBatch(ctx, &storage.MergeBatchRequest{Namespace: ns, Round: round, Ops: ops})
if err != nil {
return receipts, fmt.Errorf("honest node storage MergeBatch %s: %w", hns.nodeID, err)
}

receipts = append(receipts, r...)
}

return receipts, nil
}
2 changes: 1 addition & 1 deletion go/oasis-node/cmd/debug/txsource/workload/registration.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func getRuntime(entityID signature.PublicKey, id common.Namespace) *registry.Run
MinWriteReplication: 1,
MaxApplyWriteLogEntries: 100_000,
MaxApplyOps: 2,
MaxMergeRoots: 8,
MaxMergeRoots: 1,
MaxMergeOps: 2,
},
AdmissionPolicy: registry.RuntimeAdmissionPolicy{
Expand Down
2 changes: 1 addition & 1 deletion go/oasis-node/cmd/registry/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ func init() {
runtimeFlags.Uint64(CfgStorageMinWriteReplication, 1, "Minimum required storage write replication")
runtimeFlags.Uint64(CfgStorageMaxApplyWriteLogEntries, 100_000, "Maximum number of write log entries")
runtimeFlags.Uint64(CfgStorageMaxApplyOps, 2, "Maximum number of apply operations in a batch")
runtimeFlags.Uint64(CfgStorageMaxMergeRoots, 8, "Maximum number of merge roots")
runtimeFlags.Uint64(CfgStorageMaxMergeRoots, 1, "Maximum number of merge roots")
runtimeFlags.Uint64(CfgStorageMaxMergeOps, 2, "Maximum number of merge operations in a batch")
runtimeFlags.Uint64(CfgStorageCheckpointInterval, 0, "Storage checkpoint interval (in rounds)")
runtimeFlags.Uint64(CfgStorageCheckpointNumKept, 0, "Number of storage checkpoints to keep")
Expand Down
2 changes: 1 addition & 1 deletion go/oasis-node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ var (
MinWriteReplication: 1,
MaxApplyWriteLogEntries: 100_000,
MaxApplyOps: 2,
MaxMergeRoots: 8,
MaxMergeRoots: 1,
MaxMergeOps: 2,
},
AdmissionPolicy: registry.RuntimeAdmissionPolicy{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (sc *multipleRuntimesImpl) Fixture() (*oasis.NetworkFixture, error) {
MinWriteReplication: 1,
MaxApplyWriteLogEntries: 100_000,
MaxApplyOps: 2,
MaxMergeRoots: 8,
MaxMergeRoots: 1,
MaxMergeOps: 2,
},
AdmissionPolicy: registry.RuntimeAdmissionPolicy{
Expand Down
2 changes: 1 addition & 1 deletion go/oasis-test-runner/scenario/e2e/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func (sc *runtimeImpl) Fixture() (*oasis.NetworkFixture, error) {
MinWriteReplication: 2,
MaxApplyWriteLogEntries: 100_000,
MaxApplyOps: 2,
MaxMergeRoots: 8,
MaxMergeRoots: 1,
MaxMergeOps: 2,
},
AdmissionPolicy: registry.RuntimeAdmissionPolicy{
Expand Down
2 changes: 1 addition & 1 deletion go/registry/tests/tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -1629,7 +1629,7 @@ func NewTestRuntime(seed []byte, ent *TestEntity, isKeyManager bool) (*TestRunti
MinWriteReplication: 3,
MaxApplyWriteLogEntries: 100_000,
MaxApplyOps: 2,
MaxMergeRoots: 8,
MaxMergeRoots: 1,
MaxMergeOps: 2,
},
AdmissionPolicy: api.RuntimeAdmissionPolicy{
Expand Down
4 changes: 2 additions & 2 deletions go/roothash/api/commitment/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -764,10 +764,10 @@ func (m *MultiPool) CheckEnoughCommitments() error {
}

// GetExecutorCommitments returns a list of executor commitments in the pool.
func (m *MultiPool) GetExecutorCommitments() (result []ExecutorCommitment) {
func (m *MultiPool) GetOpenExecutorCommitments() (result []OpenExecutorCommitment) {
for _, p := range m.Committees {
for _, c := range p.ExecuteCommitments {
result = append(result, c.ExecutorCommitment)
result = append(result, c)
}
}
return
Expand Down
16 changes: 0 additions & 16 deletions go/runtime/registry/storage_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,22 +59,6 @@ func (sr *storageRouter) ApplyBatch(ctx context.Context, request *api.ApplyBatch
return rt.Storage().ApplyBatch(ctx, request)
}

func (sr *storageRouter) Merge(ctx context.Context, request *api.MergeRequest) ([]*api.Receipt, error) {
rt, err := sr.getRuntime(request.Namespace)
if err != nil {
return nil, err
}
return rt.Storage().Merge(ctx, request)
}

func (sr *storageRouter) MergeBatch(ctx context.Context, request *api.MergeBatchRequest) ([]*api.Receipt, error) {
rt, err := sr.getRuntime(request.Namespace)
if err != nil {
return nil, err
}
return rt.Storage().MergeBatch(ctx, request)
}

func (sr *storageRouter) GetDiff(ctx context.Context, request *api.GetDiffRequest) (api.WriteLogIterator, error) {
rt, err := sr.getRuntime(request.StartRoot.Namespace)
if err != nil {
Expand Down
43 changes: 1 addition & 42 deletions go/storage/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,8 @@ var (
// ErrUnsupported is the error returned when the called method is not
// supported by the given backend.
ErrUnsupported = errors.New(ModuleName, 4, "storage: method not supported by backend")
// ErrNoMergeRoots is the error returned when no other roots are passed
// to the Merge operation.
ErrNoMergeRoots = errors.New(ModuleName, 5, "storage: no roots to merge")
// ErrLimitReached means that a configured limit has been reached.
ErrLimitReached = errors.New(ModuleName, 6, "storage: limit reached")
ErrLimitReached = errors.New(ModuleName, 5, "storage: limit reached")

// The following errors are reimports from NodeDB.

Expand Down Expand Up @@ -240,14 +237,6 @@ type ApplyOp struct {
WriteLog WriteLog `json:"writelog"`
}

// MergeOps is a merge operation within a batch of merge operations.
type MergeOp struct {
// Base is the base root for the merge.
Base hash.Hash `json:"base"`
// Others is a list of roots derived from base that should be merged.
Others []hash.Hash `json:"others"`
}

// ApplyRequest is an Apply request.
type ApplyRequest struct {
Namespace common.Namespace `json:"namespace"`
Expand All @@ -265,21 +254,6 @@ type ApplyBatchRequest struct {
Ops []ApplyOp `json:"ops"`
}

// MergeRequest is a Merge request.
type MergeRequest struct {
Namespace common.Namespace `json:"namespace"`
Round uint64 `json:"round"`
Base hash.Hash `json:"base"`
Others []hash.Hash `json:"others"`
}

// MergeBatchRequest is a MergeBatch request.
type MergeBatchRequest struct {
Namespace common.Namespace `json:"namespace"`
Round uint64 `json:"round"`
Ops []MergeOp `json:"ops"`
}

// SyncOptions are the sync options.
type SyncOptions struct {
OffsetKey []byte `json:"offset_key"`
Expand Down Expand Up @@ -317,21 +291,6 @@ type Backend interface {
// See Apply for more details.
ApplyBatch(ctx context.Context, request *ApplyBatchRequest) ([]*Receipt, error)

// TODO: Add proof.
// Merge performs a 3-way merge operation between the specified
// roots and returns a receipt for the merged root.
//
// Round is the round of the base root while all other roots are
// expected to be in the next round.
Merge(ctx context.Context, request *MergeRequest) ([]*Receipt, error)

// TODO: Add proof.
// MergeBatch performs multiple sets of merge operations and returns
// a single receipt covering all merged roots.
//
// See Merge for more details.
MergeBatch(ctx context.Context, request *MergeBatchRequest) ([]*Receipt, error)

// GetDiff returns an iterator of write log entries that must be applied
// to get from the first given root to the second one.
GetDiff(ctx context.Context, request *GetDiffRequest) (WriteLogIterator, error)
Expand Down
Loading

0 comments on commit c50b525

Please sign in to comment.