summaryrefslogtreecommitdiff
path: root/internal/message/processor.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/message/processor.go')
-rw-r--r--internal/message/processor.go215
1 files changed, 215 insertions, 0 deletions
diff --git a/internal/message/processor.go b/internal/message/processor.go
new file mode 100644
index 000000000..d0027c915
--- /dev/null
+++ b/internal/message/processor.go
@@ -0,0 +1,215 @@
+/*
+ GoToSocial
+ Copyright (C) 2021 GoToSocial Authors admin@gotosocial.org
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU Affero General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+package message
+
+import (
+ "net/http"
+
+ "github.com/sirupsen/logrus"
+ apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model"
+ "github.com/superseriousbusiness/gotosocial/internal/config"
+ "github.com/superseriousbusiness/gotosocial/internal/db"
+ "github.com/superseriousbusiness/gotosocial/internal/federation"
+ "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
+ "github.com/superseriousbusiness/gotosocial/internal/media"
+ "github.com/superseriousbusiness/gotosocial/internal/oauth"
+ "github.com/superseriousbusiness/gotosocial/internal/storage"
+ "github.com/superseriousbusiness/gotosocial/internal/typeutils"
+)
+
+// Processor should be passed to api modules (see internal/apimodule/...). It is used for
+// passing messages back and forth from the client API and the federating interface, via channels.
+// It also contains logic for filtering which messages should end up where.
+// It is designed to be used asynchronously: the client API and the federating API should just be able to
+// fire messages into the processor and not wait for a reply before proceeding with other work. This allows
+// for clean distribution of messages without slowing down the client API and harming the user experience.
+type Processor interface {
+ // ToClientAPI returns a channel for putting in messages that need to go to the gts client API.
+ ToClientAPI() chan ToClientAPI
+ // FromClientAPI returns a channel for putting messages in that come from the client api going to the processor
+ FromClientAPI() chan FromClientAPI
+ // ToFederator returns a channel for putting in messages that need to go to the federator (activitypub).
+ ToFederator() chan ToFederator
+ // FromFederator returns a channel for putting messages in that come from the federator (activitypub) going into the processor
+ FromFederator() chan FromFederator
+ // Start starts the Processor, reading from its channels and passing messages back and forth.
+ Start() error
+ // Stop stops the processor cleanly, finishing handling any remaining messages before closing down.
+ Stop() error
+
+ /*
+ CLIENT API-FACING PROCESSING FUNCTIONS
+ These functions are intended to be called when the API client needs an immediate (ie., synchronous) reply
+ to an HTTP request. As such, they will only do the bare-minimum of work necessary to give a properly
+ formed reply. For more intensive (and time-consuming) calls, where you don't require an immediate
+ response, pass work to the processor using a channel instead.
+ */
+
+ // AccountCreate processes the given form for creating a new account, returning an oauth token for that account if successful.
+ AccountCreate(authed *oauth.Auth, form *apimodel.AccountCreateRequest) (*apimodel.Token, error)
+ // AccountGet processes the given request for account information.
+ AccountGet(authed *oauth.Auth, targetAccountID string) (*apimodel.Account, error)
+ // AccountUpdate processes the update of an account with the given form
+ AccountUpdate(authed *oauth.Auth, form *apimodel.UpdateCredentialsRequest) (*apimodel.Account, error)
+
+ // AppCreate processes the creation of a new API application
+ AppCreate(authed *oauth.Auth, form *apimodel.ApplicationCreateRequest) (*apimodel.Application, error)
+
+ // StatusCreate processes the given form to create a new status, returning the api model representation of that status if it's OK.
+ StatusCreate(authed *oauth.Auth, form *apimodel.AdvancedStatusCreateForm) (*apimodel.Status, error)
+ // StatusDelete processes the delete of a given status, returning the deleted status if the delete goes through.
+ StatusDelete(authed *oauth.Auth, targetStatusID string) (*apimodel.Status, error)
+ // StatusFave processes the faving of a given status, returning the updated status if the fave goes through.
+ StatusFave(authed *oauth.Auth, targetStatusID string) (*apimodel.Status, error)
+ // StatusFavedBy returns a slice of accounts that have liked the given status, filtered according to privacy settings.
+ StatusFavedBy(authed *oauth.Auth, targetStatusID string) ([]*apimodel.Account, error)
+ // StatusGet gets the given status, taking account of privacy settings and blocks etc.
+ StatusGet(authed *oauth.Auth, targetStatusID string) (*apimodel.Status, error)
+ // StatusUnfave processes the unfaving of a given status, returning the updated status if the fave goes through.
+ StatusUnfave(authed *oauth.Auth, targetStatusID string) (*apimodel.Status, error)
+
+ // MediaCreate handles the creation of a media attachment, using the given form.
+ MediaCreate(authed *oauth.Auth, form *apimodel.AttachmentRequest) (*apimodel.Attachment, error)
+ // MediaGet handles the fetching of a media attachment, using the given request form.
+ MediaGet(authed *oauth.Auth, form *apimodel.GetContentRequestForm) (*apimodel.Content, error)
+ // AdminEmojiCreate handles the creation of a new instance emoji by an admin, using the given form.
+ AdminEmojiCreate(authed *oauth.Auth, form *apimodel.EmojiCreateRequest) (*apimodel.Emoji, error)
+
+ /*
+ FEDERATION API-FACING PROCESSING FUNCTIONS
+ These functions are intended to be called when the federating client needs an immediate (ie., synchronous) reply
+ to an HTTP request. As such, they will only do the bare-minimum of work necessary to give a properly
+ formed reply. For more intensive (and time-consuming) calls, where you don't require an immediate
+ response, pass work to the processor using a channel instead.
+ */
+
+ // GetFediUser handles the getting of a fedi/activitypub representation of a user/account, performing appropriate authentication
+ // before returning a JSON serializable interface to the caller.
+ GetFediUser(requestedUsername string, request *http.Request) (interface{}, ErrorWithCode)
+}
+
+// processor just implements the Processor interface
+type processor struct {
+ // federator pub.FederatingActor
+ toClientAPI chan ToClientAPI
+ fromClientAPI chan FromClientAPI
+ toFederator chan ToFederator
+ fromFederator chan FromFederator
+ federator federation.Federator
+ stop chan interface{}
+ log *logrus.Logger
+ config *config.Config
+ tc typeutils.TypeConverter
+ oauthServer oauth.Server
+ mediaHandler media.Handler
+ storage storage.Storage
+ db db.DB
+}
+
+// NewProcessor returns a new Processor that uses the given federator and logger
+func NewProcessor(config *config.Config, tc typeutils.TypeConverter, federator federation.Federator, oauthServer oauth.Server, mediaHandler media.Handler, storage storage.Storage, db db.DB, log *logrus.Logger) Processor {
+ return &processor{
+ toClientAPI: make(chan ToClientAPI, 100),
+ fromClientAPI: make(chan FromClientAPI, 100),
+ toFederator: make(chan ToFederator, 100),
+ fromFederator: make(chan FromFederator, 100),
+ federator: federator,
+ stop: make(chan interface{}),
+ log: log,
+ config: config,
+ tc: tc,
+ oauthServer: oauthServer,
+ mediaHandler: mediaHandler,
+ storage: storage,
+ db: db,
+ }
+}
+
+func (p *processor) ToClientAPI() chan ToClientAPI {
+ return p.toClientAPI
+}
+
+func (p *processor) FromClientAPI() chan FromClientAPI {
+ return p.fromClientAPI
+}
+
+func (p *processor) ToFederator() chan ToFederator {
+ return p.toFederator
+}
+
+func (p *processor) FromFederator() chan FromFederator {
+ return p.fromFederator
+}
+
+// Start starts the Processor, reading from its channels and passing messages back and forth.
+func (p *processor) Start() error {
+ go func() {
+ DistLoop:
+ for {
+ select {
+ case clientMsg := <-p.toClientAPI:
+ p.log.Infof("received message TO client API: %+v", clientMsg)
+ case clientMsg := <-p.fromClientAPI:
+ p.log.Infof("received message FROM client API: %+v", clientMsg)
+ case federatorMsg := <-p.toFederator:
+ p.log.Infof("received message TO federator: %+v", federatorMsg)
+ case federatorMsg := <-p.fromFederator:
+ p.log.Infof("received message FROM federator: %+v", federatorMsg)
+ case <-p.stop:
+ break DistLoop
+ }
+ }
+ }()
+ return nil
+}
+
+// Stop stops the processor cleanly, finishing handling any remaining messages before closing down.
+// TODO: empty message buffer properly before stopping otherwise we'll lose federating messages.
+func (p *processor) Stop() error {
+ close(p.stop)
+ return nil
+}
+
+// ToClientAPI wraps a message that travels from the processor into the client API
+type ToClientAPI struct {
+ APObjectType gtsmodel.ActivityStreamsObject
+ APActivityType gtsmodel.ActivityStreamsActivity
+ Activity interface{}
+}
+
+// FromClientAPI wraps a message that travels from client API into the processor
+type FromClientAPI struct {
+ APObjectType gtsmodel.ActivityStreamsObject
+ APActivityType gtsmodel.ActivityStreamsActivity
+ Activity interface{}
+}
+
+// ToFederator wraps a message that travels from the processor into the federator
+type ToFederator struct {
+ APObjectType gtsmodel.ActivityStreamsObject
+ APActivityType gtsmodel.ActivityStreamsActivity
+ Activity interface{}
+}
+
+// FromFederator wraps a message that travels from the federator into the processor
+type FromFederator struct {
+ APObjectType gtsmodel.ActivityStreamsObject
+ APActivityType gtsmodel.ActivityStreamsActivity
+ Activity interface{}
+}