diff --git a/go.mod b/go.mod index d60c1cba8..52e2e23a8 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( codeberg.org/gruf/go-kv v1.5.2 codeberg.org/gruf/go-logger/v2 v2.2.1 codeberg.org/gruf/go-mutexes v1.1.5 - codeberg.org/gruf/go-runners v1.4.0 + codeberg.org/gruf/go-runners v1.5.1 codeberg.org/gruf/go-store/v2 v2.2.1 github.com/KimMachineGun/automemlimit v0.2.4 github.com/abema/go-mp4 v0.10.0 diff --git a/go.sum b/go.sum index b3d3db9ca..f424f455c 100644 --- a/go.sum +++ b/go.sum @@ -79,8 +79,8 @@ codeberg.org/gruf/go-mutexes v1.1.5 h1:8Y8DwCGf24MyzOSaPvLrtk/B4ecVx4z+fppL6dY+P codeberg.org/gruf/go-mutexes v1.1.5/go.mod h1:1j/6/MBeBQUedAtAtysLLnBKogfOZAxdym0E3wlaBD8= codeberg.org/gruf/go-pools v1.1.0 h1:LbYP24eQLl/YI1fSU2pafiwhGol1Z1zPjRrMsXpF88s= codeberg.org/gruf/go-pools v1.1.0/go.mod h1:ZMYpt/DjQWYC3zFD3T97QWSFKs62zAUGJ/tzvgB9D68= -codeberg.org/gruf/go-runners v1.4.0 h1:977nVjigAdH95+VAB/a6tyBJOKk99e60h+mfHzBs/n8= -codeberg.org/gruf/go-runners v1.4.0/go.mod h1:kUM6GYL7dC+f9Sc/XuwdvB/mB4FuI4fJFb150ADMsmw= +codeberg.org/gruf/go-runners v1.5.1 h1:ekhhxKvO6D/VC7nS/xpv71/iRX01JSqcBEbahqPUghg= +codeberg.org/gruf/go-runners v1.5.1/go.mod h1:kUM6GYL7dC+f9Sc/XuwdvB/mB4FuI4fJFb150ADMsmw= codeberg.org/gruf/go-sched v1.2.0 h1:utZl/7srVcbh30rFw42LC2/cMtak4UZRxtIOt/5riNA= codeberg.org/gruf/go-sched v1.2.0/go.mod h1:v4ueWq+fAtAw9JYt4aFXvadI1YoOqofgHQgszRYuslA= codeberg.org/gruf/go-store/v2 v2.2.1 h1:lbvMjhMLebefiaPNLtWvPySKSYM5xN1aztSxxz+vCzU= diff --git a/vendor/codeberg.org/gruf/go-runners/context.go b/vendor/codeberg.org/gruf/go-runners/context.go index 9cb6aa5f7..12f7f1a10 100644 --- a/vendor/codeberg.org/gruf/go-runners/context.go +++ b/vendor/codeberg.org/gruf/go-runners/context.go @@ -7,9 +7,9 @@ import ( // closedctx is an always closed context. var closedctx = func() context.Context { - ctx := make(cancelctx) + ctx := make(chan struct{}) close(ctx) - return ctx + return CancelCtx(ctx) }() // Closed returns an always closed context. @@ -17,24 +17,25 @@ func Closed() context.Context { return closedctx } -// ContextWithCancel returns a new context.Context impl with cancel. -func ContextWithCancel() (context.Context, context.CancelFunc) { - ctx := make(cancelctx) - return ctx, func() { close(ctx) } +// CtxWithCancel returns a new context.Context impl with cancel. +func CtxWithCancel() (context.Context, context.CancelFunc) { + ctx := make(chan struct{}) + cncl := func() { close(ctx) } + return CancelCtx(ctx), cncl } -// cancelctx is the simplest possible cancellable context. -type cancelctx (chan struct{}) +// CancelCtx is the simplest possible cancellable context. +type CancelCtx (<-chan struct{}) -func (cancelctx) Deadline() (time.Time, bool) { +func (CancelCtx) Deadline() (time.Time, bool) { return time.Time{}, false } -func (ctx cancelctx) Done() <-chan struct{} { +func (ctx CancelCtx) Done() <-chan struct{} { return ctx } -func (ctx cancelctx) Err() error { +func (ctx CancelCtx) Err() error { select { case <-ctx: return context.Canceled @@ -43,11 +44,11 @@ func (ctx cancelctx) Err() error { } } -func (cancelctx) Value(key interface{}) interface{} { +func (CancelCtx) Value(key interface{}) interface{} { return nil } -func (ctx cancelctx) String() string { +func (ctx CancelCtx) String() string { var state string select { case <-ctx: @@ -55,9 +56,9 @@ func (ctx cancelctx) String() string { default: state = "open" } - return "cancelctx{state:" + state + "}" + return "CancelCtx{state:" + state + "}" } -func (ctx cancelctx) GoString() string { +func (ctx CancelCtx) GoString() string { return "runners." + ctx.String() } diff --git a/vendor/codeberg.org/gruf/go-runners/pool.go b/vendor/codeberg.org/gruf/go-runners/pool.go index 1d83e85c7..16222b2e1 100644 --- a/vendor/codeberg.org/gruf/go-runners/pool.go +++ b/vendor/codeberg.org/gruf/go-runners/pool.go @@ -67,20 +67,14 @@ func (pool *WorkerPool) Start(workers int, queue int) bool { go func() { defer wait.Done() - // Run worker function. - for !worker_run(ctx, fns) { - // retry on panic + // Run worker function (retry on panic) + for !worker_run(CancelCtx(ctx), fns) { } }() } - // Set GC finalizer to stop pool on dealloc. - runtime.SetFinalizer(pool, func(pool *WorkerPool) { - _ = pool.svc.Stop() - }) - // Wait on ctx - <-ctx.Done() + <-ctx // Drain function queue. // @@ -110,6 +104,16 @@ func (pool *WorkerPool) Stop() bool { return pool.svc.Stop() } +// Running returns if WorkerPool management loop is running (i.e. NOT stopped / stopping). +func (pool *WorkerPool) Running() bool { + return pool.svc.Running() +} + +// Done returns a channel that's closed when WorkerPool.Stop() is called. It is the same channel provided to the currently running worker functions. +func (pool *WorkerPool) Done() <-chan struct{} { + return pool.svc.Done() +} + // Enqueue will add provided WorkerFunc to the queue to be performed when there is a free worker. // This will block until function is queued or pool is stopped. In all cases, the WorkerFunc will be // executed, with the state of the pool being indicated by <-ctx.Done() of the passed ctx. diff --git a/vendor/codeberg.org/gruf/go-runners/service.go b/vendor/codeberg.org/gruf/go-runners/service.go index 2c9be8225..c019a10f6 100644 --- a/vendor/codeberg.org/gruf/go-runners/service.go +++ b/vendor/codeberg.org/gruf/go-runners/service.go @@ -8,10 +8,10 @@ import ( // Service provides a means of tracking a single long-running service, provided protected state // changes and preventing multiple instances running. Also providing service state information. type Service struct { - state uint32 // 0=stopped, 1=running, 2=stopping - mutex sync.Mutex // mutext protects overall state changes - wait sync.Mutex // wait is used as a single-entity wait-group, only ever locked within 'mutex' - ctx cancelctx // ctx is the current context for running function (or nil if not running) + state uint32 // 0=stopped, 1=running, 2=stopping + mutex sync.Mutex // mutext protects overall state changes + wait sync.Mutex // wait is used as a single-entity wait-group, only ever locked within 'mutex' + ctx chan struct{} // ctx is the current context for running function (or nil if not running) } // Run will run the supplied function until completion, using given context to propagate cancel. @@ -31,8 +31,8 @@ func (svc *Service) Run(fn func(context.Context)) bool { _ = svc.Stop() }() - // Run - fn(ctx) + // Run with context. + fn(CancelCtx(ctx)) return true } @@ -55,8 +55,8 @@ func (svc *Service) GoRun(fn func(context.Context)) bool { _ = svc.Stop() }() - // Run - fn(ctx) + // Run with context. + fn(CancelCtx(ctx)) }() return true @@ -104,7 +104,7 @@ func (svc *Service) While(fn func()) { } // doStart will safely set Service state to started, returning a ptr to this context insance. -func (svc *Service) doStart() (cancelctx, bool) { +func (svc *Service) doStart() (chan struct{}, bool) { // Protect startup svc.mutex.Lock() @@ -119,7 +119,7 @@ func (svc *Service) doStart() (cancelctx, bool) { if svc.ctx == nil { // this will only have been allocated // if svc.Done() was already called. - svc.ctx = make(cancelctx) + svc.ctx = make(chan struct{}) } // Start the waiter @@ -134,7 +134,7 @@ func (svc *Service) doStart() (cancelctx, bool) { } // doStop will safely set Service state to stopping, returning a ptr to this cancelfunc instance. -func (svc *Service) doStop() (cancelctx, bool) { +func (svc *Service) doStop() (chan struct{}, bool) { // Protect stop svc.mutex.Lock() @@ -175,7 +175,7 @@ func (svc *Service) Done() <-chan struct{} { // here we create a new context so that the // returned 'done' channel here will still // be valid for when Service is next started. - svc.ctx = make(cancelctx) + svc.ctx = make(chan struct{}) } done = svc.ctx diff --git a/vendor/modules.txt b/vendor/modules.txt index 420c3df83..846a604e6 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -58,7 +58,7 @@ codeberg.org/gruf/go-mutexes # codeberg.org/gruf/go-pools v1.1.0 ## explicit; go 1.16 codeberg.org/gruf/go-pools -# codeberg.org/gruf/go-runners v1.4.0 +# codeberg.org/gruf/go-runners v1.5.1 ## explicit; go 1.14 codeberg.org/gruf/go-runners # codeberg.org/gruf/go-sched v1.2.0