summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLibravatar tobi <31960611+tsmethurst@users.noreply.github.com>2024-10-18 15:43:09 +0200
committerLibravatar GitHub <noreply@github.com>2024-10-18 15:43:09 +0200
commitffc86f9092507799b63d939e8444311757b0239f (patch)
treecb0801e831e8c3bc6833560adba430cd774d4590
parent[chore] Set some additional git attributes (#3454) (diff)
downloadgotosocial-ffc86f9092507799b63d939e8444311757b0239f.tar.xz
[bugfix] Fix occasionally streaming empty messages (#3456)
-rw-r--r--internal/api/client/streaming/stream.go62
1 files changed, 39 insertions, 23 deletions
diff --git a/internal/api/client/streaming/stream.go b/internal/api/client/streaming/stream.go
index 900df4383..eba040cba 100644
--- a/internal/api/client/streaming/stream.go
+++ b/internal/api/client/streaming/stream.go
@@ -35,6 +35,8 @@ import (
"github.com/gorilla/websocket"
)
+var pingMsg = []byte("ping!")
+
// StreamGETHandler swagger:operation GET /api/v1/streaming streamGet
//
// Initiate a websocket connection for live streaming of statuses and notifications.
@@ -389,40 +391,54 @@ func (m *Module) writeToWSConn(
) {
for {
// Wrap context with timeout to send a ping.
- pingctx, cncl := context.WithTimeout(ctx, ping)
+ pingCtx, cncl := context.WithTimeout(ctx, ping)
- // Block on receipt of msg.
- msg, ok := stream.Recv(pingctx)
+ // Block and wait for
+ // one of the following:
+ //
+ // - receipt of msg
+ // - timeout of pingCtx
+ // - stream closed.
+ msg, gotMsg := stream.Recv(pingCtx)
- // Check if cancel because ping.
- pinged := (pingctx.Err() != nil)
+ // If ping context has timed
+ // out, we should send a ping.
+ //
+ // In any case cancel pingCtx
+ // as we're done with it.
+ shouldPing := (pingCtx.Err() != nil)
cncl()
switch {
- case !ok && pinged:
- // The ping context timed out!
- l.Trace("writing websocket ping")
- // Wrapped context time-out, send a keep-alive "ping".
- if err := wsConn.WriteControl(websocket.PingMessage, nil, time.Time{}); err != nil {
+ // We have a message to stream.
+ case gotMsg:
+ l.Tracef("writing websocket message: %+v", msg)
+ if err := wsConn.WriteJSON(msg); err != nil {
+ // If there's an error writing then drop the
+ // connection, as client may have disappeared
+ // suddenly; they can reconnect if necessary.
+ l.Debugf("error writing websocket message: %v", err)
+ break
+ }
+
+ // We have no message but we
+ // need to send a keep-alive ping.
+ case shouldPing:
+ l.Trace("writing websocket ping")
+ if err := wsConn.WriteControl(websocket.PingMessage, pingMsg, time.Time{}); err != nil {
+ // If there's an error writing then drop the
+ // connection, as client may have disappeared
+ // suddenly; they can reconnect if necessary.
l.Debugf("error writing websocket ping: %v", err)
break
}
- case !ok:
- // Stream was
- // closed.
+ // We have no message and we shouldn't
+ // send a ping; this means the stream
+ // has been closed from the client's end.
+ default:
return
}
-
- l.Trace("writing websocket message: %+v", msg)
-
- // Received a new message from the processor.
- if err := wsConn.WriteJSON(msg); err != nil {
- l.Debugf("error writing websocket message: %v", err)
- break
- }
}
-
- l.Debug("finished websocket write")
}