Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adaptive queue for staging dials #237

Merged
merged 11 commits into from
Jan 30, 2019
39 changes: 24 additions & 15 deletions dial_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,18 +91,23 @@ func (dq *dialQueue) control() {
waiting <-chan waitingCh
lastScalingEvt = time.Now()
)

defer func() {
// close channels.
if resp.ch != nil {
close(resp.ch)
}
close(dq.waitingCh)
raulk marked this conversation as resolved.
Show resolved Hide resolved
for w := range dq.waitingCh {
close(w.ch)
}
}()

for {
// First process any backlog of dial jobs and waiters -- making progress is the priority.
// This block is copied below; couldn't find a more concise way of doing this.
select {
case <-dq.ctx.Done():
// close channels.
if resp.ch != nil {
close(resp.ch)
}
for w := range waiting {
close(w.ch)
}
return
case p = <-dialled:
dialled, waiting = nil, dq.waitingCh
Expand All @@ -113,7 +118,8 @@ func (dq *dialQueue) control() {
resp.ch <- p
close(resp.ch)
dialled, waiting = dq.out.DeqChan, nil // stop consuming waiting jobs until we've cleared a peer.
continue // onto the top.
resp.ch = nil
continue // onto the top.
default:
// there's nothing to process, so proceed onto the main select block.
}
Expand All @@ -129,6 +135,7 @@ func (dq *dialQueue) control() {
resp.ch <- p
close(resp.ch)
dialled, waiting = dq.out.DeqChan, nil // stop consuming waiting jobs until we've cleared a peer.
resp.ch = nil
case <-dq.growCh:
if time.Now().Sub(lastScalingEvt) < DialQueueScalingMutePeriod {
continue
Expand Down Expand Up @@ -169,6 +176,10 @@ func (dq *dialQueue) Consume() <-chan peer.ID {

// park the channel until a dialled peer becomes available.
select {
case <-dq.ctx.Done():
// return a closed channel with no value if we're done.
close(ch)
return ch
case dq.waitingCh <- waitingCh{ch, time.Now()}:
raulk marked this conversation as resolved.
Show resolved Hide resolved
default:
panic("detected more consuming goroutines than declared upfront")
Expand Down Expand Up @@ -226,8 +237,7 @@ func (dq *dialQueue) shrink() {
func (dq *dialQueue) worker() {
// This idle timer tracks if the environment is slow. If we're waiting to long to acquire a peer to dial,
// it means that the DHT query is progressing slow and we should shrink the worker pool.
idleTimer := time.NewTimer(0)

idleTimer := time.NewTimer(24 * time.Hour) // placeholder init value which will be overridden immediately.
for {
// trap exit signals first.
select {
Expand All @@ -238,11 +248,10 @@ func (dq *dialQueue) worker() {
default:
}

if !idleTimer.Stop() {
select {
case <-idleTimer.C:
default:
}
idleTimer.Stop()
select {
case <-idleTimer.C:
default:
}
idleTimer.Reset(DialQueueMaxIdle)

Expand Down
6 changes: 3 additions & 3 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,9 +206,9 @@ func (r *dhtQueryRunner) spawnWorkers(proc process.Process) {
case <-r.rateLimit:
ch := r.peersDialed.Consume()
select {
case p, _ := <-ch:
if p == "" {
// peer is nil; this signals context cancellation.
case p, ok := <-ch:
if !ok {
// this signals context cancellation.
return
}
// do it as a child func to make sure Run exits
Expand Down