Skip to content

Commit

Permalink
add support for uploading new sample pipeline vers
Browse files Browse the repository at this point in the history
This change adds additional sampleconfig options that provide support
for uploading new Pipelines and PipelineVersion samples. To accommodate
this and backwards compatibility the structure of the samples config has
been changed. Configs following the old format will continue to be
supported.

Sample config code is also moved to its own file so as not to bloat
main.go

In order to handle conflicts, and detecting pipeline/pipelineVersion
existence, additional db queries are made per pipeline and pipeline
version at apiserver startup.

Signed-off-by: Humair Khan <[email protected]>
  • Loading branch information
HumairAK committed Jan 24, 2025
1 parent 8fe2157 commit 30108ac
Show file tree
Hide file tree
Showing 7 changed files with 488 additions and 125 deletions.
200 changes: 200 additions & 0 deletions backend/src/apiserver/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
// Copyright 2025 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package config

import (
"encoding/json"
"fmt"
"github.com/golang/glog"
"github.com/kubeflow/pipelines/backend/src/apiserver/client"
"github.com/kubeflow/pipelines/backend/src/apiserver/common"
"github.com/kubeflow/pipelines/backend/src/apiserver/model"
"github.com/kubeflow/pipelines/backend/src/apiserver/resource"
"github.com/kubeflow/pipelines/backend/src/apiserver/server"
"github.com/kubeflow/pipelines/backend/src/common/util"
"github.com/pkg/errors"
"google.golang.org/grpc/codes"
"os"
"time"
)

// deprecated
type deprecatedConfig struct {
Name string
Description string
File string
}

type configPipelines struct {
Name string
Description string
File string
// optional, Name is used for PipelineVersion if not provided
VersionName string
// optional, Description is used for PipelineVersion if not provided
VersionDescription string
}

type config struct {
// If pipeline version already exists and
// LoadSamplesOnRestart is enabled, then the pipeline
// version is uploaded again on server restart
// if it does not already exist
LoadSamplesOnRestart bool
Pipelines []configPipelines
}

