Better push channel monitor logging #133

Merged: 1 commit, Jan 18, 2021
pushchannelmonitor/pushchannelmonitor.go: 59 changes (31 additions, 28 deletions)
@@ -131,8 +131,9 @@ func (m *Monitor) run() {
     ticker := time.NewTicker(tickInterval)
     defer ticker.Stop()
 
-    log.Infof("Starting push channel monitor with %d checks per %s interval (check interval %s)",
-        m.cfg.ChecksPerInterval, m.cfg.Interval, tickInterval)
+    log.Infof("Starting push channel monitor with "+
+        "%d checks per %s interval (check interval %s); min bytes per interval: %d, restart backoff: %s; max consecutive restarts: %d",
+        m.cfg.ChecksPerInterval, m.cfg.Interval, tickInterval, m.cfg.MinBytesSent, m.cfg.RestartBackoff, m.cfg.MaxConsecutiveRestarts)
 
     for {
         select {
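Note: the startup log above now surfaces every knob the monitor runs with. Below is a minimal sketch of a configuration struct shaped like the m.cfg fields referenced in this diff (Interval, ChecksPerInterval, MinBytesSent, RestartBackoff, MaxConsecutiveRestarts); the field types and comments are assumptions, not the repository's actual Config definition.

// Sketch only: mirrors the cfg fields referenced by the new log line.
// Numeric types are guesses; the real definition may differ.
package pushchannelmonitor

import "time"

type Config struct {
    // Interval is the length of the sampling window reported in the log line.
    Interval time.Duration
    // ChecksPerInterval is how many data-rate checks run per Interval
    // (the "check interval" in the log is presumably Interval / ChecksPerInterval).
    ChecksPerInterval uint64
    // MinBytesSent is the minimum number of bytes that must move per Interval
    // before the channel is considered healthy.
    MinBytesSent uint64
    // RestartBackoff is how long to wait after a restart before allowing another.
    RestartBackoff time.Duration
    // MaxConsecutiveRestarts caps how many restarts may happen back to back.
    MaxConsecutiveRestarts uint32
}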
@@ -171,8 +172,8 @@ type monitoredChannel struct {
     dataRatePoints      chan *dataRatePoint
     consecutiveRestarts int
 
-    restartLk  sync.RWMutex
-    restarting bool
+    restartLk   sync.RWMutex
+    restartedAt time.Time
 }
 
 func newMonitoredChannel(
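Note: the struct change above swaps the restarting bool for a restartedAt time.Time. The zero value still means "no restart in flight", but a non-zero value also records when the restart began, which the new debug message further down reports via time.Since. A self-contained sketch of that zero-time-as-flag pattern follows; the restartGuard type and its method names are hypothetical, not part of this PR.

// Sketch of the zero-time-as-flag pattern behind restartedAt.
// Names and structure here are illustrative, not the repository's API.
package main

import (
    "fmt"
    "sync"
    "time"
)

type restartGuard struct {
    lk          sync.Mutex
    restartedAt time.Time
}

// tryBeginRestart reports whether the caller should perform the restart.
// If one is already in flight it returns false plus how long it has been
// running; otherwise it records the start time and returns true.
func (g *restartGuard) tryBeginRestart() (bool, time.Duration) {
    g.lk.Lock()
    defer g.lk.Unlock()
    if !g.restartedAt.IsZero() {
        return false, time.Since(g.restartedAt)
    }
    g.restartedAt = time.Now()
    return true, 0
}

// endRestart clears the flag so future restarts are allowed again.
func (g *restartGuard) endRestart() {
    g.lk.Lock()
    g.restartedAt = time.Time{}
    g.lk.Unlock()
}

func main() {
    var g restartGuard
    if ok, _ := g.tryBeginRestart(); ok {
        fmt.Println("restarting")
        g.endRestart()
    }
    if ok, since := g.tryBeginRestart(); !ok {
        fmt.Println("already restarting for", since)
    } else {
        fmt.Println("restarting again")
        g.endRestart()
    }
}

The pay-off is that a single field answers both "are we already restarting?" and "for how long?", at the cost of remembering to reset it to time.Time{} once the restart finishes.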
@@ -248,8 +249,6 @@ func (mc *monitoredChannel) checkDataRate() {
     mc.statsLk.Lock()
     defer mc.statsLk.Unlock()
 
-    log.Debugf("%s: check data rate", mc.chid)
-
     // Before returning, add the current data rate stats to the queue
     defer func() {
         var pending uint64
@@ -264,6 +263,9 @@
 
     // Check that there are enough data points that an interval has elapsed
     if len(mc.dataRatePoints) < int(mc.cfg.ChecksPerInterval) {
+        log.Debugf("%s: not enough data points to check data rate yet (%d / %d)",
+            mc.chid, len(mc.dataRatePoints), mc.cfg.ChecksPerInterval)
+
        return
     }
 
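Note on the guard above: dataRatePoints is a buffered channel that checkDataRate appears to use as a fixed-length queue of samples, pushing the current stats on every check (see the deferred function earlier in the same method) and only comparing against the oldest sample once ChecksPerInterval of them have accumulated. A sketch of that pattern with made-up numbers follows; everything beyond the sent/pending fields is illustrative.

// Sketch: a buffered channel as a fixed-length queue of samples, so the
// oldest sample is one full interval old once the queue is full.
package main

import "fmt"

type dataRatePoint struct {
    pending uint64
    sent    uint64
}

func main() {
    checksPerInterval := 3
    points := make(chan *dataRatePoint, checksPerInterval)

    check := func(sent, pending uint64) {
        // Always record the current sample before returning.
        defer func() { points <- &dataRatePoint{pending: pending, sent: sent} }()

        // Not enough samples yet to cover a full interval.
        if len(points) < checksPerInterval {
            fmt.Printf("not enough data points yet (%d / %d)\n", len(points), checksPerInterval)
            return
        }

        // Pop the sample from one interval ago and compare against it.
        atIntervalStart := <-points
        fmt.Printf("sent in interval: %d\n", sent-atIntervalStart.sent)
    }

    check(0, 100)  // queued only
    check(10, 90)  // queued only
    check(30, 70)  // queued only
    check(60, 40)  // compares against the sample from three checks ago
}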
@@ -274,25 +276,25 @@
     // and the amount sent was lower than the minimum required, restart the
     // channel
     sentInInterval := mc.sent - atIntervalStart.sent
-    log.Debugf("%s: %d bytes sent since last check", mc.chid, sentInInterval)
+    log.Debugf("%s: since last check: sent: %d - %d = %d, pending: %d, required %d",
+        mc.chid, mc.sent, atIntervalStart.sent, sentInInterval, atIntervalStart.pending, mc.cfg.MinBytesSent)
     if atIntervalStart.pending > sentInInterval && sentInInterval < mc.cfg.MinBytesSent {
         go mc.restartChannel()
     }
 }
 
 func (mc *monitoredChannel) restartChannel() {
-    log.Debugf("%s: restart channel", mc.chid)
-
     // Check if the channel is already being restarted
     mc.restartLk.Lock()
-    alreadyRestarting := mc.restarting
-    if !alreadyRestarting {
-        mc.restarting = true
+    restartedAt := mc.restartedAt
+    if restartedAt.IsZero() {
+        mc.restartedAt = time.Now()
     }
     mc.restartLk.Unlock()
 
-    if alreadyRestarting {
-        log.Debugf("%s: already restarting, bailing out", mc.chid)
+    if !restartedAt.IsZero() {
+        log.Debugf("%s: restart called but already restarting channel (for %s so far; restart backoff is %s)",
+            mc.chid, time.Since(mc.restartedAt), mc.cfg.RestartBackoff)
        return
     }
 
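Note: the new debug line in checkDataRate spells out every quantity behind the restart decision: total bytes sent now and at the start of the interval, their difference, the bytes that were pending an interval ago, and the configured minimum. A small sketch of that predicate with concrete numbers follows; the shouldRestart helper is hypothetical and simply mirrors the condition in the diff.

// Sketch of the restart predicate: restart only if data was pending an
// interval ago and less than the configured minimum was sent since then.
package main

import "fmt"

// shouldRestart mirrors: atIntervalStart.pending > sentInInterval && sentInInterval < cfg.MinBytesSent
func shouldRestart(pendingAtStart, sentAtStart, sentNow, minBytesSent uint64) bool {
    sentInInterval := sentNow - sentAtStart
    return pendingAtStart > sentInInterval && sentInInterval < minBytesSent
}

func main() {
    // 1 MiB was pending, only 4 KiB moved in the interval, minimum is 16 KiB:
    // the channel looks stalled, so a restart would be triggered.
    fmt.Println(shouldRestart(1<<20, 100<<10, 104<<10, 16<<10)) // true

    // Everything that was pending got sent (pending <= sentInInterval):
    // no restart, even though the amount sent was below the minimum.
    fmt.Println(shouldRestart(2<<10, 100<<10, 102<<10, 16<<10)) // false
}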
@@ -310,20 +312,6 @@
         return
     }
 
-    defer func() {
-        if mc.cfg.RestartBackoff > 0 {
-            // Backoff a little time after a restart before attempting another
-            select {
-            case <-time.After(mc.cfg.RestartBackoff):
-            case <-mc.ctx.Done():
-            }
-        }
-
-        mc.restartLk.Lock()
-        mc.restarting = false
-        mc.restartLk.Unlock()
-    }()
-
     // Send a restart message for the channel.
     // Note that at the networking layer there is logic to retry if a network
     // connection cannot be established, so this may take some time.
@@ -334,7 +322,22 @@
         // and shut down the monitor
         log.Errorf("%s: closing push data transfer channel after failing to send restart message: %s", mc.chid, err)
         mc.closeChannelAndShutdown()
+    } else if mc.cfg.RestartBackoff > 0 {
+        log.Infof("%s: restart message sent successfully, backing off %s before allowing any other restarts",
+            mc.chid, mc.cfg.RestartBackoff)
+        // Backoff a little time after a restart before attempting another
+        select {
+        case <-time.After(mc.cfg.RestartBackoff):
+        case <-mc.ctx.Done():
+        }
+
+        log.Debugf("%s: restart back-off %s complete",
+            mc.chid, mc.cfg.RestartBackoff)
     }
+
+    mc.restartLk.Lock()
+    mc.restartedAt = time.Time{}
+    mc.restartLk.Unlock()
 }
 
 func (mc *monitoredChannel) closeChannelAndShutdown() {
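Note: the back-off added in the success branch above is the usual interruptible-sleep pattern: wait on time.After for the configured duration, but give up early if the monitor's context is cancelled. After the back-off (or immediately, if sending the restart message failed) restartedAt is reset to the zero value so a later stall can trigger another restart. A self-contained sketch of the pattern follows; the backoff helper is illustrative, not code from this PR.

// Sketch of an interruptible back-off: sleep for d, or return early when
// the context is cancelled.
package main

import (
    "context"
    "fmt"
    "time"
)

// backoff is an illustrative helper, not part of the PR.
func backoff(ctx context.Context, d time.Duration) {
    if d <= 0 {
        return
    }
    select {
    case <-time.After(d):
        fmt.Printf("back-off %s complete\n", d)
    case <-ctx.Done():
        fmt.Println("back-off interrupted:", ctx.Err())
    }
}

func main() {
    // Completes after 50ms.
    backoff(context.Background(), 50*time.Millisecond)

    // Interrupted almost immediately by cancellation.
    ctx, cancel := context.WithCancel(context.Background())
    go func() {
        time.Sleep(10 * time.Millisecond)
        cancel()
    }()
    backoff(ctx, time.Hour)
}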