Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[patch] Refactor agent-sidecar: fix S3 reader & add backoff logic #467

Merged
2 changes: 2 additions & 0 deletions .github/workflows/dockers-agent-sidecar-image.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ on:
paths:
- 'internal/**'
- '!internal/db/**'
- 'internal/db/storage/blob'
- '!internal/k8s/**'
- 'apis/grpc/**'
- 'pkg/agent/sidecar/**'
Expand All @@ -22,6 +23,7 @@ on:
paths:
- 'internal/**'
- '!internal/db/**'
- 'internal/db/storage/blob'
- '!internal/k8s/**'
- 'apis/grpc/**'
- 'pkg/agent/sidecar/**'
Expand Down
98 changes: 97 additions & 1 deletion charts/vald/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ defaults:
# @schema {"name": "defaults.grpc.client.connection_pool.old_conn_close_duration", "type": "string"}
# defaults.grpc.client.connection_pool.old_conn_close_duration -- makes delay before gRPC client connection closing during connection pool rebalance
old_conn_close_duration: "3s"
# @schema {"name": "defaults.grpc.client.backoff", "type": "object"}
# @schema {"name": "defaults.grpc.client.backoff", "type": "object", "anchor": "backoff"}
backoff:
# @schema {"name": "defaults.grpc.client.backoff.initial_duration", "type": "string"}
# defaults.grpc.client.backoff.initial_duration -- gRPC client backoff initial duration
Expand Down Expand Up @@ -1318,6 +1318,102 @@ agent:
# `lz4`: >= 0, higher is better compression.
# `zstd`: 1 (fastest) to 22 (best), however implementation relies on klauspost/compress.
compression_level: -1
# @schema {"name": "agent.sidecar.config.client", "type": "object"}
client:
# @schema {"name": "agent.sidecar.config.client.tcp", "alias": "tcp"}
tcp:
dns:
# agent.sidecar.config.client.tcp.dns.cache_enabled -- HTTP client TCP DNS cache enabled
cache_enabled: true
# agent.sidecar.config.client.tcp.dns.refresh_duration -- HTTP client TCP DNS cache refresh duration
refresh_duration: 1h
# agent.sidecar.config.client.tcp.dns.refresh_duration -- HTTP client TCP DNS cache expiration
cache_expiration: 24h
dialer:
# agent.sidecar.config.client.tcp.dialer.timeout -- HTTP client TCP dialer connect timeout
timeout: 5s
# agent.sidecar.config.client.tcp.dialer.keep_alive -- HTTP client TCP dialer keep alive
keep_alive: 5m
# agent.sidecar.config.client.tcp.dialer.dual_stack_enabled -- HTTP client TCP dialer dual stack enabled
dual_stack_enabled: false
tls:
# agent.sidecar.config.client.tcp.tls.enabled -- HTTP client TCP TLS enabled
enabled: false
# agent.sidecar.config.client.tcp.tls.cert -- HTTP client TCP TLS cert path
cert: /path/to/cert
# agent.sidecar.config.client.tcp.tls.key -- HTTP client TCP TLS key path
key: /path/to/key
# agent.sidecar.config.client.tcp.tls.ca -- HTTP client TCP TLS ca path
ca: /path/to/ca
# @schema {"name": "agent.sidecar.config.client.transport", "type": "object"}
transport:
# @schema {"name": "agent.sidecar.config.client.transport.round_tripper", "type": "object"}
round_tripper:
# @schema {"name": "agent.sidecar.config.client.transport.round_tripper.tls_handshake_timeout", "type": "string"}
# agent.sidecar.config.client.transport.round_tripper.tls_handshake_timeout -- TLS handshake timeout
tls_handshake_timeout: 5s
# @schema {"name": "agent.sidecar.config.client.transport.round_tripper.max_idle_conns", "type": "integer"}
# agent.sidecar.config.client.transport.round_tripper.max_idle_conns -- maximum count of idle connections
max_idle_conns: 100
# @schema {"name": "agent.sidecar.config.client.transport.round_tripper.max_idle_conns_per_host", "type": "integer"}
# agent.sidecar.config.client.transport.round_tripper.max_idle_conns_per_host -- maximum count of idle connections per host
max_idle_conns_per_host: 10
# @schema {"name": "agent.sidecar.config.client.transport.round_tripper.max_conns_per_host", "type": "integer"}
# agent.sidecar.config.client.transport.round_tripper.max_conns_per_host -- maximum count of connections per host
max_conns_per_host: 10
# @schema {"name": "agent.sidecar.config.client.transport.round_tripper.idle_conn_timeout", "type": "string"}
# agent.sidecar.config.client.transport.round_tripper.idle_conn_timeout -- timeout for idle connections
idle_conn_timeout: 90s
# @schema {"name": "agent.sidecar.config.client.transport.round_tripper.response_header_timeout", "type": "string"}
# agent.sidecar.config.client.transport.round_tripper.response_header_timeout -- timeout for response header
response_header_timeout: 5s
# @schema {"name": "agent.sidecar.config.client.transport.round_tripper.expect_continue_timeout", "type": "string"}
# agent.sidecar.config.client.transport.round_tripper.expect_continue_timeout -- expect continue timeout
expect_continue_timeout: 5s
# @schema {"name": "agent.sidecar.config.client.transport.round_tripper.max_response_header_size", "type": "integer"}
# agent.sidecar.config.client.transport.round_tripper.max_response_header_size -- maximum response header size
max_response_header_size: 0
# @schema {"name": "agent.sidecar.config.client.transport.round_tripper.write_buffer_size", "type": "integer"}
# agent.sidecar.config.client.transport.round_tripper.write_buffer_size -- write buffer size
write_buffer_size: 0
# @schema {"name": "agent.sidecar.config.client.transport.round_tripper.read_buffer_size", "type": "integer"}
# agent.sidecar.config.client.transport.round_tripper.read_buffer_size -- read buffer size
read_buffer_size: 0
# @schema {"name": "agent.sidecar.config.client.transport.round_tripper.force_attempt_http_2", "type": "boolean"}
# agent.sidecar.config.client.transport.round_tripper.force_attempt_http_2 -- force attempt HTTP2
force_attempt_http_2: true
# @schema {"name": "agent.sidecar.config.client.transport.backoff", "alias": "backoff"}
backoff:
# agent.sidecar.config.client.transport.backoff.initial_duration -- backoff initial duration
initial_duration: 5ms
# agent.sidecar.config.client.transport.backoff.backoff_time_limit -- backoff time limit
backoff_time_limit: 5s
# agent.sidecar.config.client.transport.backoff.maximum_duration -- backoff maximum duration
maximum_duration: 5s
# agent.sidecar.config.client.transport.backoff.jitter_limit -- backoff jitter limit
jitter_limit: 100ms
# agent.sidecar.config.client.transport.backoff.backoff_factor -- backoff backoff factor
backoff_factor: 1.1
# agent.sidecar.config.client.transport.backoff.retry_count -- backoff retry count
retry_count: 100
# agent.sidecar.config.client.transport.backoff.enable_error_log -- backoff error log enabled
enable_error_log: true
# @schema {"name": "agent.sidecar.config.restore_backoff", "alias": "backoff"}
restore_backoff:
# agent.sidecar.config.restore_backoff.initial_duration -- restore backoff initial duration
initial_duration: 1s
# agent.sidecar.config.restore_backoff.backoff_time_limit -- restore backoff time limit
backoff_time_limit: 30m
# agent.sidecar.config.restore_backoff.maximum_duration -- restore backoff maximum duration
maximum_duration: 1m
# agent.sidecar.config.restore_backoff.jitter_limit -- restore backoff jitter limit
jitter_limit: 10s
# agent.sidecar.config.restore_backoff.backoff_factor -- restore backoff factor
backoff_factor: 1.2
# agent.sidecar.config.restore_backoff.retry_count -- restore backoff retry count
retry_count: 100
# agent.sidecar.config.restore_backoff.enable_error_log -- restore backoff log enabled
enable_error_log: true

