Skip to content

Commit

Permalink
Refactor listing and statting across providers for virtual views (#1925)
Browse files Browse the repository at this point in the history
  • Loading branch information
ishank011 authored Jul 28, 2021
1 parent b7d9958 commit 30db43b
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 76 deletions.
3 changes: 3 additions & 0 deletions changelog/unreleased/virtual-views-refactor.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Enhancement: Refactor listing and statting across providers for virtual views

https://github.com/cs3org/reva/pull/1925
102 changes: 71 additions & 31 deletions internal/grpc/services/gateway/storageprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"
"net/url"
"path"
"path/filepath"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -293,9 +294,10 @@ func (s *svc) DeleteStorageSpace(ctx context.Context, req *provider.DeleteStorag
}

func (s *svc) GetHome(ctx context.Context, _ *provider.GetHomeRequest) (*provider.GetHomeResponse, error) {
home := s.getHome(ctx)
homeRes := &provider.GetHomeResponse{Path: home, Status: status.NewOK(ctx)}
return homeRes, nil
return &provider.GetHomeResponse{
Path: s.getHome(ctx),
Status: status.NewOK(ctx),
}, nil
}

func (s *svc) getHome(_ context.Context) string {
Expand Down Expand Up @@ -369,7 +371,7 @@ func (s *svc) InitiateFileDownload(ctx context.Context, req *provider.InitiateFi

if protocol == "webdav" {
// TODO(ishank011): pass this through the datagateway service
// for now, we just expose the file server to the user
// For now, we just expose the file server to the user
ep, opaque, err := s.webdavRefTransferEndpoint(ctx, statRes.Info.Target)
if err != nil {
return &gateway.InitiateFileDownloadResponse{
Expand Down Expand Up @@ -433,7 +435,7 @@ func (s *svc) InitiateFileDownload(ctx context.Context, req *provider.InitiateFi

if protocol == "webdav" {
// TODO(ishank011): pass this through the datagateway service
// for now, we just expose the file server to the user
// For now, we just expose the file server to the user
ep, opaque, err := s.webdavRefTransferEndpoint(ctx, statRes.Info.Target, shareChild)
if err != nil {
return &gateway.InitiateFileDownloadResponse{
Expand Down Expand Up @@ -568,7 +570,7 @@ func (s *svc) InitiateFileUpload(ctx context.Context, req *provider.InitiateFile

if protocol == "webdav" {
// TODO(ishank011): pass this through the datagateway service
// for now, we just expose the file server to the user
// For now, we just expose the file server to the user
ep, opaque, err := s.webdavRefTransferEndpoint(ctx, statRes.Info.Target)
if err != nil {
return &gateway.InitiateFileUploadResponse{
Expand Down Expand Up @@ -630,7 +632,7 @@ func (s *svc) InitiateFileUpload(ctx context.Context, req *provider.InitiateFile

if protocol == "webdav" {
// TODO(ishank011): pass this through the datagateway service
// for now, we just expose the file server to the user
// For now, we just expose the file server to the user
ep, opaque, err := s.webdavRefTransferEndpoint(ctx, statRes.Info.Target, shareChild)
if err != nil {
return &gateway.InitiateFileUploadResponse{
Expand Down Expand Up @@ -987,10 +989,10 @@ func (s *svc) Move(ctx context.Context, req *provider.MoveRequest) (*provider.Mo
}, nil
}

dp, st2 := s.getPath(ctx, req.Destination)
if st2.Code != rpc.Code_CODE_OK && st2.Code != rpc.Code_CODE_NOT_FOUND {
dp, st := s.getPath(ctx, req.Destination)
if st.Code != rpc.Code_CODE_OK && st.Code != rpc.Code_CODE_NOT_FOUND {
return &provider.MoveResponse{
Status: st2,
Status: st,
}, nil
}

Expand Down Expand Up @@ -1083,6 +1085,15 @@ func (s *svc) move(ctx context.Context, req *provider.MoveRequest) (*provider.Mo
Status: status.NewStatusFromErrType(ctx, "move dst="+req.Destination.String(), err),
}, nil
}

// if providers are not the same we do not implement cross storage move yet.
if len(srcList) != 1 || len(dstList) != 1 {
res := &provider.MoveResponse{
Status: status.NewUnimplemented(ctx, nil, "gateway: cross storage copy not yet implemented"),
}
return res, nil
}

srcP, dstP := srcList[0], dstList[0]

// if providers are not the same we do not implement cross storage copy yet.
Expand Down Expand Up @@ -1247,6 +1258,12 @@ func (s *svc) stat(ctx context.Context, req *provider.StatRequest) (*provider.St
return c.Stat(ctx, req)
}

return s.statAcrossProviders(ctx, req, providers)
}

func (s *svc) statAcrossProviders(ctx context.Context, req *provider.StatRequest, providers []*registry.ProviderInfo) (*provider.StatResponse, error) {
log := appctx.GetLogger(ctx)

infoFromProviders := make([]*provider.ResourceInfo, len(providers))
errors := make([]error, len(providers))
var wg sync.WaitGroup
Expand All @@ -1260,9 +1277,8 @@ func (s *svc) stat(ctx context.Context, req *provider.StatRequest) (*provider.St
var totalSize uint64
for i := range providers {
if errors[i] != nil {
return &provider.StatResponse{
Status: status.NewStatusFromErrType(ctx, "stat ref: "+req.Ref.String(), errors[i]),
}, nil
log.Warn().Msgf("statting on provider %s returned err %+v", providers[i].ProviderPath, errors[i])
continue
}
if infoFromProviders[i] != nil {
totalSize += infoFromProviders[i].Size
Expand All @@ -1278,7 +1294,7 @@ func (s *svc) stat(ctx context.Context, req *provider.StatRequest) (*provider.St
OpaqueId: uuid.New().String(),
},
Type: provider.ResourceType_RESOURCE_TYPE_CONTAINER,
Path: resPath,
Path: req.Ref.GetPath(),
Size: totalSize,
},
}, nil
Expand All @@ -1299,7 +1315,7 @@ func (s *svc) statOnProvider(ctx context.Context, req *provider.StatRequest, res
}
r, err := c.Stat(ctx, &provider.StatRequest{Ref: &provider.Reference{Path: newPath}})
if err != nil {
*e = errors.Wrap(err, "gateway: error calling ListContainer")
*e = errors.Wrap(err, fmt.Sprintf("gateway: error calling Stat %s on %+v", newPath, p))
return
}
if res == nil {
Expand Down Expand Up @@ -1588,51 +1604,55 @@ func (s *svc) listSharesFolder(ctx context.Context) (*provider.ListContainerResp
}

func (s *svc) listContainer(ctx context.Context, req *provider.ListContainerRequest) (*provider.ListContainerResponse, error) {
log := appctx.GetLogger(ctx)
providers, err := s.findProviders(ctx, req.Ref)
if err != nil {
return &provider.ListContainerResponse{
Status: status.NewStatusFromErrType(ctx, "listContainer ref: "+req.Ref.String(), err),
}, nil
}

resPath := path.Clean(req.Ref.GetPath())
infoFromProviders := make([][]*provider.ResourceInfo, len(providers))
errors := make([]error, len(providers))
indirects := make([]bool, len(providers))
var wg sync.WaitGroup

for i, p := range providers {
wg.Add(1)
go s.listContainerOnProvider(ctx, req, &infoFromProviders[i], p, &errors[i], &wg)
go s.listContainerOnProvider(ctx, req, &infoFromProviders[i], p, &indirects[i], &errors[i], &wg)
}
wg.Wait()

infos := []*provider.ResourceInfo{}
indirects := make(map[string][]*provider.ResourceInfo)
nestedInfos := make(map[string][]*provider.ResourceInfo)
for i := range providers {
if errors[i] != nil {
return &provider.ListContainerResponse{
Status: status.NewStatusFromErrType(ctx, "listContainer ref: "+req.Ref.String(), errors[i]),
}, nil
// return if there's only one mount, else skip this one
if len(providers) == 1 {
return &provider.ListContainerResponse{
Status: status.NewStatusFromErrType(ctx, "listContainer ref: "+req.Ref.String(), errors[i]),
}, nil
}
log.Warn().Msgf("listing container on provider %s returned err %+v", providers[i].ProviderPath, errors[i])
continue
}
for _, inf := range infoFromProviders[i] {
if parent := path.Dir(inf.Path); resPath != "" && resPath != parent {
parts := strings.Split(strings.TrimPrefix(inf.Path, resPath), "/")
p := path.Join(resPath, parts[1])
indirects[p] = append(indirects[p], inf)
if indirects[i] {
p := inf.Path
nestedInfos[p] = append(nestedInfos[p], inf)
} else {
infos = append(infos, inf)
}
}
}

for k, v := range indirects {
for k := range nestedInfos {
inf := &provider.ResourceInfo{
Id: &provider.ResourceId{
StorageId: "/",
OpaqueId: uuid.New().String(),
},
Type: provider.ResourceType_RESOURCE_TYPE_CONTAINER,
Etag: etag.GenerateEtagFromResources(nil, v),
Path: k,
Size: 0,
}
Expand All @@ -1645,7 +1665,7 @@ func (s *svc) listContainer(ctx context.Context, req *provider.ListContainerRequ
}, nil
}

func (s *svc) listContainerOnProvider(ctx context.Context, req *provider.ListContainerRequest, res *[]*provider.ResourceInfo, p *registry.ProviderInfo, e *error, wg *sync.WaitGroup) {
func (s *svc) listContainerOnProvider(ctx context.Context, req *provider.ListContainerRequest, res *[]*provider.ResourceInfo, p *registry.ProviderInfo, ind *bool, e *error, wg *sync.WaitGroup) {
defer wg.Done()
c, err := s.getStorageProviderClient(ctx, p)
if err != nil {
Expand All @@ -1654,11 +1674,31 @@ func (s *svc) listContainerOnProvider(ctx context.Context, req *provider.ListCon
}

resPath := path.Clean(req.Ref.GetPath())
newPath := req.Ref.GetPath()
if resPath != "" && !strings.HasPrefix(resPath, p.ProviderPath) {
newPath = p.ProviderPath
// The path which we're supposed to list encompasses this provider
// so just return the first child and mark it as indirect
rel, err := filepath.Rel(resPath, p.ProviderPath)
if err != nil {
*e = err
return
}
parts := strings.Split(rel, "/")
p := path.Join(resPath, parts[0])
*ind = true
*res = []*provider.ResourceInfo{
{
Id: &provider.ResourceId{
StorageId: "/",
OpaqueId: uuid.New().String(),
},
Type: provider.ResourceType_RESOURCE_TYPE_CONTAINER,
Path: p,
Size: 0,
},
}
return
}
r, err := c.ListContainer(ctx, &provider.ListContainerRequest{Ref: &provider.Reference{Path: newPath}})
r, err := c.ListContainer(ctx, req)
if err != nil {
*e = errors.Wrap(err, "gateway: error calling ListContainer")
return
Expand Down
Loading

0 comments on commit 30db43b

Please sign in to comment.