Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql,kv: add preliminary SQL savepoints support #45566

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/cli/interactive_tests/test_txn_prompt.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ eexpect root@
send "SELECT crdb_internal.force_retry('1s':::INTERVAL);\r"
eexpect "ERROR: restart transaction"
eexpect root@
eexpect "RETRY>"
eexpect "ERROR>"
end_test

start_test "Test that prompt reverts to OPEN at beginning of new attempt."
Expand Down
4 changes: 1 addition & 3 deletions pkg/cli/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -740,14 +740,12 @@ func (c *cliState) refreshTransactionStatus() {

// Change the prompt based on the response from the server.
switch txnString {
case sql.NoTxnStr:
case sql.NoTxnStateStr:
c.lastKnownTxnStatus = ""
case sql.AbortedStateStr:
c.lastKnownTxnStatus = " ERROR"
case sql.CommitWaitStateStr:
c.lastKnownTxnStatus = " DONE"
case sql.RestartWaitStateStr:
c.lastKnownTxnStatus = " RETRY"
case sql.OpenStateStr:
// The state AutoRetry is reported by the server as Open, so no need to
// handle it here.
Expand Down
56 changes: 0 additions & 56 deletions pkg/cmd/roachtest/pgjdbc_blacklist.go

Large diffs are not rendered by default.

9 changes: 7 additions & 2 deletions pkg/internal/client/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,8 +304,13 @@ const (

// SavepointToken represents a savepoint.
type SavepointToken interface {
// SavepointToken is a marker interface.
SavepointToken()
// Initial returns true if this savepoint has been created before performing
// any KV operations. If so, it is possible to rollback to it after a
// retriable error. If not, then rolling back to it after a retriable error
// will return the retriable error again because reads might have been
// evaluated before the savepoint and such reads cannot have their timestamp
// forwarded without a refresh.
Initial() bool
}

// TxnStatusOpt represents options for TxnSender.GetMeta().
Expand Down
36 changes: 28 additions & 8 deletions pkg/kv/testdata/savepoints
Original file line number Diff line number Diff line change
Expand Up @@ -320,10 +320,6 @@ commit
subtest end

subtest rollback_across_retry

# TODO(knz): change this test when rolling back across retries becomes
# supported.

begin
----
0 <noignore>
Expand All @@ -339,11 +335,11 @@ epoch: 0 -> 1

release x
----
(*withstack.withStack) cannot release savepoint across transaction retries
0 <noignore>

rollback x
----
(*withstack.withStack) cannot rollback savepoint across transaction retries
0 <noignore>

subtest end

Expand All @@ -364,11 +360,35 @@ txn id changed

release x
----
(*withstack.withStack) cannot release savepoint across transaction retries
0 <noignore>

rollback x
----
(*withstack.withStack) cannot rollback savepoint across transaction retries
0 <noignore>

subtest end

subtest rollback_across_retry_fails_for_non-initial_savepoint
# The difference from the previous test is that here we do a write before
# creating the savepoint.
begin
----
0 <noignore>

put k a
----

savepoint x
----
1 <noignore>

retry
----
synthetic error: TransactionRetryWithProtoRefreshError: forced retry
epoch: 0 -> 1

rollback x
----
(*roachpb.TransactionRetryWithProtoRefreshError) TransactionRetryWithProtoRefreshError: cannot rollback to savepoint after a transaction restart

subtest end
15 changes: 13 additions & 2 deletions pkg/kv/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,9 @@ type TxnCoordSender struct {
// clients on Send().
storedErr *roachpb.Error

// active is set whenever the transaction has sent any requests.
// active is set whenever the transaction has sent any requests. Rolling
// back to a savepoint taken before the TxnCoordSender became active resets
// the field to false.
active bool

// closed is set once this transaction has either committed or rolled back
Expand Down Expand Up @@ -176,6 +178,14 @@ type txnInterceptor interface {
// increment.
epochBumpedLocked()

// createSavepointLocked is used to populate a savepoint with all the state
// that needs to be restored on a rollback.
createSavepointLocked(context.Context, *savepoint)

// rollbackToSavepointLocked is used to restore the state previously saved by
// createSavepointLocked().
rollbackToSavepointLocked(context.Context, savepoint)

// closeLocked closes the interceptor. It is called when the TxnCoordSender
// shuts down due to either a txn commit or a txn abort. The method will
// be called exactly once from cleanupTxnLocked.
Expand Down Expand Up @@ -943,7 +953,8 @@ func (tc *TxnCoordSender) IsTracking() bool {
return tc.interceptorAlloc.txnHeartbeater.heartbeatLoopRunningLocked()
}

// Active returns true iff there were commands executed already.
// Active returns true if requests were sent already. Rolling back to a
// savepoint taken before any requests were sent resets this to false.
func (tc *TxnCoordSender) Active() bool {
tc.mu.Lock()
defer tc.mu.Unlock()
Expand Down
171 changes: 101 additions & 70 deletions pkg/kv/txn_coord_sender_savepoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,56 +14,48 @@ import (
"context"

"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
)

// savepointToken captures the state in the TxnCoordSender necessary
// to restore that state upon a savepoint rollback.
//
// TODO(knz,andrei): Currently this definition is only sufficient for
// just a few cases of rollbacks. This should be extended to cover
// more ground:
//
// - We also need the current size of txnSpanRefresher.refreshSpans the
// list of tracked reads, such that upon rollback we tell the
// refresher interceptor to discard further reads.
// - We also need something about in-flight writes
// (txnPipeliner.ifWrites). There I guess we need to take some sort of
// snapshot of the current in-flight writes and, on rollback, discard
// in-flight writes that are not part of the savepoint. But, on
// rollback, I don't think we should (nor am I sure that we could)
// simply overwrite the set of in-flight writes with the ones from the
// savepoint because writes that have been verified since the snapshot
// has been taken should continue to be verified. Basically, on
// rollback I think we need to intersect the savepoint with the
// current set of in-flight writes.
type savepointToken struct {
// seqNum is currently the only field that helps to "restore"
// anything upon a rollback. When used, it does not change anything
// in the TCS; instead it simply configures the txn to ignore all
// seqnums from this value until the most recent seqnum emitted by
// the TCS.
seqNum enginepb.TxnSeq

// txnID is used to verify that a rollback is not used to paper
// over a txn abort error.
// savepoint captures the state in the TxnCoordSender necessary to restore that
// state upon a savepoint rollback.
type savepoint struct {
// active is a snapshot of TxnCoordSender.active.
active bool

// txnID and epoch are set for savepoints with the active field set.
// txnID and epoch are used to disallow rollbacks past transaction restarts.
// Savepoints without the active field set are allowed to be used to rollback
// past transaction restarts too, because it's trivial to rollback to the
// beginning of the transaction.
txnID uuid.UUID
// epoch is used to verify that a savepoint rollback is not
// used to paper over a retry error.
// TODO(knz,andrei): expand savepoint rollbacks to recover
// from retry errors.
// TODO(knz,andrei): remove the epoch mechanism entirely in
// favor of seqnums and savepoint rollbacks.
epoch enginepb.TxnEpoch

// seqNum represents the write seq num at the time the savepoint was created.
// On rollback, it configures the txn to ignore all seqnums from this value
// until the most recent seqnum.
seqNum enginepb.TxnSeq

// txnSpanRefresher fields.
refreshSpans []roachpb.Span
refreshInvalid bool
refreshSpanBytes int64
}

var _ client.SavepointToken = (*savepointToken)(nil)
var _ client.SavepointToken = (*savepoint)(nil)

// SavepointToken implements the client.SavepointToken interface.
func (s *savepointToken) SavepointToken() {}
// statically allocated savepoint marking the beginning of a transaction. Used
// to avoid allocations for such savepoints.
var initialSavepoint = savepoint{}

// Initial implements the client.SavepointToken interface.
func (s *savepoint) Initial() bool {
return !s.active
}

// CreateSavepoint is part of the client.TxnSender interface.
func (tc *TxnCoordSender) CreateSavepoint(ctx context.Context) (client.SavepointToken, error) {
Expand All @@ -82,11 +74,22 @@ func (tc *TxnCoordSender) CreateSavepoint(ctx context.Context) (client.Savepoint
return nil, ErrSavepointOperationInErrorTxn
}

return &savepointToken{
if !tc.mu.active {
// Return a preallocated savepoint for the common case of savepoints placed
// at the beginning of transactions.
return &initialSavepoint, nil
}

s := &savepoint{
active: true, // we've handled the not-active case above
txnID: tc.mu.txn.ID,
epoch: tc.mu.txn.Epoch,
seqNum: tc.interceptorAlloc.txnSeqNumAllocator.writeSeq,
}, nil
}
for _, reqInt := range tc.interceptorStack {
reqInt.createSavepointLocked(ctx, s)
}

return s, nil
}

// RollbackToSavepoint is part of the client.TxnSender interface.
Expand All @@ -102,25 +105,41 @@ func (tc *TxnCoordSender) RollbackToSavepoint(ctx context.Context, s client.Save
return err
}

st, err := tc.checkSavepointLocked(s, "rollback")
sp := s.(*savepoint)
err := tc.checkSavepointLocked(sp)
if err != nil {
if err == errSavepointInvalidAfterTxnRestart {
err = roachpb.NewTransactionRetryWithProtoRefreshError(
"cannot rollback to savepoint after a transaction restart",
tc.mu.txn.ID,
// The transaction inside this error doesn't matter.
roachpb.Transaction{},
)
}
return err
}

// TODO(knz): handle recoverable errors.
if tc.mu.txnState == txnError {
return unimplemented.New("rollback_error", "savepoint rollback after error")
if tc.mu.txnState == txnFinalized {
return unimplemented.New("rollback_error", "savepoint rollback finalized txn")
}

if st.seqNum == tc.interceptorAlloc.txnSeqNumAllocator.writeSeq {
// No operations since savepoint was taken. No-op.
return nil
// Restore the transaction's state, in case we're rewiding after an error.
tc.mu.txnState = txnPending

tc.mu.active = sp.active

for _, reqInt := range tc.interceptorStack {
reqInt.rollbackToSavepointLocked(ctx, *sp)
}

tc.mu.txn.AddIgnoredSeqNumRange(
enginepb.IgnoredSeqNumRange{
Start: st.seqNum + 1, End: tc.interceptorAlloc.txnSeqNumAllocator.writeSeq,
})
// If there's been any more writes since the savepoint was created, they'll
// need to be ignored.
if sp.seqNum < tc.interceptorAlloc.txnSeqNumAllocator.writeSeq {
tc.mu.txn.AddIgnoredSeqNumRange(
enginepb.IgnoredSeqNumRange{
Start: sp.seqNum + 1, End: tc.interceptorAlloc.txnSeqNumAllocator.writeSeq,
})
}

return nil
}
Expand All @@ -138,7 +157,16 @@ func (tc *TxnCoordSender) ReleaseSavepoint(ctx context.Context, s client.Savepoi
return ErrSavepointOperationInErrorTxn
}

_, err := tc.checkSavepointLocked(s, "release")
sp := s.(*savepoint)
err := tc.checkSavepointLocked(sp)
if err == errSavepointInvalidAfterTxnRestart {
err = roachpb.NewTransactionRetryWithProtoRefreshError(
"cannot release savepoint after a transaction restart",
tc.mu.txn.ID,
// The transaction inside this error doesn't matter.
roachpb.Transaction{},
)
}
return err
}

Expand All @@ -159,26 +187,29 @@ func (tc *TxnCoordSender) assertNotFinalized() error {
return nil
}

func (tc *TxnCoordSender) checkSavepointLocked(
s client.SavepointToken, opName string,
) (*savepointToken, error) {
st, ok := s.(*savepointToken)
if !ok {
return nil, errors.AssertionFailedf("expected savepointToken, got %T", s)
}
var errSavepointInvalidAfterTxnRestart = errors.New("savepoint invalid after transaction restart")

if st.txnID != tc.mu.txn.ID {
return nil, errors.Newf("cannot %s savepoint across transaction retries", opName)
// checkSavepointLocked checks whether the provided savepoint is still valid.
// Returns errSavepointInvalidAfterTxnRestart if the savepoint is not an
// "initial" one and the transaction has restarted since the savepoint was
// created.
func (tc *TxnCoordSender) checkSavepointLocked(s *savepoint) error {
// Only savepoints taken before any activity are allowed to be used after a
// transaction restart.
if s.Initial() {
return nil
}

if st.epoch != tc.mu.txn.Epoch {
return nil, errors.Newf("cannot %s savepoint across transaction retries", opName)
if s.txnID != tc.mu.txn.ID {
return errSavepointInvalidAfterTxnRestart
}
if s.epoch != tc.mu.txn.Epoch {
return errSavepointInvalidAfterTxnRestart
}

if st.seqNum < 0 || st.seqNum > tc.interceptorAlloc.txnSeqNumAllocator.writeSeq {
return nil, errors.AssertionFailedf("invalid savepoint: got %d, expected 0-%d",
st.seqNum, tc.interceptorAlloc.txnSeqNumAllocator.writeSeq)
if s.seqNum < 0 || s.seqNum > tc.interceptorAlloc.txnSeqNumAllocator.writeSeq {
return errors.AssertionFailedf("invalid savepoint: got %d, expected 0-%d",
s.seqNum, tc.interceptorAlloc.txnSeqNumAllocator.writeSeq)
}

return st, nil
return nil
}
6 changes: 6 additions & 0 deletions pkg/kv/txn_interceptor_committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,12 @@ func (*txnCommitter) importLeafFinalState(*roachpb.LeafTxnFinalState) {}
// epochBumpedLocked implements the txnReqInterceptor interface.
func (tc *txnCommitter) epochBumpedLocked() {}

// createSavepointLocked is part of the txnReqInterceptor interface.
func (*txnCommitter) createSavepointLocked(context.Context, *savepoint) {}

// rollbackToSavepointLocked is part of the txnReqInterceptor interface.
func (*txnCommitter) rollbackToSavepointLocked(context.Context, savepoint) {}

// closeLocked implements the txnReqInterceptor interface.
func (tc *txnCommitter) closeLocked() {}

Expand Down
6 changes: 6 additions & 0 deletions pkg/kv/txn_interceptor_heartbeater.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,12 @@ func (*txnHeartbeater) importLeafFinalState(*roachpb.LeafTxnFinalState) {}
// epochBumpedLocked is part of the txnInterceptor interface.
func (h *txnHeartbeater) epochBumpedLocked() {}

// createSavepointLocked is part of the txnReqInterceptor interface.
func (*txnHeartbeater) createSavepointLocked(context.Context, *savepoint) {}

// rollbackToSavepointLocked is part of the txnReqInterceptor interface.
func (*txnHeartbeater) rollbackToSavepointLocked(context.Context, savepoint) {}

// closeLocked is part of the txnInterceptor interface.
func (h *txnHeartbeater) closeLocked() {
h.cancelHeartbeatLoopLocked()
Expand Down
Loading