start replacing client + federator + media workers with new worker + queue types

This commit is contained in:
kim
2024-04-15 10:51:59 +01:00
parent ba4f51ce2f
commit ef48fc13f0
66 changed files with 1313 additions and 632 deletions

View File

@@ -18,11 +18,9 @@
package workers
import (
"context"
"log"
"runtime"
"codeberg.org/gruf/go-runners"
"github.com/superseriousbusiness/gotosocial/internal/config"
"github.com/superseriousbusiness/gotosocial/internal/messages"
"github.com/superseriousbusiness/gotosocial/internal/scheduler"
@@ -39,35 +37,17 @@ type Workers struct {
// indexed queue of Delivery{} objects.
Delivery delivery.WorkerPool
// ClientAPI provides a worker pool that handles both
// incoming client actions, and our own side-effects.
ClientAPI runners.WorkerPool
// Client ...
Client MsgWorkerPool[*messages.FromClientAPI]
// Federator provides a worker pool that handles both
// incoming federated actions, and our own side-effects.
Federator runners.WorkerPool
// Federator ...
Federator MsgWorkerPool[*messages.FromFediAPI]
// Enqueue functions for clientAPI / federator worker pools,
// these are pointers to Processor{}.Enqueue___() msg functions.
// This prevents dependency cycling as Processor depends on Workers.
EnqueueClientAPI func(context.Context, ...messages.FromClientAPI)
EnqueueFediAPI func(context.Context, ...messages.FromFediAPI)
// Blocking processing functions for clientAPI / federator.
// These are pointers to Processor{}.Process___() msg functions.
// This prevents dependency cycling as Processor depends on Workers.
//
// Rather than queueing messages for asynchronous processing, these
// functions will process immediately and in a blocking manner, and
// will not use up a worker slot.
//
// As such, you should only call them in special cases where something
// synchronous needs to happen before you can do something else.
ProcessFromClientAPI func(context.Context, messages.FromClientAPI) error
ProcessFromFediAPI func(context.Context, messages.FromFediAPI) error
// Dereferencer worker pools.
Dereference FnWorkerPool
// Media manager worker pools.
Media runners.WorkerPool
Media FnWorkerPool
// prevent pass-by-value.
_ nocopy
@@ -90,16 +70,20 @@ func (w *Workers) Start() {
return w.Delivery.Start(n * maxprocs)
})
tryUntil("starting client API workerpool", 5, func() bool {
return w.ClientAPI.Start(4*maxprocs, 400*maxprocs)
tryUntil("starting client workerpool", 5, func() bool {
return w.Client.Start(4 * maxprocs)
})
tryUntil("starting federator workerpool", 5, func() bool {
return w.Federator.Start(4*maxprocs, 400*maxprocs)
return w.Federator.Start(4 * maxprocs)
})
tryUntil("starting dereference workerpool", 5, func() bool {
return w.Dereference.Start(4 * maxprocs)
})
tryUntil("starting media workerpool", 5, func() bool {
return w.Media.Start(8*maxprocs, 80*maxprocs)
return w.Media.Start(8 * maxprocs)
})
}
@@ -107,8 +91,9 @@ func (w *Workers) Start() {
func (w *Workers) Stop() {
tryUntil("stopping scheduler", 5, w.Scheduler.Stop)
tryUntil("stopping delivery workerpool", 5, w.Delivery.Stop)
tryUntil("stopping client API workerpool", 5, w.ClientAPI.Stop)
tryUntil("stopping client API workerpool", 5, w.Client.Stop)
tryUntil("stopping federator workerpool", 5, w.Federator.Stop)
tryUntil("stopping dereference workerpool", 5, w.Dereference.Stop)
tryUntil("stopping media workerpool", 5, w.Media.Stop)
}