Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
105391: colinfo: add version gate for storing pg_lsn types r=rafiss a=otan

We don't want mixed version clusters storing pg_lsn types, in case they need to rollback / older versions do not understand the type.

Informs #105130

Release note: None

105444: systemschema: stop running PostDeserializationChanges when building tables r=Xiang-Gu a=rafiss

### systemschema: add JobInfo to MakeSystemTables helper

This table was missing from the list, and the expected count of tables
was off since we weren't accounting for non-system tenant tables.

The function is only used for testing, so there was no impact.

---

### systemschema: stop running PostDeserializationChanges when building tables

We would like to remove old PostDeserializationChanges that are no
longer needed. In order to do so, we need to stop relying on them to
build system tables.

Instead, now we adjust the hard-coded system table descriptors and the
related helpers so that they create valid descriptors. This needed two
changes:
- Update the ConstraintID for check constraints.
- Update the primary index encoding so that it includes stored columns.

---

Epic: None
Release note: None

105476: kv: fix data race when updating pending txn in txnStatusCache r=arulajmani a=nvanbenschoten

Fixes #105244.

This commit avoids a data race by treating *roachpb.Transaction objects as immutable, and simply choosing the right transaction to keep in the cache when there is a choice to be made.

The behavior of this logic is tested by `TestTxnCacheUpdatesTxn`.

Release note: None

105480: kv: fix data race during retry of EndTxn after refresh r=arulajmani a=nvanbenschoten

Fixes #103687.
Fixes #103247.
Fixes #104791.

This commit avoids a data race between `splitEndTxnAndRetrySend` and `raceTransport` by avoiding a mutation of a shared `RequestUnion_EndTxn` object within an unshared `RequestUnion` object. The `raceTransport` makes an effort to copy the `BatchRequest`'s `RequestUnion` slice, but it does not copy the inner interface, so we can't play tricks to avoid a reallocation of the `RequestUnion_EndTxn`.

The commit also addresses a similar problem in
`retryTxnCommitAfterFailedParallelCommit`.

We may be able to fix this in the `raceTransport`, but doing so would require some reflection magic and this is currently failing CI, so we make the easier change.

Release note: None

105515: pgwire: fix race in TestConn r=knz a=rafiss

fixes #105410

A recent refactor introduced this race, since the context is used by two testing goroutines.

Release note: None

Co-authored-by: Oliver Tan <[email protected]>
Co-authored-by: Rafi Shamim <[email protected]>
Co-authored-by: Nathan VanBenschoten <[email protected]>
  • Loading branch information
