Skip to content

Commit

Permalink
merge main + resolve conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
cyberhorsey committed Nov 16, 2022
2 parents 3a4423c + 7ab1291 commit 4e3cc85
Show file tree
Hide file tree
Showing 21 changed files with 316 additions and 215 deletions.
Binary file added packages/branding/Logo/PNG/Taiko_Favicon_Fluo.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
3 changes: 2 additions & 1 deletion packages/relayer/.default.env
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ MYSQL_MAX_IDLE_CONNS=
MYSQL_MAX_OPEN_CONNS=
MYSQL_CONN_MAX_LIFETIME_IN_MS=
NUM_GOROUTINES=20
SUBSCRIPTION_BACKOFF_IN_SECONDS=3
SUBSCRIPTION_BACKOFF_IN_SECONDS=3
CONFIRMATIONS_BEFORE_PROCESSING=15
9 changes: 9 additions & 0 deletions packages/relayer/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@ var (
"MYSQL_DATABASE",
"MYSQL_HOST",
"RELAYER_ECDSA_KEY",
"CONFIRMATIONS_BEFORE_PROCESSING",
}

defaultBlockBatchSize = 2
defaultNumGoroutines = 10
defaultSubscriptionBackoff = 2 * time.Second
defaultConfirmations = 15
)

