Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
84445: streamingccl: use any topology address in job/frontier client connection r=samiskin a=samiskin

Resolves cockroachdb#84009 

Previously the ingestion job relied solely on the provided StreamAddress to
obtain a plan. That is unavoidable when the stream is first created, but once a
topology has been returned there are many more potential addresses to connect
to.

This change makes the ingestion job and the frontier attempt to iterate through
all stream addresses in the topology when attempting to connect to a client.

The topology was also added to the ingestion job progress in order to make this
work, which also has the added observability benefit of the user being able to
accomplish tasks such as checking which nodes have spans which are lagging in
the checkpointed frontier.

Release note (bug fix): The ingestion job is now able to fall back to alternate nodes in
its topology if the original stream address is unavailable.

Co-authored-by: Shiranka Miskin <[email protected]>
  • Loading branch information
craig[bot] and samiskin committed Jul 29, 2022
2 parents 2d8db3a + 774f779 commit 22d2ba2
Show file tree
Hide file tree
Showing 14 changed files with 368 additions and 48 deletions.
10 changes: 10 additions & 0 deletions pkg/ccl/streamingccl/addresses.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,16 @@ func (sa StreamAddress) URL() (*url.URL, error) {
return url.Parse(string(sa))
}

// String renders the StreamAddress as only the host component of its URL so
// that credentials embedded in the address are never exposed. An address that
// cannot be parsed as a URL is rendered as "<invalidURL>".
func (sa StreamAddress) String() string {
	if parsed, err := sa.URL(); err == nil {
		return parsed.Host
	}
	return "<invalidURL>"
}

// PartitionAddress is the address where the stream client should be able to
// read the events produced by a partition of a stream.
//
Expand Down
35 changes: 35 additions & 0 deletions pkg/ccl/streamingccl/streamclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/streaming"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -48,6 +49,9 @@ type Client interface {
// can be used to interact with this stream in the future.
Create(ctx context.Context, tenantID roachpb.TenantID) (streaming.StreamID, error)

// Dial checks whether the source can be connected to for queries.
Dial(ctx context.Context) error

// Destroy informs the source of the stream that it may terminate production
// and release resources such as protected timestamps.
// Destroy(ID StreamID) error
Expand Down Expand Up @@ -88,6 +92,15 @@ type Client interface {
// stream. It specifies the number and addresses of partitions of the stream.
type Topology []PartitionInfo

// StreamAddresses returns the source address of every partition in the
// topology, in partition order. An empty topology yields a nil slice.
func (t Topology) StreamAddresses() []string {
	if len(t) == 0 {
		return nil
	}
	// The result has exactly one entry per partition, so size it up front
	// instead of growing it through repeated appends.
	addresses := make([]string, 0, len(t))
	for i := range t {
		addresses = append(addresses, string(t[i].SrcAddr))
	}
	return addresses
}

// PartitionInfo describes a partition of a replication stream, i.e. a set of key
// spans in a source cluster in which changes will be emitted.
type PartitionInfo struct {
Expand Down Expand Up @@ -149,6 +162,28 @@ func NewStreamClient(
return streamClient, nil
}

// GetFirstActiveClient iterates through each provided stream address in order
// and returns the first client it is able to successfully Dial.
//
// Addresses for which a client cannot be created, or whose client fails to
// Dial, are logged and skipped. If streamAddresses is empty or none of the
// addresses can be reached, a nil Client and a non-nil error are returned; in
// the latter case the error combines the failures seen for every address.
func GetFirstActiveClient(ctx context.Context, streamAddresses []string) (Client, error) {
	// errors.Wrap returns nil when handed a nil error, so without this guard an
	// empty address list would produce a nil Client together with a nil error.
	if len(streamAddresses) == 0 {
		return nil, errors.New("failed to connect, no partition addresses")
	}

	var combinedError error
	for _, address := range streamAddresses {
		streamAddress := streamingccl.StreamAddress(address)
		client, err := NewStreamClient(ctx, streamAddress)
		if err == nil {
			err = client.Dial(ctx)
			if err == nil {
				return client, nil
			}
			// The client was created but is unusable; close it so that it is
			// not leaked while the remaining addresses are tried.
			err = errors.CombineErrors(err, client.Close(ctx))
		}

		// Note the failure and attempt the next address
		log.Errorf(ctx, "failed to connect to address %s: %s", streamAddress, err.Error())
		combinedError = errors.CombineErrors(combinedError, err)
	}

	return nil, errors.Wrap(combinedError, "failed to connect to any partition address")
}

/*
TODO(cdc): Proposed new API from yv/dt chat. #70927.
Expand Down
69 changes: 65 additions & 4 deletions pkg/ccl/streamingccl/streamclient/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ package streamclient
import (
"context"
"fmt"
"net/url"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
Expand All @@ -20,13 +22,21 @@ import (
"github.com/cockroachdb/cockroach/pkg/streaming"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

// testStreamClient is a stub implementation of Client whose methods return
// fixed values.
type testStreamClient struct{}

// Compile-time assertion that testStreamClient satisfies the Client interface.
var _ Client = testStreamClient{}

// Dial implements the Client interface. It always reports success.
func (sc testStreamClient) Dial(ctx context.Context) error {
return nil
}

// Create implements the Client interface.
func (sc testStreamClient) Create(
ctx context.Context, target roachpb.TenantID,
Expand All @@ -36,10 +46,10 @@ func (sc testStreamClient) Create(

// Plan implements the Client interface. It ignores its arguments and always
// returns a fixed topology of two partitions with the source addresses
// "test://host1" and "test://host2".
func (sc testStreamClient) Plan(ctx context.Context, ID streaming.StreamID) (Topology, error) {
	return Topology{
		{SrcAddr: "test://host1"},
		{SrcAddr: "test://host2"},
	}, nil
}

// Heartbeat implements the Client interface.
Expand Down Expand Up @@ -105,6 +115,57 @@ func (t testStreamSubscription) Err() error {
return nil
}

// TestGetFirstActiveClient verifies that GetFirstActiveClient walks the
// provided addresses in order, never dials addresses for which no client can
// be created, moves past addresses whose Dial fails, and returns the first
// client that dials successfully without dialing any later address.
func TestGetFirstActiveClient(t *testing.T) {
	defer leaktest.AfterTest(t)()

	client := GetRandomStreamClientSingletonForTesting()
	defer func() {
		require.NoError(t, client.Close(context.Background()))
	}()
	interceptable, ok := client.(InterceptableStreamClient)
	require.True(t, ok)

	streamAddresses := []string{
		"randomgen://test0/",
		"<invalid-url-test1>",
		"randomgen://test2/",
		"invalidScheme://test3",
		"randomgen://test4/",
		"randomgen://test5/",
		"randomgen://test6/",
	}
	addressDialCount := map[string]int{}
	for _, addr := range streamAddresses {
		addressDialCount[addr] = 0
	}

	// Track dials and error for all but test3 and test4
	interceptable.RegisterDialInterception(func(streamURL *url.URL) error {
		addr := streamURL.String()
		addressDialCount[addr]++
		if addr != streamAddresses[3] && addr != streamAddresses[4] {
			return errors.Errorf("injected dial error")
		}
		return nil
	})

	client, err := GetFirstActiveClient(context.Background(), streamAddresses)
	require.NoError(t, err)

	// Should've dialed the valid schemes up to the 5th one where it should've
	// succeeded
	require.Equal(t, 1, addressDialCount[streamAddresses[0]])
	require.Equal(t, 0, addressDialCount[streamAddresses[1]])
	require.Equal(t, 1, addressDialCount[streamAddresses[2]])
	require.Equal(t, 0, addressDialCount[streamAddresses[3]])
	require.Equal(t, 1, addressDialCount[streamAddresses[4]])
	require.Equal(t, 0, addressDialCount[streamAddresses[5]])
	require.Equal(t, 0, addressDialCount[streamAddresses[6]])

	// The 5th should've succeeded as it was a valid scheme and succeeded Dial.
	// require.Equal takes the expected value first, then the actual value.
	require.Equal(t, streamAddresses[4], client.(*randomStreamClient).streamURL.String())
}

// ExampleClientUsage serves as documentation to indicate how a stream
// client could be used.
func ExampleClient() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,14 @@ func (p *partitionedStreamClient) Create(
return streamID, err
}

// Dial implements the Client interface. While holding the client's mutex it
// pings the source connection, and wraps any ping failure with
// "failed to dial client".
func (p *partitionedStreamClient) Dial(ctx context.Context) error {
	p.mu.Lock()
	defer p.mu.Unlock()

	if pingErr := p.mu.srcConn.Ping(ctx); pingErr != nil {
		return errors.Wrap(pingErr, "failed to dial client")
	}
	return nil
}

// Heartbeat implements Client interface.
func (p *partitionedStreamClient) Heartbeat(
ctx context.Context, streamID streaming.StreamID, consumed hlc.Timestamp,
Expand Down
39 changes: 36 additions & 3 deletions pkg/ccl/streamingccl/streamclient/random_stream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ func GetRandomStreamClientSingletonForTesting() Client {
// an InterceptableStreamClient
type InterceptFn func(event streamingccl.Event, spec SubscriptionToken)

// DialInterceptFn is a function that will intercept Dial calls made to an
// InterceptableStreamClient. It receives the URL of the client being dialed;
// randomStreamClient.Dial returns the first non-nil error produced by a
// registered DialInterceptFn.
type DialInterceptFn func(streamURL *url.URL) error

// InterceptableStreamClient wraps a Client, and provides a method to register
// interceptor methods that are run on every streamed Event.
type InterceptableStreamClient interface {
Expand All @@ -99,6 +103,9 @@ type InterceptableStreamClient interface {
// RegisterInterception is how you can register your interceptor to be called
// from an InterceptableStreamClient.
RegisterInterception(fn InterceptFn)

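// RegisterDialInterception is how you can register a dial interceptor to be
// called on Dial calls made to an InterceptableStreamClient.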
RegisterDialInterception(fn DialInterceptFn)
}

// randomStreamConfig specifies the variables that controls the rate and type of
Expand Down Expand Up @@ -191,16 +198,18 @@ func (c randomStreamConfig) URL(table int) string {
// The client can be configured to return more than one partition via the stream
// URL. Each partition covers a single table span.
type randomStreamClient struct {
	config randomStreamConfig
	// streamURL is the URL the client was created with; it is the value
	// handed to dial interceptors when Dial is called.
	streamURL *url.URL

	// mu is used to provide a threadsafe interface to interceptors.
	mu struct {
		syncutil.Mutex

		// interceptors can be registered to peek at every event generated by this
		// client and which partition spec it was sent to.
		interceptors []func(streamingccl.Event, SubscriptionToken)
		// dialInterceptors are invoked, in registration order, by Dial.
		dialInterceptors []DialInterceptFn
		tableID          int
	}
}

Expand All @@ -218,6 +227,7 @@ func newRandomStreamClient(streamURL *url.URL) (Client, error) {
return nil, err
}
c.config = streamConfig
c.streamURL = streamURL
return c, nil
}

Expand Down Expand Up @@ -248,6 +258,22 @@ func (m *randomStreamClient) tableDescForID(tableID int) (*tabledesc.Mutable, er
return tableDesc, err
}

// Dial implements the Client interface. With the client's mutex held, it runs
// every registered dial interceptor in registration order against the
// client's stream URL, skipping nil entries, and returns the first error an
// interceptor reports. It returns nil when no interceptor fails.
func (m *randomStreamClient) Dial(ctx context.Context) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	for _, intercept := range m.mu.dialInterceptors {
		if intercept != nil {
			if dialErr := intercept(m.streamURL); dialErr != nil {
				return dialErr
			}
		}
	}
	return nil
}

// Plan implements the Client interface.
func (m *randomStreamClient) Plan(ctx context.Context, id streaming.StreamID) (Topology, error) {
topology := make(Topology, 0, m.config.numPartitions)
Expand Down Expand Up @@ -550,3 +576,10 @@ func (m *randomStreamClient) RegisterInterception(fn InterceptFn) {
defer m.mu.Unlock()
m.mu.interceptors = append(m.mu.interceptors, fn)
}

// RegisterDialInterception implements the InterceptableStreamClient interface.
// It appends fn to the client's list of dial interceptors while holding the
// client's mutex; randomStreamClient.Dial later runs the list in order.
func (m *randomStreamClient) RegisterDialInterception(fn DialInterceptFn) {
m.mu.Lock()
defer m.mu.Unlock()
m.mu.dialInterceptors = append(m.mu.dialInterceptors, fn)
}
1 change: 1 addition & 0 deletions pkg/ccl/streamingccl/streamingest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ go_test(
"//pkg/security/securitytest",
"//pkg/security/username",
"//pkg/server",
"//pkg/server/serverpb",
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/catalog/desctestutils",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,7 @@ type heartbeatSender struct {
func newHeartbeatSender(
flowCtx *execinfra.FlowCtx, spec execinfrapb.StreamIngestionFrontierSpec,
) (*heartbeatSender, error) {
streamClient, err := streamclient.NewStreamClient(
flowCtx.EvalCtx.Ctx(),
streamingccl.StreamAddress(spec.StreamAddress))
streamClient, err := streamclient.GetFirstActiveClient(flowCtx.EvalCtx.Ctx(), spec.StreamAddresses)
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
Expand Down Expand Up @@ -192,23 +193,31 @@ func TestStreamIngestionFrontierProcessor(t *testing.T) {
},
} {
t.Run(tc.name, func(t *testing.T) {
if tc.name != "existing-job-checkpoint" {
return
}
spec.PartitionSpecs = map[string]execinfrapb.StreamIngestionPartitionSpec{
pa1: {
PartitionID: pa1,
SubscriptionToken: pa1,
Address: pa1,

topology := streamclient.Topology{
{
ID: pa1,
SubscriptionToken: []byte(pa1),
SrcAddr: streamingccl.PartitionAddress(pa1),
Spans: []roachpb.Span{pa1Span},
},
pa2: {
PartitionID: pa2,
SubscriptionToken: pa2,
Address: pa2,
{
ID: pa2,
SubscriptionToken: []byte(pa2),
SrcAddr: streamingccl.PartitionAddress(pa2),
Spans: []roachpb.Span{pa2Span},
},
}

spec.PartitionSpecs = map[string]execinfrapb.StreamIngestionPartitionSpec{}
for _, partition := range topology {
spec.PartitionSpecs[partition.ID] = execinfrapb.StreamIngestionPartitionSpec{
PartitionID: partition.ID,
SubscriptionToken: string(partition.SubscriptionToken),
Address: string(partition.SrcAddr),
Spans: partition.Spans,
}
}
spec.TenantRekey = execinfrapb.TenantRekey{
OldID: roachpb.MakeTenantID(tenantID),
NewID: roachpb.MakeTenantID(tenantID + 10),
Expand All @@ -231,7 +240,7 @@ func TestStreamIngestionFrontierProcessor(t *testing.T) {

// Create a frontier processor.
var frontierSpec execinfrapb.StreamIngestionFrontierSpec
frontierSpec.StreamAddress = spec.StreamAddress
frontierSpec.StreamAddresses = topology.StreamAddresses()
frontierSpec.TrackedSpans = []roachpb.Span{pa1Span, pa2Span}
frontierSpec.Checkpoint.ResolvedSpans = tc.jobCheckpoint

Expand Down
Loading

0 comments on commit 22d2ba2

Please sign in to comment.