Skip to content

Commit

Permalink
Merge pull request hyperledger#142 from hyperledger/ws-blocks
Browse files Browse the repository at this point in the history
Faster block detection using WebSocket notifications
  • Loading branch information
peterbroadhurst authored Jun 14, 2024
2 parents c9b6e62 + 172cc40 commit 3f1f078
Show file tree
Hide file tree
Showing 10 changed files with 320 additions and 105 deletions.
13 changes: 13 additions & 0 deletions config.md
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,19 @@
|keyFile|The path to the private key file for TLS on this API|`string`|`<nil>`
|requiredDNAttributes|A set of required subject DN attributes. Each entry is a regular expression, and the subject certificate must have a matching attribute of the specified type (CN, C, O, OU, ST, L, STREET, POSTALCODE, SERIALNUMBER are valid attributes)|`map[string]string`|`<nil>`

## connector.ws

|Key|Description|Type|Default Value|
|---|-----------|----|-------------|
|connectionTimeout|The amount of time to wait while establishing a connection (or auto-reconnection)|[`time.Duration`](https://pkg.go.dev/time#Duration)|`45s`
|enabled|When true a WebSocket is established for block listening, in addition to the HTTP RPC connections used for other functions|`boolean`|`false`
|heartbeatInterval|The amount of time to wait between heartbeat signals on the WebSocket connection|[`time.Duration`](https://pkg.go.dev/time#Duration)|`30s`
|initialConnectAttempts|The number of attempts FireFly will make to connect to the WebSocket when starting up, before failing|`int`|`5`
|path|The WebSocket sever URL to which FireFly should connect|WebSocket URL `string`|`<nil>`
|readBufferSize|The size in bytes of the read buffer for the WebSocket connection|[`BytesSize`](https://pkg.go.dev/github.com/docker/go-units#BytesSize)|`16Kb`
|url|URL to use for WebSocket - overrides url one level up (in the HTTP config)|`string`|`<nil>`
|writeBufferSize|The size in bytes of the write buffer for the WebSocket connection|[`BytesSize`](https://pkg.go.dev/github.com/docker/go-units#BytesSize)|`16Kb`

## cors

|Key|Description|Type|Default Value|
Expand Down
85 changes: 75 additions & 10 deletions internal/ethereum/blocklistener.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,15 @@ import (
"sync"
"time"

lru "github.com/hashicorp/golang-lru"
"github.com/hyperledger/firefly-common/pkg/config"
"github.com/hyperledger/firefly-common/pkg/fftypes"
"github.com/hyperledger/firefly-common/pkg/i18n"
"github.com/hyperledger/firefly-common/pkg/log"
"github.com/hyperledger/firefly-common/pkg/wsclient"
"github.com/hyperledger/firefly-evmconnect/internal/msgs"
"github.com/hyperledger/firefly-signer/pkg/ethtypes"
"github.com/hyperledger/firefly-signer/pkg/rpcbackend"
"github.com/hyperledger/firefly-transaction-manager/pkg/ffcapi"
)

Expand All @@ -41,15 +46,20 @@ type blockUpdateConsumer struct {
type blockListener struct {
ctx context.Context
c *ethConnector
backend rpcbackend.RPC
wsBackend rpcbackend.WebSocketRPCClient // if configured the getting the blockheight will not complete until WS connects, overrides backend once connected
listenLoopDone chan struct{}
initialBlockHeightObtained chan struct{}
newHeadsTap chan struct{}
newHeadsSub rpcbackend.Subscription
highestBlock int64
mux sync.Mutex
consumers map[fftypes.UUID]*blockUpdateConsumer
blockPollingInterval time.Duration
unstableHeadLength int
canonicalChain *list.List
hederaCompatibilityMode bool
blockCache *lru.Cache
}

type minimalBlockInfo struct {
Expand All @@ -58,26 +68,76 @@ type minimalBlockInfo struct {
parentHash string
}

func newBlockListener(ctx context.Context, c *ethConnector, conf config.Section) *blockListener {
bl := &blockListener{
func newBlockListener(ctx context.Context, c *ethConnector, conf config.Section, wsConf *wsclient.WSConfig) (bl *blockListener, err error) {
bl = &blockListener{
ctx: log.WithLogField(ctx, "role", "blocklistener"),
c: c,
backend: c.backend, // use the HTTP backend - might get overwritten by a connected websocket later
initialBlockHeightObtained: make(chan struct{}),
newHeadsTap: make(chan struct{}),
highestBlock: -1,
consumers: make(map[fftypes.UUID]*blockUpdateConsumer),
blockPollingInterval: conf.GetDuration(BlockPollingInterval),
canonicalChain: list.New(),
unstableHeadLength: int(c.checkpointBlockGap),
hederaCompatibilityMode: conf.GetBool(HederaCompatibilityMode),
}
return bl
if wsConf != nil {
bl.wsBackend = rpcbackend.NewWSRPCClient(wsConf)
}
bl.blockCache, err = lru.New(conf.GetInt(BlockCacheSize))
if err != nil {
return nil, i18n.WrapError(ctx, err, msgs.MsgCacheInitFail, "block")
}
return bl, nil
}

func (bl *blockListener) newHeadsSubListener() {
for range bl.newHeadsSub.Notifications() {
select {
case bl.newHeadsTap <- struct{}{}:
// Do nothing apart from tap the listener to wake up early
// when there's a notification to the change of the head.
default:
}
}
}

// getBlockHeightWithRetry keeps retrying attempting to get the initial block height until successful
func (bl *blockListener) establishBlockHeightWithRetry() error {
wsConnected := false
return bl.c.retry.Do(bl.ctx, "get initial block height", func(attempt int) (retry bool, err error) {

// If we have a WebSocket backend, then we connect it and switch over to using it
// (we accept an un-locked update here to backend, as the most important routine that's
// querying block state is the one we're called on)
if bl.wsBackend != nil {
if !wsConnected {
if err := bl.wsBackend.Connect(bl.ctx); err != nil {
log.L(bl.ctx).Warnf("WebSocket connection failed, blocking startup of block listener: %s", err)
return true, err
}
// if we retry subscribe, we don't want to retry connect
wsConnected = true
}
if bl.newHeadsSub == nil {
// Once subscribed the backend will keep us subscribed over reconnect
sub, rpcErr := bl.wsBackend.Subscribe(bl.ctx, "newHeads")
if rpcErr != nil {
return true, rpcErr.Error()
}
bl.newHeadsSub = sub
go bl.newHeadsSubListener()
}
// Ok all JSON/RPC from this point on uses our WS Backend, thus ensuring we're
// sticky to the same node that the WS is connected to when we're doing queries
// and building our cache.
bl.backend = bl.wsBackend
}

// Now get the block heiht
var hexBlockHeight ethtypes.HexInteger
rpcErr := bl.c.backend.CallRPC(bl.ctx, &hexBlockHeight, "eth_blockNumber")
rpcErr := bl.backend.CallRPC(bl.ctx, &hexBlockHeight, "eth_blockNumber")
if rpcErr != nil {
log.L(bl.ctx).Warnf("Block height could not be obtained: %s", rpcErr.Message)
return true, rpcErr.Error()
Expand Down Expand Up @@ -108,17 +168,18 @@ func (bl *blockListener) listenLoop() {
return
}
} else {
// Sleep for the polling interval
// Sleep for the polling interval, or until we're shoulder tapped by the newHeads listener
select {
case <-time.After(bl.blockPollingInterval):
case <-bl.newHeadsTap:
case <-bl.ctx.Done():
log.L(bl.ctx).Debugf("Block listener loop stopping")
return
}
}

if filter == "" {
err := bl.c.backend.CallRPC(bl.ctx, &filter, "eth_newBlockFilter")
err := bl.backend.CallRPC(bl.ctx, &filter, "eth_newBlockFilter")
if err != nil {
log.L(bl.ctx).Errorf("Failed to establish new block filter: %s", err.Message)
failCount++
Expand All @@ -127,7 +188,7 @@ func (bl *blockListener) listenLoop() {
}

var blockHashes []ethtypes.HexBytes0xPrefix
rpcErr := bl.c.backend.CallRPC(bl.ctx, &blockHashes, "eth_getFilterChanges", filter)
rpcErr := bl.backend.CallRPC(bl.ctx, &blockHashes, "eth_getFilterChanges", filter)
if rpcErr != nil {
if mapError(filterRPCMethods, rpcErr.Error()) == ffcapi.ErrorReasonNotFound {
log.L(bl.ctx).Warnf("Block filter '%v' no longer valid. Recreating filter: %s", filter, rpcErr.Message)
Expand Down Expand Up @@ -159,7 +220,7 @@ func (bl *blockListener) listenLoop() {
}

// Do a lookup of the block (which will then go into our cache).
bi, err := bl.c.getBlockInfoByHash(bl.ctx, h.String())
bi, err := bl.getBlockInfoByHash(bl.ctx, h.String())
switch {
case err != nil:
log.L(bl.ctx).Debugf("Failed to query block '%s': %s", h, err)
Expand Down Expand Up @@ -312,7 +373,7 @@ func (bl *blockListener) rebuildCanonicalChain() *list.Element {
var bi *blockInfoJSONRPC
var reason ffcapi.ErrorReason
err := bl.c.retry.Do(bl.ctx, "rebuild listener canonical chain", func(attempt int) (retry bool, err error) {
bi, reason, err = bl.c.getBlockInfoByNumber(bl.ctx, nextBlockNumber, false, "")
bi, reason, err = bl.getBlockInfoByNumber(bl.ctx, nextBlockNumber, false, "")
return reason != ffcapi.ErrorReasonNotFound, err
})
if err != nil {
Expand Down Expand Up @@ -366,7 +427,7 @@ func (bl *blockListener) trimToLastValidBlock() (lastValidBlock *minimalBlockInf
var freshBlockInfo *blockInfoJSONRPC
var reason ffcapi.ErrorReason
err := bl.c.retry.Do(bl.ctx, "rebuild listener canonical chain", func(attempt int) (retry bool, err error) {
freshBlockInfo, reason, err = bl.c.getBlockInfoByNumber(bl.ctx, currentViewBlock.number, false, "")
freshBlockInfo, reason, err = bl.getBlockInfoByNumber(bl.ctx, currentViewBlock.number, false, "")
return reason != ffcapi.ErrorReasonNotFound, err
})
if err != nil {
Expand Down Expand Up @@ -447,6 +508,10 @@ func (bl *blockListener) waitClosed() {
bl.mux.Lock()
listenLoopDone := bl.listenLoopDone
bl.mux.Unlock()
if bl.wsBackend != nil {
_ = bl.wsBackend.UnsubscribeAll(bl.ctx)
bl.wsBackend.Close()
}
if listenLoopDone != nil {
<-listenLoopDone
}
Expand Down
107 changes: 107 additions & 0 deletions internal/ethereum/blocklistener_blockquery.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ethereum

import (
"context"
"strconv"

"github.com/hyperledger/firefly-common/pkg/fftypes"
"github.com/hyperledger/firefly-common/pkg/i18n"
"github.com/hyperledger/firefly-common/pkg/log"
"github.com/hyperledger/firefly-evmconnect/internal/msgs"
"github.com/hyperledger/firefly-signer/pkg/ethtypes"
"github.com/hyperledger/firefly-transaction-manager/pkg/ffcapi"
)

// blockInfoJSONRPC are the info fields we parse from the JSON/RPC response, and cache
type blockInfoJSONRPC struct {
Number *ethtypes.HexInteger `json:"number"`
Hash ethtypes.HexBytes0xPrefix `json:"hash"`
ParentHash ethtypes.HexBytes0xPrefix `json:"parentHash"`
Timestamp *ethtypes.HexInteger `json:"timestamp"`
Transactions []ethtypes.HexBytes0xPrefix `json:"transactions"`
}

func transformBlockInfo(bi *blockInfoJSONRPC, t *ffcapi.BlockInfo) {
t.BlockNumber = (*fftypes.FFBigInt)(bi.Number)
t.BlockHash = bi.Hash.String()
t.ParentHash = bi.ParentHash.String()
stringHashes := make([]string, len(bi.Transactions))
for i, th := range bi.Transactions {
stringHashes[i] = th.String()
}
t.TransactionHashes = stringHashes
}

func (bl *blockListener) addToBlockCache(blockInfo *blockInfoJSONRPC) {
bl.blockCache.Add(blockInfo.Hash.String(), blockInfo)
bl.blockCache.Add(blockInfo.Number.BigInt().String(), blockInfo)
}

func (bl *blockListener) getBlockInfoByNumber(ctx context.Context, blockNumber int64, allowCache bool, expectedHashStr string) (*blockInfoJSONRPC, ffcapi.ErrorReason, error) {
var blockInfo *blockInfoJSONRPC
if allowCache {
cached, ok := bl.blockCache.Get(strconv.FormatInt(blockNumber, 10))
if ok {
blockInfo = cached.(*blockInfoJSONRPC)
if expectedHashStr != "" && blockInfo.ParentHash.String() != expectedHashStr {
log.L(ctx).Debugf("Block cache miss for block %d due to mismatched parent hash expected=%s found=%s", blockNumber, expectedHashStr, blockInfo.ParentHash)
blockInfo = nil
}
}
}

if blockInfo == nil {
rpcErr := bl.backend.CallRPC(ctx, &blockInfo, "eth_getBlockByNumber", ethtypes.NewHexInteger64(blockNumber), false /* only the txn hashes */)
if rpcErr != nil {
if mapError(blockRPCMethods, rpcErr.Error()) == ffcapi.ErrorReasonNotFound {
log.L(ctx).Debugf("Received error signifying 'block not found': '%s'", rpcErr.Message)
return nil, ffcapi.ErrorReasonNotFound, i18n.NewError(ctx, msgs.MsgBlockNotAvailable)
}
return nil, ffcapi.ErrorReason(""), rpcErr.Error()
}
if blockInfo == nil {
return nil, ffcapi.ErrorReason(""), nil
}
bl.addToBlockCache(blockInfo)
}

return blockInfo, "", nil
}

func (bl *blockListener) getBlockInfoByHash(ctx context.Context, hash0xString string) (*blockInfoJSONRPC, error) {
var blockInfo *blockInfoJSONRPC
cached, ok := bl.blockCache.Get(hash0xString)
if ok {
blockInfo = cached.(*blockInfoJSONRPC)
}

if blockInfo == nil {
rpcErr := bl.backend.CallRPC(ctx, &blockInfo, "eth_getBlockByHash", hash0xString, false /* only the txn hashes */)
if rpcErr != nil || blockInfo == nil {
var err error
if rpcErr != nil {
err = rpcErr.Error()
}
return nil, err
}
bl.addToBlockCache(blockInfo)
}

return blockInfo, nil
}
Loading

0 comments on commit 3f1f078

Please sign in to comment.