Skip to content

Commit

Permalink
Merge pull request #85 from kvaster/cache
Browse files Browse the repository at this point in the history
  • Loading branch information
chrislusf authored Sep 6, 2022
2 parents 006d723 + 8b4d862 commit 78d6ce7
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 5 deletions.
25 changes: 20 additions & 5 deletions pkg/driver/mounter_seaweedfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package driver

import (
"fmt"
"os"
"path/filepath"
"strconv"
"strings"

Expand All @@ -19,6 +21,11 @@ type seaweedFsMounter struct {
volContext map[string]string
}

type seaweedFsUnmounter struct {
unmounter Unmounter
cacheDir string
}

const (
seaweedFsCmd = "weed"
)
Expand Down Expand Up @@ -46,7 +53,7 @@ func (seaweedFs *seaweedFsMounter) getOrDefaultContextInt(key string, defaultVal
v := seaweedFs.getOrDefaultContext(key, "")
if v != "" {
iv, err := strconv.Atoi(v)
if err != nil {
if err == nil {
return iv
}
}
Expand Down Expand Up @@ -97,9 +104,10 @@ func (seaweedFs *seaweedFsMounter) Mount(target string) (Unmounter, error) {
args = append(args, "-readOnly")
}

if seaweedFs.driver.CacheDir != "" {
args = append(args, fmt.Sprintf("-cacheDir=%s", seaweedFs.driver.CacheDir))
}
// CacheDir should be always defined - we use temp dir in case it is not defined
// we need to use predictable cache path, because we need to clean it up on unstage
cacheDir := filepath.Join(seaweedFs.driver.CacheDir, seaweedFs.volumeID)
args = append(args, fmt.Sprintf("-cacheDir=%s", cacheDir))

if cw := seaweedFs.getOrDefaultContextInt("concurrentWriters", seaweedFs.driver.ConcurrentWriters); cw > 0 {
args = append(args, fmt.Sprintf("-concurrentWriters=%d", cw))
Expand All @@ -115,7 +123,14 @@ func (seaweedFs *seaweedFsMounter) Mount(target string) (Unmounter, error) {
if err != nil {
glog.Errorf("mount %v %s to %s: %s", seaweedFs.driver.filers, seaweedFs.path, target, err)
}
return u, err

return &seaweedFsUnmounter{unmounter: u, cacheDir: cacheDir}, err
}

func (su *seaweedFsUnmounter) Unmount() error {
err := su.unmounter.Unmount()
_ = os.RemoveAll(su.cacheDir)
return err
}

func GetLocalSocket(volumeID string) string {
Expand Down
21 changes: 21 additions & 0 deletions pkg/driver/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package driver
import (
"fmt"
"os"
"path/filepath"
"strings"
"sync"

Expand All @@ -14,6 +15,10 @@ import (
)

func NewNodeServer(n *SeaweedFsDriver) *NodeServer {
if err := removeDirContent(n.CacheDir); err != nil {
glog.Warning("error cleaning up cache dir")
}

return &NodeServer{
Driver: n,
volumeMutexes: NewKeyMutex(),
Expand Down Expand Up @@ -83,6 +88,22 @@ func checkMount(targetPath string) (bool, error) {
return notMnt, nil
}

func removeDirContent(path string) error {
files, err := filepath.Glob(filepath.Join(path, "*"))
if err != nil {
return err
}

for _, file := range files {
err = os.RemoveAll(file)
if err != nil {
return err
}
}

return nil
}

type KeyMutex struct {
mutexes sync.Map
}
Expand Down

0 comments on commit 78d6ce7

Please sign in to comment.