Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

return 409 conflict when a file was already created #4872

Merged
merged 1 commit into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions changelog/unreleased/fix-return-code-on-upload-error.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Bugfix: return 409 conflict when a file was already created

We now return the correct 409 conflict status code when a file was already created by another upload.

https://github.com/cs3org/reva/pull/4872
72 changes: 42 additions & 30 deletions pkg/storage/utils/decomposedfs/upload/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"hash"
"io"
"io/fs"
"net/http"
"os"
"strconv"
"strings"
Expand All @@ -47,14 +48,16 @@ import (
"go.opentelemetry.io/otel/trace"
)

var tracer trace.Tracer
var (
tracer trace.Tracer
ErrAlreadyExists = tusd.NewError("ERR_ALREADY_EXISTS", "file already exists", http.StatusConflict)
defaultFilePerm = os.FileMode(0664)
)

func init() {
tracer = otel.Tracer("github.com/cs3org/reva/pkg/storage/utils/decomposedfs/upload")
}

var defaultFilePerm = os.FileMode(0664)

// WriteChunk writes the stream from the reader to the given offset of the upload
func (session *OcisSession) WriteChunk(ctx context.Context, offset int64, src io.Reader) (int64, error) {
ctx, span := tracer.Start(session.Context(ctx), "WriteChunk")
Expand Down Expand Up @@ -165,7 +168,15 @@ func (session *OcisSession) FinishUpload(ctx context.Context) error {
n, err := session.store.CreateNodeForUpload(session, attrs)
if err != nil {
session.store.Cleanup(ctx, session, true, false, false)
return err
// we need to return a tusd error here to make the tusd handler return the correct status code
switch err.(type) {
case errtypes.AlreadyExists:
return ErrAlreadyExists
case errtypes.Aborted:
return tusd.NewError("ERR_PRECONDITION_FAILED", err.Error(), http.StatusPreconditionFailed)
default:
return err
}
}

// increase the processing counter for every started processing
Expand Down Expand Up @@ -302,34 +313,35 @@ func (session *OcisSession) Cleanup(revertNodeMetadata, cleanBin, cleanInfo bool
n, err := session.Node(ctx)
if err != nil {
appctx.GetLogger(ctx).Error().Err(err).Str("sessionid", session.ID()).Msg("reading node for session failed")
}
if session.NodeExists() && session.info.MetaData["versionsPath"] != "" {
p := session.info.MetaData["versionsPath"]
if err := session.store.lu.CopyMetadata(ctx, p, n.InternalPath(), func(attributeName string, value []byte) (newValue []byte, copy bool) {
return value, strings.HasPrefix(attributeName, prefixes.ChecksumPrefix) ||
attributeName == prefixes.TypeAttr ||
attributeName == prefixes.BlobIDAttr ||
attributeName == prefixes.BlobsizeAttr ||
attributeName == prefixes.MTimeAttr
}, true); err != nil {
appctx.GetLogger(ctx).Info().Str("versionpath", p).Str("nodepath", n.InternalPath()).Err(err).Msg("renaming version node failed")
}

if err := os.RemoveAll(p); err != nil {
appctx.GetLogger(ctx).Info().Str("versionpath", p).Str("nodepath", n.InternalPath()).Err(err).Msg("error removing version")
}

} else {
// if no other upload session is in progress (processing id != session id) or has finished (processing id == "")
latestSession, err := n.ProcessingID(ctx)
if err != nil {
appctx.GetLogger(ctx).Error().Err(err).Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Str("uploadid", session.ID()).Msg("reading processingid for session failed")
}
if latestSession == session.ID() {
// actually delete the node
session.removeNode(ctx)
if session.NodeExists() && session.info.MetaData["versionsPath"] != "" {
p := session.info.MetaData["versionsPath"]
if err := session.store.lu.CopyMetadata(ctx, p, n.InternalPath(), func(attributeName string, value []byte) (newValue []byte, copy bool) {
return value, strings.HasPrefix(attributeName, prefixes.ChecksumPrefix) ||
attributeName == prefixes.TypeAttr ||
attributeName == prefixes.BlobIDAttr ||
attributeName == prefixes.BlobsizeAttr ||
attributeName == prefixes.MTimeAttr
}, true); err != nil {
appctx.GetLogger(ctx).Info().Str("versionpath", p).Str("nodepath", n.InternalPath()).Err(err).Msg("renaming version node failed")
}

if err := os.RemoveAll(p); err != nil {
appctx.GetLogger(ctx).Info().Str("versionpath", p).Str("nodepath", n.InternalPath()).Err(err).Msg("error removing version")
}

} else {
// if no other upload session is in progress (processing id != session id) or has finished (processing id == "")
latestSession, err := n.ProcessingID(ctx)
if err != nil {
appctx.GetLogger(ctx).Error().Err(err).Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Str("uploadid", session.ID()).Msg("reading processingid for session failed")
}
if latestSession == session.ID() {
// actually delete the node
session.removeNode(ctx)
}
// FIXME else if the upload has become a revision, delete the revision, or if it is the last one, delete the node
}
// FIXME else if the upload has become a revision, delete the revision, or if it is the last one, delete the node
}
}

Expand Down
Loading