From a7497b886636eb72857f88b5392160cfaaa2fe21 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Tue, 19 Jan 2021 16:55:52 -0800 Subject: [PATCH 1/3] rowexec: minor joinreader cleanup Release note: None --- pkg/sql/rowexec/joinreader.go | 68 ++++++++++++++++++----------------- 1 file changed, 35 insertions(+), 33 deletions(-) diff --git a/pkg/sql/rowexec/joinreader.go b/pkg/sql/rowexec/joinreader.go index 99312d665404..3aaef5c05b52 100644 --- a/pkg/sql/rowexec/joinreader.go +++ b/pkg/sql/rowexec/joinreader.go @@ -90,8 +90,7 @@ type joinReader struct { shouldLimitBatches bool readerType joinReaderType - input execinfra.RowSource - inputTypes []*types.T + input execinfra.RowSource // Column indexes in the input stream specifying the columns which match with // the index columns. These are the equality columns of the join. lookupCols []uint32 @@ -171,6 +170,9 @@ func newJoinReader( if spec.Type != descpb.InnerJoin { return nil, errors.AssertionFailedf("only inner index joins are supported, %s requested", spec.Type) } + if !spec.OnExpr.Empty() { + return nil, errors.AssertionFailedf("non-empty ON expressions are not supported for index joins") + } } var lookupCols []uint32 @@ -190,9 +192,13 @@ func newJoinReader( desc: tabledesc.MakeImmutable(spec.Table), maintainOrdering: spec.MaintainOrdering, input: input, - inputTypes: input.OutputTypes(), lookupCols: lookupCols, outputGroupContinuationForLeftRow: spec.OutputGroupContinuationForLeftRow, + // If the lookup columns form a key, there is only one result per + // lookup, so the fetcher should parallelize the key lookups it + // performs. + shouldLimitBatches: !spec.LookupColumnsAreKey && readerType == lookupJoinReaderType, + readerType: readerType, } if readerType != indexJoinReaderType { jr.groupingState = &inputBatchGroupingState{doGrouping: spec.LeftJoinWithPairedJoiner} @@ -213,19 +219,14 @@ func newJoinReader( indexCols[i] = uint32(columnID) } - // If the lookup columns form a key, there is only one result per lookup, so the fetcher - // should parallelize the key lookups it performs. - jr.shouldLimitBatches = !spec.LookupColumnsAreKey && readerType == lookupJoinReaderType - jr.readerType = readerType - // Add all requested system columns to the output. var sysColDescs []descpb.ColumnDescriptor if spec.HasSystemColumns { sysColDescs = colinfo.AllSystemColumnDescs - } - for i := range sysColDescs { - columnTypes = append(columnTypes, sysColDescs[i].Type) - jr.colIdxMap.Set(sysColDescs[i].ID, jr.colIdxMap.Len()) + for i := range sysColDescs { + columnTypes = append(columnTypes, sysColDescs[i].Type) + jr.colIdxMap.Set(sysColDescs[i].ID, jr.colIdxMap.Len()) + } } var leftTypes []*types.T @@ -276,26 +277,18 @@ func newJoinReader( collectingStats = true } - neededRightCols := jr.neededRightCols() - set, err := getIndexColSet(jr.index, jr.colIdxMap) - if err != nil { - return nil, err - } - if isSecondary && !neededRightCols.SubsetOf(set) { - return nil, errors.Errorf("joinreader index does not cover all columns") + rightCols := jr.neededRightCols() + if isSecondary { + set, err := getIndexColSet(jr.index, jr.colIdxMap) + if err != nil { + return nil, err + } + if !rightCols.SubsetOf(set) { + return nil, errors.Errorf("joinreader index does not cover all columns") + } } var fetcher row.Fetcher - var rightCols util.FastIntSet - switch readerType { - case indexJoinReaderType: - rightCols = jr.Out.NeededColumns() - case lookupJoinReaderType: - rightCols = neededRightCols - default: - return nil, errors.Errorf("unsupported joinReaderType") - } - _, _, err = initRowFetcher( flowCtx, &fetcher, &jr.desc, int(spec.IndexIdx), jr.colIdxMap, false, /* reverse */ rightCols, false /* isCheck */, jr.EvalCtx.Mon, &jr.alloc, spec.Visibility, spec.LockingStrength, @@ -420,22 +413,31 @@ func (jr *joinReader) Spilled() bool { func (jr *joinReader) neededRightCols() util.FastIntSet { neededCols := jr.Out.NeededColumns() + if jr.readerType == indexJoinReaderType { + // For index joins, all columns from the left side are not output, so no + // shift is needed. Also, onCond is always empty for index joins, so + // there is no need to iterate over it either. + return neededCols + } + // Get the columns from the right side of the join and shift them over by // the size of the left side so the right side starts at 0. + numInputTypes := len(jr.input.OutputTypes()) neededRightCols := util.MakeFastIntSet() var lastCol int - for i, ok := neededCols.Next(len(jr.inputTypes)); ok; i, ok = neededCols.Next(i + 1) { - lastCol = i - len(jr.inputTypes) + for i, ok := neededCols.Next(numInputTypes); ok; i, ok = neededCols.Next(i + 1) { + lastCol = i - numInputTypes neededRightCols.Add(lastCol) } if jr.outputGroupContinuationForLeftRow { - // The lastCol is the bool continuation column and not a right column. + // The lastCol is the bool continuation column and not a right + // column. neededRightCols.Remove(lastCol) } // Add columns needed by OnExpr. for _, v := range jr.onCond.Vars.GetIndexedVars() { - rightIdx := v.Idx - len(jr.inputTypes) + rightIdx := v.Idx - numInputTypes if rightIdx >= 0 { neededRightCols.Add(rightIdx) } From e7d9deb2b7f3801bd978691ba0ea49edfefed3f5 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Thu, 21 Jan 2021 18:56:52 -0800 Subject: [PATCH 2/3] build: add crdb_test_off tag temporarily to sqllite logic tests SQLLite logic tests have been failing since we introduced the randomizations of the batch sizes (with a timeout). It is not immediately clear what exactly needs to be adjusted, so let's disable the randomizations for now (so that we at least get some value out of the tests). Release note: None --- build/teamcity-sqllogictest.sh | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/build/teamcity-sqllogictest.sh b/build/teamcity-sqllogictest.sh index 699c8c49fe1a..fe43872297e1 100755 --- a/build/teamcity-sqllogictest.sh +++ b/build/teamcity-sqllogictest.sh @@ -14,11 +14,13 @@ export BUILDER_HIDE_GOPATH_SRC=0 # Run SqlLite tests. # Need to specify the flex-types flag in order to skip past variations that have # numeric typing differences. +# TODO(yuzefovich): remove crdb_test_off tag once sqllite tests have been +# adjusted to run in reasonable time with batch size randomizations. run_json_test build/builder.sh \ stdbuf -oL -eL \ - make test GOTESTFLAGS=-json TESTFLAGS="-v -bigtest -flex-types" TESTTIMEOUT='24h' PKG='./pkg/sql/logictest' TESTS='^TestSqlLiteLogic$$' + make test GOTESTFLAGS=-json TESTFLAGS="-v -bigtest -flex-types" TESTTIMEOUT='24h' PKG='./pkg/sql/logictest' TESTS='^TestSqlLiteLogic$$' TAGS=crdb_test_off # Run the tests with a multitenant configuration. run_json_test build/builder.sh \ stdbuf -oL -eL \ - make test GOTESTFLAGS=-json TESTFLAGS="-v -bigtest -flex-types" TESTTIMEOUT='24h' PKG='./pkg/ccl/logictestccl' TESTS='^TestTenantSQLLiteLogic$$' + make test GOTESTFLAGS=-json TESTFLAGS="-v -bigtest -flex-types" TESTTIMEOUT='24h' PKG='./pkg/ccl/logictestccl' TESTS='^TestTenantSQLLiteLogic$$' TAGS=crdb_test_off From 3be4050b48f1e1974fdcb2a7a094cf31d1a476ff Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Mon, 25 Jan 2021 15:27:31 +0100 Subject: [PATCH 3/3] kv/kvclient/kvcoord: skip TestNoDuplicateHeartbeatLoops Refs: #59373 Reason: Needs rewrite - uses tracing in illegal manner Generated by bin/skip-test. Release justification: non-production code changes Release note: None --- pkg/kv/kvclient/kvcoord/txn_coord_sender_server_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_server_test.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_server_test.go index 115c0608e25f..a310efb3a51a 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_server_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_server_test.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/tracing" @@ -142,6 +143,7 @@ func TestHeartbeatFindsOutAboutAbortedTransaction(t *testing.T) { // times a heartbeat loop was started. func TestNoDuplicateHeartbeatLoops(t *testing.T) { defer leaktest.AfterTest(t)() + skip.WithIssue(t, 59373, "Needs rewrite - uses tracing in illegal manner") defer log.Scope(t).Close(t) s, _, db := serverutils.StartServer(t, base.TestServerArgs{})