From b8d83b8d03593734e0bd503b30603eed355d0296 Mon Sep 17 00:00:00 2001 From: gfanton <8671905+gfanton@users.noreply.github.com> Date: Tue, 10 Oct 2023 17:15:24 +0200 Subject: [PATCH 1/7] feat: add `FirstBlockReceived` method to Node mostly to check if the node is ready Signed-off-by: gfanton <8671905+gfanton@users.noreply.github.com> --- tm2/pkg/bft/node/node.go | 23 +++++++++++++++++++++++ tm2/pkg/bft/node/node_test.go | 28 ++++++++++++++++++++++++++++ 2 files changed, 51 insertions(+) diff --git a/tm2/pkg/bft/node/node.go b/tm2/pkg/bft/node/node.go index 23b42cec6b9..7f6c19118ba 100644 --- a/tm2/pkg/bft/node/node.go +++ b/tm2/pkg/bft/node/node.go @@ -9,6 +9,7 @@ import ( "net/http" _ "net/http/pprof" //nolint:gosec "strings" + "sync" "time" "github.com/rs/cors" @@ -166,6 +167,7 @@ type Node struct { rpcListeners []net.Listener // rpc servers txIndexer txindex.TxIndexer indexerService *txindex.IndexerService + firstBlockSignal <-chan struct{} } func initDBs(config *cfg.Config, dbProvider DBProvider) (blockStore *store.BlockStore, stateDB dbm.DB, err error) { @@ -437,6 +439,20 @@ func NewNode(config *cfg.Config, // but before it indexed the txs, or, endblocker panicked) evsw := events.NewEventSwitch() + // Signal readiness when receiving the first block. + const readinessListenerID = "first_block_listener" + + cFirstBlock := make(chan struct{}) + var once sync.Once + evsw.AddListener(readinessListenerID, func(ev events.Event) { + if _, ok := ev.(types.EventNewBlock); ok { + once.Do(func() { + close(cFirstBlock) + evsw.RemoveListener(readinessListenerID) + }) + } + }) + // Transaction indexing indexerService, txIndexer, err := createAndStartIndexerService(config, dbProvider, evsw, logger) if err != nil { @@ -552,6 +568,8 @@ func NewNode(config *cfg.Config, proxyApp: proxyApp, txIndexer: txIndexer, indexerService: indexerService, + + firstBlockSignal: cFirstBlock, } node.BaseService = *service.NewBaseService(logger, "Node", node) @@ -651,6 +669,11 @@ func (n *Node) OnStop() { } } +// FirstBlockReceived returns a channel that gets closed when the node receives its first block. +func (n *Node) FirstBlockReceived() <-chan struct{} { + return n.firstBlockSignal +} + // ConfigureRPC sets all variables in rpccore so they will serve // rpc calls from this node func (n *Node) ConfigureRPC() { diff --git a/tm2/pkg/bft/node/node_test.go b/tm2/pkg/bft/node/node_test.go index 2352df49762..e9c77e69868 100644 --- a/tm2/pkg/bft/node/node_test.go +++ b/tm2/pkg/bft/node/node_test.go @@ -108,6 +108,34 @@ func TestNodeDelayedStart(t *testing.T) { assert.Equal(t, true, startTime.After(n.GenesisDoc().GenesisTime)) } +func TestNodeFirstBlockSignal(t *testing.T) { + config := cfg.ResetTestRoot("node_node_test") + defer os.RemoveAll(config.RootDir) + now := tmtime.Now() + + // create & start node + n, err := DefaultNewNode(config, log.TestingLogger()) + require.NoError(t, err) + + // add a small delay to genesis time + n.GenesisDoc().GenesisTime = now.Add(time.Second) + require.NoError(t, err) + + err = n.Start() + require.NoError(t, err) + + defer n.Stop() + + select { + case <-n.FirstBlockReceived(): + case <-time.After(time.Second * 6): + require.FailNow(t, "timeout while waiting for first block signal") + } + + // Check that blockstore have at last 1 height size. + require.GreaterOrEqual(t, n.BlockStore().Height(), int64(1)) +} + func TestNodeSetAppVersion(t *testing.T) { config := cfg.ResetTestRoot("node_app_version_test") defer os.RemoveAll(config.RootDir) From 010cda3c39153ba21c98a93bc0c8926b4bbb4ff3 Mon Sep 17 00:00:00 2001 From: gfanton <8671905+gfanton@users.noreply.github.com> Date: Tue, 10 Oct 2023 17:16:15 +0200 Subject: [PATCH 2/7] fix: use `FirstBlockReceived` signal in gnoland integration setup Signed-off-by: gfanton <8671905+gfanton@users.noreply.github.com> --- .../pkg/integration/testing_integration.go | 28 ++++--------------- 1 file changed, 5 insertions(+), 23 deletions(-) diff --git a/gno.land/pkg/integration/testing_integration.go b/gno.land/pkg/integration/testing_integration.go index f0a696ddd85..7bde8d871eb 100644 --- a/gno.land/pkg/integration/testing_integration.go +++ b/gno.land/pkg/integration/testing_integration.go @@ -14,11 +14,9 @@ import ( "github.com/gnolang/gno/gno.land/pkg/gnoland" "github.com/gnolang/gno/tm2/pkg/bft/node" - "github.com/gnolang/gno/tm2/pkg/bft/types" "github.com/gnolang/gno/tm2/pkg/commands" "github.com/gnolang/gno/tm2/pkg/crypto/keys" "github.com/gnolang/gno/tm2/pkg/crypto/keys/client" - "github.com/gnolang/gno/tm2/pkg/events" "github.com/gnolang/gno/tm2/pkg/log" "github.com/rogpeppe/go-internal/testscript" ) @@ -141,29 +139,13 @@ func SetupGnolandTestScript(t *testing.T, txtarDir string) testscript.Params { ts.Setenv("RPC_ADDR", laddr) ts.Setenv("GNODATA", gnoDataDir) - const listenerID = "testing_listener" - - // Wait for first block by waiting for `EventNewBlock` event. - nb := make(chan struct{}, 1) - node.EventSwitch().AddListener(listenerID, func(ev events.Event) { - if _, ok := ev.(types.EventNewBlock); ok { - select { - case nb <- struct{}{}: - default: - } - } - }) - - if node.BlockStore().Height() == 0 { - select { - case <-nb: // ok - case <-time.After(time.Second * 6): - ts.Fatalf("timeout while waiting for the node to start") - } + // Wait for first block. + select { + case <-time.After(time.Second * 6): + ts.Fatalf("timeout while waiting for the node to start") + case <-node.FirstBlockReceived(): // ready } - node.EventSwitch().RemoveListener(listenerID) - fmt.Fprintln(ts.Stdout(), "node started successfully") } case "stop": From 308edfa82f728434f4d551a11313a224dade47bd Mon Sep 17 00:00:00 2001 From: gfanton <8671905+gfanton@users.noreply.github.com> Date: Tue, 10 Oct 2023 20:28:06 +0200 Subject: [PATCH 3/7] fix: signal test Signed-off-by: gfanton <8671905+gfanton@users.noreply.github.com> --- tm2/pkg/bft/node/node_test.go | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/tm2/pkg/bft/node/node_test.go b/tm2/pkg/bft/node/node_test.go index e9c77e69868..69cb761b2e5 100644 --- a/tm2/pkg/bft/node/node_test.go +++ b/tm2/pkg/bft/node/node_test.go @@ -111,28 +111,32 @@ func TestNodeDelayedStart(t *testing.T) { func TestNodeFirstBlockSignal(t *testing.T) { config := cfg.ResetTestRoot("node_node_test") defer os.RemoveAll(config.RootDir) - now := tmtime.Now() - // create & start node + // Create & start node n, err := DefaultNewNode(config, log.TestingLogger()) require.NoError(t, err) - // add a small delay to genesis time - n.GenesisDoc().GenesisTime = now.Add(time.Second) - require.NoError(t, err) + // Assert that blockstore is 0 before waiting for the first block + require.Equal(t, int64(0), n.BlockStore().Height()) + + // Assert that first block signal is not alreay received + select { + case <-n.FirstBlockReceived(): + require.FailNow(t, "firstblock signal should not be close before starting the node") + default: // ok + } err = n.Start() require.NoError(t, err) - defer n.Stop() select { - case <-n.FirstBlockReceived(): - case <-time.After(time.Second * 6): + case <-n.FirstBlockReceived(): // ready + case <-time.After(time.Second): require.FailNow(t, "timeout while waiting for first block signal") } - // Check that blockstore have at last 1 height size. + // Check that blockstore have at last 1 height size require.GreaterOrEqual(t, n.BlockStore().Height(), int64(1)) } From dfb8b929ec9b059155fe4fbc380c9a56a4064af8 Mon Sep 17 00:00:00 2001 From: gfanton <8671905+gfanton@users.noreply.github.com> Date: Tue, 10 Oct 2023 23:07:36 +0200 Subject: [PATCH 4/7] chore: lint comments Signed-off-by: gfanton <8671905+gfanton@users.noreply.github.com> --- tm2/pkg/bft/node/node_test.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/tm2/pkg/bft/node/node_test.go b/tm2/pkg/bft/node/node_test.go index 69cb761b2e5..551ebd63c20 100644 --- a/tm2/pkg/bft/node/node_test.go +++ b/tm2/pkg/bft/node/node_test.go @@ -116,13 +116,13 @@ func TestNodeFirstBlockSignal(t *testing.T) { n, err := DefaultNewNode(config, log.TestingLogger()) require.NoError(t, err) - // Assert that blockstore is 0 before waiting for the first block + // Assert that blockstore has zero block before waiting for the first block require.Equal(t, int64(0), n.BlockStore().Height()) // Assert that first block signal is not alreay received select { case <-n.FirstBlockReceived(): - require.FailNow(t, "firstblock signal should not be close before starting the node") + require.FailNow(t, "first block signal should not be close before starting the node") default: // ok } @@ -130,13 +130,14 @@ func TestNodeFirstBlockSignal(t *testing.T) { require.NoError(t, err) defer n.Stop() + // Wait until we got the first block or timeout select { - case <-n.FirstBlockReceived(): // ready case <-time.After(time.Second): require.FailNow(t, "timeout while waiting for first block signal") + case <-n.FirstBlockReceived(): // ready } - // Check that blockstore have at last 1 height size + // Check that blockstore have at last one block require.GreaterOrEqual(t, n.BlockStore().Height(), int64(1)) } From 53c5e03ab9770d6704c587a035eaf0e6f16d13fa Mon Sep 17 00:00:00 2001 From: gfanton <8671905+gfanton@users.noreply.github.com> Date: Wed, 31 Jan 2024 15:18:35 +0100 Subject: [PATCH 5/7] fix: increase node timeout Signed-off-by: gfanton <8671905+gfanton@users.noreply.github.com> --- gno.land/pkg/integration/testing_node.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gno.land/pkg/integration/testing_node.go b/gno.land/pkg/integration/testing_node.go index e8167639493..2fc7c0ea736 100644 --- a/gno.land/pkg/integration/testing_node.go +++ b/gno.land/pkg/integration/testing_node.go @@ -33,7 +33,7 @@ func TestingInMemoryNode(t TestingTS, logger *slog.Logger, config *gnoland.InMem select { case <-node.FirstBlockReceived(): - case <-time.After(time.Second * 6): + case <-time.After(time.Second * 10): require.FailNow(t, "timeout while waiting for the node to start") } From c494ec4d80914196b1a4c99ef9f68bc00d49550b Mon Sep 17 00:00:00 2001 From: gfanton <8671905+gfanton@users.noreply.github.com> Date: Wed, 31 Jan 2024 15:22:26 +0100 Subject: [PATCH 6/7] chore: fixup lint Signed-off-by: gfanton <8671905+gfanton@users.noreply.github.com> --- tm2/pkg/bft/node/node_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tm2/pkg/bft/node/node_test.go b/tm2/pkg/bft/node/node_test.go index c4d976c0dd9..ce06be6fbe1 100644 --- a/tm2/pkg/bft/node/node_test.go +++ b/tm2/pkg/bft/node/node_test.go @@ -113,7 +113,7 @@ func TestNodeFirstBlockSignal(t *testing.T) { defer os.RemoveAll(config.RootDir) // Create & start node - n, err := DefaultNewNode(config, log.TestingLogger()) + n, err := DefaultNewNode(config, log.NewTestingLogger(t)) require.NoError(t, err) // Assert that blockstore has zero block before waiting for the first block From 170506e6b058352edab9653e4fbb717aef4e810a Mon Sep 17 00:00:00 2001 From: gfanton <8671905+gfanton@users.noreply.github.com> Date: Wed, 31 Jan 2024 16:22:22 +0100 Subject: [PATCH 7/7] chore: rename `FirstBlockReceived` into `Ready` Signed-off-by: gfanton <8671905+gfanton@users.noreply.github.com> --- gno.land/pkg/integration/testing_node.go | 2 +- tm2/pkg/bft/node/node.go | 4 ++-- tm2/pkg/bft/node/node_test.go | 10 +++++----- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/gno.land/pkg/integration/testing_node.go b/gno.land/pkg/integration/testing_node.go index 2fc7c0ea736..cc0262b8105 100644 --- a/gno.land/pkg/integration/testing_node.go +++ b/gno.land/pkg/integration/testing_node.go @@ -32,7 +32,7 @@ func TestingInMemoryNode(t TestingTS, logger *slog.Logger, config *gnoland.InMem require.NoError(t, err) select { - case <-node.FirstBlockReceived(): + case <-node.Ready(): case <-time.After(time.Second * 10): require.FailNow(t, "timeout while waiting for the node to start") } diff --git a/tm2/pkg/bft/node/node.go b/tm2/pkg/bft/node/node.go index 7f2e3cc1460..57cc18b8ad7 100644 --- a/tm2/pkg/bft/node/node.go +++ b/tm2/pkg/bft/node/node.go @@ -670,8 +670,8 @@ func (n *Node) OnStop() { } } -// FirstBlockReceived returns a channel that gets closed when the node receives its first block. -func (n *Node) FirstBlockReceived() <-chan struct{} { +// Ready signals that the node is ready by returning a blocking channel. This channel is closed when the node receives its first block. +func (n *Node) Ready() <-chan struct{} { return n.firstBlockSignal } diff --git a/tm2/pkg/bft/node/node_test.go b/tm2/pkg/bft/node/node_test.go index ce06be6fbe1..3057a41b5f3 100644 --- a/tm2/pkg/bft/node/node_test.go +++ b/tm2/pkg/bft/node/node_test.go @@ -108,7 +108,7 @@ func TestNodeDelayedStart(t *testing.T) { assert.Equal(t, true, startTime.After(n.GenesisDoc().GenesisTime)) } -func TestNodeFirstBlockSignal(t *testing.T) { +func TestNodeReady(t *testing.T) { config := cfg.ResetTestRoot("node_node_test") defer os.RemoveAll(config.RootDir) @@ -119,9 +119,9 @@ func TestNodeFirstBlockSignal(t *testing.T) { // Assert that blockstore has zero block before waiting for the first block require.Equal(t, int64(0), n.BlockStore().Height()) - // Assert that first block signal is not alreay received + // Assert that first block signal is not alreay received by calling Ready select { - case <-n.FirstBlockReceived(): + case <-n.Ready(): require.FailNow(t, "first block signal should not be close before starting the node") default: // ok } @@ -130,11 +130,11 @@ func TestNodeFirstBlockSignal(t *testing.T) { require.NoError(t, err) defer n.Stop() - // Wait until we got the first block or timeout + // Wait until the node is ready or timeout select { case <-time.After(time.Second): require.FailNow(t, "timeout while waiting for first block signal") - case <-n.FirstBlockReceived(): // ready + case <-n.Ready(): // ready } // Check that blockstore have at last one block