From f94c201e9420a96ab80a5633d383e6d04fc50d85 Mon Sep 17 00:00:00 2001 From: kim Date: Wed, 24 Apr 2024 13:39:10 +0100 Subject: [PATCH] start the job scheduler separately to the worker pools --- cmd/gotosocial/action/server/server.go | 13 ++++++++++++- internal/workers/workers.go | 10 ++++++---- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/cmd/gotosocial/action/server/server.go b/cmd/gotosocial/action/server/server.go index b60a0cff4..e2d6ec3ea 100644 --- a/cmd/gotosocial/action/server/server.go +++ b/cmd/gotosocial/action/server/server.go @@ -198,6 +198,10 @@ var Start action.GTSAction = func(ctx context.Context) error { return fmt.Errorf("error starting list timeline: %s", err) } + // Start the job scheduler + // (this is required for cleaner). + state.Workers.StartScheduler() + // Create a media cleaner // using the given state. cleaner := cleaner.New(&state) @@ -214,10 +218,17 @@ var Start action.GTSAction = func(ctx context.Context) error { emailSender, ) - // Set state client / federator synchronous processing functions. + // Initialize the specialized workers. + state.Workers.Client.Init(messages.ClientMsgIndices()) + state.Workers.Federator.Init(messages.FederatorMsgIndices()) + state.Workers.Delivery.Init(client) state.Workers.Client.Process = processor.Workers().ProcessFromClientAPI state.Workers.Federator.Process = processor.Workers().ProcessFromFediAPI + // Initialize workers. + state.Workers.Start() + defer state.Workers.Stop() + // Schedule tasks for all existing poll expiries. if err := processor.Polls().ScheduleAll(ctx); err != nil { return fmt.Errorf("error scheduling poll expiries: %w", err) diff --git a/internal/workers/workers.go b/internal/workers/workers.go index 54dfcabe9..c9e2db9a1 100644 --- a/internal/workers/workers.go +++ b/internal/workers/workers.go @@ -57,14 +57,16 @@ type Workers struct { _ nocopy } -// Start will start all of the contained -// worker pools (and global scheduler). +// StartScheduler starts the job scheduler. +func (w *Workers) StartScheduler() { + tryUntil("starting scheduler", 5, w.Scheduler.Start) +} + +// Start will start contained worker pools. func (w *Workers) Start() { // Get currently set GOMAXPROCS. maxprocs := runtime.GOMAXPROCS(0) - tryUntil("starting scheduler", 5, w.Scheduler.Start) - tryUntil("start delivery workerpool", 5, func() bool { n := config.GetAdvancedSenderMultiplier() if n < 1 {