Skip to content

Commit

Permalink
BDP estimation and window update. (#1310)
Browse files Browse the repository at this point in the history
  • Loading branch information
MakMukhi authored Jul 11, 2017
1 parent 93166a0 commit d69dedd
Show file tree
Hide file tree
Showing 5 changed files with 319 additions and 71 deletions.
125 changes: 125 additions & 0 deletions transport/bdp_estimator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package transport

import (
"sync"
"time"
)

// Tuning parameters of the BDP estimator.
const (
	// bdpLimit is the ceiling (4 MiB) beyond which the flow control
	// windows are never increased.
	bdpLimit = 4 << 20
	// alpha is the weight given to the newest RTT sample when the moving
	// average of RTTs is updated after the bootstrap phase.
	alpha = 0.9
	// beta is the growth threshold. The bdp estimate is raised only when
	// the current sample is greater than or equal to beta times the
	// current estimate and the current bandwidth sample is the maximum
	// bandwidth observed so far.
	beta = 0.66
	// gamma is the factor the current sample is multiplied by to obtain
	// the new bdp estimate. Keeping the estimate smaller than or equal to
	// twice the real BDP calls for a factor of 4/3; 2 is used instead to
	// round things out.
	gamma = 2
)

// bdpPing is the ping used for BDP measurements. It carries arbitrary,
// fixed data so that its ack can be told apart from the acks of other
// pings.
// Easter-egg: what does the ping message say?
var bdpPing = &ping{data: [8]byte{2, 4, 16, 16, 9, 14, 7, 7}}

// bdpEstimator keeps a running estimate of the connection's
// bandwidth-delay product (BDP) from the bytes received between sending
// a bdp ping and receiving its ack.
type bdpEstimator struct {
	// sentAt is the time when the bdp ping was sent. It is stamped by
	// timesnap (without holding mu), cleared by add when a measurement
	// cycle starts, and read by calculate.
	sentAt time.Time

	// mu is held by add and calculate while they access the fields below.
	mu sync.Mutex
	// bdp is the current bdp estimate, in bytes.
	bdp uint32
	// sample is the number of bytes received in the current measurement
	// cycle.
	sample uint32
	// bwMax is the maximum bandwidth noted so far (bytes/sec).
	bwMax float64
	// isSent is true from the moment add starts a measurement cycle until
	// calculate processes the ack of the bdp ping.
	isSent bool
	// updateFlowControl is the callback used to update the window sizes;
	// calculate invokes it with the new bdp estimate.
	updateFlowControl func(n uint32)
	// sampleCount is the number of measurement cycles started so far.
	sampleCount uint64
	// rtt is the moving average of the round trip time (seconds).
	rtt float64
}

// timesnap records the time at which the bdp ping is sent out so that the
// network rtt can be calculated when its ack is received.
// It is called (by controller) when a ping is being written on the wire;
// pings whose data differs from bdpPing's are ignored.
func (b *bdpEstimator) timesnap(d [8]byte) {
	if d == bdpPing.data {
		b.sentAt = time.Now()
	}
}

// add accounts n received bytes towards the current bdp sample.
// It returns true only if a bdp ping must be sent, which happens when the
// call opens a new measurement cycle. The caller (handleData) can use the
// result to decide whether to batch a window update with the ping.
// Once the estimate has reached bdpLimit nothing is recorded and add
// always returns false.
func (b *bdpEstimator) add(n uint32) bool {
	b.mu.Lock()
	defer b.mu.Unlock()

	switch {
	case b.bdp == bdpLimit:
		// The windows cannot grow any further; stop measuring.
		return false
	case b.isSent:
		// A measurement cycle is in progress: keep accumulating.
		b.sample += n
		return false
	}

	// Open a new measurement cycle seeded with these bytes. sentAt is
	// cleared here and stamped by timesnap once the ping is written.
	b.isSent = true
	b.sample = n
	b.sentAt = time.Time{}
	b.sampleCount++
	return true
}

// calculate is called when an ack for a ping is received. If the ack
// belongs to the bdp ping of the measurement cycle in progress, it folds
// the measured round trip into the rtt average, computes the current
// bandwidth sample and decides if the flow control windows should go up.
// When they should, updateFlowControl is invoked with the new estimate
// after mu has been released.
//
// Acks that do not match an outstanding, timestamped bdp ping are
// ignored. Without this check an unsolicited or duplicate ack carrying
// the bdp payload would be measured against a stale or zero sentAt, and
// an ack arriving before any cycle was started (sampleCount == 0) would
// turn rtt into +Inf/NaN and disable the estimator for good.
func (b *bdpEstimator) calculate(d [8]byte) {
	// Check if the ping acked for was the bdp ping.
	if bdpPing.data != d {
		return
	}
	b.mu.Lock()
	if !b.isSent || b.sentAt.IsZero() {
		// No measurement cycle is waiting for an ack, or the ping of the
		// current cycle has not been written yet: not our ack.
		b.mu.Unlock()
		return
	}
	rttSample := time.Since(b.sentAt).Seconds()
	if b.sampleCount < 10 {
		// Bootstrap rtt with a running average of the first rtt samples.
		// sampleCount is at least 1 here because add increments it when
		// it opens the cycle.
		b.rtt += (rttSample - b.rtt) / float64(b.sampleCount)
	} else {
		// Heed to the recent past more.
		b.rtt += (rttSample - b.rtt) * float64(alpha)
	}
	b.isSent = false
	// The number of bytes accumulated so far in the sample is smaller
	// than or equal to 1.5 times the real BDP on a saturated connection.
	sample := float64(b.sample)
	bwCurrent := sample / (b.rtt * 1.5)
	if bwCurrent > b.bwMax {
		b.bwMax = bwCurrent
	}
	// If the current sample (which is smaller than or equal to 1.5 times
	// the real BDP) is greater than or equal to 2/3rd of our perceived bdp
	// AND this is the maximum bandwidth seen so far, we should update our
	// perception of the network BDP.
	if !(sample >= beta*float64(b.bdp) && bwCurrent == b.bwMax && b.bdp != bdpLimit) {
		b.mu.Unlock()
		return
	}
	// Clamp in floating point before converting: converting an
	// out-of-range float to uint32 yields an implementation-dependent
	// value.
	bdp := uint32(bdpLimit)
	if est := gamma * sample; est < bdpLimit {
		bdp = uint32(est)
	}
	b.bdp = bdp
	b.mu.Unlock()
	// Invoke the callback outside the lock.
	b.updateFlowControl(bdp)
}
16 changes: 12 additions & 4 deletions transport/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ const (
// The default value of flow control window size in HTTP2 spec.
defaultWindowSize = 65535
// The initial window size for flow control.
initialWindowSize = defaultWindowSize // for an RPC
initialConnWindowSize = defaultWindowSize * 16 // for a connection
initialWindowSize = defaultWindowSize // for an RPC
infinity = time.Duration(math.MaxInt64)
defaultClientKeepaliveTime = infinity
defaultClientKeepaliveTimeout = time.Duration(20 * time.Second)
Expand Down Expand Up @@ -145,10 +144,9 @@ func (qb *quotaPool) acquire() <-chan int {

// inFlow deals with inbound flow control
type inFlow struct {
mu sync.Mutex
// The inbound flow control limit for pending data.
limit uint32

mu sync.Mutex
// pendingData is the overall data which have been received but not been
// consumed by applications.
pendingData uint32
Expand All @@ -160,6 +158,16 @@ type inFlow struct {
delta uint32
}

// newLimit sets the inflow window limit to n and returns by how much the
// limit changed (n minus the previous limit).
// It assumes that n is always greater than the old limit; the difference
// is computed in unsigned arithmetic and would wrap around otherwise.
func (f *inFlow) newLimit(n uint32) uint32 {
	f.mu.Lock()
	increase := n - f.limit
	f.limit = n
	f.mu.Unlock()
	return increase
}

func (f *inFlow) maybeAdjust(n uint32) uint32 {
if n > uint32(math.MaxInt32) {
n = uint32(math.MaxInt32)
Expand Down
66 changes: 59 additions & 7 deletions transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ type http2Client struct {

initialWindowSize int32

bdpEst *bdpEstimator

mu sync.Mutex // guard the following variables
state transportState // the state of underlying connection
activeStreams map[uint32]*Stream
Expand Down Expand Up @@ -191,9 +193,11 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
if kp.Timeout == 0 {
kp.Timeout = defaultClientKeepaliveTimeout
}
icwz := int32(initialConnWindowSize)
dynamicWindow := true
icwz := int32(initialWindowSize)
if opts.InitialConnWindowSize >= defaultWindowSize {
icwz = opts.InitialConnWindowSize
dynamicWindow = false
}
var buf bytes.Buffer
t := &http2Client{
Expand Down Expand Up @@ -232,6 +236,13 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
}
if opts.InitialWindowSize >= defaultWindowSize {
t.initialWindowSize = opts.InitialWindowSize
dynamicWindow = false
}
if dynamicWindow {
t.bdpEst = &bdpEstimator{
bdp: initialWindowSize,
updateFlowControl: t.updateFlowControl,
}
}
// Make sure awakenKeepalive can't be written upon.
// keepalive routine will make it writable, if need be.
Expand Down Expand Up @@ -827,11 +838,33 @@ func (t *http2Client) updateWindow(s *Stream, n uint32) {
}
}

// updateFlowControl applies the current bdp estimate n to the incoming
// flow control windows: every active stream and the transport itself get
// n as their new limit, and n becomes the initial window size used from
// now on. A window update carrying the growth of the transport's limit
// and a settings item with the new initial window size are then queued
// on the control buffer.
func (t *http2Client) updateFlowControl(n uint32) {
	t.mu.Lock()
	for _, stream := range t.activeStreams {
		stream.fc.newLimit(n)
	}
	t.initialWindowSize = int32(n)
	t.mu.Unlock()

	// Stream ID 0 addresses the connection-level window.
	connIncrease := t.fc.newLimit(n)
	t.controlBuf.put(&windowUpdate{0, connIncrease, false})

	initialWindow := http2.Setting{
		ID:  http2.SettingInitialWindowSize,
		Val: n,
	}
	t.controlBuf.put(&settings{
		ack: false,
		ss:  []http2.Setting{initialWindow},
	})
}

func (t *http2Client) handleData(f *http2.DataFrame) {
size := f.Header().Length
if err := t.fc.onData(uint32(size)); err != nil {
t.notifyError(connectionErrorf(true, err, "%v", err))
return
var sendBDPPing bool
if t.bdpEst != nil {
sendBDPPing = t.bdpEst.add(uint32(size))
}
// Decouple connection's flow control from application's read.
// An update on connection's flow control should not depend on
Expand All @@ -841,8 +874,20 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
// Decoupling the connection flow control will prevent other
// active(fast) streams from starving in presence of slow or
// inactive streams.
if w := t.fc.onRead(uint32(size)); w > 0 {
t.controlBuf.put(&windowUpdate{0, w, true})
//
// Furthermore, if a bdpPing is being sent out we can piggyback
// connection's window update for the bytes we just received.
if sendBDPPing {
t.controlBuf.put(&windowUpdate{0, uint32(size), false})
t.controlBuf.put(bdpPing)
} else {
if err := t.fc.onData(uint32(size)); err != nil {
t.notifyError(connectionErrorf(true, err, "%v", err))
return
}
if w := t.fc.onRead(uint32(size)); w > 0 {
t.controlBuf.put(&windowUpdate{0, w, true})
}
}
// Select the right stream to dispatch.
s, ok := t.getStream(f)
Expand Down Expand Up @@ -930,7 +975,11 @@ func (t *http2Client) handleSettings(f *http2.SettingsFrame) {
}

func (t *http2Client) handlePing(f *http2.PingFrame) {
if f.IsAck() { // Do nothing.
if f.IsAck() {
// Maybe it's a BDP ping.
if t.bdpEst != nil {
t.bdpEst.calculate(f.Data)
}
return
}
pingAck := &ping{ack: true}
Expand Down Expand Up @@ -1202,6 +1251,9 @@ func (t *http2Client) controller() {
case *flushIO:
t.framer.flushWrite()
case *ping:
if !i.ack {
t.bdpEst.timesnap(i.data)
}
t.framer.writePing(true, i.ack, i.data)
default:
grpclog.Printf("transport: http2Client.controller got unexpected item type %v\n", i)
Expand Down
Loading

0 comments on commit d69dedd

Please sign in to comment.