diff options
Diffstat (limited to 'internal/transport/deliver.go')
-rw-r--r-- | internal/transport/deliver.go | 212 |
1 files changed, 141 insertions, 71 deletions
diff --git a/internal/transport/deliver.go b/internal/transport/deliver.go index fe4d04582..a7e73465d 100644 --- a/internal/transport/deliver.go +++ b/internal/transport/deliver.go @@ -19,118 +19,188 @@ package transport import ( "context" + "encoding/json" "net/http" "net/url" - "sync" "codeberg.org/gruf/go-byteutil" apiutil "github.com/superseriousbusiness/gotosocial/internal/api/util" "github.com/superseriousbusiness/gotosocial/internal/config" + "github.com/superseriousbusiness/gotosocial/internal/gtscontext" "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/httpclient" + "github.com/superseriousbusiness/gotosocial/internal/transport/delivery" ) -func (t *transport) BatchDeliver(ctx context.Context, b []byte, recipients []*url.URL) error { +func (t *transport) BatchDeliver(ctx context.Context, obj map[string]interface{}, recipients []*url.URL) error { var ( - // errs accumulates errors received during - // attempted delivery by deliverer routines. - errs gtserror.MultiError - - // wait blocks until all sender - // routines have returned. - wait sync.WaitGroup + // accumulated delivery reqs. + reqs []*delivery.Delivery - // mutex protects 'recipients' and - // 'errs' for concurrent access. - mutex sync.Mutex + // accumulated preparation errs. + errs gtserror.MultiError // Get current instance host info. domain = config.GetAccountDomain() host = config.GetHost() ) - // Block on expect no. senders. - wait.Add(t.controller.senders) - - for i := 0; i < t.controller.senders; i++ { - go func() { - // Mark returned. - defer wait.Done() - - for { - // Acquire lock. - mutex.Lock() - - if len(recipients) == 0 { - // Reached end. - mutex.Unlock() - return - } - - // Pop next recipient. - i := len(recipients) - 1 - to := recipients[i] - recipients = recipients[:i] - - // Done with lock. - mutex.Unlock() - - // Skip delivery to recipient if it is "us". - if to.Host == host || to.Host == domain { - continue - } - - // Attempt to deliver data to recipient. - if err := t.deliver(ctx, b, to); err != nil { - mutex.Lock() // safely append err to accumulator. - errs.Appendf("error delivering to %s: %w", to, err) - mutex.Unlock() - } - } - }() + // Marshal object as JSON. + b, err := json.Marshal(obj) + if err != nil { + return gtserror.Newf("error marshaling json: %w", err) + } + + // Extract object IDs. + actID := getActorID(obj) + objID := getObjectID(obj) + tgtID := getTargetID(obj) + + for _, to := range recipients { + // Skip delivery to recipient if it is "us". + if to.Host == host || to.Host == domain { + continue + } + + // Prepare http client request. + req, err := t.prepare(ctx, + actID, + objID, + tgtID, + b, + to, + ) + if err != nil { + errs.Append(err) + continue + } + + // Append to request queue. + reqs = append(reqs, req) } - // Wait for finish. - wait.Wait() + // Push prepared request list to the delivery queue. + t.controller.state.Workers.Delivery.Queue.Push(reqs...) // Return combined err. return errs.Combine() } -func (t *transport) Deliver(ctx context.Context, b []byte, to *url.URL) error { +func (t *transport) Deliver(ctx context.Context, obj map[string]interface{}, to *url.URL) error { // if 'to' host is our own, skip as we don't need to deliver to ourselves... if to.Host == config.GetHost() || to.Host == config.GetAccountDomain() { return nil } - // Deliver data to recipient. - return t.deliver(ctx, b, to) + // Marshal object as JSON. + b, err := json.Marshal(obj) + if err != nil { + return gtserror.Newf("error marshaling json: %w", err) + } + + // Prepare http client request. + req, err := t.prepare(ctx, + getActorID(obj), + getObjectID(obj), + getTargetID(obj), + b, + to, + ) + if err != nil { + return err + } + + // Push prepared request to the delivery queue. + t.controller.state.Workers.Delivery.Queue.Push(req) + + return nil } -func (t *transport) deliver(ctx context.Context, b []byte, to *url.URL) error { +// prepare will prepare a POST http.Request{} +// to recipient at 'to', wrapping in a queued +// request object with signing function. +func (t *transport) prepare( + ctx context.Context, + actorID string, + objectID string, + targetID string, + data []byte, + to *url.URL, +) ( + *delivery.Delivery, + error, +) { url := to.String() - // Use rewindable bytes reader for body. + // Use rewindable reader for body. var body byteutil.ReadNopCloser - body.Reset(b) + body.Reset(data) + + // Prepare POST signer. + sign := t.signPOST(data) - req, err := http.NewRequestWithContext(ctx, "POST", url, &body) + // Update to-be-used request context with signing details. + ctx = gtscontext.SetOutgoingPublicKeyID(ctx, t.pubKeyID) + ctx = gtscontext.SetHTTPClientSignFunc(ctx, sign) + + // Prepare a new request with data body directed at URL. + r, err := http.NewRequestWithContext(ctx, "POST", url, &body) if err != nil { - return err + return nil, gtserror.Newf("error preparing request: %w", err) } - req.Header.Add("Content-Type", string(apiutil.AppActivityLDJSON)) - req.Header.Add("Accept-Charset", "utf-8") + // Set the standard ActivityPub content-type + charset headers. + r.Header.Add("Content-Type", string(apiutil.AppActivityLDJSON)) + r.Header.Add("Accept-Charset", "utf-8") - rsp, err := t.POST(req, b) - if err != nil { - return err + // Validate the request before queueing for delivery. + if err := httpclient.ValidateRequest(r); err != nil { + return nil, err + } + + return &delivery.Delivery{ + ActorID: actorID, + ObjectID: objectID, + TargetID: targetID, + Request: httpclient.WrapRequest(r), + }, nil +} + +// getObjectID extracts an object ID from 'serialized' ActivityPub object map. +func getObjectID(obj map[string]interface{}) string { + switch t := obj["object"].(type) { + case string: + return t + case map[string]interface{}: + id, _ := t["id"].(string) + return id + default: + return "" } - defer rsp.Body.Close() +} - if code := rsp.StatusCode; code != http.StatusOK && - code != http.StatusCreated && code != http.StatusAccepted { - return gtserror.NewFromResponse(rsp) +// getActorID extracts an actor ID from 'serialized' ActivityPub object map. +func getActorID(obj map[string]interface{}) string { + switch t := obj["actor"].(type) { + case string: + return t + case map[string]interface{}: + id, _ := t["id"].(string) + return id + default: + return "" } +} - return nil +// getTargetID extracts a target ID from 'serialized' ActivityPub object map. +func getTargetID(obj map[string]interface{}) string { + switch t := obj["target"].(type) { + case string: + return t + case map[string]interface{}: + id, _ := t["id"].(string) + return id + default: + return "" + } } |