
Commit

fix: multiple filters
Fix multiple filters, which previously only applied the removal of the
last iterated set.

Add tests to validate that:
* Multiple filters are correctly processed
* Duplicate filters are correctly ignored
* Immediate shutdown occurs if no clients

This increases code coverage to 89.4%.

Use best-effort removal if resource changes are detected after a
shutdown has been signalled, so that as much as possible is still
cleaned up.

This adds a new setting, ChangesRetryInterval, exposed via the env
variable RYUK_CHANGES_RETRY_INTERVAL and defaulting to 1 second, which
controls the interval between retries when resource changes are
detected.
stevenh committed Oct 23, 2024
1 parent cbdc142 commit 37c3a18
Showing 6 changed files with 345 additions and 160 deletions.
23 changes: 12 additions & 11 deletions README.md
@@ -48,7 +48,7 @@ printf "label=something_else" | nc -N localhost 8080
In the ryuk window you'll see containers/networks/volumes deleted after 10s

```log
time=2024-09-30T19:42:30.000+01:00 level=INFO msg=starting connection_timeout=1m0s reconnection_timeout=10s request_timeout=10s shutdown_timeout=10m0s remove_retries=10 retry_offset=-1s port=8080 verbose=false
time=2024-09-30T19:42:30.000+01:00 level=INFO msg=starting connection_timeout=1m0s reconnection_timeout=10s request_timeout=10s shutdown_timeout=10m0s remove_retries=10 retry_offset=-1s changes_retry_interval=1s port=8080 verbose=false
time=2024-09-30T19:42:30.001+01:00 level=INFO msg="Started"
time=2024-09-30T19:42:30.001+01:00 level=INFO msg="client processing started"
time=2024-09-30T19:42:38.002+01:00 level=INFO msg="client connected" address=127.0.0.1:56432 clients=1
@@ -68,13 +68,14 @@ time=2024-09-30T19:42:52.216+01:00 level=INFO msg=done

The following environment variables can be configured to change the behaviour:

| Environment Variable | Default | Format | Description |
| --------------------------- | ------- | ------- | ------------ |
| `RYUK_CONNECTION_TIMEOUT` | `60s` | [Duration](https://golang.org/pkg/time/#ParseDuration) | The duration without receiving any connections which will trigger a shutdown |
| `RYUK_PORT` | `8080` | `uint16` | The port to listen on for connections |
| `RYUK_RECONNECTION_TIMEOUT` | `10s` | [Duration](https://golang.org/pkg/time/#ParseDuration) | The duration after the last connection closes which will trigger resource clean up and shutdown |
| `RYUK_REQUEST_TIMEOUT` | `10s` | [Duration](https://golang.org/pkg/time/#ParseDuration) | The timeout for any Docker requests |
| `RYUK_REMOVE_RETRIES` | `10` | `int` | The number of times to retry removing a resource |
| `RYUK_RETRY_OFFSET` | `-1s` | [Duration](https://golang.org/pkg/time/#ParseDuration) | The offset added to the start time of the prune pass that is used as the minimum resource creation time. Any resource created after this calculated time will trigger a retry to ensure in use resources are not removed |
| `RYUK_VERBOSE` | `false` | `bool` | Whether to enable verbose aka debug logging |
| `RYUK_SHUTDOWN_TIMEOUT` | `10m` | [Duration](https://golang.org/pkg/time/#ParseDuration) | The duration after shutdown has been requested when the remaining connections are ignored and prune checks start |
| Environment Variable | Default | Format | Description |
| ----------------------------- | ------- | ------- | ------------ |
| `RYUK_CONNECTION_TIMEOUT` | `60s` | [Duration](https://golang.org/pkg/time/#ParseDuration) | The duration without receiving any connections which will trigger a shutdown |
| `RYUK_PORT` | `8080` | `uint16` | The port to listen on for connections |
| `RYUK_RECONNECTION_TIMEOUT` | `10s` | [Duration](https://golang.org/pkg/time/#ParseDuration) | The duration after the last connection closes which will trigger resource clean up and shutdown |
| `RYUK_REQUEST_TIMEOUT` | `10s` | [Duration](https://golang.org/pkg/time/#ParseDuration) | The timeout for any Docker requests |
| `RYUK_REMOVE_RETRIES` | `10` | `int` | The number of times to retry removing a resource |
| `RYUK_RETRY_OFFSET` | `-1s` | [Duration](https://golang.org/pkg/time/#ParseDuration) | The offset added to the start time of the prune pass that is used as the minimum resource creation time. Any resource created after this calculated time will trigger a retry to ensure in use resources are not removed |
| `RYUK_CHANGES_RETRY_INTERVAL` | `1s` | [Duration](https://golang.org/pkg/time/#ParseDuration) | The interval between retries if resource changes (containers, networks, images, and volumes) are detected while pruning |
| `RYUK_VERBOSE` | `false` | `bool` | Whether to enable verbose aka debug logging |
| `RYUK_SHUTDOWN_TIMEOUT` | `10m` | [Duration](https://golang.org/pkg/time/#ParseDuration) | The duration after shutdown has been requested when the remaining connections are ignored and prune checks start |
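
All duration values above use Go's `time.ParseDuration` syntax. As a stand-alone sketch (ryuk itself loads these via struct tags on its `config` type, not this code), a setting such as `RYUK_CHANGES_RETRY_INTERVAL` could be read and validated like so:

```go
package main

import (
	"fmt"
	"os"
	"time"
)

func main() {
	// Hypothetical stand-alone reader; ryuk parses these via env struct tags.
	raw := os.Getenv("RYUK_CHANGES_RETRY_INTERVAL")
	if raw == "" {
		raw = "1s" // documented default
	}

	interval, err := time.ParseDuration(raw)
	if err != nil {
		fmt.Fprintf(os.Stderr, "invalid RYUK_CHANGES_RETRY_INTERVAL %q: %v\n", raw, err)
		os.Exit(1)
	}
	fmt.Println("changes retry interval:", interval)
}
```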
5 changes: 5 additions & 0 deletions config.go
@@ -28,6 +28,10 @@ type config struct {
// calculated time will trigger a retry to ensure in use resources are not removed.
RetryOffset time.Duration `env:"RYUK_RETRY_OFFSET" envDefault:"-1s"`

// ChangesRetryInterval is the interval between retries if resource changes (containers,
// networks, images, and volumes) are detected while pruning.
ChangesRetryInterval time.Duration `env:"RYUK_CHANGES_RETRY_INTERVAL" envDefault:"1s"`

// ShutdownTimeout is the maximum amount of time the reaper will wait
// for once signalled to shutdown before it terminates even if connections
// are still established.
@@ -49,6 +53,7 @@ func (c config) LogAttrs() []slog.Attr {
slog.Duration("shutdown_timeout", c.ShutdownTimeout),
slog.Int("remove_retries", c.RemoveRetries),
slog.Duration("retry_offset", c.RetryOffset),
slog.Duration("changes_retry_interval", c.ChangesRetryInterval),
slog.Int("port", int(c.Port)),
slog.Bool("verbose", c.Verbose),
}
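For context, a minimal sketch of how attributes like those returned by `LogAttrs` might be emitted with `log/slog` to produce the structured `starting` line shown in the README; the actual call site is outside this diff and the attribute set below is abbreviated and assumed.

```go
package main

import (
	"context"
	"log/slog"
	"os"
	"time"
)

func main() {
	logger := slog.New(slog.NewTextHandler(os.Stdout, nil))

	// Abbreviated attribute set mirroring config.LogAttrs; the field names are
	// assumed to match the log line shown in the README.
	attrs := []slog.Attr{
		slog.Duration("connection_timeout", time.Minute),
		slog.Duration("changes_retry_interval", time.Second),
		slog.Int("port", 8080),
		slog.Bool("verbose", false),
	}
	logger.LogAttrs(context.Background(), slog.LevelInfo, "starting", attrs...)
}
```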
33 changes: 18 additions & 15 deletions config_test.go
@@ -30,13 +30,14 @@ func Test_loadConfig(t *testing.T) {

t.Run("defaults", func(t *testing.T) {
expected := config{
Port: 8080,
ConnectionTimeout: time.Minute,
ReconnectionTimeout: time.Second * 10,
ShutdownTimeout: time.Minute * 10,
RemoveRetries: 10,
RequestTimeout: time.Second * 10,
RetryOffset: -time.Second,
Port: 8080,
ConnectionTimeout: time.Minute,
ReconnectionTimeout: time.Second * 10,
ShutdownTimeout: time.Minute * 10,
RemoveRetries: 10,
RequestTimeout: time.Second * 10,
RetryOffset: -time.Second,
ChangesRetryInterval: time.Second,
}

cfg, err := loadConfig()
Expand All @@ -53,16 +54,18 @@ func Test_loadConfig(t *testing.T) {
t.Setenv("RYUK_REQUEST_TIMEOUT", "4s")
t.Setenv("RYUK_REMOVE_RETRIES", "5")
t.Setenv("RYUK_RETRY_OFFSET", "-6s")
t.Setenv("RYUK_CHANGES_RETRY_INTERVAL", "8s")

expected := config{
Port: 1234,
ConnectionTimeout: time.Second * 2,
ReconnectionTimeout: time.Second * 3,
ShutdownTimeout: time.Second * 7,
Verbose: true,
RemoveRetries: 5,
RequestTimeout: time.Second * 4,
RetryOffset: -time.Second * 6,
Port: 1234,
ConnectionTimeout: time.Second * 2,
ReconnectionTimeout: time.Second * 3,
ShutdownTimeout: time.Second * 7,
Verbose: true,
RemoveRetries: 5,
RequestTimeout: time.Second * 4,
RetryOffset: -time.Second * 6,
ChangesRetryInterval: time.Second * 8,
}

cfg, err := loadConfig()
79 changes: 56 additions & 23 deletions reaper.go
@@ -286,6 +286,7 @@ func (r *reaper) pruneWait(ctx context.Context) (*resources, error) {
clients := 0
pruneCheck := time.NewTicker(r.cfg.ConnectionTimeout)
done := ctx.Done()
var shutdownDeadline time.Time
for {
select {
case addr := <-r.connected:
Expand All @@ -308,6 +309,7 @@ func (r *reaper) pruneWait(ctx context.Context) (*resources, error) {
// a pruneCheck after a timeout and setting done
// to nil so we don't enter this case again.
r.shutdownListener()
shutdownDeadline = time.Now().Add(r.cfg.ShutdownTimeout)
timeout := r.cfg.ShutdownTimeout
if clients == 0 {
// No clients connected, shutdown immediately.
Expand All @@ -317,17 +319,23 @@ func (r *reaper) pruneWait(ctx context.Context) (*resources, error) {
pruneCheck.Reset(timeout)
done = nil
case now := <-pruneCheck.C:
r.logger.Info("prune check", fieldClients, clients)

level := slog.LevelInfo
if clients > 0 {
r.logger.Warn("shutdown timeout", fieldClients, clients)
level = slog.LevelWarn
}
r.logger.Log(context.Background(), level, "prune check", fieldClients, clients) //nolint:contextcheck // Ensure log is written.

resources, err := r.resources(now.Add(r.cfg.RetryOffset)) //nolint:contextcheck // Needs its own context to ensure clean up completes.
if err != nil {
if errors.Is(err, errChangesDetected) {
r.logger.Warn("change detected, waiting again", fieldError, err)
continue
if shutdownDeadline.IsZero() || now.Before(shutdownDeadline) {
r.logger.Warn("change detected, waiting again", fieldError, err)
pruneCheck.Reset(r.cfg.ChangesRetryInterval)
continue
}

// Still changes detected after shutdown timeout, force best effort prune.
r.logger.Warn("shutdown timeout reached, forcing prune", fieldError, err)
}

return resources, fmt.Errorf("resources: %w", err)
Expand All @@ -338,58 +346,73 @@ func (r *reaper) pruneWait(ctx context.Context) (*resources, error) {
}
}

// resources returns the resources that match the collected filters.
// resources returns the resources that match the collected filters
// for which there are no changes detected.
func (r *reaper) resources(since time.Time) (*resources, error) {
var ret resources
var err error
var errs []error
// We combine errors so we can do best effort removal.
for _, args := range r.filterArgs() {
if ret.containers, err = r.affectedContainers(since, args); err != nil {
containers, err := r.affectedContainers(since, args)
if err != nil {
if !errors.Is(err, errChangesDetected) {
r.logger.Error("affected containers", fieldError, err)
}
errs = append(errs, fmt.Errorf("affected containers: %w", err))
}

if ret.networks, err = r.affectedNetworks(since, args); err != nil {
ret.containers = append(ret.containers, containers...)

networks, err := r.affectedNetworks(since, args)
if err != nil {
if !errors.Is(err, errChangesDetected) {
r.logger.Error("affected networks", fieldError, err)
}
errs = append(errs, fmt.Errorf("affected networks: %w", err))
}

if ret.volumes, err = r.affectedVolumes(since, args); err != nil {
ret.networks = append(ret.networks, networks...)

volumes, err := r.affectedVolumes(since, args)
if err != nil {
if !errors.Is(err, errChangesDetected) {
r.logger.Error("affected volumes", fieldError, err)
}
errs = append(errs, fmt.Errorf("affected volumes: %w", err))
}

if ret.images, err = r.affectedImages(since, args); err != nil {
ret.volumes = append(ret.volumes, volumes...)

images, err := r.affectedImages(since, args)
if err != nil {
if !errors.Is(err, errChangesDetected) {
r.logger.Error("affected images", fieldError, err)
}
errs = append(errs, fmt.Errorf("affected images: %w", err))
}

ret.images = append(ret.images, images...)
}

return &ret, errors.Join(errs...)
}

// affectedContainers returns a slice of container IDs that match the filters.
// If a matching container was created after since, an error is returned.
// If a matching container was created after since, an error is returned and
// the container is not included in the list.
func (r *reaper) affectedContainers(since time.Time, args filters.Args) ([]string, error) {
ctx, cancel := context.WithTimeout(context.Background(), r.cfg.RequestTimeout)
defer cancel()

// List all containers including stopped ones.
options := container.ListOptions{All: true, Filters: args}
r.logger.Debug("listing containers", "filter", options)
containers, err := r.client.ContainerList(ctx, options)
if err != nil {
return nil, fmt.Errorf("container list: %w", err)
}

var errChanges []error
containerIDs := make([]string, 0, len(containers))
for _, container := range containers {
if container.Labels[ryukLabel] == "true" {
@@ -416,17 +439,19 @@ func (r *reaper) affectedContainers(since time.Time, args filters.Args) ([]strin
if changed {
// It's not safe to remove a container which was created after
// the prune was initiated, as this may lead to unexpected behaviour.
return nil, fmt.Errorf("container %s: %w", container.ID, errChangesDetected)
errChanges = append(errChanges, fmt.Errorf("container %s: %w", container.ID, errChangesDetected))
continue
}

containerIDs = append(containerIDs, container.ID)
}

return containerIDs, nil
return containerIDs, errors.Join(errChanges...)
}

// affectedNetworks returns a list of network IDs that match the filters.
// If a matching network was created after since, an error is returned.
// If a matching network was created after since, an error is returned and
// the network is not included in the list.
func (r *reaper) affectedNetworks(since time.Time, args filters.Args) ([]string, error) {
ctx, cancel := context.WithTimeout(context.Background(), r.cfg.RequestTimeout)
defer cancel()
@@ -438,6 +463,7 @@ func (r *reaper) affectedNetworks(since time.Time, args filters.Args) ([]string,
return nil, fmt.Errorf("network list: %w", err)
}

var errChanges []error
networks := make([]string, 0, len(report))
for _, network := range report {
changed := network.Created.After(since)
Expand All @@ -451,17 +477,19 @@ func (r *reaper) affectedNetworks(since time.Time, args filters.Args) ([]string,
if changed {
// It's not safe to remove a network which was created after
// the prune was initiated, as this may lead to unexpected behaviour.
return nil, fmt.Errorf("network %s: %w", network.ID, errChangesDetected)
errChanges = append(errChanges, fmt.Errorf("network %s: %w", network.ID, errChangesDetected))
continue
}

networks = append(networks, network.ID)
}

return networks, nil
return networks, errors.Join(errChanges...)
}

// affectedVolumes returns a list of volume names that match the filters.
// If a matching volume was created after since, an error is returned.
// If a matching volume was created after since, an error is returned and
// the volume is not included in the list.
func (r *reaper) affectedVolumes(since time.Time, args filters.Args) ([]string, error) {
ctx, cancel := context.WithTimeout(context.Background(), r.cfg.RequestTimeout)
defer cancel()
@@ -473,6 +501,7 @@ func (r *reaper) affectedVolumes(since time.Time, args filters.Args) ([]string,
return nil, fmt.Errorf("volume list: %w", err)
}

var errChanges []error
volumes := make([]string, 0, len(report.Volumes))
for _, volume := range report.Volumes {
created, perr := time.Parse(time.RFC3339, volume.CreatedAt)
@@ -493,17 +522,19 @@ func (r *reaper) affectedVolumes(since time.Time, args filters.Args) ([]string,
if changed {
// It's not safe to remove a volume which was created after
// the prune was initiated, as this may lead to unexpected behaviour.
return nil, fmt.Errorf("volume %s: %w", volume.Name, errChangesDetected)
errChanges = append(errChanges, fmt.Errorf("volume %s: %w", volume.Name, errChangesDetected))
continue
}

volumes = append(volumes, volume.Name)
}

return volumes, nil
return volumes, errors.Join(errChanges...)
}

// affectedImages returns a list of image IDs that match the filters.
// If a matching volume was created after since, an error is returned.
// If a matching image was created after since, an error is returned and
// the image is not included in the list.
func (r *reaper) affectedImages(since time.Time, args filters.Args) ([]string, error) {
ctx, cancel := context.WithTimeout(context.Background(), r.cfg.RequestTimeout)
defer cancel()
@@ -515,6 +546,7 @@ func (r *reaper) affectedImages(since time.Time, args filters.Args) ([]string, e
return nil, fmt.Errorf("image list: %w", err)
}

var errChanges []error
images := make([]string, 0, len(report))
for _, image := range report {
created := time.Unix(image.Created, 0)
@@ -529,13 +561,14 @@ func (r *reaper) affectedImages(since time.Time, args filters.Args) ([]string, e
if changed {
// It's not safe to remove an image which was created after
// the prune was initiated, as this may lead to unexpected behaviour.
return nil, fmt.Errorf("image %s: %w", image.ID, errChangesDetected)
errChanges = append(errChanges, fmt.Errorf("image %s: %w", image.ID, errChangesDetected))
continue
}

images = append(images, image.ID)
}

return images, nil
return images, errors.Join(errChanges...)
}

// addFilter adds a filter to prune.
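Taken together, the `pruneWait` changes implement a deadline-bounded retry: changes detected before the shutdown deadline reset the ticker to `ChangesRetryInterval`, and once the deadline passes the prune proceeds best effort. A simplified, self-contained sketch of that pattern with hypothetical names (not the reaper's actual types):

```go
package main

import (
	"fmt"
	"time"
)

// pruneWithRetries keeps retrying while changes are reported, but only until
// the shutdown deadline; after that it proceeds with whatever can be removed.
func pruneWithRetries(retryInterval time.Duration, deadline time.Time, changed func() bool) {
	ticker := time.NewTicker(retryInterval)
	defer ticker.Stop()

	for now := range ticker.C {
		if changed() {
			if now.Before(deadline) {
				fmt.Println("change detected, waiting again")
				continue
			}
			// Deadline passed: force a best-effort prune despite the changes.
			fmt.Println("shutdown timeout reached, forcing prune")
		}
		fmt.Println("pruning")
		return
	}
}

func main() {
	pruneWithRetries(10*time.Millisecond, time.Now().Add(50*time.Millisecond), func() bool {
		return true // simulate constantly changing resources
	})
}
```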