Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: thread WriteAtRequestTimestamp through BackfillerSpec #76064

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -975,7 +975,7 @@ func (sc *SchemaChanger) distIndexBackfill(
true /* distribute */)
indexBatchSize := indexBackfillBatchSize.Get(&sc.execCfg.Settings.SV)
chunkSize := sc.getChunkSize(indexBatchSize)
spec, err := initIndexBackfillerSpec(*tableDesc.TableDesc(), writeAsOf, readAsOf, chunkSize, addedIndexes)
spec, err := initIndexBackfillerSpec(*tableDesc.TableDesc(), writeAsOf, readAsOf, false /* writeAtRequestTimestamp */, chunkSize, addedIndexes)
if err != nil {
return err
}
Expand Down
14 changes: 8 additions & 6 deletions pkg/sql/distsql_plan_backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,18 @@ func initColumnBackfillerSpec(
// initIndexBackfillerSpec constructs the BackfillerSpec used to plan a
// distributed index backfill over the given table.
//
// writeAsOf is the timestamp at which the backfiller writes index entries;
// readAsOf is the timestamp at which it reads the source rows.
// writeAtRequestTimestamp is threaded through to the AddSSTable requests
// issued by the backfill processors (see BackfillerSpec in
// processors_bulk_io.proto for its semantics).
// chunkSize bounds the number of rows processed per batch, and
// indexesToBackfill lists the IDs of the indexes to build.
//
// NOTE(review): the diff scrape had interleaved the old and new field lists
// of this struct literal, which duplicated every key; this is the resolved
// post-change version including WriteAtRequestTimestamp.
func initIndexBackfillerSpec(
	desc descpb.TableDescriptor,
	writeAsOf, readAsOf hlc.Timestamp,
	writeAtRequestTimestamp bool,
	chunkSize int64,
	indexesToBackfill []descpb.IndexID,
) (execinfrapb.BackfillerSpec, error) {
	return execinfrapb.BackfillerSpec{
		Table:                   desc,
		WriteAsOf:               writeAsOf,
		WriteAtRequestTimestamp: writeAtRequestTimestamp,
		ReadAsOf:                readAsOf,
		Type:                    execinfrapb.BackfillerSpec_Index,
		ChunkSize:               chunkSize,
		IndexesToBackfill:       indexesToBackfill,
	}, nil
}

Expand Down
17 changes: 13 additions & 4 deletions pkg/sql/execinfrapb/processors_bulk_io.proto
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,15 @@ message BackfillerSpec {
reserved 6;

optional int32 initial_splits = 11 [(gogoproto.nullable) = false];

// WriteAtRequestTimestamp controls the corresponding AddSSTable request
// option which updates all MVCC timestamps in the SST to the request
// timestamp, even if the request gets pushed. This ensures the writes
// comply with the timestamp cache and closed timestamp.
//
// Note that older nodes do not respect this flag so callers should
// check MVCCAddSSTable before setting this option.
optional bool write_at_request_timestamp = 12 [(gogoproto.nullable) = false];
}

// JobProgress identifies the job to report progress on. This reporting
Expand Down Expand Up @@ -161,25 +170,25 @@ message ReadImportDataSpec {

// StreamIngestionDataSpec configures a processor that ingests events from a
// replication stream's partitions.
message StreamIngestionDataSpec {
reserved 1;

// StreamID is the ID of the stream (which is shared across the producer and consumer).
optional uint64 stream_id = 5 [(gogoproto.nullable) = false, (gogoproto.customname) = "StreamID"];

// PartitionIDs name the partitions of the stream to ingest from.
// NOTE(review): presumably the i'th ID pairs with the i'th entry of
// partition_specs and partition_addresses below — confirm against callers.
repeated string partition_ids = 6;
// PartitionSpecs specify how to subscribe to the i'th partition.
repeated string partition_specs = 7;
// PartitionAddresses locate the partitions that produce events to be
// ingested. We don't set the casttype to avoid depending on ccl packages.
repeated string partition_addresses = 8;

// The processor will ingest events from StartTime onwards.
optional util.hlc.Timestamp start_time = 2 [(gogoproto.nullable) = false];
// StreamAddress locate the stream so that a stream client can be initialized.
optional string stream_address = 3 [(gogoproto.nullable) = false];
// JobID is the job ID of the stream ingestion job.
optional int64 job_id = 4 [(gogoproto.nullable) = false, (gogoproto.customname) = "JobID"];

}

message StreamIngestionFrontierSpec {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/index_backfiller.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (ib *IndexBackfillPlanner) plan(
// TODO(ajwerner): Adopt util.ConstantWithMetamorphicTestRange for the
// batch size. Also plumb in a testing knob.
chunkSize := indexBackfillBatchSize.Get(&ib.execCfg.Settings.SV)
spec, err := initIndexBackfillerSpec(*td.TableDesc(), writeAsOf, readAsOf, chunkSize, indexesToBackfill)
spec, err := initIndexBackfillerSpec(*td.TableDesc(), writeAsOf, readAsOf, false /* writeAtRequestTimestamp */, chunkSize, indexesToBackfill)
if err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/rowexec/indexbackfiller.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ func (ib *indexBackfiller) ingestIndexEntries(
SkipDuplicates: ib.ContainsInvertedIndex(),
BatchTimestamp: ib.spec.ReadAsOf,
InitialSplitsIfUnordered: int(ib.spec.InitialSplits),
WriteAtRequestTime: ib.spec.WriteAtRequestTimestamp,
}
adder, err := ib.flowCtx.Cfg.BulkAdder(ctx, ib.flowCtx.Cfg.DB, ib.spec.WriteAsOf, opts)
if err != nil {
Expand Down