Skip to content

Commit

Permalink
* Add src, dst parameter
Browse files Browse the repository at this point in the history
* Fix start time variable
* Change relay parameter 

Signed-off-by: Dongri Jin <[email protected]>
  • Loading branch information
dongrie committed Nov 28, 2023
1 parent 37c0f44 commit 90a61b7
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 63 deletions.
28 changes: 21 additions & 7 deletions cmd/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@ func serviceCmd(ctx *config.Context) *cobra.Command {

func startCmd(ctx *config.Context) *cobra.Command {
const (
flagRelayInterval = "relay-interval"
flagPrometheusAddr = "prometheus-addr"
flagRelayOptimizeInterval = "relay-optimize-interval"
flagRelayOptimizeCount = "relay-optimize-count"
flagRelayInterval = "relay-interval"
flagPrometheusAddr = "prometheus-addr"
flagSrcRelayOptimizeInterval = "src-relay-optimize-interval"
flagSrcRelayOptimizeCount = "src-relay-optimize-count"
flagDstRelayOptimizeInterval = "dst-relay-optimize-interval"
flagDstRelayOptimizeCount = "dst-relay-optimize-count"
)
const (
defaultRelayInterval = 3 * time.Second
Expand Down Expand Up @@ -64,12 +66,24 @@ func startCmd(ctx *config.Context) *cobra.Command {
if err := st.SetupRelay(context.TODO(), c[src], c[dst]); err != nil {
return err
}
return core.StartService(context.Background(), st, c[src], c[dst], viper.GetDuration(flagRelayInterval), viper.GetDuration(flagRelayOptimizeInterval), viper.GetInt64(flagRelayOptimizeCount))
return core.StartService(
context.Background(),
st,
c[src],
c[dst],
viper.GetDuration(flagRelayInterval),
viper.GetDuration(flagSrcRelayOptimizeInterval),
viper.GetInt64(flagSrcRelayOptimizeCount),
viper.GetDuration(flagDstRelayOptimizeInterval),
viper.GetInt64(flagDstRelayOptimizeCount),
)
},
}
cmd.Flags().Duration(flagRelayInterval, defaultRelayInterval, "time interval to perform relays")
cmd.Flags().String(flagPrometheusAddr, defaultPrometheusAddr, "host address to which the prometheus exporter listens")
cmd.Flags().Duration(flagRelayOptimizeInterval, defaultRelayOptimizeInterval, "time interval to perform relays optimization")
cmd.Flags().Int64(flagRelayOptimizeCount, defaultRelayOptimizeCount, "number of packets to relays optimization")
cmd.Flags().Duration(flagSrcRelayOptimizeInterval, defaultRelayOptimizeInterval, "time interval to perform relays optimization on source chain")
cmd.Flags().Int64(flagSrcRelayOptimizeCount, defaultRelayOptimizeCount, "number of packets to relays optimization on source chain")
cmd.Flags().Duration(flagDstRelayOptimizeInterval, defaultRelayOptimizeInterval, "time interval to perform relays optimization on destination chain")
cmd.Flags().Int64(flagDstRelayOptimizeCount, defaultRelayOptimizeCount, "number of packets to relays optimization on destination chain")
return cmd
}
4 changes: 2 additions & 2 deletions cmd/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func relayMsgsCmd(ctx *config.Context) *cobra.Command {
msgs.Merge(m)
}

if m, err := st.RelayPackets(c[src], c[dst], sp, sh); err != nil {
if m, err := st.RelayPackets(c[src], c[dst], sp, sh, true, true); err != nil {
return err
} else {
msgs.Merge(m)
Expand Down Expand Up @@ -301,7 +301,7 @@ func relayAcksCmd(ctx *config.Context) *cobra.Command {
msgs.Merge(m)
}

if m, err := st.RelayAcknowledgements(c[src], c[dst], sp, sh); err != nil {
if m, err := st.RelayAcknowledgements(c[src], c[dst], sp, sh, true, true); err != nil {
return err
} else {
msgs.Merge(m)
Expand Down
20 changes: 6 additions & 14 deletions core/naive-strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@ type NaiveStrategy struct {
dstNoAck bool

metrics naiveStrategyMetrics

srcSkipRelay bool
dstSkipRelay bool
}

type naiveStrategyMetrics struct {
Expand Down Expand Up @@ -202,7 +199,7 @@ func (st *NaiveStrategy) UnrelayedPackets(src, dst *ProvableChain, sh SyncHeader
}, nil
}

func (st *NaiveStrategy) RelayPackets(src, dst *ProvableChain, rp *RelayPackets, sh SyncHeaders) (*RelayMsgs, error) {
func (st *NaiveStrategy) RelayPackets(src, dst *ProvableChain, rp *RelayPackets, sh SyncHeaders, srcOptimizeRelay, dstOptimizeRelay bool) (*RelayMsgs, error) {
logger := GetChannelPairLogger(src, dst)
defer logger.TimeTrack(time.Now(), "RelayPackets", "num_src", len(rp.Src), "num_dst", len(rp.Dst))

Expand All @@ -228,7 +225,7 @@ func (st *NaiveStrategy) RelayPackets(src, dst *ProvableChain, rp *RelayPackets,
return nil, err
}

if !st.srcSkipRelay {
if srcOptimizeRelay {
msgs.Dst, err = collectPackets(srcCtx, src, rp.Src, dstAddress)
if err != nil {
logger.Error(
Expand All @@ -239,7 +236,7 @@ func (st *NaiveStrategy) RelayPackets(src, dst *ProvableChain, rp *RelayPackets,
}
}

if !st.dstSkipRelay {
if dstOptimizeRelay {
msgs.Src, err = collectPackets(dstCtx, dst, rp.Dst, srcAddress)
if err != nil {
logger.Error(
Expand Down Expand Up @@ -423,7 +420,7 @@ func logPacketsRelayed(src, dst Chain, num int, obj string, dir string) {
)
}

func (st *NaiveStrategy) RelayAcknowledgements(src, dst *ProvableChain, rp *RelayPackets, sh SyncHeaders) (*RelayMsgs, error) {
func (st *NaiveStrategy) RelayAcknowledgements(src, dst *ProvableChain, rp *RelayPackets, sh SyncHeaders, srcOptimizeRelay, dstOptimizeRelay bool) (*RelayMsgs, error) {
logger := GetChannelPairLogger(src, dst)
defer logger.TimeTrack(time.Now(), "RelayAcknowledgements", "num_src", len(rp.Src), "num_dst", len(rp.Dst))

Expand All @@ -448,13 +445,13 @@ func (st *NaiveStrategy) RelayAcknowledgements(src, dst *ProvableChain, rp *Rela
return nil, err
}

if !st.dstNoAck && !st.srcSkipRelay {
if !st.dstNoAck && srcOptimizeRelay {
msgs.Dst, err = collectAcks(srcCtx, src, rp.Src, dstAddress)
if err != nil {
return nil, err
}
}
if !st.srcNoAck && !st.dstSkipRelay {
if !st.srcNoAck && dstOptimizeRelay {
msgs.Src, err = collectAcks(dstCtx, dst, rp.Dst, srcAddress)
if err != nil {
return nil, err
Expand Down Expand Up @@ -576,11 +573,6 @@ func (st *NaiveStrategy) Send(src, dst Chain, msgs *RelayMsgs) {
)
}

func (st *NaiveStrategy) SkipRelay(src, dst bool) {
st.srcSkipRelay = src
st.dstSkipRelay = dst
}

func (st *naiveStrategyMetrics) updateBacklogMetrics(ctx context.Context, src, dst ChainInfo, newSrcBacklog, newDstBacklog PacketInfoList) error {
srcAttrs := []attribute.KeyValue{
attribute.Key("chain_id").String(src.ChainID()),
Expand Down
86 changes: 53 additions & 33 deletions core/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,27 @@ func StartService(
ctx context.Context,
st StrategyI,
src, dst *ProvableChain,
relayInterval, relayOptimizeInterval time.Duration,
relayOptimizeCount int64) error {
relayInterval,
srcRelayOptimizeInterval time.Duration,
srcRelayOptimizeCount int64,
dstRelayOptimizaInterval time.Duration,
dstRelayOptimizeCount int64,
) error {
sh, err := NewSyncHeaders(src, dst)
if err != nil {
return err
}
srv := NewRelayService(st, src, dst, sh, relayInterval, relayOptimizeInterval, relayOptimizeCount)
srv := NewRelayService(
st,
src,
dst,
sh,
relayInterval,
srcRelayOptimizeInterval,
srcRelayOptimizeCount,
dstRelayOptimizaInterval,
dstRelayOptimizeCount,
)
return srv.Start(ctx)
}

Expand All @@ -32,8 +46,10 @@ type RelayService struct {
}

type OptimizeRelay struct {
optimizeInterval time.Duration
optimizeCount int64
srcOptimizeInterval time.Duration
srcOptimizeCount int64
dstOptimizeInterval time.Duration
dstOptimizeCount int64

srcRelayPacketStartTime WatchStartTime
dstRelayPacketStartTime WatchStartTime
Expand All @@ -42,41 +58,47 @@ type OptimizeRelay struct {
}

type WatchStartTime struct {
Reset bool
StartTime time.Time
AlreadySet bool
StartTime time.Time
}

// NewRelayService returns a new service
func NewRelayService(
st StrategyI,
src, dst *ProvableChain,
sh SyncHeaders,
interval, optimizeInterval time.Duration,
optimizeCount int64) *RelayService {
interval,
srcOptimizeInterval time.Duration,
srcOptimizeCount int64,
dstOptimizeInterval time.Duration,
dstOptimizeCount int64,
) *RelayService {
return &RelayService{
src: src,
dst: dst,
st: st,
sh: sh,
interval: interval,
optimizeRelay: OptimizeRelay{
optimizeInterval: optimizeInterval,
optimizeCount: optimizeCount,
srcOptimizeInterval: srcOptimizeInterval,
srcOptimizeCount: srcOptimizeCount,
dstOptimizeInterval: dstOptimizeInterval,
dstOptimizeCount: dstOptimizeCount,
srcRelayPacketStartTime: WatchStartTime{
Reset: false,
StartTime: time.Now(),
AlreadySet: true,
StartTime: time.Now(),
},
dstRelayPacketStartTime: WatchStartTime{
Reset: false,
StartTime: time.Now(),
AlreadySet: true,
StartTime: time.Now(),
},
srcRelayAckStartTime: WatchStartTime{
Reset: false,
StartTime: time.Now(),
AlreadySet: true,
StartTime: time.Now(),
},
dstRelayAckStartTime: WatchStartTime{
Reset: false,
StartTime: time.Now(),
AlreadySet: true,
StartTime: time.Now(),
},
},
}
Expand Down Expand Up @@ -153,16 +175,15 @@ func (srv *RelayService) Serve(ctx context.Context) error {

// relay packets if unrelayed seqs exist
srcRelayPackets, dstRelayPackets := srv.shouldExecuteRelay(pseqs, srv.optimizeRelay.srcRelayPacketStartTime, srv.optimizeRelay.dstRelayPacketStartTime)
srv.st.SkipRelay(!srcRelayPackets, !dstRelayPackets)
if m, err := srv.st.RelayPackets(srv.src, srv.dst, pseqs, srv.sh); err != nil {
if m, err := srv.st.RelayPackets(srv.src, srv.dst, pseqs, srv.sh, srcRelayPackets, dstRelayPackets); err != nil {
logger.Error("failed to relay packets", err)
return err
} else {
if srcRelayPackets {
srv.optimizeRelay.srcRelayPacketStartTime.Reset = true
srv.optimizeRelay.srcRelayPacketStartTime.AlreadySet = false
}
if dstRelayPackets {
srv.optimizeRelay.dstRelayPacketStartTime.Reset = true
srv.optimizeRelay.dstRelayPacketStartTime.AlreadySet = false
}
msgs.Merge(m)
}
Expand All @@ -177,16 +198,15 @@ func (srv *RelayService) Serve(ctx context.Context) error {

// relay acks if unrelayed seqs exist
srcRelayAcks, dstRelayAcks := srv.shouldExecuteRelay(aseqs, srv.optimizeRelay.srcRelayAckStartTime, srv.optimizeRelay.dstRelayAckStartTime)
srv.st.SkipRelay(!srcRelayAcks, !dstRelayAcks)
if m, err := srv.st.RelayAcknowledgements(srv.src, srv.dst, aseqs, srv.sh); err != nil {
if m, err := srv.st.RelayAcknowledgements(srv.src, srv.dst, aseqs, srv.sh, srcRelayAcks, dstRelayAcks); err != nil {
logger.Error("failed to relay acknowledgements", err)
return err
} else {
if srcRelayAcks {
srv.optimizeRelay.srcRelayAckStartTime.Reset = true
srv.optimizeRelay.srcRelayAckStartTime.AlreadySet = false
}
if dstRelayAcks {
srv.optimizeRelay.dstRelayAckStartTime.Reset = true
srv.optimizeRelay.dstRelayAckStartTime.AlreadySet = false
}
msgs.Merge(m)
}
Expand All @@ -204,28 +224,28 @@ func (srv *RelayService) shouldExecuteRelay(seqs *RelayPackets, srcRelayStartTim
// packet count
srcRelayCount := len(seqs.Src)
dstRelayCount := len(seqs.Dst)
if int64(srcRelayCount) >= srv.optimizeRelay.optimizeCount {
if int64(srcRelayCount) >= srv.optimizeRelay.srcOptimizeCount {
srcRelay = true
}
if int64(dstRelayCount) >= srv.optimizeRelay.optimizeCount {
if int64(dstRelayCount) >= srv.optimizeRelay.dstOptimizeCount {
dstRelay = true
}

// time interval
if time.Since(srcRelayStartTime.StartTime) >= srv.optimizeRelay.optimizeInterval {
if time.Since(srcRelayStartTime.StartTime) >= srv.optimizeRelay.srcOptimizeInterval {
srcRelay = true
}
if time.Since(dstRelayStartTime.StartTime) >= srv.optimizeRelay.optimizeInterval {
if time.Since(dstRelayStartTime.StartTime) >= srv.optimizeRelay.dstOptimizeInterval {
dstRelay = true
}

return srcRelay, dstRelay
}

func resetWatchStartTime(watchStartTime *WatchStartTime) {
if !watchStartTime.Reset {
if watchStartTime.AlreadySet {
return
}
watchStartTime.Reset = false
watchStartTime.AlreadySet = true
watchStartTime.StartTime = time.Now()
}
7 changes: 2 additions & 5 deletions core/strategies.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,20 @@ type StrategyI interface {
UnrelayedPackets(src, dst *ProvableChain, sh SyncHeaders, includeRelayedButUnfinalized bool) (*RelayPackets, error)

// RelayPackets executes RecvPacket to the packets contained in `rp` on both chains (`src` and `dst`).
RelayPackets(src, dst *ProvableChain, rp *RelayPackets, sh SyncHeaders) (*RelayMsgs, error)
RelayPackets(src, dst *ProvableChain, rp *RelayPackets, sh SyncHeaders, srcOptimizeRelay, dstOptimizeRelay bool) (*RelayMsgs, error)

// UnrelayedAcknowledgements returns packets to execute AcknowledgePacket to on `src` and `dst`.
// `includeRelayedButUnfinalized` decides if the result includes packets of which acknowledgePacket has been executed but not finalized
UnrelayedAcknowledgements(src, dst *ProvableChain, sh SyncHeaders, includeRelayedButUnfinalized bool) (*RelayPackets, error)

// RelayAcknowledgements executes AcknowledgePacket to the packets contained in `rp` on both chains (`src` and `dst`).
RelayAcknowledgements(src, dst *ProvableChain, rp *RelayPackets, sh SyncHeaders) (*RelayMsgs, error)
RelayAcknowledgements(src, dst *ProvableChain, rp *RelayPackets, sh SyncHeaders, srcOptimizeRelay, dstOptimizeRelay bool) (*RelayMsgs, error)

// UpdateClients executes UpdateClient only if needed
UpdateClients(src, dst *ProvableChain, rpForRecv, rpForAck *RelayPackets, sh SyncHeaders, doRefresh bool) (*RelayMsgs, error)

// Send executes submission of msgs to src/dst chains
Send(src, dst Chain, msgs *RelayMsgs)

// SkipRelay skips relay on `src` and `dst`
SkipRelay(src, dst bool)
}

// StrategyCfg defines which relaying strategy to take for a given path
Expand Down
2 changes: 1 addition & 1 deletion tests/cases/tm2tm/scripts/test-service
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ TM_ADDRESS1=$(${RLY} tendermint keys show ibc1 testkey)
RETRY_COUNT=10
RETRY_INTERVAL=1

${RLY} service start ibc01 --relay-interval 10s --relay-optimize-interval 15s --relay-optimize-count 3 &
${RLY} service start ibc01 --relay-interval 10s --src-relay-optimize-interval 15s --src-relay-optimize-count 3 --dst-relay-optimize-interval 15s --dst-relay-optimize-count 3 &
RLY_PID=$!

# transfer a token
Expand Down
2 changes: 1 addition & 1 deletion tests/cases/tmmock2tmmock/scripts/test-service
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ TM_ADDRESS1=$(${RLY} tendermint keys show ibc1 testkey)
RETRY_COUNT=10
RETRY_INTERVAL=1

${RLY} service start ibc01 --relay-interval 10s --relay-optimize-interval 15s --relay-optimize-count 3 &
${RLY} service start ibc01 --relay-interval 10s --src-relay-optimize-interval 15s --src-relay-optimize-count 3 --dst-relay-optimize-interval 15s --dst-relay-optimize-count 3 &
RLY_PID=$!

# transfer a token
Expand Down

0 comments on commit 90a61b7

Please sign in to comment.