Skip to content

Commit

Permalink
client, kv: move logic out of Txn, rewrite some of the TxnCoordSender
Browse files Browse the repository at this point in the history
This patch moves most of the logic from the client.Txn into the
kv.TxnCoordSender and reorganizes much of the TxnCoordSender in the
process.
The split between the client.Txn and the TxnCoordSender caused a lot of
grief historically. The main problem is that both the Txn and the TCS
each have their own copy of the roachpb.Transaction proto. They both
use their copy for different things. We attempt to keep the two protos
in sync, but we can't ensure that as there's no common locking between
the two layers.
This patch keeps the client.Txn as a mostly stateless shim, allowing
one to mock everything underneath. This is nice, as previously "mocking
KV" was a less clear proposition - does one mock all the logic in the
Txn or just the TCS? Now the TCS has all the logic and all the locking
necessary for serializing accesses to the "transaction state" - notably
the proto.
The Txn and TCS communicate through a (now expanded) client.TxnSender
interface.

Within the TCS, the biggest change is that everything that has to do
with the heartbeat loop has been moved to a new interceptor.

Fixes cockroachdb#28256

Release note: none
  • Loading branch information
andreimatei committed Aug 3, 2018
1 parent 624aacd commit 035cec9
Show file tree
Hide file tree
Showing 41 changed files with 2,668 additions and 2,374 deletions.
50 changes: 8 additions & 42 deletions pkg/internal/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -850,39 +850,6 @@ func TestReadConsistencyTypes(t *testing.T) {
}
}

// TestReadOnlyTxnObeysDeadline tests that read-only transactions obey the
// deadline. Read-only transactions have their EndTransaction elided, so the
// enforcement of the deadline is done in the client.
func TestReadOnlyTxnObeysDeadline(t *testing.T) {
defer leaktest.AfterTest(t)()
s, _, _ := serverutils.StartServer(t, base.TestServerArgs{})
defer s.Stopper().Stop(context.TODO())
db := createTestClient(t, s)
ctx := context.TODO()

if err := db.Put(ctx, "k", "v"); err != nil {
t.Fatal(err)
}

txn := client.NewTxn(db, 0 /* gatewayNodeID */, client.RootTxn)
// Only snapshot transactions can observe deadline errors; serializable ones
// get a restart error before the deadline check.
if err := txn.SetIsolation(enginepb.SNAPSHOT); err != nil {
t.Fatal(err)
}

// Set a deadline, then set a higher commit timestamp for the txn.
txn.UpdateDeadlineMaybe(ctx, s.Clock().Now())
txn.Proto().Timestamp.Forward(s.Clock().Now())
if _, err := txn.Get(ctx, "k"); err != nil {
t.Fatal(err)
}
if err := txn.Commit(ctx); !testutils.IsError(
err, "deadline exceeded before transaction finalization") {
t.Fatal(err)
}
}

