diff options
Diffstat (limited to 'vendor/github.com/minio/minio-go/v7/api-remove.go')
| -rw-r--r-- | vendor/github.com/minio/minio-go/v7/api-remove.go | 187 |
1 files changed, 177 insertions, 10 deletions
diff --git a/vendor/github.com/minio/minio-go/v7/api-remove.go b/vendor/github.com/minio/minio-go/v7/api-remove.go index 5b4443ec5..2a38e014a 100644 --- a/vendor/github.com/minio/minio-go/v7/api-remove.go +++ b/vendor/github.com/minio/minio-go/v7/api-remove.go @@ -22,6 +22,7 @@ import ( "context" "encoding/xml" "io" + "iter" "net/http" "net/url" "time" @@ -271,7 +272,7 @@ func processRemoveMultiObjectsResponse(body io.Reader, resultCh chan<- RemoveObj for _, obj := range rmResult.UnDeletedObjects { // Version does not exist is not an error ignore and continue. switch obj.Code { - case "InvalidArgument", "NoSuchVersion": + case InvalidArgument, NoSuchVersion: continue } resultCh <- RemoveObjectResult{ @@ -333,6 +334,33 @@ func (c *Client) RemoveObjects(ctx context.Context, bucketName string, objectsCh return errorCh } +// RemoveObjectsWithIter bulk deletes multiple objects from a bucket. +// Objects (with optional versions) to be removed must be provided with +// an iterator. Objects are removed asynchronously and results must be +// consumed. If the returned result iterator is stopped, the context is +// canceled, or a remote call failed, the provided iterator will no +// longer accept more objects. +func (c *Client) RemoveObjectsWithIter(ctx context.Context, bucketName string, objectsIter iter.Seq[ObjectInfo], opts RemoveObjectsOptions) (iter.Seq[RemoveObjectResult], error) { + // Validate if bucket name is valid. + if err := s3utils.CheckValidBucketName(bucketName); err != nil { + return nil, err + } + // Validate objects channel to be properly allocated. + if objectsIter == nil { + return nil, errInvalidArgument("Objects iter can never by nil") + } + + return func(yield func(RemoveObjectResult) bool) { + select { + case <-ctx.Done(): + return + default: + } + + c.removeObjectsIter(ctx, bucketName, objectsIter, yield, opts) + }, nil +} + // RemoveObjectsWithResult removes multiple objects from a bucket while // it is possible to specify objects versions which are received from // objectsCh. Remove results, successes and failures are sent back via @@ -381,6 +409,144 @@ func hasInvalidXMLChar(str string) bool { return false } +// Generate and call MultiDelete S3 requests based on entries received from the iterator. +func (c *Client) removeObjectsIter(ctx context.Context, bucketName string, objectsIter iter.Seq[ObjectInfo], yield func(RemoveObjectResult) bool, opts RemoveObjectsOptions) { + maxEntries := 1000 + urlValues := make(url.Values) + urlValues.Set("delete", "") + + // Build headers. + headers := make(http.Header) + if opts.GovernanceBypass { + // Set the bypass goverenance retention header + headers.Set(amzBypassGovernance, "true") + } + + processRemoveMultiObjectsResponseIter := func(batch []ObjectInfo, yield func(RemoveObjectResult) bool) bool { + if len(batch) == 0 { + return false + } + + // Generate remove multi objects XML request + removeBytes := generateRemoveMultiObjectsRequest(batch) + // Execute POST on bucket to remove objects. + resp, err := c.executeMethod(ctx, http.MethodPost, requestMetadata{ + bucketName: bucketName, + queryValues: urlValues, + contentBody: bytes.NewReader(removeBytes), + contentLength: int64(len(removeBytes)), + contentMD5Base64: sumMD5Base64(removeBytes), + contentSHA256Hex: sum256Hex(removeBytes), + customHeader: headers, + }) + if resp != nil { + defer closeResponse(resp) + if resp.StatusCode != http.StatusOK { + err = httpRespToErrorResponse(resp, bucketName, "") + } + } + if err != nil { + for _, b := range batch { + if !yield(RemoveObjectResult{ + ObjectName: b.Key, + ObjectVersionID: b.VersionID, + Err: err, + }) { + return false + } + } + return false + } + + // Parse multi delete XML response + rmResult := &deleteMultiObjectsResult{} + if err := xmlDecoder(resp.Body, rmResult); err != nil { + yield(RemoveObjectResult{ObjectName: "", Err: err}) + return false + } + + // Fill deletion that returned an error. + for _, obj := range rmResult.UnDeletedObjects { + // Version does not exist is not an error ignore and continue. + switch obj.Code { + case "InvalidArgument", "NoSuchVersion": + continue + } + if !yield(RemoveObjectResult{ + ObjectName: obj.Key, + ObjectVersionID: obj.VersionID, + Err: ErrorResponse{ + Code: obj.Code, + Message: obj.Message, + }, + }) { + return false + } + } + + // Fill deletion that returned success + for _, obj := range rmResult.DeletedObjects { + if !yield(RemoveObjectResult{ + ObjectName: obj.Key, + // Only filled with versioned buckets + ObjectVersionID: obj.VersionID, + DeleteMarker: obj.DeleteMarker, + DeleteMarkerVersionID: obj.DeleteMarkerVersionID, + }) { + return false + } + } + + return true + } + + var batch []ObjectInfo + + next, stop := iter.Pull(objectsIter) + defer stop() + + for { + // Loop over entries by 1000 and call MultiDelete requests + object, ok := next() + if !ok { + // delete the remaining batch. + processRemoveMultiObjectsResponseIter(batch, yield) + return + } + + if hasInvalidXMLChar(object.Key) { + // Use single DELETE so the object name will be in the request URL instead of the multi-delete XML document. + removeResult := c.removeObject(ctx, bucketName, object.Key, RemoveObjectOptions{ + VersionID: object.VersionID, + GovernanceBypass: opts.GovernanceBypass, + }) + if err := removeResult.Err; err != nil { + // Version does not exist is not an error ignore and continue. + switch ToErrorResponse(err).Code { + case "InvalidArgument", "NoSuchVersion": + continue + } + } + if !yield(removeResult) { + return + } + + continue + } + + batch = append(batch, object) + if len(batch) < maxEntries { + continue + } + + if !processRemoveMultiObjectsResponseIter(batch, yield) { + return + } + + batch = batch[:0] + } +} + // Generate and call MultiDelete S3 requests based on entries received from objectsCh func (c *Client) removeObjects(ctx context.Context, bucketName string, objectsCh <-chan ObjectInfo, resultCh chan<- RemoveObjectResult, opts RemoveObjectsOptions) { maxEntries := 1000 @@ -407,7 +573,7 @@ func (c *Client) removeObjects(ctx context.Context, bucketName string, objectsCh if err := removeResult.Err; err != nil { // Version does not exist is not an error ignore and continue. switch ToErrorResponse(err).Code { - case "InvalidArgument", "NoSuchVersion": + case InvalidArgument, NoSuchVersion: continue } resultCh <- removeResult @@ -442,13 +608,14 @@ func (c *Client) removeObjects(ctx context.Context, bucketName string, objectsCh removeBytes := generateRemoveMultiObjectsRequest(batch) // Execute POST on bucket to remove objects. resp, err := c.executeMethod(ctx, http.MethodPost, requestMetadata{ - bucketName: bucketName, - queryValues: urlValues, - contentBody: bytes.NewReader(removeBytes), - contentLength: int64(len(removeBytes)), - contentMD5Base64: sumMD5Base64(removeBytes), - contentSHA256Hex: sum256Hex(removeBytes), - customHeader: headers, + bucketName: bucketName, + queryValues: urlValues, + contentBody: bytes.NewReader(removeBytes), + contentLength: int64(len(removeBytes)), + contentMD5Base64: sumMD5Base64(removeBytes), + contentSHA256Hex: sum256Hex(removeBytes), + customHeader: headers, + expect200OKWithError: true, }) if resp != nil { if resp.StatusCode != http.StatusOK { @@ -535,7 +702,7 @@ func (c *Client) abortMultipartUpload(ctx context.Context, bucketName, objectNam // This is needed specifically for abort and it cannot // be converged into default case. errorResponse = ErrorResponse{ - Code: "NoSuchUpload", + Code: NoSuchUpload, Message: "The specified multipart upload does not exist.", BucketName: bucketName, Key: objectName, |
