New sequence sender component #2051

Merged (13 commits) on May 4, 2023
14 changes: 8 additions & 6 deletions cmd/main.go
@@ -14,18 +14,20 @@ import (
const appName = "zkevm-node"

const (
// AGGREGATOR is the aggregator component identifier.
// AGGREGATOR is the aggregator component identifier
AGGREGATOR = "aggregator"
// SEQUENCER is the sequencer component identifier.
// SEQUENCER is the sequencer component identifier
SEQUENCER = "sequencer"
// RPC is the RPC component identifier.
// RPC is the RPC component identifier
RPC = "rpc"
// SYNCHRONIZER is the synchronizer component identifier.
// SYNCHRONIZER is the synchronizer component identifier
SYNCHRONIZER = "synchronizer"
// ETHTXMANAGER is the service that manages the tx sent to L1
ETHTXMANAGER = "eth-tx-manager"
// L2GASPRICER is the l2 gas pricer component identifier.
// L2GASPRICER is the l2 gas pricer component identifier
L2GASPRICER = "l2gaspricer"
// SEQUENCE_SENDER is the sequence sender component identifier
SEQUENCE_SENDER = "sequence-sender"
)

var (
@@ -58,7 +60,7 @@ var (
Aliases: []string{"co"},
Usage: "List of components to run",
Required: false,
Value: cli.NewStringSlice(AGGREGATOR, SEQUENCER, RPC, SYNCHRONIZER, ETHTXMANAGER, L2GASPRICER),
Value: cli.NewStringSlice(AGGREGATOR, SEQUENCER, RPC, SYNCHRONIZER, ETHTXMANAGER, L2GASPRICER, SEQUENCE_SENDER),
}
httpAPIFlag = cli.StringSliceFlag{
Name: config.FlagHTTPAPI,
53 changes: 46 additions & 7 deletions cmd/run.go
@@ -29,6 +29,7 @@ import (
"github.com/0xPolygonHermez/zkevm-node/pool"
"github.com/0xPolygonHermez/zkevm-node/pool/pgpoolstorage"
"github.com/0xPolygonHermez/zkevm-node/sequencer"
"github.com/0xPolygonHermez/zkevm-node/sequencesender"
"github.com/0xPolygonHermez/zkevm-node/state"
"github.com/0xPolygonHermez/zkevm-node/state/runtime/executor"
executorpb "github.com/0xPolygonHermez/zkevm-node/state/runtime/executor/pb"
@@ -146,6 +147,8 @@ func start(cliCtx *cli.Context) error {
EventID: event.EventID_NodeComponentStarted,
}

var poolInstance *pool.Pool

Comment on lines +150 to +151 (Contributor):

This change would force all the components running in the same binary to share access to the pool database. Multiple components would run in different goroutines but consume a single pool instance.

It's OK to initialize multiple pool instances for different components, and the time it takes to instantiate them shouldn't be a problem. I don't see a reason for this optimization, and I'm afraid of the side effects it could cause with concurrent and parallel executions against the pool.

What is the rationale behind this change? Did you face an issue during development?

Reply (Contributor, author):

I just thought it was better to have a single pool per binary. I can undo this if you don't feel comfortable with it, or if we're not sure whether it may cause problems.

Reply (Contributor):

Single pool: maybe better performance, maybe edge cases.
Multiple pools: works as always, and already tested multiple times.

It's up to you; my suggestion is that we shouldn't change a winning team unless you want more emotion in your life 🤣

for _, component := range components {
switch component {
case AGGREGATOR:
@@ -163,17 +166,33 @@ func start(cliCtx *cli.Context) error {
if err != nil {
log.Fatal(err)
}
poolInstance := createPool(c.Pool, c.NetworkConfig.L2BridgeAddr, l2ChainID, st, eventLog)
if poolInstance == nil {
poolInstance = createPool(c.Pool, c.NetworkConfig.L2BridgeAddr, l2ChainID, st, eventLog)
}
seq := createSequencer(*c, poolInstance, ethTxManagerStorage, st, eventLog)
go seq.Start(ctx)
case SEQUENCE_SENDER:
ev.Component = event.Component_Sequence_Sender
ev.Description = "Running sequence sender"
err := eventLog.LogEvent(ctx, ev)
if err != nil {
log.Fatal(err)
}
if poolInstance == nil {
poolInstance = createPool(c.Pool, c.NetworkConfig.L2BridgeAddr, l2ChainID, st, eventLog)
}
seqSender := createSequenceSender(*c, poolInstance, ethTxManagerStorage, st, eventLog)
go seqSender.Start(ctx)
case RPC:
ev.Component = event.Component_RPC
ev.Description = "Running JSON-RPC server"
err := eventLog.LogEvent(ctx, ev)
if err != nil {
log.Fatal(err)
}
poolInstance := createPool(c.Pool, c.NetworkConfig.L2BridgeAddr, l2ChainID, st, eventLog)
if poolInstance == nil {
poolInstance = createPool(c.Pool, c.NetworkConfig.L2BridgeAddr, l2ChainID, st, eventLog)
}
if c.RPC.EnableL2SuggestedGasPricePolling {
// Needed for rejecting transactions with too low gas price
poolInstance.StartPollingMinSuggestedGasPrice(ctx)
@@ -190,7 +209,9 @@ func start(cliCtx *cli.Context) error {
if err != nil {
log.Fatal(err)
}
poolInstance := createPool(c.Pool, c.NetworkConfig.L2BridgeAddr, l2ChainID, st, eventLog)
if poolInstance == nil {
poolInstance = createPool(c.Pool, c.NetworkConfig.L2BridgeAddr, l2ChainID, st, eventLog)
}
go runSynchronizer(*c, etherman, etm, st, poolInstance)
case ETHTXMANAGER:
ev.Component = event.Component_EthTxManager
@@ -208,7 +229,9 @@ func start(cliCtx *cli.Context) error {
if err != nil {
log.Fatal(err)
}
poolInstance := createPool(c.Pool, c.NetworkConfig.L2BridgeAddr, l2ChainID, st, eventLog)
if poolInstance == nil {
poolInstance = createPool(c.Pool, c.NetworkConfig.L2BridgeAddr, l2ChainID, st, eventLog)
}
go runL2GasPriceSuggester(c.L2GasPriceSuggester, st, poolInstance, etherman)
}
}
@@ -304,7 +327,22 @@ func createSequencer(cfg config.Config, pool *pool.Pool, etmStorage *ethtxmanage
log.Fatal(err)
}

for _, privateKey := range cfg.Sequencer.Finalizer.PrivateKeys {
ethTxManager := ethtxmanager.New(cfg.EthTxManager, etherman, etmStorage, st)

seq, err := sequencer.New(cfg.Sequencer, pool, st, etherman, ethTxManager, eventLog)
if err != nil {
log.Fatal(err)
}
return seq
}

func createSequenceSender(cfg config.Config, pool *pool.Pool, etmStorage *ethtxmanager.PostgresStorage, st *state.State, eventLog *event.EventLog) *sequencesender.SequenceSender {
etherman, err := newEtherman(cfg)
if err != nil {
log.Fatal(err)
}

for _, privateKey := range cfg.SequenceSender.PrivateKeys {
_, err := etherman.LoadAuthFromKeyStore(privateKey.Path, privateKey.Password)
if err != nil {
log.Fatal(err)
@@ -313,11 +351,12 @@ func createSequencer(cfg config.Config, pool *pool.Pool, etmStorage *ethtxmanage

ethTxManager := ethtxmanager.New(cfg.EthTxManager, etherman, etmStorage, st)

seq, err := sequencer.New(cfg.Sequencer, pool, st, etherman, ethTxManager, eventLog)
seqSender, err := sequencesender.New(cfg.SequenceSender, st, etherman, ethTxManager, eventLog)
if err != nil {
log.Fatal(err)
}
return seq

return seqSender
}

func runAggregator(ctx context.Context, c aggregator.Config, etherman *etherman.Client, ethTxManager *ethtxmanager.Client, st *state.State) {
2 changes: 2 additions & 0 deletions config/config.go
@@ -18,6 +18,7 @@ import (
"github.com/0xPolygonHermez/zkevm-node/pool"
"github.com/0xPolygonHermez/zkevm-node/pricegetter"
"github.com/0xPolygonHermez/zkevm-node/sequencer"
"github.com/0xPolygonHermez/zkevm-node/sequencesender"
"github.com/0xPolygonHermez/zkevm-node/state/runtime/executor"
"github.com/0xPolygonHermez/zkevm-node/synchronizer"
"github.com/mitchellh/mapstructure"
@@ -62,6 +63,7 @@ type Config struct {
RPC jsonrpc.Config
Synchronizer synchronizer.Config
Sequencer sequencer.Config
SequenceSender sequencesender.Config
PriceGetter pricegetter.Config
Aggregator aggregator.Config
NetworkConfig NetworkConfig
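The new SequenceSender field above is bound from the [SequenceSender] table of the TOML files shown further down. Judging by the mitchellh/mapstructure import in config.go and the mapstructure struct tags, the binding works roughly like the following self-contained sketch; the field set is trimmed to plain types, since the real types.Duration fields would additionally need a decode hook.

```go
package main

import (
	"fmt"

	"github.com/mitchellh/mapstructure"
)

// Trimmed-down stand-in for sequencesender.Config: only plain field types are kept.
type sequenceSenderConfig struct {
	MaxTxSizeForL1 uint64 `mapstructure:"MaxTxSizeForL1"`
	SenderAddress  string `mapstructure:"SenderAddress"`
}

func main() {
	// Values as they would arrive from the parsed [SequenceSender] TOML table.
	raw := map[string]interface{}{
		"MaxTxSizeForL1": uint64(131072),
		"SenderAddress":  "0xf39fd6e51aad88f6f4ce6ab8827279cfffb92266",
	}

	var cfg sequenceSenderConfig
	if err := mapstructure.Decode(raw, &cfg); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", cfg) // decoded struct with both fields populated
}
```
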
20 changes: 12 additions & 8 deletions config/config_test.go
@@ -53,10 +53,6 @@ func Test_Defaults(t *testing.T) {
path: "Sequencer.WaitPeriodPoolIsEmpty",
expectedValue: types.NewDuration(1 * time.Second),
},
{
path: "Sequencer.LastBatchVirtualizationTimeMaxWaitPeriod",
expectedValue: types.NewDuration(5 * time.Second),
},
{
path: "Sequencer.MaxTxsPerBatch",
expectedValue: uint64(150),
@@ -113,10 +109,6 @@
path: "Sequencer.MaxTxLifetime",
expectedValue: types.NewDuration(3 * time.Hour),
},
{
path: "Sequencer.MaxTxSizeForL1",
expectedValue: uint64(131072),
},
{
path: "Sequencer.WeightBatchBytesSize",
expectedValue: 1,
@@ -205,6 +197,18 @@
path: "Sequencer.Worker.ResourceCostMultiplier",
expectedValue: float64(1000),
},
{
path: "SequenceSender.WaitPeriodSendSequence",
expectedValue: types.NewDuration(5 * time.Second),
},
{
path: "SequenceSender.LastBatchVirtualizationTimeMaxWaitPeriod",
expectedValue: types.NewDuration(5 * time.Second),
},
{
path: "SequenceSender.MaxTxSizeForL1",
expectedValue: uint64(131072),
},
{
path: "Etherman.URL",
expectedValue: "http://localhost:8545",
14 changes: 9 additions & 5 deletions config/default.go
@@ -65,8 +65,7 @@ TrustedSequencerURL = ""

[Sequencer]
WaitPeriodPoolIsEmpty = "1s"
WaitPeriodSendSequence = "5s"
LastBatchVirtualizationTimeMaxWaitPeriod = "5s"

BlocksAmountForTxsToBeDeleted = 100
FrequencyToCheckTxsForDelete = "12h"
MaxTxsPerBatch = 150
@@ -90,7 +89,6 @@ WeightBinaries = 1
WeightSteps = 1
TxLifetimeCheckTimeout = "10m"
MaxTxLifetime = "3h"
MaxTxSizeForL1 = 131072
[Sequencer.Finalizer]
GERDeadlineTimeoutInSec = "5s"
ForcedBatchDeadlineTimeoutInSec = "60s"
@@ -101,15 +99,21 @@ MaxTxSizeForL1 = 131072
ClosingSignalsManagerWaitForCheckingGER = "10s"
ClosingSignalsManagerWaitForCheckingForcedBatches = "10s"
ForcedBatchesFinalityNumberOfBlocks = 64
TimestampResolution = "15s"
SenderAddress = "0xf39fd6e51aad88f6f4ce6ab8827279cfffb92266"
TimestampResolution = "15s"
PrivateKeys = [{Path = "/pk/sequencer.keystore", Password = "testonly"}]

Comment (Contributor):

Shouldn't this line be removed? Going further, shouldn't we also remove values like addresses from the default config? We can keep them in the environment files.

Reply (Contributor, author):

Yes, but maybe this is outside the scope of this PR. We should open a new issue in order to handle that.

[Sequencer.DBManager]
PoolRetrievalInterval = "500ms"
L2ReorgRetrievalInterval = "5s"
[Sequencer.Worker]
ResourceCostMultiplier = 1000

[SequenceSender]
WaitPeriodSendSequence = "5s"
LastBatchVirtualizationTimeMaxWaitPeriod = "5s"
MaxTxSizeForL1 = 131072
SenderAddress = "0xf39fd6e51aad88f6f4ce6ab8827279cfffb92266"
PrivateKeys = [{Path = "/pk/sequencer.keystore", Password = "testonly"}]

[PriceGetter]
Type = "default"
DefaultPrice = "2000"
11 changes: 7 additions & 4 deletions config/environments/local/local.node.config.toml
@@ -56,8 +56,6 @@ TrustedSequencerURL = ""

[Sequencer]
WaitPeriodPoolIsEmpty = "1s"
WaitPeriodSendSequence = "5s"
LastBatchVirtualizationTimeMaxWaitPeriod = "5s"
BlocksAmountForTxsToBeDeleted = 100
FrequencyToCheckTxsForDelete = "12h"
MaxTxsPerBatch = 150
@@ -81,7 +79,6 @@ WeightBinaries = 1
WeightSteps = 1
TxLifetimeCheckTimeout = "10m"
MaxTxLifetime = "3h"
MaxTxSizeForL1 = 131072
[Sequencer.Finalizer]
GERDeadlineTimeoutInSec = "5s"
ForcedBatchDeadlineTimeoutInSec = "60s"
@@ -94,13 +91,19 @@ MaxTxSizeForL1 = 131072
ForcedBatchesFinalityNumberOfBlocks = 64
TimestampResolution = "15s"
SenderAddress = "0xf39fd6e51aad88f6f4ce6ab8827279cfffb92266"
PrivateKeys = [{Path = "/pk/sequencer.keystore", Password = "testonly"}]
[Sequencer.DBManager]
PoolRetrievalInterval = "500ms"
L2ReorgRetrievalInterval = "5s"
[Sequencer.Worker]
ResourceCostMultiplier = 1000

[SequenceSender]
WaitPeriodSendSequence = "5s"
LastBatchVirtualizationTimeMaxWaitPeriod = "5s"
MaxTxSizeForL1 = 131072
SenderAddress = "0xf39fd6e51aad88f6f4ce6ab8827279cfffb92266"
PrivateKeys = [{Path = "/pk/sequencer.keystore", Password = "testonly"}]

[Aggregator]
Host = "0.0.0.0"
Port = 50081
2 changes: 2 additions & 0 deletions event/event.go
@@ -54,6 +54,8 @@ const (
Component_Executor Component = "executor"
// Component_Broadcast is the component that triggered the event
Component_Broadcast Component = "broadcast"
// Component_Sequence_Sender is the component that triggered the event
Component_Sequence_Sender = "seqsender"

// Level_Emergency is the most severe level
Level_Emergency Level = "emerg"
21 changes: 0 additions & 21 deletions sequencer/config.go
@@ -6,17 +6,10 @@ import (

// Config represents the configuration of a sequencer
type Config struct {
// WaitPeriodSendSequence is the time the sequencer waits until
// trying to send a sequence to L1
WaitPeriodSendSequence types.Duration `mapstructure:"WaitPeriodSendSequence"`

// WaitPeriodPoolIsEmpty is the time the sequencer waits until
// trying to add new txs to the state
WaitPeriodPoolIsEmpty types.Duration `mapstructure:"WaitPeriodPoolIsEmpty"`

// LastBatchVirtualizationTimeMaxWaitPeriod is time since sequences should be sent
LastBatchVirtualizationTimeMaxWaitPeriod types.Duration `mapstructure:"LastBatchVirtualizationTimeMaxWaitPeriod"`

// BlocksAmountForTxsToBeDeleted is blocks amount after which txs will be deleted from the pool
BlocksAmountForTxsToBeDeleted uint64 `mapstructure:"BlocksAmountForTxsToBeDeleted"`

@@ -87,12 +80,6 @@ type Config struct {
// MaxTxLifetime is the time a tx can be in the sequencer memory
MaxTxLifetime types.Duration `mapstructure:"MaxTxLifetime"`

// MaxTxSizeForL1 is the maximum size a single transaction can have. This field has
// non-trivial consequences: larger transactions than 128KB are significantly harder and
// more expensive to propagate; larger transactions also take more resources
// to validate whether they fit into the pool or not.
MaxTxSizeForL1 uint64 `mapstructure:"MaxTxSizeForL1"`

// Finalizer's specific config properties
Finalizer FinalizerCfg `mapstructure:"Finalizer"`

@@ -134,14 +121,6 @@ type FinalizerCfg struct {

// TimestampResolution is the resolution of the timestamp used to close a batch
TimestampResolution types.Duration `mapstructure:"TimestampResolution"`

// SenderAddress defines which private key the eth tx manager needs to use
// to sign the L1 txs
SenderAddress string `mapstructure:"SenderAddress"`

// PrivateKeys defines all the key store files that are going
// to be read in order to provide the private keys to sign the L1 txs
PrivateKeys []types.KeystoreFileConfig `mapstructure:"PrivateKeys"`
}

// WorkerCfg contains the Worker's configuration properties
15 changes: 3 additions & 12 deletions sequencer/sequencer.go
@@ -20,10 +20,9 @@ import (
type Sequencer struct {
cfg Config

pool txPool
state stateInterface
eventLog *event.EventLog
// dbManager dbManagerInterface
pool txPool
state stateInterface
eventLog *event.EventLog
ethTxManager ethTxManager
etherman etherman

@@ -163,15 +162,7 @@ func (s *Sequencer) Start(ctx context.Context) {

go s.trackOldTxs(ctx)
tickerProcessTxs := time.NewTicker(s.cfg.WaitPeriodPoolIsEmpty.Duration)
tickerSendSequence := time.NewTicker(s.cfg.WaitPeriodSendSequence.Duration)
defer tickerProcessTxs.Stop()
defer tickerSendSequence.Stop()

go func() {
for {
s.tryToSendSequence(ctx, tickerSendSequence)
}
}()

// Expire too old txs in the worker
go func() {
25 changes: 25 additions & 0 deletions sequencesender/config.go
@@ -0,0 +1,25 @@
package sequencesender

import (
"github.com/0xPolygonHermez/zkevm-node/config/types"
)

// Config represents the configuration of a sequence sender
type Config struct {
// WaitPeriodSendSequence is the time the sequencer waits until
// trying to send a sequence to L1
WaitPeriodSendSequence types.Duration `mapstructure:"WaitPeriodSendSequence"`
// LastBatchVirtualizationTimeMaxWaitPeriod is time since sequences should be sent
LastBatchVirtualizationTimeMaxWaitPeriod types.Duration `mapstructure:"LastBatchVirtualizationTimeMaxWaitPeriod"`
// MaxTxSizeForL1 is the maximum size a single transaction can have. This field has
// non-trivial consequences: larger transactions than 128KB are significantly harder and
// more expensive to propagate; larger transactions also take more resources
// to validate whether they fit into the pool or not.
MaxTxSizeForL1 uint64 `mapstructure:"MaxTxSizeForL1"`
// SenderAddress defines which private key the eth tx manager needs to use
// to sign the L1 txs
SenderAddress string `mapstructure:"SenderAddress"`
// PrivateKeys defines all the key store files that are going
// to be read in order to provide the private keys to sign the L1 txs
PrivateKeys []types.KeystoreFileConfig `mapstructure:"PrivateKeys"`
}
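
sequencesender/sequencesender.go itself is not part of this diff excerpt, but the ticker loop removed from Sequencer.Start above suggests how these fields are meant to be used. The sketch below is a rough, self-contained illustration under that assumption; the sequenceSender type, tryToSendSequence, and main are invented stand-ins, not the PR's actual code.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Stand-in for the real Config; durations are plain time.Duration instead of types.Duration.
type config struct {
	WaitPeriodSendSequence                   time.Duration
	LastBatchVirtualizationTimeMaxWaitPeriod time.Duration
	MaxTxSizeForL1                           uint64
}

// Stand-in for the real SequenceSender.
type sequenceSender struct {
	cfg                  config
	lastVirtualizedBatch time.Time
}

// tryToSendSequence is a placeholder for the real send step, which would assemble
// sequences no larger than MaxTxSizeForL1 and hand them to the eth tx manager once
// batches are ready or LastBatchVirtualizationTimeMaxWaitPeriod has elapsed.
func (s *sequenceSender) tryToSendSequence(ctx context.Context) {
	timedOut := time.Since(s.lastVirtualizedBatch) > s.cfg.LastBatchVirtualizationTimeMaxWaitPeriod
	fmt.Println("checking whether to send a sequence; virtualization timeout reached:", timedOut)
}

// Start mirrors the ticker-driven loop this PR removes from sequencer.Sequencer.Start.
func (s *sequenceSender) Start(ctx context.Context) {
	ticker := time.NewTicker(s.cfg.WaitPeriodSendSequence)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			s.tryToSendSequence(ctx)
		}
	}
}

func main() {
	// Stop the loop after a few ticks so the sketch terminates on its own.
	ctx, cancel := context.WithTimeout(context.Background(), 12*time.Second)
	defer cancel()

	s := &sequenceSender{
		cfg: config{
			WaitPeriodSendSequence:                   5 * time.Second, // defaults from the TOML above
			LastBatchVirtualizationTimeMaxWaitPeriod: 5 * time.Second,
			MaxTxSizeForL1:                           131072,
		},
		lastVirtualizedBatch: time.Now(),
	}
	s.Start(ctx)
}
```

In the real component, sequencesender.New presumably wires in the state, etherman, and eth tx manager dependencies seen in createSequenceSender above.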