Skip to content

Commit

Permalink
Use uncached client and check that pod is in running phase before init
Browse files Browse the repository at this point in the history
  • Loading branch information
AMecea authored and calind committed Jun 4, 2019
1 parent 027537c commit 470365f
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 16 deletions.
48 changes: 34 additions & 14 deletions pkg/controller/node/node_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ var log = logf.Log.WithName(controllerName)
const controllerName = "controller.mysqlNode"

// mysqlReconciliationTimeout the time that should last a reconciliation (this is used as a MySQL timout too)
const mysqlReconciliationTimeout = 10 * time.Second
const mysqlReconciliationTimeout = 5 * time.Second

// skipGTIDPurgedAnnotations, if this annotations is set on the cluster then the node controller skip setting GTID_PURGED variable.
// this is the case for the upgrade when the old cluster has already set GTID_PURGED
Expand All @@ -65,13 +65,18 @@ func Add(mgr manager.Manager) error {

// newReconciler returns a new reconcile.Reconciler
func newReconciler(mgr manager.Manager, sqlI sqlFactoryFunc) reconcile.Reconciler {
newClient, err := client.New(mgr.GetConfig(), client.Options{Scheme: mgr.GetScheme()})
if err != nil {
panic(err)
}

return &ReconcileMysqlNode{
// TODO(amecea): use client without cache here
Client: mgr.GetClient(),
scheme: mgr.GetScheme(),
recorder: mgr.GetRecorder(controllerName),
opt: options.GetOptions(),
sqlFactory: sqlI,
Client: mgr.GetClient(),
unCachedClient: newClient,
scheme: mgr.GetScheme(),
recorder: mgr.GetRecorder(controllerName),
opt: options.GetOptions(),
sqlFactory: sqlI,
}
}

Expand Down Expand Up @@ -99,6 +104,12 @@ func isInitialized(obj runtime.Object) bool {
return false
}

func isRunning(obj runtime.Object) bool {
pod := obj.(*corev1.Pod)

return pod.Status.Phase == corev1.PodRunning
}

// add adds a new Controller to mgr with r as the reconcile.Reconciler
func add(mgr manager.Manager, r reconcile.Reconciler) error {
// Create a new controller
Expand All @@ -110,11 +121,18 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
// Watch for changes to MysqlCluster
err = c.Watch(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForObject{}, predicate.Funcs{
CreateFunc: func(evt event.CreateEvent) bool {
return isOwnedByMySQL(evt.Meta) && !isInitialized(evt.Object)
log.Info("pod create event", "event", evt)
return false
},

// trigger node initialization only on pod update, after pod is created for a while
// also the pod should not be initialized before and should be running because the init
// timeout is ~5s (see above) and the cluster status can become obsolete
UpdateFunc: func(evt event.UpdateEvent) bool {
return isOwnedByMySQL(evt.MetaNew) && !isInitialized(evt.ObjectNew)
log.Info("pod update event", "event", evt.ObjectNew)
return isOwnedByMySQL(evt.MetaNew) && !isInitialized(evt.ObjectNew) && isRunning(evt.ObjectNew)
},

DeleteFunc: func(evt event.DeleteEvent) bool {
return false
},
Expand All @@ -131,9 +149,11 @@ var _ reconcile.Reconciler = &ReconcileMysqlNode{}
// ReconcileMysqlNode reconciles a MysqlCluster object
type ReconcileMysqlNode struct {
client.Client
scheme *runtime.Scheme
recorder record.EventRecorder
opt *options.Options

unCachedClient client.Client
scheme *runtime.Scheme
recorder record.EventRecorder
opt *options.Options

sqlFactory sqlFactoryFunc
}
Expand Down Expand Up @@ -179,7 +199,7 @@ func (r *ReconcileMysqlNode) Reconcile(request reconcile.Request) (reconcile.Res
fip := cluster.GetClusterCondition(api.ClusterConditionFailoverInProgress)
if fip != nil && fip.Status == corev1.ConditionTrue {
log.Info("cluster has a failover in progress, delaying new node sync", "pod", pod.Spec.Hostname, "since", fip.LastTransitionTime)
return reconcile.Result{}, nil
return reconcile.Result{}, fmt.Errorf("delay node sync because a failover is in progress")
}

// if it's a old version cluster then don't do anything
Expand Down Expand Up @@ -288,7 +308,7 @@ func (r *ReconcileMysqlNode) getNodeCluster(ctx context.Context, pod *corev1.Pod
Namespace: pod.Namespace,
}
cluster := mysqlcluster.New(&api.MysqlCluster{})
err := r.Get(ctx, clusterKey, cluster.Unwrap())
err := r.unCachedClient.Get(ctx, clusterKey, cluster.Unwrap())
return cluster, err
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/orchestrator/orchestrator_reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,8 @@ func (ou *orcUpdater) updateStatusFromOrc(insts InstancesSet, master *orc.Instan
}

// check if the master is up to date and is not downtime to remove in progress failover condition
if master != nil && !master.IsDowntimed && master.IsUpToDate {
log.Info("cluster failover finished", "master", master.Key.Hostname)
if master != nil && !master.IsDowntimed && master.SecondsSinceLastSeen.Valid && master.SecondsSinceLastSeen.Int64 < 5 {
log.Info("cluster failover finished", "master", master)
ou.cluster.UpdateStatusCondition(api.ClusterConditionFailoverInProgress, core.ConditionFalse,
"ClusterMasterHealthy", "Master is healthy in orchestrator")
}
Expand Down

0 comments on commit 470365f

Please sign in to comment.