Skip to content

Commit

Permalink
kgo sasl reauth: be more pessimistic
Browse files Browse the repository at this point in the history
AWS is saying that we have 12hr of auth lifetime, and then throwing
authorization errors at ~11hr50min.

We will be more pessimistic and use only 95 to 98% of the lifetime.
This is similar to the Java client, which uses 85 to 95%.

Closes #205.
  • Loading branch information
twmb committed Oct 23, 2022
1 parent 47eccba commit eb6e3b5
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 21 deletions.
64 changes: 45 additions & 19 deletions pkg/kgo/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -877,33 +877,59 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error {
}

if lifetimeMillis > 0 {
// Lifetime: we could have written our request instantaenously,
// the broker calculating our session lifetime, and then the
// broker / network hung for a bit when writing. We
// pessimistically assume this worst case and take off the
// final request e2e latency x1.1 from the lifetime.
// Lifetime is problematic. We need to be a bit pessimistic.
//
// If the latency is <2.5s, we also pessimistically assume that
// things may take 2.5s in the future.
// We want a lowerbound: we use 2s (arbitrary), but if 1.1x our
// e2e sasl latency is more than 2s, we use the latency.
//
// We may make our lifetime <0; brokers should use longer
// lifetimes, but some do not in all cases. If our lifetime is
// <100ms, we sleep for 100ms just to ensure we do not
// spin-loop reauthenticating *too* much.
latency := int64(float64(time.Since(prereq).Milliseconds()) * 1.1)
if latency < 2500 {
latency = 2500
// We do not want to reauthenticate too close to the lifetime
// especially for larger lifetimes due to clock issues (#205).
// We take 95% to 98% of the lifetime.
minPessimismMillis := float64(2 * time.Second.Milliseconds())
latencyMillis := 1.1 * float64(time.Since(prereq).Milliseconds())
if latencyMillis > minPessimismMillis {
minPessimismMillis = latencyMillis
}
maxPessimismMillis := float64(lifetimeMillis) * (0.05 - 0.03*cxn.b.cl.rng()) // 95 to 98% of lifetime (pessimism 2% to 5%)

useLifetime := lifetimeMillis - latency
// Our minimum lifetime is always 2s (or latency, if larger).
//
// If rng is 0, we begin using max lifetime at 40s:
//
// maxLifetime = 40s - (40s * 0.05) = 38s
// minLifetime = 40s - 2s = 38s
//
// If rng is 1, we begin using max lifetime at 25s:
//
// maxLifetime = 25s - (25s * 0.08) = 23s
// minLifetime = 25s - 2s = 23s
//
// Every second after, we add between 0.05s or 0.08s to our
// backoff:
//
// rng@0: maxLifetime = 41s - (41s * 0.05) = 38.95
// rng@1: maxLifetime = 26s - (26s * 0.08) = 23.92
//
// At 12hr, we reauth ~24 to 28min before the lifetime.
usePessimismMillis := maxPessimismMillis
if minPessimismMillis > maxPessimismMillis {
usePessimismMillis = minPessimismMillis
}
useLifetimeMillis := lifetimeMillis - int64(usePessimismMillis)

// If our lifetime is <0 (broker said our lifetime is less than
// our client picked min), we sleep for 100ms and retry.
// Brokers should give us longer lifetimes, but that may not
// always happen (see #136). We sleep to avoid spin loop
// reauthenticating.
now := time.Now()
cxn.expiry = now.Add(time.Duration(useLifetime) * time.Millisecond)
cxn.expiry = now.Add(time.Duration(useLifetimeMillis) * time.Millisecond)
cxn.cl.cfg.logger.Log(LogLevelDebug, "sasl has a limited lifetime", "broker", logID(cxn.b.meta.NodeID), "reauthenticate_in", cxn.expiry.Sub(now))
if useLifetime < 0 {
cxn.cl.cfg.logger.Log(LogLevelInfo, "sasl lifetime minus 2.5s lower bound latency results in immediate reauthentication, sleeping 100ms to avoid spin-loop",
if useLifetimeMillis < 0 {
cxn.cl.cfg.logger.Log(LogLevelInfo, "sasl lifetime minus lower bound latency results in immediate reauthentication, sleeping 100ms to avoid spin-loop",
"broker", logID(cxn.b.meta.NodeID),
"session_lifetime", time.Duration(lifetimeMillis)*time.Millisecond,
"latency_lower_bound", time.Duration(latency)*time.Millisecond,
"latency_lower_bound", time.Duration(latencyMillis)*time.Millisecond,
)
time.Sleep(100 * time.Millisecond)
}
Expand Down
13 changes: 11 additions & 2 deletions pkg/kgo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type Client struct {
ctx context.Context
ctxCancel func()

rng *rand.Rand
rng func() float64

brokersMu sync.RWMutex
brokers []*broker // ordered by broker ID
Expand Down Expand Up @@ -201,7 +201,16 @@ func NewClient(opts ...Opt) (*Client, error) {
cfg: cfg,
ctx: ctx,
ctxCancel: cancel,
rng: rand.New(rand.NewSource(time.Now().UnixNano())),

rng: func() func() float64 {
var mu sync.Mutex
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
return func() float64 {
mu.Lock()
defer mu.Unlock()
return rng.Float64()
}
}(),

controllerID: unknownControllerID,

Expand Down

0 comments on commit eb6e3b5

Please sign in to comment.