diff --git a/kv/txn_coord_sender.go b/kv/txn_coord_sender.go index 69f16c68661a..1da9627d7eaa 100644 --- a/kv/txn_coord_sender.go +++ b/kv/txn_coord_sender.go @@ -365,16 +365,17 @@ func (tc *TxnCoordSender) Send(ctx context.Context, ba proto.BatchRequest) (*pro // Remember when EndTransaction started in case we want to // be linearizable. startNS = tc.clock.PhysicalNow() - // EndTransaction must have its key set to that of the txn. - // TODO(tschottdorf): Should remove this here and make sure - // the client does it properly. - et.Key = ba.Txn.Key if len(et.Intents) > 0 { // TODO(tschottdorf): it may be useful to allow this later. // That would be part of a possible plan to allow txns which // write on multiple coordinators. return nil, proto.NewError(util.Errorf("client must not pass intents to EndTransaction")) } + if len(et.Key) != 0 { + return nil, proto.NewError(util.Errorf("EndTransaction must not have a Key set")) + } + et.Key = ba.Txn.Key + tc.Lock() txnMeta, metaOK := tc.txns[id] if id != "" && metaOK { @@ -513,10 +514,8 @@ func (tc *TxnCoordSender) cleanupTxn(trace *tracer.Trace, txn proto.Transaction) } // unregisterTxn deletes a txnMetadata object from the sender -// and collects its stats. -func (tc *TxnCoordSender) unregisterTxn(id string) { - tc.Lock() - defer tc.Unlock() +// and collects its stats. It assumes the lock is held. +func (tc *TxnCoordSender) unregisterTxnLocked(id string) { txnMeta := tc.txns[id] // guaranteed to exist if txnMeta == nil { panic("attempt to unregister non-existent transaction: " + id) @@ -548,7 +547,11 @@ func (tc *TxnCoordSender) heartbeatLoop(id string) { tickChan = ticker.C defer ticker.Stop() } - defer tc.unregisterTxn(id) + defer func() { + tc.Lock() + tc.unregisterTxnLocked(id) + tc.Unlock() + }() var closer <-chan struct{} var trace *tracer.Trace @@ -701,9 +704,13 @@ func (tc *TxnCoordSender) updateState(ctx context.Context, ba proto.BatchRequest } } - if len(newTxn.ID) > 0 { + return func() *proto.Error { + if len(newTxn.ID) <= 0 { + return pErr + } id := string(newTxn.ID) tc.Lock() + defer tc.Unlock() txnMeta := tc.txns[id] // For successful transactional requests, keep the written intents and // the updated transaction record to be sent along with the reply. @@ -711,11 +718,8 @@ func (tc *TxnCoordSender) updateState(ctx context.Context, ba proto.BatchRequest // TODO(tschottdorf): already computed the intents prior to sending, // consider re-using those. if intents := ba.GetIntents(); len(intents) > 0 && err == nil { - // TODO(tschottdorf): avoid spawning one if EndTransaction is in - // the same batch. if txnMeta == nil { newTxn.Writing = true - trace.Event("coordinator spawns") txnMeta = &txnMetadata{ txn: *newTxn, keys: cache.NewIntervalCache(cache.Config{Policy: cache.CacheNone}), @@ -725,16 +729,23 @@ func (tc *TxnCoordSender) updateState(ctx context.Context, ba proto.BatchRequest txnEnd: make(chan struct{}), } tc.txns[id] = txnMeta - if !tc.stopper.RunAsyncTask(func() { - tc.heartbeatLoop(id) - }) { - // The system is already draining and we can't start the - // heartbeat. We refuse new transactions for now because - // they're likely not going to have all intents committed. - // In principle, we can relax this as needed though. 
- tc.Unlock() - tc.unregisterTxn(id) - return proto.NewError(&proto.NodeUnavailableError{}) + if _, isEnding := ba.GetArg(proto.EndTransaction); !isEnding { + trace.Event("coordinator spawns") + if !tc.stopper.RunAsyncTask(func() { + tc.heartbeatLoop(id) + }) { + // The system is already draining and we can't start the + // heartbeat. We refuse new transactions for now because + // they're likely not going to have all intents committed. + // In principle, we can relax this as needed though. + tc.unregisterTxnLocked(id) + return proto.NewError(&proto.NodeUnavailableError{}) + } + } else { + // We omit starting a coordinator since the txn just ended + // anyway. This means we need to do the cleanup that the + // heartbeat would've carried out otherwise. + defer tc.unregisterTxnLocked(id) } } for _, intent := range intents { @@ -749,7 +760,6 @@ func (tc *TxnCoordSender) updateState(ctx context.Context, ba proto.BatchRequest } txnMeta.setLastUpdate(tc.clock.PhysicalNow()) } - tc.Unlock() if err == nil { // For successful transactional requests, always send the updated txn // record back. @@ -758,8 +768,8 @@ func (tc *TxnCoordSender) updateState(ctx context.Context, ba proto.BatchRequest } *br.Txn = *newTxn } - } - return pErr + return pErr + }() } // TODO(tschottdorf): this method is somewhat awkward but unless we want to diff --git a/kv/txn_coord_sender_test.go b/kv/txn_coord_sender_test.go index 9ee1edaf77de..28d3ca0b8aad 100644 --- a/kv/txn_coord_sender_test.go +++ b/kv/txn_coord_sender_test.go @@ -380,7 +380,6 @@ func TestTxnCoordSenderEndTxn(t *testing.T) { pReply := reply.(*proto.PutResponse) if _, err := batchutil.SendWrapped(s.Sender, &proto.EndTransactionRequest{ RequestHeader: proto.RequestHeader{ - Key: txn.Key, Timestamp: txn.Timestamp, Txn: pReply.Header().Txn, }, @@ -429,7 +428,6 @@ func TestTxnCoordSenderCleanupOnAborted(t *testing.T) { // end transaction failed. etArgs := &proto.EndTransactionRequest{ RequestHeader: proto.RequestHeader{ - Key: txn.Key, Timestamp: txn.Timestamp, Txn: txn, }, @@ -563,73 +561,6 @@ func TestTxnCoordSenderTxnUpdatedOnError(t *testing.T) { } } -// TestTxnCoordSenderBatchTransaction tests that it is possible to send -// one-off transactional calls within a batch under certain circumstances. -func TestTxnCoordSenderBatchTransaction(t *testing.T) { - defer leaktest.AfterTest(t) - t.Skip("TODO(tschottdorf): remove this test; behavior is more transparent now") - defer leaktest.AfterTest(t) - stopper := stop.NewStopper() - defer stopper.Stop() - clock := hlc.NewClock(hlc.UnixNano) - var called bool - var alwaysError = errors.New("success") - ts := NewTxnCoordSender(senderFn(func(_ context.Context, _ proto.BatchRequest) (*proto.BatchResponse, *proto.Error) { - called = true - // Returning this error is an easy way of preventing heartbeats - // to be started for otherwise "successful" calls. - return nil, proto.NewError(alwaysError) - }), clock, false, nil, stopper) - - pushArg := &proto.PushTxnRequest{} - putArg := &proto.PutRequest{} - getArg := &proto.GetRequest{} - testCases := []struct { - req proto.Request - batch, arg, ok bool - }{ - // Lays intents: can't have this on individual calls at all. - {putArg, false, false, true}, - {putArg, true, false, true}, - {putArg, true, true, false}, - {putArg, false, true, false}, - - // No intents: all ok, except when batch and arg have different txns. 
- {pushArg, false, false, true}, - {pushArg, true, false, true}, - {pushArg, true, true, false}, - {pushArg, false, true, true}, - {getArg, false, false, true}, - {getArg, true, false, true}, - {getArg, true, true, false}, - {getArg, false, true, true}, - } - - txn1 := &proto.Transaction{ID: []byte("txn1")} - txn2 := &proto.Transaction{ID: []byte("txn2")} - - for i, tc := range testCases { - called = false - tc.req.Reset() - ba := &proto.BatchRequest{} - - if tc.arg { - tc.req.Header().Txn = txn1 - } - ba.Add(tc.req) - if tc.batch { - ba.Txn = txn2 - } - called = false - _, err := batchutil.SendWrapped(ts, ba) - if !tc.ok && err == alwaysError { - t.Fatalf("%d: expected error%s", i, err) - } else if tc.ok != called { - t.Fatalf("%d: wanted call: %t, got call: %t", i, tc.ok, called) - } - } -} - // TestTxnDrainingNode tests that pending transactions tasks' intents are resolved // if they commit while draining, and that a NodeUnavailableError is received // when attempting to run a new transaction on a draining node. @@ -658,7 +589,6 @@ func TestTxnDrainingNode(t *testing.T) { endTxn := func() { if _, err := batchutil.SendWrapped(s.Sender, &proto.EndTransactionRequest{ RequestHeader: proto.RequestHeader{ - Key: txn.Key, Timestamp: txn.Timestamp, Txn: txn, }, @@ -714,7 +644,6 @@ func TestTxnCoordIdempotentCleanup(t *testing.T) { s.Sender.cleanupTxn(nil, *txn) // first call if _, err := batchutil.SendWrapped(s.Sender, &proto.EndTransactionRequest{ RequestHeader: proto.RequestHeader{ - Key: txn.Key, Timestamp: txn.Timestamp, Txn: txn, }, @@ -768,7 +697,6 @@ func TestTxnMultipleCoord(t *testing.T) { // Abort for clean shutdown. if _, err := batchutil.SendWrapped(s.Sender, &proto.EndTransactionRequest{ RequestHeader: proto.RequestHeader{ - Key: txn.Key, Timestamp: txn.Timestamp, Txn: txn, }, @@ -778,3 +706,34 @@ func TestTxnMultipleCoord(t *testing.T) { } } } + +// TestTxnCoordSenderSingleRoundtripTxn checks that a batch which completely +// holds the writing portion of a Txn (including EndTransaction) does not +// launch a heartbeat goroutine at all. +func TestTxnCoordSenderSingleRoundtripTxn(t *testing.T) { + defer leaktest.AfterTest(t) + stopper := stop.NewStopper() + manual := hlc.NewManualClock(0) + clock := hlc.NewClock(manual.UnixNano) + clock.SetMaxOffset(20) + + ts := NewTxnCoordSender(senderFn(func(_ context.Context, ba proto.BatchRequest) (*proto.BatchResponse, *proto.Error) { + return ba.CreateReply().(*proto.BatchResponse), nil + }), clock, false, nil, stopper) + + // Stop the stopper manually, prior to trying the transaction. This has the + // effect of returning a NodeUnavailableError for any attempts at launching + // a heartbeat goroutine. + stopper.Stop() + + var ba proto.BatchRequest + put := &proto.PutRequest{} + put.Key = proto.Key("test") + ba.Add(put) + ba.Add(&proto.EndTransactionRequest{}) + ba.Txn = &proto.Transaction{Name: "test"} + _, pErr := ts.Send(context.Background(), ba) + if pErr != nil { + t.Fatal(pErr) + } +} diff --git a/proto/batch.go b/proto/batch.go index d11913268830..5dc683dbfc44 100644 --- a/proto/batch.go +++ b/proto/batch.go @@ -95,6 +95,8 @@ func (ba *BatchRequest) IsRange() bool { // GetArg returns the first request of the given type, if possible. func (ba *BatchRequest) GetArg(method Method) (Request, bool) { + // TODO(tschottdorf): when looking for EndTransaction, just look at the + // last entry. 
for _, arg := range ba.Requests { if req := arg.GetInner(); req.Method() == method { return req, true diff --git a/storage/replica_command.go b/storage/replica_command.go index c5645c60e77a..238db0eea2f6 100644 --- a/storage/replica_command.go +++ b/storage/replica_command.go @@ -1176,8 +1176,7 @@ func (r *Replica) AdminSplit(args proto.AdminSplitRequest, desc *proto.RangeDesc // End the transaction manually, instead of letting RunTransaction // loop do it, in order to provide a split trigger. b.InternalAddRequest(&proto.EndTransactionRequest{ - RequestHeader: proto.RequestHeader{Key: newDesc.StartKey}, - Commit: true, + Commit: true, InternalCommitTrigger: &proto.InternalCommitTrigger{ SplitTrigger: &proto.SplitTrigger{ UpdatedDesc: updatedDesc, @@ -1365,8 +1364,7 @@ func (r *Replica) AdminMerge(args proto.AdminMergeRequest, origLeftDesc *proto.R // End the transaction manually instead of letting RunTransaction // loop do it, in order to provide a merge trigger. b.InternalAddRequest(&proto.EndTransactionRequest{ - RequestHeader: proto.RequestHeader{Key: updatedLeftDesc.StartKey}, - Commit: true, + Commit: true, InternalCommitTrigger: &proto.InternalCommitTrigger{ MergeTrigger: &proto.MergeTrigger{ UpdatedDesc: updatedLeftDesc, @@ -1517,8 +1515,7 @@ func (r *Replica) ChangeReplicas(changeType proto.ReplicaChangeType, replica pro // End the transaction manually instead of letting RunTransaction // loop do it, in order to provide a commit trigger. b.InternalAddRequest(&proto.EndTransactionRequest{ - RequestHeader: proto.RequestHeader{Key: updatedDesc.StartKey}, - Commit: true, + Commit: true, InternalCommitTrigger: &proto.InternalCommitTrigger{ ChangeReplicasTrigger: &proto.ChangeReplicasTrigger{ NodeID: replica.NodeID, diff --git a/storage/replica_test.go b/storage/replica_test.go index e304a5a4beba..7dd94c7acef4 100644 --- a/storage/replica_test.go +++ b/storage/replica_test.go @@ -900,7 +900,7 @@ func scanArgs(start, end []byte, rangeID proto.RangeID, storeID proto.StoreID) p func endTxnArgs(txn *proto.Transaction, commit bool, rangeID proto.RangeID, storeID proto.StoreID) proto.EndTransactionRequest { return proto.EndTransactionRequest{ RequestHeader: proto.RequestHeader{ - Key: txn.Key, + Key: txn.Key, // not allowed when going through TxnCoordSender, but we're not RangeID: rangeID, Replica: proto.Replica{StoreID: storeID}, Txn: txn,
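
Illustrative sketch (not part of the diff): under this change a client-side batch that carries a whole transaction leaves EndTransactionRequest.Key unset; TxnCoordSender now rejects a pre-set key, copies it from ba.Txn.Key itself, and, because EndTransaction rides in the same batch, skips the heartbeat goroutine and unregisters the txn inline. The snippet mirrors the new TestTxnCoordSenderSingleRoundtripTxn above; the senderFn, stop, and hlc helpers are the ones used in that test, and the key/txn names are made up for illustration.

stopper := stop.NewStopper()
defer stopper.Stop()
manual := hlc.NewManualClock(0)
clock := hlc.NewClock(manual.UnixNano)
ts := NewTxnCoordSender(senderFn(func(_ context.Context, ba proto.BatchRequest) (*proto.BatchResponse, *proto.Error) {
	return ba.CreateReply().(*proto.BatchResponse), nil
}), clock, false, nil, stopper)

var ba proto.BatchRequest
put := &proto.PutRequest{}
put.Key = proto.Key("a")
ba.Add(put)
// No Key on the EndTransaction: the coordinator fills it in from ba.Txn.Key
// and returns an error if the client sets it.
ba.Add(&proto.EndTransactionRequest{Commit: true})
ba.Txn = &proto.Transaction{Name: "example"}
// EndTransaction is part of the batch, so updateState defers
// unregisterTxnLocked instead of spawning heartbeatLoop.
if _, pErr := ts.Send(context.Background(), ba); pErr != nil {
	// handle pErr
}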