Skip to content

Commit

Permalink
go/consensus: Add block accessors
Browse files Browse the repository at this point in the history
  • Loading branch information
kostko committed Dec 17, 2019
1 parent 4d49c73 commit e79af06
Show file tree
Hide file tree
Showing 7 changed files with 155 additions and 36 deletions.
22 changes: 20 additions & 2 deletions go/consensus/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type ClientBackend interface {
SubmitTx(ctx context.Context, tx *transaction.SignedTransaction) error

// StateToGenesis returns the genesis state at the specified block height.
StateToGenesis(ctx context.Context, blockHeight int64) (*genesis.Document, error)
StateToGenesis(ctx context.Context, height int64) (*genesis.Document, error)

// WaitEpoch waits for consensus to reach an epoch.
//
Expand All @@ -47,7 +47,25 @@ type ClientBackend interface {
// in the future).
WaitEpoch(ctx context.Context, epoch epochtime.EpochTime) error

// TODO: Add things like following consensus blocks.
// GetBlock returns a consensus block at a specific height.
GetBlock(ctx context.Context, height int64) (*Block, error)

// GetTransactions returns a list of all transactions contained within a
// consensus block at a specific height.
//
// NOTE: Any of these transactions could be invalid.
GetTransactions(ctx context.Context, height int64) ([][]byte, error)
}

// Block is a consensus block.
//
// While some common fields are provided, most of the structure is dependent on
// the actual backend implementation.
type Block struct {
// Height contains the block height.
Height int64 `json:"height"`
// Meta contains the consensus backend specific block metadata.
Meta interface{} `json:"meta"`
}

// Backend is an interface that a consensus backend must provide.
Expand Down
86 changes: 80 additions & 6 deletions go/consensus/api/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ var (
methodStateToGenesis = serviceName.NewMethodName("StateToGenesis")
// methodWaitEpoch is the name of the WaitEpoch method.
methodWaitEpoch = serviceName.NewMethodName("WaitEpoch")
// methodGetBlock is the name of the GetBlock method.
methodGetBlock = serviceName.NewMethodName("GetBlock")
// methodGetTransactions is the name of the GetTransactions method.
methodGetTransactions = serviceName.NewMethodName("GetTransactions")

// serviceDesc is the gRPC service descriptor.
serviceDesc = grpc.ServiceDesc{
Expand All @@ -39,6 +43,14 @@ var (
MethodName: methodWaitEpoch.Short(),
Handler: handlerWaitEpoch,
},
{
MethodName: methodGetBlock.Short(),
Handler: handlerGetBlock,
},
{
MethodName: methodGetTransactions.Short(),
Handler: handlerGetTransactions,
},
},
Streams: []grpc.StreamDesc{},
}
Expand Down Expand Up @@ -73,12 +85,12 @@ func handlerStateToGenesis( // nolint: golint
dec func(interface{}) error,
interceptor grpc.UnaryServerInterceptor,
) (interface{}, error) {
var blockHeight int64
if err := dec(&blockHeight); err != nil {
var height int64
if err := dec(&height); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(Backend).StateToGenesis(ctx, blockHeight)
return srv.(Backend).StateToGenesis(ctx, height)
}
info := &grpc.UnaryServerInfo{
Server: srv,
Expand All @@ -87,7 +99,7 @@ func handlerStateToGenesis( // nolint: golint
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(Backend).StateToGenesis(ctx, req.(int64))
}
return interceptor(ctx, blockHeight, info, handler)
return interceptor(ctx, height, info, handler)
}

func handlerWaitEpoch( // nolint: golint
Expand All @@ -113,6 +125,52 @@ func handlerWaitEpoch( // nolint: golint
return interceptor(ctx, epoch, info, handler)
}

func handlerGetBlock( // nolint: golint
srv interface{},
ctx context.Context,
dec func(interface{}) error,
interceptor grpc.UnaryServerInterceptor,
) (interface{}, error) {
var height int64
if err := dec(&height); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(Backend).GetBlock(ctx, height)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: methodGetBlock.Full(),
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(Backend).GetBlock(ctx, req.(int64))
}
return interceptor(ctx, height, info, handler)
}

func handlerGetTransactions( // nolint: golint
srv interface{},
ctx context.Context,
dec func(interface{}) error,
interceptor grpc.UnaryServerInterceptor,
) (interface{}, error) {
var height int64
if err := dec(&height); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(Backend).GetTransactions(ctx, height)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: methodGetTransactions.Full(),
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(Backend).GetTransactions(ctx, req.(int64))
}
return interceptor(ctx, height, info, handler)
}

// RegisterService registers a new consensus backend service with the
// given gRPC server.
func RegisterService(server *grpc.Server, service Backend) {
Expand All @@ -127,9 +185,9 @@ func (c *consensusClient) SubmitTx(ctx context.Context, tx *transaction.SignedTr
return c.conn.Invoke(ctx, methodSubmitTx.Full(), tx, nil)
}

func (c *consensusClient) StateToGenesis(ctx context.Context, blockHeight int64) (*genesis.Document, error) {
func (c *consensusClient) StateToGenesis(ctx context.Context, height int64) (*genesis.Document, error) {
var rsp genesis.Document
if err := c.conn.Invoke(ctx, methodStateToGenesis.Full(), blockHeight, &rsp); err != nil {
if err := c.conn.Invoke(ctx, methodStateToGenesis.Full(), height, &rsp); err != nil {
return nil, err
}
return &rsp, nil
Expand All @@ -139,6 +197,22 @@ func (c *consensusClient) WaitEpoch(ctx context.Context, epoch epochtime.EpochTi
return c.conn.Invoke(ctx, methodWaitEpoch.Full(), epoch, nil)
}

func (c *consensusClient) GetBlock(ctx context.Context, height int64) (*Block, error) {
var rsp Block
if err := c.conn.Invoke(ctx, methodGetBlock.Full(), height, &rsp); err != nil {
return nil, err
}
return &rsp, nil
}

func (c *consensusClient) GetTransactions(ctx context.Context, height int64) ([][]byte, error) {
var rsp [][]byte
if err := c.conn.Invoke(ctx, methodGetBlock.Full(), height, &rsp); err != nil {
return nil, err
}
return rsp, nil
}

// NewConsensusClient creates a new gRPC consensus client service.
func NewConsensusClient(c *grpc.ClientConn) ClientBackend {
return &consensusClient{c}
Expand Down
2 changes: 1 addition & 1 deletion go/consensus/tendermint/epochtime/epochtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (t *tendermintBackend) StateToGenesis(ctx context.Context, height int64) (*
}

func (t *tendermintBackend) worker(ctx context.Context) {
ch, sub := t.service.WatchBlocks()
ch, sub := t.service.WatchTendermintBlocks()
defer sub.Close()

for {
Expand Down
2 changes: 1 addition & 1 deletion go/consensus/tendermint/roothash/roothash.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ func (tb *tendermintBackend) reindexBlocks() error {
// we can safely snapshot the current height as we have already subscribed
// to new blocks.
var currentBlk *tmtypes.Block
if currentBlk, err = tb.service.GetBlock(nil); err != nil {
if currentBlk, err = tb.service.GetTendermintBlock(tb.ctx, consensus.HeightLatest); err != nil {
tb.logger.Error("failed to get latest block",
"err", err,
)
Expand Down
10 changes: 6 additions & 4 deletions go/consensus/tendermint/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
package service

import (
"context"

tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
tmrpctypes "github.com/tendermint/tendermint/rpc/core/types"
tmtypes "github.com/tendermint/tendermint/types"
Expand Down Expand Up @@ -35,18 +37,18 @@ type TendermintService interface {
GetGenesis() *genesis.Document

// GetHeight returns the Tendermint block height.
GetHeight() (int64, error)
GetHeight(ctx context.Context) (int64, error)

// GetBlock returns the Tendermint block at the specified height.
GetBlock(height *int64) (*tmtypes.Block, error)
GetTendermintBlock(ctx context.Context, height int64) (*tmtypes.Block, error)

// GetBlockResults returns the ABCI results from processing a block
// at a specific height.
GetBlockResults(height *int64) (*tmrpctypes.ResultBlockResults, error)

// WatchBlocks returns a stream of Tendermint blocks as they are
// WatchTendermintBlocks returns a stream of Tendermint blocks as they are
// returned via the `EventDataNewBlock` query.
WatchBlocks() (<-chan *tmtypes.Block, *pubsub.Subscription)
WatchTendermintBlocks() (<-chan *tmtypes.Block, *pubsub.Subscription)

// Subscribe subscribes to tendermint events.
Subscribe(subscriber string, query tmpubsub.Query) (tmtypes.Subscription, error)
Expand Down
67 changes: 46 additions & 21 deletions go/consensus/tendermint/tendermint.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,25 +307,15 @@ func (t *tendermintService) GetAddresses() ([]node.ConsensusAddress, error) {
}

func (t *tendermintService) StateToGenesis(ctx context.Context, blockHeight int64) (*genesisAPI.Document, error) {
if blockHeight <= 0 {
var err error
if blockHeight, err = t.GetHeight(); err != nil {
t.Logger.Error("failed querying height",
"err", err,
"height", blockHeight,
)
return nil, err
}
}

blk, err := t.GetBlock(&blockHeight)
blk, err := t.GetTendermintBlock(ctx, blockHeight)
if err != nil {
t.Logger.Error("failed to get tendermint block",
"err", err,
"block_height", blockHeight,
)
return nil, err
}
blockHeight = blk.Header.Height

// Get initial genesis doc.
genesisFileProvider, err := file.DefaultFileProvider()
Expand Down Expand Up @@ -648,6 +638,31 @@ func (t *tendermintService) WaitEpoch(ctx context.Context, epoch epochtimeAPI.Ep
}
}

func (t *tendermintService) GetBlock(ctx context.Context, height int64) (*consensusAPI.Block, error) {
blk, err := t.GetTendermintBlock(ctx, height)
if err != nil {
return nil, err
}

return &consensusAPI.Block{
Height: blk.Header.Height,
Meta: blk.Header,
}, nil
}

func (t *tendermintService) GetTransactions(ctx context.Context, height int64) ([][]byte, error) {
blk, err := t.GetTendermintBlock(ctx, height)
if err != nil {
return nil, err
}

txs := make([][]byte, 0, len(blk.Data.Txs))
for _, v := range blk.Data.Txs {
txs = append(txs, v[:])
}
return txs, nil
}

func (t *tendermintService) initialize() error {
t.Lock()
defer t.Unlock()
Expand Down Expand Up @@ -721,25 +736,35 @@ func (t *tendermintService) initialize() error {
return nil
}

func (t *tendermintService) GetBlock(height *int64) (*tmtypes.Block, error) {
if t.client == nil {
panic("client not available yet")
func (t *tendermintService) GetTendermintBlock(ctx context.Context, height int64) (*tmtypes.Block, error) {
// Make sure that the Tendermint service has started so that we
// have the client interface available.
select {
case <-t.startedCh:
case <-t.ctx.Done():
return nil, t.ctx.Err()
case <-ctx.Done():
return nil, ctx.Err()
}

result, err := t.client.Block(height)
var tmHeight *int64
if height == consensusAPI.HeightLatest {
tmHeight = nil
} else {
tmHeight = &height
}
result, err := t.client.Block(tmHeight)
if err != nil {
return nil, fmt.Errorf("tendermint: block query failed: %w", err)
}

return result.Block, nil
}

func (t *tendermintService) GetHeight() (int64, error) {
blk, err := t.GetBlock(nil)
func (t *tendermintService) GetHeight(ctx context.Context) (int64, error) {
blk, err := t.GetTendermintBlock(ctx, consensusAPI.HeightLatest)
if err != nil {
return 0, err
}

return blk.Header.Height, nil
}

Expand All @@ -756,7 +781,7 @@ func (t *tendermintService) GetBlockResults(height *int64) (*tmrpctypes.ResultBl
return result, nil
}

func (t *tendermintService) WatchBlocks() (<-chan *tmtypes.Block, *pubsub.Subscription) {
func (t *tendermintService) WatchTendermintBlocks() (<-chan *tmtypes.Block, *pubsub.Subscription) {
typedCh := make(chan *tmtypes.Block)
sub := t.blockNotifier.Subscribe()
sub.Unwrap(typedCh)
Expand Down
2 changes: 1 addition & 1 deletion go/oasis-node/cmd/debug/byzantine/tendermint.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (ht *honestTendermint) start(id *identity.Identity, dataDir string) error {

// Wait for height=1 to pass, during which mux apps perform deferred initialization.
blockOne := make(chan struct{})
blocksCh, blocksSub := ht.service.WatchBlocks()
blocksCh, blocksSub := ht.service.WatchTendermintBlocks()
go func() {
defer blocksSub.Close()
for {
Expand Down

0 comments on commit e79af06

Please sign in to comment.