Skip to content

Commit

Permalink
no goroutine on single-batch txns
Browse files Browse the repository at this point in the history
fixes #2659
  • Loading branch information
tbg committed Sep 25, 2015
1 parent a911af8 commit 4c3d6fe
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 12 deletions.
29 changes: 17 additions & 12 deletions kv/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -718,11 +718,8 @@ func (tc *TxnCoordSender) updateState(ctx context.Context, ba proto.BatchRequest
// TODO(tschottdorf): already computed the intents prior to sending,
// consider re-using those.
if intents := ba.GetIntents(); len(intents) > 0 && err == nil {
// TODO(tschottdorf): avoid spawning one if EndTransaction is in
// the same batch.
if txnMeta == nil {
newTxn.Writing = true
trace.Event("coordinator spawns")
txnMeta = &txnMetadata{
txn: *newTxn,
keys: cache.NewIntervalCache(cache.Config{Policy: cache.CacheNone}),
Expand All @@ -732,15 +729,23 @@ func (tc *TxnCoordSender) updateState(ctx context.Context, ba proto.BatchRequest
txnEnd: make(chan struct{}),
}
tc.txns[id] = txnMeta
if !tc.stopper.RunAsyncTask(func() {
tc.heartbeatLoop(id)
}) {
// The system is already draining and we can't start the
// heartbeat. We refuse new transactions for now because
// they're likely not going to have all intents committed.
// In principle, we can relax this as needed though.
tc.unregisterTxnLocked(id)
return proto.NewError(&proto.NodeUnavailableError{})
if _, isEnding := ba.GetArg(proto.EndTransaction); !isEnding {
trace.Event("coordinator spawns")
if !tc.stopper.RunAsyncTask(func() {
tc.heartbeatLoop(id)
}) {
// The system is already draining and we can't start the
// heartbeat. We refuse new transactions for now because
// they're likely not going to have all intents committed.
// In principle, we can relax this as needed though.
tc.unregisterTxnLocked(id)
return proto.NewError(&proto.NodeUnavailableError{})
}
} else {
// We omit starting a coordinator since the txn just ended
// anyway. This means we need to do the cleanup that the
// heartbeat would've carried out otherwise.
defer tc.unregisterTxnLocked(id)
}
}
for _, intent := range intents {
Expand Down
31 changes: 31 additions & 0 deletions kv/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,3 +706,34 @@ func TestTxnMultipleCoord(t *testing.T) {
}
}
}

// TestTxnCoordSenderSingleRoundtripTxn checks that a batch which completely
// holds the writing portion of a Txn (including EndTransaction) does not
// launch a heartbeat goroutine at all.
func TestTxnCoordSenderSingleRoundtripTxn(t *testing.T) {
defer leaktest.AfterTest(t)
stopper := stop.NewStopper()
manual := hlc.NewManualClock(0)
clock := hlc.NewClock(manual.UnixNano)
clock.SetMaxOffset(20)

ts := NewTxnCoordSender(senderFn(func(_ context.Context, ba proto.BatchRequest) (*proto.BatchResponse, *proto.Error) {
return ba.CreateReply().(*proto.BatchResponse), nil
}), clock, false, nil, stopper)

// Stop the stopper manually, prior to trying the transaction. This has the
// effect of returning a NodeUnavailableError for any attempts at launching
// a heartbeat goroutine.
stopper.Stop()

var ba proto.BatchRequest
put := &proto.PutRequest{}
put.Key = proto.Key("test")
ba.Add(put)
ba.Add(&proto.EndTransactionRequest{})
ba.Txn = &proto.Transaction{Name: "test"}
_, pErr := ts.Send(context.Background(), ba)
if pErr != nil {
t.Fatal(pErr)
}
}
2 changes: 2 additions & 0 deletions proto/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ func (ba *BatchRequest) IsRange() bool {

// GetArg returns the first request of the given type, if possible.
func (ba *BatchRequest) GetArg(method Method) (Request, bool) {
// TODO(tschottdorf): when looking for EndTransaction, just look at the
// last entry.
for _, arg := range ba.Requests {
if req := arg.GetInner(); req.Method() == method {
return req, true
Expand Down

0 comments on commit 4c3d6fe

Please sign in to comment.