Skip to content

Commit

Permalink
Event watcher (#1237)
Browse files Browse the repository at this point in the history
* Add event watcher, To #37688693

Signed-off-by: cheyang <[email protected]>

* Add event watcher, To #37688693

Signed-off-by: cheyang <[email protected]>

* Add event watcher, To #37688693

Signed-off-by: cheyang <[email protected]>

* Add event handler, To #37688693

Signed-off-by: cheyang <[email protected]>

* Add event handler, To #37688693

Signed-off-by: cheyang <[email protected]>

* Add event handler, To #37688693

Signed-off-by: cheyang <[email protected]>

* Add event handler, To #37688693

Signed-off-by: cheyang <[email protected]>

* Add event handler, To #37688693

Signed-off-by: cheyang <[email protected]>

* Add event handler, To #37688693

Signed-off-by: cheyang <[email protected]>

* Add event handler, To #37688693

Signed-off-by: cheyang <[email protected]>

* Add event handler, To #37688693

Signed-off-by: cheyang <[email protected]>

* Add event handler, To #37688693

Signed-off-by: cheyang <[email protected]>

* Add event handler, To #37688693

Signed-off-by: cheyang <[email protected]>

* Add event handler, To #37688693

Signed-off-by: cheyang <[email protected]>

* Support Healthy check, To #37688693

Signed-off-by: cheyang <[email protected]>

* Add event watcher, To #37688693

Signed-off-by: cheyang <[email protected]>

* Add event watcher, To #37688693

Signed-off-by: cheyang <[email protected]>

* Add event watcher, To #37688693

Signed-off-by: cheyang <[email protected]>

* Add event watcher, To #37688693

Signed-off-by: cheyang <[email protected]>

* Add event watcher, To #37688693

Signed-off-by: cheyang <[email protected]>

* Add event watcher, To #37688693

Signed-off-by: cheyang <[email protected]>

* Add event watcher, To #37688693

Signed-off-by: cheyang <[email protected]>

* Add deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object, To #38346130

Signed-off-by: cheyang <[email protected]>

* Add deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object, To #38346130

Signed-off-by: cheyang <[email protected]>

* Add deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object, To #38346130

Signed-off-by: cheyang <[email protected]>

* Add event handler, To #38346130

Signed-off-by: cheyang <[email protected]>

* Update logging, To #38346130

Signed-off-by: cheyang <[email protected]>
  • Loading branch information
cheyang authored Dec 10, 2021
1 parent 7b5df4b commit 76f6206
Show file tree
Hide file tree
Showing 32 changed files with 2,771 additions and 109 deletions.
4 changes: 4 additions & 0 deletions api/v1alpha1/jindoruntime_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
JindoRuntimeKind = "JindoRuntime"
)

