[K8s Array] Fix ResourceQuota usage + logs for partially running jobs (flyteorg#175)

* Fix ResourceQuota usage + logs for partially running jobs

Signed-off-by: Anmol Khurana <[email protected]>

* re-work log

Signed-off-by: Anmol Khurana <[email protected]>

* Use cache to lookup pod

Signed-off-by: Anmol Khurana <[email protected]>

* Update go/tasks/plugins/array/k8s/task.go

Co-authored-by: Haytham Abuelfutuh <[email protected]>
Signed-off-by: Anmol Khurana <[email protected]>

Co-authored-by: Haytham Abuelfutuh <[email protected]>
akhurana001 and EngHabu authored May 18, 2021
1 parent ff648a8 commit b77864e
Showing 7 changed files with 270 additions and 38 deletions.
173 changes: 173 additions & 0 deletions flyteplugins/go/tasks/pluginmachinery/core/mocks/fake_k8s_cache.go
@@ -0,0 +1,173 @@
package mocks

import (
"context"
"fmt"
"reflect"
"sync"

"sigs.k8s.io/controller-runtime/pkg/cache"

"k8s.io/apimachinery/pkg/api/meta"

"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type FakeKubeCache struct {
syncObj sync.RWMutex
Cache map[string]runtime.Object
}

func (m *FakeKubeCache) GetInformer(ctx context.Context, obj client.Object) (cache.Informer, error) {
panic("implement me")
}

func (m *FakeKubeCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (cache.Informer, error) {
panic("implement me")
}

func (m *FakeKubeCache) Start(ctx context.Context) error {
panic("implement me")
}

func (m *FakeKubeCache) WaitForCacheSync(ctx context.Context) bool {
panic("implement me")
}

func (m *FakeKubeCache) IndexField(ctx context.Context, obj client.Object, field string, extractValue client.IndexerFunc) error {
panic("implement me")
}

func (m *FakeKubeCache) Get(ctx context.Context, key client.ObjectKey, out client.Object) error {
m.syncObj.RLock()
defer m.syncObj.RUnlock()

item, found := m.Cache[formatKey(key, out.GetObjectKind().GroupVersionKind())]
if found {
// deep copy to avoid mutating cache
item = item.(runtime.Object).DeepCopyObject()
_, isUnstructured := out.(*unstructured.Unstructured)
if isUnstructured {
// Copy the value of the item in the cache to the returned value
outVal := reflect.ValueOf(out)
objVal := reflect.ValueOf(item)
if !objVal.Type().AssignableTo(outVal.Type()) {
return fmt.Errorf("cache had type %s, but %s was asked for", objVal.Type(), outVal.Type())
}
reflect.Indirect(outVal).Set(reflect.Indirect(objVal))
return nil
}

p, err := runtime.DefaultUnstructuredConverter.ToUnstructured(item)
if err != nil {
return err
}

return runtime.DefaultUnstructuredConverter.FromUnstructured(p, out)
}

return errors.NewNotFound(schema.GroupResource{}, key.Name)
}

func (m *FakeKubeCache) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error {
m.syncObj.RLock()
defer m.syncObj.RUnlock()

objs := make([]runtime.Object, 0, len(m.Cache))

listOptions := &client.ListOptions{}
for _, opt := range opts {
opt.ApplyToList(listOptions)
}

for _, val := range m.Cache {
if listOptions.Raw != nil {
if val.GetObjectKind().GroupVersionKind().Kind != listOptions.Raw.Kind {
continue
}

if val.GetObjectKind().GroupVersionKind().GroupVersion().String() != listOptions.Raw.APIVersion {
continue
}
}

objs = append(objs, val.(runtime.Object).DeepCopyObject())
}

return meta.SetList(list, objs)
}

func (m *FakeKubeCache) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) (err error) {
m.syncObj.Lock()
defer m.syncObj.Unlock()

accessor, err := meta.Accessor(obj)
if err != nil {
return err
}

key := formatKey(types.NamespacedName{
Name: accessor.GetName(),
Namespace: accessor.GetNamespace(),
}, obj.GetObjectKind().GroupVersionKind())

if _, exists := m.Cache[key]; !exists {
m.Cache[key] = obj
return nil
}

return errors.NewAlreadyExists(schema.GroupResource{}, accessor.GetName())
}

func (m *FakeKubeCache) Delete(ctx context.Context, obj client.Object, opts ...client.DeleteOption) error {
m.syncObj.Lock()
defer m.syncObj.Unlock()

accessor, err := meta.Accessor(obj)
if err != nil {
return err
}

key := formatKey(types.NamespacedName{
Name: accessor.GetName(),
Namespace: accessor.GetNamespace(),
}, obj.GetObjectKind().GroupVersionKind())

delete(m.Cache, key)

return nil
}

func (m *FakeKubeCache) Update(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error {
m.syncObj.Lock()
defer m.syncObj.Unlock()

accessor, err := meta.Accessor(obj)
if err != nil {
return err
}

key := formatKey(types.NamespacedName{
Name: accessor.GetName(),
Namespace: accessor.GetNamespace(),
}, obj.GetObjectKind().GroupVersionKind())

if _, exists := m.Cache[key]; exists {
m.Cache[key] = obj
return nil
}

return errors.NewNotFound(schema.GroupResource{}, accessor.GetName())
}

func NewFakeKubeCache() *FakeKubeCache {
return &FakeKubeCache{
syncObj: sync.RWMutex{},
Cache: map[string]runtime.Object{},
}
}
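For orientation, here is a minimal sketch of how this fake can back a plugin test. It is hypothetical (not part of the commit) and assumes the unshown formatKey helper keys entries by GroupVersionKind plus namespace/name, which is why TypeMeta is set explicitly on the typed objects.

package mocks

import (
	"context"
	"testing"

	"github.com/stretchr/testify/assert"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
)

func TestFakeKubeCacheRoundTrip(t *testing.T) {
	c := NewFakeKubeCache()

	// TypeMeta must be populated: the cache key includes the GroupVersionKind.
	pod := &v1.Pod{
		TypeMeta:   metav1.TypeMeta{Kind: "Pod", APIVersion: "v1"},
		ObjectMeta: metav1.ObjectMeta{Name: "pod-0", Namespace: "test-ns"},
	}
	assert.NoError(t, c.Create(context.TODO(), pod))

	// Get round-trips through a deep copy, so mutating `out` afterwards
	// cannot corrupt the cached object.
	out := &v1.Pod{TypeMeta: metav1.TypeMeta{Kind: "Pod", APIVersion: "v1"}}
	assert.NoError(t, c.Get(context.TODO(), types.NamespacedName{Namespace: "test-ns", Name: "pod-0"}, out))
	assert.Equal(t, "pod-0", out.GetName())
}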
9 changes: 8 additions & 1 deletion flyteplugins/go/tasks/pluginmachinery/core/phase.go
@@ -152,13 +152,20 @@ func PhaseInfoNotReady(t time.Time, version uint32, reason string) PhaseInfo {
return pi
}

// Return in the case the plugin is not ready to start
// Deprecated: Please use PhaseInfoWaitingForResourcesInfo instead
func PhaseInfoWaitingForResources(t time.Time, version uint32, reason string) PhaseInfo {
pi := phaseInfo(PhaseWaitingForResources, version, nil, &TaskInfo{OccurredAt: &t})
pi.reason = reason
return pi
}

// Return in the case the plugin is not ready to start
func PhaseInfoWaitingForResourcesInfo(t time.Time, version uint32, reason string, info *TaskInfo) PhaseInfo {
pi := phaseInfo(PhaseWaitingForResources, version, nil, info)
pi.reason = reason
return pi
}

func PhaseInfoQueued(t time.Time, version uint32, reason string) PhaseInfo {
pi := phaseInfo(PhaseQueued, version, nil, &TaskInfo{OccurredAt: &t})
pi.reason = reason
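For context on why the Info variant was added: the deprecated PhaseInfoWaitingForResources carries no TaskInfo, so an array task throttled by ResourceQuota surfaced no log links even for subtasks that were already running. A hypothetical helper showing the intended use (package name and import paths assumed, not part of the commit):

package example

import (
	"time"

	idlCore "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
	"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core"
)

// waitingWithLogs reports PhaseWaitingForResources while still surfacing
// the log links collected so far for already-running subtasks.
func waitingWithLogs(logs []*idlCore.TaskLog, reason string) core.PhaseInfo {
	now := time.Now()
	return core.PhaseInfoWaitingForResourcesInfo(now, core.DefaultPhaseVersion, reason, &core.TaskInfo{
		OccurredAt: &now,
		Logs:       logs,
	})
}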
2 changes: 1 addition & 1 deletion flyteplugins/go/tasks/plugins/array/core/state.go
@@ -200,7 +200,7 @@ func MapArrayStateToPluginPhase(_ context.Context, state *State, logLinks []*idl
phaseInfo = core.PhaseInfoRunning(version, nowTaskInfo)

case PhaseWaitingForResources:
-	phaseInfo = core.PhaseInfoWaitingForResources(t, version, state.GetReason())
+	phaseInfo = core.PhaseInfoWaitingForResourcesInfo(t, version, state.GetReason(), nowTaskInfo)

case PhaseCheckingSubTaskExecutions:
// For future Running core.Phases, we have to make sure we don't use an earlier Admin version number,
@@ -37,6 +37,7 @@ func init() {
func newMockExecutor(ctx context.Context, t testing.TB) (Executor, array.AdvanceIteration) {
kubeClient := &mocks.KubeClient{}
kubeClient.OnGetClient().Return(mocks.NewFakeKubeClient())
kubeClient.OnGetCache().Return(mocks.NewFakeKubeCache())
e, err := NewExecutor(kubeClient, &Config{
MaxErrorStringLength: 200,
OutputAssembler: workqueue.Config{
58 changes: 46 additions & 12 deletions flyteplugins/go/tasks/plugins/array/k8s/monitor.go
@@ -71,6 +71,7 @@ func LaunchAndCheckSubTasksState(ctx context.Context, tCtx core.TaskExecutionCon
existingPhase := core.Phases[existingPhaseIdx]
indexStr := strconv.Itoa(childIdx)
podName := formatSubTaskName(ctx, tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(), indexStr)

if existingPhase.IsTerminal() {
// If we get here it means we have already "processed" this terminal phase since we will only persist
// the phase after all processing is done (e.g. check outputs/errors file, record events... etc.).
@@ -83,21 +84,36 @@
}
newArrayStatus.Summary.Inc(existingPhase)
newArrayStatus.Detailed.SetItem(childIdx, bitarray.Item(existingPhase))
+ originalIdx := arrayCore.CalculateOriginalIndex(childIdx, newState.GetIndexesToCache())
+
+ phaseInfo, err := FetchPodStatusAndLogs(ctx, kubeClient,
+ k8sTypes.NamespacedName{
+ Name: podName,
+ Namespace: GetNamespaceForExecution(tCtx, config.NamespaceTemplate),
+ },
+ originalIdx,
+ tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetID().RetryAttempt,
+ logPlugin)
+
+ if err != nil {
+ return currentState, logLinks, subTaskIDs, err
+ }
+
+ if phaseInfo.Info() != nil {
+ logLinks = append(logLinks, phaseInfo.Info().Logs...)
+ }
+
- // TODO: collect log links before doing this
continue
}

task := &Task{
LogLinks: logLinks,
State: newState,
NewArrayStatus: newArrayStatus,
Config: config,
ChildIdx: childIdx,
MessageCollector: &msg,
SubTaskIDs: subTaskIDs,
}

// The first time we enter this state we will launch every subtask. On subsequent rounds, the pod
// has already been created so we return a Success value and continue with the Monitor step.
var launchResult LaunchResult
@@ -121,8 +137,11 @@
}

var monitorResult MonitorResult
- monitorResult, err = task.Monitor(ctx, tCtx, kubeClient, dataStore, outputPrefix, baseOutputDataSandbox, logPlugin)
- logLinks = task.LogLinks
+ monitorResult, taskLogs, err := task.Monitor(ctx, tCtx, kubeClient, dataStore, outputPrefix, baseOutputDataSandbox, logPlugin)
+
+ if len(taskLogs) > 0 {
+ logLinks = append(logLinks, taskLogs...)
+ }
subTaskIDs = task.SubTaskIDs

if monitorResult != MonitorSuccess {
@@ -214,20 +233,35 @@ func FetchPodStatusAndLogs(ctx context.Context, client core.KubeClient, name k8s
taskInfo.Logs = o.TaskLogs
}
}

+ var phaseInfo core.PhaseInfo
+ var err2 error
+
switch pod.Status.Phase {
case v1.PodSucceeded:
- return flytek8s.DemystifySuccess(pod.Status, taskInfo)
+ phaseInfo, err2 = flytek8s.DemystifySuccess(pod.Status, taskInfo)
case v1.PodFailed:
code, message := flytek8s.ConvertPodFailureToError(pod.Status)
- return core.PhaseInfoRetryableFailure(code, message, &taskInfo), nil
+ phaseInfo = core.PhaseInfoRetryableFailure(code, message, &taskInfo)
case v1.PodPending:
- return flytek8s.DemystifyPending(pod.Status)
+ phaseInfo, err2 = flytek8s.DemystifyPending(pod.Status)
case v1.PodUnknown:
- return core.PhaseInfoUndefined, nil
+ phaseInfo = core.PhaseInfoUndefined
+ default:
+ if len(taskInfo.Logs) > 0 {
+ phaseInfo = core.PhaseInfoRunning(core.DefaultPhaseVersion+1, &taskInfo)
+ } else {
+ phaseInfo = core.PhaseInfoRunning(core.DefaultPhaseVersion, &taskInfo)
+ }
}

- if len(taskInfo.Logs) > 0 {
- return core.PhaseInfoRunning(core.DefaultPhaseVersion+1, &taskInfo), nil
- }
- return core.PhaseInfoRunning(core.DefaultPhaseVersion, &taskInfo), nil
+ if err2 == nil && phaseInfo.Info() != nil {
+ // Append sub-job status in Log Name for viz.
+ for _, log := range phaseInfo.Info().Logs {
+ log.Name += fmt.Sprintf(" (%s)", phaseInfo.Phase().String())
+ }
+ }
+
+ return phaseInfo, err2

}
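To make the rework concrete: all pod phases now converge on a single exit, and each log link's name is suffixed with the sub-job's phase, so a link named "Kubernetes Logs #0" would become "Kubernetes Logs #0 (Running)". A hedged sketch of the annotation step in isolation (package name and example link name assumed):

package example

import (
	"fmt"

	idlCore "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
	"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core"
)

// annotate mirrors the loop in FetchPodStatusAndLogs: suffixing each link
// with the sub-job phase lets the UI distinguish running, pending, and
// failed subtasks of one map task at a glance.
func annotate(logs []*idlCore.TaskLog, phase core.Phase) {
	for _, log := range logs {
		log.Name += fmt.Sprintf(" (%s)", phase.String())
	}
}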
6 changes: 6 additions & 0 deletions flyteplugins/go/tasks/plugins/array/k8s/monitor_test.go
@@ -121,6 +121,8 @@ func TestCheckSubTasksState(t *testing.T) {
tCtx := getMockTaskExecutionContext(ctx)
kubeClient := mocks.KubeClient{}
kubeClient.OnGetClient().Return(mocks.NewFakeKubeClient())
kubeClient.OnGetCache().Return(mocks.NewFakeKubeCache())

resourceManager := mocks.ResourceManager{}
resourceManager.OnAllocateResourceMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(core.AllocationStatusExhausted, nil)
tCtx.OnResourceManager().Return(&resourceManager)
@@ -213,6 +215,8 @@ func TestCheckSubTasksStateResourceGranted(t *testing.T) {
tCtx := getMockTaskExecutionContext(ctx)
kubeClient := mocks.KubeClient{}
kubeClient.OnGetClient().Return(mocks.NewFakeKubeClient())
kubeClient.OnGetCache().Return(mocks.NewFakeKubeCache())

resourceManager := mocks.ResourceManager{}

resourceManager.OnAllocateResourceMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(core.AllocationStatusGranted, nil)
@@ -264,12 +268,14 @@ func TestCheckSubTasksStateResourceGranted(t *testing.T) {
arrayStatus.Detailed.SetItem(childIdx, bitarray.Item(core.PhaseSuccess))

}
cacheIndexes := bitarray.NewBitSet(5)
newState, _, subTaskIDs, err := LaunchAndCheckSubTasksState(ctx, tCtx, &kubeClient, &config, nil, "/prefix/", "/prefix-sand/", &arrayCore.State{
CurrentPhase: arrayCore.PhaseCheckingSubTaskExecutions,
ExecutionArraySize: 5,
OriginalArraySize: 10,
OriginalMinSuccesses: 5,
ArrayStatus: *arrayStatus,
IndexesToCache: cacheIndexes,
})

assert.Nil(t, err)
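A note on the new IndexesToCache field above: in the monitor loop, each terminal childIdx is mapped back to its original array index via arrayCore.CalculateOriginalIndex(childIdx, newState.GetIndexesToCache()) before fetching pod logs, so the test state now needs a non-nil bitset. A simplified sketch of the mapping this assumes (illustrative, not the library implementation):

package example

import "github.com/flyteorg/flytestdlib/bitarray"

// originalIndex returns the original array position of the childIdx-th
// executed subtask: when cache hits shrink the array, the set bits in
// toCache mark the original indexes that still run.
func originalIndex(childIdx int, toCache *bitarray.BitSet) int {
	seen := 0
	for i := uint(0); i < toCache.Cap(); i++ {
		if toCache.IsSet(i) {
			if seen == childIdx {
				return int(i)
			}
			seen++
		}
	}
	return -1 // childIdx exceeds the number of set bits
}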