Skip to content

Commit

Permalink
Datatx createtransfershare (#1725)
Browse files Browse the repository at this point in the history
Co-authored-by: Ishank Arora <[email protected]>
  • Loading branch information
redblom and ishank011 authored May 25, 2021
1 parent 597e362 commit b1d57b9
Show file tree
Hide file tree
Showing 12 changed files with 269 additions and 57 deletions.
5 changes: 5 additions & 0 deletions changelog/unreleased/datatx-createtransfer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Enhancement: Create transfer type share

`transfer-create` creates a share of type transfer.

https://github.com/cs3org/reva/pull/1725
4 changes: 2 additions & 2 deletions cmd/reva/ocm-share-list-received.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,13 @@ func ocmShareListReceivedCommand() *command {
t := table.NewWriter()
t.SetOutputMirror(os.Stdout)
t.AppendHeader(table.Row{"#", "Owner.Idp", "Owner.OpaqueId", "ResourceId", "Permissions", "Type",
"Grantee.Idp", "Grantee.OpaqueId", "Created", "Updated", "State"})
"Grantee.Idp", "Grantee.OpaqueId", "Created", "Updated", "State", "ShareType"})
for _, s := range shareRes.Shares {
t.AppendRows([]table.Row{
{s.Share.Id.OpaqueId, s.Share.Owner.Idp, s.Share.Owner.OpaqueId, s.Share.ResourceId.String(),
s.Share.Permissions.String(), s.Share.Grantee.Type.String(), s.Share.Grantee.GetUserId().Idp,
s.Share.Grantee.GetUserId().OpaqueId, time.Unix(int64(s.Share.Ctime.Seconds), 0),
time.Unix(int64(s.Share.Mtime.Seconds), 0), s.State.String()},
time.Unix(int64(s.Share.Mtime.Seconds), 0), s.State.String(), s.Share.ShareType.String()},
})
}
t.Render()
Expand Down
4 changes: 2 additions & 2 deletions cmd/reva/ocm-share-list.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,12 @@ func ocmShareListCommand() *command {
t := table.NewWriter()
t.SetOutputMirror(os.Stdout)
t.AppendHeader(table.Row{"#", "Owner.Idp", "Owner.OpaqueId", "ResourceId", "Permissions", "Type",
"Grantee.Idp", "Grantee.OpaqueId", "Created", "Updated"})
"ShareType", "Grantee.Idp", "Grantee.OpaqueId", "Created", "Updated"})

for _, s := range shareRes.Shares {
t.AppendRows([]table.Row{
{s.Id.OpaqueId, s.Owner.Idp, s.Owner.OpaqueId, s.ResourceId.String(), s.Permissions.String(),
s.Grantee.Type.String(), s.Grantee.GetUserId().Idp, s.Grantee.GetUserId().OpaqueId,
s.Grantee.Type.String(), s.ShareType.String(), s.Grantee.GetUserId().Idp, s.Grantee.GetUserId().OpaqueId,
time.Unix(int64(s.Ctime.Seconds), 0), time.Unix(int64(s.Mtime.Seconds), 0)},
})
}
Expand Down
129 changes: 120 additions & 9 deletions cmd/reva/transfer-create.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,42 +19,153 @@
package main

import (
"errors"
"io"
"os"
"strconv"
"strings"
"time"

userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
invitepb "github.com/cs3org/go-cs3apis/cs3/ocm/invite/v1beta1"
ocmprovider "github.com/cs3org/go-cs3apis/cs3/ocm/provider/v1beta1"
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
tx "github.com/cs3org/go-cs3apis/cs3/tx/v1beta1"
ocm "github.com/cs3org/go-cs3apis/cs3/sharing/ocm/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
types "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
"github.com/jedib0t/go-pretty/table"
"github.com/pkg/errors"
)

