diff options
Diffstat (limited to 'vendor/codeberg.org/gruf/go-storage/s3/s3.go')
-rw-r--r-- | vendor/codeberg.org/gruf/go-storage/s3/s3.go | 479 |
1 files changed, 479 insertions, 0 deletions
diff --git a/vendor/codeberg.org/gruf/go-storage/s3/s3.go b/vendor/codeberg.org/gruf/go-storage/s3/s3.go new file mode 100644 index 000000000..0067d3e19 --- /dev/null +++ b/vendor/codeberg.org/gruf/go-storage/s3/s3.go @@ -0,0 +1,479 @@ +package s3 + +import ( + "bytes" + "context" + "errors" + "io" + + "codeberg.org/gruf/go-storage" + "codeberg.org/gruf/go-storage/internal" + "github.com/minio/minio-go/v7" +) + +// ensure S3Storage conforms to storage.Storage. +var _ storage.Storage = (*S3Storage)(nil) + +// ensure bytes.Reader conforms to ReaderSize. +var _ ReaderSize = (*bytes.Reader)(nil) + +// ReaderSize is an extension of the io.Reader interface +// that may be implemented by callers of WriteStream() in +// order to improve performance. When the size is known it +// is passed onto the underlying minio S3 library. +type ReaderSize interface { + io.Reader + Size() int64 +} + +// DefaultConfig returns the default S3Storage configuration. +func DefaultConfig() Config { + return defaultConfig +} + +// immutable default configuration. +var defaultConfig = Config{ + CoreOpts: minio.Options{}, + GetOpts: minio.GetObjectOptions{}, + PutOpts: minio.PutObjectOptions{}, + PutChunkOpts: minio.PutObjectPartOptions{}, + PutChunkSize: 4 * 1024 * 1024, // 4MiB + StatOpts: minio.StatObjectOptions{}, + RemoveOpts: minio.RemoveObjectOptions{}, + ListSize: 200, +} + +// Config defines options to be used when opening an S3Storage, +// mostly options for underlying S3 client library. +type Config struct { + // CoreOpts are S3 client options + // passed during initialization. + CoreOpts minio.Options + + // GetOpts are S3 client options + // passed during .Read___() calls. + GetOpts minio.GetObjectOptions + + // PutOpts are S3 client options + // passed during .Write___() calls. + PutOpts minio.PutObjectOptions + + // PutChunkSize is the chunk size (in bytes) + // to use when sending a byte stream reader + // of unknown size as a multi-part object. + PutChunkSize int64 + + // PutChunkOpts are S3 client options + // passed during chunked .Write___() calls. + PutChunkOpts minio.PutObjectPartOptions + + // StatOpts are S3 client options + // passed during .Stat() calls. + StatOpts minio.StatObjectOptions + + // RemoveOpts are S3 client options + // passed during .Remove() calls. + RemoveOpts minio.RemoveObjectOptions + + // ListSize determines how many items + // to include in each list request, made + // during calls to .WalkKeys(). + ListSize int +} + +// getS3Config returns valid (and owned!) Config for given ptr. +func getS3Config(cfg *Config) Config { + // See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html + const minChunkSz = 5 * 1024 * 1024 + + if cfg == nil { + // use defaults. + return defaultConfig + } + + // Ensure a minimum compat chunk size. + if cfg.PutChunkSize <= minChunkSz { + cfg.PutChunkSize = minChunkSz + } + + // Ensure valid list size. + if cfg.ListSize <= 0 { + cfg.ListSize = 200 + } + + return Config{ + CoreOpts: cfg.CoreOpts, + GetOpts: cfg.GetOpts, + PutOpts: cfg.PutOpts, + PutChunkSize: cfg.PutChunkSize, + ListSize: cfg.ListSize, + StatOpts: cfg.StatOpts, + RemoveOpts: cfg.RemoveOpts, + } +} + +// S3Storage is a storage implementation that stores key-value +// pairs in an S3 instance at given endpoint with bucket name. +type S3Storage struct { + client *minio.Core + bucket string + config Config +} + +// Open opens a new S3Storage instance with given S3 endpoint URL, bucket name and configuration. +func Open(endpoint string, bucket string, cfg *Config) (*S3Storage, error) { + // Check + set config defaults. + config := getS3Config(cfg) + + // Create new S3 client connection to given endpoint. + client, err := minio.NewCore(endpoint, &config.CoreOpts) + if err != nil { + return nil, err + } + + ctx := context.Background() + + // Check that provided bucket actually exists. + exists, err := client.BucketExists(ctx, bucket) + if err != nil { + return nil, err + } else if !exists { + return nil, errors.New("storage/s3: bucket does not exist") + } + + return &S3Storage{ + client: client, + bucket: bucket, + config: config, + }, nil +} + +// Client: returns access to the underlying S3 client. +func (st *S3Storage) Client() *minio.Core { + return st.client +} + +// Clean: implements Storage.Clean(). +func (st *S3Storage) Clean(ctx context.Context) error { + return nil // nothing to do for S3 +} + +// ReadBytes: implements Storage.ReadBytes(). +func (st *S3Storage) ReadBytes(ctx context.Context, key string) ([]byte, error) { + // Get stream reader for key + rc, err := st.ReadStream(ctx, key) + if err != nil { + return nil, err + } + + // Read all data to memory. + data, err := io.ReadAll(rc) + if err != nil { + _ = rc.Close() + return nil, err + } + + // Close storage stream reader. + if err := rc.Close(); err != nil { + return nil, err + } + + return data, nil +} + +// ReadStream: implements Storage.ReadStream(). +func (st *S3Storage) ReadStream(ctx context.Context, key string) (io.ReadCloser, error) { + // Fetch object reader from S3 bucket + rc, _, _, err := st.client.GetObject( + ctx, + st.bucket, + key, + st.config.GetOpts, + ) + if err != nil { + + if isNotFoundError(err) { + // Wrap not found errors as our not found type. + err = internal.WrapErr(err, storage.ErrNotFound) + } else if !isObjectNameError(err) { + // Wrap object name errors as our invalid key type. + err = internal.WrapErr(err, storage.ErrInvalidKey) + } + + return nil, transformS3Error(err) + } + return rc, nil +} + +// WriteBytes: implements Storage.WriteBytes(). +func (st *S3Storage) WriteBytes(ctx context.Context, key string, value []byte) (int, error) { + n, err := st.WriteStream(ctx, key, bytes.NewReader(value)) + return int(n), err +} + +// WriteStream: implements Storage.WriteStream(). +func (st *S3Storage) WriteStream(ctx context.Context, key string, r io.Reader) (int64, error) { + if rs, ok := r.(ReaderSize); ok { + // This reader supports providing us the size of + // the encompassed data, allowing us to perform + // a singular .PutObject() call with length. + info, err := st.client.PutObject( + ctx, + st.bucket, + key, + r, + rs.Size(), + "", + "", + st.config.PutOpts, + ) + if err != nil { + + if isConflictError(err) { + // Wrap conflict errors as our already exists type. + err = internal.WrapErr(err, storage.ErrAlreadyExists) + } else if !isObjectNameError(err) { + // Wrap object name errors as our invalid key type. + err = internal.WrapErr(err, storage.ErrInvalidKey) + } + + return 0, err + } + + return info.Size, nil + } + + // Start a new multipart upload to get ID. + uploadID, err := st.client.NewMultipartUpload( + ctx, + st.bucket, + key, + st.config.PutOpts, + ) + if err != nil { + + if isConflictError(err) { + // Wrap conflict errors as our already exists type. + err = internal.WrapErr(err, storage.ErrAlreadyExists) + } else if !isObjectNameError(err) { + // Wrap object name errors as our invalid key type. + err = internal.WrapErr(err, storage.ErrInvalidKey) + } + + return 0, transformS3Error(err) + } + + var ( + index = int(1) // parts index + total = int64(0) + parts []minio.CompletePart + chunk = make([]byte, st.config.PutChunkSize) + rbuf = bytes.NewReader(nil) + ) + + // Note that we do not perform any kind of + // memory pooling of the chunk buffers here. + // Optimal chunking sizes for S3 writes are in + // the orders of megabytes, so letting the GC + // collect these ASAP is much preferred. + +loop: + for done := false; !done; { + // Read next chunk into byte buffer. + n, err := io.ReadFull(r, chunk) + + switch err { + // Successful read. + case nil: + + // Reached end, buffer empty. + case io.EOF: + break loop + + // Reached end, but buffer not empty. + case io.ErrUnexpectedEOF: + done = true + + // All other errors. + default: + return 0, err + } + + // Reset byte reader. + rbuf.Reset(chunk[:n]) + + // Put this object chunk in S3 store. + pt, err := st.client.PutObjectPart( + ctx, + st.bucket, + key, + uploadID, + index, + rbuf, + int64(n), + st.config.PutChunkOpts, + ) + if err != nil { + return 0, err + } + + // Append completed part to slice. + parts = append(parts, minio.CompletePart{ + PartNumber: pt.PartNumber, + ETag: pt.ETag, + ChecksumCRC32: pt.ChecksumCRC32, + ChecksumCRC32C: pt.ChecksumCRC32C, + ChecksumSHA1: pt.ChecksumSHA1, + ChecksumSHA256: pt.ChecksumSHA256, + }) + + // Iterate. + index++ + + // Update total size. + total += pt.Size + } + + // Complete this multi-part upload operation + _, err = st.client.CompleteMultipartUpload( + ctx, + st.bucket, + key, + uploadID, + parts, + st.config.PutOpts, + ) + if err != nil { + return 0, err + } + + return total, nil +} + +// Stat: implements Storage.Stat(). +func (st *S3Storage) Stat(ctx context.Context, key string) (*storage.Entry, error) { + // Query object in S3 bucket. + stat, err := st.client.StatObject( + ctx, + st.bucket, + key, + st.config.StatOpts, + ) + if err != nil { + + if isNotFoundError(err) { + // Ignore err return + // for not-found. + err = nil + } else if !isObjectNameError(err) { + // Wrap object name errors as our invalid key type. + err = internal.WrapErr(err, storage.ErrInvalidKey) + } + + return nil, err + } + + return &storage.Entry{ + Key: key, + Size: stat.Size, + }, nil +} + +// Remove: implements Storage.Remove(). +func (st *S3Storage) Remove(ctx context.Context, key string) error { + // Query object in S3 bucket. + _, err := st.client.StatObject( + ctx, + st.bucket, + key, + st.config.StatOpts, + ) + if err != nil { + + if isNotFoundError(err) { + // Wrap not found errors as our not found type. + err = internal.WrapErr(err, storage.ErrNotFound) + } else if !isObjectNameError(err) { + // Wrap object name errors as our invalid key type. + err = internal.WrapErr(err, storage.ErrInvalidKey) + } + + return err + } + + // Remove object from S3 bucket + err = st.client.RemoveObject( + ctx, + st.bucket, + key, + st.config.RemoveOpts, + ) + if err != nil { + + if isNotFoundError(err) { + // Wrap not found errors as our not found type. + err = internal.WrapErr(err, storage.ErrNotFound) + } else if !isObjectNameError(err) { + // Wrap object name errors as our invalid key type. + err = internal.WrapErr(err, storage.ErrInvalidKey) + } + + return err + } + + return nil +} + +// WalkKeys: implements Storage.WalkKeys(). +func (st *S3Storage) WalkKeys(ctx context.Context, opts storage.WalkKeysOpts) error { + if opts.Step == nil { + panic("nil step fn") + } + + var ( + prev string + token string + ) + + for { + // List objects in bucket starting at marker. + result, err := st.client.ListObjectsV2( + st.bucket, + opts.Prefix, + prev, + token, + "", + st.config.ListSize, + ) + if err != nil { + return err + } + + // Iterate through list result contents. + for _, obj := range result.Contents { + + // Skip filtered obj keys. + if opts.Filter != nil && + opts.Filter(obj.Key) { + continue + } + + // Pass each obj through step func. + if err := opts.Step(storage.Entry{ + Key: obj.Key, + Size: obj.Size, + }); err != nil { + return err + } + } + + // No token means we reached end of bucket. + if result.NextContinuationToken == "" { + return nil + } + + // Set continue token and prev mark + token = result.NextContinuationToken + prev = result.StartAfter + } +} |