From 01beef51261e464fd6ab550fb12da91df9222174 Mon Sep 17 00:00:00 2001 From: Marshall Ford Date: Fri, 31 Jul 2020 18:02:11 -0500 Subject: [PATCH] Let each .flux.yaml command have its own timeout This gives more flexibility to .flux.yaml authors, by letting them give a timeout (as a duration e.g., `"30s"`) for each command individually. Signed-off-by: Marshall Ford --- pkg/daemon/daemon.go | 2 +- pkg/manifests/configaware.go | 19 ++++-- pkg/manifests/configaware_test.go | 3 +- pkg/manifests/configfile.go | 104 +++++++++++++++--------------- pkg/manifests/configfile_test.go | 43 +++++++++--- 5 files changed, 102 insertions(+), 69 deletions(-) diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go index 4db68d423..3477b756c 100644 --- a/pkg/daemon/daemon.go +++ b/pkg/daemon/daemon.go @@ -73,7 +73,7 @@ type repo interface { func (d *Daemon) getManifestStore(r repo) (manifests.Store, error) { absPaths := git.MakeAbsolutePaths(r, d.GitConfig.Paths) if d.ManifestGenerationEnabled { - return manifests.NewConfigAware(r.Dir(), absPaths, d.Manifests) + return manifests.NewConfigAware(r.Dir(), absPaths, d.Manifests, d.SyncTimeout) } return manifests.NewRawFiles(r.Dir(), absPaths, d.Manifests), nil } diff --git a/pkg/manifests/configaware.go b/pkg/manifests/configaware.go index 721397a6c..bb6b2c85d 100644 --- a/pkg/manifests/configaware.go +++ b/pkg/manifests/configaware.go @@ -7,6 +7,7 @@ import ( "path/filepath" "strings" "sync" + "time" "github.com/fluxcd/flux/pkg/image" "github.com/fluxcd/flux/pkg/resource" @@ -31,12 +32,15 @@ type configAware struct { // have a set of resources mu sync.RWMutex resourcesByID map[string]resourceWithOrigin + + // default command timeout + defaultTimeout time.Duration } // NewConfigAware constructs a `Store` that processes in-repo config // files (`.flux.yaml`) where present, and otherwise looks for "raw" // YAML files. -func NewConfigAware(baseDir string, targetPaths []string, manifests Manifests) (*configAware, error) { +func NewConfigAware(baseDir string, targetPaths []string, manifests Manifests, syncTimeout time.Duration) (*configAware, error) { configFiles, rawManifestDirs, err := splitConfigFilesAndRawManifestPaths(baseDir, targetPaths) if err != nil { return nil, err @@ -48,9 +52,10 @@ func NewConfigAware(baseDir string, targetPaths []string, manifests Manifests) ( baseDir: baseDir, paths: rawManifestDirs, }, - manifests: manifests, - baseDir: baseDir, - configFiles: configFiles, + manifests: manifests, + baseDir: baseDir, + configFiles: configFiles, + defaultTimeout: syncTimeout, } return result, nil } @@ -146,7 +151,7 @@ func (ca *configAware) SetWorkloadContainerImage(ctx context.Context, resourceID if err := ca.rawFiles.setManifestWorkloadContainerImage(resWithOrigin.resource, container, newImageID); err != nil { return err } - } else if err := resWithOrigin.configFile.SetWorkloadContainerImage(ctx, ca.manifests, resWithOrigin.resource, container, newImageID); err != nil { + } else if err := resWithOrigin.configFile.SetWorkloadContainerImage(ctx, ca.manifests, resWithOrigin.resource, container, newImageID, ca.defaultTimeout); err != nil { return err } // Reset resources, since we have modified one @@ -168,7 +173,7 @@ func (ca *configAware) UpdateWorkloadPolicies(ctx context.Context, resourceID re changed, err = ca.rawFiles.updateManifestWorkloadPolicies(resWithOrigin.resource, update) } else { cf := resWithOrigin.configFile - changed, err = cf.UpdateWorkloadPolicies(ctx, ca.manifests, resWithOrigin.resource, update) + changed, err = cf.UpdateWorkloadPolicies(ctx, ca.manifests, resWithOrigin.resource, update, ca.defaultTimeout) } if err != nil { return false, err @@ -210,7 +215,7 @@ func (ca *configAware) getResourcesByID(ctx context.Context) (map[string]resourc } for _, cf := range ca.configFiles { - resourceManifests, err := cf.GenerateManifests(ctx, ca.manifests) + resourceManifests, err := cf.GenerateManifests(ctx, ca.manifests, ca.defaultTimeout) if err != nil { return nil, err } diff --git a/pkg/manifests/configaware_test.go b/pkg/manifests/configaware_test.go index cab460de7..723d30e30 100644 --- a/pkg/manifests/configaware_test.go +++ b/pkg/manifests/configaware_test.go @@ -6,6 +6,7 @@ import ( "os" "path/filepath" "testing" + "time" "github.com/go-kit/kit/log" "github.com/stretchr/testify/assert" @@ -53,7 +54,7 @@ func setup(t *testing.T, paths []string, configs ...config) (*configAware, strin ioutil.WriteFile(filepath.Join(baseDir, p, ConfigFilename), []byte(c.fluxyaml), 0600) } } - frs, err := NewConfigAware(baseDir, searchPaths, manifests) + frs, err := NewConfigAware(baseDir, searchPaths, manifests, time.Minute) assert.NoError(t, err) return frs, baseDir, cleanup } diff --git a/pkg/manifests/configfile.go b/pkg/manifests/configfile.go index 0ad035a78..7da67089a 100644 --- a/pkg/manifests/configfile.go +++ b/pkg/manifests/configfile.go @@ -14,6 +14,7 @@ import ( "github.com/ghodss/yaml" "github.com/pkg/errors" jsonschema "github.com/xeipuuv/gojsonschema" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/fluxcd/flux/pkg/image" "github.com/fluxcd/flux/pkg/resource" @@ -21,7 +22,6 @@ import ( const ( ConfigFilename = ".flux.yaml" - CommandTimeout = time.Minute ) // This is easier to read as YAML, trust me. @@ -29,8 +29,12 @@ const configSchemaYAML = ` "$schema": http://json-schema.org/draft-07/schema# definitions: command: - type: object required: ['command'] + type: object + properties: + command: { type: string } + timeout: { type: string } + additionalProperties: false version: { const: 1 } type: object oneOf: @@ -110,39 +114,29 @@ type ConfigFile struct { // CommandUpdated represents a config in which updates are done by // execing commands as given. type CommandUpdated struct { - Generators []Generator `json:"generators"` - Updaters []Updater `json:"updaters,omitempty"` + Generators []Command `json:"generators"` + Updaters []Updater `json:"updaters,omitempty"` } -// Generator is an individual command for generating manifests. -type Generator struct { - Command string `json:"command,omitempty"` +// Command is an individual command and timeout for generating manifests. +type Command struct { + Command string `json:"command,omitempty"` + Timeout *metav1.Duration `json:"timeout,omitempty"` } // Updater gives a means for updating image refs and a means for // updating policy in a manifest. type Updater struct { - ContainerImage ContainerImageUpdater `json:"containerImage,omitempty"` - Policy PolicyUpdater `json:"policy,omitempty"` -} - -// ContainerImageUpdater is a command for updating the image used by a -// container, in a manifest. -type ContainerImageUpdater struct { - Command string `json:"command,omitempty"` -} - -// PolicyUpdater is a command for updating a policy for a manifest. -type PolicyUpdater struct { - Command string `json:"command,omitempty"` + ContainerImage Command `json:"containerImage,omitempty"` + Policy Command `json:"policy,omitempty"` } // PatchUpdated represents a config in which updates are done by // maintaining a patch, which is calculating from, and applied to, the // generated manifests. type PatchUpdated struct { - Generators []Generator `json:"generators"` - PatchFile string `json:"patchFile,omitempty"` + Generators []Command `json:"generators"` + PatchFile string `json:"patchFile,omitempty"` generatorsResultCache []byte } @@ -229,23 +223,23 @@ func (cf *ConfigFile) ConfigRelativeToWorkingDir() string { // GenerateManifests returns the manifests generated (and patched, if // necessary) according to the config file. -func (cf *ConfigFile) GenerateManifests(ctx context.Context, manifests Manifests) ([]byte, error) { +func (cf *ConfigFile) GenerateManifests(ctx context.Context, manifests Manifests, defaultTimeout time.Duration) ([]byte, error) { if cf.PatchUpdated != nil { - _, finalBytes, _, err := cf.getGeneratedAndPatchedManifests(ctx, manifests) + _, finalBytes, _, err := cf.getGeneratedAndPatchedManifests(ctx, manifests, defaultTimeout) return finalBytes, err } - return cf.getGeneratedManifests(ctx, manifests, cf.CommandUpdated.Generators) + return cf.getGeneratedManifests(ctx, manifests, cf.CommandUpdated.Generators, defaultTimeout) } -func (cf *ConfigFile) SetWorkloadContainerImage(ctx context.Context, manifests Manifests, r resource.Resource, container string, newImageID image.Ref) error { +func (cf *ConfigFile) SetWorkloadContainerImage(ctx context.Context, manifests Manifests, r resource.Resource, container string, newImageID image.Ref, defaultTimeout time.Duration) error { if cf.PatchUpdated != nil { return cf.updatePatchFile(ctx, manifests, func(previousManifests []byte) ([]byte, error) { return manifests.SetWorkloadContainerImage(previousManifests, r.ResourceID(), container, newImageID) - }) + }, defaultTimeout) } // Command-updated - result := cf.execContainerImageUpdaters(ctx, r.ResourceID(), container, newImageID.Name.String(), newImageID.Tag) + result := cf.execContainerImageUpdaters(ctx, r.ResourceID(), container, newImageID.Name.String(), newImageID.Tag, defaultTimeout) if len(result) == 0 { return makeNoCommandsRunErr("update.containerImage", cf) } @@ -264,7 +258,7 @@ func (cf *ConfigFile) SetWorkloadContainerImage(ctx context.Context, manifests M // UpdateWorkloadPolicies updates policies for a workload, using // commands or patching according to the config file. -func (cf *ConfigFile) UpdateWorkloadPolicies(ctx context.Context, manifests Manifests, r resource.Resource, update resource.PolicyUpdate) (bool, error) { +func (cf *ConfigFile) UpdateWorkloadPolicies(ctx context.Context, manifests Manifests, r resource.Resource, update resource.PolicyUpdate, defaultTimeout time.Duration) (bool, error) { if cf.PatchUpdated != nil { var changed bool err := cf.updatePatchFile(ctx, manifests, func(previousManifests []byte) ([]byte, error) { @@ -273,7 +267,7 @@ func (cf *ConfigFile) UpdateWorkloadPolicies(ctx context.Context, manifests Mani changed = bytes.Compare(previousManifests, updatedManifests) != 0 } return updatedManifests, err - }) + }, defaultTimeout) return changed, err } @@ -288,7 +282,7 @@ func (cf *ConfigFile) UpdateWorkloadPolicies(ctx context.Context, manifests Mani } for key, value := range changes { - result := cf.execPolicyUpdaters(ctx, r.ResourceID(), key, value) + result := cf.execPolicyUpdaters(ctx, r.ResourceID(), key, value, defaultTimeout) if len(result) == 0 { return false, makeNoCommandsRunErr("updaters.policy", cf) } @@ -324,11 +318,11 @@ type ConfigFileCombinedExecResult struct { // getGeneratedAndPatchedManifests is used to generate manifests when // the config is patchUpdated. -func (cf *ConfigFile) getGeneratedAndPatchedManifests(ctx context.Context, manifests Manifests) ([]byte, []byte, string, error) { +func (cf *ConfigFile) getGeneratedAndPatchedManifests(ctx context.Context, manifests Manifests, defaultTimeout time.Duration) ([]byte, []byte, string, error) { generatedManifests := cf.PatchUpdated.generatorsResultCache if generatedManifests == nil { var err error - generatedManifests, err = cf.getGeneratedManifests(ctx, manifests, cf.PatchUpdated.Generators) + generatedManifests, err = cf.getGeneratedManifests(ctx, manifests, cf.PatchUpdated.Generators, defaultTimeout) if err != nil { return nil, nil, "", err } @@ -364,9 +358,9 @@ func (cf *ConfigFile) getGeneratedAndPatchedManifests(ctx context.Context, manif // getGeneratedManifests is used to produce the manifests based _only_ // on the generators in the config. This is sufficient for // commandUpdated config, and the first step for patchUpdated config. -func (cf *ConfigFile) getGeneratedManifests(ctx context.Context, manifests Manifests, generators []Generator) ([]byte, error) { +func (cf *ConfigFile) getGeneratedManifests(ctx context.Context, manifests Manifests, generators []Command, defaultTimeout time.Duration) ([]byte, error) { buf := bytes.NewBuffer(nil) - for i, cmdResult := range cf.execGenerators(ctx, generators) { + for i, cmdResult := range cf.execGenerators(ctx, generators, defaultTimeout) { if cmdResult.Error != nil { err := fmt.Errorf("error executing generator command %q from file %q: %s\nerror output:\n%s\ngenerated output:\n%s", generators[i].Command, @@ -386,8 +380,8 @@ func (cf *ConfigFile) getGeneratedManifests(ctx context.Context, manifests Manif // updatePatchFile calculates the patch given a transformation, and // updates the patch file given in the config. -func (cf *ConfigFile) updatePatchFile(ctx context.Context, manifests Manifests, updateFn func(previousManifests []byte) ([]byte, error)) error { - generatedManifests, patchedManifests, patchFilePath, err := cf.getGeneratedAndPatchedManifests(ctx, manifests) +func (cf *ConfigFile) updatePatchFile(ctx context.Context, manifests Manifests, updateFn func(previousManifests []byte) ([]byte, error), defaultTimeout time.Duration) error { + generatedManifests, patchedManifests, patchFilePath, err := cf.getGeneratedAndPatchedManifests(ctx, manifests, defaultTimeout) if err != nil { return fmt.Errorf("error parsing generated, patched output from file %s: %s", cf.configPathRelative, err) } @@ -404,12 +398,16 @@ func (cf *ConfigFile) updatePatchFile(ctx context.Context, manifests Manifests, // execGenerators executes all the generators given and returns the // results; it will stop at the first failing command. -func (cf *ConfigFile) execGenerators(ctx context.Context, generators []Generator) []ConfigFileExecResult { +func (cf *ConfigFile) execGenerators(ctx context.Context, generators []Command, defaultTimeout time.Duration) []ConfigFileExecResult { result := []ConfigFileExecResult{} for _, g := range generators { stdErr := bytes.NewBuffer(nil) stdOut := bytes.NewBuffer(nil) - err := cf.execCommand(ctx, nil, stdOut, stdErr, g.Command) + timeout := defaultTimeout + if g.Timeout != nil && g.Timeout.Duration >= time.Second { + timeout = g.Timeout.Duration + } + err := cf.execCommand(ctx, nil, stdOut, stdErr, g.Command, timeout) r := ConfigFileExecResult{ Stdout: stdOut.Bytes(), Stderr: stdErr.Bytes(), @@ -427,22 +425,22 @@ func (cf *ConfigFile) execGenerators(ctx context.Context, generators []Generator // execContainerImageUpdaters executes all the image updates in the configuration file. // It will stop at the first error, in which case the returned error will be non-nil func (cf *ConfigFile) execContainerImageUpdaters(ctx context.Context, - workload resource.ID, container string, image, imageTag string) []ConfigFileCombinedExecResult { + workload resource.ID, container string, image, imageTag string, defaultTimeout time.Duration) []ConfigFileCombinedExecResult { env := makeEnvFromResourceID(workload) env = append(env, "FLUX_CONTAINER="+container, "FLUX_IMG="+image, "FLUX_TAG="+imageTag, ) - commands := []string{} + commands := []Command{} var updaters []Updater if cf.CommandUpdated != nil { updaters = cf.CommandUpdated.Updaters } for _, u := range updaters { - commands = append(commands, u.ContainerImage.Command) + commands = append(commands, u.ContainerImage) } - return cf.execCommandsWithCombinedOutput(ctx, env, commands) + return cf.execCommandsWithCombinedOutput(ctx, env, commands, defaultTimeout) } // execPolicyUpdaters executes all the policy update commands given in @@ -450,29 +448,33 @@ func (cf *ConfigFile) execContainerImageUpdaters(ctx context.Context, // policy. It will stop at the first error, in which case the returned // error will be non-nil func (cf *ConfigFile) execPolicyUpdaters(ctx context.Context, - workload resource.ID, policyName, policyValue string) []ConfigFileCombinedExecResult { + workload resource.ID, policyName, policyValue string, defaultTimeout time.Duration) []ConfigFileCombinedExecResult { env := makeEnvFromResourceID(workload) env = append(env, "FLUX_POLICY="+policyName) if policyValue != "" { env = append(env, "FLUX_POLICY_VALUE="+policyValue) } - commands := []string{} + commands := []Command{} var updaters []Updater if cf.CommandUpdated != nil { updaters = cf.CommandUpdated.Updaters } for _, u := range updaters { - commands = append(commands, u.Policy.Command) + commands = append(commands, u.Policy) } - return cf.execCommandsWithCombinedOutput(ctx, env, commands) + return cf.execCommandsWithCombinedOutput(ctx, env, commands, defaultTimeout) } -func (cf *ConfigFile) execCommandsWithCombinedOutput(ctx context.Context, env []string, commands []string) []ConfigFileCombinedExecResult { +func (cf *ConfigFile) execCommandsWithCombinedOutput(ctx context.Context, env []string, commands []Command, defaultTimeout time.Duration) []ConfigFileCombinedExecResult { env = append(env, "PATH="+os.Getenv("PATH")) result := []ConfigFileCombinedExecResult{} for _, c := range commands { stdOutAndErr := bytes.NewBuffer(nil) - err := cf.execCommand(ctx, env, stdOutAndErr, stdOutAndErr, c) + timeout := defaultTimeout + if c.Timeout != nil && c.Timeout.Duration >= time.Second { + timeout = c.Timeout.Duration + } + err := cf.execCommand(ctx, env, stdOutAndErr, stdOutAndErr, c.Command, timeout) r := ConfigFileCombinedExecResult{ Output: stdOutAndErr.Bytes(), Error: err, @@ -486,8 +488,8 @@ func (cf *ConfigFile) execCommandsWithCombinedOutput(ctx context.Context, env [] return result } -func (cf *ConfigFile) execCommand(ctx context.Context, env []string, stdOut, stdErr io.Writer, command string) error { - cmdCtx, cancel := context.WithTimeout(ctx, CommandTimeout) +func (cf *ConfigFile) execCommand(ctx context.Context, env []string, stdOut, stdErr io.Writer, command string, timeout time.Duration) error { + cmdCtx, cancel := context.WithTimeout(ctx, timeout) defer cancel() cmd := exec.CommandContext(ctx, "/bin/sh", "-c", command) cmd.Env = env diff --git a/pkg/manifests/configfile_test.go b/pkg/manifests/configfile_test.go index cf2b0e453..a170e94bc 100644 --- a/pkg/manifests/configfile_test.go +++ b/pkg/manifests/configfile_test.go @@ -3,6 +3,7 @@ package manifests import ( "context" "testing" + "time" "github.com/stretchr/testify/assert" @@ -47,6 +48,15 @@ commandUpdated: generators: [] patchFile: "foo.yaml" `, + + "generator timeout is a number": ` +version: 1 +patchUpdated: + generators: + - command: some command + timeout: 5 + patchFile: "foo.yaml" +`, } { t.Run(name, func(t *testing.T) { var cf ConfigFile @@ -97,9 +107,12 @@ func TestJustFileDirective(t *testing.T) { const patchUpdatedConfigFile = `--- version: 1 patchUpdated: - generators: - - command: foo - - command: bar + generators: + - command: sleep 1 + - command: sleep 1 + timeout: 2s + - command: sleep 2 + timeout: 1s patchFile: baz.yaml ` @@ -112,11 +125,23 @@ func TestParsePatchUpdatedConfigFile(t *testing.T) { assert.NotNil(t, cf.PatchUpdated) assert.Nil(t, cf.CommandUpdated) assert.Equal(t, 1, cf.Version) - assert.Equal(t, 2, len(cf.PatchUpdated.Generators)) - assert.Equal(t, "bar", cf.PatchUpdated.Generators[1].Command) + assert.Equal(t, 3, len(cf.PatchUpdated.Generators)) + assert.Equal(t, time.Second*2, cf.PatchUpdated.Generators[1].Timeout.Duration) + assert.Equal(t, "sleep 2", cf.PatchUpdated.Generators[2].Command) assert.Equal(t, "baz.yaml", cf.PatchUpdated.PatchFile) } +func TestExecGeneratorsTimeout(t *testing.T) { + var cf ConfigFile + if err := ParseConfigFile([]byte(patchUpdatedConfigFile), &cf); err != nil { + t.Fatal(err) + } + result := cf.execGenerators(context.Background(), cf.PatchUpdated.Generators, time.Second*2) + assert.Nil(t, result[0].Error) + assert.Nil(t, result[1].Error) + assert.NotNil(t, result[2].Error) +} + const echoCmdUpdatedConfigFile = `--- version: 1 commandUpdated: @@ -159,7 +184,7 @@ func TestExecGenerators(t *testing.T) { var cf ConfigFile err := ParseConfigFile([]byte(echoCmdUpdatedConfigFile), &cf) assert.NoError(t, err) - result := cf.execGenerators(context.Background(), cf.CommandUpdated.Generators) + result := cf.execGenerators(context.Background(), cf.CommandUpdated.Generators, time.Minute) assert.Equal(t, 2, len(result), "result: %s", result) assert.Equal(t, "g1\n", string(result[0].Stdout)) assert.Equal(t, "g2\n", string(result[1].Stdout)) @@ -170,7 +195,7 @@ func TestExecContainerImageUpdaters(t *testing.T) { err := ParseConfigFile([]byte(echoCmdUpdatedConfigFile), &cf) assert.NoError(t, err) resourceID := resource.MustParseID("default:deployment/foo") - result := cf.execContainerImageUpdaters(context.Background(), resourceID, "bar", "repo/image", "latest") + result := cf.execContainerImageUpdaters(context.Background(), resourceID, "bar", "repo/image", "latest", time.Minute) assert.Equal(t, 2, len(result), "result: %s", result) assert.Equal(t, "uci1 default:deployment/foo default deployment foo bar repo/image latest\n", @@ -188,7 +213,7 @@ func TestExecAnnotationUpdaters(t *testing.T) { // Test the update/addition of annotations annotationValue := "value" - result := cf.execPolicyUpdaters(context.Background(), resourceID, "key", annotationValue) + result := cf.execPolicyUpdaters(context.Background(), resourceID, "key", annotationValue, time.Minute) assert.Equal(t, 2, len(result), "result: %s", result) assert.Equal(t, "ua1 default:deployment/foo default deployment foo key value\n", @@ -198,7 +223,7 @@ func TestExecAnnotationUpdaters(t *testing.T) { string(result[1].Output)) // Test the deletion of annotations " - result = cf.execPolicyUpdaters(context.Background(), resourceID, "key", "") + result = cf.execPolicyUpdaters(context.Background(), resourceID, "key", "", time.Minute) assert.Equal(t, 2, len(result), "result: %s", result) assert.Equal(t, "ua1 default:deployment/foo default deployment foo key delete\n",