50922: importccl: support `unique_rowid()` as default expression for IMPORT INTO r=Anzoteh96 a=Anzoteh96

The PR #50295 added support for non-targeted columns with constant default expressions. This PR follows up by adding support for `unique_rowid()`.

Previously, the only support for `unique_rowid()` as a default expression was for the hidden `rowid` column, where it is computed as a function of the timestamp, the row number, and the source ID (the ID of the processor). To accommodate wider use of `unique_rowid()`, this PR makes `unique_rowid` a function of:
1. the timestamp;
2. the row number;
3. the source ID;
4. the total number of occurrences of `unique_rowid` in the table schema;
5. the index of each `unique_rowid` instance within each row (see the sketch below).
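
As a rough illustration of why items 4 and 5 together keep multiple `unique_rowid()` occurrences distinct, here is a minimal sketch; the helper name, parameters, and bit layout are hypothetical, not the actual implementation:

```go
// importRowID sketches an ID derived from the five inputs above. Spreading
// the row number by the schema-wide occurrence count and adding the
// per-row instance index guarantees that no two unique_rowid() instances
// are handed the same slot.
func importRowID(walltime int64, sourceID int32, rowNum int64,
	totalOccurrences, instanceInRow int) int64 {
	// Row r, instance i occupies slot r*totalOccurrences + i.
	slot := rowNum*int64(totalOccurrences) + int64(instanceInRow)
	// Hypothetical bit layout: timestamp high, source ID middle, slot low.
	return walltime<<28 | int64(sourceID)<<20 | (slot & (1<<20 - 1))
}
```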

In addition, this PR modifies the visitor introduced in #51390 by adding override methods for volatile functions like `unique_rowid`. Annotations containing the total number of `unique_rowid` occurrences and the index of the current `unique_rowid` instance within a row are stored inside `evalCtx`; they are read and updated when the visitor walks the default expression at the sanitization stage, and again when the default expression is evaluated for each row, roughly as in the counting sketch below.
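
A simplified version of that counting pass over an expression tree might look like this; the types and names are stand-ins, not CockroachDB's actual visitor API:

```go
// expr is a minimal stand-in for a parsed default expression.
type expr struct {
	fn   string
	args []*expr
}

// countUniqueRowID mimics the sanitization-stage walk: it counts how many
// unique_rowid() calls the default expression contains. At evaluation
// time, a per-row counter (the other annotation in evalCtx) would be
// incremented each time an instance is evaluated.
func countUniqueRowID(e *expr) int {
	if e == nil {
		return 0
	}
	n := 0
	if e.fn == "unique_rowid" {
		n++
	}
	for _, arg := range e.args {
		n += countUniqueRowID(arg)
	}
	return n
}
```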

Partially addresses #48253 

Release note (general change): IMPORT INTO now supports `unique_rowid()` as a default expression.

51518: rowflow,colexec: make routers propagate errors to all non-closed outputs r=yuzefovich a=yuzefovich

This commit changes the way we propagate errors in the hash router
so that the error metadata is sent on all non-closed streams.
Previously, we sent it over only the first non-closed stream, which
could cause the processors on the same stage as that stream's end to
treat the absence of rows and errors as the input being exhausted
successfully, which is wrong because the input did encounter an error.
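
Conceptually, the fix replaces "send to the first open stream" with something like the following sketch (illustrative types, not the actual router code):

```go
// stream is an illustrative stand-in for a router output stream.
type stream struct {
	closed bool
	meta   chan error
}

// forwardErrMeta sends the error metadata on every non-closed stream, so
// no downstream processor can mistake its now-empty input for one that
// was exhausted successfully.
func forwardErrMeta(streams []*stream, err error) {
	for _, s := range streams {
		if !s.closed {
			s.meta <- err
		}
	}
}
```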

The same thing has been happening in the vectorized flow, but in that
case the problem is less severe: the issue presents itself only
when we have wrapped processors (because the materializers will prevent
the propagation throughout the whole flow, as described below).
In the vectorized engine we use a panic-catch mechanism of error
propagation, and we end up with the following sequence of events (a
sketch of the catch step follows the list):
1. an operator encounters an error on any node (e.g. `colBatchScan`
encounters a ReadWithinUncertaintyInterval error on a remote node). It is
not an internal vectorized error, so the operator panics with
`colexecerror.ExpectedError`.
2. the panic is caught by one of the catchers (it can be a parallel
unordered synchronizer goroutine, an outbox goroutine, a materializer,
or a hash router).
3. that component then decides how to propagate the error further:
3.1 if it is a parallel unordered synchronizer, it cancels all
of its inputs and repanics.
3.2 if it is an outbox, the error is sent as metadata, which is
received by an inbox that panics with it.
3.3 if it is a materializer, it might swallow the error (this is
the reason we need the vectorized hash router to send the error to
all of its outputs). The swallowing is acceptable if it is the root
materializer, though.
3.4 if it is a hash router, it cancels all of its outputs and
forwards the error on each of the outputs.
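
The catch in step 2 is conceptually similar to this sketch; the real mechanism lives in `colexecerror`, and the types below are simplified stand-ins:

```go
// expectedError wraps an error an operator may legitimately encounter.
type expectedError struct{ err error }

// throwExpected is how an operator surfaces such an error: by panicking.
func throwExpected(err error) { panic(expectedError{err: err}) }

// catchExpected runs op and converts an expectedError panic back into a
// plain error, which the catcher (synchronizer, outbox, materializer, or
// hash router) can then propagate as it sees fit.
func catchExpected(op func()) (err error) {
	defer func() {
		if r := recover(); r != nil {
			ee, ok := r.(expectedError)
			if !ok {
				panic(r) // internal errors keep propagating
			}
			err = ee.err
		}
	}()
	op()
	return nil
}
```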

Fixes: #51458.

Release note (bug fix): Previously, CockroachDB could return incorrect
results for queries that encountered a ReadWithinUncertaintyInterval
error; this has been fixed.

52016: colexec: re-enable short-circuiting in the hash joiner r=yuzefovich a=yuzefovich

This commit re-enables the short-circuiting logic in the hash joiner when
the build side is empty (it was temporarily disabled because of #48785,
which has since been fixed).
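
Schematically, the short-circuit is just this check; the join-type names are abbreviated and the function is a sketch, not the actual hash joiner code:

```go
// canShortCircuit reports whether the hash joiner can finish immediately:
// with an empty build side, these join types cannot emit any rows no
// matter what the probe side contains.
func canShortCircuit(buildTuples int, joinType string) bool {
	if buildTuples != 0 {
		return false
	}
	switch joinType {
	case "INNER", "RIGHT OUTER", "LEFT SEMI", "INTERSECT ALL":
		return true
	}
	return false
}
```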

Fixes: #49631.

Release note: None

52027: sql: skip TestQueryProgress r=yuzefovich a=yuzefovich

This test started failing more often, so we'll skip it temporarily until
we figure it out.

Addresses: #51356.

Release note: None

Co-authored-by: anzoteh96 <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
3 people committed Jul 28, 2020
5 parents 3c35836 + 0eaed19 + 88252df + 60de8f0 + 94ac60f commit 4de665c
Showing 11 changed files with 461 additions and 213 deletions.
85 changes: 79 additions & 6 deletions pkg/ccl/importccl/import_stmt_test.go
@@ -2946,6 +2946,10 @@ func BenchmarkCSVConvertRecord(b *testing.B) {
 	b.ReportAllocs()
 }
 
+func selectNotNull(col string) string {
+	return fmt.Sprintf(`SELECT %s FROM t WHERE %s IS NOT NULL`, col, col)
+}
+
 // Test that IMPORT INTO works when columns with default expressions are present.
 // The default expressions supported by IMPORT INTO are constant expressions,
 // which are literals and functions that always return the same value given the
@@ -2958,6 +2962,10 @@ func TestImportDefault(t *testing.T) {
 	defer log.Scope(t).Close(t)
 
 	const nodes = 3
+	numFiles := nodes + 2
+	rowsPerFile := 1000
+	rowsPerRaceFile := 16
+	testFiles := makeCSVData(t, numFiles, rowsPerFile, nodes, rowsPerRaceFile)
 
 	ctx := context.Background()
 	baseDir := filepath.Join("testdata", "csv")
@@ -3231,6 +3239,64 @@
 			})
 		}
 	})
+	t.Run("unique_rowid", func(t *testing.T) {
+		const M = int(1e9 + 7) // Modulus for unique_rowid addition.
+		testCases := []struct {
+			name       string
+			create     string
+			targetCols []string
+			insert     string
+			rowIDCols  []string
+		}{
+			{
+				name:       "multiple_unique_rowid",
+				create:     "a INT DEFAULT unique_rowid(), b INT, c STRING, d INT DEFAULT unique_rowid()",
+				targetCols: []string{"b", "c"},
+				insert:     "INSERT INTO t (b, c) VALUES (3, 'CAT'), (4, 'DOG')",
+				rowIDCols:  []string{selectNotNull("a"), selectNotNull("d")},
+			},
+			{
+				name:       "unique_rowid_with_pk",
+				create:     "a INT DEFAULT unique_rowid(), b INT PRIMARY KEY, c STRING",
+				targetCols: []string{"b", "c"},
+				insert:     "INSERT INTO t (b, c) VALUES (-3, 'CAT'), (-4, 'DOG')",
+				rowIDCols:  []string{selectNotNull("a")},
+			},
+			{
+				// unique_rowid()+unique_rowid() won't work as-is: the rowid produced by
+				// import has its leftmost bit set to 1, so adding two of them overflows.
+				// A way around this is to take each unique_rowid() modulo a number M.
+				// Here M = 1e9+7 is used since it's big enough and prime, which is
+				// generally effective in avoiding collisions.
+				name: "rowid+rowid",
+				create: fmt.Sprintf(
+					`a INT DEFAULT (unique_rowid() %% %d) + (unique_rowid() %% %d), b INT PRIMARY KEY, c STRING`, M, M),
+				targetCols: []string{"b", "c"},
+				rowIDCols:  []string{selectNotNull("a")},
+			},
+		}
+		for _, test := range testCases {
+			t.Run(test.name, func(t *testing.T) {
+				defer sqlDB.Exec(t, `DROP TABLE t`)
+				sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE t(%s)`, test.create))
+				if test.insert != "" {
+					sqlDB.Exec(t, test.insert)
+				}
+				sqlDB.Exec(t, fmt.Sprintf(`IMPORT INTO t (%s) CSV DATA (%s)`,
+					strings.Join(test.targetCols, ", "),
+					strings.Join(testFiles.files, ", ")))
+				var numDistinctRows int
+				sqlDB.QueryRow(t,
+					fmt.Sprintf(`SELECT DISTINCT COUNT (*) FROM (%s)`,
+						strings.Join(test.rowIDCols, " UNION ")),
+				).Scan(&numDistinctRows)
+				var numRows int
+				sqlDB.QueryRow(t, `SELECT COUNT (*) FROM t`).Scan(&numRows)
+				require.Equal(t, numDistinctRows, len(test.rowIDCols)*numRows)
+			})
+
+		}
+	})
 }
 
 // goos: darwin
@@ -4369,14 +4435,21 @@ func TestImportPgDumpGeo(t *testing.T) {
 
 	// Verify both created tables are identical.
 	importCreate := sqlDB.QueryStr(t, "SELECT create_statement FROM [SHOW CREATE importdb.nyc_census_blocks]")
-	// Families are slightly different due to the geom column being last
-	// in exec and rowid being last in import, so swap that in import to
-	// match exec.
-	importCreate[0][0] = strings.Replace(importCreate[0][0], "geom, rowid", "rowid, geom", 1)
+	// Families are slightly different due to rowid showing up in exec but
+	// not import (possibly due to the ALTER TABLE statement that makes
+	// gid a primary key), so add that into import to match exec.
+	importCreate[0][0] = strings.Replace(importCreate[0][0], "boroname, geom", "boroname, rowid, geom", 1)
 	sqlDB.CheckQueryResults(t, "SELECT create_statement FROM [SHOW CREATE execdb.nyc_census_blocks]", importCreate)
 
-	importSelect := sqlDB.QueryStr(t, "SELECT * FROM importdb.nyc_census_blocks ORDER BY PRIMARY KEY importdb.nyc_census_blocks")
-	sqlDB.CheckQueryResults(t, "SELECT * FROM execdb.nyc_census_blocks ORDER BY PRIMARY KEY execdb.nyc_census_blocks", importSelect)
+	importCols := "blkid, popn_total, popn_white, popn_black, popn_nativ, popn_asian, popn_other, boroname"
+	importSelect := sqlDB.QueryStr(t, fmt.Sprintf(
+		"SELECT (%s) FROM importdb.nyc_census_blocks ORDER BY PRIMARY KEY importdb.nyc_census_blocks",
+		importCols,
+	))
+	sqlDB.CheckQueryResults(t, fmt.Sprintf(
+		"SELECT (%s) FROM execdb.nyc_census_blocks ORDER BY PRIMARY KEY execdb.nyc_census_blocks",
+		importCols,
+	), importSelect)
 }
 
 func TestImportCockroachDump(t *testing.T) {
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/read_import_base.go
@@ -408,7 +408,7 @@ func makeDatumConverter(
 	ctx context.Context, importCtx *parallelImportContext, fileCtx *importFileContext,
 ) (*row.DatumRowConverter, error) {
 	conv, err := row.NewDatumRowConverter(
-		ctx, importCtx.tableDesc, importCtx.targetCols, importCtx.evalCtx.Copy(), importCtx.kvCh)
+		ctx, importCtx.tableDesc, importCtx.targetCols, importCtx.evalCtx, importCtx.kvCh)
 	if err == nil {
 		conv.KvBatch.Source = fileCtx.source
 	}
8 changes: 2 additions & 6 deletions pkg/ccl/importccl/testdata/pgdump/geo.sql
@@ -1,11 +1,7 @@
--- The two comments below removing gid are there because IMPORT doesn't
--- support DEFAULT functions (#48253). This function is otherwise exactly
--- what shp2pgsql produces.
-
 SET CLIENT_ENCODING TO UTF8;
 SET STANDARD_CONFORMING_STRINGS TO ON;
 BEGIN;
-CREATE TABLE "nyc_census_blocks" (--gid serial,
+CREATE TABLE "nyc_census_blocks" (gid serial,
 "blkid" varchar(15),
 "popn_total" float8,
 "popn_white" float8,
@@ -14,7 +10,7 @@ CREATE TABLE "nyc_census_blocks" (--gid serial,
 "popn_asian" float8,
 "popn_other" float8,
 "boroname" varchar(32));
--ALTER TABLE "nyc_census_blocks" ADD PRIMARY KEY (gid);
+ALTER TABLE "nyc_census_blocks" ADD PRIMARY KEY (gid);
 SELECT AddGeometryColumn('','nyc_census_blocks','geom','26918','MULTIPOLYGON',2);
 INSERT INTO "nyc_census_blocks" ("blkid","popn_total","popn_white","popn_black","popn_nativ","popn_asian","popn_other","boroname",geom) VALUES ('360850009001000','97','51','32','1','5','8','Staten Island','010600002026690000010000000103000000010000000A00000051AC161881A22141A31409CF1F2A51415F4321458DA2214100102A3F1D2A51418C34807C0BA221414E3E89F5122A5141782D605495A12141780D1CE92A2A51410D1C9C6770A121410F2D6074322A5141441560E0B0A02141A00099C72F2A51412365B4789AA021419F60A7BB342A514160E3E8FA66A0214118B4C0CE402A5141EA4BF3EEC7A12141A3023D61452A514151AC161881A22141A31409CF1F2A5141');
 INSERT INTO "nyc_census_blocks" ("blkid","popn_total","popn_white","popn_black","popn_nativ","popn_asian","popn_other","boroname",geom) VALUES ('360850020011000','66','52','2','0','7','5','Staten Island','0106000020266900000100000001030000000100000007000000083B4A6F79A8214127EC57B49926514151B51BB7CEA72141B2EAD6F38A2651416F429640B9A72141449FCB1C89265141163AA64D56A72141B89E2B7C9B26514150509213EDA72141DCC9A351A826514184FA4C6017A82141B9AE24F0AB265141083B4A6F79A8214127EC57B499265141');
6 changes: 1 addition & 5 deletions pkg/sql/colexec/hashjoiner.go
@@ -256,11 +256,7 @@ func (hj *hashJoiner) Next(ctx context.Context) coldata.Batch {
 			hj.spec.joinType == sqlbase.RightOuterJoin ||
 			hj.spec.joinType == sqlbase.LeftSemiJoin ||
 			hj.spec.joinType == sqlbase.IntersectAllJoin {
-			// The short-circuiting behavior is temporarily disabled
-			// because it causes flakiness of some tests due to #48785
-			// (concurrent calls to DrainMeta and Next).
-			// TODO(asubiotto): remove this once the issue is resolved.
-			// hj.state = hjDone
+			hj.state = hjDone
 			continue
 		}
 	}
12 changes: 4 additions & 8 deletions pkg/sql/colexec/routers.go
@@ -634,10 +634,10 @@ func newHashRouterWithOutputs(
 	return r
 }
 
-// cancelOutputs cancels all outputs and forwards the given error to one output
-// if non-nil. The only case where the error is not forwarded if no output could
-// be canceled due to an error. In this case each output will forward the error
-// returned during cancellation.
+// cancelOutputs cancels all outputs and forwards the given error to all of
+// them if non-nil. The only case where the error is not forwarded is if no
+// output could be canceled due to an error. In this case each output will
+// forward the error returned during cancellation.
 func (r *HashRouter) cancelOutputs(ctx context.Context, errToForward error) {
 	for _, o := range r.outputs {
 		if err := colexecerror.CatchVectorizedRuntimeError(func() {
@@ -646,10 +646,6 @@ func (r *HashRouter) cancelOutputs(ctx context.Context, errToForward error) {
 			// If there was an error canceling this output, this error can be
 			// forwarded to whoever is calling Next.
 			o.forwardErr(err)
-		} else {
-			// Successful cancellation, which means errToForward was also consumed.
-			// Set it to nil to not forward it to another output.
-			errToForward = nil
 		}
 	}
 }
21 changes: 4 additions & 17 deletions pkg/sql/colexec/routers_test.go
@@ -1097,29 +1097,16 @@ func TestHashRouterRandom(t *testing.T) {
 			require.NoError(t, resultsByOp[i].err)
 		}
 	}
-	requireOneError := func(t *testing.T, err error) {
+	requireErrFromEachOutput := func(t *testing.T, err error) {
 		t.Helper()
 		if err == nil {
 			t.Fatal("use requireNoErrors instead")
 		}
 		for i := range resultsByOp {
-			if err == nil {
-				// A match was already found. Since we only expect one error, this
-				// error must be nil.
-				require.Nil(t, resultsByOp[i].err, "expected error to be nil")
-				continue
-			}
 			if resultsByOp[i].err == nil {
-				// This result has no error but we have not yet found the expected
-				// error, continue to another result.
-				continue
+				t.Fatalf("unexpectedly no error from %d output", i)
 			}
 			require.True(t, testutils.IsError(resultsByOp[i].err, err.Error()), "unexpected error %v", resultsByOp[i].err)
-			err = nil
 		}
-		if err != nil {
-			// err is set to nil when a match is found.
-			t.Fatal("no matching error found")
-		}
 	}
 
@@ -1156,10 +1143,10 @@
 			}
 		}
 	case hashRouterContextCanceled:
-		requireOneError(t, context.Canceled)
+		requireErrFromEachOutput(t, context.Canceled)
 		checkMetadata(t, []string{hashRouterMetadataMsg})
 	case hashRouterOutputErrorOnAddBatch:
-		requireOneError(t, errors.New(addBatchErrMsg))
+		requireErrFromEachOutput(t, errors.New(addBatchErrMsg))
 		checkMetadata(t, []string{hashRouterMetadataMsg})
 	case hashRouterOutputErrorOnNext:
 		// If an error is encountered in Next, it is returned to the caller,
3 changes: 3 additions & 0 deletions pkg/sql/conn_executor_test.go
@@ -38,6 +38,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/tests"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/skip"
 	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
 	"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -495,6 +496,8 @@ func TestQueryProgress(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	skip.WithIssue(t, 51356)
+
 	const rows, kvBatchSize = 1000, 50
 
 	defer rowexec.TestingSetScannedRowProgressFrequency(rows / 60)()