[chore] consolidate caching libraries (#704)

* add miekg/dns dependency

* set/validate accountDomain

* move finger to dereferencer

* totally break GetRemoteAccount

* start reworking finger func a bit

* start reworking getRemoteAccount a bit

* move mention parts to namestring

* rework webfingerget

* use util function to extract webfinger parts

* use accountDomain

* rework finger again, final form

* just a real nasty commit, the worst

* remove refresh from account

* use new ASRepToAccount signature

* fix incorrect debug call

* fix for new getRemoteAccount

* rework GetRemoteAccount

* start updating tests to remove repetition

* break a lot of tests
Move shared test logic into the testrig,
rather than having it scattered all over
the place. This allows us to just mock
the transport controller once, and have
all tests use it (unless they need not to
for some other reason).

* fix up tests to use main mock httpclient

* webfinger only if necessary

* cheeky linting with the lads

* update mentionName regex
recognize instance accounts

* don't finger instance accounts

* test webfinger part extraction

* increase default worker count to 4 per cpu

* don't repeat regex parsing

* final search for discovered accountDomain

* be more permissive in namestring lookup

* add more extraction tests

* simplify GetParseMentionFunc

* skip long search if local account

* fix broken test

* consolidate to all use same caching libraries

Signed-off-by: kim <grufwub@gmail.com>

* perform more caching in the database layer

Signed-off-by: kim <grufwub@gmail.com>

* remove ASNote cache

Signed-off-by: kim <grufwub@gmail.com>

* update cache library, improve db tracing hooks

Signed-off-by: kim <grufwub@gmail.com>

* return ErrNoEntries if no account status IDs found, small formatting changes

Signed-off-by: kim <grufwub@gmail.com>

* fix tests, thanks tobi!

Signed-off-by: kim <grufwub@gmail.com>

Co-authored-by: tsmethurst <tobi.smethurst@protonmail.com>
This commit is contained in:
kim
2022-07-10 16:18:21 +01:00
committed by GitHub
parent 211266c072
commit 7cc40302a5
67 changed files with 3159 additions and 1244 deletions

9
vendor/codeberg.org/gruf/go-atomics/LICENSE generated vendored Normal file
View File

@@ -0,0 +1,9 @@
MIT License
Copyright (c) 2022 gruf
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

3
vendor/codeberg.org/gruf/go-atomics/README.md generated vendored Normal file
View File

@@ -0,0 +1,3 @@
# go-atomics
This library provides a variety of types for atomic operations on common Go types.

57
vendor/codeberg.org/gruf/go-atomics/atomic.tpl generated vendored Normal file
View File

@@ -0,0 +1,57 @@
package atomics
import (
"sync/atomic"
"unsafe"
)
// {{ .Name }} provides user-friendly means of performing atomic operations on {{ .Type }} types.
type {{ .Name }} struct{ ptr unsafe.Pointer }
// New{{ .Name }} will return a new {{ .Name }} instance initialized with zero value.
func New{{ .Name }}() *{{ .Name }} {
var v {{ .Type }}
return &{{ .Name }}{
ptr: unsafe.Pointer(&v),
}
}
// Store will atomically store {{ .Type }} value in address contained within v.
func (v *{{ .Name }}) Store(val {{ .Type }}) {
atomic.StorePointer(&v.ptr, unsafe.Pointer(&val))
}
// Load will atomically load {{ .Type }} value at address contained within v.
func (v *{{ .Name }}) Load() {{ .Type }} {
return *(*{{ .Type }})(atomic.LoadPointer(&v.ptr))
}
// CAS performs a compare-and-swap for a(n) {{ .Type }} value at address contained within v.
func (v *{{ .Name }}) CAS(cmp, swp {{ .Type }}) bool {
for {
// Load current value at address
ptr := atomic.LoadPointer(&v.ptr)
cur := *(*{{ .Type }})(ptr)
// Perform comparison against current
if !({{ call .Compare "cur" "cmp" }}) {
return false
}
// Attempt to replace pointer
if atomic.CompareAndSwapPointer(
&v.ptr,
ptr,
unsafe.Pointer(&swp),
) {
return true
}
}
}
// Swap atomically stores new {{ .Type }} value into address contained within v, and returns previous value.
func (v *{{ .Name }}) Swap(swp {{ .Type }}) {{ .Type }} {
ptr := unsafe.Pointer(&swp)
ptr = atomic.SwapPointer(&v.ptr, ptr)
return *(*{{ .Type }})(ptr)
}

60
vendor/codeberg.org/gruf/go-atomics/atomic_test.tpl generated vendored Normal file
View File

@@ -0,0 +1,60 @@
package atomics_test
import (
"atomic"
"unsafe"
"testing"
"codeberg.org/gruf/go-atomics"
)
func Test{{ .Name }}StoreLoad(t *testing.T) {
for _, test := range {{ .Name }}Tests {
val := atomics.New{{ .Name }}()
val.Store(test.V1)
if !({{ call .Compare "val.Load()" "test.V1" }}) {
t.Fatalf("failed testing .Store and .Load: expect=%v actual=%v", val.Load(), test.V1)
}
val.Store(test.V2)
if !({{ call .Compare "val.Load()" "test.V2" }}) {
t.Fatalf("failed testing .Store and .Load: expect=%v actual=%v", val.Load(), test.V2)
}
}
}
func Test{{ .Name }}CAS(t *testing.T) {
for _, test := range {{ .Name }}Tests {
val := atomics.New{{ .Name }}()
val.Store(test.V1)
if val.CAS(test.V2, test.V1) {
t.Fatalf("failed testing negative .CAS: test=%+v state=%v", test, val.Load())
}
if !val.CAS(test.V1, test.V2) {
t.Fatalf("failed testing positive .CAS: test=%+v state=%v", test, val.Load())
}
}
}
func Test{{ .Name }}Swap(t *testing.T) {
for _, test := range {{ .Name }}Tests {
val := atomics.New{{ .Name }}()
val.Store(test.V1)
if !({{ call .Compare "val.Swap(test.V2)" "test.V1" }}) {
t.Fatal("failed testing .Swap")
}
if !({{ call .Compare "val.Swap(test.V1)" "test.V2" }}) {
t.Fatal("failed testing .Swap")
}
}
}

47
vendor/codeberg.org/gruf/go-atomics/bool.go generated vendored Normal file
View File

@@ -0,0 +1,47 @@
package atomics
import "sync/atomic"
// Bool provides user-friendly means of performing atomic operations on bool types.
type Bool uint32
// NewBool will return a new Bool instance initialized with zero value.
func NewBool() *Bool {
return new(Bool)
}
// Store will atomically store bool value in address contained within i.
func (b *Bool) Store(val bool) {
atomic.StoreUint32((*uint32)(b), fromBool(val))
}
// Load will atomically load bool value at address contained within i.
func (b *Bool) Load() bool {
return toBool(atomic.LoadUint32((*uint32)(b)))
}
// CAS performs a compare-and-swap for a(n) bool value at address contained within i.
func (b *Bool) CAS(cmp, swp bool) bool {
return atomic.CompareAndSwapUint32((*uint32)(b), fromBool(cmp), fromBool(swp))
}
// Swap atomically stores new bool value into address contained within i, and returns previous value.
func (b *Bool) Swap(swp bool) bool {
return toBool(atomic.SwapUint32((*uint32)(b), fromBool(swp)))
}
// toBool converts uint32 value to bool.
func toBool(u uint32) bool {
if u == 0 {
return false
}
return true
}
// fromBool converts from bool to uint32 value.
func fromBool(b bool) uint32 {
if b {
return 1
}
return 0
}

57
vendor/codeberg.org/gruf/go-atomics/bytes.go generated vendored Normal file
View File

@@ -0,0 +1,57 @@
package atomics
import (
"sync/atomic"
"unsafe"
)
// Bytes provides user-friendly means of performing atomic operations on []byte types.
type Bytes struct{ ptr unsafe.Pointer }
// NewBytes will return a new Bytes instance initialized with zero value.
func NewBytes() *Bytes {
var v []byte
return &Bytes{
ptr: unsafe.Pointer(&v),
}
}
// Store will atomically store []byte value in address contained within v.
func (v *Bytes) Store(val []byte) {
atomic.StorePointer(&v.ptr, unsafe.Pointer(&val))
}
// Load will atomically load []byte value at address contained within v.
func (v *Bytes) Load() []byte {
return *(*[]byte)(atomic.LoadPointer(&v.ptr))
}
// CAS performs a compare-and-swap for a(n) []byte value at address contained within v.
func (v *Bytes) CAS(cmp, swp []byte) bool {
for {
// Load current value at address
ptr := atomic.LoadPointer(&v.ptr)
cur := *(*[]byte)(ptr)
// Perform comparison against current
if !(string(cur) == string(cmp)) {
return false
}
// Attempt to replace pointer
if atomic.CompareAndSwapPointer(
&v.ptr,
ptr,
unsafe.Pointer(&swp),
) {
return true
}
}
}
// Swap atomically stores new []byte value into address contained within v, and returns previous value.
func (v *Bytes) Swap(swp []byte) []byte {
ptr := unsafe.Pointer(&swp)
ptr = atomic.SwapPointer(&v.ptr, ptr)
return *(*[]byte)(ptr)
}

57
vendor/codeberg.org/gruf/go-atomics/error.go generated vendored Normal file
View File

@@ -0,0 +1,57 @@
package atomics
import (
"sync/atomic"
"unsafe"
)
// Error provides user-friendly means of performing atomic operations on error types.
type Error struct{ ptr unsafe.Pointer }
// NewError will return a new Error instance initialized with zero value.
func NewError() *Error {
var v error
return &Error{
ptr: unsafe.Pointer(&v),
}
}
// Store will atomically store error value in address contained within v.
func (v *Error) Store(val error) {
atomic.StorePointer(&v.ptr, unsafe.Pointer(&val))
}
// Load will atomically load error value at address contained within v.
func (v *Error) Load() error {
return *(*error)(atomic.LoadPointer(&v.ptr))
}
// CAS performs a compare-and-swap for a(n) error value at address contained within v.
func (v *Error) CAS(cmp, swp error) bool {
for {
// Load current value at address
ptr := atomic.LoadPointer(&v.ptr)
cur := *(*error)(ptr)
// Perform comparison against current
if !(cur == cmp) {
return false
}
// Attempt to replace pointer
if atomic.CompareAndSwapPointer(
&v.ptr,
ptr,
unsafe.Pointer(&swp),
) {
return true
}
}
}
// Swap atomically stores new error value into address contained within v, and returns previous value.
func (v *Error) Swap(swp error) error {
ptr := unsafe.Pointer(&swp)
ptr = atomic.SwapPointer(&v.ptr, ptr)
return *(*error)(ptr)
}

97
vendor/codeberg.org/gruf/go-atomics/flags.go generated vendored Normal file
View File

@@ -0,0 +1,97 @@
package atomics
import (
"sync/atomic"
"codeberg.org/gruf/go-bitutil"
)
// Flags32 provides user-friendly means of performing atomic operations on bitutil.Flags32 types.
type Flags32 bitutil.Flags32
// NewFlags32 will return a new Flags32 instance initialized with zero value.
func NewFlags32() *Flags32 {
return new(Flags32)
}
// Get will atomically load a(n) bitutil.Flags32 value contained within f, and check if bit value is set.
func (f *Flags32) Get(bit uint8) bool {
return f.Load().Get(bit)
}
// Set performs a compare-and-swap for a(n) bitutil.Flags32 with bit value set, at address contained within f.
func (f *Flags32) Set(bit uint8) bool {
cur := f.Load()
return f.CAS(cur, cur.Set(bit))
}
// Unset performs a compare-and-swap for a(n) bitutil.Flags32 with bit value unset, at address contained within f.
func (f *Flags32) Unset(bit uint8) bool {
cur := f.Load()
return f.CAS(cur, cur.Unset(bit))
}
// Store will atomically store bitutil.Flags32 value in address contained within f.
func (f *Flags32) Store(val bitutil.Flags32) {
atomic.StoreUint32((*uint32)(f), uint32(val))
}
// Load will atomically load bitutil.Flags32 value at address contained within f.
func (f *Flags32) Load() bitutil.Flags32 {
return bitutil.Flags32(atomic.LoadUint32((*uint32)(f)))
}
// CAS performs a compare-and-swap for a(n) bitutil.Flags32 value at address contained within f.
func (f *Flags32) CAS(cmp, swp bitutil.Flags32) bool {
return atomic.CompareAndSwapUint32((*uint32)(f), uint32(cmp), uint32(swp))
}
// Swap atomically stores new bitutil.Flags32 value into address contained within f, and returns previous value.
func (f *Flags32) Swap(swp bitutil.Flags32) bitutil.Flags32 {
return bitutil.Flags32(atomic.SwapUint32((*uint32)(f), uint32(swp)))
}
// Flags64 provides user-friendly means of performing atomic operations on bitutil.Flags64 types.
type Flags64 bitutil.Flags64
// NewFlags64 will return a new Flags64 instance initialized with zero value.
func NewFlags64() *Flags64 {
return new(Flags64)
}
// Get will atomically load a(n) bitutil.Flags64 value contained within f, and check if bit value is set.
func (f *Flags64) Get(bit uint8) bool {
return f.Load().Get(bit)
}
// Set performs a compare-and-swap for a(n) bitutil.Flags64 with bit value set, at address contained within f.
func (f *Flags64) Set(bit uint8) bool {
cur := f.Load()
return f.CAS(cur, cur.Set(bit))
}
// Unset performs a compare-and-swap for a(n) bitutil.Flags64 with bit value unset, at address contained within f.
func (f *Flags64) Unset(bit uint8) bool {
cur := f.Load()
return f.CAS(cur, cur.Unset(bit))
}
// Store will atomically store bitutil.Flags64 value in address contained within f.
func (f *Flags64) Store(val bitutil.Flags64) {
atomic.StoreUint64((*uint64)(f), uint64(val))
}
// Load will atomically load bitutil.Flags64 value at address contained within f.
func (f *Flags64) Load() bitutil.Flags64 {
return bitutil.Flags64(atomic.LoadUint64((*uint64)(f)))
}
// CAS performs a compare-and-swap for a(n) bitutil.Flags64 value at address contained within f.
func (f *Flags64) CAS(cmp, swp bitutil.Flags64) bool {
return atomic.CompareAndSwapUint64((*uint64)(f), uint64(cmp), uint64(swp))
}
// Swap atomically stores new bitutil.Flags64 value into address contained within f, and returns previous value.
func (f *Flags64) Swap(swp bitutil.Flags64) bitutil.Flags64 {
return bitutil.Flags64(atomic.SwapUint64((*uint64)(f), uint64(swp)))
}

69
vendor/codeberg.org/gruf/go-atomics/int.go generated vendored Normal file
View File

@@ -0,0 +1,69 @@
package atomics
import "sync/atomic"
// Int32 provides user-friendly means of performing atomic operations on int32 types.
type Int32 int32
// NewInt32 will return a new Int32 instance initialized with zero value.
func NewInt32() *Int32 {
return new(Int32)
}
// Add will atomically add int32 delta to value in address contained within i, returning new value.
func (i *Int32) Add(delta int32) int32 {
return atomic.AddInt32((*int32)(i), delta)
}
// Store will atomically store int32 value in address contained within i.
func (i *Int32) Store(val int32) {
atomic.StoreInt32((*int32)(i), val)
}
// Load will atomically load int32 value at address contained within i.
func (i *Int32) Load() int32 {
return atomic.LoadInt32((*int32)(i))
}
// CAS performs a compare-and-swap for a(n) int32 value at address contained within i.
func (i *Int32) CAS(cmp, swp int32) bool {
return atomic.CompareAndSwapInt32((*int32)(i), cmp, swp)
}
// Swap atomically stores new int32 value into address contained within i, and returns previous value.
func (i *Int32) Swap(swp int32) int32 {
return atomic.SwapInt32((*int32)(i), swp)
}
// Int64 provides user-friendly means of performing atomic operations on int64 types.
type Int64 int64
// NewInt64 will return a new Int64 instance initialized with zero value.
func NewInt64() *Int64 {
return new(Int64)
}
// Add will atomically add int64 delta to value in address contained within i, returning new value.
func (i *Int64) Add(delta int64) int64 {
return atomic.AddInt64((*int64)(i), delta)
}
// Store will atomically store int64 value in address contained within i.
func (i *Int64) Store(val int64) {
atomic.StoreInt64((*int64)(i), val)
}
// Load will atomically load int64 value at address contained within i.
func (i *Int64) Load() int64 {
return atomic.LoadInt64((*int64)(i))
}
// CAS performs a compare-and-swap for a(n) int64 value at address contained within i.
func (i *Int64) CAS(cmp, swp int64) bool {
return atomic.CompareAndSwapInt64((*int64)(i), cmp, swp)
}
// Swap atomically stores new int64 value into address contained within i, and returns previous value.
func (i *Int64) Swap(swp int64) int64 {
return atomic.SwapInt64((*int64)(i), swp)
}

57
vendor/codeberg.org/gruf/go-atomics/interface.go generated vendored Normal file
View File

@@ -0,0 +1,57 @@
package atomics
import (
"sync/atomic"
"unsafe"
)
// Interface provides user-friendly means of performing atomic operations on interface{} types.
type Interface struct{ ptr unsafe.Pointer }
// NewInterface will return a new Interface instance initialized with zero value.
func NewInterface() *Interface {
var v interface{}
return &Interface{
ptr: unsafe.Pointer(&v),
}
}
// Store will atomically store interface{} value in address contained within v.
func (v *Interface) Store(val interface{}) {
atomic.StorePointer(&v.ptr, unsafe.Pointer(&val))
}
// Load will atomically load interface{} value at address contained within v.
func (v *Interface) Load() interface{} {
return *(*interface{})(atomic.LoadPointer(&v.ptr))
}
// CAS performs a compare-and-swap for a(n) interface{} value at address contained within v.
func (v *Interface) CAS(cmp, swp interface{}) bool {
for {
// Load current value at address
ptr := atomic.LoadPointer(&v.ptr)
cur := *(*interface{})(ptr)
// Perform comparison against current
if !(cur == cmp) {
return false
}
// Attempt to replace pointer
if atomic.CompareAndSwapPointer(
&v.ptr,
ptr,
unsafe.Pointer(&swp),
) {
return true
}
}
}
// Swap atomically stores new interface{} value into address contained within v, and returns previous value.
func (v *Interface) Swap(swp interface{}) interface{} {
ptr := unsafe.Pointer(&swp)
ptr = atomic.SwapPointer(&v.ptr, ptr)
return *(*interface{})(ptr)
}

58
vendor/codeberg.org/gruf/go-atomics/state.go generated vendored Normal file
View File

@@ -0,0 +1,58 @@
package atomics
import "sync"
// State provides user-friendly means of performing atomic-like
// operations on a uint32 state, and allowing callbacks on successful
// state change. This is a bit of a misnomer being where it is, as it
// actually uses a mutex under-the-hood.
type State struct {
mutex sync.Mutex
state uint32
}
// Store will update State value safely within mutex lock.
func (st *State) Store(val uint32) {
st.mutex.Lock()
st.state = val
st.mutex.Unlock()
}
// Load will get value of State safely within mutex lock.
func (st *State) Load() uint32 {
st.mutex.Lock()
state := st.state
st.mutex.Unlock()
return state
}
// WithLock performs fn within State mutex lock, useful if you want
// to just use State's mutex for locking instead of creating another.
func (st *State) WithLock(fn func()) {
st.mutex.Lock()
defer st.mutex.Unlock()
fn()
}
// Update performs fn within State mutex lock, with the current state
// value provided as an argument, and return value used to update state.
func (st *State) Update(fn func(state uint32) uint32) {
st.mutex.Lock()
defer st.mutex.Unlock()
st.state = fn(st.state)
}
// CAS performs a compare-and-swap on State, calling fn on success. Success value is also returned.
func (st *State) CAS(cmp, swp uint32, fn func()) (ok bool) {
// Acquire lock
st.mutex.Lock()
defer st.mutex.Unlock()
// Perform CAS operation, fn() on success
if ok = (st.state == cmp); ok {
st.state = swp
fn()
}
return
}

57
vendor/codeberg.org/gruf/go-atomics/string.go generated vendored Normal file
View File

@@ -0,0 +1,57 @@
package atomics
import (
"sync/atomic"
"unsafe"
)
// String provides user-friendly means of performing atomic operations on string types.
type String struct{ ptr unsafe.Pointer }
// NewString will return a new String instance initialized with zero value.
func NewString() *String {
var v string
return &String{
ptr: unsafe.Pointer(&v),
}
}
// Store will atomically store string value in address contained within v.
func (v *String) Store(val string) {
atomic.StorePointer(&v.ptr, unsafe.Pointer(&val))
}
// Load will atomically load string value at address contained within v.
func (v *String) Load() string {
return *(*string)(atomic.LoadPointer(&v.ptr))
}
// CAS performs a compare-and-swap for a(n) string value at address contained within v.
func (v *String) CAS(cmp, swp string) bool {
for {
// Load current value at address
ptr := atomic.LoadPointer(&v.ptr)
cur := *(*string)(ptr)
// Perform comparison against current
if !(cur == cmp) {
return false
}
// Attempt to replace pointer
if atomic.CompareAndSwapPointer(
&v.ptr,
ptr,
unsafe.Pointer(&swp),
) {
return true
}
}
}
// Swap atomically stores new string value into address contained within v, and returns previous value.
func (v *String) Swap(swp string) string {
ptr := unsafe.Pointer(&swp)
ptr = atomic.SwapPointer(&v.ptr, ptr)
return *(*string)(ptr)
}

58
vendor/codeberg.org/gruf/go-atomics/time.go generated vendored Normal file
View File

@@ -0,0 +1,58 @@
package atomics
import (
"sync/atomic"
"time"
"unsafe"
)
// Time provides user-friendly means of performing atomic operations on time.Time types.
type Time struct{ ptr unsafe.Pointer }
// NewTime will return a new Time instance initialized with zero value.
func NewTime() *Time {
var v time.Time
return &Time{
ptr: unsafe.Pointer(&v),
}
}
// Store will atomically store time.Time value in address contained within v.
func (v *Time) Store(val time.Time) {
atomic.StorePointer(&v.ptr, unsafe.Pointer(&val))
}
// Load will atomically load time.Time value at address contained within v.
func (v *Time) Load() time.Time {
return *(*time.Time)(atomic.LoadPointer(&v.ptr))
}
// CAS performs a compare-and-swap for a(n) time.Time value at address contained within v.
func (v *Time) CAS(cmp, swp time.Time) bool {
for {
// Load current value at address
ptr := atomic.LoadPointer(&v.ptr)
cur := *(*time.Time)(ptr)
// Perform comparison against current
if !(cur.Equal(cmp)) {
return false
}
// Attempt to replace pointer
if atomic.CompareAndSwapPointer(
&v.ptr,
ptr,
unsafe.Pointer(&swp),
) {
return true
}
}
}
// Swap atomically stores new time.Time value into address contained within v, and returns previous value.
func (v *Time) Swap(swp time.Time) time.Time {
ptr := unsafe.Pointer(&swp)
ptr = atomic.SwapPointer(&v.ptr, ptr)
return *(*time.Time)(ptr)
}

69
vendor/codeberg.org/gruf/go-atomics/uint.go generated vendored Normal file
View File

@@ -0,0 +1,69 @@
package atomics
import "sync/atomic"
// Uint32 provides user-friendly means of performing atomic operations on uint32 types.
type Uint32 uint32
// NewUint32 will return a new Uint32 instance initialized with zero value.
func NewUint32() *Uint32 {
return new(Uint32)
}
// Add will atomically add uint32 delta to value in address contained within i, returning new value.
func (u *Uint32) Add(delta uint32) uint32 {
return atomic.AddUint32((*uint32)(u), delta)
}
// Store will atomically store uint32 value in address contained within i.
func (u *Uint32) Store(val uint32) {
atomic.StoreUint32((*uint32)(u), val)
}
// Load will atomically load uint32 value at address contained within i.
func (u *Uint32) Load() uint32 {
return atomic.LoadUint32((*uint32)(u))
}
// CAS performs a compare-and-swap for a(n) uint32 value at address contained within i.
func (u *Uint32) CAS(cmp, swp uint32) bool {
return atomic.CompareAndSwapUint32((*uint32)(u), cmp, swp)
}
// Swap atomically stores new uint32 value into address contained within i, and returns previous value.
func (u *Uint32) Swap(swp uint32) uint32 {
return atomic.SwapUint32((*uint32)(u), swp)
}
// Uint64 provides user-friendly means of performing atomic operations on uint64 types.
type Uint64 uint64
// NewUint64 will return a new Uint64 instance initialized with zero value.
func NewUint64() *Uint64 {
return new(Uint64)
}
// Add will atomically add uint64 delta to value in address contained within i, returning new value.
func (u *Uint64) Add(delta uint64) uint64 {
return atomic.AddUint64((*uint64)(u), delta)
}
// Store will atomically store uint64 value in address contained within i.
func (u *Uint64) Store(val uint64) {
atomic.StoreUint64((*uint64)(u), val)
}
// Load will atomically load uint64 value at address contained within i.
func (u *Uint64) Load() uint64 {
return atomic.LoadUint64((*uint64)(u))
}
// CAS performs a compare-and-swap for a(n) uint64 value at address contained within i.
func (u *Uint64) CAS(cmp, swp uint64) bool {
return atomic.CompareAndSwapUint64((*uint64)(u), cmp, swp)
}
// Swap atomically stores new uint64 value into address contained within i, and returns previous value.
func (u *Uint64) Swap(swp uint64) uint64 {
return atomic.SwapUint64((*uint64)(u), swp)
}

View File

@@ -1,7 +1,7 @@
package bitutil
import (
"codeberg.org/gruf/go-bytes"
"codeberg.org/gruf/go-byteutil"
)
// Flags8 is a type-casted unsigned integer with helper
@@ -173,7 +173,7 @@ func (f Flags8) Unset7() Flags8 {
// String returns a human readable representation of Flags8.
func (f Flags8) String() string {
var val bool
var buf bytes.Buffer
var buf byteutil.Buffer
buf.WriteByte('{')
@@ -210,7 +210,7 @@ func (f Flags8) String() string {
// GoString returns a more verbose human readable representation of Flags8.
func (f Flags8) GoString() string {
var val bool
var buf bytes.Buffer
var buf byteutil.Buffer
buf.WriteString("bitutil.Flags8{")
@@ -557,7 +557,7 @@ func (f Flags16) Unset15() Flags16 {
// String returns a human readable representation of Flags16.
func (f Flags16) String() string {
var val bool
var buf bytes.Buffer
var buf byteutil.Buffer
buf.WriteByte('{')
@@ -618,7 +618,7 @@ func (f Flags16) String() string {
// GoString returns a more verbose human readable representation of Flags16.
func (f Flags16) GoString() string {
var val bool
var buf bytes.Buffer
var buf byteutil.Buffer
buf.WriteString("bitutil.Flags16{")
@@ -1277,7 +1277,7 @@ func (f Flags32) Unset31() Flags32 {
// String returns a human readable representation of Flags32.
func (f Flags32) String() string {
var val bool
var buf bytes.Buffer
var buf byteutil.Buffer
buf.WriteByte('{')
@@ -1386,7 +1386,7 @@ func (f Flags32) String() string {
// GoString returns a more verbose human readable representation of Flags32.
func (f Flags32) GoString() string {
var val bool
var buf bytes.Buffer
var buf byteutil.Buffer
buf.WriteString("bitutil.Flags32{")
@@ -2669,7 +2669,7 @@ func (f Flags64) Unset63() Flags64 {
// String returns a human readable representation of Flags64.
func (f Flags64) String() string {
var val bool
var buf bytes.Buffer
var buf byteutil.Buffer
buf.WriteByte('{')
@@ -2874,7 +2874,7 @@ func (f Flags64) String() string {
// GoString returns a more verbose human readable representation of Flags64.
func (f Flags64) GoString() string {
var val bool
var buf bytes.Buffer
var buf byteutil.Buffer
buf.WriteString("bitutil.Flags64{")

View File

@@ -3,7 +3,7 @@ package bitutil
import (
"strings"
"codeberg.org/gruf/go-bytes"
"codeberg.org/gruf/go-byteutil"
)
{{ range $idx, $size := . }}
@@ -55,7 +55,7 @@ func (f Flags{{ $size.Size }}) Unset{{ $idx }}() Flags{{ $size.Size }} {
// String returns a human readable representation of Flags{{ $size.Size }}.
func (f Flags{{ $size.Size }}) String() string {
var val bool
var buf bytes.Buffer
var buf byteutil.Buffer
buf.WriteByte('{')
{{ range $idx := .Bits }}
@@ -71,7 +71,7 @@ func (f Flags{{ $size.Size }}) String() string {
// GoString returns a more verbose human readable representation of Flags{{ $size.Size }}.
func (f Flags{{ $size.Size }})GoString() string {
var val bool
var buf bytes.Buffer
var buf byteutil.Buffer
buf.WriteString("bitutil.Flags{{ $size.Size }}{")
{{ range $idx := .Bits }}

View File

@@ -127,3 +127,8 @@ func (buf *Buffer) Reset() {
func (buf *Buffer) String() string {
return B2S(buf.B)
}
// Full returns the full capacity byteslice allocated for this buffer.
func (buf *Buffer) Full() []byte {
return buf.B[0:cap(buf.B)]
}

View File

@@ -61,7 +61,7 @@ type Cache[Key comparable, Value any] interface {
// New returns a new initialized Cache.
func New[K comparable, V any]() Cache[K, V] {
c := TTLCache[K, V]{}
c := &TTLCache[K, V]{}
c.Init()
return &c
return c
}

View File

@@ -40,9 +40,9 @@ type LookupCache[OGKey, AltKey comparable, Value any] interface {
}
type lookupTTLCache[OK, AK comparable, V any] struct {
TTLCache[OK, V]
config LookupCfg[OK, AK, V]
lookup LookupMap[OK, AK]
TTLCache[OK, V]
}
// NewLookup returns a new initialized LookupCache.
@@ -55,14 +55,13 @@ func NewLookup[OK, AK comparable, V any](cfg LookupCfg[OK, AK, V]) LookupCache[O
case cfg.DeleteLookups == nil:
panic("cache: nil delete lookups function")
}
c := lookupTTLCache[OK, AK, V]{config: cfg}
c := &lookupTTLCache[OK, AK, V]{config: cfg}
c.TTLCache.Init()
c.lookup.lookup = make(map[string]map[AK]OK)
c.config.RegisterLookups(&c.lookup)
c.SetEvictionCallback(nil)
c.SetInvalidateCallback(nil)
c.lookup.initd = true
return &c
return c
}
func (c *lookupTTLCache[OK, AK, V]) SetEvictionCallback(hook Hook[OK, V]) {
@@ -158,16 +157,13 @@ func (c *lookupTTLCache[OK, AK, V]) InvalidateBy(lookup string, key AK) bool {
// keys to primary keys under supplied lookup identifiers.
// This is essentially a wrapper around map[string](map[K1]K2).
type LookupMap[OK comparable, AK comparable] struct {
initd bool
lookup map[string](map[AK]OK)
}
// RegisterLookup registers a lookup identifier in the LookupMap,
// note this can only be doing during the cfg.RegisterLookups() hook.
func (l *LookupMap[OK, AK]) RegisterLookup(id string) {
if l.initd {
panic("cache: cannot register lookup after initialization")
} else if _, ok := l.lookup[id]; ok {
if _, ok := l.lookup[id]; ok {
panic("cache: lookup mapping already exists for identifier")
}
l.lookup[id] = make(map[AK]OK, 100)

17
vendor/codeberg.org/gruf/go-cache/v2/scheduler.go generated vendored Normal file
View File

@@ -0,0 +1,17 @@
package cache
import (
"time"
"codeberg.org/gruf/go-sched"
)
// scheduler is the global cache runtime scheduler
// for handling regular cache evictions.
var scheduler = sched.NewScheduler(5)
// schedule will given sweep routine to the global scheduler, and start global scheduler.
func schedule(sweep func(time.Time), freq time.Duration) func() {
go scheduler.Start() // does nothing if already running
return scheduler.Schedule(sched.NewJob(sweep).Every(freq))
}

View File

@@ -1,11 +1,8 @@
package cache
import (
"context"
"sync"
"time"
"codeberg.org/gruf/go-runners"
)
// TTLCache is the underlying Cache implementation, providing both the base
@@ -16,11 +13,11 @@ type TTLCache[Key comparable, Value any] struct {
evict Hook[Key, Value] // the evict hook is called when an item is evicted from the cache, includes manual delete
invalid Hook[Key, Value] // the invalidate hook is called when an item's data in the cache is invalidated
ttl time.Duration // ttl is the item TTL
svc runners.Service // svc manages running of the cache eviction routine
stop func() // stop is the cancel function for the scheduled eviction routine
mu sync.Mutex // mu protects TTLCache for concurrent access
}
// Init performs Cache initialization, this MUST be called.
// Init performs Cache initialization. MUST be called.
func (c *TTLCache[K, V]) Init() {
c.cache = make(map[K](*entry[V]), 100)
c.evict = emptyHook[K, V]
@@ -28,68 +25,48 @@ func (c *TTLCache[K, V]) Init() {
c.ttl = time.Minute * 5
}
func (c *TTLCache[K, V]) Start(freq time.Duration) bool {
func (c *TTLCache[K, V]) Start(freq time.Duration) (ok bool) {
// Nothing to start
if freq <= 0 {
return false
}
// Track state of starting
done := make(chan struct{})
started := false
// Safely start
c.mu.Lock()
go func() {
ran := c.svc.Run(func(ctx context.Context) {
// Successfully started
started = true
close(done)
// start routine
c.run(ctx, freq)
})
// failed to start
if !ran {
close(done)
}
}()
<-done
return started
}
func (c *TTLCache[K, V]) Stop() bool {
return c.svc.Stop()
}
func (c *TTLCache[K, V]) run(ctx context.Context, freq time.Duration) {
t := time.NewTimer(freq)
for {
select {
// we got stopped
case <-ctx.Done():
if !t.Stop() {
<-t.C
}
return
// next tick
case <-t.C:
c.sweep()
t.Reset(freq)
}
if ok = c.stop == nil; ok {
// Not yet running, schedule us
c.stop = schedule(c.sweep, freq)
}
// Done with lock
c.mu.Unlock()
return
}
func (c *TTLCache[K, V]) Stop() (ok bool) {
// Safely stop
c.mu.Lock()
if ok = c.stop != nil; ok {
// We're running, cancel evicts
c.stop()
c.stop = nil
}
// Done with lock
c.mu.Unlock()
return
}
// sweep attempts to evict expired items (with callback!) from cache.
func (c *TTLCache[K, V]) sweep() {
func (c *TTLCache[K, V]) sweep(now time.Time) {
// Lock and defer unlock (in case of hook panic)
c.mu.Lock()
defer c.mu.Unlock()
// Fetch current time for TTL check
now := time.Now()
// Sweep the cache for old items!
for key, item := range c.cache {
if now.After(item.expiry) {
@@ -116,9 +93,9 @@ func (c *TTLCache[K, V]) SetEvictionCallback(hook Hook[K, V]) {
}
// Safely set evict hook
c.Lock()
c.mu.Lock()
c.evict = hook
c.Unlock()
c.mu.Unlock()
}
func (c *TTLCache[K, V]) SetInvalidateCallback(hook Hook[K, V]) {
@@ -128,14 +105,14 @@ func (c *TTLCache[K, V]) SetInvalidateCallback(hook Hook[K, V]) {
}
// Safely set invalidate hook
c.Lock()
c.mu.Lock()
c.invalid = hook
c.Unlock()
c.mu.Unlock()
}
func (c *TTLCache[K, V]) SetTTL(ttl time.Duration, update bool) {
// Safely update TTL
c.Lock()
c.mu.Lock()
diff := ttl - c.ttl
c.ttl = ttl
@@ -147,13 +124,13 @@ func (c *TTLCache[K, V]) SetTTL(ttl time.Duration, update bool) {
}
// We're done
c.Unlock()
c.mu.Unlock()
}
func (c *TTLCache[K, V]) Get(key K) (V, bool) {
c.Lock()
c.mu.Lock()
value, ok := c.GetUnsafe(key)
c.Unlock()
c.mu.Unlock()
return value, ok
}
@@ -169,9 +146,9 @@ func (c *TTLCache[K, V]) GetUnsafe(key K) (V, bool) {
}
func (c *TTLCache[K, V]) Put(key K, value V) bool {
c.Lock()
c.mu.Lock()
success := c.PutUnsafe(key, value)
c.Unlock()
c.mu.Unlock()
return success
}
@@ -192,8 +169,8 @@ func (c *TTLCache[K, V]) PutUnsafe(key K, value V) bool {
}
func (c *TTLCache[K, V]) Set(key K, value V) {
c.Lock()
defer c.Unlock() // defer in case of hook panic
c.mu.Lock()
defer c.mu.Unlock() // defer in case of hook panic
c.SetUnsafe(key, value)
}
@@ -215,9 +192,9 @@ func (c *TTLCache[K, V]) SetUnsafe(key K, value V) {
}
func (c *TTLCache[K, V]) CAS(key K, cmp V, swp V) bool {
c.Lock()
c.mu.Lock()
ok := c.CASUnsafe(key, cmp, swp)
c.Unlock()
c.mu.Unlock()
return ok
}
@@ -240,9 +217,9 @@ func (c *TTLCache[K, V]) CASUnsafe(key K, cmp V, swp V) bool {
}
func (c *TTLCache[K, V]) Swap(key K, swp V) V {
c.Lock()
c.mu.Lock()
old := c.SwapUnsafe(key, swp)
c.Unlock()
c.mu.Unlock()
return old
}
@@ -267,9 +244,9 @@ func (c *TTLCache[K, V]) SwapUnsafe(key K, swp V) V {
}
func (c *TTLCache[K, V]) Has(key K) bool {
c.Lock()
c.mu.Lock()
ok := c.HasUnsafe(key)
c.Unlock()
c.mu.Unlock()
return ok
}
@@ -280,8 +257,8 @@ func (c *TTLCache[K, V]) HasUnsafe(key K) bool {
}
func (c *TTLCache[K, V]) Invalidate(key K) bool {
c.Lock()
defer c.Unlock()
c.mu.Lock()
defer c.mu.Unlock()
return c.InvalidateUnsafe(key)
}
@@ -300,8 +277,8 @@ func (c *TTLCache[K, V]) InvalidateUnsafe(key K) bool {
}
func (c *TTLCache[K, V]) Clear() {
c.Lock()
defer c.Unlock()
c.mu.Lock()
defer c.mu.Unlock()
c.ClearUnsafe()
}
@@ -314,9 +291,9 @@ func (c *TTLCache[K, V]) ClearUnsafe() {
}
func (c *TTLCache[K, V]) Size() int {
c.Lock()
c.mu.Lock()
sz := c.SizeUnsafe()
c.Unlock()
c.mu.Unlock()
return sz
}

View File

@@ -1,5 +1,9 @@
package debug
import (
_debug "runtime/debug"
)
// DEBUG returns whether debugging is enabled.
func DEBUG() bool {
return debug
@@ -11,3 +15,44 @@ func Run(fn func()) {
fn()
}
}
// BuildInfo will return a useful new-line separated build info string for current binary, setting name as given value.
func BuildInfo(name string) string {
// Read build info from current binary
build, ok := _debug.ReadBuildInfo()
if !ok {
return "name=" + name + "\n"
}
var flags, vcs, commit, time string
// Parse build information from BuildInfo.Settings
for i := 0; i < len(build.Settings); i++ {
switch build.Settings[i].Key {
case "-gcflags":
flags += ` -gcflags="` + build.Settings[i].Value + `"`
case "-ldflags":
flags += ` -ldflags="` + build.Settings[i].Value + `"`
case "-tags":
flags += ` -tags="` + build.Settings[i].Value + `"`
case "vcs":
vcs = build.Settings[i].Value
case "vcs.revision":
commit = build.Settings[i].Value
if len(commit) > 8 {
commit = commit[:8]
}
case "vcs.time":
time = build.Settings[i].Value
}
}
return "" +
"name=" + name + "\n" +
"vcs=" + vcs + "\n" +
"commit=" + commit + "\n" +
"version=" + build.Main.Version + "\n" +
"path=" + build.Path + "\n" +
"build=" + build.GoVersion + flags + "\n" +
"time=" + time + "\n"
}

View File

@@ -18,14 +18,21 @@ import (
func Is(err error, targets ...error) bool {
var flags bitutil.Flags64
// Flags only has 64 bit slots
if len(targets) > 64 {
panic("too many targets")
}
// Determine if each of targets are comparable
// Check if error is nil so we can catch
// the fast-case where a target is nil
isNil := (err == nil)
for i := 0; i < len(targets); {
// Drop nil errors
// Drop nil targets
if targets[i] == nil {
if isNil /* match! */ {
return true
}
targets = append(targets[:i], targets[i+1:]...)
continue
}

9
vendor/codeberg.org/gruf/go-sched/LICENSE generated vendored Normal file
View File

@@ -0,0 +1,9 @@
MIT License
Copyright (c) 2022 gruf
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

5
vendor/codeberg.org/gruf/go-sched/README.md generated vendored Normal file
View File

@@ -0,0 +1,5 @@
# go-sched
A simple job (both run-once and recurring) queueing library with down-to millisecond precision.
Precision estimates based on test output (running on i7-11800h): 1ms precision with 80% tolerance.

99
vendor/codeberg.org/gruf/go-sched/job.go generated vendored Normal file
View File

@@ -0,0 +1,99 @@
package sched
import (
"time"
"codeberg.org/gruf/go-atomics"
)
// Job encapsulates logic for a scheduled job to be run according
// to a set Timing, executing the job with a set panic handler, and
// holding onto a next execution time safely in a concurrent environment.
type Job struct {
id uint64
next atomics.Time
timing Timing
call func(time.Time)
panic func(interface{})
}
// NewJob returns a new Job to run given function.
func NewJob(fn func(now time.Time)) *Job {
if fn == nil {
// Ensure a function
panic("nil func")
}
j := &Job{ // set defaults
timing: emptytiming, // i.e. fire immediately
call: fn,
panic: func(i interface{}) { panic(i) },
}
// Init next time ptr
j.next.Store(zerotime)
return j
}
// At sets this Job to execute at time, by passing (*sched.Once)(&at) to .With(). See .With() for details.
func (job *Job) At(at time.Time) *Job {
return job.With((*Once)(&at))
}
// Every sets this Job to execute every period, by passing sched.Period(period) to .With(). See .With() for details.
func (job *Job) Every(period time.Duration) *Job {
return job.With(Periodic(period))
}
// EveryAt sets this Job to execute every period starting at time, by passing &PeriodicAt{once: Once(at), period: Periodic(period)} to .With(). See .With() for details.
func (job *Job) EveryAt(at time.Time, period time.Duration) *Job {
return job.With(&PeriodicAt{Once: Once(at), Period: Periodic(period)})
}
// With sets this Job's timing to given implementation, or if already set will wrap existing using sched.TimingWrap{}.
func (job *Job) With(t Timing) *Job {
if t == nil {
// Ensure a timing
panic("nil Timing")
}
if job.timing == emptytiming {
// Set new timing
job.timing = t
} else {
// Wrap old timing
old := job.timing
job.timing = &TimingWrap{
Outer: t,
Inner: old,
}
}
return job
}
// Panic specifics how this job handles panics, default is an actual panic.
func (job *Job) Panic(fn func(interface{})) *Job {
if fn == nil {
// Ensure a function
panic("nil func")
}
job.panic = fn
return job
}
// Next returns the next time this Job is expected to run.
func (job *Job) Next() time.Time {
return job.next.Load()
}
// Run will execute this Job and pass through given now time.
func (job *Job) Run(now time.Time) {
defer func() {
if r := recover(); r != nil {
job.panic(r)
}
}()
job.call(now)
}

240
vendor/codeberg.org/gruf/go-sched/scheduler.go generated vendored Normal file
View File

@@ -0,0 +1,240 @@
package sched
import (
"context"
"sort"
"time"
"codeberg.org/gruf/go-atomics"
"codeberg.org/gruf/go-runners"
)
var (
// neverticks is a timer channel that never ticks (it's starved).
neverticks = make(chan time.Time)
// alwaysticks is a timer channel that always ticks (it's closed).
alwaysticks = func() chan time.Time {
ch := make(chan time.Time)
close(ch)
return ch
}()
)
// Scheduler provides a means of running jobs at specific times and
// regular intervals, all while sharing a single underlying timer.
type Scheduler struct {
jobs []*Job // jobs is a list of tracked Jobs to be executed
jch chan interface{} // jch accepts either Jobs or job IDs to notify new/removed jobs
svc runners.Service // svc manages the main scheduler routine
jid atomics.Uint64 // jid is used to iteratively generate unique IDs for jobs
}
// New returns a new Scheduler instance with given job change queue size.
func NewScheduler(queue int) Scheduler {
if queue < 0 {
queue = 10
}
return Scheduler{jch: make(chan interface{}, queue)}
}
// Start will attempt to start the Scheduler. Immediately returns false if the Service is already running, and true after completed run.
func (sch *Scheduler) Start() bool {
return sch.svc.Run(sch.run)
}
// Stop will attempt to stop the Scheduler. Immediately returns false if not running, and true only after Scheduler is fully stopped.
func (sch *Scheduler) Stop() bool {
return sch.svc.Stop()
}
// Running will return whether Scheduler is running.
func (sch *Scheduler) Running() bool {
return sch.svc.Running()
}
// Schedule will add provided Job to the Scheduler, returning a cancel function.
func (sch *Scheduler) Schedule(job *Job) (cancel func()) {
if job == nil {
// Ensure there's a job!
panic("nil job")
}
// Get last known job ID
last := sch.jid.Load()
// Give this job an ID and check overflow
if job.id = sch.jid.Add(1); job.id < last {
panic("scheduler job id overflow")
}
// Pass job to scheduler
sch.jch <- job
// Return cancel function for job ID
return func() { sch.jch <- job.id }
}
// run is the main scheduler run routine, which runs for as long as ctx is valid.
func (sch *Scheduler) run(ctx context.Context) {
var (
// timerset represents whether timer was running
// for a particular run of the loop. false means
// that tch == neverticks || tch == alwaysticks
timerset bool
// timer tick channel (or a never-tick channel)
tch <-chan time.Time
// timer notifies this main routine to wake when
// the job queued needs to be checked for executions
timer *time.Timer
// stopdrain will stop and drain the timer
// if it has been running (i.e. timerset == true)
stopdrain = func() {
if timerset && !timer.Stop() {
<-timer.C
}
}
)
for {
select {
// Handle received job/id
case v := <-sch.jch:
sch.handle(v)
continue
// No more
default:
}
// Done
break
}
// Create a stopped timer
timer = time.NewTimer(1)
<-timer.C
for {
// Reset timer state
timerset = false
if len(sch.jobs) > 0 {
// Sort jobs by next occurring
sort.Sort(byNext(sch.jobs))
// Get execution time
now := time.Now()
// Get next job time
next := sch.jobs[0].Next()
if until := next.Sub(now); until <= 0 {
// This job is behind schedule,
// set timer to always tick
tch = alwaysticks
} else {
// Reset timer to period
timer.Reset(until)
tch = timer.C
timerset = true
}
} else {
// Unset timer
tch = neverticks
}
select {
// Scheduler stopped
case <-ctx.Done():
stopdrain()
return
// Timer ticked, run scheduled
case now := <-tch:
sch.schedule(now)
// Received update, handle job/id
case v := <-sch.jch:
sch.handle(v)
stopdrain()
}
}
}
// handle takes an interfaces received from Scheduler.jch and handles either:
// - Job --> new job to add.
// - uint64 --> job ID to remove.
func (sch *Scheduler) handle(v interface{}) {
switch v := v.(type) {
// New job added
case *Job:
// Get current time
now := time.Now()
// Update the next call time
next := v.timing.Next(now)
v.next.Store(next)
// Append this job to queued
sch.jobs = append(sch.jobs, v)
// Job removed
case uint64:
for i := 0; i < len(sch.jobs); i++ {
if sch.jobs[i].id == v {
// This is the job we're looking for! Drop this
sch.jobs = append(sch.jobs[:i], sch.jobs[i+1:]...)
return
}
}
}
}
// schedule will iterate through the scheduler jobs and execute those necessary, updating their next call time.
func (sch *Scheduler) schedule(now time.Time) {
for i := 0; i < len(sch.jobs); {
// Scope our own var
job := sch.jobs[i]
// We know these jobs are ordered by .Next(), so as soon
// as we reach one with .Next() after now, we can return
if job.Next().After(now) {
return
}
// Update the next call time
next := job.timing.Next(now)
job.next.Store(next)
// Run this job async!
go job.Run(now)
if job.Next().IsZero() {
// Zero time, this job is done and can be dropped
sch.jobs = append(sch.jobs[:i], sch.jobs[i+1:]...)
continue
}
// Iter
i++
}
}
// byNext is an implementation of sort.Interface to sort Jobs by their .Next() time.
type byNext []*Job
func (by byNext) Len() int {
return len(by)
}
func (by byNext) Less(i int, j int) bool {
return by[i].Next().Before(by[j].Next())
}
func (by byNext) Swap(i int, j int) {
by[i], by[j] = by[j], by[i]
}

92
vendor/codeberg.org/gruf/go-sched/timing.go generated vendored Normal file
View File

@@ -0,0 +1,92 @@
package sched
import (
"time"
)
var (
// zerotime is zero time.Time (unix epoch).
zerotime = time.Time{}
// emptytiming is a global timingempty to check against.
emptytiming = timingempty{}
)
// Timing provides scheduling for a Job, determining the next time
// for given current time that execution is required. Please note that
// calls to .Next() may alter the results of the next call, and should
// only be called by the Scheduler.
type Timing interface {
Next(time.Time) time.Time
}
// timingempty is a 'zero' Timing implementation that always returns zero time.
type timingempty struct{}
func (timingempty) Next(time.Time) time.Time {
return zerotime
}
// Once implements Timing to provide a run-once Job execution.
type Once time.Time
func (o *Once) Next(time.Time) time.Time {
ret := *(*time.Time)(o)
*o = Once(zerotime) // reset
return ret
}
// Periodic implements Timing to provide a recurring Job execution.
type Periodic time.Duration
func (p Periodic) Next(now time.Time) time.Time {
return now.Add(time.Duration(p))
}
// PeriodicAt implements Timing to provide a recurring Job execution starting at 'Once' time.
type PeriodicAt struct {
Once Once
Period Periodic
}
func (p *PeriodicAt) Next(now time.Time) time.Time {
if next := p.Once.Next(now); !next.IsZero() {
return next
}
return p.Period.Next(now)
}
// TimingWrap allows combining two different Timing implementations.
type TimingWrap struct {
Outer Timing
Inner Timing
// determined next times
outerNext time.Time
innerNext time.Time
}
func (t *TimingWrap) Next(now time.Time) time.Time {
if t.outerNext.IsZero() {
// Regenerate outermost next run time
t.outerNext = t.Outer.Next(now)
}
if t.innerNext.IsZero() {
// Regenerate innermost next run time
t.innerNext = t.Inner.Next(now)
}
// If outer comes before inner, return outer
if t.outerNext != zerotime &&
t.outerNext.Before(t.innerNext) {
next := t.outerNext
t.outerNext = zerotime
return next
}
// Else, return inner
next := t.innerNext
t.innerNext = zerotime
return next
}