From 5ed1f5ac807afaece8833a69152866593f34c11b Mon Sep 17 00:00:00 2001 From: Goran Rojovic Date: Thu, 28 Mar 2024 12:19:23 +0100 Subject: [PATCH 1/8] fix --- Makefile | 18 +- cmd/main.go | 6 +- config/config.go | 1 + config/default.go | 1 + db/migrations/0002.sql | 3 + docker/data/agglayer/agglayer.toml | 1 + interop/executor.go | 2 +- interop/executor_test.go | 10 +- mocks/eth_tx_manager.generated.go | 117 +--- mocks/etherman.generated.go | 657 ++++++++++++++++++++- rpc/rpc_test.go | 12 +- txmanager/pgstorage.go | 278 +++++++++ txmanager/pgstorage_test.go | 338 +++++++++++ txmanager/txmanager.go | 595 +++++++++++++++++++ txmanager/txmanager_test.go | 918 +++++++++++++++++++++++++++++ txmanager/types/interfaces.go | 37 ++ txmanager/types/monitoredtx.go | 213 +++++++ txmanager/types/monitoretx_test.go | 38 ++ types/interfaces.go | 20 +- 19 files changed, 3135 insertions(+), 130 deletions(-) create mode 100644 db/migrations/0002.sql create mode 100644 txmanager/pgstorage.go create mode 100644 txmanager/pgstorage_test.go create mode 100644 txmanager/txmanager.go create mode 100644 txmanager/txmanager_test.go create mode 100644 txmanager/types/interfaces.go create mode 100644 txmanager/types/monitoredtx.go create mode 100644 txmanager/types/monitoretx_test.go diff --git a/Makefile b/Makefile index 85709dd2..22255565 100644 --- a/Makefile +++ b/Makefile @@ -62,6 +62,12 @@ LDFLAGS += -X 'github.com/0xPolygon/agglayer.GitRev=$(GITREV)' LDFLAGS += -X 'github.com/0xPolygon/agglayer.GitBranch=$(GITBRANCH)' LDFLAGS += -X 'github.com/0xPolygon/agglayer.BuildDate=$(DATE)' +DOCKERCOMPOSE := docker-compose -f docker/docker-compose.yaml +DOCKERCOMPOSESTATEDB := agglayer-db + +RUNSTATEDB := $(DOCKERCOMPOSE) up -d $(DOCKERCOMPOSESTATEDB) +STOP := $(DOCKERCOMPOSE) down --remove-orphans + .PHONY: build build: ## Builds the binary locally into ./dist $(GOENVVARS) go build -ldflags "all=$(LDFLAGS)" -o $(GOBIN)/$(GOBINARY) $(GOCMD) @@ -111,13 +117,19 @@ help: ## Prints the help | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}' .PHONY: e2e-tests -e2e-tests: ## Runs E2E tests +e2e-tests: stop ## Runs E2E tests go test -v -timeout=30m github.com/0xPolygon/agglayer/test .PHONY: unit-tests -unit-tests: ## Runs unit tests - go test -v -timeout=5m -race -shuffle=on -coverprofile coverage.out `go list ./... | grep -v test` +unit-tests: stop ## Runs unit tests + $(RUNSTATEDB) + sleep 40 + trap '$(STOP)' EXIT; MallocNanoZone=0 go test -v -timeout=5m -race -shuffle=on -coverprofile coverage.out `go list ./... | grep -v test` .PHONY: generate-mocks generate-mocks: ## Generates mocks and other autogenerated types mockery + +.PHONY: stop +stop: ## Stops all services + $(STOP) diff --git a/cmd/main.go b/cmd/main.go index 5f9b152c..fea18218 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -12,7 +12,6 @@ import ( jRPC "github.com/0xPolygon/cdk-rpc/rpc" dbConf "github.com/0xPolygonHermez/zkevm-node/db" - "github.com/0xPolygonHermez/zkevm-node/ethtxmanager" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -35,6 +34,7 @@ import ( "github.com/0xPolygon/agglayer/log" "github.com/0xPolygon/agglayer/network" "github.com/0xPolygon/agglayer/rpc" + "github.com/0xPolygon/agglayer/txmanager" ) const appName = "cdk-agglayer" @@ -131,11 +131,11 @@ func start(cliCtx *cli.Context) error { } // Prepare EthTxMan client - ethTxManagerStorage, err := ethtxmanager.NewPostgresStorage(c.DB) + ethTxManagerStorage, err := txmanager.NewPostgresStorage(c.DB) if err != nil { return err } - etm := ethtxmanager.New(c.EthTxManager.Config, ðMan, ethTxManagerStorage, ðMan) + etm := txmanager.New(c.EthTxManager, ðMan, ethTxManagerStorage, ðMan) // Create opentelemetry metric provider meterProvider, err := createMeterProvider() diff --git a/config/config.go b/config/config.go index 2df62a49..37ef8294 100644 --- a/config/config.go +++ b/config/config.go @@ -51,6 +51,7 @@ type EthTxManagerConfig struct { GasOffset uint64 `mapstructure:"GasOffset"` KMSKeyName string `mapstructure:"KMSKeyName"` KMSConnectionTimeout types.Duration `mapstructure:"KMSConnectionTimeout"` + MaxRetries uint64 `mapstructure:"MaxRetries"` } // Load loads the configuration baseed on the cli context diff --git a/config/default.go b/config/default.go index d2266f65..6c206175 100644 --- a/config/default.go +++ b/config/default.go @@ -45,6 +45,7 @@ const DefaultValues = ` KMSKeyName = "gcp/resource/id" KMSConnectionTimeout = "30s" GasOffset = 100000 + MaxRetries = 10 [L1] ChainID = 1337 diff --git a/db/migrations/0002.sql b/db/migrations/0002.sql new file mode 100644 index 00000000..3fbd2e99 --- /dev/null +++ b/db/migrations/0002.sql @@ -0,0 +1,3 @@ +-- +migrate Up +ALTER TABLE state.monitored_txs +ADD COLUMN num_retries DECIMAL(78, 0) NOT NULL DEFAULT 0; \ No newline at end of file diff --git a/docker/data/agglayer/agglayer.toml b/docker/data/agglayer/agglayer.toml index 60783823..99880fd1 100644 --- a/docker/data/agglayer/agglayer.toml +++ b/docker/data/agglayer/agglayer.toml @@ -34,6 +34,7 @@ GasOffset = 100000 KMSKeyName = "" KMSConnectionTimeout = "30s" + MaxRetries = 10 [L1] ChainID = 1337 diff --git a/interop/executor.go b/interop/executor.go index 9382049f..6f147bec 100644 --- a/interop/executor.go +++ b/interop/executor.go @@ -172,7 +172,7 @@ func (e *Executor) Execute(ctx context.Context, signedTx tx.SignedTx) error { if batch.StateRoot != signedTx.Tx.ZKP.NewStateRoot || batch.LocalExitRoot != signedTx.Tx.ZKP.NewLocalExitRoot { return fmt.Errorf( - "Mismatch detected, expected local exit root: %s actual: %s. expected state root: %s actual: %s", + "mismatch detected, expected local exit root: %s actual: %s. expected state root: %s actual: %s", signedTx.Tx.ZKP.NewLocalExitRoot.Hex(), batch.LocalExitRoot.Hex(), signedTx.Tx.ZKP.NewStateRoot.Hex(), diff --git a/interop/executor_test.go b/interop/executor_test.go index 528b0100..34fec0c7 100644 --- a/interop/executor_test.go +++ b/interop/executor_test.go @@ -8,8 +8,8 @@ import ( "testing" "github.com/0xPolygon/agglayer/log" + txmTypes "github.com/0xPolygon/agglayer/txmanager/types" jRPC "github.com/0xPolygon/cdk-rpc/rpc" - "github.com/0xPolygonHermez/zkevm-node/ethtxmanager" rpctypes "github.com/0xPolygonHermez/zkevm-node/jsonrpc/types" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" @@ -313,9 +313,9 @@ func TestExecutor_GetTxStatus(t *testing.T) { expectedError := jRPC.NewRPCError(rpctypes.DefaultErrorCode, "failed to get tx, error: sampleError") ethTxManager.On("Result", mock.Anything, ethTxManOwner, hash.Hex(), dbTx). - Return(ethtxmanager.MonitoredTxResult{ + Return(txmTypes.MonitoredTxResult{ ID: "0x1", - Status: ethtxmanager.MonitoredTxStatus("0x1"), + Status: txmTypes.MonitoredTxStatus("0x1"), }, nil).Once() result, err := executor.GetTxStatus(context.Background(), hash, dbTx) @@ -324,9 +324,9 @@ func TestExecutor_GetTxStatus(t *testing.T) { assert.NoError(t, err) ethTxManager.On("Result", mock.Anything, ethTxManOwner, hash.Hex(), dbTx). - Return(ethtxmanager.MonitoredTxResult{ + Return(txmTypes.MonitoredTxResult{ ID: "0x0", - Status: ethtxmanager.MonitoredTxStatus("0x1"), + Status: txmTypes.MonitoredTxStatus("0x1"), }, errors.New("sampleError")).Once() result, err = executor.GetTxStatus(context.Background(), hash, dbTx) diff --git a/mocks/eth_tx_manager.generated.go b/mocks/eth_tx_manager.generated.go index 74fdc226..c557a140 100644 --- a/mocks/eth_tx_manager.generated.go +++ b/mocks/eth_tx_manager.generated.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.39.0. DO NOT EDIT. +// Code generated by mockery v2.42.1. DO NOT EDIT. package mocks @@ -8,11 +8,11 @@ import ( common "github.com/ethereum/go-ethereum/common" - ethtxmanager "github.com/0xPolygonHermez/zkevm-node/ethtxmanager" - mock "github.com/stretchr/testify/mock" pgx "github.com/jackc/pgx/v4" + + txmanagertypes "github.com/0xPolygon/agglayer/txmanager/types" ) // EthTxManagerMock is an autogenerated mock type for the IEthTxManager type @@ -82,59 +82,23 @@ func (_c *EthTxManagerMock_Add_Call) RunAndReturn(run func(context.Context, stri return _c } -// ProcessPendingMonitoredTxs provides a mock function with given fields: ctx, owner, failedResultHandler, dbTx -func (_m *EthTxManagerMock) ProcessPendingMonitoredTxs(ctx context.Context, owner string, failedResultHandler ethtxmanager.ResultHandler, dbTx pgx.Tx) { - _m.Called(ctx, owner, failedResultHandler, dbTx) -} - -// EthTxManagerMock_ProcessPendingMonitoredTxs_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ProcessPendingMonitoredTxs' -type EthTxManagerMock_ProcessPendingMonitoredTxs_Call struct { - *mock.Call -} - -// ProcessPendingMonitoredTxs is a helper method to define mock.On call -// - ctx context.Context -// - owner string -// - failedResultHandler ethtxmanager.ResultHandler -// - dbTx pgx.Tx -func (_e *EthTxManagerMock_Expecter) ProcessPendingMonitoredTxs(ctx interface{}, owner interface{}, failedResultHandler interface{}, dbTx interface{}) *EthTxManagerMock_ProcessPendingMonitoredTxs_Call { - return &EthTxManagerMock_ProcessPendingMonitoredTxs_Call{Call: _e.mock.On("ProcessPendingMonitoredTxs", ctx, owner, failedResultHandler, dbTx)} -} - -func (_c *EthTxManagerMock_ProcessPendingMonitoredTxs_Call) Run(run func(ctx context.Context, owner string, failedResultHandler ethtxmanager.ResultHandler, dbTx pgx.Tx)) *EthTxManagerMock_ProcessPendingMonitoredTxs_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string), args[2].(ethtxmanager.ResultHandler), args[3].(pgx.Tx)) - }) - return _c -} - -func (_c *EthTxManagerMock_ProcessPendingMonitoredTxs_Call) Return() *EthTxManagerMock_ProcessPendingMonitoredTxs_Call { - _c.Call.Return() - return _c -} - -func (_c *EthTxManagerMock_ProcessPendingMonitoredTxs_Call) RunAndReturn(run func(context.Context, string, ethtxmanager.ResultHandler, pgx.Tx)) *EthTxManagerMock_ProcessPendingMonitoredTxs_Call { - _c.Call.Return(run) - return _c -} - // Result provides a mock function with given fields: ctx, owner, id, dbTx -func (_m *EthTxManagerMock) Result(ctx context.Context, owner string, id string, dbTx pgx.Tx) (ethtxmanager.MonitoredTxResult, error) { +func (_m *EthTxManagerMock) Result(ctx context.Context, owner string, id string, dbTx pgx.Tx) (txmanagertypes.MonitoredTxResult, error) { ret := _m.Called(ctx, owner, id, dbTx) if len(ret) == 0 { panic("no return value specified for Result") } - var r0 ethtxmanager.MonitoredTxResult + var r0 txmanagertypes.MonitoredTxResult var r1 error - if rf, ok := ret.Get(0).(func(context.Context, string, string, pgx.Tx) (ethtxmanager.MonitoredTxResult, error)); ok { + if rf, ok := ret.Get(0).(func(context.Context, string, string, pgx.Tx) (txmanagertypes.MonitoredTxResult, error)); ok { return rf(ctx, owner, id, dbTx) } - if rf, ok := ret.Get(0).(func(context.Context, string, string, pgx.Tx) ethtxmanager.MonitoredTxResult); ok { + if rf, ok := ret.Get(0).(func(context.Context, string, string, pgx.Tx) txmanagertypes.MonitoredTxResult); ok { r0 = rf(ctx, owner, id, dbTx) } else { - r0 = ret.Get(0).(ethtxmanager.MonitoredTxResult) + r0 = ret.Get(0).(txmanagertypes.MonitoredTxResult) } if rf, ok := ret.Get(1).(func(context.Context, string, string, pgx.Tx) error); ok { @@ -167,73 +131,12 @@ func (_c *EthTxManagerMock_Result_Call) Run(run func(ctx context.Context, owner return _c } -func (_c *EthTxManagerMock_Result_Call) Return(_a0 ethtxmanager.MonitoredTxResult, _a1 error) *EthTxManagerMock_Result_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *EthTxManagerMock_Result_Call) RunAndReturn(run func(context.Context, string, string, pgx.Tx) (ethtxmanager.MonitoredTxResult, error)) *EthTxManagerMock_Result_Call { - _c.Call.Return(run) - return _c -} - -// ResultsByStatus provides a mock function with given fields: ctx, owner, statuses, dbTx -func (_m *EthTxManagerMock) ResultsByStatus(ctx context.Context, owner string, statuses []ethtxmanager.MonitoredTxStatus, dbTx pgx.Tx) ([]ethtxmanager.MonitoredTxResult, error) { - ret := _m.Called(ctx, owner, statuses, dbTx) - - if len(ret) == 0 { - panic("no return value specified for ResultsByStatus") - } - - var r0 []ethtxmanager.MonitoredTxResult - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, string, []ethtxmanager.MonitoredTxStatus, pgx.Tx) ([]ethtxmanager.MonitoredTxResult, error)); ok { - return rf(ctx, owner, statuses, dbTx) - } - if rf, ok := ret.Get(0).(func(context.Context, string, []ethtxmanager.MonitoredTxStatus, pgx.Tx) []ethtxmanager.MonitoredTxResult); ok { - r0 = rf(ctx, owner, statuses, dbTx) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]ethtxmanager.MonitoredTxResult) - } - } - - if rf, ok := ret.Get(1).(func(context.Context, string, []ethtxmanager.MonitoredTxStatus, pgx.Tx) error); ok { - r1 = rf(ctx, owner, statuses, dbTx) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// EthTxManagerMock_ResultsByStatus_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ResultsByStatus' -type EthTxManagerMock_ResultsByStatus_Call struct { - *mock.Call -} - -// ResultsByStatus is a helper method to define mock.On call -// - ctx context.Context -// - owner string -// - statuses []ethtxmanager.MonitoredTxStatus -// - dbTx pgx.Tx -func (_e *EthTxManagerMock_Expecter) ResultsByStatus(ctx interface{}, owner interface{}, statuses interface{}, dbTx interface{}) *EthTxManagerMock_ResultsByStatus_Call { - return &EthTxManagerMock_ResultsByStatus_Call{Call: _e.mock.On("ResultsByStatus", ctx, owner, statuses, dbTx)} -} - -func (_c *EthTxManagerMock_ResultsByStatus_Call) Run(run func(ctx context.Context, owner string, statuses []ethtxmanager.MonitoredTxStatus, dbTx pgx.Tx)) *EthTxManagerMock_ResultsByStatus_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string), args[2].([]ethtxmanager.MonitoredTxStatus), args[3].(pgx.Tx)) - }) - return _c -} - -func (_c *EthTxManagerMock_ResultsByStatus_Call) Return(_a0 []ethtxmanager.MonitoredTxResult, _a1 error) *EthTxManagerMock_ResultsByStatus_Call { +func (_c *EthTxManagerMock_Result_Call) Return(_a0 txmanagertypes.MonitoredTxResult, _a1 error) *EthTxManagerMock_Result_Call { _c.Call.Return(_a0, _a1) return _c } -func (_c *EthTxManagerMock_ResultsByStatus_Call) RunAndReturn(run func(context.Context, string, []ethtxmanager.MonitoredTxStatus, pgx.Tx) ([]ethtxmanager.MonitoredTxResult, error)) *EthTxManagerMock_ResultsByStatus_Call { +func (_c *EthTxManagerMock_Result_Call) RunAndReturn(run func(context.Context, string, string, pgx.Tx) (txmanagertypes.MonitoredTxResult, error)) *EthTxManagerMock_Result_Call { _c.Call.Return(run) return _c } diff --git a/mocks/etherman.generated.go b/mocks/etherman.generated.go index cf7ed8da..32d0a2a3 100644 --- a/mocks/etherman.generated.go +++ b/mocks/etherman.generated.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.39.0. DO NOT EDIT. +// Code generated by mockery v2.42.1. DO NOT EDIT. package mocks @@ -8,10 +8,18 @@ import ( common "github.com/ethereum/go-ethereum/common" + coretypes "github.com/ethereum/go-ethereum/core/types" + ethereum "github.com/ethereum/go-ethereum" mock "github.com/stretchr/testify/mock" + pgx "github.com/jackc/pgx/v4" + + state "github.com/0xPolygonHermez/zkevm-node/state" + + time "time" + tx "github.com/0xPolygon/agglayer/tx" ) @@ -149,6 +157,248 @@ func (_c *EthermanMock_CallContract_Call) RunAndReturn(run func(context.Context, return _c } +// CheckTxWasMined provides a mock function with given fields: ctx, txHash +func (_m *EthermanMock) CheckTxWasMined(ctx context.Context, txHash common.Hash) (bool, *coretypes.Receipt, error) { + ret := _m.Called(ctx, txHash) + + if len(ret) == 0 { + panic("no return value specified for CheckTxWasMined") + } + + var r0 bool + var r1 *coretypes.Receipt + var r2 error + if rf, ok := ret.Get(0).(func(context.Context, common.Hash) (bool, *coretypes.Receipt, error)); ok { + return rf(ctx, txHash) + } + if rf, ok := ret.Get(0).(func(context.Context, common.Hash) bool); ok { + r0 = rf(ctx, txHash) + } else { + r0 = ret.Get(0).(bool) + } + + if rf, ok := ret.Get(1).(func(context.Context, common.Hash) *coretypes.Receipt); ok { + r1 = rf(ctx, txHash) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).(*coretypes.Receipt) + } + } + + if rf, ok := ret.Get(2).(func(context.Context, common.Hash) error); ok { + r2 = rf(ctx, txHash) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + +// EthermanMock_CheckTxWasMined_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckTxWasMined' +type EthermanMock_CheckTxWasMined_Call struct { + *mock.Call +} + +// CheckTxWasMined is a helper method to define mock.On call +// - ctx context.Context +// - txHash common.Hash +func (_e *EthermanMock_Expecter) CheckTxWasMined(ctx interface{}, txHash interface{}) *EthermanMock_CheckTxWasMined_Call { + return &EthermanMock_CheckTxWasMined_Call{Call: _e.mock.On("CheckTxWasMined", ctx, txHash)} +} + +func (_c *EthermanMock_CheckTxWasMined_Call) Run(run func(ctx context.Context, txHash common.Hash)) *EthermanMock_CheckTxWasMined_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(common.Hash)) + }) + return _c +} + +func (_c *EthermanMock_CheckTxWasMined_Call) Return(_a0 bool, _a1 *coretypes.Receipt, _a2 error) *EthermanMock_CheckTxWasMined_Call { + _c.Call.Return(_a0, _a1, _a2) + return _c +} + +func (_c *EthermanMock_CheckTxWasMined_Call) RunAndReturn(run func(context.Context, common.Hash) (bool, *coretypes.Receipt, error)) *EthermanMock_CheckTxWasMined_Call { + _c.Call.Return(run) + return _c +} + +// EstimateGas provides a mock function with given fields: ctx, from, to, value, data +func (_m *EthermanMock) EstimateGas(ctx context.Context, from common.Address, to *common.Address, value *big.Int, data []byte) (uint64, error) { + ret := _m.Called(ctx, from, to, value, data) + + if len(ret) == 0 { + panic("no return value specified for EstimateGas") + } + + var r0 uint64 + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, common.Address, *common.Address, *big.Int, []byte) (uint64, error)); ok { + return rf(ctx, from, to, value, data) + } + if rf, ok := ret.Get(0).(func(context.Context, common.Address, *common.Address, *big.Int, []byte) uint64); ok { + r0 = rf(ctx, from, to, value, data) + } else { + r0 = ret.Get(0).(uint64) + } + + if rf, ok := ret.Get(1).(func(context.Context, common.Address, *common.Address, *big.Int, []byte) error); ok { + r1 = rf(ctx, from, to, value, data) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// EthermanMock_EstimateGas_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'EstimateGas' +type EthermanMock_EstimateGas_Call struct { + *mock.Call +} + +// EstimateGas is a helper method to define mock.On call +// - ctx context.Context +// - from common.Address +// - to *common.Address +// - value *big.Int +// - data []byte +func (_e *EthermanMock_Expecter) EstimateGas(ctx interface{}, from interface{}, to interface{}, value interface{}, data interface{}) *EthermanMock_EstimateGas_Call { + return &EthermanMock_EstimateGas_Call{Call: _e.mock.On("EstimateGas", ctx, from, to, value, data)} +} + +func (_c *EthermanMock_EstimateGas_Call) Run(run func(ctx context.Context, from common.Address, to *common.Address, value *big.Int, data []byte)) *EthermanMock_EstimateGas_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(common.Address), args[2].(*common.Address), args[3].(*big.Int), args[4].([]byte)) + }) + return _c +} + +func (_c *EthermanMock_EstimateGas_Call) Return(_a0 uint64, _a1 error) *EthermanMock_EstimateGas_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *EthermanMock_EstimateGas_Call) RunAndReturn(run func(context.Context, common.Address, *common.Address, *big.Int, []byte) (uint64, error)) *EthermanMock_EstimateGas_Call { + _c.Call.Return(run) + return _c +} + +// GetLastBlock provides a mock function with given fields: ctx, dbTx +func (_m *EthermanMock) GetLastBlock(ctx context.Context, dbTx pgx.Tx) (*state.Block, error) { + ret := _m.Called(ctx, dbTx) + + if len(ret) == 0 { + panic("no return value specified for GetLastBlock") + } + + var r0 *state.Block + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, pgx.Tx) (*state.Block, error)); ok { + return rf(ctx, dbTx) + } + if rf, ok := ret.Get(0).(func(context.Context, pgx.Tx) *state.Block); ok { + r0 = rf(ctx, dbTx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*state.Block) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, pgx.Tx) error); ok { + r1 = rf(ctx, dbTx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// EthermanMock_GetLastBlock_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetLastBlock' +type EthermanMock_GetLastBlock_Call struct { + *mock.Call +} + +// GetLastBlock is a helper method to define mock.On call +// - ctx context.Context +// - dbTx pgx.Tx +func (_e *EthermanMock_Expecter) GetLastBlock(ctx interface{}, dbTx interface{}) *EthermanMock_GetLastBlock_Call { + return &EthermanMock_GetLastBlock_Call{Call: _e.mock.On("GetLastBlock", ctx, dbTx)} +} + +func (_c *EthermanMock_GetLastBlock_Call) Run(run func(ctx context.Context, dbTx pgx.Tx)) *EthermanMock_GetLastBlock_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(pgx.Tx)) + }) + return _c +} + +func (_c *EthermanMock_GetLastBlock_Call) Return(_a0 *state.Block, _a1 error) *EthermanMock_GetLastBlock_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *EthermanMock_GetLastBlock_Call) RunAndReturn(run func(context.Context, pgx.Tx) (*state.Block, error)) *EthermanMock_GetLastBlock_Call { + _c.Call.Return(run) + return _c +} + +// GetRevertMessage provides a mock function with given fields: ctx, _a1 +func (_m *EthermanMock) GetRevertMessage(ctx context.Context, _a1 *coretypes.Transaction) (string, error) { + ret := _m.Called(ctx, _a1) + + if len(ret) == 0 { + panic("no return value specified for GetRevertMessage") + } + + var r0 string + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *coretypes.Transaction) (string, error)); ok { + return rf(ctx, _a1) + } + if rf, ok := ret.Get(0).(func(context.Context, *coretypes.Transaction) string); ok { + r0 = rf(ctx, _a1) + } else { + r0 = ret.Get(0).(string) + } + + if rf, ok := ret.Get(1).(func(context.Context, *coretypes.Transaction) error); ok { + r1 = rf(ctx, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// EthermanMock_GetRevertMessage_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetRevertMessage' +type EthermanMock_GetRevertMessage_Call struct { + *mock.Call +} + +// GetRevertMessage is a helper method to define mock.On call +// - ctx context.Context +// - _a1 *coretypes.Transaction +func (_e *EthermanMock_Expecter) GetRevertMessage(ctx interface{}, _a1 interface{}) *EthermanMock_GetRevertMessage_Call { + return &EthermanMock_GetRevertMessage_Call{Call: _e.mock.On("GetRevertMessage", ctx, _a1)} +} + +func (_c *EthermanMock_GetRevertMessage_Call) Run(run func(ctx context.Context, _a1 *coretypes.Transaction)) *EthermanMock_GetRevertMessage_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*coretypes.Transaction)) + }) + return _c +} + +func (_c *EthermanMock_GetRevertMessage_Call) Return(_a0 string, _a1 error) *EthermanMock_GetRevertMessage_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *EthermanMock_GetRevertMessage_Call) RunAndReturn(run func(context.Context, *coretypes.Transaction) (string, error)) *EthermanMock_GetRevertMessage_Call { + _c.Call.Return(run) + return _c +} + // GetSequencerAddr provides a mock function with given fields: rollupId func (_m *EthermanMock) GetSequencerAddr(rollupId uint32) (common.Address, error) { ret := _m.Called(rollupId) @@ -207,6 +457,411 @@ func (_c *EthermanMock_GetSequencerAddr_Call) RunAndReturn(run func(uint32) (com return _c } +// GetTx provides a mock function with given fields: ctx, txHash +func (_m *EthermanMock) GetTx(ctx context.Context, txHash common.Hash) (*coretypes.Transaction, bool, error) { + ret := _m.Called(ctx, txHash) + + if len(ret) == 0 { + panic("no return value specified for GetTx") + } + + var r0 *coretypes.Transaction + var r1 bool + var r2 error + if rf, ok := ret.Get(0).(func(context.Context, common.Hash) (*coretypes.Transaction, bool, error)); ok { + return rf(ctx, txHash) + } + if rf, ok := ret.Get(0).(func(context.Context, common.Hash) *coretypes.Transaction); ok { + r0 = rf(ctx, txHash) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*coretypes.Transaction) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, common.Hash) bool); ok { + r1 = rf(ctx, txHash) + } else { + r1 = ret.Get(1).(bool) + } + + if rf, ok := ret.Get(2).(func(context.Context, common.Hash) error); ok { + r2 = rf(ctx, txHash) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + +// EthermanMock_GetTx_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetTx' +type EthermanMock_GetTx_Call struct { + *mock.Call +} + +// GetTx is a helper method to define mock.On call +// - ctx context.Context +// - txHash common.Hash +func (_e *EthermanMock_Expecter) GetTx(ctx interface{}, txHash interface{}) *EthermanMock_GetTx_Call { + return &EthermanMock_GetTx_Call{Call: _e.mock.On("GetTx", ctx, txHash)} +} + +func (_c *EthermanMock_GetTx_Call) Run(run func(ctx context.Context, txHash common.Hash)) *EthermanMock_GetTx_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(common.Hash)) + }) + return _c +} + +func (_c *EthermanMock_GetTx_Call) Return(_a0 *coretypes.Transaction, _a1 bool, _a2 error) *EthermanMock_GetTx_Call { + _c.Call.Return(_a0, _a1, _a2) + return _c +} + +func (_c *EthermanMock_GetTx_Call) RunAndReturn(run func(context.Context, common.Hash) (*coretypes.Transaction, bool, error)) *EthermanMock_GetTx_Call { + _c.Call.Return(run) + return _c +} + +// GetTxReceipt provides a mock function with given fields: ctx, txHash +func (_m *EthermanMock) GetTxReceipt(ctx context.Context, txHash common.Hash) (*coretypes.Receipt, error) { + ret := _m.Called(ctx, txHash) + + if len(ret) == 0 { + panic("no return value specified for GetTxReceipt") + } + + var r0 *coretypes.Receipt + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, common.Hash) (*coretypes.Receipt, error)); ok { + return rf(ctx, txHash) + } + if rf, ok := ret.Get(0).(func(context.Context, common.Hash) *coretypes.Receipt); ok { + r0 = rf(ctx, txHash) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*coretypes.Receipt) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, common.Hash) error); ok { + r1 = rf(ctx, txHash) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// EthermanMock_GetTxReceipt_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetTxReceipt' +type EthermanMock_GetTxReceipt_Call struct { + *mock.Call +} + +// GetTxReceipt is a helper method to define mock.On call +// - ctx context.Context +// - txHash common.Hash +func (_e *EthermanMock_Expecter) GetTxReceipt(ctx interface{}, txHash interface{}) *EthermanMock_GetTxReceipt_Call { + return &EthermanMock_GetTxReceipt_Call{Call: _e.mock.On("GetTxReceipt", ctx, txHash)} +} + +func (_c *EthermanMock_GetTxReceipt_Call) Run(run func(ctx context.Context, txHash common.Hash)) *EthermanMock_GetTxReceipt_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(common.Hash)) + }) + return _c +} + +func (_c *EthermanMock_GetTxReceipt_Call) Return(_a0 *coretypes.Receipt, _a1 error) *EthermanMock_GetTxReceipt_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *EthermanMock_GetTxReceipt_Call) RunAndReturn(run func(context.Context, common.Hash) (*coretypes.Receipt, error)) *EthermanMock_GetTxReceipt_Call { + _c.Call.Return(run) + return _c +} + +// PendingNonce provides a mock function with given fields: ctx, account +func (_m *EthermanMock) PendingNonce(ctx context.Context, account common.Address) (uint64, error) { + ret := _m.Called(ctx, account) + + if len(ret) == 0 { + panic("no return value specified for PendingNonce") + } + + var r0 uint64 + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, common.Address) (uint64, error)); ok { + return rf(ctx, account) + } + if rf, ok := ret.Get(0).(func(context.Context, common.Address) uint64); ok { + r0 = rf(ctx, account) + } else { + r0 = ret.Get(0).(uint64) + } + + if rf, ok := ret.Get(1).(func(context.Context, common.Address) error); ok { + r1 = rf(ctx, account) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// EthermanMock_PendingNonce_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'PendingNonce' +type EthermanMock_PendingNonce_Call struct { + *mock.Call +} + +// PendingNonce is a helper method to define mock.On call +// - ctx context.Context +// - account common.Address +func (_e *EthermanMock_Expecter) PendingNonce(ctx interface{}, account interface{}) *EthermanMock_PendingNonce_Call { + return &EthermanMock_PendingNonce_Call{Call: _e.mock.On("PendingNonce", ctx, account)} +} + +func (_c *EthermanMock_PendingNonce_Call) Run(run func(ctx context.Context, account common.Address)) *EthermanMock_PendingNonce_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(common.Address)) + }) + return _c +} + +func (_c *EthermanMock_PendingNonce_Call) Return(_a0 uint64, _a1 error) *EthermanMock_PendingNonce_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *EthermanMock_PendingNonce_Call) RunAndReturn(run func(context.Context, common.Address) (uint64, error)) *EthermanMock_PendingNonce_Call { + _c.Call.Return(run) + return _c +} + +// SendTx provides a mock function with given fields: ctx, _a1 +func (_m *EthermanMock) SendTx(ctx context.Context, _a1 *coretypes.Transaction) error { + ret := _m.Called(ctx, _a1) + + if len(ret) == 0 { + panic("no return value specified for SendTx") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *coretypes.Transaction) error); ok { + r0 = rf(ctx, _a1) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// EthermanMock_SendTx_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SendTx' +type EthermanMock_SendTx_Call struct { + *mock.Call +} + +// SendTx is a helper method to define mock.On call +// - ctx context.Context +// - _a1 *coretypes.Transaction +func (_e *EthermanMock_Expecter) SendTx(ctx interface{}, _a1 interface{}) *EthermanMock_SendTx_Call { + return &EthermanMock_SendTx_Call{Call: _e.mock.On("SendTx", ctx, _a1)} +} + +func (_c *EthermanMock_SendTx_Call) Run(run func(ctx context.Context, _a1 *coretypes.Transaction)) *EthermanMock_SendTx_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*coretypes.Transaction)) + }) + return _c +} + +func (_c *EthermanMock_SendTx_Call) Return(_a0 error) *EthermanMock_SendTx_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *EthermanMock_SendTx_Call) RunAndReturn(run func(context.Context, *coretypes.Transaction) error) *EthermanMock_SendTx_Call { + _c.Call.Return(run) + return _c +} + +// SignTx provides a mock function with given fields: ctx, sender, _a2 +func (_m *EthermanMock) SignTx(ctx context.Context, sender common.Address, _a2 *coretypes.Transaction) (*coretypes.Transaction, error) { + ret := _m.Called(ctx, sender, _a2) + + if len(ret) == 0 { + panic("no return value specified for SignTx") + } + + var r0 *coretypes.Transaction + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, common.Address, *coretypes.Transaction) (*coretypes.Transaction, error)); ok { + return rf(ctx, sender, _a2) + } + if rf, ok := ret.Get(0).(func(context.Context, common.Address, *coretypes.Transaction) *coretypes.Transaction); ok { + r0 = rf(ctx, sender, _a2) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*coretypes.Transaction) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, common.Address, *coretypes.Transaction) error); ok { + r1 = rf(ctx, sender, _a2) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// EthermanMock_SignTx_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SignTx' +type EthermanMock_SignTx_Call struct { + *mock.Call +} + +// SignTx is a helper method to define mock.On call +// - ctx context.Context +// - sender common.Address +// - _a2 *coretypes.Transaction +func (_e *EthermanMock_Expecter) SignTx(ctx interface{}, sender interface{}, _a2 interface{}) *EthermanMock_SignTx_Call { + return &EthermanMock_SignTx_Call{Call: _e.mock.On("SignTx", ctx, sender, _a2)} +} + +func (_c *EthermanMock_SignTx_Call) Run(run func(ctx context.Context, sender common.Address, _a2 *coretypes.Transaction)) *EthermanMock_SignTx_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(common.Address), args[2].(*coretypes.Transaction)) + }) + return _c +} + +func (_c *EthermanMock_SignTx_Call) Return(_a0 *coretypes.Transaction, _a1 error) *EthermanMock_SignTx_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *EthermanMock_SignTx_Call) RunAndReturn(run func(context.Context, common.Address, *coretypes.Transaction) (*coretypes.Transaction, error)) *EthermanMock_SignTx_Call { + _c.Call.Return(run) + return _c +} + +// SuggestedGasPrice provides a mock function with given fields: ctx +func (_m *EthermanMock) SuggestedGasPrice(ctx context.Context) (*big.Int, error) { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for SuggestedGasPrice") + } + + var r0 *big.Int + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (*big.Int, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) *big.Int); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*big.Int) + } + } + + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// EthermanMock_SuggestedGasPrice_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SuggestedGasPrice' +type EthermanMock_SuggestedGasPrice_Call struct { + *mock.Call +} + +// SuggestedGasPrice is a helper method to define mock.On call +// - ctx context.Context +func (_e *EthermanMock_Expecter) SuggestedGasPrice(ctx interface{}) *EthermanMock_SuggestedGasPrice_Call { + return &EthermanMock_SuggestedGasPrice_Call{Call: _e.mock.On("SuggestedGasPrice", ctx)} +} + +func (_c *EthermanMock_SuggestedGasPrice_Call) Run(run func(ctx context.Context)) *EthermanMock_SuggestedGasPrice_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *EthermanMock_SuggestedGasPrice_Call) Return(_a0 *big.Int, _a1 error) *EthermanMock_SuggestedGasPrice_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *EthermanMock_SuggestedGasPrice_Call) RunAndReturn(run func(context.Context) (*big.Int, error)) *EthermanMock_SuggestedGasPrice_Call { + _c.Call.Return(run) + return _c +} + +// WaitTxToBeMined provides a mock function with given fields: ctx, _a1, timeout +func (_m *EthermanMock) WaitTxToBeMined(ctx context.Context, _a1 *coretypes.Transaction, timeout time.Duration) (bool, error) { + ret := _m.Called(ctx, _a1, timeout) + + if len(ret) == 0 { + panic("no return value specified for WaitTxToBeMined") + } + + var r0 bool + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *coretypes.Transaction, time.Duration) (bool, error)); ok { + return rf(ctx, _a1, timeout) + } + if rf, ok := ret.Get(0).(func(context.Context, *coretypes.Transaction, time.Duration) bool); ok { + r0 = rf(ctx, _a1, timeout) + } else { + r0 = ret.Get(0).(bool) + } + + if rf, ok := ret.Get(1).(func(context.Context, *coretypes.Transaction, time.Duration) error); ok { + r1 = rf(ctx, _a1, timeout) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// EthermanMock_WaitTxToBeMined_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'WaitTxToBeMined' +type EthermanMock_WaitTxToBeMined_Call struct { + *mock.Call +} + +// WaitTxToBeMined is a helper method to define mock.On call +// - ctx context.Context +// - _a1 *coretypes.Transaction +// - timeout time.Duration +func (_e *EthermanMock_Expecter) WaitTxToBeMined(ctx interface{}, _a1 interface{}, timeout interface{}) *EthermanMock_WaitTxToBeMined_Call { + return &EthermanMock_WaitTxToBeMined_Call{Call: _e.mock.On("WaitTxToBeMined", ctx, _a1, timeout)} +} + +func (_c *EthermanMock_WaitTxToBeMined_Call) Run(run func(ctx context.Context, _a1 *coretypes.Transaction, timeout time.Duration)) *EthermanMock_WaitTxToBeMined_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*coretypes.Transaction), args[2].(time.Duration)) + }) + return _c +} + +func (_c *EthermanMock_WaitTxToBeMined_Call) Return(_a0 bool, _a1 error) *EthermanMock_WaitTxToBeMined_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *EthermanMock_WaitTxToBeMined_Call) RunAndReturn(run func(context.Context, *coretypes.Transaction, time.Duration) (bool, error)) *EthermanMock_WaitTxToBeMined_Call { + _c.Call.Return(run) + return _c +} + // NewEthermanMock creates a new instance of EthermanMock. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewEthermanMock(t interface { diff --git a/rpc/rpc_test.go b/rpc/rpc_test.go index 1cdef1b9..f5e60c28 100644 --- a/rpc/rpc_test.go +++ b/rpc/rpc_test.go @@ -8,10 +8,10 @@ import ( "github.com/0xPolygon/agglayer/config" "github.com/0xPolygon/agglayer/interop" "github.com/0xPolygon/agglayer/mocks" + txmTypes "github.com/0xPolygon/agglayer/txmanager/types" "github.com/0xPolygon/agglayer/log" agglayerTypes "github.com/0xPolygon/agglayer/rpc/types" - "github.com/0xPolygonHermez/zkevm-node/ethtxmanager" validiumTypes "github.com/0xPolygonHermez/zkevm-node/jsonrpc/types" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -62,7 +62,7 @@ func TestInteropEndpointsGetTxStatus(t *testing.T) { txManagerMock := mocks.NewEthTxManagerMock(t) txManagerMock.On("Result", mock.Anything, ethTxManOwner, txHash.Hex(), txMock). - Return(ethtxmanager.MonitoredTxResult{}, errors.New("error")).Once() + Return(txmTypes.MonitoredTxResult{}, errors.New("error")).Once() cfg := &config.Config{} e := interop.New( @@ -89,10 +89,10 @@ func TestInteropEndpointsGetTxStatus(t *testing.T) { to := common.HexToAddress("0xreceiver") txHash := common.HexToHash("0xsomeTxHash") - result := ethtxmanager.MonitoredTxResult{ + result := txmTypes.MonitoredTxResult{ ID: "1", - Status: ethtxmanager.MonitoredTxStatusConfirmed, - Txs: map[common.Hash]ethtxmanager.TxResult{ + Status: txmTypes.MonitoredTxStatusConfirmed, + Txs: map[common.Hash]txmTypes.TxResult{ txHash: { Tx: types.NewTransaction(1, to, big.NewInt(100_000), 21000, big.NewInt(10_000), nil), }, @@ -579,7 +579,7 @@ func TestInteropEndpointsSendTx(t *testing.T) { isSignerValid: true, canGetBatch: true, isBatchValid: false, - expectedError: "Mismatch detected", + expectedError: "mismatch detected", }) }) diff --git a/txmanager/pgstorage.go b/txmanager/pgstorage.go new file mode 100644 index 00000000..1f55ae4d --- /dev/null +++ b/txmanager/pgstorage.go @@ -0,0 +1,278 @@ +package txmanager + +import ( + "context" + "encoding/hex" + "errors" + "math/big" + "time" + + txmTypes "github.com/0xPolygon/agglayer/txmanager/types" + "github.com/0xPolygonHermez/zkevm-node/db" + "github.com/ethereum/go-ethereum/common" + "github.com/jackc/pgconn" + "github.com/jackc/pgx/v4" + "github.com/jackc/pgx/v4/pgxpool" +) + +// PostgresStorage hold txs to be managed +type PostgresStorage struct { + *pgxpool.Pool +} + +// NewPostgresStorage creates a new instance of storage that use +// postgres to store data +func NewPostgresStorage(dbCfg db.Config) (*PostgresStorage, error) { + db, err := db.NewSQLDB(dbCfg) + if err != nil { + return nil, err + } + + return &PostgresStorage{ + db, + }, nil +} + +// Add persist a monitored tx +func (s *PostgresStorage) Add(ctx context.Context, mTx txmTypes.MonitoredTx, dbTx pgx.Tx) error { + conn := s.dbConn(dbTx) + cmd := ` + INSERT INTO state.monitored_txs (owner, id, from_addr, to_addr, nonce, value, data, gas, gas_offset, gas_price, status, block_num, history, created_at, updated_at, num_retries) + VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)` + + _, err := conn.Exec(ctx, cmd, mTx.Owner, + mTx.ID, mTx.From.String(), mTx.ToStringPtr(), + mTx.Nonce, mTx.ValueU64Ptr(), mTx.DataStringPtr(), + mTx.Gas, mTx.GasOffset, mTx.GasPrice.Uint64(), string(mTx.Status), mTx.BlockNumberU64Ptr(), + mTx.HistoryStringSlice(), time.Now().UTC().Round(time.Microsecond), + time.Now().UTC().Round(time.Microsecond), mTx.NumRetries) + + if err != nil { + if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.ConstraintName == "monitored_txs_pkey" { + return txmTypes.ErrAlreadyExists + } else { + return err + } + } + + return nil +} + +// Get loads a persisted monitored tx +func (s *PostgresStorage) Get(ctx context.Context, owner, id string, dbTx pgx.Tx) (txmTypes.MonitoredTx, error) { + conn := s.dbConn(dbTx) + cmd := ` + SELECT owner, id, from_addr, to_addr, nonce, value, data, gas, gas_offset, gas_price, status, block_num, history, created_at, updated_at, num_retries + FROM state.monitored_txs + WHERE owner = $1 + AND id = $2` + + mTx := txmTypes.MonitoredTx{} + + row := conn.QueryRow(ctx, cmd, owner, id) + err := s.scanMtx(row, &mTx) + if errors.Is(err, pgx.ErrNoRows) { + return mTx, txmTypes.ErrNotFound + } else if err != nil { + return mTx, err + } + + return mTx, nil +} + +// GetByStatus loads all monitored tx that match the provided status +func (s *PostgresStorage) GetByStatus(ctx context.Context, owner *string, statuses []txmTypes.MonitoredTxStatus, dbTx pgx.Tx) ([]txmTypes.MonitoredTx, error) { + hasStatusToFilter := len(statuses) > 0 + + conn := s.dbConn(dbTx) + cmd := ` + SELECT owner, id, from_addr, to_addr, nonce, value, data, gas, gas_offset, gas_price, status, block_num, history, created_at, updated_at, num_retries + FROM state.monitored_txs + WHERE (owner = $1 OR $1 IS NULL)` + if hasStatusToFilter { + cmd += ` + AND status = ANY($2)` + } + cmd += ` + ORDER BY created_at` + + mTxs := []txmTypes.MonitoredTx{} + + var rows pgx.Rows + var err error + if hasStatusToFilter { + rows, err = conn.Query(ctx, cmd, owner, statuses) + } else { + rows, err = conn.Query(ctx, cmd, owner) + } + + if errors.Is(err, pgx.ErrNoRows) { + return []txmTypes.MonitoredTx{}, nil + } else if err != nil { + return nil, err + } + + for rows.Next() { + mTx := txmTypes.MonitoredTx{} + err := s.scanMtx(rows, &mTx) + if err != nil { + return nil, err + } + mTxs = append(mTxs, mTx) + } + + return mTxs, nil +} + +// GetBySenderAndStatus loads all monitored txs of the given sender that match the provided status +func (s *PostgresStorage) GetBySenderAndStatus( + ctx context.Context, sender common.Address, + statuses []txmTypes.MonitoredTxStatus, dbTx pgx.Tx) ([]txmTypes.MonitoredTx, error) { + hasStatusToFilter := len(statuses) > 0 + + conn := s.dbConn(dbTx) + cmd := ` + SELECT owner, id, from_addr, to_addr, nonce, value, data, gas, gas_offset, gas_price, status, block_num, history, created_at, updated_at, num_retries + FROM state.monitored_txs + WHERE from_addr = $1` + if hasStatusToFilter { + cmd += ` + AND status = ANY($2)` + } + cmd += ` + ORDER BY created_at` + + mTxs := []txmTypes.MonitoredTx{} + + var rows pgx.Rows + var err error + if hasStatusToFilter { + rows, err = conn.Query(ctx, cmd, sender.String(), statuses) + } else { + rows, err = conn.Query(ctx, cmd, sender.String()) + } + + if errors.Is(err, pgx.ErrNoRows) { + return []txmTypes.MonitoredTx{}, nil + } else if err != nil { + return nil, err + } + + for rows.Next() { + mTx := txmTypes.MonitoredTx{} + err := s.scanMtx(rows, &mTx) + if err != nil { + return nil, err + } + mTxs = append(mTxs, mTx) + } + + return mTxs, nil +} + +// Update a persisted monitored tx +func (s *PostgresStorage) Update(ctx context.Context, mTx txmTypes.MonitoredTx, dbTx pgx.Tx) error { + conn := s.dbConn(dbTx) + cmd := ` + UPDATE state.monitored_txs + SET from_addr = $3 + , to_addr = $4 + , nonce = $5 + , value = $6 + , data = $7 + , gas = $8 + , gas_offset = $9 + , gas_price = $10 + , status = $11 + , block_num = $12 + , history = $13 + , updated_at = $14 + , num_retries = $15 + WHERE owner = $1 + AND id = $2` + + var bn *uint64 + if mTx.BlockNumber != nil { + tmp := mTx.BlockNumber.Uint64() + bn = &tmp + } + + _, err := conn.Exec(ctx, cmd, mTx.Owner, + mTx.ID, mTx.From.String(), mTx.ToStringPtr(), + mTx.Nonce, mTx.ValueU64Ptr(), mTx.DataStringPtr(), + mTx.Gas, mTx.GasOffset, mTx.GasPrice.Uint64(), string(mTx.Status), bn, + mTx.HistoryStringSlice(), time.Now().UTC().Round(time.Microsecond), mTx.NumRetries) + + if err != nil { + return err + } + + return nil +} + +// scanMtx scans a row and fill the provided instance of monitoredTx with +// the row data +func (s *PostgresStorage) scanMtx(row pgx.Row, mTx *txmTypes.MonitoredTx) error { + // id, from, to, nonce, value, data, gas, gas_offset, gas_price, status, history, created_at, updated_at, num_retries + var from, status string + var to, data *string + var history []string + var value, blockNumber *uint64 + var gasPrice uint64 + + err := row.Scan(&mTx.Owner, &mTx.ID, &from, &to, &mTx.Nonce, &value, + &data, &mTx.Gas, &mTx.GasOffset, &gasPrice, &status, &blockNumber, &history, + &mTx.CreatedAt, &mTx.UpdatedAt, &mTx.NumRetries) + if err != nil { + return err + } + + mTx.From = common.HexToAddress(from) + mTx.GasPrice = big.NewInt(0).SetUint64(gasPrice) + mTx.Status = txmTypes.MonitoredTxStatus(status) + + if to != nil { + tmp := common.HexToAddress(*to) + mTx.To = &tmp + } + if value != nil { + tmp := *value + mTx.Value = big.NewInt(0).SetUint64(tmp) + } + if data != nil { + tmp := *data + bytes, err := hex.DecodeString(tmp) + if err != nil { + return err + } + mTx.Data = bytes + } + if blockNumber != nil { + tmp := *blockNumber + mTx.BlockNumber = big.NewInt(0).SetUint64(tmp) + } + + h := make(map[common.Hash]bool, len(history)) + for _, txHash := range history { + h[common.HexToHash(txHash)] = true + } + mTx.History = h + + return nil +} + +// dbConn represents an instance of an object that can +// connect to a postgres db to execute sql commands and query data +type dbConn interface { + Exec(ctx context.Context, sql string, arguments ...interface{}) (commandTag pgconn.CommandTag, err error) + Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error) + QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row +} + +// dbConn determines which db connection to use, dbTx or the main pgxpool +func (p *PostgresStorage) dbConn(dbTx pgx.Tx) dbConn { + if dbTx != nil { + return dbTx + } + return p +} diff --git a/txmanager/pgstorage_test.go b/txmanager/pgstorage_test.go new file mode 100644 index 00000000..651218b8 --- /dev/null +++ b/txmanager/pgstorage_test.go @@ -0,0 +1,338 @@ +package txmanager + +import ( + "context" + "fmt" + "math/big" + "testing" + "time" + + "github.com/0xPolygonHermez/zkevm-node/db" + "github.com/0xPolygonHermez/zkevm-node/test/testutils" + "github.com/ethereum/go-ethereum/common" + "github.com/jackc/pgx/v4" + "github.com/jackc/pgx/v4/stdlib" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + aggLayerDB "github.com/0xPolygon/agglayer/db" + txmTypes "github.com/0xPolygon/agglayer/txmanager/types" +) + +func newStateDBConfig(t *testing.T) db.Config { + t.Helper() + + const maxDBPoolConns = 50 + + cfg := db.Config{ + User: testutils.GetEnv("PGUSER", "agglayer_user"), + Password: testutils.GetEnv("PGPASSWORD", "agglayer_password"), + Name: testutils.GetEnv("PGDATABASE", "agglayer_db"), + Host: testutils.GetEnv("PGHOST", "localhost"), + Port: testutils.GetEnv("PGPORT", "5434"), + EnableLog: false, + MaxConns: maxDBPoolConns, + } + + // connect to database + dbPool, err := db.NewSQLDB(cfg) + require.NoError(t, err) + + defer dbPool.Close() + + c, err := pgx.ParseConfig(fmt.Sprintf("postgres://%s:%s@%s:%s/%s", cfg.User, cfg.Password, cfg.Host, cfg.Port, cfg.Name)) + require.NoError(t, err) + + db := stdlib.OpenDB(*c) + + require.NoError(t, aggLayerDB.RunMigrationsDown(db)) + require.NoError(t, aggLayerDB.RunMigrationsUp(db)) + + return cfg +} + +func TestAddGetAndUpdate(t *testing.T) { + dbCfg := newStateDBConfig(t) + storage, err := NewPostgresStorage(dbCfg) + require.NoError(t, err) + + owner := "owner" + id := "id" + from := common.HexToAddress("0x1") + to := common.HexToAddress("0x2") + nonce := uint64(1) + value := big.NewInt(2) + data := []byte("data") + gas := uint64(3) + gasPrice := big.NewInt(4) + status := txmTypes.MonitoredTxStatusCreated + blockNumber := big.NewInt(5) + history := map[common.Hash]bool{common.HexToHash("0x3"): true, common.HexToHash("0x4"): true} + + mTx := txmTypes.MonitoredTx{ + Owner: owner, ID: id, From: from, To: &to, Nonce: nonce, Value: value, Data: data, + BlockNumber: blockNumber, Gas: gas, GasPrice: gasPrice, Status: status, History: history, + } + err = storage.Add(context.Background(), mTx, nil) + require.NoError(t, err) + + returnedMtx, err := storage.Get(context.Background(), owner, id, nil) + require.NoError(t, err) + + assert.Equal(t, owner, returnedMtx.Owner) + assert.Equal(t, id, returnedMtx.ID) + assert.Equal(t, from.String(), returnedMtx.From.String()) + assert.Equal(t, to.String(), returnedMtx.To.String()) + assert.Equal(t, nonce, returnedMtx.Nonce) + assert.Equal(t, value, returnedMtx.Value) + assert.Equal(t, data, returnedMtx.Data) + assert.Equal(t, gas, returnedMtx.Gas) + assert.Equal(t, gasPrice, returnedMtx.GasPrice) + assert.Equal(t, status, returnedMtx.Status) + assert.Equal(t, 0, blockNumber.Cmp(returnedMtx.BlockNumber)) + assert.Equal(t, history, returnedMtx.History) + assert.Greater(t, time.Now().UTC().Round(time.Microsecond), returnedMtx.CreatedAt) + assert.Less(t, time.Time{}, returnedMtx.CreatedAt) + assert.Greater(t, time.Now().UTC().Round(time.Microsecond), returnedMtx.UpdatedAt) + assert.Less(t, time.Time{}, returnedMtx.UpdatedAt) + + from = common.HexToAddress("0x11") + to = common.HexToAddress("0x22") + nonce = uint64(11) + value = big.NewInt(22) + data = []byte("data data") + gas = uint64(33) + gasPrice = big.NewInt(44) + status = txmTypes.MonitoredTxStatusFailed + blockNumber = big.NewInt(55) + history = map[common.Hash]bool{common.HexToHash("0x33"): true, common.HexToHash("0x44"): true} + + mTx = txmTypes.MonitoredTx{ + Owner: owner, ID: id, From: from, To: &to, Nonce: nonce, Value: value, Data: data, + BlockNumber: blockNumber, Gas: gas, GasPrice: gasPrice, Status: status, History: history, + } + err = storage.Update(context.Background(), mTx, nil) + require.NoError(t, err) + + returnedMtx, err = storage.Get(context.Background(), owner, id, nil) + require.NoError(t, err) + + assert.Equal(t, owner, returnedMtx.Owner) + assert.Equal(t, id, returnedMtx.ID) + assert.Equal(t, from.String(), returnedMtx.From.String()) + assert.Equal(t, to.String(), returnedMtx.To.String()) + assert.Equal(t, nonce, returnedMtx.Nonce) + assert.Equal(t, value, returnedMtx.Value) + assert.Equal(t, data, returnedMtx.Data) + assert.Equal(t, gas, returnedMtx.Gas) + assert.Equal(t, gasPrice, returnedMtx.GasPrice) + assert.Equal(t, status, returnedMtx.Status) + assert.Equal(t, 0, blockNumber.Cmp(returnedMtx.BlockNumber)) + assert.Equal(t, history, returnedMtx.History) + assert.Greater(t, time.Now().UTC().Round(time.Microsecond), returnedMtx.CreatedAt) + assert.Less(t, time.Time{}, returnedMtx.CreatedAt) + assert.Greater(t, time.Now().UTC().Round(time.Microsecond), returnedMtx.UpdatedAt) + assert.Less(t, time.Time{}, returnedMtx.UpdatedAt) +} + +func TestAddAndGetByStatus(t *testing.T) { + dbCfg := newStateDBConfig(t) + storage, err := NewPostgresStorage(dbCfg) + require.NoError(t, err) + + to := common.HexToAddress("0x2") + baseMtx := txmTypes.MonitoredTx{ + Owner: "owner", + From: common.HexToAddress("0x1"), + To: &to, + Nonce: uint64(1), + Value: big.NewInt(2), + Data: []byte("data"), + BlockNumber: big.NewInt(1), + Gas: uint64(3), + GasPrice: big.NewInt(4), + History: map[common.Hash]bool{common.HexToHash("0x3"): true, common.HexToHash("0x4"): true}, + } + + type mTxReplaceInfo struct { + id string + status txmTypes.MonitoredTxStatus + } + + mTxsReplaceInfo := []mTxReplaceInfo{ + {id: "created1", status: txmTypes.MonitoredTxStatusCreated}, + {id: "sent1", status: txmTypes.MonitoredTxStatusSent}, + {id: "failed1", status: txmTypes.MonitoredTxStatusFailed}, + {id: "confirmed1", status: txmTypes.MonitoredTxStatusConfirmed}, + {id: "created2", status: txmTypes.MonitoredTxStatusCreated}, + {id: "sent2", status: txmTypes.MonitoredTxStatusSent}, + {id: "failed2", status: txmTypes.MonitoredTxStatusFailed}, + {id: "confirmed2", status: txmTypes.MonitoredTxStatusConfirmed}, + } + + for _, replaceInfo := range mTxsReplaceInfo { + baseMtx.ID = replaceInfo.id + baseMtx.Status = replaceInfo.status + baseMtx.CreatedAt = baseMtx.CreatedAt.Add(time.Microsecond) + baseMtx.UpdatedAt = baseMtx.UpdatedAt.Add(time.Microsecond) + err = storage.Add(context.Background(), baseMtx, nil) + require.NoError(t, err) + } + + mTxs, err := storage.GetByStatus(context.Background(), nil, []txmTypes.MonitoredTxStatus{txmTypes.MonitoredTxStatusConfirmed}, nil) + require.NoError(t, err) + assert.Equal(t, 2, len(mTxs)) + assert.Equal(t, "confirmed1", mTxs[0].ID) + assert.Equal(t, "confirmed2", mTxs[1].ID) + + mTxs, err = storage.GetByStatus(context.Background(), nil, []txmTypes.MonitoredTxStatus{txmTypes.MonitoredTxStatusSent, txmTypes.MonitoredTxStatusCreated}, nil) + require.NoError(t, err) + assert.Equal(t, 4, len(mTxs)) + assert.Equal(t, "created1", mTxs[0].ID) + assert.Equal(t, "sent1", mTxs[1].ID) + assert.Equal(t, "created2", mTxs[2].ID) + assert.Equal(t, "sent2", mTxs[3].ID) + + mTxs, err = storage.GetByStatus(context.Background(), nil, []txmTypes.MonitoredTxStatus{}, nil) + require.NoError(t, err) + assert.Equal(t, 8, len(mTxs)) + assert.Equal(t, "created1", mTxs[0].ID) + assert.Equal(t, "sent1", mTxs[1].ID) + assert.Equal(t, "failed1", mTxs[2].ID) + assert.Equal(t, "confirmed1", mTxs[3].ID) + assert.Equal(t, "created2", mTxs[4].ID) + assert.Equal(t, "sent2", mTxs[5].ID) + assert.Equal(t, "failed2", mTxs[6].ID) + assert.Equal(t, "confirmed2", mTxs[7].ID) +} + +func TestAddAndGetBySenderAndStatus(t *testing.T) { + dbCfg := newStateDBConfig(t) + storage, err := NewPostgresStorage(dbCfg) + require.NoError(t, err) + + from := common.HexToAddress("0x1") + to := common.HexToAddress("0x2") + baseMtx := txmTypes.MonitoredTx{ + Owner: "owner", + From: common.HexToAddress("0x1"), + To: &to, + Nonce: uint64(1), + Value: big.NewInt(2), + Data: []byte("data"), + BlockNumber: big.NewInt(1), + Gas: uint64(3), + GasPrice: big.NewInt(4), + History: map[common.Hash]bool{common.HexToHash("0x3"): true, common.HexToHash("0x4"): true}, + } + + type mTxReplaceInfo struct { + id string + status txmTypes.MonitoredTxStatus + } + + mTxsReplaceInfo := []mTxReplaceInfo{ + {id: "created1", status: txmTypes.MonitoredTxStatusCreated}, + {id: "sent1", status: txmTypes.MonitoredTxStatusSent}, + {id: "failed1", status: txmTypes.MonitoredTxStatusFailed}, + {id: "confirmed1", status: txmTypes.MonitoredTxStatusConfirmed}, + {id: "created2", status: txmTypes.MonitoredTxStatusCreated}, + {id: "sent2", status: txmTypes.MonitoredTxStatusSent}, + {id: "failed2", status: txmTypes.MonitoredTxStatusFailed}, + {id: "confirmed2", status: txmTypes.MonitoredTxStatusConfirmed}, + } + + for _, replaceInfo := range mTxsReplaceInfo { + baseMtx.ID = replaceInfo.id + baseMtx.Status = replaceInfo.status + baseMtx.CreatedAt = baseMtx.CreatedAt.Add(time.Microsecond) + baseMtx.UpdatedAt = baseMtx.UpdatedAt.Add(time.Microsecond) + err = storage.Add(context.Background(), baseMtx, nil) + require.NoError(t, err) + } + + mTxs, err := storage.GetBySenderAndStatus(context.Background(), from, []txmTypes.MonitoredTxStatus{txmTypes.MonitoredTxStatusConfirmed}, nil) + require.NoError(t, err) + assert.Equal(t, 2, len(mTxs)) + assert.Equal(t, "confirmed1", mTxs[0].ID) + assert.Equal(t, "confirmed2", mTxs[1].ID) + + mTxs, err = storage.GetBySenderAndStatus(context.Background(), from, []txmTypes.MonitoredTxStatus{txmTypes.MonitoredTxStatusSent, txmTypes.MonitoredTxStatusCreated}, nil) + require.NoError(t, err) + assert.Equal(t, 4, len(mTxs)) + assert.Equal(t, "created1", mTxs[0].ID) + assert.Equal(t, "sent1", mTxs[1].ID) + assert.Equal(t, "created2", mTxs[2].ID) + assert.Equal(t, "sent2", mTxs[3].ID) + + mTxs, err = storage.GetBySenderAndStatus(context.Background(), from, []txmTypes.MonitoredTxStatus{}, nil) + require.NoError(t, err) + assert.Equal(t, 8, len(mTxs)) + assert.Equal(t, "created1", mTxs[0].ID) + assert.Equal(t, "sent1", mTxs[1].ID) + assert.Equal(t, "failed1", mTxs[2].ID) + assert.Equal(t, "confirmed1", mTxs[3].ID) + assert.Equal(t, "created2", mTxs[4].ID) + assert.Equal(t, "sent2", mTxs[5].ID) + assert.Equal(t, "failed2", mTxs[6].ID) + assert.Equal(t, "confirmed2", mTxs[7].ID) +} + +func TestAddRepeated(t *testing.T) { + dbCfg := newStateDBConfig(t) + storage, err := NewPostgresStorage(dbCfg) + require.NoError(t, err) + + owner := "owner" + id := "id" + from := common.HexToAddress("0x1") + to := common.HexToAddress("0x2") + nonce := uint64(1) + value := big.NewInt(2) + data := []byte("data") + gas := uint64(3) + gasPrice := big.NewInt(4) + blockNumber := big.NewInt(5) + status := txmTypes.MonitoredTxStatusCreated + history := map[common.Hash]bool{common.HexToHash("0x3"): true, common.HexToHash("0x4"): true} + + mTx := txmTypes.MonitoredTx{ + Owner: owner, + ID: id, + From: from, + To: &to, + Nonce: nonce, + Value: value, + Data: data, + BlockNumber: blockNumber, + Gas: gas, + GasPrice: gasPrice, + Status: status, + History: history, + } + + err = storage.Add(context.Background(), mTx, nil) + require.NoError(t, err) + + err = storage.Add(context.Background(), mTx, nil) + require.Equal(t, txmTypes.ErrAlreadyExists, err) +} + +func TestGetNotFound(t *testing.T) { + dbCfg := newStateDBConfig(t) + storage, err := NewPostgresStorage(dbCfg) + require.NoError(t, err) + + _, err = storage.Get(context.Background(), "not found owner", "not found id", nil) + require.Equal(t, txmTypes.ErrNotFound, err) +} + +func TestGetByStatusNoRows(t *testing.T) { + dbCfg := newStateDBConfig(t) + storage, err := NewPostgresStorage(dbCfg) + require.NoError(t, err) + + mTxs, err := storage.GetByStatus(context.Background(), nil, []txmTypes.MonitoredTxStatus{}, nil) + require.NoError(t, err) + require.Empty(t, mTxs) +} diff --git a/txmanager/txmanager.go b/txmanager/txmanager.go new file mode 100644 index 00000000..c89c0827 --- /dev/null +++ b/txmanager/txmanager.go @@ -0,0 +1,595 @@ +package txmanager + +import ( + "context" + "errors" + "fmt" + "math/big" + "sync" + "time" + + "github.com/0xPolygon/agglayer/config" + "github.com/0xPolygon/agglayer/log" + txmTypes "github.com/0xPolygon/agglayer/txmanager/types" + aggLayerTypes "github.com/0xPolygon/agglayer/types" + "github.com/0xPolygonHermez/zkevm-node/state" + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/jackc/pgx/v4" + "go.uber.org/zap" +) + +const failureIntervalInSeconds = 5 + +// Client for eth tx manager +type Client struct { + ctx context.Context + cancel context.CancelFunc + + cfg config.EthTxManagerConfig + etherman aggLayerTypes.IEtherman + storage txmTypes.StorageInterface + state txmTypes.StateInterface +} + +// New creates new eth tx manager +func New(cfg config.EthTxManagerConfig, ethMan aggLayerTypes.IEtherman, storage txmTypes.StorageInterface, state txmTypes.StateInterface) *Client { + c := &Client{ + cfg: cfg, + etherman: ethMan, + storage: storage, + state: state, + } + + return c +} + +// Start will start the tx management, reading txs from storage, +// send then to the blockchain and keep monitoring them until they +// get mined +func (c *Client) Start() { + // infinite loop to manage txs as they arrive + c.ctx, c.cancel = context.WithCancel(context.Background()) + + for { + select { + case <-c.ctx.Done(): + return + case <-time.After(c.cfg.FrequencyToMonitorTxs.Duration): + err := c.monitorTxs(context.Background()) + if err != nil { + c.logErrorAndWait("failed to monitor txs: %v", err) + } + } + } +} + +// Stop will stops the monitored tx management +func (c *Client) Stop() { + c.cancel() +} + +// Add a transaction to be sent and monitored +func (c *Client) Add(ctx context.Context, owner, id string, from common.Address, to *common.Address, value *big.Int, data []byte, gasOffset uint64, dbTx pgx.Tx) error { + // get nonce + nonce, err := c.getTxNonce(ctx, from) + if err != nil { + err := fmt.Errorf("failed to get nonce: %w", err) + log.Errorf(err.Error()) + return err + } + + // get gas + gas, err := c.etherman.EstimateGas(ctx, from, to, value, data) + if err != nil { + err := fmt.Errorf("failed to estimate gas: %w, data: %v", err, common.Bytes2Hex(data)) + log.Error(err.Error()) + if c.cfg.ForcedGas > 0 { + gas = c.cfg.ForcedGas + } else { + return err + } + } + + // get gas price + gasPrice, err := c.suggestedGasPrice(ctx) + if err != nil { + err := fmt.Errorf("failed to get suggested gas price: %w", err) + log.Errorf(err.Error()) + return err + } + + // create monitored tx + mTx := txmTypes.MonitoredTx{ + Owner: owner, + ID: id, + From: from, + To: to, + Nonce: nonce, + Value: value, + Data: data, + Gas: gas, + GasOffset: gasOffset, + GasPrice: gasPrice, + Status: txmTypes.MonitoredTxStatusCreated, + } + + // add to storage + err = c.storage.Add(ctx, mTx, dbTx) + if err != nil { + err := fmt.Errorf("failed to add tx to get monitored: %w", err) + log.Errorf(err.Error()) + return err + } + + mTxLog := log.WithFields("monitoredTx", mTx.ID, "createdAt", mTx.CreatedAt) + mTxLog.Infof("created") + + return nil +} + +// Result returns the current result of the transaction execution with all the details +func (c *Client) Result(ctx context.Context, owner, id string, dbTx pgx.Tx) (txmTypes.MonitoredTxResult, error) { + mTx, err := c.storage.Get(ctx, owner, id, dbTx) + if err != nil { + return txmTypes.MonitoredTxResult{}, err + } + + return c.buildResult(ctx, mTx) +} + +func (c *Client) buildResult(ctx context.Context, mTx txmTypes.MonitoredTx) (txmTypes.MonitoredTxResult, error) { + history := mTx.HistoryHashSlice() + txs := make(map[common.Hash]txmTypes.TxResult, len(history)) + + for _, txHash := range history { + tx, _, err := c.etherman.GetTx(ctx, txHash) + if !errors.Is(err, ethereum.NotFound) && err != nil { + return txmTypes.MonitoredTxResult{}, err + } + + receipt, err := c.etherman.GetTxReceipt(ctx, txHash) + if !errors.Is(err, ethereum.NotFound) && err != nil { + return txmTypes.MonitoredTxResult{}, err + } + + revertMessage, err := c.etherman.GetRevertMessage(ctx, tx) + if !errors.Is(err, ethereum.NotFound) && err != nil && err.Error() != txmTypes.ErrExecutionReverted.Error() { + return txmTypes.MonitoredTxResult{}, err + } + + txs[txHash] = txmTypes.TxResult{ + Tx: tx, + Receipt: receipt, + RevertMessage: revertMessage, + } + } + + result := txmTypes.MonitoredTxResult{ + ID: mTx.ID, + Status: mTx.Status, + Txs: txs, + } + + return result, nil +} + +// monitorTxs process all pending monitored tx +func (c *Client) monitorTxs(ctx context.Context) error { + statusesFilter := []txmTypes.MonitoredTxStatus{txmTypes.MonitoredTxStatusCreated, txmTypes.MonitoredTxStatusSent, txmTypes.MonitoredTxStatusReorged} + mTxs, err := c.storage.GetByStatus(ctx, nil, statusesFilter, nil) + if err != nil { + return fmt.Errorf("failed to get created monitored txs: %v", err) + } + + log.Infof("found %v monitored tx to process", len(mTxs)) + + wg := sync.WaitGroup{} + wg.Add(len(mTxs)) + for _, mTx := range mTxs { + if mTx.NumRetries == 0 { + // this is only done for old monitored txs that were not updated before this fix + mTx.NumRetries = uint64(len(mTx.History)) + } + + mTx := mTx // force variable shadowing to avoid pointer conflicts + go func(c *Client, mTx txmTypes.MonitoredTx) { + mTxLogger := createMonitoredTxLogger(mTx) + defer func(mTx txmTypes.MonitoredTx, mTxLogger *zap.SugaredLogger) { + if err := recover(); err != nil { + mTxLogger.Error("monitoring recovered from this err: %v", err) + } + wg.Done() + }(mTx, mTxLogger) + c.monitorTx(ctx, mTx, mTxLogger) + }(c, mTx) + } + wg.Wait() + + return nil +} + +// monitorTx does all the monitoring steps to the monitored tx +func (c *Client) monitorTx(ctx context.Context, mTx txmTypes.MonitoredTx, logger *zap.SugaredLogger) { + var err error + logger.Info("processing") + // check if any of the txs in the history was confirmed + var lastReceiptChecked types.Receipt + // monitored tx is confirmed until we find a successful receipt + confirmed := false + // monitored tx doesn't have a failed receipt until we find a failed receipt for any + // tx in the monitored tx history + hasFailedReceipts := false + // all history txs are considered mined until we can't find a receipt for any + // tx in the monitored tx history + allHistoryTxsWereMined := true + for txHash := range mTx.History { + mined, receipt, err := c.etherman.CheckTxWasMined(ctx, txHash) + if err != nil { + logger.Errorf("failed to check if tx %v was mined: %v", txHash.String(), err) + continue + } + + // if the tx is not mined yet, check that not all the tx were mined and go to the next + if !mined { + allHistoryTxsWereMined = false + continue + } + + lastReceiptChecked = *receipt + + // if the tx was mined successfully we can set it as confirmed and break the loop + if lastReceiptChecked.Status == types.ReceiptStatusSuccessful { + confirmed = true + break + } + + // if the tx was mined but failed, we continue to consider it was not confirmed + // and set that we have found a failed receipt. This info will be used later + // to check if nonce needs to be reviewed + confirmed = false + hasFailedReceipts = true + } + + // we need to check if we need to review the nonce carefully, to avoid sending + // duplicated data to the roll-up and causing an unnecessary trusted state reorg. + // + // if we have failed receipts, this means at least one of the generated txs was mined, + // in this case maybe the current nonce was already consumed(if this is the first iteration + // of this cycle, next iteration might have the nonce already updated by the preivous one), + // then we need to check if there are tx that were not mined yet, if so, we just need to wait + // because maybe one of them will get mined successfully + // + // in case of the monitored tx is not confirmed yet, all tx were mined and none of them were + // mined successfully, we need to review the nonce + if !confirmed && hasFailedReceipts && allHistoryTxsWereMined { + logger.Infof("nonce needs to be updated") + err := c.reviewMonitoredTxNonce(ctx, &mTx, logger) + if err != nil { + logger.Errorf("failed to review monitored tx nonce: %v", err) + return + } + err = c.storage.Update(ctx, mTx, nil) + if err != nil { + logger.Errorf("failed to update monitored tx nonce change: %v", err) + return + } + } + + // if num of retires reaches the max retry limit, this means something is really wrong with + // this Tx and we are not able to identify automatically, so we mark this as failed to let the + // caller know something is not right and needs to be review and to avoid to monitor this + // tx infinitely + if mTx.NumRetries >= c.cfg.MaxRetries { + mTx.Status = txmTypes.MonitoredTxStatusFailed + logger.Infof("marked as failed because reached the num of retires limit: %v", err) + // update monitored tx changes into storage + err = c.storage.Update(ctx, mTx, nil) + if err != nil { + logger.Errorf("failed to update monitored tx when num of retires reached: %v", err) + } + + return + } + + var signedTx *types.Transaction + if !confirmed { + // if is a reorged, move to the next + if mTx.Status == txmTypes.MonitoredTxStatusReorged { + return + } + + // review tx and increase gas and gas price if needed + if mTx.Status == txmTypes.MonitoredTxStatusSent { + err := c.reviewMonitoredTx(ctx, &mTx, logger) + if err != nil { + logger.Errorf("failed to review monitored tx: %v", err) + mTx.NumRetries++ + + // update numRetries and return + if err := c.storage.Update(ctx, mTx, nil); err != nil { + logger.Errorf("failed to update monitored tx review change: %v", err) + } + + return + } + + if err := c.storage.Update(ctx, mTx, nil); err != nil { + logger.Errorf("failed to update monitored tx review change: %v", err) + return + } + } + + // rebuild transaction + tx := mTx.Tx() + logger.Debugf("unsigned tx %v created", tx.Hash().String()) + + // sign tx + signedTx, err = c.etherman.SignTx(ctx, mTx.From, tx) + if err != nil { + logger.Errorf("failed to sign tx %v: %v", tx.Hash().String(), err) + return + } + logger.Debugf("signed tx %v created", signedTx.Hash().String()) + + // add tx to monitored tx history + err = mTx.AddHistory(signedTx) + if errors.Is(err, txmTypes.ErrAlreadyExists) { + logger.Infof("signed tx already existed in the history") + } else if err != nil { + logger.Errorf("failed to add signed tx %v to monitored tx history: %v", signedTx.Hash().String(), err) + return + } else { + // update monitored tx changes into storage + err = c.storage.Update(ctx, mTx, nil) + if err != nil { + logger.Errorf("failed to update monitored tx: %v", err) + return + } + logger.Debugf("signed tx added to the monitored tx history") + } + + // check if the tx is already in the network, if not, send it + _, _, err = c.etherman.GetTx(ctx, signedTx.Hash()) + // if not found, send it tx to the network + if errors.Is(err, ethereum.NotFound) { + logger.Debugf("signed tx not found in the network") + err := c.etherman.SendTx(ctx, signedTx) + if err != nil { + logger.Errorf("failed to send tx %v to network: %v", signedTx.Hash().String(), err) + return + } + logger.Infof("signed tx sent to the network: %v", signedTx.Hash().String()) + if mTx.Status == txmTypes.MonitoredTxStatusCreated { + // update tx status to sent + mTx.Status = txmTypes.MonitoredTxStatusSent + logger.Debugf("status changed to %v", string(mTx.Status)) + // update monitored tx changes into storage + err = c.storage.Update(ctx, mTx, nil) + if err != nil { + logger.Errorf("failed to update monitored tx changes: %v", err) + return + } + } + } else { + logger.Infof("signed tx already found in the network") + } + + log.Infof("waiting signedTx to be mined...") + + // wait tx to get mined + confirmed, err = c.etherman.WaitTxToBeMined(ctx, signedTx, c.cfg.WaitTxToBeMined.Duration) + if err != nil { + logger.Errorf("failed to wait tx to be mined: %v", err) + return + } + if !confirmed { + log.Infof("signedTx not mined yet and timeout has been reached") + return + } + + // get tx receipt + var txReceipt *types.Receipt + txReceipt, err = c.etherman.GetTxReceipt(ctx, signedTx.Hash()) + if err != nil { + logger.Errorf("failed to get tx receipt for tx %v: %v", signedTx.Hash().String(), err) + return + } + lastReceiptChecked = *txReceipt + } + + // if mined, check receipt and mark as Failed or Confirmed + if lastReceiptChecked.Status == types.ReceiptStatusSuccessful { + receiptBlockNum := lastReceiptChecked.BlockNumber.Uint64() + + // check if state is already synchronized until the block + // where the tx was mined + block, err := c.state.GetLastBlock(ctx, nil) + if errors.Is(err, state.ErrStateNotSynchronized) { + logger.Debugf("state not synchronized yet, waiting for L1 block %v to be synced", receiptBlockNum) + return + } else if err != nil { + logger.Errorf("failed to check if L1 block %v is already synced: %v", receiptBlockNum, err) + return + } else if block.BlockNumber < receiptBlockNum { + logger.Debugf("L1 block %v not synchronized yet, waiting for L1 block to be synced in order to confirm monitored tx", receiptBlockNum) + return + } else { + mTx.Status = txmTypes.MonitoredTxStatusConfirmed + mTx.BlockNumber = lastReceiptChecked.BlockNumber + logger.Info("confirmed") + } + } else { + // if we should continue to monitor, we move to the next one and this will + // be reviewed in the next monitoring cycle + if c.shouldContinueToMonitorThisTx(ctx, lastReceiptChecked) { + return + } + // otherwise we understand this monitored tx has failed + mTx.Status = txmTypes.MonitoredTxStatusFailed + mTx.BlockNumber = lastReceiptChecked.BlockNumber + logger.Info("failed") + } + + // update monitored tx changes into storage + err = c.storage.Update(ctx, mTx, nil) + if err != nil { + logger.Errorf("failed to update monitored tx: %v", err) + return + } +} + +// getTxNonce get the nonce for the given account +func (c *Client) getTxNonce(ctx context.Context, from common.Address) (uint64, error) { + // Get created transactions from the database for the given account + createdTxs, err := c.storage.GetBySenderAndStatus(ctx, from, []txmTypes.MonitoredTxStatus{txmTypes.MonitoredTxStatusCreated}, nil) + if err != nil { + return 0, fmt.Errorf("failed to get created monitored txs: %w", err) + } + + var nonce uint64 + if len(createdTxs) > 0 { + // if there are pending txs, we adjust the nonce accordingly + for _, createdTx := range createdTxs { + if createdTx.Nonce > nonce { + nonce = createdTx.Nonce + } + } + + nonce++ + } else { + // if there are no pending txs, we get the pending nonce from the etherman + if nonce, err = c.etherman.PendingNonce(ctx, from); err != nil { + return 0, fmt.Errorf("failed to get pending nonce: %w", err) + } + } + + return nonce, nil +} + +// shouldContinueToMonitorThisTx checks the the tx receipt and decides if it should +// continue or not to monitor the monitored tx related to the tx from this receipt +func (c *Client) shouldContinueToMonitorThisTx(ctx context.Context, receipt types.Receipt) bool { + // if the receipt has a is successful result, stop monitoring + if receipt.Status == types.ReceiptStatusSuccessful { + return false + } + + tx, _, err := c.etherman.GetTx(ctx, receipt.TxHash) + if err != nil { + log.Errorf("failed to get tx when monitored tx identified as failed, tx : %v", receipt.TxHash.String(), err) + return false + } + _, err = c.etherman.GetRevertMessage(ctx, tx) + if err != nil { + // if the error when getting the revert message is not identified, continue to monitor + if err.Error() == txmTypes.ErrExecutionReverted.Error() { + return true + } else { + log.Errorf("failed to get revert message for monitored tx identified as failed, tx %v: %v", receipt.TxHash.String(), err) + } + } + // if nothing weird was found, stop monitoring + return false +} + +// reviewMonitoredTx checks if some field needs to be updated +// accordingly to the current information stored and the current +// state of the blockchain +func (c *Client) reviewMonitoredTx(ctx context.Context, mTx *txmTypes.MonitoredTx, mTxLogger *zap.SugaredLogger) error { + mTxLogger.Debug("reviewing") + // get gas + gas, err := c.etherman.EstimateGas(ctx, mTx.From, mTx.To, mTx.Value, mTx.Data) + if err != nil { + err := fmt.Errorf("failed to estimate gas: %w", err) + mTxLogger.Errorf(err.Error()) + return err + } + + // check gas + if gas > mTx.Gas { + mTxLogger.Infof("monitored tx gas updated from %v to %v", mTx.Gas, gas) + mTx.Gas = gas + } + + // get gas price + gasPrice, err := c.suggestedGasPrice(ctx) + if err != nil { + err := fmt.Errorf("failed to get suggested gas price: %w", err) + mTxLogger.Errorf(err.Error()) + return err + } + + // check gas price + if gasPrice.Cmp(mTx.GasPrice) == 1 { + mTxLogger.Infof("monitored tx gas price updated from %v to %v", mTx.GasPrice.String(), gasPrice.String()) + mTx.GasPrice = gasPrice + } + return nil +} + +// reviewMonitoredTxNonce checks if the nonce needs to be updated accordingly to +// the current nonce of the sender account. +// +// IMPORTANT: Nonce is reviewed apart from the other fields because it is a very +// sensible information and can make duplicated data to be sent to the blockchain, +// causing possible side effects and wasting resources. +func (c *Client) reviewMonitoredTxNonce(ctx context.Context, mTx *txmTypes.MonitoredTx, mTxLogger *zap.SugaredLogger) error { + mTxLogger.Debug("reviewing nonce") + nonce, err := c.getTxNonce(ctx, mTx.From) + if err != nil { + err := fmt.Errorf("failed to load current nonce for acc %v: %w", mTx.From.String(), err) + mTxLogger.Errorf(err.Error()) + return err + } + + if nonce > mTx.Nonce { + mTxLogger.Infof("monitored tx nonce updated from %v to %v", mTx.Nonce, nonce) + mTx.Nonce = nonce + } + + return nil +} + +func (c *Client) suggestedGasPrice(ctx context.Context) (*big.Int, error) { + // get gas price + gasPrice, err := c.etherman.SuggestedGasPrice(ctx) + if err != nil { + return nil, err + } + + // adjust the gas price by the margin factor + marginFactor := big.NewFloat(0).SetFloat64(c.cfg.GasPriceMarginFactor) + fGasPrice := big.NewFloat(0).SetInt(gasPrice) + adjustedGasPrice, _ := big.NewFloat(0).Mul(fGasPrice, marginFactor).Int(big.NewInt(0)) + + // if there is a max gas price limit configured and the current + // adjusted gas price is over this limit, set the gas price as the limit + if c.cfg.MaxGasPriceLimit > 0 { + maxGasPrice := big.NewInt(0).SetUint64(c.cfg.MaxGasPriceLimit) + if adjustedGasPrice.Cmp(maxGasPrice) == 1 { + adjustedGasPrice.Set(maxGasPrice) + } + } + + return adjustedGasPrice, nil +} + +// logErrorAndWait used when an error is detected before trying again +func (c *Client) logErrorAndWait(msg string, err error) { + log.Errorf(msg, err) + time.Sleep(failureIntervalInSeconds * time.Second) +} + +// createMonitoredTxLogger creates an instance of logger with all the important +// fields already set for a monitoredTx +func createMonitoredTxLogger(mTx txmTypes.MonitoredTx) *zap.SugaredLogger { + return log.WithFields( + "owner", mTx.Owner, + "monitoredTxId", mTx.ID, + "createdAt", mTx.CreatedAt, + "from", mTx.From, + "to", mTx.To, + ) +} diff --git a/txmanager/txmanager_test.go b/txmanager/txmanager_test.go new file mode 100644 index 00000000..ef409b26 --- /dev/null +++ b/txmanager/txmanager_test.go @@ -0,0 +1,918 @@ +package txmanager + +import ( + "context" + "errors" + "fmt" + "math/big" + "testing" + "time" + + "github.com/0xPolygon/agglayer/config" + "github.com/0xPolygon/agglayer/mocks" + txmTypes "github.com/0xPolygon/agglayer/txmanager/types" + "github.com/0xPolygonHermez/zkevm-node/config/types" + "github.com/0xPolygonHermez/zkevm-node/ethtxmanager" + "github.com/0xPolygonHermez/zkevm-node/state" + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + ethTypes "github.com/ethereum/go-ethereum/core/types" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +var defaultEthTxmanagerConfigForTests = config.EthTxManagerConfig{ + Config: ethtxmanager.Config{ + FrequencyToMonitorTxs: types.NewDuration(time.Millisecond), + WaitTxToBeMined: types.NewDuration(time.Second), + GasPriceMarginFactor: 1, + MaxGasPriceLimit: 0, + }, + MaxRetries: 10, +} + +func TestTxGetMined(t *testing.T) { + dbCfg := newStateDBConfig(t) + etherman := mocks.NewEthermanMock(t) + storage, err := NewPostgresStorage(dbCfg) + require.NoError(t, err) + + ethTxManagerClient := New(defaultEthTxmanagerConfigForTests, etherman, storage, etherman) + + owner := "owner" + id := "unique_id" + from := common.HexToAddress("") + var to *common.Address + var value *big.Int + var data []byte = nil + + ctx := context.Background() + + currentNonce := uint64(1) + etherman. + On("PendingNonce", ctx, from). + Return(currentNonce, nil). + Once() + + estimatedGas := uint64(1) + etherman. + On("EstimateGas", ctx, from, to, value, data). + Return(estimatedGas, nil). + Once() + + gasOffset := uint64(1) + + suggestedGasPrice := big.NewInt(1) + etherman. + On("SuggestedGasPrice", ctx). + Return(suggestedGasPrice, nil). + Once() + + signedTx := ethTypes.NewTx(ðTypes.LegacyTx{ + Nonce: currentNonce, + To: to, + Value: value, + Gas: estimatedGas + gasOffset, + GasPrice: suggestedGasPrice, + Data: data, + }) + etherman. + On("SignTx", ctx, from, mock.IsType(ðTypes.Transaction{})). + Return(signedTx, nil). + Once() + + etherman. + On("GetTx", ctx, signedTx.Hash()). + Return(nil, false, ethereum.NotFound). + Once() + etherman. + On("GetTx", ctx, signedTx.Hash()). + Return(signedTx, false, nil). + Once() + + etherman. + On("SendTx", ctx, signedTx). + Return(nil). + Once() + + etherman. + On("WaitTxToBeMined", ctx, signedTx, mock.IsType(time.Second)). + Return(true, nil). + Once() + + blockNumber := big.NewInt(1) + + receipt := ðTypes.Receipt{ + BlockNumber: blockNumber, + Status: ethTypes.ReceiptStatusSuccessful, + } + etherman. + On("GetTxReceipt", ctx, signedTx.Hash()). + Return(receipt, nil). + Once() + etherman. + On("GetTxReceipt", ctx, signedTx.Hash()). + Run(func(args mock.Arguments) { ethTxManagerClient.Stop() }). // stops the management cycle to avoid problems with mocks + Return(receipt, nil). + Once() + + etherman. + On("GetRevertMessage", ctx, signedTx). + Return("", nil). + Once() + + block := &state.Block{ + BlockNumber: blockNumber.Uint64(), + } + etherman. + On("GetLastBlock", ctx, nil). + Return(block, nil). + Once() + + err = ethTxManagerClient.Add(ctx, owner, id, from, to, value, data, gasOffset, nil) + require.NoError(t, err) + + go ethTxManagerClient.Start() + + time.Sleep(time.Second) + result, err := ethTxManagerClient.Result(ctx, owner, id, nil) + require.NoError(t, err) + require.Equal(t, id, result.ID) + require.Equal(t, txmTypes.MonitoredTxStatusConfirmed, result.Status) + require.Equal(t, 1, len(result.Txs)) + require.Equal(t, signedTx, result.Txs[signedTx.Hash()].Tx) + require.Equal(t, receipt, result.Txs[signedTx.Hash()].Receipt) + require.Equal(t, "", result.Txs[signedTx.Hash()].RevertMessage) +} + +func TestTxGetMinedAfterReviewed(t *testing.T) { + dbCfg := newStateDBConfig(t) + + etherman := mocks.NewEthermanMock(t) + storage, err := NewPostgresStorage(dbCfg) + require.NoError(t, err) + + ethTxManagerClient := New(defaultEthTxmanagerConfigForTests, etherman, storage, etherman) + + ctx := context.Background() + + owner := "owner" + id := "unique_id" + from := common.HexToAddress("") + var to *common.Address + var value *big.Int + var data []byte = nil + + // Add + currentNonce := uint64(1) + etherman. + On("PendingNonce", ctx, from). + Return(currentNonce, nil). + Once() + + firstGasEstimation := uint64(1) + etherman. + On("EstimateGas", ctx, from, to, value, data). + Return(firstGasEstimation, nil). + Once() + + gasOffset := uint64(2) + + firstGasPriceSuggestion := big.NewInt(1) + etherman. + On("SuggestedGasPrice", ctx). + Return(firstGasPriceSuggestion, nil). + Once() + + // Monitoring Cycle 1 + firstSignedTx := ethTypes.NewTx(ðTypes.LegacyTx{ + Nonce: currentNonce, + To: to, + Value: value, + Gas: firstGasEstimation + gasOffset, + GasPrice: firstGasPriceSuggestion, + Data: data, + }) + etherman. + On("SignTx", ctx, from, mock.IsType(ðTypes.Transaction{})). + Return(firstSignedTx, nil). + Once() + etherman. + On("GetTx", ctx, firstSignedTx.Hash()). + Return(nil, false, ethereum.NotFound). + Once() + etherman. + On("SendTx", ctx, firstSignedTx). + Return(nil). + Once() + etherman. + On("WaitTxToBeMined", ctx, firstSignedTx, mock.IsType(time.Second)). + Return(false, errors.New("tx not mined yet")). + Once() + + // Monitoring Cycle 2 + etherman. + On("CheckTxWasMined", ctx, firstSignedTx.Hash()). + Return(false, nil, nil). + Once() + + secondGasEstimation := uint64(2) + etherman. + On("EstimateGas", ctx, from, to, value, data). + Return(secondGasEstimation, nil). + Once() + secondGasPriceSuggestion := big.NewInt(2) + etherman. + On("SuggestedGasPrice", ctx). + Return(secondGasPriceSuggestion, nil). + Once() + + secondSignedTx := ethTypes.NewTx(ðTypes.LegacyTx{ + Nonce: currentNonce, + To: to, + Value: value, + Gas: secondGasEstimation + gasOffset, + GasPrice: secondGasPriceSuggestion, + Data: data, + }) + etherman. + On("SignTx", ctx, from, mock.IsType(ðTypes.Transaction{})). + Return(secondSignedTx, nil). + Once() + etherman. + On("GetTx", ctx, secondSignedTx.Hash()). + Return(nil, false, ethereum.NotFound). + Once() + etherman. + On("SendTx", ctx, secondSignedTx). + Return(nil). + Once() + etherman. + On("WaitTxToBeMined", ctx, secondSignedTx, mock.IsType(time.Second)). + Run(func(args mock.Arguments) { ethTxManagerClient.Stop() }). // stops the management cycle to avoid problems with mocks + Return(true, nil). + Once() + + blockNumber := big.NewInt(1) + + receipt := ðTypes.Receipt{ + BlockNumber: blockNumber, + Status: ethTypes.ReceiptStatusSuccessful, + } + etherman. + On("GetTxReceipt", ctx, secondSignedTx.Hash()). + Return(receipt, nil). + Once() + + block := &state.Block{ + BlockNumber: blockNumber.Uint64(), + } + etherman. + On("GetLastBlock", ctx, nil). + Return(block, nil). + Once() + + // Build result + etherman. + On("GetTx", ctx, firstSignedTx.Hash()). + Return(firstSignedTx, false, nil). + Once() + etherman. + On("GetTxReceipt", ctx, firstSignedTx.Hash()). + Return(nil, ethereum.NotFound). + Once() + etherman. + On("GetRevertMessage", ctx, firstSignedTx). + Return("", nil). + Once() + etherman. + On("GetTx", ctx, secondSignedTx.Hash()). + Return(secondSignedTx, false, nil). + Once() + etherman. + On("GetTxReceipt", ctx, secondSignedTx.Hash()). + Return(receipt, nil). + Once() + etherman. + On("GetRevertMessage", ctx, secondSignedTx). + Return("", nil). + Once() + + err = ethTxManagerClient.Add(ctx, owner, id, from, to, value, data, gasOffset, nil) + require.NoError(t, err) + + go ethTxManagerClient.Start() + + time.Sleep(time.Second) + result, err := ethTxManagerClient.Result(ctx, owner, id, nil) + require.NoError(t, err) + require.Equal(t, txmTypes.MonitoredTxStatusConfirmed, result.Status) +} + +func TestExecutionReverted(t *testing.T) { + dbCfg := newStateDBConfig(t) + etherman := mocks.NewEthermanMock(t) + storage, err := NewPostgresStorage(dbCfg) + require.NoError(t, err) + + ethTxManagerClient := New(defaultEthTxmanagerConfigForTests, etherman, storage, etherman) + + ctx := context.Background() + + owner := "owner" + id := "unique_id" + from := common.HexToAddress("") + var to *common.Address + var value *big.Int + var data []byte = nil + + // Add + currentNonce := uint64(1) + etherman. + On("PendingNonce", ctx, from). + Return(currentNonce, nil). + Once() + + firstGasEstimation := uint64(1) + etherman. + On("EstimateGas", ctx, from, to, value, data). + Return(firstGasEstimation, nil). + Once() + + gasOffset := uint64(1) + + firstGasPriceSuggestion := big.NewInt(1) + etherman. + On("SuggestedGasPrice", ctx). + Return(firstGasPriceSuggestion, nil). + Once() + + // Monitoring Cycle 1 + firstSignedTx := ethTypes.NewTx(ðTypes.LegacyTx{ + Nonce: currentNonce, + To: to, + Value: value, + Gas: firstGasEstimation + gasOffset, + GasPrice: firstGasPriceSuggestion, + Data: data, + }) + etherman. + On("SignTx", ctx, from, mock.IsType(ðTypes.Transaction{})). + Return(firstSignedTx, nil). + Once() + etherman. + On("GetTx", ctx, firstSignedTx.Hash()). + Return(nil, false, ethereum.NotFound). + Once() + etherman. + On("SendTx", ctx, firstSignedTx). + Return(nil). + Once() + etherman. + On("WaitTxToBeMined", ctx, firstSignedTx, mock.IsType(time.Second)). + Return(true, nil). + Once() + + blockNumber := big.NewInt(1) + failedReceipt := ðTypes.Receipt{ + BlockNumber: blockNumber, + Status: ethTypes.ReceiptStatusFailed, + TxHash: firstSignedTx.Hash(), + } + + etherman. + On("GetTxReceipt", ctx, firstSignedTx.Hash()). + Return(failedReceipt, nil). + Once() + etherman. + On("GetTx", ctx, firstSignedTx.Hash()). + Return(firstSignedTx, false, nil). + Once() + etherman. + On("GetRevertMessage", ctx, firstSignedTx). + Return("", txmTypes.ErrExecutionReverted). + Once() + + // Monitoring Cycle 2 + etherman. + On("CheckTxWasMined", ctx, firstSignedTx.Hash()). + Return(true, failedReceipt, nil). + Once() + + currentNonce = uint64(2) + etherman. + On("PendingNonce", ctx, from). + Return(currentNonce, nil). + Once() + secondGasEstimation := uint64(2) + etherman. + On("EstimateGas", ctx, from, to, value, data). + Return(secondGasEstimation, nil). + Once() + secondGasPriceSuggestion := big.NewInt(2) + etherman. + On("SuggestedGasPrice", ctx). + Return(secondGasPriceSuggestion, nil). + Once() + + secondSignedTx := ethTypes.NewTx(ðTypes.LegacyTx{ + Nonce: currentNonce, + To: to, + Value: value, + Gas: secondGasEstimation + gasOffset, + GasPrice: secondGasPriceSuggestion, + Data: data, + }) + etherman. + On("SignTx", ctx, from, mock.IsType(ðTypes.Transaction{})). + Return(secondSignedTx, nil). + Once() + etherman. + On("GetTx", ctx, secondSignedTx.Hash()). + Return(nil, false, ethereum.NotFound). + Once() + etherman. + On("SendTx", ctx, secondSignedTx). + Return(nil). + Once() + etherman. + On("WaitTxToBeMined", ctx, secondSignedTx, mock.IsType(time.Second)). + Run(func(args mock.Arguments) { ethTxManagerClient.Stop() }). // stops the management cycle to avoid problems with mocks + Return(true, nil). + Once() + + blockNumber = big.NewInt(2) + receipt := ðTypes.Receipt{ + BlockNumber: blockNumber, + Status: ethTypes.ReceiptStatusSuccessful, + } + etherman. + On("GetTxReceipt", ctx, secondSignedTx.Hash()). + Return(receipt, nil). + Once() + + block := &state.Block{ + BlockNumber: blockNumber.Uint64(), + } + etherman. + On("GetLastBlock", ctx, nil). + Return(block, nil). + Once() + + // Build result + etherman. + On("GetTx", ctx, firstSignedTx.Hash()). + Return(firstSignedTx, false, nil). + Once() + etherman. + On("GetTxReceipt", ctx, firstSignedTx.Hash()). + Return(nil, ethereum.NotFound). + Once() + etherman. + On("GetRevertMessage", ctx, firstSignedTx). + Return("", nil). + Once() + etherman. + On("GetTx", ctx, secondSignedTx.Hash()). + Return(secondSignedTx, false, nil). + Once() + etherman. + On("GetTxReceipt", ctx, secondSignedTx.Hash()). + Return(receipt, nil). + Once() + etherman. + On("GetRevertMessage", ctx, secondSignedTx). + Return("", nil). + Once() + + err = ethTxManagerClient.Add(ctx, owner, id, from, to, value, data, gasOffset, nil) + require.NoError(t, err) + + go ethTxManagerClient.Start() + + time.Sleep(time.Second) + result, err := ethTxManagerClient.Result(ctx, owner, id, nil) + require.NoError(t, err) + require.Equal(t, txmTypes.MonitoredTxStatusConfirmed, result.Status) +} + +func TestGasPriceMarginAndLimit(t *testing.T) { + type testCase struct { + name string + gasPriceMarginFactor float64 + maxGasPriceLimit uint64 + suggestedGasPrice int64 + expectedGasPrice int64 + } + + testCases := []testCase{ + { + name: "no margin and no limit", + gasPriceMarginFactor: 1, + maxGasPriceLimit: 0, + suggestedGasPrice: 100, + expectedGasPrice: 100, + }, + { + name: "20% margin", + gasPriceMarginFactor: 1.2, + maxGasPriceLimit: 0, + suggestedGasPrice: 100, + expectedGasPrice: 120, + }, + { + name: "20% margin but limited", + gasPriceMarginFactor: 1.2, + maxGasPriceLimit: 110, + suggestedGasPrice: 100, + expectedGasPrice: 110, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + dbCfg := newStateDBConfig(t) + etherman := mocks.NewEthermanMock(t) + storage, err := NewPostgresStorage(dbCfg) + require.NoError(t, err) + + var cfg = config.EthTxManagerConfig{ + Config: ethtxmanager.Config{ + FrequencyToMonitorTxs: defaultEthTxmanagerConfigForTests.FrequencyToMonitorTxs, + WaitTxToBeMined: defaultEthTxmanagerConfigForTests.WaitTxToBeMined, + GasPriceMarginFactor: tc.gasPriceMarginFactor, + MaxGasPriceLimit: tc.maxGasPriceLimit, + }, + MaxRetries: defaultEthTxmanagerConfigForTests.MaxRetries, + } + + ethTxManagerClient := New(cfg, etherman, storage, etherman) + + owner := "owner" + id := "unique_id" + from := common.HexToAddress("") + var to *common.Address + var value *big.Int + var data []byte = nil + + ctx := context.Background() + + currentNonce := uint64(1) + etherman. + On("PendingNonce", ctx, from). + Return(currentNonce, nil). + Once() + + estimatedGas := uint64(1) + etherman. + On("EstimateGas", ctx, from, to, value, data). + Return(estimatedGas, nil). + Once() + + gasOffset := uint64(1) + + suggestedGasPrice := big.NewInt(tc.suggestedGasPrice) + etherman. + On("SuggestedGasPrice", ctx). + Return(suggestedGasPrice, nil). + Once() + + expectedSuggestedGasPrice := big.NewInt(tc.expectedGasPrice) + + err = ethTxManagerClient.Add(ctx, owner, id, from, to, value, data, gasOffset, nil) + require.NoError(t, err) + + monitoredTx, err := storage.Get(ctx, owner, id, nil) + require.NoError(t, err) + require.Equal(t, monitoredTx.GasPrice.Cmp(expectedSuggestedGasPrice), 0, + fmt.Sprintf("expected gas price %v, found %v", expectedSuggestedGasPrice.String(), monitoredTx.GasPrice.String())) + }) + } +} + +func TestGasOffset(t *testing.T) { + type testCase struct { + name string + estimatedGas uint64 + gasOffset uint64 + expectedGas uint64 + } + + testCases := []testCase{ + { + name: "no gas offset", + estimatedGas: 1, + gasOffset: 0, + expectedGas: 1, + }, + { + name: "gas offset", + estimatedGas: 1, + gasOffset: 1, + expectedGas: 2, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + dbCfg := newStateDBConfig(t) + + etherman := mocks.NewEthermanMock(t) + storage, err := NewPostgresStorage(dbCfg) + require.NoError(t, err) + + var cfg = config.EthTxManagerConfig{ + Config: ethtxmanager.Config{ + FrequencyToMonitorTxs: defaultEthTxmanagerConfigForTests.FrequencyToMonitorTxs, + WaitTxToBeMined: defaultEthTxmanagerConfigForTests.WaitTxToBeMined, + }, + } + + ethTxManagerClient := New(cfg, etherman, storage, etherman) + + owner := "owner" + id := "unique_id" + from := common.HexToAddress("") + var to *common.Address + var value *big.Int + var data []byte = nil + + ctx := context.Background() + + currentNonce := uint64(1) + etherman. + On("PendingNonce", ctx, from). + Return(currentNonce, nil). + Once() + + etherman. + On("EstimateGas", ctx, from, to, value, data). + Return(tc.estimatedGas, nil). + Once() + + suggestedGasPrice := big.NewInt(int64(10)) + etherman. + On("SuggestedGasPrice", ctx). + Return(suggestedGasPrice, nil). + Once() + + err = ethTxManagerClient.Add(ctx, owner, id, from, to, value, data, tc.gasOffset, nil) + require.NoError(t, err) + + monitoredTx, err := storage.Get(ctx, owner, id, nil) + require.NoError(t, err) + require.Equal(t, monitoredTx.Gas, tc.estimatedGas) + require.Equal(t, monitoredTx.GasOffset, tc.gasOffset) + + tx := monitoredTx.Tx() + require.Equal(t, tx.Gas(), tc.expectedGas) + }) + } +} + +func TestFailedToEstimateTxWithForcedGasGetMined(t *testing.T) { + dbCfg := newStateDBConfig(t) + etherman := mocks.NewEthermanMock(t) + storage, err := NewPostgresStorage(dbCfg) + require.NoError(t, err) + + // set forced gas + defaultEthTxmanagerConfigForTests.ForcedGas = 300000000 + + ethTxManagerClient := New(defaultEthTxmanagerConfigForTests, etherman, storage, etherman) + + owner := "owner" + id := "unique_id" + from := common.HexToAddress("") + var to *common.Address + var value *big.Int + var data []byte = nil + + ctx := context.Background() + + currentNonce := uint64(1) + etherman. + On("PendingNonce", ctx, from). + Return(currentNonce, nil). + Once() + + // forces the estimate gas to fail + etherman. + On("EstimateGas", ctx, from, to, value, data). + Return(uint64(0), fmt.Errorf("failed to estimate gas")). + Once() + + // set estimated gas as the config ForcedGas + estimatedGas := defaultEthTxmanagerConfigForTests.ForcedGas + gasOffset := uint64(1) + + suggestedGasPrice := big.NewInt(1) + etherman. + On("SuggestedGasPrice", ctx). + Return(suggestedGasPrice, nil). + Once() + + signedTx := ethTypes.NewTx(ðTypes.LegacyTx{ + Nonce: currentNonce, + To: to, + Value: value, + Gas: estimatedGas + gasOffset, + GasPrice: suggestedGasPrice, + Data: data, + }) + etherman. + On("SignTx", ctx, from, mock.IsType(ðTypes.Transaction{})). + Return(signedTx, nil). + Once() + + etherman. + On("GetTx", ctx, signedTx.Hash()). + Return(nil, false, ethereum.NotFound). + Once() + etherman. + On("GetTx", ctx, signedTx.Hash()). + Return(signedTx, false, nil). + Once() + + etherman. + On("SendTx", ctx, signedTx). + Return(nil). + Once() + + etherman. + On("WaitTxToBeMined", ctx, signedTx, mock.IsType(time.Second)). + Return(true, nil). + Once() + + blockNumber := big.NewInt(1) + + receipt := ðTypes.Receipt{ + BlockNumber: blockNumber, + Status: ethTypes.ReceiptStatusSuccessful, + } + etherman. + On("GetTxReceipt", ctx, signedTx.Hash()). + Return(receipt, nil). + Once() + etherman. + On("GetTxReceipt", ctx, signedTx.Hash()). + Run(func(args mock.Arguments) { ethTxManagerClient.Stop() }). // stops the management cycle to avoid problems with mocks + Return(receipt, nil). + Once() + + etherman. + On("GetRevertMessage", ctx, signedTx). + Return("", nil). + Once() + + block := &state.Block{ + BlockNumber: blockNumber.Uint64(), + } + etherman. + On("GetLastBlock", ctx, nil). + Return(block, nil). + Once() + + err = ethTxManagerClient.Add(ctx, owner, id, from, to, value, data, gasOffset, nil) + require.NoError(t, err) + + go ethTxManagerClient.Start() + + time.Sleep(time.Second) + result, err := ethTxManagerClient.Result(ctx, owner, id, nil) + require.NoError(t, err) + require.Equal(t, id, result.ID) + require.Equal(t, txmTypes.MonitoredTxStatusConfirmed, result.Status) + require.Equal(t, 1, len(result.Txs)) + require.Equal(t, signedTx, result.Txs[signedTx.Hash()].Tx) + require.Equal(t, receipt, result.Txs[signedTx.Hash()].Receipt) + require.Equal(t, "", result.Txs[signedTx.Hash()].RevertMessage) +} + +func TestTxRetryFailed(t *testing.T) { + dbCfg := newStateDBConfig(t) + + etherman := mocks.NewEthermanMock(t) + storage, err := NewPostgresStorage(dbCfg) + require.NoError(t, err) + + config := config.EthTxManagerConfig{ + Config: ethtxmanager.Config{ + FrequencyToMonitorTxs: types.NewDuration(time.Second), + WaitTxToBeMined: defaultEthTxmanagerConfigForTests.WaitTxToBeMined, + GasPriceMarginFactor: defaultEthTxmanagerConfigForTests.GasPriceMarginFactor, + MaxGasPriceLimit: defaultEthTxmanagerConfigForTests.MaxGasPriceLimit, + }, + MaxRetries: 3, + } + + ethTxManagerClient := New(config, etherman, storage, etherman) + + ctx := context.Background() + + owner := "owner" + id := "unique_id" + from := common.HexToAddress("") + var to *common.Address + var value *big.Int + var data []byte = nil + + // Add + currentNonce := uint64(1) + etherman. + On("PendingNonce", ctx, from). + Return(currentNonce, nil). + Once() + + firstGasEstimation := uint64(1) + etherman. + On("EstimateGas", ctx, from, to, value, data). + Return(firstGasEstimation, nil). + Once() + + gasOffset := uint64(2) + + firstGasPriceSuggestion := big.NewInt(1) + etherman. + On("SuggestedGasPrice", ctx). + Return(firstGasPriceSuggestion, nil). + Once() + + // Monitoring Cycle 1 + firstSignedTx := ethTypes.NewTx(ðTypes.LegacyTx{ + Nonce: currentNonce, + To: to, + Value: value, + Gas: firstGasEstimation + gasOffset, + GasPrice: firstGasPriceSuggestion, + Data: data, + }) + etherman. + On("SignTx", ctx, from, mock.IsType(ðTypes.Transaction{})). + Return(firstSignedTx, nil). + Once() + etherman. + On("GetTx", ctx, firstSignedTx.Hash()). + Return(nil, false, ethereum.NotFound). + Once() + etherman. + On("SendTx", ctx, firstSignedTx). + Return(nil). + Once() + etherman. + On("WaitTxToBeMined", ctx, firstSignedTx, mock.IsType(time.Second)). + Return(false, errors.New("tx not mined yet")). + Once() + + // Monitoring Cycle 2 + etherman. + On("CheckTxWasMined", ctx, firstSignedTx.Hash()). + Return(false, nil, nil). + Once() + + etherman. + On("EstimateGas", ctx, from, to, value, data). + Return(uint64(0), errors.New("execution reverted")). + Once() + + // Monitoring Cycle 3 + etherman. + On("CheckTxWasMined", ctx, firstSignedTx.Hash()). + Return(false, nil, nil). + Once() + + etherman. + On("EstimateGas", ctx, from, to, value, data). + Return(uint64(0), errors.New("execution reverted")). + Once() + + // Monitoring Cycle 4 + etherman. + On("CheckTxWasMined", ctx, firstSignedTx.Hash()). + Return(false, nil, nil). + Once() + + // Build result + etherman. + On("GetTx", ctx, firstSignedTx.Hash()). + Return(firstSignedTx, false, nil). + Once() + etherman. + On("GetTxReceipt", ctx, firstSignedTx.Hash()). + Return(nil, ethereum.NotFound). + Once() + etherman. + On("GetRevertMessage", ctx, firstSignedTx). + Return("", nil). + Once() + + err = ethTxManagerClient.Add(ctx, owner, id, from, to, value, data, gasOffset, nil) + require.NoError(t, err) + + go ethTxManagerClient.Start() + + time.Sleep(5 * time.Second) + result, err := ethTxManagerClient.Result(ctx, owner, id, nil) + require.NoError(t, err) + require.Equal(t, txmTypes.MonitoredTxStatusFailed, result.Status) +} diff --git a/txmanager/types/interfaces.go b/txmanager/types/interfaces.go new file mode 100644 index 00000000..0d6c7427 --- /dev/null +++ b/txmanager/types/interfaces.go @@ -0,0 +1,37 @@ +package types + +import ( + "context" + "math/big" + "time" + + "github.com/0xPolygonHermez/zkevm-node/state" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/jackc/pgx/v4" +) + +type EthermanInterface interface { + GetTx(ctx context.Context, txHash common.Hash) (*types.Transaction, bool, error) + GetTxReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) + WaitTxToBeMined(ctx context.Context, tx *types.Transaction, timeout time.Duration) (bool, error) + SendTx(ctx context.Context, tx *types.Transaction) error + PendingNonce(ctx context.Context, account common.Address) (uint64, error) + SuggestedGasPrice(ctx context.Context) (*big.Int, error) + EstimateGas(ctx context.Context, from common.Address, to *common.Address, value *big.Int, data []byte) (uint64, error) + CheckTxWasMined(ctx context.Context, txHash common.Hash) (bool, *types.Receipt, error) + SignTx(ctx context.Context, sender common.Address, tx *types.Transaction) (*types.Transaction, error) + GetRevertMessage(ctx context.Context, tx *types.Transaction) (string, error) +} + +type StorageInterface interface { + Add(ctx context.Context, mTx MonitoredTx, dbTx pgx.Tx) error + Get(ctx context.Context, owner, id string, dbTx pgx.Tx) (MonitoredTx, error) + GetByStatus(ctx context.Context, owner *string, statuses []MonitoredTxStatus, dbTx pgx.Tx) ([]MonitoredTx, error) + GetBySenderAndStatus(ctx context.Context, sender common.Address, statuses []MonitoredTxStatus, dbTx pgx.Tx) ([]MonitoredTx, error) + Update(ctx context.Context, mTx MonitoredTx, dbTx pgx.Tx) error +} + +type StateInterface interface { + GetLastBlock(ctx context.Context, dbTx pgx.Tx) (*state.Block, error) +} diff --git a/txmanager/types/monitoredtx.go b/txmanager/types/monitoredtx.go new file mode 100644 index 00000000..201532e1 --- /dev/null +++ b/txmanager/types/monitoredtx.go @@ -0,0 +1,213 @@ +package types + +import ( + "encoding/hex" + "errors" + "math/big" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" +) + +var ( + // ErrNotFound when the object is not found + ErrNotFound = errors.New("not found") + // ErrAlreadyExists when the object already exists + ErrAlreadyExists = errors.New("already exists") + + // ErrExecutionReverted returned when trying to get the revert message + // but the call fails without revealing the revert reason + ErrExecutionReverted = errors.New("execution reverted") +) + +const ( + // MonitoredTxStatusCreated mean the tx was just added to the storage + MonitoredTxStatusCreated = MonitoredTxStatus("created") + + // MonitoredTxStatusSent means that at least a eth tx was sent to the network + MonitoredTxStatusSent = MonitoredTxStatus("sent") + + // MonitoredTxStatusFailed means the tx was already mined and failed with an + // error that can't be recovered automatically, ex: the data in the tx is invalid + // and the tx gets reverted + MonitoredTxStatusFailed = MonitoredTxStatus("failed") + + // MonitoredTxStatusConfirmed means the tx was already mined and the receipt + // status is Successful + MonitoredTxStatusConfirmed = MonitoredTxStatus("confirmed") + + // MonitoredTxStatusReorged is used when a monitored tx was already confirmed but + // the L1 block where this tx was confirmed has been reorged, in this situation + // the caller needs to review this information and wait until it gets confirmed + // again in a future block + MonitoredTxStatusReorged = MonitoredTxStatus("reorged") + + // MonitoredTxStatusDone means the tx was set by the owner as done + MonitoredTxStatusDone = MonitoredTxStatus("done") +) + +// MonitoredTxStatus represents the status of a monitored tx +type MonitoredTxStatus string + +// String returns a string representation of the status +func (s MonitoredTxStatus) String() string { + return string(s) +} + +// MonitoredTx represents a set of information used to build tx +// plus information to monitor if the transactions was sent successfully +type MonitoredTx struct { + // Owner is the common identifier among all the monitored tx to identify who + // created this, it's a identification provided by the caller in order to be + // used in the future to query the monitored tx by the Owner, this allows the + // caller to be free of implementing a persistence layer to monitor the txs + Owner string + + // ID is the tx identifier controller by the caller + ID string + + // sender of the tx, used to identify which private key should be used to sing the tx + From common.Address + + // receiver of the tx + To *common.Address + + // Nonce used to create the tx + Nonce uint64 + + // tx Value + Value *big.Int + + // tx Data + Data []byte + + // tx Gas + Gas uint64 + + // tx gas offset + GasOffset uint64 + + // tx gas price + GasPrice *big.Int + + // Status of this monitoring + Status MonitoredTxStatus + + // BlockNumber represents the block where the tx was identified + // to be mined, it's the same as the block number found in the + // tx receipt, this is used to control reorged monitored txs + BlockNumber *big.Int + + // History represent all transaction hashes from + // transactions created using this struct data and + // sent to the network + History map[common.Hash]bool + + // CreatedAt date time it was created + CreatedAt time.Time + + // UpdatedAt last date time it was updated + UpdatedAt time.Time + + // NumRetries number of times tx was sent to the network + NumRetries uint64 +} + +// Tx uses the current information to build a tx +func (mTx MonitoredTx) Tx() *types.Transaction { + tx := types.NewTx(&types.LegacyTx{ + To: mTx.To, + Nonce: mTx.Nonce, + Value: mTx.Value, + Data: mTx.Data, + Gas: mTx.Gas + mTx.GasOffset, + GasPrice: mTx.GasPrice, + }) + + return tx +} + +// AddHistory adds a transaction to the monitoring history +func (mTx *MonitoredTx) AddHistory(tx *types.Transaction) error { + if _, found := mTx.History[tx.Hash()]; found { + return ErrAlreadyExists + } + + mTx.History[tx.Hash()] = true + mTx.NumRetries++ + + return nil +} + +// ToStringPtr returns the current to field as a string pointer +func (mTx *MonitoredTx) ToStringPtr() *string { + var to *string + if mTx.To != nil { + s := mTx.To.String() + to = &s + } + return to +} + +// ValueU64Ptr returns the current value field as a uint64 pointer +func (mTx *MonitoredTx) ValueU64Ptr() *uint64 { + var value *uint64 + if mTx.Value != nil { + tmp := mTx.Value.Uint64() + value = &tmp + } + return value +} + +// DataStringPtr returns the current data field as a string pointer +func (mTx *MonitoredTx) DataStringPtr() *string { + var data *string + if mTx.Data != nil { + tmp := hex.EncodeToString(mTx.Data) + data = &tmp + } + return data +} + +// HistoryStringSlice returns the current history field as a string slice +func (mTx *MonitoredTx) HistoryStringSlice() []string { + history := make([]string, 0, len(mTx.History)) + for h := range mTx.History { + history = append(history, h.String()) + } + return history +} + +// HistoryHashSlice returns the current history field as a string slice +func (mTx *MonitoredTx) HistoryHashSlice() []common.Hash { + history := make([]common.Hash, 0, len(mTx.History)) + for h := range mTx.History { + history = append(history, h) + } + return history +} + +// BlockNumberU64Ptr returns the current blockNumber as a uint64 pointer +func (mTx *MonitoredTx) BlockNumberU64Ptr() *uint64 { + var blockNumber *uint64 + if mTx.BlockNumber != nil { + tmp := mTx.BlockNumber.Uint64() + blockNumber = &tmp + } + return blockNumber +} + +// MonitoredTxResult represents the result of a execution of a monitored tx +type MonitoredTxResult struct { + ID string + Status MonitoredTxStatus + Txs map[common.Hash]TxResult +} + +// TxResult represents the result of a execution of a ethereum transaction in the block chain +type TxResult struct { + Tx *types.Transaction + Receipt *types.Receipt + RevertMessage string +} diff --git a/txmanager/types/monitoretx_test.go b/txmanager/types/monitoretx_test.go new file mode 100644 index 00000000..dd3bb83b --- /dev/null +++ b/txmanager/types/monitoretx_test.go @@ -0,0 +1,38 @@ +package types + +import ( + "math/big" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/stretchr/testify/assert" +) + +func TestTx(t *testing.T) { + to := common.HexToAddress("0x2") + nonce := uint64(1) + value := big.NewInt(2) + data := []byte("data") + gas := uint64(3) + gasOffset := uint64(4) + gasPrice := big.NewInt(5) + + mTx := MonitoredTx{ + To: &to, + Nonce: nonce, + Value: value, + Data: data, + Gas: gas, + GasOffset: gasOffset, + GasPrice: gasPrice, + } + + tx := mTx.Tx() + + assert.Equal(t, &to, tx.To()) + assert.Equal(t, nonce, tx.Nonce()) + assert.Equal(t, value, tx.Value()) + assert.Equal(t, data, tx.Data()) + assert.Equal(t, gas+gasOffset, tx.Gas()) + assert.Equal(t, gasPrice, tx.GasPrice()) +} diff --git a/types/interfaces.go b/types/interfaces.go index ac5d6782..59534d49 100644 --- a/types/interfaces.go +++ b/types/interfaces.go @@ -3,12 +3,15 @@ package types import ( "context" "math/big" + "time" "github.com/0xPolygon/agglayer/tx" - "github.com/0xPolygonHermez/zkevm-node/ethtxmanager" + txmTypes "github.com/0xPolygon/agglayer/txmanager/types" "github.com/0xPolygonHermez/zkevm-node/jsonrpc/types" + "github.com/0xPolygonHermez/zkevm-node/state" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" + gethTypes "github.com/ethereum/go-ethereum/core/types" "github.com/jackc/pgx/v4" ) @@ -20,13 +23,22 @@ type IEtherman interface { GetSequencerAddr(rollupId uint32) (common.Address, error) BuildTrustedVerifyBatchesTxData(lastVerifiedBatch, newVerifiedBatch uint64, proof tx.ZKP, rollupId uint32) (data []byte, err error) CallContract(ctx context.Context, call ethereum.CallMsg, blockNumber *big.Int) ([]byte, error) + GetTx(ctx context.Context, txHash common.Hash) (*gethTypes.Transaction, bool, error) + GetTxReceipt(ctx context.Context, txHash common.Hash) (*gethTypes.Receipt, error) + WaitTxToBeMined(ctx context.Context, tx *gethTypes.Transaction, timeout time.Duration) (bool, error) + SendTx(ctx context.Context, tx *gethTypes.Transaction) error + PendingNonce(ctx context.Context, account common.Address) (uint64, error) + SuggestedGasPrice(ctx context.Context) (*big.Int, error) + EstimateGas(ctx context.Context, from common.Address, to *common.Address, value *big.Int, data []byte) (uint64, error) + CheckTxWasMined(ctx context.Context, txHash common.Hash) (bool, *gethTypes.Receipt, error) + SignTx(ctx context.Context, sender common.Address, tx *gethTypes.Transaction) (*gethTypes.Transaction, error) + GetRevertMessage(ctx context.Context, tx *gethTypes.Transaction) (string, error) + GetLastBlock(ctx context.Context, dbTx pgx.Tx) (*state.Block, error) } type IEthTxManager interface { Add(ctx context.Context, owner, id string, from common.Address, to *common.Address, value *big.Int, data []byte, gasOffset uint64, dbTx pgx.Tx) error - Result(ctx context.Context, owner, id string, dbTx pgx.Tx) (ethtxmanager.MonitoredTxResult, error) - ResultsByStatus(ctx context.Context, owner string, statuses []ethtxmanager.MonitoredTxStatus, dbTx pgx.Tx) ([]ethtxmanager.MonitoredTxResult, error) - ProcessPendingMonitoredTxs(ctx context.Context, owner string, failedResultHandler ethtxmanager.ResultHandler, dbTx pgx.Tx) + Result(ctx context.Context, owner, id string, dbTx pgx.Tx) (txmTypes.MonitoredTxResult, error) } type IZkEVMClient interface { From 85e6cada137baab35718f52aa576b48ed5642a9f Mon Sep 17 00:00:00 2001 From: Goran Rojovic Date: Thu, 28 Mar 2024 13:09:05 +0100 Subject: [PATCH 2/8] rename test --- txmanager/txmanager_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/txmanager/txmanager_test.go b/txmanager/txmanager_test.go index ef409b26..199590c2 100644 --- a/txmanager/txmanager_test.go +++ b/txmanager/txmanager_test.go @@ -789,7 +789,7 @@ func TestFailedToEstimateTxWithForcedGasGetMined(t *testing.T) { require.Equal(t, "", result.Txs[signedTx.Hash()].RevertMessage) } -func TestTxRetryFailed(t *testing.T) { +func TestTxRetry_MaxRetriesReached(t *testing.T) { dbCfg := newStateDBConfig(t) etherman := mocks.NewEthermanMock(t) From 9e1521cdc622fbbe32745bb6878592358c928e0c Mon Sep 17 00:00:00 2001 From: Goran Rojovic Date: Thu, 28 Mar 2024 13:17:01 +0100 Subject: [PATCH 3/8] stop in test --- txmanager/txmanager_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/txmanager/txmanager_test.go b/txmanager/txmanager_test.go index 199590c2..7ef0a560 100644 --- a/txmanager/txmanager_test.go +++ b/txmanager/txmanager_test.go @@ -807,6 +807,7 @@ func TestTxRetry_MaxRetriesReached(t *testing.T) { } ethTxManagerClient := New(config, etherman, storage, etherman) + defer ethTxManagerClient.Stop() ctx := context.Background() @@ -915,4 +916,5 @@ func TestTxRetry_MaxRetriesReached(t *testing.T) { result, err := ethTxManagerClient.Result(ctx, owner, id, nil) require.NoError(t, err) require.Equal(t, txmTypes.MonitoredTxStatusFailed, result.Status) + } From 4efe478f3a5ddeb041ca11816e79d780f0e4f21b Mon Sep 17 00:00:00 2001 From: Goran Rojovic Date: Thu, 28 Mar 2024 16:39:41 +0100 Subject: [PATCH 4/8] add migrate down --- db/migrations/0002.sql | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/db/migrations/0002.sql b/db/migrations/0002.sql index 3fbd2e99..8c554f3b 100644 --- a/db/migrations/0002.sql +++ b/db/migrations/0002.sql @@ -1,3 +1,6 @@ -- +migrate Up ALTER TABLE state.monitored_txs -ADD COLUMN num_retries DECIMAL(78, 0) NOT NULL DEFAULT 0; \ No newline at end of file +ADD COLUMN num_retries DECIMAL(78, 0) NOT NULL DEFAULT 0; + +-- +migrate Down +ALTER TABLE state.monitored_txs DROP COLUMN num_retries; \ No newline at end of file From 6e0a701826e5532bcd6214bb28c833473b5c2d4f Mon Sep 17 00:00:00 2001 From: Goran Rojovic Date: Thu, 28 Mar 2024 16:50:43 +0100 Subject: [PATCH 5/8] rows.Close() --- txmanager/pgstorage.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/txmanager/pgstorage.go b/txmanager/pgstorage.go index 1f55ae4d..23c71d74 100644 --- a/txmanager/pgstorage.go +++ b/txmanager/pgstorage.go @@ -106,6 +106,8 @@ func (s *PostgresStorage) GetByStatus(ctx context.Context, owner *string, status rows, err = conn.Query(ctx, cmd, owner) } + defer rows.Close() + if errors.Is(err, pgx.ErrNoRows) { return []txmTypes.MonitoredTx{}, nil } else if err != nil { @@ -152,6 +154,8 @@ func (s *PostgresStorage) GetBySenderAndStatus( rows, err = conn.Query(ctx, cmd, sender.String()) } + defer rows.Close() + if errors.Is(err, pgx.ErrNoRows) { return []txmTypes.MonitoredTx{}, nil } else if err != nil { From e4a57fc577d36818ae3706a39ca0c5ecc0f12295 Mon Sep 17 00:00:00 2001 From: Victor Castell Date: Fri, 5 Apr 2024 12:08:03 +0200 Subject: [PATCH 6/8] Use existing interface --- types/interfaces.go | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/types/interfaces.go b/types/interfaces.go index 59534d49..ea52a116 100644 --- a/types/interfaces.go +++ b/types/interfaces.go @@ -3,7 +3,6 @@ package types import ( "context" "math/big" - "time" "github.com/0xPolygon/agglayer/tx" txmTypes "github.com/0xPolygon/agglayer/txmanager/types" @@ -11,7 +10,6 @@ import ( "github.com/0xPolygonHermez/zkevm-node/state" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" - gethTypes "github.com/ethereum/go-ethereum/core/types" "github.com/jackc/pgx/v4" ) @@ -23,16 +21,7 @@ type IEtherman interface { GetSequencerAddr(rollupId uint32) (common.Address, error) BuildTrustedVerifyBatchesTxData(lastVerifiedBatch, newVerifiedBatch uint64, proof tx.ZKP, rollupId uint32) (data []byte, err error) CallContract(ctx context.Context, call ethereum.CallMsg, blockNumber *big.Int) ([]byte, error) - GetTx(ctx context.Context, txHash common.Hash) (*gethTypes.Transaction, bool, error) - GetTxReceipt(ctx context.Context, txHash common.Hash) (*gethTypes.Receipt, error) - WaitTxToBeMined(ctx context.Context, tx *gethTypes.Transaction, timeout time.Duration) (bool, error) - SendTx(ctx context.Context, tx *gethTypes.Transaction) error - PendingNonce(ctx context.Context, account common.Address) (uint64, error) - SuggestedGasPrice(ctx context.Context) (*big.Int, error) - EstimateGas(ctx context.Context, from common.Address, to *common.Address, value *big.Int, data []byte) (uint64, error) - CheckTxWasMined(ctx context.Context, txHash common.Hash) (bool, *gethTypes.Receipt, error) - SignTx(ctx context.Context, sender common.Address, tx *gethTypes.Transaction) (*gethTypes.Transaction, error) - GetRevertMessage(ctx context.Context, tx *gethTypes.Transaction) (string, error) + txmTypes.EthermanInterface GetLastBlock(ctx context.Context, dbTx pgx.Tx) (*state.Block, error) } From 1d7df9c140b29d7666dbf3ac28577043b9a0c5e3 Mon Sep 17 00:00:00 2001 From: Victor Castell <0x@vcastellm.xyz> Date: Fri, 5 Apr 2024 10:25:38 +0000 Subject: [PATCH 7/8] Add some tests --- txmanager/types/monitoretx_test.go | 57 ++++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/txmanager/types/monitoretx_test.go b/txmanager/types/monitoretx_test.go index dd3bb83b..57fcdf85 100644 --- a/txmanager/types/monitoretx_test.go +++ b/txmanager/types/monitoretx_test.go @@ -5,6 +5,7 @@ import ( "testing" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" "github.com/stretchr/testify/assert" ) @@ -36,3 +37,59 @@ func TestTx(t *testing.T) { assert.Equal(t, gas+gasOffset, tx.Gas()) assert.Equal(t, gasPrice, tx.GasPrice()) } + +func TestAddHistory(t *testing.T) { + mTx := MonitoredTx{ + History: make(map[common.Hash]bool), + NumRetries: 0, + } + + tx := types.NewTransaction(0, common.HexToAddress("0x1"), big.NewInt(0), 0, big.NewInt(0), nil) + + err := mTx.AddHistory(tx) + assert.NoError(t, err) + assert.True(t, mTx.History[tx.Hash()]) + assert.Equal(t, uint64(0x1), mTx.NumRetries) + + err = mTx.AddHistory(tx) + assert.Equal(t, ErrAlreadyExists, err) +} + +func TestMonitoredTx_HistoryStringSlice(t *testing.T) { + mTx := MonitoredTx{ + History: map[common.Hash]bool{ + common.HexToHash("0x1"): true, + common.HexToHash("0x2"): true, + common.HexToHash("0x3"): true, + }, + } + + expected := []string{ + "0x0000000000000000000000000000000000000000000000000000000000000001", + "0x0000000000000000000000000000000000000000000000000000000000000002", + "0x0000000000000000000000000000000000000000000000000000000000000003", + } + result := mTx.HistoryStringSlice() + + assert.Equal(t, expected, result) +} + +func TestHistoryHashSlice(t *testing.T) { + mTx := MonitoredTx{ + History: map[common.Hash]bool{ + common.HexToHash("0x1"): true, + common.HexToHash("0x2"): true, + common.HexToHash("0x3"): true, + }, + } + + expected := []common.Hash{ + common.HexToHash("0x1"), + common.HexToHash("0x2"), + common.HexToHash("0x3"), + } + + result := mTx.HistoryHashSlice() + + assert.Equal(t, expected, result) +} From ed52aaeca7ff4ebf042156c9874a583e04afc4fa Mon Sep 17 00:00:00 2001 From: Victor Castell <0x@vcastellm.xyz> Date: Fri, 5 Apr 2024 10:35:09 +0000 Subject: [PATCH 8/8] Add tests --- txmanager/types/monitoretx_test.go | 41 ++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/txmanager/types/monitoretx_test.go b/txmanager/types/monitoretx_test.go index 57fcdf85..8113abf3 100644 --- a/txmanager/types/monitoretx_test.go +++ b/txmanager/types/monitoretx_test.go @@ -1,6 +1,7 @@ package types import ( + "encoding/hex" "math/big" "testing" @@ -93,3 +94,43 @@ func TestHistoryHashSlice(t *testing.T) { assert.Equal(t, expected, result) } + +func TestMonitoredTx_BlockNumberU64Ptr(t *testing.T) { + // Create a monitoredTx instance with a non-nil BlockNumber + mTx := MonitoredTx{ + BlockNumber: big.NewInt(123), + } + + // Call the BlockNumberU64Ptr method + result := mTx.BlockNumberU64Ptr() + + // Assert that the result is not nil + assert.NotNil(t, result) + + // Assert that the value pointed by result is equal to the expected value + expected := uint64(123) + assert.Equal(t, expected, *result) + + // Create a monitoredTx instance with a nil BlockNumber + mTx2 := MonitoredTx{ + BlockNumber: nil, + } + + // Call the BlockNumberU64Ptr method + result2 := mTx2.BlockNumberU64Ptr() + + // Assert that the result is nil + assert.Nil(t, result2) +} + +func TestMonitoredTx_DataStringPtr(t *testing.T) { + mTx := MonitoredTx{ + Data: []byte("data"), + } + + expected := hex.EncodeToString(mTx.Data) + actual := mTx.DataStringPtr() + + assert.NotNil(t, actual) + assert.Equal(t, expected, *actual) +}