Skip to content

Commit

Permalink
Merge pull request #183 from tschottdorf/opsacrossranges
Browse files Browse the repository at this point in the history
Allow operations to cross range boundaries
  • Loading branch information
tbg committed Nov 26, 2014
2 parents b856d2c + 807efd4 commit 9741dc9
Show file tree
Hide file tree
Showing 6 changed files with 319 additions and 52 deletions.
119 changes: 94 additions & 25 deletions kv/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func (ds *DistSender) getRangeDescriptor(key proto.Key) ([]proto.RangeDescriptor

// sendRPC sends one or more RPCs to replicas from the supplied
// proto.Replica slice. First, replicas which have gossipped
// addresses are corraled and then sent via rpc.Send, with requirement
// addresses are corralled and then sent via rpc.Send, with requirement
// that one RPC to a server must succeed.
func (ds *DistSender) sendRPC(desc *proto.RangeDescriptor, method string, args proto.Request, reply proto.Response) error {
if len(desc.Replicas) == 0 {
Expand Down Expand Up @@ -302,6 +302,10 @@ func (ds *DistSender) sendRPC(desc *proto.RangeDescriptor, method string, args p
// permissions and looks up the appropriate range based on the
// supplied key and sends the RPC according to the specified
// options.
// If the request spans multiple ranges (which is possible for
// Scan or DeleteRange requests), Send sends requests to the
// individual ranges sequentially and combines the results
// transparently.
func (ds *DistSender) Send(call *client.Call) {
// Verify permissions.
if err := ds.verifyPermissions(call.Method, call.Args.Header()); err != nil {
Expand All @@ -312,33 +316,98 @@ func (ds *DistSender) Send(call *client.Call) {
// Retry logic for lookup of range by key and RPCs to range replicas.
retryOpts := rpcRetryOpts
retryOpts.Tag = fmt.Sprintf("routing %s rpc", call.Method)
err := util.RetryWithBackoff(retryOpts, func() (util.RetryStatus, error) {
desc, err := ds.rangeCache.LookupRangeDescriptor(call.Args.Header().Key)
if err == nil {
err = ds.sendRPC(desc, call.Method, call.Args, call.Reply)
}
if err != nil {
log.Warningf("failed to invoke %s: %s", call.Method, err)
// If retryable, allow outer loop to retry. We treat a range not found
// or range key mismatch errors special. In these cases, we don't want
// to backoff on the retry, but reset the backoff loop so we can retry
// immediately.
switch err.(type) {
case *proto.RangeNotFoundError, *proto.RangeKeyMismatchError:
// Range descriptor might be out of date - evict it.
ds.rangeCache.EvictCachedRangeDescriptor(call.Args.Header().Key)
// On addressing errors, don't backoff and retry immediately.
return util.RetryReset, nil
default:
if retryErr, ok := err.(util.Retryable); ok && retryErr.CanRetry() {
return util.RetryContinue, nil

// responses and descNext are only used when executing across ranges.
var responses []proto.Response
var descNext *proto.RangeDescriptor
// args will be changed to point to a copy of call.Args if the request
// spans ranges since in that case we need to alter its contents.
args := call.Args
for {
reply := call.Reply
err := util.RetryWithBackoff(retryOpts, func() (util.RetryStatus, error) {
descNext = nil
desc, err := ds.rangeCache.LookupRangeDescriptor(args.Header().Key)
if err == nil {
// If the request accesses keys beyond the end of this range,
// get the descriptor of the adjacent range to address next.
if desc.EndKey.Less(call.Args.Header().EndKey) {
if _, ok := call.Reply.(proto.Combinable); !ok {
return util.RetryBreak, util.Errorf("illegal cross-range operation", call)
}
// This next lookup is likely for free since we've read the
// previous descriptor and range lookups use cache
// prefetching.
descNext, err = ds.rangeCache.LookupRangeDescriptor(desc.EndKey)
// If this is the first step in a multi-range operation,
// additionally copy call.Args because we will have to
// mutate it as we talk to the involved ranges.
if len(responses) == 0 {
args = gogoproto.Clone(call.Args).(proto.Request)
}
// Truncate the request to our current range.
args.Header().EndKey = desc.EndKey
}
}
// true if we're dealing with a range-spanning request.
isMulti := len(responses) > 0 || descNext != nil
if err == nil {
if isMulti {
// Make a new reply object for this call.
reply = gogoproto.Clone(call.Reply).(proto.Response)
}
err = ds.sendRPC(desc, call.Method, args, reply)
}

if err != nil {
log.Warningf("failed to invoke %s: %s", call.Method, err)
// If retryable, allow retry. For range not found or range
// key mismatch errors, we don't backoff on the retry,
// but reset the backoff loop so we can retry immediately.
switch err.(type) {
case *proto.RangeNotFoundError, *proto.RangeKeyMismatchError:
// Range descriptor might be out of date - evict it.
ds.rangeCache.EvictCachedRangeDescriptor(args.Header().Key)
// On addressing errors, don't backoff and retry immediately.
return util.RetryReset, nil
default:
if retryErr, ok := err.(util.Retryable); ok && retryErr.CanRetry() {
return util.RetryContinue, nil
}
}
} else if isMulti {
// If this request spans ranges, collect the replies.
responses = append(responses, reply)
}
return util.RetryBreak, err
})
// Immediately return if querying a range failed non-retryably.
// For multi-range requests, we return the failing range's reply.
if err != nil {
reply.Header().SetGoError(err)
gogoproto.Merge(call.Reply, reply) // Only relevant in multi-range case.
return
}
return util.RetryBreak, err
})
if err != nil {
call.Reply.Header().SetGoError(err)
// If this was the last range accessed by this call, exit loop.
if descNext == nil {
break
}
// In next iteration, query next range.
args.Header().Key = descNext.StartKey
// "Untruncate" EndKey to original.
args.Header().EndKey = call.Args.Header().EndKey
}

// If this was a multi-range request, aggregate the individual range
// responses into one reply.
if len(responses) > 0 {
// We've already ascertained earlier that we're dealing with a
// Combinable response type.
firstReply := responses[0].(proto.Combinable)
for _, r := range responses[1:] {
firstReply.Combine(r)
}
gogoproto.Merge(call.Reply, responses[0])
}
}

Expand Down
2 changes: 2 additions & 0 deletions kv/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ func NewTxnCoordSender(wrapped client.KVSender, clock *hlc.Clock) *TxnCoordSende
// if it's not nil but has an empty ID.
func (tc *TxnCoordSender) Send(call *client.Call) {
header := call.Args.Header()
// TODO(Tobias): for commands that may access multiple ranges,
// make sure to wrap in a txn for consistency.
tc.maybeBeginTxn(header)

// Process batch specially; otherwise, send via wrapped sender.
Expand Down
47 changes: 45 additions & 2 deletions proto/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,12 @@ const (
// Delete removes the value for the specified key.
Delete = "Delete"
// DeleteRange removes all values for keys which fall between
// args.RequestHeader.Key and args.RequestHeader.EndKey.
// args.RequestHeader.Key and args.RequestHeader.EndKey, with
// the latter endpoint excluded.
DeleteRange = "DeleteRange"
// Scan fetches the values for all keys which fall between
// args.RequestHeader.Key and args.RequestHeader.EndKey.
// args.RequestHeader.Key and args.RequestHeader.EndKey, with
// the latter endpoint excluded.
Scan = "Scan"
// EndTransaction either commits or aborts an ongoing transaction.
EndTransaction = "EndTransaction"
Expand Down Expand Up @@ -459,6 +461,47 @@ type Response interface {
Verify(req Request) error
}

// Combinable is implemented by response types whose corresponding
// requests may cross range boundaries, such as Scan or DeleteRange.
// Combine() allows responses from individual ranges to be aggregated
// into a single one.
// It is not expected that Combine() perform any error checking; this
// should be done by the caller instead.
type Combinable interface {
Combine(Response)
}

// Combine is used by range-spanning Response types (e.g. Scan or DeleteRange)
// to merge their headers.
func (rh *ResponseHeader) Combine(otherRH *ResponseHeader) {
if rh != nil {
if ts := otherRH.GetTimestamp(); rh.Timestamp.Less(ts) {
rh.Timestamp = ts
}
if rh.Txn != nil && otherRH.GetTxn() == nil {
rh.Txn = nil
}
}
}

// Combine implements the Combinable interface for ScanResponse.
func (sr *ScanResponse) Combine(c Response) {
otherSR := c.(*ScanResponse)
if sr != nil {
sr.Rows = append(sr.Rows, otherSR.GetRows()...)
sr.Header().Combine(otherSR.Header())
}
}

// Combine implements the Combinable interface for DeleteRangeResponse.
func (dr *DeleteRangeResponse) Combine(c Response) {
otherDR := c.(*DeleteRangeResponse)
if dr != nil {
dr.NumDeleted += otherDR.GetNumDeleted()
dr.Header().Combine(otherDR.Header())
}
}

// Header implements the Request interface for RequestHeader.
func (rh *RequestHeader) Header() *RequestHeader {
return rh
Expand Down
77 changes: 77 additions & 0 deletions proto/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package proto

import (
"fmt"
"reflect"
"testing"
)
Expand Down Expand Up @@ -52,3 +53,79 @@ func TestResponseHeaderSetGoError(t *testing.T) {
t.Error("expected generic error to be retryable")
}
}

type XX interface {
Run()
}
type YY int

func (i YY) Run() {
fmt.Println(i)
}

// TestCombinable tests the correct behaviour of some types that implement
// the Combinable interface, notably {Scan,DeleteRange}Response and
// ResponseHeader.
func TestCombinable(t *testing.T) {
// Test that GetResponse doesn't have anything to do with Combinable.
if _, ok := interface{}(&GetResponse{}).(Combinable); ok {
t.Fatalf("GetResponse implements Combinable, so presumably all Response types will")
}
// Test that {Scan,DeleteRange}Response properly implement it.
sr1 := &ScanResponse{
ResponseHeader: ResponseHeader{Timestamp: MinTimestamp},
Rows: []KeyValue{
{Key: Key("A"), Value: Value{Bytes: []byte("V")}},
},
}

if _, ok := interface{}(sr1).(Combinable); !ok {
t.Fatalf("ScanResponse does not implement Combinable")
}

sr2 := &ScanResponse{
ResponseHeader: ResponseHeader{Timestamp: MinTimestamp},
Rows: []KeyValue{
{Key: Key("B"), Value: Value{Bytes: []byte("W")}},
},
}
sr2.Timestamp = MaxTimestamp

wantedSR := &ScanResponse{
ResponseHeader: ResponseHeader{Timestamp: MaxTimestamp},
Rows: append(append([]KeyValue(nil), sr1.Rows...), sr2.Rows...),
}

sr1.Combine(sr2)
sr1.Combine(&ScanResponse{})

if !reflect.DeepEqual(sr1, wantedSR) {
t.Errorf("wanted %v, got %v", wantedSR, sr1)
}

dr1 := &DeleteRangeResponse{
ResponseHeader: ResponseHeader{Timestamp: Timestamp{Logical: 100}},
NumDeleted: 5,
}
if _, ok := interface{}(dr1).(Combinable); !ok {
t.Fatalf("DeleteRangeResponse does not implement Combinable")
}
dr2 := &DeleteRangeResponse{
ResponseHeader: ResponseHeader{Timestamp: Timestamp{Logical: 1}},
NumDeleted: 12,
}
dr3 := &DeleteRangeResponse{
ResponseHeader: ResponseHeader{Timestamp: Timestamp{Logical: 111}},
NumDeleted: 3,
}
wantedDR := &DeleteRangeResponse{
ResponseHeader: ResponseHeader{Timestamp: Timestamp{Logical: 111}},
NumDeleted: 20,
}
dr2.Combine(dr3)
dr1.Combine(dr2)

if !reflect.DeepEqual(dr1, wantedDR) {
t.Errorf("wanted %v, got %v", wantedDR, dr1)
}
}
Loading

0 comments on commit 9741dc9

Please sign in to comment.