diff --git a/pkg/sql/backfill.go b/pkg/sql/backfill.go
index f0b5e45765d5..0fd374223761 100644
--- a/pkg/sql/backfill.go
+++ b/pkg/sql/backfill.go
@@ -975,7 +975,7 @@ func (sc *SchemaChanger) distIndexBackfill(
 		true /* distribute */)
 	indexBatchSize := indexBackfillBatchSize.Get(&sc.execCfg.Settings.SV)
 	chunkSize := sc.getChunkSize(indexBatchSize)
-	spec, err := initIndexBackfillerSpec(*tableDesc.TableDesc(), writeAsOf, readAsOf, chunkSize, addedIndexes)
+	spec, err := initIndexBackfillerSpec(*tableDesc.TableDesc(), writeAsOf, readAsOf, false /* writeAtRequestTimestamp */, chunkSize, addedIndexes)
 	if err != nil {
 		return err
 	}
diff --git a/pkg/sql/distsql_plan_backfill.go b/pkg/sql/distsql_plan_backfill.go
index fe75a1297edb..6f559ef4fd99 100644
--- a/pkg/sql/distsql_plan_backfill.go
+++ b/pkg/sql/distsql_plan_backfill.go
@@ -36,16 +36,18 @@ func initColumnBackfillerSpec(
 func initIndexBackfillerSpec(
 	desc descpb.TableDescriptor,
 	writeAsOf, readAsOf hlc.Timestamp,
+	writeAtRequestTimestamp bool,
 	chunkSize int64,
 	indexesToBackfill []descpb.IndexID,
 ) (execinfrapb.BackfillerSpec, error) {
 	return execinfrapb.BackfillerSpec{
-		Table:             desc,
-		WriteAsOf:         writeAsOf,
-		ReadAsOf:          readAsOf,
-		Type:              execinfrapb.BackfillerSpec_Index,
-		ChunkSize:         chunkSize,
-		IndexesToBackfill: indexesToBackfill,
+		Table:                   desc,
+		WriteAsOf:               writeAsOf,
+		WriteAtRequestTimestamp: writeAtRequestTimestamp,
+		ReadAsOf:                readAsOf,
+		Type:                    execinfrapb.BackfillerSpec_Index,
+		ChunkSize:               chunkSize,
+		IndexesToBackfill:       indexesToBackfill,
 	}, nil
 }
 
diff --git a/pkg/sql/execinfrapb/processors_bulk_io.proto b/pkg/sql/execinfrapb/processors_bulk_io.proto
index e9a8632c9029..1001f6f0d5bc 100644
--- a/pkg/sql/execinfrapb/processors_bulk_io.proto
+++ b/pkg/sql/execinfrapb/processors_bulk_io.proto
@@ -74,6 +74,15 @@ message BackfillerSpec {
   reserved 6;
 
   optional int32 initial_splits = 11 [(gogoproto.nullable) = false];
+
+  // WriteAtRequestTimestamp controls the corresponding AddSSTable request
+  // option which updates all MVCC timestamps in the SST to the request
+  // timestamp, even if the request gets pushed. This ensures the writes
+  // comply with the timestamp cache and closed timestamp.
+  //
+  // Note that older nodes do not respect this flag so callers should
+  // check MVCCAddSSTable before setting this option.
+  optional bool write_at_request_timestamp = 12 [(gogoproto.nullable) = false];
 }
 
 // JobProgress identifies the job to report progress on. This reporting
@@ -161,10 +170,10 @@ message ReadImportDataSpec {
 
 message StreamIngestionDataSpec {
   reserved 1;
-  
+
   // StreamID is the ID of the stream (which is shared across the producer and consumer).
   optional uint64 stream_id = 5 [(gogoproto.nullable) = false, (gogoproto.customname) = "StreamID"];
-  
+
   // PartitionSpecs specify how to subscribe to the i'th partition.
   repeated string partition_ids = 6;
   // PartitionSpecs specify how to subscribe to the i'th partition.
@@ -172,14 +181,14 @@ message StreamIngestionDataSpec {
   // PartitionAddresses locate the partitions that produce events to be
   // ingested. We don't set the casttype to avoid depending on ccl packages.
   repeated string partition_addresses = 8;
-  
+
   // The processor will ingest events from StartTime onwards.
   optional util.hlc.Timestamp start_time = 2 [(gogoproto.nullable) = false];
   // StreamAddress locate the stream so that a stream client can be initialized.
   optional string stream_address = 3 [(gogoproto.nullable) = false];
   // JobID is the job ID of the stream ingestion job.
   optional int64 job_id = 4 [(gogoproto.nullable) = false, (gogoproto.customname) = "JobID"];
-  
+
 }
 
 message StreamIngestionFrontierSpec {
diff --git a/pkg/sql/index_backfiller.go b/pkg/sql/index_backfiller.go
index f2a1ffdfaa4e..ec90367fc2de 100644
--- a/pkg/sql/index_backfiller.go
+++ b/pkg/sql/index_backfiller.go
@@ -173,7 +173,7 @@ func (ib *IndexBackfillPlanner) plan(
 	// TODO(ajwerner): Adopt util.ConstantWithMetamorphicTestRange for the
 	// batch size. Also plumb in a testing knob.
 	chunkSize := indexBackfillBatchSize.Get(&ib.execCfg.Settings.SV)
-	spec, err := initIndexBackfillerSpec(*td.TableDesc(), writeAsOf, readAsOf, chunkSize, indexesToBackfill)
+	spec, err := initIndexBackfillerSpec(*td.TableDesc(), writeAsOf, readAsOf, false /* writeAtRequestTimestamp */, chunkSize, indexesToBackfill)
 	if err != nil {
 		return err
 	}
diff --git a/pkg/sql/rowexec/indexbackfiller.go b/pkg/sql/rowexec/indexbackfiller.go
index 04c00e19be69..c474b9176e3e 100644
--- a/pkg/sql/rowexec/indexbackfiller.go
+++ b/pkg/sql/rowexec/indexbackfiller.go
@@ -200,6 +200,7 @@ func (ib *indexBackfiller) ingestIndexEntries(
 		SkipDuplicates:           ib.ContainsInvertedIndex(),
 		BatchTimestamp:           ib.spec.ReadAsOf,
 		InitialSplitsIfUnordered: int(ib.spec.InitialSplits),
+		WriteAtRequestTime:       ib.spec.WriteAtRequestTimestamp,
 	}
 	adder, err := ib.flowCtx.Cfg.BulkAdder(ctx, ib.flowCtx.Cfg.DB, ib.spec.WriteAsOf, opts)
 	if err != nil {
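The new proto field's comment notes that older nodes ignore the flag, so callers are expected to check the MVCCAddSSTable cluster version before setting it; both call sites in this diff still pass a hard-coded false. The snippet below is a minimal sketch of such a caller-side gate, not part of the change itself: it assumes the usual CockroachDB version-check pattern (Settings.Version.IsActive with clusterversion.MVCCAddSSTable), and the helper name is hypothetical.

// Sketch only: gate the new BackfillerSpec option on the cluster version so
// the flag is never sent to nodes that would silently ignore it. The helper
// name and the ExecutorConfig plumbing are assumptions for illustration.
func canWriteAtRequestTimestamp(ctx context.Context, execCfg *ExecutorConfig) bool {
	return execCfg.Settings.Version.IsActive(ctx, clusterversion.MVCCAddSSTable)
}

// A caller could then thread the result through initIndexBackfillerSpec in
// place of the hard-coded false used above, e.g.:
//
//	writeAtRequestTimestamp := canWriteAtRequestTimestamp(ctx, sc.execCfg)
//	spec, err := initIndexBackfillerSpec(
//		*tableDesc.TableDesc(), writeAsOf, readAsOf,
//		writeAtRequestTimestamp, chunkSize, addedIndexes,
//	)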