From d2e2cda88c55b7b07399e26dd772eff4ec920a75 Mon Sep 17 00:00:00 2001 From: Jack Lindamood Date: Thu, 13 Jun 2019 12:24:35 -0700 Subject: [PATCH 1/2] Move mutex to private member for sampling. Moving mutex to a private member since it does not make sense to allow other actors to lock/unlock a sampling --- strategy/sampling/centralized.go | 14 +++---- .../centralized_sampling_rule_manifest.go | 38 +++++++++---------- strategy/sampling/sampling_rule.go | 14 +++---- 3 files changed, 33 insertions(+), 33 deletions(-) diff --git a/strategy/sampling/centralized.go b/strategy/sampling/centralized.go index a0426ecf..fc30f342 100644 --- a/strategy/sampling/centralized.go +++ b/strategy/sampling/centralized.go @@ -53,7 +53,7 @@ type CentralizedStrategy struct { // represents daemon endpoints daemonEndpoints *daemoncfg.DaemonEndpoints - sync.RWMutex + mu sync.RWMutex } // svcProxy is the interface for API calls to X-Ray service. @@ -157,9 +157,9 @@ func (ss *CentralizedStrategy) ShouldTrace(request *Request) *Decision { // Match against known rules for _, r := range ss.manifest.Rules { - r.RLock() + r.mu.RLock() applicable := r.AppliesTo(request) - r.RUnlock() + r.mu.RUnlock() if !applicable { continue @@ -185,7 +185,7 @@ func (ss *CentralizedStrategy) ShouldTrace(request *Request) *Decision { // start initiates rule and target pollers. func (ss *CentralizedStrategy) start() { - ss.Lock() + ss.mu.Lock() if !ss.pollerStart { var er error @@ -199,7 +199,7 @@ func (ss *CentralizedStrategy) start() { ss.pollerStart = true - ss.Unlock() + ss.mu.Unlock() } // startRulePoller starts rule poller. @@ -494,8 +494,8 @@ func (ss *CentralizedStrategy) updateTarget(t *xraySvc.SamplingTargetDocument) ( return fmt.Errorf("rule %s not found", *t.RuleName) } - r.Lock() - defer r.Unlock() + r.mu.Lock() + defer r.mu.Unlock() r.reservoir.refreshedAt = ss.clock.Now().Unix() diff --git a/strategy/sampling/centralized_sampling_rule_manifest.go b/strategy/sampling/centralized_sampling_rule_manifest.go index 9e32da4d..937dcc28 100644 --- a/strategy/sampling/centralized_sampling_rule_manifest.go +++ b/strategy/sampling/centralized_sampling_rule_manifest.go @@ -32,7 +32,7 @@ type CentralizedManifest struct { Index map[string]*CentralizedRule refreshedAt int64 clock utils.Clock - sync.RWMutex + mu sync.RWMutex } // putRule updates the named rule if it already exists or creates it if it does not. @@ -48,9 +48,9 @@ func (m *CentralizedManifest) putRule(svcRule *xraySvc.SamplingRule) (r *Central // Default rule if name == defaultRule { - m.RLock() + m.mu.RLock() r = m.Default - m.RUnlock() + m.mu.RUnlock() // Update rule if already exists if r != nil { @@ -66,9 +66,9 @@ func (m *CentralizedManifest) putRule(svcRule *xraySvc.SamplingRule) (r *Central } // User-defined rule - m.RLock() + m.mu.RLock() r, ok := m.Index[name] - m.RUnlock() + m.mu.RUnlock() // Create rule if it does not exist if !ok { @@ -122,8 +122,8 @@ func (m *CentralizedManifest) createUserRule(svcRule *xraySvc.SamplingRule) *Cen rand: rand, } - m.Lock() - defer m.Unlock() + m.mu.Lock() + defer m.mu.Unlock() // Return early if rule already exists if r, ok := m.Index[*svcRule.RuleName]; ok { @@ -156,8 +156,8 @@ func (m *CentralizedManifest) updateUserRule(r *CentralizedRule, svcRule *xraySv p, c := *svcRule.Priority, *svcRule.ReservoirSize - r.Lock() - defer r.Unlock() + r.mu.Lock() + defer r.mu.Unlock() r.Properties = pr r.priority = p @@ -196,8 +196,8 @@ func (m *CentralizedManifest) createDefaultRule(svcRule *xraySvc.SamplingRule) * rand: rand, } - m.Lock() - defer m.Unlock() + m.mu.Lock() + defer m.mu.Unlock() // Return early if rule already exists if d := m.Default; d != nil { @@ -228,8 +228,8 @@ func (m *CentralizedManifest) updateDefaultRule(svcRule *xraySvc.SamplingRule) { c := *svcRule.ReservoirSize - r.Lock() - defer r.Unlock() + r.mu.Lock() + defer r.mu.Unlock() r.Properties = p r.reservoir.capacity = c @@ -238,8 +238,8 @@ func (m *CentralizedManifest) updateDefaultRule(svcRule *xraySvc.SamplingRule) { // prune removes all rules in the manifest not present in the given list of active rules. // Preserves ordering of sorted array. func (m *CentralizedManifest) prune(actives map[*CentralizedRule]bool) { - m.Lock() - defer m.Unlock() + m.mu.Lock() + defer m.mu.Unlock() // Iterate in reverse order to avoid adjusting index for each deleted rule for i := len(m.Rules) - 1; i >= 0; i-- { @@ -278,8 +278,8 @@ func (m *CentralizedManifest) sort() { return m.Rules[i].priority < m.Rules[j].priority } - m.Lock() - defer m.Unlock() + m.mu.Lock() + defer m.mu.Unlock() sort.Slice(m.Rules, less) } @@ -287,8 +287,8 @@ func (m *CentralizedManifest) sort() { // expired returns true if the manifest has not been successfully refreshed in // 'manifestTTL' seconds. func (m *CentralizedManifest) expired() bool { - m.RLock() - defer m.RUnlock() + m.mu.RLock() + defer m.mu.RUnlock() return m.refreshedAt < m.clock.Now().Unix()-manifestTTL } diff --git a/strategy/sampling/sampling_rule.go b/strategy/sampling/sampling_rule.go index 59d4edcd..2b7dd60d 100644 --- a/strategy/sampling/sampling_rule.go +++ b/strategy/sampling/sampling_rule.go @@ -87,13 +87,13 @@ type CentralizedRule struct { // Provides random numbers rand utils.Rand - sync.RWMutex + mu sync.RWMutex } // stale returns true if the quota is due for a refresh. False otherwise. func (r *CentralizedRule) stale(now int64) bool { - r.Lock() - defer r.Unlock() + r.mu.Lock() + defer r.mu.Unlock() return r.requests != 0 && now >= r.reservoir.refreshedAt+r.reservoir.interval } @@ -105,8 +105,8 @@ func (r *CentralizedRule) Sample() *Decision { Rule: &r.ruleName, } - r.Lock() - defer r.Unlock() + r.mu.Lock() + defer r.mu.Unlock() r.requests++ @@ -165,7 +165,7 @@ func (r *CentralizedRule) bernoulliSample() bool { // snapshot takes a snapshot of the sampling statistics counters, returning // xraySvc.SamplingStatistics. It also resets statistics counters. func (r *CentralizedRule) snapshot() *xraySvc.SamplingStatisticsDocument { - r.Lock() + r.mu.Lock() name := &r.ruleName @@ -176,7 +176,7 @@ func (r *CentralizedRule) snapshot() *xraySvc.SamplingStatisticsDocument { // Reset counters r.requests, r.sampled, r.borrows = 0, 0, 0 - r.Unlock() + r.mu.Unlock() now := r.clock.Now() s := &xraySvc.SamplingStatisticsDocument{ From b62caa6a663c5587ddcdae7e4779dd8d9f3a37a7 Mon Sep 17 00:00:00 2001 From: Jack Lindamood Date: Thu, 13 Jun 2019 12:55:29 -0700 Subject: [PATCH 2/2] Fix build --- strategy/sampling/centralized.go | 20 ++++++++++---------- strategy/sampling/centralized_test.go | 6 +++--- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/strategy/sampling/centralized.go b/strategy/sampling/centralized.go index fc30f342..b58503b7 100644 --- a/strategy/sampling/centralized.go +++ b/strategy/sampling/centralized.go @@ -151,8 +151,8 @@ func (ss *CentralizedStrategy) ShouldTrace(request *Request) *Decision { return ss.fallback.ShouldTrace(request) } - ss.manifest.RLock() - defer ss.manifest.RUnlock() + ss.manifest.mu.RLock() + defer ss.manifest.mu.RUnlock() // Match against known rules for _, r := range ss.manifest.Rules { @@ -339,9 +339,9 @@ func (ss *CentralizedStrategy) refreshManifest() (err error) { ss.manifest.sort() // Update refreshedAt timestamp - ss.manifest.Lock() + ss.manifest.mu.Lock() ss.manifest.refreshedAt = now - ss.manifest.Unlock() + ss.manifest.mu.Unlock() return } @@ -420,9 +420,9 @@ func (ss *CentralizedStrategy) refreshTargets() (err error) { // Set refresh flag if modifiedAt timestamp from remote is greater than ours. if remote := output.LastRuleModification; remote != nil { - ss.manifest.RLock() + ss.manifest.mu.RLock() local := ss.manifest.refreshedAt - ss.manifest.RUnlock() + ss.manifest.mu.RUnlock() if remote.Unix() >= local { refresh = true @@ -447,8 +447,8 @@ func (ss *CentralizedStrategy) snapshots() []*xraySvc.SamplingStatisticsDocument statistics := make([]*xraySvc.SamplingStatisticsDocument, 0, len(ss.manifest.Rules)+1) now := ss.clock.Now().Unix() - ss.manifest.RLock() - defer ss.manifest.RUnlock() + ss.manifest.mu.RLock() + defer ss.manifest.mu.RUnlock() // Generate sampling statistics for user-defined rules for _, r := range ss.manifest.Rules { @@ -486,9 +486,9 @@ func (ss *CentralizedStrategy) updateTarget(t *xraySvc.SamplingTargetDocument) ( } // Rule for given target - ss.manifest.RLock() + ss.manifest.mu.RLock() r, ok := ss.manifest.Index[*t.RuleName] - ss.manifest.RUnlock() + ss.manifest.mu.RUnlock() if !ok { return fmt.Errorf("rule %s not found", *t.RuleName) diff --git a/strategy/sampling/centralized_test.go b/strategy/sampling/centralized_test.go index 277735ab..648b9937 100644 --- a/strategy/sampling/centralized_test.go +++ b/strategy/sampling/centralized_test.go @@ -2513,14 +2513,14 @@ A: break A default: // Assert that rule was added to manifest and the timestamp refreshed - ss.manifest.Lock() + ss.manifest.mu.Lock() if len(ss.manifest.Rules) == 1 && len(ss.manifest.Index) == 1 && ss.manifest.refreshedAt == 1500000000 { - ss.manifest.Unlock() + ss.manifest.mu.Unlock() break A } - ss.manifest.Unlock() + ss.manifest.mu.Unlock() } } }