changefeedccl: webhook sink refactor
Resolves #84676
Epic: https://cockroachlabs.atlassian.net/browse/CRDB-11356

This PR implements the Webhook sink as part of a more general
`batchingSink` framework that makes adding new sinks easier, and makes
the Webhook sink far more performant than it was previously.

A followup PR will use the `batchingSink` for the pubsub client, which
also suffers from performance issues.

---

Sink-specific code is encapsulated in a `SinkClient` interface:

```go
type SinkClient interface {
        MakeResolvedPayload(body []byte, topic string) (SinkPayload, error)
        MakeBatchWriter() BatchWriter
        Flush(context.Context, SinkPayload) error
        Close() error
}

type BatchWriter interface {
        AppendKV(key []byte, value []byte, topic string)
        ShouldFlush() bool
        Close() (SinkPayload, error)
}

type SinkPayload interface{}
```

A `BatchWriter` accumulates rows via `AppendKV` until `ShouldFlush`
reports that the batch is ready.  Once the batch is ready to be flushed,
the writer can be `Close()`'d to do any final formatting (ex: wrap in a
JSON object with extra metadata) of the buffered data and obtain a final
`SinkPayload` that is ready to be passed to `SinkClient.Flush`.

The `SinkClient` has a separate `MakeResolvedPayload` since the sink may
require resolved events to be formatted differently from a batch of KVs.

`Flush(ctx, payload)` encapsulates sending a blocking IO request to the
sink endpoint, and may be called multiple times with the same payload
due to retries.  Any formatting work should therefore be done in the
batch writer's `Close` and stored in the `SinkPayload`, so that repeated
calls to `Flush` do not redo that work on every retry.

---

The `batchingSink` handles all the logic to take a SinkClient and form a
full Sink implementation.

```go
type batchingSink struct {
        client             SinkClient
        ioWorkers          int
        minFlushFrequency  time.Duration
        retryOpts          retry.Options
        eventPool          sync.Pool
        batchPool          sync.Pool
        eventCh            chan interface{}
        pacer              *admission.Pacer
        ...
}

var _ Sink = (*batchingSink)(nil)
```

It involves a single goroutine which handles:
- Creating, building up, and finalizing `BatchWriter`s to eventually
form a `SinkPayload` to emit
- Flushing batches when they have persisted longer than a configured
`minFlushFrequency`
- Flushing deliberately and being able to block until the Flush has completed
- Logging all the various sink metrics

`EmitRow` calls are thread-safe, so users of this sink do not need the
`safeSink` wrapper.
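
The shape of that goroutine can be sketched roughly as below.  This is a
simplified illustration, not the code in `batching_sink.go`: `rowEvent`,
`flushEvent`, and `runBatcher` are hypothetical names, batches are plain
string slices, and `emit` is called synchronously rather than handing the
batch to separate IO workers.

```go
package main

import (
	"fmt"
	"time"
)

// rowEvent and flushEvent are hypothetical event types sent over a single
// channel, similar in spirit to the `eventCh chan interface{}` field above.
type rowEvent struct{ value string }
type flushEvent struct{ done chan struct{} }

// runBatcher is a single-goroutine loop that builds batches, emits them when
// they are full, when the tick channel fires, or when a flush is requested.
func runBatcher(eventCh <-chan interface{}, tick <-chan time.Time, maxBatch int, emit func([]string)) {
	var batch []string
	flush := func() {
		if len(batch) > 0 {
			emit(batch)
			batch = nil
		}
	}
	for {
		select {
		case ev, ok := <-eventCh:
			if !ok {
				flush() // channel closed: emit what is left and exit
				return
			}
			switch e := ev.(type) {
			case rowEvent:
				batch = append(batch, e.value)
				if len(batch) >= maxBatch {
					flush()
				}
			case flushEvent:
				flush()
				close(e.done) // unblock the caller waiting on the flush
			}
		case <-tick:
			flush() // the batch has waited long enough
		}
	}
}

func main() {
	eventCh := make(chan interface{})
	ticker := time.NewTicker(50 * time.Millisecond)
	defer ticker.Stop()

	finished := make(chan struct{})
	go func() {
		defer close(finished)
		runBatcher(eventCh, ticker.C, 2, func(b []string) { fmt.Println("flush", b) })
	}()

	for _, v := range []string{"a", "b", "c"} {
		eventCh <- rowEvent{value: v}
	}
	done := make(chan struct{})
	eventCh <- flushEvent{done: done}
	<-done // blocks until the deliberate flush has been handled
	close(eventCh)
	<-finished
}
```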

Events sent between the goroutines would normally need to exist on the
heap, but to avoid excessive garbage collection of hundreds of thousands
of tiny structs, both the `kvEvents{<data from EmitRow>}` events (sent from the
EmitRow caller to the batching worker) and the `sinkBatchBuffer{<data
about the batch>}` events (sent from the batching worker to the IO
routine in the next section) are allocated from object pools.
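
A minimal sketch of the pooling pattern, using the standard `sync.Pool`.
The `kvEvent` struct and the `newKVEvent`/`freeKVEvent` helpers are
hypothetical stand-ins for illustration; the real event types carry more
fields.

```go
package main

import (
	"fmt"
	"sync"
)

// kvEvent is a hypothetical stand-in for the small per-row event struct.
type kvEvent struct {
	key, val []byte
	topic    string
}

// eventPool recycles kvEvent structs so each row does not cost a fresh
// heap allocation that the garbage collector later has to reclaim.
var eventPool = sync.Pool{
	New: func() interface{} { return new(kvEvent) },
}

// newKVEvent takes a struct from the pool and fills it in.
func newKVEvent(key, val []byte, topic string) *kvEvent {
	ev := eventPool.Get().(*kvEvent)
	ev.key, ev.val, ev.topic = key, val, topic
	return ev
}

// freeKVEvent clears the struct (so the pool does not keep row data alive)
// and returns it to the pool.  The caller must not use ev afterwards.
func freeKVEvent(ev *kvEvent) {
	*ev = kvEvent{}
	eventPool.Put(ev)
}

func main() {
	ev := newKVEvent([]byte("k1"), []byte(`{"a":1}`), "orders")
	fmt.Println(string(ev.key), string(ev.val), ev.topic)
	freeKVEvent(ev)
}
```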

---

For a sink like Cloudstorage where there are large batches, doing the
above and flushing the batch payloads one by one on a separate routine
is good enough.  Unfortunately, the Webhook sink can be used with no
batching at all, by users who want the lowest latency while still having
good throughput.  This means we need to be able to have multiple
requests in flight.  The difficulty here is that if a batch with keys
[a1,b1] is in flight, a batch with keys [b2,c1] needs to block until
[a1,b1] completes, as b2 cannot be sent and risk arriving at the
destination prior to b1.

Flushing out payloads in a way that maintains key-ordering guarantees
while still running in parallel is done by a separate `parallelIO`
struct.

```go
type parallelIO struct {
	retryOpts retry.Options
	ioHandler IOHandler
	requestCh chan IORequest
	resultCh  chan IORequest
  ...
}

type IOHandler func(context.Context, IORequest) error

type IORequest interface {
	Keys() intsets.Fast
	SetError(error)
}
```

It involves one goroutine to manage the key ordering guarantees and a
configurable number of IO Worker goroutines that simply call `ioHandler`
on an `IORequest`.

IORequests represent the keys they must not conflict on by providing an
`intsets.Fast` struct, which allows for the efficient
Union/Intersects/Difference operations that `parallelIO` needs in order
to maintain ordering guarantees.

Requests are received as IORequests and the response is also returned as
an IORequest.  This way the parallelIO struct does not have to do any
heap allocations to communicate; its user can manage creating and
freeing these objects in pools.  The only heap allocations that occur
are part of the `intsets` operations, as it uses a linked list
internally.
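
The ordering rule can be sketched as follows.  This is an illustrative
model only: it uses a plain `map[int]struct{}` in place of
`intsets.Fast`, runs synchronously instead of over channels, and the
`scheduler`, `submit`, and `complete` names are hypothetical.  A request
is dispatched only if its keys do not overlap any in-flight request or
any earlier request that is still waiting.

```go
package main

import "fmt"

// keySet is a simple stand-in for intsets.Fast.
type keySet map[int]struct{}

func newKeySet(keys ...int) keySet {
	s := keySet{}
	for _, k := range keys {
		s[k] = struct{}{}
	}
	return s
}

func (s keySet) intersects(o keySet) bool {
	for k := range o {
		if _, ok := s[k]; ok {
			return true
		}
	}
	return false
}

func (s keySet) unionWith(o keySet) {
	for k := range o {
		s[k] = struct{}{}
	}
}

func (s keySet) differenceWith(o keySet) {
	for k := range o {
		delete(s, k)
	}
}

type request struct {
	name string
	keys keySet
}

// scheduler tracks the union of in-flight keys and the queue of blocked requests.
type scheduler struct {
	inflight keySet
	pending  []request
}

func conflictsWithAny(keys keySet, reqs []request) bool {
	for _, r := range reqs {
		if keys.intersects(r.keys) {
			return true
		}
	}
	return false
}

// submit reports whether r may be sent now; otherwise r is queued.
func (s *scheduler) submit(r request) bool {
	if r.keys.intersects(s.inflight) || conflictsWithAny(r.keys, s.pending) {
		s.pending = append(s.pending, r)
		return false
	}
	s.inflight.unionWith(r.keys)
	return true
}

// complete releases r's keys and returns queued requests that can now be sent.
func (s *scheduler) complete(r request) []request {
	s.inflight.differenceWith(r.keys)
	var ready, still []request
	for _, p := range s.pending {
		if p.keys.intersects(s.inflight) || conflictsWithAny(p.keys, still) {
			still = append(still, p)
			continue
		}
		s.inflight.unionWith(p.keys)
		ready = append(ready, p)
	}
	s.pending = still
	return ready
}

func main() {
	s := &scheduler{inflight: newKeySet()}
	first := request{name: "[a1,b1]", keys: newKeySet(1, 2)}  // keys a, b
	second := request{name: "[b2,c1]", keys: newKeySet(2, 3)} // keys b, c
	fmt.Println(s.submit(first), s.submit(second))
	for _, r := range s.complete(first) {
		fmt.Println("now sendable:", r.name)
	}
}
```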

---

The webhook sink is therefore formed by:
1. EmitRow is called, creating kvEvents that are sent to a Batching worker
2. The batching worker takes events and appends them to a batch
3. Once the batch is full, it's encoded into an HTTP request
4. The request object is then sharded across a set of IO workers to be
   fully sent out in parallel with other non-key-conflicting requests.

With this setup, looking at the CPU flamegraph at high throughputs, most
of the `batchingSink`/`parallelIO` work barely showed up.  The work was
largely just step 3, where taking a list of messages and calling
`json.Marshal` on it took almost 10% of the time, specifically in the
compaction pass inside `encoding/json` (`compact`).

Since this isn't needed, and all we're doing is putting a list of
already-formatted JSON messages into a surrounding JSON array and small
object, I also replaced `json.Marshal` with code that stitches the
characters together manually into a buffer.

---

Since Matt has talked about a new emphasis on feature-flagging new work
to avoid the need for technical advisories, I placed this new
implementation under the `changefeed.new_webhook_sink_enabled` setting
and defaulted it to disabled.

---

Release note (performance improvement): the webhook sink is now able to
handle a drastically higher maximum throughput by enabling the
"changefeed.new_webhook_sink_enabled" cluster setting.
samiskin committed Mar 23, 2023
1 parent b1fdcd3 commit 80e8a47
Showing 14 changed files with 1,360 additions and 126 deletions.
1 change: 1 addition & 0 deletions docs/generated/settings/settings-for-tenants.txt
@@ -17,6 +17,7 @@ changefeed.event_consumer_workers integer 0 the number of workers to use when pr
changefeed.fast_gzip.enabled boolean true use fast gzip implementation
changefeed.node_throttle_config string specifies node level throttling configuration for all changefeeeds
changefeed.schema_feed.read_with_priority_after duration 1m0s retry with high priority if we were not able to read descriptors for too long; 0 disables
changefeed.sink_io_workers integer 0 the number of workers used by changefeeds when sending requests to the sink: <0 disables, 0 assigns a reasonable default, >0 assigns the setting value.
cloudstorage.azure.concurrent_upload_buffers integer 1 controls the number of concurrent buffers that will be used by the Azure client when uploading chunks.Each buffer can buffer up to cloudstorage.write_chunk.size of memory during an upload
cloudstorage.http.custom_ca string custom root CA (appended to system's default CAs) for verifying certificates when interacting with HTTPS storage
cloudstorage.timeout duration 10m0s the timeout for import/export storage operations
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
@@ -23,6 +23,7 @@
<tr><td><div id="setting-changefeed-fast-gzip-enabled" class="anchored"><code>changefeed.fast_gzip.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>use fast gzip implementation</td></tr>
<tr><td><div id="setting-changefeed-node-throttle-config" class="anchored"><code>changefeed.node_throttle_config</code></div></td><td>string</td><td><code></code></td><td>specifies node level throttling configuration for all changefeeeds</td></tr>
<tr><td><div id="setting-changefeed-schema-feed-read-with-priority-after" class="anchored"><code>changefeed.schema_feed.read_with_priority_after</code></div></td><td>duration</td><td><code>1m0s</code></td><td>retry with high priority if we were not able to read descriptors for too long; 0 disables</td></tr>
<tr><td><div id="setting-changefeed-sink-io-workers" class="anchored"><code>changefeed.sink_io_workers</code></div></td><td>integer</td><td><code>0</code></td><td>the number of workers used by changefeeds when sending requests to the sink: &lt;0 disables, 0 assigns a reasonable default, &gt;0 assigns the setting value.</td></tr>
<tr><td><div id="setting-cloudstorage-azure-concurrent-upload-buffers" class="anchored"><code>cloudstorage.azure.concurrent_upload_buffers</code></div></td><td>integer</td><td><code>1</code></td><td>controls the number of concurrent buffers that will be used by the Azure client when uploading chunks.Each buffer can buffer up to cloudstorage.write_chunk.size of memory during an upload</td></tr>
<tr><td><div id="setting-cloudstorage-http-custom-ca" class="anchored"><code>cloudstorage.http.custom_ca</code></div></td><td>string</td><td><code></code></td><td>custom root CA (appended to system&#39;s default CAs) for verifying certificates when interacting with HTTPS storage</td></tr>
<tr><td><div id="setting-cloudstorage-timeout" class="anchored"><code>cloudstorage.timeout</code></div></td><td>duration</td><td><code>10m0s</code></td><td>the timeout for import/export storage operations</td></tr>
4 changes: 4 additions & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
@@ -7,6 +7,7 @@ go_library(
"alter_changefeed_stmt.go",
"authorization.go",
"avro.go",
"batching_sink.go",
"changefeed.go",
"changefeed_dist.go",
"changefeed_processors.go",
@@ -20,6 +21,7 @@ go_library(
"event_processing.go",
"metrics.go",
"name.go",
"parallel_io.go",
"parquet_sink_cloudstorage.go",
"retry.go",
"scheduled_changefeed.go",
@@ -32,6 +34,7 @@ go_library(
"sink_pubsub.go",
"sink_sql.go",
"sink_webhook.go",
"sink_webhook_v2.go",
"telemetry.go",
"testing_knobs.go",
"tls.go",
@@ -125,6 +128,7 @@ go_library(
"//pkg/util/hlc",
"//pkg/util/httputil",
"//pkg/util/humanizeutil",
"//pkg/util/intsets",
"//pkg/util/json",
"//pkg/util/log",
"//pkg/util/log/eventpb",