kv/concurrency: avoid redundant txn pushes and batch intent resolution
Fixes #48790.
Informs #36876.
Closes #31664.

This commit adds a per-Range LRU cache of transactions that are known to
be aborted or committed. We use this cache in the lockTableWaiter for
two purposes:
1. when we see a lock held by a known-finalized txn, we neither wait out
   the kv.lock_table.coordinator_liveness_push_delay (10 ms) nor push the
   transaction's record (an RPC to the leaseholder of the pushee's txn
   record range).
2. we use the existence of a transaction in the cache as an indication that
   it may have abandoned multiple intents, perhaps due to a failure of the
   transaction coordinator node, so we begin deferring intent resolution to
   enable batching.
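
For illustration, here is a minimal, standalone sketch of the fixed-size
LRU idea behind that cache. The names and values are invented for the
example; the real txnCache added to lock_table_waiter.go (see the diff
below) stores *roachpb.Transaction values keyed by txn UUID and guards
its slots with a mutex.

```go
package main

import "fmt"

// toyTxnCache mirrors the shape of the new per-Range txnCache: a tiny,
// fixed-size LRU of finalized transactions that is cheap enough to consult
// for every conflicting lock before deciding whether to push.
type toyTxnCache struct {
	ids [8]string // [MRU, ..., LRU]; "" marks an empty slot
}

// get reports whether the txn is already known to be finalized and, on a
// hit, moves it to the MRU position.
func (c *toyTxnCache) get(id string) bool {
	for i, cur := range c.ids {
		if cur == id {
			copy(c.ids[1:i+1], c.ids[:i]) // shift newer entries down one slot
			c.ids[0] = id
			return true
		}
	}
	return false
}

// add records a finalized txn, evicting the LRU entry if the txn is new.
func (c *toyTxnCache) add(id string) {
	if c.get(id) {
		return // already cached; get moved it to the front
	}
	copy(c.ids[1:], c.ids[:]) // slide everything down, dropping the LRU entry
	c.ids[0] = id
}

func main() {
	var c toyTxnCache
	c.add("txn-aborted-by-dead-coordinator")

	// Purpose 1: a cache hit lets the waiter skip both the 10 ms liveness
	// delay and the PushTxn RPC.
	fmt.Println(c.get("txn-aborted-by-dead-coordinator")) // true

	// Purpose 2: on a hit, resolution of that intent is deferred so it can
	// later be sent as part of one batched ResolveIntents call.
	fmt.Println(c.get("some-live-txn")) // false -> fall back to the push path
}
```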

Together, these two changes make us much more effective at cleaning up
after failed transactions that have abandoned a large number of intents.
The following example demonstrates this:
```sql
--- BEFORE

CREATE TABLE keys (k BIGINT NOT NULL PRIMARY KEY);
BEGIN; INSERT INTO keys SELECT generate_series(1, 10000); ROLLBACK;
SELECT * FROM keys;

  k
-----
(0 rows)

Time: 2m50.801304266s

CREATE TABLE keys2 (k BIGINT NOT NULL PRIMARY KEY);
BEGIN; INSERT INTO keys2 SELECT generate_series(1, 10000); ROLLBACK;
INSERT INTO keys2 SELECT generate_series(1, 10000);

INSERT 10000

Time: 3m26.874571045s

--- AFTER

CREATE TABLE keys (k BIGINT NOT NULL PRIMARY KEY);
BEGIN; INSERT INTO keys SELECT generate_series(1, 10000); ROLLBACK;
SELECT * FROM keys;

  k
-----
(0 rows)

Time: 5.138220753s

CREATE TABLE keys2 (k BIGINT NOT NULL PRIMARY KEY);
BEGIN; INSERT INTO keys2 SELECT generate_series(1, 10000); ROLLBACK;
INSERT INTO keys2 SELECT generate_series(1, 10000);

INSERT 10000

Time: 48.763541138s
```

Notice that we are still not as fast at cleaning up intents on the
insertion path as we are on the retrieval path. This is because we only
batch the resolution of intents observed by a single request at a time.
For the scanning case, a single ScanRequest notices all 10,000 intents
and cleans them all up together. For the insertion case, each of the
10,000 PutRequests notices a single intent, and each intent is cleaned
up individually. So this case benefits only from the first part of this
change (no liveness delay or txn record push) and not from the second
part (intent resolution batching).
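
To make the asymmetry concrete, here is a small standalone model - not
kvserver code; the function and the counts are invented for the example -
of how per-request batching plays out on the two paths:

```go
package main

import "fmt"

// resolveForRequest models the deferred resolution added in this commit:
// each request accumulates the intents that it personally waited on and
// flushes them as a single batch once it is done waiting.
func resolveForRequest(conflictingIntents int) (batches, resolved int) {
	if conflictingIntents > 0 {
		batches = 1 // one batched resolution per request
		resolved = conflictingIntents
	}
	return batches, resolved
}

func main() {
	// Scan path: one ScanRequest discovers all 10,000 abandoned intents, so a
	// single request flushes a single batch covering everything.
	b, r := resolveForRequest(10000)
	fmt.Printf("scan:   %d batch(es), %d intents\n", b, r)

	// Insert path: 10,000 PutRequests each discover exactly one intent, so
	// batching never gets a chance to help and each intent is resolved alone.
	totalBatches, totalResolved := 0, 0
	for i := 0; i < 10000; i++ {
		b, r := resolveForRequest(1)
		totalBatches += b
		totalResolved += r
	}
	fmt.Printf("insert: %d batch(es), %d intents\n", totalBatches, totalResolved)
}
```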

For this reason, we still haven't solved all of #36876. To completely
address that, we'll need to defer propagation of WriteIntentErrors during
batch evaluation, like we do for WriteTooOldErrors. Or we can wait for
the future LockTable changes - once we remove all cases where an intent
is not "discovered", the changes here will effectively address #36876.

This was a partial regression in v20.1, so we'll want to backport this
to that release branch. This change is on the larger side, but I feel ok
about it because the mechanics aren't too tricky. I'll wait a week before
backporting just to see if anything falls out.

Release note (bug fix): Abandoned intents due to failed transaction
coordinators are now cleaned up much faster. This resolves a regression
in v20.1.0 compared to prior releases.
nvanbenschoten committed May 26, 2020
1 parent e3f2142 commit 67c6bdb
Showing 10 changed files with 635 additions and 39 deletions.
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/concurrency/concurrency_control.go
@@ -632,6 +632,11 @@ type lockTableWaiter interface {
// and, in turn, remove this method. This will likely fall out of pulling
// all replicated locks into the lockTable.
WaitOnLock(context.Context, Request, *roachpb.Intent) *Error

// ClearCaches wipes all caches maintained by the lockTableWaiter. This is
// primarily used to recover memory when a replica loses a lease. However,
// it is also used in tests to reset the state of the lockTableWaiter.
ClearCaches()
}

// txnWaitQueue holds a collection of wait-queues for transaction records.
8 changes: 7 additions & 1 deletion pkg/kv/kvserver/concurrency/concurrency_manager.go
@@ -72,7 +72,8 @@ func (c *Config) initDefaults() {
// NewManager creates a new concurrency Manager structure.
func NewManager(cfg Config) Manager {
cfg.initDefaults()
return &managerImpl{
m := new(managerImpl)
*m = managerImpl{
// TODO(nvanbenschoten): move pkg/storage/spanlatch to a new
// pkg/storage/concurrency/latch package. Make it implement the
// latchManager interface directly, if possible.
@@ -89,6 +90,7 @@ func NewManager(cfg Config) Manager {
st: cfg.Settings,
stopper: cfg.Stopper,
ir: cfg.IntentResolver,
lm: m,
disableTxnPushing: cfg.DisableTxnPushing,
},
// TODO(nvanbenschoten): move pkg/storage/txnwait to a new
@@ -102,6 +104,7 @@ func NewManager(cfg Config) Manager {
Knobs: cfg.TxnWaitKnobs,
}),
}
return m
}

// SequenceReq implements the RequestSequencer interface.
@@ -342,6 +345,9 @@ func (m *managerImpl) OnRangeLeaseUpdated(isLeaseholder bool) {
const disable = true
m.lt.Clear(disable)
m.twq.Clear(disable)
// Also clear caches, since they won't be needed any time soon and
// consume memory.
m.ltw.ClearCaches()
}
}

49 changes: 38 additions & 11 deletions pkg/kv/kvserver/concurrency/concurrency_manager_test.go
@@ -228,21 +228,35 @@ func TestConcurrencyManagerBasic(t *testing.T) {
d.Fatalf(t, "unknown request: %s", reqName)
}

var txnName string
d.ScanArgs(t, "txn", &txnName)
txn, ok := c.txnsByName[txnName]
if !ok {
d.Fatalf(t, "unknown txn %s", txnName)
}
// Each roachpb.Intent is provided on an indented line.
var intents []roachpb.Intent
singleReqLines := strings.Split(d.Input, "\n")
for _, line := range singleReqLines {
var err error
d.Cmd, d.CmdArgs, err = datadriven.ParseLine(line)
if err != nil {
d.Fatalf(t, "error parsing single intent: %v", err)
}
if d.Cmd != "intent" {
d.Fatalf(t, "expected \"intent\", found %s", d.Cmd)
}

var key string
d.ScanArgs(t, "key", &key)
var txnName string
d.ScanArgs(t, "txn", &txnName)
txn, ok := c.txnsByName[txnName]
if !ok {
d.Fatalf(t, "unknown txn %s", txnName)
}

var key string
d.ScanArgs(t, "key", &key)

intents = append(intents, roachpb.MakeIntent(&txn.TxnMeta, roachpb.Key(key)))
}

opName := fmt.Sprintf("handle write intent error %s", reqName)
mon.runAsync(opName, func(ctx context.Context) {
wiErr := &roachpb.WriteIntentError{Intents: []roachpb.Intent{
roachpb.MakeIntent(&txn.TxnMeta, roachpb.Key(key)),
}}
wiErr := &roachpb.WriteIntentError{Intents: intents}
guard, err := m.HandleWriterIntentError(ctx, prev, wiErr)
if err != nil {
log.Eventf(ctx, "handled %v, returned error: %v", wiErr, err)
@@ -578,6 +592,19 @@ func (c *cluster) ResolveIntent(
return nil
}

// ResolveIntents implements the concurrency.IntentResolver interface.
func (c *cluster) ResolveIntents(
ctx context.Context, intents []roachpb.LockUpdate, opts intentresolver.ResolveOptions,
) *roachpb.Error {
log.Eventf(ctx, "resolving a batch of %d intent(s)", len(intents))
for _, intent := range intents {
if err := c.ResolveIntent(ctx, intent, opts); err != nil {
return err
}
}
return nil
}

func (c *cluster) newTxnID() uuid.UUID {
c.mu.Lock()
defer c.mu.Unlock()
170 changes: 162 additions & 8 deletions pkg/kv/kvserver/concurrency/lock_table_waiter.go
@@ -23,7 +23,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
)

@@ -43,7 +45,7 @@ var LockTableLivenessPushDelay = settings.RegisterDurationSetting(
// the cost on each of a transaction's abandoned locks and instead only pay
// it once per abandoned transaction per range or per node. This could come
// in a few different forms, including:
// - a per-wide cache of recently detected abandoned transaction IDs
// - a per-store cache of recently detected abandoned transaction IDs
// - a per-range reverse index from transaction ID to locked keys
//
// TODO(nvanbenschoten): increasing this default value.
@@ -83,6 +85,18 @@ type lockTableWaiterImpl struct {
st *cluster.Settings
stopper *stop.Stopper
ir IntentResolver
lm LockManager

// finalizedTxnCache is a small LRU cache that tracks transactions that
// were pushed and found to be finalized (COMMITTED or ABORTED). It is
// used as an optimization to avoid repeatedly pushing the transaction
// record when cleaning up the intents of an abandoned transaction.
//
// NOTE: it probably makes sense to maintain a single finalizedTxnCache
// across all Ranges on a Store instead of an individual cache per
// Range. For now, we don't do this because we don't share any state
// between separate concurrency.Manager instances.
finalizedTxnCache txnCache

// When set, WriteIntentError are propagated instead of pushing
// conflicting transactions.
@@ -97,25 +111,39 @@ type IntentResolver interface {
// provided pushee transaction immediately, if possible. Otherwise, it will
// block until the pushee transaction is finalized or eventually can be
// pushed successfully.
// TODO(nvanbenschoten): return a *roachpb.Transaction here.
PushTransaction(
context.Context, *enginepb.TxnMeta, roachpb.Header, roachpb.PushTxnType,
) (roachpb.Transaction, *Error)

// ResolveIntent resolves the provided intent according to the options.
// ResolveIntent synchronously resolves the provided intent.
ResolveIntent(context.Context, roachpb.LockUpdate, intentresolver.ResolveOptions) *Error

// ResolveIntents synchronously resolves the provided batch of intents.
ResolveIntents(context.Context, []roachpb.LockUpdate, intentresolver.ResolveOptions) *Error
}

// WaitOn implements the lockTableWaiter interface.
func (w *lockTableWaiterImpl) WaitOn(
ctx context.Context, req Request, guard lockTableGuard,
) *Error {
) (err *Error) {
newStateC := guard.NewStateChan()
ctxDoneC := ctx.Done()
shouldQuiesceC := w.stopper.ShouldQuiesce()
// Used to delay liveness and deadlock detection pushes.
var timer *timeutil.Timer
var timerC <-chan time.Time
var timerWaitingState waitingState
// Used to defer the resolution of duplicate intents. Intended to allow
// batching of intent resolution while cleaning up after abandoned txns. A
// request may begin deferring intent resolution and then be forced to wait
// again on other locks. This is ok, as the request that deferred intent
// resolution will often be the new reservation holder for those intents'
// keys. Even when this is not the case (e.g. the request is read-only so it
// can't hold reservations), any other requests that slip ahead will simply
// re-discover the intent(s) during evaluation and resolve them themselves.
var deferredResolution []roachpb.LockUpdate
defer w.resolveDeferredIntents(ctx, &err, &deferredResolution)
for {
select {
case <-newStateC:
@@ -138,10 +166,7 @@ func (w *lockTableWaiterImpl) WaitOn(
// The purpose of the waitForDistinguished state is to avoid waiting
// out the longer deadlock detection delay before recognizing and
// recovering from the failure of a transaction coordinator for
// *each* of that transaction's previously written intents. If we
// had a cache of aborted transaction IDs that allowed us to notice
// and quickly resolve abandoned intents then we might be able to
// get rid of this state.
// *each* of that transaction's previously written intents.
livenessPush := state.kind == waitForDistinguished
deadlockPush := true

@@ -167,6 +192,54 @@
continue
}

// If we know that a lock holder is already finalized (COMMITTED
// or ABORTED), there's no reason to push it again. Instead, we
// can skip directly to intent resolution.
//
// As an optimization, we defer the intent resolution until we're
// done waiting on all conflicting locks in this function.
// This allows us to accumulate a group of intents to resolve
// and send them together as a batch.
//
// Remember that if the lock is held, there will be at least one
// waiter with livenessPush = true (the distinguished waiter),
// so at least one request will enter this branch and perform
// the cleanup on behalf of all other waiters.
if livenessPush {
if pusheeTxn, ok := w.finalizedTxnCache.get(state.txn.ID); ok {
resolve := roachpb.MakeLockUpdate(pusheeTxn, roachpb.Span{Key: state.key})
deferredResolution = append(deferredResolution, resolve)

// Inform the LockManager that the lock has been updated with a
// finalized status so that it gets removed from the lockTable
// and we are allowed to proceed.
//
// For unreplicated locks, this is all that is needed - the
// lockTable is the source of truth so, once removed, the
// unreplicated lock is gone. It is perfectly valid for us to
// instruct the lock to be released because we know that the
// lock's owner is finalized.
//
// For replicated locks, this is a bit of a lie. The lock hasn't
// actually been updated yet, but we will be conducting intent
// resolution in the future (before we observe the corresponding
// MVCC state). This is safe because we already handle cases
// where locks exist only in the MVCC keyspace and not in the
// lockTable.
//
// In the future, we'd like to make this more explicit.
// Specifically, we'd like to augment the lockTable with an
// understanding of finalized but not yet resolved locks. These
// locks will allow conflicting transactions to proceed with
// evaluation without the need to first remove all traces of
// them via a round of replication. This is discussed in more
// detail in #41720. Specifically, see mention of "contention
// footprint" and COMMITTED_BUT_NOT_REMOVABLE.
w.lm.OnLockUpdated(ctx, &deferredResolution[len(deferredResolution)-1])
continue
}
}

// The request should push to detect abandoned locks due to
// failed transaction coordinators, detect deadlocks between
// transactions, or both, but only after delay. This delay
@@ -266,7 +339,6 @@ func (w *lockTableWaiterImpl) WaitOn(
// behind a lock. In this case, the request has a dependency on the
// conflicting request but not necessarily the entire conflicting
// transaction.
var err *Error
if timerWaitingState.held {
err = w.pushLockTxn(ctx, req, timerWaitingState)
} else {
@@ -316,6 +388,11 @@ func (w *lockTableWaiterImpl) WaitOnLock(
})
}

// ClearCaches implements the lockTableWaiter interface.
func (w *lockTableWaiterImpl) ClearCaches() {
w.finalizedTxnCache.clear()
}

// pushLockTxn pushes the holder of the provided lock.
//
// The method blocks until the lock holder transaction experiences a state
@@ -353,6 +430,13 @@ func (w *lockTableWaiterImpl) pushLockTxn(
return err
}

// If the transaction is finalized, add it to the finalizedTxnCache. This
// avoids needing to push it again if we find another one of its locks and
// allows for batching of intent resolution.
if pusheeTxn.Status.IsFinalized() {
w.finalizedTxnCache.add(&pusheeTxn)
}

// If the push succeeded then the lock holder transaction must have
// experienced a state transition such that it no longer conflicts with
// the pusher's request. This state transition could have been any of the
@@ -485,6 +569,20 @@ func (w *lockTableWaiterImpl) pushHeader(req Request) roachpb.Header {
return h
}

// resolveDeferredIntents resolves the batch of intents if the provided error is
// nil. The batch of intents may be resolved more efficiently than if they were
// resolved individually.
func (w *lockTableWaiterImpl) resolveDeferredIntents(
ctx context.Context, err **Error, deferredResolution *[]roachpb.LockUpdate,
) {
if (*err != nil) || (len(*deferredResolution) == 0) {
return
}
// See pushLockTxn for an explanation of these options.
opts := intentresolver.ResolveOptions{Poison: true}
*err = w.ir.ResolveIntents(ctx, *deferredResolution, opts)
}

// watchForNotifications selects on the provided channel and watches for any
// updates. If the channel is ever notified, it calls the provided context
// cancelation function and exits.
@@ -504,6 +602,62 @@ func (w *lockTableWaiterImpl) watchForNotifications(
}
}

// txnCache is a small LRU cache that holds Transaction objects.
//
// The zero value of this struct is ready for use.
type txnCache struct {
mu syncutil.Mutex
txns [8]*roachpb.Transaction // [MRU, ..., LRU]
}

func (c *txnCache) get(id uuid.UUID) (*roachpb.Transaction, bool) {
c.mu.Lock()
defer c.mu.Unlock()
if idx := c.getIdxLocked(id); idx >= 0 {
txn := c.txns[idx]
c.moveFrontLocked(txn, idx)
return txn, true
}
return nil, false
}

func (c *txnCache) add(txn *roachpb.Transaction) {
c.mu.Lock()
defer c.mu.Unlock()
if idx := c.getIdxLocked(txn.ID); idx >= 0 {
c.moveFrontLocked(txn, idx)
} else {
c.insertFrontLocked(txn)
}
}

func (c *txnCache) clear() {
c.mu.Lock()
defer c.mu.Unlock()
for i := range c.txns {
c.txns[i] = nil
}
}

func (c *txnCache) getIdxLocked(id uuid.UUID) int {
for i, txn := range c.txns {
if txn != nil && txn.ID == id {
return i
}
}
return -1
}

func (c *txnCache) moveFrontLocked(txn *roachpb.Transaction, cur int) {
copy(c.txns[1:cur+1], c.txns[:cur])
c.txns[0] = txn
}

func (c *txnCache) insertFrontLocked(txn *roachpb.Transaction) {
copy(c.txns[1:], c.txns[:])
c.txns[0] = txn
}

func hasMinPriority(txn *enginepb.TxnMeta) bool {
return txn != nil && txn.Priority == enginepb.MinTxnPriority
}