Skip to content

Commit

Permalink
dot/rpc: implement chain_subscribeNewHeads (#836)
Browse files Browse the repository at this point in the history
* fix formating of header digest logs

* update digest type values to conform to spec

* hardcode RPC results for testing

* implement RPC subscribe_newHeads WebSocket call

* implement subscription

* header number as hex string in json responses

* rpc getStorage

* implement basic rpc state_getStorage call

* added test for state_getStorage

* cleanup comments

* implement state_getPairs

* implement state_getStorageHash

* implement state_getStorageSize

* refactor tests

* cleanup comments

* added tests

* fix lint issues

* update empty request test

* lint issues

* map for holding subscription connections

* use channel for block listener subscription

* make json results

* lint issues

* update tests to init coreAPI service

* changed if statement to switch

* Use HeaderToJSON function

* add tests for WebSocket

* clean up

* update comments

* add channel to BlockState service to notify RPC serivce

* add done channel to rpc newBlock receiver

* add check for open websocket connection

* update chan done notifier to use struct instead of bool

clean-up comments, move map init.
  • Loading branch information
edwardmack authored and ryanchristo committed Jun 24, 2020
1 parent 230381b commit 27e2616
Show file tree
Hide file tree
Showing 11 changed files with 413 additions and 191 deletions.
46 changes: 34 additions & 12 deletions dot/rpc/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ import (
"net/http"

"github.com/ChainSafe/gossamer/dot/rpc/modules"
"github.com/ChainSafe/gossamer/dot/types"
"github.com/gorilla/mux"
"github.com/gorilla/rpc/v2"
"github.com/gorilla/websocket"

log "github.com/ChainSafe/log15"
)
Expand All @@ -35,17 +37,26 @@ type HTTPServer struct {

// HTTPServerConfig configures the HTTPServer
type HTTPServerConfig struct {
BlockAPI modules.BlockAPI
StorageAPI modules.StorageAPI
NetworkAPI modules.NetworkAPI
CoreAPI modules.CoreAPI
RuntimeAPI modules.RuntimeAPI
TransactionQueueAPI modules.TransactionQueueAPI
RPCAPI modules.RPCAPI
Host string
RPCPort uint32
WSPort uint32
Modules []string
BlockAPI modules.BlockAPI
StorageAPI modules.StorageAPI
NetworkAPI modules.NetworkAPI
CoreAPI modules.CoreAPI
RuntimeAPI modules.RuntimeAPI
TransactionQueueAPI modules.TransactionQueueAPI
RPCAPI modules.RPCAPI
Host string
RPCPort uint32
WSPort uint32
Modules []string
WSSubscriptions map[uint32]*WebSocketSubscription
BlockAddedReceiver chan *types.Block
BlockAddedReceiverDone chan struct{}
}

// WebSocketSubscription holds subscription details
type WebSocketSubscription struct {
WSConnection *websocket.Conn
SubscriptionType int
}

// NewHTTPServer creates a new http server and registers an associated rpc server
Expand All @@ -54,7 +65,9 @@ func NewHTTPServer(cfg *HTTPServerConfig) *HTTPServer {
rpcServer: rpc.NewServer(),
serverConfig: cfg,
}

if cfg.WSSubscriptions == nil {
cfg.WSSubscriptions = make(map[uint32]*WebSocketSubscription)
}
server.RegisterModules(cfg.Modules)
return server
}
Expand Down Expand Up @@ -119,10 +132,19 @@ func (h *HTTPServer) Start() error {
}
}()

// init and start block received listener routine
if h.serverConfig.BlockAPI != nil {
h.serverConfig.BlockAddedReceiver = make(chan *types.Block)
h.serverConfig.BlockAddedReceiverDone = make(chan struct{})
h.serverConfig.BlockAPI.SetBlockAddedChannel(h.serverConfig.BlockAddedReceiver, h.serverConfig.BlockAddedReceiverDone)
go h.blockReceivedListener()
}

return nil
}

// Stop stops the server
func (h *HTTPServer) Stop() error {
close(h.serverConfig.BlockAddedReceiverDone) // notify sender we're done receiving so it can close
return nil
}
5 changes: 4 additions & 1 deletion dot/rpc/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,19 @@ import (
"testing"
"time"

"github.com/ChainSafe/gossamer/dot/core"
"github.com/stretchr/testify/require"
)

func TestNewHTTPServer(t *testing.T) {

coreAPI := core.NewTestService(t, nil)
cfg := &HTTPServerConfig{
Modules: []string{"system"},
RPCPort: 8545,
RPCAPI: NewService(),
CoreAPI: coreAPI,
}

s := NewHTTPServer(cfg)
err := s.Start()
require.Nil(t, err)
Expand Down
1 change: 1 addition & 0 deletions dot/rpc/modules/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type BlockAPI interface {
HighestBlockHash() common.Hash
GetBlockByHash(hash common.Hash) (*types.Block, error)
GetBlockHash(blockNumber *big.Int) (*common.Hash, error)
SetBlockAddedChannel(chan<- *types.Block, <-chan struct{})
}

// NetworkAPI interface for network state methods
Expand Down
85 changes: 48 additions & 37 deletions dot/rpc/modules/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"reflect"
"regexp"

"github.com/ChainSafe/gossamer/dot/types"

"github.com/ChainSafe/gossamer/lib/common"
)

