From 33d62efb42aac3c617bea2d50f0b7ad5f4d1ab89 Mon Sep 17 00:00:00 2001 From: madmeignanam Date: Wed, 25 Sep 2019 00:55:36 -0700 Subject: [PATCH] PodStatusHooks - revised from ExecutorHooks --- config/config.yaml | 7 +++-- dce/main.go | 2 +- plugin/extpoints.go | 46 ++++++++++----------------- plugin/type.go | 15 +++++---- utils/pod/pod.go | 72 ++++++++++++++++++++++++------------------- utils/pod/pod_test.go | 32 +++++++++---------- utils/pod/types.go | 2 +- 7 files changed, 87 insertions(+), 89 deletions(-) diff --git a/config/config.yaml b/config/config.yaml index fe37c075..e2fc47ec 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -11,9 +11,10 @@ launchtask: httptimeout: 20000 plugins: pluginorder: general -execHooks: - LaunchTask: - Post: +podStatusHooks: + task_failed: [] + task_finished: [] + task_running: [] cleanpod: cleanfailtask: false timeout: 20 diff --git a/dce/main.go b/dce/main.go index 95e48225..46428dcf 100644 --- a/dce/main.go +++ b/dce/main.go @@ -81,7 +81,7 @@ func (exec *dockerComposeExecutor) LaunchTask(driver exec.ExecutorDriver, taskIn "pool": pod.GetLabel("pool", taskInfo), }) - go pod.ListenOnTaskStatus("launchtask.post", driver, taskInfo) + go pod.ListenOnTaskStatus(driver, taskInfo) task, err := json.Marshal(taskInfo) if err != nil { diff --git a/plugin/extpoints.go b/plugin/extpoints.go index bfd488e7..c0c9f1cd 100644 --- a/plugin/extpoints.go +++ b/plugin/extpoints.go @@ -1,17 +1,3 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - // generated by go-extpoints -- DO NOT EDIT package plugin @@ -192,52 +178,54 @@ func (ep *composePluginExt) Names() []string { } -// ExecutorHook +// PodStatusHook -var ExecutorHooks = &executorHookExt{ - newExtensionPoint(new(ExecutorHook)), +var PodStatusHooks = &podStatusHookExt{ + newExtensionPoint(new(PodStatusHook)), } -type executorHookExt struct { +type podStatusHookExt struct { *extensionPoint } -func (ep *executorHookExt) Unregister(name string) bool { +func (ep *podStatusHookExt) Unregister(name string) bool { return ep.unregister(name) } -func (ep *executorHookExt) Register(extension ExecutorHook, name string) bool { +func (ep *podStatusHookExt) Register(extension PodStatusHook, name string) bool { return ep.register(extension, name) } -func (ep *executorHookExt) Lookup(name string) ExecutorHook { +func (ep *podStatusHookExt) Lookup(name string) PodStatusHook { ext := ep.lookup(name) if ext == nil { return nil } - return ext.(ExecutorHook) + return ext.(PodStatusHook) } -func (ep *executorHookExt) Select(names []string) []ExecutorHook { - var selected []ExecutorHook +func (ep *podStatusHookExt) Select(names []string) []PodStatusHook { + var selected []PodStatusHook for _, name := range names { selected = append(selected, ep.Lookup(name)) } return selected } -func (ep *executorHookExt) All() map[string]ExecutorHook { - all := make(map[string]ExecutorHook) +func (ep *podStatusHookExt) All() map[string]PodStatusHook { + all := make(map[string]PodStatusHook) for k, v := range ep.all() { - all[k] = v.(ExecutorHook) + all[k] = v.(PodStatusHook) } return all } -func (ep *executorHookExt) Names() []string { +func (ep *podStatusHookExt) Names() []string { var names []string for k := range ep.all() { names = append(names, k) } return names -} \ No newline at end of file +} + + diff --git a/plugin/type.go b/plugin/type.go index d6100831..6d683db9 100644 --- a/plugin/type.go +++ b/plugin/type.go @@ -31,13 +31,12 @@ type ComposePlugin interface { Shutdown(executor.ExecutorDriver) error } -// ExecutorHook allows custom implementations to be plugged post execution of Docker Compose Mesos Executor's -// lifecycle function. Currently this is supported only for LaunchTask(), but can be extended to other Executor lifecycle -// functions as needed -type ExecutorHook interface { - // PostExec is invoked post execution of Docker Compose Mesos Executor's lifecycle function - PostExec(taskInfo *mesos.TaskInfo) error - // BestEffort is invoked in case a PostExec returned an error and are expected to return a bool to indicate +// PodStatusHook allows custom implementations to be plugged when a Pod (mesos task) status changes. Currently this is +// designed to be executed on task status changes during LaunchTask. +type PodStatusHook interface { + // Execute is invoked when task status channel has a new status + Execute(podStatus string, data interface{}) error + // BestEffort is invoked in case a Execute returned an error and is expected to return a bool to indicate // if the execution needs to continue with the next available hook or not - BestEffort(execPhase string) bool + BestEffort() bool } \ No newline at end of file diff --git a/utils/pod/pod.go b/utils/pod/pod.go index edc730c0..e06b1505 100644 --- a/utils/pod/pod.go +++ b/utils/pod/pod.go @@ -61,7 +61,7 @@ var HealthCheckListId = make(map[string]bool) var MonitorContainerList []string var SinglePort bool var LaunchCmdExecuted = false -var TaskStatusCh = make(chan string, 1) +var taskStatusCh = make(chan string, 1) // Check exit code of all the containers in the pod. // If all the exit codes are zero, then assign zero as pod's exit code, @@ -166,7 +166,7 @@ func GetPodContainerIds(files []string) ([]string, error) { } if err := scanner.Err(); err != nil { - log.Errorln("reading standard input:", err) + log.Errorln(os.Stderr, "reading standard input:", err) } return containerIds, nil } @@ -221,7 +221,7 @@ func GetContainerIdByService(files []string, service string) (string, error) { id += scanner.Text() } if err := scanner.Err(); err != nil { - logger.Errorln( "stderr: ", err) + logger.Errorln(os.Stderr, "stderr: ", err) return "", err } @@ -247,7 +247,7 @@ func GetPodDetail(files []string, primaryContainerId string, healthcheck bool) { log.Println(scanner.Text()) } if err := scanner.Err(); err != nil { - log.Errorln( "reading standard input:", err) + log.Errorln(os.Stderr, "reading standard input:", err) } if primaryContainerId != "" { @@ -801,10 +801,10 @@ func SetPodStatus(status types.PodStatus) { log.Printf("Update Status : Update podStatus as %s", status) } -// updatePodLaunched sets the CurPodStatus.podLaunched to true +// updatePodLaunched sets the CurPodStatus.Launched to true func updatePodLaunched() { CurPodStatus.Lock() - CurPodStatus.podLaunched = true + CurPodStatus.Launched = true CurPodStatus.Unlock() log.Printf("Updated Current Pod Status with Pod Launched ") } @@ -899,7 +899,7 @@ func SendMesosStatus(driver executor.ExecutorDriver, taskId *mesos.TaskID, state time.Sleep(5 * time.Second) // FIXME: This wait time is unjustified, find reason and comment or remove it // Push the state to Task status channel so any further steps on a given task status can be executed - TaskStatusCh <- state.Enum().String() + taskStatusCh <- state.Enum().String() return nil } @@ -1222,65 +1222,75 @@ func IsService(taskInfo *mesos.TaskInfo) bool { return assignTask.Task.IsService } -// ListenOnTaskStatus -func ListenOnTaskStatus(execPhase string, driver executor.ExecutorDriver, taskInfo *mesos.TaskInfo) { +// ListenOnTaskStatus listens +func ListenOnTaskStatus(driver executor.ExecutorDriver, taskInfo *mesos.TaskInfo) { + logger := log.WithFields(log.Fields{ + "func": "ListenOnTaskStatus", + }) + defer close(taskStatusCh) + // Copy taskInfo locally, it may be garbage collected when hooks are executed after LaunchTask + cachedTaskInfo := &taskInfo for { select { - case status, ok := <-TaskStatusCh: // wait for task status + case status, ok := <-taskStatusCh: // wait for task status from LaunchTask if ok { switch status { case mesos.TaskState_TASK_RUNNING.String(): - ExecHooks(execPhase, taskInfo) + if err := execPodStatusHooks(status, *cachedTaskInfo); err != nil { + logger.Errorf("executing hooks failed %v ", err) + } case mesos.TaskState_TASK_FAILED.String(): /* - Tasks are marked as Failed at - 1. Initial launch failure - 2. Monitor or any continuously polling plugin fails after the task has been running for a longtime - we want to execute hooks only on #1 above, so using CurPodStatus.podLaunched + Tasks are marked as Failed at + 1. Initial launch failure + 2. Health Monitor or any plugin monitors and fails after the task has been running for + a longtime */ - if !CurPodStatus.podLaunched { - ExecHooks(execPhase, taskInfo) + if err := execPodStatusHooks(status, *cachedTaskInfo); err != nil { + logger.Errorf("executing hooks failed %v ", err) } stopDriver(driver) + break case mesos.TaskState_TASK_FINISHED.String(): stopDriver(driver) + break default: log.Infof("Nothing to do on task status %s", status) } } else { - log.Error("Failure reading from task status channel") + log.Errorln("failure reading from task status channel") } } } } -// ExecHooks finds the hooks (implementations of ExecutorHook interface) configured for executor phase and executes them +// execPodStatusHooks finds the hooks (implementations of ExecutorHook interface) configured for executor phase and executes them // error is returned if any of the hooks failed, and ExecutorHook.BestEffort() returns true -func ExecHooks(execPhase string, taskInfo *mesos.TaskInfo) error { +func execPodStatusHooks(status string, taskInfo *mesos.TaskInfo) error { logger := log.WithFields(log.Fields{ "requuid": GetLabel("requuid", taskInfo), "tenant": GetLabel("tenant", taskInfo), "namespace": GetLabel("namespace", taskInfo), "pool": GetLabel("pool", taskInfo), }) - var postHooks []string - if postHooks = config.GetConfig().GetStringSlice(fmt.Sprintf("execHooks.%s", execPhase)); len(postHooks) < 1 { - logger.Infof("No post ExecHook implementations found in config, skipping") + var podStatusHooks []string + if podStatusHooks = config.GetConfig().GetStringSlice(fmt.Sprintf("podStatusHooks.%s", status)); len(podStatusHooks) < 1 { + logger.Infof("No post podStatusHook implementations found in config, skipping") return nil } - logger.Infof("Executor Post Hooks found: %v", postHooks) + logger.Infof("Executor Post Hooks found: %v", podStatusHooks) if _, err := utils.PluginPanicHandler(utils.ConditionFunc(func() (string, error) { - for _, name := range postHooks { - hook := plugin.ExecutorHooks.Lookup(name) + for _, name := range podStatusHooks { + hook := plugin.PodStatusHooks.Lookup(name) if hook == nil { logger.Errorf("Hook %s is nil, not initialized? still continuing with available hooks", name) continue } - if pherr := hook.PostExec(taskInfo); pherr != nil { - logger.Debugf( - "ExecutorHook %s failed with %v and is not best effort, so stopping further execution ", + if pherr := hook.Execute(status, taskInfo); pherr != nil { + logger.Errorf( + "PodStatusHook %s failed with %v and is not best effort, so stopping further execution ", name, pherr) - if !hook.BestEffort(execPhase) { + if !hook.BestEffort() { return "", errors.Wrapf(pherr, "executing hook %s failed", name) } } else { @@ -1289,7 +1299,7 @@ func ExecHooks(execPhase string, taskInfo *mesos.TaskInfo) error { } return "", nil })); err != nil { - logger.Errorf("Executing hooks at %s failed | err: %v", execPhase, err) + logger.Errorf("Executing hooks at pod status %s failed | err: %v", status, err) return err } return nil diff --git a/utils/pod/pod_test.go b/utils/pod/pod_test.go index cf660642..5ebcbe5e 100644 --- a/utils/pod/pod_test.go +++ b/utils/pod/pod_test.go @@ -216,26 +216,26 @@ func TestGetAndRemoveLabel(t *testing.T) { func TestExecHooks(t *testing.T) { //Register plugin with name - if ok := plugin.ExecutorHooks.Register(&happyHook{}, "happyHook"); !ok { + if ok := plugin.PodStatusHooks.Register(&happyHook{}, "happyHook"); !ok { log.Fatalf("failed to register plugin %s", "happyHook") } - if ok := plugin.ExecutorHooks.Register(&mandatoryHook{}, "mandatoryHook"); !ok { + if ok := plugin.PodStatusHooks.Register(&mandatoryHook{}, "mandatoryHook"); !ok { log.Fatalf("failed to register plugin %s", "mandatoryHook") } - if ok := plugin.ExecutorHooks.Register(&panicHook{}, "panicHook"); !ok { + if ok := plugin.PodStatusHooks.Register(&panicHook{}, "panicHook"); !ok { log.Fatalf("failed to register plugin %s", "panicHook") } - config.GetConfig().Set("exechooks.launchtask.post", []string{"happyHook"}) - assert.NoError(t, ExecHooks("launchtask.post", nil), "happy hook can't fail") + config.GetConfig().Set("podstatushooks.TASK_RUNNING", []string{"happyHook"}) + assert.NoError(t, execPodStatusHooks("TASK_RUNNING", nil), "happy hook can't fail") - config.GetConfig().Set("exechooks.launchtask.post", []string{"happyHook", "mandatoryHook"}) - assert.Error(t, ExecHooks("launchtask.post", nil), "mandatory hook can't succeed") + config.GetConfig().Set("podstatushooks.TASK_FAILED", []string{"happyHook", "mandatoryHook"}) + assert.Error(t, execPodStatusHooks("TASK_FAILED", nil), "mandatory hook can't succeed") - config.GetConfig().Set("exechooks.launchtask.post", []string{"panicHook"}) - assert.Error(t, ExecHooks("launchtask.post", nil), "panicHook hook can't succeed") + config.GetConfig().Set("podstatushooks.TASK_FAILED", []string{"panicHook"}) + assert.Error(t, execPodStatusHooks("TASK_FAILED", nil), "panicHook hook can't succeed") } // dummy executor hooks for unit test @@ -243,27 +243,27 @@ type happyHook struct{} type mandatoryHook struct{} type panicHook struct{} -func (p *happyHook) PostExec(taskInfo *mesos.TaskInfo) error { +func (p *happyHook) Execute(podStatus string, data interface{}) error { return nil } -func (p *happyHook) BestEffort(execPhase string) bool { +func (p *happyHook) BestEffort() bool { return false } -func (p *mandatoryHook) PostExec(taskInfo *mesos.TaskInfo) error { +func (p *mandatoryHook) Execute(status string, data interface{}) error { return errors.New("failure test case") } -func (p *mandatoryHook) BestEffort(execPhase string) bool { +func (p *mandatoryHook) BestEffort() bool { return false } -func (p *panicHook) PostExec(taskInfo *mesos.TaskInfo) error { +func (p *panicHook) Execute(status string, data interface{}) error { panic("unit test panic") return errors.New("panic test case") } -func (p *panicHook) BestEffort(execPhase string) bool { +func (p *panicHook) BestEffort() bool { return false -} +} \ No newline at end of file diff --git a/utils/pod/types.go b/utils/pod/types.go index e8fe32b9..0a69d54a 100644 --- a/utils/pod/types.go +++ b/utils/pod/types.go @@ -10,5 +10,5 @@ type Status struct { sync.RWMutex Status types.PodStatus // if set to true, indicates that the pod was launched successfully and task moved to RUNNING state - podLaunched bool + Launched bool }