From 4a577678282dbe8ad6ca7e7d768ecc1666b1bd6f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?=
Date: Tue, 1 Oct 2024 13:08:02 +0200
Subject: [PATCH] return 409 conflict when a file was already created
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Jörn Friedrich Dreyer
---
 .../fix-return-code-on-upload-error.md        |  5 ++
 .../utils/decomposedfs/upload/upload.go       | 72 +++++++++++--------
 2 files changed, 47 insertions(+), 30 deletions(-)
 create mode 100644 changelog/unreleased/fix-return-code-on-upload-error.md

diff --git a/changelog/unreleased/fix-return-code-on-upload-error.md b/changelog/unreleased/fix-return-code-on-upload-error.md
new file mode 100644
index 0000000000..2619e83d35
--- /dev/null
+++ b/changelog/unreleased/fix-return-code-on-upload-error.md
@@ -0,0 +1,5 @@
+Bugfix: return 409 conflict when a file was already created
+
+We now return the correct 409 conflict status code when a file was already created by another upload.
+
+https://github.com/cs3org/reva/pull/4872
diff --git a/pkg/storage/utils/decomposedfs/upload/upload.go b/pkg/storage/utils/decomposedfs/upload/upload.go
index ae56604290..0c41b8f759 100644
--- a/pkg/storage/utils/decomposedfs/upload/upload.go
+++ b/pkg/storage/utils/decomposedfs/upload/upload.go
@@ -25,6 +25,7 @@ import (
 	"hash"
 	"io"
 	"io/fs"
+	"net/http"
 	"os"
 	"strconv"
 	"strings"
@@ -47,14 +48,16 @@ import (
 	"go.opentelemetry.io/otel/trace"
 )
 
-var tracer trace.Tracer
+var (
+	tracer trace.Tracer
+	ErrAlreadyExists = tusd.NewError("ERR_ALREADY_EXISTS", "file already exists", http.StatusConflict)
+	defaultFilePerm = os.FileMode(0664)
+)
 
 func init() {
 	tracer = otel.Tracer("github.com/cs3org/reva/pkg/storage/utils/decomposedfs/upload")
 }
 
-var defaultFilePerm = os.FileMode(0664)
-
 // WriteChunk writes the stream from the reader to the given offset of the upload
 func (session *OcisSession) WriteChunk(ctx context.Context, offset int64, src io.Reader) (int64, error) {
 	ctx, span := tracer.Start(session.Context(ctx), "WriteChunk")
@@ -165,7 +168,15 @@ func (session *OcisSession) FinishUpload(ctx context.Context) error {
 	n, err := session.store.CreateNodeForUpload(session, attrs)
 	if err != nil {
 		session.store.Cleanup(ctx, session, true, false, false)
-		return err
+		// we need to return a tusd error here to make the tusd handler return the correct status code
+		switch err.(type) {
+		case errtypes.AlreadyExists:
+			return ErrAlreadyExists
+		case errtypes.Aborted:
+			return tusd.NewError("ERR_PRECONDITION_FAILED", err.Error(), http.StatusPreconditionFailed)
+		default:
+			return err
+		}
 	}
 
 	// increase the processing counter for every started processing
@@ -302,34 +313,35 @@ func (session *OcisSession) Cleanup(revertNodeMetadata, cleanBin, cleanInfo bool) {
 		n, err := session.Node(ctx)
 		if err != nil {
 			appctx.GetLogger(ctx).Error().Err(err).Str("sessionid", session.ID()).Msg("reading node for session failed")
-		}
-		if session.NodeExists() && session.info.MetaData["versionsPath"] != "" {
-			p := session.info.MetaData["versionsPath"]
-			if err := session.store.lu.CopyMetadata(ctx, p, n.InternalPath(), func(attributeName string, value []byte) (newValue []byte, copy bool) {
-				return value, strings.HasPrefix(attributeName, prefixes.ChecksumPrefix) ||
-					attributeName == prefixes.TypeAttr ||
-					attributeName == prefixes.BlobIDAttr ||
-					attributeName == prefixes.BlobsizeAttr ||
-					attributeName == prefixes.MTimeAttr
-			}, true); err != nil {
-				appctx.GetLogger(ctx).Info().Str("versionpath", p).Str("nodepath", n.InternalPath()).Err(err).Msg("renaming version node failed")
-			}
-
-			if err := os.RemoveAll(p); err != nil {
-				appctx.GetLogger(ctx).Info().Str("versionpath", p).Str("nodepath", n.InternalPath()).Err(err).Msg("error removing version")
-			}
-		} else {
-			// if no other upload session is in progress (processing id != session id) or has finished (processing id == "")
-			latestSession, err := n.ProcessingID(ctx)
-			if err != nil {
-				appctx.GetLogger(ctx).Error().Err(err).Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Str("uploadid", session.ID()).Msg("reading processingid for session failed")
-			}
-			if latestSession == session.ID() {
-				// actually delete the node
-				session.removeNode(ctx)
+		} else {
+			if session.NodeExists() && session.info.MetaData["versionsPath"] != "" {
+				p := session.info.MetaData["versionsPath"]
+				if err := session.store.lu.CopyMetadata(ctx, p, n.InternalPath(), func(attributeName string, value []byte) (newValue []byte, copy bool) {
+					return value, strings.HasPrefix(attributeName, prefixes.ChecksumPrefix) ||
+						attributeName == prefixes.TypeAttr ||
+						attributeName == prefixes.BlobIDAttr ||
+						attributeName == prefixes.BlobsizeAttr ||
+						attributeName == prefixes.MTimeAttr
+				}, true); err != nil {
+					appctx.GetLogger(ctx).Info().Str("versionpath", p).Str("nodepath", n.InternalPath()).Err(err).Msg("renaming version node failed")
+				}
+
+				if err := os.RemoveAll(p); err != nil {
+					appctx.GetLogger(ctx).Info().Str("versionpath", p).Str("nodepath", n.InternalPath()).Err(err).Msg("error removing version")
+				}
+
+			} else {
+				// if no other upload session is in progress (processing id != session id) or has finished (processing id == "")
+				latestSession, err := n.ProcessingID(ctx)
+				if err != nil {
+					appctx.GetLogger(ctx).Error().Err(err).Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Str("uploadid", session.ID()).Msg("reading processingid for session failed")
+				}
+				if latestSession == session.ID() {
+					// actually delete the node
+					session.removeNode(ctx)
+				}
+				// FIXME else if the upload has become a revision, delete the revision, or if it is the last one, delete the node
 			}
-			// FIXME else if the upload has become a revision, delete the revision, or if it is the last one, delete the node
 		}
 	}
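
Note on the FinishUpload hunk: the tusd handler can only answer with a specific HTTP status when the error it receives carries one, which is why the storage error is now wrapped via tusd.NewError instead of being returned as-is. The sketch below shows the same mapping pattern in isolation. It is a minimal sketch using stand-in types only: alreadyExistsError, protocolError and mapUploadError are made-up names for illustration, not the reva errtypes or tusd handler API.

package main

import (
	"fmt"
	"net/http"
)

// alreadyExistsError stands in for a storage-level "already exists" error.
type alreadyExistsError struct{ path string }

func (e alreadyExistsError) Error() string { return e.path + " already exists" }

// protocolError stands in for a tusd-style error that carries the HTTP status
// the upload handler should answer with.
type protocolError struct {
	Code       string
	Message    string
	StatusCode int
}

func (e protocolError) Error() string { return e.Code + ": " + e.Message }

// mapUploadError mirrors the switch added to FinishUpload: a known storage
// error is translated into a protocol error with an explicit status code,
// anything else is passed through unchanged.
func mapUploadError(err error) error {
	switch err.(type) {
	case alreadyExistsError:
		return protocolError{Code: "ERR_ALREADY_EXISTS", Message: "file already exists", StatusCode: http.StatusConflict}
	default:
		return err
	}
}

func main() {
	err := mapUploadError(alreadyExistsError{path: "f.txt"})
	if pe, ok := err.(protocolError); ok {
		fmt.Println(pe.StatusCode) // prints 409
	}
}

With such a mapping in place, an upload that loses the race to create a file surfaces to the client as 409 Conflict, as described in the changelog entry above, rather than as an unspecific error.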