[feature] various worker / request queue improvements (#995)

* greatly simplify httpclient request queuing

Signed-off-by: kim <grufwub@gmail.com>

* improved request queue mutex logic

Signed-off-by: kim <grufwub@gmail.com>

* use improved hashmap library

Signed-off-by: kim <grufwub@gmail.com>

* add warn logging when request queues are full

Signed-off-by: kim <grufwub@gmail.com>

* improve worker pool prefix var naming

Signed-off-by: kim <grufwub@gmail.com>

* improved worker pool error logging

Signed-off-by: kim <grufwub@gmail.com>

* move error message into separate field

Signed-off-by: kim <grufwub@gmail.com>

* remove old log statement

Signed-off-by: kim <grufwub@gmail.com>

* don't export worker message, it gets very spammy :')

Signed-off-by: kim <grufwub@gmail.com>

Signed-off-by: kim <grufwub@gmail.com>
This commit is contained in:
kim 2022-11-08 09:35:24 +00:00 committed by GitHub
parent 7c0bbd3f6a
commit 0e57246083
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 88 additions and 208 deletions

2
go.mod
View File

@ -15,6 +15,7 @@ require (
codeberg.org/gruf/go-store/v2 v2.0.7 codeberg.org/gruf/go-store/v2 v2.0.7
github.com/buckket/go-blurhash v1.1.0 github.com/buckket/go-blurhash v1.1.0
github.com/coreos/go-oidc/v3 v3.4.0 github.com/coreos/go-oidc/v3 v3.4.0
github.com/cornelk/hashmap v1.0.8
github.com/disintegration/imaging v1.6.2 github.com/disintegration/imaging v1.6.2
github.com/gin-contrib/cors v1.4.0 github.com/gin-contrib/cors v1.4.0
github.com/gin-contrib/gzip v0.0.6 github.com/gin-contrib/gzip v0.0.6
@ -67,7 +68,6 @@ require (
codeberg.org/gruf/go-pools v1.1.0 // indirect codeberg.org/gruf/go-pools v1.1.0 // indirect
codeberg.org/gruf/go-sched v1.1.1 // indirect codeberg.org/gruf/go-sched v1.1.1 // indirect
github.com/aymerick/douceur v0.2.0 // indirect github.com/aymerick/douceur v0.2.0 // indirect
github.com/cornelk/hashmap v1.0.8 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dsoprea/go-exif/v3 v3.0.0-20210625224831-a6301f85c82b // indirect github.com/dsoprea/go-exif/v3 v3.0.0-20210625224831-a6301f85c82b // indirect
github.com/dsoprea/go-iptc v0.0.0-20200610044640-bc9ca208b413 // indirect github.com/dsoprea/go-iptc v0.0.0-20200610044640-bc9ca208b413 // indirect

View File

@ -26,6 +26,7 @@ import (
"reflect" "reflect"
"runtime" "runtime"
"codeberg.org/gruf/go-kv"
"codeberg.org/gruf/go-runners" "codeberg.org/gruf/go-runners"
"github.com/superseriousbusiness/gotosocial/internal/log" "github.com/superseriousbusiness/gotosocial/internal/log"
) )
@ -35,7 +36,7 @@ type WorkerPool[MsgType any] struct {
workers runners.WorkerPool workers runners.WorkerPool
process func(context.Context, MsgType) error process func(context.Context, MsgType) error
nw, nq int nw, nq int
prefix string // contains type prefix for logging wtype string // contains worker type for logging
} }
// New returns a new WorkerPool[MsgType] with given number of workers and queue ratio, // New returns a new WorkerPool[MsgType] with given number of workers and queue ratio,
@ -61,12 +62,12 @@ func NewWorkerPool[MsgType any](workers int, queueRatio int) *WorkerPool[MsgType
process: nil, process: nil,
nw: workers, nw: workers,
nq: workers * queueRatio, nq: workers * queueRatio,
prefix: fmt.Sprintf("worker.Worker[%s]", msgType), wtype: fmt.Sprintf("worker.Worker[%s]", msgType),
} }
// Log new worker creation with type prefix // Log new worker creation with worker type prefix
log.Infof("%s created with workers=%d queue=%d", log.Infof("%s created with workers=%d queue=%d",
w.prefix, w.wtype,
workers, workers,
workers*queueRatio, workers*queueRatio,
) )
@ -76,7 +77,7 @@ func NewWorkerPool[MsgType any](workers int, queueRatio int) *WorkerPool[MsgType
// Start will attempt to start the underlying worker pool, or return error. // Start will attempt to start the underlying worker pool, or return error.
func (w *WorkerPool[MsgType]) Start() error { func (w *WorkerPool[MsgType]) Start() error {
log.Infof("%s starting", w.prefix) log.Infof("%s starting", w.wtype)
// Check processor was set // Check processor was set
if w.process == nil { if w.process == nil {
@ -93,7 +94,7 @@ func (w *WorkerPool[MsgType]) Start() error {
// Stop will attempt to stop the underlying worker pool, or return error. // Stop will attempt to stop the underlying worker pool, or return error.
func (w *WorkerPool[MsgType]) Stop() error { func (w *WorkerPool[MsgType]) Stop() error {
log.Infof("%s stopping", w.prefix) log.Infof("%s stopping", w.wtype)
// Attempt to stop pool // Attempt to stop pool
if !w.workers.Stop() { if !w.workers.Stop() {
@ -106,19 +107,34 @@ func (w *WorkerPool[MsgType]) Stop() error {
// SetProcessor will set the Worker's processor function, which is called for each queued message. // SetProcessor will set the Worker's processor function, which is called for each queued message.
func (w *WorkerPool[MsgType]) SetProcessor(fn func(context.Context, MsgType) error) { func (w *WorkerPool[MsgType]) SetProcessor(fn func(context.Context, MsgType) error) {
if w.process != nil { if w.process != nil {
log.Fatalf("%s Worker.process is already set", w.prefix) log.Panicf("%s Worker.process is already set", w.wtype)
} }
w.process = fn w.process = fn
} }
// Queue will queue provided message to be processed with there's a free worker. // Queue will queue provided message to be processed with there's a free worker.
func (w *WorkerPool[MsgType]) Queue(msg MsgType) { func (w *WorkerPool[MsgType]) Queue(msg MsgType) {
log.Tracef("%s queueing message (queue=%d): %+v", log.Tracef("%s queueing message: %+v", w.wtype, msg)
w.prefix, w.workers.Queue(), msg,
) // Create new process function for msg
w.workers.Enqueue(func(ctx context.Context) { process := func(ctx context.Context) {
if err := w.process(ctx, msg); err != nil { if err := w.process(ctx, msg); err != nil {
log.Errorf("%s %v", w.prefix, err) log.WithFields(kv.Fields{
kv.Field{K: "type", V: w.wtype},
kv.Field{K: "error", V: err},
}...).Error("message processing error")
} }
}) }
// Attempt a fast-enqueue of process
if !w.workers.EnqueueNow(process) {
// No spot acquired, log warning
log.WithFields(kv.Fields{
kv.Field{K: "type", V: w.wtype},
kv.Field{K: "queue", V: w.workers.Queue()},
}...).Warn("full worker queue")
// Block on enqueuing process func
w.workers.Enqueue(process)
}
} }

View File

@ -26,6 +26,11 @@ import (
"net/netip" "net/netip"
"runtime" "runtime"
"time" "time"
"codeberg.org/gruf/go-bytesize"
"codeberg.org/gruf/go-kv"
"github.com/cornelk/hashmap"
"github.com/superseriousbusiness/gotosocial/internal/log"
) )
// ErrInvalidRequest is returned if a given HTTP request is invalid and cannot be performed. // ErrInvalidRequest is returned if a given HTTP request is invalid and cannot be performed.
@ -42,8 +47,8 @@ var ErrBodyTooLarge = errors.New("body size too large")
// configuration values passed to initialized http.Transport{} // configuration values passed to initialized http.Transport{}
// and http.Client{}, along with httpclient.Client{} specific. // and http.Client{}, along with httpclient.Client{} specific.
type Config struct { type Config struct {
// MaxOpenConns limits the max number of concurrent open connections. // MaxOpenConnsPerHost limits the max number of open connections to a host.
MaxOpenConns int MaxOpenConnsPerHost int
// MaxIdleConns: see http.Transport{}.MaxIdleConns. // MaxIdleConns: see http.Transport{}.MaxIdleConns.
MaxIdleConns int MaxIdleConns int
@ -80,8 +85,9 @@ type Config struct {
// is available (context channels still respected) // is available (context channels still respected)
type Client struct { type Client struct {
client http.Client client http.Client
rc *requestQueue queue *hashmap.Map[string, chan struct{}]
bmax int64 bmax int64 // max response body size
cmax int // max open conns per host
} }
// New returns a new instance of Client initialized using configuration. // New returns a new instance of Client initialized using configuration.
@ -94,20 +100,20 @@ func New(cfg Config) *Client {
Resolver: &net.Resolver{}, Resolver: &net.Resolver{},
} }
if cfg.MaxOpenConns <= 0 { if cfg.MaxOpenConnsPerHost <= 0 {
// By default base this value on GOMAXPROCS. // By default base this value on GOMAXPROCS.
maxprocs := runtime.GOMAXPROCS(0) maxprocs := runtime.GOMAXPROCS(0)
cfg.MaxOpenConns = maxprocs * 10 cfg.MaxOpenConnsPerHost = maxprocs * 20
} }
if cfg.MaxIdleConns <= 0 { if cfg.MaxIdleConns <= 0 {
// By default base this value on MaxOpenConns // By default base this value on MaxOpenConns
cfg.MaxIdleConns = cfg.MaxOpenConns * 10 cfg.MaxIdleConns = cfg.MaxOpenConnsPerHost * 10
} }
if cfg.MaxBodySize <= 0 { if cfg.MaxBodySize <= 0 {
// By default set this to a reasonable 40MB // By default set this to a reasonable 40MB
cfg.MaxBodySize = 40 * 1024 * 1024 cfg.MaxBodySize = int64(40 * bytesize.MiB)
} }
// Protect dialer with IP range sanitizer // Protect dialer with IP range sanitizer
@ -117,11 +123,10 @@ func New(cfg Config) *Client {
}).Sanitize }).Sanitize
// Prepare client fields // Prepare client fields
c.bmax = cfg.MaxBodySize
c.rc = &requestQueue{
maxOpenConns: cfg.MaxOpenConns,
}
c.client.Timeout = cfg.Timeout c.client.Timeout = cfg.Timeout
c.cmax = cfg.MaxOpenConnsPerHost
c.bmax = cfg.MaxBodySize
c.queue = hashmap.New[string, chan struct{}]()
// Set underlying HTTP client roundtripper // Set underlying HTTP client roundtripper
c.client.Transport = &http.Transport{ c.client.Transport = &http.Transport{
@ -145,17 +150,16 @@ func New(cfg Config) *Client {
// as the standard http.Client{}.Do() implementation except that response body will // as the standard http.Client{}.Do() implementation except that response body will
// be wrapped by an io.LimitReader() to limit response body sizes. // be wrapped by an io.LimitReader() to limit response body sizes.
func (c *Client) Do(req *http.Request) (*http.Response, error) { func (c *Client) Do(req *http.Request) (*http.Response, error) {
// request a spot in the wait queue... // Get host's wait queue
wait, release := c.rc.getWaitSpot(req.Host, req.Method) wait := c.wait(req.Host)
var ok bool
// ... and wait our turn
select { select {
case <-req.Context().Done(): // Quickly try grab a spot
// the request was canceled before we
// got to our turn: no need to release
return nil, req.Context().Err()
case wait <- struct{}{}: case wait <- struct{}{}:
// it's our turn! // it's our turn!
ok = true
// NOTE: // NOTE:
// Ideally here we would set the slot release to happen either // Ideally here we would set the slot release to happen either
@ -167,7 +171,27 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
// that connections may not be closed until response body is closed. // that connections may not be closed until response body is closed.
// The current implementation will reduce the viability of denial of // The current implementation will reduce the viability of denial of
// service attacks, but if there are future issues heed this advice :] // service attacks, but if there are future issues heed this advice :]
defer release() defer func() { <-wait }()
default:
}
if !ok {
// No spot acquired, log warning
log.WithFields(kv.Fields{
{K: "queue", V: len(wait)},
{K: "method", V: req.Method},
{K: "host", V: req.Host},
{K: "uri", V: req.URL.RequestURI()},
}...).Warn("full request queue")
select {
case <-req.Context().Done():
// the request was canceled before we
// got to our turn: no need to release
return nil, req.Context().Err()
case wait <- struct{}{}:
defer func() { <-wait }()
}
} }
// Firstly, ensure this is a valid request // Firstly, ensure this is a valid request
@ -208,3 +232,17 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
return rsp, nil return rsp, nil
} }
// wait acquires the 'wait' queue for the given host string, or allocates new.
func (c *Client) wait(host string) chan struct{} {
// Look for an existing queue
queue, ok := c.queue.Get(host)
if ok {
return queue
}
// Allocate a new host queue (or return a sneaky existing one).
queue, _ = c.queue.GetOrInsert(host, make(chan struct{}, c.cmax))
return queue
}

View File

@ -1,68 +0,0 @@
/*
GoToSocial
Copyright (C) 2021-2022 GoToSocial Authors admin@gotosocial.org
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package httpclient
import (
"strings"
"sync"
"github.com/superseriousbusiness/gotosocial/internal/log"
)
type requestQueue struct {
hostQueues sync.Map // map of `hostQueue`
maxOpenConns int // max open conns per host per request method
}
type hostQueue struct {
slotsByMethod sync.Map
}
// getWaitSpot returns a wait channel and release function for http clients
// that want to do requests politely: that is, wait for their turn.
//
// To wait, a caller should do a select on an attempted insert into the
// returned wait channel. Once the insert succeeds, then the caller should
// proceed with the http request that pertains to the given host + method.
// It doesn't matter what's put into the wait channel, just any interface{}.
//
// When the caller is finished with their http request, they should free up the
// slot they were occupying in the wait queue, by calling the release function.
//
// The reason for the caller needing to provide host and method, is that each
// remote host has a separate wait queue, and there's a separate wait queue
// per method for that host as well. This ensures that outgoing requests can still
// proceed for others hosts and methods while other requests are undergoing,
// while also preventing one host from being spammed with, for example, a
// shitload of GET requests all at once.
func (rc *requestQueue) getWaitSpot(host string, method string) (wait chan<- interface{}, release func()) {
hostQueueI, _ := rc.hostQueues.LoadOrStore(host, new(hostQueue))
hostQueue, ok := hostQueueI.(*hostQueue)
if !ok {
log.Panic("hostQueueI was not a *hostQueue")
}
waitSlotI, _ := hostQueue.slotsByMethod.LoadOrStore(strings.ToUpper(method), make(chan interface{}, rc.maxOpenConns))
methodQueue, ok := waitSlotI.(chan interface{})
if !ok {
log.Panic("waitSlotI was not a chan interface{}")
}
return methodQueue, func() { <-methodQueue }
}

View File

@ -1,106 +0,0 @@
/*
GoToSocial
Copyright (C) 2021-2022 GoToSocial Authors admin@gotosocial.org
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package httpclient
import (
"net/http"
"testing"
"time"
"github.com/stretchr/testify/suite"
)
type QueueTestSuite struct {
suite.Suite
}
func (suite *QueueTestSuite) TestQueue() {
maxOpenConns := 5
waitTimeout := 1 * time.Second
rc := &requestQueue{
maxOpenConns: maxOpenConns,
}
// fill all the open connections
var release func()
for i, n := range make([]interface{}, maxOpenConns) {
w, r := rc.getWaitSpot("example.org", http.MethodPost)
w <- n
if i == maxOpenConns-1 {
// save the last release function
release = r
}
}
// try to wait again for the same host/method combo, it should timeout
waitAgain, _ := rc.getWaitSpot("example.org", "post")
select {
case waitAgain <- struct{}{}:
suite.FailNow("first wait did not time out")
case <-time.After(waitTimeout):
break
}
// now close the final release that we derived earlier
release()
// try waiting again, it should work this time
select {
case waitAgain <- struct{}{}:
break
case <-time.After(waitTimeout):
suite.FailNow("second wait timed out")
}
// the POST queue is now sitting on full
suite.Len(waitAgain, maxOpenConns)
// we should still be able to make a GET for the same host though
getWait, getRelease := rc.getWaitSpot("example.org", http.MethodGet)
select {
case getWait <- struct{}{}:
break
case <-time.After(waitTimeout):
suite.FailNow("get wait timed out")
}
// the GET queue has one request waiting
suite.Len(getWait, 1)
// clear it...
getRelease()
suite.Empty(getWait)
// even though the POST queue for example.org is full, we
// should still be able to make a POST request to another host :)
waitForAnotherHost, _ := rc.getWaitSpot("somewhere.else", http.MethodPost)
select {
case waitForAnotherHost <- struct{}{}:
break
case <-time.After(waitTimeout):
suite.FailNow("get wait timed out")
}
suite.Len(waitForAnotherHost, 1)
}
func TestQueueTestSuite(t *testing.T) {
suite.Run(t, &QueueTestSuite{})
}