Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Chao Xu committed Oct 17, 2019
1 parent bc0d6c6 commit f7858bc
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 54 deletions.
53 changes: 0 additions & 53 deletions http2/client_conn_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,9 @@
package http2

import (
"context"
"crypto/tls"
"net/http"
"sync"
"time"
)

// ClientConnPool manages a pool of HTTP/2 client connections.
Expand Down Expand Up @@ -43,16 +41,6 @@ type clientConnPool struct {
dialing map[string]*dialCall // currently in-flight dials
keys map[*ClientConn][]string
addConnCalls map[string]*addConnCall // in-flight addConnIfNeede calls

// TODO: figure out a way to allow user to configure pingPeriod and
// pingTimeout.
pingPeriod time.Duration // how often pings are sent on idle
// connections. The connection will be closed if response is not
// received within pingTimeout. 0 means no periodic pings.
pingTimeout time.Duration // connection will be force closed if a Ping
// response is not received within pingTimeout.
pingStops map[*ClientConn]chan struct{} // channels to stop the
// periodic Pings.
}

func (p *clientConnPool) GetClientConn(req *http.Request, addr string) (*ClientConn, error) {
Expand Down Expand Up @@ -231,54 +219,13 @@ func (p *clientConnPool) addConnLocked(key string, cc *ClientConn) {
if p.keys == nil {
p.keys = make(map[*ClientConn][]string)
}
if p.pingStops == nil {
p.pingStops = make(map[*ClientConn]chan struct{})
}
p.conns[key] = append(p.conns[key], cc)
p.keys[cc] = append(p.keys[cc], key)
if p.pingPeriod != 0 {
p.pingStops[cc] = p.pingConnection(key, cc)
}
}

// TODO: ping all connections at the same tick to save tickers?
func (p *clientConnPool) pingConnection(key string, cc *ClientConn) chan struct{} {
done := make(chan struct{})
go func() {
ticker := time.NewTicker(p.pingPeriod)
defer ticker.Stop()
for {
select {
case <-done:
return
default:
}

select {
case <-done:
return
case <-ticker.C:
ctx, _ := context.WithTimeout(context.Background(), p.pingTimeout)
err := cc.Ping(ctx)
if err != nil {
cc.closeForLostPing()
p.MarkDead(cc)
}
}
}
}()
return done
}

func (p *clientConnPool) MarkDead(cc *ClientConn) {
p.mu.Lock()
defer p.mu.Unlock()

if done, ok := p.pingStops[cc]; ok {
close(done)
delete(p.pingStops, cc)
}

for _, key := range p.keys[cc] {
vv, ok := p.conns[key]
if !ok {
Expand Down
77 changes: 76 additions & 1 deletion http2/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,8 @@ type ClientConn struct {

wmu sync.Mutex // held while writing; acquire AFTER mu if holding both
werr error // first write error that has occurred

healthCheckCancel chan struct{}
}

// clientStream is the state for a single HTTP/2 stream. One of these
Expand Down Expand Up @@ -678,6 +680,44 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro
return cc, nil
}

func (cc *ClientConn) healthCheck(cancel chan struct{}) {
// TODO: CHAO: configurable
pingPeriod := 15 * time.Second
ticker := time.NewTicker(pingPeriod)
defer ticker.Stop()
for {
select {
case <-cancel:
return
case <-ticker.C:
ctx, _ := context.WithTimeout(context.Background(), p.pingTimeout)
err := cc.Ping(ctx)
if err != nil {
cc.closeForLostPing()
cc.t.connPool().MarkDead(cc)
}
}
}
}

func (cc *ClientConn) startHealthCheck() {
if cc.healthCheckCancel != nil {
// a health check is already running
return
}
cc.healthCheckCancel = make(chan struct{})
go cc.healthCheck(cc.healthCheckCancel)
}

func (cc *ClientConn) stopHealthCheck() {
if cc.healthCheckCancel == nil {
// no health check running
return
}
close(cc.healthCheckCancel)
cc.healthCheckCancel = nil
}

func (cc *ClientConn) setGoAway(f *GoAwayFrame) {
cc.mu.Lock()
defer cc.mu.Unlock()
Expand Down Expand Up @@ -1717,13 +1757,48 @@ func (rl *clientConnReadLoop) cleanup() {
cc.mu.Unlock()
}

func ReadFrameAndProbablyStartOrStopPingLoop() {
select {
case <-timer:
// start ping loop
case <-read:
// stop ping loop
}
}

type frameAndError struct {
f Frame
err error
}

func nonBlockingReadFrame(f *Framer) chan frameAndError {
feCh := make(chan FrameAndError)
go func() {
f, err := fr.ReadFrame()
feCh <- frameAndError{frame: f, err: err}
}()
return feCh
}

func (rl *clientConnReadLoop) run() error {
cc := rl.cc
rl.closeWhenIdle = cc.t.disableKeepAlives() || cc.singleUse
gotReply := false // ever saw a HEADERS reply
gotSettings := false
for {
f, err := cc.fr.ReadFrame()
var fe frameAndError
feCh := nonBlockingReadFrame(cc.fr)
// TODO: CHAO: make it configurable
readIdleTimer := time.NewTimer(15 * time.Second)
select {
case fe <- feCh:
cc.stopHealthCheck()
readIdleTimer.Stop()
case <-readIdleTimer.C:
cc.startHealthCheck()
fe <- feCh
}
f, err := fe.f, fe.err
if err != nil {
cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err)
}
Expand Down

0 comments on commit f7858bc

Please sign in to comment.