Skip to content

Commit

Permalink
Merge remote-tracking branch 'monorepo-flytepropeller/master' into mo…
Browse files Browse the repository at this point in the history
…norepo--bump-flytepropeller-to-v1.1.129

Signed-off-by: Eduardo Apolinario <[email protected]>
  • Loading branch information
eapolinario committed Oct 2, 2023
2 parents b35cc95 + bb053a7 commit 57c0c4b
Show file tree
Hide file tree
Showing 17 changed files with 604 additions and 233 deletions.
10 changes: 5 additions & 5 deletions flytepropeller/pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ func getAdminClient(ctx context.Context) (client service.AdminServiceClient, sig
}

// New returns a new FlyteWorkflow controller
func New(ctx context.Context, cfg *config.Config, kubeclientset kubernetes.Interface, flytepropellerClientset clientset.Interface,
func New(ctx context.Context, cfg *config.Config, kubeClientset kubernetes.Interface, flytepropellerClientset clientset.Interface,
flyteworkflowInformerFactory informers.SharedInformerFactory, informerFactory k8sInformers.SharedInformerFactory,
kubeClient executors.Client, scope promutils.Scope) (*Controller, error) {

Expand Down Expand Up @@ -354,13 +354,13 @@ func New(ctx context.Context, cfg *config.Config, kubeclientset kubernetes.Inter
if err != nil {
return nil, errors.Wrapf(err, "Failed to create EventSink [%v], error %v", events.GetConfig(ctx).Type, err)
}
gc, err := NewGarbageCollector(cfg, scope, clock.RealClock{}, kubeclientset.CoreV1().Namespaces(), flytepropellerClientset.FlyteworkflowV1alpha1())
gc, err := NewGarbageCollector(cfg, scope, clock.RealClock{}, kubeClientset.CoreV1().Namespaces(), flytepropellerClientset.FlyteworkflowV1alpha1())
if err != nil {
logger.Errorf(ctx, "failed to initialize GC for workflows")
return nil, errors.Wrapf(err, "failed to initialize WF GC")
}

eventRecorder, err := utils.NewK8sEventRecorder(ctx, kubeclientset, controllerAgentName, cfg.PublishK8sEvents)
eventRecorder, err := utils.NewK8sEventRecorder(ctx, kubeClientset, controllerAgentName, cfg.PublishK8sEvents)
if err != nil {
logger.Errorf(ctx, "failed to event recorder %v", err)
return nil, errors.Wrapf(err, "failed to initialize resource lock.")
Expand All @@ -372,7 +372,7 @@ func New(ctx context.Context, cfg *config.Config, kubeclientset kubernetes.Inter
numWorkers: cfg.Workers,
}

lock, err := leader.NewResourceLock(kubeclientset.CoreV1(), kubeclientset.CoordinationV1(), eventRecorder, cfg.LeaderElection)
lock, err := leader.NewResourceLock(kubeClientset.CoreV1(), kubeClientset.CoordinationV1(), eventRecorder, cfg.LeaderElection)
if err != nil {
logger.Errorf(ctx, "failed to initialize resource lock.")
return nil, errors.Wrapf(err, "failed to initialize resource lock.")
Expand Down Expand Up @@ -440,7 +440,7 @@ func New(ctx context.Context, cfg *config.Config, kubeclientset kubernetes.Inter

recoveryClient := recovery.NewClient(adminClient)
nodeHandlerFactory, err := factory.NewHandlerFactory(ctx, launchPlanActor, launchPlanActor,
kubeClient, catalogClient, recoveryClient, &cfg.EventConfig, cfg.ClusterID, signalClient, scope)
kubeClient, kubeClientset, catalogClient, recoveryClient, &cfg.EventConfig, cfg.ClusterID, signalClient, scope)
if err != nil {
return nil, errors.Wrapf(err, "failed to create node handler factory")
}
Expand Down
2 changes: 2 additions & 0 deletions flytepropeller/pkg/controller/executors/dag_structure.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
)

//go:generate mockery -name DAGStructure -name DAGStructureWithStartNode -case=underscore

// An interface that captures the Directed Acyclic Graph structure in which the nodes are connected.
// If NodeLookup and DAGStructure are used together a traversal can be implemented.
type DAGStructure interface {
Expand Down
2 changes: 2 additions & 0 deletions flytepropeller/pkg/controller/executors/execution_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
)

// go:generate mockery -case=underscore

type TaskDetailsGetter interface {
GetTask(id v1alpha1.TaskID) (v1alpha1.ExecutableTask, error)
}
Expand Down
2 changes: 2 additions & 0 deletions flytepropeller/pkg/controller/executors/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

//go:generate mockery -name Client -case=underscore

// Client is a friendlier controller-runtime client that gets passed to executors
type Client interface {
// GetClient returns a client configured with the Config
Expand Down
2 changes: 2 additions & 0 deletions flytepropeller/pkg/controller/executors/node_lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
)

//go:generate mockery -name NodeLookup -case=underscore

// NodeLookup provides a structure that enables looking up all nodes within the current execution hierarchy/context.
// NOTE: execution hierarchy may change the nodes available, this is because when a SubWorkflow is being executed, only
// the nodes within the subworkflow are visible
Expand Down
2 changes: 2 additions & 0 deletions flytepropeller/pkg/controller/executors/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
)

//go:generate mockery -name Workflow -case=underscore

type Workflow interface {
Initialize(ctx context.Context) error
HandleFlyteWorkflow(ctx context.Context, w *v1alpha1.FlyteWorkflow) error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package factory
import (
"context"

"k8s.io/client-go/kubernetes"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/service"

"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/catalog"
Expand Down Expand Up @@ -33,6 +35,7 @@ type handlerFactory struct {
workflowLauncher launchplan.Executor
launchPlanReader launchplan.Reader
kubeClient executors.Client
kubeClientset kubernetes.Interface
catalogClient catalog.Client
recoveryClient recovery.Client
eventConfig *config.EventConfig
Expand All @@ -50,7 +53,7 @@ func (f *handlerFactory) GetHandler(kind v1alpha1.NodeKind) (interfaces.NodeHand
}

func (f *handlerFactory) Setup(ctx context.Context, executor interfaces.Node, setup interfaces.SetupContext) error {
t, err := task.New(ctx, f.kubeClient, f.catalogClient, f.eventConfig, f.clusterID, f.scope)
t, err := task.New(ctx, f.kubeClient, f.kubeClientset, f.catalogClient, f.eventConfig, f.clusterID, f.scope)
if err != nil {
return err
}
Expand Down Expand Up @@ -79,13 +82,14 @@ func (f *handlerFactory) Setup(ctx context.Context, executor interfaces.Node, se
}

func NewHandlerFactory(ctx context.Context, workflowLauncher launchplan.Executor, launchPlanReader launchplan.Reader,
kubeClient executors.Client, catalogClient catalog.Client, recoveryClient recovery.Client, eventConfig *config.EventConfig,
kubeClient executors.Client, kubeClientset kubernetes.Interface, catalogClient catalog.Client, recoveryClient recovery.Client, eventConfig *config.EventConfig,
clusterID string, signalClient service.SignalServiceClient, scope promutils.Scope) (interfaces.HandlerFactory, error) {

return &handlerFactory{
workflowLauncher: workflowLauncher,
launchPlanReader: launchPlanReader,
kubeClient: kubeClient,
kubeClientset: kubeClientset,
catalogClient: catalogClient,
recoveryClient: recoveryClient,
eventConfig: eventConfig,
Expand Down
20 changes: 11 additions & 9 deletions flytepropeller/pkg/controller/nodes/task/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"runtime/debug"
"time"

"k8s.io/client-go/kubernetes"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event"
Expand Down Expand Up @@ -195,6 +197,7 @@ type Handler struct {
metrics *metrics
pluginRegistry PluginRegistryIface
kubeClient pluginCore.KubeClient
kubeClientset kubernetes.Interface
secretManager pluginCore.SecretManager
resourceManager resourcemanager.BaseResourceManager
cfg *config.Config
Expand Down Expand Up @@ -229,7 +232,7 @@ func (t *Handler) Setup(ctx context.Context, sCtx interfaces.SetupContext) error

// Create the resource negotiator here
// and then convert it to proxies later and pass them to plugins
enabledPlugins, defaultForTaskTypes, err := WranglePluginsAndGenerateFinalList(ctx, &t.cfg.TaskPlugins, t.pluginRegistry)
enabledPlugins, defaultForTaskTypes, err := WranglePluginsAndGenerateFinalList(ctx, &t.cfg.TaskPlugins, t.pluginRegistry, t.kubeClientset)
if err != nil {
logger.Errorf(ctx, "Failed to finalize enabled plugins. Error: %s", err)
return err
Expand Down Expand Up @@ -533,13 +536,6 @@ func (t Handler) Handle(ctx context.Context, nCtx interfaces.NodeExecutionContex
ts := nCtx.NodeStateReader().GetTaskNodeState()

pluginTrns := &pluginRequestedTransition{}
defer func() {
// increment parallelism if the final pluginTrns is not in a terminal state
if pluginTrns != nil && !pluginTrns.pInfo.Phase().IsTerminal() {
eCtx := nCtx.ExecutionContext()
logger.Infof(ctx, "Parallelism now set to [%d].", eCtx.IncrementParallelism())
}
}()

// We will start with the assumption that catalog is disabled
pluginTrns.PopulateCacheInfo(catalog.NewFailedCatalogEntry(catalog.NewStatus(core.CatalogCacheStatus_CACHE_DISABLED, nil)))
Expand Down Expand Up @@ -664,6 +660,10 @@ func (t Handler) Handle(ctx context.Context, nCtx interfaces.NodeExecutionContex
return handler.UnknownTransition, err
}

// increment parallelism if the final pluginTrns is not in a terminal state
if pluginTrns != nil && !pluginTrns.pInfo.Phase().IsTerminal() {
logger.Infof(ctx, "Parallelism now set to [%d].", nCtx.ExecutionContext().IncrementParallelism())
}
return pluginTrns.FinalTransition(ctx)
}

Expand Down Expand Up @@ -840,7 +840,8 @@ func (t Handler) Finalize(ctx context.Context, nCtx interfaces.NodeExecutionCont
}()
}

func New(ctx context.Context, kubeClient executors.Client, client catalog.Client, eventConfig *controllerConfig.EventConfig, clusterID string, scope promutils.Scope) (*Handler, error) {
func New(ctx context.Context, kubeClient executors.Client, kubeClientset kubernetes.Interface, client catalog.Client,
eventConfig *controllerConfig.EventConfig, clusterID string, scope promutils.Scope) (*Handler, error) {
// TODO New should take a pointer
async, err := catalog.NewAsyncClient(client, *catalog.GetConfig(), scope.NewSubScope("async_catalog"))
if err != nil {
Expand All @@ -866,6 +867,7 @@ func New(ctx context.Context, kubeClient executors.Client, client catalog.Client
},
pluginScope: scope.NewSubScope("plugin"),
kubeClient: kubeClient,
kubeClientset: kubeClientset,
catalog: client,
asyncCatalog: async,
resourceManager: nil,
Expand Down
51 changes: 26 additions & 25 deletions flytepropeller/pkg/controller/nodes/task/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,50 +6,48 @@ import (
"fmt"
"testing"

"github.com/flyteorg/flyteidl/clients/go/coreutils"
"github.com/golang/protobuf/proto"

eventsErr "github.com/flyteorg/flyte/flytepropeller/events/errors"

pluginK8sMocks "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/k8s/mocks"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
v1 "k8s.io/api/core/v1"
v12 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
k8sfake "k8s.io/client-go/kubernetes/fake"

"github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/flyteorg/flyteidl/clients/go/coreutils"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"

"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/ioutils"

"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/task/resourcemanager"

"github.com/flyteorg/flyte/flytestdlib/contextutils"
"github.com/flyteorg/flyte/flytestdlib/promutils/labeled"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event"

"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery"
pluginCatalogMocks "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/catalog/mocks"
pluginCore "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core"
pluginCoreMocks "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core/mocks"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/io"
ioMocks "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/io/mocks"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/ioutils"
pluginK8s "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/k8s"
controllerConfig "github.com/flyteorg/flyte/flytepropeller/pkg/controller/config"
"github.com/flyteorg/flyte/flytestdlib/promutils"
"github.com/flyteorg/flyte/flytestdlib/storage"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
v1 "k8s.io/api/core/v1"
v12 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
pluginK8sMocks "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/k8s/mocks"

eventsErr "github.com/flyteorg/flyte/flytepropeller/events/errors"
"github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
flyteMocks "github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks"
controllerConfig "github.com/flyteorg/flyte/flytepropeller/pkg/controller/config"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/executors/mocks"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/handler"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/interfaces"
nodeMocks "github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/interfaces/mocks"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/task/codex"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/task/config"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/task/fakeplugins"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/task/resourcemanager"
rmConfig "github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/task/resourcemanager/config"

"github.com/flyteorg/flyte/flytestdlib/contextutils"
"github.com/flyteorg/flyte/flytestdlib/promutils"
"github.com/flyteorg/flyte/flytestdlib/promutils/labeled"
"github.com/flyteorg/flyte/flytestdlib/storage"
)

var eventConfig = &controllerConfig.EventConfig{
Expand Down Expand Up @@ -242,12 +240,13 @@ func Test_task_Setup(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
sCtx := &nodeMocks.SetupContext{}
fakeKubeClient := mocks.NewFakeKubeClient()
mockClientset := k8sfake.NewSimpleClientset()
sCtx.On("KubeClient").Return(fakeKubeClient)
sCtx.On("OwnerKind").Return("test")
sCtx.On("EnqueueOwner").Return(pluginCore.EnqueueOwner(func(name types.NamespacedName) error { return nil }))
sCtx.On("MetricsScope").Return(promutils.NewTestScope())

tk, err := New(context.TODO(), mocks.NewFakeKubeClient(), &pluginCatalogMocks.Client{}, eventConfig, testClusterID, promutils.NewTestScope())
tk, err := New(context.TODO(), fakeKubeClient, mockClientset, &pluginCatalogMocks.Client{}, eventConfig, testClusterID, promutils.NewTestScope())
tk.cfg.TaskPlugins.EnabledPlugins = tt.enabledPlugins
tk.cfg.TaskPlugins.DefaultForTaskTypes = tt.defaultForTaskTypes
assert.NoError(t, err)
Expand Down Expand Up @@ -1226,7 +1225,8 @@ func Test_task_Finalize(t *testing.T) {

catalog := &pluginCatalogMocks.Client{}
m := tt.fields.defaultPluginCallback()
tk, err := New(context.TODO(), mocks.NewFakeKubeClient(), catalog, eventConfig, testClusterID, promutils.NewTestScope())
mockClientset := k8sfake.NewSimpleClientset()
tk, err := New(context.TODO(), mocks.NewFakeKubeClient(), mockClientset, catalog, eventConfig, testClusterID, promutils.NewTestScope())
assert.NoError(t, err)
tk.defaultPlugin = m
tk.resourceManager = noopRm
Expand All @@ -1245,7 +1245,8 @@ func Test_task_Finalize(t *testing.T) {
}

func TestNew(t *testing.T) {
got, err := New(context.TODO(), mocks.NewFakeKubeClient(), &pluginCatalogMocks.Client{}, eventConfig, testClusterID, promutils.NewTestScope())
mockClientset := k8sfake.NewSimpleClientset()
got, err := New(context.TODO(), mocks.NewFakeKubeClient(), mockClientset, &pluginCatalogMocks.Client{}, eventConfig, testClusterID, promutils.NewTestScope())
assert.NoError(t, err)
assert.NotNil(t, got)
assert.NotNil(t, got.defaultPlugins)
Expand Down
Loading

0 comments on commit 57c0c4b

Please sign in to comment.