func Run(mode relayer.Mode, layer relayer.Layer) {
Expand Down Expand Up @@ -132,6 +134,11 @@ func makeIndexers(layer relayer.Layer, db *gorm.DB) ([]*indexer.Service, func(),
subscriptionBackoff = time.Duration(subscriptionBackoffInSeconds) * time.Second
}

confirmations, err := strconv.Atoi(os.Getenv("CONFIRMATIONS_BEFORE_PROCESSING"))
if err != nil || confirmations <= 0 {
confirmations = defaultConfirmations
}

indexers := make([]*indexer.Service, 0)

if layer == relayer.L1 || layer == relayer.Both {
Expand All @@ -151,6 +158,7 @@ func makeIndexers(layer relayer.Layer, db *gorm.DB) ([]*indexer.Service, func(),
BlockBatchSize: uint64(blockBatchSize),
NumGoroutines: numGoroutines,
SubscriptionBackoff: subscriptionBackoff,
Confirmations: uint64(confirmations),
})
if err != nil {
log.Fatal(err)
Expand All @@ -176,6 +184,7 @@ func makeIndexers(layer relayer.Layer, db *gorm.DB) ([]*indexer.Service, func(),
BlockBatchSize: uint64(blockBatchSize),
NumGoroutines: numGoroutines,
SubscriptionBackoff: subscriptionBackoff,
Confirmations: uint64(confirmations),
})
if err != nil {
log.Fatal(err)
Expand Down
5 changes: 5 additions & 0 deletions packages/relayer/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,10 @@ var (
ErrNoRPCClient = errors.Validation.NewWithKeyAndDetail("ERR_NO_RPC_CLIENT", "RPCClient is required")
ErrNoBridge = errors.Validation.NewWithKeyAndDetail("ERR_NO_BRIDGE", "Bridge is required")
ErrNoTaikoL2 = errors.Validation.NewWithKeyAndDetail("ERR_NO_TAIKO_L2", "TaikoL2 is required")

ErrInvalidConfirmations = errors.Validation.NewWithKeyAndDetail(
"ERR_INVALID_CONFIRMATIONS",
"Confirmations amount is invalid, must be numerical and > 0",
)
ErrInvalidMode = errors.Validation.NewWithKeyAndDetail("ERR_INVALID_MODE", "Mode not supported")
)
3 changes: 3 additions & 0 deletions packages/relayer/indexer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type NewServiceOpts struct {
BlockBatchSize uint64
NumGoroutines int
SubscriptionBackoff time.Duration
Confirmations uint64
}

func NewService(opts NewServiceOpts) (*Service, error) {
Expand Down Expand Up @@ -141,6 +142,8 @@ func NewService(opts NewServiceOpts) (*Service, error) {
EventRepo: opts.EventRepo,
DestHeaderSyncer: destHeaderSyncer,
RelayerAddress: relayerAddr,
Confirmations: opts.Confirmations,
SrcETHClient: opts.EthClient,
})
if err != nil {
return nil, errors.Wrap(err, "message.NewProcessor")
Expand Down
9 changes: 9 additions & 0 deletions packages/relayer/indexer/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func Test_NewService(t *testing.T) {
ECDSAKey: dummyEcdsaKey,
BridgeAddress: common.HexToAddress(dummyAddress),
DestBridgeAddress: common.HexToAddress(dummyAddress),
Confirmations: 1,
},
nil,
},
Expand All @@ -54,6 +55,7 @@ func Test_NewService(t *testing.T) {
ECDSAKey: dummyEcdsaKey,
BridgeAddress: common.HexToAddress(dummyAddress),
DestBridgeAddress: common.HexToAddress(dummyAddress),
Confirmations: 1,
},
relayer.ErrNoRPCClient,
},
Expand All @@ -67,6 +69,7 @@ func Test_NewService(t *testing.T) {
ECDSAKey: dummyEcdsaKey,
RPCClient: &rpc.Client{},
DestBridgeAddress: common.HexToAddress(dummyAddress),
Confirmations: 1,
},
relayer.ErrNoBridgeAddress,
},
Expand All @@ -80,6 +83,7 @@ func Test_NewService(t *testing.T) {
ECDSAKey: dummyEcdsaKey,
RPCClient: &rpc.Client{},
BridgeAddress: common.HexToAddress(dummyAddress),
Confirmations: 1,
},
relayer.ErrNoBridgeAddress,
},
Expand All @@ -93,6 +97,7 @@ func Test_NewService(t *testing.T) {
DestEthClient: &ethclient.Client{},
BridgeAddress: common.HexToAddress(dummyAddress),
DestBridgeAddress: common.HexToAddress(dummyAddress),
Confirmations: 1,
},
relayer.ErrNoECDSAKey,
},
Expand All @@ -106,6 +111,7 @@ func Test_NewService(t *testing.T) {
BridgeAddress: common.HexToAddress(dummyAddress),
RPCClient: &rpc.Client{},
DestBridgeAddress: common.HexToAddress(dummyAddress),
Confirmations: 1,
},
relayer.ErrNoEventRepository,
},
Expand All @@ -119,6 +125,7 @@ func Test_NewService(t *testing.T) {
DestEthClient: &ethclient.Client{},
BridgeAddress: common.HexToAddress(dummyAddress),
DestBridgeAddress: common.HexToAddress(dummyAddress),
Confirmations: 1,
},
relayer.ErrNoBlockRepository,
},
Expand All @@ -132,6 +139,7 @@ func Test_NewService(t *testing.T) {
DestEthClient: &ethclient.Client{},
BridgeAddress: common.HexToAddress(dummyAddress),
DestBridgeAddress: common.HexToAddress(dummyAddress),
Confirmations: 1,
},
relayer.ErrNoEthClient,
},
Expand All @@ -145,6 +153,7 @@ func Test_NewService(t *testing.T) {
RPCClient: &rpc.Client{},
BridgeAddress: common.HexToAddress(dummyAddress),
DestBridgeAddress: common.HexToAddress(dummyAddress),
Confirmations: 1,
},
relayer.ErrNoEthClient,
},
Expand Down
23 changes: 23 additions & 0 deletions packages/relayer/message/process_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/hex"
"math/big"
"time"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
Expand All @@ -30,6 +31,10 @@ func (p *Processor) ProcessMessage(
return errors.New("only user can process this, gasLimit set to 0")
}

if err := p.waitForConfirmations(ctx, event.Raw.TxHash, event.Raw.BlockNumber); err != nil {
return errors.Wrap(err, "p.waitForConfirmations")
}

// get latest synced header since not every header is synced from L1 => L2,
// and later blocks still have the storage trie proof from previous blocks.
latestSyncedHeader, err := p.destHeaderSyncer.GetLatestSyncedHeader(&bind.CallOpts{})
Expand Down Expand Up @@ -154,3 +159,21 @@ func (p *Processor) getLatestNonce(ctx context.Context, auth *bind.TransactOpts)

