Skip to content

Commit

Permalink
ui: use status server apis for stmt bundle ops
Browse files Browse the repository at this point in the history
Since 23.1 we've been using the sql over http api to view,
request and cancel stmt bundles for fingerprints.  This
caused a regression in the sql permissions required to
use these features. Prior to using sql over http we only
required `VIEWACTIVITY` or `VIEWACTIVITYREDACTED` for these
operations. Now `VIEWSYSTEMTABLE` is also required since the
sql over http api direclty reads from
system.statement_diagnostics_requests as the db console user.

Instead of creating a view and builtins on top of the system
table, let's simply revert to the existing http apis on the
status server that are already properly gated.

This change reintroduces the following APIs to DB Console
- `POST /_status/stmtdiagreports` to request statement bundles
- `GET /_status/stmtdiagreports` to list statement bundles
- `POST /_status/stmtdiagreports/cancel` to cancel statement bundles

Limitations:
This PR does not carry over the added behaviour in 23.1 where we
prevent users from making multiple statement bundle reqs for a
particular fingerprint. This is not really a common case in the
UI since we show 'Waiting' in the overview page for the row
when a stmt bundle is in progress for a fingerprint, which prevents
another request for the same fingerprint.

Epic: none
Fixes: #121301

Release note (bug fix): Users with VIEWACTIVITY sql priv can now
request, view and cancel stmt bundles in DB console.
  • Loading branch information
xinhaoz committed Aug 28, 2024
1 parent 1ba0298 commit 59be2fb
Showing 1 changed file with 61 additions and 197 deletions.
258 changes: 61 additions & 197 deletions pkg/ui/workspaces/cluster-ui/src/api/statementDiagnosticsApi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,17 @@
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

import { cockroach } from "@cockroachlabs/crdb-protobuf-client";
import Long from "long";
import moment from "moment-timezone";
import { Duration } from "src/util/format";
import {
createSqlExecutionRequest,
executeInternalSql,
executeInternalSqlHelper,
SqlExecutionRequest,
SqlTxnResult,
txnResultSetIsEmpty,
} from "src/api";

import { fetchData } from "src/api";

import { NumberToDuration } from "../util";

const STATEMENT_DIAGNOSTICS_PATH = "_status/stmtdiagreports";
const CANCEL_STATEMENT_DIAGNOSTICS_PATH =
STATEMENT_DIAGNOSTICS_PATH + "/cancel";

