44489: sql: bugfixes around writing the old primary key in pk changes r=rohany a=rohany

This PR fixes two bugs:
* The logic for deciding when to rewrite the old primary key was broken,
  resulting in the old primary key not being rewritten in many cases.
* The copy of the old primary key being created did not properly handle
  its dangling interleave information.

This PR makes the design decision not to re-interleave the copy of the old primary index if it was interleaved.
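
In sketch form, the resulting approach (this mirrors the alter_table.go hunk later in this commit; `n.tableDesc`, `newPrimaryIndexDesc`, and the helpers are the names used there):

    // Preserve the old primary key as a unique secondary index unless it is
    // the default hidden rowid key, and drop its interleave information
    // instead of re-interleaving the copy.
    if !n.tableDesc.IsPrimaryIndexDefaultRowID() {
        oldCopy := protoutil.Clone(&n.tableDesc.PrimaryIndex).(*sqlbase.IndexDescriptor)
        oldCopy.Name = "" // AllocateIDs generates a name such as t_x_key
        oldCopy.StoreColumnIDs = nil
        oldCopy.StoreColumnNames = nil
        oldCopy.Interleave = sqlbase.InterleaveDescriptor{}
        if err := addIndexMutationWithSpecificPrimaryKey(n.tableDesc, oldCopy, newPrimaryIndexDesc); err != nil {
            return err
        }
    }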

Release note: None

44551: jobs: always set start time of a job r=spaskob a=spaskob

When starting a job via CreateAndStartJob, if marking
the job as started fails, the job will still be
in system.jobs and can be adopted by another
node later, but its start time will be 0 in that
case. We add a check and set it if necessary.
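
A minimal sketch of the guard this adds (names mirror the jobs.go hunk below; `md` is the job metadata visible during adoption):

    // If the job was created but never successfully marked as started (for
    // example, the node died right after CreateAndStartJob), backfill the
    // start time when another node adopts the job.
    if md.Payload.StartedMicros == 0 {
        md.Payload.StartedMicros = timeutil.ToUnixMicros(j.registry.clock.Now().GoTime())
    }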

Release note: none.

44553: engine: redefine targetSize and add maxSize to ExportToSst r=itsbilal a=ajwerner

This PR is a follow-up to work from #44440, motivated by problems unearthed while writing #44482. The PR comes in two commits:

1) Redefine `targetSize` from a target below which most requests would remain to the size at or above which the export stops.
2) Add a `maxSize` parameter above which the `ExportToSst` call will fail.

See the individual commits for more details. 
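
As an illustration of the revised contract, here is a minimal sketch (not part of this commit) of a caller paginating an export. The signature matches the Go `engine.Reader` change below, while the helper name, key range handling, and size constants are assumptions for the example:

    import (
        "github.com/cockroachdb/cockroach/pkg/roachpb"
        "github.com/cockroachdb/cockroach/pkg/storage/engine"
        "github.com/cockroachdb/cockroach/pkg/util/hlc"
    )

    // exportAll pages through a key range. Each returned SST stops at the
    // first new key once targetSize has been met; a non-nil resume key means
    // more data remains. If the versions of the last key would push an SST
    // past maxSize, ExportToSst fails instead of returning an oversized file.
    func exportAll(r engine.Reader, start, end roachpb.Key) ([][]byte, error) {
        var ssts [][]byte
        for start != nil {
            sst, _, resume, err := r.ExportToSst(
                start, end,
                hlc.Timestamp{}, hlc.MaxTimestamp, // full time interval
                true,  // exportAllRevisions
                1<<20, // targetSize: stop adding new keys after ~1 MiB
                8<<20, // maxSize: hard cap; exceeding it is an error
                engine.IterOptions{UpperBound: end},
            )
            if err != nil {
                return nil, err
            }
            ssts = append(ssts, sst)
            start = resume
        }
        return ssts, nil
    }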

Co-authored-by: Rohan Yadav <[email protected]>
Co-authored-by: Spas Bojanov <[email protected]>
Co-authored-by: Andrew Werner <[email protected]>
4 people committed Jan 30, 2020
4 parents 6bf3a7b + fc51339 + bd90489 + 8429a42 commit dfe05bb
Showing 15 changed files with 153 additions and 63 deletions.
12 changes: 8 additions & 4 deletions c-deps/libroach/db.cc
@@ -1075,7 +1075,8 @@ DBStatus DBUnlockFile(DBFileLock lock) {
return ToDBStatus(rocksdb::Env::Default()->UnlockFile((rocksdb::FileLock*)lock));
}

DBStatus DBExportToSst(DBKey start, DBKey end, bool export_all_revisions, uint64_t target_size,
DBStatus DBExportToSst(DBKey start, DBKey end, bool export_all_revisions,
uint64_t target_size, uint64_t max_size,
DBIterOptions iter_opts, DBEngine* engine, DBString* data,
DBString* write_intent, DBString* summary, DBString* resume) {
DBSstFileWriter* writer = DBSstFileWriterNew();
@@ -1136,9 +1137,8 @@ DBStatus DBExportToSst(DBKey start, DBKey end, bool export_all_revisions, uint64
// Check to see if this is the first version of key and adding it would
// put us over the limit (we might already be over the limit).
const int64_t cur_size = bulkop_summary.data_size();
const int64_t new_size = cur_size + decoded_key.size() + iter.value().size();
const bool is_over_target = cur_size > 0 && new_size > target_size;
if (paginated && is_new_key && is_over_target) {
const bool reached_target_size = cur_size > 0 && cur_size >= target_size;
if (paginated && is_new_key && reached_target_size) {
resume_key.reserve(decoded_key.size());
resume_key.assign(decoded_key.data(), decoded_key.size());
break;
@@ -1154,6 +1154,10 @@ DBStatus DBExportToSst(DBKey start, DBKey end, bool export_all_revisions, uint64
if (!row_counter.Count((iter.key()), &bulkop_summary)) {
return ToDBString("Error in row counter");
}
const int64_t new_size = cur_size + decoded_key.size() + iter.value().size();
if (max_size > 0 && new_size > max_size) {
return FmtStatus("export size (%ld bytes) exceeds max size (%ld bytes)", new_size, max_size);
}
bulkop_summary.set_data_size(new_size);
}
*summary = ToDBString(bulkop_summary.SerializeAsString());
19 changes: 11 additions & 8 deletions c-deps/libroach/include/libroach.h
@@ -555,15 +555,18 @@ DBStatus DBUnlockFile(DBFileLock lock);

// DBExportToSst exports changes over the keyrange and time interval between the
// start and end DBKeys to an SSTable using an IncrementalIterator.
//
// If target_size is positive, it indicates that the export should produce SSTs
// which are roughly target size. Specifically, it will produce SSTs which contain
// all relevant versions of a key and will not add the first version of a new
// key if it would lead to the SST exceeding the target_size. If export_all_revisions
// is false, the returned SST will be smaller than target_size so long as the first
// kv pair is smaller than target_size. If export_all_revisions is true then
// target_size may be exceeded. If the SST construction stops due to the target_size,
// then resume will be set to the value of the resume key.
DBStatus DBExportToSst(DBKey start, DBKey end, bool export_all_revisions, uint64_t target_size,
// which are roughly target size. Specifically, it will return an SST such that
// the last key is responsible for exceeding the targetSize. If the resume_key
// is non-NULL then the returned sst will exceed the targetSize.
//
// If max_size is positive, it is an absolute maximum on byte size for the
// returned sst. If it is the case that the versions of the last key will lead
// to an SST that exceeds maxSize, an error will be returned. This parameter
// exists to prevent creating SSTs which are too large to be used.
DBStatus DBExportToSst(DBKey start, DBKey end, bool export_all_revisions,
uint64_t target_size, uint64_t max_size,
DBIterOptions iter_opts, DBEngine* engine, DBString* data,
DBString* write_intent, DBString* summary, DBString* resume);

4 changes: 3 additions & 1 deletion pkg/ccl/storageccl/export.go
@@ -135,7 +135,9 @@ func evalExport(
// target size for files and then paginate the actual export. The external
// API may need to be modified to deal with the case where ReturnSST is true.
const targetSize = 0 // unlimited
data, summary, resume, err := e.ExportToSst(args.Key, args.EndKey, args.StartTime, h.Timestamp, exportAllRevisions, targetSize, io)
const maxSize = 0 // unlimited
data, summary, resume, err := e.ExportToSst(args.Key, args.EndKey,
args.StartTime, h.Timestamp, exportAllRevisions, targetSize, maxSize, io)
if resume != nil {
return result.Result{}, crdberrors.AssertionFailedf("expected nil resume key with unlimited target size")
}
40 changes: 39 additions & 1 deletion pkg/ccl/storageccl/export_test.go
@@ -333,9 +333,47 @@ func assertEqualKVs(
var kvs []engine.MVCCKeyValue
for start := startKey; start != nil; {
var sst []byte
sst, _, start, err = e.ExportToSst(start, endKey, startTime, endTime, exportAllRevisions, targetSize, io)
var summary roachpb.BulkOpSummary
maxSize := uint64(0)
prevStart := start
sst, summary, start, err = e.ExportToSst(start, endKey, startTime, endTime,
exportAllRevisions, targetSize, maxSize, io)
require.NoError(t, err)
loaded := loadSST(t, sst, startKey, endKey)
// Ensure that the pagination worked properly.
if start != nil {
dataSize := uint64(summary.DataSize)
require.Truef(t, targetSize <= dataSize, "%d > %d",
targetSize, summary.DataSize)
// Now ensure that if we remove the bytes attributable to the last
// key, we are below the target size.
firstKVofLastKey := sort.Search(len(loaded), func(i int) bool {
return loaded[i].Key.Key.Equal(loaded[len(loaded)-1].Key.Key)
})
dataSizeWithoutLastKey := dataSize
for _, kv := range loaded[firstKVofLastKey:] {
dataSizeWithoutLastKey -= uint64(len(kv.Key.Key) + len(kv.Value))
}
require.Truef(t, targetSize > dataSizeWithoutLastKey, "%d <= %d", targetSize, dataSizeWithoutLastKey)
// Ensure that maxSize leads to an error if exceeded.
// Note that this uses a relatively nonsensical value of maxSize which
// is equal to the targetSize.
maxSize = targetSize
dataSizeWhenExceeded := dataSize
for i := len(loaded) - 1; i >= 0; i-- {
kv := loaded[i]
lessThisKey := dataSizeWhenExceeded - uint64(len(kv.Key.Key)+len(kv.Value))
if lessThisKey >= maxSize {
dataSizeWhenExceeded = lessThisKey
} else {
break
}
}
_, _, _, err = e.ExportToSst(prevStart, endKey, startTime, endTime,
exportAllRevisions, targetSize, maxSize, io)
require.Regexp(t, fmt.Sprintf("export size \\(%d bytes\\) exceeds max size \\(%d bytes\\)",
dataSizeWhenExceeded, maxSize), err)
}
kvs = append(kvs, loaded...)
}

3 changes: 3 additions & 0 deletions pkg/jobs/jobs.go
@@ -575,6 +575,9 @@ func (j *Job) adopt(ctx context.Context, oldLease *jobspb.Lease) error {
md.Payload.Lease, oldLease)
}
md.Payload.Lease = j.registry.newLease()
if md.Payload.StartedMicros == 0 {
md.Payload.StartedMicros = timeutil.ToUnixMicros(j.registry.clock.Now().GoTime())
}
ju.UpdatePayload(md.Payload)
// Jobs in states running or pending are adopted as running.
newStatus := StatusRunning
14 changes: 6 additions & 8 deletions pkg/sql/alter_table.go
@@ -397,17 +397,15 @@ func (n *alterTableNode) startExec(params runParams) error {

// TODO (rohany): gate this behind a flag so it doesn't happen all the time.
// Create a new index that indexes everything the old primary index does, but doesn't store anything.
// TODO (rohany): is there an easier way of checking if the existing primary index was the
// automatically created one?
if len(n.tableDesc.PrimaryIndex.ColumnNames) == 1 && n.tableDesc.PrimaryIndex.ColumnNames[0] != "rowid" {
if !n.tableDesc.IsPrimaryIndexDefaultRowID() {
oldPrimaryIndexCopy := protoutil.Clone(&n.tableDesc.PrimaryIndex).(*sqlbase.IndexDescriptor)
name := generateUniqueConstraintName(
"old_primary_key",
nameExists,
)
oldPrimaryIndexCopy.Name = name
// Clear the name of the index so that it gets generated by AllocateIDs.
oldPrimaryIndexCopy.Name = ""
oldPrimaryIndexCopy.StoreColumnIDs = nil
oldPrimaryIndexCopy.StoreColumnNames = nil
// Make the copy of the old primary index not-interleaved. This decision
// can be revisited based on user experience.
oldPrimaryIndexCopy.Interleave = sqlbase.InterleaveDescriptor{}
if err := addIndexMutationWithSpecificPrimaryKey(n.tableDesc, oldPrimaryIndexCopy, newPrimaryIndexDesc); err != nil {
return err
}
21 changes: 20 additions & 1 deletion pkg/sql/logictest/testdata/logic_test/alter_primary_key
@@ -97,7 +97,7 @@ child CREATE TABLE child (
y INT8 NOT NULL,
z INT8 NOT NULL,
CONSTRAINT "primary" PRIMARY KEY (x ASC, y ASC, z ASC),
UNIQUE INDEX old_primary_key (x ASC),
UNIQUE INDEX child_x_key (x ASC),
FAMILY fam_0_x_y_z (x, y, z)
) INTERLEAVE IN PARENT parent (x, y)

@@ -151,6 +151,7 @@ child CREATE TABLE child (
y INT8 NOT NULL,
z INT8 NOT NULL,
CONSTRAINT "primary" PRIMARY KEY (y ASC, z ASC),
UNIQUE INDEX child_x_y_z_key (x ASC, y ASC, z ASC),
FAMILY fam_0_x_y_z (x, y, z)
)

@@ -194,6 +195,7 @@ child CREATE TABLE child (
z INT8 NOT NULL,
w INT8 NULL,
CONSTRAINT "primary" PRIMARY KEY (x ASC, y ASC, z ASC),
UNIQUE INDEX child_x_y_key (x ASC, y ASC),
INDEX i (x ASC, w ASC) INTERLEAVE IN PARENT parent (x),
FAMILY fam_0_x_y_z_w (x, y, z, w)
) INTERLEAVE IN PARENT parent (x)
@@ -281,3 +283,20 @@ INSERT INTO t1 VALUES (100, 100, 100, 100)

statement error insert on table "t4" violates foreign key constraint "fk3"
INSERT INTO t4 VALUES (101)

# Ensure that we still rewrite a primary index if the index column has name "rowid".
statement ok
DROP TABLE IF EXISTS t;
CREATE TABLE t (rowid INT PRIMARY KEY, y INT NOT NULL, FAMILY (rowid, y));
ALTER TABLE t ALTER PRIMARY KEY USING COLUMNS (y)

query TT
SHOW CREATE t
----
t CREATE TABLE t (
rowid INT8 NOT NULL,
y INT8 NOT NULL,
CONSTRAINT "primary" PRIMARY KEY (y ASC),
UNIQUE INDEX t_rowid_key (rowid ASC),
FAMILY fam_0_rowid_y (rowid, y)
)
2 changes: 1 addition & 1 deletion pkg/sql/schema_changer_test.go
@@ -2322,7 +2322,7 @@ INSERT INTO t.test VALUES (1, 1, 1), (2, 2, 2), (3, 3, 3);
y INT8 NOT NULL,
z INT8 NULL,
CONSTRAINT "primary" PRIMARY KEY (y ASC),
UNIQUE INDEX old_primary_key (x ASC),
UNIQUE INDEX test_x_key (x ASC),
INDEX i (z ASC),
FAMILY "primary" (x, y, z)
)`
14 changes: 14 additions & 0 deletions pkg/sql/sqlbase/structured.go
@@ -2808,6 +2808,20 @@ func (desc *TableDescriptor) IsInterleaved() bool {
return false
}

// IsPrimaryIndexDefaultRowID returns whether or not the table's primary
// index is the default primary key on the hidden rowid column.
func (desc *TableDescriptor) IsPrimaryIndexDefaultRowID() bool {
if len(desc.PrimaryIndex.ColumnIDs) != 1 {
return false
}
col, err := desc.FindColumnByID(desc.PrimaryIndex.ColumnIDs[0])
if err != nil {
// This should never happen.
panic(err)
}
return col.Hidden && col.Name == "rowid"
}

// MakeMutationComplete updates the descriptor upon completion of a mutation.
// There are three Validity types for the mutations:
// Validated - The constraint has already been added and validated, should
21 changes: 12 additions & 9 deletions pkg/storage/engine/engine.go
@@ -200,18 +200,21 @@ type Reader interface {
// within the interval is exported. Deletions are included if all revisions are
// requested or if the start.Timestamp is non-zero. Returns the bytes of an
// SSTable containing the exported keys, the size of exported data, or an error.
//
// If targetSize is positive, it indicates that the export should produce SSTs
// which are roughly target size. Specifically, it will produce SSTs which contain
// all relevant versions of a key and will not add the first version of a new
// key if it would lead to the SST exceeding the targetSize. If exportAllRevisions
// is false, the returned SST will be smaller than target_size so long as the first
// kv pair is smaller than targetSize. If exportAllRevisions is true then
// targetSize may be exceeded by as much as the size of all of the versions of
// the last key. If the SST construction stops due to the targetSize,
// then a non-nil resumeKey will be returned.
// which are roughly target size. Specifically, it will return an SST such that
// the last key is responsible for meeting or exceeding the targetSize. If the
// resumeKey is non-nil then the data size of the returned sst will be greater
// than or equal to the targetSize.
//
// If maxSize is positive, it is an absolute maximum on byte size for the
// returned sst. If it is the case that the versions of the last key will lead
// to an SST that exceeds maxSize, an error will be returned. This parameter
// exists to prevent creating SSTs which are too large to be used.
ExportToSst(
startKey, endKey roachpb.Key, startTS, endTS hlc.Timestamp,
exportAllRevisions bool, targetSize uint64, io IterOptions,
exportAllRevisions bool, targetSize uint64, maxSize uint64,
io IterOptions,
) (sst []byte, _ roachpb.BulkOpSummary, resumeKey roachpb.Key, _ error)
// Get returns the value for the given key, nil otherwise.
//
24 changes: 14 additions & 10 deletions pkg/storage/engine/pebble.go
@@ -492,10 +492,10 @@ func (p *Pebble) ExportToSst(
startKey, endKey roachpb.Key,
startTS, endTS hlc.Timestamp,
exportAllRevisions bool,
targetSize uint64,
targetSize, maxSize uint64,
io IterOptions,
) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) {
return pebbleExportToSst(p, startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, io)
return pebbleExportToSst(p, startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, maxSize, io)
}

// Get implements the Engine interface.
@@ -939,10 +939,10 @@ func (p *pebbleReadOnly) ExportToSst(
startKey, endKey roachpb.Key,
startTS, endTS hlc.Timestamp,
exportAllRevisions bool,
targetSize uint64,
targetSize, maxSize uint64,
io IterOptions,
) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) {
return pebbleExportToSst(p, startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, io)
return pebbleExportToSst(p, startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, maxSize, io)
}

func (p *pebbleReadOnly) Get(key MVCCKey) ([]byte, error) {
@@ -1061,10 +1061,10 @@ func (p *pebbleSnapshot) ExportToSst(
startKey, endKey roachpb.Key,
startTS, endTS hlc.Timestamp,
exportAllRevisions bool,
targetSize uint64,
targetSize, maxSize uint64,
io IterOptions,
) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) {
return pebbleExportToSst(p, startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, io)
return pebbleExportToSst(p, startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, maxSize, io)
}

// Get implements the Reader interface.
@@ -1116,7 +1116,7 @@ func pebbleExportToSst(
startKey, endKey roachpb.Key,
startTS, endTS hlc.Timestamp,
exportAllRevisions bool,
targetSize uint64,
targetSize, maxSize uint64,
io IterOptions,
) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) {
sstFile := &MemFile{}
@@ -1163,16 +1163,20 @@ func pebbleExportToSst(
return nil, roachpb.BulkOpSummary{}, nil, errors.Wrapf(err, "decoding %s", unsafeKey)
}
curSize := rows.BulkOpSummary.DataSize
newSize := curSize + int64(len(unsafeKey.Key)+len(unsafeValue))
isOverTarget := paginated && curSize > 0 && uint64(newSize) > targetSize
if isNewKey && isOverTarget {
reachedTargetSize := curSize > 0 && uint64(curSize) >= targetSize
if paginated && isNewKey && reachedTargetSize {
// Allocate the right size for resumeKey rather than using curKey.
resumeKey = append(make(roachpb.Key, 0, len(unsafeKey.Key)), unsafeKey.Key...)
break
}
if err := sstWriter.Put(unsafeKey, unsafeValue); err != nil {
return nil, roachpb.BulkOpSummary{}, nil, errors.Wrapf(err, "adding key %s", unsafeKey)
}
newSize := curSize + int64(len(unsafeKey.Key)+len(unsafeValue))
if maxSize > 0 && newSize > int64(maxSize) {
return nil, roachpb.BulkOpSummary{}, nil,
errors.Errorf("export size (%d bytes) exceeds max size (%d bytes)", newSize, maxSize)
}
rows.BulkOpSummary.DataSize = newSize
}

2 changes: 1 addition & 1 deletion pkg/storage/engine/pebble_batch.go
@@ -93,7 +93,7 @@ func (p *pebbleBatch) ExportToSst(
startKey, endKey roachpb.Key,
startTS, endTS hlc.Timestamp,
exportAllRevisions bool,
targetSize uint64,
targetSize, maxSize uint64,
io IterOptions,
) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) {
panic("unimplemented")
18 changes: 10 additions & 8 deletions pkg/storage/engine/rocksdb.go
@@ -780,7 +780,7 @@ func (r *RocksDB) ExportToSst(
startKey, endKey roachpb.Key,
startTS, endTS hlc.Timestamp,
exportAllRevisions bool,
targetSize uint64,
targetSize, maxSize uint64,
io IterOptions,
) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) {
start := MVCCKey{Key: startKey, Timestamp: startTS}
@@ -791,8 +791,10 @@ func (r *RocksDB) ExportToSst(
var bulkopSummary C.DBString
var resumeKey C.DBString

err := statusToError(C.DBExportToSst(goToCKey(start), goToCKey(end), C.bool(exportAllRevisions),
C.uint64_t(targetSize), goToCIterOptions(io), r.rdb, &data, &intentErr, &bulkopSummary, &resumeKey))
err := statusToError(C.DBExportToSst(goToCKey(start), goToCKey(end),
C.bool(exportAllRevisions),
C.uint64_t(targetSize), C.uint64_t(maxSize),
goToCIterOptions(io), r.rdb, &data, &intentErr, &bulkopSummary, &resumeKey))

if err != nil {
if err.Error() == "WriteIntentError" {
@@ -1005,10 +1007,10 @@ func (r *rocksDBReadOnly) ExportToSst(
startKey, endKey roachpb.Key,
startTS, endTS hlc.Timestamp,
exportAllRevisions bool,
targetSize uint64,
targetSize, maxSize uint64,
io IterOptions,
) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) {
return r.parent.ExportToSst(startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, io)
return r.parent.ExportToSst(startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, maxSize, io)
}

func (r *rocksDBReadOnly) Get(key MVCCKey) ([]byte, error) {
@@ -1326,10 +1328,10 @@ func (r *rocksDBSnapshot) ExportToSst(
startKey, endKey roachpb.Key,
startTS, endTS hlc.Timestamp,
exportAllRevisions bool,
targetSize uint64,
targetSize, maxSize uint64,
io IterOptions,
) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) {
return r.parent.ExportToSst(startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, io)
return r.parent.ExportToSst(startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, maxSize, io)
}

// Get returns the value for the given key, nil otherwise using
@@ -1736,7 +1738,7 @@ func (r *rocksDBBatch) ExportToSst(
startKey, endKey roachpb.Key,
startTS, endTS hlc.Timestamp,
exportAllRevisions bool,
targetSize uint64,
targetSize, maxSize uint64,
io IterOptions,
) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) {
panic("unimplemented")