Skip to content

Commit

Permalink
feat: Add pullman PVC storage provider (#36)
Browse files Browse the repository at this point in the history
* Add pullman PVC storage provider

Add pullman PVC storage provider to support models in PVC

Signed-off-by: Chin Huang <[email protected]>

* use symlink, restructure, clean up based on review, temporarily use join instead of SecureJoin due to use of symlinks

Signed-off-by: Chin Huang <[email protected]>

* add more error handling

Signed-off-by: Chin Huang <[email protected]>

---------

Signed-off-by: Chin Huang <[email protected]>
  • Loading branch information
chinhuang007 authored Mar 1, 2023
1 parent dd8c2c2 commit 6992097
Show file tree
Hide file tree
Showing 3 changed files with 245 additions and 4 deletions.
12 changes: 8 additions & 4 deletions model-serving-puller/puller/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
_ "github.com/kserve/modelmesh-runtime-adapter/pullman/storageproviders/azure"
_ "github.com/kserve/modelmesh-runtime-adapter/pullman/storageproviders/gcs"
_ "github.com/kserve/modelmesh-runtime-adapter/pullman/storageproviders/http"
_ "github.com/kserve/modelmesh-runtime-adapter/pullman/storageproviders/pvc"
_ "github.com/kserve/modelmesh-runtime-adapter/pullman/storageproviders/s3"
)

Expand Down Expand Up @@ -183,10 +184,13 @@ func (s *Puller) ProcessLoadModelRequest(ctx context.Context, req *mmesh.LoadMod
}

// update model path to an absolute path in the local filesystem
modelFullPath, joinErr := util.SecureJoin(modelDir, modelPathFilename)
if joinErr != nil {
return nil, fmt.Errorf("Error joining paths '%s' and '%s': %w", modelDir, modelPathFilename, joinErr)
}
// comment out SecureJoin since it doesn't handle symlinks well
// modelFullPath, joinErr := util.SecureJoin(modelDir, modelPathFilename)
// if joinErr != nil {
// return nil, fmt.Errorf("Error joining paths '%s' and '%s': %w", modelDir, modelPathFilename, joinErr)
// }
modelFullPath := modelDir + string(filepath.Separator) + modelPathFilename

req.ModelPath = modelFullPath

// if it was included, update schema path to an absolute path in the local filesystem
Expand Down
128 changes: 128 additions & 0 deletions pullman/storageproviders/pvc/provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
// Copyright 2022 IBM Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pvcprovider

import (
"context"
"fmt"
"os"
"path/filepath"

"github.com/go-logr/logr"

"github.com/kserve/modelmesh-runtime-adapter/internal/util"
"github.com/kserve/modelmesh-runtime-adapter/pullman"
)

// configPVCName is the repository-config field that carries the name of the
// PVC a pull command refers to.
const configPVCName = "name"

// defaultPVCMountBase is the mount base used when a provider has no explicit
// pvcMountBase set.
const defaultPVCMountBase = "/pvc_mounts"

// pvcProvider is the storage provider for models that are stored on mounted
// PVCs. It is registered with pullman under the "pvc" type by this file's
// init function.
type pvcProvider struct {
// pvcMountBase is the directory under which Pull looks up a PVC by name.
// When it is empty, NewRepository substitutes defaultPVCMountBase.
pvcMountBase string
}

// compile-time check that pvcProvider implements StorageProvider
var _ pullman.StorageProvider = (*pvcProvider)(nil)

// GetKey returns the key for the given repository configuration.
//
// The key is the same empty string for every configuration: a single
// repository instance can be used for all PVCs, so configurations do not need
// to be told apart.
func (p pvcProvider) GetKey(config pullman.Config) string {
	const sharedKey = ""
	return sharedKey
}

// NewRepository validates the PVC mount base and returns a repository client
// bound to it.
//
// An empty pvcMountBase is replaced by defaultPVCMountBase. An error is
// returned when the mount base does not exist, cannot be accessed, or is not a
// directory. The config argument is not consulted here; per-PVC settings are
// read from each pull command instead.
func (p pvcProvider) NewRepository(config pullman.Config, log logr.Logger) (pullman.RepositoryClient, error) {
	// p is a copy (value receiver), so the default only affects the provider
	// value stored in the returned client.
	if p.pvcMountBase == "" {
		p.pvcMountBase = defaultPVCMountBase
	}
	mountBase := p.pvcMountBase

	info, statErr := os.Stat(mountBase)
	switch {
	case statErr != nil && os.IsNotExist(statErr):
		return nil, fmt.Errorf("the PVC mount base '%s' doesn't exist: %w", mountBase, statErr)
	case statErr != nil:
		return nil, fmt.Errorf("failed to access the PVC mount base '%s': %w", mountBase, statErr)
	case !info.IsDir():
		return nil, fmt.Errorf("the PVC mount base '%s' is not a directory", mountBase)
	}

	client := &pvcRepositoryClient{pvcProvider: p, log: log}
	return client, nil
}

