Skip to content

Commit

Permalink
Add pipeline state to k8s resource status (#591)
Browse files: browse the repository at this point in the history
* Formatting

* Add context to logger error message

* Add pipeline status to k8s resource condition constructor

* Pass pipeline status from scheduler subscription to k8s resource status

* Set pipeline status on k8s resource on unload requests

* Add pipeline status to k8s resource on failed loads

* Organise imports into stdlib/3rd party/application

* Convert status enum to string for pipeline conditions
  • Loading branch information
agrski authored Nov 16, 2022
1 parent 67da21f commit fdc50c1
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 17 deletions.
10 changes: 8 additions & 2 deletions operator/apis/mlops/v1alpha1/pipeline_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,15 +219,21 @@ func (ps *PipelineStatus) SetCondition(conditionType apis.ConditionType, conditi
}
}

func (ps *PipelineStatus) CreateAndSetCondition(conditionType apis.ConditionType, isTrue bool, reason string) {
func (ps *PipelineStatus) CreateAndSetCondition(
conditionType apis.ConditionType,
isTrue bool,
reason scheduler.PipelineVersionState_PipelineStatus,
description string,
) {
condition := apis.Condition{}
if isTrue {
condition.Status = v1.ConditionTrue
} else {
condition.Status = v1.ConditionFalse
}
condition.Type = conditionType
condition.Reason = reason
condition.Reason = reason.String()
condition.Message = description
condition.LastTransitionTime = apis.VolatileTime{
Inner: metav1.Now(),
}
Expand Down
16 changes: 7 additions & 9 deletions operator/controllers/mlops/pipeline_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,20 @@ import (
"context"

"github.com/go-logr/logr"

"sigs.k8s.io/controller-runtime/pkg/predicate"

"github.com/seldonio/seldon-core/operatorv2/pkg/constants"
"github.com/seldonio/seldon-core/operatorv2/pkg/utils"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

mlopsv1alpha1 "github.com/seldonio/seldon-core/operatorv2/apis/mlops/v1alpha1"
"github.com/seldonio/seldon-core/operatorv2/pkg/constants"
"github.com/seldonio/seldon-core/operatorv2/pkg/utils"
scheduler "github.com/seldonio/seldon-core/operatorv2/scheduler"
schedulerAPI "github.com/seldonio/seldon-core/scheduler/apis/mlops/scheduler"
)

// PipelineReconciler reconciles a Pipeline object
Expand Down Expand Up @@ -127,7 +125,7 @@ func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
}

func (r *PipelineReconciler) updateStatusFromError(ctx context.Context, logger logr.Logger, pipeline *mlopsv1alpha1.Pipeline, err error) {
pipeline.Status.CreateAndSetCondition(mlopsv1alpha1.PipelineReady, false, err.Error())
pipeline.Status.CreateAndSetCondition(mlopsv1alpha1.PipelineReady, false, schedulerAPI.PipelineVersionState_PipelineFailed, err.Error())
if errSet := r.Status().Update(ctx, pipeline); errSet != nil {
logger.Error(errSet, "Failed to set status on pipeline on error", "pipeline", pipeline.Name, "error", err.Error())
}
Expand Down
35 changes: 29 additions & 6 deletions operator/scheduler/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,12 @@ func (s *SchedulerClient) UnloadPipeline(ctx context.Context, pipeline *v1alpha1
if err != nil {
return err, s.checkErrorRetryable(pipeline.Kind, pipeline.Name, err)
}
pipeline.Status.CreateAndSetCondition(v1alpha1.PipelineReady, false, "Pipeline terminating")
pipeline.Status.CreateAndSetCondition(
v1alpha1.PipelineReady,
false,
scheduler.PipelineVersionState_PipelineTerminating,
"Pipeline unload requested",
)
_ = s.updatePipelineStatusImpl(pipeline)
return nil, false
}
Expand All @@ -65,30 +70,45 @@ func (s *SchedulerClient) SubscribePipelineEvents(ctx context.Context) error {
if err != nil {
return err
}

for {
event, err := stream.Recv()
if err != nil {
if err == io.EOF {
break
}
logger.Error(err, "event recv failed")
logger.Error(err, "failed to receive pipeline event")
return err
}

if len(event.Versions) != 1 {
logger.Info("Unexpected number of pipeline versions", "numVersions", len(event.Versions), "pipeline", event.PipelineName)
continue
}

pv := event.Versions[0]
if pv.GetPipeline().GetKubernetesMeta() == nil {
logger.Info("Received pipeline event with no k8s metadata so ignoring", "pipeline", event.PipelineName)
continue
}
logger.Info("Received event", "pipeline", event.PipelineName, "generation", pv.GetPipeline().GetKubernetesMeta().Generation, "version", pv.GetPipeline().Version, "State", pv.GetState().String())

logger.Info(
"Received event",
"pipeline", event.PipelineName,
"generation", pv.GetPipeline().GetKubernetesMeta().Generation,
"version", pv.GetPipeline().Version,
"State", pv.GetState().String(),
)

pipeline := &v1alpha1.Pipeline{}
err = s.Get(ctx, client.ObjectKey{Name: event.PipelineName, Namespace: pv.GetPipeline().GetKubernetesMeta().GetNamespace()}, pipeline)
if err != nil {
logger.Error(err, "Failed to get pipeline", "name", event.PipelineName, "namespace", pv.GetPipeline().GetKubernetesMeta().GetNamespace())
logger.Error(
err,
"Failed to get pipeline",
"name", event.PipelineName,
"namespace", pv.GetPipeline().GetKubernetesMeta().GetNamespace(),
)
continue
}

Expand Down Expand Up @@ -124,19 +144,22 @@ func (s *SchedulerClient) SubscribePipelineEvents(ctx context.Context) error {
if err != nil {
return err
}

if pv.GetPipeline().GetKubernetesMeta().GetGeneration() != pipeline.Generation {
logger.Info("Ignoring event for old generation", "currentGeneration", pipeline.Generation, "eventGeneration", pv.GetPipeline().GetKubernetesMeta().GetGeneration(), "server", event.PipelineName)
return nil
}

// Handle status update
switch pv.State.Status {
case scheduler.PipelineVersionState_PipelineReady:
logger.Info("Setting pipeline to ready", "pipeline", event.PipelineName, "generation", pipeline.Generation)
pipeline.Status.CreateAndSetCondition(v1alpha1.PipelineReady, true, pv.State.Reason)
pipeline.Status.CreateAndSetCondition(v1alpha1.PipelineReady, true, pv.State.Status, pv.State.Reason)
default:
logger.Info("Setting pipeline to not ready", "pipeline", event.PipelineName, "generation", pipeline.Generation)
pipeline.Status.CreateAndSetCondition(v1alpha1.PipelineReady, false, pv.State.Reason)
pipeline.Status.CreateAndSetCondition(v1alpha1.PipelineReady, false, pv.State.Status, pv.State.Reason)
}

return s.updatePipelineStatusImpl(pipeline)
})
if retryErr != nil {
Expand Down

0 comments on commit fdc50c1

Please sign in to comment.