Merge #39159 #39386
39159: testutils/lint: convert some checkers to go/analysis r=mjibson a=mjibson

Convert hash, unconvert, and timer to go/analysis
(https://godoc.org/golang.org/x/tools/go/analysis). This is the new,
official static analysis framework for Go. These checkers were previously
disabled. Although they now work again, they are not yet hooked up. It
appears difficult to run them from inside a test. The Go people
apparently are steering us toward using the CLI instead. Hence, roachlint.

Add roachlint, a program to run these checkers. Not yet hooked up to
TestLint.

See #33669. Although that mentions using honnef.co/go/tools, that tool
now uses go/analysis: https://staticcheck.io/changes/2019.2#go-analysis.

Release note: None

39386: distsqlrun: clean up the materializer r=yuzefovich a=yuzefovich

This commit fixes a bug in which the context that was updated
during `Flow.startInternal` was not used to run the last processor
in the flow. The fix removes the need for an additional `ctxCancel`
function in the materializer.

Additionally, `outputToInputColIdx` has been removed from the
materializer since it's always a mapping such that o[i] = i.
I believe it is a remnant of the early days of the vectorized engine,
and since then we've been using projection operators to serve
the purpose that was initially envisioned for this mapping.

Fixes: #39384.

Release note: None

Co-authored-by: Matt Jibson
Co-authored-by: Yahor Yuzefovich
3 people committed Aug 7, 2019
3 parents 6b5b633 + e7df44e + a73eb98 commit fe76c37
Showing 51 changed files with 500 additions and 411 deletions.
10 changes: 8 additions & 2 deletions Gopkg.lock

4 changes: 3 additions & 1 deletion Makefile
@@ -1043,7 +1043,7 @@ lint lintshort: TESTTIMEOUT := $(LINTTIMEOUT)
.PHONY: lint
lint: override TAGS += lint
lint: ## Run all style checkers and linters.
lint: bin/returncheck
lint: bin/returncheck bin/roachlint
@if [ -t 1 ]; then echo '$(yellow)NOTE: `make lint` is very slow! Perhaps `make lintshort`?$(term-reset)'; fi
@# Run 'go build -i' to ensure we have compiled object files available for all
@# packages. In Go 1.10, only 'go vet' recompiles on demand. For details:
@@ -1054,6 +1054,7 @@ lint: bin/returncheck
.PHONY: lintshort
lintshort: override TAGS += lint
lintshort: ## Run a fast subset of the style checkers and linters.
lintshort: bin/roachlint
$(xgo) test ./pkg/testutils/lint -v $(GOFLAGS) -tags '$(TAGS)' -ldflags '$(LINKFLAGS)' -short -timeout $(TESTTIMEOUT) -run 'TestLint/$(TESTS)'

.PHONY: protobuf
@@ -1575,6 +1576,7 @@ bins = \
bin/publish-provisional-artifacts \
bin/optgen \
bin/returncheck \
bin/roachlint \
bin/roachprod \
bin/roachprod-stress \
bin/roachtest \
26 changes: 26 additions & 0 deletions pkg/cmd/roachlint/main.go
@@ -0,0 +1,26 @@
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package main

import (
"github.com/cockroachdb/cockroach/pkg/testutils/lint/passes/hash"
"github.com/cockroachdb/cockroach/pkg/testutils/lint/passes/timer"
"github.com/cockroachdb/cockroach/pkg/testutils/lint/passes/unconvert"
"golang.org/x/tools/go/analysis/multichecker"
)

func main() {
multichecker.Main(
hash.Analyzer,
timer.Analyzer,
unconvert.Analyzer,
)
}
2 changes: 1 addition & 1 deletion pkg/cmd/roachtest/test_runner.go
@@ -567,7 +567,7 @@ func (r *testRunner) runTest(

if teamCity {
shout(ctx, l, stdout, "##teamcity[testFailed name='%s' details='%s' flowId='%s']",
t.Name(), teamCityEscape(string(output)), t.Name(),
t.Name(), teamCityEscape(output), t.Name(),
)

// Copy a snapshot of the testrunner's log to the test's artifacts dir
33 changes: 19 additions & 14 deletions pkg/security/password.go
@@ -32,28 +32,33 @@ var BcryptCost = bcrypt.DefaultCost
// ErrEmptyPassword indicates that an empty password was attempted to be set.
var ErrEmptyPassword = errors.New("empty passwords are not permitted")

var sha256NewSum = sha256.New().Sum(nil)

// TODO(mjibson): properly apply SHA-256 to the password. The current code
// erroneously appends the SHA-256 of the empty hash to the unhashed password
// instead of actually hashing the password. Fixing this requires a somewhat
// complicated backwards compatibility dance. This is not a security issue
// because the round of SHA-256 was only intended to achieve a fixed-length
// input to bcrypt; it is bcrypt that provides the cryptographic security, and
// bcrypt is correctly applied.
func appendEmptySha256(password string) []byte {
// In the past we incorrectly called the hash.Hash.Sum method. That
// method uses its argument as a place to put the current hash:
// it does not add its argument to the current hash. Thus, using
// h.Sum([]byte(password)) is equivalent to the below append.
return append([]byte(password), sha256NewSum...)
}

// CompareHashAndPassword tests that the provided bytes are equivalent to the
// hash of the supplied password. If they are not equivalent, returns an
// error.
func CompareHashAndPassword(hashedPassword []byte, password string) error {
h := sha256.New()
// TODO(benesch): properly apply SHA-256 to the password. The current code
// erroneously appends the SHA-256 of the empty hash to the unhashed password
// instead of actually hashing the password. Fixing this requires a somewhat
// complicated backwards compatibility dance. This is not a security issue
// because the round of SHA-256 was only intended to achieve a fixed-length
// input to bcrypt; it is bcrypt that provides the cryptographic security, and
// bcrypt is correctly applied.
//
//lint:ignore HC1000 backwards compatibility
return bcrypt.CompareHashAndPassword(hashedPassword, h.Sum([]byte(password)))
return bcrypt.CompareHashAndPassword(hashedPassword, appendEmptySha256(password))
}

// HashPassword takes a raw password and returns a bcrypt hashed password.
func HashPassword(password string) ([]byte, error) {
h := sha256.New()
//lint:ignore HC1000 backwards compatibility (see CompareHashAndPassword)
return bcrypt.GenerateFromPassword(h.Sum([]byte(password)), BcryptCost)
return bcrypt.GenerateFromPassword(appendEmptySha256(password), BcryptCost)
}

// PromptForPassword prompts for a password.
10 changes: 0 additions & 10 deletions pkg/sql/distsqlrun/column_exec_setup.go
@@ -66,17 +66,12 @@ func wrapRowSource(
// to this operator (e.g. streamIDToOp).
toWrapInput = c.input
} else {
outputToInputColIdx := make([]int, len(inputTypes))
for i := range outputToInputColIdx {
outputToInputColIdx[i] = i
}
var err error
toWrapInput, err = newMaterializer(
flowCtx,
processorID,
input,
inputTypes,
outputToInputColIdx,
&distsqlpb.PostProcessSpec{},
nil, /* output */
nil, /* metadataSourcesQueue */
@@ -1337,10 +1332,6 @@ func (s *vectorizedFlowCreator) setupOutput(
}
// Make the materializer, which will write to the given receiver.
columnTypes := s.syncFlowConsumer.Types()
outputToInputColIdx := make([]int, len(columnTypes))
for i := range outputToInputColIdx {
outputToInputColIdx[i] = i
}
var outputStatsToTrace func()
if s.recordingStats {
// Make a copy given that vectorizedStatsCollectorsQueue is reset and
@@ -1357,7 +1348,6 @@ func (s *vectorizedFlowCreator) setupOutput(
pspec.ProcessorID,
op,
columnTypes,
outputToInputColIdx,
&distsqlpb.PostProcessSpec{},
s.syncFlowConsumer,
metadataSourcesQueue,
5 changes: 0 additions & 5 deletions pkg/sql/distsqlrun/columnar_utils_test.go
@@ -87,16 +87,11 @@ func verifyColOperator(
return err
}

outputToInputColIdx := make([]int, len(outputTypes))
for i := range outputTypes {
outputToInputColIdx[i] = i
}
outColOp, err := newMaterializer(
flowCtx,
int32(len(inputs))+2,
result.op,
outputTypes,
outputToInputColIdx,
&distsqlpb.PostProcessSpec{},
nil, /* output */
nil, /* metadataSourcesQueue */
17 changes: 10 additions & 7 deletions pkg/sql/distsqlrun/flow.go
@@ -457,7 +457,7 @@ func (f *Flow) setupProcessors(ctx context.Context, inputSyncs [][]RowSource) er
func (f *Flow) setup(ctx context.Context, spec *distsqlpb.FlowSpec) error {
f.spec = spec
if f.isVectorized {
log.VEventf(ctx, 1, "setting up vectorize flow %d", f.id)
log.VEventf(ctx, 1, "setting up vectorize flow %s", f.id.Short())
acc := f.EvalCtx.Mon.MakeBoundAccount()
f.vectorizedBoundAccount = &acc
err := f.setupVectorizedFlow(ctx, f.vectorizedBoundAccount)
@@ -481,8 +481,10 @@ func (f *Flow) setup(ctx context.Context, spec *distsqlpb.FlowSpec) error {

// startInternal starts the flow. All processors are started, each in their own
// goroutine. The caller must forward any returned error to syncFlowConsumer if
// set.
func (f *Flow) startInternal(ctx context.Context, doneFn func()) error {
// set. A new context is derived and returned, and it must be used when this
// method returns so that all components running in their own goroutines could
// listen for a cancellation on the same context.
func (f *Flow) startInternal(ctx context.Context, doneFn func()) (context.Context, error) {
f.doneFn = doneFn
log.VEventf(
ctx, 1, "starting (%d processors, %d startables)", len(f.processors), len(f.startables),
@@ -503,7 +505,7 @@ func (f *Flow) startInternal(ctx context.Context, doneFn func()) error {
if err := f.flowRegistry.RegisterFlow(
ctx, f.id, f, f.inboundStreams, settingFlowStreamTimeout.Get(&f.FlowCtx.Cfg.Settings.SV),
); err != nil {
return err
return ctx, err
}
}

@@ -523,7 +525,7 @@ func (f *Flow) startInternal(ctx context.Context, doneFn func()) error {
}(i)
}
f.startedGoroutines = len(f.startables) > 0 || len(f.processors) > 0 || !f.isLocal()
return nil
return ctx, nil
}

// isLocal returns whether this flow does not have any remote execution.
@@ -540,7 +542,7 @@ func (f *Flow) isLocal() bool {
// setup error is pushed to the syncFlowConsumer. In this case, a subsequent
// call to f.Wait() will not block.
func (f *Flow) Start(ctx context.Context, doneFn func()) error {
if err := f.startInternal(ctx, doneFn); err != nil {
if _, err := f.startInternal(ctx, doneFn); err != nil {
// For sync flows, the error goes to the consumer.
if f.syncFlowConsumer != nil {
f.syncFlowConsumer.Push(nil /* row */, &distsqlpb.ProducerMetadata{Err: err})
@@ -569,7 +571,8 @@ func (f *Flow) Run(ctx context.Context, doneFn func()) error {
headProc = f.processors[len(f.processors)-1]
f.processors = f.processors[:len(f.processors)-1]

if err := f.startInternal(ctx, doneFn); err != nil {
var err error
if ctx, err = f.startInternal(ctx, doneFn); err != nil {
// For sync flows, the error goes to the consumer.
if f.syncFlowConsumer != nil {
f.syncFlowConsumer.Push(nil /* row */, &distsqlpb.ProducerMetadata{Err: err})
40 changes: 7 additions & 33 deletions pkg/sql/distsqlrun/materializer.go
@@ -30,13 +30,6 @@ type materializer struct {

da sqlbase.DatumAlloc

// outputToInputColIdx is a mapping from output row index to the operator's
// internal column schema. For example, if the input operator had 2 columns
// [a, b], and the desired output was just [b], outputToInputColIdx would be
// [1]: mapping the 0th column of the output row schema onto the 1st column
// of the operator's row schema.
outputToInputColIdx []int

// runtime fields --

// curIdx represents the current index into the column batch: the next row the
@@ -53,10 +46,6 @@ type materializer struct {
outputRow sqlbase.EncDatumRow
outputMetadata *distsqlpb.ProducerMetadata

// ctxCancel will cancel the context that is passed to the input (which will
// pass it down further). This allows for the cancellation of the tree rooted
// at this materializer when it is closed.
ctxCancel context.CancelFunc
// cancelFlow will return a function to cancel the context of the flow. It is
// a function in order to be lazily evaluated, since the context cancellation
// function is only available when Starting. This function differs from
@@ -83,19 +72,15 @@ func newMaterializer(
processorID int32,
input exec.Operator,
typs []types.T,
// TODO(yuzefovich): I feel like we should remove outputToInputColIdx
// argument since it's always {0, 1, ..., len(typs)-1}.
outputToInputColIdx []int,
post *distsqlpb.PostProcessSpec,
output RowReceiver,
metadataSourcesQueue []distsqlpb.MetadataSource,
outputStatsToTrace func(),
cancelFlow func() context.CancelFunc,
) (*materializer, error) {
m := &materializer{
input: input,
outputToInputColIdx: outputToInputColIdx,
row: make(sqlbase.EncDatumRow, len(outputToInputColIdx)),
input: input,
row: make(sqlbase.EncDatumRow, len(typs)),
}

if err := m.ProcessorBase.Init(
@@ -139,17 +124,7 @@ func (m *materializer) Child(nth int) exec.OpNode {

func (m *materializer) Start(ctx context.Context) context.Context {
m.input.Init()
ctx = m.ProcessorBase.StartInternal(ctx, materializerProcName)
// In general case, ctx that is passed is related to the "flow context" that
// will be canceled by m.cancelFlow. However, in some cases (like when there
// is a subquery), it appears as if the subquery flow context is not related
// to the flow context of the main query, so calling m.cancelFlow will not
// shutdown the subquery tree. To work around this, we always use another
// context and get another cancellation function, and we will trigger both
// upon exit from the materializer.
// TODO(yuzefovich): figure out what is the problem here.
m.Ctx, m.ctxCancel = context.WithCancel(ctx)
return m.Ctx
return m.ProcessorBase.StartInternal(ctx, materializerProcName)
}

// nextAdapter calls next() and saves the returned results in m. For internal
@@ -182,17 +157,17 @@ func (m *materializer) next() (sqlbase.EncDatumRow, *distsqlpb.ProducerMetadata)
m.curIdx++

typs := m.OutputTypes()
for outIdx, cIdx := range m.outputToInputColIdx {
col := m.batch.ColVec(cIdx)
for colIdx := 0; colIdx < len(typs); colIdx++ {
col := m.batch.ColVec(colIdx)
// TODO(asubiotto): we shouldn't have to do this check. Figure out who's
// not setting nulls.
if col.MaybeHasNulls() {
if col.Nulls().NullAt(rowIdx) {
m.row[outIdx].Datum = tree.DNull
m.row[colIdx].Datum = tree.DNull
continue
}
}
m.row[outIdx].Datum = exec.PhysicalTypeColElemToDatum(col, rowIdx, m.da, typs[outIdx])
m.row[colIdx].Datum = exec.PhysicalTypeColElemToDatum(col, rowIdx, m.da, typs[colIdx])
}
return m.ProcessRowHelper(m.row), nil
}
@@ -209,7 +184,6 @@ func (m *materializer) Next() (sqlbase.EncDatumRow, *distsqlpb.ProducerMetadata)

func (m *materializer) InternalClose() bool {
if m.ProcessorBase.InternalClose() {
m.ctxCancel()
if m.cancelFlow != nil {
m.cancelFlow()()
}
3 changes: 0 additions & 3 deletions pkg/sql/distsqlrun/materializer_test.go
@@ -53,7 +53,6 @@ func TestColumnarizeMaterialize(t *testing.T) {
1, /* processorID */
c,
typs,
[]int{0, 1},
&distsqlpb.PostProcessSpec{},
nil, /* output */
nil, /* metadataSourcesQueue */
@@ -139,7 +138,6 @@ func TestMaterializeTypes(t *testing.T) {
1, /* processorID */
c,
types,
outputToInputColIdx,
&distsqlpb.PostProcessSpec{},
nil, /* output */
nil, /* metadataSourcesQueue */
@@ -194,7 +192,6 @@ func BenchmarkColumnarizeMaterialize(b *testing.B) {
1, /* processorID */
c,
types,
[]int{0, 1},
&distsqlpb.PostProcessSpec{},
nil, /* output */
nil, /* metadataSourcesQueue */
2 changes: 0 additions & 2 deletions pkg/sql/distsqlrun/vectorized_error_propagation_test.go
@@ -55,7 +55,6 @@ func TestVectorizedErrorPropagation(t *testing.T) {
1, /* processorID */
vee,
types,
[]int{0},
&distsqlpb.PostProcessSpec{},
nil, /* output */
nil, /* metadataSourceQueue */
@@ -116,7 +115,6 @@ func TestNonVectorizedErrorPropagation(t *testing.T) {
1, /* processorID */
nvee,
types,
[]int{0},
&distsqlpb.PostProcessSpec{},
nil, /* output */
nil, /* metadataSourceQueue */