summaryrefslogtreecommitdiff
path: root/internal/httpclient/client.go
diff options
context:
space:
mode:
authorLibravatar kim <89579420+NyaaaWhatsUpDoc@users.noreply.github.com>2022-11-08 09:35:24 +0000
committerLibravatar GitHub <noreply@github.com>2022-11-08 10:35:24 +0100
commit0e572460830ce7767562f08034f96d51e41b6349 (patch)
tree2ffad44c4d6f07b9d4c3bd494d6d1a3d3918692f /internal/httpclient/client.go
parent[chore] update gruf libraries (#996) (diff)
downloadgotosocial-0e572460830ce7767562f08034f96d51e41b6349.tar.xz
[feature] various worker / request queue improvements (#995)
* greatly simplify httpclient request queuing Signed-off-by: kim <grufwub@gmail.com> * improved request queue mutex logic Signed-off-by: kim <grufwub@gmail.com> * use improved hashmap library Signed-off-by: kim <grufwub@gmail.com> * add warn logging when request queues are full Signed-off-by: kim <grufwub@gmail.com> * improve worker pool prefix var naming Signed-off-by: kim <grufwub@gmail.com> * improved worker pool error logging Signed-off-by: kim <grufwub@gmail.com> * move error message into separate field Signed-off-by: kim <grufwub@gmail.com> * remove old log statement Signed-off-by: kim <grufwub@gmail.com> * don't export worker message, it gets very spammy :') Signed-off-by: kim <grufwub@gmail.com> Signed-off-by: kim <grufwub@gmail.com>
Diffstat (limited to 'internal/httpclient/client.go')
-rw-r--r--internal/httpclient/client.go78
1 files changed, 58 insertions, 20 deletions
diff --git a/internal/httpclient/client.go b/internal/httpclient/client.go
index 7aa0cd8ea..8792e5b82 100644
--- a/internal/httpclient/client.go
+++ b/internal/httpclient/client.go
@@ -26,6 +26,11 @@ import (
"net/netip"
"runtime"
"time"
+
+ "codeberg.org/gruf/go-bytesize"
+ "codeberg.org/gruf/go-kv"
+ "github.com/cornelk/hashmap"
+ "github.com/superseriousbusiness/gotosocial/internal/log"
)
// ErrInvalidRequest is returned if a given HTTP request is invalid and cannot be performed.
@@ -42,8 +47,8 @@ var ErrBodyTooLarge = errors.New("body size too large")
// configuration values passed to initialized http.Transport{}
// and http.Client{}, along with httpclient.Client{} specific.
type Config struct {
- // MaxOpenConns limits the max number of concurrent open connections.
- MaxOpenConns int
+ // MaxOpenConnsPerHost limits the max number of open connections to a host.
+ MaxOpenConnsPerHost int
// MaxIdleConns: see http.Transport{}.MaxIdleConns.
MaxIdleConns int
@@ -80,8 +85,9 @@ type Config struct {
// is available (context channels still respected)
type Client struct {
client http.Client
- rc *requestQueue
- bmax int64
+ queue *hashmap.Map[string, chan struct{}]
+ bmax int64 // max response body size
+ cmax int // max open conns per host
}
// New returns a new instance of Client initialized using configuration.
@@ -94,20 +100,20 @@ func New(cfg Config) *Client {
Resolver: &net.Resolver{},
}
- if cfg.MaxOpenConns <= 0 {
+ if cfg.MaxOpenConnsPerHost <= 0 {
// By default base this value on GOMAXPROCS.
maxprocs := runtime.GOMAXPROCS(0)
- cfg.MaxOpenConns = maxprocs * 10
+ cfg.MaxOpenConnsPerHost = maxprocs * 20
}
if cfg.MaxIdleConns <= 0 {
// By default base this value on MaxOpenConns
- cfg.MaxIdleConns = cfg.MaxOpenConns * 10
+ cfg.MaxIdleConns = cfg.MaxOpenConnsPerHost * 10
}
if cfg.MaxBodySize <= 0 {
// By default set this to a reasonable 40MB
- cfg.MaxBodySize = 40 * 1024 * 1024
+ cfg.MaxBodySize = int64(40 * bytesize.MiB)
}
// Protect dialer with IP range sanitizer
@@ -117,11 +123,10 @@ func New(cfg Config) *Client {
}).Sanitize
// Prepare client fields
- c.bmax = cfg.MaxBodySize
- c.rc = &requestQueue{
- maxOpenConns: cfg.MaxOpenConns,
- }
c.client.Timeout = cfg.Timeout
+ c.cmax = cfg.MaxOpenConnsPerHost
+ c.bmax = cfg.MaxBodySize
+ c.queue = hashmap.New[string, chan struct{}]()
// Set underlying HTTP client roundtripper
c.client.Transport = &http.Transport{
@@ -145,17 +150,16 @@ func New(cfg Config) *Client {
// as the standard http.Client{}.Do() implementation except that response body will
// be wrapped by an io.LimitReader() to limit response body sizes.
func (c *Client) Do(req *http.Request) (*http.Response, error) {
- // request a spot in the wait queue...
- wait, release := c.rc.getWaitSpot(req.Host, req.Method)
+ // Get host's wait queue
+ wait := c.wait(req.Host)
+
+ var ok bool
- // ... and wait our turn
select {
- case <-req.Context().Done():
- // the request was canceled before we
- // got to our turn: no need to release
- return nil, req.Context().Err()
+ // Quickly try grab a spot
case wait <- struct{}{}:
// it's our turn!
+ ok = true
// NOTE:
// Ideally here we would set the slot release to happen either
@@ -167,7 +171,27 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
// that connections may not be closed until response body is closed.
// The current implementation will reduce the viability of denial of
// service attacks, but if there are future issues heed this advice :]
- defer release()
+ defer func() { <-wait }()
+ default:
+ }
+
+ if !ok {
+ // No spot acquired, log warning
+ log.WithFields(kv.Fields{
+ {K: "queue", V: len(wait)},
+ {K: "method", V: req.Method},
+ {K: "host", V: req.Host},
+ {K: "uri", V: req.URL.RequestURI()},
+ }...).Warn("full request queue")
+
+ select {
+ case <-req.Context().Done():
+ // the request was canceled before we
+ // got to our turn: no need to release
+ return nil, req.Context().Err()
+ case wait <- struct{}{}:
+ defer func() { <-wait }()
+ }
}
// Firstly, ensure this is a valid request
@@ -208,3 +232,17 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
return rsp, nil
}
+
+// wait acquires the 'wait' queue for the given host string, or allocates new.
+func (c *Client) wait(host string) chan struct{} {
+ // Look for an existing queue
+ queue, ok := c.queue.Get(host)
+ if ok {
+ return queue
+ }
+
+ // Allocate a new host queue (or return a sneaky existing one).
+ queue, _ = c.queue.GetOrInsert(host, make(chan struct{}, c.cmax))
+
+ return queue
+}