summaryrefslogtreecommitdiff
path: root/vendor/github.com/minio/minio-go/v7/api-put-object-streaming.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/minio/minio-go/v7/api-put-object-streaming.go')
-rw-r--r--vendor/github.com/minio/minio-go/v7/api-put-object-streaming.go108
1 files changed, 66 insertions, 42 deletions
diff --git a/vendor/github.com/minio/minio-go/v7/api-put-object-streaming.go b/vendor/github.com/minio/minio-go/v7/api-put-object-streaming.go
index 11b3a5255..464bde7f3 100644
--- a/vendor/github.com/minio/minio-go/v7/api-put-object-streaming.go
+++ b/vendor/github.com/minio/minio-go/v7/api-put-object-streaming.go
@@ -22,6 +22,7 @@ import (
"context"
"encoding/base64"
"fmt"
+ "hash/crc32"
"io"
"net/http"
"net/url"
@@ -38,9 +39,8 @@ import (
//
// Following code handles these types of readers.
//
-// - *minio.Object
-// - Any reader which has a method 'ReadAt()'
-//
+// - *minio.Object
+// - Any reader which has a method 'ReadAt()'
func (c *Client) putObjectMultipartStream(ctx context.Context, bucketName, objectName string,
reader io.Reader, size int64, opts PutObjectOptions,
) (info UploadInfo, err error) {
@@ -184,12 +184,7 @@ func (c *Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketN
sectionReader := newHook(io.NewSectionReader(reader, readOffset, partSize), opts.Progress)
// Proceed to upload the part.
- objPart, err := c.uploadPart(ctx, bucketName, objectName,
- uploadID, sectionReader, uploadReq.PartNum,
- "", "", partSize,
- opts.ServerSideEncryption,
- !opts.DisableContentSha256,
- )
+ objPart, err := c.uploadPart(ctx, bucketName, objectName, uploadID, sectionReader, uploadReq.PartNum, "", "", partSize, opts.ServerSideEncryption, !opts.DisableContentSha256, nil)
if err != nil {
uploadedPartsCh <- uploadedPartRes{
Error: err,
@@ -260,6 +255,13 @@ func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, b
return UploadInfo{}, err
}
+ if !opts.SendContentMd5 {
+ if opts.UserMetadata == nil {
+ opts.UserMetadata = make(map[string]string, 1)
+ }
+ opts.UserMetadata["X-Amz-Checksum-Algorithm"] = "CRC32C"
+ }
+
// Calculate the optimal parts info for a given size.
totalPartsCount, partSize, lastPartSize, err := OptimalPartInfo(size, opts.PartSize)
if err != nil {
@@ -270,6 +272,7 @@ func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, b
if err != nil {
return UploadInfo{}, err
}
+ delete(opts.UserMetadata, "X-Amz-Checksum-Algorithm")
// Aborts the multipart upload if the function returns
// any error, since we do not resume we should purge
@@ -281,6 +284,14 @@ func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, b
}
}()
+ // Create checksums
+ // CRC32C is ~50% faster on AMD64 @ 30GB/s
+ var crcBytes []byte
+ customHeader := make(http.Header)
+ crc := crc32.New(crc32.MakeTable(crc32.Castagnoli))
+ md5Hash := c.md5Hasher()
+ defer md5Hash.Close()
+
// Total data read and written to server. should be equal to 'size' at the end of the call.
var totalUploadedSize int64
@@ -292,7 +303,6 @@ func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, b
// Avoid declaring variables in the for loop
var md5Base64 string
- var hookReader io.Reader
// Part number always starts with '1'.
var partNumber int
@@ -303,37 +313,34 @@ func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, b
partSize = lastPartSize
}
- if opts.SendContentMd5 {
- length, rerr := readFull(reader, buf)
- if rerr == io.EOF && partNumber > 1 {
- break
- }
-
- if rerr != nil && rerr != io.ErrUnexpectedEOF && err != io.EOF {
- return UploadInfo{}, rerr
- }
+ length, rerr := readFull(reader, buf)
+ if rerr == io.EOF && partNumber > 1 {
+ break
+ }
- // Calculate md5sum.
- hash := c.md5Hasher()
- hash.Write(buf[:length])
- md5Base64 = base64.StdEncoding.EncodeToString(hash.Sum(nil))
- hash.Close()
+ if rerr != nil && rerr != io.ErrUnexpectedEOF && err != io.EOF {
+ return UploadInfo{}, rerr
+ }
- // Update progress reader appropriately to the latest offset
- // as we read from the source.
- hookReader = newHook(bytes.NewReader(buf[:length]), opts.Progress)
+ // Calculate md5sum.
+ if opts.SendContentMd5 {
+ md5Hash.Reset()
+ md5Hash.Write(buf[:length])
+ md5Base64 = base64.StdEncoding.EncodeToString(md5Hash.Sum(nil))
} else {
- // Update progress reader appropriately to the latest offset
- // as we read from the source.
- hookReader = newHook(reader, opts.Progress)
+ // Add CRC32C instead.
+ crc.Reset()
+ crc.Write(buf[:length])
+ cSum := crc.Sum(nil)
+ customHeader.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(cSum))
+ crcBytes = append(crcBytes, cSum...)
}
- objPart, uerr := c.uploadPart(ctx, bucketName, objectName, uploadID,
- io.LimitReader(hookReader, partSize),
- partNumber, md5Base64, "", partSize,
- opts.ServerSideEncryption,
- !opts.DisableContentSha256,
- )
+ // Update progress reader appropriately to the latest offset
+ // as we read from the source.
+ hooked := newHook(bytes.NewReader(buf[:length]), opts.Progress)
+
+ objPart, uerr := c.uploadPart(ctx, bucketName, objectName, uploadID, hooked, partNumber, md5Base64, "", partSize, opts.ServerSideEncryption, !opts.DisableContentSha256, customHeader)
if uerr != nil {
return UploadInfo{}, uerr
}
@@ -363,15 +370,26 @@ func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, b
return UploadInfo{}, errInvalidArgument(fmt.Sprintf("Missing part number %d", i))
}
complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{
- ETag: part.ETag,
- PartNumber: part.PartNumber,
+ ETag: part.ETag,
+ PartNumber: part.PartNumber,
+ ChecksumCRC32: part.ChecksumCRC32,
+ ChecksumCRC32C: part.ChecksumCRC32C,
+ ChecksumSHA1: part.ChecksumSHA1,
+ ChecksumSHA256: part.ChecksumSHA256,
})
}
// Sort all completed parts.
sort.Sort(completedParts(complMultipartUpload.Parts))
- uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, PutObjectOptions{})
+ opts = PutObjectOptions{}
+ if len(crcBytes) > 0 {
+ // Add hash of hashes.
+ crc.Reset()
+ crc.Write(crcBytes)
+ opts.UserMetadata = map[string]string{"X-Amz-Checksum-Crc32c": base64.StdEncoding.EncodeToString(crc.Sum(nil))}
+ }
+ uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, opts)
if err != nil {
return UploadInfo{}, err
}
@@ -490,14 +508,20 @@ func (c *Client) putObjectDo(ctx context.Context, bucketName, objectName string,
// extract lifecycle expiry date and rule ID
expTime, ruleID := amzExpirationToExpiryDateRuleID(resp.Header.Get(amzExpiration))
-
+ h := resp.Header
return UploadInfo{
Bucket: bucketName,
Key: objectName,
- ETag: trimEtag(resp.Header.Get("ETag")),
- VersionID: resp.Header.Get(amzVersionID),
+ ETag: trimEtag(h.Get("ETag")),
+ VersionID: h.Get(amzVersionID),
Size: size,
Expiration: expTime,
ExpirationRuleID: ruleID,
+
+ // Checksum values
+ ChecksumCRC32: h.Get("x-amz-checksum-crc32"),
+ ChecksumCRC32C: h.Get("x-amz-checksum-crc32c"),
+ ChecksumSHA1: h.Get("x-amz-checksum-sha1"),
+ ChecksumSHA256: h.Get("x-amz-checksum-sha256"),
}, nil
}