Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
63706: flowinfra: remove remnants of RunSyncFlow DistSQL RPC r=yuzefovich a=yuzefovich

We recently removed `RunSyncFlow` RPC call of DistSQL, but a couple of
things were missed. This commit cleans up the outbox based on that
removal, removes `SetupSyncFlow` method as well as adds some comments.

Release note: None

63780: opt: allow IN subquery to be converted to lookup join r=RaduBerinde a=RaduBerinde

#### opt: add opttester facility to test placeholder assignment

We like to assume that the result of building a memo with placeholders
followed by AssignPlaceholders is equivalent to building the query
with the values directly. This is not necessarily the case - it is
possible that some normalization rules act on a higher part of the
tree in a way that would not happen if we had fully normalized a lower
part of the tree.

This commit adds two new opttester directives:
`assign-placeholders-norm` and `assign-placeholders-opt`. These take a
query that has placeholders and simulates the prepared query planning
path.

We use these facilities to add some tests that reproduce a customer
issue.

Release note: None

#### opt: allow IN subquery to be converted to lookup join

This change adds a rule that handles a case which prevents Exists
subqueries from becoming lookup joins.

Fixes cockroachdb#43198.

Release note (performance improvement): certain queries containing
`<tuple> IN (<subquery>)` conditions may run significantly faster.

Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Radu Berinde <[email protected]>
  • Loading branch information
3 people committed Apr 19, 2021
3 parents 0609f89 + b5849f3 + e6fd187 commit defd143
Show file tree
Hide file tree
Showing 15 changed files with 601 additions and 227 deletions.
3 changes: 2 additions & 1 deletion pkg/sql/distsql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ go_test(
"columnar_utils_test.go",
"inbound_test.go",
"main_test.go",
"sync_flow_after_drain_test.go",
"setup_flow_after_drain_test.go",
"vectorized_panic_propagation_test.go",
],
embed = [":distsql"],
Expand Down Expand Up @@ -83,6 +83,7 @@ go_test(
"//pkg/util",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/netutil",
"//pkg/util/randutil",
"//pkg/util/stop",
Expand Down
29 changes: 6 additions & 23 deletions pkg/sql/distsql/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,26 +439,6 @@ func newFlow(
return rowflow.NewRowBasedFlow(base)
}

// SetupSyncFlow sets up a synchronous flow, connecting the sync response
// output stream to the given RowReceiver. The flow is not started. The flow
// will be associated with the given context.
// Note: the returned context contains a span that must be finished through
// Flow.Cleanup.
func (ds *ServerImpl) SetupSyncFlow(
ctx context.Context,
parentMonitor *mon.BytesMonitor,
req *execinfrapb.SetupFlowRequest,
output execinfra.RowReceiver,
) (context.Context, flowinfra.Flow, error) {
ctx, f, err := ds.setupFlow(
ds.AnnotateCtx(ctx), tracing.SpanFromContext(ctx), parentMonitor, req, output, LocalState{},
)
if err != nil {
return nil, nil, err
}
return ctx, f, err
}

// LocalState carries information that is required to set up a flow with wrapped
// planNodes.
type LocalState struct {
Expand All @@ -483,9 +463,12 @@ type LocalState struct {
LocalProcs []execinfra.LocalProcessor
}

// SetupLocalSyncFlow sets up a synchronous flow on the current (planning) node.
// It's used by the gateway node to set up the flows local to it.
// It's the same as SetupSyncFlow except it takes the localState.
// SetupLocalSyncFlow sets up a synchronous flow on the current (planning) node,
// connecting the sync response output stream to the given RowReceiver. It's
// used by the gateway node to set up the flows local to it. The flow is not
// started. The flow will be associated with the given context.
// Note: the returned context contains a span that must be finished through
// Flow.Cleanup.
func (ds *ServerImpl) SetupLocalSyncFlow(
ctx context.Context,
parentMonitor *mon.BytesMonitor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,17 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/distsqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
)

// Test that we can register send a sync flow to the distSQLSrv after the
// FlowRegistry is draining and the we can also clean that flow up (the flow
// will get a draining error). This used to crash.
func TestSyncFlowAfterDrain(t *testing.T) {
// Test that we can send a setup flow request to the distSQLSrv after the
// FlowRegistry is draining.
func TestSetupFlowAfterDrain(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
// We'll create a server just so that we can extract its distsql ServerConfig,
Expand Down Expand Up @@ -68,22 +67,13 @@ func TestSyncFlowAfterDrain(t *testing.T) {
},
}

types := make([]*types.T, 0)
rb := distsqlutils.NewRowBuffer(types, nil /* rows */, distsqlutils.RowBufferArgs{})
ctx, flow, err := distSQLSrv.SetupSyncFlow(ctx, distSQLSrv.memMonitor, &req, rb)
// We expect to see an error in the response.
resp, err := distSQLSrv.SetupFlow(ctx, &req)
if err != nil {
t.Fatal(err)
}
if err := flow.Start(ctx, func() {}); err != nil {
t.Fatal(err)
}
flow.Wait()
_, meta := rb.Next()
if meta == nil {
t.Fatal("expected draining err, got no meta")
}
if !testutils.IsError(meta.Err, "the registry is draining") {
t.Fatalf("expected draining err, got: %v", meta.Err)
respErr := resp.Error.ErrorDetail(ctx)
if !testutils.IsError(respErr, "the registry is draining") {
t.Fatalf("expected draining err, got: %v", respErr)
}
flow.Cleanup(ctx)
}
190 changes: 64 additions & 126 deletions pkg/sql/execinfrapb/api.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 2 additions & 4 deletions pkg/sql/execinfrapb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,10 @@ message ConsumerSignal {
// stream.
optional DrainRequest drain_request = 1;

// Used in the RunSyncFlow case; the first message on the client stream must
// contain this message.
optional SetupFlowRequest setup_flow_request = 2;

// Consumer->Producer handshake messages. See message definition.
optional ConsumerHandshake handshake = 3;

reserved 2;
}

message DrainRequest {
Expand Down
Loading

0 comments on commit defd143

Please sign in to comment.