From a30097549055e21b0b3c1953b6c224b90f4b2c56 Mon Sep 17 00:00:00 2001 From: Michael Barz Date: Fri, 15 Oct 2021 21:58:53 +0200 Subject: [PATCH] lookup space root and tests --- .../unreleased/get-quota-storage-space.md | 1 + pkg/storage/utils/decomposedfs/lookup.go | 43 ++++++++++-- pkg/storage/utils/decomposedfs/lookup_test.go | 56 +++++++++++++++- pkg/storage/utils/decomposedfs/node/node.go | 2 + .../utils/decomposedfs/testhelpers/helpers.go | 24 +++++++ pkg/storage/utils/decomposedfs/upload.go | 65 ++++++++++--------- pkg/storage/utils/decomposedfs/upload_test.go | 48 ++++++++++++++ 7 files changed, 199 insertions(+), 40 deletions(-) diff --git a/changelog/unreleased/get-quota-storage-space.md b/changelog/unreleased/get-quota-storage-space.md index fe2089d381e..a6657bd77c5 100644 --- a/changelog/unreleased/get-quota-storage-space.md +++ b/changelog/unreleased/get-quota-storage-space.md @@ -6,3 +6,4 @@ Make the cs3apis accept a Reference in the getQuota Request to limit the call to https://github.com/cs3org/reva/pull/2152 https://github.com/cs3org/reva/pull/2178 +https://github.com/cs3org/reva/pull/2187 diff --git a/pkg/storage/utils/decomposedfs/lookup.go b/pkg/storage/utils/decomposedfs/lookup.go index 974edd0e4c1..57c10bf3f0d 100644 --- a/pkg/storage/utils/decomposedfs/lookup.go +++ b/pkg/storage/utils/decomposedfs/lookup.go @@ -45,11 +45,10 @@ func (lu *Lookup) NodeFromResource(ctx context.Context, ref *provider.Reference) if ref.ResourceId != nil { // check if a storage space reference is used // currently, the decomposed fs uses the root node id as the space id - spaceRoot, err := lu.NodeFromID(ctx, ref.ResourceId) + n, err := lu.NodeFromID(ctx, ref.ResourceId) if err != nil { return nil, err } - n := spaceRoot // is this a relative reference? if ref.Path != "" { p := filepath.Clean(ref.Path) @@ -62,8 +61,6 @@ func (lu *Lookup) NodeFromResource(ctx context.Context, ref *provider.Reference) return nil, err } } - // use reference id as space root for relative references - n.SpaceRoot = spaceRoot } return n, nil } @@ -110,7 +107,18 @@ func (lu *Lookup) NodeFromID(ctx context.Context, id *provider.ResourceId) (n *n if id == nil || id.OpaqueId == "" { return nil, fmt.Errorf("invalid resource id %+v", id) } - return node.ReadNode(ctx, lu, id.OpaqueId) + n, err = node.ReadNode(ctx, lu, id.OpaqueId) + if err != nil { + return nil, err + } + // we only need to look for the space root if this is not a space root itself + if n.SpaceRoot == nil { + err := lu.FindStorageSpaceRoot(n) + if err != nil { + return nil, err + } + } + return n, err } // Path returns the path for node @@ -180,6 +188,12 @@ func (lu *Lookup) WalkPath(ctx context.Context, r *node.Node, p string, followRe } } + if spaceNameBytes, err := xattr.Get(r.InternalPath(), xattrs.SpaceNameAttr); err == nil { + if string(spaceNameBytes) != "" { + r.SpaceRoot = r + } + } + if !r.Exists && i < len(segments)-1 { return r, errtypes.NotFound(segments[i]) } @@ -192,6 +206,25 @@ func (lu *Lookup) WalkPath(ctx context.Context, r *node.Node, p string, followRe return r, nil } +// FindStorageSpaceRoot calls n.Parent() and climbs the tree until it finds the space root node. +func (lu *Lookup) FindStorageSpaceRoot(r *node.Node) error { + var err error + n := r + for i := 0; r.ParentID != ""; i++ { + if r, err = r.Parent(); err != nil { + return err + } + path := r.InternalPath() + if spaceNameBytes, err := xattr.Get(path, xattrs.SpaceNameAttr); err == nil { + if string(spaceNameBytes) != "" { + n.SpaceRoot = r + break + } + } + } + return nil +} + // HomeOrRootNode returns the users home node when home support is enabled. // it returns the storages root node otherwise func (lu *Lookup) HomeOrRootNode(ctx context.Context) (node *node.Node, err error) { diff --git a/pkg/storage/utils/decomposedfs/lookup_test.go b/pkg/storage/utils/decomposedfs/lookup_test.go index 699e3d9d3e4..3ddc5ee72cb 100644 --- a/pkg/storage/utils/decomposedfs/lookup_test.go +++ b/pkg/storage/utils/decomposedfs/lookup_test.go @@ -19,9 +19,9 @@ package decomposedfs_test import ( + provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" helpers "github.com/cs3org/reva/pkg/storage/utils/decomposedfs/testhelpers" "github.com/cs3org/reva/pkg/storage/utils/decomposedfs/xattrs" - . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" ) @@ -41,10 +41,9 @@ var _ = Describe("Lookup", func() { if env != nil { env.Cleanup() } - }) - Describe("Path", func() { + Describe("Node from path", func() { It("returns the path including a leading slash", func() { n, err := env.Lookup.NodeFromPath(env.Ctx, "/dir1/file1", false) Expect(err).ToNot(HaveOccurred()) @@ -55,6 +54,57 @@ var _ = Describe("Lookup", func() { }) }) + Describe("Node From Resource only by path", func() { + It("returns the path including a leading slash and the space root is set", func() { + n, err := env.Lookup.NodeFromResource(env.Ctx, &provider.Reference{Path: "/dir1/subdir1/file2"}) + Expect(err).ToNot(HaveOccurred()) + + path, err := env.Lookup.Path(env.Ctx, n) + Expect(err).ToNot(HaveOccurred()) + Expect(path).To(Equal("/dir1/subdir1/file2")) + Expect(n.SpaceRoot.Name).To(Equal("userid")) + Expect(n.SpaceRoot.ParentID).To(Equal("root")) + }) + }) + + Describe("Node From Resource only by id", func() { + It("returns the path including a leading slash and the space root is set", func() { + // do a node lookup by path + nRef, err := env.Lookup.NodeFromPath(env.Ctx, "/dir1/file1", false) + Expect(err).ToNot(HaveOccurred()) + + // try to find the same node by id + n, err := env.Lookup.NodeFromResource(env.Ctx, &provider.Reference{ResourceId: &provider.ResourceId{OpaqueId: nRef.ID}}) + Expect(err).ToNot(HaveOccurred()) + + // Check if we got the right node and spaceRoot + path, err := env.Lookup.Path(env.Ctx, n) + Expect(err).ToNot(HaveOccurred()) + Expect(path).To(Equal("/dir1/file1")) + Expect(n.SpaceRoot.Name).To(Equal("userid")) + Expect(n.SpaceRoot.ParentID).To(Equal("root")) + }) + }) + + Describe("Node From Resource by id and relative path", func() { + It("returns the path including a leading slash and the space root is set", func() { + // do a node lookup by path for the parent + nRef, err := env.Lookup.NodeFromPath(env.Ctx, "/dir1", false) + Expect(err).ToNot(HaveOccurred()) + + // try to find the child node by parent id and relative path + n, err := env.Lookup.NodeFromResource(env.Ctx, &provider.Reference{ResourceId: &provider.ResourceId{OpaqueId: nRef.ID}, Path: "./file1"}) + Expect(err).ToNot(HaveOccurred()) + + // Check if we got the right node and spaceRoot + path, err := env.Lookup.Path(env.Ctx, n) + Expect(err).ToNot(HaveOccurred()) + Expect(path).To(Equal("/dir1/file1")) + Expect(n.SpaceRoot.Name).To(Equal("userid")) + Expect(n.SpaceRoot.ParentID).To(Equal("root")) + }) + }) + Describe("Reference Parsing", func() { It("parses a valid cs3 reference", func() { in := []byte("cs3:bede11a0-ea3d-11eb-a78b-bf907adce8ed/c402d01c-ea3d-11eb-a0fc-c32f9d32528f") diff --git a/pkg/storage/utils/decomposedfs/node/node.go b/pkg/storage/utils/decomposedfs/node/node.go index 84f05fcfa7b..98715485764 100644 --- a/pkg/storage/utils/decomposedfs/node/node.go +++ b/pkg/storage/utils/decomposedfs/node/node.go @@ -192,6 +192,7 @@ func ReadNode(ctx context.Context, lu PathLookup, id string) (n *Node, err error default: return nil, errtypes.InternalError(err.Error()) } + // check if this is a space root if _, err = xattr.Get(nodePath, xattrs.SpaceNameAttr); err == nil { n.SpaceRoot = n @@ -261,6 +262,7 @@ func (n *Node) Child(ctx context.Context, name string) (*Node, error) { if err != nil { return nil, errors.Wrap(err, "could not read child node") } + c.SpaceRoot = n.SpaceRoot } else { return nil, fmt.Errorf("Decomposedfs: expected '../ prefix, got' %+v", link) } diff --git a/pkg/storage/utils/decomposedfs/testhelpers/helpers.go b/pkg/storage/utils/decomposedfs/testhelpers/helpers.go index 5bb8ad38857..ae4e7564553 100644 --- a/pkg/storage/utils/decomposedfs/testhelpers/helpers.go +++ b/pkg/storage/utils/decomposedfs/testhelpers/helpers.go @@ -23,7 +23,9 @@ import ( "os" "path/filepath" + "github.com/cs3org/reva/pkg/storage/utils/decomposedfs/xattrs" "github.com/google/uuid" + "github.com/pkg/xattr" "github.com/stretchr/testify/mock" userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" @@ -112,6 +114,18 @@ func NewTestEnv() (*TestEnv, error) { return nil, err } + // the space name attribute is the stop condition in the lookup + h, err := lookup.HomeNode(ctx) + if err != nil { + return nil, err + } + if err := xattr.Set(h.InternalPath(), xattrs.SpaceNameAttr, []byte("username")); err != nil { + return nil, err + } + if err != nil { + return nil, err + } + // Create dir1 dir1, err := env.CreateTestDir("/dir1") if err != nil { @@ -130,6 +144,16 @@ func NewTestEnv() (*TestEnv, error) { return nil, err } + dir2, err := dir1.Child(ctx, "subdir1") + if err != nil { + return nil, err + } + // Create file1 in dir1 + _, err = env.CreateTestFile("file2", "file2-blobid", 12345, dir2.ID) + if err != nil { + return nil, err + } + // Create emptydir err = fs.CreateDir(ctx, &providerv1beta1.Reference{Path: "/emptydir"}) if err != nil { diff --git a/pkg/storage/utils/decomposedfs/upload.go b/pkg/storage/utils/decomposedfs/upload.go index ff8b61b3acd..5cbfe9ead0d 100644 --- a/pkg/storage/utils/decomposedfs/upload.go +++ b/pkg/storage/utils/decomposedfs/upload.go @@ -110,6 +110,37 @@ func (fs *Decomposedfs) Upload(ctx context.Context, ref *provider.Reference, r i return uploadInfo.FinishUpload(ctx) } +// CheckQuota checks if both disk space and available quota are sufficient +var CheckQuota = func(ctx context.Context, fs *Decomposedfs, spaceRoot *node.Node, fileSize uint64) (quotaSufficient bool, err error) { + used, _ := spaceRoot.GetTreeSize() + enoughDiskSpace := enoughDiskSpace(fs, spaceRoot.InternalPath(), fileSize) + if !enoughDiskSpace { + return false, errtypes.InsufficientStorage("disk full") + } + quotaB, _ := xattr.Get(spaceRoot.InternalPath(), xattrs.QuotaAttr) + var total uint64 + if quotaB != nil { + total, _ = strconv.ParseUint(string(quotaB), 10, 64) + } else { + // if quota is not set, it means unlimited + return true, nil + } + + if fileSize > total-used || total < used { + return false, errtypes.InsufficientStorage("quota exceeded") + } + return true, nil +} + +func enoughDiskSpace(fs *Decomposedfs, path string, fileSize uint64) bool { + avalB, err := fs.getAvailableSize(path) + if err != nil { + return false + } + + return avalB > fileSize +} + // InitiateUpload returns upload ids corresponding to different protocols it supports // TODO read optional content for small files in this request // TODO InitiateUpload (and Upload) needs a way to receive the expected checksum. Maybe in metadata as 'checksum' => 'sha1 aeosvp45w5xaeoe' = lowercase, space separated? @@ -163,7 +194,7 @@ func (fs *Decomposedfs) InitiateUpload(ctx context.Context, ref *provider.Refere log.Debug().Interface("info", info).Interface("node", n).Interface("metadata", metadata).Msg("Decomposedfs: resolved filename") - _, err = checkQuota(ctx, fs, n.SpaceRoot, uint64(info.Size)) + _, err = CheckQuota(ctx, fs, n.SpaceRoot, uint64(info.Size)) if err != nil { return nil, err } @@ -486,7 +517,7 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) { ) n.SpaceRoot = node.New(upload.info.Storage["SpaceRoot"], "", "", 0, "", nil, upload.fs.lu) - _, err = checkQuota(upload.ctx, upload.fs, n.SpaceRoot, uint64(fi.Size())) + _, err = CheckQuota(upload.ctx, upload.fs, n.SpaceRoot, uint64(fi.Size())) if err != nil { return err } @@ -749,33 +780,3 @@ func (upload *fileUpload) ConcatUploads(ctx context.Context, uploads []tusd.Uplo return } - -func checkQuota(ctx context.Context, fs *Decomposedfs, spaceRoot *node.Node, fileSize uint64) (quotaSufficient bool, err error) { - used, _ := spaceRoot.GetTreeSize() - enoughDiskSpace := enoughDiskSpace(fs, spaceRoot.InternalPath(), fileSize) - if !enoughDiskSpace { - return false, errtypes.InsufficientStorage("disk full") - } - quotaB, _ := xattr.Get(spaceRoot.InternalPath(), xattrs.QuotaAttr) - var total uint64 - if quotaB != nil { - total, _ = strconv.ParseUint(string(quotaB), 10, 64) - } else { - // if quota is not set, it means unlimited - return true, nil - } - - if fileSize > total-used || total < used { - return false, errtypes.InsufficientStorage("quota exceeded") - } - return true, nil -} - -func enoughDiskSpace(fs *Decomposedfs, path string, fileSize uint64) bool { - avalB, err := fs.getAvailableSize(path) - if err != nil { - return false - } - - return avalB > fileSize -} diff --git a/pkg/storage/utils/decomposedfs/upload_test.go b/pkg/storage/utils/decomposedfs/upload_test.go index 01ef9cb4b3e..6dea19be9ef 100644 --- a/pkg/storage/utils/decomposedfs/upload_test.go +++ b/pkg/storage/utils/decomposedfs/upload_test.go @@ -21,12 +21,17 @@ package decomposedfs_test import ( "bytes" "context" + "fmt" "io" "io/ioutil" "os" userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + "github.com/cs3org/reva/pkg/errtypes" + "github.com/cs3org/reva/pkg/storage/utils/decomposedfs/node" + "github.com/cs3org/reva/pkg/storage/utils/decomposedfs/xattrs" + "github.com/pkg/xattr" "github.com/stretchr/testify/mock" ruser "github.com/cs3org/reva/pkg/ctx" @@ -93,6 +98,20 @@ var _ = Describe("File uploads", func() { Expect(err).ToNot(HaveOccurred()) }) + Context("quota exceeded", func() { + Describe("InitiateUpload", func() { + It("fails", func() { + var originalFunc = decomposedfs.CheckQuota + decomposedfs.CheckQuota = func(ctx context.Context, fs *decomposedfs.Decomposedfs, spaceRoot *node.Node, fileSize uint64) (quotaSufficient bool, err error) { + return false, errtypes.InsufficientStorage("quota exceeded") + } + _, err := fs.InitiateUpload(ctx, ref, 10, map[string]string{}) + Expect(err).To(MatchError(errtypes.InsufficientStorage("quota exceeded"))) + decomposedfs.CheckQuota = originalFunc + }) + }) + }) + Context("with insufficient permissions", func() { BeforeEach(func() { permissions.On("HasPermission", mock.Anything, mock.Anything, mock.Anything).Return(false, nil) @@ -106,6 +125,35 @@ var _ = Describe("File uploads", func() { }) }) + Context("with insufficient permissions, home node", func() { + BeforeEach(func() { + var err error + // recreate the fs with home enabled + o.EnableHome = true + tree := tree.New(o.Root, true, true, lookup, bs) + fs, err = decomposedfs.New(o, lookup, permissions, tree) + Expect(err).ToNot(HaveOccurred()) + err = fs.CreateHome(ctx) + Expect(err).ToNot(HaveOccurred()) + // the space name attribute is the stop condition in the lookup + h, err := lookup.HomeNode(ctx) + Expect(err).ToNot(HaveOccurred()) + err = xattr.Set(h.InternalPath(), xattrs.SpaceNameAttr, []byte("username")) + Expect(err).ToNot(HaveOccurred()) + permissions.On("HasPermission", mock.Anything, mock.Anything, mock.Anything).Return(false, nil) + }) + + Describe("InitiateUpload", func() { + It("fails", func() { + h, err := lookup.HomeNode(ctx) + Expect(err).ToNot(HaveOccurred()) + msg := fmt.Sprintf("error: permission denied: %s/foo", h.ID) + _, err = fs.InitiateUpload(ctx, ref, 10, map[string]string{}) + Expect(err).To(MatchError(msg)) + }) + }) + }) + Context("with sufficient permissions", func() { BeforeEach(func() { permissions.On("HasPermission", mock.Anything, mock.Anything, mock.Anything).Return(true, nil)