// LoadSamples preloads a collection of pipeline samples
//
// If LoadSamplesOnRestart is false then Samples are only
// loaded once when the pipeline system is initially installed.
// They won't be loaded on upgrade or pod restart, to
// prevent them from reappearing if user explicitly deletes the
// samples. If LoadSamplesOnRestart is true then PipelineVersions
// are uploaded if they do not already exist upon upgrade or pod
// restart.
func LoadSamples(resourceManager *resource.ResourceManager, sampleConfigPath string) error {
pathExists, err := client.PathExists(sampleConfigPath)
if err != nil {
return err
}

if !pathExists {
glog.Infof("No samples path provided, skipping loading samples..")
return nil
}

configBytes, err := os.ReadFile(sampleConfigPath)
if err != nil {
return fmt.Errorf("failed to read sample configurations file. Err: %v", err)
}

// Check if sample has being loaded already and skip loading if true.
haveSamplesLoaded, err := resourceManager.HaveSamplesLoaded()
if err != nil {
return err
}

var pipelineConfig config
if configErr := json.Unmarshal(configBytes, &pipelineConfig); configErr != nil {
// Attempt to parse to deprecated config version:
var deprecatedCfg []deprecatedConfig
if depConfigErr := json.Unmarshal(configBytes, &deprecatedCfg); depConfigErr != nil {
return fmt.Errorf("failed to read sample configurations. Err: %v", configErr)
}
glog.Warningf("encountered deprecated version of samples config, please update to the newer version to " +
"ensure future compatibility")
for _, cfg := range deprecatedCfg {
pipelineConfig.Pipelines = append(pipelineConfig.Pipelines, configPipelines{
Name: cfg.Name,
File: cfg.File,
Description: cfg.Description,
})
}
pipelineConfig.LoadSamplesOnRestart = false
}

if !pipelineConfig.LoadSamplesOnRestart && haveSamplesLoaded {
glog.Infof("Samples already loaded in the past. Skip loading.")
return nil
}

for _, cfg := range pipelineConfig.Pipelines {
reader, configErr := os.Open(cfg.File)
if configErr != nil {
return fmt.Errorf("failed to load sample %s. Error: %v", cfg.Name, configErr)
}
pipelineFile, configErr := server.ReadPipelineFile(cfg.File, reader, common.MaxFileLength)
if configErr != nil {
return fmt.Errorf("failed to decompress the file %s. Error: %v", cfg.Name, configErr)
}

// Create pipeline if it does not already exist
p, fetchErr := resourceManager.GetPipelineByNameAndNamespace(cfg.Name, "")
if fetchErr != nil {
var userErr *util.UserError
if errors.As(fetchErr, &userErr) && userErr.ExternalStatusCode() == codes.NotFound {
p, configErr = resourceManager.CreatePipeline(&model.Pipeline{
Name: cfg.Name,
Description: cfg.Description,
})
if configErr != nil {
// Log the error but not fail. The API Server pod can restart and it could potentially cause
// name collision. In the future, we might consider loading samples during deployment, instead
// of when API server starts.
glog.Warningf(fmt.Sprintf(
"Failed to create pipeline for %s. Error: %v", cfg.Name, configErr))
continue
}
} else {
return fmt.Errorf(
"Failed to handle load sample for Pipeline: %s. Error: %v", cfg.Name, fetchErr)
}
}

// Use Pipeline Version Name/Description if provided
// Otherwise fallback to owning Pipeline's Name/Description
pvDescription := cfg.Description
if cfg.VersionDescription != "" {
pvDescription = cfg.VersionDescription
}
pvName := cfg.Name
if cfg.VersionName != "" {
pvName = cfg.VersionName
}

// If the Pipeline Version exists, do nothing
// Otherwise upload new Pipeline Version for
// this pipeline.
_, fetchErr = resourceManager.GetPipelineVersionByName(pvName)
if fetchErr != nil {
var userErr *util.UserError
if errors.As(fetchErr, &userErr) && userErr.ExternalStatusCode() == codes.NotFound {
_, configErr = resourceManager.CreatePipelineVersion(
&model.PipelineVersion{
Name: pvName,
Description: pvDescription,
PipelineId: p.UUID,
PipelineSpec: string(pipelineFile),
},
)
if configErr != nil {
// Log the error but not fail. The API Server pod can restart and it could potentially cause name collision.
// In the future, we might consider loading samples during deployment, instead of when API server starts.
glog.Warningf(fmt.Sprintf("Failed to create pipeline for %s. Error: %v", pvName, configErr))

continue
}
// Since the default sorting is by create time,
// Sleep one second makes sure the samples are
// showing up in the same order as they are added.
time.Sleep(1 * time.Second)
} else {
return fmt.Errorf(
"Failed to handle load sample for PipelineVersion: %s. Error: %v", pvName, fetchErr)
}
} else {
// pipeline version already exists, do nothing
continue
}
}

err = resourceManager.MarkSampleLoaded()
if err != nil {
return err
}
glog.Info("All samples are loaded.")
return nil
}
186 changes: 186 additions & 0 deletions backend/src/apiserver/config/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
// Copyright 2025 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package config

import (
"encoding/json"
"github.com/kubeflow/pipelines/backend/src/apiserver/list"
"github.com/kubeflow/pipelines/backend/src/apiserver/model"
"github.com/kubeflow/pipelines/backend/src/apiserver/resource"
"github.com/kubeflow/pipelines/backend/src/common/util"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"os"
"path/filepath"
"testing"
)

func fakeResourceManager() *resource.ResourceManager {
clientManager := resource.NewFakeClientManagerOrFatalV2()
resourceManager := resource.NewResourceManager(clientManager, &resource.ResourceManagerOptions{CollectMetrics: false})
return resourceManager
}

func TestLoadSamplesConfigBackwardsCompatibility(t *testing.T) {
rm := fakeResourceManager()
pc := []deprecatedConfig{
{
Name: "Pipeline 1",
Description: "test description",
File: "testdata/sample_pipeline.yaml",
},
{
Name: "Pipeline 2",
Description: "test description",
File: "testdata/sample_pipeline.yaml",
},
}

path, err := writeSampleConfigDeprecated(t, pc, "sample.json")
require.NoError(t, err)
err = LoadSamples(rm, path)
require.NoError(t, err)

_, err = rm.GetPipelineByNameAndNamespace(pc[0].Name, "")
require.NoError(t, err)
_, err = rm.GetPipelineByNameAndNamespace(pc[1].Name, "")
require.NoError(t, err)

_, err = rm.GetPipelineVersionByName(pc[0].Name)
require.NoError(t, err)
_, err = rm.GetPipelineVersionByName(pc[1].Name)
require.NoError(t, err)

// Update pipeline version for Pipeline 1
pc[0].Name = "Pipeline 3"
path, err = writeSampleConfigDeprecated(t, pc, "sample.json")
require.NoError(t, err)

// Loading samples should result in no pipeline uploaded
err = LoadSamples(rm, path)
require.NoError(t, err)
_, err = rm.GetPipelineByNameAndNamespace(pc[0].Name, "")
var userErr *util.UserError
if assert.ErrorAs(t, err, &userErr) {
require.Equal(t, codes.NotFound, userErr.ExternalStatusCode())
}
}

