Skip to content

Commit

Permalink
sql: thread WriteAtRequestTimestamp through BackfillerSpec
Browse files Browse the repository at this point in the history
This change was pulled out of the index backfiller PR (cockroachdb#73878). It
threads the new WriteAtRequestTimestamp option supported by AddSSTable
into the bulk processor.

In this PR nothing sets this to true, so it should be a no-op, but it
means one less file to keep rebasing when people steal my integer.

Release note: None
  • Loading branch information
stevendanna committed Feb 4, 2022
1 parent fdf9a8f commit 6b14283
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 12 deletions.
2 changes: 1 addition & 1 deletion pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -975,7 +975,7 @@ func (sc *SchemaChanger) distIndexBackfill(
true /* distribute */)
indexBatchSize := indexBackfillBatchSize.Get(&sc.execCfg.Settings.SV)
chunkSize := sc.getChunkSize(indexBatchSize)
spec, err := initIndexBackfillerSpec(*tableDesc.TableDesc(), writeAsOf, readAsOf, chunkSize, addedIndexes)
spec, err := initIndexBackfillerSpec(*tableDesc.TableDesc(), writeAsOf, readAsOf, false /* writeAtRequestTimestamp */, chunkSize, addedIndexes)
if err != nil {
return err
}
Expand Down
14 changes: 8 additions & 6 deletions pkg/sql/distsql_plan_backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,18 @@ func initColumnBackfillerSpec(
func initIndexBackfillerSpec(
desc descpb.TableDescriptor,
writeAsOf, readAsOf hlc.Timestamp,
writeAtRequestTimestamp bool,
chunkSize int64,
indexesToBackfill []descpb.IndexID,
) (execinfrapb.BackfillerSpec, error) {
return execinfrapb.BackfillerSpec{
Table: desc,
WriteAsOf: writeAsOf,
ReadAsOf: readAsOf,
Type: execinfrapb.BackfillerSpec_Index,
ChunkSize: chunkSize,
IndexesToBackfill: indexesToBackfill,
Table: desc,
WriteAsOf: writeAsOf,
WriteAtRequestTimestamp: writeAtRequestTimestamp,
ReadAsOf: readAsOf,
Type: execinfrapb.BackfillerSpec_Index,
ChunkSize: chunkSize,
IndexesToBackfill: indexesToBackfill,
}, nil
}

Expand Down
17 changes: 13 additions & 4 deletions pkg/sql/execinfrapb/processors_bulk_io.proto
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,15 @@ message BackfillerSpec {
reserved 6;

optional int32 initial_splits = 11 [(gogoproto.nullable) = false];

// WriteAtRequestTimestamp controls the corresponding AddSSTable request
// option which updates all MVCC timestamps in the SST to the request
// timestamp, even if the request gets pushed. This ensures the writes
// comply with the timestamp cache and closed timestamp.
//
// Note that older nodes do not respect this flag so callers should
// check MVCCAddSSTable before setting this option.
optional bool write_at_request_timestamp = 12 [(gogoproto.nullable) = false];
}

// JobProgress identifies the job to report progress on. This reporting
Expand Down Expand Up @@ -161,25 +170,25 @@ message ReadImportDataSpec {

message StreamIngestionDataSpec {
reserved 1;

// StreamID is the ID of the stream (which is shared across the producer and consumer).
optional uint64 stream_id = 5 [(gogoproto.nullable) = false, (gogoproto.customname) = "StreamID"];

// PartitionSpecs specify how to subscribe to the i'th partition.
repeated string partition_ids = 6;
// PartitionSpecs specify how to subscribe to the i'th partition.
repeated string partition_specs = 7;
// PartitionAddresses locate the partitions that produce events to be
// ingested. We don't set the casttype to avoid depending on ccl packages.
repeated string partition_addresses = 8;

// The processor will ingest events from StartTime onwards.
optional util.hlc.Timestamp start_time = 2 [(gogoproto.nullable) = false];
// StreamAddress locate the stream so that a stream client can be initialized.
optional string stream_address = 3 [(gogoproto.nullable) = false];
// JobID is the job ID of the stream ingestion job.
optional int64 job_id = 4 [(gogoproto.nullable) = false, (gogoproto.customname) = "JobID"];

}

message StreamIngestionFrontierSpec {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/index_backfiller.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (ib *IndexBackfillPlanner) plan(
// TODO(ajwerner): Adopt util.ConstantWithMetamorphicTestRange for the
// batch size. Also plumb in a testing knob.
chunkSize := indexBackfillBatchSize.Get(&ib.execCfg.Settings.SV)
spec, err := initIndexBackfillerSpec(*td.TableDesc(), writeAsOf, readAsOf, chunkSize, indexesToBackfill)
spec, err := initIndexBackfillerSpec(*td.TableDesc(), writeAsOf, readAsOf, false /* writeAtRequestTimestamp */, chunkSize, indexesToBackfill)
if err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/rowexec/indexbackfiller.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ func (ib *indexBackfiller) ingestIndexEntries(
SkipDuplicates: ib.ContainsInvertedIndex(),
BatchTimestamp: ib.spec.ReadAsOf,
InitialSplitsIfUnordered: int(ib.spec.InitialSplits),
WriteAtRequestTime: ib.spec.WriteAtRequestTimestamp,
}
adder, err := ib.flowCtx.Cfg.BulkAdder(ctx, ib.flowCtx.Cfg.DB, ib.spec.WriteAsOf, opts)
if err != nil {
Expand Down

0 comments on commit 6b14283

Please sign in to comment.