return nil
}

func (p *Processor) waitForConfirmations(ctx context.Context, txHash common.Hash, blockNumber uint64) error {
// TODO: make timeout a config var
ctx, cancelFunc := context.WithTimeout(ctx, 2*time.Minute)

defer cancelFunc()

if err := relayer.WaitConfirmations(
ctx,
p.srcEthClient,
p.confirmations,
txHash,
); err != nil {
return errors.Wrap(err, "relayer.WaitConfirmations")
}

return nil
}
32 changes: 24 additions & 8 deletions packages/relayer/message/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

type Processor struct {
eventRepo relayer.EventRepository
srcEthClient *ethclient.Client
destEthClient *ethclient.Client
rpc *rpc.Client
ecdsaKey *ecdsa.PrivateKey
Expand All @@ -26,19 +27,22 @@ type Processor struct {

mu *sync.Mutex

destNonce uint64
relayerAddr common.Address
destNonce uint64
relayerAddr common.Address
confirmations uint64
}

type NewProcessorOpts struct {
Prover *proof.Prover
ECDSAKey *ecdsa.PrivateKey
RPCClient *rpc.Client
SrcETHClient *ethclient.Client
DestETHClient *ethclient.Client
DestBridge *contracts.Bridge
EventRepo relayer.EventRepository
DestHeaderSyncer *contracts.IHeaderSync
RelayerAddress common.Address
Confirmations uint64
}

func NewProcessor(opts NewProcessorOpts) (*Processor, error) {
Expand All @@ -58,6 +62,10 @@ func NewProcessor(opts NewProcessorOpts) (*Processor, error) {
return nil, relayer.ErrNoEthClient
}

if opts.SrcETHClient == nil {
return nil, relayer.ErrNoEthClient
}

if opts.DestBridge == nil {
return nil, relayer.ErrNoBridge
}
Expand All @@ -70,18 +78,26 @@ func NewProcessor(opts NewProcessorOpts) (*Processor, error) {
return nil, relayer.ErrNoTaikoL2
}

if opts.Confirmations == 0 {
return nil, relayer.ErrInvalidConfirmations
}

return &Processor{
eventRepo: opts.EventRepo,
prover: opts.Prover,
ecdsaKey: opts.ECDSAKey,
rpc: opts.RPCClient,
eventRepo: opts.EventRepo,
prover: opts.Prover,
ecdsaKey: opts.ECDSAKey,
rpc: opts.RPCClient,

srcEthClient: opts.SrcETHClient,

destEthClient: opts.DestETHClient,
destBridge: opts.DestBridge,
destHeaderSyncer: opts.DestHeaderSyncer,

mu: &sync.Mutex{},

destNonce: 0,
relayerAddr: opts.RelayerAddress,
destNonce: 0,
relayerAddr: opts.RelayerAddress,
confirmations: opts.Confirmations,
}, nil
}
44 changes: 44 additions & 0 deletions packages/relayer/message/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,53 @@ func Test_NewProcessor(t *testing.T) {
Prover: &proof.Prover{},
ECDSAKey: &ecdsa.PrivateKey{},
RPCClient: &rpc.Client{},
SrcETHClient: &ethclient.Client{},
DestETHClient: &ethclient.Client{},
DestBridge: &contracts.Bridge{},
EventRepo: &repo.EventRepository{},
DestHeaderSyncer: &contracts.IHeaderSync{},
Confirmations: 1,
},
nil,
},
{
"errNoConfirmations",
NewProcessorOpts{
Prover: &proof.Prover{},
ECDSAKey: &ecdsa.PrivateKey{},
RPCClient: &rpc.Client{},
SrcETHClient: &ethclient.Client{},
DestETHClient: &ethclient.Client{},
DestBridge: &contracts.Bridge{},
EventRepo: &repo.EventRepository{},
DestHeaderSyncer: &contracts.IHeaderSync{},
},
relayer.ErrInvalidConfirmations,
},
{
"errNoSrcClient",
NewProcessorOpts{
Prover: &proof.Prover{},
ECDSAKey: &ecdsa.PrivateKey{},
RPCClient: &rpc.Client{},
DestETHClient: &ethclient.Client{},
DestBridge: &contracts.Bridge{},
EventRepo: &repo.EventRepository{},
DestHeaderSyncer: &contracts.IHeaderSync{},
Confirmations: 1,
},
relayer.ErrNoEthClient,
},
{
"errNoProver",
NewProcessorOpts{
ECDSAKey: &ecdsa.PrivateKey{},
RPCClient: &rpc.Client{},
SrcETHClient: &ethclient.Client{},
DestETHClient: &ethclient.Client{},
DestBridge: &contracts.Bridge{},
EventRepo: &repo.EventRepository{},
Confirmations: 1,
DestHeaderSyncer: &contracts.IHeaderSync{},
},
relayer.ErrNoProver,
Expand All @@ -50,10 +82,12 @@ func Test_NewProcessor(t *testing.T) {
Prover: &proof.Prover{},

RPCClient: &rpc.Client{},
SrcETHClient: &ethclient.Client{},
DestETHClient: &ethclient.Client{},
DestBridge: &contracts.Bridge{},
EventRepo: &repo.EventRepository{},
DestHeaderSyncer: &contracts.IHeaderSync{},
Confirmations: 1,
},
relayer.ErrNoECDSAKey,
},
Expand All @@ -62,10 +96,12 @@ func Test_NewProcessor(t *testing.T) {
NewProcessorOpts{
Prover: &proof.Prover{},
ECDSAKey: &ecdsa.PrivateKey{},
SrcETHClient: &ethclient.Client{},
DestETHClient: &ethclient.Client{},
DestBridge: &contracts.Bridge{},
EventRepo: &repo.EventRepository{},
DestHeaderSyncer: &contracts.IHeaderSync{},
Confirmations: 1,
},
relayer.ErrNoRPCClient,
},
Expand All @@ -75,9 +111,11 @@ func Test_NewProcessor(t *testing.T) {
Prover: &proof.Prover{},
ECDSAKey: &ecdsa.PrivateKey{},
RPCClient: &rpc.Client{},
SrcETHClient: &ethclient.Client{},
DestBridge: &contracts.Bridge{},
EventRepo: &repo.EventRepository{},
DestHeaderSyncer: &contracts.IHeaderSync{},
Confirmations: 1,
},
relayer.ErrNoEthClient,
},
Expand All @@ -87,9 +125,11 @@ func Test_NewProcessor(t *testing.T) {
Prover: &proof.Prover{},
ECDSAKey: &ecdsa.PrivateKey{},
RPCClient: &rpc.Client{},
SrcETHClient: &ethclient.Client{},
DestETHClient: &ethclient.Client{},
EventRepo: &repo.EventRepository{},
DestHeaderSyncer: &contracts.IHeaderSync{},
Confirmations: 1,
},
relayer.ErrNoBridge,
},
Expand All @@ -99,9 +139,11 @@ func Test_NewProcessor(t *testing.T) {
Prover: &proof.Prover{},
ECDSAKey: &ecdsa.PrivateKey{},
RPCClient: &rpc.Client{},
SrcETHClient: &ethclient.Client{},
DestETHClient: &ethclient.Client{},
DestBridge: &contracts.Bridge{},
DestHeaderSyncer: &contracts.IHeaderSync{},
Confirmations: 1,
},
relayer.ErrNoEventRepository,
},
Expand All @@ -111,9 +153,11 @@ func Test_NewProcessor(t *testing.T) {
Prover: &proof.Prover{},
ECDSAKey: &ecdsa.PrivateKey{},
RPCClient: &rpc.Client{},
SrcETHClient: &ethclient.Client{},
DestETHClient: &ethclient.Client{},
EventRepo: &repo.EventRepository{},
DestBridge: &contracts.Bridge{},
Confirmations: 1,
},
relayer.ErrNoTaikoL2,
},
Expand Down
Loading

0 comments on commit 4e3cc85

Please sign in to comment.