4 people committed Jun 26, 2023
6 parents 72f7db9 + ff12251 + d91fdc4 + 6017bce + e29a2bc + 87d41db commit 15d43bb
Show file tree
Hide file tree
Showing 12 changed files with 98 additions and 31 deletions.
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/txn_interceptor_committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ func (tc *txnCommitter) retryTxnCommitAfterFailedParallelCommit(
et := baSuffix.Requests[0].GetEndTxn().ShallowCopy().(*kvpb.EndTxnRequest)
et.LockSpans, _ = mergeIntoSpans(et.LockSpans, et.InFlightWrites)
et.InFlightWrites = nil
baSuffix.Requests[0].Value.(*kvpb.RequestUnion_EndTxn).EndTxn = et
baSuffix.Requests[0].MustSetInner(et)
}
brSuffix, pErr := tc.wrapped.SendLocked(ctx, baSuffix)
if pErr != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ func (sr *txnSpanRefresher) splitEndTxnAndRetrySend(
et = et.ShallowCopy().(*kvpb.EndTxnRequest)
et.LockSpans, _ = mergeIntoSpans(et.LockSpans, et.InFlightWrites)
et.InFlightWrites = nil
baSuffix.Requests[0].Value.(*kvpb.RequestUnion_EndTxn).EndTxn = et
baSuffix.Requests[0].MustSetInner(et)
}
brSuffix, pErr := sr.SendLocked(ctx, baSuffix)
if pErr != nil {
Expand Down
6 changes: 4 additions & 2 deletions pkg/kv/kvserver/concurrency/lock_table_waiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -956,8 +956,10 @@ func (c *txnCache) add(txn *roachpb.Transaction) {
c.mu.Lock()
defer c.mu.Unlock()
if idx := c.getIdxLocked(txn.ID); idx >= 0 {
if !txn.Status.IsFinalized() {
txn.Update(c.txns[idx])
if curTxn := c.txns[idx]; txn.WriteTimestamp.Less(curTxn.WriteTimestamp) {
// If the new txn has a lower write timestamp than the cached txn,
// just move the cached txn to the front of the LRU cache.
txn = curTxn
}
c.moveFrontLocked(txn, idx)
} else {
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/catalog/bootstrap/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,8 @@ func addSystemDescriptorsToSchema(target *MetadataSchema) {
// If adding a call to AddDescriptor or AddDescriptorForSystemTenant, please
// bump the value of NumSystemTablesForSystemTenant below. This constant is
// just used for testing purposes.
// Also add it to the list generated by systemschema.MakeSystemTables(), which
// is also only used for testing purposes.
}

// NumSystemTablesForSystemTenant is the number of system tables defined on
Expand Down
10 changes: 9 additions & 1 deletion pkg/sql/catalog/colinfo/col_type_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func ValidateColumnDefType(ctx context.Context, version clusterversion.Handle, t
return ValidateColumnDefType(ctx, version, t.ArrayContents())

case types.BitFamily, types.IntFamily, types.FloatFamily, types.BoolFamily, types.BytesFamily, types.DateFamily,
types.INetFamily, types.IntervalFamily, types.JsonFamily, types.OidFamily, types.PGLSNFamily, types.TimeFamily,
types.INetFamily, types.IntervalFamily, types.JsonFamily, types.OidFamily, types.TimeFamily,
types.TimestampFamily, types.TimestampTZFamily, types.UuidFamily, types.TimeTZFamily,
types.GeographyFamily, types.GeometryFamily, types.EnumFamily, types.Box2DFamily:
// These types are OK.
Expand All @@ -124,6 +124,14 @@ func ValidateColumnDefType(ctx context.Context, version clusterversion.Handle, t
"TSVector/TSQuery not supported until version 23.1")
}

case types.PGLSNFamily:
if !version.IsActive(ctx, clusterversion.V23_2) {
return pgerror.Newf(
pgcode.FeatureNotSupported,
"pg_lsn not supported until version 23.2",
)
}

default:
return pgerror.Newf(pgcode.InvalidTableDefinition,
"value type %s cannot be used for table columns", t.String())
Expand Down
68 changes: 50 additions & 18 deletions pkg/sql/catalog/systemschema/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -1098,6 +1098,31 @@ func systemTable(
// assigned before the primary index.
tbl.PrimaryIndex.ConstraintID = tbl.NextConstraintID
tbl.NextConstraintID++

// Make sure all primary indexes have the correct encoding and version,
// which also requires the stored columns to be set.
tbl.PrimaryIndex.Version = descpb.PrimaryIndexWithStoredColumnsVersion
tbl.PrimaryIndex.EncodingType = catenumpb.PrimaryIndexEncoding
storedColumnIDs := make([]descpb.ColumnID, 0, len(tbl.Columns))
storedColumnNames := make([]string, 0, len(tbl.Columns))
keyColIDs := catalog.TableColSet{}
for _, colID := range tbl.PrimaryIndex.KeyColumnIDs {
keyColIDs.Add(colID)
}
for _, col := range tbl.Columns {
if keyColIDs.Contains(col.ID) || col.Virtual {
continue
}
storedColumnIDs = append(storedColumnIDs, col.ID)
storedColumnNames = append(storedColumnNames, col.Name)
}
if len(storedColumnIDs) == 0 {
storedColumnIDs = nil
storedColumnNames = nil
}
tbl.PrimaryIndex.StoreColumnIDs = storedColumnIDs
tbl.PrimaryIndex.StoreColumnNames = storedColumnNames

return tbl
}

Expand Down Expand Up @@ -1130,12 +1155,6 @@ func makeSystemTable(
fn(&tbl)
}
b := tabledesc.NewBuilder(&tbl)
if err := b.RunPostDeserializationChanges(); err != nil {
log.Fatalf(
ctx, "system table %q cannot be registered, error during RunPostDeserializationChanges: %+v",
tbl.Name, err,
)
}
return SystemTable{
Schema: createTableStmt,
TableDescriptor: b.BuildImmutableTable(),
Expand Down Expand Up @@ -1164,6 +1183,7 @@ func MakeSystemTables() []SystemTable {
RangeEventTable,
UITable,
JobsTable,
SystemJobInfoTable,
WebSessionsTable,
TableStatisticsTable,
LocationsTable,
Expand Down Expand Up @@ -2269,10 +2289,12 @@ var (
),
func(tbl *descpb.TableDescriptor) {
tbl.Checks = []*descpb.TableDescriptor_CheckConstraint{{
Name: "check_singleton",
Expr: "singleton",
ColumnIDs: []descpb.ColumnID{1},
Name: "check_singleton",
Expr: "singleton",
ColumnIDs: []descpb.ColumnID{1},
ConstraintID: tbl.NextConstraintID,
}}
tbl.NextConstraintID++
},
)

Expand Down Expand Up @@ -2412,10 +2434,12 @@ var (
),
func(tbl *descpb.TableDescriptor) {
tbl.Checks = []*descpb.TableDescriptor_CheckConstraint{{
Name: "check_sampling_probability",
Expr: "sampling_probability BETWEEN 0.0:::FLOAT8 AND 1.0:::FLOAT8",
ColumnIDs: []descpb.ColumnID{8},
Name: "check_sampling_probability",
Expr: "sampling_probability BETWEEN 0.0:::FLOAT8 AND 1.0:::FLOAT8",
ColumnIDs: []descpb.ColumnID{8},
ConstraintID: tbl.NextConstraintID,
}}
tbl.NextConstraintID++
},
)

Expand Down Expand Up @@ -2851,7 +2875,9 @@ var (
ColumnIDs: []descpb.ColumnID{11},
IsNonNullConstraint: false,
FromHashShardedColumn: true,
ConstraintID: tbl.NextConstraintID,
}}
tbl.NextConstraintID++
},
)

Expand Down Expand Up @@ -3073,7 +3099,9 @@ var (
ColumnIDs: []descpb.ColumnID{8},
IsNonNullConstraint: false,
FromHashShardedColumn: true,
ConstraintID: tbl.NextConstraintID,
}}
tbl.NextConstraintID++
},
)