Expand All @@ -33,17 +35,6 @@ type ChainHashRequest string
// ChainBlockNumberRequest interface is it can accept string or float64 or []
type ChainBlockNumberRequest interface{}

// ChainBlockResponse struct
type ChainBlockResponse struct {
Block ChainBlock `json:"block"`
}

// ChainBlock struct to hold json instance of a block
type ChainBlock struct {
Header ChainBlockHeaderResponse `json:"header"`
Body []string `json:"extrinsics"`
}

// ChainBlockHeaderResponse struct
type ChainBlockHeaderResponse struct {
ParentHash string `json:"parentHash"`
Expand All @@ -58,6 +49,17 @@ type ChainBlockHeaderDigest struct {
Logs []string `json:"logs"`
}

// ChainBlock struct to hold json instance of a block
type ChainBlock struct {
Header ChainBlockHeaderResponse `json:"header"`
Body []string `json:"extrinsics"`
}

// ChainBlockResponse struct
type ChainBlockResponse struct {
Block ChainBlock `json:"block"`
}

// ChainHashResponse interface to handle response
type ChainHashResponse interface{}

Expand Down Expand Up @@ -86,17 +88,8 @@ func (cm *ChainModule) GetBlock(r *http.Request, req *ChainHashRequest, res *Cha
return err
}

res.Block.Header.ParentHash = block.Header.ParentHash.String()
if block.Header.Number.Int64() == 0 {
res.Block.Header.Number = "0x0"
} else {
res.Block.Header.Number = "0x" + hex.EncodeToString(block.Header.Number.Bytes())
}
res.Block.Header.StateRoot = block.Header.StateRoot.String()
res.Block.Header.ExtrinsicsRoot = block.Header.ExtrinsicsRoot.String()
for _, item := range block.Header.Digest {
res.Block.Header.Digest.Logs = append(res.Block.Header.Digest.Logs, "0x"+hex.EncodeToString(item))
}
res.Block.Header = HeaderToJSON(*block.Header)

if *block.Body != nil {
ext, err := block.Body.AsExtrinsics()
if err != nil {
Expand Down Expand Up @@ -150,26 +143,25 @@ func (cm *ChainModule) GetHeader(r *http.Request, req *ChainHashRequest, res *Ch
return err
}

res.ParentHash = header.ParentHash.String()
if header.Number.Int64() == 0 {
res.Number = "0x0"
} else {
res.Number = "0x" + hex.EncodeToString(header.Number.Bytes())
}
res.StateRoot = header.StateRoot.String()
res.ExtrinsicsRoot = header.ExtrinsicsRoot.String()
for _, item := range header.Digest {
res.Digest.Logs = append(res.Digest.Logs, "0x"+hex.EncodeToString(item))
}
*res = HeaderToJSON(*header)
return nil
}

// SubscribeFinalizedHeads isn't implemented properly yet.
func (cm *ChainModule) SubscribeFinalizedHeads(r *http.Request, req *EmptyRequest, res *ChainBlockHeaderResponse) {
// SubscribeFinalizedHeads handled by websocket handler, but this func should remain
// here so it's added to rpc_methods list
func (cm *ChainModule) SubscribeFinalizedHeads(r *http.Request, req *EmptyRequest, res *ChainBlockHeaderResponse) error {
return nil
}

// SubscribeNewHead isn't implemented properly yet.
func (cm *ChainModule) SubscribeNewHead(r *http.Request, req *EmptyRequest, res *ChainBlockHeaderResponse) {
// SubscribeNewHead handled by websocket handler, but this func should remain
// here so it's added to rpc_methods list
func (cm *ChainModule) SubscribeNewHead(r *http.Request, req *EmptyRequest, res *ChainBlockHeaderResponse) error {
return nil
}

// SubscribeNewHeads isn't implemented properly yet.
func (cm *ChainModule) SubscribeNewHeads(r *http.Request, req *EmptyRequest, res *ChainBlockHeaderResponse) error {
return nil
}

func (cm *ChainModule) hashLookup(req *ChainHashRequest) (common.Hash, error) {
Expand Down Expand Up @@ -234,3 +226,22 @@ func (cm *ChainModule) lookupHashByInterface(i interface{}) (string, error) {
}
return h.String(), nil
}

// HeaderToJSON converts types.Header to ChainBlockHeaderResponse
func HeaderToJSON(header types.Header) ChainBlockHeaderResponse {
res := ChainBlockHeaderResponse{
ParentHash: header.ParentHash.String(),
StateRoot: header.StateRoot.String(),
ExtrinsicsRoot: header.ExtrinsicsRoot.String(),
Digest: ChainBlockHeaderDigest{},
}
if header.Number.Int64() == 0 {
res.Number = "0x0"
} else {
res.Number = "0x" + hex.EncodeToString(header.Number.Bytes())
}
for _, item := range header.Digest {
res.Digest.Logs = append(res.Digest.Logs, "0x"+hex.EncodeToString(item))
}
return res
}
Loading

0 comments on commit 27e2616

Please sign in to comment.