-
Notifications
You must be signed in to change notification settings - Fork 3.9k
/
Copy pathsettings.go
332 lines (300 loc) · 14.1 KB
/
settings.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
// Copyright 2020 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
//     https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt
package changefeedbase
import (
"encoding/json"
"time"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/util"
)
// TableDescriptorPollInterval is the interval at which table descriptors are
// polled. Before a row can be emitted, a table descriptor must have been read
// above that row's timestamp.
//
// NB: The setting key is more generic than what it governs today because the
// name predates the current interpretation; it used to control additional
// polling rates.
var TableDescriptorPollInterval = settings.RegisterDurationSetting(
	settings.TenantWritable,
	"changefeed.experimental_poll_interval",
	"polling interval for the table descriptors",
	time.Second,
	settings.NonNegativeDuration,
)
// DefaultMinCheckpointFrequency is the default frequency to flush sink.
// See comment in newChangeAggregatorProcessor for explanation on the value.
var DefaultMinCheckpointFrequency = 30 * time.Second

// TestingSetDefaultMinCheckpointFrequency overrides
// DefaultMinCheckpointFrequency with f for the duration of a test.
//
// The returned function puts back the value that was in effect at the time of
// this call (not necessarily the compile-time default), so nested overrides
// unwind correctly when the restore functions are invoked in reverse order.
func TestingSetDefaultMinCheckpointFrequency(f time.Duration) func() {
	previous := DefaultMinCheckpointFrequency
	restore := func() {
		DefaultMinCheckpointFrequency = previous
	}
	DefaultMinCheckpointFrequency = f
	return restore
}
// PerChangefeedMemLimit bounds the amount of data that a single changefeed
// may buffer. The default is 512 MiB.
var PerChangefeedMemLimit = settings.RegisterByteSizeSetting(
	settings.TenantWritable,
	"changefeed.memory.per_changefeed_limit",
	"controls amount of data that can be buffered per changefeed",
	512<<20, // 512 MiB
	settings.WithPublic,
)
// SlowSpanLogThreshold is how far behind the current wall-clock time a span's
// resolved timestamp must be before the changefeed logs it as slow. With the
// default of 0, a value is calculated based on other cluster settings.
var SlowSpanLogThreshold = settings.RegisterDurationSetting(
	settings.TenantWritable,
	"changefeed.slow_span_log_threshold",
	"a changefeed will log spans with resolved timestamps this far behind the current wall-clock time; if 0, a default value is calculated based on other cluster settings",
	0,
	settings.NonNegativeDuration,
)
// IdleTimeout is the amount of time a changefeed waits for a new KV to be
// emitted before it marks itself as idle. A value of 0 means the changefeed
// is never marked idle. The default is 10 minutes.
//
// The setting is registered under the key "changefeed.idle_timeout" and is
// given the name "changefeed.auto_idle.timeout" via settings.WithName.
var IdleTimeout = settings.RegisterDurationSetting(
	settings.TenantWritable,
	"changefeed.idle_timeout",
	"a changefeed will mark itself idle if no changes have been emitted for greater than this duration; if 0, the changefeed will never be marked idle",
	10*time.Minute,
	settings.NonNegativeDuration,
	settings.WithName("changefeed.auto_idle.timeout"),
)
// FrontierCheckpointFrequency is how often span-level (frontier) checkpoints
// are written. A value of 0 disables them. The default is 10 minutes.
var FrontierCheckpointFrequency = settings.RegisterDurationSetting(
	settings.TenantWritable,
	"changefeed.frontier_checkpoint_frequency",
	"controls the frequency with which span level checkpoints will be written; if 0, disabled",
	10*time.Minute,
	settings.NonNegativeDuration,
)
// FrontierHighwaterLagCheckpointThreshold is the maximum amount by which the
// high-water mark may lag behind the leading edge of the frontier before we
// begin to attempt checkpointing spans above the high-water mark. A value of
// 0 disables checkpointing due to high-water lag. The default is 10 minutes.
var FrontierHighwaterLagCheckpointThreshold = settings.RegisterDurationSetting(
	settings.TenantWritable,
	"changefeed.frontier_highwater_lag_checkpoint_threshold",
	"controls the maximum the high-water mark is allowed to lag behind the leading spans of the frontier before per-span checkpointing is enabled; if 0, checkpointing due to high-water lag is disabled",
	10*time.Minute,
	settings.NonNegativeDuration,
	settings.WithPublic,
)
// FrontierCheckpointMaxBytes caps the number of key bytes that will be added
// to a checkpoint record.
//
// Why a cap is needed: a checkpoint record could be fairly large.
//   - Assume a 10T table and a 1/2G max range size: 20K spans.
//   - The span frontier merges adjacent spans, so the worst case is 10K spans.
//   - Each span is a pair of keys, and those could be large. Assume 1/2K per
//     key, i.e. 1KB per span.
//   - So we could be looking at a 10MB checkpoint record.
//
// How the default was chosen:
//   - Assume a very long backfill, running for 25 hours (GC TTL default
//     duration).
//   - Assume we want at most 150MB worth of checkpoints in the job record.
//
// Therefore, we should write at most 6 MB of checkpoint/hour; OR, based on
// the default FrontierCheckpointFrequency setting, 1 MB per checkpoint.
var FrontierCheckpointMaxBytes = settings.RegisterByteSizeSetting(
	settings.TenantWritable,
	"changefeed.frontier_checkpoint_max_bytes",
	"controls the maximum size of the checkpoint as a total size of key bytes",
	1024<<10, // 1 MiB
)
// ScanRequestLimit is the number of Scan requests that may run concurrently.
// A changefeed issues Scan requests while performing a backfill.
// With the default of 0, a reasonable default will be chosen.
var ScanRequestLimit = settings.RegisterIntSetting(
	settings.TenantWritable,
	"changefeed.backfill.concurrent_scan_requests",
	"number of concurrent scan requests per node issued during a backfill",
	0,
	settings.WithPublic,
)
// ScanRequestSize is the target size, in bytes, of the response to a single
// scan request. The default is 512 KiB.
//
// TODO(cdc,yevgeniy,irfansharif): 16 MiB is too large for "elastic" work such
// as this; reduce the default. Evaluate this as part of #90089.
//
// NOTE(review): the TODO above refers to 16 MiB, but the default registered
// here is 1/2 MiB; the TODO looks stale -- confirm and remove if so.
var ScanRequestSize = settings.RegisterIntSetting(
	settings.TenantWritable,
	"changefeed.backfill.scan_request_size",
	"the maximum number of bytes returned by each scan request",
	512<<10, // 1/2 MiB
	settings.WithPublic,
)
// SinkThrottleConfig describes throttling configuration for the sink.
// 0 values for any of the settings disable that setting.
//
// Every field carries the `omitempty` JSON option, so zero-valued fields are
// left out when the struct is marshaled. validateSinkThrottleConfig decodes
// candidate setting values into this type.
type SinkThrottleConfig struct {
// MessageRate sets approximate messages/s limit.
MessageRate float64 `json:",omitempty"`
// MessageBurst sets burst budget for messages/s.
MessageBurst float64 `json:",omitempty"`
// ByteRate sets approximate bytes/second limit.
ByteRate float64 `json:",omitempty"`
// ByteBurst sets burst budget for bytes/s.
ByteBurst float64 `json:",omitempty"`
// FlushRate sets approximate flushes/s limit.
FlushRate float64 `json:",omitempty"`
// FlushBurst sets burst budget for flushes/s.
FlushBurst float64 `json:",omitempty"`
}
// NodeSinkThrottleConfig is the node wide throttling configuration for
// changefeeds.
//
// Candidate values are checked by validateSinkThrottleConfig, which accepts
// the empty string (the default) and otherwise requires the value to decode
// as JSON into a SinkThrottleConfig.
var NodeSinkThrottleConfig = settings.RegisterStringSetting(
	settings.TenantWritable,
	"changefeed.node_throttle_config",
	// Fixed spelling: the description previously read "changefeeeds".
	"specifies node level throttling configuration for all changefeeds",
	"",
	settings.WithValidateString(validateSinkThrottleConfig),
	settings.WithPublic,
	settings.WithReportable(true),
)
// validateSinkThrottleConfig reports whether configStr is an acceptable sink
// throttle configuration. The empty string is always accepted; any other
// value must decode as JSON into a SinkThrottleConfig, and the decoding error
// (if any) is returned unchanged. The values argument is not consulted.
func validateSinkThrottleConfig(values *settings.Values, configStr string) error {
	if len(configStr) == 0 {
		return nil
	}
	// The decoded value is discarded; only the success of decoding matters.
	var parsed SinkThrottleConfig
	return json.Unmarshal([]byte(configStr), &parsed)
}
// MinHighWaterMarkCheckpointAdvance is the minimum amount of time by which
// the changefeed high water mark has to advance before it becomes eligible
// for checkpointing. With the default of 0, a checkpoint is taken every time
// the frontier advances, as long as the rate of checkpointing keeps up with
// the rate of frontier changes.
var MinHighWaterMarkCheckpointAdvance = settings.RegisterDurationSetting(
	settings.TenantWritable,
	"changefeed.min_highwater_advance",
	"minimum amount of time the changefeed high water mark must advance "+
		"for it to be eligible for checkpointing; Default of 0 will checkpoint every time frontier "+
		"advances, as long as the rate of checkpointing keeps up with the rate of frontier changes",
	0,
	settings.NonNegativeDuration,
	settings.WithPublic,
)
// EventMemoryMultiplier is the factor by which the amount of memory needed to
// process an event is multiplied.
//
// Memory accounting is hard, and the amount of resources used to process an
// event varies over the lifetime of that event. Rather than devising complex
// schemes to accurately measure and adjust current memory usage, we request
// the amount of memory multiplied by this fudge factor.
//
// The default is 3; the setting is registered with settings.FloatWithMinimum(1).
var EventMemoryMultiplier = settings.RegisterFloatSetting(
	settings.TenantWritable,
	"changefeed.event_memory_multiplier",
	"the amount of memory required to process an event is multiplied by this factor",
	3,
	settings.FloatWithMinimum(1),
)
// ProtectTimestampInterval is how often the changefeed updates its protected
// timestamp record, forwarding the protected timestamp to the resolved
// timestamp. The default is 10 minutes, and the setting is registered with
// settings.PositiveDuration.
var ProtectTimestampInterval = settings.RegisterDurationSetting(
	settings.TenantWritable,
	"changefeed.protect_timestamp_interval",
	"controls how often the changefeed forwards its protected timestamp to the resolved timestamp",
	10*time.Minute,
	settings.PositiveDuration,
	settings.WithPublic,
)
// MaxProtectedTimestampAge is the age threshold for a changefeed's protected
// timestamp: per the setting description, the changefeed is failed once the
// protected timestamp age exceeds it. A value of 0 disables expiration. The
// default is 4 days.
var MaxProtectedTimestampAge = settings.RegisterDurationSetting(
	settings.TenantWritable,
	"changefeed.protect_timestamp.max_age",
	"fail the changefeed if the protected timestamp age exceeds this threshold; 0 disables expiration",
	96*time.Hour, // 4 days
	settings.NonNegativeDuration,
	settings.WithPublic,
)
// BatchReductionRetryEnabled turns on the temporary reduction of batch sizes
// after a kafka "message too large" error: the messages are resent with
// progressively lower batch sizes. It is off by default.
//
// The setting is registered under the key
// "changefeed.batch_reduction_retry_enabled" and is given the name
// "changefeed.batch_reduction_retry.enabled" via settings.WithName.
var BatchReductionRetryEnabled = settings.RegisterBoolSetting(
	settings.TenantWritable,
	"changefeed.batch_reduction_retry_enabled",
	"if true, kafka changefeeds upon erroring on an oversized batch will attempt to resend the messages with progressively lower batch sizes",
	false,
	settings.WithName("changefeed.batch_reduction_retry.enabled"),
	settings.WithPublic,
)
// UseMuxRangeFeed selects whether changefeeds use the multiplexing rangefeed
// RPC (MuxRangeFeed).
//
// The default is produced by util.ConstantWithMetamorphicTestBool with a base
// value of false.
// NOTE(review): presumably the default may be randomized in metamorphic test
// builds -- confirm against the util package.
var UseMuxRangeFeed = settings.RegisterBoolSetting(
	settings.TenantWritable,
	"changefeed.mux_rangefeed.enabled",
	"if true, changefeed uses multiplexing rangefeed RPC",
	util.ConstantWithMetamorphicTestBool("changefeed.mux_rangefeed.enabled", false),
)
// EventConsumerWorkers is the maximum number of workers used when processing
// events. Per the setting description: a negative value disables the workers,
// 0 (the default) assigns a reasonable default, and a positive value is used
// as given. For experimental/core changefeeds and changefeeds using the
// parquet format, this is disabled.
var EventConsumerWorkers = settings.RegisterIntSetting(
	settings.TenantWritable,
	"changefeed.event_consumer_workers",
	"the number of workers to use when processing events: <0 disables, "+
		"0 assigns a reasonable default, >0 assigns the setting value. for experimental/core "+
		"changefeeds and changefeeds using parquet format, this is disabled",
	0,
	settings.WithPublic,
)
// EventConsumerWorkerQueueSize specifies the maximum number of events a
// worker can buffer. It applies when changefeed.event_consumer_workers is
// enabled.
//
// The default is produced by util.ConstantWithMetamorphicTestRange with a
// base value of 16 and the bounds 0 and 16.
// NOTE(review): presumably the default may be randomized within those bounds
// in metamorphic test builds -- confirm against the util package.
var EventConsumerWorkerQueueSize = settings.RegisterIntSetting(
	settings.TenantWritable,
	"changefeed.event_consumer_worker_queue_size",
	// Fixed spelling: the description previously read "maxmimum".
	"if changefeed.event_consumer_workers is enabled, this setting sets the maximum number of events "+
		"which a worker can buffer",
	int64(util.ConstantWithMetamorphicTestRange("changefeed.event_consumer_worker_queue_size", 16, 0, 16)),
	settings.NonNegativeInt,
	settings.WithPublic,
)
// EventConsumerPacerRequestSize is how often, measured in CPU time, event
// consumer workers request CPU time from admission control. For example,
// after every N milliseconds of CPU work a worker requests N more
// milliseconds of CPU time. The default is 50 milliseconds.
var EventConsumerPacerRequestSize = settings.RegisterDurationSetting(
	settings.TenantWritable,
	"changefeed.cpu.per_event_consumer_worker_allocation",
	"an event consumer worker will perform a blocking request for CPU time "+
		"before consuming events. after fully utilizing this CPU time, it will "+
		"request more",
	50*time.Millisecond,
	settings.PositiveDuration,
)
// PerEventElasticCPUControlEnabled says whether changefeed event processing
// integrates with elastic CPU control. It is on by default.
var PerEventElasticCPUControlEnabled = settings.RegisterBoolSetting(
	settings.TenantWritable,
	"changefeed.cpu.per_event_elastic_control.enabled",
	"determines whether changefeed event processing integrates with elastic CPU control",
	true,
)
// RequireExternalConnectionSink is used to restrict non-admins with the
// CHANGEFEED privilege to create changefeeds to external connections only.
// It is off by default.
//
// The setting is registered under the key
// "changefeed.permissions.require_external_connection_sink" and is given the
// name "changefeed.permissions.require_external_connection_sink.enabled" via
// settings.WithName.
var RequireExternalConnectionSink = settings.RegisterBoolSetting(
	settings.TenantWritable,
	"changefeed.permissions.require_external_connection_sink",
	// Fixed grammar: the description previously read "this settings restricts".
	"if enabled, this setting restricts users with the CHANGEFEED privilege"+
		" to create changefeeds with external connection sinks only."+
		" see https://www.cockroachlabs.com/docs/stable/create-external-connection.html",
	false,
	settings.WithName("changefeed.permissions.require_external_connection_sink.enabled"),
)
// SinkIOWorkers is the number of IO workers used by sinks that rely on
// parallelIO to send multiple requests in parallel. Per the setting
// description (which notes this is currently webhook only): a negative value
// disables the workers, 0 (the default) assigns a reasonable default, and a
// positive value is used as given.
var SinkIOWorkers = settings.RegisterIntSetting(
	settings.TenantWritable,
	"changefeed.sink_io_workers",
	"the number of workers used by changefeeds when sending requests to the sink "+
		"(currently webhook only): <0 disables, 0 assigns a reasonable default, >0 assigns the setting value",
	0,
	settings.WithPublic,
)
// SinkPacerRequestSize specifies how often (measured in CPU time) the Sink
// batching worker requests CPU time from admission control. For example,
// every N milliseconds of CPU work, request N more milliseconds of CPU time.
// The default is 50 milliseconds.
var SinkPacerRequestSize = settings.RegisterDurationSetting(
	settings.TenantWritable,
	"changefeed.cpu.sink_encoding_allocation",
	// The description previously named "an event consumer worker", copied
	// verbatim from changefeed.cpu.per_event_consumer_worker_allocation; this
	// setting paces the sink batching worker instead.
	"a sink batching worker will perform a blocking request for CPU time "+
		"before consuming events. after fully utilizing this CPU time, it will "+
		"request more",
	50*time.Millisecond,
	settings.PositiveDuration,
)
// LaggingRangesCheckFrequency is how often a changefeed checks for ranges
// which have fallen behind. The default is 1 minute.
var LaggingRangesCheckFrequency = settings.RegisterDurationSetting(
	settings.TenantWritable,
	"changefeed.lagging_ranges_frequency",
	"controls the frequency at which a changefeed checks for ranges which have fallen behind",
	time.Minute,
	settings.NonNegativeDuration,
	settings.WithPublic,
)
// LaggingRangesThreshold is the distance from the present beyond which a
// range is considered as 'lagging' behind in metrics. The default is 3
// minutes.
var LaggingRangesThreshold = settings.RegisterDurationSetting(
	settings.TenantWritable,
	"changefeed.lagging_ranges_threshold",
	"controls how far behind a range must be from the present to be considered as 'lagging' behind in metrics",
	3*time.Minute,
	settings.NonNegativeDuration,
	settings.WithPublic,
)