Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(solver/app): wire event streamers #2555

Merged
merged 1 commit into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions e2e/app/definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,6 @@ func newBackends(ctx context.Context, cfg DefinitionConfig, testnet types.Testne
return ethbackend.Backends{}, errors.Wrap(err, "new fireblocks")
}

// TODO(corver): Fireblocks keys need to be funded on private/internal chains we deploy.

return ethbackend.NewFireBackends(ctx, testnet, fireCl)
}

Expand Down
8 changes: 4 additions & 4 deletions e2e/app/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ func writeRelayerConfig(ctx context.Context, def Definition, logCfg log.Config)
return nil
}

func writeSolverConfig(_ context.Context, def Definition, logCfg log.Config) error {
func writeSolverConfig(ctx context.Context, def Definition, logCfg log.Config) error {
confRoot := filepath.Join(def.Testnet.Dir, "solver")

const (
Expand All @@ -516,10 +516,10 @@ func writeSolverConfig(_ context.Context, def Definition, logCfg log.Config) err
endpoints = ExternalEndpoints(def)
}

// TODO(corver): save proper private key
privKey, err := ethcrypto.GenerateKey()
// Save private key
privKey, err := eoa.PrivateKey(ctx, def.Testnet.Network, eoa.RoleSolver)
if err != nil {
return errors.Wrap(err, "generate private key")
return errors.Wrap(err, "get relayer key")
}
if err := ethcrypto.SaveECDSA(filepath.Join(confRoot, privKeyFile), privKey); err != nil {
return errors.Wrap(err, "write private key")
Expand Down
34 changes: 34 additions & 0 deletions lib/ethclient/ethbackend/backends.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/omni-network/omni/lib/fireblocks"
"github.com/omni-network/omni/lib/netconf"
"github.com/omni-network/omni/lib/txmgr"
"github.com/omni-network/omni/lib/xchain"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -82,6 +83,30 @@ func NewFireBackends(ctx context.Context, testnet types.Testnet, fireCl firebloc
}, nil
}

func BackendsFromNetwork(network netconf.Network, endpoints xchain.RPCEndpoints, privKeys ...*ecdsa.PrivateKey) (Backends, error) {
inner := make(map[uint64]*Backend)
for _, chain := range network.EVMChains() {
endpoint, err := endpoints.ByNameOrID(chain.Name, chain.ID)
if err != nil {
return Backends{}, err
}

ethCl, err := ethclient.Dial(chain.Name, endpoint)
if err != nil {
return Backends{}, errors.Wrap(err, "dial")
}

inner[chain.ID], err = NewBackend(chain.Name, chain.ID, chain.BlockPeriod, ethCl, privKeys...)
if err != nil {
return Backends{}, errors.Wrap(err, "new backend")
}
}

return Backends{
backends: inner,
}, nil
}

// NewBackends returns a multi-backends backed by in-memory keys that supports configured all chains.
func NewBackends(ctx context.Context, testnet types.Testnet, deployKeyFile string) (Backends, error) {
var err error
Expand Down Expand Up @@ -164,6 +189,15 @@ func (b Backends) All() map[uint64]*Backend {
return b.backends
}

func (b Backends) Clients() map[uint64]ethclient.Client {
clients := make(map[uint64]ethclient.Client)
for chainID, backend := range b.backends {
clients[chainID] = backend.Client
}

return clients
}

func (b Backends) Backend(sourceChainID uint64) (*Backend, error) {
backend, ok := b.backends[sourceChainID]
if !ok {
Expand Down
138 changes: 88 additions & 50 deletions solver/app/app.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
//nolint:unused // It will be used in PRs.
package app

import (
Expand All @@ -10,6 +9,7 @@ import (
"github.com/omni-network/omni/contracts/bindings"
"github.com/omni-network/omni/halo/genutil/evm/predeploys"
"github.com/omni-network/omni/lib/buildinfo"
"github.com/omni-network/omni/lib/contracts"
"github.com/omni-network/omni/lib/errors"
"github.com/omni-network/omni/lib/ethclient"
"github.com/omni-network/omni/lib/ethclient/ethbackend"
Expand All @@ -19,7 +19,9 @@ import (
"github.com/omni-network/omni/lib/xchain"
xprovider "github.com/omni-network/omni/lib/xchain/provider"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
ethcrypto "github.com/ethereum/go-ethereum/crypto"

"github.com/prometheus/client_golang/prometheus/promhttp"
)
Expand Down Expand Up @@ -50,17 +52,37 @@ func Run(ctx context.Context, cfg Config) error {
return err
}

ethClients, err := initializeEthClients(network.EVMChains(), cfg.RPCEndpoints)
if cfg.PrivateKey == "" {
return errors.New("private key not set")
}
privKey, err := ethcrypto.LoadECDSA(cfg.PrivateKey)
if err != nil {
return errors.Wrap(err, "load private key")
}
solverAddr := ethcrypto.PubkeyToAddress(privKey.PublicKey)
log.Debug(ctx, "Using solver address", "address", solverAddr.Hex())

backends, err := ethbackend.BackendsFromNetwork(network, cfg.RPCEndpoints, privKey)
if err != nil {
return err
}

_, err = newSolverDB(cfg.DBDir)
xprov := xprovider.New(network, backends.Clients(), nil)

db, err := newSolverDB(cfg.DBDir)
if err != nil {
return err
}

_ = xprovider.New(network, ethClients, nil)
cursors, err := newCursors(db)
if err != nil {
return errors.Wrap(err, "create cursor store")
}

err = startEventStreams(ctx, network, xprov, backends, solverAddr, cursors)
if err != nil {
return errors.Wrap(err, "start event streams")
}

select {
case <-ctx.Done():
Expand Down Expand Up @@ -119,68 +141,82 @@ func makePortalRegistry(network netconf.ID, endpoints xchain.RPCEndpoints) (*bin
return resp, nil
}

// initializeEthClients initializes the RPC clients for the given chains.
func initializeEthClients(chains []netconf.Chain, endpoints xchain.RPCEndpoints) (map[uint64]ethclient.Client, error) {
rpcClientPerChain := make(map[uint64]ethclient.Client)
for _, chain := range chains {
rpc, err := endpoints.ByNameOrID(chain.Name, chain.ID)
if err != nil {
return nil, err
}
c, err := ethclient.Dial(chain.Name, rpc)
if err != nil {
return nil, errors.Wrap(err, "dial rpc", "chain_name", chain.Name, "chain_id", chain.ID, "rpc_url", rpc)
}
rpcClientPerChain[chain.ID] = c
}

return rpcClientPerChain, nil
}

// startEventStreams starts the event streams for the solver.
// TODO(corver): Make this robust against chains not be available on startup.
func startEventStreams(
ctx context.Context,
network netconf.Network,
xprov xchain.Provider,
backends ethbackend.Backends,
def Definition,
solverAddr common.Address,
cursors *cursors,
) error {
addrs, err := contracts.GetAddresses(ctx, network.ID)
if err != nil {
return errors.Wrap(err, "get contract addresses")
}

inboxChains, err := detectContractChains(ctx, network, backends, addrs.SolveInbox)
if err != nil {
return errors.Wrap(err, "detect inbox chains")
}

inboxContracts := make(map[uint64]*bindings.SolveInbox)
outboxContracts := make(map[uint64]*bindings.SolveOutbox)
for _, chain := range network.EVMChains() {
// Maybe init cursor store with deploy heights
chainVer := chainVerFromID(chain.ID)
for _, chain := range inboxChains {
name := network.ChainName(chain)
chainVer := chainVerFromID(chain)
log.Debug(ctx, "Using inbox contract", "chain", name, "address", addrs.SolveInbox.Hex())

backend, err := backends.Backend(chain)
if err != nil {
return err
}

inbox, err := bindings.NewSolveInbox(addrs.SolveInbox, backend)
if err != nil {
return errors.Wrap(err, "create inbox contract", "chain", name)
}
inboxContracts[chain] = inbox

// Check if cursor store should be initialized with deploy height
if _, ok, err := cursors.Get(ctx, chainVer); err != nil {
return errors.Wrap(err, "get cursor", "chain_id", chain.ID)
} else if !ok {
height, ok := def.InboxDeployHeights[chain.ID]
if !ok {
return errors.New("missing inbox deploy height", "chain_id", chain.ID)
}
err := cursors.Set(ctx, chainVer, height)
if err != nil {
return err
}
return errors.Wrap(err, "get cursor", "chain", name)
} else if ok { // Cursor already set, skip
continue
}

backend, err := backends.Backend(chain.ID)
height, err := inbox.DeployedAt(&bind.CallOpts{Context: ctx})
if err != nil {
return errors.New("get inbox deploy height", "chain", name)
}

log.Info(ctx, "Initializing inbox cursor", "chain", name, "deployed_at", height)

if err := cursors.Set(ctx, chainVer, height.Uint64()); err != nil {
return err
}
}

inbox, err := bindings.NewSolveInbox(def.InboxAddress, backend)
outboxChains, err := detectContractChains(ctx, network, backends, addrs.SolveOutbox)
if err != nil {
return errors.Wrap(err, "detect outbox chains")
}

outboxContracts := make(map[uint64]*bindings.SolveOutbox)
for _, chain := range outboxChains {
name := network.ChainName(chain)
log.Debug(ctx, "Using outbox contract", "chain", name, "address", addrs.SolveInbox.Hex())

backend, err := backends.Backend(chain)
if err != nil {
return errors.Wrap(err, "create inbox contract", "chain_id", chain.ID)
return err
}

outbox, err := bindings.NewSolveOutbox(def.OutboxAddress, backend)
outbox, err := bindings.NewSolveOutbox(addrs.SolveOutbox, backend)
if err != nil {
return errors.Wrap(err, "create outbox contract", "chain_id", chain.ID)
return errors.Wrap(err, "create outbox contract", "chain", name)
}

inboxContracts[chain.ID] = inbox
outboxContracts[chain.ID] = outbox
outboxContracts[chain] = outbox
}

cursorSetter := func(ctx context.Context, chainID uint64, height uint64) error {
Expand All @@ -190,28 +226,30 @@ func startEventStreams(
deps := procDeps{
ParseID: newIDParser(inboxContracts),
GetRequest: newRequestGetter(inboxContracts),
ShouldReject: newRequestValidator(def),
ShouldReject: newRequestValidator(),
Accept: newAcceptor(network.ID, inboxContracts, backends, solverAddr),
Reject: newRejector(inboxContracts, backends, solverAddr),
Fulfill: newFulfiller(outboxContracts, backends, solverAddr),
Claim: newClaimer(inboxContracts, backends, solverAddr),
SetCursor: cursorSetter,
}

for _, chain := range network.EVMChains() {
go streamEventsForever(ctx, chain.ID, xprov, deps, def, cursors)
for _, chain := range inboxChains {
log.Info(ctx, "Starting inbox event stream", "chain", network.ChainName(chain))
go streamEventsForever(ctx, chain, xprov, deps, cursors, addrs.SolveInbox)
}

return nil
}

// streamEventsForever streams events from the inbox contract on the given chain.
func streamEventsForever(
ctx context.Context,
chainID uint64,
xprov xchain.Provider,
deps procDeps,
def Definition,
cursors *cursors,
inboxAddr common.Address,
) {
backoff := expbackoff.New(ctx, expbackoff.WithPeriodicConfig(time.Second*5))
for {
Expand All @@ -227,7 +265,7 @@ func streamEventsForever(
ChainID: chainID,
Height: from,
ConfLevel: confLevel,
FilterAddress: def.InboxAddress,
FilterAddress: inboxAddr,
FilterTopics: allEventTopics,
}
err = xprov.StreamEventLogs(ctx, req, newEventProcessor(deps, chainID))
Expand Down
1 change: 0 additions & 1 deletion solver/app/bindings.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ var (
return resp
}()

//nolint:unused // False positive
allEventTopics = func() []common.Hash {
resp := make([]common.Hash, 0, len(allEvents))
for _, e := range allEvents {
Expand Down
1 change: 0 additions & 1 deletion solver/app/cursor.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
//nolint:unused // It will be used
package app

import (
Expand Down
12 changes: 1 addition & 11 deletions solver/app/definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,9 @@ import (

"github.com/omni-network/omni/contracts/bindings"
"github.com/omni-network/omni/lib/errors"

"github.com/ethereum/go-ethereum/common"
)

// Definition defines the solver rules and addresses and config per network.
type Definition struct {
InboxAddress common.Address
OutboxAddress common.Address
InboxDeployHeights map[uint64]uint64
}

//nolint:unused // False positive.
func newRequestValidator(_ Definition) func(ctx context.Context, chainID uint64, req bindings.SolveRequest) (uint8, bool, error) {
func newRequestValidator() func(ctx context.Context, chainID uint64, req bindings.SolveRequest) (uint8, bool, error) {
return func(context.Context, uint64, bindings.SolveRequest) (uint8, bool, error) {
return 0, false, errors.New("not implemented")
}
Expand Down
1 change: 0 additions & 1 deletion solver/app/deps.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
//nolint:unused // Some functions are unused but are kept for future use
package app

import (
Expand Down
33 changes: 33 additions & 0 deletions solver/app/helpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package app

import (
"context"

"github.com/omni-network/omni/lib/errors"
"github.com/omni-network/omni/lib/ethclient/ethbackend"
"github.com/omni-network/omni/lib/netconf"

"github.com/ethereum/go-ethereum/common"
)

// detectContractChains returns the chains on which the contract is deployed at the provided address.
func detectContractChains(ctx context.Context, network netconf.Network, backends ethbackend.Backends, address common.Address) ([]uint64, error) {
var resp []uint64
for _, chain := range network.EVMChains() {
backend, err := backends.Backend(chain.ID)
if err != nil {
return nil, err
}

code, err := backend.CodeAt(ctx, address, nil)
if err != nil {
return nil, errors.Wrap(err, "get code", "chain", chain.Name)
} else if len(code) == 0 {
continue
}

resp = append(resp, chain.ID)
}

return resp, nil
}
1 change: 0 additions & 1 deletion solver/app/targets.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
//nolint:unused // Some functions are unused but are kept for future use
package app

import (
Expand Down
Loading