From 78bba21420d2aae29dc7c4c9263f33f27717ef29 Mon Sep 17 00:00:00 2001 From: Marylia Gutierrez Date: Mon, 30 Aug 2021 10:26:14 -0400 Subject: [PATCH 1/3] ui: fix tooltip text on stmt and txn pages

Previously, all reset SQL stats tooltips showed the wrong cluster setting and always mentioned Statement history, even where Transaction history was meant.

Fixes #68462

Release justification: Category 2

Release note (ui change): Fix tooltip text on the Statement and Transaction pages to use the correct setting "diagnostics.sql_stat_reset.interval" instead of the previously shown value "diagnostics.reporting.interval" --- .../src/statementsPage/statementsPage.tsx | 1 + .../src/tableStatistics/tableStatistics.tsx | 29 +++++++++++++++---- .../transactionDetails/transactionDetails.tsx | 1 + .../src/transactionsPage/transactionsPage.tsx | 1 + 4 files changed, 27 insertions(+), 5 deletions(-) diff --git a/pkg/ui/workspaces/cluster-ui/src/statementsPage/statementsPage.tsx b/pkg/ui/workspaces/cluster-ui/src/statementsPage/statementsPage.tsx index 5dc924c64ccf..8b432b2e8c9a 100644 --- a/pkg/ui/workspaces/cluster-ui/src/statementsPage/statementsPage.tsx +++ b/pkg/ui/workspaces/cluster-ui/src/statementsPage/statementsPage.tsx @@ -547,6 +547,7 @@ export class StatementsPage extends React.Component< search={search} totalCount={totalCount} arrayItemName="statements" + tooltipType="statement" activeFilters={activeFilters} onClearFilters={this.onClearFilters} resetSQLStats={resetSQLStats} diff --git a/pkg/ui/workspaces/cluster-ui/src/tableStatistics/tableStatistics.tsx b/pkg/ui/workspaces/cluster-ui/src/tableStatistics/tableStatistics.tsx index f3178300a6c9..8e411bb2c73c 100644 --- a/pkg/ui/workspaces/cluster-ui/src/tableStatistics/tableStatistics.tsx +++ b/pkg/ui/workspaces/cluster-ui/src/tableStatistics/tableStatistics.tsx @@ -16,13 +16,15 @@ import { ISortedTablePagination } from "../sortedtable"; import { Button } from "src/button"; import { ResultsPerPageLabel } from "src/pagination"; import { Tooltip } from "@cockroachlabs/ui-components"; -import statementStyles from "src/statementDetails/statementDetails.module.scss"; import tableStatsStyles from "./tableStatistics.module.scss"; import classNames from "classnames/bind"; import { Icon } from "@cockroachlabs/ui-components"; +import { + contentModifiers, + StatisticType, +} from "../statsTableUtil/statsTableUtil"; const { statistic, countTitle, lastCleared } = statisticsClasses; -const cxStmt = classNames.bind(statementStyles); const cxStats = classNames.bind(tableStatsStyles); interface TableStatistics { @@ -30,15 +32,13 @@ totalCount: number; lastReset: Date | string; arrayItemName: string; + tooltipType: StatisticType; activeFilters: number; search?: string; onClearFilters?: () => void; resetSQLStats: () => void; } -const toolTipText = `Statement history is cleared once an hour by default, which can be configured with the cluster setting - diagnostics.reporting.interval.
Clicking ‘Clear SQL stats’ will reset SQL stats on the statements and transactions pages.`; - const renderLastCleared = (lastReset: string | Date) => { return `Last cleared ${moment.utc(lastReset).format(DATE_FORMAT)}`; }; @@ -49,6 +49,7 @@ export const TableStatistics: React.FC = ({ lastReset, search, arrayItemName, + tooltipType, onClearFilters, activeFilters, resetSQLStats, @@ -71,6 +72,24 @@ export const TableStatistics: React.FC = ({ ); + let toolTipText = ` history is cleared once an hour by default, which can be configured with + the cluster setting diagnostics.sql_stat_reset.interval. Clicking ‘Clear SQL stats’ will reset SQL stats + on the statements and transactions pages.`; + + switch (tooltipType) { + case "transaction": + toolTipText = contentModifiers.transactionCapital + toolTipText; + break; + case "statement": + toolTipText = contentModifiers.statementCapital + toolTipText; + break; + case "transactionDetails": + toolTipText = contentModifiers.statementCapital + toolTipText; + break; + default: + break; + } + return (
diff --git a/pkg/ui/workspaces/cluster-ui/src/transactionDetails/transactionDetails.tsx b/pkg/ui/workspaces/cluster-ui/src/transactionDetails/transactionDetails.tsx index 744025243c78..f674f6de5661 100644 --- a/pkg/ui/workspaces/cluster-ui/src/transactionDetails/transactionDetails.tsx +++ b/pkg/ui/workspaces/cluster-ui/src/transactionDetails/transactionDetails.tsx @@ -248,6 +248,7 @@ export class TransactionDetails extends React.Component< arrayItemName={ "statement fingerprints for this transaction" } + tooltipType="transactionDetails" activeFilters={0} resetSQLStats={resetSQLStats} /> diff --git a/pkg/ui/workspaces/cluster-ui/src/transactionsPage/transactionsPage.tsx b/pkg/ui/workspaces/cluster-ui/src/transactionsPage/transactionsPage.tsx index 9bfb6c96ff5f..24fb424e2335 100644 --- a/pkg/ui/workspaces/cluster-ui/src/transactionsPage/transactionsPage.tsx +++ b/pkg/ui/workspaces/cluster-ui/src/transactionsPage/transactionsPage.tsx @@ -356,6 +356,7 @@ export class TransactionsPage extends React.Component< search={search} totalCount={transactionsToDisplay.length} arrayItemName="transactions" + tooltipType="transaction" activeFilters={activeFilters} onClearFilters={this.onClearFilters} resetSQLStats={resetSQLStats} From 583efd3cd9c7198bac935bb81a7a598e5849a269 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Fri, 27 Aug 2021 15:12:46 -0400 Subject: [PATCH 2/3] sql/catalog/seqexpr: move util/sequence This package is about looking inside expressions to find usages of sequence functions for the purpose of dependency tracking. It definitely does not belong in `util`. It can't go in `schemaexpr` where it would naturally belong because it depends on builtins and builtins depend, indirectly, on schemaexpr. Release justification: non-production code change Release note: None --- pkg/BUILD.bazel | 2 +- pkg/sql/BUILD.bazel | 2 +- pkg/{util/sequence => sql/catalog/seqexpr}/BUILD.bazel | 8 ++++---- pkg/{util/sequence => sql/catalog/seqexpr}/sequence.go | 7 ++++++- .../sequence => sql/catalog/seqexpr}/sequence_test.go | 2 +- pkg/sql/create_view.go | 6 +++--- pkg/sql/opt/optbuilder/BUILD.bazel | 2 +- pkg/sql/opt/optbuilder/scalar.go | 4 ++-- pkg/sql/rename_database.go | 4 ++-- pkg/sql/row/BUILD.bazel | 2 +- pkg/sql/row/expr_walker.go | 4 ++-- pkg/sql/schemachanger/scbuild/BUILD.bazel | 2 +- pkg/sql/schemachanger/scbuild/table.go | 6 +++--- pkg/sql/schemachanger/scexec/scmutationexec/BUILD.bazel | 2 +- .../schemachanger/scexec/scmutationexec/scmutationexec.go | 4 ++-- pkg/sql/sequence.go | 8 ++++---- 16 files changed, 35 insertions(+), 30 deletions(-) rename pkg/{util/sequence => sql/catalog/seqexpr}/BUILD.bazel (77%) rename pkg/{util/sequence => sql/catalog/seqexpr}/sequence.go (95%) rename pkg/{util/sequence => sql/catalog/seqexpr}/sequence_test.go (99%) diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index c0dd3c990a7a..01de73e3379a 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -199,6 +199,7 @@ ALL_TESTS = [ "//pkg/sql/catalog/resolver:resolver_test", "//pkg/sql/catalog/schemadesc:schemadesc_test", "//pkg/sql/catalog/schemaexpr:schemaexpr_test", + "//pkg/sql/catalog/seqexpr:seqexpr_test", "//pkg/sql/catalog/systemschema:systemschema_test", "//pkg/sql/catalog/tabledesc:tabledesc_test", "//pkg/sql/catalog/typedesc:typedesc_test", @@ -379,7 +380,6 @@ ALL_TESTS = [ "//pkg/util/ring:ring_test", "//pkg/util/sdnotify:sdnotify_test", "//pkg/util/search:search_test", - "//pkg/util/sequence:sequence_test", "//pkg/util/shuffle:shuffle_test", "//pkg/util/span:span_test", "//pkg/util/stop:stop_test", diff 
--git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index 5f7dbd41235d..400adee2b1a9 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -294,6 +294,7 @@ go_library( "//pkg/sql/catalog/resolver", "//pkg/sql/catalog/schemadesc", "//pkg/sql/catalog/schemaexpr", + "//pkg/sql/catalog/seqexpr", "//pkg/sql/catalog/systemschema", "//pkg/sql/catalog/tabledesc", "//pkg/sql/catalog/typedesc", @@ -397,7 +398,6 @@ go_library( "//pkg/util/quotapool", "//pkg/util/retry", "//pkg/util/ring", - "//pkg/util/sequence", "//pkg/util/stop", "//pkg/util/syncutil", "//pkg/util/timeutil", diff --git a/pkg/util/sequence/BUILD.bazel b/pkg/sql/catalog/seqexpr/BUILD.bazel similarity index 77% rename from pkg/util/sequence/BUILD.bazel rename to pkg/sql/catalog/seqexpr/BUILD.bazel index a47e32c1e83c..fc031404f110 100644 --- a/pkg/util/sequence/BUILD.bazel +++ b/pkg/sql/catalog/seqexpr/BUILD.bazel @@ -1,9 +1,9 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( - name = "sequence", + name = "seqexpr", srcs = ["sequence.go"], - importpath = "github.com/cockroachdb/cockroach/pkg/util/sequence", + importpath = "github.com/cockroachdb/cockroach/pkg/sql/catalog/seqexpr", visibility = ["//visibility:public"], deps = [ "//pkg/sql/pgwire/pgcode", @@ -16,9 +16,9 @@ go_library( ) go_test( - name = "sequence_test", + name = "seqexpr_test", srcs = ["sequence_test.go"], - embed = [":sequence"], + embed = [":seqexpr"], deps = [ "//pkg/sql/parser", "//pkg/sql/sem/tree", diff --git a/pkg/util/sequence/sequence.go b/pkg/sql/catalog/seqexpr/sequence.go similarity index 95% rename from pkg/util/sequence/sequence.go rename to pkg/sql/catalog/seqexpr/sequence.go index 6ce6ba57300a..61efde264841 100644 --- a/pkg/util/sequence/sequence.go +++ b/pkg/sql/catalog/seqexpr/sequence.go @@ -8,7 +8,12 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -package sequence +// Package seqexpr provides functionality to find usages of sequences in +// expressions. +// +// The logic here would fit nicely into schemaexpr if it weren't for the +// dependency on builtins, which itself depends on schemaexpr. +package seqexpr import ( "go/constant" diff --git a/pkg/util/sequence/sequence_test.go b/pkg/sql/catalog/seqexpr/sequence_test.go similarity index 99% rename from pkg/util/sequence/sequence_test.go rename to pkg/sql/catalog/seqexpr/sequence_test.go index fb5cb2dc2a5a..9448b381165a 100644 --- a/pkg/util/sequence/sequence_test.go +++ b/pkg/sql/catalog/seqexpr/sequence_test.go @@ -8,7 +8,7 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. 
-package sequence +package seqexpr import ( "context" diff --git a/pkg/sql/create_view.go b/pkg/sql/create_view.go index c54d6c582819..e70478c4d8c2 100644 --- a/pkg/sql/create_view.go +++ b/pkg/sql/create_view.go @@ -25,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/resolver" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/seqexpr" "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/sql/parser" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" @@ -38,7 +39,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/log/eventpb" - "github.com/cockroachdb/cockroach/pkg/util/sequence" "github.com/cockroachdb/errors" ) @@ -412,7 +412,7 @@ func replaceSeqNamesWithIDs( ctx context.Context, sc resolver.SchemaResolver, viewQuery string, ) (string, error) { replaceSeqFunc := func(expr tree.Expr) (recurse bool, newExpr tree.Expr, err error) { - seqIdentifiers, err := sequence.GetUsedSequences(expr) + seqIdentifiers, err := seqexpr.GetUsedSequences(expr) if err != nil { return false, expr, err } @@ -424,7 +424,7 @@ func replaceSeqNamesWithIDs( } seqNameToID[seqIdentifier.SeqName] = int64(seqDesc.ID) } - newExpr, err = sequence.ReplaceSequenceNamesWithIDs(expr, seqNameToID) + newExpr, err = seqexpr.ReplaceSequenceNamesWithIDs(expr, seqNameToID) if err != nil { return false, expr, err } diff --git a/pkg/sql/opt/optbuilder/BUILD.bazel b/pkg/sql/opt/optbuilder/BUILD.bazel index 950acda7f2a2..cca0c8a9f15c 100644 --- a/pkg/sql/opt/optbuilder/BUILD.bazel +++ b/pkg/sql/opt/optbuilder/BUILD.bazel @@ -51,6 +51,7 @@ go_library( "//pkg/sql/catalog/colinfo", "//pkg/sql/catalog/descpb", "//pkg/sql/catalog/schemaexpr", + "//pkg/sql/catalog/seqexpr", "//pkg/sql/catalog/typedesc", "//pkg/sql/delegate", "//pkg/sql/lex", @@ -75,7 +76,6 @@ go_library( "//pkg/util/errorutil", "//pkg/util/errorutil/unimplemented", "//pkg/util/log", - "//pkg/util/sequence", "@com_github_cockroachdb_errors//:errors", "@com_github_lib_pq//oid", ], diff --git a/pkg/sql/opt/optbuilder/scalar.go b/pkg/sql/opt/optbuilder/scalar.go index ec43fdcf6018..67c7a0b7a5e9 100644 --- a/pkg/sql/opt/optbuilder/scalar.go +++ b/pkg/sql/opt/optbuilder/scalar.go @@ -15,6 +15,7 @@ import ( "fmt" "github.com/cockroachdb/cockroach/pkg/server/telemetry" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/seqexpr" "github.com/cockroachdb/cockroach/pkg/sql/lex" "github.com/cockroachdb/cockroach/pkg/sql/opt" "github.com/cockroachdb/cockroach/pkg/sql/opt/cat" @@ -30,7 +31,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/errorutil" "github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/sequence" "github.com/cockroachdb/errors" ) @@ -521,7 +521,7 @@ func (b *Builder) buildFunction( // Add a dependency on sequences that are used as a string argument. 
if b.trackViewDeps { - seqIdentifier, err := sequence.GetSequenceFromFunc(f) + seqIdentifier, err := seqexpr.GetSequenceFromFunc(f) if err != nil { panic(err) } diff --git a/pkg/sql/rename_database.go b/pkg/sql/rename_database.go index 5857500ac989..0dc9eae86909 100644 --- a/pkg/sql/rename_database.go +++ b/pkg/sql/rename_database.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv" "github.com/cockroachdb/cockroach/pkg/sql/catalog/dbdesc" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/seqexpr" "github.com/cockroachdb/cockroach/pkg/sql/parser" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" @@ -28,7 +29,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/log/eventpb" - "github.com/cockroachdb/cockroach/pkg/util/sequence" "github.com/cockroachdb/errors" ) @@ -298,7 +298,7 @@ func isAllowedDependentDescInRenameDatabase( if err != nil { return false, "", err } - seqIdentifiers, err := sequence.GetUsedSequences(typedExpr) + seqIdentifiers, err := seqexpr.GetUsedSequences(typedExpr) if err != nil { return false, "", err } diff --git a/pkg/sql/row/BUILD.bazel b/pkg/sql/row/BUILD.bazel index ab82934f3348..f36c3e526eab 100644 --- a/pkg/sql/row/BUILD.bazel +++ b/pkg/sql/row/BUILD.bazel @@ -36,6 +36,7 @@ go_library( "//pkg/sql/catalog/colinfo", "//pkg/sql/catalog/descpb", "//pkg/sql/catalog/schemaexpr", + "//pkg/sql/catalog/seqexpr", "//pkg/sql/execinfrapb", "//pkg/sql/pgwire/pgcode", "//pkg/sql/pgwire/pgerror", @@ -58,7 +59,6 @@ go_library( "//pkg/util/log/eventpb", "//pkg/util/metric", "//pkg/util/mon", - "//pkg/util/sequence", "//pkg/util/syncutil", "//pkg/util/timeutil", "//pkg/util/unique", diff --git a/pkg/sql/row/expr_walker.go b/pkg/sql/row/expr_walker.go index 9a81464e119e..0e1729c8cbaf 100644 --- a/pkg/sql/row/expr_walker.go +++ b/pkg/sql/row/expr_walker.go @@ -22,12 +22,12 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/schemaexpr" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/seqexpr" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/sem/builtins" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" - "github.com/cockroachdb/cockroach/pkg/util/sequence" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" ) @@ -607,7 +607,7 @@ var supportedImportFuncOverrides = map[string]*customFunc{ visitorSideEffect: func(annot *tree.Annotations, fn *tree.FuncExpr) error { // Get sequence name so that we can update the annotation with the number // of nextval calls to this sequence in a row. 
- seqIdentifier, err := sequence.GetSequenceFromFunc(fn) + seqIdentifier, err := seqexpr.GetSequenceFromFunc(fn) if err != nil { return err } diff --git a/pkg/sql/schemachanger/scbuild/BUILD.bazel b/pkg/sql/schemachanger/scbuild/BUILD.bazel index b69fde8c2910..68b4eaed256e 100644 --- a/pkg/sql/schemachanger/scbuild/BUILD.bazel +++ b/pkg/sql/schemachanger/scbuild/BUILD.bazel @@ -21,6 +21,7 @@ go_library( "//pkg/sql/catalog/descs", "//pkg/sql/catalog/resolver", "//pkg/sql/catalog/schemaexpr", + "//pkg/sql/catalog/seqexpr", "//pkg/sql/catalog/tabledesc", "//pkg/sql/catalog/typedesc", "//pkg/sql/parser", @@ -33,7 +34,6 @@ go_library( "//pkg/sql/sqltelemetry", "//pkg/util/errorutil/unimplemented", "//pkg/util/protoutil", - "//pkg/util/sequence", "@com_github_cockroachdb_errors//:errors", "@com_github_lib_pq//oid", ], diff --git a/pkg/sql/schemachanger/scbuild/table.go b/pkg/sql/schemachanger/scbuild/table.go index bf1c8cc6cdca..660cb8ca195d 100644 --- a/pkg/sql/schemachanger/scbuild/table.go +++ b/pkg/sql/schemachanger/scbuild/table.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/resolver" "github.com/cockroachdb/cockroach/pkg/sql/catalog/schemaexpr" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/seqexpr" "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc" "github.com/cockroachdb/cockroach/pkg/sql/parser" @@ -26,7 +27,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scpb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqlerrors" - "github.com/cockroachdb/cockroach/pkg/util/sequence" "github.com/cockroachdb/errors" "github.com/lib/pq/oid" ) @@ -353,7 +353,7 @@ var _ = (*buildContext)(nil).alterTableDropColumn func (b *buildContext) maybeAddSequenceReferenceDependencies( ctx context.Context, tableID descpb.ID, col *descpb.ColumnDescriptor, defaultExpr tree.TypedExpr, ) { - seqIdentifiers, err := sequence.GetUsedSequences(defaultExpr) + seqIdentifiers, err := seqexpr.GetUsedSequences(defaultExpr) if err != nil { panic(err) } @@ -391,7 +391,7 @@ func (b *buildContext) maybeAddSequenceReferenceDependencies( } if len(seqIdentifiers) > 0 { - newExpr, err := sequence.ReplaceSequenceNamesWithIDs(defaultExpr, seqNameToID) + newExpr, err := seqexpr.ReplaceSequenceNamesWithIDs(defaultExpr, seqNameToID) if err != nil { panic(err) } diff --git a/pkg/sql/schemachanger/scexec/scmutationexec/BUILD.bazel b/pkg/sql/schemachanger/scexec/scmutationexec/BUILD.bazel index 26aeab37b63d..ee2d373973ff 100644 --- a/pkg/sql/schemachanger/scexec/scmutationexec/BUILD.bazel +++ b/pkg/sql/schemachanger/scexec/scmutationexec/BUILD.bazel @@ -11,13 +11,13 @@ go_library( deps = [ "//pkg/sql/catalog", "//pkg/sql/catalog/descpb", + "//pkg/sql/catalog/seqexpr", "//pkg/sql/catalog/tabledesc", "//pkg/sql/catalog/typedesc", "//pkg/sql/parser", "//pkg/sql/schemachanger/scexec/descriptorutils", "//pkg/sql/schemachanger/scop", "//pkg/util/protoutil", - "//pkg/util/sequence", "@com_github_cockroachdb_errors//:errors", ], ) diff --git a/pkg/sql/schemachanger/scexec/scmutationexec/scmutationexec.go b/pkg/sql/schemachanger/scexec/scmutationexec/scmutationexec.go index 0c70901c7cf7..0031d5ce0d0d 100644 --- a/pkg/sql/schemachanger/scexec/scmutationexec/scmutationexec.go +++ b/pkg/sql/schemachanger/scexec/scmutationexec/scmutationexec.go @@ -16,13 +16,13 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog" 
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/seqexpr" "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc" "github.com/cockroachdb/cockroach/pkg/sql/parser" "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scexec/descriptorutils" "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scop" "github.com/cockroachdb/cockroach/pkg/util/protoutil" - "github.com/cockroachdb/cockroach/pkg/util/sequence" "github.com/cockroachdb/errors" ) @@ -113,7 +113,7 @@ func (m *visitor) UpdateRelationDeps(ctx context.Context, op scop.UpdateRelation if err != nil { return err } - usedSequences, err := sequence.GetUsedSequences(expr) + usedSequences, err := seqexpr.GetUsedSequences(expr) if err != nil { return err } diff --git a/pkg/sql/sequence.go b/pkg/sql/sequence.go index 1d921f5e08c3..f1e6338bdee1 100644 --- a/pkg/sql/sequence.go +++ b/pkg/sql/sequence.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/resolver" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/seqexpr" "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/sql/parser" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" @@ -33,7 +34,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sqlerrors" "github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/sequence" "github.com/cockroachdb/errors" ) @@ -641,7 +641,7 @@ func maybeAddSequenceDependencies( expr tree.TypedExpr, backrefs map[descpb.ID]*tabledesc.Mutable, ) ([]*tabledesc.Mutable, error) { - seqIdentifiers, err := sequence.GetUsedSequences(expr) + seqIdentifiers, err := seqexpr.GetUsedSequences(expr) if err != nil { return nil, err } @@ -684,7 +684,7 @@ func maybeAddSequenceDependencies( // If sequences are present in the expr (and the cluster is the right version), // walk the expr tree and replace any sequences names with their IDs. if len(seqIdentifiers) > 0 { - newExpr, err := sequence.ReplaceSequenceNamesWithIDs(expr, seqNameToID) + newExpr, err := seqexpr.ReplaceSequenceNamesWithIDs(expr, seqNameToID) if err != nil { return nil, err } @@ -698,7 +698,7 @@ func maybeAddSequenceDependencies( // GetSequenceDescFromIdentifier resolves the sequence descriptor for the given // sequence identifier. func GetSequenceDescFromIdentifier( - ctx context.Context, sc resolver.SchemaResolver, seqIdentifier sequence.SeqIdentifier, + ctx context.Context, sc resolver.SchemaResolver, seqIdentifier seqexpr.SeqIdentifier, ) (*tabledesc.Mutable, error) { var tn tree.TableName if seqIdentifier.IsByID() { From 3e0a94429d7ec6286dfe690c101f685f57f0eb74 Mon Sep 17 00:00:00 2001 From: Yevgeniy Miretskiy Date: Wed, 25 Aug 2021 16:51:43 -0400 Subject: [PATCH 3/3] changefeedccl: Implement backfill pushback. Recent changes to memory accounting, allocation and pushback, caused backfills to not track any memory at all. This PR corrects this by associating `kvevent.Alloc` with each event produced during the backfill. With an allocation associated with each event, backfills now gain pushback functionality. The accounting and pushback is implemented by using a blocking memory buffer, instead of the channel buffer. 
Using the blocking buffer also improves performance during the backfill. Backfills issue concurrent scan requests, while the rest of the event processing is single threaded. Having a buffer improves throughput by roughly 10%.

Fixes #69248

Release justification: Completes feature work previously done for changefeeds by adding a pushback signal to backfills.

Release note (enterprise change): Changefeeds correctly account for memory during backfills and "pushback" under memory pressure -- that is, slow down backfills. --- .../changefeedccl/changefeed_processors.go | 114 ++++++++---------- pkg/ccl/changefeedccl/kvevent/BUILD.bazel | 8 +- pkg/ccl/changefeedccl/kvevent/alloc.go | 67 +++++++--- pkg/ccl/changefeedccl/kvevent/alloc_test.go | 111 +++++++++++++++++ .../changefeedccl/kvevent/blocking_buffer.go | 89 ++++++++++---- .../kvevent/blocking_buffer_test.go | 3 +- pkg/ccl/changefeedccl/kvevent/chan_buffer.go | 6 + pkg/ccl/changefeedccl/kvevent/event.go | 2 + pkg/ccl/changefeedccl/kvfeed/kv_feed.go | 28 +++-- pkg/ccl/changefeedccl/kvfeed/scanner_test.go | 4 + 10 files changed, 315 insertions(+), 117 deletions(-) create mode 100644 pkg/ccl/changefeedccl/kvevent/alloc_test.go diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index 9a69086b7768..dbe3ff324422 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -78,7 +78,7 @@ type changeAggregator struct { resolvedSpanBuf encDatumRowBuffer // eventProducer produces the next event from the kv feed. - eventProducer kvEventProducer + eventProducer kvevent.Reader // eventConsumer consumes the event. eventConsumer kvEventConsumer @@ -257,26 +257,36 @@ func (ca *changeAggregator) Start(ctx context.Context) { ca.sink = makeMetricsSink(ca.metrics, ca.sink) ca.sink = &errorWrapperSink{wrapped: ca.sink} - cfg := ca.flowCtx.Cfg - buf := kvevent.NewThrottlingBuffer( - kvevent.MakeChanBuffer(), cdcutils.NodeLevelThrottler(&cfg.Settings.SV)) - kvfeedCfg := makeKVFeedCfg(ctx, ca.flowCtx.Cfg, ca.kvFeedMemMon, - ca.spec, spans, buf, ca.metrics, ca.knobs.FeedKnobs) - - ca.eventProducer = &bufEventProducer{buf} + initialHighWater, needsInitialScan := getKVFeedInitialParameters(ca.spec) + ca.eventProducer, err = ca.startKVFeed(ctx, spans, initialHighWater, needsInitialScan) + if err != nil { + // Early abort in the case that there is an error creating the sink. + ca.MoveToDraining(err) + ca.cancel() + return + } if ca.spec.Feed.Opts[changefeedbase.OptFormat] == string(changefeedbase.OptFormatNative) { ca.eventConsumer = newNativeKVConsumer(ca.sink) } else { ca.eventConsumer = newKVEventToRowConsumer( - ctx, cfg, ca.frontier.SpanFrontier(), kvfeedCfg.InitialHighWater, + ctx, ca.flowCtx.Cfg, ca.frontier.SpanFrontier(), initialHighWater, ca.sink, ca.encoder, ca.spec.Feed, ca.knobs) } - - ca.startKVFeed(ctx, kvfeedCfg) } -func (ca *changeAggregator) startKVFeed(ctx context.Context, kvfeedCfg kvfeed.Config) { +func (ca *changeAggregator) startKVFeed( + ctx context.Context, spans []roachpb.Span, initialHighWater hlc.Timestamp, needsInitialScan bool, +) (kvevent.Reader, error) { + cfg := ca.flowCtx.Cfg + buf := kvevent.NewThrottlingBuffer( + kvevent.NewMemBuffer(ca.kvFeedMemMon.MakeBoundAccount(), &cfg.Settings.SV, &ca.metrics.KVFeedMetrics), + cdcutils.NodeLevelThrottler(&cfg.Settings.SV)) + + // KVFeed takes ownership of the kvevent.Writer portion of the buffer, while + // we return the kvevent.Reader part to the caller.
+ kvfeedCfg := ca.makeKVFeedCfg(ctx, spans, buf, initialHighWater, needsInitialScan) + // Give errCh enough buffer both possible errors from supporting goroutines, // but only the first one is ever used. ca.errCh = make(chan error, 2) @@ -294,43 +304,33 @@ func (ca *changeAggregator) startKVFeed(ctx context.Context, kvfeedCfg kvfeed.Co close(ca.kvFeedDoneCh) ca.errCh <- err ca.cancel() + return nil, err } -} -func newSchemaFeed( - ctx context.Context, - cfg *execinfra.ServerConfig, - spec execinfrapb.ChangeAggregatorSpec, - metrics *Metrics, -) schemafeed.SchemaFeed { - schemaChangePolicy := changefeedbase.SchemaChangePolicy( - spec.Feed.Opts[changefeedbase.OptSchemaChangePolicy]) - if schemaChangePolicy == changefeedbase.OptSchemaChangePolicyIgnore { - return schemafeed.DoNothingSchemaFeed - } - schemaChangeEvents := changefeedbase.SchemaChangeEventClass( - spec.Feed.Opts[changefeedbase.OptSchemaChangeEvents]) - initialHighWater, _ := getKVFeedInitialParameters(spec) - return schemafeed.New(ctx, cfg, schemaChangeEvents, - spec.Feed.Targets, initialHighWater, &metrics.SchemaFeedMetrics) + return buf, nil } -func makeKVFeedCfg( +func (ca *changeAggregator) makeKVFeedCfg( ctx context.Context, - cfg *execinfra.ServerConfig, - mm *mon.BytesMonitor, - spec execinfrapb.ChangeAggregatorSpec, spans []roachpb.Span, - buf kvevent.Buffer, - metrics *Metrics, - knobs kvfeed.TestingKnobs, + buf kvevent.Writer, + initialHighWater hlc.Timestamp, + needsInitialScan bool, ) kvfeed.Config { schemaChangeEvents := changefeedbase.SchemaChangeEventClass( - spec.Feed.Opts[changefeedbase.OptSchemaChangeEvents]) + ca.spec.Feed.Opts[changefeedbase.OptSchemaChangeEvents]) schemaChangePolicy := changefeedbase.SchemaChangePolicy( - spec.Feed.Opts[changefeedbase.OptSchemaChangePolicy]) - _, withDiff := spec.Feed.Opts[changefeedbase.OptDiff] - initialHighWater, needsInitialScan := getKVFeedInitialParameters(spec) + ca.spec.Feed.Opts[changefeedbase.OptSchemaChangePolicy]) + _, withDiff := ca.spec.Feed.Opts[changefeedbase.OptDiff] + cfg := ca.flowCtx.Cfg + + var sf schemafeed.SchemaFeed + if schemaChangePolicy == changefeedbase.OptSchemaChangePolicyIgnore { + sf = schemafeed.DoNothingSchemaFeed + } else { + sf = schemafeed.New(ctx, cfg, schemaChangeEvents, ca.spec.Feed.Targets, + initialHighWater, &ca.metrics.SchemaFeedMetrics) + } return kvfeed.Config{ Writer: buf, @@ -340,17 +340,17 @@ func makeKVFeedCfg( Clock: cfg.DB.Clock(), Gossip: cfg.Gossip, Spans: spans, - BackfillCheckpoint: spec.Checkpoint.Spans, - Targets: spec.Feed.Targets, - Metrics: &metrics.KVFeedMetrics, - MM: mm, + BackfillCheckpoint: ca.spec.Checkpoint.Spans, + Targets: ca.spec.Feed.Targets, + Metrics: &ca.metrics.KVFeedMetrics, + MM: ca.kvFeedMemMon, InitialHighWater: initialHighWater, WithDiff: withDiff, NeedsInitialScan: needsInitialScan, SchemaChangeEvents: schemaChangeEvents, SchemaChangePolicy: schemaChangePolicy, - SchemaFeed: newSchemaFeed(ctx, cfg, spec, metrics), - Knobs: knobs, + SchemaFeed: sf, + Knobs: ca.knobs.FeedKnobs, } } @@ -426,6 +426,7 @@ func (ca *changeAggregator) close() { log.Warningf(ca.Ctx, `error closing sink. goroutines may have leaked: %v`, err) } } + ca.memAcc.Close(ca.Ctx) if ca.kvFeedMemMon != nil { ca.kvFeedMemMon.Stop(ca.Ctx) @@ -469,7 +470,7 @@ func (ca *changeAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMet // kvFeed, sends off this event to the event consumer, and flushes the sink // if necessary. 
func (ca *changeAggregator) tick() error { - event, err := ca.eventProducer.GetEvent(ca.Ctx) + event, err := ca.eventProducer.Get(ca.Ctx) if err != nil { return err } @@ -489,7 +490,8 @@ func (ca *changeAggregator) tick() error { case kvevent.TypeKV: return ca.eventConsumer.ConsumeEvent(ca.Ctx, event) case kvevent.TypeResolved: - event.DetachAlloc().Release(ca.Ctx) + a := event.DetachAlloc() + a.Release(ca.Ctx) resolved := event.Resolved() if ca.knobs.ShouldSkipResolved == nil || !ca.knobs.ShouldSkipResolved(resolved) { return ca.noteResolvedSpan(resolved) @@ -574,22 +576,6 @@ func (ca *changeAggregator) ConsumerClosed() { ca.close() } -type kvEventProducer interface { - // GetEvent returns the next kv event. - GetEvent(ctx context.Context) (kvevent.Event, error) -} - -type bufEventProducer struct { - kvevent.Reader -} - -var _ kvEventProducer = &bufEventProducer{} - -// GetEvent implements kvEventProducer interface -func (p *bufEventProducer) GetEvent(ctx context.Context) (kvevent.Event, error) { - return p.Get(ctx) -} - type kvEventConsumer interface { // ConsumeEvent responsible for consuming kv event. ConsumeEvent(ctx context.Context, event kvevent.Event) error diff --git a/pkg/ccl/changefeedccl/kvevent/BUILD.bazel b/pkg/ccl/changefeedccl/kvevent/BUILD.bazel index 0279f310b8a0..047865f58226 100644 --- a/pkg/ccl/changefeedccl/kvevent/BUILD.bazel +++ b/pkg/ccl/changefeedccl/kvevent/BUILD.bazel @@ -33,9 +33,12 @@ go_library( go_test( name = "kvevent_test", - srcs = ["blocking_buffer_test.go"], + srcs = [ + "alloc_test.go", + "blocking_buffer_test.go", + ], + embed = [":kvevent"], deps = [ - ":kvevent", "//pkg/keys", "//pkg/roachpb:with-mocks", "//pkg/settings/cluster", @@ -50,6 +53,7 @@ go_test( "//pkg/util/mon", "//pkg/util/quotapool", "//pkg/util/randutil", + "//pkg/util/syncutil", "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/ccl/changefeedccl/kvevent/alloc.go b/pkg/ccl/changefeedccl/kvevent/alloc.go index ffba3a58ce7b..aaa4084e05c8 100644 --- a/pkg/ccl/changefeedccl/kvevent/alloc.go +++ b/pkg/ccl/changefeedccl/kvevent/alloc.go @@ -18,38 +18,73 @@ type Alloc struct { bytes int64 // memory allocated for this request. entries int64 // number of entries using those bytes, usually 1. ap pool // pool where those resources ought to be released. + + // otherPoolAllocs is a map from pool to Alloc that exists to deal with + // cases where allocs from different pools might be merged into this pool. + // This can happen, at the time of writing, when the backfill concludes. + // By merging on a per-pool basis, we can accumulate exactly the number of + // allocs as there are pools in use. Any entry in this map must have a + // nil otherPoolAllocs field. + otherPoolAllocs map[pool]*Alloc +} + +// pool is an allocation pool responsible for freeing up previously acquired resources. +type pool interface { + // Release releases resources to this pool. + Release(ctx context.Context, bytes, entries int64) } // Release releases resources associated with this allocation. -func (a Alloc) Release(ctx context.Context) { - if a.ap != nil { - a.ap.Release(ctx, a.bytes, a.entries) +func (a *Alloc) Release(ctx context.Context) { + if a.isZero() { + return } + for _, oa := range a.otherPoolAllocs { + oa.Release(ctx) + } + a.ap.Release(ctx, a.bytes, a.entries) + a.clear() } // Merge merges other resources into this allocation. func (a *Alloc) Merge(other *Alloc) { - if a.ap == nil { - // Okay to merge into nil allocation -- just use the other. 
+ defer other.clear() + if a.isZero() { // a is a zero allocation -- just use the other. *a = *other return } - if a.ap != other.ap { - panic("cannot merge allocations from two different pools") + // If other has any allocs from a pool other than its own, merge those + // into this. Flattening first means that any alloc in otherPoolAllocs + // will have a nil otherPoolAllocs. + if other.otherPoolAllocs != nil { + for _, oa := range other.otherPoolAllocs { + a.Merge(oa) + } + other.otherPoolAllocs = nil } - a.bytes += other.bytes - a.entries += other.entries - other.bytes = 0 - other.entries = 0 -} -// pool is an allocation pool responsible for freeing up previously acquired resources. -type pool interface { - // Release releases resources to this pool. - Release(ctx context.Context, bytes, entries int64) + if samePool := a.ap == other.ap; samePool { + a.bytes += other.bytes + a.entries += other.entries + } else { + // If other is from another pool, either store it in the map or merge it + // into an existing map entry. + if a.otherPoolAllocs == nil { + a.otherPoolAllocs = make(map[pool]*Alloc, 1) + } + if mergeAlloc, ok := a.otherPoolAllocs[other.ap]; ok { + mergeAlloc.Merge(other) + } else { + otherCpy := *other + a.otherPoolAllocs[other.ap] = &otherCpy + } + } } +func (a *Alloc) clear() { *a = Alloc{} } +func (a *Alloc) isZero() bool { return a.ap == nil } + // TestingMakeAlloc creates allocation for the specified number of bytes // in a single message using allocation pool 'p'. func TestingMakeAlloc(bytes int64, p pool) Alloc { diff --git a/pkg/ccl/changefeedccl/kvevent/alloc_test.go b/pkg/ccl/changefeedccl/kvevent/alloc_test.go new file mode 100644 index 000000000000..3d4d50294a96 --- /dev/null +++ b/pkg/ccl/changefeedccl/kvevent/alloc_test.go @@ -0,0 +1,111 @@ +// Copyright 2021 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package kvevent + +import ( + "context" + "fmt" + "math/rand" + "testing" + + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/stretchr/testify/require" +) + +func TestAllocMergeRandomized(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + run := func(t *testing.T, N, P int) { + require.True(t, N >= P) // test assumes this invariant + pools := make([]*testAllocPool, P) + allocs := make([]Alloc, N) + + // Make P pools. + for i := range pools { + pools[i] = &testAllocPool{} + } + + // Allocate N allocs from the P pools. + poolPerm := rand.Perm(P) + for i := range allocs { + allocs[i] = pools[poolPerm[i%P]].alloc() + } + + // Randomly merge the allocs together. + perm := rand.Perm(N) + for i := 0; i < N-1; i++ { + p := perm[i] + toMergeInto := perm[i+1+rand.Intn(N-i-1)] + allocs[toMergeInto].Merge(&allocs[p]) + } + + // Ensure that the remaining alloc, which has received all of the + // others, has P-1 other allocs. 
+ require.Len(t, allocs[perm[N-1]].otherPoolAllocs, P-1) + for i := 0; i < N-1; i++ { + require.True(t, allocs[perm[i]].isZero()) + } + + // Ensure that all N allocations worth of data are still outstanding + sum := func() (ret int) { + for _, p := range pools { + ret += p.getN() + } + return ret + } + require.Equal(t, N, sum()) + + // Release the remaining alloc. + allocs[perm[N-1]].Release(context.Background()) + // Ensure it now is zero-valued. + require.True(t, allocs[perm[N-1]].isZero()) + // Ensure that all of the resources have been released. + require.Equal(t, 0, sum()) + } + for _, np := range []struct{ N, P int }{ + {1, 1}, + {2, 2}, + {1000, 2}, + {10000, 1000}, + } { + t.Run(fmt.Sprintf("N=%d,P=%d", np.N, np.P), func(t *testing.T) { + run(t, np.N, np.P) + }) + } +} + +type testAllocPool struct { + syncutil.Mutex + n int64 +} + +// Release implements kvevent.pool interface. +func (ap *testAllocPool) Release(ctx context.Context, bytes, entries int64) { + ap.Lock() + defer ap.Unlock() + if ap.n == 0 { + panic("can't release zero resources") + } + ap.n -= bytes +} + +func (ap *testAllocPool) alloc() Alloc { + ap.Lock() + defer ap.Unlock() + ap.n++ + return TestingMakeAlloc(1, ap) +} + +func (ap *testAllocPool) getN() int { + ap.Lock() + defer ap.Unlock() + return int(ap.n) +} diff --git a/pkg/ccl/changefeedccl/kvevent/blocking_buffer.go b/pkg/ccl/changefeedccl/kvevent/blocking_buffer.go index d5d6283f4e9f..5772f48d5955 100644 --- a/pkg/ccl/changefeedccl/kvevent/blocking_buffer.go +++ b/pkg/ccl/changefeedccl/kvevent/blocking_buffer.go @@ -10,7 +10,6 @@ package kvevent import ( "context" - "io" "sync" "time" @@ -34,8 +33,9 @@ type blockingBuffer struct { mu struct { syncutil.Mutex - closed bool - queue bufferEntryQueue + closed bool + drainCh chan struct{} + queue bufferEntryQueue } } @@ -70,9 +70,14 @@ func (b *blockingBuffer) pop() (e *bufferEntry, err error) { b.mu.Lock() defer b.mu.Unlock() if b.mu.closed { - return nil, io.EOF + return nil, ErrBufferClosed } - return b.mu.queue.dequeue(), nil + e = b.mu.queue.dequeue() + if b.mu.drainCh != nil && b.mu.queue.empty() { + close(b.mu.drainCh) + b.mu.drainCh = nil + } + return e, nil } // Get implements kvevent.Reader interface. @@ -116,26 +121,33 @@ func (b *blockingBuffer) ensureOpenedLocked(ctx context.Context) error { // Add implements Writer interface. func (b *blockingBuffer) Add(ctx context.Context, e Event) error { - if e.alloc.ap != nil { - return errors.AssertionFailedf("event unexpectedly has a alloc associated with it") - } - if err := b.ensureOpened(ctx); err != nil { return err } - // Acquire the quota first. - alloc := int64(changefeedbase.EventMemoryMultiplier.Get(b.sv) * float64(e.approxSize)) - if l := changefeedbase.PerChangefeedMemLimit.Get(b.sv); alloc > l { - return errors.Newf("event size %d exceeds per changefeed limit %d", alloc, l) - } + var be *bufferEntry + if e.alloc.ap == nil { + // Acquire the quota first. 
+ alloc := int64(changefeedbase.EventMemoryMultiplier.Get(b.sv) * float64(e.approxSize)) + if l := changefeedbase.PerChangefeedMemLimit.Get(b.sv); alloc > l { + return errors.Newf("event size %d exceeds per changefeed limit %d", alloc, l) + } + e.alloc = Alloc{ + bytes: alloc, + entries: 1, + ap: &b.qp, + } + be = newBufferEntry(e) - be := newBufferEntry(e, &b.qp, alloc) - if err := b.qp.Acquire(ctx, be); err != nil { - bufferEntryPool.Put(be) - return err + if err := b.qp.Acquire(ctx, be); err != nil { + bufferEntryPool.Put(be) + return err + } + b.metrics.BufferEntriesMemAcquired.Inc(alloc) + } else { + // Use allocation associated with the event itself. + be = newBufferEntry(e) } - b.metrics.BufferEntriesMemAcquired.Inc(alloc) // Enqueue message, and signal if anybody is waiting. b.mu.Lock() @@ -153,6 +165,34 @@ func (b *blockingBuffer) Add(ctx context.Context, e Event) error { return nil } +// tryDrain checks whether the buffer is already empty. +// If so, it returns nil. If not, it returns a channel that will be closed once the buffer is empty. +func (b *blockingBuffer) tryDrain() chan struct{} { + b.mu.Lock() + defer b.mu.Unlock() + if b.mu.queue.empty() { + return nil + } + + b.mu.drainCh = make(chan struct{}) + return b.mu.drainCh +} + +// Drain implements Writer interface. +func (b *blockingBuffer) Drain(ctx context.Context) error { + if drained := b.tryDrain(); drained != nil { + select { + case <-ctx.Done(): + return ctx.Err() + case <-drained: + return nil + } + } + + return nil +} + +// Close implements Writer interface. func (b *blockingBuffer) Close(ctx context.Context) error { b.mu.Lock() defer b.mu.Unlock() @@ -236,13 +276,8 @@ var bufferEntryPool = sync.Pool{ }, } -func newBufferEntry(e Event, ap *allocPool, alloc int64) *bufferEntry { +func newBufferEntry(e Event) *bufferEntry { be := bufferEntryPool.Get().(*bufferEntry) - e.alloc = Alloc{ - bytes: alloc, - entries: 1, - ap: ap, - } be.e = e be.next = nil return be @@ -302,6 +337,10 @@ func (l *bufferEntryQueue) enqueue(be *bufferEntry) { } } +func (l *bufferEntryQueue) empty() bool { + return l.head == nil +} + func (l *bufferEntryQueue) dequeue() *bufferEntry { if l.head == nil { return nil diff --git a/pkg/ccl/changefeedccl/kvevent/blocking_buffer_test.go b/pkg/ccl/changefeedccl/kvevent/blocking_buffer_test.go index 92fef0eb3d42..fa34f6266682 100644 --- a/pkg/ccl/changefeedccl/kvevent/blocking_buffer_test.go +++ b/pkg/ccl/changefeedccl/kvevent/blocking_buffer_test.go @@ -106,7 +106,8 @@ func TestBlockingBuffer(t *testing.T) { for metrics.BufferPushbackNanos.Count() == 0 { e, err := buf.Get(context.Background()) require.NoError(t, err) - e.DetachAlloc().Release(context.Background()) + a := e.DetachAlloc() + a.Release(context.Background()) } stopProducers() } diff --git a/pkg/ccl/changefeedccl/kvevent/chan_buffer.go b/pkg/ccl/changefeedccl/kvevent/chan_buffer.go index 931bc2a1e890..b831f80db33b 100644 --- a/pkg/ccl/changefeedccl/kvevent/chan_buffer.go +++ b/pkg/ccl/changefeedccl/kvevent/chan_buffer.go @@ -39,6 +39,12 @@ func (b *chanBuffer) Add(ctx context.Context, event Event) error { } } +// Drain implements Writer interface. +func (b *chanBuffer) Drain(ctx context.Context) error { + // channel buffer is unbuffered.
+ return nil +} func (b *chanBuffer) Close(_ context.Context) error { close(b.entriesCh) return nil diff --git a/pkg/ccl/changefeedccl/kvevent/event.go b/pkg/ccl/changefeedccl/kvevent/event.go index 10d2a831c2c5..fea2dc5bc7b1 100644 --- a/pkg/ccl/changefeedccl/kvevent/event.go +++ b/pkg/ccl/changefeedccl/kvevent/event.go @@ -41,6 +41,8 @@ type Reader interface { type Writer interface { // Add adds event to this writer. Add(ctx context.Context, event Event) error + // Drain waits until all events buffered by this writer have been consumed. + Drain(ctx context.Context) error // Close closes this writer. Close(ctx context.Context) error } diff --git a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go index 32995faff539..c39bf758a290 100644 --- a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go +++ b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go @@ -103,6 +103,7 @@ func Run(ctx context.Context, cfg Config) error { g.GoCtx(cfg.SchemaFeed.Run) g.GoCtx(f.run) err := g.Wait() + // NB: The higher layers of the changefeed should detect the boundary and the // policy and tear everything down. Returning before the higher layers tear down // the changefeed exposes synchronization challenges if the provided writer is // buffered. Errors returned from this function will cause the // changefeedAggregator to exit even if all values haven't been read out of the // provided buffer. var scErr schemaChangeDetectedError - if errors.As(err, &scErr) { - log.Infof(ctx, "stopping kv feed due to schema change at %v", scErr.ts) - // Close the buffer so the consumer (changeAggregator) knows no more - // writes are expected and can transition to a draining state. - if err := f.writer.Close(ctx); err != nil { - return errors.Wrap(err, "failed to close kv event writer") - } + if !errors.As(err, &scErr) { + // Regardless of whether we exited the KV feed with or without an error, the error + // is not a schema change; so close the writer and return. + return errors.CombineErrors(err, f.writer.Close(ctx)) + } + + log.Infof(ctx, "stopping kv feed due to schema change at %v", scErr.ts) + + // Drain the writer before we close it so that all events emitted prior to the schema change + // boundary are consumed by the change aggregator. + // Regardless of whether drain succeeds, we must also close the buffer to release + // any resources, and to let the consumer (changeAggregator) know that no more writes + // are expected so that it can transition to a draining state. + err = errors.CombineErrors(f.writer.Drain(ctx), f.writer.Close(ctx)) + + if err == nil { // This context is canceled by the change aggregator when it receives - // ErrBufferClosed from the kv buffer that we closed above. + // an error reading from the Writer that was closed above. <-ctx.Done() - err = nil } + return err } diff --git a/pkg/ccl/changefeedccl/kvfeed/scanner_test.go b/pkg/ccl/changefeedccl/kvfeed/scanner_test.go index ad64829ff889..6d16610a778a 100644 --- a/pkg/ccl/changefeedccl/kvfeed/scanner_test.go +++ b/pkg/ccl/changefeedccl/kvfeed/scanner_test.go @@ -38,6 +38,10 @@ func (r *recordResolvedWriter) Add(ctx context.Context, e kvevent.Event) error { return nil } +func (r *recordResolvedWriter) Drain(ctx context.Context) error { + return nil +} + func (r *recordResolvedWriter) Close(ctx context.Context) error { return nil }
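To illustrate the per-pool merge bookkeeping that patch 3 introduces in `Alloc.Merge`: the idea is that one alloc accumulates running totals per pool, so allocations drawn from different quota pools can be coalesced and later released back to the right pool exactly once. Below is a minimal, self-contained Go sketch of that idea, not the actual `kvevent` API: the lowercase `pool`, `alloc`, `countingPool`, and `acquire` names are hypothetical stand-ins, and the sketch omits the patch's flattening of a merged alloc's own nested map, since the inputs here never carry one.

```go
package main

import (
	"context"
	"fmt"
)

// pool releases previously acquired resources (simplified stand-in).
type pool interface {
	Release(ctx context.Context, bytes, entries int64)
}

// countingPool tracks outstanding bytes so the example can verify that
// every byte acquired is eventually released back to its own pool.
type countingPool struct{ n int64 }

func (p *countingPool) Release(_ context.Context, bytes, _ int64) { p.n -= bytes }

func acquire(p *countingPool, bytes int64) *alloc {
	p.n += bytes // account for the acquisition
	return &alloc{bytes: bytes, entries: 1, ap: p}
}

// alloc accumulates resources from its own pool plus, via other, exactly
// one merged alloc per foreign pool.
type alloc struct {
	bytes, entries int64
	ap             pool
	other          map[pool]*alloc
}

func (a *alloc) merge(b *alloc) {
	defer func() { *b = alloc{} }() // b's resources now live in a
	switch {
	case a.ap == nil: // a is a zero alloc: just take b
		*a = *b
	case a.ap == b.ap: // same pool: add the totals
		a.bytes += b.bytes
		a.entries += b.entries
	default: // foreign pool: fold into the per-pool map entry
		if a.other == nil {
			a.other = map[pool]*alloc{}
		}
		if ex, ok := a.other[b.ap]; ok {
			ex.merge(b)
		} else {
			cp := *b
			a.other[b.ap] = &cp
		}
	}
}

func (a *alloc) release(ctx context.Context) {
	if a.ap == nil {
		return
	}
	for _, oa := range a.other {
		oa.release(ctx)
	}
	a.ap.Release(ctx, a.bytes, a.entries)
	*a = alloc{}
}

func main() {
	scanPool, feedPool := &countingPool{}, &countingPool{}
	events := []*alloc{acquire(scanPool, 1), acquire(scanPool, 2), acquire(feedPool, 2)}

	var merged alloc
	for _, e := range events {
		merged.merge(e) // one alloc ends up holding per-pool totals
	}
	merged.release(context.Background())
	fmt.Println(scanPool.n, feedPool.n) // 0 0: everything returned to its pool
}
```

This mirrors why the test above expects the surviving alloc to hold P-1 entries in `otherPoolAllocs`: merges within a pool only grow counters, while each additional pool contributes a single map entry.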
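The kv_feed.go change above drains the writer before closing it so that events emitted before the schema-change boundary still reach the consumer. A minimal sketch of that shutdown ordering, assuming a simplified writer interface and using the standard library's errors.Join (Go 1.20+) in place of the patch's errors.CombineErrors:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// writer mirrors the Drain/Close portion of kvevent.Writer (assumed shape).
type writer interface {
	Drain(ctx context.Context) error
	Close(ctx context.Context) error
}

type noopWriter struct{}

func (noopWriter) Drain(context.Context) error { return nil }
func (noopWriter) Close(context.Context) error { return nil }

// drainThenClose drains first, so the consumer sees every buffered event,
// then closes regardless of the drain outcome so resources are released;
// both errors are surfaced to the caller.
func drainThenClose(ctx context.Context, w writer) error {
	return errors.Join(w.Drain(ctx), w.Close(ctx))
}

func main() {
	fmt.Println(drainThenClose(context.Background(), noopWriter{})) // <nil>
}
```

Closing even when Drain fails matches the patch's stated rationale: the buffer must be closed to release resources and to signal the change aggregator that no more writes are coming.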