[performance] improved request batching (removes need for queueing) (#1687)

* revamp http client to not limit requests, instead use sender worker

Signed-off-by: kim <grufwub@gmail.com>

* remove separate sender worker pool, spawn 2*GOMAXPROCS batch senders each time, no need for transport cache sweeping

Signed-off-by: kim <grufwub@gmail.com>

* improve batch senders to keep popping recipients until remote URL found

Signed-off-by: kim <grufwub@gmail.com>

* fix recipient looping issue

Signed-off-by: kim <grufwub@gmail.com>

* fix missing mutex unlock

Signed-off-by: kim <grufwub@gmail.com>

* move request id ctx key to gtscontext, finish filling out more code comments, add basic support for not logging client IP

Signed-off-by: kim <grufwub@gmail.com>

* slight code reformatting

Signed-off-by: kim <grufwub@gmail.com>

* a whitespace

Signed-off-by: kim <grufwub@gmail.com>

* remove unused code

Signed-off-by: kim <grufwub@gmail.com>

* add missing license headers

Signed-off-by: kim <grufwub@gmail.com>

* fix request backoff calculation

Signed-off-by: kim <grufwub@gmail.com>

---------

Signed-off-by: kim <grufwub@gmail.com>
This commit is contained in:
kim
2023-04-28 16:45:21 +01:00
committed by GitHub
parent 6b4f6dc755
commit 6a29c5ffd4
24 changed files with 431 additions and 493 deletions

View File

@@ -18,31 +18,39 @@
package httpclient
import (
"context"
"crypto/x509"
"errors"
"fmt"
"io"
"net"
"net/http"
"net/netip"
"runtime"
"strconv"
"strings"
"time"
"codeberg.org/gruf/go-bytesize"
"codeberg.org/gruf/go-byteutil"
"codeberg.org/gruf/go-cache/v3"
errorsv2 "codeberg.org/gruf/go-errors/v2"
"codeberg.org/gruf/go-kv"
"github.com/cornelk/hashmap"
"github.com/superseriousbusiness/gotosocial/internal/gtscontext"
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
"github.com/superseriousbusiness/gotosocial/internal/log"
)
// ErrInvalidRequest is returned if a given HTTP request is invalid and cannot be performed.
var ErrInvalidRequest = errors.New("invalid http request")
var (
// ErrInvalidNetwork is returned if the request would not be performed over TCP
ErrInvalidNetwork = errors.New("invalid network type")
// ErrInvalidNetwork is returned if the request would not be performed over TCP
var ErrInvalidNetwork = errors.New("invalid network type")
// ErrReservedAddr is returned if a dialed address resolves to an IP within a blocked or reserved net.
ErrReservedAddr = errors.New("dial within blocked / reserved IP range")
// ErrReservedAddr is returned if a dialed address resolves to an IP within a blocked or reserved net.
var ErrReservedAddr = errors.New("dial within blocked / reserved IP range")
// ErrBodyTooLarge is returned when a received response body is above predefined limit (default 40MB).
var ErrBodyTooLarge = errors.New("body size too large")
// ErrBodyTooLarge is returned when a received response body is above predefined limit (default 40MB).
ErrBodyTooLarge = errors.New("body size too large")
)
// Config provides configuration details for setting up a new
// instance of httpclient.Client{}. Within are a subset of the
@@ -83,13 +91,10 @@ type Config struct {
// cases to protect against forged / unknown content-lengths
// - protection from server side request forgery (SSRF) by only dialing
// out to known public IP prefixes, configurable with allows/blocks
// - limit number of concurrent requests, else blocking until a slot
// is available (context channels still respected)
type Client struct {
client http.Client
queue *hashmap.Map[string, chan struct{}]
bmax int64 // max response body size
cmax int // max open conns per host
client http.Client
badHosts cache.Cache[string, struct{}]
bodyMax int64
}
// New returns a new instance of Client initialized using configuration.
@@ -109,28 +114,26 @@ func New(cfg Config) *Client {
}
if cfg.MaxIdleConns <= 0 {
// By default base this value on MaxOpenConns
// By default base this value on MaxOpenConns.
cfg.MaxIdleConns = cfg.MaxOpenConnsPerHost * 10
}
if cfg.MaxBodySize <= 0 {
// By default set this to a reasonable 40MB
// By default set this to a reasonable 40MB.
cfg.MaxBodySize = int64(40 * bytesize.MiB)
}
// Protect dialer with IP range sanitizer
// Protect dialer with IP range sanitizer.
d.Control = (&sanitizer{
allow: cfg.AllowRanges,
block: cfg.BlockRanges,
}).Sanitize
// Prepare client fields
// Prepare client fields.
c.client.Timeout = cfg.Timeout
c.cmax = cfg.MaxOpenConnsPerHost
c.bmax = cfg.MaxBodySize
c.queue = hashmap.New[string, chan struct{}]()
c.bodyMax = cfg.MaxBodySize
// Set underlying HTTP client roundtripper
// Set underlying HTTP client roundtripper.
c.client.Transport = &http.Transport{
Proxy: http.ProxyFromEnvironment,
ForceAttemptHTTP2: true,
@@ -144,90 +147,185 @@ func New(cfg Config) *Client {
DisableCompression: cfg.DisableCompression,
}
// Initiate outgoing bad hosts lookup cache.
c.badHosts = cache.New[string, struct{}](0, 1000, 0)
c.badHosts.SetTTL(15*time.Minute, false)
if !c.badHosts.Start(time.Minute) {
log.Panic(nil, "failed to start transport controller cache")
}
return &c
}
// Do will perform given request when an available slot in the queue is available,
// and block until this time. For returned values, this follows the same semantics
// as the standard http.Client{}.Do() implementation except that response body will
// be wrapped by an io.LimitReader() to limit response body sizes.
func (c *Client) Do(req *http.Request) (*http.Response, error) {
// Ensure this is a valid request
if err := ValidateRequest(req); err != nil {
return nil, err
// Do ...
func (c *Client) Do(r *http.Request) (*http.Response, error) {
return c.DoSigned(r, func(r *http.Request) error {
return nil // no request signing
})
}
// DoSigned ...
func (c *Client) DoSigned(r *http.Request, sign SignFunc) (*http.Response, error) {
const (
// max no. attempts.
maxRetries = 5
// starting backoff duration.
baseBackoff = 2 * time.Second
)
// Get request hostname.
host := r.URL.Hostname()
// Check whether request should fast fail.
fastFail := gtscontext.IsFastfail(r.Context())
if !fastFail {
// Check if recently reached max retries for this host
// so we don't bother with a retry-backoff loop. The only
// errors that are retried upon are server failure and
// domain resolution type errors, so this cached result
// indicates this server is likely having issues.
fastFail = c.badHosts.Has(host)
}
// Get host's wait queue
wait := c.wait(req.Host)
// Start a log entry for this request
l := log.WithContext(r.Context()).
WithFields(kv.Fields{
{"method", r.Method},
{"url", r.URL.String()},
}...)
var ok bool
for i := 0; i < maxRetries; i++ {
var backoff time.Duration
select {
// Quickly try grab a spot
case wait <- struct{}{}:
// it's our turn!
ok = true
// Reset signing header fields
now := time.Now().UTC()
r.Header.Set("Date", now.Format("Mon, 02 Jan 2006 15:04:05")+" GMT")
r.Header.Del("Signature")
r.Header.Del("Digest")
// NOTE:
// Ideally here we would set the slot release to happen either
// on error return, or via callback from the response body closer.
// However when implementing this, there appear deadlocks between
// the channel queue here and the media manager worker pool. So
// currently we only place a limit on connections dialing out, but
// there may still be more connections open than len(c.queue) given
// that connections may not be closed until response body is closed.
// The current implementation will reduce the viability of denial of
// service attacks, but if there are future issues heed this advice :]
defer func() { <-wait }()
default:
}
// Rewind body reader and content-length if set.
if rc, ok := r.Body.(*byteutil.ReadNopCloser); ok {
r.ContentLength = int64(rc.Len())
rc.Rewind()
}
if !ok {
// No spot acquired, log warning
log.WithContext(req.Context()).
WithFields(kv.Fields{
{K: "queue", V: len(wait)},
{K: "method", V: req.Method},
{K: "host", V: req.Host},
{K: "uri", V: req.URL.RequestURI()},
}...).Warn("full request queue")
// Sign the outgoing request.
if err := sign(r); err != nil {
return nil, err
}
l.Infof("performing request")
// Perform the request.
rsp, err := c.do(r)
if err == nil { //nolint:gocritic
// TooManyRequest means we need to slow
// down and retry our request. Codes over
// 500 generally indicate temp. outages.
if code := rsp.StatusCode; code < 500 &&
code != http.StatusTooManyRequests {
return rsp, nil
}
// Generate error from status code for logging
err = errors.New(`http response "` + rsp.Status + `"`)
// Search for a provided "Retry-After" header value.
if after := rsp.Header.Get("Retry-After"); after != "" {
if u, _ := strconv.ParseUint(after, 10, 32); u != 0 {
// An integer number of backoff seconds was provided.
backoff = time.Duration(u) * time.Second
} else if at, _ := http.ParseTime(after); !at.Before(now) {
// An HTTP formatted future date-time was provided.
backoff = at.Sub(now)
}
// Don't let their provided backoff exceed our max.
if max := baseBackoff * maxRetries; backoff > max {
backoff = max
}
}
} else if errorsv2.Is(err,
context.DeadlineExceeded,
context.Canceled,
ErrBodyTooLarge,
ErrReservedAddr,
) {
// Return on non-retryable errors
return nil, err
} else if strings.Contains(err.Error(), "stopped after 10 redirects") {
// Don't bother if net/http returned after too many redirects
return nil, err
} else if errors.As(err, &x509.UnknownAuthorityError{}) {
// Unknown authority errors we do NOT recover from
return nil, err
} else if dnserr := (*net.DNSError)(nil); // nocollapse
errors.As(err, &dnserr) && dnserr.IsNotFound {
// DNS lookup failure, this domain does not exist
return nil, gtserror.SetNotFound(err)
}
if fastFail {
// on fast-fail, don't bother backoff/retry
return nil, fmt.Errorf("%w (fast fail)", err)
}
if backoff == 0 {
// No retry-after found, set our predefined
// backoff according to a multiplier of 2^n.
backoff = baseBackoff * 1 << (i + 1)
}
l.Errorf("backing off for %s after http request error: %v", backoff, err)
select {
case <-req.Context().Done():
// the request was canceled before we
// got to our turn: no need to release
return nil, req.Context().Err()
case wait <- struct{}{}:
defer func() { <-wait }()
// Request ctx cancelled
case <-r.Context().Done():
return nil, r.Context().Err()
// Backoff for some time
case <-time.After(backoff):
}
}
// Perform the HTTP request
// Add "bad" entry for this host.
c.badHosts.Set(host, struct{}{})
return nil, errors.New("transport reached max retries")
}
// do ...
func (c *Client) do(req *http.Request) (*http.Response, error) {
// Perform the HTTP request.
rsp, err := c.client.Do(req)
if err != nil {
return nil, err
}
// Check response body not too large
if rsp.ContentLength > c.bmax {
// Check response body not too large.
if rsp.ContentLength > c.bodyMax {
return nil, ErrBodyTooLarge
}
// Seperate the body implementers
// Seperate the body implementers.
rbody := (io.Reader)(rsp.Body)
cbody := (io.Closer)(rsp.Body)
var limit int64
if limit = rsp.ContentLength; limit < 0 {
// If unknown, use max as reader limit
limit = c.bmax
// If unknown, use max as reader limit.
limit = c.bodyMax
}
// Don't trust them, limit body reads
// Don't trust them, limit body reads.
rbody = io.LimitReader(rbody, limit)
// Wrap body with limit
// Wrap body with limit.
rsp.Body = &struct {
io.Reader
io.Closer
@@ -235,17 +333,3 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
return rsp, nil
}
// wait acquires the 'wait' queue for the given host string, or allocates new.
func (c *Client) wait(host string) chan struct{} {
// Look for an existing queue
queue, ok := c.queue.Get(host)
if ok {
return queue
}
// Allocate a new host queue (or return a sneaky existing one).
queue, _ = c.queue.GetOrInsert(host, make(chan struct{}, c.cmax))
return queue
}