Skip to content

Commit

Permalink
Merge pull request #246 from arangodb/bugfix/resilience-improvements
Browse files Browse the repository at this point in the history
Resilience improvements
  • Loading branch information
ewoutp authored Sep 7, 2018
2 parents f0a0029 + cb84514 commit 82f70fb
Show file tree
Hide file tree
Showing 45 changed files with 2,452 additions and 382 deletions.
1,056 changes: 1,056 additions & 0 deletions examples/metrics/dashboard.json

Large diffs are not rendered by default.

24 changes: 6 additions & 18 deletions examples/metrics/deployment-operator-servicemonitor.yaml
Original file line number Diff line number Diff line change
@@ -1,34 +1,22 @@
# This example shows how to integrate with the Prometheus Operator
# to bring metrics from kube-arangodb to Prometheus.

apiVersion: v1
kind: Service
metadata:
name: arango-deployment-operator
labels:
app: arango-deployment-operator
spec:
selector:
app: arango-deployment-operator
ports:
- name: metrics
port: 8528

---

apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
name: arango-deployment-operator
namespace: monitoring
labels:
team: frontend
prometheus: kube-prometheus
spec:
selector:
matchLabels:
app: arango-deployment-operator
namespaceSelector:
matchNames:
- default
endpoints:
- port: metrics
- port: server
scheme: https
tlsConfig:
insecureSkipVerify: true