# @schema {"name": "discoverer", "type": "object"}
discoverer:
Expand Down
18 changes: 18 additions & 0 deletions internal/config/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ type AgentSidecar struct {

// Compress represent compression configurations
Compress *CompressCore `yaml:"compress" json:"compress"`

// RestoreBackoff represent backoff configurations for restoring process
RestoreBackoff *Backoff `yaml:"restore_backoff" json:"restore_backoff"`

// Client represent HTTP client configurations
Client *Client `yaml:"client" json:"client"`
}

func (s *AgentSidecar) Bind() *AgentSidecar {
Expand All @@ -63,5 +69,17 @@ func (s *AgentSidecar) Bind() *AgentSidecar {
s.Compress = new(CompressCore)
}

if s.RestoreBackoff != nil {
s.RestoreBackoff = s.RestoreBackoff.Bind()
} else {
s.RestoreBackoff = new(Backoff)
}

if s.Client != nil {
s.Client = s.Client.Bind()
} else {
s.Client = new(Client)
}

return s
}
10 changes: 9 additions & 1 deletion internal/config/sidecar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@ func TestAgentSidecar_Bind(t *testing.T) {
Mode string
WatchDir string
AutoBackupDuration string
PostStopTimeout string
Filename string
FilenameSuffix string
BlobStorage *Blob
Compress *CompressCore
RestoreBackoff *Backoff
}
type want struct {
want *AgentSidecar
Expand Down Expand Up @@ -61,10 +63,12 @@ func TestAgentSidecar_Bind(t *testing.T) {
Mode: "",
WatchDir: "",
AutoBackupDuration: "",
PostStopTimeout: "",
Filename: "",
FilenameSuffix: "",
BlobStorage: Blob{},
Compress: CompressCore{},
RestoreBackoff: Backoff{},
},
want: want{},
checkFunc: defaultCheckFunc,
Expand All @@ -80,10 +84,12 @@ func TestAgentSidecar_Bind(t *testing.T) {
Mode: "",
WatchDir: "",
AutoBackupDuration: "",
PostStopTimeout: "",
Filename: "",
FilenameSuffix: "",
BlobStorage: Blob{},
Compress: CompressCore{},
RestoreBackoff: Backoff{},
},
want: want{},
checkFunc: defaultCheckFunc,
Expand All @@ -94,7 +100,7 @@ func TestAgentSidecar_Bind(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(tt *testing.T) {
defer goleak.VerifyNone(t)
defer goleak.VerifyNone(tt)
if test.beforeFunc != nil {
test.beforeFunc()
}
Expand All @@ -108,10 +114,12 @@ func TestAgentSidecar_Bind(t *testing.T) {
Mode: test.fields.Mode,
WatchDir: test.fields.WatchDir,
AutoBackupDuration: test.fields.AutoBackupDuration,
PostStopTimeout: test.fields.PostStopTimeout,
Filename: test.fields.Filename,
FilenameSuffix: test.fields.FilenameSuffix,
BlobStorage: test.fields.BlobStorage,
Compress: test.fields.Compress,
RestoreBackoff: test.fields.RestoreBackoff,
}

got := s.Bind()
Expand Down
5 changes: 5 additions & 0 deletions internal/config/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,14 @@ func (r *RoundTripper) Bind() *RoundTripper {
func (t *Transport) Bind() *Transport {
if t.RoundTripper != nil {
t.RoundTripper = t.RoundTripper.Bind()
} else {
t.RoundTripper = new(RoundTripper)
}

if t.Backoff != nil {
t.Backoff = t.Backoff.Bind()
} else {
t.Backoff = new(Backoff)
}

return t
Expand Down
17 changes: 15 additions & 2 deletions internal/db/storage/blob/s3/reader/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,27 @@

package reader

import "github.com/aws/aws-sdk-go/service/s3"
import (
"github.com/aws/aws-sdk-go/service/s3"
"github.com/vdaas/vald/internal/errgroup"
)

type Option func(r *reader)

var (
defaultOpts = []Option{}
defaultOpts = []Option{
WithErrGroup(errgroup.Get()),
}
)

func WithErrGroup(eg errgroup.Group) Option {
return func(r *reader) {
if eg != nil {
r.eg = eg
}
}
}

func WithService(s *s3.S3) Option {
return func(r *reader) {
if s != nil {
Expand Down
Loading