diff options
Diffstat (limited to 'internal/processing/streaming/streaming.go')
-rw-r--r-- | internal/processing/streaming/streaming.go | 52 |
1 files changed, 52 insertions, 0 deletions
diff --git a/internal/processing/streaming/streaming.go b/internal/processing/streaming/streaming.go new file mode 100644 index 000000000..de75b8f27 --- /dev/null +++ b/internal/processing/streaming/streaming.go @@ -0,0 +1,52 @@ +package streaming + +import ( + "sync" + + "github.com/sirupsen/logrus" + apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" + "github.com/superseriousbusiness/gotosocial/internal/config" + "github.com/superseriousbusiness/gotosocial/internal/db" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/oauth" + "github.com/superseriousbusiness/gotosocial/internal/typeutils" + "github.com/superseriousbusiness/gotosocial/internal/visibility" +) + +// Processor wraps a bunch of functions for processing streaming. +type Processor interface { + // AuthorizeStreamingRequest returns an oauth2 token info in response to an access token query from the streaming API + AuthorizeStreamingRequest(accessToken string) (*gtsmodel.Account, error) + // OpenStreamForAccount returns a new Stream for the given account, which will contain a channel for passing messages back to the caller. + OpenStreamForAccount(account *gtsmodel.Account, streamType string) (*gtsmodel.Stream, gtserror.WithCode) + // StreamStatusToAccount streams the given status to any open, appropriate streams belonging to the given account. + StreamStatusToAccount(s *apimodel.Status, account *gtsmodel.Account) error + // StreamNotificationToAccount streams the given notification to any open, appropriate streams belonging to the given account. + StreamNotificationToAccount(n *apimodel.Notification, account *gtsmodel.Account) error + // StreamDelete streams the delete of the given statusID to *ALL* open streams. + StreamDelete(statusID string) error +} + +type processor struct { + tc typeutils.TypeConverter + config *config.Config + db db.DB + filter visibility.Filter + log *logrus.Logger + oauthServer oauth.Server + streamMap *sync.Map +} + +// New returns a new status processor. +func New(db db.DB, tc typeutils.TypeConverter, oauthServer oauth.Server, config *config.Config, log *logrus.Logger) Processor { + return &processor{ + tc: tc, + config: config, + db: db, + filter: visibility.NewFilter(db, log), + log: log, + oauthServer: oauthServer, + streamMap: &sync.Map{}, + } +} |