diff options
author | 2021-09-01 18:29:25 +0200 | |
---|---|---|
committer | 2021-09-01 18:29:25 +0200 | |
commit | 4696e1a7b389599fa981f334b343daa911b11f5d (patch) | |
tree | d1ca0c896cdacb82ad7c64ee150aa32b37d4c053 /internal/processing/streaming/openstream.go | |
parent | move oauth models into gtsmodel (diff) | |
download | gotosocial-4696e1a7b389599fa981f334b343daa911b11f5d.tar.xz |
moving stuff around
Diffstat (limited to 'internal/processing/streaming/openstream.go')
-rw-r--r-- | internal/processing/streaming/openstream.go | 19 |
1 files changed, 10 insertions, 9 deletions
diff --git a/internal/processing/streaming/openstream.go b/internal/processing/streaming/openstream.go index dfad5398e..d4e4eef9f 100644 --- a/internal/processing/streaming/openstream.go +++ b/internal/processing/streaming/openstream.go @@ -9,9 +9,10 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" "github.com/superseriousbusiness/gotosocial/internal/id" + "github.com/superseriousbusiness/gotosocial/internal/stream" ) -func (p *processor) OpenStreamForAccount(ctx context.Context, account *gtsmodel.Account, streamType string) (*gtsmodel.Stream, gtserror.WithCode) { +func (p *processor) OpenStreamForAccount(ctx context.Context, account *gtsmodel.Account, streamType string) (*stream.Stream, gtserror.WithCode) { l := p.log.WithFields(logrus.Fields{ "func": "OpenStreamForAccount", "account": account.ID, @@ -25,10 +26,10 @@ func (p *processor) OpenStreamForAccount(ctx context.Context, account *gtsmodel. return nil, gtserror.NewErrorInternalError(fmt.Errorf("error generating stream id: %s", err)) } - thisStream := >smodel.Stream{ + thisStream := &stream.Stream{ ID: streamID, Type: streamType, - Messages: make(chan *gtsmodel.Message, 100), + Messages: make(chan *stream.Message, 100), Hangup: make(chan interface{}, 1), Connected: true, } @@ -37,8 +38,8 @@ func (p *processor) OpenStreamForAccount(ctx context.Context, account *gtsmodel. v, ok := p.streamMap.Load(account.ID) if !ok || v == nil { // there is no entry in the streamMap for this account yet, so make one and store it - streamsForAccount := >smodel.StreamsForAccount{ - Streams: []*gtsmodel.Stream{ + streamsForAccount := &stream.StreamsForAccount{ + Streams: []*stream.Stream{ thisStream, }, } @@ -46,7 +47,7 @@ func (p *processor) OpenStreamForAccount(ctx context.Context, account *gtsmodel. } else { // there is an entry in the streamMap for this account // parse the interface as a streamsForAccount - streamsForAccount, ok := v.(*gtsmodel.StreamsForAccount) + streamsForAccount, ok := v.(*stream.StreamsForAccount) if !ok { return nil, gtserror.NewErrorInternalError(errors.New("stream map error")) } @@ -63,7 +64,7 @@ func (p *processor) OpenStreamForAccount(ctx context.Context, account *gtsmodel. // waitToCloseStream waits until the hangup channel is closed for the given stream. // It then iterates through the map of streams stored by the processor, removes the stream from it, // and then closes the messages channel of the stream to indicate that the channel should no longer be read from. -func (p *processor) waitToCloseStream(account *gtsmodel.Account, thisStream *gtsmodel.Stream) { +func (p *processor) waitToCloseStream(account *gtsmodel.Account, thisStream *stream.Stream) { <-thisStream.Hangup // wait for a hangup message // lock the stream to prevent more messages being put in it while we work @@ -78,7 +79,7 @@ func (p *processor) waitToCloseStream(account *gtsmodel.Account, thisStream *gts if !ok || v == nil { return } - streamsForAccount, ok := v.(*gtsmodel.StreamsForAccount) + streamsForAccount, ok := v.(*stream.StreamsForAccount) if !ok { return } @@ -88,7 +89,7 @@ func (p *processor) waitToCloseStream(account *gtsmodel.Account, thisStream *gts defer streamsForAccount.Unlock() // put everything into modified streams *except* the stream we're removing - modifiedStreams := []*gtsmodel.Stream{} + modifiedStreams := []*stream.Stream{} for _, s := range streamsForAccount.Streams { if s.ID != thisStream.ID { modifiedStreams = append(modifiedStreams, s) |