summaryrefslogtreecommitdiff
path: root/vendor/codeberg.org/gruf/go-storage/s3/s3.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/codeberg.org/gruf/go-storage/s3/s3.go')
-rw-r--r--vendor/codeberg.org/gruf/go-storage/s3/s3.go89
1 files changed, 71 insertions, 18 deletions
diff --git a/vendor/codeberg.org/gruf/go-storage/s3/s3.go b/vendor/codeberg.org/gruf/go-storage/s3/s3.go
index ad7686737..c53560161 100644
--- a/vendor/codeberg.org/gruf/go-storage/s3/s3.go
+++ b/vendor/codeberg.org/gruf/go-storage/s3/s3.go
@@ -6,10 +6,12 @@ import (
"errors"
"io"
"net/http"
+ "strings"
"codeberg.org/gruf/go-storage"
"codeberg.org/gruf/go-storage/internal"
"github.com/minio/minio-go/v7"
+ "github.com/minio/minio-go/v7/pkg/s3utils"
)
// ensure S3Storage conforms to storage.Storage.
@@ -42,6 +44,13 @@ var defaultConfig = Config{
// Config defines options to be used when opening an S3Storage,
// mostly options for underlying S3 client library.
type Config struct {
+
+ // KeyPrefix allows specifying a
+ // prefix applied to all S3 object
+ // requests, e.g. allowing you to
+ // partition a bucket by key prefix.
+ KeyPrefix string
+
// CoreOpts are S3 client options
// passed during initialization.
CoreOpts minio.Options
@@ -78,6 +87,7 @@ func getS3Config(cfg *Config) Config {
}
return Config{
+ KeyPrefix: cfg.KeyPrefix,
CoreOpts: cfg.CoreOpts,
PutChunkSize: cfg.PutChunkSize,
ListSize: cfg.ListSize,
@@ -94,17 +104,22 @@ type S3Storage struct {
// Open opens a new S3Storage instance with given S3 endpoint URL, bucket name and configuration.
func Open(endpoint string, bucket string, cfg *Config) (*S3Storage, error) {
- // Check + set config defaults.
+ ctx := context.Background()
+
+ // Check/set config defaults.
config := getS3Config(cfg)
+ // Validate configured key prefix (if any), handles case of an empty string.
+ if err := s3utils.CheckValidObjectNamePrefix(config.KeyPrefix); err != nil {
+ return nil, err
+ }
+
// Create new S3 client connection to given endpoint.
client, err := minio.NewCore(endpoint, &config.CoreOpts)
if err != nil {
return nil, err
}
- ctx := context.Background()
-
// Check that provided bucket actually exists.
exists, err := client.BucketExists(ctx, bucket)
if err != nil {
@@ -132,7 +147,8 @@ func (st *S3Storage) Clean(ctx context.Context) error {
// ReadBytes: implements Storage.ReadBytes().
func (st *S3Storage) ReadBytes(ctx context.Context, key string) ([]byte, error) {
- // Get stream reader for key
+
+ // Get stream reader for key.
rc, err := st.ReadStream(ctx, key)
if err != nil {
return nil, err
@@ -162,6 +178,9 @@ func (st *S3Storage) ReadStream(ctx context.Context, key string) (io.ReadCloser,
// GetObject wraps minio.Core{}.GetObject() to handle wrapping with our own storage library error types.
func (st *S3Storage) GetObject(ctx context.Context, key string, opts minio.GetObjectOptions) (io.ReadCloser, minio.ObjectInfo, http.Header, error) {
+ // Update given key with prefix.
+ key = st.config.KeyPrefix + key
+
// Query bucket for object data and info.
rc, info, hdr, err := st.client.GetObject(
ctx,
@@ -199,6 +218,10 @@ func (st *S3Storage) WriteStream(ctx context.Context, key string, r io.Reader) (
// PutObject wraps minio.Core{}.PutObject() to handle wrapping with our own storage library error types, and in the case of an io.Reader
// that does not implement ReaderSize{}, it will instead handle upload by using minio.Core{}.NewMultipartUpload() in chunks of PutChunkSize.
func (st *S3Storage) PutObject(ctx context.Context, key string, r io.Reader, opts minio.PutObjectOptions) (minio.UploadInfo, error) {
+
+ // Update given key with prefix.
+ key = st.config.KeyPrefix + key
+
if rs, ok := r.(ReaderSize); ok {
// This reader supports providing us the size of
// the encompassed data, allowing us to perform
@@ -358,6 +381,9 @@ func (st *S3Storage) Stat(ctx context.Context, key string) (*storage.Entry, erro
// StatObject wraps minio.Core{}.StatObject() to handle wrapping with our own storage library error types.
func (st *S3Storage) StatObject(ctx context.Context, key string, opts minio.StatObjectOptions) (minio.ObjectInfo, error) {
+ // Update given key with prefix.
+ key = st.config.KeyPrefix + key
+
// Query bucket for object info.
info, err := st.client.StatObject(
ctx,
@@ -392,6 +418,9 @@ func (st *S3Storage) Remove(ctx context.Context, key string) error {
// RemoveObject wraps minio.Core{}.RemoveObject() to handle wrapping with our own storage library error types.
func (st *S3Storage) RemoveObject(ctx context.Context, key string, opts minio.RemoveObjectOptions) error {
+ // Update given key with prefix.
+ key = st.config.KeyPrefix + key
+
// Remove object from S3 bucket
err := st.client.RemoveObject(
ctx,
@@ -426,6 +455,9 @@ func (st *S3Storage) WalkKeys(ctx context.Context, opts storage.WalkKeysOpts) er
token string
)
+ // Update options prefix to include global prefix.
+ opts.Prefix = st.config.KeyPrefix + opts.Prefix
+
for {
// List objects in bucket starting at marker.
result, err := st.client.ListObjectsV2(
@@ -440,21 +472,42 @@ func (st *S3Storage) WalkKeys(ctx context.Context, opts storage.WalkKeysOpts) er
return err
}
- // Iterate through list result contents.
- for _, obj := range result.Contents {
-
- // Skip filtered obj keys.
- if opts.Filter != nil &&
- opts.Filter(obj.Key) {
- continue
+ // Iterate through list contents.
+ //
+ // Use different loops depending
+ // on if filter func was provided,
+ // to reduce loop operations.
+ if opts.Filter != nil {
+ for _, obj := range result.Contents {
+ // Trim any global prefix from returned object key.
+ key := strings.TrimPrefix(obj.Key, st.config.KeyPrefix)
+
+ // Skip filtered obj keys.
+ if opts.Filter != nil &&
+ opts.Filter(key) {
+ continue
+ }
+
+ // Pass each obj through step func.
+ if err := opts.Step(storage.Entry{
+ Key: key,
+ Size: obj.Size,
+ }); err != nil {
+ return err
+ }
}
-
- // Pass each obj through step func.
- if err := opts.Step(storage.Entry{
- Key: obj.Key,
- Size: obj.Size,
- }); err != nil {
- return err
+ } else {
+ for _, obj := range result.Contents {
+ // Trim any global prefix from returned object key.
+ key := strings.TrimPrefix(obj.Key, st.config.KeyPrefix)
+
+ // Pass each obj through step func.
+ if err := opts.Step(storage.Entry{
+ Key: key,
+ Size: obj.Size,
+ }); err != nil {
+ return err
+ }
}
}