Skip to content

Commit

Permalink
ttl: improve row-level TTL performance using DistSQL
Browse files Browse the repository at this point in the history
fixes cockroachdb#76914

Release note (performance improvement): The row-level TTL job has been modified
to distribute work using DistSQL. This usually results in the leaseholder nodes
managing deleting of the spans they own.
  • Loading branch information
ecwall committed Aug 3, 2022
1 parent 47d34c9 commit 081cce5
Show file tree
Hide file tree
Showing 15 changed files with 519 additions and 215 deletions.
13 changes: 13 additions & 0 deletions pkg/jobs/jobspb/jobs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -984,11 +984,24 @@ message AutoSQLStatsCompactionProgress {
}

message RowLevelTTLDetails {

// TableID is the ID of the table that the TTL job removes records from.
uint32 table_id = 1 [
(gogoproto.customname) = "TableID",
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb.ID"
];

// Cutoff is compared against execinfrapb.TTLSpec.TTLExpr by the
// ttlProcessor to determine what records to delete. Records are deleted
// if TTLExpr <= Cutoff.
google.protobuf.Timestamp cutoff = 2 [(gogoproto.nullable)=false, (gogoproto.stdtime) = true];

// TableVersion is the table descriptor version of the table when the TTLJob
// started.
uint64 table_version = 3 [
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb.DescriptorVersion"
];

}

message RowLevelTTLProgress {
Expand Down
12 changes: 12 additions & 0 deletions pkg/sql/distsql_running.go
Original file line number Diff line number Diff line change
Expand Up @@ -842,6 +842,18 @@ func NewMetadataCallbackWriter(
return &MetadataCallbackWriter{rowResultWriter: rowResultWriter, fn: metaFn}
}

// NewMetadataOnlyMetadataCallbackWriter creates a new MetadataCallbackWriter
// that uses errOnlyResultWriter and only supports receiving
// execinfrapb.ProducerMetadata.
func NewMetadataOnlyMetadataCallbackWriter() *MetadataCallbackWriter {
return NewMetadataCallbackWriter(
&errOnlyResultWriter{},
func(ctx context.Context, meta *execinfrapb.ProducerMetadata) error {
return nil
},
)
}

// errOnlyResultWriter is a rowResultWriter and batchResultWriter that only
// supports receiving an error. All other functions that deal with producing
// results panic.
Expand Down
18 changes: 11 additions & 7 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/schemaexpr"
Expand Down Expand Up @@ -1528,13 +1527,18 @@ type TTLTestingKnobs struct {
// AOSTDuration changes the AOST timestamp duration to add to the
// current time.
AOSTDuration *time.Duration
// ReturnStatsError causes stats errors to be returned instead of logged as warnings.
// RequireMultipleSpanPartitions is a flag to verify that the DistSQL will
// distribute the work across multiple nodes.
RequireMultipleSpanPartitions bool
// ReturnStatsError causes stats errors to be returned instead of logged as
// warnings.
ReturnStatsError bool
// MockTableDescriptorVersionDuringDelete is a version to mock the table descriptor
// as during delete.
MockTableDescriptorVersionDuringDelete *descpb.DescriptorVersion
// PreSelectDeleteStatement runs before the start of the TTL select-delete loop
PreSelectDeleteStatement string
// PreDeleteChangeTableVersion is a flag to change the table descriptor
// during a delete.
PreDeleteChangeTableVersion bool
// PreSelectStatement runs before the start of the TTL select-delete
// loop.
PreSelectStatement string
}

// ModuleTestingKnobs implements the base.ModuleTestingKnobs interface.
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/execinfrapb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ proto_library(
"processors_changefeeds.proto",
"processors_sql.proto",
"processors_table_stats.proto",
"processors_ttl.proto",
],
strip_import_prefix = "/pkg",
visibility = ["//visibility:public"],
Expand Down
10 changes: 10 additions & 0 deletions pkg/sql/execinfrapb/flow_diagram.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,16 @@ func (s *ChangeFrontierSpec) summary() (string, []string) {
return "ChangeFrontier", []string{}
}

// summary implements the diagramCellType interface.
func (s *TTLSpec) summary() (string, []string) {
details := s.RowLevelTTLDetails
return "TTL", []string{
fmt.Sprintf("JobID: %d", s.JobID),
fmt.Sprintf("TableID: %d", details.TableID),
fmt.Sprintf("TableVersion: %d", details.TableVersion),
}
}

type diagramCell struct {
Title string `json:"title"`
Details []string `json:"details"`
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/execinfrapb/processors.proto
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ option go_package = "execinfrapb";
import "sql/execinfrapb/data.proto";
import "sql/execinfrapb/processors_base.proto";
import "sql/execinfrapb/processors_sql.proto";
import "sql/execinfrapb/processors_ttl.proto";
import "sql/execinfrapb/processors_bulk_io.proto";
import "sql/execinfrapb/processors_changefeeds.proto";
import "sql/execinfrapb/processors_table_stats.proto";
Expand Down Expand Up @@ -120,6 +121,7 @@ message ProcessorCoreUnion {
optional StreamIngestionFrontierSpec streamIngestionFrontier = 36;
optional ExportSpec exporter = 37;
optional IndexBackfillMergerSpec indexBackfillMerger = 38;
optional TTLSpec ttl = 39;

reserved 6, 12, 14, 17, 18, 19, 20;
}
Expand Down
85 changes: 85 additions & 0 deletions pkg/sql/execinfrapb/processors_ttl.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
//
// Processor definitions for distributed SQL APIs. See
// docs/RFCS/distributed_sql.md.
// All the concepts here are "physical plan" concepts.

syntax = "proto2";
// Beware! This package name must not be changed, even though it doesn't match
// the Go package name, because it defines the Protobuf message names which
// can't be changed without breaking backward compatibility.
package cockroach.sql.distsqlrun;
option go_package = "execinfrapb";

import "gogoproto/gogo.proto";
import "google/protobuf/timestamp.proto";
import "roachpb/data.proto";
import "jobs/jobspb/jobs.proto";

message TTLSpec {

// JobID of the job that ran the ttlProcessor.
optional int64 job_id = 1 [
(gogoproto.nullable) = false,
(gogoproto.customname) = "JobID",
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/jobs/jobspb.JobID"
];

// RowLevelTTLDetails are the details of the job that ran the ttlProcessor.
optional jobs.jobspb.RowLevelTTLDetails row_level_ttl_details = 2 [
(gogoproto.nullable) = false,
(gogoproto.customname) = "RowLevelTTLDetails"
];

// AOST is the AS OF SYSTEM TIME value the ttlProcessor uses to select records.
optional google.protobuf.Timestamp aost = 3 [
(gogoproto.nullable) = false,
(gogoproto.customname) = "AOST",
(gogoproto.stdtime) = true
];

// TTLExpr is compared against jobspb.RowLevelTTLDetails.Cutoff by the
// ttlProcessor to determine what records to delete. Records are deleted
// if TTLExpr <= Cutoff.
optional string ttl_expr = 4 [
(gogoproto.nullable) = false,
(gogoproto.customname) = "TTLExpr",
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb.Expression"
];

// Spans determine which records are processed by which nodes in the DistSQL
// flow.
repeated roachpb.Span spans = 5 [(gogoproto.nullable) = false];

// RangeConcurrency controls how many ranges a single ttlProcessor processes
// in parallel.
optional int64 range_concurrency = 6 [(gogoproto.nullable) = false];

// SelectBatchSize controls the batch size for SELECTs.
optional int64 select_batch_size = 7 [(gogoproto.nullable) = false];

// DeleteBatchSize controls the batch size for DELETEs.
optional int64 delete_batch_size = 8 [(gogoproto.nullable) = false];

// DeleteRateLimit controls how many records can be deleted per second.
optional int64 delete_rate_limit = 9 [(gogoproto.nullable) = false];

// LabelMetrics controls if metrics are labeled with the name of the table being TTLed.
optional bool label_metrics = 10 [(gogoproto.nullable) = false];

// PreDeleteChangeTableVersion is a test flag to change the table
// descriptor before a delete.
optional bool pre_delete_change_table_version = 11 [(gogoproto.nullable) = false];

// PreSelectStatement is a test setting to run a SQL statement
// before selecting records.
optional string pre_select_statement = 12 [(gogoproto.nullable) = false];
}
9 changes: 9 additions & 0 deletions pkg/sql/rowexec/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,12 @@ func NewProcessor(
}
return backfill.NewIndexBackfillMerger(ctx, flowCtx, *core.IndexBackfillMerger, outputs[0])
}
if core.Ttl != nil {
if err := checkNumInOut(inputs, outputs, 0, 1); err != nil {
return nil, err
}
return NewTTLProcessor(flowCtx, processorID, *core.Ttl, outputs[0])
}
return nil, errors.Errorf("unsupported processor core %q", core)
}