// pvcRepositoryClient is the repository client returned by
// pvcProvider.NewRepository.
type pvcRepositoryClient struct {
// pvcProvider is the provider value the client was created from; Pull reads
// its pvcMountBase to locate PVC directories.
pvcProvider pvcProvider
// log receives the V(1) messages emitted by Pull.
log logr.Logger
}

// compile-time check that pvcRepositoryClient implements RepositoryClient
var _ pullman.RepositoryClient = (*pvcRepositoryClient)(nil)

// Pull makes every target of the pull command available in pc.Directory by
// creating a symlink to the target's location on the mounted PVC. No model
// data is copied.
//
// The PVC is selected by the configPVCName field of pc.RepositoryConfig. Each
// target's RemotePath is resolved below <pvcMountBase>/<pvcName>, and the link
// is named after the last path element of the resolved model path.
//
// An error is returned when the PVC name is missing from the configuration,
// the destination directory cannot be created, a path cannot be joined, the
// model path cannot be accessed, or the symlink cannot be created. All
// underlying errors are wrapped with %w so callers can inspect them with
// errors.Is / errors.As.
func (r *pvcRepositoryClient) Pull(ctx context.Context, pc pullman.PullCommand) error {
	targets := pc.Targets
	destDir := pc.Directory

	// Process per-command configuration
	pvcName, ok := pullman.GetString(pc.RepositoryConfig, configPVCName)
	if !ok {
		return fmt.Errorf("required configuration '%s' missing from command", configPVCName)
	}

	// create destination directories
	if mkdirErr := os.MkdirAll(destDir, 0755); mkdirErr != nil {
		return fmt.Errorf("unable to create directories '%s': %w", destDir, mkdirErr)
	}

	pvcDir, joinErr := util.SecureJoin(r.pvcProvider.pvcMountBase, pvcName)
	if joinErr != nil {
		return fmt.Errorf("unable to join paths '%s' and '%s': %w", r.pvcProvider.pvcMountBase, pvcName, joinErr)
	}
	r.log.V(1).Info("The PVC directory is set", "pvcDir", pvcDir)

	for _, pt := range targets {
		fullModelPath, joinErr := util.SecureJoin(pvcDir, pt.RemotePath)
		if joinErr != nil {
			return fmt.Errorf("unable to join paths '%s' and '%s': %w", pvcDir, pt.RemotePath, joinErr)
		}
		r.log.V(1).Info("The model path is set", "fullModelPath", fullModelPath)

		// check the local model path /pvcMountBase/pvcName/modelPath exists
		if _, err := os.Stat(fullModelPath); err != nil {
			return fmt.Errorf("unable to access model local path '%s': %w", fullModelPath, err)
		}

		// create symlink
		linkName := filepath.Base(fullModelPath)
		linkPath, err := util.SecureJoin(destDir, linkName)
		if err != nil {
			return fmt.Errorf("unable to join paths '%s' and '%s': %w", destDir, linkName, err)
		}
		if err := os.Symlink(fullModelPath, linkPath); err != nil {
			// A repeated pull of the same model finds the link already in
			// place. If it points at the expected target there is nothing
			// left to do; any other pre-existing entry is still an error.
			if os.IsExist(err) {
				if existing, readErr := os.Readlink(linkPath); readErr == nil && existing == fullModelPath {
					continue
				}
			}
			return fmt.Errorf("unable to create symlink '%s': %w", linkPath, err)
		}
	}

	return nil
}

func init() {
p := pvcProvider{}
pullman.RegisterProvider("pvc", p)
}
109 changes: 109 additions & 0 deletions pullman/storageproviders/pvc/provider_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright 2022 IBM Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pvcprovider

import (
"context"
"os"
"testing"

"github.com/go-logr/logr"
"github.com/kserve/modelmesh-runtime-adapter/pullman"
"github.com/stretchr/testify/assert"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
)

