Skip to content

Commit

Permalink
PodStatusHooks - revised from ExecutorHooks
Browse files Browse the repository at this point in the history
  • Loading branch information
madmeignanam committed Sep 25, 2019
1 parent 4bf2784 commit 33d62ef
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 89 deletions.
7 changes: 4 additions & 3 deletions config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ launchtask:
httptimeout: 20000
plugins:
pluginorder: general
execHooks:
LaunchTask:
Post:
podStatusHooks:
task_failed: []
task_finished: []
task_running: []
cleanpod:
cleanfailtask: false
timeout: 20
Expand Down
2 changes: 1 addition & 1 deletion dce/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (exec *dockerComposeExecutor) LaunchTask(driver exec.ExecutorDriver, taskIn
"pool": pod.GetLabel("pool", taskInfo),
})

go pod.ListenOnTaskStatus("launchtask.post", driver, taskInfo)
go pod.ListenOnTaskStatus(driver, taskInfo)

task, err := json.Marshal(taskInfo)
if err != nil {
Expand Down
46 changes: 17 additions & 29 deletions plugin/extpoints.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,3 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// generated by go-extpoints -- DO NOT EDIT
package plugin

Expand Down Expand Up @@ -192,52 +178,54 @@ func (ep *composePluginExt) Names() []string {
}


// ExecutorHook
// PodStatusHook

var ExecutorHooks = &executorHookExt{
newExtensionPoint(new(ExecutorHook)),
var PodStatusHooks = &podStatusHookExt{
newExtensionPoint(new(PodStatusHook)),
}

type executorHookExt struct {
type podStatusHookExt struct {
*extensionPoint
}

func (ep *executorHookExt) Unregister(name string) bool {
func (ep *podStatusHookExt) Unregister(name string) bool {
return ep.unregister(name)
}

func (ep *executorHookExt) Register(extension ExecutorHook, name string) bool {
func (ep *podStatusHookExt) Register(extension PodStatusHook, name string) bool {
return ep.register(extension, name)
}

func (ep *executorHookExt) Lookup(name string) ExecutorHook {
func (ep *podStatusHookExt) Lookup(name string) PodStatusHook {
ext := ep.lookup(name)
if ext == nil {
return nil
}
return ext.(ExecutorHook)
return ext.(PodStatusHook)
}

func (ep *executorHookExt) Select(names []string) []ExecutorHook {
var selected []ExecutorHook
func (ep *podStatusHookExt) Select(names []string) []PodStatusHook {
var selected []PodStatusHook
for _, name := range names {
selected = append(selected, ep.Lookup(name))
}
return selected
}

func (ep *executorHookExt) All() map[string]ExecutorHook {
all := make(map[string]ExecutorHook)
func (ep *podStatusHookExt) All() map[string]PodStatusHook {
all := make(map[string]PodStatusHook)
for k, v := range ep.all() {
all[k] = v.(ExecutorHook)
all[k] = v.(PodStatusHook)
}
return all
}

func (ep *executorHookExt) Names() []string {
func (ep *podStatusHookExt) Names() []string {
var names []string
for k := range ep.all() {
names = append(names, k)
}
return names
}
}


15 changes: 7 additions & 8 deletions plugin/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,12 @@ type ComposePlugin interface {
Shutdown(executor.ExecutorDriver) error
}

// ExecutorHook allows custom implementations to be plugged post execution of Docker Compose Mesos Executor's
// lifecycle function. Currently this is supported only for LaunchTask(), but can be extended to other Executor lifecycle
// functions as needed
type ExecutorHook interface {
// PostExec is invoked post execution of Docker Compose Mesos Executor's lifecycle function
PostExec(taskInfo *mesos.TaskInfo) error
// BestEffort is invoked in case a PostExec returned an error and are expected to return a bool to indicate
// PodStatusHook allows custom implementations to be plugged when a Pod (mesos task) status changes. Currently this is
// designed to be executed on task status changes during LaunchTask.
type PodStatusHook interface {
// Execute is invoked when task status channel has a new status
Execute(podStatus string, data interface{}) error
// BestEffort is invoked in case a Execute returned an error and is expected to return a bool to indicate
// if the execution needs to continue with the next available hook or not
BestEffort(execPhase string) bool
BestEffort() bool
}
72 changes: 41 additions & 31 deletions utils/pod/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ var HealthCheckListId = make(map[string]bool)
var MonitorContainerList []string
var SinglePort bool
var LaunchCmdExecuted = false
var TaskStatusCh = make(chan string, 1)
var taskStatusCh = make(chan string, 1)

// Check exit code of all the containers in the pod.
// If all the exit codes are zero, then assign zero as pod's exit code,
Expand Down Expand Up @@ -166,7 +166,7 @@ func GetPodContainerIds(files []string) ([]string, error) {
}

if err := scanner.Err(); err != nil {
log.Errorln("reading standard input:", err)
log.Errorln(os.Stderr, "reading standard input:", err)
}
return containerIds, nil
}
Expand Down Expand Up @@ -221,7 +221,7 @@ func GetContainerIdByService(files []string, service string) (string, error) {
id += scanner.Text()
}
if err := scanner.Err(); err != nil {
logger.Errorln( "stderr: ", err)
logger.Errorln(os.Stderr, "stderr: ", err)
return "", err
}

Expand All @@ -247,7 +247,7 @@ func GetPodDetail(files []string, primaryContainerId string, healthcheck bool) {
log.Println(scanner.Text())
}
if err := scanner.Err(); err != nil {
log.Errorln( "reading standard input:", err)
log.Errorln(os.Stderr, "reading standard input:", err)
}

if primaryContainerId != "" {
Expand Down Expand Up @@ -801,10 +801,10 @@ func SetPodStatus(status types.PodStatus) {
log.Printf("Update Status : Update podStatus as %s", status)
}

// updatePodLaunched sets the CurPodStatus.podLaunched to true
// updatePodLaunched sets the CurPodStatus.Launched to true
func updatePodLaunched() {
CurPodStatus.Lock()
CurPodStatus.podLaunched = true
CurPodStatus.Launched = true
CurPodStatus.Unlock()
log.Printf("Updated Current Pod Status with Pod Launched ")
}
Expand Down Expand Up @@ -899,7 +899,7 @@ func SendMesosStatus(driver executor.ExecutorDriver, taskId *mesos.TaskID, state
time.Sleep(5 * time.Second) // FIXME: This wait time is unjustified, find reason and comment or remove it

// Push the state to Task status channel so any further steps on a given task status can be executed
TaskStatusCh <- state.Enum().String()
taskStatusCh <- state.Enum().String()

return nil
}
Expand Down Expand Up @@ -1222,65 +1222,75 @@ func IsService(taskInfo *mesos.TaskInfo) bool {
return assignTask.Task.IsService
}

// ListenOnTaskStatus
func ListenOnTaskStatus(execPhase string, driver executor.ExecutorDriver, taskInfo *mesos.TaskInfo) {
// ListenOnTaskStatus listens
func ListenOnTaskStatus(driver executor.ExecutorDriver, taskInfo *mesos.TaskInfo) {
logger := log.WithFields(log.Fields{
"func": "ListenOnTaskStatus",
})
defer close(taskStatusCh)
// Copy taskInfo locally, it may be garbage collected when hooks are executed after LaunchTask
cachedTaskInfo := &taskInfo
for {
select {
case status, ok := <-TaskStatusCh: // wait for task status
case status, ok := <-taskStatusCh: // wait for task status from LaunchTask
if ok {
switch status {
case mesos.TaskState_TASK_RUNNING.String():
ExecHooks(execPhase, taskInfo)
if err := execPodStatusHooks(status, *cachedTaskInfo); err != nil {
logger.Errorf("executing hooks failed %v ", err)
}
case mesos.TaskState_TASK_FAILED.String():
/*
Tasks are marked as Failed at
1. Initial launch failure
2. Monitor or any continuously polling plugin fails after the task has been running for a longtime
we want to execute hooks only on #1 above, so using CurPodStatus.podLaunched
Tasks are marked as Failed at
1. Initial launch failure
2. Health Monitor or any plugin monitors and fails after the task has been running for
a longtime
*/
if !CurPodStatus.podLaunched {
ExecHooks(execPhase, taskInfo)
if err := execPodStatusHooks(status, *cachedTaskInfo); err != nil {
logger.Errorf("executing hooks failed %v ", err)
}
stopDriver(driver)
break
case mesos.TaskState_TASK_FINISHED.String():
stopDriver(driver)
break
default:
log.Infof("Nothing to do on task status %s", status)
}
} else {
log.Error("Failure reading from task status channel")
log.Errorln("failure reading from task status channel")
}
}
}
}

// ExecHooks finds the hooks (implementations of ExecutorHook interface) configured for executor phase and executes them
// execPodStatusHooks finds the hooks (implementations of ExecutorHook interface) configured for executor phase and executes them
// error is returned if any of the hooks failed, and ExecutorHook.BestEffort() returns true
func ExecHooks(execPhase string, taskInfo *mesos.TaskInfo) error {
func execPodStatusHooks(status string, taskInfo *mesos.TaskInfo) error {
logger := log.WithFields(log.Fields{
"requuid": GetLabel("requuid", taskInfo),
"tenant": GetLabel("tenant", taskInfo),
"namespace": GetLabel("namespace", taskInfo),
"pool": GetLabel("pool", taskInfo),
})
var postHooks []string
if postHooks = config.GetConfig().GetStringSlice(fmt.Sprintf("execHooks.%s", execPhase)); len(postHooks) < 1 {
logger.Infof("No post ExecHook implementations found in config, skipping")
var podStatusHooks []string
if podStatusHooks = config.GetConfig().GetStringSlice(fmt.Sprintf("podStatusHooks.%s", status)); len(podStatusHooks) < 1 {
logger.Infof("No post podStatusHook implementations found in config, skipping")
return nil
}
logger.Infof("Executor Post Hooks found: %v", postHooks)
logger.Infof("Executor Post Hooks found: %v", podStatusHooks)
if _, err := utils.PluginPanicHandler(utils.ConditionFunc(func() (string, error) {
for _, name := range postHooks {
hook := plugin.ExecutorHooks.Lookup(name)
for _, name := range podStatusHooks {
hook := plugin.PodStatusHooks.Lookup(name)
if hook == nil {
logger.Errorf("Hook %s is nil, not initialized? still continuing with available hooks", name)
continue
}
if pherr := hook.PostExec(taskInfo); pherr != nil {
logger.Debugf(
"ExecutorHook %s failed with %v and is not best effort, so stopping further execution ",
if pherr := hook.Execute(status, taskInfo); pherr != nil {
logger.Errorf(
"PodStatusHook %s failed with %v and is not best effort, so stopping further execution ",
name, pherr)
if !hook.BestEffort(execPhase) {
if !hook.BestEffort() {
return "", errors.Wrapf(pherr, "executing hook %s failed", name)
}
} else {
Expand All @@ -1289,7 +1299,7 @@ func ExecHooks(execPhase string, taskInfo *mesos.TaskInfo) error {
}
return "", nil
})); err != nil {
logger.Errorf("Executing hooks at %s failed | err: %v", execPhase, err)
logger.Errorf("Executing hooks at pod status %s failed | err: %v", status, err)
return err
}
return nil
Expand Down
32 changes: 16 additions & 16 deletions utils/pod/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,54 +216,54 @@ func TestGetAndRemoveLabel(t *testing.T) {

func TestExecHooks(t *testing.T) {
//Register plugin with name
if ok := plugin.ExecutorHooks.Register(&happyHook{}, "happyHook"); !ok {
if ok := plugin.PodStatusHooks.Register(&happyHook{}, "happyHook"); !ok {
log.Fatalf("failed to register plugin %s", "happyHook")
}

if ok := plugin.ExecutorHooks.Register(&mandatoryHook{}, "mandatoryHook"); !ok {
if ok := plugin.PodStatusHooks.Register(&mandatoryHook{}, "mandatoryHook"); !ok {
log.Fatalf("failed to register plugin %s", "mandatoryHook")
}

if ok := plugin.ExecutorHooks.Register(&panicHook{}, "panicHook"); !ok {
if ok := plugin.PodStatusHooks.Register(&panicHook{}, "panicHook"); !ok {
log.Fatalf("failed to register plugin %s", "panicHook")
}

config.GetConfig().Set("exechooks.launchtask.post", []string{"happyHook"})
assert.NoError(t, ExecHooks("launchtask.post", nil), "happy hook can't fail")
config.GetConfig().Set("podstatushooks.TASK_RUNNING", []string{"happyHook"})
assert.NoError(t, execPodStatusHooks("TASK_RUNNING", nil), "happy hook can't fail")

config.GetConfig().Set("exechooks.launchtask.post", []string{"happyHook", "mandatoryHook"})
assert.Error(t, ExecHooks("launchtask.post", nil), "mandatory hook can't succeed")
config.GetConfig().Set("podstatushooks.TASK_FAILED", []string{"happyHook", "mandatoryHook"})
assert.Error(t, execPodStatusHooks("TASK_FAILED", nil), "mandatory hook can't succeed")

config.GetConfig().Set("exechooks.launchtask.post", []string{"panicHook"})
assert.Error(t, ExecHooks("launchtask.post", nil), "panicHook hook can't succeed")
config.GetConfig().Set("podstatushooks.TASK_FAILED", []string{"panicHook"})
assert.Error(t, execPodStatusHooks("TASK_FAILED", nil), "panicHook hook can't succeed")
}

// dummy executor hooks for unit test
type happyHook struct{}
type mandatoryHook struct{}
type panicHook struct{}

func (p *happyHook) PostExec(taskInfo *mesos.TaskInfo) error {
func (p *happyHook) Execute(podStatus string, data interface{}) error {
return nil
}

func (p *happyHook) BestEffort(execPhase string) bool {
func (p *happyHook) BestEffort() bool {
return false
}

func (p *mandatoryHook) PostExec(taskInfo *mesos.TaskInfo) error {
func (p *mandatoryHook) Execute(status string, data interface{}) error {
return errors.New("failure test case")
}

func (p *mandatoryHook) BestEffort(execPhase string) bool {
func (p *mandatoryHook) BestEffort() bool {
return false
}

func (p *panicHook) PostExec(taskInfo *mesos.TaskInfo) error {
func (p *panicHook) Execute(status string, data interface{}) error {
panic("unit test panic")
return errors.New("panic test case")
}

func (p *panicHook) BestEffort(execPhase string) bool {
func (p *panicHook) BestEffort() bool {
return false
}
}
2 changes: 1 addition & 1 deletion utils/pod/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ type Status struct {
sync.RWMutex
Status types.PodStatus
// if set to true, indicates that the pod was launched successfully and task moved to RUNNING state
podLaunched bool
Launched bool
}

0 comments on commit 33d62ef

Please sign in to comment.