forked from libp2p/go-libp2p-pubsub
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbackoff.go
107 lines (90 loc) · 2.33 KB
/
backoff.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package pubsub
import (
"context"
"fmt"
"math/rand"
"sync"
"time"
"github.com/libp2p/go-libp2p-core/peer"
)
const (
MinBackoffDelay = 100 * time.Millisecond
MaxBackoffDelay = 10 * time.Second
TimeToLive = 10 * time.Minute
BackoffCleanupInterval = 1 * time.Minute
BackoffMultiplier = 2
MaxBackoffJitterCoff = 100
MaxBackoffAttempts = 4
)
type backoffHistory struct {
duration time.Duration
lastTried time.Time
attempts int
}
type backoff struct {
mu sync.Mutex
info map[peer.ID]*backoffHistory
ct int // size threshold that kicks off the cleaner
ci time.Duration // cleanup intervals
maxAttempts int // maximum backoff attempts prior to ejection
}
func newBackoff(ctx context.Context, sizeThreshold int, cleanupInterval time.Duration, maxAttempts int) *backoff {
b := &backoff{
mu: sync.Mutex{},
ct: sizeThreshold,
ci: cleanupInterval,
maxAttempts: maxAttempts,
info: make(map[peer.ID]*backoffHistory),
}
rand.Seed(time.Now().UnixNano()) // used for jitter
go b.cleanupLoop(ctx)
return b
}
func (b *backoff) updateAndGet(id peer.ID) (time.Duration, error) {
b.mu.Lock()
defer b.mu.Unlock()
h, ok := b.info[id]
switch {
case !ok || time.Since(h.lastTried) > TimeToLive:
// first request goes immediately.
h = &backoffHistory{
duration: time.Duration(0),
attempts: 0,
}
case h.attempts >= b.maxAttempts:
return 0, fmt.Errorf("peer %s has reached its maximum backoff attempts", id)
case h.duration < MinBackoffDelay:
h.duration = MinBackoffDelay
case h.duration < MaxBackoffDelay:
jitter := rand.Intn(MaxBackoffJitterCoff)
h.duration = (BackoffMultiplier * h.duration) + time.Duration(jitter)*time.Millisecond
if h.duration > MaxBackoffDelay || h.duration < 0 {
h.duration = MaxBackoffDelay
}
}
h.attempts += 1
h.lastTried = time.Now()
b.info[id] = h
return h.duration, nil
}
func (b *backoff) cleanup() {
b.mu.Lock()
defer b.mu.Unlock()
for id, h := range b.info {
if time.Since(h.lastTried) > TimeToLive {
delete(b.info, id)
}
}
}
func (b *backoff) cleanupLoop(ctx context.Context) {
ticker := time.NewTicker(b.ci)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return // pubsub shutting down
case <-ticker.C:
b.cleanup()
}
}
}