Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
111318: plpgsql: add support for FETCH and MOVE statements r=DrewKimball a=DrewKimball

#### plpgsql: implement builtin function for FETCH statements

This patch adds the undocumentd `crdb_internal.plpgsql_fetch` builtin function,
which seeks a cursor by the specified number of rows in the specified
direction, and then returns the row at the ending location (if any). It reuses
the same logic as the SQL FETCH and MOVE statements. Note that it differs from
the SQL behavior for the `ALL` variants, since it only returns one row. This is
consistent with PLpgSQL cursor behavior.

The builtin function has 4 parameters: the name of the cursor, the seek
direction (an integer representing `tree.FetchType`), the seek count
(0 if not applicable), and a tuple containing typed NULL values that represents
the expected return types for the columns. This type argument is similar to
the one for `crdb_internal.assignment_cast`, with one addition: the result
columns are padded with NULLs or truncated as necessary to fit the number
of expected types.

When the actual types returned by the cursor must be coerced to the expected
types, an explicit cast is used, but width truncation is disallowed. This is
in line with PG behavior, which allows casting a String to an Int, but does
not allow casting a string like 'abc' to a Char.

Informs cockroachdb#109709

Release note: None

#### plpgsql: add support for FETCH and MOVE statements

This patch adds support for the PLpgSQL FETCH and MOVE statements,
which seek and return rows from a cursor. This is handled by a builtin
function, `crdb_internal.plpgsql_fetch`, which calls into the same logic
that implements SQL FETCH and MOVE. Since it is possible to call `FETCH`
with `INTO` variables of different types, the `crdb_internal.plpgsql_fetch`
builtin takes an argument that supplies the expected column types as a
tuple of typed NULL values like this: `(NULL::INT, NULL::BOOL)`. The
actual types supplied by the cursor are coerced into the expected types.

Note that the current implementation does not support using dynamic
expressions in the FETCH/MOVE direction; only constant integer values.
Dynamic direction counts like `FORWARD x` are not allowed in SQL syntax,
but are allowed by PLpgSQL.

Informs cockroachdb#109709

Release note (sql change): Added support for PLpgSQL FETCH and MOVE
statements. Similar to SQL FETCH/MOVE statements, commands that would
seek the cursor backward will fail. In addition, expressions other than
constant integers are not yet supported for the `count` option.

111546: kv: bump timestamp cache when resolving replicated locks r=nvanbenschoten a=arulajmani

This patch teaches ResolveIntent and ResolveIntentRange requests to
bump the timestamp cache if any replicated shared/exclusive locks were
resolved by them (if the transaction that held the lock was committed).
In all other cases (only unreplicated locks, no shared or exclusive
locks, or aborted lock holder transaction) the timestamp cache is not
bumped.

The handling of ResolveIntentRange requests deserves some words -- for
these, we choose to bump the timestamp cache over the entire keyspan
they operated over if there's a single replicated {shared, exclusive}
lock. This means we're losing fidelity over specific keys that had point
locks on them; we choose this approach instead of trying to plumb high
fidelity information back up.

Lastly, it's worth noting that `EndTxn` requests also resolve local
locks. As such, any replicated {shared, exclusive} locks resolved by a
EndTxn request also need to be handled in similar fashion. This patch
does not do that -- we leave that to an subsequent patch, at which point
the linked issue can be closed.

Informs cockroachdb#111536

Release note: None

Co-authored-by: Drew Kimball <[email protected]>
Co-authored-by: Arul Ajmani <[email protected]>
  • Loading branch information
