diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index a24712b0130c..4782d6608e5b 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -32,6 +32,7 @@ go_library( "sink_external_connection.go", "sink_kafka.go", "sink_pubsub.go", + "sink_pubsub_v2.go", "sink_sql.go", "sink_webhook.go", "sink_webhook_v2.go", @@ -163,6 +164,8 @@ go_library( "@com_github_shopify_sarama//:sarama", "@com_github_xdg_go_scram//:scram", "@com_google_cloud_go_pubsub//:pubsub", + "@com_google_cloud_go_pubsub//apiv1", + "@com_google_cloud_go_pubsub//apiv1/pubsubpb", "@org_golang_google_api//impersonate", "@org_golang_google_api//option", "@org_golang_google_grpc//codes", @@ -313,6 +316,13 @@ go_test( "@com_github_shopify_sarama//:sarama", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", + "@com_google_cloud_go_pubsub//:pubsub", + "@com_google_cloud_go_pubsub//apiv1", + "@com_google_cloud_go_pubsub//apiv1/pubsubpb", + "@com_google_cloud_go_pubsub//pstest", + "@org_golang_google_api//option", + "@org_golang_google_grpc//:go_default_library", + "@org_golang_google_grpc//credentials/insecure", "@org_golang_x_text//collate", ], ) diff --git a/pkg/ccl/changefeedccl/batching_sink.go b/pkg/ccl/changefeedccl/batching_sink.go index d764d7abcab6..f1b99d509655 100644 --- a/pkg/ccl/changefeedccl/batching_sink.go +++ b/pkg/ccl/changefeedccl/batching_sink.go @@ -29,7 +29,8 @@ import ( // into batches as they arrive and once ready are flushed out. type SinkClient interface { MakeResolvedPayload(body []byte, topic string) (SinkPayload, error) - MakeBatchBuffer() BatchBuffer + // Batches can only hold messages for one unique topic + MakeBatchBuffer(topic string) BatchBuffer Flush(context.Context, SinkPayload) error Close() error } @@ -37,7 +38,7 @@ type SinkClient interface { // BatchBuffer is an interface to aggregate KVs into a payload that can be sent // to the sink. type BatchBuffer interface { - Append(key []byte, value []byte, topic string) + Append(key []byte, value []byte) ShouldFlush() bool // Once all data has been Append'ed, Close can be called to return a finalized @@ -90,9 +91,9 @@ type flushReq struct { } type rowEvent struct { - key []byte - val []byte - topic string + key []byte + val []byte + topicDescriptor TopicDescriptor alloc kvevent.Alloc mvcc hlc.Timestamp @@ -176,7 +177,7 @@ func (s *batchingSink) EmitRow( payload := newRowEvent() payload.key = key payload.val = value - payload.topic = "" // unimplemented for now + payload.topicDescriptor = topic payload.mvcc = mvcc payload.alloc = alloc @@ -277,7 +278,7 @@ func (sb *sinkBatch) Append(e *rowEvent) { sb.bufferTime = timeutil.Now() } - sb.buffer.Append(e.key, e.val, e.topic) + sb.buffer.Append(e.key, e.val) sb.keys.Add(hashToInt(sb.hasher, e.key)) sb.numMessages += 1 @@ -296,9 +297,9 @@ func (s *batchingSink) handleError(err error) { } } -func (s *batchingSink) newBatchBuffer() *sinkBatch { +func (s *batchingSink) newBatchBuffer(topic string) *sinkBatch { batch := newSinkBatch() - batch.buffer = s.client.MakeBatchBuffer() + batch.buffer = s.client.MakeBatchBuffer(topic) batch.hasher = s.hasher return batch } @@ -306,7 +307,12 @@ func (s *batchingSink) newBatchBuffer() *sinkBatch { // runBatchingWorker combines 1 or more row events into batches, sending the IO // requests out either once the batch is full or a flush request arrives. 
func (s *batchingSink) runBatchingWorker(ctx context.Context) { - batchBuffer := s.newBatchBuffer() + // topicBatches stores per-topic sinkBatches which are flushed individually + // when one reaches its size limit, but are all flushed together if the + // frequency timer triggers. Messages for different topics cannot be allowed + // to be batched together as the data may need to end up at a specific + // endpoint for that topic. + topicBatches := make(map[string]*sinkBatch) // Once finalized, batches are sent to a parallelIO struct which handles // performing multiple Flushes in parallel while maintaining Keys() ordering. @@ -347,14 +353,14 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) { freeSinkBatchEvent(batch) } - tryFlushBatch := func() error { - if batchBuffer.isEmpty() { + tryFlushBatch := func(topic string) error { + batchBuffer, ok := topicBatches[topic] + if !ok || batchBuffer.isEmpty() { return nil } - toFlush := batchBuffer - batchBuffer = s.newBatchBuffer() + topicBatches[topic] = s.newBatchBuffer(topic) - if err := toFlush.FinalizePayload(); err != nil { + if err := batchBuffer.FinalizePayload(); err != nil { return err } @@ -364,7 +370,7 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) { select { case <-ctx.Done(): return ctx.Err() - case ioEmitter.requestCh <- toFlush: + case ioEmitter.requestCh <- batchBuffer: case result := <-ioEmitter.resultCh: handleResult(result) continue @@ -376,8 +382,22 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) { return nil } + flushAll := func() error { + for topic := range topicBatches { + if err := tryFlushBatch(topic); err != nil { + return err + } + } + return nil + } + + // flushTimer is used to ensure messages do not remain batched longer than a + // given timeout. Every minFlushFrequency seconds after the first event for + // any topic has arrived, batches for all topics are flushed out immediately + // and the timer once again waits for the first message to arrive. flushTimer := s.ts.NewTimer() defer flushTimer.Stop() + isTimerPending := false for { select { @@ -396,11 +416,29 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) { inflight += 1 - // If we're about to append to an empty batch, start the timer to - // guarantee the messages do not stay buffered longer than the - // configured frequency. - if batchBuffer.isEmpty() && s.minFlushFrequency > 0 { + var topic string + var err error + if s.topicNamer != nil { + topic, err = s.topicNamer.Name(r.topicDescriptor) + if err != nil { + s.handleError(err) + continue + } + } + + // If the timer isn't pending then this message is the first message to + // arrive either ever or since the timer last triggered a flush, + // therefore we're going from 0 messages batched to 1, and should + // restart the timer. 
+ if !isTimerPending && s.minFlushFrequency > 0 { flushTimer.Reset(s.minFlushFrequency) + isTimerPending = true + } + + batchBuffer, ok := topicBatches[topic] + if !ok { + batchBuffer = s.newBatchBuffer(topic) + topicBatches[topic] = batchBuffer } batchBuffer.Append(r) @@ -414,7 +452,7 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) { if batchBuffer.buffer.ShouldFlush() { s.metrics.recordSizeBasedFlush() - if err := tryFlushBatch(); err != nil { + if err := tryFlushBatch(topic); err != nil { s.handleError(err) } } @@ -423,7 +461,7 @@ close(r.waiter) } else { sinkFlushWaiter = r.waiter - if err := tryFlushBatch(); err != nil { + if err := flushAll(); err != nil { s.handleError(err) } } @@ -434,7 +472,8 @@ handleResult(result) case <-flushTimer.Ch(): flushTimer.MarkRead() - if err := tryFlushBatch(); err != nil { + isTimerPending = false + if err := flushAll(); err != nil { s.handleError(err) } case <-ctx.Done(): diff --git a/pkg/ccl/changefeedccl/changefeedbase/options.go b/pkg/ccl/changefeedccl/changefeedbase/options.go index d594a09fae34..413e824286f8 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/options.go +++ b/pkg/ccl/changefeedccl/changefeedbase/options.go @@ -165,6 +165,7 @@ const ( // OptKafkaSinkConfig is a JSON configuration for kafka sink (kafkaSinkConfig). OptKafkaSinkConfig = `kafka_sink_config` + OptPubsubSinkConfig = `pubsub_sink_config` OptWebhookSinkConfig = `webhook_sink_config` // OptSink allows users to alter the Sink URI of an existing changefeed. @@ -333,6 +334,7 @@ var ChangefeedOptionExpectValues = map[string]OptionPermittedValues{ OptProtectDataFromGCOnPause: flagOption, OptExpirePTSAfter: durationOption.thatCanBeZero(), OptKafkaSinkConfig: jsonOption, + OptPubsubSinkConfig: jsonOption, OptWebhookSinkConfig: jsonOption, OptWebhookAuthHeader: stringOption, OptWebhookClientTimeout: durationOption, @@ -369,7 +371,7 @@ var CloudStorageValidOptions = makeStringSet(OptCompression) var WebhookValidOptions = makeStringSet(OptWebhookAuthHeader, OptWebhookClientTimeout, OptWebhookSinkConfig) // PubsubValidOptions is options exclusive to pubsub sink -var PubsubValidOptions = makeStringSet() +var PubsubValidOptions = makeStringSet(OptPubsubSinkConfig) // ExternalConnectionValidOptions is options exclusive to the external // connection sink. @@ -888,6 +890,12 @@ func (s StatementOptions) GetKafkaConfigJSON() SinkSpecificJSONConfig { return s.getJSONValue(OptKafkaSinkConfig) } +// GetPubsubConfigJSON returns arbitrary json to be interpreted +// by the pubsub sink. +func (s StatementOptions) GetPubsubConfigJSON() SinkSpecificJSONConfig { + return s.getJSONValue(OptPubsubSinkConfig) +} + // GetResolvedTimestampInterval gets the best-effort interval at which resolved timestamps // should be emitted. Nil or 0 means emit as often as possible. False means do not emit at all. // Returns an error for negative or invalid duration value. diff --git a/pkg/ccl/changefeedccl/sink.go b/pkg/ccl/changefeedccl/sink.go index 8309d30f27d2..ae12a7046dc4 100644 --- a/pkg/ccl/changefeedccl/sink.go +++ b/pkg/ccl/changefeedccl/sink.go @@ -166,6 +166,16 @@ var NewWebhookSinkEnabled = settings.RegisterBoolSetting( util.ConstantWithMetamorphicTestBool("changefeed.new_webhook_sink_enabled", false), ) +// NewPubsubSinkEnabled determines whether or not the refactored Pubsub sink +// or the deprecated sink should be used.
+var NewPubsubSinkEnabled = settings.RegisterBoolSetting( + settings.TenantWritable, + "changefeed.new_pubsub_sink_enabled", + "if enabled, this setting enables a new implementation of the pubsub sink"+ + " that allows for a higher throughput", + util.ConstantWithMetamorphicTestBool("changefeed.new_pubsub_sink_enabled", false), +) + func getSink( ctx context.Context, serverCfg *execinfra.ServerConfig, @@ -239,8 +249,16 @@ func getSink( }) } case isPubsubSink(u): - // TODO: add metrics to pubsubsink - return MakePubsubSink(ctx, u, encodingOpts, AllTargets(feedCfg), opts.IsSet(changefeedbase.OptUnordered)) + var testingKnobs *TestingKnobs + if knobs, ok := serverCfg.TestingKnobs.Changefeed.(*TestingKnobs); ok { + testingKnobs = knobs + } + if NewPubsubSinkEnabled.Get(&serverCfg.Settings.SV) { + return makePubsubSink(ctx, u, encodingOpts, opts.GetPubsubConfigJSON(), AllTargets(feedCfg), opts.IsSet(changefeedbase.OptUnordered), + numSinkIOWorkers(serverCfg), newCPUPacerFactory(ctx, serverCfg), timeutil.DefaultTimeSource{}, metricsBuilder, testingKnobs) + } else { + return makeDeprecatedPubsubSink(ctx, u, encodingOpts, AllTargets(feedCfg), opts.IsSet(changefeedbase.OptUnordered), metricsBuilder, testingKnobs) + } case isCloudStorageSink(u): return validateOptionsAndMakeSink(changefeedbase.CloudStorageValidOptions, func() (Sink, error) { // Placeholder id for canary sink @@ -737,11 +755,11 @@ func defaultRetryConfig() retry.Options { } func getSinkConfigFromJson( - jsonStr changefeedbase.SinkSpecificJSONConfig, + jsonStr changefeedbase.SinkSpecificJSONConfig, baseConfig sinkJSONConfig, ) (batchCfg sinkBatchConfig, retryCfg retry.Options, err error) { retryCfg = defaultRetryConfig() - var cfg = sinkJSONConfig{} + var cfg = baseConfig cfg.Retry.Max = jsonMaxRetries(retryCfg.MaxRetries) cfg.Retry.Backoff = jsonDuration(retryCfg.InitialBackoff) if jsonStr != `` { @@ -854,6 +872,22 @@ func nilPacerFactory() *admission.Pacer { return nil } +func shouldFlushBatch(bytes int, messages int, config sinkBatchConfig) bool { + switch { + // all zero values is interpreted as flush every time + case config.Messages == 0 && config.Bytes == 0 && config.Frequency == 0: + return true + // messages threshold has been reached + case config.Messages > 0 && messages >= config.Messages: + return true + // bytes threshold has been reached + case config.Bytes > 0 && bytes >= config.Bytes: + return true + default: + return false + } +} + func sinkSupportsConcurrentEmits(sink EventSink) bool { _, ok := sink.(*batchingSink) return ok diff --git a/pkg/ccl/changefeedccl/sink_pubsub.go b/pkg/ccl/changefeedccl/sink_pubsub.go index 04ea1d841b21..d5fcbf55d115 100644 --- a/pkg/ccl/changefeedccl/sink_pubsub.go +++ b/pkg/ccl/changefeedccl/sink_pubsub.go @@ -11,42 +11,25 @@ package changefeedccl import ( "context" "encoding/json" - "fmt" "hash/crc32" "net/url" "cloud.google.com/go/pubsub" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent" - "github.com/cockroachdb/cockroach/pkg/cloud" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/errors" - "golang.org/x/oauth2/google" - "google.golang.org/api/impersonate" "google.golang.org/api/option" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) -const credentialsParam = "CREDENTIALS" - -// GcpScheme to be used in testfeed and sink.go -const GcpScheme = "gcpubsub" 
-const gcpScope = "https://www.googleapis.com/auth/pubsub" -const cloudPlatformScope = "https://www.googleapis.com/auth/cloud-platform" -const globalGCPEndpoint = "pubsub.googleapis.com:443" - // TODO: make numOfWorkers configurable const numOfWorkers = 128 -// isPubsubSInk returns true if url contains scheme with valid pubsub sink -func isPubsubSink(u *url.URL) bool { - return u.Scheme == GcpScheme -} - -type pubsubClient interface { +type deprecatedPubsubClient interface { init() error closeTopics() flushTopics() @@ -73,9 +56,10 @@ type pubsubMessage struct { alloc kvevent.Alloc message payload isFlush bool + mvcc hlc.Timestamp } -type gcpPubsubClient struct { +type deprecatedGcpPubsubClient struct { client *pubsub.Client ctx context.Context projectID string @@ -89,9 +73,11 @@ type gcpPubsubClient struct { publishError error topics map[string]*pubsub.Topic } + + knobs *TestingKnobs } -type pubsubSink struct { +type deprecatedPubsubSink struct { numWorkers int workerCtx context.Context @@ -106,88 +92,27 @@ type pubsubSink struct { // errChan is written to indicate an error while sending message. errChan chan error - client pubsubClient + client deprecatedPubsubClient topicNamer *TopicNamer format changefeedbase.FormatType -} -func (p *pubsubSink) getConcreteType() sinkType { - return sinkTypePubsub + metrics metricsRecorder } -// TODO: unify gcp credentials code with gcp cloud storage credentials code -// getGCPCredentials returns gcp credentials parsed out from url -func getGCPCredentials(ctx context.Context, u sinkURL) (option.ClientOption, error) { - const authParam = "AUTH" - const assumeRoleParam = "ASSUME_ROLE" - const authSpecified = "specified" - const authImplicit = "implicit" - const authDefault = "default" - - var credsJSON []byte - var creds *google.Credentials - var err error - authOption := u.consumeParam(authParam) - assumeRoleOption := u.consumeParam(assumeRoleParam) - authScope := gcpScope - if assumeRoleOption != "" { - // If we need to assume a role, the credentials need to have the scope to - // impersonate instead. 
- authScope = cloudPlatformScope - } - - // implemented according to https://github.com/cockroachdb/cockroach/pull/64737 - switch authOption { - case authImplicit: - creds, err = google.FindDefaultCredentials(ctx, authScope) - if err != nil { - return nil, err - } - case authSpecified: - fallthrough - case authDefault: - fallthrough - default: - if u.q.Get(credentialsParam) == "" { - return nil, errors.New("missing credentials parameter") - } - err := u.decodeBase64(credentialsParam, &credsJSON) - if err != nil { - return nil, errors.Wrap(err, "decoding credentials json") - } - creds, err = google.CredentialsFromJSON(ctx, credsJSON, authScope) - if err != nil { - return nil, errors.Wrap(err, "creating credentials from json") - } - } - - credsOpt := option.WithCredentials(creds) - if assumeRoleOption != "" { - assumeRole, delegateRoles := cloud.ParseRoleString(assumeRoleOption) - cfg := impersonate.CredentialsConfig{ - TargetPrincipal: assumeRole, - Scopes: []string{gcpScope}, - Delegates: delegateRoles, - } - - ts, err := impersonate.CredentialsTokenSource(ctx, cfg, credsOpt) - if err != nil { - return nil, errors.Wrap(err, "creating impersonate credentials") - } - return option.WithTokenSource(ts), nil - } - - return credsOpt, nil +func (p *deprecatedPubsubSink) getConcreteType() sinkType { + return sinkTypePubsub } -// MakePubsubSink returns the corresponding pubsub sink based on the url given -func MakePubsubSink( +// makeDeprecatedPubsubSink returns the corresponding pubsub sink based on the url given +func makeDeprecatedPubsubSink( ctx context.Context, u *url.URL, encodingOpts changefeedbase.EncodingOptions, targets changefeedbase.Targets, unordered bool, + mb metricsRecorderBuilder, + knobs *TestingKnobs, ) (Sink, error) { pubsubURL := sinkURL{URL: u, q: u.Query()} @@ -212,11 +137,12 @@ func MakePubsubSink( } ctx, cancel := context.WithCancel(ctx) - p := &pubsubSink{ + p := &deprecatedPubsubSink{ workerCtx: ctx, numWorkers: numOfWorkers, exitWorkers: cancel, format: formatType, + metrics: mb(requiresResourceAccounting), } // creates custom pubsub object based on scheme @@ -244,12 +170,13 @@ func MakePubsubSink( if err != nil { return nil, err } - g := &gcpPubsubClient{ + g := &deprecatedGcpPubsubClient{ topicNamer: tn, ctx: ctx, projectID: projectID, endpoint: endpoint, url: pubsubURL, + knobs: knobs, } p.client = g p.topicNamer = tn @@ -259,13 +186,13 @@ func MakePubsubSink( } } -func (p *pubsubSink) Dial() error { +func (p *deprecatedPubsubSink) Dial() error { p.setupWorkers() return p.client.init() } // EmitRow pushes a message to event channel where it is consumed by workers -func (p *pubsubSink) EmitRow( +func (p *deprecatedPubsubSink) EmitRow( ctx context.Context, topic TopicDescriptor, key, value []byte, @@ -273,12 +200,14 @@ func (p *pubsubSink) EmitRow( mvcc hlc.Timestamp, alloc kvevent.Alloc, ) error { + p.metrics.recordMessageSize(int64(len(key) + len(value))) + topicName, err := p.topicNamer.Name(topic) if err != nil { return err } m := pubsubMessage{ - alloc: alloc, isFlush: false, message: payload{ + alloc: alloc, isFlush: false, mvcc: mvcc, message: payload{ Key: key, Value: value, Topic: topicName, @@ -302,7 +231,7 @@ func (p *pubsubSink) EmitRow( } // EmitResolvedTimestamp sends resolved timestamp message -func (p *pubsubSink) EmitResolvedTimestamp( +func (p *deprecatedPubsubSink) EmitResolvedTimestamp( ctx context.Context, encoder Encoder, resolved hlc.Timestamp, ) error { payload, err := encoder.EncodeResolvedTimestamp(ctx, "", resolved) @@ -314,14 +243,16 @@ func 
(p *pubsubSink) EmitResolvedTimestamp( } // Flush blocks until all messages in the event channels are sent -func (p *pubsubSink) Flush(ctx context.Context) error { +func (p *deprecatedPubsubSink) Flush(ctx context.Context) error { if err := p.flush(ctx); err != nil { return errors.CombineErrors(p.client.connectivityErrorLocked(), err) } return nil } -func (p *pubsubSink) flush(ctx context.Context) error { +func (p *deprecatedPubsubSink) flush(ctx context.Context) error { + defer p.metrics.recordFlushRequestCallback()() + select { case <-ctx.Done(): return ctx.Err() @@ -346,7 +277,7 @@ func (p *pubsubSink) flush(ctx context.Context) error { } // Close closes all the channels and shutdowns the topic -func (p *pubsubSink) Close() error { +func (p *deprecatedPubsubSink) Close() error { p.client.closeTopics() p.exitWorkers() _ = p.workerGroup.Wait() @@ -366,11 +297,11 @@ func (p *pubsubSink) Close() error { // Topics gives the names of all topics that have been initialized // and will receive resolved timestamps. -func (p *pubsubSink) Topics() []string { +func (p *deprecatedPubsubSink) Topics() []string { return p.topicNamer.DisplayNamesSlice() } -func (p *gcpPubsubClient) cacheTopicLocked(name string, topic *pubsub.Topic) { +func (p *deprecatedGcpPubsubClient) cacheTopicLocked(name string, topic *pubsub.Topic) { //TODO (zinger): Investigate whether changing topics to a sync.Map would be //faster here, I think it would. p.mu.Lock() @@ -378,14 +309,14 @@ func (p *gcpPubsubClient) cacheTopicLocked(name string, topic *pubsub.Topic) { p.mu.topics[name] = topic } -func (p *gcpPubsubClient) getTopicLocked(name string) (t *pubsub.Topic, ok bool) { +func (p *deprecatedGcpPubsubClient) getTopicLocked(name string) (t *pubsub.Topic, ok bool) { p.mu.Lock() defer p.mu.Unlock() t, ok = p.mu.topics[name] return t, ok } -func (p *gcpPubsubClient) getTopicClient(name string) (*pubsub.Topic, error) { +func (p *deprecatedGcpPubsubClient) getTopicClient(name string) (*pubsub.Topic, error) { if topic, ok := p.getTopicLocked(name); ok { return topic, nil } @@ -398,7 +329,7 @@ func (p *gcpPubsubClient) getTopicClient(name string) (*pubsub.Topic, error) { } // setupWorkers sets up the channels used by the sink and starts a goroutine for every worker -func (p *pubsubSink) setupWorkers() { +func (p *deprecatedPubsubSink) setupWorkers() { // setup events channels to send to workers and the worker group p.eventsChans = make([]chan pubsubMessage, p.numWorkers) p.workerGroup = ctxgroup.WithContext(p.workerCtx) @@ -421,7 +352,7 @@ func (p *pubsubSink) setupWorkers() { } // workerLoop consumes any message sent to the channel corresponding to the worker index -func (p *pubsubSink) workerLoop(workerIndex int) { +func (p *deprecatedPubsubSink) workerLoop(workerIndex int) { for { select { case <-p.workerCtx.Done(): @@ -448,17 +379,19 @@ func (p *pubsubSink) workerLoop(workerIndex int) { content = msg.message.Value } + updateMetrics := p.metrics.recordOneMessage() err = p.client.sendMessage(content, msg.message.Topic, string(msg.message.Key)) if err != nil { p.exitWorkersWithError(err) } msg.alloc.Release(p.workerCtx) + updateMetrics(msg.mvcc, len(msg.message.Key)+len(msg.message.Value)+len(msg.message.Topic), sinkDoesNotCompress) } } } // exitWorkersWithError sends an error to the sink error channel -func (p *pubsubSink) exitWorkersWithError(err error) { +func (p *deprecatedPubsubSink) exitWorkersWithError(err error) { // errChan has buffer size 1, first error will be saved to the buffer and // subsequent errors will be ignored 
select { @@ -469,7 +402,7 @@ func (p *pubsubSink) exitWorkersWithError(err error) { } // sinkError checks if there is an error in the error channel -func (p *pubsubSink) sinkErrorLocked() error { +func (p *deprecatedPubsubSink) sinkErrorLocked() error { select { case err := <-p.errChan: return err @@ -479,12 +412,12 @@ func (p *pubsubSink) sinkErrorLocked() error { } // workerIndex hashes key to return a worker index -func (p *pubsubSink) workerIndex(key []byte) uint32 { +func (p *deprecatedPubsubSink) workerIndex(key []byte) uint32 { return crc32.ChecksumIEEE(key) % uint32(p.numWorkers) } // flushWorkers sends a flush message to every worker channel and then signals sink that flush is done -func (p *pubsubSink) flushWorkers() error { +func (p *deprecatedPubsubSink) flushWorkers() error { for i := 0; i < p.numWorkers; i++ { //flush message will be blocked until all the messages in the channel are processed select { @@ -507,7 +440,14 @@ func (p *pubsubSink) flushWorkers() error { } // init opens a gcp client -func (p *gcpPubsubClient) init() error { +func (p *deprecatedGcpPubsubClient) init() error { + p.mu.topics = make(map[string]*pubsub.Topic) + + if p.client != nil { + // Already set by unit test + return nil + } + var client *pubsub.Client var err error @@ -530,14 +470,13 @@ func (p *gcpPubsubClient) init() error { return errors.Wrap(err, "opening client") } p.client = client - p.mu.topics = make(map[string]*pubsub.Topic) return nil } // openTopic optimistically creates the topic -func (p *gcpPubsubClient) openTopic(topicName string) (*pubsub.Topic, error) { +func (p *deprecatedGcpPubsubClient) openTopic(topicName string) (*pubsub.Topic, error) { t, err := p.client.CreateTopic(p.ctx, topicName) if err != nil { switch status.Code(err) { @@ -557,15 +496,16 @@ func (p *gcpPubsubClient) openTopic(topicName string) (*pubsub.Topic, error) { return t, nil } -func (p *gcpPubsubClient) closeTopics() { +func (p *deprecatedGcpPubsubClient) closeTopics() { _ = p.forEachTopic(func(_ string, t *pubsub.Topic) error { t.Stop() return nil }) + _ = p.client.Close() } // sendMessage sends a message to the topic -func (p *gcpPubsubClient) sendMessage(m []byte, topic string, key string) error { +func (p *deprecatedGcpPubsubClient) sendMessage(m []byte, topic string, key string) error { t, err := p.getTopicClient(topic) if err != nil { return err @@ -586,7 +526,7 @@ func (p *gcpPubsubClient) sendMessage(m []byte, topic string, key string) error return nil } -func (p *gcpPubsubClient) sendMessageToAllTopics(m []byte) error { +func (p *deprecatedGcpPubsubClient) sendMessageToAllTopics(m []byte) error { return p.forEachTopic(func(_ string, t *pubsub.Topic) error { res := t.Publish(p.ctx, &pubsub.Message{ Data: m, @@ -599,14 +539,16 @@ func (p *gcpPubsubClient) sendMessageToAllTopics(m []byte) error { }) } -func (p *gcpPubsubClient) flushTopics() { +func (p *deprecatedGcpPubsubClient) flushTopics() { _ = p.forEachTopic(func(_ string, t *pubsub.Topic) error { t.Flush() return nil }) } -func (p *gcpPubsubClient) forEachTopic(f func(name string, topicClient *pubsub.Topic) error) error { +func (p *deprecatedGcpPubsubClient) forEachTopic( + f func(name string, topicClient *pubsub.Topic) error, +) error { return p.topicNamer.Each(func(n string) error { t, err := p.getTopicClient(n) if err != nil { @@ -616,20 +558,20 @@ func (p *gcpPubsubClient) forEachTopic(f func(name string, topicClient *pubsub.T }) } -func (p *gcpPubsubClient) recordAutocreateErrorLocked(e error) { +func (p *deprecatedGcpPubsubClient) 
recordAutocreateErrorLocked(e error) { p.mu.Lock() defer p.mu.Unlock() p.mu.autocreateError = e } -func (p *gcpPubsubClient) recordPublishErrorLocked(e error) { +func (p *deprecatedGcpPubsubClient) recordPublishErrorLocked(e error) { p.mu.Lock() defer p.mu.Unlock() p.mu.publishError = e } // connectivityError returns any errors encountered while writing to gcp. -func (p *gcpPubsubClient) connectivityErrorLocked() error { +func (p *deprecatedGcpPubsubClient) connectivityErrorLocked() error { p.mu.Lock() defer p.mu.Unlock() if status.Code(p.mu.publishError) == codes.NotFound && p.mu.autocreateError != nil { @@ -640,10 +582,3 @@ func (p *gcpPubsubClient) connectivityErrorLocked() error { } return errors.CombineErrors(p.mu.publishError, p.mu.autocreateError) } - -// Generate the cloud endpoint that's specific to a region (e.g. us-east1). -// Ideally this would be discoverable via API but doesn't seem to be. -// A hardcoded approach looks to be correct right now. -func gcpEndpointForRegion(region string) string { - return fmt.Sprintf("%s-pubsub.googleapis.com:443", region) -} diff --git a/pkg/ccl/changefeedccl/sink_pubsub_v2.go b/pkg/ccl/changefeedccl/sink_pubsub_v2.go new file mode 100644 index 000000000000..60dd85dba8bd --- /dev/null +++ b/pkg/ccl/changefeedccl/sink_pubsub_v2.go @@ -0,0 +1,414 @@ +// Copyright 2021 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package changefeedccl + +import ( + "bytes" + "context" + "fmt" + "net/url" + "time" + + pubsub "cloud.google.com/go/pubsub/apiv1" + pb "cloud.google.com/go/pubsub/apiv1/pubsubpb" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" + "github.com/cockroachdb/cockroach/pkg/cloud" + "github.com/cockroachdb/cockroach/pkg/util/admission" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/errors" + "golang.org/x/oauth2/google" + "google.golang.org/api/impersonate" + "google.golang.org/api/option" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +const credentialsParam = "CREDENTIALS" + +// GcpScheme to be used in testfeed and sink.go +const GcpScheme = "gcpubsub" +const gcpScope = "https://www.googleapis.com/auth/pubsub" +const cloudPlatformScope = "https://www.googleapis.com/auth/cloud-platform" +const globalGCPEndpoint = "pubsub.googleapis.com:443" + +// isPubsubSink returns true if url contains scheme with valid pubsub sink +func isPubsubSink(u *url.URL) bool { + return u.Scheme == GcpScheme +} + +type pubsubSinkClient struct { + ctx context.Context + client *pubsub.PublisherClient + projectID string + format changefeedbase.FormatType + batchCfg sinkBatchConfig + mu struct { + syncutil.RWMutex + + // Topic creation errors may not be an actual issue unless the Publish call + // itself fails, so creation errors are stored for future use in the event of + // a publish error. 
+ topicCreateErr error + + // Caches whether or not we've already created a topic + topicCache map[string]struct{} + } +} + +var _ SinkClient = (*pubsubSinkClient)(nil) +var _ SinkPayload = (*pb.PubsubMessage)(nil) + +func makePubsubSinkClient( + ctx context.Context, + u *url.URL, + encodingOpts changefeedbase.EncodingOptions, + targets changefeedbase.Targets, + batchCfg sinkBatchConfig, + unordered bool, + knobs *TestingKnobs, +) (SinkClient, error) { + if u.Scheme != GcpScheme { + return nil, errors.Errorf("unknown scheme: %s", u.Scheme) + } + + var formatType changefeedbase.FormatType + switch encodingOpts.Format { + case changefeedbase.OptFormatJSON: + formatType = changefeedbase.OptFormatJSON + case changefeedbase.OptFormatCSV: + formatType = changefeedbase.OptFormatCSV + default: + return nil, errors.Errorf(`this sink is incompatible with %s=%s`, + changefeedbase.OptFormat, encodingOpts.Format) + } + + switch encodingOpts.Envelope { + case changefeedbase.OptEnvelopeWrapped, changefeedbase.OptEnvelopeBare: + default: + return nil, errors.Errorf(`this sink is incompatible with %s=%s`, + changefeedbase.OptEnvelope, encodingOpts.Envelope) + } + + pubsubURL := sinkURL{URL: u, q: u.Query()} + + projectID := pubsubURL.Host + if projectID == "" { + return nil, errors.New("missing project name") + } + + publisherClient, err := makePublisherClient(ctx, pubsubURL, unordered, knobs) + if err != nil { + return nil, err + } + + sinkClient := &pubsubSinkClient{ + ctx: ctx, + format: formatType, + client: publisherClient, + batchCfg: batchCfg, + projectID: projectID, + } + sinkClient.mu.topicCache = make(map[string]struct{}) + + return sinkClient, nil +} + +// MakeResolvedPayload implements the SinkClient interface +func (sc *pubsubSinkClient) MakeResolvedPayload(body []byte, topic string) (SinkPayload, error) { + return &pb.PublishRequest{ + Topic: sc.gcPubsubTopic(topic), + Messages: []*pb.PubsubMessage{{ + Data: body, + }}, + }, nil +} + +func (sc *pubsubSinkClient) maybeCreateTopic(topic string) error { + sc.mu.RLock() + _, ok := sc.mu.topicCache[topic] + if ok { + sc.mu.RUnlock() + return nil + } + sc.mu.RUnlock() + sc.mu.Lock() + defer sc.mu.Unlock() + _, ok = sc.mu.topicCache[topic] + if ok { + return nil + } + + _, err := sc.client.CreateTopic(sc.ctx, &pb.Topic{Name: topic}) + if err != nil && status.Code(err) != codes.AlreadyExists { + if status.Code(err) == codes.PermissionDenied { + // PermissionDenied may not be fatal if the topic already exists, + // but record it in case it turns out not to. 
+ sc.mu.topicCreateErr = err + } else { + sc.mu.topicCreateErr = err + return err + } + } + sc.mu.topicCache[topic] = struct{}{} + return nil +} + +// Flush implements the SinkClient interface +func (sc *pubsubSinkClient) Flush(ctx context.Context, payload SinkPayload) error { + publishRequest := payload.(*pb.PublishRequest) + + err := sc.maybeCreateTopic(publishRequest.Topic) + if err != nil { + return err + } + + _, err = sc.client.Publish(sc.ctx, publishRequest) + + if status.Code(err) == codes.NotFound { + sc.mu.RLock() + defer sc.mu.RUnlock() + if sc.mu.topicCreateErr != nil { + return errors.WithHint( + errors.Wrap(sc.mu.topicCreateErr, + "Topic not found, and attempt to autocreate it failed."), + "Create topics in advance or grant this service account the pubsub.editor role on your project.") + } + } + return err +} + +type pubsubBuffer struct { + sc *pubsubSinkClient + topic string + messages []*pb.PubsubMessage + numBytes int +} + +var _ BatchBuffer = (*pubsubBuffer)(nil) + +// Append implements the BatchBuffer interface +func (psb *pubsubBuffer) Append(key []byte, value []byte) { + var content []byte + switch psb.sc.format { + case changefeedbase.OptFormatJSON: + var buffer bytes.Buffer + keyPrefix := "{\"Key\":" + valInfix := ",\"Value\":" + topicSuffix := fmt.Sprintf(",\"Topic\":\"%s\"}", psb.topic) + // Grow all at once to avoid reallocations + buffer.Grow(len(keyPrefix) + len(key) + len(valInfix) + len(value) + len(topicSuffix)) + buffer.WriteString(keyPrefix) + buffer.Write(key) + buffer.WriteString(valInfix) + buffer.Write(value) + buffer.WriteString(topicSuffix) + content = buffer.Bytes() + case changefeedbase.OptFormatCSV: + content = value + } + + psb.messages = append(psb.messages, &pb.PubsubMessage{Data: content}) + psb.numBytes += len(content) +} + +// Close implements the BatchBuffer interface +func (psb *pubsubBuffer) Close() (SinkPayload, error) { + return &pb.PublishRequest{ + Topic: psb.sc.gcPubsubTopic(psb.topic), + Messages: psb.messages, + }, nil +} + +// ShouldFlush implements the BatchBuffer interface +func (psb *pubsubBuffer) ShouldFlush() bool { + return shouldFlushBatch(psb.numBytes, len(psb.messages), psb.sc.batchCfg) +} + +// MakeBatchBuffer implements the SinkClient interface +func (sc *pubsubSinkClient) MakeBatchBuffer(topic string) BatchBuffer { + return &pubsubBuffer{ + sc: sc, + topic: topic, + messages: make([]*pb.PubsubMessage, 0, sc.batchCfg.Messages), + } +} + +// Close implements the SinkClient interface +func (pe *pubsubSinkClient) Close() error { + return pe.client.Close() +} + +func makePublisherClient( + ctx context.Context, url sinkURL, unordered bool, knobs *TestingKnobs, +) (*pubsub.PublisherClient, error) { + const regionParam = "region" + region := url.consumeParam(regionParam) + var endpoint string + if region == "" { + if unordered { + endpoint = globalGCPEndpoint + } else { + return nil, errors.WithHintf(errors.New("region query parameter not found"), + "Use of gcpubsub without specifying a region requires the WITH %s option.", + changefeedbase.OptUnordered) + } + } else { + endpoint = gcpEndpointForRegion(region) + } + + options := []option.ClientOption{ + option.WithEndpoint(endpoint), + } + + if knobs == nil || !knobs.PubsubClientSkipCredentialsCheck { + creds, err := getGCPCredentials(ctx, url) + if err != nil { + return nil, err + } + options = append(options, creds) + } + + client, err := pubsub.NewPublisherClient( + ctx, + options..., + ) + if err != nil { + return nil, errors.Wrap(err, "opening client") + } + + return 
client, nil +} + +// Generate the cloud endpoint that's specific to a region (e.g. us-east1). +// Ideally this would be discoverable via API but doesn't seem to be. +// A hardcoded approach looks to be correct right now. +func gcpEndpointForRegion(region string) string { + return fmt.Sprintf("%s-pubsub.googleapis.com:443", region) +} + +// TODO: unify gcp credentials code with gcp cloud storage credentials code +// getGCPCredentials returns gcp credentials parsed out from url +func getGCPCredentials(ctx context.Context, u sinkURL) (option.ClientOption, error) { + const authParam = "AUTH" + const assumeRoleParam = "ASSUME_ROLE" + const authSpecified = "specified" + const authImplicit = "implicit" + const authDefault = "default" + + var credsJSON []byte + var creds *google.Credentials + var err error + authOption := u.consumeParam(authParam) + assumeRoleOption := u.consumeParam(assumeRoleParam) + authScope := gcpScope + if assumeRoleOption != "" { + // If we need to assume a role, the credentials need to have the scope to + // impersonate instead. + authScope = cloudPlatformScope + } + + // implemented according to https://github.com/cockroachdb/cockroach/pull/64737 + switch authOption { + case authImplicit: + creds, err = google.FindDefaultCredentials(ctx, authScope) + if err != nil { + return nil, err + } + case authSpecified: + fallthrough + case authDefault: + fallthrough + default: + if u.q.Get(credentialsParam) == "" { + return nil, errors.New("missing credentials parameter") + } + err := u.decodeBase64(credentialsParam, &credsJSON) + if err != nil { + return nil, errors.Wrap(err, "decoding credentials json") + } + creds, err = google.CredentialsFromJSON(ctx, credsJSON, authScope) + if err != nil { + return nil, errors.Wrap(err, "creating credentials from json") + } + } + + credsOpt := option.WithCredentials(creds) + if assumeRoleOption != "" { + assumeRole, delegateRoles := cloud.ParseRoleString(assumeRoleOption) + cfg := impersonate.CredentialsConfig{ + TargetPrincipal: assumeRole, + Scopes: []string{gcpScope}, + Delegates: delegateRoles, + } + + ts, err := impersonate.CredentialsTokenSource(ctx, cfg, credsOpt) + if err != nil { + return nil, errors.Wrap(err, "creating impersonate credentials") + } + return option.WithTokenSource(ts), nil + } + + return credsOpt, nil +} + +func (sc *pubsubSinkClient) gcPubsubTopic(topic string) string { + return fmt.Sprintf("projects/%s/topics/%s", sc.projectID, topic) +} + +func makePubsubSink( + ctx context.Context, + u *url.URL, + encodingOpts changefeedbase.EncodingOptions, + jsonConfig changefeedbase.SinkSpecificJSONConfig, + targets changefeedbase.Targets, + unordered bool, + parallelism int, + pacerFactory func() *admission.Pacer, + source timeutil.TimeSource, + mb metricsRecorderBuilder, + knobs *TestingKnobs, +) (Sink, error) { + batchCfg, retryOpts, err := getSinkConfigFromJson(jsonConfig, sinkJSONConfig{ + // GCPubsub library defaults + Flush: sinkBatchConfig{ + Frequency: jsonDuration(10 * time.Millisecond), + Messages: 100, + Bytes: 1e6, + }, + }) + if err != nil { + return nil, err + } + + sinkClient, err := makePubsubSinkClient(ctx, u, encodingOpts, targets, batchCfg, unordered, knobs) + if err != nil { + return nil, err + } + + pubsubURL := sinkURL{URL: u, q: u.Query()} + pubsubTopicName := pubsubURL.consumeParam(changefeedbase.SinkParamTopicName) + topicNamer, err := MakeTopicNamer(targets, WithSingleName(pubsubTopicName)) + if err != nil { + return nil, err + } + + return makeBatchingSink( + ctx, + sinkTypePubsub, + sinkClient, + 
time.Duration(batchCfg.Frequency), + retryOpts, + parallelism, + topicNamer, + pacerFactory, + source, + mb(requiresResourceAccounting), + ), nil +} diff --git a/pkg/ccl/changefeedccl/sink_test.go b/pkg/ccl/changefeedccl/sink_test.go index 25ae999c29ea..28e52c664102 100644 --- a/pkg/ccl/changefeedccl/sink_test.go +++ b/pkg/ccl/changefeedccl/sink_test.go @@ -771,7 +771,7 @@ func TestSinkConfigParsing(t *testing.T) { t.Run("handles valid types", func(t *testing.T) { opts := changefeedbase.SinkSpecificJSONConfig(`{"Flush": {"Messages": 1234, "Frequency": "3s", "Bytes":30}, "Retry": {"Max": 5, "Backoff": "3h"}}`) - batch, retry, err := getSinkConfigFromJson(opts) + batch, retry, err := getSinkConfigFromJson(opts, sinkJSONConfig{}) require.NoError(t, err) require.Equal(t, batch, sinkBatchConfig{ Bytes: 30, @@ -783,7 +783,7 @@ func TestSinkConfigParsing(t *testing.T) { // Max accepts both values and specifically the string "inf" opts = changefeedbase.SinkSpecificJSONConfig(`{"Retry": {"Max": "inf"}}`) - _, retry, err = getSinkConfigFromJson(opts) + _, retry, err = getSinkConfigFromJson(opts, sinkJSONConfig{}) require.NoError(t, err) require.Equal(t, retry.MaxRetries, 0) }) @@ -792,14 +792,14 @@ func TestSinkConfigParsing(t *testing.T) { defaultRetry := defaultRetryConfig() opts := changefeedbase.SinkSpecificJSONConfig(`{"Flush": {"Messages": 1234, "Frequency": "3s"}}`) - _, retry, err := getSinkConfigFromJson(opts) + _, retry, err := getSinkConfigFromJson(opts, sinkJSONConfig{}) require.NoError(t, err) require.Equal(t, retry.MaxRetries, defaultRetry.MaxRetries) require.Equal(t, retry.InitialBackoff, defaultRetry.InitialBackoff) opts = changefeedbase.SinkSpecificJSONConfig(`{"Retry": {"Max": "inf"}}`) - _, retry, err = getSinkConfigFromJson(opts) + _, retry, err = getSinkConfigFromJson(opts, sinkJSONConfig{}) require.NoError(t, err) require.Equal(t, retry.MaxRetries, 0) require.Equal(t, retry.InitialBackoff, defaultRetry.InitialBackoff) @@ -807,43 +807,43 @@ func TestSinkConfigParsing(t *testing.T) { t.Run("errors on invalid configuration", func(t *testing.T) { opts := changefeedbase.SinkSpecificJSONConfig(`{"Flush": {"Messages": -1234, "Frequency": "3s"}}`) - _, _, err := getSinkConfigFromJson(opts) + _, _, err := getSinkConfigFromJson(opts, sinkJSONConfig{}) require.ErrorContains(t, err, "invalid sink config, all values must be non-negative") opts = changefeedbase.SinkSpecificJSONConfig(`{"Flush": {"Messages": 1234, "Frequency": "-3s"}}`) - _, _, err = getSinkConfigFromJson(opts) + _, _, err = getSinkConfigFromJson(opts, sinkJSONConfig{}) require.ErrorContains(t, err, "invalid sink config, all values must be non-negative") opts = changefeedbase.SinkSpecificJSONConfig(`{"Flush": {"Messages": 10}}`) - _, _, err = getSinkConfigFromJson(opts) + _, _, err = getSinkConfigFromJson(opts, sinkJSONConfig{}) require.ErrorContains(t, err, "invalid sink config, Flush.Frequency is not set, messages may never be sent") }) t.Run("errors on invalid type", func(t *testing.T) { opts := changefeedbase.SinkSpecificJSONConfig(`{"Flush": {"Messages": "1234", "Frequency": "3s", "Bytes":30}}`) - _, _, err := getSinkConfigFromJson(opts) + _, _, err := getSinkConfigFromJson(opts, sinkJSONConfig{}) require.ErrorContains(t, err, "Flush.Messages of type int") opts = changefeedbase.SinkSpecificJSONConfig(`{"Flush": {"Messages": 1234, "Frequency": "3s", "Bytes":"30"}}`) - _, _, err = getSinkConfigFromJson(opts) + _, _, err = getSinkConfigFromJson(opts, sinkJSONConfig{}) require.ErrorContains(t, err, "Flush.Bytes of type int") 
opts = changefeedbase.SinkSpecificJSONConfig(`{"Retry": {"Max": true}}`) - _, _, err = getSinkConfigFromJson(opts) + _, _, err = getSinkConfigFromJson(opts, sinkJSONConfig{}) require.ErrorContains(t, err, "Retry.Max must be either a positive int or 'inf'") opts = changefeedbase.SinkSpecificJSONConfig(`{"Retry": {"Max": "not-inf"}}`) - _, _, err = getSinkConfigFromJson(opts) + _, _, err = getSinkConfigFromJson(opts, sinkJSONConfig{}) require.ErrorContains(t, err, "Retry.Max must be either a positive int or 'inf'") }) t.Run("errors on malformed json", func(t *testing.T) { opts := changefeedbase.SinkSpecificJSONConfig(`{"Flush": {"Messages": 1234 "Frequency": "3s"}}`) - _, _, err := getSinkConfigFromJson(opts) + _, _, err := getSinkConfigFromJson(opts, sinkJSONConfig{}) require.ErrorContains(t, err, "invalid character '\"'") opts = changefeedbase.SinkSpecificJSONConfig(`string`) - _, _, err = getSinkConfigFromJson(opts) + _, _, err = getSinkConfigFromJson(opts, sinkJSONConfig{}) require.ErrorContains(t, err, "invalid character 's' looking for beginning of value") }) } diff --git a/pkg/ccl/changefeedccl/sink_webhook_v2.go b/pkg/ccl/changefeedccl/sink_webhook_v2.go index 6403c67c7a48..91575c17be24 100644 --- a/pkg/ccl/changefeedccl/sink_webhook_v2.go +++ b/pkg/ccl/changefeedccl/sink_webhook_v2.go @@ -54,6 +54,7 @@ type webhookSinkClient struct { } var _ SinkClient = (*webhookSinkClient)(nil) +var _ SinkPayload = (*http.Request)(nil) func makeWebhookSinkClient( ctx context.Context, @@ -264,14 +265,14 @@ type webhookCSVBuffer struct { var _ BatchBuffer = (*webhookCSVBuffer)(nil) // Append implements the BatchBuffer interface -func (cb *webhookCSVBuffer) Append(key []byte, value []byte, topic string) { +func (cb *webhookCSVBuffer) Append(key []byte, value []byte) { cb.bytes = append(cb.bytes, value...) 
cb.messageCount += 1 } // ShouldFlush implements the BatchBuffer interface func (cb *webhookCSVBuffer) ShouldFlush() bool { - return cb.sc.shouldFlush(len(cb.bytes), cb.messageCount) + return shouldFlushBatch(len(cb.bytes), cb.messageCount, cb.sc.batchCfg) } // Close implements the BatchBuffer interface @@ -288,14 +289,14 @@ type webhookJSONBuffer struct { var _ BatchBuffer = (*webhookJSONBuffer)(nil) // Append implements the BatchBuffer interface -func (jb *webhookJSONBuffer) Append(key []byte, value []byte, topic string) { +func (jb *webhookJSONBuffer) Append(key []byte, value []byte) { jb.messages = append(jb.messages, value) jb.numBytes += len(value) } // ShouldFlush implements the BatchBuffer interface func (jb *webhookJSONBuffer) ShouldFlush() bool { - return jb.sc.shouldFlush(jb.numBytes, len(jb.messages)) + return shouldFlushBatch(jb.numBytes, len(jb.messages), jb.sc.batchCfg) } // Close implements the BatchBuffer interface @@ -319,7 +320,7 @@ func (jb *webhookJSONBuffer) Close() (SinkPayload, error) { } // MakeBatchBuffer implements the SinkClient interface -func (sc *webhookSinkClient) MakeBatchBuffer() BatchBuffer { +func (sc *webhookSinkClient) MakeBatchBuffer(topic string) BatchBuffer { if sc.format == changefeedbase.OptFormatCSV { return &webhookCSVBuffer{sc: sc} } else { @@ -330,22 +331,6 @@ func (sc *webhookSinkClient) MakeBatchBuffer() BatchBuffer { } } -func (sc *webhookSinkClient) shouldFlush(bytes int, messages int) bool { - switch { - // all zero values is interpreted as flush every time - case sc.batchCfg.Messages == 0 && sc.batchCfg.Bytes == 0 && sc.batchCfg.Frequency == 0: - return true - // messages threshold has been reached - case sc.batchCfg.Messages > 0 && messages >= sc.batchCfg.Messages: - return true - // bytes threshold has been reached - case sc.batchCfg.Bytes > 0 && bytes >= sc.batchCfg.Bytes: - return true - default: - return false - } -} - func makeWebhookSink( ctx context.Context, u sinkURL, @@ -356,7 +341,7 @@ func makeWebhookSink( source timeutil.TimeSource, mb metricsRecorderBuilder, ) (Sink, error) { - batchCfg, retryOpts, err := getSinkConfigFromJson(opts.JSONConfig) + batchCfg, retryOpts, err := getSinkConfigFromJson(opts.JSONConfig, sinkJSONConfig{}) if err != nil { return nil, err } diff --git a/pkg/ccl/changefeedccl/testfeed_test.go b/pkg/ccl/changefeedccl/testfeed_test.go index 3581857ad2c7..6cea1d70e07f 100644 --- a/pkg/ccl/changefeedccl/testfeed_test.go +++ b/pkg/ccl/changefeedccl/testfeed_test.go @@ -29,6 +29,10 @@ import ( "sync/atomic" "time" + "cloud.google.com/go/pubsub" + pubsubv1 "cloud.google.com/go/pubsub/apiv1" + pb "cloud.google.com/go/pubsub/apiv1/pubsubpb" + "cloud.google.com/go/pubsub/pstest" "github.com/Shopify/sarama" "github.com/cockroachdb/cockroach-go/v2/crdb" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent" @@ -60,6 +64,9 @@ import ( "github.com/cockroachdb/errors" goparquet "github.com/fraugster/parquet-go" "github.com/jackc/pgx/v4" + "google.golang.org/api/option" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" ) type sinklessFeedFactory struct { @@ -300,7 +307,6 @@ func (e *externalConnectionFeedFactory) Feed( createStmt.SinkURI = tree.NewStrVal(`external://` + randomExternalConnectionName) return e.TestFeedFactory.Feed(createStmt.String(), args...) 
- } func setURI( @@ -2194,82 +2200,77 @@ func (f *webhookFeed) Close() error { type mockPubsubMessage struct { data string - // TODO: implement error injection - // err error -} -type mockPubsubMessageBuffer struct { - mu syncutil.Mutex - rows []mockPubsubMessage } -func (p *mockPubsubMessageBuffer) pop() *mockPubsubMessage { - p.mu.Lock() - defer p.mu.Unlock() - if len(p.rows) == 0 { - return nil +type fakePubsubServer struct { + srv *pstest.Server + mu struct { + syncutil.Mutex + buffer []mockPubsubMessage + notify chan struct{} } - var head mockPubsubMessage - head, p.rows = p.rows[0], p.rows[1:] - return &head -} - -func (p *mockPubsubMessageBuffer) push(m mockPubsubMessage) { - p.mu.Lock() - defer p.mu.Unlock() - p.rows = append(p.rows, m) -} - -type fakePubsubClient struct { - buffer *mockPubsubMessageBuffer -} - -var _ pubsubClient = (*fakePubsubClient)(nil) - -func (p *fakePubsubClient) init() error { - return nil } -func (p *fakePubsubClient) closeTopics() { +func makeFakePubsubServer() *fakePubsubServer { + mockServer := fakePubsubServer{} + mockServer.mu.buffer = make([]mockPubsubMessage, 0) + mockServer.srv = pstest.NewServer(pstest.ServerReactorOption{ + FuncName: "Publish", + Reactor: &mockServer, + }) + return &mockServer } -// sendMessage sends a message to the topic -func (p *fakePubsubClient) sendMessage(m []byte, _ string, _ string) error { - message := mockPubsubMessage{data: string(m)} - p.buffer.push(message) - return nil -} +var _ pstest.Reactor = (*fakePubsubServer)(nil) -func (p *fakePubsubClient) sendMessageToAllTopics(m []byte) error { - message := mockPubsubMessage{data: string(m)} - p.buffer.push(message) - return nil -} +func (ps *fakePubsubServer) React(req interface{}) (handled bool, ret interface{}, err error) { + publishReq, ok := req.(*pb.PublishRequest) + if ok { + ps.mu.Lock() + defer ps.mu.Unlock() + for _, msg := range publishReq.Messages { + ps.mu.buffer = append(ps.mu.buffer, mockPubsubMessage{data: string(msg.Data)}) + } + if ps.mu.notify != nil { + notifyCh := ps.mu.notify + ps.mu.notify = nil + close(notifyCh) + } + } -func (p *fakePubsubClient) flushTopics() { + return false, nil, nil } -type fakePubsubSink struct { - Sink - client *fakePubsubClient - sync *sinkSynchronizer +func (s *fakePubsubServer) NotifyMessage() chan struct{} { + c := make(chan struct{}) + s.mu.Lock() + defer s.mu.Unlock() + if len(s.mu.buffer) > 0 { + close(c) + } else { + s.mu.notify = c + } + return c } -var _ Sink = (*fakePubsubSink)(nil) - -func (p *fakePubsubSink) Dial() error { - s := p.Sink.(*pubsubSink) - s.client = p.client - s.setupWorkers() - return nil +func (ps *fakePubsubServer) Dial() (*grpc.ClientConn, error) { + return grpc.Dial(ps.srv.Addr, grpc.WithTransportCredentials(insecure.NewCredentials())) } -func (p *fakePubsubSink) Flush(ctx context.Context) error { - defer p.sync.addFlush() - return p.Sink.Flush(ctx) +func (ps *fakePubsubServer) Pop() *mockPubsubMessage { + ps.mu.Lock() + defer ps.mu.Unlock() + if len(ps.mu.buffer) == 0 { + return nil + } + var head mockPubsubMessage + head, ps.mu.buffer = ps.mu.buffer[0], ps.mu.buffer[1:] + return &head } -func (p *fakePubsubClient) connectivityErrorLocked() error { - return nil +func (ps *fakePubsubServer) Close() error { + ps.srv.Wait() + return ps.srv.Close() } type pubsubFeedFactory struct { @@ -2281,6 +2282,17 @@ var _ cdctest.TestFeedFactory = (*pubsubFeedFactory)(nil) // makePubsubFeedFactory returns a TestFeedFactory implementation using the `pubsub` uri. 
func makePubsubFeedFactory(srvOrCluster interface{}, db *gosql.DB) cdctest.TestFeedFactory { s, injectables := getInjectables(srvOrCluster) + + switch t := srvOrCluster.(type) { + case serverutils.TestTenantInterface: + t.DistSQLServer().(*distsql.ServerImpl).TestingKnobs.Changefeed.(*TestingKnobs).PubsubClientSkipCredentialsCheck = true + case serverutils.TestClusterInterface: + servers := make([]feedInjectable, t.NumServers()) + for i := range servers { + t.Server(i).DistSQLServer().(*distsql.ServerImpl).TestingKnobs.Changefeed.(*TestingKnobs).PubsubClientSkipCredentialsCheck = true + } + } + return &pubsubFeedFactory{ enterpriseFeedFactory: enterpriseFeedFactory{ s: s, @@ -2302,34 +2314,49 @@ func (p *pubsubFeedFactory) Feed(create string, args ...interface{}) (cdctest.Te if err != nil { return nil, err } - ss := &sinkSynchronizer{} - client := &fakePubsubClient{ - buffer: &mockPubsubMessageBuffer{ - rows: make([]mockPubsubMessage, 0), - }, - } + mockServer := makeFakePubsubServer() + ss := &sinkSynchronizer{} + var mu syncutil.Mutex wrapSink := func(s Sink) Sink { - return &fakePubsubSink{ - Sink: s, - client: client, - sync: ss, + mu.Lock() // Called concurrently due to getEventSink and getResolvedTimestampSink + defer mu.Unlock() + if batchingSink, ok := s.(*batchingSink); ok { + if sinkClient, ok := batchingSink.client.(*pubsubSinkClient); ok { + _ = sinkClient.client.Close() + + conn, _ := mockServer.Dial() + mockClient, _ := pubsubv1.NewPublisherClient(context.Background(), option.WithGRPCConn(conn)) + sinkClient.client = mockClient + } + return ¬ifyFlushSink{Sink: s, sync: ss} + } else if deprecatedSink, ok := s.(*deprecatedPubsubSink); ok { + if client, ok := deprecatedSink.client.(*deprecatedGcpPubsubClient); ok { + if client.client != nil { + client.closeTopics() + } + conn, _ := mockServer.Dial() + mockClient, _ := pubsub.NewClient( + client.ctx, + client.projectID, + option.WithGRPCConn(conn), + ) + client.client = mockClient + } } + return s } c := &pubsubFeed{ jobFeed: newJobFeed(p.jobsTableConn(), wrapSink), seenTrackerMap: make(map[string]struct{}), ss: ss, - client: client, + mockServer: mockServer, } if err := p.startFeedJob(c.jobFeed, createStmt.String(), args...); err != nil { - return nil, err - } - - if err != nil { + _ = mockServer.Close() return nil, err } return c, nil @@ -2343,8 +2370,8 @@ func (p *pubsubFeedFactory) Server() serverutils.TestTenantInterface { type pubsubFeed struct { *jobFeed seenTrackerMap - ss *sinkSynchronizer - client *fakePubsubClient + ss *sinkSynchronizer + mockServer *fakePubsubServer } var _ cdctest.TestFeed = (*pubsubFeed)(nil) @@ -2379,7 +2406,7 @@ func extractJSONMessagePubsub(wrapped []byte) (value []byte, key []byte, topic s // Next implements TestFeed func (p *pubsubFeed) Next() (*cdctest.TestFeedMessage, error) { for { - msg := p.client.buffer.pop() + msg := p.mockServer.Pop() if msg != nil { details, err := p.Details() if err != nil { @@ -2415,13 +2442,15 @@ func (p *pubsubFeed) Next() (*cdctest.TestFeedMessage, error) { } if err := contextutil.RunWithTimeout( - context.Background(), timeoutOp("pubsub.Next", p.jobID), timeout(), + context.Background(), "pubsub.Next", timeout(), func(ctx context.Context) error { select { case <-ctx.Done(): return ctx.Err() case <-p.ss.eventReady(): return nil + case <-p.mockServer.NotifyMessage(): + return nil case <-p.shutdown: return p.terminalJobError() } @@ -2438,6 +2467,7 @@ func (p *pubsubFeed) Close() error { if err != nil { return err } + _ = p.mockServer.Close() return nil } diff --git 
a/pkg/ccl/changefeedccl/testing_knobs.go b/pkg/ccl/changefeedccl/testing_knobs.go index 5735189ed9fb..56da319a08eb 100644 --- a/pkg/ccl/changefeedccl/testing_knobs.go +++ b/pkg/ccl/changefeedccl/testing_knobs.go @@ -32,6 +32,8 @@ type TestingKnobs struct { // It allows the tests to muck with the Sink, and even return altogether different // implementation. WrapSink func(s Sink, jobID jobspb.JobID) Sink + // PubsubClientSkipCredentialsCheck, if set, skips the GCP credentials check + PubsubClientSkipCredentialsCheck bool // FilterSpanWithMutation is a filter returning true if the resolved span event should // be skipped. This method takes a pointer in case resolved spans need to be mutated. FilterSpanWithMutation func(resolved *jobspb.ResolvedSpan) bool diff --git a/pkg/cmd/roachtest/tests/cdc.go b/pkg/cmd/roachtest/tests/cdc.go index 83c25cf37c0c..b12504b4a7f4 100644 --- a/pkg/cmd/roachtest/tests/cdc.go +++ b/pkg/cmd/roachtest/tests/cdc.go @@ -1147,7 +1147,11 @@ func registerCDC(r registry.Registry) { ct := newCDCTester(ctx, t, c) defer ct.Close() - ct.runTPCCWorkload(tpccArgs{warehouses: 1, duration: "30m"}) + // The deprecated pubsub sink is unable to handle the throughput required for 100 warehouses + if _, err := ct.DB().Exec("SET CLUSTER SETTING changefeed.new_pubsub_sink_enabled = true;"); err != nil { + ct.t.Fatal(err) + } + ct.runTPCCWorkload(tpccArgs{warehouses: 100, duration: "30m"}) feed := ct.newChangefeed(feedArgs{ sinkType: pubsubSink,
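
Reviewer note: below is a minimal, self-contained sketch (not part of the patch) of how the shouldFlushBatch helper added to sink.go behaves with the GCPubsub-library defaults that makePubsubSink passes as the base config when no pubsub_sink_config is supplied. The jsonDuration and sinkBatchConfig declarations here are simplified stand-ins for the real types so the snippet compiles on its own.

package main

import (
	"fmt"
	"time"
)

// jsonDuration and sinkBatchConfig are simplified stand-ins for the types used
// by the changefeed sinks; they exist here only so this sketch compiles alone.
type jsonDuration time.Duration

type sinkBatchConfig struct {
	Bytes, Messages int
	Frequency       jsonDuration
}

// shouldFlushBatch mirrors the decision table added to sink.go: an all-zero
// config flushes on every message; otherwise reaching either the message or
// byte threshold triggers a flush (frequency is enforced by the flush timer).
func shouldFlushBatch(bytes int, messages int, config sinkBatchConfig) bool {
	switch {
	case config.Messages == 0 && config.Bytes == 0 && config.Frequency == 0:
		return true
	case config.Messages > 0 && messages >= config.Messages:
		return true
	case config.Bytes > 0 && bytes >= config.Bytes:
		return true
	default:
		return false
	}
}

func main() {
	// Base defaults wired in by makePubsubSink when no pubsub_sink_config is
	// supplied: 10ms frequency, 100 messages, 1MB.
	cfg := sinkBatchConfig{
		Frequency: jsonDuration(10 * time.Millisecond),
		Messages:  100,
		Bytes:     1e6,
	}
	fmt.Println(shouldFlushBatch(50_000, 99, cfg))   // false: under both thresholds
	fmt.Println(shouldFlushBatch(50_000, 100, cfg))  // true: message threshold reached
	fmt.Println(shouldFlushBatch(1_000_000, 3, cfg)) // true: byte threshold reached
}

A user-supplied pubsub_sink_config (for example {"Flush": {"Messages": 250, "Frequency": "5s"}}, using the same JSON shape exercised in sink_test.go) overrides these base values through getSinkConfigFromJson, which now accepts the sink-specific defaults as its second argument.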