diff --git a/pkg/apis/deployment/v1alpha/member_phase.go b/pkg/apis/deployment/v1alpha/member_phase.go
new file mode 100644
index 000000000..a047d395c
--- /dev/null
+++ b/pkg/apis/deployment/v1alpha/member_phase.go
@@ -0,0 +1,48 @@
+//
+// DISCLAIMER
+//
+// Copyright 2018 ArangoDB GmbH, Cologne, Germany
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// Copyright holder is ArangoDB GmbH, Cologne, Germany
+//
+// Author Ewout Prangsma
+//
+
+package v1alpha
+
+// MemberPhase is a strongly typed lifetime phase of a deployment member
+type MemberPhase string
+
+const (
+	// MemberPhaseNone indicates that the phase is not set yet
+	MemberPhaseNone MemberPhase = ""
+	// MemberPhaseCreated indicates that all resources needed for the member have been created
+	MemberPhaseCreated MemberPhase = "Created"
+	// MemberPhaseFailed indicates that the member is gone beyond hope of recovery. It must be replaced with a new member.
+	MemberPhaseFailed MemberPhase = "Failed"
+	// MemberPhaseCleanOut indicates that a dbserver is in the process of being cleaned out
+	MemberPhaseCleanOut MemberPhase = "CleanOut"
+	// MemberPhaseShuttingDown indicates that a member is shutting down
+	MemberPhaseShuttingDown MemberPhase = "ShuttingDown"
+	// MemberPhaseRotating indicates that a member is being rotated
+	MemberPhaseRotating MemberPhase = "Rotating"
+	// MemberPhaseUpgrading indicates that a member is in the process of upgrading its database data format
+	MemberPhaseUpgrading MemberPhase = "Upgrading"
+)
+
+// IsFailed returns true when the given phase == "Failed"
+func (p MemberPhase) IsFailed() bool {
+	return p == MemberPhaseFailed
+}
diff --git a/pkg/apis/deployment/v1alpha/member_state.go b/pkg/apis/deployment/v1alpha/member_state.go
deleted file mode 100644
index 4fe637268..000000000
--- a/pkg/apis/deployment/v1alpha/member_state.go
+++ /dev/null
@@ -1,41 +0,0 @@
-//
-// DISCLAIMER
-//
-// Copyright 2018 ArangoDB GmbH, Cologne, Germany
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-//
-// Copyright holder is ArangoDB GmbH, Cologne, Germany
-//
-// Author Ewout Prangsma
-//
-
-package v1alpha
-
-// MemberState is a strongly typed state of a deployment member
-type MemberState string
-
-const (
-	// MemberStateNone indicates that the state is not set yet
-	MemberStateNone MemberState = ""
-	// MemberStateCreated indicates that all resources needed for the member have been created
-	MemberStateCreated MemberState = "Created"
-	// MemberStateCleanOut indicates that a dbserver is in the process of being cleaned out
-	MemberStateCleanOut MemberState = "CleanOut"
-	// MemberStateShuttingDown indicates that a member is shutting down
-	MemberStateShuttingDown MemberState = "ShuttingDown"
-	// MemberStateRotating indicates that a member is being rotated
-	MemberStateRotating MemberState = "Rotating"
-	// MemberStateUpgrading indicates that a member is in the process of upgrading its database data format
-	MemberStateUpgrading MemberState = "Upgrading"
-)
diff --git a/pkg/apis/deployment/v1alpha/member_status.go b/pkg/apis/deployment/v1alpha/member_status.go
index 20831c25b..984600886 100644
--- a/pkg/apis/deployment/v1alpha/member_status.go
+++ b/pkg/apis/deployment/v1alpha/member_status.go
@@ -25,6 +25,7 @@ package v1alpha
 import (
 	"time"
 
+	"k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
 
@@ -33,8 +34,10 @@ type MemberStatus struct {
 	// ID holds the unique ID of the member.
 	// This id is also used within the ArangoDB cluster to identify this server.
 	ID string `json:"id"`
-	// State holds the current state of this member
-	State MemberState `json:"state"`
+	// Phase holds the current lifetime phase of this member
+	Phase MemberPhase `json:"phase"`
+	// CreatedAt holds the creation timestamp of this member.
+	CreatedAt metav1.Time `json:"created-at"`
 	// PersistentVolumeClaimName holds the name of the persistent volume claim used for this member (if any).
 	PersistentVolumeClaimName string `json:"persistentVolumeClaimName,omitempty"`
 	// PodName holds the name of the Pod that currently runs this member
@@ -78,3 +81,17 @@ func (s MemberStatus) RecentTerminationsSince(timestamp time.Time) int {
 	}
 	return count
 }
+
+// IsNotReadySince returns true when the given member has not been ready since the given timestamp.
+// That means the member:
+// - A) was created before the given timestamp and never reached a ready state, or
+// - B) has its Ready condition set to false, with the last transition before the given timestamp.
+func (s MemberStatus) IsNotReadySince(timestamp time.Time) bool {
+	cond, found := s.Conditions.Get(ConditionTypeReady)
+	if found {
+		// B
+		return cond.Status != v1.ConditionTrue && cond.LastTransitionTime.Time.Before(timestamp)
+	}
+	// A
+	return s.CreatedAt.Time.Before(timestamp)
+}
diff --git a/pkg/apis/deployment/v1alpha/member_status_list.go b/pkg/apis/deployment/v1alpha/member_status_list.go
index 7c59e1c67..fff418e25 100644
--- a/pkg/apis/deployment/v1alpha/member_status_list.go
+++ b/pkg/apis/deployment/v1alpha/member_status_list.go
@@ -108,7 +108,7 @@ func (l MemberStatusList) SelectMemberToRemove() (MemberStatus, error) {
 	if len(l) > 0 {
 		// Try to find a not ready member
 		for _, m := range l {
-			if m.State == MemberStateNone {
+			if m.Phase == MemberPhaseNone {
 				return m, nil
 			}
 		}
@@ -116,7 +116,7 @@ func (l MemberStatusList) SelectMemberToRemove() (MemberStatus, error) {
 		perm := rand.Perm(len(l))
 		for _, idx := range perm {
 			m := l[idx]
-			if m.State == MemberStateCreated {
+			if m.Phase == MemberPhaseCreated {
 				return m, nil
 			}
 		}
diff --git a/pkg/apis/deployment/v1alpha/member_status_test.go b/pkg/apis/deployment/v1alpha/member_status_test.go
index 58594ed28..4f1a6517c 100644
--- a/pkg/apis/deployment/v1alpha/member_status_test.go
+++ b/pkg/apis/deployment/v1alpha/member_status_test.go
@@ -51,3 +51,24 @@ func TestMemberStatusRecentTerminations(t *testing.T) {
 	assert.Equal(t, 2, s.RemoveTerminationsBefore(time.Now()))
 	assert.Len(t, s.RecentTerminations, 1)
 }
+
+// TestMemberStatusIsNotReadySince tests the functions related to MemberStatus.IsNotReadySince.
+func TestMemberStatusIsNotReadySince(t *testing.T) {
+	s := MemberStatus{
+		CreatedAt: metav1.Now(),
+	}
+	assert.False(t, s.IsNotReadySince(time.Now().Add(-time.Hour)))
+
+	s.CreatedAt.Time = time.Now().Add(-time.Hour)
+	assert.False(t, s.IsNotReadySince(time.Now().Add(-2*time.Hour)))
+	assert.True(t, s.IsNotReadySince(time.Now().Add(-(time.Hour - time.Minute))))
+
+	s.CreatedAt = metav1.Now()
+	s.Conditions.Update(ConditionTypeReady, true, "", "")
+	assert.False(t, s.IsNotReadySince(time.Now().Add(-time.Minute)))
+	assert.False(t, s.IsNotReadySince(time.Now().Add(time.Minute)))
+
+	s.Conditions.Update(ConditionTypeReady, false, "", "")
+	assert.False(t, s.IsNotReadySince(time.Now().Add(-time.Minute)))
+	assert.True(t, s.IsNotReadySince(time.Now().Add(time.Minute)))
+}
diff --git a/pkg/apis/deployment/v1alpha/zz_generated.deepcopy.go b/pkg/apis/deployment/v1alpha/zz_generated.deepcopy.go
index 61ff324f7..265fa0b42 100644
--- a/pkg/apis/deployment/v1alpha/zz_generated.deepcopy.go
+++ b/pkg/apis/deployment/v1alpha/zz_generated.deepcopy.go
@@ -364,6 +364,7 @@ func (in *ImageInfo) DeepCopy() *ImageInfo {
 
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 func (in *MemberStatus) DeepCopyInto(out *MemberStatus) {
 	*out = *in
+	in.CreatedAt.DeepCopyInto(&out.CreatedAt)
 	if in.Conditions != nil {
 		in, out := &in.Conditions, &out.Conditions
 		*out = make(ConditionList, len(*in))
diff --git a/pkg/deployment/context_impl.go b/pkg/deployment/context_impl.go
index 19f780b4d..a417570db 100644
--- a/pkg/deployment/context_impl.go
+++ b/pkg/deployment/context_impl.go
@@ -97,10 +97,14 @@ func (d *Deployment) GetServerClient(ctx context.Context, group api.ServerGroup,
 }
 
 // GetAgencyClients returns a client connection for every agency member.
-func (d *Deployment) GetAgencyClients(ctx context.Context) ([]arangod.Agency, error) {
+// If the given predicate is not nil, only agents are included where the given predicate returns true.
+func (d *Deployment) GetAgencyClients(ctx context.Context, predicate func(id string) bool) ([]arangod.Agency, error) {
 	agencyMembers := d.status.Members.Agents
 	result := make([]arangod.Agency, 0, len(agencyMembers))
 	for _, m := range agencyMembers {
+		if predicate != nil && !predicate(m.ID) {
+			continue
+		}
 		client, err := d.GetServerClient(ctx, api.ServerGroupAgents, m.ID)
 		if err != nil {
 			return nil, maskAny(err)
@@ -115,9 +119,11 @@ func (d *Deployment) GetAgencyClients(ctx context.Context) ([]arangod.Agency, er
 }
 
 // CreateMember adds a new member to the given group.
-func (d *Deployment) CreateMember(group api.ServerGroup) error {
+// If ID is non-empty, it will be used, otherwise a new ID is created.
+func (d *Deployment) CreateMember(group api.ServerGroup, id string) error {
 	log := d.deps.Log
-	if err := d.createMember(group, d.apiObject); err != nil {
+	id, err := d.createMember(group, id, d.apiObject)
+	if err != nil {
 		log.Debug().Err(err).Str("group", group.AsRole()).Msg("Failed to create member")
 		return maskAny(err)
 	}
@@ -126,6 +132,9 @@
 		log.Debug().Err(err).Msg("Updating CR status failed")
 		return maskAny(err)
 	}
+	// Create event about it
+	d.CreateEvent(k8sutil.NewMemberAddEvent(id, group.AsRole(), d.apiObject))
+
 	return nil
 }
diff --git a/pkg/deployment/deployment.go b/pkg/deployment/deployment.go
index f4104e305..0d05bd6cc 100644
--- a/pkg/deployment/deployment.go
+++ b/pkg/deployment/deployment.go
@@ -36,6 +36,7 @@ import (
 
 	api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
 	"github.com/arangodb/kube-arangodb/pkg/deployment/reconcile"
+	"github.com/arangodb/kube-arangodb/pkg/deployment/resilience"
 	"github.com/arangodb/kube-arangodb/pkg/deployment/resources"
 	"github.com/arangodb/kube-arangodb/pkg/generated/clientset/versioned"
 	"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
@@ -92,6 +93,7 @@ type Deployment struct {
 	recentInspectionErrors    int
 	clusterScalingIntegration *clusterScalingIntegration
 	reconciler                *reconcile.Reconciler
+	resilience                *resilience.Resilience
 	resources                 *resources.Resources
 }
 
@@ -111,6 +113,7 @@ func New(config Config, deps Dependencies, apiObject *api.ArangoDeployment) (*De
 		clientCache: newClientCache(deps.KubeCli, apiObject),
 	}
 	d.reconciler = reconcile.NewReconciler(deps.Log, d)
+	d.resilience = resilience.NewResilience(deps.Log, d)
 	d.resources = resources.NewResources(deps.Log, d)
 	if d.status.AcceptedSpec == nil {
 		// We've validated the spec, so let's use it from now.
diff --git a/pkg/deployment/deployment_inspector.go b/pkg/deployment/deployment_inspector.go
index 87bb9d3d0..9ce1c848e 100644
--- a/pkg/deployment/deployment_inspector.go
+++ b/pkg/deployment/deployment_inspector.go
@@ -76,6 +76,12 @@ func (d *Deployment) inspectDeployment(lastInterval time.Duration) time.Duration
 		d.CreateEvent(k8sutil.NewErrorEvent("Pod inspection failed", err, d.apiObject))
 	}
 
+	// Check members for resilience
+	if err := d.resilience.CheckMemberFailure(); err != nil {
+		hasError = true
+		d.CreateEvent(k8sutil.NewErrorEvent("Member failure detection failed", err, d.apiObject))
+	}
+
 	// Create scale/update plan
 	if err := d.reconciler.CreatePlan(); err != nil {
 		hasError = true
diff --git a/pkg/deployment/members.go b/pkg/deployment/members.go
index 114af5330..27435e052 100644
--- a/pkg/deployment/members.go
+++ b/pkg/deployment/members.go
@@ -26,9 +26,11 @@ import (
 	"fmt"
 	"strings"
 
-	api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
 	"github.com/dchest/uniuri"
+	"k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 
+	api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
 	"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
 )
 
@@ -39,11 +41,14 @@ func (d *Deployment) createInitialMembers(apiObject *api.ArangoDeployment) error
 	log.Debug().Msg("creating initial members...")
 
 	// Go over all groups and create members
+	var events []*v1.Event
 	if err := apiObject.ForeachServerGroup(func(group api.ServerGroup, spec api.ServerGroupSpec, status *api.MemberStatusList) error {
 		for len(*status) < spec.GetCount() {
-			if err := d.createMember(group, apiObject); err != nil {
+			id, err := d.createMember(group, "", apiObject)
+			if err != nil {
 				return maskAny(err)
 			}
+			events = append(events, k8sutil.NewMemberAddEvent(id, group.AsRole(), apiObject))
 		}
 		return nil
 	}, &d.status); err != nil {
@@ -55,6 +60,10 @@ func (d *Deployment) createInitialMembers(apiObject *api.ArangoDeployment) error
 	if err := d.updateCRStatus(); err != nil {
 		return maskAny(err)
 	}
+	// Save events
+	for _, evt := range events {
+		d.CreateEvent(evt)
+	}
 	return nil
 }
@@ -62,16 +71,17 @@
 
 // createMember creates member and adds it to the applicable member list.
 // Note: This does not create any pods of PVCs
 // Note: The updated status is not yet written to the apiserver.
-func (d *Deployment) createMember(group api.ServerGroup, apiObject *api.ArangoDeployment) error {
+func (d *Deployment) createMember(group api.ServerGroup, id string, apiObject *api.ArangoDeployment) (string, error) {
 	log := d.deps.Log
-	var id string
-	idPrefix := getArangodIDPrefix(group)
-	for {
-		id = idPrefix + strings.ToLower(uniuri.NewLen(8)) // K8s accepts only lowercase, so we use it here as well
-		if !d.status.Members.ContainsID(id) {
-			break
+	if id == "" {
+		idPrefix := getArangodIDPrefix(group)
+		for {
+			id = idPrefix + strings.ToLower(uniuri.NewLen(8)) // K8s accepts only lowercase, so we use it here as well
+			if !d.status.Members.ContainsID(id) {
+				break
+			}
+			// Duplicate, try again
 		}
-		// Duplicate, try again
 	}
 	deploymentName := apiObject.GetName()
 	role := group.AsRole()
@@ -80,68 +90,74 @@
 	case api.ServerGroupSingle:
 		log.Debug().Str("id", id).Msg("Adding single server")
 		if err := d.status.Members.Single.Add(api.MemberStatus{
-			ID:    id,
-			State: api.MemberStateNone,
+			ID:        id,
+			CreatedAt: metav1.Now(),
+			Phase:     api.MemberPhaseNone,
 			PersistentVolumeClaimName: k8sutil.CreatePersistentVolumeClaimName(deploymentName, role, id),
 			PodName:                   "",
 		}); err != nil {
-			return maskAny(err)
+			return "", maskAny(err)
 		}
 	case api.ServerGroupAgents:
 		log.Debug().Str("id", id).Msg("Adding agent")
 		if err := d.status.Members.Agents.Add(api.MemberStatus{
-			ID:    id,
-			State: api.MemberStateNone,
+			ID:        id,
+			CreatedAt: metav1.Now(),
+			Phase:     api.MemberPhaseNone,
 			PersistentVolumeClaimName: k8sutil.CreatePersistentVolumeClaimName(deploymentName, role, id),
 			PodName:                   "",
 		}); err != nil {
-			return maskAny(err)
+			return "", maskAny(err)
 		}
 	case api.ServerGroupDBServers:
 		log.Debug().Str("id", id).Msg("Adding dbserver")
 		if err := d.status.Members.DBServers.Add(api.MemberStatus{
-			ID:    id,
-			State: api.MemberStateNone,
+			ID:        id,
+			CreatedAt: metav1.Now(),
+			Phase:     api.MemberPhaseNone,
 			PersistentVolumeClaimName: k8sutil.CreatePersistentVolumeClaimName(deploymentName, role, id),
 			PodName:                   "",
 		}); err != nil {
-			return maskAny(err)
+			return "", maskAny(err)
 		}
 	case api.ServerGroupCoordinators:
 		log.Debug().Str("id", id).Msg("Adding coordinator")
 		if err := d.status.Members.Coordinators.Add(api.MemberStatus{
-			ID:    id,
-			State: api.MemberStateNone,
+			ID:        id,
+			CreatedAt: metav1.Now(),
+			Phase:     api.MemberPhaseNone,
 			PersistentVolumeClaimName: "",
 			PodName:                   "",
 		}); err != nil {
-			return maskAny(err)
+			return "", maskAny(err)
 		}
 	case api.ServerGroupSyncMasters:
 		log.Debug().Str("id", id).Msg("Adding syncmaster")
 		if err := d.status.Members.SyncMasters.Add(api.MemberStatus{
-			ID:    id,
-			State: api.MemberStateNone,
+			ID:        id,
+			CreatedAt: metav1.Now(),
+			Phase:     api.MemberPhaseNone,
 			PersistentVolumeClaimName: "",
 			PodName:                   "",
 		}); err != nil {
-			return maskAny(err)
+			return "", maskAny(err)
 		}
 	case api.ServerGroupSyncWorkers:
 		log.Debug().Str("id", id).Msg("Adding syncworker")
 		if err := d.status.Members.SyncWorkers.Add(api.MemberStatus{
-			ID:    id,
-			State: api.MemberStateNone,
+			ID:        id,
+			CreatedAt: metav1.Now(),
+			Phase:     api.MemberPhaseNone,
 			PersistentVolumeClaimName: "",
 			PodName:                   "",
 		}); err != nil {
-			return maskAny(err)
+			return "", maskAny(err)
 		}
 	default:
-		return maskAny(fmt.Errorf("Unknown server group %d", group))
+		return "", maskAny(fmt.Errorf("Unknown server group %d", group))
 	}
-	return nil
+	return id, nil
 }
 
 // getArangodIDPrefix returns the prefix required ID's of arangod servers
diff --git a/pkg/deployment/reconcile/action_add_member.go b/pkg/deployment/reconcile/action_add_member.go
index d4c18aa39..cf96ca726 100644
--- a/pkg/deployment/reconcile/action_add_member.go
+++ b/pkg/deployment/reconcile/action_add_member.go
@@ -51,7 +51,7 @@ type actionAddMember struct {
 // Returns true if the action is completely finished, false in case
 // the start time needs to be recorded and a ready condition needs to be checked.
 func (a *actionAddMember) Start(ctx context.Context) (bool, error) {
-	if err := a.actionCtx.CreateMember(a.action.Group); err != nil {
+	if err := a.actionCtx.CreateMember(a.action.Group, a.action.MemberID); err != nil {
 		log.Debug().Err(err).Msg("Failed to create member")
 		return false, maskAny(err)
 	}
diff --git a/pkg/deployment/reconcile/action_cleanout_member.go b/pkg/deployment/reconcile/action_cleanout_member.go
index 32653e736..6292a5c52 100644
--- a/pkg/deployment/reconcile/action_cleanout_member.go
+++ b/pkg/deployment/reconcile/action_cleanout_member.go
@@ -71,7 +71,7 @@ func (a *actionCleanoutMember) Start(ctx context.Context) (bool, error) {
 		return false, maskAny(err)
 	}
 	// Update status
-	m.State = api.MemberStateCleanOut
+	m.Phase = api.MemberPhaseCleanOut
 	if a.actionCtx.UpdateMember(m); err != nil {
 		return false, maskAny(err)
 	}
diff --git a/pkg/deployment/reconcile/action_context.go b/pkg/deployment/reconcile/action_context.go
index 01e35d1ae..f1ae06883 100644
--- a/pkg/deployment/reconcile/action_context.go
+++ b/pkg/deployment/reconcile/action_context.go
@@ -52,7 +52,8 @@ type ActionContext interface {
 	// when no such member is found.
 	GetMemberStatusByID(id string) (api.MemberStatus, bool)
 	// CreateMember adds a new member to the given group.
-	CreateMember(group api.ServerGroup) error
+	// If ID is non-empty, it will be used, otherwise a new ID is created.
+	CreateMember(group api.ServerGroup, id string) error
 	// UpdateMember updates the deployment status wrt the given member.
 	UpdateMember(member api.MemberStatus) error
 	// RemoveMemberByID removes a member with given id.
@@ -105,7 +106,7 @@ func (ac *actionContext) GetServerClient(ctx context.Context, group api.ServerGr
 
 // GetAgencyClients returns a client connection for every agency member.
 func (ac *actionContext) GetAgencyClients(ctx context.Context) ([]arangod.Agency, error) {
-	c, err := ac.context.GetAgencyClients(ctx)
+	c, err := ac.context.GetAgencyClients(ctx, nil)
 	if err != nil {
 		return nil, maskAny(err)
 	}
@@ -122,8 +123,9 @@ func (ac *actionContext) GetMemberStatusByID(id string) (api.MemberStatus, bool)
 }
 
 // CreateMember adds a new member to the given group.
-func (ac *actionContext) CreateMember(group api.ServerGroup) error {
-	if err := ac.context.CreateMember(group); err != nil {
+// If ID is non-empty, it will be used, otherwise a new ID is created.
+func (ac *actionContext) CreateMember(group api.ServerGroup, id string) error {
+	if err := ac.context.CreateMember(group, id); err != nil {
 		return maskAny(err)
 	}
 	return nil
diff --git a/pkg/deployment/reconcile/action_remove_member.go b/pkg/deployment/reconcile/action_remove_member.go
index 81764a658..feb5ac8c6 100644
--- a/pkg/deployment/reconcile/action_remove_member.go
+++ b/pkg/deployment/reconcile/action_remove_member.go
@@ -25,8 +25,12 @@ package reconcile
 import (
 	"context"
 
-	api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
+	"github.com/pkg/errors"
 	"github.com/rs/zerolog"
+
+	driver "github.com/arangodb/go-driver"
+	api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
+	"github.com/arangodb/kube-arangodb/pkg/util/arangod"
 )
 
 // NewRemoveMemberAction creates a new Action that implements the given
@@ -55,6 +59,18 @@ func (a *actionRemoveMember) Start(ctx context.Context) (bool, error) {
 		// We wanted to remove and it is already gone. All ok
 		return true, nil
 	}
+	// For safety, remove from cluster
+	if a.action.Group == api.ServerGroupCoordinators || a.action.Group == api.ServerGroupDBServers {
+		client, err := a.actionCtx.GetDatabaseClient(ctx)
+		if err != nil {
+			return false, maskAny(err)
+		}
+		if err := arangod.RemoveServerFromCluster(ctx, client.Connection(), driver.ServerID(m.ID)); err != nil {
+			if !driver.IsNotFound(err) && !driver.IsPreconditionFailed(err) {
+				return false, maskAny(errors.Wrapf(err, "Failed to remove server from cluster: %#v", err))
+			}
+		}
+	}
 	// Remove the pod (if any)
 	if err := a.actionCtx.DeletePod(m.PodName); err != nil {
 		return false, maskAny(err)
 	}
diff --git a/pkg/deployment/reconcile/action_rotate_member.go b/pkg/deployment/reconcile/action_rotate_member.go
index fb3e07bec..84af5c8bf 100644
--- a/pkg/deployment/reconcile/action_rotate_member.go
+++ b/pkg/deployment/reconcile/action_rotate_member.go
@@ -83,7 +83,7 @@ func (a *actionRotateMember) Start(ctx context.Context) (bool, error) {
 		}
 	}
 	// Update status
-	m.State = api.MemberStateRotating
+	m.Phase = api.MemberPhaseRotating
 	if err := a.actionCtx.UpdateMember(m); err != nil {
 		return false, maskAny(err)
 	}
@@ -109,7 +109,7 @@ func (a *actionRotateMember) CheckProgress(ctx context.Context) (bool, error) {
 		return false, maskAny(err)
 	}
 	// Pod is now gone, update the member status
-	m.State = api.MemberStateNone
+	m.Phase = api.MemberPhaseNone
 	m.RecentTerminations = nil // Since we're rotating, we do not care about old terminations.
 	if err := a.actionCtx.UpdateMember(m); err != nil {
 		return false, maskAny(err)
diff --git a/pkg/deployment/reconcile/action_shutdown_member.go b/pkg/deployment/reconcile/action_shutdown_member.go
index 5e62258a5..bb0ec47ca 100644
--- a/pkg/deployment/reconcile/action_shutdown_member.go
+++ b/pkg/deployment/reconcile/action_shutdown_member.go
@@ -89,7 +89,7 @@ func (a *actionShutdownMember) Start(ctx context.Context) (bool, error) {
 		}
 	}
 	// Update status
-	m.State = api.MemberStateShuttingDown
+	m.Phase = api.MemberPhaseShuttingDown
 	if err := a.actionCtx.UpdateMember(m); err != nil {
 		return false, maskAny(err)
 	}
diff --git a/pkg/deployment/reconcile/action_upgrade_member.go b/pkg/deployment/reconcile/action_upgrade_member.go
index f9fe9657b..a9ec6564d 100644
--- a/pkg/deployment/reconcile/action_upgrade_member.go
+++ b/pkg/deployment/reconcile/action_upgrade_member.go
@@ -88,7 +88,7 @@ func (a *actionUpgradeMember) Start(ctx context.Context) (bool, error) {
 		}
 	}
 	// Update status
-	m.State = api.MemberStateRotating // We keep the rotation state here, since only when a new pod is created, it will get the Upgrading state.
+	m.Phase = api.MemberPhaseRotating // We keep the Rotating phase here, since the member only gets the Upgrading phase once its new pod is created.
 	if err := a.actionCtx.UpdateMember(m); err != nil {
 		return false, maskAny(err)
 	}
@@ -105,7 +105,7 @@ func (a *actionUpgradeMember) CheckProgress(ctx context.Context) (bool, error) {
 		log.Error().Msg("No such member")
 		return true, nil
 	}
-	isUpgrading := m.State == api.MemberStateUpgrading
+	isUpgrading := m.Phase == api.MemberPhaseUpgrading
 	log = log.With().
 		Str("pod-name", m.PodName).
 		Bool("is-upgrading", isUpgrading).Logger()
@@ -119,7 +119,7 @@ func (a *actionUpgradeMember) CheckProgress(ctx context.Context) (bool, error) {
 		return false, maskAny(err)
 	}
 	// Pod is now gone, update the member status
-	m.State = api.MemberStateNone
+	m.Phase = api.MemberPhaseNone
 	m.RecentTerminations = nil // Since we're upgrading, we do not care about old terminations.
 	if err := a.actionCtx.UpdateMember(m); err != nil {
 		return false, maskAny(err)
 	}
diff --git a/pkg/deployment/reconcile/action_wait_for_member_up.go b/pkg/deployment/reconcile/action_wait_for_member_up.go
index 00454d22c..922fd4593 100644
--- a/pkg/deployment/reconcile/action_wait_for_member_up.go
+++ b/pkg/deployment/reconcile/action_wait_for_member_up.go
@@ -24,8 +24,6 @@ package reconcile
 
 import (
 	"context"
-	"sync"
-	"time"
 
 	driver "github.com/arangodb/go-driver"
 	"github.com/rs/zerolog"
@@ -44,10 +42,6 @@ func NewWaitForMemberUpAction(log zerolog.Logger, action api.Action, actionCtx A
 	}
 }
 
-const (
-	maxAgentResponseTime = time.Second * 10
-)
-
 // actionWaitForMemberUp implements an WaitForMemberUp.
 type actionWaitForMemberUp struct {
 	log zerolog.Logger
@@ -99,12 +93,6 @@ func (a *actionWaitForMemberUp) checkProgressSingle(ctx context.Context) (bool,
 	return true, nil
 }
 
-type agentStatus struct {
-	IsLeader       bool
-	LeaderEndpoint string
-	IsResponding   bool
-}
-
 // checkProgressAgent checks the progress of the action in the case
 // of an agent.
 func (a *actionWaitForMemberUp) checkProgressAgent(ctx context.Context) (bool, error) {
@@ -115,65 +103,12 @@ func (a *actionWaitForMemberUp) checkProgressAgent(ctx context.Context) (bool, e
 		return false, maskAny(err)
 	}
 
-	wg := sync.WaitGroup{}
-	invalidKey := []string{"does-not-exists-149e97e8-4b81-5664-a8a8-9ba93881d64c"}
-	statuses := make([]agentStatus, len(clients))
-	for i, c := range clients {
-		wg.Add(1)
-		go func(i int, c arangod.Agency) {
-			defer wg.Done()
-			var trash interface{}
-			lctx, cancel := context.WithTimeout(ctx, maxAgentResponseTime)
-			defer cancel()
-			if err := c.ReadKey(lctx, invalidKey, &trash); err == nil || arangod.IsKeyNotFound(err) {
-				// We got a valid read from the leader
-				statuses[i].IsLeader = true
-				statuses[i].LeaderEndpoint = c.Endpoint()
-				statuses[i].IsResponding = true
-			} else {
-				if location, ok := arangod.IsNotLeader(err); ok {
-					// Valid response from a follower
-					statuses[i].IsLeader = false
-					statuses[i].LeaderEndpoint = location
-					statuses[i].IsResponding = true
-				} else {
-					// Unexpected / invalid response
-					log.Debug().Err(err).Str("endpoint", c.Endpoint()).Msg("Agent is not responding")
-					statuses[i].IsResponding = false
-				}
-			}
-		}(i, c)
-	}
-	wg.Wait()
-
-	// Check the results
-	noLeaders := 0
-	for i, status := range statuses {
-		if !status.IsResponding {
-			log.Debug().Msg("Not all agents are responding")
-			return false, nil
-		}
-		if status.IsLeader {
-			noLeaders++
-		}
-		if i > 0 {
-			// Compare leader endpoint with previous
-			prev := statuses[i-1].LeaderEndpoint
-			if !arangod.IsSameEndpoint(prev, status.LeaderEndpoint) {
-				log.Debug().Msg("Not all agents report the same leader endpoint")
-				return false, nil
-			}
-		}
-	}
-	if noLeaders != 1 {
-		log.Debug().Int("leaders", noLeaders).Msg("Unexpected number of agency leaders")
+	if err := arangod.AreAgentsHealthy(ctx, clients); err != nil {
+		log.Debug().Err(err).Msg("Not all agents are ready")
 		return false, nil
 	}
 
-	log.Debug().
-		Int("leaders", noLeaders).
-		Int("followers", len(statuses)-noLeaders).
-		Msg("Agency is happy")
+	log.Debug().Msg("Agency is happy")
 
 	return true, nil
 }
diff --git a/pkg/deployment/reconcile/context.go b/pkg/deployment/reconcile/context.go
index 8bba162c5..3d95784c1 100644
--- a/pkg/deployment/reconcile/context.go
+++ b/pkg/deployment/reconcile/context.go
@@ -50,9 +50,11 @@ type Context interface {
 	// GetServerClient returns a cached client for a specific server.
 	GetServerClient(ctx context.Context, group api.ServerGroup, id string) (driver.Client, error)
 	// GetAgencyClients returns a client connection for every agency member.
-	GetAgencyClients(ctx context.Context) ([]arangod.Agency, error)
+	// If the given predicate is not nil, only agents are included where the given predicate returns true.
+	GetAgencyClients(ctx context.Context, predicate func(id string) bool) ([]arangod.Agency, error)
 	// CreateMember adds a new member to the given group.
-	CreateMember(group api.ServerGroup) error
+	// If ID is non-empty, it will be used, otherwise a new ID is created.
+	CreateMember(group api.ServerGroup, id string) error
 	// DeletePod deletes a pod with given name in the namespace
 	// of the deployment. If the pod does not exist, the error is ignored.
 	DeletePod(podName string) error
diff --git a/pkg/deployment/reconcile/plan_builder.go b/pkg/deployment/reconcile/plan_builder.go
index dcb270900..098b275bd 100644
--- a/pkg/deployment/reconcile/plan_builder.go
+++ b/pkg/deployment/reconcile/plan_builder.go
@@ -85,57 +85,78 @@ func createPlan(log zerolog.Logger, apiObject metav1.Object,
 	// Check for various scenario's
 	var plan api.Plan
 
+	// Check for members in failed state
+	status.Members.ForeachServerGroup(func(group api.ServerGroup, members *api.MemberStatusList) error {
+		for _, m := range *members {
+			if m.Phase == api.MemberPhaseFailed && len(plan) == 0 {
+				newID := ""
+				if group == api.ServerGroupAgents {
+					newID = m.ID // Agents cannot (yet) be replaced with new IDs
+				}
+				plan = append(plan,
+					api.NewAction(api.ActionTypeRemoveMember, group, m.ID),
+					api.NewAction(api.ActionTypeAddMember, group, newID),
+				)
+			}
+		}
+		return nil
+	})
+
 	// Check for scale up/down
-	switch spec.GetMode() {
-	case api.DeploymentModeSingle:
-		// Never scale down
-	case api.DeploymentModeResilientSingle:
-		// Only scale singles
-		plan = append(plan, createScalePlan(log, status.Members.Single, api.ServerGroupSingle, spec.Single.GetCount())...)
-	case api.DeploymentModeCluster:
-		// Scale dbservers, coordinators, syncmasters & syncworkers
-		plan = append(plan, createScalePlan(log, status.Members.DBServers, api.ServerGroupDBServers, spec.DBServers.GetCount())...)
-		plan = append(plan, createScalePlan(log, status.Members.Coordinators, api.ServerGroupCoordinators, spec.Coordinators.GetCount())...)
-		plan = append(plan, createScalePlan(log, status.Members.SyncMasters, api.ServerGroupSyncMasters, spec.SyncMasters.GetCount())...)
-		plan = append(plan, createScalePlan(log, status.Members.SyncWorkers, api.ServerGroupSyncWorkers, spec.SyncWorkers.GetCount())...)
+	if len(plan) == 0 {
+		switch spec.GetMode() {
+		case api.DeploymentModeSingle:
+			// Never scale down
+		case api.DeploymentModeResilientSingle:
+			// Only scale singles
+			plan = append(plan, createScalePlan(log, status.Members.Single, api.ServerGroupSingle, spec.Single.GetCount())...)
+		case api.DeploymentModeCluster:
+			// Scale dbservers, coordinators, syncmasters & syncworkers
+			plan = append(plan, createScalePlan(log, status.Members.DBServers, api.ServerGroupDBServers, spec.DBServers.GetCount())...)
+			plan = append(plan, createScalePlan(log, status.Members.Coordinators, api.ServerGroupCoordinators, spec.Coordinators.GetCount())...)
+			plan = append(plan, createScalePlan(log, status.Members.SyncMasters, api.ServerGroupSyncMasters, spec.SyncMasters.GetCount())...)
+			plan = append(plan, createScalePlan(log, status.Members.SyncWorkers, api.ServerGroupSyncWorkers, spec.SyncWorkers.GetCount())...)
+		}
 	}
 
 	// Check for the need to rotate one or more members
-	getPod := func(podName string) *v1.Pod {
-		for _, p := range pods {
-			if p.GetName() == podName {
-				return &p
+	if len(plan) == 0 {
+		getPod := func(podName string) *v1.Pod {
+			for _, p := range pods {
+				if p.GetName() == podName {
+					return &p
+				}
 			}
+			return nil
 		}
-		return nil
-	}
-	status.Members.ForeachServerGroup(func(group api.ServerGroup, members *api.MemberStatusList) error {
-		for _, m := range *members {
-			if len(plan) > 0 {
-				// Only 1 change at a time
-				continue
-			}
-			if m.State != api.MemberStateCreated {
-				// Only rotate when state is created
-				continue
-			}
-			if podName := m.PodName; podName != "" {
-				if p := getPod(podName); p != nil {
-					// Got pod, compare it with what it should be
-					decision := podNeedsUpgrading(*p, spec, status.Images)
-					if decision.UpgradeNeeded && decision.UpgradeAllowed {
-						plan = append(plan, createUpgradeMemberPlan(log, m, group, "Version upgrade")...)
-					} else {
-						rotNeeded, reason := podNeedsRotation(*p, apiObject, spec, group, status.Members.Agents, m.ID)
-						if rotNeeded {
-							plan = append(plan, createRotateMemberPlan(log, m, group, reason)...)
+		status.Members.ForeachServerGroup(func(group api.ServerGroup, members *api.MemberStatusList) error {
+			for _, m := range *members {
+				if len(plan) > 0 {
+					// Only 1 change at a time
+					continue
+				}
+				if m.Phase != api.MemberPhaseCreated {
+					// Only rotate when phase is created
+					continue
+				}
+				if podName := m.PodName; podName != "" {
+					if p := getPod(podName); p != nil {
+						// Got pod, compare it with what it should be
+						decision := podNeedsUpgrading(*p, spec, status.Images)
+						if decision.UpgradeNeeded && decision.UpgradeAllowed {
+							plan = append(plan, createUpgradeMemberPlan(log, m, group, "Version upgrade")...)
+						} else {
+							rotNeeded, reason := podNeedsRotation(*p, apiObject, spec, group, status.Members.Agents, m.ID)
+							if rotNeeded {
+								plan = append(plan, createRotateMemberPlan(log, m, group, reason)...)
+							}
 						}
 					}
 				}
 			}
-		}
-		return nil
-	})
+			return nil
+		})
+	}
 
 	// Return plan
 	return plan, true
diff --git a/pkg/deployment/resilience/context.go b/pkg/deployment/resilience/context.go
new file mode 100644
index 000000000..6dad0d4f2
--- /dev/null
+++ b/pkg/deployment/resilience/context.go
@@ -0,0 +1,48 @@
+//
+// DISCLAIMER
+//
+// Copyright 2018 ArangoDB GmbH, Cologne, Germany
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// Copyright holder is ArangoDB GmbH, Cologne, Germany
+//
+// Author Ewout Prangsma
+//
+
+package resilience
+
+import (
+	"context"
+
+	driver "github.com/arangodb/go-driver"
+	api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
+	"github.com/arangodb/kube-arangodb/pkg/util/arangod"
+)
+
+// Context provides methods to the resilience package.
+type Context interface {
+	// GetSpec returns the current specification of the deployment
+	GetSpec() api.DeploymentSpec
+	// GetStatus returns the current status of the deployment
+	GetStatus() api.DeploymentStatus
+	// UpdateStatus replaces the status of the deployment with the given status and
+	// updates the resources in k8s.
+	UpdateStatus(status api.DeploymentStatus, force ...bool) error
+	// GetAgencyClients returns a client connection for every agency member.
+	// If the given predicate is not nil, only agents are included where the given predicate returns true.
+	GetAgencyClients(ctx context.Context, predicate func(id string) bool) ([]arangod.Agency, error)
+	// GetDatabaseClient returns a cached client for the entire database (cluster coordinators or single server),
+	// creating one if needed.
+	GetDatabaseClient(ctx context.Context) (driver.Client, error)
+}
diff --git a/pkg/deployment/resilience/errors.go b/pkg/deployment/resilience/errors.go
new file mode 100644
index 000000000..3433d4cbb
--- /dev/null
+++ b/pkg/deployment/resilience/errors.go
@@ -0,0 +1,29 @@
+//
+// DISCLAIMER
+//
+// Copyright 2018 ArangoDB GmbH, Cologne, Germany
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// Copyright holder is ArangoDB GmbH, Cologne, Germany
+//
+// Author Ewout Prangsma
+//
+
+package resilience
+
+import "github.com/pkg/errors"
+
+var (
+	maskAny = errors.WithStack
+)
diff --git a/pkg/deployment/resilience/member_failure.go b/pkg/deployment/resilience/member_failure.go
new file mode 100644
index 000000000..ff161d500
--- /dev/null
+++ b/pkg/deployment/resilience/member_failure.go
@@ -0,0 +1,148 @@
+//
+// DISCLAIMER
+//
+// Copyright 2018 ArangoDB GmbH, Cologne, Germany
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// Copyright holder is ArangoDB GmbH, Cologne, Germany
+//
+// Author Ewout Prangsma
+//
+
+package resilience
+
+import (
+	"context"
+	"time"
+
+	api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
+	"github.com/arangodb/kube-arangodb/pkg/util/arangod"
+)
+
+const (
+	recentTerminationsSinceGracePeriod = time.Minute * 10
+	notReadySinceGracePeriod           = time.Minute * 5
+	recentTerminationThreshold         = 5
+)
+
+// CheckMemberFailure performs a check for members that should be in a failed state because:
+// - They are frequently restarted
+// - They cannot be scheduled for a long time (TODO)
+func (r *Resilience) CheckMemberFailure() error {
+	status := r.context.GetStatus()
+	updateStatusNeeded := false
+	if err := status.Members.ForeachServerGroup(func(group api.ServerGroup, list *api.MemberStatusList) error {
+		for _, m := range *list {
+			log := r.log.With().
+				Str("id", m.ID).
+				Str("role", group.AsRole()).
+				Logger()
+			// Check current phase
+			if m.Phase != api.MemberPhaseCreated {
+				// Phase is not Created, so we're not looking further.
+				continue
+			}
+			// Check if pod is ready
+			if m.Conditions.IsTrue(api.ConditionTypeReady) {
+				// Pod is now ready, so we're not looking further
+				continue
+			}
+
+			// Check not ready for a long time
+			if !m.Phase.IsFailed() {
+				if m.IsNotReadySince(time.Now().Add(-notReadySinceGracePeriod)) {
+					// Member has not been ready for a long time.
+					failureAcceptable, reason, err := r.isMemberFailureAcceptable(status, group, m)
+					if err != nil {
+						log.Warn().Err(err).Msg("Failed to check if member failure is acceptable")
+					} else if failureAcceptable {
+						log.Info().Msg("Member has not been ready for a long time, marking it as failed")
+						m.Phase = api.MemberPhaseFailed
+						list.Update(m)
+						updateStatusNeeded = true
+					} else {
+						log.Warn().Msgf("Member has not been ready for a long time, but it is not safe to mark it as failed because: %s", reason)
+					}
+				}
+			}
+
+			// Check recent terminations
+			if !m.Phase.IsFailed() {
+				count := m.RecentTerminationsSince(time.Now().Add(-recentTerminationsSinceGracePeriod))
+				if count >= recentTerminationThreshold {
+					// Member has terminated too often in recent history.
+					failureAcceptable, reason, err := r.isMemberFailureAcceptable(status, group, m)
+					if err != nil {
+						log.Warn().Err(err).Msg("Failed to check if member failure is acceptable")
+					} else if failureAcceptable {
+						log.Info().Msg("Member has terminated too often in recent history, marking it as failed")
+						m.Phase = api.MemberPhaseFailed
+						list.Update(m)
+						updateStatusNeeded = true
+					} else {
+						log.Warn().Msgf("Member has terminated too often in recent history, but it is not safe to mark it as failed because: %s", reason)
+					}
+				}
+			}
+		}
+
+		return nil
+	}); err != nil {
+		return maskAny(err)
+	}
+	if updateStatusNeeded {
+		if err := r.context.UpdateStatus(status); err != nil {
+			return maskAny(err)
+		}
+	}
+
+	return nil
+}
+
+// isMemberFailureAcceptable checks if it is currently acceptable to switch the phase of the given member
+// to failed, which means that it will be replaced.
+// Return: failureAcceptable, notAcceptableReason, error
+func (r *Resilience) isMemberFailureAcceptable(status api.DeploymentStatus, group api.ServerGroup, m api.MemberStatus) (bool, string, error) {
+	ctx := context.Background()
+	switch group {
+	case api.ServerGroupAgents:
+		// All good when the remaining agents are healthy
+		clients, err := r.context.GetAgencyClients(ctx, func(id string) bool { return id != m.ID })
+		if err != nil {
+			return false, "", maskAny(err)
+		}
+		if err := arangod.AreAgentsHealthy(ctx, clients); err != nil {
+			return false, err.Error(), nil
+		}
+		return true, "", nil
+	case api.ServerGroupDBServers:
+		client, err := r.context.GetDatabaseClient(ctx)
+		if err != nil {
+			return false, "", maskAny(err)
+		}
+		if err := arangod.IsDBServerEmpty(ctx, m.ID, client); err != nil {
+			return false, err.Error(), nil
+		}
+		return true, "", nil
+	case api.ServerGroupCoordinators:
+		// Coordinators can be replaced at will
+		return true, "", nil
+	case api.ServerGroupSyncMasters, api.ServerGroupSyncWorkers:
+		// Sync masters & workers can be replaced at will
+		return true, "", nil
+	default:
+		// TODO
+		return false, "TODO", nil
+	}
+}
diff --git a/pkg/deployment/resilience/resilience.go b/pkg/deployment/resilience/resilience.go
new file mode 100644
index 000000000..70485e089
--- /dev/null
+++ b/pkg/deployment/resilience/resilience.go
@@ -0,0 +1,40 @@
+//
+// DISCLAIMER
+//
+// Copyright 2018 ArangoDB GmbH, Cologne, Germany
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// Copyright holder is ArangoDB GmbH, Cologne, Germany
+//
+// Author Ewout Prangsma
+//
+
+package resilience
+
+import "github.com/rs/zerolog"
+
+// Resilience is the service that inspects the overall state of the deployment
+// to improve resilience.
+type Resilience struct {
+	log     zerolog.Logger
+	context Context
+}
+
+// NewResilience creates a new Resilience service with the given context.
+func NewResilience(log zerolog.Logger, context Context) *Resilience {
+	return &Resilience{
+		log:     log,
+		context: context,
+	}
+}
diff --git a/pkg/deployment/resources/pod_creator.go b/pkg/deployment/resources/pod_creator.go
index 373f98757..ce55e8bbd 100644
--- a/pkg/deployment/resources/pod_creator.go
+++ b/pkg/deployment/resources/pod_creator.go
@@ -317,7 +317,7 @@ func (r *Resources) createPodForMember(spec api.DeploymentSpec, group api.Server
 	roleAbbr := group.AsRoleAbbreviated()
 	podSuffix := createPodSuffix(spec)
 	m.PodName = k8sutil.CreatePodName(apiObject.GetName(), roleAbbr, m.ID, podSuffix)
-	newState := api.MemberStateCreated
+	newPhase := api.MemberPhaseCreated
 	// Create pod
 	if group.IsArangod() {
 		// Find image ID
@@ -329,7 +329,7 @@ func (r *Resources) createPodForMember(spec api.DeploymentSpec, group api.Server
 		// Prepare arguments
 		autoUpgrade := m.Conditions.IsTrue(api.ConditionTypeAutoUpgrade)
 		if autoUpgrade {
-			newState = api.MemberStateUpgrading
+			newPhase = api.MemberPhaseUpgrading
 		}
 		args := createArangodArgs(apiObject, spec, group, status.Members.Agents, m.ID, autoUpgrade)
 		env := make(map[string]k8sutil.EnvValue)
@@ -393,8 +393,8 @@ func (r *Resources) createPodForMember(spec api.DeploymentSpec, group api.Server
 		}
 		log.Debug().Str("pod-name", m.PodName).Msg("Created pod")
 	}
-	// Record new member state
-	m.State = newState
+	// Record new member phase
+	m.Phase = newPhase
 	m.Conditions.Remove(api.ConditionTypeReady)
 	m.Conditions.Remove(api.ConditionTypeTerminated)
 	m.Conditions.Remove(api.ConditionTypeAutoUpgrade)
@@ -405,7 +405,7 @@ func (r *Resources) createPodForMember(spec api.DeploymentSpec, group api.Server
 		return maskAny(err)
 	}
 	// Create event
-	r.context.CreateEvent(k8sutil.NewMemberAddEvent(m.PodName, role, apiObject))
+	r.context.CreateEvent(k8sutil.NewPodCreatedEvent(m.PodName, role, apiObject))
 
 	return nil
 }
@@ -416,7 +416,7 @@ func (r *Resources) EnsurePods() error {
 	status := r.context.GetStatus()
 	if err := iterator.ForeachServerGroup(func(group api.ServerGroup, groupSpec api.ServerGroupSpec, status *api.MemberStatusList) error {
 		for _, m := range *status {
-			if m.State != api.MemberStateNone {
+			if m.Phase != api.MemberPhaseNone {
 				continue
 			}
 			spec := r.context.GetSpec()
diff --git a/pkg/deployment/resources/pod_inspector.go b/pkg/deployment/resources/pod_inspector.go
index b419b3143..138ff5946 100644
--- a/pkg/deployment/resources/pod_inspector.go
+++ b/pkg/deployment/resources/pod_inspector.go
@@ -79,21 +79,27 @@ func (r *Resources) InspectPods() error {
 		updateMemberStatusNeeded := false
 		if k8sutil.IsPodSucceeded(&p) {
 			// Pod has terminated with exit code 0.
+			wasTerminated := memberStatus.Conditions.IsTrue(api.ConditionTypeTerminated)
 			if memberStatus.Conditions.Update(api.ConditionTypeTerminated, true, "Pod Succeeded", "") {
 				log.Debug().Str("pod-name", p.GetName()).Msg("Updating member condition Terminated to true: Pod Succeeded")
 				updateMemberStatusNeeded = true
-				// Record termination time
-				now := metav1.Now()
-				memberStatus.RecentTerminations = append(memberStatus.RecentTerminations, now)
+				if !wasTerminated {
+					// Record termination time
+					now := metav1.Now()
+					memberStatus.RecentTerminations = append(memberStatus.RecentTerminations, now)
+				}
 			}
 		} else if k8sutil.IsPodFailed(&p) {
 			// Pod has terminated with at least 1 container with a non-zero exit code.
+			wasTerminated := memberStatus.Conditions.IsTrue(api.ConditionTypeTerminated)
 			if memberStatus.Conditions.Update(api.ConditionTypeTerminated, true, "Pod Failed", "") {
 				log.Debug().Str("pod-name", p.GetName()).Msg("Updating member condition Terminated to true: Pod Failed")
 				updateMemberStatusNeeded = true
-				// Record termination time
-				now := metav1.Now()
-				memberStatus.RecentTerminations = append(memberStatus.RecentTerminations, now)
+				if !wasTerminated {
+					// Record termination time
+					now := metav1.Now()
+					memberStatus.RecentTerminations = append(memberStatus.RecentTerminations, now)
+				}
 			}
 		}
 		if k8sutil.IsPodReady(&p) {
@@ -137,23 +143,44 @@
 		for _, m := range *members {
 			if podName := m.PodName; podName != "" {
 				if !podExists(podName) {
-					switch m.State {
-					case api.MemberStateNone:
+					switch m.Phase {
+					case api.MemberPhaseNone:
 						// Do nothing
-					case api.MemberStateShuttingDown, api.MemberStateRotating, api.MemberStateUpgrading:
+					case api.MemberPhaseShuttingDown, api.MemberPhaseRotating, api.MemberPhaseUpgrading, api.MemberPhaseFailed:
 						// Shutdown was intended, so not need to do anything here.
 						// Just mark terminated
+						wasTerminated := m.Conditions.IsTrue(api.ConditionTypeTerminated)
 						if m.Conditions.Update(api.ConditionTypeTerminated, true, "Pod Terminated", "") {
+							if !wasTerminated {
+								// Record termination time
+								now := metav1.Now()
+								m.RecentTerminations = append(m.RecentTerminations, now)
+							}
+							// Save it
 							if err := status.Members.UpdateMemberStatus(m, group); err != nil {
 								return maskAny(err)
 							}
 						}
 					default:
 						log.Debug().Str("pod-name", podName).Msg("Pod is gone")
-						m.State = api.MemberStateNone // This is trigger a recreate of the pod.
+						m.Phase = api.MemberPhaseNone // This will trigger a recreate of the pod.
 						// Create event
 						events = append(events, k8sutil.NewPodGoneEvent(podName, group.AsRole(), apiObject))
+						updateMemberNeeded := false
 						if m.Conditions.Update(api.ConditionTypeReady, false, "Pod Does Not Exist", "") {
+							updateMemberNeeded = true
+						}
+						wasTerminated := m.Conditions.IsTrue(api.ConditionTypeTerminated)
+						if m.Conditions.Update(api.ConditionTypeTerminated, true, "Pod Does Not Exist", "") {
+							if !wasTerminated {
+								// Record termination time
+								now := metav1.Now()
+								m.RecentTerminations = append(m.RecentTerminations, now)
+							}
+							updateMemberNeeded = true
+						}
+						if updateMemberNeeded {
+							// Save it
 							if err := status.Members.UpdateMemberStatus(m, group); err != nil {
 								return maskAny(err)
 							}
 						}
diff --git a/pkg/util/arangod/agency_health.go b/pkg/util/arangod/agency_health.go
new file mode 100644
index 000000000..f76dcf94c
--- /dev/null
+++ b/pkg/util/arangod/agency_health.go
@@ -0,0 +1,98 @@
+//
+// DISCLAIMER
+//
+// Copyright 2018 ArangoDB GmbH, Cologne, Germany
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// Copyright holder is ArangoDB GmbH, Cologne, Germany
+//
+// Author Ewout Prangsma
+//
+
+package arangod
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"time"
+)
+
+const (
+	maxAgentResponseTime = time.Second * 10
+)
+
+// agentStatus is a helper structure used in AreAgentsHealthy.
+type agentStatus struct {
+	IsLeader       bool
+	LeaderEndpoint string
+	IsResponding   bool
+}
+
+// AreAgentsHealthy performs a health check on all given agents.
+// Of the given agents, 1 must respond as leader and all others must redirect to the leader.
+// The function returns nil when all agents are healthy or an error when something is wrong.
+func AreAgentsHealthy(ctx context.Context, clients []Agency) error {
+	wg := sync.WaitGroup{}
+	invalidKey := []string{"does-not-exist-149e97e8-4b81-5664-a8a8-9ba93881d64c"}
+	statuses := make([]agentStatus, len(clients))
+	for i, c := range clients {
+		wg.Add(1)
+		go func(i int, c Agency) {
+			defer wg.Done()
+			var trash interface{}
+			lctx, cancel := context.WithTimeout(ctx, maxAgentResponseTime)
+			defer cancel()
+			if err := c.ReadKey(lctx, invalidKey, &trash); err == nil || IsKeyNotFound(err) {
+				// We got a valid read from the leader
+				statuses[i].IsLeader = true
+				statuses[i].LeaderEndpoint = c.Endpoint()
+				statuses[i].IsResponding = true
+			} else {
+				if location, ok := IsNotLeader(err); ok {
+					// Valid response from a follower
+					statuses[i].IsLeader = false
+					statuses[i].LeaderEndpoint = location
+					statuses[i].IsResponding = true
+				} else {
+					// Unexpected / invalid response
+					statuses[i].IsResponding = false
+				}
+			}
+		}(i, c)
+	}
+	wg.Wait()
+
+	// Check the results
+	noLeaders := 0
+	for i, status := range statuses {
+		if !status.IsResponding {
+			return maskAny(fmt.Errorf("Agent %s is not responding", clients[i].Endpoint()))
+		}
+		if status.IsLeader {
+			noLeaders++
+		}
+		if i > 0 {
+			// Compare leader endpoint with previous
+			prev := statuses[i-1].LeaderEndpoint
+			if !IsSameEndpoint(prev, status.LeaderEndpoint) {
+				return maskAny(fmt.Errorf("Not all agents report the same leader endpoint"))
+			}
+		}
+	}
+	if noLeaders != 1 {
+		return maskAny(fmt.Errorf("Unexpected number of agency leaders: %d", noLeaders))
+	}
+	return nil
+}
diff --git a/pkg/util/arangod/cluster.go b/pkg/util/arangod/cluster.go
index a75601499..4874e3cdc 100644
--- a/pkg/util/arangod/cluster.go
+++ b/pkg/util/arangod/cluster.go
@@ -92,3 +92,22 @@ func SetNumberOfServers(ctx context.Context, conn driver.Connection, noCoordinat
 	}
 	return nil
 }
+
+// RemoveServerFromCluster tries to remove a coordinator or DBServer from the cluster.
+func RemoveServerFromCluster(ctx context.Context, conn driver.Connection, id driver.ServerID) error {
+	req, err := conn.NewRequest("POST", "_admin/cluster/removeServer")
+	if err != nil {
+		return maskAny(err)
+	}
+	if _, err := req.SetBody(id); err != nil {
+		return maskAny(err)
+	}
+	resp, err := conn.Do(ctx, req)
+	if err != nil {
+		return maskAny(err)
+	}
+	if err := resp.CheckStatus(200); err != nil {
+		return maskAny(err)
+	}
+	return nil
+}
diff --git a/pkg/util/arangod/dbserver.go b/pkg/util/arangod/dbserver.go
new file mode 100644
index 000000000..dc5b264b6
--- /dev/null
+++ b/pkg/util/arangod/dbserver.go
@@ -0,0 +1,67 @@
+//
+// DISCLAIMER
+//
+// Copyright 2018 ArangoDB GmbH, Cologne, Germany
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// Copyright holder is ArangoDB GmbH, Cologne, Germany
+//
+// Author Ewout Prangsma
+//
+
+package arangod
+
+import (
+	"context"
+	"fmt"
+
+	driver "github.com/arangodb/go-driver"
+	"github.com/pkg/errors"
+)
+
+// IsDBServerEmpty checks if the dbserver identified by the given ID no longer has any
+// data on it.
+// The given client must have all coordinators as endpoints.
+// The function returns an error when the check could not be completed or the dbserver
+// is not empty, or nil when the dbserver is found to be empty.
+func IsDBServerEmpty(ctx context.Context, id string, client driver.Client) error {
+	c, err := client.Cluster(ctx)
+	if err != nil {
+		return maskAny(errors.Wrapf(err, "Cannot obtain Cluster"))
+	}
+	dbs, err := client.Databases(ctx)
+	if err != nil {
+		return maskAny(errors.Wrapf(err, "Cannot fetch databases"))
+	}
+	for _, db := range dbs {
+		inventory, err := c.DatabaseInventory(ctx, db)
+		if err != nil {
+			return maskAny(errors.Wrapf(err, "Cannot fetch inventory for %s", db.Name()))
+		}
+		// Go over all collections
+		for _, col := range inventory.Collections {
+			// Go over all shards of the collection
+			for shardID, serverIDs := range col.Parameters.Shards {
+				for _, serverID := range serverIDs {
+					if string(serverID) == id {
+						// DBServer still used in this shard
+						return maskAny(fmt.Errorf("DBServer still used in shard %s of %s.%s", shardID, col.Parameters.Name, db.Name()))
+					}
+				}
+			}
+		}
+	}
+	// DBServer is not used in any shard of any database
+	return nil
+}
diff --git a/pkg/util/arangod/driver_init.go b/pkg/util/arangod/driver_init.go
new file mode 100644
index 000000000..cd634537d
--- /dev/null
+++ b/pkg/util/arangod/driver_init.go
@@ -0,0 +1,34 @@
+//
+// DISCLAIMER
+//
+// Copyright 2018 ArangoDB GmbH, Cologne, Germany
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// Copyright holder is ArangoDB GmbH, Cologne, Germany
+//
+// Author Ewout Prangsma
+//
+
+package arangod
+
+import (
+	driver "github.com/arangodb/go-driver"
+	"github.com/pkg/errors"
+)
+
+func init() {
+	// Initialize error wrapping in the go-driver
+	driver.WithStack = errors.WithStack
+	driver.Cause = errors.Cause
+}
diff --git a/pkg/util/k8sutil/events.go b/pkg/util/k8sutil/events.go
index ab86beafb..c0210add8 100644
--- a/pkg/util/k8sutil/events.go
+++ b/pkg/util/k8sutil/events.go
@@ -59,6 +59,15 @@ func NewMemberRemoveEvent(memberName, role string, apiObject APIObject) *v1.Even
 	return event
 }
 
+// NewPodCreatedEvent creates an event indicating that a pod has been created
+func NewPodCreatedEvent(podName, role string, apiObject APIObject) *v1.Event {
+	event := newDeploymentEvent(apiObject)
+	event.Type = v1.EventTypeNormal
+	event.Reason = fmt.Sprintf("Pod Of %s Created", strings.Title(role))
+	event.Message = fmt.Sprintf("Pod %s of member %s is created", podName, role)
+	return event
+}
+
 // NewPodGoneEvent creates an event indicating that a pod is missing
 func NewPodGoneEvent(podName, role string, apiObject APIObject) *v1.Event {
 	event := newDeploymentEvent(apiObject)
diff --git a/tests/member_resilience_test.go b/tests/member_resilience_test.go
new file mode 100644
index 000000000..34070913d
--- /dev/null
+++ b/tests/member_resilience_test.go
@@ -0,0 +1,308 @@
+package tests
+
+import (
+	"context"
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/dchest/uniuri"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+	driver "github.com/arangodb/go-driver"
+	api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
+	"github.com/arangodb/kube-arangodb/pkg/client"
+	"github.com/arangodb/kube-arangodb/pkg/util/retry"
+)
+
+// TestMemberResilienceAgents creates a cluster and removes a
+// specific agent pod 5 times. Each time it waits for a new pod to arrive.
+// After 5 times, the member should be replaced by another member with the same ID.
+
+// TestMemberResilienceAgents creates a cluster and removes a
+// specific agent pod 5 times. Each time it waits for a new pod to arrive.
+// After 5 times, the member should be replaced by another member with the same ID.
+func TestMemberResilienceAgents(t *testing.T) {
+	longOrSkip(t)
+	c := client.MustNewInCluster()
+	kubecli := mustNewKubeClient(t)
+	ns := getNamespace(t)
+
+	// Prepare deployment config
+	depl := newDeployment("test-member-res-agnt-" + uniuri.NewLen(4))
+	depl.Spec.Mode = api.NewMode(api.DeploymentModeCluster)
+	depl.Spec.SetDefaults(depl.GetName()) // this must be last
+
+	// Create deployment
+	apiObject, err := c.DatabaseV1alpha().ArangoDeployments(ns).Create(depl)
+	if err != nil {
+		t.Fatalf("Create deployment failed: %v", err)
+	}
+
+	// Wait for deployment to be ready
+	if _, err = waitUntilDeployment(c, depl.GetName(), ns, deploymentIsReady()); err != nil {
+		t.Fatalf("Deployment not running in time: %v", err)
+	}
+
+	// Create a database client
+	ctx := context.Background()
+	client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t)
+
+	// Wait for cluster to be completely ready
+	if err := waitUntilClusterHealth(client, func(h driver.ClusterHealth) error {
+		return clusterHealthEqualsSpec(h, apiObject.Spec)
+	}); err != nil {
+		t.Fatalf("Cluster not running in expected health in time: %v", err)
+	}
+
+	// Fetch latest status so we know all member details
+	apiObject, err = c.DatabaseV1alpha().ArangoDeployments(ns).Get(depl.GetName(), metav1.GetOptions{})
+	if err != nil {
+		t.Fatalf("Failed to get deployment: %v", err)
+	}
+
+	// Pick an agent to be deleted 5 times
+	targetAgent := apiObject.Status.Members.Agents[0]
+	for i := 0; i < 5; i++ {
+		// Get current pod so we can compare UID later
+		originalPod, err := kubecli.CoreV1().Pods(ns).Get(targetAgent.PodName, metav1.GetOptions{})
+		if err != nil {
+			t.Fatalf("Failed to get pod %s: %v", targetAgent.PodName, err)
+		}
+		if err := kubecli.CoreV1().Pods(ns).Delete(targetAgent.PodName, &metav1.DeleteOptions{}); err != nil {
+			t.Fatalf("Failed to delete pod %s: %v", targetAgent.PodName, err)
+		}
+		if i < 4 {
+			// Wait for pod to return with different UID
+			op := func() error {
+				pod, err := kubecli.CoreV1().Pods(ns).Get(targetAgent.PodName, metav1.GetOptions{})
+				if err != nil {
+					return maskAny(err)
+				}
+				if pod.GetUID() == originalPod.GetUID() {
+					return fmt.Errorf("Still original pod")
+				}
+				return nil
+			}
+			if err := retry.Retry(op, time.Minute); err != nil {
+				t.Fatalf("Pod did not restart: %v", err)
+			}
+		} else {
+			// Wait for member to be replaced
+			op := func() error {
+				updatedObject, err := c.DatabaseV1alpha().ArangoDeployments(ns).Get(depl.GetName(), metav1.GetOptions{})
+				if err != nil {
+					return maskAny(err)
+				}
+				m, _, found := updatedObject.Status.Members.ElementByID(targetAgent.ID)
+				if !found {
+					return maskAny(fmt.Errorf("Member %s not found", targetAgent.ID))
+				}
+				if m.CreatedAt.Equal(&targetAgent.CreatedAt) {
+					return maskAny(fmt.Errorf("Member %s still not replaced", targetAgent.ID))
+				}
+				return nil
+			}
+			if err := retry.Retry(op, time.Minute); err != nil {
+				t.Fatalf("Member was not replaced in time: %v", err)
+			}
+		}
+		// Wait for cluster to be completely ready
+		if err := waitUntilClusterHealth(client, func(h driver.ClusterHealth) error {
+			return clusterHealthEqualsSpec(h, apiObject.Spec)
+		}); err != nil {
+			t.Fatalf("Cluster not running in expected health in time: %v", err)
+		}
+	}
+
+	// Cleanup
+	removeDeployment(c, depl.GetName(), ns)
+}
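+
+// Note how replacement is detected differently per role: an agent keeps its
+// ID when it is replaced, so the test above compares CreatedAt timestamps of
+// the member with the same ID, while coordinators and dbservers are replaced
+// under a fresh ID, so the tests below wait until the old ID disappears from
+// the member list.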
+
+// TestMemberResilienceCoordinators creates a cluster and removes a
+// specific coordinator pod 5 times. Each time it waits for a new pod to arrive.
+// After 5 times, the member should be replaced by another member.
+func TestMemberResilienceCoordinators(t *testing.T) {
+	longOrSkip(t)
+	c := client.MustNewInCluster()
+	kubecli := mustNewKubeClient(t)
+	ns := getNamespace(t)
+
+	// Prepare deployment config
+	depl := newDeployment("test-member-res-crdn-" + uniuri.NewLen(4))
+	depl.Spec.Mode = api.NewMode(api.DeploymentModeCluster)
+	depl.Spec.SetDefaults(depl.GetName()) // this must be last
+
+	// Create deployment
+	apiObject, err := c.DatabaseV1alpha().ArangoDeployments(ns).Create(depl)
+	if err != nil {
+		t.Fatalf("Create deployment failed: %v", err)
+	}
+
+	// Wait for deployment to be ready
+	if _, err = waitUntilDeployment(c, depl.GetName(), ns, deploymentIsReady()); err != nil {
+		t.Fatalf("Deployment not running in time: %v", err)
+	}
+
+	// Create a database client
+	ctx := context.Background()
+	client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t)
+
+	// Wait for cluster to be completely ready
+	if err := waitUntilClusterHealth(client, func(h driver.ClusterHealth) error {
+		return clusterHealthEqualsSpec(h, apiObject.Spec)
+	}); err != nil {
+		t.Fatalf("Cluster not running in expected health in time: %v", err)
+	}
+
+	// Fetch latest status so we know all member details
+	apiObject, err = c.DatabaseV1alpha().ArangoDeployments(ns).Get(depl.GetName(), metav1.GetOptions{})
+	if err != nil {
+		t.Fatalf("Failed to get deployment: %v", err)
+	}
+
+	// Pick a coordinator to be deleted 5 times
+	targetCoordinator := apiObject.Status.Members.Coordinators[0]
+	for i := 0; i < 5; i++ {
+		// Get current pod so we can compare UID later
+		originalPod, err := kubecli.CoreV1().Pods(ns).Get(targetCoordinator.PodName, metav1.GetOptions{})
+		if err != nil {
+			t.Fatalf("Failed to get pod %s: %v", targetCoordinator.PodName, err)
+		}
+		if err := kubecli.CoreV1().Pods(ns).Delete(targetCoordinator.PodName, &metav1.DeleteOptions{}); err != nil {
+			t.Fatalf("Failed to delete pod %s: %v", targetCoordinator.PodName, err)
+		}
+		if i < 4 {
+			// Wait for pod to return with different UID
+			op := func() error {
+				pod, err := kubecli.CoreV1().Pods(ns).Get(targetCoordinator.PodName, metav1.GetOptions{})
+				if err != nil {
+					return maskAny(err)
+				}
+				if pod.GetUID() == originalPod.GetUID() {
+					return fmt.Errorf("Still original pod")
+				}
+				return nil
+			}
+			if err := retry.Retry(op, time.Minute); err != nil {
+				t.Fatalf("Pod did not restart: %v", err)
+			}
+		} else {
+			// Wait for member to be replaced
+			op := func() error {
+				updatedObject, err := c.DatabaseV1alpha().ArangoDeployments(ns).Get(depl.GetName(), metav1.GetOptions{})
+				if err != nil {
+					return maskAny(err)
+				}
+				if updatedObject.Status.Members.ContainsID(targetCoordinator.ID) {
+					return maskAny(fmt.Errorf("Member %s still not replaced", targetCoordinator.ID))
+				}
+				return nil
+			}
+			if err := retry.Retry(op, time.Minute); err != nil {
+				t.Fatalf("Member was not replaced in time: %v", err)
+			}
+		}
+		// Wait for cluster to be completely ready
+		if err := waitUntilClusterHealth(client, func(h driver.ClusterHealth) error {
+			return clusterHealthEqualsSpec(h, apiObject.Spec)
+		}); err != nil {
+			t.Fatalf("Cluster not running in expected health in time: %v", err)
+		}
+	}
+
+	// Cleanup
+	removeDeployment(c, depl.GetName(), ns)
+}
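+
+// The dbserver variant below is structurally identical to the coordinator
+// test; only the member group changes (the "prmr" infix in the deployment
+// name is ArangoDB's short role name for dbservers, a.k.a. primaries).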
+
+// TestMemberResilienceDBServers creates a cluster and removes a
+// specific dbserver pod 5 times. Each time it waits for a new pod to arrive.
+// After 5 times, the member should be replaced by another member.
+func TestMemberResilienceDBServers(t *testing.T) {
+	longOrSkip(t)
+	c := client.MustNewInCluster()
+	kubecli := mustNewKubeClient(t)
+	ns := getNamespace(t)
+
+	// Prepare deployment config
+	depl := newDeployment("test-member-res-prmr-" + uniuri.NewLen(4))
+	depl.Spec.Mode = api.NewMode(api.DeploymentModeCluster)
+	depl.Spec.SetDefaults(depl.GetName()) // this must be last
+
+	// Create deployment
+	apiObject, err := c.DatabaseV1alpha().ArangoDeployments(ns).Create(depl)
+	if err != nil {
+		t.Fatalf("Create deployment failed: %v", err)
+	}
+
+	// Wait for deployment to be ready
+	if _, err = waitUntilDeployment(c, depl.GetName(), ns, deploymentIsReady()); err != nil {
+		t.Fatalf("Deployment not running in time: %v", err)
+	}
+
+	// Create a database client
+	ctx := context.Background()
+	client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t)
+
+	// Wait for cluster to be completely ready
+	if err := waitUntilClusterHealth(client, func(h driver.ClusterHealth) error {
+		return clusterHealthEqualsSpec(h, apiObject.Spec)
+	}); err != nil {
+		t.Fatalf("Cluster not running in expected health in time: %v", err)
+	}
+
+	// Fetch latest status so we know all member details
+	apiObject, err = c.DatabaseV1alpha().ArangoDeployments(ns).Get(depl.GetName(), metav1.GetOptions{})
+	if err != nil {
+		t.Fatalf("Failed to get deployment: %v", err)
+	}
+
+	// Pick a dbserver to be deleted 5 times
+	targetServer := apiObject.Status.Members.DBServers[0]
+	for i := 0; i < 5; i++ {
+		// Get current pod so we can compare UID later
+		originalPod, err := kubecli.CoreV1().Pods(ns).Get(targetServer.PodName, metav1.GetOptions{})
+		if err != nil {
+			t.Fatalf("Failed to get pod %s: %v", targetServer.PodName, err)
+		}
+		if err := kubecli.CoreV1().Pods(ns).Delete(targetServer.PodName, &metav1.DeleteOptions{}); err != nil {
+			t.Fatalf("Failed to delete pod %s: %v", targetServer.PodName, err)
+		}
+		if i < 4 {
+			// Wait for pod to return with different UID
+			op := func() error {
+				pod, err := kubecli.CoreV1().Pods(ns).Get(targetServer.PodName, metav1.GetOptions{})
+				if err != nil {
+					return maskAny(err)
+				}
+				if pod.GetUID() == originalPod.GetUID() {
+					return fmt.Errorf("Still original pod")
+				}
+				return nil
+			}
+			if err := retry.Retry(op, time.Minute); err != nil {
+				t.Fatalf("Pod did not restart: %v", err)
+			}
+		} else {
+			// Wait for member to be replaced
+			op := func() error {
+				updatedObject, err := c.DatabaseV1alpha().ArangoDeployments(ns).Get(depl.GetName(), metav1.GetOptions{})
+				if err != nil {
+					return maskAny(err)
+				}
+				if updatedObject.Status.Members.ContainsID(targetServer.ID) {
+					return maskAny(fmt.Errorf("Member %s still not replaced", targetServer.ID))
+				}
+				return nil
+			}
+			if err := retry.Retry(op, time.Minute); err != nil {
+				t.Fatalf("Member was not replaced in time: %v", err)
+			}
+		}
+		// Wait for cluster to be completely ready
+		if err := waitUntilClusterHealth(client, func(h driver.ClusterHealth) error {
+			return clusterHealthEqualsSpec(h, apiObject.Spec)
+		}); err != nil {
+			t.Fatalf("Cluster not running in expected health in time: %v", err)
+		}
+	}
+
+	// Cleanup
+	removeDeployment(c, depl.GetName(), ns)
+}