From bc37675919d77638c33240a80610699db46fb739 Mon Sep 17 00:00:00 2001 From: Damien Neil Date: Mon, 2 Dec 2024 10:30:06 -0800 Subject: [PATCH] http2: limit number of PINGs bundled with RST_STREAMs gRPC has an unfortunate behavior of stictly rate limiting the number of PING frames that it will receive. The default is two PING frames every two hours when no requests are in flight; two PING frames every five minutes when a request is in flight; and the limit resets every time the gRPC endpoint sends a HEADERS or DATA frame. When sending a RST_STREAM frame, the Transport can bundle a PING frame with it to confirm the server is responding. When canceling several requests in succession, this can result in hitting the gRPC ping limit. Work around this gRPC behavior by sending at most one bundled PING per HEADERS or DATA frame received. We already limit ourselves to one PING in flight at a time; now, when we receive a PING response, disable sending additional bundled PINGs until we read a HEADERS/DATA frame. This does not affect keep-alive pings. Fixes golang/go#70575. Change-Id: I7c4003039bd2dc52106b2806ca31eeeee37b7e09 Reviewed-on: https://go-review.googlesource.com/c/net/+/632995 Reviewed-by: Jonathan Amsterdam Auto-Submit: Damien Neil LUCI-TryBot-Result: Go LUCI --- http2/transport.go | 46 ++++++++++++++++++++------ http2/transport_test.go | 71 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 108 insertions(+), 9 deletions(-) diff --git a/http2/transport.go b/http2/transport.go index 7fcfa89e0..090d0e1bd 100644 --- a/http2/transport.go +++ b/http2/transport.go @@ -399,6 +399,16 @@ type ClientConn struct { pingTimeout time.Duration extendedConnectAllowed bool + // rstStreamPingsBlocked works around an unfortunate gRPC behavior. + // gRPC strictly limits the number of PING frames that it will receive. + // The default is two pings per two hours, but the limit resets every time + // the gRPC endpoint sends a HEADERS or DATA frame. See golang/go#70575. + // + // rstStreamPingsBlocked is set after receiving a response to a PING frame + // bundled with an RST_STREAM (see pendingResets below), and cleared after + // receiving a HEADERS or DATA frame. + rstStreamPingsBlocked bool + // pendingResets is the number of RST_STREAM frames we have sent to the peer, // without confirming that the peer has received them. When we send a RST_STREAM, // we bundle it with a PING frame, unless a PING is already in flight. We count @@ -1738,10 +1748,14 @@ func (cs *clientStream) cleanupWriteRequest(err error) { ping := false if !closeOnIdle { cc.mu.Lock() - if cc.pendingResets == 0 { - ping = true + // rstStreamPingsBlocked works around a gRPC behavior: + // see comment on the field for details. + if !cc.rstStreamPingsBlocked { + if cc.pendingResets == 0 { + ping = true + } + cc.pendingResets++ } - cc.pendingResets++ cc.mu.Unlock() } cc.writeStreamReset(cs.ID, ErrCodeCancel, ping, err) @@ -2489,7 +2503,7 @@ func (rl *clientConnReadLoop) run() error { cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err) } if se, ok := err.(StreamError); ok { - if cs := rl.streamByID(se.StreamID); cs != nil { + if cs := rl.streamByID(se.StreamID, notHeaderOrDataFrame); cs != nil { if se.Cause == nil { se.Cause = cc.fr.errDetail } @@ -2544,7 +2558,7 @@ func (rl *clientConnReadLoop) run() error { } func (rl *clientConnReadLoop) processHeaders(f *MetaHeadersFrame) error { - cs := rl.streamByID(f.StreamID) + cs := rl.streamByID(f.StreamID, headerOrDataFrame) if cs == nil { // We'd get here if we canceled a request while the // server had its response still in flight. So if this @@ -2873,7 +2887,7 @@ func (b transportResponseBody) Close() error { func (rl *clientConnReadLoop) processData(f *DataFrame) error { cc := rl.cc - cs := rl.streamByID(f.StreamID) + cs := rl.streamByID(f.StreamID, headerOrDataFrame) data := f.Data() if cs == nil { cc.mu.Lock() @@ -3008,9 +3022,22 @@ func (rl *clientConnReadLoop) endStreamError(cs *clientStream, err error) { cs.abortStream(err) } -func (rl *clientConnReadLoop) streamByID(id uint32) *clientStream { +// Constants passed to streamByID for documentation purposes. +const ( + headerOrDataFrame = true + notHeaderOrDataFrame = false +) + +// streamByID returns the stream with the given id, or nil if no stream has that id. +// If headerOrData is true, it clears rst.StreamPingsBlocked. +func (rl *clientConnReadLoop) streamByID(id uint32, headerOrData bool) *clientStream { rl.cc.mu.Lock() defer rl.cc.mu.Unlock() + if headerOrData { + // Work around an unfortunate gRPC behavior. + // See comment on ClientConn.rstStreamPingsBlocked for details. + rl.cc.rstStreamPingsBlocked = false + } cs := rl.cc.streams[id] if cs != nil && !cs.readAborted { return cs @@ -3145,7 +3172,7 @@ func (rl *clientConnReadLoop) processSettingsNoWrite(f *SettingsFrame) error { func (rl *clientConnReadLoop) processWindowUpdate(f *WindowUpdateFrame) error { cc := rl.cc - cs := rl.streamByID(f.StreamID) + cs := rl.streamByID(f.StreamID, notHeaderOrDataFrame) if f.StreamID != 0 && cs == nil { return nil } @@ -3174,7 +3201,7 @@ func (rl *clientConnReadLoop) processWindowUpdate(f *WindowUpdateFrame) error { } func (rl *clientConnReadLoop) processResetStream(f *RSTStreamFrame) error { - cs := rl.streamByID(f.StreamID) + cs := rl.streamByID(f.StreamID, notHeaderOrDataFrame) if cs == nil { // TODO: return error if server tries to RST_STREAM an idle stream return nil @@ -3252,6 +3279,7 @@ func (rl *clientConnReadLoop) processPing(f *PingFrame) error { if cc.pendingResets > 0 { // See clientStream.cleanupWriteRequest. cc.pendingResets = 0 + cc.rstStreamPingsBlocked = true cc.cond.Broadcast() } return nil diff --git a/http2/transport_test.go b/http2/transport_test.go index 746f6e3ee..0e12e0f1c 100644 --- a/http2/transport_test.go +++ b/http2/transport_test.go @@ -5562,6 +5562,10 @@ func TestTransportSendPingWithReset(t *testing.T) { tc.wantFrameType(FrameHeaders) tc.wantIdle() + // Receive a byte of data for the remaining stream, which resets our ability + // to send pings (see comment on ClientConn.rstStreamPingsBlocked). + tc.writeData(rts[2].streamID(), false, []byte{0}) + // Cancel the last request. We send another PING, since none are in flight. rts[2].response().Body.Close() tc.wantRSTStream(rts[2].streamID(), ErrCodeCancel) @@ -5569,6 +5573,73 @@ func TestTransportSendPingWithReset(t *testing.T) { tc.wantIdle() } +// Issue #70505: gRPC gets upset if we send more than 2 pings per HEADERS/DATA frame +// sent by the server. +func TestTransportSendNoMoreThanOnePingWithReset(t *testing.T) { + tc := newTestClientConn(t) + tc.greet() + + makeAndResetRequest := func() { + t.Helper() + ctx, cancel := context.WithCancel(context.Background()) + req := must(http.NewRequestWithContext(ctx, "GET", "https://dummy.tld/", nil)) + rt := tc.roundTrip(req) + tc.wantFrameType(FrameHeaders) + cancel() + tc.wantRSTStream(rt.streamID(), ErrCodeCancel) // client sends RST_STREAM + } + + // Create a request and cancel it. + // The client sends a PING frame along with the reset. + makeAndResetRequest() + pf1 := readFrame[*PingFrame](t, tc) // client sends PING + + // Create another request and cancel it. + // We do not send a PING frame along with the reset, + // because we haven't received a HEADERS or DATA frame from the server + // since the last PING we sent. + makeAndResetRequest() + + // Server belatedly responds to request 1. + // The server has not responded to our first PING yet. + tc.writeHeaders(HeadersFrameParam{ + StreamID: 1, + EndHeaders: true, + EndStream: true, + BlockFragment: tc.makeHeaderBlockFragment( + ":status", "200", + ), + }) + + // Create yet another request and cancel it. + // We still do not send a PING frame along with the reset. + // We've received a HEADERS frame, but it came before the response to the PING. + makeAndResetRequest() + + // The server responds to our PING. + tc.writePing(true, pf1.Data) + + // Create yet another request and cancel it. + // Still no PING frame; we got a response to the previous one, + // but no HEADERS or DATA. + makeAndResetRequest() + + // Server belatedly responds to the second request. + tc.writeHeaders(HeadersFrameParam{ + StreamID: 3, + EndHeaders: true, + EndStream: true, + BlockFragment: tc.makeHeaderBlockFragment( + ":status", "200", + ), + }) + + // One more request. + // This time we send a PING frame. + makeAndResetRequest() + tc.wantFrameType(FramePing) +} + func TestTransportConnBecomesUnresponsive(t *testing.T) { // We send a number of requests in series to an unresponsive connection. // Each request is canceled or times out without a response.