Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add GetEvents to backends #2841

Merged
merged 9 commits into from
Apr 15, 2020
Merged
6 changes: 6 additions & 0 deletions .changelog/2778.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Add `GetEvents` to backends

The new `GetEvents` call returns all events at a specific height,
without having to watch for them using the `Watch*` methods.
It is currently implemented for the registry, roothash, and staking
backends.
10 changes: 5 additions & 5 deletions go/consensus/tendermint/apps/staking/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,22 @@ var (
QueryApp = api.QueryForApp(AppName)

// KeyTakeEscrow is an ABCI event attribute key for TakeEscrow calls
// (value is an app.TakeEscrowEvent).
// (value is an api.TakeEscrowEvent).
KeyTakeEscrow = stakingState.KeyTakeEscrow

// KeyReclaimEscrow is an ABCI event attribute key for ReclaimEscrow
// calls (value is an app.ReclaimEscrowEvent).
// calls (value is an api.ReclaimEscrowEvent).
KeyReclaimEscrow = []byte("reclaim_escrow")

// KeyTransfer is an ABCI event attribute key for Transfers (value is
// an app.TransferEvent).
// an api.TransferEvent).
KeyTransfer = stakingState.KeyTransfer

// KeyBurn is an ABCI event attribute key for Burn calls (value is
// an app.BurnEvent).
// an api.BurnEvent).
KeyBurn = []byte("burn")

// KeyAddEscrow is an ABCI event attribute key for AddEscrow calls
// (value is an app.EscrowEvent).
// (value is an api.AddEscrowEvent).
KeyAddEscrow = []byte("add_escrow")
)
159 changes: 128 additions & 31 deletions go/consensus/tendermint/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ package registry
import (
"bytes"
"context"
"fmt"

"github.com/eapache/channels"
"github.com/pkg/errors"
abcitypes "github.com/tendermint/tendermint/abci/types"
tmrpctypes "github.com/tendermint/tendermint/rpc/core/types"
tmtypes "github.com/tendermint/tendermint/types"

"github.com/oasislabs/oasis-core/go/common/cbor"
"github.com/oasislabs/oasis-core/go/common/crypto/signature"
"github.com/oasislabs/oasis-core/go/common/entity"
"github.com/oasislabs/oasis-core/go/common/logging"
"github.com/oasislabs/oasis-core/go/common/node"
Expand Down Expand Up @@ -150,6 +152,26 @@ func (tb *tendermintBackend) StateToGenesis(ctx context.Context, height int64) (
return q.Genesis(ctx)
}

func (tb *tendermintBackend) GetEvents(ctx context.Context, height int64) ([]api.Event, error) {
// Get block results at given height.
var results *tmrpctypes.ResultBlockResults
results, err := tb.service.GetBlockResults(height)
if err != nil {
tb.logger.Error("failed to get tendermint block results",
"err", err,
"height", height,
)
return nil, err
}

// Decode events from block results.
tmEvents := append(results.BeginBlockEvents, results.EndBlockEvents...)
for _, txResults := range results.TxsResults {
tmEvents = append(tmEvents, txResults.Events...)
}
return tb.onABCIEvents(ctx, tmEvents, height, false)
}

func (tb *tendermintBackend) worker(ctx context.Context) {
// Subscribe to transactions which modify state.
sub, err := tb.service.Subscribe("registry-worker", app.QueryApp)
Expand Down Expand Up @@ -189,72 +211,123 @@ func (tb *tendermintBackend) onEventDataNewBlock(ctx context.Context, ev tmtypes
events := append([]abcitypes.Event{}, ev.ResultBeginBlock.GetEvents()...)
events = append(events, ev.ResultEndBlock.GetEvents()...)

tb.onABCIEvents(ctx, events, ev.Block.Header.Height)
_, _ = tb.onABCIEvents(ctx, events, ev.Block.Header.Height, true)
}

func (tb *tendermintBackend) onEventDataTx(ctx context.Context, tx tmtypes.EventDataTx) {
tb.onABCIEvents(ctx, tx.Result.Events, tx.Height)
_, _ = tb.onABCIEvents(ctx, tx.Result.Events, tx.Height, true)
}

func (tb *tendermintBackend) onABCIEvents(ctx context.Context, events []abcitypes.Event, height int64) {
for _, tmEv := range events {
func (tb *tendermintBackend) onABCIEvents(ctx context.Context, tmEvents []abcitypes.Event, height int64, doBroadcast bool) ([]api.Event, error) { // nolint: gocyclo
var events []api.Event
for _, tmEv := range tmEvents {
// Ignore events that don't relate to the registry app.
if tmEv.GetType() != app.EventType {
continue
}

for _, pair := range tmEv.GetAttributes() {
if bytes.Equal(pair.GetKey(), app.KeyNodesExpired) {
key := pair.GetKey()
val := pair.GetValue()
if bytes.Equal(key, app.KeyNodesExpired) {
// Nodes expired event.
var nodes []*node.Node
if err := cbor.Unmarshal(pair.GetValue(), &nodes); err != nil {
if err := cbor.Unmarshal(val, &nodes); err != nil {
tb.logger.Error("worker: failed to get nodes from tag",
"err", err,
)
if doBroadcast {
continue
} else {
return nil, fmt.Errorf("registry: corrupt NodesExpired event: %w", err)
}
}

// Generate node deregistration events.
for _, node := range nodes {
tb.nodeNotifier.Broadcast(&api.NodeEvent{
ne := &api.NodeEvent{
Node: node,
IsRegistration: false,
})
}

if doBroadcast {
tb.nodeNotifier.Broadcast(ne)
} else {
events = append(events, api.Event{NodeEvent: ne})
}
}
} else if bytes.Equal(pair.GetKey(), app.KeyRuntimeRegistered) {
} else if bytes.Equal(key, app.KeyRuntimeRegistered) {
// Runtime registered event.
var rt api.Runtime
if err := cbor.Unmarshal(pair.GetValue(), &rt); err != nil {
if err := cbor.Unmarshal(val, &rt); err != nil {
tb.logger.Error("worker: failed to get runtime from tag",
"err", err,
)
continue
if doBroadcast {
continue
} else {
return nil, fmt.Errorf("registry: corrupt RuntimeRegistered event: %w", err)
}
}

tb.runtimeNotifier.Broadcast(&rt)
} else if bytes.Equal(pair.GetKey(), app.KeyEntityRegistered) {
if doBroadcast {
tb.runtimeNotifier.Broadcast(&rt)
} else {
evt := api.Event{
RuntimeEvent: &api.RuntimeEvent{Runtime: &rt},
}
events = append(events, evt)
}
} else if bytes.Equal(key, app.KeyEntityRegistered) {
// Entity registered event.
var ent entity.Entity
if err := cbor.Unmarshal(pair.GetValue(), &ent); err != nil {
if err := cbor.Unmarshal(val, &ent); err != nil {
tb.logger.Error("worker: failed to get entity from tag",
"err", err,
)
continue
if doBroadcast {
continue
} else {
return nil, fmt.Errorf("registry: corrupt EntityRegistered event: %w", err)
}
}

tb.entityNotifier.Broadcast(&api.EntityEvent{
eev := &api.EntityEvent{
Entity: &ent,
IsRegistration: true,
})
} else if bytes.Equal(pair.GetKey(), app.KeyEntityDeregistered) {
}

if doBroadcast {
tb.entityNotifier.Broadcast(eev)
} else {
events = append(events, api.Event{EntityEvent: eev})
}
} else if bytes.Equal(key, app.KeyEntityDeregistered) {
// Entity deregistered event.
var dereg app.EntityDeregistration
if err := cbor.Unmarshal(pair.GetValue(), &dereg); err != nil {
if err := cbor.Unmarshal(val, &dereg); err != nil {
tb.logger.Error("worker: failed to get entity deregistration from tag",
"err", err,
)
continue
if doBroadcast {
continue
} else {
return nil, fmt.Errorf("registry: corrupt EntityDeregistered event: %w", err)
}
}

// Entity deregistration.
tb.entityNotifier.Broadcast(&api.EntityEvent{
eev := &api.EntityEvent{
Entity: &dereg.Entity,
IsRegistration: false,
})
} else if bytes.Equal(pair.GetKey(), app.KeyRegistryNodeListEpoch) {
}

if doBroadcast {
tb.entityNotifier.Broadcast(eev)
} else {
events = append(events, api.Event{EntityEvent: eev})
}
} else if bytes.Equal(key, app.KeyRegistryNodeListEpoch) && doBroadcast {
// Node list epoch event.
nl, err := tb.getNodeList(ctx, height)
if err != nil {
tb.logger.Error("worker: failed to get node list",
Expand All @@ -264,22 +337,46 @@ func (tb *tendermintBackend) onABCIEvents(ctx context.Context, events []abcitype
continue
}
tb.nodeListNotifier.Broadcast(nl)
} else if bytes.Equal(pair.GetKey(), app.KeyNodeRegistered) {
} else if bytes.Equal(key, app.KeyNodeRegistered) {
// Node registered event.
var n node.Node
if err := cbor.Unmarshal(pair.GetValue(), &n); err != nil {
if err := cbor.Unmarshal(val, &n); err != nil {
tb.logger.Error("worker: failed to get node from tag",
"err", err,
)
continue
if doBroadcast {
continue
} else {
return nil, fmt.Errorf("registry: corrupt NodeRegistered event: %w", err)
}
}

tb.nodeNotifier.Broadcast(&api.NodeEvent{
nev := &api.NodeEvent{
Node: &n,
IsRegistration: true,
})
}

if doBroadcast {
tb.nodeNotifier.Broadcast(nev)
} else {
events = append(events, api.Event{NodeEvent: nev})
}
} else if bytes.Equal(key, app.KeyNodeUnfrozen) && !doBroadcast {
// Node unfrozen event.
var nid signature.PublicKey
if err := cbor.Unmarshal(val, &nid); err != nil {
return nil, fmt.Errorf("registry: corrupt NodeUnfrozen event: %w", err)
}
evt := api.Event{
NodeUnfrozenEvent: &api.NodeUnfrozenEvent{
NodeID: nid,
},
}
events = append(events, evt)
}
}
}
return events, nil
}

func (tb *tendermintBackend) getNodeList(ctx context.Context, height int64) (*api.NodeList, error) {
Expand All @@ -291,7 +388,7 @@ func (tb *tendermintBackend) getNodeList(ctx context.Context, height int64) (*ap

nodes, err := q.Nodes(ctx)
if err != nil {
return nil, errors.Wrap(err, "registry: failed to query nodes")
return nil, fmt.Errorf("registry: failed to query nodes: %w", err)
}

api.SortNodeList(nodes)
Expand Down
58 changes: 53 additions & 5 deletions go/consensus/tendermint/roothash/roothash.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ package roothash
import (
"bytes"
"context"
"fmt"
"math"
"sync"

"github.com/eapache/channels"
"github.com/pkg/errors"
"github.com/tendermint/tendermint/abci/types"
tmrpctypes "github.com/tendermint/tendermint/rpc/core/types"
tmtypes "github.com/tendermint/tendermint/types"
Expand Down Expand Up @@ -146,16 +146,16 @@ func (tb *tendermintBackend) WatchBlocks(id common.Namespace) (<-chan *api.Annot
func (tb *tendermintBackend) getBlockFromFinalizedTag(ctx context.Context, rawValue []byte, height int64) (*block.Block, *app.ValueFinalized, error) {
var value app.ValueFinalized
if err := cbor.Unmarshal(rawValue, &value); err != nil {
return nil, nil, errors.Wrap(err, "roothash: corrupt finalized tag")
return nil, nil, fmt.Errorf("roothash: corrupt finalized tag: %w", err)
}

block, err := tb.getLatestBlockAt(ctx, value.ID, height)
if err != nil {
return nil, nil, errors.Wrap(err, "roothash: failed to fetch block")
return nil, nil, fmt.Errorf("roothash: failed to fetch block: %w", err)
}

if block.Header.Round != value.Round {
return nil, nil, errors.Errorf("roothash: tag/query round mismatch (tag: %d, query: %d)", value.Round, block.Header.Round)
return nil, nil, fmt.Errorf("roothash: tag/query round mismatch (tag: %d, query: %d)", value.Round, block.Header.Round)
}

return block, &value, nil
Expand Down Expand Up @@ -196,6 +196,54 @@ func (tb *tendermintBackend) StateToGenesis(ctx context.Context, height int64) (
return q.Genesis(ctx)
}

func (tb *tendermintBackend) GetEvents(ctx context.Context, height int64) ([]api.Event, error) {
// Get block results at given height.
var results *tmrpctypes.ResultBlockResults
results, err := tb.service.GetBlockResults(height)
if err != nil {
tb.logger.Error("failed to get tendermint block results",
"err", err,
"height", height,
)
return nil, err
}

// Decode events from block results.
tmEvents := append(results.BeginBlockEvents, results.EndBlockEvents...)
for _, txResults := range results.TxsResults {
tmEvents = append(tmEvents, txResults.Events...)
}
var events []api.Event
for _, tmEv := range tmEvents {
// Ignore events that don't relate to the roothash app.
if tmEv.GetType() != app.EventType {
continue
}

for _, pair := range tmEv.GetAttributes() {
if bytes.Equal(pair.GetKey(), app.KeyMergeDiscrepancyDetected) {
// Merge discrepancy event.
evt := api.Event{
MergeDiscrepancyDetected: &api.MergeDiscrepancyDetectedEvent{},
}
events = append(events, evt)
} else if bytes.Equal(pair.GetKey(), app.KeyExecutionDiscrepancyDetected) {
// Execution discrepancy event.
var eddValue app.ValueExecutionDiscrepancyDetected
if err := cbor.Unmarshal(pair.GetValue(), &eddValue); err != nil {
return nil, fmt.Errorf("roothash: corrupt ExecutionDiscrepancyDetected event: %w", err)
}
evt := api.Event{
ExecutionDiscrepancyDetected: &eddValue.Event,
}
events = append(events, evt)
}
}
}

return events, nil
}

func (tb *tendermintBackend) Cleanup() {
tb.closeOnce.Do(func() {
<-tb.closedCh
Expand Down Expand Up @@ -252,7 +300,7 @@ func (tb *tendermintBackend) reindexBlocks(bh api.BlockHistory) error {
// TODO: Take prune strategy into account (e.g., skip heights).
for height := lastHeight + 1; height <= currentBlk.Height; height++ {
var results *tmrpctypes.ResultBlockResults
results, err = tb.service.GetBlockResults(&height)
results, err = tb.service.GetBlockResults(height)
if err != nil {
tb.logger.Error("failed to get tendermint block",
"err", err,
Expand Down
2 changes: 1 addition & 1 deletion go/consensus/tendermint/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type TendermintService interface {

// GetBlockResults returns the ABCI results from processing a block
// at a specific height.
GetBlockResults(height *int64) (*tmrpctypes.ResultBlockResults, error)
GetBlockResults(height int64) (*tmrpctypes.ResultBlockResults, error)

// WatchTendermintBlocks returns a stream of Tendermint blocks as they are
// returned via the `EventDataNewBlock` query.
Expand Down
Loading