Skip to content

Commit

Permalink
Refactoring shouldExecuteRelay
Browse files Browse the repository at this point in the history
Signed-off-by: Dongri Jin <[email protected]>
  • Loading branch information
dongrie committed Dec 5, 2023
1 parent 8e5b121 commit b3cd4e7
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 36 deletions.
4 changes: 2 additions & 2 deletions cmd/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ func startCmd(ctx *config.Context) *cobra.Command {
cmd.Flags().Duration(flagRelayInterval, defaultRelayInterval, "time interval to perform relays")
cmd.Flags().String(flagPrometheusAddr, defaultPrometheusAddr, "host address to which the prometheus exporter listens")
cmd.Flags().Duration(flagSrcRelayOptimizeInterval, defaultRelayOptimizeInterval, "maximum time interval to delay relays for optimization")
cmd.Flags().Int64(flagSrcRelayOptimizeCount, defaultRelayOptimizeCount, "maximum number of relays to delay for optimization")
cmd.Flags().Uint64(flagSrcRelayOptimizeCount, defaultRelayOptimizeCount, "maximum number of relays to delay for optimization")
cmd.Flags().Duration(flagDstRelayOptimizeInterval, defaultRelayOptimizeInterval, "maximum time interval to delay relays for optimization")
cmd.Flags().Int64(flagDstRelayOptimizeCount, defaultRelayOptimizeCount, "maximum number of relays to delay for optimization")
cmd.Flags().Uint64(flagDstRelayOptimizeCount, defaultRelayOptimizeCount, "maximum number of relays to delay for optimization")
return cmd
}
56 changes: 22 additions & 34 deletions core/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,24 +157,8 @@ func (srv *RelayService) Serve(ctx context.Context) error {

msgs := NewRelayMsgs()

// reset watch start time for packets
if len(pseqs.Dst) > 0 {
resetWatchStartTime(&srv.optimizeRelay.srcRelayPacketStartTime)
}
if len(pseqs.Src) > 0 {
resetWatchStartTime(&srv.optimizeRelay.dstRelayPacketStartTime)
}

// reset watch start time for acks
if len(aseqs.Dst) > 0 {
resetWatchStartTime(&srv.optimizeRelay.srcRelayAckStartTime)
}
if len(aseqs.Src) > 0 {
resetWatchStartTime(&srv.optimizeRelay.dstRelayAckStartTime)
}

doExecuteRelaySrc, doExecuteRelayDst := srv.shouldExecuteRelay(pseqs, srv.optimizeRelay.srcRelayPacketStartTime, srv.optimizeRelay.dstRelayPacketStartTime)
doExecuteAckSrc, doExecuteAckDst := srv.shouldExecuteRelay(aseqs, srv.optimizeRelay.srcRelayAckStartTime, srv.optimizeRelay.dstRelayAckStartTime)
doExecuteRelaySrc, doExecuteRelayDst := srv.shouldExecuteRelay(pseqs, &srv.optimizeRelay.srcRelayPacketStartTime, &srv.optimizeRelay.dstRelayPacketStartTime)
doExecuteAckSrc, doExecuteAckDst := srv.shouldExecuteRelay(aseqs, &srv.optimizeRelay.srcRelayAckStartTime, &srv.optimizeRelay.dstRelayAckStartTime)

// update clients
if m, err := srv.st.UpdateClients(srv.src, srv.dst, doExecuteRelaySrc, doExecuteRelayDst, doExecuteAckSrc, doExecuteAckDst, srv.sh, true); err != nil {
Expand All @@ -192,34 +176,30 @@ func (srv *RelayService) Serve(ctx context.Context) error {
msgs.Merge(m)
}

if doExecuteRelaySrc {
srv.optimizeRelay.srcRelayPacketStartTime.AlreadySet = false
}
if doExecuteRelayDst {
srv.optimizeRelay.dstRelayPacketStartTime.AlreadySet = false
}

// relay acks if unrelayed seqs exist
if m, err := srv.st.RelayAcknowledgements(srv.src, srv.dst, aseqs, srv.sh, doExecuteAckSrc, doExecuteAckDst); err != nil {
logger.Error("failed to relay acknowledgements", err)
return err
} else {
msgs.Merge(m)
}
if doExecuteAckSrc {
srv.optimizeRelay.srcRelayAckStartTime.AlreadySet = false
}
if doExecuteAckDst {
srv.optimizeRelay.dstRelayAckStartTime.AlreadySet = false
}

// send all msgs to src/dst chains
srv.st.Send(srv.src, srv.dst, msgs)

return nil
}

func (srv *RelayService) shouldExecuteRelay(seqs *RelayPackets, srcRelayStartTime, dstRelayStartTime WatchStartTime) (bool, bool) {
func (srv *RelayService) shouldExecuteRelay(seqs *RelayPackets, srcStartTime, dstStartTime *WatchStartTime) (bool, bool) {

// reset watch start time for packets
if len(seqs.Dst) > 0 {
resetWatchStartTime(srcStartTime)
}
if len(seqs.Src) > 0 {
resetWatchStartTime(dstStartTime)
}

srcRelay := false
dstRelay := false

Expand All @@ -234,13 +214,21 @@ func (srv *RelayService) shouldExecuteRelay(seqs *RelayPackets, srcRelayStartTim
}

// time interval
if srcRelayStartTime.AlreadySet && time.Since(srcRelayStartTime.StartTime) >= srv.optimizeRelay.srcOptimizeInterval {
if srcStartTime.AlreadySet && time.Since(srcStartTime.StartTime) >= srv.optimizeRelay.srcOptimizeInterval {
srcRelay = true
}
if dstRelayStartTime.AlreadySet && time.Since(dstRelayStartTime.StartTime) >= srv.optimizeRelay.dstOptimizeInterval {
if dstStartTime.AlreadySet && time.Since(dstStartTime.StartTime) >= srv.optimizeRelay.dstOptimizeInterval {
dstRelay = true
}

// set already set to false if relay is executed
if srcRelay {
srcStartTime.AlreadySet = false
}
if dstRelay {
dstStartTime.AlreadySet = false
}

return srcRelay, dstRelay
}

Expand Down

0 comments on commit b3cd4e7

Please sign in to comment.