Skip to content

Commit

Permalink
Merge pull request #17908 from mjibson/csv-timestamp
Browse files Browse the repository at this point in the history
sqlccl: add endtime to CSV backup descriptors
  • Loading branch information
maddyblue authored Aug 25, 2017
2 parents 90840ef + 761b042 commit 2c4f14b
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 12 deletions.
4 changes: 3 additions & 1 deletion pkg/ccl/sqlccl/backup.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion pkg/ccl/sqlccl/backup.proto
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ message BackupDescriptor {
util.hlc.Timestamp end_time = 2 [(gogoproto.nullable) = false];
// Spans contains the spans requested for backup. The keyranges covered by
// `files` may be a subset of this if there were ranges with no changes since
// the last backup.
// the last backup. For all tables in the backup descriptor, these spans must
// completely cover each table's span. For example, if a table with ID 51 were
// being backed up, then the span `/Table/5{1-2}` must be completely covered.
repeated roachpb.Span spans = 3 [(gogoproto.nullable) = false];
repeated File files = 4 [(gogoproto.nullable) = false];
repeated sql.sqlbase.Descriptor descriptors = 5 [(gogoproto.nullable) = false];
Expand Down
19 changes: 9 additions & 10 deletions pkg/ccl/sqlccl/csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ func doLocalCSVTransform(
recordCh := make(chan csvRecord, chanSize)
kvCh := make(chan roachpb.KeyValue, chanSize)
contentCh := make(chan sstContent)
walltime := timeutil.Now().UnixNano()
group.Go(func() error {
defer close(recordCh)
var err error
Expand All @@ -139,12 +140,12 @@ func doLocalCSVTransform(
group.Go(func() error {
defer close(contentCh)
var err error
kvCount, err = writeRocksDB(gCtx, kvCh, rocksdbDest, sstMaxSize, contentCh)
kvCount, err = writeRocksDB(gCtx, kvCh, rocksdbDest, sstMaxSize, contentCh, walltime)
return err
})
group.Go(func() error {
var err error
sstCount, err = makeBackup(gCtx, parentID, tableDesc, dest, contentCh)
sstCount, err = makeBackup(gCtx, parentID, tableDesc, dest, contentCh, walltime)
return err
})
return csvCount, kvCount, sstCount, group.Wait()
Expand Down Expand Up @@ -415,6 +416,7 @@ func writeRocksDB(
rocksdbDir string,
sstMaxSize int64,
contentCh chan<- sstContent,
walltime int64,
) (int64, error) {
const batchMaxSize = 1024 * 50

Expand Down Expand Up @@ -489,7 +491,6 @@ func writeRocksDB(

var kv engine.MVCCKeyValue
var firstKey, lastKey roachpb.Key
ts := timeutil.Now().UnixNano()
var count int64
for it.Seek(engine.MVCCKey{}); ; it.Next() {
if ok, err := it.Valid(); err != nil {
Expand All @@ -510,7 +511,7 @@ func writeRocksDB(
}

kv.Key = it.UnsafeKey()
kv.Key.Timestamp.WallTime = ts
kv.Key.Timestamp.WallTime = walltime
kv.Value = it.UnsafeValue()

if err := sst.Add(kv); err != nil {
Expand Down Expand Up @@ -546,9 +547,11 @@ func makeBackup(
tableDesc *sqlbase.TableDescriptor,
destDir string,
contentCh <-chan sstContent,
walltime int64,
) (int64, error) {
backupDesc := BackupDescriptor{
FormatVersion: BackupFormatInitialVersion,
EndTime: hlc.Timestamp{WallTime: walltime},
}

conf, err := storageccl.ExportStorageConfFromURI(destDir)
Expand Down Expand Up @@ -599,12 +602,7 @@ func finalizeCSVBackup(
}

sort.Sort(backupFileDescriptors(backupDesc.Files))
backupDesc.Spans = []roachpb.Span{
{
Key: backupDesc.Files[0].Span.Key,
EndKey: backupDesc.Files[len(backupDesc.Files)-1].Span.EndKey,
},
}
backupDesc.Spans = []roachpb.Span{tableDesc.TableSpan()}
backupDesc.Descriptors = []sqlbase.Descriptor{
*sqlbase.WrapDescriptor(&sqlbase.DatabaseDescriptor{
Name: csvDatabaseName,
Expand Down Expand Up @@ -840,6 +838,7 @@ func doDistributedCSVTransform(

backupDesc := BackupDescriptor{
FormatVersion: BackupFormatInitialVersion,
EndTime: hlc.Timestamp{WallTime: walltime},
}
n := rows.Len()
for i := 0; i < n; i++ {
Expand Down

0 comments on commit 2c4f14b

Please sign in to comment.