Skip to content

Commit

Permalink
Merge #88506 #88507
Browse files Browse the repository at this point in the history
88506: kvcoord: fix extremely rare OOM hazard in the DistSender r=nvanbenschoten a=arulajmani

The outer loop in the DistSender subdivides a BathRequest into partial batches corresponding to a single range and sends them out. Previously, the resolved span passed into the function responsible for sending out this partial batch (`sendPartialBatch`) corresponded to the span of the entire batch request (as opposed to the partial batch). This resolved span is used to check if the request needs to be subdivided further between retries in the outer loop of the DistSender. Given the wrong parameter value, we'd always end up determining that the batch needed to be subdivided.

This wasn't really an issue in practice as we don't expect too many retries here. However, if we did (eg. timeouts causing the transport to be exhausted on every try), the infinite recursion here could lead to an OOM.

References #87167

Release note: None

88507:  cluster-ui: enable active execs blocking info for CC dedicated r=xinhaoz a=xinhaoz

This commit enables wait time insights for active executions pages on CC Dedicated. This includes the `Time Spent Waiting` column in the overview pages and the wait time insights panel in the details pages. The feature is still gated for serverless.

Release note (ui change): Dedicated CC now contains information for which transactions are blocked / waiting on the active execution pages. This includes the `Time Spent Waiting` column in the overview pages and the wait time insights panel on the details pages. Feature currently unavailable for serverless.

Co-authored-by: Arul Ajmani <[email protected]>
Co-authored-by: Xin Hao Zhang <[email protected]>
  • Loading branch information