Expand Down Expand Up @@ -3606,10 +3634,12 @@ var (
),
func(tbl *descpb.TableDescriptor) {
tbl.Checks = []*descpb.TableDescriptor_CheckConstraint{{
Name: "check_bounds",
Expr: "start_key < end_key",
ColumnIDs: []descpb.ColumnID{1, 2},
Name: "check_bounds",
Expr: "start_key < end_key",
ColumnIDs: []descpb.ColumnID{1, 2},
ConstraintID: tbl.NextConstraintID,
}}
tbl.NextConstraintID++
},
)

Expand Down Expand Up @@ -3673,10 +3703,12 @@ var (
),
func(tbl *descpb.TableDescriptor) {
tbl.Checks = []*descpb.TableDescriptor_CheckConstraint{{
Name: "single_row",
Expr: "singleton",
ColumnIDs: []descpb.ColumnID{1},
Name: "single_row",
Expr: "singleton",
ColumnIDs: []descpb.ColumnID{1},
ConstraintID: tbl.NextConstraintID,
}}
tbl.NextConstraintID++
},
)

Expand Down
1 change: 1 addition & 0 deletions pkg/sql/logictest/testdata/logic_test/pg_lsn
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# LogicTest: !local-mixed-22.2-23.1
query T
SELECT 'A01F0/1AAA'::pg_lsn
----
Expand Down
10 changes: 10 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/pg_lsn_mixed
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# LogicTest: local-mixed-22.2-23.1
# TODO(otan): add tests for mixed 23.1-23.2.

