summaryrefslogtreecommitdiff
path: root/internal/processing/streaming/streaming.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/processing/streaming/streaming.go')
-rw-r--r--internal/processing/streaming/streaming.go52
1 files changed, 52 insertions, 0 deletions
diff --git a/internal/processing/streaming/streaming.go b/internal/processing/streaming/streaming.go
new file mode 100644
index 000000000..de75b8f27
--- /dev/null
+++ b/internal/processing/streaming/streaming.go
@@ -0,0 +1,52 @@
+package streaming
+
+import (
+ "sync"
+
+ "github.com/sirupsen/logrus"
+ apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model"
+ "github.com/superseriousbusiness/gotosocial/internal/config"
+ "github.com/superseriousbusiness/gotosocial/internal/db"
+ "github.com/superseriousbusiness/gotosocial/internal/gtserror"
+ "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
+ "github.com/superseriousbusiness/gotosocial/internal/oauth"
+ "github.com/superseriousbusiness/gotosocial/internal/typeutils"
+ "github.com/superseriousbusiness/gotosocial/internal/visibility"
+)
+
+// Processor wraps a bunch of functions for processing streaming.
+type Processor interface {
+ // AuthorizeStreamingRequest returns an oauth2 token info in response to an access token query from the streaming API
+ AuthorizeStreamingRequest(accessToken string) (*gtsmodel.Account, error)
+ // OpenStreamForAccount returns a new Stream for the given account, which will contain a channel for passing messages back to the caller.
+ OpenStreamForAccount(account *gtsmodel.Account, streamType string) (*gtsmodel.Stream, gtserror.WithCode)
+ // StreamStatusToAccount streams the given status to any open, appropriate streams belonging to the given account.
+ StreamStatusToAccount(s *apimodel.Status, account *gtsmodel.Account) error
+ // StreamNotificationToAccount streams the given notification to any open, appropriate streams belonging to the given account.
+ StreamNotificationToAccount(n *apimodel.Notification, account *gtsmodel.Account) error
+ // StreamDelete streams the delete of the given statusID to *ALL* open streams.
+ StreamDelete(statusID string) error
+}
+
+type processor struct {
+ tc typeutils.TypeConverter
+ config *config.Config
+ db db.DB
+ filter visibility.Filter
+ log *logrus.Logger
+ oauthServer oauth.Server
+ streamMap *sync.Map
+}
+
+// New returns a new status processor.
+func New(db db.DB, tc typeutils.TypeConverter, oauthServer oauth.Server, config *config.Config, log *logrus.Logger) Processor {
+ return &processor{
+ tc: tc,
+ config: config,
+ db: db,
+ filter: visibility.NewFilter(db, log),
+ log: log,
+ oauthServer: oauthServer,
+ streamMap: &sync.Map{},
+ }
+}