From 0a8baa981d11303e1c9ce9af54de7a4e4cff30f7 Mon Sep 17 00:00:00 2001
From: Yahor Yuzefovich
Date: Mon, 19 Apr 2021 19:42:07 -0700
Subject: [PATCH 1/8] sql: use disk-backed container in bufferNode

This commit refactors several `planNode`s that need to buffer rows to
use a disk-backed row container instead of a purely in-memory one. In
order to achieve that, a couple of light wrappers are introduced: one
on top of the disk-backed container itself and one on top of an
iterator over it.

Still, one (probably important) case is not fixed properly: namely, if
a subquery is executed in AllRows or AllRowsNormalized mode, then we
first buffer the rows into the disk-backed container only to
materialize them later into a single in-memory tuple. Addressing this
is left as a TODO.

Release note (sql change): CockroachDB should now be more stable when
executing queries with subqueries that produce many rows (previously we
could crash with an out-of-memory error; now we will spill to temporary
disk storage).
---
 pkg/sql/BUILD.bazel | 1 +
 pkg/sql/apply_join.go | 109 ++++++++-------
 pkg/sql/buffer.go | 46 ++++---
 pkg/sql/buffer_util.go | 128 ++++++++++++++++++
 pkg/sql/create_stats.go | 14 +-
 pkg/sql/distsql_plan_csv.go | 13 +-
 pkg/sql/distsql_plan_stats.go | 6 +-
 pkg/sql/distsql_running.go | 50 ++++---
 .../logictest/testdata/logic_test/apply_join | 26 ++++
 pkg/sql/plan_columns.go | 16 ++-
 pkg/sql/recursive_cte.go | 88 +++++++-----
 pkg/sql/rowcontainer/datum_row_container.go | 12 +-
 pkg/sql/scrub.go | 26 ++--
 pkg/sql/scrub_physical.go | 52 ++++---
 14 files changed, 407 insertions(+), 180 deletions(-)
 create mode 100644 pkg/sql/buffer_util.go

diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel
index e13fbe548df8..e75b8cee5af9 100644
--- a/pkg/sql/BUILD.bazel
+++ b/pkg/sql/BUILD.bazel
@@ -23,6 +23,7 @@ go_library(
 "authorization.go",
 "backfill.go",
 "buffer.go",
+ "buffer_util.go",
 "cancel_queries.go",
 "cancel_sessions.go",
 "check.go",
diff --git a/pkg/sql/apply_join.go b/pkg/sql/apply_join.go
index ebd7238b3d10..be061feb6f50 100644
--- a/pkg/sql/apply_join.go
+++ b/pkg/sql/apply_join.go
@@ -16,8 +16,8 @@ import (
 "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
 "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
 "github.com/cockroachdb/cockroach/pkg/sql/opt/exec"
- "github.com/cockroachdb/cockroach/pkg/sql/rowcontainer"
 "github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
+ "github.com/cockroachdb/cockroach/pkg/sql/types"
 "github.com/cockroachdb/cockroach/pkg/util/log"
 "github.com/cockroachdb/errors"
 )
@@ -43,10 +43,10 @@ type applyJoinNode struct {
 // columns contains the metadata for the results of this node.
 columns colinfo.ResultColumns

- // rightCols contains the metadata for the result of the right side of this
- // apply join, as built in the optimization phase. Later on, every re-planning
- // of the right side will emit these same columns.
- rightCols colinfo.ResultColumns
+ // rightTypes is the schema of the rows produced by the right side of the
+ // join, as built in the optimization phase. Later on, every re-planning of
+ // the right side will emit these same columns.
+ rightTypes []*types.T

 planRightSideFn exec.ApplyJoinPlanRightSideFn

@@ -61,10 +61,9 @@ type applyJoinNode struct {
 leftRowFoundAMatch bool
 // rightRows will be populated with the result of the right side of the join
 // each time it's run.
- rightRows *rowcontainer.RowContainer
- // curRightRow is the index into rightRows of the current right row being
- // processed.
- curRightRow int + rightRows rowContainerHelper + // rightRowsIterator, if non-nil, is the iterator into rightRows. + rightRowsIterator *rowContainerIterator // out is the full result row, populated on each call to Next. out tree.Datums // done is true if the left side has been exhausted. @@ -72,7 +71,6 @@ type applyJoinNode struct { } } -// Set to true to enable ultra verbose debug logging. func newApplyJoinNode( joinType descpb.JoinType, left planDataSource, @@ -93,7 +91,7 @@ func newApplyJoinNode( joinType: joinType, input: left, pred: pred, - rightCols: rightCols, + rightTypes: getTypesFromResultColumns(rightCols), planRightSideFn: planRightSideFn, columns: pred.cols, }, nil @@ -103,15 +101,13 @@ func (a *applyJoinNode) startExec(params runParams) error { // If needed, pre-allocate a right row of NULL tuples for when the // join predicate fails to match. if a.joinType == descpb.LeftOuterJoin { - a.run.emptyRight = make(tree.Datums, len(a.rightCols)) + a.run.emptyRight = make(tree.Datums, len(a.rightTypes)) for i := range a.run.emptyRight { a.run.emptyRight[i] = tree.DNull } } a.run.out = make(tree.Datums, len(a.columns)) - ci := colinfo.ColTypeInfoFromResCols(a.rightCols) - acc := params.EvalContext().Mon.MakeBoundAccount() - a.run.rightRows = rowcontainer.NewRowContainer(acc, ci) + a.run.rightRows.init(a.rightTypes, params.extendedEvalCtx, "apply-join" /* opName */) return nil } @@ -121,37 +117,42 @@ func (a *applyJoinNode) Next(params runParams) (bool, error) { } for { - for a.run.curRightRow < a.run.rightRows.Len() { + if a.run.rightRowsIterator != nil { // We have right rows set up - check the next one for a match. - var rrow tree.Datums - if len(a.rightCols) != 0 { - rrow = a.run.rightRows.At(a.run.curRightRow) - } - a.run.curRightRow++ - // Compute join. - predMatched, err := a.pred.eval(params.EvalContext(), a.run.leftRow, rrow) - if err != nil { - return false, err - } - if !predMatched { - // Didn't match? Try with the next right-side row. - continue - } + for { + // Note that if a.rightTypes has zero length, non-nil rrow is + // returned the correct number of times. + rrow, err := a.run.rightRowsIterator.next() + if err != nil { + return false, err + } + if rrow == nil { + // We have exhausted all rows from the right side. + break + } + // Compute join. + predMatched, err := a.pred.eval(params.EvalContext(), a.run.leftRow, rrow) + if err != nil { + return false, err + } + if !predMatched { + // Didn't match? Try with the next right-side row. + continue + } - a.run.leftRowFoundAMatch = true - if a.joinType == descpb.LeftAntiJoin || - a.joinType == descpb.LeftSemiJoin { - // We found a match, but we're doing an anti or semi join, so we're - // done with this left row. - break + a.run.leftRowFoundAMatch = true + if a.joinType == descpb.LeftAntiJoin || + a.joinType == descpb.LeftSemiJoin { + // We found a match, but we're doing an anti or semi join, + // so we're done with this left row. + break + } + // We're doing an ordinary join, so prep the row and emit it. + a.pred.prepareRow(a.run.out, a.run.leftRow, rrow) + return true, nil } - // We're doing an ordinary join, so prep the row and emit it. - a.pred.prepareRow(a.run.out, a.run.leftRow, rrow) - return true, nil } - // We're out of right side rows. Clear them, and reset the match state for - // next time. - a.run.rightRows.Clear(params.ctx) + // We're out of right side rows. Reset the match state for next time. 
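+ // Note: each time the left row advances, runRightSidePlan below clears
+ // the disk-backed rightRows container, re-runs the right-side plan into
+ // it, and opens a fresh rightRowsIterator; a nil row returned by
+ // rightRowsIterator.next() is what signals that the current right side
+ // has been fully consumed.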
foundAMatch := a.run.leftRowFoundAMatch a.run.leftRowFoundAMatch = false @@ -222,15 +223,25 @@ func (a *applyJoinNode) Next(params runParams) (bool, error) { // wrong during execution of the right hand side of the join, and that we should // completely give up on the outer join. func (a *applyJoinNode) runRightSidePlan(params runParams, plan *planComponents) error { - a.run.curRightRow = 0 - a.run.rightRows.Clear(params.ctx) - return runPlanInsidePlan(params, plan, a.run.rightRows) + // Prepare rightRows state for reuse. + if err := a.run.rightRows.clear(params.ctx); err != nil { + return err + } + if a.run.rightRowsIterator != nil { + a.run.rightRowsIterator.close() + a.run.rightRowsIterator = nil + } + if err := runPlanInsidePlan(params, plan, &a.run.rightRows); err != nil { + return err + } + a.run.rightRowsIterator = newRowContainerIterator(params.ctx, a.run.rightRows, a.rightTypes) + return nil } // runPlanInsidePlan is used to run a plan and gather the results in a row // container, as part of the execution of an "outer" plan. func runPlanInsidePlan( - params runParams, plan *planComponents, rowContainer *rowcontainer.RowContainer, + params runParams, plan *planComponents, rowContainer *rowContainerHelper, ) error { rowResultWriter := NewRowResultWriter(rowContainer) recv := MakeDistSQLReceiver( @@ -279,7 +290,9 @@ func (a *applyJoinNode) Values() tree.Datums { func (a *applyJoinNode) Close(ctx context.Context) { a.input.plan.Close(ctx) - if a.run.rightRows != nil { - a.run.rightRows.Close(ctx) + a.run.rightRows.close(ctx) + if a.run.rightRowsIterator != nil { + a.run.rightRowsIterator.close() + a.run.rightRowsIterator = nil } } diff --git a/pkg/sql/buffer.go b/pkg/sql/buffer.go index 6e139e71c49c..4c8138a46890 100644 --- a/pkg/sql/buffer.go +++ b/pkg/sql/buffer.go @@ -13,9 +13,8 @@ package sql import ( "context" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" - "github.com/cockroachdb/cockroach/pkg/sql/rowcontainer" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/types" ) // bufferNode consumes its input one row at a time, stores it in the buffer, @@ -24,21 +23,18 @@ import ( type bufferNode struct { plan planNode - // TODO(yuzefovich): the buffer should probably be backed by disk. If so, the - // comments about TempStorage suggest that it should be used by DistSQL - // processors, but this node is local. - bufferedRows *rowcontainer.RowContainer - passThruNextRowIdx int + // typs is the schema of rows buffered by this node. + typs []*types.T + rows rowContainerHelper + currentRow tree.Datums // label is a string used to describe the node in an EXPLAIN plan. 
label string } func (n *bufferNode) startExec(params runParams) error { - n.bufferedRows = rowcontainer.NewRowContainer( - params.EvalContext().Mon.MakeBoundAccount(), - colinfo.ColTypeInfoFromResCols(getPlanColumns(n.plan, false /* mut */)), - ) + n.typs = planTypes(n.plan) + n.rows.init(n.typs, params.extendedEvalCtx, n.label) return nil } @@ -53,20 +49,20 @@ func (n *bufferNode) Next(params runParams) (bool, error) { if !ok { return false, nil } - if _, err = n.bufferedRows.AddRow(params.ctx, n.plan.Values()); err != nil { + n.currentRow = n.plan.Values() + if err = n.rows.addRow(params.ctx, n.currentRow); err != nil { return false, err } - n.passThruNextRowIdx++ return true, nil } func (n *bufferNode) Values() tree.Datums { - return n.bufferedRows.At(n.passThruNextRowIdx - 1) + return n.currentRow } func (n *bufferNode) Close(ctx context.Context) { n.plan.Close(ctx) - n.bufferedRows.Close(ctx) + n.rows.close(ctx) } // scanBufferNode behaves like an iterator into the bufferNode it is @@ -75,24 +71,34 @@ func (n *bufferNode) Close(ctx context.Context) { type scanBufferNode struct { buffer *bufferNode - nextRowIdx int + iterator *rowContainerIterator + currentRow tree.Datums // label is a string used to describe the node in an EXPLAIN plan. label string } -func (n *scanBufferNode) startExec(runParams) error { +func (n *scanBufferNode) startExec(params runParams) error { + n.iterator = newRowContainerIterator(params.ctx, n.buffer.rows, n.buffer.typs) return nil } func (n *scanBufferNode) Next(runParams) (bool, error) { - n.nextRowIdx++ - return n.nextRowIdx <= n.buffer.bufferedRows.Len(), nil + var err error + n.currentRow, err = n.iterator.next() + if n.currentRow == nil || err != nil { + return false, err + } + return true, nil } func (n *scanBufferNode) Values() tree.Datums { - return n.buffer.bufferedRows.At(n.nextRowIdx - 1) + return n.currentRow } func (n *scanBufferNode) Close(context.Context) { + if n.iterator != nil { + n.iterator.close() + n.iterator = nil + } } diff --git a/pkg/sql/buffer_util.go b/pkg/sql/buffer_util.go new file mode 100644 index 000000000000..a49aa5eb4d4f --- /dev/null +++ b/pkg/sql/buffer_util.go @@ -0,0 +1,128 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package sql + +import ( + "context" + "fmt" + + "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" + "github.com/cockroachdb/cockroach/pkg/sql/rowcontainer" + "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/util/mon" +) + +// rowContainerHelper is a wrapper around a disk-backed row container that +// should be used by planNodes (or similar components) whenever they need to +// buffer data. init must be called before the first use. 
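+//
+// A typical lifecycle, sketched below (evalCtx, typs, and inputRows stand
+// in for whatever the caller already has in scope; "my-op" is a
+// placeholder operation name):
+//
+//	var buf rowContainerHelper
+//	buf.init(typs, evalCtx, "my-op" /* opName */)
+//	defer buf.close(ctx)
+//	for _, datums := range inputRows {
+//		if err := buf.addRow(ctx, datums); err != nil {
+//			return err
+//		}
+//	}
+//	it := newRowContainerIterator(ctx, buf, typs)
+//	defer it.close()
+//	for {
+//		row, err := it.next()
+//		if err != nil {
+//			return err
+//		}
+//		if row == nil {
+//			break // all rows have been consumed
+//		}
+//		// Process row.
+//	}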
+type rowContainerHelper struct { + rows *rowcontainer.DiskBackedRowContainer + scratch rowenc.EncDatumRow + memMonitor *mon.BytesMonitor + diskMonitor *mon.BytesMonitor +} + +func (c *rowContainerHelper) init( + typs []*types.T, evalContext *extendedEvalContext, opName string, +) { + distSQLCfg := &evalContext.DistSQLPlanner.distSQLSrv.ServerConfig + c.memMonitor = execinfra.NewLimitedMonitor(evalContext.Context, evalContext.Mon, distSQLCfg, fmt.Sprintf("%s-limited", opName)) + c.diskMonitor = execinfra.NewMonitor(evalContext.Context, distSQLCfg.ParentDiskMonitor, fmt.Sprintf("%s-disk", opName)) + c.rows = &rowcontainer.DiskBackedRowContainer{} + c.rows.Init( + colinfo.NoOrdering, typs, &evalContext.EvalContext, + distSQLCfg.TempStorage, c.memMonitor, c.diskMonitor, + ) + c.scratch = make(rowenc.EncDatumRow, len(typs)) +} + +// addRow adds a given row to the helper and returns any error it encounters. +func (c *rowContainerHelper) addRow(ctx context.Context, row tree.Datums) error { + for i := range row { + c.scratch[i].Datum = row[i] + } + return c.rows.AddRow(ctx, c.scratch) +} + +// len returns the number of rows buffered so far. +func (c *rowContainerHelper) len() int { + return c.rows.Len() +} + +// clear prepares the helper for reuse (it resets the underlying container which +// will delete all buffered data; also, the container will be using the +// in-memory variant even if it spilled on the previous usage). +func (c *rowContainerHelper) clear(ctx context.Context) error { + return c.rows.UnsafeReset(ctx) +} + +// close must be called once the helper is no longer needed to clean up any +// resources. +func (c *rowContainerHelper) close(ctx context.Context) { + if c.rows != nil { + c.rows.Close(ctx) + c.memMonitor.Stop(ctx) + c.diskMonitor.Stop(ctx) + c.rows = nil + } +} + +// rowContainerIterator is a wrapper around rowcontainer.RowIterator that takes +// care of advancing the underlying iterator and converting the rows to +// tree.Datums. +type rowContainerIterator struct { + iter rowcontainer.RowIterator + + typs []*types.T + datums tree.Datums + da rowenc.DatumAlloc +} + +// newRowContainerIterator returns a new rowContainerIterator that must be +// closed once no longer needed. +func newRowContainerIterator( + ctx context.Context, c rowContainerHelper, typs []*types.T, +) *rowContainerIterator { + i := &rowContainerIterator{ + iter: c.rows.NewIterator(ctx), + typs: typs, + datums: make(tree.Datums, len(typs)), + } + i.iter.Rewind() + return i +} + +// next returns the next row of the iterator or an error if encountered. It +// returns nil, nil when the iterator has been exhausted. +func (i *rowContainerIterator) next() (tree.Datums, error) { + defer i.iter.Next() + if valid, err := i.iter.Valid(); err != nil { + return nil, err + } else if !valid { + // All rows have been exhausted. 
+ return nil, nil + } + row, err := i.iter.Row() + if err != nil { + return nil, err + } + if err = rowenc.EncDatumRowToDatums(i.typs, i.datums, row, &i.da); err != nil { + return nil, err + } + return i.datums, nil +} + +func (i *rowContainerIterator) close() { + i.iter.Close() +} diff --git a/pkg/sql/create_stats.go b/pkg/sql/create_stats.go index 49e3cf186d48..f04bd559a769 100644 --- a/pkg/sql/create_stats.go +++ b/pkg/sql/create_stats.go @@ -30,7 +30,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/privilege" - "github.com/cockroachdb/cockroach/pkg/sql/rowcontainer" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" "github.com/cockroachdb/cockroach/pkg/sql/stats" @@ -506,14 +505,6 @@ func (r *createStatsResumer) Resume(ctx context.Context, execCtx interface{}) er r.tableID = details.Table.ID evalCtx := p.ExtendedEvalContext() - ci := colinfo.ColTypeInfoFromColTypes([]*types.T{}) - rows := rowcontainer.NewRowContainer(evalCtx.Mon.MakeBoundAccount(), ci) - defer func() { - if rows != nil { - rows.Close(ctx) - } - }() - dsp := p.DistSQLPlanner() if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { // Set the transaction on the EvalContext to this txn. This allows for @@ -527,8 +518,11 @@ func (r *createStatsResumer) Resume(ctx context.Context, execCtx interface{}) er } planCtx := dsp.NewPlanningCtx(ctx, evalCtx, nil /* planner */, txn, true /* distribute */) + // CREATE STATS flow doesn't produce any rows and only emits the + // metadata, so we can use a nil rowContainerHelper. + resultWriter := NewRowResultWriter(nil /* rowContainer */) if err := dsp.planAndRunCreateStats( - ctx, evalCtx, planCtx, txn, r.job, NewRowResultWriter(rows), + ctx, evalCtx, planCtx, txn, r.job, resultWriter, ); err != nil { // Check if this was a context canceled error and restart if it was. if grpcutil.IsContextCanceled(err) { diff --git a/pkg/sql/distsql_plan_csv.go b/pkg/sql/distsql_plan_csv.go index 1e13ed9b146f..0b76587ecdd1 100644 --- a/pkg/sql/distsql_plan_csv.go +++ b/pkg/sql/distsql_plan_csv.go @@ -24,7 +24,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/physicalplan" - "github.com/cockroachdb/cockroach/pkg/sql/rowcontainer" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" @@ -35,7 +34,7 @@ import ( // RowResultWriter is a thin wrapper around a RowContainer. type RowResultWriter struct { - rowContainer *rowcontainer.RowContainer + rowContainer *rowContainerHelper rowsAffected int err error } @@ -43,9 +42,7 @@ type RowResultWriter struct { var _ rowResultWriter = &RowResultWriter{} // NewRowResultWriter creates a new RowResultWriter. -func NewRowResultWriter(rowContainer *rowcontainer.RowContainer) *RowResultWriter { - // TODO(yuzefovich): consider using disk-backed row container in some cases - // (for example, in case of subqueries and apply-joins). +func NewRowResultWriter(rowContainer *rowContainerHelper) *RowResultWriter { return &RowResultWriter{rowContainer: rowContainer} } @@ -56,8 +53,10 @@ func (b *RowResultWriter) IncrementRowsAffected(ctx context.Context, n int) { // AddRow implements the rowResultWriter interface. 
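+// With a nil rowContainer (e.g. the CREATE STATS flow above, which only
+// emits metadata), rows are silently discarded and only the rows-affected
+// count and the error state are tracked.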
func (b *RowResultWriter) AddRow(ctx context.Context, row tree.Datums) error { - _, err := b.rowContainer.AddRow(ctx, row) - return err + if b.rowContainer != nil { + return b.rowContainer.addRow(ctx, row) + } + return nil } // SetError is part of the rowResultWriter interface. diff --git a/pkg/sql/distsql_plan_stats.go b/pkg/sql/distsql_plan_stats.go index d6f3114ae1bc..0a4d640d0002 100644 --- a/pkg/sql/distsql_plan_stats.go +++ b/pkg/sql/distsql_plan_stats.go @@ -259,7 +259,7 @@ func (dsp *DistSQLPlanner) planAndRunCreateStats( planCtx *PlanningCtx, txn *kv.Txn, job *jobs.Job, - resultRows *RowResultWriter, + resultWriter *RowResultWriter, ) error { ctx = logtags.AddTag(ctx, "create-stats-distsql", nil) @@ -272,7 +272,7 @@ func (dsp *DistSQLPlanner) planAndRunCreateStats( recv := MakeDistSQLReceiver( ctx, - resultRows, + resultWriter, tree.DDL, evalCtx.ExecCfg.RangeDescriptorCache, txn, @@ -284,5 +284,5 @@ func (dsp *DistSQLPlanner) planAndRunCreateStats( defer recv.Release() dsp.Run(planCtx, txn, physPlan, recv, evalCtx, nil /* finishedSetupFn */)() - return resultRows.Err() + return resultWriter.Err() } diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go index a17c8654cc03..a101ae51adda 100644 --- a/pkg/sql/distsql_running.go +++ b/pkg/sql/distsql_running.go @@ -23,7 +23,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" "github.com/cockroachdb/cockroach/pkg/server/telemetry" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/colflow" "github.com/cockroachdb/cockroach/pkg/sql/contention" "github.com/cockroachdb/cockroach/pkg/sql/distsql" @@ -33,7 +32,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/physicalplan" - "github.com/cockroachdb/cockroach/pkg/sql/rowcontainer" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/rowexec" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -882,18 +880,18 @@ func (dsp *DistSQLPlanner) planAndRunSubquery( // receiver, and use it and serialize the results of the subquery. The type // of the results stored in the container depends on the type of the subquery. subqueryRecv := recv.clone() - var typ colinfo.ColTypeInfo - var rows *rowcontainer.RowContainer + var typs []*types.T if subqueryPlan.execMode == rowexec.SubqueryExecModeExists { subqueryRecv.existsMode = true - typ = colinfo.ColTypeInfoFromColTypes([]*types.T{}) + typs = []*types.T{} } else { - typ = colinfo.ColTypeInfoFromColTypes(subqueryPhysPlan.GetResultTypes()) + typs = subqueryPhysPlan.GetResultTypes() } - rows = rowcontainer.NewRowContainer(subqueryMemAccount, typ) - defer rows.Close(ctx) + var rows rowContainerHelper + rows.init(typs, evalCtx, "subquery" /* opName */) + defer rows.close(ctx) - subqueryRowReceiver := NewRowResultWriter(rows) + subqueryRowReceiver := NewRowResultWriter(&rows) subqueryRecv.resultWriter = subqueryRowReceiver subqueryPlans[planIdx].started = true dsp.Run(subqueryPlanCtx, planner.txn, subqueryPhysPlan, subqueryRecv, evalCtx, nil /* finishedSetupFn */)() @@ -903,13 +901,23 @@ func (dsp *DistSQLPlanner) planAndRunSubquery( switch subqueryPlan.execMode { case rowexec.SubqueryExecModeExists: // For EXISTS expressions, all we want to know if there is at least one row. 
- hasRows := rows.Len() != 0 + hasRows := rows.len() != 0 subqueryPlans[planIdx].result = tree.MakeDBool(tree.DBool(hasRows)) case rowexec.SubqueryExecModeAllRows, rowexec.SubqueryExecModeAllRowsNormalized: + // TODO(yuzefovich): this is unfortunate - we're materializing all + // buffered rows into a single tuple kept in memory without any memory + // accounting. Refactor it. var result tree.DTuple - for rows.Len() > 0 { - row := rows.At(0) - rows.PopFirst(ctx) + iterator := newRowContainerIterator(ctx, rows, typs) + defer iterator.close() + for { + row, err := iterator.next() + if err != nil { + return err + } + if row == nil { + break + } if row.Len() == 1 { // This seems hokey, but if we don't do this then the subquery expands // to a tuple of tuples instead of a tuple of values and an expression @@ -926,16 +934,24 @@ func (dsp *DistSQLPlanner) planAndRunSubquery( } subqueryPlans[planIdx].result = &result case rowexec.SubqueryExecModeOneRow: - switch rows.Len() { + switch rows.len() { case 0: subqueryPlans[planIdx].result = tree.DNull case 1: - row := rows.At(0) + iterator := newRowContainerIterator(ctx, rows, typs) + defer iterator.close() + row, err := iterator.next() + if err != nil { + return err + } + if row == nil { + return errors.AssertionFailedf("iterator didn't return a row although container len is 1") + } switch row.Len() { case 1: subqueryPlans[planIdx].result = row[0] default: - subqueryPlans[planIdx].result = &tree.DTuple{D: rows.At(0)} + subqueryPlans[planIdx].result = &tree.DTuple{D: row} } default: return pgerror.Newf(pgcode.CardinalityViolation, @@ -1017,7 +1033,7 @@ func (dsp *DistSQLPlanner) PlanAndRunCascadesAndChecks( buf := plan.cascades[i].Buffer var numBufferedRows int if buf != nil { - numBufferedRows = buf.(*bufferNode).bufferedRows.Len() + numBufferedRows = buf.(*bufferNode).rows.rows.Len() if numBufferedRows == 0 { // No rows were actually modified. continue diff --git a/pkg/sql/logictest/testdata/logic_test/apply_join b/pkg/sql/logictest/testdata/logic_test/apply_join index 58b9b048c371..638dbca51017 100644 --- a/pkg/sql/logictest/testdata/logic_test/apply_join +++ b/pkg/sql/logictest/testdata/logic_test/apply_join @@ -36,6 +36,32 @@ EXECUTE a 4 four 4 four 5 five 5 five +# A test case when the right side produces no columns. +statement ok +PREPARE right_no_cols AS OPT PLAN ' +(Root + (InnerJoinApply + (Scan [(Table "t") (Cols "k,str") ]) + (Select + (Scan [(Table "u") (Cols "") ]) + [ (Eq (Var "k") (Const 2 "int") )] + ) + [] + [] + ) + (Presentation "k,str") + (NoOrdering) +)' + +query IT +EXECUTE right_no_cols +---- +2 two +2 two +2 two +2 two +2 two + # LeftJoinApply tests. statement ok diff --git a/pkg/sql/plan_columns.go b/pkg/sql/plan_columns.go index 99187e742dd6..71e1ac3cbbab 100644 --- a/pkg/sql/plan_columns.go +++ b/pkg/sql/plan_columns.go @@ -10,7 +10,10 @@ package sql -import "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" +import ( + "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" + "github.com/cockroachdb/cockroach/pkg/sql/types" +) var noColumns = make(colinfo.ResultColumns, 0) @@ -160,6 +163,17 @@ func getPlanColumns(plan planNode, mut bool) colinfo.ResultColumns { return noColumns } +// planTypes returns the types schema of the rows produced by this planNode. See +// comments on planColumns for more details. 
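+//
+// For example, for a plan producing columns (k INT, s STRING) it returns
+// [types.Int, types.String].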
+func planTypes(plan planNode) []*types.T { + columns := planColumns(plan) + typs := make([]*types.T, len(columns)) + for i := range typs { + typs[i] = columns[i].Typ + } + return typs +} + // optColumnsSlot is a helper struct for nodes with a static signature. // It allows instances to reuse a common (shared) ResultColumns slice as long as // no read/write access is requested to the slice via planMutableColumns. diff --git a/pkg/sql/recursive_cte.go b/pkg/sql/recursive_cte.go index 6cadb4283baa..83935fd0d850 100644 --- a/pkg/sql/recursive_cte.go +++ b/pkg/sql/recursive_cte.go @@ -13,10 +13,9 @@ package sql import ( "context" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/opt/exec" - "github.com/cockroachdb/cockroach/pkg/sql/rowcontainer" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/types" ) // recursiveCTENode implements the logic for a recursive CTE: @@ -40,23 +39,21 @@ type recursiveCTENode struct { } type recursiveCTERun struct { + // typs is the schema of the rows produced by this CTE. + typs []*types.T // workingRows contains the rows produced by the current iteration (aka the // "working" table). - workingRows *rowcontainer.RowContainer - // nextRowIdx is the index inside workingRows of the next row to be returned - // by the operator. - nextRowIdx int + workingRows rowContainerHelper + iterator *rowContainerIterator + currentRow tree.Datums initialDone bool done bool } func (n *recursiveCTENode) startExec(params runParams) error { - n.workingRows = rowcontainer.NewRowContainer( - params.EvalContext().Mon.MakeBoundAccount(), - colinfo.ColTypeInfoFromResCols(getPlanColumns(n.initial, false /* mut */)), - ) - n.nextRowIdx = 0 + n.typs = planTypes(n.initial) + n.workingRows.init(n.typs, params.extendedEvalCtx, "cte" /* opName */) return nil } @@ -65,19 +62,23 @@ func (n *recursiveCTENode) Next(params runParams) (bool, error) { return false, err } - n.nextRowIdx++ - if !n.initialDone { - ok, err := n.initial.Next(params) - if err != nil { - return false, err - } - if ok { - if _, err = n.workingRows.AddRow(params.ctx, n.initial.Values()); err != nil { + // Fully consume the initial rows (we could have read the initial rows + // one at a time and return it in the same fashion, but that would + // require special-case behavior). + for { + ok, err := n.initial.Next(params) + if err != nil { + return false, err + } + if !ok { + break + } + if err = n.workingRows.addRow(params.ctx, n.initial.Values()); err != nil { return false, err } - return true, nil } + n.iterator = newRowContainerIterator(params.ctx, n.workingRows, n.typs) n.initialDone = true } @@ -85,52 +86,67 @@ func (n *recursiveCTENode) Next(params runParams) (bool, error) { return false, nil } - if n.workingRows.Len() == 0 { + if n.workingRows.len() == 0 { // Last iteration returned no rows. n.done = true return false, nil } - // There are more rows to return from the last iteration. - if n.nextRowIdx <= n.workingRows.Len() { + var err error + n.currentRow, err = n.iterator.next() + if err != nil { + return false, err + } + if n.currentRow != nil { + // There are more rows to return from the last iteration. return true, nil } // Let's run another iteration. 
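+ // The rows produced by the previous iteration become the contents of a
+ // bufferNode, genIterationFn re-plans the recursive branch on top of
+ // that buffer, and the branch's output is collected into a fresh
+ // workingRows container that drives the next iteration.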
+ n.iterator.close() + n.iterator = nil lastWorkingRows := n.workingRows - defer lastWorkingRows.Close(params.ctx) + defer lastWorkingRows.close(params.ctx) - n.workingRows = rowcontainer.NewRowContainer( - params.EvalContext().Mon.MakeBoundAccount(), - colinfo.ColTypeInfoFromResCols(getPlanColumns(n.initial, false /* mut */)), - ) + n.workingRows = rowContainerHelper{} + n.workingRows.init(n.typs, params.extendedEvalCtx, "cte" /* opName */) // Set up a bufferNode that can be used as a reference for a scanBufferNode. buf := &bufferNode{ // The plan here is only useful for planColumns, so it's ok to always use // the initial plan. - plan: n.initial, - bufferedRows: lastWorkingRows, - label: n.label, + plan: n.initial, + typs: n.typs, + rows: lastWorkingRows, + label: n.label, } newPlan, err := n.genIterationFn(newExecFactory(params.p), buf) if err != nil { return false, err } - if err := runPlanInsidePlan(params, newPlan.(*planComponents), n.workingRows); err != nil { + if err := runPlanInsidePlan(params, newPlan.(*planComponents), &n.workingRows); err != nil { + return false, err + } + + n.iterator = newRowContainerIterator(params.ctx, n.workingRows, n.typs) + n.currentRow, err = n.iterator.next() + if err != nil { return false, err } - n.nextRowIdx = 1 - return n.workingRows.Len() > 0, nil + return n.currentRow != nil, nil } func (n *recursiveCTENode) Values() tree.Datums { - return n.workingRows.At(n.nextRowIdx - 1) + return n.currentRow } func (n *recursiveCTENode) Close(ctx context.Context) { n.initial.Close(ctx) - n.workingRows.Close(ctx) + n.workingRows.close(ctx) + if n.iterator != nil { + n.iterator.close() + n.iterator = nil + } } diff --git a/pkg/sql/rowcontainer/datum_row_container.go b/pkg/sql/rowcontainer/datum_row_container.go index 7cb7ba458e74..c6d7e522ae4b 100644 --- a/pkg/sql/rowcontainer/datum_row_container.go +++ b/pkg/sql/rowcontainer/datum_row_container.go @@ -145,10 +145,14 @@ func (c *RowContainer) Init(acc mon.BoundAccount, ti colinfo.ColTypeInfo, rowCap } } - // Precalculate the memory used for a chunk, specifically by the Datums in the - // chunk and the slice pointing at the chunk. - c.chunkMemSize = tree.SizeOfDatum * int64(c.rowsPerChunk*c.numCols) - c.chunkMemSize += tree.SizeOfDatums + if nCols > 0 { + // Precalculate the memory used for a chunk, specifically by the Datums + // in the chunk and the slice pointing at the chunk. + // Note that when there are no columns, we simply track the number of + // rows added in c.numRows and don't allocate any memory. + c.chunkMemSize = tree.SizeOfDatum * int64(c.rowsPerChunk*c.numCols) + c.chunkMemSize += tree.SizeOfDatums + } } // Clear resets the container and releases the associated memory. 
This allows
diff --git a/pkg/sql/scrub.go b/pkg/sql/scrub.go
index 169592576e16..b90a7f95b7bf 100644
--- a/pkg/sql/scrub.go
+++ b/pkg/sql/scrub.go
@@ -17,12 +17,10 @@ import (
 "github.com/cockroachdb/cockroach/pkg/sql/catalog"
 "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv"
- "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
 "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
 "github.com/cockroachdb/cockroach/pkg/sql/catalog/resolver"
 "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
 "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
- "github.com/cockroachdb/cockroach/pkg/sql/rowcontainer"
 "github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
 "github.com/cockroachdb/cockroach/pkg/sql/types"
 "github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -58,7 +56,7 @@ type checkOperation interface {
 Start(params runParams) error

 // Next will return the next check result. The datums returned have
- // the column types specified by scrubTypes, which are the valeus
+ // the column types specified by scrubTypes, which are the values
 // returned to the user.
 //
 // Next is not called if Done() is false.
@@ -475,15 +473,14 @@ func createConstraintCheckOperations(
 return results, nil
 }

-// scrubRunDistSQL run a distSQLPhysicalPlan plan in distSQL. If
-// RowContainer is returned, the caller must close it.
+// scrubRunDistSQL runs a distSQLPhysicalPlan plan in distSQL. If a non-nil
+// rowContainerHelper is returned, the caller must close it.
 func scrubRunDistSQL(
 ctx context.Context, planCtx *PlanningCtx, p *planner, plan *PhysicalPlan, columnTypes []*types.T,
-) (*rowcontainer.RowContainer, error) {
- ci := colinfo.ColTypeInfoFromColTypes(columnTypes)
- acc := p.extendedEvalCtx.Mon.MakeBoundAccount()
- rows := rowcontainer.NewRowContainer(acc, ci)
- rowResultWriter := NewRowResultWriter(rows)
+) (*rowContainerHelper, error) {
+ var rowContainer rowContainerHelper
+ rowContainer.init(columnTypes, &p.extendedEvalCtx, "scrub" /* opName */)
+ rowResultWriter := NewRowResultWriter(&rowContainer)
 recv := MakeDistSQLReceiver(
 ctx,
 rowResultWriter,
@@ -503,11 +500,12 @@ func scrubRunDistSQL(
 planCtx, p.txn, plan, recv, &evalCtxCopy, nil, /* finishedSetupFn */
 )()
 if rowResultWriter.Err() != nil {
- return rows, rowResultWriter.Err()
- } else if rows.Len() == 0 {
- rows.Close(ctx)
+ rowContainer.close(ctx)
+ return nil, rowResultWriter.Err()
+ } else if rowContainer.len() == 0 {
+ rowContainer.close(ctx)
 return nil, nil
 }

- return rows, nil
+ return &rowContainer, nil
 }
diff --git a/pkg/sql/scrub_physical.go b/pkg/sql/scrub_physical.go
index 517ca3795a65..3e76a1cc5dd9 100644
--- a/pkg/sql/scrub_physical.go
+++ b/pkg/sql/scrub_physical.go
@@ -16,7 +16,6 @@ import (
 "github.com/cockroachdb/cockroach/pkg/sql/catalog"
 "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
- "github.com/cockroachdb/cockroach/pkg/sql/rowcontainer"
 "github.com/cockroachdb/cockroach/pkg/sql/rowexec"
 "github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
 "github.com/cockroachdb/cockroach/pkg/sql/span"
@@ -44,9 +43,12 @@ type physicalCheckOperation struct {
 // physicalCheckRun contains the run-time state for
 // physicalCheckOperation during local execution.
 type physicalCheckRun struct {
- started bool
- rows *rowcontainer.RowContainer
- rowIndex int
+ started bool
+
+ rows *rowContainerHelper
+ iterator *rowContainerIterator
+ // If currentRow is nil, it means that all rows have been exhausted.
+ currentRow tree.Datums } func newPhysicalCheckOperation( @@ -132,41 +134,46 @@ func (o *physicalCheckOperation) Start(params runParams) error { o.columns = columns o.run.started = true rows, err := scrubRunDistSQL(ctx, planCtx, params.p, physPlan, rowexec.ScrubTypes) - if err != nil { - rows.Close(ctx) + if rows == nil || err != nil { + // If either there were no rows that failed the check operation or an + // error was encountered, we short-circuit and don't set currentRow. + // This will indicate that we're done. return err } o.run.rows = rows - return nil + o.run.iterator = newRowContainerIterator(ctx, *rows, rowexec.ScrubTypes) + o.run.currentRow, err = o.run.iterator.next() + return err } // Next implements the checkOperation interface. func (o *physicalCheckOperation) Next(params runParams) (tree.Datums, error) { - row := o.run.rows.At(o.run.rowIndex) - o.run.rowIndex++ - timestamp, err := tree.MakeDTimestamp( params.extendedEvalCtx.GetStmtTimestamp(), time.Nanosecond) if err != nil { return nil, err } - details, ok := row[2].(*tree.DJSON) + details, ok := o.run.currentRow[2].(*tree.DJSON) if !ok { - return nil, errors.Errorf("expected row value 3 to be DJSON, got: %T", row[2]) + return nil, errors.Errorf("expected row value 3 to be DJSON, got: %T", o.run.currentRow[2]) } - return tree.Datums{ + res := tree.Datums{ // TODO(joey): Add the job UUID once the SCRUB command uses jobs. - tree.DNull, /* job_uuid */ - row[0], /* errorType */ + tree.DNull, /* job_uuid */ + o.run.currentRow[0], /* errorType */ tree.NewDString(o.tableName.Catalog()), tree.NewDString(o.tableName.Table()), - row[1], /* primaryKey */ + o.run.currentRow[1], /* primaryKey */ timestamp, tree.DBoolFalse, details, - }, nil + } + + // Advance to the next row. + o.run.currentRow, err = o.run.iterator.next() + return res, err } // Started implements the checkOperation interface. @@ -175,13 +182,18 @@ func (o *physicalCheckOperation) Started() bool { } // Done implements the checkOperation interface. -func (o *physicalCheckOperation) Done(ctx context.Context) bool { - return o.run.rows == nil || o.run.rowIndex >= o.run.rows.Len() +func (o *physicalCheckOperation) Done(context.Context) bool { + return o.run.currentRow == nil } // Close implements the checkOperation interface. func (o *physicalCheckOperation) Close(ctx context.Context) { if o.run.rows != nil { - o.run.rows.Close(ctx) + o.run.rows.close(ctx) + o.run.rows = nil + } + if o.run.iterator != nil { + o.run.iterator.close() + o.run.iterator = nil } } From 333ae238c52d43694030e80a154d1bfac133f5cd Mon Sep 17 00:00:00 2001 From: Andrei Matei Date: Fri, 2 Apr 2021 15:21:34 -0400 Subject: [PATCH 2/8] kvserverpb: update comment on RER.Timestamp It was lying about helping with lease protection. 
Release note: None --- pkg/kv/kvserver/kvserverpb/proposer_kv.pb.go | 6 +++--- pkg/kv/kvserver/kvserverpb/proposer_kv.proto | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/kv/kvserver/kvserverpb/proposer_kv.pb.go b/pkg/kv/kvserver/kvserverpb/proposer_kv.pb.go index 2233575e1af2..3b4049be469e 100644 --- a/pkg/kv/kvserver/kvserverpb/proposer_kv.pb.go +++ b/pkg/kv/kvserver/kvserverpb/proposer_kv.pb.go @@ -285,9 +285,9 @@ type ReplicatedEvalResult struct { Merge *Merge `protobuf:"bytes,4,opt,name=merge,proto3" json:"merge,omitempty"` ComputeChecksum *ComputeChecksum `protobuf:"bytes,21,opt,name=compute_checksum,json=computeChecksum,proto3" json:"compute_checksum,omitempty"` IsLeaseRequest bool `protobuf:"varint,6,opt,name=is_lease_request,json=isLeaseRequest,proto3" json:"is_lease_request,omitempty"` - // Duplicates BatchRequest.Timestamp for proposer-evaluated KV. Used - // to verify the validity of the command (for lease coverage and GC - // threshold). + // The BatchRequest.Timestamp of the request that produced this command. Used + // to verify the validity of the command against the GC threshold and to + // update the followers' clocks. Timestamp hlc.Timestamp `protobuf:"bytes,8,opt,name=timestamp,proto3" json:"timestamp"` // The stats delta corresponding to the data in this WriteBatch. On // a split, contains only the contributions to the left-hand side. diff --git a/pkg/kv/kvserver/kvserverpb/proposer_kv.proto b/pkg/kv/kvserver/kvserverpb/proposer_kv.proto index 108991549ddf..dbc8a20fd232 100644 --- a/pkg/kv/kvserver/kvserverpb/proposer_kv.proto +++ b/pkg/kv/kvserver/kvserverpb/proposer_kv.proto @@ -125,9 +125,9 @@ message ReplicatedEvalResult { Merge merge = 4; ComputeChecksum compute_checksum = 21; bool is_lease_request = 6; - // Duplicates BatchRequest.Timestamp for proposer-evaluated KV. Used - // to verify the validity of the command (for lease coverage and GC - // threshold). + // The BatchRequest.Timestamp of the request that produced this command. Used + // to verify the validity of the command against the GC threshold and to + // update the followers' clocks. util.hlc.Timestamp timestamp = 8 [(gogoproto.nullable) = false]; // The stats delta corresponding to the data in this WriteBatch. On // a split, contains only the contributions to the left-hand side. From 20d6d05a1ce10b61f33ed1bc9f80635d7abbb4fc Mon Sep 17 00:00:00 2001 From: Andrei Matei Date: Fri, 2 Apr 2021 15:52:54 -0400 Subject: [PATCH 3/8] kvserver: switch meaning of RER.Timestamp The ReplicatedEvalResult.Timestamp used to carry the read timestamp of the request that proposed the respective command. This patch renames it to WriteTimestamp and sets it accordingly. The field is used for two purposes: for bumping the clock on followers such that they don't have values above their clock, and for checking that we're not writing below the GC threshold. Both of these use cases actually want the write timestamps, so they were broken. The second use seems dubious to me (I don't think it's actually needed) but this patch does not take a position on it beyond adding a TODO. Beyond fixing the existing uses of this field, putting the write timestamp in every Raft command has the added benefit that we can use it to assert below Raft that nobody tries to write below the closed timestamp. 
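To make that concrete, such a below-Raft assertion could look roughly like
this (a sketch with hypothetical names; not code added by this patch):

    // During command application, every replica now sees the command's
    // write timestamp and can deterministically reject bad commands.
    if cmd.ReplicatedEvalResult.WriteTimestamp.LessEq(closedTimestamp) {
        return errors.AssertionFailedf(
            "command writing at %s below closed timestamp %s",
            cmd.ReplicatedEvalResult.WriteTimestamp, closedTimestamp)
    }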
Before this patch, only the leaseholder was able to do this assertion (because it was the only replica that had access to the write timestamp) and so the leaseholder had to do it as a non-deterministic error (and non-deterministic errors are nasty, even when they're assertion failures). This patch will help turn the assertion into a deterministic one in the future. In addition, by clarifying the meaning of this field, this patch opens the door for #62569 to muddy the field back and give it a special meaning for lease transfers - the lease start time. Also mentioning #55293 because this patch lightly touches on the interaction between requests and the GC threshold. Release note: None --- pkg/kv/kvserver/client_raft_test.go | 2 +- pkg/kv/kvserver/client_replica_test.go | 2 +- pkg/kv/kvserver/kvserverpb/proposer_kv.pb.go | 208 +++++++++--------- pkg/kv/kvserver/kvserverpb/proposer_kv.proto | 8 +- pkg/kv/kvserver/replica.go | 4 +- pkg/kv/kvserver/replica_application_result.go | 4 +- .../replica_application_state_machine.go | 15 +- .../replica_application_state_machine_test.go | 2 +- pkg/kv/kvserver/replica_proposal.go | 6 +- 9 files changed, 129 insertions(+), 122 deletions(-) diff --git a/pkg/kv/kvserver/client_raft_test.go b/pkg/kv/kvserver/client_raft_test.go index 338048c3c98d..80e72eca642b 100644 --- a/pkg/kv/kvserver/client_raft_test.go +++ b/pkg/kv/kvserver/client_raft_test.go @@ -4702,7 +4702,7 @@ func TestAckWriteBeforeApplication(t *testing.T) { blockPreApplication, blockPostApplication := make(chan struct{}), make(chan struct{}) applyFilterFn := func(ch chan struct{}) kvserverbase.ReplicaApplyFilter { return func(filterArgs kvserverbase.ApplyFilterArgs) (int, *roachpb.Error) { - if atomic.LoadInt32(&filterActive) == 1 && filterArgs.Timestamp == magicTS { + if atomic.LoadInt32(&filterActive) == 1 && filterArgs.WriteTimestamp == magicTS { <-ch } return 0, nil diff --git a/pkg/kv/kvserver/client_replica_test.go b/pkg/kv/kvserver/client_replica_test.go index 9e0e2dffb00f..efc66a769264 100644 --- a/pkg/kv/kvserver/client_replica_test.go +++ b/pkg/kv/kvserver/client_replica_test.go @@ -3342,7 +3342,7 @@ func TestProposalOverhead(t *testing.T) { } // Sometime the logical portion of the timestamp can be non-zero which makes // the overhead non-deterministic. - args.Cmd.ReplicatedEvalResult.Timestamp.Logical = 0 + args.Cmd.ReplicatedEvalResult.WriteTimestamp.Logical = 0 atomic.StoreUint32(&overhead, uint32(args.Cmd.Size()-args.Cmd.WriteBatch.Size())) // We don't want to print the WriteBatch because it's explicitly // excluded from the size computation. Nil'ing it out does not diff --git a/pkg/kv/kvserver/kvserverpb/proposer_kv.pb.go b/pkg/kv/kvserver/kvserverpb/proposer_kv.pb.go index 3b4049be469e..23909fca7afe 100644 --- a/pkg/kv/kvserver/kvserverpb/proposer_kv.pb.go +++ b/pkg/kv/kvserver/kvserverpb/proposer_kv.pb.go @@ -285,10 +285,10 @@ type ReplicatedEvalResult struct { Merge *Merge `protobuf:"bytes,4,opt,name=merge,proto3" json:"merge,omitempty"` ComputeChecksum *ComputeChecksum `protobuf:"bytes,21,opt,name=compute_checksum,json=computeChecksum,proto3" json:"compute_checksum,omitempty"` IsLeaseRequest bool `protobuf:"varint,6,opt,name=is_lease_request,json=isLeaseRequest,proto3" json:"is_lease_request,omitempty"` - // The BatchRequest.Timestamp of the request that produced this command. Used - // to verify the validity of the command against the GC threshold and to - // update the followers' clocks. 
- Timestamp hlc.Timestamp `protobuf:"bytes,8,opt,name=timestamp,proto3" json:"timestamp"` + // The timestamp at which this command is writing. Used to verify the validity + // of the command against the GC threshold and to update the followers' + // clocks. + WriteTimestamp hlc.Timestamp `protobuf:"bytes,8,opt,name=write_timestamp,json=writeTimestamp,proto3" json:"write_timestamp"` // The stats delta corresponding to the data in this WriteBatch. On // a split, contains only the contributions to the left-hand side. DeprecatedDelta *enginepb.MVCCStats `protobuf:"bytes,10,opt,name=deprecated_delta,json=deprecatedDelta,proto3" json:"deprecated_delta,omitempty"` @@ -704,102 +704,102 @@ func init() { } var fileDescriptor_19df0b186dd19269 = []byte{ - // 1510 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x57, 0xd1, 0x6f, 0x13, 0x47, - 0x1a, 0x8f, 0x63, 0x3b, 0x59, 0x8f, 0x13, 0x7b, 0x33, 0x04, 0xd8, 0xcb, 0xdd, 0xd9, 0x91, 0x8f, - 0x43, 0x39, 0x8e, 0x5b, 0xa3, 0xe4, 0x4e, 0x42, 0x1c, 0xaa, 0x88, 0x1d, 0x28, 0x31, 0x49, 0x0a, - 0xe3, 0x40, 0x2b, 0x5a, 0x69, 0x35, 0xde, 0x1d, 0xd6, 0x5b, 0xaf, 0x77, 0x97, 0x99, 0xb1, 0x21, - 0x7f, 0x45, 0x5b, 0xa9, 0x52, 0xfb, 0x54, 0x78, 0xec, 0x9f, 0xc2, 0x23, 0x8f, 0xa8, 0x0f, 0x56, - 0x09, 0x2f, 0xfd, 0x1b, 0x78, 0xaa, 0x66, 0x76, 0xd6, 0x5e, 0xa3, 0xd0, 0x18, 0xfa, 0x36, 0xfb, - 0xcd, 0x7c, 0xbf, 0x6f, 0xe6, 0xfb, 0x7e, 0xdf, 0x6f, 0x66, 0xc1, 0x46, 0x6f, 0x58, 0xef, 0x0d, - 0x19, 0xa1, 0x43, 0x42, 0xc7, 0x83, 0xa8, 0x53, 0x8f, 0x68, 0x18, 0x85, 0x8c, 0x50, 0xab, 0x37, - 0x34, 0x23, 0x1a, 0xf2, 0x10, 0x56, 0xed, 0xd0, 0xee, 0xd1, 0x10, 0xdb, 0x5d, 0xb3, 0x37, 0x34, - 0x93, 0xa5, 0x26, 0xe3, 0x21, 0xc5, 0x2e, 0x89, 0x3a, 0x6b, 0x2b, 0x72, 0x32, 0xea, 0xd4, 0x71, - 0xe4, 0xc5, 0x3e, 0x6b, 0x30, 0x31, 0x39, 0x98, 0x63, 0x65, 0x3b, 0x97, 0xd8, 0xfa, 0x84, 0xe3, - 0x94, 0xfd, 0xaf, 0x0a, 0xa9, 0x4e, 0x02, 0xd7, 0x0b, 0x88, 0x58, 0x30, 0xb4, 0x6d, 0x35, 0xf9, - 0xb7, 0x13, 0x27, 0xb7, 0xd4, 0x6c, 0xed, 0x3d, 0x87, 0x60, 0x1c, 0x73, 0xa2, 0xd6, 0x5c, 0x4a, - 0xaf, 0xa1, 0x04, 0x3b, 0x6c, 0xd0, 0xef, 0x63, 0x7a, 0x54, 0xa7, 0x4c, 0xac, 0x8c, 0x3f, 0xd4, - 0x5a, 0x63, 0xc0, 0x3d, 0xbf, 0xde, 0xf5, 0xed, 0x3a, 0xf7, 0xfa, 0x84, 0x71, 0xdc, 0x8f, 0xd4, - 0xcc, 0xaa, 0x1b, 0xba, 0xa1, 0x1c, 0xd6, 0xc5, 0x28, 0xb6, 0xd6, 0x7e, 0xce, 0x80, 0x7c, 0x3b, - 0xf2, 0x3d, 0x0e, 0x9b, 0x60, 0x91, 0x53, 0xcf, 0x75, 0x09, 0x35, 0x32, 0xeb, 0x99, 0x8d, 0xe2, - 0x66, 0xd5, 0x9c, 0xa4, 0x4d, 0x1d, 0xdc, 0x94, 0x4b, 0x0f, 0xe3, 0x65, 0x0d, 0xed, 0xc5, 0xa8, - 0x3a, 0xf7, 0x72, 0x54, 0xcd, 0xa0, 0xc4, 0x13, 0x1e, 0x82, 0x02, 0xed, 0x32, 0xcb, 0x21, 0x3e, - 0xc7, 0xc6, 0xbc, 0x84, 0xf9, 0x67, 0x0a, 0x46, 0xa5, 0xc2, 0x4c, 0x52, 0x61, 0xee, 0x3f, 0x68, - 0x36, 0xdb, 0x1c, 0x73, 0xd6, 0xd0, 0x05, 0xd8, 0xf1, 0xa8, 0xaa, 0xa1, 0xdb, 0xed, 0x1d, 0xe1, - 0x8e, 0x34, 0xda, 0x65, 0x72, 0x74, 0x2d, 0xf7, 0xdb, 0xf3, 0x6a, 0xa6, 0x86, 0x40, 0x7e, 0x9f, - 0x50, 0x97, 0xcc, 0xb6, 0x53, 0xb9, 0xf4, 0xfd, 0x3b, 0x55, 0x98, 0x0e, 0x28, 0x35, 0xbb, 0x38, - 0x70, 0x09, 0x22, 0x91, 0xef, 0xd9, 0x98, 0xc1, 0xbd, 0x77, 0xc1, 0x37, 0x4e, 0x00, 0x9f, 0xf6, - 0xf9, 0xa3, 0x28, 0x3f, 0x3e, 0xaf, 0xce, 0xd5, 0x5e, 0xcf, 0x83, 0x72, 0x33, 0xec, 0x47, 0x03, - 0x4e, 0x9a, 0x5d, 0x62, 0xf7, 0xd8, 0xa0, 0x0f, 0xbf, 0x06, 0x45, 0x5b, 0x8d, 0x2d, 0xcf, 0x91, - 0xb1, 0x96, 0x1a, 0xbb, 0x02, 0xe1, 0x97, 0x51, 0x75, 0xcb, 0xf5, 0x78, 0x77, 0xd0, 0x31, 0xed, - 0xb0, 0x5f, 0x1f, 0x47, 0x77, 0x3a, 0x93, 0x71, 0x3d, 0xea, 0xb9, 0x75, 0x59, 0xea, 0xc1, 0xc0, - 0x73, 
0xcc, 0xfb, 0xf7, 0x77, 0x77, 0x8e, 0x47, 0x55, 0x90, 0xa0, 0xef, 0xee, 0x20, 0x90, 0xa0, - 0xef, 0x3a, 0xd0, 0x00, 0x8b, 0x43, 0x42, 0x99, 0x17, 0x06, 0x46, 0x7e, 0x3d, 0xb3, 0xb1, 0x8c, - 0x92, 0x4f, 0xf8, 0x0f, 0xb0, 0xcc, 0xf0, 0x90, 0x58, 0x2c, 0xc0, 0x11, 0xeb, 0x86, 0x5c, 0xd6, - 0x4c, 0x43, 0x4b, 0xc2, 0xd8, 0x56, 0x36, 0xb8, 0x05, 0x72, 0xfd, 0xd0, 0x21, 0x46, 0x76, 0x3d, - 0xb3, 0x51, 0x3a, 0x31, 0xd9, 0x49, 0xdc, 0xfd, 0xd0, 0x21, 0x48, 0x2e, 0x86, 0x15, 0x10, 0xef, - 0x20, 0x0a, 0xbd, 0x80, 0x1b, 0x39, 0x09, 0x9b, 0xb2, 0xc0, 0xdb, 0xa0, 0xc0, 0x09, 0xed, 0x7b, - 0x01, 0xe6, 0xc4, 0x58, 0x58, 0xcf, 0x6e, 0x14, 0x37, 0x2f, 0x9c, 0x80, 0xac, 0x72, 0xbc, 0x43, - 0x98, 0x4d, 0xbd, 0x88, 0x87, 0xb4, 0x91, 0x13, 0x39, 0x42, 0x13, 0x67, 0x55, 0xc9, 0x07, 0x00, - 0x88, 0x14, 0x63, 0x9b, 0x8b, 0x73, 0xad, 0x82, 0x7c, 0xe7, 0x88, 0x13, 0x26, 0xf3, 0x9a, 0x45, - 0xf1, 0x07, 0xbc, 0x0c, 0x20, 0x1b, 0xb8, 0x2e, 0x61, 0x9c, 0x38, 0x16, 0xe6, 0x56, 0x80, 0x83, - 0x90, 0xc9, 0x23, 0x67, 0x91, 0x3e, 0x9e, 0xd9, 0xe6, 0x07, 0xc2, 0xae, 0x70, 0xbf, 0x9f, 0x07, - 0x67, 0xda, 0xc9, 0x54, 0x2a, 0xc2, 0x3d, 0x50, 0x60, 0x1c, 0x53, 0x6e, 0xf5, 0xc8, 0x91, 0xaa, - 0xde, 0x7f, 0xdf, 0x8e, 0xaa, 0x57, 0x66, 0xaa, 0x5c, 0x72, 0xba, 0x3b, 0xe4, 0x08, 0x69, 0x12, - 0xe6, 0x0e, 0x39, 0x82, 0xfb, 0x60, 0x91, 0x04, 0x8e, 0x04, 0x9c, 0xff, 0x13, 0x80, 0x0b, 0x24, - 0x70, 0x04, 0xdc, 0x7d, 0x00, 0xec, 0xf1, 0x7e, 0x65, 0xf1, 0x8a, 0x9b, 0xff, 0x36, 0x4f, 0x91, - 0x42, 0x73, 0x72, 0xc4, 0x14, 0x9f, 0x53, 0x40, 0x2a, 0x2d, 0xcf, 0x0a, 0x60, 0x55, 0xd5, 0x86, - 0x13, 0xe7, 0xe6, 0x10, 0xfb, 0x88, 0xb0, 0x81, 0x2f, 0x64, 0x24, 0x2f, 0xb5, 0x4b, 0x75, 0xff, - 0x7f, 0x4e, 0x0d, 0xa8, 0x50, 0x84, 0x0a, 0x10, 0x14, 0xfb, 0xc2, 0xeb, 0x20, 0xcf, 0x84, 0xd2, - 0xa8, 0x5d, 0x5f, 0x3c, 0x15, 0x44, 0xea, 0x12, 0x8a, 0x9d, 0x84, 0x77, 0x5f, 0x74, 0xbf, 0x64, - 0xdd, 0x2c, 0xde, 0x52, 0x2b, 0x50, 0xec, 0x04, 0xbf, 0x04, 0xba, 0x1d, 0xf7, 0xaa, 0x95, 0xb4, - 0x90, 0x71, 0x56, 0x02, 0x5d, 0x99, 0x29, 0x79, 0xa9, 0x26, 0x47, 0x65, 0xfb, 0x9d, 0xae, 0xdf, - 0x00, 0xba, 0xc7, 0x2c, 0x9f, 0x60, 0x46, 0x2c, 0x4a, 0x1e, 0x0f, 0x08, 0xe3, 0xc6, 0x82, 0xec, - 0x8d, 0x92, 0xc7, 0xf6, 0x84, 0x19, 0xc5, 0x56, 0xb8, 0x0d, 0x0a, 0x63, 0x05, 0x37, 0x34, 0x19, - 0xff, 0xef, 0xa9, 0xf8, 0xa2, 0xf7, 0xcd, 0xae, 0x6f, 0x9b, 0x87, 0xc9, 0xa2, 0x71, 0x63, 0x24, - 0x06, 0x78, 0x17, 0xe8, 0x0e, 0x89, 0x28, 0x91, 0x25, 0x52, 0x9a, 0x0c, 0x3e, 0x40, 0x93, 0x51, - 0x79, 0xe2, 0x2e, 0x85, 0x18, 0xde, 0x02, 0xf9, 0x18, 0x06, 0x4a, 0x98, 0x4b, 0x33, 0xc1, 0x48, - 0x57, 0xb5, 0xbb, 0xd8, 0x1d, 0x7e, 0x01, 0xca, 0xb6, 0x94, 0x50, 0x8b, 0x2a, 0x0d, 0x35, 0x96, - 0x24, 0x62, 0xfd, 0xf4, 0x14, 0x4f, 0x49, 0x2f, 0x2a, 0xd9, 0xd3, 0xf2, 0x7d, 0x01, 0x94, 0x28, - 0x7e, 0xc4, 0x2d, 0x3f, 0x74, 0xd5, 0x89, 0x97, 0x65, 0x7b, 0x2f, 0x09, 0xeb, 0x5e, 0xe8, 0xc6, - 0xe7, 0x78, 0x0c, 0x8a, 0xd8, 0x71, 0x2c, 0xc6, 0x38, 0xee, 0xf8, 0xc4, 0x58, 0x91, 0xb1, 0x6f, - 0xcc, 0x4a, 0xd5, 0x29, 0xc2, 0x9b, 0xdb, 0x8e, 0xd3, 0x6e, 0x1f, 0x0a, 0x9c, 0x46, 0x49, 0x68, - 0xf0, 0xe4, 0x1b, 0x01, 0xec, 0x38, 0xed, 0x38, 0x06, 0xfc, 0x26, 0x03, 0xce, 0x44, 0x94, 0x0c, - 0x55, 0xf1, 0xe3, 0x47, 0x0a, 0xf6, 0x8d, 0xd5, 0x59, 0x4a, 0x7b, 0xe3, 0xed, 0xa8, 0x7a, 0x7d, - 0xf6, 0x3b, 0x41, 0x38, 0x37, 0xfd, 0xd0, 0xee, 0x8d, 0x11, 0xd0, 0x8a, 0x88, 0x2d, 0x09, 0x76, - 0x57, 0x45, 0x86, 0x5f, 0x01, 0x18, 0x51, 0x2f, 0xa4, 0x96, 0x78, 0x52, 0x58, 0xea, 0x19, 0x61, - 0x9c, 0x93, 0xfb, 0x31, 0xdf, 0x93, 0x8b, 0xd4, 0xeb, 0xc3, 0x44, 0x04, 0x3b, 0xed, 0x78, 0x8c, - 0x74, 0x89, 0x94, 0xb2, 0xac, 
0x7d, 0x0a, 0x52, 0x99, 0x80, 0x10, 0xe4, 0xc4, 0x7b, 0x29, 0x16, - 0x4a, 0x24, 0xc7, 0xb0, 0x0a, 0xf2, 0x36, 0xb5, 0xb7, 0x36, 0xa5, 0x52, 0x2c, 0x37, 0x0a, 0xc7, - 0xa3, 0x6a, 0xbe, 0x89, 0x9a, 0x5b, 0x9b, 0x28, 0xb6, 0xc7, 0x4a, 0xd3, 0xca, 0x69, 0x19, 0x7d, - 0xbe, 0x95, 0xd3, 0xf2, 0xfa, 0x42, 0x2b, 0xa7, 0x2d, 0xea, 0x5a, 0x2b, 0xa7, 0x15, 0x74, 0xd0, - 0xca, 0x69, 0x25, 0xbd, 0xdc, 0xca, 0x69, 0x65, 0x5d, 0x6f, 0xe5, 0x34, 0x5d, 0x5f, 0x69, 0xe5, - 0xb4, 0x33, 0xfa, 0x6a, 0x6b, 0x41, 0xfb, 0xee, 0x40, 0xff, 0xe9, 0xa0, 0xb6, 0x0e, 0xc0, 0xe7, - 0xd4, 0xe3, 0xa4, 0x81, 0xb9, 0xdd, 0x3d, 0x69, 0x03, 0xb5, 0x7b, 0x60, 0x69, 0x2f, 0x74, 0x3d, - 0x1b, 0xfb, 0x9f, 0x45, 0x7b, 0xa1, 0x0b, 0xb7, 0x41, 0x36, 0x8c, 0xc4, 0x95, 0x21, 0x2e, 0xa3, - 0x7f, 0x9d, 0xc6, 0xed, 0xb1, 0xab, 0xa2, 0xb6, 0xf0, 0xad, 0xfd, 0xb0, 0x00, 0x8a, 0x08, 0x3f, - 0xe2, 0xcd, 0xb0, 0xdf, 0xc7, 0x81, 0x03, 0x87, 0xe0, 0xfc, 0xf8, 0x39, 0x1a, 0x17, 0x9e, 0x89, - 0xfe, 0x0e, 0x6c, 0x22, 0xdb, 0x3e, 0xdb, 0xf8, 0xe4, 0xed, 0xa8, 0x7a, 0xed, 0x83, 0x24, 0x5e, - 0x16, 0xb1, 0xad, 0x50, 0xd0, 0xd9, 0x04, 0x7e, 0xca, 0x0c, 0x0f, 0xc1, 0x5f, 0x52, 0xad, 0x3f, - 0xbd, 0x05, 0xf9, 0x06, 0x28, 0x6e, 0x1a, 0x27, 0xdc, 0xb6, 0xb1, 0x02, 0x9d, 0x9f, 0xb8, 0xde, - 0x4d, 0xa3, 0xc3, 0x8b, 0xa0, 0xdc, 0xc7, 0x4f, 0xd5, 0x41, 0xbc, 0xc0, 0x21, 0x4f, 0xa5, 0xc4, - 0xe6, 0xd0, 0x72, 0x1f, 0x3f, 0x95, 0x4b, 0x76, 0x85, 0x11, 0xde, 0x06, 0xba, 0xed, 0x87, 0x8c, - 0x38, 0xd6, 0x44, 0xc2, 0x56, 0x66, 0xe0, 0x39, 0x2a, 0xc7, 0x6e, 0x63, 0x03, 0x7c, 0x0c, 0xce, - 0xd1, 0x71, 0xd3, 0x59, 0x64, 0x88, 0x7d, 0x8b, 0xca, 0xb6, 0x93, 0x6d, 0x5d, 0xdc, 0xfc, 0xdf, - 0x47, 0xf5, 0xac, 0xaa, 0xd8, 0x2a, 0x3d, 0xe9, 0x02, 0xdb, 0x03, 0xc5, 0x27, 0x82, 0x37, 0x56, - 0x47, 0x10, 0xc7, 0x28, 0xcd, 0x78, 0x6f, 0x4e, 0xb8, 0x86, 0xc0, 0x93, 0x09, 0xef, 0xda, 0xa0, - 0xe4, 0xc7, 0x44, 0xb1, 0xc2, 0x48, 0xa8, 0x92, 0x51, 0x9e, 0xf1, 0x5e, 0x4c, 0x53, 0x13, 0x2d, - 0xf9, 0x69, 0xa2, 0x3e, 0x04, 0x80, 0x53, 0x6c, 0x13, 0x4b, 0x52, 0x5a, 0x97, 0x7c, 0xfd, 0xff, - 0xe9, 0x99, 0x98, 0xf0, 0xd2, 0x3c, 0x14, 0xee, 0x3b, 0x98, 0xe3, 0x9b, 0x01, 0xa7, 0x47, 0xa8, - 0xc0, 0x93, 0xef, 0xb5, 0xeb, 0xa0, 0x34, 0x3d, 0x09, 0x75, 0x90, 0x4d, 0xde, 0x38, 0x05, 0x24, - 0x86, 0xe2, 0x75, 0x35, 0xc4, 0xfe, 0x20, 0xbe, 0xe3, 0x0b, 0x28, 0xfe, 0xb8, 0x36, 0x7f, 0x55, - 0x34, 0x6b, 0x56, 0xcf, 0x8d, 0x5b, 0x76, 0x5e, 0xcf, 0xc6, 0xed, 0xf8, 0xec, 0xa0, 0x76, 0x15, - 0x94, 0xf6, 0x15, 0x49, 0x6e, 0x85, 0x21, 0x27, 0x74, 0x56, 0x36, 0xd5, 0x5c, 0x70, 0xb6, 0x39, - 0x4d, 0x0b, 0x05, 0x70, 0xf0, 0x91, 0x34, 0x53, 0xe5, 0x7f, 0x97, 0x6c, 0x8d, 0xcb, 0x2f, 0x5e, - 0x57, 0xe6, 0x5e, 0x1c, 0x57, 0x32, 0x2f, 0x8f, 0x2b, 0x99, 0x57, 0xc7, 0x95, 0xcc, 0xaf, 0xc7, - 0x95, 0xcc, 0xb7, 0x6f, 0x2a, 0x73, 0x2f, 0xdf, 0x54, 0xe6, 0x5e, 0xbd, 0xa9, 0xcc, 0x3d, 0x04, - 0x93, 0x3f, 0xb4, 0xce, 0x82, 0xfc, 0x81, 0xda, 0xfa, 0x3d, 0x00, 0x00, 0xff, 0xff, 0xeb, 0xe6, - 0x84, 0xc3, 0x87, 0x0e, 0x00, 0x00, + // 1515 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x57, 0x51, 0x6f, 0x13, 0xc7, + 0x16, 0x8e, 0x63, 0x3b, 0x59, 0x8f, 0x13, 0x7b, 0x33, 0x04, 0xd8, 0x9b, 0x7b, 0xaf, 0x1d, 0xf9, + 0x72, 0x51, 0x2e, 0x97, 0xae, 0x51, 0xd2, 0x4a, 0x88, 0xa2, 0x0a, 0xec, 0x40, 0x89, 0x49, 0x52, + 0x18, 0x07, 0x5a, 0xd1, 0x4a, 0xab, 0xf1, 0xee, 0xb0, 0xde, 0x7a, 0xbd, 0xbb, 0xcc, 0x8c, 0x0d, + 0xf9, 0x15, 0x6d, 0xa5, 0x4a, 0xed, 0x53, 0xcb, 0x63, 0xd5, 0x5f, 0xc2, 0x23, 0x8f, 0xa8, 0x0f, + 0x56, 0x09, 0x2f, 0xfd, 0x0d, 0x3c, 0x55, 0x33, 0x3b, 0x6b, 0xaf, 
0x51, 0x68, 0x0c, 0x7d, 0x9b, + 0x3d, 0x73, 0xce, 0x37, 0x33, 0xe7, 0x7c, 0xe7, 0x9b, 0x59, 0xb0, 0xd1, 0x1b, 0xd6, 0x7b, 0x43, + 0x46, 0xe8, 0x90, 0xd0, 0xf1, 0x20, 0xea, 0xd4, 0x23, 0x1a, 0x46, 0x21, 0x23, 0xd4, 0xea, 0x0d, + 0xcd, 0x88, 0x86, 0x3c, 0x84, 0x55, 0x3b, 0xb4, 0x7b, 0x34, 0xc4, 0x76, 0xd7, 0xec, 0x0d, 0xcd, + 0xc4, 0xd5, 0x64, 0x3c, 0xa4, 0xd8, 0x25, 0x51, 0x67, 0x6d, 0x45, 0x4e, 0x46, 0x9d, 0x3a, 0x8e, + 0xbc, 0x38, 0x66, 0x0d, 0x26, 0x26, 0x07, 0x73, 0xac, 0x6c, 0x67, 0x12, 0x5b, 0x9f, 0x70, 0x9c, + 0xb2, 0xff, 0x53, 0x21, 0xd5, 0x49, 0xe0, 0x7a, 0x01, 0x11, 0x0e, 0x43, 0xdb, 0x56, 0x93, 0xff, + 0x3a, 0x76, 0x72, 0x4b, 0xcd, 0xd6, 0xde, 0x72, 0x08, 0xc6, 0x31, 0x27, 0xca, 0xe7, 0x42, 0xda, + 0x87, 0x12, 0xec, 0xb0, 0x41, 0xbf, 0x8f, 0xe9, 0x61, 0x9d, 0x32, 0xe1, 0x19, 0x7f, 0x28, 0x5f, + 0x63, 0xc0, 0x3d, 0xbf, 0xde, 0xf5, 0xed, 0x3a, 0xf7, 0xfa, 0x84, 0x71, 0xdc, 0x8f, 0xd4, 0xcc, + 0xaa, 0x1b, 0xba, 0xa1, 0x1c, 0xd6, 0xc5, 0x28, 0xb6, 0xd6, 0x7e, 0xc9, 0x80, 0x7c, 0x3b, 0xf2, + 0x3d, 0x0e, 0x9b, 0x60, 0x91, 0x53, 0xcf, 0x75, 0x09, 0x35, 0x32, 0xeb, 0x99, 0x8d, 0xe2, 0x66, + 0xd5, 0x9c, 0xa4, 0x4d, 0x1d, 0xdc, 0x94, 0xae, 0x07, 0xb1, 0x5b, 0x43, 0x7b, 0x36, 0xaa, 0xce, + 0x3d, 0x1f, 0x55, 0x33, 0x28, 0x89, 0x84, 0x07, 0xa0, 0x40, 0xbb, 0xcc, 0x72, 0x88, 0xcf, 0xb1, + 0x31, 0x2f, 0x61, 0xfe, 0x9b, 0x82, 0x51, 0xa9, 0x30, 0x93, 0x54, 0x98, 0x7b, 0xf7, 0x9b, 0xcd, + 0x36, 0xc7, 0x9c, 0x35, 0x74, 0x01, 0x76, 0x34, 0xaa, 0x6a, 0xe8, 0x56, 0x7b, 0x5b, 0x84, 0x23, + 0x8d, 0x76, 0x99, 0x1c, 0x5d, 0xc9, 0xfd, 0xf1, 0xb4, 0x9a, 0xa9, 0x21, 0x90, 0xdf, 0x23, 0xd4, + 0x25, 0xb3, 0xed, 0x54, 0xba, 0xbe, 0x7d, 0xa7, 0x0a, 0xd3, 0x01, 0xa5, 0x66, 0x17, 0x07, 0x2e, + 0x41, 0x24, 0xf2, 0x3d, 0x1b, 0x33, 0xb8, 0xfb, 0x26, 0xf8, 0xc6, 0x31, 0xe0, 0xd3, 0x31, 0x7f, + 0xb5, 0xca, 0x8f, 0x4f, 0xab, 0x73, 0xb5, 0x97, 0xf3, 0xa0, 0xdc, 0x0c, 0xfb, 0xd1, 0x80, 0x93, + 0x66, 0x97, 0xd8, 0x3d, 0x36, 0xe8, 0xc3, 0xaf, 0x41, 0xd1, 0x56, 0x63, 0xcb, 0x73, 0xe4, 0x5a, + 0x4b, 0x8d, 0x1d, 0x81, 0xf0, 0xdb, 0xa8, 0xba, 0xe5, 0x7a, 0xbc, 0x3b, 0xe8, 0x98, 0x76, 0xd8, + 0xaf, 0x8f, 0x57, 0x77, 0x3a, 0x93, 0x71, 0x3d, 0xea, 0xb9, 0x75, 0x59, 0xea, 0xc1, 0xc0, 0x73, + 0xcc, 0x7b, 0xf7, 0x76, 0xb6, 0x8f, 0x46, 0x55, 0x90, 0xa0, 0xef, 0x6c, 0x23, 0x90, 0xa0, 0xef, + 0x38, 0xd0, 0x00, 0x8b, 0x43, 0x42, 0x99, 0x17, 0x06, 0x46, 0x7e, 0x3d, 0xb3, 0xb1, 0x8c, 0x92, + 0x4f, 0xf8, 0x1f, 0xb0, 0xcc, 0xf0, 0x90, 0x58, 0x2c, 0xc0, 0x11, 0xeb, 0x86, 0x5c, 0xd6, 0x4c, + 0x43, 0x4b, 0xc2, 0xd8, 0x56, 0x36, 0xb8, 0x05, 0x72, 0xfd, 0xd0, 0x21, 0x46, 0x76, 0x3d, 0xb3, + 0x51, 0x3a, 0x36, 0xd9, 0xc9, 0xba, 0x7b, 0xa1, 0x43, 0x90, 0x74, 0x86, 0x15, 0x10, 0xef, 0x20, + 0x0a, 0xbd, 0x80, 0x1b, 0x39, 0x09, 0x9b, 0xb2, 0xc0, 0x5b, 0xa0, 0xc0, 0x09, 0xed, 0x7b, 0x01, + 0xe6, 0xc4, 0x58, 0x58, 0xcf, 0x6e, 0x14, 0x37, 0xcf, 0x1d, 0x83, 0xac, 0x72, 0xbc, 0x4d, 0x98, + 0x4d, 0xbd, 0x88, 0x87, 0xb4, 0x91, 0x13, 0x39, 0x42, 0x93, 0x60, 0x55, 0xc9, 0xfb, 0x00, 0x88, + 0x14, 0x63, 0x9b, 0x8b, 0x73, 0xad, 0x82, 0x7c, 0xe7, 0x90, 0x13, 0x26, 0xf3, 0x9a, 0x45, 0xf1, + 0x07, 0xbc, 0x08, 0x20, 0x1b, 0xb8, 0x2e, 0x61, 0x9c, 0x38, 0x16, 0xe6, 0x56, 0x80, 0x83, 0x90, + 0xc9, 0x23, 0x67, 0x91, 0x3e, 0x9e, 0xb9, 0xce, 0xf7, 0x85, 0x5d, 0xe1, 0x7e, 0x3f, 0x0f, 0x4e, + 0xb5, 0x93, 0xa9, 0xd4, 0x0a, 0x77, 0x41, 0x81, 0x71, 0x4c, 0xb9, 0xd5, 0x23, 0x87, 0xaa, 0x7a, + 0x1f, 0xbe, 0x1e, 0x55, 0x2f, 0xcd, 0x54, 0xb9, 0xe4, 0x74, 0xb7, 0xc9, 0x21, 0xd2, 0x24, 0xcc, + 0x6d, 0x72, 0x08, 0xf7, 0xc0, 0x22, 0x09, 0x1c, 0x09, 0x38, 0xff, 0x37, 0x00, 0x17, 0x48, 
0xe0, + 0x08, 0xb8, 0x7b, 0x00, 0xd8, 0xe3, 0xfd, 0xca, 0xe2, 0x15, 0x37, 0xff, 0x6f, 0x9e, 0x20, 0x85, + 0xe6, 0xe4, 0x88, 0x29, 0x3e, 0xa7, 0x80, 0x54, 0x5a, 0x7e, 0x2d, 0x80, 0x55, 0x55, 0x1b, 0x4e, + 0x9c, 0x1b, 0x43, 0xec, 0x23, 0xc2, 0x06, 0xbe, 0x90, 0x91, 0xbc, 0xd4, 0x2e, 0xd5, 0xfd, 0x1f, + 0x9c, 0xb8, 0xa0, 0x42, 0x11, 0x2a, 0x40, 0x50, 0x1c, 0x0b, 0xaf, 0x82, 0x3c, 0x13, 0x4a, 0xa3, + 0x76, 0x7d, 0xfe, 0x44, 0x10, 0xa9, 0x4b, 0x28, 0x0e, 0x12, 0xd1, 0x7d, 0xd1, 0xfd, 0x92, 0x75, + 0xb3, 0x44, 0x4b, 0xad, 0x40, 0x71, 0x10, 0xfc, 0x12, 0xe8, 0x76, 0xdc, 0xab, 0x56, 0xd2, 0x42, + 0xc6, 0x69, 0x09, 0x74, 0x69, 0xa6, 0xe4, 0xa5, 0x9a, 0x1c, 0x95, 0xed, 0x37, 0xba, 0x7e, 0x03, + 0xe8, 0x1e, 0xb3, 0x7c, 0x82, 0x19, 0xb1, 0x28, 0x79, 0x34, 0x20, 0x8c, 0x1b, 0x0b, 0xb2, 0x37, + 0x4a, 0x1e, 0xdb, 0x15, 0x66, 0x14, 0x5b, 0xe1, 0x2e, 0x28, 0x3f, 0xa6, 0x1e, 0x27, 0xd6, 0x58, + 0xc7, 0x0d, 0x4d, 0xee, 0xe2, 0xdf, 0xa9, 0x5d, 0x08, 0x05, 0x30, 0xbb, 0xbe, 0x6d, 0x1e, 0x24, + 0x4e, 0xaa, 0x3d, 0x4a, 0x32, 0x76, 0x6c, 0x85, 0x77, 0x80, 0xee, 0x90, 0x88, 0x12, 0x59, 0x2d, + 0x25, 0xcf, 0xe0, 0x1d, 0xe4, 0x19, 0x95, 0x27, 0xe1, 0x52, 0x93, 0xe1, 0x4d, 0x90, 0x8f, 0x61, + 0xa0, 0x84, 0xb9, 0x30, 0x13, 0x8c, 0x0c, 0x55, 0x5b, 0x8c, 0xc3, 0xe1, 0x17, 0xa0, 0x6c, 0x4b, + 0x35, 0xb5, 0xa8, 0x92, 0x53, 0x63, 0x49, 0x22, 0xd6, 0x4f, 0xce, 0xf6, 0x94, 0x0a, 0xa3, 0x92, + 0x3d, 0xad, 0xe4, 0xe7, 0x40, 0x89, 0xe2, 0x87, 0xdc, 0xf2, 0x43, 0x57, 0x9d, 0x78, 0x59, 0x76, + 0xfa, 0x92, 0xb0, 0xee, 0x86, 0x6e, 0x7c, 0x8e, 0x47, 0xa0, 0x88, 0x1d, 0xc7, 0x62, 0x8c, 0xe3, + 0x8e, 0x4f, 0x8c, 0x15, 0xb9, 0xf6, 0xb5, 0x59, 0x59, 0x3b, 0xc5, 0x7d, 0xf3, 0xba, 0xe3, 0xb4, + 0xdb, 0x07, 0x02, 0xa7, 0x51, 0x12, 0x72, 0x3c, 0xf9, 0x46, 0x00, 0x3b, 0x4e, 0x3b, 0x5e, 0x03, + 0x7e, 0x93, 0x01, 0xa7, 0x22, 0x4a, 0x86, 0x8a, 0x07, 0xf1, 0x7b, 0x05, 0xfb, 0xc6, 0xea, 0x2c, + 0xf5, 0xbd, 0xf6, 0x7a, 0x54, 0xbd, 0x3a, 0xfb, 0xf5, 0x20, 0x82, 0x9b, 0x7e, 0x68, 0xf7, 0xc6, + 0x08, 0x68, 0x45, 0xac, 0x2d, 0xb9, 0x76, 0x47, 0xad, 0x0c, 0xbf, 0x02, 0x30, 0xa2, 0x5e, 0x48, + 0x2d, 0xf1, 0xba, 0xb0, 0xd4, 0x8b, 0xc2, 0x38, 0x23, 0xf7, 0x63, 0xbe, 0x25, 0x17, 0xa9, 0x87, + 0x88, 0x89, 0x08, 0x76, 0xda, 0xf1, 0x18, 0xe9, 0x12, 0x29, 0x65, 0x59, 0xfb, 0x14, 0xa4, 0x32, + 0x01, 0x21, 0xc8, 0x89, 0xa7, 0x53, 0xac, 0x99, 0x48, 0x8e, 0x61, 0x15, 0xe4, 0x6d, 0x6a, 0x6f, + 0x6d, 0x4a, 0xd1, 0x58, 0x6e, 0x14, 0x8e, 0x46, 0xd5, 0x7c, 0x13, 0x35, 0xb7, 0x36, 0x51, 0x6c, + 0x8f, 0x45, 0xa7, 0x95, 0xd3, 0x32, 0xfa, 0x7c, 0x2b, 0xa7, 0xe5, 0xf5, 0x85, 0x56, 0x4e, 0x5b, + 0xd4, 0xb5, 0x56, 0x4e, 0x2b, 0xe8, 0xa0, 0x95, 0xd3, 0x4a, 0x7a, 0xb9, 0x95, 0xd3, 0xca, 0xba, + 0xde, 0xca, 0x69, 0xba, 0xbe, 0xd2, 0xca, 0x69, 0xa7, 0xf4, 0xd5, 0xd6, 0x82, 0xf6, 0xdd, 0xbe, + 0xfe, 0xd3, 0x7e, 0x6d, 0x1d, 0x80, 0xcf, 0x45, 0x3f, 0x34, 0x30, 0xb7, 0xbb, 0xc7, 0x6d, 0xa0, + 0x76, 0x17, 0x2c, 0xed, 0x86, 0xae, 0x67, 0x63, 0xff, 0xb3, 0x68, 0x37, 0x74, 0xe1, 0x75, 0x90, + 0x0d, 0x23, 0x71, 0x7b, 0x88, 0x7b, 0xe9, 0x7f, 0x27, 0x71, 0x7b, 0x1c, 0xaa, 0xa8, 0x2d, 0x62, + 0x6b, 0x3f, 0x2c, 0x80, 0x22, 0xc2, 0x0f, 0x79, 0x33, 0xec, 0xf7, 0x71, 0xe0, 0xc0, 0x21, 0x38, + 0x3b, 0x7e, 0x99, 0xc6, 0x85, 0x67, 0xa2, 0xd5, 0x03, 0x9b, 0x48, 0x05, 0xc8, 0x36, 0x3e, 0x79, + 0x3d, 0xaa, 0x5e, 0x79, 0x27, 0xb5, 0x97, 0x45, 0x6c, 0x2b, 0x14, 0x74, 0x3a, 0x81, 0x9f, 0x32, + 0xc3, 0x03, 0xf0, 0x8f, 0x54, 0xeb, 0x4f, 0x6f, 0x41, 0x3e, 0x07, 0x8a, 0x9b, 0xc6, 0x31, 0x17, + 0x6f, 0x2c, 0x46, 0x67, 0x27, 0xa1, 0x77, 0xd2, 0xe8, 0xf0, 0x3c, 0x28, 0xf7, 0xf1, 0x13, 0x75, + 0x10, 0x2f, 0x70, 
0xc8, 0x13, 0xa9, 0xb6, 0x39, 0xb4, 0xdc, 0xc7, 0x4f, 0xa4, 0xcb, 0x8e, 0x30, + 0xc2, 0x5b, 0x40, 0xb7, 0xfd, 0x90, 0x11, 0x27, 0xa5, 0x63, 0x2b, 0x33, 0xf0, 0x1c, 0x95, 0xe3, + 0xb0, 0x89, 0x84, 0x3d, 0x02, 0x67, 0xe8, 0xb8, 0xe9, 0x2c, 0x32, 0xc4, 0xbe, 0x45, 0x65, 0xdb, + 0xc9, 0xb6, 0x2e, 0x6e, 0x7e, 0xf4, 0x5e, 0x3d, 0xab, 0x2a, 0xb6, 0x4a, 0x8f, 0xbb, 0xcb, 0x76, + 0x41, 0x31, 0xd6, 0xe0, 0x8e, 0x20, 0x8e, 0x51, 0x9a, 0xf1, 0x0a, 0x9d, 0x70, 0x0d, 0x81, 0xc7, + 0x13, 0xde, 0xb5, 0x41, 0xc9, 0x8f, 0x89, 0x62, 0x85, 0x91, 0x50, 0x25, 0xa3, 0x3c, 0xe3, 0x15, + 0x99, 0xa6, 0x26, 0x5a, 0xf2, 0xd3, 0x44, 0x7d, 0x00, 0x00, 0xa7, 0xd8, 0x26, 0x96, 0xa4, 0xb4, + 0x2e, 0xf9, 0xfa, 0xf1, 0xc9, 0x99, 0x98, 0xf0, 0xd2, 0x3c, 0x10, 0xe1, 0xdb, 0x98, 0xe3, 0x1b, + 0x01, 0xa7, 0x87, 0xa8, 0xc0, 0x93, 0xef, 0xb5, 0xab, 0xa0, 0x34, 0x3d, 0x09, 0x75, 0x90, 0x4d, + 0x9e, 0x3b, 0x05, 0x24, 0x86, 0xe2, 0xa1, 0x35, 0xc4, 0xfe, 0x20, 0xbe, 0xee, 0x0b, 0x28, 0xfe, + 0xb8, 0x32, 0x7f, 0x59, 0x34, 0x6b, 0x56, 0xcf, 0x8d, 0x5b, 0x76, 0x5e, 0xcf, 0xc6, 0xed, 0xf8, + 0xf3, 0x7e, 0xed, 0x32, 0x28, 0xed, 0x29, 0x92, 0xdc, 0x0c, 0x43, 0x4e, 0xe8, 0xac, 0x6c, 0xaa, + 0xb9, 0xe0, 0x74, 0x73, 0x9a, 0x16, 0x0a, 0x60, 0xff, 0x3d, 0x69, 0xa6, 0xca, 0xff, 0x26, 0xd9, + 0x1a, 0x17, 0x9f, 0xbd, 0xac, 0xcc, 0x3d, 0x3b, 0xaa, 0x64, 0x9e, 0x1f, 0x55, 0x32, 0x2f, 0x8e, + 0x2a, 0x99, 0xdf, 0x8f, 0x2a, 0x99, 0x6f, 0x5f, 0x55, 0xe6, 0x9e, 0xbf, 0xaa, 0xcc, 0xbd, 0x78, + 0x55, 0x99, 0x7b, 0x00, 0x26, 0x3f, 0x6b, 0x9d, 0x05, 0xf9, 0x2f, 0xb5, 0xf5, 0x67, 0x00, 0x00, + 0x00, 0xff, 0xff, 0xf2, 0x56, 0x08, 0x65, 0x92, 0x0e, 0x00, 0x00, } func (this *Split) Equal(that interface{}) bool { @@ -1365,7 +1365,7 @@ func (m *ReplicatedEvalResult) MarshalToSizedBuffer(dAtA []byte) (int, error) { dAtA[i] = 0x52 } { - size, err := m.Timestamp.MarshalToSizedBuffer(dAtA[:i]) + size, err := m.WriteTimestamp.MarshalToSizedBuffer(dAtA[:i]) if err != nil { return 0, err } @@ -1837,7 +1837,7 @@ func (m *ReplicatedEvalResult) Size() (n int) { if m.IsLeaseRequest { n += 2 } - l = m.Timestamp.Size() + l = m.WriteTimestamp.Size() n += 1 + l + sovProposerKv(uint64(l)) if m.DeprecatedDelta != nil { l = m.DeprecatedDelta.Size() @@ -2860,7 +2860,7 @@ func (m *ReplicatedEvalResult) Unmarshal(dAtA []byte) error { m.IsLeaseRequest = bool(v != 0) case 8: if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Timestamp", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field WriteTimestamp", wireType) } var msglen int for shift := uint(0); ; shift += 7 { @@ -2887,7 +2887,7 @@ func (m *ReplicatedEvalResult) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - if err := m.Timestamp.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + if err := m.WriteTimestamp.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { return err } iNdEx = postIndex diff --git a/pkg/kv/kvserver/kvserverpb/proposer_kv.proto b/pkg/kv/kvserver/kvserverpb/proposer_kv.proto index dbc8a20fd232..d6a72965ffa2 100644 --- a/pkg/kv/kvserver/kvserverpb/proposer_kv.proto +++ b/pkg/kv/kvserver/kvserverpb/proposer_kv.proto @@ -125,10 +125,10 @@ message ReplicatedEvalResult { Merge merge = 4; ComputeChecksum compute_checksum = 21; bool is_lease_request = 6; - // The BatchRequest.Timestamp of the request that produced this command. Used - // to verify the validity of the command against the GC threshold and to - // update the followers' clocks. 
-  util.hlc.Timestamp timestamp = 8 [(gogoproto.nullable) = false];
+  // The timestamp at which this command is writing. Used to verify the validity
+  // of the command against the GC threshold and to update the followers'
+  // clocks.
+  util.hlc.Timestamp write_timestamp = 8 [(gogoproto.nullable) = false];
   // The stats delta corresponding to the data in this WriteBatch. On
   // a split, contains only the contributions to the left-hand side.
   storage.enginepb.MVCCStats deprecated_delta = 10; // See #18828
diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go
index 865175233bb4..334829e18677 100644
--- a/pkg/kv/kvserver/replica.go
+++ b/pkg/kv/kvserver/replica.go
@@ -1348,8 +1348,8 @@ func (r *Replica) checkSpanInRangeRLocked(ctx context.Context, rspan roachpb.RSp
 	)
 }

-// checkTSAboveGCThresholdRLocked returns an error if a request (identified
-// by its MVCC timestamp) can be run on the replica.
+// checkTSAboveGCThresholdRLocked returns an error if a request (identified by
+// its read timestamp) wants to read below the range's GC threshold.
 func (r *Replica) checkTSAboveGCThresholdRLocked(
 	ts hlc.Timestamp, st kvserverpb.LeaseStatus, isAdmin bool,
 ) error {
diff --git a/pkg/kv/kvserver/replica_application_result.go b/pkg/kv/kvserver/replica_application_result.go
index 1d212e8d9c24..c86f35ab5fa7 100644
--- a/pkg/kv/kvserver/replica_application_result.go
+++ b/pkg/kv/kvserver/replica_application_result.go
@@ -67,7 +67,7 @@ func isTrivial(r *kvserverpb.ReplicatedEvalResult) bool {
 	// it is trivial.
 	allowlist := *r
 	allowlist.Delta = enginepb.MVCCStatsDelta{}
-	allowlist.Timestamp = hlc.Timestamp{}
+	allowlist.WriteTimestamp = hlc.Timestamp{}
 	allowlist.DeprecatedDelta = nil
 	allowlist.PrevLeaseProposal = nil
 	allowlist.State = nil
@@ -85,7 +85,7 @@ func clearTrivialReplicatedEvalResultFields(r *kvserverpb.ReplicatedEvalResult)
 	// they don't trigger an assertion at the end of the application process
 	// (which checks that all fields were handled).
 	r.IsLeaseRequest = false
-	r.Timestamp = hlc.Timestamp{}
+	r.WriteTimestamp = hlc.Timestamp{}
 	r.PrevLeaseProposal = nil
 	// The state fields cleared here were already applied to the in-memory view of
 	// replica state for this batch.
diff --git a/pkg/kv/kvserver/replica_application_state_machine.go b/pkg/kv/kvserver/replica_application_state_machine.go
index 2aa5acb0f544..7ff541f5df8e 100644
--- a/pkg/kv/kvserver/replica_application_state_machine.go
+++ b/pkg/kv/kvserver/replica_application_state_machine.go
@@ -318,16 +318,19 @@ func checkForcedErr(
 		)
 	}

-	// Verify that the batch timestamp is after the GC threshold. This is
+	// Verify that the command is not trying to write below the GC threshold. This is
 	// necessary because not all commands declare read access on the GC
 	// threshold key, even though they implicitly depend on it. This means
 	// that access to this state will not be serialized by latching,
 	// so we must perform this check upstream and downstream of raft.
-	// See #14833.
-	ts := raftCmd.ReplicatedEvalResult.Timestamp
-	if ts.LessEq(*replicaState.GCThreshold) {
+	// TODO(andrei,nvanbenschoten,bdarnell): Is this check below-Raft actually
+	// necessary, given that we've checked at evaluation time that the request
+	// evaluates at a timestamp above the GC threshold? Does it actually matter if
+	// the GC threshold has advanced since then?
+	wts := raftCmd.ReplicatedEvalResult.WriteTimestamp
+	if wts.LessEq(*replicaState.GCThreshold) {
 		return leaseIndex, proposalNoReevaluation, roachpb.NewError(&roachpb.BatchTimestampBeforeGCError{
-			Timestamp: ts,
+			Timestamp: wts,
 			Threshold: *replicaState.GCThreshold,
 		})
 	}
@@ -484,7 +487,7 @@ func (b *replicaAppBatch) Stage(cmdI apply.Command) (apply.CheckedCommand, error
 	}

 	// Update the batch's max timestamp.
-	if clockTS, ok := cmd.replicatedResult().Timestamp.TryToClockTimestamp(); ok {
+	if clockTS, ok := cmd.replicatedResult().WriteTimestamp.TryToClockTimestamp(); ok {
 		b.maxTS.Forward(clockTS)
 	}
diff --git a/pkg/kv/kvserver/replica_application_state_machine_test.go b/pkg/kv/kvserver/replica_application_state_machine_test.go
index c64225c03ec7..11d2c1b04f21 100644
--- a/pkg/kv/kvserver/replica_application_state_machine_test.go
+++ b/pkg/kv/kvserver/replica_application_state_machine_test.go
@@ -123,7 +123,7 @@ func TestReplicaStateMachineChangeReplicas(t *testing.T) {
 			ReplicatedEvalResult: kvserverpb.ReplicatedEvalResult{
 				State:          &kvserverpb.ReplicaState{Desc: &newDesc},
 				ChangeReplicas: &kvserverpb.ChangeReplicas{ChangeReplicasTrigger: trigger},
-				Timestamp:      r.mu.state.GCThreshold.Add(1, 0),
+				WriteTimestamp: r.mu.state.GCThreshold.Add(1, 0),
 			},
 		},
 		confChange: &decodedConfChange{
diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go
index 5bd7f506744f..98587f35f088 100644
--- a/pkg/kv/kvserver/replica_proposal.go
+++ b/pkg/kv/kvserver/replica_proposal.go
@@ -766,6 +766,10 @@ func (r *Replica) evaluateProposal(
 	// Evaluate the commands. If this returns without an error, the batch should
 	// be committed. Note that we don't hold any locks at this point. This is
 	// important since evaluating a proposal is expensive.
+	//
+	// Note that, during evaluation, ba's read and write timestamps might get
+	// bumped (see evaluateWriteBatchWithServersideRefreshes).
+	//
 	// TODO(tschottdorf): absorb all returned values in `res` below this point
 	// in the call stack as well.
 	batch, ms, br, res, pErr := r.evaluateWriteBatch(ctx, idKey, ba, lul, latchSpans)
@@ -822,7 +826,7 @@ func (r *Replica) evaluateProposal(
 	// Set the proposal's replicated result, which contains metadata and
 	// side-effects that are to be replicated to all replicas.
 	res.Replicated.IsLeaseRequest = ba.IsLeaseRequest()
-	res.Replicated.Timestamp = ba.Timestamp
+	res.Replicated.WriteTimestamp = ba.WriteTimestamp()
 	res.Replicated.Delta = ms.ToStatsDelta()
 	// This is the result of a migration. See the field for more details.

From f10fed82be3c440979605f87c0837e05a0c1f910 Mon Sep 17 00:00:00 2001
From: Andrei Matei
Date: Fri, 2 Apr 2021 16:59:30 -0400
Subject: [PATCH 4/8] kvserver: future-proof RER.WriteTimestamp

This patch makes below-Raft code not freak out if the field is not set
(i.e. it appears to be an empty timestamp). This is in order to leave
the door open for future versions to not send the field any more, or at
least send it only on some commands.
Concretely, this patch allows the proposer to declare that a particular
request does not need to be checked against the GC threshold below Raft
(whether *any* request actually needs to be checked below Raft is a
question that we leave open).
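
For illustration only (not part of the patch), here is a minimal,
self-contained sketch of the intended semantics, using a simplified
stand-in for util/hlc.Timestamp; the real check lives in checkForcedErr:

    package main

    import "fmt"

    // ts is a simplified stand-in for util/hlc.Timestamp.
    type ts struct{ wall int64 }

    func (t ts) IsEmpty() bool    { return t == ts{} }
    func (t ts) LessEq(o ts) bool { return t.wall <= o.wall }

    // checkGCThreshold mirrors the below-Raft guard: an empty write
    // timestamp means the proposer opted out of the GC threshold check
    // for this command.
    func checkGCThreshold(wts, gcThreshold ts) error {
    	if !wts.IsEmpty() && wts.LessEq(gcThreshold) {
    		return fmt.Errorf("batch timestamp %d must be after GC threshold %d",
    			wts.wall, gcThreshold.wall)
    	}
    	return nil
    }

    func main() {
    	thresh := ts{wall: 100}
    	fmt.Println(checkGCThreshold(ts{wall: 50}, thresh)) // error: below threshold
    	fmt.Println(checkGCThreshold(ts{}, thresh))         // <nil>: empty timestamp skips the check
    }
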
Release note: None
---
 pkg/kv/kvserver/replica_application_state_machine.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/kv/kvserver/replica_application_state_machine.go b/pkg/kv/kvserver/replica_application_state_machine.go
index 7ff541f5df8e..decf7cbbd517 100644
--- a/pkg/kv/kvserver/replica_application_state_machine.go
+++ b/pkg/kv/kvserver/replica_application_state_machine.go
@@ -328,7 +328,7 @@ func checkForcedErr(
 	// evaluates at a timestamp above the GC threshold? Does it actually matter if
 	// the GC threshold has advanced since then?
 	wts := raftCmd.ReplicatedEvalResult.WriteTimestamp
-	if wts.LessEq(*replicaState.GCThreshold) {
+	if !wts.IsEmpty() && wts.LessEq(*replicaState.GCThreshold) {
 		return leaseIndex, proposalNoReevaluation, roachpb.NewError(&roachpb.BatchTimestampBeforeGCError{
 			Timestamp: wts,
 			Threshold: *replicaState.GCThreshold,

From 40491088fc47d7153cfc050edfb3e4e2a9badeec Mon Sep 17 00:00:00 2001
From: Andrei Matei
Date: Tue, 6 Apr 2021 14:55:10 -0400
Subject: [PATCH 5/8] kvserver: rationalize proposal timestamp

This patch reworks the ReplicatedEvalResult.WriteTimestamp field (*).
Before this patch, WriteTimestamp was always coming from
ba.WriteTimestamp(), which is either a transaction's write timestamp
or, for non-txn requests, the batch's read timestamp or, for non-MVCC
requests, some arbitrary clock value. Below Raft, the field is used for
updating the followers' clocks, and also to check the request against
the GC threshold.

This patch sets the WriteTimestamp differently for IntentWrite requests
than for other requests:
- for regular writes, the field remains ba.WriteTimestamp()
- for other proposals, the field is a clock reading on the proposer

Some requests (e.g. LeaseTransfers) need a clock signal to travel with
their proposal, and now they get it (see #62569).

[*] An alternative of splitting the field in two was considered, but
it's hard to do now because of backwards compatibility. It can be done
in the next release, though, because now all the uses of the
WriteTimestamp field tolerate it being empty.

Fixes #62569

Release note: None
---
 pkg/kv/kvserver/kvserverpb/proposer_kv.pb.go         |  4 +++-
 pkg/kv/kvserver/kvserverpb/proposer_kv.proto         |  4 +++-
 pkg/kv/kvserver/replica_application_state_machine.go | 11 +++++++----
 pkg/kv/kvserver/replica_proposal.go                  | 10 +++++++++-
 4 files changed, 22 insertions(+), 7 deletions(-)

diff --git a/pkg/kv/kvserver/kvserverpb/proposer_kv.pb.go b/pkg/kv/kvserver/kvserverpb/proposer_kv.pb.go
index 23909fca7afe..f0f7add35f02 100644
--- a/pkg/kv/kvserver/kvserverpb/proposer_kv.pb.go
+++ b/pkg/kv/kvserver/kvserverpb/proposer_kv.pb.go
@@ -287,7 +287,9 @@ type ReplicatedEvalResult struct {
 	IsLeaseRequest bool `protobuf:"varint,6,opt,name=is_lease_request,json=isLeaseRequest,proto3" json:"is_lease_request,omitempty"`
 	// The timestamp at which this command is writing. Used to verify the validity
 	// of the command against the GC threshold and to update the followers'
-	// clocks.
+	// clocks. If the request that produced this command is not an IntentWrite
+	// one, then the request's write timestamp is meaningless; for such requests,
+	// this field is simply a clock reading from the proposer.
 	WriteTimestamp hlc.Timestamp `protobuf:"bytes,8,opt,name=write_timestamp,json=writeTimestamp,proto3" json:"write_timestamp"`
 	// The stats delta corresponding to the data in this WriteBatch. On
 	// a split, contains only the contributions to the left-hand side.
diff --git a/pkg/kv/kvserver/kvserverpb/proposer_kv.proto b/pkg/kv/kvserver/kvserverpb/proposer_kv.proto
index d6a72965ffa2..f6bcb7bafbdb 100644
--- a/pkg/kv/kvserver/kvserverpb/proposer_kv.proto
+++ b/pkg/kv/kvserver/kvserverpb/proposer_kv.proto
@@ -127,7 +127,9 @@ message ReplicatedEvalResult {
   bool is_lease_request = 6;
   // The timestamp at which this command is writing. Used to verify the validity
   // of the command against the GC threshold and to update the followers'
-  // clocks.
+  // clocks. If the request that produced this command is not an IntentWrite
+  // one, then the request's write timestamp is meaningless; for such requests,
+  // this field is simply a clock reading from the proposer.
   util.hlc.Timestamp write_timestamp = 8 [(gogoproto.nullable) = false];
   // The stats delta corresponding to the data in this WriteBatch. On
   // a split, contains only the contributions to the left-hand side.
diff --git a/pkg/kv/kvserver/replica_application_state_machine.go b/pkg/kv/kvserver/replica_application_state_machine.go
index decf7cbbd517..f52d1a1f40b6 100644
--- a/pkg/kv/kvserver/replica_application_state_machine.go
+++ b/pkg/kv/kvserver/replica_application_state_machine.go
@@ -460,10 +460,13 @@ func (b *replicaAppBatch) Stage(cmdI apply.Command) (apply.CheckedCommand, error
 		// we can only assert on the leaseholder, as only that replica has
 		// cmd.proposal.Request filled in.
 		if cmd.IsLocal() && cmd.proposal.Request.IsIntentWrite() {
-			wts := cmd.proposal.Request.WriteTimestamp()
-			if wts.LessEq(b.state.RaftClosedTimestamp) {
-				return nil, makeNonDeterministicFailure("writing at %s below closed ts: %s (%s)",
-					wts, b.state.RaftClosedTimestamp.String(), cmd.proposal.Request.String())
+			wts := cmd.raftCmd.ReplicatedEvalResult.WriteTimestamp
+			if !wts.IsEmpty() && wts.LessEq(b.state.RaftClosedTimestamp) {
+				wts := wts // Shadow variable that escapes to the heap.
+				return nil, wrapWithNonDeterministicFailure(
+					errors.AssertionFailedf("writing at %s below closed ts: %s (%s)",
+						wts, b.state.RaftClosedTimestamp.String(), cmd.proposal.Request),
+					"attempting to write below closed timestamp")
 			}
 		}
 		log.Event(ctx, "applying command")
diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go
index 98587f35f088..48cd26d49d7f 100644
--- a/pkg/kv/kvserver/replica_proposal.go
+++ b/pkg/kv/kvserver/replica_proposal.go
@@ -826,7 +826,15 @@ func (r *Replica) evaluateProposal(
 	// Set the proposal's replicated result, which contains metadata and
 	// side-effects that are to be replicated to all replicas.
 	res.Replicated.IsLeaseRequest = ba.IsLeaseRequest()
-	res.Replicated.WriteTimestamp = ba.WriteTimestamp()
+	if ba.IsIntentWrite() {
+		res.Replicated.WriteTimestamp = ba.WriteTimestamp()
+	} else {
+		// For misc requests, use WriteTimestamp to propagate a clock signal. This
+		// is particularly important for lease transfers, as it ensures that the
+		// follower getting the lease will have a clock above the start time of
+		// its lease.
+		res.Replicated.WriteTimestamp = r.store.Clock().Now()
+	}
 	res.Replicated.Delta = ms.ToStatsDelta()
 	// This is the result of a migration. See the field for more details.

From a1756148dbeeaaedf9215d91b94163022a611869 Mon Sep 17 00:00:00 2001
From: Andrei Matei
Date: Thu, 1 Apr 2021 16:01:43 -0400
Subject: [PATCH 6/8] kvserver: more Raft closed timestamp assertions

In #62655 we see that there appears to be something wrong with the Raft
closed timestamp.
That issue shows an assertion failure about a command trying to write below a timestamp that was promised to have been closed by a previous command. This patch includes a little bit more info in that assertion (the current lease) and adds another two assertions: - an assertion that the closed timestamp itself does not regress. This assertion already existed in stageTrivialReplicatedEvalResult(), but that comes after the failure we've seen. The point of copying this assertion up is to ascertain whether we're screwing up by regressing the closed timestamp, or whether a particular request/command is at fault for not obeying the closed timestamp. - an assertion against closed ts regression when copying the replica state from a staging batch to the replica. All these assertions can be disabled with an env var if things go bad. Fixes #62765 Release note: None --- .../replica_application_state_machine.go | 97 +++++++++++++++---- pkg/util/hlc/timestamp.go | 2 +- 2 files changed, 79 insertions(+), 20 deletions(-) diff --git a/pkg/kv/kvserver/replica_application_state_machine.go b/pkg/kv/kvserver/replica_application_state_machine.go index f52d1a1f40b6..a7ee883983e8 100644 --- a/pkg/kv/kvserver/replica_application_state_machine.go +++ b/pkg/kv/kvserver/replica_application_state_machine.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" + "github.com/cockroachdb/cockroach/pkg/util/envutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -454,20 +455,11 @@ func (b *replicaAppBatch) Stage(cmdI apply.Command) (apply.CheckedCommand, error cmd.raftCmd.LogicalOpLog = nil cmd.raftCmd.ClosedTimestamp = nil } else { - // Assert that we're not writing under the closed timestamp. We can only do - // these checks on IsIntentWrite requests, since others (for example, - // EndTxn) can operate below the closed timestamp. In turn, this means that - // we can only assert on the leaseholder, as only that replica has - // cmd.proposal.Request filled in. - if cmd.IsLocal() && cmd.proposal.Request.IsIntentWrite() { - wts := cmd.raftCmd.ReplicatedEvalResult.WriteTimestamp - if !wts.IsEmpty() && wts.LessEq(b.state.RaftClosedTimestamp) { - wts := wts // Shadow variable that escapes to the heap. 
-				return nil, wrapWithNonDeterministicFailure(
-					errors.AssertionFailedf("writing at %s below closed ts: %s (%s)",
-						wts, b.state.RaftClosedTimestamp.String(), cmd.proposal.Request),
-					"attempting to write below closed timestamp")
-			}
+		if err := b.assertNoCmdClosedTimestampRegression(cmd); err != nil {
+			return nil, err
+		}
+		if err := b.assertNoWriteBelowClosedTimestamp(cmd); err != nil {
+			return nil, err
 		}
 		log.Event(ctx, "applying command")
 	}
@@ -834,11 +826,6 @@ func (b *replicaAppBatch) stageTrivialReplicatedEvalResult(
 		b.state.LeaseAppliedIndex = leaseAppliedIndex
 	}
 	if cts := cmd.raftCmd.ClosedTimestamp; cts != nil && !cts.IsEmpty() {
-		if cts.Less(b.state.RaftClosedTimestamp) {
-			log.Fatalf(ctx,
-				"closed timestamp regressing from %s to %s when applying command %x",
-				b.state.RaftClosedTimestamp, cts, cmd.idKey)
-		}
 		b.state.RaftClosedTimestamp = *cts
 		if clockTS, ok := cts.TryToClockTimestamp(); ok {
 			b.maxTS.Forward(clockTS)
@@ -902,6 +889,16 @@ func (b *replicaAppBatch) ApplyToStateMachine(ctx context.Context) error {
 	r.mu.Lock()
 	r.mu.state.RaftAppliedIndex = b.state.RaftAppliedIndex
 	r.mu.state.LeaseAppliedIndex = b.state.LeaseAppliedIndex
+
+	// Sanity check that the RaftClosedTimestamp doesn't go backwards.
+	existingClosed := r.mu.state.RaftClosedTimestamp
+	newClosed := b.state.RaftClosedTimestamp
+	if !newClosed.IsEmpty() && newClosed.Less(existingClosed) && raftClosedTimestampAssertionsEnabled {
+		return errors.AssertionFailedf(
+			"raft closed timestamp regression; replica has: %s, new batch has: %s.",
+			existingClosed.String(), newClosed.String())
+	}
+
 	closedTimestampUpdated := r.mu.state.RaftClosedTimestamp.Forward(b.state.RaftClosedTimestamp)
 	prevStats := *r.mu.state.Stats
 	*r.mu.state.Stats = *b.state.Stats
@@ -1015,6 +1012,68 @@ func (b *replicaAppBatch) Close() {
 	*b = replicaAppBatch{}
 }

+// raftClosedTimestampAssertionsEnabled provides an emergency way of shutting
+// down assertions.
+var raftClosedTimestampAssertionsEnabled = envutil.EnvOrDefaultBool("COCKROACH_RAFT_CLOSEDTS_ASSERTIONS_ENABLED", true)
+
+// Assert that the current command is not writing under the closed timestamp.
+// This check only applies to IntentWrite commands, since others (for example,
+// EndTxn) can operate below the closed timestamp.
+//
+// Note that the check is against b.state.RaftClosedTimestamp (i.e. the
+// timestamp closed by previous commands), not against
+// cmd.raftCmd.ClosedTimestamp. A command is allowed to write below the closed
+// timestamp carried by itself; in other words, cmd.raftCmd.ClosedTimestamp is
+// a promise about future commands, not about the command carrying it.
+func (b *replicaAppBatch) assertNoWriteBelowClosedTimestamp(cmd *replicatedCmd) error {
+	if !cmd.IsLocal() || !cmd.proposal.Request.IsIntentWrite() {
+		return nil
+	}
+	if !raftClosedTimestampAssertionsEnabled {
+		return nil
+	}
+	wts := cmd.raftCmd.ReplicatedEvalResult.WriteTimestamp
+	if !wts.IsEmpty() && wts.LessEq(b.state.RaftClosedTimestamp) {
+		wts := wts // Make a shadow variable that escapes to the heap.
+ var req string + if cmd.proposal != nil { + req = cmd.proposal.Request.String() + } else { + req = "request unknown; not leaseholder" + } + return wrapWithNonDeterministicFailure(errors.AssertionFailedf( + "command writing below closed timestamp; cmd: %x, write ts: %s, "+ + "batch state closed: %s, command closed: %s, request: %s, lease: %s", + cmd.idKey, wts, + b.state.RaftClosedTimestamp.String(), cmd.raftCmd.ClosedTimestamp, + req, b.state.Lease), + "command writing below closed timestamp") + } + return nil +} + +// Assert that the closed timestamp carried by the command is not below one from +// previous commands. +func (b *replicaAppBatch) assertNoCmdClosedTimestampRegression(cmd *replicatedCmd) error { + if !raftClosedTimestampAssertionsEnabled { + return nil + } + existingClosed := b.state.RaftClosedTimestamp + newClosed := cmd.raftCmd.ClosedTimestamp + if newClosed != nil && !newClosed.IsEmpty() && newClosed.Less(existingClosed) { + var req string + if cmd.IsLocal() && cmd.proposal.Request.IsIntentWrite() { + req = cmd.proposal.Request.String() + } else { + req = "" + } + return errors.AssertionFailedf( + "raft closed timestamp regression in cmd: %x; batch state: %s, command: %s, lease: %s, req: %s", + cmd.idKey, existingClosed.String(), newClosed.String(), b.state.Lease, req) + } + return nil +} + // ephemeralReplicaAppBatch implements the apply.Batch interface. // // The batch performs the bare-minimum amount of work to be able to diff --git a/pkg/util/hlc/timestamp.go b/pkg/util/hlc/timestamp.go index adde5be0e960..96741346efec 100644 --- a/pkg/util/hlc/timestamp.go +++ b/pkg/util/hlc/timestamp.go @@ -175,7 +175,7 @@ func (t Timestamp) AsOfSystemTime() string { return fmt.Sprintf("%d.%010d%s", t.WallTime, t.Logical, syn) } -// IsEmpty retruns true if t is an empty Timestamp. +// IsEmpty returns true if t is an empty Timestamp. func (t Timestamp) IsEmpty() bool { return t == Timestamp{} } From cf7875b30c0306f13bb90c8e04f8a27455eb41eb Mon Sep 17 00:00:00 2001 From: Andrei Matei Date: Fri, 9 Apr 2021 15:56:54 -0400 Subject: [PATCH 7/8] kvserver: even more closed ts assertions This patch adds historical information to the assertion against closed timestamp regressions. We've seen that assertion fire in #61981. The replica now maintains info about what command last bumped the ClosedTimestamp. Release note: None --- pkg/kv/kvserver/replica.go | 3 + .../replica_application_state_machine.go | 70 ++++++++++++++++++- 2 files changed, 71 insertions(+), 2 deletions(-) diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index 334829e18677..ca8cfddc4e96 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -568,6 +568,9 @@ type Replica struct { failureToGossipSystemConfig bool tenantID roachpb.TenantID // Set when first initialized, not modified after + + // Historical information about the command that set the closed timestamp. 
+ closedTimestampSetter closedTimestampSetterInfo } rangefeedMu struct { diff --git a/pkg/kv/kvserver/replica_application_state_machine.go b/pkg/kv/kvserver/replica_application_state_machine.go index a7ee883983e8..333d49f425fc 100644 --- a/pkg/kv/kvserver/replica_application_state_machine.go +++ b/pkg/kv/kvserver/replica_application_state_machine.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/apply" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -25,8 +26,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/envutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" + "github.com/cockroachdb/redact" "github.com/kr/pretty" "go.etcd.io/etcd/raft/v3" "go.etcd.io/etcd/raft/v3/raftpb" @@ -357,6 +360,7 @@ func (sm *replicaStateMachine) NewBatch(ephemeral bool) apply.Batch { b.state = r.mu.state b.state.Stats = &b.stats *b.state.Stats = *r.mu.state.Stats + b.closedTimestampSetter = r.mu.closedTimestampSetter r.mu.RUnlock() b.start = timeutil.Now() return b @@ -379,6 +383,9 @@ type replicaAppBatch struct { // under the Replica.mu when the batch is initialized and is updated in // stageTrivialReplicatedEvalResult. state kvserverpb.ReplicaState + // closedTimestampSetter maintains historical information about the + // advancement of the closed timestamp. + closedTimestampSetter closedTimestampSetterInfo // stats is stored on the application batch to avoid an allocation in // tracking the batch's view of replicaState. All pointer fields in // replicaState other than Stats are overwritten completely rather than @@ -827,6 +834,7 @@ func (b *replicaAppBatch) stageTrivialReplicatedEvalResult( } if cts := cmd.raftCmd.ClosedTimestamp; cts != nil && !cts.IsEmpty() { b.state.RaftClosedTimestamp = *cts + b.closedTimestampSetter.record(cmd, b.state.Lease) if clockTS, ok := cts.TryToClockTimestamp(); ok { b.maxTS.Forward(clockTS) } @@ -898,6 +906,7 @@ func (b *replicaAppBatch) ApplyToStateMachine(ctx context.Context) error { "raft closed timestamp regression; replica has: %s, new batch has: %s.", existingClosed.String(), newClosed.String()) } + r.mu.closedTimestampSetter = b.closedTimestampSetter closedTimestampUpdated := r.mu.state.RaftClosedTimestamp.Forward(b.state.RaftClosedTimestamp) prevStats := *r.mu.state.Stats @@ -1067,9 +1076,18 @@ func (b *replicaAppBatch) assertNoCmdClosedTimestampRegression(cmd *replicatedCm } else { req = "" } + var prevReq redact.StringBuilder + if req := b.closedTimestampSetter.leaseReq; req != nil { + prevReq.Printf("lease acquisition: %s (prev: %s)", req.Lease, req.PrevLease) + } else { + prevReq.SafeString("") + } + return errors.AssertionFailedf( - "raft closed timestamp regression in cmd: %x; batch state: %s, command: %s, lease: %s, req: %s", - cmd.idKey, existingClosed.String(), newClosed.String(), b.state.Lease, req) + "raft closed timestamp regression in cmd: %x; batch state: %s, command: %s, lease: %s, req: %s, applying at LAI: %d.\n"+ + "Closed timestamp was set by req: %s under lease: %s; applied at LAI: %d. 
Batch idx: %d.",
+			cmd.idKey, existingClosed.String(), newClosed.String(), b.state.Lease, req, cmd.leaseIndex,
+			prevReq, b.closedTimestampSetter.lease, b.closedTimestampSetter.leaseIdx, b.entries)
 	}
 	return nil
 }
@@ -1322,3 +1340,51 @@ func (sm *replicaStateMachine) moveStats() applyCommittedEntriesStats {
 	sm.stats = applyCommittedEntriesStats{}
 	return stats
 }
+
+// closedTimestampSetterInfo contains information about the command that last
+// bumped the closed timestamp.
+type closedTimestampSetterInfo struct {
+	// lease represents the lease under which the command is being applied.
+	lease *roachpb.Lease
+	// leaseIdx is the LAI of the command.
+	leaseIdx ctpb.LAI
+	// leaseReq is set if the request that generated this command was a
+	// RequestLeaseRequest. This is only ever set on the leaseholder replica since
+	// only the leaseholder has information about the request corresponding to a
+	// command.
+	// NOTE: We only keep track of lease requests because keeping track of all
+	// requests would be too expensive: cloning the request is expensive and also
+	// requests can be large in memory.
+	leaseReq *roachpb.RequestLeaseRequest
+	// split and merge are set if the request was an EndTxn with the respective
+	// commit trigger set.
+	split, merge bool
+}
+
+// record saves information about the command that updates the replica's closed
+// timestamp.
+func (s *closedTimestampSetterInfo) record(cmd *replicatedCmd, lease *roachpb.Lease) {
+	*s = closedTimestampSetterInfo{}
+	s.leaseIdx = ctpb.LAI(cmd.leaseIndex)
+	s.lease = lease
+	if !cmd.IsLocal() {
+		return
+	}
+	req := cmd.proposal.Request
+	et, ok := req.GetArg(roachpb.EndTxn)
+	if ok {
+		endTxn := et.(*roachpb.EndTxnRequest)
+		if trig := endTxn.InternalCommitTrigger; trig != nil {
+			if trig.SplitTrigger != nil {
+				s.split = true
+			} else if trig.MergeTrigger != nil {
+				s.merge = true
+			}
+		}
+	} else if req.IsLeaseRequest() {
+		// Make a deep copy since we're not allowed to hold on to request
+		// memory.
+		lr, _ := req.GetArg(roachpb.RequestLease)
+		s.leaseReq = protoutil.Clone(lr).(*roachpb.RequestLeaseRequest)
+	}
+}

From 3f38eb2700017ddbb0fd8a46e186768e0c79bb7a Mon Sep 17 00:00:00 2001
From: Andrei Matei
Date: Wed, 14 Apr 2021 17:15:14 -0400
Subject: [PATCH 8/8] kvserver: add safe formatters for lease and friends

This patch makes some types survive log redaction. In particular, it
ensures that the assertions added in the previous patches don't get
overly redacted.
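
For illustration only (not part of the patch), a minimal sketch of the
pattern with toy types; the real implementations are added to
roachpb.Lease, roachpb.LeaseSequence, ctpb.LAI, and
kvserverbase.CmdIDKey:

    package main

    import (
    	"fmt"

    	"github.com/cockroachdb/redact"
    )

    // seq is a toy counterpart of roachpb.LeaseSequence: a numeric type
    // whose values are always safe to log unredacted.
    type seq int64

    // SafeValue marks seq values as redaction-safe.
    func (seq) SafeValue() {}

    // lease is a toy counterpart of roachpb.Lease, with one safe field
    // (sequence) and one potentially sensitive field (holder).
    type lease struct {
    	sequence seq
    	holder   string
    }

    // SafeFormat prints the sequence unredacted; the plain-string holder
    // stays redactable and gets wrapped in redaction markers.
    func (l lease) SafeFormat(w redact.SafePrinter, _ rune) {
    	w.Printf("seq=%d holder=%s", l.sequence, l.holder)
    }

    func (l lease) String() string { return redact.StringWithoutMarkers(l) }

    func main() {
    	l := lease{sequence: 7, holder: "n1"}
    	fmt.Println(redact.Sprint(l)) // e.g. seq=7 holder=‹n1›
    	fmt.Println(l.String())       // seq=7 holder=n1
    }
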
Release note: None --- docs/generated/redact_safe.md | 2 ++ pkg/kv/kvserver/closedts/ctpb/entry.go | 3 +++ pkg/kv/kvserver/kvserverbase/BUILD.bazel | 1 + pkg/kv/kvserver/kvserverbase/base.go | 14 +++++++++- .../replica_application_state_machine.go | 20 +++++++------- pkg/roachpb/data.go | 26 +++++++++++-------- pkg/roachpb/errors.go | 2 +- 7 files changed, 45 insertions(+), 23 deletions(-) diff --git a/docs/generated/redact_safe.md b/docs/generated/redact_safe.md index 52b50fe0f9ed..ea1d855d2641 100644 --- a/docs/generated/redact_safe.md +++ b/docs/generated/redact_safe.md @@ -4,8 +4,10 @@ File | Type --|-- pkg/cli/exit/exit.go | `Code` pkg/jobs/jobspb/wrap.go | `Type` +pkg/kv/kvserver/closedts/ctpb/entry.go | `LAI` pkg/kv/kvserver/concurrency/lock/locking.go | `WaitPolicy` pkg/kv/kvserver/raft.go | `SnapshotRequest_Type` +pkg/roachpb/data.go | `LeaseSequence` pkg/roachpb/data.go | `ReplicaChangeType` pkg/roachpb/metadata.go | `NodeID` pkg/roachpb/metadata.go | `StoreID` diff --git a/pkg/kv/kvserver/closedts/ctpb/entry.go b/pkg/kv/kvserver/closedts/ctpb/entry.go index 40e54905a66b..9b1d8a376272 100644 --- a/pkg/kv/kvserver/closedts/ctpb/entry.go +++ b/pkg/kv/kvserver/closedts/ctpb/entry.go @@ -25,6 +25,9 @@ type Epoch int64 // mix-ups in positional arguments. type LAI int64 +// SafeValue implements the redact.SafeValue interface. +func (LAI) SafeValue() {} + // String formats Entry for human consumption as well as testing (by avoiding // randomness in the output caused by map iteraton order). func (e Entry) String() string { diff --git a/pkg/kv/kvserver/kvserverbase/BUILD.bazel b/pkg/kv/kvserver/kvserverbase/BUILD.bazel index 598e59870015..df9a0d56bf82 100644 --- a/pkg/kv/kvserver/kvserverbase/BUILD.bazel +++ b/pkg/kv/kvserver/kvserverbase/BUILD.bazel @@ -16,5 +16,6 @@ go_library( "//pkg/roachpb", "//pkg/settings", "//pkg/util/hlc", + "@com_github_cockroachdb_redact//:redact", ], ) diff --git a/pkg/kv/kvserver/kvserverbase/base.go b/pkg/kv/kvserver/kvserverbase/base.go index 4967c27f8a25..6d3acd7600aa 100644 --- a/pkg/kv/kvserver/kvserverbase/base.go +++ b/pkg/kv/kvserver/kvserverbase/base.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/redact" ) // MergeQueueEnabled is a setting that controls whether the merge queue is @@ -39,9 +40,20 @@ var MergeQueueEnabled = settings.RegisterBoolSetting( // larger than the heartbeat interval used by the coordinator. const TxnCleanupThreshold = time.Hour -// CmdIDKey is a Raft command id. +// CmdIDKey is a Raft command id. This will be logged unredacted - keep it random. type CmdIDKey string +// SafeFormat implements redact.SafeFormatter. +func (s CmdIDKey) SafeFormat(sp redact.SafePrinter, verb rune) { + sp.Printf("%q", redact.SafeString(s)) +} + +func (s CmdIDKey) String() string { + return redact.StringWithoutMarkers(s) +} + +var _ redact.SafeFormatter = CmdIDKey("") + // FilterArgs groups the arguments to a ReplicaCommandFilter. 
type FilterArgs struct { Ctx context.Context diff --git a/pkg/kv/kvserver/replica_application_state_machine.go b/pkg/kv/kvserver/replica_application_state_machine.go index 333d49f425fc..7d8702e88657 100644 --- a/pkg/kv/kvserver/replica_application_state_machine.go +++ b/pkg/kv/kvserver/replica_application_state_machine.go @@ -1044,17 +1044,17 @@ func (b *replicaAppBatch) assertNoWriteBelowClosedTimestamp(cmd *replicatedCmd) wts := cmd.raftCmd.ReplicatedEvalResult.WriteTimestamp if !wts.IsEmpty() && wts.LessEq(b.state.RaftClosedTimestamp) { wts := wts // Make a shadow variable that escapes to the heap. - var req string + var req redact.StringBuilder if cmd.proposal != nil { - req = cmd.proposal.Request.String() + req.Print(cmd.proposal.Request) } else { - req = "request unknown; not leaseholder" + req.SafeString("request unknown; not leaseholder") } return wrapWithNonDeterministicFailure(errors.AssertionFailedf( "command writing below closed timestamp; cmd: %x, write ts: %s, "+ "batch state closed: %s, command closed: %s, request: %s, lease: %s", cmd.idKey, wts, - b.state.RaftClosedTimestamp.String(), cmd.raftCmd.ClosedTimestamp, + b.state.RaftClosedTimestamp, cmd.raftCmd.ClosedTimestamp, req, b.state.Lease), "command writing below closed timestamp") } @@ -1067,14 +1067,14 @@ func (b *replicaAppBatch) assertNoCmdClosedTimestampRegression(cmd *replicatedCm if !raftClosedTimestampAssertionsEnabled { return nil } - existingClosed := b.state.RaftClosedTimestamp + existingClosed := &b.state.RaftClosedTimestamp newClosed := cmd.raftCmd.ClosedTimestamp - if newClosed != nil && !newClosed.IsEmpty() && newClosed.Less(existingClosed) { - var req string + if newClosed != nil && !newClosed.IsEmpty() && newClosed.Less(*existingClosed) { + var req redact.StringBuilder if cmd.IsLocal() && cmd.proposal.Request.IsIntentWrite() { - req = cmd.proposal.Request.String() + req.Print(cmd.proposal.Request) } else { - req = "" + req.SafeString("") } var prevReq redact.StringBuilder if req := b.closedTimestampSetter.leaseReq; req != nil { @@ -1086,7 +1086,7 @@ func (b *replicaAppBatch) assertNoCmdClosedTimestampRegression(cmd *replicatedCm return errors.AssertionFailedf( "raft closed timestamp regression in cmd: %x; batch state: %s, command: %s, lease: %s, req: %s, applying at LAI: %d.\n"+ "Closed timestamp was set by req: %s under lease: %s; applied at LAI: %d. Batch idx: %d.", - cmd.idKey, existingClosed.String(), newClosed.String(), b.state.Lease, req, cmd.leaseIndex, + cmd.idKey, existingClosed, newClosed, b.state.Lease, req, cmd.leaseIndex, prevReq, b.closedTimestampSetter.lease, b.closedTimestampSetter.leaseIdx, b.entries) } return nil diff --git a/pkg/roachpb/data.go b/pkg/roachpb/data.go index 20f67eb3f160..d770ef25849a 100644 --- a/pkg/roachpb/data.go +++ b/pkg/roachpb/data.go @@ -1852,25 +1852,29 @@ func (crt ChangeReplicasTrigger) Removed() []ReplicaDescriptor { // LeaseSequence is a custom type for a lease sequence number. type LeaseSequence int64 -// String implements the fmt.Stringer interface. -func (s LeaseSequence) String() string { - return strconv.FormatInt(int64(s), 10) -} +// SafeValue implements the redact.SafeValue interface. +func (s LeaseSequence) SafeValue() {} var _ fmt.Stringer = &Lease{} func (l Lease) String() string { + return redact.StringWithoutMarkers(l) +} + +// SafeFormat implements the redact.SafeFormatter interface. 
+func (l Lease) SafeFormat(w redact.SafePrinter, _ rune) {
 	if l.Empty() {
-		return "<empty>"
-	}
-	var proposedSuffix string
-	if l.ProposedTS != nil {
-		proposedSuffix = fmt.Sprintf(" pro=%s", l.ProposedTS)
+		w.SafeString("<empty>")
+		return
 	}
 	if l.Type() == LeaseExpiration {
-		return fmt.Sprintf("repl=%s seq=%s start=%s exp=%s%s", l.Replica, l.Sequence, l.Start, l.Expiration, proposedSuffix)
+		w.Printf("repl=%s seq=%d start=%s exp=%s", l.Replica, l.Sequence, l.Start, l.Expiration)
+	} else {
+		w.Printf("repl=%s seq=%d start=%s epo=%d", l.Replica, l.Sequence, l.Start, l.Epoch)
+	}
+	if l.ProposedTS != nil {
+		w.Printf(" pro=%s", l.ProposedTS)
 	}
-	return fmt.Sprintf("repl=%s seq=%s start=%s epo=%d%s", l.Replica, l.Sequence, l.Start, l.Epoch, proposedSuffix)
 }

 // Empty returns true for the Lease zero-value.
diff --git a/pkg/roachpb/errors.go b/pkg/roachpb/errors.go
index 450ab229462c..2109aa00f789 100644
--- a/pkg/roachpb/errors.go
+++ b/pkg/roachpb/errors.go
@@ -503,7 +503,7 @@ func (e *LeaseRejectedError) Error() string {
 }

 func (e *LeaseRejectedError) message(_ *Error) string {
-	return fmt.Sprintf("cannot replace lease %s with %s: %s", e.Existing, e.Requested, e.Message)
+	return fmt.Sprintf("cannot replace lease %s with %s: %s", e.Existing, e.Requested.String(), e.Message)
 }

 var _ ErrorDetailInterface = &LeaseRejectedError{}
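
For illustration only (not part of the patches above), a minimal sketch
of the closed timestamp regression assertion from PATCH 6/8, using a
simplified stand-in for util/hlc.Timestamp:

    package main

    import "fmt"

    // ts is a simplified stand-in for util/hlc.Timestamp.
    type ts struct{ wall int64 }

    func (t ts) IsEmpty() bool  { return t == ts{} }
    func (t ts) Less(o ts) bool { return t.wall < o.wall }

    // assertNoRegression mirrors the assertion's core logic: a command may
    // carry no closed timestamp at all, but a non-empty one must never go
    // backwards relative to the closed timestamp in the batch state.
    func assertNoRegression(existing ts, newClosed *ts) error {
    	if newClosed != nil && !newClosed.IsEmpty() && newClosed.Less(existing) {
    		return fmt.Errorf("raft closed timestamp regression: %d -> %d",
    			existing.wall, newClosed.wall)
    	}
    	return nil
    }

    func main() {
    	cur := ts{wall: 100}
    	bad := ts{wall: 90}
    	ok := ts{wall: 110}
    	fmt.Println(assertNoRegression(cur, &bad)) // regression error
    	fmt.Println(assertNoRegression(cur, &ok))  // <nil>
    	fmt.Println(assertNoRegression(cur, nil))  // <nil>: no closed ts carried
    }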