Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
59199: rowexec: minor joinreader cleanup r=yuzefovich a=yuzefovich

Release note: None

59277: build: add crdb_test_off tag temporarily to sqllite logic tests r=yuzefovich a=yuzefovich

SQLLite logic tests have been failing since we introduced the
randomizations of the batch sizes (with a timeout). It is not
immediately clear what exactly needs to be adjusted, so let's disable
the randomizations for now (so that we at least get some value out of
the tests).

Informs: #58089.

Release note: None

59374: kv/kvclient/kvcoord: skip TestNoDuplicateHeartbeatLoops r=nvanbenschoten a=tbg

Refs: #59373

Reason: Needs rewrite - uses tracing in illegal manner

Generated by bin/skip-test.

Release justification: non-production code changes

Release note: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Tobias Grieger <[email protected]>
  • Loading branch information
3 people committed Jan 25, 2021
4 parents b51d9f1 + a7497b8 + e7d9deb + 3be4050 commit fad50c1
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 35 deletions.
6 changes: 4 additions & 2 deletions build/teamcity-sqllogictest.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@ export BUILDER_HIDE_GOPATH_SRC=0
# Run SqlLite tests.
# Need to specify the flex-types flag in order to skip past variations that have
# numeric typing differences.
# TODO(yuzefovich): remove crdb_test_off tag once sqllite tests have been
# adjusted to run in reasonable time with batch size randomizations.
run_json_test build/builder.sh \
stdbuf -oL -eL \
make test GOTESTFLAGS=-json TESTFLAGS="-v -bigtest -flex-types" TESTTIMEOUT='24h' PKG='./pkg/sql/logictest' TESTS='^TestSqlLiteLogic$$'
make test GOTESTFLAGS=-json TESTFLAGS="-v -bigtest -flex-types" TESTTIMEOUT='24h' PKG='./pkg/sql/logictest' TESTS='^TestSqlLiteLogic$$' TAGS=crdb_test_off

