summaryrefslogtreecommitdiff
path: root/internal/processing/stream/stream.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/processing/stream/stream.go')
-rw-r--r--internal/processing/stream/stream.go46
1 files changed, 2 insertions, 44 deletions
diff --git a/internal/processing/stream/stream.go b/internal/processing/stream/stream.go
index a5b3b9386..0b7285b58 100644
--- a/internal/processing/stream/stream.go
+++ b/internal/processing/stream/stream.go
@@ -18,8 +18,6 @@
package stream
import (
- "sync"
-
"github.com/superseriousbusiness/gotosocial/internal/oauth"
"github.com/superseriousbusiness/gotosocial/internal/state"
"github.com/superseriousbusiness/gotosocial/internal/stream"
@@ -28,53 +26,13 @@ import (
type Processor struct {
state *state.State
oauthServer oauth.Server
- streamMap *sync.Map
+ streams stream.Streams
}
func New(state *state.State, oauthServer oauth.Server) Processor {
return Processor{
state: state,
oauthServer: oauthServer,
- streamMap: &sync.Map{},
- }
-}
-
-// toAccount streams the given payload with the given event type to any streams currently open for the given account ID.
-func (p *Processor) toAccount(payload string, event string, streamTypes []string, accountID string) error {
- // Load all streams open for this account.
- v, ok := p.streamMap.Load(accountID)
- if !ok {
- return nil // No entry = nothing to stream.
+ streams: stream.Streams{},
}
- streamsForAccount := v.(*stream.StreamsForAccount)
-
- streamsForAccount.Lock()
- defer streamsForAccount.Unlock()
-
- for _, s := range streamsForAccount.Streams {
- s.Lock()
- defer s.Unlock()
-
- if !s.Connected {
- continue
- }
-
- typeLoop:
- for _, streamType := range streamTypes {
- if _, found := s.StreamTypes[streamType]; found {
- s.Messages <- &stream.Message{
- Stream: []string{streamType},
- Event: string(event),
- Payload: payload,
- }
-
- // Break out to the outer loop,
- // to avoid sending duplicates of
- // the same event to the same stream.
- break typeLoop
- }
- }
- }
-
- return nil
}