diff options
Diffstat (limited to 'internal')
| -rw-r--r-- | internal/httpclient/client.go | 19 | ||||
| -rw-r--r-- | internal/httpclient/queue.go | 68 | ||||
| -rw-r--r-- | internal/httpclient/queue_test.go | 106 | 
3 files changed, 187 insertions, 6 deletions
diff --git a/internal/httpclient/client.go b/internal/httpclient/client.go index 56992b915..45994b2ba 100644 --- a/internal/httpclient/client.go +++ b/internal/httpclient/client.go @@ -80,7 +80,7 @@ type Config struct {  //   is available (context channels still respected)  type Client struct {  	client http.Client -	queue  chan struct{} +	rc     *requestQueue  	bmax   int64  } @@ -118,7 +118,9 @@ func New(cfg Config) *Client {  	// Prepare client fields  	c.bmax = cfg.MaxBodySize -	c.queue = make(chan struct{}, cfg.MaxOpenConns) +	c.rc = &requestQueue{ +		maxOpenConns: cfg.MaxOpenConns, +	}  	c.client.Timeout = cfg.Timeout  	// Set underlying HTTP client roundtripper @@ -143,13 +145,18 @@ func New(cfg Config) *Client {  // as the standard http.Client{}.Do() implementation except that response body will  // be wrapped by an io.LimitReader() to limit response body sizes.  func (c *Client) Do(req *http.Request) (*http.Response, error) { +	// request a spot in the wait queue... +	wait, release := c.rc.getWaitSpot(req.Host, req.Method) + +	// ... and wait our turn  	select { -	// Request context cancelled  	case <-req.Context().Done(): +		// the request was canceled before we +		// got to our turn: no need to release  		return nil, req.Context().Err() +	case wait <- struct{}{}: +		// it's our turn! -	// Slot in queue acquired -	case c.queue <- struct{}{}:  		// NOTE:  		// Ideally here we would set the slot release to happen either  		// on error return, or via callback from the response body closer. @@ -160,7 +167,7 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {  		// that connections may not be closed until response body is closed.  		
// The current implementation will reduce the viability of denial of  		// service attacks, but if there are future issues heed this advice :] -		defer func() { <-c.queue }() +		defer release()  	}  	// Firstly, ensure this is a valid request diff --git a/internal/httpclient/queue.go b/internal/httpclient/queue.go new file mode 100644 index 000000000..8cb1274be --- /dev/null +++ b/internal/httpclient/queue.go @@ -0,0 +1,68 @@ +/* +   GoToSocial +   Copyright (C) 2021-2022 GoToSocial Authors admin@gotosocial.org + +   This program is free software: you can redistribute it and/or modify +   it under the terms of the GNU Affero General Public License as published by +   the Free Software Foundation, either version 3 of the License, or +   (at your option) any later version. + +   This program is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU Affero General Public License for more details. + +   You should have received a copy of the GNU Affero General Public License +   along with this program.  If not, see <http://www.gnu.org/licenses/>. +*/ + +package httpclient + +import ( +	"strings" +	"sync" + +	"github.com/superseriousbusiness/gotosocial/internal/log" +) + +type requestQueue struct { +	hostQueues   sync.Map // map of `hostQueue` +	maxOpenConns int      // max open conns per host per request method +} + +type hostQueue struct { +	slotsByMethod sync.Map +} + +// getWaitSpot returns a wait channel and release function for http clients +// that want to do requests politely: that is, wait for their turn. +// +// To wait, a caller should do a select on an attempted insert into the +// returned wait channel. Once the insert succeeds, then the caller should +// proceed with the http request that pertains to the given host + method. +// It doesn't matter what's put into the wait channel, just any interface{}. 
+// +// When the caller is finished with their http request, they should free up the +// slot they were occupying in the wait queue, by calling the release function. +// +// The reason for the caller needing to provide host and method, is that each +// remote host has a separate wait queue, and there's a separate wait queue +// per method for that host as well. This ensures that outgoing requests can still +// proceed for other hosts and methods while other requests are underway, +// while also preventing one host from being spammed with, for example, a +// shitload of GET requests all at once. +func (rc *requestQueue) getWaitSpot(host string, method string) (wait chan<- interface{}, release func()) { +	hostQueueI, _ := rc.hostQueues.LoadOrStore(host, new(hostQueue)) +	hostQueue, ok := hostQueueI.(*hostQueue) +	if !ok { +		log.Panic("hostQueueI was not a *hostQueue") +	} + +	waitSlotI, _ := hostQueue.slotsByMethod.LoadOrStore(strings.ToUpper(method), make(chan interface{}, rc.maxOpenConns)) +	methodQueue, ok := waitSlotI.(chan interface{}) +	if !ok { +		log.Panic("waitSlotI was not a chan interface{}") +	} + +	return methodQueue, func() { <-methodQueue } +} diff --git a/internal/httpclient/queue_test.go b/internal/httpclient/queue_test.go new file mode 100644 index 000000000..c6d6ad324 --- /dev/null +++ b/internal/httpclient/queue_test.go @@ -0,0 +1,106 @@ +/* +   GoToSocial +   Copyright (C) 2021-2022 GoToSocial Authors admin@gotosocial.org + +   This program is free software: you can redistribute it and/or modify +   it under the terms of the GNU Affero General Public License as published by +   the Free Software Foundation, either version 3 of the License, or +   (at your option) any later version. + +   This program is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU Affero General Public License for more details. 
+ +   You should have received a copy of the GNU Affero General Public License +   along with this program.  If not, see <http://www.gnu.org/licenses/>. +*/ + +package httpclient + +import ( +	"net/http" +	"testing" +	"time" + +	"github.com/stretchr/testify/suite" +) + +type QueueTestSuite struct { +	suite.Suite +} + +func (suite *QueueTestSuite) TestQueue() { +	maxOpenConns := 5 +	waitTimeout := 1 * time.Second + +	rc := &requestQueue{ +		maxOpenConns: maxOpenConns, +	} + +	// fill all the open connections +	var release func() +	for i, n := range make([]interface{}, maxOpenConns) { +		w, r := rc.getWaitSpot("example.org", http.MethodPost) +		w <- n +		if i == maxOpenConns-1 { +			// save the last release function +			release = r +		} +	} + +	// try to wait again for the same host/method combo, it should timeout +	waitAgain, _ := rc.getWaitSpot("example.org", "post") + +	select { +	case waitAgain <- struct{}{}: +		suite.FailNow("first wait did not time out") +	case <-time.After(waitTimeout): +		break +	} + +	// now close the final release that we derived earlier +	release() + +	// try waiting again, it should work this time +	select { +	case waitAgain <- struct{}{}: +		break +	case <-time.After(waitTimeout): +		suite.FailNow("second wait timed out") +	} + +	// the POST queue is now sitting on full +	suite.Len(waitAgain, maxOpenConns) + +	// we should still be able to make a GET for the same host though +	getWait, getRelease := rc.getWaitSpot("example.org", http.MethodGet) +	select { +	case getWait <- struct{}{}: +		break +	case <-time.After(waitTimeout): +		suite.FailNow("get wait timed out") +	} + +	// the GET queue has one request waiting +	suite.Len(getWait, 1) +	// clear it... 
+	getRelease() +	suite.Empty(getWait) + +	// even though the POST queue for example.org is full, we +	// should still be able to make a POST request to another host :) +	waitForAnotherHost, _ := rc.getWaitSpot("somewhere.else", http.MethodPost) +	select { +	case waitForAnotherHost <- struct{}{}: +		break +	case <-time.After(waitTimeout): +		suite.FailNow("get wait timed out") +	} + +	suite.Len(waitForAnotherHost, 1) +} + +func TestQueueTestSuite(t *testing.T) { +	suite.Run(t, &QueueTestSuite{}) +}  | 
