Skip to content

Commit

Permalink
[WIP] streamingccl: add ingestion job framework
Browse files Browse the repository at this point in the history
This change introduces a new StreamIngestionJob. It does not do much
more than laying out the general outline of the job, which is very
similar to other bulk jobs such as changefeed, backup etc.

More precisely:
- Introduces StreamIngestionDetails job details proto
- Hooks up the dependency to a mock stream client
- Introduces a StreamIngestionProcessorSpec
- Sets up a simple DistSQL flow which round robin assigns the partitions
  to the processors.

Most notable TODOs in job land which will be addressed in follow up PRs:
- StreamIngestionPlanHook to create this job. Will involve figuring out
  SQL syntax.
- Introducing a ts watermark in both the job and processors. This
  watermark will represent the lowest resolved ts up to which all
  processors have ingested. Iron out semantics on job start and resumption.
- Introducing a StreamIngestionFrontier processor which will slurp the
  results from the StreamIngestionProcessors, and use them to keep track
of the minimum resolved ts across all processors.

Release note: None
  • Loading branch information
adityamaru committed Dec 30, 2020
1 parent 60bf11a commit 2dc44c7
Show file tree
Hide file tree
Showing 13 changed files with 1,279 additions and 542 deletions.
21 changes: 19 additions & 2 deletions pkg/ccl/streamingccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,25 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "streamingccl",
srcs = ["event.go"],
srcs = [
"event.go",
"stream_ingestion_job.go",
"stream_ingestion_processor_planning.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl",
visibility = ["//visibility:public"],
deps = ["//pkg/roachpb"],
deps = [
"//pkg/ccl/streamingccl/streamclient",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/kv",
"//pkg/roachpb",
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/execinfrapb",
"//pkg/sql/physicalplan",
"//pkg/sql/sem/tree",
"//pkg/sql/types",
"@com_github_cockroachdb_logtags//:logtags",
],
)
93 changes: 93 additions & 0 deletions pkg/ccl/streamingccl/stream_ingestion_job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright 2020 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package streamingccl

import (
"context"

"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
)

// streamIngestionResumer is the jobs.Resumer implementation that is registered
// for stream ingestion jobs. It holds the job it was constructed for.
type streamIngestionResumer struct {
job *jobs.Job
}

// ingest resolves the topology of the stream at streamAddress, builds the
// stream ingestion processor specs for the planned nodes, and then plans and
// runs the DistSQL flow. The first error encountered is returned unchanged.
//
// The stream client used here is currently the mock client.
func ingest(
	ctx context.Context, execCtx sql.JobExecContext, streamAddress string, job *jobs.Job,
) error {
	// Ask a stream client for the partitions that make up the stream.
	streamClient := streamclient.NewMockStreamClient()
	topology, err := streamClient.GetTopology(streamclient.StreamAddress(streamAddress))
	if err != nil {
		return err
	}

	// Set up planning across all nodes.
	evalCtx := execCtx.ExtendedEvalContext()
	planner := execCtx.DistSQLPlanner()
	planCtx, nodes, err := planner.SetupAllNodesPlanning(ctx, evalCtx, execCtx.ExecCfg())
	if err != nil {
		return err
	}

	// Distribute the stream partitions over the processor specs.
	specs, err := distStreamIngestionPlanSpecs(topology, nodes)
	if err != nil {
		return err
	}

	// Plan and run the DistSQL flow; its error, if any, is the result.
	return distStreamIngest(ctx, execCtx, nodes, planCtx, planner, specs)
}

// Resume is part of the jobs.Resumer interface. It reads the stream address
// from the job's StreamIngestionDetails and runs ingestion for it, returning
// whatever error ingestion reports.
func (s *streamIngestionResumer) Resume(
	ctx context.Context, execCtx interface{}, resultsCh chan<- tree.Datums,
) error {
	ingestionDetails := s.job.Details().(jobspb.StreamIngestionDetails)
	jobExecCtx := execCtx.(sql.JobExecContext)

	// TODO(adityamaru): We probably want to use the resultsCh to indicate that
	// the processors have completed setup. We can then return the job ID in the
	// plan hook similar to how changefeeds do it.
	return ingest(ctx, jobExecCtx, ingestionDetails.StreamAddress, s.job)
}

// OnFailOrCancel is part of the jobs.Resumer interface. It currently performs
// no work and always returns nil.
func (s *streamIngestionResumer) OnFailOrCancel(_ context.Context, _ interface{}) error {
	return nil
}

