diff options
Diffstat (limited to 'vendor/github.com/minio/minio-go/v7/api-put-object-streaming.go')
-rw-r--r-- | vendor/github.com/minio/minio-go/v7/api-put-object-streaming.go | 108 |
1 files changed, 66 insertions, 42 deletions
diff --git a/vendor/github.com/minio/minio-go/v7/api-put-object-streaming.go b/vendor/github.com/minio/minio-go/v7/api-put-object-streaming.go index 11b3a5255..464bde7f3 100644 --- a/vendor/github.com/minio/minio-go/v7/api-put-object-streaming.go +++ b/vendor/github.com/minio/minio-go/v7/api-put-object-streaming.go @@ -22,6 +22,7 @@ import ( "context" "encoding/base64" "fmt" + "hash/crc32" "io" "net/http" "net/url" @@ -38,9 +39,8 @@ import ( // // Following code handles these types of readers. // -// - *minio.Object -// - Any reader which has a method 'ReadAt()' -// +// - *minio.Object +// - Any reader which has a method 'ReadAt()' func (c *Client) putObjectMultipartStream(ctx context.Context, bucketName, objectName string, reader io.Reader, size int64, opts PutObjectOptions, ) (info UploadInfo, err error) { @@ -184,12 +184,7 @@ func (c *Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketN sectionReader := newHook(io.NewSectionReader(reader, readOffset, partSize), opts.Progress) // Proceed to upload the part. - objPart, err := c.uploadPart(ctx, bucketName, objectName, - uploadID, sectionReader, uploadReq.PartNum, - "", "", partSize, - opts.ServerSideEncryption, - !opts.DisableContentSha256, - ) + objPart, err := c.uploadPart(ctx, bucketName, objectName, uploadID, sectionReader, uploadReq.PartNum, "", "", partSize, opts.ServerSideEncryption, !opts.DisableContentSha256, nil) if err != nil { uploadedPartsCh <- uploadedPartRes{ Error: err, @@ -260,6 +255,13 @@ func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, b return UploadInfo{}, err } + if !opts.SendContentMd5 { + if opts.UserMetadata == nil { + opts.UserMetadata = make(map[string]string, 1) + } + opts.UserMetadata["X-Amz-Checksum-Algorithm"] = "CRC32C" + } + // Calculate the optimal parts info for a given size. totalPartsCount, partSize, lastPartSize, err := OptimalPartInfo(size, opts.PartSize) if err != nil { @@ -270,6 +272,7 @@ func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, b if err != nil { return UploadInfo{}, err } + delete(opts.UserMetadata, "X-Amz-Checksum-Algorithm") // Aborts the multipart upload if the function returns // any error, since we do not resume we should purge @@ -281,6 +284,14 @@ func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, b } }() + // Create checksums + // CRC32C is ~50% faster on AMD64 @ 30GB/s + var crcBytes []byte + customHeader := make(http.Header) + crc := crc32.New(crc32.MakeTable(crc32.Castagnoli)) + md5Hash := c.md5Hasher() + defer md5Hash.Close() + // Total data read and written to server. should be equal to 'size' at the end of the call. var totalUploadedSize int64 @@ -292,7 +303,6 @@ func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, b // Avoid declaring variables in the for loop var md5Base64 string - var hookReader io.Reader // Part number always starts with '1'. var partNumber int @@ -303,37 +313,34 @@ func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, b partSize = lastPartSize } - if opts.SendContentMd5 { - length, rerr := readFull(reader, buf) - if rerr == io.EOF && partNumber > 1 { - break - } - - if rerr != nil && rerr != io.ErrUnexpectedEOF && err != io.EOF { - return UploadInfo{}, rerr - } + length, rerr := readFull(reader, buf) + if rerr == io.EOF && partNumber > 1 { + break + } - // Calculate md5sum. - hash := c.md5Hasher() - hash.Write(buf[:length]) - md5Base64 = base64.StdEncoding.EncodeToString(hash.Sum(nil)) - hash.Close() + if rerr != nil && rerr != io.ErrUnexpectedEOF && err != io.EOF { + return UploadInfo{}, rerr + } - // Update progress reader appropriately to the latest offset - // as we read from the source. - hookReader = newHook(bytes.NewReader(buf[:length]), opts.Progress) + // Calculate md5sum. + if opts.SendContentMd5 { + md5Hash.Reset() + md5Hash.Write(buf[:length]) + md5Base64 = base64.StdEncoding.EncodeToString(md5Hash.Sum(nil)) } else { - // Update progress reader appropriately to the latest offset - // as we read from the source. - hookReader = newHook(reader, opts.Progress) + // Add CRC32C instead. + crc.Reset() + crc.Write(buf[:length]) + cSum := crc.Sum(nil) + customHeader.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(cSum)) + crcBytes = append(crcBytes, cSum...) } - objPart, uerr := c.uploadPart(ctx, bucketName, objectName, uploadID, - io.LimitReader(hookReader, partSize), - partNumber, md5Base64, "", partSize, - opts.ServerSideEncryption, - !opts.DisableContentSha256, - ) + // Update progress reader appropriately to the latest offset + // as we read from the source. + hooked := newHook(bytes.NewReader(buf[:length]), opts.Progress) + + objPart, uerr := c.uploadPart(ctx, bucketName, objectName, uploadID, hooked, partNumber, md5Base64, "", partSize, opts.ServerSideEncryption, !opts.DisableContentSha256, customHeader) if uerr != nil { return UploadInfo{}, uerr } @@ -363,15 +370,26 @@ func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, b return UploadInfo{}, errInvalidArgument(fmt.Sprintf("Missing part number %d", i)) } complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{ - ETag: part.ETag, - PartNumber: part.PartNumber, + ETag: part.ETag, + PartNumber: part.PartNumber, + ChecksumCRC32: part.ChecksumCRC32, + ChecksumCRC32C: part.ChecksumCRC32C, + ChecksumSHA1: part.ChecksumSHA1, + ChecksumSHA256: part.ChecksumSHA256, }) } // Sort all completed parts. sort.Sort(completedParts(complMultipartUpload.Parts)) - uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, PutObjectOptions{}) + opts = PutObjectOptions{} + if len(crcBytes) > 0 { + // Add hash of hashes. + crc.Reset() + crc.Write(crcBytes) + opts.UserMetadata = map[string]string{"X-Amz-Checksum-Crc32c": base64.StdEncoding.EncodeToString(crc.Sum(nil))} + } + uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, opts) if err != nil { return UploadInfo{}, err } @@ -490,14 +508,20 @@ func (c *Client) putObjectDo(ctx context.Context, bucketName, objectName string, // extract lifecycle expiry date and rule ID expTime, ruleID := amzExpirationToExpiryDateRuleID(resp.Header.Get(amzExpiration)) - + h := resp.Header return UploadInfo{ Bucket: bucketName, Key: objectName, - ETag: trimEtag(resp.Header.Get("ETag")), - VersionID: resp.Header.Get(amzVersionID), + ETag: trimEtag(h.Get("ETag")), + VersionID: h.Get(amzVersionID), Size: size, Expiration: expTime, ExpirationRuleID: ruleID, + + // Checksum values + ChecksumCRC32: h.Get("x-amz-checksum-crc32"), + ChecksumCRC32C: h.Get("x-amz-checksum-crc32c"), + ChecksumSHA1: h.Get("x-amz-checksum-sha1"), + ChecksumSHA256: h.Get("x-amz-checksum-sha256"), }, nil } |