Skip to content

Commit

Permalink
lookup space root and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
micbar committed Oct 18, 2021
1 parent 2e9bc0c commit a300975
Show file tree
Hide file tree
Showing 7 changed files with 199 additions and 40 deletions.
1 change: 1 addition & 0 deletions changelog/unreleased/get-quota-storage-space.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ Make the cs3apis accept a Reference in the getQuota Request to limit the call to

https://github.com/cs3org/reva/pull/2152
https://github.com/cs3org/reva/pull/2178
https://github.com/cs3org/reva/pull/2187
43 changes: 38 additions & 5 deletions pkg/storage/utils/decomposedfs/lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,10 @@ func (lu *Lookup) NodeFromResource(ctx context.Context, ref *provider.Reference)
if ref.ResourceId != nil {
// check if a storage space reference is used
// currently, the decomposed fs uses the root node id as the space id
spaceRoot, err := lu.NodeFromID(ctx, ref.ResourceId)
n, err := lu.NodeFromID(ctx, ref.ResourceId)
if err != nil {
return nil, err
}
n := spaceRoot
// is this a relative reference?
if ref.Path != "" {
p := filepath.Clean(ref.Path)
Expand All @@ -62,8 +61,6 @@ func (lu *Lookup) NodeFromResource(ctx context.Context, ref *provider.Reference)
return nil, err
}
}
// use reference id as space root for relative references
n.SpaceRoot = spaceRoot
}
return n, nil
}
Expand Down Expand Up @@ -110,7 +107,18 @@ func (lu *Lookup) NodeFromID(ctx context.Context, id *provider.ResourceId) (n *n
if id == nil || id.OpaqueId == "" {
return nil, fmt.Errorf("invalid resource id %+v", id)
}
return node.ReadNode(ctx, lu, id.OpaqueId)
n, err = node.ReadNode(ctx, lu, id.OpaqueId)
if err != nil {
return nil, err
}
// we only need to look for the space root if this is not a space root itself
if n.SpaceRoot == nil {
err := lu.FindStorageSpaceRoot(n)
if err != nil {
return nil, err
}
}
return n, err
}

// Path returns the path for node
Expand Down Expand Up @@ -180,6 +188,12 @@ func (lu *Lookup) WalkPath(ctx context.Context, r *node.Node, p string, followRe
}
}

if spaceNameBytes, err := xattr.Get(r.InternalPath(), xattrs.SpaceNameAttr); err == nil {
if string(spaceNameBytes) != "" {
r.SpaceRoot = r
}
}

if !r.Exists && i < len(segments)-1 {
return r, errtypes.NotFound(segments[i])
}
Expand All @@ -192,6 +206,25 @@ func (lu *Lookup) WalkPath(ctx context.Context, r *node.Node, p string, followRe
return r, nil
}

// FindStorageSpaceRoot calls n.Parent() and climbs the tree until it finds the space root node.
func (lu *Lookup) FindStorageSpaceRoot(r *node.Node) error {
var err error
n := r
for i := 0; r.ParentID != ""; i++ {
if r, err = r.Parent(); err != nil {
return err
}
path := r.InternalPath()
if spaceNameBytes, err := xattr.Get(path, xattrs.SpaceNameAttr); err == nil {
if string(spaceNameBytes) != "" {
n.SpaceRoot = r
break
}
}
}
return nil
}

