mirror of
https://github.com/superseriousbusiness/gotosocial
synced 2024-12-27 08:43:29 +01:00
c06e6fb656
* update go-structr and go-mutexes with memory usage improvements * bump to go-structr v0.8.4
246 lines
5.8 KiB
Go
246 lines
5.8 KiB
Go
package mutexes
|
|
|
|
import (
|
|
"sync"
|
|
"sync/atomic"
|
|
"unsafe"
|
|
|
|
"codeberg.org/gruf/go-mempool"
|
|
"github.com/dolthub/swiss"
|
|
)
|
|
|
|
const (
|
|
// possible lock types.
|
|
lockTypeRead = uint8(1) << 0
|
|
lockTypeWrite = uint8(1) << 1
|
|
)
|
|
|
|
// MutexMap is a structure that allows read / write locking
|
|
// per key, performing as you'd expect a map[string]*RWMutex
|
|
// to perform, without you needing to worry about deadlocks
|
|
// between competing read / write locks and the map's own mutex.
|
|
// It uses memory pooling for the internal "mutex" (ish) types
|
|
// and performs self-eviction of keys.
|
|
//
|
|
// Under the hood this is achieved using a single mutex for the
|
|
// map, state tracking for individual keys, and some sync.Cond{}
|
|
// like structures for sleeping / awaking awaiting goroutines.
|
|
type MutexMap struct {
|
|
mapmu sync.Mutex
|
|
mumap *swiss.Map[string, *rwmutex]
|
|
mupool mempool.UnsafePool
|
|
}
|
|
|
|
// checkInit ensures MutexMap is initialized (UNSAFE).
|
|
func (mm *MutexMap) checkInit() {
|
|
if mm.mumap == nil {
|
|
mm.mumap = swiss.NewMap[string, *rwmutex](0)
|
|
mm.mupool.DirtyFactor = 256
|
|
}
|
|
}
|
|
|
|
// Lock acquires a write lock on key in map, returning unlock function.
|
|
func (mm *MutexMap) Lock(key string) func() {
|
|
return mm.lock(key, lockTypeWrite)
|
|
}
|
|
|
|
// RLock acquires a read lock on key in map, returning runlock function.
|
|
func (mm *MutexMap) RLock(key string) func() {
|
|
return mm.lock(key, lockTypeRead)
|
|
}
|
|
|
|
func (mm *MutexMap) lock(key string, lt uint8) func() {
|
|
// Perform first map lock
|
|
// and check initialization
|
|
// OUTSIDE the main loop.
|
|
mm.mapmu.Lock()
|
|
mm.checkInit()
|
|
|
|
for {
|
|
// Check map for mutex.
|
|
mu, _ := mm.mumap.Get(key)
|
|
|
|
if mu == nil {
|
|
// Allocate mutex.
|
|
mu = mm.acquire()
|
|
mm.mumap.Put(key, mu)
|
|
}
|
|
|
|
if !mu.Lock(lt) {
|
|
// Wait on mutex unlock, after
|
|
// immediately relocking map mu.
|
|
mu.WaitRelock(&mm.mapmu)
|
|
continue
|
|
}
|
|
|
|
// Done with map.
|
|
mm.mapmu.Unlock()
|
|
|
|
// Return mutex unlock function.
|
|
return func() { mm.unlock(key, mu) }
|
|
}
|
|
}
|
|
|
|
func (mm *MutexMap) unlock(key string, mu *rwmutex) {
|
|
// Get map lock.
|
|
mm.mapmu.Lock()
|
|
|
|
// Unlock mutex.
|
|
if !mu.Unlock() {
|
|
|
|
// Fast path. Mutex still
|
|
// used so no map change.
|
|
mm.mapmu.Unlock()
|
|
return
|
|
}
|
|
|
|
// Mutex fully unlocked
|
|
// with zero waiters. Self
|
|
// evict and release it.
|
|
mm.mumap.Delete(key)
|
|
mm.release(mu)
|
|
|
|
// Maximum load factor before
|
|
// 'swiss' allocates new hmap:
|
|
// maxLoad = 7 / 8
|
|
//
|
|
// So we apply the inverse/2, once
|
|
// $maxLoad/2 % of hmap is empty we
|
|
// compact the map to drop buckets.
|
|
len := mm.mumap.Count()
|
|
cap := mm.mumap.Capacity()
|
|
if cap-len > (cap*7)/(8*2) {
|
|
|
|
// Create a new map only as big as required.
|
|
mumap := swiss.NewMap[string, *rwmutex](uint32(len))
|
|
mm.mumap.Iter(func(k string, v *rwmutex) (stop bool) {
|
|
mumap.Put(k, v)
|
|
return false
|
|
})
|
|
|
|
// Set new map.
|
|
mm.mumap = mumap
|
|
}
|
|
|
|
// Done with map.
|
|
mm.mapmu.Unlock()
|
|
}
|
|
|
|
// acquire will acquire mutex from memory pool, or alloc new.
|
|
func (mm *MutexMap) acquire() *rwmutex {
|
|
if ptr := mm.mupool.Get(); ptr != nil {
|
|
return (*rwmutex)(ptr)
|
|
}
|
|
return new(rwmutex)
|
|
}
|
|
|
|
// release will release given mutex to memory pool.
|
|
func (mm *MutexMap) release(mu *rwmutex) {
|
|
ptr := unsafe.Pointer(mu)
|
|
mm.mupool.Put(ptr)
|
|
}
|
|
|
|
// rwmutex represents a RW mutex when used correctly within
|
|
// a MapMutex. It should ONLY be access when protected by
|
|
// the outer map lock, except for the 'notifyList' which is
|
|
// a runtime internal structure borrowed from the sync.Cond{}.
|
|
//
|
|
// this functions very similarly to a sync.Cond{}, but with
|
|
// lock state tracking, and returning on 'Broadcast()' whether
|
|
// any goroutines were actually awoken. it also has a less
|
|
// confusing API than sync.Cond{} with the outer locking
|
|
// mechanism we use, otherwise all Cond{}.L would reference
|
|
// the same outer map mutex.
|
|
type rwmutex struct {
|
|
n notifyList // 'trigger' mechanism
|
|
l int32 // no. locks
|
|
t uint8 // lock type
|
|
}
|
|
|
|
// Lock will lock the mutex for given lock type, in the
|
|
// sense that it will update the internal state tracker
|
|
// accordingly. Return value is true on successful lock.
|
|
func (mu *rwmutex) Lock(lt uint8) bool {
|
|
switch mu.t {
|
|
case lockTypeRead:
|
|
// already read locked,
|
|
// only permit more reads.
|
|
if lt != lockTypeRead {
|
|
return false
|
|
}
|
|
|
|
case lockTypeWrite:
|
|
// already write locked,
|
|
// no other locks allowed.
|
|
return false
|
|
|
|
default:
|
|
// Fully unlocked,
|
|
// set incoming type.
|
|
mu.t = lt
|
|
}
|
|
|
|
// Update
|
|
// count.
|
|
mu.l++
|
|
|
|
return true
|
|
}
|
|
|
|
// Unlock will unlock the mutex, in the sense that it
|
|
// will update the internal state tracker accordingly.
|
|
// On totally unlocked state, it will awaken all
|
|
// sleeping goroutines waiting on this mutex.
|
|
func (mu *rwmutex) Unlock() bool {
|
|
switch mu.l--; {
|
|
case mu.l > 0 && mu.t == lockTypeWrite:
|
|
panic("BUG: multiple writer locks")
|
|
case mu.l < 0:
|
|
panic("BUG: negative lock count")
|
|
|
|
case mu.l == 0:
|
|
// Fully unlocked.
|
|
mu.t = 0
|
|
|
|
// Awake all blocked goroutines and check
|
|
// for change in the last notified ticket.
|
|
before := atomic.LoadUint32(&mu.n.notify)
|
|
runtime_notifyListNotifyAll(&mu.n)
|
|
after := atomic.LoadUint32(&mu.n.notify)
|
|
|
|
// If ticket changed, this indicates
|
|
// AT LEAST one goroutine was awoken.
|
|
//
|
|
// (before != after) => (waiters > 0)
|
|
// (before == after) => (waiters = 0)
|
|
return (before == after)
|
|
|
|
default:
|
|
// i.e. mutex still
|
|
// locked by others.
|
|
return false
|
|
}
|
|
}
|
|
|
|
// WaitRelock expects a mutex to be passed in, already in the
|
|
// locked state. It incr the notifyList waiter count before
|
|
// unlocking the outer mutex and blocking on notifyList wait.
|
|
// On awake it will decr wait count and relock outer mutex.
|
|
func (mu *rwmutex) WaitRelock(outer *sync.Mutex) {
|
|
|
|
// add ourselves to list while still
|
|
// under protection of outer map lock.
|
|
t := runtime_notifyListAdd(&mu.n)
|
|
|
|
// Finished with
|
|
// outer map lock.
|
|
outer.Unlock()
|
|
|
|
// Block until awoken by another
|
|
// goroutine within mu.Unlock().
|
|
runtime_notifyListWait(&mu.n, t)
|
|
|
|
// Relock!
|
|
outer.Lock()
|
|
}
|