query T
SELECT '1010F/AAAA'::pg_lsn
----
1010F/AAAA

statement error pg_lsn not supported until version 23.2
CREATE TABLE pg_lsn_table(id pg_lsn, val pg_lsn)

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions pkg/sql/pgwire/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ func TestConn(t *testing.T) {
log.Infof(context.Background(), "started listener on %s", serverAddr)

var g errgroup.Group
ctx := context.Background()
ctx, cancelConn := context.WithCancel(context.Background())
defer cancelConn()

var clientWG sync.WaitGroup
clientWG.Add(1)
Expand All @@ -103,8 +104,6 @@ func TestConn(t *testing.T) {
})

server := newTestServer()
ctx, cancelConn := context.WithCancel(ctx)
defer cancelConn()
// Wait for the client to connect and perform the handshake.
netConn, err := waitForClientConn(ln)
if err != nil {
Expand Down
9 changes: 6 additions & 3 deletions pkg/sql/tests/system_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,10 +191,13 @@ func TestSystemTableLiterals(t *testing.T) {
}
}

const expectedNumberOfSystemTables = bootstrap.NumSystemTablesForSystemTenant
// Add one for the system.span_count table, which is currently the only
// non-system tenant table.
const expectedNumberOfSystemTables = bootstrap.NumSystemTablesForSystemTenant + 1
require.Equal(t, expectedNumberOfSystemTables, len(testcases))

runTest := func(name string, test testcase) {
runTest := func(t *testing.T, name string, test testcase) {
t.Helper()
privs := *test.pkg.GetPrivileges()
desc := test.pkg
// Allocate an ID to dynamically allocated system tables.
Expand Down Expand Up @@ -247,7 +250,7 @@ func TestSystemTableLiterals(t *testing.T) {

for name, test := range testcases {
t.Run(name, func(t *testing.T) {
runTest(name, test)
runTest(t, name, test)
})
}
}
10 changes: 10 additions & 0 deletions pkg/workload/schemachange/operation_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1360,6 +1360,14 @@ func (og *operationGenerator) createTable(ctx context.Context, tx pgx.Tx) (*opSt
}
return false
}()
// PGLSN was added in 23.2.
pgLSNNotSupported, err := isClusterVersionLessThan(
ctx,
tx,
clusterversion.ByKey(clusterversion.V23_2))
if err != nil {
return nil, err
}
// Forward indexes for arrays were added in 23.1, so check the index
// definitions for them in mixed version states.
forwardIndexesOnArraysNotSupported, err := isClusterVersionLessThan(
Expand Down Expand Up @@ -1460,6 +1468,8 @@ func (og *operationGenerator) createTable(ctx context.Context, tx pgx.Tx) (*opSt
opStmt.potentialExecErrors.addAll(codesWithConditions{
{code: pgcode.Syntax, condition: hasUnsupportedTSQuery},
{code: pgcode.FeatureNotSupported, condition: hasUnsupportedTSQuery},
{code: pgcode.Syntax, condition: pgLSNNotSupported},
{code: pgcode.FeatureNotSupported, condition: pgLSNNotSupported},
{code: pgcode.FeatureNotSupported, condition: hasUnsupportedIdxQueries},
{code: pgcode.InvalidTableDefinition, condition: hasUnsupportedIdxQueries},
})
Expand Down

0 comments on commit 15d43bb

Please sign in to comment.