Skip to content

Commit

Permalink
Merge #25541
Browse files Browse the repository at this point in the history
25541: kv,client,storage: rationalize TxnCoordSender/client.Txn redundant states r=andreimatei a=andreimatei

client.Txn and TCS try to maintain a bunch of logically-redundant state
about whether a transaction is "writing" - essentially whether an
EndTransaction needs to be sent to clean up the TCS heartbeat loop and
the server's txn record.
The logic that both parties used for this was complex (e.g. it involved
updates in both Txn and TCS both on the outgoing path and on the
returning path of a batch) and not in sync - sometimes the TCS would
consider the txn as "writing" and the client.Txn wouldn't (e.g. in case
the first writing batch got an ambiguous error).

This patch simplifies things: the idea is that, if a BeginTxn has been
sent, an EndTransaction needs to be sent, period. The client.Txn thus
only keeps track of whether a BeginTxn was sent (except for a 1PC
batch), and it takes charge of starting the TCS' heartbeat loop (by
explicitly instructing it to start the loop before the BeginTxn is
sent). The TCS is no longer burdened with maintaining any state about
whether there is a txn record or not.

As a byproduct, the proto Transaction.Writing flag, which used to have
an unclear meaning, becomes straightforward: if set, the server needs
to check batches against the abort cache. The client is the only one
setting it, the server is the only one checking it. It used to be used
for different purposes by the client and the server.

Release note: none

Co-authored-by: Andrei Matei <[email protected]>
  • Loading branch information
