Skip to content

Commit

Permalink
kv: fix issues around failed 1PC txns
Browse files Browse the repository at this point in the history
The recent cockroachdb#25541 changed the way "tracking" (the heartbeat loop and
intent collection) is initiated for transactions. It aimed to simplify
things and put the burden on the client to decide when a txn needs
tracking. This introduced a problem - the client.Txn was not initiating
tracking when sending 1PC batches. However, tracking is needed for these
transactions too: even though usually they'll succeed and so the
TxnCoordSender state can be quickly destroyed, when they fail their
intents and heartbeat loop need to be kept around just like for any
other txn.

This patch backtracks on the previous move to make it the client's
responsibility to initiate tracking (it didn't stand the test of time):
the client.Txn is no longer in charge of calling tcs.StartTracking().
Instead, the TCS does whatever needs to be done when it sees an
EndTransaction.

I also took the opportunity to spruce up comments on the TxnCoordSender.

Release note: None
  • Loading branch information
andreimatei committed Jun 7, 2018
1 parent a6a7113 commit 22554db
Show file tree
Hide file tree
Showing 8 changed files with 196 additions and 121 deletions.
14 changes: 6 additions & 8 deletions pkg/internal/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -799,15 +799,14 @@ func TestReadConsistencyTypes(t *testing.T) {
// Mock out DistSender's sender function to check the read consistency for
// outgoing BatchRequests and return an empty reply.
factory := client.TxnSenderFactoryFunc(func(client.TxnType) client.TxnSender {
return client.TxnSenderAdapter{
StartTrackingWrapped: func(context.Context) error { panic("unimplemented") },
Wrapped: func(_ context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
return client.TxnSenderFunc(
func(_ context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
if ba.ReadConsistency != rc {
return nil, roachpb.NewErrorf("BatchRequest has unexpected ReadConsistency %s", ba.ReadConsistency)
}
return ba.CreateReply(), nil
},
}
)
})

clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
Expand Down Expand Up @@ -976,12 +975,11 @@ func TestNodeIDAndObservedTimestamps(t *testing.T) {
// Mock out sender function to check that created transactions
// have the observed timestamp set for the configured node ID.
factory := client.TxnSenderFactoryFunc(func(client.TxnType) client.TxnSender {
return client.TxnSenderAdapter{
StartTrackingWrapped: func(context.Context) error { panic("unimplemented") },
Wrapped: func(_ context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
return client.TxnSenderFunc(
func(_ context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
return ba.CreateReply(), nil
},
}
)
})

clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
Expand Down
30 changes: 9 additions & 21 deletions pkg/internal/client/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,6 @@ type TxnSender interface {
// if this method is invoked multiple times, the most recent callback
// is the only one which will be invoked.
OnFinish(func(error))

// StartTracking starts a heartbeat loop and tracking of intents.
StartTracking(ctx context.Context) error
}

// TxnSenderFactory is the interface used to create new instances
Expand Down Expand Up @@ -108,37 +105,28 @@ func (f SenderFunc) Send(
return f(ctx, ba)
}

// TxnSenderAdapter is an adapter to allow the use of ordinary functions as
// TxnSenderFunc is an adapter to allow the use of ordinary functions as
// TxnSenders with GetMeta or AugmentMeta panicing with unimplemented. This is
// a helper mechanism to facilitate testing.
type TxnSenderAdapter struct {
Wrapped func(context.Context, roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error)
StartTrackingWrapped func(context.Context) error
}
type TxnSenderFunc func(
context.Context, roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error)

// Send calls f(ctx, c).
func (f TxnSenderAdapter) Send(
func (f TxnSenderFunc) Send(
ctx context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
return f.Wrapped(ctx, ba)
return f(ctx, ba)
}

// GetMeta is part of the TxnSender interface.
func (f TxnSenderAdapter) GetMeta() roachpb.TxnCoordMeta { panic("unimplemented") }
func (f TxnSenderFunc) GetMeta() roachpb.TxnCoordMeta { panic("unimplemented") }

// AugmentMeta is part of the TxnSender interface.
func (f TxnSenderAdapter) AugmentMeta(context.Context, roachpb.TxnCoordMeta) { panic("unimplemented") }
func (f TxnSenderFunc) AugmentMeta(context.Context, roachpb.TxnCoordMeta) { panic("unimplemented") }

// OnFinish is part of the TxnSender interface.
func (f TxnSenderAdapter) OnFinish(_ func(error)) { panic("unimplemented") }

// StartTracking is part the TxnSender interface.
func (f TxnSenderAdapter) StartTracking(ctx context.Context) error {
if f.StartTrackingWrapped != nil {
return f.StartTrackingWrapped(ctx)
}
panic("unimplemented")
}
func (f TxnSenderFunc) OnFinish(_ func(error)) { panic("unimplemented") }

// TxnSenderFactoryFunc is an adapter to allow the use of ordinary functions
// as TxnSenderFactories. This is a helper mechanism to facilitate testing.
Expand Down
6 changes: 0 additions & 6 deletions pkg/internal/client/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -972,12 +972,6 @@ func (txn *Txn) Send(
// begin transaction request before the first write command and update
// transaction state accordingly.
if needBeginTxn {
// Unless it's a 1PC, ask the TxnCoordSender to track the transaction.
if txn.mu.state == txnReadOnly && !haveEndTxn {
if err := txn.mu.sender.StartTracking(ctx); err != nil {
return roachpb.NewError(err)
}
}
// We're about to send a BeginTxn, so move to the Writing state.
txn.mu.state = txnWriting
// From now on, all requests need to be checked against the AbortCache on
Expand Down
7 changes: 3 additions & 4 deletions pkg/internal/client/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,8 @@ func newTestTxnFactory(
createReply func(roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error),
) TxnSenderFactoryFunc {
return TxnSenderFactoryFunc(func(TxnType) TxnSender {
return TxnSenderAdapter{
StartTrackingWrapped: func(context.Context) error { return nil },
Wrapped: func(_ context.Context, ba roachpb.BatchRequest,
return TxnSenderFunc(
func(_ context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
if ba.UserPriority == 0 {
ba.UserPriority = 1
Expand Down Expand Up @@ -137,7 +136,7 @@ func newTestTxnFactory(
}
return br, pErr
},
}
)
})
}

Expand Down
102 changes: 68 additions & 34 deletions pkg/kv/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,20 +83,45 @@ const (
aborted
)

// A TxnCoordSender is an implementation of client.Sender which wraps
// a lower-level Sender (either a storage.Stores or a DistSender) to
// which it sends commands. It acts as a man-in-the-middle,
// coordinating transaction state for clients. Unlike other senders,
// the TxnCoordSender is stateful and holds information about an
// ongoing transaction. Among other things, it records the intent
// spans of keys mutated by the transaction for later
// resolution.
// A TxnCoordSender is the production implementation of client.TxnSender. It is
// a Sender which wraps a lower-level Sender (a DistSender) to which it sends
// commands. It works on behalf of the client to keep a transaction's state
// (e.g. intents) and to perform periodic heartbeating of the transaction
// required when necessary. Unlike other senders, TxnCoordSender is not a
// singleton - an instance is created for every transaction by the
// TxnCoordSenderFactory.
//
// After a transaction has begun writing, the TxnCoordSender may start
// sending periodic heartbeat messages to that transaction's txn
// record, to keep it live. Note that heartbeating is done only from
// the root transaction coordinator, in the event that multiple
// Among the functions it performs are:
// - Heartbeating of the transaction record. Note that heartbeating is done only
// from the root transaction coordinator, in the event that multiple
// coordinators are active (i.e. in a distributed SQL flow).
// - Accumulating intent spans.
// - Attaching intent spans to EndTransaction requests, for intent cleanup.
// - Handles retriable errors by either bumping the transaction's epoch or, in
// case of TransactionAbortedErrors, cleaning up the transaction (in this case,
// the client.Txn is expected to create a new TxnCoordSender instance
// transparently for the higher-level client).
// - Ensures atomic execution for non-transactional (write) batches by transparently
// wrapping them in transactions when the DistSender is forced to split them for
// multiple ranges. For this reason, generally even non-transactional batches
// need to be sent through a TxnCoordSender.
//
// Since it is stateful, the TxnCoordSender needs to understand when a
// transaction is "finished" and the state can be destroyed. As such there's a
// contract that the client.Txn needs obey. Read-only transactions don't matter
// - they're stateless. For the others, once a BeginTransaction is sent by the
// client, the TxnCoordSender considers the transactions completed in the
// following situations:
// - A batch containing an EndTransactions (commit or rollback) succeeds.
// - A batch containing an EndTransaction(commit=false) succeeds or fails. I.e.
// nothing is expected to follow a rollback attempt.
// - A batch returns a TransactionAbortedError. As mentioned above, the client
// is expected to create a new TxnCoordSender for the next transaction attempt.
//
// Note that "1PC" batches (i.e. batches containing both a Begin and an
// EndTransaction) are no exception from the contract - if the batch fails, the
// client is expected to send a rollback (or perform another transaction attempt
// in case of retriable errors).
type TxnCoordSender struct {
mu struct {
syncutil.Mutex
Expand Down Expand Up @@ -127,7 +152,7 @@ type TxnCoordSender struct {
// transaction was instantiated.
firstUpdateNanos int64
// txnEnd is closed when the transaction is aborted or committed,
// terminating the associated heartbeat instance.
// terminating the heartbeat loop.
txnEnd chan struct{}
// state indicates the state of the transaction coordinator, which
// may briefly diverge from the state of the transaction record if
Expand Down Expand Up @@ -175,6 +200,9 @@ var (
metaCommitsRates = metric.Metadata{
Name: "txn.commits",
Help: "Number of committed KV transactions (including 1PC)"}
// NOTE: The 1PC rate is arguably not accurate because it counts batches
// containing both BeginTransaction and EndTransaction without caring if the
// DistSender had to split it for touching multiple ranges.
metaCommits1PCRates = metric.Metadata{
Name: "txn.commits1PC",
Help: "Number of committed one-phase KV transactions"}
Expand Down Expand Up @@ -412,6 +440,23 @@ func (tc *TxnCoordSender) Send(
txnIDStr := txnID.String()
sp.SetBaggageItem("txnID", txnIDStr)

_, hasBegin := ba.GetArg(roachpb.BeginTransaction)
if hasBegin {
// If there's a BeginTransaction, we need to start the heartbeat loop and
// intent tracking.
// Perhaps surprisingly, this needs to be done even if the batch has both
// a BeginTransaction and an EndTransaction. Although on batch success the
// heartbeat loop will be stopped right away, on error we might need both
// the intents and the heartbeat loop:
// - on retriable error, we need to keep around the intents for cleanup in
// subsequent epochs.
// - on non-retriable error, we need to keep around the intents as the
// client is expected to send an EndTransaction(commit=false) to cleanup.
if err := tc.startTracking(ctx); err != nil {
return nil, roachpb.NewError(err)
}
}

var et *roachpb.EndTransactionRequest
var hasET bool
{
Expand Down Expand Up @@ -568,6 +613,11 @@ func (tc *TxnCoordSender) Send(
if br.Txn.Status != roachpb.PENDING {
tc.mu.Lock()
tc.mu.meta.Txn = br.Txn.Clone()
_, hasBT := ba.GetArg(roachpb.BeginTransaction)
onePC := br.Txn.Status == roachpb.COMMITTED && hasBT
if onePC {
tc.metrics.Commits1PC.Inc(1)
}
tc.cleanupTxnLocked(ctx, done)
tc.mu.Unlock()
}
Expand Down Expand Up @@ -975,13 +1025,12 @@ func (tc *TxnCoordSender) heartbeatLoop(ctx context.Context) {
defer func() {
tc.mu.Lock()
if tc.mu.txnEnd != nil {
close(tc.mu.txnEnd)
tc.mu.txnEnd = nil
}
duration, restarts, status := tc.finalTxnStatsLocked()
tc.mu.tracking = false
tc.mu.Unlock()
tc.updateStats(duration, restarts, status, false)
tc.updateStats(duration, restarts, status)
}()

var closer <-chan struct{}
Expand Down Expand Up @@ -1183,8 +1232,8 @@ func (tc *TxnCoordSender) heartbeat(ctx context.Context) bool {
return true
}

// StartTracking is part of the client.TxnSender interface.
func (tc *TxnCoordSender) StartTracking(ctx context.Context) error {
// startTracking starts a heartbeat loop and tracking of intents.
func (tc *TxnCoordSender) startTracking(ctx context.Context) error {
tc.mu.Lock()
defer tc.mu.Unlock()

Expand All @@ -1210,7 +1259,7 @@ func (tc *TxnCoordSender) StartTracking(ctx context.Context) error {
// In principle, we can relax this as needed though.
tc.cleanupTxnLocked(ctx, aborted)
duration, restarts, status := tc.finalTxnStatsLocked()
tc.updateStats(duration, restarts, status, false /* onePC */)
tc.updateStats(duration, restarts, status)
return err
}
return nil
Expand Down Expand Up @@ -1376,16 +1425,6 @@ func (tc *TxnCoordSender) updateState(
// intents blocking concurrent writers for extended periods of time.
// See #3346.
tc.appendAndCondenseIntentsLocked(ctx, ba, br)
} else {
// If this was a successful one phase commit, update stats
// directly as they won't otherwise be updated on heartbeat
// loop shutdown.
_, isBeginning := ba.GetArg(roachpb.BeginTransaction)
_, isEnding := ba.GetArg(roachpb.EndTransaction)
if pErr == nil && isBeginning && isEnding {
etArgs, ok := br.Responses[len(br.Responses)-1].GetInner().(*roachpb.EndTransactionResponse)
tc.updateStats(tc.clock.PhysicalNow()-startNS, 0, newTxn.Status, ok && etArgs.OnePhaseCommit)
}
}

// Update our record of this transaction, even on error.
Expand Down Expand Up @@ -1443,9 +1482,7 @@ func (tc *TxnCoordSender) resendWithTxn(
}

// updateStats updates transaction metrics after a transaction finishes.
func (tc *TxnCoordSender) updateStats(
duration, restarts int64, status roachpb.TransactionStatus, onePC bool,
) {
func (tc *TxnCoordSender) updateStats(duration, restarts int64, status roachpb.TransactionStatus) {
tc.metrics.Durations.RecordValue(duration)
tc.metrics.Restarts.RecordValue(restarts)
switch status {
Expand All @@ -1455,8 +1492,5 @@ func (tc *TxnCoordSender) updateStats(
tc.metrics.Abandons.Inc(1)
case roachpb.COMMITTED:
tc.metrics.Commits.Inc(1)
if onePC {
tc.metrics.Commits1PC.Inc(1)
}
}
}
Loading

0 comments on commit 22554db

Please sign in to comment.