summaryrefslogtreecommitdiff
path: root/internal/processing/stream/open.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/processing/stream/open.go')
-rw-r--r--internal/processing/stream/open.go97
1 files changed, 1 insertions, 96 deletions
diff --git a/internal/processing/stream/open.go b/internal/processing/stream/open.go
index 1c041309f..2f2bbd4a3 100644
--- a/internal/processing/stream/open.go
+++ b/internal/processing/stream/open.go
@@ -19,13 +19,10 @@ package stream
import (
"context"
- "errors"
- "fmt"
"codeberg.org/gruf/go-kv"
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
- "github.com/superseriousbusiness/gotosocial/internal/id"
"github.com/superseriousbusiness/gotosocial/internal/log"
"github.com/superseriousbusiness/gotosocial/internal/stream"
)
@@ -37,97 +34,5 @@ func (p *Processor) Open(ctx context.Context, account *gtsmodel.Account, streamT
{"streamType", streamType},
}...)
l.Debug("received open stream request")
-
- var (
- streamID string
- err error
- )
-
- // Each stream needs a unique ID so we know to close it.
- streamID, err = id.NewRandomULID()
- if err != nil {
- return nil, gtserror.NewErrorInternalError(fmt.Errorf("error generating stream id: %w", err))
- }
-
- // Each stream can be subscibed to multiple types.
- // Record them in a set, and include the initial one
- // if it was given to us.
- streamTypes := map[string]any{}
- if streamType != "" {
- streamTypes[streamType] = true
- }
-
- newStream := &stream.Stream{
- ID: streamID,
- StreamTypes: streamTypes,
- Messages: make(chan *stream.Message, 100),
- Hangup: make(chan interface{}, 1),
- Connected: true,
- }
- go p.waitToCloseStream(account, newStream)
-
- v, ok := p.streamMap.Load(account.ID)
- if ok {
- // There is an entry in the streamMap
- // for this account. Parse it out.
- streamsForAccount, ok := v.(*stream.StreamsForAccount)
- if !ok {
- return nil, gtserror.NewErrorInternalError(errors.New("stream map error"))
- }
-
- // Append new stream to existing entry.
- streamsForAccount.Lock()
- streamsForAccount.Streams = append(streamsForAccount.Streams, newStream)
- streamsForAccount.Unlock()
- } else {
- // There is no entry in the streamMap for
- // this account yet. Create one and store it.
- p.streamMap.Store(account.ID, &stream.StreamsForAccount{
- Streams: []*stream.Stream{
- newStream,
- },
- })
- }
-
- return newStream, nil
-}
-
-// waitToCloseStream waits until the hangup channel is closed for the given stream.
-// It then iterates through the map of streams stored by the processor, removes the stream from it,
-// and then closes the messages channel of the stream to indicate that the channel should no longer be read from.
-func (p *Processor) waitToCloseStream(account *gtsmodel.Account, thisStream *stream.Stream) {
- <-thisStream.Hangup // wait for a hangup message
-
- // lock the stream to prevent more messages being put in it while we work
- thisStream.Lock()
- defer thisStream.Unlock()
-
- // indicate the stream is no longer connected
- thisStream.Connected = false
-
- // load and parse the entry for this account from the stream map
- v, ok := p.streamMap.Load(account.ID)
- if !ok || v == nil {
- return
- }
- streamsForAccount, ok := v.(*stream.StreamsForAccount)
- if !ok {
- return
- }
-
- // lock the streams for account while we remove this stream from its slice
- streamsForAccount.Lock()
- defer streamsForAccount.Unlock()
-
- // put everything into modified streams *except* the stream we're removing
- modifiedStreams := []*stream.Stream{}
- for _, s := range streamsForAccount.Streams {
- if s.ID != thisStream.ID {
- modifiedStreams = append(modifiedStreams, s)
- }
- }
- streamsForAccount.Streams = modifiedStreams
-
- // finally close the messages channel so no more messages can be read from it
- close(thisStream.Messages)
+ return p.streams.Open(account.ID, streamType), nil
}