[feature] Read + Write tombstones for deleted Actors (#1005)

* [feature] Read + Write tombstones for deleted Actors

* copyTombstone

* update to use resultcache instead of old ttl cache

Signed-off-by: kim <grufwub@gmail.com>

* update go-cache library to fix result cache capacity / ordering bugs

Signed-off-by: kim <grufwub@gmail.com>

* bump go-cache/v3 to v3.1.6 to fix bugs

Signed-off-by: kim <grufwub@gmail.com>

* switch on status code

* better explain ErrGone reasoning

Signed-off-by: kim <grufwub@gmail.com>
Co-authored-by: kim <grufwub@gmail.com>
This commit is contained in:
tobi
2022-11-11 12:18:38 +01:00
committed by GitHub
parent 948e90b95a
commit edcee14d07
47 changed files with 3808 additions and 7 deletions

341
vendor/codeberg.org/gruf/go-cache/v3/result/cache.go generated vendored Normal file
View File

@@ -0,0 +1,341 @@
package result
import (
"reflect"
"time"
"codeberg.org/gruf/go-cache/v3/ttl"
)
// Cache ...
type Cache[Value any] struct {
cache ttl.Cache[int64, result[Value]] // underlying result cache
lookups structKeys // pre-determined struct lookups
copy func(Value) Value // copies a Value type
next int64 // update key counter
}
// New returns a new initialized Cache, with given lookups and underlying value copy function.
func New[Value any](lookups []string, copy func(Value) Value) *Cache[Value] {
return NewSized(lookups, copy, 64)
}
// NewSized returns a new initialized Cache, with given lookups, underlying value copy function and provided capacity.
func NewSized[Value any](lookups []string, copy func(Value) Value, cap int) *Cache[Value] {
var z Value
// Determine generic type
t := reflect.TypeOf(z)
// Iteratively deref pointer type
for t.Kind() == reflect.Pointer {
t = t.Elem()
}
// Ensure that this is a struct type
if t.Kind() != reflect.Struct {
panic("generic parameter type must be struct (or ptr to)")
}
// Allocate new cache object
c := &Cache[Value]{copy: copy}
c.lookups = make([]keyFields, len(lookups))
for i, lookup := range lookups {
// Generate keyed field info for lookup
c.lookups[i].pkeys = make(map[string]int64, cap)
c.lookups[i].lookup = lookup
c.lookups[i].populate(t)
}
// Create and initialize underlying cache
c.cache.Init(0, cap, 0)
c.SetEvictionCallback(nil)
c.SetInvalidateCallback(nil)
return c
}
// Start will start the cache background eviction routine with given sweep frequency. If already
// running or a freq <= 0 provided, this is a no-op. This will block until eviction routine started.
func (c *Cache[Value]) Start(freq time.Duration) bool {
return c.cache.Start(freq)
}
// Stop will stop cache background eviction routine. If not running this
// is a no-op. This will block until the eviction routine has stopped.
func (c *Cache[Value]) Stop() bool {
return c.cache.Stop()
}
// SetTTL sets the cache item TTL. Update can be specified to force updates of existing items
// in the cache, this will simply add the change in TTL to their current expiry time.
func (c *Cache[Value]) SetTTL(ttl time.Duration, update bool) {
c.cache.SetTTL(ttl, update)
}
// SetEvictionCallback sets the eviction callback to the provided hook.
func (c *Cache[Value]) SetEvictionCallback(hook func(Value)) {
if hook == nil {
// Ensure non-nil hook.
hook = func(Value) {}
}
c.cache.SetEvictionCallback(func(item *ttl.Entry[int64, result[Value]]) {
for _, key := range item.Value.Keys {
// Delete key->pkey lookup
pkeys := key.fields.pkeys
delete(pkeys, key.value)
}
if item.Value.Error != nil {
// Skip error hooks
return
}
// Call user hook.
hook(item.Value.Value)
})
}
// SetInvalidateCallback sets the invalidate callback to the provided hook.
func (c *Cache[Value]) SetInvalidateCallback(hook func(Value)) {
if hook == nil {
// Ensure non-nil hook.
hook = func(Value) {}
}
c.cache.SetInvalidateCallback(func(item *ttl.Entry[int64, result[Value]]) {
for _, key := range item.Value.Keys {
if key.fields != nil {
// Delete key->pkey lookup
pkeys := key.fields.pkeys
delete(pkeys, key.value)
}
}
if item.Value.Error != nil {
// Skip error hooks
return
}
// Call user hook.
hook(item.Value.Value)
})
}
// Load ...
func (c *Cache[Value]) Load(lookup string, load func() (Value, error), keyParts ...any) (Value, error) {
var (
zero Value
res result[Value]
)
// Get lookup map by name.
kfields := c.getFields(lookup)
lmap := kfields.pkeys
// Generate cache key string.
ckey := genkey(keyParts...)
// Acquire cache lock
c.cache.Lock()
// Look for primary key
pkey, ok := lmap[ckey]
if ok {
// Fetch the result for primary key
entry, _ := c.cache.Cache.Get(pkey)
res = entry.Value
}
// Done with lock
c.cache.Unlock()
if !ok {
// Generate new result from fresh load.
res.Value, res.Error = load()
if res.Error != nil {
// This load returned an error, only
// store this item under provided key.
res.Keys = []cacheKey{{
value: ckey,
fields: kfields,
}}
} else {
// This was a successful load, generate keys.
res.Keys = c.lookups.generate(res.Value)
}
// Acquire cache lock.
c.cache.Lock()
defer c.cache.Unlock()
// Attempt to cache this result.
if key, ok := c.storeResult(res); !ok {
return zero, ConflictError{key}
}
}
// Catch and return error
if res.Error != nil {
return zero, res.Error
}
// Return a copy of value from cache
return c.copy(res.Value), nil
}
// Store ...
func (c *Cache[Value]) Store(value Value, store func() error) error {
// Attempt to store this value.
if err := store(); err != nil {
return err
}
// Prepare cached result.
result := result[Value]{
Keys: c.lookups.generate(value),
Value: c.copy(value),
Error: nil,
}
// Acquire cache lock.
c.cache.Lock()
defer c.cache.Unlock()
// Attempt to cache result, only return conflict
// error if the appropriate flag has been set.
if key, ok := c.storeResult(result); !ok {
return ConflictError{key}
}
return nil
}
// Has ...
func (c *Cache[Value]) Has(lookup string, keyParts ...any) bool {
var res result[Value]
// Get lookup map by name.
kfields := c.getFields(lookup)
lmap := kfields.pkeys
// Generate cache key string.
ckey := genkey(keyParts...)
// Acquire cache lock
c.cache.Lock()
// Look for primary key
pkey, ok := lmap[ckey]
if ok {
// Fetch the result for primary key
entry, _ := c.cache.Cache.Get(pkey)
res = entry.Value
}
// Done with lock
c.cache.Unlock()
// Check for non-error result.
return ok && (res.Error == nil)
}
// Invalidate ...
func (c *Cache[Value]) Invalidate(lookup string, keyParts ...any) {
// Get lookup map by name.
kfields := c.getFields(lookup)
lmap := kfields.pkeys
// Generate cache key string.
ckey := genkey(keyParts...)
// Look for primary key
c.cache.Lock()
pkey, ok := lmap[ckey]
c.cache.Unlock()
if !ok {
return
}
// Invalid by primary key
c.cache.Invalidate(pkey)
}
// Clear empties the cache, calling the invalidate callback.
func (c *Cache[Value]) Clear() {
c.cache.Clear()
}
// Len ...
func (c *Cache[Value]) Len() int {
return c.cache.Cache.Len()
}
// Cap ...
func (c *Cache[Value]) Cap() int {
return c.cache.Cache.Cap()
}
func (c *Cache[Value]) getFields(name string) *keyFields {
for _, k := range c.lookups {
// Find key fields with name
if k.lookup == name {
return &k
}
}
panic("invalid lookup: " + name)
}
func (c *Cache[Value]) storeResult(res result[Value]) (string, bool) {
for _, key := range res.Keys {
pkeys := key.fields.pkeys
// Look for cache primary key
pkey, ok := pkeys[key.value]
if ok {
// Look for overlap with non error keys,
// as an overlap for some but not all keys
// could produce inconsistent results.
entry, _ := c.cache.Cache.Get(pkey)
if entry.Value.Error == nil {
return key.value, false
}
}
}
// Get primary key
pkey := c.next
c.next++
// Store all primary key lookups
for _, key := range res.Keys {
pkeys := key.fields.pkeys
pkeys[key.value] = pkey
}
// Store main entry under primary key, using evict hook if needed
c.cache.Cache.SetWithHook(pkey, &ttl.Entry[int64, result[Value]]{
Expiry: time.Now().Add(c.cache.TTL),
Key: pkey,
Value: res,
}, func(_ int64, item *ttl.Entry[int64, result[Value]]) {
c.cache.Evict(item)
})
return "", true
}
type result[Value any] struct {
// keys accessible under
Keys []cacheKey
// cached value
Value Value
// cached error
Error error
}

22
vendor/codeberg.org/gruf/go-cache/v3/result/error.go generated vendored Normal file
View File

@@ -0,0 +1,22 @@
package result
import "errors"
// ErrUnkownLookup ...
var ErrUnknownLookup = errors.New("unknown lookup identifier")
// IsConflictErr returns whether error is due to key conflict.
func IsConflictErr(err error) bool {
_, ok := err.(ConflictError)
return ok
}
// ConflictError is returned on cache key conflict.
type ConflictError struct {
Key string
}
// Error returns the message for this key conflict error.
func (c ConflictError) Error() string {
return "cache conflict for key \"" + c.Key + "\""
}

184
vendor/codeberg.org/gruf/go-cache/v3/result/key.go generated vendored Normal file
View File

@@ -0,0 +1,184 @@
package result
import (
"reflect"
"strings"
"sync"
"unicode"
"unicode/utf8"
"codeberg.org/gruf/go-byteutil"
"codeberg.org/gruf/go-mangler"
)
// structKeys provides convience methods for a list
// of struct field combinations used for cache keys.
type structKeys []keyFields
// get fetches the key-fields for given lookup (else, panics).
func (sk structKeys) get(lookup string) *keyFields {
for i := range sk {
if sk[i].lookup == lookup {
return &sk[i]
}
}
panic("unknown lookup: \"" + lookup + "\"")
}
// generate will calculate the value string for each required
// cache key as laid-out by the receiving structKeys{}.
func (sk structKeys) generate(a any) []cacheKey {
// Get reflected value in order
// to access the struct fields
v := reflect.ValueOf(a)
// Iteratively deref pointer value
for v.Kind() == reflect.Pointer {
if v.IsNil() {
panic("nil ptr")
}
v = v.Elem()
}
// Preallocate expected slice of keys
keys := make([]cacheKey, len(sk))
// Acquire byte buffer
buf := bufpool.Get().(*byteutil.Buffer)
defer bufpool.Put(buf)
for i := range sk {
// Reset buffer
buf.B = buf.B[:0]
// Set the key-fields reference
keys[i].fields = &sk[i]
// Calculate cache-key value
keys[i].populate(buf, v)
}
return keys
}
// cacheKey represents an actual cache key.
type cacheKey struct {
// value is the actual string representing
// this cache key for hashmap lookups.
value string
// fieldsRO is a read-only slice (i.e. we should
// NOT be modifying them, only using for reference)
// of struct fields encapsulated by this cache key.
fields *keyFields
}
// populate will calculate the cache key's value string for given
// value's reflected information. Passed encoder is for string building.
func (k *cacheKey) populate(buf *byteutil.Buffer, v reflect.Value) {
// Append each field value to buffer.
for _, idx := range k.fields.fields {
fv := v.Field(idx)
fi := fv.Interface()
buf.B = mangler.Append(buf.B, fi)
buf.B = append(buf.B, '.')
}
// Drop last '.'
buf.Truncate(1)
// Create string copy from buf
k.value = string(buf.B)
}
// keyFields represents a list of struct fields
// encompassed in a single cache key, the string name
// of the lookup, and the lookup map to primary keys.
type keyFields struct {
// lookup is the calculated (well, provided)
// cache key lookup, consisting of dot sep'd
// struct field names.
lookup string
// fields is a slice of runtime struct field
// indices, of the fields encompassed by this key.
fields []int
// pkeys is a lookup of stored struct key values
// to the primary cache lookup key (int64).
pkeys map[string]int64
}
// populate will populate this keyFields{} object's .fields member by determining
// the field names from the given lookup, and querying given reflected type to get
// the runtime field indices for each of the fields. this speeds-up future value lookups.
func (kf *keyFields) populate(t reflect.Type) {
// Split dot-separated lookup to get
// the individual struct field names
names := strings.Split(kf.lookup, ".")
if len(names) == 0 {
panic("no key fields specified")
}
// Pre-allocate slice of expected length
kf.fields = make([]int, len(names))
for i, name := range names {
// Get field info for given name
ft, ok := t.FieldByName(name)
if !ok {
panic("no field found for name: \"" + name + "\"")
}
// Check field is usable
if !isExported(name) {
panic("field must be exported")
}
// Set the runtime field index
kf.fields[i] = ft.Index[0]
}
}
// genkey generates a cache key for given key values.
func genkey(parts ...any) string {
if len(parts) < 1 {
// Panic to prevent annoying usecase
// where user forgets to pass lookup
// and instead only passes a key part,
// e.g. cache.Get("key")
// which then always returns false.
panic("no key parts provided")
}
// Acquire buffer and reset
buf := bufpool.Get().(*byteutil.Buffer)
defer bufpool.Put(buf)
buf.Reset()
// Encode each key part
for _, part := range parts {
buf.B = mangler.Append(buf.B, part)
buf.B = append(buf.B, '.')
}
// Drop last '.'
buf.Truncate(1)
// Return string copy
return string(buf.B)
}
// isExported checks whether function name is exported.
func isExported(fnName string) bool {
r, _ := utf8.DecodeRuneInString(fnName)
return unicode.IsUpper(r)
}
// bufpool provides a memory pool of byte
// buffers use when encoding key types.
var bufpool = sync.Pool{
New: func() any {
return &byteutil.Buffer{B: make([]byte, 0, 512)}
},
}