diff options
Diffstat (limited to 'internal/api/client/streaming/stream.go')
-rw-r--r-- | internal/api/client/streaming/stream.go | 99 |
1 files changed, 54 insertions, 45 deletions
diff --git a/internal/api/client/streaming/stream.go b/internal/api/client/streaming/stream.go index c01bf2216..2c18e4e41 100644 --- a/internal/api/client/streaming/stream.go +++ b/internal/api/client/streaming/stream.go @@ -2,14 +2,24 @@ package streaming import ( "fmt" - "github.com/sirupsen/logrus" "net/http" "time" + "github.com/sirupsen/logrus" + "github.com/superseriousbusiness/gotosocial/internal/api" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/gin-gonic/gin" "github.com/gorilla/websocket" ) +var wsUpgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + // we expect cors requests (via eg., pinafore.social) so be lenient + CheckOrigin: func(r *http.Request) bool { return true }, +} + // StreamGETHandler swagger:operation GET /api/v1/streaming streamGet // // Initiate a websocket connection for live streaming of statuses and notifications. @@ -108,79 +118,78 @@ import ( // '400': // description: bad request func (m *Module) StreamGETHandler(c *gin.Context) { - l := logrus.WithField("func", "StreamGETHandler") - streamType := c.Query(StreamQueryKey) if streamType == "" { - c.JSON(http.StatusBadRequest, gin.H{"error": fmt.Sprintf("no stream type provided under query key %s", StreamQueryKey)}) + err := fmt.Errorf("no stream type provided under query key %s", StreamQueryKey) + api.ErrorHandler(c, gtserror.NewErrorBadRequest(err, err.Error()), m.processor.InstanceGet) return } accessToken := c.Query(AccessTokenQueryKey) if accessToken == "" { - c.JSON(http.StatusUnauthorized, gin.H{"error": fmt.Sprintf("no access token provided under query key %s", AccessTokenQueryKey)}) + err := fmt.Errorf("no access token provided under query key %s", AccessTokenQueryKey) + api.ErrorHandler(c, gtserror.NewErrorUnauthorized(err, err.Error()), m.processor.InstanceGet) return } - // make sure a valid token has been provided and obtain the associated account - account, err := m.processor.AuthorizeStreamingRequest(c.Request.Context(), accessToken) - if err != nil { - c.JSON(http.StatusUnauthorized, gin.H{"error": "could not authorize with given token"}) + account, errWithCode := m.processor.AuthorizeStreamingRequest(c.Request.Context(), accessToken) + if errWithCode != nil { + api.ErrorHandler(c, errWithCode, m.processor.InstanceGet) return } - // prepare to upgrade the connection to a websocket connection - upgrader := websocket.Upgrader{ - ReadBufferSize: 1024, - WriteBufferSize: 1024, - CheckOrigin: func(r *http.Request) bool { - // we fully expect cors requests (via something like pinafore.social) so we should be lenient here - return true - }, + stream, errWithCode := m.processor.OpenStreamForAccount(c.Request.Context(), account, streamType) + if errWithCode != nil { + api.ErrorHandler(c, errWithCode, m.processor.InstanceGet) + return } - // do the actual upgrade here - conn, err := upgrader.Upgrade(c.Writer, c.Request, nil) + l := logrus.WithFields(logrus.Fields{ + "account": account.Username, + "path": BasePath, + "streamID": stream.ID, + "streamType": streamType, + }) + + wsConn, err := wsUpgrader.Upgrade(c.Writer, c.Request, nil) if err != nil { - l.Infof("error upgrading websocket connection: %s", err) + // If the upgrade fails, then Upgrade replies to the client with an HTTP error response. + // Because websocket issues are a pretty common source of headaches, we should also log + // this at Error to make this plenty visible and help admins out a bit. + l.Errorf("error upgrading websocket connection: %s", err) + close(stream.Hangup) return } - defer conn.Close() // whatever happens, when we leave this function we want to close the websocket connection - // inform the processor that we have a new connection and want a s for it - s, errWithCode := m.processor.OpenStreamForAccount(c.Request.Context(), account, streamType) - if errWithCode != nil { - c.JSON(errWithCode.Code(), errWithCode.Safe()) - return - } - defer close(s.Hangup) // closing stream.Hangup indicates that we've finished with the connection (the client has gone), so we want to do this on exiting this handler + defer func() { + // cleanup + wsConn.Close() + close(stream.Hangup) + }() - // spawn a new ticker for pinging the connection periodically - t := time.NewTicker(30 * time.Second) + streamTicker := time.NewTicker(30 * time.Second) - // we want to stay in the sendloop as long as possible while the client is connected -- the only thing that should break the loop is if the client leaves or something else goes wrong -sendLoop: + // We want to stay in the loop as long as possible while the client is connected. + // The only thing that should break the loop is if the client leaves or the connection becomes unhealthy. + // + // If the loop does break, we expect the client to reattempt connection, so it's cheap to leave + try again +wsLoop: for { select { - case m := <-s.Messages: - // we've got a streaming message!! + case m := <-stream.Messages: l.Trace("received message from stream") - if err := conn.WriteJSON(m); err != nil { - l.Debugf("error writing json to websocket connection: %s", err) - // if something is wrong we want to bail and drop the connection -- the client will create a new one - break sendLoop + if err := wsConn.WriteJSON(m); err != nil { + l.Debugf("error writing json to websocket connection; breaking off: %s", err) + break wsLoop } l.Trace("wrote message into websocket connection") - case <-t.C: + case <-streamTicker.C: l.Trace("received TICK from ticker") - if err := conn.WriteMessage(websocket.PingMessage, []byte(": ping")); err != nil { - l.Debugf("error writing ping to websocket connection: %s", err) - // if something is wrong we want to bail and drop the connection -- the client will create a new one - break sendLoop + if err := wsConn.WriteMessage(websocket.PingMessage, []byte(": ping")); err != nil { + l.Debugf("error writing ping to websocket connection; breaking off: %s", err) + break wsLoop } l.Trace("wrote ping message into websocket connection") } } - - l.Trace("leaving StreamGETHandler") } |