Skip to content

Commit

Permalink
Capture OS signals while steps executed and perform cleanups as grace…
Browse files Browse the repository at this point in the history
…ful shutdown

Summary:
Here is my ~~first~~ new attempt to use channels to keep track of step results, errors or OS signals.

# Design overview
We can use channels to communicate the results and errors produced by executed TTP steps. With that in place, we can also watch an additional channel to detect whether an OS signal was received.

This requires changes in the following parts:
0. Steps execution
0. Step results retrieving
0. Cleanup execution only for completed steps
0. Sub-TTP processing
    0. Proper setup of execution context object per sub-ttp
    0. Postponing cleanup execution until all steps of the root TTP are completed or an error is encountered

Resolves #476

Reviewed By: d3sch41n

Differential Revision: D54117726

fbshipit-source-id: aed5abe03534679f0c74c542e0607607ac149a45
  • Loading branch information
inesusvet authored and facebook-github-bot committed Feb 29, 2024
1 parent ff176b6 commit 9d547bf
Show file tree
Hide file tree
Showing 15 changed files with 285 additions and 117 deletions.
10 changes: 9 additions & 1 deletion cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,15 @@ func buildRunCommand(cfg *Config) *cobra.Command {
return nil
}

if _, err := ttp.Execute(execCtx); err != nil {
runErr := ttp.Execute(*execCtx)
// Run clean up always
cleanupErr := ttp.RunCleanup(*execCtx)

if cleanupErr != nil {
logging.L().Warnf("Failed to run cleanup: %v", cleanupErr)
}

// Report the execution failure itself: the result of ttp.Execute is held in
// runErr, so that is the error that must be surfaced (and wrapped with %w so
// callers can still inspect it with errors.Is / errors.As).
if runErr != nil {
	return fmt.Errorf("failed to run TTP at %v: %w", ttpAbsPath, runErr)
}
return nil
Expand Down
3 changes: 1 addition & 2 deletions cmd/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ func checkRunCmdTestCase(t *testing.T, tc runCmdTestCase) {
}
require.NoError(t, err)
assert.Equal(t, tc.expectedStdout, stdoutBuf.String())

}

