diff options
Diffstat (limited to 'internal/message/processor.go')
-rw-r--r-- | internal/message/processor.go | 138 |
1 files changed, 32 insertions, 106 deletions
diff --git a/internal/message/processor.go b/internal/message/processor.go index 7fc850e37..c9ba5f858 100644 --- a/internal/message/processor.go +++ b/internal/message/processor.go @@ -20,10 +20,7 @@ package message import ( "context" - "errors" - "fmt" "net/http" - "net/url" "github.com/sirupsen/logrus" apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" @@ -45,13 +42,13 @@ import ( // for clean distribution of messages without slowing down the client API and harming the user experience. type Processor interface { // ToClientAPI returns a channel for putting in messages that need to go to the gts client API. - ToClientAPI() chan ToClientAPI + // ToClientAPI() chan gtsmodel.ToClientAPI // FromClientAPI returns a channel for putting messages in that come from the client api going to the processor - FromClientAPI() chan FromClientAPI + FromClientAPI() chan gtsmodel.FromClientAPI // ToFederator returns a channel for putting in messages that need to go to the federator (activitypub). - ToFederator() chan ToFederator + // ToFederator() chan gtsmodel.ToFederator // FromFederator returns a channel for putting messages in that come from the federator (activitypub) going into the processor - FromFederator() chan FromFederator + FromFederator() chan gtsmodel.FromFederator // Start starts the Processor, reading from its channels and passing messages back and forth. Start() error // Stop stops the processor cleanly, finishing handling any remaining messages before closing down. @@ -71,6 +68,11 @@ type Processor interface { AccountGet(authed *oauth.Auth, targetAccountID string) (*apimodel.Account, error) // AccountUpdate processes the update of an account with the given form AccountUpdate(authed *oauth.Auth, form *apimodel.UpdateCredentialsRequest) (*apimodel.Account, error) + // AccountStatusesGet fetches a number of statuses (in time descending order) from the given account, filtered by visibility for + // the account given in authed. + AccountStatusesGet(authed *oauth.Auth, targetAccountID string, limit int, excludeReplies bool, maxID string, pinned bool, mediaOnly bool) ([]apimodel.Status, ErrorWithCode) + // AccountFollowersGet + AccountFollowersGet(authed *oauth.Auth, targetAccountID string) ([]apimodel.Account, ErrorWithCode) // AdminEmojiCreate handles the creation of a new instance emoji by an admin, using the given form. AdminEmojiCreate(authed *oauth.Auth, form *apimodel.EmojiCreateRequest) (*apimodel.Emoji, error) @@ -142,10 +144,10 @@ type Processor interface { // processor just implements the Processor interface type processor struct { // federator pub.FederatingActor - toClientAPI chan ToClientAPI - fromClientAPI chan FromClientAPI - toFederator chan ToFederator - fromFederator chan FromFederator + // toClientAPI chan gtsmodel.ToClientAPI + fromClientAPI chan gtsmodel.FromClientAPI + // toFederator chan gtsmodel.ToFederator + fromFederator chan gtsmodel.FromFederator federator federation.Federator stop chan interface{} log *logrus.Logger @@ -160,10 +162,10 @@ type processor struct { // NewProcessor returns a new Processor that uses the given federator and logger func NewProcessor(config *config.Config, tc typeutils.TypeConverter, federator federation.Federator, oauthServer oauth.Server, mediaHandler media.Handler, storage storage.Storage, db db.DB, log *logrus.Logger) Processor { return &processor{ - toClientAPI: make(chan ToClientAPI, 100), - fromClientAPI: make(chan FromClientAPI, 100), - toFederator: make(chan ToFederator, 100), - fromFederator: make(chan FromFederator, 100), + // toClientAPI: make(chan gtsmodel.ToClientAPI, 100), + fromClientAPI: make(chan gtsmodel.FromClientAPI, 100), + // toFederator: make(chan gtsmodel.ToFederator, 100), + fromFederator: make(chan gtsmodel.FromFederator, 100), federator: federator, stop: make(chan interface{}), log: log, @@ -176,19 +178,19 @@ func NewProcessor(config *config.Config, tc typeutils.TypeConverter, federator f } } -func (p *processor) ToClientAPI() chan ToClientAPI { - return p.toClientAPI -} +// func (p *processor) ToClientAPI() chan gtsmodel.ToClientAPI { +// return p.toClientAPI +// } -func (p *processor) FromClientAPI() chan FromClientAPI { +func (p *processor) FromClientAPI() chan gtsmodel.FromClientAPI { return p.fromClientAPI } -func (p *processor) ToFederator() chan ToFederator { - return p.toFederator -} +// func (p *processor) ToFederator() chan gtsmodel.ToFederator { +// return p.toFederator +// } -func (p *processor) FromFederator() chan FromFederator { +func (p *processor) FromFederator() chan gtsmodel.FromFederator { return p.fromFederator } @@ -198,17 +200,20 @@ func (p *processor) Start() error { DistLoop: for { select { - case clientMsg := <-p.toClientAPI: - p.log.Infof("received message TO client API: %+v", clientMsg) + // case clientMsg := <-p.toClientAPI: + // p.log.Infof("received message TO client API: %+v", clientMsg) case clientMsg := <-p.fromClientAPI: p.log.Infof("received message FROM client API: %+v", clientMsg) if err := p.processFromClientAPI(clientMsg); err != nil { p.log.Error(err) } - case federatorMsg := <-p.toFederator: - p.log.Infof("received message TO federator: %+v", federatorMsg) + // case federatorMsg := <-p.toFederator: + // p.log.Infof("received message TO federator: %+v", federatorMsg) case federatorMsg := <-p.fromFederator: p.log.Infof("received message FROM federator: %+v", federatorMsg) + if err := p.processFromFederator(federatorMsg); err != nil { + p.log.Error(err) + } case <-p.stop: break DistLoop } @@ -223,82 +228,3 @@ func (p *processor) Stop() error { close(p.stop) return nil } - -// ToClientAPI wraps a message that travels from the processor into the client API -type ToClientAPI struct { - APObjectType gtsmodel.ActivityStreamsObject - APActivityType gtsmodel.ActivityStreamsActivity - Activity interface{} -} - -// FromClientAPI wraps a message that travels from client API into the processor -type FromClientAPI struct { - APObjectType gtsmodel.ActivityStreamsObject - APActivityType gtsmodel.ActivityStreamsActivity - Activity interface{} -} - -// ToFederator wraps a message that travels from the processor into the federator -type ToFederator struct { - APObjectType gtsmodel.ActivityStreamsObject - APActivityType gtsmodel.ActivityStreamsActivity - Activity interface{} -} - -// FromFederator wraps a message that travels from the federator into the processor -type FromFederator struct { - APObjectType gtsmodel.ActivityStreamsObject - APActivityType gtsmodel.ActivityStreamsActivity - Activity interface{} -} - -func (p *processor) processFromClientAPI(clientMsg FromClientAPI) error { - switch clientMsg.APObjectType { - case gtsmodel.ActivityStreamsNote: - status, ok := clientMsg.Activity.(*gtsmodel.Status) - if !ok { - return errors.New("note was not parseable as *gtsmodel.Status") - } - - if err := p.notifyStatus(status); err != nil { - return err - } - - if status.VisibilityAdvanced.Federated { - return p.federateStatus(status) - } - return nil - } - return fmt.Errorf("message type unprocessable: %+v", clientMsg) -} - -func (p *processor) federateStatus(status *gtsmodel.Status) error { - // derive the sending account -- it might be attached to the status already - sendingAcct := >smodel.Account{} - if status.GTSAccount != nil { - sendingAcct = status.GTSAccount - } else { - // it wasn't attached so get it from the db instead - if err := p.db.GetByID(status.AccountID, sendingAcct); err != nil { - return err - } - } - - outboxURI, err := url.Parse(sendingAcct.OutboxURI) - if err != nil { - return err - } - - // convert the status to AS format Note - note, err := p.tc.StatusToAS(status) - if err != nil { - return err - } - - _, err = p.federator.FederatingActor().Send(context.Background(), outboxURI, note) - return err -} - -func (p *processor) notifyStatus(status *gtsmodel.Status) error { - return nil -} |