Skip to content

Commit

Permalink
Merge #80269 #80679
Browse files Browse the repository at this point in the history
80269: cluster-ui: create bar graph for time series data r=maryliag a=xinhaoz

Closes #74516

This commit creates a generic bar chart component in
the cluster-ui package. The component is intended for
use with time series data and should be wrapped in
parent components that convert metrics responses from
the server to the expected format.

Release note: None

--------------

Multi-series (stacked) and single series examples:
<img width="963" alt="image" src="https://user-images.githubusercontent.com/20136951/164536905-c906b696-03ea-45ba-a4b0-0cb41403eb20.png">

<img width="974" alt="image" src="https://user-images.githubusercontent.com/20136951/164318812-6a39625b-a8a9-4c93-8d09-c55f7ab37834.png">

https://www.loom.com/share/ae3863084bc24036b9d5bd8040aa78b3

80679: colexec: fix sort chunks with disk spilling in very rare circumstances r=yuzefovich a=yuzefovich

This commit fixes a long-standing but very rare bug which could result
in some rows being dropped when sort chunks ("segmented sort") spills
to disk.

The root cause is that a deselector operator is placed on top of the
input to the sort chunks op (because its "chunker" spooler assumes no
selection vector on batches), and that deselector uses the same
allocator as the sort chunks. If the allocator's budget is used up, then
an error is thrown, and it is caught by the disk-spilling infrastructure
that is wrapping this whole `sort chunks -> chunker -> deselector`
chain; the error is then suppressed, and spilling to disk occurs.
However, crucially, it was always assumed that the error occurred in
`chunker`, so only that component knows how to properly perform the
fallover. If the error occurs in the deselector, the deselector might
end up losing a single input batch.

We worked around this by making a fake allocation in the deselector
before reading the input batch. However, if the stars align, and the
error occurs _after_ reading the input batch in the deselector, that
input batch will be lost, and we might get incorrect results.

For the bug to occur a couple of conditions need to be met:
1. The "memory budget exceeded" error must occur for the sort chunks
operation. It is far more likely that it will occur in the "chunker"
because that component can buffer an arbitrarily large number of tuples
and because we did make that fake allocation.
2. The input operator to the chain must be producing batches with
selection vectors on top - if this is not the case, then the deselector
is a noop. An example of such an input is a table reader with a filter
on top.

The fix is quite simple - use a separate allocator for the deselector
that has an unlimited budget. This allows us to still properly track the
memory usage of an extra batch created in the deselector without it
running into these difficulties with disk spilling. This also makes it
so that if a "memory budget exceeded" error does occur in the deselector
(which is possible if `--max-sql-memory` has been used up), it will not
be caught by the disk-spilling infrastructure and will be propagate to
the user - which is the expected and desired behavior in such
a scenario.

There is no explicit regression test for this since our existing unit
tests already exercise this scenario once the fake allocation in the
deselector is removed.

Fixes: #80645.

Release note (bug fix): Previously, in very rare circumstances
CockroachDB could incorrectly evaluate queries with ORDER BY clause when
the prefix of ordering was already provided by the index ordering of the
scanned table.

Co-authored-by: Xin Hao Zhang <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
  • Loading branch information
