fix: multiple filters #173

Merged · 1 commit · Oct 23, 2024
23 changes: 12 additions & 11 deletions README.md
@@ -48,7 +48,7 @@ printf "label=something_else" | nc -N localhost 8080
In the ryuk window you'll see containers/networks/volumes deleted after 10s

```log
time=2024-09-30T19:42:30.000+01:00 level=INFO msg=starting connection_timeout=1m0s reconnection_timeout=10s request_timeout=10s shutdown_timeout=10m0s remove_retries=10 retry_offset=-1s port=8080 verbose=false
time=2024-09-30T19:42:30.000+01:00 level=INFO msg=starting connection_timeout=1m0s reconnection_timeout=10s request_timeout=10s shutdown_timeout=10m0s remove_retries=10 retry_offset=-1s changes_retry_interval=1s port=8080 verbose=false
time=2024-09-30T19:42:30.001+01:00 level=INFO msg="Started"
time=2024-09-30T19:42:30.001+01:00 level=INFO msg="client processing started"
time=2024-09-30T19:42:38.002+01:00 level=INFO msg="client connected" address=127.0.0.1:56432 clients=1
@@ -68,13 +68,14 @@ time=2024-09-30T19:42:52.216+01:00 level=INFO msg=done

The following environment variables can be configured to change the behaviour:

| Environment Variable | Default | Format | Description |
| --------------------------- | ------- | ------- | ------------ |
| `RYUK_CONNECTION_TIMEOUT` | `60s` | [Duration](https://golang.org/pkg/time/#ParseDuration) | The duration without receiving any connections which will trigger a shutdown |
| `RYUK_PORT` | `8080` | `uint16` | The port to listen on for connections |
| `RYUK_RECONNECTION_TIMEOUT` | `10s` | [Duration](https://golang.org/pkg/time/#ParseDuration) | The duration after the last connection closes which will trigger resource clean up and shutdown |
| `RYUK_REQUEST_TIMEOUT` | `10s` | [Duration](https://golang.org/pkg/time/#ParseDuration) | The timeout for any Docker requests |
| `RYUK_REMOVE_RETRIES` | `10` | `int` | The number of times to retry removing a resource |
| `RYUK_RETRY_OFFSET` | `-1s` | [Duration](https://golang.org/pkg/time/#ParseDuration) | The offset added to the start time of the prune pass that is used as the minimum resource creation time. Any resource created after this calculated time will trigger a retry to ensure in use resources are not removed |
| `RYUK_VERBOSE` | `false` | `bool` | Whether to enable verbose aka debug logging |
| `RYUK_SHUTDOWN_TIMEOUT` | `10m` | [Duration](https://golang.org/pkg/time/#ParseDuration) | The duration after shutdown has been requested when the remaining connections are ignored and prune checks start |
| Environment Variable | Default | Format | Description |
| ----------------------------- | ------- | ------- | ------------ |
| `RYUK_CONNECTION_TIMEOUT` | `60s` | [Duration](https://golang.org/pkg/time/#ParseDuration) | The duration without receiving any connections which will trigger a shutdown |
| `RYUK_PORT` | `8080` | `uint16` | The port to listen on for connections |
| `RYUK_RECONNECTION_TIMEOUT` | `10s` | [Duration](https://golang.org/pkg/time/#ParseDuration) | The duration after the last connection closes which will trigger resource clean up and shutdown |
| `RYUK_REQUEST_TIMEOUT` | `10s` | [Duration](https://golang.org/pkg/time/#ParseDuration) | The timeout for any Docker requests |
| `RYUK_REMOVE_RETRIES` | `10` | `int` | The number of times to retry removing a resource |
| `RYUK_RETRY_OFFSET` | `-1s` | [Duration](https://golang.org/pkg/time/#ParseDuration) | The offset added to the start time of the prune pass that is used as the minimum resource creation time. Any resource created after this calculated time will trigger a retry to ensure in use resources are not removed |
| `RYUK_CHANGES_RETRY_INTERVAL` | `1s` | [Duration](https://golang.org/pkg/time/#ParseDuration) | The interval between retries if resource changes (containers, networks, images, and volumes) are detected while pruning |
| `RYUK_VERBOSE` | `false` | `bool` | Whether to enable verbose aka debug logging |
| `RYUK_SHUTDOWN_TIMEOUT` | `10m` | [Duration](https://golang.org/pkg/time/#ParseDuration) | The duration after shutdown has been requested when the remaining connections are ignored and prune checks start |
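
All duration values use Go's [`time.ParseDuration`](https://golang.org/pkg/time/#ParseDuration) format. As a minimal sketch of reading one of these variables with only the standard library, where `durationFromEnv` is a hypothetical helper rather than the project's actual loader (config.go below parses them via struct tags):

```go
package main

import (
	"fmt"
	"os"
	"time"
)

// durationFromEnv is a hypothetical helper: it reads an environment
// variable and parses it with time.ParseDuration, falling back to a
// default when the variable is unset.
func durationFromEnv(key string, def time.Duration) (time.Duration, error) {
	v, ok := os.LookupEnv(key)
	if !ok {
		return def, nil
	}
	d, err := time.ParseDuration(v)
	if err != nil {
		return 0, fmt.Errorf("parse %s: %w", key, err)
	}
	return d, nil
}

func main() {
	// Same format the table documents, e.g. "1s", "10m" or "-1s".
	interval, err := durationFromEnv("RYUK_CHANGES_RETRY_INTERVAL", time.Second)
	if err != nil {
		panic(err)
	}
	fmt.Println("changes retry interval:", interval)
}
```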
5 changes: 5 additions & 0 deletions config.go
@@ -28,6 +28,10 @@ type config struct {
// calculated time will trigger a retry to ensure in use resources are not removed.
RetryOffset time.Duration `env:"RYUK_RETRY_OFFSET" envDefault:"-1s"`

// ChangesRetryInterval is the interval between retries if resource changes (containers,
// networks, images, and volumes) are detected while pruning.
ChangesRetryInterval time.Duration `env:"RYUK_CHANGES_RETRY_INTERVAL" envDefault:"1s"`

// ShutdownTimeout is the maximum amount of time the reaper will wait
// for once signalled to shutdown before it terminates even if connections
// are still established.
@@ -49,6 +53,7 @@ func (c config) LogAttrs() []slog.Attr {
slog.Duration("shutdown_timeout", c.ShutdownTimeout),
slog.Int("remove_retries", c.RemoveRetries),
slog.Duration("retry_offset", c.RetryOffset),
slog.Duration("changes_retry_interval", c.ChangesRetryInterval),
slog.Int("port", int(c.Port)),
slog.Bool("verbose", c.Verbose),
}
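The `env:"..."` struct tags above imply an environment-driven config parser (the library itself is outside this diff), and `LogAttrs` presumably supplies the attributes for the `msg=starting` line shown in the README. A small usage sketch with the standard `log/slog` package; the attribute values are stand-ins:

```go
package main

import (
	"context"
	"log/slog"
	"os"
	"time"
)

func main() {
	logger := slog.New(slog.NewTextHandler(os.Stdout, nil))

	// Hypothetical subset of the reaper's config attributes.
	attrs := []slog.Attr{
		slog.Duration("connection_timeout", time.Minute),
		slog.Duration("changes_retry_interval", time.Second),
		slog.Bool("verbose", false),
	}

	// Logger.LogAttrs emits a record from pre-built attributes, producing
	// output in the same shape as the startup log above.
	logger.LogAttrs(context.Background(), slog.LevelInfo, "starting", attrs...)
}
```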
33 changes: 18 additions & 15 deletions config_test.go
@@ -30,13 +30,14 @@ func Test_loadConfig(t *testing.T) {

t.Run("defaults", func(t *testing.T) {
expected := config{
Port: 8080,
ConnectionTimeout: time.Minute,
ReconnectionTimeout: time.Second * 10,
ShutdownTimeout: time.Minute * 10,
RemoveRetries: 10,
RequestTimeout: time.Second * 10,
RetryOffset: -time.Second,
Port: 8080,
ConnectionTimeout: time.Minute,
ReconnectionTimeout: time.Second * 10,
ShutdownTimeout: time.Minute * 10,
RemoveRetries: 10,
RequestTimeout: time.Second * 10,
RetryOffset: -time.Second,
ChangesRetryInterval: time.Second,
}

cfg, err := loadConfig()
@@ -53,16 +54,18 @@ func Test_loadConfig(t *testing.T) {
t.Setenv("RYUK_REQUEST_TIMEOUT", "4s")
t.Setenv("RYUK_REMOVE_RETRIES", "5")
t.Setenv("RYUK_RETRY_OFFSET", "-6s")
t.Setenv("RYUK_CHANGES_RETRY_INTERVAL", "8s")

expected := config{
Port: 1234,
ConnectionTimeout: time.Second * 2,
ReconnectionTimeout: time.Second * 3,
ShutdownTimeout: time.Second * 7,
Verbose: true,
RemoveRetries: 5,
RequestTimeout: time.Second * 4,
RetryOffset: -time.Second * 6,
Port: 1234,
ConnectionTimeout: time.Second * 2,
ReconnectionTimeout: time.Second * 3,
ShutdownTimeout: time.Second * 7,
Verbose: true,
RemoveRetries: 5,
RequestTimeout: time.Second * 4,
RetryOffset: -time.Second * 6,
ChangesRetryInterval: time.Second * 8,
}

cfg, err := loadConfig()
79 changes: 56 additions & 23 deletions reaper.go
@@ -286,6 +286,7 @@ func (r *reaper) pruneWait(ctx context.Context) (*resources, error) {
clients := 0
pruneCheck := time.NewTicker(r.cfg.ConnectionTimeout)
done := ctx.Done()
var shutdownDeadline time.Time
for {
select {
case addr := <-r.connected:
@@ -308,6 +309,7 @@
// a pruneCheck after a timeout and setting done
// to nil so we don't enter this case again.
r.shutdownListener()
shutdownDeadline = time.Now().Add(r.cfg.ShutdownTimeout)
timeout := r.cfg.ShutdownTimeout
if clients == 0 {
// No clients connected, shutdown immediately.
@@ -317,17 +319,23 @@
pruneCheck.Reset(timeout)
done = nil
case now := <-pruneCheck.C:
r.logger.Info("prune check", fieldClients, clients)

level := slog.LevelInfo
if clients > 0 {
r.logger.Warn("shutdown timeout", fieldClients, clients)
level = slog.LevelWarn
}
r.logger.Log(context.Background(), level, "prune check", fieldClients, clients) //nolint:contextcheck // Ensure log is written.

resources, err := r.resources(now.Add(r.cfg.RetryOffset)) //nolint:contextcheck // Needs its own context to ensure clean up completes.
if err != nil {
if errors.Is(err, errChangesDetected) {
r.logger.Warn("change detected, waiting again", fieldError, err)
continue
if shutdownDeadline.IsZero() || now.Before(shutdownDeadline) {
r.logger.Warn("change detected, waiting again", fieldError, err)
pruneCheck.Reset(r.cfg.ChangesRetryInterval)
continue
}

// Changes still detected after the shutdown timeout; force a best-effort prune.
r.logger.Warn("shutdown timeout reached, forcing prune", fieldError, err)
}

return resources, fmt.Errorf("resources: %w", err)
@@ -338,58 +346,73 @@ func (r *reaper) pruneWait(ctx context.Context) (*resources, error) {
}
}

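Reduced to essentials, the new wait loop retries on `RYUK_CHANGES_RETRY_INTERVAL` while changes are detected and forces a best-effort prune once the shutdown deadline passes. The following self-contained sketch mirrors that flow under simplifying assumptions (no client tracking, and `waitAndPrune` plus its demo values are hypothetical):

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errChangesDetected = errors.New("changes detected")

// waitAndPrune sketches pruneWait's retry flow: while resource changes are
// detected, retry on a short interval; once the shutdown deadline passes,
// accept a best-effort prune instead of waiting indefinitely.
func waitAndPrune(prune func() error, shutdown <-chan struct{},
	connTimeout, retryInterval, shutdownTimeout time.Duration,
) error {
	var deadline time.Time
	check := time.NewTicker(connTimeout)
	defer check.Stop()
	for {
		select {
		case <-shutdown:
			// Shutdown requested: fix the instant after which detected
			// changes no longer postpone pruning. (The real reaper resets
			// the ticker to the shutdown timeout here, or shuts down
			// immediately when no clients remain.)
			deadline = time.Now().Add(shutdownTimeout)
			check.Reset(retryInterval)
			shutdown = nil // a nil channel blocks, so this case fires once
		case now := <-check.C:
			err := prune()
			if errors.Is(err, errChangesDetected) &&
				(deadline.IsZero() || now.Before(deadline)) {
				check.Reset(retryInterval) // RYUK_CHANGES_RETRY_INTERVAL
				continue
			}
			return err
		}
	}
}

func main() {
	shutdown := make(chan struct{})
	close(shutdown) // request shutdown immediately for the demo

	calls := 0
	err := waitAndPrune(func() error {
		calls++
		if calls < 3 {
			return errChangesDetected // simulate resource churn
		}
		return nil
	}, shutdown, time.Minute, 50*time.Millisecond, time.Second)
	fmt.Printf("pruned after %d attempts, err=%v\n", calls, err)
}
```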
// resources returns the resources that match the collected filters.
// resources returns the resources that match the collected filters
// for which there are no changes detected.
func (r *reaper) resources(since time.Time) (*resources, error) {
var ret resources
var err error
var errs []error
// We combine errors so we can do best-effort removal.
for _, args := range r.filterArgs() {
if ret.containers, err = r.affectedContainers(since, args); err != nil {
containers, err := r.affectedContainers(since, args)
if err != nil {
if !errors.Is(err, errChangesDetected) {
r.logger.Error("affected containers", fieldError, err)
}
errs = append(errs, fmt.Errorf("affected containers: %w", err))
}

if ret.networks, err = r.affectedNetworks(since, args); err != nil {
ret.containers = append(ret.containers, containers...)

networks, err := r.affectedNetworks(since, args)
if err != nil {
if !errors.Is(err, errChangesDetected) {
r.logger.Error("affected networks", fieldError, err)
}
errs = append(errs, fmt.Errorf("affected networks: %w", err))
}

if ret.volumes, err = r.affectedVolumes(since, args); err != nil {
ret.networks = append(ret.networks, networks...)

volumes, err := r.affectedVolumes(since, args)
if err != nil {
if !errors.Is(err, errChangesDetected) {
r.logger.Error("affected volumes", fieldError, err)
}
errs = append(errs, fmt.Errorf("affected volumes: %w", err))
}

if ret.images, err = r.affectedImages(since, args); err != nil {
ret.volumes = append(ret.volumes, volumes...)

images, err := r.affectedImages(since, args)
if err != nil {
if !errors.Is(err, errChangesDetected) {
r.logger.Error("affected images", fieldError, err)
}
errs = append(errs, fmt.Errorf("affected images: %w", err))
}

ret.images = append(ret.images, images...)
}

return &ret, errors.Join(errs...)
}

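The core of the multiple-filters fix is visible above: the old code assigned each filter set's results directly to `ret.containers` (and the other fields), so with more than one filter the last iteration overwrote earlier matches, whereas the new code appends. A toy illustration of the difference, using hypothetical data in place of the Docker API:

```go
package main

import "fmt"

func main() {
	// Hypothetical per-filter matches, standing in for affectedContainers.
	matchesByFilter := [][]string{
		{"container-a"},                // filter set 1
		{"container-b", "container-c"}, // filter set 2
	}

	// Old behaviour: assignment keeps only the last filter's matches.
	var overwritten []string
	for _, m := range matchesByFilter {
		overwritten = m
	}

	// New behaviour: append accumulates matches across all filter sets.
	var accumulated []string
	for _, m := range matchesByFilter {
		accumulated = append(accumulated, m...)
	}

	fmt.Println(overwritten) // [container-b container-c]: container-a was lost
	fmt.Println(accumulated) // [container-a container-b container-c]
}
```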
// affectedContainers returns a slice of container IDs that match the filters.
// If a matching container was created after since, an error is returned.
// If a matching container was created after since, an error is returned and
// the container is not included in the list.
func (r *reaper) affectedContainers(since time.Time, args filters.Args) ([]string, error) {
ctx, cancel := context.WithTimeout(context.Background(), r.cfg.RequestTimeout)
defer cancel()

// List all containers including stopped ones.
options := container.ListOptions{All: true, Filters: args}
r.logger.Debug("listing containers", "filter", options)
containers, err := r.client.ContainerList(ctx, options)
if err != nil {
return nil, fmt.Errorf("container list: %w", err)
}

var errChanges []error
containerIDs := make([]string, 0, len(containers))
for _, container := range containers {
if container.Labels[ryukLabel] == "true" {
@@ -416,17 +439,19 @@ func (r *reaper) affectedContainers(since time.Time, args filters.Args) ([]strin
if changed {
// It's not safe to remove a container which was created after
// the prune was initiated, as this may lead to unexpected behaviour.
return nil, fmt.Errorf("container %s: %w", container.ID, errChangesDetected)
errChanges = append(errChanges, fmt.Errorf("container %s: %w", container.ID, errChangesDetected))
continue
}

containerIDs = append(containerIDs, container.ID)
}

return containerIDs, nil
return containerIDs, errors.Join(errChanges...)
}

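Returning `errors.Join(errChanges...)` keeps the partial ID list usable for best-effort removal while still letting `pruneWait` match the sentinel, because `errors.Is` unwraps both joined and `%w`-wrapped errors. A minimal sketch of that property:

```go
package main

import (
	"errors"
	"fmt"
)

var errChangesDetected = errors.New("changes detected")

func main() {
	// Two per-resource errors wrapping the sentinel, built the same way
	// affectedContainers builds them; the IDs are made up.
	joined := errors.Join(
		fmt.Errorf("container abc123: %w", errChangesDetected),
		fmt.Errorf("container def456: %w", errChangesDetected),
	)

	// errors.Is traverses both the join and the %w wrapping, so the
	// caller's single sentinel check still matches.
	fmt.Println(errors.Is(joined, errChangesDetected)) // true
}
```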
// affectedNetworks returns a list of network IDs that match the filters.
// If a matching network was created after since, an error is returned.
// If a matching network was created after since, an error is returned and
// the network is not included in the list.
func (r *reaper) affectedNetworks(since time.Time, args filters.Args) ([]string, error) {
ctx, cancel := context.WithTimeout(context.Background(), r.cfg.RequestTimeout)
defer cancel()
@@ -438,6 +463,7 @@ func (r *reaper) affectedNetworks(since time.Time, args filters.Args) ([]string,
return nil, fmt.Errorf("network list: %w", err)
}

var errChanges []error
networks := make([]string, 0, len(report))
for _, network := range report {
changed := network.Created.After(since)
@@ -451,17 +477,19 @@
if changed {
// It's not safe to remove a network which was created after
// the prune was initiated, as this may lead to unexpected behaviour.
return nil, fmt.Errorf("network %s: %w", network.ID, errChangesDetected)
errChanges = append(errChanges, fmt.Errorf("network %s: %w", network.ID, errChangesDetected))
continue
}

networks = append(networks, network.ID)
}

return networks, nil
return networks, errors.Join(errChanges...)
}

// affectedVolumes returns a list of volume names that match the filters.
// If a matching volume was created after since, an error is returned.
// If a matching volume was created after since, an error is returned and
// the volume is not included in the list.
func (r *reaper) affectedVolumes(since time.Time, args filters.Args) ([]string, error) {
ctx, cancel := context.WithTimeout(context.Background(), r.cfg.RequestTimeout)
defer cancel()
@@ -473,6 +501,7 @@
return nil, fmt.Errorf("volume list: %w", err)
}

var errChanges []error
volumes := make([]string, 0, len(report.Volumes))
for _, volume := range report.Volumes {
created, perr := time.Parse(time.RFC3339, volume.CreatedAt)
@@ -493,17 +522,19 @@
if changed {
// It's not safe to remove a volume which was created after
// the prune was initiated, as this may lead to unexpected behaviour.
return nil, fmt.Errorf("volume %s: %w", volume.Name, errChangesDetected)
errChanges = append(errChanges, fmt.Errorf("volume %s: %w", volume.Name, errChangesDetected))
continue
}

volumes = append(volumes, volume.Name)
}

return volumes, nil
return volumes, errors.Join(errChanges...)
}

// affectedImages returns a list of image IDs that match the filters.
// If a matching volume was created after since, an error is returned.
// If a matching image was created after since, an error is returned and
// the image is not included in the list.
func (r *reaper) affectedImages(since time.Time, args filters.Args) ([]string, error) {
ctx, cancel := context.WithTimeout(context.Background(), r.cfg.RequestTimeout)
defer cancel()
@@ -515,6 +546,7 @@
return nil, fmt.Errorf("image list: %w", err)
}

var errChanges []error
images := make([]string, 0, len(report))
for _, image := range report {
created := time.Unix(image.Created, 0)
@@ -529,13 +561,14 @@
if changed {
// It's not safe to remove an image which was created after
// the prune was initiated, as this may lead to unexpected behaviour.
return nil, fmt.Errorf("image %s: %w", image.ID, errChangesDetected)
errChanges = append(errChanges, fmt.Errorf("image %s: %w", image.ID, errChangesDetected))
continue
}

images = append(images, image.ID)
}

return images, nil
return images, errors.Join(errChanges...)
}

// addFilter adds a filter to prune.