Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
139342: ui: fix statement diag reports when min exec latency is null r=kyle-a-wong a=kyle-a-wong

A bug in db console caused statement diagnostics reports to not work as intended. As a result, activating diagnostics didn't produce the intended state change that shows a user that a diagnostics report is running or downloadable.

This happened in edge cases where a report's "minExecutionLatency" response field was null, but db console expected it to be populated. Now, db console should handle this edge case.

Fixes: #139340
Epic: none
Release note (bug fix): Fixes a bug where activating diagnostics for SQL activity sometimes appeared unresponsive, with no state or status update upon activation. Now, the status should always reflect that diagnostics are active or that a statement bundle is downloadable.

139491: crosscluster/physical: wait for sip shutdown before cutover r=kev-cao a=msbutler

Informs #136588

Release note: none

Co-authored-by: Kyle Wong <[email protected]>
Co-authored-by: Michael Butler <[email protected]>
  • Loading branch information
3 people committed Jan 22, 2025
3 parents 049fe5d + e84396c + 2bf0c3c commit 6652998
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 14 deletions.
2 changes: 2 additions & 0 deletions pkg/crosscluster/physical/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ go_library(
"//pkg/crosscluster/replicationutils",
"//pkg/crosscluster/streamclient",
"//pkg/jobs",
"//pkg/jobs/ingeststopped",
"//pkg/jobs/jobspb",
"//pkg/jobs/jobsprofiler",
"//pkg/jobs/jobsprotectedts",
Expand All @@ -51,6 +52,7 @@ go_library(
"//pkg/settings/cluster",
"//pkg/spanconfig",
"//pkg/sql",
"//pkg/sql/catalog/catpb",
"//pkg/sql/catalog/colinfo",
"//pkg/sql/catalog/descs",
"//pkg/sql/catalog/replication",
Expand Down
25 changes: 18 additions & 7 deletions pkg/crosscluster/physical/stream_ingestion_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/crosscluster/replicationutils"
"github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/ingeststopped"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
Expand Down Expand Up @@ -41,6 +42,8 @@ import (
"github.com/cockroachdb/redact"
)

// maxIngestionProcessorShutdownWait is the duration handed to
// ingeststopped.WaitForNoIngestingNodes, both before reverting to the cutover
// timestamp and in OnFailOrCancel.
// NOTE(review): presumably the upper bound on how long to wait for ingestion
// processors to shut down — confirm against the ingeststopped package.
var maxIngestionProcessorShutdownWait = 5 * time.Minute

type streamIngestionResumer struct {
job *jobs.Job

Expand Down Expand Up @@ -501,6 +504,11 @@ func maybeRevertToCutoverTimestamp(
return cutoverTimestamp, false, errors.Wrapf(err, "failed to stop reader tenant")
}
}
if err := ingeststopped.WaitForNoIngestingNodes(ctx, p, ingestionJob, maxIngestionProcessorShutdownWait); err != nil {
return cutoverTimestamp, false, errors.Wrapf(err, "unable to verify that attempted LDR job %d had stopped offline ingesting %s", ingestionJob.ID(), maxIngestionProcessorShutdownWait)
}
log.Infof(ctx, "verified no nodes still offline ingesting on behalf of job %d", ingestionJob.ID())

log.Infof(ctx, "reverting to cutover timestamp %s", cutoverTimestamp)
if p.ExecCfg().StreamingTestingKnobs != nil && p.ExecCfg().StreamingTestingKnobs.AfterCutoverStarted != nil {
p.ExecCfg().StreamingTestingKnobs.AfterCutoverStarted()
Expand Down Expand Up @@ -593,12 +601,8 @@ func stopTenant(ctx context.Context, execCfg *sql.ExecutorConfig, tenantID roach
return nil
}

// OnFailOrCancel is part of the jobs.Resumer interface.
// There is a known race between the ingestion processors shutting down, and
// OnFailOrCancel being invoked. As a result of which we might see some keys
// leftover in the keyspace if a ClearRange were to be issued here. In general
// the tenant keyspace of a failed/canceled ingestion job should be treated as
// corrupted, and the tenant should be dropped before resuming the ingestion.
// OnFailOrCancel is part of the jobs.Resumer interface. After an ingestion
// job fails or gets cancelled, the tenant should be dropped.
func (s *streamIngestionResumer) OnFailOrCancel(
ctx context.Context, execCtx interface{}, _ error,
) error {
Expand Down Expand Up @@ -627,6 +631,14 @@ func (s *streamIngestionResumer) OnFailOrCancel(
telemetry.Count("physical_replication.failed")
}

// Ensure no sip processors are still ingesting data, so a subsequent DROP
// TENANT cmd will cleanly wipe out all data.
if err := ingeststopped.WaitForNoIngestingNodes(ctx, jobExecCtx, s.job, maxIngestionProcessorShutdownWait); err != nil {
log.Warningf(ctx, "unable to verify that attempted LDR job %d had stopped offline ingesting %s: %v", s.job.ID(), maxIngestionProcessorShutdownWait, err)
} else {
log.Infof(ctx, "verified no nodes still offline ingesting on behalf of job %d", s.job.ID())
}

return execCfg.InternalDB.Txn(ctx, func(
ctx context.Context, txn isql.Txn,
) error {
Expand All @@ -648,7 +660,6 @@ func (s *streamIngestionResumer) OnFailOrCancel(
return err
}
}

return nil
})
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/crosscluster/physical/stream_ingestion_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
Expand Down Expand Up @@ -395,6 +396,8 @@ func (sip *streamIngestionProcessor) Start(ctx context.Context) {
sip.aggTimer.Reset(15 * time.Second)
}

defer sip.FlowCtx.Cfg.JobRegistry.MarkAsIngesting(catpb.JobID(sip.spec.JobID))()

ctx = sip.StartInternal(ctx, streamIngestionProcessorName, sip.agg)

sip.metrics = sip.FlowCtx.Cfg.JobRegistry.MetricsStruct().StreamIngest.(*Metrics)
Expand Down
14 changes: 7 additions & 7 deletions pkg/ui/workspaces/cluster-ui/src/api/statementDiagnosticsApi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import moment from "moment-timezone";

import { fetchData } from "src/api";

import { NumberToDuration } from "../util";
import { DurationToMomentDuration, NumberToDuration } from "../util";

const STATEMENT_DIAGNOSTICS_PATH = "_status/stmtdiagreports";
const CANCEL_STATEMENT_DIAGNOSTICS_PATH =
Expand All @@ -33,17 +33,17 @@ export async function getStatementDiagnosticsReports(): Promise<StatementDiagnos
STATEMENT_DIAGNOSTICS_PATH,
);
return response.reports.map(report => {
const minExecutionLatency = report.min_execution_latency
? DurationToMomentDuration(report.min_execution_latency)
: null;
return {
id: report.id.toString(),
statement_fingerprint: report.statement_fingerprint,
completed: report.completed,
statement_diagnostics_id: report.statement_diagnostics_id.toString(),
requested_at: moment.unix(report.requested_at.seconds.toNumber()),
min_execution_latency: moment.duration(
report.min_execution_latency.seconds.toNumber(),
"seconds",
),
expires_at: moment.unix(report.expires_at.seconds.toNumber()),
requested_at: moment.unix(report.requested_at?.seconds.toNumber()),
min_execution_latency: minExecutionLatency,
expires_at: moment.unix(report.expires_at?.seconds.toNumber()),
};
});
}
Expand Down

0 comments on commit 6652998

Please sign in to comment.