Skip to content

Commit

Permalink
kv/kvclient: add ManualRefresh support for transactions
Browse files Browse the repository at this point in the history
This commit adds support for client-initiated refreshes of transactions.
The implementation is somewhat simplistic in that it hijacks existing
logic that occurs during the sending of a request. This makese the
implementation more uniform with the rest of the client library at
the cost of being somewhat awkward and implicit from a code-reading
perspective.

The motivation for this change is to provide the necessary tools to allow
the merge transaction to get pushed. The adoption follows in the next
commit.

Release note: None
  • Loading branch information
ajwerner committed Feb 16, 2021
1 parent fe919cc commit 9135287
Show file tree
Hide file tree
Showing 6 changed files with 230 additions and 8 deletions.
21 changes: 21 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -1143,3 +1143,24 @@ func (tc *TxnCoordSender) GetSteppingMode(ctx context.Context) (curMode kv.Stepp
}
return curMode
}

// ManualRefresh is part of the TxnSender interface.
func (tc *TxnCoordSender) ManualRefresh(ctx context.Context) error {
tc.mu.Lock()
defer tc.mu.Unlock()

// Hijack the pre-emptive refresh code path to perform the refresh but
// provide the force flag to ensure that the refresh occurs unconditionally.
var ba roachpb.BatchRequest
ba.Txn = tc.mu.txn.Clone()
const force = true
ba, pErr := tc.interceptorAlloc.txnSpanRefresher.maybeRefreshPreemptivelyLocked(ctx, ba, force)
if pErr != nil {
pErr = tc.updateStateLocked(ctx, ba, nil, pErr)
} else {
var br roachpb.BatchResponse
br.Txn = ba.Txn
pErr = tc.updateStateLocked(ctx, ba, &br, pErr)
}
return pErr.GoError()
}
167 changes: 167 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2400,3 +2400,170 @@ func TestPutsInStagingTxn(t *testing.T) {
// seen a batch with the STAGING status.
require.True(t, putInStagingSeen)
}

