diff options
author | 2021-06-19 11:18:55 +0200 | |
---|---|---|
committer | 2021-06-19 11:18:55 +0200 | |
commit | aa8a0d08501cbb22400a67ece85c45fdfbdc6131 (patch) | |
tree | 4a4581fb8f1c9bf8cac742be15d7a57eec170a1b /internal/processing | |
parent | update CONTRIBUTING with css bundling instructions, and go fmt (#48) (diff) | |
download | gotosocial-aa8a0d08501cbb22400a67ece85c45fdfbdc6131.tar.xz |
Streaming (#49)
Add new status and notification websocket streaming capabilities
Diffstat (limited to 'internal/processing')
-rw-r--r-- | internal/processing/fromcommon.go | 76 | ||||
-rw-r--r-- | internal/processing/processor.go | 13 | ||||
-rw-r--r-- | internal/processing/streaming.go | 14 | ||||
-rw-r--r-- | internal/processing/synchronous/streaming/authorize.go | 33 | ||||
-rw-r--r-- | internal/processing/synchronous/streaming/openstream.go | 100 | ||||
-rw-r--r-- | internal/processing/synchronous/streaming/streaming.go | 47 | ||||
-rw-r--r-- | internal/processing/synchronous/streaming/streamnotification.go | 50 | ||||
-rw-r--r-- | internal/processing/synchronous/streaming/streamstatus.go | 50 | ||||
-rw-r--r-- | internal/processing/timeline.go | 2 |
9 files changed, 381 insertions, 4 deletions
diff --git a/internal/processing/fromcommon.go b/internal/processing/fromcommon.go index 65ccef45d..e10f75441 100644 --- a/internal/processing/fromcommon.go +++ b/internal/processing/fromcommon.go @@ -96,6 +96,16 @@ func (p *processor) notifyStatus(status *gtsmodel.Status) error { if err := p.db.Put(notif); err != nil { return fmt.Errorf("notifyStatus: error putting notification in database: %s", err) } + + // now stream the notification to the user + mastoNotif, err := p.tc.NotificationToMasto(notif) + if err != nil { + return fmt.Errorf("notifyStatus: error converting notification to masto representation: %s", err) + } + + if err := p.streamingProcessor.StreamNotificationToAccount(mastoNotif, m.GTSAccount); err != nil { + return fmt.Errorf("notifyStatus: error streaming notification to account: %s", err) + } } return nil @@ -123,6 +133,16 @@ func (p *processor) notifyFollowRequest(followRequest *gtsmodel.FollowRequest, r return fmt.Errorf("notifyFollowRequest: error putting notification in database: %s", err) } + // now stream the notification to the user + mastoNotif, err := p.tc.NotificationToMasto(notif) + if err != nil { + return fmt.Errorf("notifyStatus: error converting notification to masto representation: %s", err) + } + + if err := p.streamingProcessor.StreamNotificationToAccount(mastoNotif, receivingAccount); err != nil { + return fmt.Errorf("notifyStatus: error streaming notification to account: %s", err) + } + return nil } @@ -157,6 +177,16 @@ func (p *processor) notifyFollow(follow *gtsmodel.Follow, receivingAccount *gtsm return fmt.Errorf("notifyFollow: error putting notification in database: %s", err) } + // now stream the notification to the user + mastoNotif, err := p.tc.NotificationToMasto(notif) + if err != nil { + return fmt.Errorf("notifyStatus: error converting notification to masto representation: %s", err) + } + + if err := p.streamingProcessor.StreamNotificationToAccount(mastoNotif, receivingAccount); err != nil { + return fmt.Errorf("notifyStatus: error streaming notification to account: %s", err) + } + return nil } @@ -183,6 +213,16 @@ func (p *processor) notifyFave(fave *gtsmodel.StatusFave, receivingAccount *gtsm return fmt.Errorf("notifyFave: error putting notification in database: %s", err) } + // now stream the notification to the user + mastoNotif, err := p.tc.NotificationToMasto(notif) + if err != nil { + return fmt.Errorf("notifyStatus: error converting notification to masto representation: %s", err) + } + + if err := p.streamingProcessor.StreamNotificationToAccount(mastoNotif, receivingAccount); err != nil { + return fmt.Errorf("notifyStatus: error streaming notification to account: %s", err) + } + return nil } @@ -242,6 +282,16 @@ func (p *processor) notifyAnnounce(status *gtsmodel.Status) error { return fmt.Errorf("notifyAnnounce: error putting notification in database: %s", err) } + // now stream the notification to the user + mastoNotif, err := p.tc.NotificationToMasto(notif) + if err != nil { + return fmt.Errorf("notifyStatus: error converting notification to masto representation: %s", err) + } + + if err := p.streamingProcessor.StreamNotificationToAccount(mastoNotif, boostedAcct); err != nil { + return fmt.Errorf("notifyStatus: error streaming notification to account: %s", err) + } + return nil } @@ -321,8 +371,32 @@ func (p *processor) timelineStatusForAccount(status *gtsmodel.Status, accountID return } - if err := p.timelineManager.IngestAndPrepare(status, timelineAccount.ID); err != nil { + // stick the status in the timeline for the account and then immediately prepare it so they can see it right away + inserted, err := p.timelineManager.IngestAndPrepare(status, timelineAccount.ID) + if err != nil { errors <- fmt.Errorf("timelineStatusForAccount: error ingesting status %s: %s", status.ID, err) + return + } + + // the status was inserted to stream it to the user + if inserted { + mastoStatus, err := p.tc.StatusToMasto(status, timelineAccount) + if err != nil { + errors <- fmt.Errorf("timelineStatusForAccount: error converting status %s to frontend representation: %s", status.ID, err) + } else { + if err := p.streamingProcessor.StreamStatusToAccount(mastoStatus, timelineAccount); err != nil { + errors <- fmt.Errorf("timelineStatusForAccount: error streaming status %s: %s", status.ID, err) + } + } + } + + mastoStatus, err := p.tc.StatusToMasto(status, timelineAccount) + if err != nil { + errors <- fmt.Errorf("timelineStatusForAccount: error converting status %s to frontend representation: %s", status.ID, err) + } else { + if err := p.streamingProcessor.StreamStatusToAccount(mastoStatus, timelineAccount); err != nil { + errors <- fmt.Errorf("timelineStatusForAccount: error streaming status %s: %s", status.ID, err) + } } } diff --git a/internal/processing/processor.go b/internal/processing/processor.go index 301cb5707..d1b44314d 100644 --- a/internal/processing/processor.go +++ b/internal/processing/processor.go @@ -33,6 +33,7 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/media" "github.com/superseriousbusiness/gotosocial/internal/oauth" "github.com/superseriousbusiness/gotosocial/internal/processing/synchronous/status" + "github.com/superseriousbusiness/gotosocial/internal/processing/synchronous/streaming" "github.com/superseriousbusiness/gotosocial/internal/timeline" "github.com/superseriousbusiness/gotosocial/internal/typeutils" "github.com/superseriousbusiness/gotosocial/internal/visibility" @@ -132,6 +133,11 @@ type Processor interface { // PublicTimelineGet returns statuses from the public/local timeline, with the given filters/parameters. PublicTimelineGet(authed *oauth.Auth, maxID string, sinceID string, minID string, limit int, local bool) ([]*apimodel.Status, gtserror.WithCode) + // AuthorizeStreamingRequest returns a gotosocial account in exchange for an access token, or an error if the given token is not valid. + AuthorizeStreamingRequest(accessToken string) (*gtsmodel.Account, error) + // OpenStreamForAccount opens a new stream for the given account, with the given stream type. + OpenStreamForAccount(account *gtsmodel.Account, streamType string) (*gtsmodel.Stream, gtserror.WithCode) + /* FEDERATION API-FACING PROCESSING FUNCTIONS These functions are intended to be called when the federating client needs an immediate (ie., synchronous) reply @@ -192,7 +198,8 @@ type processor struct { SUB-PROCESSORS */ - statusProcessor status.Processor + statusProcessor status.Processor + streamingProcessor streaming.Processor } // NewProcessor returns a new Processor that uses the given federator and logger @@ -202,6 +209,7 @@ func NewProcessor(config *config.Config, tc typeutils.TypeConverter, federator f fromFederator := make(chan gtsmodel.FromFederator, 1000) statusProcessor := status.New(db, tc, config, fromClientAPI, log) + streamingProcessor := streaming.New(db, tc, oauthServer, config, log) return &processor{ fromClientAPI: fromClientAPI, @@ -218,7 +226,8 @@ func NewProcessor(config *config.Config, tc typeutils.TypeConverter, federator f db: db, filter: visibility.NewFilter(db, log), - statusProcessor: statusProcessor, + statusProcessor: statusProcessor, + streamingProcessor: streamingProcessor, } } diff --git a/internal/processing/streaming.go b/internal/processing/streaming.go new file mode 100644 index 000000000..1e566da81 --- /dev/null +++ b/internal/processing/streaming.go @@ -0,0 +1,14 @@ +package processing + +import ( + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" +) + +func (p *processor) AuthorizeStreamingRequest(accessToken string) (*gtsmodel.Account, error) { + return p.streamingProcessor.AuthorizeStreamingRequest(accessToken) +} + +func (p *processor) OpenStreamForAccount(account *gtsmodel.Account, streamType string) (*gtsmodel.Stream, gtserror.WithCode) { + return p.streamingProcessor.OpenStreamForAccount(account, streamType) +} diff --git a/internal/processing/synchronous/streaming/authorize.go b/internal/processing/synchronous/streaming/authorize.go new file mode 100644 index 000000000..8bbf1856d --- /dev/null +++ b/internal/processing/synchronous/streaming/authorize.go @@ -0,0 +1,33 @@ +package streaming + +import ( + "context" + "fmt" + + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" +) + +func (p *processor) AuthorizeStreamingRequest(accessToken string) (*gtsmodel.Account, error) { + ti, err := p.oauthServer.LoadAccessToken(context.Background(), accessToken) + if err != nil { + return nil, fmt.Errorf("AuthorizeStreamingRequest: error loading access token: %s", err) + } + + uid := ti.GetUserID() + if uid == "" { + return nil, fmt.Errorf("AuthorizeStreamingRequest: no userid in token") + } + + // fetch user's and account for this user id + user := >smodel.User{} + if err := p.db.GetByID(uid, user); err != nil || user == nil { + return nil, fmt.Errorf("AuthorizeStreamingRequest: no user found for validated uid %s", uid) + } + + acct := >smodel.Account{} + if err := p.db.GetByID(user.AccountID, acct); err != nil || acct == nil { + return nil, fmt.Errorf("AuthorizeStreamingRequest: no account retrieved for user with id %s", uid) + } + + return acct, nil +} diff --git a/internal/processing/synchronous/streaming/openstream.go b/internal/processing/synchronous/streaming/openstream.go new file mode 100644 index 000000000..68446bac6 --- /dev/null +++ b/internal/processing/synchronous/streaming/openstream.go @@ -0,0 +1,100 @@ +package streaming + +import ( + "errors" + "fmt" + + "github.com/sirupsen/logrus" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/id" +) + +func (p *processor) OpenStreamForAccount(account *gtsmodel.Account, streamType string) (*gtsmodel.Stream, gtserror.WithCode) { + l := p.log.WithFields(logrus.Fields{ + "func": "OpenStreamForAccount", + "account": account.ID, + "streamType": streamType, + }) + l.Debug("received open stream request") + + // each stream needs a unique ID so we know to close it + streamID, err := id.NewRandomULID() + if err != nil { + return nil, gtserror.NewErrorInternalError(fmt.Errorf("error generating stream id: %s", err)) + } + + thisStream := >smodel.Stream{ + ID: streamID, + Type: streamType, + Messages: make(chan *gtsmodel.Message, 100), + Hangup: make(chan interface{}, 1), + Connected: true, + } + go p.waitToCloseStream(account, thisStream) + + v, ok := p.streamMap.Load(account.ID) + if !ok || v == nil { + // there is no entry in the streamMap for this account yet, so make one and store it + streamsForAccount := >smodel.StreamsForAccount{ + Streams: []*gtsmodel.Stream{ + thisStream, + }, + } + p.streamMap.Store(account.ID, streamsForAccount) + } else { + // there is an entry in the streamMap for this account + // parse the interface as a streamsForAccount + streamsForAccount, ok := v.(*gtsmodel.StreamsForAccount) + if !ok { + return nil, gtserror.NewErrorInternalError(errors.New("stream map error")) + } + + // append this stream to it + streamsForAccount.Lock() + streamsForAccount.Streams = append(streamsForAccount.Streams, thisStream) + streamsForAccount.Unlock() + } + + return thisStream, nil +} + +// waitToCloseStream waits until the hangup channel is closed for the given stream. +// It then iterates through the map of streams stored by the processor, removes the stream from it, +// and then closes the messages channel of the stream to indicate that the channel should no longer be read from. +func (p *processor) waitToCloseStream(account *gtsmodel.Account, thisStream *gtsmodel.Stream) { + <-thisStream.Hangup // wait for a hangup message + + // lock the stream to prevent more messages being put in it while we work + thisStream.Lock() + defer thisStream.Unlock() + + // indicate the stream is no longer connected + thisStream.Connected = false + + // load and parse the entry for this account from the stream map + v, ok := p.streamMap.Load(account.ID) + if !ok || v == nil { + return + } + streamsForAccount, ok := v.(*gtsmodel.StreamsForAccount) + if !ok { + return + } + + // lock the streams for account while we remove this stream from its slice + streamsForAccount.Lock() + defer streamsForAccount.Unlock() + + // put everything into modified streams *except* the stream we're removing + modifiedStreams := []*gtsmodel.Stream{} + for _, s := range streamsForAccount.Streams { + if s.ID != thisStream.ID { + modifiedStreams = append(modifiedStreams, s) + } + } + streamsForAccount.Streams = modifiedStreams + + // finally close the messages channel so no more messages can be read from it + close(thisStream.Messages) +} diff --git a/internal/processing/synchronous/streaming/streaming.go b/internal/processing/synchronous/streaming/streaming.go new file mode 100644 index 000000000..9fd628547 --- /dev/null +++ b/internal/processing/synchronous/streaming/streaming.go @@ -0,0 +1,47 @@ +package streaming + +import ( + "sync" + + "github.com/sirupsen/logrus" + apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" + "github.com/superseriousbusiness/gotosocial/internal/config" + "github.com/superseriousbusiness/gotosocial/internal/db" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/oauth" + "github.com/superseriousbusiness/gotosocial/internal/typeutils" + "github.com/superseriousbusiness/gotosocial/internal/visibility" +) + +// Processor wraps a bunch of functions for processing streaming. +type Processor interface { + // AuthorizeStreamingRequest returns an oauth2 token info in response to an access token query from the streaming API + AuthorizeStreamingRequest(accessToken string) (*gtsmodel.Account, error) + OpenStreamForAccount(account *gtsmodel.Account, streamType string) (*gtsmodel.Stream, gtserror.WithCode) + StreamStatusToAccount(s *apimodel.Status, account *gtsmodel.Account) error + StreamNotificationToAccount(n *apimodel.Notification, account *gtsmodel.Account) error +} + +type processor struct { + tc typeutils.TypeConverter + config *config.Config + db db.DB + filter visibility.Filter + log *logrus.Logger + oauthServer oauth.Server + streamMap *sync.Map +} + +// New returns a new status processor. +func New(db db.DB, tc typeutils.TypeConverter, oauthServer oauth.Server, config *config.Config, log *logrus.Logger) Processor { + return &processor{ + tc: tc, + config: config, + db: db, + filter: visibility.NewFilter(db, log), + log: log, + oauthServer: oauthServer, + streamMap: &sync.Map{}, + } +} diff --git a/internal/processing/synchronous/streaming/streamnotification.go b/internal/processing/synchronous/streaming/streamnotification.go new file mode 100644 index 000000000..24c8342ee --- /dev/null +++ b/internal/processing/synchronous/streaming/streamnotification.go @@ -0,0 +1,50 @@ +package streaming + +import ( + "encoding/json" + "errors" + "fmt" + + "github.com/sirupsen/logrus" + apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" +) + +func (p *processor) StreamNotificationToAccount(n *apimodel.Notification, account *gtsmodel.Account) error { + l := p.log.WithFields(logrus.Fields{ + "func": "StreamNotificationToAccount", + "account": account.ID, + }) + v, ok := p.streamMap.Load(account.ID) + if !ok { + // no open connections so nothing to stream + return nil + } + + streamsForAccount, ok := v.(*gtsmodel.StreamsForAccount) + if !ok { + return errors.New("stream map error") + } + + notificationBytes, err := json.Marshal(n) + if err != nil { + return fmt.Errorf("error marshalling notification to json: %s", err) + } + + streamsForAccount.Lock() + defer streamsForAccount.Unlock() + for _, stream := range streamsForAccount.Streams { + stream.Lock() + defer stream.Unlock() + if stream.Connected { + l.Debugf("streaming notification to stream id %s", stream.ID) + stream.Messages <- >smodel.Message{ + Stream: []string{stream.Type}, + Event: "notification", + Payload: string(notificationBytes), + } + } + } + + return nil +} diff --git a/internal/processing/synchronous/streaming/streamstatus.go b/internal/processing/synchronous/streaming/streamstatus.go new file mode 100644 index 000000000..8d026252d --- /dev/null +++ b/internal/processing/synchronous/streaming/streamstatus.go @@ -0,0 +1,50 @@ +package streaming + +import ( + "encoding/json" + "errors" + "fmt" + + "github.com/sirupsen/logrus" + apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" +) + +func (p *processor) StreamStatusToAccount(s *apimodel.Status, account *gtsmodel.Account) error { + l := p.log.WithFields(logrus.Fields{ + "func": "StreamStatusForAccount", + "account": account.ID, + }) + v, ok := p.streamMap.Load(account.ID) + if !ok { + // no open connections so nothing to stream + return nil + } + + streamsForAccount, ok := v.(*gtsmodel.StreamsForAccount) + if !ok { + return errors.New("stream map error") + } + + statusBytes, err := json.Marshal(s) + if err != nil { + return fmt.Errorf("error marshalling status to json: %s", err) + } + + streamsForAccount.Lock() + defer streamsForAccount.Unlock() + for _, stream := range streamsForAccount.Streams { + stream.Lock() + defer stream.Unlock() + if stream.Connected { + l.Debugf("streaming status to stream id %s", stream.ID) + stream.Messages <- >smodel.Message{ + Stream: []string{stream.Type}, + Event: "update", + Payload: string(statusBytes), + } + } + } + + return nil +} diff --git a/internal/processing/timeline.go b/internal/processing/timeline.go index a8f42d64c..8f6b1d26b 100644 --- a/internal/processing/timeline.go +++ b/internal/processing/timeline.go @@ -201,7 +201,7 @@ func (p *processor) indexAndIngest(statuses []*gtsmodel.Status, timelineAccount continue } if timelineable { - if err := p.timelineManager.Ingest(s, timelineAccount.ID); err != nil { + if _, err := p.timelineManager.Ingest(s, timelineAccount.ID); err != nil { l.Error(fmt.Errorf("initTimelineFor: error ingesting status %s: %s", s.ID, err)) continue } |