Expand Down Expand Up @@ -391,3 +397,6 @@ var NewChangeFrontierProcessor func(*execinfra.FlowCtx, int32, execinfrapb.Chang

// NewStreamIngestionFrontierProcessor is implemented in the non-free (CCL) codebase and then injected here via runtime initialization.
var NewStreamIngestionFrontierProcessor func(*execinfra.FlowCtx, int32, execinfrapb.StreamIngestionFrontierSpec, execinfra.RowSource, *execinfrapb.PostProcessSpec, execinfra.RowReceiver) (execinfra.Processor, error)

// NewTTLProcessor is implemented in the non-free (CCL) codebase and then injected here via runtime initialization.
var NewTTLProcessor func(*execinfra.FlowCtx, int32, execinfrapb.TTLSpec, execinfra.RowReceiver) (execinfra.Processor, error)
7 changes: 6 additions & 1 deletion pkg/sql/ttl/ttljob/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/sql/ttl/ttljob",
visibility = ["//visibility:public"],
deps = [
"//pkg/base",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/keys",
Expand All @@ -27,12 +28,17 @@ go_library(
"//pkg/sql/catalog/colinfo",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/descs",
"//pkg/sql/execinfra",
"//pkg/sql/execinfrapb",
"//pkg/sql/lexbase",
"//pkg/sql/physicalplan",
"//pkg/sql/rowenc",
"//pkg/sql/rowexec",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sessiondatapb",
"//pkg/sql/sqltelemetry",
"//pkg/sql/sqlutil",
"//pkg/sql/types",
"//pkg/util/ctxgroup",
"//pkg/util/log",
Expand Down Expand Up @@ -68,7 +74,6 @@ go_test(
"//pkg/server",
"//pkg/sql",
"//pkg/sql/catalog/colinfo",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/desctestutils",
"//pkg/sql/lexbase",
"//pkg/sql/parser",
Expand Down
Loading

0 comments on commit 081cce5

Please sign in to comment.