Diffstat (limited to 'vendor/codeberg.org/gruf/go-storage/s3/s3.go')
-rw-r--r--  vendor/codeberg.org/gruf/go-storage/s3/s3.go | 470 ----------
1 file changed, 0 insertions(+), 470 deletions(-)
diff --git a/vendor/codeberg.org/gruf/go-storage/s3/s3.go b/vendor/codeberg.org/gruf/go-storage/s3/s3.go
deleted file mode 100644
index ad7686737..000000000
--- a/vendor/codeberg.org/gruf/go-storage/s3/s3.go
+++ /dev/null
@@ -1,470 +0,0 @@
-package s3
-
-import (
- "bytes"
- "context"
- "errors"
- "io"
- "net/http"
-
- "codeberg.org/gruf/go-storage"
- "codeberg.org/gruf/go-storage/internal"
- "github.com/minio/minio-go/v7"
-)
-
-// ensure S3Storage conforms to storage.Storage.
-var _ storage.Storage = (*S3Storage)(nil)
-
-// ensure bytes.Reader conforms to ReaderSize.
-var _ ReaderSize = (*bytes.Reader)(nil)
-
-// ReaderSize is an extension of the io.Reader interface
-// that may be implemented by callers of WriteStream() in
-// order to improve performance. When the size is known, it
-// is passed on to the underlying minio S3 library.
-type ReaderSize interface {
- io.Reader
- Size() int64
-}
-
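For callers that know a stream's length out of band, a small wrapper is enough; bytes.Reader and strings.Reader already provide Size(). A minimal sketch, where sizedReader and fileReader are hypothetical helpers and not part of this package:

package example

import (
	"io"
	"os"
)

// sizedReader pairs an io.Reader with a known byte length so it
// satisfies ReaderSize, letting WriteStream() (defined below)
// take the single-call upload path.
type sizedReader struct {
	io.Reader
	size int64
}

func (r sizedReader) Size() int64 { return r.size }

// fileReader stats an open file up front so its contents can be
// uploaded with a known content length. The caller still owns
// (and closes) the file.
func fileReader(f *os.File) (sizedReader, error) {
	st, err := f.Stat()
	if err != nil {
		return sizedReader{}, err
	}
	return sizedReader{Reader: f, size: st.Size()}, nil
}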
-// DefaultConfig returns the default S3Storage configuration.
-func DefaultConfig() Config {
- return defaultConfig
-}
-
-// immutable default configuration.
-var defaultConfig = Config{
- CoreOpts: minio.Options{},
- PutChunkSize: 5 * 1024 * 1024, // 5MiB (the S3 multipart minimum part size)
- ListSize: 200,
-}
-
-// Config defines options to be used when opening an S3Storage,
-// mostly options for the underlying S3 client library.
-type Config struct {
- // CoreOpts are S3 client options
- // passed during initialization.
- CoreOpts minio.Options
-
- // PutChunkSize is the chunk size (in bytes)
- // to use when sending a byte stream reader
- // of unknown size as a multi-part object.
- PutChunkSize int64
-
- // ListSize determines how many items
- // to include in each list request, made
- // during calls to .WalkKeys().
- ListSize int
-}
-
-// getS3Config returns a valid (and owned!) Config for the given pointer.
-func getS3Config(cfg *Config) Config {
- // See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
- const minChunkSz = 5 * 1024 * 1024
-
- if cfg == nil {
- // use defaults.
- return defaultConfig
- }
-
- // Ensure chunk size meets the minimum required for S3 compatibility.
- if cfg.PutChunkSize <= minChunkSz {
- cfg.PutChunkSize = minChunkSz
- }
-
- // Ensure valid list size.
- if cfg.ListSize <= 0 {
- cfg.ListSize = 200
- }
-
- return Config{
- CoreOpts: cfg.CoreOpts,
- PutChunkSize: cfg.PutChunkSize,
- ListSize: cfg.ListSize,
- }
-}
-
-// S3Storage is a storage implementation that stores key-value
-// pairs in an S3 bucket at a given endpoint.
-type S3Storage struct {
- client *minio.Core
- bucket string
- config Config
-}
-
-// Open opens a new S3Storage instance with the given S3 endpoint URL, bucket name and configuration.
-func Open(endpoint string, bucket string, cfg *Config) (*S3Storage, error) {
- // Check + set config defaults.
- config := getS3Config(cfg)
-
- // Create new S3 client connection to given endpoint.
- client, err := minio.NewCore(endpoint, &config.CoreOpts)
- if err != nil {
- return nil, err
- }
-
- ctx := context.Background()
-
- // Check that provided bucket actually exists.
- exists, err := client.BucketExists(ctx, bucket)
- if err != nil {
- return nil, err
- } else if !exists {
- return nil, errors.New("storage/s3: bucket does not exist")
- }
-
- return &S3Storage{
- client: client,
- bucket: bucket,
- config: config,
- }, nil
-}
-
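For illustration, opening a storage against an S3-compatible endpoint might look like the following sketch. The endpoint, bucket name and credential strings are placeholders; credentials.NewStaticV4 comes from the minio-go credentials subpackage:

package main

import (
	"context"
	"log"

	"codeberg.org/gruf/go-storage/s3"
	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
)

func main() {
	// Start from package defaults, overriding only credentials.
	cfg := s3.DefaultConfig()
	cfg.CoreOpts = minio.Options{
		Creds:  credentials.NewStaticV4("ACCESS_KEY", "SECRET_KEY", ""),
		Secure: true,
	}

	st, err := s3.Open("s3.example.org", "my-bucket", &cfg)
	if err != nil {
		log.Fatal(err)
	}

	// Write a small object to verify connectivity.
	if _, err := st.WriteBytes(context.Background(), "hello.txt", []byte("hi")); err != nil {
		log.Fatal(err)
	}
}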
-// Client returns access to the underlying S3 client.
-func (st *S3Storage) Client() *minio.Core {
- return st.client
-}
-
-// Clean implements Storage.Clean().
-func (st *S3Storage) Clean(ctx context.Context) error {
- return nil // nothing to do for S3
-}
-
-// ReadBytes implements Storage.ReadBytes().
-func (st *S3Storage) ReadBytes(ctx context.Context, key string) ([]byte, error) {
- // Get stream reader for key.
- rc, err := st.ReadStream(ctx, key)
- if err != nil {
- return nil, err
- }
-
- // Read all data to memory.
- data, err := io.ReadAll(rc)
- if err != nil {
- _ = rc.Close()
- return nil, err
- }
-
- // Close storage stream reader.
- if err := rc.Close(); err != nil {
- return nil, err
- }
-
- return data, nil
-}
-
-// ReadStream implements Storage.ReadStream().
-func (st *S3Storage) ReadStream(ctx context.Context, key string) (io.ReadCloser, error) {
- rc, _, _, err := st.GetObject(ctx, key, minio.GetObjectOptions{})
- return rc, err
-}
-
-// GetObject wraps minio.Core{}.GetObject(), wrapping returned errors with our own storage library error types.
-func (st *S3Storage) GetObject(ctx context.Context, key string, opts minio.GetObjectOptions) (io.ReadCloser, minio.ObjectInfo, http.Header, error) {
-
- // Query bucket for object data and info.
- rc, info, hdr, err := st.client.GetObject(
- ctx,
- st.bucket,
- key,
- opts,
- )
- if err != nil {
-
- if isNotFoundError(err) {
- // Wrap not found errors as our not found type.
- err = internal.WrapErr(err, storage.ErrNotFound)
- } else if isObjectNameError(err) {
- // Wrap object name errors as our invalid key type.
- err = internal.WrapErr(err, storage.ErrInvalidKey)
- }
-
- }
-
- return rc, info, hdr, err
-}
-
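Since not-found errors come back wrapped with storage.ErrNotFound, callers can branch on them with errors.Is. A sketch, where readIfExists is an illustrative helper and not part of this package:

package example

import (
	"context"
	"errors"

	"codeberg.org/gruf/go-storage"
	"codeberg.org/gruf/go-storage/s3"
)

// readIfExists returns an object's contents, or nil with no
// error when the key is simply absent from the bucket.
func readIfExists(ctx context.Context, st *s3.S3Storage, key string) ([]byte, error) {
	data, err := st.ReadBytes(ctx, key)
	if errors.Is(err, storage.ErrNotFound) {
		return nil, nil // key not in bucket
	}
	return data, err
}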
-// WriteBytes implements Storage.WriteBytes().
-func (st *S3Storage) WriteBytes(ctx context.Context, key string, value []byte) (int, error) {
- info, err := st.PutObject(ctx, key, bytes.NewReader(value), minio.PutObjectOptions{})
- return int(info.Size), err
-}
-
-// WriteStream implements Storage.WriteStream().
-func (st *S3Storage) WriteStream(ctx context.Context, key string, r io.Reader) (int64, error) {
- info, err := st.PutObject(ctx, key, r, minio.PutObjectOptions{})
- return info.Size, err
-}
-
-// PutObject wraps minio.Core{}.PutObject(), wrapping returned errors with our own storage library error types. For an io.Reader
-// that does not implement ReaderSize{}, it instead handles the upload via minio.Core{}.NewMultipartUpload() in chunks of PutChunkSize.
-func (st *S3Storage) PutObject(ctx context.Context, key string, r io.Reader, opts minio.PutObjectOptions) (minio.UploadInfo, error) {
- if rs, ok := r.(ReaderSize); ok {
- // This reader supports providing us the size of
- // the encompassed data, allowing us to perform
- // a singular .PutObject() call with length.
- info, err := st.client.PutObject(
- ctx,
- st.bucket,
- key,
- r,
- rs.Size(),
- "",
- "",
- opts,
- )
- if err != nil {
-
- if isConflictError(err) {
- // Wrap conflict errors as our already exists type.
- err = internal.WrapErr(err, storage.ErrAlreadyExists)
- } else if isObjectNameError(err) {
- // Wrap object name errors as our invalid key type.
- err = internal.WrapErr(err, storage.ErrInvalidKey)
- }
-
- }
-
- return info, err
- }
-
- // Start a new multipart upload to get ID.
- uploadID, err := st.client.NewMultipartUpload(
- ctx,
- st.bucket,
- key,
- opts,
- )
- if err != nil {
-
- if isConflictError(err) {
- // Wrap conflict errors as our already exists type.
- err = internal.WrapErr(err, storage.ErrAlreadyExists)
- } else if isObjectNameError(err) {
- // Wrap object name errors as our invalid key type.
- err = internal.WrapErr(err, storage.ErrInvalidKey)
- }
-
- return minio.UploadInfo{}, err
- }
-
- var (
- total int64 // total written bytes
- index = 1 // parts index
- parts []minio.CompletePart
- chunk = make([]byte, st.config.PutChunkSize)
- rbuf = bytes.NewReader(nil)
- )
-
- // Note that we do not perform any kind of
- // memory pooling of the chunk buffers here.
- // Optimal chunking sizes for S3 writes are in
- // the orders of megabytes, so letting the GC
- // collect these ASAP is much preferred.
-
-loop:
- for done := false; !done; {
- // Read next chunk into byte buffer.
- n, err := io.ReadFull(r, chunk)
-
- switch err {
- // Successful read.
- case nil:
-
- // Reached end, buffer empty.
- case io.EOF:
- break loop
-
- // Reached end, but buffer not empty.
- case io.ErrUnexpectedEOF:
- done = true
-
- // All other errors.
- default:
- return minio.UploadInfo{}, err
- }
-
- // Reset byte reader.
- rbuf.Reset(chunk[:n])
-
- // Put this object chunk in S3 store.
- pt, err := st.client.PutObjectPart(
- ctx,
- st.bucket,
- key,
- uploadID,
- index,
- rbuf,
- int64(n),
- minio.PutObjectPartOptions{
- SSE: opts.ServerSideEncryption,
- DisableContentSha256: opts.DisableContentSha256,
- },
- )
- if err != nil {
- return minio.UploadInfo{}, err
- }
-
- // Append completed part to slice.
- parts = append(parts, minio.CompletePart{
- PartNumber: pt.PartNumber,
- ETag: pt.ETag,
- ChecksumCRC32: pt.ChecksumCRC32,
- ChecksumCRC32C: pt.ChecksumCRC32C,
- ChecksumSHA1: pt.ChecksumSHA1,
- ChecksumSHA256: pt.ChecksumSHA256,
- })
-
- // Update total.
- total += int64(n)
-
- // Iterate.
- index++
- }
-
- // Complete this multi-part upload operation.
- info, err := st.client.CompleteMultipartUpload(
- ctx,
- st.bucket,
- key,
- uploadID,
- parts,
- opts,
- )
- if err != nil {
- return minio.UploadInfo{}, err
- }
-
- // Set correct size.
- info.Size = total
- return info, nil
-}
-
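Whether an upload takes the single-call or the multipart path above is decided purely by the reader type. A sketch contrasting the two, assuming an already-opened st and illustrative key names:

package example

import (
	"bytes"
	"context"
	"io"
	"strings"

	"codeberg.org/gruf/go-storage/s3"
)

func uploadBoth(ctx context.Context, st *s3.S3Storage) error {
	// bytes.Reader implements ReaderSize: a single PutObject
	// call is made with an explicit content length.
	if _, err := st.WriteStream(ctx, "sized", bytes.NewReader([]byte("known length"))); err != nil {
		return err
	}

	// io.LimitReader returns an *io.LimitedReader with no Size()
	// method, so this upload takes the chunked multipart path,
	// in parts of PutChunkSize bytes.
	r := io.LimitReader(strings.NewReader("unknown length"), 1<<20)
	_, err := st.WriteStream(ctx, "unsized", r)
	return err
}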
-// Stat implements Storage.Stat().
-func (st *S3Storage) Stat(ctx context.Context, key string) (*storage.Entry, error) {
- info, err := st.StatObject(ctx, key, minio.StatObjectOptions{})
- if err != nil {
- if errors.Is(err, storage.ErrNotFound) {
- err = nil // mask not-found errors
- }
- return nil, err
- }
- return &storage.Entry{
- Key: key,
- Size: info.Size,
- }, nil
-}
-
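Because Stat masks not-found errors, absence is signalled by a nil entry rather than a non-nil error. For example, with statSize as an illustrative helper:

package example

import (
	"context"

	"codeberg.org/gruf/go-storage/s3"
)

// statSize reports a key's size in bytes, or -1 when the key
// does not exist; a non-nil error means the lookup itself failed.
func statSize(ctx context.Context, st *s3.S3Storage, key string) (int64, error) {
	entry, err := st.Stat(ctx, key)
	if err != nil {
		return 0, err // transport / permission failure
	}
	if entry == nil {
		return -1, nil // key not present in bucket
	}
	return entry.Size, nil
}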
-// StatObject wraps minio.Core{}.StatObject(), wrapping returned errors with our own storage library error types.
-func (st *S3Storage) StatObject(ctx context.Context, key string, opts minio.StatObjectOptions) (minio.ObjectInfo, error) {
-
- // Query bucket for object info.
- info, err := st.client.StatObject(
- ctx,
- st.bucket,
- key,
- opts,
- )
- if err != nil {
-
- if isNotFoundError(err) {
- // Wrap not found errors as our not found type.
- err = internal.WrapErr(err, storage.ErrNotFound)
- } else if isObjectNameError(err) {
- // Wrap object name errors as our invalid key type.
- err = internal.WrapErr(err, storage.ErrInvalidKey)
- }
-
- }
-
- return info, err
-}
-
-// Remove implements Storage.Remove().
-func (st *S3Storage) Remove(ctx context.Context, key string) error {
- // Stat the key first: S3 object removal succeeds even for
- // missing keys, but we want to surface a not-found error.
- _, err := st.StatObject(ctx, key, minio.StatObjectOptions{})
- if err != nil {
- return err
- }
- return st.RemoveObject(ctx, key, minio.RemoveObjectOptions{})
-}
-
-// RemoveObject wraps minio.Core{}.RemoveObject(), wrapping returned errors with our own storage library error types.
-func (st *S3Storage) RemoveObject(ctx context.Context, key string, opts minio.RemoveObjectOptions) error {
-
- // Remove object from S3 bucket
- err := st.client.RemoveObject(
- ctx,
- st.bucket,
- key,
- opts,
- )
-
- if err != nil {
-
- if isNotFoundError(err) {
- // Wrap not found errors as our not found type.
- err = internal.WrapErr(err, storage.ErrNotFound)
- } else if isObjectNameError(err) {
- // Wrap object name errors as our invalid key type.
- err = internal.WrapErr(err, storage.ErrInvalidKey)
- }
-
- }
-
- return err
-}
-
-// WalkKeys implements Storage.WalkKeys().
-func (st *S3Storage) WalkKeys(ctx context.Context, opts storage.WalkKeysOpts) error {
- if opts.Step == nil {
- panic("nil step fn")
- }
-
- var (
- prev string
- token string
- )
-
- for {
- // List objects in bucket starting at marker.
- result, err := st.client.ListObjectsV2(
- st.bucket,
- opts.Prefix,
- prev,
- token,
- "",
- st.config.ListSize,
- )
- if err != nil {
- return err
- }
-
- // Iterate through list result contents.
- for _, obj := range result.Contents {
-
- // Skip filtered obj keys.
- if opts.Filter != nil &&
- opts.Filter(obj.Key) {
- continue
- }
-
- // Pass each obj through step func.
- if err := opts.Step(storage.Entry{
- Key: obj.Key,
- Size: obj.Size,
- }); err != nil {
- return err
- }
- }
-
- // No token means we reached end of bucket.
- if result.NextContinuationToken == "" {
- return nil
- }
-
- // Update the continuation token and start-after marker.
- token = result.NextContinuationToken
- prev = result.StartAfter
- }
-}
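Putting the walker to use might look like the following sketch. listPrefix is an illustrative helper; the WalkKeysOpts fields are those referenced in the function above, with Filter returning true to skip a key:

package example

import (
	"context"
	"strings"

	"codeberg.org/gruf/go-storage"
	"codeberg.org/gruf/go-storage/s3"
)

// listPrefix collects every key under prefix, skipping any
// temporary objects via the Filter callback.
func listPrefix(ctx context.Context, st *s3.S3Storage, prefix string) ([]string, error) {
	var keys []string
	err := st.WalkKeys(ctx, storage.WalkKeysOpts{
		Prefix: prefix,
		Filter: func(key string) bool {
			// Returning true skips this key.
			return strings.HasSuffix(key, ".tmp")
		},
		Step: func(e storage.Entry) error {
			keys = append(keys, e.Key)
			return nil
		},
	})
	return keys, err
}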