Skip to content
This repository has been archived by the owner on Apr 5, 2024. It is now read-only.

Commit

Permalink
Make convergenceElem.ttl atomic
Browse files Browse the repository at this point in the history
This way we can avoid locking the mutex as part of
convergenceElem.isActive(), which may prevent a deadlock
  • Loading branch information
CryptoCopter authored and oxzi committed Jun 3, 2021
1 parent fca8607 commit f4d1208
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 14 deletions.
2 changes: 1 addition & 1 deletion pkg/cla/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
// CLA administration themselves.
type Manager struct {
// queueTtl is the amount of retries for a CLA.
queueTtl int
queueTtl int32

// retryTime is the duration between two activation attempts.
retryTime time.Duration
Expand Down
24 changes: 11 additions & 13 deletions pkg/cla/manager_elem.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package cla

import (
"sync"
"sync/atomic"

log "github.com/sirupsen/logrus"
)
Expand All @@ -24,7 +25,7 @@ type convergenceElem struct {

// ttl is used both for determining the activity and for counting-off.
// A negative ttl implies an active convergenceElem.
ttl int
ttl int32

// stop{Syn,Ack} are used to supervise closing this convergenceElem, see deactivate()
stopSyn chan struct{}
Expand All @@ -33,7 +34,7 @@ type convergenceElem struct {

// newConvergenceElement creates a new convergenceElem for a Convergence with
// an initial ttl value.
func newConvergenceElement(conv Convergence, convChnl chan ConvergenceStatus, ttl int) *convergenceElem {
func newConvergenceElement(conv Convergence, convChnl chan ConvergenceStatus, ttl int32) *convergenceElem {
return &convergenceElem{
conv: conv,
convChnl: convChnl,
Expand All @@ -57,10 +58,7 @@ func (ce *convergenceElem) asSender() (c ConvergenceSender, ok bool) {

// isActive return if this convergenceElem is wraped around an active Convergence.
func (ce *convergenceElem) isActive() bool {
ce.mutex.Lock()
defer ce.mutex.Unlock()

return ce.ttl < 0
return atomic.LoadInt32(&ce.ttl) < 0
}

// handler supervises both stopping and ConvergenceStatus forwarding to the Manager.
Expand Down Expand Up @@ -100,7 +98,7 @@ func (ce *convergenceElem) activate() (successful, retry bool) {
ce.mutex.Lock()
defer ce.mutex.Unlock()

if ce.ttl == 0 && !ce.conv.IsPermanent() {
if atomic.LoadInt32(&ce.ttl) == 0 && !ce.conv.IsPermanent() {
log.WithFields(log.Fields{
"cla": ce.conv,
"error": "TTL expired",
Expand All @@ -115,7 +113,7 @@ func (ce *convergenceElem) activate() (successful, retry bool) {
"cla": ce.conv,
}).Info("Started CLA")

ce.ttl = -1
atomic.StoreInt32(&ce.ttl, -1)

ce.stopSyn = make(chan struct{})
ce.stopAck = make(chan struct{})
Expand All @@ -126,15 +124,15 @@ func (ce *convergenceElem) activate() (successful, retry bool) {
log.WithFields(log.Fields{
"cla": ce.conv,
"permanent": ce.conv.IsPermanent(),
"ttl": ce.ttl,
"ttl": atomic.LoadInt32(&ce.ttl),
"retry": claRetry,
"error": claErr,
}).Info("Failed to start CLA")

if claRetry {
ce.ttl -= 1
atomic.AddInt32(&ce.ttl, -1)
} else {
ce.ttl = 0
atomic.StoreInt32(&ce.ttl, 0)
}

return false, claRetry
Expand All @@ -143,7 +141,7 @@ func (ce *convergenceElem) activate() (successful, retry bool) {

// deactivate marks this convergenceElem as deactivated. Both a new ttl as well
// as whether Stop should be executed can be specified.
func (ce *convergenceElem) deactivate(ttl int) {
func (ce *convergenceElem) deactivate(ttl int32) {
if !ce.isActive() {
return
}
Expand All @@ -158,5 +156,5 @@ func (ce *convergenceElem) deactivate(ttl int) {
close(ce.stopSyn)
<-ce.stopAck

ce.ttl = ttl
atomic.StoreInt32(&ce.ttl, ttl)
}

0 comments on commit f4d1208

Please sign in to comment.