Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
101796: kv: don't preemptively refresh under weak isolation levels r=arulajmani a=nvanbenschoten

Informs #100131.

This commit disables preemptive transaction refreshes under weak isolation levels. These transactions can tolerate write skew, so they can commit even if their write timestamp has been bumped. Transactions run at weak isolation levels may refresh in response to WriteTooOld errors or ReadWithinUncertaintyInterval errors returned by requests, but they do not need to refresh preemptively ahead of an EndTxn request.

Release note: None

101979: sql: fix the pretty-printing of CREATE EXTENSION r=otan a=knz

Fixes #101978.

Release note: None
Epic: None

Co-authored-by: Nathan VanBenschoten <[email protected]>
Co-authored-by: Raphael 'kena' Poss <[email protected]>
  • Loading branch information
3 people committed Apr 24, 2023
3 parents 315e1a3 + 33fda25 + f43bd38 commit cfa9ede
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 8 deletions.
9 changes: 9 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,15 @@ func (sr *txnSpanRefresher) maybeRefreshPreemptively(
return ba, nil
}

// If the transaction can tolerate write skew, no preemptive refresh is
// necessary, even if its write timestamp has been bumped. Transactions run at
// weak isolation levels may refresh in response to WriteTooOld errors or
// ReadWithinUncertaintyInterval errors returned by requests, but they do not
// need to refresh preemptively ahead of an EndTxn request.
if ba.Txn.IsoLevel.ToleratesWriteSkew() {
return ba, nil
}

// If true, tryRefreshTxnSpans will trivially succeed.
refreshFree := ba.CanForwardReadTimestamp

Expand Down
73 changes: 73 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"testing"

"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/testutils"
Expand Down Expand Up @@ -539,6 +540,78 @@ func TestTxnSpanRefresherPreemptiveRefresh(t *testing.T) {
require.False(t, tsr.refreshInvalid)
}

// TestTxnSpanRefresherPreemptiveRefreshIsoLevel tests that the txnSpanRefresher
// only performed preemptive client-side refreshes of Serializable transactions.
func TestTxnSpanRefresherPreemptiveRefreshIsoLevel(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

tests := []struct {
isoLevel isolation.Level
expRefresh bool
}{
{isolation.Serializable, true},
{isolation.Snapshot, false},
{isolation.ReadCommitted, false},
}
for _, tt := range tests {
t.Run(tt.isoLevel.String(), func(t *testing.T) {
ctx := context.Background()
tsr, mockSender := makeMockTxnSpanRefresher()

txn := makeTxnProto()
txn.IsoLevel = tt.isoLevel

// Push the txn.
txn.WriteTimestamp = txn.WriteTimestamp.Add(1, 0)
origReadTs := txn.ReadTimestamp
pushedWriteTs := txn.WriteTimestamp

// Send an EndTxn request.
ba := &kvpb.BatchRequest{}
ba.Header = kvpb.Header{Txn: &txn}
etArgs := kvpb.EndTxnRequest{Commit: true}
ba.Add(&etArgs)

mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
require.Len(t, ba.Requests, 1)
require.True(t, ba.CanForwardReadTimestamp)
require.IsType(t, &kvpb.EndTxnRequest{}, ba.Requests[0].GetInner())

if tt.expRefresh {
// The transaction should be refreshed.
require.NotEqual(t, origReadTs, ba.Txn.ReadTimestamp)
require.Equal(t, pushedWriteTs, ba.Txn.ReadTimestamp)
require.Equal(t, pushedWriteTs, ba.Txn.WriteTimestamp)
} else {
// The transaction should not be refreshed.
require.Equal(t, origReadTs, ba.Txn.ReadTimestamp)
require.NotEqual(t, pushedWriteTs, ba.Txn.ReadTimestamp)
require.Equal(t, pushedWriteTs, ba.Txn.WriteTimestamp)
}

br := ba.CreateReply()
br.Txn = ba.Txn
return br, nil
})

br, pErr := tsr.SendLocked(ctx, ba)
require.Nil(t, pErr)
require.NotNil(t, br)

expRefreshSuccess := int64(0)
if tt.expRefresh {
expRefreshSuccess = 1
}
require.Equal(t, expRefreshSuccess, tsr.refreshSuccess.Count())
require.Equal(t, int64(0), tsr.refreshFail.Count())
require.Equal(t, int64(0), tsr.refreshAutoRetries.Count())
require.True(t, tsr.refreshFootprint.empty())
require.False(t, tsr.refreshInvalid)
})
}
}

