update go-structr to add (+then rely on) QueueCtx{} type

kim 2024-04-25 17:39:40 +01:00
parent 382790cf7f
commit 4cba8174e7
10 changed files with 272 additions and 106 deletions
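At a high level, consumers no longer select on a Wait() channel and then attempt a non-blocking Pop(); they call a context-aware pop that blocks until a value arrives or the context is cancelled. Below is a minimal sketch of that consumer pattern, assuming only the PopCtx(ctx) (value, ok) signature introduced in this diff; the popper interface and names are illustrative, not part of the commit.

package sketch

import "context"

// popper is an illustrative stand-in for the queue types
// that gained a context-aware pop in this commit.
type popper[T any] interface {
	PopCtx(ctx context.Context) (T, bool)
}

// process drains the queue until the worker context is cancelled:
// PopCtx blocks while the queue is empty, so no Wait()/select loop
// is needed any more.
func process[T any](ctx context.Context, q popper[T], handle func(T)) {
	for {
		msg, ok := q.PopCtx(ctx)
		if !ok {
			// ok is only false once ctx is done.
			return
		}
		handle(msg)
	}
}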

go.mod

@@ -22,7 +22,7 @@ require (
codeberg.org/gruf/go-runners v1.6.2
codeberg.org/gruf/go-sched v1.2.3
codeberg.org/gruf/go-store/v2 v2.2.4
codeberg.org/gruf/go-structr v0.6.2
codeberg.org/gruf/go-structr v0.7.0
codeberg.org/superseriousbusiness/exif-terminator v0.7.0
github.com/DmitriyVTitov/size v1.5.0
github.com/KimMachineGun/automemlimit v0.6.0

go.sum

@@ -74,8 +74,8 @@ codeberg.org/gruf/go-sched v1.2.3 h1:H5ViDxxzOBR3uIyGBCf0eH8b1L8wMybOXcdtUUTXZHk
codeberg.org/gruf/go-sched v1.2.3/go.mod h1:vT9uB6KWFIIwnG9vcPY2a0alYNoqdL1mSzRM8I+PK7A=
codeberg.org/gruf/go-store/v2 v2.2.4 h1:8HO1Jh2gg7boQKA3hsDAIXd9zwieu5uXwDXEcTOD9js=
codeberg.org/gruf/go-store/v2 v2.2.4/go.mod h1:zI4VWe5CpXAktYMtaBMrgA5QmO0sQH53LBRvfn1huys=
codeberg.org/gruf/go-structr v0.6.2 h1:1zs7UkPBsRGRDMHhrfFL7GrwAyPHxFXCchu8ADv/zuM=
codeberg.org/gruf/go-structr v0.6.2/go.mod h1:K1FXkUyO6N/JKt8aWqyQ8rtW7Z9ZmXKWP8mFAQ2OJjE=
codeberg.org/gruf/go-structr v0.7.0 h1:gy0/wD7718HwJDoBMeMumk4+7veLrkumgCEOnCyzS8w=
codeberg.org/gruf/go-structr v0.7.0/go.mod h1:K1FXkUyO6N/JKt8aWqyQ8rtW7Z9ZmXKWP8mFAQ2OJjE=
codeberg.org/superseriousbusiness/exif-terminator v0.7.0 h1:Y6VApSXhKqExG0H2hZ2JelRK4xmWdjDQjn13CpEfzko=
codeberg.org/superseriousbusiness/exif-terminator v0.7.0/go.mod h1:gCWKduudUWFzsnixoMzu0FYVdxHWG+AbXnZ50DqxsUE=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=

@@ -18,6 +18,7 @@
package queue
import (
"context"
"sync"
"codeberg.org/gruf/go-list"
@@ -39,7 +40,10 @@ func (q *SimpleQueue[T]) Push(value T) {
elem := q.alloc()
elem.Value = value
q.l.PushElemFront(elem)
q.broadcast()
if q.w != nil {
close(q.w)
q.w = nil
}
q.m.Unlock()
}
@@ -56,6 +60,59 @@ func (q *SimpleQueue[T]) Pop() (value T, ok bool) {
return
}
// PopCtx will attempt to pop value from queue, else blocking on context.
func (q *SimpleQueue[T]) PopCtx(ctx context.Context) (value T, ok bool) {
// Acquire lock.
q.m.Lock()
var elem *list.Elem[T]
for {
// Get next elem.
elem = q.l.Tail
if ok = (elem != nil); ok {
break
}
if q.w == nil {
// Create new wait channel.
q.w = make(chan struct{})
}
// Get current
// ch pointer.
ch := q.w
// Done with lock.
q.m.Unlock()
select {
// Context canceled.
case <-ctx.Done():
return
// Pushed!
case <-ch:
}
// Relock queue.
q.m.Lock()
}
// Extract value.
value = elem.Value
// Remove element.
q.l.Remove(elem)
q.free(elem)
// Done with lock.
q.m.Unlock()
return
}
// Len returns the current length of the queue.
func (q *SimpleQueue[T]) Len() int {
q.m.Lock()
@@ -64,18 +121,6 @@ func (q *SimpleQueue[T]) Len() int {
return l
}
// Wait returns current wait channel, which may be
// blocked on to awaken when new value pushed to queue.
func (q *SimpleQueue[T]) Wait() (ch <-chan struct{}) {
q.m.Lock()
if q.w == nil {
q.w = make(chan struct{})
}
ch = q.w
q.m.Unlock()
return
}
// alloc will allocate new list element (relying on memory pool).
func (q *SimpleQueue[T]) alloc() *list.Elem[T] {
if len(q.p) > 0 {
@@ -94,12 +139,3 @@ func (q *SimpleQueue[T]) free(elem *list.Elem[T]) {
elem.Value = zero
q.p = append(q.p, elem)
}
// broadcast safely closes wait channel if
// currently set, releasing waiting goroutines.
func (q *SimpleQueue[T]) broadcast() {
if q.w != nil {
close(q.w)
q.w = nil
}
}
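The SimpleQueue change above hinges on a close-to-broadcast idiom: Push closes the wait channel (waking every blocked PopCtx) and nils it, while each waiter snapshots the channel under the lock, blocks outside it, and re-checks the list after waking. A standalone sketch of that idiom under illustrative names, independent of go-list:

package sketch

import (
	"context"
	"sync"
)

// waitQueue is an illustrative queue using the same broadcast idiom:
// a single wait channel that push closes (waking all waiters) and
// then discards, so the next waiter allocates a fresh one.
type waitQueue struct {
	mu    sync.Mutex
	items []int
	wake  chan struct{}
}

func (q *waitQueue) push(v int) {
	q.mu.Lock()
	q.items = append(q.items, v)
	if q.wake != nil {
		close(q.wake) // broadcast: wake every blocked popCtx
		q.wake = nil
	}
	q.mu.Unlock()
}

func (q *waitQueue) popCtx(ctx context.Context) (int, bool) {
	q.mu.Lock()
	for len(q.items) == 0 {
		if q.wake == nil {
			q.wake = make(chan struct{})
		}

		// Snapshot the channel, then
		// block on it outside the lock.
		ch := q.wake
		q.mu.Unlock()

		select {
		case <-ctx.Done():
			return 0, false
		case <-ch:
		}

		// Relock and re-check: another waiter
		// may have taken the pushed value.
		q.mu.Lock()
	}

	v := q.items[0]
	q.items = q.items[1:]
	q.mu.Unlock()
	return v, true
}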

@@ -18,7 +18,7 @@
package queue
import (
"sync/atomic"
"context"
"codeberg.org/gruf/go-structr"
)
@@ -26,15 +26,14 @@ import (
// StructQueue wraps a structr.Queue{} to
// provide simple index caching by name.
type StructQueue[StructType any] struct {
queue structr.Queue[StructType]
queue structr.QueueCtx[StructType]
index map[string]*structr.Index
wait atomic.Pointer[chan struct{}]
}
// Init initializes queue with structr.QueueConfig{}.
func (q *StructQueue[T]) Init(config structr.QueueConfig[T]) {
q.index = make(map[string]*structr.Index, len(config.Indices))
q.queue = structr.Queue[T]{}
// q.queue = structr.QueueCtx[T]{}
q.queue.Init(config)
for _, cfg := range config.Indices {
q.index[cfg.Fields] = q.queue.Index(cfg.Fields)
@@ -43,13 +42,22 @@ func (q *StructQueue[T]) Init(config structr.QueueConfig[T]) {
// Pop: see structr.Queue{}.PopFront().
func (q *StructQueue[T]) Pop() (value T, ok bool) {
return q.queue.PopFront()
values := q.queue.PopFrontN(1)
if ok = (len(values) > 0); !ok {
return
}
value = values[0]
return
}
// Push wraps structr.Queue{}.PushBack() to awaken those blocking on <-.Wait().
// PopCtx: see structr.QueueCtx{}.PopFront().
func (q *StructQueue[T]) PopCtx(ctx context.Context) (value T, ok bool) {
return q.queue.PopFront(ctx)
}
// Push: see structr.Queue.PushBack().
func (q *StructQueue[T]) Push(values ...T) {
q.queue.PushBack(values...)
q.broadcast()
}
// Delete pops (and drops!) all queued entries under index with key.
@@ -66,31 +74,5 @@ func (q *StructQueue[T]) Len() int {
// Wait returns current wait channel, which may be
// blocked on to awaken when new value pushed to queue.
func (q *StructQueue[T]) Wait() <-chan struct{} {
var ch chan struct{}
for {
// Get channel ptr.
ptr := q.wait.Load()
if ptr != nil {
return *ptr
}
if ch == nil {
// Allocate new channel.
ch = make(chan struct{})
}
// Try set the new wait channel ptr.
if q.wait.CompareAndSwap(ptr, &ch) {
return ch
}
}
}
// broadcast safely closes wait channel if
// currently set, releasing waiting goroutines.
func (q *StructQueue[T]) broadcast() {
if ptr := q.wait.Swap(nil); ptr != nil {
close(*ptr)
}
return q.queue.Wait()
}
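For reference, a hedged usage sketch of the wrapped StructQueue after this change, written as if it sat alongside it in the same package: only the Indices and Fields config fields visible in this diff are relied on; the structr.IndexConfig type name, the delivery type, and the handler are assumptions.

package queue

import (
	"context"

	"codeberg.org/gruf/go-structr"
)

// delivery is an illustrative queued message type.
type delivery struct {
	ActorID string
}

// drainExample initializes a StructQueue with a single index,
// pushes one value, then drains it with the new PopCtx.
func drainExample(ctx context.Context, handle func(*delivery)) {
	var q StructQueue[*delivery]
	q.Init(structr.QueueConfig[*delivery]{
		// The IndexConfig element type is assumed here; only
		// .Indices and .Fields appear in this diff.
		Indices: []structr.IndexConfig{{Fields: "ActorID"}},
	})

	q.Push(&delivery{ActorID: "1"})

	for {
		d, ok := q.PopCtx(ctx)
		if !ok {
			return // context cancelled
		}
		handle(d)
	}
}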

@@ -107,21 +107,11 @@ func (w *FnWorker) process(ctx context.Context) {
panic("not yet initialized")
}
loop:
for {
select {
// Worker ctx done.
case <-ctx.Done():
return
// New message enqueued!
case <-w.Queue.Wait():
}
// Try pop next function.
fn, ok := w.Queue.Pop()
// Block until pop next func.
fn, ok := w.Queue.PopCtx(ctx)
if !ok {
continue loop
return
}
// run!

@@ -120,21 +120,11 @@ func (w *MsgWorker[T]) process(ctx context.Context) {
panic("not yet initialized")
}
loop:
for {
select {
// Worker ctx done.
case <-ctx.Done():
return
// New message enqueued!
case <-w.Queue.Wait():
}
// Try pop next message.
msg, ok := w.Queue.Pop()
// Block until pop next message.
msg, ok := w.Queue.PopCtx(ctx)
if !ok {
continue loop
return
}
// Attempt to process popped message type.

@@ -150,10 +150,10 @@ func (c *Cache[T]) Get(index *Index, keys ...Key) []T {
// Acquire lock.
c.mutex.Lock()
defer c.mutex.Unlock()
// Check cache init.
if c.copy == nil {
c.mutex.Unlock()
panic("not initialized")
}
@@ -173,9 +173,6 @@ func (c *Cache[T]) Get(index *Index, keys ...Key) []T {
})
}
// Done with lock.
c.mutex.Unlock()
return values
}
@@ -185,12 +182,12 @@ func (c *Cache[T]) Put(values ...T) {
// Acquire lock.
c.mutex.Lock()
// Get func ptrs.
invalid := c.invalid
// Wrap unlock to only do once.
unlock := once(c.mutex.Unlock)
defer unlock()
// Check cache init.
if c.copy == nil {
c.mutex.Unlock()
panic("not initialized")
}
@@ -203,8 +200,12 @@
)
}
// Done with lock.
c.mutex.Unlock()
// Get func ptrs.
invalid := c.invalid
// Done with
// the lock.
unlock()
if invalid != nil {
// Pass all invalidated values
@@ -241,13 +242,13 @@ func (c *Cache[T]) LoadOne(index *Index, key Key, load func() (T, error)) (T, er
// Acquire lock.
c.mutex.Lock()
// Get func ptrs.
ignore := c.ignore
// Wrap unlock to only do once.
unlock := once(c.mutex.Unlock)
defer unlock()
// Check init'd.
if c.copy == nil ||
ignore == nil {
c.mutex.Unlock()
c.ignore == nil {
panic("not initialized")
}
@@ -273,8 +274,12 @@
}
}
// Done with lock.
c.mutex.Unlock()
// Get func ptrs.
ignore := c.ignore
// Done with
// the lock.
unlock()
if ok {
// item found!
@@ -325,9 +330,12 @@ func (c *Cache[T]) Load(index *Index, keys []Key, load func([]Key) ([]T, error))
// Acquire lock.
c.mutex.Lock()
// Wrap unlock to only do once.
unlock := once(c.mutex.Unlock)
defer unlock()
// Check init'd.
if c.copy == nil {
c.mutex.Unlock()
panic("not initialized")
}
@@ -365,8 +373,9 @@ func (c *Cache[T]) Load(index *Index, keys []Key, load func([]Key) ([]T, error))
i++
}
// Done with lock.
c.mutex.Unlock()
// Done with
// the lock.
unlock()
// Load uncached values.
uncached, err := load(keys)
@@ -374,8 +383,20 @@ func (c *Cache[T]) Load(index *Index, keys []Key, load func([]Key) ([]T, error))
return nil, err
}
// Insert uncached.
c.Put(uncached...)
// Acquire lock.
c.mutex.Lock()
// Store all uncached values.
for i := range uncached {
c.store_value(
nil,
Key{},
uncached[i],
)
}
// Done with lock.
c.mutex.Unlock()
// Append uncached to return values.
values = append(values, uncached...)

vendor/codeberg.org/gruf/go-structr/queue_ctx.go generated vendored Normal file

@@ -0,0 +1,134 @@
package structr
import (
"context"
)
// QueueCtx is a context-aware form of Queue{}.
type QueueCtx[StructType any] struct {
Queue[StructType]
ch chan struct{}
}
// PopFront pops the current value at front of the queue, else blocking on ctx.
func (q *QueueCtx[T]) PopFront(ctx context.Context) (T, bool) {
return q.pop(ctx, func() *list_elem {
return q.queue.head
})
}
// PopBack pops the current value at back of the queue, else blocking on ctx.
func (q *QueueCtx[T]) PopBack(ctx context.Context) (T, bool) {
return q.pop(ctx, func() *list_elem {
return q.queue.tail
})
}
// PushFront pushes values to front of queue.
func (q *QueueCtx[T]) PushFront(values ...T) {
q.mutex.Lock()
for i := range values {
item := q.index(values[i])
q.queue.push_front(&item.elem)
}
if q.ch != nil {
close(q.ch)
q.ch = nil
}
q.mutex.Unlock()
}
// PushBack pushes values to back of queue.
func (q *QueueCtx[T]) PushBack(values ...T) {
q.mutex.Lock()
for i := range values {
item := q.index(values[i])
q.queue.push_back(&item.elem)
}
if q.ch != nil {
close(q.ch)
q.ch = nil
}
q.mutex.Unlock()
}
// Wait returns a ptr to the current ctx channel,
// this will block until next push to the queue.
func (q *QueueCtx[T]) Wait() <-chan struct{} {
q.mutex.Lock()
if q.ch == nil {
q.ch = make(chan struct{})
}
ctx := q.ch
q.mutex.Unlock()
return ctx
}
func (q *QueueCtx[T]) pop(ctx context.Context, next func() *list_elem) (T, bool) {
if next == nil {
panic("nil fn")
} else if ctx == nil {
panic("nil ctx")
}
// Acquire lock.
q.mutex.Lock()
var elem *list_elem
for {
// Get element.
elem = next()
if elem != nil {
break
}
if q.ch == nil {
// Allocate new ctx channel.
q.ch = make(chan struct{})
}
// Get current
// ch pointer.
ch := q.ch
// Unlock queue.
q.mutex.Unlock()
select {
// Ctx cancelled.
case <-ctx.Done():
var z T
return z, false
// Pushed!
case <-ch:
}
// Relock queue.
q.mutex.Lock()
}
// Cast the indexed item from elem.
item := (*indexed_item)(elem.data)
// Extract item value.
value := item.data.(T)
// Delete queued.
q.delete(item)
// Get func ptrs.
pop := q.Queue.pop
// Done with lock.
q.mutex.Unlock()
if pop != nil {
// Pass to
// user hook.
pop(value)
}
return value, true
}
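A hedged usage sketch for QueueCtx on its own: a producer goroutine pushes after a short delay while the caller blocks in PopFront under a deadline. Queue initialization via structr.QueueConfig is assumed to have been done by the caller and is elided; the function name and values below are illustrative.

package sketch

import (
	"context"
	"time"

	"codeberg.org/gruf/go-structr"
)

// popWithDeadline blocks in PopFront until the producer goroutine
// pushes, or until the one-second deadline cancels the context.
// The queue is expected to have been initialized by the caller.
func popWithDeadline(q *structr.QueueCtx[string]) (string, bool) {
	go func() {
		time.Sleep(10 * time.Millisecond)
		q.PushBack("hello") // closes the wait channel, waking PopFront
	}()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	return q.PopFront(ctx)
}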

vendor/codeberg.org/gruf/go-structr/util.go generated vendored Normal file

@@ -0,0 +1,13 @@
package structr
// once only executes 'fn' once.
func once(fn func()) func() {
var once int32
return func() {
if once != 0 {
return
}
once = 1
fn()
}
}
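This once() wrapper is what lets the cache methods above both defer the unlock (covering the panic and early-return paths) and still release the mutex explicitly before slow work, without double-unlocking. A small sketch of that pattern under illustrative names:

package sketch

import "sync"

// withEarlyUnlock shows the shape of the change in cache.go: wrap
// Unlock so it runs at most once, defer it to cover panics and early
// returns, and still call it explicitly before slow, lock-free work.
func withEarlyUnlock(mu *sync.Mutex, update func(), slow func()) {
	mu.Lock()

	// unlock runs at most once, whether called
	// explicitly below or by the deferred call.
	unlock := once(mu.Unlock)
	defer unlock()

	update() // mutate shared state under the lock

	// Release before the slow work; the
	// deferred unlock then becomes a no-op.
	unlock()
	slow()
}

// once only executes 'fn' once (mirroring the vendored util.go above).
func once(fn func()) func() {
	var done bool
	return func() {
		if done {
			return
		}
		done = true
		fn()
	}
}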

vendor/modules.txt vendored

@@ -62,7 +62,7 @@ codeberg.org/gruf/go-sched
## explicit; go 1.19
codeberg.org/gruf/go-store/v2/storage
codeberg.org/gruf/go-store/v2/util
# codeberg.org/gruf/go-structr v0.6.2
# codeberg.org/gruf/go-structr v0.7.0
## explicit; go 1.21
codeberg.org/gruf/go-structr
# codeberg.org/superseriousbusiness/exif-terminator v0.7.0