Skip to content

Commit

Permalink
add event recorder (#411)
Browse files Browse the repository at this point in the history
  • Loading branch information
MegaByte875 authored Dec 14, 2023
1 parent f8b2f07 commit 483cf0d
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 32 deletions.
4 changes: 4 additions & 0 deletions pkg/controller/component/graphd_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"

"github.com/vesoft-inc/nebula-operator/apis/apps/v1alpha1"
Expand All @@ -38,17 +39,20 @@ type graphdCluster struct {
clientSet kube.ClientSet
dm discovery.Interface
updateManager UpdateManager
eventRecorder record.EventRecorder
}

func NewGraphdCluster(
clientSet kube.ClientSet,
dm discovery.Interface,
um UpdateManager,
recorder record.EventRecorder,
) ReconcileManager {
return &graphdCluster{
clientSet: clientSet,
dm: dm,
updateManager: um,
eventRecorder: recorder,
}
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/controller/component/metad_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"

"github.com/vesoft-inc/nebula-go/v3/nebula/meta"
Expand All @@ -39,17 +40,20 @@ type metadCluster struct {
clientSet kube.ClientSet
dm discovery.Interface
updateManager UpdateManager
eventRecorder record.EventRecorder
}

func NewMetadCluster(
clientSet kube.ClientSet,
dm discovery.Interface,
um UpdateManager,
recorder record.EventRecorder,
) ReconcileManager {
return &metadCluster{
clientSet: clientSet,
dm: dm,
updateManager: um,
eventRecorder: recorder,
}
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/controller/component/storaged_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"

"github.com/vesoft-inc/nebula-operator/apis/apps/v1alpha1"
Expand All @@ -48,6 +49,7 @@ type storagedCluster struct {
scaleManager ScaleManager
updateManager UpdateManager
failoverManager FailoverManager
eventRecorder record.EventRecorder
}

func NewStoragedCluster(
Expand All @@ -56,13 +58,15 @@ func NewStoragedCluster(
sm ScaleManager,
um UpdateManager,
fm FailoverManager,
recorder record.EventRecorder,
) ReconcileManager {
return &storagedCluster{
clientSet: clientSet,
dm: dm,
scaleManager: sm,
updateManager: um,
failoverManager: fm,
eventRecorder: recorder,
}
}

Expand Down
22 changes: 19 additions & 3 deletions pkg/controller/nebulacluster/nebula_cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ import (
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
errorutils "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
typedv1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -82,24 +86,36 @@ func NewClusterReconciler(mgr ctrl.Manager, enableKruise bool) (*ClusterReconcil
return nil, fmt.Errorf("server version not supported")
}

kubeClient, err := kubernetes.NewForConfig(mgr.GetConfig())
if err != nil {
return nil, fmt.Errorf("create kubernetes client failed: %v", err)
}
eventBroadcaster := record.NewBroadcasterWithCorrelatorOptions(record.CorrelatorOptions{QPS: 1})
eventBroadcaster.StartLogging(klog.Infof)
eventBroadcaster.StartRecordingToSink(&typedv1.EventSinkImpl{Interface: typedv1.New(kubeClient.CoreV1().RESTClient()).Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "nebula-cluster-controller"})

return &ClusterReconciler{
control: NewDefaultNebulaClusterControl(
mgr.GetClient(),
clientSet.NebulaCluster(),
component.NewGraphdCluster(
clientSet,
dm,
graphdUpdater),
graphdUpdater,
recorder),
component.NewMetadCluster(
clientSet,
dm,
metadUpdater),
metadUpdater,
recorder),
component.NewStoragedCluster(
clientSet,
dm,
sm,
storagedUpdater,
storagedFailover),
storagedFailover,
recorder),
component.NewNebulaExporter(clientSet),
component.NewNebulaConsole(clientSet),
reclaimer.NewMetaReconciler(clientSet),
Expand Down
17 changes: 16 additions & 1 deletion pkg/controller/nebularestore/nebula_restore_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,15 @@ package nebularestore

import (
"context"
"fmt"
"time"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
typedv1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -50,7 +56,16 @@ func NewRestoreReconciler(mgr ctrl.Manager) (*Reconciler, error) {
return nil, err
}

restoreMgr := NewRestoreManager(clientSet)
kubeClient, err := kubernetes.NewForConfig(mgr.GetConfig())
if err != nil {
return nil, fmt.Errorf("create kubernetes client failed: %v", err)
}
eventBroadcaster := record.NewBroadcasterWithCorrelatorOptions(record.CorrelatorOptions{QPS: 1})
eventBroadcaster.StartLogging(klog.Infof)
eventBroadcaster.StartRecordingToSink(&typedv1.EventSinkImpl{Interface: typedv1.New(kubeClient.CoreV1().RESTClient()).Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "nebula-restore-controller"})

restoreMgr := NewRestoreManager(clientSet, recorder)

return &Reconciler{
control: NewRestoreControl(clientSet, restoreMgr),
Expand Down
8 changes: 5 additions & 3 deletions pkg/controller/nebularestore/nebula_restore_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"k8s.io/utils/pointer"

Expand Down Expand Up @@ -76,11 +77,12 @@ type Manager interface {
var _ Manager = (*restoreManager)(nil)

type restoreManager struct {
clientSet kube.ClientSet
clientSet kube.ClientSet
eventRecorder record.EventRecorder
}

func NewRestoreManager(clientSet kube.ClientSet) Manager {
return &restoreManager{clientSet: clientSet}
func NewRestoreManager(clientSet kube.ClientSet, recorder record.EventRecorder) Manager {
return &restoreManager{clientSet: clientSet, eventRecorder: recorder}
}

func (rm *restoreManager) Sync(restore *v1alpha1.NebulaRestore) error {
Expand Down
36 changes: 11 additions & 25 deletions pkg/kube/rbac.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,17 +66,11 @@ func createClusterRole(ctx context.Context, k8sClient client.Client) error {
},
},
}
if err := k8sClient.Get(ctx, client.ObjectKey{Name: v1alpha1.NebulaRoleName}, &rbacv1.ClusterRole{}); err != nil {
if apierrors.IsNotFound(err) {
if err := k8sClient.Create(ctx, &role); err != nil {
if apierrors.IsAlreadyExists(err) {
return nil
}
return fmt.Errorf("failed to create ClusterRole role: %v", err)
}
if err := k8sClient.Create(ctx, &role); err != nil {
if apierrors.IsAlreadyExists(err) {
return nil
}
return err
return fmt.Errorf("failed to create ClusterRole: %v", err)
}
return nil
}
Expand All @@ -93,16 +87,14 @@ func createClusterRoleBinding(ctx context.Context, k8sClient client.Client, name
},
}
if err := k8sClient.Get(ctx, client.ObjectKey{Name: v1alpha1.NebulaRoleBindingName}, binding); err != nil {
if apierrors.IsNotFound(err) {
if err := k8sClient.Create(ctx, binding); err != nil {
if apierrors.IsAlreadyExists(err) {
return nil
}
if !apierrors.IsNotFound(err) {
return fmt.Errorf("failed to get ClusterRoleBinding: %v", err)
}
if err := k8sClient.Create(ctx, binding); err != nil {
if !apierrors.IsAlreadyExists(err) {
return fmt.Errorf("failed to create ClusterRoleBinding: %v", err)
}
return nil
}
return err
}
if !isApplied(binding.Subjects, namespace) {
binding.Subjects = append(binding.Subjects, rbacv1.Subject{
Expand All @@ -126,17 +118,11 @@ func createServiceAccount(ctx context.Context, k8sClient client.Client, namespac
Namespace: namespace,
},
}
if err := k8sClient.Get(ctx, client.ObjectKey{Name: v1alpha1.NebulaServiceAccountName, Namespace: namespace}, &corev1.ServiceAccount{}); err != nil {
if apierrors.IsNotFound(err) {
if err := k8sClient.Create(ctx, &serviceAccount); err != nil {
if apierrors.IsAlreadyExists(err) {
return nil
}
return fmt.Errorf("failed to create ServiceAccount: %v", err)
}
if err := k8sClient.Create(ctx, &serviceAccount); err != nil {
if apierrors.IsAlreadyExists(err) {
return nil
}
return err
return fmt.Errorf("failed to create nebula ServiceAccount: %v", err)
}
return nil
}
Expand Down

0 comments on commit 483cf0d

Please sign in to comment.