Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…0440

98267: kvserver: replica load distribution metric r=andrewbaptist a=kvoli

This PR adds two histograms, tracking the percentiles of replica CPU time and replica Batch requests received. Previously, there was only point in time insight into either distribution via hotranges. This change enables historical timeseries tracking via the metric `rebalancing.replicas.cpunanospersecond` for CPU and `rebalancing.replicas.queriespersecond` for Batch Requests.

Informs: #98255

98690: upgrades: make the schema telemetry job setup permanent r=ajwerner a=ajwerner

This migration should run for every cluster.

Epic: none

Informs: #96763

Release note: None

98912: sql: address a few nits from reviewing an old PR r=yuzefovich a=yuzefovich

This commit addresses several nits (a typo, missing periods, and precisely allocating a slice) that I noticed while reviewing the old PR which introduced DELETE FROM ... USING support.

Epic: None

Release note: None

99981: opt: remove panics in execbuilder r=mgartner a=mgartner

This commit replaces `panic`s in execbuilder with returned errors. This
is safer because execbuilder functions are invoked outside the
panic-recoverable `Optimizer.Optimize` function.

Informs #98786

Release note: None


100312: bazel: upgrade to bazel 5.4.0 r=rail a=rickystewart

Release note: None
Epic: none

100321: streamingccl: clean up stream ingestion job execution r=stevendanna a=msbutler

This patch cleans up the stream ingestion job code by:
1. Seperating dist sql processor planning and execution in seperate functions.
2. Setting up the processor planning code for a future with job replanning.
3. Seperating stream ingestion and completion into seperat functions.
3. Cleaning up a few log lines and error messages.

Epic: none

Release note: none

100349: kvserver: nudge replicate queue on span config update r=irfansharif,knz a=arulajmani

We eagerly enqueue replicas into the split/merge queues whenever there is a span config update. A span config update could also imply a need to {up,down}replicate as well. This patch actively enqueues overlapping replicas into the replicate queue as well.

Epic: none

Release note: None

100355: sql: fix read-only SSL var r=knz a=rafiss

The variable will now look at the connection state to determine if SSL/TLS is being used, rather than relying on server configuration params, which aren't sufficient to be able to determine the type of connection.

fixes #99606
Release note: None

100440: server, sql: show in-memory data when no data is persisted r=maryliag a=maryliag

Fixes #100439

When no data is persisted to sql stats tables (because no flush happened yet or because the flush is disabled), the endpoints should fall back to the combined view that contains the in-memory data.

https://www.loom.com/share/b5e3a227a9904a19a5279ed828a3b3f3

Release note (sql change): When there is no data persisted, show the in-memory data.

Co-authored-by: Austen McClernon <[email protected]>
Co-authored-by: ajwerner <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Marcus Gartner <[email protected]>
Co-authored-by: Ricky Stewart <[email protected]>
Co-authored-by: Michael Butler <[email protected]>
Co-authored-by: Arul Ajmani <[email protected]>
Co-authored-by: Rafi Shamim <[email protected]>
Co-authored-by: maryliag <[email protected]>
  • Loading branch information