func TestRun(t *testing.T) {
Expand Down Expand Up @@ -95,7 +94,7 @@ func TestRun(t *testing.T) {
},
{
name: "subttp-cleanup",
description: "verify that execution of a subTTP with cleanup succeeds",
description: "when one of subTTP causes failures then cleanups executed in right order",
args: []string{
"-c",
testConfigFilePath,
Expand Down
4 changes: 2 additions & 2 deletions pkg/blocks/basicstep_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,10 @@ outputs:
filters:
- json_path: foo.bar`
var s BasicStep
var execCtx TTPExecutionContext
execCtx := NewTTPExecutionContext()
err := yaml.Unmarshal([]byte(content), &s)
require.NoError(t, err)
err = s.Validate(TTPExecutionContext{})
err = s.Validate(execCtx)
require.NoError(t, err)

// execute and check result
Expand Down
19 changes: 16 additions & 3 deletions pkg/blocks/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,22 @@ type TTPExecutionConfig struct {

// TTPExecutionContext - holds config and context for the currently executing TTP
type TTPExecutionContext struct {
Cfg TTPExecutionConfig
WorkDir string
StepResults *StepResultsRecord
Cfg TTPExecutionConfig
WorkDir string
StepResults *StepResultsRecord
actionResultsChan chan *ActResult
errorsChan chan error
shutdownChan chan bool
}

// NewTTPExecutionContext returns a TTPExecutionContext whose config and
// working directory are left at their zero values, with a fresh step results
// record, single-slot result and error channels, and the shutdown channel
// obtained from SetupSignalHandler.
func NewTTPExecutionContext() TTPExecutionContext {
	var execCtx TTPExecutionContext
	execCtx.StepResults = NewStepResultsRecord()
	// both channels are buffered with capacity 1
	execCtx.actionResultsChan = make(chan *ActResult, 1)
	execCtx.errorsChan = make(chan error, 1)
	execCtx.shutdownChan = SetupSignalHandler()
	return execCtx
}

// ExpandVariables takes a string containing the following types of variables
Expand Down
15 changes: 10 additions & 5 deletions pkg/blocks/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,16 +149,21 @@ func LoadTTP(ttpFilePath string, fsys afero.Fs, execCfg *TTPExecutionConfig, arg
}
ttp.WorkDir = wd
}
execCtx := &TTPExecutionContext{
Cfg: *execCfg,
WorkDir: ttp.WorkDir,

execCtx := TTPExecutionContext{
Cfg: *execCfg,
WorkDir: ttp.WorkDir,
StepResults: NewStepResultsRecord(),
actionResultsChan: make(chan *ActResult, 1),
errorsChan: make(chan error, 1),
shutdownChan: SetupSignalHandler(),
}

err = ttp.Validate(*execCtx)
err = ttp.Validate(execCtx)
if err != nil {
return nil, nil, err
}
return ttp, execCtx, nil
return ttp, &execCtx, nil
}

func readTTPBytes(ttpFilePath string, system afero.Fs) ([]byte, error) {
Expand Down
5 changes: 5 additions & 0 deletions pkg/blocks/printstr.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ type PrintStrAction struct {
Message string `yaml:"print_str,omitempty"`
}

// NewPrintStrAction returns a pointer to a zero-valued PrintStrAction.
func NewPrintStrAction() *PrintStrAction {
	return new(PrintStrAction)
}

// IsNil checks if the step is nil or empty and returns a boolean value.
func (s *PrintStrAction) IsNil() bool {
switch {
Expand Down
4 changes: 2 additions & 2 deletions pkg/blocks/requirements_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,15 @@ steps:
var ttp TTP
err := yaml.Unmarshal([]byte(tc.content), &ttp)
require.NoError(t, err)
var ctx TTPExecutionContext
ctx := NewTTPExecutionContext()
err = ttp.Validate(ctx)
if tc.expectValidateError {
require.Error(t, err)
return
}
require.NoError(t, err)

_, err = ttp.Execute(&ctx)
err = ttp.Execute(ctx)
if tc.expectExecuteError {
assert.Error(t, err)
return
Expand Down
61 changes: 61 additions & 0 deletions pkg/blocks/signal_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
Copyright © 2023-present, Meta Platforms, Inc. and affiliates
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/

package blocks

import (
"os"
"os/signal"
"sync"
"syscall"

"github.com/facebookincubator/ttpforge/pkg/logging"
)

// signalHandlerInstalled is set to true by SetupSignalHandler once the signal
// handler has been registered, so that registration happens only once.
var signalHandlerInstalled bool
// signalHandlerLock is held by SetupSignalHandler while it checks and updates
// signalHandlerInstalled and creates shutdownChan.
var signalHandlerLock = sync.Mutex{}
// shutdownChan is the channel returned by every call to SetupSignalHandler;
// the handler goroutine sends true on it for each signal it receives.
var shutdownChan chan bool

// SetupSignalHandler registers a handler for SIGINT and SIGTERM and returns
// the channel on which a value is sent for every such signal received.
// The handler is registered only on the first call; every later call returns
// the same channel.
func SetupSignalHandler() chan bool {
	signalHandlerLock.Lock()
	defer signalHandlerLock.Unlock()

	// already registered: hand back the shared channel
	if signalHandlerInstalled {
		return shutdownChan
	}

	sigs := make(chan os.Signal, 2)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	shutdownChan = make(chan bool, 1)
	signalHandlerInstalled = true

	go func() {
		// counter numbers the received signals in the log output, starting at 0
		for counter := 0; ; counter++ {
			sig := <-sigs
			logging.L().Infof("[%v] Received signal %v, shutting down now", counter, sig)
			shutdownChan <- true
		}
	}()

	return shutdownChan
}
43 changes: 40 additions & 3 deletions pkg/blocks/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/facebookincubator/ttpforge/pkg/checks"
"github.com/facebookincubator/ttpforge/pkg/logging"
"github.com/spf13/afero"
"gopkg.in/yaml.v3"
)

Expand Down Expand Up @@ -151,13 +152,21 @@ func (s *Step) UnmarshalYAML(node *yaml.Node) error {
return nil
}

// Execute runs the action associated with this step
// Execute runs the action associated with this step and sends result/error to channels of the context
func (s *Step) Execute(execCtx TTPExecutionContext) (*ActResult, error) {
desc := s.action.GetDescription()
if desc != "" {
logging.L().Infof("Description: %v", desc)
}
return s.action.Execute(execCtx)
result, err := s.action.Execute(execCtx)
if err != nil {
logging.L().Errorf("Failed to execute step %v: %v", s.Name, err)
execCtx.errorsChan <- err
} else {
logging.L().Debugf("Successfully executed step %v", s.Name)
execCtx.actionResultsChan <- result
}
return result, err
}

// Cleanup runs the cleanup action associated with this step
Expand Down Expand Up @@ -190,7 +199,17 @@ func (s *Step) Validate(execCtx TTPExecutionContext) error {
// ParseAction decodes an action (from step or cleanup) in YAML
// format into the appropriate struct
func (s *Step) ParseAction(node *yaml.Node) (Action, error) {
actionCandidates := []Action{NewBasicStep(), NewFileStep(), NewSubTTPStep(), NewEditStep(), NewFetchURIStep(), NewCreateFileStep(), NewCopyPathStep(), NewRemovePathAction(), &PrintStrAction{}}
actionCandidates := []Action{
NewBasicStep(),
NewFileStep(),
NewSubTTPStep(),
NewEditStep(),
NewFetchURIStep(),
NewCreateFileStep(),
NewCopyPathStep(),
NewRemovePathAction(),
NewPrintStrAction(),
}
var action Action
for _, actionType := range actionCandidates {
err := node.Decode(actionType)
Expand All @@ -214,3 +233,21 @@ func (s *Step) ParseAction(node *yaml.Node) (Action, error) {
}
return action, nil
}

// VerifyChecks runs the step's success checks in order against the OS
// filesystem. It stops at the first check that fails and returns an error
// wrapping that failure; it returns nil when every check passes or when the
// step defines no checks.
func (s *Step) VerifyChecks() error {
	if len(s.Checks) == 0 {
		logging.L().Debugf("No checks defined for step %v", s.Name)
		return nil
	}

	verificationCtx := checks.VerificationContext{FileSystem: afero.NewOsFs()}
	for i := range s.Checks {
		check := s.Checks[i]
		// checks are reported to the user with 1-based numbering
		checkNum := i + 1
		err := check.Verify(verificationCtx)
		if err != nil {
			return fmt.Errorf("Success check %d of step %q failed: %w", checkNum, s.Name, err)
		}
		logging.L().Debugf("Success check %d (%q) of step %q PASSED", checkNum, check.Msg, s.Name)
	}
	return nil
}
4 changes: 2 additions & 2 deletions pkg/blocks/step_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ cleanup: default`,
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
var s Step
var execCtx TTPExecutionContext
execCtx := NewTTPExecutionContext()

// parse the step
err := yaml.Unmarshal([]byte(tc.content), &s)
Expand Down Expand Up @@ -191,7 +191,7 @@ cleanup:
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
var s Step
var execCtx TTPExecutionContext
execCtx := NewTTPExecutionContext()

// hack to get a valid temporary path without creating it
tmpFile, err := os.CreateTemp("", "ttpforge-test-cleanup-default")
Expand Down
23 changes: 11 additions & 12 deletions pkg/blocks/subttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,8 @@ type SubTTPStep struct {
TtpRef string `yaml:"ttp"`
Args map[string]string `yaml:"args"`

ttp *TTP
subExecCtx TTPExecutionContext
firstStepToCleanupIdx int
ttp *TTP
subExecCtx *TTPExecutionContext
}

// NewSubTTPStep creates a new SubTTPStep and returns a pointer to it.
Expand Down Expand Up @@ -78,19 +77,17 @@ func (s *SubTTPStep) processSubTTPArgs(execCtx TTPExecutionContext) ([]string, e
}

// Execute runs each step of the TTP file associated with the SubTTPStep
// and manages the outputs and cleanup steps.
func (s *SubTTPStep) Execute(execCtx TTPExecutionContext) (*ActResult, error) {
logging.L().Infof("[*] Executing Sub TTP: %s", s.TtpRef)
execResults, firstStepToCleanupIdx, runErr := s.ttp.RunSteps(&execCtx)
s.firstStepToCleanupIdx = firstStepToCleanupIdx
runErr := s.ttp.RunSteps(*s.subExecCtx)
if runErr != nil {
return nil, runErr
return &ActResult{}, runErr
}
logging.L().Info("[*] Completed SubTTP - No Errors :)")

// just a little annoying plumbing due to subtle type differences0
// just a little annoying plumbing due to subtle type differences
var actResults []*ActResult
for _, execResult := range execResults.ByIndex {
for _, execResult := range s.subExecCtx.StepResults.ByIndex {
actResults = append(actResults, &execResult.ActResult)
}
return aggregateResults(actResults), nil
Expand All @@ -100,7 +97,7 @@ func (s *SubTTPStep) Execute(execCtx TTPExecutionContext) (*ActResult, error) {
// and validates the contained steps.
func (s *SubTTPStep) loadSubTTP(execCtx TTPExecutionContext) error {
repo := execCtx.Cfg.Repo
subTTPAbsPath, err := execCtx.Cfg.Repo.FindTTP(s.TtpRef)
subTTPAbsPath, err := repo.FindTTP(s.TtpRef)
if err != nil {
return err
}
Expand All @@ -110,11 +107,13 @@ func (s *SubTTPStep) loadSubTTP(execCtx TTPExecutionContext) error {
return err
}

ttps, _, err := LoadTTP(subTTPAbsPath, repo.GetFs(), &s.subExecCtx.Cfg, subArgsKv)
ttps, ctx, err := LoadTTP(subTTPAbsPath, repo.GetFs(), &execCtx.Cfg, subArgsKv)
if err != nil {
return err
}
s.ttp = ttps
s.subExecCtx = ctx

return nil
}

Expand Down Expand Up @@ -144,5 +143,5 @@ func (s *SubTTPStep) Validate(execCtx TTPExecutionContext) error {
return err
}

return s.ttp.Validate(execCtx)
return s.ttp.Validate(*s.subExecCtx)
}
8 changes: 4 additions & 4 deletions pkg/blocks/subttp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,11 @@ ttp: with/cleanup.yaml`,
repo, err := tc.spec.Load(tc.fsys, "")
require.NoError(t, err)

execCtx := TTPExecutionContext{
Cfg: TTPExecutionConfig{
Repo: repo,
},
execCtx := NewTTPExecutionContext()
execCtx.Cfg = TTPExecutionConfig{
Repo: repo,
}

err = step.Validate(execCtx)
require.NoError(t, err, "step failed to validate")

Expand Down
2 changes: 1 addition & 1 deletion pkg/blocks/subttpcleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type subTTPCleanupAction struct {

// Execute will cleanup the subTTP starting from the last successful step
func (a *subTTPCleanupAction) Execute(execCtx TTPExecutionContext) (*ActResult, error) {
cleanupResults, err := a.step.ttp.startCleanupAtStepIdx(a.step.firstStepToCleanupIdx, &execCtx)
cleanupResults, err := a.step.ttp.startCleanupForCompletedSteps(*a.step.subExecCtx)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 9d547bf

Please sign in to comment.