[core] avoid stuck updateTaskStatus due to multiple mesos updates
The issue described in OCTRL-953 occurs when we schedule a kill of a task and, while waiting for an acknowledgment, we receive two mesos updates:
```
[2024-11-11T12:14:02+01:00] TRACE scheduler:   task status update received detector=TRG message= srcHost=alio2-cr1-flp163 state=TASK_FAILED task=2qwA9EYEgnY
[2024-11-11T12:14:02+01:00] TRACE scheduler:   task status update received detector=TRG message=Reconciliation: Task is unknown to the agent srcHost=alio2-cr1-flp163 state=TASK_LOST task=2qwA9EYEgnY
```
Both updates then trigger the discussed ack, but only one of them can be received, leaving the other sender stuck.

Since it's unclear to me whether we can safely ignore TASK_LOST and trust that we will always receive either TASK_FAILED or TASK_FINISHED, I went for the approach of improving safeAcks to produce an error when subsequent acks are sent, instead of blocking some goroutines.
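
To illustrate the intended behaviour, here is a minimal sketch against the safeacks API added in this commit (illustrative only, not code from the diff; the task ID is borrowed from the log above):

```go
package main

import (
    "fmt"
    "sync"

    "github.com/AliceO2Group/Control/common/utils/safeacks"
)

func main() {
    acks := safeacks.NewAcks()
    _ = acks.RegisterAck("2qwA9EYEgnY")

    // Two status updates (e.g. TASK_FAILED and TASK_LOST) race to ack the same kill.
    var wg sync.WaitGroup
    wg.Add(2)
    for i := 0; i < 2; i++ {
        go func(n int) {
            defer wg.Done()
            // Exactly one send succeeds (nil); the other returns an error or nil,
            // but is no longer left blocked forever.
            fmt.Printf("sender %d: %v\n", n, acks.TrySendAck("2qwA9EYEgnY"))
        }(i)
    }

    // The kill path waits for exactly one acknowledgment.
    acks.TryReceiveAck("2qwA9EYEgnY")
    wg.Wait()
}
```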
knopers8 authored and teo committed Nov 25, 2024
1 parent 1dfb714 commit 3f4a6f7
Showing 5 changed files with 264 additions and 97 deletions.
2 changes: 1 addition & 1 deletion Makefile
@@ -70,7 +70,7 @@ INSTALL_WHAT:=$(patsubst %, install_%, $(WHAT))
 
 GENERATE_DIRS := ./apricot ./coconut/cmd ./common ./common/runtype ./common/system ./core ./core/integration/ccdb ./core/integration/dcs ./core/integration/ddsched ./core/integration/kafka ./core/integration/odc ./executor ./walnut ./core/integration/trg ./core/integration/bookkeeping
 SRC_DIRS := ./apricot ./cmd/* ./core ./coconut ./executor ./common ./configuration ./occ/peanut ./walnut
-TEST_DIRS := ./apricot/local ./common/gera ./configuration/cfgbackend ./configuration/componentcfg ./configuration/template ./core/task/sm ./core/workflow ./core/integration/odc/fairmq ./core/integration ./core/environment
+TEST_DIRS := ./apricot/local ./common/gera ./common/utils/safeacks ./configuration/cfgbackend ./configuration/componentcfg ./configuration/template ./core/task/sm ./core/workflow ./core/integration/odc/fairmq ./core/integration ./core/environment
 GO_TEST_DIRS := ./core/repos ./core/integration/dcs
 
 coverage:COVERAGE_PREFIX := ./coverage_results
124 changes: 124 additions & 0 deletions common/utils/safeacks/safeacks.go
@@ -0,0 +1,124 @@
/*
* === This file is part of ALICE O² ===
*
* Copyright 2020 CERN and copyright holders of ALICE O².
* Author: Miltiadis Alexis <[email protected]>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* In applying this license CERN does not waive the privileges and
* immunities granted to it by virtue of its status as an
* Intergovernmental Organization or submit itself to any jurisdiction.
*/

package safeacks

import (
    "fmt"
    "sync"
)

// SafeAcks is a thread-safe structure for handling acknowledgment exchanges
// between N senders and one receiver. The first sender succeeds; an error is returned to the subsequent ones.
// This way, subsequent senders are not stuck sending an acknowledgment when nothing expects it anymore.
// The signaling design is inspired by point 2 in https://go101.org/article/channel-closing.html
// SafeAcks can be used to acknowledge that an action happened to a task, such as the task being KILLED.
// At the moment we utilize SafeAcks to acknowledge that all the requested tasks were killed by mesos (task/manager.go).
type SafeAcks struct {
    mu   sync.RWMutex
    acks map[string]ackChannels
}

type ackChannels struct {
    // the channel to send the ack to
    ack chan struct{}
    // the channel to close when acks are no longer expected
    stop chan struct{}
}

func (a *SafeAcks) deleteKey(key string) {
    a.mu.Lock()
    defer a.mu.Unlock()

    delete(a.acks, key)
}

func (a *SafeAcks) ExpectsAck(key string) bool {
    a.mu.RLock()
    defer a.mu.RUnlock()

    _, ok := a.acks[key]

    return ok
}

func (a *SafeAcks) RegisterAck(key string) error {
    a.mu.Lock()
    defer a.mu.Unlock()

    if _, hasKey := a.acks[key]; hasKey {
        return fmt.Errorf("an acknowledgment was already registered for key '%s'", key)
    }

    a.acks[key] = ackChannels{make(chan struct{}), make(chan struct{})}
    return nil
}

func (a *SafeAcks) getValue(key string) (ackChannels ackChannels, ok bool) {
    a.mu.Lock()
    defer a.mu.Unlock()

    ackChannels, ok = a.acks[key]
    return
}

// TrySendAck checks whether an acknowledgment is expected and, if so, blocks until it is received.
// If an acknowledgment is not expected at the moment of the call (or was already received), nil is returned.
// If more than one goroutine attempts to send an acknowledgment before it is received, all but one of the
// goroutines will receive an error.
func (a *SafeAcks) TrySendAck(key string) error {
    channels, ok := a.getValue(key)
    if !ok {
        // fixme: perhaps we should return an error also here, but returning nil preserves the original behaviour
        //  of safeAcks before the refactoring. Perhaps the rest of the code assumes it's ok to blindly try sending
        //  an ack "just in case", so I would not change it lightly.
        return nil
    }

    select {
    case <-channels.stop:
        return fmt.Errorf("an acknowledgment has already been received for key '%s'", key)
    case channels.ack <- struct{}{}:
        return nil
    }
}

// TryReceiveAck blocks until an acknowledgment is received and then returns true.
// It will return false if an acknowledgment for a given key is not expected.
func (a *SafeAcks) TryReceiveAck(key string) bool {
    channels, ok := a.getValue(key)
    if !ok {
        return false
    }
    <-channels.ack
    close(channels.stop)
    a.deleteKey(key)
    return true
}

func NewAcks() *SafeAcks {
    return &SafeAcks{
        acks: make(map[string]ackChannels),
    }
}
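
The select at the heart of TrySendAck is what unblocks losing senders: a sender either delivers on the ack channel or observes the stop channel that TryReceiveAck closes, following point 2 of the go101 article linked above (N senders, one receiver, with a dedicated stop channel closed by the receiver). A stripped-down sketch of just that pattern, with hypothetical names:

```go
package sketch

import "errors"

// trySend is a hypothetical reduction of TrySendAck to the bare pattern:
// the receiver closes stop after taking one ack, so any sender still
// blocked in the select returns an error instead of hanging forever.
func trySend(ack chan<- struct{}, stop <-chan struct{}) error {
    select {
    case <-stop: // an ack was already received; nobody will read from ack anymore
        return errors.New("ack no longer expected")
    case ack <- struct{}{}: // our ack was received
        return nil
    }
}
```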
118 changes: 118 additions & 0 deletions common/utils/safeacks/safeacks_test.go
@@ -0,0 +1,118 @@
package safeacks

import (
    . "github.com/onsi/ginkgo/v2"
    . "github.com/onsi/gomega"
    "sync"
    "testing"
    "time"
)

var _ = Describe("SafeAcks", func() {
    var sa *SafeAcks

    BeforeEach(func() {
        sa = NewAcks()
    })

    Describe("RegisterAck", func() {
        It("should register a new ack", func(ctx SpecContext) {
            err := sa.RegisterAck("test")
            Expect(err).NotTo(HaveOccurred())
            Expect(sa.ExpectsAck("test")).To(BeTrue())
        }, SpecTimeout(5*time.Second))

        It("should return error when an ack is already registered", func(ctx SpecContext) {
            err := sa.RegisterAck("test")
            Expect(err).NotTo(HaveOccurred())
            Expect(sa.ExpectsAck("test")).To(BeTrue())

            err = sa.RegisterAck("test")
            Expect(err).To(HaveOccurred())

            Expect(sa.ExpectsAck("test")).To(BeTrue())
        }, SpecTimeout(5*time.Second))
    })
    // TODO add timeout for this test
    Describe("TrySendAck and TryReceiveAck", func() {
        It("should return nil for non-existent key", func(ctx SpecContext) {
            err := sa.TrySendAck("nonexistent")
            Expect(err).To(BeNil())
        }, SpecTimeout(5*time.Second))

        It("should send ack successfully", func(ctx SpecContext) {
            err := sa.RegisterAck("test")
            Expect(err).NotTo(HaveOccurred())

            var wg sync.WaitGroup
            wg.Add(1)

            go func() {
                defer wg.Done()
                err := sa.TrySendAck("test")
                Expect(err).To(BeNil())
            }()
            Expect(sa.TryReceiveAck("test")).To(BeTrue())

            wg.Wait()
        }, SpecTimeout(5*time.Second))

        It("should return error when ack was already sent once", func(ctx SpecContext) {
            err := sa.RegisterAck("test")
            Expect(err).NotTo(HaveOccurred())

            result1 := make(chan error)
            result2 := make(chan error)
            go func() {
                result1 <- sa.TrySendAck("test")
            }()

            go func() {
                result2 <- sa.TrySendAck("test")
            }()

            // I really don't like relying on a sleep call to test this, but I see no other way...
            // The goal is to have both `TrySendAck` calls blocked at the channel send before invoking TryReceiveAck.
            // Hopefully 1 second is enough to avoid a flaky test.
            time.Sleep(1000 * time.Millisecond)

            ok := sa.TryReceiveAck("test")
            Expect(ok).To(BeTrue())
            oneErrorHasOccurred := (<-result1 != nil) != (<-result2 != nil)
            Expect(oneErrorHasOccurred).To(BeTrue())
        }, SpecTimeout(5*time.Second))
    })

    Describe("ExpectsAck", func() {
        It("should return false for non-existent key", func(ctx SpecContext) {
            Expect(sa.ExpectsAck("nonexistent")).To(BeFalse())
        }, SpecTimeout(5*time.Second))

        It("should return true for registered key", func(ctx SpecContext) {
            err := sa.RegisterAck("test")
            Expect(err).NotTo(HaveOccurred())
            Expect(sa.ExpectsAck("test")).To(BeTrue())
        }, SpecTimeout(5*time.Second))

        It("should not be permanently blocked by another call", func(ctx SpecContext) {
            err := sa.RegisterAck("test")
            Expect(err).NotTo(HaveOccurred())
            go func() {
                sa.TryReceiveAck("test")
            }()

            // I really don't like relying on a sleep call to test this, but I see no other way...
            // The goal is to have `TryReceiveAck` blocked at the channel receive before invoking ExpectsAck.
            // Hopefully 1 second is enough to avoid a flaky test.
            time.Sleep(1000 * time.Millisecond)

            Expect(sa.ExpectsAck("test")).To(BeTrue())
        }, SpecTimeout(5*time.Second))
    })
})

func TestSafeAcks(t *testing.T) {
    RegisterFailHandler(Fail)
    RunSpecs(t, "Component SafeAcks Test Suite")
}
33 changes: 21 additions & 12 deletions core/task/manager.go
@@ -28,6 +28,7 @@ import (
     "context"
     "errors"
     "fmt"
+    "github.com/AliceO2Group/Control/common/utils/safeacks"
     "os"
     "strings"
     "sync"
@@ -99,7 +100,7 @@ type Manager struct {
 
     schedulerState  *schedulerState
     internalEventCh chan<- event.Event
-    ackKilledTasks  *safeAcks
+    ackKilledTasks  *safeacks.SafeAcks
     killTasksMu     sync.Mutex // to avoid races when attempting to kill the same tasks in different goroutines
 }

@@ -141,7 +142,7 @@ func NewManager(shutdown func(), internalEventCh chan<- event.Event) (taskman *M
     taskman.cq = taskman.schedulerState.commandqueue
     taskman.tasksToDeploy = taskman.schedulerState.tasksToDeploy
     taskman.reviveOffersTrg = taskman.schedulerState.reviveOffersTrg
-    taskman.ackKilledTasks = newAcks()
+    taskman.ackKilledTasks = safeacks.NewAcks()
 
     schedState.setupCli()

@@ -1009,11 +1010,17 @@ func (m *Manager) updateTaskStatus(status *mesos.TaskStatus) {
         WithField("partition", envId.String()).
         Warn("attempted status update of task not in roster")
     }
-    if ack, ok := m.ackKilledTasks.getValue(taskId); ok {
-        ack <- struct{}{}
-        // close(ack) // It can even be left open?
-    }
+    err := m.ackKilledTasks.TrySendAck(taskId)
+    if err != nil {
+        log.WithField("taskId", taskId).
+            WithField("mesosStatus", status.GetState().String()).
+            WithField("level", infologger.IL_Devel).
+            WithField("status", status.GetState().String()).
+            WithField("reason", status.GetReason().String()).
+            WithField("detector", detector).
+            WithField("partition", envId.String()).
+            Warnf("%s", err)
+    }
 
     return
 }

@@ -1064,7 +1071,7 @@ func (m *Manager) Cleanup() (killed Tasks, running Tasks, err error) {
 // If the task list includes locked tasks, TaskNotFoundError is returned.
 func (m *Manager) KillTasks(taskIds []string) (killed Tasks, running Tasks, err error) {
     taskCanBeKilledFilter := func(t *Task) bool {
-        if t.IsLocked() || m.ackKilledTasks.contains(t.taskId) {
+        if t.IsLocked() || m.ackKilledTasks.ExpectsAck(t.taskId) {
             return false
         }
         for _, id := range taskIds {
@@ -1090,17 +1097,19 @@
     }
 
     for _, id := range toKill.GetTaskIds() {
-        m.ackKilledTasks.addAckChannel(id)
+        err := m.ackKilledTasks.RegisterAck(id)
+        if err != nil {
+            log.WithField("level", infologger.IL_Devel).Warnf("failed to register ack for task '%s': %s", id, err)
+        }
     }
 
     killed, running, err = m.doKillTasks(toKill)
     m.killTasksMu.Unlock()
 
     for _, id := range killed.GetTaskIds() {
-        ack, ok := m.ackKilledTasks.getValue(id)
-        if ok {
-            <-ack
-            m.ackKilledTasks.deleteKey(id)
-        }
+        ok := m.ackKilledTasks.TryReceiveAck(id)
+        if !ok {
+            log.WithField("level", infologger.IL_Devel).Warnf("ack for task '%s' was never registered or was already received", id)
+        }
     }
     return
(diff for 1 more changed file not shown)
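
Taken together, the new flow on the manager side can be condensed as follows — an illustrative sketch only (killAndAwait is a hypothetical helper, not part of the diff):

```go
package sketch

import "github.com/AliceO2Group/Control/common/utils/safeacks"

// killAndAwait condenses the KillTasks path shown above: register the ack
// before scheduling the kill, then block until updateTaskStatus delivers it.
// Any further mesos update for the same task gets an error from TrySendAck
// instead of blocking a goroutine, which is the bug this commit fixes.
func killAndAwait(acks *safeacks.SafeAcks, taskId string, kill func(string)) bool {
    if err := acks.RegisterAck(taskId); err != nil {
        return false // an ack is already pending for this task
    }
    kill(taskId) // eventually yields one or more status updates, each calling acks.TrySendAck(taskId)
    return acks.TryReceiveAck(taskId) // returns after the first ack arrives
}
```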
