Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GT-429 [Feature] Agency Cache memory usage reduction #1325

Merged
merged 3 commits into from
Jun 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## [master](https://github.com/arangodb/kube-arangodb/tree/master) (N/A)
- (Maintenance) Add govulncheck to pipeline, update golangci-linter
- (Feature) Agency Cache memory usage reduction

## [1.2.28](https://github.com/arangodb/kube-arangodb/tree/1.2.28) (2023-06-05)
- (Feature) ArangoBackup create retries and MaxIterations limit
Expand Down
4 changes: 4 additions & 0 deletions cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"github.com/arangodb/kube-arangodb/pkg/api"
deploymentApi "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/arangodb/kube-arangodb/pkg/crd"
"github.com/arangodb/kube-arangodb/pkg/deployment/agency/cache"
"github.com/arangodb/kube-arangodb/pkg/deployment/features"
"github.com/arangodb/kube-arangodb/pkg/generated/clientset/versioned/scheme"
"github.com/arangodb/kube-arangodb/pkg/logging"
Expand Down Expand Up @@ -228,6 +229,9 @@ func init() {
if err := features.Init(&cmdMain); err != nil {
panic(err.Error())
}
if err := cache.Init(&cmdMain); err != nil {
panic(err.Error())
}
}

func Execute() int {
Expand Down
29 changes: 8 additions & 21 deletions pkg/deployment/agency/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,21 @@ import (

"github.com/rs/zerolog"

"github.com/arangodb/go-driver"
"github.com/arangodb/go-driver/agency"

api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/arangodb/kube-arangodb/pkg/generated/metric_descriptions"
"github.com/arangodb/kube-arangodb/pkg/logging"
"github.com/arangodb/kube-arangodb/pkg/util/arangod/conn"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/arangodb/kube-arangodb/pkg/util/globals"
"github.com/arangodb/kube-arangodb/pkg/util/metrics"
)

type Connections map[string]conn.Connection

type health struct {
namespace, name string

leaderID string
leader driver.Connection

agencySize int

Expand All @@ -52,14 +51,6 @@ type health struct {
election map[string]int
}

func (h health) Leader() (driver.Connection, bool) {
if l := h.leader; l != nil {
return l, true
}

return nil, false
}

func (h health) CollectMetrics(m metrics.PushMetric) {
if err := h.Serving(); err == nil {
m.Push(metric_descriptions.ArangodbOperatorAgencyCacheServingGauge(1, h.namespace, h.name))
Expand Down Expand Up @@ -145,14 +136,11 @@ type Health interface {
// LeaderID returns a leader ID or empty string if a leader is not known.
LeaderID() string

// Leader returns connection to the Agency leader
Leader() (driver.Connection, bool)

CollectMetrics(m metrics.PushMetric)
}

type Cache interface {
Reload(ctx context.Context, size int, clients map[string]agency.Agency) (uint64, error)
Reload(ctx context.Context, size int, clients Connections) (uint64, error)
Data() (State, bool)
DataDB() (StateDB, bool)
CommitIndex() uint64
Expand Down Expand Up @@ -206,7 +194,7 @@ func (c cacheSingle) Health() (Health, bool) {
return nil, false
}

func (c cacheSingle) Reload(_ context.Context, _ int, _ map[string]agency.Agency) (uint64, error) {
func (c cacheSingle) Reload(_ context.Context, _ int, _ Connections) (uint64, error) {
return 0, nil
}

Expand Down Expand Up @@ -278,7 +266,7 @@ func (c *cache) Health() (Health, bool) {
return nil, false
}

func (c *cache) Reload(ctx context.Context, size int, clients map[string]agency.Agency) (uint64, error) {
func (c *cache) Reload(ctx context.Context, size int, clients Connections) (uint64, error) {
c.lock.Lock()
defer c.lock.Unlock()

Expand Down Expand Up @@ -313,7 +301,7 @@ func (c *cache) Reload(ctx context.Context, size int, clients map[string]agency.
return index, nil
}

func (c *cache) reload(ctx context.Context, size int, clients map[string]agency.Agency) (uint64, error) {
func (c *cache) reload(ctx context.Context, size int, clients Connections) (uint64, error) {
leaderCli, leaderConfig, health, err := c.getLeader(ctx, size, clients)
if err != nil {
// Invalidate a leader ID and agency state.
Expand Down Expand Up @@ -363,7 +351,7 @@ func (c *cache) ShardsInSyncMap() (ShardsSyncStatus, bool) {

// getLeader returns config and client to a leader agency, and health to check if agencies are on the same page.
// If there is no quorum for the leader then error is returned.
func (c *cache) getLeader(ctx context.Context, size int, clients map[string]agency.Agency) (agency.Agency, *Config, health, error) {
func (c *cache) getLeader(ctx context.Context, size int, clients Connections) (conn.Connection, *Config, health, error) {
configs := make([]*Config, len(clients))
errs := make([]error, len(clients))
names := make([]string, 0, len(clients))
Expand Down Expand Up @@ -427,7 +415,6 @@ func (c *cache) getLeader(ctx context.Context, size int, clients map[string]agen

for id := range names {
if h.leaderID == h.names[id] {
h.leader = clients[names[id]].Connection()
if cfg := configs[id]; cfg != nil {
return clients[names[id]], cfg, h, nil
}
Expand Down
55 changes: 55 additions & 0 deletions pkg/deployment/agency/cache/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
//
// DISCLAIMER
//
// Copyright 2023 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//

package cache

import (
"time"

"github.com/spf13/cobra"

"github.com/arangodb/kube-arangodb/pkg/util"
"github.com/arangodb/kube-arangodb/pkg/version"
)

func Init(cmd *cobra.Command) error {
f := cmd.PersistentFlags()

ee := version.GetVersionV1().IsEnterprise()

f.BoolVar(&global.PollEnabled, "agency.poll-enabled", ee, "The Agency poll functionality enablement (EnterpriseEdition Only)")

if !ee {
if err := f.MarkHidden("agency.poll-enabled"); err != nil {
return err
}
}

f.DurationVar(&global.RefreshDelay, "agency.refresh-delay", util.BoolSwitch(ee, 500*time.Millisecond, 0), "The Agency refresh delay (0 = no delay)")

return nil
}

var global Config

type Config struct {
PollEnabled bool
RefreshDelay time.Duration
}
34 changes: 8 additions & 26 deletions pkg/deployment/agency/config.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
// Copyright 2016-2023 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -22,41 +22,23 @@ package agency

import (
"context"
"encoding/json"
"net/http"

"github.com/arangodb/go-driver"
"github.com/arangodb/go-driver/agency"
"github.com/arangodb/kube-arangodb/pkg/util/arangod/conn"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
)

func GetAgencyConfig(ctx context.Context, client agency.Agency) (*Config, error) {
return GetAgencyConfigC(ctx, client.Connection())
}

func GetAgencyConfigC(ctx context.Context, conn driver.Connection) (*Config, error) {
req, err := conn.NewRequest(http.MethodGet, "/_api/agency/config")
if err != nil {
return nil, err
}

var data []byte

resp, err := conn.Do(driver.WithRawResponse(ctx, &data), req)
func GetAgencyConfig(ctx context.Context, connection conn.Connection) (*Config, error) {
resp, code, err := conn.NewExecutor[any, Config](connection).ExecuteGet(ctx, "/_api/agency/config")
if err != nil {
return nil, err
}

if err := resp.CheckStatus(http.StatusOK); err != nil {
return nil, err
}

var c Config

if err := json.Unmarshal(data, &c); err != nil {
return nil, err
if code != http.StatusOK {
return nil, errors.Newf("Unknown response code %d", code)
}

return &c, nil
return resp, nil
}

type Config struct {
Expand Down
24 changes: 22 additions & 2 deletions pkg/deployment/agency/definitions.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
// Copyright 2016-2023 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -25,6 +25,8 @@ import (
"strings"
)

type ReadRequest [][]string

const (
ArangoKey = "arango"
ArangoDBKey = "arangodb"
Expand Down Expand Up @@ -66,6 +68,24 @@ func GetAgencyReadKey(elements ...string) []string {
return elements
}

func GetAgencyReadRequest(elements ...[]string) [][]string {
func GetAgencyReadRequest(elements ...[]string) ReadRequest {
return elements
}

func GetAgencyReadRequestFields() ReadRequest {
return GetAgencyReadRequest([]string{
GetAgencyKey(ArangoKey, SupervisionKey, SupervisionMaintenanceKey),
GetAgencyKey(ArangoKey, PlanKey, PlanCollectionsKey),
GetAgencyKey(ArangoKey, PlanKey, PlanDatabasesKey),
GetAgencyKey(ArangoKey, CurrentKey, PlanCollectionsKey),
GetAgencyKey(ArangoKey, CurrentKey, CurrentMaintenanceServers),
GetAgencyKey(ArangoKey, TargetKey, TargetHotBackupKey),
GetAgencyKey(ArangoKey, TargetKey, TargetJobToDoKey),
GetAgencyKey(ArangoKey, TargetKey, TargetJobPendingKey),
GetAgencyKey(ArangoKey, TargetKey, TargetJobFailedKey),
GetAgencyKey(ArangoKey, TargetKey, TargetJobFinishedKey),
GetAgencyKey(ArangoKey, TargetKey, TargetCleanedServersKey),
GetAgencyKey(ArangoDBKey, ArangoSyncKey, ArangoSyncStateKey, ArangoSyncStateIncomingKey, ArangoSyncStateIncomingStateKey),
GetAgencyKey(ArangoDBKey, ArangoSyncKey, ArangoSyncStateKey, ArangoSyncStateOutgoingKey, ArangoSyncStateOutgoingTargetsKey),
})
}
55 changes: 9 additions & 46 deletions pkg/deployment/agency/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,68 +22,31 @@ package agency

import (
"context"
"encoding/json"
"net/http"

"github.com/arangodb/go-driver"
"github.com/arangodb/go-driver/agency"

"github.com/arangodb/kube-arangodb/pkg/util/arangod/conn"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
)

func (c *cache) loadState(ctx context.Context, client agency.Agency) (StateRoot, error) {
conn := client.Connection()

req, err := client.Connection().NewRequest(http.MethodPost, "/_api/agency/read")
if err != nil {
return StateRoot{}, err
}

var data []byte

readKeys := []string{
GetAgencyKey(ArangoKey, SupervisionKey, SupervisionMaintenanceKey),
GetAgencyKey(ArangoKey, PlanKey, PlanCollectionsKey),
GetAgencyKey(ArangoKey, PlanKey, PlanDatabasesKey),
GetAgencyKey(ArangoKey, CurrentKey, PlanCollectionsKey),
GetAgencyKey(ArangoKey, CurrentKey, CurrentMaintenanceServers),
GetAgencyKey(ArangoKey, TargetKey, TargetHotBackupKey),
GetAgencyKey(ArangoKey, TargetKey, TargetJobToDoKey),
GetAgencyKey(ArangoKey, TargetKey, TargetJobPendingKey),
GetAgencyKey(ArangoKey, TargetKey, TargetJobFailedKey),
GetAgencyKey(ArangoKey, TargetKey, TargetJobFinishedKey),
GetAgencyKey(ArangoKey, TargetKey, TargetCleanedServersKey),
GetAgencyKey(ArangoDBKey, ArangoSyncKey, ArangoSyncStateKey, ArangoSyncStateIncomingKey, ArangoSyncStateIncomingStateKey),
GetAgencyKey(ArangoDBKey, ArangoSyncKey, ArangoSyncStateKey, ArangoSyncStateOutgoingKey, ArangoSyncStateOutgoingTargetsKey),
}

req, err = req.SetBody(GetAgencyReadRequest(GetAgencyReadKey(readKeys...)))
func (c *cache) loadState(ctx context.Context, connection conn.Connection) (StateRoot, error) {
resp, code, err := conn.NewExecutor[ReadRequest, StateRoots](connection).Execute(ctx, http.MethodPost, "/_api/agency/config", GetAgencyReadRequestFields())
if err != nil {
return StateRoot{}, err
}

resp, err := conn.Do(driver.WithRawResponse(ctx, &data), req)
if err != nil {
return StateRoot{}, err
}

if err := resp.CheckStatus(http.StatusOK); err != nil {
return StateRoot{}, err
if code != http.StatusOK {
return StateRoot{}, errors.Newf("Unknown response code %d", code)
}

var r StateRoots

if err := json.Unmarshal(data, &r); err != nil {
return StateRoot{}, err
if resp == nil {
return StateRoot{}, errors.Newf("Missing response body")
}

if len(r) != 1 {
if len(*resp) != 1 {
return StateRoot{}, errors.Newf("Invalid response size")
}

state := r[0]

return state, nil
return (*resp)[0], nil
}

type StateRoots []StateRoot
Expand Down
Loading