summaryrefslogtreecommitdiff
path: root/internal/transport/deliver.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/transport/deliver.go')
-rw-r--r--internal/transport/deliver.go212
1 files changed, 141 insertions, 71 deletions
diff --git a/internal/transport/deliver.go b/internal/transport/deliver.go
index fe4d04582..a7e73465d 100644
--- a/internal/transport/deliver.go
+++ b/internal/transport/deliver.go
@@ -19,118 +19,188 @@ package transport
import (
"context"
+ "encoding/json"
"net/http"
"net/url"
- "sync"
"codeberg.org/gruf/go-byteutil"
apiutil "github.com/superseriousbusiness/gotosocial/internal/api/util"
"github.com/superseriousbusiness/gotosocial/internal/config"
+ "github.com/superseriousbusiness/gotosocial/internal/gtscontext"
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
+ "github.com/superseriousbusiness/gotosocial/internal/httpclient"
+ "github.com/superseriousbusiness/gotosocial/internal/transport/delivery"
)
-func (t *transport) BatchDeliver(ctx context.Context, b []byte, recipients []*url.URL) error {
+func (t *transport) BatchDeliver(ctx context.Context, obj map[string]interface{}, recipients []*url.URL) error {
var (
- // errs accumulates errors received during
- // attempted delivery by deliverer routines.
- errs gtserror.MultiError
-
- // wait blocks until all sender
- // routines have returned.
- wait sync.WaitGroup
+ // accumulated delivery reqs.
+ reqs []*delivery.Delivery
- // mutex protects 'recipients' and
- // 'errs' for concurrent access.
- mutex sync.Mutex
+ // accumulated preparation errs.
+ errs gtserror.MultiError
// Get current instance host info.
domain = config.GetAccountDomain()
host = config.GetHost()
)
- // Block on expect no. senders.
- wait.Add(t.controller.senders)
-
- for i := 0; i < t.controller.senders; i++ {
- go func() {
- // Mark returned.
- defer wait.Done()
-
- for {
- // Acquire lock.
- mutex.Lock()
-
- if len(recipients) == 0 {
- // Reached end.
- mutex.Unlock()
- return
- }
-
- // Pop next recipient.
- i := len(recipients) - 1
- to := recipients[i]
- recipients = recipients[:i]
-
- // Done with lock.
- mutex.Unlock()
-
- // Skip delivery to recipient if it is "us".
- if to.Host == host || to.Host == domain {
- continue
- }
-
- // Attempt to deliver data to recipient.
- if err := t.deliver(ctx, b, to); err != nil {
- mutex.Lock() // safely append err to accumulator.
- errs.Appendf("error delivering to %s: %w", to, err)
- mutex.Unlock()
- }
- }
- }()
+ // Marshal object as JSON.
+ b, err := json.Marshal(obj)
+ if err != nil {
+ return gtserror.Newf("error marshaling json: %w", err)
+ }
+
+ // Extract object IDs.
+ actID := getActorID(obj)
+ objID := getObjectID(obj)
+ tgtID := getTargetID(obj)
+
+ for _, to := range recipients {
+ // Skip delivery to recipient if it is "us".
+ if to.Host == host || to.Host == domain {
+ continue
+ }
+
+ // Prepare http client request.
+ req, err := t.prepare(ctx,
+ actID,
+ objID,
+ tgtID,
+ b,
+ to,
+ )
+ if err != nil {
+ errs.Append(err)
+ continue
+ }
+
+ // Append to request queue.
+ reqs = append(reqs, req)
}
- // Wait for finish.
- wait.Wait()
+ // Push prepared request list to the delivery queue.
+ t.controller.state.Workers.Delivery.Queue.Push(reqs...)
// Return combined err.
return errs.Combine()
}
-func (t *transport) Deliver(ctx context.Context, b []byte, to *url.URL) error {
+func (t *transport) Deliver(ctx context.Context, obj map[string]interface{}, to *url.URL) error {
// if 'to' host is our own, skip as we don't need to deliver to ourselves...
if to.Host == config.GetHost() || to.Host == config.GetAccountDomain() {
return nil
}
- // Deliver data to recipient.
- return t.deliver(ctx, b, to)
+ // Marshal object as JSON.
+ b, err := json.Marshal(obj)
+ if err != nil {
+ return gtserror.Newf("error marshaling json: %w", err)
+ }
+
+ // Prepare http client request.
+ req, err := t.prepare(ctx,
+ getActorID(obj),
+ getObjectID(obj),
+ getTargetID(obj),
+ b,
+ to,
+ )
+ if err != nil {
+ return err
+ }
+
+ // Push prepared request to the delivery queue.
+ t.controller.state.Workers.Delivery.Queue.Push(req)
+
+ return nil
}
-func (t *transport) deliver(ctx context.Context, b []byte, to *url.URL) error {
+// prepare will prepare a POST http.Request{}
+// to recipient at 'to', wrapping in a queued
+// request object with signing function.
+func (t *transport) prepare(
+ ctx context.Context,
+ actorID string,
+ objectID string,
+ targetID string,
+ data []byte,
+ to *url.URL,
+) (
+ *delivery.Delivery,
+ error,
+) {
url := to.String()
- // Use rewindable bytes reader for body.
+ // Use rewindable reader for body.
var body byteutil.ReadNopCloser
- body.Reset(b)
+ body.Reset(data)
+
+ // Prepare POST signer.
+ sign := t.signPOST(data)
- req, err := http.NewRequestWithContext(ctx, "POST", url, &body)
+ // Update to-be-used request context with signing details.
+ ctx = gtscontext.SetOutgoingPublicKeyID(ctx, t.pubKeyID)
+ ctx = gtscontext.SetHTTPClientSignFunc(ctx, sign)
+
+ // Prepare a new request with data body directed at URL.
+ r, err := http.NewRequestWithContext(ctx, "POST", url, &body)
if err != nil {
- return err
+ return nil, gtserror.Newf("error preparing request: %w", err)
}
- req.Header.Add("Content-Type", string(apiutil.AppActivityLDJSON))
- req.Header.Add("Accept-Charset", "utf-8")
+ // Set the standard ActivityPub content-type + charset headers.
+ r.Header.Add("Content-Type", string(apiutil.AppActivityLDJSON))
+ r.Header.Add("Accept-Charset", "utf-8")
- rsp, err := t.POST(req, b)
- if err != nil {
- return err
+ // Validate the request before queueing for delivery.
+ if err := httpclient.ValidateRequest(r); err != nil {
+ return nil, err
+ }
+
+ return &delivery.Delivery{
+ ActorID: actorID,
+ ObjectID: objectID,
+ TargetID: targetID,
+ Request: httpclient.WrapRequest(r),
+ }, nil
+}
+
+// getObjectID extracts an object ID from 'serialized' ActivityPub object map.
+func getObjectID(obj map[string]interface{}) string {
+ switch t := obj["object"].(type) {
+ case string:
+ return t
+ case map[string]interface{}:
+ id, _ := t["id"].(string)
+ return id
+ default:
+ return ""
}
- defer rsp.Body.Close()
+}
- if code := rsp.StatusCode; code != http.StatusOK &&
- code != http.StatusCreated && code != http.StatusAccepted {
- return gtserror.NewFromResponse(rsp)
+// getActorID extracts an actor ID from 'serialized' ActivityPub object map.
+func getActorID(obj map[string]interface{}) string {
+ switch t := obj["actor"].(type) {
+ case string:
+ return t
+ case map[string]interface{}:
+ id, _ := t["id"].(string)
+ return id
+ default:
+ return ""
}
+}
- return nil
+// getTargetID extracts a target ID from 'serialized' ActivityPub object map.
+func getTargetID(obj map[string]interface{}) string {
+ switch t := obj["target"].(type) {
+ case string:
+ return t
+ case map[string]interface{}:
+ id, _ := t["id"].(string)
+ return id
+ default:
+ return ""
+ }
}