Skip to content
This repository has been archived by the owner on Oct 22, 2024. It is now read-only.

Commit

Permalink
Relayer worker pool re-architecture (paritytech#354)
Browse files Browse the repository at this point in the history
* Worker interface + WorkerPool run logic

* Comment out beefy stuff in listener for now

* Create Eth relayer using worker interface

* Update Worker interface, simplify ethrelayer

* Use factories in WorkerPool, better interface

* Add back deadlock detection

* Allow v1 or v2 relay to be run

* Fix conn bug

* Remove unused keypair
  • Loading branch information
Rizziepit authored Apr 22, 2021
1 parent 2872d28 commit d271d09
Show file tree
Hide file tree
Showing 9 changed files with 288 additions and 117 deletions.
13 changes: 8 additions & 5 deletions relayer/chain/ethereum/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,19 @@ type Listener struct {
basicOutboundChannel *outbound.BasicOutboundChannel
incentivizedOutboundChannel *outbound.IncentivizedOutboundChannel
mapping map[common.Address]string
address common.Address
messages chan<- []chain.Message
headers chan<- chain.Header
blockWaitPeriod uint64
log *logrus.Entry
}

func NewListener(config *Config, conn *Connection, messages chan<- []chain.Message,
func NewListener(
config *Config,
conn *Connection,
messages chan<- []chain.Message,
headers chan<- chain.Header,
log *logrus.Entry) (*Listener, error) {
log *logrus.Entry,
) (*Listener, error) {
return &Listener{
config: config,
conn: conn,
Expand All @@ -51,7 +54,7 @@ func NewListener(config *Config, conn *Connection, messages chan<- []chain.Messa
}, nil
}

func (li *Listener) Start(cxt context.Context, eg *errgroup.Group, initBlockHeight uint64, descendantsUntilFinal uint64) error {
func (li *Listener) Start(ctx context.Context, eg *errgroup.Group, initBlockHeight uint64, descendantsUntilFinal uint64) error {
hcs, err := NewHeaderCacheState(
eg,
initBlockHeight,
Expand All @@ -78,7 +81,7 @@ func (li *Listener) Start(cxt context.Context, eg *errgroup.Group, initBlockHeig
li.mapping[common.HexToAddress(li.config.Channels.Incentivized.Outbound)] = "IncentivizedInboundChannel.submit"

eg.Go(func() error {
err := li.pollEventsAndHeaders(cxt, initBlockHeight, descendantsUntilFinal, hcs)
err := li.pollEventsAndHeaders(ctx, initBlockHeight, descendantsUntilFinal, hcs)
if li.messages != nil {
close(li.messages)
}
Expand Down
12 changes: 12 additions & 0 deletions relayer/chain/parachain/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,15 @@ func (co *Connection) Connect(_ context.Context) error {
func (co *Connection) Close() {
// TODO: Fix design issue in GSRPC preventing on-demand closing of connections
}

func (co *Connection) Api() *gsrpc.SubstrateAPI {
return co.api
}

func (co *Connection) GenesisHash() types.Hash {
return co.genesisHash
}

func (co *Connection) Metadata() *types.Metadata {
return &co.metadata
}
7 changes: 7 additions & 0 deletions relayer/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func runCmd() *cobra.Command {
"Relay messages bi-directionally (0), from Eth to Sub (1), or from Sub to Eth (2)",
)
cmd.PersistentFlags().Bool("headers-only", false, "Only forward headers")
cmd.PersistentFlags().Bool("v2", false, "Use the new relayer")
return cmd
}

Expand All @@ -37,6 +38,12 @@ func RunFn(cmd *cobra.Command, _ []string) error {
viper.BindPFlag("relay.direction", cmd.Flags().Lookup("direction"))
viper.BindPFlag("relay.headers-only", cmd.Flags().Lookup("headers-only"))

useV2 := cmd.Flags().Lookup("v2").Value.String() == "true"
if useV2 {
relay := &core.RelayV2{}
return relay.Run()
}

relay, err := core.NewRelay()
if err != nil {
logrus.WithField("error", err).Error("Failed to initialize relayer")
Expand Down
11 changes: 5 additions & 6 deletions relayer/core/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,11 @@ func NewRelay() (*Relay, error) {
ethRelayer := &ethrelayer.Worker{}

if config.Workers.EthRelayer == true {
ethRelayer, err = ethrelayer.NewWorker(&config.Eth, &config.Parachain)
if err != nil {
return nil, err
}
ethRelayer = ethrelayer.NewWorker(
&config.Eth,
&config.Parachain,
log.WithField("worker", ethrelayer.Name),
)
}

return &Relay{
Expand Down Expand Up @@ -117,7 +118,6 @@ func (re *Relay) Start() {
return
}
log.WithField("name", re.ethRelayer.Name()).Info("Started worker")
defer re.ethRelayer.Stop()
}

if re.beefyRelayer != nil {
Expand Down Expand Up @@ -174,7 +174,6 @@ func (re *Relay) Start() {
}

log.WithError(ctx.Err()).Error("Goroutines appear deadlocked. Killing process")
re.ethRelayer.Stop()
re.parachainCommitmentRelayer.Stop()
re.beefyRelayer.Stop()

Expand Down
35 changes: 35 additions & 0 deletions relayer/core/relayV2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2021 Snowfork
// SPDX-License-Identifier: LGPL-3.0-only

package core

import (
"github.com/sirupsen/logrus"

"github.com/snowfork/polkadot-ethereum/relayer/workers"
"github.com/snowfork/polkadot-ethereum/relayer/workers/ethrelayer"
)

type RelayV2 struct{}

func (re *RelayV2) Run() error {
config, err := LoadConfig()
if err != nil {
return err
}

ethrelayerFactory := func() (workers.Worker, error) {
return ethrelayer.NewWorker(
&config.Eth,
&config.Parachain,
logrus.WithField("worker", ethrelayer.Name),
), nil
}

// TODO: add all workers
pool := workers.WorkerPool{
ethrelayerFactory,
}

return pool.Run()
}
16 changes: 10 additions & 6 deletions relayer/workers/ethrelayer/ethereum-listener.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Copyright 2021 Snowfork
// SPDX-License-Identifier: LGPL-3.0-only

package ethrelayer

import (
Expand Down Expand Up @@ -26,16 +29,18 @@ type EthereumListener struct {
basicOutboundChannel *outbound.BasicOutboundChannel
incentivizedOutboundChannel *outbound.IncentivizedOutboundChannel
mapping map[common.Address]string
address common.Address
messages chan<- []chain.Message
headers chan<- chain.Header
blockWaitPeriod uint64
log *logrus.Entry
}

func NewEthereumListener(config *ethereum.Config, conn *ethereum.Connection, messages chan<- []chain.Message,
func NewEthereumListener(
config *ethereum.Config,
conn *ethereum.Connection,
messages chan<- []chain.Message,
headers chan<- chain.Header,
log *logrus.Entry) (*EthereumListener, error) {
log *logrus.Entry,
) *EthereumListener {
return &EthereumListener{
config: config,
conn: conn,
Expand All @@ -44,9 +49,8 @@ func NewEthereumListener(config *ethereum.Config, conn *ethereum.Connection, mes
mapping: make(map[common.Address]string),
messages: messages,
headers: headers,
blockWaitPeriod: 0,
log: log,
}, nil
}
}

func (li *EthereumListener) Start(cxt context.Context, eg *errgroup.Group, initBlockHeight uint64, descendantsUntilFinal uint64) error {
Expand Down
Loading

0 comments on commit d271d09

Please sign in to comment.