From ad0a2ec7d5981b3de1bb1ddc71054c7b90aeab8d Mon Sep 17 00:00:00 2001 From: Antoon P Date: Thu, 24 Nov 2022 12:06:07 +0100 Subject: [PATCH 1/9] Transfer method depends on implementation or configuration. --- pkg/datatx/datatx.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/datatx/datatx.go b/pkg/datatx/datatx.go index 2671dd09bb..07419c6bdc 100644 --- a/pkg/datatx/datatx.go +++ b/pkg/datatx/datatx.go @@ -26,8 +26,7 @@ import ( // Manager the interface any transfer driver should implement. type Manager interface { - // StartTransfer initiates a transfer job and returns a TxInfo object including a unique transfer id, and error if any. - StartTransfer(ctx context.Context, srcRemote string, srcPath string, srcToken string, destRemote string, destPath string, destToken string) (*datatx.TxInfo, error) + Transfer(ctx context.Context, srcTargetURI string, dstTargetURI string) (*datatx.TxInfo, error) // GetTransferStatus returns a TxInfo object including the current status, and error if any. GetTransferStatus(ctx context.Context, transferID string) (*datatx.TxInfo, error) // CancelTransfer cancels the transfer and returns a TxInfo object and error if any. From 1f65bc08d5d0eda60f256ece83132f52cf680d61 Mon Sep 17 00:00:00 2001 From: Antoon P Date: Thu, 24 Nov 2022 12:08:39 +0100 Subject: [PATCH 2/9] Support rclone tpc push. --- pkg/datatx/manager/rclone/rclone.go | 69 ++++++++++++++++++++++++++--- 1 file changed, 64 insertions(+), 5 deletions(-) diff --git a/pkg/datatx/manager/rclone/rclone.go b/pkg/datatx/manager/rclone/rclone.go index b6eb1f4035..ec7b0f6123 100644 --- a/pkg/datatx/manager/rclone/rclone.go +++ b/pkg/datatx/manager/rclone/rclone.go @@ -37,6 +37,7 @@ import ( "github.com/cs3org/reva/pkg/appctx" txdriver "github.com/cs3org/reva/pkg/datatx" registry "github.com/cs3org/reva/pkg/datatx/manager/registry" + "github.com/cs3org/reva/pkg/errtypes" "github.com/cs3org/reva/pkg/rhttp" "github.com/google/uuid" "github.com/mitchellh/mapstructure" @@ -64,6 +65,7 @@ type config struct { Endpoint string `mapstructure:"endpoint"` AuthUser string `mapstructure:"auth_user"` // rclone basicauth user AuthPass string `mapstructure:"auth_pass"` // rclone basicauth pass + AuthHeader string `mapstructure:"auth_header"` File string `mapstructure:"file"` JobStatusCheckInterval int `mapstructure:"job_status_check_interval"` JobTimeout int `mapstructure:"job_timeout"` @@ -118,7 +120,14 @@ var txEndStatuses = map[string]int32{ "STATUS_TRANSFER_EXPIRED": 10, } -// New returns a new rclone driver. +type endpoint struct { + filePath string + endpoint string + endpointScheme string + token string +} + +// New returns a new rclone driver func New(m map[string]interface{}) (txdriver.Manager, error) { c, err := parseConfig(m) if err != nil { @@ -207,9 +216,31 @@ func (m *transferModel) saveTransfer(e error) error { return e } -// StartTransfer initiates a transfer job and returns a TxInfo object that includes a unique transfer id. -func (driver *rclone) StartTransfer(ctx context.Context, srcRemote string, srcPath string, srcToken string, destRemote string, destPath string, destToken string) (*datatx.TxInfo, error) { - return driver.startJob(ctx, "", srcRemote, srcPath, srcToken, destRemote, destPath, destToken) +// Transfer initiates a transfer job and returns a TxInfo object that includes a unique transfer id. +func (driver *rclone) Transfer(ctx context.Context, srcTargetURI string, dstTargetURI string) (*datatx.TxInfo, error) { + logger := appctx.GetLogger(ctx) + + srcEp, err := driver.extractEndpointInfo(ctx, srcTargetURI) + if err != nil { + return nil, err + } + srcRemote := fmt.Sprintf("%s://%s", srcEp.endpointScheme, srcEp.endpoint) + srcPath := srcEp.filePath + srcToken := srcEp.token + + destEp, err := driver.extractEndpointInfo(ctx, dstTargetURI) + if err != nil { + return nil, err + } + dstPath := destEp.filePath + dstToken := destEp.token + // we always set the userinfo part of the destination url for rclone tpc push support + dstRemote := fmt.Sprintf("%s://%s@%s", destEp.endpointScheme, dstToken, destEp.endpoint) + + logger.Debug().Msgf("destination target URI: %v", dstTargetURI) + logger.Debug().Msgf("destination remote: %v", dstRemote) + + return driver.startJob(ctx, "", srcRemote, srcPath, srcToken, dstRemote, dstPath, dstToken) } // startJob starts a transfer job. Retries a previous job if transferID is specified. @@ -281,7 +312,12 @@ func (driver *rclone) startJob(ctx context.Context, transferID string, srcRemote // DstToken string `json:"destToken"` Async bool `json:"_async"` } - srcFs := fmt.Sprintf(":webdav,headers=\"x-access-token,%v\",url=\"%v\":%v", srcToken, srcRemote, srcPath) + // bearer is the default authentication scheme for reva + srcAuthHeader := fmt.Sprintf("bearer_token=\"%v\"", srcToken) + if driver.config.AuthHeader == "x-access-token" { + srcAuthHeader = fmt.Sprintf("headers=\"x-access-token,%v\"", srcToken) + } + srcFs := fmt.Sprintf(":webdav,%v,url=\"%v\":%v", srcAuthHeader, srcRemote, srcPath) dstFs := fmt.Sprintf(":webdav,headers=\"x-access-token,%v\",url=\"%v\":%v", destToken, destRemote, destPath) rcloneReq := &rcloneAsyncReqJSON{ SrcFs: srcFs, @@ -829,3 +865,26 @@ func (driver *rclone) remotePathIsFolder(remote string, remotePath string, remot // in all other cases the remote path is a directory return true, nil } + +func (driver *rclone) extractEndpointInfo(ctx context.Context, targetURL string) (*endpoint, error) { + if targetURL == "" { + return nil, errtypes.BadRequest("datatx service: ref target is an empty uri") + } + + uri, err := url.Parse(targetURL) + if err != nil { + return nil, errors.Wrap(err, "datatx service: error parsing target uri: "+targetURL) + } + + m, err := url.ParseQuery(uri.RawQuery) + if err != nil { + return nil, errors.Wrap(err, "datatx service: error parsing target resource name") + } + + return &endpoint{ + filePath: m["name"][0], + endpoint: uri.Host + uri.Path, + endpointScheme: uri.Scheme, + token: uri.User.String(), + }, nil +} From 1e7731dcc07fd52b039577cdaab5f13d9d5f6d4a Mon Sep 17 00:00:00 2001 From: Antoon P Date: Thu, 24 Nov 2022 12:11:39 +0100 Subject: [PATCH 3/9] Support tpc push with different accounts at src and dest. --- .../http/services/owncloud/ocdav/ocdav.go | 7 +++- internal/http/services/owncloud/ocdav/tpc.go | 37 +++++++++++++++++-- 2 files changed, 40 insertions(+), 4 deletions(-) diff --git a/internal/http/services/owncloud/ocdav/ocdav.go b/internal/http/services/owncloud/ocdav/ocdav.go index 8a45c7ac9a..5a251ff634 100644 --- a/internal/http/services/owncloud/ocdav/ocdav.go +++ b/internal/http/services/owncloud/ocdav/ocdav.go @@ -100,7 +100,12 @@ type Config struct { Timeout int64 `mapstructure:"timeout"` Insecure bool `mapstructure:"insecure" docs:"false;Whether to skip certificate checks when sending requests."` // If true, HTTP COPY will expect the HTTP-TPC (third-party copy) headers - EnableHTTPTpc bool `mapstructure:"enable_http_tpc"` + EnableHTTPTpc bool `mapstructure:"enable_http_tpc"` + // The authentication scheme to use for the tpc push call when userinfo part is specified in the Destination header uri. Default value is 'bearer'. + // Possible values: + // "bearer" results in header: Authorization: Bearer ...token... + // "x-access-token": results in header: X-Access-Token: ...token... + HTTPTpcPushAuthHeader string `mapstructure:"http_tpc_push_auth_header"` PublicURL string `mapstructure:"public_url"` FavoriteStorageDriver string `mapstructure:"favorite_storage_driver"` FavoriteStorageDrivers map[string]map[string]interface{} `mapstructure:"favorite_storage_drivers"` diff --git a/internal/http/services/owncloud/ocdav/tpc.go b/internal/http/services/owncloud/ocdav/tpc.go index a1a9aaae5e..90ee3220aa 100644 --- a/internal/http/services/owncloud/ocdav/tpc.go +++ b/internal/http/services/owncloud/ocdav/tpc.go @@ -23,6 +23,7 @@ import ( "fmt" "io" "net/http" + "net/url" "path" "strconv" "strings" @@ -392,9 +393,28 @@ func (s *svc) performHTTPPush(ctx context.Context, client gateway.GatewayAPIClie return err } - // add authentication header and content length - bearerHeader := r.Header.Get(HeaderTransferAuth) - req.Header.Add("Authorization", bearerHeader) + // Check if there is userinfo to be found in the destination URI + // This should be the token to use in the HTTP push call + userInfo, err := s.extractUserInfo(ctx, dst) + if err != nil { + sublog.Debug().Msgf("tpc push: error: %v", err) + } + if len(userInfo) > 0 { + sublog.Debug().Msg("tpc push: userinfo part found in destination url, using userinfo as token for the HTTP push request authorization header") + if s.c.HTTPTpcPushAuthHeader == "x-access-token" { + req.Header.Add(s.c.HTTPTpcPushAuthHeader, userInfo) + sublog.Debug().Msgf("tpc push: using authentication scheme: %v", s.c.HTTPTpcPushAuthHeader) + } else { // Bearer is the default + req.Header.Add("Authorization", "Bearer "+userInfo) + sublog.Debug().Msg("tpc push: using authentication scheme: bearer") + } + } else { + sublog.Debug().Msg("tpc push: no userinfo part found in destination url, using token from the COPY request authorization header") + // add authorization header; single token tpc + bearerHeader := r.Header.Get(HeaderTransferAuth) + req.Header.Add("Authorization", bearerHeader) + } + // add content length req.ContentLength = int64(srcInfo.GetSize()) // do Upload @@ -412,3 +432,14 @@ func (s *svc) performHTTPPush(ctx context.Context, client gateway.GatewayAPIClie return nil } + +// Extracts and returns the userinfo part of the specified target URL (https://[userinfo]@www.example.com:123/...path). +// Returns an empty string if no userinfo part is found. +func (s *svc) extractUserInfo(ctx context.Context, targetURL string) (string, error) { + parsedURL, err := url.Parse(targetURL) + if err != nil { + return "", errtypes.BadRequest("tpc: error extracting userinfo part - error parsing url: " + targetURL) + } + + return parsedURL.User.String(), nil +} From eee2f12e2d97796230e73643c09e3b02991baea4 Mon Sep 17 00:00:00 2001 From: Antoon P Date: Thu, 24 Nov 2022 12:15:01 +0100 Subject: [PATCH 4/9] Simplify; don't touch the target URIs. --- internal/grpc/services/datatx/datatx.go | 52 +------------------------ 1 file changed, 2 insertions(+), 50 deletions(-) diff --git a/internal/grpc/services/datatx/datatx.go b/internal/grpc/services/datatx/datatx.go index a574eb06d2..f30dca89b8 100644 --- a/internal/grpc/services/datatx/datatx.go +++ b/internal/grpc/services/datatx/datatx.go @@ -21,9 +21,7 @@ package datatx import ( "context" "encoding/json" - "fmt" "io" - "net/url" "os" "sync" @@ -77,13 +75,6 @@ type txShare struct { Opaque *types.Opaque `json:"opaque"` } -type webdavEndpoint struct { - filePath string - endpoint string - endpointScheme string - token string -} - func (c *config) init() { if c.TxDriver == "" { c.TxDriver = "rclone" @@ -156,23 +147,7 @@ func (s *service) UnprotectedEndpoints() []string { } func (s *service) PullTransfer(ctx context.Context, req *datatx.PullTransferRequest) (*datatx.PullTransferResponse, error) { - srcEp, err := s.extractEndpointInfo(ctx, req.SrcTargetUri) - if err != nil { - return nil, err - } - srcRemote := fmt.Sprintf("%s://%s", srcEp.endpointScheme, srcEp.endpoint) - srcPath := srcEp.filePath - srcToken := srcEp.token - - destEp, err := s.extractEndpointInfo(ctx, req.DestTargetUri) - if err != nil { - return nil, err - } - dstRemote := fmt.Sprintf("%s://%s", destEp.endpointScheme, destEp.endpoint) - dstPath := destEp.filePath - dstToken := destEp.token - - txInfo, startTransferErr := s.txManager.StartTransfer(ctx, srcRemote, srcPath, srcToken, dstRemote, dstPath, dstToken) + txInfo, startTransferErr := s.txManager.Transfer(ctx, req.SrcTargetUri, req.DestTargetUri) // we always save the transfer regardless of start transfer outcome // only then, if starting fails, can we try to restart it @@ -205,7 +180,7 @@ func (s *service) PullTransfer(ctx context.Context, req *datatx.PullTransferRequ return &datatx.PullTransferResponse{ Status: status.NewOK(ctx), TxInfo: txInfo, - }, err + }, nil } func (s *service) GetTransferStatus(ctx context.Context, req *datatx.GetTransferStatusRequest) (*datatx.GetTransferStatusResponse, error) { @@ -307,29 +282,6 @@ func (s *service) RetryTransfer(ctx context.Context, req *datatx.RetryTransferRe }, nil } -func (s *service) extractEndpointInfo(ctx context.Context, targetURL string) (*webdavEndpoint, error) { - if targetURL == "" { - return nil, errtypes.BadRequest("datatx service: ref target is an empty uri") - } - - uri, err := url.Parse(targetURL) - if err != nil { - return nil, errors.Wrap(err, "datatx service: error parsing target uri: "+targetURL) - } - - m, err := url.ParseQuery(uri.RawQuery) - if err != nil { - return nil, errors.Wrap(err, "datatx service: error parsing target resource name") - } - - return &webdavEndpoint{ - filePath: m["name"][0], - endpoint: uri.Host + uri.Path, - endpointScheme: uri.Scheme, - token: uri.User.String(), - }, nil -} - func loadOrCreate(file string) (*txShareModel, error) { _, err := os.Stat(file) if os.IsNotExist(err) { From 0142b350390a5b4d6a87a5e5f645e8d067a7b3a0 Mon Sep 17 00:00:00 2001 From: Antoon P Date: Thu, 24 Nov 2022 12:16:10 +0100 Subject: [PATCH 5/9] Support rclone tpc push. --- examples/datatx/datatx.toml | 39 ++++++++++++++++++++++++++----------- 1 file changed, 28 insertions(+), 11 deletions(-) diff --git a/examples/datatx/datatx.toml b/examples/datatx/datatx.toml index cb50385005..f1271b3483 100644 --- a/examples/datatx/datatx.toml +++ b/examples/datatx/datatx.toml @@ -1,22 +1,39 @@ -# example data transfer service configuration +# Example data transfer service configuration [grpc.services.datatx] -# rclone is the default data transfer driver +# Rclone is the default data transfer driver txdriver = "rclone" -# the shares,transfers db file (default: /var/tmp/reva/datatx-shares.json) +# The shares,transfers db file (default: /var/tmp/reva/datatx-shares.json) tx_shares_file = "" -# base folder of the data transfers (default: /home/DataTransfers) +# Base folder of the data transfers (default: /home/DataTransfers) data_transfers_folder = "" -# rclone data transfer driver +# Rclone data transfer driver [grpc.services.datatx.txdrivers.rclone] -# rclone endpoint +# Rclone endpoint endpoint = "http://..." -# basic auth is used +# Basic auth is used auth_user = "...rcloneuser" auth_pass = "...rcloneusersecret" -# the transfers(jobs) db file (default: /var/tmp/reva/datatx-transfers.json) +# The authentication scheme to use in the src and dest requests by rclone (follows the endpoints' authentication methods) +# Valid values: +# "bearer" (default) will result in rclone using request header: Authorization: "Bearer ...token..." +# "x-access-token" will result in rclone using request header: X-Access-Token: "...token..." +# If not set "bearer" is assumed +auth_header = "x-access-token" +# The transfers(jobs) db file (default: /var/tmp/reva/datatx-transfers.json) file = "" -# check status job interval in milliseconds +# Check status job interval in milliseconds job_status_check_interval = 2000 -# the job timeout in milliseconds (must be long enough for big transfers!) -job_timeout = 120000 \ No newline at end of file +# The job timeout in milliseconds (must be long enough for big transfers!) +job_timeout = 120000 + +[http.services.ocdav] +# Rclone supports third-party copy push; for that to work with reva enable this setting +enable_http_tpc = true +# The authentication scheme reva uses for the tpc push call (the call to Destination). +# Follows the destination endpoint authentication method. +# Valid values: +# "bearer" (default) will result in header: Authorization: "Bearer ...token..." +# "x-access-token" will result in header: X-Access-Token: "...token..." +# If not set "bearer" is assumed +http_tpc_push_auth_header = "x-access-token" From 3292cbc1d10000670210b829218e9cdbcdbb5775 Mon Sep 17 00:00:00 2001 From: Antoon P Date: Thu, 24 Nov 2022 14:50:44 +0100 Subject: [PATCH 6/9] Enhancement: rclone tpc push --- changelog/unreleased/rclone-tpc.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 changelog/unreleased/rclone-tpc.md diff --git a/changelog/unreleased/rclone-tpc.md b/changelog/unreleased/rclone-tpc.md new file mode 100644 index 0000000000..b6f7582f89 --- /dev/null +++ b/changelog/unreleased/rclone-tpc.md @@ -0,0 +1,5 @@ +Enhancement: implement rclone third-party copy push option + +This enhancement gives the option to use third-party copy push with rclone between two different user accounts. + +https://github.com/cs3org/reva/pull/3491 \ No newline at end of file From 7c08220e86891f6b2dca33d9109c41998484ebdb Mon Sep 17 00:00:00 2001 From: Antoon P Date: Thu, 24 Nov 2022 15:09:52 +0100 Subject: [PATCH 7/9] Fix comment --- pkg/datatx/manager/rclone/rclone.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/datatx/manager/rclone/rclone.go b/pkg/datatx/manager/rclone/rclone.go index ec7b0f6123..524ef8fef1 100644 --- a/pkg/datatx/manager/rclone/rclone.go +++ b/pkg/datatx/manager/rclone/rclone.go @@ -127,7 +127,7 @@ type endpoint struct { token string } -// New returns a new rclone driver +// New returns a new rclone driver. func New(m map[string]interface{}) (txdriver.Manager, error) { c, err := parseConfig(m) if err != nil { From 0081f94f51d7b6c1c5ceaf316c06e0d79b0b4593 Mon Sep 17 00:00:00 2001 From: Antoon P Date: Wed, 30 Nov 2022 11:39:21 +0100 Subject: [PATCH 8/9] Configurable auth scheme for dest fs --- pkg/datatx/manager/rclone/rclone.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/datatx/manager/rclone/rclone.go b/pkg/datatx/manager/rclone/rclone.go index 524ef8fef1..2bda4529bf 100644 --- a/pkg/datatx/manager/rclone/rclone.go +++ b/pkg/datatx/manager/rclone/rclone.go @@ -318,7 +318,11 @@ func (driver *rclone) startJob(ctx context.Context, transferID string, srcRemote srcAuthHeader = fmt.Sprintf("headers=\"x-access-token,%v\"", srcToken) } srcFs := fmt.Sprintf(":webdav,%v,url=\"%v\":%v", srcAuthHeader, srcRemote, srcPath) - dstFs := fmt.Sprintf(":webdav,headers=\"x-access-token,%v\",url=\"%v\":%v", destToken, destRemote, destPath) + destAuthHeader := fmt.Sprintf("bearer_token=\"%v\"", destToken) + if driver.config.AuthHeader == "x-access-token" { + destAuthHeader = fmt.Sprintf("headers=\"x-access-token,%v\"", destToken) + } + dstFs := fmt.Sprintf(":webdav,%v,url=\"%v\":%v", destAuthHeader, destRemote, destPath) rcloneReq := &rcloneAsyncReqJSON{ SrcFs: srcFs, DstFs: dstFs, From e27f9bda34a2e5c0e15b9e492a783cb38ed3ce5e Mon Sep 17 00:00:00 2001 From: Antoon P Date: Thu, 1 Dec 2022 10:00:34 +0100 Subject: [PATCH 9/9] Method name changed and documented --- internal/grpc/services/datatx/datatx.go | 2 +- pkg/datatx/datatx.go | 4 +++- pkg/datatx/manager/rclone/rclone.go | 5 +++-- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/internal/grpc/services/datatx/datatx.go b/internal/grpc/services/datatx/datatx.go index f30dca89b8..b574892664 100644 --- a/internal/grpc/services/datatx/datatx.go +++ b/internal/grpc/services/datatx/datatx.go @@ -147,7 +147,7 @@ func (s *service) UnprotectedEndpoints() []string { } func (s *service) PullTransfer(ctx context.Context, req *datatx.PullTransferRequest) (*datatx.PullTransferResponse, error) { - txInfo, startTransferErr := s.txManager.Transfer(ctx, req.SrcTargetUri, req.DestTargetUri) + txInfo, startTransferErr := s.txManager.CreateTransfer(ctx, req.SrcTargetUri, req.DestTargetUri) // we always save the transfer regardless of start transfer outcome // only then, if starting fails, can we try to restart it diff --git a/pkg/datatx/datatx.go b/pkg/datatx/datatx.go index 07419c6bdc..fd84418905 100644 --- a/pkg/datatx/datatx.go +++ b/pkg/datatx/datatx.go @@ -26,7 +26,9 @@ import ( // Manager the interface any transfer driver should implement. type Manager interface { - Transfer(ctx context.Context, srcTargetURI string, dstTargetURI string) (*datatx.TxInfo, error) + // CreateTransfer creates a transfer job and returns a TxInfo object that includes a unique transfer id. + // Specified target URIs are of form scheme://userinfo@host:port?name={path} + CreateTransfer(ctx context.Context, srcTargetURI string, dstTargetURI string) (*datatx.TxInfo, error) // GetTransferStatus returns a TxInfo object including the current status, and error if any. GetTransferStatus(ctx context.Context, transferID string) (*datatx.TxInfo, error) // CancelTransfer cancels the transfer and returns a TxInfo object and error if any. diff --git a/pkg/datatx/manager/rclone/rclone.go b/pkg/datatx/manager/rclone/rclone.go index 2bda4529bf..1b260980ab 100644 --- a/pkg/datatx/manager/rclone/rclone.go +++ b/pkg/datatx/manager/rclone/rclone.go @@ -216,8 +216,9 @@ func (m *transferModel) saveTransfer(e error) error { return e } -// Transfer initiates a transfer job and returns a TxInfo object that includes a unique transfer id. -func (driver *rclone) Transfer(ctx context.Context, srcTargetURI string, dstTargetURI string) (*datatx.TxInfo, error) { +// CreateTransfer creates a transfer job and returns a TxInfo object that includes a unique transfer id. +// Specified target URIs are of form scheme://userinfo@host:port?name={path} +func (driver *rclone) CreateTransfer(ctx context.Context, srcTargetURI string, dstTargetURI string) (*datatx.TxInfo, error) { logger := appctx.GetLogger(ctx) srcEp, err := driver.extractEndpointInfo(ctx, srcTargetURI)