Diffstat (limited to 'vendor/github.com/minio/minio-go/v7/api-put-object-streaming.go')
-rw-r--r--  vendor/github.com/minio/minio-go/v7/api-put-object-streaming.go  215
1 file changed, 213 insertions(+), 2 deletions(-)
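Before the diff itself, a hedged sketch of how a caller would exercise the new parallel streaming path this change introduces. Endpoint, credentials, bucket, object, and file names are placeholders; ConcurrentStreamParts, NumThreads, and PartSize are the PutObjectOptions fields the new code reads:

package main

import (
	"context"
	"log"
	"os"

	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
)

func main() {
	// Endpoint and credentials are placeholders.
	client, err := minio.New("play.min.io", &minio.Options{
		Creds:  credentials.NewStaticV4("ACCESS-KEY", "SECRET-KEY", ""),
		Secure: true,
	})
	if err != nil {
		log.Fatalln(err)
	}

	f, err := os.Open("large-file.bin")
	if err != nil {
		log.Fatalln(err)
	}
	defer f.Close()

	// Size -1 selects the streaming multipart path; with
	// ConcurrentStreamParts enabled and NumThreads > 1 the new
	// putObjectMultipartStreamParallel code path below is taken.
	info, err := client.PutObject(context.Background(), "my-bucket", "my-object", f, -1,
		minio.PutObjectOptions{
			ConcurrentStreamParts: true,
			NumThreads:            4,
			PartSize:              16 << 20, // 16 MiB buffer per thread
		})
	if err != nil {
		log.Fatalln(err)
	}
	log.Printf("uploaded %d bytes, ETag: %s", info.Size, info.ETag)
}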
diff --git a/vendor/github.com/minio/minio-go/v7/api-put-object-streaming.go b/vendor/github.com/minio/minio-go/v7/api-put-object-streaming.go
index e3a14c59d..cdc7d3d38 100644
--- a/vendor/github.com/minio/minio-go/v7/api-put-object-streaming.go
+++ b/vendor/github.com/minio/minio-go/v7/api-put-object-streaming.go
@@ -28,6 +28,7 @@ import (
"net/url"
"sort"
"strings"
+ "sync"
"github.com/google/uuid"
"github.com/minio/minio-go/v7/pkg/s3utils"
@@ -44,7 +45,9 @@ import (
func (c *Client) putObjectMultipartStream(ctx context.Context, bucketName, objectName string,
reader io.Reader, size int64, opts PutObjectOptions,
) (info UploadInfo, err error) {
- if !isObject(reader) && isReadAt(reader) && !opts.SendContentMd5 {
+ if opts.ConcurrentStreamParts && opts.NumThreads > 1 {
+ info, err = c.putObjectMultipartStreamParallel(ctx, bucketName, objectName, reader, opts)
+ } else if !isObject(reader) && isReadAt(reader) && !opts.SendContentMd5 {
// If the reader implements ReadAt and is not a *minio.Object, use the ReadAt-based parallel uploader.
info, err = c.putObjectMultipartStreamFromReadAt(ctx, bucketName, objectName, reader.(io.ReaderAt), size, opts)
} else {
@@ -266,6 +269,9 @@ func (c *Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketN
// Sort all completed parts.
sort.Sort(completedParts(complMultipartUpload.Parts))

+ opts = PutObjectOptions{
+ ServerSideEncryption: opts.ServerSideEncryption,
+ }
if withChecksum {
// Add hash of hashes.
crc := crc32.New(crc32.MakeTable(crc32.Castagnoli))
@@ -278,7 +284,7 @@ func (c *Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketN
opts.UserMetadata = map[string]string{"X-Amz-Checksum-Crc32c": base64.StdEncoding.EncodeToString(crc.Sum(nil))}
}

- uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, PutObjectOptions{})
+ uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, opts)
if err != nil {
return UploadInfo{}, err
}
@@ -425,6 +431,211 @@ func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, b
// Sort all completed parts.
sort.Sort(completedParts(complMultipartUpload.Parts))

+ opts = PutObjectOptions{
+ ServerSideEncryption: opts.ServerSideEncryption,
+ }
+ if len(crcBytes) > 0 {
+ // Add hash of hashes.
+ crc.Reset()
+ crc.Write(crcBytes)
+ opts.UserMetadata = map[string]string{"X-Amz-Checksum-Crc32c": base64.StdEncoding.EncodeToString(crc.Sum(nil))}
+ }
+ uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, opts)
+ if err != nil {
+ return UploadInfo{}, err
+ }
+
+ uploadInfo.Size = totalUploadedSize
+ return uploadInfo, nil
+}
+
+// putObjectMultipartStreamParallel uploads opts.NumThreads parts in parallel.
+// This is expected to take opts.PartSize * opts.NumThreads * (GOGC / 100) bytes of buffer.
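+ // The stream is read sequentially into recycled buffers; each filled
+ // buffer is uploaded as a separate part by its own goroutine.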
+func (c *Client) putObjectMultipartStreamParallel(ctx context.Context, bucketName, objectName string,
+ reader io.Reader, opts PutObjectOptions) (info UploadInfo, err error) {
+ // Input validation.
+ if err = s3utils.CheckValidBucketName(bucketName); err != nil {
+ return UploadInfo{}, err
+ }
+
+ if err = s3utils.CheckValidObjectName(objectName); err != nil {
+ return UploadInfo{}, err
+ }
+
+ if !opts.SendContentMd5 {
+ if opts.UserMetadata == nil {
+ opts.UserMetadata = make(map[string]string, 1)
+ }
+ opts.UserMetadata["X-Amz-Checksum-Algorithm"] = "CRC32C"
+ }
+
+ // Cancel all when an error occurs.
+ ctx, cancel := context.WithCancel(ctx)
+ defer cancel()
+
+ // Calculate the optimal parts info for a given size.
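+ // With an unknown stream size (-1), the part size comes from
+ // opts.PartSize (or a library default when unset) and totalPartsCount
+ // is sized for the maximum allowed object size.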
+ totalPartsCount, partSize, _, err := OptimalPartInfo(-1, opts.PartSize)
+ if err != nil {
+ return UploadInfo{}, err
+ }
+
+ // Initiates a new multipart request
+ uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts)
+ if err != nil {
+ return UploadInfo{}, err
+ }
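+ // The X-Amz-Checksum-Algorithm entry only applies to initiating the
+ // multipart upload; remove it so it is not resent on later requests.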
+ delete(opts.UserMetadata, "X-Amz-Checksum-Algorithm")
+
+ // Aborts the multipart upload if the function returns
+ // any error, since we do not resume we should purge
+ // the parts which have been uploaded to relinquish
+ // storage space.
+ defer func() {
+ if err != nil {
+ c.abortMultipartUpload(ctx, bucketName, objectName, uploadID)
+ }
+ }()
+
+ // Create checksums
+ // CRC32C is ~50% faster on AMD64 @ 30GB/s
+ var crcBytes []byte
+ crc := crc32.New(crc32.MakeTable(crc32.Castagnoli))
+ md5Hash := c.md5Hasher()
+ defer md5Hash.Close()
+
+ // Total data read from the stream and uploaded to the server.
+ var totalUploadedSize int64
+
+ // Initialize parts uploaded map.
+ partsInfo := make(map[int]ObjectPart)
+
+ // Create a pool of part buffers carved from a single allocation;
+ // free buffers are recycled through the channel.
+ nBuffers := int64(opts.NumThreads)
+ bufs := make(chan []byte, nBuffers)
+ all := make([]byte, nBuffers*partSize)
+ for i := int64(0); i < nBuffers; i++ {
+ bufs <- all[i*partSize : i*partSize+partSize]
+ }
+
+ var wg sync.WaitGroup
+ var mu sync.Mutex
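+ // Buffered to NumThreads so an uploading goroutine never blocks when
+ // reporting a failure.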
+ errCh := make(chan error, opts.NumThreads)
+
+ reader = newHook(reader, opts.Progress)
+
+ // Part number always starts with '1'.
+ var partNumber int
+ for partNumber = 1; partNumber <= totalPartsCount; partNumber++ {
+ // Acquire a free buffer, or abort if a previous part upload failed.
+ var buf []byte
+ select {
+ case buf = <-bufs:
+ case err = <-errCh:
+ cancel()
+ wg.Wait()
+ return UploadInfo{}, err
+ }
+
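+ // A recycled buffer must come back at full partSize length.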
+ if int64(len(buf)) != partSize {
+ return UploadInfo{}, fmt.Errorf("read buffer length %d does not match expected partSize %d", len(buf), partSize)
+ }
+
+ length, rerr := readFull(reader, buf)
+ if rerr == io.EOF && partNumber > 1 {
+ // Done
+ break
+ }
+
+ if rerr != nil && rerr != io.ErrUnexpectedEOF && err != io.EOF {
+ cancel()
+ wg.Wait()
+ return UploadInfo{}, rerr
+ }
+
+ // Attach the per-part checksum header.
+ customHeader := make(http.Header)
+ if !opts.SendContentMd5 {
+ // Not sending MD5, add CRC32C for per-part integrity instead.
+ crc.Reset()
+ crc.Write(buf[:length])
+ cSum := crc.Sum(nil)
+ customHeader.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(cSum))
+ crcBytes = append(crcBytes, cSum...)
+ }
+
+ // Compute the MD5 before handing the buffer to a goroutine;
+ // md5Hash is shared across parts and must not be used concurrently.
+ var md5Base64 string
+ if opts.SendContentMd5 {
+ md5Hash.Reset()
+ md5Hash.Write(buf[:length])
+ md5Base64 = base64.StdEncoding.EncodeToString(md5Hash.Sum(nil))
+ }
+
+ wg.Add(1)
+ go func(partNumber int) {
+ defer wg.Done()
+ p := uploadPartParams{
+ bucketName: bucketName,
+ objectName: objectName,
+ uploadID: uploadID,
+ reader: bytes.NewReader(buf[:length]),
+ partNumber: partNumber,
+ md5Base64: md5Base64,
+ size: int64(length),
+ sse: opts.ServerSideEncryption,
+ streamSha256: !opts.DisableContentSha256,
+ customHeader: customHeader,
+ }
+ objPart, uerr := c.uploadPart(ctx, p)
+ if uerr != nil {
+ errCh <- uerr
+ // Return early so a zero-value part is not recorded below.
+ return
+ }
+
+ // Save successfully uploaded part metadata.
+ mu.Lock()
+ partsInfo[partNumber] = objPart
+ mu.Unlock()
+
+ // Send buffer back so it can be reused.
+ bufs <- buf
+ }(partNumber)
+
+ // Save successfully uploaded size.
+ totalUploadedSize += int64(length)
+ }
+ wg.Wait()
+
+ // Collect any error
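+ // All goroutines have exited (wg.Wait above), so a non-blocking
+ // receive is enough to drain any stored failure.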
+ select {
+ case err = <-errCh:
+ return UploadInfo{}, err
+ default:
+ }
+
+ // Complete multipart upload.
+ var complMultipartUpload completeMultipartUpload
+
+ // Loop over total uploaded parts to save them in
+ // Parts array before completing the multipart request.
+ for i := 1; i < partNumber; i++ {
+ part, ok := partsInfo[i]
+ if !ok {
+ return UploadInfo{}, errInvalidArgument(fmt.Sprintf("Missing part number %d", i))
+ }
+ complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{
+ ETag: part.ETag,
+ PartNumber: part.PartNumber,
+ ChecksumCRC32: part.ChecksumCRC32,
+ ChecksumCRC32C: part.ChecksumCRC32C,
+ ChecksumSHA1: part.ChecksumSHA1,
+ ChecksumSHA256: part.ChecksumSHA256,
+ })
+ }
+
+ // Sort all completed parts.
+ sort.Sort(completedParts(complMultipartUpload.Parts))
+
opts = PutObjectOptions{}
if len(crcBytes) > 0 {
// Add hash of hashes.