[performance] update go-structr and go-mutexes with memory usage improvements (#2909)

* update go-structr and go-mutexes with memory usage improvements

* bump to go-structr v0.8.4
This commit is contained in:
kim
2024-05-13 08:05:46 +00:00
committed by GitHub
parent 578a4e0cf5
commit c06e6fb656
26 changed files with 1409 additions and 163 deletions

View File

@ -3,17 +3,16 @@ package mutexes
import (
"sync"
"sync/atomic"
"unsafe"
"codeberg.org/gruf/go-mempool"
"github.com/dolthub/swiss"
)
const (
// possible lock types.
lockTypeRead = uint8(1) << 0
lockTypeWrite = uint8(1) << 1
// frequency of GC cycles
// per no. unlocks. i.e.
// every 'gcfreq' unlocks.
gcfreq = 1024
)
// MutexMap is a structure that allows read / write locking
@ -28,15 +27,15 @@ const (
// like structures for sleeping / awaking awaiting goroutines.
type MutexMap struct {
mapmu sync.Mutex
mumap map[string]*rwmutex
mupool rwmutexPool
count uint32
mumap *swiss.Map[string, *rwmutex]
mupool mempool.UnsafePool
}
// checkInit ensures MutexMap is initialized (UNSAFE).
func (mm *MutexMap) checkInit() {
if mm.mumap == nil {
mm.mumap = make(map[string]*rwmutex)
mm.mumap = swiss.NewMap[string, *rwmutex](0)
mm.mupool.DirtyFactor = 256
}
}
@ -58,13 +57,13 @@ func (mm *MutexMap) lock(key string, lt uint8) func() {
mm.checkInit()
for {
// Check map for mu.
mu := mm.mumap[key]
// Check map for mutex.
mu, _ := mm.mumap.Get(key)
if mu == nil {
// Allocate new mutex.
mu = mm.mupool.Acquire()
mm.mumap[key] = mu
// Allocate mutex.
mu = mm.acquire()
mm.mumap.Put(key, mu)
}
if !mu.Lock(lt) {
@ -87,63 +86,58 @@ func (mm *MutexMap) unlock(key string, mu *rwmutex) {
mm.mapmu.Lock()
// Unlock mutex.
if mu.Unlock() {
if !mu.Unlock() {
// Mutex fully unlocked
// with zero waiters. Self
// evict and release it.
delete(mm.mumap, key)
mm.mupool.Release(mu)
// Fast path. Mutex still
// used so no map change.
mm.mapmu.Unlock()
return
}
if mm.count++; mm.count%gcfreq == 0 {
// Every 'gcfreq' unlocks perform
// a garbage collection to keep
// us squeaky clean :]
mm.mupool.GC()
// Mutex fully unlocked
// with zero waiters. Self
// evict and release it.
mm.mumap.Delete(key)
mm.release(mu)
// Maximum load factor before
// 'swiss' allocates new hmap:
// maxLoad = 7 / 8
//
// So we apply the inverse/2, once
// $maxLoad/2 % of hmap is empty we
// compact the map to drop buckets.
len := mm.mumap.Count()
cap := mm.mumap.Capacity()
if cap-len > (cap*7)/(8*2) {
// Create a new map only as big as required.
mumap := swiss.NewMap[string, *rwmutex](uint32(len))
mm.mumap.Iter(func(k string, v *rwmutex) (stop bool) {
mumap.Put(k, v)
return false
})
// Set new map.
mm.mumap = mumap
}
// Done with map.
mm.mapmu.Unlock()
}
// rwmutexPool is a very simply memory rwmutexPool.
type rwmutexPool struct {
current []*rwmutex
victim []*rwmutex
}
// Acquire will returns a rwmutexState from rwmutexPool (or alloc new).
func (p *rwmutexPool) Acquire() *rwmutex {
// First try the current queue
if l := len(p.current) - 1; l >= 0 {
mu := p.current[l]
p.current = p.current[:l]
return mu
// acquire will acquire mutex from memory pool, or alloc new.
func (mm *MutexMap) acquire() *rwmutex {
if ptr := mm.mupool.Get(); ptr != nil {
return (*rwmutex)(ptr)
}
// Next try the victim queue.
if l := len(p.victim) - 1; l >= 0 {
mu := p.victim[l]
p.victim = p.victim[:l]
return mu
}
// Lastly, alloc new.
mu := new(rwmutex)
return mu
return new(rwmutex)
}
// Release places a sync.rwmutexState back in the rwmutexPool.
func (p *rwmutexPool) Release(mu *rwmutex) {
p.current = append(p.current, mu)
}
// GC will clear out unused entries from the rwmutexPool.
func (p *rwmutexPool) GC() {
current := p.current
p.current = nil
p.victim = current
// release will release given mutex to memory pool.
func (mm *MutexMap) release(mu *rwmutex) {
ptr := unsafe.Pointer(mu)
mm.mupool.Put(ptr)
}
// rwmutex represents a RW mutex when used correctly within