From dd58d6606dea3438d99db22d3aaa8e637a007fc6 Mon Sep 17 00:00:00 2001 From: Xin Hao Zhang Date: Tue, 12 Apr 2022 16:53:21 -0400 Subject: [PATCH 1/2] cluster-ui: create bar graph for time series data Closes #74516 This commit creates a generic bar chart component in the cluster-ui package. The component is intended for use with time series data and should be wrapped in parent components that convert metrics responses from the server to the expected format. Release note: None --- pkg/ui/workspaces/cluster-ui/BUILD.bazel | 2 + .../src/graphs/bargraph/barGraph.stories.tsx | 88 ++++++++ .../src/graphs/bargraph/bargraph.module.scss | 19 ++ .../cluster-ui/src/graphs/bargraph/bars.ts | 200 ++++++++++++++++++ .../cluster-ui/src/graphs/bargraph/index.tsx | 99 +++++++++ .../cluster-ui/src/graphs/bargraph/plugins.ts | 148 +++++++++++++ .../cluster-ui/src/graphs/utils/domain.ts | 13 ++ 7 files changed, 569 insertions(+) create mode 100644 pkg/ui/workspaces/cluster-ui/src/graphs/bargraph/barGraph.stories.tsx create mode 100644 pkg/ui/workspaces/cluster-ui/src/graphs/bargraph/bargraph.module.scss create mode 100644 pkg/ui/workspaces/cluster-ui/src/graphs/bargraph/bars.ts create mode 100644 pkg/ui/workspaces/cluster-ui/src/graphs/bargraph/index.tsx create mode 100644 pkg/ui/workspaces/cluster-ui/src/graphs/bargraph/plugins.ts diff --git a/pkg/ui/workspaces/cluster-ui/BUILD.bazel b/pkg/ui/workspaces/cluster-ui/BUILD.bazel index 9939c04e8ead..773a235b14e4 100644 --- a/pkg/ui/workspaces/cluster-ui/BUILD.bazel +++ b/pkg/ui/workspaces/cluster-ui/BUILD.bazel @@ -109,6 +109,7 @@ DEPENDENCIES = [ "@npm//source-map-loader", "@npm//style-loader", "@npm//ts-jest", + "@npm//uplot", "@npm//url-loader", "@npm//webpack", "@npm//webpack-cli", @@ -147,6 +148,7 @@ ts_project( "@npm//redux-saga", "@npm//redux-saga-test-plan", "@npm//reselect", + "@npm//uplot", ], ) diff --git a/pkg/ui/workspaces/cluster-ui/src/graphs/bargraph/barGraph.stories.tsx 
b/pkg/ui/workspaces/cluster-ui/src/graphs/bargraph/barGraph.stories.tsx new file mode 100644 index 000000000000..070a2027b8b5 --- /dev/null +++ b/pkg/ui/workspaces/cluster-ui/src/graphs/bargraph/barGraph.stories.tsx @@ -0,0 +1,88 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +import React from "react"; +import { storiesOf } from "@storybook/react"; +import { AlignedData, Options } from "uplot"; +import { BarGraphTimeSeries } from "./index"; +import { AxisUnits } from "../utils/domain"; +import { getBarsBuilder } from "./bars"; + +function generateTimestampsMillis(start: number, length: number): number[] { + return [...Array(length)].map( + (_, idx): number => (60 * 60 * idx + start) * 1000, + ); +} + +function genValuesInRange(range: [number, number], length: number): number[] { + return [...Array(length)].map((): number => + Math.random() > 0.1 ? Math.random() * (range[1] - range[0]) + range[0] : 0, + ); +} + +const mockData: AlignedData = [ + generateTimestampsMillis(1546300800, 20), + genValuesInRange([0, 100], 20), + genValuesInRange([0, 100], 20), + genValuesInRange([0, 100], 20), +]; + +const mockOpts: Partial = { + axes: [{}, { label: "values" }], + series: [ + {}, + { + label: "bar 1", + }, + { + label: "bar 2", + }, + { + label: "bar 3", + }, + ], +}; + +storiesOf("BarGraphTimeSeries", module) + .add("with stacked multi-series", () => { + return ( + This is an example stacked bar graph axis unit = count. 
+ } + yAxisUnits={AxisUnits.Count} + /> + ); + }) + .add("with single series", () => { + const data: AlignedData = [ + generateTimestampsMillis(1546300800, 50), + genValuesInRange([0, 1], 50), + ]; + const opts = { + series: [{}, { label: "bar", paths: getBarsBuilder(0.8, 20) }], + legend: { show: false }, + axes: [{}, { label: "mock" }], + }; + return ( + This is an example bar graph with axis unit = percent. + } + yAxisUnits={AxisUnits.Percentage} + /> + ); + }); diff --git a/pkg/ui/workspaces/cluster-ui/src/graphs/bargraph/bargraph.module.scss b/pkg/ui/workspaces/cluster-ui/src/graphs/bargraph/bargraph.module.scss new file mode 100644 index 000000000000..da0982dda519 --- /dev/null +++ b/pkg/ui/workspaces/cluster-ui/src/graphs/bargraph/bargraph.module.scss @@ -0,0 +1,19 @@ +:global { + @import "uplot/dist/uPlot.min"; +} + +.bargraph { + height: 100%; + :global(.uplot) { + display: flex; + flex-direction: column; + :global(.u-legend) { + text-align: left; + font-size: 10px; + margin-top: 20px; + z-index: 100; + width: fit-content; + padding: 10px; + } + } +} diff --git a/pkg/ui/workspaces/cluster-ui/src/graphs/bargraph/bars.ts b/pkg/ui/workspaces/cluster-ui/src/graphs/bargraph/bars.ts new file mode 100644 index 000000000000..dc9efd70d3fd --- /dev/null +++ b/pkg/ui/workspaces/cluster-ui/src/graphs/bargraph/bars.ts @@ -0,0 +1,200 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +import { merge } from "lodash"; +import uPlot, { Options, Band, AlignedData } from "uplot"; +import { + // AxisUnits, + AxisDomain, +} from "../utils/domain"; +import { barTooltipPlugin } from "./plugins"; + +const seriesPalette = [ + "#475872", + "#FFCD02", + "#F16969", + "#4E9FD1", + "#49D990", + "#D77FBF", + "#87326D", + "#A3415B", + "#B59153", + "#C9DB6D", + "#203D9B", + "#748BF2", + "#91C8F2", + "#FF9696", + "#EF843C", + "#DCCD4B", +]; + +// Aggregate the series. +export function stack( + data: AlignedData, + omit: (i: number) => boolean, +): AlignedData { + const stackedData = []; + const xAxisLength = data[0].length; + const accum = Array(xAxisLength); + accum.fill(0); + + for (let i = 1; i < data.length; i++) + stackedData.push( + omit(i) ? data[i] : data[i].map((v, i) => (accum[i] += v)), + ); + + return [data[0]].concat(stackedData) as AlignedData; +} + +function getStackedBands( + unstackedData: AlignedData, + omit: (i: number) => boolean, +): Band[] { + const bands = []; + + for (let i = 1; i < unstackedData.length; i++) + !omit(i) && + bands.push({ + series: [ + unstackedData.findIndex( + (_series, seriesIdx) => seriesIdx > i && !omit(seriesIdx), + ), + i, + ] as Band.Bounds, + }); + + return bands.filter(b => b.series[1] > -1); +} + +const { bars } = uPlot.paths; + +export const getBarsBuilder = ( + barWidthFactor: number, // percentage of space allocated to bar in the range [0, 1] + maxWidth: number, + minWidth = 10, + align: 0 | 1 | -1 = 1, // -1 = left aligned, 0 = center, 1 = right aligned +): uPlot.Series.PathBuilder => { + return bars({ size: [barWidthFactor, maxWidth, minWidth], align }); +}; + +export const getStackedBarOpts = ( + unstackedData: AlignedData, + userOptions: Partial, + xAxisDomain: AxisDomain, + yAxisDomain: AxisDomain, + colourPalette = seriesPalette, +): Options => { + const options = getBarChartOpts( + userOptions, + xAxisDomain, + yAxisDomain, + colourPalette, + ); + + options.bands = getStackedBands(unstackedData, () => 
false); + + options.series.forEach(s => { + s.value = (_u, _v, si, i) => unstackedData[si][i]; + + s.points = s.points || { show: false }; + + // Scan raw unstacked data to return only real points. + s.points.filter = (_u, seriesIdx, show) => { + if (show) { + const pts: number[] = []; + unstackedData[seriesIdx].forEach((v, i) => { + v && pts.push(i); + }); + return pts; + } + }; + }); + + options.cursor = options.cursor || {}; + options.cursor.dataIdx = (_u, seriesIdx, closestIdx, _xValue) => { + return unstackedData[seriesIdx][closestIdx] == null ? null : closestIdx; + }; + + options.hooks = options.hooks || {}; + options.hooks.setSeries = options.hooks.setSeries || []; + options.hooks.setSeries.push(u => { + // Restack on toggle. + const bands = getStackedBands(unstackedData, i => !u.series[i].show); + const data = stack(unstackedData, i => !u.series[i].show); + u.delBand(null); // Clear bands. + bands.forEach(b => u.addBand(b)); + u.setData(data); + }); + + return options; +}; + +export const getBarChartOpts = ( + userOptions: Partial, + xAxisDomain: AxisDomain, + yAxisDomain: AxisDomain, + colourPalette = seriesPalette, +): Options => { + const { series, ...providedOpts } = userOptions; + const defaultBars = getBarsBuilder(0.9, 80); + + const opts: Options = { + // Default width and height. + width: 947, + height: 300, + ms: 1, // Interpret timestamps in milliseconds. + legend: { + isolate: true, // Isolate series on click. 
+ live: false, + }, + scales: { + x: { + range: () => [xAxisDomain.extent[0], xAxisDomain.extent[1]], + }, + }, + axes: [ + { + values: (_u, vals) => vals.map(xAxisDomain.tickFormat), + splits: () => xAxisDomain.ticks, + }, + { + values: (_u, vals) => vals.map(yAxisDomain.tickFormat), + splits: () => [ + yAxisDomain.extent[0], + ...yAxisDomain.ticks, + yAxisDomain.extent[1], + ], + scale: "yAxis", + }, + ], + series: [ + { + value: (_u, millis) => xAxisDomain.guideFormat(millis), + }, + ...series.slice(1).map((s, i) => ({ + fill: colourPalette[i % colourPalette.length], + stroke: colourPalette[i % colourPalette.length], + width: 2, + paths: defaultBars, + points: { show: false }, + scale: "yAxis", + ...s, + })), + ], + plugins: [barTooltipPlugin()], + }; + + const combinedOpts = merge(opts, providedOpts); + + // Set y-axis label with units. + combinedOpts.axes[1].label += ` ${yAxisDomain.label}`; + + return combinedOpts; +}; diff --git a/pkg/ui/workspaces/cluster-ui/src/graphs/bargraph/index.tsx b/pkg/ui/workspaces/cluster-ui/src/graphs/bargraph/index.tsx new file mode 100644 index 000000000000..9f3e974e4bea --- /dev/null +++ b/pkg/ui/workspaces/cluster-ui/src/graphs/bargraph/index.tsx @@ -0,0 +1,99 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +import React, { useEffect, useRef } from "react"; +import classNames from "classnames/bind"; +import { getStackedBarOpts, stack } from "./bars"; +import uPlot, { AlignedData } from "uplot"; +import styles from "./bargraph.module.scss"; +import { Visualization } from "../visualization"; +import { + AxisUnits, + calculateXAxisDomainBarChart, + calculateYAxisDomain, +} from "../utils/domain"; +import { Options } from "uplot"; + +const cx = classNames.bind(styles); + +export type BarGraphTimeSeriesProps = { + alignedData?: AlignedData; + colourPalette?: string[]; // Series colour palette. + preCalcGraphSize?: boolean; + title: string; + tooltip?: React.ReactNode; + uPlotOptions: Partial; + yAxisUnits: AxisUnits; +}; + +// Currently this component only supports stacked multi-series bars. +export const BarGraphTimeSeries: React.FC = ({ + alignedData, + colourPalette, + preCalcGraphSize = true, + title, + tooltip, + uPlotOptions, + yAxisUnits, +}) => { + const graphRef = useRef(null); + const samplingIntervalMillis = alignedData[0].length + ? alignedData[0][1] - alignedData[0][0] + : 0; + + useEffect(() => { + if (!alignedData) return; + + const xAxisDomain = calculateXAxisDomainBarChart( + alignedData[0][0], // startMillis + alignedData[0][alignedData[0].length - 1], // endMillis + samplingIntervalMillis, + ); + + const stackedData = stack(alignedData, () => false); + + const allYDomainPoints: number[] = []; + stackedData.slice(1).forEach(points => allYDomainPoints.push(...points)); + const yAxisDomain = calculateYAxisDomain(yAxisUnits, allYDomainPoints); + + const opts = getStackedBarOpts( + alignedData, + uPlotOptions, + xAxisDomain, + yAxisDomain, + colourPalette, + ); + + const plot = new uPlot(opts, stackedData, graphRef.current); + + return () => { + plot?.destroy(); + }; + }, [ + alignedData, + colourPalette, + uPlotOptions, + yAxisUnits, + samplingIntervalMillis, + ]); + + return ( + +
+
+
+ + ); +}; diff --git a/pkg/ui/workspaces/cluster-ui/src/graphs/bargraph/plugins.ts b/pkg/ui/workspaces/cluster-ui/src/graphs/bargraph/plugins.ts new file mode 100644 index 000000000000..72845e4c26b1 --- /dev/null +++ b/pkg/ui/workspaces/cluster-ui/src/graphs/bargraph/plugins.ts @@ -0,0 +1,148 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +import uPlot, { Plugin } from "uplot"; +import { formatTimeStamp } from "../utils/domain"; + +// Fallback color for series stroke if one is not defined. +const DEFAULT_STROKE = "#7e89a9"; + +// Generate a series legend within the provided div showing the data points +// relative to the cursor position. +const generateSeriesLegend = (uPlot: uPlot, seriesLegend: HTMLDivElement) => { + // idx is the closest data index to the cursor position. + const { idx } = uPlot.cursor; + + if (idx === undefined || idx === null) { + return; + } + + // remove all previous child nodes + seriesLegend.innerHTML = ""; + + // Generate new child nodes. + uPlot.series.forEach((series: uPlot.Series, index: number) => { + if (index === 0 || series.show === false) { + // Skip the series for x axis or if series is hidden. + return; + } + + // series.stroke can be either a function that returns a canvas stroke + // value, or a function returning a stroke value. + const strokeColor = + typeof series.stroke === "function" + ? 
series.stroke(uPlot, idx) + : series.stroke; + + const container = document.createElement("div"); + container.style.display = "flex"; + container.style.alignItems = "center"; + + const colorBox = document.createElement("span"); + colorBox.style.height = "12px"; + colorBox.style.width = "12px"; + colorBox.style.background = String(strokeColor || DEFAULT_STROKE); + colorBox.style.display = "inline-block"; + colorBox.style.marginRight = "12px"; + + const label = document.createElement("span"); + label.textContent = series.label || ""; + + const dataValue = uPlot.data[index][idx]; + const value = document.createElement("div"); + value.style.textAlign = "right"; + value.style.flex = "1"; + value.style.fontFamily = "'Source Sans Pro', sans-serif"; + value.textContent = + series.value instanceof Function && dataValue + ? String(series.value(uPlot, dataValue, index, idx)) + : String(dataValue); + + container.appendChild(colorBox); + container.appendChild(label); + container.appendChild(value); + + seriesLegend.appendChild(container); + }); +}; + +// Tooltip legend plugin for bar charts. +export function barTooltipPlugin(): Plugin { + const cursorToolTip = { + tooltip: document.createElement("div"), + timeStamp: document.createElement("div"), + seriesLegend: document.createElement("div"), + }; + + function setCursor(u: uPlot) { + const { tooltip, timeStamp, seriesLegend } = cursorToolTip; + const { left = 0, top = 0 } = u.cursor; + + // get the current timestamp from the x axis and formatting as + // the Tooltip header. + const closestDataPointTimeMillis = u.data[0][u.posToIdx(left)]; + timeStamp.textContent = formatTimeStamp(closestDataPointTimeMillis); + + // Generating the series legend based on current state of µPlot + generateSeriesLegend(u, seriesLegend); + + // set the position of the Tooltip. Adjusting the tooltip away from the + // cursor for readability. 
+ tooltip.style.left = `${left + 20}px`; + tooltip.style.top = `${top - 10}px`; + + if (tooltip.style.display === "none") { + tooltip.style.display = ""; + } + } + + function ready(u: uPlot) { + const plot = u.root.querySelector(".u-over"); + const { tooltip } = cursorToolTip; + + plot?.addEventListener("mouseleave", () => { + tooltip.style.display = "none"; + }); + } + + function init(u: uPlot) { + const plot = u.root.querySelector(".u-over"); + const { tooltip, timeStamp, seriesLegend } = cursorToolTip; + tooltip.style.display = "none"; + tooltip.style.pointerEvents = "none"; + tooltip.style.position = "absolute"; + tooltip.style.padding = "0 16px 16px"; + tooltip.style.minWidth = "230px"; + tooltip.style.background = "#fff"; + tooltip.style.borderRadius = "5px"; + tooltip.style.boxShadow = "0px 7px 13px rgba(71, 88, 114, 0.3)"; + tooltip.style.zIndex = "100"; + tooltip.style.whiteSpace = "nowrap"; + + // Set timeStamp. + timeStamp.textContent = "time"; + timeStamp.style.paddingTop = "12px"; + timeStamp.style.marginBottom = "16px"; + tooltip.appendChild(timeStamp); + + // appending seriesLegend empty. Content will be generated on mousemove. + tooltip.appendChild(seriesLegend); + + plot?.appendChild(tooltip); + } + + return { + hooks: { + init, + ready, + setCursor, + }, + }; +} diff --git a/pkg/ui/workspaces/cluster-ui/src/graphs/utils/domain.ts b/pkg/ui/workspaces/cluster-ui/src/graphs/utils/domain.ts index 933b7e110c5f..963d09dc8069 100644 --- a/pkg/ui/workspaces/cluster-ui/src/graphs/utils/domain.ts +++ b/pkg/ui/workspaces/cluster-ui/src/graphs/utils/domain.ts @@ -340,3 +340,16 @@ export function calculateXAxisDomain( ): AxisDomain { return ComputeTimeAxisDomain([startMillis, endMillis] as Extent); } + +export function calculateXAxisDomainBarChart( + startMillis: number, + endMillis: number, + samplingIntervalMillis: number, +): AxisDomain { + // For bar charts, we want to render past endMillis to fully render the + // last bar. 
We should extend the x axis to the next sampling interval. + return ComputeTimeAxisDomain([ + startMillis, + endMillis + samplingIntervalMillis, + ] as Extent); +} From ef7d3acd7e68dcc3345a738c30725dbe0fc8175c Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Wed, 27 Apr 2022 17:18:02 -0700 Subject: [PATCH 2/2] colexec: fix sort chunks with disk spilling in very rare circumstances This commit fixes a long-standing but very rare bug which could result in some rows being dropped when sort chunks ("segmented sort") spills to disk. The root cause is that a deselector operator is placed on top of the input to the sort chunks op (because its "chunker" spooler assumes no selection vector on batches), and that deselector uses the same allocator as the sort chunks. If the allocator's budget is used up, then an error is thrown, and it is caught by the disk-spilling infrastructure that is wrapping this whole `sort chunks -> chunker -> deselector` chain; the error is then suppressed, and spilling to disk occurs. However, crucially, it was always assumed that the error occurred in `chunker`, so only that component knows how to properly perform the fallover. If the error occurs in the deselector, the deselector might end up losing a single input batch. We worked around this by making a fake allocation in the deselector before reading the input batch. However, if the stars align, and the error occurs _after_ reading the input batch in the deselector, that input batch will be lost, and we might get incorrect results. For the bug to occur a couple of conditions need to be met: 1. The "memory budget exceeded" error must occur for the sort chunks operation. It is far more likely that it will occur in the "chunker" because that component can buffer an arbitrarily large number of tuples and because we did make that fake allocation. 2. The input operator to the chain must be producing batches with selection vectors on top - if this is not the case, then the deselector is a noop. 
An example of such an input is a table reader with a filter on top. The fix is quite simple - use a separate allocator for the deselector that has an unlimited budget. This allows us to still properly track the memory usage of an extra batch created in the deselector without it running into these difficulties with disk spilling. This also makes it so that if a "memory budget exceeded" error does occur in the deselector (which is possible if `--max-sql-memory` has been used up), it will not be caught by the disk-spilling infrastructure and will be propagated to the user - which is the expected and desired behavior in such a scenario. There is no explicit regression test for this since our existing unit tests already exercise this scenario once the fake allocation in the deselector is removed. Release note (bug fix): Previously, in very rare circumstances CockroachDB could incorrectly evaluate queries with an ORDER BY clause when the prefix of ordering was already provided by the index ordering of the scanned table. --- pkg/sql/colexec/colbuilder/execplan.go | 12 ++++-- pkg/sql/colexec/colexecutils/deselector.go | 38 +++++++++---------- pkg/sql/colexec/distinct_test.go | 4 +- pkg/sql/colexec/partially_ordered_distinct.go | 3 +- pkg/sql/colexec/sort_chunks.go | 6 ++- pkg/sql/colexec/sort_chunks_test.go | 8 ++-- pkg/sql/colflow/colrpc/outbox.go | 26 ++++++------- 7 files changed, 52 insertions(+), 45 deletions(-) diff --git a/pkg/sql/colexec/colbuilder/execplan.go b/pkg/sql/colexec/colbuilder/execplan.go index 9cba7221160b..9c38ad7afded 100644 --- a/pkg/sql/colexec/colbuilder/execplan.go +++ b/pkg/sql/colexec/colbuilder/execplan.go @@ -379,13 +379,19 @@ func (r opResult) createDiskBackedSort( } else if matchLen > 0 { // The input is already partially ordered. Use a chunks sorter to avoid // loading all the rows into memory. 
+ opName := opNamePrefix + "sort-chunks" + deselectorUnlimitedAllocator := colmem.NewAllocator( + ctx, args.MonitorRegistry.CreateUnlimitedMemAccount( + ctx, flowCtx, opName, processorID, + ), factory, + ) var sortChunksMemAccount *mon.BoundAccount sortChunksMemAccount, sorterMemMonitorName = args.MonitorRegistry.CreateMemAccountForSpillStrategyWithLimit( - ctx, flowCtx, spoolMemLimit, opNamePrefix+"sort-chunks", processorID, + ctx, flowCtx, spoolMemLimit, opName, processorID, ) inMemorySorter = colexec.NewSortChunks( - colmem.NewAllocator(ctx, sortChunksMemAccount, factory), input, inputTypes, - ordering.Columns, int(matchLen), maxOutputBatchMemSize, + deselectorUnlimitedAllocator, colmem.NewAllocator(ctx, sortChunksMemAccount, factory), + input, inputTypes, ordering.Columns, int(matchLen), maxOutputBatchMemSize, ) } else { // No optimizations possible. Default to the standard sort operator. diff --git a/pkg/sql/colexec/colexecutils/deselector.go b/pkg/sql/colexec/colexecutils/deselector.go index 462f45417028..9d3e87e3ef41 100644 --- a/pkg/sql/colexec/colexecutils/deselector.go +++ b/pkg/sql/colexec/colexecutils/deselector.go @@ -26,8 +26,8 @@ import ( type deselectorOp struct { colexecop.OneInputHelper colexecop.NonExplainable - allocator *colmem.Allocator - inputTypes []*types.T + unlimitedAllocator *colmem.Allocator + inputTypes []*types.T output coldata.Batch } @@ -36,39 +36,35 @@ var _ colexecop.Operator = &deselectorOp{} // NewDeselectorOp creates a new deselector operator on the given input // operator with the given column types. +// +// The provided allocator must be derived from an unlimited memory monitor since +// the deselectorOp cannot spill to disk and a "memory budget exceeded" error +// might be caught by the higher-level diskSpiller which would result in losing +// some query results. 
func NewDeselectorOp( - allocator *colmem.Allocator, input colexecop.Operator, typs []*types.T, + unlimitedAllocator *colmem.Allocator, input colexecop.Operator, typs []*types.T, ) colexecop.Operator { return &deselectorOp{ - OneInputHelper: colexecop.MakeOneInputHelper(input), - allocator: allocator, - inputTypes: typs, + OneInputHelper: colexecop.MakeOneInputHelper(input), + unlimitedAllocator: unlimitedAllocator, + inputTypes: typs, } } func (p *deselectorOp) Next() coldata.Batch { - // deselectorOp should *not* limit the capacities of the returned batches, - // so we don't use a memory limit here. It is up to the wrapped operator to - // limit the size of batches based on the memory footprint. - const maxBatchMemSize = math.MaxInt64 - // TODO(yuzefovich): this allocation is only needed in order to appease the - // tests of the external sorter with forced disk spilling (if we don't do - // this, an OOM error occurs during ResetMaybeReallocate call below at - // which point we have already received a batch from the input and it'll - // get lost because deselectorOp doesn't support fall-over to the - // disk-backed infrastructure). - p.output, _ = p.allocator.ResetMaybeReallocate( - p.inputTypes, p.output, 1 /* minCapacity */, maxBatchMemSize, - ) batch := p.Input.Next() if batch.Selection() == nil || batch.Length() == 0 { return batch } - p.output, _ = p.allocator.ResetMaybeReallocate( + // deselectorOp should *not* limit the capacities of the returned batches, + // so we don't use a memory limit here. It is up to the wrapped operator to + // limit the size of batches based on the memory footprint. 
+ const maxBatchMemSize = math.MaxInt64 + p.output, _ = p.unlimitedAllocator.ResetMaybeReallocate( p.inputTypes, p.output, batch.Length(), maxBatchMemSize, ) sel := batch.Selection() - p.allocator.PerformOperation(p.output.ColVecs(), func() { + p.unlimitedAllocator.PerformOperation(p.output.ColVecs(), func() { for i := range p.inputTypes { toCol := p.output.ColVec(i) fromCol := batch.ColVec(i) diff --git a/pkg/sql/colexec/distinct_test.go b/pkg/sql/colexec/distinct_test.go index 147cdf91ac7a..35020d11c655 100644 --- a/pkg/sql/colexec/distinct_test.go +++ b/pkg/sql/colexec/distinct_test.go @@ -405,7 +405,7 @@ func TestDistinct(t *testing.T) { } tc.runTests(t, colexectestutils.OrderedVerifier, func(input []colexecop.Operator) (colexecop.Operator, error) { return newPartiallyOrderedDistinct( - testAllocator, input[0], tc.distinctCols, orderedCols, tc.typs, tc.nullsAreDistinct, tc.errorOnDup, + testAllocator, testAllocator, input[0], tc.distinctCols, orderedCols, tc.typs, tc.nullsAreDistinct, tc.errorOnDup, ) }) } @@ -630,7 +630,7 @@ func BenchmarkDistinct(b *testing.B) { return NewUnorderedDistinct(allocator, input, distinctCols, typs, false /* nullsAreDistinct */, "" /* errorOnDup */), nil }, func(allocator *colmem.Allocator, input colexecop.Operator, distinctCols []uint32, numOrderedCols int, typs []*types.T) (colexecop.Operator, error) { - return newPartiallyOrderedDistinct(allocator, input, distinctCols, distinctCols[:numOrderedCols], typs, false /* nullsAreDistinct */, "" /* errorOnDup */) + return newPartiallyOrderedDistinct(allocator, allocator, input, distinctCols, distinctCols[:numOrderedCols], typs, false /* nullsAreDistinct */, "" /* errorOnDup */) }, func(allocator *colmem.Allocator, input colexecop.Operator, distinctCols []uint32, numOrderedCols int, typs []*types.T) (colexecop.Operator, error) { return colexecbase.NewOrderedDistinct(input, distinctCols, typs, false /* nullsAreDistinct */, "" /* errorOnDup */), nil diff --git 
a/pkg/sql/colexec/partially_ordered_distinct.go b/pkg/sql/colexec/partially_ordered_distinct.go index 2ef384c696fa..3f68c9d01e45 100644 --- a/pkg/sql/colexec/partially_ordered_distinct.go +++ b/pkg/sql/colexec/partially_ordered_distinct.go @@ -26,6 +26,7 @@ import ( // distinct columns when we have partial ordering on some of the distinct // columns. func newPartiallyOrderedDistinct( + unlimitedAllocator *colmem.Allocator, allocator *colmem.Allocator, input colexecop.Operator, distinctCols []uint32, @@ -39,7 +40,7 @@ func newPartiallyOrderedDistinct( "partially ordered distinct wrongfully planned: numDistinctCols=%d "+ "numOrderedCols=%d", len(distinctCols), len(orderedCols)) } - chunker := newChunker(allocator, input, typs, orderedCols, nullsAreDistinct) + chunker := newChunker(unlimitedAllocator, allocator, input, typs, orderedCols, nullsAreDistinct) chunkerOperator := newChunkerOperator(allocator, chunker, typs) // distinctUnorderedCols will contain distinct columns that are not present // among orderedCols. The unordered distinct operator will use these columns diff --git a/pkg/sql/colexec/sort_chunks.go b/pkg/sql/colexec/sort_chunks.go index ad9c4742f5b8..9a785929f670 100644 --- a/pkg/sql/colexec/sort_chunks.go +++ b/pkg/sql/colexec/sort_chunks.go @@ -29,6 +29,7 @@ import ( // the columns in the input operator. The input tuples must be sorted on first // matchLen columns. 
func NewSortChunks( + unlimitedAllocator *colmem.Allocator, allocator *colmem.Allocator, input colexecop.Operator, inputTypes []*types.T, @@ -46,7 +47,7 @@ func NewSortChunks( for i := range alreadySortedCols { alreadySortedCols[i] = orderingCols[i].ColIdx } - chunker := newChunker(allocator, input, inputTypes, alreadySortedCols, false /* nullsAreDistinct */) + chunker := newChunker(unlimitedAllocator, allocator, input, inputTypes, alreadySortedCols, false /* nullsAreDistinct */) sorter := newSorter(allocator, chunker, inputTypes, orderingCols[matchLen:], maxOutputBatchMemSize) return &sortChunksOp{allocator: allocator, input: chunker, sorter: sorter} } @@ -256,6 +257,7 @@ type chunker struct { var _ spooler = &chunker{} func newChunker( + unlimitedAllocator *colmem.Allocator, allocator *colmem.Allocator, input colexecop.Operator, inputTypes []*types.T, @@ -266,7 +268,7 @@ func newChunker( for i, col := range alreadySortedCols { partitioners[i] = newPartitioner(inputTypes[col], nullsAreDistinct) } - deselector := colexecutils.NewDeselectorOp(allocator, input, inputTypes) + deselector := colexecutils.NewDeselectorOp(unlimitedAllocator, input, inputTypes) return &chunker{ OneInputNode: colexecop.NewOneInputNode(deselector), allocator: allocator, diff --git a/pkg/sql/colexec/sort_chunks_test.go b/pkg/sql/colexec/sort_chunks_test.go index 2702e32c4b3d..2428dce424ce 100644 --- a/pkg/sql/colexec/sort_chunks_test.go +++ b/pkg/sql/colexec/sort_chunks_test.go @@ -189,7 +189,7 @@ func TestSortChunks(t *testing.T) { for _, tc := range sortChunksTestCases { colexectestutils.RunTests(t, testAllocator, []colexectestutils.Tuples{tc.tuples}, tc.expected, colexectestutils.OrderedVerifier, func(input []colexecop.Operator) (colexecop.Operator, error) { - return NewSortChunks(testAllocator, input[0], tc.typs, tc.ordCols, tc.matchLen, execinfra.DefaultMemoryLimit), nil + return NewSortChunks(testAllocator, testAllocator, input[0], tc.typs, tc.ordCols, tc.matchLen, 
execinfra.DefaultMemoryLimit), nil }) } } @@ -231,7 +231,7 @@ func TestSortChunksRandomized(t *testing.T) { sort.Slice(expected, less(expected, ordCols)) colexectestutils.RunTests(t, testAllocator, []colexectestutils.Tuples{sortedTups}, expected, colexectestutils.OrderedVerifier, func(input []colexecop.Operator) (colexecop.Operator, error) { - return NewSortChunks(testAllocator, input[0], typs[:nCols], ordCols, matchLen, execinfra.DefaultMemoryLimit), nil + return NewSortChunks(testAllocator, testAllocator, input[0], typs[:nCols], ordCols, matchLen, execinfra.DefaultMemoryLimit), nil }) } } @@ -244,7 +244,9 @@ func BenchmarkSortChunks(b *testing.B) { ctx := context.Background() sorterConstructors := []func(*colmem.Allocator, colexecop.Operator, []*types.T, []execinfrapb.Ordering_Column, int, int64) colexecop.Operator{ - NewSortChunks, + func(allocator *colmem.Allocator, input colexecop.Operator, inputTypes []*types.T, orderingCols []execinfrapb.Ordering_Column, matchLen int, maxOutputBatchMemSize int64) colexecop.Operator { + return NewSortChunks(allocator, allocator, input, inputTypes, orderingCols, matchLen, maxOutputBatchMemSize) + }, func(allocator *colmem.Allocator, input colexecop.Operator, inputTypes []*types.T, orderingCols []execinfrapb.Ordering_Column, _ int, maxOutputBatchMemSize int64) colexecop.Operator { return NewSorter(allocator, input, inputTypes, orderingCols, maxOutputBatchMemSize) }, diff --git a/pkg/sql/colflow/colrpc/outbox.go b/pkg/sql/colflow/colrpc/outbox.go index 8e84733536d4..3d5343270c53 100644 --- a/pkg/sql/colflow/colrpc/outbox.go +++ b/pkg/sql/colflow/colrpc/outbox.go @@ -55,9 +55,9 @@ type Outbox struct { typs []*types.T - allocator *colmem.Allocator - converter *colserde.ArrowBatchConverter - serializer *colserde.RecordBatchSerializer + unlimitedAllocator *colmem.Allocator + converter *colserde.ArrowBatchConverter + serializer *colserde.RecordBatchSerializer // draining is an atomic that represents whether the Outbox is draining. 
draining uint32 @@ -83,7 +83,7 @@ type Outbox struct { // - getStats, when non-nil, returns all of the execution statistics of the // operators that are in the same tree as this Outbox. func NewOutbox( - allocator *colmem.Allocator, + unlimitedAllocator *colmem.Allocator, input colexecargs.OpWithMetaInfo, typs []*types.T, getStats func() []*execinfrapb.ComponentStats, @@ -99,13 +99,13 @@ func NewOutbox( o := &Outbox{ // Add a deselector as selection vectors are not serialized (nor should they // be). - OneInputNode: colexecop.NewOneInputNode(colexecutils.NewDeselectorOp(allocator, input.Root, typs)), - inputMetaInfo: input, - typs: typs, - allocator: allocator, - converter: c, - serializer: s, - getStats: getStats, + OneInputNode: colexecop.NewOneInputNode(colexecutils.NewDeselectorOp(unlimitedAllocator, input.Root, typs)), + inputMetaInfo: input, + typs: typs, + unlimitedAllocator: unlimitedAllocator, + converter: c, + serializer: s, + getStats: getStats, } o.scratch.buf = &bytes.Buffer{} o.scratch.msg = &execinfrapb.ProducerMessage{} @@ -120,7 +120,7 @@ func (o *Outbox) close(ctx context.Context) { // registered with the allocator (the allocator is shared by the outbox and // the deselector). o.Input = nil - o.allocator.ReleaseMemory(o.allocator.Used()) + o.unlimitedAllocator.ReleaseMemory(o.unlimitedAllocator.Used()) o.inputMetaInfo.ToClose.CloseAndLogOnErr(ctx, "outbox") } @@ -312,7 +312,7 @@ func (o *Outbox) sendBatches( // Note that because we never truncate the buffer, we are only // adjusting the memory usage whenever the buffer's capacity // increases (if it didn't increase, this call becomes a noop). - o.allocator.AdjustMemoryUsage(int64(o.scratch.buf.Cap() - oldBufCap)) + o.unlimitedAllocator.AdjustMemoryUsage(int64(o.scratch.buf.Cap() - oldBufCap)) o.scratch.msg.Data.RawBytes = o.scratch.buf.Bytes() // o.scratch.msg can be reused as soon as Send returns since it returns as