Skip to content

Commit

Permalink
go/scheduler: Add gRPC interface
Browse files Browse the repository at this point in the history
  • Loading branch information
kostko committed Dec 18, 2019
1 parent 798c058 commit 025c8d2
Show file tree
Hide file tree
Showing 19 changed files with 303 additions and 41 deletions.
3 changes: 3 additions & 0 deletions go/consensus/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ const (

// HeightLatest is the height that represents the most recent block height.
HeightLatest int64 = 0

// VotingPower is the default voting power for all validator nodes.
VotingPower = 1
)

// ErrNoCommittedBlocks is the error returned when there are no committed
Expand Down
3 changes: 0 additions & 3 deletions go/consensus/tendermint/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@ const (
LogEventPeerExchangeDisabled = "tendermint/peer_exchange_disabled"
)

// VotingPower is the default voting power for all validator nodes.
const VotingPower = 1

// PublicKeyToValidatorUpdate converts an Oasis node public key to a
// tendermint validator update.
func PublicKeyToValidatorUpdate(id signature.PublicKey, power int64) types.ValidatorUpdate {
Expand Down
4 changes: 2 additions & 2 deletions go/consensus/tendermint/apps/scheduler/genesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (

"github.com/oasislabs/oasis-core/go/common/crypto/signature"
"github.com/oasislabs/oasis-core/go/common/node"
consensus "github.com/oasislabs/oasis-core/go/consensus/api"
"github.com/oasislabs/oasis-core/go/consensus/tendermint/abci"
"github.com/oasislabs/oasis-core/go/consensus/tendermint/api"
registryState "github.com/oasislabs/oasis-core/go/consensus/tendermint/apps/registry/state"
schedulerState "github.com/oasislabs/oasis-core/go/consensus/tendermint/apps/scheduler/state"
genesis "github.com/oasislabs/oasis-core/go/genesis/api"
Expand Down Expand Up @@ -112,7 +112,7 @@ func (app *schedulerApplication) InitChain(ctx *abci.Context, req types.RequestI
return fmt.Errorf("scheduler: invalid genesis validator public key: %w", err)
}

if power := v.GetPower(); power != api.VotingPower {
if power := v.GetPower(); power != consensus.VotingPower {
app.logger.Error("invalid voting power",
"id", id,
"power", power,
Expand Down
6 changes: 3 additions & 3 deletions go/consensus/tendermint/apps/scheduler/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package scheduler
import (
"context"

consensus "github.com/oasislabs/oasis-core/go/consensus/api"
"github.com/oasislabs/oasis-core/go/consensus/tendermint/abci"
"github.com/oasislabs/oasis-core/go/consensus/tendermint/api"
schedulerState "github.com/oasislabs/oasis-core/go/consensus/tendermint/apps/scheduler/state"
scheduler "github.com/oasislabs/oasis-core/go/scheduler/api"
)
Expand Down Expand Up @@ -59,13 +59,13 @@ func (sq *schedulerQuerier) Validators(ctx context.Context) ([]*scheduler.Valida
}

// Since we use flat voting power for now, doing it this way saves
// having to store api.VotingPower repeatedly in the validator set
// having to store consensus.VotingPower repeatedly in the validator set
// ABCI state.
ret := make([]*scheduler.Validator, 0, len(valPks))
for _, v := range valPks {
ret = append(ret, &scheduler.Validator{
ID: v,
VotingPower: api.VotingPower,
VotingPower: consensus.VotingPower,
})
}

Expand Down
3 changes: 2 additions & 1 deletion go/consensus/tendermint/apps/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/oasislabs/oasis-core/go/common/crypto/signature"
"github.com/oasislabs/oasis-core/go/common/logging"
"github.com/oasislabs/oasis-core/go/common/node"
consensus "github.com/oasislabs/oasis-core/go/consensus/api"
"github.com/oasislabs/oasis-core/go/consensus/api/transaction"
"github.com/oasislabs/oasis-core/go/consensus/tendermint/abci"
"github.com/oasislabs/oasis-core/go/consensus/tendermint/api"
Expand Down Expand Up @@ -318,7 +319,7 @@ func (app *schedulerApplication) EndBlock(ctx *abci.Context, req types.RequestEn
app.logger.Debug("adding new validator to validator set",
"id", v,
)
updates = append(updates, api.PublicKeyToValidatorUpdate(v, api.VotingPower))
updates = append(updates, api.PublicKeyToValidatorUpdate(v, consensus.VotingPower))
} else {
app.logger.Debug("keeping existing validator in the validator set",
"id", v,
Expand Down
1 change: 1 addition & 0 deletions go/consensus/tendermint/roothash/roothash.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/oasislabs/oasis-core/go/common/crypto/signature"
"github.com/oasislabs/oasis-core/go/common/logging"
"github.com/oasislabs/oasis-core/go/common/pubsub"
consensus "github.com/oasislabs/oasis-core/go/consensus/api"
app "github.com/oasislabs/oasis-core/go/consensus/tendermint/apps/roothash"
"github.com/oasislabs/oasis-core/go/consensus/tendermint/service"
"github.com/oasislabs/oasis-core/go/roothash/api"
Expand Down
11 changes: 5 additions & 6 deletions go/consensus/tendermint/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
tmtypes "github.com/tendermint/tendermint/types"

"github.com/oasislabs/oasis-core/go/common/cbor"
"github.com/oasislabs/oasis-core/go/common/crypto/signature"
"github.com/oasislabs/oasis-core/go/common/logging"
"github.com/oasislabs/oasis-core/go/common/pubsub"
consensus "github.com/oasislabs/oasis-core/go/consensus/api"
Expand Down Expand Up @@ -50,8 +49,8 @@ func (tb *tendermintBackend) GetValidators(ctx context.Context, height int64) ([
return q.Validators(ctx)
}

func (tb *tendermintBackend) GetCommittees(ctx context.Context, id signature.PublicKey, height int64) ([]*api.Committee, error) {
q, err := tb.querier.QueryAt(ctx, height)
func (tb *tendermintBackend) GetCommittees(ctx context.Context, request *api.GetCommitteesRequest) ([]*api.Committee, error) {
q, err := tb.querier.QueryAt(ctx, request.Height)
if err != nil {
return nil, err
}
Expand All @@ -63,20 +62,20 @@ func (tb *tendermintBackend) GetCommittees(ctx context.Context, id signature.Pub

var runtimeCommittees []*api.Committee
for _, c := range committees {
if c.RuntimeID.Equal(id) {
if c.RuntimeID.Equal(request.RuntimeID) {
runtimeCommittees = append(runtimeCommittees, c)
}
}

return runtimeCommittees, nil
}

func (tb *tendermintBackend) WatchCommittees() (<-chan *api.Committee, *pubsub.Subscription) {
func (tb *tendermintBackend) WatchCommittees(ctx context.Context) (<-chan *api.Committee, pubsub.ClosableSubscription, error) {
typedCh := make(chan *api.Committee)
sub := tb.notifier.Subscribe()
sub.Unwrap(typedCh)

return typedCh, sub
return typedCh, sub, nil
}

func (tb *tendermintBackend) getCurrentCommittees() ([]*api.Committee, error) {
Expand Down
2 changes: 1 addition & 1 deletion go/consensus/tendermint/tendermint.go
Original file line number Diff line number Diff line change
Expand Up @@ -1009,7 +1009,7 @@ func genesisToTendermint(d *genesisAPI.Document) (*tmtypes.GenesisDoc, error) {
validator := tmtypes.GenesisValidator{
Address: pk.Address(),
PubKey: pk,
Power: api.VotingPower,
Power: consensusAPI.VotingPower,
Name: "oasis-validator-" + openedNode.ID.String(),
}
tmValidators = append(tmValidators, validator)
Expand Down
5 changes: 4 additions & 1 deletion go/oasis-node/cmd/debug/byzantine/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@ func schedulerNextElectionHeight(svc service.TendermintService, kind scheduler.C
}

func schedulerGetCommittee(ht *honestTendermint, height int64, kind scheduler.CommitteeKind, runtimeID signature.PublicKey) (*scheduler.Committee, error) {
committees, err := ht.service.Scheduler().GetCommittees(context.Background(), runtimeID, height)
committees, err := ht.service.Scheduler().GetCommittees(context.Background(), &scheduler.GetCommitteesRequest{
RuntimeID: runtimeID,
Height: height,
})
if err != nil {
return nil, fmt.Errorf("Scheduler GetCommittees() error: %w", err)
}
Expand Down
1 change: 1 addition & 0 deletions go/oasis-node/cmd/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ func (n *Node) initBackends() error {

// Initialize and register the internal gRPC services.
grpcSrv := n.grpcInternal.Server()
scheduler.RegisterService(grpcSrv, n.Scheduler)
registryAPI.RegisterService(grpcSrv, n.Registry)
stakingAPI.RegisterService(grpcSrv, n.Staking)
storageAPI.RegisterService(grpcSrv, n.Storage)
Expand Down
20 changes: 9 additions & 11 deletions go/oasis-node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/oasislabs/oasis-core/go/common/entity"
cmnGrpc "github.com/oasislabs/oasis-core/go/common/grpc"
consensusAPI "github.com/oasislabs/oasis-core/go/consensus/api"
tendermintAPI "github.com/oasislabs/oasis-core/go/consensus/tendermint/api"
epochtime "github.com/oasislabs/oasis-core/go/epochtime/api"
epochtimeTests "github.com/oasislabs/oasis-core/go/epochtime/tests"
cmdCommon "github.com/oasislabs/oasis-core/go/oasis-node/cmd/common"
Expand All @@ -33,6 +32,7 @@ import (
roothashTests "github.com/oasislabs/oasis-core/go/roothash/tests"
clientTests "github.com/oasislabs/oasis-core/go/runtime/client/tests"
runtimeRegistry "github.com/oasislabs/oasis-core/go/runtime/registry"
scheduler "github.com/oasislabs/oasis-core/go/scheduler/api"
schedulerTests "github.com/oasislabs/oasis-core/go/scheduler/tests"
staking "github.com/oasislabs/oasis-core/go/staking/api"
stakingTests "github.com/oasislabs/oasis-core/go/staking/tests"
Expand Down Expand Up @@ -241,7 +241,7 @@ func TestNode(t *testing.T) {
{"Storage", testStorage},
{"Registry", testRegistry},
{"Scheduler", testScheduler},
{"Scheduler/GetValidators", testSchedulerGetValidators},
{"SchedulerClient", testSchedulerClient},
{"RootHash", testRootHash},

// TestStorageClientWithoutNode runs client tests that use a mock storage
Expand Down Expand Up @@ -359,18 +359,16 @@ func testRegistry(t *testing.T, node *testNode) {
}

func testScheduler(t *testing.T, node *testNode) {
schedulerTests.SchedulerImplementationTests(t, node.Scheduler, node.Consensus)
schedulerTests.SchedulerImplementationTests(t, "", node.Scheduler, node.Consensus)
}

func testSchedulerGetValidators(t *testing.T, node *testNode) {
// Since the integration tests run with validator elections disabled,
// just ensure that the GetValidators query returns the node's identity.
validators, err := node.Scheduler.GetValidators(context.Background(), consensusAPI.HeightLatest)
require.NoError(t, err, "GetValidators")
func testSchedulerClient(t *testing.T, node *testNode) {
// Create a client backend connected to the local node's internal socket.
conn, err := cmnGrpc.Dial("unix:"+filepath.Join(node.dataDir, "internal.sock"), grpc.WithInsecure())
require.NoError(t, err, "Dial")

require.Len(t, validators, 1, "should be only one static validator")
require.Equal(t, node.Identity.ConsensusSigner.Public(), validators[0].ID)
require.EqualValues(t, tendermintAPI.VotingPower, validators[0].VotingPower)
client := scheduler.NewSchedulerClient(conn)
schedulerTests.SchedulerImplementationTests(t, "client", client, node.Consensus)
}

func testStaking(t *testing.T, node *testNode) {
Expand Down
3 changes: 2 additions & 1 deletion go/roothash/tests/tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -674,7 +674,8 @@ func mustGetCommittee(
) {
require := require.New(t)

ch, sub := sched.WatchCommittees()
ch, sub, err := sched.WatchCommittees(context.Background())
require.NoError(err, "WatchCommittees")
defer sub.Close()

for {
Expand Down
5 changes: 4 additions & 1 deletion go/runtime/client/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,10 @@ type blockWatcher struct {
}

func (w *blockWatcher) refreshCommittee(height int64) error {
committees, err := w.common.scheduler.GetCommittees(w.common.ctx, w.id, height)
committees, err := w.common.scheduler.GetCommittees(w.common.ctx, &scheduler.GetCommitteesRequest{
RuntimeID: w.id,
Height: height,
})
if err != nil {
return err
}
Expand Down
14 changes: 10 additions & 4 deletions go/scheduler/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,29 +158,35 @@ type Validator struct {
type Backend interface {
// GetValidators returns the vector of consensus validators for
// a given epoch.
GetValidators(context.Context, int64) ([]*Validator, error)
GetValidators(ctx context.Context, height int64) ([]*Validator, error)

// GetCommittees returns the vector of committees for a given
// runtime ID, at the specified block height, and optional callback
// for querying the beacon for a given epoch/block height.
//
// Iff the callback is nil, `beacon.GetBlockBeacon` will be used.
GetCommittees(context.Context, signature.PublicKey, int64) ([]*Committee, error)
GetCommittees(ctx context.Context, request *GetCommitteesRequest) ([]*Committee, error)

// WatchCommittees returns a channel that produces a stream of
// Committee.
//
// Upon subscription, all committees for the current epoch will
// be sent immediately.
WatchCommittees() (<-chan *Committee, *pubsub.Subscription)
WatchCommittees(ctx context.Context) (<-chan *Committee, pubsub.ClosableSubscription, error)

// StateToGenesis returns the genesis state at specified block height.
StateToGenesis(context.Context, int64) (*Genesis, error)
StateToGenesis(ctx context.Context, height int64) (*Genesis, error)

// Cleanup cleans up the scheduler backend.
Cleanup()
}

// GetCommitteesRequest is a GetCommittees request.
type GetCommitteesRequest struct {
Height int64 `json:"height"`
RuntimeID signature.PublicKey `json:"runtime_id"`
}

// Genesis is the committee scheduler genesis state.
type Genesis struct {
// Parameters are the scheduler consensus parameters.
Expand Down
Loading

0 comments on commit 025c8d2

Please sign in to comment.