craig[bot] and andreimatei committed Jun 5, 2018
2 parents 941cdc4 + 823438f commit 1893774
Show file tree
Hide file tree
Showing 34 changed files with 844 additions and 529 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,6 @@
<tr><td><code>trace.debug.enable</code></td><td>boolean</td><td><code>false</code></td><td>if set, traces for recent requests can be seen in the /debug page</td></tr>
<tr><td><code>trace.lightstep.token</code></td><td>string</td><td><code></code></td><td>if set, traces go to Lightstep using this token</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'); ignored if trace.lightstep.token is set.</td></tr>
<tr><td><code>version</code></td><td>custom validation</td><td><code>2.0-5</code></td><td>set the active cluster version in the format '<major>.<minor>'.</td></tr>
<tr><td><code>version</code></td><td>custom validation</td><td><code>2.0-6</code></td><td>set the active cluster version in the format '<major>.<minor>'.</td></tr>
</tbody>
</table>
4 changes: 2 additions & 2 deletions pkg/cmd/roachtest/tpcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ func registerTPCCBench(r *registry) {

LoadWarehouses: 1000,
EstimatedMax: 325,
StoreDirVersion: "2.0-5",
StoreDirVersion: "2.0-6",
},
{
Nodes: 3,
Expand All @@ -394,7 +394,7 @@ func registerTPCCBench(r *registry) {

LoadWarehouses: 5000,
EstimatedMax: 500,
StoreDirVersion: "2.0-5",
StoreDirVersion: "2.0-6",
},
// objective 3, key result 2.
{
Expand Down
4 changes: 2 additions & 2 deletions pkg/internal/client/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,8 @@ func (b *Batch) fillResults(ctx context.Context) {
// instead; this effectively just leaks here.
// TODO(tschottdorf): returning an error here seems
// to get swallowed.
panic(errors.Errorf("not enough responses for calls: %+v, %+v",
b.reqs, b.response))
panic(errors.Errorf("not enough responses for calls: (%T) %+v\nresponses: %+v",
args, args, b.response))
}
}
}
Expand Down
29 changes: 18 additions & 11 deletions pkg/internal/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -798,14 +798,18 @@ func TestReadConsistencyTypes(t *testing.T) {
t.Run(rc.String(), func(t *testing.T) {
// Mock out DistSender's sender function to check the read consistency for
// outgoing BatchRequests and return an empty reply.
factory := client.TxnSenderFactoryFunc(func(_ client.TxnType) client.TxnSender {
return client.TxnSenderFunc(func(_ context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
if ba.ReadConsistency != rc {
return nil, roachpb.NewErrorf("BatchRequest has unexpected ReadConsistency %s", ba.ReadConsistency)
}
return ba.CreateReply(), nil
})
factory := client.TxnSenderFactoryFunc(func(client.TxnType) client.TxnSender {
return client.TxnSenderAdapter{
StartTrackingWrapped: func(context.Context) error { panic("unimplemented") },
Wrapped: func(_ context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
if ba.ReadConsistency != rc {
return nil, roachpb.NewErrorf("BatchRequest has unexpected ReadConsistency %s", ba.ReadConsistency)
}
return ba.CreateReply(), nil
},
}
})

clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
db := client.NewDB(factory, clock)
ctx := context.TODO()
Expand Down Expand Up @@ -971,10 +975,13 @@ func TestNodeIDAndObservedTimestamps(t *testing.T) {

// Mock out sender function to check that created transactions
// have the observed timestamp set for the configured node ID.
factory := client.TxnSenderFactoryFunc(func(_ client.TxnType) client.TxnSender {
return client.TxnSenderFunc(func(_ context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
return ba.CreateReply(), nil
})
factory := client.TxnSenderFactoryFunc(func(client.TxnType) client.TxnSender {
return client.TxnSenderAdapter{
StartTrackingWrapped: func(context.Context) error { panic("unimplemented") },
Wrapped: func(_ context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
return ba.CreateReply(), nil
},
}
})

clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
Expand Down
19 changes: 15 additions & 4 deletions pkg/internal/client/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,11 +199,22 @@ type DB struct {
ctx DBContext
}

// GetSender returns a transaction-capable Sender instance. The same
// sender must be used for the entirety of a transaction. Get a new
// instance to start a new transaction.
// GetSender returns a Sender that can be used to send requests through.
// Note that a new Sender is created; it is not shared.
//
// The Sender returned should not be used for sending transactional requests.
// Use db.Txn() or db.NewTxn() for that.
func (db *DB) GetSender() Sender {
return db.factory.New(RootTxn)
// We pass nil for the txn here because we don't have a txn on hand.
// That's why this method says to not use the Sender for transactional
// requests, plus the fact that if a Sender is used directly, the caller needs
// to be mindful of the need to start a heartbeat loop when writing.
//
// Note that even non-transactional requests need to go through a
// TxnCoordSender because batches that get split need to be wrapped in
// transactions (and the TxnCoordSender handles that). So we can't simply
// return the wrapped handler here.
return db.factory.New(RootTxn, nil /* txn */)
}

// GetFactory returns the DB's TxnSenderFactory.
Expand Down
48 changes: 32 additions & 16 deletions pkg/internal/client/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ type TxnSender interface {
GetMeta() roachpb.TxnCoordMeta
// AugmentMeta combines the TxnCoordMeta from another distributed
// TxnSender which is part of the same transaction.
AugmentMeta(meta roachpb.TxnCoordMeta)
AugmentMeta(ctx context.Context, meta roachpb.TxnCoordMeta)
// OnFinish invokes the supplied closure when the sender has finished
// with the txn (i.e. it's been abandoned, aborted, or committed).
// The error passed is meant to indicate to an extant distributed
Expand All @@ -78,16 +78,21 @@ type TxnSender interface {
// if this method is invoked multiple times, the most recent callback
// is the only one which will be invoked.
OnFinish(func(error))

// StartTracking starts a heartbeat loop and tracking of intents.
StartTracking(ctx context.Context) error
}

// TxnSenderFactory is the interface used to create new instances
// of TxnSender.
type TxnSenderFactory interface {
// New returns a new instance of TxnSender. The typ parameter
// specifies whether the sender is the root or one of potentially
// many child "leaf" nodes in a tree of transaction objects, as is
// created during a DistSQL flow.
New(typ TxnType) TxnSender
// New returns a new instance of TxnSender.
// typ specifies whether the sender is the root or one of potentially many
// child "leaf" nodes in a tree of transaction objects, as is created during a
// DistSQL flow.
// txn is the transaction whose requests this sender will carry. It can be nil
// if the sender will not be used for transactional requests.
New(typ TxnType, txn *roachpb.Transaction) TxnSender
// WrappedSender returns the TxnSenderFactory's wrapped Sender.
WrappedSender() Sender
}
Expand All @@ -103,33 +108,44 @@ func (f SenderFunc) Send(
return f(ctx, ba)
}

// TxnSenderFunc is an adapter to allow the use of ordinary functions
// as TxnSenders with GetMeta or AugmentMeta panicking with unimplemented.
// This is a helper mechanism to facilitate testing.
type TxnSenderFunc func(context.Context, roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error)
// TxnSenderAdapter is an adapter to allow the use of ordinary functions as
// TxnSenders, with GetMeta, AugmentMeta and OnFinish panicking with
// "unimplemented". This is a helper mechanism to facilitate testing.
type TxnSenderAdapter struct {
// Wrapped is the function that Send delegates to.
Wrapped func(context.Context, roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error)
// StartTrackingWrapped, if non-nil, is the function that StartTracking
// delegates to; when nil, StartTracking panics with "unimplemented".
StartTrackingWrapped func(context.Context) error
}

// Send forwards ctx and ba to the wrapped function.
func (f TxnSenderFunc) Send(
func (f TxnSenderAdapter) Send(
ctx context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
return f(ctx, ba)
return f.Wrapped(ctx, ba)
}

// GetMeta is part of the TxnSender interface.
func (f TxnSenderFunc) GetMeta() roachpb.TxnCoordMeta { panic("unimplemented") }
func (f TxnSenderAdapter) GetMeta() roachpb.TxnCoordMeta { panic("unimplemented") }

// AugmentMeta is part of the TxnSender interface.
func (f TxnSenderFunc) AugmentMeta(_ roachpb.TxnCoordMeta) { panic("unimplemented") }
func (f TxnSenderAdapter) AugmentMeta(context.Context, roachpb.TxnCoordMeta) { panic("unimplemented") }

// OnFinish is part of the TxnSender interface.
func (f TxnSenderFunc) OnFinish(_ func(error)) { panic("unimplemented") }
func (f TxnSenderAdapter) OnFinish(_ func(error)) { panic("unimplemented") }

// StartTracking is part of the TxnSender interface. It delegates to
// StartTrackingWrapped when one has been supplied and panics otherwise.
func (f TxnSenderAdapter) StartTracking(ctx context.Context) error {
if f.StartTrackingWrapped != nil {
return f.StartTrackingWrapped(ctx)
}
// No hook was configured for this adapter.
panic("unimplemented")
}

// TxnSenderFactoryFunc is an adapter to allow the use of ordinary functions
// as TxnSenderFactories. This is a helper mechanism to facilitate testing.
type TxnSenderFactoryFunc func(TxnType) TxnSender

// New calls f(typ).
func (f TxnSenderFactoryFunc) New(typ TxnType) TxnSender {
func (f TxnSenderFactoryFunc) New(typ TxnType, _ *roachpb.Transaction) TxnSender {
return f(typ)
}

Expand Down
Loading

0 comments on commit 1893774

Please sign in to comment.