summaryrefslogtreecommitdiff
path: root/vendor/github.com/superseriousbusiness/activity/pub/federating_wrapped_callbacks.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/superseriousbusiness/activity/pub/federating_wrapped_callbacks.go')
-rw-r--r--vendor/github.com/superseriousbusiness/activity/pub/federating_wrapped_callbacks.go908
1 files changed, 908 insertions, 0 deletions
diff --git a/vendor/github.com/superseriousbusiness/activity/pub/federating_wrapped_callbacks.go b/vendor/github.com/superseriousbusiness/activity/pub/federating_wrapped_callbacks.go
new file mode 100644
index 000000000..5115d95b1
--- /dev/null
+++ b/vendor/github.com/superseriousbusiness/activity/pub/federating_wrapped_callbacks.go
@@ -0,0 +1,908 @@
+package pub
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "net/url"
+
+ "github.com/superseriousbusiness/activity/streams"
+ "github.com/superseriousbusiness/activity/streams/vocab"
+)
+
+// OnFollowBehavior enumerates the different default actions that the go-fed
+// library can provide when receiving a Follow Activity from a peer.
+type OnFollowBehavior int
+
+const (
+ // OnFollowDoNothing does not take any action when a Follow Activity
+ // is received.
+ OnFollowDoNothing OnFollowBehavior = iota
+ // OnFollowAutomaticallyAccept triggers the side effect of sending an
+ // Accept of this Follow request in response.
+ OnFollowAutomaticallyAccept
+ // OnFollowAutomaticallyAccept triggers the side effect of sending a
+ // Reject of this Follow request in response.
+ OnFollowAutomaticallyReject
+)
+
+// FederatingWrappedCallbacks lists the callback functions that already have
+// some side effect behavior provided by the pub library.
+//
+// These functions are wrapped for the Federating Protocol.
+type FederatingWrappedCallbacks struct {
+ // Create handles additional side effects for the Create ActivityStreams
+ // type, specific to the application using go-fed.
+ //
+ // The wrapping callback for the Federating Protocol ensures the
+ // 'object' property is created in the database.
+ //
+ // Create calls Create for each object in the federated Activity.
+ Create func(context.Context, vocab.ActivityStreamsCreate) error
+ // Update handles additional side effects for the Update ActivityStreams
+ // type, specific to the application using go-fed.
+ //
+ // The wrapping callback for the Federating Protocol ensures the
+ // 'object' property is updated in the database.
+ //
+ // Update calls Update on the federated entry from the database, with a
+ // new value.
+ Update func(context.Context, vocab.ActivityStreamsUpdate) error
+ // Delete handles additional side effects for the Delete ActivityStreams
+ // type, specific to the application using go-fed.
+ //
+ // Delete removes the federated entry from the database.
+ Delete func(context.Context, vocab.ActivityStreamsDelete) error
+ // Follow handles additional side effects for the Follow ActivityStreams
+ // type, specific to the application using go-fed.
+ //
+ // The wrapping function can have one of several default behaviors,
+ // depending on the value of the OnFollow setting.
+ Follow func(context.Context, vocab.ActivityStreamsFollow) error
+ // OnFollow determines what action to take for this particular callback
+ // if a Follow Activity is handled.
+ OnFollow OnFollowBehavior
+ // Accept handles additional side effects for the Accept ActivityStreams
+ // type, specific to the application using go-fed.
+ //
+ // The wrapping function determines if this 'Accept' is in response to a
+ // 'Follow'. If so, then the 'actor' is added to the original 'actor's
+ // 'following' collection.
+ //
+ // Otherwise, no side effects are done by go-fed.
+ Accept func(context.Context, vocab.ActivityStreamsAccept) error
+ // Reject handles additional side effects for the Reject ActivityStreams
+ // type, specific to the application using go-fed.
+ //
+ // The wrapping function has no default side effects. However, if this
+ // 'Reject' is in response to a 'Follow' then the client MUST NOT go
+ // forward with adding the 'actor' to the original 'actor's 'following'
+ // collection by the client application.
+ Reject func(context.Context, vocab.ActivityStreamsReject) error
+ // Add handles additional side effects for the Add ActivityStreams
+ // type, specific to the application using go-fed.
+ //
+ // The wrapping function will add the 'object' IRIs to a specific
+ // 'target' collection if the 'target' collection(s) live on this
+ // server.
+ Add func(context.Context, vocab.ActivityStreamsAdd) error
+ // Remove handles additional side effects for the Remove ActivityStreams
+ // type, specific to the application using go-fed.
+ //
+ // The wrapping function will remove all 'object' IRIs from a specific
+ // 'target' collection if the 'target' collection(s) live on this
+ // server.
+ Remove func(context.Context, vocab.ActivityStreamsRemove) error
+ // Like handles additional side effects for the Like ActivityStreams
+ // type, specific to the application using go-fed.
+ //
+ // The wrapping function will add the activity to the "likes" collection
+ // on all 'object' targets owned by this server.
+ Like func(context.Context, vocab.ActivityStreamsLike) error
+ // Announce handles additional side effects for the Announce
+ // ActivityStreams type, specific to the application using go-fed.
+ //
+ // The wrapping function will add the activity to the "shares"
+ // collection on all 'object' targets owned by this server.
+ Announce func(context.Context, vocab.ActivityStreamsAnnounce) error
+ // Undo handles additional side effects for the Undo ActivityStreams
+ // type, specific to the application using go-fed.
+ //
+ // The wrapping function ensures the 'actor' on the 'Undo'
+ // is be the same as the 'actor' on all Activities being undone.
+ // It enforces that the actors on the Undo must correspond to all of the
+ // 'object' actors in some manner.
+ //
+ // It is expected that the application will implement the proper
+ // reversal of activities that are being undone.
+ Undo func(context.Context, vocab.ActivityStreamsUndo) error
+ // Block handles additional side effects for the Block ActivityStreams
+ // type, specific to the application using go-fed.
+ //
+ // The wrapping function provides no default side effects. It simply
+ // calls the wrapped function. However, note that Blocks should not be
+ // received from a federated peer, as delivering Blocks explicitly
+ // deviates from the original ActivityPub specification.
+ Block func(context.Context, vocab.ActivityStreamsBlock) error
+
+ // Sidechannel data -- this is set at request handling time. These must
+ // be set before the callbacks are used.
+
+ // db is the Database the FederatingWrappedCallbacks should use.
+ db Database
+ // inboxIRI is the inboxIRI that is handling this callback.
+ inboxIRI *url.URL
+ // addNewIds creates new 'id' entries on an activity and its objects if
+ // it is a Create activity.
+ addNewIds func(c context.Context, activity Activity) error
+ // deliver delivers an outgoing message.
+ deliver func(c context.Context, outboxIRI *url.URL, activity Activity) error
+ // newTransport creates a new Transport.
+ newTransport func(c context.Context, actorBoxIRI *url.URL, gofedAgent string) (t Transport, err error)
+}
+
+// callbacks returns the WrappedCallbacks members into a single interface slice
+// for use in streams.Resolver callbacks.
+//
+// If the given functions have a type that collides with the default behavior,
+// then disable our default behavior
+func (w FederatingWrappedCallbacks) callbacks(fns []interface{}) []interface{} {
+ enableCreate := true
+ enableUpdate := true
+ enableDelete := true
+ enableFollow := true
+ enableAccept := true
+ enableReject := true
+ enableAdd := true
+ enableRemove := true
+ enableLike := true
+ enableAnnounce := true
+ enableUndo := true
+ enableBlock := true
+ for _, fn := range fns {
+ switch fn.(type) {
+ default:
+ continue
+ case func(context.Context, vocab.ActivityStreamsCreate) error:
+ enableCreate = false
+ case func(context.Context, vocab.ActivityStreamsUpdate) error:
+ enableUpdate = false
+ case func(context.Context, vocab.ActivityStreamsDelete) error:
+ enableDelete = false
+ case func(context.Context, vocab.ActivityStreamsFollow) error:
+ enableFollow = false
+ case func(context.Context, vocab.ActivityStreamsAccept) error:
+ enableAccept = false
+ case func(context.Context, vocab.ActivityStreamsReject) error:
+ enableReject = false
+ case func(context.Context, vocab.ActivityStreamsAdd) error:
+ enableAdd = false
+ case func(context.Context, vocab.ActivityStreamsRemove) error:
+ enableRemove = false
+ case func(context.Context, vocab.ActivityStreamsLike) error:
+ enableLike = false
+ case func(context.Context, vocab.ActivityStreamsAnnounce) error:
+ enableAnnounce = false
+ case func(context.Context, vocab.ActivityStreamsUndo) error:
+ enableUndo = false
+ case func(context.Context, vocab.ActivityStreamsBlock) error:
+ enableBlock = false
+ }
+ }
+ if enableCreate {
+ fns = append(fns, w.create)
+ }
+ if enableUpdate {
+ fns = append(fns, w.update)
+ }
+ if enableDelete {
+ fns = append(fns, w.deleteFn)
+ }
+ if enableFollow {
+ fns = append(fns, w.follow)
+ }
+ if enableAccept {
+ fns = append(fns, w.accept)
+ }
+ if enableReject {
+ fns = append(fns, w.reject)
+ }
+ if enableAdd {
+ fns = append(fns, w.add)
+ }
+ if enableRemove {
+ fns = append(fns, w.remove)
+ }
+ if enableLike {
+ fns = append(fns, w.like)
+ }
+ if enableAnnounce {
+ fns = append(fns, w.announce)
+ }
+ if enableUndo {
+ fns = append(fns, w.undo)
+ }
+ if enableBlock {
+ fns = append(fns, w.block)
+ }
+ return fns
+}
+
+// create implements the federating Create activity side effects.
+func (w FederatingWrappedCallbacks) create(c context.Context, a vocab.ActivityStreamsCreate) error {
+ op := a.GetActivityStreamsObject()
+ if op == nil || op.Len() == 0 {
+ return ErrObjectRequired
+ }
+ // Create anonymous loop function to be able to properly scope the defer
+ // for the database lock at each iteration.
+ loopFn := func(iter vocab.ActivityStreamsObjectPropertyIterator) error {
+ t := iter.GetType()
+ if t == nil && iter.IsIRI() {
+ // Attempt to dereference the IRI instead
+ tport, err := w.newTransport(c, w.inboxIRI, goFedUserAgent())
+ if err != nil {
+ return err
+ }
+ b, err := tport.Dereference(c, iter.GetIRI())
+ if err != nil {
+ return err
+ }
+ var m map[string]interface{}
+ if err = json.Unmarshal(b, &m); err != nil {
+ return err
+ }
+ t, err = streams.ToType(c, m)
+ if err != nil {
+ return err
+ }
+ } else if t == nil {
+ return fmt.Errorf("cannot handle federated create: object is neither a value nor IRI")
+ }
+ id, err := GetId(t)
+ if err != nil {
+ return err
+ }
+ err = w.db.Lock(c, id)
+ if err != nil {
+ return err
+ }
+ defer w.db.Unlock(c, id)
+ if err := w.db.Create(c, t); err != nil {
+ return err
+ }
+ return nil
+ }
+ for iter := op.Begin(); iter != op.End(); iter = iter.Next() {
+ if err := loopFn(iter); err != nil {
+ return err
+ }
+ }
+ if w.Create != nil {
+ return w.Create(c, a)
+ }
+ return nil
+}
+
+// update implements the federating Update activity side effects.
+func (w FederatingWrappedCallbacks) update(c context.Context, a vocab.ActivityStreamsUpdate) error {
+ op := a.GetActivityStreamsObject()
+ if op == nil || op.Len() == 0 {
+ return ErrObjectRequired
+ }
+ if err := mustHaveActivityOriginMatchObjects(a); err != nil {
+ return err
+ }
+ // Create anonymous loop function to be able to properly scope the defer
+ // for the database lock at each iteration.
+ loopFn := func(iter vocab.ActivityStreamsObjectPropertyIterator) error {
+ t := iter.GetType()
+ if t == nil {
+ return fmt.Errorf("update requires an object to be wholly provided")
+ }
+ id, err := GetId(t)
+ if err != nil {
+ return err
+ }
+ err = w.db.Lock(c, id)
+ if err != nil {
+ return err
+ }
+ defer w.db.Unlock(c, id)
+ if err := w.db.Update(c, t); err != nil {
+ return err
+ }
+ return nil
+ }
+ for iter := op.Begin(); iter != op.End(); iter = iter.Next() {
+ if err := loopFn(iter); err != nil {
+ return err
+ }
+ }
+ if w.Update != nil {
+ return w.Update(c, a)
+ }
+ return nil
+}
+
+// deleteFn implements the federating Delete activity side effects.
+func (w FederatingWrappedCallbacks) deleteFn(c context.Context, a vocab.ActivityStreamsDelete) error {
+ op := a.GetActivityStreamsObject()
+ if op == nil || op.Len() == 0 {
+ return ErrObjectRequired
+ }
+ if err := mustHaveActivityOriginMatchObjects(a); err != nil {
+ return err
+ }
+ // Create anonymous loop function to be able to properly scope the defer
+ // for the database lock at each iteration.
+ loopFn := func(iter vocab.ActivityStreamsObjectPropertyIterator) error {
+ id, err := ToId(iter)
+ if err != nil {
+ return err
+ }
+ err = w.db.Lock(c, id)
+ if err != nil {
+ return err
+ }
+ defer w.db.Unlock(c, id)
+ if err := w.db.Delete(c, id); err != nil {
+ return err
+ }
+ return nil
+ }
+ for iter := op.Begin(); iter != op.End(); iter = iter.Next() {
+ if err := loopFn(iter); err != nil {
+ return err
+ }
+ }
+ if w.Delete != nil {
+ return w.Delete(c, a)
+ }
+ return nil
+}
+
+// follow implements the federating Follow activity side effects.
+func (w FederatingWrappedCallbacks) follow(c context.Context, a vocab.ActivityStreamsFollow) error {
+ op := a.GetActivityStreamsObject()
+ if op == nil || op.Len() == 0 {
+ return ErrObjectRequired
+ }
+ // Check that we own at least one of the 'object' properties, and ensure
+ // it is to the actor that owns this inbox.
+ //
+ // If not then don't send a response. It was federated to us as an FYI,
+ // by mistake, or some other reason.
+ if err := w.db.Lock(c, w.inboxIRI); err != nil {
+ return err
+ }
+ // WARNING: Unlock not deferred.
+ actorIRI, err := w.db.ActorForInbox(c, w.inboxIRI)
+ if err != nil {
+ w.db.Unlock(c, w.inboxIRI)
+ return err
+ }
+ w.db.Unlock(c, w.inboxIRI)
+ // Unlock must be called by now and every branch above.
+ isMe := false
+ if w.OnFollow != OnFollowDoNothing {
+ for iter := op.Begin(); iter != op.End(); iter = iter.Next() {
+ id, err := ToId(iter)
+ if err != nil {
+ return err
+ }
+ if id.String() == actorIRI.String() {
+ isMe = true
+ break
+ }
+ }
+ }
+ if isMe {
+ // Prepare the response.
+ var response Activity
+ if w.OnFollow == OnFollowAutomaticallyAccept {
+ response = streams.NewActivityStreamsAccept()
+ } else if w.OnFollow == OnFollowAutomaticallyReject {
+ response = streams.NewActivityStreamsReject()
+ } else {
+ return fmt.Errorf("unknown OnFollowBehavior: %d", w.OnFollow)
+ }
+ // Set us as the 'actor'.
+ me := streams.NewActivityStreamsActorProperty()
+ response.SetActivityStreamsActor(me)
+ me.AppendIRI(actorIRI)
+ // Set the Follow as the 'object' property.
+ op := streams.NewActivityStreamsObjectProperty()
+ response.SetActivityStreamsObject(op)
+ op.AppendActivityStreamsFollow(a)
+ // Add all actors on the original Follow to the 'to' property.
+ recipients := make([]*url.URL, 0)
+ to := streams.NewActivityStreamsToProperty()
+ response.SetActivityStreamsTo(to)
+ followActors := a.GetActivityStreamsActor()
+ for iter := followActors.Begin(); iter != followActors.End(); iter = iter.Next() {
+ id, err := ToId(iter)
+ if err != nil {
+ return err
+ }
+ to.AppendIRI(id)
+ recipients = append(recipients, id)
+ }
+ if w.OnFollow == OnFollowAutomaticallyAccept {
+ // If automatically accepting, then also update our
+ // followers collection with the new actors.
+ //
+ // If automatically rejecting, do not update the
+ // followers collection.
+ if err := w.db.Lock(c, actorIRI); err != nil {
+ return err
+ }
+ // WARNING: Unlock not deferred.
+ followers, err := w.db.Followers(c, actorIRI)
+ if err != nil {
+ w.db.Unlock(c, actorIRI)
+ return err
+ }
+ items := followers.GetActivityStreamsItems()
+ if items == nil {
+ items = streams.NewActivityStreamsItemsProperty()
+ followers.SetActivityStreamsItems(items)
+ }
+ for _, elem := range recipients {
+ items.PrependIRI(elem)
+ }
+ if err = w.db.Update(c, followers); err != nil {
+ w.db.Unlock(c, actorIRI)
+ return err
+ }
+ w.db.Unlock(c, actorIRI)
+ // Unlock must be called by now and every branch above.
+ }
+ // Lock without defer!
+ w.db.Lock(c, w.inboxIRI)
+ outboxIRI, err := w.db.OutboxForInbox(c, w.inboxIRI)
+ if err != nil {
+ w.db.Unlock(c, w.inboxIRI)
+ return err
+ }
+ w.db.Unlock(c, w.inboxIRI)
+ // Everything must be unlocked by now.
+ if err := w.addNewIds(c, response); err != nil {
+ return err
+ } else if err := w.deliver(c, outboxIRI, response); err != nil {
+ return err
+ }
+ }
+ if w.Follow != nil {
+ return w.Follow(c, a)
+ }
+ return nil
+}
+
+// accept implements the federating Accept activity side effects.
+func (w FederatingWrappedCallbacks) accept(c context.Context, a vocab.ActivityStreamsAccept) error {
+ op := a.GetActivityStreamsObject()
+ if op != nil && op.Len() > 0 {
+ // Get this actor's id.
+ if err := w.db.Lock(c, w.inboxIRI); err != nil {
+ return err
+ }
+ // WARNING: Unlock not deferred.
+ actorIRI, err := w.db.ActorForInbox(c, w.inboxIRI)
+ if err != nil {
+ w.db.Unlock(c, w.inboxIRI)
+ return err
+ }
+ w.db.Unlock(c, w.inboxIRI)
+ // Unlock must be called by now and every branch above.
+ //
+ // Determine if we are in a follow on the 'object' property.
+ //
+ // TODO: Handle Accept multiple Follow.
+ var maybeMyFollowIRI *url.URL
+ for iter := op.Begin(); iter != op.End(); iter = iter.Next() {
+ t := iter.GetType()
+ if t == nil && iter.IsIRI() {
+ // Attempt to dereference the IRI instead
+ tport, err := w.newTransport(c, w.inboxIRI, goFedUserAgent())
+ if err != nil {
+ return err
+ }
+ b, err := tport.Dereference(c, iter.GetIRI())
+ if err != nil {
+ return err
+ }
+ var m map[string]interface{}
+ if err = json.Unmarshal(b, &m); err != nil {
+ return err
+ }
+ t, err = streams.ToType(c, m)
+ if err != nil {
+ return err
+ }
+ } else if t == nil {
+ return fmt.Errorf("cannot handle federated create: object is neither a value nor IRI")
+ }
+ // Ensure it is a Follow.
+ if !streams.IsOrExtendsActivityStreamsFollow(t) {
+ continue
+ }
+ follow, ok := t.(Activity)
+ if !ok {
+ return fmt.Errorf("a Follow in an Accept does not satisfy the Activity interface")
+ }
+ followId, err := GetId(follow)
+ if err != nil {
+ return err
+ }
+ // Ensure that we are one of the actors on the Follow.
+ actors := follow.GetActivityStreamsActor()
+ for iter := actors.Begin(); iter != actors.End(); iter = iter.Next() {
+ id, err := ToId(iter)
+ if err != nil {
+ return err
+ }
+ if id.String() == actorIRI.String() {
+ maybeMyFollowIRI = followId
+ break
+ }
+ }
+ // Continue breaking if we found ourselves
+ if maybeMyFollowIRI != nil {
+ break
+ }
+ }
+ // If we received an Accept whose 'object' is a Follow with an
+ // Accept that we sent, add to the following collection.
+ if maybeMyFollowIRI != nil {
+ // Verify our Follow request exists and the peer didn't
+ // fabricate it.
+ activityActors := a.GetActivityStreamsActor()
+ if activityActors == nil || activityActors.Len() == 0 {
+ return fmt.Errorf("an Accept with a Follow has no actors")
+ }
+ // This may be a duplicate check if we dereferenced the
+ // Follow above. TODO: Separate this logic to avoid
+ // redundancy.
+ //
+ // Use an anonymous function to properly scope the
+ // database lock, immediately call it.
+ err = func() error {
+ if err := w.db.Lock(c, maybeMyFollowIRI); err != nil {
+ return err
+ }
+ defer w.db.Unlock(c, maybeMyFollowIRI)
+ t, err := w.db.Get(c, maybeMyFollowIRI)
+ if err != nil {
+ return err
+ }
+ if !streams.IsOrExtendsActivityStreamsFollow(t) {
+ return fmt.Errorf("peer gave an Accept wrapping a Follow but provided a non-Follow id")
+ }
+ follow, ok := t.(Activity)
+ if !ok {
+ return fmt.Errorf("a Follow in an Accept does not satisfy the Activity interface")
+ }
+ // Ensure that we are one of the actors on the Follow.
+ ok = false
+ actors := follow.GetActivityStreamsActor()
+ for iter := actors.Begin(); iter != actors.End(); iter = iter.Next() {
+ id, err := ToId(iter)
+ if err != nil {
+ return err
+ }
+ if id.String() == actorIRI.String() {
+ ok = true
+ break
+ }
+ }
+ if !ok {
+ return fmt.Errorf("peer gave an Accept wrapping a Follow but we are not the actor on that Follow")
+ }
+ // Build map of original Accept actors
+ acceptActors := make(map[string]bool)
+ for iter := activityActors.Begin(); iter != activityActors.End(); iter = iter.Next() {
+ id, err := ToId(iter)
+ if err != nil {
+ return err
+ }
+ acceptActors[id.String()] = false
+ }
+ // Verify all actor(s) were on the original Follow.
+ followObj := follow.GetActivityStreamsObject()
+ for iter := followObj.Begin(); iter != followObj.End(); iter = iter.Next() {
+ id, err := ToId(iter)
+ if err != nil {
+ return err
+ }
+ if _, ok := acceptActors[id.String()]; ok {
+ acceptActors[id.String()] = true
+ }
+ }
+ for _, found := range acceptActors {
+ if !found {
+ return fmt.Errorf("peer gave an Accept wrapping a Follow but was not an object in the original Follow")
+ }
+ }
+ return nil
+ }()
+ if err != nil {
+ return err
+ }
+ // Add the peer to our following collection.
+ if err := w.db.Lock(c, actorIRI); err != nil {
+ return err
+ }
+ // WARNING: Unlock not deferred.
+ following, err := w.db.Following(c, actorIRI)
+ if err != nil {
+ w.db.Unlock(c, actorIRI)
+ return err
+ }
+ items := following.GetActivityStreamsItems()
+ if items == nil {
+ items = streams.NewActivityStreamsItemsProperty()
+ following.SetActivityStreamsItems(items)
+ }
+ for iter := activityActors.Begin(); iter != activityActors.End(); iter = iter.Next() {
+ id, err := ToId(iter)
+ if err != nil {
+ w.db.Unlock(c, actorIRI)
+ return err
+ }
+ items.PrependIRI(id)
+ }
+ if err = w.db.Update(c, following); err != nil {
+ w.db.Unlock(c, actorIRI)
+ return err
+ }
+ w.db.Unlock(c, actorIRI)
+ // Unlock must be called by now and every branch above.
+ }
+ }
+ if w.Accept != nil {
+ return w.Accept(c, a)
+ }
+ return nil
+}
+
+// reject implements the federating Reject activity side effects.
+func (w FederatingWrappedCallbacks) reject(c context.Context, a vocab.ActivityStreamsReject) error {
+ if w.Reject != nil {
+ return w.Reject(c, a)
+ }
+ return nil
+}
+
+// add implements the federating Add activity side effects.
+func (w FederatingWrappedCallbacks) add(c context.Context, a vocab.ActivityStreamsAdd) error {
+ op := a.GetActivityStreamsObject()
+ if op == nil || op.Len() == 0 {
+ return ErrObjectRequired
+ }
+ target := a.GetActivityStreamsTarget()
+ if target == nil || target.Len() == 0 {
+ return ErrTargetRequired
+ }
+ if err := add(c, op, target, w.db); err != nil {
+ return err
+ }
+ if w.Add != nil {
+ return w.Add(c, a)
+ }
+ return nil
+}
+
+// remove implements the federating Remove activity side effects.
+func (w FederatingWrappedCallbacks) remove(c context.Context, a vocab.ActivityStreamsRemove) error {
+ op := a.GetActivityStreamsObject()
+ if op == nil || op.Len() == 0 {
+ return ErrObjectRequired
+ }
+ target := a.GetActivityStreamsTarget()
+ if target == nil || target.Len() == 0 {
+ return ErrTargetRequired
+ }
+ if err := remove(c, op, target, w.db); err != nil {
+ return err
+ }
+ if w.Remove != nil {
+ return w.Remove(c, a)
+ }
+ return nil
+}
+
+// like implements the federating Like activity side effects.
+func (w FederatingWrappedCallbacks) like(c context.Context, a vocab.ActivityStreamsLike) error {
+ op := a.GetActivityStreamsObject()
+ if op == nil || op.Len() == 0 {
+ return ErrObjectRequired
+ }
+ id, err := GetId(a)
+ if err != nil {
+ return err
+ }
+ // Create anonymous loop function to be able to properly scope the defer
+ // for the database lock at each iteration.
+ loopFn := func(iter vocab.ActivityStreamsObjectPropertyIterator) error {
+ objId, err := ToId(iter)
+ if err != nil {
+ return err
+ }
+ if err := w.db.Lock(c, objId); err != nil {
+ return err
+ }
+ defer w.db.Unlock(c, objId)
+ if owns, err := w.db.Owns(c, objId); err != nil {
+ return err
+ } else if !owns {
+ return nil
+ }
+ t, err := w.db.Get(c, objId)
+ if err != nil {
+ return err
+ }
+ l, ok := t.(likeser)
+ if !ok {
+ return fmt.Errorf("cannot add Like to likes collection for type %T", t)
+ }
+ // Get 'likes' property on the object, creating default if
+ // necessary.
+ likes := l.GetActivityStreamsLikes()
+ if likes == nil {
+ likes = streams.NewActivityStreamsLikesProperty()
+ l.SetActivityStreamsLikes(likes)
+ }
+ // Get 'likes' value, defaulting to a collection.
+ likesT := likes.GetType()
+ if likesT == nil {
+ col := streams.NewActivityStreamsCollection()
+ likesT = col
+ likes.SetActivityStreamsCollection(col)
+ }
+ // Prepend the activity's 'id' on the 'likes' Collection or
+ // OrderedCollection.
+ if col, ok := likesT.(itemser); ok {
+ items := col.GetActivityStreamsItems()
+ if items == nil {
+ items = streams.NewActivityStreamsItemsProperty()
+ col.SetActivityStreamsItems(items)
+ }
+ items.PrependIRI(id)
+ } else if oCol, ok := likesT.(orderedItemser); ok {
+ oItems := oCol.GetActivityStreamsOrderedItems()
+ if oItems == nil {
+ oItems = streams.NewActivityStreamsOrderedItemsProperty()
+ oCol.SetActivityStreamsOrderedItems(oItems)
+ }
+ oItems.PrependIRI(id)
+ } else {
+ return fmt.Errorf("likes type is neither a Collection nor an OrderedCollection: %T", likesT)
+ }
+ err = w.db.Update(c, t)
+ if err != nil {
+ return err
+ }
+ return nil
+ }
+ for iter := op.Begin(); iter != op.End(); iter = iter.Next() {
+ if err := loopFn(iter); err != nil {
+ return err
+ }
+ }
+ if w.Like != nil {
+ return w.Like(c, a)
+ }
+ return nil
+}
+
+// announce implements the federating Announce activity side effects.
+func (w FederatingWrappedCallbacks) announce(c context.Context, a vocab.ActivityStreamsAnnounce) error {
+ id, err := GetId(a)
+ if err != nil {
+ return err
+ }
+ op := a.GetActivityStreamsObject()
+ // Create anonymous loop function to be able to properly scope the defer
+ // for the database lock at each iteration.
+ loopFn := func(iter vocab.ActivityStreamsObjectPropertyIterator) error {
+ objId, err := ToId(iter)
+ if err != nil {
+ return err
+ }
+ if err := w.db.Lock(c, objId); err != nil {
+ return err
+ }
+ defer w.db.Unlock(c, objId)
+ if owns, err := w.db.Owns(c, objId); err != nil {
+ return err
+ } else if !owns {
+ return nil
+ }
+ t, err := w.db.Get(c, objId)
+ if err != nil {
+ return err
+ }
+ s, ok := t.(shareser)
+ if !ok {
+ return fmt.Errorf("cannot add Announce to Shares collection for type %T", t)
+ }
+ // Get 'shares' property on the object, creating default if
+ // necessary.
+ shares := s.GetActivityStreamsShares()
+ if shares == nil {
+ shares = streams.NewActivityStreamsSharesProperty()
+ s.SetActivityStreamsShares(shares)
+ }
+ // Get 'shares' value, defaulting to a collection.
+ sharesT := shares.GetType()
+ if sharesT == nil {
+ col := streams.NewActivityStreamsCollection()
+ sharesT = col
+ shares.SetActivityStreamsCollection(col)
+ }
+ // Prepend the activity's 'id' on the 'shares' Collection or
+ // OrderedCollection.
+ if col, ok := sharesT.(itemser); ok {
+ items := col.GetActivityStreamsItems()
+ if items == nil {
+ items = streams.NewActivityStreamsItemsProperty()
+ col.SetActivityStreamsItems(items)
+ }
+ items.PrependIRI(id)
+ } else if oCol, ok := sharesT.(orderedItemser); ok {
+ oItems := oCol.GetActivityStreamsOrderedItems()
+ if oItems == nil {
+ oItems = streams.NewActivityStreamsOrderedItemsProperty()
+ oCol.SetActivityStreamsOrderedItems(oItems)
+ }
+ oItems.PrependIRI(id)
+ } else {
+ return fmt.Errorf("shares type is neither a Collection nor an OrderedCollection: %T", sharesT)
+ }
+ err = w.db.Update(c, t)
+ if err != nil {
+ return err
+ }
+ return nil
+ }
+ if op != nil {
+ for iter := op.Begin(); iter != op.End(); iter = iter.Next() {
+ if err := loopFn(iter); err != nil {
+ return err
+ }
+ }
+ }
+ if w.Announce != nil {
+ return w.Announce(c, a)
+ }
+ return nil
+}
+
+// undo implements the federating Undo activity side effects.
+func (w FederatingWrappedCallbacks) undo(c context.Context, a vocab.ActivityStreamsUndo) error {
+ op := a.GetActivityStreamsObject()
+ if op == nil || op.Len() == 0 {
+ return ErrObjectRequired
+ }
+ actors := a.GetActivityStreamsActor()
+ if err := mustHaveActivityActorsMatchObjectActors(c, actors, op, w.newTransport, w.inboxIRI); err != nil {
+ return err
+ }
+ if w.Undo != nil {
+ return w.Undo(c, a)
+ }
+ return nil
+}
+
+// block implements the federating Block activity side effects.
+func (w FederatingWrappedCallbacks) block(c context.Context, a vocab.ActivityStreamsBlock) error {
+ op := a.GetActivityStreamsObject()
+ if op == nil || op.Len() == 0 {
+ return ErrObjectRequired
+ }
+ if w.Block != nil {
+ return w.Block(c, a)
+ }
+ return nil
+}