finish up some code commenting, bodge a vendored activity library change, integrate the deliverypool changes into transportcontroller

This commit is contained in:
kim 2024-04-03 11:30:03 +01:00
parent aa01437a5b
commit 8aede54741
11 changed files with 201 additions and 146 deletions

View File

@ -24,6 +24,7 @@ import (
"net/http"
"os"
"os/signal"
"runtime"
"syscall"
"time"
@ -124,6 +125,11 @@ var Start action.GTSAction = func(ctx context.Context) error {
TLSInsecureSkipVerify: config.GetHTTPClientTLSInsecureSkipVerify(),
})
// Initialize the queues.
state.Queues.Init()
state.Workers.HTTPClient.Init(client, &state.Queues.HTTPRequest, runtime.GOMAXPROCS(0))
// Initialize workers.
state.Workers.Start()
defer state.Workers.Stop()

View File

@ -20,17 +20,12 @@ package cache
import (
"time"
"codeberg.org/gruf/go-cache/v3/ttl"
"github.com/superseriousbusiness/gotosocial/internal/cache/headerfilter"
"github.com/superseriousbusiness/gotosocial/internal/log"
)
type Caches struct {
// BadHosts provides access to the HTTP
// client bad (i.e. erroring) hosts cache.
BadHosts ttl.Cache[string, struct{}]
// GTS provides access to the collection of
// gtsmodel object caches. (used by the database).
GTS GTSCaches

View File

@ -32,12 +32,12 @@ import (
"time"
"codeberg.org/gruf/go-bytesize"
"codeberg.org/gruf/go-cache/v3"
errorsv2 "codeberg.org/gruf/go-errors/v2"
"codeberg.org/gruf/go-iotools"
"github.com/superseriousbusiness/gotosocial/internal/gtscontext"
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
"github.com/superseriousbusiness/gotosocial/internal/log"
"github.com/superseriousbusiness/gotosocial/internal/state"
)
var (
@ -105,9 +105,9 @@ type Config struct {
// - optional request signing
// - request logging
type Client struct {
state *state.State
client http.Client
bodyMax int64
client http.Client
badHosts cache.TTLCache[string, struct{}]
bodyMax int64
}
// New returns a new instance of Client initialized using configuration.
@ -175,6 +175,13 @@ func New(cfg Config) *Client {
DisableCompression: cfg.DisableCompression,
}}
// Initiate outgoing bad hosts lookup cache.
c.badHosts = cache.NewTTL[string, struct{}](0, 512, 0)
c.badHosts.SetTTL(time.Hour, false)
if !c.badHosts.Start(time.Minute) {
log.Panic(nil, "failed to start transport controller cache")
}
return &c
}
@ -197,11 +204,11 @@ func (c *Client) Do(r *http.Request) (rsp *http.Response, err error) {
// errors that are retried upon are server failure, TLS
// and domain resolution type errors, so this cached result
// indicates this server is likely having issues.
fastFail = c.state.Caches.BadHosts.Has(host)
fastFail = c.badHosts.Has(host)
defer func() {
if err != nil {
// On error return ensure marked as bad-host.
c.state.Caches.BadHosts.Set(host, struct{}{})
// On error mark as a bad-host.
c.badHosts.Set(host, struct{}{})
}
}()
}

View File

@ -27,20 +27,21 @@ import (
"github.com/superseriousbusiness/gotosocial/internal/queue"
)
// DeliveryWorkerPool ...
type DeliveryWorkerPool struct {
client *Client
queue *queue.StructQueue[queue.HTTPRequest]
queue *queue.StructQueue[*queue.HTTPRequest]
workers []DeliveryWorker
}
// Init ...
// Init will initialize the DeliveryWorker{} pool
// with given http client, request queue to pull
// from and number of delivery workers to spawn.
func (p *DeliveryWorkerPool) Init(
client *Client,
queue *queue.StructQueue[queue.HTTPRequest],
queue *queue.StructQueue[*queue.HTTPRequest],
workers int,
) {
p.client = client
p.queue = queue
p.workers = make([]DeliveryWorker, workers)
for i := range p.workers {
p.workers[i] = NewDeliveryWorker(
@ -50,7 +51,8 @@ func (p *DeliveryWorkerPool) Init(
}
}
// Start ...
// Start will attempt to start all of the contained DeliveryWorker{}s.
// NOTE: this is not safe to call concurrently with .Init().
func (p *DeliveryWorkerPool) Start() bool {
if len(p.workers) == 0 {
return false
@ -62,7 +64,8 @@ func (p *DeliveryWorkerPool) Start() bool {
return ok
}
// Stop ...
// Stop will attempt to stop all of the contained DeliveryWorker{}s.
// NOTE: this is not safe to call concurrently with .Init().
func (p *DeliveryWorkerPool) Stop() bool {
if len(p.workers) == 0 {
return false
@ -74,15 +77,16 @@ func (p *DeliveryWorkerPool) Stop() bool {
return ok
}
// DeliveryWorker ...
type DeliveryWorker struct {
client *Client
queue *queue.StructQueue[queue.HTTPRequest]
queue *queue.StructQueue[*queue.HTTPRequest]
backlog []*delivery
service runners.Service
}
// NewDeliveryWorker returns a new DeliveryWorker that feeds from queue, using given HTTP client.
func NewDeliveryWorker(client *Client, queue *queue.StructQueue[queue.HTTPRequest]) DeliveryWorker {
func NewDeliveryWorker(client *Client, queue *queue.StructQueue[*queue.HTTPRequest]) DeliveryWorker {
return DeliveryWorker{
client: client,
queue: queue,
@ -90,12 +94,12 @@ func NewDeliveryWorker(client *Client, queue *queue.StructQueue[queue.HTTPReques
}
}
// Start ...
// Start will attempt to start the DeliveryWorker{}.
func (w *DeliveryWorker) Start() bool {
return w.service.Run(w.process)
}
// Stop ...
// Stop will attempt to stop the DeliveryWorker{}.
func (w *DeliveryWorker) Stop() bool {
return w.service.Stop()
}
@ -237,7 +241,7 @@ func (d *delivery) BackOff() time.Duration {
}
// wrapMsg wraps a received queued HTTP request message in our delivery type.
func wrapMsg(ctx context.Context, msg queue.HTTPRequest) *delivery {
func wrapMsg(ctx context.Context, msg *queue.HTTPRequest) *delivery {
dlv := new(delivery)
dlv.request = wrapRequest(msg.Request)
dlv.log = requestLog(dlv.req)

View File

@ -1,3 +1,20 @@
// GoToSocial
// Copyright (C) GoToSocial Authors admin@gotosocial.org
// SPDX-License-Identifier: AGPL-3.0-or-later
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package httpclient
import "time"

View File

@ -62,7 +62,4 @@ type HTTPRequest struct {
// Request ...
Request *http.Request
// Signer ...
Signer func(*http.Request) error
}

View File

@ -19,118 +19,139 @@ package transport
import (
"context"
"encoding/json"
"net/http"
"net/url"
"sync"
"codeberg.org/gruf/go-byteutil"
apiutil "github.com/superseriousbusiness/gotosocial/internal/api/util"
"github.com/superseriousbusiness/gotosocial/internal/config"
"github.com/superseriousbusiness/gotosocial/internal/gtscontext"
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
"github.com/superseriousbusiness/gotosocial/internal/queue"
)
func (t *transport) BatchDeliver(ctx context.Context, b []byte, recipients []*url.URL) error {
func (t *transport) BatchDeliver(ctx context.Context, obj map[string]interface{}, recipients []*url.URL) error {
var (
// errs accumulates errors received during
// attempted delivery by deliverer routines.
// accumulated prepared reqs.
reqs []*queue.HTTPRequest
// accumulated preparation errs.
errs gtserror.MultiError
// wait blocks until all sender
// routines have returned.
wait sync.WaitGroup
// mutex protects 'recipients' and
// 'errs' for concurrent access.
mutex sync.Mutex
// Get current instance host info.
domain = config.GetAccountDomain()
host = config.GetHost()
)
// Block on expect no. senders.
wait.Add(t.controller.senders)
for i := 0; i < t.controller.senders; i++ {
go func() {
// Mark returned.
defer wait.Done()
for {
// Acquire lock.
mutex.Lock()
if len(recipients) == 0 {
// Reached end.
mutex.Unlock()
return
}
// Pop next recipient.
i := len(recipients) - 1
to := recipients[i]
recipients = recipients[:i]
// Done with lock.
mutex.Unlock()
// Skip delivery to recipient if it is "us".
if to.Host == host || to.Host == domain {
continue
}
// Attempt to deliver data to recipient.
if err := t.deliver(ctx, b, to); err != nil {
mutex.Lock() // safely append err to accumulator.
errs.Appendf("error delivering to %s: %w", to, err)
mutex.Unlock()
}
}
}()
// Marshal object as JSON.
b, err := json.Marshal(obj)
if err != nil {
return gtserror.Newf("error marshaling json: %w", err)
}
// Wait for finish.
wait.Wait()
// Extract object ID.
id := getObjectID(obj)
for _, to := range recipients {
// Skip delivery to recipient if it is "us".
if to.Host == host || to.Host == domain {
continue
}
// Prepare new http client request.
req, err := t.prepare(ctx, id, b, to)
if err != nil {
errs.Append(err)
continue
}
// Append to request queue.
reqs = append(reqs, req)
}
// Push the request list to HTTP client worker queue.
t.controller.state.Queues.HTTPRequest.Push(reqs...)
// Return combined err.
return errs.Combine()
}
func (t *transport) Deliver(ctx context.Context, b []byte, to *url.URL) error {
func (t *transport) Deliver(ctx context.Context, obj map[string]interface{}, to *url.URL) error {
// if 'to' host is our own, skip as we don't need to deliver to ourselves...
if to.Host == config.GetHost() || to.Host == config.GetAccountDomain() {
return nil
}
// Deliver data to recipient.
return t.deliver(ctx, b, to)
// Marshal object as JSON.
b, err := json.Marshal(obj)
if err != nil {
return gtserror.Newf("error marshaling json: %w", err)
}
// Extract object ID.
id := getObjectID(obj)
// Prepare new http client request.
req, err := t.prepare(ctx, id, b, to)
if err != nil {
return err
}
// Push the request to HTTP client worker queue.
t.controller.state.Queues.HTTPRequest.Push(req)
return nil
}
func (t *transport) deliver(ctx context.Context, b []byte, to *url.URL) error {
// prepare will prepare a POST http.Request{}
// to recipient at 'to', wrapping in a queued
// request object with signing function.
func (t *transport) prepare(
ctx context.Context,
objectID string,
data []byte,
to *url.URL,
) (
*queue.HTTPRequest,
error,
) {
url := to.String()
// Use rewindable bytes reader for body.
// Use rewindable reader for body.
var body byteutil.ReadNopCloser
body.Reset(b)
body.Reset(data)
// Prepare POST signer.
sign := t.signPOST(data)
// Update to-be-used request context with signing details.
ctx = gtscontext.SetOutgoingPublicKeyID(ctx, t.pubKeyID)
ctx = gtscontext.SetHTTPClientSignFunc(ctx, sign)
req, err := http.NewRequestWithContext(ctx, "POST", url, &body)
if err != nil {
return err
return nil, gtserror.Newf("error preparing request: %w", err)
}
req.Header.Add("Content-Type", string(apiutil.AppActivityLDJSON))
req.Header.Add("Accept-Charset", "utf-8")
rsp, err := t.POST(req, b)
if err != nil {
return err
}
defer rsp.Body.Close()
if code := rsp.StatusCode; code != http.StatusOK &&
code != http.StatusCreated && code != http.StatusAccepted {
return gtserror.NewFromResponse(rsp)
}
return nil
return &queue.HTTPRequest{
ObjectID: objectID,
Request: req,
}, nil
}
// getObjectID extracts an object ID from 'serialized' ActivityPub object map.
func getObjectID(obj map[string]interface{}) string {
switch t := obj["object"].(type) {
case string:
return t
case map[string]interface{}:
id, _ := t["id"].(string)
return id
default:
return ""
}
}

View File

@ -51,10 +51,10 @@ type Transport interface {
POST(*http.Request, []byte) (*http.Response, error)
// Deliver sends an ActivityStreams object.
Deliver(ctx context.Context, b []byte, to *url.URL) error
Deliver(ctx context.Context, obj map[string]interface{}, to *url.URL) error
// BatchDeliver sends an ActivityStreams object to multiple recipients.
BatchDeliver(ctx context.Context, b []byte, recipients []*url.URL) error
BatchDeliver(ctx context.Context, obj map[string]interface{}, recipients []*url.URL) error
/*
GET functions
@ -77,7 +77,8 @@ type Transport interface {
Finger(ctx context.Context, targetUsername string, targetDomain string) ([]byte, error)
}
// transport implements the Transport interface.
// transport implements
// the Transport interface.
type transport struct {
controller *controller
pubKeyID string

View File

@ -33,8 +33,8 @@ type Workers struct {
// Main task scheduler instance.
Scheduler scheduler.Scheduler
// Delivery ...
Delivery httpclient.DeliveryWorkerPool
// HTTPClient ...
HTTPClient httpclient.DeliveryWorkerPool
// ClientAPI provides a worker pool that handles both
// incoming client actions, and our own side-effects.
@ -78,7 +78,7 @@ func (w *Workers) Start() {
tryUntil("starting scheduler", 5, w.Scheduler.Start)
tryUntil("start http client workerpool", 5, w.Delivery.Start)
tryUntil("start http client workerpool", 5, w.HTTPClient.Start)
tryUntil("starting client API workerpool", 5, func() bool {
return w.ClientAPI.Start(4*maxprocs, 400*maxprocs)
@ -96,7 +96,7 @@ func (w *Workers) Start() {
// Stop will stop all of the contained worker pools (and global scheduler).
func (w *Workers) Stop() {
tryUntil("stopping scheduler", 5, w.Scheduler.Stop)
tryUntil("stopping http client workerpool", 5, w.Delivery.Stop)
tryUntil("stopping http client workerpool", 5, w.HTTPClient.Stop)
tryUntil("stopping client API workerpool", 5, w.ClientAPI.Stop)
tryUntil("stopping federator workerpool", 5, w.Federator.Stop)
tryUntil("stopping media workerpool", 5, w.Media.Stop)

View File

@ -2,7 +2,6 @@ package pub
import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
@ -477,17 +476,12 @@ func (a *SideEffectActor) deliverToRecipients(c context.Context, boxIRI *url.URL
return err
}
b, err := json.Marshal(m)
if err != nil {
return err
}
tp, err := a.common.NewTransport(c, boxIRI, goFedUserAgent())
if err != nil {
return err
}
return tp.BatchDeliver(c, b, recipients)
return tp.BatchDeliver(c, m, recipients)
}
// addToOutbox adds the activity to the outbox and creates the activity in the

View File

@ -4,6 +4,7 @@ import (
"bytes"
"context"
"crypto"
"encoding/json"
"fmt"
"net/http"
"net/url"
@ -44,10 +45,10 @@ type Transport interface {
Dereference(c context.Context, iri *url.URL) (*http.Response, error)
// Deliver sends an ActivityStreams object.
Deliver(c context.Context, b []byte, to *url.URL) error
Deliver(c context.Context, obj map[string]interface{}, to *url.URL) error
// BatchDeliver sends an ActivityStreams object to multiple recipients.
BatchDeliver(c context.Context, b []byte, recipients []*url.URL) error
BatchDeliver(c context.Context, obj map[string]interface{}, recipients []*url.URL) error
}
// Transport must be implemented by HttpSigTransport.
@ -138,7 +139,49 @@ func (h HttpSigTransport) Dereference(c context.Context, iri *url.URL) (*http.Re
}
// Deliver sends a POST request with an HTTP Signature.
func (h HttpSigTransport) Deliver(c context.Context, b []byte, to *url.URL) error {
func (h HttpSigTransport) Deliver(c context.Context, data map[string]interface{}, to *url.URL) error {
b, err := json.Marshal(data)
if err != nil {
return err
}
return h.deliver(c, b, to)
}
// BatchDeliver sends concurrent POST requests. Returns an error if any of the requests had an error.
func (h HttpSigTransport) BatchDeliver(c context.Context, data map[string]interface{}, recipients []*url.URL) error {
b, err := json.Marshal(data)
if err != nil {
return err
}
var wg sync.WaitGroup
errCh := make(chan error, len(recipients))
for _, recipient := range recipients {
wg.Add(1)
go func(r *url.URL) {
defer wg.Done()
if err := h.deliver(c, b, r); err != nil {
errCh <- err
}
}(recipient)
}
wg.Wait()
errs := make([]string, 0, len(recipients))
outer:
for {
select {
case e := <-errCh:
errs = append(errs, e.Error())
default:
break outer
}
}
if len(errs) > 0 {
return fmt.Errorf("batch deliver had at least one failure: %s", strings.Join(errs, "; "))
}
return nil
}
func (h HttpSigTransport) deliver(c context.Context, b []byte, to *url.URL) error {
req, err := http.NewRequest("POST", to.String(), bytes.NewReader(b))
if err != nil {
return err
@ -166,36 +209,6 @@ func (h HttpSigTransport) Deliver(c context.Context, b []byte, to *url.URL) erro
return nil
}
// BatchDeliver sends concurrent POST requests. Returns an error if any of the requests had an error.
func (h HttpSigTransport) BatchDeliver(c context.Context, b []byte, recipients []*url.URL) error {
var wg sync.WaitGroup
errCh := make(chan error, len(recipients))
for _, recipient := range recipients {
wg.Add(1)
go func(r *url.URL) {
defer wg.Done()
if err := h.Deliver(c, b, r); err != nil {
errCh <- err
}
}(recipient)
}
wg.Wait()
errs := make([]string, 0, len(recipients))
outer:
for {
select {
case e := <-errCh:
errs = append(errs, e.Error())
default:
break outer
}
}
if len(errs) > 0 {
return fmt.Errorf("batch deliver had at least one failure: %s", strings.Join(errs, "; "))
}
return nil
}
// HttpClient sends http requests, and is an abstraction only needed by the
// HttpSigTransport. The standard library's Client satisfies this interface.
type HttpClient interface {