// JindoCompTemplateSpec is a description of the Jindo commponents
type JindoCompTemplateSpec struct {
// Replicas is the desired number of replicas of the given template.
Expand Down
16 changes: 10 additions & 6 deletions cmd/jindo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package main

import (
"fmt"
"os"

"github.com/fluid-cloudnative/fluid"
datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
jindoctl "github.com/fluid-cloudnative/fluid/pkg/controllers/v1alpha1/jindo"
Expand All @@ -29,7 +31,6 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/net"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"os"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
Expand All @@ -41,10 +42,12 @@ var (
// Use compiler to check if the struct implements all the interface
_ base.Implement = (*jindo.JindoEngine)(nil)

short bool
metricsAddr string
enableLeaderElection bool
development bool
short bool
metricsAddr string
enableLeaderElection bool
development bool
// The new mode
eventDriven bool
portRange string
maxConcurrentReconciles int
)
Expand Down Expand Up @@ -79,6 +82,7 @@ func init() {
startCmd.Flags().BoolVarP(&development, "development", "", true, "Enable development mode for fluid controller.")
startCmd.Flags().StringVar(&portRange, "runtime-node-port-range", "18000-19999", "Set available port range for Jindo")
startCmd.Flags().IntVar(&maxConcurrentReconciles, "runtime-workers", 3, "Set max concurrent workers for JindoRuntime controller")
startCmd.Flags().BoolVar(&eventDriven, "event-driven", true, "The reconciler's loop strategy. if it's false, it indicates period driven.")
versionCmd.Flags().BoolVar(&short, "short", false, "print just the short version info")

cmd.AddCommand(startCmd)
Expand Down Expand Up @@ -128,7 +132,7 @@ func handle() {
ctrl.Log.WithName("jindoctl").WithName("JindoRuntime"),
mgr.GetScheme(),
mgr.GetEventRecorderFor("JindoRuntime"),
)).SetupWithManager(mgr, controllerOptions); err != nil {
)).SetupWithManager(mgr, controllerOptions, eventDriven); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "JindoRuntime")
os.Exit(1)
}
Expand Down
36 changes: 29 additions & 7 deletions pkg/controllers/v1alpha1/jindo/jindoruntime_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,31 @@ import (
"sync"
"time"

"sigs.k8s.io/controller-runtime/pkg/controller"

"github.com/pkg/errors"

"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"

"github.com/fluid-cloudnative/fluid/api/v1alpha1"
datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
"github.com/fluid-cloudnative/fluid/pkg/common"
"github.com/fluid-cloudnative/fluid/pkg/controllers"
"github.com/fluid-cloudnative/fluid/pkg/ctrl/watch"
"github.com/fluid-cloudnative/fluid/pkg/ddc/base"
cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime"
"github.com/fluid-cloudnative/fluid/pkg/utils"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Use compiler to check if the struct implements all the interface
var _ controllers.RuntimeReconcilerInterface = (*RuntimeReconciler)(nil)

const controllerName string = "JindoRuntimeController"

// RuntimeReconciler reconciles a JindoRuntime object
type RuntimeReconciler struct {
Scheme *runtime.Scheme
Expand Down Expand Up @@ -101,9 +105,27 @@ func (r *RuntimeReconciler) Reconcile(context context.Context, req ctrl.Request)
}

//SetupWithManager setups the manager with RuntimeReconciler
func (r *RuntimeReconciler) SetupWithManager(mgr ctrl.Manager, options controller.Options) error {
return ctrl.NewControllerManagedBy(mgr).
WithOptions(options).
For(&datav1alpha1.JindoRuntime{}).
Complete(r)
func (r *RuntimeReconciler) SetupWithManager(mgr ctrl.Manager, options controller.Options, eventDriven bool) (err error) {
if eventDriven {
err = watch.SetupWatcherWithReconciler(mgr, options, r)
} else {
err = ctrl.NewControllerManagedBy(mgr).
WithOptions(options).
For(&datav1alpha1.JindoRuntime{}).
Complete(r)
}
return
}

func (r *RuntimeReconciler) ControllerName() string {
return controllerName
}

func (r *RuntimeReconciler) ManagedResource() client.Object {
return &datav1alpha1.JindoRuntime{
TypeMeta: metav1.TypeMeta{
Kind: v1alpha1.JindoRuntimeKind,
APIVersion: datav1alpha1.GroupVersion.Group + "/" + datav1alpha1.GroupVersion.Version,
},
}
}
102 changes: 102 additions & 0 deletions pkg/ctrl/fuse.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
Copyright 2021 The Fluid Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package ctrl

import (
"context"
"fmt"
"reflect"

datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
"github.com/fluid-cloudnative/fluid/pkg/ddc/base"
"github.com/fluid-cloudnative/fluid/pkg/utils"
"github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
)

// CheckFuseHealthy checks the ds healthy with role
func (e *Helper) CheckFuseHealthy(recorder record.EventRecorder, runtime base.RuntimeInterface,
currentStatus datav1alpha1.RuntimeStatus,
ds *appsv1.DaemonSet) (err error) {
var (
healthy bool
unavailablePodNames []types.NamespacedName
)
if ds.Status.NumberUnavailable == 0 {
healthy = true
}

statusToUpdate := runtime.GetStatus()
if len(statusToUpdate.Conditions) == 0 {
statusToUpdate.Conditions = []datav1alpha1.RuntimeCondition{}
}

if healthy {
cond := utils.NewRuntimeCondition(datav1alpha1.RuntimeFusesReady, "The fuse is ready.",
"The fuse is ready.", corev1.ConditionTrue)
_, oldCond := utils.GetRuntimeCondition(statusToUpdate.Conditions, cond.Type)

if oldCond == nil || oldCond.Type != cond.Type {
statusToUpdate.Conditions =
utils.UpdateRuntimeCondition(statusToUpdate.Conditions,
cond)
}
statusToUpdate.FusePhase = datav1alpha1.RuntimePhaseReady
} else {
// 1. Update the status
cond := utils.NewRuntimeCondition(datav1alpha1.RuntimeFusesReady, "The fuses are not ready.",
fmt.Sprintf("The fuses %s in %s are not ready.", ds.Name, ds.Namespace), corev1.ConditionFalse)
_, oldCond := utils.GetRuntimeCondition(statusToUpdate.Conditions, cond.Type)

if oldCond == nil || oldCond.Type != cond.Type {
statusToUpdate.Conditions =
utils.UpdateRuntimeCondition(statusToUpdate.Conditions,
cond)
}
statusToUpdate.FusePhase = datav1alpha1.RuntimePhaseNotReady

// 2. Record the event
unavailablePodNames, err = kubeclient.GetUnavailableDaemonPodNames(e.client, ds)
if err != nil {
return err
}

// 3. Set error
err = fmt.Errorf("the fuse %s in %s are not ready. The expected number is %d, the actual number is %d, the unhealthy pods are %v",
ds.Name,
ds.Namespace,
ds.Status.DesiredNumberScheduled,
ds.Status.NumberReady,
unavailablePodNames)

recorder.Eventf(runtime, corev1.EventTypeWarning, "FuseUnhealthy", err.Error())
}

if err != nil {
return
}

status := *statusToUpdate
if !reflect.DeepEqual(status, currentStatus) {
return e.client.Status().Update(context.TODO(), runtime)
}

return
}
Loading

0 comments on commit 76f6206

Please sign in to comment.