Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow for retries of single requests in a batch on failure #5355

Merged
merged 1 commit into from
Jan 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 129 additions & 70 deletions google/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,19 @@ package google
import (
"context"
"fmt"
"github.com/hashicorp/errwrap"
"log"
"sync"
"time"

"github.com/hashicorp/errwrap"
)

const defaultBatchSendIntervalSec = 3

// RequestBatcher is a global batcher object that keeps track of
// existing batches.
// In general, a batcher should be created per service that requires batching
// in order to prevent blocking batching for one service due to another,
// and to minimize the possibility of overlap in batchKey formats
// (see SendRequestWithTimeout)
// RequestBatcher keeps track of batched requests globally.
// It should be created at a provider level. In general, one
// should be created per service that requires batching to:
// - prevent blocking batching for one service due to another,
// - minimize the possibility of overlap in batchKey formats (see SendRequestWithTimeout)
type RequestBatcher struct {
sync.Mutex

Expand All @@ -27,56 +25,74 @@ type RequestBatcher struct {
debugId string
}

// BatchRequest represents a single request to a global batcher.
type BatchRequest struct {
// ResourceName represents the underlying resource for which
// a request is made. Its format is determined by what SendF expects, but
// typically should be the name of the parent GCP resource being changed.
ResourceName string

// Body is this request's data to be passed to SendF, and may be combined
// with other bodies using CombineF.
Body interface{}

// CombineF function determines how to combine bodies from two batches.
CombineF batcherCombineFunc

// SendF function determines how to actually send a batched request to a
// third party service. The arguments given to this function are
// (ResourceName, Body) where Body may have been combined with other request
// Bodies.
SendF batcherSendFunc

// ID for debugging request. This should be specific to a single request
// (i.e. per Terraform resource)
DebugId string
}

// These types are meant to be the public interface to batchers. They define
// logic to manage batch data type and behavior, and require service-specific
// implementations per type of request per service.
// Function type for combine existing batches and additional batch data
type batcherCombineFunc func(body interface{}, toAdd interface{}) (interface{}, error)
// batch data format and logic to send/combine batches, i.e. they require
// specific implementations per type of request.
type (
// BatchRequest represents a single request to a global batcher.
BatchRequest struct {
// ResourceName represents the underlying resource for which
// a request is made. Its format is determined by what SendF expects, but
// typically should be the name of the parent GCP resource being changed.
ResourceName string

// Body is this request's data to be passed to SendF, and may be combined
// with other bodies using CombineF.
Body interface{}

// CombineF function determines how to combine bodies from two batches.
CombineF BatcherCombineFunc

// SendF function determines how to actually send a batched request to a
// third party service. The arguments given to this function are
// (ResourceName, Body) where Body may have been combined with other request
// Bodies.
SendF BatcherSendFunc

// ID for debugging request. This should be specific to a single request
// (i.e. per Terraform resource)
DebugId string
}

// Function type for sending a batch request
type batcherSendFunc func(resourceName string, body interface{}) (interface{}, error)
// BatcherCombineFunc is a function type for combining existing batches and additional batch data
BatcherCombineFunc func(body interface{}, toAdd interface{}) (interface{}, error)

// BatcherSendFunc is a function type for sending a batch request
BatcherSendFunc func(resourceName string, body interface{}) (interface{}, error)
)

// batchResponse bundles an API response (data, error) tuple so that both
// values can be passed together, e.g. over a single channel.
type batchResponse struct {
// body is the response data of the request (nil when none was returned).
body interface{}
// err is the error of the request; nil means the request succeeded.
err error
}

// startedBatch refers to a processed batch whose timer to send the request has
// already been started. The responses for the request is sent to each listener
// channel, representing parallel callers that are waiting on requests
// combined into this batch.
func (r *batchResponse) IsError() bool {
	// A response counts as failed exactly when an error was recorded on it.
	return r.err != nil
}

// startedBatch refers to a registered batch to group batch requests coming in.
// The timer manages the time after which a given batch is sent.
type startedBatch struct {
batchKey string

// Combined Batch Request
*BatchRequest

listeners []chan batchResponse
timer *time.Timer
// subscribers is a registry of the requests (batchSubscriber) combined into this batch.

subscribers []batchSubscriber

timer *time.Timer
}

// batchSubscriber contains information required for a single request for a startedBatch:
// the request itself and the channel on which its result is delivered.
type batchSubscriber struct {
// singleRequest is the original (uncombined) request this subscriber represents.
singleRequest *BatchRequest

// respCh is the channel created to communicate the result to a waiting goroutine.
respCh chan batchResponse
}

// batchingConfig contains user configuration for controlling batch requests.
Expand All @@ -94,8 +110,12 @@ func NewRequestBatcher(debugId string, ctx context.Context, config *batchingConf
batches: make(map[string]*startedBatch),
}

// Start a goroutine to manage stopping the batcher if the provider-level parent context is closed.
go func(b *RequestBatcher) {
<-ctx.Done()
// Block until parent context is closed
<-b.parentCtx.Done()

log.Printf("[DEBUG] parent context canceled, cleaning up batcher batches")
b.stop()
}(batcher)

Expand All @@ -108,19 +128,19 @@ func (b *RequestBatcher) stop() {

log.Printf("[DEBUG] Stopping batcher %q", b.debugId)
for batchKey, batch := range b.batches {
log.Printf("[DEBUG] Cleaning up batch request %q", batchKey)
log.Printf("[DEBUG] Cancelling started batch for batchKey %q", batchKey)
batch.timer.Stop()
for _, l := range batch.listeners {
close(l)
for _, l := range batch.subscribers {
close(l.respCh)
}
}
}

// SendRequestWithTimeout is expected to be called per parallel call.
// It manages waiting on the result of a batch request.
// SendRequestWithTimeout is a blocking call for making a single request, run alone or as part of a batch.
// It manages registering the single request with the batcher and waiting on the result.
//
// Batch requests are grouped by the given batchKey. batchKey
// should be unique to the API request being sent, most likely similar to
// Params:
// batchKey: A string to group batchable requests. It should be unique to the API request being sent, similar to
// the HTTP request URL with GCP resource ID included in the URL (the caller
// may choose to use a key with method if needed to diff GET/read and
// POST/create)
Expand Down Expand Up @@ -179,40 +199,75 @@ func (b *RequestBatcher) registerBatchRequest(batchKey string, newRequest *Batch
return batch.addRequest(newRequest)
}

// Batch doesn't exist for given batch key - create a new batch.

log.Printf("[DEBUG] Creating new batch %q from request %q", newRequest.DebugId, batchKey)

// The calling goroutine will need a channel to wait on for a response.
respCh := make(chan batchResponse, 1)
sub := batchSubscriber{
singleRequest: newRequest,
respCh: respCh,
}

// Create a new batch.
// Create a new batch with copy of the given batch request.
b.batches[batchKey] = &startedBatch{
BatchRequest: newRequest,
batchKey: batchKey,
listeners: []chan batchResponse{respCh},
BatchRequest: &BatchRequest{
ResourceName: newRequest.ResourceName,
Body: newRequest.Body,
CombineF: newRequest.CombineF,
SendF: newRequest.SendF,
DebugId: fmt.Sprintf("Combined batch for started batch %q", batchKey),
},
batchKey: batchKey,
subscribers: []batchSubscriber{sub},
}

// Start a timer to send the request
b.batches[batchKey].timer = time.AfterFunc(b.sendAfter, func() {
batch := b.popBatch(batchKey)

var resp batchResponse
if batch == nil {
log.Printf("[DEBUG] Batch not found in saved batches, running single request batch %q", batchKey)
resp = newRequest.send()
log.Printf("[ERROR] batch should have been added to saved batches - just run as single request %q", newRequest.DebugId)
respCh <- newRequest.send()
close(respCh)
} else {
log.Printf("[DEBUG] Sending batch %q combining %d requests)", batchKey, len(batch.listeners))
resp = batch.send()
}

// Send message to all goroutines waiting on result.
for _, ch := range batch.listeners {
ch <- resp
close(ch)
b.sendBatchWithSingleRetry(batchKey, batch)
}
})

return respCh, nil
}

// sendBatchWithSingleRetry sends the combined request of the given batch and
// delivers a response to every subscriber of that batch.
//
// If the combined request fails and the batch combines more than one request,
// each subscriber's original request is re-sent on its own, one after another,
// and each subscriber receives the result of its own retry. An error returned
// by a retry is wrapped to show that both the batch and the retry failed.
// Otherwise (the batch succeeded, or it has at most one subscriber) every
// subscriber receives the combined response unchanged.
//
// Every subscriber's response channel is sent exactly one value and is then
// closed.
func (b *RequestBatcher) sendBatchWithSingleRetry(batchKey string, batch *startedBatch) {
	log.Printf("[DEBUG] Sending batch %q combining %d requests", batchKey, len(batch.subscribers))
	resp := batch.send()

	// Nothing to retry: either the batch succeeded, or it does not combine
	// multiple requests, so there is nothing to split apart.
	if !resp.IsError() || len(batch.subscribers) <= 1 {
		// Send result to all subscribers
		for _, sub := range batch.subscribers {
			sub.respCh <- resp
			close(sub.respCh)
		}
		return
	}

	// The batch failed and combines more than one request, so retry each
	// single request separately to keep one bad request from failing the rest.
	log.Printf("[DEBUG] Batch failed with error: %v", resp.err)
	log.Printf("[DEBUG] Sending each request in batch separately")
	for _, sub := range batch.subscribers {
		log.Printf("[DEBUG] Retrying single request %q", sub.singleRequest.DebugId)
		singleResp := sub.singleRequest.send()
		log.Printf("[DEBUG] Retried single request %q returned response: %v", sub.singleRequest.DebugId, singleResp)

		if singleResp.IsError() {
			singleResp.err = errwrap.Wrapf(
				"batch request and retry as single request failed - final error: {{err}}",
				singleResp.err)
		}
		sub.respCh <- singleResp
		close(sub.respCh)
	}
}

// popBatch safely gets and removes a batch with given batchkey from the
// RequestBatcher's started batches.
func (b *RequestBatcher) popBatch(batchKey string) *startedBatch {
Expand Down Expand Up @@ -243,7 +298,11 @@ func (batch *startedBatch) addRequest(newRequest *BatchRequest) (<-chan batchRes
log.Printf("[DEBUG] Added batch request %q to batch. New batch body: %v", newRequest.DebugId, batch.Body)

respCh := make(chan batchResponse, 1)
batch.listeners = append(batch.listeners, respCh)
sub := batchSubscriber{
singleRequest: newRequest,
respCh: respCh,
}
batch.subscribers = append(batch.subscribers, sub)
return respCh, nil
}

Expand Down
57 changes: 40 additions & 17 deletions google/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"log"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -139,41 +140,63 @@ func TestRequestBatcher_errInSend(t *testing.T) {
enableBatching: true,
})

testResource := "resource for send error"
sendErrTmpl := "this is an expected error in send batch for resource %q"
// combineF keeps track of the batched indexes
testCombine := func(body interface{}, toAdd interface{}) (interface{}, error) {
return append(body.([]int), toAdd.([]int)...), nil
}

// combineF is no-op
testCombine := func(_ interface{}, _ interface{}) (interface{}, error) {
failIdx := 0
testResource := "RESOURCE-SEND-ERROR"
expectedErrMsg := fmt.Sprintf("Error - batch %q contains idx %d", testResource, failIdx)

testSendBatch := func(resourceName string, body interface{}) (interface{}, error) {
log.Printf("[DEBUG] sendBatch body: %+v", body)
for _, v := range body.([]int) {
if v == failIdx {
return nil, fmt.Errorf(expectedErrMsg)
}
}
return nil, nil
}

testSendBatch := func(resourceName string, cnt interface{}) (interface{}, error) {
return cnt, fmt.Errorf(sendErrTmpl, resourceName)
}
numRequests := 3

wg := sync.WaitGroup{}
wg.Add(2)
wg.Add(numRequests)

for i := 0; i < 2; i++ {
for i := 0; i < numRequests; i++ {
go func(idx int) {
defer wg.Done()

req := &BatchRequest{
DebugId: fmt.Sprintf("sendError %d", idx),
ResourceName: testResource,
Body: nil,
Body: []int{idx},
CombineF: testCombine,
SendF: testSendBatch,
}

_, err := testBatcher.SendRequestWithTimeout("batchSendError", req, time.Duration(10)*time.Second)
if err == nil {
t.Errorf("expected error, got none")
return
}
expectedErr := fmt.Sprintf(sendErrTmpl, testResource)
if !strings.Contains(err.Error(), fmt.Sprintf(sendErrTmpl, testResource)) {
t.Errorf("expected error %q, got error: %v", expectedErr, err)
// Requests without index 0 should have succeeded
if idx == failIdx {
// We expect an error
if err == nil {
t.Errorf("expected error for request %d, got none", idx)
}
// Check error message
expectedErrPrefix := "batch request and retry as single request failed - final error: "
if !strings.Contains(err.Error(), expectedErrPrefix) {
t.Errorf("expected error %q to contain %q", err, expectedErrPrefix)
}
if !strings.Contains(err.Error(), expectedErrMsg) {
t.Errorf("expected error %q to contain %q", err, expectedErrMsg)
}
} else {

// We shouldn't get error for non-failure index
if err != nil {
t.Errorf("expected request %d to succeed, got error: %v", idx, err)
}
}
}(i)
}
Expand Down
2 changes: 1 addition & 1 deletion google/iam_batching.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func combineBatchIamPolicyModifiers(currV interface{}, toAddV interface{}) (inte
return append(currModifiers, newModifiers...), nil
}

func sendBatchModifyIamPolicy(updater ResourceIamUpdater) batcherSendFunc {
func sendBatchModifyIamPolicy(updater ResourceIamUpdater) BatcherSendFunc {
return func(resourceName string, body interface{}) (interface{}, error) {
modifiers, ok := body.([]iamPolicyModifyFunc)
if !ok {
Expand Down
2 changes: 1 addition & 1 deletion google/resource_google_project_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func resourceGoogleProjectServiceCreate(d *schema.ResourceData, meta interface{}
}

srv := d.Get("service").(string)
err = BatchRequestEnableServices(map[string]struct{}{srv: {}}, project, d, config)
err = BatchRequestEnableService(srv, project, d, config)
if err != nil {
return err
}
Expand Down
Loading