Skip to content

Commit

Permalink
implement a circular channel buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
jameshageman-stripe committed Jan 15, 2019
1 parent 33b806c commit 0752b9d
Showing 1 changed file with 43 additions and 19 deletions.
62 changes: 43 additions & 19 deletions stripe.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ type BackendImplementation struct {
// See also SetNetworkRetriesSleep.
networkRetriesSleep bool

enableTelemetry bool
prevRequestMetrics chan requestMetrics
enableTelemetry bool
requestMetricsBuffer *ringBuffer
}

// Call is the Backend.Call implementation for invoking Stripe APIs.
Expand Down Expand Up @@ -303,7 +303,7 @@ func (s *BackendImplementation) Do(req *http.Request, body *bytes.Buffer, v inte

if s.enableTelemetry {
select {
case metrics := <-s.prevRequestMetrics:
case metrics := <-s.requestMetricsBuffer.outputChannel:
metricsJSON, err := json.Marshal(&requestTelemetry{LastRequestMetrics: metrics})
if err == nil {
req.Header.Set("X-Stripe-Client-Telemetry", string(metricsJSON))
Expand Down Expand Up @@ -425,11 +425,7 @@ func (s *BackendImplementation) Do(req *http.Request, body *bytes.Buffer, v inte
RequestDurationMS: requestDurationMS,
}

select {
case s.prevRequestMetrics <- metrics:
default:
// buffer is full, discard.
}
s.requestMetricsBuffer.inputChannel <- metrics
}
}

Expand Down Expand Up @@ -866,6 +862,11 @@ type requestTelemetry struct {
LastRequestMetrics requestMetrics `json:"last_request_metrics"`
}

// ringBuffer is a circular buffer of requestMetrics.
type ringBuffer struct {
inputChannel, outputChannel chan requestMetrics
}

//
// Private variables
//
Expand Down Expand Up @@ -939,24 +940,47 @@ func isHTTPWriteMethod(method string) bool {
// The vast majority of the time you should be calling GetBackendWithConfig
// instead of this function.
func newBackendImplementation(backendType SupportedBackend, config *BackendConfig) Backend {
var requestMetricsBuffer chan requestMetrics
var requestMetricsBuffer *ringBuffer

// only allocate the requestMetrics buffer if client telemetry is enabled.
if config.EnableTelemetry {
requestMetricsBuffer = make(chan requestMetrics, telemetryBufferSize)
requestMetricsBuffer = &ringBuffer{
// inputChannel does not need to be buffered because ringBuffer#run()
// is always pulling objects off inputChannel and putting them in
// outputChannel.
inputChannel: make(chan requestMetrics),
outputChannel: make(chan requestMetrics, telemetryBufferSize),
}

go requestMetricsBuffer.run()
}

return &BackendImplementation{
HTTPClient: config.HTTPClient,
LogLevel: config.LogLevel,
Logger: config.Logger,
MaxNetworkRetries: config.MaxNetworkRetries,
Type: backendType,
URL: config.URL,
networkRetriesSleep: true,
enableTelemetry: config.EnableTelemetry,
prevRequestMetrics: requestMetricsBuffer,
HTTPClient: config.HTTPClient,
LogLevel: config.LogLevel,
Logger: config.Logger,
MaxNetworkRetries: config.MaxNetworkRetries,
Type: backendType,
URL: config.URL,
networkRetriesSleep: true,
enableTelemetry: config.EnableTelemetry,
requestMetricsBuffer: requestMetricsBuffer,
}
}

// This is the bookkeeping method for ringBuffer, which should be run as a
// seperate goroutine.
func (r *ringBuffer) run() {
for v := range r.inputChannel {
select {
case r.outputChannel <- v:
default:
<-r.outputChannel
r.outputChannel <- v
}
}

close(r.outputChannel)
}

func normalizeURL(url string) string {
Expand Down

0 comments on commit 0752b9d

Please sign in to comment.