summaryrefslogtreecommitdiff
path: root/vendor/github.com/minio/minio-go/v7/api-put-object-streaming.go
diff options
context:
space:
mode:
authorLibravatar kim <89579420+NyaaaWhatsUpDoc@users.noreply.github.com>2022-09-28 18:30:40 +0100
committerLibravatar GitHub <noreply@github.com>2022-09-28 18:30:40 +0100
commita156188b3eb5cb3da44aa1b7452265f5fa38a607 (patch)
tree7097fa48d56fbabc7c2c8750b1f3bc9321d71c0f /vendor/github.com/minio/minio-go/v7/api-put-object-streaming.go
parent[bugfix] Fix emphasis being added to emoji shortcodes with markdown parsing (... (diff)
downloadgotosocial-a156188b3eb5cb3da44aa1b7452265f5fa38a607.tar.xz
[chore] update dependencies, bump to Go 1.19.1 (#826)
* update dependencies, bump Go version to 1.19 * bump test image Go version * update golangci-lint * update gotosocial-drone-build * sign * linting, go fmt * update swagger docs * update swagger docs * whitespace * update contributing.md * fuckin whoopsie doopsie * linterino, linteroni * fix followrequest test not starting processor * fix other api/client tests not starting processor * fix remaining tests where processor not started * bump go-runners version * don't check last-webfingered-at, processor may have updated this * update swagger command * update bun to latest version * fix embed to work the same as before with new bun Signed-off-by: kim <grufwub@gmail.com> Co-authored-by: tsmethurst <tobi.smethurst@protonmail.com>
Diffstat (limited to 'vendor/github.com/minio/minio-go/v7/api-put-object-streaming.go')
-rw-r--r--vendor/github.com/minio/minio-go/v7/api-put-object-streaming.go98
1 files changed, 55 insertions, 43 deletions
diff --git a/vendor/github.com/minio/minio-go/v7/api-put-object-streaming.go b/vendor/github.com/minio/minio-go/v7/api-put-object-streaming.go
index 2497aecf3..11b3a5255 100644
--- a/vendor/github.com/minio/minio-go/v7/api-put-object-streaming.go
+++ b/vendor/github.com/minio/minio-go/v7/api-put-object-streaming.go
@@ -130,34 +130,44 @@ func (c *Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketN
var complMultipartUpload completeMultipartUpload
// Declare a channel that sends the next part number to be uploaded.
- // Buffered to 10000 because thats the maximum number of parts allowed
- // by S3.
- uploadPartsCh := make(chan uploadPartReq, 10000)
+ uploadPartsCh := make(chan uploadPartReq)
// Declare a channel that sends back the response of a part upload.
- // Buffered to 10000 because thats the maximum number of parts allowed
- // by S3.
- uploadedPartsCh := make(chan uploadedPartRes, 10000)
+ uploadedPartsCh := make(chan uploadedPartRes)
// Used for readability, lastPartNumber is always totalPartsCount.
lastPartNumber := totalPartsCount
+ partitionCtx, partitionCancel := context.WithCancel(ctx)
+ defer partitionCancel()
// Send each part number to the channel to be processed.
- for p := 1; p <= totalPartsCount; p++ {
- uploadPartsCh <- uploadPartReq{PartNum: p}
- }
- close(uploadPartsCh)
-
- partsBuf := make([][]byte, opts.getNumThreads())
- for i := range partsBuf {
- partsBuf[i] = make([]byte, 0, partSize)
- }
+ go func() {
+ defer close(uploadPartsCh)
+
+ for p := 1; p <= totalPartsCount; p++ {
+ select {
+ case <-partitionCtx.Done():
+ return
+ case uploadPartsCh <- uploadPartReq{PartNum: p}:
+ }
+ }
+ }()
// Receive each part number from the channel allowing three parallel uploads.
for w := 1; w <= opts.getNumThreads(); w++ {
- go func(w int, partSize int64) {
- // Each worker will draw from the part channel and upload in parallel.
- for uploadReq := range uploadPartsCh {
+ go func(partSize int64) {
+ for {
+ var uploadReq uploadPartReq
+ var ok bool
+ select {
+ case <-ctx.Done():
+ return
+ case uploadReq, ok = <-uploadPartsCh:
+ if !ok {
+ return
+ }
+ // Each worker will draw from the part channel and upload in parallel.
+ }
// If partNumber was not uploaded we calculate the missing
// part offset and size. For all other part numbers we
@@ -171,22 +181,15 @@ func (c *Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketN
partSize = lastPartSize
}
- n, rerr := readFull(io.NewSectionReader(reader, readOffset, partSize), partsBuf[w-1][:partSize])
- if rerr != nil && rerr != io.ErrUnexpectedEOF && rerr != io.EOF {
- uploadedPartsCh <- uploadedPartRes{
- Error: rerr,
- }
- // Exit the goroutine.
- return
- }
-
- // Get a section reader on a particular offset.
- hookReader := newHook(bytes.NewReader(partsBuf[w-1][:n]), opts.Progress)
+ sectionReader := newHook(io.NewSectionReader(reader, readOffset, partSize), opts.Progress)
// Proceed to upload the part.
objPart, err := c.uploadPart(ctx, bucketName, objectName,
- uploadID, hookReader, uploadReq.PartNum,
- "", "", partSize, opts.ServerSideEncryption)
+ uploadID, sectionReader, uploadReq.PartNum,
+ "", "", partSize,
+ opts.ServerSideEncryption,
+ !opts.DisableContentSha256,
+ )
if err != nil {
uploadedPartsCh <- uploadedPartRes{
Error: err,
@@ -205,23 +208,28 @@ func (c *Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketN
Part: uploadReq.Part,
}
}
- }(w, partSize)
+ }(partSize)
}
// Gather the responses as they occur and update any
// progress bar.
for u := 1; u <= totalPartsCount; u++ {
- uploadRes := <-uploadedPartsCh
- if uploadRes.Error != nil {
- return UploadInfo{}, uploadRes.Error
+ select {
+ case <-ctx.Done():
+ return UploadInfo{}, ctx.Err()
+ case uploadRes := <-uploadedPartsCh:
+ if uploadRes.Error != nil {
+
+ return UploadInfo{}, uploadRes.Error
+ }
+
+ // Update the totalUploadedSize.
+ totalUploadedSize += uploadRes.Size
+ complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{
+ ETag: uploadRes.Part.ETag,
+ PartNumber: uploadRes.Part.PartNumber,
+ })
}
- // Update the totalUploadedSize.
- totalUploadedSize += uploadRes.Size
- // Store the parts to be completed in order.
- complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{
- ETag: uploadRes.Part.ETag,
- PartNumber: uploadRes.Part.PartNumber,
- })
}
// Verify if we uploaded all the data.
@@ -322,7 +330,10 @@ func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, b
objPart, uerr := c.uploadPart(ctx, bucketName, objectName, uploadID,
io.LimitReader(hookReader, partSize),
- partNumber, md5Base64, "", partSize, opts.ServerSideEncryption)
+ partNumber, md5Base64, "", partSize,
+ opts.ServerSideEncryption,
+ !opts.DisableContentSha256,
+ )
if uerr != nil {
return UploadInfo{}, uerr
}
@@ -452,6 +463,7 @@ func (c *Client) putObjectDo(ctx context.Context, bucketName, objectName string,
contentLength: size,
contentMD5Base64: md5Base64,
contentSHA256Hex: sha256Hex,
+ streamSha256: !opts.DisableContentSha256,
}
if opts.Internal.SourceVersionID != "" {
if opts.Internal.SourceVersionID != nullVersionID {