Merge pull request #1396 from tamird/retry-for-loop
Rewrite util/retry as a for-loop
tamird committed Jun 29, 2015
2 parents dba61d0 + a4b984e commit 830e67a
Showing 23 changed files with 435 additions and 452 deletions.
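Before the per-file hunks, here is a minimal sketch of the for-loop idiom this commit introduces, pieced together from the changes below. The `retry.Options` field names (`InitialBackoff`, `MaxBackoff`, `Multiplier`, `MaxRetries`, `Stopper`) and the `retry.Start` / `r.Next()` / `r.Reset()` calls all appear in the diff; the flaky operation and the `main` wrapper are illustrative only.

```go
package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/cockroachdb/cockroach/util/retry"
)

// calls counts invocations of flakyOp, a hypothetical operation that fails
// twice before succeeding, just to drive the retry loop.
var calls int

func flakyOp() error {
	calls++
	if calls < 3 {
		return errors.New("transient failure")
	}
	return nil
}

func main() {
	opts := retry.Options{
		InitialBackoff: 50 * time.Millisecond, // first backoff
		MaxBackoff:     5 * time.Second,       // cap on the backoff
		Multiplier:     2,                     // backoff doubles each retry
		MaxRetries:     5,                     // retries after the first attempt; the defaults below omit it to retry indefinitely
	}

	var err error
	for r := retry.Start(opts); r.Next(); {
		if err = flakyOp(); err != nil {
			continue // retryable: back off and try again
		}
		break // success: leave the loop
	}
	fmt.Println("final error:", err, "after", calls, "calls")
}
```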
14 changes: 7 additions & 7 deletions client/client_test.go
@@ -100,10 +100,10 @@ func TestClientRetryNonTxn(t *testing.T) {
s := server.StartTestServer(t)
defer s.Stop()
s.SetRangeRetryOptions(retry.Options{
Backoff: 1 * time.Millisecond,
MaxBackoff: 5 * time.Millisecond,
Constant: 2,
MaxAttempts: 2,
InitialBackoff: 1 * time.Millisecond,
MaxBackoff: 5 * time.Millisecond,
Multiplier: 2,
MaxRetries: 1,
})

testCases := []struct {
@@ -213,10 +213,10 @@ func TestClientRetryNonTxn(t *testing.T) {
}

func setTxnRetryBackoff(backoff time.Duration) func() {
savedBackoff := client.DefaultTxnRetryOptions.Backoff
client.DefaultTxnRetryOptions.Backoff = backoff
savedBackoff := client.DefaultTxnRetryOptions.InitialBackoff
client.DefaultTxnRetryOptions.InitialBackoff = backoff
return func() {
client.DefaultTxnRetryOptions.Backoff = savedBackoff
client.DefaultTxnRetryOptions.InitialBackoff = savedBackoff
}
}

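Worth noting in the test hunks above: `MaxAttempts: 2` becomes `MaxRetries: 1`, which suggests the new field counts retries after the initial attempt rather than total attempts. A hedged translation of the old options under that reading (the variable name is illustrative):

```go
package client

import (
	"time"

	"github.com/cockroachdb/cockroach/util/retry"
)

// rangeRetryOpts translates the old options used in this test, assuming
// MaxRetries counts retries after the first attempt.
var rangeRetryOpts = retry.Options{
	InitialBackoff: 1 * time.Millisecond, // was Backoff
	MaxBackoff:     5 * time.Millisecond, // unchanged
	Multiplier:     2,                    // was Constant
	MaxRetries:     1,                    // was MaxAttempts: 2
}
```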
58 changes: 35 additions & 23 deletions client/http_post.go
@@ -29,6 +29,7 @@ import (
snappy "github.com/cockroachdb/c-snappy"
"github.com/cockroachdb/cockroach/base"
"github.com/cockroachdb/cockroach/util"
"github.com/cockroachdb/cockroach/util/log"
"github.com/cockroachdb/cockroach/util/retry"
gogoproto "github.com/gogo/protobuf/proto"
)
@@ -55,36 +56,41 @@ type PostContext struct {
// retry here to eventually get through with the same client command ID and be
// given the cached response.
func HTTPPost(c PostContext, request, response gogoproto.Message, method fmt.Stringer) error {
retryOpts := c.RetryOpts
retryOpts.Tag = fmt.Sprintf("%s %s", c.Context.RequestScheme(), method)

// Marshal the args into a request body.
body, err := gogoproto.Marshal(request)
if err != nil {
return err
}

client, err := c.Context.GetHTTPClient()
if err != nil {
return err
}

url := c.Context.RequestScheme() + "://" + c.Server + c.Endpoint + method.String()

return retry.WithBackoff(retryOpts, func() (retry.Status, error) {
req, err := http.NewRequest("POST", url, bytes.NewReader(body))
var (
req *http.Request
resp *http.Response
b []byte
)

for r := retry.Start(c.RetryOpts); r.Next(); {
req, err = http.NewRequest("POST", url, bytes.NewReader(body))
if err != nil {
return retry.Break, err
return err
}
req.Header.Add(util.ContentTypeHeader, util.ProtoContentType)
req.Header.Add(util.AcceptHeader, util.ProtoContentType)
req.Header.Add(util.AcceptEncodingHeader, util.SnappyEncoding)

client, err := c.Context.GetHTTPClient()
resp, err = client.Do(req)
if err != nil {
return retry.Break, err
if log.V(1) {
log.Warning(err)
}
continue
}

resp, err := client.Do(req)
if err != nil {
return retry.Continue, err
}

defer resp.Body.Close()

switch resp.StatusCode {
Expand All @@ -94,27 +100,33 @@ func HTTPPost(c PostContext, request, response gogoproto.Message, method fmt.Str
// Retry on service unavailable and request timeout.
// TODO(spencer): consider respecting the Retry-After header for
// backoff / retry duration.
return retry.Continue, errors.New(resp.Status)
continue
default:
// Can't recover from all other errors.
return retry.Break, errors.New(resp.Status)
return errors.New(resp.Status)
}

if resp.Header.Get(util.ContentEncodingHeader) == util.SnappyEncoding {
resp.Body = &snappyReader{body: resp.Body}
}

b, err := ioutil.ReadAll(resp.Body)
b, err = ioutil.ReadAll(resp.Body)
if err != nil {
return retry.Continue, err
if log.V(1) {
log.Warning(err)
}
continue
}

if err := gogoproto.Unmarshal(b, response); err != nil {
return retry.Continue, err
if err = gogoproto.Unmarshal(b, response); err != nil {
if log.V(1) {
log.Warning(err)
}
continue
}

return retry.Break, nil
})
break
}
return err
}

// snappyReader wraps a response body so it can lazily
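The `HTTPPost` rewrite above settles into one control-flow pattern: unrecoverable errors `return` out of the function, transient ones log under `log.V(1)` and `continue`, and success `break`s with `err` left nil so the value after the loop is what gets returned. A generic sketch of that shape, assuming this is the intended idiom; `postWithRetry`, `attempt`, and `isFatal` are hypothetical names, not part of the patch:

```go
package client

import (
	"github.com/cockroachdb/cockroach/util/log"
	"github.com/cockroachdb/cockroach/util/retry"
)

// postWithRetry mirrors the loop shape used by HTTPPost: bail out on fatal
// errors, log and retry transient ones, and break on success so the final
// err value is returned once the loop ends.
func postWithRetry(opts retry.Options, attempt func() error, isFatal func(error) bool) error {
	var err error
	for r := retry.Start(opts); r.Next(); {
		if err = attempt(); err != nil {
			if isFatal(err) {
				return err // can't recover; stop retrying
			}
			if log.V(1) {
				log.Warning(err)
			}
			continue // transient: back off and try again
		}
		break // success
	}
	return err
}
```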
4 changes: 2 additions & 2 deletions client/http_sender_test.go
@@ -106,7 +106,7 @@ func TestHTTPSenderSend(t *testing.T) {
func TestHTTPSenderRetryResponseCodes(t *testing.T) {
defer leaktest.AfterTest(t)
retryOptions := defaultRetryOptions
retryOptions.Backoff = 1 * time.Millisecond
retryOptions.InitialBackoff = 1 * time.Millisecond

testCases := []struct {
code int
@@ -174,7 +174,7 @@ func TestHTTPSenderRetryResponseCodes(t *testing.T) {
func TestHTTPSenderRetryHTTPSendError(t *testing.T) {
defer leaktest.AfterTest(t)
retryOptions := defaultRetryOptions
retryOptions.Backoff = 1 * time.Millisecond
retryOptions.InitialBackoff = 1 * time.Millisecond

testCases := []func(*httptest.Server, http.ResponseWriter){
// Send back an unparseable response but a success code on first try.
22 changes: 11 additions & 11 deletions client/rpc/rpc_sender.go
@@ -18,7 +18,6 @@
package rpc

import (
"fmt"
"net"
"net/url"

@@ -79,32 +78,33 @@ func newSender(server string, context *base.Context, retryOpts retry.Options) (*
// through and been executed successfully. We retry here to eventually get
// through with the same client command ID and be given the cached response.
func (s *Sender) Send(_ context.Context, call proto.Call) {
retryOpts := s.retryOpts
retryOpts.Tag = fmt.Sprintf("rpc %s", call.Method())

if err := retry.WithBackoff(retryOpts, func() (retry.Status, error) {
var err error
for r := retry.Start(s.retryOpts); r.Next(); {
if !s.client.IsHealthy() {
return retry.Continue, nil
log.Warningf("client %s is unhealthy; retrying", s.client)
continue
}

method := call.Args.Method().String()
c := s.client.Go("Server."+method, call.Args, call.Reply, nil)
<-c.Done
if c.Error != nil {
err = c.Error
if err != nil {
// Assume all errors sending request are retryable. The actual
// number of things that could go wrong is vast, but we don't
// want to miss any which should in theory be retried with the
// same client command ID. We log the error here as a warning so
// there's visibility that this is happening. Some of the errors
// we'll sweep up in this net shouldn't be retried, but we can't
// really know for sure which.
log.Warningf("failed to send RPC request %s: %v", method, c.Error)
return retry.Continue, nil
log.Warningf("failed to send RPC request %s: %v", method, err)
continue
}

// On successful post, we're done with retry loop.
return retry.Break, nil
}); err != nil {
break
}
if err != nil {
call.Reply.Header().SetGoError(err)
}
}
8 changes: 3 additions & 5 deletions client/sender.go
@@ -34,11 +34,9 @@ import (
// defaultRetryOptions sets the retry options for handling retryable errors and
// connection I/O errors.
var defaultRetryOptions = retry.Options{
Backoff: 50 * time.Millisecond,
MaxBackoff: 5 * time.Second,
Constant: 2,
MaxAttempts: 0, // retry indefinitely
UseV1Info: true,
InitialBackoff: 50 * time.Millisecond,
MaxBackoff: 5 * time.Second,
Multiplier: 2,
}

// Sender is an interface for sending a request to a Key-Value
34 changes: 16 additions & 18 deletions client/txn.go
@@ -35,11 +35,9 @@ var (
// for transactions.
// This is exported for testing purposes only.
DefaultTxnRetryOptions = retry.Options{
Backoff: 50 * time.Millisecond,
MaxBackoff: 5 * time.Second,
Constant: 2,
MaxAttempts: 0, // retry indefinitely
UseV1Info: true,
InitialBackoff: 50 * time.Millisecond,
MaxBackoff: 5 * time.Second,
Multiplier: 2,
}
)

@@ -268,15 +266,12 @@ func (txn *Txn) Commit(b *Batch) error {
return txn.Run(b)
}

func (txn *Txn) exec(retryable func(txn *Txn) error) error {
func (txn *Txn) exec(retryable func(txn *Txn) error) (err error) {
// Run retryable in a retry loop until we encounter a success or
// error condition this loop isn't capable of handling.
retryOpts := txn.db.txnRetryOptions
retryOpts.Tag = txn.txn.Name
err := retry.WithBackoff(retryOpts, func() (retry.Status, error) {
for r := retry.Start(txn.db.txnRetryOptions); r.Next(); {
txn.haveTxnWrite, txn.haveEndTxn = false, false // always reset before [re]starting txn
err := retryable(txn)
if err == nil {
if err = retryable(txn); err == nil {
if !txn.haveEndTxn && txn.haveTxnWrite {
// If there were no errors running retryable, commit the txn. This
// may block waiting for outstanding writes to complete in case
@@ -288,25 +283,28 @@ func (txn *Txn) exec(retryable func(txn *Txn) error) error {
}
}
if restartErr, ok := err.(proto.TransactionRestartError); ok {
if log.V(2) {
log.Warning(err)
}
if restartErr.CanRestartTransaction() == proto.TransactionRestart_IMMEDIATE {
return retry.Reset, err
r.Reset()
continue
} else if restartErr.CanRestartTransaction() == proto.TransactionRestart_BACKOFF {
return retry.Continue, err
continue
}
// By default, fall through and return Break.
// By default, fall through and break.
}
return retry.Break, err
})
break
}
if err != nil && txn.haveTxnWrite {
if replyErr := txn.send(proto.Call{
Args: &proto.EndTransactionRequest{Commit: false},
Reply: &proto.EndTransactionResponse{},
}); replyErr != nil {
log.Errorf("failure aborting transaction: %s; abort caused by: %s", replyErr, err)
}
return err
}
return err
return
}

// send runs the specified calls synchronously in a single batch and
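The `exec` rewrite above replaces the old `retry.Reset` status with an explicit `r.Reset()` call, which presumably restarts the backoff schedule from `InitialBackoff` when a transaction can be restarted immediately. A minimal sketch of that branch structure; the attempt and classification helpers are assumptions, not part of the patch:

```go
package client

import "github.com/cockroachdb/cockroach/util/retry"

// runWithRestarts mirrors the structure of (*Txn).exec: errors that allow an
// immediate restart reset the backoff and retry, backoff-restartable errors
// simply retry, and anything else falls out of the loop with err set.
func runWithRestarts(opts retry.Options, attempt func() error,
	restartNow, restartWithBackoff func(error) bool) error {
	var err error
	for r := retry.Start(opts); r.Next(); {
		if err = attempt(); err == nil {
			break // success
		}
		if restartNow(err) {
			r.Reset() // start the backoff schedule over
			continue
		}
		if restartWithBackoff(err) {
			continue // keep the grown backoff
		}
		break // unrecoverable
	}
	return err
}
```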
2 changes: 1 addition & 1 deletion client/txn_test.go
@@ -275,7 +275,7 @@ func TestRunTransactionRetryOnErrors(t *testing.T) {
}
}
}))
db.txnRetryOptions.Backoff = 1 * time.Millisecond
db.txnRetryOptions.InitialBackoff = 1 * time.Millisecond
err := db.Txn(func(txn *Txn) error {
return txn.Put("a", "b")
})
17 changes: 7 additions & 10 deletions gossip/gossip.go
@@ -546,29 +546,26 @@ func (g *Gossip) maybeWarnAboutInit(stopper *util.Stopper) {
case <-time.After(5 * time.Second):
}
retryOptions := retry.Options{
Tag: "check cluster initialization",
Backoff: 5 * time.Second, // first backoff at 5s
MaxBackoff: 60 * time.Second, // max backoff is 60s
Constant: 2, // doubles
MaxAttempts: 0, // indefinite retries
Stopper: stopper, // stop no matter what on stopper
InitialBackoff: 5 * time.Second, // first backoff at 5s
MaxBackoff: 60 * time.Second, // max backoff is 60s
Multiplier: 2, // doubles
Stopper: stopper, // stop no matter what on stopper
}
// will never error because infinite retries
_ = retry.WithBackoff(retryOptions, func() (retry.Status, error) {
for r := retry.Start(retryOptions); r.Next(); {
g.mu.Lock()
hasSentinel := g.is.getInfo(KeySentinel) != nil
g.mu.Unlock()
// If we have the sentinel, exit the retry loop.
if hasSentinel {
return retry.Break, nil
break
}
// Otherwise, if all bootstrap hosts are connected, warn.
if g.triedAll {
log.Warningf("connected to gossip but missing sentinel. Has the cluster been initialized? " +
"Use \"cockroach init\" to initialize.")
}
return retry.Continue, nil
})
}
})
}

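The gossip check above retries indefinitely (no `MaxRetries`) and is bounded only by the `Stopper`; presumably `r.Next()` returns false once the stopper fires, which is why the old "will never error because infinite retries" comment and the discarded return value disappear. A small sketch of that shape, where `warnUntilReady`, `haveSentinel`, and the warning text are placeholders for the real checks:

```go
package gossip

import (
	"time"

	"github.com/cockroachdb/cockroach/util"
	"github.com/cockroachdb/cockroach/util/log"
	"github.com/cockroachdb/cockroach/util/retry"
)

// warnUntilReady polls a condition with exponential backoff until it holds
// or the stopper shuts the loop down.
func warnUntilReady(stopper *util.Stopper, haveSentinel func() bool) {
	opts := retry.Options{
		InitialBackoff: 5 * time.Second,  // first backoff at 5s
		MaxBackoff:     60 * time.Second, // max backoff is 60s
		Multiplier:     2,                // doubles
		Stopper:        stopper,          // assumed: Next returns false once the stopper fires
	}
	for r := retry.Start(opts); r.Next(); {
		if haveSentinel() {
			break
		}
		log.Warning("still waiting for cluster initialization")
	}
}
```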
(Diffs for the remaining 15 changed files are not shown here.)
