updates go-mutexes to no longer rely on unsafe linkname (#3027)

This commit is contained in:
kim
2024-06-21 16:35:32 +00:00
committed by GitHub
parent b93087ceb4
commit 9143ac6fb4
19 changed files with 134 additions and 1242 deletions

View File

@ -6,7 +6,6 @@ import (
"unsafe"
"codeberg.org/gruf/go-mempool"
"github.com/dolthub/swiss"
)
const (
@ -27,14 +26,14 @@ const (
// like structures for sleeping / awaking awaiting goroutines.
type MutexMap struct {
mapmu sync.Mutex
mumap *swiss.Map[string, *rwmutex]
mumap hashmap
mupool mempool.UnsafePool
}
// checkInit ensures MutexMap is initialized (UNSAFE).
func (mm *MutexMap) checkInit() {
if mm.mumap == nil {
mm.mumap = swiss.NewMap[string, *rwmutex](0)
if mm.mumap.m == nil {
mm.mumap.init(0)
mm.mupool.DirtyFactor = 256
}
}
@ -58,7 +57,7 @@ func (mm *MutexMap) lock(key string, lt uint8) func() {
for {
// Check map for mutex.
mu, _ := mm.mumap.Get(key)
mu := mm.mumap.Get(key)
if mu == nil {
// Allocate mutex.
@ -69,7 +68,7 @@ func (mm *MutexMap) lock(key string, lt uint8) func() {
if !mu.Lock(lt) {
// Wait on mutex unlock, after
// immediately relocking map mu.
mu.WaitRelock(&mm.mapmu)
mu.WaitRelock()
continue
}
@ -100,27 +99,9 @@ func (mm *MutexMap) unlock(key string, mu *rwmutex) {
mm.mumap.Delete(key)
mm.release(mu)
// Maximum load factor before
// 'swiss' allocates new hmap:
// maxLoad = 7 / 8
//
// So we apply the inverse/2, once
// $maxLoad/2 % of hmap is empty we
// compact the map to drop buckets.
len := mm.mumap.Count()
cap := mm.mumap.Capacity()
if cap-len > (cap*7)/(8*2) {
// Create a new map only as big as required.
mumap := swiss.NewMap[string, *rwmutex](uint32(len))
mm.mumap.Iter(func(k string, v *rwmutex) (stop bool) {
mumap.Put(k, v)
return false
})
// Set new map.
mm.mumap = mumap
}
// Check if compaction
// needed.
mm.mumap.Compact()
// Done with map.
mm.mapmu.Unlock()
@ -131,7 +112,9 @@ func (mm *MutexMap) acquire() *rwmutex {
if ptr := mm.mupool.Get(); ptr != nil {
return (*rwmutex)(ptr)
}
return new(rwmutex)
mu := new(rwmutex)
mu.c.L = &mm.mapmu
return mu
}
// release will release given mutex to memory pool.
@ -152,9 +135,9 @@ func (mm *MutexMap) release(mu *rwmutex) {
// mechanism we use, otherwise all Cond{}.L would reference
// the same outer map mutex.
type rwmutex struct {
n notifyList // 'trigger' mechanism
l int32 // no. locks
t uint8 // lock type
c sync.Cond // 'trigger' mechanism
l int32 // no. locks
t uint8 // lock type
}
// Lock will lock the mutex for given lock type, in the
@ -202,11 +185,34 @@ func (mu *rwmutex) Unlock() bool {
// Fully unlocked.
mu.t = 0
// NOTE: must remain in
// sync with runtime.notifyList{}.
//
// goexperiment.staticlockranking
// does change it slightly, but
// this does not alter the first
// 2 fields which are all we need.
type notifyList struct {
_ uint32
notify uint32
// ... other fields
}
// NOTE: must remain in
// sync with sync.Cond{}.
type syncCond struct {
_ struct{}
L sync.Locker
n notifyList
// ... other fields
}
// Awake all blocked goroutines and check
// for change in the last notified ticket.
before := atomic.LoadUint32(&mu.n.notify)
runtime_notifyListNotifyAll(&mu.n)
after := atomic.LoadUint32(&mu.n.notify)
cptr := (*syncCond)(unsafe.Pointer(&mu.c))
before := atomic.LoadUint32(&cptr.n.notify)
mu.c.Broadcast() // awakes all blocked!
after := atomic.LoadUint32(&cptr.n.notify)
// If ticket changed, this indicates
// AT LEAST one goroutine was awoken.
@ -226,20 +232,4 @@ func (mu *rwmutex) Unlock() bool {
// locked state. It incr the notifyList waiter count before
// unlocking the outer mutex and blocking on notifyList wait.
// On awake it will decr wait count and relock outer mutex.
func (mu *rwmutex) WaitRelock(outer *sync.Mutex) {
// add ourselves to list while still
// under protection of outer map lock.
t := runtime_notifyListAdd(&mu.n)
// Finished with
// outer map lock.
outer.Unlock()
// Block until awoken by another
// goroutine within mu.Unlock().
runtime_notifyListWait(&mu.n, t)
// Relock!
outer.Lock()
}
func (mu *rwmutex) WaitRelock() { mu.c.Wait() }