replace async client API / federator msg processing with worker pools (#497)

* replace async client API / federator msg processing with worker pools
* appease our lord-and-saviour, the linter
This commit is contained in:
kim
2022-04-28 13:23:11 +01:00
committed by GitHub
parent cc5f2e98b7
commit 420e2fb22b
64 changed files with 573 additions and 336 deletions

View File

@ -24,7 +24,6 @@ import (
"net/url"
"codeberg.org/gruf/go-store/kv"
"github.com/sirupsen/logrus"
apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model"
"github.com/superseriousbusiness/gotosocial/internal/db"
"github.com/superseriousbusiness/gotosocial/internal/email"
@ -45,6 +44,7 @@ import (
"github.com/superseriousbusiness/gotosocial/internal/timeline"
"github.com/superseriousbusiness/gotosocial/internal/typeutils"
"github.com/superseriousbusiness/gotosocial/internal/visibility"
"github.com/superseriousbusiness/gotosocial/internal/worker"
)
// Processor should be passed to api modules (see internal/apimodule/...). It is used for
@ -55,7 +55,7 @@ import (
// for clean distribution of messages without slowing down the client API and harming the user experience.
type Processor interface {
// Start starts the Processor, reading from its channels and passing messages back and forth.
Start(ctx context.Context) error
Start() error
// Stop stops the processor cleanly, finishing handling any remaining messages before closing down.
Stop() error
// ProcessFromClientAPI processes one message coming from the clientAPI channel, and triggers appropriate side effects.
@ -235,10 +235,10 @@ type Processor interface {
// processor just implements the Processor interface
type processor struct {
fromClientAPI chan messages.FromClientAPI
fromFederator chan messages.FromFederator
clientWorker *worker.Worker[messages.FromClientAPI]
fedWorker *worker.Worker[messages.FromFederator]
federator federation.Federator
stop chan interface{}
tc typeutils.TypeConverter
oauthServer oauth.Server
mediaManager media.Manager
@ -268,26 +268,26 @@ func NewProcessor(
mediaManager media.Manager,
storage *kv.KVStore,
db db.DB,
emailSender email.Sender) Processor {
fromClientAPI := make(chan messages.FromClientAPI, 1000)
fromFederator := make(chan messages.FromFederator, 1000)
emailSender email.Sender,
clientWorker *worker.Worker[messages.FromClientAPI],
fedWorker *worker.Worker[messages.FromFederator],
) Processor {
parseMentionFunc := GetParseMentionFunc(db, federator)
statusProcessor := status.New(db, tc, fromClientAPI, parseMentionFunc)
statusProcessor := status.New(db, tc, clientWorker, parseMentionFunc)
streamingProcessor := streaming.New(db, oauthServer)
accountProcessor := account.New(db, tc, mediaManager, oauthServer, fromClientAPI, federator, parseMentionFunc)
adminProcessor := admin.New(db, tc, mediaManager, fromClientAPI)
accountProcessor := account.New(db, tc, mediaManager, oauthServer, clientWorker, federator, parseMentionFunc)
adminProcessor := admin.New(db, tc, mediaManager, clientWorker)
mediaProcessor := mediaProcessor.New(db, tc, mediaManager, federator.TransportController(), storage)
userProcessor := user.New(db, emailSender)
federationProcessor := federationProcessor.New(db, tc, federator, fromFederator)
federationProcessor := federationProcessor.New(db, tc, federator)
filter := visibility.NewFilter(db)
return &processor{
fromClientAPI: fromClientAPI,
fromFederator: fromFederator,
clientWorker: clientWorker,
fedWorker: fedWorker,
federator: federator,
stop: make(chan interface{}),
tc: tc,
oauthServer: oauthServer,
mediaManager: mediaManager,
@ -307,36 +307,29 @@ func NewProcessor(
}
// Start starts the Processor, reading from its channels and passing messages back and forth.
func (p *processor) Start(ctx context.Context) error {
go func() {
DistLoop:
for {
select {
case clientMsg := <-p.fromClientAPI:
logrus.Tracef("received message FROM client API: %+v", clientMsg)
go func() {
if err := p.ProcessFromClientAPI(ctx, clientMsg); err != nil {
logrus.Error(err)
}
}()
case federatorMsg := <-p.fromFederator:
logrus.Tracef("received message FROM federator: %+v", federatorMsg)
go func() {
if err := p.ProcessFromFederator(ctx, federatorMsg); err != nil {
logrus.Error(err)
}
}()
case <-p.stop:
break DistLoop
}
}
}()
func (p *processor) Start() error {
// Setup and start the client API worker pool
p.clientWorker.SetProcessor(p.ProcessFromClientAPI)
if err := p.clientWorker.Start(); err != nil {
return err
}
// Setup and start the federator worker pool
p.fedWorker.SetProcessor(p.ProcessFromFederator)
if err := p.fedWorker.Start(); err != nil {
return err
}
return nil
}
// Stop stops the processor cleanly, finishing handling any remaining messages before closing down.
// TODO: empty message buffer properly before stopping otherwise we'll lose federating messages.
func (p *processor) Stop() error {
close(p.stop)
if err := p.clientWorker.Stop(); err != nil {
return err
}
if err := p.fedWorker.Stop(); err != nil {
return err
}
return nil
}