kv: remove GetMeta, AugmentMeta and TxnCoordMeta #43032

Merged
merged 1 commit into from
Dec 18, 2019
476 changes: 343 additions & 133 deletions c-deps/libroach/protos/roachpb/data.pb.cc

Large diffs are not rendered by default.

449 changes: 299 additions & 150 deletions c-deps/libroach/protos/roachpb/data.pb.h

Large diffs are not rendered by default.

56 changes: 20 additions & 36 deletions docs/tech-notes/txn_coord_sender.md
Expand Up @@ -14,7 +14,7 @@ Table of contents:
- [LeafTxns and txn state repatriation](#LeafTxns-and-txn-state-repatriation)
- [client.Txn, meta and TxnCoordSender](#clientTxn-meta-and-TxnCoordSender)
- [Interceptors: between TxnCoordSender and DistSender](#Interceptors-between-TxnCoordSender-and-DistSender)
- [TxnCoordSender state and TxnCoordMeta](#TxnCoordSender-state-and-TxnCoordMeta)
- [TxnCoordSender state](#TxnCoordSender-state)
- [Summary of the all-is-well path](#Summary-of-the-all-is-well-path)
- [Error handling in TxnCoordSender](#Error-handling-in-TxnCoordSender)
- [Error handling with LeafTxns](#Error-handling-with-LeafTxns)
Expand Down Expand Up @@ -129,10 +129,7 @@ This works as follows:
- the SQL executor instantiates the RootTxn as usual.
- when a distributed query is about to start, the distsql
execution code pulls out a struct from the RootTxn
called "TxnCoordMeta", then "trims it down"
using `TrimRootToLeaf()` to turn it into the necessary
and sufficient input to create LeafTxn objects
on other nodes. This contains e.g. the txn ID,
called "LeafTxnInputState". This contains e.g. the txn ID,
timestamp and write intents as outlined above.
- the trimmed meta struct is sent along with the flow
request to a remote distsql server.
Expand All @@ -141,12 +138,12 @@ This works as follows:
- the distsql processor(s) (e.g. a table reader) then uses
the LeafTxn to run KV batches.
- when query execution completes, the distsql processor
extracts a similar state struct off the LeafTxn,
trims it down using `TrimLeafToRoot()` and the
extracts a similar state struct off the LeafTxn
called `LeafTxnFinalState` and the
result is repatriated on the gateway when the
flow is shut down.
- on the gateway, repatriated LeafTxn state structs
are merged into the RootTxn using `AugmentTxnCoordMeta()`.
are merged into the RootTxn using `UpdateRootWithLeafFinalState()`.
- on the gateway, any error produced by a LeafTxn is also "ingested"
in the RootTxn to perform additional error recovery and clean-up,
using `UpdateStateOnRemoteRetryableErr()`.
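
The fork-and-repatriate sequence described in the list above can be sketched in a few lines of Go. This is only an illustration under simplifying assumptions: the struct fields (`TxnID`, `ReadTimestamp`, `RefreshSpans`), the `Read` helper and the free function `NewLeafTxn` are hypothetical stand-ins, not the real `roachpb` or `client` definitions. The method names follow the `TxnSender` interface as it appears in this PR.

```go
package main

import "fmt"

// LeafTxnInputState is a simplified stand-in for the state that the gateway
// ships to remote nodes. The fields are hypothetical, not the real protobuf.
type LeafTxnInputState struct {
	TxnID         string
	ReadTimestamp int64
}

// LeafTxnFinalState is a simplified stand-in for the state that a remote
// node sends back to the gateway when its flow shuts down.
type LeafTxnFinalState struct {
	TxnID        string
	RefreshSpans []string
}

// RootTxn models the gateway-side transaction.
type RootTxn struct {
	id            string
	readTimestamp int64
	refreshSpans  []string
}

// LeafTxn models a read-only transaction handle on a remote node.
type LeafTxn struct {
	input LeafTxnInputState
	reads []string
}

// GetLeafTxnInputState extracts what a remote node needs to build a LeafTxn.
func (r *RootTxn) GetLeafTxnInputState() LeafTxnInputState {
	return LeafTxnInputState{TxnID: r.id, ReadTimestamp: r.readTimestamp}
}

// NewLeafTxn instantiates a LeafTxn from the state received with the flow.
func NewLeafTxn(in LeafTxnInputState) *LeafTxn { return &LeafTxn{input: in} }

// Read records the span touched by a (pretend) KV read.
func (l *LeafTxn) Read(span string) { l.reads = append(l.reads, span) }

// GetLeafTxnFinalState collects the state to repatriate to the gateway.
func (l *LeafTxn) GetLeafTxnFinalState() LeafTxnFinalState {
	return LeafTxnFinalState{
		TxnID:        l.input.TxnID,
		RefreshSpans: append([]string(nil), l.reads...),
	}
}

// UpdateRootWithLeafFinalState merges repatriated leaf state into the root.
func (r *RootTxn) UpdateRootWithLeafFinalState(fs LeafTxnFinalState) error {
	if fs.TxnID != r.id {
		return fmt.Errorf("final state for txn %q does not match root txn %q", fs.TxnID, r.id)
	}
	r.refreshSpans = append(r.refreshSpans, fs.RefreshSpans...)
	return nil
}

func main() {
	root := &RootTxn{id: "txn-1", readTimestamp: 100}

	// Gateway: fork the root state and "send" it with the flow request.
	leaf := NewLeafTxn(root.GetLeafTxnInputState())

	// Remote node: run reads through the LeafTxn.
	leaf.Read("/table/1/a")
	leaf.Read("/table/1/b")

	// Gateway: merge the repatriated final state back into the RootTxn.
	if err := root.UpdateRootWithLeafFinalState(leaf.GetLeafTxnFinalState()); err != nil {
		panic(err)
	}
	fmt.Println(root.refreshSpans)
}
```

The point of the split into an input type and a final type is that each direction carries only what the receiver needs, so no separate trimming step is required on either side.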
Expand Down Expand Up @@ -293,7 +290,7 @@ writing)](https://github.com/cockroachdb/cockroach/blob/a57647381a4714b48f6ec6de
whereas LeafTxns, which only handle read requests, use [only a
subset](https://github.com/cockroachdb/cockroach/blob/a57647381a4714b48f6ec6dec0bf766eaa6746dd/pkg/kv/txn_coord_sender.go#L556).

## TxnCoordSender state and TxnCoordMeta
## TxnCoordSender state

The overall "current state" of a TCS is thus distributed
between various Go structs:
Expand All @@ -310,23 +307,17 @@ This overall state is a native Go struct and not a protobuf. However,
"current state" of a RootTxn and carry it over to another node to
build a LeafTxn.

For this purpose, a separate [protobuf message
TxnCoordMeta](https://github.com/cockroachdb/cockroach/blob/a57647381a4714b48f6ec6dec0bf766eaa6746dd/pkg/roachpb/data.proto#L617)
is defined. The TCS's `GetMeta()` method initially
populates it by asking every interceptor in turn to write its portion
of the state into it.

(side note: arguably, the name "Meta" here is ill-chosen. There's
nothing meta about it; this struct is really a mere serializable copy
of the txncoordsender's state, and would not be necessary if the
TCS state was natively stored in a protobuf-encodable
struct already.)
For this purpose, a separate protobuf message `LeafTxnInputState` is
defined. The TCS's `GetLeafTxnInputState()` method initially populates
it by asking every interceptor in turn to write its portion of the
state into it.

Conversely, when the state of a LeafTxn is repatriated and to be
"merged" into the RootTxn, the `AugmentMeta()` method uses the
`Update()` method on the `roachpb.Transaction` sub-object (which
merges the state of the txn object itself) then asks every interceptor,
in turn, to collect bits of state it may be interested to merge in too.
"merged" into the RootTxn, the `UpdateRootFromLeafFinalState()` method
uses the `Update()` method on the `roachpb.Transaction` sub-object
(which merges the state of the txn object itself) then asks every
interceptor, in turn, to collect bits of state it may be interested to
merge in too.

For example, that's where the RootTxn's txnSpanRefresher interceptor
picks up the spans accumulated in the LeafTxn.
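
A minimal sketch of the "ask every interceptor in turn" pattern described above, assuming a hypothetical two-method `interceptor` interface and toy interceptors. The real interceptor stack has more members and more methods, and the real merge also calls `Update()` on the `roachpb.Transaction`, which this sketch omits.

```go
package main

import "fmt"

// Simplified stand-ins for the two protobuf messages (hypothetical fields).
type LeafTxnInputState struct {
	Seq int32
}

type LeafTxnFinalState struct {
	RefreshSpans []string
}

// interceptor is a hypothetical, trimmed-down interface: each interceptor
// writes its portion of the state on the way out, and picks up the portion
// it cares about on the way back.
type interceptor interface {
	populateLeafInputState(*LeafTxnInputState)
	importLeafFinalState(*LeafTxnFinalState)
}

// seqNumAllocator contributes the current sequence number on the way out.
type seqNumAllocator struct{ seq int32 }

func (s *seqNumAllocator) populateLeafInputState(tis *LeafTxnInputState) { tis.Seq = s.seq }
func (s *seqNumAllocator) importLeafFinalState(*LeafTxnFinalState)       {}

// spanRefresher picks up the spans that the leaf accumulated.
type spanRefresher struct{ refreshSpans []string }

func (r *spanRefresher) populateLeafInputState(*LeafTxnInputState) {}
func (r *spanRefresher) importLeafFinalState(tfs *LeafTxnFinalState) {
	r.refreshSpans = append(r.refreshSpans, tfs.RefreshSpans...)
}

// coordSender owns the interceptor stack.
type coordSender struct{ stack []interceptor }

// GetLeafTxnInputState asks every interceptor to fill in its part.
func (c *coordSender) GetLeafTxnInputState() LeafTxnInputState {
	var tis LeafTxnInputState
	for _, i := range c.stack {
		i.populateLeafInputState(&tis)
	}
	return tis
}

// UpdateRootWithLeafFinalState lets every interceptor merge what it needs.
func (c *coordSender) UpdateRootWithLeafFinalState(tfs *LeafTxnFinalState) {
	for _, i := range c.stack {
		i.importLeafFinalState(tfs)
	}
}

func main() {
	refresher := &spanRefresher{}
	tcs := &coordSender{stack: []interceptor{&seqNumAllocator{seq: 3}, refresher}}

	fmt.Println(tcs.GetLeafTxnInputState().Seq)

	tcs.UpdateRootWithLeafFinalState(&LeafTxnFinalState{RefreshSpans: []string{"k1", "k2"}})
	fmt.Println(refresher.refreshSpans)
}
```

Each interceptor only touches the fields it owns, so adding a new piece of state means adding a field and one pair of method bodies rather than editing a central copy routine.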
Expand Down Expand Up @@ -610,7 +601,7 @@ SQL) is responsible for generating seqnums.
The seqnum counter's current value is split between three locations:

- a local variable in one of the interceptors, called `txnSeqNumAllocator` inside the TCS;
- the `enginepb.TxnMeta` record, inside the `roachpb.Transaction` held inside the `TxnCoordMeta`.
- the `enginepb.TxnMeta` record, inside the `roachpb.Transaction` held inside the `LeafTxnInputState`.
- the `enginepb.TxnMeta` record, inside the `roachpb.Transaction` held inside the header of every executed KV batch.

These three values are synchronized as follows:
Expand All @@ -627,18 +618,11 @@ These three values are synchronized as follows:
batch header in certain circumstances (most notably by another later
interceptor, the `txnPipeliner`) for use during txn conflict
resolution and write reordering.
- When a TCS is instantiated from a TxnCoordMeta (e.g. forking a
RootTxn into a LeafTxn), the counter value from the TxnMeta inside the TxnCoordMeta
- When a TCS is instantiated from a LeafTxnInputState (e.g. forking a
RootTxn into a LeafTxn), the counter value from the TxnMeta inside the LeafTxnInputState
is copied into the interceptor.
- When a TxnCoordMeta is constructed from a TCS, the value is copied
from the interceptor to the TxnCoordMeta.
- When a TCS is augmented by a TxnCoordMeta from a leaf, if the
sequence number from the incoming TxnCoordMeta is greater it is used
to bump the current TCS's counter.

(Note that this last logic step is currently never executed—i.e. it seems
to be over-engineered—since LeafTxns cannot issue writes and will
thus never increase the counter.)
- When a LeafTxnInputState is constructed from a TCS, the value is copied
from the interceptor.
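
The two copy directions in the last two bullets, plus the per-epoch reset, might look roughly like the sketch below. All names here (`next`, `initializeLeaf`, `epochBump`, and the trimmed `TxnMeta`) are hypothetical, and incrementing the counter once per write is an assumption made for the example rather than something this diff spells out.

```go
package main

import "fmt"

// TxnMeta is a trimmed stand-in for enginepb.TxnMeta (hypothetical fields).
type TxnMeta struct {
	Epoch    int32
	Sequence int32
}

// LeafTxnInputState is a trimmed stand-in carrying only the txn record.
type LeafTxnInputState struct {
	Txn TxnMeta
}

// seqNumAllocator holds the interceptor-local copy of the counter.
type seqNumAllocator struct {
	seq int32
}

// next hands out a sequence number for a write (assumed: one bump per write).
func (a *seqNumAllocator) next() int32 {
	a.seq++
	return a.seq
}

// populateLeafInputState copies the counter from the interceptor into the
// state that is about to be shipped to a leaf.
func (a *seqNumAllocator) populateLeafInputState(tis *LeafTxnInputState) {
	tis.Txn.Sequence = a.seq
}

// initializeLeaf copies the counter from received state into the interceptor
// of a freshly instantiated leaf.
func (a *seqNumAllocator) initializeLeaf(tis *LeafTxnInputState) {
	a.seq = tis.Txn.Sequence
}

// epochBump resets the counter, since seqnums are scoped to one epoch.
func (a *seqNumAllocator) epochBump() {
	a.seq = 0
}

func main() {
	root := &seqNumAllocator{}
	root.next()
	root.next()

	// Root side: build the input state from the interceptor.
	var tis LeafTxnInputState
	root.populateLeafInputState(&tis)

	// Leaf side: seed the leaf's interceptor from the input state.
	leaf := &seqNumAllocator{}
	leaf.initializeLeaf(&tis)
	fmt.Println(leaf.seq)

	// A new epoch restarts numbering on the root.
	root.epochBump()
	fmt.Println(root.seq)
}
```

Because a leaf in this model never calls `next`, there is nothing to copy back, which is consistent with the removal of the "augment" step for the sequence number in this change.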

Final note: the seqnum is scoped to a current txn epoch. When the
epoch field is incremented, the seqnum generator resets to 0. The
Expand Down
Binary file modified docs/tech-notes/txn_coord_sender/leafbase.png
20 changes: 9 additions & 11 deletions docs/tech-notes/txn_coord_sender/leafbase.puml
Expand Up @@ -12,20 +12,19 @@ end box
participant cluster

create RootTxn
SQL -> RootTxn : client.NewTxn(RootTxn)
SQL -> RootTxn : client.NewTxn()
...
note left of SQL
forking RootTxn
into LeafTxn meta
end note
SQL -> RootTxn : txn.GetTxnCoordMeta()
RootTxn --> SQL : TxnCoordMeta
SQL -> SQL : leafmeta.StripRootToLeaf()
SQL -> dSQLServer : SetupFlow(proc spec, leafmeta)
SQL -> RootTxn : txn.GetLeafTxnInitialState()
RootTxn --> SQL : LeafTxnInitialState
SQL -> dSQLServer : SetupFlow(proc spec, initState)
note over dSQLServer,LeafTxn: (dSQL proc starts exec)
note left of SQL: actually instantiating LeafTxn
create LeafTxn
dSQLServer -> LeafTxn : client.NewTxnWithCoordMeta()
dSQLServer -> LeafTxn : client.NewLeafTxn()
...
note left of SQL
LeafTxn issuing reads
Expand All @@ -43,11 +42,10 @@ note left of SQL
updates to LeafTxn
repatriated into RootTxn
end note
dSQLServer -> LeafTxn : GetTxnCoordMeta()
LeafTxn --> dSQLServer : TxnCoordMeta
dSQLServer -> dSQLServer : leafmeta.StripLeafToRoot()
dSQLServer --> SQL : final results + leafmeta
SQL -> RootTxn : txn.AugmentTxnCoordMeta(leafmeta)
dSQLServer -> LeafTxn : GetLeafTxnFinalState()
LeafTxn --> dSQLServer : LeafTxnFinalState
dSQLServer --> SQL : final results + finalState
SQL -> RootTxn : txn.UpdateRootFromLeafFinalState(finalState)
...
SQL -> RootTxn : Commit/Rollback/CleanupOnError
@enduml
2 changes: 1 addition & 1 deletion pkg/ccl/followerreadsccl/followerreads_test.go
Expand Up @@ -158,7 +158,7 @@ func TestOracleFactory(t *testing.T) {
Tracer: tracing.NewTracer(),
}, client.MockTxnSenderFactory{},
hlc.NewClock(hlc.UnixNano, time.Nanosecond))
txn := client.NewTxn(context.TODO(), c, 0, client.RootTxn)
txn := client.NewTxn(context.TODO(), c, 0)
of := replicaoracle.NewOracleFactory(followerReadAwareChoice, replicaoracle.Config{
Settings: st,
RPCContext: rpcContext,
Expand Down
2 changes: 1 addition & 1 deletion pkg/cli/debug_test.go
Expand Up @@ -226,7 +226,7 @@ func TestRemoveDeadReplicas(t *testing.T) {
// we restart the cluster, so just write a setting.
s.Exec(t, "set cluster setting cluster.organization='remove dead replicas test'")

txn := client.NewTxn(ctx, tc.Servers[0].DB(), 1, client.RootTxn)
txn := client.NewTxn(ctx, tc.Servers[0].DB(), 1)
var desc roachpb.RangeDescriptor
// Pick one of the predefined split points.
rdKey := keys.RangeDescriptorKey(roachpb.RKey(keys.TimeseriesPrefix))
Expand Down
12 changes: 7 additions & 5 deletions pkg/internal/client/client_test.go
Expand Up @@ -247,8 +247,8 @@ func TestClientRunTransaction(t *testing.T) {
return err
}
// Attempt to read in another txn.
conflictTxn := client.NewTxn(ctx, db, 0 /* gatewayNodeID */, client.RootTxn)
conflictTxn.InternalSetPriority(enginepb.MaxTxnPriority)
conflictTxn := client.NewTxn(ctx, db, 0 /* gatewayNodeID */)
conflictTxn.TestingSetPriority(enginepb.MaxTxnPriority)
if gr, err := conflictTxn.Get(ctx, key); err != nil {
return err
} else if gr.Value != nil {
Expand Down Expand Up @@ -846,8 +846,10 @@ func TestNodeIDAndObservedTimestamps(t *testing.T) {
}
for i, test := range directCases {
t.Run(fmt.Sprintf("direct-txn-%d", i), func(t *testing.T) {
txn := client.NewTxn(ctx, db, test.nodeID, test.typ)
ots := txn.Serialize().ObservedTimestamps
now := db.Clock().Now()
kvTxn := roachpb.MakeTransaction("unnamed", nil /*baseKey*/, roachpb.NormalUserPriority, now, db.Clock().MaxOffset().Nanoseconds())
txn := client.NewTxnFromProto(ctx, db, test.nodeID, now, test.typ, &kvTxn)
ots := txn.TestingCloneTxn().ObservedTimestamps
if (len(ots) == 1 && ots[0].NodeID == test.nodeID) != test.expObserved {
t.Errorf("expected observed ts %t; got %+v", test.expObserved, ots)
}
Expand All @@ -869,7 +871,7 @@ func TestNodeIDAndObservedTimestamps(t *testing.T) {
}
if err := db.Txn(
ctx, func(_ context.Context, txn *client.Txn) error {
ots := txn.Serialize().ObservedTimestamps
ots := txn.TestingCloneTxn().ObservedTimestamps
if (len(ots) == 1 && ots[0].NodeID == test.nodeID) != test.expObserved {
t.Errorf("expected observed ts %t; got %+v", test.expObserved, ots)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/internal/client/db.go
Expand Up @@ -697,7 +697,7 @@ func (db *DB) Run(ctx context.Context, b *Batch) error {

// NewTxn creates a new RootTxn.
func (db *DB) NewTxn(ctx context.Context, debugName string) *Txn {
txn := NewTxn(ctx, db, db.ctx.NodeID.Get(), RootTxn)
txn := NewTxn(ctx, db, db.ctx.NodeID.Get())
txn.SetDebugName(debugName)
return txn
}
Expand All @@ -711,7 +711,7 @@ func (db *DB) Txn(ctx context.Context, retryable func(context.Context, *Txn) err
// TODO(radu): we should open a tracing Span here (we need to figure out how
// to use the correct tracer).

txn := NewTxn(ctx, db, db.ctx.NodeID.Get(), RootTxn)
txn := NewTxn(ctx, db, db.ctx.NodeID.Get())
txn.SetDebugName("unnamed")
err := txn.exec(ctx, func(ctx context.Context, txn *Txn) error {
return retryable(ctx, txn)
Expand Down
51 changes: 40 additions & 11 deletions pkg/internal/client/mock_transactional_sender.go
Expand Up @@ -45,15 +45,24 @@ func (m *MockTransactionalSender) Send(
return m.senderFunc(ctx, &m.txn, ba)
}

// GetMeta is part of the TxnSender interface.
func (m *MockTransactionalSender) GetMeta(
// GetLeafTxnInputState is part of the TxnSender interface.
func (m *MockTransactionalSender) GetLeafTxnInputState(
context.Context, TxnStatusOpt,
) (roachpb.TxnCoordMeta, error) {
) (roachpb.LeafTxnInputState, error) {
panic("unimplemented")
}

// AugmentMeta is part of the TxnSender interface.
func (m *MockTransactionalSender) AugmentMeta(context.Context, roachpb.TxnCoordMeta) {
// GetLeafTxnFinalState is part of the TxnSender interface.
func (m *MockTransactionalSender) GetLeafTxnFinalState(
context.Context, TxnStatusOpt,
) (roachpb.LeafTxnFinalState, error) {
panic("unimplemented")
}

// UpdateRootWithLeafFinalState is part of the TxnSender interface.
func (m *MockTransactionalSender) UpdateRootWithLeafFinalState(
context.Context, *roachpb.LeafTxnFinalState,
) {
panic("unimplemented")
}

Expand Down Expand Up @@ -83,6 +92,11 @@ func (m *MockTransactionalSender) ReadTimestamp() hlc.Timestamp {
return m.txn.ReadTimestamp
}

// ProvisionalCommitTimestamp is part of the TxnSender interface.
func (m *MockTransactionalSender) ProvisionalCommitTimestamp() hlc.Timestamp {
return m.txn.WriteTimestamp
}

// CommitTimestamp is part of the TxnSender interface.
func (m *MockTransactionalSender) CommitTimestamp() hlc.Timestamp {
return m.txn.ReadTimestamp
Expand Down Expand Up @@ -124,11 +138,16 @@ func (m *MockTransactionalSender) IsSerializablePushAndRefreshNotPossible() bool
// Epoch is part of the TxnSender interface.
func (m *MockTransactionalSender) Epoch() enginepb.TxnEpoch { panic("unimplemented") }

// SerializeTxn is part of the TxnSender interface.
func (m *MockTransactionalSender) SerializeTxn() *roachpb.Transaction {
// TestingCloneTxn is part of the TxnSender interface.
func (m *MockTransactionalSender) TestingCloneTxn() *roachpb.Transaction {
return m.txn.Clone()
}

// Active is part of the TxnSender interface.
func (m *MockTransactionalSender) Active() bool {
panic("unimplemented")
}

// UpdateStateOnRemoteRetryableErr is part of the TxnSender interface.
func (m *MockTransactionalSender) UpdateStateOnRemoteRetryableErr(
ctx context.Context, pErr *roachpb.Error,
Expand All @@ -139,6 +158,11 @@ func (m *MockTransactionalSender) UpdateStateOnRemoteRetryableErr(
// DisablePipelining is part of the client.TxnSender interface.
func (m *MockTransactionalSender) DisablePipelining() error { return nil }

// PrepareRetryableError is part of the client.TxnSender interface.
func (m *MockTransactionalSender) PrepareRetryableError(ctx context.Context, msg string) error {
return roachpb.NewTransactionRetryWithProtoRefreshError(msg, m.txn.ID, *m.txn.Clone())
}

// MockTxnSenderFactory is a TxnSenderFactory producing MockTxnSenders.
type MockTxnSenderFactory struct {
senderFunc func(context.Context, *roachpb.Transaction, roachpb.BatchRequest) (
Expand All @@ -160,11 +184,16 @@ func MakeMockTxnSenderFactory(
}
}

// TransactionalSender is part of TxnSenderFactory.
func (f MockTxnSenderFactory) TransactionalSender(
_ TxnType, coordMeta roachpb.TxnCoordMeta, _ roachpb.UserPriority,
// RootTransactionalSender is part of TxnSenderFactory.
func (f MockTxnSenderFactory) RootTransactionalSender(
txn *roachpb.Transaction, _ roachpb.UserPriority,
) TxnSender {
return NewMockTransactionalSender(f.senderFunc, &coordMeta.Txn)
return NewMockTransactionalSender(f.senderFunc, txn)
}

// LeafTransactionalSender is part of TxnSenderFactory.
func (f MockTxnSenderFactory) LeafTransactionalSender(tis *roachpb.LeafTxnInputState) TxnSender {
return NewMockTransactionalSender(f.senderFunc, &tis.Txn)
}

// NonTransactionalSender is part of TxnSenderFactory.
Expand Down