diff options
author | 2024-04-11 10:45:35 +0100 | |
---|---|---|
committer | 2024-04-11 11:45:35 +0200 | |
commit | a483bd9e38333b153df1d4df95276cca38f99ff5 (patch) | |
tree | b2fdb6f53ef248b31719a15adc93e767eba3d5c4 /vendor/github.com | |
parent | [chore]: Bump github.com/yuin/goldmark from 1.7.0 to 1.7.1 (#2819) (diff) | |
download | gotosocial-a483bd9e38333b153df1d4df95276cca38f99ff5.tar.xz |
[performance] massively improved ActivityPub delivery worker efficiency (#2812)
* add delivery worker type that pulls from queue to httpclient package
* finish up some code commenting, bodge a vendored activity library change, integrate the deliverypool changes into transportcontroller
* hook up queue deletion logic
* support deleting queued http requests by target ID
* don't index APRequest by hostname in the queue
* use gorun
* use the original context's values when wrapping msg type as delivery{}
* actually log in the AP delivery worker ...
* add uncommitted changes
* use errors.AsV2()
* use errorsv2.AsV2()
* finish adding some code comments, add bad host handling to delivery workers
* slightly tweak deliveryworkerpool API, use advanced sender multiplier
* remove PopCtx() method, let others instead rely on Wait()
* shuffle things around to move delivery stuff into transport/ subpkg
* remove dead code
* formatting
* validate request before queueing for delivery
* finish adding code comments, fix up backoff code
* finish adding more code comments
* clamp minimum no. senders to 1
* add start/stop logging to delivery worker, some slight changes
* remove double logging
* use worker ptrs
* expose the embedded log fields in httpclient.Request{}
* ensure request context values are preserved when updating ctx
* add delivery worker tests
* fix linter issues
* ensure delivery worker gets inited in testrig
* fix tests to delivering messages to check worker delivery queue
* update error type to use ptr instead of value receiver
* fix test calling Workers{}.Start() instead of testrig.StartWorkers()
* update docs for advanced-sender-multiplier
* update to the latest activity library version
* add comment about not using httptest.Server{}
Diffstat (limited to 'vendor/github.com')
-rw-r--r-- | vendor/github.com/superseriousbusiness/activity/pub/side_effect_actor.go | 8 | ||||
-rw-r--r-- | vendor/github.com/superseriousbusiness/activity/pub/transport.go | 67 |
2 files changed, 41 insertions, 34 deletions
diff --git a/vendor/github.com/superseriousbusiness/activity/pub/side_effect_actor.go b/vendor/github.com/superseriousbusiness/activity/pub/side_effect_actor.go index 4062cf507..0c1da9e91 100644 --- a/vendor/github.com/superseriousbusiness/activity/pub/side_effect_actor.go +++ b/vendor/github.com/superseriousbusiness/activity/pub/side_effect_actor.go @@ -2,7 +2,6 @@ package pub import ( "context" - "encoding/json" "fmt" "net/http" "net/url" @@ -477,17 +476,12 @@ func (a *SideEffectActor) deliverToRecipients(c context.Context, boxIRI *url.URL return err } - b, err := json.Marshal(m) - if err != nil { - return err - } - tp, err := a.common.NewTransport(c, boxIRI, goFedUserAgent()) if err != nil { return err } - return tp.BatchDeliver(c, b, recipients) + return tp.BatchDeliver(c, m, recipients) } // addToOutbox adds the activity to the outbox and creates the activity in the diff --git a/vendor/github.com/superseriousbusiness/activity/pub/transport.go b/vendor/github.com/superseriousbusiness/activity/pub/transport.go index a770b8b46..101ff5c07 100644 --- a/vendor/github.com/superseriousbusiness/activity/pub/transport.go +++ b/vendor/github.com/superseriousbusiness/activity/pub/transport.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "crypto" + "encoding/json" "fmt" "net/http" "net/url" @@ -44,10 +45,10 @@ type Transport interface { Dereference(c context.Context, iri *url.URL) (*http.Response, error) // Deliver sends an ActivityStreams object. - Deliver(c context.Context, b []byte, to *url.URL) error + Deliver(c context.Context, obj map[string]interface{}, to *url.URL) error // BatchDeliver sends an ActivityStreams object to multiple recipients. - BatchDeliver(c context.Context, b []byte, recipients []*url.URL) error + BatchDeliver(c context.Context, obj map[string]interface{}, recipients []*url.URL) error } // Transport must be implemented by HttpSigTransport. @@ -138,43 +139,27 @@ func (h HttpSigTransport) Dereference(c context.Context, iri *url.URL) (*http.Re } // Deliver sends a POST request with an HTTP Signature. -func (h HttpSigTransport) Deliver(c context.Context, b []byte, to *url.URL) error { - req, err := http.NewRequest("POST", to.String(), bytes.NewReader(b)) - if err != nil { - return err - } - req = req.WithContext(c) - req.Header.Add(contentTypeHeader, contentTypeHeaderValue) - req.Header.Add("Accept-Charset", "utf-8") - req.Header.Add("Date", h.clock.Now().UTC().Format("Mon, 02 Jan 2006 15:04:05")+" GMT") - req.Header.Add("User-Agent", fmt.Sprintf("%s %s", h.appAgent, h.gofedAgent)) - req.Header.Set("Host", to.Host) - h.postSignerMu.Lock() - err = h.postSigner.SignRequest(h.privKey, h.pubKeyId, req, b) - h.postSignerMu.Unlock() +func (h HttpSigTransport) Deliver(c context.Context, data map[string]interface{}, to *url.URL) error { + b, err := json.Marshal(data) if err != nil { return err } - resp, err := h.client.Do(req) - if err != nil { - return err - } - defer resp.Body.Close() - if !isSuccess(resp.StatusCode) { - return fmt.Errorf("POST request to %s failed (%d): %s", to.String(), resp.StatusCode, resp.Status) - } - return nil + return h.deliver(c, b, to) } // BatchDeliver sends concurrent POST requests. Returns an error if any of the requests had an error. -func (h HttpSigTransport) BatchDeliver(c context.Context, b []byte, recipients []*url.URL) error { +func (h HttpSigTransport) BatchDeliver(c context.Context, data map[string]interface{}, recipients []*url.URL) error { + b, err := json.Marshal(data) + if err != nil { + return err + } var wg sync.WaitGroup errCh := make(chan error, len(recipients)) for _, recipient := range recipients { wg.Add(1) go func(r *url.URL) { defer wg.Done() - if err := h.Deliver(c, b, r); err != nil { + if err := h.deliver(c, b, r); err != nil { errCh <- err } }(recipient) @@ -196,6 +181,34 @@ outer: return nil } +func (h HttpSigTransport) deliver(c context.Context, b []byte, to *url.URL) error { + req, err := http.NewRequest("POST", to.String(), bytes.NewReader(b)) + if err != nil { + return err + } + req = req.WithContext(c) + req.Header.Add(contentTypeHeader, contentTypeHeaderValue) + req.Header.Add("Accept-Charset", "utf-8") + req.Header.Add("Date", h.clock.Now().UTC().Format("Mon, 02 Jan 2006 15:04:05")+" GMT") + req.Header.Add("User-Agent", fmt.Sprintf("%s %s", h.appAgent, h.gofedAgent)) + req.Header.Set("Host", to.Host) + h.postSignerMu.Lock() + err = h.postSigner.SignRequest(h.privKey, h.pubKeyId, req, b) + h.postSignerMu.Unlock() + if err != nil { + return err + } + resp, err := h.client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + if !isSuccess(resp.StatusCode) { + return fmt.Errorf("POST request to %s failed (%d): %s", to.String(), resp.StatusCode, resp.Status) + } + return nil +} + // HttpClient sends http requests, and is an abstraction only needed by the // HttpSigTransport. The standard library's Client satisfies this interface. type HttpClient interface { |