From 3f4a6f7c5f303be0d1b443c56c99b6eca9767a08 Mon Sep 17 00:00:00 2001 From: Piotr Konopka Date: Wed, 20 Nov 2024 11:44:54 +0100 Subject: [PATCH] [core] avoid stuck updateTaskStatus due to multiple mesos updates The issue described in OCTRL-953 occurs when we schedule kill of a task and while waiting for an acknowledgment, we receive two mesos updates: ``` [2024-11-11T12:14:02+01:00] TRACE scheduler: task status update received detector=TRG message= srcHost=alio2-cr1-flp163 state=TASK_FAILED task=2qwA9EYEgnY [2024-11-11T12:14:02+01:00] TRACE scheduler: task status update received detector=TRG message=Reconciliation: Task is unknown to the agent srcHost=alio2-cr1-flp163 state=TASK_LOST task=2qwA9EYEgnY ``` Which then trigger the discussed ack. Since it's unclear to me whether we can surely ignore TASK_LOST and trust that we will always receive either TASK_FAILED or TASK_FINISHED, I went for the approach of improving safeAcks to produce an error when subsequent acks are sent instead of blocking some goroutines. 
--- Makefile | 2 +- common/utils/safeacks/safeacks.go | 124 +++++++++++++++++++++++++ common/utils/safeacks/safeacks_test.go | 118 +++++++++++++++++++++++ core/task/manager.go | 33 ++++--- core/task/safeacks.go | 84 ----------------- 5 files changed, 264 insertions(+), 97 deletions(-) create mode 100644 common/utils/safeacks/safeacks.go create mode 100644 common/utils/safeacks/safeacks_test.go delete mode 100644 core/task/safeacks.go diff --git a/Makefile b/Makefile index d3c46427..e349e826 100644 --- a/Makefile +++ b/Makefile @@ -70,7 +70,7 @@ INSTALL_WHAT:=$(patsubst %, install_%, $(WHAT)) GENERATE_DIRS := ./apricot ./coconut/cmd ./common ./common/runtype ./common/system ./core ./core/integration/ccdb ./core/integration/dcs ./core/integration/ddsched ./core/integration/kafka ./core/integration/odc ./executor ./walnut ./core/integration/trg ./core/integration/bookkeeping SRC_DIRS := ./apricot ./cmd/* ./core ./coconut ./executor ./common ./configuration ./occ/peanut ./walnut -TEST_DIRS := ./apricot/local ./common/gera ./configuration/cfgbackend ./configuration/componentcfg ./configuration/template ./core/task/sm ./core/workflow ./core/integration/odc/fairmq ./core/integration ./core/environment +TEST_DIRS := ./apricot/local ./common/gera ./common/utils/safeacks ./configuration/cfgbackend ./configuration/componentcfg ./configuration/template ./core/task/sm ./core/workflow ./core/integration/odc/fairmq ./core/integration ./core/environment GO_TEST_DIRS := ./core/repos ./core/integration/dcs coverage:COVERAGE_PREFIX := ./coverage_results diff --git a/common/utils/safeacks/safeacks.go b/common/utils/safeacks/safeacks.go new file mode 100644 index 00000000..bace3f23 --- /dev/null +++ b/common/utils/safeacks/safeacks.go @@ -0,0 +1,124 @@ +/* + * === This file is part of ALICE O² === + * + * Copyright 2020 CERN and copyright holders of ALICE O². 
+ * Author: Miltiadis Alexis + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * In applying this license CERN does not waive the privileges and + * immunities granted to it by virtue of its status as an + * Intergovernmental Organization or submit itself to any jurisdiction. + */ + +package safeacks + +import ( + "fmt" + "sync" +) + +// SafeAcks is a thread safe structure which allows to handle acknowledgment exchanges +// with N senders and one receiver. The first sender succeeds, then an error is returned for the subsequent ones. +// This way, subsequent senders are not stuck sending an acknowledgment when nothing expects it anymore. +// The signaling design is inspired by point 2 in https://go101.org/article/channel-closing.html +// SafeAcks can be used to acknowledge that an action happened to the task such as task KILLED. +// At the moment we utilize SafeAcks to acknowledge that all the requested tasks were killed by mesos (task/manager.go). 
+type SafeAcks struct { + mu sync.RWMutex + acks map[string]ackChannels +} + +type ackChannels struct { + // the channel to send the ack to + ack chan struct{} + // the channel to close when acks are no longer expected + stop chan struct{} +} + +func (a *SafeAcks) deleteKey(key string) { + a.mu.Lock() + defer a.mu.Unlock() + + delete(a.acks, key) +} + +func (a *SafeAcks) ExpectsAck(key string) bool { + a.mu.RLock() + defer a.mu.RUnlock() + + _, ok := a.acks[key] + + return ok +} + +func (a *SafeAcks) RegisterAck(key string) error { + a.mu.Lock() + defer a.mu.Unlock() + + if _, hasKey := a.acks[key]; hasKey { + return fmt.Errorf("an acknowledgment was already registered for key '%s'", key) + } + + a.acks[key] = ackChannels{make(chan struct{}), make(chan struct{})} + return nil +} + +func (a *SafeAcks) getValue(key string) (ackChannels ackChannels, ok bool) { + a.mu.Lock() + defer a.mu.Unlock() + + ackChannels, ok = a.acks[key] + return +} + +// TrySendAck checks if an acknowledgment is expected and if it is, it blocks until it is received. +// If an acknowledgment is not expected at the moment of the call (or already was received), nil is returned. +// If more than one goroutine attempts to send an acknowledgment before it is received, all but one goroutines will +// receive an error. +func (a *SafeAcks) TrySendAck(key string) error { + channels, ok := a.getValue(key) + if !ok { + // fixme: perhaps we should return an error also here, but returning nil preserves the original behaviour + // of safeAcks before the refactoring. Perhaps the rest of the code assumes it's ok to blindly try sending + // an ack "just in case", so I would not change it lightly. + return nil + } + + select { + case <-channels.stop: + return fmt.Errorf("an acknowledgment has been already received for key '%s'", key) + case channels.ack <- struct{}{}: + return nil + } +} + +// TryReceiveAck blocks until an acknowledgment is received and then returns true. 
+// It will return false if an acknowledgment for a given key is not expected. +func (a *SafeAcks) TryReceiveAck(key string) bool { + channels, ok := a.getValue(key) + if !ok { + return false + } + <-channels.ack + close(channels.stop) + a.deleteKey(key) + return true +} + +func NewAcks() *SafeAcks { + return &SafeAcks{ + acks: make(map[string]ackChannels), + } +} diff --git a/common/utils/safeacks/safeacks_test.go b/common/utils/safeacks/safeacks_test.go new file mode 100644 index 00000000..6a5103c2 --- /dev/null +++ b/common/utils/safeacks/safeacks_test.go @@ -0,0 +1,118 @@ +package safeacks + +import ( + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "sync" + "testing" + "time" +) + +var _ = Describe("SafeAcks", func() { + var sa *SafeAcks + + BeforeEach(func() { + sa = NewAcks() + }) + + Describe("RegisterAck", func() { + It("should register a new ack", func(ctx SpecContext) { + err := sa.RegisterAck("test") + Expect(err).NotTo(HaveOccurred()) + Expect(sa.ExpectsAck("test")).To(BeTrue()) + }, SpecTimeout(5*time.Second)) + + It("should return error when an ack is already registered", func(ctx SpecContext) { + err := sa.RegisterAck("test") + Expect(err).NotTo(HaveOccurred()) + Expect(sa.ExpectsAck("test")).To(BeTrue()) + + err = sa.RegisterAck("test") + Expect(err).To(HaveOccurred()) + + Expect(sa.ExpectsAck("test")).To(BeTrue()) + }, SpecTimeout(5*time.Second)) + }) + // TODO add timeout for this test + Describe("TrySendAck and TryReceiveAck", func() { + It("should return nil for non-existent key", func(ctx SpecContext) { + err := sa.TrySendAck("nonexistent") + Expect(err).To(BeNil()) + }, SpecTimeout(5*time.Second)) + + It("should send ack successfully", func(ctx SpecContext) { + err := sa.RegisterAck("test") + Expect(err).NotTo(HaveOccurred()) + + var wg sync.WaitGroup + wg.Add(1) + + go func() { + defer wg.Done() + err := sa.TrySendAck("test") + Expect(err).To(BeNil()) + }() + Expect(sa.TryReceiveAck("test")).To(BeTrue()) + + wg.Wait() + }, 
SpecTimeout(5*time.Second)) + + It("should return error when ack was already sent once", func(ctx SpecContext) { + err := sa.RegisterAck("test") + Expect(err).NotTo(HaveOccurred()) + + result1 := make(chan error) + result2 := make(chan error) + go func() { + result1 <- sa.TrySendAck("test") + }() + + go func() { + result2 <- sa.TrySendAck("test") + }() + + // I really don't like relying on a sleep call to test this, but I see no other way... + // The goal is to have both `TrySendAck` blocked at channel send before invoking TryReceiveAck. + // Hopefully 1 second is enough to avoid having a shaky test. + time.Sleep(1000 * time.Millisecond) + + ok := sa.TryReceiveAck("test") + Expect(ok).To(BeTrue()) + + oneErrorHaveOccured := (<-result1 != nil) != (<-result2 != nil) + Expect(oneErrorHaveOccured).To(BeTrue()) + }, SpecTimeout(5*time.Second)) + }) + + Describe("ExpectsAck", func() { + It("should return false for non-existent key", func(ctx SpecContext) { + Expect(sa.ExpectsAck("nonexistent")).To(BeFalse()) + }, SpecTimeout(5*time.Second)) + + It("should return true for registered key", func(ctx SpecContext) { + err := sa.RegisterAck("test") + Expect(err).NotTo(HaveOccurred()) + Expect(sa.ExpectsAck("test")).To(BeTrue()) + }, SpecTimeout(5*time.Second)) + + It("should not be permanently blocked by another call", func(ctx SpecContext) { + err := sa.RegisterAck("test") + Expect(err).NotTo(HaveOccurred()) + go func() { + sa.TryReceiveAck("test") + }() + + // I really don't like relying on a sleep call to test this, but I see no other way... + // The goal is to have `TryReceiveAck` blocked at channel receive before invoking ExpectsAck. + // Hopefully 1 second is enough to avoid having a shaky test. 
+ time.Sleep(1000 * time.Millisecond) + + Expect(sa.ExpectsAck("test")).To(BeTrue()) + }, SpecTimeout(5*time.Second)) + }) +}) + +func TestSafeAcks(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Component SafeAcks Test Suite") +} diff --git a/core/task/manager.go b/core/task/manager.go index 9193ecbb..640fe648 100644 --- a/core/task/manager.go +++ b/core/task/manager.go @@ -28,6 +28,7 @@ import ( "context" "errors" "fmt" + "github.com/AliceO2Group/Control/common/utils/safeacks" "os" "strings" "sync" @@ -99,7 +100,7 @@ type Manager struct { schedulerState *schedulerState internalEventCh chan<- event.Event - ackKilledTasks *safeAcks + ackKilledTasks *safeacks.SafeAcks killTasksMu sync.Mutex // to avoid races when attempting to kill the same tasks in different goroutines } @@ -141,7 +142,7 @@ func NewManager(shutdown func(), internalEventCh chan<- event.Event) (taskman *M taskman.cq = taskman.schedulerState.commandqueue taskman.tasksToDeploy = taskman.schedulerState.tasksToDeploy taskman.reviveOffersTrg = taskman.schedulerState.reviveOffersTrg - taskman.ackKilledTasks = newAcks() + taskman.ackKilledTasks = safeacks.NewAcks() schedState.setupCli() @@ -1009,11 +1010,17 @@ func (m *Manager) updateTaskStatus(status *mesos.TaskStatus) { WithField("partition", envId.String()). Warn("attempted status update of task not in roster") } - if ack, ok := m.ackKilledTasks.getValue(taskId); ok { - ack <- struct{}{} - // close(ack) // It can even be left open? + err := m.ackKilledTasks.TrySendAck(taskId) + if err != nil { + log.WithField("taskId", taskId). + WithField("mesosStatus", status.GetState().String()). + WithField("level", infologger.IL_Devel). + WithField("status", status.GetState().String()). + WithField("reason", status.GetReason().String()). + WithField("detector", detector). + WithField("partition", envId.String()). 
+ Warnf("%s", err) } - return } @@ -1064,7 +1071,7 @@ func (m *Manager) Cleanup() (killed Tasks, running Tasks, err error) { // If the task list includes locked tasks, TaskNotFoundError is returned. func (m *Manager) KillTasks(taskIds []string) (killed Tasks, running Tasks, err error) { taskCanBeKilledFilter := func(t *Task) bool { - if t.IsLocked() || m.ackKilledTasks.contains(t.taskId) { + if t.IsLocked() || m.ackKilledTasks.ExpectsAck(t.taskId) { return false } for _, id := range taskIds { @@ -1090,17 +1097,19 @@ func (m *Manager) KillTasks(taskIds []string) (killed Tasks, running Tasks, err } for _, id := range toKill.GetTaskIds() { - m.ackKilledTasks.addAckChannel(id) + err := m.ackKilledTasks.RegisterAck(id) + if err != nil { + log.WithField("level", infologger.IL_Devel).Warnf("failed to register ack for task '%s': %s", id, err) + } } killed, running, err = m.doKillTasks(toKill) m.killTasksMu.Unlock() for _, id := range killed.GetTaskIds() { - ack, ok := m.ackKilledTasks.getValue(id) - if ok { - <-ack - m.ackKilledTasks.deleteKey(id) + ok := m.ackKilledTasks.TryReceiveAck(id) + if !ok { + log.WithField("level", infologger.IL_Devel).Warnf("ack for task '%s' was never registered or was already received", id) } } return diff --git a/core/task/safeacks.go b/core/task/safeacks.go deleted file mode 100644 index 5ae6c6c1..00000000 --- a/core/task/safeacks.go +++ /dev/null @@ -1,84 +0,0 @@ -/* - * === This file is part of ALICE O² === - * - * Copyright 2020 CERN and copyright holders of ALICE O². - * Author: Miltiadis Alexis - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. 
- * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - * - * In applying this license CERN does not waive the privileges and - * immunities granted to it by virtue of its status as an - * Intergovernmental Organization or submit itself to any jurisdiction. - */ - -package task - -import ( - "sync" -) - -// safeAcks is a thread safe map where key is a string usually a taskID -// and the value is a channel of empty struct. It is being used -// when we want to acknowledge that an action happened to the task -// such as task KILLED. At the moment we utilize -// safeAcks to acknowledge that all the requested tasks -// where killed by mesos (task/manager.go). -type safeAcks struct { - mu sync.RWMutex - acks map[string]chan struct{} -} - -func (a *safeAcks) getMap() map[string]chan struct{} { - a.mu.RLock() - defer a.mu.RUnlock() - - return a.acks -} - -func (a *safeAcks) deleteKey(key string) { - a.mu.Lock() - defer a.mu.Unlock() - - delete(a.acks, key) -} - -func (a *safeAcks) contains(key string) bool { - a.mu.RLock() - defer a.mu.RUnlock() - - _, ok := a.acks[key] - - return ok -} - -func (a *safeAcks) addAckChannel(key string) { - a.mu.Lock() - defer a.mu.Unlock() - - a.acks[key] = make(chan struct{}) -} - -func (a *safeAcks) getValue(key string) (ch chan struct{}, ok bool) { - a.mu.Lock() - defer a.mu.Unlock() - - ch, ok = a.acks[key] - return -} - -func newAcks() *safeAcks { - return &safeAcks{ - acks: make(map[string]chan struct{}), - } -}