3 people committed Oct 5, 2023
3 parents 9a1bb22 + 20dcd51 + 7702ff8 commit 6b08842
Show file tree
Hide file tree
Showing 47 changed files with 2,164 additions and 350 deletions.
7 changes: 0 additions & 7 deletions pkg/ccl/logictestccl/tests/3node-tenant/generated_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions pkg/kv/kvpb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1622,8 +1622,8 @@ func (*QueryIntentRequest) flags() flag {
return isRead | isPrefix | updatesTSCache | updatesTSCacheOnErr
}
func (*QueryLocksRequest) flags() flag { return isRead | isRange }
func (*ResolveIntentRequest) flags() flag { return isWrite }
func (*ResolveIntentRangeRequest) flags() flag { return isWrite | isRange }
func (*ResolveIntentRequest) flags() flag { return isWrite | updatesTSCache }
func (*ResolveIntentRangeRequest) flags() flag { return isWrite | isRange | updatesTSCache }
func (*TruncateLogRequest) flags() flag { return isWrite }
func (*MergeRequest) flags() flag { return isWrite | canBackpressure }
func (*RequestLeaseRequest) flags() flag {
Expand Down
22 changes: 22 additions & 0 deletions pkg/kv/kvpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1479,6 +1479,17 @@ message ResolveIntentRequest {
// ResolveIntent() method.
message ResolveIntentResponse {
ResponseHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true];
// ReplicatedLocksReleasedCommitTimestamp, if non-empty, indicates that a
// replicated lock with strength Shared or Exclusive was released by a
// transaction who committed at this timestamp. Notably, this field is left
// unset if only a write intent was resolved. The field is also left unset for
// transactions who aborted.
//
// The caller must bump the timestamp cache across the resolution span to this
// commit timestamp. Doing so ensures that the released lock (acquired by a
// now committed transaction) continues to provide protection against other
// writers up to the commit timestamp, even after the lock has been released.
util.hlc.Timestamp replicated_locks_released_commit_timestamp = 2 [(gogoproto.nullable) = false];
}

// A ResolveIntentRangeRequest is arguments to the ResolveIntentRange() method.
Expand Down Expand Up @@ -1523,6 +1534,17 @@ message ResolveIntentRangeRequest {
// ResolveIntent() method.
message ResolveIntentRangeResponse {
ResponseHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true];
// ReplicatedLocksReleasedCommitTimestamp, if non-empty, indicates that at
// least one replicated lock with strength Shared or Exclusive was released by
// a transaction who committed at this timestamp. Notably, this field is left
// unset if only a write intent was resolved. The field is also left unset for
// transactions who aborted.
//
// The caller must bump the timestamp cache across the resolution span to this
// commit timestamp. Doing so ensures that the released lock (acquired by a
// now committed transaction) continues to provide protection against other
// writers up to the commit timestamp, even after the lock has been released.
util.hlc.Timestamp replicated_locks_released_commit_timestamp = 2 [(gogoproto.nullable) = false];
}

// A MergeRequest contains arguments to the Merge() method. It
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batch_spanset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,7 @@ func TestSpanSetMVCCResolveWriteIntentRange(t *testing.T) {
Txn: enginepb.TxnMeta{ID: uuid.MakeV4()}, // unused
Status: roachpb.PENDING,
}
if _, _, _, _, err := storage.MVCCResolveWriteIntentRange(
if _, _, _, _, _, err := storage.MVCCResolveWriteIntentRange(
ctx, batch, nil /* ms */, intent, storage.MVCCResolveWriteIntentRangeOptions{},
); err != nil {
t.Fatal(err)
Expand Down
8 changes: 5 additions & 3 deletions pkg/kv/kvserver/batcheval/cmd_end_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,7 @@ func resolveLocalLocksWithPagination(
//
// Note that the underlying pebbleIterator will still be reused
// since readWriter is a pebbleBatch in the typical case.
ok, numBytes, resumeSpan, err := storage.MVCCResolveWriteIntent(ctx, readWriter, ms, update,
ok, numBytes, resumeSpan, _, err := storage.MVCCResolveWriteIntent(ctx, readWriter, ms, update,
storage.MVCCResolveWriteIntentOptions{TargetBytes: targetBytes})
if err != nil {
return 0, 0, 0, errors.Wrapf(err, "resolving write intent at %s on end transaction [%s]", span, txn.Status)
Expand Down Expand Up @@ -630,8 +630,10 @@ func resolveLocalLocksWithPagination(
externalLocks = append(externalLocks, outSpans...)
if inSpan != nil {
update.Span = *inSpan
numKeys, numBytes, resumeSpan, resumeReason, err := storage.MVCCResolveWriteIntentRange(ctx, readWriter, ms, update,
storage.MVCCResolveWriteIntentRangeOptions{MaxKeys: maxKeys, TargetBytes: targetBytes})
numKeys, numBytes, resumeSpan, resumeReason, _, err :=
storage.MVCCResolveWriteIntentRange(ctx, readWriter, ms, update,
storage.MVCCResolveWriteIntentRangeOptions{MaxKeys: maxKeys, TargetBytes: targetBytes},
)
if err != nil {
return 0, 0, 0, errors.Wrapf(err, "resolving write intent range at %s on end transaction [%s]", span, txn.Status)
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_refresh_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,9 @@ func TestRefreshRangeTimeBoundIterator(t *testing.T) {
// would not have any timestamp bounds and would be selected for every read.
intent := roachpb.MakeLockUpdate(txn, roachpb.Span{Key: k})
intent.Status = roachpb.COMMITTED
if _, _, _, err := storage.MVCCResolveWriteIntent(ctx, db, nil, intent, storage.MVCCResolveWriteIntentOptions{}); err != nil {
if _, _, _, _, err := storage.MVCCResolveWriteIntent(
ctx, db, nil, intent, storage.MVCCResolveWriteIntentOptions{},
); err != nil {
t.Fatal(err)
}
if err := storage.MVCCPut(ctx, db, roachpb.Key("unused2"), ts1, v, storage.MVCCWriteOptions{}); err != nil {
Expand Down Expand Up @@ -278,7 +280,7 @@ func TestRefreshRangeError(t *testing.T) {
if resolveIntent {
intent := roachpb.MakeLockUpdate(txn, roachpb.Span{Key: k})
intent.Status = roachpb.COMMITTED
if _, _, _, err := storage.MVCCResolveWriteIntent(ctx, db, nil, intent, storage.MVCCResolveWriteIntentOptions{}); err != nil {
if _, _, _, _, err := storage.MVCCResolveWriteIntent(ctx, db, nil, intent, storage.MVCCResolveWriteIntentOptions{}); err != nil {
t.Fatal(err)
}
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_refresh_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ func TestRefreshError(t *testing.T) {
if resolveIntent {
intent := roachpb.MakeLockUpdate(txn, roachpb.Span{Key: k})
intent.Status = roachpb.COMMITTED
if _, _, _, err := storage.MVCCResolveWriteIntent(ctx, db, nil, intent, storage.MVCCResolveWriteIntentOptions{}); err != nil {
if _, _, _, _, err := storage.MVCCResolveWriteIntent(
ctx, db, nil, intent, storage.MVCCResolveWriteIntentOptions{},
); err != nil {
t.Fatal(err)
}
}
Expand Down
15 changes: 13 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_resolve_intent.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ func ResolveIntent(
// The observation was from the wrong node. Ignore.
update.ClockWhilePending = roachpb.ObservedTimestamp{}
}
ok, numBytes, resumeSpan, err := storage.MVCCResolveWriteIntent(ctx, readWriter, ms, update,
storage.MVCCResolveWriteIntentOptions{TargetBytes: h.TargetBytes})
ok, numBytes, resumeSpan, replLocksReleased, err := storage.MVCCResolveWriteIntent(
ctx, readWriter, ms, update, storage.MVCCResolveWriteIntentOptions{TargetBytes: h.TargetBytes})
if err != nil {
return result.Result{}, err
}
Expand All @@ -114,6 +114,17 @@ func ResolveIntent(
res.Local.ResolvedLocks = []roachpb.LockUpdate{update}
res.Local.Metrics = resolveToMetricType(args.Status, args.Poison)

// Handle replicated lock releases.
if replLocksReleased && update.Status == roachpb.COMMITTED {
// A replicated {shared, exclusive} lock was released for a committed
// transaction. Now that the lock is no longer there, we still need to make
// sure other transactions can't write underneath the transaction's commit
// timestamp to the key. We return the transaction's commit timestamp on the
// response and update the timestamp cache a few layers above to ensure
// this.
reply.ReplicatedLocksReleasedCommitTimestamp = update.Txn.WriteTimestamp
}

if WriteAbortSpanOnResolve(args.Status, args.Poison, ok) {
if err := UpdateAbortSpan(ctx, cArgs.EvalCtx, readWriter, ms, args.IntentTxn, args.Poison); err != nil {
return result.Result{}, err
Expand Down
22 changes: 20 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_resolve_intent_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,10 @@ func ResolveIntentRange(
// The observation was from the wrong node. Ignore.
update.ClockWhilePending = roachpb.ObservedTimestamp{}
}
numKeys, numBytes, resumeSpan, resumeReason, err := storage.MVCCResolveWriteIntentRange(ctx, readWriter, ms, update,
storage.MVCCResolveWriteIntentRangeOptions{MaxKeys: h.MaxSpanRequestKeys, TargetBytes: h.TargetBytes})
numKeys, numBytes, resumeSpan, resumeReason, replLocksReleased, err :=
storage.MVCCResolveWriteIntentRange(ctx, readWriter, ms, update,
storage.MVCCResolveWriteIntentRangeOptions{MaxKeys: h.MaxSpanRequestKeys, TargetBytes: h.TargetBytes},
)
if err != nil {
return result.Result{}, err
}
Expand All @@ -77,6 +79,22 @@ func ResolveIntentRange(
res.Local.ResolvedLocks = []roachpb.LockUpdate{update}
res.Local.Metrics = resolveToMetricType(args.Status, args.Poison)

// Handle replicated lock releases.
if replLocksReleased && update.Status == roachpb.COMMITTED {
// A replicated {shared, exclusive} lock was released for a committed
// transaction. Now that the lock is no longer there, we still need to make
// sure other transactions can't write underneath the transaction's commit
// timestamp to the key. We return the transaction's commit timestamp on the
// response and update the timestamp cache a few layers above to ensure
// this.
//
// NB: Doing so will update the timestamp cache over the entire key span the
// request operated over -- we're losing fidelity about which key(s) had
// locks. We could do a better job tracking and plumbing this information
// up, but we choose not to.
reply.ReplicatedLocksReleasedCommitTimestamp = update.Txn.WriteTimestamp
}

if WriteAbortSpanOnResolve(args.Status, args.Poison, numKeys > 0) {
if err := UpdateAbortSpan(ctx, cArgs.EvalCtx, readWriter, ms, args.IntentTxn, args.Poison); err != nil {
return result.Result{}, err
Expand Down
4 changes: 3 additions & 1 deletion pkg/kv/kvserver/gc/gc_random_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,9 @@ func TestNewVsInvariants(t *testing.T) {
Txn: i.Txn,
Status: roachpb.ABORTED,
}
_, _, _, err := storage.MVCCResolveWriteIntent(ctx, eng, &stats, l, storage.MVCCResolveWriteIntentOptions{})
_, _, _, _, err := storage.MVCCResolveWriteIntent(
ctx, eng, &stats, l, storage.MVCCResolveWriteIntentOptions{},
)
require.NoError(t, err, "failed to resolve intent")
}
for _, cr := range gcer.clearRanges() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/loqrecovery/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ func applyReplicaUpdate(
Txn: res.Intent.Txn,
Status: roachpb.ABORTED,
}
if _, _, _, err := storage.MVCCResolveWriteIntent(ctx, readWriter, &ms, update, storage.MVCCResolveWriteIntentOptions{}); err != nil {
if _, _, _, _, err := storage.MVCCResolveWriteIntent(ctx, readWriter, &ms, update, storage.MVCCResolveWriteIntentOptions{}); err != nil {
return PrepareReplicaReport{}, err
}
report.AbortedTransaction = true
Expand Down
Loading

0 comments on commit 6b08842

Please sign in to comment.