func TestLoadSamples(t *testing.T) {
rm := fakeResourceManager()
pc := config{
LoadSamplesOnRestart: true,
Pipelines: []configPipelines{
{
Name: "Pipeline 1",
Description: "test description",
File: "testdata/sample_pipeline.yaml",
VersionName: "Pipeline 1 - Ver 1",
VersionDescription: "Pipeline 1 - Ver 1 Description",
},
{
Name: "Pipeline 2",
Description: "test description",
File: "testdata/sample_pipeline.yaml",
VersionName: "Pipeline 2 - Ver 1",
VersionDescription: "Pipeline 2 - Ver 1 Description",
},
},
}

path, err := writeSampleConfig(t, pc, "sample.json")
require.NoError(t, err)
err = LoadSamples(rm, path)
require.NoError(t, err)

var pipeline1 *model.Pipeline
pipeline1, err = rm.GetPipelineByNameAndNamespace(pc.Pipelines[0].Name, "")
require.NoError(t, err)
var pipeline2 *model.Pipeline
pipeline2, err = rm.GetPipelineByNameAndNamespace(pc.Pipelines[1].Name, "")
require.NoError(t, err)

_, err = rm.GetPipelineVersionByName(pc.Pipelines[0].VersionName)
require.NoError(t, err)

// Update pipeline version for Pipeline 1
pc.Pipelines[0].VersionName = "Pipeline 1 - Ver 2"
path, err = writeSampleConfig(t, pc, "sample.json")
require.NoError(t, err)
err = LoadSamples(rm, path)
require.NoError(t, err)

// Expect another Pipeline version added for Pipeline 1
opts, err := list.NewOptions(&model.PipelineVersion{}, 10, "id", nil)
require.NoError(t, err)
_, totalSize, _, err := rm.ListPipelineVersions(pipeline1.UUID, opts)
require.NoError(t, err)
require.Equal(t, totalSize, 2)

// Update pipeline version for Pipeline 2
pc.Pipelines[1].VersionName = "Pipeline 2 - Ver 2"
path, err = writeSampleConfig(t, pc, "sample.json")
require.NoError(t, err)
err = LoadSamples(rm, path)
require.NoError(t, err)

// Expect another Pipeline version added for Pipeline 2
_, err = rm.GetPipelineVersionByName(pc.Pipelines[1].VersionName)
require.NoError(t, err)
_, totalSize, _, err = rm.ListPipelineVersions(pipeline2.UUID, opts)
require.Equal(t, totalSize, 2)

// Confirm previous pipeline version count has not been affected
_, totalSize, _, err = rm.ListPipelineVersions(pipeline1.UUID, opts)
require.Equal(t, totalSize, 2)

// When LoadSamplesOnRestart is false, changes to config should
// result in no new pipelines update upon restart
pc.LoadSamplesOnRestart = false

// Update pipeline version for Pipeline 2
pc.Pipelines[1].VersionName = "Pipeline 2 - Ver 3"
path, err = writeSampleConfig(t, pc, "sample.json")
require.NoError(t, err)
err = LoadSamples(rm, path)
require.NoError(t, err)

// Expect no change
_, totalSize, _, err = rm.ListPipelineVersions(pipeline2.UUID, opts)
require.NoError(t, err)
require.Equal(t, totalSize, 2)
}

func writeSampleConfig(t *testing.T, config config, path string) (string, error) {
return writeContents(t, config, path)
}

func writeSampleConfigDeprecated(t *testing.T, config []deprecatedConfig, path string) (string, error) {
return writeContents(t, config, path)
}

func writeContents(t *testing.T, content interface{}, path string) (string, error) {
tempDir := t.TempDir()
sampleFilePath := filepath.Join(tempDir, path)
marshal, err := json.Marshal(content)
if err != nil {
return "", err
}
if err := os.WriteFile(sampleFilePath, marshal, 0644); err != nil {
t.Fatalf("Failed to create %v file: %v", path, err)
}
return sampleFilePath, nil
}
Loading

0 comments on commit 30108ac

Please sign in to comment.