diff options
author | 2022-09-28 18:30:40 +0100 | |
---|---|---|
committer | 2022-09-28 18:30:40 +0100 | |
commit | a156188b3eb5cb3da44aa1b7452265f5fa38a607 (patch) | |
tree | 7097fa48d56fbabc7c2c8750b1f3bc9321d71c0f /vendor/github.com/minio/minio-go/v7/api-put-object-streaming.go | |
parent | [bugfix] Fix emphasis being added to emoji shortcodes with markdown parsing (... (diff) | |
download | gotosocial-a156188b3eb5cb3da44aa1b7452265f5fa38a607.tar.xz |
[chore] update dependencies, bump to Go 1.19.1 (#826)
* update dependencies, bump Go version to 1.19
* bump test image Go version
* update golangci-lint
* update gotosocial-drone-build
* sign
* linting, go fmt
* update swagger docs
* update swagger docs
* whitespace
* update contributing.md
* fuckin whoopsie doopsie
* linterino, linteroni
* fix followrequest test not starting processor
* fix other api/client tests not starting processor
* fix remaining tests where processor not started
* bump go-runners version
* don't check last-webfingered-at, processor may have updated this
* update swagger command
* update bun to latest version
* fix embed to work the same as before with new bun
Signed-off-by: kim <grufwub@gmail.com>
Co-authored-by: tsmethurst <tobi.smethurst@protonmail.com>
Diffstat (limited to 'vendor/github.com/minio/minio-go/v7/api-put-object-streaming.go')
-rw-r--r-- | vendor/github.com/minio/minio-go/v7/api-put-object-streaming.go | 98 |
1 files changed, 55 insertions, 43 deletions
diff --git a/vendor/github.com/minio/minio-go/v7/api-put-object-streaming.go b/vendor/github.com/minio/minio-go/v7/api-put-object-streaming.go index 2497aecf3..11b3a5255 100644 --- a/vendor/github.com/minio/minio-go/v7/api-put-object-streaming.go +++ b/vendor/github.com/minio/minio-go/v7/api-put-object-streaming.go @@ -130,34 +130,44 @@ func (c *Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketN var complMultipartUpload completeMultipartUpload // Declare a channel that sends the next part number to be uploaded. - // Buffered to 10000 because thats the maximum number of parts allowed - // by S3. - uploadPartsCh := make(chan uploadPartReq, 10000) + uploadPartsCh := make(chan uploadPartReq) // Declare a channel that sends back the response of a part upload. - // Buffered to 10000 because thats the maximum number of parts allowed - // by S3. - uploadedPartsCh := make(chan uploadedPartRes, 10000) + uploadedPartsCh := make(chan uploadedPartRes) // Used for readability, lastPartNumber is always totalPartsCount. lastPartNumber := totalPartsCount + partitionCtx, partitionCancel := context.WithCancel(ctx) + defer partitionCancel() // Send each part number to the channel to be processed. - for p := 1; p <= totalPartsCount; p++ { - uploadPartsCh <- uploadPartReq{PartNum: p} - } - close(uploadPartsCh) - - partsBuf := make([][]byte, opts.getNumThreads()) - for i := range partsBuf { - partsBuf[i] = make([]byte, 0, partSize) - } + go func() { + defer close(uploadPartsCh) + + for p := 1; p <= totalPartsCount; p++ { + select { + case <-partitionCtx.Done(): + return + case uploadPartsCh <- uploadPartReq{PartNum: p}: + } + } + }() // Receive each part number from the channel allowing three parallel uploads. for w := 1; w <= opts.getNumThreads(); w++ { - go func(w int, partSize int64) { - // Each worker will draw from the part channel and upload in parallel. - for uploadReq := range uploadPartsCh { + go func(partSize int64) { + for { + var uploadReq uploadPartReq + var ok bool + select { + case <-ctx.Done(): + return + case uploadReq, ok = <-uploadPartsCh: + if !ok { + return + } + // Each worker will draw from the part channel and upload in parallel. + } // If partNumber was not uploaded we calculate the missing // part offset and size. For all other part numbers we @@ -171,22 +181,15 @@ func (c *Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketN partSize = lastPartSize } - n, rerr := readFull(io.NewSectionReader(reader, readOffset, partSize), partsBuf[w-1][:partSize]) - if rerr != nil && rerr != io.ErrUnexpectedEOF && rerr != io.EOF { - uploadedPartsCh <- uploadedPartRes{ - Error: rerr, - } - // Exit the goroutine. - return - } - - // Get a section reader on a particular offset. - hookReader := newHook(bytes.NewReader(partsBuf[w-1][:n]), opts.Progress) + sectionReader := newHook(io.NewSectionReader(reader, readOffset, partSize), opts.Progress) // Proceed to upload the part. objPart, err := c.uploadPart(ctx, bucketName, objectName, - uploadID, hookReader, uploadReq.PartNum, - "", "", partSize, opts.ServerSideEncryption) + uploadID, sectionReader, uploadReq.PartNum, + "", "", partSize, + opts.ServerSideEncryption, + !opts.DisableContentSha256, + ) if err != nil { uploadedPartsCh <- uploadedPartRes{ Error: err, @@ -205,23 +208,28 @@ func (c *Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketN Part: uploadReq.Part, } } - }(w, partSize) + }(partSize) } // Gather the responses as they occur and update any // progress bar. for u := 1; u <= totalPartsCount; u++ { - uploadRes := <-uploadedPartsCh - if uploadRes.Error != nil { - return UploadInfo{}, uploadRes.Error + select { + case <-ctx.Done(): + return UploadInfo{}, ctx.Err() + case uploadRes := <-uploadedPartsCh: + if uploadRes.Error != nil { + + return UploadInfo{}, uploadRes.Error + } + + // Update the totalUploadedSize. + totalUploadedSize += uploadRes.Size + complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{ + ETag: uploadRes.Part.ETag, + PartNumber: uploadRes.Part.PartNumber, + }) } - // Update the totalUploadedSize. - totalUploadedSize += uploadRes.Size - // Store the parts to be completed in order. - complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{ - ETag: uploadRes.Part.ETag, - PartNumber: uploadRes.Part.PartNumber, - }) } // Verify if we uploaded all the data. @@ -322,7 +330,10 @@ func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, b objPart, uerr := c.uploadPart(ctx, bucketName, objectName, uploadID, io.LimitReader(hookReader, partSize), - partNumber, md5Base64, "", partSize, opts.ServerSideEncryption) + partNumber, md5Base64, "", partSize, + opts.ServerSideEncryption, + !opts.DisableContentSha256, + ) if uerr != nil { return UploadInfo{}, uerr } @@ -452,6 +463,7 @@ func (c *Client) putObjectDo(ctx context.Context, bucketName, objectName string, contentLength: size, contentMD5Base64: md5Base64, contentSHA256Hex: sha256Hex, + streamSha256: !opts.DisableContentSha256, } if opts.Internal.SourceVersionID != "" { if opts.Internal.SourceVersionID != nullVersionID { |