diff --git a/examples/production-cluster.yaml b/examples/production-cluster.yaml
new file mode 100644
index 000000000..350963229
--- /dev/null
+++ b/examples/production-cluster.yaml
@@ -0,0 +1,8 @@
+apiVersion: "database.arangodb.com/v1alpha"
+kind: "ArangoDeployment"
+metadata:
+  name: "production-cluster"
+spec:
+  mode: Cluster
+  image: arangodb/arangodb:3.3.10
+  environment: Production
diff --git a/pkg/apis/deployment/v1alpha/environment.go b/pkg/apis/deployment/v1alpha/environment.go
index 458c829c7..8803842ce 100644
--- a/pkg/apis/deployment/v1alpha/environment.go
+++ b/pkg/apis/deployment/v1alpha/environment.go
@@ -47,6 +47,11 @@ func (e Environment) Validate() error {
 	}
 }
 
+// IsProduction returns true when the given environment is a production environment.
+func (e Environment) IsProduction() bool {
+	return e == EnvironmentProduction
+}
+
 // NewEnvironment returns a reference to a string with given value.
 func NewEnvironment(input Environment) *Environment {
 	return &input
diff --git a/pkg/deployment/resources/pvcs.go b/pkg/deployment/resources/pvcs.go
index 72ddf243e..da59f25fe 100644
--- a/pkg/deployment/resources/pvcs.go
+++ b/pkg/deployment/resources/pvcs.go
@@ -43,6 +43,7 @@ func (r *Resources) EnsurePVCs() error {
 	owner := apiObject.AsOwner()
 	iterator := r.context.GetServerGroupIterator()
 	status := r.context.GetStatus()
+	enforceAntiAffinity := r.context.GetSpec().GetEnvironment().IsProduction()
 
 	if err := iterator.ForeachServerGroup(func(group api.ServerGroup, spec api.ServerGroupSpec, status *api.MemberStatusList) error {
 		for _, m := range *status {
@@ -51,7 +52,7 @@ func (r *Resources) EnsurePVCs() error {
 				role := group.AsRole()
 				resources := spec.Resources
 				finalizers := r.createPVCFinalizers(group)
-				if err := k8sutil.CreatePersistentVolumeClaim(kubecli, m.PersistentVolumeClaimName, deploymentName, ns, storageClassName, role, resources, finalizers, owner); err != nil {
+				if err := k8sutil.CreatePersistentVolumeClaim(kubecli, m.PersistentVolumeClaimName, deploymentName, ns, storageClassName, role, enforceAntiAffinity, resources, finalizers, owner); err != nil {
 					return maskAny(err)
 				}
 			}
diff --git a/pkg/storage/pv_creator.go b/pkg/storage/pv_creator.go
index b79d26b6b..aff87f490 100644
--- a/pkg/storage/pv_creator.go
+++ b/pkg/storage/pv_creator.go
@@ -24,6 +24,7 @@ package storage
 
 import (
 	"context"
+	"crypto/sha1"
 	"encoding/json"
 	"fmt"
 	"math/rand"
@@ -32,6 +33,7 @@ import (
 	"sort"
 	"strconv"
 	"strings"
+	"time"
 
 	"k8s.io/apimachinery/pkg/api/resource"
 
@@ -41,6 +43,8 @@ import (
 
 	api "github.com/arangodb/kube-arangodb/pkg/apis/storage/v1alpha"
 	"github.com/arangodb/kube-arangodb/pkg/storage/provisioner"
+	"github.com/arangodb/kube-arangodb/pkg/util/constants"
+	"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
 )
 
 const (
@@ -72,7 +76,24 @@ func (ls *LocalStorage) createPVs(ctx context.Context, apiObject *api.ArangoLoca
 		clients[i], clients[j] = clients[j], clients[i]
 	})
 
+	var nodeClientMap map[string]provisioner.API
 	for i, claim := range unboundClaims {
+		// Find deployment name & role in the claim (if any)
+		deplName, role, enforceAntiAffinity := getDeploymentInfo(claim)
+		allowedClients := clients
+		if enforceAntiAffinity && deplName != "" {
+			// Select nodes to choose from such that no volume in group lands on the same node
+			if nodeClientMap == nil {
+				nodeClientMap = createNodeClientMap(ctx, clients)
+			}
+			var err error
+			allowedClients, err = ls.filterAllowedNodes(nodeClientMap, deplName, role)
+			if err != nil {
+				log.Warn().Err(err).Msg("Failed to filter allowed nodes")
+				continue // We'll try this claim again later
+			}
+		}
+
 		// Find size of PVC
 		volSize := defaultVolumeSize
 		if reqStorage := claim.Spec.Resources.Requests.StorageEphemeral(); reqStorage != nil {
@@ -81,7 +102,7 @@ func (ls *LocalStorage) createPVs(ctx context.Context, apiObject *api.ArangoLoca
 			}
 		}
 		// Create PV
-		if err := ls.createPV(ctx, apiObject, clients, i, volSize); err != nil {
+		if err := ls.createPV(ctx, apiObject, allowedClients, i, volSize, claim, deplName, role); err != nil {
 			log.Error().Err(err).Msg("Failed to create PersistentVolume")
 		}
 	}
@@ -90,7 +111,7 @@ func (ls *LocalStorage) createPVs(ctx context.Context, apiObject *api.ArangoLoca
 }
 
 // createPV creates a PersistentVolume.
-func (ls *LocalStorage) createPV(ctx context.Context, apiObject *api.ArangoLocalStorage, clients []provisioner.API, clientsOffset int, volSize int64) error {
+func (ls *LocalStorage) createPV(ctx context.Context, apiObject *api.ArangoLocalStorage, clients []provisioner.API, clientsOffset int, volSize int64, claim v1.PersistentVolumeClaim, deploymentName, role string) error {
 	log := ls.deps.Log
 	// Try clients
 	for clientIdx := 0; clientIdx < len(clients); clientIdx++ {
@@ -117,7 +138,7 @@ func (ls *LocalStorage) createPV(ctx context.Context, apiObject *api.ArangoLocal
 			continue
 		}
 		// Create a volume
-		pvName := apiObject.GetName() + "-" + name
+		pvName := strings.ToLower(apiObject.GetName() + "-" + shortHash(info.NodeName) + "-" + name)
 		volumeMode := v1.PersistentVolumeFilesystem
 		nodeAff, err := createNodeAffinity(info.NodeName)
 		if err != nil {
@@ -131,6 +152,10 @@ func (ls *LocalStorage) createPV(ctx context.Context, apiObject *api.ArangoLocal
 					v1.AlphaStorageNodeAffinityAnnotation: nodeAff,
 					nodeNameAnnotation:                    info.NodeName,
 				},
+				Labels: map[string]string{
+					k8sutil.LabelKeyArangoDeployment: deploymentName,
+					k8sutil.LabelKeyRole:             role,
+				},
 			},
 			Spec: v1.PersistentVolumeSpec{
 				Capacity: v1.ResourceList{
@@ -147,6 +172,13 @@ func (ls *LocalStorage) createPV(ctx context.Context, apiObject *api.ArangoLocal
 				},
 				StorageClassName: apiObject.Spec.StorageClass.Name,
 				VolumeMode:       &volumeMode,
+				ClaimRef: &v1.ObjectReference{
+					Kind:       "PersistentVolumeClaim",
+					APIVersion: "",
+					Name:       claim.GetName(),
+					Namespace:  claim.GetNamespace(),
+					UID:        claim.GetUID(),
+				},
 			},
 		}
 		// Attach PV to ArangoLocalStorage
@@ -159,6 +191,16 @@ func (ls *LocalStorage) createPV(ctx context.Context, apiObject *api.ArangoLocal
 			Str("name", pvName).
 			Str("node-name", info.NodeName).
 			Msg("Created PersistentVolume")
+
+		// Bind claim to volume
+		if err := ls.bindClaimToVolume(claim, pv.GetName()); err != nil {
+			// Try to delete the PV now
+			if err := ls.deps.KubeCli.CoreV1().PersistentVolumes().Delete(pv.GetName(), &metav1.DeleteOptions{}); err != nil {
+				log.Error().Err(err).Msg("Failed to delete PV after binding PVC failed")
+			}
+			return maskAny(err)
+		}
+
 		return nil
 	}
 }
@@ -204,3 +246,97 @@ func createNodeAffinity(nodeName string) (string, error) {
 	}
 	return string(encoded), nil
 }
+
+// createNodeClientMap creates a map from node name to API.
+// Clients that do not respond properly on a GetNodeInfo request are
+// ignored.
+func createNodeClientMap(ctx context.Context, clients []provisioner.API) map[string]provisioner.API {
+	result := make(map[string]provisioner.API)
+	for _, c := range clients {
+		if info, err := c.GetNodeInfo(ctx); err == nil {
+			result[info.NodeName] = c
+		}
+	}
+	return result
+}
+
+// getDeploymentInfo returns the name of the deployment that created the given claim,
+// the role of the server that the claim is used for and the value for `enforceAntiAffinity`.
+// If not found, empty strings are returned.
+// Returns deploymentName, role, enforceAntiAffinity.
+func getDeploymentInfo(pvc v1.PersistentVolumeClaim) (string, string, bool) {
+	deploymentName := pvc.GetLabels()[k8sutil.LabelKeyArangoDeployment]
+	role := pvc.GetLabels()[k8sutil.LabelKeyRole]
+	enforceAntiAffinity, _ := strconv.ParseBool(pvc.GetAnnotations()[constants.AnnotationEnforceAntiAffinity]) // If the annotation is empty, this will yield false.
+	return deploymentName, role, enforceAntiAffinity
+}
+
+// filterAllowedNodes returns those clients that do not yet have a volume for the given deployment name & role.
+func (ls *LocalStorage) filterAllowedNodes(clients map[string]provisioner.API, deploymentName, role string) ([]provisioner.API, error) {
+	// Find all PVs for given deployment & role
+	list, err := ls.deps.KubeCli.CoreV1().PersistentVolumes().List(metav1.ListOptions{
+		LabelSelector: fmt.Sprintf("%s=%s,%s=%s", k8sutil.LabelKeyArangoDeployment, deploymentName, k8sutil.LabelKeyRole, role),
+	})
+	if err != nil {
+		return nil, maskAny(err)
+	}
+	excludedNodes := make(map[string]struct{})
+	for _, pv := range list.Items {
+		nodeName := pv.GetAnnotations()[nodeNameAnnotation]
+		excludedNodes[nodeName] = struct{}{}
+	}
+	result := make([]provisioner.API, 0, len(clients))
+	for nodeName, c := range clients {
+		if _, found := excludedNodes[nodeName]; !found {
+			result = append(result, c)
+		}
+	}
+	return result, nil
+}
+
+// bindClaimToVolume tries to bind the given claim to the volume with given name.
+// If the claim has been updated, the function retries several times.
+func (ls *LocalStorage) bindClaimToVolume(claim v1.PersistentVolumeClaim, volumeName string) error {
+	log := ls.deps.Log.With().Str("pvc-name", claim.GetName()).Str("volume-name", volumeName).Logger()
+	pvcs := ls.deps.KubeCli.CoreV1().PersistentVolumeClaims(claim.GetNamespace())
+
+	for attempt := 0; attempt < 10; attempt++ {
+		// Backoff if needed
+		time.Sleep(time.Millisecond * time.Duration(10*attempt))
+
+		// Fetch latest version of claim
+		updated, err := pvcs.Get(claim.GetName(), metav1.GetOptions{})
+		if k8sutil.IsNotFound(err) {
+			return maskAny(err)
+		} else if err != nil {
+			log.Warn().Err(err).Msg("Failed to load updated PersistentVolumeClaim")
+			continue
+		}
+
+		// Check claim. If already bound, bail out
+		if !pvcNeedsVolume(*updated) {
+			return maskAny(fmt.Errorf("PersistentVolumeClaim '%s' no longer needs a volume", claim.GetName()))
+		}
+
+		// Try to bind
+		updated.Spec.VolumeName = volumeName
+		if _, err := pvcs.Update(updated); k8sutil.IsConflict(err) {
+			// Claim modified already, retry
+			log.Debug().Err(err).Msg("PersistentVolumeClaim has been modified. Retrying.")
+			continue
Retrying.") + } else if err != nil { + log.Error().Err(err).Msg("Failed to bind PVC to volume") + return maskAny(err) + } + log.Debug().Msg("Bound volume to PersistentVolumeClaim") + return nil + } + log.Error().Msg("All attempts to bind PVC to volume failed") + return maskAny(fmt.Errorf("All attempts to bind PVC to volume failed")) +} + +// shortHash creates a 6 letter hash of the given name. +func shortHash(name string) string { + h := sha1.Sum([]byte(name)) + return fmt.Sprintf("%0x", h)[:6] +} diff --git a/pkg/util/constants/constants.go b/pkg/util/constants/constants.go index 8c43f6d79..ed7dc3c64 100644 --- a/pkg/util/constants/constants.go +++ b/pkg/util/constants/constants.go @@ -48,4 +48,6 @@ const ( FinalizerPodAgencyServing = "agent.database.arangodb.com/agency-serving" // Finalizer added to Agents, indicating the need for keeping enough agents alive FinalizerPVCMemberExists = "pvc.database.arangodb.com/member-exists" // Finalizer added to PVCs, indicating the need to keep is as long as its member exists FinalizerDeplReplStopSync = "replication.database.arangodb.com/stop-sync" // Finalizer added to ArangoDeploymentReplication, indicating the need to stop synchronization + + AnnotationEnforceAntiAffinity = "database.arangodb.com/enforce-anti-affinity" // Key of annotation added to PVC. Value is a boolean "true" or "false" ) diff --git a/pkg/util/k8sutil/pvc.go b/pkg/util/k8sutil/pvc.go index d366ee5ae..2bfc36d5a 100644 --- a/pkg/util/k8sutil/pvc.go +++ b/pkg/util/k8sutil/pvc.go @@ -23,9 +23,13 @@ package k8sutil import ( + "strconv" + "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" + + "github.com/arangodb/kube-arangodb/pkg/util/constants" ) // IsPersistentVolumeClaimMarkedForDeletion returns true if the pod has been marked for deletion. @@ -42,7 +46,7 @@ func CreatePersistentVolumeClaimName(deploymentName, role, id string) string { // CreatePersistentVolumeClaim creates a persistent volume claim with given name and configuration. // If the pvc already exists, nil is returned. // If another error occurs, that error is returned. -func CreatePersistentVolumeClaim(kubecli kubernetes.Interface, pvcName, deploymentName, ns, storageClassName, role string, resources v1.ResourceRequirements, finalizers []string, owner metav1.OwnerReference) error { +func CreatePersistentVolumeClaim(kubecli kubernetes.Interface, pvcName, deploymentName, ns, storageClassName, role string, enforceAntiAffinity bool, resources v1.ResourceRequirements, finalizers []string, owner metav1.OwnerReference) error { labels := LabelsForDeployment(deploymentName, role) volumeMode := v1.PersistentVolumeFilesystem pvc := &v1.PersistentVolumeClaim{ @@ -50,6 +54,9 @@ func CreatePersistentVolumeClaim(kubecli kubernetes.Interface, pvcName, deployme Name: pvcName, Labels: labels, Finalizers: finalizers, + Annotations: map[string]string{ + constants.AnnotationEnforceAntiAffinity: strconv.FormatBool(enforceAntiAffinity), + }, }, Spec: v1.PersistentVolumeClaimSpec{ AccessModes: []v1.PersistentVolumeAccessMode{