diff options
Diffstat (limited to 'internal/processing/synchronous/streaming')
6 files changed, 0 insertions, 336 deletions
diff --git a/internal/processing/synchronous/streaming/authorize.go b/internal/processing/synchronous/streaming/authorize.go deleted file mode 100644 index 8bbf1856d..000000000 --- a/internal/processing/synchronous/streaming/authorize.go +++ /dev/null @@ -1,33 +0,0 @@ -package streaming - -import ( - "context" - "fmt" - - "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" -) - -func (p *processor) AuthorizeStreamingRequest(accessToken string) (*gtsmodel.Account, error) { - ti, err := p.oauthServer.LoadAccessToken(context.Background(), accessToken) - if err != nil { - return nil, fmt.Errorf("AuthorizeStreamingRequest: error loading access token: %s", err) - } - - uid := ti.GetUserID() - if uid == "" { - return nil, fmt.Errorf("AuthorizeStreamingRequest: no userid in token") - } - - // fetch user's and account for this user id - user := >smodel.User{} - if err := p.db.GetByID(uid, user); err != nil || user == nil { - return nil, fmt.Errorf("AuthorizeStreamingRequest: no user found for validated uid %s", uid) - } - - acct := >smodel.Account{} - if err := p.db.GetByID(user.AccountID, acct); err != nil || acct == nil { - return nil, fmt.Errorf("AuthorizeStreamingRequest: no account retrieved for user with id %s", uid) - } - - return acct, nil -} diff --git a/internal/processing/synchronous/streaming/openstream.go b/internal/processing/synchronous/streaming/openstream.go deleted file mode 100644 index 68446bac6..000000000 --- a/internal/processing/synchronous/streaming/openstream.go +++ /dev/null @@ -1,100 +0,0 @@ -package streaming - -import ( - "errors" - "fmt" - - "github.com/sirupsen/logrus" - "github.com/superseriousbusiness/gotosocial/internal/gtserror" - "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" - "github.com/superseriousbusiness/gotosocial/internal/id" -) - -func (p *processor) OpenStreamForAccount(account *gtsmodel.Account, streamType string) (*gtsmodel.Stream, gtserror.WithCode) { - l := p.log.WithFields(logrus.Fields{ - "func": "OpenStreamForAccount", - "account": account.ID, - "streamType": streamType, - }) - l.Debug("received open stream request") - - // each stream needs a unique ID so we know to close it - streamID, err := id.NewRandomULID() - if err != nil { - return nil, gtserror.NewErrorInternalError(fmt.Errorf("error generating stream id: %s", err)) - } - - thisStream := >smodel.Stream{ - ID: streamID, - Type: streamType, - Messages: make(chan *gtsmodel.Message, 100), - Hangup: make(chan interface{}, 1), - Connected: true, - } - go p.waitToCloseStream(account, thisStream) - - v, ok := p.streamMap.Load(account.ID) - if !ok || v == nil { - // there is no entry in the streamMap for this account yet, so make one and store it - streamsForAccount := >smodel.StreamsForAccount{ - Streams: []*gtsmodel.Stream{ - thisStream, - }, - } - p.streamMap.Store(account.ID, streamsForAccount) - } else { - // there is an entry in the streamMap for this account - // parse the interface as a streamsForAccount - streamsForAccount, ok := v.(*gtsmodel.StreamsForAccount) - if !ok { - return nil, gtserror.NewErrorInternalError(errors.New("stream map error")) - } - - // append this stream to it - streamsForAccount.Lock() - streamsForAccount.Streams = append(streamsForAccount.Streams, thisStream) - streamsForAccount.Unlock() - } - - return thisStream, nil -} - -// waitToCloseStream waits until the hangup channel is closed for the given stream. -// It then iterates through the map of streams stored by the processor, removes the stream from it, -// and then closes the messages channel of the stream to indicate that the channel should no longer be read from. -func (p *processor) waitToCloseStream(account *gtsmodel.Account, thisStream *gtsmodel.Stream) { - <-thisStream.Hangup // wait for a hangup message - - // lock the stream to prevent more messages being put in it while we work - thisStream.Lock() - defer thisStream.Unlock() - - // indicate the stream is no longer connected - thisStream.Connected = false - - // load and parse the entry for this account from the stream map - v, ok := p.streamMap.Load(account.ID) - if !ok || v == nil { - return - } - streamsForAccount, ok := v.(*gtsmodel.StreamsForAccount) - if !ok { - return - } - - // lock the streams for account while we remove this stream from its slice - streamsForAccount.Lock() - defer streamsForAccount.Unlock() - - // put everything into modified streams *except* the stream we're removing - modifiedStreams := []*gtsmodel.Stream{} - for _, s := range streamsForAccount.Streams { - if s.ID != thisStream.ID { - modifiedStreams = append(modifiedStreams, s) - } - } - streamsForAccount.Streams = modifiedStreams - - // finally close the messages channel so no more messages can be read from it - close(thisStream.Messages) -} diff --git a/internal/processing/synchronous/streaming/streamdelete.go b/internal/processing/synchronous/streaming/streamdelete.go deleted file mode 100644 index 2282c29ae..000000000 --- a/internal/processing/synchronous/streaming/streamdelete.go +++ /dev/null @@ -1,51 +0,0 @@ -package streaming - -import ( - "fmt" - "strings" - - "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" -) - -func (p *processor) StreamDelete(statusID string) error { - errs := []string{} - - // we want to range through ALL streams for ALL accounts here to make sure it's very clear to everyone that the status has been deleted - p.streamMap.Range(func(k interface{}, v interface{}) bool { - // the key of this map should be an accountID (string) - accountID, ok := k.(string) - if !ok { - errs = append(errs, "key in streamMap was not a string!") - return false - } - - // the value of the map should be a buncha streams - streamsForAccount, ok := v.(*gtsmodel.StreamsForAccount) - if !ok { - errs = append(errs, fmt.Sprintf("stream map error for account stream %s", accountID)) - } - - // lock the streams while we work on them - streamsForAccount.Lock() - defer streamsForAccount.Unlock() - for _, stream := range streamsForAccount.Streams { - // lock each individual stream as we work on it - stream.Lock() - defer stream.Unlock() - if stream.Connected { - stream.Messages <- >smodel.Message{ - Stream: []string{stream.Type}, - Event: "delete", - Payload: statusID, - } - } - } - return true - }) - - if len(errs) != 0 { - return fmt.Errorf("one or more errors streaming status delete: %s", strings.Join(errs, ";")) - } - - return nil -} diff --git a/internal/processing/synchronous/streaming/streaming.go b/internal/processing/synchronous/streaming/streaming.go deleted file mode 100644 index de75b8f27..000000000 --- a/internal/processing/synchronous/streaming/streaming.go +++ /dev/null @@ -1,52 +0,0 @@ -package streaming - -import ( - "sync" - - "github.com/sirupsen/logrus" - apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" - "github.com/superseriousbusiness/gotosocial/internal/config" - "github.com/superseriousbusiness/gotosocial/internal/db" - "github.com/superseriousbusiness/gotosocial/internal/gtserror" - "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" - "github.com/superseriousbusiness/gotosocial/internal/oauth" - "github.com/superseriousbusiness/gotosocial/internal/typeutils" - "github.com/superseriousbusiness/gotosocial/internal/visibility" -) - -// Processor wraps a bunch of functions for processing streaming. -type Processor interface { - // AuthorizeStreamingRequest returns an oauth2 token info in response to an access token query from the streaming API - AuthorizeStreamingRequest(accessToken string) (*gtsmodel.Account, error) - // OpenStreamForAccount returns a new Stream for the given account, which will contain a channel for passing messages back to the caller. - OpenStreamForAccount(account *gtsmodel.Account, streamType string) (*gtsmodel.Stream, gtserror.WithCode) - // StreamStatusToAccount streams the given status to any open, appropriate streams belonging to the given account. - StreamStatusToAccount(s *apimodel.Status, account *gtsmodel.Account) error - // StreamNotificationToAccount streams the given notification to any open, appropriate streams belonging to the given account. - StreamNotificationToAccount(n *apimodel.Notification, account *gtsmodel.Account) error - // StreamDelete streams the delete of the given statusID to *ALL* open streams. - StreamDelete(statusID string) error -} - -type processor struct { - tc typeutils.TypeConverter - config *config.Config - db db.DB - filter visibility.Filter - log *logrus.Logger - oauthServer oauth.Server - streamMap *sync.Map -} - -// New returns a new status processor. -func New(db db.DB, tc typeutils.TypeConverter, oauthServer oauth.Server, config *config.Config, log *logrus.Logger) Processor { - return &processor{ - tc: tc, - config: config, - db: db, - filter: visibility.NewFilter(db, log), - log: log, - oauthServer: oauthServer, - streamMap: &sync.Map{}, - } -} diff --git a/internal/processing/synchronous/streaming/streamnotification.go b/internal/processing/synchronous/streaming/streamnotification.go deleted file mode 100644 index 24c8342ee..000000000 --- a/internal/processing/synchronous/streaming/streamnotification.go +++ /dev/null @@ -1,50 +0,0 @@ -package streaming - -import ( - "encoding/json" - "errors" - "fmt" - - "github.com/sirupsen/logrus" - apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" - "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" -) - -func (p *processor) StreamNotificationToAccount(n *apimodel.Notification, account *gtsmodel.Account) error { - l := p.log.WithFields(logrus.Fields{ - "func": "StreamNotificationToAccount", - "account": account.ID, - }) - v, ok := p.streamMap.Load(account.ID) - if !ok { - // no open connections so nothing to stream - return nil - } - - streamsForAccount, ok := v.(*gtsmodel.StreamsForAccount) - if !ok { - return errors.New("stream map error") - } - - notificationBytes, err := json.Marshal(n) - if err != nil { - return fmt.Errorf("error marshalling notification to json: %s", err) - } - - streamsForAccount.Lock() - defer streamsForAccount.Unlock() - for _, stream := range streamsForAccount.Streams { - stream.Lock() - defer stream.Unlock() - if stream.Connected { - l.Debugf("streaming notification to stream id %s", stream.ID) - stream.Messages <- >smodel.Message{ - Stream: []string{stream.Type}, - Event: "notification", - Payload: string(notificationBytes), - } - } - } - - return nil -} diff --git a/internal/processing/synchronous/streaming/streamstatus.go b/internal/processing/synchronous/streaming/streamstatus.go deleted file mode 100644 index 8d026252d..000000000 --- a/internal/processing/synchronous/streaming/streamstatus.go +++ /dev/null @@ -1,50 +0,0 @@ -package streaming - -import ( - "encoding/json" - "errors" - "fmt" - - "github.com/sirupsen/logrus" - apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" - "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" -) - -func (p *processor) StreamStatusToAccount(s *apimodel.Status, account *gtsmodel.Account) error { - l := p.log.WithFields(logrus.Fields{ - "func": "StreamStatusForAccount", - "account": account.ID, - }) - v, ok := p.streamMap.Load(account.ID) - if !ok { - // no open connections so nothing to stream - return nil - } - - streamsForAccount, ok := v.(*gtsmodel.StreamsForAccount) - if !ok { - return errors.New("stream map error") - } - - statusBytes, err := json.Marshal(s) - if err != nil { - return fmt.Errorf("error marshalling status to json: %s", err) - } - - streamsForAccount.Lock() - defer streamsForAccount.Unlock() - for _, stream := range streamsForAccount.Streams { - stream.Lock() - defer stream.Unlock() - if stream.Connected { - l.Debugf("streaming status to stream id %s", stream.ID) - stream.Messages <- >smodel.Message{ - Stream: []string{stream.Type}, - Event: "update", - Payload: string(statusBytes), - } - } - } - - return nil -} |