diff --git a/pkg/ccl/changefeedccl/rowfetcher_cache.go b/pkg/ccl/changefeedccl/rowfetcher_cache.go index ca1f484d09b8..744b2cd00366 100644 --- a/pkg/ccl/changefeedccl/rowfetcher_cache.go +++ b/pkg/ccl/changefeedccl/rowfetcher_cache.go @@ -177,8 +177,7 @@ func (c *rowFetcherCache) RowFetcherForTableDesc( false, /* reverse */ descpb.ScanLockingStrength_FOR_NONE, descpb.ScanLockingWaitPolicy_BLOCK, - 0, /* lockTimeout */ - false, /* isCheck */ + 0, /* lockTimeout */ &c.a, nil, /* memMonitor */ rfArgs, diff --git a/pkg/ccl/cliccl/debug_backup.go b/pkg/ccl/cliccl/debug_backup.go index 118289c8d1f4..ceb2e101c7c9 100644 --- a/pkg/ccl/cliccl/debug_backup.go +++ b/pkg/ccl/cliccl/debug_backup.go @@ -600,8 +600,7 @@ func makeRowFetcher( false, /*reverse*/ descpb.ScanLockingStrength_FOR_NONE, descpb.ScanLockingWaitPolicy_BLOCK, - 0, /* lockTimeout */ - false, /*isCheck*/ + 0, /* lockTimeout */ &tree.DatumAlloc{}, nil, /*mon.BytesMonitor*/ table, diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index 0a55dd365714..b0b1a30722ac 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -72,7 +72,6 @@ go_library( "distsql_plan_bulk.go", "distsql_plan_ctas.go", "distsql_plan_join.go", - "distsql_plan_scrub_physical.go", "distsql_plan_set_op.go", "distsql_plan_stats.go", "distsql_plan_window.go", @@ -177,7 +176,6 @@ go_library( "scrub_constraint.go", "scrub_fk.go", "scrub_index.go", - "scrub_physical.go", "select_name_resolution.go", "sequence.go", "sequence_select.go", diff --git a/pkg/sql/backfill/backfill.go b/pkg/sql/backfill/backfill.go index 0dcb4f8d842c..ff5c9c699c8e 100644 --- a/pkg/sql/backfill/backfill.go +++ b/pkg/sql/backfill/backfill.go @@ -156,8 +156,7 @@ func (cb *ColumnBackfiller) init( false, /* reverse */ descpb.ScanLockingStrength_FOR_NONE, descpb.ScanLockingWaitPolicy_BLOCK, - 0, /* lockTimeout */ - false, /* isCheck */ + 0, /* lockTimeout */ &cb.alloc, cb.mon, tableArgs, @@ -839,8 +838,7 @@ func (ib *IndexBackfiller) BuildIndexEntriesChunk( false, /* reverse */ descpb.ScanLockingStrength_FOR_NONE, descpb.ScanLockingWaitPolicy_BLOCK, - 0, /* lockTimeout */ - false, /* isCheck */ + 0, /* lockTimeout */ &ib.alloc, ib.mon, tableArgs, diff --git a/pkg/sql/colexec/colbuilder/execplan.go b/pkg/sql/colexec/colbuilder/execplan.go index ae37e3eb09ff..56bd1413b173 100644 --- a/pkg/sql/colexec/colbuilder/execplan.go +++ b/pkg/sql/colexec/colbuilder/execplan.go @@ -170,7 +170,7 @@ func supportedNatively(spec *execinfrapb.ProcessorSpec) error { return nil case spec.Core.TableReader != nil: - if spec.Core.TableReader.IsCheck { + if spec.Core.TableReader.DeprecatedIsCheck { return errors.Newf("scrub table reader is unsupported in vectorized") } return nil diff --git a/pkg/sql/colfetcher/colbatch_scan.go b/pkg/sql/colfetcher/colbatch_scan.go index a0ec7f42128c..649ba57c7c1d 100644 --- a/pkg/sql/colfetcher/colbatch_scan.go +++ b/pkg/sql/colfetcher/colbatch_scan.go @@ -194,7 +194,7 @@ func NewColBatchScan( if nodeID, ok := flowCtx.NodeID.OptionalNodeID(); nodeID == 0 && ok { return nil, errors.Errorf("attempting to create a ColBatchScan with uninitialized NodeID") } - if spec.IsCheck { + if spec.DeprecatedIsCheck { // cFetchers don't support these checks. 
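		// NB: the spec field itself is kept (renamed to DeprecatedIsCheck /
		// deprecated_is_check), presumably so that specs serialized by older
		// nodes still decode; a plan that sets it is rejected loudly here
		// rather than silently ignored.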
return nil, errors.AssertionFailedf("attempting to create a cFetcher with the IsCheck flag set") } diff --git a/pkg/sql/delete_range.go b/pkg/sql/delete_range.go index 82a3b0945a2e..cdfef904de9f 100644 --- a/pkg/sql/delete_range.go +++ b/pkg/sql/delete_range.go @@ -99,8 +99,7 @@ func (d *deleteRangeNode) startExec(params runParams) error { false, /* reverse */ descpb.ScanLockingStrength_FOR_NONE, descpb.ScanLockingWaitPolicy_BLOCK, - 0, /* lockTimeout */ - false, /* isCheck */ + 0, /* lockTimeout */ params.p.alloc, nil, /* memMonitor */ table, diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index 7c107b3feddf..242ee0703e45 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -1102,11 +1102,13 @@ func getIndexIdx(index catalog.Index, desc catalog.TableDescriptor) (uint32, err func initTableReaderSpec( n *scanNode, ) (*execinfrapb.TableReaderSpec, execinfrapb.PostProcessSpec, error) { + if n.isCheck { + return nil, execinfrapb.PostProcessSpec{}, errors.AssertionFailedf("isCheck no longer supported") + } s := physicalplan.NewTableReaderSpec() *s = execinfrapb.TableReaderSpec{ Table: *n.desc.TableDesc(), Reverse: n.reverse, - IsCheck: n.isCheck, Visibility: n.colCfg.visibility, LockingStrength: n.lockingStrength, LockingWaitPolicy: n.lockingWaitPolicy, @@ -1122,13 +1124,6 @@ func initTableReaderSpec( } s.IndexIdx = indexIdx - // When a TableReader is running scrub checks, do not allow a - // post-processor. This is because the outgoing stream is a fixed - // format (rowexec.ScrubTypes). - if n.isCheck { - return s, execinfrapb.PostProcessSpec{}, nil - } - var post execinfrapb.PostProcessSpec if n.hardLimit != 0 { post.Limit = uint64(n.hardLimit) @@ -4044,7 +4039,6 @@ func checkScanParallelizationIfLocal( } return true, nil case *scanNode: - prohibitParallelization = n.isCheck if len(n.reqOrdering) == 0 && n.parallelize { hasScanNodeToParallelize = true } diff --git a/pkg/sql/distsql_plan_scrub_physical.go b/pkg/sql/distsql_plan_scrub_physical.go deleted file mode 100644 index 0fc2ef81d61f..000000000000 --- a/pkg/sql/distsql_plan_scrub_physical.go +++ /dev/null @@ -1,53 +0,0 @@ -// Copyright 2017 The Cockroach Authors. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -package sql - -import ( - "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" - "github.com/cockroachdb/cockroach/pkg/sql/physicalplan" - "github.com/cockroachdb/cockroach/pkg/sql/rowexec" -) - -// createScrubPhysicalCheck generates a plan for running a physical -// check for an index. The plan consists of TableReaders, with IsCheck -// enabled, that scan an index span. By having IsCheck enabled, the -// TableReaders will only emit errors encountered during scanning -// instead of row data. The plan is finalized. 
-func (dsp *DistSQLPlanner) createScrubPhysicalCheck( - planCtx *PlanningCtx, n *scanNode, -) (*PhysicalPlan, error) { - spec, _, err := initTableReaderSpec(n) - if err != nil { - return nil, err - } - - spanPartitions, err := dsp.PartitionSpans(planCtx, n.spans) - if err != nil { - return nil, err - } - - corePlacement := make([]physicalplan.ProcessorCorePlacement, len(spanPartitions)) - for i, sp := range spanPartitions { - tr := &execinfrapb.TableReaderSpec{} - *tr = *spec - tr.Spans = sp.Spans - - corePlacement[i].NodeID = sp.Node - corePlacement[i].Core.TableReader = tr - } - - p := planCtx.NewPhysicalPlan() - p.AddNoInputStage(corePlacement, execinfrapb.PostProcessSpec{}, rowexec.ScrubTypes, execinfrapb.Ordering{}) - p.PlanToStreamColMap = identityMapInPlace(make([]int, len(rowexec.ScrubTypes))) - - dsp.FinalizePlan(planCtx, p) - return p, nil -} diff --git a/pkg/sql/distsql_spec_exec_factory.go b/pkg/sql/distsql_spec_exec_factory.go index 4b46842d7e91..5ca2ca449536 100644 --- a/pkg/sql/distsql_spec_exec_factory.go +++ b/pkg/sql/distsql_spec_exec_factory.go @@ -235,7 +235,6 @@ func (e *distSQLSpecExecFactory) ConstructScan( *trSpec = execinfrapb.TableReaderSpec{ Table: *tabDesc.TableDesc(), Reverse: params.Reverse, - IsCheck: false, Visibility: colCfg.visibility, HasSystemColumns: scanContainsSystemColumns(&colCfg), } diff --git a/pkg/sql/execinfrapb/processors_sql.proto b/pkg/sql/execinfrapb/processors_sql.proto index 5c3042cfac6a..031b30a3b0e7 100644 --- a/pkg/sql/execinfrapb/processors_sql.proto +++ b/pkg/sql/execinfrapb/processors_sql.proto @@ -57,11 +57,8 @@ enum ScanVisibility { // columns of the rows that pass a filter expression. // // The "internal columns" of a TableReader (see ProcessorSpec) are all the -// columns of the table. If is_check is set, the TableReader will run additional -// data checking procedures and the "internal columns" are: -// - Error type (string). -// - Primary key as a string, if it was obtainable. -// - JSON of all decoded column values. +// columns of the table. Internally, only the values for the columns specified +// by needed_columns are to be populated. message TableReaderSpec { optional sqlbase.TableDescriptor table = 1 [(gogoproto.nullable) = false]; // If 0, we use the primary index. If non-zero, we use the index_idx-th index, @@ -80,8 +77,8 @@ message TableReaderSpec { optional int64 limit_hint = 5 [(gogoproto.nullable) = false]; // Indicates whether the TableReader is being run as an exhaustive - // check. This is only true during SCRUB commands. - optional bool is_check = 6 [(gogoproto.nullable) = false]; + // check. This is only true during SCRUB commands. No longer supported. + optional bool deprecated_is_check = 6 [(gogoproto.nullable) = false]; // Indicates the visibility level of the columns that should be returned. // Normally, will be set to PUBLIC. 
Will be set to PUBLIC_AND_NOT_PUBLIC if diff --git a/pkg/sql/indexbackfiller_test.go b/pkg/sql/indexbackfiller_test.go index dafc8e90c115..e5347fb15785 100644 --- a/pkg/sql/indexbackfiller_test.go +++ b/pkg/sql/indexbackfiller_test.go @@ -385,7 +385,6 @@ INSERT INTO foo VALUES (1), (10), (100); descpb.ScanLockingStrength_FOR_NONE, descpb.ScanLockingWaitPolicy_BLOCK, 0, - false, &alloc, mm.Monitor(), row.FetcherTableArgs{ diff --git a/pkg/sql/logictest/testdata/logic_test/scrub b/pkg/sql/logictest/testdata/logic_test/scrub index a100e6da1008..addac1956a08 100644 --- a/pkg/sql/logictest/testdata/logic_test/scrub +++ b/pkg/sql/logictest/testdata/logic_test/scrub @@ -22,15 +22,14 @@ query TTTTTTTT EXPERIMENTAL SCRUB TABLE t ----- -query TTTTTTTT +statement error not implemented EXPERIMENTAL SCRUB TABLE t WITH OPTIONS PHYSICAL ------ query TTTTTTTT EXPERIMENTAL SCRUB TABLE t WITH OPTIONS INDEX ALL ------ -query TTTTTTTT +statement error not implemented EXPERIMENTAL SCRUB TABLE t WITH OPTIONS PHYSICAL, INDEX (name_idx) ----- @@ -145,7 +144,7 @@ CREATE TABLE test.order (a INT, b INT, c INT, CONSTRAINT "primary" PRIMARY KEY ( statement ok INSERT INTO test.order VALUES (0, 0, 0), (0, 0, 1), (0, 1, 0), (0, 1, 1), (1, 0, 0); -query TTTTTTTT +statement error not implemented EXPERIMENTAL SCRUB TABLE test.order WITH OPTIONS PHYSICAL # Test that scrubbing timestamp works as expected. diff --git a/pkg/sql/row/errors.go b/pkg/sql/row/errors.go index 9b5b7ee3fe2b..0047d139a905 100644 --- a/pkg/sql/row/errors.go +++ b/pkg/sql/row/errors.go @@ -242,8 +242,7 @@ func DecodeRowInfo( false, /* reverse */ descpb.ScanLockingStrength_FOR_NONE, descpb.ScanLockingWaitPolicy_BLOCK, - 0, /* lockTimeout */ - false, /* isCheck */ + 0, /* lockTimeout */ &tree.DatumAlloc{}, nil, /* memMonitor */ tableArgs, diff --git a/pkg/sql/row/fetcher.go b/pkg/sql/row/fetcher.go index 1d4011718ae8..298490f05f88 100644 --- a/pkg/sql/row/fetcher.go +++ b/pkg/sql/row/fetcher.go @@ -126,18 +126,6 @@ type tableInfo struct { // changefeeds use this by providing raw kvs with tombstones unfiltered via // `StartScanFrom`. rowIsDeleted bool - - // hasLast indicates whether there was a previously scanned k/v. - hasLast bool - // lastDatums is a buffer for the current key. It is only present when - // doing a physical check in order to verify round-trip encoding. - // It is required because Fetcher.kv is overwritten before NextRow - // returns. - lastKV roachpb.KeyValue - // lastDatums is a buffer for the previously scanned k/v datums. It is - // only present when doing a physical check in order to verify - // ordering. - lastDatums tree.Datums } // FetcherTableArgs are the arguments passed to Fetcher.Init @@ -250,10 +238,6 @@ type Fetcher struct { keyRemainingBytes []byte kvEnd bool - // isCheck indicates whether or not we are running checks for k/v - // correctness. It is set only during SCRUB commands. - isCheck bool - // IgnoreUnexpectedNulls allows Fetcher to return null values for non-nullable // columns and is only used for decoding for error messages or debugging. 
IgnoreUnexpectedNulls bool @@ -297,7 +281,6 @@ func (rf *Fetcher) Init( lockStrength descpb.ScanLockingStrength, lockWaitPolicy descpb.ScanLockingWaitPolicy, lockTimeout time.Duration, - isCheck bool, alloc *tree.DatumAlloc, memMonitor *mon.BytesMonitor, tableArgs FetcherTableArgs, @@ -308,7 +291,6 @@ func (rf *Fetcher) Init( rf.lockWaitPolicy = lockWaitPolicy rf.lockTimeout = lockTimeout rf.alloc = alloc - rf.isCheck = isCheck if memMonitor != nil { rf.mon = mon.NewMonitorInheritWithLimit("fetcher-mem", 0 /* limit */, memMonitor) @@ -1169,9 +1151,6 @@ func (rf *Fetcher) NextRow( log.VEventf(ctx, 2, "fetched: %s -> %s", prettyKey, prettyVal) } - if rf.isCheck { - rf.table.lastKV = rf.kv - } rowDone, err := rf.NextKey(ctx) if err != nil { return nil, nil, nil, err @@ -1228,194 +1207,6 @@ func (rf *Fetcher) RowIsDeleted() bool { return rf.table.rowIsDeleted } -// NextRowWithErrors calls NextRow to fetch the next row and also run -// additional additional logic for physical checks. The Datums should -// not be modified and are only valid until the next call. When there -// are no more rows, the Datums is nil. The checks executed include: -// - k/v data round-trips, i.e. it decodes and re-encodes to the same -// value. -// - There is no extra unexpected or incorrect data encoded in the k/v -// pair. -// - Decoded keys follow the same ordering as their encoding. -func (rf *Fetcher) NextRowWithErrors(ctx context.Context) (rowenc.EncDatumRow, error) { - row, table, index, err := rf.NextRow(ctx) - if row == nil { - return nil, nil - } else if err != nil { - // If this is not already a wrapped error, we will consider it to be - // a generic physical error. - // FIXME(joey): This may not be needed if we capture all the errors - // encountered. This is a TBD when this change is polished. - if !scrub.IsScrubError(err) { - err = scrub.WrapError(scrub.PhysicalError, err) - } - return row, err - } - - // Decode the row in-place. The following check datum encoding - // functions require that the table.row datums are decoded. - for i := range row { - if row[i].IsUnset() { - rf.table.decodedRow[i] = tree.DNull - continue - } - if err := row[i].EnsureDecoded(rf.table.cols[i].GetType(), rf.alloc); err != nil { - return nil, err - } - rf.table.decodedRow[i] = row[i].Datum - } - - if index.GetID() == table.GetPrimaryIndexID() { - err = rf.checkPrimaryIndexDatumEncodings(ctx) - } else { - err = rf.checkSecondaryIndexDatumEncodings(ctx) - } - if err != nil { - return row, err - } - - err = rf.checkKeyOrdering(ctx) - - return row, err -} - -// checkPrimaryIndexDatumEncodings will run a round-trip encoding check -// on all values in the buffered row. This check is specific to primary -// index datums. 
-func (rf *Fetcher) checkPrimaryIndexDatumEncodings(ctx context.Context) error { - table := &rf.table - scratch := make([]byte, 1024) - colIDToColumn := make(map[descpb.ColumnID]catalog.Column) - for _, col := range table.desc.PublicColumns() { - colIDToColumn[col.GetID()] = col - } - - rh := rowHelper{TableDesc: table.desc, Indexes: table.desc.PublicNonPrimaryIndexes()} - - return table.desc.ForeachFamily(func(family *descpb.ColumnFamilyDescriptor) error { - var lastColID descpb.ColumnID - familyID := family.ID - familySortedColumnIDs, ok := rh.sortedColumnFamily(familyID) - if !ok { - return errors.AssertionFailedf("invalid family sorted column id map for family %d", familyID) - } - - for _, colID := range familySortedColumnIDs { - rowVal := table.row[table.colIdxMap.GetDefault(colID)] - if rowVal.IsNull() { - // Column is not present. - continue - } - - if skip, err := rh.skipColumnNotInPrimaryIndexValue(colID, rowVal.Datum); err != nil { - return errors.NewAssertionErrorWithWrappedErrf(err, "unable to determine skip") - } else if skip { - continue - } - - col := colIDToColumn[colID] - if col == nil { - return errors.AssertionFailedf("column mapping not found for column %d", colID) - } - - if lastColID > col.GetID() { - return errors.AssertionFailedf("cannot write column id %d after %d", col.GetID(), lastColID) - } - colIDDelta := valueside.MakeColumnIDDelta(lastColID, col.GetID()) - lastColID = col.GetID() - - if result, err := valueside.Encode([]byte(nil), colIDDelta, rowVal.Datum, - scratch); err != nil { - return errors.NewAssertionErrorWithWrappedErrf(err, "could not re-encode column %s, value was %#v", - col.GetName(), rowVal.Datum) - } else if !rowVal.BytesEqual(result) { - return scrub.WrapError(scrub.IndexValueDecodingError, errors.Errorf( - "value failed to round-trip encode. Column=%s colIDDelta=%d Key=%s expected %#v, got: %#v", - col.GetName(), colIDDelta, rf.kv.Key, rowVal.EncodedString(), result)) - } - } - return nil - }) -} - -// checkSecondaryIndexDatumEncodings will run a round-trip encoding -// check on all values in the buffered row. This check is specific to -// secondary index datums. -func (rf *Fetcher) checkSecondaryIndexDatumEncodings(ctx context.Context) error { - table := &rf.table - colToEncDatum := make(map[descpb.ColumnID]rowenc.EncDatum, len(table.row)) - values := make(tree.Datums, len(table.row)) - for i, col := range table.cols { - colToEncDatum[col.GetID()] = table.row[i] - values[i] = table.row[i].Datum - } - - // The below code makes incorrect checks (#45256). - indexEntries, err := rowenc.EncodeSecondaryIndex( - rf.codec, table.desc, table.index, table.colIdxMap, values, false /* includeEmpty */) - if err != nil { - return err - } - - for _, indexEntry := range indexEntries { - // We ignore the first 4 bytes of the values. These bytes are a - // checksum which are not set by EncodeSecondaryIndex. - if !indexEntry.Key.Equal(rf.table.lastKV.Key) { - return scrub.WrapError(scrub.IndexKeyDecodingError, errors.Errorf( - "secondary index key failed to round-trip encode. expected %#v, got: %#v", - rf.table.lastKV.Key, indexEntry.Key)) - } else if !indexEntry.Value.EqualTagAndData(table.lastKV.Value) { - return scrub.WrapError(scrub.IndexValueDecodingError, errors.Errorf( - "secondary index value failed to round-trip encode. expected %#v, got: %#v", - rf.table.lastKV.Value, indexEntry.Value)) - } - } - return nil -} - -// checkKeyOrdering verifies that the datums decoded for the current key -// have the same ordering as the encoded key. 
-func (rf *Fetcher) checkKeyOrdering(ctx context.Context) error { - defer func() { - rf.table.lastDatums = append(tree.Datums(nil), rf.table.decodedRow...) - }() - - if !rf.table.hasLast { - rf.table.hasLast = true - return nil - } - - evalCtx := tree.EvalContext{} - // Iterate through columns in order, comparing each value to the value in the - // previous row in that column. When the first column with a differing value - // is found, compare the values to ensure the ordering matches the column - // ordering. - for i := 0; i < rf.table.index.NumKeyColumns(); i++ { - id := rf.table.index.GetKeyColumnID(i) - idx := rf.table.colIdxMap.GetDefault(id) - result := rf.table.decodedRow[idx].Compare(&evalCtx, rf.table.lastDatums[idx]) - expectedDirection := rf.table.index.GetKeyColumnDirection(i) - if rf.reverse && expectedDirection == descpb.IndexDescriptor_ASC { - expectedDirection = descpb.IndexDescriptor_DESC - } else if rf.reverse && expectedDirection == descpb.IndexDescriptor_DESC { - expectedDirection = descpb.IndexDescriptor_ASC - } - - if result != 0 { - if expectedDirection == descpb.IndexDescriptor_ASC && result < 0 || - expectedDirection == descpb.IndexDescriptor_DESC && result > 0 { - return scrub.WrapError(scrub.IndexKeyDecodingError, - errors.Errorf("key ordering did not match datum ordering. IndexDescriptor=%s", - expectedDirection)) - } - // After the first column with a differing value is found, the remaining - // columns are skipped (see #32874). - break - } - } - return nil -} - func (rf *Fetcher) finalizeRow() error { table := &rf.table @@ -1449,15 +1240,10 @@ func (rf *Fetcher) finalizeRow() error { indexColValues = append(indexColValues, "?") } } - err := errors.AssertionFailedf( + return errors.AssertionFailedf( "Non-nullable column \"%s:%s\" with no value! 
Index scanned was %q with the index key columns (%s) and the values (%s)", table.desc.GetName(), table.cols[i].GetName(), table.index.GetName(), strings.Join(table.index.IndexDesc().KeyColumnNames, ","), strings.Join(indexColValues, ",")) - - if rf.isCheck { - return scrub.WrapError(scrub.UnexpectedNullValueError, err) - } - return err } table.row[i] = rowenc.EncDatum{ Datum: tree.DNull, diff --git a/pkg/sql/row/fetcher_mvcc_test.go b/pkg/sql/row/fetcher_mvcc_test.go index fce1ce4b04c5..8eb0c32d1959 100644 --- a/pkg/sql/row/fetcher_mvcc_test.go +++ b/pkg/sql/row/fetcher_mvcc_test.go @@ -103,8 +103,7 @@ func TestRowFetcherMVCCMetadata(t *testing.T) { false, /* reverse */ descpb.ScanLockingStrength_FOR_NONE, descpb.ScanLockingWaitPolicy_BLOCK, - 0, /* lockTimeout */ - true, /* isCheck */ + 0, /* lockTimeout */ &tree.DatumAlloc{}, nil, /* memMonitor */ table, diff --git a/pkg/sql/row/fetcher_test.go b/pkg/sql/row/fetcher_test.go index fba816bc4517..5b51e0acf09e 100644 --- a/pkg/sql/row/fetcher_test.go +++ b/pkg/sql/row/fetcher_test.go @@ -69,8 +69,7 @@ func initFetcher( reverseScan, descpb.ScanLockingStrength_FOR_NONE, descpb.ScanLockingWaitPolicy_BLOCK, - 0, /* lockTimeout */ - false, /* isCheck */ + 0, /* lockTimeout */ alloc, memMon, fetcherArgs, @@ -766,8 +765,7 @@ func TestRowFetcherReset(t *testing.T) { false, /*reverse*/ descpb.ScanLockingStrength_FOR_NONE, descpb.ScanLockingWaitPolicy_BLOCK, - 0, /* lockTimeout */ - false, /* isCheck */ + 0, /* lockTimeout */ &da, nil, /* memMonitor */ fetcherArgs, diff --git a/pkg/sql/rowexec/BUILD.bazel b/pkg/sql/rowexec/BUILD.bazel index abd6579165cd..d5fcb11016f1 100644 --- a/pkg/sql/rowexec/BUILD.bazel +++ b/pkg/sql/rowexec/BUILD.bazel @@ -27,7 +27,6 @@ go_library( "rowfetcher.go", "sample_aggregator.go", "sampler.go", - "scrub_tablereader.go", "sorter.go", "stats.go", "stream_group_accumulator.go", diff --git a/pkg/sql/rowexec/inverted_joiner.go b/pkg/sql/rowexec/inverted_joiner.go index 0cbc67986201..6b9864b9230d 100644 --- a/pkg/sql/rowexec/inverted_joiner.go +++ b/pkg/sql/rowexec/inverted_joiner.go @@ -322,7 +322,7 @@ func newInvertedJoiner( // and so do not need to see in-progress schema changes. 
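	// Note that initRowFetcher no longer takes an isCheck argument: the
	// scrub-specific fetcher path was removed, so callers simply drop that
	// parameter, as in the call below.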
_, _, err = initRowFetcher( flowCtx, &fetcher, ij.desc, int(spec.IndexIdx), ij.colIdxMap, false, /* reverse */ - allIndexCols, false /* isCheck */, flowCtx.EvalCtx.Mon, &ij.alloc, execinfra.ScanVisibilityPublic, + allIndexCols, flowCtx.EvalCtx.Mon, &ij.alloc, execinfra.ScanVisibilityPublic, descpb.ScanLockingStrength_FOR_NONE, descpb.ScanLockingWaitPolicy_BLOCK, false /* withSystemColumns */, nil, /* virtualColumn */ ) diff --git a/pkg/sql/rowexec/joinreader.go b/pkg/sql/rowexec/joinreader.go index 9400612d81a8..c98489208837 100644 --- a/pkg/sql/rowexec/joinreader.go +++ b/pkg/sql/rowexec/joinreader.go @@ -409,7 +409,7 @@ func newJoinReader( var fetcher row.Fetcher _, _, err = initRowFetcher( flowCtx, &fetcher, jr.desc, int(spec.IndexIdx), jr.colIdxMap, false, /* reverse */ - rightCols, false /* isCheck */, jr.EvalCtx.Mon, &jr.alloc, spec.Visibility, spec.LockingStrength, + rightCols, jr.EvalCtx.Mon, &jr.alloc, spec.Visibility, spec.LockingStrength, spec.LockingWaitPolicy, spec.HasSystemColumns, nil, /* virtualColumn */ ) diff --git a/pkg/sql/rowexec/processors.go b/pkg/sql/rowexec/processors.go index 16e7c83afacc..b649964e85e8 100644 --- a/pkg/sql/rowexec/processors.go +++ b/pkg/sql/rowexec/processors.go @@ -133,8 +133,8 @@ func NewProcessor( if err := checkNumInOut(inputs, outputs, 0, 1); err != nil { return nil, err } - if core.TableReader.IsCheck { - return newScrubTableReader(flowCtx, processorID, core.TableReader, post, outputs[0]) + if core.TableReader.DeprecatedIsCheck { + return nil, errors.New("scrubbing TableReader no longer implemented") } return newTableReader(flowCtx, processorID, core.TableReader, post, outputs[0]) } diff --git a/pkg/sql/rowexec/rowfetcher.go b/pkg/sql/rowexec/rowfetcher.go index 42de1c8bfdf2..d2a2d5648f8f 100644 --- a/pkg/sql/rowexec/rowfetcher.go +++ b/pkg/sql/rowexec/rowfetcher.go @@ -57,7 +57,6 @@ type rowFetcher interface { PartialKey(int) (roachpb.Key, error) Reset() GetBytesRead() int64 - NextRowWithErrors(context.Context) (rowenc.EncDatumRow, error) // Close releases any resources held by this fetcher. Close(ctx context.Context) } @@ -71,7 +70,6 @@ func initRowFetcher( colIdxMap catalog.TableColMap, reverseScan bool, valNeededForCol util.FastIntSet, - isCheck bool, mon *mon.BytesMonitor, alloc *tree.DatumAlloc, scanVisibility execinfrapb.ScanVisibility, @@ -102,7 +100,6 @@ func initRowFetcher( lockStrength, lockWaitPolicy, flowCtx.EvalCtx.SessionData().LockTimeout, - isCheck, alloc, mon, tableArgs, diff --git a/pkg/sql/rowexec/scrub_tablereader.go b/pkg/sql/rowexec/scrub_tablereader.go deleted file mode 100644 index 1f2e86f4ee89..000000000000 --- a/pkg/sql/rowexec/scrub_tablereader.go +++ /dev/null @@ -1,265 +0,0 @@ -// Copyright 2018 The Cockroach Authors. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. 
- -package rowexec - -import ( - "bytes" - "context" - - "github.com/cockroachdb/cockroach/pkg/sql/catalog" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" - "github.com/cockroachdb/cockroach/pkg/sql/execinfra" - "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" - "github.com/cockroachdb/cockroach/pkg/sql/row" - "github.com/cockroachdb/cockroach/pkg/sql/rowenc" - "github.com/cockroachdb/cockroach/pkg/sql/rowinfra" - "github.com/cockroachdb/cockroach/pkg/sql/scrub" - "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" - "github.com/cockroachdb/cockroach/pkg/sql/types" - "github.com/cockroachdb/cockroach/pkg/util" - "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/errors" -) - -// ScrubTypes is the schema for TableReaders that are doing a SCRUB -// check. This schema is what TableReader output streams are overrided -// to for check. The column types correspond to: -// - Error type. -// - Primary key as a string, if it was obtainable. -// - JSON of all decoded column values. -// -// TODO(joey): If we want a way find the key for the error, we will need -// additional data such as the key bytes and the table descriptor ID. -// Repair won't be possible without this. -var ScrubTypes = []*types.T{ - types.String, - types.String, - types.Jsonb, -} - -type scrubTableReader struct { - tableReader - tableDesc catalog.TableDescriptor - // fetcherResultToColIdx maps Fetcher results to the column index in - // the TableDescriptor. This is only initialized and used during scrub - // physical checks. - fetcherResultToColIdx []int - // indexIdx refers to the index being scanned. This is only used - // during scrub physical checks. - indexIdx int -} - -var _ execinfra.Processor = &scrubTableReader{} -var _ execinfra.RowSource = &scrubTableReader{} - -var scrubTableReaderProcName = "scrub" - -// newScrubTableReader creates a scrubTableReader. -func newScrubTableReader( - flowCtx *execinfra.FlowCtx, - processorID int32, - spec *execinfrapb.TableReaderSpec, - post *execinfrapb.PostProcessSpec, - output execinfra.RowReceiver, -) (*scrubTableReader, error) { - // NB: we hit this with a zero NodeID (but !ok) with multi-tenancy. - if nodeID, ok := flowCtx.NodeID.OptionalNodeID(); nodeID == 0 && ok { - return nil, errors.Errorf("attempting to create a tableReader with uninitialized NodeID") - } - tr := &scrubTableReader{ - indexIdx: int(spec.IndexIdx), - } - - tr.tableDesc = flowCtx.TableDescriptor(&spec.Table) - tr.limitHint = rowinfra.RowLimit(execinfra.LimitHint(spec.LimitHint, post)) - - if err := tr.Init( - tr, - post, - ScrubTypes, - flowCtx, - processorID, - output, - nil, /* memMonitor */ - execinfra.ProcStateOpts{ - // We don't pass tr.input as an inputToDrain; tr.input is just an adapter - // on top of a Fetcher; draining doesn't apply to it. Moreover, Andrei - // doesn't trust that the adapter will do the right thing on a Next() call - // after it had previously returned an error. - InputsToDrain: nil, - TrailingMetaCallback: tr.generateTrailingMeta, - }, - ); err != nil { - return nil, err - } - - var neededColumns util.FastIntSet - // If we are doing a scrub physical check, NeededColumns needs to be - // changed to be all columns available in the index we are scanning. - // This is because the emitted schema is ScrubTypes so NeededColumns - // does not correctly represent the data being scanned. 
- if spec.IndexIdx == 0 { - neededColumns.AddRange(0, len(tr.tableDesc.PublicColumns())-1) - for i := range tr.tableDesc.PublicColumns() { - tr.fetcherResultToColIdx = append(tr.fetcherResultToColIdx, i) - } - } else { - colIdxMap := catalog.ColumnIDToOrdinalMap(tr.tableDesc.PublicColumns()) - idx := tr.tableDesc.PublicNonPrimaryIndexes()[spec.IndexIdx-1] - colIDs := idx.CollectKeyColumnIDs() - colIDs.UnionWith(idx.CollectSecondaryStoredColumnIDs()) - colIDs.UnionWith(idx.CollectKeySuffixColumnIDs()) - colIDs.ForEach(func(colID descpb.ColumnID) { - neededColumns.Add(colIdxMap.GetDefault(colID)) - }) - } - - var fetcher row.Fetcher - if _, _, err := initRowFetcher( - flowCtx, &fetcher, tr.tableDesc, int(spec.IndexIdx), catalog.ColumnIDToOrdinalMap(tr.tableDesc.PublicColumns()), - spec.Reverse, neededColumns, true /* isCheck */, flowCtx.EvalCtx.Mon, &tr.alloc, - execinfra.ScanVisibilityPublic, spec.LockingStrength, spec.LockingWaitPolicy, - false /* withSystemColumns */, nil, /* virtualColumn */ - ); err != nil { - return nil, err - } - tr.fetcher = &fetcher - - tr.Spans = spec.Spans - tr.MakeSpansCopy() - - return tr, nil -} - -// generateScrubErrorRow will create an EncDatumRow describing a -// physical check error encountered when scanning table data. The schema -// of the EncDatumRow is the ScrubTypes constant. -func (tr *scrubTableReader) generateScrubErrorRow( - row rowenc.EncDatumRow, scrubErr *scrub.Error, -) (rowenc.EncDatumRow, error) { - details := make(map[string]interface{}) - index := tr.tableDesc.ActiveIndexes()[tr.indexIdx] - // Collect all the row values into JSON - rowDetails := make(map[string]interface{}) - for i, colIdx := range tr.fetcherResultToColIdx { - col := tr.tableDesc.PublicColumns()[colIdx] - // TODO(joey): We should maybe try to get the underlying type. - rowDetails[col.GetName()] = row[i].String(col.GetType()) - } - details["row_data"] = rowDetails - details["index_name"] = index.GetName() - details["error_message"] = scrub.UnwrapScrubError(error(scrubErr)).Error() - - detailsJSON, err := tree.MakeDJSON(details) - if err != nil { - return nil, err - } - - primaryKeyValues := tr.prettyPrimaryKeyValues(row, tr.tableDesc.TableDesc()) - return rowenc.EncDatumRow{ - rowenc.DatumToEncDatum( - ScrubTypes[0], - tree.NewDString(scrubErr.Code), - ), - rowenc.DatumToEncDatum( - ScrubTypes[1], - tree.NewDString(primaryKeyValues), - ), - rowenc.DatumToEncDatum( - ScrubTypes[2], - detailsJSON, - ), - }, nil -} - -func (tr *scrubTableReader) prettyPrimaryKeyValues( - row rowenc.EncDatumRow, table *descpb.TableDescriptor, -) string { - var colIdxMap catalog.TableColMap - for i := range table.Columns { - id := table.Columns[i].ID - colIdxMap.Set(id, i) - } - var colIDToRowIdxMap catalog.TableColMap - for rowIdx, colIdx := range tr.fetcherResultToColIdx { - colIDToRowIdxMap.Set(tr.tableDesc.PublicColumns()[colIdx].GetID(), rowIdx) - } - var primaryKeyValues bytes.Buffer - primaryKeyValues.WriteByte('(') - for i, id := range table.PrimaryIndex.KeyColumnIDs { - if i > 0 { - primaryKeyValues.WriteByte(',') - } - primaryKeyValues.WriteString( - row[colIDToRowIdxMap.GetDefault(id)].String(table.Columns[colIdxMap.GetDefault(id)].Type)) - } - primaryKeyValues.WriteByte(')') - return primaryKeyValues.String() -} - -// Start is part of the RowSource interface. 
-func (tr *scrubTableReader) Start(ctx context.Context) { - if tr.FlowCtx.Txn == nil { - tr.MoveToDraining(errors.Errorf("scrubTableReader outside of txn")) - } - - ctx = tr.StartInternal(ctx, scrubTableReaderProcName) - - log.VEventf(ctx, 1, "starting") - - if err := tr.fetcher.StartScan( - ctx, tr.FlowCtx.Txn, tr.Spans, rowinfra.DefaultBatchBytesLimit, tr.limitHint, - tr.FlowCtx.TraceKV, tr.EvalCtx.TestingKnobs.ForceProductionBatchSizes, - ); err != nil { - tr.MoveToDraining(err) - } -} - -// Next is part of the RowSource interface. -func (tr *scrubTableReader) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) { - for tr.State == execinfra.StateRunning { - var row rowenc.EncDatumRow - var err error - // If we are running a scrub physical check, we use a specialized - // procedure that runs additional checks while fetching the row - // data. - row, err = tr.fetcher.NextRowWithErrors(tr.Ctx) - // There are four cases that can happen after NextRowWithErrors: - // 1) We encounter a ScrubError. We do not propagate the error up, - // but instead generate and emit a row for the final results. - // 2) No errors were found. We simply continue scanning the data - // and discard the row values, as they are not needed for any - // results. - // 3) A non-scrub error was encountered. This was not considered a - // physical data error, and so we propagate this to the user - // immediately. - // 4) There was no error or row data. This signals that there is - // no more data to scan. - // - // NB: Cases 3 and 4 are handled further below, in the standard - // table scanning code path. - var v *scrub.Error - if errors.As(err, &v) { - row, err = tr.generateScrubErrorRow(row, v) - } else if err == nil && row != nil { - continue - } - if row == nil || err != nil { - tr.MoveToDraining(scrub.UnwrapScrubError(err)) - break - } - - if outRow := tr.ProcessRowHelper(row); outRow != nil { - return outRow, nil - } - } - return nil, tr.DrainHelper() -} diff --git a/pkg/sql/rowexec/tablereader.go b/pkg/sql/rowexec/tablereader.go index 5aedf22605ea..6d56b205f87a 100644 --- a/pkg/sql/rowexec/tablereader.go +++ b/pkg/sql/rowexec/tablereader.go @@ -153,7 +153,6 @@ func newTableReader( columnIdxMap, spec.Reverse, neededColumns, - spec.IsCheck, flowCtx.EvalCtx.Mon, &tr.alloc, spec.Visibility, diff --git a/pkg/sql/rowexec/zigzagjoiner.go b/pkg/sql/rowexec/zigzagjoiner.go index 7fff6b385ff6..910703475c7e 100644 --- a/pkg/sql/rowexec/zigzagjoiner.go +++ b/pkg/sql/rowexec/zigzagjoiner.go @@ -489,7 +489,6 @@ func (z *zigzagJoiner) setupInfo( catalog.ColumnIDToOrdinalMap(info.table.PublicColumns()), false, /* reverse */ neededCols, - false, /* check */ flowCtx.EvalCtx.Mon, info.alloc, execinfra.ScanVisibilityPublic, diff --git a/pkg/sql/scrub.go b/pkg/sql/scrub.go index d65c3a0a4b0b..42c818af1c64 100644 --- a/pkg/sql/scrub.go +++ b/pkg/sql/scrub.go @@ -22,7 +22,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" - "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/errors" ) @@ -224,6 +223,7 @@ func (n *scrubNode) startScrubTable( return err } n.run.checkQueue = append(n.run.checkQueue, checks...) 
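	// The PHYSICAL option is now rejected outright below (e.g.
	// EXPERIMENTAL SCRUB TABLE t WITH OPTIONS PHYSICAL fails with
	// "PHYSICAL scrub not implemented") instead of queueing physical
	// check operations.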
+ case *tree.ScrubOptionPhysical: if physicalCheckSet { return pgerror.Newf(pgcode.Syntax, @@ -234,8 +234,8 @@ func (n *scrubNode) startScrubTable( "cannot use AS OF SYSTEM TIME with PHYSICAL option") } physicalCheckSet = true - physicalChecks := createPhysicalCheckOperations(tableDesc, tableName) - n.run.checkQueue = append(n.run.checkQueue, physicalChecks...) + return pgerror.Newf(pgcode.FeatureNotSupported, "PHYSICAL scrub not implemented") + case *tree.ScrubOptionConstraint: if constraintsSet { return pgerror.Newf(pgcode.Syntax, @@ -248,6 +248,7 @@ func (n *scrubNode) startScrubTable( return err } n.run.checkQueue = append(n.run.checkQueue, constraintsToCheck...) + default: panic(errors.AssertionFailedf("unhandled SCRUB option received: %+v", v)) } @@ -269,8 +270,7 @@ func (n *scrubNode) startScrubTable( } n.run.checkQueue = append(n.run.checkQueue, constraintsToCheck...) - physicalChecks := createPhysicalCheckOperations(tableDesc, tableName) - n.run.checkQueue = append(n.run.checkQueue, physicalChecks...) + // Physical checks are no longer implemented. } return nil } @@ -335,17 +335,6 @@ func pairwiseOp(left []string, right []string, op string) []string { return res } -// createPhysicalCheckOperations will return the physicalCheckOperation -// for all indexes on a table. -func createPhysicalCheckOperations( - tableDesc catalog.TableDescriptor, tableName *tree.TableName, -) (checks []checkOperation) { - for _, idx := range tableDesc.ActiveIndexes() { - checks = append(checks, newPhysicalCheckOperation(tableName, tableDesc, idx)) - } - return checks -} - // createIndexCheckOperations will return the checkOperations for the // provided indexes. If indexNames is nil, then all indexes are // returned. @@ -468,40 +457,3 @@ func createConstraintCheckOperations( } return results, nil } - -// scrubRunDistSQL run a distSQLPhysicalPlan plan in distSQL. If non-nil -// rowContainerHelper is returned, the caller must close it. -func scrubRunDistSQL( - ctx context.Context, planCtx *PlanningCtx, p *planner, plan *PhysicalPlan, columnTypes []*types.T, -) (*rowContainerHelper, error) { - var rowContainer rowContainerHelper - rowContainer.Init(columnTypes, &p.extendedEvalCtx, "scrub" /* opName */) - rowResultWriter := NewRowResultWriter(&rowContainer) - recv := MakeDistSQLReceiver( - ctx, - rowResultWriter, - tree.Rows, - p.ExecCfg().RangeDescriptorCache, - p.txn, - p.ExecCfg().Clock, - p.extendedEvalCtx.Tracing, - p.ExecCfg().ContentionRegistry, - nil, /* testingPushCallback */ - ) - defer recv.Release() - - // Copy the evalCtx, as dsp.Run() might change it. - evalCtxCopy := p.extendedEvalCtx - p.extendedEvalCtx.DistSQLPlanner.Run( - planCtx, p.txn, plan, recv, &evalCtxCopy, nil, /* finishedSetupFn */ - )() - if rowResultWriter.Err() != nil { - rowContainer.Close(ctx) - return nil, rowResultWriter.Err() - } else if rowContainer.Len() == 0 { - rowContainer.Close(ctx) - return nil, nil - } - - return &rowContainer, nil -} diff --git a/pkg/sql/scrub_physical.go b/pkg/sql/scrub_physical.go deleted file mode 100644 index 9864b02d0942..000000000000 --- a/pkg/sql/scrub_physical.go +++ /dev/null @@ -1,199 +0,0 @@ -// Copyright 2017 The Cockroach Authors. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. 
- -package sql - -import ( - "context" - "time" - - "github.com/cockroachdb/cockroach/pkg/sql/catalog" - "github.com/cockroachdb/cockroach/pkg/sql/rowexec" - "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" - "github.com/cockroachdb/cockroach/pkg/sql/span" - "github.com/cockroachdb/errors" -) - -var _ checkOperation = &physicalCheckOperation{} - -// physicalCheckOperation is a check on an indexes physical data. -type physicalCheckOperation struct { - tableName *tree.TableName - tableDesc catalog.TableDescriptor - index catalog.Index - - // columns is a list of the columns returned in the query result - // tree.Datums. - columns []catalog.Column - // primaryColIdxs maps PrimaryIndex.Columns to the row - // indexes in the query result tree.Datums. - primaryColIdxs []int - - run physicalCheckRun -} - -// physicalCheckRun contains the run-time state for -// physicalCheckOperation during local execution. -type physicalCheckRun struct { - started bool - - rows *rowContainerHelper - iterator *rowContainerIterator - // If currentRow is nil, it means that all rows have been exhausted. - currentRow tree.Datums -} - -func newPhysicalCheckOperation( - tableName *tree.TableName, tableDesc catalog.TableDescriptor, index catalog.Index, -) *physicalCheckOperation { - return &physicalCheckOperation{ - tableName: tableName, - tableDesc: tableDesc, - index: index, - } -} - -// Start implements the checkOperation interface. -// It will plan and run the physical data check using the distSQL -// execution engine. -func (o *physicalCheckOperation) Start(params runParams) error { - ctx := params.ctx - // Collect all of the columns, their types, and their IDs. - var columnIDs []tree.ColumnID - colIDToIdx := catalog.ColumnIDToOrdinalMap(o.tableDesc.PublicColumns()) - columns := make([]catalog.Column, len(columnIDs)) - - // Collect all of the columns being scanned. - if o.index.GetID() == o.tableDesc.GetPrimaryIndexID() { - for _, c := range o.tableDesc.PublicColumns() { - columnIDs = append(columnIDs, c.GetID()) - } - } else { - for i := 0; i < o.index.NumKeyColumns(); i++ { - id := o.index.GetKeyColumnID(i) - columnIDs = append(columnIDs, id) - } - for i := 0; i < o.index.NumKeySuffixColumns(); i++ { - id := o.index.GetKeySuffixColumnID(i) - columnIDs = append(columnIDs, id) - } - for i := 0; i < o.index.NumSecondaryStoredColumns(); i++ { - id := o.index.GetStoredColumnID(i) - columnIDs = append(columnIDs, id) - } - } - - for i := range columnIDs { - idx := colIDToIdx.GetDefault(columnIDs[i]) - columns = append(columns, o.tableDesc.PublicColumns()[idx]) - } - - // Find the row indexes for all of the primary index columns. 
- primaryColIdxs, err := getPrimaryColIdxs(o.tableDesc, columns) - if err != nil { - return err - } - - indexFlags := &tree.IndexFlags{ - IndexID: o.index.GetID(), - NoIndexJoin: true, - } - scan := params.p.Scan() - scan.isCheck = true - colCfg := scanColumnsConfig{wantedColumns: columnIDs, addUnwantedAsHidden: true} - if err := scan.initTable(ctx, params.p, o.tableDesc, indexFlags, colCfg); err != nil { - return err - } - scan.index = scan.specifiedIndex - sb := span.MakeBuilder(params.EvalContext(), params.ExecCfg().Codec, o.tableDesc, o.index) - defer sb.Release() - scan.spans, err = sb.UnconstrainedSpans() - if err != nil { - return err - } - scan.isFull = true - - planCtx := params.extendedEvalCtx.DistSQLPlanner.NewPlanningCtx(ctx, params.extendedEvalCtx, params.p, params.p.txn, true /* distribute */) - // Since physicalCheckOperation might be only one of many check operations - // that scrubNode needs to perform, we need to make sure that scrubNode - // is not closed when this physical check operation is being cleaned up. - planCtx.ignoreClose = true - physPlan, err := params.extendedEvalCtx.DistSQLPlanner.createScrubPhysicalCheck(planCtx, scan) - if err != nil { - return err - } - - o.primaryColIdxs = primaryColIdxs - o.columns = columns - o.run.started = true - rows, err := scrubRunDistSQL(ctx, planCtx, params.p, physPlan, rowexec.ScrubTypes) - if rows == nil || err != nil { - // If either there were no rows that failed the check operation or an - // error was encountered, we short-circuit and don't set currentRow. - // This will indicate that we're done. - return err - } - o.run.rows = rows - o.run.iterator = newRowContainerIterator(ctx, *rows, rowexec.ScrubTypes) - o.run.currentRow, err = o.run.iterator.Next() - return err -} - -// Next implements the checkOperation interface. -func (o *physicalCheckOperation) Next(params runParams) (tree.Datums, error) { - timestamp, err := tree.MakeDTimestamp( - params.extendedEvalCtx.GetStmtTimestamp(), time.Nanosecond) - if err != nil { - return nil, err - } - - details, ok := o.run.currentRow[2].(*tree.DJSON) - if !ok { - return nil, errors.Errorf("expected row value 3 to be DJSON, got: %T", o.run.currentRow[2]) - } - - res := tree.Datums{ - // TODO(joey): Add the job UUID once the SCRUB command uses jobs. - tree.DNull, /* job_uuid */ - o.run.currentRow[0], /* errorType */ - tree.NewDString(o.tableName.Catalog()), - tree.NewDString(o.tableName.Table()), - o.run.currentRow[1], /* primaryKey */ - timestamp, - tree.DBoolFalse, - details, - } - - // Advance to the next row. - o.run.currentRow, err = o.run.iterator.Next() - return res, err -} - -// Started implements the checkOperation interface. -func (o *physicalCheckOperation) Started() bool { - return o.run.started -} - -// Done implements the checkOperation interface. -func (o *physicalCheckOperation) Done(context.Context) bool { - return o.run.currentRow == nil -} - -// Close implements the checkOperation interface. 
-func (o *physicalCheckOperation) Close(ctx context.Context) { - if o.run.rows != nil { - o.run.rows.Close(ctx) - o.run.rows = nil - } - if o.run.iterator != nil { - o.run.iterator.Close() - o.run.iterator = nil - } -} diff --git a/pkg/sql/scrub_test.go b/pkg/sql/scrub_test.go index e2ad1aa4e7f4..89ece54c6cf5 100644 --- a/pkg/sql/scrub_test.go +++ b/pkg/sql/scrub_test.go @@ -29,7 +29,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/scrub" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" - "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -560,396 +559,6 @@ ALTER TABLE t.child ADD FOREIGN KEY (parent_id, parent_id2) REFERENCES t.parent runScrub(t, db, `EXPERIMENTAL SCRUB TABLE t.child AS OF SYSTEM TIME '-1ms' WITH OPTIONS CONSTRAINT ALL`, exp) } -// TestScrubPhysicalNonnullableNullInSingleColumnFamily tests that -// `SCRUB TABLE ... WITH OPTIONS PHYSICAL` will find any rows where a -// value is NULL for a column that is not-nullable and the only column -// in a family. To test this, a row is created that we later overwrite -// the value for. The value that is inserted is the sentinel value as -// the column is the only one in the family. -func TestScrubPhysicalNonnullableNullInSingleColumnFamily(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - s, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) - defer s.Stopper().Stop(context.Background()) - - // Create the table and the row entry. - if _, err := db.Exec(` -CREATE DATABASE t; -CREATE TABLE t.test (k INT PRIMARY KEY, v INT NOT NULL); -INSERT INTO t.test VALUES (217, 314); -`); err != nil { - t.Fatalf("unexpected error: %s", err) - } - - tableDesc := catalogkv.TestingGetTableDescriptor(kvDB, keys.SystemSQLCodec, "t", "test") - - // Construct datums for our row values (k, v). - values := []tree.Datum{tree.NewDInt(217), tree.NewDInt(314)} - - var colIDtoRowIndex catalog.TableColMap - colIDtoRowIndex.Set(tableDesc.PublicColumns()[0].GetID(), 0) - colIDtoRowIndex.Set(tableDesc.PublicColumns()[1].GetID(), 1) - - // Create the primary index key - primaryIndexKeyPrefix := rowenc.MakeIndexKeyPrefix( - keys.SystemSQLCodec, tableDesc.GetID(), tableDesc.GetPrimaryIndexID()) - primaryIndexKey, _, err := rowenc.EncodeIndexKey( - tableDesc, tableDesc.GetPrimaryIndex(), colIDtoRowIndex, values, primaryIndexKeyPrefix) - if err != nil { - t.Fatalf("unexpected error: %s", err) - } - - // Add the family suffix to the key. - family := tableDesc.GetFamilies()[0] - primaryIndexKey = keys.MakeFamilyKey(primaryIndexKey, uint32(family.ID)) - - // Create an empty sentinel value. - var value roachpb.Value - value.SetTuple([]byte(nil)) - - if err := kvDB.Put(context.Background(), primaryIndexKey, &value); err != nil { - t.Fatalf("unexpected error: %s", err) - } - - // Run SCRUB and find the errors we created. - rows, err := db.Query(`EXPERIMENTAL SCRUB TABLE t.test WITH OPTIONS PHYSICAL`) - if err != nil { - t.Fatalf("unexpected error: %s", err) - } - defer rows.Close() - results, err := sqlutils.GetScrubResultRows(rows) - if err != nil { - t.Fatalf("unexpected error: %s", err) - } else if len(results) != 1 { - t.Fatalf("expected 1 result, got %d. 
got %#v", len(results), results) - } - - if result := results[0]; result.ErrorType != string(scrub.UnexpectedNullValueError) { - t.Fatalf("expected %q error, instead got: %s", - scrub.UnexpectedNullValueError, result.ErrorType) - } else if result.Database != "t" { - t.Fatalf("expected database %q, got %q", "t", result.Database) - } else if result.Table != "test" { - t.Fatalf("expected table %q, got %q", "test", result.Table) - } else if result.PrimaryKey != "(217)" { - t.Fatalf("expected primaryKey %q, got %q", "(217)", result.PrimaryKey) - } else if result.Repaired { - t.Fatalf("expected repaired %v, got %v", false, result.Repaired) - } else if !strings.Contains(result.Details, `"k": "217"`) { - t.Fatalf("expected error details to contain `%s`, got %s", `"k": "217"`, result.Details) - } else if !strings.Contains(result.Details, `"v": ""`) { - t.Fatalf("expected error details to contain `%s`, got %s", `"v": ""`, result.Details) - } -} - -// TestScrubPhysicalNonnullableNullInMulticolumnFamily tests that -// `SCRUB TABLE ... WITH OPTIONS PHYSICAL` will find any rows where a -// value is NULL for a column that is not-nullable and is not the only -// column in a family. To test this, a row is created that we later -// overwrite the value for. The value that is inserted is missing one of -// the columns that belongs in the family. -func TestScrubPhysicalNonnullableNullInMulticolumnFamily(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - s, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) - defer s.Stopper().Stop(context.Background()) - - // Create the table and the row entry. - if _, err := db.Exec(` -CREATE DATABASE t; -CREATE TABLE t.test (k INT PRIMARY KEY, v INT NOT NULL, b INT NOT NULL, FAMILY (k), FAMILY (v, b)); -INSERT INTO t.test VALUES (217, 314, 1337); -`); err != nil { - t.Fatalf("unexpected error: %s", err) - } - - tableDesc := catalogkv.TestingGetTableDescriptor(kvDB, keys.SystemSQLCodec, "t", "test") - - // Construct datums for our row values (k, v, b). - values := []tree.Datum{tree.NewDInt(217), tree.NewDInt(314), tree.NewDInt(1337)} - - var colIDtoRowIndex catalog.TableColMap - colIDtoRowIndex.Set(tableDesc.PublicColumns()[0].GetID(), 0) - colIDtoRowIndex.Set(tableDesc.PublicColumns()[1].GetID(), 1) - colIDtoRowIndex.Set(tableDesc.PublicColumns()[2].GetID(), 2) - - // Create the primary index key - primaryIndexKeyPrefix := rowenc.MakeIndexKeyPrefix( - keys.SystemSQLCodec, tableDesc.GetID(), tableDesc.GetPrimaryIndexID()) - primaryIndexKey, _, err := rowenc.EncodeIndexKey( - tableDesc, tableDesc.GetPrimaryIndex(), colIDtoRowIndex, values, primaryIndexKeyPrefix) - if err != nil { - t.Fatalf("unexpected error: %s", err) - } - - // Add the family suffix to the key, in particular we care about the - // second column family. - family := tableDesc.GetFamilies()[1] - primaryIndexKey = keys.MakeFamilyKey(primaryIndexKey, uint32(family.ID)) - - // Encode the second column value. - valueBuf, err := valueside.Encode( - []byte(nil), valueside.MakeColumnIDDelta(0, tableDesc.PublicColumns()[1].GetID()), values[1], []byte(nil)) - if err != nil { - t.Fatalf("unexpected error: %s", err) - } - - // Construct the tuple for the family that is missing a column value, i.e. it is NULL. - var value roachpb.Value - value.SetTuple(valueBuf) - - // Overwrite the existing value. - if err := kvDB.Put(context.Background(), primaryIndexKey, &value); err != nil { - t.Fatalf("unexpected error: %s", err) - } - - // Run SCRUB and find the errors we created. 
- rows, err := db.Query(`EXPERIMENTAL SCRUB TABLE t.test WITH OPTIONS PHYSICAL`) - if err != nil { - t.Fatalf("unexpected error: %s", err) - } - defer rows.Close() - results, err := sqlutils.GetScrubResultRows(rows) - if err != nil { - t.Fatalf("unexpected error: %s", err) - } else if len(results) != 1 { - t.Fatalf("expected 1 result, got %d. got %#v", len(results), results) - } - - if result := results[0]; result.ErrorType != string(scrub.UnexpectedNullValueError) { - t.Fatalf("expected %q error, instead got: %s", - scrub.UnexpectedNullValueError, result.ErrorType) - } else if result.Database != "t" { - t.Fatalf("expected database %q, got %q", "t", result.Database) - } else if result.Table != "test" { - t.Fatalf("expected table %q, got %q", "test", result.Table) - } else if result.PrimaryKey != "(217)" { - t.Fatalf("expected primaryKey %q, got %q", "(217)", result.PrimaryKey) - } else if result.Repaired { - t.Fatalf("expected repaired %v, got %v", false, result.Repaired) - } else if !strings.Contains(result.Details, `"k": "217"`) { - t.Fatalf("expected error details to contain `%s`, got %s", `"k": "217"`, result.Details) - } else if !strings.Contains(result.Details, `"v": "314"`) { - t.Fatalf("expected error details to contain `%s`, got %s", `"v": "314"`, result.Details) - } else if !strings.Contains(result.Details, `"b": ""`) { - t.Fatalf("expected error details to contain `%s`, got %s", `"b": ""`, result.Details) - } -} - -// TestScrubPhysicalUnexpectedFamilyID tests that `SCRUB TABLE ... WITH -// OPTIONS PHYSICAL` will find any rows where a primary index as key -// with an invalid family ID. To test this, a table is made with 2 -// families and then the first family is dropped. A row is then inserted -// using the KV client which has the ID of the first family. -func TestScrubPhysicalUnexpectedFamilyID(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - skip.WithIssue(t, 51797, "currently KV pairs with unexpected family IDs are not noticed by the fetcher") - s, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) - defer s.Stopper().Stop(context.Background()) - - // Create the table and the row entry. - if _, err := db.Exec(` -CREATE DATABASE t; -CREATE TABLE t.test ( - k INT PRIMARY KEY, - v1 INT NOT NULL, - v2 INT NOT NULL, - FAMILY first (v1), - FAMILY second (v2) -); -`); err != nil { - t.Fatalf("unexpected error: %s", err) - } - - oldTableDesc := catalogkv.TestingGetTableDescriptor(kvDB, keys.SystemSQLCodec, "t", "test") - - // Drop the first column family. - if _, err := db.Exec(`ALTER TABLE t.test DROP COLUMN v1`); err != nil { - t.Fatalf("unexpected error: %s", err) - } - - tableDesc := catalogkv.TestingGetTableDescriptor(kvDB, keys.SystemSQLCodec, "t", "test") - - // Construct datums for our row values (k, v1). - values := []tree.Datum{tree.NewDInt(217), tree.NewDInt(314)} - - var colIDtoRowIndex catalog.TableColMap - colIDtoRowIndex.Set(tableDesc.PublicColumns()[0].GetID(), 0) - colIDtoRowIndex.Set(tableDesc.PublicColumns()[1].GetID(), 1) - - // Create the primary index key - primaryIndexKeyPrefix := rowenc.MakeIndexKeyPrefix( - keys.SystemSQLCodec, tableDesc.GetID(), tableDesc.GetPrimaryIndexID()) - primaryIndexKey, _, err := rowenc.EncodeIndexKey( - tableDesc, tableDesc.GetPrimaryIndex(), colIDtoRowIndex, values, primaryIndexKeyPrefix) - if err != nil { - t.Fatalf("unexpected error: %s", err) - } - - // Add the correct family suffix to the key. 
- primaryIndexKeyWithFamily := keys.MakeFamilyKey(primaryIndexKey, uint32(tableDesc.GetFamilies()[1].ID)) - - // Encode the second column value. - valueBuf, err := valueside.Encode( - []byte(nil), valueside.MakeColumnIDDelta(0, tableDesc.PublicColumns()[1].GetID()), values[1], []byte(nil)) - if err != nil { - t.Fatalf("unexpected error: %s", err) - } - var value roachpb.Value - value.SetTuple(valueBuf) - - // Insert the value. - if err := kvDB.Put(context.Background(), primaryIndexKeyWithFamily, &value); err != nil { - t.Fatalf("unexpected error: %s", err) - } - - // Create a k/v with an incorrect family suffix to the key. - primaryIndexKeyWithFamily = keys.MakeFamilyKey(primaryIndexKey, - uint32(oldTableDesc.GetFamilies()[1].ID)) - - // Encode the second column value. - valueBuf, err = valueside.Encode( - []byte(nil), valueside.MakeColumnIDDelta(0, tableDesc.PublicColumns()[1].GetID()), values[1], []byte(nil)) - if err != nil { - t.Fatalf("unexpected error: %s", err) - } - value = roachpb.Value{} - value.SetTuple(valueBuf) - - // Insert the incorrect family k/v. - if err := kvDB.Put(context.Background(), primaryIndexKeyWithFamily, &value); err != nil { - t.Fatalf("unexpected error: %s", err) - } - - // Run SCRUB and find the errors we created. - rows, err := db.Query(`EXPERIMENTAL SCRUB TABLE t.test WITH OPTIONS PHYSICAL`) - if err != nil { - t.Fatalf("unexpected error: %s", err) - } - defer rows.Close() - results, err := sqlutils.GetScrubResultRows(rows) - if err != nil { - t.Fatalf("unexpected error: %s", err) - } else if len(results) != 1 { - t.Fatalf("expected 1 result, got %d. got %#v", len(results), results) - } - - if result := results[0]; result.ErrorType != string(scrub.UnexpectedNullValueError) { - t.Fatalf("expected %q error, instead got: %s", - scrub.UnexpectedNullValueError, result.ErrorType) - } else if result.Database != "t" { - t.Fatalf("expected database %q, got %q", "t", result.Database) - } else if result.Table != "test" { - t.Fatalf("expected table %q, got %q", "test", result.Table) - } else if result.PrimaryKey != "(217)" { - t.Fatalf("expected primaryKey %q, got %q", "(217)", result.PrimaryKey) - } else if result.Repaired { - t.Fatalf("expected repaired %v, got %v", false, result.Repaired) - } else if !strings.Contains(result.Details, `"k": "217"`) { - t.Fatalf("expected error details to contain `%s`, got %s", `"k": "217"`, result.Details) - } else if !strings.Contains(result.Details, `"v": "314"`) { - t.Fatalf("expected error details to contain `%s`, got %s", `"v": "314"`, result.Details) - } else if !strings.Contains(result.Details, `"b": ""`) { - t.Fatalf("expected error details to contain `%s`, got %s", `"b": ""`, result.Details) - } -} - -// TestScrubPhysicalIncorrectPrimaryIndexValueColumn tests that -// `SCRUB TABLE ... WITH OPTIONS PHYSICAL` will find any rows where a -// value has an encoded column ID that does not correspond to the table -// descriptor. To test this, a row is inserted using the KV client. -func TestScrubPhysicalIncorrectPrimaryIndexValueColumn(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - skip.WithIssue(t, 51797, "the test is not failing, as it would be expected") - s, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) - defer s.Stopper().Stop(context.Background()) - - // Create the table and the row entry. 
- if _, err := db.Exec(` -CREATE DATABASE t; -CREATE TABLE t.test (k INT PRIMARY KEY, v1 INT, v2 INT); -`); err != nil { - t.Fatalf("unexpected error: %s", err) - } - tableDesc := catalogkv.TestingGetTableDescriptor(kvDB, keys.SystemSQLCodec, "t", "test") - - // Construct datums for our row values (k, v1, v2). - values := []tree.Datum{tree.NewDInt(217), tree.NewDInt(314), tree.NewDInt(1337)} - - var colIDtoRowIndex catalog.TableColMap - colIDtoRowIndex.Set(tableDesc.PublicColumns()[0].GetID(), 0) - colIDtoRowIndex.Set(tableDesc.PublicColumns()[1].GetID(), 1) - colIDtoRowIndex.Set(tableDesc.PublicColumns()[2].GetID(), 2) - - // Create the primary index key - primaryIndexKeyPrefix := rowenc.MakeIndexKeyPrefix( - keys.SystemSQLCodec, tableDesc.GetID(), tableDesc.GetPrimaryIndexID()) - primaryIndexKey, _, err := rowenc.EncodeIndexKey( - tableDesc, tableDesc.GetPrimaryIndex(), colIDtoRowIndex, values, primaryIndexKeyPrefix) - if err != nil { - t.Fatalf("unexpected error: %s", err) - } - // Add the default family suffix to the key. - primaryIndexKey = keys.MakeFamilyKey(primaryIndexKey, uint32(tableDesc.GetFamilies()[0].ID)) - - // Encode the second column values. The second column is encoded with - // a garbage colIDDiff. - valueBuf, err := valueside.Encode( - []byte(nil), valueside.MakeColumnIDDelta(0, tableDesc.PublicColumns()[1].GetID()), values[1], []byte(nil)) - if err != nil { - t.Fatalf("unexpected error: %s", err) - } - - valueBuf, err = valueside.Encode(valueBuf, 1000, values[2], []byte(nil)) - if err != nil { - t.Fatalf("unexpected error: %s", err) - } - - // Construct the tuple for the family that is missing a column value, i.e. it is NULL. - var value roachpb.Value - value.SetTuple(valueBuf) - - // Overwrite the existing value. - if err := kvDB.Put(context.Background(), primaryIndexKey, &value); err != nil { - t.Fatalf("unexpected error: %s", err) - } - - // Run SCRUB and find the errors we created. - rows, err := db.Query(`EXPERIMENTAL SCRUB TABLE t.test WITH OPTIONS PHYSICAL`) - if err != nil { - t.Fatalf("unexpected error: %s", err) - } - defer rows.Close() - - results, err := sqlutils.GetScrubResultRows(rows) - if err != nil { - t.Fatalf("unexpected error: %s", err) - } else if len(results) != 1 { - t.Fatalf("expected 1 result, got %d. got %#v", len(results), results) - } - - if result := results[0]; result.ErrorType != string(scrub.UnexpectedNullValueError) { - t.Fatalf("expected %q error, instead got: %s", - scrub.UnexpectedNullValueError, result.ErrorType) - } else if result.Database != "t" { - t.Fatalf("expected database %q, got %q", "t", result.Database) - } else if result.Table != "test" { - t.Fatalf("expected table %q, got %q", "test", result.Table) - } else if result.PrimaryKey != "(217)" { - t.Fatalf("expected primaryKey %q, got %q", "(217)", result.PrimaryKey) - } else if result.Repaired { - t.Fatalf("expected repaired %v, got %v", false, result.Repaired) - } else if !strings.Contains(result.Details, `"k": "217"`) { - t.Fatalf("expected error details to contain `%s`, got %s", `"k": "217"`, result.Details) - } else if !strings.Contains(result.Details, `"v": "314"`) { - t.Fatalf("expected error details to contain `%s`, got %s", `"v": "314"`, result.Details) - } else if !strings.Contains(result.Details, `"b": ""`) { - t.Fatalf("expected error details to contain `%s`, got %s", `"b": ""`, result.Details) - } -} - type expectedScrubResult struct { ErrorType string Database string