diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt
index 40ee3c481a31..85e4b1deab14 100644
--- a/docs/generated/settings/settings-for-tenants.txt
+++ b/docs/generated/settings/settings-for-tenants.txt
@@ -17,6 +17,7 @@ changefeed.event_consumer_workers integer 0 the number of workers to use when pr
changefeed.fast_gzip.enabled boolean true use fast gzip implementation
changefeed.node_throttle_config string specifies node level throttling configuration for all changefeeeds
changefeed.schema_feed.read_with_priority_after duration 1m0s retry with high priority if we were not able to read descriptors for too long; 0 disables
+changefeed.sink_io_workers integer 0 the number of workers used by changefeeds when sending requests to the sink: <0 disables, 0 assigns a reasonable default, >0 assigns the setting value.
cloudstorage.azure.concurrent_upload_buffers integer 1 controls the number of concurrent buffers that will be used by the Azure client when uploading chunks.Each buffer can buffer up to cloudstorage.write_chunk.size of memory during an upload
cloudstorage.http.custom_ca string custom root CA (appended to system's default CAs) for verifying certificates when interacting with HTTPS storage
cloudstorage.timeout duration 10m0s the timeout for import/export storage operations
diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index 48d06500dc6f..fe6faa7d536e 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -23,6 +23,7 @@
changefeed.fast_gzip.enabled
| boolean | true | use fast gzip implementation |
changefeed.node_throttle_config
| string |
| specifies node level throttling configuration for all changefeeeds |
changefeed.schema_feed.read_with_priority_after
| duration | 1m0s | retry with high priority if we were not able to read descriptors for too long; 0 disables |
+changefeed.sink_io_workers
| integer | 0 | the number of workers used by changefeeds when sending requests to the sink: <0 disables, 0 assigns a reasonable default, >0 assigns the setting value. |
cloudstorage.azure.concurrent_upload_buffers
| integer | 1 | controls the number of concurrent buffers that will be used by the Azure client when uploading chunks.Each buffer can buffer up to cloudstorage.write_chunk.size of memory during an upload |
cloudstorage.http.custom_ca
| string |
| custom root CA (appended to system's default CAs) for verifying certificates when interacting with HTTPS storage |
cloudstorage.timeout
| duration | 10m0s | the timeout for import/export storage operations |
diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel
index baacdff30789..a19790992095 100644
--- a/pkg/ccl/changefeedccl/BUILD.bazel
+++ b/pkg/ccl/changefeedccl/BUILD.bazel
@@ -7,6 +7,7 @@ go_library(
"alter_changefeed_stmt.go",
"authorization.go",
"avro.go",
+ "batching_sink.go",
"changefeed.go",
"changefeed_dist.go",
"changefeed_processors.go",
@@ -20,6 +21,7 @@ go_library(
"event_processing.go",
"metrics.go",
"name.go",
+ "parallel_io.go",
"parquet_sink_cloudstorage.go",
"retry.go",
"scheduled_changefeed.go",
@@ -32,6 +34,7 @@ go_library(
"sink_pubsub.go",
"sink_sql.go",
"sink_webhook.go",
+ "sink_webhook_v2.go",
"telemetry.go",
"testing_knobs.go",
"tls.go",
@@ -125,6 +128,7 @@ go_library(
"//pkg/util/hlc",
"//pkg/util/httputil",
"//pkg/util/humanizeutil",
+ "//pkg/util/intsets",
"//pkg/util/json",
"//pkg/util/log",
"//pkg/util/log/eventpb",
diff --git a/pkg/ccl/changefeedccl/batching_sink.go b/pkg/ccl/changefeedccl/batching_sink.go
new file mode 100644
index 000000000000..15db202b0691
--- /dev/null
+++ b/pkg/ccl/changefeedccl/batching_sink.go
@@ -0,0 +1,456 @@
+// Copyright 2023 The Cockroach Authors.
+//
+// Licensed as a CockroachDB Enterprise file under the Cockroach Community
+// License (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt
+
+package changefeedccl
+
+import (
+ "context"
+ "hash"
+ "sync"
+ "time"
+
+ "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent"
+ "github.com/cockroachdb/cockroach/pkg/util/admission"
+ "github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
+ "github.com/cockroachdb/cockroach/pkg/util/hlc"
+ "github.com/cockroachdb/cockroach/pkg/util/intsets"
+ "github.com/cockroachdb/cockroach/pkg/util/log"
+ "github.com/cockroachdb/cockroach/pkg/util/retry"
+ "github.com/cockroachdb/cockroach/pkg/util/timeutil"
+)
+
+// SinkClient is an interface to an external sink, where messages are written
+// into batches as they arrive and once ready are flushed out.
+type SinkClient interface {
+ MakeResolvedPayload(body []byte, topic string) (SinkPayload, error)
+ MakeBatchBuffer() BatchBuffer
+ Flush(context.Context, SinkPayload) error
+ Close() error
+}
+
+// BatchBuffer is an interface to aggregate KVs into a payload that can be sent
+// to the sink.
+type BatchBuffer interface {
+ Append(key []byte, value []byte, topic string)
+ ShouldFlush() bool
+ Close() (SinkPayload, error)
+}
+
+// SinkPayload is an interface representing a sink-specific representation of a
+// batch of messages that is ready to be emitted by its EmitRow method.
+type SinkPayload interface{}
+
+// batchingSink wraps a SinkClient to provide a Sink implementation that calls
+// the SinkClient methods to form batches and flushes those batches across
+// multiple parallel IO workers.
+type batchingSink struct {
+ client SinkClient
+ topicNamer *TopicNamer
+ concreteType sinkType
+
+ ioWorkers int
+ minFlushFrequency time.Duration
+ retryOpts retry.Options
+
+ ts timeutil.TimeSource
+ metrics metricsRecorder
+ knobs batchingSinkKnobs
+
+ // eventCh is the channel used to send requests from the Sink caller routines
+ // to the batching routine. Messages can either be a flushReq or a kvEvent.
+ eventCh chan interface{}
+
+ termErr error
+ wg ctxgroup.Group
+ hasher hash.Hash32
+ pacer *admission.Pacer
+ doneCh chan struct{}
+}
+
+type batchingSinkKnobs struct {
+ OnAppend func(*rowEvent)
+}
+
+type flushReq struct {
+ waiter chan struct{}
+}
+
+type rowEvent struct {
+ key []byte
+ val []byte
+ topic string
+
+ alloc kvevent.Alloc
+ mvcc hlc.Timestamp
+}
+
+// Flush implements the Sink interface, returning the first error that has
+// occured in the past EmitRow calls.
+func (s *batchingSink) Flush(ctx context.Context) error {
+ flushWaiter := make(chan struct{})
+ select {
+ case <-ctx.Done():
+ case <-s.doneCh:
+ case s.eventCh <- flushReq{waiter: flushWaiter}:
+ }
+
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-s.doneCh:
+ return nil
+ case <-flushWaiter:
+ return s.termErr
+ }
+}
+
+var _ Sink = (*batchingSink)(nil)
+
+// Event structs and batch structs which are transferred across routines (and
+// therefore escape to the heap) can both be incredibly frequent (every event
+// may be its own batch) and temporary, so to avoid GC thrashing they are both
+// claimed and freed from object pools.
+var eventPool sync.Pool = sync.Pool{
+ New: func() interface{} {
+ return new(rowEvent)
+ },
+}
+
+func newRowEvent() *rowEvent {
+ return eventPool.Get().(*rowEvent)
+}
+func freeRowEvent(e *rowEvent) {
+ *e = rowEvent{}
+ eventPool.Put(e)
+}
+
+var batchPool sync.Pool = sync.Pool{
+ New: func() interface{} {
+ return new(sinkBatch)
+ },
+}
+
+func newSinkBatch() *sinkBatch {
+ return batchPool.Get().(*sinkBatch)
+}
+func freeSinkBatchEvent(b *sinkBatch) {
+ *b = sinkBatch{}
+ batchPool.Put(b)
+}
+
+// EmitRow implements the Sink interface.
+func (s *batchingSink) EmitRow(
+ ctx context.Context,
+ topic TopicDescriptor,
+ key, value []byte,
+ updated, mvcc hlc.Timestamp,
+ alloc kvevent.Alloc,
+) error {
+ s.metrics.recordMessageSize(int64(len(key) + len(value)))
+
+ payload := newRowEvent()
+ payload.key = key
+ payload.val = value
+ payload.topic = "" // unimplemented for now
+ payload.mvcc = mvcc
+ payload.alloc = alloc
+
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case s.eventCh <- payload:
+ case <-s.doneCh:
+ }
+
+ return nil
+}
+
+// EmitResolvedTimestamp implements the Sink interface.
+func (s *batchingSink) EmitResolvedTimestamp(
+ ctx context.Context, encoder Encoder, resolved hlc.Timestamp,
+) error {
+ data, err := encoder.EncodeResolvedTimestamp(ctx, "", resolved)
+ if err != nil {
+ return err
+ }
+ payload, err := s.client.MakeResolvedPayload(data, "")
+ if err != nil {
+ return err
+ }
+
+ if err = s.Flush(ctx); err != nil {
+ return err
+ }
+ return retry.WithMaxAttempts(ctx, s.retryOpts, s.retryOpts.MaxRetries+1, func() error {
+ defer s.metrics.recordFlushRequestCallback()()
+ return s.client.Flush(ctx, payload)
+ })
+}
+
+// Close implements the Sink interface.
+func (s *batchingSink) Close() error {
+ close(s.doneCh)
+ _ = s.wg.Wait()
+ if s.pacer != nil {
+ s.pacer.Close()
+ }
+ return s.client.Close()
+}
+
+// Dial implements the Sink interface.
+func (s *batchingSink) Dial() error {
+ return nil
+}
+
+// getConcreteType implements the Sink interface.
+func (s *batchingSink) getConcreteType() sinkType {
+ return s.concreteType
+}
+
+// sinkBatch stores an in-progress/complete batch of messages, along with
+// metadata related to the batch.
+type sinkBatch struct {
+ buffer BatchBuffer
+ payload SinkPayload // payload is nil until FinalizePayload has been called
+
+ numMessages int
+ numKVBytes int // the total amount of uncompressed kv data in the batch
+ keys intsets.Fast // An intset of the keys within the batch to provide to parallelIO
+ bufferTime time.Time // The earliest time a message was inserted into the batch
+ mvcc hlc.Timestamp
+
+ alloc kvevent.Alloc
+ hasher hash.Hash32
+}
+
+// FinalizePayload closes the writer to produce a payload that is ready to be
+// Flushed by the SinkClient.
+func (sb *sinkBatch) FinalizePayload() error {
+ payload, err := sb.buffer.Close()
+ if err != nil {
+ return err
+ }
+ sb.payload = payload
+ return nil
+}
+
+// Keys implements the IORequest interface.
+func (sb *sinkBatch) Keys() intsets.Fast {
+ return sb.keys
+}
+
+func (sb *sinkBatch) isEmpty() bool {
+ return sb.numMessages == 0
+}
+
+func hashToInt(h hash.Hash32, buf []byte) int {
+ h.Reset()
+ h.Write(buf)
+ return int(h.Sum32())
+}
+
+// Append adds the contents of a kvEvent to the batch, merging its alloc pool.
+func (sb *sinkBatch) Append(e *rowEvent) {
+ if sb.isEmpty() {
+ sb.bufferTime = timeutil.Now()
+ }
+
+ sb.buffer.Append(e.key, e.val, e.topic)
+
+ sb.keys.Add(hashToInt(sb.hasher, e.key))
+ sb.numMessages += 1
+ sb.numKVBytes += len(e.key) + len(e.val)
+
+ if sb.mvcc.IsEmpty() || e.mvcc.Less(sb.mvcc) {
+ sb.mvcc = e.mvcc
+ }
+
+ sb.alloc.Merge(&e.alloc)
+}
+
+func (s *batchingSink) handleError(err error) {
+ if s.termErr == nil {
+ s.termErr = err
+ }
+}
+
+func (s *batchingSink) newBatchBuffer() *sinkBatch {
+ batch := newSinkBatch()
+ batch.buffer = s.client.MakeBatchBuffer()
+ batch.hasher = s.hasher
+ return batch
+}
+
+// runBatchingWorker combines 1 or more KV events into batches, sending the IO
+// requests out either once the batch is full or a flush request arrives.
+func (s *batchingSink) runBatchingWorker(ctx context.Context) {
+ batchBuffer := s.newBatchBuffer()
+
+ // Once finalized, batches are sent to a parallelIO struct which handles
+ // performing multiple Flushes in parallel while maintaining Keys() ordering.
+ ioHandler := func(ctx context.Context, req IORequest) error {
+ defer s.metrics.recordFlushRequestCallback()()
+ batch, _ := req.(*sinkBatch)
+ return s.client.Flush(ctx, batch.payload)
+ }
+ ioEmitter := newParallelIO(ctx, s.retryOpts, s.ioWorkers, ioHandler, s.metrics)
+ defer ioEmitter.Close()
+
+ var handleResult func(result *ioResult)
+
+ tryFlushBatch := func() {
+ if batchBuffer.isEmpty() {
+ return
+ }
+ toFlush := batchBuffer
+ batchBuffer = s.newBatchBuffer()
+
+ if err := toFlush.FinalizePayload(); err != nil {
+ s.handleError(err)
+ return
+ }
+
+ // Emitting needs to also handle any incoming results to avoid a deadlock
+ // with trying to emit while the emitter is blocked on returning a result.
+ for {
+ select {
+ case <-ctx.Done():
+ case ioEmitter.requestCh <- toFlush:
+ case result := <-ioEmitter.resultCh:
+ handleResult(result)
+ continue
+ case <-s.doneCh:
+ }
+ break
+ }
+ }
+
+ // Flushing requires tracking the number of inflight messages and confirming
+ // completion to the requester once the counter reaches 0.
+ inflight := 0
+ var sinkFlushWaiter chan struct{}
+
+ handleResult = func(result *ioResult) {
+ batch, _ := result.request.(*sinkBatch)
+
+ if result.err != nil {
+ s.handleError(result.err)
+ } else {
+ s.metrics.recordEmittedBatch(
+ batch.bufferTime, batch.numMessages, batch.mvcc, batch.numKVBytes, sinkDoesNotCompress,
+ )
+ }
+
+ inflight -= batch.numMessages
+
+ if (result.err != nil || inflight == 0) && sinkFlushWaiter != nil {
+ close(sinkFlushWaiter)
+ sinkFlushWaiter = nil
+ }
+
+ freeIOResult(result)
+ batch.alloc.Release(ctx)
+ freeSinkBatchEvent(batch)
+ }
+
+ flushTimer := s.ts.NewTimer()
+ defer flushTimer.Stop()
+
+ for {
+ if s.pacer != nil {
+ if err := s.pacer.Pace(ctx); err != nil {
+ if pacerLogEvery.ShouldLog() {
+ log.Errorf(ctx, "automatic sink batcher pacing: %v", err)
+ }
+ }
+ }
+
+ select {
+ case req := <-s.eventCh:
+ if flush, isFlush := req.(flushReq); isFlush {
+ if inflight == 0 {
+ close(flush.waiter)
+ } else {
+ sinkFlushWaiter = flush.waiter
+ tryFlushBatch()
+ }
+ } else if event, isKV := req.(*rowEvent); isKV {
+ inflight += 1
+
+ // If we're about to append to an empty batch, start the timer to
+ // guarantee the messages do not stay buffered longer than the
+ // configured frequency.
+ if batchBuffer.isEmpty() && s.minFlushFrequency > 0 {
+ flushTimer.Reset(s.minFlushFrequency)
+ }
+
+ batchBuffer.Append(event)
+ if s.knobs.OnAppend != nil {
+ s.knobs.OnAppend(event)
+ }
+
+ // The event struct can be freed as the contents are expected to be
+ // managed by the batch instead.
+ freeRowEvent(event)
+
+ if batchBuffer.buffer.ShouldFlush() {
+ s.metrics.recordSizeBasedFlush()
+ tryFlushBatch()
+ }
+ }
+ case result := <-ioEmitter.resultCh:
+ handleResult(result)
+ case <-flushTimer.Ch():
+ flushTimer.MarkRead()
+ tryFlushBatch()
+ case <-ctx.Done():
+ return
+ case <-s.doneCh:
+ return
+ }
+
+ if s.termErr != nil {
+ return
+ }
+ }
+}
+
+func makeBatchingSink(
+ ctx context.Context,
+ concreteType sinkType,
+ client SinkClient,
+ minFlushFrequency time.Duration,
+ retryOpts retry.Options,
+ numWorkers int,
+ topicNamer *TopicNamer,
+ pacer *admission.Pacer,
+ timeSource timeutil.TimeSource,
+ metrics metricsRecorder,
+) Sink {
+ sink := &batchingSink{
+ client: client,
+ topicNamer: topicNamer,
+ concreteType: concreteType,
+ minFlushFrequency: minFlushFrequency,
+ ioWorkers: numWorkers,
+ retryOpts: retryOpts,
+ ts: timeSource,
+ metrics: metrics,
+ eventCh: make(chan interface{}, flushQueueDepth),
+ wg: ctxgroup.WithContext(ctx),
+ hasher: makeHasher(),
+ pacer: pacer,
+ doneCh: make(chan struct{}),
+ }
+
+ sink.wg.GoCtx(func(ctx context.Context) error {
+ sink.runBatchingWorker(ctx)
+ return nil
+ })
+ return sink
+}
diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go
index fde56353bfa3..76032968ded5 100644
--- a/pkg/ccl/changefeedccl/changefeed_processors.go
+++ b/pkg/ccl/changefeedccl/changefeed_processors.go
@@ -564,12 +564,19 @@ func (ca *changeAggregator) tick() error {
return ca.noteResolvedSpan(resolved)
}
case kvevent.TypeFlush:
- return ca.sink.Flush(ca.Ctx())
+ return ca.flushBufferedEvents()
}
return nil
}
+func (ca *changeAggregator) flushBufferedEvents() error {
+ if err := ca.eventConsumer.Flush(ca.Ctx()); err != nil {
+ return err
+ }
+ return ca.sink.Flush(ca.Ctx())
+}
+
// noteResolvedSpan periodically flushes Frontier progress from the current
// changeAggregator node to the changeFrontier node to allow the changeFrontier
// to persist the overall changefeed's progress
@@ -621,7 +628,7 @@ func (ca *changeAggregator) flushFrontier() error {
// otherwise, we could lose buffered messages and violate the
// at-least-once guarantee. This is also true for checkpointing the
// resolved spans in the job progress.
- if err := ca.sink.Flush(ca.Ctx()); err != nil {
+ if err := ca.flushBufferedEvents(); err != nil {
return err
}
diff --git a/pkg/ccl/changefeedccl/changefeedbase/settings.go b/pkg/ccl/changefeedccl/changefeedbase/settings.go
index abb8292b3867..44aaa52b657c 100644
--- a/pkg/ccl/changefeedccl/changefeedbase/settings.go
+++ b/pkg/ccl/changefeedccl/changefeedbase/settings.go
@@ -262,9 +262,9 @@ var EventConsumerPacerRequestSize = settings.RegisterDurationSetting(
settings.PositiveDuration,
)
-// EventConsumerElasticCPUControlEnabled determines whether changefeed event
+// PerEventElasticCPUControlEnabled determines whether changefeed event
// processing integrates with elastic CPU control.
-var EventConsumerElasticCPUControlEnabled = settings.RegisterBoolSetting(
+var PerEventElasticCPUControlEnabled = settings.RegisterBoolSetting(
settings.TenantWritable,
"changefeed.cpu.per_event_elastic_control.enabled",
"determines whether changefeed event processing integrates with elastic CPU control",
@@ -281,3 +281,29 @@ var RequireExternalConnectionSink = settings.RegisterBoolSetting(
" see https://www.cockroachlabs.com/docs/stable/create-external-connection.html",
false,
)
+
+var NewWebhookSinkEnabled = settings.RegisterBoolSetting(
+ settings.TenantWritable,
+ "changefeed.new_webhook_sink_enabled",
+ "if enabled, this setting enables a new implementation of the webhook sink"+
+ " that allows for a much higher throughput",
+ util.ConstantWithMetamorphicTestBool("changefeed.new_webhook_sink_enabled", false),
+)
+
+var SinkIOWorkers = settings.RegisterIntSetting(
+ settings.TenantWritable,
+ "changefeed.sink_io_workers",
+ "the number of workers used by changefeeds when sending requests to the sink: <0 disables, "+
+ "0 assigns a reasonable default, >0 assigns the setting value.",
+ 0,
+).WithPublic()
+
+var SinkPacerRequestSize = settings.RegisterDurationSetting(
+ settings.TenantWritable,
+ "changefeed.cpu.sink_encoding_allocation",
+ "an event consumer worker will perform a blocking request for CPU time "+
+ "before consuming events. after fully utilizing this CPU time, it will "+
+ "request more",
+ 50*time.Millisecond,
+ settings.PositiveDuration,
+)
diff --git a/pkg/ccl/changefeedccl/event_processing.go b/pkg/ccl/changefeedccl/event_processing.go
index 2861515fe03e..7c44cf17d815 100644
--- a/pkg/ccl/changefeedccl/event_processing.go
+++ b/pkg/ccl/changefeedccl/event_processing.go
@@ -104,7 +104,7 @@ func newEventConsumer(
}
pacerRequestUnit := changefeedbase.EventConsumerPacerRequestSize.Get(&cfg.Settings.SV)
- enablePacer := changefeedbase.EventConsumerElasticCPUControlEnabled.Get(&cfg.Settings.SV)
+ enablePacer := changefeedbase.PerEventElasticCPUControlEnabled.Get(&cfg.Settings.SV)
makeConsumer := func(s EventSink, frontier frontier) (eventConsumer, error) {
var err error
@@ -182,7 +182,11 @@ func newEventConsumer(
workerChSize: changefeedbase.EventConsumerWorkerQueueSize.Get(&cfg.Settings.SV),
spanFrontier: spanFrontier,
}
- ss := &safeSink{wrapped: sink, beforeFlush: c.Flush}
+ ss := sink
+ // Only webhook supports concurrent EmitRow calls
+ if sink.getConcreteType() != sinkTypeWebhook {
+ ss = &safeSink{wrapped: sink}
+ }
c.makeConsumer = func() (eventConsumer, error) {
return makeConsumer(ss, c)
}
diff --git a/pkg/ccl/changefeedccl/helpers_test.go b/pkg/ccl/changefeedccl/helpers_test.go
index 1752392f5136..3046f73ed99a 100644
--- a/pkg/ccl/changefeedccl/helpers_test.go
+++ b/pkg/ccl/changefeedccl/helpers_test.go
@@ -854,6 +854,9 @@ func randomSinkTypeWithOptions(options feedTestOptions) string {
for _, weight := range sinkWeights {
weightTotal += weight
}
+ if weightTotal == 0 {
+ return ""
+ }
p := rand.Float32() * float32(weightTotal)
var sum float32 = 0
for sink, weight := range sinkWeights {
@@ -862,7 +865,7 @@ func randomSinkTypeWithOptions(options feedTestOptions) string {
return sink
}
}
- return "kafka" // unreachable
+ return ""
}
// addCloudStorageOptions adds the options necessary to enable a server to run a
@@ -982,6 +985,9 @@ func cdcTestNamedWithSystem(
cleanupCloudStorage := addCloudStorageOptions(t, &options)
sinkType := randomSinkTypeWithOptions(options)
+ if sinkType == "" {
+ return
+ }
testLabel := sinkType
if name != "" {
testLabel = fmt.Sprintf("%s/%s", sinkType, name)
diff --git a/pkg/ccl/changefeedccl/parallel_io.go b/pkg/ccl/changefeedccl/parallel_io.go
new file mode 100644
index 000000000000..25c69327f69d
--- /dev/null
+++ b/pkg/ccl/changefeedccl/parallel_io.go
@@ -0,0 +1,248 @@
+// Copyright 2023 The Cockroach Authors.
+//
+// Licensed as a CockroachDB Enterprise file under the Cockroach Community
+// License (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt
+
+package changefeedccl
+
+import (
+ "context"
+ "sync"
+
+ "github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
+ "github.com/cockroachdb/cockroach/pkg/util/intsets"
+ "github.com/cockroachdb/cockroach/pkg/util/retry"
+)
+
+// parallelIO allows performing blocking "IOHandler" calls on in parallel.
+// IORequests implement a Keys() function returning keys on which ordering is
+// preserved.
+// Example: if the events [[a,b], [b,c], [c,d], [e,f]] are all submitted in that
+// order, [a,b] and [e,f] can be emitted concurrentyl while [b,c] will block
+// until [a,b] completes, then [c,d] will block until [b,c] completes. If [c,d]
+// errored, [b,c] would never be sent, and an error would be returned with [c,d]
+// in an ioResult struct sent to resultCh. After sending an error to resultCh
+// all workers are torn down and no further requests are received or handled.
+type parallelIO struct {
+ retryOpts retry.Options
+ wg ctxgroup.Group
+ metrics metricsRecorder
+ doneCh chan struct{}
+
+ ioHandler IOHandler
+
+ requestCh chan IORequest
+ resultCh chan *ioResult // Readers should freeIOResult after handling result events
+}
+
+// IORequest represents an abstract unit of IO that has a set of keys upon which
+// sequential ordering of fulfillment must be enforced.
+type IORequest interface {
+ Keys() intsets.Fast
+}
+
+// ioResult stores the full request that was sent as well as an error if even
+// after retries the IOHanlder was unable to succeed.
+type ioResult struct {
+ request IORequest
+ err error
+}
+
+var resultPool sync.Pool = sync.Pool{
+ New: func() interface{} {
+ return new(ioResult)
+ },
+}
+
+func newIOResult(req IORequest, err error) *ioResult {
+ res := resultPool.Get().(*ioResult)
+ res.request = req
+ res.err = err
+ return res
+}
+func freeIOResult(e *ioResult) {
+ *e = ioResult{}
+ resultPool.Put(e)
+}
+
+// IOHandler performs a blocking IO operation on an IORequest
+type IOHandler func(context.Context, IORequest) error
+
+func newParallelIO(
+ ctx context.Context,
+ retryOpts retry.Options,
+ numWorkers int,
+ handler IOHandler,
+ metrics metricsRecorder,
+) *parallelIO {
+ wg := ctxgroup.WithContext(ctx)
+ io := ¶llelIO{
+ retryOpts: retryOpts,
+ wg: wg,
+ metrics: metrics,
+ ioHandler: handler,
+ requestCh: make(chan IORequest, numWorkers),
+ resultCh: make(chan *ioResult, numWorkers),
+ doneCh: make(chan struct{}),
+ }
+
+ wg.GoCtx(func(ctx context.Context) error {
+ return io.runWorkers(ctx, numWorkers)
+ })
+
+ return io
+}
+
+// Close stops all workers immediately and returns once they shut down. Inflight
+// requests sent to requestCh may never result in being sent to resultCh.
+func (p *parallelIO) Close() {
+ close(p.doneCh)
+ _ = p.wg.Wait()
+}
+
+func (p *parallelIO) runWorkers(ctx context.Context, numEmitWorkers int) error {
+ emitWithRetries := func(ctx context.Context, payload IORequest) error {
+ initialSend := true
+ return retry.WithMaxAttempts(ctx, p.retryOpts, p.retryOpts.MaxRetries+1, func() error {
+ if !initialSend {
+ p.metrics.recordInternalRetry(int64(payload.Keys().Len()), false)
+ }
+ initialSend = false
+ return p.ioHandler(ctx, payload)
+ })
+ }
+
+ // Multiple worker routines handle the IO operations, retrying when necessary.
+ emitCh := make(chan IORequest, numEmitWorkers)
+ defer close(emitCh)
+ workerResultCh := make(chan *ioResult, numEmitWorkers)
+
+ for i := 0; i < numEmitWorkers; i++ {
+ p.wg.GoCtx(func(ctx context.Context) error {
+ for req := range emitCh {
+ result := newIOResult(req, emitWithRetries(ctx, req))
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-p.doneCh:
+ return nil
+ case workerResultCh <- result:
+ }
+ }
+ return nil
+ })
+ }
+
+ var pendingResults []*ioResult
+
+ submitIO := func(req IORequest) error {
+ for {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-p.doneCh:
+ return nil
+ case emitCh <- req:
+ return nil
+ case res := <-workerResultCh:
+ // Must also handle results to avoid the above emit being able to block
+ // forever on all workers being busy trying to emit results.
+ pendingResults = append(pendingResults, res)
+ }
+ }
+ }
+
+ // The main routine keeps track of incoming and completed requests, where
+ // admitted requests yet to be completed have their Keys() tracked in an
+ // intset, and any incoming request with keys already in the intset are placed
+ // in a Queue to be sent to IO workers once the conflicting requests complete.
+ var inflight intsets.Fast
+ var pending []IORequest
+
+ handleResult := func(res *ioResult) error {
+ if res.err == nil {
+ // Clear out the completed keys to check for newly valid pending requests.
+ inflight.DifferenceWith(res.request.Keys())
+
+ // Check for a pending request that is now able to be sent i.e. is not
+ // conflicting with any inflight requests or any requests that arrived
+ // earlier than itself in the pending queue.
+ pendingKeys := intsets.Fast{}
+ for i, pendingReq := range pending {
+ if !inflight.Intersects(pendingReq.Keys()) && !pendingKeys.Intersects(pendingReq.Keys()) {
+ inflight.UnionWith(pendingReq.Keys())
+ pending = append(pending[:i], pending[i+1:]...)
+ if err := submitIO(pendingReq); err != nil {
+ return err
+ }
+ break
+ }
+
+ pendingKeys.UnionWith(pendingReq.Keys())
+ }
+ }
+
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-p.doneCh:
+ return nil
+ case p.resultCh <- res:
+ return nil
+ }
+ }
+
+ // A set of keys can be sent immediately if no yet-to-be-handled request
+ // observed so far shares any of those keys.
+ canSendKeys := func(keys intsets.Fast) bool {
+ if inflight.Intersects(keys) {
+ return true
+ }
+ for _, pendingReq := range pending {
+ if pendingReq.Keys().Intersects(keys) {
+ return true
+ }
+ }
+ return false
+ }
+
+ for {
+ // Results read from sendToWorker need to be first added to a pendingResults
+ // list and then handled separately here rather than calling handleResult
+ // inside sendToWorker, as having a re-entrant sendToWorker -> handleResult
+ // -> sendToWorker -> handleResult chain creates complexity with managing
+ // pending requests.
+ unhandled := pendingResults
+ pendingResults = nil
+ for _, res := range unhandled {
+ if err := handleResult(res); err != nil {
+ return err
+ }
+ }
+
+ select {
+ case req := <-p.requestCh:
+ if canSendKeys(req.Keys()) {
+ // If a request conflicts with any currently unhandled requests, add it
+ // to the pending queue to be rechecked for validity later.
+ pending = append(pending, req)
+ } else {
+ inflight.UnionWith(req.Keys())
+ if err := submitIO(req); err != nil {
+ return err
+ }
+ }
+ case res := <-workerResultCh:
+ if err := handleResult(res); err != nil {
+ return err
+ }
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-p.doneCh:
+ return nil
+ }
+ }
+}
diff --git a/pkg/ccl/changefeedccl/sink.go b/pkg/ccl/changefeedccl/sink.go
index 70eff852d348..896cb34690d8 100644
--- a/pkg/ccl/changefeedccl/sink.go
+++ b/pkg/ccl/changefeedccl/sink.go
@@ -10,7 +10,11 @@ package changefeedccl
import (
"context"
+ "encoding/json"
+ "math"
"net/url"
+ "runtime"
+ "strconv"
"strings"
"time"
@@ -19,13 +23,17 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
+ "github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
+ "github.com/cockroachdb/cockroach/pkg/util/admission"
+ "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/bufalloc"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
+ "github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
@@ -207,10 +215,17 @@ func getSink(
if err != nil {
return nil, err
}
- return validateOptionsAndMakeSink(changefeedbase.WebhookValidOptions, func() (Sink, error) {
- return makeWebhookSink(ctx, sinkURL{URL: u}, encodingOpts, webhookOpts,
- defaultWorkerCount(), timeutil.DefaultTimeSource{}, metricsBuilder)
- })
+ if changefeedbase.NewWebhookSinkEnabled.Get(&serverCfg.Settings.SV) {
+ return validateOptionsAndMakeSink(changefeedbase.WebhookValidOptions, func() (Sink, error) {
+ return makeWebhookSink(ctx, sinkURL{URL: u}, encodingOpts, webhookOpts,
+ numSinkIOWorkers(serverCfg), sinkPacer(ctx, serverCfg), timeutil.DefaultTimeSource{}, metricsBuilder)
+ })
+ } else {
+ return validateOptionsAndMakeSink(changefeedbase.WebhookValidOptions, func() (Sink, error) {
+ return makeDeprecatedWebhookSink(ctx, sinkURL{URL: u}, encodingOpts, webhookOpts,
+ defaultWorkerCount(), timeutil.DefaultTimeSource{}, metricsBuilder)
+ })
+ }
case isPubsubSink(u):
// TODO: add metrics to pubsubsink
return MakePubsubSink(ctx, u, encodingOpts, AllTargets(feedCfg), opts.IsSet(changefeedbase.OptUnordered))
@@ -606,12 +621,10 @@ func (n *nullSink) Dial() error {
}
// safeSink wraps an EventSink in a mutex so it's methods are
-// thread safe. It also has a beforeFlush hook which is called
-// at the beginning of safeSink.Flush().
+// thread safe.
type safeSink struct {
syncutil.Mutex
- beforeFlush func(ctx context.Context) error
- wrapped EventSink
+ wrapped EventSink
}
var _ EventSink = (*safeSink)(nil)
@@ -645,9 +658,6 @@ func (s *safeSink) EmitRow(
}
func (s *safeSink) Flush(ctx context.Context) error {
- if err := s.beforeFlush(ctx); err != nil {
- return err
- }
s.Lock()
defer s.Unlock()
return s.wrapped.Flush(ctx)
@@ -673,3 +683,145 @@ type SinkWithEncoder interface {
Flush(ctx context.Context) error
}
+
+// proper JSON schema for sink config:
+//
+// {
+// "Flush": {
+// "Messages": ...,
+// "Bytes": ...,
+// "Frequency": ...,
+// },
+// "Retry": {
+// "Max": ...,
+// "Backoff": ...,
+// }
+// }
+type sinkJSONConfig struct {
+ Flush sinkBatchConfig `json:",omitempty"`
+ Retry sinkRetryConfig `json:",omitempty"`
+}
+
+type sinkBatchConfig struct {
+ Bytes, Messages int `json:",omitempty"`
+ Frequency jsonDuration `json:",omitempty"`
+}
+
+// wrapper structs to unmarshal json, retry.Options will be the actual config
+type sinkRetryConfig struct {
+ Max jsonMaxRetries `json:",omitempty"`
+ Backoff jsonDuration `json:",omitempty"`
+}
+
+func defaultRetryConfig() retry.Options {
+ opts := retry.Options{
+ InitialBackoff: 500 * time.Millisecond,
+ MaxRetries: 3,
+ Multiplier: 2,
+ }
+ // max backoff should be initial * 2 ^ maxRetries
+ opts.MaxBackoff = opts.InitialBackoff * time.Duration(int(math.Pow(2.0, float64(opts.MaxRetries))))
+ return opts
+}
+
+func getSinkConfigFromJson(
+ jsonStr changefeedbase.SinkSpecificJSONConfig, base sinkJSONConfig,
+) (batchCfg sinkBatchConfig, retryCfg retry.Options, err error) {
+ retryCfg = defaultRetryConfig()
+
+ var cfg = base
+ cfg.Retry.Max = jsonMaxRetries(retryCfg.MaxRetries)
+ cfg.Retry.Backoff = jsonDuration(retryCfg.InitialBackoff)
+ if jsonStr != `` {
+ // set retry defaults to be overridden if included in JSON
+ if err = json.Unmarshal([]byte(jsonStr), &cfg); err != nil {
+ return batchCfg, retryCfg, errors.Wrapf(err, "error unmarshalling json")
+ }
+ }
+
+ // don't support negative values
+ if cfg.Flush.Messages < 0 || cfg.Flush.Bytes < 0 || cfg.Flush.Frequency < 0 ||
+ cfg.Retry.Max < 0 || cfg.Retry.Backoff < 0 {
+ return batchCfg, retryCfg, errors.Errorf("invalid option value %s, all config values must be non-negative", changefeedbase.OptWebhookSinkConfig)
+ }
+
+ // errors if other batch values are set, but frequency is not
+ if (cfg.Flush.Messages > 0 || cfg.Flush.Bytes > 0) && cfg.Flush.Frequency == 0 {
+ return batchCfg, retryCfg, errors.Errorf("invalid option value %s, flush frequency is not set, messages may never be sent", changefeedbase.OptWebhookSinkConfig)
+ }
+
+ retryCfg.MaxRetries = int(cfg.Retry.Max)
+ retryCfg.InitialBackoff = time.Duration(cfg.Retry.Backoff)
+ return cfg.Flush, retryCfg, nil
+}
+
+type jsonMaxRetries int
+
+func (j *jsonMaxRetries) UnmarshalJSON(b []byte) error {
+ var i int64
+ // try to parse as int
+ i, err := strconv.ParseInt(string(b), 10, 64)
+ if err == nil {
+ if i <= 0 {
+ return errors.Errorf("max retry count must be a positive integer. use 'inf' for infinite retries.")
+ }
+ *j = jsonMaxRetries(i)
+ } else {
+ // if that fails, try to parse as string (only accept 'inf')
+ var s string
+ // using unmarshal here to remove quotes around the string
+ if err := json.Unmarshal(b, &s); err != nil {
+ return err
+ }
+ if strings.ToLower(s) == "inf" {
+ // if used wants infinite retries, set to zero as retry.Options interprets this as infinity
+ *j = 0
+ } else if n, err := strconv.Atoi(s); err == nil { // also accept ints as strings
+ *j = jsonMaxRetries(n)
+ } else {
+ return errors.Errorf("max retries must be either a positive int or 'inf' for infinite retries.")
+ }
+ }
+ return nil
+}
+
+func numSinkIOWorkers(cfg *execinfra.ServerConfig) int {
+ numWorkers := changefeedbase.SinkIOWorkers.Get(&cfg.Settings.SV)
+ if numWorkers > 0 {
+ return int(numWorkers)
+ }
+
+ idealNumber := runtime.GOMAXPROCS(0)
+ if idealNumber < 1 {
+ return 1
+ }
+ if idealNumber > 32 {
+ return 32
+ }
+ return idealNumber
+}
+
+func sinkPacer(ctx context.Context, cfg *execinfra.ServerConfig) *admission.Pacer {
+ pacerRequestUnit := changefeedbase.SinkPacerRequestSize.Get(&cfg.Settings.SV)
+ enablePacer := changefeedbase.PerEventElasticCPUControlEnabled.Get(&cfg.Settings.SV)
+
+ var pacer *admission.Pacer = nil
+ if enablePacer {
+ tenantID, ok := roachpb.ClientTenantFromContext(ctx)
+ if !ok {
+ tenantID = roachpb.SystemTenantID
+ }
+
+ pacer = cfg.AdmissionPacerFactory.NewPacer(
+ pacerRequestUnit,
+ admission.WorkInfo{
+ TenantID: tenantID,
+ Priority: admissionpb.BulkNormalPri,
+ CreateTime: timeutil.Now().UnixNano(),
+ BypassAdmission: false,
+ },
+ )
+ }
+
+ return pacer
+}
diff --git a/pkg/ccl/changefeedccl/sink_webhook.go b/pkg/ccl/changefeedccl/sink_webhook.go
index 09fd6cb508a2..ff0bd20e68c7 100644
--- a/pkg/ccl/changefeedccl/sink_webhook.go
+++ b/pkg/ccl/changefeedccl/sink_webhook.go
@@ -17,11 +17,9 @@ import (
"fmt"
"hash/crc32"
"io"
- "math"
"net"
"net/http"
"net/url"
- "strconv"
"strings"
"time"
@@ -36,23 +34,7 @@ import (
"github.com/cockroachdb/errors"
)
-const (
- applicationTypeJSON = `application/json`
- applicationTypeCSV = `text/csv`
- authorizationHeader = `Authorization`
-)
-
-func isWebhookSink(u *url.URL) bool {
- switch u.Scheme {
- // allow HTTP here but throw an error later to make it clear HTTPS is required
- case changefeedbase.SinkSchemeWebhookHTTP, changefeedbase.SinkSchemeWebhookHTTPS:
- return true
- default:
- return false
- }
-}
-
-type webhookSink struct {
+type deprecatedWebhookSink struct {
// Webhook configuration.
parallelism int
retryCfg retry.Options
@@ -80,11 +62,11 @@ type webhookSink struct {
workerCtx context.Context
workerGroup ctxgroup.Group
exitWorkers func() // Signaled to shut down all workers.
- eventsChans []chan []messagePayload
+ eventsChans []chan []deprecatedMessagePayload
metrics metricsRecorder
}
-func (s *webhookSink) getConcreteType() sinkType {
+func (s *deprecatedWebhookSink) getConcreteType() sinkType {
return sinkTypeWebhook
}
@@ -100,7 +82,7 @@ type encodedPayload struct {
mvcc hlc.Timestamp
}
-func encodePayloadJSONWebhook(messages []messagePayload) (encodedPayload, error) {
+func encodePayloadJSONWebhook(messages []deprecatedMessagePayload) (encodedPayload, error) {
result := encodedPayload{
emitTime: timeutil.Now(),
}
@@ -129,7 +111,7 @@ func encodePayloadJSONWebhook(messages []messagePayload) (encodedPayload, error)
return result, err
}
-func encodePayloadCSVWebhook(messages []messagePayload) (encodedPayload, error) {
+func encodePayloadCSVWebhook(messages []deprecatedMessagePayload) (encodedPayload, error) {
result := encodedPayload{
emitTime: timeutil.Now(),
}
@@ -150,7 +132,7 @@ func encodePayloadCSVWebhook(messages []messagePayload) (encodedPayload, error)
return result, nil
}
-type messagePayload struct {
+type deprecatedMessagePayload struct {
// Payload message fields.
key []byte
val []byte
@@ -162,15 +144,15 @@ type messagePayload struct {
// webhookMessage contains either messagePayload or a flush request.
type webhookMessage struct {
flushDone *chan struct{}
- payload messagePayload
+ payload deprecatedMessagePayload
}
type batch struct {
- buffer []messagePayload
+ buffer []deprecatedMessagePayload
bufferBytes int
}
-func (b *batch) addToBuffer(m messagePayload) {
+func (b *batch) addToBuffer(m deprecatedMessagePayload) {
b.bufferBytes += len(m.val)
b.buffer = append(b.buffer, m)
}
@@ -185,36 +167,6 @@ type batchConfig struct {
Frequency jsonDuration `json:",omitempty"`
}
-type jsonMaxRetries int
-
-func (j *jsonMaxRetries) UnmarshalJSON(b []byte) error {
- var i int64
- // try to parse as int
- i, err := strconv.ParseInt(string(b), 10, 64)
- if err == nil {
- if i <= 0 {
- return errors.Errorf("max retry count must be a positive integer. use 'inf' for infinite retries.")
- }
- *j = jsonMaxRetries(i)
- } else {
- // if that fails, try to parse as string (only accept 'inf')
- var s string
- // using unmarshal here to remove quotes around the string
- if err := json.Unmarshal(b, &s); err != nil {
- return err
- }
- if strings.ToLower(s) == "inf" {
- // if used wants infinite retries, set to zero as retry.Options interprets this as infinity
- *j = 0
- } else if n, err := strconv.Atoi(s); err == nil { // also accept ints as strings
- *j = jsonMaxRetries(n)
- } else {
- return errors.Errorf("max retries must be either a positive int or 'inf' for infinite retries.")
- }
- }
- return nil
-}
-
// wrapper structs to unmarshal json, retry.Options will be the actual config
type retryConfig struct {
Max jsonMaxRetries `json:",omitempty"`
@@ -239,7 +191,7 @@ type webhookSinkConfig struct {
Retry retryConfig `json:",omitempty"`
}
-func (s *webhookSink) getWebhookSinkConfig(
+func (s *deprecatedWebhookSink) getWebhookSinkConfig(
jsonStr changefeedbase.SinkSpecificJSONConfig,
) (batchCfg batchConfig, retryCfg retry.Options, err error) {
retryCfg = defaultRetryConfig()
@@ -270,7 +222,7 @@ func (s *webhookSink) getWebhookSinkConfig(
return cfg.Flush, retryCfg, nil
}
-func makeWebhookSink(
+func makeDeprecatedWebhookSink(
ctx context.Context,
u sinkURL,
encodingOpts changefeedbase.EncodingOptions,
@@ -312,7 +264,7 @@ func makeWebhookSink(
ctx, cancel := context.WithCancel(ctx)
- sink := &webhookSink{
+ sink := &deprecatedWebhookSink{
workerCtx: ctx,
authHeader: opts.AuthHeader,
exitWorkers: cancel,
@@ -329,7 +281,7 @@ func makeWebhookSink(
}
// TODO(yevgeniy): Establish HTTP connection in Dial().
- sink.client, err = makeWebhookClient(u, connTimeout)
+ sink.client, err = deprecatedMakeWebhookClient(u, connTimeout)
if err != nil {
return nil, err
}
@@ -350,7 +302,7 @@ func makeWebhookSink(
return sink, nil
}
-func makeWebhookClient(u sinkURL, timeout time.Duration) (*httputil.Client, error) {
+func deprecatedMakeWebhookClient(u sinkURL, timeout time.Duration) (*httputil.Client, error) {
client := &httputil.Client{
Client: &http.Client{
Timeout: timeout,
@@ -417,30 +369,19 @@ func makeWebhookClient(u sinkURL, timeout time.Duration) (*httputil.Client, erro
return client, nil
}
-func defaultRetryConfig() retry.Options {
- opts := retry.Options{
- InitialBackoff: 500 * time.Millisecond,
- MaxRetries: 3,
- Multiplier: 2,
- }
- // max backoff should be initial * 2 ^ maxRetries
- opts.MaxBackoff = opts.InitialBackoff * time.Duration(int(math.Pow(2.0, float64(opts.MaxRetries))))
- return opts
-}
-
// defaultWorkerCount() is the number of CPU's on the machine
func defaultWorkerCount() int {
return system.NumCPU()
}
-func (s *webhookSink) Dial() error {
+func (s *deprecatedWebhookSink) Dial() error {
s.setupWorkers()
return nil
}
-func (s *webhookSink) setupWorkers() {
+func (s *deprecatedWebhookSink) setupWorkers() {
// setup events channels to send to workers and the worker group
- s.eventsChans = make([]chan []messagePayload, s.parallelism)
+ s.eventsChans = make([]chan []deprecatedMessagePayload, s.parallelism)
s.workerGroup = ctxgroup.WithContext(s.workerCtx)
s.batchChan = make(chan webhookMessage)
@@ -455,7 +396,7 @@ func (s *webhookSink) setupWorkers() {
return nil
})
for i := 0; i < s.parallelism; i++ {
- s.eventsChans[i] = make(chan []messagePayload)
+ s.eventsChans[i] = make(chan []deprecatedMessagePayload)
j := i
s.workerGroup.GoCtx(func(ctx context.Context) error {
s.workerLoop(j)
@@ -464,7 +405,7 @@ func (s *webhookSink) setupWorkers() {
}
}
-func (s *webhookSink) shouldSendBatch(b batch) bool {
+func (s *deprecatedWebhookSink) shouldSendBatch(b batch) bool {
// similar to sarama, send batch if:
// everything is zero (default)
// any one of the conditions are met UNLESS the condition is zero which means never batch
@@ -483,8 +424,8 @@ func (s *webhookSink) shouldSendBatch(b batch) bool {
}
}
-func (s *webhookSink) splitAndSendBatch(batch []messagePayload) error {
- workerBatches := make([][]messagePayload, s.parallelism)
+func (s *deprecatedWebhookSink) splitAndSendBatch(batch []deprecatedMessagePayload) error {
+ workerBatches := make([][]deprecatedMessagePayload, s.parallelism)
for _, msg := range batch {
// split batch into per-worker batches
i := s.workerIndex(msg.key)
@@ -504,7 +445,7 @@ func (s *webhookSink) splitAndSendBatch(batch []messagePayload) error {
}
// flushWorkers sends flush request to each worker and waits for each one to acknowledge.
-func (s *webhookSink) flushWorkers(done chan struct{}) error {
+func (s *deprecatedWebhookSink) flushWorkers(done chan struct{}) error {
for i := 0; i < len(s.eventsChans); i++ {
// Ability to write a nil message to events channel indicates that
// the worker has processed all other messages.
@@ -525,7 +466,7 @@ func (s *webhookSink) flushWorkers(done chan struct{}) error {
// batchWorker ingests messages from EmitRow into a batch and splits them into
// per-worker batches to be sent separately
-func (s *webhookSink) batchWorker() {
+func (s *deprecatedWebhookSink) batchWorker() {
var batchTracker batch
batchTimer := s.ts.NewTimer()
defer batchTimer.Stop()
@@ -578,7 +519,7 @@ func (s *webhookSink) batchWorker() {
}
}
-func (s *webhookSink) workerLoop(workerIndex int) {
+func (s *deprecatedWebhookSink) workerLoop(workerIndex int) {
for {
select {
case <-s.workerCtx.Done():
@@ -613,14 +554,14 @@ func (s *webhookSink) workerLoop(workerIndex int) {
}
}
-func (s *webhookSink) sendMessageWithRetries(ctx context.Context, reqBody []byte) error {
+func (s *deprecatedWebhookSink) sendMessageWithRetries(ctx context.Context, reqBody []byte) error {
requestFunc := func() error {
return s.sendMessage(ctx, reqBody)
}
return retry.WithMaxAttempts(ctx, s.retryCfg, s.retryCfg.MaxRetries+1, requestFunc)
}
-func (s *webhookSink) sendMessage(ctx context.Context, reqBody []byte) error {
+func (s *deprecatedWebhookSink) sendMessage(ctx context.Context, reqBody []byte) error {
req, err := http.NewRequestWithContext(ctx, http.MethodPost, s.url.String(), bytes.NewReader(reqBody))
if err != nil {
return err
@@ -658,13 +599,13 @@ func (s *webhookSink) sendMessage(ctx context.Context, reqBody []byte) error {
// deterministically assigned to the same worker. Since we have a channel per
// worker, we can ensure per-worker ordering and therefore guarantee per-key
// ordering.
-func (s *webhookSink) workerIndex(key []byte) uint32 {
+func (s *deprecatedWebhookSink) workerIndex(key []byte) uint32 {
return crc32.ChecksumIEEE(key) % uint32(s.parallelism)
}
// exitWorkersWithError saves the first error message encountered by webhook workers,
// and requests all workers to terminate.
-func (s *webhookSink) exitWorkersWithError(err error) {
+func (s *deprecatedWebhookSink) exitWorkersWithError(err error) {
// errChan has buffer size 1, first error will be saved to the buffer and
// subsequent errors will be ignored
select {
@@ -675,7 +616,7 @@ func (s *webhookSink) exitWorkersWithError(err error) {
}
// sinkError checks to see if any errors occurred inside workers go routines.
-func (s *webhookSink) sinkError() error {
+func (s *deprecatedWebhookSink) sinkError() error {
select {
case err := <-s.errChan:
return err
@@ -684,7 +625,7 @@ func (s *webhookSink) sinkError() error {
}
}
-func (s *webhookSink) EmitRow(
+func (s *deprecatedWebhookSink) EmitRow(
ctx context.Context,
topic TopicDescriptor,
key, value []byte,
@@ -702,7 +643,7 @@ func (s *webhookSink) EmitRow(
case err := <-s.errChan:
return err
case s.batchChan <- webhookMessage{
- payload: messagePayload{
+ payload: deprecatedMessagePayload{
key: key,
val: value,
alloc: alloc,
@@ -714,7 +655,7 @@ func (s *webhookSink) EmitRow(
return nil
}
-func (s *webhookSink) EmitResolvedTimestamp(
+func (s *deprecatedWebhookSink) EmitResolvedTimestamp(
ctx context.Context, encoder Encoder, resolved hlc.Timestamp,
) error {
defer s.metrics.recordResolvedCallback()()
@@ -745,7 +686,7 @@ func (s *webhookSink) EmitResolvedTimestamp(
return nil
}
-func (s *webhookSink) Flush(ctx context.Context) error {
+func (s *deprecatedWebhookSink) Flush(ctx context.Context) error {
s.metrics.recordFlushRequestCallback()()
// Send flush request.
@@ -768,7 +709,7 @@ func (s *webhookSink) Flush(ctx context.Context) error {
}
}
-func (s *webhookSink) Close() error {
+func (s *deprecatedWebhookSink) Close() error {
s.exitWorkers()
// ignore errors here since we're closing the sink anyway
_ = s.workerGroup.Wait()
diff --git a/pkg/ccl/changefeedccl/sink_webhook_test.go b/pkg/ccl/changefeedccl/sink_webhook_test.go
index 199734be6bf5..0037f5cfd6fe 100644
--- a/pkg/ccl/changefeedccl/sink_webhook_test.go
+++ b/pkg/ccl/changefeedccl/sink_webhook_test.go
@@ -15,6 +15,7 @@ import (
"net/http"
"net/url"
"strings"
+ "sync/atomic"
"testing"
"time"
@@ -83,7 +84,7 @@ func setupWebhookSinkWithDetails(
if err != nil {
return nil, err
}
- sinkSrc, err := makeWebhookSink(ctx, sinkURL{URL: u}, encodingOpts, sinkOpts, parallelism, source, nilMetricsRecorderBuilder)
+ sinkSrc, err := makeWebhookSink(ctx, sinkURL{URL: u}, encodingOpts, sinkOpts, parallelism, nil, source, nilMetricsRecorderBuilder)
if err != nil {
return nil, err
}
@@ -172,8 +173,6 @@ func TestWebhookSink(t *testing.T) {
require.NoError(t, sinkSrcNoCert.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, zeroAlloc))
require.Regexp(t, "x509", sinkSrcNoCert.Flush(context.Background()))
- require.EqualError(t, sinkSrcNoCert.EmitRow(context.Background(), nil, nil, nil, zeroTS, zeroTS, zeroAlloc),
- `context canceled`)
params.Set(changefeedbase.SinkParamSkipTLSVerify, "true")
sinkDestHost.RawQuery = params.Encode()
@@ -191,8 +190,6 @@ func TestWebhookSink(t *testing.T) {
err = sinkSrc.Flush(context.Background())
require.Error(t, err)
require.Contains(t, err.Error(), fmt.Sprintf(`Post "%s":`, sinkDest.URL()))
- require.EqualError(t, sinkSrc.EmitRow(context.Background(), nil, nil, nil, zeroTS, zeroTS, zeroAlloc),
- `context canceled`)
sinkDestHTTP, err := cdctest.StartMockWebhookSinkInsecure()
require.NoError(t, err)
@@ -207,8 +204,6 @@ func TestWebhookSink(t *testing.T) {
require.EqualError(t, sinkSrcWrongProtocol.Flush(context.Background()),
fmt.Sprintf(`Post "%s": http: server gave HTTP response to HTTPS client`, fmt.Sprintf("https://%s", strings.TrimPrefix(sinkDestHTTP.URL(),
"http://"))))
- require.EqualError(t, sinkSrcWrongProtocol.EmitRow(context.Background(), nil, nil, nil, zeroTS, zeroTS, zeroAlloc),
- `context canceled`)
sinkDestSecure, err := cdctest.StartMockWebhookSinkSecure(cert)
require.NoError(t, err)
@@ -230,6 +225,7 @@ func TestWebhookSink(t *testing.T) {
Opts: opts,
}
+ require.NoError(t, sinkSrc.Close())
sinkSrc, err = setupWebhookSinkWithDetails(context.Background(), details, parallelism, timeutil.DefaultTimeSource{})
require.NoError(t, err)
@@ -305,8 +301,6 @@ func TestWebhookSinkWithAuthOptions(t *testing.T) {
require.NoError(t, sinkSrcNoCreds.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, zeroAlloc))
require.EqualError(t, sinkSrcNoCreds.Flush(context.Background()), "401 Unauthorized: ")
- require.EqualError(t, sinkSrcNoCreds.EmitRow(context.Background(), nil, nil, nil, zeroTS, zeroTS, zeroAlloc),
- `context canceled`)
// wrong credentials should result in a 401 as well
var wrongAuthHeader string
@@ -318,8 +312,6 @@ func TestWebhookSinkWithAuthOptions(t *testing.T) {
require.NoError(t, sinkSrcWrongCreds.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, zeroAlloc))
require.EqualError(t, sinkSrcWrongCreds.Flush(context.Background()), "401 Unauthorized: ")
- require.EqualError(t, sinkSrcWrongCreds.EmitRow(context.Background(), nil, nil, nil, zeroTS, zeroTS, zeroAlloc),
- `context canceled`)
require.NoError(t, sinkSrc.Close())
require.NoError(t, sinkSrcNoCreds.Close())
@@ -603,6 +595,13 @@ func TestWebhookSinkConfig(t *testing.T) {
sinkSrc, err := setupWebhookSinkWithDetails(context.Background(), details, parallelism, mt)
require.NoError(t, err)
+ batchingSink, ok := sinkSrc.(*batchingSink)
+ require.True(t, ok)
+ var appendCount int32 = 0
+ batchingSink.knobs.OnAppend = func(event *rowEvent) {
+ atomic.AddInt32(&appendCount, 1)
+ }
+
// send incomplete batch
require.NoError(t, sinkSrc.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc()))
require.NoError(t, sinkSrc.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1001},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc()))
@@ -611,11 +610,10 @@ func TestWebhookSinkConfig(t *testing.T) {
require.Equal(t, sinkDest.Latest(), "")
testutils.SucceedsSoon(t, func() error {
- // wait for the timer in batch worker to be set (1 hour from now, as specified by config) before advancing time.
- if len(mt.Timers()) == 1 && mt.Timers()[0] == mt.Now().Add(time.Hour) {
+ if atomic.LoadInt32(&appendCount) >= 2 {
return nil
}
- return errors.New("Waiting for timer to be created by batch worker")
+ return errors.New("Waiting for rows to be buffered")
})
mt.Advance(time.Hour)
require.NoError(t, sinkSrc.Flush(context.Background()))
diff --git a/pkg/ccl/changefeedccl/sink_webhook_v2.go b/pkg/ccl/changefeedccl/sink_webhook_v2.go
new file mode 100644
index 000000000000..c0e0640b3990
--- /dev/null
+++ b/pkg/ccl/changefeedccl/sink_webhook_v2.go
@@ -0,0 +1,380 @@
+// Copyright 2023 The Cockroach Authors.
+//
+// Licensed as a CockroachDB Enterprise file under the Cockroach Community
+// License (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt
+
+package changefeedccl
+
+import (
+ "bytes"
+ "context"
+ "crypto/tls"
+ "crypto/x509"
+ "fmt"
+ "io"
+ "net"
+ "net/http"
+ "net/url"
+ "strings"
+ "time"
+
+ "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
+ "github.com/cockroachdb/cockroach/pkg/util/admission"
+ "github.com/cockroachdb/cockroach/pkg/util/httputil"
+ "github.com/cockroachdb/cockroach/pkg/util/timeutil"
+ "github.com/cockroachdb/errors"
+)
+
+const (
+ applicationTypeJSON = `application/json`
+ applicationTypeCSV = `text/csv`
+ authorizationHeader = `Authorization`
+)
+
+func isWebhookSink(u *url.URL) bool {
+ switch u.Scheme {
+ // allow HTTP here but throw an error later to make it clear HTTPS is required
+ case changefeedbase.SinkSchemeWebhookHTTP, changefeedbase.SinkSchemeWebhookHTTPS:
+ return true
+ default:
+ return false
+ }
+}
+
+type webhookSinkClient struct {
+ ctx context.Context
+ format changefeedbase.FormatType
+ url sinkURL
+ authHeader string
+ batchCfg sinkBatchConfig
+ client *httputil.Client
+}
+
+var _ SinkClient = (*webhookSinkClient)(nil)
+
+func makeWebhookSinkClient(
+ ctx context.Context,
+ u sinkURL,
+ encodingOpts changefeedbase.EncodingOptions,
+ opts changefeedbase.WebhookSinkOptions,
+ batchCfg sinkBatchConfig,
+ parallelism int,
+) (SinkClient, error) {
+ err := validateWebhookOpts(u, encodingOpts, opts)
+ if err != nil {
+ return nil, err
+ }
+
+ u.Scheme = strings.TrimPrefix(u.Scheme, `webhook-`)
+
+ sinkClient := &webhookSinkClient{
+ ctx: ctx,
+ authHeader: opts.AuthHeader,
+ format: encodingOpts.Format,
+ batchCfg: batchCfg,
+ }
+
+ var connTimeout time.Duration
+ if opts.ClientTimeout != nil {
+ connTimeout = *opts.ClientTimeout
+ }
+ sinkClient.client, err = makeWebhookClient(u, connTimeout, parallelism)
+ if err != nil {
+ return nil, err
+ }
+
+ // remove known query params from sink URL before setting in sink config
+ sinkURLParsed, err := url.Parse(u.String())
+ if err != nil {
+ return nil, err
+ }
+ params := sinkURLParsed.Query()
+ params.Del(changefeedbase.SinkParamSkipTLSVerify)
+ params.Del(changefeedbase.SinkParamCACert)
+ params.Del(changefeedbase.SinkParamClientCert)
+ params.Del(changefeedbase.SinkParamClientKey)
+ sinkURLParsed.RawQuery = params.Encode()
+ sinkClient.url = sinkURL{URL: sinkURLParsed}
+
+ return sinkClient, nil
+}
+
+func makeWebhookClient(
+ u sinkURL, timeout time.Duration, parallelism int,
+) (*httputil.Client, error) {
+ client := &httputil.Client{
+ Client: &http.Client{
+ Timeout: timeout,
+ Transport: &http.Transport{
+ DialContext: (&net.Dialer{Timeout: timeout}).DialContext,
+ MaxConnsPerHost: parallelism,
+ MaxIdleConnsPerHost: parallelism,
+ IdleConnTimeout: time.Minute,
+ ForceAttemptHTTP2: true,
+ },
+ },
+ }
+
+ dialConfig := struct {
+ tlsSkipVerify bool
+ caCert []byte
+ clientCert []byte
+ clientKey []byte
+ }{}
+
+ transport := client.Transport.(*http.Transport)
+
+ if _, err := u.consumeBool(changefeedbase.SinkParamSkipTLSVerify, &dialConfig.tlsSkipVerify); err != nil {
+ return nil, err
+ }
+ if err := u.decodeBase64(changefeedbase.SinkParamCACert, &dialConfig.caCert); err != nil {
+ return nil, err
+ }
+ if err := u.decodeBase64(changefeedbase.SinkParamClientCert, &dialConfig.clientCert); err != nil {
+ return nil, err
+ }
+ if err := u.decodeBase64(changefeedbase.SinkParamClientKey, &dialConfig.clientKey); err != nil {
+ return nil, err
+ }
+
+ transport.TLSClientConfig = &tls.Config{
+ InsecureSkipVerify: dialConfig.tlsSkipVerify,
+ }
+
+ if dialConfig.caCert != nil {
+ caCertPool, err := x509.SystemCertPool()
+ if err != nil {
+ return nil, errors.Wrap(err, "could not load system root CA pool")
+ }
+ if caCertPool == nil {
+ caCertPool = x509.NewCertPool()
+ }
+ if !caCertPool.AppendCertsFromPEM(dialConfig.caCert) {
+ return nil, errors.Errorf("failed to parse certificate data:%s", string(dialConfig.caCert))
+ }
+ transport.TLSClientConfig.RootCAs = caCertPool
+ }
+
+ if dialConfig.clientCert != nil && dialConfig.clientKey == nil {
+ return nil, errors.Errorf(`%s requires %s to be set`, changefeedbase.SinkParamClientCert, changefeedbase.SinkParamClientKey)
+ } else if dialConfig.clientKey != nil && dialConfig.clientCert == nil {
+ return nil, errors.Errorf(`%s requires %s to be set`, changefeedbase.SinkParamClientKey, changefeedbase.SinkParamClientCert)
+ }
+
+ if dialConfig.clientCert != nil && dialConfig.clientKey != nil {
+ cert, err := tls.X509KeyPair(dialConfig.clientCert, dialConfig.clientKey)
+ if err != nil {
+ return nil, errors.Wrap(err, `invalid client certificate data provided`)
+ }
+ transport.TLSClientConfig.Certificates = []tls.Certificate{cert}
+ }
+
+ return client, nil
+}
+
+func (wse *webhookSinkClient) makePayloadForBytes(body []byte) (SinkPayload, error) {
+ req, err := http.NewRequestWithContext(wse.ctx, http.MethodPost, wse.url.String(), bytes.NewReader(body))
+ if err != nil {
+ return nil, err
+ }
+ switch wse.format {
+ case changefeedbase.OptFormatJSON:
+ req.Header.Set("Content-Type", applicationTypeJSON)
+ case changefeedbase.OptFormatCSV:
+ req.Header.Set("Content-Type", applicationTypeCSV)
+ }
+
+ if wse.authHeader != "" {
+ req.Header.Set(authorizationHeader, wse.authHeader)
+ }
+
+ return req, nil
+}
+
+// MakeResolvedPayload implements the SinkClient interface
+func (wse *webhookSinkClient) MakeResolvedPayload(body []byte, topic string) (SinkPayload, error) {
+ return wse.makePayloadForBytes(body)
+}
+
+// Flush implements the SinkClient interface
+func (wse *webhookSinkClient) Flush(ctx context.Context, batch SinkPayload) error {
+ req := batch.(*http.Request)
+ res, err := wse.client.Do(req)
+ if err != nil {
+ return err
+ }
+ defer res.Body.Close()
+
+ if !(res.StatusCode >= http.StatusOK && res.StatusCode < http.StatusMultipleChoices) {
+ resBody, err := io.ReadAll(res.Body)
+ if err != nil {
+ return errors.Wrapf(err, "failed to read body for HTTP response with status: %d", res.StatusCode)
+ }
+ return fmt.Errorf("%s: %s", res.Status, string(resBody))
+ }
+ return nil
+}
+
+// Close implements the SinkClient interface
+func (wse *webhookSinkClient) Close() error {
+ wse.client.CloseIdleConnections()
+ return nil
+}
+
+func validateWebhookOpts(
+ u sinkURL, encodingOpts changefeedbase.EncodingOptions, opts changefeedbase.WebhookSinkOptions,
+) error {
+ if u.Scheme != changefeedbase.SinkSchemeWebhookHTTPS {
+ return errors.Errorf(`this sink requires %s`, changefeedbase.SinkSchemeHTTPS)
+ }
+
+ switch encodingOpts.Format {
+ case changefeedbase.OptFormatJSON:
+ case changefeedbase.OptFormatCSV:
+ default:
+ return errors.Errorf(`this sink is incompatible with %s=%s`,
+ changefeedbase.OptFormat, encodingOpts.Format)
+ }
+
+ switch encodingOpts.Envelope {
+ case changefeedbase.OptEnvelopeWrapped, changefeedbase.OptEnvelopeBare:
+ default:
+ return errors.Errorf(`this sink is incompatible with %s=%s`,
+ changefeedbase.OptEnvelope, encodingOpts.Envelope)
+ }
+
+ encodingOpts.TopicInValue = true
+
+ if encodingOpts.Envelope != changefeedbase.OptEnvelopeBare {
+ encodingOpts.KeyInValue = true
+ }
+
+ return nil
+}
+
+type webhookCSVBuffer struct {
+ bytes []byte
+ messageCount int
+ sc *webhookSinkClient
+}
+
+var _ BatchBuffer = (*webhookCSVBuffer)(nil)
+
+// Append implements the BatchBuffer interface
+func (cw *webhookCSVBuffer) Append(key []byte, value []byte, topic string) {
+ cw.bytes = append(cw.bytes, value...)
+ cw.messageCount += 1
+}
+
+// ShouldFlush implements the BatchBuffer interface
+func (cw *webhookCSVBuffer) ShouldFlush() bool {
+ return cw.sc.shouldFlush(len(cw.bytes), cw.messageCount)
+}
+
+// Close implements the BatchBuffer interface
+func (cw *webhookCSVBuffer) Close() (SinkPayload, error) {
+ return cw.sc.makePayloadForBytes(cw.bytes)
+}
+
+type webhookJSONBuffer struct {
+ messages [][]byte
+ numBytes int
+ sc *webhookSinkClient
+}
+
+var _ BatchBuffer = (*webhookJSONBuffer)(nil)
+
+// Append implements the BatchBuffer interface
+func (jw *webhookJSONBuffer) Append(key []byte, value []byte, topic string) {
+ jw.messages = append(jw.messages, value)
+ jw.numBytes += len(value)
+}
+
+// ShouldFlush implements the BatchBuffer interface
+func (jw *webhookJSONBuffer) ShouldFlush() bool {
+ return jw.sc.shouldFlush(jw.numBytes, len(jw.messages))
+}
+
+// Close implements the BatchBuffer interface
+func (jw *webhookJSONBuffer) Close() (SinkPayload, error) {
+ var buffer bytes.Buffer
+ prefix := "{\"payload\":["
+ suffix := fmt.Sprintf("],\"length\":%d}", len(jw.messages))
+
+ // Grow all at once to avoid reallocations
+ buffer.Grow(len(prefix) + jw.numBytes /* kvs */ + len(jw.messages) /* commas */ + len(suffix))
+
+ buffer.WriteString(prefix)
+ for i, msg := range jw.messages {
+ if i != 0 {
+ buffer.WriteByte(',')
+ }
+ buffer.Write(msg)
+ }
+ buffer.WriteString(suffix)
+ return jw.sc.makePayloadForBytes(buffer.Bytes())
+}
+
+func (wse *webhookSinkClient) MakeBatchBuffer() BatchBuffer {
+ if wse.format == changefeedbase.OptFormatCSV {
+ return &webhookCSVBuffer{sc: wse}
+ } else {
+ return &webhookJSONBuffer{
+ sc: wse,
+ messages: make([][]byte, 0, wse.batchCfg.Messages),
+ }
+ }
+}
+
+func (wse *webhookSinkClient) shouldFlush(bytes int, messages int) bool {
+ switch {
+ // all zero values is interpreted as flush every time
+ case wse.batchCfg.Messages == 0 && wse.batchCfg.Bytes == 0 && wse.batchCfg.Frequency == 0:
+ return true
+ // messages threshold has been reached
+ case wse.batchCfg.Messages > 0 && messages >= wse.batchCfg.Messages:
+ return true
+ // bytes threshold has been reached
+ case wse.batchCfg.Bytes > 0 && bytes >= wse.batchCfg.Bytes:
+ return true
+ default:
+ return false
+ }
+}
+
+func makeWebhookSink(
+ ctx context.Context,
+ u sinkURL,
+ encodingOpts changefeedbase.EncodingOptions,
+ opts changefeedbase.WebhookSinkOptions,
+ parallelism int,
+ pacer *admission.Pacer,
+ source timeutil.TimeSource,
+ mb metricsRecorderBuilder,
+) (Sink, error) {
+ batchCfg, retryOpts, err := getSinkConfigFromJson(opts.JSONConfig, sinkJSONConfig{})
+ if err != nil {
+ return nil, err
+ }
+
+ sinkClient, err := makeWebhookSinkClient(ctx, u, encodingOpts, opts, batchCfg, parallelism)
+ if err != nil {
+ return nil, err
+ }
+
+ return makeBatchingSink(
+ ctx,
+ sinkTypeWebhook,
+ sinkClient,
+ time.Duration(batchCfg.Frequency),
+ retryOpts,
+ parallelism,
+ nil,
+ pacer,
+ source,
+ mb(requiresResourceAccounting),
+ ), nil
+}
diff --git a/pkg/cmd/roachtest/tests/cdc.go b/pkg/cmd/roachtest/tests/cdc.go
index 196dcf340267..0d0f098e66a2 100644
--- a/pkg/cmd/roachtest/tests/cdc.go
+++ b/pkg/cmd/roachtest/tests/cdc.go
@@ -1225,6 +1225,7 @@ func registerCDC(r registry.Registry) {
ct.runTPCCWorkload(tpccArgs{warehouses: 100, duration: "30m"})
+ // The deprecated webhook sink is unable to handle the throughput required for 100 warehouses
if _, err := ct.DB().Exec("SET CLUSTER SETTING changefeed.new_webhook_sink_enabled = true;"); err != nil {
ct.t.Fatal(err)
}
@@ -1234,7 +1235,6 @@ func registerCDC(r registry.Registry) {
targets: allTpccTargets,
opts: map[string]string{
"metrics_label": "'webhook'",
- "initial_scan": "'only'",
"webhook_sink_config": `'{"Flush": { "Messages": 100, "Frequency": "5s" } }'`,
},
})