Skip to content

Commit

Permalink
Move mutex to private member for sampling. (#123)
Browse files Browse the repository at this point in the history
* Move mutex to private member for sampling.

Moving mutex to a private member since it does not make
sense to allow other actors to lock/unlock a sampling
  • Loading branch information
Jack Lindamood authored and luluzhao committed Jun 13, 2019
1 parent 3565e35 commit eff3994
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 46 deletions.
34 changes: 17 additions & 17 deletions strategy/sampling/centralized.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type CentralizedStrategy struct {
// represents daemon endpoints
daemonEndpoints *daemoncfg.DaemonEndpoints

sync.RWMutex
mu sync.RWMutex
}

// svcProxy is the interface for API calls to X-Ray service.
Expand Down Expand Up @@ -151,15 +151,15 @@ func (ss *CentralizedStrategy) ShouldTrace(request *Request) *Decision {
return ss.fallback.ShouldTrace(request)
}

ss.manifest.RLock()
defer ss.manifest.RUnlock()
ss.manifest.mu.RLock()
defer ss.manifest.mu.RUnlock()

// Match against known rules
for _, r := range ss.manifest.Rules {

r.RLock()
r.mu.RLock()
applicable := r.AppliesTo(request)
r.RUnlock()
r.mu.RUnlock()

if !applicable {
continue
Expand All @@ -185,7 +185,7 @@ func (ss *CentralizedStrategy) ShouldTrace(request *Request) *Decision {

// start initiates rule and target pollers.
func (ss *CentralizedStrategy) start() {
ss.Lock()
ss.mu.Lock()

if !ss.pollerStart {
var er error
Expand All @@ -199,7 +199,7 @@ func (ss *CentralizedStrategy) start() {

ss.pollerStart = true

ss.Unlock()
ss.mu.Unlock()
}

// startRulePoller starts rule poller.
Expand Down Expand Up @@ -339,9 +339,9 @@ func (ss *CentralizedStrategy) refreshManifest() (err error) {
ss.manifest.sort()

// Update refreshedAt timestamp
ss.manifest.Lock()
ss.manifest.mu.Lock()
ss.manifest.refreshedAt = now
ss.manifest.Unlock()
ss.manifest.mu.Unlock()

return
}
Expand Down Expand Up @@ -420,9 +420,9 @@ func (ss *CentralizedStrategy) refreshTargets() (err error) {

// Set refresh flag if modifiedAt timestamp from remote is greater than ours.
if remote := output.LastRuleModification; remote != nil {
ss.manifest.RLock()
ss.manifest.mu.RLock()
local := ss.manifest.refreshedAt
ss.manifest.RUnlock()
ss.manifest.mu.RUnlock()

if remote.Unix() >= local {
refresh = true
Expand All @@ -447,8 +447,8 @@ func (ss *CentralizedStrategy) snapshots() []*xraySvc.SamplingStatisticsDocument
statistics := make([]*xraySvc.SamplingStatisticsDocument, 0, len(ss.manifest.Rules)+1)
now := ss.clock.Now().Unix()

ss.manifest.RLock()
defer ss.manifest.RUnlock()
ss.manifest.mu.RLock()
defer ss.manifest.mu.RUnlock()

// Generate sampling statistics for user-defined rules
for _, r := range ss.manifest.Rules {
Expand Down Expand Up @@ -486,16 +486,16 @@ func (ss *CentralizedStrategy) updateTarget(t *xraySvc.SamplingTargetDocument) (
}

// Rule for given target
ss.manifest.RLock()
ss.manifest.mu.RLock()
r, ok := ss.manifest.Index[*t.RuleName]
ss.manifest.RUnlock()
ss.manifest.mu.RUnlock()

if !ok {
return fmt.Errorf("rule %s not found", *t.RuleName)
}

r.Lock()
defer r.Unlock()
r.mu.Lock()
defer r.mu.Unlock()

r.reservoir.refreshedAt = ss.clock.Now().Unix()

Expand Down
38 changes: 19 additions & 19 deletions strategy/sampling/centralized_sampling_rule_manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type CentralizedManifest struct {
Index map[string]*CentralizedRule
refreshedAt int64
clock utils.Clock
sync.RWMutex
mu sync.RWMutex
}

// putRule updates the named rule if it already exists or creates it if it does not.
Expand All @@ -48,9 +48,9 @@ func (m *CentralizedManifest) putRule(svcRule *xraySvc.SamplingRule) (r *Central

// Default rule
if name == defaultRule {
m.RLock()
m.mu.RLock()
r = m.Default
m.RUnlock()
m.mu.RUnlock()

// Update rule if already exists
if r != nil {
Expand All @@ -66,9 +66,9 @@ func (m *CentralizedManifest) putRule(svcRule *xraySvc.SamplingRule) (r *Central
}

// User-defined rule
m.RLock()
m.mu.RLock()
r, ok := m.Index[name]
m.RUnlock()
m.mu.RUnlock()

// Create rule if it does not exist
if !ok {
Expand Down Expand Up @@ -122,8 +122,8 @@ func (m *CentralizedManifest) createUserRule(svcRule *xraySvc.SamplingRule) *Cen
rand: rand,
}

m.Lock()
defer m.Unlock()
m.mu.Lock()
defer m.mu.Unlock()

// Return early if rule already exists
if r, ok := m.Index[*svcRule.RuleName]; ok {
Expand Down Expand Up @@ -156,8 +156,8 @@ func (m *CentralizedManifest) updateUserRule(r *CentralizedRule, svcRule *xraySv

p, c := *svcRule.Priority, *svcRule.ReservoirSize

r.Lock()
defer r.Unlock()
r.mu.Lock()
defer r.mu.Unlock()

r.Properties = pr
r.priority = p
Expand Down Expand Up @@ -196,8 +196,8 @@ func (m *CentralizedManifest) createDefaultRule(svcRule *xraySvc.SamplingRule) *
rand: rand,
}

m.Lock()
defer m.Unlock()
m.mu.Lock()
defer m.mu.Unlock()

// Return early if rule already exists
if d := m.Default; d != nil {
Expand Down Expand Up @@ -228,8 +228,8 @@ func (m *CentralizedManifest) updateDefaultRule(svcRule *xraySvc.SamplingRule) {

c := *svcRule.ReservoirSize

r.Lock()
defer r.Unlock()
r.mu.Lock()
defer r.mu.Unlock()

r.Properties = p
r.reservoir.capacity = c
Expand All @@ -238,8 +238,8 @@ func (m *CentralizedManifest) updateDefaultRule(svcRule *xraySvc.SamplingRule) {
// prune removes all rules in the manifest not present in the given list of active rules.
// Preserves ordering of sorted array.
func (m *CentralizedManifest) prune(actives map[*CentralizedRule]bool) {
m.Lock()
defer m.Unlock()
m.mu.Lock()
defer m.mu.Unlock()

// Iterate in reverse order to avoid adjusting index for each deleted rule
for i := len(m.Rules) - 1; i >= 0; i-- {
Expand Down Expand Up @@ -278,17 +278,17 @@ func (m *CentralizedManifest) sort() {
return m.Rules[i].priority < m.Rules[j].priority
}

m.Lock()
defer m.Unlock()
m.mu.Lock()
defer m.mu.Unlock()

sort.Slice(m.Rules, less)
}

// expired returns true if the manifest has not been successfully refreshed in
// 'manifestTTL' seconds.
func (m *CentralizedManifest) expired() bool {
m.RLock()
defer m.RUnlock()
m.mu.RLock()
defer m.mu.RUnlock()

return m.refreshedAt < m.clock.Now().Unix()-manifestTTL
}
6 changes: 3 additions & 3 deletions strategy/sampling/centralized_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2513,14 +2513,14 @@ A:
break A
default:
// Assert that rule was added to manifest and the timestamp refreshed
ss.manifest.Lock()
ss.manifest.mu.Lock()
if len(ss.manifest.Rules) == 1 &&
len(ss.manifest.Index) == 1 &&
ss.manifest.refreshedAt == 1500000000 {
ss.manifest.Unlock()
ss.manifest.mu.Unlock()
break A
}
ss.manifest.Unlock()
ss.manifest.mu.Unlock()
}
}
}
Expand Down
14 changes: 7 additions & 7 deletions strategy/sampling/sampling_rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,13 @@ type CentralizedRule struct {
// Provides random numbers
rand utils.Rand

sync.RWMutex
mu sync.RWMutex
}

// stale returns true if the quota is due for a refresh. False otherwise.
func (r *CentralizedRule) stale(now int64) bool {
r.Lock()
defer r.Unlock()
r.mu.Lock()
defer r.mu.Unlock()

return r.requests != 0 && now >= r.reservoir.refreshedAt+r.reservoir.interval
}
Expand All @@ -105,8 +105,8 @@ func (r *CentralizedRule) Sample() *Decision {
Rule: &r.ruleName,
}

r.Lock()
defer r.Unlock()
r.mu.Lock()
defer r.mu.Unlock()

r.requests++

Expand Down Expand Up @@ -165,7 +165,7 @@ func (r *CentralizedRule) bernoulliSample() bool {
// snapshot takes a snapshot of the sampling statistics counters, returning
// xraySvc.SamplingStatistics. It also resets statistics counters.
func (r *CentralizedRule) snapshot() *xraySvc.SamplingStatisticsDocument {
r.Lock()
r.mu.Lock()

name := &r.ruleName

Expand All @@ -176,7 +176,7 @@ func (r *CentralizedRule) snapshot() *xraySvc.SamplingStatisticsDocument {
// Reset counters
r.requests, r.sampled, r.borrows = 0, 0, 0

r.Unlock()
r.mu.Unlock()

now := r.clock.Now()
s := &xraySvc.SamplingStatisticsDocument{
Expand Down

0 comments on commit eff3994

Please sign in to comment.