export type StatementDiagnosticsReport = {
id: string;
Expand All @@ -32,70 +33,26 @@ export type StatementDiagnosticsReport = {
export type StatementDiagnosticsResponse = StatementDiagnosticsReport[];

export async function getStatementDiagnosticsReports(): Promise<StatementDiagnosticsResponse> {
let result: StatementDiagnosticsResponse = [];

const createReq = () => {
let offset = "";
const args = [];
if (result.length > 0) {
// Using the id is more performant and reliable than offset.
// Schema is PRIMARY KEY (id) with INT8 DEFAULT unique_rowid() NOT NULL.
offset = " AND (id::STRING < $1) ";
const last = result[result.length - 1];
args.push(last.id);
}
const query = `SELECT
id::STRING,
statement_fingerprint,
completed,
statement_diagnostics_id::STRING,
requested_at,
min_execution_latency,
expires_at
FROM
system.statement_diagnostics_requests
WHERE
(expires_at > now() OR expires_at IS NULL OR completed = true) ${offset}
order by id desc`;

return createSqlExecutionRequest(undefined, [
{
sql: query,
arguments: args,
},
]);
};

const err = await executeInternalSqlHelper<StatementDiagnosticsReport>(
createReq,
(response: SqlTxnResult<StatementDiagnosticsReport>[]) => {
if (!response) {
return;
}

if (txnResultSetIsEmpty(response)) {
return;
}

response.forEach(x => {
if (x.rows && x.rows.length > 0) {
result = result.concat(x.rows);
}
});
},
const response = await fetchData(
cockroach.server.serverpb.StatementDiagnosticsReportsResponse,
STATEMENT_DIAGNOSTICS_PATH,
);

if (err) {
throw err;
}

return result;
return response.reports.map(report => {
return {
id: report.id.toString(),
statement_fingerprint: report.statement_fingerprint,
completed: report.completed,
statement_diagnostics_id: report.statement_diagnostics_id.toString(),
requested_at: moment.unix(report.requested_at.seconds.toNumber()),
min_execution_latency: moment.duration(
report.min_execution_latency.seconds.toNumber(),
"seconds",
),
expires_at: moment.unix(report.expires_at.seconds.toNumber()),
};
});
}

type CheckPendingStmtDiagnosticRow = {
count: number;
};

export type InsertStmtDiagnosticRequest = {
stmtFingerprint: string;
samplingProbability?: number;
Expand All @@ -108,97 +65,24 @@ export type InsertStmtDiagnosticResponse = {
req_resp: boolean;
};

export function createStatementDiagnosticsReport({
stmtFingerprint,
samplingProbability,
minExecutionLatencySeconds,
expiresAfterSeconds,
planGist,
}: InsertStmtDiagnosticRequest): Promise<InsertStmtDiagnosticResponse> {
const args: any = [stmtFingerprint];
let query = `SELECT crdb_internal.request_statement_bundle($1, $2, $3::INTERVAL, $4::INTERVAL) as req_resp`;

if (planGist) {
args.push(planGist);
query = `SELECT crdb_internal.request_statement_bundle($1, $2, $3, $4::INTERVAL, $5::INTERVAL) as req_resp`;
}

if (samplingProbability) {
args.push(samplingProbability);
} else {
args.push(0);
}
if (minExecutionLatencySeconds) {
args.push(Duration(minExecutionLatencySeconds * 1e9));
} else {
args.push("0");
}
if (expiresAfterSeconds && expiresAfterSeconds !== 0) {
args.push(Duration(expiresAfterSeconds * 1e9));
} else {
args.push("0");
}

const createStmtDiag = {
sql: query,
arguments: args,
};

const req: SqlExecutionRequest = {
execute: true,
statements: [createStmtDiag],
};

return checkExistingDiagRequest(stmtFingerprint).then(_ => {
return executeInternalSql<InsertStmtDiagnosticResponse>(req).then(res => {
// If request succeeded but query failed, throw error (caught by saga/cacheDataReducer).
if (res.error) {
throw res.error;
}

if (
res.execution?.txn_results[0]?.rows?.length === 0 ||
res.execution?.txn_results[0]?.rows[0]["req_resp"] === false
) {
throw new Error("Failed to insert statement diagnostics request");
}

return res.execution.txn_results[0].rows[0];
});
});
}

function checkExistingDiagRequest(stmtFingerprint: string): Promise<void> {
// Query to check if there's already a pending request for this fingerprint.
const checkPendingStmtDiag = {
sql: `SELECT count(1) FROM system.statement_diagnostics_requests
WHERE
completed = false AND
statement_fingerprint = $1 AND
(expires_at IS NULL OR expires_at > now())`,
arguments: [stmtFingerprint],
};

const req: SqlExecutionRequest = {
execute: true,
statements: [checkPendingStmtDiag],
};

return executeInternalSql<CheckPendingStmtDiagnosticRow>(req).then(res => {
// If request succeeded but query failed, throw error (caught by saga/cacheDataReducer).
if (res.error) {
throw res.error;
}

if (res.execution?.txn_results[0]?.rows?.length === 0) {
throw new Error("Failed to check pending statement diagnostics");
}

if (res.execution.txn_results[0].rows[0].count > 0) {
throw new Error(
"A pending request for the requested fingerprint already exists. Cancel the existing request first and try again.",
);
}
export async function createStatementDiagnosticsReport(
req: InsertStmtDiagnosticRequest,
): Promise<InsertStmtDiagnosticResponse> {
return fetchData(
cockroach.server.serverpb.CreateStatementDiagnosticsReportResponse,
STATEMENT_DIAGNOSTICS_PATH,
cockroach.server.serverpb.CreateStatementDiagnosticsReportRequest,
new cockroach.server.serverpb.CreateStatementDiagnosticsReportRequest({
statement_fingerprint: req.stmtFingerprint,
sampling_probability: req.samplingProbability,
min_execution_latency: NumberToDuration(req.minExecutionLatencySeconds),
expires_after: NumberToDuration(req.expiresAfterSeconds),
plan_gist: req.planGist,
}),
).then(response => {
return {
req_resp: response.report !== null,
};
});
}

Expand All @@ -210,42 +94,22 @@ export type CancelStmtDiagnosticResponse = {
stmt_diag_req_id: string;
};

export function cancelStatementDiagnosticsReport({
requestId,
}: CancelStmtDiagnosticRequest): Promise<CancelStmtDiagnosticResponse> {
const query = `UPDATE system.statement_diagnostics_requests
SET expires_at = '1970-01-01'
WHERE completed = false
AND id = $1::INT8
AND (expires_at IS NULL OR expires_at > now()) RETURNING id as stmt_diag_req_id`;
const req: SqlExecutionRequest = {
execute: true,
statements: [
{
sql: query,
arguments: [requestId],
},
],
};

return executeInternalSql<CancelStmtDiagnosticResponse>(req).then(res => {
// If request succeeded but query failed, throw error (caught by saga/cacheDataReducer).
if (res.error) {
throw res.error;
export async function cancelStatementDiagnosticsReport(
req: CancelStmtDiagnosticRequest,
): Promise<CancelStmtDiagnosticResponse> {
return fetchData(
cockroach.server.serverpb.CancelStatementDiagnosticsReportResponse,
CANCEL_STATEMENT_DIAGNOSTICS_PATH,
cockroach.server.serverpb.CancelStatementDiagnosticsReportRequest,
new cockroach.server.serverpb.CancelStatementDiagnosticsReportRequest({
request_id: Long.fromString(req.requestId),
}),
).then(response => {
if (response.error) {
throw new Error(response.error);
}

if (!res.execution?.txn_results?.length) {
throw new Error(
"cancelStatementDiagnosticsReport - unexpected zero txn_results",
);
}

if (res.execution.txn_results[0].rows?.length === 0) {
throw new Error(
`No pending request found for the request id: ${requestId}`,
);
}

return res.execution.txn_results[0].rows[0];
return {
stmt_diag_req_id: req.requestId,
};
});
}

0 comments on commit 59be2fb

Please sign in to comment.