From 961ac172611a5876ba94a06ca64af2a98bddefaf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrej=20Buko=C5=A1ek?= Date: Mon, 13 Apr 2020 12:30:49 +0200 Subject: [PATCH 1/9] {registry,roothash,staking}: Add GetEvents --- go/consensus/tendermint/apps/staking/api.go | 10 +- go/consensus/tendermint/registry/registry.go | 109 +++++++++++++++++++ go/consensus/tendermint/roothash/roothash.go | 48 ++++++++ go/consensus/tendermint/staking/staking.go | 93 ++++++++++++++++ go/registry/api/api.go | 27 +++++ go/registry/api/grpc.go | 37 +++++++ go/roothash/api/api.go | 3 + go/staking/api/api.go | 10 ++ go/staking/api/grpc.go | 37 +++++++ 9 files changed, 369 insertions(+), 5 deletions(-) diff --git a/go/consensus/tendermint/apps/staking/api.go b/go/consensus/tendermint/apps/staking/api.go index 9e42e9e500c..0baa00666b4 100644 --- a/go/consensus/tendermint/apps/staking/api.go +++ b/go/consensus/tendermint/apps/staking/api.go @@ -22,22 +22,22 @@ var ( QueryApp = api.QueryForApp(AppName) // KeyTakeEscrow is an ABCI event attribute key for TakeEscrow calls - // (value is an app.TakeEscrowEvent). + // (value is an api.TakeEscrowEvent). KeyTakeEscrow = stakingState.KeyTakeEscrow // KeyReclaimEscrow is an ABCI event attribute key for ReclaimEscrow - // calls (value is an app.ReclaimEscrowEvent). + // calls (value is an api.ReclaimEscrowEvent). KeyReclaimEscrow = []byte("reclaim_escrow") // KeyTransfer is an ABCI event attribute key for Transfers (value is - // an app.TransferEvent). + // an api.TransferEvent). KeyTransfer = stakingState.KeyTransfer // KeyBurn is an ABCI event attribute key for Burn calls (value is - // an app.BurnEvent). + // an api.BurnEvent). KeyBurn = []byte("burn") // KeyAddEscrow is an ABCI event attribute key for AddEscrow calls - // (value is an app.EscrowEvent). + // (value is an api.AddEscrowEvent). KeyAddEscrow = []byte("add_escrow") ) diff --git a/go/consensus/tendermint/registry/registry.go b/go/consensus/tendermint/registry/registry.go index 39af0a95d18..8e2355f01ff 100644 --- a/go/consensus/tendermint/registry/registry.go +++ b/go/consensus/tendermint/registry/registry.go @@ -8,9 +8,11 @@ import ( "github.com/eapache/channels" "github.com/pkg/errors" abcitypes "github.com/tendermint/tendermint/abci/types" + tmrpctypes "github.com/tendermint/tendermint/rpc/core/types" tmtypes "github.com/tendermint/tendermint/types" "github.com/oasislabs/oasis-core/go/common/cbor" + "github.com/oasislabs/oasis-core/go/common/crypto/signature" "github.com/oasislabs/oasis-core/go/common/entity" "github.com/oasislabs/oasis-core/go/common/logging" "github.com/oasislabs/oasis-core/go/common/node" @@ -150,6 +152,113 @@ func (tb *tendermintBackend) StateToGenesis(ctx context.Context, height int64) ( return q.Genesis(ctx) } +func (tb *tendermintBackend) GetEvents(ctx context.Context, height int64) (*[]api.Event, error) { + // Get block results at given height. + var results *tmrpctypes.ResultBlockResults + results, err := tb.service.GetBlockResults(&height) + if err != nil { + tb.logger.Error("failed to get tendermint block results", + "err", err, + "height", height, + ) + return nil, err + } + + // Decode events from block results. + tmEvents := append(results.Results.BeginBlock.GetEvents(), results.Results.EndBlock.GetEvents()...) + for _, txResults := range results.Results.DeliverTx { + tmEvents = append(tmEvents, txResults.GetEvents()...) + } + events := []api.Event{} + for _, tmEv := range tmEvents { + // Ignore events that don't relate to the registry app. + if tmEv.GetType() != app.EventType { + continue + } + + for _, pair := range tmEv.GetAttributes() { + key := pair.GetKey() + val := pair.GetValue() + if bytes.Equal(key, app.KeyRuntimeRegistered) { + // Runtime registered event. + var rt api.Runtime + if err := cbor.Unmarshal(val, &rt); err != nil { + return nil, errors.Wrap(err, "registry: corrupt RuntimeRegistered event") + } + evt := api.Event{ + RuntimeEvent: &api.RuntimeEvent{Runtime: &rt}, + } + events = append(events, evt) + } else if bytes.Equal(key, app.KeyEntityRegistered) { + // Entity registered event. + var ent entity.Entity + if err := cbor.Unmarshal(val, &ent); err != nil { + return nil, errors.Wrap(err, "registry: corrupt EntityRegistered event") + } + evt := api.Event{ + EntityEvent: &api.EntityEvent{ + Entity: &ent, + IsRegistration: true, + }, + } + events = append(events, evt) + } else if bytes.Equal(key, app.KeyEntityDeregistered) { + // Entity deregistered event. + var dereg app.EntityDeregistration + if err := cbor.Unmarshal(val, &dereg); err != nil { + return nil, errors.Wrap(err, "registry: corrupt EntityDeregistered event") + } + evt := api.Event{ + EntityEvent: &api.EntityEvent{ + Entity: &dereg.Entity, + IsRegistration: false, + }, + } + events = append(events, evt) + } else if bytes.Equal(key, app.KeyNodeRegistered) { + // Node registered event. + var n node.Node + if err := cbor.Unmarshal(val, &n); err != nil { + return nil, errors.Wrap(err, "registry: corrupt NodeRegistered event") + } + evt := api.Event{ + NodeEvent: &api.NodeEvent{ + Node: &n, + IsRegistration: true, + }, + } + events = append(events, evt) + } else if bytes.Equal(key, app.KeyNodesExpired) { + // Nodes expired event. + var nodes []*node.Node + if err := cbor.Unmarshal(val, &nodes); err != nil { + return nil, errors.Wrap(err, "registry: corrupt NodesExpired event") + } + evt := api.Event{ + NodesExpiredEvent: &api.NodesExpiredEvent{ + Nodes: nodes, + }, + } + events = append(events, evt) + } else if bytes.Equal(key, app.KeyNodeUnfrozen) { + // Node unfrozen event. + var nid signature.PublicKey + if err := cbor.Unmarshal(val, &nid); err != nil { + return nil, errors.Wrap(err, "registry: corrupt NodeUnfrozen event") + } + evt := api.Event{ + NodeUnfrozenEvent: &api.NodeUnfrozenEvent{ + NodeID: nid, + }, + } + events = append(events, evt) + } + } + } + + return &events, nil +} + func (tb *tendermintBackend) worker(ctx context.Context) { // Subscribe to transactions which modify state. sub, err := tb.service.Subscribe("registry-worker", app.QueryApp) diff --git a/go/consensus/tendermint/roothash/roothash.go b/go/consensus/tendermint/roothash/roothash.go index 6076db2e676..1bcc89d38cc 100644 --- a/go/consensus/tendermint/roothash/roothash.go +++ b/go/consensus/tendermint/roothash/roothash.go @@ -196,6 +196,54 @@ func (tb *tendermintBackend) StateToGenesis(ctx context.Context, height int64) ( return q.Genesis(ctx) } +func (tb *tendermintBackend) GetEvents(ctx context.Context, height int64) (*[]api.Event, error) { + // Get block results at given height. + var results *tmrpctypes.ResultBlockResults + results, err := tb.service.GetBlockResults(&height) + if err != nil { + tb.logger.Error("failed to get tendermint block results", + "err", err, + "height", height, + ) + return nil, err + } + + // Decode events from block results. + tmEvents := append(results.Results.BeginBlock.GetEvents(), results.Results.EndBlock.GetEvents()...) + for _, txResults := range results.Results.DeliverTx { + tmEvents = append(tmEvents, txResults.GetEvents()...) + } + events := []api.Event{} + for _, tmEv := range tmEvents { + // Ignore events that don't relate to the roothash app. + if tmEv.GetType() != app.EventType { + continue + } + + for _, pair := range tmEv.GetAttributes() { + if bytes.Equal(pair.GetKey(), app.KeyMergeDiscrepancyDetected) { + // Merge discrepancy event. + evt := api.Event{ + MergeDiscrepancyDetected: &api.MergeDiscrepancyDetectedEvent{}, + } + events = append(events, evt) + } else if bytes.Equal(pair.GetKey(), app.KeyExecutionDiscrepancyDetected) { + // Execution discrepancy event. + var eddValue app.ValueExecutionDiscrepancyDetected + if err := cbor.Unmarshal(pair.GetValue(), &eddValue); err != nil { + return nil, errors.Wrap(err, "roothash: corrupt ExecutionDiscrepancyDetected tag") + } + evt := api.Event{ + ExecutionDiscrepancyDetected: &eddValue.Event, + } + events = append(events, evt) + } + } + } + + return &events, nil +} + func (tb *tendermintBackend) Cleanup() { tb.closeOnce.Do(func() { <-tb.closedCh diff --git a/go/consensus/tendermint/staking/staking.go b/go/consensus/tendermint/staking/staking.go index a247c30b16d..4b7ca70bdb1 100644 --- a/go/consensus/tendermint/staking/staking.go +++ b/go/consensus/tendermint/staking/staking.go @@ -5,7 +5,10 @@ import ( "bytes" "context" + "github.com/pkg/errors" + abcitypes "github.com/tendermint/tendermint/abci/types" + tmrpctypes "github.com/tendermint/tendermint/rpc/core/types" tmtypes "github.com/tendermint/tendermint/types" "github.com/oasislabs/oasis-core/go/common/cbor" @@ -140,6 +143,96 @@ func (tb *tendermintBackend) StateToGenesis(ctx context.Context, height int64) ( return q.Genesis(ctx) } +func (tb *tendermintBackend) GetEvents(ctx context.Context, height int64) (*[]api.Event, error) { + // Get block results at given height. + var results *tmrpctypes.ResultBlockResults + results, err := tb.service.GetBlockResults(&height) + if err != nil { + tb.logger.Error("failed to get tendermint block results", + "err", err, + "height", height, + ) + return nil, err + } + + // Decode events from block results. + tmEvents := append(results.Results.BeginBlock.GetEvents(), results.Results.EndBlock.GetEvents()...) + for _, txResults := range results.Results.DeliverTx { + tmEvents = append(tmEvents, txResults.GetEvents()...) + } + events := []api.Event{} + for _, tmEv := range tmEvents { + // Ignore events that don't relate to the staking app. + if tmEv.GetType() != app.EventType { + continue + } + + for _, pair := range tmEv.GetAttributes() { + key := pair.GetKey() + val := pair.GetValue() + if bytes.Equal(key, app.KeyTransfer) { + // Transfer event. + var te api.TransferEvent + if err := cbor.Unmarshal(val, &te); err != nil { + return nil, errors.Wrap(err, "staking: corrupt Transfer event") + } + evt := api.Event{ + TransferEvent: &te, + } + events = append(events, evt) + } else if bytes.Equal(key, app.KeyBurn) { + // Burn event. + var be api.BurnEvent + if err := cbor.Unmarshal(val, &be); err != nil { + return nil, errors.Wrap(err, "staking: corrupt Burn event") + } + evt := api.Event{ + BurnEvent: &be, + } + events = append(events, evt) + } else if bytes.Equal(key, app.KeyAddEscrow) { + // Add escrow event. + var aee api.AddEscrowEvent + if err := cbor.Unmarshal(val, &aee); err != nil { + return nil, errors.Wrap(err, "staking: corrupt AddEscrow event") + } + evt := api.Event{ + EscrowEvent: &api.EscrowEvent{ + Add: &aee, + }, + } + events = append(events, evt) + } else if bytes.Equal(key, app.KeyTakeEscrow) { + // Take escrow event. + var tee api.TakeEscrowEvent + if err := cbor.Unmarshal(val, &tee); err != nil { + return nil, errors.Wrap(err, "staking: corrupt TakeEscrow event") + } + evt := api.Event{ + EscrowEvent: &api.EscrowEvent{ + Take: &tee, + }, + } + events = append(events, evt) + } else if bytes.Equal(key, app.KeyReclaimEscrow) { + // Reclaim escrow event. + var ree api.ReclaimEscrowEvent + if err := cbor.Unmarshal(val, &ree); err != nil { + return nil, errors.Wrap(err, "staking: corrupt ReclaimEscrow event") + } + evt := api.Event{ + EscrowEvent: &api.EscrowEvent{ + Reclaim: &ree, + }, + } + events = append(events, evt) + } + } + } + + return &events, nil +} + func (tb *tendermintBackend) ConsensusParameters(ctx context.Context, height int64) (*api.ConsensusParameters, error) { q, err := tb.querier.QueryAt(ctx, height) if err != nil { diff --git a/go/registry/api/api.go b/go/registry/api/api.go index eb30d26eca8..0cfe6285097 100644 --- a/go/registry/api/api.go +++ b/go/registry/api/api.go @@ -226,6 +226,9 @@ type Backend interface { // StateToGenesis returns the genesis state at specified block height. StateToGenesis(context.Context, int64) (*Genesis, error) + // GetEvents returns the events at specified block height. + GetEvents(ctx context.Context, height int64) (*[]Event, error) + // Cleanup cleans up the registry backend. Cleanup() } @@ -281,6 +284,30 @@ type NodeEvent struct { IsRegistration bool } +// RuntimeEvent signifies new runtime registration. +type RuntimeEvent struct { + Runtime *Runtime +} + +// NodesExpiredEvent signifies node expirations. +type NodesExpiredEvent struct { + Nodes []*node.Node +} + +// NodeUnfrozenEvent signifies when node becomes unfrozen. +type NodeUnfrozenEvent struct { + NodeID signature.PublicKey +} + +// Event is a registry event returned via GetEvents. +type Event struct { + RuntimeEvent *RuntimeEvent + EntityEvent *EntityEvent + NodeEvent *NodeEvent + NodesExpiredEvent *NodesExpiredEvent + NodeUnfrozenEvent *NodeUnfrozenEvent +} + // NodeList is a per-epoch immutable node list. type NodeList struct { Nodes []*node.Node diff --git a/go/registry/api/grpc.go b/go/registry/api/grpc.go index d538236c95f..326e468cc02 100644 --- a/go/registry/api/grpc.go +++ b/go/registry/api/grpc.go @@ -33,6 +33,8 @@ var ( methodGetNodeList = serviceName.NewMethod("GetNodeList", int64(0)) // methodStateToGenesis is the StateToGenesis method. methodStateToGenesis = serviceName.NewMethod("StateToGenesis", int64(0)) + // methodGetEvents is the GetEvents method. + methodGetEvents = serviceName.NewMethod("GetEvents", int64(0)) // methodWatchEntities is the WatchEntities method. methodWatchEntities = serviceName.NewMethod("WatchEntities", nil) @@ -84,6 +86,10 @@ var ( MethodName: methodStateToGenesis.ShortName(), Handler: handlerStateToGenesis, }, + { + MethodName: methodGetEvents.ShortName(), + Handler: handlerGetEvents, + }, }, Streams: []grpc.StreamDesc{ { @@ -317,6 +323,29 @@ func handlerStateToGenesis( // nolint: golint return interceptor(ctx, height, info, handler) } +func handlerGetEvents( // nolint: golint + srv interface{}, + ctx context.Context, + dec func(interface{}) error, + interceptor grpc.UnaryServerInterceptor, +) (interface{}, error) { + var height int64 + if err := dec(&height); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(Backend).GetEvents(ctx, height) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: methodGetEvents.FullName(), + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(Backend).GetEvents(ctx, req.(int64)) + } + return interceptor(ctx, height, info, handler) +} + func handlerWatchEntities(srv interface{}, stream grpc.ServerStream) error { if err := stream.RecvMsg(nil); err != nil { return err @@ -650,6 +679,14 @@ func (c *registryClient) StateToGenesis(ctx context.Context, height int64) (*Gen return &rsp, nil } +func (c *registryClient) GetEvents(ctx context.Context, height int64) (*[]Event, error) { + var rsp *[]Event + if err := c.conn.Invoke(ctx, methodGetEvents.FullName(), height, &rsp); err != nil { + return nil, err + } + return rsp, nil +} + func (c *registryClient) Cleanup() { } diff --git a/go/roothash/api/api.go b/go/roothash/api/api.go index c7288654309..8eee49377f1 100644 --- a/go/roothash/api/api.go +++ b/go/roothash/api/api.go @@ -91,6 +91,9 @@ type Backend interface { // StateToGenesis returns the genesis state at specified block height. StateToGenesis(ctx context.Context, height int64) (*Genesis, error) + // GetEvents returns the events at specified block height. + GetEvents(ctx context.Context, height int64) (*[]Event, error) + // Cleanup cleans up the roothash backend. Cleanup() } diff --git a/go/staking/api/api.go b/go/staking/api/api.go index 6e7b29859d4..9bc914366b8 100644 --- a/go/staking/api/api.go +++ b/go/staking/api/api.go @@ -114,6 +114,9 @@ type Backend interface { // general balance. WatchEscrows(ctx context.Context) (<-chan *EscrowEvent, pubsub.ClosableSubscription, error) + // GetEvents returns the events at specified block height. + GetEvents(ctx context.Context, height int64) (*[]Event, error) + // Cleanup cleans up the backend. Cleanup() } @@ -151,6 +154,13 @@ type EscrowEvent struct { Reclaim *ReclaimEscrowEvent `json:"reclaim,omitempty"` } +// Event signifies a staking event, returned via GetEvents. +type Event struct { + TransferEvent *TransferEvent + BurnEvent *BurnEvent + EscrowEvent *EscrowEvent +} + // AddEscrowEvent is the event emitted when a balance is transfered into a escrow // balance. type AddEscrowEvent struct { diff --git a/go/staking/api/grpc.go b/go/staking/api/grpc.go index 600ff7017cf..8301763c615 100644 --- a/go/staking/api/grpc.go +++ b/go/staking/api/grpc.go @@ -35,6 +35,8 @@ var ( methodStateToGenesis = serviceName.NewMethod("StateToGenesis", int64(0)) // methodConsensusParameters is the ConsensusParameters method. methodConsensusParameters = serviceName.NewMethod("ConsensusParameters", int64(0)) + // methodGetEvents is the GetEvents method. + methodGetEvents = serviceName.NewMethod("GetEvents", int64(0)) // methodWatchTransfers is the WatchTransfers method. methodWatchTransfers = serviceName.NewMethod("WatchTransfers", nil) @@ -88,6 +90,10 @@ var ( MethodName: methodConsensusParameters.ShortName(), Handler: handlerConsensusParameters, }, + { + MethodName: methodGetEvents.ShortName(), + Handler: handlerGetEvents, + }, }, Streams: []grpc.StreamDesc{ { @@ -339,6 +345,29 @@ func handlerConsensusParameters( // nolint: golint return interceptor(ctx, height, info, handler) } +func handlerGetEvents( // nolint: golint + srv interface{}, + ctx context.Context, + dec func(interface{}) error, + interceptor grpc.UnaryServerInterceptor, +) (interface{}, error) { + var height int64 + if err := dec(&height); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(Backend).GetEvents(ctx, height) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: methodGetEvents.FullName(), + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(Backend).GetEvents(ctx, req.(int64)) + } + return interceptor(ctx, height, info, handler) +} + func handlerWatchTransfers(srv interface{}, stream grpc.ServerStream) error { if err := stream.RecvMsg(nil); err != nil { return err @@ -512,6 +541,14 @@ func (c *stakingClient) ConsensusParameters(ctx context.Context, height int64) ( return &rsp, nil } +func (c *stakingClient) GetEvents(ctx context.Context, height int64) (*[]Event, error) { + var rsp *[]Event + if err := c.conn.Invoke(ctx, methodGetEvents.FullName(), height, &rsp); err != nil { + return nil, err + } + return rsp, nil +} + func (c *stakingClient) WatchTransfers(ctx context.Context) (<-chan *TransferEvent, pubsub.ClosableSubscription, error) { ctx, sub := pubsub.NewContextSubscription(ctx) From 193d8a2ff5bae9416f047e6bc7a35665f4769383 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrej=20Buko=C5=A1ek?= Date: Tue, 14 Apr 2020 13:35:35 +0200 Subject: [PATCH 2/9] Add changelog entry for GetEvents --- .changelog/2778.feature.md | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .changelog/2778.feature.md diff --git a/.changelog/2778.feature.md b/.changelog/2778.feature.md new file mode 100644 index 00000000000..ca435e18a0e --- /dev/null +++ b/.changelog/2778.feature.md @@ -0,0 +1,6 @@ +Add `GetEvents` to backends + +The new `GetEvents` call returns all events at a specific height, +without having to watch for them using the `Watch*` methods. +It is currently implemented for the registry, roothash, and staking +backends. From ae33c33ac98abfe6b6f176e6d7713d7b6e43e4c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrej=20Buko=C5=A1ek?= Date: Tue, 14 Apr 2020 13:32:40 +0200 Subject: [PATCH 3/9] Add tests for GetEvents --- go/consensus/tendermint/registry/registry.go | 6 +- go/consensus/tendermint/roothash/roothash.go | 6 +- go/consensus/tendermint/staking/staking.go | 6 +- go/registry/tests/tester.go | 71 +++++++++++++++++++ go/roothash/tests/tester.go | 5 ++ go/staking/tests/tester.go | 72 +++++++++++++++++++- 6 files changed, 156 insertions(+), 10 deletions(-) diff --git a/go/consensus/tendermint/registry/registry.go b/go/consensus/tendermint/registry/registry.go index 8e2355f01ff..a05e77f4c55 100644 --- a/go/consensus/tendermint/registry/registry.go +++ b/go/consensus/tendermint/registry/registry.go @@ -165,9 +165,9 @@ func (tb *tendermintBackend) GetEvents(ctx context.Context, height int64) (*[]ap } // Decode events from block results. - tmEvents := append(results.Results.BeginBlock.GetEvents(), results.Results.EndBlock.GetEvents()...) - for _, txResults := range results.Results.DeliverTx { - tmEvents = append(tmEvents, txResults.GetEvents()...) + tmEvents := append(results.BeginBlockEvents, results.EndBlockEvents...) + for _, txResults := range results.TxsResults { + tmEvents = append(tmEvents, txResults.Events...) } events := []api.Event{} for _, tmEv := range tmEvents { diff --git a/go/consensus/tendermint/roothash/roothash.go b/go/consensus/tendermint/roothash/roothash.go index 1bcc89d38cc..471b623bf24 100644 --- a/go/consensus/tendermint/roothash/roothash.go +++ b/go/consensus/tendermint/roothash/roothash.go @@ -209,9 +209,9 @@ func (tb *tendermintBackend) GetEvents(ctx context.Context, height int64) (*[]ap } // Decode events from block results. - tmEvents := append(results.Results.BeginBlock.GetEvents(), results.Results.EndBlock.GetEvents()...) - for _, txResults := range results.Results.DeliverTx { - tmEvents = append(tmEvents, txResults.GetEvents()...) + tmEvents := append(results.BeginBlockEvents, results.EndBlockEvents...) + for _, txResults := range results.TxsResults { + tmEvents = append(tmEvents, txResults.Events...) } events := []api.Event{} for _, tmEv := range tmEvents { diff --git a/go/consensus/tendermint/staking/staking.go b/go/consensus/tendermint/staking/staking.go index 4b7ca70bdb1..bb83a6508f8 100644 --- a/go/consensus/tendermint/staking/staking.go +++ b/go/consensus/tendermint/staking/staking.go @@ -156,9 +156,9 @@ func (tb *tendermintBackend) GetEvents(ctx context.Context, height int64) (*[]ap } // Decode events from block results. - tmEvents := append(results.Results.BeginBlock.GetEvents(), results.Results.EndBlock.GetEvents()...) - for _, txResults := range results.Results.DeliverTx { - tmEvents = append(tmEvents, txResults.GetEvents()...) + tmEvents := append(results.BeginBlockEvents, results.EndBlockEvents...) + for _, txResults := range results.TxsResults { + tmEvents = append(tmEvents, txResults.Events...) } events := []api.Event{} for _, tmEv := range tmEvents { diff --git a/go/registry/tests/tester.go b/go/registry/tests/tester.go index 9069f383cd9..aabd7e950bb 100644 --- a/go/registry/tests/tester.go +++ b/go/registry/tests/tester.go @@ -85,6 +85,20 @@ func testRegistryEntityNodes( // nolint: gocyclo case ev := <-entityCh: require.EqualValues(v.Entity, ev.Entity, "registered entity") require.True(ev.IsRegistration, "event is registration") + + // Make sure that GetEvents also returns the registration event. + evts, grr := backend.GetEvents(context.Background(), consensusAPI.HeightLatest) + require.NoError(grr, "GetEvents") + var gotIt bool + for _, evt := range *evts { + if evt.EntityEvent != nil { + if evt.EntityEvent.Entity.ID.Equal(ev.Entity.ID) && evt.EntityEvent.IsRegistration { + gotIt = true + break + } + } + } + require.EqualValues(true, gotIt, "GetEvents should return entity registration event") case <-time.After(recvTimeout): t.Fatalf("failed to receive entity registration event") } @@ -167,6 +181,20 @@ func testRegistryEntityNodes( // nolint: gocyclo case ev := <-nodeCh: require.EqualValues(tn.Node, ev.Node, "registered node") require.True(ev.IsRegistration, "event is registration") + + // Make sure that GetEvents also returns the registration event. + evts, grr := backend.GetEvents(context.Background(), consensusAPI.HeightLatest) + require.NoError(grr, "GetEvents") + var gotIt bool + for _, evt := range *evts { + if evt.NodeEvent != nil { + if evt.NodeEvent.Node.ID.Equal(tn.Node.ID) && evt.NodeEvent.IsRegistration { + gotIt = true + break + } + } + } + require.EqualValues(true, gotIt, "GetEvents should return node registration event") case <-time.After(recvTimeout): t.Fatalf("failed to receive node registration event") } @@ -356,6 +384,20 @@ func testRegistryEntityNodes( // nolint: gocyclo case ev := <-entityCh: require.EqualValues(entities[0].Entity, ev.Entity, "deregistered entity") require.False(ev.IsRegistration, "event is deregistration") + + // Make sure that GetEvents also returns the deregistration event. + evts, err := backend.GetEvents(context.Background(), consensusAPI.HeightLatest) + require.NoError(err, "GetEvents") + var gotIt bool + for _, evt := range *evts { + if evt.EntityEvent != nil { + if evt.EntityEvent.Entity.ID.Equal(ev.Entity.ID) && !evt.EntityEvent.IsRegistration { + gotIt = true + break + } + } + } + require.EqualValues(true, gotIt, "GetEvents should return entity deregistration event") case <-time.After(recvTimeout): t.Fatalf("failed to receive entity deregistration event") } @@ -379,6 +421,20 @@ func testRegistryEntityNodes( // nolint: gocyclo case ev := <-entityCh: require.EqualValues(v.Entity, ev.Entity, "deregistered entity") require.False(ev.IsRegistration, "event is deregistration") + + // Make sure that GetEvents also returns the deregistration event. + evts, err := backend.GetEvents(context.Background(), consensusAPI.HeightLatest) + require.NoError(err, "GetEvents") + var gotIt bool + for _, evt := range *evts { + if evt.EntityEvent != nil { + if evt.EntityEvent.Entity.ID.Equal(ev.Entity.ID) && !evt.EntityEvent.IsRegistration { + gotIt = true + break + } + } + } + require.EqualValues(true, gotIt, "GetEvents should return entity deregistration event") case <-time.After(recvTimeout): t.Fatalf("failed to receive entity deregistration event") } @@ -1040,6 +1096,21 @@ func (rt *TestRuntime) MustRegister(t *testing.T, backend api.Backend, consensus if seen > 0 || !rt.didRegister { require.EqualValues(rt.Runtime, v, "registered runtime") rt.didRegister = true + + // Make sure that GetEvents also returns the registration event. + evts, err := backend.GetEvents(context.Background(), consensusAPI.HeightLatest) + require.NoError(err, "GetEvents") + var gotIt bool + for _, evt := range *evts { + if evt.RuntimeEvent != nil { + if evt.RuntimeEvent.Runtime.ID.Equal(&v.ID) { + gotIt = true + break + } + } + } + require.EqualValues(true, gotIt, "GetEvents should return runtime registration event") + return } seen++ diff --git a/go/roothash/tests/tester.go b/go/roothash/tests/tester.go index 0a7eec445c7..db7dc982c70 100644 --- a/go/roothash/tests/tester.go +++ b/go/roothash/tests/tester.go @@ -377,6 +377,11 @@ func (s *runtimeState) testSuccessfulRound(t *testing.T, backend api.Backend, co require.EqualValues(parent.Header.IORoot, header.IORoot, "block I/O root") require.EqualValues(parent.Header.StateRoot, header.StateRoot, "block root hash") + // There should be no discrepancy events. + evts, err := backend.GetEvents(ctx, consensusAPI.HeightLatest) + require.NoError(err, "GetEvents") + require.EqualValues(0, len(*evts), "should have no discrepancy events") + // Nothing more to do after the block was received. return case <-time.After(recvTimeout): diff --git a/go/staking/tests/tester.go b/go/staking/tests/tester.go index 0525c86ae42..42d86161498 100644 --- a/go/staking/tests/tester.go +++ b/go/staking/tests/tester.go @@ -166,6 +166,20 @@ func testTransfer(t *testing.T, backend api.Backend, consensus consensusAPI.Back require.Equal(SrcID, ev.From, "Event: from") require.Equal(DestID, ev.To, "Event: to") require.Equal(xfer.Tokens, ev.Tokens, "Event: tokens") + + // Make sure that GetEvents also returns the transfer event. + evts, grr := backend.GetEvents(context.Background(), consensusAPI.HeightLatest) + require.NoError(grr, "GetEvents") + var gotIt bool + for _, evt := range *evts { + if evt.TransferEvent != nil { + if evt.TransferEvent.From.Equal(ev.From) && evt.TransferEvent.To.Equal(ev.To) && evt.TransferEvent.Tokens.Cmp(&ev.Tokens) == 0 { + gotIt = true + break + } + } + } + require.EqualValues(true, gotIt, "GetEvents should return transfer event") case <-time.After(recvTimeout): t.Fatalf("failed to receive transfer event") } @@ -256,6 +270,20 @@ func testBurn(t *testing.T, backend api.Backend, consensus consensusAPI.Backend) case ev := <-ch: require.Equal(SrcID, ev.Owner, "Event: owner") require.Equal(burn.Tokens, ev.Tokens, "Event: tokens") + + // Make sure that GetEvents also returns the burn event. + evts, grr := backend.GetEvents(context.Background(), consensusAPI.HeightLatest) + require.NoError(grr, "GetEvents") + var gotIt bool + for _, evt := range *evts { + if evt.BurnEvent != nil { + if evt.BurnEvent.Owner.Equal(ev.Owner) && evt.BurnEvent.Tokens.Cmp(&ev.Tokens) == 0 { + gotIt = true + break + } + } + } + require.EqualValues(true, gotIt, "GetEvents should return burn event") case <-time.After(recvTimeout): t.Fatalf("failed to receive burn event") } @@ -280,7 +308,7 @@ func testSelfEscrow(t *testing.T, backend api.Backend, consensus consensusAPI.Ba testEscrowEx(t, backend, consensus, SrcID, srcSigner, SrcID) } -func testEscrowEx( +func testEscrowEx( // nolint: gocyclo t *testing.T, backend api.Backend, consensus consensusAPI.Backend, @@ -326,6 +354,20 @@ func testEscrowEx( require.Equal(srcID, ev.Owner, "Event: owner") require.Equal(dstID, ev.Escrow, "Event: escrow") require.Equal(escrow.Tokens, ev.Tokens, "Event: tokens") + + // Make sure that GetEvents also returns the add escrow event. + evts, grr := backend.GetEvents(context.Background(), consensusAPI.HeightLatest) + require.NoError(grr, "GetEvents") + var gotIt bool + for _, evt := range *evts { + if evt.EscrowEvent != nil && evt.EscrowEvent.Add != nil { + if evt.EscrowEvent.Add.Owner.Equal(ev.Owner) && evt.EscrowEvent.Add.Escrow.Equal(ev.Escrow) && evt.EscrowEvent.Add.Tokens.Cmp(&ev.Tokens) == 0 { + gotIt = true + break + } + } + } + require.EqualValues(true, gotIt, "GetEvents should return add escrow event") case <-time.After(recvTimeout): t.Fatalf("failed to receive escrow event") } @@ -374,6 +416,20 @@ func testEscrowEx( require.Equal(srcID, ev.Owner, "Event: owner") require.Equal(dstID, ev.Escrow, "Event: escrow") require.Equal(escrow.Tokens, ev.Tokens, "Event: tokens") + + // Make sure that GetEvents also returns the add escrow event. + evts, grr := backend.GetEvents(context.Background(), consensusAPI.HeightLatest) + require.NoError(grr, "GetEvents") + var gotIt bool + for _, evt := range *evts { + if evt.EscrowEvent != nil && evt.EscrowEvent.Add != nil { + if evt.EscrowEvent.Add.Owner.Equal(ev.Owner) && evt.EscrowEvent.Add.Escrow.Equal(ev.Escrow) && evt.EscrowEvent.Add.Tokens.Cmp(&ev.Tokens) == 0 { + gotIt = true + break + } + } + } + require.EqualValues(true, gotIt, "GetEvents should return add escrow event") case <-time.After(recvTimeout): t.Fatalf("failed to receive escrow event") } @@ -436,6 +492,20 @@ func testEscrowEx( require.Equal(srcID, ev.Owner, "Event: owner") require.Equal(dstID, ev.Escrow, "Event: escrow") require.Equal(&totalEscrowed, &ev.Tokens, "Event: tokens") + + // Make sure that GetEvents also returns the reclaim escrow event. + evts, grr := backend.GetEvents(context.Background(), consensusAPI.HeightLatest) + require.NoError(grr, "GetEvents") + var gotIt bool + for _, evt := range *evts { + if evt.EscrowEvent != nil && evt.EscrowEvent.Reclaim != nil { + if evt.EscrowEvent.Reclaim.Owner.Equal(ev.Owner) && evt.EscrowEvent.Reclaim.Escrow.Equal(ev.Escrow) && evt.EscrowEvent.Reclaim.Tokens.Cmp(&ev.Tokens) == 0 { + gotIt = true + break + } + } + } + require.EqualValues(true, gotIt, "GetEvents should return reclaim escrow event") case <-time.After(recvTimeout): t.Fatalf("failed to receive reclaim escrow event") } From be379714ef5bca1ff8e2eb7f0c4d2623e39210c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrej=20Buko=C5=A1ek?= Date: Tue, 14 Apr 2020 14:22:29 +0200 Subject: [PATCH 4/9] Handle latest block height properly in GetBlockResults --- go/consensus/tendermint/registry/registry.go | 2 +- go/consensus/tendermint/roothash/roothash.go | 4 ++-- go/consensus/tendermint/service/service.go | 2 +- go/consensus/tendermint/staking/staking.go | 2 +- go/consensus/tendermint/tendermint.go | 17 +++++++++++++++-- 5 files changed, 20 insertions(+), 7 deletions(-) diff --git a/go/consensus/tendermint/registry/registry.go b/go/consensus/tendermint/registry/registry.go index a05e77f4c55..3b8e5fef49f 100644 --- a/go/consensus/tendermint/registry/registry.go +++ b/go/consensus/tendermint/registry/registry.go @@ -155,7 +155,7 @@ func (tb *tendermintBackend) StateToGenesis(ctx context.Context, height int64) ( func (tb *tendermintBackend) GetEvents(ctx context.Context, height int64) (*[]api.Event, error) { // Get block results at given height. var results *tmrpctypes.ResultBlockResults - results, err := tb.service.GetBlockResults(&height) + results, err := tb.service.GetBlockResults(height) if err != nil { tb.logger.Error("failed to get tendermint block results", "err", err, diff --git a/go/consensus/tendermint/roothash/roothash.go b/go/consensus/tendermint/roothash/roothash.go index 471b623bf24..65b1776dab1 100644 --- a/go/consensus/tendermint/roothash/roothash.go +++ b/go/consensus/tendermint/roothash/roothash.go @@ -199,7 +199,7 @@ func (tb *tendermintBackend) StateToGenesis(ctx context.Context, height int64) ( func (tb *tendermintBackend) GetEvents(ctx context.Context, height int64) (*[]api.Event, error) { // Get block results at given height. var results *tmrpctypes.ResultBlockResults - results, err := tb.service.GetBlockResults(&height) + results, err := tb.service.GetBlockResults(height) if err != nil { tb.logger.Error("failed to get tendermint block results", "err", err, @@ -300,7 +300,7 @@ func (tb *tendermintBackend) reindexBlocks(bh api.BlockHistory) error { // TODO: Take prune strategy into account (e.g., skip heights). for height := lastHeight + 1; height <= currentBlk.Height; height++ { var results *tmrpctypes.ResultBlockResults - results, err = tb.service.GetBlockResults(&height) + results, err = tb.service.GetBlockResults(height) if err != nil { tb.logger.Error("failed to get tendermint block", "err", err, diff --git a/go/consensus/tendermint/service/service.go b/go/consensus/tendermint/service/service.go index cf07963756b..a7a0729681e 100644 --- a/go/consensus/tendermint/service/service.go +++ b/go/consensus/tendermint/service/service.go @@ -44,7 +44,7 @@ type TendermintService interface { // GetBlockResults returns the ABCI results from processing a block // at a specific height. - GetBlockResults(height *int64) (*tmrpctypes.ResultBlockResults, error) + GetBlockResults(height int64) (*tmrpctypes.ResultBlockResults, error) // WatchTendermintBlocks returns a stream of Tendermint blocks as they are // returned via the `EventDataNewBlock` query. diff --git a/go/consensus/tendermint/staking/staking.go b/go/consensus/tendermint/staking/staking.go index bb83a6508f8..c24d296eeea 100644 --- a/go/consensus/tendermint/staking/staking.go +++ b/go/consensus/tendermint/staking/staking.go @@ -146,7 +146,7 @@ func (tb *tendermintBackend) StateToGenesis(ctx context.Context, height int64) ( func (tb *tendermintBackend) GetEvents(ctx context.Context, height int64) (*[]api.Event, error) { // Get block results at given height. var results *tmrpctypes.ResultBlockResults - results, err := tb.service.GetBlockResults(&height) + results, err := tb.service.GetBlockResults(height) if err != nil { tb.logger.Error("failed to get tendermint block results", "err", err, diff --git a/go/consensus/tendermint/tendermint.go b/go/consensus/tendermint/tendermint.go index be666161dfd..653b3f6d6e8 100644 --- a/go/consensus/tendermint/tendermint.go +++ b/go/consensus/tendermint/tendermint.go @@ -835,12 +835,25 @@ func (t *tendermintService) GetHeight(ctx context.Context) (int64, error) { return blk.Header.Height, nil } -func (t *tendermintService) GetBlockResults(height *int64) (*tmrpctypes.ResultBlockResults, error) { +func (t *tendermintService) GetBlockResults(height int64) (*tmrpctypes.ResultBlockResults, error) { if t.client == nil { panic("client not available yet") } - result, err := t.client.BlockResults(height) + // As in GetTendermintBlock above, get the latest tendermint block height + // from our mux. + var tmHeight int64 + if height == consensusAPI.HeightLatest { + tmHeight = t.mux.BlockHeight() + if tmHeight == 0 { + // No committed blocks yet. + return nil, nil + } + } else { + tmHeight = height + } + + result, err := t.client.BlockResults(&tmHeight) if err != nil { return nil, fmt.Errorf("tendermint: block results query failed: %w", err) } From 80c44f7a83a551220a7e25b10a161223ce0f5c62 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrej=20Buko=C5=A1ek?= Date: Tue, 14 Apr 2020 15:07:58 +0200 Subject: [PATCH 5/9] txsource: Add GetEvents to workload --- go/oasis-node/cmd/debug/txsource/workload/queries.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/go/oasis-node/cmd/debug/txsource/workload/queries.go b/go/oasis-node/cmd/debug/txsource/workload/queries.go index 88c3238b562..b2345d72055 100644 --- a/go/oasis-node/cmd/debug/txsource/workload/queries.go +++ b/go/oasis-node/cmd/debug/txsource/workload/queries.go @@ -232,6 +232,12 @@ func (q *queries) doRegistryQueries(ctx context.Context, rng *rand.Rand, height } } + // Events. + _, err = q.registry.GetEvents(ctx, height) + if err != nil { + return fmt.Errorf("GetEvents error at height %d: %w", height, err) + } + q.logger.Debug("Done registry queries", "height", height, ) @@ -316,6 +322,12 @@ func (q *queries) doStakingQueries(ctx context.Context, rng *rand.Rand, height i return fmt.Errorf("staking total supply mismatch") } + // Events. + _, grr := q.staking.GetEvents(ctx, height) + if grr != nil { + return fmt.Errorf("GetEvents error at height %d: %w", height, grr) + } + q.logger.Debug("Done staking queries", "height", height, "total", total, From f94b2f8932a58eb6fce1253d213c19327a0be190 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrej=20Buko=C5=A1ek?= Date: Wed, 15 Apr 2020 09:31:46 +0200 Subject: [PATCH 6/9] {registry,staking}: Unify event decoding with onABCIEvents --- go/consensus/tendermint/registry/registry.go | 228 +++++++++---------- go/consensus/tendermint/staking/staking.go | 182 +++++++-------- 2 files changed, 192 insertions(+), 218 deletions(-) diff --git a/go/consensus/tendermint/registry/registry.go b/go/consensus/tendermint/registry/registry.go index 3b8e5fef49f..a185266ca83 100644 --- a/go/consensus/tendermint/registry/registry.go +++ b/go/consensus/tendermint/registry/registry.go @@ -169,94 +169,7 @@ func (tb *tendermintBackend) GetEvents(ctx context.Context, height int64) (*[]ap for _, txResults := range results.TxsResults { tmEvents = append(tmEvents, txResults.Events...) } - events := []api.Event{} - for _, tmEv := range tmEvents { - // Ignore events that don't relate to the registry app. - if tmEv.GetType() != app.EventType { - continue - } - - for _, pair := range tmEv.GetAttributes() { - key := pair.GetKey() - val := pair.GetValue() - if bytes.Equal(key, app.KeyRuntimeRegistered) { - // Runtime registered event. - var rt api.Runtime - if err := cbor.Unmarshal(val, &rt); err != nil { - return nil, errors.Wrap(err, "registry: corrupt RuntimeRegistered event") - } - evt := api.Event{ - RuntimeEvent: &api.RuntimeEvent{Runtime: &rt}, - } - events = append(events, evt) - } else if bytes.Equal(key, app.KeyEntityRegistered) { - // Entity registered event. - var ent entity.Entity - if err := cbor.Unmarshal(val, &ent); err != nil { - return nil, errors.Wrap(err, "registry: corrupt EntityRegistered event") - } - evt := api.Event{ - EntityEvent: &api.EntityEvent{ - Entity: &ent, - IsRegistration: true, - }, - } - events = append(events, evt) - } else if bytes.Equal(key, app.KeyEntityDeregistered) { - // Entity deregistered event. - var dereg app.EntityDeregistration - if err := cbor.Unmarshal(val, &dereg); err != nil { - return nil, errors.Wrap(err, "registry: corrupt EntityDeregistered event") - } - evt := api.Event{ - EntityEvent: &api.EntityEvent{ - Entity: &dereg.Entity, - IsRegistration: false, - }, - } - events = append(events, evt) - } else if bytes.Equal(key, app.KeyNodeRegistered) { - // Node registered event. - var n node.Node - if err := cbor.Unmarshal(val, &n); err != nil { - return nil, errors.Wrap(err, "registry: corrupt NodeRegistered event") - } - evt := api.Event{ - NodeEvent: &api.NodeEvent{ - Node: &n, - IsRegistration: true, - }, - } - events = append(events, evt) - } else if bytes.Equal(key, app.KeyNodesExpired) { - // Nodes expired event. - var nodes []*node.Node - if err := cbor.Unmarshal(val, &nodes); err != nil { - return nil, errors.Wrap(err, "registry: corrupt NodesExpired event") - } - evt := api.Event{ - NodesExpiredEvent: &api.NodesExpiredEvent{ - Nodes: nodes, - }, - } - events = append(events, evt) - } else if bytes.Equal(key, app.KeyNodeUnfrozen) { - // Node unfrozen event. - var nid signature.PublicKey - if err := cbor.Unmarshal(val, &nid); err != nil { - return nil, errors.Wrap(err, "registry: corrupt NodeUnfrozen event") - } - evt := api.Event{ - NodeUnfrozenEvent: &api.NodeUnfrozenEvent{ - NodeID: nid, - }, - } - events = append(events, evt) - } - } - } - - return &events, nil + return tb.onABCIEvents(ctx, tmEvents, height, false) } func (tb *tendermintBackend) worker(ctx context.Context) { @@ -298,72 +211,123 @@ func (tb *tendermintBackend) onEventDataNewBlock(ctx context.Context, ev tmtypes events := append([]abcitypes.Event{}, ev.ResultBeginBlock.GetEvents()...) events = append(events, ev.ResultEndBlock.GetEvents()...) - tb.onABCIEvents(ctx, events, ev.Block.Header.Height) + _, _ = tb.onABCIEvents(ctx, events, ev.Block.Header.Height, true) } func (tb *tendermintBackend) onEventDataTx(ctx context.Context, tx tmtypes.EventDataTx) { - tb.onABCIEvents(ctx, tx.Result.Events, tx.Height) + _, _ = tb.onABCIEvents(ctx, tx.Result.Events, tx.Height, true) } -func (tb *tendermintBackend) onABCIEvents(ctx context.Context, events []abcitypes.Event, height int64) { - for _, tmEv := range events { +func (tb *tendermintBackend) onABCIEvents(ctx context.Context, tmEvents []abcitypes.Event, height int64, doBroadcast bool) (*[]api.Event, error) { // nolint: gocyclo + events := []api.Event{} + for _, tmEv := range tmEvents { + // Ignore events that don't relate to the registry app. if tmEv.GetType() != app.EventType { continue } for _, pair := range tmEv.GetAttributes() { - if bytes.Equal(pair.GetKey(), app.KeyNodesExpired) { + key := pair.GetKey() + val := pair.GetValue() + if bytes.Equal(key, app.KeyNodesExpired) { + // Nodes expired event. var nodes []*node.Node - if err := cbor.Unmarshal(pair.GetValue(), &nodes); err != nil { + if err := cbor.Unmarshal(val, &nodes); err != nil { tb.logger.Error("worker: failed to get nodes from tag", "err", err, ) + if !doBroadcast { + return nil, errors.Wrap(err, "registry: corrupt NodesExpired event") + } } - for _, node := range nodes { - tb.nodeNotifier.Broadcast(&api.NodeEvent{ - Node: node, - IsRegistration: false, - }) + if doBroadcast { + for _, node := range nodes { + tb.nodeNotifier.Broadcast(&api.NodeEvent{ + Node: node, + IsRegistration: false, + }) + } + } else { + evt := api.Event{ + NodesExpiredEvent: &api.NodesExpiredEvent{ + Nodes: nodes, + }, + } + events = append(events, evt) } - } else if bytes.Equal(pair.GetKey(), app.KeyRuntimeRegistered) { + } else if bytes.Equal(key, app.KeyRuntimeRegistered) { + // Runtime registered event. var rt api.Runtime - if err := cbor.Unmarshal(pair.GetValue(), &rt); err != nil { + if err := cbor.Unmarshal(val, &rt); err != nil { tb.logger.Error("worker: failed to get runtime from tag", "err", err, ) - continue + if doBroadcast { + continue + } else { + return nil, errors.Wrap(err, "registry: corrupt RuntimeRegistered event") + } } - tb.runtimeNotifier.Broadcast(&rt) - } else if bytes.Equal(pair.GetKey(), app.KeyEntityRegistered) { + if doBroadcast { + tb.runtimeNotifier.Broadcast(&rt) + } else { + evt := api.Event{ + RuntimeEvent: &api.RuntimeEvent{Runtime: &rt}, + } + events = append(events, evt) + } + } else if bytes.Equal(key, app.KeyEntityRegistered) { + // Entity registered event. var ent entity.Entity - if err := cbor.Unmarshal(pair.GetValue(), &ent); err != nil { + if err := cbor.Unmarshal(val, &ent); err != nil { tb.logger.Error("worker: failed to get entity from tag", "err", err, ) - continue + if doBroadcast { + continue + } else { + return nil, errors.Wrap(err, "registry: corrupt EntityRegistered event") + } } - tb.entityNotifier.Broadcast(&api.EntityEvent{ + eev := &api.EntityEvent{ Entity: &ent, IsRegistration: true, - }) - } else if bytes.Equal(pair.GetKey(), app.KeyEntityDeregistered) { + } + + if doBroadcast { + tb.entityNotifier.Broadcast(eev) + } else { + events = append(events, api.Event{EntityEvent: eev}) + } + } else if bytes.Equal(key, app.KeyEntityDeregistered) { + // Entity deregistered event. var dereg app.EntityDeregistration - if err := cbor.Unmarshal(pair.GetValue(), &dereg); err != nil { + if err := cbor.Unmarshal(val, &dereg); err != nil { tb.logger.Error("worker: failed to get entity deregistration from tag", "err", err, ) - continue + if doBroadcast { + continue + } else { + return nil, errors.Wrap(err, "registry: corrupt EntityDeregistered event") + } } - // Entity deregistration. - tb.entityNotifier.Broadcast(&api.EntityEvent{ + eev := &api.EntityEvent{ Entity: &dereg.Entity, IsRegistration: false, - }) - } else if bytes.Equal(pair.GetKey(), app.KeyRegistryNodeListEpoch) { + } + + if doBroadcast { + tb.entityNotifier.Broadcast(eev) + } else { + events = append(events, api.Event{EntityEvent: eev}) + } + } else if bytes.Equal(key, app.KeyRegistryNodeListEpoch) && doBroadcast { + // Node list epoch event. nl, err := tb.getNodeList(ctx, height) if err != nil { tb.logger.Error("worker: failed to get node list", @@ -373,22 +337,46 @@ func (tb *tendermintBackend) onABCIEvents(ctx context.Context, events []abcitype continue } tb.nodeListNotifier.Broadcast(nl) - } else if bytes.Equal(pair.GetKey(), app.KeyNodeRegistered) { + } else if bytes.Equal(key, app.KeyNodeRegistered) { + // Node registered event. var n node.Node - if err := cbor.Unmarshal(pair.GetValue(), &n); err != nil { + if err := cbor.Unmarshal(val, &n); err != nil { tb.logger.Error("worker: failed to get node from tag", "err", err, ) - continue + if doBroadcast { + continue + } else { + return nil, errors.Wrap(err, "registry: corrupt NodeRegistered event") + } } - tb.nodeNotifier.Broadcast(&api.NodeEvent{ + nev := &api.NodeEvent{ Node: &n, IsRegistration: true, - }) + } + + if doBroadcast { + tb.nodeNotifier.Broadcast(nev) + } else { + events = append(events, api.Event{NodeEvent: nev}) + } + } else if bytes.Equal(key, app.KeyNodeUnfrozen) && !doBroadcast { + // Node unfrozen event. + var nid signature.PublicKey + if err := cbor.Unmarshal(val, &nid); err != nil { + return nil, errors.Wrap(err, "registry: corrupt NodeUnfrozen event") + } + evt := api.Event{ + NodeUnfrozenEvent: &api.NodeUnfrozenEvent{ + NodeID: nid, + }, + } + events = append(events, evt) } } } + return &events, nil } func (tb *tendermintBackend) getNodeList(ctx context.Context, height int64) (*api.NodeList, error) { diff --git a/go/consensus/tendermint/staking/staking.go b/go/consensus/tendermint/staking/staking.go index c24d296eeea..6de2fa3dce0 100644 --- a/go/consensus/tendermint/staking/staking.go +++ b/go/consensus/tendermint/staking/staking.go @@ -160,77 +160,7 @@ func (tb *tendermintBackend) GetEvents(ctx context.Context, height int64) (*[]ap for _, txResults := range results.TxsResults { tmEvents = append(tmEvents, txResults.Events...) } - events := []api.Event{} - for _, tmEv := range tmEvents { - // Ignore events that don't relate to the staking app. - if tmEv.GetType() != app.EventType { - continue - } - - for _, pair := range tmEv.GetAttributes() { - key := pair.GetKey() - val := pair.GetValue() - if bytes.Equal(key, app.KeyTransfer) { - // Transfer event. - var te api.TransferEvent - if err := cbor.Unmarshal(val, &te); err != nil { - return nil, errors.Wrap(err, "staking: corrupt Transfer event") - } - evt := api.Event{ - TransferEvent: &te, - } - events = append(events, evt) - } else if bytes.Equal(key, app.KeyBurn) { - // Burn event. - var be api.BurnEvent - if err := cbor.Unmarshal(val, &be); err != nil { - return nil, errors.Wrap(err, "staking: corrupt Burn event") - } - evt := api.Event{ - BurnEvent: &be, - } - events = append(events, evt) - } else if bytes.Equal(key, app.KeyAddEscrow) { - // Add escrow event. - var aee api.AddEscrowEvent - if err := cbor.Unmarshal(val, &aee); err != nil { - return nil, errors.Wrap(err, "staking: corrupt AddEscrow event") - } - evt := api.Event{ - EscrowEvent: &api.EscrowEvent{ - Add: &aee, - }, - } - events = append(events, evt) - } else if bytes.Equal(key, app.KeyTakeEscrow) { - // Take escrow event. - var tee api.TakeEscrowEvent - if err := cbor.Unmarshal(val, &tee); err != nil { - return nil, errors.Wrap(err, "staking: corrupt TakeEscrow event") - } - evt := api.Event{ - EscrowEvent: &api.EscrowEvent{ - Take: &tee, - }, - } - events = append(events, evt) - } else if bytes.Equal(key, app.KeyReclaimEscrow) { - // Reclaim escrow event. - var ree api.ReclaimEscrowEvent - if err := cbor.Unmarshal(val, &ree); err != nil { - return nil, errors.Wrap(err, "staking: corrupt ReclaimEscrow event") - } - evt := api.Event{ - EscrowEvent: &api.EscrowEvent{ - Reclaim: &ree, - }, - } - events = append(events, evt) - } - } - } - - return &events, nil + return tb.onABCIEvents(ctx, tmEvents, height, false) } func (tb *tendermintBackend) ConsensusParameters(ctx context.Context, height int64) (*api.ConsensusParameters, error) { @@ -284,73 +214,129 @@ func (tb *tendermintBackend) onEventDataNewBlock(ctx context.Context, ev tmtypes events := append([]abcitypes.Event{}, ev.ResultBeginBlock.GetEvents()...) events = append(events, ev.ResultEndBlock.GetEvents()...) - tb.onABCIEvents(ctx, events, ev.Block.Header.Height) + _, _ = tb.onABCIEvents(ctx, events, ev.Block.Header.Height, true) +} + +func (tb *tendermintBackend) onEventDataTx(ctx context.Context, tx tmtypes.EventDataTx) { + _, _ = tb.onABCIEvents(ctx, tx.Result.Events, tx.Height, true) } -func (tb *tendermintBackend) onABCIEvents(context context.Context, events []abcitypes.Event, height int64) { - for _, tmEv := range events { +func (tb *tendermintBackend) onABCIEvents(context context.Context, tmEvents []abcitypes.Event, height int64, doBroadcast bool) (*[]api.Event, error) { + events := []api.Event{} + for _, tmEv := range tmEvents { + // Ignore events that don't relate to the staking app. if tmEv.GetType() != app.EventType { continue } for _, pair := range tmEv.GetAttributes() { - if bytes.Equal(pair.GetKey(), app.KeyTakeEscrow) { + key := pair.GetKey() + val := pair.GetValue() + if bytes.Equal(key, app.KeyTakeEscrow) { + // Take escrow event. var e api.TakeEscrowEvent - if err := cbor.Unmarshal(pair.GetValue(), &e); err != nil { + if err := cbor.Unmarshal(val, &e); err != nil { tb.logger.Error("worker: failed to get take escrow event from tag", "err", err, ) - continue + if doBroadcast { + continue + } else { + return nil, errors.Wrap(err, "staking: corrupt TakeEscrow event") + } } - tb.escrowNotifier.Broadcast(&api.EscrowEvent{Take: &e}) - } else if bytes.Equal(pair.GetKey(), app.KeyTransfer) { + ee := &api.EscrowEvent{Take: &e} + + if doBroadcast { + tb.escrowNotifier.Broadcast(ee) + } else { + events = append(events, api.Event{EscrowEvent: ee}) + } + } else if bytes.Equal(key, app.KeyTransfer) { + // Transfer event. var e api.TransferEvent - if err := cbor.Unmarshal(pair.GetValue(), &e); err != nil { + if err := cbor.Unmarshal(val, &e); err != nil { tb.logger.Error("worker: failed to get transfer event from tag", "err", err, ) - continue + if doBroadcast { + continue + } else { + return nil, errors.Wrap(err, "staking: corrupt Transfer event") + } } - tb.transferNotifier.Broadcast(&e) - } else if bytes.Equal(pair.GetKey(), app.KeyReclaimEscrow) { + if doBroadcast { + tb.transferNotifier.Broadcast(&e) + } else { + events = append(events, api.Event{TransferEvent: &e}) + } + } else if bytes.Equal(key, app.KeyReclaimEscrow) { + // Reclaim escrow event. var e api.ReclaimEscrowEvent - if err := cbor.Unmarshal(pair.GetValue(), &e); err != nil { + if err := cbor.Unmarshal(val, &e); err != nil { tb.logger.Error("worker: failed to get reclaim escrow event from tag", "err", err, ) - continue + if doBroadcast { + continue + } else { + return nil, errors.Wrap(err, "staking: corrupt ReclaimEscrow event") + } } - tb.escrowNotifier.Broadcast(&api.EscrowEvent{Reclaim: &e}) - } else if bytes.Equal(pair.GetKey(), app.KeyAddEscrow) { + ee := &api.EscrowEvent{Reclaim: &e} + + if doBroadcast { + tb.escrowNotifier.Broadcast(ee) + } else { + events = append(events, api.Event{EscrowEvent: ee}) + } + } else if bytes.Equal(key, app.KeyAddEscrow) { + // Add escrow event. var e api.AddEscrowEvent - if err := cbor.Unmarshal(pair.GetValue(), &e); err != nil { + if err := cbor.Unmarshal(val, &e); err != nil { tb.logger.Error("worker: failed to get escrow event from tag", "err", err, ) - continue + if doBroadcast { + continue + } else { + return nil, errors.Wrap(err, "staking: corrupt AddEscrow event") + } } - tb.escrowNotifier.Broadcast(&api.EscrowEvent{Add: &e}) - } else if bytes.Equal(pair.GetKey(), app.KeyBurn) { + ee := &api.EscrowEvent{Add: &e} + + if doBroadcast { + tb.escrowNotifier.Broadcast(ee) + } else { + events = append(events, api.Event{EscrowEvent: ee}) + } + } else if bytes.Equal(key, app.KeyBurn) { + // Burn event. var e api.BurnEvent - if err := cbor.Unmarshal(pair.GetValue(), &e); err != nil { + if err := cbor.Unmarshal(val, &e); err != nil { tb.logger.Error("worker: failed to get burn event from tag", "err", err, ) - continue + if doBroadcast { + continue + } else { + return nil, errors.Wrap(err, "staking: corrupt Burn event") + } } - tb.burnNotifier.Broadcast(&e) + if doBroadcast { + tb.burnNotifier.Broadcast(&e) + } else { + events = append(events, api.Event{BurnEvent: &e}) + } } } } -} - -func (tb *tendermintBackend) onEventDataTx(ctx context.Context, tx tmtypes.EventDataTx) { - tb.onABCIEvents(ctx, tx.Result.Events, tx.Height) + return &events, nil } // New constructs a new tendermint backed staking Backend instance. From 055c3318d41b3d8a10f6de9cb80d36946f38e61f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrej=20Buko=C5=A1ek?= Date: Wed, 15 Apr 2020 09:59:56 +0200 Subject: [PATCH 7/9] registry: Add more tests for GetEvents --- go/consensus/tendermint/registry/registry.go | 10 +++++ go/registry/tests/tester.go | 42 ++++++++++++++++++++ 2 files changed, 52 insertions(+) diff --git a/go/consensus/tendermint/registry/registry.go b/go/consensus/tendermint/registry/registry.go index a185266ca83..3728b4d7d8c 100644 --- a/go/consensus/tendermint/registry/registry.go +++ b/go/consensus/tendermint/registry/registry.go @@ -255,6 +255,16 @@ func (tb *tendermintBackend) onABCIEvents(ctx context.Context, tmEvents []abcity }, } events = append(events, evt) + + // For compatibility, we also emit NodeEvents with + // IsRegistration set to false, as in the broadcast case + // above. + for _, node := range nodes { + events = append(events, api.Event{NodeEvent: &api.NodeEvent{ + Node: node, + IsRegistration: false, + }}) + } } } else if bytes.Equal(key, app.KeyRuntimeRegistered) { // Runtime registered event. diff --git a/go/registry/tests/tester.go b/go/registry/tests/tester.go index aabd7e950bb..7b90d78cd1d 100644 --- a/go/registry/tests/tester.go +++ b/go/registry/tests/tester.go @@ -330,6 +330,20 @@ func testRegistryEntityNodes( // nolint: gocyclo case ev := <-nodeCh: require.False(ev.IsRegistration, "event is deregistration") deregisteredNodes[ev.Node.ID] = ev.Node + + // Make sure that GetEvents also returns the deregistration event. + evts, grr := backend.GetEvents(context.Background(), consensusAPI.HeightLatest) + require.NoError(grr, "GetEvents") + var gotIt bool + for _, evt := range *evts { + if evt.NodeEvent != nil { + if evt.NodeEvent.Node.ID.Equal(ev.Node.ID) && !evt.NodeEvent.IsRegistration { + gotIt = true + break + } + } + } + require.EqualValues(true, gotIt, "GetEvents should return node deregistration event") case <-time.After(recvTimeout): t.Fatalf("failed to receive node deregistration event") } @@ -1163,6 +1177,20 @@ func BulkPopulate(t *testing.T, backend api.Backend, consensus consensusAPI.Back case ev := <-entityCh: require.EqualValues(entity.Entity, ev.Entity, "registered entity") require.True(ev.IsRegistration, "event is registration") + + // Make sure that GetEvents also returns the registration event. + evts, grr := backend.GetEvents(context.Background(), consensusAPI.HeightLatest) + require.NoError(grr, "GetEvents") + var gotIt bool + for _, evt := range *evts { + if evt.EntityEvent != nil { + if evt.EntityEvent.Entity.ID.Equal(ev.Entity.ID) && evt.EntityEvent.IsRegistration { + gotIt = true + break + } + } + } + require.EqualValues(true, gotIt, "GetEvents should return entity registration event") case <-time.After(recvTimeout): t.Fatalf("failed to receive entity registration event") } @@ -1196,6 +1224,20 @@ func BulkPopulate(t *testing.T, backend api.Backend, consensus consensusAPI.Back case ev := <-nodeCh: require.EqualValues(node.Node, ev.Node, "registered node") require.True(ev.IsRegistration, "event is registration") + + // Make sure that GetEvents also returns the registration event. + evts, grr := backend.GetEvents(context.Background(), consensusAPI.HeightLatest) + require.NoError(grr, "GetEvents") + var gotIt bool + for _, evt := range *evts { + if evt.NodeEvent != nil { + if evt.NodeEvent.Node.ID.Equal(ev.Node.ID) && evt.NodeEvent.IsRegistration { + gotIt = true + break + } + } + } + require.EqualValues(true, gotIt, "GetEvents should return node registration event") case <-time.After(recvTimeout): t.Fatalf("failed to receive node registration event") } From d534837c973729db310041803f89919f753477e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrej=20Buko=C5=A1ek?= Date: Wed, 15 Apr 2020 10:31:23 +0200 Subject: [PATCH 8/9] Cleanup code --- go/consensus/tendermint/registry/registry.go | 28 +++++++++++--------- go/consensus/tendermint/roothash/roothash.go | 16 +++++------ go/consensus/tendermint/staking/staking.go | 21 +++++++-------- go/registry/api/api.go | 28 ++++++++++---------- go/registry/api/grpc.go | 4 +-- go/registry/tests/tester.go | 16 +++++------ go/roothash/api/api.go | 6 ++--- go/roothash/tests/tester.go | 2 +- go/staking/api/api.go | 8 +++--- go/staking/api/grpc.go | 4 +-- go/staking/tests/tester.go | 10 +++---- 11 files changed, 72 insertions(+), 71 deletions(-) diff --git a/go/consensus/tendermint/registry/registry.go b/go/consensus/tendermint/registry/registry.go index 3728b4d7d8c..e40eb812a40 100644 --- a/go/consensus/tendermint/registry/registry.go +++ b/go/consensus/tendermint/registry/registry.go @@ -4,9 +4,9 @@ package registry import ( "bytes" "context" + "fmt" "github.com/eapache/channels" - "github.com/pkg/errors" abcitypes "github.com/tendermint/tendermint/abci/types" tmrpctypes "github.com/tendermint/tendermint/rpc/core/types" tmtypes "github.com/tendermint/tendermint/types" @@ -152,7 +152,7 @@ func (tb *tendermintBackend) StateToGenesis(ctx context.Context, height int64) ( return q.Genesis(ctx) } -func (tb *tendermintBackend) GetEvents(ctx context.Context, height int64) (*[]api.Event, error) { +func (tb *tendermintBackend) GetEvents(ctx context.Context, height int64) ([]api.Event, error) { // Get block results at given height. var results *tmrpctypes.ResultBlockResults results, err := tb.service.GetBlockResults(height) @@ -218,8 +218,8 @@ func (tb *tendermintBackend) onEventDataTx(ctx context.Context, tx tmtypes.Event _, _ = tb.onABCIEvents(ctx, tx.Result.Events, tx.Height, true) } -func (tb *tendermintBackend) onABCIEvents(ctx context.Context, tmEvents []abcitypes.Event, height int64, doBroadcast bool) (*[]api.Event, error) { // nolint: gocyclo - events := []api.Event{} +func (tb *tendermintBackend) onABCIEvents(ctx context.Context, tmEvents []abcitypes.Event, height int64, doBroadcast bool) ([]api.Event, error) { // nolint: gocyclo + var events []api.Event for _, tmEv := range tmEvents { // Ignore events that don't relate to the registry app. if tmEv.GetType() != app.EventType { @@ -236,8 +236,10 @@ func (tb *tendermintBackend) onABCIEvents(ctx context.Context, tmEvents []abcity tb.logger.Error("worker: failed to get nodes from tag", "err", err, ) - if !doBroadcast { - return nil, errors.Wrap(err, "registry: corrupt NodesExpired event") + if doBroadcast { + continue + } else { + return nil, fmt.Errorf("registry: corrupt NodesExpired event: %w", err) } } @@ -276,7 +278,7 @@ func (tb *tendermintBackend) onABCIEvents(ctx context.Context, tmEvents []abcity if doBroadcast { continue } else { - return nil, errors.Wrap(err, "registry: corrupt RuntimeRegistered event") + return nil, fmt.Errorf("registry: corrupt RuntimeRegistered event: %w", err) } } @@ -298,7 +300,7 @@ func (tb *tendermintBackend) onABCIEvents(ctx context.Context, tmEvents []abcity if doBroadcast { continue } else { - return nil, errors.Wrap(err, "registry: corrupt EntityRegistered event") + return nil, fmt.Errorf("registry: corrupt EntityRegistered event: %w", err) } } @@ -322,7 +324,7 @@ func (tb *tendermintBackend) onABCIEvents(ctx context.Context, tmEvents []abcity if doBroadcast { continue } else { - return nil, errors.Wrap(err, "registry: corrupt EntityDeregistered event") + return nil, fmt.Errorf("registry: corrupt EntityDeregistered event: %w", err) } } @@ -357,7 +359,7 @@ func (tb *tendermintBackend) onABCIEvents(ctx context.Context, tmEvents []abcity if doBroadcast { continue } else { - return nil, errors.Wrap(err, "registry: corrupt NodeRegistered event") + return nil, fmt.Errorf("registry: corrupt NodeRegistered event: %w", err) } } @@ -375,7 +377,7 @@ func (tb *tendermintBackend) onABCIEvents(ctx context.Context, tmEvents []abcity // Node unfrozen event. var nid signature.PublicKey if err := cbor.Unmarshal(val, &nid); err != nil { - return nil, errors.Wrap(err, "registry: corrupt NodeUnfrozen event") + return nil, fmt.Errorf("registry: corrupt NodeUnfrozen event: %w", err) } evt := api.Event{ NodeUnfrozenEvent: &api.NodeUnfrozenEvent{ @@ -386,7 +388,7 @@ func (tb *tendermintBackend) onABCIEvents(ctx context.Context, tmEvents []abcity } } } - return &events, nil + return events, nil } func (tb *tendermintBackend) getNodeList(ctx context.Context, height int64) (*api.NodeList, error) { @@ -398,7 +400,7 @@ func (tb *tendermintBackend) getNodeList(ctx context.Context, height int64) (*ap nodes, err := q.Nodes(ctx) if err != nil { - return nil, errors.Wrap(err, "registry: failed to query nodes") + return nil, fmt.Errorf("registry: failed to query nodes: %w", err) } api.SortNodeList(nodes) diff --git a/go/consensus/tendermint/roothash/roothash.go b/go/consensus/tendermint/roothash/roothash.go index 65b1776dab1..308a34f008d 100644 --- a/go/consensus/tendermint/roothash/roothash.go +++ b/go/consensus/tendermint/roothash/roothash.go @@ -4,11 +4,11 @@ package roothash import ( "bytes" "context" + "fmt" "math" "sync" "github.com/eapache/channels" - "github.com/pkg/errors" "github.com/tendermint/tendermint/abci/types" tmrpctypes "github.com/tendermint/tendermint/rpc/core/types" tmtypes "github.com/tendermint/tendermint/types" @@ -146,16 +146,16 @@ func (tb *tendermintBackend) WatchBlocks(id common.Namespace) (<-chan *api.Annot func (tb *tendermintBackend) getBlockFromFinalizedTag(ctx context.Context, rawValue []byte, height int64) (*block.Block, *app.ValueFinalized, error) { var value app.ValueFinalized if err := cbor.Unmarshal(rawValue, &value); err != nil { - return nil, nil, errors.Wrap(err, "roothash: corrupt finalized tag") + return nil, nil, fmt.Errorf("roothash: corrupt finalized tag: %w", err) } block, err := tb.getLatestBlockAt(ctx, value.ID, height) if err != nil { - return nil, nil, errors.Wrap(err, "roothash: failed to fetch block") + return nil, nil, fmt.Errorf("roothash: failed to fetch block: %w", err) } if block.Header.Round != value.Round { - return nil, nil, errors.Errorf("roothash: tag/query round mismatch (tag: %d, query: %d)", value.Round, block.Header.Round) + return nil, nil, fmt.Errorf("roothash: tag/query round mismatch (tag: %d, query: %d)", value.Round, block.Header.Round) } return block, &value, nil @@ -196,7 +196,7 @@ func (tb *tendermintBackend) StateToGenesis(ctx context.Context, height int64) ( return q.Genesis(ctx) } -func (tb *tendermintBackend) GetEvents(ctx context.Context, height int64) (*[]api.Event, error) { +func (tb *tendermintBackend) GetEvents(ctx context.Context, height int64) ([]api.Event, error) { // Get block results at given height. var results *tmrpctypes.ResultBlockResults results, err := tb.service.GetBlockResults(height) @@ -213,7 +213,7 @@ func (tb *tendermintBackend) GetEvents(ctx context.Context, height int64) (*[]ap for _, txResults := range results.TxsResults { tmEvents = append(tmEvents, txResults.Events...) } - events := []api.Event{} + var events []api.Event for _, tmEv := range tmEvents { // Ignore events that don't relate to the roothash app. if tmEv.GetType() != app.EventType { @@ -231,7 +231,7 @@ func (tb *tendermintBackend) GetEvents(ctx context.Context, height int64) (*[]ap // Execution discrepancy event. var eddValue app.ValueExecutionDiscrepancyDetected if err := cbor.Unmarshal(pair.GetValue(), &eddValue); err != nil { - return nil, errors.Wrap(err, "roothash: corrupt ExecutionDiscrepancyDetected tag") + return nil, fmt.Errorf("roothash: corrupt ExecutionDiscrepancyDetected event: %w", err) } evt := api.Event{ ExecutionDiscrepancyDetected: &eddValue.Event, @@ -241,7 +241,7 @@ func (tb *tendermintBackend) GetEvents(ctx context.Context, height int64) (*[]ap } } - return &events, nil + return events, nil } func (tb *tendermintBackend) Cleanup() { diff --git a/go/consensus/tendermint/staking/staking.go b/go/consensus/tendermint/staking/staking.go index 6de2fa3dce0..7c9652d034a 100644 --- a/go/consensus/tendermint/staking/staking.go +++ b/go/consensus/tendermint/staking/staking.go @@ -4,8 +4,7 @@ package staking import ( "bytes" "context" - - "github.com/pkg/errors" + "fmt" abcitypes "github.com/tendermint/tendermint/abci/types" tmrpctypes "github.com/tendermint/tendermint/rpc/core/types" @@ -143,7 +142,7 @@ func (tb *tendermintBackend) StateToGenesis(ctx context.Context, height int64) ( return q.Genesis(ctx) } -func (tb *tendermintBackend) GetEvents(ctx context.Context, height int64) (*[]api.Event, error) { +func (tb *tendermintBackend) GetEvents(ctx context.Context, height int64) ([]api.Event, error) { // Get block results at given height. var results *tmrpctypes.ResultBlockResults results, err := tb.service.GetBlockResults(height) @@ -221,8 +220,8 @@ func (tb *tendermintBackend) onEventDataTx(ctx context.Context, tx tmtypes.Event _, _ = tb.onABCIEvents(ctx, tx.Result.Events, tx.Height, true) } -func (tb *tendermintBackend) onABCIEvents(context context.Context, tmEvents []abcitypes.Event, height int64, doBroadcast bool) (*[]api.Event, error) { - events := []api.Event{} +func (tb *tendermintBackend) onABCIEvents(context context.Context, tmEvents []abcitypes.Event, height int64, doBroadcast bool) ([]api.Event, error) { + var events []api.Event for _, tmEv := range tmEvents { // Ignore events that don't relate to the staking app. if tmEv.GetType() != app.EventType { @@ -242,7 +241,7 @@ func (tb *tendermintBackend) onABCIEvents(context context.Context, tmEvents []ab if doBroadcast { continue } else { - return nil, errors.Wrap(err, "staking: corrupt TakeEscrow event") + return nil, fmt.Errorf("staking: corrupt TakeEscrow event: %w", err) } } @@ -263,7 +262,7 @@ func (tb *tendermintBackend) onABCIEvents(context context.Context, tmEvents []ab if doBroadcast { continue } else { - return nil, errors.Wrap(err, "staking: corrupt Transfer event") + return nil, fmt.Errorf("staking: corrupt Transfer event: %w", err) } } @@ -282,7 +281,7 @@ func (tb *tendermintBackend) onABCIEvents(context context.Context, tmEvents []ab if doBroadcast { continue } else { - return nil, errors.Wrap(err, "staking: corrupt ReclaimEscrow event") + return nil, fmt.Errorf("staking: corrupt ReclaimEscrow event: %w", err) } } @@ -303,7 +302,7 @@ func (tb *tendermintBackend) onABCIEvents(context context.Context, tmEvents []ab if doBroadcast { continue } else { - return nil, errors.Wrap(err, "staking: corrupt AddEscrow event") + return nil, fmt.Errorf("staking: corrupt AddEscrow event: %w", err) } } @@ -324,7 +323,7 @@ func (tb *tendermintBackend) onABCIEvents(context context.Context, tmEvents []ab if doBroadcast { continue } else { - return nil, errors.Wrap(err, "staking: corrupt Burn event") + return nil, fmt.Errorf("staking: corrupt Burn event: %w", err) } } @@ -336,7 +335,7 @@ func (tb *tendermintBackend) onABCIEvents(context context.Context, tmEvents []ab } } } - return &events, nil + return events, nil } // New constructs a new tendermint backed staking Backend instance. diff --git a/go/registry/api/api.go b/go/registry/api/api.go index 0cfe6285097..4c0b84d7c1e 100644 --- a/go/registry/api/api.go +++ b/go/registry/api/api.go @@ -227,7 +227,7 @@ type Backend interface { StateToGenesis(context.Context, int64) (*Genesis, error) // GetEvents returns the events at specified block height. - GetEvents(ctx context.Context, height int64) (*[]Event, error) + GetEvents(ctx context.Context, height int64) ([]Event, error) // Cleanup cleans up the registry backend. Cleanup() @@ -273,44 +273,44 @@ func NewRegisterRuntimeTx(nonce uint64, fee *transaction.Fee, sigRt *SignedRunti // EntityEvent is the event that is returned via WatchEntities to signify // entity registration changes and updates. type EntityEvent struct { - Entity *entity.Entity - IsRegistration bool + Entity *entity.Entity `json:"entity"` + IsRegistration bool `json:"is_registration"` } // NodeEvent is the event that is returned via WatchNodes to signify node // registration changes and updates. type NodeEvent struct { - Node *node.Node - IsRegistration bool + Node *node.Node `json:"node"` + IsRegistration bool `json:"is_registration"` } // RuntimeEvent signifies new runtime registration. type RuntimeEvent struct { - Runtime *Runtime + Runtime *Runtime `json:"runtime"` } // NodesExpiredEvent signifies node expirations. type NodesExpiredEvent struct { - Nodes []*node.Node + Nodes []*node.Node `json:"nodes"` } // NodeUnfrozenEvent signifies when node becomes unfrozen. type NodeUnfrozenEvent struct { - NodeID signature.PublicKey + NodeID signature.PublicKey `json:"node_id"` } // Event is a registry event returned via GetEvents. type Event struct { - RuntimeEvent *RuntimeEvent - EntityEvent *EntityEvent - NodeEvent *NodeEvent - NodesExpiredEvent *NodesExpiredEvent - NodeUnfrozenEvent *NodeUnfrozenEvent + RuntimeEvent *RuntimeEvent `json:"runtime,omitempty"` + EntityEvent *EntityEvent `json:"entity,omitempty"` + NodeEvent *NodeEvent `json:"node,omitempty"` + NodesExpiredEvent *NodesExpiredEvent `json:"nodes_expired,omitempty"` + NodeUnfrozenEvent *NodeUnfrozenEvent `json:"node_unfrozen,omitempty"` } // NodeList is a per-epoch immutable node list. type NodeList struct { - Nodes []*node.Node + Nodes []*node.Node `json:"nodes"` } // NodeLookup interface implements various ways for the verification diff --git a/go/registry/api/grpc.go b/go/registry/api/grpc.go index 326e468cc02..191e3ce8bdc 100644 --- a/go/registry/api/grpc.go +++ b/go/registry/api/grpc.go @@ -679,8 +679,8 @@ func (c *registryClient) StateToGenesis(ctx context.Context, height int64) (*Gen return &rsp, nil } -func (c *registryClient) GetEvents(ctx context.Context, height int64) (*[]Event, error) { - var rsp *[]Event +func (c *registryClient) GetEvents(ctx context.Context, height int64) ([]Event, error) { + var rsp []Event if err := c.conn.Invoke(ctx, methodGetEvents.FullName(), height, &rsp); err != nil { return nil, err } diff --git a/go/registry/tests/tester.go b/go/registry/tests/tester.go index 7b90d78cd1d..2a8ea4b18de 100644 --- a/go/registry/tests/tester.go +++ b/go/registry/tests/tester.go @@ -90,7 +90,7 @@ func testRegistryEntityNodes( // nolint: gocyclo evts, grr := backend.GetEvents(context.Background(), consensusAPI.HeightLatest) require.NoError(grr, "GetEvents") var gotIt bool - for _, evt := range *evts { + for _, evt := range evts { if evt.EntityEvent != nil { if evt.EntityEvent.Entity.ID.Equal(ev.Entity.ID) && evt.EntityEvent.IsRegistration { gotIt = true @@ -186,7 +186,7 @@ func testRegistryEntityNodes( // nolint: gocyclo evts, grr := backend.GetEvents(context.Background(), consensusAPI.HeightLatest) require.NoError(grr, "GetEvents") var gotIt bool - for _, evt := range *evts { + for _, evt := range evts { if evt.NodeEvent != nil { if evt.NodeEvent.Node.ID.Equal(tn.Node.ID) && evt.NodeEvent.IsRegistration { gotIt = true @@ -335,7 +335,7 @@ func testRegistryEntityNodes( // nolint: gocyclo evts, grr := backend.GetEvents(context.Background(), consensusAPI.HeightLatest) require.NoError(grr, "GetEvents") var gotIt bool - for _, evt := range *evts { + for _, evt := range evts { if evt.NodeEvent != nil { if evt.NodeEvent.Node.ID.Equal(ev.Node.ID) && !evt.NodeEvent.IsRegistration { gotIt = true @@ -403,7 +403,7 @@ func testRegistryEntityNodes( // nolint: gocyclo evts, err := backend.GetEvents(context.Background(), consensusAPI.HeightLatest) require.NoError(err, "GetEvents") var gotIt bool - for _, evt := range *evts { + for _, evt := range evts { if evt.EntityEvent != nil { if evt.EntityEvent.Entity.ID.Equal(ev.Entity.ID) && !evt.EntityEvent.IsRegistration { gotIt = true @@ -440,7 +440,7 @@ func testRegistryEntityNodes( // nolint: gocyclo evts, err := backend.GetEvents(context.Background(), consensusAPI.HeightLatest) require.NoError(err, "GetEvents") var gotIt bool - for _, evt := range *evts { + for _, evt := range evts { if evt.EntityEvent != nil { if evt.EntityEvent.Entity.ID.Equal(ev.Entity.ID) && !evt.EntityEvent.IsRegistration { gotIt = true @@ -1115,7 +1115,7 @@ func (rt *TestRuntime) MustRegister(t *testing.T, backend api.Backend, consensus evts, err := backend.GetEvents(context.Background(), consensusAPI.HeightLatest) require.NoError(err, "GetEvents") var gotIt bool - for _, evt := range *evts { + for _, evt := range evts { if evt.RuntimeEvent != nil { if evt.RuntimeEvent.Runtime.ID.Equal(&v.ID) { gotIt = true @@ -1182,7 +1182,7 @@ func BulkPopulate(t *testing.T, backend api.Backend, consensus consensusAPI.Back evts, grr := backend.GetEvents(context.Background(), consensusAPI.HeightLatest) require.NoError(grr, "GetEvents") var gotIt bool - for _, evt := range *evts { + for _, evt := range evts { if evt.EntityEvent != nil { if evt.EntityEvent.Entity.ID.Equal(ev.Entity.ID) && evt.EntityEvent.IsRegistration { gotIt = true @@ -1229,7 +1229,7 @@ func BulkPopulate(t *testing.T, backend api.Backend, consensus consensusAPI.Back evts, grr := backend.GetEvents(context.Background(), consensusAPI.HeightLatest) require.NoError(grr, "GetEvents") var gotIt bool - for _, evt := range *evts { + for _, evt := range evts { if evt.NodeEvent != nil { if evt.NodeEvent.Node.ID.Equal(ev.Node.ID) && evt.NodeEvent.IsRegistration { gotIt = true diff --git a/go/roothash/api/api.go b/go/roothash/api/api.go index 8eee49377f1..d3b83b3e195 100644 --- a/go/roothash/api/api.go +++ b/go/roothash/api/api.go @@ -92,7 +92,7 @@ type Backend interface { StateToGenesis(ctx context.Context, height int64) (*Genesis, error) // GetEvents returns the events at specified block height. - GetEvents(ctx context.Context, height int64) (*[]Event, error) + GetEvents(ctx context.Context, height int64) ([]Event, error) // Cleanup cleans up the roothash backend. Cleanup() @@ -151,8 +151,8 @@ type MergeDiscrepancyDetectedEvent struct { // Event is a protocol event. type Event struct { - ExecutionDiscrepancyDetected *ExecutionDiscrepancyDetectedEvent - MergeDiscrepancyDetected *MergeDiscrepancyDetectedEvent + ExecutionDiscrepancyDetected *ExecutionDiscrepancyDetectedEvent `json:"execution_discrepancy,omitempty"` + MergeDiscrepancyDetected *MergeDiscrepancyDetectedEvent `json:"merge_discrepancy,omitempty"` } // MetricsMonitorable is the interface exposed by backends capable of diff --git a/go/roothash/tests/tester.go b/go/roothash/tests/tester.go index db7dc982c70..a35105dc4d0 100644 --- a/go/roothash/tests/tester.go +++ b/go/roothash/tests/tester.go @@ -380,7 +380,7 @@ func (s *runtimeState) testSuccessfulRound(t *testing.T, backend api.Backend, co // There should be no discrepancy events. evts, err := backend.GetEvents(ctx, consensusAPI.HeightLatest) require.NoError(err, "GetEvents") - require.EqualValues(0, len(*evts), "should have no discrepancy events") + require.EqualValues(0, len(evts), "should have no discrepancy events") // Nothing more to do after the block was received. return diff --git a/go/staking/api/api.go b/go/staking/api/api.go index 9bc914366b8..349811fc38a 100644 --- a/go/staking/api/api.go +++ b/go/staking/api/api.go @@ -115,7 +115,7 @@ type Backend interface { WatchEscrows(ctx context.Context) (<-chan *EscrowEvent, pubsub.ClosableSubscription, error) // GetEvents returns the events at specified block height. - GetEvents(ctx context.Context, height int64) (*[]Event, error) + GetEvents(ctx context.Context, height int64) ([]Event, error) // Cleanup cleans up the backend. Cleanup() @@ -156,9 +156,9 @@ type EscrowEvent struct { // Event signifies a staking event, returned via GetEvents. type Event struct { - TransferEvent *TransferEvent - BurnEvent *BurnEvent - EscrowEvent *EscrowEvent + TransferEvent *TransferEvent `json:"transfer,omitempty"` + BurnEvent *BurnEvent `json:"burn,omitempty"` + EscrowEvent *EscrowEvent `json:"escrow,omitempty"` } // AddEscrowEvent is the event emitted when a balance is transfered into a escrow diff --git a/go/staking/api/grpc.go b/go/staking/api/grpc.go index 8301763c615..6f68b836dc2 100644 --- a/go/staking/api/grpc.go +++ b/go/staking/api/grpc.go @@ -541,8 +541,8 @@ func (c *stakingClient) ConsensusParameters(ctx context.Context, height int64) ( return &rsp, nil } -func (c *stakingClient) GetEvents(ctx context.Context, height int64) (*[]Event, error) { - var rsp *[]Event +func (c *stakingClient) GetEvents(ctx context.Context, height int64) ([]Event, error) { + var rsp []Event if err := c.conn.Invoke(ctx, methodGetEvents.FullName(), height, &rsp); err != nil { return nil, err } diff --git a/go/staking/tests/tester.go b/go/staking/tests/tester.go index 42d86161498..536c1963154 100644 --- a/go/staking/tests/tester.go +++ b/go/staking/tests/tester.go @@ -171,7 +171,7 @@ func testTransfer(t *testing.T, backend api.Backend, consensus consensusAPI.Back evts, grr := backend.GetEvents(context.Background(), consensusAPI.HeightLatest) require.NoError(grr, "GetEvents") var gotIt bool - for _, evt := range *evts { + for _, evt := range evts { if evt.TransferEvent != nil { if evt.TransferEvent.From.Equal(ev.From) && evt.TransferEvent.To.Equal(ev.To) && evt.TransferEvent.Tokens.Cmp(&ev.Tokens) == 0 { gotIt = true @@ -275,7 +275,7 @@ func testBurn(t *testing.T, backend api.Backend, consensus consensusAPI.Backend) evts, grr := backend.GetEvents(context.Background(), consensusAPI.HeightLatest) require.NoError(grr, "GetEvents") var gotIt bool - for _, evt := range *evts { + for _, evt := range evts { if evt.BurnEvent != nil { if evt.BurnEvent.Owner.Equal(ev.Owner) && evt.BurnEvent.Tokens.Cmp(&ev.Tokens) == 0 { gotIt = true @@ -359,7 +359,7 @@ func testEscrowEx( // nolint: gocyclo evts, grr := backend.GetEvents(context.Background(), consensusAPI.HeightLatest) require.NoError(grr, "GetEvents") var gotIt bool - for _, evt := range *evts { + for _, evt := range evts { if evt.EscrowEvent != nil && evt.EscrowEvent.Add != nil { if evt.EscrowEvent.Add.Owner.Equal(ev.Owner) && evt.EscrowEvent.Add.Escrow.Equal(ev.Escrow) && evt.EscrowEvent.Add.Tokens.Cmp(&ev.Tokens) == 0 { gotIt = true @@ -421,7 +421,7 @@ func testEscrowEx( // nolint: gocyclo evts, grr := backend.GetEvents(context.Background(), consensusAPI.HeightLatest) require.NoError(grr, "GetEvents") var gotIt bool - for _, evt := range *evts { + for _, evt := range evts { if evt.EscrowEvent != nil && evt.EscrowEvent.Add != nil { if evt.EscrowEvent.Add.Owner.Equal(ev.Owner) && evt.EscrowEvent.Add.Escrow.Equal(ev.Escrow) && evt.EscrowEvent.Add.Tokens.Cmp(&ev.Tokens) == 0 { gotIt = true @@ -497,7 +497,7 @@ func testEscrowEx( // nolint: gocyclo evts, grr := backend.GetEvents(context.Background(), consensusAPI.HeightLatest) require.NoError(grr, "GetEvents") var gotIt bool - for _, evt := range *evts { + for _, evt := range evts { if evt.EscrowEvent != nil && evt.EscrowEvent.Reclaim != nil { if evt.EscrowEvent.Reclaim.Owner.Equal(ev.Owner) && evt.EscrowEvent.Reclaim.Escrow.Equal(ev.Escrow) && evt.EscrowEvent.Reclaim.Tokens.Cmp(&ev.Tokens) == 0 { gotIt = true From af87d1d164f9414874ffdfbdb446cd1abb85f037 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrej=20Buko=C5=A1ek?= Date: Wed, 15 Apr 2020 13:11:01 +0200 Subject: [PATCH 9/9] registry: Remove redundant NodesExpirationEvent --- go/consensus/tendermint/registry/registry.go | 30 ++++++-------------- go/registry/api/api.go | 6 ---- 2 files changed, 9 insertions(+), 27 deletions(-) diff --git a/go/consensus/tendermint/registry/registry.go b/go/consensus/tendermint/registry/registry.go index e40eb812a40..aff6b27cc29 100644 --- a/go/consensus/tendermint/registry/registry.go +++ b/go/consensus/tendermint/registry/registry.go @@ -243,29 +243,17 @@ func (tb *tendermintBackend) onABCIEvents(ctx context.Context, tmEvents []abcity } } - if doBroadcast { - for _, node := range nodes { - tb.nodeNotifier.Broadcast(&api.NodeEvent{ - Node: node, - IsRegistration: false, - }) - } - } else { - evt := api.Event{ - NodesExpiredEvent: &api.NodesExpiredEvent{ - Nodes: nodes, - }, + // Generate node deregistration events. + for _, node := range nodes { + ne := &api.NodeEvent{ + Node: node, + IsRegistration: false, } - events = append(events, evt) - // For compatibility, we also emit NodeEvents with - // IsRegistration set to false, as in the broadcast case - // above. - for _, node := range nodes { - events = append(events, api.Event{NodeEvent: &api.NodeEvent{ - Node: node, - IsRegistration: false, - }}) + if doBroadcast { + tb.nodeNotifier.Broadcast(ne) + } else { + events = append(events, api.Event{NodeEvent: ne}) } } } else if bytes.Equal(key, app.KeyRuntimeRegistered) { diff --git a/go/registry/api/api.go b/go/registry/api/api.go index 4c0b84d7c1e..72475d9fe16 100644 --- a/go/registry/api/api.go +++ b/go/registry/api/api.go @@ -289,11 +289,6 @@ type RuntimeEvent struct { Runtime *Runtime `json:"runtime"` } -// NodesExpiredEvent signifies node expirations. -type NodesExpiredEvent struct { - Nodes []*node.Node `json:"nodes"` -} - // NodeUnfrozenEvent signifies when node becomes unfrozen. type NodeUnfrozenEvent struct { NodeID signature.PublicKey `json:"node_id"` @@ -304,7 +299,6 @@ type Event struct { RuntimeEvent *RuntimeEvent `json:"runtime,omitempty"` EntityEvent *EntityEvent `json:"entity,omitempty"` NodeEvent *NodeEvent `json:"node,omitempty"` - NodesExpiredEvent *NodesExpiredEvent `json:"nodes_expired,omitempty"` NodeUnfrozenEvent *NodeUnfrozenEvent `json:"node_unfrozen,omitempty"` }