From 82fbf7b400e39ee7753fa2496e38611a70827077 Mon Sep 17 00:00:00 2001
From: Shiranka Miskin
Date: Tue, 21 Mar 2023 16:51:37 +0000
Subject: [PATCH] changefeedccl: webhook sink refactor

Resolves #84676
Epic: https://cockroachlabs.atlassian.net/browse/CRDB-11356

This PR implements the Webhook sink as part of a more general `batchingSink` framework that can be used to make adding new sinks an easier process, and it makes the sink far more performant than it was previously.  A followup PR will be made to use the `batchingSink` for the pubsub client, which also suffers from performance issues.

---

Sink-specific code is encapsulated in a SinkClient interface:

```go
type SinkClient interface {
	MakeResolvedPayload(body []byte, topic string) (SinkPayload, error)
	MakeBatchWriter() BatchWriter
	Flush(context.Context, SinkPayload) error
	Close() error
}

type BatchWriter interface {
	AppendKV(key []byte, value []byte, topic string)
	ShouldFlush() bool
	Close() (SinkPayload, error)
}

type SinkPayload interface{}
```

Once the batch is ready to be flushed, the writer can be `Close()`'d to do any final formatting of the buffer-able data (ex: wrap it in a JSON object with extra metadata) and obtain a final `SinkPayload` that is ready to be passed to `SinkClient.Flush`.

The `SinkClient` has a separate `MakeResolvedPayload` since the sink may require resolved events to be formatted differently from a batch of kvs.

`Flush(ctx, payload)` encapsulates sending a blocking IO request to the sink endpoint, and may be called multiple times with the same payload due to retries.  Any kind of formatting work should therefore be done in the buffer's `Close` and stored as a `SinkPayload`, so that retried calls to `Flush` do not repeat that work.

---

The `batchingSink` handles all the logic to take a SinkClient and form a full Sink implementation.

```go
type batchingSink struct {
	client            SinkClient
	ioWorkers         int
	minFlushFrequency time.Duration
	retryOpts         retry.Options
	eventPool         sync.Pool
	batchPool         sync.Pool
	eventCh           chan interface{}
	pacer             *admission.Pacer
	...
}

var _ Sink = (*batchingSink)(nil)
```

It involves a single goroutine which handles:
- Creating, building up, and finalizing `BatchWriter`s to eventually form a `SinkPayload` to emit
- Flushing batches when they have persisted longer than a configured `minFlushFrequency`
- Flushing deliberately and being able to block until the Flush has completed
- Logging all the various sink metrics

`EmitRow` calls are thread-safe, therefore the `safeSink` wrapper is not required for users of this sink.

Events sent through the goroutines would normally need to exist on the heap, but to avoid excessive garbage collection of hundreds of thousands of tiny structs, both the `kvEvents{}` events (sent from the EmitRow caller to the batching worker) and the `sinkBatchBuffer{}` events (sent from the batching worker to the IO routine in the next section) are allocated on object pools.

---

For a sink like Cloudstorage, where there are large batches, doing the above and flushing the batch payloads one by one on a separate routine is good enough.  Unfortunately, the Webhook sink can be used with no batching at all by users who want the lowest latency while still having good throughput.  This means we need to be able to have multiple requests in flight.  The difficulty here is that if a batch with keys [a1,b1] is in flight, a batch with keys [b2,c1] needs to block until [a1,b1] completes, as b2 cannot risk arriving at the destination prior to b1.
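To make the constraint concrete, here is a minimal sketch (illustrative helper names only, not the actual implementation, which is described next) of how a batch's key set can be checked against the keys of batches still in flight, using the same `intsets.Fast` operations (from `pkg/util/intsets`) that the real code relies on:

```go
// Illustrative sketch only: a batch may be emitted immediately only if none of
// its keys overlap the keys of batches still in flight; otherwise it must
// wait, so that b2 can never reach the destination before b1.
func canEmit(batchKeys, inflightKeys intsets.Fast) bool {
	return !inflightKeys.Intersects(batchKeys)
}

// When a batch is admitted, its keys join the in-flight set; once its flush
// completes, they are removed again, potentially unblocking pending batches.
func admit(inflightKeys *intsets.Fast, batchKeys intsets.Fast)  { inflightKeys.UnionWith(batchKeys) }
func finish(inflightKeys *intsets.Fast, batchKeys intsets.Fast) { inflightKeys.DifferenceWith(batchKeys) }
```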
Flushing out payloads in a way that both maintains key-ordering guarantees and is able to run in parallel is done by a separate `parallel_io` struct.

```go
type parallelIO struct {
	retryOpts retry.Options
	ioHandler IOHandler
	requestCh chan IORequest
	resultCh  chan IORequest
	...
}

type IOHandler func(context.Context, IORequest) error

type IORequest interface {
	Keys() intsets.Fast
	SetError(error)
}
```

It involves one goroutine to manage the key ordering guarantees and a configurable number of IO Worker goroutines that simply call `ioHandler` on an `IORequest`.

IORequests represent the keys they shouldn't conflict on by providing an `intsets.Fast` struct, which allows for the efficient Union/Intersects/Difference operations that `parallelIO` needs to maintain ordering guarantees.

Requests are received as IORequests and the response is also returned as an IORequest.  This way the parallelIO struct does not have to do any heap allocations to communicate; its user can manage creating and freeing these objects in pools.  The only heap allocations that occur are part of the `intset` operations, as it uses a linked list internally.

---

The webhook sink is therefore formed by:
1. EmitRow is called, creating kvEvents that are sent to a batching worker
2. The batching worker takes events and appends them to a batch
3. Once the batch is full, it's encoded into an HTTP request
4. The request object is then sharded across a set of IO workers to be fully sent out in parallel with other non-key-conflicting requests

With this setup, looking at the CPU flamegraph at high throughputs, most of the `batchingSink`/`parallelIO` work didn't really show up; the work was largely just step 3, where taking a list of messages and calling `json.Marshal` on it took almost 10% of the time, specifically a call to `json.Compress`.  Since this isn't needed, and all we're doing is putting a list of already-formatted JSON messages into a surrounding JSON array and small object, I also swapped `json.Marshal` for manually stitching characters together into a buffer.

---

Since Matt has talked about a new significance being placed on feature-flagging new work to avoid the need for technical advisories, I placed this new implementation under the changefeed.new_webhook_sink_enabled setting and defaulted it to be disabled.

---

Release note (performance improvement): the webhook sink can now handle a drastically higher maximum throughput when the "changefeed.new_webhook_sink_enabled" cluster setting is enabled.
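For illustration, a minimal sketch of the manual stitching approach described above (hedged, not the exact code: `stitchPayload` and `encodedRows` are hypothetical names, and the envelope shape assumes the webhook sink's existing payload/length JSON object). It shows how already-encoded JSON rows can be concatenated directly into a buffer without re-marshalling them:

```go
// Sketch: build the webhook request body by stitching pre-encoded JSON rows
// directly into a buffer instead of re-marshalling them with json.Marshal.
// Assumes imports "bytes" and "strconv"; names are illustrative.
func stitchPayload(encodedRows [][]byte) []byte {
	var buf bytes.Buffer
	buf.WriteString(`{"payload":[`)
	for i, msg := range encodedRows { // each msg is already a complete JSON object
		if i > 0 {
			buf.WriteByte(',')
		}
		buf.Write(msg)
	}
	buf.WriteString(`],"length":`)
	buf.WriteString(strconv.Itoa(len(encodedRows)))
	buf.WriteByte('}')
	return buf.Bytes()
}
```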
--- .../settings/settings-for-tenants.txt | 1 + docs/generated/settings/settings.html | 1 + pkg/ccl/changefeedccl/BUILD.bazel | 4 + pkg/ccl/changefeedccl/batching_sink.go | 456 ++++++++++++++++++ .../changefeedccl/changefeed_processors.go | 11 +- .../changefeedccl/changefeedbase/settings.go | 28 +- pkg/ccl/changefeedccl/event_processing.go | 8 +- pkg/ccl/changefeedccl/helpers_test.go | 8 +- pkg/ccl/changefeedccl/parallel_io.go | 248 ++++++++++ pkg/ccl/changefeedccl/sink.go | 186 ++++++- pkg/ccl/changefeedccl/sink_webhook.go | 127 ++--- pkg/ccl/changefeedccl/sink_webhook_test.go | 26 +- pkg/ccl/changefeedccl/sink_webhook_v2.go | 380 +++++++++++++++ pkg/cmd/roachtest/tests/cdc.go | 2 +- 14 files changed, 1360 insertions(+), 126 deletions(-) create mode 100644 pkg/ccl/changefeedccl/batching_sink.go create mode 100644 pkg/ccl/changefeedccl/parallel_io.go create mode 100644 pkg/ccl/changefeedccl/sink_webhook_v2.go diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index 40ee3c481a31..85e4b1deab14 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -17,6 +17,7 @@ changefeed.event_consumer_workers integer 0 the number of workers to use when pr changefeed.fast_gzip.enabled boolean true use fast gzip implementation changefeed.node_throttle_config string specifies node level throttling configuration for all changefeeeds changefeed.schema_feed.read_with_priority_after duration 1m0s retry with high priority if we were not able to read descriptors for too long; 0 disables +changefeed.sink_io_workers integer 0 the number of workers used by changefeeds when sending requests to the sink: <0 disables, 0 assigns a reasonable default, >0 assigns the setting value. cloudstorage.azure.concurrent_upload_buffers integer 1 controls the number of concurrent buffers that will be used by the Azure client when uploading chunks.Each buffer can buffer up to cloudstorage.write_chunk.size of memory during an upload cloudstorage.http.custom_ca string custom root CA (appended to system's default CAs) for verifying certificates when interacting with HTTPS storage cloudstorage.timeout duration 10m0s the timeout for import/export storage operations diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 48d06500dc6f..fe6faa7d536e 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -23,6 +23,7 @@
changefeed.fast_gzip.enabled boolean true use fast gzip implementation
changefeed.node_throttle_config string specifies node level throttling configuration for all changefeeeds
changefeed.schema_feed.read_with_priority_after duration 1m0s retry with high priority if we were not able to read descriptors for too long; 0 disables
+changefeed.sink_io_workers integer 0 the number of workers used by changefeeds when sending requests to the sink: <0 disables, 0 assigns a reasonable default, >0 assigns the setting value.
cloudstorage.azure.concurrent_upload_buffers integer 1 controls the number of concurrent buffers that will be used by the Azure client when uploading chunks.Each buffer can buffer up to cloudstorage.write_chunk.size of memory during an upload
cloudstorage.http.custom_ca string custom root CA (appended to system's default CAs) for verifying certificates when interacting with HTTPS storage
cloudstorage.timeout
duration10m0sthe timeout for import/export storage operations diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index baacdff30789..a19790992095 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -7,6 +7,7 @@ go_library( "alter_changefeed_stmt.go", "authorization.go", "avro.go", + "batching_sink.go", "changefeed.go", "changefeed_dist.go", "changefeed_processors.go", @@ -20,6 +21,7 @@ go_library( "event_processing.go", "metrics.go", "name.go", + "parallel_io.go", "parquet_sink_cloudstorage.go", "retry.go", "scheduled_changefeed.go", @@ -32,6 +34,7 @@ go_library( "sink_pubsub.go", "sink_sql.go", "sink_webhook.go", + "sink_webhook_v2.go", "telemetry.go", "testing_knobs.go", "tls.go", @@ -125,6 +128,7 @@ go_library( "//pkg/util/hlc", "//pkg/util/httputil", "//pkg/util/humanizeutil", + "//pkg/util/intsets", "//pkg/util/json", "//pkg/util/log", "//pkg/util/log/eventpb", diff --git a/pkg/ccl/changefeedccl/batching_sink.go b/pkg/ccl/changefeedccl/batching_sink.go new file mode 100644 index 000000000000..15db202b0691 --- /dev/null +++ b/pkg/ccl/changefeedccl/batching_sink.go @@ -0,0 +1,456 @@ +// Copyright 2023 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package changefeedccl + +import ( + "context" + "hash" + "sync" + "time" + + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent" + "github.com/cockroachdb/cockroach/pkg/util/admission" + "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/intsets" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/retry" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" +) + +// SinkClient is an interface to an external sink, where messages are written +// into batches as they arrive and once ready are flushed out. +type SinkClient interface { + MakeResolvedPayload(body []byte, topic string) (SinkPayload, error) + MakeBatchBuffer() BatchBuffer + Flush(context.Context, SinkPayload) error + Close() error +} + +// BatchBuffer is an interface to aggregate KVs into a payload that can be sent +// to the sink. +type BatchBuffer interface { + Append(key []byte, value []byte, topic string) + ShouldFlush() bool + Close() (SinkPayload, error) +} + +// SinkPayload is an interface representing a sink-specific representation of a +// batch of messages that is ready to be emitted by its EmitRow method. +type SinkPayload interface{} + +// batchingSink wraps a SinkClient to provide a Sink implementation that calls +// the SinkClient methods to form batches and flushes those batches across +// multiple parallel IO workers. +type batchingSink struct { + client SinkClient + topicNamer *TopicNamer + concreteType sinkType + + ioWorkers int + minFlushFrequency time.Duration + retryOpts retry.Options + + ts timeutil.TimeSource + metrics metricsRecorder + knobs batchingSinkKnobs + + // eventCh is the channel used to send requests from the Sink caller routines + // to the batching routine. Messages can either be a flushReq or a kvEvent. 
+ eventCh chan interface{} + + termErr error + wg ctxgroup.Group + hasher hash.Hash32 + pacer *admission.Pacer + doneCh chan struct{} +} + +type batchingSinkKnobs struct { + OnAppend func(*rowEvent) +} + +type flushReq struct { + waiter chan struct{} +} + +type rowEvent struct { + key []byte + val []byte + topic string + + alloc kvevent.Alloc + mvcc hlc.Timestamp +} + +// Flush implements the Sink interface, returning the first error that has +// occured in the past EmitRow calls. +func (s *batchingSink) Flush(ctx context.Context) error { + flushWaiter := make(chan struct{}) + select { + case <-ctx.Done(): + case <-s.doneCh: + case s.eventCh <- flushReq{waiter: flushWaiter}: + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-s.doneCh: + return nil + case <-flushWaiter: + return s.termErr + } +} + +var _ Sink = (*batchingSink)(nil) + +// Event structs and batch structs which are transferred across routines (and +// therefore escape to the heap) can both be incredibly frequent (every event +// may be its own batch) and temporary, so to avoid GC thrashing they are both +// claimed and freed from object pools. +var eventPool sync.Pool = sync.Pool{ + New: func() interface{} { + return new(rowEvent) + }, +} + +func newRowEvent() *rowEvent { + return eventPool.Get().(*rowEvent) +} +func freeRowEvent(e *rowEvent) { + *e = rowEvent{} + eventPool.Put(e) +} + +var batchPool sync.Pool = sync.Pool{ + New: func() interface{} { + return new(sinkBatch) + }, +} + +func newSinkBatch() *sinkBatch { + return batchPool.Get().(*sinkBatch) +} +func freeSinkBatchEvent(b *sinkBatch) { + *b = sinkBatch{} + batchPool.Put(b) +} + +// EmitRow implements the Sink interface. +func (s *batchingSink) EmitRow( + ctx context.Context, + topic TopicDescriptor, + key, value []byte, + updated, mvcc hlc.Timestamp, + alloc kvevent.Alloc, +) error { + s.metrics.recordMessageSize(int64(len(key) + len(value))) + + payload := newRowEvent() + payload.key = key + payload.val = value + payload.topic = "" // unimplemented for now + payload.mvcc = mvcc + payload.alloc = alloc + + select { + case <-ctx.Done(): + return ctx.Err() + case s.eventCh <- payload: + case <-s.doneCh: + } + + return nil +} + +// EmitResolvedTimestamp implements the Sink interface. +func (s *batchingSink) EmitResolvedTimestamp( + ctx context.Context, encoder Encoder, resolved hlc.Timestamp, +) error { + data, err := encoder.EncodeResolvedTimestamp(ctx, "", resolved) + if err != nil { + return err + } + payload, err := s.client.MakeResolvedPayload(data, "") + if err != nil { + return err + } + + if err = s.Flush(ctx); err != nil { + return err + } + return retry.WithMaxAttempts(ctx, s.retryOpts, s.retryOpts.MaxRetries+1, func() error { + defer s.metrics.recordFlushRequestCallback()() + return s.client.Flush(ctx, payload) + }) +} + +// Close implements the Sink interface. +func (s *batchingSink) Close() error { + close(s.doneCh) + _ = s.wg.Wait() + if s.pacer != nil { + s.pacer.Close() + } + return s.client.Close() +} + +// Dial implements the Sink interface. +func (s *batchingSink) Dial() error { + return nil +} + +// getConcreteType implements the Sink interface. +func (s *batchingSink) getConcreteType() sinkType { + return s.concreteType +} + +// sinkBatch stores an in-progress/complete batch of messages, along with +// metadata related to the batch. 
+type sinkBatch struct { + buffer BatchBuffer + payload SinkPayload // payload is nil until FinalizePayload has been called + + numMessages int + numKVBytes int // the total amount of uncompressed kv data in the batch + keys intsets.Fast // An intset of the keys within the batch to provide to parallelIO + bufferTime time.Time // The earliest time a message was inserted into the batch + mvcc hlc.Timestamp + + alloc kvevent.Alloc + hasher hash.Hash32 +} + +// FinalizePayload closes the writer to produce a payload that is ready to be +// Flushed by the SinkClient. +func (sb *sinkBatch) FinalizePayload() error { + payload, err := sb.buffer.Close() + if err != nil { + return err + } + sb.payload = payload + return nil +} + +// Keys implements the IORequest interface. +func (sb *sinkBatch) Keys() intsets.Fast { + return sb.keys +} + +func (sb *sinkBatch) isEmpty() bool { + return sb.numMessages == 0 +} + +func hashToInt(h hash.Hash32, buf []byte) int { + h.Reset() + h.Write(buf) + return int(h.Sum32()) +} + +// Append adds the contents of a kvEvent to the batch, merging its alloc pool. +func (sb *sinkBatch) Append(e *rowEvent) { + if sb.isEmpty() { + sb.bufferTime = timeutil.Now() + } + + sb.buffer.Append(e.key, e.val, e.topic) + + sb.keys.Add(hashToInt(sb.hasher, e.key)) + sb.numMessages += 1 + sb.numKVBytes += len(e.key) + len(e.val) + + if sb.mvcc.IsEmpty() || e.mvcc.Less(sb.mvcc) { + sb.mvcc = e.mvcc + } + + sb.alloc.Merge(&e.alloc) +} + +func (s *batchingSink) handleError(err error) { + if s.termErr == nil { + s.termErr = err + } +} + +func (s *batchingSink) newBatchBuffer() *sinkBatch { + batch := newSinkBatch() + batch.buffer = s.client.MakeBatchBuffer() + batch.hasher = s.hasher + return batch +} + +// runBatchingWorker combines 1 or more KV events into batches, sending the IO +// requests out either once the batch is full or a flush request arrives. +func (s *batchingSink) runBatchingWorker(ctx context.Context) { + batchBuffer := s.newBatchBuffer() + + // Once finalized, batches are sent to a parallelIO struct which handles + // performing multiple Flushes in parallel while maintaining Keys() ordering. + ioHandler := func(ctx context.Context, req IORequest) error { + defer s.metrics.recordFlushRequestCallback()() + batch, _ := req.(*sinkBatch) + return s.client.Flush(ctx, batch.payload) + } + ioEmitter := newParallelIO(ctx, s.retryOpts, s.ioWorkers, ioHandler, s.metrics) + defer ioEmitter.Close() + + var handleResult func(result *ioResult) + + tryFlushBatch := func() { + if batchBuffer.isEmpty() { + return + } + toFlush := batchBuffer + batchBuffer = s.newBatchBuffer() + + if err := toFlush.FinalizePayload(); err != nil { + s.handleError(err) + return + } + + // Emitting needs to also handle any incoming results to avoid a deadlock + // with trying to emit while the emitter is blocked on returning a result. + for { + select { + case <-ctx.Done(): + case ioEmitter.requestCh <- toFlush: + case result := <-ioEmitter.resultCh: + handleResult(result) + continue + case <-s.doneCh: + } + break + } + } + + // Flushing requires tracking the number of inflight messages and confirming + // completion to the requester once the counter reaches 0. 
+ inflight := 0 + var sinkFlushWaiter chan struct{} + + handleResult = func(result *ioResult) { + batch, _ := result.request.(*sinkBatch) + + if result.err != nil { + s.handleError(result.err) + } else { + s.metrics.recordEmittedBatch( + batch.bufferTime, batch.numMessages, batch.mvcc, batch.numKVBytes, sinkDoesNotCompress, + ) + } + + inflight -= batch.numMessages + + if (result.err != nil || inflight == 0) && sinkFlushWaiter != nil { + close(sinkFlushWaiter) + sinkFlushWaiter = nil + } + + freeIOResult(result) + batch.alloc.Release(ctx) + freeSinkBatchEvent(batch) + } + + flushTimer := s.ts.NewTimer() + defer flushTimer.Stop() + + for { + if s.pacer != nil { + if err := s.pacer.Pace(ctx); err != nil { + if pacerLogEvery.ShouldLog() { + log.Errorf(ctx, "automatic sink batcher pacing: %v", err) + } + } + } + + select { + case req := <-s.eventCh: + if flush, isFlush := req.(flushReq); isFlush { + if inflight == 0 { + close(flush.waiter) + } else { + sinkFlushWaiter = flush.waiter + tryFlushBatch() + } + } else if event, isKV := req.(*rowEvent); isKV { + inflight += 1 + + // If we're about to append to an empty batch, start the timer to + // guarantee the messages do not stay buffered longer than the + // configured frequency. + if batchBuffer.isEmpty() && s.minFlushFrequency > 0 { + flushTimer.Reset(s.minFlushFrequency) + } + + batchBuffer.Append(event) + if s.knobs.OnAppend != nil { + s.knobs.OnAppend(event) + } + + // The event struct can be freed as the contents are expected to be + // managed by the batch instead. + freeRowEvent(event) + + if batchBuffer.buffer.ShouldFlush() { + s.metrics.recordSizeBasedFlush() + tryFlushBatch() + } + } + case result := <-ioEmitter.resultCh: + handleResult(result) + case <-flushTimer.Ch(): + flushTimer.MarkRead() + tryFlushBatch() + case <-ctx.Done(): + return + case <-s.doneCh: + return + } + + if s.termErr != nil { + return + } + } +} + +func makeBatchingSink( + ctx context.Context, + concreteType sinkType, + client SinkClient, + minFlushFrequency time.Duration, + retryOpts retry.Options, + numWorkers int, + topicNamer *TopicNamer, + pacer *admission.Pacer, + timeSource timeutil.TimeSource, + metrics metricsRecorder, +) Sink { + sink := &batchingSink{ + client: client, + topicNamer: topicNamer, + concreteType: concreteType, + minFlushFrequency: minFlushFrequency, + ioWorkers: numWorkers, + retryOpts: retryOpts, + ts: timeSource, + metrics: metrics, + eventCh: make(chan interface{}, flushQueueDepth), + wg: ctxgroup.WithContext(ctx), + hasher: makeHasher(), + pacer: pacer, + doneCh: make(chan struct{}), + } + + sink.wg.GoCtx(func(ctx context.Context) error { + sink.runBatchingWorker(ctx) + return nil + }) + return sink +} diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index fde56353bfa3..76032968ded5 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -564,12 +564,19 @@ func (ca *changeAggregator) tick() error { return ca.noteResolvedSpan(resolved) } case kvevent.TypeFlush: - return ca.sink.Flush(ca.Ctx()) + return ca.flushBufferedEvents() } return nil } +func (ca *changeAggregator) flushBufferedEvents() error { + if err := ca.eventConsumer.Flush(ca.Ctx()); err != nil { + return err + } + return ca.sink.Flush(ca.Ctx()) +} + // noteResolvedSpan periodically flushes Frontier progress from the current // changeAggregator node to the changeFrontier node to allow the changeFrontier // to persist the overall changefeed's progress @@ 
-621,7 +628,7 @@ func (ca *changeAggregator) flushFrontier() error { // otherwise, we could lose buffered messages and violate the // at-least-once guarantee. This is also true for checkpointing the // resolved spans in the job progress. - if err := ca.sink.Flush(ca.Ctx()); err != nil { + if err := ca.flushBufferedEvents(); err != nil { return err } diff --git a/pkg/ccl/changefeedccl/changefeedbase/settings.go b/pkg/ccl/changefeedccl/changefeedbase/settings.go index abb8292b3867..9bd743c06ee2 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/settings.go +++ b/pkg/ccl/changefeedccl/changefeedbase/settings.go @@ -262,9 +262,9 @@ var EventConsumerPacerRequestSize = settings.RegisterDurationSetting( settings.PositiveDuration, ) -// EventConsumerElasticCPUControlEnabled determines whether changefeed event +// PerEventElasticCPUControlEnabled determines whether changefeed event // processing integrates with elastic CPU control. -var EventConsumerElasticCPUControlEnabled = settings.RegisterBoolSetting( +var PerEventElasticCPUControlEnabled = settings.RegisterBoolSetting( settings.TenantWritable, "changefeed.cpu.per_event_elastic_control.enabled", "determines whether changefeed event processing integrates with elastic CPU control", @@ -281,3 +281,27 @@ var RequireExternalConnectionSink = settings.RegisterBoolSetting( " see https://www.cockroachlabs.com/docs/stable/create-external-connection.html", false, ) + +// SinkIOWorkers controls the number of IO workers used by sinks that use +// parallelIO to be able to send multiple requests in parallel. +var SinkIOWorkers = settings.RegisterIntSetting( + settings.TenantWritable, + "changefeed.sink_io_workers", + "the number of workers used by changefeeds when sending requests to the sink "+ + "(currently webhook only): <0 disables, 0 assigns a reasonable default, >0 assigns the setting value.", + 0, +).WithPublic() + +// SinkPacerRequestSize specifies how often (measured in CPU time) +// that the Sink batching worker request CPU time from admission control. For +// example, every N milliseconds of CPU work, request N more milliseconds of CPU +// time. +var SinkPacerRequestSize = settings.RegisterDurationSetting( + settings.TenantWritable, + "changefeed.cpu.sink_encoding_allocation", + "an event consumer worker will perform a blocking request for CPU time "+ + "before consuming events. 
after fully utilizing this CPU time, it will "+ + "request more", + 50*time.Millisecond, + settings.PositiveDuration, +) diff --git a/pkg/ccl/changefeedccl/event_processing.go b/pkg/ccl/changefeedccl/event_processing.go index 2861515fe03e..7c44cf17d815 100644 --- a/pkg/ccl/changefeedccl/event_processing.go +++ b/pkg/ccl/changefeedccl/event_processing.go @@ -104,7 +104,7 @@ func newEventConsumer( } pacerRequestUnit := changefeedbase.EventConsumerPacerRequestSize.Get(&cfg.Settings.SV) - enablePacer := changefeedbase.EventConsumerElasticCPUControlEnabled.Get(&cfg.Settings.SV) + enablePacer := changefeedbase.PerEventElasticCPUControlEnabled.Get(&cfg.Settings.SV) makeConsumer := func(s EventSink, frontier frontier) (eventConsumer, error) { var err error @@ -182,7 +182,11 @@ func newEventConsumer( workerChSize: changefeedbase.EventConsumerWorkerQueueSize.Get(&cfg.Settings.SV), spanFrontier: spanFrontier, } - ss := &safeSink{wrapped: sink, beforeFlush: c.Flush} + ss := sink + // Only webhook supports concurrent EmitRow calls + if sink.getConcreteType() != sinkTypeWebhook { + ss = &safeSink{wrapped: sink} + } c.makeConsumer = func() (eventConsumer, error) { return makeConsumer(ss, c) } diff --git a/pkg/ccl/changefeedccl/helpers_test.go b/pkg/ccl/changefeedccl/helpers_test.go index 1752392f5136..93cf96e0cb80 100644 --- a/pkg/ccl/changefeedccl/helpers_test.go +++ b/pkg/ccl/changefeedccl/helpers_test.go @@ -854,6 +854,9 @@ func randomSinkTypeWithOptions(options feedTestOptions) string { for _, weight := range sinkWeights { weightTotal += weight } + if weightTotal == 0 { + return "skip" + } p := rand.Float32() * float32(weightTotal) var sum float32 = 0 for sink, weight := range sinkWeights { @@ -862,7 +865,7 @@ func randomSinkTypeWithOptions(options feedTestOptions) string { return sink } } - return "kafka" // unreachable + return "skip" // should never occur } // addCloudStorageOptions adds the options necessary to enable a server to run a @@ -982,6 +985,9 @@ func cdcTestNamedWithSystem( cleanupCloudStorage := addCloudStorageOptions(t, &options) sinkType := randomSinkTypeWithOptions(options) + if sinkType == "skip" { + return + } testLabel := sinkType if name != "" { testLabel = fmt.Sprintf("%s/%s", sinkType, name) diff --git a/pkg/ccl/changefeedccl/parallel_io.go b/pkg/ccl/changefeedccl/parallel_io.go new file mode 100644 index 000000000000..25c69327f69d --- /dev/null +++ b/pkg/ccl/changefeedccl/parallel_io.go @@ -0,0 +1,248 @@ +// Copyright 2023 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package changefeedccl + +import ( + "context" + "sync" + + "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" + "github.com/cockroachdb/cockroach/pkg/util/intsets" + "github.com/cockroachdb/cockroach/pkg/util/retry" +) + +// parallelIO allows performing blocking "IOHandler" calls on in parallel. +// IORequests implement a Keys() function returning keys on which ordering is +// preserved. +// Example: if the events [[a,b], [b,c], [c,d], [e,f]] are all submitted in that +// order, [a,b] and [e,f] can be emitted concurrentyl while [b,c] will block +// until [a,b] completes, then [c,d] will block until [b,c] completes. 
If [c,d] +// errored, [b,c] would never be sent, and an error would be returned with [c,d] +// in an ioResult struct sent to resultCh. After sending an error to resultCh +// all workers are torn down and no further requests are received or handled. +type parallelIO struct { + retryOpts retry.Options + wg ctxgroup.Group + metrics metricsRecorder + doneCh chan struct{} + + ioHandler IOHandler + + requestCh chan IORequest + resultCh chan *ioResult // Readers should freeIOResult after handling result events +} + +// IORequest represents an abstract unit of IO that has a set of keys upon which +// sequential ordering of fulfillment must be enforced. +type IORequest interface { + Keys() intsets.Fast +} + +// ioResult stores the full request that was sent as well as an error if even +// after retries the IOHanlder was unable to succeed. +type ioResult struct { + request IORequest + err error +} + +var resultPool sync.Pool = sync.Pool{ + New: func() interface{} { + return new(ioResult) + }, +} + +func newIOResult(req IORequest, err error) *ioResult { + res := resultPool.Get().(*ioResult) + res.request = req + res.err = err + return res +} +func freeIOResult(e *ioResult) { + *e = ioResult{} + resultPool.Put(e) +} + +// IOHandler performs a blocking IO operation on an IORequest +type IOHandler func(context.Context, IORequest) error + +func newParallelIO( + ctx context.Context, + retryOpts retry.Options, + numWorkers int, + handler IOHandler, + metrics metricsRecorder, +) *parallelIO { + wg := ctxgroup.WithContext(ctx) + io := ¶llelIO{ + retryOpts: retryOpts, + wg: wg, + metrics: metrics, + ioHandler: handler, + requestCh: make(chan IORequest, numWorkers), + resultCh: make(chan *ioResult, numWorkers), + doneCh: make(chan struct{}), + } + + wg.GoCtx(func(ctx context.Context) error { + return io.runWorkers(ctx, numWorkers) + }) + + return io +} + +// Close stops all workers immediately and returns once they shut down. Inflight +// requests sent to requestCh may never result in being sent to resultCh. +func (p *parallelIO) Close() { + close(p.doneCh) + _ = p.wg.Wait() +} + +func (p *parallelIO) runWorkers(ctx context.Context, numEmitWorkers int) error { + emitWithRetries := func(ctx context.Context, payload IORequest) error { + initialSend := true + return retry.WithMaxAttempts(ctx, p.retryOpts, p.retryOpts.MaxRetries+1, func() error { + if !initialSend { + p.metrics.recordInternalRetry(int64(payload.Keys().Len()), false) + } + initialSend = false + return p.ioHandler(ctx, payload) + }) + } + + // Multiple worker routines handle the IO operations, retrying when necessary. + emitCh := make(chan IORequest, numEmitWorkers) + defer close(emitCh) + workerResultCh := make(chan *ioResult, numEmitWorkers) + + for i := 0; i < numEmitWorkers; i++ { + p.wg.GoCtx(func(ctx context.Context) error { + for req := range emitCh { + result := newIOResult(req, emitWithRetries(ctx, req)) + select { + case <-ctx.Done(): + return ctx.Err() + case <-p.doneCh: + return nil + case workerResultCh <- result: + } + } + return nil + }) + } + + var pendingResults []*ioResult + + submitIO := func(req IORequest) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-p.doneCh: + return nil + case emitCh <- req: + return nil + case res := <-workerResultCh: + // Must also handle results to avoid the above emit being able to block + // forever on all workers being busy trying to emit results. 
+ pendingResults = append(pendingResults, res) + } + } + } + + // The main routine keeps track of incoming and completed requests, where + // admitted requests yet to be completed have their Keys() tracked in an + // intset, and any incoming request with keys already in the intset are placed + // in a Queue to be sent to IO workers once the conflicting requests complete. + var inflight intsets.Fast + var pending []IORequest + + handleResult := func(res *ioResult) error { + if res.err == nil { + // Clear out the completed keys to check for newly valid pending requests. + inflight.DifferenceWith(res.request.Keys()) + + // Check for a pending request that is now able to be sent i.e. is not + // conflicting with any inflight requests or any requests that arrived + // earlier than itself in the pending queue. + pendingKeys := intsets.Fast{} + for i, pendingReq := range pending { + if !inflight.Intersects(pendingReq.Keys()) && !pendingKeys.Intersects(pendingReq.Keys()) { + inflight.UnionWith(pendingReq.Keys()) + pending = append(pending[:i], pending[i+1:]...) + if err := submitIO(pendingReq); err != nil { + return err + } + break + } + + pendingKeys.UnionWith(pendingReq.Keys()) + } + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-p.doneCh: + return nil + case p.resultCh <- res: + return nil + } + } + + // A set of keys can be sent immediately if no yet-to-be-handled request + // observed so far shares any of those keys. + canSendKeys := func(keys intsets.Fast) bool { + if inflight.Intersects(keys) { + return true + } + for _, pendingReq := range pending { + if pendingReq.Keys().Intersects(keys) { + return true + } + } + return false + } + + for { + // Results read from sendToWorker need to be first added to a pendingResults + // list and then handled separately here rather than calling handleResult + // inside sendToWorker, as having a re-entrant sendToWorker -> handleResult + // -> sendToWorker -> handleResult chain creates complexity with managing + // pending requests. + unhandled := pendingResults + pendingResults = nil + for _, res := range unhandled { + if err := handleResult(res); err != nil { + return err + } + } + + select { + case req := <-p.requestCh: + if canSendKeys(req.Keys()) { + // If a request conflicts with any currently unhandled requests, add it + // to the pending queue to be rechecked for validity later. 
+ pending = append(pending, req) + } else { + inflight.UnionWith(req.Keys()) + if err := submitIO(req); err != nil { + return err + } + } + case res := <-workerResultCh: + if err := handleResult(res); err != nil { + return err + } + case <-ctx.Done(): + return ctx.Err() + case <-p.doneCh: + return nil + } + } +} diff --git a/pkg/ccl/changefeedccl/sink.go b/pkg/ccl/changefeedccl/sink.go index 70eff852d348..9816177ce64f 100644 --- a/pkg/ccl/changefeedccl/sink.go +++ b/pkg/ccl/changefeedccl/sink.go @@ -10,7 +10,11 @@ package changefeedccl import ( "context" + "encoding/json" + "math" "net/url" + "runtime" + "strconv" "strings" "time" @@ -19,13 +23,19 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/admission" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/cockroach/pkg/util/bufalloc" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" @@ -146,6 +156,16 @@ func getAndDialSink( return sink, sink.Dial() } +// NewWebhookSinkEnabled determines whether or not the refactored Webhook sink +// or the deprecated sink should be used. +var NewWebhookSinkEnabled = settings.RegisterBoolSetting( + settings.TenantWritable, + "changefeed.new_webhook_sink_enabled", + "if enabled, this setting enables a new implementation of the webhook sink"+ + " that allows for a much higher throughput", + util.ConstantWithMetamorphicTestBool("changefeed.new_webhook_sink_enabled", false), +) + func getSink( ctx context.Context, serverCfg *execinfra.ServerConfig, @@ -207,10 +227,17 @@ func getSink( if err != nil { return nil, err } - return validateOptionsAndMakeSink(changefeedbase.WebhookValidOptions, func() (Sink, error) { - return makeWebhookSink(ctx, sinkURL{URL: u}, encodingOpts, webhookOpts, - defaultWorkerCount(), timeutil.DefaultTimeSource{}, metricsBuilder) - }) + if changefeedbase.NewWebhookSinkEnabled.Get(&serverCfg.Settings.SV) { + return validateOptionsAndMakeSink(changefeedbase.WebhookValidOptions, func() (Sink, error) { + return makeWebhookSink(ctx, sinkURL{URL: u}, encodingOpts, webhookOpts, + numSinkIOWorkers(serverCfg), sinkPacer(ctx, serverCfg), timeutil.DefaultTimeSource{}, metricsBuilder) + }) + } else { + return validateOptionsAndMakeSink(changefeedbase.WebhookValidOptions, func() (Sink, error) { + return makeDeprecatedWebhookSink(ctx, sinkURL{URL: u}, encodingOpts, webhookOpts, + defaultWorkerCount(), timeutil.DefaultTimeSource{}, metricsBuilder) + }) + } case isPubsubSink(u): // TODO: add metrics to pubsubsink return MakePubsubSink(ctx, u, encodingOpts, AllTargets(feedCfg), opts.IsSet(changefeedbase.OptUnordered)) @@ -606,12 +633,10 @@ func (n *nullSink) Dial() error { } // safeSink wraps an EventSink in a mutex so it's methods are -// thread safe. 
It also has a beforeFlush hook which is called -// at the beginning of safeSink.Flush(). +// thread safe. type safeSink struct { syncutil.Mutex - beforeFlush func(ctx context.Context) error - wrapped EventSink + wrapped EventSink } var _ EventSink = (*safeSink)(nil) @@ -645,9 +670,6 @@ func (s *safeSink) EmitRow( } func (s *safeSink) Flush(ctx context.Context) error { - if err := s.beforeFlush(ctx); err != nil { - return err - } s.Lock() defer s.Unlock() return s.wrapped.Flush(ctx) @@ -673,3 +695,145 @@ type SinkWithEncoder interface { Flush(ctx context.Context) error } + +// proper JSON schema for sink config: +// +// { +// "Flush": { +// "Messages": ..., +// "Bytes": ..., +// "Frequency": ..., +// }, +// "Retry": { +// "Max": ..., +// "Backoff": ..., +// } +// } +type sinkJSONConfig struct { + Flush sinkBatchConfig `json:",omitempty"` + Retry sinkRetryConfig `json:",omitempty"` +} + +type sinkBatchConfig struct { + Bytes, Messages int `json:",omitempty"` + Frequency jsonDuration `json:",omitempty"` +} + +// wrapper structs to unmarshal json, retry.Options will be the actual config +type sinkRetryConfig struct { + Max jsonMaxRetries `json:",omitempty"` + Backoff jsonDuration `json:",omitempty"` +} + +func defaultRetryConfig() retry.Options { + opts := retry.Options{ + InitialBackoff: 500 * time.Millisecond, + MaxRetries: 3, + Multiplier: 2, + } + // max backoff should be initial * 2 ^ maxRetries + opts.MaxBackoff = opts.InitialBackoff * time.Duration(int(math.Pow(2.0, float64(opts.MaxRetries)))) + return opts +} + +func getSinkConfigFromJson( + jsonStr changefeedbase.SinkSpecificJSONConfig, base sinkJSONConfig, +) (batchCfg sinkBatchConfig, retryCfg retry.Options, err error) { + retryCfg = defaultRetryConfig() + + var cfg = base + cfg.Retry.Max = jsonMaxRetries(retryCfg.MaxRetries) + cfg.Retry.Backoff = jsonDuration(retryCfg.InitialBackoff) + if jsonStr != `` { + // set retry defaults to be overridden if included in JSON + if err = json.Unmarshal([]byte(jsonStr), &cfg); err != nil { + return batchCfg, retryCfg, errors.Wrapf(err, "error unmarshalling json") + } + } + + // don't support negative values + if cfg.Flush.Messages < 0 || cfg.Flush.Bytes < 0 || cfg.Flush.Frequency < 0 || + cfg.Retry.Max < 0 || cfg.Retry.Backoff < 0 { + return batchCfg, retryCfg, errors.Errorf("invalid option value %s, all config values must be non-negative", changefeedbase.OptWebhookSinkConfig) + } + + // errors if other batch values are set, but frequency is not + if (cfg.Flush.Messages > 0 || cfg.Flush.Bytes > 0) && cfg.Flush.Frequency == 0 { + return batchCfg, retryCfg, errors.Errorf("invalid option value %s, flush frequency is not set, messages may never be sent", changefeedbase.OptWebhookSinkConfig) + } + + retryCfg.MaxRetries = int(cfg.Retry.Max) + retryCfg.InitialBackoff = time.Duration(cfg.Retry.Backoff) + return cfg.Flush, retryCfg, nil +} + +type jsonMaxRetries int + +func (j *jsonMaxRetries) UnmarshalJSON(b []byte) error { + var i int64 + // try to parse as int + i, err := strconv.ParseInt(string(b), 10, 64) + if err == nil { + if i <= 0 { + return errors.Errorf("max retry count must be a positive integer. 
use 'inf' for infinite retries.") + } + *j = jsonMaxRetries(i) + } else { + // if that fails, try to parse as string (only accept 'inf') + var s string + // using unmarshal here to remove quotes around the string + if err := json.Unmarshal(b, &s); err != nil { + return err + } + if strings.ToLower(s) == "inf" { + // if used wants infinite retries, set to zero as retry.Options interprets this as infinity + *j = 0 + } else if n, err := strconv.Atoi(s); err == nil { // also accept ints as strings + *j = jsonMaxRetries(n) + } else { + return errors.Errorf("max retries must be either a positive int or 'inf' for infinite retries.") + } + } + return nil +} + +func numSinkIOWorkers(cfg *execinfra.ServerConfig) int { + numWorkers := changefeedbase.SinkIOWorkers.Get(&cfg.Settings.SV) + if numWorkers > 0 { + return int(numWorkers) + } + + idealNumber := runtime.GOMAXPROCS(0) + if idealNumber < 1 { + return 1 + } + if idealNumber > 32 { + return 32 + } + return idealNumber +} + +func sinkPacer(ctx context.Context, cfg *execinfra.ServerConfig) *admission.Pacer { + pacerRequestUnit := changefeedbase.SinkPacerRequestSize.Get(&cfg.Settings.SV) + enablePacer := changefeedbase.PerEventElasticCPUControlEnabled.Get(&cfg.Settings.SV) + + var pacer *admission.Pacer = nil + if enablePacer { + tenantID, ok := roachpb.ClientTenantFromContext(ctx) + if !ok { + tenantID = roachpb.SystemTenantID + } + + pacer = cfg.AdmissionPacerFactory.NewPacer( + pacerRequestUnit, + admission.WorkInfo{ + TenantID: tenantID, + Priority: admissionpb.BulkNormalPri, + CreateTime: timeutil.Now().UnixNano(), + BypassAdmission: false, + }, + ) + } + + return pacer +} diff --git a/pkg/ccl/changefeedccl/sink_webhook.go b/pkg/ccl/changefeedccl/sink_webhook.go index 09fd6cb508a2..ff0bd20e68c7 100644 --- a/pkg/ccl/changefeedccl/sink_webhook.go +++ b/pkg/ccl/changefeedccl/sink_webhook.go @@ -17,11 +17,9 @@ import ( "fmt" "hash/crc32" "io" - "math" "net" "net/http" "net/url" - "strconv" "strings" "time" @@ -36,23 +34,7 @@ import ( "github.com/cockroachdb/errors" ) -const ( - applicationTypeJSON = `application/json` - applicationTypeCSV = `text/csv` - authorizationHeader = `Authorization` -) - -func isWebhookSink(u *url.URL) bool { - switch u.Scheme { - // allow HTTP here but throw an error later to make it clear HTTPS is required - case changefeedbase.SinkSchemeWebhookHTTP, changefeedbase.SinkSchemeWebhookHTTPS: - return true - default: - return false - } -} - -type webhookSink struct { +type deprecatedWebhookSink struct { // Webhook configuration. parallelism int retryCfg retry.Options @@ -80,11 +62,11 @@ type webhookSink struct { workerCtx context.Context workerGroup ctxgroup.Group exitWorkers func() // Signaled to shut down all workers. 
- eventsChans []chan []messagePayload + eventsChans []chan []deprecatedMessagePayload metrics metricsRecorder } -func (s *webhookSink) getConcreteType() sinkType { +func (s *deprecatedWebhookSink) getConcreteType() sinkType { return sinkTypeWebhook } @@ -100,7 +82,7 @@ type encodedPayload struct { mvcc hlc.Timestamp } -func encodePayloadJSONWebhook(messages []messagePayload) (encodedPayload, error) { +func encodePayloadJSONWebhook(messages []deprecatedMessagePayload) (encodedPayload, error) { result := encodedPayload{ emitTime: timeutil.Now(), } @@ -129,7 +111,7 @@ func encodePayloadJSONWebhook(messages []messagePayload) (encodedPayload, error) return result, err } -func encodePayloadCSVWebhook(messages []messagePayload) (encodedPayload, error) { +func encodePayloadCSVWebhook(messages []deprecatedMessagePayload) (encodedPayload, error) { result := encodedPayload{ emitTime: timeutil.Now(), } @@ -150,7 +132,7 @@ func encodePayloadCSVWebhook(messages []messagePayload) (encodedPayload, error) return result, nil } -type messagePayload struct { +type deprecatedMessagePayload struct { // Payload message fields. key []byte val []byte @@ -162,15 +144,15 @@ type messagePayload struct { // webhookMessage contains either messagePayload or a flush request. type webhookMessage struct { flushDone *chan struct{} - payload messagePayload + payload deprecatedMessagePayload } type batch struct { - buffer []messagePayload + buffer []deprecatedMessagePayload bufferBytes int } -func (b *batch) addToBuffer(m messagePayload) { +func (b *batch) addToBuffer(m deprecatedMessagePayload) { b.bufferBytes += len(m.val) b.buffer = append(b.buffer, m) } @@ -185,36 +167,6 @@ type batchConfig struct { Frequency jsonDuration `json:",omitempty"` } -type jsonMaxRetries int - -func (j *jsonMaxRetries) UnmarshalJSON(b []byte) error { - var i int64 - // try to parse as int - i, err := strconv.ParseInt(string(b), 10, 64) - if err == nil { - if i <= 0 { - return errors.Errorf("max retry count must be a positive integer. 
use 'inf' for infinite retries.") - } - *j = jsonMaxRetries(i) - } else { - // if that fails, try to parse as string (only accept 'inf') - var s string - // using unmarshal here to remove quotes around the string - if err := json.Unmarshal(b, &s); err != nil { - return err - } - if strings.ToLower(s) == "inf" { - // if used wants infinite retries, set to zero as retry.Options interprets this as infinity - *j = 0 - } else if n, err := strconv.Atoi(s); err == nil { // also accept ints as strings - *j = jsonMaxRetries(n) - } else { - return errors.Errorf("max retries must be either a positive int or 'inf' for infinite retries.") - } - } - return nil -} - // wrapper structs to unmarshal json, retry.Options will be the actual config type retryConfig struct { Max jsonMaxRetries `json:",omitempty"` @@ -239,7 +191,7 @@ type webhookSinkConfig struct { Retry retryConfig `json:",omitempty"` } -func (s *webhookSink) getWebhookSinkConfig( +func (s *deprecatedWebhookSink) getWebhookSinkConfig( jsonStr changefeedbase.SinkSpecificJSONConfig, ) (batchCfg batchConfig, retryCfg retry.Options, err error) { retryCfg = defaultRetryConfig() @@ -270,7 +222,7 @@ func (s *webhookSink) getWebhookSinkConfig( return cfg.Flush, retryCfg, nil } -func makeWebhookSink( +func makeDeprecatedWebhookSink( ctx context.Context, u sinkURL, encodingOpts changefeedbase.EncodingOptions, @@ -312,7 +264,7 @@ func makeWebhookSink( ctx, cancel := context.WithCancel(ctx) - sink := &webhookSink{ + sink := &deprecatedWebhookSink{ workerCtx: ctx, authHeader: opts.AuthHeader, exitWorkers: cancel, @@ -329,7 +281,7 @@ func makeWebhookSink( } // TODO(yevgeniy): Establish HTTP connection in Dial(). - sink.client, err = makeWebhookClient(u, connTimeout) + sink.client, err = deprecatedMakeWebhookClient(u, connTimeout) if err != nil { return nil, err } @@ -350,7 +302,7 @@ func makeWebhookSink( return sink, nil } -func makeWebhookClient(u sinkURL, timeout time.Duration) (*httputil.Client, error) { +func deprecatedMakeWebhookClient(u sinkURL, timeout time.Duration) (*httputil.Client, error) { client := &httputil.Client{ Client: &http.Client{ Timeout: timeout, @@ -417,30 +369,19 @@ func makeWebhookClient(u sinkURL, timeout time.Duration) (*httputil.Client, erro return client, nil } -func defaultRetryConfig() retry.Options { - opts := retry.Options{ - InitialBackoff: 500 * time.Millisecond, - MaxRetries: 3, - Multiplier: 2, - } - // max backoff should be initial * 2 ^ maxRetries - opts.MaxBackoff = opts.InitialBackoff * time.Duration(int(math.Pow(2.0, float64(opts.MaxRetries)))) - return opts -} - // defaultWorkerCount() is the number of CPU's on the machine func defaultWorkerCount() int { return system.NumCPU() } -func (s *webhookSink) Dial() error { +func (s *deprecatedWebhookSink) Dial() error { s.setupWorkers() return nil } -func (s *webhookSink) setupWorkers() { +func (s *deprecatedWebhookSink) setupWorkers() { // setup events channels to send to workers and the worker group - s.eventsChans = make([]chan []messagePayload, s.parallelism) + s.eventsChans = make([]chan []deprecatedMessagePayload, s.parallelism) s.workerGroup = ctxgroup.WithContext(s.workerCtx) s.batchChan = make(chan webhookMessage) @@ -455,7 +396,7 @@ func (s *webhookSink) setupWorkers() { return nil }) for i := 0; i < s.parallelism; i++ { - s.eventsChans[i] = make(chan []messagePayload) + s.eventsChans[i] = make(chan []deprecatedMessagePayload) j := i s.workerGroup.GoCtx(func(ctx context.Context) error { s.workerLoop(j) @@ -464,7 +405,7 @@ func (s *webhookSink) setupWorkers() { } 
} -func (s *webhookSink) shouldSendBatch(b batch) bool { +func (s *deprecatedWebhookSink) shouldSendBatch(b batch) bool { // similar to sarama, send batch if: // everything is zero (default) // any one of the conditions are met UNLESS the condition is zero which means never batch @@ -483,8 +424,8 @@ func (s *webhookSink) shouldSendBatch(b batch) bool { } } -func (s *webhookSink) splitAndSendBatch(batch []messagePayload) error { - workerBatches := make([][]messagePayload, s.parallelism) +func (s *deprecatedWebhookSink) splitAndSendBatch(batch []deprecatedMessagePayload) error { + workerBatches := make([][]deprecatedMessagePayload, s.parallelism) for _, msg := range batch { // split batch into per-worker batches i := s.workerIndex(msg.key) @@ -504,7 +445,7 @@ func (s *webhookSink) splitAndSendBatch(batch []messagePayload) error { } // flushWorkers sends flush request to each worker and waits for each one to acknowledge. -func (s *webhookSink) flushWorkers(done chan struct{}) error { +func (s *deprecatedWebhookSink) flushWorkers(done chan struct{}) error { for i := 0; i < len(s.eventsChans); i++ { // Ability to write a nil message to events channel indicates that // the worker has processed all other messages. @@ -525,7 +466,7 @@ func (s *webhookSink) flushWorkers(done chan struct{}) error { // batchWorker ingests messages from EmitRow into a batch and splits them into // per-worker batches to be sent separately -func (s *webhookSink) batchWorker() { +func (s *deprecatedWebhookSink) batchWorker() { var batchTracker batch batchTimer := s.ts.NewTimer() defer batchTimer.Stop() @@ -578,7 +519,7 @@ func (s *webhookSink) batchWorker() { } } -func (s *webhookSink) workerLoop(workerIndex int) { +func (s *deprecatedWebhookSink) workerLoop(workerIndex int) { for { select { case <-s.workerCtx.Done(): @@ -613,14 +554,14 @@ func (s *webhookSink) workerLoop(workerIndex int) { } } -func (s *webhookSink) sendMessageWithRetries(ctx context.Context, reqBody []byte) error { +func (s *deprecatedWebhookSink) sendMessageWithRetries(ctx context.Context, reqBody []byte) error { requestFunc := func() error { return s.sendMessage(ctx, reqBody) } return retry.WithMaxAttempts(ctx, s.retryCfg, s.retryCfg.MaxRetries+1, requestFunc) } -func (s *webhookSink) sendMessage(ctx context.Context, reqBody []byte) error { +func (s *deprecatedWebhookSink) sendMessage(ctx context.Context, reqBody []byte) error { req, err := http.NewRequestWithContext(ctx, http.MethodPost, s.url.String(), bytes.NewReader(reqBody)) if err != nil { return err @@ -658,13 +599,13 @@ func (s *webhookSink) sendMessage(ctx context.Context, reqBody []byte) error { // deterministically assigned to the same worker. Since we have a channel per // worker, we can ensure per-worker ordering and therefore guarantee per-key // ordering. -func (s *webhookSink) workerIndex(key []byte) uint32 { +func (s *deprecatedWebhookSink) workerIndex(key []byte) uint32 { return crc32.ChecksumIEEE(key) % uint32(s.parallelism) } // exitWorkersWithError saves the first error message encountered by webhook workers, // and requests all workers to terminate. -func (s *webhookSink) exitWorkersWithError(err error) { +func (s *deprecatedWebhookSink) exitWorkersWithError(err error) { // errChan has buffer size 1, first error will be saved to the buffer and // subsequent errors will be ignored select { @@ -675,7 +616,7 @@ func (s *webhookSink) exitWorkersWithError(err error) { } // sinkError checks to see if any errors occurred inside workers go routines. 
-func (s *webhookSink) sinkError() error { +func (s *deprecatedWebhookSink) sinkError() error { select { case err := <-s.errChan: return err @@ -684,7 +625,7 @@ func (s *webhookSink) sinkError() error { } } -func (s *webhookSink) EmitRow( +func (s *deprecatedWebhookSink) EmitRow( ctx context.Context, topic TopicDescriptor, key, value []byte, @@ -702,7 +643,7 @@ func (s *webhookSink) EmitRow( case err := <-s.errChan: return err case s.batchChan <- webhookMessage{ - payload: messagePayload{ + payload: deprecatedMessagePayload{ key: key, val: value, alloc: alloc, @@ -714,7 +655,7 @@ func (s *webhookSink) EmitRow( return nil } -func (s *webhookSink) EmitResolvedTimestamp( +func (s *deprecatedWebhookSink) EmitResolvedTimestamp( ctx context.Context, encoder Encoder, resolved hlc.Timestamp, ) error { defer s.metrics.recordResolvedCallback()() @@ -745,7 +686,7 @@ func (s *webhookSink) EmitResolvedTimestamp( return nil } -func (s *webhookSink) Flush(ctx context.Context) error { +func (s *deprecatedWebhookSink) Flush(ctx context.Context) error { s.metrics.recordFlushRequestCallback()() // Send flush request. @@ -768,7 +709,7 @@ func (s *webhookSink) Flush(ctx context.Context) error { } } -func (s *webhookSink) Close() error { +func (s *deprecatedWebhookSink) Close() error { s.exitWorkers() // ignore errors here since we're closing the sink anyway _ = s.workerGroup.Wait() diff --git a/pkg/ccl/changefeedccl/sink_webhook_test.go b/pkg/ccl/changefeedccl/sink_webhook_test.go index 199734be6bf5..0037f5cfd6fe 100644 --- a/pkg/ccl/changefeedccl/sink_webhook_test.go +++ b/pkg/ccl/changefeedccl/sink_webhook_test.go @@ -15,6 +15,7 @@ import ( "net/http" "net/url" "strings" + "sync/atomic" "testing" "time" @@ -83,7 +84,7 @@ func setupWebhookSinkWithDetails( if err != nil { return nil, err } - sinkSrc, err := makeWebhookSink(ctx, sinkURL{URL: u}, encodingOpts, sinkOpts, parallelism, source, nilMetricsRecorderBuilder) + sinkSrc, err := makeWebhookSink(ctx, sinkURL{URL: u}, encodingOpts, sinkOpts, parallelism, nil, source, nilMetricsRecorderBuilder) if err != nil { return nil, err } @@ -172,8 +173,6 @@ func TestWebhookSink(t *testing.T) { require.NoError(t, sinkSrcNoCert.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, zeroAlloc)) require.Regexp(t, "x509", sinkSrcNoCert.Flush(context.Background())) - require.EqualError(t, sinkSrcNoCert.EmitRow(context.Background(), nil, nil, nil, zeroTS, zeroTS, zeroAlloc), - `context canceled`) params.Set(changefeedbase.SinkParamSkipTLSVerify, "true") sinkDestHost.RawQuery = params.Encode() @@ -191,8 +190,6 @@ func TestWebhookSink(t *testing.T) { err = sinkSrc.Flush(context.Background()) require.Error(t, err) require.Contains(t, err.Error(), fmt.Sprintf(`Post "%s":`, sinkDest.URL())) - require.EqualError(t, sinkSrc.EmitRow(context.Background(), nil, nil, nil, zeroTS, zeroTS, zeroAlloc), - `context canceled`) sinkDestHTTP, err := cdctest.StartMockWebhookSinkInsecure() require.NoError(t, err) @@ -207,8 +204,6 @@ func TestWebhookSink(t *testing.T) { require.EqualError(t, sinkSrcWrongProtocol.Flush(context.Background()), fmt.Sprintf(`Post "%s": http: server gave HTTP response to HTTPS client`, fmt.Sprintf("https://%s", strings.TrimPrefix(sinkDestHTTP.URL(), "http://")))) - require.EqualError(t, sinkSrcWrongProtocol.EmitRow(context.Background(), nil, nil, nil, zeroTS, zeroTS, zeroAlloc), - `context canceled`) sinkDestSecure, err := cdctest.StartMockWebhookSinkSecure(cert) 
require.NoError(t, err) @@ -230,6 +225,7 @@ func TestWebhookSink(t *testing.T) { Opts: opts, } + require.NoError(t, sinkSrc.Close()) sinkSrc, err = setupWebhookSinkWithDetails(context.Background(), details, parallelism, timeutil.DefaultTimeSource{}) require.NoError(t, err) @@ -305,8 +301,6 @@ func TestWebhookSinkWithAuthOptions(t *testing.T) { require.NoError(t, sinkSrcNoCreds.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, zeroAlloc)) require.EqualError(t, sinkSrcNoCreds.Flush(context.Background()), "401 Unauthorized: ") - require.EqualError(t, sinkSrcNoCreds.EmitRow(context.Background(), nil, nil, nil, zeroTS, zeroTS, zeroAlloc), - `context canceled`) // wrong credentials should result in a 401 as well var wrongAuthHeader string @@ -318,8 +312,6 @@ func TestWebhookSinkWithAuthOptions(t *testing.T) { require.NoError(t, sinkSrcWrongCreds.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, zeroAlloc)) require.EqualError(t, sinkSrcWrongCreds.Flush(context.Background()), "401 Unauthorized: ") - require.EqualError(t, sinkSrcWrongCreds.EmitRow(context.Background(), nil, nil, nil, zeroTS, zeroTS, zeroAlloc), - `context canceled`) require.NoError(t, sinkSrc.Close()) require.NoError(t, sinkSrcNoCreds.Close()) @@ -603,6 +595,13 @@ func TestWebhookSinkConfig(t *testing.T) { sinkSrc, err := setupWebhookSinkWithDetails(context.Background(), details, parallelism, mt) require.NoError(t, err) + batchingSink, ok := sinkSrc.(*batchingSink) + require.True(t, ok) + var appendCount int32 = 0 + batchingSink.knobs.OnAppend = func(event *rowEvent) { + atomic.AddInt32(&appendCount, 1) + } + // send incomplete batch require.NoError(t, sinkSrc.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc())) require.NoError(t, sinkSrc.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1001},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc())) @@ -611,11 +610,10 @@ func TestWebhookSinkConfig(t *testing.T) { require.Equal(t, sinkDest.Latest(), "") testutils.SucceedsSoon(t, func() error { - // wait for the timer in batch worker to be set (1 hour from now, as specified by config) before advancing time. - if len(mt.Timers()) == 1 && mt.Timers()[0] == mt.Now().Add(time.Hour) { + if atomic.LoadInt32(&appendCount) >= 2 { return nil } - return errors.New("Waiting for timer to be created by batch worker") + return errors.New("Waiting for rows to be buffered") }) mt.Advance(time.Hour) require.NoError(t, sinkSrc.Flush(context.Background())) diff --git a/pkg/ccl/changefeedccl/sink_webhook_v2.go b/pkg/ccl/changefeedccl/sink_webhook_v2.go new file mode 100644 index 000000000000..c0e0640b3990 --- /dev/null +++ b/pkg/ccl/changefeedccl/sink_webhook_v2.go @@ -0,0 +1,380 @@ +// Copyright 2023 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package changefeedccl + +import ( + "bytes" + "context" + "crypto/tls" + "crypto/x509" + "fmt" + "io" + "net" + "net/http" + "net/url" + "strings" + "time" + + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" + "github.com/cockroachdb/cockroach/pkg/util/admission" + "github.com/cockroachdb/cockroach/pkg/util/httputil" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/errors" +) + +const ( + applicationTypeJSON = `application/json` + applicationTypeCSV = `text/csv` + authorizationHeader = `Authorization` +) + +func isWebhookSink(u *url.URL) bool { + switch u.Scheme { + // allow HTTP here but throw an error later to make it clear HTTPS is required + case changefeedbase.SinkSchemeWebhookHTTP, changefeedbase.SinkSchemeWebhookHTTPS: + return true + default: + return false + } +} + +type webhookSinkClient struct { + ctx context.Context + format changefeedbase.FormatType + url sinkURL + authHeader string + batchCfg sinkBatchConfig + client *httputil.Client +} + +var _ SinkClient = (*webhookSinkClient)(nil) + +func makeWebhookSinkClient( + ctx context.Context, + u sinkURL, + encodingOpts changefeedbase.EncodingOptions, + opts changefeedbase.WebhookSinkOptions, + batchCfg sinkBatchConfig, + parallelism int, +) (SinkClient, error) { + err := validateWebhookOpts(u, encodingOpts, opts) + if err != nil { + return nil, err + } + + u.Scheme = strings.TrimPrefix(u.Scheme, `webhook-`) + + sinkClient := &webhookSinkClient{ + ctx: ctx, + authHeader: opts.AuthHeader, + format: encodingOpts.Format, + batchCfg: batchCfg, + } + + var connTimeout time.Duration + if opts.ClientTimeout != nil { + connTimeout = *opts.ClientTimeout + } + sinkClient.client, err = makeWebhookClient(u, connTimeout, parallelism) + if err != nil { + return nil, err + } + + // remove known query params from sink URL before setting in sink config + sinkURLParsed, err := url.Parse(u.String()) + if err != nil { + return nil, err + } + params := sinkURLParsed.Query() + params.Del(changefeedbase.SinkParamSkipTLSVerify) + params.Del(changefeedbase.SinkParamCACert) + params.Del(changefeedbase.SinkParamClientCert) + params.Del(changefeedbase.SinkParamClientKey) + sinkURLParsed.RawQuery = params.Encode() + sinkClient.url = sinkURL{URL: sinkURLParsed} + + return sinkClient, nil +} + +func makeWebhookClient( + u sinkURL, timeout time.Duration, parallelism int, +) (*httputil.Client, error) { + client := &httputil.Client{ + Client: &http.Client{ + Timeout: timeout, + Transport: &http.Transport{ + DialContext: (&net.Dialer{Timeout: timeout}).DialContext, + MaxConnsPerHost: parallelism, + MaxIdleConnsPerHost: parallelism, + IdleConnTimeout: time.Minute, + ForceAttemptHTTP2: true, + }, + }, + } + + dialConfig := struct { + tlsSkipVerify bool + caCert []byte + clientCert []byte + clientKey []byte + }{} + + transport := client.Transport.(*http.Transport) + + if _, err := u.consumeBool(changefeedbase.SinkParamSkipTLSVerify, &dialConfig.tlsSkipVerify); err != nil { + return nil, err + } + if err := u.decodeBase64(changefeedbase.SinkParamCACert, &dialConfig.caCert); err != nil { + return nil, err + } + if err := u.decodeBase64(changefeedbase.SinkParamClientCert, &dialConfig.clientCert); err != nil { + return nil, err + } + if err := u.decodeBase64(changefeedbase.SinkParamClientKey, &dialConfig.clientKey); err != nil { + return nil, err + } + + transport.TLSClientConfig = &tls.Config{ + 
InsecureSkipVerify: dialConfig.tlsSkipVerify, + } + + if dialConfig.caCert != nil { + caCertPool, err := x509.SystemCertPool() + if err != nil { + return nil, errors.Wrap(err, "could not load system root CA pool") + } + if caCertPool == nil { + caCertPool = x509.NewCertPool() + } + if !caCertPool.AppendCertsFromPEM(dialConfig.caCert) { + return nil, errors.Errorf("failed to parse certificate data:%s", string(dialConfig.caCert)) + } + transport.TLSClientConfig.RootCAs = caCertPool + } + + if dialConfig.clientCert != nil && dialConfig.clientKey == nil { + return nil, errors.Errorf(`%s requires %s to be set`, changefeedbase.SinkParamClientCert, changefeedbase.SinkParamClientKey) + } else if dialConfig.clientKey != nil && dialConfig.clientCert == nil { + return nil, errors.Errorf(`%s requires %s to be set`, changefeedbase.SinkParamClientKey, changefeedbase.SinkParamClientCert) + } + + if dialConfig.clientCert != nil && dialConfig.clientKey != nil { + cert, err := tls.X509KeyPair(dialConfig.clientCert, dialConfig.clientKey) + if err != nil { + return nil, errors.Wrap(err, `invalid client certificate data provided`) + } + transport.TLSClientConfig.Certificates = []tls.Certificate{cert} + } + + return client, nil +} + +func (wse *webhookSinkClient) makePayloadForBytes(body []byte) (SinkPayload, error) { + req, err := http.NewRequestWithContext(wse.ctx, http.MethodPost, wse.url.String(), bytes.NewReader(body)) + if err != nil { + return nil, err + } + switch wse.format { + case changefeedbase.OptFormatJSON: + req.Header.Set("Content-Type", applicationTypeJSON) + case changefeedbase.OptFormatCSV: + req.Header.Set("Content-Type", applicationTypeCSV) + } + + if wse.authHeader != "" { + req.Header.Set(authorizationHeader, wse.authHeader) + } + + return req, nil +} + +// MakeResolvedPayload implements the SinkClient interface +func (wse *webhookSinkClient) MakeResolvedPayload(body []byte, topic string) (SinkPayload, error) { + return wse.makePayloadForBytes(body) +} + +// Flush implements the SinkClient interface +func (wse *webhookSinkClient) Flush(ctx context.Context, batch SinkPayload) error { + req := batch.(*http.Request) + res, err := wse.client.Do(req) + if err != nil { + return err + } + defer res.Body.Close() + + if !(res.StatusCode >= http.StatusOK && res.StatusCode < http.StatusMultipleChoices) { + resBody, err := io.ReadAll(res.Body) + if err != nil { + return errors.Wrapf(err, "failed to read body for HTTP response with status: %d", res.StatusCode) + } + return fmt.Errorf("%s: %s", res.Status, string(resBody)) + } + return nil +} + +// Close implements the SinkClient interface +func (wse *webhookSinkClient) Close() error { + wse.client.CloseIdleConnections() + return nil +} + +func validateWebhookOpts( + u sinkURL, encodingOpts changefeedbase.EncodingOptions, opts changefeedbase.WebhookSinkOptions, +) error { + if u.Scheme != changefeedbase.SinkSchemeWebhookHTTPS { + return errors.Errorf(`this sink requires %s`, changefeedbase.SinkSchemeHTTPS) + } + + switch encodingOpts.Format { + case changefeedbase.OptFormatJSON: + case changefeedbase.OptFormatCSV: + default: + return errors.Errorf(`this sink is incompatible with %s=%s`, + changefeedbase.OptFormat, encodingOpts.Format) + } + + switch encodingOpts.Envelope { + case changefeedbase.OptEnvelopeWrapped, changefeedbase.OptEnvelopeBare: + default: + return errors.Errorf(`this sink is incompatible with %s=%s`, + changefeedbase.OptEnvelope, encodingOpts.Envelope) + } + + encodingOpts.TopicInValue = true + + if encodingOpts.Envelope != 
changefeedbase.OptEnvelopeBare { + encodingOpts.KeyInValue = true + } + + return nil +} + +type webhookCSVBuffer struct { + bytes []byte + messageCount int + sc *webhookSinkClient +} + +var _ BatchBuffer = (*webhookCSVBuffer)(nil) + +// Append implements the BatchBuffer interface +func (cw *webhookCSVBuffer) Append(key []byte, value []byte, topic string) { + cw.bytes = append(cw.bytes, value...) + cw.messageCount += 1 +} + +// ShouldFlush implements the BatchBuffer interface +func (cw *webhookCSVBuffer) ShouldFlush() bool { + return cw.sc.shouldFlush(len(cw.bytes), cw.messageCount) +} + +// Close implements the BatchBuffer interface +func (cw *webhookCSVBuffer) Close() (SinkPayload, error) { + return cw.sc.makePayloadForBytes(cw.bytes) +} + +type webhookJSONBuffer struct { + messages [][]byte + numBytes int + sc *webhookSinkClient +} + +var _ BatchBuffer = (*webhookJSONBuffer)(nil) + +// Append implements the BatchBuffer interface +func (jw *webhookJSONBuffer) Append(key []byte, value []byte, topic string) { + jw.messages = append(jw.messages, value) + jw.numBytes += len(value) +} + +// ShouldFlush implements the BatchBuffer interface +func (jw *webhookJSONBuffer) ShouldFlush() bool { + return jw.sc.shouldFlush(jw.numBytes, len(jw.messages)) +} + +// Close implements the BatchBuffer interface +func (jw *webhookJSONBuffer) Close() (SinkPayload, error) { + var buffer bytes.Buffer + prefix := "{\"payload\":[" + suffix := fmt.Sprintf("],\"length\":%d}", len(jw.messages)) + + // Grow all at once to avoid reallocations + buffer.Grow(len(prefix) + jw.numBytes /* kvs */ + len(jw.messages) /* commas */ + len(suffix)) + + buffer.WriteString(prefix) + for i, msg := range jw.messages { + if i != 0 { + buffer.WriteByte(',') + } + buffer.Write(msg) + } + buffer.WriteString(suffix) + return jw.sc.makePayloadForBytes(buffer.Bytes()) +} + +func (wse *webhookSinkClient) MakeBatchBuffer() BatchBuffer { + if wse.format == changefeedbase.OptFormatCSV { + return &webhookCSVBuffer{sc: wse} + } else { + return &webhookJSONBuffer{ + sc: wse, + messages: make([][]byte, 0, wse.batchCfg.Messages), + } + } +} + +func (wse *webhookSinkClient) shouldFlush(bytes int, messages int) bool { + switch { + // all zero values is interpreted as flush every time + case wse.batchCfg.Messages == 0 && wse.batchCfg.Bytes == 0 && wse.batchCfg.Frequency == 0: + return true + // messages threshold has been reached + case wse.batchCfg.Messages > 0 && messages >= wse.batchCfg.Messages: + return true + // bytes threshold has been reached + case wse.batchCfg.Bytes > 0 && bytes >= wse.batchCfg.Bytes: + return true + default: + return false + } +} + +func makeWebhookSink( + ctx context.Context, + u sinkURL, + encodingOpts changefeedbase.EncodingOptions, + opts changefeedbase.WebhookSinkOptions, + parallelism int, + pacer *admission.Pacer, + source timeutil.TimeSource, + mb metricsRecorderBuilder, +) (Sink, error) { + batchCfg, retryOpts, err := getSinkConfigFromJson(opts.JSONConfig, sinkJSONConfig{}) + if err != nil { + return nil, err + } + + sinkClient, err := makeWebhookSinkClient(ctx, u, encodingOpts, opts, batchCfg, parallelism) + if err != nil { + return nil, err + } + + return makeBatchingSink( + ctx, + sinkTypeWebhook, + sinkClient, + time.Duration(batchCfg.Frequency), + retryOpts, + parallelism, + nil, + pacer, + source, + mb(requiresResourceAccounting), + ), nil +} diff --git a/pkg/cmd/roachtest/tests/cdc.go b/pkg/cmd/roachtest/tests/cdc.go index 196dcf340267..0d0f098e66a2 100644 --- a/pkg/cmd/roachtest/tests/cdc.go +++ 
b/pkg/cmd/roachtest/tests/cdc.go @@ -1225,6 +1225,7 @@ func registerCDC(r registry.Registry) { ct.runTPCCWorkload(tpccArgs{warehouses: 100, duration: "30m"}) + // The deprecated webhook sink is unable to handle the throughput required for 100 warehouses if _, err := ct.DB().Exec("SET CLUSTER SETTING changefeed.new_webhook_sink_enabled = true;"); err != nil { ct.t.Fatal(err) } @@ -1234,7 +1235,6 @@ func registerCDC(r registry.Registry) { targets: allTpccTargets, opts: map[string]string{ "metrics_label": "'webhook'", - "initial_scan": "'only'", "webhook_sink_config": `'{"Flush": { "Messages": 100, "Frequency": "5s" } }'`, }, })
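
---

As a reading aid for the new `sink_webhook_v2.go` above: the flush decision in `webhookSinkClient.shouldFlush` treats an entirely unset batch config as "flush after every message" and otherwise flushes once either the message-count or byte-size threshold is reached (the Frequency-based flush is driven by the batching worker's timer, not this check). Below is a minimal standalone sketch of that decision logic; the `batchConfig` type and `main` are illustrative stand-ins, not the real `sinkBatchConfig`.

```go
package main

import (
	"fmt"
	"time"
)

// batchConfig is an illustrative stand-in for the sink's batching
// configuration (Messages / Bytes / Frequency), not the real
// sinkBatchConfig type.
type batchConfig struct {
	Messages  int
	Bytes     int
	Frequency time.Duration
}

// shouldFlush mirrors the decision in webhookSinkClient.shouldFlush: an
// entirely unset config means "flush after every message", otherwise a
// flush is triggered once either configured threshold is reached.
func shouldFlush(cfg batchConfig, bufferedBytes, bufferedMessages int) bool {
	switch {
	case cfg.Messages == 0 && cfg.Bytes == 0 && cfg.Frequency == 0:
		return true
	case cfg.Messages > 0 && bufferedMessages >= cfg.Messages:
		return true
	case cfg.Bytes > 0 && bufferedBytes >= cfg.Bytes:
		return true
	default:
		return false
	}
}

func main() {
	perMessage := batchConfig{}           // no thresholds: lowest latency, flush per message
	batched := batchConfig{Messages: 100} // flush every 100 buffered messages

	fmt.Println(shouldFlush(perMessage, 64, 1)) // true
	fmt.Println(shouldFlush(batched, 4096, 40)) // false: below the message threshold
}
```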
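
Likewise, `webhookJSONBuffer.Close` assembles the request body by appending the already-encoded row messages into a `{"payload":[...],"length":N}` wrapper using a pre-grown `bytes.Buffer`, rather than re-marshalling the batch as a whole. A self-contained sketch of that assembly follows; the function name here is illustrative, and the real buffer hands the resulting bytes to `makePayloadForBytes`.

```go
package main

import (
	"bytes"
	"fmt"
)

// buildWebhookJSONBody stitches already-encoded JSON messages into the
// {"payload":[...],"length":N} envelope the webhook sink emits, without
// calling json.Marshal on the batch as a whole.
func buildWebhookJSONBody(messages [][]byte) []byte {
	numBytes := 0
	for _, msg := range messages {
		numBytes += len(msg)
	}

	prefix := `{"payload":[`
	suffix := fmt.Sprintf(`],"length":%d}`, len(messages))

	var buf bytes.Buffer
	// Grow once up front: prefix + messages + separating commas + suffix.
	buf.Grow(len(prefix) + numBytes + len(messages) + len(suffix))

	buf.WriteString(prefix)
	for i, msg := range messages {
		if i != 0 {
			buf.WriteByte(',')
		}
		buf.Write(msg)
	}
	buf.WriteString(suffix)
	return buf.Bytes()
}

func main() {
	rows := [][]byte{
		[]byte(`{"after":{"col1":"val1","rowid":1000},"key":[1001],"topic":"foo"}`),
		[]byte(`{"after":{"col1":"val1","rowid":1001},"key":[1001],"topic":"foo"}`),
	}
	fmt.Println(string(buildWebhookJSONBody(rows)))
	// {"payload":[{...},{...}],"length":2}
}
```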