diff options
Diffstat (limited to 'internal/processing/stream/open.go')
-rw-r--r-- | internal/processing/stream/open.go | 97 |
1 files changed, 1 insertions, 96 deletions
diff --git a/internal/processing/stream/open.go b/internal/processing/stream/open.go index 1c041309f..2f2bbd4a3 100644 --- a/internal/processing/stream/open.go +++ b/internal/processing/stream/open.go @@ -19,13 +19,10 @@ package stream import ( "context" - "errors" - "fmt" "codeberg.org/gruf/go-kv" "github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" - "github.com/superseriousbusiness/gotosocial/internal/id" "github.com/superseriousbusiness/gotosocial/internal/log" "github.com/superseriousbusiness/gotosocial/internal/stream" ) @@ -37,97 +34,5 @@ func (p *Processor) Open(ctx context.Context, account *gtsmodel.Account, streamT {"streamType", streamType}, }...) l.Debug("received open stream request") - - var ( - streamID string - err error - ) - - // Each stream needs a unique ID so we know to close it. - streamID, err = id.NewRandomULID() - if err != nil { - return nil, gtserror.NewErrorInternalError(fmt.Errorf("error generating stream id: %w", err)) - } - - // Each stream can be subscibed to multiple types. - // Record them in a set, and include the initial one - // if it was given to us. - streamTypes := map[string]any{} - if streamType != "" { - streamTypes[streamType] = true - } - - newStream := &stream.Stream{ - ID: streamID, - StreamTypes: streamTypes, - Messages: make(chan *stream.Message, 100), - Hangup: make(chan interface{}, 1), - Connected: true, - } - go p.waitToCloseStream(account, newStream) - - v, ok := p.streamMap.Load(account.ID) - if ok { - // There is an entry in the streamMap - // for this account. Parse it out. - streamsForAccount, ok := v.(*stream.StreamsForAccount) - if !ok { - return nil, gtserror.NewErrorInternalError(errors.New("stream map error")) - } - - // Append new stream to existing entry. - streamsForAccount.Lock() - streamsForAccount.Streams = append(streamsForAccount.Streams, newStream) - streamsForAccount.Unlock() - } else { - // There is no entry in the streamMap for - // this account yet. Create one and store it. - p.streamMap.Store(account.ID, &stream.StreamsForAccount{ - Streams: []*stream.Stream{ - newStream, - }, - }) - } - - return newStream, nil -} - -// waitToCloseStream waits until the hangup channel is closed for the given stream. -// It then iterates through the map of streams stored by the processor, removes the stream from it, -// and then closes the messages channel of the stream to indicate that the channel should no longer be read from. -func (p *Processor) waitToCloseStream(account *gtsmodel.Account, thisStream *stream.Stream) { - <-thisStream.Hangup // wait for a hangup message - - // lock the stream to prevent more messages being put in it while we work - thisStream.Lock() - defer thisStream.Unlock() - - // indicate the stream is no longer connected - thisStream.Connected = false - - // load and parse the entry for this account from the stream map - v, ok := p.streamMap.Load(account.ID) - if !ok || v == nil { - return - } - streamsForAccount, ok := v.(*stream.StreamsForAccount) - if !ok { - return - } - - // lock the streams for account while we remove this stream from its slice - streamsForAccount.Lock() - defer streamsForAccount.Unlock() - - // put everything into modified streams *except* the stream we're removing - modifiedStreams := []*stream.Stream{} - for _, s := range streamsForAccount.Streams { - if s.ID != thisStream.ID { - modifiedStreams = append(modifiedStreams, s) - } - } - streamsForAccount.Streams = modifiedStreams - - // finally close the messages channel so no more messages can be read from it - close(thisStream.Messages) + return p.streams.Open(account.ID, streamType), nil } |