Skip to content

Commit

Permalink
optimize: optimize synchronization procedure (#14)
Browse files Browse the repository at this point in the history
  • Loading branch information
ptyin authored Feb 7, 2024
1 parent 3775b03 commit fe4f87a
Show file tree
Hide file tree
Showing 13 changed files with 442 additions and 2,089 deletions.
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ RUN go mod download
COPY main.go main.go
COPY api/ api/
COPY controllers/ controllers/
COPY pkg/ pkg/

# Build
# the GOARCH has not a default value to allow the binary be built according to the host where the command
Expand Down
4 changes: 3 additions & 1 deletion api/v1alpha1/seataserver_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ type SeataServerSpec struct {

// SeataServerStatus defines the observed state of SeataServer
type SeataServerStatus struct {
Deployed bool `json:"deployed"`
Synchronized bool `json:"synchronized"`
Replicas int32 `json:"replicas"`
ReadyReplicas int32 `json:"readyReplicas,omitempty"`
}

//+kubebuilder:object:root=true
Expand Down
11 changes: 9 additions & 2 deletions config/crd/bases/operator.seata.apache.org_seataservers.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,17 @@ spec:
status:
description: SeataServerStatus defines the observed state of SeataServer
properties:
deployed:
readyReplicas:
format: int32
type: integer
replicas:
format: int32
type: integer
synchronized:
type: boolean
required:
- deployed
- replicas
- synchronized
type: object
type: object
served: true
Expand Down
161 changes: 121 additions & 40 deletions controllers/seataserver_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,19 @@ package controllers
import (
"context"
"fmt"
"github.com/go-logr/logr"
"github.com/apache/seata-k8s/pkg/seata"
appsv1 "k8s.io/api/apps/v1"
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"time"

seatav1alpha1 "github.com/apache/seata-k8s/api/v1alpha1"
)
Expand All @@ -37,6 +43,10 @@ type SeataServerReconciler struct {
Scheme *runtime.Scheme
}

type reconcileFun func(ctx context.Context, s *seatav1alpha1.SeataServer) error

const RequeueSeconds = 10

//+kubebuilder:rbac:groups=operator.seata.apache.org,resources=seataservers,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=operator.seata.apache.org,resources=seataservers/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=operator.seata.apache.org,resources=seataservers/finalizers,verbs=update
Expand Down Expand Up @@ -68,41 +78,129 @@ func (r *SeataServerReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}
setupDefaults(s)

// create or update a service
service := initService(s)
if err := createOrUpdate(ctx, r, "Service", service, func() error {
updateService(service, s)
return controllerutil.SetControllerReference(s, service, r.Scheme)
}, logger); err != nil {
logger.Error(err, fmt.Sprintf("Failed to create resource Service(%v)", req.NamespacedName))
return ctrl.Result{}, err
for _, fun := range []reconcileFun{
r.reconcileHeadlessService,
r.reconcileStatefulSet,
} {
if err := fun(ctx, s); err != nil {
return reconcile.Result{}, err
}
}

// create or update stateful sets
statefulSet := initStatefulSet(s)
if err := createOrUpdate(ctx, r, "StatefulSet", statefulSet, func() error {
updateStatefulSet(ctx, statefulSet, s)
return controllerutil.SetControllerReference(s, statefulSet, r.Scheme)
}, logger); err != nil {
logger.Error(err, fmt.Sprintf("Failed to create resource StatefulSet(%v)", req.NamespacedName))
return ctrl.Result{}, err
if !s.Status.Synchronized {
logger.Info(fmt.Sprintf("SeataServer(%v) has not been synchronized yet, requeue in %d seconds",
req.NamespacedName, RequeueSeconds))
return ctrl.Result{Requeue: true, RequeueAfter: RequeueSeconds * time.Second}, nil
}

if !s.Status.Deployed {
s.Status.Deployed = true
if err := r.Status().Update(ctx, s); err != nil {
logger.Error(err, "Failed to update SeataServer/status")
return ctrl.Result{}, err
return ctrl.Result{}, nil
}

func (r *SeataServerReconciler) reconcileHeadlessService(ctx context.Context, s *seatav1alpha1.SeataServer) (err error) {
logger := log.FromContext(ctx)

svc := seata.MakeHeadlessService(s)
if err := controllerutil.SetControllerReference(s, svc, r.Scheme); err != nil {
return err
}
foundSvc := &apiv1.Service{}
err = r.Client.Get(ctx, types.NamespacedName{
Name: svc.Name,
Namespace: svc.Namespace,
}, foundSvc)
if err != nil && errors.IsNotFound(err) {
logger.Info(fmt.Sprintf("Creating a new SeataServer Service {%s:%s}",
svc.Namespace, svc.Name))
err = r.Client.Create(ctx, svc)
if err != nil {
return err
}
return nil
} else if err != nil {
return err
} else {
logger.Info(fmt.Sprintf("Updating existing SeataServer StatefulSet {%s:%s}",
foundSvc.Namespace, foundSvc.Name))

seata.SyncService(foundSvc, svc)
err = r.Client.Update(ctx, foundSvc)
if err != nil {
return err
}
}
return nil
}

return ctrl.Result{}, nil
func (r *SeataServerReconciler) reconcileStatefulSet(ctx context.Context, s *seatav1alpha1.SeataServer) (err error) {
logger := log.FromContext(ctx)

sts := seata.MakeStatefulSet(s)
if err := controllerutil.SetControllerReference(s, sts, r.Scheme); err != nil {
return err
}
foundSts := &appsv1.StatefulSet{}
err = r.Client.Get(ctx, types.NamespacedName{
Name: sts.Name,
Namespace: sts.Namespace,
}, foundSts)
if err != nil && errors.IsNotFound(err) {
logger.Info(fmt.Sprintf("Creating a new SeataServer StatefulSet {%s:%s}",
sts.Namespace, sts.Name))
err = r.Client.Create(ctx, sts)
if err != nil {
return err
}
return nil
} else if err != nil {
return err
} else {
err = r.updateStatefulSet(ctx, s, foundSts, sts)
if err != nil {
return err
}
}

return nil
}

func (r *SeataServerReconciler) updateStatefulSet(ctx context.Context, s *seatav1alpha1.SeataServer,
foundSts *appsv1.StatefulSet, sts *appsv1.StatefulSet) (err error) {
logger := log.FromContext(ctx)

logger.Info(fmt.Sprintf("Updating existing SeataServer StatefulSet {%s:%s}", foundSts.Namespace, foundSts.Name))
seata.SyncStatefulSet(foundSts, sts)

err = r.Client.Update(ctx, foundSts)
if err != nil {
return err
}
s.Status.Replicas = foundSts.Status.Replicas
s.Status.ReadyReplicas = foundSts.Status.ReadyReplicas

readySize := foundSts.Status.ReadyReplicas
newSize := *sts.Spec.Replicas
if readySize != newSize {
s.Status.Synchronized = false
}
if readySize == newSize && !s.Status.Synchronized {
if err = seata.SyncRaftCluster(ctx, s); err != nil {
logger.Error(err, "Failed to synchronize the raft cluster")
s.Status.Synchronized = false
} else {
logger.Info("Successfully synchronized the raft cluster")
s.Status.Synchronized = true
}
}
return r.Client.Status().Update(ctx, s)
}

// SetupWithManager sets up the controller with the Manager.
func (r *SeataServerReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&seatav1alpha1.SeataServer{}).
Owns(&appsv1.StatefulSet{}).
Owns(&apiv1.Service{}).
WithEventFilter(predicate.GenerationChangedPredicate{}).
Complete(r)
}

Expand All @@ -113,20 +211,3 @@ func setupDefaults(s *seatav1alpha1.SeataServer) {
s.Spec.Ports.RaftPort = 9091
}
}

func createOrUpdate(ctx context.Context, r *SeataServerReconciler, kind string, object client.Object,
f controllerutil.MutateFn, logger logr.Logger) error {
key := client.ObjectKeyFromObject(object)
status, err := controllerutil.CreateOrUpdate(ctx, r.Client, object, f)
if err != nil {
logger.Error(err, fmt.Sprintf("Failed to createOrUpdate object {%s:%v}", kind, key))
return err
}

logger.Info(fmt.Sprintf("createOrUpdate object {%s:%v} : %s", kind, key, status))
return nil
}

func makeLabels(name string) map[string]string {
return map[string]string{"cr_name": name}
}
47 changes: 0 additions & 47 deletions controllers/service.go

This file was deleted.

Loading

0 comments on commit fe4f87a

Please sign in to comment.