Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Use pluginCore LoadPlugin
Browse files Browse the repository at this point in the history
Signed-off-by: Filipe Regadas <[email protected]>
  • Loading branch information
regadas committed Mar 24, 2021
1 parent 5340d24 commit c59294e
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 31 deletions.
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,7 @@ require (
sigs.k8s.io/controller-runtime v0.8.2
)

replace github.com/aws/amazon-sagemaker-operator-for-k8s => github.com/aws/amazon-sagemaker-operator-for-k8s v1.0.1-0.20210303003444-0fb33b1fd49d
replace (
github.com/aws/amazon-sagemaker-operator-for-k8s => github.com/aws/amazon-sagemaker-operator-for-k8s v1.0.1-0.20210303003444-0fb33b1fd49d
github.com/flyteorg/flyteplugins => ../flyteplugins
)
4 changes: 0 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ github.com/Azure/go-autorest/logger v0.2.0/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZ
github.com/Azure/go-autorest/tracing v0.5.0/go.mod h1:r/s2XiOKccPW3HrqB+W0TQzfbtp2fGCgRFtBroKn4Dk=
github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUMfuitfgcfuo=
github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/DiSiqueira/GoTree v1.0.1-0.20180907134536-53a8e837f295 h1:xJ0dAkuxJXfwdH7IaSzBEbSQxEDz36YUmt7+CB4zoNA=
Expand Down Expand Up @@ -234,8 +233,6 @@ github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4
github.com/flyteorg/flyteidl v0.18.17/go.mod h1:b5Fq4Z8a5b0mF6pEwTd48ufvikUGVkWSjZiMT0ZtqKI=
github.com/flyteorg/flyteidl v0.18.24 h1:Y4+y/tu6Qsb3jNXxuVsflycfSocfthUi6XsMgJTfGuc=
github.com/flyteorg/flyteidl v0.18.24/go.mod h1:b5Fq4Z8a5b0mF6pEwTd48ufvikUGVkWSjZiMT0ZtqKI=
github.com/flyteorg/flyteplugins v0.5.38 h1:xAQ1J23cRxzwNDgzbmRuuvflq2PFetntRCjuM5RBfTw=
github.com/flyteorg/flyteplugins v0.5.38/go.mod h1:CxerBGWWEmNYmPxSMHnwQEr9cc1Fbo/g5fcABazU6Jo=
github.com/flyteorg/flytestdlib v0.3.13 h1:5ioA/q3ixlyqkFh5kDaHgmPyTP/AHtqq1K/TIbVLUzM=
github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTgWiPvpYe4Xau31I0PRk=
Expand Down Expand Up @@ -1230,7 +1227,6 @@ honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
honnef.co/go/tools v0.0.1-2020.1.4 h1:UoveltGrhghAA7ePc+e+QYDHXrBps2PqFZiHkGR/xK8=
honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
k8s.io/api v0.0.0-20210217171935-8e2decd92398/go.mod h1:60tmSUpHxGPFerNHbo/ayI2lKxvtrhbxFyXuEIWJd78=
k8s.io/api v0.18.2/go.mod h1:SJCWI7OLzhZSvbY7U8zwNl9UA4o1fizoug34OV/2r78=
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/nodes/task/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func (t *Handler) Setup(ctx context.Context, sCtx handler.SetupContext) error {
sCtxFinal := newNameSpacedSetupCtx(
tSCtx, newResourceManagerBuilder.GetResourceRegistrar(pluginResourceNamespacePrefix))
logger.Infof(ctx, "Loading Plugin [%s] ENABLED", p.ID)
cp, err := p.LoadPlugin(ctx, sCtxFinal)
cp, err := pluginCore.LoadPlugin(ctx, sCtxFinal, p)
if err != nil {
return regErrors.Wrapf(err, "failed to load plugin - %s", p.ID)
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/controller/nodes/task/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,16 +106,20 @@ func Test_task_Setup(t *testing.T) {
corePluginType := "core"
corePlugin := &pluginCoreMocks.Plugin{}
corePlugin.On("GetID").Return(corePluginType)
corePlugin.OnGetProperties().Return(pluginCore.PluginProperties{})

corePluginDefaultType := "coredefault"
corePluginDefault := &pluginCoreMocks.Plugin{}
corePluginDefault.On("GetID").Return(corePluginDefaultType)
corePluginDefault.OnGetProperties().Return(pluginCore.PluginProperties{})

k8sPluginType := "k8s"
k8sPlugin := &pluginK8sMocks.Plugin{}
k8sPlugin.OnGetProperties().Return(pluginK8s.PluginProperties{})

k8sPluginDefaultType := "k8sdefault"
k8sPluginDefault := &pluginK8sMocks.Plugin{}
k8sPluginDefault.OnGetProperties().Return(pluginK8s.PluginProperties{})

corePluginEntry := pluginCore.PluginEntry{
ID: corePluginType,
Expand Down
31 changes: 13 additions & 18 deletions pkg/controller/nodes/task/k8s/plugin_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,8 @@ type PluginManager struct {
kubeClient pluginsCore.KubeClient
metrics PluginMetrics
// Per namespace-resource
backOffController *backoff.Controller
resourceLevelMonitor *ResourceLevelMonitor
disableInjectOwnerReferences bool
disableInjectFinalizer bool
GeneratedNameMaxLength *int
backOffController *backoff.Controller
resourceLevelMonitor *ResourceLevelMonitor
}

func (e *PluginManager) AddObjectMetadata(taskCtx pluginsCore.TaskExecutionMetadata, o client.Object, cfg *config.K8sPluginConfig) {
Expand All @@ -110,19 +107,20 @@ func (e *PluginManager) AddObjectMetadata(taskCtx pluginsCore.TaskExecutionMetad
o.SetLabels(utils.UnionMaps(o.GetLabels(), utils.CopyMap(taskCtx.GetLabels()), cfg.DefaultLabels))
o.SetName(taskCtx.GetTaskExecutionID().GetGeneratedName())

if !e.disableInjectOwnerReferences {
if !e.plugin.GetProperties().DisableInjectOwnerReferences {
o.SetOwnerReferences([]metav1.OwnerReference{taskCtx.GetOwnerReference()})
}

if cfg.InjectFinalizer && !e.disableInjectFinalizer {
if cfg.InjectFinalizer && !e.plugin.GetProperties().DisableInjectFinalizer {
f := append(o.GetFinalizers(), finalizer)
o.SetFinalizers(f)
}
}

func (e *PluginManager) GetProperties() pluginsCore.PluginProperties {
props := e.plugin.GetProperties()
return pluginsCore.PluginProperties{
GeneratedNameMaxLength: e.GeneratedNameMaxLength,
GeneratedNameMaxLength: props.GeneratedNameMaxLength,
}
}

Expand Down Expand Up @@ -435,7 +433,7 @@ func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry
}

workflowParentPredicate := func(o metav1.Object) bool {
if entry.DisableInjectOwnerReferences {
if entry.Plugin.GetProperties().DisableInjectOwnerReferences {
return true
}

Expand Down Expand Up @@ -529,15 +527,12 @@ func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry
rm.RunCollectorOnce(ctx)

return &PluginManager{
id: entry.ID,
plugin: entry.Plugin,
resourceToWatch: entry.ResourceToWatch,
metrics: newPluginMetrics(metricsScope),
kubeClient: kubeClient,
resourceLevelMonitor: rm,
disableInjectOwnerReferences: entry.DisableInjectOwnerReferences,
disableInjectFinalizer: entry.DisableInjectFinalizer,
GeneratedNameMaxLength: entry.GeneratedNameMaxLength,
id: entry.ID,
plugin: entry.Plugin,
resourceToWatch: entry.ResourceToWatch,
metrics: newPluginMetrics(metricsScope),
kubeClient: kubeClient,
resourceLevelMonitor: rm,
}, nil
}

Expand Down
29 changes: 25 additions & 4 deletions pkg/controller/nodes/task/k8s/plugin_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ func (e extendedFakeClient) Delete(ctx context.Context, obj client.Object, opts
type k8sSampleHandler struct {
}

func (k8sSampleHandler) GetProperties() k8s.PluginProperties {
panic("implement me")
}

func (k8sSampleHandler) BuildResource(ctx context.Context, taskCtx pluginsCore.TaskExecutionContext) (client.Object, error) {
panic("implement me")
}
Expand Down Expand Up @@ -208,6 +212,7 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) {
tCtx := getMockTaskContext(PluginPhaseNotStarted, PluginPhaseStarted)
// common setup code
mockResourceHandler := &pluginsk8sMock.Plugin{}
mockResourceHandler.OnGetProperties().Return(k8s.PluginProperties{})
mockResourceHandler.OnBuildResourceMatch(mock.Anything, mock.Anything).Return(&v1.Pod{}, nil)
fakeClient := fake.NewClientBuilder().WithRuntimeObjects().Build()
pluginManager, err := NewPluginManager(ctx, dummySetupContext(fakeClient), k8s.PluginEntry{
Expand Down Expand Up @@ -236,6 +241,7 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) {
tctx := getMockTaskContext(PluginPhaseNotStarted, PluginPhaseStarted)
// common setup code
mockResourceHandler := &pluginsk8sMock.Plugin{}
mockResourceHandler.OnGetProperties().Return(k8s.PluginProperties{})
mockResourceHandler.OnBuildResourceMatch(mock.Anything, mock.Anything).Return(&v1.Pod{}, nil)
fakeClient := fake.NewClientBuilder().WithRuntimeObjects().Build()
pluginManager, err := NewPluginManager(ctx, dummySetupContext(fakeClient), k8s.PluginEntry{
Expand Down Expand Up @@ -266,6 +272,7 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) {
tctx := getMockTaskContext(PluginPhaseNotStarted, PluginPhaseNotStarted)
// common setup code
mockResourceHandler := &pluginsk8sMock.Plugin{}
mockResourceHandler.OnGetProperties().Return(k8s.PluginProperties{})
mockResourceHandler.OnBuildResourceMatch(mock.Anything, mock.Anything).Return(&v1.Pod{}, nil)
fakeClient := extendedFakeClient{
Client: fake.NewClientBuilder().WithRuntimeObjects().Build(),
Expand Down Expand Up @@ -299,6 +306,7 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) {
tctx := getMockTaskContext(PluginPhaseNotStarted, PluginPhaseNotStarted)
// common setup code
mockResourceHandler := &pluginsk8sMock.Plugin{}
mockResourceHandler.OnGetProperties().Return(k8s.PluginProperties{})
mockResourceHandler.OnBuildResourceMatch(mock.Anything, mock.Anything).Return(&v1.Pod{}, nil)
fakeClient := extendedFakeClient{
Client: fake.NewClientBuilder().WithRuntimeObjects().Build(),
Expand Down Expand Up @@ -330,6 +338,7 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) {
tctx := getMockTaskContext(PluginPhaseNotStarted, PluginPhaseNotStarted)
// Creating a mock k8s plugin
mockResourceHandler := &pluginsk8sMock.Plugin{}
mockResourceHandler.OnGetProperties().Return(k8s.PluginProperties{})
mockResourceHandler.OnBuildResourceMatch(mock.Anything, mock.Anything).Return(&v1.Pod{
TypeMeta: metav1.TypeMeta{
Kind: flytek8s.PodKind,
Expand Down Expand Up @@ -397,6 +406,7 @@ func TestPluginManager_Abort(t *testing.T) {

// common setup code
mockResourceHandler := &pluginsk8sMock.Plugin{}
mockResourceHandler.OnGetProperties().Return(k8s.PluginProperties{})
mockResourceHandler.OnBuildIdentityResourceMatch(mock.Anything, tctx.TaskExecutionMetadata()).Return(&v1.Pod{}, nil)
mockResourceHandler.OnGetTaskPhaseMatch(mock.Anything, mock.Anything, mock.Anything).Return(pluginsCore.PhaseInfo{}, nil)
pluginManager, err := NewPluginManager(ctx, dummySetupContext(fc), k8s.PluginEntry{
Expand All @@ -417,6 +427,7 @@ func TestPluginManager_Abort(t *testing.T) {
fc := extendedFakeClient{Client: fake.NewFakeClientWithScheme(scheme.Scheme)}
// common setup code
mockResourceHandler := &pluginsk8sMock.Plugin{}
mockResourceHandler.OnGetProperties().Return(k8s.PluginProperties{})
mockResourceHandler.OnBuildIdentityResourceMatch(mock.Anything, tctx.TaskExecutionMetadata()).Return(&v1.Pod{}, nil)
mockResourceHandler.OnGetTaskPhaseMatch(mock.Anything, mock.Anything, mock.Anything).Return(pluginsCore.PhaseInfo{}, nil)
pluginManager, err := NewPluginManager(ctx, dummySetupContext(fc), k8s.PluginEntry{
Expand Down Expand Up @@ -538,6 +549,7 @@ func TestPluginManager_Handle_CheckResourceStatus(t *testing.T) {
fc := tt.args.fakeClient()
// common setup code
mockResourceHandler := &pluginsk8sMock.Plugin{}
mockResourceHandler.OnGetProperties().Return(k8s.PluginProperties{})
mockResourceHandler.On("BuildIdentityResource", mock.Anything, tctx.TaskExecutionMetadata()).Return(&v1.Pod{}, nil)
mockResourceHandler.On("GetTaskPhase", mock.Anything, mock.Anything, mock.Anything).Return(tt.args.getTaskPhaseCB())
pluginManager, err := NewPluginManager(ctx, dummySetupContext(fc), k8s.PluginEntry{
Expand Down Expand Up @@ -573,6 +585,7 @@ func TestPluginManager_CustomKubeClient(t *testing.T) {
tctx := getMockTaskContext(PluginPhaseNotStarted, PluginPhaseStarted)
// common setup code
mockResourceHandler := &pluginsk8sMock.Plugin{}
mockResourceHandler.OnGetProperties().Return(k8s.PluginProperties{})
mockResourceHandler.On("BuildResource", mock.Anything, tctx).Return(&v1.Pod{}, nil)
fakeClient := fake.NewClientBuilder().Build()
newFakeClient := &pluginsCoreMock.KubeClient{}
Expand Down Expand Up @@ -601,7 +614,9 @@ func TestPluginManager_AddObjectMetadata(t *testing.T) {

t.Run("default", func(t *testing.T) {
o := &v1.Pod{}
pluginManager := PluginManager{}
p := pluginsk8sMock.Plugin{}
p.OnGetProperties().Return(k8s.PluginProperties{})
pluginManager := PluginManager{plugin: &p}
pluginManager.AddObjectMetadata(tm, o, cfg)
assert.Equal(t, genName, o.GetName())
assert.Equal(t, []v12.OwnerReference{or}, o.GetOwnerReferences())
Expand All @@ -615,7 +630,9 @@ func TestPluginManager_AddObjectMetadata(t *testing.T) {
})

t.Run("Disable OwnerReferences injection", func(t *testing.T) {
pluginManager := PluginManager{disableInjectOwnerReferences: true}
p := pluginsk8sMock.Plugin{}
p.OnGetProperties().Return(k8s.PluginProperties{DisableInjectOwnerReferences: true})
pluginManager := PluginManager{plugin: &p}
o := &v1.Pod{}
pluginManager.AddObjectMetadata(tm, o, cfg)
assert.Equal(t, genName, o.GetName())
Expand All @@ -631,7 +648,9 @@ func TestPluginManager_AddObjectMetadata(t *testing.T) {
})

t.Run("Disable enabled InjectFinalizer", func(t *testing.T) {
pluginManager := PluginManager{disableInjectFinalizer: true}
p := pluginsk8sMock.Plugin{}
p.OnGetProperties().Return(k8s.PluginProperties{DisableInjectFinalizer: true})
pluginManager := PluginManager{plugin: &p}
// enable finalizer injection
cfg.InjectFinalizer = true
o := &v1.Pod{}
Expand All @@ -649,7 +668,9 @@ func TestPluginManager_AddObjectMetadata(t *testing.T) {
})

t.Run("Disable disabled InjectFinalizer", func(t *testing.T) {
pluginManager := PluginManager{disableInjectFinalizer: true}
p := pluginsk8sMock.Plugin{}
p.OnGetProperties().Return(k8s.PluginProperties{DisableInjectFinalizer: true})
pluginManager := PluginManager{plugin: &p}
// disable finalizer injection
cfg.InjectFinalizer = false
o := &v1.Pod{}
Expand Down
3 changes: 0 additions & 3 deletions pkg/controller/nodes/task/taskexec_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,6 @@ func (t *Handler) newTaskExecutionContext(ctx context.Context, nCtx handler.Node
length := IDMaxLength
if l := plugin.GetProperties().GeneratedNameMaxLength; l != nil {
length = *l
if length <= 0 {
return nil, errors.Errorf(errors.RuntimeExecutionError, nCtx.NodeID(), "GeneratedNameMaxLength shoud greater then 0")
}
}

uniqueID, err := utils.FixedLengthUniqueIDForParts(length, nCtx.NodeExecutionMetadata().GetOwnerID().Name, currentNodeUniqueID, strconv.Itoa(int(id.RetryAttempt)))
Expand Down

0 comments on commit c59294e

Please sign in to comment.