Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

execinfrapb: move MockDistSQLServer to flowinfra #130111

Merged
merged 1 commit into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/sql/colflow/colrpc/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ go_test(
"//pkg/sql/colmem",
"//pkg/sql/execinfra",
"//pkg/sql/execinfrapb",
"//pkg/sql/flowinfra",
"//pkg/sql/types",
"//pkg/testutils",
"//pkg/util/cancelchecker",
Expand Down
13 changes: 7 additions & 6 deletions pkg/sql/colflow/colrpc/colrpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/colmem"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/cancelchecker"
Expand Down Expand Up @@ -135,7 +136,7 @@ func TestOutboxInbox(t *testing.T) {
defer stopper.Stop(ctx)

clock := hlc.NewClockForTesting(nil)
_, mockServer, addr, err := execinfrapb.StartMockDistSQLServer(ctx, clock, stopper, execinfra.StaticSQLInstanceID)
_, mockServer, addr, err := flowinfra.StartMockDistSQLServer(ctx, clock, stopper, execinfra.StaticSQLInstanceID)
require.NoError(t, err)

// Generate a random cancellation scenario.
Expand Down Expand Up @@ -490,7 +491,7 @@ func TestInboxHostCtxCancellation(t *testing.T) {
defer stopper.Stop(ctx)

clock := hlc.NewClockForTesting(nil)
_, mockServer, addr, err := execinfrapb.StartMockDistSQLServer(ctx, clock, stopper, execinfra.StaticSQLInstanceID)
_, mockServer, addr, err := flowinfra.StartMockDistSQLServer(ctx, clock, stopper, execinfra.StaticSQLInstanceID)
require.NoError(t, err)

rng, _ := randutil.NewTestRand()
Expand Down Expand Up @@ -578,7 +579,7 @@ func TestOutboxInboxMetadataPropagation(t *testing.T) {
stopper := stop.NewStopper()
defer stopper.Stop(ctx)

_, mockServer, addr, err := execinfrapb.StartMockDistSQLServer(ctx,
_, mockServer, addr, err := flowinfra.StartMockDistSQLServer(ctx,
hlc.NewClockForTesting(nil), stopper, execinfra.StaticSQLInstanceID,
)
require.NoError(t, err)
Expand Down Expand Up @@ -773,7 +774,7 @@ func BenchmarkOutboxInbox(b *testing.B) {
stopper := stop.NewStopper()
defer stopper.Stop(ctx)

_, mockServer, addr, err := execinfrapb.StartMockDistSQLServer(ctx,
_, mockServer, addr, err := flowinfra.StartMockDistSQLServer(ctx,
hlc.NewClockForTesting(nil), stopper, execinfra.StaticSQLInstanceID,
)
require.NoError(b, err)
Expand Down Expand Up @@ -848,11 +849,11 @@ func TestOutboxStreamIDPropagation(t *testing.T) {
stopper := stop.NewStopper()
defer stopper.Stop(ctx)

_, mockServer, addr, err := execinfrapb.StartMockDistSQLServer(ctx,
_, mockServer, addr, err := flowinfra.StartMockDistSQLServer(ctx,
hlc.NewClockForTesting(nil), stopper, execinfra.StaticSQLInstanceID,
)
require.NoError(t, err)
dialer := &execinfrapb.MockDialer{Addr: addr}
dialer := &flowinfra.MockDialer{Addr: addr}
defer dialer.Close()

typs := []*types.T{types.Int}
Expand Down
5 changes: 3 additions & 2 deletions pkg/sql/colflow/vectorized_flow_shutdown_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/colmem"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/testutils"
Expand Down Expand Up @@ -113,11 +114,11 @@ func TestVectorizedFlowShutdown(t *testing.T) {
ctx := context.Background()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
_, mockServer, addr, err := execinfrapb.StartMockDistSQLServer(ctx,
_, mockServer, addr, err := flowinfra.StartMockDistSQLServer(ctx,
hlc.NewClockForTesting(nil), stopper, execinfra.StaticSQLInstanceID,
)
require.NoError(t, err)
dialer := &execinfrapb.MockDialer{Addr: addr}
dialer := &flowinfra.MockDialer{Addr: addr}
defer dialer.Close()

queueCfg, cleanup := colcontainerutils.NewTestingDiskQueueCfg(t, true /* inMem */)
Expand Down
8 changes: 0 additions & 8 deletions pkg/sql/execinfrapb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,14 @@ go_library(
"expr.go",
"flow_diagram.go",
"processors.go",
"testutils.go",
],
embed = [":execinfrapb_go_proto"],
importpath = "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb",
visibility = ["//visibility:public"],
deps = [
"//pkg/base",
"//pkg/roachpb",
"//pkg/rpc",
"//pkg/security/username",
"//pkg/settings/cluster",
"//pkg/sql/catalog",
"//pkg/sql/catalog/catalogkeys",
"//pkg/sql/catalog/catenumpb",
Expand All @@ -45,21 +42,16 @@ go_library(
"//pkg/util/buildutil",
"//pkg/util/duration",
"//pkg/util/encoding",
"//pkg/util/hlc",
"//pkg/util/humanizeutil",
"//pkg/util/netutil",
"//pkg/util/optional",
"//pkg/util/protoutil",
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/tracing/tracingpb",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_cockroachdb_redact//:redact",
"@com_github_dustin_go_humanize//:go-humanize",
"@com_github_gogo_protobuf//types",
"@org_golang_google_grpc//:go_default_library",
],
)

Expand Down
9 changes: 9 additions & 0 deletions pkg/sql/flowinfra/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,17 @@ go_library(
"stream_decoder.go",
"stream_encoder.go",
"testing_knobs.go",
"testutils.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/sql/flowinfra",
visibility = ["//visibility:public"],
deps = [
"//pkg/base",
"//pkg/kv",
"//pkg/roachpb",
"//pkg/rpc",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/sql/catalog/catenumpb",
"//pkg/sql/catalog/colinfo",
"//pkg/sql/execinfra",
Expand All @@ -32,23 +35,29 @@ go_library(
"//pkg/sql/rowenc",
"//pkg/sql/sem/tree",
"//pkg/sql/types",
"//pkg/util",
"//pkg/util/admission",
"//pkg/util/admission/admissionpb",
"//pkg/util/buildutil",
"//pkg/util/cancelchecker",
"//pkg/util/ctxlog",
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/mon",
"//pkg/util/netutil",
"//pkg/util/optional",
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/tracing",
"//pkg/util/tracing/tracingpb",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_cockroachdb_redact//:redact",
"@com_github_gogo_protobuf//proto",
"@io_opentelemetry_go_otel//attribute",
"@org_golang_google_grpc//:go_default_library",
],
)

Expand Down
10 changes: 5 additions & 5 deletions pkg/sql/flowinfra/outbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestOutbox(t *testing.T) {
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
clock := hlc.NewClockForTesting(nil)
clusterID, mockServer, addr, err := execinfrapb.StartMockDistSQLServer(ctx, clock, stopper, execinfra.StaticSQLInstanceID)
clusterID, mockServer, addr, err := flowinfra.StartMockDistSQLServer(ctx, clock, stopper, execinfra.StaticSQLInstanceID)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -224,7 +224,7 @@ func TestOutboxInitializesStreamBeforeReceivingAnyRows(t *testing.T) {
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
clock := hlc.NewClockForTesting(nil)
clusterID, mockServer, addr, err := execinfrapb.StartMockDistSQLServer(ctx, clock, stopper, execinfra.StaticSQLInstanceID)
clusterID, mockServer, addr, err := flowinfra.StartMockDistSQLServer(ctx, clock, stopper, execinfra.StaticSQLInstanceID)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -297,7 +297,7 @@ func TestOutboxClosesWhenConsumerCloses(t *testing.T) {
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
clock := hlc.NewClockForTesting(nil)
clusterID, mockServer, addr, err := execinfrapb.StartMockDistSQLServer(ctx, clock, stopper, execinfra.StaticSQLInstanceID)
clusterID, mockServer, addr, err := flowinfra.StartMockDistSQLServer(ctx, clock, stopper, execinfra.StaticSQLInstanceID)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -375,7 +375,7 @@ func TestOutboxCancelsFlowOnError(t *testing.T) {
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
clock := hlc.NewClockForTesting(nil)
clusterID, mockServer, addr, err := execinfrapb.StartMockDistSQLServer(ctx, clock, stopper, execinfra.StaticSQLInstanceID)
clusterID, mockServer, addr, err := flowinfra.StartMockDistSQLServer(ctx, clock, stopper, execinfra.StaticSQLInstanceID)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -501,7 +501,7 @@ func BenchmarkOutbox(b *testing.B) {
stopper := stop.NewStopper()
defer stopper.Stop(bgCtx)
clock := hlc.NewClockForTesting(nil)
clusterID, mockServer, addr, err := execinfrapb.StartMockDistSQLServer(bgCtx, clock, stopper, execinfra.StaticSQLInstanceID)
clusterID, mockServer, addr, err := flowinfra.StartMockDistSQLServer(bgCtx, clock, stopper, execinfra.StaticSQLInstanceID)
if err != nil {
b.Fatal(err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package execinfrapb
package flowinfra

import (
"context"
Expand All @@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/netutil"
Expand Down Expand Up @@ -52,7 +53,7 @@ func StartMockDistSQLServer(
return uuid.Nil, nil, nil, err
}
mock := newMockDistSQLServer()
RegisterDistSQLServer(server, mock)
execinfrapb.RegisterDistSQLServer(server, mock)
ln, err := netutil.ListenAndServeGRPC(stopper, server, util.IsolatedTestAddr)
if err != nil {
return uuid.Nil, nil, nil, err
Expand All @@ -70,12 +71,12 @@ type MockDistSQLServer struct {
// that a new gRPC call has arrived and thus a stream has arrived. The rpc
// handler is blocked until Donec is signaled.
type InboundStreamNotification struct {
Stream DistSQL_FlowStreamServer
Stream execinfrapb.DistSQL_FlowStreamServer
Donec chan<- error
}

// MockDistSQLServer implements the DistSQLServer interface.
var _ DistSQLServer = &MockDistSQLServer{}
var _ execinfrapb.DistSQLServer = &MockDistSQLServer{}

func newMockDistSQLServer() *MockDistSQLServer {
return &MockDistSQLServer{
Expand All @@ -85,20 +86,20 @@ func newMockDistSQLServer() *MockDistSQLServer {

// SetupFlow is part of the DistSQLServer interface.
func (ds *MockDistSQLServer) SetupFlow(
_ context.Context, req *SetupFlowRequest,
) (*SimpleResponse, error) {
_ context.Context, req *execinfrapb.SetupFlowRequest,
) (*execinfrapb.SimpleResponse, error) {
return nil, nil
}

// CancelDeadFlows is part of the DistSQLServer interface.
func (ds *MockDistSQLServer) CancelDeadFlows(
_ context.Context, req *CancelDeadFlowsRequest,
) (*SimpleResponse, error) {
_ context.Context, req *execinfrapb.CancelDeadFlowsRequest,
) (*execinfrapb.SimpleResponse, error) {
return nil, nil
}

// FlowStream is part of the DistSQLServer interface.
func (ds *MockDistSQLServer) FlowStream(stream DistSQL_FlowStreamServer) error {
func (ds *MockDistSQLServer) FlowStream(stream execinfrapb.DistSQL_FlowStreamServer) error {
donec := make(chan error)
ds.InboundStreams <- InboundStreamNotification{Stream: stream, Donec: donec}
return <-donec
Expand All @@ -107,7 +108,7 @@ func (ds *MockDistSQLServer) FlowStream(stream DistSQL_FlowStreamServer) error {
// MockDialer is a mocked implementation of the Outbox's `Dialer` interface.
// Used to create a connection with a client stream.
type MockDialer struct {
// Addr is assumed to be obtained from execinfrapb.StartMockDistSQLServer.
// Addr is assumed to be obtained from flowinfra.StartMockDistSQLServer.
Addr net.Addr
mu struct {
syncutil.Mutex
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/flowinfra/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func createDummyStream(
stopper := stop.NewStopper()
ctx := context.Background()
clock := hlc.NewClockForTesting(nil)
storageClusterID, mockServer, addr, err := execinfrapb.StartMockDistSQLServer(ctx, clock, stopper, execinfra.StaticSQLInstanceID)
storageClusterID, mockServer, addr, err := StartMockDistSQLServer(ctx, clock, stopper, execinfra.StaticSQLInstanceID)
if err != nil {
t.Fatal(err)
}
Expand Down
Loading