From f7858bc5242f0003f0128f254ae843c5466a2b39 Mon Sep 17 00:00:00 2001 From: Chao Xu Date: Thu, 17 Oct 2019 16:58:55 -0700 Subject: [PATCH] fix --- http2/client_conn_pool.go | 53 --------------------------- http2/transport.go | 77 ++++++++++++++++++++++++++++++++++++++- 2 files changed, 76 insertions(+), 54 deletions(-) diff --git a/http2/client_conn_pool.go b/http2/client_conn_pool.go index 2bb701897..f4d9b5ece 100644 --- a/http2/client_conn_pool.go +++ b/http2/client_conn_pool.go @@ -7,11 +7,9 @@ package http2 import ( - "context" "crypto/tls" "net/http" "sync" - "time" ) // ClientConnPool manages a pool of HTTP/2 client connections. @@ -43,16 +41,6 @@ type clientConnPool struct { dialing map[string]*dialCall // currently in-flight dials keys map[*ClientConn][]string addConnCalls map[string]*addConnCall // in-flight addConnIfNeede calls - - // TODO: figure out a way to allow user to configure pingPeriod and - // pingTimeout. - pingPeriod time.Duration // how often pings are sent on idle - // connections. The connection will be closed if response is not - // received within pingTimeout. 0 means no periodic pings. - pingTimeout time.Duration // connection will be force closed if a Ping - // response is not received within pingTimeout. - pingStops map[*ClientConn]chan struct{} // channels to stop the - // periodic Pings. } func (p *clientConnPool) GetClientConn(req *http.Request, addr string) (*ClientConn, error) { @@ -231,54 +219,13 @@ func (p *clientConnPool) addConnLocked(key string, cc *ClientConn) { if p.keys == nil { p.keys = make(map[*ClientConn][]string) } - if p.pingStops == nil { - p.pingStops = make(map[*ClientConn]chan struct{}) - } p.conns[key] = append(p.conns[key], cc) p.keys[cc] = append(p.keys[cc], key) - if p.pingPeriod != 0 { - p.pingStops[cc] = p.pingConnection(key, cc) - } -} - -// TODO: ping all connections at the same tick to save tickers? -func (p *clientConnPool) pingConnection(key string, cc *ClientConn) chan struct{} { - done := make(chan struct{}) - go func() { - ticker := time.NewTicker(p.pingPeriod) - defer ticker.Stop() - for { - select { - case <-done: - return - default: - } - - select { - case <-done: - return - case <-ticker.C: - ctx, _ := context.WithTimeout(context.Background(), p.pingTimeout) - err := cc.Ping(ctx) - if err != nil { - cc.closeForLostPing() - p.MarkDead(cc) - } - } - } - }() - return done } func (p *clientConnPool) MarkDead(cc *ClientConn) { p.mu.Lock() defer p.mu.Unlock() - - if done, ok := p.pingStops[cc]; ok { - close(done) - delete(p.pingStops, cc) - } - for _, key := range p.keys[cc] { vv, ok := p.conns[key] if !ok { diff --git a/http2/transport.go b/http2/transport.go index a47ab780b..46e169fe4 100644 --- a/http2/transport.go +++ b/http2/transport.go @@ -243,6 +243,8 @@ type ClientConn struct { wmu sync.Mutex // held while writing; acquire AFTER mu if holding both werr error // first write error that has occurred + + healthCheckCancel chan struct{} } // clientStream is the state for a single HTTP/2 stream. One of these @@ -678,6 +680,44 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro return cc, nil } +func (cc *ClientConn) healthCheck(cancel chan struct{}) { + // TODO: CHAO: configurable + pingPeriod := 15 * time.Second + ticker := time.NewTicker(pingPeriod) + defer ticker.Stop() + for { + select { + case <-cancel: + return + case <-ticker.C: + ctx, _ := context.WithTimeout(context.Background(), p.pingTimeout) + err := cc.Ping(ctx) + if err != nil { + cc.closeForLostPing() + cc.t.connPool().MarkDead(cc) + } + } + } +} + +func (cc *ClientConn) startHealthCheck() { + if cc.healthCheckCancel != nil { + // a health check is already running + return + } + cc.healthCheckCancel = make(chan struct{}) + go cc.healthCheck(cc.healthCheckCancel) +} + +func (cc *ClientConn) stopHealthCheck() { + if cc.healthCheckCancel == nil { + // no health check running + return + } + close(cc.healthCheckCancel) + cc.healthCheckCancel = nil +} + func (cc *ClientConn) setGoAway(f *GoAwayFrame) { cc.mu.Lock() defer cc.mu.Unlock() @@ -1717,13 +1757,48 @@ func (rl *clientConnReadLoop) cleanup() { cc.mu.Unlock() } +func ReadFrameAndProbablyStartOrStopPingLoop() { + select { + case <-timer: + // start ping loop + case <-read: + // stop ping loop + } +} + +type frameAndError struct { + f Frame + err error +} + +func nonBlockingReadFrame(f *Framer) chan frameAndError { + feCh := make(chan FrameAndError) + go func() { + f, err := fr.ReadFrame() + feCh <- frameAndError{frame: f, err: err} + }() + return feCh +} + func (rl *clientConnReadLoop) run() error { cc := rl.cc rl.closeWhenIdle = cc.t.disableKeepAlives() || cc.singleUse gotReply := false // ever saw a HEADERS reply gotSettings := false for { - f, err := cc.fr.ReadFrame() + var fe frameAndError + feCh := nonBlockingReadFrame(cc.fr) + // TODO: CHAO: make it configurable + readIdleTimer := time.NewTimer(15 * time.Second) + select { + case fe <- feCh: + cc.stopHealthCheck() + readIdleTimer.Stop() + case <-readIdleTimer.C: + cc.startHealthCheck() + fe <- feCh + } + f, err := fe.f, fe.err if err != nil { cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err) }