Skip to content
This repository has been archived by the owner on Apr 5, 2024. It is now read-only.

Make convergenceElem.ttl atomic #43

Merged
merged 1 commit into from
Jun 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/cla/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
// CLA administration themselves.
type Manager struct {
// queueTtl is the amount of retries for a CLA.
queueTtl int
queueTtl int32

// retryTime is the duration between two activation attempts.
retryTime time.Duration
Expand Down
24 changes: 11 additions & 13 deletions pkg/cla/manager_elem.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package cla

import (
"sync"
"sync/atomic"

log "github.com/sirupsen/logrus"
)
Expand All @@ -24,7 +25,7 @@ type convergenceElem struct {

// ttl is used both for determining the activity and for counting-off.
// A negative ttl implies an active convergenceElem.
ttl int
ttl int32

// stop{Syn,Ack} are used to supervise closing this convergenceElem, see deactivate()
stopSyn chan struct{}
Expand All @@ -33,7 +34,7 @@ type convergenceElem struct {

// newConvergenceElement creates a new convergenceElem for a Convergence with
// an initial ttl value.
func newConvergenceElement(conv Convergence, convChnl chan ConvergenceStatus, ttl int) *convergenceElem {
func newConvergenceElement(conv Convergence, convChnl chan ConvergenceStatus, ttl int32) *convergenceElem {
return &convergenceElem{
conv: conv,
convChnl: convChnl,
Expand All @@ -57,10 +58,7 @@ func (ce *convergenceElem) asSender() (c ConvergenceSender, ok bool) {

// isActive return if this convergenceElem is wraped around an active Convergence.
func (ce *convergenceElem) isActive() bool {
ce.mutex.Lock()
defer ce.mutex.Unlock()

return ce.ttl < 0
return atomic.LoadInt32(&ce.ttl) < 0
}

// handler supervises both stopping and ConvergenceStatus forwarding to the Manager.
Expand Down Expand Up @@ -100,7 +98,7 @@ func (ce *convergenceElem) activate() (successful, retry bool) {
ce.mutex.Lock()
defer ce.mutex.Unlock()

if ce.ttl == 0 && !ce.conv.IsPermanent() {
if atomic.LoadInt32(&ce.ttl) == 0 && !ce.conv.IsPermanent() {
log.WithFields(log.Fields{
"cla": ce.conv,
"error": "TTL expired",
Expand All @@ -115,7 +113,7 @@ func (ce *convergenceElem) activate() (successful, retry bool) {
"cla": ce.conv,
}).Info("Started CLA")

ce.ttl = -1
atomic.StoreInt32(&ce.ttl, -1)

ce.stopSyn = make(chan struct{})
ce.stopAck = make(chan struct{})
Expand All @@ -126,15 +124,15 @@ func (ce *convergenceElem) activate() (successful, retry bool) {
log.WithFields(log.Fields{
"cla": ce.conv,
"permanent": ce.conv.IsPermanent(),
"ttl": ce.ttl,
"ttl": atomic.LoadInt32(&ce.ttl),
"retry": claRetry,
"error": claErr,
}).Info("Failed to start CLA")

if claRetry {
ce.ttl -= 1
atomic.AddInt32(&ce.ttl, -1)
} else {
ce.ttl = 0
atomic.StoreInt32(&ce.ttl, 0)
}

return false, claRetry
Expand All @@ -143,7 +141,7 @@ func (ce *convergenceElem) activate() (successful, retry bool) {

// deactivate marks this convergenceElem as deactivated. Both a new ttl as well
// as whether Stop should be executed can be specified.
func (ce *convergenceElem) deactivate(ttl int) {
func (ce *convergenceElem) deactivate(ttl int32) {
if !ce.isActive() {
return
}
Expand All @@ -158,5 +156,5 @@ func (ce *convergenceElem) deactivate(ttl int) {
close(ce.stopSyn)
<-ce.stopAck

ce.ttl = ttl
atomic.StoreInt32(&ce.ttl, ttl)
}