[performance] update storage backend and make use of seek syscall when available (#2924)

* update to use go-storage/ instead of go-store/v2/storage/

* pull in latest version from codeberg

* remove test output 😇

* add code comments

* set the exclusive bit when creating new files in disk config

* bump to actual release version

* bump to v0.1.1 (tis a simple no-logic change)

* update readme

* only use a temporary read seeker when decoding video if required (should only be S3 now)

* use fastcopy library to use memory pooled buffers when calling TempFileSeeker()

* update to use seek call in serveFileRange()
kim
2024-05-22 09:46:24 +00:00
committed by GitHub
parent 06b1e0173b
commit 3d3e99ae52
91 changed files with 1610 additions and 12737 deletions
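The serveFileRange() change mentioned in the final bullet boils down to preferring a single seek syscall over a discarding read when the underlying reader supports it. A minimal sketch of that pattern (hypothetical helper name; GoToSocial's actual serveFileRange() is not part of this diff):

package main

import "io"

// skipTo advances r by offset bytes: one seek syscall when the
// reader supports io.Seeker, else a discarding copy as fallback.
func skipTo(r io.Reader, offset int64) error {
	if s, ok := r.(io.Seeker); ok {
		_, err := s.Seek(offset, io.SeekStart)
		return err
	}
	_, err := io.CopyN(io.Discard, r, offset)
	return err
}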

5
vendor/codeberg.org/gruf/go-storage/README.md generated vendored Normal file

@@ -0,0 +1,5 @@
# go-storage
A simple library providing various storage implementations with a common read-write-stat interface.
Supports: on-disk, memory, S3.
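A minimal quick-start sketch using the on-disk backend added below (assumes only the disk.Open(), WriteBytes() and ReadBytes() signatures shown in this diff):

package main

import (
	"context"
	"fmt"

	"codeberg.org/gruf/go-storage/disk"
)

func main() {
	// Open an on-disk store rooted at ./data with default config.
	st, err := disk.Open("./data", nil)
	if err != nil {
		panic(err)
	}
	ctx := context.Background()

	// Write then read back a simple key.
	if _, err := st.WriteBytes(ctx, "greeting", []byte("hello")); err != nil {
		panic(err)
	}
	b, err := st.ReadBytes(ctx, "greeting")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b)) // hello
}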

467
vendor/codeberg.org/gruf/go-storage/disk/disk.go generated vendored Normal file

@@ -0,0 +1,467 @@
package disk
import (
"bytes"
"context"
"errors"
"io"
"io/fs"
"os"
"path"
"strings"
"syscall"
"codeberg.org/gruf/go-fastcopy"
"codeberg.org/gruf/go-fastpath/v2"
"codeberg.org/gruf/go-storage"
"codeberg.org/gruf/go-storage/internal"
)
// ensure DiskStorage conforms to storage.Storage.
var _ storage.Storage = (*DiskStorage)(nil)
// DefaultConfig returns the default DiskStorage configuration.
func DefaultConfig() Config {
return defaultConfig
}
// immutable default configuration.
var defaultConfig = Config{
OpenRead: OpenArgs{syscall.O_RDONLY, 0o644},
OpenWrite: OpenArgs{syscall.O_CREAT | syscall.O_WRONLY, 0o644},
MkdirPerms: 0o755,
WriteBufSize: 4096,
}
// OpenArgs defines args passed
// in a syscall.Open() operation.
type OpenArgs struct {
Flags int
Perms uint32
}
// Config defines options to be
// used when opening a DiskStorage.
type Config struct {
// OpenRead are the arguments passed
// to syscall.Open() when opening a
// file for read operations.
OpenRead OpenArgs
// OpenWrite are the arguments passed
// to syscall.Open() when opening a
// file for write operations.
OpenWrite OpenArgs
// MkdirPerms are the permissions used
// when creating necessary sub-dirs in
// a storage key with slashes.
MkdirPerms uint32
// WriteBufSize is the buffer size
// to use when writing file streams.
WriteBufSize int
}
// getDiskConfig returns valid (and owned!) Config for given ptr.
func getDiskConfig(cfg *Config) Config {
if cfg == nil {
// use defaults.
return defaultConfig
}
// Ensure non-zero syscall args.
if cfg.OpenRead.Flags == 0 {
cfg.OpenRead.Flags = defaultConfig.OpenRead.Flags
}
if cfg.OpenRead.Perms == 0 {
cfg.OpenRead.Perms = defaultConfig.OpenRead.Perms
}
if cfg.OpenWrite.Flags == 0 {
cfg.OpenWrite.Flags = defaultConfig.OpenWrite.Flags
}
if cfg.OpenWrite.Perms == 0 {
cfg.OpenWrite.Perms = defaultConfig.OpenWrite.Perms
}
if cfg.MkdirPerms == 0 {
cfg.MkdirPerms = defaultConfig.MkdirPerms
}
// Ensure valid write buf.
if cfg.WriteBufSize <= 0 {
cfg.WriteBufSize = defaultConfig.WriteBufSize
}
return Config{
OpenRead: cfg.OpenRead,
OpenWrite: cfg.OpenWrite,
MkdirPerms: cfg.MkdirPerms,
WriteBufSize: cfg.WriteBufSize,
}
}
// DiskStorage is a Storage implementation
// that stores directly to a filesystem.
type DiskStorage struct {
path string // path is the root path of this store
pool fastcopy.CopyPool // pool is the prepared io copier with buffer pool
cfg Config // cfg is the supplied configuration for this store
}
// Open opens a DiskStorage instance for given folder path and configuration.
func Open(path string, cfg *Config) (*DiskStorage, error) {
// Check + set config defaults.
config := getDiskConfig(cfg)
// Clean provided storage path, ensure
// final '/' to help with path trimming.
pb := internal.GetPathBuilder()
path = pb.Clean(path) + "/"
internal.PutPathBuilder(pb)
// Ensure directories up-to path exist.
perms := fs.FileMode(config.MkdirPerms)
err := os.MkdirAll(path, perms)
if err != nil {
return nil, err
}
// Prepare DiskStorage.
st := &DiskStorage{
path: path,
cfg: config,
}
// Set fastcopy pool buffer size.
st.pool.Buffer(config.WriteBufSize)
return st, nil
}
// Clean: implements Storage.Clean().
func (st *DiskStorage) Clean(ctx context.Context) error {
// Check context still valid.
if err := ctx.Err(); err != nil {
return err
}
// Clean unused directories.
return cleanDirs(st.path, OpenArgs{
Flags: syscall.O_RDONLY,
})
}
// ReadBytes: implements Storage.ReadBytes().
func (st *DiskStorage) ReadBytes(ctx context.Context, key string) ([]byte, error) {
// Get stream reader for key
rc, err := st.ReadStream(ctx, key)
if err != nil {
return nil, err
}
// Read all data to memory.
data, err := io.ReadAll(rc)
if err != nil {
_ = rc.Close()
return nil, err
}
// Close storage stream reader.
if err := rc.Close(); err != nil {
return nil, err
}
return data, nil
}
// ReadStream: implements Storage.ReadStream().
func (st *DiskStorage) ReadStream(ctx context.Context, key string) (io.ReadCloser, error) {
// Generate file path for key.
kpath, err := st.Filepath(key)
if err != nil {
return nil, err
}
// Check context still valid.
if err := ctx.Err(); err != nil {
return nil, err
}
// Attempt to open file with read args.
file, err := open(kpath, st.cfg.OpenRead)
if err != nil {
if err == syscall.ENOENT {
// Translate not-found errors and wrap with key.
err = internal.ErrWithKey(storage.ErrNotFound, key)
}
return nil, err
}
return file, nil
}
// WriteBytes: implements Storage.WriteBytes().
func (st *DiskStorage) WriteBytes(ctx context.Context, key string, value []byte) (int, error) {
n, err := st.WriteStream(ctx, key, bytes.NewReader(value))
return int(n), err
}
// WriteStream: implements Storage.WriteStream().
func (st *DiskStorage) WriteStream(ctx context.Context, key string, stream io.Reader) (int64, error) {
// Acquire path builder buffer.
pb := internal.GetPathBuilder()
// Generate the file path for given key.
kpath, subdir, err := st.filepath(pb, key)
if err != nil {
return 0, err
}
// Done with path buffer.
internal.PutPathBuilder(pb)
// Check context still valid.
if err := ctx.Err(); err != nil {
return 0, err
}
if subdir {
// Get dir of key path.
dir := path.Dir(kpath)
// Note that subdir will only be set if
// the transformed key (without base path)
// contains any slashes. This is not a
// definitive check, but it allows us to
// skip a syscall if mkdirall not needed!
perms := fs.FileMode(st.cfg.MkdirPerms)
err = os.MkdirAll(dir, perms)
if err != nil {
return 0, err
}
}
// Attempt to open file with write args.
file, err := open(kpath, st.cfg.OpenWrite)
if err != nil {
if st.cfg.OpenWrite.Flags&syscall.O_EXCL != 0 &&
err == syscall.EEXIST {
// Translate already exists errors and wrap with key.
err = internal.ErrWithKey(storage.ErrAlreadyExists, key)
}
return 0, err
}
// Copy provided stream to file interface.
n, err := st.pool.Copy(file, stream)
if err != nil {
_ = file.Close()
return n, err
}
// Finally, close file.
return n, file.Close()
}
// Stat implements Storage.Stat().
func (st *DiskStorage) Stat(ctx context.Context, key string) (*storage.Entry, error) {
// Generate file path for key.
kpath, err := st.Filepath(key)
if err != nil {
return nil, err
}
// Check context still valid.
if err := ctx.Err(); err != nil {
return nil, err
}
// Stat file on disk.
stat, err := stat(kpath)
if stat == nil {
return nil, err
}
return &storage.Entry{
Key: key,
Size: stat.Size,
}, nil
}
// Remove implements Storage.Remove().
func (st *DiskStorage) Remove(ctx context.Context, key string) error {
// Generate file path for key.
kpath, err := st.Filepath(key)
if err != nil {
return err
}
// Check context still valid.
if err := ctx.Err(); err != nil {
return err
}
// Stat file on disk.
stat, err := stat(kpath)
if err != nil {
return err
}
// Not-found (or handled
// as) error situations.
if stat == nil {
return internal.ErrWithKey(storage.ErrNotFound, key)
} else if stat.Mode&syscall.S_IFREG == 0 {
err := errors.New("storage/disk: not a regular file")
return internal.ErrWithKey(err, key)
}
// Remove at path (we know this is file).
if err := unlink(kpath); err != nil {
if err == syscall.ENOENT {
// Translate not-found errors and wrap with key.
err = internal.ErrWithKey(storage.ErrNotFound, key)
}
return err
}
return nil
}
// WalkKeys implements Storage.WalkKeys().
func (st *DiskStorage) WalkKeys(ctx context.Context, opts storage.WalkKeysOpts) error {
if opts.Step == nil {
panic("nil step fn")
}
// Check context still valid.
if err := ctx.Err(); err != nil {
return err
}
// Acquire path builder for walk.
pb := internal.GetPathBuilder()
defer internal.PutPathBuilder(pb)
// Dir to walk.
dir := st.path
if opts.Prefix != "" {
// Convert key prefix to one of our storage filepaths.
pathprefix, subdir, err := st.filepath(pb, opts.Prefix)
if err != nil {
return internal.ErrWithMsg(err, "prefix error")
}
if subdir {
// Note that subdir will only be set if
// the transformed key (without base path)
// contains any slashes. This is not a
// definitive check, but it allows us to
// update the directory we walk in case
// it might narrow search parameters!
dir = path.Dir(pathprefix)
}
// Set updated storage
// path prefix in opts.
opts.Prefix = pathprefix
}
// Only need to open dirs as read-only.
args := OpenArgs{Flags: syscall.O_RDONLY}
return walkDir(pb, dir, args, func(kpath string, fsentry fs.DirEntry) error {
if !fsentry.Type().IsRegular() {
// Ignore anything but
// regular file types.
return nil
}
// Get full item path (without root).
kpath = pb.Join(kpath, fsentry.Name())
// Perform a fast filter check against storage path prefix (if set).
if opts.Prefix != "" && !strings.HasPrefix(kpath, opts.Prefix) {
return nil // ignore
}
// Storage key without base.
key := kpath[len(st.path):]
// Ignore filtered keys.
if opts.Filter != nil &&
!opts.Filter(key) {
return nil // ignore
}
// Load file info. This should already
// be loaded due to the underlying call
// to os.File{}.ReadDir() populating them.
info, err := fsentry.Info()
if err != nil {
return err
}
// Perform provided walk function
return opts.Step(storage.Entry{
Key: key,
Size: info.Size(),
})
})
}
// Filepath checks and returns a formatted Filepath for given key.
func (st *DiskStorage) Filepath(key string) (path string, err error) {
pb := internal.GetPathBuilder()
path, _, err = st.filepath(pb, key)
internal.PutPathBuilder(pb)
return
}
// filepath performs the "meat" of Filepath(), returning also if path *may* be a subdir of base.
func (st *DiskStorage) filepath(pb *fastpath.Builder, key string) (path string, subdir bool, err error) {
// Fast check for whether this may be a
// sub-directory. This is not a definitive
// check, it's only for a fastpath check.
subdir = strings.ContainsRune(key, '/')
// Build from base.
pb.Append(st.path)
pb.Append(key)
// Take COPY of bytes.
path = string(pb.B)
// Check for dir traversal outside base.
if isDirTraversal(st.path, path) {
err = internal.ErrWithKey(storage.ErrInvalidKey, key)
}
return
}
// isDirTraversal will check if rootPlusPath is a dir traversal outside of root,
// assuming that both are cleaned and that rootPlusPath is path.Join(root, somePath).
func isDirTraversal(root, rootPlusPath string) bool {
switch {
// Root is $PWD, check for traversal out of
case root == ".":
return strings.HasPrefix(rootPlusPath, "../")
// The path MUST be prefixed by root
case !strings.HasPrefix(rootPlusPath, root):
return true
// In all other cases, check not equal
default:
return len(root) == len(rootPlusPath)
}
}
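The commit notes above mention setting the exclusive bit when creating new files in disk config. A sketch of how a caller might opt into that via the Config type (an assumption about what GoToSocial's disk config does, which is not part of this vendored file):

package main

import (
	"syscall"

	"codeberg.org/gruf/go-storage/disk"
)

// openExclusive opens a DiskStorage whose writes refuse to clobber
// existing keys: with O_EXCL set, WriteStream() fails on existing
// files and the EEXIST is translated to storage.ErrAlreadyExists
// (see the error handling in WriteStream() above).
func openExclusive(path string) (*disk.DiskStorage, error) {
	cfg := disk.DefaultConfig()
	cfg.OpenWrite.Flags = syscall.O_CREAT | syscall.O_WRONLY | syscall.O_EXCL
	return disk.Open(path, &cfg)
}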


@@ -1,24 +1,14 @@
package storage
package disk
import (
"errors"
"fmt"
"io/fs"
"os"
"syscall"
"codeberg.org/gruf/go-fastpath/v2"
"codeberg.org/gruf/go-store/v2/util"
)
const (
// default file permission bits
defaultDirPerms = 0o755
defaultFilePerms = 0o644
// default file open flags
defaultFileROFlags = syscall.O_RDONLY
defaultFileRWFlags = syscall.O_CREAT | syscall.O_RDWR
defaultFileLockFlags = syscall.O_RDONLY | syscall.O_CREAT
"codeberg.org/gruf/go-storage/internal"
)
// NOTE:
@@ -26,9 +16,9 @@ const (
// not necessarily for e.g. initial setup (OpenFile)
// walkDir traverses the dir tree of the supplied path, performing the supplied walkFn on each entry
func walkDir(pb *fastpath.Builder, path string, walkFn func(string, fs.DirEntry) error) error {
// Read directory entries
entries, err := readDir(path)
func walkDir(pb *fastpath.Builder, path string, args OpenArgs, walkFn func(string, fs.DirEntry) error) error {
// Read directory entries at path.
entries, err := readDir(path, args)
if err != nil {
return err
}
@@ -85,7 +75,7 @@ outer:
path = pb.Join(path, entry.Name())
// Read next directory entries
next, err := readDir(path)
next, err := readDir(path, args)
if err != nil {
return err
}
@@ -102,16 +92,17 @@ outer:
}
// cleanDirs traverses the dir tree of the supplied path, removing any folders with zero children
func cleanDirs(path string) error {
pb := util.GetPathBuilder()
defer util.PutPathBuilder(pb)
return cleanDir(pb, path, true)
func cleanDirs(path string, args OpenArgs) error {
pb := internal.GetPathBuilder()
err := cleanDir(pb, path, args, true)
internal.PutPathBuilder(pb)
return err
}
// cleanDir performs the actual dir cleaning logic for the above top-level version.
func cleanDir(pb *fastpath.Builder, path string, top bool) error {
// Get dir entries at path.
entries, err := readDir(path)
func cleanDir(pb *fastpath.Builder, path string, args OpenArgs, top bool) error {
// Get directory entries at path.
entries, err := readDir(path, args)
if err != nil {
return err
}
@@ -121,30 +112,36 @@ func cleanDir(pb *fastpath.Builder, path string, top bool) error {
return rmdir(path)
}
var errs []error
// Iterate all directory entries.
for _, entry := range entries {
if entry.IsDir() {
// Calculate directory path.
dirPath := pb.Join(path, entry.Name())
dir := pb.Join(path, entry.Name())
// Recursively clean sub-directory entries.
if err := cleanDir(pb, dirPath, false); err != nil {
fmt.Fprintf(os.Stderr, "[go-store/storage] error cleaning %s: %v", dirPath, err)
// Recursively clean sub-directory entries, adding errs.
if err := cleanDir(pb, dir, args, false); err != nil {
err = fmt.Errorf("error(s) cleaning subdir %s: %w", dir, err)
errs = append(errs, err)
}
}
}
return nil
// Return combined errors.
return errors.Join(errs...)
}
// readDir will open file at path, read the unsorted list of entries, then close.
func readDir(path string) ([]fs.DirEntry, error) {
// Open file at path
file, err := open(path, defaultFileROFlags)
func readDir(path string, args OpenArgs) ([]fs.DirEntry, error) {
// Open directory at path.
file, err := open(path, args)
if err != nil {
return nil, err
}
// Read directory entries
// Read ALL directory entries.
entries, err := file.ReadDir(-1)
// Done with file
@@ -153,11 +150,11 @@ func readDir(path string) ([]fs.DirEntry, error) {
return entries, err
}
// open will open a file at the given path with flags and default file perms.
func open(path string, flags int) (*os.File, error) {
// open is a simple wrapper around syscall.Open().
func open(path string, args OpenArgs) (*os.File, error) {
var fd int
err := retryOnEINTR(func() (err error) {
fd, err = syscall.Open(path, flags, defaultFilePerms)
fd, err = syscall.Open(path, args.Flags, args.Perms)
return
})
if err != nil {
@@ -166,8 +163,8 @@ func open(path string, flags int) (*os.File, error) {
return os.NewFile(uintptr(fd), path), nil
}
// stat checks for a file on disk.
func stat(path string) (bool, error) {
// stat is a simple wrapper around syscall.Stat().
func stat(path string) (*syscall.Stat_t, error) {
var stat syscall.Stat_t
err := retryOnEINTR(func() error {
return syscall.Stat(path, &stat)
@@ -177,26 +174,27 @@ func stat(path string) (bool, error) {
// not-found is no error
err = nil
}
return false, err
return nil, err
}
return true, nil
return &stat, nil
}
// unlink removes a file (not dir!) on disk.
// unlink is a simple wrapper around syscall.Unlink().
func unlink(path string) error {
return retryOnEINTR(func() error {
return syscall.Unlink(path)
})
}
// rmdir removes a dir (not file!) on disk.
// rmdir is a simple wrapper around syscall.Rmdir().
func rmdir(path string) error {
return retryOnEINTR(func() error {
return syscall.Rmdir(path)
})
}
// retryOnEINTR is a low-level filesystem function for retrying syscalls on EINTR received.
// retryOnEINTR is a low-level filesystem function
// for retrying syscalls on EINTR received.
func retryOnEINTR(do func() error) error {
for {
err := do()

16
vendor/codeberg.org/gruf/go-storage/errors.go generated vendored Normal file

@@ -0,0 +1,16 @@
package storage
import (
"errors"
)
var (
// ErrNotFound is the error returned when a key cannot be found in storage
ErrNotFound = errors.New("storage: key not found")
// ErrAlreadyExists is the error returned when a key already exists in storage
ErrAlreadyExists = errors.New("storage: key already exists")
// ErrInvalidKey is the error returned when an invalid key is passed to storage
ErrInvalidKey = errors.New("storage: invalid key")
)

56
vendor/codeberg.org/gruf/go-storage/internal/errors.go generated vendored Normal file

@@ -0,0 +1,56 @@
package internal
func ErrWithKey(err error, key string) error {
return &errorWithKey{key: key, err: err}
}
type errorWithKey struct {
key string
err error
}
func (err *errorWithKey) Error() string {
return err.err.Error() + ": " + err.key
}
func (err *errorWithKey) Unwrap() error {
return err.err
}
func ErrWithMsg(err error, msg string) error {
return &errorWithMsg{msg: msg, err: err}
}
type errorWithMsg struct {
msg string
err error
}
func (err *errorWithMsg) Error() string {
return err.msg + ": " + err.err.Error()
}
func (err *errorWithMsg) Unwrap() error {
return err.err
}
func WrapErr(inner, outer error) error {
return &wrappedError{inner: inner, outer: outer}
}
type wrappedError struct {
inner error
outer error
}
func (err *wrappedError) Is(other error) bool {
return err.inner == other || err.outer == other
}
func (err *wrappedError) Error() string {
return err.inner.Error() + ": " + err.outer.Error()
}
func (err *wrappedError) Unwrap() []error {
return []error{err.inner, err.outer}
}
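Because errorWithKey/errorWithMsg implement Unwrap() and wrappedError implements Is(), callers can match the package's sentinel errors with the standard errors package. A brief illustration (hypothetical helper; st stands for any Storage implementation from this library):

package main

import (
	"context"
	"errors"

	"codeberg.org/gruf/go-storage"
)

// exists reports whether key is present in st, relying on the
// Unwrap()/Is() support of the wrappers above.
func exists(ctx context.Context, st storage.Storage, key string) (bool, error) {
	_, err := st.ReadBytes(ctx, key)
	switch {
	case err == nil:
		return true, nil
	case errors.Is(err, storage.ErrNotFound):
		return false, nil
	default:
		return false, err
	}
}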

24
vendor/codeberg.org/gruf/go-storage/internal/path.go generated vendored Normal file

@@ -0,0 +1,24 @@
package internal
import (
"sync"
"codeberg.org/gruf/go-fastpath/v2"
)
var pathBuilderPool sync.Pool
func GetPathBuilder() *fastpath.Builder {
v := pathBuilderPool.Get()
if v == nil {
pb := new(fastpath.Builder)
pb.B = make([]byte, 0, 512)
v = pb
}
return v.(*fastpath.Builder)
}
func PutPathBuilder(pb *fastpath.Builder) {
pb.Reset()
pathBuilderPool.Put(pb)
}

253
vendor/codeberg.org/gruf/go-storage/memory/memory.go generated vendored Normal file

@@ -0,0 +1,253 @@
package memory
import (
"bytes"
"context"
"io"
"strings"
"sync"
"codeberg.org/gruf/go-iotools"
"codeberg.org/gruf/go-storage"
"codeberg.org/gruf/go-storage/internal"
)
// ensure MemoryStorage conforms to storage.Storage.
var _ storage.Storage = (*MemoryStorage)(nil)
// MemoryStorage is a storage implementation that simply stores key-value
// pairs in a Go map in-memory. The map is protected by a mutex.
type MemoryStorage struct {
ow bool // overwrites
fs map[string][]byte
mu sync.Mutex
}
// Open opens a new MemoryStorage instance with internal map starting size.
func Open(size int, overwrites bool) *MemoryStorage {
return &MemoryStorage{
ow: overwrites,
fs: make(map[string][]byte, size),
}
}
// Clean: implements Storage.Clean().
func (st *MemoryStorage) Clean(ctx context.Context) error {
// Check context still valid
if err := ctx.Err(); err != nil {
return err
}
// Lock map.
st.mu.Lock()
// Resize map to only necessary size in-mem.
fs := make(map[string][]byte, len(st.fs))
for key, val := range st.fs {
fs[key] = val
}
st.fs = fs
// Done with lock.
st.mu.Unlock()
return nil
}
// ReadBytes: implements Storage.ReadBytes().
func (st *MemoryStorage) ReadBytes(ctx context.Context, key string) ([]byte, error) {
// Check context still valid.
if err := ctx.Err(); err != nil {
return nil, err
}
// Lock map.
st.mu.Lock()
// Check key in store.
b, ok := st.fs[key]
if ok {
// COPY bytes.
b = copyb(b)
}
// Done with lock.
st.mu.Unlock()
if !ok {
return nil, internal.ErrWithKey(storage.ErrNotFound, key)
}
return b, nil
}
// ReadStream: implements Storage.ReadStream().
func (st *MemoryStorage) ReadStream(ctx context.Context, key string) (io.ReadCloser, error) {
// Read value data from store.
b, err := st.ReadBytes(ctx, key)
if err != nil {
return nil, err
}
// Wrap in readcloser.
r := bytes.NewReader(b)
return iotools.NopReadCloser(r), nil
}
// WriteBytes: implements Storage.WriteBytes().
func (st *MemoryStorage) WriteBytes(ctx context.Context, key string, b []byte) (int, error) {
// Check context still valid
if err := ctx.Err(); err != nil {
return 0, err
}
// Lock map.
st.mu.Lock()
// Check key in store.
_, ok := st.fs[key]
if ok && !st.ow {
// Done with lock.
st.mu.Unlock()
// Overwrites are disabled, return existing key error.
return 0, internal.ErrWithKey(storage.ErrAlreadyExists, key)
}
// Write copy to store.
st.fs[key] = copyb(b)
// Done with lock.
st.mu.Unlock()
return len(b), nil
}
// WriteStream: implements Storage.WriteStream().
func (st *MemoryStorage) WriteStream(ctx context.Context, key string, r io.Reader) (int64, error) {
// Read all from reader.
b, err := io.ReadAll(r)
if err != nil {
return 0, err
}
// Write in-memory data to store.
n, err := st.WriteBytes(ctx, key, b)
return int64(n), err
}
// Stat: implements Storage.Stat().
func (st *MemoryStorage) Stat(ctx context.Context, key string) (*storage.Entry, error) {
// Check context still valid
if err := ctx.Err(); err != nil {
return nil, err
}
// Lock map.
st.mu.Lock()
// Check key in store.
b, ok := st.fs[key]
// Get entry size.
sz := int64(len(b))
// Done with lock.
st.mu.Unlock()
if !ok {
return nil, nil
}
return &storage.Entry{
Key: key,
Size: sz,
}, nil
}
// Remove: implements Storage.Remove().
func (st *MemoryStorage) Remove(ctx context.Context, key string) error {
// Check context still valid
if err := ctx.Err(); err != nil {
return err
}
// Lock map.
st.mu.Lock()
// Check key in store.
_, ok := st.fs[key]
if ok {
// Delete store key.
delete(st.fs, key)
}
// Done with lock.
st.mu.Unlock()
if !ok {
return internal.ErrWithKey(storage.ErrNotFound, key)
}
return nil
}
// WalkKeys: implements Storage.WalkKeys().
func (st *MemoryStorage) WalkKeys(ctx context.Context, opts storage.WalkKeysOpts) error {
if opts.Step == nil {
panic("nil step fn")
}
// Check context still valid.
if err := ctx.Err(); err != nil {
return err
}
var err error
// Lock map.
st.mu.Lock()
// Ensure unlocked.
defer st.mu.Unlock()
// Range all key-vals in hash map.
for key, val := range st.fs {
// Check for filtered prefix.
if opts.Prefix != "" &&
!strings.HasPrefix(key, opts.Prefix) {
continue // ignore
}
// Check for filtered key.
if opts.Filter != nil &&
!opts.Filter(key) {
continue // ignore
}
// Pass to provided step func.
err = opts.Step(storage.Entry{
Key: key,
Size: int64(len(val)),
})
if err != nil {
return err
}
}
return err
}
// copyb returns a copy of byte-slice b.
func copyb(b []byte) []byte {
if b == nil {
return nil
}
p := make([]byte, len(b))
_ = copy(p, b)
return p
}
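The overwrites flag passed to Open() controls whether WriteBytes()/WriteStream() may replace existing keys; with it disabled, a second write to the same key fails. A short sketch:

package main

import (
	"context"
	"errors"
	"fmt"

	"codeberg.org/gruf/go-storage"
	"codeberg.org/gruf/go-storage/memory"
)

func main() {
	ctx := context.Background()

	// Open an in-memory store with overwrites disabled.
	st := memory.Open(100, false)

	_, _ = st.WriteBytes(ctx, "k", []byte("v1"))
	_, err := st.WriteBytes(ctx, "k", []byte("v2"))

	// The second write is rejected with the already-exists sentinel.
	fmt.Println(errors.Is(err, storage.ErrAlreadyExists)) // true
}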

47
vendor/codeberg.org/gruf/go-storage/s3/errors.go generated vendored Normal file

@@ -0,0 +1,47 @@
package s3
import (
"strings"
"codeberg.org/gruf/go-storage"
"codeberg.org/gruf/go-storage/internal"
"github.com/minio/minio-go/v7"
)
// transformS3Error transforms an error returned from S3Storage underlying
// minio.Core client, by wrapping where necessary with our own error types.
func transformS3Error(err error) error {
// Cast this to a minio error response
ersp, ok := err.(minio.ErrorResponse)
if ok {
switch ersp.Code {
case "NoSuchKey":
return internal.WrapErr(err, storage.ErrNotFound)
case "Conflict":
return internal.WrapErr(err, storage.ErrAlreadyExists)
default:
return err
}
}
// Check if error has an invalid object name prefix
if strings.HasPrefix(err.Error(), "Object name ") {
return internal.WrapErr(err, storage.ErrInvalidKey)
}
return err
}
func isNotFoundError(err error) bool {
errRsp, ok := err.(minio.ErrorResponse)
return ok && errRsp.Code == "NoSuchKey"
}
func isConflictError(err error) bool {
errRsp, ok := err.(minio.ErrorResponse)
return ok && errRsp.Code == "Conflict"
}
func isObjectNameError(err error) bool {
return strings.HasPrefix(err.Error(), "Object name ")
}

479
vendor/codeberg.org/gruf/go-storage/s3/s3.go generated vendored Normal file

@@ -0,0 +1,479 @@
package s3
import (
"bytes"
"context"
"errors"
"io"
"codeberg.org/gruf/go-storage"
"codeberg.org/gruf/go-storage/internal"
"github.com/minio/minio-go/v7"
)
// ensure S3Storage conforms to storage.Storage.
var _ storage.Storage = (*S3Storage)(nil)
// ensure bytes.Reader conforms to ReaderSize.
var _ ReaderSize = (*bytes.Reader)(nil)
// ReaderSize is an extension of the io.Reader interface
// that may be implemented by callers of WriteStream() in
// order to improve performance. When the size is known it
// is passed onto the underlying minio S3 library.
type ReaderSize interface {
io.Reader
Size() int64
}
// DefaultConfig returns the default S3Storage configuration.
func DefaultConfig() Config {
return defaultConfig
}
// immutable default configuration.
var defaultConfig = Config{
CoreOpts: minio.Options{},
GetOpts: minio.GetObjectOptions{},
PutOpts: minio.PutObjectOptions{},
PutChunkOpts: minio.PutObjectPartOptions{},
PutChunkSize: 5 * 1024 * 1024, // 5MiB (S3 minimum part size)
StatOpts: minio.StatObjectOptions{},
RemoveOpts: minio.RemoveObjectOptions{},
ListSize: 200,
}
// Config defines options to be used when opening an S3Storage,
// mostly options for underlying S3 client library.
type Config struct {
// CoreOpts are S3 client options
// passed during initialization.
CoreOpts minio.Options
// GetOpts are S3 client options
// passed during .Read___() calls.
GetOpts minio.GetObjectOptions
// PutOpts are S3 client options
// passed during .Write___() calls.
PutOpts minio.PutObjectOptions
// PutChunkSize is the chunk size (in bytes)
// to use when sending a byte stream reader
// of unknown size as a multi-part object.
PutChunkSize int64
// PutChunkOpts are S3 client options
// passed during chunked .Write___() calls.
PutChunkOpts minio.PutObjectPartOptions
// StatOpts are S3 client options
// passed during .Stat() calls.
StatOpts minio.StatObjectOptions
// RemoveOpts are S3 client options
// passed during .Remove() calls.
RemoveOpts minio.RemoveObjectOptions
// ListSize determines how many items
// to include in each list request, made
// during calls to .WalkKeys().
ListSize int
}
// getS3Config returns valid (and owned!) Config for given ptr.
func getS3Config(cfg *Config) Config {
// See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
const minChunkSz = 5 * 1024 * 1024
if cfg == nil {
// use defaults.
return defaultConfig
}
// Ensure a minimum compat chunk size.
if cfg.PutChunkSize <= minChunkSz {
cfg.PutChunkSize = minChunkSz
}
// Ensure valid list size.
if cfg.ListSize <= 0 {
cfg.ListSize = 200
}
return Config{
CoreOpts: cfg.CoreOpts,
GetOpts: cfg.GetOpts,
PutOpts: cfg.PutOpts,
PutChunkSize: cfg.PutChunkSize,
ListSize: cfg.ListSize,
StatOpts: cfg.StatOpts,
RemoveOpts: cfg.RemoveOpts,
}
}
// S3Storage is a storage implementation that stores key-value
// pairs in an S3 instance at given endpoint with bucket name.
type S3Storage struct {
client *minio.Core
bucket string
config Config
}
// Open opens a new S3Storage instance with given S3 endpoint URL, bucket name and configuration.
func Open(endpoint string, bucket string, cfg *Config) (*S3Storage, error) {
// Check + set config defaults.
config := getS3Config(cfg)
// Create new S3 client connection to given endpoint.
client, err := minio.NewCore(endpoint, &config.CoreOpts)
if err != nil {
return nil, err
}
ctx := context.Background()
// Check that provided bucket actually exists.
exists, err := client.BucketExists(ctx, bucket)
if err != nil {
return nil, err
} else if !exists {
return nil, errors.New("storage/s3: bucket does not exist")
}
return &S3Storage{
client: client,
bucket: bucket,
config: config,
}, nil
}
// Client: returns access to the underlying S3 client.
func (st *S3Storage) Client() *minio.Core {
return st.client
}
// Clean: implements Storage.Clean().
func (st *S3Storage) Clean(ctx context.Context) error {
return nil // nothing to do for S3
}
// ReadBytes: implements Storage.ReadBytes().
func (st *S3Storage) ReadBytes(ctx context.Context, key string) ([]byte, error) {
// Get stream reader for key
rc, err := st.ReadStream(ctx, key)
if err != nil {
return nil, err
}
// Read all data to memory.
data, err := io.ReadAll(rc)
if err != nil {
_ = rc.Close()
return nil, err
}
// Close storage stream reader.
if err := rc.Close(); err != nil {
return nil, err
}
return data, nil
}
// ReadStream: implements Storage.ReadStream().
func (st *S3Storage) ReadStream(ctx context.Context, key string) (io.ReadCloser, error) {
// Fetch object reader from S3 bucket
rc, _, _, err := st.client.GetObject(
ctx,
st.bucket,
key,
st.config.GetOpts,
)
if err != nil {
if isNotFoundError(err) {
// Wrap not found errors as our not found type.
err = internal.WrapErr(err, storage.ErrNotFound)
} else if isObjectNameError(err) {
// Wrap object name errors as our invalid key type.
err = internal.WrapErr(err, storage.ErrInvalidKey)
}
return nil, transformS3Error(err)
}
return rc, nil
}
// WriteBytes: implements Storage.WriteBytes().
func (st *S3Storage) WriteBytes(ctx context.Context, key string, value []byte) (int, error) {
n, err := st.WriteStream(ctx, key, bytes.NewReader(value))
return int(n), err
}
// WriteStream: implements Storage.WriteStream().
func (st *S3Storage) WriteStream(ctx context.Context, key string, r io.Reader) (int64, error) {
if rs, ok := r.(ReaderSize); ok {
// This reader supports providing us the size of
// the encompassed data, allowing us to perform
// a singular .PutObject() call with length.
info, err := st.client.PutObject(
ctx,
st.bucket,
key,
r,
rs.Size(),
"",
"",
st.config.PutOpts,
)
if err != nil {
if isConflictError(err) {
// Wrap conflict errors as our already exists type.
err = internal.WrapErr(err, storage.ErrAlreadyExists)
} else if isObjectNameError(err) {
// Wrap object name errors as our invalid key type.
err = internal.WrapErr(err, storage.ErrInvalidKey)
}
return 0, err
}
return info.Size, nil
}
// Start a new multipart upload to get ID.
uploadID, err := st.client.NewMultipartUpload(
ctx,
st.bucket,
key,
st.config.PutOpts,
)
if err != nil {
if isConflictError(err) {
// Wrap conflict errors as our already exists type.
err = internal.WrapErr(err, storage.ErrAlreadyExists)
} else if isObjectNameError(err) {
// Wrap object name errors as our invalid key type.
err = internal.WrapErr(err, storage.ErrInvalidKey)
}
return 0, transformS3Error(err)
}
var (
index = int(1) // parts index
total = int64(0)
parts []minio.CompletePart
chunk = make([]byte, st.config.PutChunkSize)
rbuf = bytes.NewReader(nil)
)
// Note that we do not perform any kind of
// memory pooling of the chunk buffers here.
// Optimal chunking sizes for S3 writes are in
// the orders of megabytes, so letting the GC
// collect these ASAP is much preferred.
loop:
for done := false; !done; {
// Read next chunk into byte buffer.
n, err := io.ReadFull(r, chunk)
switch err {
// Successful read.
case nil:
// Reached end, buffer empty.
case io.EOF:
break loop
// Reached end, but buffer not empty.
case io.ErrUnexpectedEOF:
done = true
// All other errors.
default:
return 0, err
}
// Reset byte reader.
rbuf.Reset(chunk[:n])
// Put this object chunk in S3 store.
pt, err := st.client.PutObjectPart(
ctx,
st.bucket,
key,
uploadID,
index,
rbuf,
int64(n),
st.config.PutChunkOpts,
)
if err != nil {
return 0, err
}
// Append completed part to slice.
parts = append(parts, minio.CompletePart{
PartNumber: pt.PartNumber,
ETag: pt.ETag,
ChecksumCRC32: pt.ChecksumCRC32,
ChecksumCRC32C: pt.ChecksumCRC32C,
ChecksumSHA1: pt.ChecksumSHA1,
ChecksumSHA256: pt.ChecksumSHA256,
})
// Iterate.
index++
// Update total size.
total += pt.Size
}
// Complete this multi-part upload operation
_, err = st.client.CompleteMultipartUpload(
ctx,
st.bucket,
key,
uploadID,
parts,
st.config.PutOpts,
)
if err != nil {
return 0, err
}
return total, nil
}
// Stat: implements Storage.Stat().
func (st *S3Storage) Stat(ctx context.Context, key string) (*storage.Entry, error) {
// Query object in S3 bucket.
stat, err := st.client.StatObject(
ctx,
st.bucket,
key,
st.config.StatOpts,
)
if err != nil {
if isNotFoundError(err) {
// Ignore err return
// for not-found.
err = nil
} else if isObjectNameError(err) {
// Wrap object name errors as our invalid key type.
err = internal.WrapErr(err, storage.ErrInvalidKey)
}
return nil, err
}
return &storage.Entry{
Key: key,
Size: stat.Size,
}, nil
}
// Remove: implements Storage.Remove().
func (st *S3Storage) Remove(ctx context.Context, key string) error {
// Query object in S3 bucket.
_, err := st.client.StatObject(
ctx,
st.bucket,
key,
st.config.StatOpts,
)
if err != nil {
if isNotFoundError(err) {
// Wrap not found errors as our not found type.
err = internal.WrapErr(err, storage.ErrNotFound)
} else if isObjectNameError(err) {
// Wrap object name errors as our invalid key type.
err = internal.WrapErr(err, storage.ErrInvalidKey)
}
return err
}
// Remove object from S3 bucket
err = st.client.RemoveObject(
ctx,
st.bucket,
key,
st.config.RemoveOpts,
)
if err != nil {
if isNotFoundError(err) {
// Wrap not found errors as our not found type.
err = internal.WrapErr(err, storage.ErrNotFound)
} else if isObjectNameError(err) {
// Wrap object name errors as our invalid key type.
err = internal.WrapErr(err, storage.ErrInvalidKey)
}
return err
}
return nil
}
// WalkKeys: implements Storage.WalkKeys().
func (st *S3Storage) WalkKeys(ctx context.Context, opts storage.WalkKeysOpts) error {
if opts.Step == nil {
panic("nil step fn")
}
var (
prev string
token string
)
for {
// List objects in bucket starting at marker.
result, err := st.client.ListObjectsV2(
st.bucket,
opts.Prefix,
prev,
token,
"",
st.config.ListSize,
)
if err != nil {
return err
}
// Iterate through list result contents.
for _, obj := range result.Contents {
// Ignore filtered keys.
if opts.Filter != nil &&
!opts.Filter(obj.Key) {
continue
}
// Pass each obj through step func.
if err := opts.Step(storage.Entry{
Key: obj.Key,
Size: obj.Size,
}); err != nil {
return err
}
}
// No token means we reached end of bucket.
if result.NextContinuationToken == "" {
return nil
}
// Set continue token and prev mark
token = result.NextContinuationToken
prev = result.StartAfter
}
}
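Note the ReaderSize fast path in WriteStream() above: a bytes.Reader (asserted near the top of the file to satisfy ReaderSize) yields a single PutObject() call with a known length, while a plain io.Reader falls back to chunked multipart upload in PutChunkSize parts. A sketch of both paths from the caller's side (hypothetical keys and arguments):

package main

import (
	"bytes"
	"context"
	"io"

	"codeberg.org/gruf/go-storage/s3"
)

// putBoth is a sketch only: data takes the single PutObject() fast
// path via bytes.Reader's Size(); stream takes the multipart path.
func putBoth(ctx context.Context, st *s3.S3Storage, data []byte, stream io.Reader) error {
	if _, err := st.WriteStream(ctx, "media/file1", bytes.NewReader(data)); err != nil {
		return err
	}
	_, err := st.WriteStream(ctx, "media/file2", stream)
	return err
}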

73
vendor/codeberg.org/gruf/go-storage/storage.go generated vendored Normal file

@@ -0,0 +1,73 @@
package storage
import (
"context"
"io"
)
// Storage defines a means of accessing and storing
// data to some abstracted underlying mechanism, whether
// that be in-memory, an on-disk filesystem or an S3 bucket.
type Storage interface {
// ReadBytes returns the data located at key (e.g. filepath) in storage.
ReadBytes(ctx context.Context, key string) ([]byte, error)
// ReadStream returns an io.ReadCloser for the data at key (e.g. filepath) in storage.
ReadStream(ctx context.Context, key string) (io.ReadCloser, error)
// WriteBytes writes the supplied data at key (e.g. filepath) in storage.
WriteBytes(ctx context.Context, key string, data []byte) (int, error)
// WriteStream writes the supplied data stream at key (e.g. filepath) in storage.
WriteStream(ctx context.Context, key string, stream io.Reader) (int64, error)
// Stat returns details about key (e.g. filepath) in storage, nil indicates not found.
Stat(ctx context.Context, key string) (*Entry, error)
// Remove will remove data at key from storage.
Remove(ctx context.Context, key string) error
// Clean performs a cleanup of the underlying
// storage mechanism. For memory implementations this may
// compact the underlying hashmap, for disk filesystems
// this may remove now-unused directories.
Clean(ctx context.Context) error
// WalkKeys walks available keys using opts in storage.
WalkKeys(ctx context.Context, opts WalkKeysOpts) error
}
// Entry represents a key in a Storage{} implementation,
// with any associated metadata that may have been set.
type Entry struct {
// Key is this entry's
// unique storage key.
Key string
// Size is the size of
// this entry in storage.
Size int64
}
// WalkKeysOpts are arguments provided
// to a storage WalkKeys() implementation.
type WalkKeysOpts struct {
// Prefix can be used to filter entries
// by the given key prefix, for example
// only those under a subdirectory. This
// is preferred over the Filter() function.
Prefix string
// Filter can be used to filter entries
// by any custom metric before they
// are passed to the Step() function. E.g.
// filter storage entries by regexp.
Filter func(string) bool
// Step is called for each entry during
// WalkKeys, error triggers early return.
Step func(Entry) error
}
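Putting WalkKeysOpts together, a brief usage sketch (hypothetical key layout; note that Filter() returns true to keep an entry, matching the implementations above):

package main

import (
	"context"
	"fmt"
	"strings"

	"codeberg.org/gruf/go-storage"
)

// listAvatars walks keys under "avatars/", skipping temp files.
func listAvatars(ctx context.Context, st storage.Storage) error {
	return st.WalkKeys(ctx, storage.WalkKeysOpts{
		Prefix: "avatars/",
		Filter: func(key string) bool {
			return !strings.HasSuffix(key, ".tmp")
		},
		Step: func(e storage.Entry) error {
			fmt.Println(e.Key, e.Size)
			return nil
		},
	})
}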

29
vendor/codeberg.org/gruf/go-storage/test.sh generated vendored Normal file

@@ -0,0 +1,29 @@
#!/bin/sh
export \
MINIO_ADDR='127.0.0.1:8080' \
MINIO_BUCKET='test' \
MINIO_ROOT_USER='root' \
MINIO_ROOT_PASSWORD='password' \
MINIO_PID=0 \
S3_DIR=$(mktemp -d)
# Drop the test S3 bucket and kill minio on exit
trap 'rm -rf "$S3_DIR"; [ $MINIO_PID -ne 0 ] && kill -9 $MINIO_PID' \
HUP INT QUIT ABRT KILL TERM EXIT
# Create required S3 bucket dir
mkdir -p "${S3_DIR}/${MINIO_BUCKET}"
# Start the minio test S3 server instance
minio server --address "$MINIO_ADDR" "$S3_DIR" > /dev/null 2>&1 &
MINIO_PID=$!; [ $? -ne 0 ] && {
echo 'failed to start minio'
exit 1
}
# Let server startup
sleep 1
# Run go-storage tests
go test ./... -v


@@ -1,303 +0,0 @@
package storage
import (
"bytes"
"io"
"sync"
"codeberg.org/gruf/go-iotools"
"github.com/klauspost/compress/gzip"
"github.com/klauspost/compress/snappy"
"github.com/klauspost/compress/zlib"
)
// Compressor defines a means of compressing/decompressing values going into a key-value store
type Compressor interface {
// Reader returns a new decompressing io.ReadCloser based on supplied (compressed) io.Reader
Reader(io.ReadCloser) (io.ReadCloser, error)
// Writer returns a new compressing io.WriteCloser based on supplied (uncompressed) io.Writer
Writer(io.WriteCloser) (io.WriteCloser, error)
}
type gzipCompressor struct {
rpool sync.Pool
wpool sync.Pool
}
// GZipCompressor returns a new Compressor that implements GZip at default compression level
func GZipCompressor() Compressor {
return GZipCompressorLevel(gzip.DefaultCompression)
}
// GZipCompressorLevel returns a new Compressor that implements GZip at supplied compression level
func GZipCompressorLevel(level int) Compressor {
// GZip readers immediately check for valid
// header data on allocation / reset, so we
// need a set of valid header data so we can
// initialize reader instances in mempool.
hdr := bytes.NewBuffer(nil)
// Init writer to ensure valid level provided
gw, err := gzip.NewWriterLevel(hdr, level)
if err != nil {
panic(err)
}
// Write empty data to ensure gzip
// header data is in byte buffer.
_, _ = gw.Write([]byte{})
_ = gw.Close()
return &gzipCompressor{
rpool: sync.Pool{
New: func() any {
hdr := bytes.NewReader(hdr.Bytes())
gr, _ := gzip.NewReader(hdr)
return gr
},
},
wpool: sync.Pool{
New: func() any {
gw, _ := gzip.NewWriterLevel(nil, level)
return gw
},
},
}
}
func (c *gzipCompressor) Reader(rc io.ReadCloser) (io.ReadCloser, error) {
var released bool
// Acquire from pool.
gr := c.rpool.Get().(*gzip.Reader)
if err := gr.Reset(rc); err != nil {
c.rpool.Put(gr)
return nil, err
}
return iotools.ReadCloser(gr, iotools.CloserFunc(func() error {
if !released {
released = true
defer c.rpool.Put(gr)
}
// Close compressor
err1 := gr.Close()
// Close original stream.
err2 := rc.Close()
// Return err1 or 2
if err1 != nil {
return err1
}
return err2
})), nil
}
func (c *gzipCompressor) Writer(wc io.WriteCloser) (io.WriteCloser, error) {
var released bool
// Acquire from pool.
gw := c.wpool.Get().(*gzip.Writer)
gw.Reset(wc)
return iotools.WriteCloser(gw, iotools.CloserFunc(func() error {
if !released {
released = true
c.wpool.Put(gw)
}
// Close compressor
err1 := gw.Close()
// Close original stream.
err2 := wc.Close()
// Return err1 or 2
if err1 != nil {
return err1
}
return err2
})), nil
}
type zlibCompressor struct {
rpool sync.Pool
wpool sync.Pool
dict []byte
}
// ZLibCompressor returns a new Compressor that implements ZLib at default compression level
func ZLibCompressor() Compressor {
return ZLibCompressorLevelDict(zlib.DefaultCompression, nil)
}
// ZLibCompressorLevel returns a new Compressor that implements ZLib at supplied compression level
func ZLibCompressorLevel(level int) Compressor {
return ZLibCompressorLevelDict(level, nil)
}
// ZLibCompressorLevelDict returns a new Compressor that implements ZLib at supplied compression level with supplied dict
func ZLibCompressorLevelDict(level int, dict []byte) Compressor {
// ZLib readers immediately check for valid
// header data on allocation / reset, so we
// need a set of valid header data so we can
// initialize reader instances in mempool.
hdr := bytes.NewBuffer(nil)
// Init writer to ensure valid level + dict provided
zw, err := zlib.NewWriterLevelDict(hdr, level, dict)
if err != nil {
panic(err)
}
// Write empty data to ensure zlib
// header data is in byte buffer.
zw.Write([]byte{})
zw.Close()
return &zlibCompressor{
rpool: sync.Pool{
New: func() any {
hdr := bytes.NewReader(hdr.Bytes())
zr, _ := zlib.NewReaderDict(hdr, dict)
return zr
},
},
wpool: sync.Pool{
New: func() any {
zw, _ := zlib.NewWriterLevelDict(nil, level, dict)
return zw
},
},
dict: dict,
}
}
func (c *zlibCompressor) Reader(rc io.ReadCloser) (io.ReadCloser, error) {
var released bool
zr := c.rpool.Get().(interface {
io.ReadCloser
zlib.Resetter
})
if err := zr.Reset(rc, c.dict); err != nil {
c.rpool.Put(zr)
return nil, err
}
return iotools.ReadCloser(zr, iotools.CloserFunc(func() error {
if !released {
released = true
defer c.rpool.Put(zr)
}
// Close compressor
err1 := zr.Close()
// Close original stream.
err2 := rc.Close()
// Return err1 or 2
if err1 != nil {
return err1
}
return err2
})), nil
}
func (c *zlibCompressor) Writer(wc io.WriteCloser) (io.WriteCloser, error) {
var released bool
// Acquire from pool.
zw := c.wpool.Get().(*zlib.Writer)
zw.Reset(wc)
return iotools.WriteCloser(zw, iotools.CloserFunc(func() error {
if !released {
released = true
c.wpool.Put(zw)
}
// Close compressor
err1 := zw.Close()
// Close original stream.
err2 := wc.Close()
// Return err1 or 2
if err1 != nil {
return err1
}
return err2
})), nil
}
type snappyCompressor struct {
rpool sync.Pool
wpool sync.Pool
}
// SnappyCompressor returns a new Compressor that implements Snappy.
func SnappyCompressor() Compressor {
return &snappyCompressor{
rpool: sync.Pool{
New: func() any { return snappy.NewReader(nil) },
},
wpool: sync.Pool{
New: func() any { return snappy.NewWriter(nil) },
},
}
}
func (c *snappyCompressor) Reader(rc io.ReadCloser) (io.ReadCloser, error) {
var released bool
// Acquire from pool.
sr := c.rpool.Get().(*snappy.Reader)
sr.Reset(rc)
return iotools.ReadCloser(sr, iotools.CloserFunc(func() error {
if !released {
released = true
defer c.rpool.Put(sr)
}
// Close original stream.
return rc.Close()
})), nil
}
func (c *snappyCompressor) Writer(wc io.WriteCloser) (io.WriteCloser, error) {
var released bool
// Acquire from pool.
sw := c.wpool.Get().(*snappy.Writer)
sw.Reset(wc)
return iotools.WriteCloser(sw, iotools.CloserFunc(func() error {
if !released {
released = true
c.wpool.Put(sw)
}
// Close original stream.
return wc.Close()
})), nil
}
type nopCompressor struct{}
// NoCompression is a Compressor that simply does nothing.
func NoCompression() Compressor {
return &nopCompressor{}
}
func (c *nopCompressor) Reader(rc io.ReadCloser) (io.ReadCloser, error) {
return rc, nil
}
func (c *nopCompressor) Writer(wc io.WriteCloser) (io.WriteCloser, error) {
return wc, nil
}


@@ -1,424 +0,0 @@
package storage
import (
"context"
"errors"
"io"
"io/fs"
"os"
"path"
_path "path"
"strings"
"syscall"
"codeberg.org/gruf/go-bytes"
"codeberg.org/gruf/go-fastcopy"
"codeberg.org/gruf/go-store/v2/util"
)
// DefaultDiskConfig is the default DiskStorage configuration.
var DefaultDiskConfig = &DiskConfig{
Overwrite: true,
WriteBufSize: 4096,
Transform: NopTransform(),
Compression: NoCompression(),
}
// DiskConfig defines options to be used when opening a DiskStorage.
type DiskConfig struct {
// Transform is the supplied key <--> path KeyTransform.
Transform KeyTransform
// WriteBufSize is the buffer size to use when writing file streams.
WriteBufSize int
// Overwrite allows overwriting values of stored keys in the storage.
Overwrite bool
// LockFile allows specifying the filesystem path to use for the lockfile,
// providing only a filename it will store the lockfile within provided store
// path and nest the store under `path/store` to prevent access to lockfile.
LockFile string
// Compression is the Compressor to use when reading / writing files,
// default is no compression.
Compression Compressor
}
// getDiskConfig returns a valid DiskConfig for supplied ptr.
func getDiskConfig(cfg *DiskConfig) DiskConfig {
// If nil, use default
if cfg == nil {
cfg = DefaultDiskConfig
}
// Assume nil transform == none
if cfg.Transform == nil {
cfg.Transform = NopTransform()
}
// Assume nil compress == none
if cfg.Compression == nil {
cfg.Compression = NoCompression()
}
// Assume 0 buf size == use default
if cfg.WriteBufSize <= 0 {
cfg.WriteBufSize = DefaultDiskConfig.WriteBufSize
}
// Assume empty lockfile path == use default
if len(cfg.LockFile) == 0 {
cfg.LockFile = LockFile
}
// Return owned config copy
return DiskConfig{
Transform: cfg.Transform,
WriteBufSize: cfg.WriteBufSize,
Overwrite: cfg.Overwrite,
LockFile: cfg.LockFile,
Compression: cfg.Compression,
}
}
// DiskStorage is a Storage implementation that stores directly to a filesystem.
type DiskStorage struct {
path string // path is the root path of this store
cppool fastcopy.CopyPool // cppool is the prepared io copier with buffer pool
config DiskConfig // cfg is the supplied configuration for this store
lock *Lock // lock is the opened lockfile for this storage instance
}
// OpenDisk opens a DiskStorage instance for given folder path and configuration.
func OpenDisk(path string, cfg *DiskConfig) (*DiskStorage, error) {
// Get checked config
config := getDiskConfig(cfg)
// Acquire path builder
pb := util.GetPathBuilder()
defer util.PutPathBuilder(pb)
// Clean provided store path, ensure
// ends in '/' to help later path trimming
storePath := pb.Clean(path) + "/"
// Clean provided lockfile path
lockfile := pb.Clean(config.LockFile)
// Check if lockfile is an *actual* path or just filename
if lockDir, _ := _path.Split(lockfile); lockDir == "" {
// Lockfile is a filename, store must be nested under
// $storePath/store to prevent access to the lockfile
storePath += "store/"
lockfile = pb.Join(path, lockfile)
}
// Attempt to open dir path
file, err := os.OpenFile(storePath, defaultFileROFlags, defaultDirPerms)
if err != nil {
// If not a not-exist error, return
if !os.IsNotExist(err) {
return nil, err
}
// Attempt to make store path dirs
err = os.MkdirAll(storePath, defaultDirPerms)
if err != nil {
return nil, err
}
// Reopen dir now it's been created
file, err = os.OpenFile(storePath, defaultFileROFlags, defaultDirPerms)
if err != nil {
return nil, err
}
}
defer file.Close()
// Double check this is a dir (NOT a file!)
stat, err := file.Stat()
if err != nil {
return nil, err
} else if !stat.IsDir() {
return nil, errors.New("store/storage: path is file")
}
// Open and acquire storage lock for path
lock, err := OpenLock(lockfile)
if err != nil {
return nil, err
}
// Prepare DiskStorage
st := &DiskStorage{
path: storePath,
config: config,
lock: lock,
}
// Set copypool buffer size
st.cppool.Buffer(config.WriteBufSize)
return st, nil
}
// Clean implements Storage.Clean().
func (st *DiskStorage) Clean(ctx context.Context) error {
// Check if open
if st.lock.Closed() {
return ErrClosed
}
// Check context still valid
if err := ctx.Err(); err != nil {
return err
}
// Clean-out unused directories
return cleanDirs(st.path)
}
// ReadBytes implements Storage.ReadBytes().
func (st *DiskStorage) ReadBytes(ctx context.Context, key string) ([]byte, error) {
// Get stream reader for key
rc, err := st.ReadStream(ctx, key)
if err != nil {
return nil, err
}
defer rc.Close()
// Read all bytes and return
return io.ReadAll(rc)
}
// ReadStream implements Storage.ReadStream().
func (st *DiskStorage) ReadStream(ctx context.Context, key string) (io.ReadCloser, error) {
// Get file path for key
kpath, err := st.Filepath(key)
if err != nil {
return nil, err
}
// Check if open
if st.lock.Closed() {
return nil, ErrClosed
}
// Check context still valid
if err := ctx.Err(); err != nil {
return nil, err
}
// Attempt to open file (replace ENOENT with our own)
file, err := open(kpath, defaultFileROFlags)
if err != nil {
return nil, errSwapNotFound(err)
}
// Wrap the file in a compressor
cFile, err := st.config.Compression.Reader(file)
if err != nil {
_ = file.Close()
return nil, err
}
return cFile, nil
}
// WriteBytes implements Storage.WriteBytes().
func (st *DiskStorage) WriteBytes(ctx context.Context, key string, value []byte) (int, error) {
n, err := st.WriteStream(ctx, key, bytes.NewReader(value))
return int(n), err
}
// WriteStream implements Storage.WriteStream().
func (st *DiskStorage) WriteStream(ctx context.Context, key string, r io.Reader) (int64, error) {
// Get file path for key
kpath, err := st.Filepath(key)
if err != nil {
return 0, err
}
// Check if open
if st.lock.Closed() {
return 0, ErrClosed
}
// Check context still valid
if err := ctx.Err(); err != nil {
return 0, err
}
// Ensure dirs leading up to file exist
err = os.MkdirAll(path.Dir(kpath), defaultDirPerms)
if err != nil {
return 0, err
}
// Prepare to swap error if need-be
errSwap := errSwapNoop
// Build file RW flags
flags := defaultFileRWFlags
if !st.config.Overwrite {
flags |= syscall.O_EXCL
// Catch + replace err exist
errSwap = errSwapExist
}
// Attempt to open file
file, err := open(kpath, flags)
if err != nil {
return 0, errSwap(err)
}
// Wrap the file in a compressor
cFile, err := st.config.Compression.Writer(file)
if err != nil {
_ = file.Close()
return 0, err
}
// Wraps file.Close().
defer cFile.Close()
// Copy provided reader to file
return st.cppool.Copy(cFile, r)
}
// Stat implements Storage.Stat().
func (st *DiskStorage) Stat(ctx context.Context, key string) (bool, error) {
// Get file path for key
kpath, err := st.Filepath(key)
if err != nil {
return false, err
}
// Check if open
if st.lock.Closed() {
return false, ErrClosed
}
// Check context still valid
if err := ctx.Err(); err != nil {
return false, err
}
// Check for file on disk
return stat(kpath)
}
// Remove implements Storage.Remove().
func (st *DiskStorage) Remove(ctx context.Context, key string) error {
// Get file path for key
kpath, err := st.Filepath(key)
if err != nil {
return err
}
// Check if open
if st.lock.Closed() {
return ErrClosed
}
// Check context still valid
if err := ctx.Err(); err != nil {
return err
}
// Remove at path (we know this is file)
if err := unlink(kpath); err != nil {
return errSwapNotFound(err)
}
return nil
}
// Close implements Storage.Close().
func (st *DiskStorage) Close() error {
return st.lock.Close()
}
// WalkKeys implements Storage.WalkKeys().
func (st *DiskStorage) WalkKeys(ctx context.Context, opts WalkKeysOptions) error {
// Check if open
if st.lock.Closed() {
return ErrClosed
}
// Check context still valid
if err := ctx.Err(); err != nil {
return err
}
// Acquire path builder
pb := util.GetPathBuilder()
defer util.PutPathBuilder(pb)
// Walk dir for entries
return walkDir(pb, st.path, func(kpath string, fsentry fs.DirEntry) error {
if !fsentry.Type().IsRegular() {
// Only deal with regular files
return nil
}
// Get full item path (without root)
kpath = pb.Join(kpath, fsentry.Name())
kpath = kpath[len(st.path):]
// Load file info. This should already
// be loaded due to the underlying call
// to os.File{}.ReadDir() populating them
info, err := fsentry.Info()
if err != nil {
return err
}
// Perform provided walk function
return opts.WalkFn(ctx, Entry{
Key: st.config.Transform.PathToKey(kpath),
Size: info.Size(),
})
})
}
// Filepath checks and returns a formatted Filepath for given key.
func (st *DiskStorage) Filepath(key string) (string, error) {
// Calculate transformed key path
key = st.config.Transform.KeyToPath(key)
// Acquire path builder
pb := util.GetPathBuilder()
defer util.PutPathBuilder(pb)
// Generate key path
pb.Append(st.path)
pb.Append(key)
// Check for dir traversal outside of root
if isDirTraversal(st.path, pb.String()) {
return "", ErrInvalidKey
}
return string(pb.B), nil
}
// isDirTraversal will check if rootPlusPath is a dir traversal outside of root,
// assuming that both are cleaned and that rootPlusPath is path.Join(root, somePath).
func isDirTraversal(root, rootPlusPath string) bool {
switch {
// Root is $PWD, check for traversal out of
case root == ".":
return strings.HasPrefix(rootPlusPath, "../")
// The path MUST be prefixed by root
case !strings.HasPrefix(rootPlusPath, root):
return true
// In all other cases, check not equal
default:
return len(root) == len(rootPlusPath)
}
}


@@ -1,110 +0,0 @@
package storage
import (
"errors"
"strings"
"syscall"
"github.com/minio/minio-go/v7"
)
var (
// ErrClosed is returned on operations on a closed storage
ErrClosed = new_error("closed")
// ErrNotFound is the error returned when a key cannot be found in storage
ErrNotFound = new_error("key not found")
// ErrAlreadyExists is the error returned when a key already exists in storage
ErrAlreadyExists = new_error("key already exists")
// ErrInvalidKey is the error returned when an invalid key is passed to storage
ErrInvalidKey = new_error("invalid key")
// ErrAlreadyLocked is returned on fail opening a storage lockfile
ErrAlreadyLocked = new_error("storage lock already open")
)
// new_error returns a new error instance prefixed by package prefix.
func new_error(msg string) error {
return errors.New("store/storage: " + msg)
}
// wrappedError allows wrapping together an inner with outer error.
type wrappedError struct {
inner error
outer error
}
// wrap will return a new wrapped error from given inner and outer errors.
func wrap(outer, inner error) *wrappedError {
return &wrappedError{
inner: inner,
outer: outer,
}
}
func (e *wrappedError) Is(target error) bool {
return e.outer == target || e.inner == target
}
func (e *wrappedError) Error() string {
return e.outer.Error() + ": " + e.inner.Error()
}
func (e *wrappedError) Unwrap() error {
return e.inner
}
// errSwapNoop performs no error swaps
func errSwapNoop(err error) error {
return err
}
// ErrSwapNotFound swaps syscall.ENOENT for ErrNotFound
func errSwapNotFound(err error) error {
if err == syscall.ENOENT {
return ErrNotFound
}
return err
}
// errSwapExist swaps syscall.EEXIST for ErrAlreadyExists
func errSwapExist(err error) error {
if err == syscall.EEXIST {
return ErrAlreadyExists
}
return err
}
// errSwapUnavailable swaps syscall.EAGAIN for ErrAlreadyLocked
func errSwapUnavailable(err error) error {
if err == syscall.EAGAIN {
return ErrAlreadyLocked
}
return err
}
// transformS3Error transforms an error returned from S3Storage underlying
// minio.Core client, by wrapping where necessary with our own error types.
func transformS3Error(err error) error {
// Cast this to a minio error response
ersp, ok := err.(minio.ErrorResponse)
if ok {
switch ersp.Code {
case "NoSuchKey":
return wrap(ErrNotFound, err)
case "Conflict":
return wrap(ErrAlreadyExists, err)
default:
return err
}
}
// Check if error has an invalid object name prefix
if strings.HasPrefix(err.Error(), "Object name ") {
return wrap(ErrInvalidKey, err)
}
return err
}


@@ -1,59 +0,0 @@
package storage
import (
"sync/atomic"
"syscall"
)
// LockFile is our standard lockfile name.
const LockFile = "store.lock"
// Lock represents a filesystem lock to ensure only one storage instance open per path.
type Lock struct {
fd int
st uint32
}
// OpenLock opens a lockfile at path.
func OpenLock(path string) (*Lock, error) {
var fd int
// Open the file descriptor at path
err := retryOnEINTR(func() (err error) {
fd, err = syscall.Open(path, defaultFileLockFlags, defaultFilePerms)
return
})
if err != nil {
return nil, err
}
// Get a flock on the file descriptor
err = retryOnEINTR(func() error {
return syscall.Flock(fd, syscall.LOCK_EX|syscall.LOCK_NB)
})
if err != nil {
return nil, errSwapUnavailable(err)
}
return &Lock{fd: fd}, nil
}
// Close will attempt to close the lockfile and file descriptor.
func (f *Lock) Close() error {
var err error
if atomic.CompareAndSwapUint32(&f.st, 0, 1) {
// Ensure fd gets closed
defer syscall.Close(f.fd)
// Call funlock on the file descriptor
err = retryOnEINTR(func() error {
return syscall.Flock(f.fd, syscall.LOCK_UN|syscall.LOCK_NB)
})
}
return err
}
// Closed will return whether this lockfile has been closed (and unlocked).
func (f *Lock) Closed() bool {
return (atomic.LoadUint32(&f.st) == 1)
}
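
In practice the lock guards against double-opening a store: a second OpenLock() on the same path fails with ErrAlreadyLocked while the first is held. A sketch against the API being removed here (assuming defaultFileLockFlags include O_CREAT, and Linux flock semantics where each open file description locks independently):

package main

import (
    "errors"
    "fmt"
    "log"

    "codeberg.org/gruf/go-store/v2/storage"
)

func main() {
    path := "/tmp/" + storage.LockFile

    // First open acquires the exclusive flock.
    lock, err := storage.OpenLock(path)
    if err != nil {
        log.Fatal(err)
    }
    defer lock.Close() // funlock + close fd

    // A second open of the same path fails while the flock is held.
    _, err = storage.OpenLock(path)
    fmt.Println(errors.Is(err, storage.ErrAlreadyLocked)) // true
}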

View File

@@ -1,228 +0,0 @@
package storage
import (
"context"
"io"
"sync/atomic"
"codeberg.org/gruf/go-bytes"
"codeberg.org/gruf/go-iotools"
"github.com/cornelk/hashmap"
)
// MemoryStorage is a storage implementation that simply stores key-value
// pairs in an in-memory map. The map is a concurrency-safe hashmap, so no
// separate mutex is required.
type MemoryStorage struct {
ow bool // overwrites
fs *hashmap.Map[string, []byte]
st uint32
}
// OpenMemory opens a new MemoryStorage instance with the given internal map starting size.
func OpenMemory(size int, overwrites bool) *MemoryStorage {
if size <= 0 {
size = 8
}
return &MemoryStorage{
fs: hashmap.NewSized[string, []byte](uintptr(size)),
ow: overwrites,
}
}
// Clean implements Storage.Clean().
func (st *MemoryStorage) Clean(ctx context.Context) error {
// Check store open
if st.closed() {
return ErrClosed
}
// Check context still valid
if err := ctx.Err(); err != nil {
return err
}
return nil
}
// ReadBytes implements Storage.ReadBytes().
func (st *MemoryStorage) ReadBytes(ctx context.Context, key string) ([]byte, error) {
// Check store open
if st.closed() {
return nil, ErrClosed
}
// Check context still valid
if err := ctx.Err(); err != nil {
return nil, err
}
// Check for key in store
b, ok := st.fs.Get(key)
if !ok {
return nil, ErrNotFound
}
// Create return copy
return copyb(b), nil
}
// ReadStream implements Storage.ReadStream().
func (st *MemoryStorage) ReadStream(ctx context.Context, key string) (io.ReadCloser, error) {
// Check store open
if st.closed() {
return nil, ErrClosed
}
// Check context still valid
if err := ctx.Err(); err != nil {
return nil, err
}
// Check for key in store
b, ok := st.fs.Get(key)
if !ok {
return nil, ErrNotFound
}
// Create io.ReadCloser from 'b' copy
r := bytes.NewReader(copyb(b))
return iotools.NopReadCloser(r), nil
}
// WriteBytes implements Storage.WriteBytes().
func (st *MemoryStorage) WriteBytes(ctx context.Context, key string, b []byte) (int, error) {
// Check store open
if st.closed() {
return 0, ErrClosed
}
// Check context still valid
if err := ctx.Err(); err != nil {
return 0, err
}
// Check for key that already exists
if _, ok := st.fs.Get(key); ok && !st.ow {
return 0, ErrAlreadyExists
}
// Write key copy to store
st.fs.Set(key, copyb(b))
return len(b), nil
}
// WriteStream implements Storage.WriteStream().
func (st *MemoryStorage) WriteStream(ctx context.Context, key string, r io.Reader) (int64, error) {
// Check store open
if st.closed() {
return 0, ErrClosed
}
// Check context still valid
if err := ctx.Err(); err != nil {
return 0, err
}
// Check for key that already exists
if _, ok := st.fs.Get(key); ok && !st.ow {
return 0, ErrAlreadyExists
}
// Read all from reader
b, err := io.ReadAll(r)
if err != nil {
return 0, err
}
// Write key to store
st.fs.Set(key, b)
return int64(len(b)), nil
}
// Stat implements Storage.Stat().
func (st *MemoryStorage) Stat(ctx context.Context, key string) (bool, error) {
// Check store open
if st.closed() {
return false, ErrClosed
}
// Check context still valid
if err := ctx.Err(); err != nil {
return false, err
}
// Check for key in store
_, ok := st.fs.Get(key)
return ok, nil
}
// Remove implements Storage.Remove().
func (st *MemoryStorage) Remove(ctx context.Context, key string) error {
// Check store open
if st.closed() {
return ErrClosed
}
// Check context still valid
if err := ctx.Err(); err != nil {
return err
}
// Attempt to delete key
ok := st.fs.Del(key)
if !ok {
return ErrNotFound
}
return nil
}
// WalkKeys implements Storage.WalkKeys().
func (st *MemoryStorage) WalkKeys(ctx context.Context, opts WalkKeysOptions) error {
// Check store open
if st.closed() {
return ErrClosed
}
// Check context still valid
if err := ctx.Err(); err != nil {
return err
}
var err error
// NOTE: no real nil check is performed here; a nil
// WalkFn will simply panic when called below.
_ = opts.WalkFn
// Pass each key in map to walk function
st.fs.Range(func(key string, val []byte) bool {
err = opts.WalkFn(ctx, Entry{
Key: key,
Size: int64(len(val)),
})
return (err == nil)
})
return err
}
// Close implements Storage.Close().
func (st *MemoryStorage) Close() error {
atomic.StoreUint32(&st.st, 1)
return nil
}
// closed returns whether MemoryStorage is closed.
func (st *MemoryStorage) closed() bool {
return (atomic.LoadUint32(&st.st) == 1)
}
// copyb returns a copy of byte-slice b.
func copyb(b []byte) []byte {
if b == nil {
return nil
}
p := make([]byte, len(b))
_ = copy(p, b)
return p
}
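
Putting the pieces together, a short sketch exercising the overwrite flag and the defensive copying described above, against the API being removed here:

package main

import (
    "context"
    "errors"
    "fmt"

    "codeberg.org/gruf/go-store/v2/storage"
)

func main() {
    ctx := context.Background()

    // 8 initial slots, overwrites disallowed.
    st := storage.OpenMemory(8, false)
    defer st.Close()

    if _, err := st.WriteBytes(ctx, "greeting", []byte("hello")); err != nil {
        panic(err)
    }

    // A second write to the same key fails with overwrites disabled.
    _, err := st.WriteBytes(ctx, "greeting", []byte("bye"))
    fmt.Println(errors.Is(err, storage.ErrAlreadyExists)) // true

    // Reads return a copy, so callers cannot corrupt the stored value.
    b, _ := st.ReadBytes(ctx, "greeting")
    fmt.Println(string(b)) // hello
}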

View File

@@ -1,397 +0,0 @@
package storage
import (
"bytes"
"context"
"io"
"sync/atomic"
"codeberg.org/gruf/go-store/v2/util"
"github.com/minio/minio-go/v7"
)
// DefaultS3Config is the default S3Storage configuration.
var DefaultS3Config = &S3Config{
CoreOpts: minio.Options{},
GetOpts: minio.GetObjectOptions{},
PutOpts: minio.PutObjectOptions{},
PutChunkOpts: minio.PutObjectPartOptions{},
PutChunkSize: 4 * 1024 * 1024, // 4MiB
StatOpts: minio.StatObjectOptions{},
RemoveOpts: minio.RemoveObjectOptions{},
ListSize: 200,
}
// S3Config defines options to be used when opening an S3Storage,
// mostly options for the underlying S3 client library.
type S3Config struct {
// CoreOpts are S3 client options passed during initialization.
CoreOpts minio.Options
// GetOpts are S3 client options passed during .Read___() calls.
GetOpts minio.GetObjectOptions
// PutOpts are S3 client options passed during .Write___() calls.
PutOpts minio.PutObjectOptions
// PutChunkSize is the chunk size (in bytes) to use when sending
// a byte stream reader of unknown size as a multi-part object.
PutChunkSize int64
// PutChunkOpts are S3 client options passed during chunked .Write___() calls.
PutChunkOpts minio.PutObjectPartOptions
// StatOpts are S3 client options passed during .Stat() calls.
StatOpts minio.StatObjectOptions
// RemoveOpts are S3 client options passed during .Remove() calls.
RemoveOpts minio.RemoveObjectOptions
// ListSize determines how many items to include in each
// list request, made during calls to .WalkKeys().
ListSize int
}
// getS3Config returns a valid S3Config for supplied ptr.
func getS3Config(cfg *S3Config) S3Config {
const minChunkSz = 5 * 1024 * 1024
// If nil, use default
if cfg == nil {
cfg = DefaultS3Config
}
// Ensure a minimum compatible chunk size
if cfg.PutChunkSize <= minChunkSz {
// See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
cfg.PutChunkSize = minChunkSz
}
// Assume list size <= 0 == use default
if cfg.ListSize <= 0 {
cfg.ListSize = 200
}
// Return owned config copy
return S3Config{
CoreOpts: cfg.CoreOpts,
GetOpts: cfg.GetOpts,
PutOpts: cfg.PutOpts,
PutChunkSize: cfg.PutChunkSize,
PutChunkOpts: cfg.PutChunkOpts,
ListSize: cfg.ListSize,
StatOpts: cfg.StatOpts,
RemoveOpts: cfg.RemoveOpts,
}
}
// S3Storage is a storage implementation that stores key-value
// pairs in an S3 bucket at a given endpoint.
type S3Storage struct {
client *minio.Core
bucket string
config S3Config
state uint32
}
// OpenS3 opens a new S3Storage instance with given S3 endpoint URL, bucket name and configuration.
func OpenS3(endpoint string, bucket string, cfg *S3Config) (*S3Storage, error) {
// Get checked config
config := getS3Config(cfg)
// Create new S3 client connection
client, err := minio.NewCore(endpoint, &config.CoreOpts)
if err != nil {
return nil, err
}
// Check that provided bucket actually exists
exists, err := client.BucketExists(context.Background(), bucket)
if err != nil {
return nil, err
} else if !exists {
return nil, new_error("bucket does not exist")
}
return &S3Storage{
client: client,
bucket: bucket,
config: config,
}, nil
}
// Client returns access to the underlying S3 client.
func (st *S3Storage) Client() *minio.Core {
return st.client
}
// Clean implements Storage.Clean().
func (st *S3Storage) Clean(ctx context.Context) error {
return nil // nothing to do for S3
}
// ReadBytes implements Storage.ReadBytes().
func (st *S3Storage) ReadBytes(ctx context.Context, key string) ([]byte, error) {
// Fetch object reader from S3 bucket
rc, err := st.ReadStream(ctx, key)
if err != nil {
return nil, err
}
defer rc.Close()
// Read all bytes and return
return io.ReadAll(rc)
}
// ReadStream implements Storage.ReadStream().
func (st *S3Storage) ReadStream(ctx context.Context, key string) (io.ReadCloser, error) {
// Check storage open
if st.closed() {
return nil, ErrClosed
}
// Fetch object reader from S3 bucket
rc, _, _, err := st.client.GetObject(
ctx,
st.bucket,
key,
st.config.GetOpts,
)
if err != nil {
return nil, transformS3Error(err)
}
return rc, nil
}
// WriteBytes implements Storage.WriteBytes().
func (st *S3Storage) WriteBytes(ctx context.Context, key string, value []byte) (int, error) {
n, err := st.WriteStream(ctx, key, util.NewByteReaderSize(value))
return int(n), err
}
// WriteStream implements Storage.WriteStream().
func (st *S3Storage) WriteStream(ctx context.Context, key string, r io.Reader) (int64, error) {
// Check storage open
if st.closed() {
return 0, ErrClosed
}
if rs, ok := r.(util.ReaderSize); ok {
// This reader supports providing us the size of
// the encompassed data, allowing us to perform
// a singular .PutObject() call with length.
info, err := st.client.PutObject(
ctx,
st.bucket,
key,
r,
rs.Size(),
"",
"",
st.config.PutOpts,
)
if err != nil {
err = transformS3Error(err)
}
return info.Size, err
}
// Start a new multipart upload to get ID
uploadID, err := st.client.NewMultipartUpload(
ctx,
st.bucket,
key,
st.config.PutOpts,
)
if err != nil {
return 0, transformS3Error(err)
}
var (
index = int(1) // parts index
total = int64(0)
parts []minio.CompletePart
chunk = make([]byte, st.config.PutChunkSize)
rbuf = bytes.NewReader(nil)
)
// Note that we do not perform any kind of
// memory pooling of the chunk buffers here.
// Optimal chunking sizes for S3 writes are in
// the orders of megabytes, so letting the GC
// collect these ASAP is much preferred.
loop:
for done := false; !done; {
// Read next chunk into byte buffer
n, err := io.ReadFull(r, chunk)
switch err {
// Successful read
case nil:
// Reached end, buffer empty
case io.EOF:
break loop
// Reached end, but buffer not empty
case io.ErrUnexpectedEOF:
done = true
// All other errors
default:
return 0, err
}
// Reset byte reader
rbuf.Reset(chunk[:n])
// Put this object chunk in S3 store
pt, err := st.client.PutObjectPart(
ctx,
st.bucket,
key,
uploadID,
index,
rbuf,
int64(n),
st.config.PutChunkOpts,
)
if err != nil {
return 0, err
}
// Append completed part to slice
parts = append(parts, minio.CompletePart{
PartNumber: pt.PartNumber,
ETag: pt.ETag,
ChecksumCRC32: pt.ChecksumCRC32,
ChecksumCRC32C: pt.ChecksumCRC32C,
ChecksumSHA1: pt.ChecksumSHA1,
ChecksumSHA256: pt.ChecksumSHA256,
})
// Increment part index
index++
// Update total size
total += pt.Size
}
// Complete this multi-part upload operation
_, err = st.client.CompleteMultipartUpload(
ctx,
st.bucket,
key,
uploadID,
parts,
st.config.PutOpts,
)
if err != nil {
return 0, err
}
return total, nil
}
// Stat implements Storage.Stat().
func (st *S3Storage) Stat(ctx context.Context, key string) (bool, error) {
// Check storage open
if st.closed() {
return false, ErrClosed
}
// Query object in S3 bucket
_, err := st.client.StatObject(
ctx,
st.bucket,
key,
st.config.StatOpts,
)
if err != nil {
return false, transformS3Error(err)
}
return true, nil
}
// Remove implements Storage.Remove().
func (st *S3Storage) Remove(ctx context.Context, key string) error {
// Check storage open
if st.closed() {
return ErrClosed
}
// S3 returns no error on remove for non-existent keys, so stat first to surface ErrNotFound
if ok, err := st.Stat(ctx, key); err != nil {
return err
} else if !ok {
return ErrNotFound
}
// Remove object from S3 bucket
err := st.client.RemoveObject(
ctx,
st.bucket,
key,
st.config.RemoveOpts,
)
if err != nil {
return transformS3Error(err)
}
return nil
}
// WalkKeys implements Storage.WalkKeys().
func (st *S3Storage) WalkKeys(ctx context.Context, opts WalkKeysOptions) error {
var (
prev string
token string
)
for {
// List the objects in bucket starting at marker
result, err := st.client.ListObjectsV2(
st.bucket,
"",
prev,
token,
"",
st.config.ListSize,
)
if err != nil {
return err
}
// Pass each object through walk func
for _, obj := range result.Contents {
if err := opts.WalkFn(ctx, Entry{
Key: obj.Key,
Size: obj.Size,
}); err != nil {
return err
}
}
// No token means we reached end of bucket
if result.NextContinuationToken == "" {
return nil
}
// Set continue token and prev mark
token = result.NextContinuationToken
prev = result.StartAfter
}
}
// Close implements Storage.Close().
func (st *S3Storage) Close() error {
atomic.StoreUint32(&st.state, 1)
return nil
}
// closed returns whether S3Storage is closed.
func (st *S3Storage) closed() bool {
return (atomic.LoadUint32(&st.state) == 1)
}
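
The WriteStream() logic above keys everything off whether the reader can report its size: a util.ReaderSize takes the single PutObject() path, anything else is chunked into a multipart upload of at least 5MiB per part. A sketch of how callers land on each path (endpoint, bucket and file path here are placeholders, and real use needs credentials in CoreOpts):

package main

import (
    "context"
    "log"
    "os"

    "codeberg.org/gruf/go-store/v2/storage"
)

func main() {
    ctx := context.Background()

    st, err := storage.OpenS3("s3.example.com", "my-bucket", nil)
    if err != nil {
        log.Fatal(err)
    }
    defer st.Close()

    // WriteBytes wraps the slice in a util.ByteReaderSize, so
    // the known-size single PutObject() branch is taken.
    if _, err := st.WriteBytes(ctx, "small", []byte("hello")); err != nil {
        log.Fatal(err)
    }

    // *os.File has no Size() method, so WriteStream() falls
    // through to the chunked multipart branch.
    f, err := os.Open("/tmp/large.bin")
    if err != nil {
        log.Fatal(err)
    }
    defer f.Close()
    if _, err := st.WriteStream(ctx, "large", f); err != nil {
        log.Fatal(err)
    }
}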

View File

@@ -1,53 +0,0 @@
package storage
import (
"context"
"io"
)
// Storage defines a means of storing and accessing key-value pairs
type Storage interface {
// ReadBytes returns the byte value for key in storage
ReadBytes(ctx context.Context, key string) ([]byte, error)
// ReadStream returns an io.ReadCloser for the value bytes at key in the storage
ReadStream(ctx context.Context, key string) (io.ReadCloser, error)
// WriteBytes writes the supplied value bytes at key in the storage
WriteBytes(ctx context.Context, key string, value []byte) (int, error)
// WriteStream writes the bytes from supplied reader at key in the storage
WriteStream(ctx context.Context, key string, r io.Reader) (int64, error)
// Stat checks if the supplied key is in the storage
Stat(ctx context.Context, key string) (bool, error)
// Remove attempts to remove the supplied key-value pair from storage
Remove(ctx context.Context, key string) error
// Close will close the storage, releasing any file locks
Close() error
// Clean removes unused values and unclutters the storage (e.g. removing empty folders)
Clean(ctx context.Context) error
// WalkKeys walks the keys in the storage
WalkKeys(ctx context.Context, opts WalkKeysOptions) error
}
// Entry represents a key in a Storage{} implementation,
// with any associated metadata that may have been set.
type Entry struct {
// Key is this entry's unique storage key.
Key string
// Size is the size of this entry in storage.
// Note that size < 0 indicates unknown.
Size int64
}
// WalkKeysOptions defines how to walk the keys in a storage implementation
type WalkKeysOptions struct {
// WalkFn is the function to apply on each Entry
WalkFn func(context.Context, Entry) error
}
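
Since disk, memory and S3 stores all satisfy this one interface, helpers only need writing once. For instance, a sketch tallying key count and total size over any Storage (package name and helper are invented for illustration):

package storeutil

import (
    "context"

    "codeberg.org/gruf/go-store/v2/storage"
)

// countEntries walks any Storage implementation, counting
// keys and summing the sizes that are known.
func countEntries(ctx context.Context, st storage.Storage) (n int, size int64, err error) {
    err = st.WalkKeys(ctx, storage.WalkKeysOptions{
        WalkFn: func(ctx context.Context, e storage.Entry) error {
            n++
            if e.Size >= 0 { // size < 0 indicates unknown
                size += e.Size
            }
            return nil
        },
    })
    return
}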

View File

@@ -1,25 +0,0 @@
package storage
// KeyTransform defines a method of converting store keys to storage paths (and vice-versa)
type KeyTransform interface {
// KeyToPath converts a supplied key to storage path
KeyToPath(string) string
// PathToKey converts a supplied storage path to key
PathToKey(string) string
}
type nopKeyTransform struct{}
// NopTransform returns a nop key transform (i.e. key = path)
func NopTransform() KeyTransform {
return &nopKeyTransform{}
}
func (t *nopKeyTransform) KeyToPath(key string) string {
return key
}
func (t *nopKeyTransform) PathToKey(path string) string {
return path
}
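
NopTransform is the only transform shipped here, but the interface exists so callers can map keys onto friendlier on-disk layouts. A hypothetical implementation of the KeyTransform interface above, sharding keys into two-character sub-directories:

package storeutil

import "strings"

// shardTransform is a hypothetical KeyTransform spreading keys
// across sub-directories by their first two characters, e.g.
// "abcdef" <-> "ab/abcdef".
type shardTransform struct{}

func (shardTransform) KeyToPath(key string) string {
    if len(key) < 2 {
        return key
    }
    return key[:2] + "/" + key
}

func (shardTransform) PathToKey(path string) string {
    if i := strings.IndexByte(path, '/'); i >= 0 {
        return path[i+1:]
    }
    return path
}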

View File

@@ -1,41 +0,0 @@
package util
import (
"bytes"
"io"
)
// ReaderSize defines a reader of known size in bytes.
type ReaderSize interface {
io.Reader
Size() int64
}
// ByteReaderSize implements ReaderSize for an in-memory byte-slice.
type ByteReaderSize struct {
br bytes.Reader
sz int64
}
// NewByteReaderSize returns a new ByteReaderSize instance reset to slice b.
func NewByteReaderSize(b []byte) *ByteReaderSize {
rs := new(ByteReaderSize)
rs.Reset(b)
return rs
}
// Read implements io.Reader.
func (rs *ByteReaderSize) Read(b []byte) (int, error) {
return rs.br.Read(b)
}
// Size implements ReaderSize.
func (rs *ByteReaderSize) Size() int64 {
return rs.sz
}
// Reset resets the ByteReaderSize to be reading from b.
func (rs *ByteReaderSize) Reset(b []byte) {
rs.br.Reset(b)
rs.sz = int64(len(b))
}
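
Note that bytes.Reader and strings.Reader already expose Size() int64, so they satisfy ReaderSize as-is; what ByteReaderSize adds is Reset(), letting one instance be reused across writes. A quick sketch:

package main

import (
    "fmt"
    "strings"

    "codeberg.org/gruf/go-store/v2/util"
)

func main() {
    var rs util.ReaderSize

    // strings.Reader satisfies ReaderSize out of the box.
    rs = strings.NewReader("hello")
    fmt.Println(rs.Size()) // 5

    // ByteReaderSize adds Reset() for reuse across writes.
    brs := util.NewByteReaderSize([]byte("hello world"))
    fmt.Println(brs.Size()) // 11
    brs.Reset([]byte("hi"))
    fmt.Println(brs.Size()) // 2
}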

View File

@@ -1,26 +0,0 @@
package util
import (
"sync"
"codeberg.org/gruf/go-fastpath/v2"
)
// pathBuilderPool is the global fastpath.Builder pool.
var pathBuilderPool = sync.Pool{
New: func() any {
return &fastpath.Builder{B: make([]byte, 0, 512)}
},
}
// GetPathBuilder fetches a fastpath.Builder object from the pool.
func GetPathBuilder() *fastpath.Builder {
pb, _ := pathBuilderPool.Get().(*fastpath.Builder)
return pb
}
// PutPathBuilder places supplied fastpath.Builder back in the pool.
func PutPathBuilder(pb *fastpath.Builder) {
pb.Reset()
pathBuilderPool.Put(pb)
}
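
The usual call pattern pairs a Get with a deferred Put, so the builder returns to the pool reset and ready for reuse. A sketch, assuming fastpath.Builder's Join(base string, paths ...string) method:

package main

import (
    "fmt"

    "codeberg.org/gruf/go-store/v2/util"
)

func main() {
    pb := util.GetPathBuilder()
    defer util.PutPathBuilder(pb)

    // Join storage root and key path segments without
    // the intermediate allocations of path.Join().
    p := pb.Join("/var/lib/store", "ab/abcdef")
    fmt.Println(p) // /var/lib/store/ab/abcdef
}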