Skip to content

Commit

Permalink
Merge pull request #10141 from owncloud/gtw-selector
Browse files Browse the repository at this point in the history
[full-ci] address Gtw selector sonarcloud complaints
  • Loading branch information
butonic authored Sep 26, 2024
2 parents e6d5c96 + 5140623 commit ef15a63
Show file tree
Hide file tree
Showing 7 changed files with 188 additions and 160 deletions.
3 changes: 2 additions & 1 deletion changelog/unreleased/fix-select-next-gateway-client.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ Bugfix: Always select next gateway client

We now use the gateway selector to always select the next gateway client. This ensures that we can always connect to the gateway during up- and downscaling.

https://github.com/owncloud/ocis/pull/10133
https://github.com/owncloud/ocis/pull/10141
https://github.com/owncloud/ocis/pull/10133
168 changes: 65 additions & 103 deletions services/collaboration/pkg/connector/contentconnector.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"crypto/tls"
"fmt"
"io"
"net/http"
"strconv"
Expand Down Expand Up @@ -55,6 +56,27 @@ func NewContentConnector(gws pool.Selectable[gatewayv1beta1.GatewayAPIClient], c
}
}

func newHttpRequest(ctx context.Context, wopiContext middleware.WopiContext, method, url, transferToken string, body io.Reader) (*http.Request, error) {
httpReq, err := http.NewRequestWithContext(ctx, method, url, body)
if err != nil {
return nil, err
}
if url == "" {
return nil, NewConnectorError(500, "url is missing")
}
if transferToken != "" {
httpReq.Header.Add("X-Reva-Transfer", transferToken)
}
if wopiContext.ViewMode == appproviderv1beta1.ViewMode_VIEW_MODE_VIEW_ONLY && wopiContext.ViewOnlyToken != "" {
httpReq.Header.Add("X-Access-Token", wopiContext.ViewOnlyToken)
} else {
httpReq.Header.Add("X-Access-Token", wopiContext.AccessToken)
}
tracingProp := tracing.GetPropagator()
tracingProp.Inject(ctx, propagation.HeaderCarrier(httpReq.Header))
return httpReq, nil
}

// GetFile downloads the file from the storage
// https://docs.microsoft.com/en-us/microsoft-365/cloud-storage-partner-program/rest/files/getfile
//
Expand Down Expand Up @@ -84,13 +106,10 @@ func (c *ContentConnector) GetFile(ctx context.Context, w http.ResponseWriter) e
sResp, err := gwc.Stat(ctx, &providerv1beta1.StatRequest{
Ref: wopiContext.FileReference,
})
if err != nil {
logger.Error().Err(err).Msg("GetFile: Stat Request failed")
if err := requestFailed(logger, sResp.GetStatus(), false, err, "GetFile: Stat Request failed"); err != nil {
return err
}
if sResp.GetStatus().GetCode() != rpcv1beta1.Code_CODE_OK {
return NewConnectorError(500, sResp.GetStatus().GetCode().String()+" "+sResp.GetStatus().GetMessage())
}

// Initiate download request
req := &providerv1beta1.InitiateFileDownloadRequest{
Ref: wopiContext.FileReference,
Expand All @@ -104,19 +123,10 @@ func (c *ContentConnector) GetFile(ctx context.Context, w http.ResponseWriter) e
return err
}
resp, err := gwc.InitiateFileDownload(ctx, req)
if err != nil {
logger.Error().Err(err).Msg("GetFile: InitiateFileDownload failed")
if err := requestFailed(logger, resp.GetStatus(), false, err, "GetFile: InitiateFileDownload failed"); err != nil {
return err
}

if resp.GetStatus().GetCode() != rpcv1beta1.Code_CODE_OK {
logger.Error().
Str("StatusCode", resp.GetStatus().GetCode().String()).
Str("StatusMsg", resp.GetStatus().GetMessage()).
Msg("GetFile: InitiateFileDownload failed with wrong status")
return NewConnectorError(500, resp.GetStatus().GetCode().String()+" "+resp.GetStatus().GetMessage())
}

// Figure out the download endpoint and download token
downloadEndpoint := ""
downloadToken := ""
Expand All @@ -131,13 +141,9 @@ func (c *ContentConnector) GetFile(ctx context.Context, w http.ResponseWriter) e
}
}

if downloadEndpoint == "" {
logger.Error().
Str("Endpoint", downloadEndpoint).
Bool("HasDownloadToken", hasDownloadToken).
Msg("GetFile: Download endpoint or token is missing")
return NewConnectorError(500, "GetFile: Download endpoint is missing")
}
logger = logger.With().
Str("Endpoint", downloadEndpoint).
Bool("HasDownloadToken", hasDownloadToken).Logger()