// TestTxnManualRefresh verifies that TxnCoordSender's ManualRefresh method
// works as expected.
func TestTxnManualRefresh(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// Create some machinery to mock out the kvserver and allow the test to
// launch some requests from the client and then pass control flow of handling
// those requests back to the test.
type resp struct {
br *roachpb.BatchResponse
pErr *roachpb.Error
}
type req struct {
ba roachpb.BatchRequest
respCh chan resp
}
type testCase struct {
name string
run func(
ctx context.Context,
t *testing.T,
db *kv.DB,
clock *hlc.ManualClock,
reqCh <-chan req,
)
}
var cases = []testCase{
{
name: "no-op",
run: func(
ctx context.Context, t *testing.T, db *kv.DB,
clock *hlc.ManualClock, reqCh <-chan req,
) {
txn := db.NewTxn(ctx, "test")
errCh := make(chan error)
go func() {
_, err := txn.Get(ctx, "foo")
errCh <- err
}()
{
r := <-reqCh
_, ok := r.ba.GetArg(roachpb.Get)
require.True(t, ok)
var br roachpb.BatchResponse
br.Txn = r.ba.Txn
br.Add(&roachpb.GetResponse{})
r.respCh <- resp{br: &br}
}
require.NoError(t, <-errCh)

// Now a refresh should be a no-op which is indicated by the fact that
// this call does not block to send requests.
require.NoError(t, txn.ManualRefresh(ctx))
require.NoError(t, txn.Commit(ctx))
},
},
{
name: "refresh occurs due to read",
run: func(
ctx context.Context, t *testing.T, db *kv.DB,
clock *hlc.ManualClock, reqCh <-chan req,
) {
txn := db.NewTxn(ctx, "test")
errCh := make(chan error)
go func() {
_, err := txn.Get(ctx, "foo")
errCh <- err
}()
{
r := <-reqCh
_, ok := r.ba.GetArg(roachpb.Get)
require.True(t, ok)
var br roachpb.BatchResponse
br.Txn = r.ba.Txn
br.Add(&roachpb.GetResponse{})
r.respCh <- resp{br: &br}
}
require.NoError(t, <-errCh)

go func() {
errCh <- txn.Put(ctx, "bar", "baz")
}()
{
r := <-reqCh
_, ok := r.ba.GetArg(roachpb.Put)
require.True(t, ok)
var br roachpb.BatchResponse
br.Txn = r.ba.Txn.Clone()
// Push the WriteTimestamp simulating an interaction with the
// timestamp cache.
br.Txn.WriteTimestamp =
br.Txn.WriteTimestamp.Add(time.Millisecond.Nanoseconds(), 0)
br.Add(&roachpb.PutResponse{})
r.respCh <- resp{br: &br}
}
require.NoError(t, <-errCh)

go func() {
errCh <- txn.ManualRefresh(ctx)
}()
{
r := <-reqCh
_, ok := r.ba.GetArg(roachpb.Refresh)
require.True(t, ok)
var br roachpb.BatchResponse
br.Txn = r.ba.Txn.Clone()
br.Add(&roachpb.RefreshResponse{})
r.respCh <- resp{br: &br}
}
require.NoError(t, <-errCh)

// Now a refresh should be a no-op which is indicated by the fact that
// this call does not block to send requests.
require.NoError(t, txn.ManualRefresh(ctx))
},
},
}
run := func(t *testing.T, tc testCase) {
stopper := stop.NewStopper()
manual := hlc.NewManualClock(123)
clock := hlc.NewClock(manual.UnixNano, time.Nanosecond)
ctx := context.Background()
defer stopper.Stop(ctx)

reqCh := make(chan req)
var senderFn kv.SenderFunc = func(_ context.Context, ba roachpb.BatchRequest) (
*roachpb.BatchResponse, *roachpb.Error) {
r := req{
ba: ba,
respCh: make(chan resp),
}
select {
case reqCh <- r:
case <-ctx.Done():
return nil, roachpb.NewError(ctx.Err())
}
select {
case rr := <-r.respCh:
return rr.br, rr.pErr
case <-ctx.Done():
return nil, roachpb.NewError(ctx.Err())
}
}
ambient := log.AmbientContext{Tracer: tracing.NewTracer()}
tsf := NewTxnCoordSenderFactory(
TxnCoordSenderFactoryConfig{
AmbientCtx: ambient,
Clock: clock,
Stopper: stopper,
HeartbeatInterval: time.Hour,
},
senderFn,
)
db := kv.NewDB(ambient, tsf, clock, stopper)

cancelCtx, cancel := context.WithCancel(ctx)
defer cancel()
tc.run(cancelCtx, t, db, manual, reqCh)
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
run(t, tc)
})
}
}
12 changes: 7 additions & 5 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (sr *txnSpanRefresher) SendLocked(
}

// Attempt a refresh before sending the batch.
ba, pErr := sr.maybeRefreshPreemptively(ctx, ba)
ba, pErr := sr.maybeRefreshPreemptivelyLocked(ctx, ba, false)
if pErr != nil {
return nil, pErr
}
Expand Down Expand Up @@ -406,13 +406,15 @@ func (sr *txnSpanRefresher) splitEndTxnAndRetrySend(
return br, nil
}

