[feature] List replies policy, refactor async workers (#2087)

* Add/update some DB functions.

* move async workers into subprocessor

* rename FromFederator -> FromFediAPI

* update home timeline check to include check for current status first before moving to parent status

* change streamMap to pointer to mollify linter

* update followtoas func signature

* fix merge

* remove errant debug log

* don't use separate errs.Combine() check to wrap errs

* wrap parts of workers functionality in sub-structs

* populate report using new db funcs

* embed federator (tiny bit tidier)

* flesh out error msg, add continue(!)

* fix other error messages to be more specific

* better, nicer

* give parseURI util function a bit more util

* missing headers

* use pointers for subprocessors
This commit is contained in:
tobi
2023-08-09 19:14:33 +02:00
committed by GitHub
parent dbf487effb
commit 9770d54237
49 changed files with 4110 additions and 2660 deletions

View File

@ -18,13 +18,9 @@
package processing
import (
"context"
"github.com/superseriousbusiness/gotosocial/internal/email"
"github.com/superseriousbusiness/gotosocial/internal/federation"
"github.com/superseriousbusiness/gotosocial/internal/log"
mm "github.com/superseriousbusiness/gotosocial/internal/media"
"github.com/superseriousbusiness/gotosocial/internal/messages"
"github.com/superseriousbusiness/gotosocial/internal/oauth"
"github.com/superseriousbusiness/gotosocial/internal/processing/account"
"github.com/superseriousbusiness/gotosocial/internal/processing/admin"
@ -38,19 +34,23 @@ import (
"github.com/superseriousbusiness/gotosocial/internal/processing/stream"
"github.com/superseriousbusiness/gotosocial/internal/processing/timeline"
"github.com/superseriousbusiness/gotosocial/internal/processing/user"
"github.com/superseriousbusiness/gotosocial/internal/processing/workers"
"github.com/superseriousbusiness/gotosocial/internal/state"
"github.com/superseriousbusiness/gotosocial/internal/typeutils"
"github.com/superseriousbusiness/gotosocial/internal/visibility"
)
// Processor groups together processing functions and
// sub processors for handling actions + events coming
// from either the client or federating APIs.
//
// Many of the functions available through this struct
// or sub processors will trigger asynchronous processing
// via the workers contained in state.
type Processor struct {
federator federation.Federator
tc typeutils.TypeConverter
oauthServer oauth.Server
mediaManager *mm.Manager
state *state.State
emailSender email.Sender
filter *visibility.Filter
tc typeutils.TypeConverter
oauthServer oauth.Server
state *state.State
/*
SUB-PROCESSORS
@ -68,6 +68,7 @@ type Processor struct {
stream stream.Processor
timeline timeline.Processor
user user.Processor
workers workers.Processor
}
func (p *Processor) Account() *account.Processor {
@ -118,6 +119,10 @@ func (p *Processor) User() *user.Processor {
return &p.user
}
func (p *Processor) Workers() *workers.Processor {
return &p.workers
}
// NewProcessor returns a new Processor.
func NewProcessor(
tc typeutils.TypeConverter,
@ -127,57 +132,53 @@ func NewProcessor(
state *state.State,
emailSender email.Sender,
) *Processor {
parseMentionFunc := GetParseMentionFunc(state.DB, federator)
filter := visibility.NewFilter(state)
var (
parseMentionFunc = GetParseMentionFunc(state.DB, federator)
filter = visibility.NewFilter(state)
)
processor := &Processor{
federator: federator,
tc: tc,
oauthServer: oauthServer,
mediaManager: mediaManager,
state: state,
filter: filter,
emailSender: emailSender,
tc: tc,
oauthServer: oauthServer,
state: state,
}
// Instantiate sub processors.
processor.account = account.New(state, tc, mediaManager, oauthServer, federator, filter, parseMentionFunc)
//
// Start with sub processors that will
// be required by the workers processor.
accountProcessor := account.New(state, tc, mediaManager, oauthServer, federator, filter, parseMentionFunc)
mediaProcessor := media.New(state, tc, mediaManager, federator.TransportController())
streamProcessor := stream.New(state, oauthServer)
// Instantiate the rest of the sub
// processors + pin them to this struct.
processor.account = accountProcessor
processor.admin = admin.New(state, tc, mediaManager, federator.TransportController(), emailSender)
processor.fedi = fedi.New(state, tc, federator, filter)
processor.list = list.New(state, tc)
processor.markers = markers.New(state, tc)
processor.media = media.New(state, tc, mediaManager, federator.TransportController())
processor.media = mediaProcessor
processor.report = report.New(state, tc)
processor.timeline = timeline.New(state, tc, filter)
processor.search = search.New(state, federator, tc, filter)
processor.status = status.New(state, federator, tc, filter, parseMentionFunc)
processor.stream = stream.New(state, oauthServer)
processor.stream = streamProcessor
processor.user = user.New(state, emailSender)
// Workers processor handles asynchronous
// worker jobs; instantiate it separately
// and pass subset of sub processors it needs.
processor.workers = workers.New(
state,
federator,
tc,
filter,
emailSender,
&accountProcessor,
&mediaProcessor,
&streamProcessor,
)
return processor
}
func (p *Processor) EnqueueClientAPI(ctx context.Context, msgs ...messages.FromClientAPI) {
log.Trace(ctx, "enqueuing")
_ = p.state.Workers.ClientAPI.MustEnqueueCtx(ctx, func(ctx context.Context) {
for _, msg := range msgs {
log.Trace(ctx, "processing: %+v", msg)
if err := p.ProcessFromClientAPI(ctx, msg); err != nil {
log.Errorf(ctx, "error processing client API message: %v", err)
}
}
})
}
func (p *Processor) EnqueueFederator(ctx context.Context, msgs ...messages.FromFederator) {
log.Trace(ctx, "enqueuing")
_ = p.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) {
for _, msg := range msgs {
log.Trace(ctx, "processing: %+v", msg)
if err := p.ProcessFromFederator(ctx, msg); err != nil {
log.Errorf(ctx, "error processing federator message: %v", err)
}
}
})
}