diff --git a/pkg/apis/scheduledworkflow/v1alpha1/types.go b/pkg/apis/scheduledworkflow/v1alpha1/types.go index d977d093d52..97abad6da9e 100644 --- a/pkg/apis/scheduledworkflow/v1alpha1/types.go +++ b/pkg/apis/scheduledworkflow/v1alpha1/types.go @@ -18,7 +18,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" "k8s.io/apimachinery/pkg/types" - api "k8s.io/kubernetes/pkg/apis/core" + "k8s.io/kubernetes/pkg/apis/core" ) // +genclient @@ -169,10 +169,10 @@ const ( ) type ScheduledWorkflowCondition struct { - // Type of job condition, Complete or Failed. + // Type of job condition. Type ScheduledWorkflowConditionType `json:"type,omitempty"` // Status of the condition, one of True, False, Unknown. - Status api.ConditionStatus `json:"status,omitempty"` + Status core.ConditionStatus `json:"status,omitempty"` // Last time the condition was checked. // +optional LastProbeTime metav1.Time `json:"lastHeartbeatTime,omitempty"` diff --git a/pkg/client/clientset/versioned/typed/schedule/v1alpha1/doc.go b/pkg/client/clientset/versioned/typed/schedule/v1alpha1/doc.go deleted file mode 100644 index 7c6f02e5302..00000000000 --- a/pkg/client/clientset/versioned/typed/schedule/v1alpha1/doc.go +++ /dev/null @@ -1,18 +0,0 @@ -// Copyright 2018 The Kubeflow Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Code generated by client-gen. DO NOT EDIT. - -// This package has the automatically generated typed clients. -package v1alpha1 diff --git a/pkg/client/clientset/versioned/typed/schedule/v1alpha1/fake/fake_schedule.go b/pkg/client/clientset/versioned/typed/schedule/v1alpha1/fake/fake_schedule.go deleted file mode 100644 index bc719b10cc7..00000000000 --- a/pkg/client/clientset/versioned/typed/schedule/v1alpha1/fake/fake_schedule.go +++ /dev/null @@ -1,126 +0,0 @@ -// Copyright 2018 The Kubeflow Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Code generated by client-gen. DO NOT EDIT. 
- -package fake - -import ( - v1alpha1 "github.com/kubeflow/pipelines/pkg/apis/scheduledworkflow/v1alpha1" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - labels "k8s.io/apimachinery/pkg/labels" - schema "k8s.io/apimachinery/pkg/runtime/schema" - types "k8s.io/apimachinery/pkg/types" - watch "k8s.io/apimachinery/pkg/watch" - testing "k8s.io/client-go/testing" -) - -// FakeSchedules implements ScheduleInterface -type FakeSchedules struct { - Fake *FakeScheduleV1alpha1 - ns string -} - -var schedulesResource = schema.GroupVersionResource{Group: "schedule.kubeflow.org", Version: "v1alpha1", Resource: "schedules"} - -var schedulesKind = schema.GroupVersionKind{Group: "schedule.kubeflow.org", Version: "v1alpha1", Kind: "ScheduledWorkflow"} - -// Get takes name of the schedule, and returns the corresponding schedule object, and an error if there is any. -func (c *FakeSchedules) Get(name string, options v1.GetOptions) (result *v1alpha1.ScheduledWorkflow, err error) { - obj, err := c.Fake. - Invokes(testing.NewGetAction(schedulesResource, c.ns, name), &v1alpha1.ScheduledWorkflow{}) - - if obj == nil { - return nil, err - } - return obj.(*v1alpha1.ScheduledWorkflow), err -} - -// List takes label and field selectors, and returns the list of Schedules that match those selectors. -func (c *FakeSchedules) List(opts v1.ListOptions) (result *v1alpha1.ScheduledWorkflowList, err error) { - obj, err := c.Fake. - Invokes(testing.NewListAction(schedulesResource, schedulesKind, c.ns, opts), &v1alpha1.ScheduledWorkflowList{}) - - if obj == nil { - return nil, err - } - - label, _, _ := testing.ExtractFromListOptions(opts) - if label == nil { - label = labels.Everything() - } - list := &v1alpha1.ScheduledWorkflowList{ListMeta: obj.(*v1alpha1.ScheduledWorkflowList).ListMeta} - for _, item := range obj.(*v1alpha1.ScheduledWorkflowList).Items { - if label.Matches(labels.Set(item.Labels)) { - list.Items = append(list.Items, item) - } - } - return list, err -} - -// Watch returns a watch.Interface that watches the requested schedules. -func (c *FakeSchedules) Watch(opts v1.ListOptions) (watch.Interface, error) { - return c.Fake. - InvokesWatch(testing.NewWatchAction(schedulesResource, c.ns, opts)) - -} - -// Create takes the representation of a schedule and creates it. Returns the server's representation of the schedule, and an error, if there is any. -func (c *FakeSchedules) Create(schedule *v1alpha1.ScheduledWorkflow) (result *v1alpha1.ScheduledWorkflow, err error) { - obj, err := c.Fake. - Invokes(testing.NewCreateAction(schedulesResource, c.ns, schedule), &v1alpha1.ScheduledWorkflow{}) - - if obj == nil { - return nil, err - } - return obj.(*v1alpha1.ScheduledWorkflow), err -} - -// Update takes the representation of a schedule and updates it. Returns the server's representation of the schedule, and an error, if there is any. -func (c *FakeSchedules) Update(schedule *v1alpha1.ScheduledWorkflow) (result *v1alpha1.ScheduledWorkflow, err error) { - obj, err := c.Fake. - Invokes(testing.NewUpdateAction(schedulesResource, c.ns, schedule), &v1alpha1.ScheduledWorkflow{}) - - if obj == nil { - return nil, err - } - return obj.(*v1alpha1.ScheduledWorkflow), err -} - -// Delete takes name of the schedule and deletes it. Returns an error if one occurs. -func (c *FakeSchedules) Delete(name string, options *v1.DeleteOptions) error { - _, err := c.Fake. - Invokes(testing.NewDeleteAction(schedulesResource, c.ns, name), &v1alpha1.ScheduledWorkflow{}) - - return err -} - -// DeleteCollection deletes a collection of objects. 
-func (c *FakeSchedules) DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error { - action := testing.NewDeleteCollectionAction(schedulesResource, c.ns, listOptions) - - _, err := c.Fake.Invokes(action, &v1alpha1.ScheduledWorkflowList{}) - return err -} - -// Patch applies the patch and returns the patched schedule. -func (c *FakeSchedules) Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1alpha1.ScheduledWorkflow, err error) { - obj, err := c.Fake. - Invokes(testing.NewPatchSubresourceAction(schedulesResource, c.ns, name, data, subresources...), &v1alpha1.ScheduledWorkflow{}) - - if obj == nil { - return nil, err - } - return obj.(*v1alpha1.ScheduledWorkflow), err -} diff --git a/pkg/client/clientset/versioned/typed/schedule/v1alpha1/schedule.go b/pkg/client/clientset/versioned/typed/schedule/v1alpha1/schedule.go deleted file mode 100644 index 4bf7a1de50c..00000000000 --- a/pkg/client/clientset/versioned/typed/schedule/v1alpha1/schedule.go +++ /dev/null @@ -1,155 +0,0 @@ -// Copyright 2018 The Kubeflow Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Code generated by client-gen. DO NOT EDIT. - -package v1alpha1 - -import ( - v1alpha1 "github.com/kubeflow/pipelines/pkg/apis/scheduledworkflow/v1alpha1" - scheme "github.com/kubeflow/pipelines/pkg/client/clientset/versioned/scheme" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - types "k8s.io/apimachinery/pkg/types" - watch "k8s.io/apimachinery/pkg/watch" - rest "k8s.io/client-go/rest" -) - -// SchedulesGetter has a method to return a ScheduleInterface. -// A group's client should implement this interface. -type SchedulesGetter interface { - Schedules(namespace string) ScheduleInterface -} - -// ScheduleInterface has methods to work with ScheduledWorkflow resources. -type ScheduleInterface interface { - Create(*v1alpha1.ScheduledWorkflow) (*v1alpha1.ScheduledWorkflow, error) - Update(*v1alpha1.ScheduledWorkflow) (*v1alpha1.ScheduledWorkflow, error) - Delete(name string, options *v1.DeleteOptions) error - DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error - Get(name string, options v1.GetOptions) (*v1alpha1.ScheduledWorkflow, error) - List(opts v1.ListOptions) (*v1alpha1.ScheduledWorkflowList, error) - Watch(opts v1.ListOptions) (watch.Interface, error) - Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1alpha1.ScheduledWorkflow, err error) - ScheduleExpansion -} - -// schedules implements ScheduleInterface -type schedules struct { - client rest.Interface - ns string -} - -// newSchedules returns a Schedules -func newSchedules(c *ScheduleV1alpha1Client, namespace string) *schedules { - return &schedules{ - client: c.RESTClient(), - ns: namespace, - } -} - -// Get takes name of the schedule, and returns the corresponding schedule object, and an error if there is any. 
-func (c *schedules) Get(name string, options v1.GetOptions) (result *v1alpha1.ScheduledWorkflow, err error) { - result = &v1alpha1.ScheduledWorkflow{} - err = c.client.Get(). - Namespace(c.ns). - Resource("schedules"). - Name(name). - VersionedParams(&options, scheme.ParameterCodec). - Do(). - Into(result) - return -} - -// List takes label and field selectors, and returns the list of Schedules that match those selectors. -func (c *schedules) List(opts v1.ListOptions) (result *v1alpha1.ScheduledWorkflowList, err error) { - result = &v1alpha1.ScheduledWorkflowList{} - err = c.client.Get(). - Namespace(c.ns). - Resource("schedules"). - VersionedParams(&opts, scheme.ParameterCodec). - Do(). - Into(result) - return -} - -// Watch returns a watch.Interface that watches the requested schedules. -func (c *schedules) Watch(opts v1.ListOptions) (watch.Interface, error) { - opts.Watch = true - return c.client.Get(). - Namespace(c.ns). - Resource("schedules"). - VersionedParams(&opts, scheme.ParameterCodec). - Watch() -} - -// Create takes the representation of a schedule and creates it. Returns the server's representation of the schedule, and an error, if there is any. -func (c *schedules) Create(schedule *v1alpha1.ScheduledWorkflow) (result *v1alpha1.ScheduledWorkflow, err error) { - result = &v1alpha1.ScheduledWorkflow{} - err = c.client.Post(). - Namespace(c.ns). - Resource("schedules"). - Body(schedule). - Do(). - Into(result) - return -} - -// Update takes the representation of a schedule and updates it. Returns the server's representation of the schedule, and an error, if there is any. -func (c *schedules) Update(schedule *v1alpha1.ScheduledWorkflow) (result *v1alpha1.ScheduledWorkflow, err error) { - result = &v1alpha1.ScheduledWorkflow{} - err = c.client.Put(). - Namespace(c.ns). - Resource("schedules"). - Name(schedule.Name). - Body(schedule). - Do(). - Into(result) - return -} - -// Delete takes name of the schedule and deletes it. Returns an error if one occurs. -func (c *schedules) Delete(name string, options *v1.DeleteOptions) error { - return c.client.Delete(). - Namespace(c.ns). - Resource("schedules"). - Name(name). - Body(options). - Do(). - Error() -} - -// DeleteCollection deletes a collection of objects. -func (c *schedules) DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error { - return c.client.Delete(). - Namespace(c.ns). - Resource("schedules"). - VersionedParams(&listOptions, scheme.ParameterCodec). - Body(options). - Do(). - Error() -} - -// Patch applies the patch and returns the patched schedule. -func (c *schedules) Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1alpha1.ScheduledWorkflow, err error) { - result = &v1alpha1.ScheduledWorkflow{} - err = c.client.Patch(pt). - Namespace(c.ns). - Resource("schedules"). - SubResource(subresources...). - Name(name). - Body(data). - Do(). - Into(result) - return -} diff --git a/pkg/client/clientset/versioned/typed/schedule/v1alpha1/schedule_client.go b/pkg/client/clientset/versioned/typed/schedule/v1alpha1/schedule_client.go deleted file mode 100644 index f8d47e7bb51..00000000000 --- a/pkg/client/clientset/versioned/typed/schedule/v1alpha1/schedule_client.go +++ /dev/null @@ -1,88 +0,0 @@ -// Copyright 2018 The Kubeflow Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Code generated by client-gen. DO NOT EDIT. - -package v1alpha1 - -import ( - v1alpha1 "github.com/kubeflow/pipelines/pkg/apis/scheduledworkflow/v1alpha1" - "github.com/kubeflow/pipelines/pkg/client/clientset/versioned/scheme" - serializer "k8s.io/apimachinery/pkg/runtime/serializer" - rest "k8s.io/client-go/rest" -) - -type ScheduleV1alpha1Interface interface { - RESTClient() rest.Interface - SchedulesGetter -} - -// ScheduleV1alpha1Client is used to interact with features provided by the schedule.kubeflow.org group. -type ScheduleV1alpha1Client struct { - restClient rest.Interface -} - -func (c *ScheduleV1alpha1Client) Schedules(namespace string) ScheduleInterface { - return newSchedules(c, namespace) -} - -// NewForConfig creates a new ScheduleV1alpha1Client for the given config. -func NewForConfig(c *rest.Config) (*ScheduleV1alpha1Client, error) { - config := *c - if err := setConfigDefaults(&config); err != nil { - return nil, err - } - client, err := rest.RESTClientFor(&config) - if err != nil { - return nil, err - } - return &ScheduleV1alpha1Client{client}, nil -} - -// NewForConfigOrDie creates a new ScheduleV1alpha1Client for the given config and -// panics if there is an error in the config. -func NewForConfigOrDie(c *rest.Config) *ScheduleV1alpha1Client { - client, err := NewForConfig(c) - if err != nil { - panic(err) - } - return client -} - -// New creates a new ScheduleV1alpha1Client for the given RESTClient. -func New(c rest.Interface) *ScheduleV1alpha1Client { - return &ScheduleV1alpha1Client{c} -} - -func setConfigDefaults(config *rest.Config) error { - gv := v1alpha1.SchemeGroupVersion - config.GroupVersion = &gv - config.APIPath = "/apis" - config.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: scheme.Codecs} - - if config.UserAgent == "" { - config.UserAgent = rest.DefaultKubernetesUserAgent() - } - - return nil -} - -// RESTClient returns a RESTClient that is used to communicate -// with API server by this client implementation. -func (c *ScheduleV1alpha1Client) RESTClient() rest.Interface { - if c == nil { - return nil - } - return c.restClient -} diff --git a/pkg/client/informers/externalversions/schedule/interface.go b/pkg/client/informers/externalversions/schedule/interface.go deleted file mode 100644 index 2034ecbe9a8..00000000000 --- a/pkg/client/informers/externalversions/schedule/interface.go +++ /dev/null @@ -1,44 +0,0 @@ -// Copyright 2018 The Kubeflow Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Code generated by informer-gen. DO NOT EDIT. 
- -package schedule - -import ( - internalinterfaces "github.com/kubeflow/pipelines/pkg/client/informers/externalversions/internalinterfaces" - v1alpha1 "github.com/kubeflow/pipelines/pkg/client/informers/externalversions/schedule/v1alpha1" -) - -// Interface provides access to each of this group's versions. -type Interface interface { - // V1alpha1 provides access to shared informers for resources in V1alpha1. - V1alpha1() v1alpha1.Interface -} - -type group struct { - factory internalinterfaces.SharedInformerFactory - namespace string - tweakListOptions internalinterfaces.TweakListOptionsFunc -} - -// New returns a new Interface. -func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) Interface { - return &group{factory: f, namespace: namespace, tweakListOptions: tweakListOptions} -} - -// V1alpha1 returns a new v1alpha1.Interface. -func (g *group) V1alpha1() v1alpha1.Interface { - return v1alpha1.New(g.factory, g.namespace, g.tweakListOptions) -} diff --git a/pkg/client/informers/externalversions/schedule/v1alpha1/interface.go b/pkg/client/informers/externalversions/schedule/v1alpha1/interface.go deleted file mode 100644 index 1d7b556232f..00000000000 --- a/pkg/client/informers/externalversions/schedule/v1alpha1/interface.go +++ /dev/null @@ -1,43 +0,0 @@ -// Copyright 2018 The Kubeflow Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Code generated by informer-gen. DO NOT EDIT. - -package v1alpha1 - -import ( - internalinterfaces "github.com/kubeflow/pipelines/pkg/client/informers/externalversions/internalinterfaces" -) - -// Interface provides access to all the informers in this group version. -type Interface interface { - // Schedules returns a ScheduleInformer. - Schedules() ScheduleInformer -} - -type version struct { - factory internalinterfaces.SharedInformerFactory - namespace string - tweakListOptions internalinterfaces.TweakListOptionsFunc -} - -// New returns a new Interface. -func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) Interface { - return &version{factory: f, namespace: namespace, tweakListOptions: tweakListOptions} -} - -// Schedules returns a ScheduleInformer. -func (v *version) Schedules() ScheduleInformer { - return &scheduleInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} -} diff --git a/pkg/client/informers/externalversions/schedule/v1alpha1/schedule.go b/pkg/client/informers/externalversions/schedule/v1alpha1/schedule.go deleted file mode 100644 index 331f3e3b515..00000000000 --- a/pkg/client/informers/externalversions/schedule/v1alpha1/schedule.go +++ /dev/null @@ -1,87 +0,0 @@ -// Copyright 2018 The Kubeflow Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Code generated by informer-gen. DO NOT EDIT. - -package v1alpha1 - -import ( - time "time" - - schedule_v1alpha1 "github.com/kubeflow/pipelines/pkg/apis/scheduledworkflow/v1alpha1" - versioned "github.com/kubeflow/pipelines/pkg/client/clientset/versioned" - internalinterfaces "github.com/kubeflow/pipelines/pkg/client/informers/externalversions/internalinterfaces" - v1alpha1 "github.com/kubeflow/pipelines/pkg/client/listers/schedule/v1alpha1" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - runtime "k8s.io/apimachinery/pkg/runtime" - watch "k8s.io/apimachinery/pkg/watch" - cache "k8s.io/client-go/tools/cache" -) - -// ScheduleInformer provides access to a shared informer and lister for -// Schedules. -type ScheduleInformer interface { - Informer() cache.SharedIndexInformer - Lister() v1alpha1.ScheduleLister -} - -type scheduleInformer struct { - factory internalinterfaces.SharedInformerFactory - tweakListOptions internalinterfaces.TweakListOptionsFunc - namespace string -} - -// NewScheduleInformer constructs a new informer for ScheduledWorkflow type. -// Always prefer using an informer factory to get a shared informer instead of getting an independent -// one. This reduces memory footprint and number of connections to the server. -func NewScheduleInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer { - return NewFilteredScheduleInformer(client, namespace, resyncPeriod, indexers, nil) -} - -// NewFilteredScheduleInformer constructs a new informer for ScheduledWorkflow type. -// Always prefer using an informer factory to get a shared informer instead of getting an independent -// one. This reduces memory footprint and number of connections to the server. 
-func NewFilteredScheduleInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer { - return cache.NewSharedIndexInformer( - &cache.ListWatch{ - ListFunc: func(options v1.ListOptions) (runtime.Object, error) { - if tweakListOptions != nil { - tweakListOptions(&options) - } - return client.ScheduleV1alpha1().Schedules(namespace).List(options) - }, - WatchFunc: func(options v1.ListOptions) (watch.Interface, error) { - if tweakListOptions != nil { - tweakListOptions(&options) - } - return client.ScheduleV1alpha1().Schedules(namespace).Watch(options) - }, - }, - &schedule_v1alpha1.ScheduledWorkflow{}, - resyncPeriod, - indexers, - ) -} - -func (f *scheduleInformer) defaultInformer(client versioned.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { - return NewFilteredScheduleInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions) -} - -func (f *scheduleInformer) Informer() cache.SharedIndexInformer { - return f.factory.InformerFor(&schedule_v1alpha1.ScheduledWorkflow{}, f.defaultInformer) -} - -func (f *scheduleInformer) Lister() v1alpha1.ScheduleLister { - return v1alpha1.NewScheduleLister(f.Informer().GetIndexer()) -} diff --git a/pkg/client/listers/schedule/v1alpha1/schedule.go b/pkg/client/listers/schedule/v1alpha1/schedule.go deleted file mode 100644 index acad8437683..00000000000 --- a/pkg/client/listers/schedule/v1alpha1/schedule.go +++ /dev/null @@ -1,92 +0,0 @@ -// Copyright 2018 The Kubeflow Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Code generated by lister-gen. DO NOT EDIT. - -package v1alpha1 - -import ( - v1alpha1 "github.com/kubeflow/pipelines/pkg/apis/scheduledworkflow/v1alpha1" - "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/client-go/tools/cache" -) - -// ScheduleLister helps list Schedules. -type ScheduleLister interface { - // List lists all Schedules in the indexer. - List(selector labels.Selector) (ret []*v1alpha1.ScheduledWorkflow, err error) - // Schedules returns an object that can list and get Schedules. - Schedules(namespace string) ScheduleNamespaceLister - ScheduleListerExpansion -} - -// scheduleLister implements the ScheduleLister interface. -type scheduleLister struct { - indexer cache.Indexer -} - -// NewScheduleLister returns a new ScheduleLister. -func NewScheduleLister(indexer cache.Indexer) ScheduleLister { - return &scheduleLister{indexer: indexer} -} - -// List lists all Schedules in the indexer. -func (s *scheduleLister) List(selector labels.Selector) (ret []*v1alpha1.ScheduledWorkflow, err error) { - err = cache.ListAll(s.indexer, selector, func(m interface{}) { - ret = append(ret, m.(*v1alpha1.ScheduledWorkflow)) - }) - return ret, err -} - -// Schedules returns an object that can list and get Schedules. 
-func (s *scheduleLister) Schedules(namespace string) ScheduleNamespaceLister {
-	return scheduleNamespaceLister{indexer: s.indexer, namespace: namespace}
-}
-
-// ScheduleNamespaceLister helps list and get Schedules.
-type ScheduleNamespaceLister interface {
-	// List lists all Schedules in the indexer for a given namespace.
-	List(selector labels.Selector) (ret []*v1alpha1.ScheduledWorkflow, err error)
-	// Get retrieves the ScheduledWorkflow from the indexer for a given namespace and name.
-	Get(name string) (*v1alpha1.ScheduledWorkflow, error)
-	ScheduleNamespaceListerExpansion
-}
-
-// scheduleNamespaceLister implements the ScheduleNamespaceLister
-// interface.
-type scheduleNamespaceLister struct {
-	indexer   cache.Indexer
-	namespace string
-}
-
-// List lists all Schedules in the indexer for a given namespace.
-func (s scheduleNamespaceLister) List(selector labels.Selector) (ret []*v1alpha1.ScheduledWorkflow, err error) {
-	err = cache.ListAllByNamespace(s.indexer, s.namespace, selector, func(m interface{}) {
-		ret = append(ret, m.(*v1alpha1.ScheduledWorkflow))
-	})
-	return ret, err
-}
-
-// Get retrieves the ScheduledWorkflow from the indexer for a given namespace and name.
-func (s scheduleNamespaceLister) Get(name string) (*v1alpha1.ScheduledWorkflow, error) {
-	obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name)
-	if err != nil {
-		return nil, err
-	}
-	if !exists {
-		return nil, errors.NewNotFound(v1alpha1.Resource("schedule"), name)
-	}
-	return obj.(*v1alpha1.ScheduledWorkflow), nil
-}
diff --git a/resources/scheduledworkflow/client/kube_client.go b/resources/scheduledworkflow/client/kube_client.go
new file mode 100644
index 00000000000..b29aba7e5ec
--- /dev/null
+++ b/resources/scheduledworkflow/client/kube_client.go
@@ -0,0 +1,59 @@
+// Copyright 2018 The Kubeflow Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package client
+
+import (
+	"fmt"
+	swfapi "github.com/kubeflow/pipelines/pkg/apis/scheduledworkflow/v1alpha1"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/client-go/kubernetes"
+	_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
+	"k8s.io/client-go/tools/record"
+)
+
+const (
+	successSynced                = "Synced"
+	failedSynced                 = "Failed"
+	messageResourceSuccessSynced = "Scheduled workflow synced successfully"
+	messageResourceFailedSynced  = "Scheduled workflow sync failed"
+)
+
+// KubeClient is a client to call the core Kubernetes APIs.
+type KubeClient struct {
+	// The Kubernetes API client.
+	kubeClientSet kubernetes.Interface
+	// Recorder is an event recorder for recording Event resources to the Kubernetes API.
+	recorder record.EventRecorder
+}
+
+// NewKubeClient creates a new client to call the core Kubernetes APIs.
+func NewKubeClient(kubeClientSet kubernetes.Interface, recorder record.EventRecorder) *KubeClient {
+	return &KubeClient{
+		kubeClientSet: kubeClientSet,
+		recorder:      recorder,
+	}
+}
+
+// RecordSyncSuccess records the success of a sync.
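+// The event is attached to the ScheduledWorkflow object itself, so it shows up
+// when the resource is described with kubectl.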
+func (k *KubeClient) RecordSyncSuccess(swf *swfapi.ScheduledWorkflow, message string) { + k.recorder.Event(swf, corev1.EventTypeNormal, successSynced, + fmt.Sprintf("%v: %v", messageResourceSuccessSynced, message)) +} + +// RecordSyncFailure records the failure of a sync. +func (k *KubeClient) RecordSyncFailure(swf *swfapi.ScheduledWorkflow, message string) { + k.recorder.Event(swf, corev1.EventTypeWarning, failedSynced, + fmt.Sprintf("%v: %v", messageResourceFailedSynced, message)) +} diff --git a/resources/scheduledworkflow/client/swf_client.go b/resources/scheduledworkflow/client/swf_client.go new file mode 100644 index 00000000000..70bf5cca239 --- /dev/null +++ b/resources/scheduledworkflow/client/swf_client.go @@ -0,0 +1,66 @@ +// Copyright 2018 The Kubeflow Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package client + +import ( + swfclientset "github.com/kubeflow/pipelines/pkg/client/clientset/versioned" + "github.com/kubeflow/pipelines/pkg/client/informers/externalversions/scheduledworkflow/v1alpha1" + "github.com/kubeflow/pipelines/resources/scheduledworkflow/util" + _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" + "k8s.io/client-go/tools/cache" +) + +// ScheduledWorkflowClient is a client to call the ScheduledWorkflow API. +type ScheduledWorkflowClient struct { + clientSet swfclientset.Interface + informer v1alpha1.ScheduledWorkflowInformer +} + +// NewScheduledWorkflowClient creates an instance of the client. +func NewScheduledWorkflowClient(clientSet swfclientset.Interface, + informer v1alpha1.ScheduledWorkflowInformer) *ScheduledWorkflowClient { + return &ScheduledWorkflowClient{ + clientSet: clientSet, + informer: informer, + } +} + +// AddEventHandler adds an event handler. +func (p *ScheduledWorkflowClient) AddEventHandler(funcs *cache.ResourceEventHandlerFuncs) { + p.informer.Informer().AddEventHandler(funcs) +} + +// HasSynced returns true if the shared informer's store has synced. +func (p *ScheduledWorkflowClient) HasSynced() func() bool { + return p.informer.Informer().HasSynced +} + +// Get returns a ScheduledWorkflow, given a namespace and a name. +func (p *ScheduledWorkflowClient) Get(namespace string, name string) (*util.ScheduledWorkflow, error) { + schedule, err := p.informer.Lister().ScheduledWorkflows(namespace).Get(name) + if err != nil { + return nil, err + } + + return util.NewScheduledWorkflow(schedule), nil +} + +// Update Updates a ScheduledWorkflow in the Kubernetes API server. +func (p *ScheduledWorkflowClient) Update(namespace string, + schedule *util.ScheduledWorkflow) error { + _, err := p.clientSet.ScheduledworkflowV1alpha1().ScheduledWorkflows(namespace). 
+ Update(schedule.Get()) + return err +} diff --git a/resources/scheduledworkflow/client/workflow_client.go b/resources/scheduledworkflow/client/workflow_client.go new file mode 100644 index 00000000000..5ce1f6d43ee --- /dev/null +++ b/resources/scheduledworkflow/client/workflow_client.go @@ -0,0 +1,153 @@ +// Copyright 2018 The Kubeflow Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package client + +import ( + workflowapi "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" + workflowclientset "github.com/argoproj/argo/pkg/client/clientset/versioned" + "github.com/argoproj/argo/pkg/client/informers/externalversions/workflow/v1alpha1" + swfapi "github.com/kubeflow/pipelines/pkg/apis/scheduledworkflow/v1alpha1" + "github.com/kubeflow/pipelines/resources/scheduledworkflow/util" + wraperror "github.com/pkg/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" + "k8s.io/client-go/tools/cache" + "time" +) + +// WorkflowClient is a client to call the Workflow API. +type WorkflowClient struct { + clientSet workflowclientset.Interface + informer v1alpha1.WorkflowInformer +} + +// NewWorkflowClient creates an instance of the WorkflowClient. +func NewWorkflowClient(clientSet workflowclientset.Interface, + informer v1alpha1.WorkflowInformer) *WorkflowClient { + return &WorkflowClient{ + clientSet: clientSet, + informer: informer, + } +} + +// AddEventHandler adds an event handler. +func (p *WorkflowClient) AddEventHandler(funcs *cache.ResourceEventHandlerFuncs) { + p.informer.Informer().AddEventHandler(funcs) +} + +// HasSynced returns true if the shared informer's store has synced. +func (p *WorkflowClient) HasSynced() func() bool { + return p.informer.Informer().HasSynced +} + +// Get returns a Workflow, given a namespace and name. +func (p *WorkflowClient) Get(namespace string, name string) ( + wf *util.Workflow, isNotFoundError bool, err error) { + workflow, err := p.informer.Lister().Workflows(namespace).Get(name) + if err != nil { + return nil, util.IsNotFound(err), wraperror.Wrapf(err, + "Error retrieving workflow (%v) in namespace (%v): %v", name, namespace, err) + } + return util.NewWorkflow(workflow), false, nil +} + +// List returns a list of workflows given the name of their ScheduledWorkflow, +// whether they are completed, and their minimum index (to avoid returning the whole list). 
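+// The list is served from the shared informer cache and narrowed with a label
+// selector on the completion, scheduled-workflow-name, and index labels, rather
+// than by querying the API server directly.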
+func (p *WorkflowClient) List(swfName string, completed bool, minIndex int64) ( + status []swfapi.WorkflowStatus, err error) { + + labelSelector := getLabelSelectorToGetWorkflows(swfName, completed, minIndex) + + workflows, err := p.informer.Lister().List(*labelSelector) + if err != nil { + return nil, wraperror.Wrapf(err, + "Could not retrieve workflows for scheduled workflow (%v): %v", swfName, err) + } + + result := toWorkflowStatuses(workflows) + + return result, nil +} + +func toWorkflowStatuses(workflows []*workflowapi.Workflow) []swfapi.WorkflowStatus { + result := make([]swfapi.WorkflowStatus, 0) + for _, workflow := range workflows { + result = append(result, *toWorkflowStatus(workflow)) + } + return result +} + +func toWorkflowStatus(workflow *workflowapi.Workflow) *swfapi.WorkflowStatus { + return &swfapi.WorkflowStatus{ + Name: workflow.Name, + Namespace: workflow.Namespace, + SelfLink: workflow.SelfLink, + UID: workflow.UID, + Phase: workflow.Status.Phase, + Message: workflow.Status.Message, + CreatedAt: workflow.CreationTimestamp, + StartedAt: workflow.Status.StartedAt, + FinishedAt: workflow.Status.FinishedAt, + ScheduledAt: retrieveScheduledTime(workflow), + Index: retrieveIndex(workflow), + } +} + +func retrieveScheduledTime(workflow *workflowapi.Workflow) metav1.Time { + value, ok := workflow.Labels[util.LabelKeyWorkflowEpoch] + if !ok { + return workflow.CreationTimestamp + } + result, err := util.RetrieveInt64FromLabel(value) + if err != nil { + return workflow.CreationTimestamp + } + return metav1.NewTime(time.Unix(result, 0).UTC()) +} + +func retrieveIndex(workflow *workflowapi.Workflow) int64 { + value, ok := workflow.Labels[util.LabelKeyWorkflowIndex] + if !ok { + return 0 + } + result, err := util.RetrieveInt64FromLabel(value) + if err != nil { + return 0 + } + return result +} + +// Create creates a workflow given a namespace and its specification. +func (p *WorkflowClient) Create(namespace string, workflow *util.Workflow) ( + *util.Workflow, error) { + result, err := p.clientSet.ArgoprojV1alpha1().Workflows(namespace).Create(workflow.Get()) + if err != nil { + return nil, wraperror.Wrapf(err, "Error creating workflow in namespace (%v): %v: %+v", namespace, + err, workflow.Get()) + } + return util.NewWorkflow(result), nil +} + +func getLabelSelectorToGetWorkflows(swfName string, completed bool, minIndex int64) *labels.Selector { + labelSelector := labels.NewSelector() + // The Argo workflow should be active or completed + labelSelector = labelSelector.Add(*util.GetRequirementForCompletedWorkflowOrFatal(completed)) + // The Argo workflow should be labelled with this scheduled workflow name. + labelSelector = labelSelector.Add(*util.GetRequirementForScheduleNameOrFatal(swfName)) + // The Argo workflow should have an index greater than... + labelSelector = labelSelector.Add(*util.GetRequirementForMinIndexOrFatal(minIndex)) + return &labelSelector +} diff --git a/resources/scheduledworkflow/client/workflow_client_test.go b/resources/scheduledworkflow/client/workflow_client_test.go new file mode 100644 index 00000000000..475ea8d18d3 --- /dev/null +++ b/resources/scheduledworkflow/client/workflow_client_test.go @@ -0,0 +1,218 @@ +// Copyright 2018 The Kubeflow Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package client + +import ( + workflowapi "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" + workflowcommon "github.com/argoproj/argo/workflow/common" + swfapi "github.com/kubeflow/pipelines/pkg/apis/scheduledworkflow/v1alpha1" + "github.com/kubeflow/pipelines/resources/scheduledworkflow/util" + "github.com/stretchr/testify/assert" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/selection" + "testing" + "time" +) + +func TestToWorkflowStatuses(t *testing.T) { + workflow := &workflowapi.Workflow{ + ObjectMeta: metav1.ObjectMeta{ + Name: "WORKFLOW_NAME", + Namespace: "NAMESPACE", + SelfLink: "SELF_LINK", + UID: "UID", + CreationTimestamp: metav1.NewTime(time.Unix(50, 0).UTC()), + Labels: map[string]string{ + util.LabelKeyWorkflowEpoch: "54", + util.LabelKeyWorkflowIndex: "55", + }, + }, + Status: workflowapi.WorkflowStatus{ + Phase: workflowapi.NodeRunning, + Message: "WORKFLOW_MESSAGE", + StartedAt: metav1.NewTime(time.Unix(51, 0).UTC()), + FinishedAt: metav1.NewTime(time.Unix(52, 0).UTC()), + }, + } + + result := toWorkflowStatuses([]*workflowapi.Workflow{workflow}) + + expected := &swfapi.WorkflowStatus{ + Name: "WORKFLOW_NAME", + Namespace: "NAMESPACE", + SelfLink: "SELF_LINK", + UID: "UID", + Phase: workflowapi.NodeRunning, + Message: "WORKFLOW_MESSAGE", + CreatedAt: metav1.NewTime(time.Unix(50, 0).UTC()), + StartedAt: metav1.NewTime(time.Unix(51, 0).UTC()), + FinishedAt: metav1.NewTime(time.Unix(52, 0).UTC()), + ScheduledAt: metav1.NewTime(time.Unix(54, 0).UTC()), + Index: 55, + } + + assert.Equal(t, []swfapi.WorkflowStatus{*expected}, result) +} + +func TestToWorkflowStatuses_NullOrEmpty(t *testing.T) { + workflow := &workflowapi.Workflow{} + + result := toWorkflowStatuses([]*workflowapi.Workflow{workflow}) + + expected := &swfapi.WorkflowStatus{ + Name: "", + Namespace: "", + SelfLink: "", + UID: "", + Phase: "", + Message: "", + CreatedAt: metav1.NewTime(time.Time{}.UTC()), + StartedAt: metav1.NewTime(time.Time{}.UTC()), + FinishedAt: metav1.NewTime(time.Time{}.UTC()), + ScheduledAt: metav1.NewTime(time.Time{}.UTC()), + Index: 0, + } + + assert.Equal(t, []swfapi.WorkflowStatus{*expected}, result) +} + +func TestRetrieveScheduledTime(t *testing.T) { + + // Base case. 
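+	// The epoch label is present and parseable, so the scheduled time is taken
+	// from the label rather than from the creation timestamp.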
+ workflow := &workflowapi.Workflow{ + ObjectMeta: metav1.ObjectMeta{ + CreationTimestamp: metav1.NewTime(time.Unix(50, 0).UTC()), + Labels: map[string]string{ + util.LabelKeyWorkflowEpoch: "54", + }, + }, + } + result := retrieveScheduledTime(workflow) + assert.Equal(t, metav1.NewTime(time.Unix(54, 0).UTC()), result) + + // No label + workflow = &workflowapi.Workflow{ + ObjectMeta: metav1.ObjectMeta{ + CreationTimestamp: metav1.NewTime(time.Unix(50, 0).UTC()), + Labels: map[string]string{ + "WRONG_LABEL": "54", + }, + }, + } + result = retrieveScheduledTime(workflow) + assert.Equal(t, metav1.NewTime(time.Unix(50, 0).UTC()), result) + + // Parsing problem + workflow = &workflowapi.Workflow{ + ObjectMeta: metav1.ObjectMeta{ + CreationTimestamp: metav1.NewTime(time.Unix(50, 0).UTC()), + Labels: map[string]string{ + util.LabelKeyWorkflowEpoch: "UNPARSABLE_@%^%@^#%", + }, + }, + } + result = retrieveScheduledTime(workflow) + assert.Equal(t, metav1.NewTime(time.Unix(50, 0).UTC()), result) +} + +func TestRetrieveIndex(t *testing.T) { + + // Base case. + workflow := &workflowapi.Workflow{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + util.LabelKeyWorkflowIndex: "100", + }, + }, + } + result := retrieveIndex(workflow) + assert.Equal(t, int64(100), result) + + // No label + workflow = &workflowapi.Workflow{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "WRONG_LABEL": "100", + }, + }, + } + result = retrieveIndex(workflow) + assert.Equal(t, int64(0), result) + + // Parsing problem + workflow = &workflowapi.Workflow{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + util.LabelKeyWorkflowIndex: "UNPARSABLE_LABEL_!@^^!%@#%", + }, + }, + } + result = retrieveIndex(workflow) + assert.Equal(t, int64(0), result) +} + +func TestLabelSelectorToGetWorkflows(t *testing.T) { + + // Completed + result := getLabelSelectorToGetWorkflows( + "PIPELINE_NAME", + true, /* completed */ + 50 /* min index */) + + expected := labels.NewSelector() + + req, err := labels.NewRequirement(workflowcommon.LabelKeyCompleted, selection.Equals, + []string{"true"}) + assert.Nil(t, err) + expected = expected.Add(*req) + + req, err = labels.NewRequirement(util.LabelKeyWorkflowScheduledWorkflowName, selection.Equals, + []string{"PIPELINE_NAME"}) + assert.Nil(t, err) + expected = expected.Add(*req) + + req, err = labels.NewRequirement(util.LabelKeyWorkflowIndex, selection.GreaterThan, + []string{"50"}) + assert.Nil(t, err) + expected = expected.Add(*req) + + assert.Equal(t, expected, *result) + + // Not completed + result = getLabelSelectorToGetWorkflows( + "PIPELINE_NAME", + false, /* completed */ + 50 /* min index */) + + expected = labels.NewSelector() + + req, err = labels.NewRequirement(workflowcommon.LabelKeyCompleted, selection.NotEquals, + []string{"true"}) + assert.Nil(t, err) + expected = expected.Add(*req) + + req, err = labels.NewRequirement(util.LabelKeyWorkflowScheduledWorkflowName, selection.Equals, + []string{"PIPELINE_NAME"}) + assert.Nil(t, err) + expected = expected.Add(*req) + + req, err = labels.NewRequirement(util.LabelKeyWorkflowIndex, selection.GreaterThan, + []string{"50"}) + assert.Nil(t, err) + expected = expected.Add(*req) + + assert.Equal(t, expected, *result) +} diff --git a/resources/scheduledworkflow/controller.go b/resources/scheduledworkflow/controller.go new file mode 100644 index 00000000000..d780a4c62ba --- /dev/null +++ b/resources/scheduledworkflow/controller.go @@ -0,0 +1,523 @@ +// Copyright 2018 The Kubeflow Authors +// +// Licensed under the 
Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "fmt" + workflowapi "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" + workflowclientset "github.com/argoproj/argo/pkg/client/clientset/versioned" + workflowinformers "github.com/argoproj/argo/pkg/client/informers/externalversions" + swfregister "github.com/kubeflow/pipelines/pkg/apis/scheduledworkflow" + swfapi "github.com/kubeflow/pipelines/pkg/apis/scheduledworkflow/v1alpha1" + swfclientset "github.com/kubeflow/pipelines/pkg/client/clientset/versioned" + swfScheme "github.com/kubeflow/pipelines/pkg/client/clientset/versioned/scheme" + swfinformers "github.com/kubeflow/pipelines/pkg/client/informers/externalversions" + "github.com/kubeflow/pipelines/resources/scheduledworkflow/client" + "github.com/kubeflow/pipelines/resources/scheduledworkflow/util" + wraperror "github.com/pkg/errors" + log "github.com/sirupsen/logrus" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" + _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" + "time" +) + +const ( + Workflow = "Workflow" + ScheduledWorkflow = "ScheduledWorkflow" +) + +var ( + // DefaultJobBackOff is the max backoff period + DefaultJobBackOff = 10 * time.Second + // MaxJobBackOff is the max backoff period + MaxJobBackOff = 360 * time.Second +) + +// Controller is the controller implementation for ScheduledWorkflow resources +type Controller struct { + kubeClient *client.KubeClient + swfClient *client.ScheduledWorkflowClient + workflowClient *client.WorkflowClient + + // workqueue is a rate limited work queue. This is used to queue work to be + // processed instead of performing it as soon as a change happens. This + // means we can ensure we only process a fixed amount of resources at a + // time, and makes it easy to ensure we are never processing the same item + // simultaneously in two different workers. + workqueue workqueue.RateLimitingInterface + + // An interface to generate the current time. + time util.TimeInterface +} + +// NewController returns a new sample controller +func NewController( + kubeClientSet kubernetes.Interface, + swfClientSet swfclientset.Interface, + workflowClientSet workflowclientset.Interface, + swfInformerFactory swfinformers.SharedInformerFactory, + workflowInformerFactory workflowinformers.SharedInformerFactory, + time util.TimeInterface) *Controller { + + // obtain references to shared informers + swfInformer := swfInformerFactory.Scheduledworkflow().V1alpha1().ScheduledWorkflows() + workflowInformer := workflowInformerFactory.Argoproj().V1alpha1().Workflows() + + // Add controller types to the default Kubernetes Scheme so Events can be + // logged for controller types. 
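+	// Without this registration the event recorder created below could not
+	// resolve the ScheduledWorkflow kind when emitting events for those objects.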
+ swfScheme.AddToScheme(scheme.Scheme) + + // Create event broadcaster + log.Info("Creating event broadcaster") + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(log.Infof) + eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClientSet.CoreV1().Events("")}) + recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: util.ControllerAgentName}) + + controller := &Controller{ + kubeClient: client.NewKubeClient(kubeClientSet, recorder), + swfClient: client.NewScheduledWorkflowClient(swfClientSet, swfInformer), + workflowClient: client.NewWorkflowClient(workflowClientSet, workflowInformer), + workqueue: workqueue.NewNamedRateLimitingQueue( + workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff), swfregister.Kind), + time: time, + } + + log.Info("Setting up event handlers") + + // Set up an event handler for when the Scheduled Workflow changes + controller.swfClient.AddEventHandler(&cache.ResourceEventHandlerFuncs{ + AddFunc: controller.enqueueScheduledWorkflow, + UpdateFunc: func(old, new interface{}) { + controller.enqueueScheduledWorkflow(new) + }, + DeleteFunc: controller.enqueueScheduledWorkflowForDelete, + }) + + // Set up an event handler for when WorkflowHistory resources change. This + // handler will lookup the owner of the given WorkflowHistory, and if it is + // owned by a ScheduledWorkflow, it will enqueue that ScheduledWorkflow for + // processing. This way, we don't need to implement custom logic for + // handling WorkflowHistory resources. More info on this pattern: + // https://github.com/kubernetes/community/blob/8cafef897a22026d42f5e5bb3f104febe7e29830/contributors/devel/controllers.md + controller.workflowClient.AddEventHandler(&cache.ResourceEventHandlerFuncs{ + AddFunc: controller.handleWorkflow, + UpdateFunc: func(old, new interface{}) { + newWorkflow := new.(*workflowapi.Workflow) + oldWorkflow := old.(*workflowapi.Workflow) + if newWorkflow.ResourceVersion == oldWorkflow.ResourceVersion { + // Periodic resync will send update events for all known Workflows. + // Two different versions of the same WorkflowHistory will always have different RVs. + return + } + controller.handleWorkflow(new) + }, + DeleteFunc: controller.handleWorkflow, + }) + + return controller +} + +// Run will set up the event handlers for types we are interested in, as well +// as syncing informer caches and starting workers. It will block until stopCh +// is closed, at which point it will shutdown the workqueue and wait for +// workers to finish processing their current work items. 
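+// threadiness controls how many worker goroutines drain the workqueue in
+// parallel; the workqueue guarantees that a given key is never processed by
+// two workers at the same time.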
+func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
+	defer runtime.HandleCrash()
+	defer c.workqueue.ShutDown()
+
+	// Start the informer factories to begin populating the informer caches
+	log.Info("Starting ScheduledWorkflow controller")
+
+	// Wait for the caches to be synced before starting workers
+	log.Info("Waiting for informer caches to sync")
+
+	if ok := cache.WaitForCacheSync(stopCh,
+		c.workflowClient.HasSynced(),
+		c.swfClient.HasSynced()); !ok {
+		return fmt.Errorf("Failed to wait for caches to sync")
+	}
+
+	// Launch multiple workers to process ScheduledWorkflows
+	log.Info("Starting workers")
+	for i := 0; i < threadiness; i++ {
+		go wait.Until(c.runWorker, time.Second, stopCh)
+	}
+	log.Info("Started workers")
+
+	log.Info("Wait for shut down")
+	<-stopCh
+	log.Info("Shutting down workers")
+
+	return nil
+}
+
+// runWorker is a long-running function that will continually call the
+// processNextWorkItem function in order to read and process a message on the
+// workqueue. It enforces that the syncHandler is never invoked concurrently with the same key.
+func (c *Controller) runWorker() {
+	for c.processNextWorkItem() {
+	}
+}
+
+// enqueueScheduledWorkflow takes a ScheduledWorkflow and converts it into a namespace/name
+// string which is then put onto the work queue. This method should *not* be
+// passed resources of any type other than ScheduledWorkflow.
+func (c *Controller) enqueueScheduledWorkflow(obj interface{}) {
+	var key string
+	var err error
+	if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
+		runtime.HandleError(fmt.Errorf("Enqueuing object: error: %v: %+v", err, obj))
+		return
+	}
+	c.workqueue.AddRateLimited(key)
+}
+
+func (c *Controller) enqueueScheduledWorkflowForDelete(obj interface{}) {
+	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
+	if err == nil {
+		c.workqueue.Add(key)
+	}
+}
+
+// handleWorkflow will take any resource implementing metav1.Object and attempt
+// to find the ScheduledWorkflow that 'owns' it. It does this by looking at the
+// object's metadata.ownerReferences field for an appropriate OwnerReference.
+// It then enqueues that ScheduledWorkflow to be processed. If the object does not
+// have an appropriate OwnerReference, it will simply be skipped.
+func (c *Controller) handleWorkflow(obj interface{}) {
+	var object metav1.Object
+	var ok bool
+	if object, ok = obj.(metav1.Object); !ok {
+		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
+		if !ok {
+			runtime.HandleError(fmt.Errorf("Error decoding object, invalid type."))
+			return
+		}
+		object, ok = tombstone.Obj.(metav1.Object)
+		if !ok {
+			runtime.HandleError(fmt.Errorf("Error decoding object tombstone, invalid type."))
+			return
+		}
+		log.WithFields(log.Fields{
+			Workflow: object.GetName(),
+		}).Infof("Recovered deleted object '%s' from tombstone.", object.GetName())
+	}
+
+	if ownerRef := metav1.GetControllerOf(object); ownerRef != nil {
+		// If this object is not owned by a ScheduledWorkflow, we should not do anything more
+		// with it.
+		if ownerRef.Kind != swfregister.Kind {
+			log.WithFields(log.Fields{
+				Workflow: object.GetName(),
+			}).Infof("Processing object (%s): owner is not a Scheduled Workflow.", object.GetName())
+			return
+		}
+
+		swf, err := c.swfClient.Get(object.GetNamespace(), ownerRef.Name)
+		if err != nil {
+			log.WithFields(log.Fields{
+				Workflow: object.GetName(),
+			}).Infof("Processing object (%s): ignoring orphaned object of scheduled Workflow (%s).",
+				object.GetName(), ownerRef.Name)
+			return
+		}
+
+		log.WithFields(log.Fields{
+			Workflow:          object.GetName(),
+			ScheduledWorkflow: ownerRef.Name,
+		}).Infof("Processing object (%s): owner is a ScheduledWorkflow (%s).", object.GetName(),
+			ownerRef.Name)
+		c.enqueueScheduledWorkflow(swf.Get())
+		return
+	}
+	log.WithFields(log.Fields{
+		Workflow: object.GetName(),
+	}).Infof("Processing object (%s): object has no owner.", object.GetName())
+	return
+}
+
+// processNextWorkItem will read a single work item off the workqueue and
+// attempt to process it, by calling the syncHandler.
+func (c *Controller) processNextWorkItem() bool {
+	obj, shutdown := c.workqueue.Get()
+
+	if shutdown {
+		return false
+	}
+
+	// We wrap this block in a func so we can defer c.workqueue.Done.
+	return func(obj interface{}) bool {
+		// We call Done here so the workqueue knows we have finished
+		// processing this item. We also must remember to call Forget if we
+		// do not want this work item being re-queued. For example, we do
+		// not call Forget if a transient error occurs, instead the item is
+		// put back on the workqueue and attempted again after a back-off
+		// period.
+		defer c.workqueue.Done(obj)
+		var key string
+		var ok bool
+		// We expect strings to come off the workqueue. These are of the
+		// form namespace/name. We do this as the delayed nature of the
+		// workqueue means the items in the informer cache may actually be
+		// more up to date than when the item was initially put onto the
+		// workqueue.
+		if key, ok = obj.(string); !ok {
+			// As the item in the workqueue is actually invalid, we call
+			// Forget here else we'd go into a loop of attempting to
+			// process a work item that is invalid.
+			c.workqueue.Forget(obj)
+			runtime.HandleError(fmt.Errorf("Expected string in workqueue but got %#v", obj))
+			return true
+		}
+
+		// Notes on workqueues:
+		// - when using: workqueue.Forget
+		//   The item is reprocessed after the next SharedInformerFactory defaultResync.
+		// - when using: workqueue.Forget && workqueue.Add()
+		//   The item is reprocessed immediately.
+		//   This is not recommended as the status changes may not have propagated, leading to
+		//   a (recoverable) versioning error.
+		// - when using: workqueue.Forget && workqueue.AddAfter(X seconds)
+		//   The item is reprocessed after X seconds.
+		//   It can be re-processed earlier depending on SharedInformerFactory defaultResync.
+		//   Deleting and recreating the resource using kubectl does not trigger early processing.
+		// - when using: workqueue.Forget && workqueue.AddRateLimited()
+		//   The item is reprocessed after the baseDelay.
+		// - when using: workqueue.AddRateLimited()
+		//   The item is reprocessed following the exponential backoff strategy:
+		//   baseDelay * 10^(failure count)
+		//   It is not reprocessed earlier due to SharedInformerFactory defaultResync.
+		//   It is not reprocessed earlier even if the resource is deleted/re-created.
+ // - when using: workqueue.Add() + // The item is reprocessed immediately (not recommended) + // - when using: workqueue.AddAfter(X seconds) + // The item is reprocessed immediately + // - when using: nothing + // The item is reprocessed using the exponential backoff strategy. + + // Run the syncHandler, passing it the namespace/name string of the + // ScheduledWorkflow to be synced. + syncAgain, retryOnError, swf, err := c.syncHandler(key) + if err != nil && retryOnError { + // Transient failure. We will retry. + c.workqueue.AddRateLimited(obj) // Exponential backoff. + runtime.HandleError(fmt.Errorf("Transient failure: %+v", err)) + if swf != nil { + c.kubeClient.RecordSyncFailure(swf.Get(), + fmt.Sprintf("Transient failure: %v", err.Error())) + } + return true + } else if err != nil && !retryOnError { + // Permanent failure. We won't retry. + // Will resync after the SharedInformerFactory defaultResync delay. + c.workqueue.Forget(obj) + runtime.HandleError(fmt.Errorf("Permanent failure: %+v", err)) + if swf != nil { + c.kubeClient.RecordSyncFailure(swf.Get(), + fmt.Sprintf("Permanent failure: %v", err.Error())) + } + return true + } else if err == nil && !syncAgain { + // Success. + // Will resync after the SharedInformerFactory defaultResync delay. + c.workqueue.Forget(obj) + if swf != nil { + c.kubeClient.RecordSyncSuccess(swf.Get(), "All done") + } + return true + } else { + // Success and sync again soon. + c.workqueue.Forget(obj) + c.workqueue.AddAfter(obj, 10*time.Second) // Need status changes to propagate. + if swf != nil { + c.kubeClient.RecordSyncSuccess(swf.Get(), "Partially done, syncing again soon") + } + return true + } + }(obj) +} + +// syncHandler compares the actual state with the desired, and attempts to +// converge the two. It then updates the Status block of the ScheduledWorkflow +// with the current status of the resource. +func (c *Controller) syncHandler(key string) ( + syncAgain bool, retryOnError bool, swf *util.ScheduledWorkflow, err error) { + + // Convert the namespace/name string into a distinct namespace and name + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + // Permanent failure. + return false, false, nil, + wraperror.Wrapf(err, "Invalid resource key (%s): %v", key, err) + } + + // Get the ScheduledWorkflow with this namespace/name + swf, err = c.swfClient.Get(namespace, name) + if err != nil { + // Permanent failure. + // The ScheduledWorkflow may no longer exist, we stop processing and do not retry. + return false, false, nil, + wraperror.Wrapf(err, "ScheduledWorkflow (%s) in work queue no longer exists: %v", key, err) + } + + // Get the current time + // NOTE: call time.Now() only once per event so that all the functions have a consistent + // number for the current time. + nowEpoch := c.time.Now().Unix() + + // Get active workflows for this ScheduledWorkflow. + active, err := c.workflowClient.List(swf.Name, + false, /* active workflow */ + 0 /* retrieve all workflows */) + if err != nil { + return false, true, swf, + wraperror.Wrapf(err, "Syncing ScheduledWorkflow (%v): transient failure, can't fetch active workflows: %v", name, err) + } + + // Get completed workflows for this ScheduledWorkflow. 
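+	// Note: only workflows whose creation index is at least swf.MinIndex() are
+	// fetched, so the history retrieved here stays bounded by the MaxHistory setting.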
+	completed, err := c.workflowClient.List(swf.Name,
+		true, /* completed workflows */
+		swf.MinIndex())
+	if err != nil {
+		return false, true, swf,
+			wraperror.Wrapf(err, "Syncing ScheduledWorkflow (%v): transient failure, can't fetch completed workflows: %v", name, err)
+	}
+
+	workflow, nextScheduledEpoch, err := c.submitNextWorkflowIfNeeded(swf, len(active), nowEpoch)
+	if err != nil {
+		return false, true, swf,
+			wraperror.Wrapf(err, "Syncing ScheduledWorkflow (%v): transient failure, can't submit next workflow: %v", name, err)
+	}
+
+	err = c.updateStatus(swf, workflow, active, completed, nextScheduledEpoch, nowEpoch)
+	if err != nil {
+		return false, true, swf,
+			wraperror.Wrapf(err, "Syncing ScheduledWorkflow (%v): transient failure, can't update swf status: %v", name, err)
+	}
+
+	if workflow != nil {
+		// Success. Since we created a new workflow, sync again soon since there might be one more
+		// resource to create.
+		log.WithFields(log.Fields{
+			ScheduledWorkflow: name,
+		}).Infof("Syncing ScheduledWorkflow (%v): success, requeuing for further processing.", name)
+		return true, false, swf, nil
+	}
+
+	// Success. We did not create any new resource. We can sync again when something changes.
+	log.WithFields(log.Fields{
+		ScheduledWorkflow: name,
+	}).Infof("Syncing ScheduledWorkflow (%v): success, processing complete.", name)
+	return false, false, swf, nil
+}
+
+// Submits the next workflow if a workflow is due to execute. Returns the submitted
+// workflow (if any), the epoch at which that workflow was scheduled, and an error (if any).
+func (c *Controller) submitNextWorkflowIfNeeded(swf *util.ScheduledWorkflow,
+	activeWorkflowCount int, nowEpoch int64) (
+	workflow *util.Workflow, nextScheduledEpoch int64, err error) {
+	// Compute the next scheduled time.
+	nextScheduledEpoch, shouldRunNow := swf.GetNextScheduledEpoch(
+		int64(activeWorkflowCount), nowEpoch)
+
+	if !shouldRunNow {
+		log.WithFields(log.Fields{
+			ScheduledWorkflow: swf.Name,
+		}).Infof("Submitting workflow for ScheduledWorkflow (%v): nothing to submit (next scheduled at: %v)",
+			swf.Name, util.FormatTimeForLogging(nextScheduledEpoch))
+		return nil, nextScheduledEpoch, nil
+	}
+
+	workflow, err = c.submitNewWorkflowIfNotAlreadySubmitted(swf, nextScheduledEpoch, nowEpoch)
+	if err != nil {
+		log.WithFields(log.Fields{
+			ScheduledWorkflow: swf.Name,
+		}).Errorf("Submitting workflow for ScheduledWorkflow (%v): transient error while submitting workflow: %v",
+			swf.Name, err)
+		// There was an error submitting a new workflow.
+		// We should attempt to handle the schedule again at a later time.
+		return nil, nextScheduledEpoch, err
+	}
+	log.WithFields(log.Fields{
+		ScheduledWorkflow: swf.Name,
+		Workflow:          workflow.Get().Name,
+	}).Infof("Submitting workflow for ScheduledWorkflow (%v): workflow (%v) successfully submitted (scheduled at: %v)",
+		swf.Name, workflow.Get().Name, util.FormatTimeForLogging(nextScheduledEpoch))
+	return workflow, nextScheduledEpoch, nil
+}
+
+func (c *Controller) submitNewWorkflowIfNotAlreadySubmitted(
+	swf *util.ScheduledWorkflow, nextScheduledEpoch int64, nowEpoch int64) (
+	*util.Workflow, error) {
+
+	workflowName := swf.NextResourceName()
+
+	// Try to fetch this workflow.
+	// If it already exists, it means that it was already created in a previous iteration
+	// of this controller but that the controller failed to save this data.
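+	// The name returned by NextResourceName is deterministic, so retrying after a
+	// crash finds the previously created Workflow instead of creating a duplicate.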
+ foundWorkflow, isNotFoundError, err := c.workflowClient.Get(swf.Namespace, + workflowName) + if err == nil { + // The workflow was already created by a previous iteration of this controller. + // Nothing to do except returning the information needed by the controller to update + // the ScheduledWorkflow status. + return foundWorkflow, nil + } + + if !isNotFoundError { + // There was an error while attempting to retrieve the workflow + return nil, err + } + + // If the workflow is not found, we need to create it. + newWorkflow := swf.NewWorkflow(nextScheduledEpoch, nowEpoch) + createdWorkflow, err := c.workflowClient.Create(swf.Namespace, newWorkflow) + + if err != nil { + return nil, err + } + return createdWorkflow, nil +} + +func (c *Controller) updateStatus( + swf *util.ScheduledWorkflow, + workflow *util.Workflow, + active []swfapi.WorkflowStatus, + completed []swfapi.WorkflowStatus, + nextScheduledEpoch int64, + nowEpoch int64) error { + // NEVER modify objects from the store. It's a read-only, local cache. + // You can use DeepCopy() to make a deep copy of original object and modify this copy + // Or create a copy manually for better performance + swfCopy := util.NewScheduledWorkflow(swf.Get().DeepCopy()) + swfCopy.UpdateStatus(nowEpoch, workflow, nextScheduledEpoch, active, completed) + + // Until #38113 is merged, we must use Update instead of UpdateStatus to + // update the Status block of the ScheduledWorkflow. UpdateStatus will not + // allow changes to the Spec of the resource, which is ideal for ensuring + // nothing other than resource status has been updated. + return c.swfClient.Update(swf.Namespace, swfCopy) +} diff --git a/resources/scheduledworkflow/main.go b/resources/scheduledworkflow/main.go new file mode 100644 index 00000000000..41579254a8c --- /dev/null +++ b/resources/scheduledworkflow/main.go @@ -0,0 +1,85 @@ +// Copyright 2018 The Kubeflow Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
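+
+// This binary runs the ScheduledWorkflow controller: it watches ScheduledWorkflow
+// resources and the Argo Workflows they own, and submits a new Workflow whenever a
+// schedule is due.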
+ +package main + +import ( + "flag" + workflowclientSet "github.com/argoproj/argo/pkg/client/clientset/versioned" + workflowinformers "github.com/argoproj/argo/pkg/client/informers/externalversions" + swfclientset "github.com/kubeflow/pipelines/pkg/client/clientset/versioned" + swfinformers "github.com/kubeflow/pipelines/pkg/client/informers/externalversions" + "github.com/kubeflow/pipelines/pkg/signals" + "github.com/kubeflow/pipelines/resources/scheduledworkflow/util" + log "github.com/sirupsen/logrus" + "k8s.io/client-go/kubernetes" + _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" + "k8s.io/client-go/tools/clientcmd" + "time" +) + +var ( + masterURL string + kubeconfig string +) + +func main() { + flag.Parse() + + // set up signals so we handle the first shutdown signal gracefully + stopCh := signals.SetupSignalHandler() + + cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig) + if err != nil { + log.Fatalf("Error building kubeconfig: %s", err.Error()) + } + + kubeClient, err := kubernetes.NewForConfig(cfg) + if err != nil { + log.Fatalf("Error building kubernetes clientset: %s", err.Error()) + } + + scheduleClient, err := swfclientset.NewForConfig(cfg) + if err != nil { + log.Fatalf("Error building schedule clientset: %s", err.Error()) + } + + workflowClient, err := workflowclientSet.NewForConfig(cfg) + if err != nil { + log.Fatalf("Error building workflow clientset: %s", err.Error()) + } + + scheduleInformerFactory := swfinformers.NewSharedInformerFactory(scheduleClient, time.Second*30) + workflowInformerFactory := workflowinformers.NewSharedInformerFactory(workflowClient, time.Second*30) + + controller := NewController( + kubeClient, + scheduleClient, + workflowClient, + scheduleInformerFactory, + workflowInformerFactory, + util.NewRealTime()) + + go scheduleInformerFactory.Start(stopCh) + go workflowInformerFactory.Start(stopCh) + + if err = controller.Run(2, stopCh); err != nil { + log.Fatalf("Error running controller: %s", err.Error()) + } +} + +func init() { + flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.") + flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.") +} diff --git a/resources/scheduledworkflow/util/constants.go b/resources/scheduledworkflow/util/constants.go new file mode 100644 index 00000000000..a927f10f0ce --- /dev/null +++ b/resources/scheduledworkflow/util/constants.go @@ -0,0 +1,44 @@ +// Copyright 2018 The Kubeflow Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import ( + constants "github.com/kubeflow/pipelines/pkg/apis/scheduledworkflow" +) + +const ( + // ControllerAgentName is the name of the controller. + ControllerAgentName = "scheduled-workflow-controller" + + // LabelKeyWorkflowEpoch is a label on a Workflow. + // It captures the epoch at which the workflow was scheduled. 
+	LabelKeyWorkflowEpoch = constants.FullName + "/workflowEpoch"
+	// LabelKeyWorkflowIndex is a label on a Workflow.
+	// It captures the index of creation of the workflow by the ScheduledWorkflow.
+	LabelKeyWorkflowIndex = constants.FullName + "/workflowIndex"
+	// LabelKeyWorkflowIsOwnedByScheduledWorkflow is a label on a Workflow.
+	// It captures whether the workflow is owned by a ScheduledWorkflow.
+	LabelKeyWorkflowIsOwnedByScheduledWorkflow = constants.FullName + "/isOwnedByScheduledWorkflow"
+	// LabelKeyWorkflowScheduledWorkflowName is a label on a Workflow.
+	// It captures the name of the owning ScheduledWorkflow.
+	LabelKeyWorkflowScheduledWorkflowName = constants.FullName + "/scheduledWorkflowName"
+
+	// LabelKeyScheduledWorkflowEnabled is a label on a ScheduledWorkflow.
+	// It captures whether the ScheduledWorkflow is enabled.
+	LabelKeyScheduledWorkflowEnabled = constants.FullName + "/enabled"
+	// LabelKeyScheduledWorkflowStatus is a label on a ScheduledWorkflow.
+	// It captures the status of the scheduled workflow.
+	LabelKeyScheduledWorkflowStatus = constants.FullName + "/status"
+)
diff --git a/pkg/client/clientset/versioned/typed/schedule/v1alpha1/fake/fake_schedule_client.go b/resources/scheduledworkflow/util/convert.go
similarity index 51%
rename from pkg/client/clientset/versioned/typed/schedule/v1alpha1/fake/fake_schedule_client.go
rename to resources/scheduledworkflow/util/convert.go
index cff61b39000..0f5f98c0dc7 100644
--- a/pkg/client/clientset/versioned/typed/schedule/v1alpha1/fake/fake_schedule_client.go
+++ b/resources/scheduledworkflow/util/convert.go
@@ -12,27 +12,36 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-// Code generated by client-gen. DO NOT EDIT.
-
-package fake
+package util
 
 import (
-	v1alpha1 "github.com/kubeflow/pipelines/pkg/client/clientset/versioned/typed/schedule/v1alpha1"
-	rest "k8s.io/client-go/rest"
-	testing "k8s.io/client-go/testing"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
 
-type FakeScheduleV1alpha1 struct {
-	*testing.Fake
+// StringPointer converts a string to a string pointer.
+func StringPointer(s string) *string {
+	return &s
+}
+
+// BooleanPointer converts a bool to a bool pointer.
+func BooleanPointer(b bool) *bool {
+	return &b
+}
+
+// Metav1TimePointer converts a metav1.Time to a pointer.
+func Metav1TimePointer(t metav1.Time) *metav1.Time {
+	return &t
 }
 
-func (c *FakeScheduleV1alpha1) Schedules(namespace string) v1alpha1.ScheduleInterface {
-	return &FakeSchedules{c, namespace}
+// Int64Pointer converts an int64 to a pointer.
+func Int64Pointer(i int64) *int64 {
+	return &i
 }
 
-// RESTClient returns a RESTClient that is used to communicate
-// with API server by this client implementation.
-func (c *FakeScheduleV1alpha1) RESTClient() rest.Interface {
-	var ret *rest.RESTClient
-	return ret
+func toInt64Pointer(t *metav1.Time) *int64 {
+	if t == nil {
+		return nil
+	} else {
+		return Int64Pointer(t.Unix())
+	}
 }
diff --git a/resources/scheduledworkflow/util/cron_schedule.go b/resources/scheduledworkflow/util/cron_schedule.go
new file mode 100644
index 00000000000..9bd43be1d1b
--- /dev/null
+++ b/resources/scheduledworkflow/util/cron_schedule.go
@@ -0,0 +1,76 @@
+// Copyright 2018 The Kubeflow Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import ( + swfapi "github.com/kubeflow/pipelines/pkg/apis/scheduledworkflow/v1alpha1" + wraperror "github.com/pkg/errors" + "github.com/robfig/cron" + log "github.com/sirupsen/logrus" + "math" + "time" +) + +// CronSchedule is a type to help manipulate CronSchedule objects. +type CronSchedule struct { + *swfapi.CronSchedule +} + +// NewCronSchedule creates a CronSchedule. +func NewCronSchedule(cronSchedule *swfapi.CronSchedule) *CronSchedule { + if cronSchedule == nil { + log.Fatalf("The cronSchedule should never be nil") + } + + return &CronSchedule{ + cronSchedule, + } +} + +// GetNextScheduledEpoch returns the next epoch at which a workflow must be +// scheduled. +func (s *CronSchedule) GetNextScheduledEpoch(lastJobEpoch *int64, + defaultStartEpoch int64) int64 { + effectiveLastJobEpoch := defaultStartEpoch + if lastJobEpoch != nil { + effectiveLastJobEpoch = *lastJobEpoch + } else if s.StartTime != nil { + effectiveLastJobEpoch = s.StartTime.Unix() + } + return s.getNextScheduledEpoch(effectiveLastJobEpoch) +} + +func (s *CronSchedule) getNextScheduledEpoch(lastJobEpoch int64) int64 { + schedule, err := cron.Parse(s.Cron) + if err != nil { + // This should never happen, validation should have caught this at resource creation. + log.Errorf("%+v", wraperror.Errorf( + "Found invalid schedule (%v): %v", s.Cron, err)) + return math.MaxInt64 + } + + startEpoch := lastJobEpoch + if s.StartTime != nil && s.StartTime.Unix() > startEpoch { + startEpoch = s.StartTime.Unix() + } + result := schedule.Next(time.Unix(startEpoch, 0).UTC()).Unix() + + if s.EndTime != nil && + s.EndTime.Unix() < result { + return math.MaxInt64 + } + + return result +} diff --git a/resources/scheduledworkflow/util/cron_schedule_test.go b/resources/scheduledworkflow/util/cron_schedule_test.go new file mode 100644 index 00000000000..7e7b107b969 --- /dev/null +++ b/resources/scheduledworkflow/util/cron_schedule_test.go @@ -0,0 +1,110 @@ +// Copyright 2018 The Kubeflow Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import ( + swfapi "github.com/kubeflow/pipelines/pkg/apis/scheduledworkflow/v1alpha1" + "github.com/stretchr/testify/assert" + "k8s.io/apimachinery/pkg/apis/meta/v1" + "math" + "testing" + "time" +) + +const ( + second = 1 + minute = 60 * second + hour = 60 * minute +) + +func TestCronSchedule_getNextScheduledEpoch_Cron_StartDate_EndDate(t *testing.T) { + // First job. 
+ schedule := NewCronSchedule(&swfapi.CronSchedule{ + StartTime: Metav1TimePointer(v1.NewTime(time.Unix(10*hour, 0).UTC())), + EndTime: Metav1TimePointer(v1.NewTime(time.Unix(11*hour, 0).UTC())), + Cron: "0 * * * * * ", + }) + lastJobEpoch := int64(0) + assert.Equal(t, int64(10*hour+minute), + schedule.getNextScheduledEpoch(lastJobEpoch)) + + // Not the first job. + lastJobEpoch = int64(10*hour + 5*minute) + assert.Equal(t, int64(10*hour+6*minute), + schedule.getNextScheduledEpoch(lastJobEpoch)) + + // Last job + lastJobEpoch = int64(13 * hour) + assert.Equal(t, int64(math.MaxInt64), + schedule.getNextScheduledEpoch(lastJobEpoch)) + +} + +func TestCronSchedule_getNextScheduledEpoch_CronOnly(t *testing.T) { + schedule := NewCronSchedule(&swfapi.CronSchedule{ + Cron: "0 * * * * * ", + }) + lastJobEpoch := int64(10 * hour) + assert.Equal(t, int64(10*hour+minute), + schedule.getNextScheduledEpoch(lastJobEpoch)) +} + +func TestCronSchedule_getNextScheduledEpoch_NoCron(t *testing.T) { + schedule := NewCronSchedule(&swfapi.CronSchedule{ + StartTime: Metav1TimePointer(v1.NewTime(time.Unix(10*hour, 0).UTC())), + EndTime: Metav1TimePointer(v1.NewTime(time.Unix(11*hour, 0).UTC())), + Cron: "", + }) + lastJobEpoch := int64(0) + assert.Equal(t, int64(math.MaxInt64), + schedule.getNextScheduledEpoch(lastJobEpoch)) +} + +func TestCronSchedule_getNextScheduledEpoch_InvalidCron(t *testing.T) { + schedule := NewCronSchedule(&swfapi.CronSchedule{ + StartTime: Metav1TimePointer(v1.NewTime(time.Unix(10*hour, 0).UTC())), + EndTime: Metav1TimePointer(v1.NewTime(time.Unix(11*hour, 0).UTC())), + Cron: "*$&%*(W&", + }) + lastJobEpoch := int64(0) + assert.Equal(t, int64(math.MaxInt64), + schedule.getNextScheduledEpoch(lastJobEpoch)) +} + +func TestCronSchedule_GetNextScheduledEpoch(t *testing.T) { + // There was a previous job. + schedule := NewCronSchedule(&swfapi.CronSchedule{ + StartTime: Metav1TimePointer(v1.NewTime(time.Unix(10*hour+10*minute, 0).UTC())), + EndTime: Metav1TimePointer(v1.NewTime(time.Unix(11*hour, 0).UTC())), + Cron: "0 * * * * * ", + }) + lastJobEpoch := int64(10*hour + 20*minute) + defaultStartEpoch := int64(10*hour + 15*minute) + assert.Equal(t, int64(10*hour+20*minute+minute), + schedule.GetNextScheduledEpoch(&lastJobEpoch, defaultStartEpoch)) + + // There is no previous job, falling back on the start date of the schedule. + assert.Equal(t, int64(10*hour+10*minute+minute), + schedule.GetNextScheduledEpoch(nil, defaultStartEpoch)) + + // There is no previous job, no schedule start date, falling back on the + // creation date of the workflow. + schedule = NewCronSchedule(&swfapi.CronSchedule{ + EndTime: Metav1TimePointer(v1.NewTime(time.Unix(11*hour, 0).UTC())), + Cron: "0 * * * * * ", + }) + assert.Equal(t, int64(10*hour+15*minute+minute), + schedule.GetNextScheduledEpoch(nil, defaultStartEpoch)) +} diff --git a/pkg/client/clientset/versioned/typed/schedule/v1alpha1/fake/doc.go b/resources/scheduledworkflow/util/error.go similarity index 50% rename from pkg/client/clientset/versioned/typed/schedule/v1alpha1/fake/doc.go rename to resources/scheduledworkflow/util/error.go index c3f1566b39f..ddd2f1ed4c0 100644 --- a/pkg/client/clientset/versioned/typed/schedule/v1alpha1/fake/doc.go +++ b/resources/scheduledworkflow/util/error.go @@ -12,7 +12,25 @@ // See the License for the specific language governing permissions and // limitations under the License. -// Code generated by client-gen. DO NOT EDIT. +package util -// Package fake has the automatically generated clients. 
-package fake +import ( + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// IsNotFound returns whether an error indicates that a resource was "not found". +func IsNotFound(err error) bool { + return reasonForError(err) == metav1.StatusReasonNotFound +} + +// ReasonForError returns the HTTP status for a particular error. +func reasonForError(err error) metav1.StatusReason { + switch t := err.(type) { + case errors.APIStatus: + return t.Status().Reason + case *errors.StatusError: + return t.Status().Reason + } + return metav1.StatusReasonUnknown +} diff --git a/pkg/client/listers/schedule/v1alpha1/expansion_generated.go b/resources/scheduledworkflow/util/error_test.go similarity index 61% rename from pkg/client/listers/schedule/v1alpha1/expansion_generated.go rename to resources/scheduledworkflow/util/error_test.go index 2cd91a6f38d..02d0f2d0418 100644 --- a/pkg/client/listers/schedule/v1alpha1/expansion_generated.go +++ b/resources/scheduledworkflow/util/error_test.go @@ -12,14 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -// Code generated by lister-gen. DO NOT EDIT. +package util -package v1alpha1 +import ( + "github.com/stretchr/testify/assert" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime/schema" + "testing" +) -// ScheduleListerExpansion allows custom methods to be added to -// ScheduleLister. -type ScheduleListerExpansion interface{} - -// ScheduleNamespaceListerExpansion allows custom methods to be added to -// ScheduleNamespaceLister. -type ScheduleNamespaceListerExpansion interface{} +func TestIsNotFound(t *testing.T) { + assert.Equal(t, true, IsNotFound(errors.NewNotFound(schema.GroupResource{}, "NAME"))) + assert.Equal(t, false, IsNotFound(errors.NewAlreadyExists(schema.GroupResource{}, "NAME"))) +} diff --git a/resources/scheduledworkflow/util/label.go b/resources/scheduledworkflow/util/label.go new file mode 100644 index 00000000000..0eab4581e89 --- /dev/null +++ b/resources/scheduledworkflow/util/label.go @@ -0,0 +1,69 @@ +// Copyright 2018 The Kubeflow Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import ( + "fmt" + "github.com/argoproj/argo/workflow/common" + log "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/selection" + "strconv" +) + +// GetRequirementForCompletedWorkflowOrFatal returns a label requirement indicating +// whether a workflow is completed. +func GetRequirementForCompletedWorkflowOrFatal(completed bool) *labels.Requirement { + operator := selection.NotEquals + if completed == true { + operator = selection.Equals + } + req, err := labels.NewRequirement(common.LabelKeyCompleted, operator, + []string{"true"}) + if err != nil { + log.Fatalf("Error while creating requirement: %s", err) + } + return req +} + +// GetRequirementForScheduleNameOrFatal returns a label requirement for a specific +// ScheduledWorkflow name. 
+func GetRequirementForScheduleNameOrFatal(swf string) *labels.Requirement {
+	req, err := labels.NewRequirement(LabelKeyWorkflowScheduledWorkflowName, selection.Equals, []string{swf})
+	if err != nil {
+		log.Fatalf("Error while creating requirement: %s", err)
+	}
+	return req
+}
+
+// GetRequirementForMinIndexOrFatal returns a label requirement for a minimum
+// index of creation of a workflow (to avoid querying the whole list).
+func GetRequirementForMinIndexOrFatal(minIndex int64) *labels.Requirement {
+	req, err := labels.NewRequirement(LabelKeyWorkflowIndex, selection.GreaterThan,
+		[]string{formatInt64ForLabel(minIndex)})
+	if err != nil {
+		log.Fatalf("Error while creating requirement: %s", err)
+	}
+	return req
+}
+
+func formatInt64ForLabel(epoch int64) string {
+	return fmt.Sprintf("%d", epoch)
+}
+
+// RetrieveInt64FromLabel converts a string label value into an int64.
+func RetrieveInt64FromLabel(epoch string) (int64, error) {
+	return strconv.ParseInt(epoch, 10, 64)
+}
diff --git a/pkg/client/clientset/versioned/typed/schedule/v1alpha1/generated_expansion.go b/resources/scheduledworkflow/util/label_test.go
similarity index 64%
rename from pkg/client/clientset/versioned/typed/schedule/v1alpha1/generated_expansion.go
rename to resources/scheduledworkflow/util/label_test.go
index 48b8ed8c77b..e744dbceb7e 100644
--- a/pkg/client/clientset/versioned/typed/schedule/v1alpha1/generated_expansion.go
+++ b/resources/scheduledworkflow/util/label_test.go
@@ -12,8 +12,19 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-// Code generated by client-gen. DO NOT EDIT.
+package util
 
-package v1alpha1
+import (
+	"github.com/stretchr/testify/assert"
+	"testing"
+)
 
-type ScheduleExpansion interface{}
+func TestFormatInt64ForLabel(t *testing.T) {
+	assert.Equal(t, "100", formatInt64ForLabel(100))
+}
+
+func TestRetrieveInt64FromLabel(t *testing.T) {
+	result, err := RetrieveInt64FromLabel("100")
+	assert.Nil(t, err)
+	assert.Equal(t, int64(100), result)
+}
diff --git a/resources/scheduledworkflow/util/parameter_formatter.go b/resources/scheduledworkflow/util/parameter_formatter.go
new file mode 100644
index 00000000000..87514549467
--- /dev/null
+++ b/resources/scheduledworkflow/util/parameter_formatter.go
@@ -0,0 +1,90 @@
+// Copyright 2018 The Kubeflow Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package util
+
+import (
+	"fmt"
+	"regexp"
+	"strings"
+	"time"
+)
+
+const (
+	scheduledTimeExpression = "[[ScheduledTime]]"
+	currentTimeExpression   = "[[CurrentTime]]"
+	IndexExpression         = "[[Index]]"
+	scheduledTimePrefix     = "[[ScheduledTime."
+	currentTimePrefix       = "[[CurrentTime."
+	defaultTimeFormat       = "20060102150405"
+	suffix                  = "]]"
+)
+
+// ParameterFormatter is an object that substitutes specific placeholder strings
+// in workflow parameters with information about the workflow execution (time at
+// which the workflow was started, time at which the workflow was scheduled, etc.).
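+//
+// A minimal usage sketch (illustrative values only):
+//
+//	f := NewParameterFormatter(25 /* scheduled epoch */, 26 /* now epoch */, 27 /* index */)
+//	f.Format("run-[[Index]]-[[ScheduledTime]]") // yields "run-27-19700101000025"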
+type ParameterFormatter struct { + scheduledEpoch int64 + nowEpoch int64 + index int64 +} + +// NewParameterFormatter returns a new ParameterFormatter. +func NewParameterFormatter(scheduledEpoch int64, nowEpoch int64, + index int64) *ParameterFormatter { + return &ParameterFormatter{ + scheduledEpoch: scheduledEpoch, + nowEpoch: nowEpoch, + index: index, + } +} + +// Format substitutes special strings in the provided string. +func (p *ParameterFormatter) Format(s string) string { + re := regexp.MustCompile(`\[\[(.*?)\]\]`) + matches := re.FindAllString(s, -1) + if matches == nil { + return s + } + + result := s + + for _, match := range matches { + substitute := p.createSubtitute(match) + result = strings.Replace(result, match, substitute, 1) + } + + return result +} + +func (p *ParameterFormatter) createSubtitute(match string) string { + + if strings.HasPrefix(match, scheduledTimeExpression) { + return time.Unix(p.scheduledEpoch, 0).UTC().Format(defaultTimeFormat) + } else if strings.HasPrefix(match, currentTimeExpression) { + return time.Unix(p.nowEpoch, 0).UTC().Format(defaultTimeFormat) + } else if strings.HasPrefix(match, IndexExpression) { + return fmt.Sprintf("%v", p.index) + } else if strings.HasPrefix(match, scheduledTimePrefix) { + match = strings.Replace(match, scheduledTimePrefix, "", 1) + match = strings.Replace(match, suffix, "", 1) + return time.Unix(p.scheduledEpoch, 0).UTC().Format(match) + } else if strings.HasPrefix(match, currentTimePrefix) { + match = strings.Replace(match, currentTimePrefix, "", 1) + match = strings.Replace(match, suffix, "", 1) + return time.Unix(p.nowEpoch, 0).UTC().Format(match) + } else { + return match + } +} diff --git a/resources/scheduledworkflow/util/parameter_formatter_test.go b/resources/scheduledworkflow/util/parameter_formatter_test.go new file mode 100644 index 00000000000..3e0b5969722 --- /dev/null +++ b/resources/scheduledworkflow/util/parameter_formatter_test.go @@ -0,0 +1,51 @@ +// Copyright 2018 The Kubeflow Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package util + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func TestParameterFormatter_Format(t *testing.T) { + formatter := NewParameterFormatter( + 25, /* scheduled time */ + 26, /* current time */ + 27 /* index */) + + // Test [[ScheduledTime]] substitution + assert.Equal(t, "FOO 19700101000025 FOO", formatter.Format("FOO [[ScheduledTime]] FOO")) + + // Test [[CurrentTime]] substitution + assert.Equal(t, "FOO 19700101000026 FOO", formatter.Format("FOO [[CurrentTime]] FOO")) + + // Test [[Index]] + assert.Equal(t, "FOO 27 FOO", formatter.Format("FOO [[Index]] FOO")) + + // Test [[ScheduledTime.15-04-05]] substition + assert.Equal(t, "FOO 00-00-25 FOO", formatter.Format("FOO [[ScheduledTime.15-04-05]] FOO")) + + // Test [[CurrentTime.15-04-05]] substitution + assert.Equal(t, "FOO 00-00-26 FOO", formatter.Format("FOO [[CurrentTime.15-04-05]] FOO")) + + // Test multiple substitution + assert.Equal(t, "19700101000025 19700101000025 27", formatter.Format("[[ScheduledTime]] [[ScheduledTime]] [[Index]]")) + + // Test no substitution + assert.Equal(t, "FOO FOO FOO", formatter.Format("FOO FOO FOO")) + + // Test empty string + assert.Equal(t, "", formatter.Format("")) +} diff --git a/resources/scheduledworkflow/util/periodic_schedule.go b/resources/scheduledworkflow/util/periodic_schedule.go new file mode 100644 index 00000000000..13b99f3423e --- /dev/null +++ b/resources/scheduledworkflow/util/periodic_schedule.go @@ -0,0 +1,71 @@ +// Copyright 2018 The Kubeflow Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import ( + swfapi "github.com/kubeflow/pipelines/pkg/apis/scheduledworkflow/v1alpha1" + log "github.com/sirupsen/logrus" + "math" +) + +// PeriodicSchedule is a type to help manipulate PeriodicSchedule objects. +type PeriodicSchedule struct { + *swfapi.PeriodicSchedule +} + +// NewPeriodicSchedule creates a new PeriodicSchedule. +func NewPeriodicSchedule(periodicSchedule *swfapi.PeriodicSchedule) *PeriodicSchedule { + if periodicSchedule == nil { + log.Fatalf("The periodicSchedule should never be nil") + } + + return &PeriodicSchedule{ + periodicSchedule, + } +} + +// GetNextScheduledEpoch returns the next epoch at which a workflow should be +// scheduled. 
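+// For example (illustrative): with IntervalSecond set to 60, no later StartTime,
+// and a last job at epoch 36000 (10:00:00 UTC), the next run is due at epoch 36060.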
+func (s *PeriodicSchedule) GetNextScheduledEpoch(lastJobEpoch *int64, + defaultStartEpoch int64) int64 { + effectiveLastJobEpoch := defaultStartEpoch + if lastJobEpoch != nil { + effectiveLastJobEpoch = *lastJobEpoch + } else if s.StartTime != nil { + effectiveLastJobEpoch = s.StartTime.Unix() + } + return s.getNextScheduledEpoch(effectiveLastJobEpoch) +} + +func (s *PeriodicSchedule) getNextScheduledEpoch(lastJobEpoch int64) int64 { + startEpoch := lastJobEpoch + if s.StartTime != nil && s.StartTime.Unix() > startEpoch { + startEpoch = s.StartTime.Unix() + } + + interval := s.IntervalSecond + if interval == 0 { + interval = 1 + } + + result := startEpoch + interval + + if s.EndTime != nil && + s.EndTime.Unix() < result { + return math.MaxInt64 + } + + return result +} diff --git a/resources/scheduledworkflow/util/periodic_schedule_test.go b/resources/scheduledworkflow/util/periodic_schedule_test.go new file mode 100644 index 00000000000..4399616b514 --- /dev/null +++ b/resources/scheduledworkflow/util/periodic_schedule_test.go @@ -0,0 +1,93 @@ +// Copyright 2018 The Kubeflow Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import ( + swfapi "github.com/kubeflow/pipelines/pkg/apis/scheduledworkflow/v1alpha1" + "github.com/stretchr/testify/assert" + "k8s.io/apimachinery/pkg/apis/meta/v1" + "math" + "testing" + "time" +) + +func TestPeriodicSchedule_getNextScheduledEpoch_StartDate_EndDate(t *testing.T) { + // First job. + schedule := NewPeriodicSchedule(&swfapi.PeriodicSchedule{ + StartTime: Metav1TimePointer(v1.NewTime(time.Unix(10*hour, 0).UTC())), + EndTime: Metav1TimePointer(v1.NewTime(time.Unix(11*hour, 0).UTC())), + IntervalSecond: minute, + }) + lastJobEpoch := int64(0) + assert.Equal(t, int64(10*hour+minute), + schedule.getNextScheduledEpoch(lastJobEpoch)) + + // Not the first job. + lastJobEpoch = int64(10*hour + 5*minute) + assert.Equal(t, int64(10*hour+6*minute), + schedule.getNextScheduledEpoch(lastJobEpoch)) + + // Last job + lastJobEpoch = int64(13 * hour) + assert.Equal(t, int64(math.MaxInt64), + schedule.getNextScheduledEpoch(lastJobEpoch)) + +} + +func TestPeriodicSchedule_getNextScheduledEpoch_PeriodOnly(t *testing.T) { + schedule := NewPeriodicSchedule(&swfapi.PeriodicSchedule{ + IntervalSecond: minute, + }) + lastJobEpoch := int64(10 * hour) + assert.Equal(t, int64(10*hour+minute), + schedule.getNextScheduledEpoch(lastJobEpoch)) +} + +func TestPeriodicSchedule_getNextScheduledEpoch_NoPeriod(t *testing.T) { + schedule := NewPeriodicSchedule(&swfapi.PeriodicSchedule{ + StartTime: Metav1TimePointer(v1.NewTime(time.Unix(10*hour, 0).UTC())), + EndTime: Metav1TimePointer(v1.NewTime(time.Unix(11*hour, 0).UTC())), + IntervalSecond: 0, + }) + lastJobEpoch := int64(10 * hour) + assert.Equal(t, int64(10*hour+second), + schedule.getNextScheduledEpoch(lastJobEpoch)) +} + +func TestPeriodicSchedule_GetNextScheduledEpoch(t *testing.T) { + // There was a previous job. 
+ schedule := NewPeriodicSchedule(&swfapi.PeriodicSchedule{ + StartTime: Metav1TimePointer(v1.NewTime(time.Unix(10*hour+10*minute, 0).UTC())), + EndTime: Metav1TimePointer(v1.NewTime(time.Unix(11*hour, 0).UTC())), + IntervalSecond: 60, + }) + lastJobEpoch := int64(10*hour + 20*minute) + defaultStartEpoch := int64(10*hour + 15*minute) + assert.Equal(t, int64(10*hour+20*minute+minute), + schedule.GetNextScheduledEpoch(&lastJobEpoch, defaultStartEpoch)) + + // There is no previous job, falling back on the start date of the schedule. + assert.Equal(t, int64(10*hour+10*minute+minute), + schedule.GetNextScheduledEpoch(nil, defaultStartEpoch)) + + // There is no previous job, no schedule start date, falling back on the + // creation date of the workflow. + schedule = NewPeriodicSchedule(&swfapi.PeriodicSchedule{ + EndTime: Metav1TimePointer(v1.NewTime(time.Unix(11*hour, 0).UTC())), + IntervalSecond: 60, + }) + assert.Equal(t, int64(10*hour+15*minute+minute), + schedule.GetNextScheduledEpoch(nil, defaultStartEpoch)) +} diff --git a/resources/scheduledworkflow/util/scheduled_workflow.go b/resources/scheduledworkflow/util/scheduled_workflow.go new file mode 100644 index 00000000000..ebd97069766 --- /dev/null +++ b/resources/scheduledworkflow/util/scheduled_workflow.go @@ -0,0 +1,348 @@ +// Copyright 2018 The Kubeflow Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import ( + "fmt" + workflowapi "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" + swfapi "github.com/kubeflow/pipelines/pkg/apis/scheduledworkflow/v1alpha1" + "hash/fnv" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/kubernetes/pkg/apis/core" + "math" + "sort" + "strconv" + "time" +) + +const ( + defaultMaxConcurrency = int64(1) + minMaxConcurrency = int64(1) + maxMaxConcurrency = int64(10) + defaultMaxHistory = int64(10) + minMaxHistory = int64(0) + maxMaxHistory = int64(100) +) + +// ScheduledWorkflow is a type to help manipulate ScheduledWorkflow objects. +type ScheduledWorkflow struct { + *swfapi.ScheduledWorkflow +} + +// NewScheduledWorkflow creates an instance of ScheduledWorkflow. +func NewScheduledWorkflow(swf *swfapi.ScheduledWorkflow) *ScheduledWorkflow { + return &ScheduledWorkflow{ + swf, + } +} + +// Get converts this object to a swfapi.ScheduledWorkflow. 
+func (s *ScheduledWorkflow) Get() *swfapi.ScheduledWorkflow {
+	return s.ScheduledWorkflow
+}
+
+func (s *ScheduledWorkflow) creationEpoch() int64 {
+	return s.CreationTimestamp.Unix()
+}
+
+func (s *ScheduledWorkflow) enabled() bool {
+	return s.Spec.Enabled
+}
+
+func (s *ScheduledWorkflow) maxConcurrency() int64 {
+	if s.Spec.MaxConcurrency == nil {
+		return defaultMaxConcurrency
+	}
+
+	if *s.Spec.MaxConcurrency < minMaxConcurrency {
+		return minMaxConcurrency
+	}
+
+	if *s.Spec.MaxConcurrency > maxMaxConcurrency {
+		return maxMaxConcurrency
+	}
+
+	return *s.Spec.MaxConcurrency
+}
+
+func (s *ScheduledWorkflow) maxHistory() int64 {
+	if s.Spec.MaxHistory == nil {
+		return defaultMaxHistory
+	}
+
+	if *s.Spec.MaxHistory < minMaxHistory {
+		return minMaxHistory
+	}
+
+	if *s.Spec.MaxHistory > maxMaxHistory {
+		return maxMaxHistory
+	}
+
+	return *s.Spec.MaxHistory
+}
+
+func (s *ScheduledWorkflow) hasRunAtLeastOnce() bool {
+	return s.Status.Trigger.LastTriggeredTime != nil
+}
+
+func (s *ScheduledWorkflow) lastIndex() int64 {
+	if s.Status.Trigger.LastIndex == nil {
+		return 0
+	} else {
+		return *s.Status.Trigger.LastIndex
+	}
+}
+
+func (s *ScheduledWorkflow) nextIndex() int64 {
+	return s.lastIndex() + 1
+}
+
+// MinIndex returns the minimum index of the workflow to retrieve as part of the workflow
+// history.
+func (s *ScheduledWorkflow) MinIndex() int64 {
+	result := s.lastIndex() - s.maxHistory()
+	if result < 0 {
+		return 0
+	}
+	return result
+}
+
+func (s *ScheduledWorkflow) isOneOffRun() bool {
+	return s.Spec.Trigger.CronSchedule == nil &&
+		s.Spec.Trigger.PeriodicSchedule == nil
+}
+
+func (s *ScheduledWorkflow) nextResourceID() string {
+	return s.Name + "-" + strconv.FormatInt(s.nextIndex(), 10)
+}
+
+// NextResourceName creates a deterministic resource name for the next resource.
+func (s *ScheduledWorkflow) NextResourceName() string {
+	nextResourceID := s.nextResourceID()
+	h := fnv.New32a()
+	_, _ = h.Write([]byte(nextResourceID))
+	return fmt.Sprintf("%s-%v", nextResourceID, h.Sum32())
+}
+
+func (s *ScheduledWorkflow) getWorkflowParametersAsMap() map[string]string {
+	resultAsArray := s.Spec.Workflow.Parameters
+	resultAsMap := make(map[string]string)
+	for _, param := range resultAsArray {
+		resultAsMap[param.Name] = param.Value
+	}
+	return resultAsMap
+}
+
+func (s *ScheduledWorkflow) getFormattedWorkflowParametersAsMap(
+	formatter *ParameterFormatter) map[string]string {
+
+	result := make(map[string]string)
+	for key, value := range s.getWorkflowParametersAsMap() {
+		formatted := formatter.Format(value)
+		result[key] = formatted
+	}
+	return result
+}
+
+// NewWorkflow creates a workflow for this schedule. It also sets
+// the appropriate OwnerReferences on the resource so handleWorkflow can discover
+// the ScheduledWorkflow resource that 'owns' it.
+func (s *ScheduledWorkflow) NewWorkflow(
+	nextScheduledEpoch int64, nowEpoch int64) *Workflow {
+
+	const (
+		workflowKind       = "Workflow"
+		workflowApiVersion = "argoproj.io/v1alpha1"
+	)
+
+	// Creating the workflow.
+	workflow := &workflowapi.Workflow{
+		Spec: *s.Spec.Workflow.Spec.DeepCopy(),
+	}
+	workflow.Kind = workflowKind
+	workflow.APIVersion = workflowApiVersion
+	result := NewWorkflow(workflow)
+
+	// Set the name of the workflow.
+	result.OverrideName(s.NextResourceName())
+
+	// Get the workflow parameters and format them.
+	formatter := NewParameterFormatter(nextScheduledEpoch, nowEpoch, s.nextIndex())
+	formattedParams := s.getFormattedWorkflowParametersAsMap(formatter)
+
+	// Set the parameters.
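+	// For example (illustrative), a parameter value of "run-[[Index]]" is expanded
+	// by the formatter to "run-1" on the schedule's first run.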
+	result.OverrideParameters(formattedParams)
+
+	// Set the labels.
+	result.SetCanonicalLabels(s.Name, nextScheduledEpoch, s.nextIndex())
+
+	// Set the owner references.
+	result.SetOwnerReferences(s.ScheduledWorkflow)
+
+	return result
+}
+
+// GetNextScheduledEpoch returns the next epoch at which a workflow should be scheduled,
+// and whether it should be run now.
+func (s *ScheduledWorkflow) GetNextScheduledEpoch(activeWorkflowCount int64, nowEpoch int64) (
+	nextScheduleEpoch int64, shouldRunNow bool) {
+
+	// Get the next scheduled time.
+	nextScheduledEpoch := s.getNextScheduledEpoch()
+
+	// If the schedule is not enabled, we should not schedule the workflow now.
+	if s.enabled() == false {
+		return nextScheduledEpoch, false
+	}
+
+	// If the maxConcurrency is exceeded, return.
+	if activeWorkflowCount >= s.maxConcurrency() {
+		return nextScheduledEpoch, false
+	}
+
+	// If it is not yet time to schedule the next workflow...
+	if nextScheduledEpoch > nowEpoch {
+		return nextScheduledEpoch, false
+	}
+
+	return nextScheduledEpoch, true
+}
+
+func (s *ScheduledWorkflow) getNextScheduledEpoch() int64 {
+	// Periodic schedule
+	if s.Spec.Trigger.PeriodicSchedule != nil {
+		return NewPeriodicSchedule(s.Spec.Trigger.PeriodicSchedule).
+			GetNextScheduledEpoch(
+				toInt64Pointer(s.Status.Trigger.LastTriggeredTime),
+				s.creationEpoch())
+	}
+
+	// Cron schedule
+	if s.Spec.Trigger.CronSchedule != nil {
+		return NewCronSchedule(s.Spec.Trigger.CronSchedule).
+			GetNextScheduledEpoch(
+				toInt64Pointer(s.Status.Trigger.LastTriggeredTime),
+				s.creationEpoch())
+	}
+
+	return s.getNextScheduledEpochForOneTimeRun()
+}
+
+func (s *ScheduledWorkflow) getNextScheduledEpochForOneTimeRun() int64 {
+	if s.Status.Trigger.LastTriggeredTime != nil {
+		return math.MaxInt64
+	}
+
+	return s.creationEpoch()
+}
+
+func (s *ScheduledWorkflow) setLabel(key string, value string) {
+	if s.Labels == nil {
+		s.Labels = make(map[string]string)
+	}
+	s.Labels[key] = value
+}
+
+// UpdateStatus updates the status of the ScheduledWorkflow in place: its
+// conditions, workflow history, trigger times, and labels.
+func (s *ScheduledWorkflow) UpdateStatus(updatedEpoch int64, workflow *Workflow,
+	scheduledEpoch int64, active []swfapi.WorkflowStatus,
+	completed []swfapi.WorkflowStatus) {
+
+	updatedTime := metav1.NewTime(time.Unix(updatedEpoch, 0).UTC())
+
+	conditionType, status, message := s.getStatusAndMessage(len(active))
+
+	condition := swfapi.ScheduledWorkflowCondition{
+		Type:               conditionType,
+		Status:             status,
+		LastProbeTime:      updatedTime,
+		LastTransitionTime: updatedTime,
+		Reason:             string(conditionType),
+		Message:            message,
+	}
+
+	conditions := make([]swfapi.ScheduledWorkflowCondition, 0)
+	conditions = append(conditions, condition)
+
+	s.Status.Conditions = conditions
+
+	// Sort and record the active and completed workflows in the history.
+	sort.Slice(active, func(i, j int) bool {
+		return active[i].ScheduledAt.Unix() > active[j].ScheduledAt.Unix()
+	})
+
+	sort.Slice(completed, func(i, j int) bool {
+		return completed[i].ScheduledAt.Unix() > completed[j].ScheduledAt.Unix()
+	})
+
+	s.Status.WorkflowHistory = &swfapi.WorkflowHistory{
+		Active:    active,
+		Completed: completed,
+	}
+
+	s.setLabel(LabelKeyScheduledWorkflowEnabled, strconv.FormatBool(
+		s.enabled()))
+	s.setLabel(LabelKeyScheduledWorkflowStatus, string(conditionType))
+
+	if workflow != nil {
+		s.updateLastTriggeredTime(scheduledEpoch)
+		s.Status.Trigger.LastIndex = Int64Pointer(s.nextIndex())
+		s.updateNextTriggeredTime(s.getNextScheduledEpoch())
+	} else {
+		// LastTriggeredTime is unchanged.
+ s.updateNextTriggeredTime(scheduledEpoch) + // LastIndex is unchanged + } +} + +func (s *ScheduledWorkflow) updateLastTriggeredTime(epoch int64) { + s.Status.Trigger.LastTriggeredTime = Metav1TimePointer( + metav1.NewTime(time.Unix(epoch, 0).UTC())) +} + +func (s *ScheduledWorkflow) updateNextTriggeredTime(epoch int64) { + if epoch != math.MaxInt64 { + s.Status.Trigger.NextTriggeredTime = Metav1TimePointer( + metav1.NewTime(time.Unix(epoch, 0).UTC())) + } else { + s.Status.Trigger.NextTriggeredTime = nil + } +} + +func (s *ScheduledWorkflow) getStatusAndMessage(activeCount int) ( + conditionType swfapi.ScheduledWorkflowConditionType, + status core.ConditionStatus, message string) { + // Schedule messages + const ( + ScheduleEnabledMessage = "The schedule is enabled." + ScheduleDisabledMessage = "The schedule is disabled." + ScheduleRunningMessage = "The one-off schedule is running." + ScheduleSucceededMessage = "The one-off schedule has succeeded." + ) + + if s.isOneOffRun() { + if s.hasRunAtLeastOnce() && activeCount == 0 { + return swfapi.ScheduledWorkflowSucceeded, core.ConditionTrue, ScheduleSucceededMessage + } else { + return swfapi.ScheduledWorkflowRunning, core.ConditionTrue, ScheduleRunningMessage + } + } else { + if s.enabled() { + return swfapi.ScheduledWorkflowEnabled, core.ConditionTrue, ScheduleEnabledMessage + } else { + return swfapi.ScheduledWorkflowDisabled, core.ConditionTrue, ScheduleDisabledMessage + } + } +} diff --git a/resources/scheduledworkflow/util/scheduled_workflow_test.go b/resources/scheduledworkflow/util/scheduled_workflow_test.go new file mode 100644 index 00000000000..6caf18191ef --- /dev/null +++ b/resources/scheduledworkflow/util/scheduled_workflow_test.go @@ -0,0 +1,602 @@ +// Copyright 2018 The Kubeflow Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package util + +import ( + workflowapi "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" + swfapi "github.com/kubeflow/pipelines/pkg/apis/scheduledworkflow/v1alpha1" + "github.com/stretchr/testify/assert" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/kubernetes/pkg/apis/core" + "math" + "strconv" + "testing" + "time" +) + +func TestScheduledWorkflow_maxConcurrency(t *testing.T) { + // nil + schedule := NewScheduledWorkflow(&swfapi.ScheduledWorkflow{}) + assert.Equal(t, int64(1), schedule.maxConcurrency()) + + // lower than min + schedule = NewScheduledWorkflow(&swfapi.ScheduledWorkflow{ + Spec: swfapi.ScheduledWorkflowSpec{ + MaxConcurrency: Int64Pointer(0), + }, + }) + assert.Equal(t, int64(1), schedule.maxConcurrency()) + + // higher than max + schedule = NewScheduledWorkflow(&swfapi.ScheduledWorkflow{ + Spec: swfapi.ScheduledWorkflowSpec{ + MaxConcurrency: Int64Pointer(2000000), + }, + }) + assert.Equal(t, int64(10), schedule.maxConcurrency()) +} + +func TestScheduledWorkflow_maxHistory(t *testing.T) { + // nil + schedule := NewScheduledWorkflow(&swfapi.ScheduledWorkflow{}) + assert.Equal(t, int64(10), schedule.maxHistory()) + + // lower than min + schedule = NewScheduledWorkflow(&swfapi.ScheduledWorkflow{ + Spec: swfapi.ScheduledWorkflowSpec{ + MaxHistory: Int64Pointer(0), + }, + }) + assert.Equal(t, int64(0), schedule.maxHistory()) + + // higher than max + schedule = NewScheduledWorkflow(&swfapi.ScheduledWorkflow{ + Spec: swfapi.ScheduledWorkflowSpec{ + MaxHistory: Int64Pointer(2000000), + }, + }) + assert.Equal(t, int64(100), schedule.maxHistory()) +} + +func TestScheduledWorkflow_hasRunAtLeastOnce(t *testing.T) { + // Never ran a workflow + schedule := NewScheduledWorkflow(&swfapi.ScheduledWorkflow{ + Status: swfapi.ScheduledWorkflowStatus{ + Trigger: swfapi.TriggerStatus{ + LastTriggeredTime: nil, + }, + }, + }) + assert.Equal(t, false, schedule.hasRunAtLeastOnce()) + + // Ran one workflow + schedule = NewScheduledWorkflow(&swfapi.ScheduledWorkflow{ + Status: swfapi.ScheduledWorkflowStatus{ + Trigger: swfapi.TriggerStatus{ + LastTriggeredTime: Metav1TimePointer(metav1.NewTime(time.Unix(50, 0).UTC())), + }, + }, + }) + assert.Equal(t, true, schedule.hasRunAtLeastOnce()) +} + +func TestScheduledWorkflow_lastIndex(t *testing.T) { + // Never ran a workflow + schedule := NewScheduledWorkflow(&swfapi.ScheduledWorkflow{}) + assert.Equal(t, int64(0), schedule.lastIndex()) + + // Ran one workflow + schedule = NewScheduledWorkflow(&swfapi.ScheduledWorkflow{ + Status: swfapi.ScheduledWorkflowStatus{ + Trigger: swfapi.TriggerStatus{ + LastIndex: Int64Pointer(50), + }, + }, + }) + assert.Equal(t, int64(50), schedule.lastIndex()) +} + +func TestScheduledWorkflow_nextIndex(t *testing.T) { + // Never ran a workflow + schedule := NewScheduledWorkflow(&swfapi.ScheduledWorkflow{}) + assert.Equal(t, int64(1), schedule.nextIndex()) + + // Ran one workflow + schedule = NewScheduledWorkflow(&swfapi.ScheduledWorkflow{ + Status: swfapi.ScheduledWorkflowStatus{ + Trigger: swfapi.TriggerStatus{ + LastIndex: Int64Pointer(50), + }, + }, + }) + assert.Equal(t, int64(51), schedule.nextIndex()) +} + +func TestScheduledWorkflow_MinIndex(t *testing.T) { + schedule := NewScheduledWorkflow(&swfapi.ScheduledWorkflow{ + Spec: swfapi.ScheduledWorkflowSpec{ + MaxHistory: Int64Pointer(100), + }, + Status: swfapi.ScheduledWorkflowStatus{ + Trigger: swfapi.TriggerStatus{ + LastIndex: Int64Pointer(50), + }, + }, + }) + assert.Equal(t, int64(0), schedule.MinIndex()) + + schedule = 
NewScheduledWorkflow(&swfapi.ScheduledWorkflow{ + Spec: swfapi.ScheduledWorkflowSpec{ + MaxHistory: Int64Pointer(20), + }, + Status: swfapi.ScheduledWorkflowStatus{ + Trigger: swfapi.TriggerStatus{ + LastIndex: Int64Pointer(50), + }, + }, + }) + assert.Equal(t, int64(30), schedule.MinIndex()) +} + +func TestScheduledWorkflow_isOneOffRun(t *testing.T) { + // No schedule + schedule := NewScheduledWorkflow(&swfapi.ScheduledWorkflow{}) + assert.Equal(t, true, schedule.isOneOffRun()) + + // Cron schedule + schedule = NewScheduledWorkflow(&swfapi.ScheduledWorkflow{ + Spec: swfapi.ScheduledWorkflowSpec{ + Trigger: swfapi.Trigger{ + CronSchedule: &swfapi.CronSchedule{}, + }, + }, + }) + assert.Equal(t, false, schedule.isOneOffRun()) + + // Periodic schedule + schedule = NewScheduledWorkflow(&swfapi.ScheduledWorkflow{ + Spec: swfapi.ScheduledWorkflowSpec{ + Trigger: swfapi.Trigger{ + PeriodicSchedule: &swfapi.PeriodicSchedule{}, + }, + }, + }) + assert.Equal(t, false, schedule.isOneOffRun()) +} + +func TestScheduledWorkflow_nextResourceID(t *testing.T) { + // No schedule + schedule := NewScheduledWorkflow(&swfapi.ScheduledWorkflow{ + ObjectMeta: metav1.ObjectMeta{ + Name: "WORKFLOW_NAME", + }, + Status: swfapi.ScheduledWorkflowStatus{ + Trigger: swfapi.TriggerStatus{ + LastIndex: Int64Pointer(50), + }, + }, + }) + assert.Equal(t, "WORKFLOW_NAME-51", schedule.nextResourceID()) +} + +func TestScheduledWorkflow_NextResourceName(t *testing.T) { + // No schedule + schedule := NewScheduledWorkflow(&swfapi.ScheduledWorkflow{ + ObjectMeta: metav1.ObjectMeta{ + Name: "WORKFLOW_NAME", + }, + Status: swfapi.ScheduledWorkflowStatus{ + Trigger: swfapi.TriggerStatus{ + LastIndex: Int64Pointer(50), + }, + }, + }) + assert.Equal(t, "WORKFLOW_NAME-51-2626342551", schedule.NextResourceName()) +} + +func TestScheduledWorkflow_GetNextScheduledEpoch_OneTimeRun(t *testing.T) { + + // Must run now + nowEpoch := int64(10 * hour) + pastEpoch := int64(1 * hour) + creationTimestamp := metav1.NewTime(time.Unix(9*hour, 0).UTC()) + lastTimeRun := metav1.NewTime(time.Unix(11*hour, 0).UTC()) + never := int64(math.MaxInt64) + + schedule := NewScheduledWorkflow(&swfapi.ScheduledWorkflow{ + ObjectMeta: metav1.ObjectMeta{ + CreationTimestamp: creationTimestamp, + }, + Spec: swfapi.ScheduledWorkflowSpec{ + Enabled: true, + }, + }) + nextScheduledEpoch, mustRunNow := schedule.GetNextScheduledEpoch( + int64(0) /* active workflow count */, nowEpoch) + assert.Equal(t, true, mustRunNow) + assert.Equal(t, creationTimestamp.Unix(), nextScheduledEpoch) + + // Has already run + schedule = NewScheduledWorkflow(&swfapi.ScheduledWorkflow{ + ObjectMeta: metav1.ObjectMeta{ + CreationTimestamp: creationTimestamp, + }, + Spec: swfapi.ScheduledWorkflowSpec{ + Enabled: true, + }, + Status: swfapi.ScheduledWorkflowStatus{ + Trigger: swfapi.TriggerStatus{ + LastTriggeredTime: &lastTimeRun, + }, + }, + }) + nextScheduledEpoch, mustRunNow = schedule.GetNextScheduledEpoch( + int64(0) /* active workflow count */, nowEpoch) + assert.Equal(t, false, mustRunNow) + assert.Equal(t, never, nextScheduledEpoch) + + // Should not run yet because it is not time + schedule = NewScheduledWorkflow(&swfapi.ScheduledWorkflow{ + ObjectMeta: metav1.ObjectMeta{ + CreationTimestamp: creationTimestamp, + }, + Spec: swfapi.ScheduledWorkflowSpec{ + Enabled: true, + }, + }) + nextScheduledEpoch, mustRunNow = schedule.GetNextScheduledEpoch( + int64(0) /* active workflow count */, pastEpoch) + assert.Equal(t, false, mustRunNow) + assert.Equal(t, creationTimestamp.Unix(), 
nextScheduledEpoch) + + // Should not run because the schedule is disabled + schedule = NewScheduledWorkflow(&swfapi.ScheduledWorkflow{ + ObjectMeta: metav1.ObjectMeta{ + CreationTimestamp: creationTimestamp, + }, + Spec: swfapi.ScheduledWorkflowSpec{ + Enabled: false, + }, + }) + nextScheduledEpoch, mustRunNow = schedule.GetNextScheduledEpoch( + int64(0) /* active workflow count */, nowEpoch) + assert.Equal(t, false, mustRunNow) + assert.Equal(t, creationTimestamp.Unix(), nextScheduledEpoch) + + // Should not run because there are active workflows + schedule = NewScheduledWorkflow(&swfapi.ScheduledWorkflow{ + ObjectMeta: metav1.ObjectMeta{ + CreationTimestamp: creationTimestamp, + }, + Spec: swfapi.ScheduledWorkflowSpec{ + Enabled: true, + }, + }) + nextScheduledEpoch, mustRunNow = schedule.GetNextScheduledEpoch( + int64(1) /* active workflow count */, nowEpoch) + assert.Equal(t, false, mustRunNow) + assert.Equal(t, creationTimestamp.Unix(), nextScheduledEpoch) +} + +func TestScheduledWorkflow_GetNextScheduledEpoch_CronSchedule(t *testing.T) { + + // Must run now + nowEpoch := int64(10 * hour) + pastEpoch := int64(3 * hour) + creationTimestamp := metav1.NewTime(time.Unix(9*hour, 0).UTC()) + + schedule := NewScheduledWorkflow(&swfapi.ScheduledWorkflow{ + ObjectMeta: metav1.ObjectMeta{ + CreationTimestamp: creationTimestamp, + }, + Spec: swfapi.ScheduledWorkflowSpec{ + Enabled: true, + MaxConcurrency: Int64Pointer(int64(10)), + Trigger: swfapi.Trigger{ + CronSchedule: &swfapi.CronSchedule{ + Cron: "0 * * * * *", + }, + }, + }, + }) + nextScheduledEpoch, mustRunNow := schedule.GetNextScheduledEpoch( + int64(9) /* active workflow count */, nowEpoch) + assert.Equal(t, true, mustRunNow) + assert.Equal(t, int64(9*hour+minute), nextScheduledEpoch) + + // Must run later + nextScheduledEpoch, mustRunNow = schedule.GetNextScheduledEpoch( + int64(9) /* active workflow count */, pastEpoch) + assert.Equal(t, false, mustRunNow) + assert.Equal(t, int64(9*hour+minute), nextScheduledEpoch) + + // Cannot run because of concurrency + nextScheduledEpoch, mustRunNow = schedule.GetNextScheduledEpoch( + int64(10) /* active workflow count */, nowEpoch) + assert.Equal(t, false, mustRunNow) + assert.Equal(t, int64(9*hour+minute), nextScheduledEpoch) +} + +func TestScheduledWorkflow_GetNextScheduledEpoch_PeriodicSchedule(t *testing.T) { + + // Must run now + nowEpoch := int64(10 * hour) + pastEpoch := int64(3 * hour) + creationTimestamp := metav1.NewTime(time.Unix(9*hour, 0).UTC()) + + schedule := NewScheduledWorkflow(&swfapi.ScheduledWorkflow{ + ObjectMeta: metav1.ObjectMeta{ + CreationTimestamp: creationTimestamp, + }, + Spec: swfapi.ScheduledWorkflowSpec{ + Enabled: true, + MaxConcurrency: Int64Pointer(int64(10)), + Trigger: swfapi.Trigger{ + PeriodicSchedule: &swfapi.PeriodicSchedule{ + IntervalSecond: int64(60), + }, + }, + }, + }) + nextScheduledEpoch, mustRunNow := schedule.GetNextScheduledEpoch( + int64(9) /* active workflow count */, nowEpoch) + assert.Equal(t, true, mustRunNow) + assert.Equal(t, int64(9*hour+minute), nextScheduledEpoch) + + // Must run later + nextScheduledEpoch, mustRunNow = schedule.GetNextScheduledEpoch( + int64(9) /* active workflow count */, pastEpoch) + assert.Equal(t, false, mustRunNow) + assert.Equal(t, int64(9*hour+minute), nextScheduledEpoch) + + // Cannot run because of concurrency + nextScheduledEpoch, mustRunNow = schedule.GetNextScheduledEpoch( + int64(10) /* active workflow count */, nowEpoch) + assert.Equal(t, false, mustRunNow) + assert.Equal(t, int64(9*hour+minute), 
nextScheduledEpoch) + +} + +func TestScheduledWorkflow_GetNextScheduledEpoch_UpdateStatus_NoWorkflow(t *testing.T) { + // Must run now + scheduledEpoch := int64(10 * hour) + updatedEpoch := int64(11 * hour) + creationTimestamp := metav1.NewTime(time.Unix(9*hour, 0).UTC()) + + schedule := NewScheduledWorkflow(&swfapi.ScheduledWorkflow{ + ObjectMeta: metav1.ObjectMeta{ + CreationTimestamp: creationTimestamp, + }, + Spec: swfapi.ScheduledWorkflowSpec{ + Enabled: true, + MaxConcurrency: Int64Pointer(int64(10)), + Trigger: swfapi.Trigger{ + PeriodicSchedule: &swfapi.PeriodicSchedule{ + IntervalSecond: int64(60), + }, + }, + }, + }) + + status1 := createStatus("WORKFLOW1", 5) + status2 := createStatus("WORKFLOW2", 3) + status3 := createStatus("WORKFLOW3", 7) + status4 := createStatus("WORKFLOW4", 4) + + schedule.UpdateStatus( + updatedEpoch, + nil, /* no workflow created during this run */ + scheduledEpoch, + []swfapi.WorkflowStatus{*status1, *status2, *status3}, + []swfapi.WorkflowStatus{*status1, *status2, *status3, *status4}) + + expected := &swfapi.ScheduledWorkflow{ + ObjectMeta: metav1.ObjectMeta{ + CreationTimestamp: creationTimestamp, + Labels: map[string]string{ + LabelKeyScheduledWorkflowEnabled: "true", + LabelKeyScheduledWorkflowStatus: string(swfapi.ScheduledWorkflowEnabled), + }, + }, + Spec: swfapi.ScheduledWorkflowSpec{ + Enabled: true, + MaxConcurrency: Int64Pointer(int64(10)), + Trigger: swfapi.Trigger{ + PeriodicSchedule: &swfapi.PeriodicSchedule{ + IntervalSecond: int64(60), + }, + }, + }, + Status: swfapi.ScheduledWorkflowStatus{ + Conditions: []swfapi.ScheduledWorkflowCondition{{ + Type: swfapi.ScheduledWorkflowEnabled, + Status: core.ConditionTrue, + LastProbeTime: metav1.NewTime(time.Unix(updatedEpoch, 0).UTC()), + LastTransitionTime: metav1.NewTime(time.Unix(updatedEpoch, 0).UTC()), + Reason: string(swfapi.ScheduledWorkflowEnabled), + Message: "The schedule is enabled.", + }, + }, + WorkflowHistory: &swfapi.WorkflowHistory{ + Active: []swfapi.WorkflowStatus{*status3, *status1, *status2}, + Completed: []swfapi.WorkflowStatus{*status3, *status1, *status4, *status2}, + }, + Trigger: swfapi.TriggerStatus{ + NextTriggeredTime: Metav1TimePointer( + metav1.NewTime(time.Unix(scheduledEpoch, 0).UTC())), + }, + }, + } + + assert.Equal(t, expected, schedule.Get()) +} + +func createStatus(workflowName string, scheduledEpoch int64) *swfapi.WorkflowStatus { + return &swfapi.WorkflowStatus{ + Name: workflowName, + ScheduledAt: metav1.NewTime(time.Unix(scheduledEpoch, 0).UTC()), + } +} + +func TestScheduledWorkflow_GetNextScheduledEpoch_UpdateStatus_WithWorkflow(t *testing.T) { + // Must run now + scheduledEpoch := int64(10 * hour) + updatedEpoch := int64(11 * hour) + creationTimestamp := metav1.NewTime(time.Unix(9*hour, 0).UTC()) + + schedule := NewScheduledWorkflow(&swfapi.ScheduledWorkflow{ + ObjectMeta: metav1.ObjectMeta{ + CreationTimestamp: creationTimestamp, + }, + Spec: swfapi.ScheduledWorkflowSpec{ + Enabled: true, + MaxConcurrency: Int64Pointer(int64(10)), + Trigger: swfapi.Trigger{ + PeriodicSchedule: &swfapi.PeriodicSchedule{ + IntervalSecond: int64(60), + }, + }, + }, + }) + + status1 := createStatus("WORKFLOW1", 5) + status2 := createStatus("WORKFLOW2", 3) + status3 := createStatus("WORKFLOW3", 7) + status4 := createStatus("WORKFLOW4", 4) + + workflow := NewWorkflow(&workflowapi.Workflow{}) + + schedule.UpdateStatus( + updatedEpoch, + workflow, /* a workflow was created during this run */ + scheduledEpoch, + []swfapi.WorkflowStatus{*status1, *status2, *status3}, +
[]swfapi.WorkflowStatus{*status1, *status2, *status3, *status4}) + + expected := &swfapi.ScheduledWorkflow{ + ObjectMeta: metav1.ObjectMeta{ + CreationTimestamp: creationTimestamp, + Labels: map[string]string{ + LabelKeyScheduledWorkflowEnabled: "true", + LabelKeyScheduledWorkflowStatus: string(swfapi.ScheduledWorkflowEnabled), + }, + }, + Spec: swfapi.ScheduledWorkflowSpec{ + Enabled: true, + MaxConcurrency: Int64Pointer(int64(10)), + Trigger: swfapi.Trigger{ + PeriodicSchedule: &swfapi.PeriodicSchedule{ + IntervalSecond: int64(60), + }, + }, + }, + Status: swfapi.ScheduledWorkflowStatus{ + Conditions: []swfapi.ScheduledWorkflowCondition{{ + Type: swfapi.ScheduledWorkflowEnabled, + Status: core.ConditionTrue, + LastProbeTime: metav1.NewTime(time.Unix(updatedEpoch, 0).UTC()), + LastTransitionTime: metav1.NewTime(time.Unix(updatedEpoch, 0).UTC()), + Reason: string(swfapi.ScheduledWorkflowEnabled), + Message: "The schedule is enabled.", + }}, + WorkflowHistory: &swfapi.WorkflowHistory{ + Active: []swfapi.WorkflowStatus{*status3, *status1, *status2}, + Completed: []swfapi.WorkflowStatus{*status3, *status1, *status4, *status2}, + }, + Trigger: swfapi.TriggerStatus{ + LastTriggeredTime: Metav1TimePointer( + metav1.NewTime(time.Unix(scheduledEpoch, 0).UTC())), + NextTriggeredTime: Metav1TimePointer( + metav1.NewTime(time.Unix(scheduledEpoch+minute, 0).UTC())), + LastIndex: Int64Pointer(int64(1)), + }, + }, + } + + assert.Equal(t, expected, schedule.Get()) +} + +func TestScheduledWorkflow_NewWorkflow(t *testing.T) { + // Must run now + scheduledEpoch := int64(10 * hour) + nowEpoch := int64(11 * hour) + creationTimestamp := metav1.NewTime(time.Unix(9*hour, 0).UTC()) + + schedule := NewScheduledWorkflow(&swfapi.ScheduledWorkflow{ + ObjectMeta: metav1.ObjectMeta{ + Name: "SCHEDULE1", + CreationTimestamp: creationTimestamp, + }, + Spec: swfapi.ScheduledWorkflowSpec{ + Enabled: true, + MaxConcurrency: Int64Pointer(int64(10)), + Trigger: swfapi.Trigger{ + PeriodicSchedule: &swfapi.PeriodicSchedule{ + IntervalSecond: int64(60), + }, + }, + Workflow: &swfapi.WorkflowResource{ + Parameters: []swfapi.Parameter{ + {Name: "PARAM1", Value: "NEW_VALUE1"}, + {Name: "PARAM3", Value: "NEW_VALUE3"}, + }, + Spec: workflowapi.WorkflowSpec{ + ServiceAccountName: "SERVICE_ACCOUNT", + Arguments: workflowapi.Arguments{ + Parameters: []workflowapi.Parameter{ + {Name: "PARAM1", Value: StringPointer("VALUE1")}, + {Name: "PARAM2", Value: StringPointer("VALUE2")}, + }, + }, + }, + }, + }, + }) + + result := schedule.NewWorkflow(scheduledEpoch, nowEpoch) + + expected := &workflowapi.Workflow{ + TypeMeta: metav1.TypeMeta{ + Kind: "Workflow", + APIVersion: "argoproj.io/v1alpha1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "SCHEDULE1-1-3321103997", + Labels: map[string]string{ + "scheduledworkflows.kubeflow.org/isOwnedByScheduledWorkflow": "true", + "scheduledworkflows.kubeflow.org/scheduledWorkflowName": "SCHEDULE1", + "scheduledworkflows.kubeflow.org/workflowEpoch": strconv.Itoa(int(scheduledEpoch)), + "scheduledworkflows.kubeflow.org/workflowIndex": "1"}, + OwnerReferences: []metav1.OwnerReference{{ + APIVersion: "kubeflow.org/v1alpha1", + Kind: "ScheduledWorkflow", + Name: "SCHEDULE1", + UID: "", + Controller: BooleanPointer(true), + BlockOwnerDeletion: BooleanPointer(true)}}, + }, + Spec: workflowapi.WorkflowSpec{ + ServiceAccountName: "SERVICE_ACCOUNT", + Arguments: workflowapi.Arguments{ + Parameters: []workflowapi.Parameter{ + {Name: "PARAM1", Value: StringPointer("NEW_VALUE1")}, + {Name: "PARAM2", Value: 
StringPointer("VALUE2")}, + }, + }, + }, + } + + assert.Equal(t, expected, result.Get()) +} diff --git a/resources/scheduledworkflow/util/time_interface.go b/resources/scheduledworkflow/util/time_interface.go new file mode 100644 index 00000000000..baa301b49b7 --- /dev/null +++ b/resources/scheduledworkflow/util/time_interface.go @@ -0,0 +1,75 @@ +// Copyright 2018 The Kubeflow Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import ( + "math" + "time" +) + +// TimeInterface is an interface for objects generating the current time. +type TimeInterface interface { + Now() time.Time +} + +// RealTime is an implementation of TimeInterface that generates the current time. +type RealTime struct { +} + +// NewRealTime creates an instance of RealTime. +func NewRealTime() TimeInterface { + return &RealTime{} +} + +// Now returns the current time. +func (r *RealTime) Now() time.Time { + return time.Now().UTC() +} + +// FakeTime is a fake implementation of TimeInterface for testing. +type FakeTime struct { + now time.Time +} + +// NewFakeTime creates an instance of FakeTime that will return a fixed time. +func NewFakeTime(now time.Time) TimeInterface { + return &FakeTime{ + now: now.UTC(), + } +} + +// NewFakeTimeForEpoch creates an instance of FakeTime that will return a fixed epoch. +func NewFakeTimeForEpoch() TimeInterface { + return &FakeTime{ + now: time.Unix(0, 0).UTC(), + } +} + +// Now returns the current (fake) time. +func (f *FakeTime) Now() time.Time { + f.now = time.Unix(f.now.Unix()+1, 0).UTC() + return f.now +} + +// FormatTimeForLogging formats an epoch for logging purposes. +func FormatTimeForLogging(epoch int64) string { + if epoch <= 0 { + return "INVALID TIME" + } else if epoch == math.MaxInt64 { + return "NEVER" + } else { + return time.Unix(epoch, 0).UTC().String() + } +} diff --git a/resources/scheduledworkflow/util/workflow.go b/resources/scheduledworkflow/util/workflow.go new file mode 100644 index 00000000000..c2bbb8f6efd --- /dev/null +++ b/resources/scheduledworkflow/util/workflow.go @@ -0,0 +1,90 @@ +// Copyright 2018 The Kubeflow Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package util + +import ( + workflowapi "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" + swfregister "github.com/kubeflow/pipelines/pkg/apis/scheduledworkflow" + swfapi "github.com/kubeflow/pipelines/pkg/apis/scheduledworkflow/v1alpha1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +// Workflow is a type to help manipulate Workflow objects. +type Workflow struct { + *workflowapi.Workflow +} + +// NewWorkflow creates a Workflow. +func NewWorkflow(workflow *workflowapi.Workflow) *Workflow { + return &Workflow{ + workflow, + } +} + +// Get converts this object to a workflowapi.Workflow. +func (w *Workflow) Get() *workflowapi.Workflow { + return w.Workflow +} + +// OverrideName sets the name of a Workflow. +func (w *Workflow) OverrideName(name string) { + w.GenerateName = "" + w.Name = name +} + +// OverrideParameters overrides some of the parameters of a Workflow. +func (w *Workflow) OverrideParameters(desiredMap map[string]string) { + desiredSlice := make([]workflowapi.Parameter, 0) + for _, currentParam := range w.Spec.Arguments.Parameters { + + var desiredValue *string = nil + if param, ok := desiredMap[currentParam.Name]; ok { + desiredValue = &param + } else { + desiredValue = currentParam.Value + } + desiredSlice = append(desiredSlice, workflowapi.Parameter{ + Name: currentParam.Name, + Value: desiredValue, + }) + } + + w.Spec.Arguments.Parameters = desiredSlice +} + +// SetCanonicalLabels sets the labels needed by the ScheduledWorkflow on the Workflow. +func (w *Workflow) SetCanonicalLabels(scheduleName string, + nextScheduledEpoch int64, index int64) { + if w.Labels == nil { + w.Labels = make(map[string]string) + } + w.Labels[LabelKeyWorkflowScheduledWorkflowName] = scheduleName + w.Labels[LabelKeyWorkflowEpoch] = formatInt64ForLabel( + nextScheduledEpoch) + w.Labels[LabelKeyWorkflowIndex] = formatInt64ForLabel(index) + w.Labels[LabelKeyWorkflowIsOwnedByScheduledWorkflow] = "true" +} + +// SetOwnerReferences sets owner references on a Workflow. +func (w *Workflow) SetOwnerReferences(schedule *swfapi.ScheduledWorkflow) { + w.OwnerReferences = []metav1.OwnerReference{ + *metav1.NewControllerRef(schedule, schema.GroupVersionKind{ + Group: swfapi.SchemeGroupVersion.Group, + Version: swfapi.SchemeGroupVersion.Version, + Kind: swfregister.Kind, + }), + } +} diff --git a/resources/scheduledworkflow/util/workflow_test.go b/resources/scheduledworkflow/util/workflow_test.go new file mode 100644 index 00000000000..0b5d64b728a --- /dev/null +++ b/resources/scheduledworkflow/util/workflow_test.go @@ -0,0 +1,140 @@ +// Copyright 2018 The Kubeflow Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
+ +package util + +import ( + workflowapi "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" + swfapi "github.com/kubeflow/pipelines/pkg/apis/scheduledworkflow/v1alpha1" + "github.com/stretchr/testify/assert" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "testing" +) + +func TestWorkflow_OverrideName(t *testing.T) { + workflow := NewWorkflow(&workflowapi.Workflow{ + ObjectMeta: metav1.ObjectMeta{ + Name: "WORKFLOW_NAME", + }, + }) + + workflow.OverrideName("NEW_WORKFLOW_NAME") + + expected := &workflowapi.Workflow{ + ObjectMeta: metav1.ObjectMeta{ + Name: "NEW_WORKFLOW_NAME", + }, + } + + assert.Equal(t, expected, workflow.Get()) +} + +func TestWorkflow_OverrideParameters(t *testing.T) { + workflow := NewWorkflow(&workflowapi.Workflow{ + ObjectMeta: metav1.ObjectMeta{ + Name: "WORKFLOW_NAME", + }, + Spec: workflowapi.WorkflowSpec{ + Arguments: workflowapi.Arguments{ + Parameters: []workflowapi.Parameter{ + {Name: "PARAM1", Value: StringPointer("VALUE1")}, + {Name: "PARAM2", Value: StringPointer("VALUE2")}, + {Name: "PARAM3", Value: StringPointer("VALUE3")}, + {Name: "PARAM4", Value: StringPointer("")}, + {Name: "PARAM5", Value: StringPointer("VALUE5")}, + }, + }, + }, + }) + + workflow.OverrideParameters(map[string]string{ + "PARAM1": "NEW_VALUE1", + "PARAM3": "NEW_VALUE3", + "PARAM4": "NEW_VALUE4", + "PARAM5": "", + "PARAM9": "NEW_VALUE9", + }) + + expected := &workflowapi.Workflow{ + ObjectMeta: metav1.ObjectMeta{ + Name: "WORKFLOW_NAME", + }, + Spec: workflowapi.WorkflowSpec{ + Arguments: workflowapi.Arguments{ + Parameters: []workflowapi.Parameter{ + {Name: "PARAM1", Value: StringPointer("NEW_VALUE1")}, + {Name: "PARAM2", Value: StringPointer("VALUE2")}, + {Name: "PARAM3", Value: StringPointer("NEW_VALUE3")}, + {Name: "PARAM4", Value: StringPointer("NEW_VALUE4")}, + {Name: "PARAM5", Value: StringPointer("")}, + }, + }, + }, + } + assert.Equal(t, expected, workflow.Get()) +} + +func TestWorkflow_SetCanonicalLabels(t *testing.T) { + workflow := NewWorkflow(&workflowapi.Workflow{ + ObjectMeta: metav1.ObjectMeta{ + Name: "WORKFLOW_NAME", + }, + }) + + const index = 50 + const nextScheduledEpoch = 100 + workflow.SetCanonicalLabels("SCHEDULED_WORKFLOW_NAME", nextScheduledEpoch, index) + + expected := &workflowapi.Workflow{ + ObjectMeta: metav1.ObjectMeta{ + Name: "WORKFLOW_NAME", + Labels: map[string]string{ + "scheduledworkflows.kubeflow.org/isOwnedByScheduledWorkflow": "true", + "scheduledworkflows.kubeflow.org/scheduledWorkflowName": "SCHEDULED_WORKFLOW_NAME", + "scheduledworkflows.kubeflow.org/workflowEpoch": "100", + "scheduledworkflows.kubeflow.org/workflowIndex": "50"}, + }, + } + + assert.Equal(t, expected, workflow.Get()) +} + +func TestWorkflow_SetOwnerReferences(t *testing.T) { + workflow := NewWorkflow(&workflowapi.Workflow{ + ObjectMeta: metav1.ObjectMeta{ + Name: "WORKFLOW_NAME", + }, + }) + + workflow.SetOwnerReferences(&swfapi.ScheduledWorkflow{ + ObjectMeta: metav1.ObjectMeta{ + Name: "SCHEDULE_NAME", + }, + }) + + expected := &workflowapi.Workflow{ + ObjectMeta: metav1.ObjectMeta{ + Name: "WORKFLOW_NAME", + OwnerReferences: []metav1.OwnerReference{{ + APIVersion: "kubeflow.org/v1alpha1", + Kind: "ScheduledWorkflow", + Name: "SCHEDULE_NAME", + Controller: BooleanPointer(true), + BlockOwnerDeletion: BooleanPointer(true), + }}, + }, + } + + assert.Equal(t, expected, workflow.Get()) +}