From 6488475ee1b58ccc73cedb9ee69b83319f974c18 Mon Sep 17 00:00:00 2001 From: wangyizhi1 Date: Thu, 19 Oct 2023 10:04:38 +0800 Subject: [PATCH] Merge pull request #103 from qiuming520/main add kosmos-scheduler for kube-scheduler v1.26 --- Makefile | 2 + cmd/scheduler/main.go | 4 +- deploy/scheduler/deployment.yaml | 101 ++++++ deploy/scheduler/rbac.yaml | 170 +++++++++ hack/update-codegen.sh | 21 +- hack/util.sh | 1 + pkg/apis/config/doc.go | 20 ++ pkg/apis/config/register.go | 50 +++ pkg/apis/config/scheme/scheme.go | 45 +++ pkg/apis/config/types.go | 59 ++++ pkg/apis/config/v1/defaults.go | 27 ++ pkg/apis/config/v1/doc.go | 23 ++ pkg/apis/config/v1/register.go | 52 +++ pkg/apis/config/v1/types.go | 60 ++++ pkg/apis/config/v1/zz_generated.conversion.go | 93 +++++ pkg/apis/config/v1/zz_generated.deepcopy.go | 61 ++++ pkg/apis/config/v1/zz_generated.defaults.go | 17 + pkg/apis/config/zz_generated.deepcopy.go | 56 +++ pkg/clustertree/knode-manager/knode.go | 193 +++++------ pkg/clustertree/knode-manager/knodemanager.go | 27 +- pkg/clustertree/knode-manager/utils/util.go | 118 +++++++ .../lifted/helpers/tainttoleration.go | 17 +- .../knode_tainttoleration.go | 40 +-- .../knode_volume_binding.go | 322 ++++++++++++++++++ 24 files changed, 1424 insertions(+), 155 deletions(-) create mode 100644 deploy/scheduler/deployment.yaml create mode 100644 deploy/scheduler/rbac.yaml create mode 100644 pkg/apis/config/doc.go create mode 100644 pkg/apis/config/register.go create mode 100644 pkg/apis/config/scheme/scheme.go create mode 100644 pkg/apis/config/types.go create mode 100644 pkg/apis/config/v1/defaults.go create mode 100644 pkg/apis/config/v1/doc.go create mode 100644 pkg/apis/config/v1/register.go create mode 100644 pkg/apis/config/v1/types.go create mode 100644 pkg/apis/config/v1/zz_generated.conversion.go create mode 100644 pkg/apis/config/v1/zz_generated.deepcopy.go create mode 100644 pkg/apis/config/v1/zz_generated.defaults.go create mode 100644 
pkg/apis/config/zz_generated.deepcopy.go create mode 100644 pkg/scheduler/lifted/plugins/knodevolumebinding/knode_volume_binding.go diff --git a/Makefile b/Makefile index e2360b467..814f14ed8 100644 --- a/Makefile +++ b/Makefile @@ -16,6 +16,7 @@ TARGETS := clusterlink-controller-manager \ clusterlink-network-manager \ clusterlink-proxy \ clustertree-knode-manager \ + scheduler \ CTL_TARGETS := kosmosctl @@ -106,6 +107,7 @@ upload-images: images docker push ${REGISTRY}/clusterlink-floater:${VERSION} docker push ${REGISTRY}/clusterlink-elector:${VERSION} docker push ${REGISTRY}/clustertree-knode-manager:${VERSION} + docker push ${REGISTRY}/scheduler:${VERSION} .PHONY: release release: diff --git a/cmd/scheduler/main.go b/cmd/scheduler/main.go index 637d5368a..f1e8041c3 100644 --- a/cmd/scheduler/main.go +++ b/cmd/scheduler/main.go @@ -6,7 +6,9 @@ import ( "k8s.io/component-base/cli" "k8s.io/kubernetes/cmd/kube-scheduler/app" + _ "github.com/kosmos.io/kosmos/pkg/apis/config/scheme" "github.com/kosmos.io/kosmos/pkg/scheduler/lifted/plugins/knodetainttoleration" + "github.com/kosmos.io/kosmos/pkg/scheduler/lifted/plugins/knodevolumebinding" ) func main() { @@ -15,8 +17,8 @@ func main() { // used by various kinds of workloads. 
command := app.NewSchedulerCommand( app.WithPlugin(knodetainttoleration.Name, knodetainttoleration.New), + app.WithPlugin(knodevolumebinding.Name, knodevolumebinding.New), ) - code := cli.Run(command) os.Exit(code) } diff --git a/deploy/scheduler/deployment.yaml b/deploy/scheduler/deployment.yaml new file mode 100644 index 000000000..bd50e07d7 --- /dev/null +++ b/deploy/scheduler/deployment.yaml @@ -0,0 +1,101 @@ +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: kosmos-scheduler + namespace: kosmos-system + labels: + component: scheduler +spec: + replicas: 1 + selector: + matchLabels: + component: scheduler + template: + metadata: + labels: + component: scheduler + spec: + volumes: + - name: scheduler-config + configMap: + name: scheduler-config + defaultMode: 420 + containers: + - name: kosmos-scheduler + image: ghcr.io/kosmos-io/scheduler:0.0.2 + command: + - scheduler + - --leader-elect=true + - --leader-elect-resource-name=kosmos-scheduler + - --leader-elect-resource-namespace=kosmos-system + - --config=/etc/kubernetes/kube-scheduler/scheduler-config.yaml + resources: + requests: + cpu: 200m + volumeMounts: + - name: scheduler-config + readOnly: true + mountPath: /etc/kubernetes/kube-scheduler + livenessProbe: + httpGet: + path: /healthz + port: 10259 + scheme: HTTPS + initialDelaySeconds: 15 + periodSeconds: 10 + failureThreshold: 3 + readinessProbe: + httpGet: + path: /healthz + port: 10259 + scheme: HTTPS + restartPolicy: Always + dnsPolicy: ClusterFirst + serviceAccountName: kosmos-scheduler + serviceAccount: kosmos-scheduler + +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: scheduler-config + namespace: kosmos-system +data: + scheduler-config.yaml: | + apiVersion: kubescheduler.config.k8s.io/v1 + kind: KubeSchedulerConfiguration + leaderElection: + leaderElect: true + profiles: + - schedulerName: kosmos-scheduler + plugins: + preFilter: + disabled: + - name: "VolumeBinding" + enabled: + - name: "KnodeVolumeBinding" + filter: + 
disabled: + - name: "VolumeBinding" + - name: "TaintToleration" + enabled: + - name: "KNodeTaintToleration" + - name: "KnodeVolumeBinding" + score: + disabled: + - name: "VolumeBinding" + reserve: + disabled: + - name: "VolumeBinding" + enabled: + - name: "KnodeVolumeBinding" + preBind: + disabled: + - name: "VolumeBinding" + enabled: + - name: "KnodeVolumeBinding" + pluginConfig: + - name: KnodeVolumeBinding + args: + bindTimeoutSeconds: 5 \ No newline at end of file diff --git a/deploy/scheduler/rbac.yaml b/deploy/scheduler/rbac.yaml new file mode 100644 index 000000000..0f008445d --- /dev/null +++ b/deploy/scheduler/rbac.yaml @@ -0,0 +1,170 @@ +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: kosmos-scheduler + namespace: kosmos-system + +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: kosmos-scheduler +subjects: + - kind: ServiceAccount + name: kosmos-scheduler + namespace: kosmos-system +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: kosmos-scheduler + +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: kosmos-scheduler +rules: + - verbs: + - create + - patch + - update + apiGroups: + - '' + - events.k8s.io + resources: + - events + - verbs: + - create + apiGroups: + - coordination.k8s.io + resources: + - leases + - verbs: + - get + - update + apiGroups: + - coordination.k8s.io + resources: + - leases + resourceNames: + - kosmos-scheduler + - verbs: + - create + apiGroups: + - '' + resources: + - endpoints + - verbs: + - get + - update + apiGroups: + - '' + resources: + - endpoints + resourceNames: + - kube-scheduler + - verbs: + - get + - list + - watch + apiGroups: + - '' + resources: + - nodes + - verbs: + - delete + - get + - list + - watch + apiGroups: + - '' + resources: + - pods + - verbs: + - create + apiGroups: + - '' + resources: + - bindings + - pods/binding + - verbs: + - patch + - update + apiGroups: + - '' + resources: + - 
pods/status + - verbs: + - get + - list + - watch + apiGroups: + - '' + resources: + - replicationcontrollers + - services + - verbs: + - get + - list + - watch + apiGroups: + - apps + - extensions + resources: + - replicasets + - verbs: + - get + - list + - watch + apiGroups: + - apps + resources: + - statefulsets + - verbs: + - get + - list + - watch + apiGroups: + - policy + resources: + - poddisruptionbudgets + - verbs: + - get + - list + - watch + - update + apiGroups: + - '' + resources: + - persistentvolumeclaims + - persistentvolumes + - verbs: + - create + apiGroups: + - authentication.k8s.io + resources: + - tokenreviews + - verbs: + - create + apiGroups: + - authorization.k8s.io + resources: + - subjectaccessreviews + - verbs: + - get + - list + - watch + apiGroups: + - storage.k8s.io + resources: + - '*' + - verbs: + - get + - list + - watch + apiGroups: + - '' + resources: + - configmaps + - namespaces \ No newline at end of file diff --git a/hack/update-codegen.sh b/hack/update-codegen.sh index 1f37a989f..1bcb11246 100755 --- a/hack/update-codegen.sh +++ b/hack/update-codegen.sh @@ -40,27 +40,27 @@ create_gopath_tree "${REPO_ROOT}" "${link_path}" deepcopy-gen \ --go-header-file hack/boilerplate/boilerplate.go.txt \ - --input-dirs=github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1 \ + --input-dirs="github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1,github.com/kosmos.io/kosmos/pkg/apis/config,github.com/kosmos.io/kosmos/pkg/apis/config/v1" \ --output-base="${REPO_ROOT}" \ - --output-package=pkg/apis/kosmos/v1alpha1 \ + --output-package="pkg/apis/kosmos/v1alpha1,pkg/apis/config,pkg/apis/config/v1" \ --output-file-base=zz_generated.deepcopy echo "Generating with register-gen" GO111MODULE=on go install k8s.io/code-generator/cmd/register-gen register-gen \ --go-header-file hack/boilerplate/boilerplate.go.txt \ - --input-dirs=github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1 \ + --input-dirs="github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" \ 
--output-base="${REPO_ROOT}" \ - --output-package=pkg/apis/kosmos/v1alpha1 \ + --output-package="pkg/apis/kosmos/v1alpha1" \ --output-file-base=zz_generated.register echo "Generating with conversion-gen" GO111MODULE=on go install k8s.io/code-generator/cmd/conversion-gen conversion-gen \ --go-header-file hack/boilerplate/boilerplate.go.txt \ - --input-dirs=github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1 \ + --input-dirs="github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1,github.com/kosmos.io/kosmos/pkg/apis/config/v1" \ --output-base="${REPO_ROOT}" \ - --output-package=github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1 \ + --output-package="github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1,pkg/apis/config/v1" \ --output-file-base=zz_generated.conversion echo "Generating with client-gen" @@ -73,6 +73,15 @@ client-gen \ --output-package=github.com/kosmos.io/kosmos/pkg/generated/clientset \ --clientset-name=versioned +echo "Generating with defaults-gen" +GO111MODULE=on go install k8s.io/code-generator/cmd/defaulter-gen +defaulter-gen \ + --go-header-file hack/boilerplate/boilerplate.go.txt \ + --input-dirs="github.com/kosmos.io/kosmos/pkg/apis/config/v1" \ + --output-base="${REPO_ROOT}" \ + --output-package="pkg/apis/config/v1" \ + --output-file-base=zz_generated.defaults + echo "Generating with lister-gen" GO111MODULE=on go install k8s.io/code-generator/cmd/lister-gen lister-gen \ diff --git a/hack/util.sh b/hack/util.sh index dcb3eee32..71877505e 100755 --- a/hack/util.sh +++ b/hack/util.sh @@ -13,6 +13,7 @@ CLUSTERLINK_GO_PACKAGE="github.com/kosmos.io/kosmos" MIN_Go_VERSION=go1.19.0 CLUSTERLINK_TARGET_SOURCE=( + scheduler=cmd/scheduler clusterlink-proxy=cmd/clusterlink/proxy clusterlink-operator=cmd/clusterlink/operator clusterlink-elector=cmd/clusterlink/elector diff --git a/pkg/apis/config/doc.go b/pkg/apis/config/doc.go new file mode 100644 index 000000000..c26a611f6 --- /dev/null +++ b/pkg/apis/config/doc.go @@ -0,0 +1,20 @@ +/* +Copyright 2020 The 
Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// +k8s:deepcopy-gen=package +// +groupName=kubescheduler.config.k8s.io + +package config // import "github.com/kosmos.io/kosmos/pkg/apis/config" diff --git a/pkg/apis/config/register.go b/pkg/apis/config/register.go new file mode 100644 index 000000000..033158338 --- /dev/null +++ b/pkg/apis/config/register.go @@ -0,0 +1,50 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package config + +import ( + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + schedscheme "k8s.io/kubernetes/pkg/scheduler/apis/config" +) + +// GroupName is the group name used in this package +const GroupName = "kubescheduler.config.k8s.io" + +// SchemeGroupVersion is group version used to register these objects +var SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: runtime.APIVersionInternal} + +var ( + localSchemeBuilder = &schedscheme.SchemeBuilder + // AddToScheme is a global function that registers this API group & version to a scheme + AddToScheme = localSchemeBuilder.AddToScheme +) + +// addKnownTypes registers known types to the given scheme +func addKnownTypes(scheme *runtime.Scheme) error { + scheme.AddKnownTypes(SchemeGroupVersion, + &KnodeVolumeBindingArgs{}, + ) + return nil +} + +func init() { + // We only register manually written functions here. The registration of the + // generated functions takes place in the generated files. The separation + // makes the code compile even when the generated files are missing. + localSchemeBuilder.Register(addKnownTypes) +} diff --git a/pkg/apis/config/scheme/scheme.go b/pkg/apis/config/scheme/scheme.go new file mode 100644 index 000000000..5b8566387 --- /dev/null +++ b/pkg/apis/config/scheme/scheme.go @@ -0,0 +1,45 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package scheme + +import ( + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/serializer" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + kubeschedulerscheme "k8s.io/kubernetes/pkg/scheduler/apis/config/scheme" + + "github.com/kosmos.io/kosmos/pkg/apis/config" + configv1 "github.com/kosmos.io/kosmos/pkg/apis/config/v1" +) + +var ( + // Re-use the in-tree Scheme. + Scheme = kubeschedulerscheme.Scheme + + // Codecs provides access to encoding and decoding for the scheme. + Codecs = serializer.NewCodecFactory(Scheme, serializer.EnableStrict) +) + +func init() { + AddToScheme(Scheme) +} + +// AddToScheme builds the kubescheduler scheme using all known versions of the kubescheduler api. +func AddToScheme(scheme *runtime.Scheme) { + utilruntime.Must(config.AddToScheme(scheme)) + utilruntime.Must(configv1.AddToScheme(scheme)) +} diff --git a/pkg/apis/config/types.go b/pkg/apis/config/types.go new file mode 100644 index 000000000..d79c27f2b --- /dev/null +++ b/pkg/apis/config/types.go @@ -0,0 +1,59 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package config + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// MaxCustomPriorityScore is the max score UtilizationShapePoint expects. +const MaxCustomPriorityScore int64 = 10 + +// UtilizationShapePoint represents a single point of a priority function shape. +type UtilizationShapePoint struct { + // Utilization (x axis). Valid values are 0 to 100. 
Fully utilized node maps to 100. + Utilization int32 + // Score assigned to a given utilization (y axis). Valid values are 0 to 10. + Score int32 +} + +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +// KnodeVolumeBindingArgs holds arguments used to configure the KnodeVolumeBindingArgs plugin. +type KnodeVolumeBindingArgs struct { + metav1.TypeMeta `json:",inline"` + + // BindTimeoutSeconds is the timeout in seconds in volume binding operation. + // Value must be non-negative integer. The value zero indicates no waiting. + // If this value is nil, the default value will be used. + BindTimeoutSeconds int64 `json:"bindTimeoutSeconds,omitempty"` + + // Shape specifies the points defining the score function shape, which is + // used to score nodes based on the utilization of statically provisioned + // PVs. The utilization is calculated by dividing the total requested + // storage of the pod by the total capacity of feasible PVs on each node. + // Each point contains utilization (ranges from 0 to 100) and its + // associated score (ranges from 0 to 10). You can turn the priority by + // specifying different scores for different utilization numbers. + // The default shape points are: + // 1) 0 for 0 utilization + // 2) 10 for 100 utilization + // All points must be sorted in increasing order by utilization. + // +featureGate=VolumeCapacityPriority + // +optional + Shape []UtilizationShapePoint +} diff --git a/pkg/apis/config/v1/defaults.go b/pkg/apis/config/v1/defaults.go new file mode 100644 index 000000000..c7323c5b1 --- /dev/null +++ b/pkg/apis/config/v1/defaults.go @@ -0,0 +1,27 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// +k8s:defaulter-gen=true + +package v1 + +var ( + defaultBindTimeoutSeconds int64 = 600 +) + +func SetDefaultsVolumeBindingArgs(obj *KnodeVolumeBindingArgs) { + obj.BindTimeoutSeconds = &defaultBindTimeoutSeconds +} diff --git a/pkg/apis/config/v1/doc.go b/pkg/apis/config/v1/doc.go new file mode 100644 index 000000000..fd702391e --- /dev/null +++ b/pkg/apis/config/v1/doc.go @@ -0,0 +1,23 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// +k8s:deepcopy-gen=package +// +k8s:conversion-gen=github.com/kosmos.io/kosmos/pkg/apis/config +// +k8s:defaulter-gen=TypeMeta +// +groupName=kubescheduler.config.k8s.io + +// Package v1 is the v1 version of the API. +package v1 // import "github.com/kosmos.io/kosmos/pkg/apis/config/v1" diff --git a/pkg/apis/config/v1/register.go b/pkg/apis/config/v1/register.go new file mode 100644 index 000000000..fb46ccf87 --- /dev/null +++ b/pkg/apis/config/v1/register.go @@ -0,0 +1,52 @@ +/* +Copyright 2020 The Kubernetes Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1 + +import ( + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + schedschemev1 "k8s.io/kube-scheduler/config/v1" +) + +// GroupName is the group name used in this package +const GroupName = "kubescheduler.config.k8s.io" + +// SchemeGroupVersion is group version used to register these objects +var SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: "v1"} + +var ( + localSchemeBuilder = &schedschemev1.SchemeBuilder + // AddToScheme is a global function that registers this API group & version to a scheme + AddToScheme = localSchemeBuilder.AddToScheme +) + +// addKnownTypes registers known types to the given scheme +func addKnownTypes(scheme *runtime.Scheme) error { + scheme.AddKnownTypes(SchemeGroupVersion, + &KnodeVolumeBindingArgs{}, + ) + return nil +} + +func init() { + // We only register manually written functions here. The registration of the + // generated functions takes place in the generated files. The separation + // makes the code compile even when the generated files are missing. + localSchemeBuilder.Register(addKnownTypes) + localSchemeBuilder.Register(RegisterDefaults) + localSchemeBuilder.Register(RegisterConversions) +} diff --git a/pkg/apis/config/v1/types.go b/pkg/apis/config/v1/types.go new file mode 100644 index 000000000..0ac96263e --- /dev/null +++ b/pkg/apis/config/v1/types.go @@ -0,0 +1,60 @@ +/* +Copyright 2020 The Kubernetes Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1 + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// MaxCustomPriorityScore is the max score UtilizationShapePoint expects. +const MaxCustomPriorityScore int64 = 10 + +// UtilizationShapePoint represents a single point of a priority function shape. +type UtilizationShapePoint struct { + // Utilization (x axis). Valid values are 0 to 100. Fully utilized node maps to 100. + Utilization int32 + // Score assigned to a given utilization (y axis). Valid values are 0 to 10. + Score int32 +} + +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object +// +k8s:defaulter-gen=true + +// KnodeVolumeBindingArgs holds arguments used to configure the KnodeVolumeBinding plugin +type KnodeVolumeBindingArgs struct { + metav1.TypeMeta `json:",inline"` + + // BindTimeoutSeconds is the timeout in seconds in volume binding operation. + // Value must be non-negative integer. The value zero indicates no waiting. + // If this value is nil, the default value (600) will be used. + BindTimeoutSeconds *int64 `json:"bindTimeoutSeconds,omitempty"` + + // Shape specifies the points defining the score function shape, which is + // used to score nodes based on the utilization of statically provisioned + // PVs. The utilization is calculated by dividing the total requested + // storage of the pod by the total capacity of feasible PVs on each node. 
+ // Each point contains utilization (ranges from 0 to 100) and its + // associated score (ranges from 0 to 10). You can turn the priority by + // specifying different scores for different utilization numbers. + // The default shape points are: + // 1) 0 for 0 utilization + // 2) 10 for 100 utilization + // All points must be sorted in increasing order by utilization. + // +featureGate=VolumeCapacityPriority + // +optional + Shape []UtilizationShapePoint +} diff --git a/pkg/apis/config/v1/zz_generated.conversion.go b/pkg/apis/config/v1/zz_generated.conversion.go new file mode 100644 index 000000000..1a7d80e2f --- /dev/null +++ b/pkg/apis/config/v1/zz_generated.conversion.go @@ -0,0 +1,93 @@ +//go:build !ignore_autogenerated +// +build !ignore_autogenerated + +// Code generated by conversion-gen. DO NOT EDIT. + +package v1 + +import ( + unsafe "unsafe" + + config "github.com/kosmos.io/kosmos/pkg/apis/config" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + conversion "k8s.io/apimachinery/pkg/conversion" + runtime "k8s.io/apimachinery/pkg/runtime" +) + +func init() { + localSchemeBuilder.Register(RegisterConversions) +} + +// RegisterConversions adds conversion functions to the given scheme. +// Public to allow building arbitrary schemes. 
+func RegisterConversions(s *runtime.Scheme) error { + if err := s.AddGeneratedConversionFunc((*KnodeVolumeBindingArgs)(nil), (*config.KnodeVolumeBindingArgs)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1_KnodeVolumeBindingArgs_To_config_KnodeVolumeBindingArgs(a.(*KnodeVolumeBindingArgs), b.(*config.KnodeVolumeBindingArgs), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*config.KnodeVolumeBindingArgs)(nil), (*KnodeVolumeBindingArgs)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_config_KnodeVolumeBindingArgs_To_v1_KnodeVolumeBindingArgs(a.(*config.KnodeVolumeBindingArgs), b.(*KnodeVolumeBindingArgs), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*UtilizationShapePoint)(nil), (*config.UtilizationShapePoint)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1_UtilizationShapePoint_To_config_UtilizationShapePoint(a.(*UtilizationShapePoint), b.(*config.UtilizationShapePoint), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*config.UtilizationShapePoint)(nil), (*UtilizationShapePoint)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_config_UtilizationShapePoint_To_v1_UtilizationShapePoint(a.(*config.UtilizationShapePoint), b.(*UtilizationShapePoint), scope) + }); err != nil { + return err + } + return nil +} + +func autoConvert_v1_KnodeVolumeBindingArgs_To_config_KnodeVolumeBindingArgs(in *KnodeVolumeBindingArgs, out *config.KnodeVolumeBindingArgs, s conversion.Scope) error { + if err := metav1.Convert_Pointer_int64_To_int64(&in.BindTimeoutSeconds, &out.BindTimeoutSeconds, s); err != nil { + return err + } + out.Shape = *(*[]config.UtilizationShapePoint)(unsafe.Pointer(&in.Shape)) + return nil +} + +// Convert_v1_KnodeVolumeBindingArgs_To_config_KnodeVolumeBindingArgs is an autogenerated conversion function. 
+func Convert_v1_KnodeVolumeBindingArgs_To_config_KnodeVolumeBindingArgs(in *KnodeVolumeBindingArgs, out *config.KnodeVolumeBindingArgs, s conversion.Scope) error { + return autoConvert_v1_KnodeVolumeBindingArgs_To_config_KnodeVolumeBindingArgs(in, out, s) +} + +func autoConvert_config_KnodeVolumeBindingArgs_To_v1_KnodeVolumeBindingArgs(in *config.KnodeVolumeBindingArgs, out *KnodeVolumeBindingArgs, s conversion.Scope) error { + if err := metav1.Convert_int64_To_Pointer_int64(&in.BindTimeoutSeconds, &out.BindTimeoutSeconds, s); err != nil { + return err + } + out.Shape = *(*[]UtilizationShapePoint)(unsafe.Pointer(&in.Shape)) + return nil +} + +// Convert_config_KnodeVolumeBindingArgs_To_v1_KnodeVolumeBindingArgs is an autogenerated conversion function. +func Convert_config_KnodeVolumeBindingArgs_To_v1_KnodeVolumeBindingArgs(in *config.KnodeVolumeBindingArgs, out *KnodeVolumeBindingArgs, s conversion.Scope) error { + return autoConvert_config_KnodeVolumeBindingArgs_To_v1_KnodeVolumeBindingArgs(in, out, s) +} + +func autoConvert_v1_UtilizationShapePoint_To_config_UtilizationShapePoint(in *UtilizationShapePoint, out *config.UtilizationShapePoint, s conversion.Scope) error { + out.Utilization = in.Utilization + out.Score = in.Score + return nil +} + +// Convert_v1_UtilizationShapePoint_To_config_UtilizationShapePoint is an autogenerated conversion function. +func Convert_v1_UtilizationShapePoint_To_config_UtilizationShapePoint(in *UtilizationShapePoint, out *config.UtilizationShapePoint, s conversion.Scope) error { + return autoConvert_v1_UtilizationShapePoint_To_config_UtilizationShapePoint(in, out, s) +} + +func autoConvert_config_UtilizationShapePoint_To_v1_UtilizationShapePoint(in *config.UtilizationShapePoint, out *UtilizationShapePoint, s conversion.Scope) error { + out.Utilization = in.Utilization + out.Score = in.Score + return nil +} + +// Convert_config_UtilizationShapePoint_To_v1_UtilizationShapePoint is an autogenerated conversion function. 
+func Convert_config_UtilizationShapePoint_To_v1_UtilizationShapePoint(in *config.UtilizationShapePoint, out *UtilizationShapePoint, s conversion.Scope) error { + return autoConvert_config_UtilizationShapePoint_To_v1_UtilizationShapePoint(in, out, s) +} diff --git a/pkg/apis/config/v1/zz_generated.deepcopy.go b/pkg/apis/config/v1/zz_generated.deepcopy.go new file mode 100644 index 000000000..cc7efb06b --- /dev/null +++ b/pkg/apis/config/v1/zz_generated.deepcopy.go @@ -0,0 +1,61 @@ +//go:build !ignore_autogenerated +// +build !ignore_autogenerated + +// Code generated by deepcopy-gen. DO NOT EDIT. + +package v1 + +import ( + runtime "k8s.io/apimachinery/pkg/runtime" +) + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *KnodeVolumeBindingArgs) DeepCopyInto(out *KnodeVolumeBindingArgs) { + *out = *in + out.TypeMeta = in.TypeMeta + if in.BindTimeoutSeconds != nil { + in, out := &in.BindTimeoutSeconds, &out.BindTimeoutSeconds + *out = new(int64) + **out = **in + } + if in.Shape != nil { + in, out := &in.Shape, &out.Shape + *out = make([]UtilizationShapePoint, len(*in)) + copy(*out, *in) + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new KnodeVolumeBindingArgs. +func (in *KnodeVolumeBindingArgs) DeepCopy() *KnodeVolumeBindingArgs { + if in == nil { + return nil + } + out := new(KnodeVolumeBindingArgs) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *KnodeVolumeBindingArgs) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in *UtilizationShapePoint) DeepCopyInto(out *UtilizationShapePoint) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new UtilizationShapePoint. +func (in *UtilizationShapePoint) DeepCopy() *UtilizationShapePoint { + if in == nil { + return nil + } + out := new(UtilizationShapePoint) + in.DeepCopyInto(out) + return out +} diff --git a/pkg/apis/config/v1/zz_generated.defaults.go b/pkg/apis/config/v1/zz_generated.defaults.go new file mode 100644 index 000000000..88694caac --- /dev/null +++ b/pkg/apis/config/v1/zz_generated.defaults.go @@ -0,0 +1,17 @@ +//go:build !ignore_autogenerated +// +build !ignore_autogenerated + +// Code generated by defaulter-gen. DO NOT EDIT. + +package v1 + +import ( + runtime "k8s.io/apimachinery/pkg/runtime" +) + +// RegisterDefaults adds defaulters functions to the given scheme. +// Public to allow building arbitrary schemes. +// All generated defaulters are covering - they call all nested defaulters. +func RegisterDefaults(scheme *runtime.Scheme) error { + return nil +} diff --git a/pkg/apis/config/zz_generated.deepcopy.go b/pkg/apis/config/zz_generated.deepcopy.go new file mode 100644 index 000000000..2a0155c2d --- /dev/null +++ b/pkg/apis/config/zz_generated.deepcopy.go @@ -0,0 +1,56 @@ +//go:build !ignore_autogenerated +// +build !ignore_autogenerated + +// Code generated by deepcopy-gen. DO NOT EDIT. + +package config + +import ( + runtime "k8s.io/apimachinery/pkg/runtime" +) + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *KnodeVolumeBindingArgs) DeepCopyInto(out *KnodeVolumeBindingArgs) { + *out = *in + out.TypeMeta = in.TypeMeta + if in.Shape != nil { + in, out := &in.Shape, &out.Shape + *out = make([]UtilizationShapePoint, len(*in)) + copy(*out, *in) + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new KnodeVolumeBindingArgs. 
+func (in *KnodeVolumeBindingArgs) DeepCopy() *KnodeVolumeBindingArgs { + if in == nil { + return nil + } + out := new(KnodeVolumeBindingArgs) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *KnodeVolumeBindingArgs) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *UtilizationShapePoint) DeepCopyInto(out *UtilizationShapePoint) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new UtilizationShapePoint. +func (in *UtilizationShapePoint) DeepCopy() *UtilizationShapePoint { + if in == nil { + return nil + } + out := new(UtilizationShapePoint) + in.DeepCopyInto(out) + return out +} diff --git a/pkg/clustertree/knode-manager/knode.go b/pkg/clustertree/knode-manager/knode.go index be8305c4f..ee18b01f0 100644 --- a/pkg/clustertree/knode-manager/knode.go +++ b/pkg/clustertree/knode-manager/knode.go @@ -3,32 +3,22 @@ package knodemanager import ( "context" "fmt" - "path" "time" - "github.com/pkg/errors" - corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" kubeinformers "k8s.io/client-go/informers" corev1informers "k8s.io/client-go/informers/core/v1" discoveryv1 "k8s.io/client-go/informers/discovery/v1" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/kubernetes/scheme" corev1client "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/record" "k8s.io/klog" - klogv2 "k8s.io/klog/v2" "github.com/kosmos.io/kosmos/cmd/clustertree/knode-manager/app/config" kosmosv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" - "github.com/kosmos.io/kosmos/pkg/clustertree/knode-manager/adapters" - k8sadapter 
"github.com/kosmos.io/kosmos/pkg/clustertree/knode-manager/adapters/k8s" - "github.com/kosmos.io/kosmos/pkg/clustertree/knode-manager/controllers" - "github.com/kosmos.io/kosmos/pkg/clustertree/knode-manager/controllers/mcs" "github.com/kosmos.io/kosmos/pkg/clustertree/knode-manager/utils" - "github.com/kosmos.io/kosmos/pkg/clustertree/knode-manager/utils/manager" kosmosversioned "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned" "github.com/kosmos.io/kosmos/pkg/generated/informers/externalversions" kosmosinformers "github.com/kosmos.io/kosmos/pkg/generated/informers/externalversions/apis/v1alpha1" @@ -40,10 +30,10 @@ type Knode struct { client kubernetes.Interface master kubernetes.Interface - podController *controllers.PodController - nodeController *controllers.NodeController - pvController *controllers.PVPVCController - serviceImportController *mcs.ServiceImportController + //podController *controllers.PodController + //nodeController *controllers.NodeController + //pvController *controllers.PVPVCController + //serviceImportController *mcs.ServiceImportController clientInformerFactory kubeinformers.SharedInformerFactory kosmosClientInformerFactory externalversions.SharedInformerFactory @@ -116,12 +106,11 @@ func NewKnode(ctx context.Context, knode *kosmosv1alpha1.Knode, cmdConfig *confi options.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", knode.Spec.NodeName).String() })) - podInformerForNode := podInformerForNodeFactory.Core().V1().Pods() - - rm, err := manager.NewResourceManager(podInformerForNode.Lister(), masterInformers.secretInformer.Lister(), masterInformers.cmInformer.Lister(), masterInformers.serviceInformer.Lister()) - if err != nil { - return nil, errors.Wrap(err, "could not create resource manager") - } + //podInformerForNode := podInformerForNodeFactory.Core().V1().Pods() + //rm, err := manager.NewResourceManager(podInformerForNode.Lister(), masterInformers.secretInformer.Lister(), masterInformers.cmInformer.Lister(), 
masterInformers.serviceInformer.Lister()) + //if err != nil { + // return nil, errors.Wrap(err, "could not create resource manager") + //} // init adapter client client, err := utils.NewClientFromBytes(knode.Spec.Kubeconfig, func(config *rest.Config) { @@ -143,64 +132,64 @@ func NewKnode(ctx context.Context, knode *kosmosv1alpha1.Knode, cmdConfig *confi clientInformers := NewInformers(client, kosmosClient, cmdConfig.InformerResyncPeriod) - ac := &k8sadapter.AdapterConfig{ - Client: client, - Master: master, - PodInformer: clientInformers.podInformer, - NamespaceInformer: clientInformers.nsInformer, - NodeInformer: clientInformers.nodeInformer, - ConfigmapInformer: clientInformers.cmInformer, - SecretInformer: clientInformers.secretInformer, - ServiceInformer: clientInformers.serviceInformer, - ResourceManager: rm, - } - - var podAdapter adapters.PodHandler - var nodeAdapter adapters.NodeHandler - if knode.Spec.Type == kosmosv1alpha1.K8sAdapter { - podAdapter, err = k8sadapter.NewPodAdapter(ctx, ac, "", true) - if err != nil { - return nil, err - } - nodeAdapter, err = k8sadapter.NewNodeAdapter(ctx, knode, ac, cmdConfig) - if err != nil { - return nil, err - } - } - - dummyNode := controllers.BuildDummyNode(ctx, knode, nodeAdapter) - nc, err := controllers.NewNodeController(nodeAdapter, master, dummyNode) - if err != nil { - return nil, err - } + //ac := &k8sadapter.AdapterConfig{ + // Client: client, + // Master: master, + // PodInformer: clientInformers.podInformer, + // NamespaceInformer: clientInformers.nsInformer, + // NodeInformer: clientInformers.nodeInformer, + // ConfigmapInformer: clientInformers.cmInformer, + // SecretInformer: clientInformers.secretInformer, + // ServiceInformer: clientInformers.serviceInformer, + // ResourceManager: rm, + //} + // + //var podAdapter adapters.PodHandler + //var nodeAdapter adapters.NodeHandler + //if knode.Spec.Type == kosmosv1alpha1.K8sAdapter { + // podAdapter, err = k8sadapter.NewPodAdapter(ctx, ac, "", true) + // if 
err != nil { + // return nil, err + // } + // nodeAdapter, err = k8sadapter.NewNodeAdapter(ctx, knode, ac, cmdConfig) + // if err != nil { + // return nil, err + // } + //} + // + //dummyNode := controllers.BuildDummyNode(ctx, knode, nodeAdapter) + //nc, err := controllers.NewNodeController(nodeAdapter, master, dummyNode) + //if err != nil { + // return nil, err + //} eb := record.NewBroadcaster() eb.StartLogging(klog.Infof) eb.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: master.CoreV1().Events(cmdConfig.KubeNamespace)}) - - pc, err := controllers.NewPodController(controllers.PodConfig{ - PodClient: master.CoreV1(), - PodInformer: podInformerForNode, - EventRecorder: eb.NewRecorder(scheme.Scheme, corev1.EventSource{Component: path.Join(dummyNode.Name, ComponentName)}), - PodHandler: podAdapter, - ConfigMapInformer: masterInformers.cmInformer, - SecretInformer: masterInformers.secretInformer, - ServiceInformer: masterInformers.serviceInformer, - RateLimiterOpts: cmdConfig.RateLimiterOpts, - }) - if err != nil { - return nil, err - } - - pvController, err := controllers.NewPVPVCController(master, client, masterInformers.informer, clientInformers.informer, knode.Name) - if err != nil { - return nil, err - } - - serviceImportController, err := mcs.NewServiceImportController(client, kosmosClient, clientInformers.informer, masterInformers.informer, clientInformers.kosmosInformer) - if err != nil { - return nil, err - } + // + //pc, err := controllers.NewPodController(controllers.PodConfig{ + // PodClient: master.CoreV1(), + // PodInformer: podInformerForNode, + // EventRecorder: eb.NewRecorder(scheme.Scheme, corev1.EventSource{Component: path.Join(dummyNode.Name, ComponentName)}), + // PodHandler: podAdapter, + // ConfigMapInformer: masterInformers.cmInformer, + // SecretInformer: masterInformers.secretInformer, + // ServiceInformer: masterInformers.serviceInformer, + // RateLimiterOpts: cmdConfig.RateLimiterOpts, + //}) + //if err != nil { + // return 
nil, err + //} + // + //pvController, err := controllers.NewPVPVCController(master, client, masterInformers.informer, clientInformers.informer, knode.Name) + //if err != nil { + // return nil, err + //} + // + //serviceImportController, err := mcs.NewServiceImportController(client, kosmosClient, clientInformers.informer, masterInformers.informer, clientInformers.kosmosInformer) + //if err != nil { + // return nil, err + //} return &Knode{ client: client, @@ -209,38 +198,38 @@ func NewKnode(ctx context.Context, knode *kosmosv1alpha1.Knode, cmdConfig *confi kosmosClientInformerFactory: clientInformers.kosmosInformer, masterInformerFactory: masterInformers.informer, - podInformerFactory: podInformerForNodeFactory, - podController: pc, - nodeController: nc, - pvController: pvController, - serviceImportController: serviceImportController, + podInformerFactory: podInformerForNodeFactory, + //podController: pc, + //nodeController: nc, + //pvController: pvController, + //serviceImportController: serviceImportController, }, nil } func (kn *Knode) Run(ctx context.Context, c *config.Opts) { - go func() { - if err := kn.podController.Run(ctx, c.PodSyncWorkers); err != nil && !errors.Is(errors.Cause(err), context.Canceled) { - klogv2.Error(err) - } - }() - - go func() { - if err := kn.nodeController.Run(ctx); err != nil { - klogv2.Error(err) - } - }() - - go func() { - if err := kn.pvController.Run(ctx); err != nil { - klogv2.Error(err) - } - }() - - go func() { - if err := kn.serviceImportController.Run(ctx); err != nil { - klogv2.Error(err) - } - }() + //go func() { + // if err := kn.podController.Run(ctx, c.PodSyncWorkers); err != nil && !errors.Is(errors.Cause(err), context.Canceled) { + // klogv2.Error(err) + // } + //}() + // + //go func() { + // if err := kn.nodeController.Run(ctx); err != nil { + // klogv2.Error(err) + // } + //}() + // + //go func() { + // if err := kn.pvController.Run(ctx); err != nil { + // klogv2.Error(err) + // } + //}() + // + //go func() { + // 
if err := kn.serviceImportController.Run(ctx); err != nil { + // klogv2.Error(err) + // } + //}() kn.clientInformerFactory.Start(ctx.Done()) kn.masterInformerFactory.Start(ctx.Done()) diff --git a/pkg/clustertree/knode-manager/knodemanager.go b/pkg/clustertree/knode-manager/knodemanager.go index ee4e9cda2..4a7352489 100644 --- a/pkg/clustertree/knode-manager/knodemanager.go +++ b/pkg/clustertree/knode-manager/knodemanager.go @@ -21,7 +21,6 @@ import ( "github.com/kosmos.io/kosmos/cmd/clustertree/knode-manager/app/config" kosmosv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" - "github.com/kosmos.io/kosmos/pkg/clustertree/knode-manager/controllers/mcs" "github.com/kosmos.io/kosmos/pkg/clustertree/knode-manager/utils" crdclientset "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned" "github.com/kosmos.io/kosmos/pkg/generated/informers/externalversions" @@ -49,7 +48,7 @@ type KnodeManager struct { opts *config.Opts - serviceExportController *mcs.ServiceExportController + //serviceExportController *mcs.ServiceExportController } func NewManager(c *config.Config) (*KnodeManager, error) { @@ -65,10 +64,10 @@ func NewManager(c *config.Config) (*KnodeManager, error) { } masterInformers := NewInformers(kubeClient, c.CRDClient, c.Opts.InformerResyncPeriod) - serviceExportController, err := mcs.NewServiceExportController(kubeClient, c.CRDClient, masterInformers.informer, kosmosInformerFactory) - if err != nil { - return nil, err - } + //serviceExportController, err := mcs.NewServiceExportController(kubeClient, c.CRDClient, masterInformers.informer, kosmosInformerFactory) + //if err != nil { + // return nil, err + //} manager := &KnodeManager{ knclient: c.CRDClient, @@ -80,9 +79,9 @@ func NewManager(c *config.Config) (*KnodeManager, error) { queue: workqueue.NewRateLimitingQueue( NewItemExponentialFailureAndJitterSlowRateLimter(2*time.Second, 15*time.Second, 1*time.Minute, 1.0, defaultRetryNum), ), - knodes: make(map[string]*Knode), - opts: c.Opts, - 
serviceExportController: serviceExportController, + knodes: make(map[string]*Knode), + opts: c.Opts, + //serviceExportController: serviceExportController, } _, _ = knInformer.Informer().AddEventHandler( @@ -183,11 +182,11 @@ func (km *KnodeManager) Run(workers int, stopCh <-chan struct{}) { // kosmosInformerFactory should not be controlled by stopCh stopInformer := make(chan struct{}) - go func() { - if err := km.serviceExportController.Run(stopCh); err != nil { - klogv2.Error(err) - } - }() + //go func() { + // if err := km.serviceExportController.Run(stopCh); err != nil { + // klogv2.Error(err) + // } + //}() km.kosmosInformerFactory.Start(stopInformer) km.kubeInformerFactory.Start(stopInformer) diff --git a/pkg/clustertree/knode-manager/utils/util.go b/pkg/clustertree/knode-manager/utils/util.go index d4b585bf7..86c218a35 100644 --- a/pkg/clustertree/knode-manager/utils/util.go +++ b/pkg/clustertree/knode-manager/utils/util.go @@ -1 +1,119 @@ package utils + +import ( + "fmt" + + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + + kosmosversioned "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned" +) + +type Handlers func(*rest.Config) + +func NewClientFromConfigPath(configPath string, opts ...Handlers) (kubernetes.Interface, error) { + var ( + config *rest.Config + err error + ) + config, err = clientcmd.BuildConfigFromFlags("", configPath) + if err != nil { + return nil, fmt.Errorf("failed to build config from configpath: %v", err) + } + + for _, opt := range opts { + if opt == nil { + continue + } + opt(config) + } + + client, err := kubernetes.NewForConfig(config) + if err != nil { + return nil, fmt.Errorf("could not create clientset: %v", err) + } + return client, nil +} + +func NewKosmosClientFromConfigPath(configPath string, opts ...Handlers) (kosmosversioned.Interface, error) { + var ( + config *rest.Config + err error + ) + config, err = clientcmd.BuildConfigFromFlags("", configPath) + if err != 
nil { + return nil, fmt.Errorf("failed to build config from configpath: %v", err) + } + + for _, opt := range opts { + if opt == nil { + continue + } + opt(config) + } + + client, err := kosmosversioned.NewForConfig(config) + if err != nil { + return nil, fmt.Errorf("could not create clientset: %v", err) + } + return client, nil +} + +func NewClientFromBytes(kubeConfig []byte, opts ...Handlers) (kubernetes.Interface, error) { + var ( + config *rest.Config + err error + ) + + clientConfig, err := clientcmd.NewClientConfigFromBytes(kubeConfig) + if err != nil { + return nil, err + } + config, err = clientConfig.ClientConfig() + if err != nil { + return nil, err + } + + for _, opt := range opts { + if opt == nil { + continue + } + opt(config) + } + + client, err := kubernetes.NewForConfig(config) + if err != nil { + return nil, fmt.Errorf("create client failed: %v", err) + } + return client, nil +} + +func NewKosmosClientFromBytes(kubeConfig []byte, opts ...Handlers) (kosmosversioned.Interface, error) { + var ( + config *rest.Config + err error + ) + + clientConfig, err := clientcmd.NewClientConfigFromBytes(kubeConfig) + if err != nil { + return nil, err + } + config, err = clientConfig.ClientConfig() + if err != nil { + return nil, err + } + + for _, opt := range opts { + if opt == nil { + continue + } + opt(config) + } + + client, err := kosmosversioned.NewForConfig(config) + if err != nil { + return nil, fmt.Errorf("create client failed: %v", err) + } + return client, nil +} diff --git a/pkg/scheduler/lifted/helpers/tainttoleration.go b/pkg/scheduler/lifted/helpers/tainttoleration.go index 6728c0639..ed08b5884 100644 --- a/pkg/scheduler/lifted/helpers/tainttoleration.go +++ b/pkg/scheduler/lifted/helpers/tainttoleration.go @@ -1,5 +1,5 @@ /* -Copyright 2017 The Kubernetes Authors. +Copyright 2019 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. 
@@ -14,20 +14,29 @@ See the License for the specific language governing permissions and limitations under the License. */ -// This code is directly lifted from the Kubernetes codebase in order to avoid relying on the k8s.io/kubernetes package. +// This code is lifted from the Kubernetes codebase and make some slight modifications in order to avoid relying on the k8s.io/kubernetes package. // For reference: -//https://github.com/kubernetes/component-helpers/blob/master/scheduling/corev1/helpers.go +//https://github.com/kubernetes/component-helpers/blob/release-1.26/scheduling/corev1/helpers.go package helpers import v1 "k8s.io/api/core/v1" var KnodeTaint = &v1.Taint{ - Key: "knode.kosmos.io", + Key: "kosmos.io/node", Value: "true", Effect: v1.TaintEffectNoSchedule, } +func HasKnodeTaint(node *v1.Node) bool { + for _, taint := range node.Spec.Taints { + if taint.Key == KnodeTaint.Key && taint.Value == KnodeTaint.Value && taint.Effect == KnodeTaint.Effect { + return true + } + } + return false +} + // TolerationsTolerateTaint checks if taint is tolerated by any of the tolerations. func TolerationsTolerateTaint(tolerations []v1.Toleration, taint *v1.Taint) bool { if taint.MatchTaint(KnodeTaint) { diff --git a/pkg/scheduler/lifted/plugins/knodetainttoleration/knode_tainttoleration.go b/pkg/scheduler/lifted/plugins/knodetainttoleration/knode_tainttoleration.go index 57103fc6a..7c1a49afb 100644 --- a/pkg/scheduler/lifted/plugins/knodetainttoleration/knode_tainttoleration.go +++ b/pkg/scheduler/lifted/plugins/knodetainttoleration/knode_tainttoleration.go @@ -1,5 +1,5 @@ /* -Copyright 2017 The Kubernetes Authors. +Copyright 2019 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -14,9 +14,9 @@ See the License for the specific language governing permissions and limitations under the License. 
*/ -// This code is directly lifted from the Kubernetes codebase in order to avoid relying on the k8s.io/kubernetes package. +// This code is lifted from the Kubernetes codebase and make some slight modifications in order to avoid relying on the k8s.io/kubernetes package. // For reference: -// https://github.com/kubernetes/kubernetes/blob/master/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration.go +// https://github.com/kubernetes/kubernetes/blob/release-1.26/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration.go package knodetainttoleration @@ -27,7 +27,6 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/kubernetes/pkg/scheduler/framework" - "k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper" "github.com/kosmos.io/kosmos/pkg/scheduler/lifted/helpers" ) @@ -38,11 +37,6 @@ type TaintToleration struct { frameworkHandler framework.Handle } -func (t *TaintToleration) NormalizeScore(ctx context.Context, state *framework.CycleState, p *corev1.Pod, scores framework.NodeScoreList) *framework.Status { - //TODO implement me - panic("implement me") -} - // EventsToRegister returns the possible events that may make a Pod // failed by this plugin schedulable. 
func (t *TaintToleration) EventsToRegister() []framework.ClusterEvent { @@ -51,41 +45,31 @@ func (t *TaintToleration) EventsToRegister() []framework.ClusterEvent { } } -func (t *TaintToleration) PreScore(ctx context.Context, state *framework.CycleState, pod *corev1.Pod, nodes []*corev1.Node) *framework.Status { - //TODO implement me - panic("implement me") -} - -func (t *TaintToleration) Score(ctx context.Context, state *framework.CycleState, p *corev1.Pod, nodeName string) (int64, *framework.Status) { - return 0, nil -} - -func (t *TaintToleration) ScoreExtensions() framework.ScoreExtensions { - return t -} - func (t *TaintToleration) Name() string { return Name } func (t *TaintToleration) Filter(ctx context.Context, state *framework.CycleState, pod *corev1.Pod, nodeInfo *framework.NodeInfo) *framework.Status { - node := nodeInfo.Node() - if node == nil { + if nodeInfo == nil || nodeInfo.Node() == nil { return framework.AsStatus(fmt.Errorf("invalid nodeInfo")) } - taint, isUntolerated := helpers.FindMatchingUntoleratedTaint(node.Spec.Taints, pod.Spec.Tolerations, helper.DoNotScheduleTaintsFilterFunc()) + filterPredicate := func(t *corev1.Taint) bool { + // PodToleratesNodeTaints is only interested in NoSchedule and NoExecute taints. 
+ return t.Effect == corev1.TaintEffectNoSchedule || t.Effect == corev1.TaintEffectNoExecute + } + + taint, isUntolerated := helpers.FindMatchingUntoleratedTaint(nodeInfo.Node().Spec.Taints, pod.Spec.Tolerations, filterPredicate) if !isUntolerated { return nil } - errReason := fmt.Sprintf("node(s) had untolerated taint {%s: %s}", taint.Key, taint.Value) + errReason := fmt.Sprintf("node(s) had taint {%s: %s}, that the pod didn't tolerate", + taint.Key, taint.Value) return framework.NewStatus(framework.UnschedulableAndUnresolvable, errReason) } var _ framework.FilterPlugin = &TaintToleration{} -var _ framework.PreScorePlugin = &TaintToleration{} -var _ framework.ScorePlugin = &TaintToleration{} var _ framework.EnqueueExtensions = &TaintToleration{} // New initializes a new plugin and returns it. diff --git a/pkg/scheduler/lifted/plugins/knodevolumebinding/knode_volume_binding.go b/pkg/scheduler/lifted/plugins/knodevolumebinding/knode_volume_binding.go new file mode 100644 index 000000000..f105a3a2a --- /dev/null +++ b/pkg/scheduler/lifted/plugins/knodevolumebinding/knode_volume_binding.go @@ -0,0 +1,322 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// This code is lifted from the Kubernetes codebase and make some slight modifications in order to avoid relying on the k8s.io/kubernetes package. 
+// For reference: +// https://github.com/kubernetes/kubernetes/blob/release-1.26/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go + +package knodevolumebinding + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" + corelisters "k8s.io/client-go/listers/core/v1" + "k8s.io/component-helpers/storage/ephemeral" + "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/scheduler/framework" + scheduling "k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding" + + "github.com/kosmos.io/kosmos/pkg/apis/config" + "github.com/kosmos.io/kosmos/pkg/scheduler/lifted/helpers" +) + +const ( + stateKey framework.StateKey = Name +) + +// the state is initialized in PreFilter phase. because we save the pointer in +// framework.CycleState, in the later phases we don't need to call Write method +// to update the value +type stateData struct { + skip bool // set true if pod does not have PVCs + boundClaims []*corev1.PersistentVolumeClaim + claimsToBind []*corev1.PersistentVolumeClaim + allBound bool + // podVolumesByNode holds the pod's volume information found in the Filter + // phase for each node + // it's initialized in the PreFilter phase + podVolumesByNode map[string]*scheduling.PodVolumes + sync.Mutex +} + +func (d *stateData) Clone() framework.StateData { + return d +} + +// VolumeBinding is a plugin that binds pod volumes in scheduling. +// In the Filter phase, pod binding cache is created for the pod and used in +// Reserve and PreBind phases. 
+type VolumeBinding struct { + Binder scheduling.SchedulerVolumeBinder + PVCLister corelisters.PersistentVolumeClaimLister + frameworkHandler framework.Handle +} + +var _ framework.PreFilterPlugin = &VolumeBinding{} +var _ framework.FilterPlugin = &VolumeBinding{} +var _ framework.ReservePlugin = &VolumeBinding{} +var _ framework.PreBindPlugin = &VolumeBinding{} + +// Name is the name of the plugin used in Registry and configurations. +const Name = "KnodeVolumeBinding" + +// Name returns name of the plugin. It is used in logs, etc. +func (pl *VolumeBinding) Name() string { + return Name +} + +// podHasPVCs returns 2 values: +// - the first one to denote if the given "pod" has any PVC defined. +// - the second one to return any error if the requested PVC is illegal. +func (pl *VolumeBinding) podHasPVCs(pod *corev1.Pod) (bool, error) { + hasPVC := false + for i, vol := range pod.Spec.Volumes { + var pvcName string + isEphemeral := false + switch { + case vol.PersistentVolumeClaim != nil: + pvcName = vol.PersistentVolumeClaim.ClaimName + case vol.Ephemeral != nil: + pvcName = ephemeral.VolumeClaimName(pod, &pod.Spec.Volumes[i]) + isEphemeral = true + default: + // Volume is not using a PVC, ignore + continue + } + hasPVC = true + pvc, err := pl.PVCLister.PersistentVolumeClaims(pod.Namespace).Get(pvcName) + if err != nil { + // The error usually has already enough context ("persistentvolumeclaim "myclaim" not found"), + // but we can do better for generic ephemeral inline volumes where that situation + // is normal directly after creating a pod. 
+ if isEphemeral && apierrors.IsNotFound(err) { + err = fmt.Errorf("waiting for ephemeral volume controller to create the persistentvolumeclaim %q", pvcName) + } + return hasPVC, err + } + + if pvc.Status.Phase == corev1.ClaimLost { + return hasPVC, fmt.Errorf("persistentvolumeclaim %q bound to non-existent persistentvolume %q", pvc.Name, pvc.Spec.VolumeName) + } + if pvc.DeletionTimestamp != nil { + return hasPVC, fmt.Errorf("persistentvolumeclaim %q is being deleted", pvc.Name) + } + + if isEphemeral { + if err := ephemeral.VolumeIsForPod(pod, pvc); err != nil { + return hasPVC, err + } + } + } + return hasPVC, nil +} + +// PreFilter invoked at the prefilter extension point to check if pod has all +// immediate PVCs bound. If not all immediate PVCs are bound, an +// UnschedulableAndUnresolvable is returned. +func (pl *VolumeBinding) PreFilter(ctx context.Context, state *framework.CycleState, pod *corev1.Pod) (*framework.PreFilterResult, *framework.Status) { + // If pod does not reference any PVC, we don't need to do anything. + if hasPVC, err := pl.podHasPVCs(pod); err != nil { + return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error()) + } else if !hasPVC { + state.Write(stateKey, &stateData{skip: true}) + return nil, nil + } + boundClaims, claimsToBind, unboundClaimsImmediate, err := pl.Binder.GetPodVolumes(pod) + if err != nil { + return nil, framework.AsStatus(err) + } + if len(unboundClaimsImmediate) > 0 { + // Return UnschedulableAndUnresolvable error if immediate claims are + // not bound. Pod will be moved to active/backoff queues once these + // claims are bound by PV controller. 
+ status := framework.NewStatus(framework.UnschedulableAndUnresolvable) + status.AppendReason("pod has unbound immediate PersistentVolumeClaims") + return nil, status + } + state.Write(stateKey, &stateData{boundClaims: boundClaims, claimsToBind: claimsToBind, podVolumesByNode: make(map[string]*scheduling.PodVolumes)}) + return nil, nil +} + +// PreFilterExtensions returns prefilter extensions, pod add and remove. +func (pl *VolumeBinding) PreFilterExtensions() framework.PreFilterExtensions { + return nil +} + +func getStateData(cs *framework.CycleState) (*stateData, error) { + state, err := cs.Read(stateKey) + if err != nil { + return nil, err + } + s, ok := state.(*stateData) + if !ok { + return nil, errors.New("unable to convert state into stateData") + } + return s, nil +} + +// Filter invoked at the filter extension point. +// It evaluates if a pod can fit due to the volumes it requests, +// for both bound and unbound PVCs. +// +// For PVCs that are bound, then it checks that the corresponding PV's node affinity is +// satisfied by the given node. +// +// For PVCs that are unbound, it tries to find available PVs that can satisfy the PVC requirements +// and that the PV node affinity is satisfied by the given node. +// +// If storage capacity tracking is enabled, then enough space has to be available +// for the node and volumes that still need to be created. +// +// The predicate returns true if all bound PVCs have compatible PVs with the node, and if all unbound +// PVCs can be matched with an available and node-compatible PV. 
+func (pl *VolumeBinding) Filter(_ context.Context, cs *framework.CycleState, pod *corev1.Pod, nodeInfo *framework.NodeInfo) *framework.Status { + node := nodeInfo.Node() + if node == nil { + return framework.NewStatus(framework.Error, "node not found") + } + + if helpers.HasKnodeTaint(node) { + return nil + } + + state, err := getStateData(cs) + if err != nil { + return framework.AsStatus(err) + } + + if state.skip { + return nil + } + + podVolumes, reasons, err := pl.Binder.FindPodVolumes(pod, state.boundClaims, state.claimsToBind, node) + + if err != nil { + return framework.AsStatus(err) + } + + if len(reasons) > 0 { + status := framework.NewStatus(framework.UnschedulableAndUnresolvable) + for _, reason := range reasons { + status.AppendReason(string(reason)) + } + return status + } + + // multiple goroutines call `Filter` on different nodes simultaneously and the `CycleState` may be duplicated, so we must use a local lock here + state.Lock() + state.podVolumesByNode[node.Name] = podVolumes + state.Unlock() + return nil +} + +// Reserve reserves volumes of pod and saves binding status in cycle state. +func (pl *VolumeBinding) Reserve(_ context.Context, cs *framework.CycleState, pod *corev1.Pod, nodeName string) *framework.Status { + state, err := getStateData(cs) + if err != nil { + return framework.AsStatus(err) + } + // we don't need to hold the lock as only one node will be reserved for the given pod + podVolumes, ok := state.podVolumesByNode[nodeName] + if ok { + allBound, err := pl.Binder.AssumePodVolumes(pod, nodeName, podVolumes) + if err != nil { + return framework.AsStatus(err) + } + state.allBound = allBound + } else { + // may not exist if the pod does not reference any PVC + state.allBound = true + } + return nil +} + +// PreBind will make the API update with the assumed bindings and wait until +// the PV controller has completely finished the binding operation. 
+// +// If binding errors, times out or gets undone, then an error will be returned to +// retry scheduling. +func (pl *VolumeBinding) PreBind(ctx context.Context, cs *framework.CycleState, pod *corev1.Pod, nodeName string) *framework.Status { + s, err := getStateData(cs) + if err != nil { + return framework.AsStatus(err) + } + if s.allBound { + // no need to bind volumes + return nil + } + // we don't need to hold the lock as only one node will be pre-bound for the given pod + podVolumes, ok := s.podVolumesByNode[nodeName] + if !ok { + return framework.AsStatus(fmt.Errorf("no pod volumes found for node %q", nodeName)) + } + klog.V(5).InfoS("Trying to bind volumes for pod", "pod", klog.KObj(pod)) + err = pl.Binder.BindPodVolumes(ctx, pod, podVolumes) + if err != nil { + klog.V(1).InfoS("Failed to bind volumes for pod", "pod", klog.KObj(pod), "err", err) + return framework.AsStatus(err) + } + klog.V(5).InfoS("Success binding volumes for pod", "pod", klog.KObj(pod)) + return nil +} + +// Unreserve clears assumed PV and PVC cache. +// It's idempotent, and does nothing if no cache found for the given pod. +func (pl *VolumeBinding) Unreserve(_ context.Context, cs *framework.CycleState, _ *corev1.Pod, nodeName string) { + s, err := getStateData(cs) + if err != nil { + return + } + // we don't need to hold the lock as only one node may be unreserved + podVolumes, ok := s.podVolumesByNode[nodeName] + if !ok { + return + } + pl.Binder.RevertAssumedPodVolumes(podVolumes) +} + +// New initializes a new plugin and returns it. 
+func New(plArgs runtime.Object, fh framework.Handle) (framework.Plugin, error) { + args, ok := plArgs.(*config.KnodeVolumeBindingArgs) + if !ok { + return nil, fmt.Errorf("want args to be of type VolumeBindingArgs, got %T", plArgs) + } + + podInformer := fh.SharedInformerFactory().Core().V1().Pods() + nodeInformer := fh.SharedInformerFactory().Core().V1().Nodes() + pvcInformer := fh.SharedInformerFactory().Core().V1().PersistentVolumeClaims() + pvInformer := fh.SharedInformerFactory().Core().V1().PersistentVolumes() + storageClassInformer := fh.SharedInformerFactory().Storage().V1().StorageClasses() + csiNodeInformer := fh.SharedInformerFactory().Storage().V1().CSINodes() + capacityCheck := scheduling.CapacityCheck{ + CSIDriverInformer: fh.SharedInformerFactory().Storage().V1().CSIDrivers(), + CSIStorageCapacityInformer: fh.SharedInformerFactory().Storage().V1().CSIStorageCapacities(), + } + binder := scheduling.NewVolumeBinder(fh.ClientSet(), podInformer, nodeInformer, csiNodeInformer, pvcInformer, pvInformer, storageClassInformer, capacityCheck, time.Duration(args.BindTimeoutSeconds)*time.Second) + + return &VolumeBinding{ + Binder: binder, + PVCLister: pvcInformer.Lister(), + frameworkHandler: fh, + }, nil +}