From 7f09c4a5a5193ce2458328fefde11590ba1b7752 Mon Sep 17 00:00:00 2001 From: James Hageman Date: Mon, 14 Jan 2019 17:14:48 -0800 Subject: [PATCH] implement a circular channel buffer --- stripe.go | 62 ++++++++++++++++++++++++++++++++++---------------- stripe_test.go | 2 ++ 2 files changed, 45 insertions(+), 19 deletions(-) diff --git a/stripe.go b/stripe.go index bbc4f567f0..62f8d6fb5c 100644 --- a/stripe.go +++ b/stripe.go @@ -165,8 +165,8 @@ type BackendImplementation struct { // See also SetNetworkRetriesSleep. networkRetriesSleep bool - enableTelemetry bool - prevRequestMetrics chan requestMetrics + enableTelemetry bool + requestMetricsBuffer *ringBuffer } // Call is the Backend.Call implementation for invoking Stripe APIs. @@ -303,7 +303,7 @@ func (s *BackendImplementation) Do(req *http.Request, body *bytes.Buffer, v inte if s.enableTelemetry { select { - case metrics := <-s.prevRequestMetrics: + case metrics := <-s.requestMetricsBuffer.outputChannel: metricsJSON, err := json.Marshal(&requestTelemetry{LastRequestMetrics: metrics}) if err == nil { req.Header.Set("X-Stripe-Client-Telemetry", string(metricsJSON)) @@ -425,11 +425,7 @@ func (s *BackendImplementation) Do(req *http.Request, body *bytes.Buffer, v inte RequestDurationMS: requestDurationMS, } - select { - case s.prevRequestMetrics <- metrics: - default: - // buffer is full, discard. - } + s.requestMetricsBuffer.inputChannel <- metrics } } @@ -866,6 +862,11 @@ type requestTelemetry struct { LastRequestMetrics requestMetrics `json:"last_request_metrics"` } +// ringBuffer is a circular buffer of requestMetrics. +type ringBuffer struct { + inputChannel, outputChannel chan requestMetrics +} + // // Private variables // @@ -939,24 +940,47 @@ func isHTTPWriteMethod(method string) bool { // The vast majority of the time you should be calling GetBackendWithConfig // instead of this function. func newBackendImplementation(backendType SupportedBackend, config *BackendConfig) Backend { - var requestMetricsBuffer chan requestMetrics + var requestMetricsBuffer *ringBuffer // only allocate the requestMetrics buffer if client telemetry is enabled. if config.EnableTelemetry { - requestMetricsBuffer = make(chan requestMetrics, telemetryBufferSize) + requestMetricsBuffer = &ringBuffer{ + // inputChannel does not need to be buffered because ringBuffer#run() + // is always pulling objects off inputChannel and putting them in + // outputChannel. + inputChannel: make(chan requestMetrics), + outputChannel: make(chan requestMetrics, telemetryBufferSize), + } + + go requestMetricsBuffer.run() } return &BackendImplementation{ - HTTPClient: config.HTTPClient, - LogLevel: config.LogLevel, - Logger: config.Logger, - MaxNetworkRetries: config.MaxNetworkRetries, - Type: backendType, - URL: config.URL, - networkRetriesSleep: true, - enableTelemetry: config.EnableTelemetry, - prevRequestMetrics: requestMetricsBuffer, + HTTPClient: config.HTTPClient, + LogLevel: config.LogLevel, + Logger: config.Logger, + MaxNetworkRetries: config.MaxNetworkRetries, + Type: backendType, + URL: config.URL, + networkRetriesSleep: true, + enableTelemetry: config.EnableTelemetry, + requestMetricsBuffer: requestMetricsBuffer, + } +} + +// This is the bookkeeping method for ringBuffer, which should be run as a +// seperate goroutine. +func (r *ringBuffer) run() { + for v := range r.inputChannel { + select { + case r.outputChannel <- v: + default: + <-r.outputChannel + r.outputChannel <- v + } } + + close(r.outputChannel) } func normalizeURL(url string) string { diff --git a/stripe_test.go b/stripe_test.go index 00e93dc70a..0e73e843e2 100644 --- a/stripe_test.go +++ b/stripe_test.go @@ -249,6 +249,8 @@ func TestDo_TelemetryEnabled(t *testing.T) { // the first request should not receive any metrics assert.Equal(t, telemetryStr, "") case 2: + assert.True(t, len(telemetryStr) > 0, "telemetryStr should not be empty") + // the telemetry should properly unmarshal into stripe.RequestTelemetry var telemetry requestTelemetry err := json.Unmarshal([]byte(telemetryStr), &telemetry)