# Run the tests with a multitenant configuration.
run_json_test build/builder.sh \
stdbuf -oL -eL \
make test GOTESTFLAGS=-json TESTFLAGS="-v -bigtest -flex-types" TESTTIMEOUT='24h' PKG='./pkg/ccl/logictestccl' TESTS='^TestTenantSQLLiteLogic$$'
make test GOTESTFLAGS=-json TESTFLAGS="-v -bigtest -flex-types" TESTTIMEOUT='24h' PKG='./pkg/ccl/logictestccl' TESTS='^TestTenantSQLLiteLogic$$' TAGS=crdb_test_off
2 changes: 2 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
Expand Down Expand Up @@ -142,6 +143,7 @@ func TestHeartbeatFindsOutAboutAbortedTransaction(t *testing.T) {
// times a heartbeat loop was started.
func TestNoDuplicateHeartbeatLoops(t *testing.T) {
defer leaktest.AfterTest(t)()
skip.WithIssue(t, 59373, "Needs rewrite - uses tracing in illegal manner")
defer log.Scope(t).Close(t)

s, _, db := serverutils.StartServer(t, base.TestServerArgs{})
Expand Down
68 changes: 35 additions & 33 deletions pkg/sql/rowexec/joinreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,7 @@ type joinReader struct {
shouldLimitBatches bool
readerType joinReaderType

input execinfra.RowSource
inputTypes []*types.T
input execinfra.RowSource
// Column indexes in the input stream specifying the columns which match with
// the index columns. These are the equality columns of the join.
lookupCols []uint32
Expand Down Expand Up @@ -171,6 +170,9 @@ func newJoinReader(
if spec.Type != descpb.InnerJoin {
return nil, errors.AssertionFailedf("only inner index joins are supported, %s requested", spec.Type)
}
if !spec.OnExpr.Empty() {
return nil, errors.AssertionFailedf("non-empty ON expressions are not supported for index joins")
}
}

var lookupCols []uint32
Expand All @@ -190,9 +192,13 @@ func newJoinReader(
desc: tabledesc.MakeImmutable(spec.Table),
maintainOrdering: spec.MaintainOrdering,
input: input,
inputTypes: input.OutputTypes(),
lookupCols: lookupCols,
outputGroupContinuationForLeftRow: spec.OutputGroupContinuationForLeftRow,
// If the lookup columns form a key, there is only one result per
// lookup, so the fetcher should parallelize the key lookups it
// performs.
shouldLimitBatches: !spec.LookupColumnsAreKey && readerType == lookupJoinReaderType,
readerType: readerType,
}
if readerType != indexJoinReaderType {
jr.groupingState = &inputBatchGroupingState{doGrouping: spec.LeftJoinWithPairedJoiner}
Expand All @@ -213,19 +219,14 @@ func newJoinReader(
indexCols[i] = uint32(columnID)
}

// If the lookup columns form a key, there is only one result per lookup, so the fetcher
// should parallelize the key lookups it performs.
jr.shouldLimitBatches = !spec.LookupColumnsAreKey && readerType == lookupJoinReaderType
jr.readerType = readerType

// Add all requested system columns to the output.
var sysColDescs []descpb.ColumnDescriptor
if spec.HasSystemColumns {
sysColDescs = colinfo.AllSystemColumnDescs
}
for i := range sysColDescs {
columnTypes = append(columnTypes, sysColDescs[i].Type)
jr.colIdxMap.Set(sysColDescs[i].ID, jr.colIdxMap.Len())
for i := range sysColDescs {
columnTypes = append(columnTypes, sysColDescs[i].Type)
jr.colIdxMap.Set(sysColDescs[i].ID, jr.colIdxMap.Len())
}
}

var leftTypes []*types.T
Expand Down Expand Up @@ -276,26 +277,18 @@ func newJoinReader(
collectingStats = true
}

neededRightCols := jr.neededRightCols()
set, err := getIndexColSet(jr.index, jr.colIdxMap)
if err != nil {
return nil, err
}
if isSecondary && !neededRightCols.SubsetOf(set) {
return nil, errors.Errorf("joinreader index does not cover all columns")
rightCols := jr.neededRightCols()
if isSecondary {
set, err := getIndexColSet(jr.index, jr.colIdxMap)
if err != nil {
return nil, err
}
if !rightCols.SubsetOf(set) {
return nil, errors.Errorf("joinreader index does not cover all columns")
}
}

var fetcher row.Fetcher
var rightCols util.FastIntSet
switch readerType {
case indexJoinReaderType:
rightCols = jr.Out.NeededColumns()
case lookupJoinReaderType:
rightCols = neededRightCols
default:
return nil, errors.Errorf("unsupported joinReaderType")
}

_, _, err = initRowFetcher(
flowCtx, &fetcher, &jr.desc, int(spec.IndexIdx), jr.colIdxMap, false, /* reverse */
rightCols, false /* isCheck */, jr.EvalCtx.Mon, &jr.alloc, spec.Visibility, spec.LockingStrength,
Expand Down Expand Up @@ -420,22 +413,31 @@ func (jr *joinReader) Spilled() bool {
func (jr *joinReader) neededRightCols() util.FastIntSet {
neededCols := jr.Out.NeededColumns()

if jr.readerType == indexJoinReaderType {
// For index joins, all columns from the left side are not output, so no
// shift is needed. Also, onCond is always empty for index joins, so
// there is no need to iterate over it either.
return neededCols
}

// Get the columns from the right side of the join and shift them over by
// the size of the left side so the right side starts at 0.
numInputTypes := len(jr.input.OutputTypes())
neededRightCols := util.MakeFastIntSet()
var lastCol int
for i, ok := neededCols.Next(len(jr.inputTypes)); ok; i, ok = neededCols.Next(i + 1) {
lastCol = i - len(jr.inputTypes)
for i, ok := neededCols.Next(numInputTypes); ok; i, ok = neededCols.Next(i + 1) {
lastCol = i - numInputTypes
neededRightCols.Add(lastCol)
}
if jr.outputGroupContinuationForLeftRow {
// The lastCol is the bool continuation column and not a right column.
// The lastCol is the bool continuation column and not a right
// column.
neededRightCols.Remove(lastCol)
}

// Add columns needed by OnExpr.
for _, v := range jr.onCond.Vars.GetIndexedVars() {
rightIdx := v.Idx - len(jr.inputTypes)
rightIdx := v.Idx - numInputTypes
if rightIdx >= 0 {
neededRightCols.Add(rightIdx)
}
Expand Down

0 comments on commit fad50c1

Please sign in to comment.