Skip to content

Commit

Permalink
kv,client,storage: rationalize TxnCoordSender/client.Txn redundant st…
Browse files Browse the repository at this point in the history
…ates

client.Txn and TCS try to maintain a bunch of logically-redundant state
about whether a transaction is "writing" - essentially whether an
EndTransaction needs to be sent to clean up the TCS heartbeat loop and
the server's txn record.
The logic that both parties used for this was complex (e.g. it involved
updates in both Txn and TCS both on the outgoing path and on the
returning path of a batch) and not in sync - sometimes the TCS would
consider the txn as "writing" and the client.Txn wouldn't (e.g. in case
the first writing batch got an ambiguous error).

This patch simplifies things: the idea is that, if a BeginTxn has been
sent, an EndTransaction needs to be sent, period. The client.Txn thus
only keeps track of whether a BeginTxn was sent (except for a 1PC
batch), and it takes charge of starting the TCS' heartbeat loop (by
explicitly instructing it to start the loop before the BeginTxn is
sent). The TCS is no longer burdened with maintaining any state about
whether there is a txn record or not.

As a byproduct, the proto Transaction.Writing flag, which used to have
an unclear meaning, becomes straightforward: if set, the server needs
to check batches against the abort cache. The client is the only one
setting it, the server is the only one checking it. It used to be used
for different purposes by the client and the server.

Release note: none
  • Loading branch information