func transferCreateCommand() *command {
cmd := newCommand("transfer-create")
cmd.Description = func() string { return "create transfer between 2 remotes" }
cmd.Usage = func() string { return "Usage: transfer-create [-flags]" }
cmd.Description = func() string { return "create transfer between 2 sites" }
cmd.Usage = func() string { return "Usage: transfer-create [-flags] <path>" }
grantee := cmd.String("grantee", "", "the grantee, receiver of the transfer")
granteeType := cmd.String("granteeType", "user", "the grantee type, one of: user, group")
idp := cmd.String("idp", "", "the idp of the grantee, default to same idp as the user triggering the action")

cmd.Action = func(w ...io.Writer) error {
// validate flags
if cmd.NArg() < 1 {
return errors.New("Invalid arguments: " + cmd.Usage())
}

if *grantee == "" {
return errors.New("Grantee cannot be empty: use -grantee flag\n" + cmd.Usage())
}
if *idp == "" {
return errors.New("Idp cannot be empty: use -idp flag\n" + cmd.Usage())
}

// the resource to transfer; the path
fn := cmd.Args()[0]

ctx := getAuthContext()
client, err := getClient()
if err != nil {
return err
}

transferRequest := &tx.CreateTransferRequest{}
// check if invitation has been accepted
acceptedUserRes, err := client.GetAcceptedUser(ctx, &invitepb.GetAcceptedUserRequest{
RemoteUserId: &userpb.UserId{OpaqueId: *grantee, Idp: *idp},
})
if err != nil {
return err
}
if acceptedUserRes.Status.Code != rpc.Code_CODE_OK {
return formatError(acceptedUserRes.Status)
}

transferResponse, err := client.CreateTransfer(ctx, transferRequest)
// verify resource stats
statReq := &provider.StatRequest{
Ref: &provider.Reference{
Spec: &provider.Reference_Path{
Path: fn,
},
},
}
statRes, err := client.Stat(ctx, statReq)
if err != nil {
return err
}
if transferResponse.Status.Code != rpc.Code_CODE_OK {
return formatError(transferResponse.Status)
if statRes.Status.Code != rpc.Code_CODE_OK {
return formatError(statRes.Status)
}

providerInfoResp, err := client.GetInfoByDomain(ctx, &ocmprovider.GetInfoByDomainRequest{
Domain: *idp,
})
if err != nil {
return err
}

resourcePermissions, pint, err := getOCMSharePerm(editorPermission)
if err != nil {
return err
}

gt := provider.GranteeType_GRANTEE_TYPE_USER
if strings.ToLower(*granteeType) == "group" {
gt = provider.GranteeType_GRANTEE_TYPE_GROUP
}

createShareReq := &ocm.CreateOCMShareRequest{
Opaque: &types.Opaque{
Map: map[string]*types.OpaqueEntry{
"permissions": &types.OpaqueEntry{
Decoder: "plain",
Value: []byte(strconv.Itoa(pint)),
},
"name": &types.OpaqueEntry{
Decoder: "plain",
Value: []byte(statRes.Info.Path),
},
"protocol": &types.OpaqueEntry{
Decoder: "plain",
Value: []byte("datatx"),
},
},
},
ResourceId: statRes.Info.Id,
Grant: &ocm.ShareGrant{
Grantee: &provider.Grantee{
Type: gt,
Id: &provider.Grantee_UserId{
UserId: &userpb.UserId{
Idp: *idp,
OpaqueId: *grantee,
},
},
},
Permissions: resourcePermissions,
},
RecipientMeshProvider: providerInfoResp.ProviderInfo,
}

createShareResponse, err := client.CreateOCMShare(ctx, createShareReq)
if err != nil {
return err
}
if createShareResponse.Status.Code != rpc.Code_CODE_OK {
if createShareResponse.Status.Code == rpc.Code_CODE_NOT_FOUND {
return formatError(statRes.Status)
}
return err
}

t := table.NewWriter()
t.SetOutputMirror(os.Stdout)
t.AppendHeader(table.Row{"#", "Owner.Idp", "Owner.OpaqueId", "ResourceId", "Permissions", "Type", "Grantee.Idp", "Grantee.OpaqueId", "ShareType", "Created", "Updated"})

s := createShareResponse.Share
t.AppendRows([]table.Row{
{s.Id.OpaqueId, s.Owner.Idp, s.Owner.OpaqueId, s.ResourceId.String(), s.Permissions.String(),
s.Grantee.Type.String(), s.Grantee.GetUserId().Idp, s.Grantee.GetUserId().OpaqueId, s.ShareType.String(),
time.Unix(int64(s.Ctime.Seconds), 0), time.Unix(int64(s.Mtime.Seconds), 0)},
})
t.Render()
return nil
}

return cmd
}
13 changes: 9 additions & 4 deletions internal/grpc/services/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,11 @@ type config struct {
TransferExpires int64 `mapstructure:"transfer_expires"`
TokenManager string `mapstructure:"token_manager"`
// ShareFolder is the location where to create shares in the recipient's storage provider.
ShareFolder string `mapstructure:"share_folder"`
HomeMapping string `mapstructure:"home_mapping"`
TokenManagers map[string]map[string]interface{} `mapstructure:"token_managers"`
EtagCacheTTL int `mapstructure:"etag_cache_ttl"`
ShareFolder string `mapstructure:"share_folder"`
DataTransfersFolder string `mapstructure:"data_transfers_folder"`
HomeMapping string `mapstructure:"home_mapping"`
TokenManagers map[string]map[string]interface{} `mapstructure:"token_managers"`
EtagCacheTTL int `mapstructure:"etag_cache_ttl"`
}

// sets defaults
Expand All @@ -77,6 +78,10 @@ func (c *config) init() {

c.ShareFolder = strings.Trim(c.ShareFolder, "/")

if c.DataTransfersFolder == "" {
c.DataTransfersFolder = "Data-Transfers"
}

if c.TokenManager == "" {
c.TokenManager = "jwt"
}
Expand Down
44 changes: 33 additions & 11 deletions internal/grpc/services/gateway/ocmshareprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func (s *svc) UpdateReceivedOCMShare(ctx context.Context, req *ocm.UpdateReceive
panic("gateway: error updating a received share: the share is nil")
}

createRefStatus, err := s.createWebdavReference(ctx, share.Share)
createRefStatus, err := s.createOCMReference(ctx, share.Share)
return &ocm.UpdateReceivedOCMShareResponse{
Status: createRefStatus,
}, err
Expand Down Expand Up @@ -291,7 +291,7 @@ func (s *svc) GetReceivedOCMShare(ctx context.Context, req *ocm.GetReceivedOCMSh
return res, nil
}

func (s *svc) createWebdavReference(ctx context.Context, share *ocm.Share) (*rpc.Status, error) {
func (s *svc) createOCMReference(ctx context.Context, share *ocm.Share) (*rpc.Status, error) {

log := appctx.GetLogger(ctx)

Expand All @@ -314,17 +314,39 @@ func (s *svc) createWebdavReference(ctx context.Context, share *ocm.Share) (*rpc
return status.NewInternal(ctx, err, "error updating received share"), nil
}

// reference path is the home path + some name on the corresponding
// mesh provider (/home/MyShares/x)
// It is the responsibility of the gateway to resolve these references and merge the response back
// from the main request.
refPath := path.Join(homeRes.Path, s.c.ShareFolder, path.Base(share.Name))
log.Info().Msg("mount path will be:" + refPath)
var refPath, targetURI string
if share.ShareType == ocm.Share_SHARE_TYPE_TRANSFER {
createTransferDir, err := s.CreateContainer(ctx, &provider.CreateContainerRequest{
Ref: &provider.Reference{
Spec: &provider.Reference_Path{
Path: path.Join(homeRes.Path, s.c.DataTransfersFolder),
},
},
})
if err != nil {
return status.NewInternal(ctx, err, "error creating transfers directory"), nil
}
if createTransferDir.Status.Code != rpc.Code_CODE_OK && createTransferDir.Status.Code != rpc.Code_CODE_ALREADY_EXISTS {
err := status.NewErrorFromCode(createTransferDir.Status.GetCode(), "gateway")
return status.NewInternal(ctx, err, "error creating transfers directory"), nil
}

createRefReq := &provider.CreateReferenceRequest{
Path: refPath,
refPath = path.Join(homeRes.Path, s.c.DataTransfersFolder, path.Base(share.Name))
targetURI = fmt.Sprintf("datatx://%s@%s?name=%s", token, share.Creator.Idp, share.Name)
} else {
// reference path is the home path + some name on the corresponding
// mesh provider (/home/MyShares/x)
// It is the responsibility of the gateway to resolve these references and merge the response back
// from the main request.
refPath = path.Join(homeRes.Path, s.c.ShareFolder, path.Base(share.Name))
// webdav is the scheme, token@host the opaque part and the share name the query of the URL.
TargetUri: fmt.Sprintf("webdav://%s@%s?name=%s", token, share.Creator.Idp, share.Name),
targetURI = fmt.Sprintf("webdav://%s@%s?name=%s", token, share.Creator.Idp, share.Name)
}

log.Info().Msg("mount path will be:" + refPath)
createRefReq := &provider.CreateReferenceRequest{
Path: refPath,
TargetUri: targetURI,
}

c, err := s.findByPath(ctx, refPath)
Expand Down
10 changes: 9 additions & 1 deletion internal/grpc/services/ocmcore/ocmcore.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,15 @@ func (s *service) CreateOCMCoreShare(ctx context.Context, req *ocmcore.CreateOCM
},
}

share, err := s.sm.Share(ctx, resource, grant, req.Name, nil, "", req.Owner, token, ocm.Share_SHARE_TYPE_REGULAR)
var shareType ocm.Share_ShareType
switch req.Protocol.Name {
case "datatx":
shareType = ocm.Share_SHARE_TYPE_TRANSFER
default:
shareType = ocm.Share_SHARE_TYPE_REGULAR
}

share, err := s.sm.Share(ctx, resource, grant, req.Name, nil, "", req.Owner, token, shareType)
if err != nil {
return &ocmcore.CreateOCMCoreShareResponse{
Status: status.NewInternal(ctx, err, "error creating ocm core share"),
Expand Down
19 changes: 18 additions & 1 deletion internal/grpc/services/ocmshareprovider/ocmshareprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,24 @@ func (s *service) CreateOCMShare(ctx context.Context, req *ocm.CreateOCMShareReq
}, nil
}

share, err := s.sm.Share(ctx, req.ResourceId, req.Grant, name, req.RecipientMeshProvider, permissions, nil, "", ocm.Share_SHARE_TYPE_REGULAR)
// discover share type
sharetype := ocm.Share_SHARE_TYPE_REGULAR
protocol, ok := req.Opaque.Map["protocol"]
if ok {
switch protocol.Decoder {
case "plain":
if string(protocol.Value) == "datatx" {
sharetype = ocm.Share_SHARE_TYPE_TRANSFER
}
default:
err := errors.New("protocol decoder not recognized")
return &ocm.CreateOCMShareResponse{
Status: status.NewInternal(ctx, err, "error creating share"),
}, nil
}
}

share, err := s.sm.Share(ctx, req.ResourceId, req.Grant, name, req.RecipientMeshProvider, permissions, nil, "", sharetype)
if err != nil {
return &ocm.CreateOCMShareResponse{
Status: status.NewInternal(ctx, err, "error creating share"),
Expand Down
2 changes: 1 addition & 1 deletion internal/grpc/services/storageprovider/storageprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ func (s *service) CreateContainer(ctx context.Context, req *provider.CreateConta
case errtypes.IsNotFound:
st = status.NewNotFound(ctx, "path not found when creating container")
case errtypes.AlreadyExists:
st = status.NewInternal(ctx, err, "error: container already exists")
st = status.NewAlreadyExists(ctx, err, "container already exists")
case errtypes.PermissionDenied:
st = status.NewPermissionDenied(ctx, err, "permission denied")
default:
Expand Down
44 changes: 32 additions & 12 deletions pkg/ocm/share/manager/json/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,20 +269,40 @@ func (m *mgr) Share(ctx context.Context, md *provider.ResourceId, g *ocm.ShareGr
}

if isOwnersMeshProvider {
token, ok := tokenpkg.ContextGetToken(ctx)
if !ok {
return nil, errors.New("Could not get token from context")
}
var protocol []byte
if st == ocm.Share_SHARE_TYPE_TRANSFER {
protocol, err = json.Marshal(
map[string]interface{}{
"name": "datatx",
"options": map[string]string{
"permissions": pm,
"token": token,
},
},
)
if err != nil {
err = errors.Wrap(err, "error marshalling protocol data")
return nil, err
}

// Call the remote provider's CreateOCMCoreShare method
protocol, err := json.Marshal(
map[string]interface{}{
"name": "webdav",
"options": map[string]string{
"permissions": pm,
"token": tokenpkg.ContextMustGetToken(ctx),
} else {
protocol, err = json.Marshal(
map[string]interface{}{
"name": "webdav",
"options": map[string]string{
"permissions": pm,
"token": tokenpkg.ContextMustGetToken(ctx),
},
},
},
)
if err != nil {
err = errors.Wrap(err, "error marshalling protocol data")
return nil, err
)
if err != nil {
err = errors.Wrap(err, "error marshalling protocol data")
return nil, err
}
}

requestBody := url.Values{
Expand Down
Loading

0 comments on commit b1d57b9

Please sign in to comment.