diff options
Diffstat (limited to 'vendor/codeberg.org/gruf')
| -rw-r--r-- | vendor/codeberg.org/gruf/go-fastpath/v2/path.go | 10 | ||||
| -rw-r--r-- | vendor/codeberg.org/gruf/go-storage/s3/cache.go | 94 | ||||
| -rw-r--r-- | vendor/codeberg.org/gruf/go-storage/s3/cache/cache.go | 44 | ||||
| -rw-r--r-- | vendor/codeberg.org/gruf/go-storage/s3/errors.go | 31 | ||||
| -rw-r--r-- | vendor/codeberg.org/gruf/go-storage/s3/s3.go | 110 |
5 files changed, 267 insertions, 22 deletions
diff --git a/vendor/codeberg.org/gruf/go-fastpath/v2/path.go b/vendor/codeberg.org/gruf/go-fastpath/v2/path.go index 42cbfd4f7..91ebb5b36 100644 --- a/vendor/codeberg.org/gruf/go-fastpath/v2/path.go +++ b/vendor/codeberg.org/gruf/go-fastpath/v2/path.go @@ -33,12 +33,14 @@ func (b *Builder) Reset() { b.set = false } -// Len returns the number of accumulated bytes in the Builder +// Len returns the number of +// accumulated bytes in the Builder. func (b Builder) Len() int { return len(b.B) } -// Cap returns the capacity of the underlying Builder buffer +// Cap returns the capacity of +// the underlying Builder buffer. func (b Builder) Cap() int { return cap(b.B) } @@ -50,7 +52,7 @@ func (b Builder) Bytes() []byte { // String returns the accumulated path string. func (b Builder) String() string { - return *(*string)(unsafe.Pointer(&b.B)) + return unsafe.String(unsafe.SliceData(b.B), len(b.B)) } // Absolute returns whether current path is absolute (not relative). @@ -139,7 +141,7 @@ func (b *Builder) AppendBytes(path []byte) { return } b.Guarantee(len(path) + 1) - b.append(*(*string)(unsafe.Pointer(&b))) + b.append(unsafe.String(unsafe.SliceData(path), len(path))) } // Append adds and cleans the supplied path string to the diff --git a/vendor/codeberg.org/gruf/go-storage/s3/cache.go b/vendor/codeberg.org/gruf/go-storage/s3/cache.go new file mode 100644 index 000000000..37ff9f321 --- /dev/null +++ b/vendor/codeberg.org/gruf/go-storage/s3/cache.go @@ -0,0 +1,94 @@ +package s3 + +import ( + "time" + + "github.com/minio/minio-go/v7" +) + +// EntryCache should provide a cache of +// S3 object information for speeding up +// Get(), Stat() and Remove() operations. +type EntryCache interface { + + // Get should return 'found' = true when information is cached, + // with 'info' optionally being nilable to allow caching errors. + Get(key string) (info *CachedObjectInfo, found bool) + + // Put should cache the given information under key, with + // nil CachedObjectInfo{} meaning a 'not found' response. + Put(key string, info *CachedObjectInfo) +} + +// CachedObjectInfo provides the minimum cacheable +// set of S3 object information that may be returned +// from a Get() or Stat() operation, or on Put(). +type CachedObjectInfo struct { + Key string + ETag string + Size int64 + ContentType string + LastModified time.Time + VersionID string +} + +// ToObjectInfo converts CachedObjectInfo to returnable minio.ObjectInfo. +func (info *CachedObjectInfo) ToObjectInfo() minio.ObjectInfo { + return minio.ObjectInfo{ + Key: info.Key, + ETag: info.ETag, + Size: info.Size, + ContentType: info.ContentType, + LastModified: info.LastModified, + VersionID: info.VersionID, + } +} + +// cacheGet wraps cache.Get() operations to check if cache is nil. +func cacheGet(cache EntryCache, key string) (*CachedObjectInfo, bool) { + if cache != nil { + return cache.Get(key) + } + return nil, false +} + +// objectToCachedObjectInfo converts minio.ObjectInfo to CachedObjectInfo for caching. +func objectToCachedObjectInfo(info minio.ObjectInfo) *CachedObjectInfo { + return &CachedObjectInfo{ + Key: info.Key, + ETag: info.ETag, + Size: info.Size, + ContentType: info.ContentType, + LastModified: info.LastModified, + VersionID: info.VersionID, + } +} + +// cacheObject wraps cache.Put() operations to check if cache is nil. +func cacheObject(cache EntryCache, key string, info minio.ObjectInfo) { + if cache != nil { + cache.Put(key, objectToCachedObjectInfo(info)) + } +} + +// cacheUpload wraps cache.Put() operations to check if cache is nil, uses ContentType from given opts. +func cacheUpload(cache EntryCache, key string, info minio.UploadInfo, opts minio.PutObjectOptions) { + if cache != nil { + cache.Put(key, &CachedObjectInfo{ + Key: info.Key, + ETag: info.ETag, + Size: info.Size, + ContentType: opts.ContentType, + LastModified: info.LastModified, + VersionID: info.VersionID, + }) + } +} + +// cacheNotFound wraps cache.Put() to check if cache is +// nil, storing a nil entry (i.e. not found) in cache. +func cacheNotFound(cache EntryCache, key string) { + if cache != nil { + cache.Put(key, nil) + } +} diff --git a/vendor/codeberg.org/gruf/go-storage/s3/cache/cache.go b/vendor/codeberg.org/gruf/go-storage/s3/cache/cache.go new file mode 100644 index 000000000..ce8aeb63b --- /dev/null +++ b/vendor/codeberg.org/gruf/go-storage/s3/cache/cache.go @@ -0,0 +1,44 @@ +package cache + +import ( + "time" + + "codeberg.org/gruf/go-cache/v3/simple" + "codeberg.org/gruf/go-cache/v3/ttl" + "codeberg.org/gruf/go-storage/s3" +) + +// check interface conformity. +var _ s3.EntryCache = &EntryCache{} +var _ s3.EntryCache = &EntryTTLCache{} + +// EntryCache provides a basic implementation +// of an s3.EntryCache{}. Under the hood it is +// a mutex locked ordered map with max capacity. +type EntryCache struct { + simple.Cache[string, *s3.CachedObjectInfo] +} + +func New(len, cap int) *EntryCache { + var cache EntryCache + cache.Init(len, cap) + return &cache +} + +func (c *EntryCache) Put(key string, info *s3.CachedObjectInfo) { + c.Cache.Set(key, info) +} + +type EntryTTLCache struct { + ttl.Cache[string, *s3.CachedObjectInfo] +} + +func NewTTL(len, cap int, ttl time.Duration) *EntryTTLCache { + var cache EntryTTLCache + cache.Init(len, cap, ttl) + return &cache +} + +func (c *EntryTTLCache) Put(key string, info *s3.CachedObjectInfo) { + c.Cache.Set(key, info) +} diff --git a/vendor/codeberg.org/gruf/go-storage/s3/errors.go b/vendor/codeberg.org/gruf/go-storage/s3/errors.go index 1f4404469..0b2d3be62 100644 --- a/vendor/codeberg.org/gruf/go-storage/s3/errors.go +++ b/vendor/codeberg.org/gruf/go-storage/s3/errors.go @@ -3,9 +3,35 @@ package s3 import ( "strings" + "codeberg.org/gruf/go-storage" + "codeberg.org/gruf/go-storage/internal" "github.com/minio/minio-go/v7" ) +// CachedErrorResponse can be returned +// when an S3 is configured with caching, +// and the basic details of an error +// response have been stored in the cache. +type CachedErrorResponse struct { + Code string + Key string +} + +func (err *CachedErrorResponse) Error() string { + return "cached '" + err.Code + "' response for key:" + err.Key +} + +func (err *CachedErrorResponse) Is(other error) bool { + switch other { + case storage.ErrNotFound: + return err.Code == "NoSuchKey" + case storage.ErrAlreadyExists: + return err.Code == "Conflict" + default: + return false + } +} + func isNotFoundError(err error) bool { errRsp, ok := err.(minio.ErrorResponse) return ok && errRsp.Code == "NoSuchKey" @@ -19,3 +45,8 @@ func isConflictError(err error) bool { func isObjectNameError(err error) bool { return strings.HasPrefix(err.Error(), "Object name ") } + +func cachedNotFoundError(key string) error { + err := CachedErrorResponse{Code: "NoSuchKey", Key: key} + return internal.WrapErr(&err, storage.ErrNotFound) +} diff --git a/vendor/codeberg.org/gruf/go-storage/s3/s3.go b/vendor/codeberg.org/gruf/go-storage/s3/s3.go index c53560161..bf7eff02f 100644 --- a/vendor/codeberg.org/gruf/go-storage/s3/s3.go +++ b/vendor/codeberg.org/gruf/go-storage/s3/s3.go @@ -64,6 +64,11 @@ type Config struct { // to include in each list request, made // during calls to .WalkKeys(). ListSize int + + // Cache is an optional type that may be + // provided to store simple entry information + // to speed up Get() / Stat() operations. + Cache EntryCache } // getS3Config returns valid (and owned!) Config for given ptr. @@ -91,6 +96,7 @@ func getS3Config(cfg *Config) Config { CoreOpts: cfg.CoreOpts, PutChunkSize: cfg.PutChunkSize, ListSize: cfg.ListSize, + Cache: cfg.Cache, } } @@ -181,6 +187,15 @@ func (st *S3Storage) GetObject(ctx context.Context, key string, opts minio.GetOb // Update given key with prefix. key = st.config.KeyPrefix + key + // Check cache for possible known response. + cinfo, ok := cacheGet(st.config.Cache, key) + switch { + case !ok: + break + case cinfo == nil: + return nil, minio.ObjectInfo{}, nil, cachedNotFoundError(key) + } + // Query bucket for object data and info. rc, info, hdr, err := st.client.GetObject( ctx, @@ -191,6 +206,9 @@ func (st *S3Storage) GetObject(ctx context.Context, key string, opts minio.GetOb if err != nil { if isNotFoundError(err) { + // On 'not found', cache this resp. + cacheNotFound(st.config.Cache, key) + // Wrap not found errors as our not found type. err = internal.WrapErr(err, storage.ErrNotFound) } else if isObjectNameError(err) { @@ -198,6 +216,10 @@ func (st *S3Storage) GetObject(ctx context.Context, key string, opts minio.GetOb err = internal.WrapErr(err, storage.ErrInvalidKey) } + } else { + + // On success, cache fetched obj info. + cacheObject(st.config.Cache, key, info) } return rc, info, hdr, err @@ -246,6 +268,10 @@ func (st *S3Storage) PutObject(ctx context.Context, key string, r io.Reader, opt err = internal.WrapErr(err, storage.ErrInvalidKey) } + } else { + + // On success, cache uploaded object info. + cacheUpload(st.config.Cache, key, info, opts) } return info, err @@ -360,6 +386,10 @@ loop: // Set correct size. info.Size = total + + // On success, cache uploaded object info. + cacheUpload(st.config.Cache, key, info, opts) + return info, nil } @@ -384,6 +414,17 @@ func (st *S3Storage) StatObject(ctx context.Context, key string, opts minio.Stat // Update given key with prefix. key = st.config.KeyPrefix + key + // Check cache for possible known response. + cinfo, ok := cacheGet(st.config.Cache, key) + switch { + case !ok: + break + case cinfo == nil: + return minio.ObjectInfo{}, cachedNotFoundError(key) + default: + return cinfo.ToObjectInfo(), nil + } + // Query bucket for object info. info, err := st.client.StatObject( ctx, @@ -394,6 +435,9 @@ func (st *S3Storage) StatObject(ctx context.Context, key string, opts minio.Stat if err != nil { if isNotFoundError(err) { + // On 'not found', cache this resp. + cacheNotFound(st.config.Cache, key) + // Wrap not found errors as our not found type. err = internal.WrapErr(err, storage.ErrNotFound) } else if isObjectNameError(err) { @@ -401,6 +445,10 @@ func (st *S3Storage) StatObject(ctx context.Context, key string, opts minio.Stat err = internal.WrapErr(err, storage.ErrInvalidKey) } + } else { + + // On success, cache fetchedd obj info. + cacheObject(st.config.Cache, key, info) } return info, err @@ -421,6 +469,15 @@ func (st *S3Storage) RemoveObject(ctx context.Context, key string, opts minio.Re // Update given key with prefix. key = st.config.KeyPrefix + key + // Check cache for possible known response. + cinfo, ok := cacheGet(st.config.Cache, key) + switch { + case !ok: + break + case cinfo == nil: + return cachedNotFoundError(key) + } + // Remove object from S3 bucket err := st.client.RemoveObject( ctx, @@ -432,6 +489,9 @@ func (st *S3Storage) RemoveObject(ctx context.Context, key string, opts minio.Re if err != nil { if isNotFoundError(err) { + // On 'not found', cache this resp. + cacheNotFound(st.config.Cache, key) + // Wrap not found errors as our not found type. err = internal.WrapErr(err, storage.ErrNotFound) } else if isObjectNameError(err) { @@ -439,6 +499,10 @@ func (st *S3Storage) RemoveObject(ctx context.Context, key string, opts minio.Re err = internal.WrapErr(err, storage.ErrInvalidKey) } + } else { + + // Removed! also cache as 'not found'. + cacheNotFound(st.config.Cache, key) } return err @@ -450,15 +514,32 @@ func (st *S3Storage) WalkKeys(ctx context.Context, opts storage.WalkKeysOpts) er panic("nil step fn") } - var ( - prev string - token string - ) + // Step function called on each listed object. + var step func(string, minio.ObjectInfo) error + + if st.config.Cache == nil { + // No cache, simply pass straight through to opts.Step(). + step = func(key string, info minio.ObjectInfo) error { + return opts.Step(storage.Entry{ + Key: key, + Size: info.Size, + }) + } + } else { + // Cache configured, store before passing to opts.Step(). + step = func(key string, info minio.ObjectInfo) error { + st.config.Cache.Put(key, objectToCachedObjectInfo(info)) + return opts.Step(storage.Entry{ + Key: key, + Size: info.Size, + }) + } + } // Update options prefix to include global prefix. opts.Prefix = st.config.KeyPrefix + opts.Prefix - for { + for prev, token := "", ""; ; { // List objects in bucket starting at marker. result, err := st.client.ListObjectsV2( st.bucket, @@ -482,17 +563,13 @@ func (st *S3Storage) WalkKeys(ctx context.Context, opts storage.WalkKeysOpts) er // Trim any global prefix from returned object key. key := strings.TrimPrefix(obj.Key, st.config.KeyPrefix) - // Skip filtered obj keys. - if opts.Filter != nil && - opts.Filter(key) { + // Skip filtered keys. + if opts.Filter(key) { continue } - // Pass each obj through step func. - if err := opts.Step(storage.Entry{ - Key: key, - Size: obj.Size, - }); err != nil { + // Pass each object to step funcion. + if err := step(key, obj); err != nil { return err } } @@ -501,11 +578,8 @@ func (st *S3Storage) WalkKeys(ctx context.Context, opts storage.WalkKeysOpts) er // Trim any global prefix from returned object key. key := strings.TrimPrefix(obj.Key, st.config.KeyPrefix) - // Pass each obj through step func. - if err := opts.Step(storage.Entry{ - Key: key, - Size: obj.Size, - }); err != nil { + // Pass each object to step funcion. + if err := step(key, obj); err != nil { return err } } |
