Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add Ready method to Node #1216

Merged
merged 10 commits into from
Feb 1, 2024
28 changes: 5 additions & 23 deletions gno.land/pkg/integration/testing_integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,9 @@

"github.com/gnolang/gno/gno.land/pkg/gnoland"
"github.com/gnolang/gno/tm2/pkg/bft/node"
"github.com/gnolang/gno/tm2/pkg/bft/types"
"github.com/gnolang/gno/tm2/pkg/commands"
"github.com/gnolang/gno/tm2/pkg/crypto/keys"
"github.com/gnolang/gno/tm2/pkg/crypto/keys/client"
"github.com/gnolang/gno/tm2/pkg/events"
"github.com/gnolang/gno/tm2/pkg/log"
"github.com/rogpeppe/go-internal/testscript"
)
Expand Down Expand Up @@ -141,29 +139,13 @@
ts.Setenv("RPC_ADDR", laddr)
ts.Setenv("GNODATA", gnoDataDir)

const listenerID = "testing_listener"

// Wait for first block by waiting for `EventNewBlock` event.
nb := make(chan struct{}, 1)
node.EventSwitch().AddListener(listenerID, func(ev events.Event) {
if _, ok := ev.(types.EventNewBlock); ok {
select {
case nb <- struct{}{}:
default:
}
}
})

if node.BlockStore().Height() == 0 {
select {
case <-nb: // ok
case <-time.After(time.Second * 6):
ts.Fatalf("timeout while waiting for the node to start")
}
// Wait for first block.
select {
case <-time.After(time.Second * 6):
gfanton marked this conversation as resolved.
Show resolved Hide resolved
ts.Fatalf("timeout while waiting for the node to start")

Check warning on line 145 in gno.land/pkg/integration/testing_integration.go

View check run for this annotation

Codecov / codecov/patch

gno.land/pkg/integration/testing_integration.go#L144-L145

Added lines #L144 - L145 were not covered by tests
case <-node.FirstBlockReceived(): // ready
}

node.EventSwitch().RemoveListener(listenerID)

fmt.Fprintln(ts.Stdout(), "node started successfully")
}
case "stop":
Expand Down
23 changes: 23 additions & 0 deletions tm2/pkg/bft/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net/http"
_ "net/http/pprof" //nolint:gosec
"strings"
"sync"
"time"

"github.com/rs/cors"
Expand Down Expand Up @@ -166,6 +167,7 @@ type Node struct {
rpcListeners []net.Listener // rpc servers
txIndexer txindex.TxIndexer
indexerService *txindex.IndexerService
firstBlockSignal <-chan struct{}
}

func initDBs(config *cfg.Config, dbProvider DBProvider) (blockStore *store.BlockStore, stateDB dbm.DB, err error) {
Expand Down Expand Up @@ -437,6 +439,20 @@ func NewNode(config *cfg.Config,
// but before it indexed the txs, or, endblocker panicked)
evsw := events.NewEventSwitch()

// Signal readiness when receiving the first block.
const readinessListenerID = "first_block_listener"

cFirstBlock := make(chan struct{})
var once sync.Once
evsw.AddListener(readinessListenerID, func(ev events.Event) {
if _, ok := ev.(types.EventNewBlock); ok {
once.Do(func() {
close(cFirstBlock)
evsw.RemoveListener(readinessListenerID)
})
}
})

// Transaction indexing
indexerService, txIndexer, err := createAndStartIndexerService(config, dbProvider, evsw, logger)
if err != nil {
Expand Down Expand Up @@ -552,6 +568,8 @@ func NewNode(config *cfg.Config,
proxyApp: proxyApp,
txIndexer: txIndexer,
indexerService: indexerService,

firstBlockSignal: cFirstBlock,
}
node.BaseService = *service.NewBaseService(logger, "Node", node)

Expand Down Expand Up @@ -651,6 +669,11 @@ func (n *Node) OnStop() {
}
}

// FirstBlockReceived returns a channel that gets closed when the node receives its first block.
func (n *Node) FirstBlockReceived() <-chan struct{} {
return n.firstBlockSignal
}

gfanton marked this conversation as resolved.
Show resolved Hide resolved
// ConfigureRPC sets all variables in rpccore so they will serve
// rpc calls from this node
func (n *Node) ConfigureRPC() {
Expand Down
33 changes: 33 additions & 0 deletions tm2/pkg/bft/node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,39 @@ func TestNodeDelayedStart(t *testing.T) {
assert.Equal(t, true, startTime.After(n.GenesisDoc().GenesisTime))
}

func TestNodeFirstBlockSignal(t *testing.T) {
config := cfg.ResetTestRoot("node_node_test")
defer os.RemoveAll(config.RootDir)

// Create & start node
n, err := DefaultNewNode(config, log.TestingLogger())
require.NoError(t, err)

// Assert that blockstore has zero block before waiting for the first block
require.Equal(t, int64(0), n.BlockStore().Height())

// Assert that first block signal is not alreay received
select {
case <-n.FirstBlockReceived():
require.FailNow(t, "first block signal should not be close before starting the node")
default: // ok
}

err = n.Start()
require.NoError(t, err)
defer n.Stop()

// Wait until we got the first block or timeout
select {
case <-time.After(time.Second):
require.FailNow(t, "timeout while waiting for first block signal")
case <-n.FirstBlockReceived(): // ready
}

// Check that blockstore have at last one block
require.GreaterOrEqual(t, n.BlockStore().Height(), int64(1))
}

func TestNodeSetAppVersion(t *testing.T) {
config := cfg.ResetTestRoot("node_app_version_test")
defer os.RemoveAll(config.RootDir)
Expand Down
Loading