diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 147fcc57443e..51f2a5ddfb59 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -132,6 +132,6 @@ trace.debug.enablebooleanfalseif set, traces for recent requests can be seen in the /debug page trace.lightstep.tokenstringif set, traces go to Lightstep using this token trace.zipkin.collectorstringif set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'); ignored if trace.lightstep.token is set -versioncustom validation19.1-9set the active cluster version in the format '.' +versioncustom validation19.1-10set the active cluster version in the format '.' diff --git a/pkg/ccl/backupccl/backup.go b/pkg/ccl/backupccl/backup.go index 519cb53a73dc..d2fb4dfdfb23 100644 --- a/pkg/ccl/backupccl/backup.go +++ b/pkg/ccl/backupccl/backup.go @@ -178,7 +178,7 @@ func getRelevantDescChanges( // obviously interesting to our backup. for _, i := range descs { interestingIDs[i.GetID()] = struct{}{} - if t := i.GetTable(); t != nil { + if t := i.Table(hlc.Timestamp{}); t != nil { for j := t.ReplacementOf.ID; j != sqlbase.InvalidID; j = priorIDs[j] { interestingIDs[j] = struct{}{} } @@ -200,7 +200,7 @@ func getRelevantDescChanges( return nil, err } for _, i := range starting { - if table := i.GetTable(); table != nil { + if table := i.Table(hlc.Timestamp{}); table != nil { // We need to add to interestingIDs so that if we later see a delete for // this ID we still know it is interesting to us, even though we will not // have a parentID at that point (since the delete is a nil desc). 
@@ -231,7 +231,7 @@ func getRelevantDescChanges( if _, ok := interestingIDs[change.ID]; ok { interestingChanges = append(interestingChanges, change) } else if change.Desc != nil { - if table := change.Desc.GetTable(); table != nil { + if table := change.Desc.Table(hlc.Timestamp{}); table != nil { if _, ok := interestingParents[table.ParentID]; ok { interestingIDs[table.ID] = struct{}{} interestingChanges = append(interestingChanges, change) @@ -279,7 +279,8 @@ func getAllDescChanges( return nil, err } r.Desc = &desc - if t := desc.GetTable(); t != nil && t.ReplacementOf.ID != sqlbase.InvalidID { + t := desc.Table(rev.Timestamp) + if t != nil && t.ReplacementOf.ID != sqlbase.InvalidID { priorIDs[t.ID] = t.ReplacementOf.ID } } @@ -303,6 +304,9 @@ func allSQLDescriptors(ctx context.Context, txn *client.Txn) ([]sqlbase.Descript return nil, errors.NewAssertionErrorWithWrappedErrf(err, "%s: unable to unmarshal SQL descriptor", row.Key) } + if row.Value != nil { + sqlDescs[i].Table(row.Value.Timestamp) + } } return sqlDescs, nil } @@ -379,7 +383,7 @@ func spansForAllTableIndexes( // in them that we didn't already get above e.g. indexes or tables that are // not in latest because they were dropped during the time window in question. 
for _, rev := range revs { - if tbl := rev.Desc.GetTable(); tbl != nil { + if tbl := rev.Desc.Table(hlc.Timestamp{}); tbl != nil { for _, idx := range tbl.AllNonDropIndexes() { key := tableAndIndex{tableID: tbl.ID, indexID: idx.ID} if !added[key] { @@ -1016,7 +1020,7 @@ func backupPlanHook( return err } } - if tableDesc := desc.GetTable(); tableDesc != nil { + if tableDesc := desc.Table(hlc.Timestamp{}); tableDesc != nil { if err := p.CheckPrivilege(ctx, tableDesc, privilege.SELECT); err != nil { return err } @@ -1083,7 +1087,7 @@ func backupPlanHook( tablesInPrev := make(map[sqlbase.ID]struct{}) dbsInPrev := make(map[sqlbase.ID]struct{}) for _, d := range prevBackups[len(prevBackups)-1].Descriptors { - if t := d.GetTable(); t != nil { + if t := d.Table(hlc.Timestamp{}); t != nil { tablesInPrev[t.ID] = struct{}{} } } @@ -1092,7 +1096,7 @@ func backupPlanHook( } for _, d := range targetDescs { - if t := d.GetTable(); t != nil { + if t := d.Table(hlc.Timestamp{}); t != nil { // If we're trying to use a previous backup for this table, ideally it // actually contains this table. if _, ok := tablesInPrev[t.ID]; ok { @@ -1504,7 +1508,7 @@ func maybeDowngradeTableDescsInBackupDescriptor( // Copy Descriptors so we can return a shallow copy without mutating the slice. copy(backupDescCopy.Descriptors, backupDesc.Descriptors) for i := range backupDesc.Descriptors { - if tableDesc := backupDesc.Descriptors[i].GetTable(); tableDesc != nil { + if tableDesc := backupDesc.Descriptors[i].Table(hlc.Timestamp{}); tableDesc != nil { downgraded, newDesc, err := tableDesc.MaybeDowngradeForeignKeyRepresentation(ctx, settings) if err != nil { return nil, err @@ -1534,7 +1538,7 @@ func maybeUpgradeTableDescsInBackupDescriptors( // descriptors so that they can be looked up. 
for _, backupDesc := range backupDescs { for _, desc := range backupDesc.Descriptors { - if table := desc.GetTable(); table != nil { + if table := desc.Table(hlc.Timestamp{}); table != nil { protoGetter.Protos[string(sqlbase.MakeDescMetadataKey(table.ID))] = sqlbase.WrapDescriptor(protoutil.Clone(table).(*sqlbase.TableDescriptor)) } @@ -1544,7 +1548,7 @@ func maybeUpgradeTableDescsInBackupDescriptors( for i := range backupDescs { backupDesc := &backupDescs[i] for j := range backupDesc.Descriptors { - if table := backupDesc.Descriptors[j].GetTable(); table != nil { + if table := backupDesc.Descriptors[j].Table(hlc.Timestamp{}); table != nil { if _, err := table.MaybeUpgradeForeignKeyRepresentation(ctx, protoGetter, skipFKsWithNoMatchingTable); err != nil { return err } diff --git a/pkg/ccl/backupccl/restore.go b/pkg/ccl/backupccl/restore.go index bdfdf83c4ab7..d8eb402af50a 100644 --- a/pkg/ccl/backupccl/restore.go +++ b/pkg/ccl/backupccl/restore.go @@ -184,7 +184,7 @@ func loadSQLDescsFromBackupsAtTime( allDescs := make([]sqlbase.Descriptor, 0, len(byID)) for _, desc := range byID { - if t := desc.GetTable(); t != nil { + if t := desc.Table(hlc.Timestamp{}); t != nil { // A table revisions may have been captured before it was in a DB that is // backed up -- if the DB is missing, filter the table. 
if byID[t.ParentID] == nil { @@ -212,7 +212,7 @@ func selectTargets( seenTable := false for _, desc := range matched.descs { - if desc.GetTable() != nil { + if desc.Table(hlc.Timestamp{}) != nil { seenTable = true break } @@ -1534,7 +1534,7 @@ func doRestorePlan( for _, desc := range sqlDescs { if dbDesc := desc.GetDatabase(); dbDesc != nil { databasesByID[dbDesc.ID] = dbDesc - } else if tableDesc := desc.GetTable(); tableDesc != nil { + } else if tableDesc := desc.Table(hlc.Timestamp{}); tableDesc != nil { tablesByID[tableDesc.ID] = tableDesc } } @@ -1686,7 +1686,7 @@ func createImportingTables( var tables []*sqlbase.TableDescriptor var oldTableIDs []sqlbase.ID for _, desc := range sqlDescs { - if tableDesc := desc.GetTable(); tableDesc != nil { + if tableDesc := desc.Table(hlc.Timestamp{}); tableDesc != nil { tables = append(tables, tableDesc) oldTableIDs = append(oldTableIDs, tableDesc.ID) } diff --git a/pkg/ccl/backupccl/show.go b/pkg/ccl/backupccl/show.go index 771f680df6e0..a91cffd256d4 100644 --- a/pkg/ccl/backupccl/show.go +++ b/pkg/ccl/backupccl/show.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/encoding" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" ) @@ -145,7 +146,7 @@ func backupShowerDefault(ctx context.Context, showSchemas bool) backupShower { var rows []tree.Datums var row tree.Datums for _, descriptor := range desc.Descriptors { - if table := descriptor.GetTable(); table != nil { + if table := descriptor.Table(hlc.Timestamp{}); table != nil { dbName := descs[table.ParentID] row = tree.Datums{ tree.NewDString(dbName), diff --git a/pkg/ccl/backupccl/targets.go b/pkg/ccl/backupccl/targets.go index 4ad651a12289..dfae8d2321ce 100644 --- a/pkg/ccl/backupccl/targets.go +++ b/pkg/ccl/backupccl/targets.go @@ -14,6 +14,7 @@ 
import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/pkg/errors" ) @@ -121,7 +122,7 @@ func newDescriptorResolver(descs []sqlbase.Descriptor) (*descriptorResolver, err } // Now on to the tables. for _, desc := range descs { - if tbDesc := desc.GetTable(); tbDesc != nil { + if tbDesc := desc.Table(hlc.Timestamp{}); tbDesc != nil { if tbDesc.Dropped() { continue } @@ -214,7 +215,7 @@ func descriptorsMatchingTargets( desc := descI.(sqlbase.Descriptor) // If the parent database is not requested already, request it now - parentID := desc.GetTable().GetParentID() + parentID := desc.Table(hlc.Timestamp{}).GetParentID() if _, ok := alreadyRequestedDBs[parentID]; !ok { parentDesc := resolver.descByID[parentID] ret.descs = append(ret.descs, parentDesc) diff --git a/pkg/ccl/backupccl/targets_test.go b/pkg/ccl/backupccl/targets_test.go index 7a69d832f0d5..c86676699fcf 100644 --- a/pkg/ccl/backupccl/targets_test.go +++ b/pkg/ccl/backupccl/targets_test.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" ) @@ -34,6 +35,10 @@ func TestDescriptorsMatchingTargets(t *testing.T) { *sqlbase.WrapDescriptor(&sqlbase.DatabaseDescriptor{ID: 3, Name: "data"}), *sqlbase.WrapDescriptor(&sqlbase.DatabaseDescriptor{ID: 5, Name: "empty"}), } + // Set the timestamp on the table descriptors. 
+ for _, d := range descriptors { + d.Table(hlc.Timestamp{WallTime: 1}) + } tests := []struct { sessionDatabase string diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index 7f2e8b0debef..af6736fde112 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -200,7 +200,7 @@ func changefeedPlanHook( } targets := make(jobspb.ChangefeedTargets, len(targetDescs)) for _, desc := range targetDescs { - if tableDesc := desc.GetTable(); tableDesc != nil { + if tableDesc := desc.Table(hlc.Timestamp{}); tableDesc != nil { targets[tableDesc.ID] = jobspb.ChangefeedTarget{ StatementTimeName: tableDesc.Name, } diff --git a/pkg/ccl/changefeedccl/table_history.go b/pkg/ccl/changefeedccl/table_history.go index 13f048a9f188..342dd140fd5b 100644 --- a/pkg/ccl/changefeedccl/table_history.go +++ b/pkg/ccl/changefeedccl/table_history.go @@ -260,7 +260,8 @@ func fetchTableDescriptorVersions( } else if !ok { return nil } - remaining, _, _, err := sqlbase.DecodeTableIDIndexID(it.UnsafeKey().Key) + k := it.UnsafeKey() + remaining, _, _, err := sqlbase.DecodeTableIDIndexID(k.Key) if err != nil { return err } @@ -282,7 +283,7 @@ func fetchTableDescriptorVersions( if err := value.GetProto(&desc); err != nil { return err } - if tableDesc := desc.GetTable(); tableDesc != nil { + if tableDesc := desc.Table(k.Timestamp); tableDesc != nil { tableDescs = append(tableDescs, tableDesc) } } diff --git a/pkg/ccl/cliccl/load.go b/pkg/ccl/cliccl/load.go index d7a44afc5bb0..bf1d98fd15ee 100644 --- a/pkg/ccl/cliccl/load.go +++ b/pkg/ccl/cliccl/load.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/storageccl" "github.com/cockroachdb/cockroach/pkg/cli" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/pkg/errors" @@ 
-96,7 +97,7 @@ func runLoadShow(cmd *cobra.Command, args []string) error { // in case more fields need to be added to the output. fmt.Printf("Descriptors:\n") for _, d := range desc.Descriptors { - if desc := d.GetTable(); desc != nil { + if desc := d.Table(hlc.Timestamp{}); desc != nil { fmt.Printf(" %d: %s (table)\n", d.GetID(), d.GetName()) } if desc := d.GetDatabase(); desc != nil { diff --git a/pkg/ccl/storageccl/bench_test.go b/pkg/ccl/storageccl/bench_test.go index 0d0bdf28455e..85e465c50db6 100644 --- a/pkg/ccl/storageccl/bench_test.go +++ b/pkg/ccl/storageccl/bench_test.go @@ -25,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/engine" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/workload/bank" ) @@ -173,7 +174,7 @@ func BenchmarkImport(b *testing.B) { { // TODO(dan): The following should probably make it into // dataccl.Backup somehow. 
- tableDesc := backup.Desc.Descriptors[len(backup.Desc.Descriptors)-1].GetTable() + tableDesc := backup.Desc.Descriptors[len(backup.Desc.Descriptors)-1].Table(hlc.Timestamp{}) if tableDesc == nil || tableDesc.ParentID == keys.SystemDatabaseID { b.Fatalf("bad table descriptor: %+v", tableDesc) } diff --git a/pkg/ccl/storageccl/key_rewriter.go b/pkg/ccl/storageccl/key_rewriter.go index 9f7bd947d658..8df90829ab74 100644 --- a/pkg/ccl/storageccl/key_rewriter.go +++ b/pkg/ccl/storageccl/key_rewriter.go @@ -14,6 +14,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/util/encoding" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/pkg/errors" ) @@ -63,7 +64,7 @@ func MakeKeyRewriterFromRekeys(rekeys []roachpb.ImportRequest_TableRekey) (*KeyR if err := protoutil.Unmarshal(rekey.NewDesc, &desc); err != nil { return nil, errors.Wrapf(err, "unmarshalling rekey descriptor for old table id %d", rekey.OldID) } - table := desc.GetTable() + table := desc.Table(hlc.Timestamp{}) if table == nil { return nil, errors.New("expected a table descriptor") } diff --git a/pkg/ccl/storageccl/key_rewriter_test.go b/pkg/ccl/storageccl/key_rewriter_test.go index 9c45346dc219..b7385dc62f36 100644 --- a/pkg/ccl/storageccl/key_rewriter_test.go +++ b/pkg/ccl/storageccl/key_rewriter_test.go @@ -15,6 +15,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/util/encoding" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/protoutil" ) @@ -140,8 +141,11 @@ func TestKeyRewriter(t *testing.T) { }) } -func mustMarshalDesc(t *testing.T, desc *sqlbase.TableDescriptor) []byte { - bytes, err := protoutil.Marshal(sqlbase.WrapDescriptor(desc)) +func 
mustMarshalDesc(t *testing.T, tableDesc *sqlbase.TableDescriptor) []byte { + desc := sqlbase.WrapDescriptor(tableDesc) + // Set the timestamp to a non-zero value. + desc.Table(hlc.Timestamp{WallTime: 1}) + bytes, err := protoutil.Marshal(desc) if err != nil { t.Fatal(err) } diff --git a/pkg/ccl/utilccl/sampledataccl/bankdata.go b/pkg/ccl/utilccl/sampledataccl/bankdata.go index c8b20247bf93..acd5e8af621d 100644 --- a/pkg/ccl/utilccl/sampledataccl/bankdata.go +++ b/pkg/ccl/utilccl/sampledataccl/bankdata.go @@ -96,7 +96,7 @@ func (b *Backup) NextKeyValues( ) ([]engine.MVCCKeyValue, roachpb.Span, error) { var userTables []*sqlbase.TableDescriptor for _, d := range b.Desc.Descriptors { - if t := d.GetTable(); t != nil && t.ParentID != keys.SystemDatabaseID { + if t := d.Table(hlc.Timestamp{}); t != nil && t.ParentID != keys.SystemDatabaseID { userTables = append(userTables, t) } } diff --git a/pkg/internal/client/db.go b/pkg/internal/client/db.go index 304058859a4c..94e6aaa00432 100644 --- a/pkg/internal/client/db.go +++ b/pkg/internal/client/db.go @@ -28,10 +28,15 @@ import ( ) // KeyValue represents a single key/value pair. This is similar to -// roachpb.KeyValue except that the value may be nil. +// roachpb.KeyValue except that the value may be nil. The timestamp +// in the value will be populated with the MVCC timestamp at which this +// value was read if this struct was produced by a GetRequest or +// ScanRequest which uses the KEY_VALUES ScanFormat. Values created from +// a ScanRequest which uses the BATCH_RESPONSE ScanFormat will contain a +// zero Timestamp. type KeyValue struct { Key roachpb.Key - Value *roachpb.Value // Timestamp will always be zero + Value *roachpb.Value } func (kv *KeyValue) String() string { @@ -319,11 +324,28 @@ func (db *DB) Get(ctx context.Context, key interface{}) (KeyValue, error) { // // key can be either a byte slice or a string. 
func (db *DB) GetProto(ctx context.Context, key interface{}, msg protoutil.Message) error { + _, err := db.GetProtoTs(ctx, key, msg) + return err +} + +// GetProtoTs retrieves the value for a key and decodes the result as a proto +// message. It additionally returns the timestamp at which the key was read. +// If the key doesn't exist, the proto will simply be reset and a zero timestamp +// will be returned. A zero timestamp will also be returned if unmarshaling +// fails. +// +// key can be either a byte slice or a string. +func (db *DB) GetProtoTs( + ctx context.Context, key interface{}, msg protoutil.Message, +) (hlc.Timestamp, error) { r, err := db.Get(ctx, key) if err != nil { - return err + return hlc.Timestamp{}, err + } + if err := r.ValueProto(msg); err != nil || r.Value == nil { + return hlc.Timestamp{}, err } - return r.ValueProto(msg) + return r.Value.Timestamp, nil } // Put sets the value for a key. diff --git a/pkg/internal/client/txn.go b/pkg/internal/client/txn.go index 7c35b3cf91b4..6892fa67556a 100644 --- a/pkg/internal/client/txn.go +++ b/pkg/internal/client/txn.go @@ -263,14 +263,6 @@ func (txn *Txn) CommitTimestamp() hlc.Timestamp { return txn.mu.sender.CommitTimestamp() } -// CommitTimestampFixed returns true if the commit timestamp has -// been fixed to the start timestamp and cannot be pushed forward. -func (txn *Txn) CommitTimestampFixed() bool { - txn.mu.Lock() - defer txn.mu.Unlock() - return txn.mu.sender.CommitTimestampFixed() -} - // SetSystemConfigTrigger sets the system db trigger to true on this transaction. // This will impact the EndTransactionRequest. func (txn *Txn) SetSystemConfigTrigger() error { @@ -319,11 +311,28 @@ func (txn *Txn) Get(ctx context.Context, key interface{}) (KeyValue, error) { // // key can be either a byte slice or a string. 
func (txn *Txn) GetProto(ctx context.Context, key interface{}, msg protoutil.Message) error { + _, err := txn.GetProtoTs(ctx, key, msg) + return err +} + +// GetProtoTs retrieves the value for a key and decodes the result as a proto +// message. It additionally returns the timestamp at which the key was read. +// If the key doesn't exist, the proto will simply be reset and a zero timestamp +// will be returned. A zero timestamp will also be returned if unmarshaling +// fails. +// +// key can be either a byte slice or a string. +func (txn *Txn) GetProtoTs( + ctx context.Context, key interface{}, msg protoutil.Message, +) (hlc.Timestamp, error) { r, err := txn.Get(ctx, key) if err != nil { - return err + return hlc.Timestamp{}, err + } + if err := r.ValueProto(msg); err != nil || r.Value == nil { + return hlc.Timestamp{}, err } - return r.ValueProto(msg) + return r.Value.Timestamp, nil } // Put sets the value for a key diff --git a/pkg/server/updates.go b/pkg/server/updates.go index 8f45f8034bc4..df3f12262d43 100644 --- a/pkg/server/updates.go +++ b/pkg/server/updates.go @@ -497,7 +497,7 @@ func (s *Server) collectSchemaInfo(ctx context.Context) ([]sqlbase.TableDescript if err := kv.ValueProto(&desc); err != nil { return nil, errors.Wrapf(err, "%s: unable to unmarshal SQL descriptor", kv.Key) } - if t := desc.GetTable(); t != nil && t.ID > keys.MaxReservedDescID { + if t := desc.Table(kv.Value.Timestamp); t != nil && t.ID > keys.MaxReservedDescID { if err := reflectwalk.Walk(t, redactor); err != nil { panic(err) // stringRedactor never returns a non-nil err } diff --git a/pkg/settings/cluster/cockroach_versions.go b/pkg/settings/cluster/cockroach_versions.go index 7284c3fa1685..ba578fec8fa0 100644 --- a/pkg/settings/cluster/cockroach_versions.go +++ b/pkg/settings/cluster/cockroach_versions.go @@ -44,6 +44,7 @@ const ( VersionTopLevelForeignKeys VersionAtomicChangeReplicasTrigger VersionAtomicChangeReplicas + VersionTableDescModificationTimeFromMVCC // Add new 
versions here (step one of two). @@ -532,6 +533,18 @@ var versionsSingleton = keyedVersions([]keyedVersion{ Key: VersionAtomicChangeReplicas, Version: roachpb.Version{Major: 19, Minor: 1, Unstable: 9}, }, + { + // VersionTableDescModificationTimeFromMVCC is https://github.com/cockroachdb/cockroach/pull/40581 + // + // It represents an upgrade to the table descriptor format in which + // CreateAsOfTime and ModifiedTime are set to zero when new versions of + // table descriptors are written. This removes the need to fix the commit + // timestamp for transactions which update table descriptors. The value + // is then populated by the reading client with the MVCC timestamp of the + // row which contained the serialized table descriptor. + Key: VersionTableDescModificationTimeFromMVCC, + Version: roachpb.Version{Major: 19, Minor: 1, Unstable: 10}, + }, // Add new versions here (step two of two). diff --git a/pkg/settings/cluster/versionkey_string.go b/pkg/settings/cluster/versionkey_string.go index 03adc824bd13..110801e12816 100644 --- a/pkg/settings/cluster/versionkey_string.go +++ b/pkg/settings/cluster/versionkey_string.go @@ -21,11 +21,12 @@ func _() { _ = x[VersionTopLevelForeignKeys-10] _ = x[VersionAtomicChangeReplicasTrigger-11] _ = x[VersionAtomicChangeReplicas-12] + _ = x[VersionTableDescModificationTimeFromMVCC-13] } -const _VersionKey_name = "Version2_1VersionUnreplicatedRaftTruncatedStateVersionSideloadedStorageNoReplicaIDVersion19_1VersionStart19_2VersionQueryTxnTimestampVersionStickyBitVersionParallelCommitsVersionGenerationComparableVersionLearnerReplicasVersionTopLevelForeignKeysVersionAtomicChangeReplicasTriggerVersionAtomicChangeReplicas" +const _VersionKey_name = 
"Version2_1VersionUnreplicatedRaftTruncatedStateVersionSideloadedStorageNoReplicaIDVersion19_1VersionStart19_2VersionQueryTxnTimestampVersionStickyBitVersionParallelCommitsVersionGenerationComparableVersionLearnerReplicasVersionTopLevelForeignKeysVersionAtomicChangeReplicasTriggerVersionAtomicChangeReplicasVersionTableDescModificationTimeFromMVCC" -var _VersionKey_index = [...]uint16{0, 10, 47, 82, 93, 109, 133, 149, 171, 198, 220, 246, 280, 307} +var _VersionKey_index = [...]uint16{0, 10, 47, 82, 93, 109, 133, 149, 171, 198, 220, 246, 280, 307, 347} func (i VersionKey) String() string { if i < 0 || i >= VersionKey(len(_VersionKey_index)-1) { diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index d4b371499b58..5189b943698d 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -468,6 +468,10 @@ func (ex *connExecutor) execStmtInOpenState( // leases will only go down over time: no new conflicting leases can be // created as of the time of this call because v-2 can't be leased once // v-1 exists. +// +// If this method succeeds it is the caller's responsibility to release the +// executor's table leases after the txn commits so that schema changes can +// proceed. func (ex *connExecutor) checkTableTwoVersionInvariant(ctx context.Context) error { tables := ex.extraTxnState.tables.getTablesWithNewVersion() if tables == nil { @@ -477,40 +481,36 @@ func (ex *connExecutor) checkTableTwoVersionInvariant(ctx context.Context) error if txn.IsCommitted() { panic("transaction has already committed") } - if !txn.CommitTimestampFixed() { - panic("commit timestamp was not fixed") - } - // Release leases here for two reasons: - // 1. If there are existing leases at version V-2 for a descriptor - // being modified to version V being held the wait loop below that - // waits on a cluster wide release of old version leases will hang - // until these leases expire. - // 2. 
Once this transaction commits, the schema changers run and - // increment the version of the modified descriptors. If one of the - // descriptors being modified has a lease being held the schema - // changers will stall until the leases expire. - // - // The above two cases can be satified by releasing leases for both - // cases explicitly, but we prefer to call it here and kill two birds - // with one stone. + // We potentially hold leases for tables which we've modified which + // we need to drop. Say we're updating tables at version V. All leases + // for version V-2 need to be dropped immediately, otherwise the check + // below that nobody holds leases for version V-2 will fail. Worse yet, + // the code below loops waiting for nobody to hold leases on V-2. We also + // may hold leases for version V-1 of modified tables that are good to drop + // but not as vital for correctness. It's good to drop them because as soon + // as this transaction commits jobs may start and will need to wait until + // the lease expires. It is safe because V-1 must remain valid until this + // transaction commits; if we commit then nobody else could have written + // a new V beneath us because we've already laid down an intent. // - // It is safe to release leases even though the transaction hasn't yet - // committed only because the transaction timestamp has been fixed using - // CommitTimestamp(). - // - // releaseLeases can fail to release a lease if the server is shutting - // down. This is okay because it will result in the two cases mentioned - // above simply hanging until the expiration time for the leases. - ex.extraTxnState.tables.releaseLeases(ctx) - - count, err := CountLeases(ctx, ex.server.cfg.InternalExecutor, tables, txn.OrigTimestamp()) + // All this being said, we must retain our leases on tables which we have + // not modified to ensure that our writes to those other tables in this + // transaction remain valid. 
+ ex.extraTxnState.tables.releaseTableLeases(ctx, tables) + + // We know that so long as there are no leases on the updated tables as of + // the current provisional commit timestamp for this transaction then if this + // transaction ends up committing then there won't have been any created + // in the meantime. + count, err := CountLeases(ctx, ex.server.cfg.InternalExecutor, tables, txn.Serialize().Timestamp) if err != nil { return err } if count == 0 { return nil } + // Restart the transaction so that it is able to replay itself at a newer timestamp // with the hope that the next time around there will be leases only at the current // version. @@ -533,6 +533,9 @@ func (ex *connExecutor) checkTableTwoVersionInvariant(ctx context.Context) error // We cleanup the transaction and create a new transaction wait time // might be extensive and so we'd better get rid of all the intents. txn.CleanupOnError(ctx, retryErr) + // Release the rest of our leases on unmodified tables so we don't hold up + // schema changes there and potentially create a deadlock. + ex.extraTxnState.tables.releaseLeases(ctx) // Wait until all older version leases have been released or expired. for r := retry.StartWithCtx(ctx, base.DefaultRetryOptions()); r.Next(); { @@ -580,6 +583,13 @@ func (ex *connExecutor) commitSQLTransaction( return ex.makeErrEvent(err, stmt) } + // Now that we've committed, if we modified any table we need to make sure + // to release the leases for them so that the schema change can proceed and + // we don't block the client. 
+ if tables := ex.extraTxnState.tables.getTablesWithNewVersion(); tables != nil { + ex.extraTxnState.tables.releaseLeases(ctx) + } + if !isRelease { return eventTxnFinish{}, eventTxnFinishPayload{commit: true} } diff --git a/pkg/sql/create_sequence.go b/pkg/sql/create_sequence.go index a6e11884eccb..833528d944b6 100644 --- a/pkg/sql/create_sequence.go +++ b/pkg/sql/create_sequence.go @@ -77,7 +77,7 @@ func doCreateSequence( privs := dbDesc.GetPrivileges() desc, err := MakeSequenceTableDesc(name.Table(), opts, - dbDesc.ID, id, params.p.txn.CommitTimestamp(), privs, params.EvalContext().Settings) + dbDesc.ID, id, params.creationTimeForNewTableDescriptor(), privs, params.EvalContext().Settings) if err != nil { return err } diff --git a/pkg/sql/create_table.go b/pkg/sql/create_table.go index 8a396c5f6508..f3b79f4a0795 100644 --- a/pkg/sql/create_table.go +++ b/pkg/sql/create_table.go @@ -164,7 +164,7 @@ func (n *createTableNode) startExec(params runParams) error { var asCols sqlbase.ResultColumns var desc sqlbase.MutableTableDescriptor var affected map[sqlbase.ID]*sqlbase.MutableTableDescriptor - creationTime := params.p.txn.CommitTimestamp() + creationTime := params.creationTimeForNewTableDescriptor() if n.n.As() { asCols = planColumns(n.sourcePlan) if !n.run.fromHeuristicPlanner && !n.n.AsHasUserSpecifiedPrimaryKey() { @@ -1078,7 +1078,6 @@ func MakeTableDesc( evalCtx *tree.EvalContext, ) (sqlbase.MutableTableDescriptor, error) { desc := InitTableDescriptor(id, parentID, n.Table.Table(), creationTime, privileges) - for _, def := range n.Defs { if d, ok := def.(*tree.ColumnTableDef); ok { if !desc.IsVirtualTable() { diff --git a/pkg/sql/create_view.go b/pkg/sql/create_view.go index 5a7ce35d8f6e..48451b29f44e 100644 --- a/pkg/sql/create_view.go +++ b/pkg/sql/create_view.go @@ -135,7 +135,7 @@ func (n *createViewNode) startExec(params runParams) error { n.dbDesc.ID, id, n.columns, - params.p.txn.CommitTimestamp(), + params.creationTimeForNewTableDescriptor(), privs, 
&params.p.semaCtx, ) diff --git a/pkg/sql/descriptor.go b/pkg/sql/descriptor.go index 6c887614acba..dde054269400 100644 --- a/pkg/sql/descriptor.go +++ b/pkg/sql/descriptor.go @@ -165,13 +165,13 @@ func getDescriptorByID( log.Eventf(ctx, "fetching descriptor with ID %d", id) descKey := sqlbase.MakeDescMetadataKey(id) desc := &sqlbase.Descriptor{} - if err := txn.GetProto(ctx, descKey, desc); err != nil { + ts, err := txn.GetProtoTs(ctx, descKey, desc) + if err != nil { return err } - switch t := descriptor.(type) { case *sqlbase.TableDescriptor: - table := desc.GetTable() + table := desc.Table(ts) if table == nil { return pgerror.Newf(pgcode.WrongObjectType, "%q is not a table", desc.String()) @@ -216,7 +216,7 @@ func GetAllDescriptors(ctx context.Context, txn *client.Txn) ([]sqlbase.Descript } switch t := desc.Union.(type) { case *sqlbase.Descriptor_Table: - table := desc.GetTable() + table := desc.Table(kv.Value.Timestamp) if err := table.MaybeFillInDescriptor(ctx, txn); err != nil { return nil, err } diff --git a/pkg/sql/drop_test.go b/pkg/sql/drop_test.go index 0277b07f4303..2f1aaac7fe48 100644 --- a/pkg/sql/drop_test.go +++ b/pkg/sql/drop_test.go @@ -157,10 +157,11 @@ INSERT INTO t.kv VALUES ('c', 'e'), ('a', 'c'), ('b', 'd'); t.Fatalf(`table "kv" does not exist`) } tbDescKey := sqlbase.MakeDescMetadataKey(sqlbase.ID(gr.ValueInt())) - if err := kvDB.GetProto(ctx, tbDescKey, desc); err != nil { + ts, err := kvDB.GetProtoTs(ctx, tbDescKey, desc) + if err != nil { t.Fatal(err) } - tbDesc := desc.GetTable() + tbDesc := desc.Table(ts) // Add a zone config for both the table and database. 
cfg := config.DefaultZoneConfig() @@ -330,10 +331,11 @@ INSERT INTO t.kv2 VALUES ('c', 'd'), ('a', 'b'), ('e', 'a'); t.Fatalf(`table "kv" does not exist`) } tbDescKey := sqlbase.MakeDescMetadataKey(sqlbase.ID(gr.ValueInt())) - if err := kvDB.GetProto(ctx, tbDescKey, desc); err != nil { + ts, err := kvDB.GetProtoTs(ctx, tbDescKey, desc) + if err != nil { t.Fatal(err) } - tbDesc := desc.GetTable() + tbDesc := desc.Table(ts) tb2NameKey := sqlbase.MakeNameMetadataKey(dbDesc.ID, "kv2") gr2, err := kvDB.Get(ctx, tb2NameKey) @@ -344,10 +346,11 @@ INSERT INTO t.kv2 VALUES ('c', 'd'), ('a', 'b'), ('e', 'a'); t.Fatalf(`table "kv2" does not exist`) } tb2DescKey := sqlbase.MakeDescMetadataKey(sqlbase.ID(gr2.ValueInt())) - if err := kvDB.GetProto(ctx, tb2DescKey, desc); err != nil { + ts, err = kvDB.GetProtoTs(ctx, tb2DescKey, desc) + if err != nil { t.Fatal(err) } - tb2Desc := desc.GetTable() + tb2Desc := desc.Table(ts) tableSpan := tbDesc.TableSpan() table2Span := tb2Desc.TableSpan() diff --git a/pkg/sql/lease.go b/pkg/sql/lease.go index 4ff3c47e52d9..8e0f8aa8cfa5 100644 --- a/pkg/sql/lease.go +++ b/pkg/sql/lease.go @@ -306,10 +306,11 @@ func (s LeaseStore) WaitForOneVersion( // Get the current version of the table descriptor non-transactionally. // // TODO(pmattis): Do an inconsistent read here? 
- if err := s.db.GetProto(ctx, descKey, desc); err != nil { + ts, err := s.db.GetProtoTs(ctx, descKey, desc) + if err != nil { return 0, err } - tableDesc = desc.GetTable() + tableDesc = desc.Table(ts) if tableDesc == nil { return 0, errors.Errorf("ID %d is not a table", tableID) } @@ -404,7 +405,7 @@ func (s LeaseStore) PublishMultiple( descsToUpdate[id].Version, versions[id]) } - if err := descsToUpdate[id].MaybeIncrementVersion(ctx, txn); err != nil { + if err := descsToUpdate[id].MaybeIncrementVersion(ctx, txn, s.settings); err != nil { return err } if err := descsToUpdate[id].ValidateTable(); err != nil { @@ -553,10 +554,11 @@ func (s LeaseStore) getForExpiration( prevTimestamp := expiration.Prev() txn.SetFixedTimestamp(ctx, prevTimestamp) var desc sqlbase.Descriptor - if err := txn.GetProto(ctx, descKey, &desc); err != nil { + ts, err := txn.GetProtoTs(ctx, descKey, &desc) + if err != nil { return err } - tableDesc := desc.GetTable() + tableDesc := desc.Table(ts) if tableDesc == nil { return sqlbase.ErrDescriptorNotFound } diff --git a/pkg/sql/lease_test.go b/pkg/sql/lease_test.go index 84dbf6dd8711..31e9fb4fd89a 100644 --- a/pkg/sql/lease_test.go +++ b/pkg/sql/lease_test.go @@ -575,7 +575,7 @@ func isDeleted(tableID sqlbase.ID, cfg *config.SystemConfig) bool { if err := val.GetProto(&descriptor); err != nil { panic("unable to unmarshal table descriptor") } - table := descriptor.GetTable() + table := descriptor.Table(val.Timestamp) return table.Dropped() } @@ -1646,11 +1646,12 @@ CREATE TABLE t.test0 (k CHAR PRIMARY KEY, v CHAR); // Look up the descriptor. descKey := sqlbase.MakeDescMetadataKey(descID) dbDesc := &sqlbase.Descriptor{} - if err := txn.GetProto(ctx, descKey, dbDesc); err != nil { + ts, err := txn.GetProtoTs(ctx, descKey, dbDesc) + if err != nil { t.Fatalf("error while reading proto: %v", err) } // Look at the descriptor that comes back from the database. 
- dbTable := dbDesc.GetTable() + dbTable := dbDesc.Table(ts) if dbTable.Version != table.Version || dbTable.ModificationTime != table.ModificationTime { t.Fatalf("db has version %d at ts %s, expected version %d at ts %s", diff --git a/pkg/sql/logictest/testdata/logic_test/crdb_internal b/pkg/sql/logictest/testdata/logic_test/crdb_internal index a2d80b452460..74a429fa9d25 100644 --- a/pkg/sql/logictest/testdata/logic_test/crdb_internal +++ b/pkg/sql/logictest/testdata/logic_test/crdb_internal @@ -78,11 +78,12 @@ SELECT * FROM crdb_internal.schema_changes ---- table_id parent_id name type target_id target_name state direction -query IITTITRTTTTTTT colnames -SELECT * FROM crdb_internal.tables WHERE NAME = 'namespace' +# We don't select the modification time as it does not remain constant. +query IITTITTTTTTT colnames +SELECT table_id, parent_id, name, database_name, version, format_version, state, sc_lease_node_id, sc_lease_expiration_time, drop_time, audit_mode, schema_name FROM crdb_internal.tables WHERE NAME = 'namespace' ---- -table_id parent_id name database_name version mod_time mod_time_logical format_version state sc_lease_node_id sc_lease_expiration_time drop_time audit_mode schema_name -2 1 namespace system 1 1970-01-01 00:00:00 +0000 +0000 0E-10 InterleavedFormatVersion PUBLIC NULL NULL NULL DISABLED public +table_id parent_id name database_name version format_version state sc_lease_node_id sc_lease_expiration_time drop_time audit_mode schema_name +2 1 namespace system 1 InterleavedFormatVersion PUBLIC NULL NULL NULL DISABLED public # Verify that table names are not double escaped. 
diff --git a/pkg/sql/logictest/testdata/logic_test/schema_change_retry b/pkg/sql/logictest/testdata/logic_test/schema_change_retry index 4d0c12e8a3be..04acf7ab130f 100644 --- a/pkg/sql/logictest/testdata/logic_test/schema_change_retry +++ b/pkg/sql/logictest/testdata/logic_test/schema_change_retry @@ -3,6 +3,12 @@ # Schema changes that experienced retriable errors in RELEASE # SAVEPOINT would previously deadlock. +# Prevent transaction refreshes so we encounter the necessary +# TransactionRetryError below. + +statement ok +SET CLUSTER SETTING kv.transaction.max_refresh_spans_bytes = 0; + statement ok BEGIN diff --git a/pkg/sql/opt/exec/execbuilder/testdata/show_trace b/pkg/sql/opt/exec/execbuilder/testdata/show_trace index 868790669b6e..e4d960ad8f2f 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/show_trace +++ b/pkg/sql/opt/exec/execbuilder/testdata/show_trace @@ -38,7 +38,7 @@ WHERE message NOT LIKE '%Z/%' AND operation != 'dist sender send' ---- flow CPut /Table/2/1/53/"kv"/3/1 -> 54 -flow CPut /Table/3/1/54/2/1 -> table: columns: > nullable:false hidden:false > columns: > nullable:true hidden:false > next_column_id:3 families: next_family_id:1 primary_index: interleave:<> partitioning: type:FORWARD > next_index_id:2 privileges: users: > next_mutation_id:1 format_version:3 state:PUBLIC offline_reason:"" view_query:"" drop_time:0 replacement_of: > audit_mode:DISABLED drop_job_id:0 create_query:"" create_as_of_time: > +flow CPut /Table/3/1/54/2/1 -> table: columns: > nullable:false hidden:false > columns: > nullable:true hidden:false > next_column_id:3 families: next_family_id:1 primary_index: interleave:<> partitioning: type:FORWARD > next_index_id:2 privileges: users: > next_mutation_id:1 format_version:3 state:PUBLIC offline_reason:"" view_query:"" drop_time:0 replacement_of: > audit_mode:DISABLED drop_job_id:0 create_query:"" create_as_of_time:<> > exec stmt rows affected: 0 # We avoid using the full trace output, because that would make the @@ -64,7 +64,7 
@@ WHERE message NOT LIKE '%Z/%' AND message NOT LIKE 'querying next range at%' AND tag NOT LIKE '%IndexBackfiller%' AND operation != 'dist sender send' ---- -flow Put /Table/3/1/54/2/1 -> table: columns: > nullable:false hidden:false > columns: > nullable:true hidden:false > next_column_id:3 families: next_family_id:1 primary_index: interleave:<> partitioning: type:FORWARD > next_index_id:3 privileges: users: > mutations: interleave:<> partitioning: type:FORWARD > state:DELETE_ONLY direction:ADD mutation_id:1 rollback:false > next_mutation_id:2 format_version:3 state:PUBLIC offline_reason:"" view_query:"" mutationJobs:<...> drop_time:0 replacement_of: > audit_mode:DISABLED drop_job_id:0 create_query:"" create_as_of_time: > +flow Put /Table/3/1/54/2/1 -> table: columns: > nullable:false hidden:false > columns: > nullable:true hidden:false > next_column_id:3 families: next_family_id:1 primary_index: interleave:<> partitioning: type:FORWARD > next_index_id:3 privileges: users: > mutations: interleave:<> partitioning: type:FORWARD > state:DELETE_ONLY direction:ADD mutation_id:1 rollback:false > next_mutation_id:2 format_version:3 state:PUBLIC offline_reason:"" view_query:"" mutationJobs:<...> drop_time:0 replacement_of: > audit_mode:DISABLED drop_job_id:0 create_query:"" create_as_of_time: > exec stmt rows affected: 0 statement ok @@ -123,7 +123,7 @@ WHERE message NOT LIKE '%Z/%' ---- table reader Scan /Table/54/{1-2} flow CPut /Table/2/1/53/"kv2"/3/1 -> 55 -flow CPut /Table/3/1/55/2/1 -> table: columns: > nullable:true hidden:false > columns: > nullable:true hidden:false > columns: > nullable:false default_expr:"unique_rowid()" hidden:true > next_column_id:4 families: next_family_id:1 primary_index: interleave:<> partitioning: type:FORWARD > next_index_id:2 privileges: users: > next_mutation_id:1 format_version:3 state:ADD offline_reason:"" view_query:"" drop_time:0 replacement_of: > audit_mode:DISABLED drop_job_id:0 create_query:"TABLE t.public.kv" 
create_as_of_time: > +flow CPut /Table/3/1/55/2/1 -> table: columns: > nullable:true hidden:false > columns: > nullable:true hidden:false > columns: > nullable:false default_expr:"unique_rowid()" hidden:true > next_column_id:4 families: next_family_id:1 primary_index: interleave:<> partitioning: type:FORWARD > next_index_id:2 privileges: users: > next_mutation_id:1 format_version:3 state:ADD offline_reason:"" view_query:"" drop_time:0 replacement_of: > audit_mode:DISABLED drop_job_id:0 create_query:"TABLE t.public.kv" create_as_of_time:<> > exec stmt rows affected: 0 statement ok @@ -172,7 +172,7 @@ WHERE message NOT LIKE '%Z/%' AND message NOT LIKE 'querying next range at%' AND tag NOT LIKE '%IndexBackfiller%' AND operation != 'dist sender send' ---- -flow Put /Table/3/1/55/2/1 -> table: columns: > nullable:true hidden:false > columns: > nullable:true hidden:false > columns: > nullable:false default_expr:"unique_rowid()" hidden:true > next_column_id:4 families: next_family_id:1 primary_index: interleave:<> partitioning: type:FORWARD > next_index_id:2 privileges: users: > next_mutation_id:1 format_version:3 state:DROP offline_reason:"" draining_names: view_query:"" drop_time:... replacement_of: > audit_mode:DISABLED drop_job_id:... create_query:"TABLE t.public.kv" create_as_of_time: > +flow Put /Table/3/1/55/2/1 -> table: columns: > nullable:true hidden:false > columns: > nullable:true hidden:false > columns: > nullable:false default_expr:"unique_rowid()" hidden:true > next_column_id:4 families: next_family_id:1 primary_index: interleave:<> partitioning: type:FORWARD > next_index_id:2 privileges: users: > next_mutation_id:1 format_version:3 state:DROP offline_reason:"" draining_names: view_query:"" drop_time:... replacement_of: > audit_mode:DISABLED drop_job_id:... 
create_query:"TABLE t.public.kv" create_as_of_time: > exec stmt rows affected: 0 statement ok @@ -207,7 +207,7 @@ WHERE message NOT LIKE '%Z/%' AND message NOT LIKE 'querying next range at%' AND tag NOT LIKE '%IndexBackfiller%' AND operation != 'dist sender send' ---- -flow Put /Table/3/1/54/2/1 -> table: columns: > nullable:false hidden:false > columns: > nullable:true hidden:false > next_column_id:3 families: next_family_id:1 primary_index: interleave:<> partitioning: type:FORWARD > next_index_id:3 privileges: users: > mutations: interleave:<> partitioning: type:FORWARD > state:DELETE_AND_WRITE_ONLY direction:DROP mutation_id:2 rollback:false > next_mutation_id:3 format_version:3 state:PUBLIC offline_reason:"" view_query:"" mutationJobs:<...> drop_time:0 replacement_of: > audit_mode:DISABLED drop_job_id:0 create_query:"" create_as_of_time: > +flow Put /Table/3/1/54/2/1 -> table: columns: > nullable:false hidden:false > columns: > nullable:true hidden:false > next_column_id:3 families: next_family_id:1 primary_index: interleave:<> partitioning: type:FORWARD > next_index_id:3 privileges: users: > mutations: interleave:<> partitioning: type:FORWARD > state:DELETE_AND_WRITE_ONLY direction:DROP mutation_id:2 rollback:false > next_mutation_id:3 format_version:3 state:PUBLIC offline_reason:"" view_query:"" mutationJobs:<...> drop_time:0 replacement_of: > audit_mode:DISABLED drop_job_id:0 create_query:"" create_as_of_time: > exec stmt rows affected: 0 statement ok @@ -224,7 +224,7 @@ WHERE message NOT LIKE '%Z/%' AND message NOT LIKE 'querying next range at%' AND tag NOT LIKE '%IndexBackfiller%' AND operation != 'dist sender send' ---- -flow Put /Table/3/1/54/2/1 -> table: columns: > nullable:false hidden:false > columns: > nullable:true hidden:false > next_column_id:3 families: next_family_id:1 primary_index: interleave:<> partitioning: type:FORWARD > next_index_id:3 privileges: users: > next_mutation_id:3 format_version:3 state:DROP offline_reason:"" draining_names: 
view_query:"" drop_time:... replacement_of: > audit_mode:DISABLED drop_job_id:... gc_mutations: create_query:"" create_as_of_time: > +flow Put /Table/3/1/54/2/1 -> table: columns: > nullable:false hidden:false > columns: > nullable:true hidden:false > next_column_id:3 families: next_family_id:1 primary_index: interleave:<> partitioning: type:FORWARD > next_index_id:3 privileges: users: > next_mutation_id:3 format_version:3 state:DROP offline_reason:"" draining_names: view_query:"" drop_time:... replacement_of: > audit_mode:DISABLED drop_job_id:... gc_mutations: create_query:"" create_as_of_time: > exec stmt rows affected: 0 # Check that session tracing does not inhibit the fast path for inserts & diff --git a/pkg/sql/plan.go b/pkg/sql/plan.go index 2dc1659ae0ee..8b12b4426280 100644 --- a/pkg/sql/plan.go +++ b/pkg/sql/plan.go @@ -14,6 +14,7 @@ import ( "context" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/delegate" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" @@ -22,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" ) @@ -61,6 +63,24 @@ func (r *runParams) Ann() *tree.Annotations { return r.extendedEvalCtx.EvalContext.Annotations } +// creationTimeForNewTableDescriptor consults the cluster version to determine +// whether the CommitTimestamp() needs to be observed when creating a new +// TableDescriptor. See TableDescriptor.ModificationTime. +// +// TODO(ajwerner): remove in 20.1. 
+func (r *runParams) creationTimeForNewTableDescriptor() hlc.Timestamp { + // Before 19.2 we needed to observe the transaction CommitTimestamp to ensure + // that CreateAsOfTime and ModificationTime reflected the timestamp at which the + // creating transaction committed. Starting in 19.2 we use a zero-valued + // CreateAsOfTime and ModificationTime when creating a table descriptor and then + // upon reading use the MVCC timestamp to populate the values. + var ts hlc.Timestamp + if !r.ExecCfg().Settings.Version.IsActive(cluster.VersionTableDescModificationTimeFromMVCC) { + ts = r.p.txn.CommitTimestamp() + } + return ts +} + // planNode defines the interface for executing a query or portion of a query. // // The following methods apply to planNodes and contain special cases diff --git a/pkg/sql/rename_test.go b/pkg/sql/rename_test.go index 56ab6d8dae38..d71e57064e2e 100644 --- a/pkg/sql/rename_test.go +++ b/pkg/sql/rename_test.go @@ -49,10 +49,11 @@ func TestRenameTable(t *testing.T) { // Check the table descriptor. desc := &sqlbase.Descriptor{} tableDescKey := sqlbase.MakeDescMetadataKey(sqlbase.ID(counter)) - if err := kvDB.GetProto(context.TODO(), tableDescKey, desc); err != nil { + ts, err := kvDB.GetProtoTs(context.TODO(), tableDescKey, desc) + if err != nil { t.Fatal(err) } - tableDesc := desc.GetTable() + tableDesc := desc.Table(ts) if tableDesc.Name != oldName { t.Fatalf("Wrong table name, expected %s, got: %+v", oldName, tableDesc) } @@ -74,10 +75,11 @@ func TestRenameTable(t *testing.T) { } // Check the table descriptor again. 
- if err := kvDB.GetProto(context.TODO(), tableDescKey, desc); err != nil { + ts, err = kvDB.GetProtoTs(context.TODO(), tableDescKey, desc) + if err != nil { t.Fatal(err) } - tableDesc = desc.GetTable() + tableDesc = desc.Table(ts) if tableDesc.Name != newName { t.Fatalf("Wrong table name, expected %s, got: %+v", newName, tableDesc) } @@ -103,7 +105,7 @@ func isRenamed( if err := val.GetProto(&descriptor); err != nil { panic("unable to unmarshal table descriptor") } - table := descriptor.GetTable() + table := descriptor.Table(val.Timestamp) return table.Name == expectedName && table.Version == expectedVersion } diff --git a/pkg/sql/resolver.go b/pkg/sql/resolver.go index 927b61632a23..f2d623b63dfe 100644 --- a/pkg/sql/resolver.go +++ b/pkg/sql/resolver.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/errors" ) @@ -611,7 +612,7 @@ func newInternalLookupCtxFromDescriptors( if prefix == nil || prefix.ID == database.ID { dbIDs = append(dbIDs, database.ID) } - } else if table := desc.GetTable(); table != nil { + } else if table := desc.Table(hlc.Timestamp{}); table != nil { tbDescs[table.ID] = table if prefix == nil || prefix.ID == table.ParentID { // Only make the table visible for iteration if the prefix was included. diff --git a/pkg/sql/schema_changer_test.go b/pkg/sql/schema_changer_test.go index 608f36834e2f..668d39177fab 100644 --- a/pkg/sql/schema_changer_test.go +++ b/pkg/sql/schema_changer_test.go @@ -3833,12 +3833,9 @@ func TestSchemaChangeRetryError(t *testing.T) { t.Fatal(err) } - // TODO(vivek): fix #17698. The transaction should get retried - // without returning this error to the user. 
- if err := tx.Commit(); !testutils.IsError(err, - `restart transaction: TransactionRetryWithProtoRefreshError: TransactionRetryError: retry txn \(RETRY_SERIALIZABLE\)`, - ) { - t.Fatalf("err = %+v", err) + // The transaction should get pushed and commit without an error. + if err := tx.Commit(); err != nil { + t.Fatal(err) } } diff --git a/pkg/sql/sqlbase/structured.go b/pkg/sql/sqlbase/structured.go index 138276618a8f..e80c5c7205d2 100644 --- a/pkg/sql/sqlbase/structured.go +++ b/pkg/sql/sqlbase/structured.go @@ -29,6 +29,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/interval" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" @@ -280,10 +281,10 @@ func NewImmutableTableDescriptor(tbl TableDescriptor) *ImmutableTableDescriptor // protoGetter is a sub-interface of client.Txn that can fetch protobufs in a // transaction. type protoGetter interface { - // GetProto retrieves a protoutil.Message that's stored at key, storing it + // GetProtoTs retrieves a protoutil.Message that's stored at key, storing it // into the input msg parameter. If the key doesn't exist, the input proto // will be reset. 
- GetProto(ctx context.Context, key interface{}, msg protoutil.Message) error + GetProtoTs(ctx context.Context, key interface{}, msg protoutil.Message) (hlc.Timestamp, error) } // GetDatabaseDescFromID retrieves the database descriptor for the database @@ -294,8 +295,8 @@ func GetDatabaseDescFromID( ) (*DatabaseDescriptor, error) { desc := &Descriptor{} descKey := MakeDescMetadataKey(id) - - if err := protoGetter.GetProto(ctx, descKey, desc); err != nil { + _, err := protoGetter.GetProtoTs(ctx, descKey, desc) + if err != nil { return nil, err } db := desc.GetDatabase() @@ -334,15 +335,14 @@ func getTableDescFromIDRaw( ) (*TableDescriptor, error) { desc := &Descriptor{} descKey := MakeDescMetadataKey(id) - - if err := protoGetter.GetProto(ctx, descKey, desc); err != nil { + ts, err := protoGetter.GetProtoTs(ctx, descKey, desc) + if err != nil { return nil, err } - table := desc.GetTable() + table := desc.Table(ts) if table == nil { return nil, ErrDescriptorNotFound } - return table, nil } @@ -773,8 +773,8 @@ type MapProtoGetter struct { Protos map[interface{}]protoutil.Message } -// GetProto implements the protoGetter interface. -func (m MapProtoGetter) GetProto( +// getProto is the timestamp-less lookup that GetProtoTs delegates to. +func (m MapProtoGetter) getProto( ctx context.Context, key interface{}, msg protoutil.Message, ) error { msg.Reset() @@ -790,6 +790,13 @@ func (m MapProtoGetter) GetProto( return nil } +// GetProtoTs implements the protoGetter interface. 
+func (m MapProtoGetter) GetProtoTs( + ctx context.Context, key interface{}, msg protoutil.Message, +) (hlc.Timestamp, error) { + return hlc.Timestamp{}, m.getProto(ctx, key, msg) +} + // MaybeUpgradeForeignKeyRepresentation destructively modifies the input table // descriptor by replacing all old-style foreign key references (the ForeignKey // and ReferencedBy fields on IndexDescriptor) with new-style foreign key @@ -1486,27 +1493,25 @@ func (desc *MutableTableDescriptor) allocateColumnFamilyIDs(columnNames map[stri // MaybeIncrementVersion increments the version of a descriptor if necessary. func (desc *MutableTableDescriptor) MaybeIncrementVersion( - ctx context.Context, txn *client.Txn, + ctx context.Context, txn *client.Txn, settings *cluster.Settings, ) error { // Already incremented, no-op. if desc.Version == desc.ClusterVersion.Version+1 { return nil } desc.Version++ - // We need to set ModificationTime to the transaction's commit - // timestamp. Using CommitTimestamp() guarantees that the - // transaction will commit at the CommitTimestamp(). + + // Before 19.2 we needed to observe the transaction CommitTimestamp to ensure + // that ModificationTime reflected the timestamp at which the transaction + // committed. Starting in 19.2 we use a zero-valued ModificationTime when + // incrementing the version and then upon reading use the MVCC timestamp to + // populate the ModificationTime. // - // TODO(vivek): Stop needing to do this by deprecating the - // ModificationTime. A Descriptor modification time can be - // the mvcc timestamp of the descriptor. This requires moving the - // schema change lease out of the descriptor making the - // descriptor truly immutable at a version. - // Also recognize that the leases are released before the transaction - // is committed through a call to TableCollection.releaseLeases(), - // so updating this policy will also need to consider not doing - // that. 
- modTime := txn.CommitTimestamp() + // TODO(ajwerner): remove this check in 20.1. + var modTime hlc.Timestamp + if !settings.Version.IsActive(cluster.VersionTableDescModificationTimeFromMVCC) { + modTime = txn.CommitTimestamp() + } desc.ModificationTime = modTime log.Infof(ctx, "publish: descID=%d (%s) version=%d mtime=%s", desc.ID, desc.Name, desc.Version, modTime.GoTime()) @@ -3266,6 +3271,68 @@ func (desc *Descriptor) GetName() string { } } +// Table is a replacement for GetTable() which seeks to ensure that clients +// which unmarshal Descriptor structs properly set the ModificationTime on +// tables based on the MVCC timestamp at which the descriptor was read. +// +// A linter should ensure that GetTable() is not called. +func (desc *Descriptor) Table(ts hlc.Timestamp) *TableDescriptor { + t := desc.GetTable() + if t != nil { + t.maybeSetModificationTimeFromMVCCTimestamp(ts) + } + return t +} + +// maybeSetModificationTimeFromMVCCTimestamp will update ModificationTime +// with the provided timestamp. If desc.ModificationTime is non-zero it must +// be the case that it is not after the provided timestamp. +// +// When table descriptor versions are incremented they are written with a +// zero-valued ModificationTime. This is done to avoid the need to observe +// the commit timestamp for the writing transaction which would prevent +// pushes. This method is used in the read path to set the modification time +// based on the MVCC timestamp of the row which contained this descriptor. If +// the ModificationTime is non-zero then we know that either this table +// descriptor was written by an older version of cockroach which included the +// exact commit timestamp or it was re-written in which case it will include +// a timestamp which was set by this method. +// +// It is vital that users which read table descriptor values from the KV store +// call this method. 
+func (desc *TableDescriptor) maybeSetModificationTimeFromMVCCTimestamp(ts hlc.Timestamp) { + // Table descriptors can be updated in place after their version has been + // incremented (e.g. to include a schema change lease). + // When this happens we permit the ModificationTime to be written explicitly + // with the value that lives on the in-memory copy. That value should contain + // a timestamp set by this method. Thus if the ModificationTime is set it + // must not be after the MVCC timestamp we just read it at. + setIfShould := func(name string, cur *hlc.Timestamp, ts hlc.Timestamp) (set bool) { + if cur.IsEmpty() && ts.IsEmpty() { + log.Fatalf(context.TODO(), "read table descriptor without %s"+ + "with zero MVCC timestamp", name) + } + if cur.IsEmpty() { + *cur = ts + return true + } + if !ts.IsEmpty() && ts.Less(*cur) { + log.Fatalf(context.TODO(), "read table descriptor which has a %s "+ + "after its MVCC timestamp: has %v, expected %v", + name, cur, ts) + } + return false + } + setIfShould("ModificationTime", &desc.ModificationTime, ts) + if setIfShould("CreationTime", &desc.CreateAsOfTime, ts) && desc.Version != 1 { + // This means we never set the creation time and that we're updating a + // descriptor which wasn't read using this method. This is pretty bad but + // a fatal here seems extreme. + log.Errorf(context.TODO(), "set a creation time from MVCC timestamp for version %d", + desc.Version) + } +} + // IsSet returns whether or not the foreign key actually references a table. 
func (f ForeignKeyReference) IsSet() bool { return f.Table != 0 diff --git a/pkg/sql/sqlbase/structured.pb.go b/pkg/sql/sqlbase/structured.pb.go index 8aa8722f77a0..7ef56b9fdece 100644 --- a/pkg/sql/sqlbase/structured.pb.go +++ b/pkg/sql/sqlbase/structured.pb.go @@ -73,7 +73,7 @@ func (x *ConstraintValidity) UnmarshalJSON(data []byte) error { return nil } func (ConstraintValidity) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_structured_188f44329a8fdff9, []int{0} + return fileDescriptor_structured_b3d9f452828a3c03, []int{0} } type ForeignKeyReference_Action int32 @@ -118,7 +118,7 @@ func (x *ForeignKeyReference_Action) UnmarshalJSON(data []byte) error { return nil } func (ForeignKeyReference_Action) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_structured_188f44329a8fdff9, []int{0, 0} + return fileDescriptor_structured_b3d9f452828a3c03, []int{0, 0} } // Match is the algorithm used to compare composite keys. @@ -158,7 +158,7 @@ func (x *ForeignKeyReference_Match) UnmarshalJSON(data []byte) error { return nil } func (ForeignKeyReference_Match) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_structured_188f44329a8fdff9, []int{0, 1} + return fileDescriptor_structured_b3d9f452828a3c03, []int{0, 1} } // The direction of a column in the index. @@ -195,7 +195,7 @@ func (x *IndexDescriptor_Direction) UnmarshalJSON(data []byte) error { return nil } func (IndexDescriptor_Direction) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_structured_188f44329a8fdff9, []int{6, 0} + return fileDescriptor_structured_b3d9f452828a3c03, []int{6, 0} } // The type of the index. 
@@ -232,7 +232,7 @@ func (x *IndexDescriptor_Type) UnmarshalJSON(data []byte) error { return nil } func (IndexDescriptor_Type) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_structured_188f44329a8fdff9, []int{6, 1} + return fileDescriptor_structured_b3d9f452828a3c03, []int{6, 1} } type ConstraintToUpdate_ConstraintType int32 @@ -275,7 +275,7 @@ func (x *ConstraintToUpdate_ConstraintType) UnmarshalJSON(data []byte) error { return nil } func (ConstraintToUpdate_ConstraintType) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_structured_188f44329a8fdff9, []int{7, 0} + return fileDescriptor_structured_b3d9f452828a3c03, []int{7, 0} } // A descriptor within a mutation is unavailable for reads, writes @@ -340,7 +340,7 @@ func (x *DescriptorMutation_State) UnmarshalJSON(data []byte) error { return nil } func (DescriptorMutation_State) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_structured_188f44329a8fdff9, []int{8, 0} + return fileDescriptor_structured_b3d9f452828a3c03, []int{8, 0} } // Direction of mutation. @@ -383,7 +383,7 @@ func (x *DescriptorMutation_Direction) UnmarshalJSON(data []byte) error { return nil } func (DescriptorMutation_Direction) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_structured_188f44329a8fdff9, []int{8, 1} + return fileDescriptor_structured_b3d9f452828a3c03, []int{8, 1} } // State is set if this TableDescriptor is in the process of being added or deleted. @@ -434,7 +434,7 @@ func (x *TableDescriptor_State) UnmarshalJSON(data []byte) error { return nil } func (TableDescriptor_State) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_structured_188f44329a8fdff9, []int{9, 0} + return fileDescriptor_structured_b3d9f452828a3c03, []int{9, 0} } // AuditMode indicates which auditing actions to take when this table is used. 
@@ -471,7 +471,7 @@ func (x *TableDescriptor_AuditMode) UnmarshalJSON(data []byte) error { return nil } func (TableDescriptor_AuditMode) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_structured_188f44329a8fdff9, []int{9, 1} + return fileDescriptor_structured_b3d9f452828a3c03, []int{9, 1} } type ForeignKeyReference struct { @@ -493,7 +493,7 @@ func (m *ForeignKeyReference) Reset() { *m = ForeignKeyReference{} } func (m *ForeignKeyReference) String() string { return proto.CompactTextString(m) } func (*ForeignKeyReference) ProtoMessage() {} func (*ForeignKeyReference) Descriptor() ([]byte, []int) { - return fileDescriptor_structured_188f44329a8fdff9, []int{0} + return fileDescriptor_structured_b3d9f452828a3c03, []int{0} } func (m *ForeignKeyReference) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -561,7 +561,7 @@ func (m *ForeignKeyConstraint) Reset() { *m = ForeignKeyConstraint{} } func (m *ForeignKeyConstraint) String() string { return proto.CompactTextString(m) } func (*ForeignKeyConstraint) ProtoMessage() {} func (*ForeignKeyConstraint) Descriptor() ([]byte, []int) { - return fileDescriptor_structured_188f44329a8fdff9, []int{1} + return fileDescriptor_structured_b3d9f452828a3c03, []int{1} } func (m *ForeignKeyConstraint) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -606,7 +606,7 @@ func (m *ColumnDescriptor) Reset() { *m = ColumnDescriptor{} } func (m *ColumnDescriptor) String() string { return proto.CompactTextString(m) } func (*ColumnDescriptor) ProtoMessage() {} func (*ColumnDescriptor) Descriptor() ([]byte, []int) { - return fileDescriptor_structured_188f44329a8fdff9, []int{2} + return fileDescriptor_structured_b3d9f452828a3c03, []int{2} } func (m *ColumnDescriptor) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -657,7 +657,7 @@ func (m *ColumnFamilyDescriptor) Reset() { *m = ColumnFamilyDescriptor{} func (m *ColumnFamilyDescriptor) String() string { return proto.CompactTextString(m) } func 
(*ColumnFamilyDescriptor) ProtoMessage() {} func (*ColumnFamilyDescriptor) Descriptor() ([]byte, []int) { - return fileDescriptor_structured_188f44329a8fdff9, []int{3} + return fileDescriptor_structured_b3d9f452828a3c03, []int{3} } func (m *ColumnFamilyDescriptor) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -703,7 +703,7 @@ func (m *InterleaveDescriptor) Reset() { *m = InterleaveDescriptor{} } func (m *InterleaveDescriptor) String() string { return proto.CompactTextString(m) } func (*InterleaveDescriptor) ProtoMessage() {} func (*InterleaveDescriptor) Descriptor() ([]byte, []int) { - return fileDescriptor_structured_188f44329a8fdff9, []int{4} + return fileDescriptor_structured_b3d9f452828a3c03, []int{4} } func (m *InterleaveDescriptor) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -747,7 +747,7 @@ func (m *InterleaveDescriptor_Ancestor) Reset() { *m = InterleaveDescrip func (m *InterleaveDescriptor_Ancestor) String() string { return proto.CompactTextString(m) } func (*InterleaveDescriptor_Ancestor) ProtoMessage() {} func (*InterleaveDescriptor_Ancestor) Descriptor() ([]byte, []int) { - return fileDescriptor_structured_188f44329a8fdff9, []int{4, 0} + return fileDescriptor_structured_b3d9f452828a3c03, []int{4, 0} } func (m *InterleaveDescriptor_Ancestor) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -792,7 +792,7 @@ func (m *PartitioningDescriptor) Reset() { *m = PartitioningDescriptor{} func (m *PartitioningDescriptor) String() string { return proto.CompactTextString(m) } func (*PartitioningDescriptor) ProtoMessage() {} func (*PartitioningDescriptor) Descriptor() ([]byte, []int) { - return fileDescriptor_structured_188f44329a8fdff9, []int{5} + return fileDescriptor_structured_b3d9f452828a3c03, []int{5} } func (m *PartitioningDescriptor) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -835,7 +835,7 @@ func (m *PartitioningDescriptor_List) Reset() { *m = PartitioningDescrip func (m *PartitioningDescriptor_List) String() 
string { return proto.CompactTextString(m) } func (*PartitioningDescriptor_List) ProtoMessage() {} func (*PartitioningDescriptor_List) Descriptor() ([]byte, []int) { - return fileDescriptor_structured_188f44329a8fdff9, []int{5, 0} + return fileDescriptor_structured_b3d9f452828a3c03, []int{5, 0} } func (m *PartitioningDescriptor_List) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -880,7 +880,7 @@ func (m *PartitioningDescriptor_Range) Reset() { *m = PartitioningDescri func (m *PartitioningDescriptor_Range) String() string { return proto.CompactTextString(m) } func (*PartitioningDescriptor_Range) ProtoMessage() {} func (*PartitioningDescriptor_Range) Descriptor() ([]byte, []int) { - return fileDescriptor_structured_188f44329a8fdff9, []int{5, 1} + return fileDescriptor_structured_b3d9f452828a3c03, []int{5, 1} } func (m *PartitioningDescriptor_Range) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1013,7 +1013,7 @@ func (m *IndexDescriptor) Reset() { *m = IndexDescriptor{} } func (m *IndexDescriptor) String() string { return proto.CompactTextString(m) } func (*IndexDescriptor) ProtoMessage() {} func (*IndexDescriptor) Descriptor() ([]byte, []int) { - return fileDescriptor_structured_188f44329a8fdff9, []int{6} + return fileDescriptor_structured_b3d9f452828a3c03, []int{6} } func (m *IndexDescriptor) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1064,7 +1064,7 @@ func (m *ConstraintToUpdate) Reset() { *m = ConstraintToUpdate{} } func (m *ConstraintToUpdate) String() string { return proto.CompactTextString(m) } func (*ConstraintToUpdate) ProtoMessage() {} func (*ConstraintToUpdate) Descriptor() ([]byte, []int) { - return fileDescriptor_structured_188f44329a8fdff9, []int{7} + return fileDescriptor_structured_b3d9f452828a3c03, []int{7} } func (m *ConstraintToUpdate) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1118,7 +1118,7 @@ func (m *DescriptorMutation) Reset() { *m = DescriptorMutation{} } func (m *DescriptorMutation) 
String() string { return proto.CompactTextString(m) } func (*DescriptorMutation) ProtoMessage() {} func (*DescriptorMutation) Descriptor() ([]byte, []int) { - return fileDescriptor_structured_188f44329a8fdff9, []int{8} + return fileDescriptor_structured_b3d9f452828a3c03, []int{8} } func (m *DescriptorMutation) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1313,6 +1313,14 @@ type TableDescriptor struct { // particular version increment. Version DescriptorVersion `protobuf:"varint,5,opt,name=version,casttype=DescriptorVersion" json:"version"` // Last modification time of the table descriptor. + // Starting in 19.2 this field's value may sometime be zero-valued in which + // case the MVCC timestamp of the row containing the value should be used to + // populate it. This dance allows us to avoid observing the commit timestamp + // for transactions which increment the descriptor version. + // Encoded TableDescriptor structs should not be stored directly but rather + // should live inside of a Descriptor. The Descriptor.Table() method takes an + // hlc timestamp to ensure that this field is set properly when extracted from + // a Descriptor. ModificationTime hlc.Timestamp `protobuf:"bytes,7,opt,name=modification_time,json=modificationTime" json:"modification_time"` Columns []ColumnDescriptor `protobuf:"bytes,8,rep,name=columns" json:"columns"` // next_column_id is used to ensure that deleted column ids are not reused. @@ -1395,9 +1403,12 @@ type TableDescriptor struct { // // TODO(vivekmenezes): This is currently only used by the non-interleaved drop // index case. Also use for dropped interleaved indexes and columns. 
- GCMutations []TableDescriptor_GCDescriptorMutation `protobuf:"bytes,33,rep,name=gc_mutations,json=gcMutations" json:"gc_mutations"` - CreateQuery string `protobuf:"bytes,34,opt,name=create_query,json=createQuery" json:"create_query"` - CreateAsOfTime hlc.Timestamp `protobuf:"bytes,35,opt,name=create_as_of_time,json=createAsOfTime" json:"create_as_of_time"` + GCMutations []TableDescriptor_GCDescriptorMutation `protobuf:"bytes,33,rep,name=gc_mutations,json=gcMutations" json:"gc_mutations"` + CreateQuery string `protobuf:"bytes,34,opt,name=create_query,json=createQuery" json:"create_query"` + // Starting in 19.2 CreateAsOfTime is initialized to zero for the first + // version of a table and is populated from the MVCC timestamp of the read + // like ModificationTime. See Descriptor.Table(). + CreateAsOfTime hlc.Timestamp `protobuf:"bytes,35,opt,name=create_as_of_time,json=createAsOfTime" json:"create_as_of_time"` // outbound_fks contains all foreign key constraints that have this table as // the origin table. 
OutboundFKs []ForeignKeyConstraint `protobuf:"bytes,36,rep,name=outbound_fks,json=outboundFks" json:"outbound_fks"` @@ -1410,7 +1421,7 @@ func (m *TableDescriptor) Reset() { *m = TableDescriptor{} } func (m *TableDescriptor) String() string { return proto.CompactTextString(m) } func (*TableDescriptor) ProtoMessage() {} func (*TableDescriptor) Descriptor() ([]byte, []int) { - return fileDescriptor_structured_188f44329a8fdff9, []int{9} + return fileDescriptor_structured_b3d9f452828a3c03, []int{9} } func (m *TableDescriptor) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1695,7 +1706,7 @@ func (m *TableDescriptor_SchemaChangeLease) Reset() { *m = TableDescript func (m *TableDescriptor_SchemaChangeLease) String() string { return proto.CompactTextString(m) } func (*TableDescriptor_SchemaChangeLease) ProtoMessage() {} func (*TableDescriptor_SchemaChangeLease) Descriptor() ([]byte, []int) { - return fileDescriptor_structured_188f44329a8fdff9, []int{9, 0} + return fileDescriptor_structured_b3d9f452828a3c03, []int{9, 0} } func (m *TableDescriptor_SchemaChangeLease) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1733,7 +1744,7 @@ func (m *TableDescriptor_CheckConstraint) Reset() { *m = TableDescriptor func (m *TableDescriptor_CheckConstraint) String() string { return proto.CompactTextString(m) } func (*TableDescriptor_CheckConstraint) ProtoMessage() {} func (*TableDescriptor_CheckConstraint) Descriptor() ([]byte, []int) { - return fileDescriptor_structured_188f44329a8fdff9, []int{9, 1} + return fileDescriptor_structured_b3d9f452828a3c03, []int{9, 1} } func (m *TableDescriptor_CheckConstraint) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1836,7 +1847,7 @@ func (m *TableDescriptor_NameInfo) Reset() { *m = TableDescriptor_NameIn func (m *TableDescriptor_NameInfo) String() string { return proto.CompactTextString(m) } func (*TableDescriptor_NameInfo) ProtoMessage() {} func (*TableDescriptor_NameInfo) Descriptor() ([]byte, []int) { - return 
fileDescriptor_structured_188f44329a8fdff9, []int{9, 2} + return fileDescriptor_structured_b3d9f452828a3c03, []int{9, 2} } func (m *TableDescriptor_NameInfo) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1876,7 +1887,7 @@ func (m *TableDescriptor_Reference) Reset() { *m = TableDescriptor_Refer func (m *TableDescriptor_Reference) String() string { return proto.CompactTextString(m) } func (*TableDescriptor_Reference) ProtoMessage() {} func (*TableDescriptor_Reference) Descriptor() ([]byte, []int) { - return fileDescriptor_structured_188f44329a8fdff9, []int{9, 3} + return fileDescriptor_structured_b3d9f452828a3c03, []int{9, 3} } func (m *TableDescriptor_Reference) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1913,7 +1924,7 @@ func (m *TableDescriptor_MutationJob) Reset() { *m = TableDescriptor_Mut func (m *TableDescriptor_MutationJob) String() string { return proto.CompactTextString(m) } func (*TableDescriptor_MutationJob) ProtoMessage() {} func (*TableDescriptor_MutationJob) Descriptor() ([]byte, []int) { - return fileDescriptor_structured_188f44329a8fdff9, []int{9, 4} + return fileDescriptor_structured_b3d9f452828a3c03, []int{9, 4} } func (m *TableDescriptor_MutationJob) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1955,7 +1966,7 @@ func (m *TableDescriptor_SequenceOpts) Reset() { *m = TableDescriptor_Se func (m *TableDescriptor_SequenceOpts) String() string { return proto.CompactTextString(m) } func (*TableDescriptor_SequenceOpts) ProtoMessage() {} func (*TableDescriptor_SequenceOpts) Descriptor() ([]byte, []int) { - return fileDescriptor_structured_188f44329a8fdff9, []int{9, 5} + return fileDescriptor_structured_b3d9f452828a3c03, []int{9, 5} } func (m *TableDescriptor_SequenceOpts) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1989,7 +2000,7 @@ func (m *TableDescriptor_Replacement) Reset() { *m = TableDescriptor_Rep func (m *TableDescriptor_Replacement) String() string { return proto.CompactTextString(m) } func 
(*TableDescriptor_Replacement) ProtoMessage() {} func (*TableDescriptor_Replacement) Descriptor() ([]byte, []int) { - return fileDescriptor_structured_188f44329a8fdff9, []int{9, 6} + return fileDescriptor_structured_b3d9f452828a3c03, []int{9, 6} } func (m *TableDescriptor_Replacement) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -2026,7 +2037,7 @@ func (m *TableDescriptor_GCDescriptorMutation) Reset() { *m = TableDescr func (m *TableDescriptor_GCDescriptorMutation) String() string { return proto.CompactTextString(m) } func (*TableDescriptor_GCDescriptorMutation) ProtoMessage() {} func (*TableDescriptor_GCDescriptorMutation) Descriptor() ([]byte, []int) { - return fileDescriptor_structured_188f44329a8fdff9, []int{9, 7} + return fileDescriptor_structured_b3d9f452828a3c03, []int{9, 7} } func (m *TableDescriptor_GCDescriptorMutation) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -2065,7 +2076,7 @@ func (m *DatabaseDescriptor) Reset() { *m = DatabaseDescriptor{} } func (m *DatabaseDescriptor) String() string { return proto.CompactTextString(m) } func (*DatabaseDescriptor) ProtoMessage() {} func (*DatabaseDescriptor) Descriptor() ([]byte, []int) { - return fileDescriptor_structured_188f44329a8fdff9, []int{10} + return fileDescriptor_structured_b3d9f452828a3c03, []int{10} } func (m *DatabaseDescriptor) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -2123,7 +2134,7 @@ func (m *Descriptor) Reset() { *m = Descriptor{} } func (m *Descriptor) String() string { return proto.CompactTextString(m) } func (*Descriptor) ProtoMessage() {} func (*Descriptor) Descriptor() ([]byte, []int) { - return fileDescriptor_structured_188f44329a8fdff9, []int{11} + return fileDescriptor_structured_b3d9f452828a3c03, []int{11} } func (m *Descriptor) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -10723,10 +10734,10 @@ var ( ) func init() { - proto.RegisterFile("sql/sqlbase/structured.proto", fileDescriptor_structured_188f44329a8fdff9) + 
proto.RegisterFile("sql/sqlbase/structured.proto", fileDescriptor_structured_b3d9f452828a3c03) } -var fileDescriptor_structured_188f44329a8fdff9 = []byte{ +var fileDescriptor_structured_b3d9f452828a3c03 = []byte{ // 3345 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x5a, 0xcd, 0x6f, 0x23, 0xc7, 0x95, 0x57, 0xf3, 0x9b, 0x8f, 0x5f, 0xad, 0xd2, 0xcc, 0x98, 0x23, 0x8f, 0x45, 0x0e, 0xc7, 0x63, diff --git a/pkg/sql/sqlbase/structured.proto b/pkg/sql/sqlbase/structured.proto index 27696824b8d0..b61b71830329 100644 --- a/pkg/sql/sqlbase/structured.proto +++ b/pkg/sql/sqlbase/structured.proto @@ -521,6 +521,14 @@ message TableDescriptor { reserved 6; // Last modification time of the table descriptor. + // Starting in 19.2 this field's value may sometime be zero-valued in which + // case the MVCC timestamp of the row containing the value should be used to + // populate it. This dance allows us to avoid observing the commit timestamp + // for transactions which increment the descriptor version. + // Encoded TableDescriptor structs should not be stored directly but rather + // should live inside of a Descriptor. The Descriptor.Table() method takes an + // hlc timestamp to ensure that this field is set properly when extracted from + // a Descriptor. optional util.hlc.Timestamp modification_time = 7 [(gogoproto.nullable) = false]; repeated ColumnDescriptor columns = 8 [(gogoproto.nullable) = false]; // next_column_id is used to ensure that deleted column ids are not reused. @@ -811,6 +819,10 @@ message TableDescriptor { (gogoproto.customname) = "GCMutations"]; optional string create_query = 34 [(gogoproto.nullable) = false]; + + // Starting in 19.2 CreateAsOfTime is initialized to zero for the first + // version of a table and is populated from the MVCC timestamp of the read + // like ModificationTime. See Descriptor.Table(). 
optional util.hlc.Timestamp create_as_of_time = 35 [(gogoproto.nullable) = false]; // outbound_fks contains all foreign key constraints that have this table as diff --git a/pkg/sql/sqlbase/structured_test.go b/pkg/sql/sqlbase/structured_test.go index 17adb07754d5..6ab8a0f35666 100644 --- a/pkg/sql/sqlbase/structured_test.go +++ b/pkg/sql/sqlbase/structured_test.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/gogo/protobuf/proto" @@ -1356,7 +1357,8 @@ func TestUpgradeDowngradeFKRepr(t *testing.T) { cluster.BinaryMinimumSupportedVersion, cluster.VersionByKey(cluster.VersionTopLevelForeignKeys), ) - + // Use a non-zero ts for CreateAsOfTime and ModificationTime + ts := hlc.Timestamp{WallTime: 1} testCases := []struct { name string origin oldFormatUpgradedPair @@ -1366,8 +1368,10 @@ func TestUpgradeDowngradeFKRepr(t *testing.T) { name: "simple", origin: oldFormatUpgradedPair{ oldFormat: TableDescriptor{ - ID: 1, - Columns: []ColumnDescriptor{{ID: 1}, {ID: 2}}, + ID: 1, + CreateAsOfTime: ts, + ModificationTime: ts, + Columns: []ColumnDescriptor{{ID: 1}, {ID: 2}}, Indexes: []IndexDescriptor{ { ID: 1, @@ -1386,8 +1390,10 @@ func TestUpgradeDowngradeFKRepr(t *testing.T) { }, }, expectedUpgraded: TableDescriptor{ - ID: 1, - Columns: []ColumnDescriptor{{ID: 1}, {ID: 2}}, + ID: 1, + CreateAsOfTime: ts, + ModificationTime: ts, + Columns: []ColumnDescriptor{{ID: 1}, {ID: 2}}, Indexes: []IndexDescriptor{ { ID: 1, @@ -1411,8 +1417,10 @@ func TestUpgradeDowngradeFKRepr(t *testing.T) { }, referenced: oldFormatUpgradedPair{ oldFormat: TableDescriptor{ - ID: 2, - Columns: []ColumnDescriptor{{ID: 2}}, + ID: 2, + CreateAsOfTime: ts, + ModificationTime: ts, + Columns: 
[]ColumnDescriptor{{ID: 2}}, Indexes: []IndexDescriptor{ { ColumnIDs: ColumnIDs{2}, @@ -1427,8 +1435,10 @@ func TestUpgradeDowngradeFKRepr(t *testing.T) { }, }, expectedUpgraded: TableDescriptor{ - ID: 2, - Columns: []ColumnDescriptor{{ID: 2}}, + ID: 2, + CreateAsOfTime: ts, + ModificationTime: ts, + Columns: []ColumnDescriptor{{ID: 2}}, Indexes: []IndexDescriptor{ { ColumnIDs: ColumnIDs{2}, @@ -1455,8 +1465,10 @@ func TestUpgradeDowngradeFKRepr(t *testing.T) { name: "primaryKey", origin: oldFormatUpgradedPair{ oldFormat: TableDescriptor{ - ID: 1, - Columns: []ColumnDescriptor{{ID: 1}, {ID: 2}}, + ID: 1, + CreateAsOfTime: ts, + ModificationTime: ts, + Columns: []ColumnDescriptor{{ID: 1}, {ID: 2}}, PrimaryIndex: IndexDescriptor{ ID: 1, ColumnIDs: ColumnIDs{1}, @@ -1473,8 +1485,10 @@ func TestUpgradeDowngradeFKRepr(t *testing.T) { }, }, expectedUpgraded: TableDescriptor{ - ID: 1, - Columns: []ColumnDescriptor{{ID: 1}, {ID: 2}}, + ID: 1, + CreateAsOfTime: ts, + ModificationTime: ts, + Columns: []ColumnDescriptor{{ID: 1}, {ID: 2}}, PrimaryIndex: IndexDescriptor{ ID: 1, ColumnIDs: ColumnIDs{1}, @@ -1496,8 +1510,10 @@ func TestUpgradeDowngradeFKRepr(t *testing.T) { }, referenced: oldFormatUpgradedPair{ oldFormat: TableDescriptor{ - ID: 2, - Columns: []ColumnDescriptor{{ID: 2}}, + ID: 2, + CreateAsOfTime: ts, + ModificationTime: ts, + Columns: []ColumnDescriptor{{ID: 2}}, PrimaryIndex: IndexDescriptor{ ColumnIDs: ColumnIDs{2}, ID: 2, @@ -1510,8 +1526,10 @@ func TestUpgradeDowngradeFKRepr(t *testing.T) { }, }, expectedUpgraded: TableDescriptor{ - ID: 2, - Columns: []ColumnDescriptor{{ID: 2}}, + ID: 2, + CreateAsOfTime: ts, + ModificationTime: ts, + Columns: []ColumnDescriptor{{ID: 2}}, PrimaryIndex: IndexDescriptor{ ColumnIDs: ColumnIDs{2}, ID: 2, @@ -1536,8 +1554,10 @@ func TestUpgradeDowngradeFKRepr(t *testing.T) { name: "self-reference-cycle", origin: oldFormatUpgradedPair{ oldFormat: TableDescriptor{ - ID: 1, - Columns: []ColumnDescriptor{{ID: 1}, {ID: 2}}, + ID: 1, + 
CreateAsOfTime: ts, + ModificationTime: ts, + Columns: []ColumnDescriptor{{ID: 1}, {ID: 2}}, Indexes: []IndexDescriptor{ { ID: 1, @@ -1582,8 +1602,10 @@ func TestUpgradeDowngradeFKRepr(t *testing.T) { }, }, expectedUpgraded: TableDescriptor{ - ID: 1, - Columns: []ColumnDescriptor{{ID: 1}, {ID: 2}}, + ID: 1, + CreateAsOfTime: ts, + ModificationTime: ts, + Columns: []ColumnDescriptor{{ID: 1}, {ID: 2}}, Indexes: []IndexDescriptor{ { ID: 1, @@ -1656,8 +1678,10 @@ func TestUpgradeDowngradeFKRepr(t *testing.T) { origin: oldFormatUpgradedPair{ oldFormatWasAlreadyUpgraded: true, oldFormat: TableDescriptor{ - ID: 1, - Columns: []ColumnDescriptor{{ID: 1}, {ID: 2}}, + ID: 1, + CreateAsOfTime: ts, + ModificationTime: ts, + Columns: []ColumnDescriptor{{ID: 1}, {ID: 2}}, Indexes: []IndexDescriptor{ { ID: 1, @@ -1687,8 +1711,10 @@ func TestUpgradeDowngradeFKRepr(t *testing.T) { // Our referenced table is *not* upgraded. referenced: oldFormatUpgradedPair{ oldFormat: TableDescriptor{ - ID: 2, - Columns: []ColumnDescriptor{{ID: 2}}, + ID: 2, + CreateAsOfTime: ts, + ModificationTime: ts, + Columns: []ColumnDescriptor{{ID: 2}}, PrimaryIndex: IndexDescriptor{ ColumnIDs: ColumnIDs{2}, ID: 2, @@ -1701,8 +1727,10 @@ func TestUpgradeDowngradeFKRepr(t *testing.T) { }, }, expectedUpgraded: TableDescriptor{ - ID: 2, - Columns: []ColumnDescriptor{{ID: 2}}, + ID: 2, + CreateAsOfTime: ts, + ModificationTime: ts, + Columns: []ColumnDescriptor{{ID: 2}}, PrimaryIndex: IndexDescriptor{ ColumnIDs: ColumnIDs{2}, ID: 2, @@ -1729,8 +1757,10 @@ func TestUpgradeDowngradeFKRepr(t *testing.T) { // Origin table has *not* been upgraded. 
origin: oldFormatUpgradedPair{ oldFormat: TableDescriptor{ - ID: 1, - Columns: []ColumnDescriptor{{ID: 1}, {ID: 2}}, + ID: 1, + CreateAsOfTime: ts, + ModificationTime: ts, + Columns: []ColumnDescriptor{{ID: 1}, {ID: 2}}, Indexes: []IndexDescriptor{ { ID: 1, @@ -1749,8 +1779,10 @@ func TestUpgradeDowngradeFKRepr(t *testing.T) { }, }, expectedUpgraded: TableDescriptor{ - ID: 1, - Columns: []ColumnDescriptor{{ID: 1}, {ID: 2}}, + ID: 1, + CreateAsOfTime: ts, + ModificationTime: ts, + Columns: []ColumnDescriptor{{ID: 1}, {ID: 2}}, Indexes: []IndexDescriptor{ { ID: 1, @@ -1776,8 +1808,10 @@ func TestUpgradeDowngradeFKRepr(t *testing.T) { referenced: oldFormatUpgradedPair{ oldFormatWasAlreadyUpgraded: true, oldFormat: TableDescriptor{ - ID: 2, - Columns: []ColumnDescriptor{{ID: 2}}, + ID: 2, + CreateAsOfTime: ts, + ModificationTime: ts, + Columns: []ColumnDescriptor{{ID: 2}}, PrimaryIndex: IndexDescriptor{ ColumnIDs: ColumnIDs{2}, ID: 2, @@ -1815,9 +1849,16 @@ func TestUpgradeDowngradeFKRepr(t *testing.T) { tc.origin.oldFormat.Privileges = NewDefaultPrivilegeDescriptor() tc.referenced.expectedUpgraded.Privileges = NewDefaultPrivilegeDescriptor() tc.referenced.oldFormat.Privileges = NewDefaultPrivilegeDescriptor() + // Make sure that the table descriptors have initialized timestamps. + // They always will when being used in a schema change. 
+ toDesc := func(tableDesc *TableDescriptor) *Descriptor { + desc := WrapDescriptor(tableDesc) + desc.Table(hlc.Timestamp{WallTime: 1}) + return desc + } txn := MapProtoGetter{Protos: map[interface{}]protoutil.Message{ - string(MakeDescMetadataKey(tc.origin.oldFormat.ID)): WrapDescriptor(&tc.origin.oldFormat), - string(MakeDescMetadataKey(tc.referenced.oldFormat.ID)): WrapDescriptor(&tc.referenced.oldFormat), + string(MakeDescMetadataKey(tc.origin.oldFormat.ID)): toDesc(&tc.origin.oldFormat), + string(MakeDescMetadataKey(tc.referenced.oldFormat.ID)): toDesc(&tc.referenced.oldFormat), }} tables := []oldFormatUpgradedPair{tc.origin, tc.referenced} diff --git a/pkg/sql/sqlbase/system.go b/pkg/sql/sqlbase/system.go index c765b2d58a9d..b0e4b3c70877 100644 --- a/pkg/sql/sqlbase/system.go +++ b/pkg/sql/sqlbase/system.go @@ -44,7 +44,7 @@ func SplitAtIDHook(id uint32, cfg *config.SystemConfig) bool { if dbDesc := desc.GetDatabase(); dbDesc != nil { return false } - if tableDesc := desc.GetTable(); tableDesc != nil { + if tableDesc := desc.Table(descVal.Timestamp); tableDesc != nil { if viewStr := tableDesc.GetViewQuery(); viewStr != "" { return false } diff --git a/pkg/sql/sqlbase/table.go b/pkg/sql/sqlbase/table.go index df8d376c28a2..2400464bb97a 100644 --- a/pkg/sql/sqlbase/table.go +++ b/pkg/sql/sqlbase/table.go @@ -481,6 +481,7 @@ func ConditionalGetTableDescFromTxn( return nil, errors.Wrapf(err, "decoding current table descriptor value for id: %d", expectation.ID) } + existing.Table(existingKV.Value.Timestamp) } wrapped := WrapDescriptor(expectation) if !existing.Equal(wrapped) { diff --git a/pkg/sql/sqlbase/testutils.go b/pkg/sql/sqlbase/testutils.go index 7af7e2f4db94..be68b37914ae 100644 --- a/pkg/sql/sqlbase/testutils.go +++ b/pkg/sql/sqlbase/testutils.go @@ -66,10 +66,11 @@ func GetTableDescriptor(kvDB *client.DB, database string, table string) *TableDe descKey := MakeDescMetadataKey(ID(gr.ValueInt())) desc := &Descriptor{} - if err := kvDB.GetProto(ctx, 
descKey, desc); err != nil || (*desc == Descriptor{}) { + ts, err := kvDB.GetProtoTs(ctx, descKey, desc) + if err != nil || (*desc == Descriptor{}) { log.Fatalf(ctx, "proto with id %d missing. err: %v", gr.ValueInt(), err) } - tableDesc := desc.GetTable() + tableDesc := desc.Table(ts) if tableDesc == nil { return nil } diff --git a/pkg/sql/table.go b/pkg/sql/table.go index 2363ee269aef..745f370ff546 100644 --- a/pkg/sql/table.go +++ b/pkg/sql/table.go @@ -13,6 +13,7 @@ package sql import ( "context" "fmt" + "sort" "strings" "github.com/cockroachdb/cockroach/pkg/internal/client" @@ -385,6 +386,36 @@ func (tc *TableCollection) getMutableTableVersionByID( return sqlbase.GetMutableTableDescFromID(ctx, txn, tableID) } +// releaseTableLeases releases the leases for the tables with ids in +// the passed slice. Errors are logged but ignored. +func (tc *TableCollection) releaseTableLeases(ctx context.Context, tables []IDVersion) { + // Sort the tables and leases to make it easy to find the leases to release. 
+ leasedTables := tc.leasedTables + sort.Slice(tables, func(i, j int) bool { + return tables[i].id < tables[j].id + }) + sort.Slice(leasedTables, func(i, j int) bool { + return leasedTables[i].ID < leasedTables[j].ID + }) + + filteredLeases := leasedTables[:0] // will store the remaining leases + tablesToConsider := tables + shouldRelease := func(id sqlbase.ID) (found bool) { + for len(tablesToConsider) > 0 && tablesToConsider[0].id < id { + tablesToConsider = tablesToConsider[1:] + } + return len(tablesToConsider) > 0 && tablesToConsider[0].id == id + } + for _, l := range leasedTables { + if !shouldRelease(l.ID) { + filteredLeases = append(filteredLeases, l) + } else if err := tc.leaseMgr.Release(l); err != nil { + log.Warning(ctx, err) + } + } + tc.leasedTables = filteredLeases +} + func (tc *TableCollection) releaseLeases(ctx context.Context) { if len(tc.leasedTables) > 0 { log.VEventf(ctx, 2, "releasing %d tables", len(tc.leasedTables)) @@ -844,7 +875,7 @@ func (p *planner) writeTableDescToBatch( } } else { // Only increment the table descriptor version once in this transaction. - if err := tableDesc.MaybeIncrementVersion(ctx, p.txn); err != nil { + if err := tableDesc.MaybeIncrementVersion(ctx, p.txn, p.execCfg.Settings); err != nil { return err } diff --git a/pkg/sql/zone_config.go b/pkg/sql/zone_config.go index 439c18745a2b..4d9972af2bb3 100644 --- a/pkg/sql/zone_config.go +++ b/pkg/sql/zone_config.go @@ -78,7 +78,7 @@ func getZoneConfig( if err := descVal.GetProto(&desc); err != nil { return 0, nil, 0, nil, err } - if tableDesc := desc.GetTable(); tableDesc != nil { + if tableDesc := desc.Table(descVal.Timestamp); tableDesc != nil { // This is a table descriptor. Look up its parent database zone config. 
dbID, zone, _, _, err := getZoneConfig(uint32(tableDesc.ParentID), getKey, false /* getInheritedDefault */) if err != nil { @@ -122,7 +122,7 @@ func completeZoneConfig( if err := descVal.GetProto(&desc); err != nil { return err } - if tableDesc := desc.GetTable(); tableDesc != nil { + if tableDesc := desc.Table(descVal.Timestamp); tableDesc != nil { _, dbzone, _, _, err := getZoneConfig(uint32(tableDesc.ParentID), getKey, false /* getInheritedDefault */) if err != nil { return err diff --git a/pkg/storage/reports/reporter.go b/pkg/storage/reports/reporter.go index c1bb36347915..36630b3cfbf3 100644 --- a/pkg/storage/reports/reporter.go +++ b/pkg/storage/reports/reporter.go @@ -346,7 +346,7 @@ func visitAncestors( if err := descVal.GetProto(&desc); err != nil { return false, err } - tableDesc := desc.GetTable() + tableDesc := desc.Table(descVal.Timestamp) // If it's a database, the parent is the default zone. if tableDesc == nil { return visitDefaultZone(ctx, cfg, visitor), nil diff --git a/pkg/storage/reports/reporter_test.go b/pkg/storage/reports/reporter_test.go index 4825f4977889..0d0ff8a5b8a2 100644 --- a/pkg/storage/reports/reporter_test.go +++ b/pkg/storage/reports/reporter_test.go @@ -31,6 +31,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/keysutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/util/encoding" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" @@ -508,7 +509,10 @@ func processTestCase( // And we're going to use keys in user space, otherwise there's special cases // in the zone config lookup that we bump into. objectCounter := keys.MinUserDescID - var sysCfgBuilder systemConfigBuilder + sysCfgBuilder := systemConfigBuilder{ + // Use a non-zero timestamp for descriptor creation/modification time. 
+ ts: hlc.Timestamp{WallTime: 1}, + } sysCfgBuilder.setDefaultZoneConfig(tc.defaultZone.toZoneConfig()) objects["default"] = MakeZoneKey(keys.RootNamespaceID, NoSubzone) // Assign ids to databases, table, indexes; create descriptors and populate @@ -726,6 +730,9 @@ func addIndexSubzones( type systemConfigBuilder struct { kv []roachpb.KeyValue defaultZoneConfig *config.ZoneConfig + + // ts is used for the creation time of synthesized descriptors + ts hlc.Timestamp } func (b *systemConfigBuilder) setDefaultZoneConfig(cfg config.ZoneConfig) { @@ -758,35 +765,36 @@ func (b *systemConfigBuilder) build() *config.SystemConfig { } // addTableDesc adds a table descriptor to the SystemConfig. -func (b *systemConfigBuilder) addTableDesc(id int, desc sqlbase.TableDescriptor) { - if desc.ParentID == 0 { - panic(fmt.Sprintf("parent not set for table %q", desc.Name)) +func (b *systemConfigBuilder) addTableDesc(id int, tableDesc sqlbase.TableDescriptor) { + if tableDesc.ParentID == 0 { + panic(fmt.Sprintf("parent not set for table %q", tableDesc.Name)) } // Write the table to the SystemConfig, in the descriptors table. k := sqlbase.MakeDescMetadataKey(sqlbase.ID(id)) - dbDesc := &sqlbase.Descriptor{ + desc := &sqlbase.Descriptor{ Union: &sqlbase.Descriptor_Table{ - Table: &desc, + Table: &tableDesc, }, } + desc.Table(b.ts) var v roachpb.Value - if err := v.SetProto(dbDesc); err != nil { + if err := v.SetProto(desc); err != nil { panic(err) } b.kv = append(b.kv, roachpb.KeyValue{Key: k, Value: v}) } // addTableDesc adds a database descriptor to the SystemConfig. -func (b *systemConfigBuilder) addDBDesc(id int, desc sqlbase.DatabaseDescriptor) { +func (b *systemConfigBuilder) addDBDesc(id int, dbDesc sqlbase.DatabaseDescriptor) { // Write the table to the SystemConfig, in the descriptors table. 
k := sqlbase.MakeDescMetadataKey(sqlbase.ID(id)) - dbDesc := &sqlbase.Descriptor{ + desc := &sqlbase.Descriptor{ Union: &sqlbase.Descriptor_Database{ - Database: &desc, + Database: &dbDesc, }, } var v roachpb.Value - if err := v.SetProto(dbDesc); err != nil { + if err := v.SetProto(desc); err != nil { panic(err) } b.kv = append(b.kv, roachpb.KeyValue{Key: k, Value: v})