// maybeRefreshPreemptively attempts to refresh a transaction's read timestamp
// maybeRefreshPreemptivelyLocked attempts to refresh a transaction's read timestamp
// eagerly. Doing so can take advantage of opportunities where the refresh is
// free or can avoid wasting work issuing a batch containing an EndTxn that will
// necessarily throw a serializable error. The method returns a batch with an
// updated transaction if the refresh is successful, or a retry error if not.
func (sr *txnSpanRefresher) maybeRefreshPreemptively(
ctx context.Context, ba roachpb.BatchRequest,
// If the force flag is true, the refresh will be attempted even if a refresh
// is not inevitable.
func (sr *txnSpanRefresher) maybeRefreshPreemptivelyLocked(
ctx context.Context, ba roachpb.BatchRequest, force bool,
) (roachpb.BatchRequest, *roachpb.Error) {
// If we know that the transaction will need a refresh at some point because
// its write timestamp has diverged from its read timestamp, consider doing
Expand Down Expand Up @@ -466,7 +468,7 @@ func (sr *txnSpanRefresher) maybeRefreshPreemptively(
refreshInevitable := hasET && args.(*roachpb.EndTxnRequest).Commit

// If neither condition is true, defer the refresh.
if !refreshFree && !refreshInevitable {
if !refreshFree && !refreshInevitable && !force {
return ba, nil
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/mock_transactional_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,11 @@ func (m *MockTransactionalSender) GetSteppingMode(context.Context) SteppingMode
return SteppingDisabled
}

// ManualRefresh is part of the TxnSender interface.
func (m *MockTransactionalSender) ManualRefresh(ctx context.Context) error {
panic("unimplemented")
}

// MockTxnSenderFactory is a TxnSenderFactory producing MockTxnSenders.
type MockTxnSenderFactory struct {
senderFunc func(context.Context, *roachpb.Transaction, roachpb.BatchRequest) (
Expand Down
14 changes: 14 additions & 0 deletions pkg/kv/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,20 @@ type TxnSender interface {
// GetSteppingMode accompanies ConfigureStepping. It is provided
// for use in tests and assertion checks.
GetSteppingMode(ctx context.Context) (curMode SteppingMode)

// ManualRefresh attempts to refresh a transactions read timestamp up to its
// provisional commit timestamp. In the case that the two are already the
// same, it is a no-op. The reason one might want to do that is to ensure
// that a transaction can commit without experiencing another push.
//
// A transaction which has proven all of its intents and has been fully
// refreshed and does not perform any additional reads or writes that does not
// contend with any other transactions will not be pushed further. This
// method's reason for existence is to ensure that range merge requests can
// be pushed but then can later commit without the possibility of needing to
// refresh reads performed on the RHS after the RHS has been subsumed but
// before the merge transaction completed.
ManualRefresh(ctx context.Context) error
}

// SteppingMode is the argument type to ConfigureStepping.
Expand Down
19 changes: 16 additions & 3 deletions pkg/kv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,9 +334,7 @@ func (txn *Txn) SetSystemConfigTrigger(forSystemTenant bool) error {
}

// DisablePipelining instructs the transaction not to pipeline requests. It
// should rarely be necessary to call this method. It is only recommended for
// transactions that need extremely precise control over the request ordering,
// like the transaction that merges ranges together.
// should rarely be necessary to call this method.
//
// DisablePipelining must be called before any operations are performed on the
// transaction.
Expand Down Expand Up @@ -1297,3 +1295,18 @@ func (txn *Txn) ReleaseSavepoint(ctx context.Context, s SavepointToken) error {
defer txn.mu.Unlock()
return txn.mu.sender.ReleaseSavepoint(ctx, s)
}

// ManualRefresh forces a refresh of the read timestamp of a transaction to
// match that of its write timestamp. It is only recommended for transactions
// that need extremely precise control over the request ordering, like the
// transaction that merges ranges together. When combined with
// DisablePipelining, this feature allows the range merge transaction to
// prove that it will not be pushed between sending its SubsumeRequest and
// committing. This enables that request to be pushed at earlier points in
// its lifecycle.
func (txn *Txn) ManualRefresh(ctx context.Context) error {
txn.mu.Lock()
sender := txn.mu.sender
txn.mu.Unlock()
return sender.ManualRefresh(ctx)
}

0 comments on commit 9135287

Please sign in to comment.