summaryrefslogtreecommitdiff
path: root/internal/processing/stream/stream.go
diff options
context:
space:
mode:
authorLibravatar kim <89579420+NyaaaWhatsUpDoc@users.noreply.github.com>2024-02-20 18:07:49 +0000
committerLibravatar GitHub <noreply@github.com>2024-02-20 18:07:49 +0000
commit291e18099050ff9e19b8ee25c2ffad68d9baafef (patch)
tree0ad1be36b4c958830d1371f3b9a32f017c5dcff0 /internal/processing/stream/stream.go
parent[feature] Add `requested_by` to relationship model (#2672) (diff)
downloadgotosocial-291e18099050ff9e19b8ee25c2ffad68d9baafef.tar.xz
[bugfix] fix possible mutex lockup during streaming code (#2633)
* rewrite Stream{} to use much less mutex locking, update related code * use new context for the stream context * ensure stream gets closed on return of writeTo / readFrom WSConn() * ensure stream write timeout gets cancelled * remove embedded context type from Stream{}, reformat log messages for consistency * use c.Request.Context() for context passed into Stream().Open() * only return 1 boolean, fix tests to expect multiple stream types in messages * changes to ping logic * further improved ping logic * don't export unused function types, update message sending to only include relevant stream type * ensure stream gets closed :facepalm: * update to error log on failed json marshal (instead of panic) * inverse websocket read error checking to _ignore_ expected close errors
Diffstat (limited to 'internal/processing/stream/stream.go')
-rw-r--r--internal/processing/stream/stream.go46
1 files changed, 2 insertions, 44 deletions
diff --git a/internal/processing/stream/stream.go b/internal/processing/stream/stream.go
index a5b3b9386..0b7285b58 100644
--- a/internal/processing/stream/stream.go
+++ b/internal/processing/stream/stream.go
@@ -18,8 +18,6 @@
package stream
import (
- "sync"
-
"github.com/superseriousbusiness/gotosocial/internal/oauth"
"github.com/superseriousbusiness/gotosocial/internal/state"
"github.com/superseriousbusiness/gotosocial/internal/stream"
@@ -28,53 +26,13 @@ import (
type Processor struct {
state *state.State
oauthServer oauth.Server
- streamMap *sync.Map
+ streams stream.Streams
}
func New(state *state.State, oauthServer oauth.Server) Processor {
return Processor{
state: state,
oauthServer: oauthServer,
- streamMap: &sync.Map{},
- }
-}
-
-// toAccount streams the given payload with the given event type to any streams currently open for the given account ID.
-func (p *Processor) toAccount(payload string, event string, streamTypes []string, accountID string) error {
- // Load all streams open for this account.
- v, ok := p.streamMap.Load(accountID)
- if !ok {
- return nil // No entry = nothing to stream.
+ streams: stream.Streams{},
}
- streamsForAccount := v.(*stream.StreamsForAccount)
-
- streamsForAccount.Lock()
- defer streamsForAccount.Unlock()
-
- for _, s := range streamsForAccount.Streams {
- s.Lock()
- defer s.Unlock()
-
- if !s.Connected {
- continue
- }
-
- typeLoop:
- for _, streamType := range streamTypes {
- if _, found := s.StreamTypes[streamType]; found {
- s.Messages <- &stream.Message{
- Stream: []string{streamType},
- Event: string(event),
- Payload: payload,
- }
-
- // Break out to the outer loop,
- // to avoid sending duplicates of
- // the same event to the same stream.
- break typeLoop
- }
- }
- }
-
- return nil
}