Skip to content

Commit

Permalink
sql,sqlbase,client: bootstrap TableDescriptor timestamps from MVCC
Browse files Browse the repository at this point in the history
Using the MVCC timestamp of the value for table descriptors has long been
theorized as the right mechanism to eliminate the need for transactions
which update a table descriptor version to observe their commit timestamp
(see cockroachdb#17698 (comment)).

The challenge was presumed to be the need to expose MVCC timestamps in our
client library. It turns out we seem to do that already (how did nobody know
that?!).

This commit avoids using the CommitTimestamp by using the MVCC timestamp on
the read path. To make setting this timestamp less of a footgun, we add a
`(*Descriptor).Table(hlc.Timestamp)` method which forces anybody who extracts
a `TableDescriptor` from a `Descriptor` to pass in a timestamp which may be
used to set `ModificationTime` and `CreateAsOfTime`. A linter in the
following commit enforces this usage.

The below SQL would always fail before this change and now passes:

```
CREATE TABLE foo ( k INT PRIMARY KEY );
BEGIN;
DROP TABLE foo;
<wait a while>
COMMIT;
```

Similarly, `TestImportData` now seems to pass under stressrace with a 5s
`kv.closed_timestamp.target_duration`.

Release justification: fixes a release blocker and known customer issue.

References cockroachdb#37083.

Release note: None
  • Loading branch information
ajwerner committed Sep 13, 2019
1 parent f264c33 commit 5cffa36
Show file tree
Hide file tree
Showing 44 changed files with 508 additions and 230 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,6 @@
<tr><td><code>trace.debug.enable</code></td><td>boolean</td><td><code>false</code></td><td>if set, traces for recent requests can be seen in the /debug page</td></tr>
<tr><td><code>trace.lightstep.token</code></td><td>string</td><td><code></code></td><td>if set, traces go to Lightstep using this token</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'); ignored if trace.lightstep.token is set</td></tr>
<tr><td><code>version</code></td><td>custom validation</td><td><code>19.1-9</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>custom validation</td><td><code>19.1-10</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
26 changes: 15 additions & 11 deletions pkg/ccl/backupccl/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func getRelevantDescChanges(
// obviously interesting to our backup.
for _, i := range descs {
interestingIDs[i.GetID()] = struct{}{}
if t := i.GetTable(); t != nil {
if t := i.Table(hlc.Timestamp{}); t != nil {
for j := t.ReplacementOf.ID; j != sqlbase.InvalidID; j = priorIDs[j] {
interestingIDs[j] = struct{}{}
}
Expand All @@ -200,7 +200,7 @@ func getRelevantDescChanges(
return nil, err
}
for _, i := range starting {
if table := i.GetTable(); table != nil {
if table := i.Table(hlc.Timestamp{}); table != nil {
// We need to add to interestingIDs so that if we later see a delete for
// this ID we still know it is interesting to us, even though we will not
// have a parentID at that point (since the delete is a nil desc).
Expand Down Expand Up @@ -231,7 +231,7 @@ func getRelevantDescChanges(
if _, ok := interestingIDs[change.ID]; ok {
interestingChanges = append(interestingChanges, change)
} else if change.Desc != nil {
if table := change.Desc.GetTable(); table != nil {
if table := change.Desc.Table(hlc.Timestamp{}); table != nil {
if _, ok := interestingParents[table.ParentID]; ok {
interestingIDs[table.ID] = struct{}{}
interestingChanges = append(interestingChanges, change)
Expand Down Expand Up @@ -279,7 +279,8 @@ func getAllDescChanges(
return nil, err
}
r.Desc = &desc
if t := desc.GetTable(); t != nil && t.ReplacementOf.ID != sqlbase.InvalidID {
t := desc.Table(rev.Timestamp)
if t != nil && t.ReplacementOf.ID != sqlbase.InvalidID {
priorIDs[t.ID] = t.ReplacementOf.ID
}
}
Expand All @@ -303,6 +304,9 @@ func allSQLDescriptors(ctx context.Context, txn *client.Txn) ([]sqlbase.Descript
return nil, errors.NewAssertionErrorWithWrappedErrf(err,
"%s: unable to unmarshal SQL descriptor", row.Key)
}
if row.Value != nil {
sqlDescs[i].Table(row.Value.Timestamp)
}
}
return sqlDescs, nil
}
Expand Down Expand Up @@ -379,7 +383,7 @@ func spansForAllTableIndexes(
// in them that we didn't already get above e.g. indexes or tables that are
// not in latest because they were dropped during the time window in question.
for _, rev := range revs {
if tbl := rev.Desc.GetTable(); tbl != nil {
if tbl := rev.Desc.Table(hlc.Timestamp{}); tbl != nil {
for _, idx := range tbl.AllNonDropIndexes() {
key := tableAndIndex{tableID: tbl.ID, indexID: idx.ID}
if !added[key] {
Expand Down Expand Up @@ -1016,7 +1020,7 @@ func backupPlanHook(
return err
}
}
if tableDesc := desc.GetTable(); tableDesc != nil {
if tableDesc := desc.Table(hlc.Timestamp{}); tableDesc != nil {
if err := p.CheckPrivilege(ctx, tableDesc, privilege.SELECT); err != nil {
return err
}
Expand Down Expand Up @@ -1083,7 +1087,7 @@ func backupPlanHook(
tablesInPrev := make(map[sqlbase.ID]struct{})
dbsInPrev := make(map[sqlbase.ID]struct{})
for _, d := range prevBackups[len(prevBackups)-1].Descriptors {
if t := d.GetTable(); t != nil {
if t := d.Table(hlc.Timestamp{}); t != nil {
tablesInPrev[t.ID] = struct{}{}
}
}
Expand All @@ -1092,7 +1096,7 @@ func backupPlanHook(
}

for _, d := range targetDescs {
if t := d.GetTable(); t != nil {
if t := d.Table(hlc.Timestamp{}); t != nil {
// If we're trying to use a previous backup for this table, ideally it
// actually contains this table.
if _, ok := tablesInPrev[t.ID]; ok {
Expand Down Expand Up @@ -1504,7 +1508,7 @@ func maybeDowngradeTableDescsInBackupDescriptor(
// Copy Descriptors so we can return a shallow copy without mutating the slice.
copy(backupDescCopy.Descriptors, backupDesc.Descriptors)
for i := range backupDesc.Descriptors {
if tableDesc := backupDesc.Descriptors[i].GetTable(); tableDesc != nil {
if tableDesc := backupDesc.Descriptors[i].Table(hlc.Timestamp{}); tableDesc != nil {
downgraded, newDesc, err := tableDesc.MaybeDowngradeForeignKeyRepresentation(ctx, settings)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1534,7 +1538,7 @@ func maybeUpgradeTableDescsInBackupDescriptors(
// descriptors so that they can be looked up.
for _, backupDesc := range backupDescs {
for _, desc := range backupDesc.Descriptors {
if table := desc.GetTable(); table != nil {
if table := desc.Table(hlc.Timestamp{}); table != nil {
protoGetter.Protos[string(sqlbase.MakeDescMetadataKey(table.ID))] =
sqlbase.WrapDescriptor(protoutil.Clone(table).(*sqlbase.TableDescriptor))
}
Expand All @@ -1544,7 +1548,7 @@ func maybeUpgradeTableDescsInBackupDescriptors(
for i := range backupDescs {
backupDesc := &backupDescs[i]
for j := range backupDesc.Descriptors {
if table := backupDesc.Descriptors[j].GetTable(); table != nil {
if table := backupDesc.Descriptors[j].Table(hlc.Timestamp{}); table != nil {
if _, err := table.MaybeUpgradeForeignKeyRepresentation(ctx, protoGetter, skipFKsWithNoMatchingTable); err != nil {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/ccl/backupccl/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func loadSQLDescsFromBackupsAtTime(

allDescs := make([]sqlbase.Descriptor, 0, len(byID))
for _, desc := range byID {
if t := desc.GetTable(); t != nil {
if t := desc.Table(hlc.Timestamp{}); t != nil {
// A table revision may have been captured before it was in a DB that is
// backed up -- if the DB is missing, filter the table.
if byID[t.ParentID] == nil {
Expand Down Expand Up @@ -212,7 +212,7 @@ func selectTargets(

seenTable := false
for _, desc := range matched.descs {
if desc.GetTable() != nil {
if desc.Table(hlc.Timestamp{}) != nil {
seenTable = true
break
}
Expand Down Expand Up @@ -1534,7 +1534,7 @@ func doRestorePlan(
for _, desc := range sqlDescs {
if dbDesc := desc.GetDatabase(); dbDesc != nil {
databasesByID[dbDesc.ID] = dbDesc
} else if tableDesc := desc.GetTable(); tableDesc != nil {
} else if tableDesc := desc.Table(hlc.Timestamp{}); tableDesc != nil {
tablesByID[tableDesc.ID] = tableDesc
}
}
Expand Down Expand Up @@ -1686,7 +1686,7 @@ func createImportingTables(
var tables []*sqlbase.TableDescriptor
var oldTableIDs []sqlbase.ID
for _, desc := range sqlDescs {
if tableDesc := desc.GetTable(); tableDesc != nil {
if tableDesc := desc.Table(hlc.Timestamp{}); tableDesc != nil {
tables = append(tables, tableDesc)
oldTableIDs = append(oldTableIDs, tableDesc.ID)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/backupccl/show.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
)
Expand Down Expand Up @@ -145,7 +146,7 @@ func backupShowerDefault(ctx context.Context, showSchemas bool) backupShower {
var rows []tree.Datums
var row tree.Datums
for _, descriptor := range desc.Descriptors {
if table := descriptor.GetTable(); table != nil {
if table := descriptor.Table(hlc.Timestamp{}); table != nil {
dbName := descs[table.ParentID]
row = tree.Datums{
tree.NewDString(dbName),
Expand Down
5 changes: 3 additions & 2 deletions pkg/ccl/backupccl/targets.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/pkg/errors"
)

Expand Down Expand Up @@ -121,7 +122,7 @@ func newDescriptorResolver(descs []sqlbase.Descriptor) (*descriptorResolver, err
}
// Now on to the tables.
for _, desc := range descs {
if tbDesc := desc.GetTable(); tbDesc != nil {
if tbDesc := desc.Table(hlc.Timestamp{}); tbDesc != nil {
if tbDesc.Dropped() {
continue
}
Expand Down Expand Up @@ -214,7 +215,7 @@ func descriptorsMatchingTargets(
desc := descI.(sqlbase.Descriptor)

// If the parent database is not requested already, request it now
parentID := desc.GetTable().GetParentID()
parentID := desc.Table(hlc.Timestamp{}).GetParentID()
if _, ok := alreadyRequestedDBs[parentID]; !ok {
parentDesc := resolver.descByID[parentID]
ret.descs = append(ret.descs, parentDesc)
Expand Down
5 changes: 5 additions & 0 deletions pkg/ccl/backupccl/targets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
)

Expand All @@ -34,6 +35,10 @@ func TestDescriptorsMatchingTargets(t *testing.T) {
*sqlbase.WrapDescriptor(&sqlbase.DatabaseDescriptor{ID: 3, Name: "data"}),
*sqlbase.WrapDescriptor(&sqlbase.DatabaseDescriptor{ID: 5, Name: "empty"}),
}
// Set the timestamp on the table descriptors.
for _, d := range descriptors {
d.Table(hlc.Timestamp{WallTime: 1})
}

tests := []struct {
sessionDatabase string
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func changefeedPlanHook(
}
targets := make(jobspb.ChangefeedTargets, len(targetDescs))
for _, desc := range targetDescs {
if tableDesc := desc.GetTable(); tableDesc != nil {
if tableDesc := desc.Table(hlc.Timestamp{}); tableDesc != nil {
targets[tableDesc.ID] = jobspb.ChangefeedTarget{
StatementTimeName: tableDesc.Name,
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/ccl/changefeedccl/table_history.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,8 @@ func fetchTableDescriptorVersions(
} else if !ok {
return nil
}
remaining, _, _, err := sqlbase.DecodeTableIDIndexID(it.UnsafeKey().Key)
k := it.UnsafeKey()
remaining, _, _, err := sqlbase.DecodeTableIDIndexID(k.Key)
if err != nil {
return err
}
Expand All @@ -282,7 +283,7 @@ func fetchTableDescriptorVersions(
if err := value.GetProto(&desc); err != nil {
return err
}
if tableDesc := desc.GetTable(); tableDesc != nil {
if tableDesc := desc.Table(k.Timestamp); tableDesc != nil {
tableDescs = append(tableDescs, tableDesc)
}
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/cliccl/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
"github.com/cockroachdb/cockroach/pkg/cli"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/pkg/errors"
Expand Down Expand Up @@ -96,7 +97,7 @@ func runLoadShow(cmd *cobra.Command, args []string) error {
// in case more fields need to be added to the output.
fmt.Printf("Descriptors:\n")
for _, d := range desc.Descriptors {
if desc := d.GetTable(); desc != nil {
if desc := d.Table(hlc.Timestamp{}); desc != nil {
fmt.Printf(" %d: %s (table)\n", d.GetID(), d.GetName())
}
if desc := d.GetDatabase(); desc != nil {
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/storageccl/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/workload/bank"
)
Expand Down Expand Up @@ -173,7 +174,7 @@ func BenchmarkImport(b *testing.B) {
{
// TODO(dan): The following should probably make it into
// dataccl.Backup somehow.
tableDesc := backup.Desc.Descriptors[len(backup.Desc.Descriptors)-1].GetTable()
tableDesc := backup.Desc.Descriptors[len(backup.Desc.Descriptors)-1].Table(hlc.Timestamp{})
if tableDesc == nil || tableDesc.ParentID == keys.SystemDatabaseID {
b.Fatalf("bad table descriptor: %+v", tableDesc)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/storageccl/key_rewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -63,7 +64,7 @@ func MakeKeyRewriterFromRekeys(rekeys []roachpb.ImportRequest_TableRekey) (*KeyR
if err := protoutil.Unmarshal(rekey.NewDesc, &desc); err != nil {
return nil, errors.Wrapf(err, "unmarshalling rekey descriptor for old table id %d", rekey.OldID)
}
table := desc.GetTable()
table := desc.Table(hlc.Timestamp{})
if table == nil {
return nil, errors.New("expected a table descriptor")
}
Expand Down
8 changes: 6 additions & 2 deletions pkg/ccl/storageccl/key_rewriter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
)
Expand Down Expand Up @@ -140,8 +141,11 @@ func TestKeyRewriter(t *testing.T) {
})
}

func mustMarshalDesc(t *testing.T, desc *sqlbase.TableDescriptor) []byte {
bytes, err := protoutil.Marshal(sqlbase.WrapDescriptor(desc))
func mustMarshalDesc(t *testing.T, tableDesc *sqlbase.TableDescriptor) []byte {
desc := sqlbase.WrapDescriptor(tableDesc)
// Set the timestamp to a non-zero value.
desc.Table(hlc.Timestamp{WallTime: 1})
bytes, err := protoutil.Marshal(desc)
if err != nil {
t.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/utilccl/sampledataccl/bankdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (b *Backup) NextKeyValues(
) ([]engine.MVCCKeyValue, roachpb.Span, error) {
var userTables []*sqlbase.TableDescriptor
for _, d := range b.Desc.Descriptors {
if t := d.GetTable(); t != nil && t.ParentID != keys.SystemDatabaseID {
if t := d.Table(hlc.Timestamp{}); t != nil && t.ParentID != keys.SystemDatabaseID {
userTables = append(userTables, t)
}
}
Expand Down
30 changes: 26 additions & 4 deletions pkg/internal/client/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,15 @@ import (
)

// KeyValue represents a single key/value pair. This is similar to
// roachpb.KeyValue except that the value may be nil.
// roachpb.KeyValue except that the value may be nil. The timestamp
// in the value will be populated with the MVCC timestamp at which this
// value was read if this struct was produced by a GetRequest or
// ScanRequest which uses the KEY_VALUES ScanFormat. Values created from
// a ScanRequest which uses the BATCH_RESPONSE ScanFormat will contain a
// zero Timestamp.
type KeyValue struct {
Key roachpb.Key
Value *roachpb.Value // Timestamp will always be zero
Value *roachpb.Value
}

func (kv *KeyValue) String() string {
Expand Down Expand Up @@ -319,11 +324,28 @@ func (db *DB) Get(ctx context.Context, key interface{}) (KeyValue, error) {
//
// key can be either a byte slice or a string.
func (db *DB) GetProto(ctx context.Context, key interface{}, msg protoutil.Message) error {
_, err := db.GetProtoTs(ctx, key, msg)
return err
}

// GetProtoTs retrieves the value for a key and decodes the result as a proto
// message. It additionally returns the timestamp at which the key was read.
// If the key doesn't exist, the proto will simply be reset and a zero timestamp
// will be returned. A zero timestamp will also be returned if unmarshaling
// fails.
//
// key can be either a byte slice or a string.
func (db *DB) GetProtoTs(
ctx context.Context, key interface{}, msg protoutil.Message,
) (hlc.Timestamp, error) {
r, err := db.Get(ctx, key)
if err != nil {
return err
return hlc.Timestamp{}, err
}
if err := r.ValueProto(msg); err != nil || r.Value == nil {
return hlc.Timestamp{}, err
}
return r.ValueProto(msg)
return r.Value.Timestamp, nil
}

// Put sets the value for a key.
Expand Down
Loading

0 comments on commit 5cffa36

Please sign in to comment.