// newPVCProvider returns a zero-value provider together with a fresh zap
// logger for use in tests.
func newPVCProvider(t *testing.T) (pvcProvider, logr.Logger) {
	return pvcProvider{}, zap.New()
}

// newPVCRepositoryClient returns a repository client whose PVC mount base is
// the current working directory. The test is aborted if the working directory
// cannot be determined, rather than continuing with an empty mount base.
func newPVCRepositoryClient(t *testing.T) *pvcRepositoryClient {
	t.Helper()

	pvcMountBase, err := os.Getwd()
	if err != nil {
		t.Fatalf("unable to determine the working directory: %v", err)
	}

	p, log := newPVCProvider(t)
	p.pvcMountBase = pvcMountBase
	return &pvcRepositoryClient{
		pvcProvider: p,
		log:         log,
	}
}

// Test_NewRepository checks that a repository can be created when the PVC
// mount base is an existing directory (the current working directory).
func Test_NewRepository(t *testing.T) {
	pvcMountBase, err := os.Getwd()
	if err != nil {
		t.Fatalf("unable to determine the working directory: %v", err)
	}

	p, log := newPVCProvider(t)
	p.pvcMountBase = pvcMountBase
	c := pullman.NewRepositoryConfig("pvc", nil)
	c.Set(configPVCName, "pvcName")

	_, err = p.NewRepository(c, log)
	assert.NoError(t, err)
}

// Test_Verify_Directory checks that Pull fails while the model directory is
// missing from the PVC and, once the directory exists, succeeds and leaves a
// symlink to it in the serving directory.
func Test_Verify_Directory(t *testing.T) {
	pvcName := "pvcName"
	modelDir := "modelDir"
	modelID := "testId"
	pvcRc := newPVCRepositoryClient(t)
	c := pullman.NewRepositoryConfig("pvc", nil)
	c.Set(configPVCName, pvcName)

	pvcNameDir := pvcRc.pvcProvider.pvcMountBase + "/" + pvcName
	pvcModelDir := pvcNameDir + "/" + modelDir
	serveModelDir := pvcRc.pvcProvider.pvcMountBase + "/" + modelID

	// Registered up front so the directories are removed however the test ends.
	t.Cleanup(func() {
		assert.NoError(t, os.RemoveAll(pvcNameDir))
		assert.NoError(t, os.RemoveAll(serveModelDir))
	})

	inputPullCommand := pullman.PullCommand{
		RepositoryConfig: c,
		Directory:        serveModelDir,
		Targets: []pullman.Target{
			{
				RemotePath: modelDir,
			},
		},
	}

	// should return error because the directory doesn't exist
	err := pvcRc.Pull(context.Background(), inputPullCommand)
	assert.Error(t, err)

	// the remaining checks are meaningless without the model directory
	if err = os.MkdirAll(pvcModelDir, os.ModePerm); err != nil {
		t.Fatalf("unable to create model directory '%s': %v", pvcModelDir, err)
	}

	// should not return error because the directory exists
	err = pvcRc.Pull(context.Background(), inputPullCommand)
	assert.NoError(t, err)

	// the serving directory should now hold a symlink that resolves to a directory
	linkPath := serveModelDir + "/" + modelDir
	linkInfo, err := os.Lstat(linkPath)
	if assert.NoError(t, err) {
		assert.True(t, linkInfo.Mode()&os.ModeSymlink != 0, "expected '%s' to be a symlink", linkPath)
	}
	targetInfo, err := os.Stat(linkPath)
	if assert.NoError(t, err) {
		assert.True(t, targetInfo.IsDir(), "expected '%s' to resolve to a directory", linkPath)
	}
}

// Test_GetKey checks that the provider key does not depend on the PVC name.
func Test_GetKey(t *testing.T) {
	provider := pvcProvider{}

	createTestConfig := func(pvcName string) *pullman.RepositoryConfig {
		config := pullman.NewRepositoryConfig("pvc", nil)
		config.Set(configPVCName, pvcName)
		return config
	}

	// different pvc names should have the same key
	t.Run("shouldNotChangeForPVCName", func(t *testing.T) {
		config1 := createTestConfig("pvcName1")
		config2 := createTestConfig("pvcName2")
		assert.Equal(t, provider.GetKey(config1), provider.GetKey(config2))
	})
}

0 comments on commit 6992097

Please sign in to comment.