10 people committed Apr 3, 2023
10 parents 6a3422b + 7b6c751 + 75c083c + e00f324 + 04f7b0e + 69b9431 + 52ef4d3 + cf086e7 + e279bb1 + c7eb1bf commit 6cd1f1b
Show file tree
Hide file tree
Showing 38 changed files with 1,159 additions and 604 deletions.
2 changes: 1 addition & 1 deletion .bazelversion
Original file line number Diff line number Diff line change
@@ -1 +1 @@
cockroachdb/5.1.0
cockroachdb/5.4.0
2 changes: 1 addition & 1 deletion pkg/ccl/streamingccl/streamingest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ go_library(
"external_connection.go",
"metrics.go",
"stream_ingest_manager.go",
"stream_ingestion_dist.go",
"stream_ingestion_frontier_processor.go",
"stream_ingestion_job.go",
"stream_ingestion_planning.go",
"stream_ingestion_processor.go",
"stream_ingestion_processor_planning.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamingest",
visibility = ["//visibility:public"],
Expand Down
262 changes: 262 additions & 0 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,262 @@
// Copyright 2020 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package streamingest

import (
"context"
"fmt"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
)

func startDistIngestion(
ctx context.Context,
execCtx sql.JobExecContext,
ingestionJob *jobs.Job,
client streamclient.Client,
) error {

details := ingestionJob.Details().(jobspb.StreamIngestionDetails)
progress := ingestionJob.Progress()

var previousHighWater, heartbeatTimestamp hlc.Timestamp
initialScanTimestamp := details.ReplicationStartTime
// Start from the last checkpoint if it exists.
if h := progress.GetHighWater(); h != nil && !h.IsEmpty() {
previousHighWater = *h
heartbeatTimestamp = previousHighWater
} else {
heartbeatTimestamp = initialScanTimestamp
}

log.Infof(ctx, "ingestion job %d resumes stream ingestion from start time %s",
ingestionJob.ID(), heartbeatTimestamp)

streamID := streampb.StreamID(details.StreamID)
updateRunningStatus(ctx, execCtx, ingestionJob, jobspb.InitializingReplication,
fmt.Sprintf("connecting to the producer job %d and resuming a stream replication plan", streamID))
if err := waitUntilProducerActive(ctx, client, streamID, heartbeatTimestamp, ingestionJob.ID()); err != nil {
return err
}

log.Infof(ctx, "producer job %d is active, creating a stream replication plan", streamID)
dsp := execCtx.DistSQLPlanner()

p, planCtx, err := makePlan(
execCtx,
ingestionJob,
details,
client,
previousHighWater,
progress.Details.(*jobspb.Progress_StreamIngest).StreamIngest.Checkpoint,
initialScanTimestamp)(ctx, dsp)
if err != nil {
return err
}
log.Infof(ctx, "starting to run DistSQL flow for stream ingestion job %d",
ingestionJob.ID())

execPlan := func(ctx context.Context) error {
ctx = logtags.AddTag(ctx, "stream-ingest-distsql", nil)

rw := sql.NewRowResultWriter(nil /* rowContainer */)

var noTxn *kv.Txn
recv := sql.MakeDistSQLReceiver(
ctx,
rw,
tree.Rows,
nil, /* rangeCache */
noTxn,
nil, /* clockUpdater */
execCtx.ExtendedEvalContext().Tracing,
)
defer recv.Release()

jobsprofiler.StorePlanDiagram(ctx, execCtx.ExecCfg().DistSQLSrv.Stopper, p, execCtx.ExecCfg().InternalDB,
ingestionJob.ID())

// Copy the evalCtx, as dsp.Run() might change it.
evalCtxCopy := *execCtx.ExtendedEvalContext()
dsp.Run(ctx, planCtx, noTxn, p, recv, &evalCtxCopy, nil /* finishedSetupFn */)
return rw.Err()
}

updateRunningStatus(ctx, execCtx, ingestionJob, jobspb.Replicating,
"running the SQL flow for the stream ingestion job")

// TODO(msbutler): Implement automatic replanning in the spirit of changefeed replanning.
return execPlan(ctx)
}

// TODO (msbutler): this function signature was written to use in automatic job replanning via
// sql.PhysicalPlanChangeChecker(). Actually implement c2c replanning.
func makePlan(
execCtx sql.JobExecContext,
ingestionJob *jobs.Job,
details jobspb.StreamIngestionDetails,
client streamclient.Client,
previousHighWater hlc.Timestamp,
checkpoint jobspb.StreamIngestionCheckpoint,
initialScanTimestamp hlc.Timestamp,
) func(context.Context, *sql.DistSQLPlanner) (*sql.PhysicalPlan, *sql.PlanningCtx, error) {
return func(ctx context.Context, dsp *sql.DistSQLPlanner) (*sql.PhysicalPlan, *sql.PlanningCtx, error) {
jobID := ingestionJob.ID()
log.Infof(ctx, "Re Planning DistSQL flow for stream ingestion job %d", jobID)

streamID := streampb.StreamID(details.StreamID)
topology, err := client.Plan(ctx, streamID)
if err != nil {
return nil, nil, err
}
err = ingestionJob.NoTxn().Update(ctx, func(txn isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
md.Progress.GetStreamIngest().StreamAddresses = topology.StreamAddresses()
ju.UpdateProgress(md.Progress)
return nil
})
if err != nil {
return nil, nil, errors.Wrap(err, "failed to update job progress")
}

planCtx, sqlInstanceIDs, err := dsp.SetupAllNodesPlanning(ctx, execCtx.ExtendedEvalContext(), execCtx.ExecCfg())
if err != nil {
return nil, nil, err
}

streamIngestionSpecs, streamIngestionFrontierSpec, err := constructStreamIngestionPlanSpecs(
streamingccl.StreamAddress(details.StreamAddress),
topology,
sqlInstanceIDs,
initialScanTimestamp,
previousHighWater,
checkpoint,
jobID,
streamID,
topology.SourceTenantID,
details.DestinationTenantID)
if err != nil {
return nil, nil, err
}
if knobs := execCtx.ExecCfg().StreamingTestingKnobs; knobs != nil && knobs.AfterReplicationFlowPlan != nil {
knobs.AfterReplicationFlowPlan(streamIngestionSpecs, streamIngestionFrontierSpec)
}

// Setup a one-stage plan with one proc per input spec.
corePlacement := make([]physicalplan.ProcessorCorePlacement, len(streamIngestionSpecs))
for i := range streamIngestionSpecs {
corePlacement[i].SQLInstanceID = sqlInstanceIDs[i]
corePlacement[i].Core.StreamIngestionData = streamIngestionSpecs[i]
}

p := planCtx.NewPhysicalPlan()
p.AddNoInputStage(
corePlacement,
execinfrapb.PostProcessSpec{},
streamIngestionResultTypes,
execinfrapb.Ordering{},
)

gatewayNodeID, err := execCtx.ExecCfg().NodeInfo.NodeID.OptionalNodeIDErr(48274)
if err != nil {
return nil, nil, err
}

// The ResultRouters from the previous stage will feed in to the
// StreamIngestionFrontier processor.
p.AddSingleGroupStage(ctx, base.SQLInstanceID(gatewayNodeID),
execinfrapb.ProcessorCoreUnion{StreamIngestionFrontier: streamIngestionFrontierSpec},
execinfrapb.PostProcessSpec{}, streamIngestionResultTypes)

p.PlanToStreamColMap = []int{0}
sql.FinalizePlan(ctx, planCtx, p)
return p, planCtx, nil
}
}

func constructStreamIngestionPlanSpecs(
streamAddress streamingccl.StreamAddress,
topology streamclient.Topology,
sqlInstanceIDs []base.SQLInstanceID,
initialScanTimestamp hlc.Timestamp,
previousHighWater hlc.Timestamp,
checkpoint jobspb.StreamIngestionCheckpoint,
jobID jobspb.JobID,
streamID streampb.StreamID,
sourceTenantID roachpb.TenantID,
destinationTenantID roachpb.TenantID,
) ([]*execinfrapb.StreamIngestionDataSpec, *execinfrapb.StreamIngestionFrontierSpec, error) {
// For each stream partition in the topology, assign it to a node.
streamIngestionSpecs := make([]*execinfrapb.StreamIngestionDataSpec, 0, len(sqlInstanceIDs))

trackedSpans := make([]roachpb.Span, 0)
subscribingSQLInstances := make(map[string]uint32)
for i, partition := range topology.Partitions {
// Round robin assign the stream partitions to nodes. Partitions 0 through
// len(nodes) - 1 creates the spec. Future partitions just add themselves to
// the partition addresses.
if i < len(sqlInstanceIDs) {
spec := &execinfrapb.StreamIngestionDataSpec{
StreamID: uint64(streamID),
JobID: int64(jobID),
PreviousHighWaterTimestamp: previousHighWater,
InitialScanTimestamp: initialScanTimestamp,
Checkpoint: checkpoint, // TODO: Only forward relevant checkpoint info
StreamAddress: string(streamAddress),
PartitionSpecs: make(map[string]execinfrapb.StreamIngestionPartitionSpec),
TenantRekey: execinfrapb.TenantRekey{
OldID: sourceTenantID,
NewID: destinationTenantID,
},
}
streamIngestionSpecs = append(streamIngestionSpecs, spec)
}
n := i % len(sqlInstanceIDs)

subscribingSQLInstances[partition.ID] = uint32(sqlInstanceIDs[n])
streamIngestionSpecs[n].PartitionSpecs[partition.ID] = execinfrapb.StreamIngestionPartitionSpec{
PartitionID: partition.ID,
SubscriptionToken: string(partition.SubscriptionToken),
Address: string(partition.SrcAddr),
Spans: partition.Spans,
}

trackedSpans = append(trackedSpans, partition.Spans...)
}

// Create a spec for the StreamIngestionFrontier processor on the coordinator
// node.
streamIngestionFrontierSpec := &execinfrapb.StreamIngestionFrontierSpec{
HighWaterAtStart: previousHighWater,
TrackedSpans: trackedSpans,
JobID: int64(jobID),
StreamID: uint64(streamID),
StreamAddresses: topology.StreamAddresses(),
SubscribingSQLInstances: subscribingSQLInstances,
Checkpoint: checkpoint,
}

return streamIngestionSpecs, streamIngestionFrontierSpec, nil
}
Loading

0 comments on commit 6cd1f1b

Please sign in to comment.