Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move mutex to private member for sampling. #123

Merged
merged 2 commits into from
Jun 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 17 additions & 17 deletions strategy/sampling/centralized.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type CentralizedStrategy struct {
// represents daemon endpoints
daemonEndpoints *daemoncfg.DaemonEndpoints

sync.RWMutex
mu sync.RWMutex
}

// svcProxy is the interface for API calls to X-Ray service.
Expand Down Expand Up @@ -151,15 +151,15 @@ func (ss *CentralizedStrategy) ShouldTrace(request *Request) *Decision {
return ss.fallback.ShouldTrace(request)
}

ss.manifest.RLock()
defer ss.manifest.RUnlock()
ss.manifest.mu.RLock()
defer ss.manifest.mu.RUnlock()

// Match against known rules
for _, r := range ss.manifest.Rules {

r.RLock()
r.mu.RLock()
applicable := r.AppliesTo(request)
r.RUnlock()
r.mu.RUnlock()

if !applicable {
continue
Expand All @@ -185,7 +185,7 @@ func (ss *CentralizedStrategy) ShouldTrace(request *Request) *Decision {

// start initiates rule and target pollers.
func (ss *CentralizedStrategy) start() {
ss.Lock()
ss.mu.Lock()

if !ss.pollerStart {
var er error
Expand All @@ -199,7 +199,7 @@ func (ss *CentralizedStrategy) start() {

ss.pollerStart = true

ss.Unlock()
ss.mu.Unlock()
}

// startRulePoller starts rule poller.
Expand Down Expand Up @@ -339,9 +339,9 @@ func (ss *CentralizedStrategy) refreshManifest() (err error) {
ss.manifest.sort()

// Update refreshedAt timestamp
ss.manifest.Lock()
ss.manifest.mu.Lock()
ss.manifest.refreshedAt = now
ss.manifest.Unlock()
ss.manifest.mu.Unlock()

return
}
Expand Down Expand Up @@ -420,9 +420,9 @@ func (ss *CentralizedStrategy) refreshTargets() (err error) {

// Set refresh flag if modifiedAt timestamp from remote is greater than ours.
if remote := output.LastRuleModification; remote != nil {
ss.manifest.RLock()
ss.manifest.mu.RLock()
local := ss.manifest.refreshedAt
ss.manifest.RUnlock()
ss.manifest.mu.RUnlock()

if remote.Unix() >= local {
refresh = true
Expand All @@ -447,8 +447,8 @@ func (ss *CentralizedStrategy) snapshots() []*xraySvc.SamplingStatisticsDocument
statistics := make([]*xraySvc.SamplingStatisticsDocument, 0, len(ss.manifest.Rules)+1)
now := ss.clock.Now().Unix()

ss.manifest.RLock()
defer ss.manifest.RUnlock()
ss.manifest.mu.RLock()
defer ss.manifest.mu.RUnlock()

// Generate sampling statistics for user-defined rules
for _, r := range ss.manifest.Rules {
Expand Down Expand Up @@ -486,16 +486,16 @@ func (ss *CentralizedStrategy) updateTarget(t *xraySvc.SamplingTargetDocument) (
}

// Rule for given target
ss.manifest.RLock()
ss.manifest.mu.RLock()
r, ok := ss.manifest.Index[*t.RuleName]
ss.manifest.RUnlock()
ss.manifest.mu.RUnlock()

if !ok {
return fmt.Errorf("rule %s not found", *t.RuleName)
}

r.Lock()
defer r.Unlock()
r.mu.Lock()
defer r.mu.Unlock()

r.reservoir.refreshedAt = ss.clock.Now().Unix()

Expand Down
38 changes: 19 additions & 19 deletions strategy/sampling/centralized_sampling_rule_manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type CentralizedManifest struct {
Index map[string]*CentralizedRule
refreshedAt int64
clock utils.Clock
sync.RWMutex
mu sync.RWMutex
}

// putRule updates the named rule if it already exists or creates it if it does not.
Expand All @@ -48,9 +48,9 @@ func (m *CentralizedManifest) putRule(svcRule *xraySvc.SamplingRule) (r *Central

// Default rule
if name == defaultRule {
m.RLock()
m.mu.RLock()
r = m.Default
m.RUnlock()
m.mu.RUnlock()

// Update rule if already exists
if r != nil {
Expand All @@ -66,9 +66,9 @@ func (m *CentralizedManifest) putRule(svcRule *xraySvc.SamplingRule) (r *Central
}

// User-defined rule
m.RLock()
m.mu.RLock()
r, ok := m.Index[name]
m.RUnlock()
m.mu.RUnlock()

// Create rule if it does not exist
if !ok {
Expand Down Expand Up @@ -122,8 +122,8 @@ func (m *CentralizedManifest) createUserRule(svcRule *xraySvc.SamplingRule) *Cen
rand: rand,
}

m.Lock()
defer m.Unlock()
m.mu.Lock()
defer m.mu.Unlock()

// Return early if rule already exists
if r, ok := m.Index[*svcRule.RuleName]; ok {
Expand Down Expand Up @@ -156,8 +156,8 @@ func (m *CentralizedManifest) updateUserRule(r *CentralizedRule, svcRule *xraySv

p, c := *svcRule.Priority, *svcRule.ReservoirSize

r.Lock()
defer r.Unlock()
r.mu.Lock()
defer r.mu.Unlock()

r.Properties = pr
r.priority = p
Expand Down Expand Up @@ -196,8 +196,8 @@ func (m *CentralizedManifest) createDefaultRule(svcRule *xraySvc.SamplingRule) *
rand: rand,
}

m.Lock()
defer m.Unlock()
m.mu.Lock()
defer m.mu.Unlock()

// Return early if rule already exists
if d := m.Default; d != nil {
Expand Down Expand Up @@ -228,8 +228,8 @@ func (m *CentralizedManifest) updateDefaultRule(svcRule *xraySvc.SamplingRule) {

c := *svcRule.ReservoirSize

r.Lock()
defer r.Unlock()
r.mu.Lock()
defer r.mu.Unlock()

r.Properties = p
r.reservoir.capacity = c
Expand All @@ -238,8 +238,8 @@ func (m *CentralizedManifest) updateDefaultRule(svcRule *xraySvc.SamplingRule) {
// prune removes all rules in the manifest not present in the given list of active rules.
// Preserves ordering of sorted array.
func (m *CentralizedManifest) prune(actives map[*CentralizedRule]bool) {
m.Lock()
defer m.Unlock()
m.mu.Lock()
defer m.mu.Unlock()

// Iterate in reverse order to avoid adjusting index for each deleted rule
for i := len(m.Rules) - 1; i >= 0; i-- {
Expand Down Expand Up @@ -278,17 +278,17 @@ func (m *CentralizedManifest) sort() {
return m.Rules[i].priority < m.Rules[j].priority
}

m.Lock()
defer m.Unlock()
m.mu.Lock()
defer m.mu.Unlock()

sort.Slice(m.Rules, less)
}

// expired returns true if the manifest has not been successfully refreshed in
// 'manifestTTL' seconds.
func (m *CentralizedManifest) expired() bool {
m.RLock()
defer m.RUnlock()
m.mu.RLock()
defer m.mu.RUnlock()

return m.refreshedAt < m.clock.Now().Unix()-manifestTTL
}
6 changes: 3 additions & 3 deletions strategy/sampling/centralized_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2513,14 +2513,14 @@ A:
break A
default:
// Assert that rule was added to manifest and the timestamp refreshed
ss.manifest.Lock()
ss.manifest.mu.Lock()
if len(ss.manifest.Rules) == 1 &&
len(ss.manifest.Index) == 1 &&
ss.manifest.refreshedAt == 1500000000 {
ss.manifest.Unlock()
ss.manifest.mu.Unlock()
break A
}
ss.manifest.Unlock()
ss.manifest.mu.Unlock()
}
}
}
Expand Down
14 changes: 7 additions & 7 deletions strategy/sampling/sampling_rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,13 @@ type CentralizedRule struct {
// Provides random numbers
rand utils.Rand

sync.RWMutex
mu sync.RWMutex
}

// stale returns true if the quota is due for a refresh. False otherwise.
func (r *CentralizedRule) stale(now int64) bool {
r.Lock()
defer r.Unlock()
r.mu.Lock()
defer r.mu.Unlock()

return r.requests != 0 && now >= r.reservoir.refreshedAt+r.reservoir.interval
}
Expand All @@ -105,8 +105,8 @@ func (r *CentralizedRule) Sample() *Decision {
Rule: &r.ruleName,
}

r.Lock()
defer r.Unlock()
r.mu.Lock()
defer r.mu.Unlock()

r.requests++

Expand Down Expand Up @@ -165,7 +165,7 @@ func (r *CentralizedRule) bernoulliSample() bool {
// snapshot takes a snapshot of the sampling statistics counters, returning
// xraySvc.SamplingStatistics. It also resets statistics counters.
func (r *CentralizedRule) snapshot() *xraySvc.SamplingStatisticsDocument {
r.Lock()
r.mu.Lock()

name := &r.ruleName

Expand All @@ -176,7 +176,7 @@ func (r *CentralizedRule) snapshot() *xraySvc.SamplingStatisticsDocument {
// Reset counters
r.requests, r.sampled, r.borrows = 0, 0, 0

r.Unlock()
r.mu.Unlock()

now := r.clock.Now()
s := &xraySvc.SamplingStatisticsDocument{
Expand Down