Skip to content

Commit

Permalink
GT-526 Use agency cache lock in metrics exporter (#1470)
Browse files Browse the repository at this point in the history
  • Loading branch information
jwierzbo authored Nov 2, 2023
1 parent 7819c2b commit 70d22ed
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 41 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
- (Documentation) Update ArangoDeploymentReplication and ArangoLocalStorage CR auto-generated docs
- (Feature) Add ArangoMember Message and extend ArangoMember CRD
- (Documentation) Use OpenAPI-compatible type names in docs
- (Improvement) Use agency cache lock in metrics exporter

## [1.2.34](https://github.com/arangodb/kube-arangodb/tree/1.2.34) (2023-10-16)
- (Bugfix) Fix make manifests-crd-file command
Expand Down
21 changes: 21 additions & 0 deletions pkg/deployment/agency/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,11 @@ type Health interface {

type Cache interface {
Reload(ctx context.Context, size int, clients Connections) (uint64, error)
// Deprecated: Use Apply instead.
// It can cause Read/Write error when state is reloaded.
Data() (state.State, bool)
// Apply applies a function to the current state. Returns true if function was applied
Apply(f func(state.State)) bool
DataDB() (state.DB, bool)
CommitIndex() uint64
// Health returns true when healthy object is available.
Expand Down Expand Up @@ -202,6 +206,10 @@ func (c cacheSingle) Reload(_ context.Context, _ int, _ Connections) (uint64, er
return 0, nil
}

func (c cacheSingle) Apply(f func(state.State)) bool {
return false
}

func (c cacheSingle) Data() (state.State, bool) {
return state.State{}, true
}
Expand Down Expand Up @@ -232,6 +240,19 @@ func (c *cache) CommitIndex() uint64 {
return index
}

func (c *cache) Apply(f func(state.State)) bool {
c.lock.RLock()
defer c.lock.RUnlock()

data, _, ok := c.loader.State()
if !ok {
return false
}

f(data.Arango)
return true
}

func (c *cache) Data() (state.State, bool) {
c.lock.RLock()
defer c.lock.RUnlock()
Expand Down
5 changes: 5 additions & 0 deletions pkg/deployment/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,11 @@ func (d *Deployment) GetMembersState() memberState.StateInspector {
return d.memberState
}

func (d *Deployment) WithAgencyCache(action func(state.State)) bool {
return d.agencyCache.Apply(action)
}

// Deprecated: Use WithAgencyCache instead
func (d *Deployment) GetAgencyCache() (state.State, bool) {
return d.agencyCache.Data()
}
Expand Down
84 changes: 43 additions & 41 deletions pkg/deployment/old_metrics.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
// Copyright 2016-2023 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -24,6 +24,7 @@ import (
"sync"

api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/arangodb/kube-arangodb/pkg/deployment/agency/state"
"github.com/arangodb/kube-arangodb/pkg/deployment/features"
"github.com/arangodb/kube-arangodb/pkg/generated/metric_descriptions"
"github.com/arangodb/kube-arangodb/pkg/metrics/collector"
Expand Down Expand Up @@ -76,8 +77,8 @@ func (i *inventory) CollectMetrics(in metrics.PushMetric) {

deployment.CollectMetrics(in)

if state := deployment.acs.CurrentClusterCache(); state != nil {
t := state.GetThrottles()
if cacheInspector := deployment.acs.CurrentClusterCache(); cacheInspector != nil {
t := cacheInspector.GetThrottles()

for _, c := range definitions.AllComponents() {
in.Push(i.operatorStateRefreshMetric.Gauge(float64(t.Get(c).Count()), deployment.GetNamespace(), deployment.GetName(), string(c)))
Expand All @@ -92,58 +93,59 @@ func (i *inventory) CollectMetrics(in metrics.PushMetric) {
}

if spec.Mode.Get().HasAgents() {
agency, agencyOk := deployment.GetAgencyCache()
if !agencyOk {
in.Push(i.deploymentAgencyStateMetric.Gauge(0, deployment.GetNamespace(), deployment.GetName()))
continue
}

in.Push(i.deploymentAgencyStateMetric.Gauge(1, deployment.GetNamespace(), deployment.GetName()))

if spec.Mode.Get() == api.DeploymentModeCluster {
for db, collections := range agency.Current.Collections {
dbName := db
if features.SensitiveInformationProtection().Enabled() {
dbName = "UNKNOWN"

if v, ok := agency.Plan.Databases[db]; ok && v.ID != "" {
dbName = v.ID
applied := deployment.WithAgencyCache(func(st state.State) {
for db, collections := range st.Current.Collections {
dbName := db
if features.SensitiveInformationProtection().Enabled() {
dbName = "UNKNOWN"

if v, ok := st.Plan.Databases[db]; ok && v.ID != "" {
dbName = v.ID
}
}
}

for collection, shards := range collections {
for shard, details := range shards {
for id, server := range details.Servers {
collectionName := "UNKNOWN"
if features.SensitiveInformationProtection().Enabled() {
collectionName = collection
} else {
if _, ok := agency.Plan.Collections[db]; ok {
if _, ok := agency.Plan.Collections[db][collection]; ok {
collectionName = agency.Plan.Collections[db][collection].GetName(collectionName)
for collection, shards := range collections {
for shard, details := range shards {
for id, server := range details.Servers {
collectionName := "UNKNOWN"
if features.SensitiveInformationProtection().Enabled() {
collectionName = collection
} else {
if _, ok := st.Plan.Collections[db]; ok {
if _, ok := st.Plan.Collections[db][collection]; ok {
collectionName = st.Plan.Collections[db][collection].GetName(collectionName)
}
}
}
}

m := []string{
deployment.GetNamespace(),
deployment.GetName(),
dbName,
collectionName,
shard,
string(server),
}
m := []string{
deployment.GetNamespace(),
deployment.GetName(),
dbName,
collectionName,
shard,
string(server),
}

if id == 0 {
in.Push(i.deploymentShardLeadersMetric.Gauge(1, m...))
if id == 0 {
in.Push(i.deploymentShardLeadersMetric.Gauge(1, m...))
}
in.Push(i.deploymentShardsMetric.Gauge(1, m...))
}
in.Push(i.deploymentShardsMetric.Gauge(1, m...))
}
}
}
})

if !applied {
in.Push(i.deploymentAgencyStateMetric.Gauge(0, deployment.GetNamespace(), deployment.GetName()))
continue
}

}
}
in.Push(i.deploymentAgencyStateMetric.Gauge(1, deployment.GetNamespace(), deployment.GetName()))
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/deployment/reconcile/action_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,10 @@ func (ac *actionContext) ShardsInSyncMap() (state.ShardsSyncStatus, bool) {
return ac.context.ShardsInSyncMap()
}

func (ac *actionContext) WithAgencyCache(action func(state.State)) bool {
return ac.context.WithAgencyCache(action)
}

func (ac *actionContext) GetAgencyCache() (state.State, bool) {
return ac.context.GetAgencyCache()
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/deployment/reconcile/plan_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,10 @@ func (c *testContext) GenerateMemberEndpoint(group api.ServerGroup, member api.M
return pod2.GenerateMemberEndpoint(c.Inspector, c.ArangoDeployment, c.ArangoDeployment.Spec, group, member)
}

func (c *testContext) WithAgencyCache(action func(state.State)) bool {
return false
}

func (c *testContext) GetAgencyCache() (state.State, bool) {
return state.State{}, true
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/deployment/reconciler/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ type DeploymentImageManager interface {
}

type ArangoAgencyGet interface {
// WithAgencyCache executes the given action with the agency cache using a Read lock. Returns true if action was applied
WithAgencyCache(action func(state.State)) bool
// GetAgencyCache returns the agency cache.
// It can cause to Read/Write error when state is reloaded.
// It is recommended to use WithAgencyCache instead.
GetAgencyCache() (state.State, bool)
GetAgencyArangoDBCache() (state.DB, bool)
GetAgencyHealth() (agencyCache.Health, bool)
Expand Down

0 comments on commit 70d22ed

Please sign in to comment.