Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Force volumes to unique nodes for production environments #162

Merged
merged 3 commits into from
Jun 8, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions examples/production-cluster.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
apiVersion: "database.arangodb.com/v1alpha"
kind: "ArangoDeployment"
metadata:
  name: "production-cluster"
spec:
  mode: Cluster
  image: arangodb/arangodb:3.3.10
  environment: Production
5 changes: 5 additions & 0 deletions pkg/apis/deployment/v1alpha/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ func (e Environment) Validate() error {
}
}

// IsProduction reports whether e equals EnvironmentProduction.
func (e Environment) IsProduction() bool {
	switch e {
	case EnvironmentProduction:
		return true
	default:
		return false
	}
}

// NewEnvironment returns a reference to a string with given value.
func NewEnvironment(input Environment) *Environment {
return &input
Expand Down
3 changes: 2 additions & 1 deletion pkg/deployment/resources/pvcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func (r *Resources) EnsurePVCs() error {
owner := apiObject.AsOwner()
iterator := r.context.GetServerGroupIterator()
status := r.context.GetStatus()
enforceAntiAffinity := r.context.GetSpec().GetEnvironment().IsProduction()

if err := iterator.ForeachServerGroup(func(group api.ServerGroup, spec api.ServerGroupSpec, status *api.MemberStatusList) error {
for _, m := range *status {
Expand All @@ -51,7 +52,7 @@ func (r *Resources) EnsurePVCs() error {
role := group.AsRole()
resources := spec.Resources
finalizers := r.createPVCFinalizers(group)
if err := k8sutil.CreatePersistentVolumeClaim(kubecli, m.PersistentVolumeClaimName, deploymentName, ns, storageClassName, role, resources, finalizers, owner); err != nil {
if err := k8sutil.CreatePersistentVolumeClaim(kubecli, m.PersistentVolumeClaimName, deploymentName, ns, storageClassName, role, enforceAntiAffinity, resources, finalizers, owner); err != nil {
return maskAny(err)
}
}
Expand Down
141 changes: 138 additions & 3 deletions pkg/storage/pv_creator.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ package storage

import (
"context"
"crypto/sha1"
"encoding/json"
"fmt"
"math/rand"
Expand All @@ -32,6 +33,7 @@ import (
"sort"
"strconv"
"strings"
"time"

"k8s.io/apimachinery/pkg/api/resource"

Expand All @@ -41,6 +43,8 @@ import (

api "github.com/arangodb/kube-arangodb/pkg/apis/storage/v1alpha"
"github.com/arangodb/kube-arangodb/pkg/storage/provisioner"
"github.com/arangodb/kube-arangodb/pkg/util/constants"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
)

const (
Expand Down Expand Up @@ -72,7 +76,24 @@ func (ls *LocalStorage) createPVs(ctx context.Context, apiObject *api.ArangoLoca
clients[i], clients[j] = clients[j], clients[i]
})

var nodeClientMap map[string]provisioner.API
for i, claim := range unboundClaims {
// Find deployment name & role in the claim (if any)
deplName, role, enforceAniAffinity := getDeploymentInfo(claim)
allowedClients := clients
if enforceAniAffinity && deplName != "" {
// Select nodes to choose from such that no volume in group lands on the same node
if nodeClientMap == nil {
nodeClientMap = createNodeClientMap(ctx, clients)
}
var err error
allowedClients, err = ls.filterAllowedNodes(nodeClientMap, deplName, role)
if err != nil {
log.Warn().Err(err).Msg("Failed to filter allowed nodes")
continue // We'll try this claim again later
}
}

// Find size of PVC
volSize := defaultVolumeSize
if reqStorage := claim.Spec.Resources.Requests.StorageEphemeral(); reqStorage != nil {
Expand All @@ -81,7 +102,7 @@ func (ls *LocalStorage) createPVs(ctx context.Context, apiObject *api.ArangoLoca
}
}
// Create PV
if err := ls.createPV(ctx, apiObject, clients, i, volSize); err != nil {
if err := ls.createPV(ctx, apiObject, allowedClients, i, volSize, claim, deplName, role); err != nil {
log.Error().Err(err).Msg("Failed to create PersistentVolume")
}
}
Expand All @@ -90,7 +111,7 @@ func (ls *LocalStorage) createPVs(ctx context.Context, apiObject *api.ArangoLoca
}

// createPV creates a PersistentVolume.
func (ls *LocalStorage) createPV(ctx context.Context, apiObject *api.ArangoLocalStorage, clients []provisioner.API, clientsOffset int, volSize int64) error {
func (ls *LocalStorage) createPV(ctx context.Context, apiObject *api.ArangoLocalStorage, clients []provisioner.API, clientsOffset int, volSize int64, claim v1.PersistentVolumeClaim, deploymentName, role string) error {
log := ls.deps.Log
// Try clients
for clientIdx := 0; clientIdx < len(clients); clientIdx++ {
Expand All @@ -117,7 +138,7 @@ func (ls *LocalStorage) createPV(ctx context.Context, apiObject *api.ArangoLocal
continue
}
// Create a volume
pvName := apiObject.GetName() + "-" + name
pvName := strings.ToLower(apiObject.GetName() + "-" + shortHash(info.NodeName) + "-" + name)
volumeMode := v1.PersistentVolumeFilesystem
nodeAff, err := createNodeAffinity(info.NodeName)
if err != nil {
Expand All @@ -131,6 +152,10 @@ func (ls *LocalStorage) createPV(ctx context.Context, apiObject *api.ArangoLocal
v1.AlphaStorageNodeAffinityAnnotation: nodeAff,
nodeNameAnnotation: info.NodeName,
},
Labels: map[string]string{
k8sutil.LabelKeyArangoDeployment: deploymentName,
k8sutil.LabelKeyRole: role,
},
},
Spec: v1.PersistentVolumeSpec{
Capacity: v1.ResourceList{
Expand All @@ -147,6 +172,13 @@ func (ls *LocalStorage) createPV(ctx context.Context, apiObject *api.ArangoLocal
},
StorageClassName: apiObject.Spec.StorageClass.Name,
VolumeMode: &volumeMode,
ClaimRef: &v1.ObjectReference{
Kind: "PersistentVolumeClaim",
APIVersion: "",
Name: claim.GetName(),
Namespace: claim.GetNamespace(),
UID: claim.GetUID(),
},
},
}
// Attach PV to ArangoLocalStorage
Expand All @@ -159,6 +191,16 @@ func (ls *LocalStorage) createPV(ctx context.Context, apiObject *api.ArangoLocal
Str("name", pvName).
Str("node-name", info.NodeName).
Msg("Created PersistentVolume")

// Bind claim to volume
if err := ls.bindClaimToVolume(claim, pv.GetName()); err != nil {
// Try to delete the PV now
if err := ls.deps.KubeCli.CoreV1().PersistentVolumes().Delete(pv.GetName(), &metav1.DeleteOptions{}); err != nil {
log.Error().Err(err).Msg("Failed to delete PV after binding PVC failed")
}
return maskAny(err)
}

return nil
}
}
Expand Down Expand Up @@ -204,3 +246,96 @@ func createNodeAffinity(nodeName string) (string, error) {
}
return string(encoded), nil
}

// createNodeClientMap creates a map from node name to API.
// Clients that do not respond properly on a GetNodeInfo request are
// ignored.
func createNodeClientMap(ctx context.Context, clients []provisioner.API) map[string]provisioner.API {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should I be harassing you about these functions being nicely isolated and yet have no unit tests to demonstrate they work?

Copy link
Contributor Author

@ewoutp ewoutp Jun 8, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes .... will need to make some mocks to do so.
Will do so in a separate PR

result := make(map[string]provisioner.API)
for _, c := range clients {
if info, err := c.GetNodeInfo(ctx); err == nil {
result[info.NodeName] = c
}
}
return result
}

// getDeploymentInfo extracts placement information from the given claim.
// It returns, in order:
//   - the value of the claim's deployment label,
//   - the value of the claim's role label,
//   - the boolean value of the enforce-anti-affinity annotation.
//
// Missing labels yield empty strings; a missing or unparsable annotation
// yields false.
func getDeploymentInfo(pvc v1.PersistentVolumeClaim) (string, string, bool) {
	labels := pvc.GetLabels()
	annotations := pvc.GetAnnotations()

	// The parse error is dropped on purpose: an absent or malformed
	// annotation makes ParseBool return false, which is the wanted default.
	enforce, _ := strconv.ParseBool(annotations[constants.AnnotationEnforceAntiAffinity])

	return labels[k8sutil.LabelKeyArangoDeployment], labels[k8sutil.LabelKeyRole], enforce
}

// filterAllowedNodes returns the clients whose node does not yet carry a
// PersistentVolume labelled with the given deployment name & role.
//
// clients maps a node name to the provisioner client for that node. The
// node of an existing volume is read from its node-name annotation.
// The order of the returned clients is unspecified (map iteration).
// An error is returned only when listing the PersistentVolumes fails.
func (ls *LocalStorage) filterAllowedNodes(clients map[string]provisioner.API, deploymentName, role string) ([]provisioner.API, error) {
	// Select every PV that already belongs to this deployment & role.
	selector := fmt.Sprintf("%s=%s,%s=%s", k8sutil.LabelKeyArangoDeployment, deploymentName, k8sutil.LabelKeyRole, role)
	volumes, err := ls.deps.KubeCli.CoreV1().PersistentVolumes().List(metav1.ListOptions{LabelSelector: selector})
	if err != nil {
		return nil, maskAny(err)
	}

	// Collect the nodes that are already taken.
	occupied := make(map[string]struct{})
	for i := range volumes.Items {
		occupied[volumes.Items[i].GetAnnotations()[nodeNameAnnotation]] = struct{}{}
	}

	// Keep only the clients living on a node that is still free.
	allowed := make([]provisioner.API, 0, len(clients))
	for node, client := range clients {
		if _, taken := occupied[node]; taken {
			continue
		}
		allowed = append(allowed, client)
	}
	return allowed, nil
}

// bindClaimToVolume tries to bind the given claim to the volume with given name
// by setting Spec.VolumeName on the latest version of the claim.
//
// Up to 10 attempts are made, with a linearly growing pause (0ms, 10ms, 20ms, ...)
// before each attempt. An attempt is retried when fetching the claim fails with
// anything other than "not found", or when the update is rejected with a conflict
// (the claim was modified between our Get and Update).
//
// An error is returned when:
//   - the claim no longer exists,
//   - pvcNeedsVolume reports false for the latest version of the claim,
//   - the update fails with a non-conflict error,
//   - all attempts are exhausted.
func (ls *LocalStorage) bindClaimToVolume(claim v1.PersistentVolumeClaim, volumeName string) error {
	log := ls.deps.Log.With().Str("pvc-name", claim.GetName()).Str("volume-name", volumeName).Logger()
	pvcs := ls.deps.KubeCli.CoreV1().PersistentVolumeClaims(claim.GetNamespace())

	for attempt := 0; attempt < 10; attempt++ {
		// Backoff if needed (no delay on the first attempt)
		time.Sleep(time.Millisecond * time.Duration(10*attempt))

		// Fetch latest version of claim
		updated, err := pvcs.Get(claim.GetName(), metav1.GetOptions{})
		if k8sutil.IsNotFound(err) {
			// Claim is gone, retrying is pointless
			return maskAny(err)
		}
		if err != nil {
			log.Warn().Err(err).Msg("Failed to load updated PersistentVolumeClaim")
			continue
		}

		// Check claim. If already bound, bail out
		if !pvcNeedsVolume(*updated) {
			return maskAny(fmt.Errorf("PersistentVolumeClaim '%s' no longer needs a volume", claim.GetName()))
		}

		// Try to bind
		updated.Spec.VolumeName = volumeName
		_, err = pvcs.Update(updated)
		if k8sutil.IsConflict(err) {
			// Claim modified already. The update was NOT applied, so we must
			// go around again instead of falling through to the success path.
			log.Debug().Err(err).Msg("PersistentVolumeClaim has been modified. Retrying.")
			continue
		}
		if err != nil {
			log.Error().Err(err).Msg("Failed to bind PVC to volume")
			return maskAny(err)
		}
		log.Debug().Msg("Bound volume to PersistentVolumeClaim")
		return nil
	}
	log.Error().Msg("All attempts to bind PVC to volume failed")
	return maskAny(fmt.Errorf("All attempts to bind PVC to volume failed"))
}

// shortHash returns the first 6 lowercase hex characters (i.e. the first
// 3 bytes) of the SHA-1 digest of name.
func shortHash(name string) string {
	digest := sha1.Sum([]byte(name))
	// 3 bytes format to exactly 6 hex characters.
	return fmt.Sprintf("%x", digest[:3])
}
2 changes: 2 additions & 0 deletions pkg/util/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,6 @@ const (
FinalizerPodAgencyServing = "agent.database.arangodb.com/agency-serving" // Finalizer added to Agents, indicating the need for keeping enough agents alive
FinalizerPVCMemberExists = "pvc.database.arangodb.com/member-exists" // Finalizer added to PVCs, indicating the need to keep is as long as its member exists
FinalizerDeplReplStopSync = "replication.database.arangodb.com/stop-sync" // Finalizer added to ArangoDeploymentReplication, indicating the need to stop synchronization

AnnotationEnforceAntiAffinity = "database.arangodb.com/enforce-anti-affinity" // Key of annotation added to PVC. Value is a boolean "true" or "false"
)
9 changes: 8 additions & 1 deletion pkg/util/k8sutil/pvc.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,13 @@
package k8sutil

import (
"strconv"

"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

"github.com/arangodb/kube-arangodb/pkg/util/constants"
)

// IsPersistentVolumeClaimMarkedForDeletion returns true if the pod has been marked for deletion.
Expand All @@ -42,14 +46,17 @@ func CreatePersistentVolumeClaimName(deploymentName, role, id string) string {
// CreatePersistentVolumeClaim creates a persistent volume claim with given name and configuration.
// If the pvc already exists, nil is returned.
// If another error occurs, that error is returned.
func CreatePersistentVolumeClaim(kubecli kubernetes.Interface, pvcName, deploymentName, ns, storageClassName, role string, resources v1.ResourceRequirements, finalizers []string, owner metav1.OwnerReference) error {
func CreatePersistentVolumeClaim(kubecli kubernetes.Interface, pvcName, deploymentName, ns, storageClassName, role string, enforceAntiAffinity bool, resources v1.ResourceRequirements, finalizers []string, owner metav1.OwnerReference) error {
labels := LabelsForDeployment(deploymentName, role)
volumeMode := v1.PersistentVolumeFilesystem
pvc := &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: pvcName,
Labels: labels,
Finalizers: finalizers,
Annotations: map[string]string{
constants.AnnotationEnforceAntiAffinity: strconv.FormatBool(enforceAntiAffinity),
},
},
Spec: v1.PersistentVolumeClaimSpec{
AccessModes: []v1.PersistentVolumeAccessMode{
Expand Down