diff --git a/pkg/kgo/broker.go b/pkg/kgo/broker.go index c340a1c6..92bd784a 100644 --- a/pkg/kgo/broker.go +++ b/pkg/kgo/broker.go @@ -877,33 +877,59 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error { } if lifetimeMillis > 0 { - // Lifetime: we could have written our request instantaenously, - // the broker calculating our session lifetime, and then the - // broker / network hung for a bit when writing. We - // pessimistically assume this worst case and take off the - // final request e2e latency x1.1 from the lifetime. + // Lifetime is problematic. We need to be a bit pessimistic. // - // If the latency is <2.5s, we also pessimistically assume that - // things may take 2.5s in the future. + // We want a lowerbound: we use 2s (arbitrary), but if 1.1x our + // e2e sasl latency is more than 2s, we use the latency. // - // We may make our lifetime <0; brokers should use longer - // lifetimes, but some do not in all cases. If our lifetime is - // <100ms, we sleep for 100ms just to ensure we do not - // spin-loop reauthenticating *too* much. - latency := int64(float64(time.Since(prereq).Milliseconds()) * 1.1) - if latency < 2500 { - latency = 2500 + // We do not want to reauthenticate too close to the lifetime + // especially for larger lifetimes due to clock issues (#205). + // We take 95% to 98% of the lifetime. + minPessimismMillis := float64(2 * time.Second.Milliseconds()) + latencyMillis := 1.1 * float64(time.Since(prereq).Milliseconds()) + if latencyMillis > minPessimismMillis { + minPessimismMillis = latencyMillis } + maxPessimismMillis := float64(lifetimeMillis) * (0.05 - 0.03*cxn.b.cl.rng()) // 95 to 98% of lifetime (pessimism 2% to 5%) - useLifetime := lifetimeMillis - latency + // Our minimum lifetime is always 2s (or latency, if larger). 
+ // + // If rng is 0, we begin using max lifetime at 40s: + // + // maxLifetime = 40s - (40s * 0.05) = 38s + // minLifetime = 40s - 2s = 38s + // + // If rng is 1, we begin using max lifetime at 100s: + // + // maxLifetime = 100s - (100s * 0.02) = 98s + // minLifetime = 100s - 2s = 98s + // + // Every second after, we add between 0.02s and 0.05s to our + // backoff: + // + // rng@0: maxLifetime = 41s - (41s * 0.05) = 38.95 + // rng@1: maxLifetime = 101s - (101s * 0.02) = 98.98 + // + // At 12hr, we reauth ~14 to 36min before the lifetime. + usePessimismMillis := maxPessimismMillis + if minPessimismMillis > maxPessimismMillis { + usePessimismMillis = minPessimismMillis + } + useLifetimeMillis := lifetimeMillis - int64(usePessimismMillis) + + // If our lifetime is <0 (broker said our lifetime is less than + // our client picked min), we sleep for 100ms and retry. + // Brokers should give us longer lifetimes, but that may not + // always happen (see #136). We sleep to avoid spin loop + // reauthenticating. 
now := time.Now() - cxn.expiry = now.Add(time.Duration(useLifetime) * time.Millisecond) + cxn.expiry = now.Add(time.Duration(useLifetimeMillis) * time.Millisecond) cxn.cl.cfg.logger.Log(LogLevelDebug, "sasl has a limited lifetime", "broker", logID(cxn.b.meta.NodeID), "reauthenticate_in", cxn.expiry.Sub(now)) - if useLifetime < 0 { - cxn.cl.cfg.logger.Log(LogLevelInfo, "sasl lifetime minus 2.5s lower bound latency results in immediate reauthentication, sleeping 100ms to avoid spin-loop", + if useLifetimeMillis < 0 { + cxn.cl.cfg.logger.Log(LogLevelInfo, "sasl lifetime minus lower bound latency results in immediate reauthentication, sleeping 100ms to avoid spin-loop", "broker", logID(cxn.b.meta.NodeID), "session_lifetime", time.Duration(lifetimeMillis)*time.Millisecond, - "latency_lower_bound", time.Duration(latency)*time.Millisecond, + "latency_lower_bound", time.Duration(latencyMillis)*time.Millisecond, ) time.Sleep(100 * time.Millisecond) } diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go index 624ac61a..90159a3b 100644 --- a/pkg/kgo/client.go +++ b/pkg/kgo/client.go @@ -36,7 +36,7 @@ type Client struct { ctx context.Context ctxCancel func() - rng *rand.Rand + rng func() float64 brokersMu sync.RWMutex brokers []*broker // ordered by broker ID @@ -201,7 +201,16 @@ func NewClient(opts ...Opt) (*Client, error) { cfg: cfg, ctx: ctx, ctxCancel: cancel, - rng: rand.New(rand.NewSource(time.Now().UnixNano())), + + rng: func() func() float64 { + var mu sync.Mutex + rng := rand.New(rand.NewSource(time.Now().UnixNano())) + return func() float64 { + mu.Lock() + defer mu.Unlock() + return rng.Float64() + } + }(), controllerID: unknownControllerID,