Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize: optimize synchronization procedure #14

Merged
merged 2 commits into from
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ RUN go mod download
COPY main.go main.go
COPY api/ api/
COPY controllers/ controllers/
COPY pkg/ pkg/

# Build
# the GOARCH has not a default value to allow the binary be built according to the host where the command
Expand Down
4 changes: 3 additions & 1 deletion api/v1alpha1/seataserver_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ type SeataServerSpec struct {

// SeataServerStatus defines the observed state of SeataServer
type SeataServerStatus struct {
Deployed bool `json:"deployed"`
Synchronized bool `json:"synchronized"`
Replicas int32 `json:"replicas"`
ReadyReplicas int32 `json:"readyReplicas,omitempty"`
}

//+kubebuilder:object:root=true
Expand Down
11 changes: 9 additions & 2 deletions config/crd/bases/operator.seata.apache.org_seataservers.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,17 @@ spec:
status:
description: SeataServerStatus defines the observed state of SeataServer
properties:
deployed:
readyReplicas:
format: int32
type: integer
replicas:
format: int32
type: integer
synchronized:
type: boolean
required:
- deployed
- replicas
- synchronized
type: object
type: object
served: true
Expand Down
161 changes: 121 additions & 40 deletions controllers/seataserver_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,19 @@ package controllers
import (
"context"
"fmt"
"github.com/go-logr/logr"
"github.com/apache/seata-k8s/pkg/seata"
appsv1 "k8s.io/api/apps/v1"
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"time"

seatav1alpha1 "github.com/apache/seata-k8s/api/v1alpha1"
)
Expand All @@ -37,6 +43,10 @@ type SeataServerReconciler struct {
Scheme *runtime.Scheme
}

type reconcileFun func(ctx context.Context, s *seatav1alpha1.SeataServer) error

const RequeueSeconds = 10

//+kubebuilder:rbac:groups=operator.seata.apache.org,resources=seataservers,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=operator.seata.apache.org,resources=seataservers/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=operator.seata.apache.org,resources=seataservers/finalizers,verbs=update
Expand Down Expand Up @@ -68,41 +78,129 @@ func (r *SeataServerReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}
setupDefaults(s)

// create or update a service
service := initService(s)
if err := createOrUpdate(ctx, r, "Service", service, func() error {
updateService(service, s)
return controllerutil.SetControllerReference(s, service, r.Scheme)
}, logger); err != nil {
logger.Error(err, fmt.Sprintf("Failed to create resource Service(%v)", req.NamespacedName))
return ctrl.Result{}, err
for _, fun := range []reconcileFun{
r.reconcileHeadlessService,
r.reconcileStatefulSet,
} {
if err := fun(ctx, s); err != nil {
return reconcile.Result{}, err
}
}

// create or update stateful sets
statefulSet := initStatefulSet(s)
if err := createOrUpdate(ctx, r, "StatefulSet", statefulSet, func() error {
updateStatefulSet(ctx, statefulSet, s)
return controllerutil.SetControllerReference(s, statefulSet, r.Scheme)
}, logger); err != nil {
logger.Error(err, fmt.Sprintf("Failed to create resource StatefulSet(%v)", req.NamespacedName))
return ctrl.Result{}, err
if !s.Status.Synchronized {
logger.Info(fmt.Sprintf("SeataServer(%v) has not been synchronized yet, requeue in %d seconds",
req.NamespacedName, RequeueSeconds))
return ctrl.Result{Requeue: true, RequeueAfter: RequeueSeconds * time.Second}, nil
}

if !s.Status.Deployed {
s.Status.Deployed = true
if err := r.Status().Update(ctx, s); err != nil {
logger.Error(err, "Failed to update SeataServer/status")
return ctrl.Result{}, err
return ctrl.Result{}, nil
}

func (r *SeataServerReconciler) reconcileHeadlessService(ctx context.Context, s *seatav1alpha1.SeataServer) (err error) {
logger := log.FromContext(ctx)

svc := seata.MakeHeadlessService(s)
if err := controllerutil.SetControllerReference(s, svc, r.Scheme); err != nil {
return err
}
foundSvc := &apiv1.Service{}
err = r.Client.Get(ctx, types.NamespacedName{
Name: svc.Name,
Namespace: svc.Namespace,
}, foundSvc)
if err != nil && errors.IsNotFound(err) {
logger.Info(fmt.Sprintf("Creating a new SeataServer Service {%s:%s}",
svc.Namespace, svc.Name))
err = r.Client.Create(ctx, svc)
if err != nil {
return err
}
return nil
} else if err != nil {
return err
} else {
logger.Info(fmt.Sprintf("Updating existing SeataServer StatefulSet {%s:%s}",
foundSvc.Namespace, foundSvc.Name))

seata.SyncService(foundSvc, svc)
err = r.Client.Update(ctx, foundSvc)
if err != nil {
return err
}
}
return nil
}

return ctrl.Result{}, nil
func (r *SeataServerReconciler) reconcileStatefulSet(ctx context.Context, s *seatav1alpha1.SeataServer) (err error) {
logger := log.FromContext(ctx)

sts := seata.MakeStatefulSet(s)
if err := controllerutil.SetControllerReference(s, sts, r.Scheme); err != nil {
return err
}
foundSts := &appsv1.StatefulSet{}
err = r.Client.Get(ctx, types.NamespacedName{
Name: sts.Name,
Namespace: sts.Namespace,
}, foundSts)
if err != nil && errors.IsNotFound(err) {
logger.Info(fmt.Sprintf("Creating a new SeataServer StatefulSet {%s:%s}",
sts.Namespace, sts.Name))
err = r.Client.Create(ctx, sts)
if err != nil {
return err
}
return nil
} else if err != nil {
return err
} else {
err = r.updateStatefulSet(ctx, s, foundSts, sts)
if err != nil {
return err
}
}

return nil
}

func (r *SeataServerReconciler) updateStatefulSet(ctx context.Context, s *seatav1alpha1.SeataServer,
foundSts *appsv1.StatefulSet, sts *appsv1.StatefulSet) (err error) {
logger := log.FromContext(ctx)

logger.Info(fmt.Sprintf("Updating existing SeataServer StatefulSet {%s:%s}", foundSts.Namespace, foundSts.Name))
seata.SyncStatefulSet(foundSts, sts)

err = r.Client.Update(ctx, foundSts)
if err != nil {
return err
}
s.Status.Replicas = foundSts.Status.Replicas
s.Status.ReadyReplicas = foundSts.Status.ReadyReplicas

readySize := foundSts.Status.ReadyReplicas
newSize := *sts.Spec.Replicas
if readySize != newSize {
s.Status.Synchronized = false
}
if readySize == newSize && !s.Status.Synchronized {
if err = seata.SyncRaftCluster(ctx, s); err != nil {
logger.Error(err, "Failed to synchronize the raft cluster")
s.Status.Synchronized = false
} else {
logger.Info("Successfully synchronized the raft cluster")
s.Status.Synchronized = true
}
}
return r.Client.Status().Update(ctx, s)
}

// SetupWithManager sets up the controller with the Manager.
func (r *SeataServerReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&seatav1alpha1.SeataServer{}).
Owns(&appsv1.StatefulSet{}).
Owns(&apiv1.Service{}).
WithEventFilter(predicate.GenerationChangedPredicate{}).
Complete(r)
}

Expand All @@ -113,20 +211,3 @@ func setupDefaults(s *seatav1alpha1.SeataServer) {
s.Spec.Ports.RaftPort = 9091
}
}

func createOrUpdate(ctx context.Context, r *SeataServerReconciler, kind string, object client.Object,
f controllerutil.MutateFn, logger logr.Logger) error {
key := client.ObjectKeyFromObject(object)
status, err := controllerutil.CreateOrUpdate(ctx, r.Client, object, f)
if err != nil {
logger.Error(err, fmt.Sprintf("Failed to createOrUpdate object {%s:%v}", kind, key))
return err
}

logger.Info(fmt.Sprintf("createOrUpdate object {%s:%v} : %s", kind, key, status))
return nil
}

func makeLabels(name string) map[string]string {
return map[string]string{"cr_name": name}
}
47 changes: 0 additions & 47 deletions controllers/service.go

This file was deleted.

Loading
Loading