From bf144b0edb183c1ccf593ac85351bc40ddec0fe5 Mon Sep 17 00:00:00 2001 From: dongrie <126585618+dongrie@users.noreply.github.com> Date: Wed, 31 Jan 2024 20:21:33 +0900 Subject: [PATCH] Optimize Relay (#108) * Optimize Relay Signed-off-by: Dongri Jin * Fix curerntTime Signed-off-by: Dongri Jin * Add src, dst current time Signed-off-by: Dongri Jin * Add optimizeRelay Signed-off-by: Dongri Jin * Refactoring optimize relay Signed-off-by: Dongri Jin * * Add src, dst parameter * Fix start time variable * Change relay parameter Signed-off-by: Dongri Jin * Fix update clients Signed-off-by: Dongri Jin * Fix src, dst parameters Signed-off-by: Dongri Jin * Fix src, dst Signed-off-by: Dongri Jin * Fix tm2tm test Signed-off-by: Dongri Jin * Refactoring shouldExecuteRelay Signed-off-by: Dongri Jin * Refactoring test-service Signed-off-by: Dongri Jin * Add ack test to service Signed-off-by: Dongri Jin * Fix error log Signed-off-by: Dongri Jin * Fix service interval Signed-off-by: Dongri Jin * Fix test sleep Signed-off-by: Dongri Jin * Fix service test Signed-off-by: Dongri Jin * Remove startTime Use eventHeight Signed-off-by: Dongri Jin * Add logger Signed-off-by: Dongri Jin * * Fix Error messages * Fix check src, dst timestamp * Fix test case comments Signed-off-by: Dongri Jin * Fix error logs Signed-off-by: Dongri Jin * Fix error message Signed-off-by: Dongri Jin * Fix error message Signed-off-by: Dongri Jin --------- Signed-off-by: Dongri Jin --- chains/tendermint/query.go | 22 ++-- cmd/service.go | 30 ++++- cmd/tx.go | 18 ++- core/naive-strategy.go | 66 ++++++----- core/service.go | 110 ++++++++++++++++-- core/strategies.go | 6 +- tests/cases/tm2tm/Makefile | 1 + tests/cases/tm2tm/scripts/test-service | 98 ++++++++++++++++ tests/cases/tm2tm/scripts/utils | 15 +++ tests/cases/tmmock2tmmock/Makefile | 1 + .../cases/tmmock2tmmock/scripts/test-service | 103 ++++++++++++++++ tests/cases/tmmock2tmmock/scripts/test-tx | 38 +----- tests/cases/tmmock2tmmock/scripts/utils | 31 +++++ 13 files changed, 439 insertions(+), 100 deletions(-) create mode 100755 tests/cases/tm2tm/scripts/test-service create mode 100644 tests/cases/tm2tm/scripts/utils create mode 100755 tests/cases/tmmock2tmmock/scripts/test-service create mode 100644 tests/cases/tmmock2tmmock/scripts/utils diff --git a/chains/tendermint/query.go b/chains/tendermint/query.go index 110384db..949021a8 100644 --- a/chains/tendermint/query.go +++ b/chains/tendermint/query.go @@ -184,7 +184,7 @@ func (c *Chain) QueryUnreceivedPackets(ctx core.QueryContext, seqs []uint64) ([] PacketCommitmentSequences: seqs, }) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to query unreceived packets: error=%w height=%v", err, ctx.Height()) } return res.Sequences, nil } @@ -192,14 +192,14 @@ func (c *Chain) QueryUnreceivedPackets(ctx core.QueryContext, seqs []uint64) ([] func (c *Chain) QueryUnfinalizedRelayPackets(ctx core.QueryContext, counterparty core.LightClientICS04Querier) (core.PacketInfoList, error) { res, err := c.queryPacketCommitments(ctx, 0, 1000) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to query packet commitments: error=%w height=%v", err, ctx.Height()) } var packets core.PacketInfoList for _, ps := range res.Commitments { packet, height, err := c.querySentPacket(ctx, ps.Sequence) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to query sent packet: error=%w height=%v", err, ctx.Height()) } packets = append(packets, &core.PacketInfo{ Packet: *packet, @@ -210,14 +210,14 @@ func (c *Chain) QueryUnfinalizedRelayPackets(ctx core.QueryContext, counterparty var counterpartyCtx core.QueryContext if counterpartyH, err := counterparty.GetLatestFinalizedHeader(); err != nil { - return nil, err + return nil, fmt.Errorf("failed to get latest finalized header: error=%w height=%v", err, ctx.Height()) } else { counterpartyCtx = core.NewQueryContext(context.TODO(), counterpartyH.GetHeight()) } seqs, err := counterparty.QueryUnreceivedPackets(counterpartyCtx, packets.ExtractSequenceList()) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to query counterparty for unreceived packets: error=%w, height=%v", err, counterpartyCtx.Height()) } packets = packets.Filter(seqs) @@ -233,7 +233,7 @@ func (c *Chain) QueryUnreceivedAcknowledgements(ctx core.QueryContext, seqs []ui PacketAckSequences: seqs, }) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to query unreceived acks: : error=%w height=%v", err, ctx.Height()) } return res.Sequences, nil } @@ -241,18 +241,18 @@ func (c *Chain) QueryUnreceivedAcknowledgements(ctx core.QueryContext, seqs []ui func (c *Chain) QueryUnfinalizedRelayAcknowledgements(ctx core.QueryContext, counterparty core.LightClientICS04Querier) (core.PacketInfoList, error) { res, err := c.queryPacketAcknowledgementCommitments(ctx, 0, 1000) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to query packet acknowledgement commitments: error=%w height=%v", err, ctx.Height()) } var packets core.PacketInfoList for _, ps := range res.Acknowledgements { packet, rpHeight, err := c.queryReceivedPacket(ctx, ps.Sequence) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to query received packet: error=%w height=%v", err, ctx.Height()) } ack, _, err := c.queryWrittenAcknowledgement(ctx, ps.Sequence) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to query written acknowledgement: error=%w height=%v", err, ctx.Height()) } packets = append(packets, &core.PacketInfo{ Packet: *packet, @@ -263,14 +263,14 @@ func (c *Chain) QueryUnfinalizedRelayAcknowledgements(ctx core.QueryContext, cou var counterpartyCtx core.QueryContext if counterpartyH, err := counterparty.GetLatestFinalizedHeader(); err != nil { - return nil, err + return nil, fmt.Errorf("failed to get latest finalized header: error=%w height=%v", err, ctx.Height()) } else { counterpartyCtx = core.NewQueryContext(context.TODO(), counterpartyH.GetHeight()) } seqs, err := counterparty.QueryUnreceivedAcknowledgements(counterpartyCtx, packets.ExtractSequenceList()) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to query counterparty for unreceived acknowledgements: error=%w height=%v", err, counterpartyCtx.Height()) } packets = packets.Filter(seqs) diff --git a/cmd/service.go b/cmd/service.go index 257582e1..e7c38f18 100644 --- a/cmd/service.go +++ b/cmd/service.go @@ -27,12 +27,18 @@ func serviceCmd(ctx *config.Context) *cobra.Command { func startCmd(ctx *config.Context) *cobra.Command { const ( - flagRelayInterval = "relay-interval" - flagPrometheusAddr = "prometheus-addr" + flagRelayInterval = "relay-interval" + flagPrometheusAddr = "prometheus-addr" + flagSrcRelayOptimizeInterval = "src-relay-optimize-interval" + flagSrcRelayOptimizeCount = "src-relay-optimize-count" + flagDstRelayOptimizeInterval = "dst-relay-optimize-interval" + flagDstRelayOptimizeCount = "dst-relay-optimize-count" ) const ( - defaultRelayInterval = 3 * time.Second - defaultPrometheusAddr = "localhost:2223" + defaultRelayInterval = 3 * time.Second + defaultPrometheusAddr = "localhost:2223" + defaultRelayOptimizeInterval = 10 * time.Second + defaultRelayOptimizeCount = 5 ) cmd := &cobra.Command{ @@ -60,10 +66,24 @@ func startCmd(ctx *config.Context) *cobra.Command { if err := st.SetupRelay(context.TODO(), c[src], c[dst]); err != nil { return err } - return core.StartService(context.Background(), st, c[src], c[dst], viper.GetDuration(flagRelayInterval)) + return core.StartService( + context.Background(), + st, + c[src], + c[dst], + viper.GetDuration(flagRelayInterval), + viper.GetDuration(flagSrcRelayOptimizeInterval), + viper.GetUint64(flagSrcRelayOptimizeCount), + viper.GetDuration(flagDstRelayOptimizeInterval), + viper.GetUint64(flagDstRelayOptimizeCount), + ) }, } cmd.Flags().Duration(flagRelayInterval, defaultRelayInterval, "time interval to perform relays") cmd.Flags().String(flagPrometheusAddr, defaultPrometheusAddr, "host address to which the prometheus exporter listens") + cmd.Flags().Duration(flagSrcRelayOptimizeInterval, defaultRelayOptimizeInterval, "maximum time interval to delay relays for optimization") + cmd.Flags().Uint64(flagSrcRelayOptimizeCount, defaultRelayOptimizeCount, "maximum number of relays to delay for optimization") + cmd.Flags().Duration(flagDstRelayOptimizeInterval, defaultRelayOptimizeInterval, "maximum time interval to delay relays for optimization") + cmd.Flags().Uint64(flagDstRelayOptimizeCount, defaultRelayOptimizeCount, "maximum number of relays to delay for optimization") return cmd } diff --git a/cmd/tx.go b/cmd/tx.go index 84bcb335..8c298964 100644 --- a/cmd/tx.go +++ b/cmd/tx.go @@ -245,13 +245,18 @@ func relayMsgsCmd(ctx *config.Context) *cobra.Command { msgs := core.NewRelayMsgs() - if m, err := st.UpdateClients(c[src], c[dst], sp, &core.RelayPackets{}, sh, viper.GetBool(flagDoRefresh)); err != nil { + doExecuteRelaySrc := len(sp.Dst) > 0 + doExecuteRelayDst := len(sp.Src) > 0 + doExecuteAckSrc := false + doExecuteAckDst := false + + if m, err := st.UpdateClients(c[src], c[dst], doExecuteRelaySrc, doExecuteRelayDst, doExecuteAckSrc, doExecuteAckDst, sh, viper.GetBool(flagDoRefresh)); err != nil { return err } else { msgs.Merge(m) } - if m, err := st.RelayPackets(c[src], c[dst], sp, sh); err != nil { + if m, err := st.RelayPackets(c[src], c[dst], sp, sh, doExecuteRelaySrc, doExecuteRelayDst); err != nil { return err } else { msgs.Merge(m) @@ -315,13 +320,18 @@ func relayAcksCmd(ctx *config.Context) *cobra.Command { msgs := core.NewRelayMsgs() - if m, err := st.UpdateClients(c[src], c[dst], &core.RelayPackets{}, sp, sh, viper.GetBool(flagDoRefresh)); err != nil { + doExecuteRelaySrc := false + doExecuteRelayDst := false + doExecuteAckSrc := len(sp.Dst) > 0 + doExecuteAckDst := len(sp.Src) > 0 + + if m, err := st.UpdateClients(c[src], c[dst], doExecuteRelaySrc, doExecuteRelayDst, doExecuteAckSrc, doExecuteAckDst, sh, viper.GetBool(flagDoRefresh)); err != nil { return err } else { msgs.Merge(m) } - if m, err := st.RelayAcknowledgements(c[src], c[dst], sp, sh); err != nil { + if m, err := st.RelayAcknowledgements(c[src], c[dst], sp, sh, doExecuteAckSrc, doExecuteAckDst); err != nil { return err } else { msgs.Merge(m) diff --git a/core/naive-strategy.go b/core/naive-strategy.go index 3f401c85..387186bf 100644 --- a/core/naive-strategy.go +++ b/core/naive-strategy.go @@ -101,7 +101,7 @@ func (st *NaiveStrategy) UnrelayedPackets(src, dst *ProvableChain, sh SyncHeader now := time.Now() srcPackets, err = src.QueryUnfinalizedRelayPackets(srcCtx, dst) if err != nil { - return err + return fmt.Errorf("failed to query unfinalized relay packets on src chain: %w", err) } logger.TimeTrack(now, "QueryUnfinalizedRelayPackets", "queried_chain", "src", "num_packets", len(srcPackets)) return nil @@ -123,7 +123,7 @@ func (st *NaiveStrategy) UnrelayedPackets(src, dst *ProvableChain, sh SyncHeader now := time.Now() dstPackets, err = dst.QueryUnfinalizedRelayPackets(dstCtx, src) if err != nil { - return err + return fmt.Errorf("failed to query unfinalized relay packets on dst chain: %w", err) } logger.TimeTrack(now, "QueryUnfinalizedRelayPackets", "queried_chain", "dst", "num_packets", len(dstPackets)) return nil @@ -168,7 +168,7 @@ func (st *NaiveStrategy) UnrelayedPackets(src, dst *ProvableChain, sh SyncHeader now := time.Now() seqs, err := dst.QueryUnreceivedPackets(dstCtx, srcPackets.ExtractSequenceList()) if err != nil { - return err + return fmt.Errorf("failed to query unreceived packets on dst chain: %w", err) } logger.TimeTrack(now, "QueryUnreceivedPackets", "queried_chain", "dst", "num_seqs", len(seqs)) srcPackets = srcPackets.Filter(seqs) @@ -179,7 +179,7 @@ func (st *NaiveStrategy) UnrelayedPackets(src, dst *ProvableChain, sh SyncHeader now := time.Now() seqs, err := src.QueryUnreceivedPackets(srcCtx, dstPackets.ExtractSequenceList()) if err != nil { - return err + return fmt.Errorf("failed to query unreceived packets on src chain: %w", err) } logger.TimeTrack(now, "QueryUnreceivedPackets", "queried_chain", "src", "num_seqs", len(seqs)) dstPackets = dstPackets.Filter(seqs) @@ -199,7 +199,7 @@ func (st *NaiveStrategy) UnrelayedPackets(src, dst *ProvableChain, sh SyncHeader }, nil } -func (st *NaiveStrategy) RelayPackets(src, dst *ProvableChain, rp *RelayPackets, sh SyncHeaders) (*RelayMsgs, error) { +func (st *NaiveStrategy) RelayPackets(src, dst *ProvableChain, rp *RelayPackets, sh SyncHeaders, doExecuteRelaySrc, doExecuteRelayDst bool) (*RelayMsgs, error) { logger := GetChannelPairLogger(src, dst) defer logger.TimeTrack(time.Now(), "RelayPackets", "num_src", len(rp.Src), "num_dst", len(rp.Dst)) @@ -225,21 +225,26 @@ func (st *NaiveStrategy) RelayPackets(src, dst *ProvableChain, rp *RelayPackets, return nil, err } - msgs.Dst, err = collectPackets(srcCtx, src, rp.Src, dstAddress) - if err != nil { - logger.Error( - "error collecting packets", - err, - ) - return nil, err + if doExecuteRelayDst { + msgs.Dst, err = collectPackets(srcCtx, src, rp.Src, dstAddress) + if err != nil { + logger.Error( + "error collecting packets", + err, + ) + return nil, err + } } - msgs.Src, err = collectPackets(dstCtx, dst, rp.Dst, srcAddress) - if err != nil { - logger.Error( - "error collecting packets", - err, - ) - return nil, err + + if doExecuteRelaySrc { + msgs.Src, err = collectPackets(dstCtx, dst, rp.Dst, srcAddress) + if err != nil { + logger.Error( + "error collecting packets", + err, + ) + return nil, err + } } if len(msgs.Dst) == 0 && len(msgs.Src) == 0 { @@ -281,7 +286,7 @@ func (st *NaiveStrategy) UnrelayedAcknowledgements(src, dst *ProvableChain, sh S now := time.Now() srcAcks, err = src.QueryUnfinalizedRelayAcknowledgements(srcCtx, dst) if err != nil { - return err + return fmt.Errorf("failed to query unfinalized relay acknowledgements on src chain: %w", err) } logger.TimeTrack(now, "QueryUnfinalizedRelayAcknowledgements", "queried_chain", "src", "num_packets", len(srcAcks)) return nil @@ -306,7 +311,7 @@ func (st *NaiveStrategy) UnrelayedAcknowledgements(src, dst *ProvableChain, sh S now := time.Now() dstAcks, err = dst.QueryUnfinalizedRelayAcknowledgements(dstCtx, src) if err != nil { - return err + return fmt.Errorf("failed to query unfinalized relay acknowledgements on dst chain: %w", err) } logger.TimeTrack(now, "QueryUnfinalizedRelayAcknowledgements", "queried_chain", "dst", "num_packets", len(dstAcks)) return nil @@ -350,7 +355,7 @@ func (st *NaiveStrategy) UnrelayedAcknowledgements(src, dst *ProvableChain, sh S now := time.Now() seqs, err := dst.QueryUnreceivedAcknowledgements(dstCtx, srcAcks.ExtractSequenceList()) if err != nil { - return err + return fmt.Errorf("failed to query unreceived acknowledgements on dst chain: %w", err) } logger.TimeTrack(now, "QueryUnreceivedAcknowledgements", "queried_chain", "dst", "num_seqs", len(seqs)) srcAcks = srcAcks.Filter(seqs) @@ -363,7 +368,7 @@ func (st *NaiveStrategy) UnrelayedAcknowledgements(src, dst *ProvableChain, sh S now := time.Now() seqs, err := src.QueryUnreceivedAcknowledgements(srcCtx, dstAcks.ExtractSequenceList()) if err != nil { - return err + return fmt.Errorf("failed to query unreceived acknowledgements on src chain: %w", err) } logger.TimeTrack(now, "QueryUnreceivedAcknowledgements", "queried_chain", "src", "num_seqs", len(seqs)) dstAcks = dstAcks.Filter(seqs) @@ -415,7 +420,7 @@ func logPacketsRelayed(src, dst Chain, num int, obj string, dir string) { ) } -func (st *NaiveStrategy) RelayAcknowledgements(src, dst *ProvableChain, rp *RelayPackets, sh SyncHeaders) (*RelayMsgs, error) { +func (st *NaiveStrategy) RelayAcknowledgements(src, dst *ProvableChain, rp *RelayPackets, sh SyncHeaders, doExecuteAckSrc, doExecuteAckDst bool) (*RelayMsgs, error) { logger := GetChannelPairLogger(src, dst) defer logger.TimeTrack(time.Now(), "RelayAcknowledgements", "num_src", len(rp.Src), "num_dst", len(rp.Dst)) @@ -440,13 +445,13 @@ func (st *NaiveStrategy) RelayAcknowledgements(src, dst *ProvableChain, rp *Rela return nil, err } - if !st.dstNoAck { + if !st.dstNoAck && doExecuteAckDst { msgs.Dst, err = collectAcks(srcCtx, src, rp.Src, dstAddress) if err != nil { return nil, err } } - if !st.srcNoAck { + if !st.srcNoAck && doExecuteAckSrc { msgs.Src, err = collectAcks(dstCtx, dst, rp.Dst, srcAddress) if err != nil { return nil, err @@ -490,16 +495,13 @@ func collectAcks(ctx QueryContext, chain *ProvableChain, packets PacketInfoList, return msgs, nil } -func (st *NaiveStrategy) UpdateClients(src, dst *ProvableChain, rpForRecv, rpForAck *RelayPackets, sh SyncHeaders, doRefresh bool) (*RelayMsgs, error) { +func (st *NaiveStrategy) UpdateClients(src, dst *ProvableChain, doExecuteRelaySrc, doExecuteRelayDst, doExecuteAckSrc, doExecuteAckDst bool, sh SyncHeaders, doRefresh bool) (*RelayMsgs, error) { logger := GetChannelPairLogger(src, dst) msgs := NewRelayMsgs() - // check if unrelayed packets or acks exist - needsUpdateForSrc := len(rpForRecv.Dst) > 0 || - !st.srcNoAck && len(rpForAck.Dst) > 0 - needsUpdateForDst := len(rpForRecv.Src) > 0 || - !st.dstNoAck && len(rpForAck.Src) > 0 + needsUpdateForSrc := doExecuteRelaySrc || (doExecuteAckSrc && !st.srcNoAck) + needsUpdateForDst := doExecuteRelayDst || (doExecuteAckDst && !st.dstNoAck) // check if LC refresh is needed if !needsUpdateForSrc && doRefresh { diff --git a/core/service.go b/core/service.go index abc8162a..b4246bb0 100644 --- a/core/service.go +++ b/core/service.go @@ -8,31 +8,73 @@ import ( ) // StartService starts a relay service -func StartService(ctx context.Context, st StrategyI, src, dst *ProvableChain, relayInterval time.Duration) error { +func StartService( + ctx context.Context, + st StrategyI, + src, dst *ProvableChain, + relayInterval, + srcRelayOptimizeInterval time.Duration, + srcRelayOptimizeCount uint64, + dstRelayOptimizaInterval time.Duration, + dstRelayOptimizeCount uint64, +) error { sh, err := NewSyncHeaders(src, dst) if err != nil { return err } - srv := NewRelayService(st, src, dst, sh, relayInterval) + srv := NewRelayService( + st, + src, + dst, + sh, + relayInterval, + srcRelayOptimizeInterval, + srcRelayOptimizeCount, + dstRelayOptimizaInterval, + dstRelayOptimizeCount, + ) return srv.Start(ctx) } type RelayService struct { - src *ProvableChain - dst *ProvableChain - st StrategyI - sh SyncHeaders - interval time.Duration + src *ProvableChain + dst *ProvableChain + st StrategyI + sh SyncHeaders + interval time.Duration + optimizeRelay OptimizeRelay +} + +type OptimizeRelay struct { + srcOptimizeInterval time.Duration + srcOptimizeCount uint64 + dstOptimizeInterval time.Duration + dstOptimizeCount uint64 } // NewRelayService returns a new service -func NewRelayService(st StrategyI, src, dst *ProvableChain, sh SyncHeaders, interval time.Duration) *RelayService { +func NewRelayService( + st StrategyI, + src, dst *ProvableChain, + sh SyncHeaders, + interval, + srcOptimizeInterval time.Duration, + srcOptimizeCount uint64, + dstOptimizeInterval time.Duration, + dstOptimizeCount uint64, +) *RelayService { return &RelayService{ src: src, dst: dst, st: st, sh: sh, interval: interval, + optimizeRelay: OptimizeRelay{ + srcOptimizeInterval: srcOptimizeInterval, + srcOptimizeCount: srcOptimizeCount, + dstOptimizeInterval: dstOptimizeInterval, + dstOptimizeCount: dstOptimizeCount, + }, } } @@ -76,7 +118,7 @@ func (srv *RelayService) Serve(ctx context.Context) error { // get unrelayed packets pseqs, err := srv.st.UnrelayedPackets(srv.src, srv.dst, srv.sh, false) if err != nil { - logger.Error("failed to get unrelayed sequences", err) + logger.Error("failed to get unrelayed packets", err) return err } @@ -89,8 +131,10 @@ func (srv *RelayService) Serve(ctx context.Context) error { msgs := NewRelayMsgs() + doExecuteRelaySrc, doExecuteRelayDst := srv.shouldExecuteRelay(pseqs) + doExecuteAckSrc, doExecuteAckDst := srv.shouldExecuteRelay(aseqs) // update clients - if m, err := srv.st.UpdateClients(srv.src, srv.dst, pseqs, aseqs, srv.sh, true); err != nil { + if m, err := srv.st.UpdateClients(srv.src, srv.dst, doExecuteRelaySrc, doExecuteRelayDst, doExecuteAckSrc, doExecuteAckDst, srv.sh, true); err != nil { logger.Error("failed to update clients", err) return err } else { @@ -98,7 +142,7 @@ func (srv *RelayService) Serve(ctx context.Context) error { } // relay packets if unrelayed seqs exist - if m, err := srv.st.RelayPackets(srv.src, srv.dst, pseqs, srv.sh); err != nil { + if m, err := srv.st.RelayPackets(srv.src, srv.dst, pseqs, srv.sh, doExecuteRelaySrc, doExecuteRelayDst); err != nil { logger.Error("failed to relay packets", err) return err } else { @@ -106,7 +150,7 @@ func (srv *RelayService) Serve(ctx context.Context) error { } // relay acks if unrelayed seqs exist - if m, err := srv.st.RelayAcknowledgements(srv.src, srv.dst, aseqs, srv.sh); err != nil { + if m, err := srv.st.RelayAcknowledgements(srv.src, srv.dst, aseqs, srv.sh, doExecuteAckSrc, doExecuteAckDst); err != nil { logger.Error("failed to relay acknowledgements", err) return err } else { @@ -118,3 +162,45 @@ func (srv *RelayService) Serve(ctx context.Context) error { return nil } + +func (srv *RelayService) shouldExecuteRelay(seqs *RelayPackets) (bool, bool) { + logger := GetChannelPairLogger(srv.src, srv.dst) + + srcRelay := false + dstRelay := false + + if len(seqs.Src) > 0 { + tsDst, err := srv.src.Timestamp(seqs.Src[0].EventHeight) + if err != nil { + return false, false + } + if time.Since(tsDst) >= srv.optimizeRelay.dstOptimizeInterval { + dstRelay = true + } + } + + if len(seqs.Dst) > 0 { + tsSrc, err := srv.dst.Timestamp(seqs.Dst[0].EventHeight) + if err != nil { + return false, false + } + if time.Since(tsSrc) >= srv.optimizeRelay.srcOptimizeInterval { + srcRelay = true + } + } + + // packet count + srcRelayCount := len(seqs.Dst) + dstRelayCount := len(seqs.Src) + + if uint64(srcRelayCount) >= srv.optimizeRelay.srcOptimizeCount { + srcRelay = true + } + if uint64(dstRelayCount) >= srv.optimizeRelay.dstOptimizeCount { + dstRelay = true + } + + logger.Info("shouldExecuteRelay", "srcRelay", srcRelay, "dstRelay", dstRelay) + + return srcRelay, dstRelay +} diff --git a/core/strategies.go b/core/strategies.go index 46edae59..c469624b 100644 --- a/core/strategies.go +++ b/core/strategies.go @@ -18,17 +18,17 @@ type StrategyI interface { UnrelayedPackets(src, dst *ProvableChain, sh SyncHeaders, includeRelayedButUnfinalized bool) (*RelayPackets, error) // RelayPackets executes RecvPacket to the packets contained in `rp` on both chains (`src` and `dst`). - RelayPackets(src, dst *ProvableChain, rp *RelayPackets, sh SyncHeaders) (*RelayMsgs, error) + RelayPackets(src, dst *ProvableChain, rp *RelayPackets, sh SyncHeaders, doExecuteRelaySrc, doExecuteRelayDst bool) (*RelayMsgs, error) // UnrelayedAcknowledgements returns packets to execute AcknowledgePacket to on `src` and `dst`. // `includeRelayedButUnfinalized` decides if the result includes packets of which acknowledgePacket has been executed but not finalized UnrelayedAcknowledgements(src, dst *ProvableChain, sh SyncHeaders, includeRelayedButUnfinalized bool) (*RelayPackets, error) // RelayAcknowledgements executes AcknowledgePacket to the packets contained in `rp` on both chains (`src` and `dst`). - RelayAcknowledgements(src, dst *ProvableChain, rp *RelayPackets, sh SyncHeaders) (*RelayMsgs, error) + RelayAcknowledgements(src, dst *ProvableChain, rp *RelayPackets, sh SyncHeaders, doExecuteAckSrc, doExecuteAckDst bool) (*RelayMsgs, error) // UpdateClients executes UpdateClient only if needed - UpdateClients(src, dst *ProvableChain, rpForRecv, rpForAck *RelayPackets, sh SyncHeaders, doRefresh bool) (*RelayMsgs, error) + UpdateClients(src, dst *ProvableChain, doExecuteRelaySrc, doExecuteRelayDst, doExecuteAckSrc, doExecuteAckDst bool, sh SyncHeaders, doRefresh bool) (*RelayMsgs, error) // Send executes submission of msgs to src/dst chains Send(src, dst Chain, msgs *RelayMsgs) diff --git a/tests/cases/tm2tm/Makefile b/tests/cases/tm2tm/Makefile index ed0dd01f..1cf67bdd 100644 --- a/tests/cases/tm2tm/Makefile +++ b/tests/cases/tm2tm/Makefile @@ -13,6 +13,7 @@ test: ./scripts/init-rly ./scripts/handshake ./scripts/test-tx + ./scripts/test-service .PHONY: network-down network-down: diff --git a/tests/cases/tm2tm/scripts/test-service b/tests/cases/tm2tm/scripts/test-service new file mode 100755 index 00000000..8b8a08f8 --- /dev/null +++ b/tests/cases/tm2tm/scripts/test-service @@ -0,0 +1,98 @@ +#!/bin/bash + +: <<'END_COMMENT' +* relay-interval = 20s +* src-relay-optimize-interval = 30s +* src-relay-optimize-count = 3 +* dst-relay-optimize-interval = 30s +* dst-relay-optimize-count = 3 + +- relay service [packets = 0, time = 0] -> skip + - sleep 4 + - transfer x 1 + - packets = 1, ack = 0 + - sleep 13 +- relay service [packets = 1, time = 20] -> skip + - sleep 4 + - transfer x 1 + - packets = 2, ack = 0 + - sleep 13 +- relay service [packets = 2, time = 40] -> exec (time) + - sleep 4 + - packets = 0, ack = 2 + - transfer x 3 + - packets = 3, ack = 2 + - sleep 8 +- relay service [packets = 3, time = 20] -> exec (count) + - sleep 4 + - packets = 0, ack = 5 + - sleep 20 +- relay service [acks = 5, time = 40] -> exec (time + count) + - sleep 4 + - packets = 0, ack = 0 + - Finished +END_COMMENT + +set -eux + +SCRIPT_DIR=$(cd $(dirname $0); pwd) +RLY_BINARY=${SCRIPT_DIR}/../../../../build/yrly +RLY="${RLY_BINARY} --debug" + +source ${SCRIPT_DIR}/utils + +TM_ADDRESS0=$(${RLY} tendermint keys show ibc0 testkey) +TM_ADDRESS1=$(${RLY} tendermint keys show ibc1 testkey) + +SECONDS=0 +${RLY} service start ibc01 --relay-interval 20s --src-relay-optimize-interval 30s --src-relay-optimize-count 3 --dst-relay-optimize-interval 30s --dst-relay-optimize-count 3 & +RLY_PID=$! +echo "xxxxxxx ${SECONDS} relay service [packets = 0, time = 0] -> skip xxxxxx" +sleep 4 + +# transfer a token +${RLY} tx transfer ibc01 ibc0 ibc1 100samoleans ${TM_ADDRESS1} + +expectUnrelayedCount "unrelayed-packets" "src" 1 +expectUnrelayedCount "unrelayed-acknowledgements" "dst" 0 +sleep 13 + +echo "xxxxxxx ${SECONDS} relay service [packets = 1, time = 20] -> skip xxxxxx" +sleep 4 + +${RLY} tx transfer ibc01 ibc0 ibc1 100samoleans ${TM_ADDRESS1} + +expectUnrelayedCount "unrelayed-packets" "src" 2 +expectUnrelayedCount "unrelayed-acknowledgements" "dst" 0 +sleep 13 + +echo "xxxxxxx ${SECONDS} relay service [packets = 2, time = 40] -> exec (time) xxxxxx" +sleep 4 + +expectUnrelayedCount "unrelayed-packets" "src" 0 +expectUnrelayedCount "unrelayed-acknowledgements" "dst" 2 + +${RLY} tx transfer ibc01 ibc0 ibc1 100samoleans ${TM_ADDRESS1} +${RLY} tx transfer ibc01 ibc0 ibc1 100samoleans ${TM_ADDRESS1} +${RLY} tx transfer ibc01 ibc0 ibc1 100samoleans ${TM_ADDRESS1} + +expectUnrelayedCount "unrelayed-packets" "src" 3 +expectUnrelayedCount "unrelayed-acknowledgements" "dst" 2 +sleep 8 + +echo "xxxxxxx ${SECONDS} relay service [packets = 3, time = 20] -> exec (count) xxxxxx" +sleep 4 + +expectUnrelayedCount "unrelayed-packets" "src" 0 +expectUnrelayedCount "unrelayed-acknowledgements" "dst" 5 +sleep 20 + +echo "xxxxxxx ${SECONDS} relay service [acks = 5, time = 40] -> exec (time + count) xxxxxx" +sleep 4 + +expectUnrelayedCount "unrelayed-packets" "src" 0 +expectUnrelayedCount "unrelayed-acknowledgements" "dst" 0 + +echo "Finished" + +kill $RLY_PID diff --git a/tests/cases/tm2tm/scripts/utils b/tests/cases/tm2tm/scripts/utils new file mode 100644 index 00000000..16645cfc --- /dev/null +++ b/tests/cases/tm2tm/scripts/utils @@ -0,0 +1,15 @@ +#!/usr/bin/env bash + +PATH_NAME=ibc01 + +expectUnrelayedCount() { + query_type=$1 + filter=".$2 | length" + expect_count=$3 + unrelayed_count=$(${RLY} query ${query_type} ${PATH_NAME} | jq "${filter}") + if [ $unrelayed_count -ne ${expect_count} ]; then + echo "$query_type: $unrelayed_count" + kill $RLY_PID + exit 1 + fi +} diff --git a/tests/cases/tmmock2tmmock/Makefile b/tests/cases/tmmock2tmmock/Makefile index d1c56efe..f4697b83 100644 --- a/tests/cases/tmmock2tmmock/Makefile +++ b/tests/cases/tmmock2tmmock/Makefile @@ -13,6 +13,7 @@ test: ./scripts/init-rly ./scripts/handshake ./scripts/test-tx + ./scripts/test-service .PHONY: network-down network-down: diff --git a/tests/cases/tmmock2tmmock/scripts/test-service b/tests/cases/tmmock2tmmock/scripts/test-service new file mode 100755 index 00000000..5b42ff0a --- /dev/null +++ b/tests/cases/tmmock2tmmock/scripts/test-service @@ -0,0 +1,103 @@ +#!/bin/bash + +: <<'END_COMMENT' +* relay-interval = 20s +* src-relay-optimize-interval = 30s +* src-relay-optimize-count = 3 +* dst-relay-optimize-interval = 30s +* dst-relay-optimize-count = 3 + +- relay service [packets = 0, time = 0] -> skip + - sleep 4 + - transfer x 1 + - sleep 3 finality + - packets = 1, ack = 0 + - sleep 10 +- relay service [packets = 1, time = 20] -> skip + - sleep 4 + - transfer x 1 + - sleep 3 finality + - packets = 2, ack = 0 + - sleep 10 +- relay service [packets = 2, time = 40] -> exec (time) + - sleep 4 + - packets = 0, ack = 2 + - transfer x 3 + - sleep 3 finality + - packets = 3, ack = 2 + - sleep 10 +- relay service [packets = 3, time = 20] -> exec (count) + - sleep 4 + - packets = 0, ack = 5 + - sleep 20 +- relay service [acks = 5, time = 40] -> exec (time + count) + - sleep 4 + - packets = 0, ack = 0 + - Finished +END_COMMENT + +set -eux + +SCRIPT_DIR=$(cd $(dirname $0); pwd) +RLY_BINARY=${SCRIPT_DIR}/../../../../build/yrly +RLY="${RLY_BINARY} --debug" + +source ${SCRIPT_DIR}/utils + +TM_ADDRESS0=$(${RLY} tendermint keys show ibc0 testkey) +TM_ADDRESS1=$(${RLY} tendermint keys show ibc1 testkey) + +SECONDS=0 +${RLY} service start ibc01 --relay-interval 20s --src-relay-optimize-interval 30s --src-relay-optimize-count 3 --dst-relay-optimize-interval 30s --dst-relay-optimize-count 3 & +RLY_PID=$! +echo "xxxxxxx ${SECONDS} relay service [packets = 0, time = 0] -> skip xxxxxx" +sleep 4 + +${RLY} tx transfer ibc01 ibc0 ibc1 100samoleans ${TM_ADDRESS1} +sleep 3 # finality_delay + +expectUnrelayedCount "unrelayed-packets" "src" 1 +expectUnrelayedCount "unrelayed-acknowledgements" "dst" 0 +sleep 10 + +echo "xxxxxxx ${SECONDS} relay service [packets = 1, time = 20] -> skip xxxxxx" +sleep 4 + +${RLY} tx transfer ibc01 ibc0 ibc1 100samoleans ${TM_ADDRESS1} +sleep 3 # finality_delay + +expectUnrelayedCount "unrelayed-packets" "src" 2 +expectUnrelayedCount "unrelayed-acknowledgements" "dst" 0 +sleep 10 + +echo "xxxxxxx ${SECONDS} relay service [packets = 2, time = 40] -> exec (time) xxxxxx" +sleep 4 + +expectUnrelayedCount "unrelayed-packets" "src" 0 +expectUnrelayedCount "unrelayed-acknowledgements" "dst" 2 + +${RLY} tx transfer ibc01 ibc0 ibc1 100samoleans ${TM_ADDRESS1} +${RLY} tx transfer ibc01 ibc0 ibc1 100samoleans ${TM_ADDRESS1} +${RLY} tx transfer ibc01 ibc0 ibc1 100samoleans ${TM_ADDRESS1} +sleep 3 # finality_delay + +expectUnrelayedCount "unrelayed-packets" "src" 3 +expectUnrelayedCount "unrelayed-acknowledgements" "dst" 2 +sleep 10 + +echo "xxxxxxx ${SECONDS} relay service [packets = 3, time = 20] -> exec (count) xxxxxx" +sleep 4 + +expectUnrelayedCount "unrelayed-packets" "src" 0 +expectUnrelayedCount "unrelayed-acknowledgements" "dst" 5 +sleep 20 + +echo "xxxxxxx ${SECONDS} relay service [akcs = 3, time = 40] -> exec - time + count xxxxxx" +sleep 4 + +expectUnrelayedCount "unrelayed-packets" "src" 0 +expectUnrelayedCount "unrelayed-acknowledgements" "dst" 0 + +echo "Finished" + +kill $RLY_PID diff --git a/tests/cases/tmmock2tmmock/scripts/test-tx b/tests/cases/tmmock2tmmock/scripts/test-tx index 4e51b18f..271c8d28 100755 --- a/tests/cases/tmmock2tmmock/scripts/test-tx +++ b/tests/cases/tmmock2tmmock/scripts/test-tx @@ -6,6 +6,8 @@ SCRIPT_DIR=$(cd $(dirname $0); pwd) RLY_BINARY=${SCRIPT_DIR}/../../../../build/yrly RLY="${RLY_BINARY} --debug" +source ${SCRIPT_DIR}/utils + TM_ADDRESS0=$(${RLY} tendermint keys show ibc0 testkey) TM_ADDRESS1=$(${RLY} tendermint keys show ibc1 testkey) @@ -20,47 +22,17 @@ echo "Before ibc1 balance: $(${RLY} query balance ibc1 ${TM_ADDRESS1})" # transfer a token (sendPacket) ${RLY} tx transfer ibc01 ibc0 ibc1 100samoleans ${TM_ADDRESS1} -# wait for the finalization of the sendPacket execution -for i in `seq $RETRY_COUNT` -do - echo "[try:$i] waiting for sendPacket finalization ..." - sleep $RETRY_INTERVAL - unrelayed=$(${RLY} query unrelayed-packets ibc01 | jq '.src | length') - if [ $unrelayed -gt 0 ] - then - break - fi -done +waitRelay "unrelayed-packets" "src" # relay the packet (recvPacket) ${RLY} tx relay --do-refresh ibc01 --src-seqs 1 -# wait for the finalization of the recvPacket execution -for i in `seq $RETRY_COUNT` -do - echo "[try:$i] waiting for recvPacket finalization ..." - sleep $RETRY_INTERVAL - unrelayed=$(${RLY} query unrelayed-acknowledgements ibc01 | jq '.dst | length') - if [ $unrelayed -gt 0 ] - then - break - fi -done +waitRelay "unrelayed-acknowledgements" "dst" # relay the ack for the packet (acknowledgePacket) ${RLY} tx acks --do-refresh ibc01 --dst-seqs 1 -# wait for the finalization of the recvPacket execution -for i in `seq $RETRY_COUNT` -do - echo "[try:$i] waiting for acknowledgePacket finalization ..." - sleep $RETRY_INTERVAL - unrelayed=$(${RLY} query unrelayed-acknowledgements ibc01 | jq '.dst | length') - if [ $unrelayed -eq 0 ] - then - break - fi -done +waitRelay "unrelayed-acknowledgements" "dst" echo "After ibc0 balance: $(${RLY} query balance ibc0 ${TM_ADDRESS0})" echo "After ibc1 balance: $(${RLY} query balance ibc1 ${TM_ADDRESS1})" diff --git a/tests/cases/tmmock2tmmock/scripts/utils b/tests/cases/tmmock2tmmock/scripts/utils new file mode 100644 index 00000000..91f4fa10 --- /dev/null +++ b/tests/cases/tmmock2tmmock/scripts/utils @@ -0,0 +1,31 @@ +#!/usr/bin/env bash + +PATH_NAME=ibc01 +RETRY_COUNT=5 +RETRY_INTERVAL=1 + +waitRelay() { + query_type=$1 + filter=".$2 | length" + for i in `seq $RETRY_COUNT` + do + echo "[try:$i] waiting for ${query_type} finalization ..." + sleep $RETRY_INTERVAL + unrelayed=$(${RLY} query ${query_type} ${PATH_NAME} | jq "${filter}") + if [ $unrelayed -gt 0 ]; then + break + fi + done +} + +expectUnrelayedCount() { + query_type=$1 + filter=".$2 | length" + expect_count=$3 + unrelayed_count=$(${RLY} query ${query_type} ${PATH_NAME} | jq "${filter}") + if [ $unrelayed_count -ne ${expect_count} ]; then + echo "$query_type: $unrelayed_count" + kill $RLY_PID + exit 1 + fi +}