httpClient := http.Client{
Transport: &http.Transport{
Expand All @@ -148,42 +154,23 @@ func (c *ContentConnector) GetFile(ctx context.Context, w http.ResponseWriter) e
}

// Prepare the request to download the file
httpReq, err := http.NewRequestWithContext(ctx, http.MethodGet, downloadEndpoint, bytes.NewReader([]byte("")))
// public link downloads have the token in the download endpoint
httpReq, err := newHttpRequest(ctx, wopiContext, http.MethodGet, downloadEndpoint, downloadToken, bytes.NewReader([]byte("")))
if err != nil {
logger.Error().
Err(err).
Str("Endpoint", downloadEndpoint).
Bool("HasDownloadToken", hasDownloadToken).
Msg("GetFile: Could not create the request to the endpoint")
logger.Error().Err(err).Msg("GetFile: Could not create the request to the endpoint")
return err
}
if downloadToken != "" {
// public link downloads have the token in the download endpoint
httpReq.Header.Add("X-Reva-Transfer", downloadToken)
}
if wopiContext.ViewMode == appproviderv1beta1.ViewMode_VIEW_MODE_VIEW_ONLY && wopiContext.ViewOnlyToken != "" {
httpReq.Header.Add("X-Access-Token", wopiContext.ViewOnlyToken)
} else {
httpReq.Header.Add("X-Access-Token", wopiContext.AccessToken)
}
tracingProp := tracing.GetPropagator()
tracingProp.Inject(ctx, propagation.HeaderCarrier(httpReq.Header))

httpResp, err := httpClient.Do(httpReq)
if err != nil {
logger.Error().
Err(err).
Str("Endpoint", downloadEndpoint).
Bool("HasDownloadToken", hasDownloadToken).
Msg("GetFile: Get request to the download endpoint failed")
logger.Error().Err(err).Msg("GetFile: Get request to the download endpoint failed")
return err
}

defer httpResp.Body.Close()

if httpResp.StatusCode != http.StatusOK {
logger.Error().
Err(err).
Int("HttpCode", httpResp.StatusCode).
Msg("GetFile: downloading the file failed")
return NewConnectorError(500, "GetFile: Downloading the file failed")
Expand Down Expand Up @@ -245,19 +232,11 @@ func (c *ContentConnector) PutFile(ctx context.Context, stream io.Reader, stream
statRes, err := gwc.Stat(ctx, &providerv1beta1.StatRequest{
Ref: wopiContext.FileReference,
})
if err != nil {
logger.Error().Err(err).Msg("PutFile: stat failed")
// we can ignore a not found error here, as we're going to create the file
if err := requestFailed(logger, statRes.GetStatus(), true, err, "PutFile: stat failed"); err != nil {
return nil, err
}

if statRes.GetStatus().GetCode() != rpcv1beta1.Code_CODE_OK && statRes.GetStatus().GetCode() != rpcv1beta1.Code_CODE_NOT_FOUND {
logger.Error().
Str("StatusCode", statRes.GetStatus().GetCode().String()).
Str("StatusMsg", statRes.GetStatus().GetMessage()).
Msg("PutFile: stat failed with unexpected status")
return NewResponse(500), nil
}

mtime := statRes.GetInfo().GetMtime()
// If there is a lock and it mismatches, return 409
if statRes.GetInfo().GetLock() != nil && statRes.GetInfo().GetLock().GetLockId() != lockID {
Expand Down Expand Up @@ -305,19 +284,10 @@ func (c *ContentConnector) PutFile(ctx context.Context, stream io.Reader, stream
}
// Initiate the upload request
resp, err := gwc.InitiateFileUpload(ctx, req)
if err != nil {
logger.Error().Err(err).Msg("UploadHelper: InitiateFileUpload failed")
if err := requestFailed(logger, resp.GetStatus(), false, err, "PutFile: InitiateFileUpload failed"); err != nil {
return nil, err
}

if resp.GetStatus().GetCode() != rpcv1beta1.Code_CODE_OK {
logger.Error().
Str("StatusCode", resp.GetStatus().GetCode().String()).
Str("StatusMsg", resp.GetStatus().GetMessage()).
Msg("UploadHelper: InitiateFileUpload failed with wrong status")
return NewResponse(500), nil
}

// if the content length is greater than 0, we need to upload the content to the
// target endpoint, otherwise we're done
if streamLength > 0 {
Expand All @@ -335,13 +305,9 @@ func (c *ContentConnector) PutFile(ctx context.Context, stream io.Reader, stream
}
}

if uploadEndpoint == "" {
logger.Error().
Str("Endpoint", uploadEndpoint).
Bool("HasUploadToken", hasUploadToken).
Msg("UploadHelper: Upload endpoint or token is missing")
return NewResponse(500), nil
}
logger = logger.With().
Str("Endpoint", uploadEndpoint).
Bool("HasUploadToken", hasUploadToken).Logger()

httpClient := http.Client{
Transport: &http.Transport{
Expand All @@ -353,52 +319,35 @@ func (c *ContentConnector) PutFile(ctx context.Context, stream io.Reader, stream
}

// prepare the request to upload the contents to the upload endpoint
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPut, uploadEndpoint, stream)
// public link uploads have the token in the upload endpoint
httpReq, err := newHttpRequest(ctx, wopiContext, http.MethodPut, uploadEndpoint, uploadToken, stream)
if err != nil {
logger.Error().
Err(err).
Str("Endpoint", uploadEndpoint).
Bool("HasUploadToken", hasUploadToken).
Msg("UploadHelper: Could not create the request to the endpoint")
logger.Error().Err(err).Msg("UploadHelper: Could not create the request to the endpoint")
return nil, err
}
// "stream" is an *http.body and doesn't fill the httpReq.ContentLength automatically
// we need to fill the ContentLength ourselves, and must match the stream length in order
// to prevent issues
httpReq.ContentLength = streamLength

if uploadToken != "" {
// public link uploads have the token in the upload endpoint
httpReq.Header.Add("X-Reva-Transfer", uploadToken)
}
httpReq.Header.Add("X-Access-Token", wopiContext.AccessToken)

httpReq.Header.Add("X-Lock-Id", lockID)
// TODO: better mechanism for the upload while locked, relies on patch in REVA
//if lockID, ok := ctxpkg.ContextGetLockID(ctx); ok {
// httpReq.Header.Add("X-Lock-Id", lockID)
//}
tracingProp := tracing.GetPropagator()
tracingProp.Inject(ctx, propagation.HeaderCarrier(httpReq.Header))

httpResp, err := httpClient.Do(httpReq)
if err != nil {
logger.Error().
Err(err).
Str("Endpoint", uploadEndpoint).
Bool("HasUploadToken", hasUploadToken).
Msg("UploadHelper: Put request to the upload endpoint failed")
logger.Error().Err(err).Msg("UploadHelper: Put request to the upload endpoint failed")
return nil, err
}
defer httpResp.Body.Close()

if httpResp.StatusCode != http.StatusOK {
logger.Error().
Str("Endpoint", uploadEndpoint).
Bool("HasUploadToken", hasUploadToken).
Int("HttpCode", httpResp.StatusCode).
Msg("UploadHelper: Put request to the upload endpoint failed with unexpected status")
return NewResponse(500), nil
return nil, NewConnectorError(500, fmt.Sprintf("unexpected status code %d from the upload endpoint", httpResp.StatusCode))
}
gwc, err = c.gws.Next()
if err != nil {
Expand All @@ -409,20 +358,33 @@ func (c *ContentConnector) PutFile(ctx context.Context, stream io.Reader, stream
statResAfter, err := gwc.Stat(ctx, &providerv1beta1.StatRequest{
Ref: wopiContext.FileReference,
})
if err != nil {
logger.Error().Err(err).Msg("PutFile: stat after upload failed")
if err := requestFailed(logger, statResAfter.GetStatus(), false, err, "PutFile: stat after upload failed"); err != nil {
return nil, err
}
if statResAfter.GetStatus().GetCode() != rpcv1beta1.Code_CODE_OK {
logger.Error().
Str("StatusCode", statRes.GetStatus().GetCode().String()).
Str("StatusMsg", statRes.GetStatus().GetMessage()).
Msg("PutFile: stat after upload failed with unexpected status")
return NewResponse(500), nil
}
mtime = statResAfter.GetInfo().GetMtime()
}

logger.Debug().Msg("PutFile: success")
return NewResponseWithVersion(mtime), nil
}

func requestFailed(logger zerolog.Logger, s *rpcv1beta1.Status, allowNotFound bool, err error, msg string) error {
switch {
case err != nil: // a connection error
logger.Error().Err(err).Msg(msg)
return err
case s == nil: // we need a status
logger.Error().Msg(msg + ": nil status")
return NewConnectorError(500, msg+": nil status")
case s.GetCode() == rpcv1beta1.Code_CODE_OK: // ok is fine
return nil
case allowNotFound && s.GetCode() == rpcv1beta1.Code_CODE_NOT_FOUND: // not found might be ok
return nil
default: // any other status is an error
logger.Error().
Str("StatusCode", s.GetCode().String()).
Str("StatusMsg", s.GetMessage()).
Msg(msg)
return NewConnectorError(500, s.GetCode().String()+" "+s.GetMessage())
}
}
29 changes: 14 additions & 15 deletions services/collaboration/pkg/connector/contentconnector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,8 @@ var _ = Describe("ContentConnector", func() {
}, nil)

err := cc.GetFile(ctx, sb)
Expect(err).To(HaveOccurred())
conErr := err.(*connector.ConnectorError)
Expect(conErr.HttpCodeOut).To(Equal(500))
targetErr := connector.NewConnectorError(500, "CODE_INTERNAL Something failed")
Expect(err).To(Equal(targetErr))
})

It("Missing download endpoint", func() {
Expand Down Expand Up @@ -264,9 +263,9 @@ var _ = Describe("ContentConnector", func() {
}, nil)

response, err := cc.PutFile(ctx, reader, reader.Size(), "notARandomLockId")
Expect(err).ToNot(HaveOccurred())
Expect(response.Status).To(Equal(500))
Expect(response.Headers).To(BeNil())
targetErr := connector.NewConnectorError(500, "CODE_INTERNAL Something failed")
Expect(err).To(Equal(targetErr))
Expect(response).To(BeNil())
})

It("Mismatched lockId", func() {
Expand Down Expand Up @@ -354,9 +353,9 @@ var _ = Describe("ContentConnector", func() {
}, nil)

response, err := cc.PutFile(ctx, reader, reader.Size(), "goodAndValidLock")
Expect(err).ToNot(HaveOccurred())
Expect(response.Status).To(Equal(500))
Expect(response.Headers).To(BeNil())
targetErr := connector.NewConnectorError(500, "CODE_INTERNAL Something failed")
Expect(err).To(Equal(targetErr))
Expect(response).To(BeNil())
})

It("Empty upload successful", func() {
Expand Down Expand Up @@ -406,9 +405,9 @@ var _ = Describe("ContentConnector", func() {
}, nil)

response, err := cc.PutFile(ctx, reader, reader.Size(), "goodAndValidLock")
Expect(err).ToNot(HaveOccurred())
Expect(response.Status).To(Equal(500))
Expect(response.Headers).To(BeNil())
targetErr := connector.NewConnectorError(500, "url is missing")
Expect(err).To(Equal(targetErr))
Expect(response).To(BeNil())
})

It("upload request failed", func() {
Expand Down Expand Up @@ -438,9 +437,9 @@ var _ = Describe("ContentConnector", func() {

response, err := cc.PutFile(ctx, reader, reader.Size(), "goodAndValidLock")
Expect(srvReqHeader.Get("X-Access-Token")).To(Equal(wopiCtx.AccessToken))
Expect(err).ToNot(HaveOccurred())
Expect(response.Status).To(Equal(500))
Expect(response.Headers).To(BeNil())
targetErr := connector.NewConnectorError(500, "unexpected status code 404 from the upload endpoint")
Expect(err).To(Equal(targetErr))
Expect(response).To(BeNil())
})

It("upload request success", func() {
Expand Down
6 changes: 3 additions & 3 deletions services/collaboration/pkg/connector/fileconnector.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,7 +689,7 @@ func (f *FileConnector) PutRelativeFileSuggested(ctx context.Context, ccs Conten
// try to put the file. It mustn't return a 400 or 409
putResponse, err := ccs.PutFile(newCtx, stream, streamLength, "")
if err != nil {
newLogger.Error().Err(err).Msg("PutRelativeFileSuggested: put file failed")
newLogger.Error().Err(err).Msg("PutRelativeFileSuggested: put file failed") // fails here
return nil, err
}

Expand Down Expand Up @@ -799,7 +799,7 @@ func (f *FileConnector) PutRelativeFileRelative(ctx context.Context, ccs Content
// try to put the file
putResponse, err := ccs.PutFile(newCtx, stream, streamLength, "")
if err != nil {
newLogger.Error().Err(err).Msg("PutRelativeFileRelative: put file failed")
newLogger.Error().Err(err).Msg("PutRelativeFileRelative: put file failed") // or here
return nil, err
}

Expand Down Expand Up @@ -849,7 +849,7 @@ func (f *FileConnector) PutRelativeFileRelative(ctx context.Context, ccs Content
newLogger.Error().
Str("LockID", lockID).
Msg("PutRelativeFileRelative: put file failed with unhandled status")
return NewResponse(500), nil
return nil, NewConnectorError(putResponse.Status, "put file failed with unhandled status")
}

if err := f.adjustWopiReference(ctx, &wopiContext, newLogger); err != nil {
Expand Down
Loading

0 comments on commit ef15a63

Please sign in to comment.