summaryrefslogtreecommitdiff
path: root/internal/message/processor.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/message/processor.go')
-rw-r--r--internal/message/processor.go138
1 files changed, 32 insertions, 106 deletions
diff --git a/internal/message/processor.go b/internal/message/processor.go
index 7fc850e37..c9ba5f858 100644
--- a/internal/message/processor.go
+++ b/internal/message/processor.go
@@ -20,10 +20,7 @@ package message
import (
"context"
- "errors"
- "fmt"
"net/http"
- "net/url"
"github.com/sirupsen/logrus"
apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model"
@@ -45,13 +42,13 @@ import (
// for clean distribution of messages without slowing down the client API and harming the user experience.
type Processor interface {
// ToClientAPI returns a channel for putting in messages that need to go to the gts client API.
- ToClientAPI() chan ToClientAPI
+ // ToClientAPI() chan gtsmodel.ToClientAPI
// FromClientAPI returns a channel for putting messages in that come from the client api going to the processor
- FromClientAPI() chan FromClientAPI
+ FromClientAPI() chan gtsmodel.FromClientAPI
// ToFederator returns a channel for putting in messages that need to go to the federator (activitypub).
- ToFederator() chan ToFederator
+ // ToFederator() chan gtsmodel.ToFederator
// FromFederator returns a channel for putting messages in that come from the federator (activitypub) going into the processor
- FromFederator() chan FromFederator
+ FromFederator() chan gtsmodel.FromFederator
// Start starts the Processor, reading from its channels and passing messages back and forth.
Start() error
// Stop stops the processor cleanly, finishing handling any remaining messages before closing down.
@@ -71,6 +68,11 @@ type Processor interface {
AccountGet(authed *oauth.Auth, targetAccountID string) (*apimodel.Account, error)
// AccountUpdate processes the update of an account with the given form
AccountUpdate(authed *oauth.Auth, form *apimodel.UpdateCredentialsRequest) (*apimodel.Account, error)
+ // AccountStatusesGet fetches a number of statuses (in time descending order) from the given account, filtered by visibility for
+ // the account given in authed.
+ AccountStatusesGet(authed *oauth.Auth, targetAccountID string, limit int, excludeReplies bool, maxID string, pinned bool, mediaOnly bool) ([]apimodel.Status, ErrorWithCode)
+ // AccountFollowersGet
+ AccountFollowersGet(authed *oauth.Auth, targetAccountID string) ([]apimodel.Account, ErrorWithCode)
// AdminEmojiCreate handles the creation of a new instance emoji by an admin, using the given form.
AdminEmojiCreate(authed *oauth.Auth, form *apimodel.EmojiCreateRequest) (*apimodel.Emoji, error)
@@ -142,10 +144,10 @@ type Processor interface {
// processor just implements the Processor interface
type processor struct {
// federator pub.FederatingActor
- toClientAPI chan ToClientAPI
- fromClientAPI chan FromClientAPI
- toFederator chan ToFederator
- fromFederator chan FromFederator
+ // toClientAPI chan gtsmodel.ToClientAPI
+ fromClientAPI chan gtsmodel.FromClientAPI
+ // toFederator chan gtsmodel.ToFederator
+ fromFederator chan gtsmodel.FromFederator
federator federation.Federator
stop chan interface{}
log *logrus.Logger
@@ -160,10 +162,10 @@ type processor struct {
// NewProcessor returns a new Processor that uses the given federator and logger
func NewProcessor(config *config.Config, tc typeutils.TypeConverter, federator federation.Federator, oauthServer oauth.Server, mediaHandler media.Handler, storage storage.Storage, db db.DB, log *logrus.Logger) Processor {
return &processor{
- toClientAPI: make(chan ToClientAPI, 100),
- fromClientAPI: make(chan FromClientAPI, 100),
- toFederator: make(chan ToFederator, 100),
- fromFederator: make(chan FromFederator, 100),
+ // toClientAPI: make(chan gtsmodel.ToClientAPI, 100),
+ fromClientAPI: make(chan gtsmodel.FromClientAPI, 100),
+ // toFederator: make(chan gtsmodel.ToFederator, 100),
+ fromFederator: make(chan gtsmodel.FromFederator, 100),
federator: federator,
stop: make(chan interface{}),
log: log,
@@ -176,19 +178,19 @@ func NewProcessor(config *config.Config, tc typeutils.TypeConverter, federator f
}
}
-func (p *processor) ToClientAPI() chan ToClientAPI {
- return p.toClientAPI
-}
+// func (p *processor) ToClientAPI() chan gtsmodel.ToClientAPI {
+// return p.toClientAPI
+// }
-func (p *processor) FromClientAPI() chan FromClientAPI {
+func (p *processor) FromClientAPI() chan gtsmodel.FromClientAPI {
return p.fromClientAPI
}
-func (p *processor) ToFederator() chan ToFederator {
- return p.toFederator
-}
+// func (p *processor) ToFederator() chan gtsmodel.ToFederator {
+// return p.toFederator
+// }
-func (p *processor) FromFederator() chan FromFederator {
+func (p *processor) FromFederator() chan gtsmodel.FromFederator {
return p.fromFederator
}
@@ -198,17 +200,20 @@ func (p *processor) Start() error {
DistLoop:
for {
select {
- case clientMsg := <-p.toClientAPI:
- p.log.Infof("received message TO client API: %+v", clientMsg)
+ // case clientMsg := <-p.toClientAPI:
+ // p.log.Infof("received message TO client API: %+v", clientMsg)
case clientMsg := <-p.fromClientAPI:
p.log.Infof("received message FROM client API: %+v", clientMsg)
if err := p.processFromClientAPI(clientMsg); err != nil {
p.log.Error(err)
}
- case federatorMsg := <-p.toFederator:
- p.log.Infof("received message TO federator: %+v", federatorMsg)
+ // case federatorMsg := <-p.toFederator:
+ // p.log.Infof("received message TO federator: %+v", federatorMsg)
case federatorMsg := <-p.fromFederator:
p.log.Infof("received message FROM federator: %+v", federatorMsg)
+ if err := p.processFromFederator(federatorMsg); err != nil {
+ p.log.Error(err)
+ }
case <-p.stop:
break DistLoop
}
@@ -223,82 +228,3 @@ func (p *processor) Stop() error {
close(p.stop)
return nil
}
-
-// ToClientAPI wraps a message that travels from the processor into the client API
-type ToClientAPI struct {
- APObjectType gtsmodel.ActivityStreamsObject
- APActivityType gtsmodel.ActivityStreamsActivity
- Activity interface{}
-}
-
-// FromClientAPI wraps a message that travels from client API into the processor
-type FromClientAPI struct {
- APObjectType gtsmodel.ActivityStreamsObject
- APActivityType gtsmodel.ActivityStreamsActivity
- Activity interface{}
-}
-
-// ToFederator wraps a message that travels from the processor into the federator
-type ToFederator struct {
- APObjectType gtsmodel.ActivityStreamsObject
- APActivityType gtsmodel.ActivityStreamsActivity
- Activity interface{}
-}
-
-// FromFederator wraps a message that travels from the federator into the processor
-type FromFederator struct {
- APObjectType gtsmodel.ActivityStreamsObject
- APActivityType gtsmodel.ActivityStreamsActivity
- Activity interface{}
-}
-
-func (p *processor) processFromClientAPI(clientMsg FromClientAPI) error {
- switch clientMsg.APObjectType {
- case gtsmodel.ActivityStreamsNote:
- status, ok := clientMsg.Activity.(*gtsmodel.Status)
- if !ok {
- return errors.New("note was not parseable as *gtsmodel.Status")
- }
-
- if err := p.notifyStatus(status); err != nil {
- return err
- }
-
- if status.VisibilityAdvanced.Federated {
- return p.federateStatus(status)
- }
- return nil
- }
- return fmt.Errorf("message type unprocessable: %+v", clientMsg)
-}
-
-func (p *processor) federateStatus(status *gtsmodel.Status) error {
- // derive the sending account -- it might be attached to the status already
- sendingAcct := &gtsmodel.Account{}
- if status.GTSAccount != nil {
- sendingAcct = status.GTSAccount
- } else {
- // it wasn't attached so get it from the db instead
- if err := p.db.GetByID(status.AccountID, sendingAcct); err != nil {
- return err
- }
- }
-
- outboxURI, err := url.Parse(sendingAcct.OutboxURI)
- if err != nil {
- return err
- }
-
- // convert the status to AS format Note
- note, err := p.tc.StatusToAS(status)
- if err != nil {
- return err
- }
-
- _, err = p.federator.FederatingActor().Send(context.Background(), outboxURI, note)
- return err
-}
-
-func (p *processor) notifyStatus(status *gtsmodel.Status) error {
- return nil
-}