Skip to content

Commit

Permalink
go/registry/api: Add GetNodeByConesnusID method
Browse files Browse the repository at this point in the history
  • Loading branch information
ptrus committed Jul 1, 2020
1 parent 1aabc70 commit 65a8632
Show file tree
Hide file tree
Showing 8 changed files with 111 additions and 22 deletions.
4 changes: 4 additions & 0 deletions .changelog/2961.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
go/registry/api: Add `GetNodeByConsensusAddress` method

`GetNodeByConsensusAddress` can be used to query nodes by their Consensus
address.
5 changes: 5 additions & 0 deletions go/consensus/tendermint/apps/registry/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type Query interface {
Entity(context.Context, signature.PublicKey) (*entity.Entity, error)
Entities(context.Context) ([]*entity.Entity, error)
Node(context.Context, signature.PublicKey) (*node.Node, error)
NodeByConsensusAddress(context.Context, []byte) (*node.Node, error)
NodeStatus(context.Context, signature.PublicKey) (*registry.NodeStatus, error)
Nodes(context.Context) ([]*node.Node, error)
Runtime(context.Context, common.Namespace) (*registry.Runtime, error)
Expand Down Expand Up @@ -71,6 +72,10 @@ func (rq *registryQuerier) Node(ctx context.Context, id signature.PublicKey) (*n
return node, nil
}

func (rq *registryQuerier) NodeByConsensusAddress(ctx context.Context, address []byte) (*node.Node, error) {
return rq.state.NodeByConsensusAddress(ctx, address)
}

func (rq *registryQuerier) NodeStatus(ctx context.Context, id signature.PublicKey) (*registry.NodeStatus, error) {
return rq.state.NodeStatus(ctx, id)
}
Expand Down
9 changes: 9 additions & 0 deletions go/consensus/tendermint/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,15 @@ func (tb *tendermintBackend) GetNodes(ctx context.Context, height int64) ([]*nod
return q.Nodes(ctx)
}

func (tb *tendermintBackend) GetNodeByConsensusAddress(ctx context.Context, query *api.ConsensusAddressQuery) (*node.Node, error) {
q, err := tb.querier.QueryAt(ctx, query.Height)
if err != nil {
return nil, err
}

return q.NodeByConsensusAddress(ctx, query.Address)
}

func (tb *tendermintBackend) WatchNodes(ctx context.Context) (<-chan *api.NodeEvent, pubsub.ClosableSubscription, error) {
typedCh := make(chan *api.NodeEvent)
sub := tb.nodeNotifier.Subscribe()
Expand Down
12 changes: 12 additions & 0 deletions go/oasis-node/cmd/debug/txsource/workload/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/oasisprotocol/oasis-core/go/common/node"
"github.com/oasisprotocol/oasis-core/go/common/quantity"
consensus "github.com/oasisprotocol/oasis-core/go/consensus/api"
tmcrypto "github.com/oasisprotocol/oasis-core/go/consensus/tendermint/crypto"
control "github.com/oasisprotocol/oasis-core/go/control/api"
epochtime "github.com/oasisprotocol/oasis-core/go/epochtime/api"
registry "github.com/oasisprotocol/oasis-core/go/registry/api"
Expand Down Expand Up @@ -213,6 +214,17 @@ func (q *queries) doRegistryQueries(ctx context.Context, rng *rand.Rand, height
if err != nil {
return fmt.Errorf("GetNodeStatus error at height %d: %w", height, err)
}
node, err = q.registry.GetNodeByConsensusAddress(
ctx,
&registry.ConsensusAddressQuery{
Address: []byte(tmcrypto.PublicKeyToTendermint(&nod.ID).Address()), Height: height},
)
if err != nil {
return fmt.Errorf("GetNodeByConsensusAddress error at height %d: %w", height, err)
}
if !nod.ID.Equal(node.ID) {
return fmt.Errorf("GetNodeByConsensusAddress mismatch, expected: %s, got: %s", nod, node)
}
}

// Runtimes.
Expand Down
14 changes: 13 additions & 1 deletion go/registry/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,10 @@ type Backend interface {
// GetNodes gets a list of all registered nodes.
GetNodes(context.Context, int64) ([]*node.Node, error)

// GetNodeByConsensusAddress returns the Node by Consensus address at the
// specified block height.
GetNodeByConsensusAddress(context.Context, *ConsensusAddressQuery) (*node.Node, error)

// WatchNodes returns a channel that produces a stream of
// NodeEvent on node registration changes.
WatchNodes(context.Context) (<-chan *NodeEvent, pubsub.ClosableSubscription, error)
Expand All @@ -204,7 +208,7 @@ type Backend interface {
// Upon subscription, the node list for the current epoch will be sent
// immediately if available.
//
// Each node list will be sorted by node ID in lexographically ascending
// Each node list will be sorted by node ID in lexicographically ascending
// order.
WatchNodeList(context.Context) (<-chan *NodeList, pubsub.ClosableSubscription, error)

Expand Down Expand Up @@ -241,6 +245,14 @@ type NamespaceQuery struct {
ID common.Namespace `json:"id"`
}

// ConsensusAddressQuery is a registry query by consensus address.
// Address is a passed as binary blob and it depends on the actual consensus
// implementation.
type ConsensusAddressQuery struct {
Height int64 `json:"height"`
Address []byte `json:"address"`
}

// NewRegisterEntityTx creates a new register entity transaction.
func NewRegisterEntityTx(nonce uint64, fee *transaction.Fee, sigEnt *entity.SignedEntity) *transaction.Transaction {
return transaction.NewTransaction(nonce, fee, MethodRegisterEntity, sigEnt)
Expand Down
37 changes: 37 additions & 0 deletions go/registry/api/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ var (
methodGetEntities = serviceName.NewMethod("GetEntities", int64(0))
// methodGetNode is the GetNode method.
methodGetNode = serviceName.NewMethod("GetNode", IDQuery{})
// methodGetNodeByConsensusAddress is the GetNodeByConsensusAddress method.
methodGetNodeByConsensusAddress = serviceName.NewMethod("GetNodeByConsensusAddress", IDQuery{})
// methodGetNodeStatus is the GetNodeStatus method.
methodGetNodeStatus = serviceName.NewMethod("GetNodeStatus", IDQuery{})
// methodGetNodes is the GetNodes method.
Expand Down Expand Up @@ -60,6 +62,10 @@ var (
MethodName: methodGetNode.ShortName(),
Handler: handlerGetNode,
},
{
MethodName: methodGetNodeByConsensusAddress.ShortName(),
Handler: handlerGetNodeByConsensusAddress,
},
{
MethodName: methodGetNodeStatus.ShortName(),
Handler: handlerGetNodeStatus,
Expand Down Expand Up @@ -179,6 +185,29 @@ func handlerGetNode( // nolint: golint
return interceptor(ctx, &query, info, handler)
}

func handlerGetNodeByConsensusAddress( // nolint: golint
srv interface{},
ctx context.Context,
dec func(interface{}) error,
interceptor grpc.UnaryServerInterceptor,
) (interface{}, error) {
var query ConsensusAddressQuery
if err := dec(&query); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(Backend).GetNodeByConsensusAddress(ctx, &query)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: methodGetNodeByConsensusAddress.FullName(),
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(Backend).GetNodeByConsensusAddress(ctx, req.(*ConsensusAddressQuery))
}
return interceptor(ctx, &query, info, handler)
}

func handlerGetNodeStatus( // nolint: golint
srv interface{},
ctx context.Context,
Expand Down Expand Up @@ -497,6 +526,14 @@ func (c *registryClient) GetNode(ctx context.Context, query *IDQuery) (*node.Nod
return &rsp, nil
}

func (c *registryClient) GetNodeByConsensusAddress(ctx context.Context, query *ConsensusAddressQuery) (*node.Node, error) {
var rsp node.Node
if err := c.conn.Invoke(ctx, methodGetNodeByConsensusAddress.FullName(), query, &rsp); err != nil {
return nil, err
}
return &rsp, nil
}

func (c *registryClient) GetNodeStatus(ctx context.Context, query *IDQuery) (*NodeStatus, error) {
var rsp NodeStatus
if err := c.conn.Invoke(ctx, methodGetNodeStatus.FullName(), query, &rsp); err != nil {
Expand Down
50 changes: 30 additions & 20 deletions go/registry/tests/tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/oasisprotocol/oasis-core/go/common/node"
"github.com/oasisprotocol/oasis-core/go/common/quantity"
consensusAPI "github.com/oasisprotocol/oasis-core/go/consensus/api"
tmcrypto "github.com/oasisprotocol/oasis-core/go/consensus/tendermint/crypto"
epochtime "github.com/oasisprotocol/oasis-core/go/epochtime/api"
epochtimeTests "github.com/oasisprotocol/oasis-core/go/epochtime/tests"
"github.com/oasisprotocol/oasis-core/go/registry/api"
Expand Down Expand Up @@ -60,18 +61,20 @@ func testRegistryEntityNodes( // nolint: gocyclo
runtimeID common.Namespace,
runtimeEWID common.Namespace,
) {
ctx := context.Background()

// Generate the entities used for the test cases.
entities, err := NewTestEntities(entityNodeSeed, 3)
require.NoError(t, err, "NewTestEntities")

timeSource := consensus.EpochTime().(epochtime.SetableBackend)
epoch, err := timeSource.GetEpoch(context.Background(), consensusAPI.HeightLatest)
epoch, err := timeSource.GetEpoch(ctx, consensusAPI.HeightLatest)
require.NoError(t, err, "GetEpoch")

// All of these tests are combined because the Entity and Node structures
// are linked togehter.

entityCh, entitySub, err := backend.WatchEntities(context.Background())
entityCh, entitySub, err := backend.WatchEntities(ctx)
require.NoError(t, err, "WatchEntities")
defer entitySub.Close()

Expand All @@ -94,7 +97,7 @@ func testRegistryEntityNodes( // nolint: gocyclo
require.True(ev.IsRegistration, "event is registration")

// Make sure that GetEvents also returns the registration event.
evts, grr := backend.GetEvents(context.Background(), consensusAPI.HeightLatest)
evts, grr := backend.GetEvents(ctx, consensusAPI.HeightLatest)
require.NoError(grr, "GetEvents")
var gotIt bool
for _, evt := range evts {
Expand All @@ -113,13 +116,13 @@ func testRegistryEntityNodes( // nolint: gocyclo

for _, v := range entities {
var ent *entity.Entity
ent, err = backend.GetEntity(context.Background(), &api.IDQuery{ID: v.Entity.ID, Height: consensusAPI.HeightLatest})
ent, err = backend.GetEntity(ctx, &api.IDQuery{ID: v.Entity.ID, Height: consensusAPI.HeightLatest})
require.NoError(err, "GetEntity")
require.EqualValues(v.Entity, ent, "retrieved entity")
}

var registeredEntities []*entity.Entity
registeredEntities, err = backend.GetEntities(context.Background(), consensusAPI.HeightLatest)
registeredEntities, err = backend.GetEntities(ctx, consensusAPI.HeightLatest)
require.NoError(err, "GetEntities")
// NOTE: The test entity is alway present as it controls a runtime and cannot be removed.
testEntity, _, _ := entity.TestEntity()
Expand Down Expand Up @@ -166,7 +169,7 @@ func testRegistryEntityNodes( // nolint: gocyclo
nonWhitelistedNodes, err := entities[0].NewTestNodes(1, 1, []byte("nonWhitelistedNodes"), nodeRuntimesEW, epoch+2)
require.NoError(t, err, "NewTestNodes non-whitelisted")

nodeCh, nodeSub, err := backend.WatchNodes(context.Background())
nodeCh, nodeSub, err := backend.WatchNodes(ctx)
require.NoError(t, err, "WatchNodes")
defer nodeSub.Close()

Expand All @@ -190,7 +193,7 @@ func testRegistryEntityNodes( // nolint: gocyclo
require.True(ev.IsRegistration, "event is registration")

// Make sure that GetEvents also returns the registration event.
evts, grr := backend.GetEvents(context.Background(), consensusAPI.HeightLatest)
evts, grr := backend.GetEvents(ctx, consensusAPI.HeightLatest)
require.NoError(grr, "GetEvents")
var gotIt bool
for _, evt := range evts {
Expand All @@ -207,10 +210,20 @@ func testRegistryEntityNodes( // nolint: gocyclo
}

var nod *node.Node
nod, err = backend.GetNode(context.Background(), &api.IDQuery{ID: tn.Node.ID, Height: consensusAPI.HeightLatest})
nod, err = backend.GetNode(ctx, &api.IDQuery{ID: tn.Node.ID, Height: consensusAPI.HeightLatest})
require.NoError(err, "GetNode")
require.EqualValues(tn.Node, nod, "retrieved node")

var nodeByConsensus *node.Node
nodeByConsensus, err = backend.GetNodeByConsensusAddress(
ctx,
&api.ConsensusAddressQuery{
Address: []byte(tmcrypto.PublicKeyToTendermint(&tn.Node.Consensus.ID).Address()),
Height: consensusAPI.HeightLatest},
)
require.NoError(err, "GetNodeByConsensusAddress")
require.EqualValues(tn.Node, nodeByConsensus, "retrieved node by Consensus Address")

for _, v := range tn.invalidAfter {
err = tn.Register(consensus, v.signed)
require.Error(err, v.descr)
Expand Down Expand Up @@ -274,7 +287,7 @@ func testRegistryEntityNodes( // nolint: gocyclo
expectedNodeList := getExpectedNodeList()
epoch = epochtimeTests.MustAdvanceEpoch(t, timeSource, 1)

registeredNodes, nerr := backend.GetNodes(context.Background(), consensusAPI.HeightLatest)
registeredNodes, nerr := backend.GetNodes(ctx, consensusAPI.HeightLatest)
require.NoError(nerr, "GetNodes")
require.EqualValues(expectedNodeList, registeredNodes, "node list")
})
Expand All @@ -285,9 +298,6 @@ func testRegistryEntityNodes( // nolint: gocyclo
entity := entities[0]
node := nodes[0][0]

ctx, cancel := context.WithTimeout(context.Background(), recvTimeout)
defer cancel()

// Get node status.
var nodeStatus *api.NodeStatus
nodeStatus, err = backend.GetNodeStatus(ctx, &api.IDQuery{ID: node.Node.ID, Height: consensusAPI.HeightLatest})
Expand All @@ -299,7 +309,7 @@ func testRegistryEntityNodes( // nolint: gocyclo
tx := api.NewUnfreezeNodeTx(0, nil, &api.UnfreezeNode{
NodeID: node.Node.ID,
})
err = consensusAPI.SignAndSubmitTx(context.Background(), consensus, entity.Signer, tx)
err = consensusAPI.SignAndSubmitTx(ctx, consensus, entity.Signer, tx)
require.NoError(err, "UnfreezeNode")

// Try to unfreeze an invalid node (should fail).
Expand All @@ -308,7 +318,7 @@ func testRegistryEntityNodes( // nolint: gocyclo
err = unfreeze.NodeID.UnmarshalHex("ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")
require.NoError(err, "UnmarshalHex")
tx = api.NewUnfreezeNodeTx(0, nil, &unfreeze)
err = consensusAPI.SignAndSubmitTx(context.Background(), consensus, entity.Signer, tx)
err = consensusAPI.SignAndSubmitTx(ctx, consensus, entity.Signer, tx)
require.Error(err, "UnfreezeNode (with invalid node)")
require.Equal(err, api.ErrNoSuchNode)

Expand All @@ -317,7 +327,7 @@ func testRegistryEntityNodes( // nolint: gocyclo
tx = api.NewUnfreezeNodeTx(0, nil, &api.UnfreezeNode{
NodeID: node.Node.ID,
})
err = consensusAPI.SignAndSubmitTx(context.Background(), consensus, node.Signer, tx)
err = consensusAPI.SignAndSubmitTx(ctx, consensus, node.Signer, tx)
require.Error(err, "UnfreezeNode (with invalid signer)")
require.Equal(err, api.ErrBadEntityForNode)
})
Expand All @@ -339,7 +349,7 @@ func testRegistryEntityNodes( // nolint: gocyclo
deregisteredNodes[ev.Node.ID] = ev.Node

// Make sure that GetEvents also returns the deregistration event.
evts, grr := backend.GetEvents(context.Background(), consensusAPI.HeightLatest)
evts, grr := backend.GetEvents(ctx, consensusAPI.HeightLatest)
require.NoError(grr, "GetEvents")
var gotIt bool
for _, evt := range evts {
Expand Down Expand Up @@ -371,7 +381,7 @@ func testRegistryEntityNodes( // nolint: gocyclo

// Ensure the node list doesn't have the expired nodes.
expectedNodeList := getExpectedNodeList()
registeredNodes, nerr := backend.GetNodes(context.Background(), consensusAPI.HeightLatest)
registeredNodes, nerr := backend.GetNodes(ctx, consensusAPI.HeightLatest)
require.NoError(nerr, "GetNodes")
require.EqualValues(expectedNodeList, registeredNodes, "node list")

Expand Down Expand Up @@ -407,7 +417,7 @@ func testRegistryEntityNodes( // nolint: gocyclo
require.False(ev.IsRegistration, "event is deregistration")

// Make sure that GetEvents also returns the deregistration event.
evts, err := backend.GetEvents(context.Background(), consensusAPI.HeightLatest)
evts, err := backend.GetEvents(ctx, consensusAPI.HeightLatest)
require.NoError(err, "GetEvents")
var gotIt bool
for _, evt := range evts {
Expand Down Expand Up @@ -444,7 +454,7 @@ func testRegistryEntityNodes( // nolint: gocyclo
require.False(ev.IsRegistration, "event is deregistration")

// Make sure that GetEvents also returns the deregistration event.
evts, err := backend.GetEvents(context.Background(), consensusAPI.HeightLatest)
evts, err := backend.GetEvents(ctx, consensusAPI.HeightLatest)
require.NoError(err, "GetEvents")
var gotIt bool
for _, evt := range evts {
Expand All @@ -463,7 +473,7 @@ func testRegistryEntityNodes( // nolint: gocyclo

// There should be no more entities.
for _, v := range entities {
_, err := backend.GetEntity(context.Background(), &api.IDQuery{ID: v.Entity.ID, Height: consensusAPI.HeightLatest})
_, err := backend.GetEntity(ctx, &api.IDQuery{ID: v.Entity.ID, Height: consensusAPI.HeightLatest})
require.Equal(api.ErrNoSuchEntity, err, "GetEntity")
}
})
Expand Down
2 changes: 1 addition & 1 deletion go/worker/common/p2p/peermgmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (mgr *PeerManager) watchRegistryNodes(consensus consensus.Backend) {

nodeCh, nSub, err := consensus.Registry().WatchNodes(mgr.ctx)
if err != nil {
mgr.logger.Error("failed to watch registery for node changes",
mgr.logger.Error("failed to watch registry for node changes",
"err", err,
)
return
Expand Down

0 comments on commit 65a8632

Please sign in to comment.