3 people committed Sep 27, 2022
3 parents c55586b + d46a1f5 + c9e1fec commit 898987e
Show file tree
Hide file tree
Showing 13 changed files with 157 additions and 44 deletions.
33 changes: 20 additions & 13 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,8 @@ func NewDistSender(cfg DistSenderConfig) *DistSender {
ds.dontReorderReplicas = cfg.TestingKnobs.DontReorderReplicas
ds.dontConsiderConnHealth = cfg.TestingKnobs.DontConsiderConnHealth
ds.rpcRetryOptions = base.DefaultRetryOptions()
// TODO(arul): The rpcRetryOptions passed in here from server/tenant don't
// set a max retries limit. Should they?
if cfg.RPCRetryOptions != nil {
ds.rpcRetryOptions = *cfg.RPCRetryOptions
}
Expand Down Expand Up @@ -1392,11 +1394,11 @@ func (ds *DistSender) divideAndSendBatchToRanges(
// If we can reserve one of the limited goroutines available for parallel
// batch RPCs, send asynchronously.
if canParallelize && !lastRange && !ds.disableParallelBatches &&
ds.sendPartialBatchAsync(ctx, curRangeBatch, rs, isReverse, withCommit, batchIdx, ri.Token(), responseCh, positions) {
ds.sendPartialBatchAsync(ctx, curRangeBatch, curRangeRS, isReverse, withCommit, batchIdx, ri.Token(), responseCh, positions) {
// Sent the batch asynchronously.
} else {
resp := ds.sendPartialBatch(
ctx, curRangeBatch, rs, isReverse, withCommit, batchIdx, ri.Token(), positions,
ctx, curRangeBatch, curRangeRS, isReverse, withCommit, batchIdx, ri.Token(), positions,
)
responseCh <- resp
if resp.pErr != nil {
Expand Down Expand Up @@ -1530,21 +1532,26 @@ func slowRangeRPCReturnWarningStr(s *redact.StringBuilder, dur time.Duration, at
s.Printf("slow RPC finished after %.2fs (%d attempts)", dur.Seconds(), attempts)
}

// sendPartialBatch sends the supplied batch to the range specified by desc.
// sendPartialBatch sends the supplied batch to the range specified by the
// routing token.
//
// The batch request is supposed to be truncated already so that it contains
// only requests which intersect the range descriptor and keys for each request
// are limited to the range's key span. positions argument describes how the
// given BatchRequest corresponds to the original, un-truncated one, and allows
// us to combine the response later via BatchResponse.Combine. (nil positions
// argument should be used when the original batch request is fully contained
// within a single range.)
// The batch request is supposed to be truncated already so that it only
// contains requests which intersect the range descriptor and keys for each
// request are limited to the range's key span. The rs argument corresponds to
// the span encompassing the key ranges of all requests in the truncated batch.
// It should be entirely contained within the range descriptor of the supplied
// routing token. The positions argument describes how the given batch request
// corresponds to the original, un-truncated one, and allows us to combine the
// response later via BatchResponse.Combine. (nil positions argument should be
// used when the original batch request is fully contained within a single
// range.)
//
// The send occurs in a retry loop to handle send failures. On failure to send
// to any replicas, we backoff and retry by refetching the range descriptor. If
// the underlying range seems to have split, we recursively invoke
// divideAndSendBatchToRanges to re-enumerate the ranges in the span and resend
// to each.
// the underlying range seems to have split (determined by checking if the
// supplied rs is no longer entirely contained within the refreshed range
// descriptor) we recursively invoke divideAndSendBatchToRanges to re-enumerate
// the ranges in the span and resend to each.
func (ds *DistSender) sendPartialBatch(
ctx context.Context,
ba roachpb.BatchRequest,
Expand Down
84 changes: 84 additions & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4553,6 +4553,90 @@ func TestRequestSubdivisionAfterDescriptorChange(t *testing.T) {
}
}

// TestRequestSubdivisionAfterDescriptorChangeWithUnavailableReplicasTerminates
// acts as a regression test for #87167. It essentially guards against infinite
// recursion which could happen in a very rare cases. Specifically, where a
// batch request spanned multiple ranges, but one or more of these ranges did
// not return a result and the DistSender exhausted the entire transport on
// each attempt. We simulate this by returning a sendError.
func TestRequestSubdivisionAfterDescriptorChangeWithUnavailableReplicasTerminates(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

keyA := roachpb.Key("a")
keyB := roachpb.Key("b")
keyC := roachpb.Key("c")
splitKey := keys.MustAddr(keyB)

get := func(k roachpb.Key) roachpb.Request {
return roachpb.NewGet(k, false /* forUpdate */)
}

ctx := context.Background()
tr := tracing.NewTracer()
stopper := stop.NewStopper(stop.WithTracer(tr))
defer stopper.Stop(ctx)

clock := hlc.NewClockWithSystemTimeSource(time.Nanosecond /* maxOffset */)
rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper)
g := makeGossip(t, stopper, rpcContext)

repls := []roachpb.ReplicaDescriptor{{
NodeID: 1,
StoreID: 1,
}}
splitDescs := []roachpb.RangeDescriptor{{
RangeID: roachpb.RangeID(1),
Generation: 2,
StartKey: roachpb.RKeyMin,
EndKey: splitKey,
InternalReplicas: repls,
}, {
RangeID: roachpb.RangeID(2),
Generation: 2,
StartKey: splitKey,
EndKey: roachpb.RKeyMax,
InternalReplicas: repls,
}}

splitRDB := mockRangeDescriptorDBForDescs(splitDescs...)

var numAttempts int32
transportFn := func(_ context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, error) {
atomic.AddInt32(&numAttempts, 1)
require.Equal(t, 1, len(ba.Requests))
return nil, newSendError("boom")
}
rpcRetryOptions := &retry.Options{
MaxRetries: 5, // maxAttempts = 6
}
cfg := DistSenderConfig{
AmbientCtx: log.AmbientContext{Tracer: tr},
Clock: clock,
NodeDescs: g,
RPCRetryOptions: rpcRetryOptions,
RPCContext: rpcContext,
RangeDescriptorDB: splitRDB,
TestingKnobs: ClientTestingKnobs{
TransportFactory: adaptSimpleTransport(transportFn),
},
Settings: cluster.MakeTestingClusterSettings(),
}

ds := NewDistSender(cfg)

var ba roachpb.BatchRequest
ba.Add(get(keyA), get(keyC))
// Inconsistent read because otherwise the batch will ask to be re-sent in a
// txn when split.
ba.ReadConsistency = roachpb.INCONSISTENT
_, pErr := ds.Send(ctx, ba)
require.NotNil(t, pErr)
require.True(t, testutils.IsError(pErr.GoError(), "boom"))
// 6 attempts each for the two partial batches.
require.Equal(t, int32(12), atomic.LoadInt32(&numAttempts))
}

// TestDescriptorChangeAfterRequestSubdivision is similar to
// TestRequestSubdivisionAfterDescriptorChange, but it exercises a scenario
// where the request is subdivided before observing a descriptor change. After
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

import React, { useContext, useMemo } from "react";
import React, { useMemo } from "react";
import classNames from "classnames/bind";
import {
ActiveStatement,
Expand All @@ -31,7 +31,6 @@ import {
} from "./activeStatementsTable";
import { StatementViewType } from "src/statementsPage/statementPageTypes";
import { calculateActiveFilters } from "src/queryFilter/filter";
import { CockroachCloudContext } from "src/contexts";
import { isSelectedColumn } from "src/columnsSelector/utils";

const sortableTableCx = classNames.bind(sortableTableStyles);
Expand All @@ -43,6 +42,7 @@ type ActiveStatementsSectionProps = {
statements: ActiveStatement[];
selectedColumns?: string[];
sortSetting: SortSetting;
isTenant?: boolean;
onChangeSortSetting: (sortSetting: SortSetting) => void;
onClearFilters: () => void;
onColumnsSelect: (columns: string[]) => void;
Expand All @@ -52,6 +52,7 @@ export const ActiveStatementsSection: React.FC<
ActiveStatementsSectionProps
> = ({
filters,
isTenant,
pagination,
search,
statements,
Expand All @@ -61,12 +62,11 @@ export const ActiveStatementsSection: React.FC<
onChangeSortSetting,
onColumnsSelect,
}) => {
const isCockroachCloud = useContext(CockroachCloudContext);

const columns = useMemo(
() => makeActiveStatementsColumns(isCockroachCloud),
[isCockroachCloud],
() => makeActiveStatementsColumns(isTenant),
[isTenant],
);

const shownColumns = columns.filter(col =>
isSelectedColumn(selectedColumns, col),
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import { Tooltip } from "@cockroachlabs/ui-components";
import { limitText } from "../../util";

export function makeActiveStatementsColumns(
isCockroachCloud: boolean,
isTenant: boolean,
): ColumnDescriptor<ActiveStatement>[] {
return [
activeStatementColumnsFromCommon.executionID,
Expand All @@ -42,9 +42,7 @@ export function makeActiveStatementsColumns(
activeStatementColumnsFromCommon.status,
activeStatementColumnsFromCommon.startTime,
activeStatementColumnsFromCommon.elapsedTime,
!isCockroachCloud
? activeStatementColumnsFromCommon.timeSpentWaiting
: null,
!isTenant ? activeStatementColumnsFromCommon.timeSpentWaiting : null,
activeStatementColumnsFromCommon.applicationName,
].filter(col => col != null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

import React, { useMemo, useContext } from "react";
import React, { useMemo } from "react";
import classNames from "classnames/bind";
import {
ActiveStatementFilters,
Expand All @@ -30,14 +30,14 @@ import {
} from "./activeTransactionsTable";
import { TransactionViewType } from "src/transactionsPage/transactionsPageTypes";
import { calculateActiveFilters } from "src/queryFilter/filter";
import { CockroachCloudContext } from "src/contexts";
import { isSelectedColumn } from "src/columnsSelector/utils";
import { SortedTable } from "src/sortedtable";

const sortableTableCx = classNames.bind(sortableTableStyles);

type ActiveTransactionsSectionProps = {
filters: ActiveStatementFilters;
isTenant?: boolean;
pagination: ISortedTablePagination;
search: string;
transactions: ActiveTransaction[];
Expand All @@ -52,6 +52,7 @@ export const ActiveTransactionsSection: React.FC<
ActiveTransactionsSectionProps
> = ({
filters,
isTenant,
pagination,
search,
transactions,
Expand All @@ -61,11 +62,9 @@ export const ActiveTransactionsSection: React.FC<
onClearFilters,
onColumnsSelect,
}) => {
const isCockroachCloud = useContext(CockroachCloudContext);

const columns = useMemo(
() => makeActiveTransactionsColumns(isCockroachCloud),
[isCockroachCloud],
() => makeActiveTransactionsColumns(isTenant),
[isTenant],
);
const shownColumns = columns.filter(col =>
isSelectedColumn(selectedColumns, col),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import { Tooltip } from "@cockroachlabs/ui-components";
import { limitText } from "../../util";

export function makeActiveTransactionsColumns(
isCockroachCloud: boolean,
isTenant: boolean,
): ColumnDescriptor<ActiveTransaction>[] {
const execType: ExecutionType = "transaction";
return [
Expand All @@ -50,9 +50,7 @@ export function makeActiveTransactionsColumns(
activeTransactionColumnsFromCommon.status,
activeTransactionColumnsFromCommon.startTime,
activeTransactionColumnsFromCommon.elapsedTime,
!isCockroachCloud
? activeTransactionColumnsFromCommon.timeSpentWaiting
: null,
!isTenant ? activeTransactionColumnsFromCommon.timeSpentWaiting : null,
{
name: "statementCount",
title: executionsTableTitles.statementCount(execType),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import {
} from "src/selectors/activeExecutions.selectors";
import { actions as localStorageActions } from "src/store/localStorage";
import { actions as sessionsActions } from "src/store/sessions";
import { selectIsTenant } from "src/store/uiConfig";
import { localStorageSelector } from "../store/utils/selectors";

export const selectSortSetting = (state: AppState): SortSetting =>
Expand Down Expand Up @@ -54,6 +55,7 @@ export const mapStateToActiveStatementsPageProps = (
sortSetting: selectSortSetting(state),
filters: selectFilters(state),
internalAppNamePrefix: selectAppName(state),
isTenant: selectIsTenant(state),
});

export const mapDispatchToActiveStatementsPageProps = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ export type ActiveStatementsViewStateProps = {
sessionsError: Error | null;
filters: ActiveStatementFilters;
internalAppNamePrefix: string;
isTenant?: boolean;
};

export type ActiveStatementsViewProps = ActiveStatementsViewStateProps &
Expand All @@ -70,6 +71,7 @@ export const ActiveStatementsView: React.FC<ActiveStatementsViewProps> = ({
sessionsError,
filters,
internalAppNamePrefix,
isTenant,
}: ActiveStatementsViewProps) => {
const [pagination, setPagination] = useState<ISortedTablePagination>({
current: 1,
Expand Down Expand Up @@ -204,6 +206,7 @@ export const ActiveStatementsView: React.FC<ActiveStatementsViewProps> = ({
onClearFilters={clearFilters}
onChangeSortSetting={onSortClick}
onColumnsSelect={onColumnsSelect}
isTenant={isTenant}
/>
</Loading>
</div>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ const initialState: SessionsState = {
valid: true,
};

const ssessionsSlice = createSlice({
const sessionsSlice = createSlice({
name: `${DOMAIN_NAME}/sessions`,
initialState,
reducers: {
Expand All @@ -52,4 +52,4 @@ const ssessionsSlice = createSlice({
},
});

export const { reducer, actions } = ssessionsSlice;
export const { reducer, actions } = sessionsSlice;
26 changes: 21 additions & 5 deletions pkg/ui/workspaces/cluster-ui/src/store/sessions/sessions.sagas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,31 @@
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

import { all, call, put, takeLatest, PutEffect } from "redux-saga/effects";
import {
all,
call,
put,
takeLatest,
AllEffect,
PutEffect,
SelectEffect,
select,
} from "redux-saga/effects";

import { actions } from "./sessions.reducer";
import { actions as clusterLockActions } from "../clusterLocks/clusterLocks.reducer";
import { getSessions } from "src/api/sessionsApi";
import { selectIsTenant } from "../uiConfig";

export function* refreshSessionsAndClusterLocksSaga(): Generator<PutEffect> {
yield put(actions.request());

// TODO (xzhang) request clusterLocks info here. This is currently not available on CC.
export function* refreshSessionsAndClusterLocksSaga(): Generator<
AllEffect<PutEffect> | SelectEffect | PutEffect
> {
const isTenant = yield select(selectIsTenant);
if (isTenant) {
yield put(actions.request());
return;
}
yield all([put(actions.request()), put(clusterLockActions.request())]);
}

export function* requestSessionsSaga(): any {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {
import { actions as localStorageActions } from "src/store/localStorage";
import { actions as sessionsActions } from "src/store/sessions";
import { localStorageSelector } from "../store/utils/selectors";
import { selectIsTenant } from "src/store/uiConfig";

export const selectSortSetting = (state: AppState): SortSetting =>
localStorageSelector(state)["sortSetting/ActiveTransactionsPage"];
Expand Down Expand Up @@ -54,6 +55,7 @@ export const mapStateToActiveTransactionsPageProps = (
sortSetting: selectSortSetting(state),
filters: selectFilters(state),
internalAppNamePrefix: selectAppName(state),
isTenant: selectIsTenant(state),
});

export const mapDispatchToActiveTransactionsPageProps = (
Expand Down
Loading

0 comments on commit 898987e

Please sign in to comment.