kv: immediately push on WriteIntentError when lock-table disabled #46234

Merged
76 changes: 76 additions & 0 deletions pkg/kv/kvserver/closed_timestamp_test.go
@@ -15,11 +15,14 @@ import (
gosql "database/sql"
"fmt"
"math/rand"
"strconv"
"sync/atomic"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
@@ -172,6 +175,79 @@ func TestClosedTimestampCanServeThroughoutLeaseTransfer(t *testing.T) {
}
}

// TestClosedTimestampCanServeWithConflictingIntent validates that a read served
// from a follower replica will wait on conflicting intents and ensure that they
// are cleaned up if necessary to allow the read to proceed.
func TestClosedTimestampCanServeWithConflictingIntent(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
tc, _, desc, repls := setupTestClusterForClosedTimestampTesting(ctx, t, testingTargetDuration)
defer tc.Stopper().Stop(ctx)
ds := tc.Server(0).DistSenderI().(*kvcoord.DistSender)

// Write N different intents for the same transaction, where N is the number
// of replicas in the testing range. Each intent will be read and eventually
// resolved by a read on a different replica.
txnKey := desc.StartKey.AsRawKey()
txnKey = txnKey[:len(txnKey):len(txnKey)] // avoid aliasing
txn := roachpb.MakeTransaction("txn", txnKey, 0, tc.Server(0).Clock().Now(), 0)
var keys []roachpb.Key
for i := range repls {
key := append(txnKey, []byte(strconv.Itoa(i))...)
keys = append(keys, key)
put := putArgs(key, []byte("val"))
resp, err := kv.SendWrappedWith(ctx, ds, roachpb.Header{Txn: &txn}, put)
if err != nil {
t.Fatal(err)
}
txn.Update(resp.Header().Txn)
}

// Read a different intent on each replica. All should begin waiting on the
// intents by pushing the transaction that wrote them. None should complete.
ts := txn.WriteTimestamp
respCh := make(chan struct{}, len(keys))
for i, key := range keys {
go func(repl *kvserver.Replica, key roachpb.Key) {
var baRead roachpb.BatchRequest
r := &roachpb.ScanRequest{}
r.Key = key
r.EndKey = key.Next()
baRead.Add(r)
baRead.Timestamp = ts
baRead.RangeID = desc.RangeID

testutils.SucceedsSoon(t, func() error {
// Expect 0 rows, because the intents will be aborted.
_, err := expectRows(0)(repl.Send(ctx, baRead))
return err
})
respCh <- struct{}{}
}(repls[i], key)
}

select {
case <-respCh:
t.Fatal("request unexpectedly succeeded, should block")
case <-time.After(20 * time.Millisecond):
}

// Abort the transaction. All pushes should succeed and all intents should
// be resolved, allowing all reads (on the leaseholder and on followers) to
// proceed and finish.
endTxn := &roachpb.EndTxnRequest{
RequestHeader: roachpb.RequestHeader{Key: txn.Key},
Commit: false,
}
if _, err := kv.SendWrappedWith(ctx, ds, roachpb.Header{Txn: &txn}, endTxn); err != nil {
t.Fatal(err)
}
for range keys {
<-respCh
}
}
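
The expectRows helper used above is defined elsewhere in closed_timestamp_test.go and is not part of this diff. A minimal sketch of a compatible helper, assuming it accepts Replica.Send's (response, error) pair directly and that its first return value is a retry hint (discarded by the caller above), could look like:

func expectRowsSketch(expectedRows int) func(*roachpb.BatchResponse, *roachpb.Error) (bool, error) {
	return func(resp *roachpb.BatchResponse, pErr *roachpb.Error) (bool, error) {
		if pErr != nil {
			return false, pErr.GoError()
		}
		// Expect exactly one ScanResponse in the batch, as constructed above.
		rows := resp.Responses[0].GetInner().(*roachpb.ScanResponse).Rows
		if len(rows) != expectedRows {
			return false, fmt.Errorf("expected %d rows, got %d", expectedRows, len(rows))
		}
		return false, nil
	}
}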

// TestClosedTimestampCanServeAfterSplitsAndMerges validates the invariant that
// if a timestamp is safe for reading on both the left side and right side of a
// merge then it will be safe after the merge and that if a timestamp is safe
26 changes: 24 additions & 2 deletions pkg/kv/kvserver/concurrency/concurrency_control.go
@@ -196,6 +196,8 @@ type ContentionHandler interface {
// error in the lock's wait-queue (but does not wait) and releases the
// guard's latches. It returns an updated guard reflecting this change.
// After the method returns, the original guard should no longer be used.
// If an error is returned then the provided guard will be released and no
// guard will be returned.
//
// Example usage: Txn A scans the lock table and does not see an intent on
// key K from txn B because the intent is not being tracked in the lock
@@ -204,7 +206,7 @@
// method before txn A retries its scan. During the retry, txn A scans the
// lock table and observes the lock on key K, so it enters the lock's
// wait-queue and waits for it to be resolved.
HandleWriterIntentError(context.Context, *Guard, *roachpb.WriteIntentError) *Guard
HandleWriterIntentError(context.Context, *Guard, *roachpb.WriteIntentError) (*Guard, *Error)

// HandleTransactionPushError consumes a TransactionPushError thrown by a
// PushTxnRequest by informing the concurrency manager about a transaction
@@ -474,7 +476,11 @@ type lockTable interface {
//
// A latch consistent with the access desired by the guard must be held on
// the span containing the discovered lock's key.
AddDiscoveredLock(*roachpb.Intent, lockTableGuard) error
//
// The method returns a boolean indicating whether the discovered lock was
// added to the lockTable (true) or whether it was ignored because the
// lockTable is currently disabled (false).
AddDiscoveredLock(*roachpb.Intent, lockTableGuard) (bool, error)

// AcquireLock informs the lockTable that a new lock was acquired or an
// existing lock was updated.
@@ -610,6 +616,22 @@ type lockTableWaiter interface {
// wait-queues and it is safe to re-acquire latches and scan the lockTable
// again.
WaitOn(context.Context, Request, lockTableGuard) *Error

// WaitOnLock waits on the transaction responsible for the specified lock
// and then ensures that the lock is cleared out of the request's way.
//
// The method should be called after dropping any latches that a request has
// acquired. It returns when the lock has been resolved.
//
// NOTE: this method is used when the lockTable is disabled (e.g. on a
// follower replica) and a lock is discovered that must be waited on (e.g.
// during a follower read). If/when lockTables are maintained on follower
// replicas by propagating lockTable state transitions through the Raft log
// in the ReplicatedEvalResult instead of through the (leaseholder-only)
// LocalResult, we should be able to remove the lockTable "disabled" state
// and, in turn, remove this method. This will likely fall out of pulling
// all replicated locks into the lockTable.
WaitOnLock(context.Context, Request, *roachpb.Intent) *Error
}
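
The WaitOnLock implementation itself is not shown in this excerpt. The following is a rough sketch of the disabled-lockTable wait path that the method documents: push the conflicting transaction, then use the push result to resolve the discovered intent. The pushResolver interface below is an illustrative stand-in (its PushTransaction signature mirrors the test mock later in this diff), not the package's real IntentResolver dependency, and readConflictTimestamp is the helper added in concurrency_manager.go below.

// Illustrative sketch only; assumes in-package context and imports:
// context, enginepb, roachpb.
type pushResolver interface {
	PushTransaction(
		ctx context.Context, pushee *enginepb.TxnMeta, h roachpb.Header, pushType roachpb.PushTxnType,
	) (roachpb.Transaction, *roachpb.Error)
	ResolveIntent(ctx context.Context, intent roachpb.LockUpdate) *roachpb.Error
}

func waitOnLockSketch(
	ctx context.Context, pr pushResolver, req Request, intent *roachpb.Intent,
) *roachpb.Error {
	h := roachpb.Header{
		Timestamp:    req.readConflictTimestamp(),
		UserPriority: req.Priority,
	}
	// A real implementation would pick PUSH_ABORT or PUSH_TIMESTAMP based on
	// the request's access; PUSH_ABORT is used here for simplicity.
	pushee, pErr := pr.PushTransaction(ctx, &intent.Txn, h, roachpb.PUSH_ABORT)
	if pErr != nil {
		return pErr
	}
	// The push result carries the pushee's authoritative status (or pushed
	// timestamp); use it to clear the intent out of the request's way.
	update := roachpb.LockUpdate{
		Span:   roachpb.Span{Key: intent.Key},
		Txn:    pushee.TxnMeta,
		Status: pushee.Status,
	}
	return pr.ResolveIntent(ctx, update)
}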

// txnWaitQueue holds a collection of wait-queues for transaction records.
62 changes: 57 additions & 5 deletions pkg/kv/kvserver/concurrency/concurrency_manager.go
@@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
@@ -85,7 +86,6 @@ func NewManager(cfg Config) Manager {
maxLocks: cfg.MaxLockTableSize,
},
ltw: &lockTableWaiterImpl{
nodeID: cfg.NodeDesc.NodeID,
st: cfg.Settings,
stopper: cfg.Stopper,
ir: cfg.IntentResolver,
@@ -244,27 +244,47 @@ func (m *managerImpl) FinishReq(g *Guard) {
// HandleWriterIntentError implements the ContentionHandler interface.
func (m *managerImpl) HandleWriterIntentError(
ctx context.Context, g *Guard, t *roachpb.WriteIntentError,
) *Guard {
) (*Guard, *Error) {
if g.ltg == nil {
log.Fatalf(ctx, "cannot handle WriteIntentError %v for request without "+
"lockTableGuard; were lock spans declared for this request?", t)
}

// Add a discovered lock to lock-table for each intent and enter each lock's
// wait-queue.
// wait-queue. If the lock-table is disabled and one or more of the intents
// are ignored then we immediately wait on all intents.
wait := false
for i := range t.Intents {
intent := &t.Intents[i]
if err := m.lt.AddDiscoveredLock(intent, g.ltg); err != nil {
added, err := m.lt.AddDiscoveredLock(intent, g.ltg)
if err != nil {
log.Fatal(ctx, errors.HandleAsAssertionFailure(err))
}
if !added {
wait = true
}
}

// Release the Guard's latches but continue to remain in lock wait-queues by
// not releasing lockWaitQueueGuards. We expect the caller of this method to
// then re-sequence the Request by calling SequenceReq with the un-latched
// Guard. This is analogous to iterating through the loop in SequenceReq.
m.lm.Release(g.moveLatchGuard())
return g

// If the lockTable was disabled then we need to immediately wait on the
// intents to ensure that they are resolved and moved out of the request's
// way.
if wait {
for i := range t.Intents {
intent := &t.Intents[i]
if err := m.ltw.WaitOnLock(ctx, g.Req, intent); err != nil {
m.FinishReq(g)
return nil, err
}
}
}

return g, nil
}
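
For context on the re-sequencing contract mentioned above, here is a simplified sketch of the caller-side retry loop; the real loop lives in the Replica's batch evaluation path and differs in detail, and evalWithLatches is a placeholder for evaluating the request while the guard's latches are held.

// Illustrative sketch only; assumes imports: context, concurrency, roachpb.
func runWithConcurrencyRetries(
	ctx context.Context,
	m concurrency.Manager,
	req concurrency.Request,
	evalWithLatches func(*concurrency.Guard) (*roachpb.BatchResponse, *roachpb.Error),
) (*roachpb.BatchResponse, *roachpb.Error) {
	var g *concurrency.Guard
	for {
		var pErr *roachpb.Error
		// Sequence the request: acquire latches and wait in any lock
		// wait-queues the guard is already part of. (A Response returned
		// directly by SequenceReq is ignored in this sketch.)
		g, _, pErr = m.SequenceReq(ctx, g, req)
		if pErr != nil {
			return nil, pErr
		}
		br, pErr := evalWithLatches(g)
		if pErr != nil {
			if wiErr, ok := pErr.GetDetail().(*roachpb.WriteIntentError); ok {
				// Drop latches but remain in lock wait-queues, then loop
				// around and re-sequence. With this change the handler can
				// now also fail (e.g. if waiting on an intent fails while
				// the lockTable is disabled); in that case the guard has
				// already been finished on our behalf.
				if g, pErr = m.HandleWriterIntentError(ctx, g, wiErr); pErr != nil {
					return nil, pErr
				}
				continue
			}
		}
		m.FinishReq(g)
		return br, pErr
	}
}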

// HandleTransactionPushError implements the ContentionHandler interface.
@@ -378,6 +398,38 @@ func (m *managerImpl) TxnWaitQueue() *txnwait.Queue {
return m.twq.(*txnwait.Queue)
}

func (r *Request) txnMeta() *enginepb.TxnMeta {
if r.Txn == nil {
return nil
}
return &r.Txn.TxnMeta
}

// readConflictTimestamp returns the maximum timestamp at which the request
// conflicts with locks acquired by other transactions. The request must wait
// for all locks acquired by other transactions at or below this timestamp
// to be released. All locks acquired by other transactions above this
// timestamp are ignored.
func (r *Request) readConflictTimestamp() hlc.Timestamp {
ts := r.Timestamp
if r.Txn != nil {
ts = r.Txn.ReadTimestamp
ts.Forward(r.Txn.MaxTimestamp)
}
return ts
}

// writeConflictTimestamp returns the minimum timestamp at which the request
// acquires locks when performing mutations. All writes performed by the
// request must take place at or above this timestamp.
func (r *Request) writeConflictTimestamp() hlc.Timestamp {
ts := r.Timestamp
if r.Txn != nil {
ts = r.Txn.WriteTimestamp
}
return ts
}
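
To make the forwarding behavior concrete, here is a small, hypothetical in-package example (not part of the PR) for a transaction with ReadTimestamp=10, MaxTimestamp=15, and WriteTimestamp=12, in wall-time nanoseconds:

// Illustrative sketch only; assumes imports: fmt, enginepb, hlc, roachpb.
func exampleConflictTimestamps() {
	txn := &roachpb.Transaction{
		TxnMeta:       enginepb.TxnMeta{WriteTimestamp: hlc.Timestamp{WallTime: 12}},
		ReadTimestamp: hlc.Timestamp{WallTime: 10},
		MaxTimestamp:  hlc.Timestamp{WallTime: 15},
	}
	req := Request{Txn: txn, Timestamp: txn.ReadTimestamp}
	// The read conflict timestamp is the ReadTimestamp forwarded to the
	// MaxTimestamp (uncertainty limit), i.e. wall time 15; locks held by
	// other transactions at or below it must be waited on.
	fmt.Println(req.readConflictTimestamp())
	// The write conflict timestamp is the transaction's WriteTimestamp,
	// i.e. wall time 12.
	fmt.Println(req.writeConflictTimestamp())
}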

func (r *Request) isSingle(m roachpb.Method) bool {
if len(r.Requests) != 1 {
return false
33 changes: 24 additions & 9 deletions pkg/kv/kvserver/concurrency/concurrency_manager_test.go
@@ -51,7 +51,7 @@ import (
//
// The input files use the following DSL:
//
// new-txn name=<txn-name> ts=<int>[,<int>] epoch=<int>
// new-txn name=<txn-name> ts=<int>[,<int>] epoch=<int> [maxts=<int>[,<int>]]
// new-request name=<req-name> txn=<txn-name>|none ts=<int>[,<int>] [priority] [consistency]
// <proto-name> [<field-name>=<field-value>...]
// sequence req=<req-name>
@@ -61,8 +61,8 @@
// handle-txn-push-error req=<req-name> txn=<txn-name> key=<key> TODO(nvanbenschoten): implement this
//
// on-lock-acquired txn=<txn-name> key=<key>
// on-lock-updated txn=<txn-name> key=<key> status=[committed|aborted|pending] [ts<int>[,<int>]]
// on-txn-updated txn=<txn-name> status=[committed|aborted|pending] [ts<int>[,<int>]]
// on-lock-updated txn=<txn-name> key=<key> status=[committed|aborted|pending] [ts=<int>[,<int>]]
// on-txn-updated txn=<txn-name> status=[committed|aborted|pending] [ts=<int>[,<int>]]
//
// on-lease-updated leaseholder=<bool>
// on-split
@@ -94,6 +94,11 @@ func TestConcurrencyManagerBasic(t *testing.T) {
var epoch int
d.ScanArgs(t, "epoch", &epoch)

maxTS := ts
if d.HasArg("maxts") {
maxTS = scanTimestampWithName(t, d, "maxts")
}

txn, ok := c.txnsByName[txnName]
var id uuid.UUID
if ok {
@@ -110,6 +115,7 @@
Priority: 1, // not min or max
},
ReadTimestamp: ts,
MaxTimestamp: maxTS,
}
txn.UpdateObservedTimestamp(c.nodeDesc.NodeID, ts)
c.registerTxn(txnName, txn)
@@ -215,7 +221,7 @@ func TestConcurrencyManagerBasic(t *testing.T) {
case "handle-write-intent-error":
var reqName string
d.ScanArgs(t, "req", &reqName)
guard, ok := c.guardsByReqName[reqName]
prev, ok := c.guardsByReqName[reqName]
if !ok {
d.Fatalf(t, "unknown request: %s", reqName)
}
@@ -231,12 +237,22 @@
d.ScanArgs(t, "key", &key)

opName := fmt.Sprintf("handle write intent error %s", reqName)
mon.runSync(opName, func(ctx context.Context) {
err := &roachpb.WriteIntentError{Intents: []roachpb.Intent{
mon.runAsync(opName, func(ctx context.Context) {
wiErr := &roachpb.WriteIntentError{Intents: []roachpb.Intent{
roachpb.MakeIntent(&txn.TxnMeta, roachpb.Key(key)),
}}
log.Eventf(ctx, "handling %v", err)
guard = m.HandleWriterIntentError(ctx, guard, err)
guard, err := m.HandleWriterIntentError(ctx, prev, wiErr)
if err != nil {
log.Eventf(ctx, "handled %v, returned error: %v", wiErr, err)
c.mu.Lock()
delete(c.guardsByReqName, reqName)
c.mu.Unlock()
} else {
log.Eventf(ctx, "handled %v, released latches", wiErr)
c.mu.Lock()
c.guardsByReqName[reqName] = guard
c.mu.Unlock()
}
})
return c.waitAndCollect(t, mon)

@@ -445,7 +461,6 @@ func (c *cluster) makeConfig() concurrency.Config {
func (c *cluster) PushTransaction(
ctx context.Context, pushee *enginepb.TxnMeta, h roachpb.Header, pushType roachpb.PushTxnType,
) (roachpb.Transaction, *roachpb.Error) {
log.Eventf(ctx, "pushing txn %s", pushee.ID.Short())
pusheeRecord, err := c.getTxnRecord(pushee.ID)
if err != nil {
return roachpb.Transaction{}, roachpb.NewError(err)
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/concurrency/datadriven_util_test.go
@@ -29,9 +29,13 @@ func nextUUID(counter *uint32) uuid.UUID {
}

func scanTimestamp(t *testing.T, d *datadriven.TestData) hlc.Timestamp {
return scanTimestampWithName(t, d, "ts")
}

func scanTimestampWithName(t *testing.T, d *datadriven.TestData, name string) hlc.Timestamp {
var ts hlc.Timestamp
var tsS string
d.ScanArgs(t, "ts", &tsS)
d.ScanArgs(t, name, &tsS)
parts := strings.Split(tsS, ",")

// Find the wall time part.