diff options
Diffstat (limited to 'internal/processing/streaming')
-rw-r--r-- | internal/processing/streaming/openstream.go | 19 | ||||
-rw-r--r-- | internal/processing/streaming/streamdelete.go | 16 | ||||
-rw-r--r-- | internal/processing/streaming/streaming.go | 3 | ||||
-rw-r--r-- | internal/processing/streaming/streamnotification.go | 17 | ||||
-rw-r--r-- | internal/processing/streaming/streamstatus.go | 17 |
5 files changed, 38 insertions, 34 deletions
diff --git a/internal/processing/streaming/openstream.go b/internal/processing/streaming/openstream.go index dfad5398e..d4e4eef9f 100644 --- a/internal/processing/streaming/openstream.go +++ b/internal/processing/streaming/openstream.go @@ -9,9 +9,10 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" "github.com/superseriousbusiness/gotosocial/internal/id" + "github.com/superseriousbusiness/gotosocial/internal/stream" ) -func (p *processor) OpenStreamForAccount(ctx context.Context, account *gtsmodel.Account, streamType string) (*gtsmodel.Stream, gtserror.WithCode) { +func (p *processor) OpenStreamForAccount(ctx context.Context, account *gtsmodel.Account, streamType string) (*stream.Stream, gtserror.WithCode) { l := p.log.WithFields(logrus.Fields{ "func": "OpenStreamForAccount", "account": account.ID, @@ -25,10 +26,10 @@ func (p *processor) OpenStreamForAccount(ctx context.Context, account *gtsmodel. return nil, gtserror.NewErrorInternalError(fmt.Errorf("error generating stream id: %s", err)) } - thisStream := >smodel.Stream{ + thisStream := &stream.Stream{ ID: streamID, Type: streamType, - Messages: make(chan *gtsmodel.Message, 100), + Messages: make(chan *stream.Message, 100), Hangup: make(chan interface{}, 1), Connected: true, } @@ -37,8 +38,8 @@ func (p *processor) OpenStreamForAccount(ctx context.Context, account *gtsmodel. v, ok := p.streamMap.Load(account.ID) if !ok || v == nil { // there is no entry in the streamMap for this account yet, so make one and store it - streamsForAccount := >smodel.StreamsForAccount{ - Streams: []*gtsmodel.Stream{ + streamsForAccount := &stream.StreamsForAccount{ + Streams: []*stream.Stream{ thisStream, }, } @@ -46,7 +47,7 @@ func (p *processor) OpenStreamForAccount(ctx context.Context, account *gtsmodel. } else { // there is an entry in the streamMap for this account // parse the interface as a streamsForAccount - streamsForAccount, ok := v.(*gtsmodel.StreamsForAccount) + streamsForAccount, ok := v.(*stream.StreamsForAccount) if !ok { return nil, gtserror.NewErrorInternalError(errors.New("stream map error")) } @@ -63,7 +64,7 @@ func (p *processor) OpenStreamForAccount(ctx context.Context, account *gtsmodel. // waitToCloseStream waits until the hangup channel is closed for the given stream. // It then iterates through the map of streams stored by the processor, removes the stream from it, // and then closes the messages channel of the stream to indicate that the channel should no longer be read from. -func (p *processor) waitToCloseStream(account *gtsmodel.Account, thisStream *gtsmodel.Stream) { +func (p *processor) waitToCloseStream(account *gtsmodel.Account, thisStream *stream.Stream) { <-thisStream.Hangup // wait for a hangup message // lock the stream to prevent more messages being put in it while we work @@ -78,7 +79,7 @@ func (p *processor) waitToCloseStream(account *gtsmodel.Account, thisStream *gts if !ok || v == nil { return } - streamsForAccount, ok := v.(*gtsmodel.StreamsForAccount) + streamsForAccount, ok := v.(*stream.StreamsForAccount) if !ok { return } @@ -88,7 +89,7 @@ func (p *processor) waitToCloseStream(account *gtsmodel.Account, thisStream *gts defer streamsForAccount.Unlock() // put everything into modified streams *except* the stream we're removing - modifiedStreams := []*gtsmodel.Stream{} + modifiedStreams := []*stream.Stream{} for _, s := range streamsForAccount.Streams { if s.ID != thisStream.ID { modifiedStreams = append(modifiedStreams, s) diff --git a/internal/processing/streaming/streamdelete.go b/internal/processing/streaming/streamdelete.go index 2282c29ae..cd541bc57 100644 --- a/internal/processing/streaming/streamdelete.go +++ b/internal/processing/streaming/streamdelete.go @@ -4,7 +4,7 @@ import ( "fmt" "strings" - "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/stream" ) func (p *processor) StreamDelete(statusID string) error { @@ -20,7 +20,7 @@ func (p *processor) StreamDelete(statusID string) error { } // the value of the map should be a buncha streams - streamsForAccount, ok := v.(*gtsmodel.StreamsForAccount) + streamsForAccount, ok := v.(*stream.StreamsForAccount) if !ok { errs = append(errs, fmt.Sprintf("stream map error for account stream %s", accountID)) } @@ -28,13 +28,13 @@ func (p *processor) StreamDelete(statusID string) error { // lock the streams while we work on them streamsForAccount.Lock() defer streamsForAccount.Unlock() - for _, stream := range streamsForAccount.Streams { + for _, s := range streamsForAccount.Streams { // lock each individual stream as we work on it - stream.Lock() - defer stream.Unlock() - if stream.Connected { - stream.Messages <- >smodel.Message{ - Stream: []string{stream.Type}, + s.Lock() + defer s.Unlock() + if s.Connected { + s.Messages <- &stream.Message{ + Stream: []string{s.Type}, Event: "delete", Payload: statusID, } diff --git a/internal/processing/streaming/streaming.go b/internal/processing/streaming/streaming.go index f349a655a..610d4a9d2 100644 --- a/internal/processing/streaming/streaming.go +++ b/internal/processing/streaming/streaming.go @@ -11,6 +11,7 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" "github.com/superseriousbusiness/gotosocial/internal/oauth" + "github.com/superseriousbusiness/gotosocial/internal/stream" "github.com/superseriousbusiness/gotosocial/internal/typeutils" "github.com/superseriousbusiness/gotosocial/internal/visibility" ) @@ -20,7 +21,7 @@ type Processor interface { // AuthorizeStreamingRequest returns an oauth2 token info in response to an access token query from the streaming API AuthorizeStreamingRequest(ctx context.Context, accessToken string) (*gtsmodel.Account, error) // OpenStreamForAccount returns a new Stream for the given account, which will contain a channel for passing messages back to the caller. - OpenStreamForAccount(ctx context.Context, account *gtsmodel.Account, streamType string) (*gtsmodel.Stream, gtserror.WithCode) + OpenStreamForAccount(ctx context.Context, account *gtsmodel.Account, streamType string) (*stream.Stream, gtserror.WithCode) // StreamStatusToAccount streams the given status to any open, appropriate streams belonging to the given account. StreamStatusToAccount(s *apimodel.Status, account *gtsmodel.Account) error // StreamNotificationToAccount streams the given notification to any open, appropriate streams belonging to the given account. diff --git a/internal/processing/streaming/streamnotification.go b/internal/processing/streaming/streamnotification.go index 24c8342ee..d8460874f 100644 --- a/internal/processing/streaming/streamnotification.go +++ b/internal/processing/streaming/streamnotification.go @@ -8,6 +8,7 @@ import ( "github.com/sirupsen/logrus" apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/stream" ) func (p *processor) StreamNotificationToAccount(n *apimodel.Notification, account *gtsmodel.Account) error { @@ -21,7 +22,7 @@ func (p *processor) StreamNotificationToAccount(n *apimodel.Notification, accoun return nil } - streamsForAccount, ok := v.(*gtsmodel.StreamsForAccount) + streamsForAccount, ok := v.(*stream.StreamsForAccount) if !ok { return errors.New("stream map error") } @@ -33,13 +34,13 @@ func (p *processor) StreamNotificationToAccount(n *apimodel.Notification, accoun streamsForAccount.Lock() defer streamsForAccount.Unlock() - for _, stream := range streamsForAccount.Streams { - stream.Lock() - defer stream.Unlock() - if stream.Connected { - l.Debugf("streaming notification to stream id %s", stream.ID) - stream.Messages <- >smodel.Message{ - Stream: []string{stream.Type}, + for _, s := range streamsForAccount.Streams { + s.Lock() + defer s.Unlock() + if s.Connected { + l.Debugf("streaming notification to stream id %s", s.ID) + s.Messages <- &stream.Message{ + Stream: []string{s.Type}, Event: "notification", Payload: string(notificationBytes), } diff --git a/internal/processing/streaming/streamstatus.go b/internal/processing/streaming/streamstatus.go index 8d026252d..f4d6b2629 100644 --- a/internal/processing/streaming/streamstatus.go +++ b/internal/processing/streaming/streamstatus.go @@ -8,6 +8,7 @@ import ( "github.com/sirupsen/logrus" apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/stream" ) func (p *processor) StreamStatusToAccount(s *apimodel.Status, account *gtsmodel.Account) error { @@ -21,7 +22,7 @@ func (p *processor) StreamStatusToAccount(s *apimodel.Status, account *gtsmodel. return nil } - streamsForAccount, ok := v.(*gtsmodel.StreamsForAccount) + streamsForAccount, ok := v.(*stream.StreamsForAccount) if !ok { return errors.New("stream map error") } @@ -33,13 +34,13 @@ func (p *processor) StreamStatusToAccount(s *apimodel.Status, account *gtsmodel. streamsForAccount.Lock() defer streamsForAccount.Unlock() - for _, stream := range streamsForAccount.Streams { - stream.Lock() - defer stream.Unlock() - if stream.Connected { - l.Debugf("streaming status to stream id %s", stream.ID) - stream.Messages <- >smodel.Message{ - Stream: []string{stream.Type}, + for _, s := range streamsForAccount.Streams { + s.Lock() + defer s.Unlock() + if s.Connected { + l.Debugf("streaming status to stream id %s", s.ID) + s.Messages <- &stream.Message{ + Stream: []string{s.Type}, Event: "update", Payload: string(statusBytes), } |