From 7b2bac59e07e9608c715b515d782eaa1057eb244 Mon Sep 17 00:00:00 2001 From: James Wu <54086668+james-jwu@users.noreply.github.com> Date: Thu, 28 Apr 2022 17:45:56 -0700 Subject: [PATCH] feat(conformance): Enable experiment API tests in KF mode. (#7596) * Conformance: Enable experiment API tests in KF mode. Added 3 flags: - isDebugMode is enables HTTP request/response logging - IsKubeflowMode enables the tests in full Kubeflow mode - resourceNamespace: the namespace/profile under which the test resources are created Added a new HTTP client that uses SA token volume projection auth. The test pods will be set up to project SA token. Plumbed everything through for experiment API tests. The other tests will be enabled in subsequent PRs. * Updated change to address comments. --- .../client/api_server/experiment_client.go | 37 +++-- .../common/client/api_server/job_client.go | 34 ++-- .../client/api_server/pipeline_client.go | 42 +++-- .../api_server/pipeline_upload_client.go | 23 ++- .../common/client/api_server/run_client.go | 36 +++-- backend/src/common/client/api_server/util.go | 34 +++- .../client/api_server/visualization_client.go | 6 +- .../initialization/initialization_test.go | 2 +- .../test/integration/experiment_api_test.go | 145 ++++++++++++++---- backend/test/integration/flags.go | 5 + backend/test/integration/job_api_test.go | 6 +- backend/test/integration/run_api_test.go | 4 +- backend/test/integration/upgrade_test.go | 12 +- backend/test/test_utils.go | 85 +++++++++- 14 files changed, 369 insertions(+), 102 deletions(-) diff --git a/backend/src/common/client/api_server/experiment_client.go b/backend/src/common/client/api_server/experiment_client.go index 7080922b0b9a..6f3f71ba353f 100644 --- a/backend/src/common/client/api_server/experiment_client.go +++ b/backend/src/common/client/api_server/experiment_client.go @@ -3,6 +3,7 @@ package api_server import ( "fmt" + "github.com/go-openapi/runtime" "github.com/go-openapi/strfmt" apiclient "github.com/kubeflow/pipelines/backend/api/go_http_client/experiment_client" params "github.com/kubeflow/pipelines/backend/api/go_http_client/experiment_client/experiment_service" @@ -23,7 +24,8 @@ type ExperimentInterface interface { } type ExperimentClient struct { - apiClient *apiclient.Experiment + apiClient *apiclient.Experiment + authInfoWriter runtime.ClientAuthInfoWriter } func NewExperimentClient(clientConfig clientcmd.ClientConfig, debug bool) ( @@ -31,14 +33,29 @@ func NewExperimentClient(clientConfig clientcmd.ClientConfig, debug bool) ( runtime, err := NewHTTPRuntime(clientConfig, debug) if err != nil { - return nil, err + return nil, fmt.Errorf("Error occurred when creating experiment client: %w", err) } apiClient := apiclient.New(runtime, strfmt.Default) - // Creating upload client + // Creating experiment client return &ExperimentClient{ - apiClient: apiClient, + apiClient: apiClient, + authInfoWriter: PassThroughAuth, + }, nil +} + +func NewKubeflowInClusterExperimentClient(namespace string, debug bool) ( + *ExperimentClient, error) { + + runtime := NewKubeflowInClusterHTTPRuntime(namespace, debug) + + apiClient := apiclient.New(runtime, strfmt.Default) + + // Creating experiment client + return &ExperimentClient{ + apiClient: apiClient, + authInfoWriter: SATokenVolumeProjectionAuth, }, nil } @@ -50,7 +67,7 @@ func (c *ExperimentClient) Create(parameters *params.CreateExperimentParams) (*m // Make service call parameters.Context = ctx - response, err := c.apiClient.ExperimentService.CreateExperiment(parameters, PassThroughAuth) + response, err := c.apiClient.ExperimentService.CreateExperiment(parameters, c.authInfoWriter) if err != nil { if defaultError, ok := err.(*params.CreateExperimentDefault); ok { err = CreateErrorFromAPIStatus(defaultError.Payload.Error, defaultError.Payload.Code) @@ -74,7 +91,7 @@ func (c *ExperimentClient) Get(parameters *params.GetExperimentParams) (*model.A // Make service call parameters.Context = ctx - response, err := c.apiClient.ExperimentService.GetExperiment(parameters, PassThroughAuth) + response, err := c.apiClient.ExperimentService.GetExperiment(parameters, c.authInfoWriter) if err != nil { if defaultError, ok := err.(*params.GetExperimentDefault); ok { err = CreateErrorFromAPIStatus(defaultError.Payload.Error, defaultError.Payload.Code) @@ -98,7 +115,7 @@ func (c *ExperimentClient) List(parameters *params.ListExperimentParams) ( // Make service call parameters.Context = ctx - response, err := c.apiClient.ExperimentService.ListExperiment(parameters, PassThroughAuth) + response, err := c.apiClient.ExperimentService.ListExperiment(parameters, c.authInfoWriter) if err != nil { if defaultError, ok := err.(*params.ListExperimentDefault); ok { err = CreateErrorFromAPIStatus(defaultError.Payload.Error, defaultError.Payload.Code) @@ -121,7 +138,7 @@ func (c *ExperimentClient) Delete(parameters *params.DeleteExperimentParams) err // Make service call parameters.Context = ctx - _, err := c.apiClient.ExperimentService.DeleteExperiment(parameters, PassThroughAuth) + _, err := c.apiClient.ExperimentService.DeleteExperiment(parameters, c.authInfoWriter) if err != nil { if defaultError, ok := err.(*params.DeleteExperimentDefault); ok { err = CreateErrorFromAPIStatus(defaultError.Payload.Error, defaultError.Payload.Code) @@ -174,7 +191,7 @@ func (c *ExperimentClient) Archive(parameters *params.ArchiveExperimentParams) e // Make service call parameters.Context = ctx - _, err := c.apiClient.ExperimentService.ArchiveExperiment(parameters, PassThroughAuth) + _, err := c.apiClient.ExperimentService.ArchiveExperiment(parameters, c.authInfoWriter) if err != nil { if defaultError, ok := err.(*params.ArchiveExperimentDefault); ok { @@ -198,7 +215,7 @@ func (c *ExperimentClient) Unarchive(parameters *params.UnarchiveExperimentParam // Make service call parameters.Context = ctx - _, err := c.apiClient.ExperimentService.UnarchiveExperiment(parameters, PassThroughAuth) + _, err := c.apiClient.ExperimentService.UnarchiveExperiment(parameters, c.authInfoWriter) if err != nil { if defaultError, ok := err.(*params.UnarchiveExperimentDefault); ok { diff --git a/backend/src/common/client/api_server/job_client.go b/backend/src/common/client/api_server/job_client.go index af7ef52d32dc..6c08bc32abb1 100644 --- a/backend/src/common/client/api_server/job_client.go +++ b/backend/src/common/client/api_server/job_client.go @@ -3,6 +3,7 @@ package api_server import ( "fmt" + "github.com/go-openapi/runtime" "github.com/go-openapi/strfmt" apiclient "github.com/kubeflow/pipelines/backend/api/go_http_client/job_client" params "github.com/kubeflow/pipelines/backend/api/go_http_client/job_client/job_service" @@ -24,7 +25,8 @@ type JobInterface interface { } type JobClient struct { - apiClient *apiclient.Job + apiClient *apiclient.Job + authInfoWriter runtime.ClientAuthInfoWriter } func NewJobClient(clientConfig clientcmd.ClientConfig, debug bool) ( @@ -32,17 +34,31 @@ func NewJobClient(clientConfig clientcmd.ClientConfig, debug bool) ( runtime, err := NewHTTPRuntime(clientConfig, debug) if err != nil { - return nil, err + return nil, fmt.Errorf("Error occurred when creating job client: %w", err) } apiClient := apiclient.New(runtime, strfmt.Default) - // Creating upload client + // Creating job client return &JobClient{ apiClient: apiClient, }, nil } +func NewKubeflowInClusterJobClient(namespace string, debug bool) ( + *JobClient, error) { + + runtime := NewKubeflowInClusterHTTPRuntime(namespace, debug) + + apiClient := apiclient.New(runtime, strfmt.Default) + + // Creating job client + return &JobClient{ + apiClient: apiClient, + authInfoWriter: SATokenVolumeProjectionAuth, + }, nil +} + func (c *JobClient) Create(parameters *params.CreateJobParams) (*model.APIJob, error) { // Create context with timeout @@ -51,7 +67,7 @@ func (c *JobClient) Create(parameters *params.CreateJobParams) (*model.APIJob, // Make service call parameters.Context = ctx - response, err := c.apiClient.JobService.CreateJob(parameters, PassThroughAuth) + response, err := c.apiClient.JobService.CreateJob(parameters, c.authInfoWriter) if err != nil { if defaultError, ok := err.(*params.CreateJobDefault); ok { err = CreateErrorFromAPIStatus(defaultError.Payload.Error, defaultError.Payload.Code) @@ -75,7 +91,7 @@ func (c *JobClient) Get(parameters *params.GetJobParams) (*model.APIJob, // Make service call parameters.Context = ctx - response, err := c.apiClient.JobService.GetJob(parameters, PassThroughAuth) + response, err := c.apiClient.JobService.GetJob(parameters, c.authInfoWriter) if err != nil { if defaultError, ok := err.(*params.GetJobDefault); ok { err = CreateErrorFromAPIStatus(defaultError.Payload.Error, defaultError.Payload.Code) @@ -98,7 +114,7 @@ func (c *JobClient) Delete(parameters *params.DeleteJobParams) error { // Make service call parameters.Context = ctx - _, err := c.apiClient.JobService.DeleteJob(parameters, PassThroughAuth) + _, err := c.apiClient.JobService.DeleteJob(parameters, c.authInfoWriter) if err != nil { if defaultError, ok := err.(*params.DeleteJobDefault); ok { err = CreateErrorFromAPIStatus(defaultError.Payload.Error, defaultError.Payload.Code) @@ -121,7 +137,7 @@ func (c *JobClient) Enable(parameters *params.EnableJobParams) error { // Make service call parameters.Context = ctx - _, err := c.apiClient.JobService.EnableJob(parameters, PassThroughAuth) + _, err := c.apiClient.JobService.EnableJob(parameters, c.authInfoWriter) if err != nil { if defaultError, ok := err.(*params.EnableJobDefault); ok { err = CreateErrorFromAPIStatus(defaultError.Payload.Error, defaultError.Payload.Code) @@ -144,7 +160,7 @@ func (c *JobClient) Disable(parameters *params.DisableJobParams) error { // Make service call parameters.Context = ctx - _, err := c.apiClient.JobService.DisableJob(parameters, PassThroughAuth) + _, err := c.apiClient.JobService.DisableJob(parameters, c.authInfoWriter) if err != nil { if defaultError, ok := err.(*params.DisableJobDefault); ok { err = CreateErrorFromAPIStatus(defaultError.Payload.Error, defaultError.Payload.Code) @@ -168,7 +184,7 @@ func (c *JobClient) List(parameters *params.ListJobsParams) ( // Make service call parameters.Context = ctx - response, err := c.apiClient.JobService.ListJobs(parameters, PassThroughAuth) + response, err := c.apiClient.JobService.ListJobs(parameters, c.authInfoWriter) if err != nil { if defaultError, ok := err.(*params.ListJobsDefault); ok { err = CreateErrorFromAPIStatus(defaultError.Payload.Error, defaultError.Payload.Code) diff --git a/backend/src/common/client/api_server/pipeline_client.go b/backend/src/common/client/api_server/pipeline_client.go index 56e87301d0e2..af0699005d28 100644 --- a/backend/src/common/client/api_server/pipeline_client.go +++ b/backend/src/common/client/api_server/pipeline_client.go @@ -3,6 +3,7 @@ package api_server import ( "fmt" + "github.com/go-openapi/runtime" "github.com/go-openapi/strfmt" apiclient "github.com/kubeflow/pipelines/backend/api/go_http_client/pipeline_client" params "github.com/kubeflow/pipelines/backend/api/go_http_client/pipeline_client/pipeline_service" @@ -26,7 +27,8 @@ type PipelineInterface interface { } type PipelineClient struct { - apiClient *apiclient.Pipeline + apiClient *apiclient.Pipeline + authInfoWriter runtime.ClientAuthInfoWriter } func (c *PipelineClient) UpdateDefaultVersion(parameters *params.UpdatePipelineDefaultVersionParams) error { @@ -35,7 +37,7 @@ func (c *PipelineClient) UpdateDefaultVersion(parameters *params.UpdatePipelineD defer cancel() // Make service call parameters.Context = ctx - _, err := c.apiClient.PipelineService.UpdatePipelineDefaultVersion(parameters, PassThroughAuth) + _, err := c.apiClient.PipelineService.UpdatePipelineDefaultVersion(parameters, c.authInfoWriter) if err != nil { if defaultError, ok := err.(*params.GetPipelineDefault); ok { err = CreateErrorFromAPIStatus(defaultError.Payload.Error, defaultError.Payload.Code) @@ -56,17 +58,31 @@ func NewPipelineClient(clientConfig clientcmd.ClientConfig, debug bool) ( runtime, err := NewHTTPRuntime(clientConfig, debug) if err != nil { - return nil, err + return nil, fmt.Errorf("Error occurred when creating pipeline client: %w", err) } apiClient := apiclient.New(runtime, strfmt.Default) - // Creating upload client + // Creating pipeline client return &PipelineClient{ apiClient: apiClient, }, nil } +func NewKubeflowInClusterPipelineClient(namespace string, debug bool) ( + *PipelineClient, error) { + + runtime := NewKubeflowInClusterHTTPRuntime(namespace, debug) + + apiClient := apiclient.New(runtime, strfmt.Default) + + // Creating pipeline client + return &PipelineClient{ + apiClient: apiClient, + authInfoWriter: SATokenVolumeProjectionAuth, + }, nil +} + func (c *PipelineClient) Create(parameters *params.CreatePipelineParams) (*model.APIPipeline, error) { // Create context with timeout @@ -74,7 +90,7 @@ func (c *PipelineClient) Create(parameters *params.CreatePipelineParams) (*model defer cancel() parameters.Context = ctx - response, err := c.apiClient.PipelineService.CreatePipeline(parameters, PassThroughAuth) + response, err := c.apiClient.PipelineService.CreatePipeline(parameters, c.authInfoWriter) if err != nil { if defaultError, ok := err.(*params.CreatePipelineDefault); ok { err = CreateErrorFromAPIStatus(defaultError.Payload.Error, defaultError.Payload.Code) @@ -98,7 +114,7 @@ func (c *PipelineClient) Get(parameters *params.GetPipelineParams) (*model.APIPi // Make service call parameters.Context = ctx - response, err := c.apiClient.PipelineService.GetPipeline(parameters, PassThroughAuth) + response, err := c.apiClient.PipelineService.GetPipeline(parameters, c.authInfoWriter) if err != nil { if defaultError, ok := err.(*params.GetPipelineDefault); ok { err = CreateErrorFromAPIStatus(defaultError.Payload.Error, defaultError.Payload.Code) @@ -121,7 +137,7 @@ func (c *PipelineClient) Delete(parameters *params.DeletePipelineParams) error { // Make service call parameters.Context = ctx - _, err := c.apiClient.PipelineService.DeletePipeline(parameters, PassThroughAuth) + _, err := c.apiClient.PipelineService.DeletePipeline(parameters, c.authInfoWriter) if err != nil { if defaultError, ok := err.(*params.DeletePipelineDefault); ok { err = CreateErrorFromAPIStatus(defaultError.Payload.Error, defaultError.Payload.Code) @@ -144,7 +160,7 @@ func (c *PipelineClient) GetTemplate(parameters *params.GetTemplateParams) (temp // Make service call parameters.Context = ctx - response, err := c.apiClient.PipelineService.GetTemplate(parameters, PassThroughAuth) + response, err := c.apiClient.PipelineService.GetTemplate(parameters, c.authInfoWriter) if err != nil { if defaultError, ok := err.(*params.GetTemplateDefault); ok { err = CreateErrorFromAPIStatus(defaultError.Payload.Error, defaultError.Payload.Code) @@ -169,7 +185,7 @@ func (c *PipelineClient) List(parameters *params.ListPipelinesParams) ( // Make service call parameters.Context = ctx - response, err := c.apiClient.PipelineService.ListPipelines(parameters, PassThroughAuth) + response, err := c.apiClient.PipelineService.ListPipelines(parameters, c.authInfoWriter) if err != nil { if defaultError, ok := err.(*params.ListPipelinesDefault); ok { err = CreateErrorFromAPIStatus(defaultError.Payload.Error, defaultError.Payload.Code) @@ -222,7 +238,7 @@ func (c *PipelineClient) CreatePipelineVersion(parameters *params.CreatePipeline defer cancel() parameters.Context = ctx - response, err := c.apiClient.PipelineService.CreatePipelineVersion(parameters, PassThroughAuth) + response, err := c.apiClient.PipelineService.CreatePipelineVersion(parameters, c.authInfoWriter) if err != nil { if defaultError, ok := err.(*params.CreatePipelineVersionDefault); ok { err = CreateErrorFromAPIStatus(defaultError.Payload.Error, defaultError.Payload.Code) @@ -246,7 +262,7 @@ func (c *PipelineClient) ListPipelineVersions(parameters *params.ListPipelineVer // Make service call parameters.Context = ctx - response, err := c.apiClient.PipelineService.ListPipelineVersions(parameters, PassThroughAuth) + response, err := c.apiClient.PipelineService.ListPipelineVersions(parameters, c.authInfoWriter) if err != nil { if defaultError, ok := err.(*params.ListPipelineVersionsDefault); ok { err = CreateErrorFromAPIStatus(defaultError.Payload.Error, defaultError.Payload.Code) @@ -270,7 +286,7 @@ func (c *PipelineClient) GetPipelineVersion(parameters *params.GetPipelineVersio // Make service call parameters.Context = ctx - response, err := c.apiClient.PipelineService.GetPipelineVersion(parameters, PassThroughAuth) + response, err := c.apiClient.PipelineService.GetPipelineVersion(parameters, c.authInfoWriter) if err != nil { if defaultError, ok := err.(*params.GetPipelineVersionDefault); ok { err = CreateErrorFromAPIStatus(defaultError.Payload.Error, defaultError.Payload.Code) @@ -294,7 +310,7 @@ func (c *PipelineClient) GetPipelineVersionTemplate(parameters *params.GetPipeli // Make service call parameters.Context = ctx - response, err := c.apiClient.PipelineService.GetPipelineVersionTemplate(parameters, PassThroughAuth) + response, err := c.apiClient.PipelineService.GetPipelineVersionTemplate(parameters, c.authInfoWriter) if err != nil { if defaultError, ok := err.(*params.GetPipelineVersionTemplateDefault); ok { err = CreateErrorFromAPIStatus(defaultError.Payload.Error, defaultError.Payload.Code) diff --git a/backend/src/common/client/api_server/pipeline_upload_client.go b/backend/src/common/client/api_server/pipeline_upload_client.go index 476745b85776..166b083d8a7f 100644 --- a/backend/src/common/client/api_server/pipeline_upload_client.go +++ b/backend/src/common/client/api_server/pipeline_upload_client.go @@ -28,7 +28,8 @@ type PipelineUploadInterface interface { } type PipelineUploadClient struct { - apiClient *apiclient.PipelineUpload + apiClient *apiclient.PipelineUpload + authInfoWriter runtime.ClientAuthInfoWriter } func NewPipelineUploadClient(clientConfig clientcmd.ClientConfig, debug bool) ( @@ -36,7 +37,7 @@ func NewPipelineUploadClient(clientConfig clientcmd.ClientConfig, debug bool) ( runtime, err := NewHTTPRuntime(clientConfig, debug) if err != nil { - return nil, err + return nil, fmt.Errorf("Error occurred when creating pipeline upload client: %w", err) } apiClient := apiclient.New(runtime, strfmt.Default) @@ -47,6 +48,20 @@ func NewPipelineUploadClient(clientConfig clientcmd.ClientConfig, debug bool) ( }, nil } +func NewKubeflowInClusterPipelineUploadClient(namespace string, debug bool) ( + *PipelineUploadClient, error) { + + runtime := NewKubeflowInClusterHTTPRuntime(namespace, debug) + + apiClient := apiclient.New(runtime, strfmt.Default) + + // Creating upload client + return &PipelineUploadClient{ + apiClient: apiClient, + authInfoWriter: SATokenVolumeProjectionAuth, + }, nil +} + func (c *PipelineUploadClient) UploadFile(filePath string, parameters *params.UploadPipelineParams) ( *model.APIPipeline, error) { file, err := os.Open(filePath) @@ -68,7 +83,7 @@ func (c *PipelineUploadClient) Upload(parameters *params.UploadPipelineParams) ( // Make service call parameters.Context = ctx - response, err := c.apiClient.PipelineUploadService.UploadPipeline(parameters, PassThroughAuth) + response, err := c.apiClient.PipelineUploadService.UploadPipeline(parameters, c.authInfoWriter) if err != nil { if defaultError, ok := err.(*params.UploadPipelineDefault); ok { @@ -103,7 +118,7 @@ func (c *PipelineUploadClient) UploadPipelineVersion(filePath string, parameters // Make service call parameters.Context = ctx - response, err := c.apiClient.PipelineUploadService.UploadPipelineVersion(parameters, PassThroughAuth) + response, err := c.apiClient.PipelineUploadService.UploadPipelineVersion(parameters, c.authInfoWriter) if err != nil { if defaultError, ok := err.(*params.UploadPipelineVersionDefault); ok { diff --git a/backend/src/common/client/api_server/run_client.go b/backend/src/common/client/api_server/run_client.go index 43da417510b4..f6b8e0521d9d 100644 --- a/backend/src/common/client/api_server/run_client.go +++ b/backend/src/common/client/api_server/run_client.go @@ -5,6 +5,7 @@ import ( workflowapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" "github.com/ghodss/yaml" + "github.com/go-openapi/runtime" "github.com/go-openapi/strfmt" apiclient "github.com/kubeflow/pipelines/backend/api/go_http_client/run_client" params "github.com/kubeflow/pipelines/backend/api/go_http_client/run_client/run_service" @@ -25,7 +26,8 @@ type RunInterface interface { } type RunClient struct { - apiClient *apiclient.Run + apiClient *apiclient.Run + authInfoWriter runtime.ClientAuthInfoWriter } func NewRunClient(clientConfig clientcmd.ClientConfig, debug bool) ( @@ -33,17 +35,31 @@ func NewRunClient(clientConfig clientcmd.ClientConfig, debug bool) ( runtime, err := NewHTTPRuntime(clientConfig, debug) if err != nil { - return nil, err + return nil, fmt.Errorf("Error occurred when creating run client: %w", err) } apiClient := apiclient.New(runtime, strfmt.Default) - // Creating upload client + // Creating run client return &RunClient{ apiClient: apiClient, }, nil } +func NewKubeflowInClusterRunClient(namespace string, debug bool) ( + *RunClient, error) { + + runtime := NewKubeflowInClusterHTTPRuntime(namespace, debug) + + apiClient := apiclient.New(runtime, strfmt.Default) + + // Creating run client + return &RunClient{ + apiClient: apiClient, + authInfoWriter: SATokenVolumeProjectionAuth, + }, nil +} + func (c *RunClient) Create(parameters *params.CreateRunParams) (*model.APIRunDetail, *workflowapi.Workflow, error) { // Create context with timeout @@ -52,7 +68,7 @@ func (c *RunClient) Create(parameters *params.CreateRunParams) (*model.APIRunDet // Make service call parameters.Context = ctx - response, err := c.apiClient.RunService.CreateRun(parameters, PassThroughAuth) + response, err := c.apiClient.RunService.CreateRun(parameters, c.authInfoWriter) if err != nil { if defaultError, ok := err.(*params.GetRunDefault); ok { err = CreateErrorFromAPIStatus(defaultError.Payload.Error, defaultError.Payload.Code) @@ -86,7 +102,7 @@ func (c *RunClient) Get(parameters *params.GetRunParams) (*model.APIRunDetail, // Make service call parameters.Context = ctx - response, err := c.apiClient.RunService.GetRun(parameters, PassThroughAuth) + response, err := c.apiClient.RunService.GetRun(parameters, c.authInfoWriter) if err != nil { if defaultError, ok := err.(*params.GetRunDefault); ok { err = CreateErrorFromAPIStatus(defaultError.Payload.Error, defaultError.Payload.Code) @@ -119,7 +135,7 @@ func (c *RunClient) Archive(parameters *params.ArchiveRunParams) error { // Make service call parameters.Context = ctx - _, err := c.apiClient.RunService.ArchiveRun(parameters, PassThroughAuth) + _, err := c.apiClient.RunService.ArchiveRun(parameters, c.authInfoWriter) if err != nil { if defaultError, ok := err.(*params.ListRunsDefault); ok { @@ -143,7 +159,7 @@ func (c *RunClient) Unarchive(parameters *params.UnarchiveRunParams) error { // Make service call parameters.Context = ctx - _, err := c.apiClient.RunService.UnarchiveRun(parameters, PassThroughAuth) + _, err := c.apiClient.RunService.UnarchiveRun(parameters, c.authInfoWriter) if err != nil { if defaultError, ok := err.(*params.ListRunsDefault); ok { @@ -167,7 +183,7 @@ func (c *RunClient) Delete(parameters *params.DeleteRunParams) error { // Make service call parameters.Context = ctx - _, err := c.apiClient.RunService.DeleteRun(parameters, PassThroughAuth) + _, err := c.apiClient.RunService.DeleteRun(parameters, c.authInfoWriter) if err != nil { if defaultError, ok := err.(*params.ListRunsDefault); ok { @@ -192,7 +208,7 @@ func (c *RunClient) List(parameters *params.ListRunsParams) ( // Make service call parameters.Context = ctx - response, err := c.apiClient.RunService.ListRuns(parameters, PassThroughAuth) + response, err := c.apiClient.RunService.ListRuns(parameters, c.authInfoWriter) if err != nil { if defaultError, ok := err.(*params.ListRunsDefault); ok { @@ -245,7 +261,7 @@ func (c *RunClient) Terminate(parameters *params.TerminateRunParams) error { // Make service call parameters.Context = ctx - _, err := c.apiClient.RunService.TerminateRun(parameters, PassThroughAuth) + _, err := c.apiClient.RunService.TerminateRun(parameters, c.authInfoWriter) if err != nil { return util.NewUserError(err, fmt.Sprintf("Failed to terminate run. Params: %+v", parameters), diff --git a/backend/src/common/client/api_server/util.go b/backend/src/common/client/api_server/util.go index f5781a3462ba..92ad8c4ff64e 100644 --- a/backend/src/common/client/api_server/util.go +++ b/backend/src/common/client/api_server/util.go @@ -2,6 +2,9 @@ package api_server import ( "fmt" + "io/ioutil" + "net/http" + "os" "time" workflowapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" @@ -17,14 +20,33 @@ import ( ) const ( - apiServerBasePath = "/api/v1/namespaces/%s/services/ml-pipeline:8888/proxy/" - apiServerDefaultTimeout = 35 * time.Second + apiServerBasePath = "/api/v1/namespaces/%s/services/ml-pipeline:8888/proxy/" + apiServerKubeflowInClusterBasePath = "ml-pipeline.%s:8888" + apiServerDefaultTimeout = 35 * time.Second + saDefaultTokenPath = "/var/run/secrets/kubeflow/pipelines/token" + saTokenPathEnvVar = "KF_PIPELINES_SA_TOKEN_PATH" ) // PassThroughAuth never manipulates the request var PassThroughAuth runtime.ClientAuthInfoWriter = runtime.ClientAuthInfoWriterFunc( func(_ runtime.ClientRequest, _ strfmt.Registry) error { return nil }) +var SATokenVolumeProjectionAuth runtime.ClientAuthInfoWriter = runtime.ClientAuthInfoWriterFunc( + func(r runtime.ClientRequest, _ strfmt.Registry) error { + var projectedPath string + if projectedPath = os.Getenv(saTokenPathEnvVar); projectedPath == "" { + projectedPath = saDefaultTokenPath + } + + content, err := ioutil.ReadFile(projectedPath) + if err != nil { + return fmt.Errorf("Failed to read projected SA token at %s: %w", projectedPath, err) + } + + r.SetHeaderParam("Authorization", "Bearer "+string(content)) + return nil + }) + func toDateTimeTestOnly(timeInSec int64) strfmt.DateTime { result, err := strfmt.ParseDateTime(time.Unix(timeInSec, 0).String()) if err != nil { @@ -63,6 +85,14 @@ func NewHTTPRuntime(clientConfig clientcmd.ClientConfig, debug bool) ( return runtime, err } +func NewKubeflowInClusterHTTPRuntime(namespace string, debug bool) *httptransport.Runtime { + schemes := []string{"http"} + httpClient := http.Client{} + runtime := httptransport.NewWithClient(fmt.Sprintf(apiServerKubeflowInClusterBasePath, namespace), "/", schemes, &httpClient) + runtime.SetDebug(debug) + return runtime +} + func CreateErrorFromAPIStatus(error string, code int32) error { return fmt.Errorf("%v (code: %v)", error, code) } diff --git a/backend/src/common/client/api_server/visualization_client.go b/backend/src/common/client/api_server/visualization_client.go index b918807c95cd..9845210cf233 100644 --- a/backend/src/common/client/api_server/visualization_client.go +++ b/backend/src/common/client/api_server/visualization_client.go @@ -22,11 +22,11 @@ type VisualizationClient struct { } func NewVisualizationClient(clientConfig clientcmd.ClientConfig, debug bool) ( - *VisualizationClient, error) { + *VisualizationClient, error) { runtime, err := NewHTTPRuntime(clientConfig, debug) if err != nil { - return nil, err + return nil, fmt.Errorf("Error occurred when creating visualization client: %w", err) } apiClient := apiclient.New(runtime, strfmt.Default) @@ -38,7 +38,7 @@ func NewVisualizationClient(clientConfig clientcmd.ClientConfig, debug bool) ( } func (c *VisualizationClient) Create(parameters *params.CreateVisualizationParams) (*model.APIVisualization, - error) { + error) { // Create context with timeout ctx, cancel := context.WithTimeout(context.Background(), apiServerDefaultTimeout) defer cancel() diff --git a/backend/test/initialization/initialization_test.go b/backend/test/initialization/initialization_test.go index 9c3213bbb879..dd0b04213e36 100644 --- a/backend/test/initialization/initialization_test.go +++ b/backend/test/initialization/initialization_test.go @@ -47,7 +47,7 @@ func (s *InitializationTest) TestInitialization() { assert.Equal(t, "Default", experiments[0].Name) /* ---------- Clean up ---------- */ - test.DeleteAllExperiments(s.experimentClient, t) + test.DeleteAllExperiments(s.experimentClient, "", t) } func TestInitialization(t *testing.T) { diff --git a/backend/test/integration/experiment_api_test.go b/backend/test/integration/experiment_api_test.go index ae304623d1d3..4019c51cafef 100644 --- a/backend/test/integration/experiment_api_test.go +++ b/backend/test/integration/experiment_api_test.go @@ -2,7 +2,6 @@ package integration import ( "testing" - "time" "github.com/golang/glog" @@ -23,6 +22,7 @@ import ( type ExperimentApiTest struct { suite.Suite namespace string + resourceNamespace string experimentClient *api_server.ExperimentClient pipelineClient *api_server.PipelineClient pipelineUploadClient *api_server.PipelineUploadClient @@ -43,26 +43,71 @@ func (s *ExperimentApiTest) SetupTest() { glog.Exitf("Failed to initialize test. Error: %v", err) } } + s.namespace = *namespace - clientConfig := test.GetClientConfig(*namespace) + + var newExperimentClient func() (*api_server.ExperimentClient, error) + var newPipelineUploadClient func() (*api_server.PipelineUploadClient, error) + var newPipelineClient func() (*api_server.PipelineClient, error) + var newRunClient func() (*api_server.RunClient, error) + var newJobClient func() (*api_server.JobClient, error) + + if *isKubeflowMode { + s.resourceNamespace = *resourceNamespace + + newExperimentClient = func() (*api_server.ExperimentClient, error) { + return api_server.NewKubeflowInClusterExperimentClient(s.namespace, *isDebugMode) + } + newPipelineUploadClient = func() (*api_server.PipelineUploadClient, error) { + return api_server.NewKubeflowInClusterPipelineUploadClient(s.namespace, *isDebugMode) + } + newPipelineClient = func() (*api_server.PipelineClient, error) { + return api_server.NewKubeflowInClusterPipelineClient(s.namespace, *isDebugMode) + } + newRunClient = func() (*api_server.RunClient, error) { + return api_server.NewKubeflowInClusterRunClient(s.namespace, *isDebugMode) + } + newJobClient = func() (*api_server.JobClient, error) { + return api_server.NewKubeflowInClusterJobClient(s.namespace, *isDebugMode) + } + } else { + clientConfig := test.GetClientConfig(*namespace) + + newExperimentClient = func() (*api_server.ExperimentClient, error) { + return api_server.NewExperimentClient(clientConfig, *isDebugMode) + } + newPipelineUploadClient = func() (*api_server.PipelineUploadClient, error) { + return api_server.NewPipelineUploadClient(clientConfig, *isDebugMode) + } + newPipelineClient = func() (*api_server.PipelineClient, error) { + return api_server.NewPipelineClient(clientConfig, *isDebugMode) + } + newRunClient = func() (*api_server.RunClient, error) { + return api_server.NewRunClient(clientConfig, *isDebugMode) + } + newJobClient = func() (*api_server.JobClient, error) { + return api_server.NewJobClient(clientConfig, *isDebugMode) + } + } + var err error - s.experimentClient, err = api_server.NewExperimentClient(clientConfig, false) + s.experimentClient, err = newExperimentClient() if err != nil { glog.Exitf("Failed to get experiment client. Error: %v", err) } - s.pipelineUploadClient, err = api_server.NewPipelineUploadClient(clientConfig, false) + s.pipelineUploadClient, err = newPipelineUploadClient() if err != nil { glog.Exitf("Failed to get pipeline upload client. Error: %s", err.Error()) } - s.pipelineClient, err = api_server.NewPipelineClient(clientConfig, false) + s.pipelineClient, err = newPipelineClient() if err != nil { glog.Exitf("Failed to get pipeline client. Error: %s", err.Error()) } - s.runClient, err = api_server.NewRunClient(clientConfig, false) + s.runClient, err = newRunClient() if err != nil { glog.Exitf("Failed to get run client. Error: %s", err.Error()) } - s.jobClient, err = api_server.NewJobClient(clientConfig, false) + s.jobClient, err = newJobClient() if err != nil { glog.Exitf("Failed to get job client. Error: %s", err.Error()) } @@ -74,20 +119,21 @@ func (s *ExperimentApiTest) TestExperimentAPI() { t := s.T() /* ---------- Verify no experiment exist ---------- */ - experiments, totalSize, _, err := s.experimentClient.List(¶ms.ListExperimentParams{}) + experiments, totalSize, _, err := test.ListAllExperiment(s.experimentClient, s.resourceNamespace) assert.Nil(t, err) assert.Equal(t, 0, totalSize) assert.True(t, len(experiments) == 0) /* ---------- Create a new experiment ---------- */ - experiment := &experiment_model.APIExperiment{Name: "training", Description: "my first experiment"} + experiment := test.GetExperiment("training", "my first experiment", s.resourceNamespace) trainingExperiment, err := s.experimentClient.Create(¶ms.CreateExperimentParams{ Body: experiment, }) assert.Nil(t, err) - expectedTrainingExperiment := &experiment_model.APIExperiment{ - ID: trainingExperiment.ID, Name: experiment.Name, - Description: experiment.Description, CreatedAt: trainingExperiment.CreatedAt, StorageState: "STORAGESTATE_AVAILABLE"} + expectedTrainingExperiment := test.GetExperiment(experiment.Name, experiment.Description, s.resourceNamespace) + expectedTrainingExperiment.ID = trainingExperiment.ID + expectedTrainingExperiment.CreatedAt = trainingExperiment.CreatedAt + expectedTrainingExperiment.StorageState = "STORAGESTATE_AVAILABLE" assert.Equal(t, expectedTrainingExperiment, trainingExperiment) /* ---------- Create an experiment with same name. Should fail due to name uniqueness ---------- */ @@ -98,19 +144,19 @@ func (s *ExperimentApiTest) TestExperimentAPI() { /* ---------- Create a few more new experiment ---------- */ // 1 second interval. This ensures they can be sorted by create time in expected order. time.Sleep(1 * time.Second) - experiment = &experiment_model.APIExperiment{Name: "prediction", Description: "my second experiment"} + experiment = test.GetExperiment("prediction", "my second experiment", s.resourceNamespace) _, err = s.experimentClient.Create(¶ms.CreateExperimentParams{ Body: experiment, }) time.Sleep(1 * time.Second) - experiment = &experiment_model.APIExperiment{Name: "moonshot", Description: "my second experiment"} + experiment = test.GetExperiment("moonshot", "my second experiment", s.resourceNamespace) _, err = s.experimentClient.Create(¶ms.CreateExperimentParams{ Body: experiment, }) assert.Nil(t, err) /* ---------- Verify list experiments works ---------- */ - experiments, totalSize, nextPageToken, err := s.experimentClient.List(¶ms.ListExperimentParams{}) + experiments, totalSize, nextPageToken, err := test.ListAllExperiment(s.experimentClient, s.resourceNamespace) assert.Nil(t, err) assert.Equal(t, 3, totalSize) assert.Equal(t, 3, len(experiments)) @@ -122,8 +168,13 @@ func (s *ExperimentApiTest) TestExperimentAPI() { } /* ---------- Verify list experiments sorted by names ---------- */ - experiments, totalSize, nextPageToken, err = s.experimentClient.List(¶ms.ListExperimentParams{ - PageSize: util.Int32Pointer(2), SortBy: util.StringPointer("name")}) + experiments, totalSize, nextPageToken, err = test.ListExperiment( + s.experimentClient, + nil, + util.Int32Pointer(2), + nil, + util.StringPointer("name"), + s.resourceNamespace) assert.Nil(t, err) assert.Equal(t, 3, totalSize) assert.Equal(t, 2, len(experiments)) @@ -131,8 +182,13 @@ func (s *ExperimentApiTest) TestExperimentAPI() { assert.Equal(t, "prediction", experiments[1].Name) assert.NotEmpty(t, nextPageToken) - experiments, totalSize, nextPageToken, err = s.experimentClient.List(¶ms.ListExperimentParams{ - PageToken: util.StringPointer(nextPageToken), PageSize: util.Int32Pointer(2), SortBy: util.StringPointer("name")}) + experiments, totalSize, nextPageToken, err = test.ListExperiment( + s.experimentClient, + nil, + util.Int32Pointer(2), + util.StringPointer(nextPageToken), + util.StringPointer("name"), + s.resourceNamespace) assert.Nil(t, err) assert.Equal(t, 3, totalSize) assert.Equal(t, 1, len(experiments)) @@ -140,8 +196,13 @@ func (s *ExperimentApiTest) TestExperimentAPI() { assert.Empty(t, nextPageToken) /* ---------- Verify list experiments sorted by creation time ---------- */ - experiments, totalSize, nextPageToken, err = s.experimentClient.List(¶ms.ListExperimentParams{ - PageSize: util.Int32Pointer(2), SortBy: util.StringPointer("created_at")}) + experiments, totalSize, nextPageToken, err = test.ListExperiment( + s.experimentClient, + nil, + util.Int32Pointer(2), + nil, + util.StringPointer("created_at"), + s.resourceNamespace) assert.Nil(t, err) assert.Equal(t, 3, totalSize) assert.Equal(t, 2, len(experiments)) @@ -149,8 +210,13 @@ func (s *ExperimentApiTest) TestExperimentAPI() { assert.Equal(t, "prediction", experiments[1].Name) assert.NotEmpty(t, nextPageToken) - experiments, totalSize, nextPageToken, err = s.experimentClient.List(¶ms.ListExperimentParams{ - PageToken: util.StringPointer(nextPageToken), PageSize: util.Int32Pointer(2), SortBy: util.StringPointer("created_at")}) + experiments, totalSize, nextPageToken, err = test.ListExperiment( + s.experimentClient, + nil, + util.Int32Pointer(2), + util.StringPointer(nextPageToken), + util.StringPointer("created_at"), + s.resourceNamespace) assert.Nil(t, err) assert.Equal(t, 3, totalSize) assert.Equal(t, 1, len(experiments)) @@ -158,13 +224,23 @@ func (s *ExperimentApiTest) TestExperimentAPI() { assert.Empty(t, nextPageToken) /* ---------- List experiments sort by unsupported field. Should fail. ---------- */ - _, _, _, err = s.experimentClient.List(¶ms.ListExperimentParams{ - PageSize: util.Int32Pointer(2), SortBy: util.StringPointer("unknownfield")}) + _, _, _, err = test.ListExperiment( + s.experimentClient, + nil, + util.Int32Pointer(2), + nil, + util.StringPointer("unknownfield"), + s.resourceNamespace) assert.NotNil(t, err) /* ---------- List experiments sorted by names descend order ---------- */ - experiments, totalSize, nextPageToken, err = s.experimentClient.List(¶ms.ListExperimentParams{ - PageSize: util.Int32Pointer(2), SortBy: util.StringPointer("name desc")}) + experiments, totalSize, nextPageToken, err = test.ListExperiment( + s.experimentClient, + nil, + util.Int32Pointer(2), + nil, + util.StringPointer("name desc"), + s.resourceNamespace) assert.Nil(t, err) assert.Equal(t, 3, totalSize) assert.Equal(t, 2, len(experiments)) @@ -172,8 +248,13 @@ func (s *ExperimentApiTest) TestExperimentAPI() { assert.Equal(t, "prediction", experiments[1].Name) assert.NotEmpty(t, nextPageToken) - experiments, totalSize, nextPageToken, err = s.experimentClient.List(¶ms.ListExperimentParams{ - PageToken: util.StringPointer(nextPageToken), PageSize: util.Int32Pointer(2), SortBy: util.StringPointer("name desc")}) + experiments, totalSize, nextPageToken, err = test.ListExperiment( + s.experimentClient, + nil, + util.Int32Pointer(2), + util.StringPointer(nextPageToken), + util.StringPointer("name desc"), + s.resourceNamespace) assert.Nil(t, err) assert.Equal(t, 3, totalSize) assert.Equal(t, 1, len(experiments)) @@ -281,8 +362,8 @@ func (s *ExperimentApiTest) TearDownSuite() { } func (s *ExperimentApiTest) cleanUp() { - test.DeleteAllExperiments(s.experimentClient, s.T()) + test.DeleteAllExperiments(s.experimentClient, s.resourceNamespace, s.T()) test.DeleteAllPipelines(s.pipelineClient, s.T()) - test.DeleteAllRuns(s.runClient, s.T()) - test.DeleteAllJobs(s.jobClient, s.T()) + test.DeleteAllRuns(s.runClient, s.resourceNamespace, s.T()) + test.DeleteAllJobs(s.jobClient, s.resourceNamespace, s.T()) } diff --git a/backend/test/integration/flags.go b/backend/test/integration/flags.go index 6e688bf3ee78..f77519234ae7 100644 --- a/backend/test/integration/flags.go +++ b/backend/test/integration/flags.go @@ -16,3 +16,8 @@ var runUpgradeTests = flag.Bool("runUpgradeTests", false, "Whether to run upgrad * 2. One step that doesn't work locally is skipped. */ var isDevMode = flag.Bool("isDevMode", false, "Dev mode helps local development of integration tests") + +var isDebugMode = flag.Bool("isDebugMode", false, "Whether to enable debug mode. Debug mode will log more diagnostics messages.") + +var isKubeflowMode = flag.Bool("isKubeflowMode", false, "Runs tests in full Kubeflow mode") +var resourceNamespace = flag.String("resourceNamespace", "", "The namespace that will store the test resources in Kubeflow mode") diff --git a/backend/test/integration/job_api_test.go b/backend/test/integration/job_api_test.go index 982f6d7719e4..2e63a9fe50fe 100644 --- a/backend/test/integration/job_api_test.go +++ b/backend/test/integration/job_api_test.go @@ -587,10 +587,10 @@ func (s *JobApiTestSuite) TearDownSuite() { /** ======== the following are util functions ========= **/ func (s *JobApiTestSuite) cleanUp() { - test.DeleteAllExperiments(s.experimentClient, s.T()) + test.DeleteAllExperiments(s.experimentClient, "", s.T()) test.DeleteAllPipelines(s.pipelineClient, s.T()) - test.DeleteAllJobs(s.jobClient, s.T()) - test.DeleteAllRuns(s.runClient, s.T()) + test.DeleteAllJobs(s.jobClient, "", s.T()) + test.DeleteAllRuns(s.runClient, "", s.T()) } func defaultApiJob(pipelineVersionId, experimentId string) *job_model.APIJob { diff --git a/backend/test/integration/run_api_test.go b/backend/test/integration/run_api_test.go index 036ef1d4bec6..76292dde277a 100644 --- a/backend/test/integration/run_api_test.go +++ b/backend/test/integration/run_api_test.go @@ -361,7 +361,7 @@ func (s *RunApiTestSuite) TearDownSuite() { func (s *RunApiTestSuite) cleanUp() { /* ---------- Clean up ---------- */ - test.DeleteAllExperiments(s.experimentClient, s.T()) + test.DeleteAllExperiments(s.experimentClient, "", s.T()) test.DeleteAllPipelines(s.pipelineClient, s.T()) - test.DeleteAllRuns(s.runClient, s.T()) + test.DeleteAllRuns(s.runClient, "", s.T()) } diff --git a/backend/test/integration/upgrade_test.go b/backend/test/integration/upgrade_test.go index 7cf3d15acc8a..00b258301d6d 100644 --- a/backend/test/integration/upgrade_test.go +++ b/backend/test/integration/upgrade_test.go @@ -46,10 +46,10 @@ func TestUpgrade(t *testing.T) { func (s *UpgradeTests) TestPrepare() { t := s.T() - test.DeleteAllJobs(s.jobClient, t) - test.DeleteAllRuns(s.runClient, t) + test.DeleteAllJobs(s.jobClient, "", t) + test.DeleteAllRuns(s.runClient, "", t) test.DeleteAllPipelines(s.pipelineClient, t) - test.DeleteAllExperiments(s.experimentClient, t) + test.DeleteAllExperiments(s.experimentClient, "", t) s.PrepareExperiments() s.PreparePipelines() @@ -112,10 +112,10 @@ func (s *UpgradeTests) TearDownSuite() { // Clean up after the suite to unblock other tests. (Not needed for upgrade // tests because it needs changes in prepare tests to persist and verified // later.) - test.DeleteAllExperiments(s.experimentClient, t) + test.DeleteAllExperiments(s.experimentClient, "", t) test.DeleteAllPipelines(s.pipelineClient, t) - test.DeleteAllRuns(s.runClient, t) - test.DeleteAllJobs(s.jobClient, t) + test.DeleteAllRuns(s.runClient, "", t) + test.DeleteAllJobs(s.jobClient, "", t) } } } diff --git a/backend/test/test_utils.go b/backend/test/test_utils.go index 28f47a12c3bf..2b63328df8b0 100644 --- a/backend/test/test_utils.go +++ b/backend/test/test_utils.go @@ -24,12 +24,17 @@ import ( "net/http" "github.com/cenkalti/backoff" + api "github.com/kubeflow/pipelines/backend/api/go_client" experimentparams "github.com/kubeflow/pipelines/backend/api/go_http_client/experiment_client/experiment_service" + "github.com/kubeflow/pipelines/backend/api/go_http_client/experiment_model" jobparams "github.com/kubeflow/pipelines/backend/api/go_http_client/job_client/job_service" + "github.com/kubeflow/pipelines/backend/api/go_http_client/job_model" pipelineparams "github.com/kubeflow/pipelines/backend/api/go_http_client/pipeline_client/pipeline_service" + "github.com/kubeflow/pipelines/backend/api/go_http_client/pipeline_model" runparams "github.com/kubeflow/pipelines/backend/api/go_http_client/run_client/run_service" "github.com/kubeflow/pipelines/backend/api/go_http_client/run_model" "github.com/kubeflow/pipelines/backend/src/common/client/api_server" + "github.com/kubeflow/pipelines/backend/src/common/util" "github.com/pkg/errors" "github.com/stretchr/testify/assert" "k8s.io/client-go/tools/clientcmd" @@ -67,31 +72,31 @@ func GetClientConfig(namespace string) clientcmd.ClientConfig { } func DeleteAllPipelines(client *api_server.PipelineClient, t *testing.T) { - pipelines, _, _, err := client.List(&pipelineparams.ListPipelinesParams{}) + pipelines, _, _, err := ListPipelines(client) assert.Nil(t, err) for _, p := range pipelines { assert.Nil(t, client.Delete(&pipelineparams.DeletePipelineParams{ID: p.ID})) } } -func DeleteAllExperiments(client *api_server.ExperimentClient, t *testing.T) { - experiments, _, _, err := client.List(&experimentparams.ListExperimentParams{}) +func DeleteAllExperiments(client *api_server.ExperimentClient, namespace string, t *testing.T) { + experiments, _, _, err := ListAllExperiment(client, namespace) assert.Nil(t, err) for _, e := range experiments { assert.Nil(t, client.Delete(&experimentparams.DeleteExperimentParams{ID: e.ID})) } } -func DeleteAllRuns(client *api_server.RunClient, t *testing.T) { - runs, _, _, err := client.List(&runparams.ListRunsParams{}) +func DeleteAllRuns(client *api_server.RunClient, namespace string, t *testing.T) { + runs, _, _, err := ListRuns(client, namespace) assert.Nil(t, err) for _, r := range runs { assert.Nil(t, client.Delete(&runparams.DeleteRunParams{ID: r.ID})) } } -func DeleteAllJobs(client *api_server.JobClient, t *testing.T) { - jobs, _, _, err := client.List(&jobparams.ListJobsParams{}) +func DeleteAllJobs(client *api_server.JobClient, namespace string, t *testing.T) { + jobs, _, _, err := ListJobs(client, namespace) assert.Nil(t, err) for _, j := range jobs { assert.Nil(t, client.Delete(&jobparams.DeleteJobParams{ID: j.ID})) @@ -108,3 +113,69 @@ func GetExperimentIDFromAPIResourceReferences(resourceRefs []*run_model.APIResou } return experimentID } + +func ListPipelines(client *api_server.PipelineClient) ( + []*pipeline_model.APIPipeline, int, string, error) { + parameters := &pipelineparams.ListPipelinesParams{} + + return client.List(parameters) +} + +func ListAllExperiment(client *api_server.ExperimentClient, namespace string) ([]*experiment_model.APIExperiment, int, string, error) { + return ListExperiment(client, nil, nil, nil, nil, namespace) +} + +func ListExperiment(client *api_server.ExperimentClient, filter *string, pageSize *int32, pageToken *string, sortBy *string, namespace string) ([]*experiment_model.APIExperiment, int, string, error) { + parameters := &experimentparams.ListExperimentParams{ + Filter: filter, + PageSize: pageSize, + PageToken: pageToken, + SortBy: sortBy, + } + if namespace != "" { + parameters.SetResourceReferenceKeyType(util.StringPointer(api.ResourceType_name[int32(api.ResourceType_NAMESPACE)])) + parameters.SetResourceReferenceKeyID(&namespace) + } + + return client.List(parameters) +} + +func ListRuns(client *api_server.RunClient, namespace string) ([]*run_model.APIRun, int, string, error) { + parameters := &runparams.ListRunsParams{} + if namespace != "" { + parameters.SetResourceReferenceKeyType(util.StringPointer(api.ResourceType_name[int32(api.ResourceType_NAMESPACE)])) + parameters.SetResourceReferenceKeyID(&namespace) + } + + return client.List(parameters) +} + +func ListJobs(client *api_server.JobClient, namespace string) ([]*job_model.APIJob, int, string, error) { + parameters := &jobparams.ListJobsParams{} + if namespace != "" { + parameters.SetResourceReferenceKeyType(util.StringPointer(api.ResourceType_name[int32(api.ResourceType_NAMESPACE)])) + parameters.SetResourceReferenceKeyID(&namespace) + } + + return client.List(parameters) +} + +func GetExperiment(name string, description string, namespace string) *experiment_model.APIExperiment { + experiment := &experiment_model.APIExperiment{ + Name: name, + Description: description} + + if namespace != "" { + experiment.ResourceReferences = []*experiment_model.APIResourceReference{ + { + Key: &experiment_model.APIResourceKey{ + Type: experiment_model.APIResourceTypeNAMESPACE, + ID: namespace, + }, + Relationship: experiment_model.APIRelationshipOWNER, + }, + } + } + + return experiment +}