// TestTxn_ReverseScan a simple test for Txn.ReverseScan
func TestTxn_ReverseScan(t *testing.T) {
defer leaktest.AfterTest(t)()
Expand Down Expand Up @@ -972,13 +939,10 @@ func TestNodeIDAndObservedTimestamps(t *testing.T) {

// Mock out sender function to check that created transactions
// have the observed timestamp set for the configured node ID.
factory := client.TxnSenderFactoryFunc(func(client.TxnType) client.TxnSender {
return client.TxnSenderFunc(
func(_ context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
return ba.CreateReply(), nil
},
)
})
factory := client.MakeMockTxnSenderFactory(
func(_ context.Context, _ *roachpb.Transaction, ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
return ba.CreateReply(), nil
})

clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
dbCtx := client.DefaultDBContext()
Expand All @@ -1000,7 +964,8 @@ func TestNodeIDAndObservedTimestamps(t *testing.T) {
for i, test := range directCases {
t.Run(fmt.Sprintf("direct-txn-%d", i), func(t *testing.T) {
txn := client.NewTxn(db, test.nodeID, test.typ)
if ots := txn.Proto().ObservedTimestamps; (len(ots) == 1 && ots[0].NodeID == test.nodeID) != test.expObserved {
ots := txn.Serialize().ObservedTimestamps
if (len(ots) == 1 && ots[0].NodeID == test.nodeID) != test.expObserved {
t.Errorf("expected observed ts %t; got %+v", test.expObserved, ots)
}
})
Expand All @@ -1021,7 +986,8 @@ func TestNodeIDAndObservedTimestamps(t *testing.T) {
}
if err := db.Txn(
ctx, func(_ context.Context, txn *client.Txn) error {
if ots := txn.Proto().ObservedTimestamps; (len(ots) == 1 && ots[0].NodeID == test.nodeID) != test.expObserved {
ots := txn.Serialize().ObservedTimestamps
if (len(ots) == 1 && ots[0].NodeID == test.nodeID) != test.expObserved {
t.Errorf("expected observed ts %t; got %+v", test.expObserved, ots)
}
return nil
Expand Down
26 changes: 0 additions & 26 deletions pkg/internal/client/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,12 @@ package client_test
import (
"bytes"
"context"
"fmt"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

func setup(t *testing.T) (serverutils.TestServerInterface, *client.DB) {
Expand Down Expand Up @@ -377,27 +375,3 @@ func TestDB_Put_insecure(t *testing.T) {
}
checkResult(t, []byte("1"), result.ValueBytes())
}

func TestDebugName(t *testing.T) {
defer leaktest.AfterTest(t)()
s, db := setup(t)
defer s.Stopper().Stop(context.TODO())

if err := db.Txn(context.TODO(), func(ctx context.Context, txn *client.Txn) error {
// Manually override the txn ID, to make the DebugName below deterministic.
id := "00000000-b33f-b33f-b33f-000000000000"
uuid, err := uuid.FromString(id)
if err != nil {
t.Fatal(err)
}
txn.Proto().ID = uuid

expected := fmt.Sprintf("unnamed (id: %s)", id)
if txn.DebugName() != expected {
t.Fatalf("expected \"%s\", but found \"%s\"", expected, txn.DebugName())
}
return nil
}); err != nil {
t.Errorf("txn failed: %s", err)
}
}
229 changes: 207 additions & 22 deletions pkg/internal/client/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"context"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// TxnType specifies whether a transaction is the root (parent)
Expand Down Expand Up @@ -78,11 +80,91 @@ type TxnSender interface {
// if this method is invoked multiple times, the most recent callback
// is the only one which will be invoked.
OnFinish(func(error))

// SetSystemConfigTrigger sets the system db trigger to true on this transaction.
// This will impact the EndTransactionRequest.
//
// NOTE: The system db trigger will only execute correctly if the transaction
// record is located on the range that contains the system span. If a
// transaction is created which modifies both system *and* non-system data, it
// should be ensured that the transaction record itself is on the system span.
// This can be done by making sure a system key is the first key touched in the
// transaction.
SetSystemConfigTrigger() error

// BumpEpochAfterConcurrentRetryError bumps the transaction epoch
// manually and resets the transaction state away from txnError. This is meant
// to be used after synchronizing concurrent actors using a txn when a
// retryable error is seen.
//
// TODO(andrei): this should go away once we move to a TxnAttempt model.
BumpEpochAfterConcurrentRetryError(context.Context)

// TxnStatus exports the txn's status.
TxnStatus() roachpb.TransactionStatus

SetUserPriority(roachpb.UserPriority) error
SetDebugName(name string)
SetIsolation(isolation enginepb.IsolationType) error

// OrigTimestamp returns the transaction's starting timestamp.
// Note a transaction can be internally pushed forward in time before
// committing so this is not guaranteed to be the commit timestamp.
// Use CommitTimestamp() when needed.
OrigTimestamp() hlc.Timestamp

// CommitTimestamp returns the transaction's start timestamp.
// The start timestamp can get pushed but the use of this
// method will guarantee that the caller of this method sees
// the push and thus calls this method again to receive the new
// timestamp.
CommitTimestamp() hlc.Timestamp

// SetFixedTimestamp makes the transaction run in an unusual way, at a "fixed
// timestamp": Timestamp and OrigTimestamp are set to ts, there's no clock
// uncertainty, and the txn's deadline is set to ts such that the transaction
// can't be pushed to a different timestamp.
//
// This is used to support historical queries (AS OF SYSTEM TIME queries and
// backups). This method must be called on every transaction retry (but note
// that retries should be rare for read-only queries with no clock uncertainty).
SetFixedTimestamp(ctx context.Context, ts hlc.Timestamp)

// ManualRestart bumps the transactions epoch, and can upgrade the priority
// and timestamp.
// An uninitialized timestamp can be passed to leave the timestamp alone.
ManualRestart(context.Context, roachpb.UserPriority, hlc.Timestamp)

// IsSerializablePushAndRefreshNotPossible returns true if the transaction is
// serializable, its timestamp has been pushed and there's no chance that
// refreshing the read spans will succeed later (thus allowing the transaction
// to commit and not be restarted). Used to detect whether the txn is
// guaranteed to get a retriable error later.
//
// Note that this method allows for false negatives: sometimes the client only
// figures out that it's been pushed when it sends an EndTransaction - i.e.
// it's possible for the txn to have been pushed asynchoronously by some other
// operation (usually, but not exclusively, by a high-priority txn with
// conflicting writes).
IsSerializablePushAndRefreshNotPossible() bool

Epoch() uint32

// SerializeTxn returns a clone of the transaction's current proto.
// This is a nuclear option; generally client code shouldn't deal with protos.
// However, this is used by DistSQL for sending the transaction over the wire
// when it creates flows.
SerializeTxn() *roachpb.Transaction

// UpdateStateOnRemoteRetryableErr updates the txn in response to an error
// encountered when running a request through the txn.
UpdateStateOnRemoteRetryableErr(context.Context, *roachpb.Error) *roachpb.Error

// DisablePipelining instructs the TxnSender not to pipeline requests. It
// should rarely be necessary to call this method. It is only recommended for
// transactions that need extremely precise control over the request ordering,
// like the transaction that merges ranges together.
DisablePipelining()
DisablePipelining() error
}

// TxnSenderFactory is the interface used to create new instances
Expand Down Expand Up @@ -111,45 +193,148 @@ func (f SenderFunc) Send(
return f(ctx, ba)
}

// TxnSenderFunc is an adapter to allow the use of ordinary functions as
// TxnSenders with GetMeta or AugmentMeta panicing with unimplemented. This is
// a helper mechanism to facilitate testing.
type TxnSenderFunc func(
context.Context, roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error)
// MockTransactionalSender allows a function to be used as a TxnSender.
type MockTransactionalSender struct {
senderFunc func(
context.Context, *roachpb.Transaction, roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error)
txn roachpb.Transaction
}

// Send calls f(ctx, c).
func (f TxnSenderFunc) Send(
// NewMockTransactionalSender creates a MockTransactionalSender.
func NewMockTransactionalSender(
f func(
context.Context, *roachpb.Transaction, roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error),
txn *roachpb.Transaction,
) *MockTransactionalSender {
return &MockTransactionalSender{senderFunc: f, txn: txn.Clone()}
}

// Send is part of the TxnSender interface.
func (m *MockTransactionalSender) Send(
ctx context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
return f(ctx, ba)
return m.senderFunc(ctx, &m.txn, ba)
}

// GetMeta is part of the TxnSender interface.
func (f TxnSenderFunc) GetMeta() roachpb.TxnCoordMeta { panic("unimplemented") }
func (m *MockTransactionalSender) GetMeta() roachpb.TxnCoordMeta { panic("unimplemented") }

// AugmentMeta is part of the TxnSender interface.
func (f TxnSenderFunc) AugmentMeta(context.Context, roachpb.TxnCoordMeta) { panic("unimplemented") }
func (m *MockTransactionalSender) AugmentMeta(context.Context, roachpb.TxnCoordMeta) {
panic("unimplemented")
}

// OnFinish is part of the TxnSender interface.
func (f TxnSenderFunc) OnFinish(_ func(error)) { panic("unimplemented") }
func (m *MockTransactionalSender) OnFinish(_ func(error)) { panic("unimplemented") }

// SetSystemConfigTrigger is part of the TxnSender interface.
func (m *MockTransactionalSender) SetSystemConfigTrigger() error { panic("unimplemented") }

// BumpEpochAfterConcurrentRetryError is part of the TxnSender interface.
func (m *MockTransactionalSender) BumpEpochAfterConcurrentRetryError(context.Context) {
panic("unimplemented")
}

// TxnStatus is part of the TxnSender interface.
func (m *MockTransactionalSender) TxnStatus() roachpb.TransactionStatus {
return m.txn.Status
}

// DisablePipelining is part of the TxnSender interface.
func (f TxnSenderFunc) DisablePipelining() { panic("unimplemented") }
// SetUserPriority is part of the TxnSender interface.
func (m *MockTransactionalSender) SetUserPriority(pri roachpb.UserPriority) error {
m.txn.Priority = roachpb.MakePriority(pri)
return nil
}

// SetDebugName is part of the TxnSender interface.
func (m *MockTransactionalSender) SetDebugName(name string) {
m.txn.Name = name
}

// SetIsolation is part of the TxnSender interface.
func (m *MockTransactionalSender) SetIsolation(isolation enginepb.IsolationType) error {
m.txn.Isolation = isolation
return nil
}

// OrigTimestamp is part of the TxnSender interface.
func (m *MockTransactionalSender) OrigTimestamp() hlc.Timestamp {
return m.txn.OrigTimestamp
}

// CommitTimestamp is part of the TxnSender interface.
func (m *MockTransactionalSender) CommitTimestamp() hlc.Timestamp {
return m.txn.OrigTimestamp
}

// SetFixedTimestamp is part of the TxnSender interface.
func (m *MockTransactionalSender) SetFixedTimestamp(context.Context, hlc.Timestamp) {
panic("unimplemented")
}

// ManualRestart is part of the TxnSender interface.
func (m *MockTransactionalSender) ManualRestart(
ctx context.Context, pri roachpb.UserPriority, ts hlc.Timestamp,
) {
m.txn.Restart(pri, 0 /* upgradePriority */, ts)
}

// IsSerializablePushAndRefreshNotPossible is part of the TxnSender interface.
func (m *MockTransactionalSender) IsSerializablePushAndRefreshNotPossible() bool {
panic("unimplemented")
}

// Epoch is part of the TxnSender interface.
func (m *MockTransactionalSender) Epoch() uint32 { panic("unimplemented") }

// SerializeTxn is part of the TxnSender interface.
func (m *MockTransactionalSender) SerializeTxn() *roachpb.Transaction {
cp := m.txn.Clone()
return &cp
}

// UpdateStateOnRemoteRetryableErr is part of the TxnSender interface.
func (m *MockTransactionalSender) UpdateStateOnRemoteRetryableErr(
ctx context.Context, pErr *roachpb.Error,
) *roachpb.Error {
panic("unimplemented")
}

// DisablePipelining is part of the client.TxnSender interface.
func (m *MockTransactionalSender) DisablePipelining() error { return nil }

// TxnSenderFactoryFunc is an adapter to allow the use of ordinary functions
// as TxnSenderFactories. This is a helper mechanism to facilitate testing.
type TxnSenderFactoryFunc func(TxnType) TxnSender
// MockTxnSenderFactory is a TxnSenderFactory producing MockTxnSenders.
type MockTxnSenderFactory struct {
senderFunc func(context.Context, *roachpb.Transaction, roachpb.BatchRequest) (
*roachpb.BatchResponse, *roachpb.Error)
}

var _ TxnSenderFactory = TxnSenderFactoryFunc(nil)
var _ TxnSenderFactory = MockTxnSenderFactory{}

// MakeMockTxnSenderFactory creates a MockTxnSenderFactory from a sender
// function that receives the transaction in addition to the request. The
// function is responsible for putting the txn inside the batch, if needed.
func MakeMockTxnSenderFactory(
senderFunc func(
context.Context, *roachpb.Transaction, roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error),
) MockTxnSenderFactory {
return MockTxnSenderFactory{
senderFunc: senderFunc,
}
}

// TransactionalSender is part of TxnSenderFactory.
func (f TxnSenderFactoryFunc) TransactionalSender(typ TxnType, _ roachpb.TxnCoordMeta) TxnSender {
return f(typ)
func (f MockTxnSenderFactory) TransactionalSender(
_ TxnType, coordMeta roachpb.TxnCoordMeta,
) TxnSender {
return NewMockTransactionalSender(f.senderFunc, &coordMeta.Txn)
}

// NonTransactionalSender is part of TxnSenderFactory.
func (f TxnSenderFactoryFunc) NonTransactionalSender() Sender {
func (f MockTxnSenderFactory) NonTransactionalSender() Sender {
return nil
}

Expand Down
Loading

0 comments on commit 035cec9

Please sign in to comment.