Skip to content

Commit

Permalink
chore(solver/app): wire event streamers (#2555)
Browse files Browse the repository at this point in the history
Wire solver event streamers.

issue: none
  • Loading branch information
corverroos authored Nov 25, 2024
1 parent d9bb430 commit dc55732
Show file tree
Hide file tree
Showing 10 changed files with 160 additions and 71 deletions.
2 changes: 0 additions & 2 deletions e2e/app/definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,6 @@ func newBackends(ctx context.Context, cfg DefinitionConfig, testnet types.Testne
return ethbackend.Backends{}, errors.Wrap(err, "new fireblocks")
}

// TODO(corver): Fireblocks keys need to be funded on private/internal chains we deploy.

return ethbackend.NewFireBackends(ctx, testnet, fireCl)
}

Expand Down
8 changes: 4 additions & 4 deletions e2e/app/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ func writeRelayerConfig(ctx context.Context, def Definition, logCfg log.Config)
return nil
}

func writeSolverConfig(_ context.Context, def Definition, logCfg log.Config) error {
func writeSolverConfig(ctx context.Context, def Definition, logCfg log.Config) error {
confRoot := filepath.Join(def.Testnet.Dir, "solver")

const (
Expand All @@ -516,10 +516,10 @@ func writeSolverConfig(_ context.Context, def Definition, logCfg log.Config) err
endpoints = ExternalEndpoints(def)
}

// TODO(corver): save proper private key
privKey, err := ethcrypto.GenerateKey()
// Save private key
privKey, err := eoa.PrivateKey(ctx, def.Testnet.Network, eoa.RoleSolver)
if err != nil {
return errors.Wrap(err, "generate private key")
return errors.Wrap(err, "get relayer key")
}
if err := ethcrypto.SaveECDSA(filepath.Join(confRoot, privKeyFile), privKey); err != nil {
return errors.Wrap(err, "write private key")
Expand Down
34 changes: 34 additions & 0 deletions lib/ethclient/ethbackend/backends.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/omni-network/omni/lib/fireblocks"
"github.com/omni-network/omni/lib/netconf"
"github.com/omni-network/omni/lib/txmgr"
"github.com/omni-network/omni/lib/xchain"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -82,6 +83,30 @@ func NewFireBackends(ctx context.Context, testnet types.Testnet, fireCl firebloc
}, nil
}

func BackendsFromNetwork(network netconf.Network, endpoints xchain.RPCEndpoints, privKeys ...*ecdsa.PrivateKey) (Backends, error) {
inner := make(map[uint64]*Backend)
for _, chain := range network.EVMChains() {
endpoint, err := endpoints.ByNameOrID(chain.Name, chain.ID)
if err != nil {
return Backends{}, err
}

ethCl, err := ethclient.Dial(chain.Name, endpoint)
if err != nil {
return Backends{}, errors.Wrap(err, "dial")
}

inner[chain.ID], err = NewBackend(chain.Name, chain.ID, chain.BlockPeriod, ethCl, privKeys...)
if err != nil {
return Backends{}, errors.Wrap(err, "new backend")
}
}

return Backends{
backends: inner,
}, nil
}

// NewBackends returns a multi-backends backed by in-memory keys that supports configured all chains.
func NewBackends(ctx context.Context, testnet types.Testnet, deployKeyFile string) (Backends, error) {
var err error
Expand Down Expand Up @@ -164,6 +189,15 @@ func (b Backends) All() map[uint64]*Backend {
return b.backends
}

func (b Backends) Clients() map[uint64]ethclient.Client {
clients := make(map[uint64]ethclient.Client)
for chainID, backend := range b.backends {
clients[chainID] = backend.Client
}

return clients
}

func (b Backends) Backend(sourceChainID uint64) (*Backend, error) {
backend, ok := b.backends[sourceChainID]
if !ok {
Expand Down
138 changes: 88 additions & 50 deletions solver/app/app.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
//nolint:unused // It will be used in PRs.
package app

import (
Expand All @@ -10,6 +9,7 @@ import (
"github.com/omni-network/omni/contracts/bindings"
"github.com/omni-network/omni/halo/genutil/evm/predeploys"
"github.com/omni-network/omni/lib/buildinfo"
"github.com/omni-network/omni/lib/contracts"
"github.com/omni-network/omni/lib/errors"
"github.com/omni-network/omni/lib/ethclient"
"github.com/omni-network/omni/lib/ethclient/ethbackend"
Expand All @@ -19,7 +19,9 @@ import (
"github.com/omni-network/omni/lib/xchain"
xprovider "github.com/omni-network/omni/lib/xchain/provider"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
ethcrypto "github.com/ethereum/go-ethereum/crypto"

"github.com/prometheus/client_golang/prometheus/promhttp"
)
Expand Down Expand Up @@ -50,17 +52,37 @@ func Run(ctx context.Context, cfg Config) error {
return err
}

ethClients, err := initializeEthClients(network.EVMChains(), cfg.RPCEndpoints)
if cfg.PrivateKey == "" {
return errors.New("private key not set")
}
privKey, err := ethcrypto.LoadECDSA(cfg.PrivateKey)
if err != nil {
return errors.Wrap(err, "load private key")
}
solverAddr := ethcrypto.PubkeyToAddress(privKey.PublicKey)
log.Debug(ctx, "Using solver address", "address", solverAddr.Hex())

backends, err := ethbackend.BackendsFromNetwork(network, cfg.RPCEndpoints, privKey)
if err != nil {
return err
}

_, err = newSolverDB(cfg.DBDir)
xprov := xprovider.New(network, backends.Clients(), nil)

db, err := newSolverDB(cfg.DBDir)
if err != nil {
return err
}

_ = xprovider.New(network, ethClients, nil)
cursors, err := newCursors(db)
if err != nil {
return errors.Wrap(err, "create cursor store")
}

err = startEventStreams(ctx, network, xprov, backends, solverAddr, cursors)
if err != nil {
return errors.Wrap(err, "start event streams")
}

select {
case <-ctx.Done():
Expand Down Expand Up @@ -119,68 +141,82 @@ func makePortalRegistry(network netconf.ID, endpoints xchain.RPCEndpoints) (*bin
return resp, nil
}

// initializeEthClients initializes the RPC clients for the given chains.
func initializeEthClients(chains []netconf.Chain, endpoints xchain.RPCEndpoints) (map[uint64]ethclient.Client, error) {
rpcClientPerChain := make(map[uint64]ethclient.Client)
for _, chain := range chains {
rpc, err := endpoints.ByNameOrID(chain.Name, chain.ID)
if err != nil {
return nil, err
}
c, err := ethclient.Dial(chain.Name, rpc)
if err != nil {
return nil, errors.Wrap(err, "dial rpc", "chain_name", chain.Name, "chain_id", chain.ID, "rpc_url", rpc)
}
rpcClientPerChain[chain.ID] = c
}

return rpcClientPerChain, nil
}

// startEventStreams starts the event streams for the solver.
// TODO(corver): Make this robust against chains not be available on startup.
func startEventStreams(
ctx context.Context,
network netconf.Network,
xprov xchain.Provider,
backends ethbackend.Backends,
def Definition,
solverAddr common.Address,
cursors *cursors,
) error {
addrs, err := contracts.GetAddresses(ctx, network.ID)
if err != nil {
return errors.Wrap(err, "get contract addresses")
}

inboxChains, err := detectContractChains(ctx, network, backends, addrs.SolveInbox)
if err != nil {
return errors.Wrap(err, "detect inbox chains")
}

inboxContracts := make(map[uint64]*bindings.SolveInbox)
outboxContracts := make(map[uint64]*bindings.SolveOutbox)
for _, chain := range network.EVMChains() {
// Maybe init cursor store with deploy heights
chainVer := chainVerFromID(chain.ID)
for _, chain := range inboxChains {
name := network.ChainName(chain)
chainVer := chainVerFromID(chain)
log.Debug(ctx, "Using inbox contract", "chain", name, "address", addrs.SolveInbox.Hex())

backend, err := backends.Backend(chain)
if err != nil {
return err
}

inbox, err := bindings.NewSolveInbox(addrs.SolveInbox, backend)
if err != nil {
return errors.Wrap(err, "create inbox contract", "chain", name)
}
inboxContracts[chain] = inbox

// Check if cursor store should be initialized with deploy height
if _, ok, err := cursors.Get(ctx, chainVer); err != nil {
return errors.Wrap(err, "get cursor", "chain_id", chain.ID)
} else if !ok {
height, ok := def.InboxDeployHeights[chain.ID]
if !ok {
return errors.New("missing inbox deploy height", "chain_id", chain.ID)
}
err := cursors.Set(ctx, chainVer, height)
if err != nil {
return err
}
return errors.Wrap(err, "get cursor", "chain", name)
} else if ok { // Cursor already set, skip
continue
}

backend, err := backends.Backend(chain.ID)
height, err := inbox.DeployedAt(&bind.CallOpts{Context: ctx})
if err != nil {
return errors.New("get inbox deploy height", "chain", name)
}

log.Info(ctx, "Initializing inbox cursor", "chain", name, "deployed_at", height)

if err := cursors.Set(ctx, chainVer, height.Uint64()); err != nil {
return err
}
}

inbox, err := bindings.NewSolveInbox(def.InboxAddress, backend)
outboxChains, err := detectContractChains(ctx, network, backends, addrs.SolveOutbox)
if err != nil {
return errors.Wrap(err, "detect outbox chains")
}

outboxContracts := make(map[uint64]*bindings.SolveOutbox)
for _, chain := range outboxChains {
name := network.ChainName(chain)
log.Debug(ctx, "Using outbox contract", "chain", name, "address", addrs.SolveInbox.Hex())

backend, err := backends.Backend(chain)
if err != nil {
return errors.Wrap(err, "create inbox contract", "chain_id", chain.ID)
return err
}

outbox, err := bindings.NewSolveOutbox(def.OutboxAddress, backend)
outbox, err := bindings.NewSolveOutbox(addrs.SolveOutbox, backend)
if err != nil {
return errors.Wrap(err, "create outbox contract", "chain_id", chain.ID)
return errors.Wrap(err, "create outbox contract", "chain", name)
}

inboxContracts[chain.ID] = inbox
outboxContracts[chain.ID] = outbox
outboxContracts[chain] = outbox
}

cursorSetter := func(ctx context.Context, chainID uint64, height uint64) error {
Expand All @@ -190,28 +226,30 @@ func startEventStreams(
deps := procDeps{
ParseID: newIDParser(inboxContracts),
GetRequest: newRequestGetter(inboxContracts),
ShouldReject: newRequestValidator(def),
ShouldReject: newRequestValidator(),
Accept: newAcceptor(network.ID, inboxContracts, backends, solverAddr),
Reject: newRejector(inboxContracts, backends, solverAddr),
Fulfill: newFulfiller(outboxContracts, backends, solverAddr),
Claim: newClaimer(inboxContracts, backends, solverAddr),
SetCursor: cursorSetter,
}

for _, chain := range network.EVMChains() {
go streamEventsForever(ctx, chain.ID, xprov, deps, def, cursors)
for _, chain := range inboxChains {
log.Info(ctx, "Starting inbox event stream", "chain", network.ChainName(chain))
go streamEventsForever(ctx, chain, xprov, deps, cursors, addrs.SolveInbox)
}

return nil
}

// streamEventsForever streams events from the inbox contract on the given chain.
func streamEventsForever(
ctx context.Context,
chainID uint64,
xprov xchain.Provider,
deps procDeps,
def Definition,
cursors *cursors,
inboxAddr common.Address,
) {
backoff := expbackoff.New(ctx, expbackoff.WithPeriodicConfig(time.Second*5))
for {
Expand All @@ -227,7 +265,7 @@ func streamEventsForever(
ChainID: chainID,
Height: from,
ConfLevel: confLevel,
FilterAddress: def.InboxAddress,
FilterAddress: inboxAddr,
FilterTopics: allEventTopics,
}
err = xprov.StreamEventLogs(ctx, req, newEventProcessor(deps, chainID))
Expand Down
1 change: 0 additions & 1 deletion solver/app/bindings.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ var (
return resp
}()

//nolint:unused // False positive
allEventTopics = func() []common.Hash {
resp := make([]common.Hash, 0, len(allEvents))
for _, e := range allEvents {
Expand Down
1 change: 0 additions & 1 deletion solver/app/cursor.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
//nolint:unused // It will be used
package app

import (
Expand Down
12 changes: 1 addition & 11 deletions solver/app/definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,9 @@ import (

"github.com/omni-network/omni/contracts/bindings"
"github.com/omni-network/omni/lib/errors"

"github.com/ethereum/go-ethereum/common"
)

// Definition defines the solver rules and addresses and config per network.
type Definition struct {
InboxAddress common.Address
OutboxAddress common.Address
InboxDeployHeights map[uint64]uint64
}

//nolint:unused // False positive.
func newRequestValidator(_ Definition) func(ctx context.Context, chainID uint64, req bindings.SolveRequest) (uint8, bool, error) {
func newRequestValidator() func(ctx context.Context, chainID uint64, req bindings.SolveRequest) (uint8, bool, error) {
return func(context.Context, uint64, bindings.SolveRequest) (uint8, bool, error) {
return 0, false, errors.New("not implemented")
}
Expand Down
1 change: 0 additions & 1 deletion solver/app/deps.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
//nolint:unused // Some functions are unused but are kept for future use
package app

import (
Expand Down
33 changes: 33 additions & 0 deletions solver/app/helpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package app

import (
"context"

"github.com/omni-network/omni/lib/errors"
"github.com/omni-network/omni/lib/ethclient/ethbackend"
"github.com/omni-network/omni/lib/netconf"

"github.com/ethereum/go-ethereum/common"
)

// detectContractChains returns the chains on which the contract is deployed at the provided address.
func detectContractChains(ctx context.Context, network netconf.Network, backends ethbackend.Backends, address common.Address) ([]uint64, error) {
var resp []uint64
for _, chain := range network.EVMChains() {
backend, err := backends.Backend(chain.ID)
if err != nil {
return nil, err
}

code, err := backend.CodeAt(ctx, address, nil)
if err != nil {
return nil, errors.Wrap(err, "get code", "chain", chain.Name)
} else if len(code) == 0 {
continue
}

resp = append(resp, chain.ID)
}

return resp, nil
}
1 change: 0 additions & 1 deletion solver/app/targets.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
//nolint:unused // Some functions are unused but are kept for future use
package app

import (
Expand Down

0 comments on commit dc55732

Please sign in to comment.