// TestTxnSpanRefresherSplitEndTxnOnAutoRetry tests that EndTxn requests are
// split into their own sub-batch on auto-retries after a successful refresh.
// This is done to avoid starvation.
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/create_extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (n *createExtensionNode) unimplementedExtensionError(issue int) error {
name := n.CreateExtension.Name
return unimplemented.NewWithIssueDetailf(
issue,
"CREATE EXTENSION "+name,
"CREATE EXTENSION "+string(name),
"extension %q is not yet supported",
name,
)
Expand All @@ -48,7 +48,7 @@ func (n *createExtensionNode) startExec(params runParams) error {
"fuzzystrmatch",
"pgcrypto",
"uuid-ossp":
telemetry.Inc(sqltelemetry.CreateExtensionCounter(n.CreateExtension.Name))
telemetry.Inc(sqltelemetry.CreateExtensionCounter(string(n.CreateExtension.Name)))
return nil
case "postgis_raster",
"postgis_topology",
Expand Down Expand Up @@ -105,7 +105,7 @@ func (n *createExtensionNode) startExec(params runParams) error {
"xml2":
return n.unimplementedExtensionError(54516)
}
return pgerror.Newf(pgcode.UndefinedParameter, "unknown extension: %q", n.CreateExtension.Name)
return pgerror.Newf(pgcode.UndefinedParameter, "unknown extension: %s", n.CreateExtension.Name)
}

func (n *createExtensionNode) Next(params runParams) (bool, error) { return false, nil }
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/parser/sql.y
Original file line number Diff line number Diff line change
Expand Up @@ -4490,10 +4490,10 @@ create_schedule_stmt:
create_extension_stmt:
CREATE EXTENSION IF NOT EXISTS name
{
$$.val = &tree.CreateExtension{IfNotExists: true, Name: $6}
$$.val = &tree.CreateExtension{IfNotExists: true, Name: tree.Name($6)}
}
| CREATE EXTENSION name {
$$.val = &tree.CreateExtension{Name: $3}
$$.val = &tree.CreateExtension{Name: tree.Name($3)}
}
| CREATE EXTENSION IF NOT EXISTS name WITH error
{
Expand Down
8 changes: 8 additions & 0 deletions pkg/sql/parser/testdata/create_misc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@ CREATE EXTENSION bob -- fully parenthesized
CREATE EXTENSION bob -- literals removed
CREATE EXTENSION bob -- identifiers removed

parse
CREATE EXTENSION "a-b"
----
CREATE EXTENSION "a-b"
CREATE EXTENSION "a-b" -- fully parenthesized
CREATE EXTENSION "a-b" -- literals removed
CREATE EXTENSION "a-b" -- identifiers removed

parse
CREATE EXTENSION IF NOT EXISTS bob
----
Expand Down
6 changes: 4 additions & 2 deletions pkg/sql/sem/tree/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -2132,7 +2132,7 @@ func (o *CreateStatsOptions) CombineWith(other *CreateStatsOptions) error {

// CreateExtension represents a CREATE EXTENSION statement.
type CreateExtension struct {
Name string
Name Name
IfNotExists bool
}

Expand All @@ -2147,7 +2147,9 @@ func (node *CreateExtension) Format(ctx *FmtCtx) {
// do not contain sensitive information and
// 2) we want to get telemetry on which extensions
// users attempt to load.
ctx.WriteString(node.Name)
ctx.WithFlags(ctx.flags&^FmtAnonymize, func() {
ctx.FormatNode(&node.Name)
})
}

// CreateExternalConnection represents a CREATE EXTERNAL CONNECTION statement.
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/testdata/telemetry/extension
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,4 @@ unimplemented.#54516.CREATE EXTENSION xml2
feature-usage
CREATE EXTENSION asdf
----
error: pq: unknown extension: "asdf"
error: pq: unknown extension: asdf

0 comments on commit cfa9ede

Please sign in to comment.