Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kv,client,storage: rationalize TxnCoordSender/client.Txn redundant states #25541

Merged
merged 5 commits into from
Jun 5, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,6 @@
<tr><td><code>trace.debug.enable</code></td><td>boolean</td><td><code>false</code></td><td>if set, traces for recent requests can be seen in the /debug page</td></tr>
<tr><td><code>trace.lightstep.token</code></td><td>string</td><td><code></code></td><td>if set, traces go to Lightstep using this token</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'); ignored if trace.lightstep.token is set.</td></tr>
<tr><td><code>version</code></td><td>custom validation</td><td><code>2.0-5</code></td><td>set the active cluster version in the format '<major>.<minor>'.</td></tr>
<tr><td><code>version</code></td><td>custom validation</td><td><code>2.0-6</code></td><td>set the active cluster version in the format '<major>.<minor>'.</td></tr>
</tbody>
</table>
4 changes: 2 additions & 2 deletions pkg/cmd/roachtest/tpcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ func registerTPCCBench(r *registry) {

LoadWarehouses: 1000,
EstimatedMax: 325,
StoreDirVersion: "2.0-5",
StoreDirVersion: "2.0-6",
},
{
Nodes: 3,
Expand All @@ -394,7 +394,7 @@ func registerTPCCBench(r *registry) {

LoadWarehouses: 5000,
EstimatedMax: 500,
StoreDirVersion: "2.0-5",
StoreDirVersion: "2.0-6",
},
// objective 3, key result 2.
{
Expand Down
4 changes: 2 additions & 2 deletions pkg/internal/client/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,8 @@ func (b *Batch) fillResults(ctx context.Context) {
// instead; this effectively just leaks here.
// TODO(tschottdorf): returning an error here seems
// to get swallowed.
panic(errors.Errorf("not enough responses for calls: %+v, %+v",
b.reqs, b.response))
panic(errors.Errorf("not enough responses for calls: (%T) %+v\nresponses: %+v",
args, args, b.response))
}
}
}
Expand Down
29 changes: 18 additions & 11 deletions pkg/internal/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -798,14 +798,18 @@ func TestReadConsistencyTypes(t *testing.T) {
t.Run(rc.String(), func(t *testing.T) {
// Mock out DistSender's sender function to check the read consistency for
// outgoing BatchRequests and return an empty reply.
factory := client.TxnSenderFactoryFunc(func(_ client.TxnType) client.TxnSender {
return client.TxnSenderFunc(func(_ context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
if ba.ReadConsistency != rc {
return nil, roachpb.NewErrorf("BatchRequest has unexpected ReadConsistency %s", ba.ReadConsistency)
}
return ba.CreateReply(), nil
})
factory := client.TxnSenderFactoryFunc(func(client.TxnType) client.TxnSender {
return client.TxnSenderAdapter{
StartTrackingWrapped: func(context.Context) error { panic("unimplemented") },
Wrapped: func(_ context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
if ba.ReadConsistency != rc {
return nil, roachpb.NewErrorf("BatchRequest has unexpected ReadConsistency %s", ba.ReadConsistency)
}
return ba.CreateReply(), nil
},
}
})

clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
db := client.NewDB(factory, clock)
ctx := context.TODO()
Expand Down Expand Up @@ -971,10 +975,13 @@ func TestNodeIDAndObservedTimestamps(t *testing.T) {

// Mock out sender function to check that created transactions
// have the observed timestamp set for the configured node ID.
factory := client.TxnSenderFactoryFunc(func(_ client.TxnType) client.TxnSender {
return client.TxnSenderFunc(func(_ context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
return ba.CreateReply(), nil
})
factory := client.TxnSenderFactoryFunc(func(client.TxnType) client.TxnSender {
return client.TxnSenderAdapter{
StartTrackingWrapped: func(context.Context) error { panic("unimplemented") },
Wrapped: func(_ context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
return ba.CreateReply(), nil
},
}
})

clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
Expand Down
19 changes: 15 additions & 4 deletions pkg/internal/client/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,11 +199,22 @@ type DB struct {
ctx DBContext
}

// GetSender returns a transaction-capable Sender instance. The same
// sender must be used for the entirety of a transaction. Get a new
// instance to start a new transaction.
// GetSender returns a Sender that can be used to send requests through.
// Note that a new Sender created; it is not shared.
//
// The Sender returned should not be used for sending transactional requests.
// Use db.Txn() or db.NewTxn() for that.
func (db *DB) GetSender() Sender {
return db.factory.New(RootTxn)
// We pass nil for the txn here because we don't have a txn on hand.
// That's why this method says to not use the Sender for transactional
// requests, plus the fact that if a Sender is used directly, the caller needs
// to be mindful of the need to start a heartbeat loop when writing.
//
// Note that even non-transactional requests need to go through a
// TxnCoordSender because batches that get split need to be wrapped in
// transactions (and the TxnCoordSender handles that). So we can't simply
// return the wrapped handler here.
return db.factory.New(RootTxn, nil /* txn */)
}

// GetFactory returns the DB's TxnSenderFactory.
Expand Down
48 changes: 32 additions & 16 deletions pkg/internal/client/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ type TxnSender interface {
GetMeta() roachpb.TxnCoordMeta
// AugmentMeta combines the TxnCoordMeta from another distributed
// TxnSender which is part of the same transaction.
AugmentMeta(meta roachpb.TxnCoordMeta)
AugmentMeta(ctx context.Context, meta roachpb.TxnCoordMeta)
// OnFinish invokes the supplied closure when the sender has finished
// with the txn (i.e. it's been abandoned, aborted, or committed).
// The error passed is meant to indicate to an extant distributed
Expand All @@ -78,16 +78,21 @@ type TxnSender interface {
// if this method is invoked multiple times, the most recent callback
// is the only one which will be invoked.
OnFinish(func(error))

// StartTracking starts a heartbeat loop and tracking of intents.
StartTracking(ctx context.Context) error
}

// TxnSenderFactory is the interface used to create new instances
// of TxnSender.
type TxnSenderFactory interface {
// New returns a new instance of TxnSender. The typ parameter
// specifies whether the sender is the root or one of potentially
// many child "leaf" nodes in a tree of transaction objects, as is
// created during a DistSQL flow.
New(typ TxnType) TxnSender
// New returns a new instance of TxnSender.
// typ specifies whether the sender is the root or one of potentially many
// child "leaf" nodes in a tree of transaction objects, as is created during a
// DistSQL flow.
// txn is the transaction whose requests this sender will carry. It can be nil
// if the sender will not be used for transactional requests.
New(typ TxnType, txn *roachpb.Transaction) TxnSender
// WrappedSender returns the TxnSenderFactory's wrapped Sender.
WrappedSender() Sender
}
Expand All @@ -103,33 +108,44 @@ func (f SenderFunc) Send(
return f(ctx, ba)
}

// TxnSenderFunc is an adapter to allow the use of ordinary functions
// as TxnSenders with GetMeta or AugmentMeta panicing with unimplemented.
// This is a helper mechanism to facilitate testing.
type TxnSenderFunc func(context.Context, roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error)
// TxnSenderAdapter is an adapter to allow the use of ordinary functions as
// TxnSenders with GetMeta or AugmentMeta panicing with unimplemented. This is
// a helper mechanism to facilitate testing.
type TxnSenderAdapter struct {
Wrapped func(context.Context, roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error)
StartTrackingWrapped func(context.Context) error
}

// Send calls f(ctx, c).
func (f TxnSenderFunc) Send(
func (f TxnSenderAdapter) Send(
ctx context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
return f(ctx, ba)
return f.Wrapped(ctx, ba)
}

// GetMeta is part of the TxnSender interface.
func (f TxnSenderFunc) GetMeta() roachpb.TxnCoordMeta { panic("unimplemented") }
func (f TxnSenderAdapter) GetMeta() roachpb.TxnCoordMeta { panic("unimplemented") }

// AugmentMeta is part of the TxnSender interface.
func (f TxnSenderFunc) AugmentMeta(_ roachpb.TxnCoordMeta) { panic("unimplemented") }
func (f TxnSenderAdapter) AugmentMeta(context.Context, roachpb.TxnCoordMeta) { panic("unimplemented") }

// OnFinish is part of the TxnSender interface.
func (f TxnSenderFunc) OnFinish(_ func(error)) { panic("unimplemented") }
func (f TxnSenderAdapter) OnFinish(_ func(error)) { panic("unimplemented") }

// StartTracking is part the TxnSender interface.
func (f TxnSenderAdapter) StartTracking(ctx context.Context) error {
if f.StartTrackingWrapped != nil {
return f.StartTrackingWrapped(ctx)
}
panic("unimplemented")
}

// TxnSenderFactoryFunc is an adapter to allow the use of ordinary functions
// as TxnSenderFactories. This is a helper mechanism to facilitate testing.
type TxnSenderFactoryFunc func(TxnType) TxnSender

// New calls f().
func (f TxnSenderFactoryFunc) New(typ TxnType) TxnSender {
func (f TxnSenderFactoryFunc) New(typ TxnType, _ *roachpb.Transaction) TxnSender {
return f(typ)
}

Expand Down
Loading