// Compile-time check that *streamIngestionResumer implements jobs.Resumer.
var _ jobs.Resumer = (*streamIngestionResumer)(nil)

// init registers the constructor that builds a streamIngestionResumer for
// jobs of type jobspb.TypeStreamIngestion.
func init() {
	newResumer := func(j *jobs.Job, _ *cluster.Settings) jobs.Resumer {
		return &streamIngestionResumer{job: j}
	}
	jobs.RegisterConstructor(jobspb.TypeStreamIngestion, newResumer)
}
109 changes: 109 additions & 0 deletions pkg/ccl/streamingccl/stream_ingestion_processor_planning.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright 2020 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package streamingccl

import (
	"context"
	"errors"

	"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/sql"
	"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
	"github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
	"github.com/cockroachdb/cockroach/pkg/sql/types"
	"github.com/cockroachdb/logtags"
)

// streamIngestionResultTypes holds the result types passed to the stream
// ingestion processor stage when the physical plan is built. It is currently
// an empty list.
//
// TODO(adityamaru): Figure out what the processors will return.
var streamIngestionResultTypes = []*types.T{}

// distStreamIngestionPlanSpecs assigns every partition of topology to a node
// in round-robin order and returns the resulting processor specs.
//
// The returned slice holds one spec per node that received at least one
// partition, so it has min(len(topology.Partitions), len(nodes)) entries, and
// the spec at index n holds the partitions assigned to nodes[n]. Within a
// spec, PartitionAddress maps the partition's index in topology.Partitions to
// its address.
//
// An error is returned when there are partitions to assign but nodes is
// empty. A topology without partitions yields an empty slice and no error.
func distStreamIngestionPlanSpecs(
	topology streamclient.Topology, nodes []roachpb.NodeID,
) ([]*execinfrapb.StreamIngestionDataSpec, error) {
	numNodes := len(nodes)

	// Without this guard the modulo below would panic with an integer division
	// by zero as soon as the first partition is processed.
	if numNodes == 0 && len(topology.Partitions) > 0 {
		return nil, errors.New("cannot assign stream partitions: no nodes available for planning")
	}

	streamIngestionSpecs := make([]*execinfrapb.StreamIngestionDataSpec, 0, numNodes)

	for i, partition := range topology.Partitions {
		// Round robin assign the stream partitions to nodes. Partitions 0
		// through len(nodes) - 1 each create the spec for their node. Later
		// partitions just add themselves to the partition addresses of an
		// existing spec.
		if i < numNodes {
			spec := &execinfrapb.StreamIngestionDataSpec{
				PartitionAddress: make(map[int32]string),
			}
			streamIngestionSpecs = append(streamIngestionSpecs, spec)
		}
		n := i % numNodes
		streamIngestionSpecs[n].PartitionAddress[int32(i)] = string(partition)
	}

	return streamIngestionSpecs, nil
}