// HomeOrRootNode returns the users home node when home support is enabled.
// it returns the storages root node otherwise
func (lu *Lookup) HomeOrRootNode(ctx context.Context) (node *node.Node, err error) {
Expand Down
56 changes: 53 additions & 3 deletions pkg/storage/utils/decomposedfs/lookup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
package decomposedfs_test

import (
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
helpers "github.com/cs3org/reva/pkg/storage/utils/decomposedfs/testhelpers"
"github.com/cs3org/reva/pkg/storage/utils/decomposedfs/xattrs"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
Expand All @@ -41,10 +41,9 @@ var _ = Describe("Lookup", func() {
if env != nil {
env.Cleanup()
}

})

Describe("Path", func() {
Describe("Node from path", func() {
It("returns the path including a leading slash", func() {
n, err := env.Lookup.NodeFromPath(env.Ctx, "/dir1/file1", false)
Expect(err).ToNot(HaveOccurred())
Expand All @@ -55,6 +54,57 @@ var _ = Describe("Lookup", func() {
})
})

Describe("Node From Resource only by path", func() {
It("returns the path including a leading slash and the space root is set", func() {
n, err := env.Lookup.NodeFromResource(env.Ctx, &provider.Reference{Path: "/dir1/subdir1/file2"})
Expect(err).ToNot(HaveOccurred())

path, err := env.Lookup.Path(env.Ctx, n)
Expect(err).ToNot(HaveOccurred())
Expect(path).To(Equal("/dir1/subdir1/file2"))
Expect(n.SpaceRoot.Name).To(Equal("userid"))
Expect(n.SpaceRoot.ParentID).To(Equal("root"))
})
})

Describe("Node From Resource only by id", func() {
It("returns the path including a leading slash and the space root is set", func() {
// do a node lookup by path
nRef, err := env.Lookup.NodeFromPath(env.Ctx, "/dir1/file1", false)
Expect(err).ToNot(HaveOccurred())

// try to find the same node by id
n, err := env.Lookup.NodeFromResource(env.Ctx, &provider.Reference{ResourceId: &provider.ResourceId{OpaqueId: nRef.ID}})
Expect(err).ToNot(HaveOccurred())

// Check if we got the right node and spaceRoot
path, err := env.Lookup.Path(env.Ctx, n)
Expect(err).ToNot(HaveOccurred())
Expect(path).To(Equal("/dir1/file1"))
Expect(n.SpaceRoot.Name).To(Equal("userid"))
Expect(n.SpaceRoot.ParentID).To(Equal("root"))
})
})

Describe("Node From Resource by id and relative path", func() {
It("returns the path including a leading slash and the space root is set", func() {
// do a node lookup by path for the parent
nRef, err := env.Lookup.NodeFromPath(env.Ctx, "/dir1", false)
Expect(err).ToNot(HaveOccurred())

// try to find the child node by parent id and relative path
n, err := env.Lookup.NodeFromResource(env.Ctx, &provider.Reference{ResourceId: &provider.ResourceId{OpaqueId: nRef.ID}, Path: "./file1"})
Expect(err).ToNot(HaveOccurred())

// Check if we got the right node and spaceRoot
path, err := env.Lookup.Path(env.Ctx, n)
Expect(err).ToNot(HaveOccurred())
Expect(path).To(Equal("/dir1/file1"))
Expect(n.SpaceRoot.Name).To(Equal("userid"))
Expect(n.SpaceRoot.ParentID).To(Equal("root"))
})
})

Describe("Reference Parsing", func() {
It("parses a valid cs3 reference", func() {
in := []byte("cs3:bede11a0-ea3d-11eb-a78b-bf907adce8ed/c402d01c-ea3d-11eb-a0fc-c32f9d32528f")
Expand Down
2 changes: 2 additions & 0 deletions pkg/storage/utils/decomposedfs/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ func ReadNode(ctx context.Context, lu PathLookup, id string) (n *Node, err error
default:
return nil, errtypes.InternalError(err.Error())
}

// check if this is a space root
if _, err = xattr.Get(nodePath, xattrs.SpaceNameAttr); err == nil {
n.SpaceRoot = n
Expand Down Expand Up @@ -261,6 +262,7 @@ func (n *Node) Child(ctx context.Context, name string) (*Node, error) {
if err != nil {
return nil, errors.Wrap(err, "could not read child node")
}
c.SpaceRoot = n.SpaceRoot
} else {
return nil, fmt.Errorf("Decomposedfs: expected '../ prefix, got' %+v", link)
}
Expand Down
24 changes: 24 additions & 0 deletions pkg/storage/utils/decomposedfs/testhelpers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import (
"os"
"path/filepath"

"github.com/cs3org/reva/pkg/storage/utils/decomposedfs/xattrs"
"github.com/google/uuid"
"github.com/pkg/xattr"
"github.com/stretchr/testify/mock"

userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
Expand Down Expand Up @@ -112,6 +114,18 @@ func NewTestEnv() (*TestEnv, error) {
return nil, err
}

// the space name attribute is the stop condition in the lookup
h, err := lookup.HomeNode(ctx)
if err != nil {
return nil, err
}
if err := xattr.Set(h.InternalPath(), xattrs.SpaceNameAttr, []byte("username")); err != nil {
return nil, err
}
if err != nil {
return nil, err
}

// Create dir1
dir1, err := env.CreateTestDir("/dir1")
if err != nil {
Expand All @@ -130,6 +144,16 @@ func NewTestEnv() (*TestEnv, error) {
return nil, err
}

dir2, err := dir1.Child(ctx, "subdir1")
if err != nil {
return nil, err
}
// Create file1 in dir1
_, err = env.CreateTestFile("file2", "file2-blobid", 12345, dir2.ID)
if err != nil {
return nil, err
}

// Create emptydir
err = fs.CreateDir(ctx, &providerv1beta1.Reference{Path: "/emptydir"})
if err != nil {
Expand Down
65 changes: 33 additions & 32 deletions pkg/storage/utils/decomposedfs/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,37 @@ func (fs *Decomposedfs) Upload(ctx context.Context, ref *provider.Reference, r i
return uploadInfo.FinishUpload(ctx)
}

// CheckQuota checks if both disk space and available quota are sufficient
var CheckQuota = func(ctx context.Context, fs *Decomposedfs, spaceRoot *node.Node, fileSize uint64) (quotaSufficient bool, err error) {
used, _ := spaceRoot.GetTreeSize()
enoughDiskSpace := enoughDiskSpace(fs, spaceRoot.InternalPath(), fileSize)
if !enoughDiskSpace {
return false, errtypes.InsufficientStorage("disk full")
}
quotaB, _ := xattr.Get(spaceRoot.InternalPath(), xattrs.QuotaAttr)
var total uint64
if quotaB != nil {
total, _ = strconv.ParseUint(string(quotaB), 10, 64)
} else {
// if quota is not set, it means unlimited
return true, nil
}

if fileSize > total-used || total < used {
return false, errtypes.InsufficientStorage("quota exceeded")
}
return true, nil
}

func enoughDiskSpace(fs *Decomposedfs, path string, fileSize uint64) bool {
avalB, err := fs.getAvailableSize(path)
if err != nil {
return false
}

return avalB > fileSize
}

// InitiateUpload returns upload ids corresponding to different protocols it supports
// TODO read optional content for small files in this request
// TODO InitiateUpload (and Upload) needs a way to receive the expected checksum. Maybe in metadata as 'checksum' => 'sha1 aeosvp45w5xaeoe' = lowercase, space separated?
Expand Down Expand Up @@ -163,7 +194,7 @@ func (fs *Decomposedfs) InitiateUpload(ctx context.Context, ref *provider.Refere

log.Debug().Interface("info", info).Interface("node", n).Interface("metadata", metadata).Msg("Decomposedfs: resolved filename")

_, err = checkQuota(ctx, fs, n.SpaceRoot, uint64(info.Size))
_, err = CheckQuota(ctx, fs, n.SpaceRoot, uint64(info.Size))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -486,7 +517,7 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) {
)
n.SpaceRoot = node.New(upload.info.Storage["SpaceRoot"], "", "", 0, "", nil, upload.fs.lu)

_, err = checkQuota(upload.ctx, upload.fs, n.SpaceRoot, uint64(fi.Size()))
_, err = CheckQuota(upload.ctx, upload.fs, n.SpaceRoot, uint64(fi.Size()))
if err != nil {
return err
}
Expand Down Expand Up @@ -749,33 +780,3 @@ func (upload *fileUpload) ConcatUploads(ctx context.Context, uploads []tusd.Uplo

return
}

func checkQuota(ctx context.Context, fs *Decomposedfs, spaceRoot *node.Node, fileSize uint64) (quotaSufficient bool, err error) {
used, _ := spaceRoot.GetTreeSize()
enoughDiskSpace := enoughDiskSpace(fs, spaceRoot.InternalPath(), fileSize)
if !enoughDiskSpace {
return false, errtypes.InsufficientStorage("disk full")
}
quotaB, _ := xattr.Get(spaceRoot.InternalPath(), xattrs.QuotaAttr)
var total uint64
if quotaB != nil {
total, _ = strconv.ParseUint(string(quotaB), 10, 64)
} else {
// if quota is not set, it means unlimited
return true, nil
}

if fileSize > total-used || total < used {
return false, errtypes.InsufficientStorage("quota exceeded")
}
return true, nil
}

func enoughDiskSpace(fs *Decomposedfs, path string, fileSize uint64) bool {
avalB, err := fs.getAvailableSize(path)
if err != nil {
return false
}

return avalB > fileSize
}
48 changes: 48 additions & 0 deletions pkg/storage/utils/decomposedfs/upload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,17 @@ package decomposedfs_test
import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"os"

userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/pkg/errtypes"
"github.com/cs3org/reva/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/pkg/storage/utils/decomposedfs/xattrs"
"github.com/pkg/xattr"
"github.com/stretchr/testify/mock"

ruser "github.com/cs3org/reva/pkg/ctx"
Expand Down Expand Up @@ -93,6 +98,20 @@ var _ = Describe("File uploads", func() {
Expect(err).ToNot(HaveOccurred())
})

Context("quota exceeded", func() {
Describe("InitiateUpload", func() {
It("fails", func() {
var originalFunc = decomposedfs.CheckQuota
decomposedfs.CheckQuota = func(ctx context.Context, fs *decomposedfs.Decomposedfs, spaceRoot *node.Node, fileSize uint64) (quotaSufficient bool, err error) {
return false, errtypes.InsufficientStorage("quota exceeded")
}
_, err := fs.InitiateUpload(ctx, ref, 10, map[string]string{})
Expect(err).To(MatchError(errtypes.InsufficientStorage("quota exceeded")))
decomposedfs.CheckQuota = originalFunc
})
})
})

Context("with insufficient permissions", func() {
BeforeEach(func() {
permissions.On("HasPermission", mock.Anything, mock.Anything, mock.Anything).Return(false, nil)
Expand All @@ -106,6 +125,35 @@ var _ = Describe("File uploads", func() {
})
})

Context("with insufficient permissions, home node", func() {
BeforeEach(func() {
var err error
// recreate the fs with home enabled
o.EnableHome = true
tree := tree.New(o.Root, true, true, lookup, bs)
fs, err = decomposedfs.New(o, lookup, permissions, tree)
Expect(err).ToNot(HaveOccurred())
err = fs.CreateHome(ctx)
Expect(err).ToNot(HaveOccurred())
// the space name attribute is the stop condition in the lookup
h, err := lookup.HomeNode(ctx)
Expect(err).ToNot(HaveOccurred())
err = xattr.Set(h.InternalPath(), xattrs.SpaceNameAttr, []byte("username"))
Expect(err).ToNot(HaveOccurred())
permissions.On("HasPermission", mock.Anything, mock.Anything, mock.Anything).Return(false, nil)
})

Describe("InitiateUpload", func() {
It("fails", func() {
h, err := lookup.HomeNode(ctx)
Expect(err).ToNot(HaveOccurred())
msg := fmt.Sprintf("error: permission denied: %s/foo", h.ID)
_, err = fs.InitiateUpload(ctx, ref, 10, map[string]string{})
Expect(err).To(MatchError(msg))
})
})
})

Context("with sufficient permissions", func() {
BeforeEach(func() {
permissions.On("HasPermission", mock.Anything, mock.Anything, mock.Anything).Return(true, nil)
Expand Down

0 comments on commit a300975

Please sign in to comment.