go/roothash: Add support for failure indicating commitments
ptrus committed Oct 9, 2020
1 parent 849b4a9 commit 7aa195b
Showing 5 changed files with 214 additions and 59 deletions.
2 changes: 1 addition & 1 deletion go/common/version/version.go
@@ -82,7 +82,7 @@ var (
// checked in Oasis Core.
// It is converted to TendermintAppVersion whose compatibility is checked
// via Tendermint's version checks.
ConsensusProtocol = Version{Major: 1, Minor: 0, Patch: 0}
ConsensusProtocol = Version{Major: 2, Minor: 0, Patch: 0}

// TendermintAppVersion is Tendermint ABCI application's version computed by
// masking non-major consensus protocol version segments to 0 to be
3 changes: 3 additions & 0 deletions go/roothash/api/commitment/commitment.go
@@ -13,6 +13,9 @@ type OpenCommitment interface {
// type.
MostlyEqual(OpenCommitment) bool

// IsIndicatingFailure returns true if this commitment indicates a failure.
IsIndicatingFailure() bool

// ToVote returns a hash that represents a vote for this commitment as
// per discrepancy resolution criteria.
ToVote() hash.Hash
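The interface change above is what lets pool code treat failure indication generically. A minimal caller sketch, assuming the oasis-core module path; the helper tallyFailures is hypothetical and not part of this commit:

package example

import "github.com/oasisprotocol/oasis-core/go/roothash/api/commitment"

// tallyFailures counts commitments that indicate a failure. It relies only on
// the OpenCommitment interface, so it works for any concrete commitment type.
func tallyFailures(commits []commitment.OpenCommitment) int {
	var failures int
	for _, c := range commits {
		if c.IsIndicatingFailure() {
			failures++
		}
	}
	return failures
}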
28 changes: 25 additions & 3 deletions go/roothash/api/commitment/executor.go
@@ -54,15 +54,32 @@ func (h *ComputeResultsHeader) EncodedHash() hash.Hash {
return hash.NewFrom(h)
}

// ExecutorCommitmentFailure is the executor commitment failure reason.
type ExecutorCommitmentFailure uint8

const (
// FailureNone indicates that no failure has occurred.
FailureNone ExecutorCommitmentFailure = 0
// FailureUnknown indicates a generic failure.
FailureUnknown ExecutorCommitmentFailure = 1
// FailureStorageUnavailable indicates that batch processing failed due to
// storage being unavailable.
FailureStorageUnavailable ExecutorCommitmentFailure = 2
)

// ComputeBody holds the data signed in a compute worker commitment.
type ComputeBody struct {
Header ComputeResultsHeader `json:"header"`
StorageSignatures []signature.Signature `json:"storage_signatures"`
RakSig signature.RawSignature `json:"rak_sig"`
Header ComputeResultsHeader `json:"header"`
Failure ExecutorCommitmentFailure `json:"failure,omitempty"`

TxnSchedSig signature.Signature `json:"txn_sched_sig"`
InputRoot hash.Hash `json:"input_root"`
InputStorageSigs []signature.Signature `json:"input_storage_sigs"`

// Optional fields (may be absent for failure indication).

StorageSignatures []signature.Signature `json:"storage_signatures,omitempty"`
RakSig signature.RawSignature `json:"rak_sig,omitempty"`
}

// VerifyTxnSchedSignature rebuilds the batch dispatch message from the data
@@ -172,6 +189,11 @@ func (c OpenExecutorCommitment) MostlyEqual(other OpenCommitment) bool {
return h.Equal(&otherHash)
}

// IsIndicatingFailure returns true if this commitment indicates a failure.
func (c OpenExecutorCommitment) IsIndicatingFailure() bool {
return c.Body.Failure != FailureNone
}

// ToVote returns a hash that represents a vote for this commitment as
// per discrepancy resolution criteria.
func (c OpenExecutorCommitment) ToVote() hash.Hash {
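With these executor.go changes, a worker that cannot finish a batch can still submit a signed commitment carrying only the results header, the failure reason, and the transaction scheduler signature. A sketch of building such a commitment follows; the helper name and its parameters are illustrative, and the import paths and SignExecutorCommitment's return type are assumptions based on how the tests below use it:

package example

import (
	"github.com/oasisprotocol/oasis-core/go/common/crypto/signature"
	"github.com/oasisprotocol/oasis-core/go/roothash/api/commitment"
)

// buildFailureCommitment sketches how an executor worker could report that it
// failed to process a batch because storage was unavailable.
func buildFailureCommitment(
	signer signature.Signer,
	header commitment.ComputeResultsHeader,
	txnSchedSig signature.Signature,
) (*commitment.ExecutorCommitment, error) {
	body := commitment.ComputeBody{
		Header:      header,
		Failure:     commitment.FailureStorageUnavailable,
		TxnSchedSig: txnSchedSig,
		// StorageSignatures and RakSig are left empty on purpose: the pool
		// rejects failure-indicating commitments that include either of them.
	}
	return commitment.SignExecutorCommitment(signer, &body)
}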
142 changes: 88 additions & 54 deletions go/roothash/api/commitment/pool.go
@@ -226,35 +226,6 @@ func (p *Pool) addOpenExecutorCommitment(
return ErrNotBasedOnCorrectBlock
}

// Verify RAK-attestation.
if p.Runtime.TEEHardware != node.TEEHardwareInvalid {
n, err := nl.Node(ctx, id)
if err != nil {
// This should never happen as nodes cannot disappear mid-epoch.
logger.Warn("unable to fetch node descriptor to verify RAK-attestation",
"err", err,
"node_id", id,
)
return ErrNotInCommittee
}

rt := n.GetRuntime(p.Runtime.ID)
if rt == nil {
// We currently prevent this case throughout the rest of the system.
// Still, it's prudent to check.
logger.Warn("committee member not registered with this runtime",
"runtime_id", p.Runtime.ID,
"node_id", id,
)
return ErrNotInCommittee
}

rak := rt.Capabilities.TEE.RAK
if !rak.Verify(ComputeResultsHeaderSignatureContext, cbor.Marshal(header), body.RakSig[:]) {
return ErrRakSigInvalid
}
}

if err := sv.VerifyTxnSchedulerSignature(body.TxnSchedSig, blk.Header.Round); err != nil {
logger.Debug("executor commitment has bad transaction scheduler signer",
"node_id", id,
@@ -267,30 +238,80 @@ func (p *Pool) addOpenExecutorCommitment(
return ErrTxnSchedSigInvalid
}

// Check if the header refers to merkle roots in storage.
if uint64(len(body.StorageSignatures)) < p.Runtime.Storage.MinWriteReplication {
logger.Debug("executor commitment doesn't have enough storage receipts",
"node_id", id,
"min_write_replication", p.Runtime.Storage.MinWriteReplication,
"num_receipts", len(body.StorageSignatures),
)
return ErrBadStorageReceipts
}
if err := sv.VerifyCommitteeSignatures(scheduler.KindStorage, body.StorageSignatures); err != nil {
logger.Debug("executor commitment has bad storage receipt signers",
"node_id", id,
"err", err,
)
return err
}
if err := body.VerifyStorageReceiptSignatures(blk.Header.Namespace); err != nil {
logger.Debug("executor commitment has bad storage receipt signatures",
"node_id", id,
"err", err,
)
return p2pError.Permanent(err)
}
switch body.Failure {
case FailureNone:
// Verify RAK-attestation.
if p.Runtime.TEEHardware != node.TEEHardwareInvalid {
n, err := nl.Node(ctx, id)
if err != nil {
// This should never happen as nodes cannot disappear mid-epoch.
logger.Warn("unable to fetch node descriptor to verify RAK-attestation",
"err", err,
"node_id", id,
)
return ErrNotInCommittee
}

rt := n.GetRuntime(p.Runtime.ID)
if rt == nil {
// We currently prevent this case throughout the rest of the system.
// Still, it's prudent to check.
logger.Warn("committee member not registered with this runtime",
"runtime_id", p.Runtime.ID,
"node_id", id,
)
return ErrNotInCommittee
}

rak := rt.Capabilities.TEE.RAK
if !rak.Verify(ComputeResultsHeaderSignatureContext, cbor.Marshal(header), body.RakSig[:]) {
return ErrRakSigInvalid
}
}

// Check if the header refers to merkle roots in storage.
if uint64(len(body.StorageSignatures)) < p.Runtime.Storage.MinWriteReplication {
logger.Debug("executor commitment doesn't have enough storage receipts",
"node_id", id,
"min_write_replication", p.Runtime.Storage.MinWriteReplication,
"num_receipts", len(body.StorageSignatures),
)
return ErrBadStorageReceipts
}
if err := sv.VerifyCommitteeSignatures(scheduler.KindStorage, body.StorageSignatures); err != nil {
logger.Debug("executor commitment has bad storage receipt signers",
"node_id", id,
"err", err,
)
return err
}
if err := body.VerifyStorageReceiptSignatures(blk.Header.Namespace); err != nil {
logger.Debug("executor commitment has bad storage receipt signatures",
"node_id", id,
"err", err,
)
return p2pError.Permanent(err)
}
default:
// In case of failure indicating commitment make sure storage signatures are empty.
if len(body.StorageSignatures) > 0 {
logger.Debug("failure indicating commitment includes storage receipt signatures",
"node_id", id,
"num_receipts", len(body.StorageSignatures),
)
return ErrBadStorageReceipts
}

// In case of failure indicating commitment make sure RAK signature is empty.
if !body.RakSig.Equal(signature.RawSignature{}) {
logger.Debug("failure indicating commitment includes RAK signature",
"node_id", id,
"rak_sig", body.RakSig.String(),
)
return ErrRakSigInvalid
}

}
if p.ExecuteCommitments == nil {
p.ExecuteCommitments = make(map[signature.PublicKey]OpenExecutorCommitment)
}
@@ -434,6 +455,10 @@ func (p *Pool) DetectDiscrepancy() (OpenCommitment, error) {
if !commit.MostlyEqual(c) {
discrepancyDetected = true
}

if c.IsIndicatingFailure() {
discrepancyDetected = true
}
}

if commit == nil || discrepancyDetected {
@@ -455,11 +480,12 @@ func (p *Pool) ResolveDiscrepancy() (OpenCommitment, error) {

type voteEnt struct {
commit OpenCommitment
tally int
tally uint64
}

votes := make(map[hash.Hash]*voteEnt)
var backupNodes int
var failuresTally uint64
var backupNodes uint64
for _, n := range p.Committee.Members {
if n.Role != scheduler.RoleBackupWorker {
continue
Expand All @@ -471,6 +497,11 @@ func (p *Pool) ResolveDiscrepancy() (OpenCommitment, error) {
continue
}

if c.IsIndicatingFailure() {
failuresTally++
continue
}

k := c.ToVote()
if ent, ok := votes[k]; !ok {
votes[k] = &voteEnt{
@@ -483,6 +514,9 @@ func (p *Pool) ResolveDiscrepancy() (OpenCommitment, error) {
}

minVotes := (backupNodes / 2) + 1
if failuresTally >= minVotes {
return nil, ErrInsufficientVotes // TODO: new error?
}
for _, ent := range votes {
if ent.tally >= minVotes {
return ent.commit, nil
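The resolution change boils down to a simple rule: failure-indicating commitments from backup workers go into a separate tally instead of any vote bucket, and if that tally alone reaches the majority threshold the discrepancy cannot be resolved. A toy model of the rule, not the actual Pool code (the function and its inputs are illustrative):

package example

// resolveVotes models the tallying in Pool.ResolveDiscrepancy after this
// change: a vote bucket must reach a strict majority of backup workers, and a
// majority of failure indications blocks resolution outright.
func resolveVotes(voteTallies map[string]uint64, failuresTally, backupNodes uint64) (winner string, ok bool) {
	minVotes := (backupNodes / 2) + 1
	if failuresTally >= minVotes {
		// Mirrors the early ErrInsufficientVotes return in the diff above.
		return "", false
	}
	for vote, tally := range voteTallies {
		if tally >= minVotes {
			return vote, true
		}
	}
	return "", false
}

With three backup workers, minVotes is 2, so two failure indications are enough to block resolution even if a third backup worker commits a valid result.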
98 changes: 97 additions & 1 deletion go/roothash/api/commitment/pool_test.go
@@ -176,6 +176,7 @@ func TestPoolSingleCommitment(t *testing.T) {
{"BlockBadRound", func(b *ComputeBody) { b.Header.Round-- }, ErrNotBasedOnCorrectBlock},
{"BlockBadPreviousHash", func(b *ComputeBody) { b.Header.PreviousHash.FromBytes([]byte("invalid")) }, ErrNotBasedOnCorrectBlock},
{"StorageSigs1", func(b *ComputeBody) { b.StorageSignatures = nil }, ErrBadStorageReceipts},
{"FailureIndicatingWithStorageSigs", func(b *ComputeBody) { b.Failure = FailureStorageUnavailable }, ErrBadStorageReceipts},
} {
_, _, invalidBody := generateComputeBody(t)
invalidBody.StorageSignatures = append([]signature.Signature{}, body.StorageSignatures...)
@@ -760,20 +761,115 @@ func TestExecutorTimeoutRequest(t *testing.T) {
}
}

// Timeout after commitment.
// Generate a commitment.
childBlk, _, body := generateComputeBody(t)
commit1, err := SignExecutorCommitment(sk1, &body)
require.NoError(err, "SignExecutorCommitment")
// Adding commitment 1 should succeed.
err = pool.AddExecutorCommitment(ctx, childBlk, nopSV, nl, commit1)
require.NoError(err, "AddExecutorCommitment")

// Timeout after commitment should fail.
err = pool.CheckProposerTimeout(ctx, childBlk, nopSV, nl, sk2.Public(), 0)
require.Error(err, "CheckProposerTimeout commitment exists")
require.Equal(ErrAlreadyCommitted, err, "CheckProposerTimeout commitment exists")
})
}

func TestPoolFailureIndicatingCommitment(t *testing.T) {
rt, sks, committee, nl := generateMockCommittee(t)
sk1 := sks[0]
sk2 := sks[1]
sk3 := sks[2]

genesisTestHelpers.SetTestChainContext()

// Create a pool.
pool := Pool{
Runtime: rt,
Committee: committee,
}

// Generate a compute body.
childBlk, _, body := generateComputeBody(t)

// Generate a valid commitment.
commit1, err := SignExecutorCommitment(sk1, &body)
require.NoError(t, err, "SignExecutorCommitment")

failedBody := ComputeBody{
Header: ComputeResultsHeader{
Round: body.Header.Round,
PreviousHash: body.Header.PreviousHash,
IORoot: body.Header.IORoot,
StateRoot: body.Header.StateRoot,
},
Failure: FailureStorageUnavailable,
}
failedBody.TxnSchedSig = generateTxnSchedulerSignature(t, childBlk, &failedBody)
commit2, err := SignExecutorCommitment(sk2, &failedBody)
require.NoError(t, err, "SignExecutorCommitment")

commit3, err := SignExecutorCommitment(sk3, &body)
require.NoError(t, err, "SignExecutorCommitment")

// There should not be enough executor commitments.
err = pool.CheckEnoughCommitments(false)
require.Error(t, err, "CheckEnoughCommitments")
require.Equal(t, ErrStillWaiting, err, "CheckEnoughCommitments")
err = pool.CheckEnoughCommitments(true)
require.Error(t, err, "CheckEnoughCommitments")
require.Equal(t, ErrStillWaiting, err, "CheckEnoughCommitments")

// Adding a commitment should succeed.
err = pool.AddExecutorCommitment(context.Background(), childBlk, nopSV, nl, commit1)
require.NoError(t, err, "AddExecutorCommitment")

// Adding a commitment twice for the same node should fail.
err = pool.AddExecutorCommitment(context.Background(), childBlk, nopSV, nl, commit1)
require.Error(t, err, "AddExecutorCommitment(context.Background(), duplicate)")

// There should not be enough executor commitments.
err = pool.CheckEnoughCommitments(false)
require.Error(t, err, "CheckEnoughCommitments")

// Adding a failure indicating commitment.
err = pool.AddExecutorCommitment(context.Background(), childBlk, nopSV, nl, commit2)
require.NoError(t, err, "AddExecutorCommitment")

// There should be enough commitments.
err = pool.CheckEnoughCommitments(false)
require.NoError(t, err, "CheckEnoughCommitments")

// There should be a discrepancy.
_, err = pool.DetectDiscrepancy()
require.Error(t, err, "DetectDiscrepancy")
require.Equal(t, ErrDiscrepancyDetected, err)
require.Equal(t, true, pool.Discrepancy)

// There should not be enough executor commitments from backup workers.
err = pool.CheckEnoughCommitments(false)
require.Error(t, err, "CheckEnoughCommitments")
require.Equal(t, ErrStillWaiting, err, "CheckEnoughCommitments")

// Resolve discrepancy with commit from backup worker.
err = pool.AddExecutorCommitment(context.Background(), childBlk, nopSV, nl, commit3)
require.NoError(t, err, "AddExecutorCommitment")

// There should be enough executor commitments from backup workers.
err = pool.CheckEnoughCommitments(false)
require.NoError(t, err, "CheckEnoughCommitments")

// Discrepancy resolution should succeed.
dc, err := pool.ResolveDiscrepancy()
require.NoError(t, err, "ResolveDiscrepancy")
require.Equal(t, true, pool.Discrepancy)
header := dc.ToDDResult().(ComputeResultsHeader)
require.EqualValues(t, &body.Header, &header, "DR should return the same header")

// TODO: test failure indicating commitments in discrepancy resolution.
}

func generateMockCommittee(t *testing.T) (
rt *registry.Runtime,
sks []signature.Signer,
