Merge pull request #2663 from tschottdorf/coord
fix some minor kv TODOs
tbg committed Sep 25, 2015
2 parents e5e3e68 + 4c3d6fe commit 9529d0d
Showing 5 changed files with 73 additions and 105 deletions.
62 changes: 36 additions & 26 deletions kv/txn_coord_sender.go
@@ -365,16 +365,17 @@ func (tc *TxnCoordSender) Send(ctx context.Context, ba proto.BatchRequest) (*pro
// Remember when EndTransaction started in case we want to
// be linearizable.
startNS = tc.clock.PhysicalNow()
// EndTransaction must have its key set to that of the txn.
// TODO(tschottdorf): Should remove this here and make sure
// the client does it properly.
et.Key = ba.Txn.Key
if len(et.Intents) > 0 {
// TODO(tschottdorf): it may be useful to allow this later.
// That would be part of a possible plan to allow txns which
// write on multiple coordinators.
return nil, proto.NewError(util.Errorf("client must not pass intents to EndTransaction"))
}
if len(et.Key) != 0 {
return nil, proto.NewError(util.Errorf("EndTransaction must not have a Key set"))
}
et.Key = ba.Txn.Key

tc.Lock()
txnMeta, metaOK := tc.txns[id]
if id != "" && metaOK {
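The net effect of this first hunk is an inverted contract: rather than silently overwriting whatever key the client set on EndTransaction, the coordinator now rejects a client-supplied key and fills it in from the transaction record itself. A minimal sketch of the new contract, using simplified stand-ins for the proto types rather than the real definitions:

package main

import (
	"errors"
	"fmt"
)

// Simplified stand-ins for the real proto types.
type Key []byte

type Transaction struct{ Key Key }

type EndTransactionRequest struct {
	Key     Key
	Intents []Key
}

// prepareEndTxn mirrors the checks above: the client supplies neither
// intents nor a key, and the coordinator derives the key from the txn.
func prepareEndTxn(et *EndTransactionRequest, txn *Transaction) error {
	if len(et.Intents) > 0 {
		return errors.New("client must not pass intents to EndTransaction")
	}
	if len(et.Key) != 0 {
		return errors.New("EndTransaction must not have a Key set")
	}
	et.Key = txn.Key
	return nil
}

func main() {
	txn := &Transaction{Key: Key("txn-anchor")}

	// Correct usage: leave Key empty; the coordinator sets it.
	et := &EndTransactionRequest{}
	fmt.Println(prepareEndTxn(et, txn), string(et.Key)) // <nil> txn-anchor

	// A client-supplied key is now an error instead of being clobbered.
	bad := &EndTransactionRequest{Key: Key("elsewhere")}
	fmt.Println(prepareEndTxn(bad, txn))
}

This is also why the test changes below drop the Key: txn.Key field from their EndTransactionRequest headers.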
@@ -513,10 +514,8 @@ func (tc *TxnCoordSender) cleanupTxn(trace *tracer.Trace, txn proto.Transaction)
}

// unregisterTxn deletes a txnMetadata object from the sender
// and collects its stats.
func (tc *TxnCoordSender) unregisterTxn(id string) {
tc.Lock()
defer tc.Unlock()
// and collects its stats. It assumes the lock is held.
func (tc *TxnCoordSender) unregisterTxnLocked(id string) {
txnMeta := tc.txns[id] // guaranteed to exist
if txnMeta == nil {
panic("attempt to unregister non-existent transaction: " + id)
@@ -548,7 +547,11 @@ func (tc *TxnCoordSender) heartbeatLoop(id string) {
tickChan = ticker.C
defer ticker.Stop()
}
defer tc.unregisterTxn(id)
defer func() {
tc.Lock()
tc.unregisterTxnLocked(id)
tc.Unlock()
}()

var closer <-chan struct{}
var trace *tracer.Trace
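These two hunks move lock management out of the helper: the new Locked suffix on unregisterTxnLocked signals that the caller must already hold the sender's mutex, so heartbeatLoop's deferred cleanup now takes the lock explicitly, and updateState below can call the helper from inside its own critical section without deadlocking. A minimal sketch of the convention, with hypothetical names in place of the real types:

package main

import (
	"fmt"
	"sync"
)

type coordSender struct {
	sync.Mutex
	txns map[string]struct{}
}

// unregisterTxnLocked assumes the mutex is already held, so it can be
// composed with other work inside a single critical section.
func (c *coordSender) unregisterTxnLocked(id string) {
	delete(c.txns, id)
}

// Callers that are not yet in a critical section wrap the call
// themselves, mirroring the deferred func in heartbeatLoop above.
func (c *coordSender) unregisterTxn(id string) {
	c.Lock()
	defer c.Unlock()
	c.unregisterTxnLocked(id)
}

func main() {
	c := &coordSender{txns: map[string]struct{}{"abc": {}}}
	c.unregisterTxn("abc")
	fmt.Println(len(c.txns)) // 0
}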
@@ -701,21 +704,22 @@ func (tc *TxnCoordSender) updateState(ctx context.Context, ba proto.BatchRequest
}
}

if len(newTxn.ID) > 0 {
return func() *proto.Error {
if len(newTxn.ID) <= 0 {
return pErr
}
id := string(newTxn.ID)
tc.Lock()
defer tc.Unlock()
txnMeta := tc.txns[id]
// For successful transactional requests, keep the written intents and
// the updated transaction record to be sent along with the reply.
// The transaction metadata is created with the first writing operation
// TODO(tschottdorf): already computed the intents prior to sending,
// consider re-using those.
if intents := ba.GetIntents(); len(intents) > 0 && err == nil {
// TODO(tschottdorf): avoid spawning one if EndTransaction is in
// the same batch.
if txnMeta == nil {
newTxn.Writing = true
trace.Event("coordinator spawns")
txnMeta = &txnMetadata{
txn: *newTxn,
keys: cache.NewIntervalCache(cache.Config{Policy: cache.CacheNone}),
@@ -725,16 +729,23 @@ func (tc *TxnCoordSender) updateState(ctx context.Context, ba proto.BatchRequest
txnEnd: make(chan struct{}),
}
tc.txns[id] = txnMeta
if !tc.stopper.RunAsyncTask(func() {
tc.heartbeatLoop(id)
}) {
// The system is already draining and we can't start the
// heartbeat. We refuse new transactions for now because
// they're likely not going to have all intents committed.
// In principle, we can relax this as needed though.
tc.Unlock()
tc.unregisterTxn(id)
return proto.NewError(&proto.NodeUnavailableError{})
if _, isEnding := ba.GetArg(proto.EndTransaction); !isEnding {
trace.Event("coordinator spawns")
if !tc.stopper.RunAsyncTask(func() {
tc.heartbeatLoop(id)
}) {
// The system is already draining and we can't start the
// heartbeat. We refuse new transactions for now because
// they're likely not going to have all intents committed.
// In principle, we can relax this as needed though.
tc.unregisterTxnLocked(id)
return proto.NewError(&proto.NodeUnavailableError{})
}
} else {
// We omit starting a coordinator since the txn just ended
// anyway. This means we need to do the cleanup that the
// heartbeat would've carried out otherwise.
defer tc.unregisterTxnLocked(id)
}
}
for _, intent := range intents {
@@ -749,7 +760,6 @@ func (tc *TxnCoordSender) updateState(ctx context.Context, ba proto.BatchRequest
}
txnMeta.setLastUpdate(tc.clock.PhysicalNow())
}
tc.Unlock()
if err == nil {
// For successful transactional requests, always send the updated txn
// record back.
@@ -758,8 +768,8 @@ func (tc *TxnCoordSender) updateState(ctx context.Context, ba proto.BatchRequest
}
*br.Txn = *newTxn
}
}
return pErr
return pErr
}()
}

// TODO(tschottdorf): this method is somewhat awkward but unless we want to
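Taken together, the updateState hunks do two things. First, the body moves into an immediately invoked closure, so early returns replace the old nesting under "if len(newTxn.ID) > 0" and the removed manual tc.Unlock() call becomes a defer that covers every exit path. Second, when the batch itself carries the EndTransaction, no heartbeat goroutine is spawned at all: the transaction ends in this same round trip, so the deferred unregisterTxnLocked performs inline the cleanup the heartbeat's own defer would otherwise have done (and, being deferred, it still runs before the closure's deferred Unlock releases the mutex). A rough sketch of that decision with simplified stand-in types, not the real proto API:

package main

import "fmt"

type Method int

const (
	Put Method = iota
	EndTransaction
)

type BatchRequest struct{ methods []Method }

// contains mirrors what ba.GetArg(proto.EndTransaction) is used for
// above: report whether the batch holds a request of the given type.
func (ba *BatchRequest) contains(m Method) bool {
	for _, method := range ba.methods {
		if method == m {
			return true
		}
	}
	return false
}

// maybeSpawnHeartbeat decides whether a heartbeat goroutine is needed.
// In the real code, cleanupLocked runs with the sender's mutex held.
func maybeSpawnHeartbeat(ba *BatchRequest, spawn, cleanupLocked func()) {
	if !ba.contains(EndTransaction) {
		spawn() // txn stays open; the heartbeat keeps it alive
		return
	}
	// The txn ends in this very batch: skip the goroutine and do the
	// cleanup its defer would have carried out.
	cleanupLocked()
}

func main() {
	oneShot := &BatchRequest{methods: []Method{Put, EndTransaction}}
	maybeSpawnHeartbeat(oneShot,
		func() { fmt.Println("heartbeat spawned") },
		func() { fmt.Println("unregistered inline") },
	)
}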
103 changes: 31 additions & 72 deletions kv/txn_coord_sender_test.go
@@ -380,7 +380,6 @@ func TestTxnCoordSenderEndTxn(t *testing.T) {
pReply := reply.(*proto.PutResponse)
if _, err := batchutil.SendWrapped(s.Sender, &proto.EndTransactionRequest{
RequestHeader: proto.RequestHeader{
Key: txn.Key,
Timestamp: txn.Timestamp,
Txn: pReply.Header().Txn,
},
@@ -429,7 +428,6 @@ func TestTxnCoordSenderCleanupOnAborted(t *testing.T) {
// end transaction failed.
etArgs := &proto.EndTransactionRequest{
RequestHeader: proto.RequestHeader{
Key: txn.Key,
Timestamp: txn.Timestamp,
Txn: txn,
},
@@ -563,73 +561,6 @@ func TestTxnCoordSenderTxnUpdatedOnError(t *testing.T) {
}
}

// TestTxnCoordSenderBatchTransaction tests that it is possible to send
// one-off transactional calls within a batch under certain circumstances.
func TestTxnCoordSenderBatchTransaction(t *testing.T) {
defer leaktest.AfterTest(t)
t.Skip("TODO(tschottdorf): remove this test; behavior is more transparent now")
defer leaktest.AfterTest(t)
stopper := stop.NewStopper()
defer stopper.Stop()
clock := hlc.NewClock(hlc.UnixNano)
var called bool
var alwaysError = errors.New("success")
ts := NewTxnCoordSender(senderFn(func(_ context.Context, _ proto.BatchRequest) (*proto.BatchResponse, *proto.Error) {
called = true
// Returning this error is an easy way of preventing heartbeats
// to be started for otherwise "successful" calls.
return nil, proto.NewError(alwaysError)
}), clock, false, nil, stopper)

pushArg := &proto.PushTxnRequest{}
putArg := &proto.PutRequest{}
getArg := &proto.GetRequest{}
testCases := []struct {
req proto.Request
batch, arg, ok bool
}{
// Lays intents: can't have this on individual calls at all.
{putArg, false, false, true},
{putArg, true, false, true},
{putArg, true, true, false},
{putArg, false, true, false},

// No intents: all ok, except when batch and arg have different txns.
{pushArg, false, false, true},
{pushArg, true, false, true},
{pushArg, true, true, false},
{pushArg, false, true, true},
{getArg, false, false, true},
{getArg, true, false, true},
{getArg, true, true, false},
{getArg, false, true, true},
}

txn1 := &proto.Transaction{ID: []byte("txn1")}
txn2 := &proto.Transaction{ID: []byte("txn2")}

for i, tc := range testCases {
called = false
tc.req.Reset()
ba := &proto.BatchRequest{}

if tc.arg {
tc.req.Header().Txn = txn1
}
ba.Add(tc.req)
if tc.batch {
ba.Txn = txn2
}
called = false
_, err := batchutil.SendWrapped(ts, ba)
if !tc.ok && err == alwaysError {
t.Fatalf("%d: expected error%s", i, err)
} else if tc.ok != called {
t.Fatalf("%d: wanted call: %t, got call: %t", i, tc.ok, called)
}
}
}

// TestTxnDrainingNode tests that pending transactions tasks' intents are resolved
// if they commit while draining, and that a NodeUnavailableError is received
// when attempting to run a new transaction on a draining node.
@@ -658,7 +589,6 @@ func TestTxnDrainingNode(t *testing.T) {
endTxn := func() {
if _, err := batchutil.SendWrapped(s.Sender, &proto.EndTransactionRequest{
RequestHeader: proto.RequestHeader{
Key: txn.Key,
Timestamp: txn.Timestamp,
Txn: txn,
},
@@ -714,7 +644,6 @@ func TestTxnCoordIdempotentCleanup(t *testing.T) {
s.Sender.cleanupTxn(nil, *txn) // first call
if _, err := batchutil.SendWrapped(s.Sender, &proto.EndTransactionRequest{
RequestHeader: proto.RequestHeader{
Key: txn.Key,
Timestamp: txn.Timestamp,
Txn: txn,
},
@@ -768,7 +697,6 @@ func TestTxnMultipleCoord(t *testing.T) {
// Abort for clean shutdown.
if _, err := batchutil.SendWrapped(s.Sender, &proto.EndTransactionRequest{
RequestHeader: proto.RequestHeader{
Key: txn.Key,
Timestamp: txn.Timestamp,
Txn: txn,
},
@@ -778,3 +706,34 @@ func TestTxnMultipleCoord(t *testing.T) {
}
}
}

// TestTxnCoordSenderSingleRoundtripTxn checks that a batch which completely
// holds the writing portion of a Txn (including EndTransaction) does not
// launch a heartbeat goroutine at all.
func TestTxnCoordSenderSingleRoundtripTxn(t *testing.T) {
defer leaktest.AfterTest(t)
stopper := stop.NewStopper()
manual := hlc.NewManualClock(0)
clock := hlc.NewClock(manual.UnixNano)
clock.SetMaxOffset(20)

ts := NewTxnCoordSender(senderFn(func(_ context.Context, ba proto.BatchRequest) (*proto.BatchResponse, *proto.Error) {
return ba.CreateReply().(*proto.BatchResponse), nil
}), clock, false, nil, stopper)

// Stop the stopper manually, prior to trying the transaction. This has the
// effect of returning a NodeUnavailableError for any attempts at launching
// a heartbeat goroutine.
stopper.Stop()

var ba proto.BatchRequest
put := &proto.PutRequest{}
put.Key = proto.Key("test")
ba.Add(put)
ba.Add(&proto.EndTransactionRequest{})
ba.Txn = &proto.Transaction{Name: "test"}
_, pErr := ts.Send(context.Background(), ba)
if pErr != nil {
t.Fatal(pErr)
}
}
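The new test relies on leaktest.AfterTest to fail if a heartbeat goroutine had been spawned and left running, and stopping the stopper up front means any attempt to launch one would surface as a NodeUnavailableError rather than passing silently. A crude standard-library stand-in for that style of goroutine-leak check (the real leaktest helper is more thorough):

package main

import (
	"fmt"
	"runtime"
	"time"
)

// afterTest is a toy version of leaktest.AfterTest: compare the
// goroutine count at the end of the test against a baseline.
func afterTest(baseline int) {
	time.Sleep(10 * time.Millisecond) // let exiting goroutines finish
	if n := runtime.NumGoroutine(); n > baseline {
		fmt.Printf("leaked %d goroutine(s)\n", n-baseline)
		return
	}
	fmt.Println("no goroutines leaked")
}

func main() {
	baseline := runtime.NumGoroutine()
	defer afterTest(baseline)

	done := make(chan struct{})
	go func() { // well-behaved: exits before the check runs
		close(done)
	}()
	<-done
}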
2 changes: 2 additions & 0 deletions proto/batch.go
@@ -95,6 +95,8 @@ func (ba *BatchRequest) IsRange() bool {

// GetArg returns the first request of the given type, if possible.
func (ba *BatchRequest) GetArg(method Method) (Request, bool) {
// TODO(tschottdorf): when looking for EndTransaction, just look at the
// last entry.
for _, arg := range ba.Requests {
if req := arg.GetInner(); req.Method() == method {
return req, true
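The TODO added here records a cheap optimization rather than implementing it: a well-formed batch always places EndTransaction last, so a lookup for that method never needs the linear scan. A sketch of what the shortcut might look like, with hypothetical simplified types:

package main

import "fmt"

type Method int

const (
	Get Method = iota
	Put
	EndTransaction
)

type Request struct{ method Method }

func (r Request) Method() Method { return r.method }

type BatchRequest struct{ Requests []Request }

// getArg checks only the last entry when asked for EndTransaction,
// since a well-formed batch places it there; other methods still scan.
func (ba *BatchRequest) getArg(method Method) (Request, bool) {
	if method == EndTransaction {
		if n := len(ba.Requests); n > 0 && ba.Requests[n-1].Method() == EndTransaction {
			return ba.Requests[n-1], true
		}
		return Request{}, false
	}
	for _, req := range ba.Requests {
		if req.Method() == method {
			return req, true
		}
	}
	return Request{}, false
}

func main() {
	ba := &BatchRequest{Requests: []Request{{Put}, {EndTransaction}}}
	if _, ok := ba.getArg(EndTransaction); ok {
		fmt.Println("batch ends a transaction")
	}
}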
9 changes: 3 additions & 6 deletions storage/replica_command.go
@@ -1176,8 +1176,7 @@ func (r *Replica) AdminSplit(args proto.AdminSplitRequest, desc *proto.RangeDesc
// End the transaction manually, instead of letting RunTransaction
// loop do it, in order to provide a split trigger.
b.InternalAddRequest(&proto.EndTransactionRequest{
RequestHeader: proto.RequestHeader{Key: newDesc.StartKey},
Commit: true,
Commit: true,
InternalCommitTrigger: &proto.InternalCommitTrigger{
SplitTrigger: &proto.SplitTrigger{
UpdatedDesc: updatedDesc,
@@ -1365,8 +1364,7 @@ func (r *Replica) AdminMerge(args proto.AdminMergeRequest, origLeftDesc *proto.R
// End the transaction manually instead of letting RunTransaction
// loop do it, in order to provide a merge trigger.
b.InternalAddRequest(&proto.EndTransactionRequest{
RequestHeader: proto.RequestHeader{Key: updatedLeftDesc.StartKey},
Commit: true,
Commit: true,
InternalCommitTrigger: &proto.InternalCommitTrigger{
MergeTrigger: &proto.MergeTrigger{
UpdatedDesc: updatedLeftDesc,
@@ -1517,8 +1515,7 @@ func (r *Replica) ChangeReplicas(changeType proto.ReplicaChangeType, replica pro
// End the transaction manually instead of letting RunTransaction
// loop do it, in order to provide a commit trigger.
b.InternalAddRequest(&proto.EndTransactionRequest{
RequestHeader: proto.RequestHeader{Key: updatedDesc.StartKey},
Commit: true,
Commit: true,
InternalCommitTrigger: &proto.InternalCommitTrigger{
ChangeReplicasTrigger: &proto.ChangeReplicasTrigger{
NodeID: replica.NodeID,
2 changes: 1 addition & 1 deletion storage/replica_test.go
@@ -900,7 +900,7 @@ func scanArgs(start, end []byte, rangeID proto.RangeID, storeID proto.StoreID) p
func endTxnArgs(txn *proto.Transaction, commit bool, rangeID proto.RangeID, storeID proto.StoreID) proto.EndTransactionRequest {
return proto.EndTransactionRequest{
RequestHeader: proto.RequestHeader{
Key: txn.Key,
Key: txn.Key, // not allowed when going through TxnCoordSender, but we're not
RangeID: rangeID,
Replica: proto.Replica{StoreID: storeID},
Txn: txn,