2 changes: 2 additions & 0 deletions lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,4 +147,6 @@ func cmdLifecycleCopyRun(cmd *cobra.Command, args []string) {
if err := os.Chmod(targetPath, 0755); err != nil {
cliLog.Fatal().Err(err).Msg("Failed to chmod")
}

cliLog.Info().Msgf("Executable copied to %s", targetPath)
}
4 changes: 4 additions & 0 deletions pkg/apis/deployment/v1alpha/conditions.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ const (
// ConditionTypeCleanedOut indicates that the member (dbserver) has been cleaned out.
// Always check in combination with ConditionTypeTerminated.
ConditionTypeCleanedOut ConditionType = "CleanedOut"
// ConditionTypeAgentRecoveryNeeded indicates that the member (agent) will no
// longer recover from its current volume and has to be rebuilt
// using the recovery procedure.
ConditionTypeAgentRecoveryNeeded ConditionType = "AgentRecoveryNeeded"
// ConditionTypePodSchedulingFailure indicates that one or more pods belonging to the deployment cannot be scheduled.
ConditionTypePodSchedulingFailure ConditionType = "PodSchedulingFailure"
// ConditionTypeSecretsChanged indicates that the value of one of more secrets used by
Expand Down
5 changes: 4 additions & 1 deletion pkg/apis/deployment/v1alpha/member_status_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ package v1alpha

import (
"math/rand"
"sort"

"github.com/pkg/errors"
)
Expand Down Expand Up @@ -83,7 +84,9 @@ func (l *MemberStatusList) add(m MemberStatus) error {
return maskAny(errors.Wrapf(AlreadyExistsError, "Member '%s' already exists", m.ID))
}
}
*l = append(src, m)
newList := append(src, m)
sort.Slice(newList, func(i, j int) bool { return newList[i].ID < newList[j].ID })
*l = newList
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/deployment/access_package.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,15 +108,15 @@ func (d *Deployment) ensureAccessPackage(apSecretName string) error {

// Fetch client authentication CA
clientAuthSecretName := spec.Sync.Authentication.GetClientCASecretName()
clientAuthCert, clientAuthKey, _, err := k8sutil.GetCASecret(d.deps.KubeCli.CoreV1(), clientAuthSecretName, ns, nil)
clientAuthCert, clientAuthKey, _, err := k8sutil.GetCASecret(secrets, clientAuthSecretName, nil)
if err != nil {
log.Debug().Err(err).Msg("Failed to get client-auth CA secret")
return maskAny(err)
}

// Fetch TLS CA public key
tlsCASecretName := spec.Sync.TLS.GetCASecretName()
tlsCACert, err := k8sutil.GetCACertficateSecret(d.deps.KubeCli.CoreV1(), tlsCASecretName, ns)
tlsCACert, err := k8sutil.GetCACertficateSecret(secrets, tlsCASecretName)
if err != nil {
log.Debug().Err(err).Msg("Failed to get TLS CA secret")
return maskAny(err)
Expand Down
9 changes: 6 additions & 3 deletions pkg/deployment/context_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,9 @@ func (d *Deployment) GetSyncServerClient(ctx context.Context, group api.ServerGr
log := d.deps.Log
kubecli := d.deps.KubeCli
ns := d.apiObject.GetNamespace()
secrets := kubecli.CoreV1().Secrets(ns)
secretName := d.apiObject.Spec.Sync.Monitoring.GetTokenSecretName()
monitoringToken, err := k8sutil.GetTokenSecret(kubecli.CoreV1(), secretName, ns)
monitoringToken, err := k8sutil.GetTokenSecret(secrets, secretName)
if err != nil {
log.Debug().Err(err).Str("secret-name", secretName).Msg("Failed to get sync monitoring secret")
return nil, maskAny(err)
Expand Down Expand Up @@ -331,7 +332,8 @@ func (d *Deployment) GetPvc(pvcName string) (*v1.PersistentVolumeClaim, error) {
func (d *Deployment) GetTLSKeyfile(group api.ServerGroup, member api.MemberStatus) (string, error) {
secretName := k8sutil.CreateTLSKeyfileSecretName(d.apiObject.GetName(), group.AsRole(), member.ID)
ns := d.apiObject.GetNamespace()
result, err := k8sutil.GetTLSKeyfileSecret(d.deps.KubeCli.CoreV1(), secretName, ns)
secrets := d.deps.KubeCli.CoreV1().Secrets(ns)
result, err := k8sutil.GetTLSKeyfileSecret(secrets, secretName)
if err != nil {
return "", maskAny(err)
}
Expand All @@ -353,8 +355,9 @@ func (d *Deployment) DeleteTLSKeyfile(group api.ServerGroup, member api.MemberSt
// Returns: publicKey, privateKey, ownerByDeployment, error
func (d *Deployment) GetTLSCA(secretName string) (string, string, bool, error) {
ns := d.apiObject.GetNamespace()
secrets := d.deps.KubeCli.CoreV1().Secrets(ns)
owner := d.apiObject.AsOwner()
cert, priv, isOwned, err := k8sutil.GetCASecret(d.deps.KubeCli.CoreV1(), secretName, ns, &owner)
cert, priv, isOwned, err := k8sutil.GetCASecret(secrets, secretName, &owner)
if err != nil {
return "", "", false, maskAny(err)
}
Expand Down
16 changes: 9 additions & 7 deletions pkg/deployment/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/arangodb/kube-arangodb/pkg/deployment/resilience"
"github.com/arangodb/kube-arangodb/pkg/deployment/resources"
"github.com/arangodb/kube-arangodb/pkg/generated/clientset/versioned"
"github.com/arangodb/kube-arangodb/pkg/util"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
"github.com/arangodb/kube-arangodb/pkg/util/retry"
"github.com/arangodb/kube-arangodb/pkg/util/trigger"
Expand Down Expand Up @@ -78,8 +79,8 @@ type deploymentEvent struct {

const (
deploymentEventQueueSize = 256
minInspectionInterval = time.Second // Ensure we inspect the generated resources no less than with this interval
maxInspectionInterval = time.Minute // Ensure we inspect the generated resources no less than with this interval
minInspectionInterval = util.Interval(time.Second) // Ensure we inspect the generated resources no less than with this interval
maxInspectionInterval = util.Interval(time.Minute) // Ensure we inspect the generated resources no less than with this interval
)

// Deployment is the in process state of an ArangoDeployment.
Expand Down Expand Up @@ -140,6 +141,7 @@ func New(config Config, deps Dependencies, apiObject *api.ArangoDeployment) (*De
ci := newClusterScalingIntegration(d)
d.clusterScalingIntegration = ci
go ci.ListenForClusterEvents(d.stopCh)
go d.resources.RunDeploymentHealthLoop(d.stopCh)
}
if config.AllowChaos {
d.chaosMonkey = chaos.NewMonkey(deps.Log, d)
Expand Down Expand Up @@ -247,21 +249,21 @@ func (d *Deployment) run() {
}

case <-d.inspectTrigger.Done():
log.Debug().Msg("Inspect deployment...")
inspectionInterval = d.inspectDeployment(inspectionInterval)
log.Debug().Str("interval", inspectionInterval.String()).Msg("...inspected deployment")

case <-d.updateDeploymentTrigger.Done():
inspectionInterval = minInspectionInterval
if err := d.handleArangoDeploymentUpdatedEvent(); err != nil {
d.CreateEvent(k8sutil.NewErrorEvent("Failed to handle deployment update", err, d.GetAPIObject()))
}

case <-time.After(inspectionInterval):
case <-inspectionInterval.After():
// Trigger inspection
d.inspectTrigger.Trigger()
// Backoff with next interval
inspectionInterval = time.Duration(float64(inspectionInterval) * 1.5)
if inspectionInterval > maxInspectionInterval {
inspectionInterval = maxInspectionInterval
}
inspectionInterval = inspectionInterval.Backoff(1.5, maxInspectionInterval)
}
}
}
Expand Down
30 changes: 21 additions & 9 deletions pkg/deployment/deployment_inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,26 +27,35 @@ import (
"time"

api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
"github.com/arangodb/kube-arangodb/pkg/metrics"
"github.com/arangodb/kube-arangodb/pkg/util"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

var (
inspectDeploymentDurationGauges = metrics.MustRegisterGaugeVec(metricsComponent, "inspect_deployment_duration", "Amount of time taken by a single inspection of a deployment (in sec)", metrics.DeploymentName)
)

// inspectDeployment inspects the entire deployment, creates
// a plan to update if needed and inspects underlying resources.
// This function should be called when:
// - the deployment has changed
// - any of the underlying resources has changed
// - once in a while
// Returns the delay until this function should be called again.
func (d *Deployment) inspectDeployment(lastInterval time.Duration) time.Duration {
func (d *Deployment) inspectDeployment(lastInterval util.Interval) util.Interval {
log := d.deps.Log
start := time.Now()

nextInterval := lastInterval
hasError := false
ctx := context.Background()
deploymentName := d.apiObject.GetName()
defer metrics.SetDuration(inspectDeploymentDurationGauges.WithLabelValues(deploymentName), start)

// Check deployment still exists
updated, err := d.deps.DatabaseCRCli.DatabaseV1alpha().ArangoDeployments(d.apiObject.GetNamespace()).Get(d.apiObject.GetName(), metav1.GetOptions{})
updated, err := d.deps.DatabaseCRCli.DatabaseV1alpha().ArangoDeployments(d.apiObject.GetNamespace()).Get(deploymentName, metav1.GetOptions{})
if k8sutil.IsNotFound(err) {
// Deployment is gone
log.Info().Msg("Deployment is gone")
Expand Down Expand Up @@ -87,13 +96,17 @@ func (d *Deployment) inspectDeployment(lastInterval time.Duration) time.Duration
}

// Inspection of generated resources needed
if err := d.resources.InspectPods(ctx); err != nil {
if x, err := d.resources.InspectPods(ctx); err != nil {
hasError = true
d.CreateEvent(k8sutil.NewErrorEvent("Pod inspection failed", err, d.apiObject))
} else {
nextInterval = nextInterval.ReduceTo(x)
}
if err := d.resources.InspectPVCs(ctx); err != nil {
if x, err := d.resources.InspectPVCs(ctx); err != nil {
hasError = true
d.CreateEvent(k8sutil.NewErrorEvent("PVC inspection failed", err, d.apiObject))
} else {
nextInterval = nextInterval.ReduceTo(x)
}

// Check members for resilience
Expand Down Expand Up @@ -149,9 +162,11 @@ func (d *Deployment) inspectDeployment(lastInterval time.Duration) time.Duration
}

// At the end of the inspect, we cleanup terminated pods.
if err := d.resources.CleanupTerminatedPods(); err != nil {
if x, err := d.resources.CleanupTerminatedPods(); err != nil {
hasError = true
d.CreateEvent(k8sutil.NewErrorEvent("Pod cleanup failed", err, d.apiObject))
} else {
nextInterval = nextInterval.ReduceTo(x)
}
}

Expand All @@ -164,10 +179,7 @@ func (d *Deployment) inspectDeployment(lastInterval time.Duration) time.Duration
} else {
d.recentInspectionErrors = 0
}
if nextInterval > maxInspectionInterval {
nextInterval = maxInspectionInterval
}
return nextInterval
return nextInterval.ReduceTo(maxInspectionInterval)
}

// triggerInspection ensures that an inspection is run soon.
Expand Down
28 changes: 28 additions & 0 deletions pkg/deployment/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
//
// DISCLAIMER
//
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
//

package deployment

// metricsComponent is the component name under which all metrics of
// this package are registered.
const metricsComponent = "deployment"
15 changes: 14 additions & 1 deletion pkg/deployment/reconcile/plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ func createPlan(log zerolog.Logger, apiObject k8sutil.APIObject,
status.Members.ForeachServerGroup(func(group api.ServerGroup, members api.MemberStatusList) error {
for _, m := range members {
if m.Phase == api.MemberPhaseFailed && len(plan) == 0 {
log.Debug().
Str("id", m.ID).
Str("role", group.AsRole()).
Msg("Creating member replacement plan because member has failed")
newID := ""
if group == api.ServerGroupAgents {
newID = m.ID // Agents cannot (yet) be replaced with new IDs
Expand All @@ -117,6 +121,10 @@ func createPlan(log zerolog.Logger, apiObject k8sutil.APIObject,
// Check for cleaned out dbserver in created state
for _, m := range status.Members.DBServers {
if len(plan) == 0 && m.Phase == api.MemberPhaseCreated && m.Conditions.IsTrue(api.ConditionTypeCleanedOut) {
log.Debug().
Str("id", m.ID).
Str("role", api.ServerGroupDBServers.AsRole()).
Msg("Creating dbserver replacement plan because server is cleanout in created phase")
plan = append(plan,
api.NewAction(api.ActionTypeRemoveMember, api.ServerGroupDBServers, m.ID),
api.NewAction(api.ActionTypeAddMember, api.ServerGroupDBServers, ""),
Expand Down Expand Up @@ -398,13 +406,18 @@ func createRotateMemberPlan(log zerolog.Logger, member api.MemberStatus,
// member.
func createUpgradeMemberPlan(log zerolog.Logger, member api.MemberStatus,
group api.ServerGroup, reason string, imageName string, status api.DeploymentStatus) api.Plan {
upgradeAction := api.ActionTypeUpgradeMember
if group.IsStateless() {
upgradeAction = api.ActionTypeRotateMember
}
log.Debug().
Str("id", member.ID).
Str("role", group.AsRole()).
Str("reason", reason).
Str("action", string(upgradeAction)).
Msg("Creating upgrade plan")
plan := api.Plan{
api.NewAction(api.ActionTypeUpgradeMember, group, member.ID, reason),
api.NewAction(upgradeAction, group, member.ID, reason),
api.NewAction(api.ActionTypeWaitForMemberUp, group, member.ID),
}
if status.CurrentImage == nil || status.CurrentImage.Image != imageName {
Expand Down
10 changes: 5 additions & 5 deletions pkg/deployment/resources/certificates_client_auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ const (

// createClientAuthCACertificate creates a client authentication CA certificate and stores it in a secret with name
// specified in the given spec.
func createClientAuthCACertificate(log zerolog.Logger, cli v1.CoreV1Interface, spec api.SyncAuthenticationSpec, deploymentName, namespace string, ownerRef *metav1.OwnerReference) error {
func createClientAuthCACertificate(log zerolog.Logger, secrets k8sutil.SecretInterface, spec api.SyncAuthenticationSpec, deploymentName string, ownerRef *metav1.OwnerReference) error {
log = log.With().Str("secret", spec.GetClientCASecretName()).Logger()
options := certificates.CreateCertificateOptions{
CommonName: fmt.Sprintf("%s Client Authentication Root Certificate", deploymentName),
Expand All @@ -57,7 +57,7 @@ func createClientAuthCACertificate(log zerolog.Logger, cli v1.CoreV1Interface, s
log.Debug().Err(err).Msg("Failed to create CA certificate")
return maskAny(err)
}
if err := k8sutil.CreateCASecret(cli, spec.GetClientCASecretName(), namespace, cert, priv, ownerRef); err != nil {
if err := k8sutil.CreateCASecret(secrets, spec.GetClientCASecretName(), cert, priv, ownerRef); err != nil {
if k8sutil.IsAlreadyExists(err) {
log.Debug().Msg("CA Secret already exists")
} else {
Expand All @@ -71,10 +71,10 @@ func createClientAuthCACertificate(log zerolog.Logger, cli v1.CoreV1Interface, s

// createClientAuthCertificateKeyfile creates a client authentication certificate for a specific user and stores
// it in a secret with the given name.
func createClientAuthCertificateKeyfile(log zerolog.Logger, cli v1.CoreV1Interface, commonName string, ttl time.Duration, spec api.SyncAuthenticationSpec, secretName, namespace string, ownerRef *metav1.OwnerReference) error {
func createClientAuthCertificateKeyfile(log zerolog.Logger, secrets v1.SecretInterface, commonName string, ttl time.Duration, spec api.SyncAuthenticationSpec, secretName string, ownerRef *metav1.OwnerReference) error {
log = log.With().Str("secret", secretName).Logger()
// Load CA certificate
caCert, caKey, _, err := k8sutil.GetCASecret(cli, spec.GetClientCASecretName(), namespace, nil)
caCert, caKey, _, err := k8sutil.GetCASecret(secrets, spec.GetClientCASecretName(), nil)
if err != nil {
log.Debug().Err(err).Msg("Failed to load CA certificate")
return maskAny(err)
Expand All @@ -100,7 +100,7 @@ func createClientAuthCertificateKeyfile(log zerolog.Logger, cli v1.CoreV1Interfa
}
keyfile := strings.TrimSpace(cert) + "\n" +
strings.TrimSpace(priv)
if err := k8sutil.CreateTLSKeyfileSecret(cli, secretName, namespace, keyfile, ownerRef); err != nil {
if err := k8sutil.CreateTLSKeyfileSecret(secrets, secretName, keyfile, ownerRef); err != nil {
if k8sutil.IsAlreadyExists(err) {
log.Debug().Msg("Server Secret already exists")
} else {
Expand Down
Loading

0 comments on commit 82f70fb

Please sign in to comment.