diff --git a/cmd/kubenest/operator/app/options/options.go b/cmd/kubenest/operator/app/options/options.go index b80b75aa3..f29396f91 100644 --- a/cmd/kubenest/operator/app/options/options.go +++ b/cmd/kubenest/operator/app/options/options.go @@ -29,6 +29,8 @@ type KubeNestOptions struct { AdmissionPlugins bool ApiServerReplicas int ClusterCIDR string + ETCDStorageClass string + ETCDUnitSize string } func NewOptions() *Options { @@ -61,4 +63,6 @@ func (o *Options) AddFlags(flags *pflag.FlagSet) { flags.BoolVar(&o.KubeNestOptions.AdmissionPlugins, "kube-nest-admission-plugins", false, "kube-apiserver network disable-admission-plugins, false for - --disable-admission-plugins=License, true for remove the --disable-admission-plugins=License flag .") flags.IntVar(&o.KubeNestOptions.ApiServerReplicas, "kube-nest-apiserver-replicas", 1, "virtual-cluster kube-apiserver replicas. default is 2.") flags.StringVar(&o.KubeNestOptions.ClusterCIDR, "cluster-cidr", "10.244.0.0/16", "Used to set the cluster-cidr of kube-controller-manager and kube-proxy (configmap)") + flags.StringVar(&o.KubeNestOptions.ETCDStorageClass, "etcd-storage-class", "openebs-hostpath", "Used to set the etcd storage class.") + flags.StringVar(&o.KubeNestOptions.ETCDUnitSize, "etcd-unit-size", "1Gi", "Used to set the etcd unit size, each node is allocated storage of etcd-unit-size.") } diff --git a/deploy/crds/kosmos.io_virtualclusters.yaml b/deploy/crds/kosmos.io_virtualclusters.yaml index 0f498c33d..fc750ce90 100644 --- a/deploy/crds/kosmos.io_virtualclusters.yaml +++ b/deploy/crds/kosmos.io_virtualclusters.yaml @@ -208,4 +208,5 @@ spec: type: object served: true storage: true - subresources: {} + subresources: + status: {} diff --git a/deploy/virtual-cluster-host-port-cm.yaml b/deploy/virtual-cluster-host-port-cm.yaml deleted file mode 100644 index 8c2da3cce..000000000 --- a/deploy/virtual-cluster-host-port-cm.yaml +++ /dev/null @@ -1,22 +0,0 @@ ---- -apiVersion: v1 -kind: ConfigMap -metadata: - name: kosmos-hostports - namespace: kosmos-system -data: - config.yaml: | - # ports allocate for virtual cluster api server,from 33001, increment by 1 for each virtual cluster.Be careful not to use ports that are already in use - portsPool: - - 33001 - - 33002 - - 33003 - - 33004 - - 33005 - - 33006 - - 33007 - - 33008 - - 33009 - - 33010 - # when port is allocate from pool,it will be used for virtual cluster api server,and the port will be released after virtual cluster is deleted - clusterPorts: \ No newline at end of file diff --git a/deploy/virtual-cluster-operator.yml b/deploy/virtual-cluster-operator.yml index 404e0f146..933bd4ed1 100644 --- a/deploy/virtual-cluster-operator.yml +++ b/deploy/virtual-cluster-operator.yml @@ -271,26 +271,3 @@ data: - 33008 - 33009 - 33010 ---- -apiVersion: v1 -data: - egress_selector_configuration.yaml: | - apiVersion: apiserver.k8s.io/v1beta1 - kind: EgressSelectorConfiguration - egressSelections: - - name: cluster - connection: - proxyProtocol: GRPC - transport: - uds: - udsName: /etc/kubernetes/konnectivity-server/konnectivity-server.socket - - name: master - connection: - proxyProtocol: Direct - - name: etcd - connection: - proxyProtocol: Direct -kind: ConfigMap -metadata: - name: kas-proxy-files - namespace: kas-proxy \ No newline at end of file diff --git a/hack/cluster.sh b/hack/cluster.sh index f39c3f48a..9fdab3cb4 100755 --- a/hack/cluster.sh +++ b/hack/cluster.sh @@ -33,9 +33,10 @@ function prepare_test_image() { docker pull percona:5.7 docker pull prom/mysqld-exporter:v0.13.0 else - docker pull docker.m.daocloud.io/bitpoke/mysql-operator-orchestrator:v0.6.3 - docker pull docker.m.daocloud.io/bitpoke/mysql-operator:v0.6.3 - docker pull docker.m.daocloud.io/bitpoke/mysql-operator-sidecar-5.7:v0.6.3 +# todo add bitpoke to m.daocloud.io's Whitelist + docker pull bitpoke/mysql-operator-orchestrator:v0.6.3 + docker pull bitpoke/mysql-operator:v0.6.3 + docker pull bitpoke/mysql-operator-sidecar-5.7:v0.6.3 docker pull docker.m.daocloud.io/nginx docker pull docker.m.daocloud.io/percona:5.7 docker pull docker.m.daocloud.io/prom/mysqld-exporter:v0.13.0 diff --git a/pkg/apis/kosmos/v1alpha1/virtualcluster_types.go b/pkg/apis/kosmos/v1alpha1/virtualcluster_types.go index 719f8cd33..a91735ecc 100644 --- a/pkg/apis/kosmos/v1alpha1/virtualcluster_types.go +++ b/pkg/apis/kosmos/v1alpha1/virtualcluster_types.go @@ -27,6 +27,7 @@ const ( // +genclient // +kubebuilder:resource:scope=Namespaced,shortName=vc +// +kubebuilder:subresource:status // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object // +kubebuilder:printcolumn:name="STATUS",type=string,JSONPath=`.status.phase` // +kubebuilder:printcolumn:name="UPDATE-TIME",type=string,JSONPath=`.status.updateTime` diff --git a/pkg/kubenest/controller/virtualcluster.node.controller/selector/labelselector.go b/pkg/kubenest/controller/virtualcluster.node.controller/selector/labelselector.go deleted file mode 100644 index acadb3845..000000000 --- a/pkg/kubenest/controller/virtualcluster.node.controller/selector/labelselector.go +++ /dev/null @@ -1,75 +0,0 @@ -package selector - -import ( - "fmt" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" - - "github.com/kosmos.io/kosmos/pkg/kubenest/util" -) - -type ParserMatchFunc func(matchExpression metav1.LabelSelectorRequirement, metaLabels labels.Set) bool - -func GetFunc(operator metav1.LabelSelectorOperator) (ParserMatchFunc, error) { - switch operator { - case metav1.LabelSelectorOpDoesNotExist: - return ParseWithLabelSelectorOpDoesNotExist, nil - case metav1.LabelSelectorOpExists: - return ParseWithLabelSelectorOpExists, nil - case metav1.LabelSelectorOpNotIn: - return ParseWithLabelSelectorOpNotIn, nil - case metav1.LabelSelectorOpIn: - return ParseWithLabelSelectorOpIn, nil - default: - return nil, fmt.Errorf("unsupported operator %s", operator) - } -} - -func MatchesWithLabelSelector(metaLabels labels.Set, labelSelector *metav1.LabelSelector) (bool, error) { - if !util.MapContains(metaLabels, labelSelector.MatchLabels) { - return false, nil - } - - for _, expr := range labelSelector.MatchExpressions { - parseMatchFunc, err := GetFunc(expr.Operator) - if err != nil { - return false, err - } - if !parseMatchFunc(expr, metaLabels) { - return false, nil - } - } - return true, nil -} - -func ParseWithLabelSelectorOpDoesNotExist(matchExpression metav1.LabelSelectorRequirement, metaLabels labels.Set) bool { - return !metaLabels.Has(matchExpression.Key) -} - -func ParseWithLabelSelectorOpExists(matchExpression metav1.LabelSelectorRequirement, metaLabels labels.Set) bool { - return metaLabels.Has(matchExpression.Key) -} - -func ParseWithLabelSelectorOpNotIn(matchExpression metav1.LabelSelectorRequirement, metaLabels labels.Set) bool { - if !metaLabels.Has(matchExpression.Key) || !contains(matchExpression.Values, metaLabels[matchExpression.Key]) { - return true - } - return false -} - -func ParseWithLabelSelectorOpIn(matchExpression metav1.LabelSelectorRequirement, metaLabels labels.Set) bool { - if metaLabels.Has(matchExpression.Key) && contains(matchExpression.Values, metaLabels[matchExpression.Key]) { - return true - } - return false -} - -func contains(arr []string, s string) bool { - for _, str := range arr { - if str == s { - return true - } - } - return false -} diff --git a/pkg/kubenest/controller/virtualcluster_init_controller.go b/pkg/kubenest/controller/virtualcluster_init_controller.go index fcf7727bd..ff0132777 100644 --- a/pkg/kubenest/controller/virtualcluster_init_controller.go +++ b/pkg/kubenest/controller/virtualcluster_init_controller.go @@ -12,6 +12,7 @@ import ( "gopkg.in/yaml.v3" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" @@ -34,7 +35,6 @@ import ( "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned" "github.com/kosmos.io/kosmos/pkg/kubenest/constants" - "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller/selector" "github.com/kosmos.io/kosmos/pkg/kubenest/util" ) @@ -116,6 +116,7 @@ func (c *VirtualClusterInitController) Reconcile(ctx context.Context, request re updatedCluster.Status.Phase = v1alpha1.Preparing err := c.Update(updatedCluster) if err != nil { + klog.Errorf("Error update virtualcluster %s status, err: %v", updatedCluster.Name, err) return reconcile.Result{RequeueAfter: RequeueTime}, errors.Wrapf(err, "Error update virtualcluster %s status", updatedCluster.Name) } err = c.createVirtualCluster(updatedCluster, c.KubeNestOptions) @@ -233,6 +234,7 @@ func (c *VirtualClusterInitController) ensureFinalizer(virtualCluster *v1alpha1. err := c.Client.Update(context.TODO(), updated) if err != nil { klog.Errorf("update virtualcluster %s error. %v", virtualCluster.Name, err) + klog.Errorf("Failed to add finalizer to VirtualCluster %s/%s: %v", virtualCluster.Namespace, virtualCluster.Name, err) return reconcile.Result{Requeue: true}, err } @@ -257,6 +259,7 @@ func (c *VirtualClusterInitController) removeFinalizer(virtualCluster *v1alpha1. controllerutil.RemoveFinalizer(updated, VirtualClusterControllerFinalizer) err := c.Client.Update(context.TODO(), updated) if err != nil { + klog.Errorf("Failed to remove finalizer to VirtualCluster %s/%s: %v", virtualCluster.Namespace, virtualCluster.Name, err) return reconcile.Result{Requeue: true}, err } @@ -380,20 +383,33 @@ func (c *VirtualClusterInitController) checkPromotePoliciesChanged(virtualCluste return false, nil } +func IsLabelsMatchSelector(selector *metav1.LabelSelector, targetLabels labels.Set) (match bool, err error) { + if selector == nil { + return true, nil + } + sel, err := metav1.LabelSelectorAsSelector(selector) + if err != nil { + return false, err + } + + match = sel.Matches(targetLabels) + return match, nil +} + // nodesChangeCalculate calculate nodes changed when update virtualcluster. func (c *VirtualClusterInitController) assignNodesByPolicy(virtualCluster *v1alpha1.VirtualCluster, policy v1alpha1.PromotePolicy, policyMatchedGlobalNodes []v1alpha1.GlobalNode) ([]v1alpha1.NodeInfo, error) { nodesAssigned, err := retrieveAssignedNodesByPolicy(virtualCluster, policyMatchedGlobalNodes) if err != nil { - return nil, errors.Wrapf(err, "Parse assigned nodes by policy %s error", policy.LabelSelector.String()) + return nil, fmt.Errorf("parse assigned nodes by policy %v error", policy.LabelSelector) } requestNodesChanged := policy.NodeCount - int32(len(nodesAssigned)) if requestNodesChanged == 0 { - klog.V(2).Infof("Nothing to do for policy %s", policy.LabelSelector.String()) + klog.V(2).Infof("Nothing to do for policy %v", policy.LabelSelector) return nodesAssigned, nil } else if requestNodesChanged > 0 { // nodes needs to increase - klog.V(2).Infof("Try allocate %d nodes for policy %s", requestNodesChanged, policy.LabelSelector.String()) + klog.V(2).Infof("Try allocate %d nodes for policy %v", requestNodesChanged, policy.LabelSelector) var newAssignNodesIndex []int for i, globalNode := range policyMatchedGlobalNodes { if globalNode.Spec.State == v1alpha1.NodeFreeState { @@ -404,20 +420,20 @@ func (c *VirtualClusterInitController) assignNodesByPolicy(virtualCluster *v1alp } } if int32(len(newAssignNodesIndex)) < requestNodesChanged { - return nodesAssigned, errors.Errorf("There is not enough work nodes for promotepolicy %s. Desired %d, matched %d", policy.LabelSelector.String(), requestNodesChanged, len(newAssignNodesIndex)) + return nodesAssigned, errors.Errorf("There is not enough work nodes for promotepolicy %v. Desired %d, matched %d", policy.LabelSelector, requestNodesChanged, len(newAssignNodesIndex)) } for _, index := range newAssignNodesIndex { - klog.V(2).Infof("Assign node %s for virtualcluster %s policy %s", policyMatchedGlobalNodes[index].Name, virtualCluster.GetName(), policy.LabelSelector.String()) + klog.V(2).Infof("Assign node %s for virtualcluster %s policy %v", policyMatchedGlobalNodes[index].Name, virtualCluster.GetName(), policy.LabelSelector) nodesAssigned = append(nodesAssigned, v1alpha1.NodeInfo{ NodeName: policyMatchedGlobalNodes[index].Name, }) } } else { // nodes needs to decrease - klog.V(2).Infof("Try decrease nodes %d for policy %s", -requestNodesChanged, policy.LabelSelector.String()) + klog.V(2).Infof("Try decrease nodes %d for policy %v", -requestNodesChanged, policy.LabelSelector) decrease := int(-requestNodesChanged) if len(nodesAssigned) < decrease { - return nil, errors.Errorf("Illegal work nodes decrease operation for promotepolicy %s. Desired %d, matched %d", policy.LabelSelector.String(), decrease, len(nodesAssigned)) + return nil, errors.Errorf("Illegal work nodes decrease operation for promotepolicy %v. Desired %d, matched %d", policy.LabelSelector, decrease, len(nodesAssigned)) } nodesAssigned = nodesAssigned[:len(nodesAssigned)-decrease] // note: node pool will not be modified here. NodeController will modify it when node delete success @@ -437,10 +453,24 @@ func retrieveAssignedNodesByPolicy(virtualCluster *v1alpha1.VirtualCluster, poli return nodesAssignedMatchedPolicy, nil } +func matchesWithLabelSelector(metaLabels labels.Set, labelSelector *metav1.LabelSelector) (bool, error) { + if labelSelector == nil { + return true, nil + } + + sel, err := metav1.LabelSelectorAsSelector(labelSelector) + if err != nil { + return false, err + } + + match := sel.Matches(metaLabels) + return match, nil +} + func retrieveGlobalNodesWithLabelSelector(nodes []v1alpha1.GlobalNode, labelSelector *metav1.LabelSelector) ([]v1alpha1.GlobalNode, error) { matchedNodes := make([]v1alpha1.GlobalNode, 0) for _, node := range nodes { - matched, err := selector.MatchesWithLabelSelector(node.Spec.Labels, labelSelector) + matched, err := matchesWithLabelSelector(node.Spec.Labels, labelSelector) if err != nil { return nil, err } @@ -457,7 +487,8 @@ func (c *VirtualClusterInitController) setGlobalNodeUsageStatus(virtualCluster * if err != nil { if apierrors.IsNotFound(err) { klog.Errorf("globalnode %s not found. This should not happen normally", node.Name) - return nil // 如果节点不存在,则不执行更新并返回nil + // 如果节点不存在,则不执行更新并返回nil + return nil } return fmt.Errorf("failed to get globalNode %s: %v", node.Name, err) } diff --git a/pkg/kubenest/controlplane/etcd.go b/pkg/kubenest/controlplane/etcd.go index 2c9b0e9bd..77f023ff8 100644 --- a/pkg/kubenest/controlplane/etcd.go +++ b/pkg/kubenest/controlplane/etcd.go @@ -6,17 +6,21 @@ import ( "github.com/pkg/errors" appsv1 "k8s.io/api/apps/v1" + "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/util/yaml" clientset "k8s.io/client-go/kubernetes" "k8s.io/component-base/cli/flag" + "k8s.io/klog" + ko "github.com/kosmos.io/kosmos/cmd/kubenest/operator/app/options" + "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" "github.com/kosmos.io/kosmos/pkg/kubenest/constants" "github.com/kosmos.io/kosmos/pkg/kubenest/manifest/controlplane/etcd" "github.com/kosmos.io/kosmos/pkg/kubenest/util" ) -func EnsureVirtualClusterEtcd(client clientset.Interface, name, namespace string) error { - if err := installEtcd(client, name, namespace); err != nil { +func EnsureVirtualClusterEtcd(client clientset.Interface, name, namespace string, ko *ko.KubeNestOptions, vc *v1alpha1.VirtualCluster) error { + if err := installEtcd(client, name, namespace, ko, vc); err != nil { return err } return nil @@ -30,8 +34,17 @@ func DeleteVirtualClusterEtcd(client clientset.Interface, name, namespace string return nil } -func installEtcd(client clientset.Interface, name, namespace string) error { +func installEtcd(client clientset.Interface, name, namespace string, ko *ko.KubeNestOptions, vc *v1alpha1.VirtualCluster) error { imageRepository, imageVersion := util.GetImageMessage() + + nodeCount := getNodeCountFromPromotePolicy(vc) + resourceQuantity, err := resource.ParseQuantity(ko.ETCDUnitSize) + if err != nil { + klog.Errorf("Failed to parse quantity %s: %v", ko.ETCDUnitSize, err) + return err + } + resourceQuantity.Set(resourceQuantity.Value() * int64(nodeCount)) + initialClusters := make([]string, constants.EtcdReplicas) for index := range initialClusters { memberName := fmt.Sprintf("%s-%d", fmt.Sprintf("%s-%s", name, "etcd"), index) @@ -51,6 +64,7 @@ func installEtcd(client clientset.Interface, name, namespace string) error { CertsSecretName, EtcdPeerServiceName string InitialCluster, EtcdDataVolumeName, EtcdCipherSuites string Replicas, EtcdListenClientPort, EtcdListenPeerPort int32 + ETCDStorageClass, ETCDStorageSize string }{ StatefulSetName: fmt.Sprintf("%s-%s", name, "etcd"), Namespace: namespace, @@ -65,6 +79,8 @@ func installEtcd(client clientset.Interface, name, namespace string) error { Replicas: constants.EtcdReplicas, EtcdListenClientPort: constants.EtcdListenClientPort, EtcdListenPeerPort: constants.EtcdListenPeerPort, + ETCDStorageClass: ko.ETCDStorageClass, + ETCDStorageSize: resourceQuantity.String(), }) if err != nil { return fmt.Errorf("error when parsing Etcd statefuelset template: %w", err) @@ -81,3 +97,11 @@ func installEtcd(client clientset.Interface, name, namespace string) error { return nil } + +func getNodeCountFromPromotePolicy(vc *v1alpha1.VirtualCluster) int32 { + var nodeCount int32 + for _, policy := range vc.Spec.PromotePolicies { + nodeCount = nodeCount + policy.NodeCount + } + return nodeCount +} diff --git a/pkg/kubenest/init.go b/pkg/kubenest/init.go index 844358cb5..2fd8b886a 100644 --- a/pkg/kubenest/init.go +++ b/pkg/kubenest/init.go @@ -39,6 +39,8 @@ type initData struct { hostPortMap map[string]int32 kubeNestOptions *ko.KubeNestOptions virtualCluster *v1alpha1.VirtualCluster + ETCDStorageClass string + ETCDUnitSize string } type InitOptions struct { @@ -190,6 +192,8 @@ func newRunData(opt *InitOptions) (*initData, error) { hostPortMap: opt.virtualCluster.Status.PortMap, kubeNestOptions: opt.KubeNestOptions, virtualCluster: opt.virtualCluster, + ETCDUnitSize: opt.KubeNestOptions.ETCDUnitSize, + ETCDStorageClass: opt.KubeNestOptions.ETCDStorageClass, }, nil } @@ -241,8 +245,8 @@ func (i initData) DataDir() string { return i.virtualClusterDataDir } -func (i initData) VirtualClusterVersion() string { - return i.virtualClusterVersion.String() +func (i initData) VirtualCluster() *v1alpha1.VirtualCluster { + return i.virtualCluster } func (i initData) ExternalIP() string { diff --git a/pkg/kubenest/manifest/controlplane/etcd/mainfests_deployment.go b/pkg/kubenest/manifest/controlplane/etcd/mainfests_deployment.go index 35757fcb1..b090bf6e0 100644 --- a/pkg/kubenest/manifest/controlplane/etcd/mainfests_deployment.go +++ b/pkg/kubenest/manifest/controlplane/etcd/mainfests_deployment.go @@ -111,7 +111,7 @@ spec: - ReadWriteOnce resources: requests: - storage: 1Gi - storageClassName: openebs-hostpath + storage: {{ .ETCDStorageSize }} + storageClassName: {{ .ETCDStorageClass }} ` ) diff --git a/pkg/kubenest/tasks/data.go b/pkg/kubenest/tasks/data.go index 02c9a37e2..edf63616f 100644 --- a/pkg/kubenest/tasks/data.go +++ b/pkg/kubenest/tasks/data.go @@ -5,6 +5,7 @@ import ( clientset "k8s.io/client-go/kubernetes" ko "github.com/kosmos.io/kosmos/cmd/kubenest/operator/app/options" + "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned" "github.com/kosmos.io/kosmos/pkg/kubenest/util/cert" ) @@ -18,7 +19,7 @@ type InitData interface { RemoteClient() clientset.Interface KosmosClient() versioned.Interface DataDir() string - VirtualClusterVersion() string + VirtualCluster() *v1alpha1.VirtualCluster ExternalIP() string HostPort() int32 HostPortMap() map[string]int32 diff --git a/pkg/kubenest/tasks/etcd.go b/pkg/kubenest/tasks/etcd.go index 07264f7d5..38593c04f 100644 --- a/pkg/kubenest/tasks/etcd.go +++ b/pkg/kubenest/tasks/etcd.go @@ -56,7 +56,7 @@ func runDeployEtcd(r workflow.RunData) error { return errors.New("deploy-etcd task invoked with an invalid data struct") } - err := controlplane.EnsureVirtualClusterEtcd(data.RemoteClient(), data.GetName(), data.GetNamespace()) + err := controlplane.EnsureVirtualClusterEtcd(data.RemoteClient(), data.GetName(), data.GetNamespace(), data.KubeNestOpt(), data.VirtualCluster()) if err != nil { return fmt.Errorf("failed to install etcd component, err: %w", err) }