Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
28185:  client, kv: move logic out of Txn, rewrite some of the TxnCoordSender r=andreimatei a=andreimatei

This patch moves most of the logic from the client.Txn into the
kv.TxnCoordSender and reorganizes much of the TxnCoordSender in the
process.
The split between the client.Txn and the TxnCoordSender caused a lot of
grief historically. The main problem is that both the Txn and the TCS
each have their own copy of the roachpb.Transaction proto. They both
use their copy for different things. We attempt to keep the two protos
in sync, but we can't ensure that as there's no common locking between
the two layers.
This patch keeps the client.Txn as a mostly stateless shim, allowing
one to mock everything underneath. This is nice, as previously "mocking
KV" was a less clear proposition - does one mock all the logic in the
Txn or just the TCS? Now the TCS has all the logic and all the locking
necessary for serializing accesses to the "transaction state" - notably
the proto.
The Txn and TCS communicate through a (now expanded) client.TxnSender
interface.

Within the TCS, the biggest change is that everything that has to do
with the heartbeat loop has been moved to a new interceptor.
The metrics generation has also been extracted into a new interceptor.

One behavior change introduced by this patch is that heartbeat loops are
no longer started for (what the TCS hopes will be) 1PC txns. The
motivation was concern over the price of spawning a (shortlived)
heartbeat goroutine per txn in the 1PC-heavy "kv" workload.
Another one is that the TxnCoordSender doesn't inherit the old Txn logic
for swallowing errors on rollbacks. Instead, we're relying on a recent
server change to not return errors on rollbacks when the txn record is
missing - which was the reason for said swallowing.

Fixes cockroachdb#28256

Release note: none

Co-authored-by: Andrei Matei <[email protected]>
  • Loading branch information
craig[bot] and andreimatei committed Aug 9, 2018
2 parents 92ec216 + 69fa9f7 commit f52883f
Show file tree
Hide file tree
Showing 42 changed files with 2,951 additions and 2,673 deletions.
50 changes: 8 additions & 42 deletions pkg/internal/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -850,39 +850,6 @@ func TestReadConsistencyTypes(t *testing.T) {
}
}

// TestReadOnlyTxnObeysDeadline tests that read-only transactions obey the
// deadline. Read-only transactions have their EndTransaction elided, so the
// enforcement of the deadline is done in the client.
func TestReadOnlyTxnObeysDeadline(t *testing.T) {
defer leaktest.AfterTest(t)()
s, _, _ := serverutils.StartServer(t, base.TestServerArgs{})
defer s.Stopper().Stop(context.TODO())
db := createTestClient(t, s)
ctx := context.TODO()

if err := db.Put(ctx, "k", "v"); err != nil {
t.Fatal(err)
}

txn := client.NewTxn(db, 0 /* gatewayNodeID */, client.RootTxn)
// Only snapshot transactions can observe deadline errors; serializable ones
// get a restart error before the deadline check.
if err := txn.SetIsolation(enginepb.SNAPSHOT); err != nil {
t.Fatal(err)
}

// Set a deadline, then set a higher commit timestamp for the txn.
txn.UpdateDeadlineMaybe(ctx, s.Clock().Now())
txn.Proto().Timestamp.Forward(s.Clock().Now())
if _, err := txn.Get(ctx, "k"); err != nil {
t.Fatal(err)
}
if err := txn.Commit(ctx); !testutils.IsError(
err, "deadline exceeded before transaction finalization") {
t.Fatal(err)
}
}

// TestTxn_ReverseScan a simple test for Txn.ReverseScan
func TestTxn_ReverseScan(t *testing.T) {
defer leaktest.AfterTest(t)()
Expand Down Expand Up @@ -972,13 +939,10 @@ func TestNodeIDAndObservedTimestamps(t *testing.T) {

// Mock out sender function to check that created transactions
// have the observed timestamp set for the configured node ID.
factory := client.TxnSenderFactoryFunc(func(client.TxnType) client.TxnSender {
return client.TxnSenderFunc(
func(_ context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
return ba.CreateReply(), nil
},
)
})
factory := client.MakeMockTxnSenderFactory(
func(_ context.Context, _ *roachpb.Transaction, ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
return ba.CreateReply(), nil
})

clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
dbCtx := client.DefaultDBContext()
Expand All @@ -1000,7 +964,8 @@ func TestNodeIDAndObservedTimestamps(t *testing.T) {
for i, test := range directCases {
t.Run(fmt.Sprintf("direct-txn-%d", i), func(t *testing.T) {
txn := client.NewTxn(db, test.nodeID, test.typ)
if ots := txn.Proto().ObservedTimestamps; (len(ots) == 1 && ots[0].NodeID == test.nodeID) != test.expObserved {
ots := txn.Serialize().ObservedTimestamps
if (len(ots) == 1 && ots[0].NodeID == test.nodeID) != test.expObserved {
t.Errorf("expected observed ts %t; got %+v", test.expObserved, ots)
}
})
Expand All @@ -1021,7 +986,8 @@ func TestNodeIDAndObservedTimestamps(t *testing.T) {
}
if err := db.Txn(
ctx, func(_ context.Context, txn *client.Txn) error {
if ots := txn.Proto().ObservedTimestamps; (len(ots) == 1 && ots[0].NodeID == test.nodeID) != test.expObserved {
ots := txn.Serialize().ObservedTimestamps
if (len(ots) == 1 && ots[0].NodeID == test.nodeID) != test.expObserved {
t.Errorf("expected observed ts %t; got %+v", test.expObserved, ots)
}
return nil
Expand Down
26 changes: 0 additions & 26 deletions pkg/internal/client/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,12 @@ package client_test
import (
"bytes"
"context"
"fmt"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

func setup(t *testing.T) (serverutils.TestServerInterface, *client.DB) {
Expand Down Expand Up @@ -377,27 +375,3 @@ func TestDB_Put_insecure(t *testing.T) {
}
checkResult(t, []byte("1"), result.ValueBytes())
}

func TestDebugName(t *testing.T) {
defer leaktest.AfterTest(t)()
s, db := setup(t)
defer s.Stopper().Stop(context.TODO())

if err := db.Txn(context.TODO(), func(ctx context.Context, txn *client.Txn) error {
// Manually override the txn ID, to make the DebugName below deterministic.
id := "00000000-b33f-b33f-b33f-000000000000"
uuid, err := uuid.FromString(id)
if err != nil {
t.Fatal(err)
}
txn.Proto().ID = uuid

expected := fmt.Sprintf("unnamed (id: %s)", id)
if txn.DebugName() != expected {
t.Fatalf("expected \"%s\", but found \"%s\"", expected, txn.DebugName())
}
return nil
}); err != nil {
t.Errorf("txn failed: %s", err)
}
}
Loading

0 comments on commit f52883f

Please sign in to comment.