From 993a345ae29a08e0d4feab6f3d80cf2b5aa2f6d5 Mon Sep 17 00:00:00 2001 From: MegaByte875 Date: Mon, 1 Apr 2024 11:40:47 +0800 Subject: [PATCH] support metad and graphd local pv failover (#488) --- apis/apps/v1alpha1/nebulacluster_graphd.go | 9 +- apis/apps/v1alpha1/nebulacluster_metad.go | 9 +- apis/apps/v1alpha1/nebulacluster_types.go | 31 +-- apis/apps/v1alpha1/zz_generated.deepcopy.go | 74 ++++--- .../nebula-operator/crds/nebulaclusters.yaml | 64 +++++- .../apps.nebula-graph.io_nebulaclusters.yaml | 64 +++++- pkg/controller/component/failover_helper.go | 88 ++++++++ pkg/controller/component/graphd_cluster.go | 160 +++++++++++++- pkg/controller/component/graphd_failover.go | 201 ++++++++++++++++++ pkg/controller/component/interface.go | 4 +- pkg/controller/component/metad_cluster.go | 157 +++++++++++++- pkg/controller/component/metad_failover.go | 201 ++++++++++++++++++ pkg/controller/component/storaged_cluster.go | 82 +++---- pkg/controller/component/storaged_failover.go | 200 ++++++++--------- pkg/controller/component/storaged_scaler.go | 4 +- .../nebulabackup/nebula_backup_manager.go | 14 +- .../nebula_cluster_controller.go | 4 + .../nebularestore/nebula_restore_manager.go | 10 +- .../remote.go => credentials/credentials.go} | 2 +- 19 files changed, 1143 insertions(+), 235 deletions(-) create mode 100644 pkg/controller/component/failover_helper.go create mode 100644 pkg/controller/component/graphd_failover.go create mode 100644 pkg/controller/component/metad_failover.go rename pkg/{remote/remote.go => credentials/credentials.go} (99%) diff --git a/apis/apps/v1alpha1/nebulacluster_graphd.go b/apis/apps/v1alpha1/nebulacluster_graphd.go index 01c2aabd..52b9895b 100644 --- a/apis/apps/v1alpha1/nebulacluster_graphd.go +++ b/apis/apps/v1alpha1/nebulacluster_graphd.go @@ -352,6 +352,13 @@ func (c *graphdComponent) IsSuspended() bool { } func (c *graphdComponent) IsAutoFailovering() bool { - //TODO implement me + if len(c.nc.Status.Graphd.FailureHosts) == 0 { + return false + } + for _, failureHost := range c.nc.Status.Graphd.FailureHosts { + if !failureHost.ConfirmationTime.IsZero() { + return true + } + } return false } diff --git a/apis/apps/v1alpha1/nebulacluster_metad.go b/apis/apps/v1alpha1/nebulacluster_metad.go index b1e3c32a..a9798464 100644 --- a/apis/apps/v1alpha1/nebulacluster_metad.go +++ b/apis/apps/v1alpha1/nebulacluster_metad.go @@ -430,6 +430,13 @@ func (c *metadComponent) IsSuspended() bool { } func (c *metadComponent) IsAutoFailovering() bool { - //TODO implement me + if len(c.nc.Status.Metad.FailureHosts) == 0 { + return false + } + for _, failureHost := range c.nc.Status.Metad.FailureHosts { + if !failureHost.ConfirmationTime.IsZero() { + return true + } + } return false } diff --git a/apis/apps/v1alpha1/nebulacluster_types.go b/apis/apps/v1alpha1/nebulacluster_types.go index 12ceac41..9e845969 100644 --- a/apis/apps/v1alpha1/nebulacluster_types.go +++ b/apis/apps/v1alpha1/nebulacluster_types.go @@ -135,22 +135,22 @@ type NebulaClusterStatus struct { // ComponentStatus is the status and version of a nebula component. type ComponentStatus struct { - Version string `json:"version,omitempty"` - Phase ComponentPhase `json:"phase,omitempty"` - Workload *WorkloadStatus `json:"workload,omitempty"` - Volume *VolumeStatus `json:"volume,omitempty"` + Version string `json:"version,omitempty"` + Phase ComponentPhase `json:"phase,omitempty"` + Workload *WorkloadStatus `json:"workload,omitempty"` + FailureHosts map[string]FailureHost `json:"failureHosts,omitempty"` + Volume *VolumeStatus `json:"volume,omitempty"` } // StoragedStatus describes the status and version of nebula storaged. type StoragedStatus struct { - ComponentStatus `json:",inline"` - HostsAdded bool `json:"hostsAdded,omitempty"` - RemovedSpaces []int32 `json:"removedSpaces,omitempty"` - BalancedSpaces []int32 `json:"balancedSpaces,omitempty"` - LastBalanceJob *BalanceJob `json:"lastBalanceJob,omitempty"` - BalancedAfterFailover *bool `json:"balancedAfterFailover,omitempty"` - FailureHosts map[string]StoragedFailureHost `json:"failureHosts,omitempty"` - Volume *VolumeStatus `json:"volume,omitempty"` + ComponentStatus `json:",inline"` + HostsAdded bool `json:"hostsAdded,omitempty"` + RemovedSpaces []int32 `json:"removedSpaces,omitempty"` + BalancedSpaces []int32 `json:"balancedSpaces,omitempty"` + LastBalanceJob *BalanceJob `json:"lastBalanceJob,omitempty"` + FailureHosts map[string]FailureHost `json:"failureHosts,omitempty"` + Volume *VolumeStatus `json:"volume,omitempty"` } // BalanceJob describes the admin job for balance data. @@ -180,11 +180,12 @@ type ProvisionedVolume struct { type EmptyStruct struct{} -// StoragedFailureHost is the storaged failure host information. -type StoragedFailureHost struct { +// FailureHost is the failure host information. +type FailureHost struct { Host string `json:"host,omitempty"` PVCSet map[types.UID]EmptyStruct `json:"pvcSet,omitempty"` - HostDeleted bool `json:"hostDeleted,omitempty"` + HostDeleted *bool `json:"hostDeleted,omitempty"` + DataBalanced *bool `json:"dataBalanced,omitempty"` PodRestarted bool `json:"podRestarted,omitempty"` PodRebuilt bool `json:"podRebuilt,omitempty"` NodeDown bool `json:"nodeDown,omitempty"` diff --git a/apis/apps/v1alpha1/zz_generated.deepcopy.go b/apis/apps/v1alpha1/zz_generated.deepcopy.go index 44924348..b4bdb4d0 100644 --- a/apis/apps/v1alpha1/zz_generated.deepcopy.go +++ b/apis/apps/v1alpha1/zz_generated.deepcopy.go @@ -358,6 +358,13 @@ func (in *ComponentStatus) DeepCopyInto(out *ComponentStatus) { *out = new(WorkloadStatus) (*in).DeepCopyInto(*out) } + if in.FailureHosts != nil { + in, out := &in.FailureHosts, &out.FailureHosts + *out = make(map[string]FailureHost, len(*in)) + for key, val := range *in { + (*out)[key] = *val.DeepCopy() + } + } if in.Volume != nil { in, out := &in.Volume, &out.Volume *out = new(VolumeStatus) @@ -481,6 +488,41 @@ func (in *ExporterSpec) DeepCopy() *ExporterSpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *FailureHost) DeepCopyInto(out *FailureHost) { + *out = *in + if in.PVCSet != nil { + in, out := &in.PVCSet, &out.PVCSet + *out = make(map[types.UID]EmptyStruct, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } + if in.HostDeleted != nil { + in, out := &in.HostDeleted, &out.HostDeleted + *out = new(bool) + **out = **in + } + if in.DataBalanced != nil { + in, out := &in.DataBalanced, &out.DataBalanced + *out = new(bool) + **out = **in + } + in.CreationTime.DeepCopyInto(&out.CreationTime) + in.ConfirmationTime.DeepCopyInto(&out.ConfirmationTime) + in.DeletionTime.DeepCopyInto(&out.DeletionTime) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FailureHost. +func (in *FailureHost) DeepCopy() *FailureHost { + if in == nil { + return nil + } + out := new(FailureHost) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *GraphdServiceSpec) DeepCopyInto(out *GraphdServiceSpec) { *out = *in @@ -1344,31 +1386,6 @@ func (in *StorageProvider) DeepCopy() *StorageProvider { return out } -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *StoragedFailureHost) DeepCopyInto(out *StoragedFailureHost) { - *out = *in - if in.PVCSet != nil { - in, out := &in.PVCSet, &out.PVCSet - *out = make(map[types.UID]EmptyStruct, len(*in)) - for key, val := range *in { - (*out)[key] = val - } - } - in.CreationTime.DeepCopyInto(&out.CreationTime) - in.ConfirmationTime.DeepCopyInto(&out.ConfirmationTime) - in.DeletionTime.DeepCopyInto(&out.DeletionTime) -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new StoragedFailureHost. -func (in *StoragedFailureHost) DeepCopy() *StoragedFailureHost { - if in == nil { - return nil - } - out := new(StoragedFailureHost) - in.DeepCopyInto(out) - return out -} - // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *StoragedSpec) DeepCopyInto(out *StoragedSpec) { *out = *in @@ -1443,14 +1460,9 @@ func (in *StoragedStatus) DeepCopyInto(out *StoragedStatus) { *out = new(BalanceJob) **out = **in } - if in.BalancedAfterFailover != nil { - in, out := &in.BalancedAfterFailover, &out.BalancedAfterFailover - *out = new(bool) - **out = **in - } if in.FailureHosts != nil { in, out := &in.FailureHosts, &out.FailureHosts - *out = make(map[string]StoragedFailureHost, len(*in)) + *out = make(map[string]FailureHost, len(*in)) for key, val := range *in { (*out)[key] = *val.DeepCopy() } diff --git a/charts/nebula-operator/crds/nebulaclusters.yaml b/charts/nebula-operator/crds/nebulaclusters.yaml index cfa7b38e..5a11fed0 100644 --- a/charts/nebula-operator/crds/nebulaclusters.yaml +++ b/charts/nebula-operator/crds/nebulaclusters.yaml @@ -11783,6 +11783,36 @@ spec: type: array graphd: properties: + failureHosts: + additionalProperties: + properties: + confirmationTime: + format: date-time + type: string + creationTime: + format: date-time + type: string + dataBalanced: + type: boolean + deletionTime: + format: date-time + type: string + host: + type: string + hostDeleted: + type: boolean + nodeDown: + type: boolean + podRebuilt: + type: boolean + podRestarted: + type: boolean + pvcSet: + additionalProperties: + type: object + type: object + type: object + type: object phase: type: string version: @@ -11844,6 +11874,36 @@ spec: type: object metad: properties: + failureHosts: + additionalProperties: + properties: + confirmationTime: + format: date-time + type: string + creationTime: + format: date-time + type: string + dataBalanced: + type: boolean + deletionTime: + format: date-time + type: string + host: + type: string + hostDeleted: + type: boolean + nodeDown: + type: boolean + podRebuilt: + type: boolean + podRestarted: + type: boolean + pvcSet: + additionalProperties: + type: object + type: object + type: object + type: object phase: type: string version: @@ -11908,8 +11968,6 @@ spec: type: integer storaged: properties: - balancedAfterFailover: - type: boolean balancedSpaces: items: format: int32 @@ -11924,6 +11982,8 @@ spec: creationTime: format: date-time type: string + dataBalanced: + type: boolean deletionTime: format: date-time type: string diff --git a/config/crd/bases/apps.nebula-graph.io_nebulaclusters.yaml b/config/crd/bases/apps.nebula-graph.io_nebulaclusters.yaml index cfa7b38e..5a11fed0 100644 --- a/config/crd/bases/apps.nebula-graph.io_nebulaclusters.yaml +++ b/config/crd/bases/apps.nebula-graph.io_nebulaclusters.yaml @@ -11783,6 +11783,36 @@ spec: type: array graphd: properties: + failureHosts: + additionalProperties: + properties: + confirmationTime: + format: date-time + type: string + creationTime: + format: date-time + type: string + dataBalanced: + type: boolean + deletionTime: + format: date-time + type: string + host: + type: string + hostDeleted: + type: boolean + nodeDown: + type: boolean + podRebuilt: + type: boolean + podRestarted: + type: boolean + pvcSet: + additionalProperties: + type: object + type: object + type: object + type: object phase: type: string version: @@ -11844,6 +11874,36 @@ spec: type: object metad: properties: + failureHosts: + additionalProperties: + properties: + confirmationTime: + format: date-time + type: string + creationTime: + format: date-time + type: string + dataBalanced: + type: boolean + deletionTime: + format: date-time + type: string + host: + type: string + hostDeleted: + type: boolean + nodeDown: + type: boolean + podRebuilt: + type: boolean + podRestarted: + type: boolean + pvcSet: + additionalProperties: + type: object + type: object + type: object + type: object phase: type: string version: @@ -11908,8 +11968,6 @@ spec: type: integer storaged: properties: - balancedAfterFailover: - type: boolean balancedSpaces: items: format: int32 @@ -11924,6 +11982,8 @@ spec: creationTime: format: date-time type: string + dataBalanced: + type: boolean deletionTime: format: date-time type: string diff --git a/pkg/controller/component/failover_helper.go b/pkg/controller/component/failover_helper.go new file mode 100644 index 00000000..3f93cfd6 --- /dev/null +++ b/pkg/controller/component/failover_helper.go @@ -0,0 +1,88 @@ +/* +Copyright 2023 Vesoft Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package component + +import ( + "strconv" + "strings" + "time" + + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/klog/v2" + + "github.com/vesoft-inc/nebula-operator/apis/apps/v1alpha1" + "github.com/vesoft-inc/nebula-operator/apis/pkg/annotation" + "github.com/vesoft-inc/nebula-operator/apis/pkg/label" + "github.com/vesoft-inc/nebula-operator/pkg/kube" + "github.com/vesoft-inc/nebula-operator/pkg/util/condition" +) + +const ( + PVCProtectionFinalizer = "kubernetes.io/pvc-protection" + RestartTolerancePeriod = time.Minute * 1 +) + +func getPodOrdinal(name string) int { + ordinal := -1 + s := strings.Split(name, "-") + o := s[len(s)-1] + if i, err := strconv.ParseInt(o, 10, 32); err == nil { + ordinal = int(i) + } + return ordinal +} + +func getPodAndPvcs(clientSet kube.ClientSet, nc *v1alpha1.NebulaCluster, cl label.Label, podName string) (*corev1.Pod, []corev1.PersistentVolumeClaim, error) { + pod, err := clientSet.Pod().GetPod(nc.Namespace, podName) + if err != nil && !apierrors.IsNotFound(err) { + return nil, nil, err + } + cl[annotation.AnnPodNameKey] = podName + pvcSelector, err := cl.Selector() + if err != nil { + return nil, nil, err + } + pvcs, err := clientSet.PVC().ListPVCs(nc.Namespace, pvcSelector) + if err != nil && !apierrors.IsNotFound(err) { + return nil, nil, err + } + return pod, pvcs, nil +} + +func isNodeDown(node *corev1.Node) bool { + for _, taint := range node.Spec.Taints { + klog.Infof("node %s found taint %s, effect %s", node.Name, taint.Key, taint.Effect) + } + if condition.IsNodeReadyFalseOrUnknown(&node.Status) { + klog.Infof("node %s is not ready", node.Name) + conditions := condition.GetNodeTrueConditions(&node.Status) + for i := range conditions { + klog.Infof("node %s condition type %s is true", node.Name, conditions[i].Type) + } + return true + } + return false +} + +func getWorkloadReplicas(workload *v1alpha1.WorkloadStatus) int32 { + var workloadReplicas int32 + if workload != nil { + workloadReplicas = workload.Replicas + } + return workloadReplicas +} diff --git a/pkg/controller/component/graphd_cluster.go b/pkg/controller/component/graphd_cluster.go index a2fe3609..cc824608 100644 --- a/pkg/controller/component/graphd_cluster.go +++ b/pkg/controller/component/graphd_cluster.go @@ -19,16 +19,24 @@ package component import ( "fmt" "strconv" + "strings" + "time" + "github.com/vesoft-inc/nebula-go/v3/nebula/meta" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" + "k8s.io/utils/pointer" "github.com/vesoft-inc/nebula-operator/apis/apps/v1alpha1" "github.com/vesoft-inc/nebula-operator/apis/pkg/annotation" + "github.com/vesoft-inc/nebula-operator/apis/pkg/label" "github.com/vesoft-inc/nebula-operator/pkg/kube" + "github.com/vesoft-inc/nebula-operator/pkg/nebula" "github.com/vesoft-inc/nebula-operator/pkg/util/discovery" utilerrors "github.com/vesoft-inc/nebula-operator/pkg/util/errors" "github.com/vesoft-inc/nebula-operator/pkg/util/extender" @@ -36,23 +44,26 @@ import ( ) type graphdCluster struct { - clientSet kube.ClientSet - dm discovery.Interface - updateManager UpdateManager - eventRecorder record.EventRecorder + clientSet kube.ClientSet + dm discovery.Interface + updateManager UpdateManager + failoverManager FailoverManager + eventRecorder record.EventRecorder } func NewGraphdCluster( clientSet kube.ClientSet, dm discovery.Interface, um UpdateManager, + fm FailoverManager, recorder record.EventRecorder, ) ReconcileManager { return &graphdCluster{ - clientSet: clientSet, - dm: dm, - updateManager: um, - eventRecorder: recorder, + clientSet: clientSet, + dm: dm, + updateManager: um, + failoverManager: fm, + eventRecorder: recorder, } } @@ -176,6 +187,22 @@ func (c *graphdCluster) syncGraphdWorkload(nc *v1alpha1.NebulaCluster) error { } } + if nc.IsAutoFailoverEnabled() { + r, hosts, err := c.shouldRecover(nc) + if err != nil { + return err + } + if r { + if err := c.failoverManager.Recovery(nc, hosts); err != nil { + return err + } + } else if nc.GraphdComponent().IsAutoFailovering() { + if err := c.failoverManager.Failover(nc); err != nil { + return err + } + } + } + equal := extender.PodTemplateEqual(newWorkload, oldWorkload) if !equal || nc.Status.Graphd.Phase == v1alpha1.UpdatePhase { if err := c.updateManager.Update(nc, oldWorkload, newWorkload, gvk); err != nil { @@ -241,6 +268,72 @@ func (c *graphdCluster) syncNebulaClusterStatus( nc.Status.Graphd.Phase = v1alpha1.RunningPhase } + workloadReplicas := getWorkloadReplicas(nc.Status.Graphd.Workload) + if !nc.IsAutoFailoverEnabled() || + pointer.Int32Deref(nc.Spec.Graphd.Replicas, 0) != workloadReplicas { + return syncComponentStatus(nc.GraphdComponent(), &nc.Status.Graphd, oldWorkload) + } + + options, err := nebula.ClientOptions(nc, nebula.SetIsMeta(true)) + if err != nil { + return err + } + hosts := []string{nc.GetMetadThriftConnAddress()} + metaClient, err := nebula.NewMetaClient(hosts, options...) + if err != nil { + return err + } + defer func() { + _ = metaClient.Disconnect() + }() + + hostItems, err := metaClient.ListHosts(meta.ListHostType_GRAPH) + if err != nil { + return err + } + thriftPort := nc.GraphdComponent().GetPort(v1alpha1.GraphdPortNameThrift) + for i := range hostItems { + host := hostItems[i] + if host.Status == meta.HostStatus_OFFLINE && host.HostAddr.Port == thriftPort { + podName := strings.Split(host.HostAddr.Host, ".")[0] + ordinal := getPodOrdinal(podName) + if int32(ordinal) >= pointer.Int32Deref(nc.Spec.Graphd.Replicas, 0) { + continue + } + if nc.Status.Graphd.FailureHosts == nil { + nc.Status.Graphd.FailureHosts = make(map[string]v1alpha1.FailureHost) + } + fh, exists := nc.Status.Graphd.FailureHosts[podName] + if exists { + deadline := fh.CreationTime.Add(nc.Spec.FailoverPeriod.Duration) + if time.Now().After(deadline) { + if fh.ConfirmationTime.IsZero() { + fh.ConfirmationTime = metav1.Time{Time: time.Now()} + cl := label.New().Cluster(nc.GetClusterName()).Graphd() + _, pvcs, err := getPodAndPvcs(c.clientSet, nc, cl, podName) + if err != nil { + return err + } + pvcSet := make(map[types.UID]v1alpha1.EmptyStruct) + for _, pvc := range pvcs { + pvcSet[pvc.UID] = v1alpha1.EmptyStruct{} + } + fh.PVCSet = pvcSet + nc.Status.Graphd.FailureHosts[podName] = fh + klog.Infof("graphd pod [%s/%s] failover period exceeds %s", nc.Namespace, podName, nc.Spec.FailoverPeriod.Duration.String()) + } + } + continue + } + failureHost := v1alpha1.FailureHost{ + Host: host.HostAddr.Host, + CreationTime: metav1.Time{Time: time.Now()}, + } + nc.Status.Graphd.FailureHosts[podName] = failureHost + klog.Infof("offline graph host %s found", host.HostAddr.Host) + } + } + return syncComponentStatus(nc.GraphdComponent(), &nc.Status.Graphd, oldWorkload) } @@ -321,6 +414,57 @@ func (c *graphdCluster) setTopologyZone(nc *v1alpha1.NebulaCluster, newReplicas return nil } +func (c *graphdCluster) shouldRecover(nc *v1alpha1.NebulaCluster) (bool, []string, error) { + if nc.Status.Graphd.FailureHosts == nil { + return true, nil, nil + } + + m := make(map[string]string) + for podName, fh := range nc.Status.Graphd.FailureHosts { + pod, err := c.clientSet.Pod().GetPod(nc.Namespace, podName) + if err != nil && !apierrors.IsNotFound(err) { + return false, nil, err + } + if pod == nil { + continue + } + if isPodHealthy(pod) { + m[fh.Host] = podName + } + } + if len(m) == 0 { + return false, nil, nil + } + + options, err := nebula.ClientOptions(nc, nebula.SetIsMeta(true)) + if err != nil { + return false, nil, err + } + hosts := []string{nc.GetMetadThriftConnAddress()} + metaClient, err := nebula.NewMetaClient(hosts, options...) + if err != nil { + return false, nil, err + } + defer func() { + _ = metaClient.Disconnect() + }() + + onlineHosts := make([]string, 0) + hostItems, err := metaClient.ListHosts(meta.ListHostType_GRAPH) + if err != nil { + return false, nil, err + } + thriftPort := nc.GraphdComponent().GetPort(v1alpha1.GraphdPortNameThrift) + for _, host := range hostItems { + podName, ok := m[host.HostAddr.Host] + if ok && host.Status == meta.HostStatus_ONLINE && host.HostAddr.Port == thriftPort { + onlineHosts = append(onlineHosts, podName) + } + } + r := len(onlineHosts) > 0 + return r, onlineHosts, nil +} + type FakeGraphdCluster struct { err error } diff --git a/pkg/controller/component/graphd_failover.go b/pkg/controller/component/graphd_failover.go new file mode 100644 index 00000000..a272a323 --- /dev/null +++ b/pkg/controller/component/graphd_failover.go @@ -0,0 +1,201 @@ +/* +Copyright 2023 Vesoft Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package component + +import ( + "context" + "fmt" + "time" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/vesoft-inc/nebula-operator/apis/apps/v1alpha1" + "github.com/vesoft-inc/nebula-operator/apis/pkg/label" + "github.com/vesoft-inc/nebula-operator/pkg/kube" + utilerrors "github.com/vesoft-inc/nebula-operator/pkg/util/errors" +) + +type graphdFailover struct { + client.Client + clientSet kube.ClientSet +} + +func NewGraphdFailover(c client.Client, clientSet kube.ClientSet) FailoverManager { + return &graphdFailover{Client: c, clientSet: clientSet} +} + +func (g *graphdFailover) Failover(nc *v1alpha1.NebulaCluster) error { + if err := g.tryRestartPod(nc); err != nil { + return err + } + readyPods, err := g.toleratePods(nc) + if err != nil { + return err + } + if len(readyPods) > 0 { + return utilerrors.ReconcileErrorf("graphd pods [%v] are ready after restarted", readyPods) + } + if err := g.deleteFailurePodAndPVC(nc); err != nil { + return err + } + if err := g.checkPendingPod(nc); err != nil { + return err + } + return nil +} + +func (g *graphdFailover) Recovery(nc *v1alpha1.NebulaCluster, hosts []string) error { + for _, host := range hosts { + delete(nc.Status.Graphd.FailureHosts, host) + klog.Infof("clearing graphd cluster [%s/%s] failure host %s", nc.GetNamespace(), nc.GetName(), host) + } + return nil +} + +func (g *graphdFailover) tryRestartPod(nc *v1alpha1.NebulaCluster) error { + for podName, fh := range nc.Status.Graphd.FailureHosts { + if fh.PodRestarted { + continue + } + pod, err := g.clientSet.Pod().GetPod(nc.Namespace, podName) + if err != nil && !apierrors.IsNotFound(err) { + return err + } + if pod == nil || isPodPending(pod) { + continue + } + node, err := g.clientSet.Node().GetNode(pod.Spec.NodeName) + if err != nil { + klog.Errorf("get node %s failed: %v", pod.Spec.NodeName, err) + return err + } + if isNodeDown(node) { + fh.NodeDown = true + } + if err := g.clientSet.Pod().DeletePod(nc.Namespace, podName, true); err != nil { + return err + } + fh.PodRestarted = true + nc.Status.Graphd.FailureHosts[podName] = fh + return utilerrors.ReconcileErrorf("try to restart failure graphd pod [%s/%s] for recovery", nc.Namespace, podName) + } + return nil +} + +func (g *graphdFailover) toleratePods(nc *v1alpha1.NebulaCluster) ([]string, error) { + readyPods := make([]string, 0) + for podName, fh := range nc.Status.Graphd.FailureHosts { + if fh.PodRebuilt { + continue + } + pod, err := g.clientSet.Pod().GetPod(nc.Namespace, podName) + if err != nil && !apierrors.IsNotFound(err) { + return nil, err + } + if pod != nil && isPodTerminating(pod) { + return nil, utilerrors.ReconcileErrorf("failure graphd pod [%s/%s] is deleting", nc.Namespace, podName) + } + if isPodHealthy(pod) { + readyPods = append(readyPods, podName) + continue + } + tolerance := pod.CreationTimestamp.Add(RestartTolerancePeriod) + if time.Now().Before(tolerance) { + return nil, utilerrors.ReconcileErrorf("waiting failure graphd pod [%s/%s] ready in tolerance period", nc.Namespace, podName) + } + } + return readyPods, nil +} + +func (g *graphdFailover) deleteFailurePodAndPVC(nc *v1alpha1.NebulaCluster) error { + cl := label.New().Cluster(nc.GetClusterName()).Graphd() + for podName, fh := range nc.Status.Graphd.FailureHosts { + if fh.PodRebuilt { + continue + } + pod, pvcs, err := getPodAndPvcs(g.clientSet, nc, cl, podName) + if err != nil { + return err + } + if pod == nil { + return fmt.Errorf("failure graphd pod [%s/%s] not found, skip", nc.Namespace, podName) + } + if !isPodTerminating(pod) { + podScheduled := isPodConditionScheduledTrue(pod.Status.Conditions) + klog.Infof("scheduled condition of pod [%s/%s] is %v", nc.Namespace, podName, podScheduled) + if err := g.clientSet.Pod().DeletePod(nc.Namespace, podName, true); err != nil { + return err + } + } else { + klog.Infof("pod [%s/%s] has DeletionTimestamp set to %s", nc.Namespace, podName, pod.DeletionTimestamp.String()) + } + + for i := range pvcs { + pvc := pvcs[i] + if _, exist := fh.PVCSet[pvc.UID]; exist { + if pvc.DeletionTimestamp == nil { + if err := g.clientSet.PVC().DeletePVC(nc.Namespace, pvc.Name); err != nil { + return err + } + klog.Infof("delete failure graphd pod PVC [%s/%s] successfully", nc.Namespace, pvc.Name) + } else { + klog.Infof("PVC [%s/%s] has DeletionTimestamp set to %s", nc.Namespace, pvc.Name, pvc.DeletionTimestamp.String()) + } + } + } + fh.PodRebuilt = true + fh.DeletionTime = metav1.Time{Time: time.Now()} + nc.Status.Graphd.FailureHosts[podName] = fh + return utilerrors.ReconcileErrorf("try to delete failure graphd pod [%s/%s] for rebuilding", nc.Namespace, podName) + } + return nil +} + +func (g *graphdFailover) checkPendingPod(nc *v1alpha1.NebulaCluster) error { + cl := label.New().Cluster(nc.GetClusterName()).Graphd() + for podName, fh := range nc.Status.Graphd.FailureHosts { + pod, pvcs, err := getPodAndPvcs(g.clientSet, nc, cl, podName) + if err != nil { + return err + } + if pod == nil { + return fmt.Errorf("rebuilt graphd pod [%s/%s] not found, skip", nc.Namespace, podName) + } + for i := range pvcs { + pvc := pvcs[i] + if _, exist := fh.PVCSet[pvc.UID]; exist { + if pvc.DeletionTimestamp != nil && len(pvc.GetFinalizers()) > 0 { + if err := kube.UpdateFinalizer(context.TODO(), g.Client, pvc.DeepCopy(), kube.RemoveFinalizerOpType, PVCProtectionFinalizer); err != nil { + return err + } + return utilerrors.ReconcileErrorf("waiting for PVC [%s/%s] finalizer updated", nc.Namespace, pvc.Name) + } + } + } + if isPodConditionScheduledTrue(pod.Status.Conditions) && isPodPending(pod) && time.Now().After(pod.CreationTimestamp.Add(time.Minute*1)) { + klog.Infof("graphd pod [%s/%s] conditions %v", nc.Namespace, podName, pod.Status.Conditions) + if err := g.clientSet.Pod().DeletePod(nc.Namespace, podName, true); err != nil { + return err + } + return utilerrors.ReconcileErrorf("pending graphd pod [%s/%s] deleted, reschedule", nc.Namespace, podName) + } + } + return nil +} diff --git a/pkg/controller/component/interface.go b/pkg/controller/component/interface.go index bf817ab4..8f93969b 100644 --- a/pkg/controller/component/interface.go +++ b/pkg/controller/component/interface.go @@ -52,6 +52,8 @@ type UpdateManager interface { } type FailoverManager interface { + // Failover moves the offline state host to a ready node Failover(nc *v1alpha1.NebulaCluster) error - Recovery(nc *v1alpha1.NebulaCluster) error + // Recovery clears the failure hosts in nc status + Recovery(nc *v1alpha1.NebulaCluster, hosts []string) error } diff --git a/pkg/controller/component/metad_cluster.go b/pkg/controller/component/metad_cluster.go index 72e70c19..0dbc1a5c 100644 --- a/pkg/controller/component/metad_cluster.go +++ b/pkg/controller/component/metad_cluster.go @@ -18,16 +18,24 @@ package component import ( "fmt" + "strings" + "time" + "github.com/vesoft-inc/nebula-go/v3/nebula/meta" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" + "k8s.io/utils/pointer" "github.com/vesoft-inc/nebula-operator/apis/apps/v1alpha1" "github.com/vesoft-inc/nebula-operator/apis/pkg/annotation" + "github.com/vesoft-inc/nebula-operator/apis/pkg/label" "github.com/vesoft-inc/nebula-operator/pkg/kube" + "github.com/vesoft-inc/nebula-operator/pkg/nebula" "github.com/vesoft-inc/nebula-operator/pkg/util/discovery" utilerrors "github.com/vesoft-inc/nebula-operator/pkg/util/errors" "github.com/vesoft-inc/nebula-operator/pkg/util/extender" @@ -35,23 +43,26 @@ import ( ) type metadCluster struct { - clientSet kube.ClientSet - dm discovery.Interface - updateManager UpdateManager - eventRecorder record.EventRecorder + clientSet kube.ClientSet + dm discovery.Interface + updateManager UpdateManager + failoverManager FailoverManager + eventRecorder record.EventRecorder } func NewMetadCluster( clientSet kube.ClientSet, dm discovery.Interface, um UpdateManager, + fm FailoverManager, recorder record.EventRecorder, ) ReconcileManager { return &metadCluster{ - clientSet: clientSet, - dm: dm, - updateManager: um, - eventRecorder: recorder, + clientSet: clientSet, + dm: dm, + updateManager: um, + failoverManager: fm, + eventRecorder: recorder, } } @@ -169,6 +180,22 @@ func (c *metadCluster) syncMetadWorkload(nc *v1alpha1.NebulaCluster) error { return utilerrors.ReconcileErrorf("waiting for metad cluster %s running", newWorkload.GetName()) } + if nc.IsAutoFailoverEnabled() { + r, hosts, err := c.shouldRecover(nc) + if err != nil { + return err + } + if r { + if err := c.failoverManager.Recovery(nc, hosts); err != nil { + return err + } + } else if nc.MetadComponent().IsAutoFailovering() { + if err := c.failoverManager.Failover(nc); err != nil { + return err + } + } + } + equal := extender.PodTemplateEqual(newWorkload, oldWorkload) if !equal || nc.Status.Metad.Phase == v1alpha1.UpdatePhase { if err := c.updateManager.Update(nc, oldWorkload, newWorkload, gvk); err != nil { @@ -206,7 +233,68 @@ func (c *metadCluster) syncNebulaClusterStatus(nc *v1alpha1.NebulaCluster, oldWo nc.Status.Metad.Phase = v1alpha1.RunningPhase } - // TODO: show metad hosts state with metad peers + workloadReplicas := getWorkloadReplicas(nc.Status.Metad.Workload) + if !nc.IsAutoFailoverEnabled() || + pointer.Int32Deref(nc.Spec.Metad.Replicas, 0) != workloadReplicas { + return syncComponentStatus(nc.MetadComponent(), &nc.Status.Metad, oldWorkload) + } + + options, err := nebula.ClientOptions(nc, nebula.SetIsMeta(true)) + if err != nil { + return err + } + hosts := []string{nc.GetMetadThriftConnAddress()} + metaClient, err := nebula.NewMetaClient(hosts, options...) + if err != nil { + return err + } + defer func() { + _ = metaClient.Disconnect() + }() + + hostItems, err := metaClient.ListHosts(meta.ListHostType_META) + if err != nil { + return err + } + thriftPort := nc.MetadComponent().GetPort(v1alpha1.MetadPortNameThrift) + for i := range hostItems { + host := hostItems[i] + if host.Status == meta.HostStatus_OFFLINE && host.HostAddr.Port == thriftPort { + podName := strings.Split(host.HostAddr.Host, ".")[0] + if nc.Status.Metad.FailureHosts == nil { + nc.Status.Metad.FailureHosts = make(map[string]v1alpha1.FailureHost) + } + fh, exists := nc.Status.Metad.FailureHosts[podName] + if exists { + deadline := fh.CreationTime.Add(nc.Spec.FailoverPeriod.Duration) + if time.Now().After(deadline) { + if fh.ConfirmationTime.IsZero() { + fh.ConfirmationTime = metav1.Time{Time: time.Now()} + cl := label.New().Cluster(nc.GetClusterName()).Metad() + _, pvcs, err := getPodAndPvcs(c.clientSet, nc, cl, podName) + if err != nil { + return err + } + pvcSet := make(map[types.UID]v1alpha1.EmptyStruct) + for _, pvc := range pvcs { + pvcSet[pvc.UID] = v1alpha1.EmptyStruct{} + } + fh.PVCSet = pvcSet + nc.Status.Metad.FailureHosts[podName] = fh + klog.Infof("metad pod [%s/%s] failover period exceeds %s", nc.Namespace, podName, nc.Spec.FailoverPeriod.Duration.String()) + } + } + continue + } + failureHost := v1alpha1.FailureHost{ + Host: host.HostAddr.Host, + CreationTime: metav1.Time{Time: time.Now()}, + } + nc.Status.Metad.FailureHosts[podName] = failureHost + klog.Infof("offline meta host %s found", host.HostAddr.Host) + } + } + return syncComponentStatus(nc.MetadComponent(), &nc.Status.Metad, oldWorkload) } @@ -227,6 +315,57 @@ func (c *metadCluster) syncMetadPVC(nc *v1alpha1.NebulaCluster) error { return nil } +func (c *metadCluster) shouldRecover(nc *v1alpha1.NebulaCluster) (bool, []string, error) { + if nc.Status.Metad.FailureHosts == nil { + return true, nil, nil + } + + m := make(map[string]string) + for podName, fh := range nc.Status.Metad.FailureHosts { + pod, err := c.clientSet.Pod().GetPod(nc.Namespace, podName) + if err != nil && !apierrors.IsNotFound(err) { + return false, nil, err + } + if pod == nil { + continue + } + if isPodHealthy(pod) { + m[fh.Host] = podName + } + } + if len(m) == 0 { + return false, nil, nil + } + + options, err := nebula.ClientOptions(nc, nebula.SetIsMeta(true)) + if err != nil { + return false, nil, err + } + hosts := []string{nc.GetMetadThriftConnAddress()} + metaClient, err := nebula.NewMetaClient(hosts, options...) + if err != nil { + return false, nil, err + } + defer func() { + _ = metaClient.Disconnect() + }() + + onlineHosts := make([]string, 0) + hostItems, err := metaClient.ListHosts(meta.ListHostType_META) + if err != nil { + return false, nil, err + } + thriftPort := nc.MetadComponent().GetPort(v1alpha1.MetadPortNameThrift) + for _, host := range hostItems { + podName, ok := m[host.HostAddr.Host] + if ok && host.Status == meta.HostStatus_ONLINE && host.HostAddr.Port == thriftPort { + onlineHosts = append(onlineHosts, podName) + } + } + r := len(onlineHosts) > 0 + return r, onlineHosts, nil +} + type FakeMetadCluster struct { err error } diff --git a/pkg/controller/component/metad_failover.go b/pkg/controller/component/metad_failover.go new file mode 100644 index 00000000..2c67eaa6 --- /dev/null +++ b/pkg/controller/component/metad_failover.go @@ -0,0 +1,201 @@ +/* +Copyright 2023 Vesoft Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package component + +import ( + "context" + "fmt" + "time" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/vesoft-inc/nebula-operator/apis/apps/v1alpha1" + "github.com/vesoft-inc/nebula-operator/apis/pkg/label" + "github.com/vesoft-inc/nebula-operator/pkg/kube" + utilerrors "github.com/vesoft-inc/nebula-operator/pkg/util/errors" +) + +type metadFailover struct { + client.Client + clientSet kube.ClientSet +} + +func NewMetadFailover(c client.Client, clientSet kube.ClientSet) FailoverManager { + return &metadFailover{Client: c, clientSet: clientSet} +} + +func (m *metadFailover) Failover(nc *v1alpha1.NebulaCluster) error { + if err := m.tryRestartPod(nc); err != nil { + return err + } + readyPods, err := m.toleratePods(nc) + if err != nil { + return err + } + if len(readyPods) > 0 { + return utilerrors.ReconcileErrorf("metad pods [%v] are ready after restarted", readyPods) + } + if err := m.deleteFailurePodAndPVC(nc); err != nil { + return err + } + if err := m.checkPendingPod(nc); err != nil { + return err + } + return nil +} + +func (m *metadFailover) Recovery(nc *v1alpha1.NebulaCluster, hosts []string) error { + for _, host := range hosts { + delete(nc.Status.Metad.FailureHosts, host) + klog.Infof("clearing metad cluster [%s/%s] failure host %s", nc.GetNamespace(), nc.GetName(), host) + } + return nil +} + +func (m *metadFailover) tryRestartPod(nc *v1alpha1.NebulaCluster) error { + for podName, fh := range nc.Status.Metad.FailureHosts { + if fh.PodRestarted { + continue + } + pod, err := m.clientSet.Pod().GetPod(nc.Namespace, podName) + if err != nil && !apierrors.IsNotFound(err) { + return err + } + if pod == nil || isPodPending(pod) { + continue + } + node, err := m.clientSet.Node().GetNode(pod.Spec.NodeName) + if err != nil { + klog.Errorf("get node %s failed: %v", pod.Spec.NodeName, err) + return err + } + if isNodeDown(node) { + fh.NodeDown = true + } + if err := m.clientSet.Pod().DeletePod(nc.Namespace, podName, true); err != nil { + return err + } + fh.PodRestarted = true + nc.Status.Metad.FailureHosts[podName] = fh + return utilerrors.ReconcileErrorf("try to restart failure metad pod [%s/%s] for recovery", nc.Namespace, podName) + } + return nil +} + +func (m *metadFailover) toleratePods(nc *v1alpha1.NebulaCluster) ([]string, error) { + readyPods := make([]string, 0) + for podName, fh := range nc.Status.Metad.FailureHosts { + if fh.PodRebuilt { + continue + } + pod, err := m.clientSet.Pod().GetPod(nc.Namespace, podName) + if err != nil && !apierrors.IsNotFound(err) { + return nil, err + } + if pod != nil && isPodTerminating(pod) { + return nil, utilerrors.ReconcileErrorf("failure metad pod [%s/%s] is deleting", nc.Namespace, podName) + } + if isPodHealthy(pod) { + readyPods = append(readyPods, podName) + continue + } + tolerance := pod.CreationTimestamp.Add(RestartTolerancePeriod) + if time.Now().Before(tolerance) { + return nil, utilerrors.ReconcileErrorf("waiting failure metad pod [%s/%s] ready in tolerance period", nc.Namespace, podName) + } + } + return readyPods, nil +} + +func (m *metadFailover) deleteFailurePodAndPVC(nc *v1alpha1.NebulaCluster) error { + cl := label.New().Cluster(nc.GetClusterName()).Metad() + for podName, fh := range nc.Status.Metad.FailureHosts { + if fh.PodRebuilt { + continue + } + pod, pvcs, err := getPodAndPvcs(m.clientSet, nc, cl, podName) + if err != nil { + return err + } + if pod == nil { + return fmt.Errorf("failure metad pod [%s/%s] not found, skip", nc.Namespace, podName) + } + if !isPodTerminating(pod) { + podScheduled := isPodConditionScheduledTrue(pod.Status.Conditions) + klog.Infof("scheduled condition of pod [%s/%s] is %v", nc.Namespace, podName, podScheduled) + if err := m.clientSet.Pod().DeletePod(nc.Namespace, podName, true); err != nil { + return err + } + } else { + klog.Infof("pod [%s/%s] has DeletionTimestamp set to %s", nc.Namespace, podName, pod.DeletionTimestamp.String()) + } + + for i := range pvcs { + pvc := pvcs[i] + if _, exist := fh.PVCSet[pvc.UID]; exist { + if pvc.DeletionTimestamp == nil { + if err := m.clientSet.PVC().DeletePVC(nc.Namespace, pvc.Name); err != nil { + return err + } + klog.Infof("delete failure metad pod PVC [%s/%s] successfully", nc.Namespace, pvc.Name) + } else { + klog.Infof("PVC [%s/%s] has DeletionTimestamp set to %s", nc.Namespace, pvc.Name, pvc.DeletionTimestamp.String()) + } + } + } + fh.PodRebuilt = true + fh.DeletionTime = metav1.Time{Time: time.Now()} + nc.Status.Metad.FailureHosts[podName] = fh + return utilerrors.ReconcileErrorf("try to delete failure metad pod [%s/%s] for rebuilding", nc.Namespace, podName) + } + return nil +} + +func (m *metadFailover) checkPendingPod(nc *v1alpha1.NebulaCluster) error { + cl := label.New().Cluster(nc.GetClusterName()).Metad() + for podName, fh := range nc.Status.Metad.FailureHosts { + pod, pvcs, err := getPodAndPvcs(m.clientSet, nc, cl, podName) + if err != nil { + return err + } + if pod == nil { + return fmt.Errorf("rebuilt metad pod [%s/%s] not found", nc.Namespace, podName) + } + for i := range pvcs { + pvc := pvcs[i] + if _, exist := fh.PVCSet[pvc.UID]; exist { + if pvc.DeletionTimestamp != nil && len(pvc.GetFinalizers()) > 0 { + if err := kube.UpdateFinalizer(context.TODO(), m.Client, pvc.DeepCopy(), kube.RemoveFinalizerOpType, PVCProtectionFinalizer); err != nil { + return err + } + return utilerrors.ReconcileErrorf("waiting for PVC [%s/%s] finalizer updated", nc.Namespace, pvc.Name) + } + } + } + if isPodConditionScheduledTrue(pod.Status.Conditions) && isPodPending(pod) && time.Now().After(pod.CreationTimestamp.Add(time.Minute*1)) { + klog.Infof("metad pod [%s/%s] conditions %v", nc.Namespace, podName, pod.Status.Conditions) + if err := m.clientSet.Pod().DeletePod(nc.Namespace, podName, true); err != nil { + return err + } + return utilerrors.ReconcileErrorf("pending metad pod [%s/%s] deleted, reschedule", nc.Namespace, podName) + } + } + return nil +} diff --git a/pkg/controller/component/storaged_cluster.go b/pkg/controller/component/storaged_cluster.go index 06b25b2c..40383930 100644 --- a/pkg/controller/component/storaged_cluster.go +++ b/pkg/controller/component/storaged_cluster.go @@ -36,6 +36,7 @@ import ( "github.com/vesoft-inc/nebula-operator/apis/apps/v1alpha1" "github.com/vesoft-inc/nebula-operator/apis/pkg/annotation" + "github.com/vesoft-inc/nebula-operator/apis/pkg/label" "github.com/vesoft-inc/nebula-operator/pkg/kube" "github.com/vesoft-inc/nebula-operator/pkg/nebula" "github.com/vesoft-inc/nebula-operator/pkg/util/discovery" @@ -217,12 +218,12 @@ func (c *storagedCluster) syncStoragedWorkload(nc *v1alpha1.NebulaCluster) error } if nc.IsAutoFailoverEnabled() { - r, err := c.shouldRecover(nc) + r, hosts, err := c.shouldRecover(nc) if err != nil { return err } if r { - if err := c.failoverManager.Recovery(nc); err != nil { + if err := c.failoverManager.Recovery(nc, hosts); err != nil { return err } } else if nc.StoragedComponent().IsAutoFailovering() { @@ -309,7 +310,9 @@ func (c *storagedCluster) syncNebulaClusterStatus( nc.Status.Storaged.Phase = v1alpha1.UpdatePhase } - if !nc.IsAutoFailoverEnabled() { + workloadReplicas := getWorkloadReplicas(nc.Status.Storaged.Workload) + if !nc.IsAutoFailoverEnabled() || + pointer.Int32Deref(nc.Spec.Storaged.Replicas, 0) != workloadReplicas { return syncComponentStatus(nc.StoragedComponent(), &nc.Status.Storaged.ComponentStatus, oldWorkload) } @@ -335,8 +338,12 @@ func (c *storagedCluster) syncNebulaClusterStatus( host := hostItems[i] if host.Status == meta.HostStatus_OFFLINE && host.HostAddr.Port == thriftPort { podName := strings.Split(host.HostAddr.Host, ".")[0] + ordinal := getPodOrdinal(podName) + if int32(ordinal) >= pointer.Int32Deref(nc.Spec.Storaged.Replicas, 0) { + continue + } if nc.Status.Storaged.FailureHosts == nil { - nc.Status.Storaged.FailureHosts = make(map[string]v1alpha1.StoragedFailureHost) + nc.Status.Storaged.FailureHosts = make(map[string]v1alpha1.FailureHost) } fh, exists := nc.Status.Storaged.FailureHosts[podName] if exists { @@ -344,7 +351,8 @@ func (c *storagedCluster) syncNebulaClusterStatus( if time.Now().After(deadline) { if fh.ConfirmationTime.IsZero() { fh.ConfirmationTime = metav1.Time{Time: time.Now()} - pvcs, err := getPodPvcs(c.clientSet, nc, podName) + cl := label.New().Cluster(nc.GetClusterName()).Storaged() + _, pvcs, err := getPodAndPvcs(c.clientSet, nc, cl, podName) if err != nil { return err } @@ -354,23 +362,17 @@ func (c *storagedCluster) syncNebulaClusterStatus( } fh.PVCSet = pvcSet nc.Status.Storaged.FailureHosts[podName] = fh - if err := c.clientSet.NebulaCluster().UpdateNebulaCluster(nc); err != nil { - return err - } klog.Infof("storaged pod [%s/%s] failover period exceeds %s", nc.Namespace, podName, nc.Spec.FailoverPeriod.Duration.String()) } } continue } - failureHost := v1alpha1.StoragedFailureHost{ + failureHost := v1alpha1.FailureHost{ Host: host.HostAddr.Host, CreationTime: metav1.Time{Time: time.Now()}, } nc.Status.Storaged.FailureHosts[podName] = failureHost - if err := c.clientSet.NebulaCluster().UpdateNebulaCluster(nc); err != nil { - return err - } - klog.Infof("failure storage host %s found", host.HostAddr.Host) + klog.Infof("offline storage host %s found", host.HostAddr.Host) } } @@ -538,56 +540,64 @@ func (c *storagedCluster) updateZoneMappings(nc *v1alpha1.NebulaCluster, newRepl return nil } -// TODO support partially recovery -func (c *storagedCluster) shouldRecover(nc *v1alpha1.NebulaCluster) (bool, error) { +func (c *storagedCluster) shouldRecover(nc *v1alpha1.NebulaCluster) (bool, []string, error) { if nc.Status.Storaged.FailureHosts == nil { - return true, nil + return true, nil, nil } - fhSet := sets.New[string]() + m := make(map[string]string) for podName, fh := range nc.Status.Storaged.FailureHosts { pod, err := c.clientSet.Pod().GetPod(nc.Namespace, podName) - if err != nil { - klog.Errorf("storaged pod [%s/%s] does not exist: %v", nc.Namespace, podName, err) - return false, err + if err != nil && !apierrors.IsNotFound(err) { + return false, nil, err + } + if pod == nil { + continue } - if !isPodHealthy(pod) { - return false, nil + if isPodHealthy(pod) { + m[fh.Host] = podName } - fhSet.Insert(fh.Host) + } + if len(m) == 0 { + return false, nil, nil } options, err := nebula.ClientOptions(nc, nebula.SetIsMeta(true)) if err != nil { - return false, err + return false, nil, err } hosts := []string{nc.GetMetadThriftConnAddress()} metaClient, err := nebula.NewMetaClient(hosts, options...) if err != nil { - return false, err + return false, nil, err } defer func() { _ = metaClient.Disconnect() }() + spaces, err := metaClient.ListSpaces() + if err != nil { + return false, nil, err + } + onlineHosts := make([]string, 0) hostItems, err := metaClient.ListHosts(meta.ListHostType_STORAGE) if err != nil { - return false, err + return false, nil, err } thriftPort := nc.StoragedComponent().GetPort(v1alpha1.StoragedPortNameThrift) for _, host := range hostItems { - if fhSet.Has(host.HostAddr.Host) && - host.Status != meta.HostStatus_ONLINE && - host.HostAddr.Port == thriftPort { - return false, nil + podName, ok := m[host.HostAddr.Host] + fh, exists := nc.Status.Storaged.FailureHosts[podName] + balanced := pointer.BoolDeref(fh.DataBalanced, true) + if ok && host.Status == meta.HostStatus_ONLINE && host.HostAddr.Port == thriftPort { + if exists && len(spaces) > 0 && !balanced { + continue + } + onlineHosts = append(onlineHosts, podName) } } - - if nc.Status.Storaged.BalancedAfterFailover != nil && !*nc.Status.Storaged.BalancedAfterFailover { - return false, nil - } - - return true, nil + r := len(onlineHosts) > 0 + return r, onlineHosts, nil } type FakeStoragedCluster struct { diff --git a/pkg/controller/component/storaged_failover.go b/pkg/controller/component/storaged_failover.go index c25d629e..35954e0a 100644 --- a/pkg/controller/component/storaged_failover.go +++ b/pkg/controller/component/storaged_failover.go @@ -22,41 +22,19 @@ import ( "time" nebulago "github.com/vesoft-inc/nebula-go/v3/nebula" - corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/klog/v2" - podutils "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/utils/pointer" "sigs.k8s.io/controller-runtime/pkg/client" "github.com/vesoft-inc/nebula-operator/apis/apps/v1alpha1" - "github.com/vesoft-inc/nebula-operator/apis/pkg/annotation" "github.com/vesoft-inc/nebula-operator/apis/pkg/label" "github.com/vesoft-inc/nebula-operator/pkg/kube" "github.com/vesoft-inc/nebula-operator/pkg/nebula" - "github.com/vesoft-inc/nebula-operator/pkg/util/condition" utilerrors "github.com/vesoft-inc/nebula-operator/pkg/util/errors" ) -const ( - PVCProtectionFinalizer = "kubernetes.io/pvc-protection" - RestartTolerancePeriod = time.Minute * 2 -) - -var ( - nodeTaints = []*corev1.Taint{ - { - Key: corev1.TaintNodeUnreachable, - Effect: corev1.TaintEffectNoExecute, - }, - { - Key: corev1.TaintNodeNotReady, - Effect: corev1.TaintEffectNoExecute, - }, - } -) - type storagedFailover struct { client.Client clientSet kube.ClientSet @@ -74,8 +52,31 @@ func (s *storagedFailover) Failover(nc *v1alpha1.NebulaCluster) error { if err != nil { return err } + if len(readyPods) > 0 { - return utilerrors.ReconcileErrorf("storaged pods [%v] are ready after restarted", readyPods) + options, err := nebula.ClientOptions(nc, nebula.SetIsMeta(true)) + if err != nil { + return err + } + endpoints := []string{nc.GetMetadThriftConnAddress()} + metaClient, err := nebula.NewMetaClient(endpoints, options...) + if err != nil { + return err + } + defer func() { + err := metaClient.Disconnect() + if err != nil { + klog.Errorf("meta client disconnect failed: %v", err) + } + }() + + spaces, err := metaClient.ListSpaces() + if err != nil { + return err + } + if len(spaces) == 0 { + return utilerrors.ReconcileErrorf("storaged pods [%v] are ready after restarted", readyPods) + } } if err := s.deleteFailureHost(nc); err != nil { return err @@ -86,18 +87,17 @@ func (s *storagedFailover) Failover(nc *v1alpha1.NebulaCluster) error { if err := s.checkPendingPod(nc); err != nil { return err } - if !pointer.BoolDeref(nc.Status.Storaged.BalancedAfterFailover, false) { - if err := s.balanceData(nc); err != nil { - return err - } + if err := s.balanceData(nc); err != nil { + return err } return nil } -func (s *storagedFailover) Recovery(nc *v1alpha1.NebulaCluster) error { - nc.Status.Storaged.FailureHosts = nil - nc.Status.Storaged.BalancedAfterFailover = nil - klog.Infof("clearing storaged cluster [%s/%s] failure hosts", nc.GetNamespace(), nc.GetName()) +func (s *storagedFailover) Recovery(nc *v1alpha1.NebulaCluster, hosts []string) error { + for _, host := range hosts { + delete(nc.Status.Storaged.FailureHosts, host) + klog.Infof("clearing storaged cluster [%s/%s] failure host %s", nc.GetNamespace(), nc.GetName(), host) + } return nil } @@ -106,6 +106,21 @@ func (s *storagedFailover) tryRestartPod(nc *v1alpha1.NebulaCluster) error { if fh.PodRestarted { continue } + pod, err := s.clientSet.Pod().GetPod(nc.Namespace, podName) + if err != nil && !apierrors.IsNotFound(err) { + return err + } + if pod == nil || isPodPending(pod) { + continue + } + node, err := s.clientSet.Node().GetNode(pod.Spec.NodeName) + if err != nil { + klog.Errorf("get node %s failed: %v", pod.Spec.NodeName, err) + return err + } + if isNodeDown(node) { + fh.NodeDown = true + } if err := s.clientSet.Pod().DeletePod(nc.Namespace, podName, true); err != nil { return err } @@ -119,24 +134,14 @@ func (s *storagedFailover) tryRestartPod(nc *v1alpha1.NebulaCluster) error { func (s *storagedFailover) toleratePods(nc *v1alpha1.NebulaCluster) ([]string, error) { readyPods := make([]string, 0) for podName, fh := range nc.Status.Storaged.FailureHosts { - if fh.HostDeleted { + if fh.PodRebuilt { continue } pod, err := s.clientSet.Pod().GetPod(nc.Namespace, podName) - if err != nil { + if err != nil && !apierrors.IsNotFound(err) { return nil, err } if pod != nil && isPodTerminating(pod) { - node, err := s.clientSet.Node().GetNode(pod.Spec.NodeName) - if err != nil { - klog.Errorf("get node %s failed: %v", pod.Spec.NodeName, err) - return nil, err - } - if isNodeDown(node) { - fh.NodeDown = true - nc.Status.Storaged.FailureHosts[podName] = fh - continue - } return nil, utilerrors.ReconcileErrorf("failure storaged pod [%s/%s] is deleting", nc.Namespace, podName) } if isPodHealthy(pod) { @@ -172,7 +177,7 @@ func (s *storagedFailover) deleteFailureHost(nc *v1alpha1.NebulaCluster) error { hosts := make([]*nebulago.HostAddr, 0) for _, fh := range nc.Status.Storaged.FailureHosts { - if fh.HostDeleted { + if pointer.BoolDeref(fh.HostDeleted, false) { continue } count, err := metaClient.GetLeaderCount(fh.Host) @@ -196,12 +201,16 @@ func (s *storagedFailover) deleteFailureHost(nc *v1alpha1.NebulaCluster) error { if err != nil { return err } - - if len(spaces) > 0 { - if nc.Status.Storaged.RemovedSpaces == nil { - nc.Status.Storaged.RemovedSpaces = make([]int32, 0, len(spaces)) + if len(spaces) == 0 { + for podName, fh := range nc.Status.Storaged.FailureHosts { + fh.HostDeleted = pointer.Bool(true) + nc.Status.Storaged.FailureHosts[podName] = fh } - nc.Status.Storaged.BalancedAfterFailover = pointer.Bool(false) + return utilerrors.ReconcileErrorf("try to remove storaged cluster [%s/%s] failure host for recovery", ns, componentName) + } + + if nc.Status.Storaged.RemovedSpaces == nil { + nc.Status.Storaged.RemovedSpaces = make([]int32, 0, len(spaces)) } for _, space := range spaces { if contains(nc.Status.Storaged.RemovedSpaces, *space.Id.SpaceID) { @@ -215,10 +224,8 @@ func (s *storagedFailover) deleteFailureHost(nc *v1alpha1.NebulaCluster) error { } for podName, fh := range nc.Status.Storaged.FailureHosts { - if fh.HostDeleted { - continue - } - fh.HostDeleted = true + fh.HostDeleted = pointer.Bool(true) + fh.DataBalanced = pointer.Bool(false) nc.Status.Storaged.FailureHosts[podName] = fh } @@ -228,11 +235,12 @@ func (s *storagedFailover) deleteFailureHost(nc *v1alpha1.NebulaCluster) error { } func (s *storagedFailover) deleteFailurePodAndPVC(nc *v1alpha1.NebulaCluster) error { + cl := label.New().Cluster(nc.GetClusterName()).Storaged() for podName, fh := range nc.Status.Storaged.FailureHosts { if fh.PodRebuilt { continue } - pod, pvcs, err := s.getPodAndPvcs(nc, podName) + pod, pvcs, err := getPodAndPvcs(s.clientSet, nc, cl, podName) if err != nil && !apierrors.IsNotFound(err) { return err } @@ -272,8 +280,9 @@ func (s *storagedFailover) deleteFailurePodAndPVC(nc *v1alpha1.NebulaCluster) er } func (s *storagedFailover) checkPendingPod(nc *v1alpha1.NebulaCluster) error { + cl := label.New().Cluster(nc.GetClusterName()).Storaged() for podName, fh := range nc.Status.Storaged.FailureHosts { - pod, pvcs, err := s.getPodAndPvcs(nc, podName) + pod, pvcs, err := getPodAndPvcs(s.clientSet, nc, cl, podName) if err != nil { return err } @@ -302,27 +311,23 @@ func (s *storagedFailover) checkPendingPod(nc *v1alpha1.NebulaCluster) error { return nil } -func (s *storagedFailover) getPodAndPvcs(nc *v1alpha1.NebulaCluster, podName string) (*corev1.Pod, []corev1.PersistentVolumeClaim, error) { - pod, err := s.clientSet.Pod().GetPod(nc.Namespace, podName) - if err != nil { - return nil, nil, err - } - pvcs, err := getPodPvcs(s.clientSet, nc, podName) - if err != nil { - return pod, nil, err - } - return pod, pvcs, nil -} - func (s *storagedFailover) balanceData(nc *v1alpha1.NebulaCluster) error { - for podName := range nc.Status.Storaged.FailureHosts { + podNames := make([]string, 0) + for podName, fh := range nc.Status.Storaged.FailureHosts { + if pointer.BoolDeref(fh.DataBalanced, true) { + continue + } pod, err := s.clientSet.Pod().GetPod(nc.Namespace, podName) if err != nil { return err } - if !podutils.IsPodReady(pod) { - return utilerrors.ReconcileErrorf("rebuilt storaged pod [%s/%s] is not ready", nc.Namespace, podName) + if !isPodHealthy(pod) { + return utilerrors.ReconcileErrorf("rebuilt storaged pod [%s/%s] is not healthy", nc.Namespace, podName) } + podNames = append(podNames, podName) + } + if len(podNames) == 0 { + return nil } options, err := nebula.ClientOptions(nc, nebula.SetIsMeta(true)) @@ -346,7 +351,11 @@ func (s *storagedFailover) balanceData(nc *v1alpha1.NebulaCluster) error { if err != nil { return err } - if len(spaces) > 0 && nc.Status.Storaged.BalancedSpaces == nil { + if len(spaces) == 0 { + return utilerrors.ReconcileErrorf("storaged cluster [%s/%s] data balanced for recovery", nc.Namespace, nc.Name) + } + + if nc.Status.Storaged.BalancedSpaces == nil { nc.Status.Storaged.BalancedSpaces = make([]int32, 0, len(spaces)) } for _, space := range spaces { @@ -358,49 +367,12 @@ func (s *storagedFailover) balanceData(nc *v1alpha1.NebulaCluster) error { } } - nc.Status.Storaged.BalancedSpaces = nil - nc.Status.Storaged.LastBalanceJob = nil - nc.Status.Storaged.BalancedAfterFailover = pointer.Bool(true) - return s.clientSet.NebulaCluster().UpdateNebulaCluster(nc) -} - -func getPodPvcs(clientSet kube.ClientSet, nc *v1alpha1.NebulaCluster, podName string) ([]corev1.PersistentVolumeClaim, error) { - cl := label.New().Cluster(nc.GetClusterName()).Storaged() - cl[annotation.AnnPodNameKey] = podName - pvcSelector, err := cl.Selector() - if err != nil { - return nil, err - } - pvcs, err := clientSet.PVC().ListPVCs(nc.Namespace, pvcSelector) - if err != nil && !apierrors.IsNotFound(err) { - return nil, err - } - return pvcs, nil -} - -func isNodeDown(node *corev1.Node) bool { - for i := range nodeTaints { - if taintExists(node.Spec.Taints, nodeTaints[i]) { - klog.Infof("node %s found taint %s", node.Name, nodeTaints[i].Key) - return true - } - } - if condition.IsNodeReadyFalseOrUnknown(&node.Status) { - klog.Infof("node %s is not ready", node.Name) - conditions := condition.GetNodeTrueConditions(&node.Status) - for i := range conditions { - klog.Infof("node %s condition type %s is true", conditions[i].Type) - } - return true + for podName, fh := range nc.Status.Storaged.FailureHosts { + fh.DataBalanced = pointer.Bool(true) + nc.Status.Storaged.FailureHosts[podName] = fh } - return false -} -func taintExists(taints []corev1.Taint, taintToFind *corev1.Taint) bool { - for _, taint := range taints { - if taint.MatchTaint(taintToFind) { - return true - } - } - return false + nc.Status.Storaged.BalancedSpaces = nil + nc.Status.Storaged.LastBalanceJob = nil + return utilerrors.ReconcileErrorf("storaged cluster [%s/%s] data balanced for recovery", nc.Namespace, nc.Name) } diff --git a/pkg/controller/component/storaged_scaler.go b/pkg/controller/component/storaged_scaler.go index 30301259..89cf08c9 100644 --- a/pkg/controller/component/storaged_scaler.go +++ b/pkg/controller/component/storaged_scaler.go @@ -112,7 +112,7 @@ func (ss *storageScaler) ScaleOut(nc *v1alpha1.NebulaCluster) error { nc.Status.Storaged.BalancedSpaces = nil nc.Status.Storaged.LastBalanceJob = nil nc.Status.Storaged.Phase = v1alpha1.RunningPhase - return nil + return ss.clientSet.NebulaCluster().UpdateNebulaClusterStatus(nc) } func (ss *storageScaler) ScaleIn(nc *v1alpha1.NebulaCluster, oldReplicas, newReplicas int32) error { @@ -233,5 +233,5 @@ func (ss *storageScaler) ScaleIn(nc *v1alpha1.NebulaCluster, oldReplicas, newRep nc.Status.Storaged.RemovedSpaces = nil nc.Status.Storaged.LastBalanceJob = nil nc.Status.Storaged.Phase = v1alpha1.RunningPhase - return nil + return ss.clientSet.NebulaCluster().UpdateNebulaClusterStatus(nc) } diff --git a/pkg/controller/nebulabackup/nebula_backup_manager.go b/pkg/controller/nebulabackup/nebula_backup_manager.go index fc0e3fd5..9cc678ec 100644 --- a/pkg/controller/nebulabackup/nebula_backup_manager.go +++ b/pkg/controller/nebulabackup/nebula_backup_manager.go @@ -34,8 +34,8 @@ import ( "github.com/vesoft-inc/nebula-operator/apis/apps/v1alpha1" "github.com/vesoft-inc/nebula-operator/apis/pkg/label" + "github.com/vesoft-inc/nebula-operator/pkg/credentials" "github.com/vesoft-inc/nebula-operator/pkg/kube" - "github.com/vesoft-inc/nebula-operator/pkg/remote" utilerrors "github.com/vesoft-inc/nebula-operator/pkg/util/errors" "github.com/vesoft-inc/nebula-operator/pkg/util/maputil" ) @@ -88,7 +88,7 @@ func (bm *backupManager) Clean(backup *v1alpha1.NebulaBackup) error { } if backup.Status.BackupName == "" { - klog.Infof("backup [%s/%s] remote backup is empty", backup.Namespace, backup.Name) + klog.Infof("backup [%s/%s] credentials backup is empty", backup.Namespace, backup.Name) return bm.clientSet.NebulaBackup().UpdateNebulaBackupStatus(backup, &v1alpha1.BackupCondition{ Type: v1alpha1.BackupClean, Status: corev1.ConditionTrue, @@ -243,7 +243,7 @@ func (bm *backupManager) ensureBackupJobFinished(backup *v1alpha1.NebulaBackup) } if backup.Status.BackupName == "" { - klog.Infof("backup [%s/%s] job %s is running, cleaner need sync remote backup", backup.Namespace, backup.Name, backupJobName) + klog.Infof("backup [%s/%s] job %s is running, cleaner need sync credentials backup", backup.Namespace, backup.Name, backupJobName) return false, nil } @@ -386,21 +386,21 @@ func (bm *backupManager) getSslFlags(cluster *v1alpha1.NebulaCluster) string { func (bm *backupManager) getStorageFlags(namespace string, provider v1alpha1.StorageProvider) (string, error) { var storageFlags string - storageType := remote.GetStorageType(provider) + storageType := credentials.GetStorageType(provider) switch storageType { case v1alpha1.ObjectStorageS3: - accessKey, secretKey, err := remote.GetS3Key(bm.clientSet, namespace, provider.S3.SecretName) + accessKey, secretKey, err := credentials.GetS3Key(bm.clientSet, namespace, provider.S3.SecretName) if err != nil { return "", fmt.Errorf("get S3 key failed: %v", err) } storageFlags = fmt.Sprintf(" --storage s3://%s --s3.region %s --s3.endpoint %s --s3.access_key %s --s3.secret_key %s", provider.S3.Bucket, provider.S3.Region, provider.S3.Endpoint, accessKey, secretKey) case v1alpha1.ObjectStorageGS: - credentials, err := remote.GetGsCredentials(bm.clientSet, namespace, provider.GS.SecretName) + gsCredentials, err := credentials.GetGsCredentials(bm.clientSet, namespace, provider.GS.SecretName) if err != nil { return "", fmt.Errorf("get GS credentials failed: %v", err) } - storageFlags = fmt.Sprintf(` --storage gs://%s --gs.credentials '%s'`, provider.GS.Bucket, credentials) + storageFlags = fmt.Sprintf(` --storage gs://%s --gs.credentials '%s'`, provider.GS.Bucket, gsCredentials) default: return "", fmt.Errorf("unknown storage type: %s", storageType) } diff --git a/pkg/controller/nebulacluster/nebula_cluster_controller.go b/pkg/controller/nebulacluster/nebula_cluster_controller.go index 08003c56..737a293f 100644 --- a/pkg/controller/nebulacluster/nebula_cluster_controller.go +++ b/pkg/controller/nebulacluster/nebula_cluster_controller.go @@ -67,6 +67,8 @@ func NewClusterReconciler(mgr ctrl.Manager, enableKruise bool) (*ClusterReconcil graphdUpdater := component.NewGraphdUpdater(clientSet.Pod()) metadUpdater := component.NewMetadUpdater(clientSet.Pod()) storagedUpdater := component.NewStoragedUpdater(clientSet) + graphdFailover := component.NewGraphdFailover(mgr.GetClient(), clientSet) + metadFailover := component.NewMetadFailover(mgr.GetClient(), clientSet) storagedFailover := component.NewStoragedFailover(mgr.GetClient(), clientSet) dm, err := discutil.New(mgr.GetConfig()) @@ -103,11 +105,13 @@ func NewClusterReconciler(mgr ctrl.Manager, enableKruise bool) (*ClusterReconcil clientSet, dm, graphdUpdater, + graphdFailover, recorder), component.NewMetadCluster( clientSet, dm, metadUpdater, + metadFailover, recorder), component.NewStoragedCluster( clientSet, diff --git a/pkg/controller/nebularestore/nebula_restore_manager.go b/pkg/controller/nebularestore/nebula_restore_manager.go index 49c319aa..823a3e69 100644 --- a/pkg/controller/nebularestore/nebula_restore_manager.go +++ b/pkg/controller/nebularestore/nebula_restore_manager.go @@ -37,9 +37,9 @@ import ( "github.com/vesoft-inc/nebula-go/v3/nebula/meta" "github.com/vesoft-inc/nebula-operator/apis/apps/v1alpha1" "github.com/vesoft-inc/nebula-operator/apis/pkg/annotation" + "github.com/vesoft-inc/nebula-operator/pkg/credentials" "github.com/vesoft-inc/nebula-operator/pkg/kube" "github.com/vesoft-inc/nebula-operator/pkg/nebula" - "github.com/vesoft-inc/nebula-operator/pkg/remote" "github.com/vesoft-inc/nebula-operator/pkg/util/async" "github.com/vesoft-inc/nebula-operator/pkg/util/condition" utilerrors "github.com/vesoft-inc/nebula-operator/pkg/util/errors" @@ -363,7 +363,7 @@ func (rm *restoreManager) genNebulaCluster(restoredName string, nr *v1alpha1.Neb func initRestoreAgent(clientSet kube.ClientSet, restore *v1alpha1.NebulaRestore) (*RestoreAgent, error) { backend := &pb.Backend{} - storageType := remote.GetStorageType(restore.Spec.Config.StorageProvider) + storageType := credentials.GetStorageType(restore.Spec.Config.StorageProvider) switch storageType { case v1alpha1.ObjectStorageS3: if err := backend.SetUri(fmt.Sprintf("s3://%s", restore.Spec.Config.S3.Bucket)); err != nil { @@ -371,7 +371,7 @@ func initRestoreAgent(clientSet kube.ClientSet, restore *v1alpha1.NebulaRestore) } backend.GetS3().Region = restore.Spec.Config.S3.Region backend.GetS3().Endpoint = restore.Spec.Config.S3.Endpoint - accessKey, secretKey, err := remote.GetS3Key(clientSet, restore.Namespace, restore.Spec.Config.S3.SecretName) + accessKey, secretKey, err := credentials.GetS3Key(clientSet, restore.Namespace, restore.Spec.Config.S3.SecretName) if err != nil { return nil, fmt.Errorf("get S3 key failed: %v", err) } @@ -381,11 +381,11 @@ func initRestoreAgent(clientSet kube.ClientSet, restore *v1alpha1.NebulaRestore) if err := backend.SetUri(fmt.Sprintf("gs://%s", restore.Spec.Config.GS.Bucket)); err != nil { return nil, err } - credentials, err := remote.GetGsCredentials(clientSet, restore.Namespace, restore.Spec.Config.GS.SecretName) + gsCredentials, err := credentials.GetGsCredentials(clientSet, restore.Namespace, restore.Spec.Config.GS.SecretName) if err != nil { return nil, fmt.Errorf("get GS credentials failed: %v", err) } - backend.GetGs().Credentials = credentials + backend.GetGs().Credentials = gsCredentials default: return nil, fmt.Errorf("unknown storage type: %s", storageType) } diff --git a/pkg/remote/remote.go b/pkg/credentials/credentials.go similarity index 99% rename from pkg/remote/remote.go rename to pkg/credentials/credentials.go index 9bc1b4b3..e94f3475 100644 --- a/pkg/remote/remote.go +++ b/pkg/credentials/credentials.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package remote +package credentials import ( "os"