Skip to content

Commit

Permalink
🐛 Add queue_check_duration to compressor registerer / 📝 Add some comm…
Browse files Browse the repository at this point in the history
…ents in values.yaml

Signed-off-by: Rintaro Okamura <[email protected]>
  • Loading branch information
rinx committed May 7, 2020
1 parent 6827c16 commit c47d42a
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 9 deletions.
24 changes: 19 additions & 5 deletions charts/vald/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,7 @@ gateway:
# gateway.progressDeadlineSeconds -- progress deadline seconds
progressDeadlineSeconds: 600
# gateway.minReplicas -- minimum number of replicas
# if HPA is disabled, this number is used for replicas
minReplicas: 3
# gateway.maxReplicas -- maximum number of replicas
maxReplicas: 9
Expand Down Expand Up @@ -613,6 +614,7 @@ agent:
# agent.progressDeadlineSeconds -- progress deadline seconds
progressDeadlineSeconds: 600
# agent.minReplicas -- minimum number of replicas
# if HPA is disabled, this number is used for replicas
minReplicas: 20
# agent.maxReplicas -- maximum number of replicas
maxReplicas: 300
Expand Down Expand Up @@ -761,6 +763,7 @@ discoverer:
# discoverer.progressDeadlineSeconds -- progress deadline seconds
progressDeadlineSeconds: 600
# discoverer.minReplicas -- minimum number of replicas
# if HPA is disabled, this number is used for replicas
minReplicas: 1
# discoverer.maxReplicas -- maximum number of replicas
maxReplicas: 2
Expand Down Expand Up @@ -905,6 +908,7 @@ compressor:
# compressor.progressDeadlineSeconds -- progress deadline seconds
progressDeadlineSeconds: 600
# compressor.minReplicas -- minimum number of replicas
# if HPA is disabled, this number is used for replicas
minReplicas: 3
# compressor.maxReplicas -- maximum number of replicas
maxReplicas: 15
Expand Down Expand Up @@ -1013,17 +1017,25 @@ compressor:
# compressor.backup.client -- grpc client for backup (overrides defaults.grpc.client)
client: {}
compress:
# compressor.compress.compress_algorithm -- compression algorithm: gob, gzip, lz4 or zstd
# compressor.compress.compress_algorithm -- compression algorithm
# must be `gob`, `gzip`, `lz4` or `zstd`
compress_algorithm: zstd
# compressor.compress.compression_level -- compression level
# value range relies on which algorithm is used
# `gob`: level will be ignored.
# `gzip`: -1 (default compression), 0 (no compression), or 1 (best speed) to 9 (best compression).
# `lz4`: >= 0, higher is better.
# `zstd`: 1 (fastest) to 22 (best), however implementation relies on klauspost/compress.
compression_level: 3
# compressor.compress.concurrent_limit -- concurrency limit
# compressor.compress.concurrent_limit -- concurrency limit for compress/decompress processes
concurrent_limit: 10
# queue_check_duration represents duration of queue daemon block
queue_check_duration: 5s
# compressor.compress.queue_check_duration represents duration of queue daemon block
queue_check_duration: 200ms
registerer:
# compressor.registerer.concurrent_limit -- concurrency limit of registerer worker
# compressor.registerer.concurrent_limit -- concurrency limit for registering vector processes
concurrent_limit: 10
# compressor.registerer.queue_check_duration represents duration of queue daemon block
queue_check_duration: 200ms
compressor:
# compressor.registerer.compressor.client -- gRPC client for compressor (overrides defaults.grpc.client)
client: {}
Expand All @@ -1042,6 +1054,7 @@ backupManager:
# backupManager.progressDeadlineSeconds -- progress deadline seconds
progressDeadlineSeconds: 600
# backupManager.minReplicas -- minimum number of replicas
# if HPA is disabled, this number is used for replicas
minReplicas: 3
# backupManager.maxReplicas -- maximum number of replicas
maxReplicas: 15
Expand Down Expand Up @@ -1515,6 +1528,7 @@ meta:
# meta.progressDeadlineSeconds -- progress deadline seconds
progressDeadlineSeconds: 600
# meta.minReplicas -- minimum number of replicas
# if HPA is disabled, this number is used for replicas
minReplicas: 2
# meta.maxReplicas -- maximum number of replicas
maxReplicas: 10
Expand Down
5 changes: 5 additions & 0 deletions internal/config/compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,16 @@ type CompressorRegisterer struct {
// ConcurrentLimit represents limitation of worker
ConcurrentLimit int `json:"concurrent_limit" yaml:"concurrent_limit"`

// QueueCheckDuration represents duration of queue daemon block
QueueCheckDuration string `json:"queue_check_duration" yaml:"queue_check_duration"`

// Compressor represents gRPC client config of compressor client (for forwarding use)
Compressor *BackupManager `json:"compressor" yaml:"compressor"`
}

func (cr *CompressorRegisterer) Bind() *CompressorRegisterer {
cr.QueueCheckDuration = GetActualValue(cr.QueueCheckDuration)

if cr.Compressor != nil {
cr.Compressor = cr.Compressor.Bind()
} else {
Expand Down
2 changes: 1 addition & 1 deletion internal/worker/queue_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ var (
defaultQueueOpts = []QueueOption{
WithQueueBuffer(10),
WithQueueErrGroup(errgroup.Get()),
WithQueueCheckDuration("5s"),
WithQueueCheckDuration("200ms"),
}
)

Expand Down
13 changes: 10 additions & 3 deletions pkg/manager/compressor/usecase/compressord.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,11 @@ func New(cfg *config.Data) (r runner.Runner, err error) {
service.WithCompressorWorker(
worker.WithName("compressor"),
worker.WithLimitation(cfg.Compressor.ConcurrentLimit),
worker.WithQueueOption(worker.WithQueueCheckDuration(
cfg.Compressor.QueueCheckDuration,
)),
worker.WithQueueOption(
worker.WithQueueCheckDuration(
cfg.Compressor.QueueCheckDuration,
),
),
),
service.WithCompressorErrGroup(eg),
)
Expand Down Expand Up @@ -128,6 +130,11 @@ func New(cfg *config.Data) (r runner.Runner, err error) {
service.WithRegistererWorker(
worker.WithName("registerer"),
worker.WithLimitation(cfg.Registerer.ConcurrentLimit),
worker.WithQueueOption(
worker.WithQueueCheckDuration(
cfg.Registerer.QueueCheckDuration,
),
),
),
service.WithRegistererErrGroup(eg),
service.WithRegistererBackup(b),
Expand Down

0 comments on commit c47d42a

Please sign in to comment.