3 people committed Apr 28, 2022
3 parents 7c45e41 + dd58d66 + ef7d3ac commit 3831e27
Show file tree
Hide file tree
Showing 14 changed files with 621 additions and 45 deletions.
12 changes: 9 additions & 3 deletions pkg/sql/colexec/colbuilder/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,13 +379,19 @@ func (r opResult) createDiskBackedSort(
} else if matchLen > 0 {
// The input is already partially ordered. Use a chunks sorter to avoid
// loading all the rows into memory.
opName := opNamePrefix + "sort-chunks"
deselectorUnlimitedAllocator := colmem.NewAllocator(
ctx, args.MonitorRegistry.CreateUnlimitedMemAccount(
ctx, flowCtx, opName, processorID,
), factory,
)
var sortChunksMemAccount *mon.BoundAccount
sortChunksMemAccount, sorterMemMonitorName = args.MonitorRegistry.CreateMemAccountForSpillStrategyWithLimit(
ctx, flowCtx, spoolMemLimit, opNamePrefix+"sort-chunks", processorID,
ctx, flowCtx, spoolMemLimit, opName, processorID,
)
inMemorySorter = colexec.NewSortChunks(
colmem.NewAllocator(ctx, sortChunksMemAccount, factory), input, inputTypes,
ordering.Columns, int(matchLen), maxOutputBatchMemSize,
deselectorUnlimitedAllocator, colmem.NewAllocator(ctx, sortChunksMemAccount, factory),
input, inputTypes, ordering.Columns, int(matchLen), maxOutputBatchMemSize,
)
} else {
// No optimizations possible. Default to the standard sort operator.
Expand Down
38 changes: 17 additions & 21 deletions pkg/sql/colexec/colexecutils/deselector.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import (
type deselectorOp struct {
colexecop.OneInputHelper
colexecop.NonExplainable
allocator *colmem.Allocator
inputTypes []*types.T
unlimitedAllocator *colmem.Allocator
inputTypes []*types.T

output coldata.Batch
}
Expand All @@ -36,39 +36,35 @@ var _ colexecop.Operator = &deselectorOp{}

// NewDeselectorOp creates a new deselector operator on the given input
// operator with the given column types.
//
// The provided allocator must be derived from an unlimited memory monitor since
// the deselectorOp cannot spill to disk and a "memory budget exceeded" error
// might be caught by the higher-level diskSpiller which would result in losing
// some query results.
func NewDeselectorOp(
allocator *colmem.Allocator, input colexecop.Operator, typs []*types.T,
unlimitedAllocator *colmem.Allocator, input colexecop.Operator, typs []*types.T,
) colexecop.Operator {
return &deselectorOp{
OneInputHelper: colexecop.MakeOneInputHelper(input),
allocator: allocator,
inputTypes: typs,
OneInputHelper: colexecop.MakeOneInputHelper(input),
unlimitedAllocator: unlimitedAllocator,
inputTypes: typs,
}
}

func (p *deselectorOp) Next() coldata.Batch {
// deselectorOp should *not* limit the capacities of the returned batches,
// so we don't use a memory limit here. It is up to the wrapped operator to
// limit the size of batches based on the memory footprint.
const maxBatchMemSize = math.MaxInt64
// TODO(yuzefovich): this allocation is only needed in order to appease the
// tests of the external sorter with forced disk spilling (if we don't do
// this, an OOM error occurs during ResetMaybeReallocate call below at
// which point we have already received a batch from the input and it'll
// get lost because deselectorOp doesn't support fall-over to the
// disk-backed infrastructure).
p.output, _ = p.allocator.ResetMaybeReallocate(
p.inputTypes, p.output, 1 /* minCapacity */, maxBatchMemSize,
)
batch := p.Input.Next()
if batch.Selection() == nil || batch.Length() == 0 {
return batch
}
p.output, _ = p.allocator.ResetMaybeReallocate(
// deselectorOp should *not* limit the capacities of the returned batches,
// so we don't use a memory limit here. It is up to the wrapped operator to
// limit the size of batches based on the memory footprint.
const maxBatchMemSize = math.MaxInt64
p.output, _ = p.unlimitedAllocator.ResetMaybeReallocate(
p.inputTypes, p.output, batch.Length(), maxBatchMemSize,
)
sel := batch.Selection()
p.allocator.PerformOperation(p.output.ColVecs(), func() {
p.unlimitedAllocator.PerformOperation(p.output.ColVecs(), func() {
for i := range p.inputTypes {
toCol := p.output.ColVec(i)
fromCol := batch.ColVec(i)
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/colexec/distinct_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ func TestDistinct(t *testing.T) {
}
tc.runTests(t, colexectestutils.OrderedVerifier, func(input []colexecop.Operator) (colexecop.Operator, error) {
return newPartiallyOrderedDistinct(
testAllocator, input[0], tc.distinctCols, orderedCols, tc.typs, tc.nullsAreDistinct, tc.errorOnDup,
testAllocator, testAllocator, input[0], tc.distinctCols, orderedCols, tc.typs, tc.nullsAreDistinct, tc.errorOnDup,
)
})
}
Expand Down Expand Up @@ -630,7 +630,7 @@ func BenchmarkDistinct(b *testing.B) {
return NewUnorderedDistinct(allocator, input, distinctCols, typs, false /* nullsAreDistinct */, "" /* errorOnDup */), nil
},
func(allocator *colmem.Allocator, input colexecop.Operator, distinctCols []uint32, numOrderedCols int, typs []*types.T) (colexecop.Operator, error) {
return newPartiallyOrderedDistinct(allocator, input, distinctCols, distinctCols[:numOrderedCols], typs, false /* nullsAreDistinct */, "" /* errorOnDup */)
return newPartiallyOrderedDistinct(allocator, allocator, input, distinctCols, distinctCols[:numOrderedCols], typs, false /* nullsAreDistinct */, "" /* errorOnDup */)
},
func(allocator *colmem.Allocator, input colexecop.Operator, distinctCols []uint32, numOrderedCols int, typs []*types.T) (colexecop.Operator, error) {
return colexecbase.NewOrderedDistinct(input, distinctCols, typs, false /* nullsAreDistinct */, "" /* errorOnDup */), nil
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/colexec/partially_ordered_distinct.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
// distinct columns when we have partial ordering on some of the distinct
// columns.
func newPartiallyOrderedDistinct(
unlimitedAllocator *colmem.Allocator,
allocator *colmem.Allocator,
input colexecop.Operator,
distinctCols []uint32,
Expand All @@ -39,7 +40,7 @@ func newPartiallyOrderedDistinct(
"partially ordered distinct wrongfully planned: numDistinctCols=%d "+
"numOrderedCols=%d", len(distinctCols), len(orderedCols))
}
chunker := newChunker(allocator, input, typs, orderedCols, nullsAreDistinct)
chunker := newChunker(unlimitedAllocator, allocator, input, typs, orderedCols, nullsAreDistinct)
chunkerOperator := newChunkerOperator(allocator, chunker, typs)
// distinctUnorderedCols will contain distinct columns that are not present
// among orderedCols. The unordered distinct operator will use these columns
Expand Down
6 changes: 4 additions & 2 deletions pkg/sql/colexec/sort_chunks.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
// the columns in the input operator. The input tuples must be sorted on first
// matchLen columns.
func NewSortChunks(
unlimitedAllocator *colmem.Allocator,
allocator *colmem.Allocator,
input colexecop.Operator,
inputTypes []*types.T,
Expand All @@ -46,7 +47,7 @@ func NewSortChunks(
for i := range alreadySortedCols {
alreadySortedCols[i] = orderingCols[i].ColIdx
}
chunker := newChunker(allocator, input, inputTypes, alreadySortedCols, false /* nullsAreDistinct */)
chunker := newChunker(unlimitedAllocator, allocator, input, inputTypes, alreadySortedCols, false /* nullsAreDistinct */)
sorter := newSorter(allocator, chunker, inputTypes, orderingCols[matchLen:], maxOutputBatchMemSize)
return &sortChunksOp{allocator: allocator, input: chunker, sorter: sorter}
}
Expand Down Expand Up @@ -256,6 +257,7 @@ type chunker struct {
var _ spooler = &chunker{}

func newChunker(
unlimitedAllocator *colmem.Allocator,
allocator *colmem.Allocator,
input colexecop.Operator,
inputTypes []*types.T,
Expand All @@ -266,7 +268,7 @@ func newChunker(
for i, col := range alreadySortedCols {
partitioners[i] = newPartitioner(inputTypes[col], nullsAreDistinct)
}
deselector := colexecutils.NewDeselectorOp(allocator, input, inputTypes)
deselector := colexecutils.NewDeselectorOp(unlimitedAllocator, input, inputTypes)
return &chunker{
OneInputNode: colexecop.NewOneInputNode(deselector),
allocator: allocator,
Expand Down
8 changes: 5 additions & 3 deletions pkg/sql/colexec/sort_chunks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func TestSortChunks(t *testing.T) {

for _, tc := range sortChunksTestCases {
colexectestutils.RunTests(t, testAllocator, []colexectestutils.Tuples{tc.tuples}, tc.expected, colexectestutils.OrderedVerifier, func(input []colexecop.Operator) (colexecop.Operator, error) {
return NewSortChunks(testAllocator, input[0], tc.typs, tc.ordCols, tc.matchLen, execinfra.DefaultMemoryLimit), nil
return NewSortChunks(testAllocator, testAllocator, input[0], tc.typs, tc.ordCols, tc.matchLen, execinfra.DefaultMemoryLimit), nil
})
}
}
Expand Down Expand Up @@ -231,7 +231,7 @@ func TestSortChunksRandomized(t *testing.T) {
sort.Slice(expected, less(expected, ordCols))

colexectestutils.RunTests(t, testAllocator, []colexectestutils.Tuples{sortedTups}, expected, colexectestutils.OrderedVerifier, func(input []colexecop.Operator) (colexecop.Operator, error) {
return NewSortChunks(testAllocator, input[0], typs[:nCols], ordCols, matchLen, execinfra.DefaultMemoryLimit), nil
return NewSortChunks(testAllocator, testAllocator, input[0], typs[:nCols], ordCols, matchLen, execinfra.DefaultMemoryLimit), nil
})
}
}
Expand All @@ -244,7 +244,9 @@ func BenchmarkSortChunks(b *testing.B) {
ctx := context.Background()

sorterConstructors := []func(*colmem.Allocator, colexecop.Operator, []*types.T, []execinfrapb.Ordering_Column, int, int64) colexecop.Operator{
NewSortChunks,
func(allocator *colmem.Allocator, input colexecop.Operator, inputTypes []*types.T, orderingCols []execinfrapb.Ordering_Column, matchLen int, maxOutputBatchMemSize int64) colexecop.Operator {
return NewSortChunks(allocator, allocator, input, inputTypes, orderingCols, matchLen, maxOutputBatchMemSize)
},
func(allocator *colmem.Allocator, input colexecop.Operator, inputTypes []*types.T, orderingCols []execinfrapb.Ordering_Column, _ int, maxOutputBatchMemSize int64) colexecop.Operator {
return NewSorter(allocator, input, inputTypes, orderingCols, maxOutputBatchMemSize)
},
Expand Down
26 changes: 13 additions & 13 deletions pkg/sql/colflow/colrpc/outbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ type Outbox struct {

typs []*types.T

allocator *colmem.Allocator
converter *colserde.ArrowBatchConverter
serializer *colserde.RecordBatchSerializer
unlimitedAllocator *colmem.Allocator
converter *colserde.ArrowBatchConverter
serializer *colserde.RecordBatchSerializer

// draining is an atomic that represents whether the Outbox is draining.
draining uint32
Expand All @@ -83,7 +83,7 @@ type Outbox struct {
// - getStats, when non-nil, returns all of the execution statistics of the
// operators that are in the same tree as this Outbox.
func NewOutbox(
allocator *colmem.Allocator,
unlimitedAllocator *colmem.Allocator,
input colexecargs.OpWithMetaInfo,
typs []*types.T,
getStats func() []*execinfrapb.ComponentStats,
Expand All @@ -99,13 +99,13 @@ func NewOutbox(
o := &Outbox{
// Add a deselector as selection vectors are not serialized (nor should they
// be).
OneInputNode: colexecop.NewOneInputNode(colexecutils.NewDeselectorOp(allocator, input.Root, typs)),
inputMetaInfo: input,
typs: typs,
allocator: allocator,
converter: c,
serializer: s,
getStats: getStats,
OneInputNode: colexecop.NewOneInputNode(colexecutils.NewDeselectorOp(unlimitedAllocator, input.Root, typs)),
inputMetaInfo: input,
typs: typs,
unlimitedAllocator: unlimitedAllocator,
converter: c,
serializer: s,
getStats: getStats,
}
o.scratch.buf = &bytes.Buffer{}
o.scratch.msg = &execinfrapb.ProducerMessage{}
Expand All @@ -120,7 +120,7 @@ func (o *Outbox) close(ctx context.Context) {
// registered with the allocator (the allocator is shared by the outbox and
// the deselector).
o.Input = nil
o.allocator.ReleaseMemory(o.allocator.Used())
o.unlimitedAllocator.ReleaseMemory(o.unlimitedAllocator.Used())
o.inputMetaInfo.ToClose.CloseAndLogOnErr(ctx, "outbox")
}

Expand Down Expand Up @@ -312,7 +312,7 @@ func (o *Outbox) sendBatches(
// Note that because we never truncate the buffer, we are only
// adjusting the memory usage whenever the buffer's capacity
// increases (if it didn't increase, this call becomes a noop).
o.allocator.AdjustMemoryUsage(int64(o.scratch.buf.Cap() - oldBufCap))
o.unlimitedAllocator.AdjustMemoryUsage(int64(o.scratch.buf.Cap() - oldBufCap))
o.scratch.msg.Data.RawBytes = o.scratch.buf.Bytes()

// o.scratch.msg can be reused as soon as Send returns since it returns as
Expand Down
2 changes: 2 additions & 0 deletions pkg/ui/workspaces/cluster-ui/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ DEPENDENCIES = [
"@npm//source-map-loader",
"@npm//style-loader",
"@npm//ts-jest",
"@npm//uplot",
"@npm//url-loader",
"@npm//webpack",
"@npm//webpack-cli",
Expand Down Expand Up @@ -147,6 +148,7 @@ ts_project(
"@npm//redux-saga",
"@npm//redux-saga-test-plan",
"@npm//reselect",
"@npm//uplot",
],
)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

import React from "react";
import { storiesOf } from "@storybook/react";
import { AlignedData, Options } from "uplot";
import { BarGraphTimeSeries } from "./index";
import { AxisUnits } from "../utils/domain";
import { getBarsBuilder } from "./bars";

function generateTimestampsMillis(start: number, length: number): number[] {
return [...Array(length)].map(
(_, idx): number => (60 * 60 * idx + start) * 1000,
);
}

function genValuesInRange(range: [number, number], length: number): number[] {
return [...Array(length)].map((): number =>
Math.random() > 0.1 ? Math.random() * (range[1] - range[0]) + range[0] : 0,
);
}

const mockData: AlignedData = [
generateTimestampsMillis(1546300800, 20),
genValuesInRange([0, 100], 20),
genValuesInRange([0, 100], 20),
genValuesInRange([0, 100], 20),
];

const mockOpts: Partial<Options> = {
axes: [{}, { label: "values" }],
series: [
{},
{
label: "bar 1",
},
{
label: "bar 2",
},
{
label: "bar 3",
},
],
};

storiesOf("BarGraphTimeSeries", module)
.add("with stacked multi-series", () => {
return (
<BarGraphTimeSeries
title="Example Stacked - Count"
alignedData={mockData}
uPlotOptions={mockOpts}
tooltip={
<div>This is an example stacked bar graph axis unit = count.</div>
}
yAxisUnits={AxisUnits.Count}
/>
);
})
.add("with single series", () => {
const data: AlignedData = [
generateTimestampsMillis(1546300800, 50),
genValuesInRange([0, 1], 50),
];
const opts = {
series: [{}, { label: "bar", paths: getBarsBuilder(0.8, 20) }],
legend: { show: false },
axes: [{}, { label: "mock" }],
};
return (
<BarGraphTimeSeries
title="Example Single Series - Percent"
alignedData={data}
uPlotOptions={opts}
tooltip={
<div>This is an example bar graph with axis unit = percent.</div>
}
yAxisUnits={AxisUnits.Percentage}
/>
);
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
:global {
@import "uplot/dist/uPlot.min";
}

.bargraph {
height: 100%;
:global(.uplot) {
display: flex;
flex-direction: column;
:global(.u-legend) {
text-align: left;
font-size: 10px;
margin-top: 20px;
z-index: 100;
width: fit-content;
padding: 10px;
}
}
}
Loading

0 comments on commit 3831e27

Please sign in to comment.