61337: sql,jobs: stop depending on streamingccl r=pbardea a=pbardea

Before this commit, some of our protobuf definitions (not defined in the
CCL codebase) referenced CCL types. This commit replaces those typed proto
fields with plain strings and performs the casts explicitly in code,
removing the dependency on CCL code.

Release justification: high-impact (non-ccl code should not bring in ccl
code), low risk (type casts).
Release note: None

Co-authored-by: Paul Bardea <[email protected]>
craig[bot] and pbardea committed Mar 10, 2021
2 parents 3bae381 + 729dc4c commit 8e04b62
Showing 18 changed files with 586 additions and 594 deletions.
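
The pattern across these files is consistent: the proto-generated specs now carry plain strings, and CCL code casts them to the streamingccl address types at the point of use. A minimal, self-contained sketch of that pattern (simplified stand-in types, not the actual CockroachDB definitions):

package main

import "fmt"

// StreamAddress and PartitionAddress stand in for the types defined in
// pkg/ccl/streamingccl.
type StreamAddress string
type PartitionAddress string

// StreamIngestionDataSpec stands in for the proto-generated spec, which now
// holds plain strings rather than CCL types.
type StreamIngestionDataSpec struct {
	StreamAddress      string
	PartitionAddresses []string
}

func main() {
	spec := StreamIngestionDataSpec{
		StreamAddress:      "randomgen://test/",
		PartitionAddresses: []string{"partition1", "partition2"},
	}

	// CCL code performs the casts explicitly where typed values are needed.
	streamAddr := StreamAddress(spec.StreamAddress)
	partitions := make([]PartitionAddress, len(spec.PartitionAddresses))
	for i, pa := range spec.PartitionAddresses {
		partitions[i] = PartitionAddress(pa)
	}
	fmt.Println(streamAddr, partitions)
}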
37 changes: 29 additions & 8 deletions pkg/ccl/streamingccl/streamclient/random_stream_client.go
@@ -64,14 +64,30 @@ const (
IngestionTablePrefix = "foo"
)

type interceptFn func(event streamingccl.Event, pa streamingccl.PartitionAddress)
var randomStreamClientSingleton *randomStreamClient

// GetRandomStreamClientSingletonForTesting returns the singleton instance of
// the client. This is to be used in testing, when interceptors can be
// registered on the client to observe events.
func GetRandomStreamClientSingletonForTesting() Client {
if randomStreamClientSingleton == nil {
randomStreamClientSingleton = &randomStreamClient{}
}
return randomStreamClientSingleton
}

// InterceptFn is a function that will intercept events emitted by
an InterceptableStreamClient.
type InterceptFn func(event streamingccl.Event, pa streamingccl.PartitionAddress)

// InterceptableStreamClient wraps a Client, and provides a method to register
// interceptor methods that are run on every streamed Event.
type InterceptableStreamClient interface {
Client

RegisterInterception(fn interceptFn)
// RegisterInterception is how you can register your interceptor to be called
// from an InterceptableStreamClient.
RegisterInterception(fn InterceptFn)
}

// randomStreamConfig specifies the variables that controls the rate and type of
@@ -168,16 +184,21 @@ var _ InterceptableStreamClient = &randomStreamClient{}
// events on a table with an integer key and integer value for the table with
// the given ID.
func newRandomStreamClient(streamURL *url.URL) (Client, error) {
if randomStreamClientSingleton == nil {
randomStreamClientSingleton = &randomStreamClient{}

randomStreamClientSingleton.mu.Lock()
randomStreamClientSingleton.mu.tableID = 52
randomStreamClientSingleton.mu.Unlock()
}

streamConfig, err := parseRandomStreamConfig(streamURL)
if err != nil {
return nil, err
}
randomStreamClientSingleton.config = streamConfig

client := randomStreamClient{config: streamConfig}
client.mu.Lock()
defer client.mu.Unlock()
client.mu.tableID = 52
return &client, nil
return randomStreamClientSingleton, nil
}

func (m *randomStreamClient) getNextTableID() int {
@@ -393,7 +414,7 @@ func (m *randomStreamClient) makeRandomKey(
}

// RegisterInterception implements the InterceptableStreamClient interface.
func (m *randomStreamClient) RegisterInterception(fn interceptFn) {
func (m *randomStreamClient) RegisterInterception(fn InterceptFn) {
m.mu.Lock()
defer m.mu.Unlock()
m.mu.interceptors = append(m.mu.interceptors, fn)
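With the singleton accessor and the exported InterceptFn above, tests can register interceptors directly on the random stream client instead of passing them through SQL testing knobs. A minimal sketch of that pattern (the helper name is illustrative and not part of this commit; it mirrors the registration code in the ingestion test changes below):

package streamingest

import (
	"testing"

	"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
	"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
)

// registerTestInterceptors shows how a test can wire interceptors onto the
// singleton random stream client.
func registerTestInterceptors(t *testing.T) {
	client := streamclient.GetRandomStreamClientSingletonForTesting()
	interceptEvents := []streamclient.InterceptFn{
		func(event streamingccl.Event, pa streamingccl.PartitionAddress) {
			// Observe each emitted event here, e.g. count checkpoint events.
		},
	}
	interceptable, ok := client.(streamclient.InterceptableStreamClient)
	if !ok {
		t.Fatal("expected the random stream client to be interceptable")
	}
	for _, interceptor := range interceptEvents {
		interceptable.RegisterInterception(interceptor)
	}
}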
1 change: 0 additions & 1 deletion pkg/ccl/streamingccl/streamingest/BUILD.bazel
@@ -85,7 +85,6 @@ go_test(
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/execinfra",
"//pkg/sql/execinfrapb",
"//pkg/sql/sem/tree",

@@ -154,7 +154,7 @@ func TestStreamIngestionFrontierProcessor(t *testing.T) {
},
} {
t.Run(tc.name, func(t *testing.T) {
spec.PartitionAddresses = []streamingccl.PartitionAddress{pa1, pa2}
spec.PartitionAddresses = []string{string(pa1), string(pa2)}
proc, err := newStreamIngestionDataProcessor(&flowCtx, 0 /* processorID */, spec, &post, out)
require.NoError(t, err)
sip, ok := proc.(*streamIngestionProcessor)
3 changes: 2 additions & 1 deletion pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
@@ -81,7 +81,8 @@ func (s *streamIngestionResumer) Resume(resumeCtx context.Context, execCtx inter
p := execCtx.(sql.JobExecContext)

// Start ingesting KVs from the replication stream.
err := ingest(resumeCtx, p, details.StartTime, details.StreamAddress, s.job.Progress(), s.job.ID())
streamAddress := streamingccl.StreamAddress(details.StreamAddress)
err := ingest(resumeCtx, p, details.StartTime, streamAddress, s.job.Progress(), s.job.ID())
if err != nil {
return err
}

@@ -113,7 +113,7 @@ func ingestionPlanHook(
}

streamIngestionDetails := jobspb.StreamIngestionDetails{
StreamAddress: streamAddress,
StreamAddress: string(streamAddress),
Span: roachpb.Span{Key: prefix, EndKey: prefix.PrefixEnd()},
StartTime: startTime,
}
20 changes: 3 additions & 17 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go
@@ -21,7 +21,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/bulk"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
@@ -138,25 +137,11 @@ func newStreamIngestionDataProcessor(
post *execinfrapb.PostProcessSpec,
output execinfra.RowReceiver,
) (execinfra.Processor, error) {
streamClient, err := streamclient.NewStreamClient(spec.StreamAddress)
streamClient, err := streamclient.NewStreamClient(streamingccl.StreamAddress(spec.StreamAddress))
if err != nil {
return nil, err
}

// Check if there are any interceptor methods that need to be registered with
// the stream client.
// These methods are invoked on every emitted Event.
if knobs, ok := flowCtx.Cfg.TestingKnobs.StreamIngestionTestingKnobs.(*sql.
StreamIngestionTestingKnobs); ok {
if knobs.Interceptors != nil {
if interceptable, ok := streamClient.(streamclient.InterceptableStreamClient); ok {
for _, interceptor := range knobs.Interceptors {
interceptable.RegisterInterception(interceptor)
}
}
}
}

sip := &streamIngestionProcessor{
flowCtx: flowCtx,
spec: spec,
@@ -212,7 +197,8 @@ func (sip *streamIngestionProcessor) Start(ctx context.Context) {
// Initialize the event streams.
eventChs := make(map[streamingccl.PartitionAddress]chan streamingccl.Event)
errChs := make(map[streamingccl.PartitionAddress]chan error)
for _, partitionAddress := range sip.spec.PartitionAddresses {
for _, pa := range sip.spec.PartitionAddresses {
partitionAddress := streamingccl.PartitionAddress(pa)
eventCh, errCh, err := sip.client.ConsumePartition(ctx, partitionAddress, sip.spec.StartTime)
if err != nil {
sip.MoveToDraining(errors.Wrapf(err, "consuming partition %v", partitionAddress))

@@ -46,14 +46,14 @@ func distStreamIngestionPlanSpecs(
spec := &execinfrapb.StreamIngestionDataSpec{
JobID: int64(jobID),
StartTime: initialHighWater,
StreamAddress: streamAddress,
PartitionAddresses: make([]streamingccl.PartitionAddress, 0),
StreamAddress: string(streamAddress),
PartitionAddresses: make([]string, 0),
}
streamIngestionSpecs = append(streamIngestionSpecs, spec)
}
n := i % len(nodes)
streamIngestionSpecs[n].PartitionAddresses = append(streamIngestionSpecs[n].PartitionAddresses,
partition)
string(partition))
partitionKey := roachpb.Key(partition)
// We create "fake" spans to uniquely identify the partition. This is used
// to keep track of the resolved ts received for a particular partition in

@@ -24,7 +24,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
@@ -131,8 +130,7 @@ func TestStreamIngestionProcessor(t *testing.T) {
startTime := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}
partitionAddresses := []streamingccl.PartitionAddress{"partition1", "partition2"}
out, err := runStreamIngestionProcessor(ctx, t, registry, kvDB, "randomgen://test/",
partitionAddresses,
startTime, nil /* interceptEvents */, mockClient)
partitionAddresses, startTime, nil /* interceptEvents */, mockClient)
require.NoError(t, err)

actualRows := make(map[string]struct{})
@@ -308,8 +306,7 @@ func TestRandomClientGeneration(t *testing.T) {
streamValidator := newStreamClientValidator()
validator := registerValidatorWithClient(streamValidator)
out, err := runStreamIngestionProcessor(ctx, t, registry, kvDB, streamAddr, topo.Partitions,
startTime, []func(streamingccl.Event, streamingccl.PartitionAddress){cancelAfterCheckpoints,
validator}, nil /* mockClient */)
startTime, []streamclient.InterceptFn{cancelAfterCheckpoints, validator}, nil /* mockClient */)
require.NoError(t, err)

partitionSpanToTableID := getPartitionSpanToTableID(t, topo.Partitions)
@@ -379,7 +376,7 @@ func runStreamIngestionProcessor(
streamAddr string,
partitionAddresses []streamingccl.PartitionAddress,
startTime hlc.Timestamp,
interceptEvents []func(streamingccl.Event, streamingccl.PartitionAddress),
interceptEvents []streamclient.InterceptFn,
mockClient streamclient.Client,
) (*distsqlutils.RowBuffer, error) {
st := cluster.MakeTestingClusterSettings()
@@ -397,16 +394,17 @@
EvalCtx: &evalCtx,
DiskMonitor: testDiskMonitor,
}
flowCtx.Cfg.TestingKnobs.StreamIngestionTestingKnobs = &sql.StreamIngestionTestingKnobs{
Interceptors: interceptEvents}

out := &distsqlutils.RowBuffer{}
post := execinfrapb.PostProcessSpec{}

var spec execinfrapb.StreamIngestionDataSpec
spec.StreamAddress = streamingccl.StreamAddress(streamAddr)
spec.StreamAddress = streamAddr

spec.PartitionAddresses = partitionAddresses
spec.PartitionAddresses = make([]string, len(partitionAddresses))
for i, pa := range partitionAddresses {
spec.PartitionAddresses[i] = string(pa)
}
spec.StartTime = startTime
processorID := int32(0)
proc, err := newStreamIngestionDataProcessor(&flowCtx, processorID, spec, &post, out)
@@ -420,6 +418,12 @@
sip.client = mockClient
}

if interceptable, ok := sip.client.(streamclient.InterceptableStreamClient); ok {
for _, interceptor := range interceptEvents {
interceptable.RegisterInterception(interceptor)
}
}

sip.Run(ctx)

// Ensure that all the outputs are properly closed.
28 changes: 16 additions & 12 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_test.go
@@ -17,15 +17,13 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
_ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl" // To start tenants.
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
_ "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamingutils" // Load the cutover builtin.
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
@@ -87,22 +85,28 @@ func TestStreamIngestionJobWithRandomClient(t *testing.T) {
completeJobAfterCheckpoints := makeCheckpointEventCounter(&mu, threshold, func() {
canBeCompletedCh <- struct{}{}
})

// Register interceptors on the random stream client, which will be used by
// the processors.
streamValidator := newStreamClientValidator()
registerValidator := registerValidatorWithClient(streamValidator)
knobs := base.TestingKnobs{
DistSQL: &execinfra.TestingKnobs{StreamIngestionTestingKnobs: &sql.StreamIngestionTestingKnobs{
Interceptors: []func(event streamingccl.Event, pa streamingccl.PartitionAddress){completeJobAfterCheckpoints,
registerValidator},
},
},
client := streamclient.GetRandomStreamClientSingletonForTesting()
interceptEvents := []streamclient.InterceptFn{
completeJobAfterCheckpoints,
registerValidator,
}
if interceptable, ok := client.(streamclient.InterceptableStreamClient); ok {
for _, interceptor := range interceptEvents {
interceptable.RegisterInterception(interceptor)
}
} else {
t.Fatal("expected the random stream client to be interceptable")
}
serverArgs := base.TestServerArgs{}
serverArgs.Knobs = knobs

var receivedRevertRequest chan struct{}
var allowResponse chan struct{}
var revertRangeTargetTime hlc.Timestamp
params := base.TestClusterArgs{ServerArgs: serverArgs}
params := base.TestClusterArgs{}
params.ServerArgs.Knobs.Store = &kvserver.StoreTestingKnobs{
TestingRequestFilter: func(_ context.Context, ba roachpb.BatchRequest) *roachpb.Error {
for _, req := range ba.Requests {
1 change: 0 additions & 1 deletion pkg/jobs/jobspb/BUILD.bazel
@@ -42,7 +42,6 @@ go_proto_library(
proto = ":jobspb_proto",
visibility = ["//visibility:public"],
deps = [
"//pkg/ccl/streamingccl", # keep
"//pkg/clusterversion",
"//pkg/roachpb",
"//pkg/security", # keep