// distStreamIngest builds a one-stage physical plan containing one stream
// ingestion processor per entry of streamIngestionSpecs, with the processor
// for streamIngestionSpecs[i] placed on nodes[i], and then runs the plan
// without a transaction. When there are no specs it returns nil without
// planning anything.
//
// NOTE(review): the outcome of dsp.Run is not inspected here, so this function
// returns nil even if the flow fails — confirm whether flow errors should be
// surfaced to the caller.
func distStreamIngest(
	ctx context.Context,
	execCtx sql.JobExecContext,
	nodes []roachpb.NodeID,
	planCtx *sql.PlanningCtx,
	dsp *sql.DistSQLPlanner,
	streamIngestionSpecs []*execinfrapb.StreamIngestionDataSpec,
) error {
	ctx = logtags.AddTag(ctx, "stream-ingest-distsql", nil)
	evalCtx := execCtx.ExtendedEvalContext()
	var noTxn *kv.Txn

	numSpecs := len(streamIngestionSpecs)
	if numSpecs == 0 {
		return nil
	}

	// One processor per spec: the i-th spec runs on the i-th node.
	placements := make([]physicalplan.ProcessorCorePlacement, numSpecs)
	for i, spec := range streamIngestionSpecs {
		placement := &placements[i]
		placement.NodeID = nodes[i]
		placement.Core.StreamIngestionData = spec
	}

	plan := planCtx.NewPhysicalPlan()
	plan.AddNoInputStage(
		placements,
		execinfrapb.PostProcessSpec{},
		streamIngestionResultTypes,
		execinfrapb.Ordering{},
	)

	// TODO(adityamaru): It is likely that we will add a StreamIngestFrontier
	// processor on the coordinator node. All the StreamIngestionProcessors will
	// feed their results into this frontier. This is similar to the relationship
	// between the ChangeAggregator and ChangeFrontier processors. The
	// StreamIngestFrontier will be responsible for updating the job watermark
	// with the min of the resolved ts outputted by all the processors.

	// TODO(adityamaru): Once result types are updated, add PlanToStreamColMap.
	dsp.FinalizePlan(planCtx, plan)

	receiver := sql.MakeDistSQLReceiver(
		ctx,
		// TODO(adityamaru): Are there any results we want to surface to the user?
		nil, /* resultWriter */
		tree.Rows,
		nil, /* rangeCache */
		noTxn,
		nil, /* clockUpdater */
		evalCtx.Tracing,
	)
	defer receiver.Release()

	// Run against a copy of the evalCtx, as dsp.Run() might change it.
	evalCtxForRun := *evalCtx
	dsp.Run(planCtx, noTxn, plan, receiver, &evalCtxForRun, nil /* finishedSetupFn */)
	return nil
}
7 changes: 5 additions & 2 deletions pkg/ccl/streamingccl/streamclient/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "streamclient",
srcs = ["stream_client.go"],
srcs = [
"mock_stream_client.go",
"stream_client.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient",
visibility = ["//visibility:public"],
deps = ["//pkg/ccl/streamingccl"],
Expand All @@ -17,6 +20,6 @@ go_test(
"//pkg/roachpb",
"//pkg/util/hlc",
"//pkg/util/timeutil",
"//vendor/github.com/stretchr/testify/require",
"@com_github_stretchr_testify//require",
],
)
37 changes: 37 additions & 0 deletions pkg/ccl/streamingccl/streamclient/mock_stream_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2020 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package streamclient

import (
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
)

// MockStreamClient is a mock stream client. It carries no state; its
// StreamClient methods are placeholders that panic when called.
type MockStreamClient struct{}

// Compile-time check that *MockStreamClient implements StreamClient.
var _ StreamClient = (*MockStreamClient)(nil)

// NewMockStreamClient returns a pointer to a freshly allocated
// MockStreamClient.
func NewMockStreamClient() *MockStreamClient {
	return new(MockStreamClient)
}

// GetTopology implements the StreamClient interface. The mock does not provide
// a topology yet: calling it panics instead of returning.
func (m *MockStreamClient) GetTopology(_ StreamAddress) (Topology, error) {
	panic("unimplemented mock method")
}

// ConsumePartition implements the StreamClient interface. The mock does not
// produce events yet: calling it panics instead of returning.
func (m *MockStreamClient) ConsumePartition(
	_ PartitionAddress, _ time.Time,
) (chan streamingccl.Event, error) {
	panic("unimplemented mock method")
}
10 changes: 5 additions & 5 deletions pkg/ccl/streamingccl/streamclient/stream_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,20 @@ import (
"github.com/stretchr/testify/require"
)

type mockStreamClient struct{}
type testStreamClient struct{}

var _ StreamClient = mockStreamClient{}
var _ StreamClient = testStreamClient{}

// GetTopology implements the StreamClient interface.
func (sc mockStreamClient) GetTopology(_ StreamAddress) (Topology, error) {
func (sc testStreamClient) GetTopology(_ StreamAddress) (Topology, error) {
return Topology{Partitions: []PartitionAddress{
"s3://my_bucket/my_stream/partition_1",
"s3://my_bucket/my_stream/partition_2",
}}, nil
}

// ConsumePartition implements the StreamClient interface.
func (sc mockStreamClient) ConsumePartition(
func (sc testStreamClient) ConsumePartition(
_ PartitionAddress, _ time.Time,
) (chan streamingccl.Event, error) {
sampleKV := roachpb.KeyValue{
Expand All @@ -54,7 +54,7 @@ func (sc mockStreamClient) ConsumePartition(
// TestExampleClientUsage serves as documentation to indicate how a stream
// client could be used.
func TestExampleClientUsage(t *testing.T) {
client := mockStreamClient{}
client := testStreamClient{}
sa := StreamAddress("s3://my_bucket/my_stream")
topology, err := client.GetTopology(sa)
require.NoError(t, err)
Expand Down
Loading

0 comments on commit 2dc44c7

Please sign in to comment.