Skip to content

Commit

Permalink
Add RemoveObjectsWithResult() to return delete-marker & delete-marker…
Browse files Browse the repository at this point in the history
…-version-id (#1456)

Using this API will make it easier to get the delete marker version id
after removing an object in a versioned bucket.
  • Loading branch information
vadmeste authored Dec 8, 2021
1 parent 500d1ce commit a566788
Show file tree
Hide file tree
Showing 3 changed files with 233 additions and 30 deletions.
128 changes: 99 additions & 29 deletions api-remove.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,11 @@ func (c *Client) RemoveObject(ctx context.Context, bucketName, objectName string
return err
}

return c.removeObject(ctx, bucketName, objectName, opts)
res := c.removeObject(ctx, bucketName, objectName, opts)
return res.Err
}

func (c *Client) removeObject(ctx context.Context, bucketName, objectName string, opts RemoveObjectOptions) error {

func (c *Client) removeObject(ctx context.Context, bucketName, objectName string, opts RemoveObjectOptions) RemoveObjectResult {
// Get resources properly escaped and lined up before
// using them in http request.
urlValues := make(url.Values)
Expand Down Expand Up @@ -181,19 +181,25 @@ func (c *Client) removeObject(ctx context.Context, bucketName, objectName string
})
defer closeResponse(resp)
if err != nil {
return err
return RemoveObjectResult{Err: err}
}
if resp != nil {
// if some unexpected error happened and max retry is reached, we want to let client know
if resp.StatusCode != http.StatusNoContent {
return httpRespToErrorResponse(resp, bucketName, objectName)
err := httpRespToErrorResponse(resp, bucketName, objectName)
return RemoveObjectResult{Err: err}
}
}

// DeleteObject always responds with http '204' even for
// objects which do not exist. So no need to handle them
// specifically.
return nil
return RemoveObjectResult{
ObjectName: objectName,
ObjectVersionID: opts.VersionID,
DeleteMarker: resp.Header.Get("x-amz-delete-marker") == "true",
DeleteMarkerVersionID: resp.Header.Get("x-amz-version-id"),
}
}

// RemoveObjectError - container of Multi Delete S3 API error
Expand All @@ -203,6 +209,17 @@ type RemoveObjectError struct {
Err error
}

// RemoveObjectResult - container of Multi Delete S3 API result
type RemoveObjectResult struct {
ObjectName string
ObjectVersionID string

DeleteMarker bool
DeleteMarkerVersionID string

Err error
}

// generateRemoveMultiObjects - generate the XML request for remove multi objects request
func generateRemoveMultiObjectsRequest(objects []ObjectInfo) []byte {
delObjects := []deleteObject{}
Expand All @@ -212,31 +229,42 @@ func generateRemoveMultiObjectsRequest(objects []ObjectInfo) []byte {
VersionID: obj.VersionID,
})
}
xmlBytes, _ := xml.Marshal(deleteMultiObjects{Objects: delObjects, Quiet: true})
xmlBytes, _ := xml.Marshal(deleteMultiObjects{Objects: delObjects, Quiet: false})
return xmlBytes
}

