46206: sql: add active user txn information to crdb_internal r=rohany a=rohany

Fixes #46055.

Release justification: low risk change to existing functionality

Release note (sql change): This PR:
* Adds the new internal tables `crdb_internal.node_transactions`
  and `crdb_internal.cluster_transactions`, which contain metadata
  about active user transactions.
* Adds a `txn_id` column to the `crdb_internal.node_queries` and
  `crdb_internal.cluster_queries` tables. This column holds the ID of
  the transaction that issued the query in each row (see the example
  query below).
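
A minimal sketch of how the new metadata might be consumed from a Go client over the PostgreSQL wire protocol. Only the `txn_id` column on the queries tables is named by this commit; the connection string, the `id` and `application_name` columns assumed on `crdb_internal.node_transactions`, and the rest of the query are illustrative assumptions rather than part of this change:

```go
package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq" // PostgreSQL-wire driver commonly used with CockroachDB
)

func main() {
	// Placeholder connection string; adjust for your own cluster.
	db, err := sql.Open("postgres",
		"postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Join active queries to their owning transactions via the new txn_id column.
	rows, err := db.Query(`
		SELECT q.query, q.txn_id, t.application_name
		FROM crdb_internal.node_queries AS q
		JOIN crdb_internal.node_transactions AS t ON t.id = q.txn_id`)
	if err != nil {
		log.Fatal(err)
	}
	defer rows.Close()

	for rows.Next() {
		var query, txnID string
		var app sql.NullString
		if err := rows.Scan(&query, &txnID, &app); err != nil {
			log.Fatal(err)
		}
		fmt.Printf("txn %s (app=%q): %s\n", txnID, app.String, query)
	}
	if err := rows.Err(); err != nil {
		log.Fatal(err)
	}
}
```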


46234: kv: immediately push on WriteIntentError when lock-table disabled r=nvanbenschoten a=nvanbenschoten

Fixes #46148.

This commit fixes a bug where follower reads that hit intents could get stuck in an indefinite loop: the read would run into the intent during evaluation, fail to add it to the lock-table because the lock-table was disabled, sequence in the concurrency manager without issue, and then repeat. The new TestClosedTimestampCanServeWithConflictingIntent test hits exactly this issue before this commit.

The fix implemented here is to immediately push the transaction responsible for an intent when serving a follower read (i.e. when a replica's lock-table is disabled). This ensures that the intent gets cleaned up if it was abandoned and avoids the busy loop we see today. If/when lockTables are maintained on follower replicas by propagating lockTable state transitions through the Raft log in the ReplicatedEvalResult instead of through the (leaseholder-only) LocalResult, we should be able to remove the lockTable "disabled" state and, in turn, remove this special case.

The alternative approach floated to address this was to simply pass a NotLeaseHolderError back to the client when an intent is hit on a follower. This would have worked to avoid the infinite loop, but it seems like a short-term patch that doesn't get to the root of the issue. As we push further on follower reads (or even consistent read replicas), we want non-leaseholders to be able to perform conflict resolution; falling back to the leaseholder works counter to this goal. The approach implemented by this commit works towards that goal, simply falling back to the previous, sub-optimal behavior of pushing immediately on conflicts.

Release note (bug fix): Follower reads that hit intents no longer have a chance of entering an infinite loop. This bug was present in earlier versions of the v20.1 release.

Release justification: fixes a high-priority bug where follower reads could get stuck indefinitely if they hit an abandoned intent.
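
To support this, the `ContentionHandler.HandleWriterIntentError` signature gains an error return (see the `concurrency_control.go` hunk below). A hedged sketch of the caller-side shape this implies, assuming the concurrency package's `Error` is an alias for `roachpb.Error`; the function name and placement are illustrative only, and the real caller is replica request-evaluation code that is not part of this excerpt:

```go
package kvserver

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// retryOnWriteIntentError is an illustrative helper, not code from this
// commit. It shows that callers must now check the error returned by
// HandleWriterIntentError before re-sequencing the request.
func retryOnWriteIntentError(
	ctx context.Context,
	m concurrency.Manager,
	g *concurrency.Guard,
	t *roachpb.WriteIntentError,
) (*concurrency.Guard, *roachpb.Error) {
	// On followers (lock-table disabled) the handler now waits on the
	// conflicting intents directly, pushing their transaction. If that wait
	// fails, the guard has already been finished and must not be reused.
	g, pErr := m.HandleWriterIntentError(ctx, g, t)
	if pErr != nil {
		return nil, pErr
	}
	// Otherwise the request is re-sequenced with the un-latched guard, as before.
	return g, nil
}
```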

46328: c-deps/rocksdb: Bump to pick up WAL sequence check r=itsbilal a=itsbilal

Picks up cockroachdb/rocksdb#78, a change that ensures sequence numbers
are in increasing order during WAL replay.

Release justification: Adds a safety check.
Release note (bug fix): Adds a check that detects out-of-order sequence numbers
in the RocksDB write-ahead log and returns an error during node startup
instead of applying the invalid log entries.
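
The fix itself is C++ in RocksDB's `db_impl_open.cc` and is not shown in this excerpt; the Go sketch below only illustrates the kind of invariant being enforced during WAL replay, with the record type, names, and exact comparison chosen for illustration:

```go
package main

import "fmt"

// walRecord stands in for a decoded write-ahead-log batch; only the sequence
// number matters for this check.
type walRecord struct {
	seq uint64
}

// replayWAL applies records in log order and fails fast when a record's
// sequence number does not advance past the previous one, rather than
// silently applying entries from a corrupted log.
func replayWAL(records []walRecord, apply func(walRecord) error) error {
	var lastSeq uint64
	for _, rec := range records {
		if rec.seq <= lastSeq {
			return fmt.Errorf("corruption: WAL sequence %d does not follow %d", rec.seq, lastSeq)
		}
		if err := apply(rec); err != nil {
			return err
		}
		lastSeq = rec.seq
	}
	return nil
}

func main() {
	records := []walRecord{{seq: 1}, {seq: 2}, {seq: 2}} // duplicate sequence number
	if err := replayWAL(records, func(walRecord) error { return nil }); err != nil {
		fmt.Println("startup aborted:", err)
		return
	}
	fmt.Println("replay ok")
}
```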

Co-authored-by: Rohan Yadav <[email protected]>
Co-authored-by: Nathan VanBenschoten <[email protected]>
Co-authored-by: Bilal Akhtar <[email protected]>
4 people committed Mar 19, 2020
4 parents 6e991cb + 3da00f0 + 4b258e5 + 747c640 commit eb8bd6b
Showing 50 changed files with 2,062 additions and 932 deletions.
2 changes: 1 addition & 1 deletion c-deps/rocksdb
Submodule rocksdb updated 1 file
+7 −0 db/db_impl_open.cc
2 changes: 2 additions & 0 deletions pkg/cli/zip.go
@@ -58,6 +58,7 @@ var debugZipTablesPerCluster = []string{
"crdb_internal.cluster_queries",
"crdb_internal.cluster_sessions",
"crdb_internal.cluster_settings",
"crdb_internal.cluster_transactions",

"crdb_internal.jobs",
"system.jobs", // get the raw, restorable jobs records too.
@@ -90,6 +91,7 @@ var debugZipTablesPerNode = []string{
"crdb_internal.node_runtime_info",
"crdb_internal.node_sessions",
"crdb_internal.node_statement_statistics",
"crdb_internal.node_transactions",
"crdb_internal.node_txn_stats",
}

9 changes: 9 additions & 0 deletions pkg/cli/zip_test.go
@@ -137,6 +137,7 @@ requesting data for debug/reports/problemranges... writing: debug/reports/proble
retrieving SQL data for crdb_internal.cluster_queries... writing: debug/crdb_internal.cluster_queries.txt
retrieving SQL data for crdb_internal.cluster_sessions... writing: debug/crdb_internal.cluster_sessions.txt
retrieving SQL data for crdb_internal.cluster_settings... writing: debug/crdb_internal.cluster_settings.txt
retrieving SQL data for crdb_internal.cluster_transactions... writing: debug/crdb_internal.cluster_transactions.txt
retrieving SQL data for crdb_internal.jobs... writing: debug/crdb_internal.jobs.txt
retrieving SQL data for system.jobs... writing: debug/system.jobs.txt
retrieving SQL data for system.descriptor... writing: debug/system.descriptor.txt
@@ -161,6 +162,7 @@ retrieving SQL data for crdb_internal.node_queries... writing: debug/nodes/1/crd
retrieving SQL data for crdb_internal.node_runtime_info... writing: debug/nodes/1/crdb_internal.node_runtime_info.txt
retrieving SQL data for crdb_internal.node_sessions... writing: debug/nodes/1/crdb_internal.node_sessions.txt
retrieving SQL data for crdb_internal.node_statement_statistics... writing: debug/nodes/1/crdb_internal.node_statement_statistics.txt
retrieving SQL data for crdb_internal.node_transactions... writing: debug/nodes/1/crdb_internal.node_transactions.txt
retrieving SQL data for crdb_internal.node_txn_stats... writing: debug/nodes/1/crdb_internal.node_txn_stats.txt
requesting data for debug/nodes/1/details... writing: debug/nodes/1/details.json
requesting data for debug/nodes/1/gossip... writing: debug/nodes/1/gossip.json
@@ -427,6 +429,7 @@ requesting data for debug/reports/problemranges... writing: debug/reports/proble
retrieving SQL data for crdb_internal.cluster_queries... writing: debug/crdb_internal.cluster_queries.txt
retrieving SQL data for crdb_internal.cluster_sessions... writing: debug/crdb_internal.cluster_sessions.txt
retrieving SQL data for crdb_internal.cluster_settings... writing: debug/crdb_internal.cluster_settings.txt
retrieving SQL data for crdb_internal.cluster_transactions... writing: debug/crdb_internal.cluster_transactions.txt
retrieving SQL data for crdb_internal.jobs... writing: debug/crdb_internal.jobs.txt
^- resulted in ...
retrieving SQL data for system.jobs... writing: debug/system.jobs.txt
@@ -463,6 +466,7 @@ retrieving SQL data for crdb_internal.node_queries... writing: debug/nodes/1/crd
retrieving SQL data for crdb_internal.node_runtime_info... writing: debug/nodes/1/crdb_internal.node_runtime_info.txt
retrieving SQL data for crdb_internal.node_sessions... writing: debug/nodes/1/crdb_internal.node_sessions.txt
retrieving SQL data for crdb_internal.node_statement_statistics... writing: debug/nodes/1/crdb_internal.node_statement_statistics.txt
retrieving SQL data for crdb_internal.node_transactions... writing: debug/nodes/1/crdb_internal.node_transactions.txt
retrieving SQL data for crdb_internal.node_txn_stats... writing: debug/nodes/1/crdb_internal.node_txn_stats.txt
requesting data for debug/nodes/1/details... writing: debug/nodes/1/details.json
requesting data for debug/nodes/1/gossip... writing: debug/nodes/1/gossip.json
@@ -545,6 +549,7 @@ requesting data for debug/reports/problemranges... writing: debug/reports/proble
retrieving SQL data for crdb_internal.cluster_queries... writing: debug/crdb_internal.cluster_queries.txt
retrieving SQL data for crdb_internal.cluster_sessions... writing: debug/crdb_internal.cluster_sessions.txt
retrieving SQL data for crdb_internal.cluster_settings... writing: debug/crdb_internal.cluster_settings.txt
retrieving SQL data for crdb_internal.cluster_transactions... writing: debug/crdb_internal.cluster_transactions.txt
retrieving SQL data for crdb_internal.jobs... writing: debug/crdb_internal.jobs.txt
retrieving SQL data for system.jobs... writing: debug/system.jobs.txt
retrieving SQL data for system.descriptor... writing: debug/system.descriptor.txt
@@ -569,6 +574,7 @@ retrieving SQL data for crdb_internal.node_queries... writing: debug/nodes/1/crd
retrieving SQL data for crdb_internal.node_runtime_info... writing: debug/nodes/1/crdb_internal.node_runtime_info.txt
retrieving SQL data for crdb_internal.node_sessions... writing: debug/nodes/1/crdb_internal.node_sessions.txt
retrieving SQL data for crdb_internal.node_statement_statistics... writing: debug/nodes/1/crdb_internal.node_statement_statistics.txt
retrieving SQL data for crdb_internal.node_transactions... writing: debug/nodes/1/crdb_internal.node_transactions.txt
retrieving SQL data for crdb_internal.node_txn_stats... writing: debug/nodes/1/crdb_internal.node_txn_stats.txt
requesting data for debug/nodes/1/details... writing: debug/nodes/1/details.json
requesting data for debug/nodes/1/gossip... writing: debug/nodes/1/gossip.json
@@ -638,6 +644,8 @@ retrieving SQL data for crdb_internal.node_sessions... writing: debug/nodes/2/cr
^- resulted in ...
retrieving SQL data for crdb_internal.node_statement_statistics... writing: debug/nodes/2/crdb_internal.node_statement_statistics.txt
^- resulted in ...
retrieving SQL data for crdb_internal.node_transactions... writing: debug/nodes/2/crdb_internal.node_transactions.txt
^- resulted in ...
retrieving SQL data for crdb_internal.node_txn_stats... writing: debug/nodes/2/crdb_internal.node_txn_stats.txt
^- resulted in ...
requesting data for debug/nodes/2/details... writing: debug/nodes/2/details.json
@@ -674,6 +682,7 @@ retrieving SQL data for crdb_internal.node_queries... writing: debug/nodes/3/crd
retrieving SQL data for crdb_internal.node_runtime_info... writing: debug/nodes/3/crdb_internal.node_runtime_info.txt
retrieving SQL data for crdb_internal.node_sessions... writing: debug/nodes/3/crdb_internal.node_sessions.txt
retrieving SQL data for crdb_internal.node_statement_statistics... writing: debug/nodes/3/crdb_internal.node_statement_statistics.txt
retrieving SQL data for crdb_internal.node_transactions... writing: debug/nodes/3/crdb_internal.node_transactions.txt
retrieving SQL data for crdb_internal.node_txn_stats... writing: debug/nodes/3/crdb_internal.node_txn_stats.txt
requesting data for debug/nodes/3/details... writing: debug/nodes/3/details.json
requesting data for debug/nodes/3/gossip... writing: debug/nodes/3/gossip.json
7 changes: 7 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender.go
@@ -856,6 +856,13 @@ func (tc *TxnCoordSender) SetDebugName(name string) {
tc.mu.txn.Name = name
}

// String is part of the client.TxnSender interface.
func (tc *TxnCoordSender) String() string {
tc.mu.Lock()
defer tc.mu.Unlock()
return tc.mu.txn.String()
}

// ReadTimestamp is part of the client.TxnSender interface.
func (tc *TxnCoordSender) ReadTimestamp() hlc.Timestamp {
tc.mu.Lock()
76 changes: 76 additions & 0 deletions pkg/kv/kvserver/closed_timestamp_test.go
@@ -15,11 +15,14 @@ import (
gosql "database/sql"
"fmt"
"math/rand"
"strconv"
"sync/atomic"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
@@ -172,6 +175,79 @@ func TestClosedTimestampCanServeThroughoutLeaseTransfer(t *testing.T) {
}
}

// TestClosedTimestampCanServeWithConflictingIntent validates that a read served
// from a follower replica will wait on conflicting intents and ensure that they
// are cleaned up if necessary to allow the read to proceed.
func TestClosedTimestampCanServeWithConflictingIntent(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
tc, _, desc, repls := setupTestClusterForClosedTimestampTesting(ctx, t, testingTargetDuration)
defer tc.Stopper().Stop(ctx)
ds := tc.Server(0).DistSenderI().(*kvcoord.DistSender)

// Write N different intents for the same transaction, where N is the number
// of replicas in the testing range. Each intent will be read and eventually
// resolved by a read on a different replica.
txnKey := desc.StartKey.AsRawKey()
txnKey = txnKey[:len(txnKey):len(txnKey)] // avoid aliasing
txn := roachpb.MakeTransaction("txn", txnKey, 0, tc.Server(0).Clock().Now(), 0)
var keys []roachpb.Key
for i := range repls {
key := append(txnKey, []byte(strconv.Itoa(i))...)
keys = append(keys, key)
put := putArgs(key, []byte("val"))
resp, err := kv.SendWrappedWith(ctx, ds, roachpb.Header{Txn: &txn}, put)
if err != nil {
t.Fatal(err)
}
txn.Update(resp.Header().Txn)
}

// Read a different intent on each replica. All should begin waiting on the
// intents by pushing the transaction that wrote them. None should complete.
ts := txn.WriteTimestamp
respCh := make(chan struct{}, len(keys))
for i, key := range keys {
go func(repl *kvserver.Replica, key roachpb.Key) {
var baRead roachpb.BatchRequest
r := &roachpb.ScanRequest{}
r.Key = key
r.EndKey = key.Next()
baRead.Add(r)
baRead.Timestamp = ts
baRead.RangeID = desc.RangeID

testutils.SucceedsSoon(t, func() error {
// Expect 0 rows, because the intents will be aborted.
_, err := expectRows(0)(repl.Send(ctx, baRead))
return err
})
respCh <- struct{}{}
}(repls[i], key)
}

select {
case <-respCh:
t.Fatal("request unexpectedly succeeded, should block")
case <-time.After(20 * time.Millisecond):
}

// Abort the transaction. All pushes should succeed and all intents should
// be resolved, allowing all reads (on the leaseholder and on followers) to
// proceed and finish.
endTxn := &roachpb.EndTxnRequest{
RequestHeader: roachpb.RequestHeader{Key: txn.Key},
Commit: false,
}
if _, err := kv.SendWrappedWith(ctx, ds, roachpb.Header{Txn: &txn}, endTxn); err != nil {
t.Fatal(err)
}
for range keys {
<-respCh
}
}

// TestClosedTimestampCanServeAfterSplitsAndMerges validates the invariant that
// if a timestamp is safe for reading on both the left side and right side of a
// a merge then it will be safe after the merge and that if a timestamp is safe
26 changes: 24 additions & 2 deletions pkg/kv/kvserver/concurrency/concurrency_control.go
@@ -196,6 +196,8 @@ type ContentionHandler interface {
// error in the lock's wait-queue (but does not wait) and releases the
// guard's latches. It returns an updated guard reflecting this change.
// After the method returns, the original guard should no longer be used.
// If an error is returned then the provided guard will be released and no
// guard will be returned.
//
// Example usage: Txn A scans the lock table and does not see an intent on
// key K from txn B because the intent is not being tracked in the lock
@@ -204,7 +206,7 @@ type ContentionHandler interface {
// method before txn A retries its scan. During the retry, txn A scans the
// lock table and observes the lock on key K, so it enters the lock's
// wait-queue and waits for it to be resolved.
HandleWriterIntentError(context.Context, *Guard, *roachpb.WriteIntentError) *Guard
HandleWriterIntentError(context.Context, *Guard, *roachpb.WriteIntentError) (*Guard, *Error)

// HandleTransactionPushError consumes a TransactionPushError thrown by a
// PushTxnRequest by informing the concurrency manager about a transaction
@@ -474,7 +476,11 @@ type lockTable interface {
//
// A latch consistent with the access desired by the guard must be held on
// the span containing the discovered lock's key.
AddDiscoveredLock(*roachpb.Intent, lockTableGuard) error
//
// The method returns a boolean indicating whether the discovered lock was
// added to the lockTable (true) or whether it was ignored because the
// lockTable is currently disabled (false).
AddDiscoveredLock(*roachpb.Intent, lockTableGuard) (bool, error)

// AcquireLock informs the lockTable that a new lock was acquired or an
// existing lock was updated.
@@ -610,6 +616,22 @@ type lockTableWaiter interface {
// wait-queues and it is safe to re-acquire latches and scan the lockTable
// again.
WaitOn(context.Context, Request, lockTableGuard) *Error

// WaitOnLock waits on the transaction responsible for the specified lock
// and then ensures that the lock is cleared out of the request's way.
//
// The method should be called after dropping any latches that a request has
// acquired. It returns when the lock has been resolved.
//
// NOTE: this method is used when the lockTable is disabled (e.g. on a
// follower replica) and a lock is discovered that must be waited on (e.g.
// during a follower read). If/when lockTables are maintained on follower
// replicas by propagating lockTable state transitions through the Raft log
// in the ReplicatedEvalResult instead of through the (leaseholder-only)
// LocalResult, we should be able to remove the lockTable "disabled" state
// and, in turn, remove this method. This will likely fall out of pulling
// all replicated locks into the lockTable.
WaitOnLock(context.Context, Request, *roachpb.Intent) *Error
}

// txnWaitQueue holds a collection of wait-queues for transaction records.
62 changes: 57 additions & 5 deletions pkg/kv/kvserver/concurrency/concurrency_manager.go
@@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
@@ -85,7 +86,6 @@ func NewManager(cfg Config) Manager {
maxLocks: cfg.MaxLockTableSize,
},
ltw: &lockTableWaiterImpl{
nodeID: cfg.NodeDesc.NodeID,
st: cfg.Settings,
stopper: cfg.Stopper,
ir: cfg.IntentResolver,
@@ -244,27 +244,47 @@ func (m *managerImpl) FinishReq(g *Guard) {
// HandleWriterIntentError implements the ContentionHandler interface.
func (m *managerImpl) HandleWriterIntentError(
ctx context.Context, g *Guard, t *roachpb.WriteIntentError,
) *Guard {
) (*Guard, *Error) {
if g.ltg == nil {
log.Fatalf(ctx, "cannot handle WriteIntentError %v for request without "+
"lockTableGuard; were lock spans declared for this request?", t)
}

// Add a discovered lock to lock-table for each intent and enter each lock's
// wait-queue.
// wait-queue. If the lock-table is disabled and one or more of the intents
// are ignored then we immediately wait on all intents.
wait := false
for i := range t.Intents {
intent := &t.Intents[i]
if err := m.lt.AddDiscoveredLock(intent, g.ltg); err != nil {
added, err := m.lt.AddDiscoveredLock(intent, g.ltg)
if err != nil {
log.Fatal(ctx, errors.HandleAsAssertionFailure(err))
}
if !added {
wait = true
}
}

// Release the Guard's latches but continue to remain in lock wait-queues by
// not releasing lockWaitQueueGuards. We expect the caller of this method to
// then re-sequence the Request by calling SequenceReq with the un-latched
// Guard. This is analogous to iterating through the loop in SequenceReq.
m.lm.Release(g.moveLatchGuard())
return g

// If the lockTable was disabled then we need to immediately wait on the
// intents to ensure that they are resolved and moved out of the request's
// way.
if wait {
for i := range t.Intents {
intent := &t.Intents[i]
if err := m.ltw.WaitOnLock(ctx, g.Req, intent); err != nil {
m.FinishReq(g)
return nil, err
}
}
}

return g, nil
}

// HandleTransactionPushError implements the ContentionHandler interface.
@@ -378,6 +398,38 @@ func (m *managerImpl) TxnWaitQueue() *txnwait.Queue {
return m.twq.(*txnwait.Queue)
}

func (r *Request) txnMeta() *enginepb.TxnMeta {
if r.Txn == nil {
return nil
}
return &r.Txn.TxnMeta
}

// readConflictTimestamp returns the maximum timestamp at which the request
// conflicts with locks acquired by other transactions. The request must wait
// for all locks acquired by other transactions at or below this timestamp
// to be released. All locks acquired by other transactions above this
// timestamp are ignored.
func (r *Request) readConflictTimestamp() hlc.Timestamp {
ts := r.Timestamp
if r.Txn != nil {
ts = r.Txn.ReadTimestamp
ts.Forward(r.Txn.MaxTimestamp)
}
return ts
}

// writeConflictTimestamp returns the minimum timestamp at which the request
// acquires locks when performing mutations. All writes performed by the
// requests must take place at or above this timestamp.
func (r *Request) writeConflictTimestamp() hlc.Timestamp {
ts := r.Timestamp
if r.Txn != nil {
ts = r.Txn.WriteTimestamp
}
return ts
}

func (r *Request) isSingle(m roachpb.Method) bool {
if len(r.Requests) != 1 {
return false