diff options
| author | 2025-04-25 15:15:36 +0200 | |
|---|---|---|
| committer | 2025-04-25 15:15:36 +0200 | |
| commit | ffde1b150faca940bc6c172068aa068cf468aa39 (patch) | |
| tree | 2b325bf50946b95502d948d5700c148d667346d8 /vendor/code.superseriousbusiness.org/activity/pub/side_effect_actor.go | |
| parent | [chore] Update `activity` to v1.14.0 (#4038) (diff) | |
| download | gotosocial-ffde1b150faca940bc6c172068aa068cf468aa39.tar.xz | |
[chore] Move deps to code.superseriousbusiness.org (#4054)
Diffstat (limited to 'vendor/code.superseriousbusiness.org/activity/pub/side_effect_actor.go')
| -rw-r--r-- | vendor/code.superseriousbusiness.org/activity/pub/side_effect_actor.go | 1047 |
1 files changed, 1047 insertions, 0 deletions
diff --git a/vendor/code.superseriousbusiness.org/activity/pub/side_effect_actor.go b/vendor/code.superseriousbusiness.org/activity/pub/side_effect_actor.go new file mode 100644 index 000000000..820dca403 --- /dev/null +++ b/vendor/code.superseriousbusiness.org/activity/pub/side_effect_actor.go @@ -0,0 +1,1047 @@ +package pub + +import ( + "context" + "fmt" + "net/http" + "net/url" + + "code.superseriousbusiness.org/activity/streams" + "code.superseriousbusiness.org/activity/streams/vocab" +) + +// sideEffectActor must satisfy the DelegateActor interface. +var _ DelegateActor = &SideEffectActor{} + +// SideEffectActor is a DelegateActor that handles the ActivityPub +// implementation side effects, but requires a more opinionated application to +// be written. +// +// Note that when using the SideEffectActor with an application that good-faith +// implements its required interfaces, the ActivityPub specification is +// guaranteed to be correctly followed. +type SideEffectActor struct { + // When doing deliveries to remote servers via the s2s protocol, the side effect + // actor will by default use the Serialize function from the streams package. + // However, this can be overridden after the side effect actor is intantiated, + // by setting the exposed Serialize function on the struct. For example: + // + // a := NewSideEffectActor(...) + // a.Serialize = func(a vocab.Type) (m map[string]interface{}, e error) { + // // Put your custom serializer logic here. + // } + // + // Note that you should only do this *immediately* after instantiating the side + // effect actor -- never while your application is already running, as this will + // likely cause race conditions or other problems! In most cases, you will never + // need to change this; it's provided solely to allow easier customization by + // applications. + Serialize func(a vocab.Type) (m map[string]interface{}, e error) + + // When doing deliveries to remote servers via the s2s protocol, it may be desirable + // for implementations to be able to pre-sort recipients so that higher-priority + // recipients are higher up in the delivery queue, and lower-priority recipients + // are further down. This can be achieved by setting the DeliveryRecipientPreSort + // function on the side effect actor after it's instantiated. For example: + // + // a := NewSideEffectActor(...) + // a.DeliveryRecipientPreSort = func(actorAndCollectionIRIs []*url.URL) []*url.URL { + // // Put your sorting logic here. + // } + // + // The actorAndCollectionIRIs parameter will be the initial list of IRIs derived by + // looking at the "to", "cc", "bto", "bcc", and "audience" properties of the activity + // being delivered, excluding the AP public IRI, and before dereferencing of inboxes. + // It may look something like this: + // + // [ + // "https://example.org/users/someone/followers", // <-- collection IRI + // "https://another.example.org/users/someone_else", // <-- actor IRI + // "[...]" // <-- etc + // ] + // + // In this case, implementers may wish to sort the slice so that the directly-addressed + // actor "https://another.example.org/users/someone_else" occurs at an earlier index in + // the slice than the followers collection "https://example.org/users/someone/followers", + // so that "@someone_else" receives the delivery first. + // + // Note that you should only do this *immediately* after instantiating the side + // effect actor -- never while your application is already running, as this will + // likely cause race conditions or other problems! It's also completely fine to not + // set this function at all -- in this case, no pre-sorting of recipients will be + // performed, and delivery will occur in a non-determinate order. + DeliveryRecipientPreSort func(actorAndCollectionIRIs []*url.URL) []*url.URL + + common CommonBehavior + s2s FederatingProtocol + c2s SocialProtocol + db Database + clock Clock +} + +// NewSideEffectActor returns a new SideEffectActor, which satisfies the +// DelegateActor interface. Most of the time you will not need to call this +// function, and should instead rely on the NewSocialActor, NewFederatingActor, +// and NewActor functions, all of which use a SideEffectActor under the hood. +// Nevertheless, this function is exposed in case application developers need +// a SideEffectActor for some other reason (tests, monkey patches, etc). +// +// If you are using the returned SideEffectActor for federation, ensure that s2s +// is not nil. Likewise, if you are using it for the social protocol, ensure +// that c2s is not nil. +func NewSideEffectActor(c CommonBehavior, + s2s FederatingProtocol, + c2s SocialProtocol, + db Database, + clock Clock) *SideEffectActor { + return &SideEffectActor{ + Serialize: streams.Serialize, + common: c, + s2s: s2s, + c2s: c2s, + db: db, + clock: clock, + } +} + +// PostInboxRequestBodyHook defers to the delegate. +func (a *SideEffectActor) PostInboxRequestBodyHook(c context.Context, r *http.Request, activity Activity) (context.Context, error) { + return a.s2s.PostInboxRequestBodyHook(c, r, activity) +} + +// PostOutboxRequestBodyHook defers to the delegate. +func (a *SideEffectActor) PostOutboxRequestBodyHook(c context.Context, r *http.Request, data vocab.Type) (context.Context, error) { + return a.c2s.PostOutboxRequestBodyHook(c, r, data) +} + +// AuthenticatePostInbox defers to the delegate to authenticate the request. +func (a *SideEffectActor) AuthenticatePostInbox(c context.Context, w http.ResponseWriter, r *http.Request) (out context.Context, authenticated bool, err error) { + return a.s2s.AuthenticatePostInbox(c, w, r) +} + +// AuthenticateGetInbox defers to the delegate to authenticate the request. +func (a *SideEffectActor) AuthenticateGetInbox(c context.Context, w http.ResponseWriter, r *http.Request) (out context.Context, authenticated bool, err error) { + return a.common.AuthenticateGetInbox(c, w, r) +} + +// AuthenticatePostOutbox defers to the delegate to authenticate the request. +func (a *SideEffectActor) AuthenticatePostOutbox(c context.Context, w http.ResponseWriter, r *http.Request) (out context.Context, authenticated bool, err error) { + return a.c2s.AuthenticatePostOutbox(c, w, r) +} + +// AuthenticateGetOutbox defers to the delegate to authenticate the request. +func (a *SideEffectActor) AuthenticateGetOutbox(c context.Context, w http.ResponseWriter, r *http.Request) (out context.Context, authenticated bool, err error) { + return a.common.AuthenticateGetOutbox(c, w, r) +} + +// GetOutbox delegates to the SocialProtocol. +func (a *SideEffectActor) GetOutbox(c context.Context, r *http.Request) (vocab.ActivityStreamsOrderedCollectionPage, error) { + return a.common.GetOutbox(c, r) +} + +// GetInbox delegates to the FederatingProtocol. +func (a *SideEffectActor) GetInbox(c context.Context, r *http.Request) (vocab.ActivityStreamsOrderedCollectionPage, error) { + return a.s2s.GetInbox(c, r) +} + +// AuthorizePostInbox defers to the federating protocol whether the peer request +// is authorized based on the actors' ids. +func (a *SideEffectActor) AuthorizePostInbox(c context.Context, w http.ResponseWriter, activity Activity) (authorized bool, err error) { + authorized = false + actor := activity.GetActivityStreamsActor() + if actor == nil { + err = fmt.Errorf("no actors in post to inbox") + return + } + var iris []*url.URL + for i := 0; i < actor.Len(); i++ { + iter := actor.At(i) + if iter.IsIRI() { + iris = append(iris, iter.GetIRI()) + } else if t := iter.GetType(); t != nil { + iris = append(iris, activity.GetJSONLDId().Get()) + } else { + err = fmt.Errorf("actor at index %d is missing an id", i) + return + } + } + // Determine if the actor(s) sending this request are blocked. + var blocked bool + if blocked, err = a.s2s.Blocked(c, iris); err != nil { + return + } else if blocked { + w.WriteHeader(http.StatusForbidden) + return + } + authorized = true + return +} + +// PostInbox handles the side effects of determining whether to block the peer's +// request, adding the activity to the actor's inbox, and triggering side +// effects based on the activity's type. +func (a *SideEffectActor) PostInbox(c context.Context, inboxIRI *url.URL, activity Activity) error { + isNew, err := a.addToInboxIfNew(c, inboxIRI, activity) + if err != nil { + return err + } + if isNew { + wrapped, other, err := a.s2s.FederatingCallbacks(c) + if err != nil { + return err + } + // Populate side channels. + wrapped.db = a.db + wrapped.inboxIRI = inboxIRI + wrapped.newTransport = a.common.NewTransport + wrapped.deliver = a.Deliver + wrapped.addNewIds = a.AddNewIDs + res, err := streams.NewTypeResolver(wrapped.callbacks(other)...) + if err != nil { + return err + } + if err = res.Resolve(c, activity); err != nil && !streams.IsUnmatchedErr(err) { + return err + } else if streams.IsUnmatchedErr(err) { + err = a.s2s.DefaultCallback(c, activity) + if err != nil { + return err + } + } + } + return nil +} + +// InboxForwarding implements the 3-part inbox forwarding algorithm specified in +// the ActivityPub specification. Does not modify the Activity, but may send +// outbound requests as a side effect. +// +// InboxForwarding sets the federated data in the database. +func (a *SideEffectActor) InboxForwarding(c context.Context, inboxIRI *url.URL, activity Activity) error { + // 1. Must be first time we have seen this Activity. + // + // Obtain the id of the activity + id := activity.GetJSONLDId() + // Acquire a lock for the id. To be held for the rest of execution. + unlock, err := a.db.Lock(c, id.Get()) + if err != nil { + return err + } + // WARNING: Unlock is not deferred + // + // If the database already contains the activity, exit early. + exists, err := a.db.Exists(c, id.Get()) + if err != nil { + unlock() + return err + } else if exists { + unlock() + return nil + } + // Attempt to create the activity entry. + err = a.db.Create(c, activity) + unlock() // unlock even on error return + if err != nil { + return err + } + // Unlock by this point and in every branch above. + // + // 2. The values of 'to', 'cc', or 'audience' are Collections owned by + // this server. + var r []*url.URL + to := activity.GetActivityStreamsTo() + if to != nil { + for iter := to.Begin(); iter != to.End(); iter = iter.Next() { + val, err := ToId(iter) + if err != nil { + return err + } + r = append(r, val) + } + } + cc := activity.GetActivityStreamsCc() + if cc != nil { + for iter := cc.Begin(); iter != cc.End(); iter = iter.Next() { + val, err := ToId(iter) + if err != nil { + return err + } + r = append(r, val) + } + } + audience := activity.GetActivityStreamsAudience() + if audience != nil { + for iter := audience.Begin(); iter != audience.End(); iter = iter.Next() { + val, err := ToId(iter) + if err != nil { + return err + } + r = append(r, val) + } + } + // Find all IRIs owned by this server. We need to find all of them so + // that forwarding can properly occur. + var myIRIs []*url.URL + for _, iri := range r { + if err != nil { + return err + } + var unlock func() + unlock, err = a.db.Lock(c, iri) + if err != nil { + return err + } + // WARNING: Unlock is not deferred + owns, err := a.db.Owns(c, iri) + unlock() // unlock even on error + if err != nil { + return err + } else if !owns { + continue + } + // Unlock by this point and in every branch above. + myIRIs = append(myIRIs, iri) + } + // Finally, load our IRIs to determine if they are a Collection or + // OrderedCollection. + // + // Load the unfiltered IRIs. + var colIRIs []*url.URL + col := make(map[string]itemser) + oCol := make(map[string]orderedItemser) + for _, iri := range myIRIs { + var unlock func() + unlock, err = a.db.Lock(c, iri) + if err != nil { + return err + } + // WARNING: Not Unlocked + t, err := a.db.Get(c, iri) + if err != nil { + return err + } + if streams.IsOrExtendsActivityStreamsOrderedCollection(t) { + if im, ok := t.(orderedItemser); ok { + oCol[iri.String()] = im + colIRIs = append(colIRIs, iri) + defer unlock() + } else { + unlock() // unlock instantly + } + } else if streams.IsOrExtendsActivityStreamsCollection(t) { + if im, ok := t.(itemser); ok { + col[iri.String()] = im + colIRIs = append(colIRIs, iri) + defer unlock() + } else { + unlock() // unlock instantly + } + } else { + unlock() // unlock instantly + } + } + // If we own none of the Collection IRIs in 'to', 'cc', or 'audience' + // then no need to do inbox forwarding. We have nothing to forward to. + if len(colIRIs) == 0 { + return nil + } + // 3. The values of 'inReplyTo', 'object', 'target', or 'tag' are owned + // by this server. This is only a boolean trigger: As soon as we get + // a hit that we own something, then we should do inbox forwarding. + maxDepth := a.s2s.MaxInboxForwardingRecursionDepth(c) + ownsValue, err := a.hasInboxForwardingValues(c, inboxIRI, activity, maxDepth, 0) + if err != nil { + return err + } + // If we don't own any of the 'inReplyTo', 'object', 'target', or 'tag' + // values, then no need to do inbox forwarding. + if !ownsValue { + return nil + } + // Do the inbox forwarding since the above conditions hold true. Support + // the behavior of letting the application filter out the resulting + // collections to be targeted. + toSend, err := a.s2s.FilterForwarding(c, colIRIs, activity) + if err != nil { + return err + } + recipients := make([]*url.URL, 0, len(toSend)) + for _, iri := range toSend { + if c, ok := col[iri.String()]; ok { + if it := c.GetActivityStreamsItems(); it != nil { + for iter := it.Begin(); iter != it.End(); iter = iter.Next() { + id, err := ToId(iter) + if err != nil { + return err + } + recipients = append(recipients, id) + } + } + } else if oc, ok := oCol[iri.String()]; ok { + if oit := oc.GetActivityStreamsOrderedItems(); oit != nil { + for iter := oit.Begin(); iter != oit.End(); iter = iter.Next() { + id, err := ToId(iter) + if err != nil { + return err + } + recipients = append(recipients, id) + } + } + } + } + return a.deliverToRecipients(c, inboxIRI, activity, recipients) +} + +// PostOutbox handles the side effects of adding the activity to the actor's +// outbox, and triggering side effects based on the activity's type. +// +// This implementation assumes all types are meant to be delivered except for +// the ActivityStreams Block type. +func (a *SideEffectActor) PostOutbox(c context.Context, activity Activity, outboxIRI *url.URL, rawJSON map[string]interface{}) (deliverable bool, err error) { + // TODO: Determine this if c2s is nil + deliverable = true + if a.c2s != nil { + var wrapped SocialWrappedCallbacks + var other []interface{} + wrapped, other, err = a.c2s.SocialCallbacks(c) + if err != nil { + return + } + // Populate side channels. + wrapped.db = a.db + wrapped.outboxIRI = outboxIRI + wrapped.rawActivity = rawJSON + wrapped.clock = a.clock + wrapped.newTransport = a.common.NewTransport + undeliverable := false + wrapped.undeliverable = &undeliverable + var res *streams.TypeResolver + res, err = streams.NewTypeResolver(wrapped.callbacks(other)...) + if err != nil { + return + } + if err = res.Resolve(c, activity); err != nil && !streams.IsUnmatchedErr(err) { + return + } else if streams.IsUnmatchedErr(err) { + deliverable = true + err = a.c2s.DefaultCallback(c, activity) + if err != nil { + return + } + } else { + deliverable = !undeliverable + } + } + err = a.addToOutbox(c, outboxIRI, activity) + return +} + +// AddNewIDs creates new 'id' entries on an activity and its objects if it is a +// Create activity. +func (a *SideEffectActor) AddNewIDs(c context.Context, activity Activity) error { + id, err := a.db.NewID(c, activity) + if err != nil { + return err + } + activityId := streams.NewJSONLDIdProperty() + activityId.Set(id) + activity.SetJSONLDId(activityId) + if streams.IsOrExtendsActivityStreamsCreate(activity) { + o, ok := activity.(objecter) + if !ok { + return fmt.Errorf("cannot add new id for Create: %T has no object property", activity) + } + if oProp := o.GetActivityStreamsObject(); oProp != nil { + for iter := oProp.Begin(); iter != oProp.End(); iter = iter.Next() { + t := iter.GetType() + if t == nil { + return fmt.Errorf("cannot add new id for object in Create: object is not embedded as a value literal") + } + id, err = a.db.NewID(c, t) + if err != nil { + return err + } + objId := streams.NewJSONLDIdProperty() + objId.Set(id) + t.SetJSONLDId(objId) + } + } + } + return nil +} + +// deliver will complete the peer-to-peer sending of a federated message to +// another server. +// +// Must be called if at least the federated protocol is supported. +func (a *SideEffectActor) Deliver(c context.Context, outboxIRI *url.URL, activity Activity) error { + recipients, err := a.prepare(c, outboxIRI, activity) + if err != nil { + return err + } + return a.deliverToRecipients(c, outboxIRI, activity, recipients) +} + +// WrapInCreate wraps an object with a Create activity. +func (a *SideEffectActor) WrapInCreate(c context.Context, obj vocab.Type, outboxIRI *url.URL) (create vocab.ActivityStreamsCreate, err error) { + var unlock func() + unlock, err = a.db.Lock(c, outboxIRI) + if err != nil { + return + } + // WARNING: No deferring the Unlock + actorIRI, err := a.db.ActorForOutbox(c, outboxIRI) + unlock() // unlock after regardless + if err != nil { + return + } + // Unlock the lock at this point and every branch above + return wrapInCreate(c, obj, actorIRI) +} + +// deliverToRecipients will take a prepared Activity and send it to specific +// recipients on behalf of an actor. +func (a *SideEffectActor) deliverToRecipients(c context.Context, boxIRI *url.URL, activity Activity, recipients []*url.URL) error { + // Call whichever serializer is + // set on the side effect actor. + m, err := a.Serialize(activity) + if err != nil { + return err + } + + tp, err := a.common.NewTransport(c, boxIRI, goFedUserAgent()) + if err != nil { + return err + } + + return tp.BatchDeliver(c, m, recipients) +} + +// addToOutbox adds the activity to the outbox and creates the activity in the +// internal database as its own entry. +func (a *SideEffectActor) addToOutbox(c context.Context, outboxIRI *url.URL, activity Activity) error { + // Set the activity in the database first. + id := activity.GetJSONLDId() + unlock, err := a.db.Lock(c, id.Get()) + if err != nil { + return err + } + // WARNING: Unlock not deferred + err = a.db.Create(c, activity) + unlock() // unlock after regardless + if err != nil { + return err + } + // WARNING: Unlock(c, id) should be called by this point and in every + // return before here. + // + // Acquire a lock to read the outbox. Defer release. + unlock, err = a.db.Lock(c, outboxIRI) + if err != nil { + return err + } + defer unlock() + outbox, err := a.db.GetOutbox(c, outboxIRI) + if err != nil { + return err + } + // Prepend the activity to the list of 'orderedItems'. + oi := outbox.GetActivityStreamsOrderedItems() + if oi == nil { + oi = streams.NewActivityStreamsOrderedItemsProperty() + } + oi.PrependIRI(id.Get()) + outbox.SetActivityStreamsOrderedItems(oi) + // Save in the database. + err = a.db.SetOutbox(c, outbox) + return err +} + +// addToInboxIfNew will add the activity to the inbox at the specified IRI if +// the activity's ID has not yet been added to the inbox. +// +// It does not add the activity to this database's know federated data. +// +// Returns true when the activity is novel. +func (a *SideEffectActor) addToInboxIfNew(c context.Context, inboxIRI *url.URL, activity Activity) (isNew bool, err error) { + // Acquire a lock to read the inbox. Defer release. + var unlock func() + unlock, err = a.db.Lock(c, inboxIRI) + if err != nil { + return + } + defer unlock() + // Obtain the id of the activity + id := activity.GetJSONLDId() + // If the inbox already contains the URL, early exit. + contains, err := a.db.InboxContains(c, inboxIRI, id.Get()) + if err != nil { + return + } else if contains { + return + } + // It is a new id, acquire the inbox. + isNew = true + inbox, err := a.db.GetInbox(c, inboxIRI) + if err != nil { + return + } + // Prepend the activity to the list of 'orderedItems'. + oi := inbox.GetActivityStreamsOrderedItems() + if oi == nil { + oi = streams.NewActivityStreamsOrderedItemsProperty() + } + oi.PrependIRI(id.Get()) + inbox.SetActivityStreamsOrderedItems(oi) + // Save in the database. + err = a.db.SetInbox(c, inbox) + return +} + +// Given an ActivityStreams value, recursively examines ownership of the id or +// href and the ones on properties applicable to inbox forwarding. +// +// Recursion may be limited by providing a 'maxDepth' greater than zero. A +// value of zero or a negative number will result in infinite recursion. +func (a *SideEffectActor) hasInboxForwardingValues(c context.Context, inboxIRI *url.URL, val vocab.Type, maxDepth, currDepth int) (bool, error) { + // Stop recurring if we are exceeding the maximum depth and the maximum + // is a positive number. + if maxDepth > 0 && currDepth >= maxDepth { + return false, nil + } + // Determine if we own the 'id' of any values on the properties we care + // about. + types, iris := getInboxForwardingValues(val) + // For IRIs, simply check if we own them. + for _, iri := range iris { + unlock, err := a.db.Lock(c, iri) + if err != nil { + return false, err + } + // WARNING: Unlock is not deferred + owns, err := a.db.Owns(c, iri) + unlock() // unlock after regardless + if err != nil { + return false, err + } else if owns { + return true, nil + } + // Unlock by this point and in every branch above + } + // For embedded literals, check the id. + for _, val := range types { + id, err := GetId(val) + if err != nil { + return false, err + } + var unlock func() + unlock, err = a.db.Lock(c, id) + if err != nil { + return false, err + } + // WARNING: Unlock is not deferred + owns, err := a.db.Owns(c, id) + unlock() // unlock after regardless + if err != nil { + return false, err + } else if owns { + return true, nil + } + // Unlock by this point and in every branch above + } + // Recur Preparation: Try fetching the IRIs so we can recur into them. + for _, iri := range iris { + // Dereferencing the IRI. + tport, err := a.common.NewTransport(c, inboxIRI, goFedUserAgent()) + if err != nil { + return false, err + } + resp, err := tport.Dereference(c, iri) + if err != nil { + // Do not fail the entire process if the data is + // missing. + continue + } + m, err := readActivityPubResp(resp) + if err != nil { + return false, err + } + t, err := streams.ToType(c, m) + if err != nil { + // Do not fail the entire process if we cannot handle + // the type. + continue + } + types = append(types, t) + } + // Recur. + for _, nextVal := range types { + if has, err := a.hasInboxForwardingValues(c, inboxIRI, nextVal, maxDepth, currDepth+1); err != nil { + return false, err + } else if has { + return true, nil + } + } + return false, nil +} + +// prepare takes a deliverableObject and returns a list of the final +// recipient inbox IRIs. Additionally, the deliverableObject will have +// any hidden hidden recipients ("bto" and "bcc") stripped from it. +// +// Only call if both the social and federated protocol are supported. +func (a *SideEffectActor) prepare( + ctx context.Context, + outboxIRI *url.URL, + activity Activity, +) ([]*url.URL, error) { + // Iterate through to, bto, cc, bcc, and audience + // to extract a slice of addressee IRIs / IDs. + // + // The resulting slice might look something like: + // + // [ + // "https://example.org/users/someone/followers", // <-- collection IRI + // "https://another.example.org/users/someone_else", // <-- actor IRI + // "[...]" // <-- etc + // ] + var actorsAndCollections []*url.URL + if to := activity.GetActivityStreamsTo(); to != nil { + for iter := to.Begin(); iter != to.End(); iter = iter.Next() { + var err error + actorsAndCollections, err = appendToActorsAndCollectionsIRIs( + iter, actorsAndCollections, + ) + if err != nil { + return nil, err + } + } + } + + if bto := activity.GetActivityStreamsBto(); bto != nil { + for iter := bto.Begin(); iter != bto.End(); iter = iter.Next() { + var err error + actorsAndCollections, err = appendToActorsAndCollectionsIRIs( + iter, actorsAndCollections, + ) + if err != nil { + return nil, err + } + } + } + + if cc := activity.GetActivityStreamsCc(); cc != nil { + for iter := cc.Begin(); iter != cc.End(); iter = iter.Next() { + var err error + actorsAndCollections, err = appendToActorsAndCollectionsIRIs( + iter, actorsAndCollections, + ) + if err != nil { + return nil, err + } + } + } + + if bcc := activity.GetActivityStreamsBcc(); bcc != nil { + for iter := bcc.Begin(); iter != bcc.End(); iter = iter.Next() { + var err error + actorsAndCollections, err = appendToActorsAndCollectionsIRIs( + iter, actorsAndCollections, + ) + if err != nil { + return nil, err + } + } + } + + if audience := activity.GetActivityStreamsAudience(); audience != nil { + for iter := audience.Begin(); iter != audience.End(); iter = iter.Next() { + var err error + actorsAndCollections, err = appendToActorsAndCollectionsIRIs( + iter, actorsAndCollections, + ) + if err != nil { + return nil, err + } + } + } + + // PRE-SORTING + + // If the pre-delivery sort function is defined, + // call it now so that implementations can sort + // delivery order to their preferences. + if a.DeliveryRecipientPreSort != nil { + actorsAndCollections = a.DeliveryRecipientPreSort(actorsAndCollections) + } + + // We now need to dereference the actor or collection + // IRIs to derive inboxes that we can POST requests to. + var ( + inboxes = make([]*url.URL, 0, len(actorsAndCollections)) + derefdEntries = make(map[string]struct{}, len(actorsAndCollections)) + ) + + // First check if the implemented database logic + // can return any of these inboxes without having + // to make remote dereference calls (much cheaper). + for _, actorOrCollection := range actorsAndCollections { + actorOrCollectionStr := actorOrCollection.String() + if _, derefd := derefdEntries[actorOrCollectionStr]; derefd { + // Ignore potential duplicates + // we've already derefd to inbox(es). + continue + } + + // BEGIN LOCK + unlock, err := a.db.Lock(ctx, actorOrCollection) + if err != nil { + return nil, err + } + + // Try to get inbox(es) for this actor or collection. + gotInboxes, err := a.db.InboxesForIRI(ctx, actorOrCollection) + + // END LOCK + unlock() + + if err != nil { + return nil, err + } + + if len(gotInboxes) == 0 { + // No hit(s). + continue + } + + // We have one or more hits. + inboxes = append(inboxes, gotInboxes...) + + // Mark this actor or collection as deref'd. + derefdEntries[actorOrCollectionStr] = struct{}{} + } + + // Now look for any remaining actors/collections + // that weren't already dereferenced into inboxes + // with db calls; find these by making deref calls + // to remote instances. + // + // First get a transport to do the http calls. + t, err := a.common.NewTransport(ctx, outboxIRI, goFedUserAgent()) + if err != nil { + return nil, err + } + + // Make HTTP calls to unpack collection IRIs into + // Actor IRIs and then into Actor types, ignoring + // actors or collections we've already deref'd. + actorsFromRemote, err := a.resolveActors( + ctx, + t, + actorsAndCollections, + derefdEntries, + 0, a.s2s.MaxDeliveryRecursionDepth(ctx), + ) + if err != nil { + return nil, err + } + + // Release no-longer-needed collections. + clear(derefdEntries) + clear(actorsAndCollections) + + // Extract inbox IRI from each deref'd Actor (if any). + inboxesFromRemote, err := actorsToInboxIRIs(actorsFromRemote) + if err != nil { + return nil, err + } + + // Combine db-discovered inboxes and remote-discovered + // inboxes into a final list of destination inboxes. + inboxes = append(inboxes, inboxesFromRemote...) + + // POST FILTERING + + // Do a final pass of the inboxes to: + // + // 1. Deduplicate entries. + // 2. Ensure that the list of inboxes doesn't + // contain the inbox of whoever the outbox + // belongs to, no point delivering to oneself. + // + // To do this we first need to get the + // inbox IRI of this outbox's Actor. + + // BEGIN LOCK + unlock, err := a.db.Lock(ctx, outboxIRI) + if err != nil { + return nil, err + } + + // Get the IRI of the Actor who owns this outbox. + outboxActorIRI, err := a.db.ActorForOutbox(ctx, outboxIRI) + + // END LOCK + unlock() + + if err != nil { + return nil, err + } + + // BEGIN LOCK + unlock, err = a.db.Lock(ctx, outboxActorIRI) + if err != nil { + return nil, err + } + + // Now get the Actor who owns this outbox. + outboxActor, err := a.db.Get(ctx, outboxActorIRI) + + // END LOCK + unlock() + + if err != nil { + return nil, err + } + + // Extract the inbox IRI for the outbox Actor. + inboxOfOutboxActor, err := getInbox(outboxActor) + if err != nil { + return nil, err + } + + // Deduplicate the final inboxes slice, and filter + // out of the inbox of this outbox actor (if present). + inboxes = filterInboxIRIs(inboxes, inboxOfOutboxActor) + + // Now that we've derived inboxes to deliver + // the activity to, strip off any bto or bcc + // recipients, as per the AP spec requirements. + stripHiddenRecipients(activity) + + // All done! + return inboxes, nil +} + +// resolveActors takes a list of Actor id URIs and returns them as concrete +// instances of actorObject. It attempts to apply recursively when it encounters +// a target that is a Collection or OrderedCollection. +// +// Any IRI strings in the ignores map will be skipped (use this when +// you've already dereferenced some of the actorAndCollectionIRIs). +// +// If maxDepth is zero or negative, then recursion is infinitely applied. +// +// If a recipient is a Collection or OrderedCollection, then the server MUST +// dereference the collection, WITH the user's credentials. +// +// Note that this also applies to CollectionPage and OrderedCollectionPage. +func (a *SideEffectActor) resolveActors( + ctx context.Context, + t Transport, + actorAndCollectionIRIs []*url.URL, + ignores map[string]struct{}, + depth, maxDepth int, +) ([]vocab.Type, error) { + if maxDepth > 0 && depth >= maxDepth { + // Hit our max depth. + return nil, nil + } + + if len(actorAndCollectionIRIs) == 0 { + // Nothing to do. + return nil, nil + } + + // Optimistically assume 1:1 mapping of IRIs to actors. + actors := make([]vocab.Type, 0, len(actorAndCollectionIRIs)) + + // Deref each actorOrCollectionIRI if not ignored. + for _, actorOrCollectionIRI := range actorAndCollectionIRIs { + _, ignore := ignores[actorOrCollectionIRI.String()] + if ignore { + // Don't try to + // deref this one. + continue + } + + // TODO: Determine if more logic is needed here for + // inaccessible collections owned by peer servers. + actor, more, err := a.dereferenceForResolvingInboxes(ctx, t, actorOrCollectionIRI) + if err != nil { + // Missing recipient -- skip. + continue + } + + if actor != nil { + // Got a hit. + actors = append(actors, actor) + } + + // If this was a collection, get more. + recurActors, err := a.resolveActors( + ctx, + t, + more, + ignores, + depth+1, maxDepth, + ) + if err != nil { + return nil, err + } + + actors = append(actors, recurActors...) + } + + return actors, nil +} + +// dereferenceForResolvingInboxes dereferences an IRI solely for finding an +// actor's inbox IRI to deliver to. +// +// The returned actor could be nil, if it wasn't an actor (ex: a Collection or +// OrderedCollection). +func (a *SideEffectActor) dereferenceForResolvingInboxes(c context.Context, t Transport, actorIRI *url.URL) (actor vocab.Type, moreActorIRIs []*url.URL, err error) { + var resp *http.Response + resp, err = t.Dereference(c, actorIRI) + if err != nil { + return + } + var m map[string]interface{} + m, err = readActivityPubResp(resp) + if err != nil { + return + } + actor, err = streams.ToType(c, m) + if err != nil { + return + } + // Attempt to see if the 'actor' is really some sort of type that has + // an 'items' or 'orderedItems' property. + if v, ok := actor.(itemser); ok { + if i := v.GetActivityStreamsItems(); i != nil { + for iter := i.Begin(); iter != i.End(); iter = iter.Next() { + var id *url.URL + id, err = ToId(iter) + if err != nil { + return + } + moreActorIRIs = append(moreActorIRIs, id) + } + } + actor = nil + } else if v, ok := actor.(orderedItemser); ok { + if i := v.GetActivityStreamsOrderedItems(); i != nil { + for iter := i.Begin(); iter != i.End(); iter = iter.Next() { + var id *url.URL + id, err = ToId(iter) + if err != nil { + return + } + moreActorIRIs = append(moreActorIRIs, id) + } + } + actor = nil + } + return +} |
