summaryrefslogtreecommitdiff
path: root/vendor/code.superseriousbusiness.org/activity/pub/side_effect_actor.go
diff options
context:
space:
mode:
authorLibravatar tobi <31960611+tsmethurst@users.noreply.github.com>2025-04-25 15:15:36 +0200
committerLibravatar GitHub <noreply@github.com>2025-04-25 15:15:36 +0200
commitffde1b150faca940bc6c172068aa068cf468aa39 (patch)
tree2b325bf50946b95502d948d5700c148d667346d8 /vendor/code.superseriousbusiness.org/activity/pub/side_effect_actor.go
parent[chore] Update `activity` to v1.14.0 (#4038) (diff)
downloadgotosocial-ffde1b150faca940bc6c172068aa068cf468aa39.tar.xz
[chore] Move deps to code.superseriousbusiness.org (#4054)
Diffstat (limited to 'vendor/code.superseriousbusiness.org/activity/pub/side_effect_actor.go')
-rw-r--r--vendor/code.superseriousbusiness.org/activity/pub/side_effect_actor.go1047
1 files changed, 1047 insertions, 0 deletions
diff --git a/vendor/code.superseriousbusiness.org/activity/pub/side_effect_actor.go b/vendor/code.superseriousbusiness.org/activity/pub/side_effect_actor.go
new file mode 100644
index 000000000..820dca403
--- /dev/null
+++ b/vendor/code.superseriousbusiness.org/activity/pub/side_effect_actor.go
@@ -0,0 +1,1047 @@
+package pub
+
+import (
+ "context"
+ "fmt"
+ "net/http"
+ "net/url"
+
+ "code.superseriousbusiness.org/activity/streams"
+ "code.superseriousbusiness.org/activity/streams/vocab"
+)
+
+// sideEffectActor must satisfy the DelegateActor interface.
+var _ DelegateActor = &SideEffectActor{}
+
+// SideEffectActor is a DelegateActor that handles the ActivityPub
+// implementation side effects, but requires a more opinionated application to
+// be written.
+//
+// Note that when using the SideEffectActor with an application that good-faith
+// implements its required interfaces, the ActivityPub specification is
+// guaranteed to be correctly followed.
+type SideEffectActor struct {
+ // When doing deliveries to remote servers via the s2s protocol, the side effect
+ // actor will by default use the Serialize function from the streams package.
+ // However, this can be overridden after the side effect actor is intantiated,
+ // by setting the exposed Serialize function on the struct. For example:
+ //
+ // a := NewSideEffectActor(...)
+ // a.Serialize = func(a vocab.Type) (m map[string]interface{}, e error) {
+ // // Put your custom serializer logic here.
+ // }
+ //
+ // Note that you should only do this *immediately* after instantiating the side
+ // effect actor -- never while your application is already running, as this will
+ // likely cause race conditions or other problems! In most cases, you will never
+ // need to change this; it's provided solely to allow easier customization by
+ // applications.
+ Serialize func(a vocab.Type) (m map[string]interface{}, e error)
+
+ // When doing deliveries to remote servers via the s2s protocol, it may be desirable
+ // for implementations to be able to pre-sort recipients so that higher-priority
+ // recipients are higher up in the delivery queue, and lower-priority recipients
+ // are further down. This can be achieved by setting the DeliveryRecipientPreSort
+ // function on the side effect actor after it's instantiated. For example:
+ //
+ // a := NewSideEffectActor(...)
+ // a.DeliveryRecipientPreSort = func(actorAndCollectionIRIs []*url.URL) []*url.URL {
+ // // Put your sorting logic here.
+ // }
+ //
+ // The actorAndCollectionIRIs parameter will be the initial list of IRIs derived by
+ // looking at the "to", "cc", "bto", "bcc", and "audience" properties of the activity
+ // being delivered, excluding the AP public IRI, and before dereferencing of inboxes.
+ // It may look something like this:
+ //
+ // [
+ // "https://example.org/users/someone/followers", // <-- collection IRI
+ // "https://another.example.org/users/someone_else", // <-- actor IRI
+ // "[...]" // <-- etc
+ // ]
+ //
+ // In this case, implementers may wish to sort the slice so that the directly-addressed
+ // actor "https://another.example.org/users/someone_else" occurs at an earlier index in
+ // the slice than the followers collection "https://example.org/users/someone/followers",
+ // so that "@someone_else" receives the delivery first.
+ //
+ // Note that you should only do this *immediately* after instantiating the side
+ // effect actor -- never while your application is already running, as this will
+ // likely cause race conditions or other problems! It's also completely fine to not
+ // set this function at all -- in this case, no pre-sorting of recipients will be
+ // performed, and delivery will occur in a non-determinate order.
+ DeliveryRecipientPreSort func(actorAndCollectionIRIs []*url.URL) []*url.URL
+
+ common CommonBehavior
+ s2s FederatingProtocol
+ c2s SocialProtocol
+ db Database
+ clock Clock
+}
+
+// NewSideEffectActor returns a new SideEffectActor, which satisfies the
+// DelegateActor interface. Most of the time you will not need to call this
+// function, and should instead rely on the NewSocialActor, NewFederatingActor,
+// and NewActor functions, all of which use a SideEffectActor under the hood.
+// Nevertheless, this function is exposed in case application developers need
+// a SideEffectActor for some other reason (tests, monkey patches, etc).
+//
+// If you are using the returned SideEffectActor for federation, ensure that s2s
+// is not nil. Likewise, if you are using it for the social protocol, ensure
+// that c2s is not nil.
+func NewSideEffectActor(c CommonBehavior,
+ s2s FederatingProtocol,
+ c2s SocialProtocol,
+ db Database,
+ clock Clock) *SideEffectActor {
+ return &SideEffectActor{
+ Serialize: streams.Serialize,
+ common: c,
+ s2s: s2s,
+ c2s: c2s,
+ db: db,
+ clock: clock,
+ }
+}
+
+// PostInboxRequestBodyHook defers to the delegate.
+func (a *SideEffectActor) PostInboxRequestBodyHook(c context.Context, r *http.Request, activity Activity) (context.Context, error) {
+ return a.s2s.PostInboxRequestBodyHook(c, r, activity)
+}
+
+// PostOutboxRequestBodyHook defers to the delegate.
+func (a *SideEffectActor) PostOutboxRequestBodyHook(c context.Context, r *http.Request, data vocab.Type) (context.Context, error) {
+ return a.c2s.PostOutboxRequestBodyHook(c, r, data)
+}
+
+// AuthenticatePostInbox defers to the delegate to authenticate the request.
+func (a *SideEffectActor) AuthenticatePostInbox(c context.Context, w http.ResponseWriter, r *http.Request) (out context.Context, authenticated bool, err error) {
+ return a.s2s.AuthenticatePostInbox(c, w, r)
+}
+
+// AuthenticateGetInbox defers to the delegate to authenticate the request.
+func (a *SideEffectActor) AuthenticateGetInbox(c context.Context, w http.ResponseWriter, r *http.Request) (out context.Context, authenticated bool, err error) {
+ return a.common.AuthenticateGetInbox(c, w, r)
+}
+
+// AuthenticatePostOutbox defers to the delegate to authenticate the request.
+func (a *SideEffectActor) AuthenticatePostOutbox(c context.Context, w http.ResponseWriter, r *http.Request) (out context.Context, authenticated bool, err error) {
+ return a.c2s.AuthenticatePostOutbox(c, w, r)
+}
+
+// AuthenticateGetOutbox defers to the delegate to authenticate the request.
+func (a *SideEffectActor) AuthenticateGetOutbox(c context.Context, w http.ResponseWriter, r *http.Request) (out context.Context, authenticated bool, err error) {
+ return a.common.AuthenticateGetOutbox(c, w, r)
+}
+
+// GetOutbox delegates to the SocialProtocol.
+func (a *SideEffectActor) GetOutbox(c context.Context, r *http.Request) (vocab.ActivityStreamsOrderedCollectionPage, error) {
+ return a.common.GetOutbox(c, r)
+}
+
+// GetInbox delegates to the FederatingProtocol.
+func (a *SideEffectActor) GetInbox(c context.Context, r *http.Request) (vocab.ActivityStreamsOrderedCollectionPage, error) {
+ return a.s2s.GetInbox(c, r)
+}
+
+// AuthorizePostInbox defers to the federating protocol whether the peer request
+// is authorized based on the actors' ids.
+func (a *SideEffectActor) AuthorizePostInbox(c context.Context, w http.ResponseWriter, activity Activity) (authorized bool, err error) {
+ authorized = false
+ actor := activity.GetActivityStreamsActor()
+ if actor == nil {
+ err = fmt.Errorf("no actors in post to inbox")
+ return
+ }
+ var iris []*url.URL
+ for i := 0; i < actor.Len(); i++ {
+ iter := actor.At(i)
+ if iter.IsIRI() {
+ iris = append(iris, iter.GetIRI())
+ } else if t := iter.GetType(); t != nil {
+ iris = append(iris, activity.GetJSONLDId().Get())
+ } else {
+ err = fmt.Errorf("actor at index %d is missing an id", i)
+ return
+ }
+ }
+ // Determine if the actor(s) sending this request are blocked.
+ var blocked bool
+ if blocked, err = a.s2s.Blocked(c, iris); err != nil {
+ return
+ } else if blocked {
+ w.WriteHeader(http.StatusForbidden)
+ return
+ }
+ authorized = true
+ return
+}
+
+// PostInbox handles the side effects of determining whether to block the peer's
+// request, adding the activity to the actor's inbox, and triggering side
+// effects based on the activity's type.
+func (a *SideEffectActor) PostInbox(c context.Context, inboxIRI *url.URL, activity Activity) error {
+ isNew, err := a.addToInboxIfNew(c, inboxIRI, activity)
+ if err != nil {
+ return err
+ }
+ if isNew {
+ wrapped, other, err := a.s2s.FederatingCallbacks(c)
+ if err != nil {
+ return err
+ }
+ // Populate side channels.
+ wrapped.db = a.db
+ wrapped.inboxIRI = inboxIRI
+ wrapped.newTransport = a.common.NewTransport
+ wrapped.deliver = a.Deliver
+ wrapped.addNewIds = a.AddNewIDs
+ res, err := streams.NewTypeResolver(wrapped.callbacks(other)...)
+ if err != nil {
+ return err
+ }
+ if err = res.Resolve(c, activity); err != nil && !streams.IsUnmatchedErr(err) {
+ return err
+ } else if streams.IsUnmatchedErr(err) {
+ err = a.s2s.DefaultCallback(c, activity)
+ if err != nil {
+ return err
+ }
+ }
+ }
+ return nil
+}
+
+// InboxForwarding implements the 3-part inbox forwarding algorithm specified in
+// the ActivityPub specification. Does not modify the Activity, but may send
+// outbound requests as a side effect.
+//
+// InboxForwarding sets the federated data in the database.
+func (a *SideEffectActor) InboxForwarding(c context.Context, inboxIRI *url.URL, activity Activity) error {
+ // 1. Must be first time we have seen this Activity.
+ //
+ // Obtain the id of the activity
+ id := activity.GetJSONLDId()
+ // Acquire a lock for the id. To be held for the rest of execution.
+ unlock, err := a.db.Lock(c, id.Get())
+ if err != nil {
+ return err
+ }
+ // WARNING: Unlock is not deferred
+ //
+ // If the database already contains the activity, exit early.
+ exists, err := a.db.Exists(c, id.Get())
+ if err != nil {
+ unlock()
+ return err
+ } else if exists {
+ unlock()
+ return nil
+ }
+ // Attempt to create the activity entry.
+ err = a.db.Create(c, activity)
+ unlock() // unlock even on error return
+ if err != nil {
+ return err
+ }
+ // Unlock by this point and in every branch above.
+ //
+ // 2. The values of 'to', 'cc', or 'audience' are Collections owned by
+ // this server.
+ var r []*url.URL
+ to := activity.GetActivityStreamsTo()
+ if to != nil {
+ for iter := to.Begin(); iter != to.End(); iter = iter.Next() {
+ val, err := ToId(iter)
+ if err != nil {
+ return err
+ }
+ r = append(r, val)
+ }
+ }
+ cc := activity.GetActivityStreamsCc()
+ if cc != nil {
+ for iter := cc.Begin(); iter != cc.End(); iter = iter.Next() {
+ val, err := ToId(iter)
+ if err != nil {
+ return err
+ }
+ r = append(r, val)
+ }
+ }
+ audience := activity.GetActivityStreamsAudience()
+ if audience != nil {
+ for iter := audience.Begin(); iter != audience.End(); iter = iter.Next() {
+ val, err := ToId(iter)
+ if err != nil {
+ return err
+ }
+ r = append(r, val)
+ }
+ }
+ // Find all IRIs owned by this server. We need to find all of them so
+ // that forwarding can properly occur.
+ var myIRIs []*url.URL
+ for _, iri := range r {
+ if err != nil {
+ return err
+ }
+ var unlock func()
+ unlock, err = a.db.Lock(c, iri)
+ if err != nil {
+ return err
+ }
+ // WARNING: Unlock is not deferred
+ owns, err := a.db.Owns(c, iri)
+ unlock() // unlock even on error
+ if err != nil {
+ return err
+ } else if !owns {
+ continue
+ }
+ // Unlock by this point and in every branch above.
+ myIRIs = append(myIRIs, iri)
+ }
+ // Finally, load our IRIs to determine if they are a Collection or
+ // OrderedCollection.
+ //
+ // Load the unfiltered IRIs.
+ var colIRIs []*url.URL
+ col := make(map[string]itemser)
+ oCol := make(map[string]orderedItemser)
+ for _, iri := range myIRIs {
+ var unlock func()
+ unlock, err = a.db.Lock(c, iri)
+ if err != nil {
+ return err
+ }
+ // WARNING: Not Unlocked
+ t, err := a.db.Get(c, iri)
+ if err != nil {
+ return err
+ }
+ if streams.IsOrExtendsActivityStreamsOrderedCollection(t) {
+ if im, ok := t.(orderedItemser); ok {
+ oCol[iri.String()] = im
+ colIRIs = append(colIRIs, iri)
+ defer unlock()
+ } else {
+ unlock() // unlock instantly
+ }
+ } else if streams.IsOrExtendsActivityStreamsCollection(t) {
+ if im, ok := t.(itemser); ok {
+ col[iri.String()] = im
+ colIRIs = append(colIRIs, iri)
+ defer unlock()
+ } else {
+ unlock() // unlock instantly
+ }
+ } else {
+ unlock() // unlock instantly
+ }
+ }
+ // If we own none of the Collection IRIs in 'to', 'cc', or 'audience'
+ // then no need to do inbox forwarding. We have nothing to forward to.
+ if len(colIRIs) == 0 {
+ return nil
+ }
+ // 3. The values of 'inReplyTo', 'object', 'target', or 'tag' are owned
+ // by this server. This is only a boolean trigger: As soon as we get
+ // a hit that we own something, then we should do inbox forwarding.
+ maxDepth := a.s2s.MaxInboxForwardingRecursionDepth(c)
+ ownsValue, err := a.hasInboxForwardingValues(c, inboxIRI, activity, maxDepth, 0)
+ if err != nil {
+ return err
+ }
+ // If we don't own any of the 'inReplyTo', 'object', 'target', or 'tag'
+ // values, then no need to do inbox forwarding.
+ if !ownsValue {
+ return nil
+ }
+ // Do the inbox forwarding since the above conditions hold true. Support
+ // the behavior of letting the application filter out the resulting
+ // collections to be targeted.
+ toSend, err := a.s2s.FilterForwarding(c, colIRIs, activity)
+ if err != nil {
+ return err
+ }
+ recipients := make([]*url.URL, 0, len(toSend))
+ for _, iri := range toSend {
+ if c, ok := col[iri.String()]; ok {
+ if it := c.GetActivityStreamsItems(); it != nil {
+ for iter := it.Begin(); iter != it.End(); iter = iter.Next() {
+ id, err := ToId(iter)
+ if err != nil {
+ return err
+ }
+ recipients = append(recipients, id)
+ }
+ }
+ } else if oc, ok := oCol[iri.String()]; ok {
+ if oit := oc.GetActivityStreamsOrderedItems(); oit != nil {
+ for iter := oit.Begin(); iter != oit.End(); iter = iter.Next() {
+ id, err := ToId(iter)
+ if err != nil {
+ return err
+ }
+ recipients = append(recipients, id)
+ }
+ }
+ }
+ }
+ return a.deliverToRecipients(c, inboxIRI, activity, recipients)
+}
+
+// PostOutbox handles the side effects of adding the activity to the actor's
+// outbox, and triggering side effects based on the activity's type.
+//
+// This implementation assumes all types are meant to be delivered except for
+// the ActivityStreams Block type.
+func (a *SideEffectActor) PostOutbox(c context.Context, activity Activity, outboxIRI *url.URL, rawJSON map[string]interface{}) (deliverable bool, err error) {
+ // TODO: Determine this if c2s is nil
+ deliverable = true
+ if a.c2s != nil {
+ var wrapped SocialWrappedCallbacks
+ var other []interface{}
+ wrapped, other, err = a.c2s.SocialCallbacks(c)
+ if err != nil {
+ return
+ }
+ // Populate side channels.
+ wrapped.db = a.db
+ wrapped.outboxIRI = outboxIRI
+ wrapped.rawActivity = rawJSON
+ wrapped.clock = a.clock
+ wrapped.newTransport = a.common.NewTransport
+ undeliverable := false
+ wrapped.undeliverable = &undeliverable
+ var res *streams.TypeResolver
+ res, err = streams.NewTypeResolver(wrapped.callbacks(other)...)
+ if err != nil {
+ return
+ }
+ if err = res.Resolve(c, activity); err != nil && !streams.IsUnmatchedErr(err) {
+ return
+ } else if streams.IsUnmatchedErr(err) {
+ deliverable = true
+ err = a.c2s.DefaultCallback(c, activity)
+ if err != nil {
+ return
+ }
+ } else {
+ deliverable = !undeliverable
+ }
+ }
+ err = a.addToOutbox(c, outboxIRI, activity)
+ return
+}
+
+// AddNewIDs creates new 'id' entries on an activity and its objects if it is a
+// Create activity.
+func (a *SideEffectActor) AddNewIDs(c context.Context, activity Activity) error {
+ id, err := a.db.NewID(c, activity)
+ if err != nil {
+ return err
+ }
+ activityId := streams.NewJSONLDIdProperty()
+ activityId.Set(id)
+ activity.SetJSONLDId(activityId)
+ if streams.IsOrExtendsActivityStreamsCreate(activity) {
+ o, ok := activity.(objecter)
+ if !ok {
+ return fmt.Errorf("cannot add new id for Create: %T has no object property", activity)
+ }
+ if oProp := o.GetActivityStreamsObject(); oProp != nil {
+ for iter := oProp.Begin(); iter != oProp.End(); iter = iter.Next() {
+ t := iter.GetType()
+ if t == nil {
+ return fmt.Errorf("cannot add new id for object in Create: object is not embedded as a value literal")
+ }
+ id, err = a.db.NewID(c, t)
+ if err != nil {
+ return err
+ }
+ objId := streams.NewJSONLDIdProperty()
+ objId.Set(id)
+ t.SetJSONLDId(objId)
+ }
+ }
+ }
+ return nil
+}
+
+// deliver will complete the peer-to-peer sending of a federated message to
+// another server.
+//
+// Must be called if at least the federated protocol is supported.
+func (a *SideEffectActor) Deliver(c context.Context, outboxIRI *url.URL, activity Activity) error {
+ recipients, err := a.prepare(c, outboxIRI, activity)
+ if err != nil {
+ return err
+ }
+ return a.deliverToRecipients(c, outboxIRI, activity, recipients)
+}
+
+// WrapInCreate wraps an object with a Create activity.
+func (a *SideEffectActor) WrapInCreate(c context.Context, obj vocab.Type, outboxIRI *url.URL) (create vocab.ActivityStreamsCreate, err error) {
+ var unlock func()
+ unlock, err = a.db.Lock(c, outboxIRI)
+ if err != nil {
+ return
+ }
+ // WARNING: No deferring the Unlock
+ actorIRI, err := a.db.ActorForOutbox(c, outboxIRI)
+ unlock() // unlock after regardless
+ if err != nil {
+ return
+ }
+ // Unlock the lock at this point and every branch above
+ return wrapInCreate(c, obj, actorIRI)
+}
+
+// deliverToRecipients will take a prepared Activity and send it to specific
+// recipients on behalf of an actor.
+func (a *SideEffectActor) deliverToRecipients(c context.Context, boxIRI *url.URL, activity Activity, recipients []*url.URL) error {
+ // Call whichever serializer is
+ // set on the side effect actor.
+ m, err := a.Serialize(activity)
+ if err != nil {
+ return err
+ }
+
+ tp, err := a.common.NewTransport(c, boxIRI, goFedUserAgent())
+ if err != nil {
+ return err
+ }
+
+ return tp.BatchDeliver(c, m, recipients)
+}
+
+// addToOutbox adds the activity to the outbox and creates the activity in the
+// internal database as its own entry.
+func (a *SideEffectActor) addToOutbox(c context.Context, outboxIRI *url.URL, activity Activity) error {
+ // Set the activity in the database first.
+ id := activity.GetJSONLDId()
+ unlock, err := a.db.Lock(c, id.Get())
+ if err != nil {
+ return err
+ }
+ // WARNING: Unlock not deferred
+ err = a.db.Create(c, activity)
+ unlock() // unlock after regardless
+ if err != nil {
+ return err
+ }
+ // WARNING: Unlock(c, id) should be called by this point and in every
+ // return before here.
+ //
+ // Acquire a lock to read the outbox. Defer release.
+ unlock, err = a.db.Lock(c, outboxIRI)
+ if err != nil {
+ return err
+ }
+ defer unlock()
+ outbox, err := a.db.GetOutbox(c, outboxIRI)
+ if err != nil {
+ return err
+ }
+ // Prepend the activity to the list of 'orderedItems'.
+ oi := outbox.GetActivityStreamsOrderedItems()
+ if oi == nil {
+ oi = streams.NewActivityStreamsOrderedItemsProperty()
+ }
+ oi.PrependIRI(id.Get())
+ outbox.SetActivityStreamsOrderedItems(oi)
+ // Save in the database.
+ err = a.db.SetOutbox(c, outbox)
+ return err
+}
+
+// addToInboxIfNew will add the activity to the inbox at the specified IRI if
+// the activity's ID has not yet been added to the inbox.
+//
+// It does not add the activity to this database's know federated data.
+//
+// Returns true when the activity is novel.
+func (a *SideEffectActor) addToInboxIfNew(c context.Context, inboxIRI *url.URL, activity Activity) (isNew bool, err error) {
+ // Acquire a lock to read the inbox. Defer release.
+ var unlock func()
+ unlock, err = a.db.Lock(c, inboxIRI)
+ if err != nil {
+ return
+ }
+ defer unlock()
+ // Obtain the id of the activity
+ id := activity.GetJSONLDId()
+ // If the inbox already contains the URL, early exit.
+ contains, err := a.db.InboxContains(c, inboxIRI, id.Get())
+ if err != nil {
+ return
+ } else if contains {
+ return
+ }
+ // It is a new id, acquire the inbox.
+ isNew = true
+ inbox, err := a.db.GetInbox(c, inboxIRI)
+ if err != nil {
+ return
+ }
+ // Prepend the activity to the list of 'orderedItems'.
+ oi := inbox.GetActivityStreamsOrderedItems()
+ if oi == nil {
+ oi = streams.NewActivityStreamsOrderedItemsProperty()
+ }
+ oi.PrependIRI(id.Get())
+ inbox.SetActivityStreamsOrderedItems(oi)
+ // Save in the database.
+ err = a.db.SetInbox(c, inbox)
+ return
+}
+
+// Given an ActivityStreams value, recursively examines ownership of the id or
+// href and the ones on properties applicable to inbox forwarding.
+//
+// Recursion may be limited by providing a 'maxDepth' greater than zero. A
+// value of zero or a negative number will result in infinite recursion.
+func (a *SideEffectActor) hasInboxForwardingValues(c context.Context, inboxIRI *url.URL, val vocab.Type, maxDepth, currDepth int) (bool, error) {
+ // Stop recurring if we are exceeding the maximum depth and the maximum
+ // is a positive number.
+ if maxDepth > 0 && currDepth >= maxDepth {
+ return false, nil
+ }
+ // Determine if we own the 'id' of any values on the properties we care
+ // about.
+ types, iris := getInboxForwardingValues(val)
+ // For IRIs, simply check if we own them.
+ for _, iri := range iris {
+ unlock, err := a.db.Lock(c, iri)
+ if err != nil {
+ return false, err
+ }
+ // WARNING: Unlock is not deferred
+ owns, err := a.db.Owns(c, iri)
+ unlock() // unlock after regardless
+ if err != nil {
+ return false, err
+ } else if owns {
+ return true, nil
+ }
+ // Unlock by this point and in every branch above
+ }
+ // For embedded literals, check the id.
+ for _, val := range types {
+ id, err := GetId(val)
+ if err != nil {
+ return false, err
+ }
+ var unlock func()
+ unlock, err = a.db.Lock(c, id)
+ if err != nil {
+ return false, err
+ }
+ // WARNING: Unlock is not deferred
+ owns, err := a.db.Owns(c, id)
+ unlock() // unlock after regardless
+ if err != nil {
+ return false, err
+ } else if owns {
+ return true, nil
+ }
+ // Unlock by this point and in every branch above
+ }
+ // Recur Preparation: Try fetching the IRIs so we can recur into them.
+ for _, iri := range iris {
+ // Dereferencing the IRI.
+ tport, err := a.common.NewTransport(c, inboxIRI, goFedUserAgent())
+ if err != nil {
+ return false, err
+ }
+ resp, err := tport.Dereference(c, iri)
+ if err != nil {
+ // Do not fail the entire process if the data is
+ // missing.
+ continue
+ }
+ m, err := readActivityPubResp(resp)
+ if err != nil {
+ return false, err
+ }
+ t, err := streams.ToType(c, m)
+ if err != nil {
+ // Do not fail the entire process if we cannot handle
+ // the type.
+ continue
+ }
+ types = append(types, t)
+ }
+ // Recur.
+ for _, nextVal := range types {
+ if has, err := a.hasInboxForwardingValues(c, inboxIRI, nextVal, maxDepth, currDepth+1); err != nil {
+ return false, err
+ } else if has {
+ return true, nil
+ }
+ }
+ return false, nil
+}
+
+// prepare takes a deliverableObject and returns a list of the final
+// recipient inbox IRIs. Additionally, the deliverableObject will have
+// any hidden hidden recipients ("bto" and "bcc") stripped from it.
+//
+// Only call if both the social and federated protocol are supported.
+func (a *SideEffectActor) prepare(
+ ctx context.Context,
+ outboxIRI *url.URL,
+ activity Activity,
+) ([]*url.URL, error) {
+ // Iterate through to, bto, cc, bcc, and audience
+ // to extract a slice of addressee IRIs / IDs.
+ //
+ // The resulting slice might look something like:
+ //
+ // [
+ // "https://example.org/users/someone/followers", // <-- collection IRI
+ // "https://another.example.org/users/someone_else", // <-- actor IRI
+ // "[...]" // <-- etc
+ // ]
+ var actorsAndCollections []*url.URL
+ if to := activity.GetActivityStreamsTo(); to != nil {
+ for iter := to.Begin(); iter != to.End(); iter = iter.Next() {
+ var err error
+ actorsAndCollections, err = appendToActorsAndCollectionsIRIs(
+ iter, actorsAndCollections,
+ )
+ if err != nil {
+ return nil, err
+ }
+ }
+ }
+
+ if bto := activity.GetActivityStreamsBto(); bto != nil {
+ for iter := bto.Begin(); iter != bto.End(); iter = iter.Next() {
+ var err error
+ actorsAndCollections, err = appendToActorsAndCollectionsIRIs(
+ iter, actorsAndCollections,
+ )
+ if err != nil {
+ return nil, err
+ }
+ }
+ }
+
+ if cc := activity.GetActivityStreamsCc(); cc != nil {
+ for iter := cc.Begin(); iter != cc.End(); iter = iter.Next() {
+ var err error
+ actorsAndCollections, err = appendToActorsAndCollectionsIRIs(
+ iter, actorsAndCollections,
+ )
+ if err != nil {
+ return nil, err
+ }
+ }
+ }
+
+ if bcc := activity.GetActivityStreamsBcc(); bcc != nil {
+ for iter := bcc.Begin(); iter != bcc.End(); iter = iter.Next() {
+ var err error
+ actorsAndCollections, err = appendToActorsAndCollectionsIRIs(
+ iter, actorsAndCollections,
+ )
+ if err != nil {
+ return nil, err
+ }
+ }
+ }
+
+ if audience := activity.GetActivityStreamsAudience(); audience != nil {
+ for iter := audience.Begin(); iter != audience.End(); iter = iter.Next() {
+ var err error
+ actorsAndCollections, err = appendToActorsAndCollectionsIRIs(
+ iter, actorsAndCollections,
+ )
+ if err != nil {
+ return nil, err
+ }
+ }
+ }
+
+ // PRE-SORTING
+
+ // If the pre-delivery sort function is defined,
+ // call it now so that implementations can sort
+ // delivery order to their preferences.
+ if a.DeliveryRecipientPreSort != nil {
+ actorsAndCollections = a.DeliveryRecipientPreSort(actorsAndCollections)
+ }
+
+ // We now need to dereference the actor or collection
+ // IRIs to derive inboxes that we can POST requests to.
+ var (
+ inboxes = make([]*url.URL, 0, len(actorsAndCollections))
+ derefdEntries = make(map[string]struct{}, len(actorsAndCollections))
+ )
+
+ // First check if the implemented database logic
+ // can return any of these inboxes without having
+ // to make remote dereference calls (much cheaper).
+ for _, actorOrCollection := range actorsAndCollections {
+ actorOrCollectionStr := actorOrCollection.String()
+ if _, derefd := derefdEntries[actorOrCollectionStr]; derefd {
+ // Ignore potential duplicates
+ // we've already derefd to inbox(es).
+ continue
+ }
+
+ // BEGIN LOCK
+ unlock, err := a.db.Lock(ctx, actorOrCollection)
+ if err != nil {
+ return nil, err
+ }
+
+ // Try to get inbox(es) for this actor or collection.
+ gotInboxes, err := a.db.InboxesForIRI(ctx, actorOrCollection)
+
+ // END LOCK
+ unlock()
+
+ if err != nil {
+ return nil, err
+ }
+
+ if len(gotInboxes) == 0 {
+ // No hit(s).
+ continue
+ }
+
+ // We have one or more hits.
+ inboxes = append(inboxes, gotInboxes...)
+
+ // Mark this actor or collection as deref'd.
+ derefdEntries[actorOrCollectionStr] = struct{}{}
+ }
+
+ // Now look for any remaining actors/collections
+ // that weren't already dereferenced into inboxes
+ // with db calls; find these by making deref calls
+ // to remote instances.
+ //
+ // First get a transport to do the http calls.
+ t, err := a.common.NewTransport(ctx, outboxIRI, goFedUserAgent())
+ if err != nil {
+ return nil, err
+ }
+
+ // Make HTTP calls to unpack collection IRIs into
+ // Actor IRIs and then into Actor types, ignoring
+ // actors or collections we've already deref'd.
+ actorsFromRemote, err := a.resolveActors(
+ ctx,
+ t,
+ actorsAndCollections,
+ derefdEntries,
+ 0, a.s2s.MaxDeliveryRecursionDepth(ctx),
+ )
+ if err != nil {
+ return nil, err
+ }
+
+ // Release no-longer-needed collections.
+ clear(derefdEntries)
+ clear(actorsAndCollections)
+
+ // Extract inbox IRI from each deref'd Actor (if any).
+ inboxesFromRemote, err := actorsToInboxIRIs(actorsFromRemote)
+ if err != nil {
+ return nil, err
+ }
+
+ // Combine db-discovered inboxes and remote-discovered
+ // inboxes into a final list of destination inboxes.
+ inboxes = append(inboxes, inboxesFromRemote...)
+
+ // POST FILTERING
+
+ // Do a final pass of the inboxes to:
+ //
+ // 1. Deduplicate entries.
+ // 2. Ensure that the list of inboxes doesn't
+ // contain the inbox of whoever the outbox
+ // belongs to, no point delivering to oneself.
+ //
+ // To do this we first need to get the
+ // inbox IRI of this outbox's Actor.
+
+ // BEGIN LOCK
+ unlock, err := a.db.Lock(ctx, outboxIRI)
+ if err != nil {
+ return nil, err
+ }
+
+ // Get the IRI of the Actor who owns this outbox.
+ outboxActorIRI, err := a.db.ActorForOutbox(ctx, outboxIRI)
+
+ // END LOCK
+ unlock()
+
+ if err != nil {
+ return nil, err
+ }
+
+ // BEGIN LOCK
+ unlock, err = a.db.Lock(ctx, outboxActorIRI)
+ if err != nil {
+ return nil, err
+ }
+
+ // Now get the Actor who owns this outbox.
+ outboxActor, err := a.db.Get(ctx, outboxActorIRI)
+
+ // END LOCK
+ unlock()
+
+ if err != nil {
+ return nil, err
+ }
+
+ // Extract the inbox IRI for the outbox Actor.
+ inboxOfOutboxActor, err := getInbox(outboxActor)
+ if err != nil {
+ return nil, err
+ }
+
+ // Deduplicate the final inboxes slice, and filter
+ // out of the inbox of this outbox actor (if present).
+ inboxes = filterInboxIRIs(inboxes, inboxOfOutboxActor)
+
+ // Now that we've derived inboxes to deliver
+ // the activity to, strip off any bto or bcc
+ // recipients, as per the AP spec requirements.
+ stripHiddenRecipients(activity)
+
+ // All done!
+ return inboxes, nil
+}
+
+// resolveActors takes a list of Actor id URIs and returns them as concrete
+// instances of actorObject. It attempts to apply recursively when it encounters
+// a target that is a Collection or OrderedCollection.
+//
+// Any IRI strings in the ignores map will be skipped (use this when
+// you've already dereferenced some of the actorAndCollectionIRIs).
+//
+// If maxDepth is zero or negative, then recursion is infinitely applied.
+//
+// If a recipient is a Collection or OrderedCollection, then the server MUST
+// dereference the collection, WITH the user's credentials.
+//
+// Note that this also applies to CollectionPage and OrderedCollectionPage.
+func (a *SideEffectActor) resolveActors(
+ ctx context.Context,
+ t Transport,
+ actorAndCollectionIRIs []*url.URL,
+ ignores map[string]struct{},
+ depth, maxDepth int,
+) ([]vocab.Type, error) {
+ if maxDepth > 0 && depth >= maxDepth {
+ // Hit our max depth.
+ return nil, nil
+ }
+
+ if len(actorAndCollectionIRIs) == 0 {
+ // Nothing to do.
+ return nil, nil
+ }
+
+ // Optimistically assume 1:1 mapping of IRIs to actors.
+ actors := make([]vocab.Type, 0, len(actorAndCollectionIRIs))
+
+ // Deref each actorOrCollectionIRI if not ignored.
+ for _, actorOrCollectionIRI := range actorAndCollectionIRIs {
+ _, ignore := ignores[actorOrCollectionIRI.String()]
+ if ignore {
+ // Don't try to
+ // deref this one.
+ continue
+ }
+
+ // TODO: Determine if more logic is needed here for
+ // inaccessible collections owned by peer servers.
+ actor, more, err := a.dereferenceForResolvingInboxes(ctx, t, actorOrCollectionIRI)
+ if err != nil {
+ // Missing recipient -- skip.
+ continue
+ }
+
+ if actor != nil {
+ // Got a hit.
+ actors = append(actors, actor)
+ }
+
+ // If this was a collection, get more.
+ recurActors, err := a.resolveActors(
+ ctx,
+ t,
+ more,
+ ignores,
+ depth+1, maxDepth,
+ )
+ if err != nil {
+ return nil, err
+ }
+
+ actors = append(actors, recurActors...)
+ }
+
+ return actors, nil
+}
+
+// dereferenceForResolvingInboxes dereferences an IRI solely for finding an
+// actor's inbox IRI to deliver to.
+//
+// The returned actor could be nil, if it wasn't an actor (ex: a Collection or
+// OrderedCollection).
+func (a *SideEffectActor) dereferenceForResolvingInboxes(c context.Context, t Transport, actorIRI *url.URL) (actor vocab.Type, moreActorIRIs []*url.URL, err error) {
+ var resp *http.Response
+ resp, err = t.Dereference(c, actorIRI)
+ if err != nil {
+ return
+ }
+ var m map[string]interface{}
+ m, err = readActivityPubResp(resp)
+ if err != nil {
+ return
+ }
+ actor, err = streams.ToType(c, m)
+ if err != nil {
+ return
+ }
+ // Attempt to see if the 'actor' is really some sort of type that has
+ // an 'items' or 'orderedItems' property.
+ if v, ok := actor.(itemser); ok {
+ if i := v.GetActivityStreamsItems(); i != nil {
+ for iter := i.Begin(); iter != i.End(); iter = iter.Next() {
+ var id *url.URL
+ id, err = ToId(iter)
+ if err != nil {
+ return
+ }
+ moreActorIRIs = append(moreActorIRIs, id)
+ }
+ }
+ actor = nil
+ } else if v, ok := actor.(orderedItemser); ok {
+ if i := v.GetActivityStreamsOrderedItems(); i != nil {
+ for iter := i.Begin(); iter != i.End(); iter = iter.Next() {
+ var id *url.URL
+ id, err = ToId(iter)
+ if err != nil {
+ return
+ }
+ moreActorIRIs = append(moreActorIRIs, id)
+ }
+ }
+ actor = nil
+ }
+ return
+}