Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Commit

Permalink
Merge pull request #3228 from marshallford/command-timeout
Browse files Browse the repository at this point in the history
feat: optional command timeout field in config file that defaults  to syncTimeout
  • Loading branch information
squaremo authored Oct 27, 2020
2 parents 925083d + 01beef5 commit 201200c
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 69 deletions.
2 changes: 1 addition & 1 deletion pkg/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ type repo interface {
func (d *Daemon) getManifestStore(r repo) (manifests.Store, error) {
absPaths := git.MakeAbsolutePaths(r, d.GitConfig.Paths)
if d.ManifestGenerationEnabled {
return manifests.NewConfigAware(r.Dir(), absPaths, d.Manifests)
return manifests.NewConfigAware(r.Dir(), absPaths, d.Manifests, d.SyncTimeout)
}
return manifests.NewRawFiles(r.Dir(), absPaths, d.Manifests), nil
}
Expand Down
19 changes: 12 additions & 7 deletions pkg/manifests/configaware.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"path/filepath"
"strings"
"sync"
"time"

"github.com/fluxcd/flux/pkg/image"
"github.com/fluxcd/flux/pkg/resource"
Expand All @@ -31,12 +32,15 @@ type configAware struct {
// have a set of resources
mu sync.RWMutex
resourcesByID map[string]resourceWithOrigin

// default command timeout
defaultTimeout time.Duration
}

// NewConfigAware constructs a `Store` that processes in-repo config
// files (`.flux.yaml`) where present, and otherwise looks for "raw"
// YAML files.
func NewConfigAware(baseDir string, targetPaths []string, manifests Manifests) (*configAware, error) {
func NewConfigAware(baseDir string, targetPaths []string, manifests Manifests, syncTimeout time.Duration) (*configAware, error) {
configFiles, rawManifestDirs, err := splitConfigFilesAndRawManifestPaths(baseDir, targetPaths)
if err != nil {
return nil, err
Expand All @@ -48,9 +52,10 @@ func NewConfigAware(baseDir string, targetPaths []string, manifests Manifests) (
baseDir: baseDir,
paths: rawManifestDirs,
},
manifests: manifests,
baseDir: baseDir,
configFiles: configFiles,
manifests: manifests,
baseDir: baseDir,
configFiles: configFiles,
defaultTimeout: syncTimeout,
}
return result, nil
}
Expand Down Expand Up @@ -146,7 +151,7 @@ func (ca *configAware) SetWorkloadContainerImage(ctx context.Context, resourceID
if err := ca.rawFiles.setManifestWorkloadContainerImage(resWithOrigin.resource, container, newImageID); err != nil {
return err
}
} else if err := resWithOrigin.configFile.SetWorkloadContainerImage(ctx, ca.manifests, resWithOrigin.resource, container, newImageID); err != nil {
} else if err := resWithOrigin.configFile.SetWorkloadContainerImage(ctx, ca.manifests, resWithOrigin.resource, container, newImageID, ca.defaultTimeout); err != nil {
return err
}
// Reset resources, since we have modified one
Expand All @@ -168,7 +173,7 @@ func (ca *configAware) UpdateWorkloadPolicies(ctx context.Context, resourceID re
changed, err = ca.rawFiles.updateManifestWorkloadPolicies(resWithOrigin.resource, update)
} else {
cf := resWithOrigin.configFile
changed, err = cf.UpdateWorkloadPolicies(ctx, ca.manifests, resWithOrigin.resource, update)
changed, err = cf.UpdateWorkloadPolicies(ctx, ca.manifests, resWithOrigin.resource, update, ca.defaultTimeout)
}
if err != nil {
return false, err
Expand Down Expand Up @@ -210,7 +215,7 @@ func (ca *configAware) getResourcesByID(ctx context.Context) (map[string]resourc
}

for _, cf := range ca.configFiles {
resourceManifests, err := cf.GenerateManifests(ctx, ca.manifests)
resourceManifests, err := cf.GenerateManifests(ctx, ca.manifests, ca.defaultTimeout)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/manifests/configaware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os"
"path/filepath"
"testing"
"time"

"github.com/go-kit/kit/log"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -53,7 +54,7 @@ func setup(t *testing.T, paths []string, configs ...config) (*configAware, strin
ioutil.WriteFile(filepath.Join(baseDir, p, ConfigFilename), []byte(c.fluxyaml), 0600)
}
}
frs, err := NewConfigAware(baseDir, searchPaths, manifests)
frs, err := NewConfigAware(baseDir, searchPaths, manifests, time.Minute)
assert.NoError(t, err)
return frs, baseDir, cleanup
}
Expand Down
104 changes: 53 additions & 51 deletions pkg/manifests/configfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,27 @@ import (
"github.com/ghodss/yaml"
"github.com/pkg/errors"
jsonschema "github.com/xeipuuv/gojsonschema"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/fluxcd/flux/pkg/image"
"github.com/fluxcd/flux/pkg/resource"
)

const (
ConfigFilename = ".flux.yaml"
CommandTimeout = time.Minute
)

// This is easier to read as YAML, trust me.
const configSchemaYAML = `
"$schema": http://json-schema.org/draft-07/schema#
definitions:
command:
type: object
required: ['command']
type: object
properties:
command: { type: string }
timeout: { type: string }
additionalProperties: false
version: { const: 1 }
type: object
oneOf:
Expand Down Expand Up @@ -110,39 +114,29 @@ type ConfigFile struct {
// CommandUpdated represents a config in which updates are done by
// execing commands as given.
type CommandUpdated struct {
Generators []Generator `json:"generators"`
Updaters []Updater `json:"updaters,omitempty"`
Generators []Command `json:"generators"`
Updaters []Updater `json:"updaters,omitempty"`
}

// Generator is an individual command for generating manifests.
type Generator struct {
Command string `json:"command,omitempty"`
// Command is an individual command and timeout for generating manifests.
type Command struct {
Command string `json:"command,omitempty"`
Timeout *metav1.Duration `json:"timeout,omitempty"`
}

// Updater gives a means for updating image refs and a means for
// updating policy in a manifest.
type Updater struct {
ContainerImage ContainerImageUpdater `json:"containerImage,omitempty"`
Policy PolicyUpdater `json:"policy,omitempty"`
}

// ContainerImageUpdater is a command for updating the image used by a
// container, in a manifest.
type ContainerImageUpdater struct {
Command string `json:"command,omitempty"`
}

// PolicyUpdater is a command for updating a policy for a manifest.
type PolicyUpdater struct {
Command string `json:"command,omitempty"`
ContainerImage Command `json:"containerImage,omitempty"`
Policy Command `json:"policy,omitempty"`
}

// PatchUpdated represents a config in which updates are done by
// maintaining a patch, which is calculating from, and applied to, the
// generated manifests.
type PatchUpdated struct {
Generators []Generator `json:"generators"`
PatchFile string `json:"patchFile,omitempty"`
Generators []Command `json:"generators"`
PatchFile string `json:"patchFile,omitempty"`
generatorsResultCache []byte
}

Expand Down Expand Up @@ -229,23 +223,23 @@ func (cf *ConfigFile) ConfigRelativeToWorkingDir() string {

// GenerateManifests returns the manifests generated (and patched, if
// necessary) according to the config file.
func (cf *ConfigFile) GenerateManifests(ctx context.Context, manifests Manifests) ([]byte, error) {
func (cf *ConfigFile) GenerateManifests(ctx context.Context, manifests Manifests, defaultTimeout time.Duration) ([]byte, error) {
if cf.PatchUpdated != nil {
_, finalBytes, _, err := cf.getGeneratedAndPatchedManifests(ctx, manifests)
_, finalBytes, _, err := cf.getGeneratedAndPatchedManifests(ctx, manifests, defaultTimeout)
return finalBytes, err
}
return cf.getGeneratedManifests(ctx, manifests, cf.CommandUpdated.Generators)
return cf.getGeneratedManifests(ctx, manifests, cf.CommandUpdated.Generators, defaultTimeout)
}

func (cf *ConfigFile) SetWorkloadContainerImage(ctx context.Context, manifests Manifests, r resource.Resource, container string, newImageID image.Ref) error {
func (cf *ConfigFile) SetWorkloadContainerImage(ctx context.Context, manifests Manifests, r resource.Resource, container string, newImageID image.Ref, defaultTimeout time.Duration) error {
if cf.PatchUpdated != nil {
return cf.updatePatchFile(ctx, manifests, func(previousManifests []byte) ([]byte, error) {
return manifests.SetWorkloadContainerImage(previousManifests, r.ResourceID(), container, newImageID)
})
}, defaultTimeout)
}

// Command-updated
result := cf.execContainerImageUpdaters(ctx, r.ResourceID(), container, newImageID.Name.String(), newImageID.Tag)
result := cf.execContainerImageUpdaters(ctx, r.ResourceID(), container, newImageID.Name.String(), newImageID.Tag, defaultTimeout)
if len(result) == 0 {
return makeNoCommandsRunErr("update.containerImage", cf)
}
Expand All @@ -264,7 +258,7 @@ func (cf *ConfigFile) SetWorkloadContainerImage(ctx context.Context, manifests M

// UpdateWorkloadPolicies updates policies for a workload, using
// commands or patching according to the config file.
func (cf *ConfigFile) UpdateWorkloadPolicies(ctx context.Context, manifests Manifests, r resource.Resource, update resource.PolicyUpdate) (bool, error) {
func (cf *ConfigFile) UpdateWorkloadPolicies(ctx context.Context, manifests Manifests, r resource.Resource, update resource.PolicyUpdate, defaultTimeout time.Duration) (bool, error) {
if cf.PatchUpdated != nil {
var changed bool
err := cf.updatePatchFile(ctx, manifests, func(previousManifests []byte) ([]byte, error) {
Expand All @@ -273,7 +267,7 @@ func (cf *ConfigFile) UpdateWorkloadPolicies(ctx context.Context, manifests Mani
changed = bytes.Compare(previousManifests, updatedManifests) != 0
}
return updatedManifests, err
})
}, defaultTimeout)
return changed, err
}

Expand All @@ -288,7 +282,7 @@ func (cf *ConfigFile) UpdateWorkloadPolicies(ctx context.Context, manifests Mani
}

for key, value := range changes {
result := cf.execPolicyUpdaters(ctx, r.ResourceID(), key, value)
result := cf.execPolicyUpdaters(ctx, r.ResourceID(), key, value, defaultTimeout)
if len(result) == 0 {
return false, makeNoCommandsRunErr("updaters.policy", cf)
}
Expand Down Expand Up @@ -324,11 +318,11 @@ type ConfigFileCombinedExecResult struct {

// getGeneratedAndPatchedManifests is used to generate manifests when
// the config is patchUpdated.
func (cf *ConfigFile) getGeneratedAndPatchedManifests(ctx context.Context, manifests Manifests) ([]byte, []byte, string, error) {
func (cf *ConfigFile) getGeneratedAndPatchedManifests(ctx context.Context, manifests Manifests, defaultTimeout time.Duration) ([]byte, []byte, string, error) {
generatedManifests := cf.PatchUpdated.generatorsResultCache
if generatedManifests == nil {
var err error
generatedManifests, err = cf.getGeneratedManifests(ctx, manifests, cf.PatchUpdated.Generators)
generatedManifests, err = cf.getGeneratedManifests(ctx, manifests, cf.PatchUpdated.Generators, defaultTimeout)
if err != nil {
return nil, nil, "", err
}
Expand Down Expand Up @@ -364,9 +358,9 @@ func (cf *ConfigFile) getGeneratedAndPatchedManifests(ctx context.Context, manif
// getGeneratedManifests is used to produce the manifests based _only_
// on the generators in the config. This is sufficient for
// commandUpdated config, and the first step for patchUpdated config.
func (cf *ConfigFile) getGeneratedManifests(ctx context.Context, manifests Manifests, generators []Generator) ([]byte, error) {
func (cf *ConfigFile) getGeneratedManifests(ctx context.Context, manifests Manifests, generators []Command, defaultTimeout time.Duration) ([]byte, error) {
buf := bytes.NewBuffer(nil)
for i, cmdResult := range cf.execGenerators(ctx, generators) {
for i, cmdResult := range cf.execGenerators(ctx, generators, defaultTimeout) {
if cmdResult.Error != nil {
err := fmt.Errorf("error executing generator command %q from file %q: %s\nerror output:\n%s\ngenerated output:\n%s",
generators[i].Command,
Expand All @@ -386,8 +380,8 @@ func (cf *ConfigFile) getGeneratedManifests(ctx context.Context, manifests Manif

// updatePatchFile calculates the patch given a transformation, and
// updates the patch file given in the config.
func (cf *ConfigFile) updatePatchFile(ctx context.Context, manifests Manifests, updateFn func(previousManifests []byte) ([]byte, error)) error {
generatedManifests, patchedManifests, patchFilePath, err := cf.getGeneratedAndPatchedManifests(ctx, manifests)
func (cf *ConfigFile) updatePatchFile(ctx context.Context, manifests Manifests, updateFn func(previousManifests []byte) ([]byte, error), defaultTimeout time.Duration) error {
generatedManifests, patchedManifests, patchFilePath, err := cf.getGeneratedAndPatchedManifests(ctx, manifests, defaultTimeout)
if err != nil {
return fmt.Errorf("error parsing generated, patched output from file %s: %s", cf.configPathRelative, err)
}
Expand All @@ -404,12 +398,16 @@ func (cf *ConfigFile) updatePatchFile(ctx context.Context, manifests Manifests,

// execGenerators executes all the generators given and returns the
// results; it will stop at the first failing command.
func (cf *ConfigFile) execGenerators(ctx context.Context, generators []Generator) []ConfigFileExecResult {
func (cf *ConfigFile) execGenerators(ctx context.Context, generators []Command, defaultTimeout time.Duration) []ConfigFileExecResult {
result := []ConfigFileExecResult{}
for _, g := range generators {
stdErr := bytes.NewBuffer(nil)
stdOut := bytes.NewBuffer(nil)
err := cf.execCommand(ctx, nil, stdOut, stdErr, g.Command)
timeout := defaultTimeout
if g.Timeout != nil && g.Timeout.Duration >= time.Second {
timeout = g.Timeout.Duration
}
err := cf.execCommand(ctx, nil, stdOut, stdErr, g.Command, timeout)
r := ConfigFileExecResult{
Stdout: stdOut.Bytes(),
Stderr: stdErr.Bytes(),
Expand All @@ -427,52 +425,56 @@ func (cf *ConfigFile) execGenerators(ctx context.Context, generators []Generator
// execContainerImageUpdaters executes all the image updates in the configuration file.
// It will stop at the first error, in which case the returned error will be non-nil
func (cf *ConfigFile) execContainerImageUpdaters(ctx context.Context,
workload resource.ID, container string, image, imageTag string) []ConfigFileCombinedExecResult {
workload resource.ID, container string, image, imageTag string, defaultTimeout time.Duration) []ConfigFileCombinedExecResult {
env := makeEnvFromResourceID(workload)
env = append(env,
"FLUX_CONTAINER="+container,
"FLUX_IMG="+image,
"FLUX_TAG="+imageTag,
)
commands := []string{}
commands := []Command{}
var updaters []Updater
if cf.CommandUpdated != nil {
updaters = cf.CommandUpdated.Updaters
}
for _, u := range updaters {
commands = append(commands, u.ContainerImage.Command)
commands = append(commands, u.ContainerImage)
}
return cf.execCommandsWithCombinedOutput(ctx, env, commands)
return cf.execCommandsWithCombinedOutput(ctx, env, commands, defaultTimeout)
}

// execPolicyUpdaters executes all the policy update commands given in
// the configuration file. An empty policyValue means remove the
// policy. It will stop at the first error, in which case the returned
// error will be non-nil
func (cf *ConfigFile) execPolicyUpdaters(ctx context.Context,
workload resource.ID, policyName, policyValue string) []ConfigFileCombinedExecResult {
workload resource.ID, policyName, policyValue string, defaultTimeout time.Duration) []ConfigFileCombinedExecResult {
env := makeEnvFromResourceID(workload)
env = append(env, "FLUX_POLICY="+policyName)
if policyValue != "" {
env = append(env, "FLUX_POLICY_VALUE="+policyValue)
}
commands := []string{}
commands := []Command{}
var updaters []Updater
if cf.CommandUpdated != nil {
updaters = cf.CommandUpdated.Updaters
}
for _, u := range updaters {
commands = append(commands, u.Policy.Command)
commands = append(commands, u.Policy)
}
return cf.execCommandsWithCombinedOutput(ctx, env, commands)
return cf.execCommandsWithCombinedOutput(ctx, env, commands, defaultTimeout)
}

func (cf *ConfigFile) execCommandsWithCombinedOutput(ctx context.Context, env []string, commands []string) []ConfigFileCombinedExecResult {
func (cf *ConfigFile) execCommandsWithCombinedOutput(ctx context.Context, env []string, commands []Command, defaultTimeout time.Duration) []ConfigFileCombinedExecResult {
env = append(env, "PATH="+os.Getenv("PATH"))
result := []ConfigFileCombinedExecResult{}
for _, c := range commands {
stdOutAndErr := bytes.NewBuffer(nil)
err := cf.execCommand(ctx, env, stdOutAndErr, stdOutAndErr, c)
timeout := defaultTimeout
if c.Timeout != nil && c.Timeout.Duration >= time.Second {
timeout = c.Timeout.Duration
}
err := cf.execCommand(ctx, env, stdOutAndErr, stdOutAndErr, c.Command, timeout)
r := ConfigFileCombinedExecResult{
Output: stdOutAndErr.Bytes(),
Error: err,
Expand All @@ -486,8 +488,8 @@ func (cf *ConfigFile) execCommandsWithCombinedOutput(ctx context.Context, env []
return result
}

func (cf *ConfigFile) execCommand(ctx context.Context, env []string, stdOut, stdErr io.Writer, command string) error {
cmdCtx, cancel := context.WithTimeout(ctx, CommandTimeout)
func (cf *ConfigFile) execCommand(ctx context.Context, env []string, stdOut, stdErr io.Writer, command string, timeout time.Duration) error {
cmdCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
cmd := exec.CommandContext(ctx, "/bin/sh", "-c", command)
cmd.Env = env
Expand Down
Loading

0 comments on commit 201200c

Please sign in to comment.