andreimatei committed May 16, 2018
1 parent 6f4e9b2 commit 990ca52
Show file tree
Hide file tree
Showing 20 changed files with 338 additions and 251 deletions.
4 changes: 2 additions & 2 deletions pkg/internal/client/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,8 @@ func (b *Batch) fillResults(ctx context.Context) {
// instead; this effectively just leaks here.
// TODO(tschottdorf): returning an error here seems
// to get swallowed.
panic(errors.Errorf("not enough responses for calls: %+v, %+v",
b.reqs, b.response))
panic(errors.Errorf("not enough responses for calls: (%T) %+v\nresponses: %+v",
args, args, b.response))
}
}
}
Expand Down
29 changes: 18 additions & 11 deletions pkg/internal/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -797,14 +797,18 @@ func TestReadConsistencyTypes(t *testing.T) {
t.Run(rc.String(), func(t *testing.T) {
// Mock out DistSender's sender function to check the read consistency for
// outgoing BatchRequests and return an empty reply.
factory := client.TxnSenderFactoryFunc(func(_ client.TxnType) client.TxnSender {
return client.TxnSenderFunc(func(_ context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
if ba.ReadConsistency != rc {
return nil, roachpb.NewErrorf("BatchRequest has unexpected ReadConsistency %s", ba.ReadConsistency)
}
return ba.CreateReply(), nil
})
factory := client.TxnSenderFactoryFunc(func(client.TxnType) client.TxnSender {
return client.TxnSenderFunc{
StartTrackingWrapped: func(context.Context) error { panic("unimplemented") },
Wrapped: func(_ context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
if ba.ReadConsistency != rc {
return nil, roachpb.NewErrorf("BatchRequest has unexpected ReadConsistency %s", ba.ReadConsistency)
}
return ba.CreateReply(), nil
},
}
})

clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
db := client.NewDB(factory, clock)
ctx := context.TODO()
Expand Down Expand Up @@ -970,10 +974,13 @@ func TestNodeIDAndObservedTimestamps(t *testing.T) {

// Mock out sender function to check that created transactions
// have the observed timestamp set for the configured node ID.
factory := client.TxnSenderFactoryFunc(func(_ client.TxnType) client.TxnSender {
return client.TxnSenderFunc(func(_ context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
return ba.CreateReply(), nil
})
factory := client.TxnSenderFactoryFunc(func(client.TxnType) client.TxnSender {
return client.TxnSenderFunc{
StartTrackingWrapped: func(context.Context) error { panic("unimplemented") },
Wrapped: func(_ context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
return ba.CreateReply(), nil
},
}
})

clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
Expand Down
18 changes: 16 additions & 2 deletions pkg/internal/client/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ type TxnSender interface {
// if this method is invoked multiple times, the most recent callback
// is the only one which will be invoked.
OnFinish(func(error))

// StartTracking starts a heartbeat loop and tracking of intents.
StartTracking(ctx context.Context) error
}

// TxnSenderFactory is the interface used to create new instances
Expand Down Expand Up @@ -106,13 +109,16 @@ func (f SenderFunc) Send(
// TxnSenderFunc is an adapter to allow the use of ordinary functions
// as TxnSenders with GetMeta or AugmentMeta panicking with unimplemented.
// This is a helper mechanism to facilitate testing.
type TxnSenderFunc func(context.Context, roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error)
type TxnSenderFunc struct {
// Wrapped is the function that Send forwards each batch to.
Wrapped func(context.Context, roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error)
// StartTrackingWrapped, if non-nil, is the function that StartTracking
// forwards to. When it is nil, StartTracking panics with "unimplemented".
StartTrackingWrapped func(context.Context) error
}

// Send calls f.Wrapped(ctx, ba).
func (f TxnSenderFunc) Send(
ctx context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
return f(ctx, ba)
return f.Wrapped(ctx, ba)
}

// GetMeta is part of the TxnSender interface.
Expand All @@ -124,6 +130,14 @@ func (f TxnSenderFunc) AugmentMeta(_ roachpb.TxnCoordMeta) { panic("unimplemente
// OnFinish is part of the TxnSender interface.
func (f TxnSenderFunc) OnFinish(_ func(error)) { panic("unimplemented") }

// StartTracking is part of the TxnSender interface. It delegates to
// f.StartTrackingWrapped when that field is set and panics with
// "unimplemented" otherwise.
func (f TxnSenderFunc) StartTracking(ctx context.Context) error {
if f.StartTrackingWrapped != nil {
return f.StartTrackingWrapped(ctx)
}
panic("unimplemented")
}

// TxnSenderFactoryFunc is an adapter to allow the use of ordinary functions
// as TxnSenderFactories. This is a helper mechanism to facilitate testing.
type TxnSenderFactoryFunc func(TxnType) TxnSender
Expand Down
133 changes: 111 additions & 22 deletions pkg/internal/client/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,7 @@ type Txn struct {
// txnAnchorKey is the key at which to anchor the transaction record. If
// unset, the first key written in the transaction will be used.
txnAnchorKey roachpb.Key
// writingTxnRecord is set when the Txn is in the middle of writing
// its transaction record. It is used to assure that even in the presence
// of concurrent requests, only one sends a BeginTxnRequest.
writingTxnRecord bool
state txnState
// see IsFinalized()
finalized bool
// previousIDs holds the set of all previous IDs that the Txn's Proto has
Expand All @@ -99,6 +96,36 @@ type Txn struct {
}
}

// txnState represents states relating to whether Begin/EndTxn requests need to
// be sent.
type txnState int

const (
// txnReadOnly means that the transaction never sent any writes. There's no
// transaction record, so an EndTxn does not need to be sent.
// This is the zero value of txnState.
txnReadOnly txnState = iota
// txnWriting means that the transaction has sent some writes (and so it also
// sent a BeginTxn). An EndTransaction must be sent to resolve intents and/or
// to clean up the txn record.
// txnWriting does not guarantee that the transaction record has been written.
// In case the BeginTxn batch encountered an error, it might not have been. In
// this case, a rollback will get an error (ignored by SQL).
txnWriting
// txnWriteInOldEpoch means that the txn has been writing in an old epoch,
// but then restarted with a new epoch, and there have been no writes sent
// since then. This means that an EndTransaction(commit=false) needs to be
// sent to clean up intents. It also means that a BeginTransaction needs to be
// sent on the first write: the TransactionRestartError might have been
// received by the batch with the BeginTransaction in it, in which case there
// is no transaction record (and so it needs to be created).
// We could be smarter about not transitioning to this state if there's ever
// been a successful write (in which case we know that there is a txn record
// and a BeginTransaction is not necessary) but as of May 2018 we don't do
// that. Note that the server accepts a BeginTxn with a higher epoch if a
// transaction record already exists.
txnWriteInOldEpoch
)

// NewTxn returns a new txn. The typ parameter specifies whether this
// transaction is the top level (root), or one of potentially many
// distributed transactions (leaf).
Expand Down Expand Up @@ -298,7 +325,7 @@ func (txn *Txn) CommitTimestamp() hlc.Timestamp {
func (txn *Txn) SetTxnAnchorKey(key roachpb.Key) error {
txn.mu.Lock()
defer txn.mu.Unlock()
if txn.mu.Proto.Writing || txn.mu.writingTxnRecord {
if txn.mu.state != txnReadOnly {
return errors.Errorf("transaction anchor key already set")
}
txn.mu.txnAnchorKey = key
Expand Down Expand Up @@ -596,6 +623,14 @@ func (txn *Txn) Rollback(ctx context.Context) error {
}

// rollback logs the rollback at verbosity 2 and then asks sendEndTxnReq to
// finish the transaction with commit=false and no deadline. It returns
// whatever error sendEndTxnReq returns.
func (txn *Txn) rollback(ctx context.Context) *roachpb.Error {
// TODO(andrei): It's common for rollbacks to fail with
// TransactionStatusError: txn record not found (REASON_TXN_NOT_FOUND),
// in case the txn record was never written (e.g. if a 1PC failed). One would
// think that, depending on the error received by a 1PC batch, we'd know if a
// txn record was written and, if it wasn't, we could short-circuit the
// rollback. There are two tricky things about that, though: a) we'd need to
// know whether the DistSender has split what looked like a 1PC batch to the
// client and b) ambiguous errors.
log.VEventf(ctx, 2, "rolling back transaction")
return txn.sendEndTxnReq(ctx, false /* commit */, nil)
}
Expand All @@ -614,19 +649,46 @@ func (txn *Txn) OnFinish(onFinishFn func(error)) {
txn.mu.sender.OnFinish(onFinishFn)
}

// readOnlyRes is returned by maybeFinishReadonly, informing the caller of what
// cleanup needs to take place.
type readOnlyRes int

const (
// readOnlyOK means that the transaction was read-only, and no cleanup is
// necessary. It is the zero value of readOnlyRes; maybeFinishReadonly also
// returns it alongside a non-nil error, in which case the value carries no
// meaning and must be ignored.
readOnlyOK readOnlyRes = iota
// writing means that a BeginTransaction had been sent, so an EndTransaction
// needs to be sent too.
writing
// oldEpochWrite means that the current epoch of the txn has been read-only,
// but an older epoch had written. An EndTransaction(commit=false) needs to be
// sent. sendEndTxnReq turns a requested commit into a rollback in this case
// and ignores any error returned by that rollback.
oldEpochWrite
)

// maybeFinishReadonly provides a fast-path for finishing a read-only
// transaction without going through the overhead of creating an
// EndTransactionRequest only to not send it.
// EndTransactionRequest only to not send it. The return value informs the
// caller of what cleanup action needs to be taken.
//
// NB: The logic here must be kept in sync with the logic in txn.Send.
//
// TODO(andrei): Can we share this code with txn.Send?
func (txn *Txn) maybeFinishReadonly(commit bool, deadline *hlc.Timestamp) (bool, *roachpb.Error) {
func (txn *Txn) maybeFinishReadonly(
ctx context.Context, commit bool, deadline *hlc.Timestamp,
) (readOnlyRes, *roachpb.Error) {
txn.mu.Lock()
defer txn.mu.Unlock()
if txn.mu.Proto.Writing || txn.mu.writingTxnRecord {
return false, nil
switch txn.mu.state {
case txnWriting:
return writing, nil
case txnWriteInOldEpoch:
return oldEpochWrite, nil
case txnReadOnly:
default:
log.Fatalf(ctx, "unknown state: %d", txn.mu.state)
}

txn.mu.finalized = true
// Check that read only transactions do not violate their deadline. This can NOT
// happen since the txn deadline is normally updated when it is about to expire
Expand All @@ -644,26 +706,39 @@ func (txn *Txn) maybeFinishReadonly(commit bool, deadline *hlc.Timestamp) (bool,
// 4. new timestamp violates deadline
// 5. txn retries the read
// 6. commit fails - only thanks to this code path?
return false, roachpb.NewErrorWithTxn(roachpb.NewTransactionStatusError(
return readOnlyOK /* ignored */, roachpb.NewErrorWithTxn(roachpb.NewTransactionStatusError(
"deadline exceeded before transaction finalization"), &txn.mu.Proto)
}
if commit {
txn.mu.Proto.Status = roachpb.COMMITTED
} else {
txn.mu.Proto.Status = roachpb.ABORTED
}
return true, nil
return readOnlyOK, nil
}

func (txn *Txn) sendEndTxnReq(
ctx context.Context, commit bool, deadline *hlc.Timestamp,
) *roachpb.Error {
if ok, err := txn.maybeFinishReadonly(commit, deadline); ok || err != nil {
ret, err := txn.maybeFinishReadonly(ctx, commit, deadline)
if ret == readOnlyOK || err != nil {
return err
}
// If there was a write in an old epoch (and no writes in the current epoch),
// we need to send a rollback. We'll ignore the error from it; the commit is
// successful at this point.
if ret == oldEpochWrite {
if commit {
log.VEventf(ctx, 2, "state oldEpochWrite turning commit into rollback")
}
commit = false
}
var ba roachpb.BatchRequest
ba.Add(endTxnReq(commit, deadline, txn.systemConfigTrigger))
_, pErr := txn.Send(ctx, ba)
if pErr != nil && ret == oldEpochWrite {
return nil
}
return pErr
}

Expand Down Expand Up @@ -899,14 +974,27 @@ func (txn *Txn) Send(
txn.mu.active = true
}

needBeginTxn = !(txn.mu.Proto.Writing || txn.mu.writingTxnRecord) && haveTxnWrite
needEndTxn := txn.mu.Proto.Writing || txn.mu.writingTxnRecord || haveTxnWrite
needBeginTxn = haveTxnWrite && (txn.mu.state != txnWriting)
// We need the EndTxn if we've ever written before or if we're writing now.
needEndTxn := txn.mu.state != txnReadOnly || haveTxnWrite
elideEndTxn = haveEndTxn && !needEndTxn

// If we're not yet writing in this txn, but intend to, insert a
// begin transaction request before the first write command and update
// transaction state accordingly.
if needBeginTxn {
// Unless it's a 1PC, ask the TxnCoordSender to track the transaction.
if txn.mu.state == txnReadOnly && !haveEndTxn {
if err := txn.mu.sender.StartTracking(ctx); err != nil {
return roachpb.NewError(err)
}
}
// We're about to send a BeginTxn, so move to the Writing state.
txn.mu.state = txnWriting
// From now on, all requests need to be checked against the AbortCache on
// the server side.
txn.mu.Proto.Writing = true

// Set txn key based on the key of the first transactional write if
// not already set. If the transaction already has a key (we're in a
// restart), make sure we keep the anchor key the same.
Expand All @@ -930,9 +1018,6 @@ func (txn *Txn) Send(
copy(ba.Requests, oldRequests[:firstWriteIdx])
ba.Requests[firstWriteIdx].MustSetInner(bt)
copy(ba.Requests[firstWriteIdx+1:], oldRequests[firstWriteIdx:])
// We're going to be writing the transaction record by sending the
// begin transaction request.
txn.mu.writingTxnRecord = true
}

if elideEndTxn {
Expand All @@ -957,9 +1042,7 @@ func (txn *Txn) Send(
txn.mu.Lock()
defer txn.mu.Unlock()

// If we inserted a begin transaction request, remove it here. We also
// unset the flag writingTxnRecord flag in case another ever needs to
// be sent again (for instance, if we're aborted and need to restart).
// If we inserted a begin transaction request, remove it here.
if needBeginTxn {
if br != nil && br.Responses != nil {
br.Responses = append(br.Responses[:firstWriteIdx], br.Responses[firstWriteIdx+1:]...)
Expand All @@ -975,8 +1058,6 @@ func (txn *Txn) Send(
pErr.SetErrorIndex(idx - 1)
}
}

txn.mu.writingTxnRecord = false
}
if haveEndTxn {
if pErr == nil || !endTxnRequest.Commit {
Expand Down Expand Up @@ -1148,6 +1229,8 @@ func (txn *Txn) updateStateOnRetryableErrLocked(
// attempt. The txn inside pErr was correctly prepared for this by
// TxnCoordSender.
txn.mu.Proto = *newTxn
// We're starting a fresh transaction, with no state.
txn.mu.state = txnReadOnly

// Create a new txn sender.
txn.mu.sender = txn.db.factory.New(txn.typ)
Expand All @@ -1159,6 +1242,12 @@ func (txn *Txn) updateStateOnRetryableErrLocked(
// we rely on the associativity of Transaction.Update to sort out this
// lack of ordering guarantee.
txn.mu.Proto.Update(newTxn)

// If we've been writing, we'll need to send a BeginTxn with the next
// request.
if txn.mu.state == txnWriting {
txn.mu.state = txnWriteInOldEpoch
}
}
}

Expand Down
Loading

0 comments on commit 990ca52

Please sign in to comment.