diff options
| author | 2021-09-01 18:29:25 +0200 | |
|---|---|---|
| committer | 2021-09-01 18:29:25 +0200 | |
| commit | 4696e1a7b389599fa981f334b343daa911b11f5d (patch) | |
| tree | d1ca0c896cdacb82ad7c64ee150aa32b37d4c053 /internal/processing/streaming | |
| parent | move oauth models into gtsmodel (diff) | |
| download | gotosocial-4696e1a7b389599fa981f334b343daa911b11f5d.tar.xz | |
moving stuff around
Diffstat (limited to 'internal/processing/streaming')
| -rw-r--r-- | internal/processing/streaming/openstream.go | 19 | ||||
| -rw-r--r-- | internal/processing/streaming/streamdelete.go | 16 | ||||
| -rw-r--r-- | internal/processing/streaming/streaming.go | 3 | ||||
| -rw-r--r-- | internal/processing/streaming/streamnotification.go | 17 | ||||
| -rw-r--r-- | internal/processing/streaming/streamstatus.go | 17 | 
5 files changed, 38 insertions, 34 deletions
diff --git a/internal/processing/streaming/openstream.go b/internal/processing/streaming/openstream.go index dfad5398e..d4e4eef9f 100644 --- a/internal/processing/streaming/openstream.go +++ b/internal/processing/streaming/openstream.go @@ -9,9 +9,10 @@ import (  	"github.com/superseriousbusiness/gotosocial/internal/gtserror"  	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"  	"github.com/superseriousbusiness/gotosocial/internal/id" +	"github.com/superseriousbusiness/gotosocial/internal/stream"  ) -func (p *processor) OpenStreamForAccount(ctx context.Context, account *gtsmodel.Account, streamType string) (*gtsmodel.Stream, gtserror.WithCode) { +func (p *processor) OpenStreamForAccount(ctx context.Context, account *gtsmodel.Account, streamType string) (*stream.Stream, gtserror.WithCode) {  	l := p.log.WithFields(logrus.Fields{  		"func":       "OpenStreamForAccount",  		"account":    account.ID, @@ -25,10 +26,10 @@ func (p *processor) OpenStreamForAccount(ctx context.Context, account *gtsmodel.  		return nil, gtserror.NewErrorInternalError(fmt.Errorf("error generating stream id: %s", err))  	} -	thisStream := >smodel.Stream{ +	thisStream := &stream.Stream{  		ID:        streamID,  		Type:      streamType, -		Messages:  make(chan *gtsmodel.Message, 100), +		Messages:  make(chan *stream.Message, 100),  		Hangup:    make(chan interface{}, 1),  		Connected: true,  	} @@ -37,8 +38,8 @@ func (p *processor) OpenStreamForAccount(ctx context.Context, account *gtsmodel.  	v, ok := p.streamMap.Load(account.ID)  	if !ok || v == nil {  		// there is no entry in the streamMap for this account yet, so make one and store it -		streamsForAccount := >smodel.StreamsForAccount{ -			Streams: []*gtsmodel.Stream{ +		streamsForAccount := &stream.StreamsForAccount{ +			Streams: []*stream.Stream{  				thisStream,  			},  		} @@ -46,7 +47,7 @@ func (p *processor) OpenStreamForAccount(ctx context.Context, account *gtsmodel.  	} else {  		// there is an entry in the streamMap for this account  		// parse the interface as a streamsForAccount -		streamsForAccount, ok := v.(*gtsmodel.StreamsForAccount) +		streamsForAccount, ok := v.(*stream.StreamsForAccount)  		if !ok {  			return nil, gtserror.NewErrorInternalError(errors.New("stream map error"))  		} @@ -63,7 +64,7 @@ func (p *processor) OpenStreamForAccount(ctx context.Context, account *gtsmodel.  // waitToCloseStream waits until the hangup channel is closed for the given stream.  // It then iterates through the map of streams stored by the processor, removes the stream from it,  // and then closes the messages channel of the stream to indicate that the channel should no longer be read from. -func (p *processor) waitToCloseStream(account *gtsmodel.Account, thisStream *gtsmodel.Stream) { +func (p *processor) waitToCloseStream(account *gtsmodel.Account, thisStream *stream.Stream) {  	<-thisStream.Hangup // wait for a hangup message  	// lock the stream to prevent more messages being put in it while we work @@ -78,7 +79,7 @@ func (p *processor) waitToCloseStream(account *gtsmodel.Account, thisStream *gts  	if !ok || v == nil {  		return  	} -	streamsForAccount, ok := v.(*gtsmodel.StreamsForAccount) +	streamsForAccount, ok := v.(*stream.StreamsForAccount)  	if !ok {  		return  	} @@ -88,7 +89,7 @@ func (p *processor) waitToCloseStream(account *gtsmodel.Account, thisStream *gts  	defer streamsForAccount.Unlock()  	// put everything into modified streams *except* the stream we're removing -	modifiedStreams := []*gtsmodel.Stream{} +	modifiedStreams := []*stream.Stream{}  	for _, s := range streamsForAccount.Streams {  		if s.ID != thisStream.ID {  			modifiedStreams = append(modifiedStreams, s) diff --git a/internal/processing/streaming/streamdelete.go b/internal/processing/streaming/streamdelete.go index 2282c29ae..cd541bc57 100644 --- a/internal/processing/streaming/streamdelete.go +++ b/internal/processing/streaming/streamdelete.go @@ -4,7 +4,7 @@ import (  	"fmt"  	"strings" -	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" +	"github.com/superseriousbusiness/gotosocial/internal/stream"  )  func (p *processor) StreamDelete(statusID string) error { @@ -20,7 +20,7 @@ func (p *processor) StreamDelete(statusID string) error {  		}  		// the value of the map should be a buncha streams -		streamsForAccount, ok := v.(*gtsmodel.StreamsForAccount) +		streamsForAccount, ok := v.(*stream.StreamsForAccount)  		if !ok {  			errs = append(errs, fmt.Sprintf("stream map error for account stream %s", accountID))  		} @@ -28,13 +28,13 @@ func (p *processor) StreamDelete(statusID string) error {  		// lock the streams while we work on them  		streamsForAccount.Lock()  		defer streamsForAccount.Unlock() -		for _, stream := range streamsForAccount.Streams { +		for _, s := range streamsForAccount.Streams {  			// lock each individual stream as we work on it -			stream.Lock() -			defer stream.Unlock() -			if stream.Connected { -				stream.Messages <- >smodel.Message{ -					Stream:  []string{stream.Type}, +			s.Lock() +			defer s.Unlock() +			if s.Connected { +				s.Messages <- &stream.Message{ +					Stream:  []string{s.Type},  					Event:   "delete",  					Payload: statusID,  				} diff --git a/internal/processing/streaming/streaming.go b/internal/processing/streaming/streaming.go index f349a655a..610d4a9d2 100644 --- a/internal/processing/streaming/streaming.go +++ b/internal/processing/streaming/streaming.go @@ -11,6 +11,7 @@ import (  	"github.com/superseriousbusiness/gotosocial/internal/gtserror"  	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"  	"github.com/superseriousbusiness/gotosocial/internal/oauth" +	"github.com/superseriousbusiness/gotosocial/internal/stream"  	"github.com/superseriousbusiness/gotosocial/internal/typeutils"  	"github.com/superseriousbusiness/gotosocial/internal/visibility"  ) @@ -20,7 +21,7 @@ type Processor interface {  	// AuthorizeStreamingRequest returns an oauth2 token info in response to an access token query from the streaming API  	AuthorizeStreamingRequest(ctx context.Context, accessToken string) (*gtsmodel.Account, error)  	// OpenStreamForAccount returns a new Stream for the given account, which will contain a channel for passing messages back to the caller. -	OpenStreamForAccount(ctx context.Context, account *gtsmodel.Account, streamType string) (*gtsmodel.Stream, gtserror.WithCode) +	OpenStreamForAccount(ctx context.Context, account *gtsmodel.Account, streamType string) (*stream.Stream, gtserror.WithCode)  	// StreamStatusToAccount streams the given status to any open, appropriate streams belonging to the given account.  	StreamStatusToAccount(s *apimodel.Status, account *gtsmodel.Account) error  	// StreamNotificationToAccount streams the given notification to any open, appropriate streams belonging to the given account. diff --git a/internal/processing/streaming/streamnotification.go b/internal/processing/streaming/streamnotification.go index 24c8342ee..d8460874f 100644 --- a/internal/processing/streaming/streamnotification.go +++ b/internal/processing/streaming/streamnotification.go @@ -8,6 +8,7 @@ import (  	"github.com/sirupsen/logrus"  	apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model"  	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" +	"github.com/superseriousbusiness/gotosocial/internal/stream"  )  func (p *processor) StreamNotificationToAccount(n *apimodel.Notification, account *gtsmodel.Account) error { @@ -21,7 +22,7 @@ func (p *processor) StreamNotificationToAccount(n *apimodel.Notification, accoun  		return nil  	} -	streamsForAccount, ok := v.(*gtsmodel.StreamsForAccount) +	streamsForAccount, ok := v.(*stream.StreamsForAccount)  	if !ok {  		return errors.New("stream map error")  	} @@ -33,13 +34,13 @@ func (p *processor) StreamNotificationToAccount(n *apimodel.Notification, accoun  	streamsForAccount.Lock()  	defer streamsForAccount.Unlock() -	for _, stream := range streamsForAccount.Streams { -		stream.Lock() -		defer stream.Unlock() -		if stream.Connected { -			l.Debugf("streaming notification to stream id %s", stream.ID) -			stream.Messages <- >smodel.Message{ -				Stream:  []string{stream.Type}, +	for _, s := range streamsForAccount.Streams { +		s.Lock() +		defer s.Unlock() +		if s.Connected { +			l.Debugf("streaming notification to stream id %s", s.ID) +			s.Messages <- &stream.Message{ +				Stream:  []string{s.Type},  				Event:   "notification",  				Payload: string(notificationBytes),  			} diff --git a/internal/processing/streaming/streamstatus.go b/internal/processing/streaming/streamstatus.go index 8d026252d..f4d6b2629 100644 --- a/internal/processing/streaming/streamstatus.go +++ b/internal/processing/streaming/streamstatus.go @@ -8,6 +8,7 @@ import (  	"github.com/sirupsen/logrus"  	apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model"  	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" +	"github.com/superseriousbusiness/gotosocial/internal/stream"  )  func (p *processor) StreamStatusToAccount(s *apimodel.Status, account *gtsmodel.Account) error { @@ -21,7 +22,7 @@ func (p *processor) StreamStatusToAccount(s *apimodel.Status, account *gtsmodel.  		return nil  	} -	streamsForAccount, ok := v.(*gtsmodel.StreamsForAccount) +	streamsForAccount, ok := v.(*stream.StreamsForAccount)  	if !ok {  		return errors.New("stream map error")  	} @@ -33,13 +34,13 @@ func (p *processor) StreamStatusToAccount(s *apimodel.Status, account *gtsmodel.  	streamsForAccount.Lock()  	defer streamsForAccount.Unlock() -	for _, stream := range streamsForAccount.Streams { -		stream.Lock() -		defer stream.Unlock() -		if stream.Connected { -			l.Debugf("streaming status to stream id %s", stream.ID) -			stream.Messages <- >smodel.Message{ -				Stream:  []string{stream.Type}, +	for _, s := range streamsForAccount.Streams { +		s.Lock() +		defer s.Unlock() +		if s.Connected { +			l.Debugf("streaming status to stream id %s", s.ID) +			s.Messages <- &stream.Message{ +				Stream:  []string{s.Type},  				Event:   "update",  				Payload: string(statusBytes),  			}  | 