// processRemoveMultiObjectsResponse - parse the remove multi objects web service
// and return the success/failure result status for each object
func processRemoveMultiObjectsResponse(body io.Reader, objects []ObjectInfo, errorCh chan<- RemoveObjectError) {
func processRemoveMultiObjectsResponse(body io.Reader, objects []ObjectInfo, resultCh chan<- RemoveObjectResult) {
// Parse multi delete XML response
rmResult := &deleteMultiObjectsResult{}
err := xmlDecoder(body, rmResult)
if err != nil {
errorCh <- RemoveObjectError{ObjectName: "", Err: err}
resultCh <- RemoveObjectResult{ObjectName: "", Err: err}
return
}

// Fill deletion that returned success
for _, obj := range rmResult.DeletedObjects {
resultCh <- RemoveObjectResult{
ObjectName: obj.Key,
// Only filled with versioned buckets
ObjectVersionID: obj.VersionID,
DeleteMarker: obj.DeleteMarker,
DeleteMarkerVersionID: obj.DeleteMarkerVersionID,
}
}

// Fill deletion that returned an error.
for _, obj := range rmResult.UnDeletedObjects {
// Version does not exist is not an error ignore and continue.
switch obj.Code {
case "InvalidArgument", "NoSuchVersion":
continue
}
errorCh <- RemoveObjectError{
ObjectName: obj.Key,
VersionID: obj.VersionID,
resultCh <- RemoveObjectResult{
ObjectName: obj.Key,
ObjectVersionID: obj.VersionID,
Err: ErrorResponse{
Code: obj.Code,
Message: obj.Message,
Expand Down Expand Up @@ -273,10 +301,54 @@ func (c *Client) RemoveObjects(ctx context.Context, bucketName string, objectsCh
return errorCh
}

go c.removeObjects(ctx, bucketName, objectsCh, errorCh, opts)
resultCh := make(chan RemoveObjectResult, 1)
go c.removeObjects(ctx, bucketName, objectsCh, resultCh, opts)
go func() {
defer close(errorCh)
for res := range resultCh {
// Send only errors to the error channel
if res.Err == nil {
continue
}
errorCh <- RemoveObjectError{
ObjectName: res.ObjectName,
VersionID: res.ObjectVersionID,
Err: res.Err,
}
}
}()

return errorCh
}

// RemoveObjectsWithResult removes multiple objects from a bucket while
// it is possible to specify objects versions which are received from
// objectsCh. Remove results, successes and failures are sent back via
// RemoveObjectResult channel
func (c *Client) RemoveObjectsWithResult(ctx context.Context, bucketName string, objectsCh <-chan ObjectInfo, opts RemoveObjectsOptions) <-chan RemoveObjectResult {
resultCh := make(chan RemoveObjectResult, 1)

// Validate if bucket name is valid.
if err := s3utils.CheckValidBucketName(bucketName); err != nil {
defer close(resultCh)
resultCh <- RemoveObjectResult{
Err: err,
}
return resultCh
}
// Validate objects channel to be properly allocated.
if objectsCh == nil {
defer close(resultCh)
resultCh <- RemoveObjectResult{
Err: errInvalidArgument("Objects channel cannot be nil"),
}
return resultCh
}

go c.removeObjects(ctx, bucketName, objectsCh, resultCh, opts)
return resultCh
}

// Return true if the character is within the allowed characters in an XML 1.0 document
// The list of allowed characters can be found here: https://www.w3.org/TR/xml/#charsets
func validXMLChar(r rune) (ok bool) {
Expand All @@ -298,14 +370,14 @@ func hasInvalidXMLChar(str string) bool {
}

// Generate and call MultiDelete S3 requests based on entries received from objectsCh
func (c *Client) removeObjects(ctx context.Context, bucketName string, objectsCh <-chan ObjectInfo, errorCh chan<- RemoveObjectError, opts RemoveObjectsOptions) {
func (c *Client) removeObjects(ctx context.Context, bucketName string, objectsCh <-chan ObjectInfo, resultCh chan<- RemoveObjectResult, opts RemoveObjectsOptions) {
maxEntries := 1000
finish := false
urlValues := make(url.Values)
urlValues.Set("delete", "")

// Close error channel when Multi delete finishes.
defer close(errorCh)
// Close result channel when Multi delete finishes.
defer close(resultCh)

// Loop over entries by 1000 and call MultiDelete requests
for {
Expand All @@ -319,22 +391,20 @@ func (c *Client) removeObjects(ctx context.Context, bucketName string, objectsCh
for object := range objectsCh {
if hasInvalidXMLChar(object.Key) {
// Use single DELETE so the object name will be in the request URL instead of the multi-delete XML document.
err := c.removeObject(ctx, bucketName, object.Key, RemoveObjectOptions{
removeResult := c.removeObject(ctx, bucketName, object.Key, RemoveObjectOptions{
VersionID: object.VersionID,
GovernanceBypass: opts.GovernanceBypass,
})
if err != nil {
if err := removeResult.Err; err != nil {
// Version does not exist is not an error ignore and continue.
switch ToErrorResponse(err).Code {
case "InvalidArgument", "NoSuchVersion":
continue
}
errorCh <- RemoveObjectError{
ObjectName: object.Key,
VersionID: object.VersionID,
Err: err,
}
resultCh <- removeResult
}

resultCh <- removeResult
continue
}

Expand Down Expand Up @@ -374,22 +444,22 @@ func (c *Client) removeObjects(ctx context.Context, bucketName string, objectsCh
if resp != nil {
if resp.StatusCode != http.StatusOK {
e := httpRespToErrorResponse(resp, bucketName, "")
errorCh <- RemoveObjectError{ObjectName: "", Err: e}
resultCh <- RemoveObjectResult{ObjectName: "", Err: e}
}
}
if err != nil {
for _, b := range batch {
errorCh <- RemoveObjectError{
ObjectName: b.Key,
VersionID: b.VersionID,
Err: err,
resultCh <- RemoveObjectResult{
ObjectName: b.Key,
ObjectVersionID: b.VersionID,
Err: err,
}
}
continue
}

// Process multiobjects remove xml response
processRemoveMultiObjectsResponse(resp.Body, batch, errorCh)
processRemoveMultiObjectsResponse(resp.Body, batch, resultCh)

closeResponse(resp)
}
Expand Down
2 changes: 1 addition & 1 deletion api-s3-datatypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ type deletedObject struct {
VersionID string `xml:"VersionId,omitempty"`
// These fields are ignored.
DeleteMarker bool
DeleteMarkerVersionID string
DeleteMarkerVersionID string `xml:"DeleteMarkerVersionId,omitempty"`
}

// nonDeletedObject container for Error element (failed deletion) in MultiObjects Delete XML response
Expand Down
133 changes: 133 additions & 0 deletions functional_tests.go
Original file line number Diff line number Diff line change
Expand Up @@ -2628,6 +2628,138 @@ func testRemoveMultipleObjects() {
successLogger(testName, function, args, startTime).Info()
}

// Test removing multiple objects and check for results
func testRemoveMultipleObjectsWithResult() {
// initialize logging params
startTime := time.Now()
testName := getFuncName()
function := "RemoveObjects(bucketName, objectsCh)"
args := map[string]interface{}{
"bucketName": "",
}

// Seed random based on current time.
rand.Seed(time.Now().Unix())

// Instantiate new minio client object.
c, err := minio.New(os.Getenv(serverEndpoint),
&minio.Options{
Creds: credentials.NewStaticV4(os.Getenv(accessKey), os.Getenv(secretKey), ""),
Secure: mustParseBool(os.Getenv(enableHTTPS)),
})
if err != nil {
logError(testName, function, args, startTime, "", "MinIO client object creation failed", err)
return
}

// Set user agent.
c.SetAppInfo("MinIO-go-FunctionalTest", "0.1.0")

// Enable tracing, write to stdout.
// c.TraceOn(os.Stderr)

// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test-")
args["bucketName"] = bucketName

// Make a new bucket.
err = c.MakeBucket(context.Background(), bucketName, minio.MakeBucketOptions{Region: "us-east-1", ObjectLocking: true})
if err != nil {
logError(testName, function, args, startTime, "", "MakeBucket failed", err)
return
}

defer cleanupVersionedBucket(bucketName, c)

r := bytes.NewReader(bytes.Repeat([]byte("a"), 8))

nrObjects := 10
nrLockedObjects := 5

objectsCh := make(chan minio.ObjectInfo)

go func() {
defer close(objectsCh)
// Upload objects and send them to objectsCh
for i := 0; i < nrObjects; i++ {
objectName := "sample" + strconv.Itoa(i) + ".txt"
info, err := c.PutObject(context.Background(), bucketName, objectName, r, 8,
minio.PutObjectOptions{ContentType: "application/octet-stream"})
if err != nil {
logError(testName, function, args, startTime, "", "PutObject failed", err)
return
}
if i < nrLockedObjects {
// t := time.Date(2130, time.April, 25, 14, 0, 0, 0, time.UTC)
t := time.Now().Add(5 * time.Minute)
m := minio.RetentionMode(minio.Governance)
opts := minio.PutObjectRetentionOptions{
GovernanceBypass: false,
RetainUntilDate: &t,
Mode: &m,
VersionID: info.VersionID,
}
err = c.PutObjectRetention(context.Background(), bucketName, objectName, opts)
if err != nil {
logError(testName, function, args, startTime, "", "Error setting retention", err)
return
}
}

objectsCh <- minio.ObjectInfo{
Key: info.Key,
VersionID: info.VersionID,
}
}
}()

// Call RemoveObjects API
resultCh := c.RemoveObjectsWithResult(context.Background(), bucketName, objectsCh, minio.RemoveObjectsOptions{})

var foundNil, foundErr int

for {
// Check if errorCh doesn't receive any error
select {
case deleteRes, ok := <-resultCh:
if !ok {
goto out
}
if deleteRes.ObjectName == "" {
logError(testName, function, args, startTime, "", "Unexpected object name", nil)
return
}
if deleteRes.ObjectVersionID == "" {
logError(testName, function, args, startTime, "", "Unexpected object version ID", nil)
return
}

if deleteRes.Err == nil {
foundNil++
} else {
foundErr++
}
}
}
out:
if foundNil+foundErr != nrObjects {
logError(testName, function, args, startTime, "", "Unexpected number of results", nil)
return
}

if foundNil != nrObjects-nrLockedObjects {
logError(testName, function, args, startTime, "", "Unexpected number of nil errors", nil)
return
}

if foundErr != nrLockedObjects {
logError(testName, function, args, startTime, "", "Unexpected number of errors", nil)
return
}

successLogger(testName, function, args, startTime).Info()
}

// Tests FPutObject of a big file to trigger multipart
func testFPutObjectMultipart() {
// initialize logging params
Expand Down Expand Up @@ -12010,6 +12142,7 @@ func main() {
testGetObjectClosedTwice()
testGetObjectS3Zip()
testRemoveMultipleObjects()
testRemoveMultipleObjectsWithResult()
testFPutObjectMultipart()
testFPutObject()
testGetObjectReadSeekFunctional()
Expand Down

0 comments on commit a566788

Please sign in to comment.