diff options
Diffstat (limited to 'internal/api')
| -rw-r--r-- | internal/api/client/streaming/stream.go | 62 | 
1 files changed, 39 insertions, 23 deletions
diff --git a/internal/api/client/streaming/stream.go b/internal/api/client/streaming/stream.go index 900df4383..eba040cba 100644 --- a/internal/api/client/streaming/stream.go +++ b/internal/api/client/streaming/stream.go @@ -35,6 +35,8 @@ import (  	"github.com/gorilla/websocket"  ) +var pingMsg = []byte("ping!") +  // StreamGETHandler swagger:operation GET /api/v1/streaming streamGet  //  // Initiate a websocket connection for live streaming of statuses and notifications. @@ -389,40 +391,54 @@ func (m *Module) writeToWSConn(  ) {  	for {  		// Wrap context with timeout to send a ping. -		pingctx, cncl := context.WithTimeout(ctx, ping) +		pingCtx, cncl := context.WithTimeout(ctx, ping) -		// Block on receipt of msg. -		msg, ok := stream.Recv(pingctx) +		// Block and wait for +		// one of the following: +		// +		// - receipt of msg +		// - timeout of pingCtx +		// - stream closed. +		msg, gotMsg := stream.Recv(pingCtx) -		// Check if cancel because ping. -		pinged := (pingctx.Err() != nil) +		// If ping context has timed +		// out, we should send a ping. +		// +		// In any case cancel pingCtx +		// as we're done with it. +		shouldPing := (pingCtx.Err() != nil)  		cncl()  		switch { -		case !ok && pinged: -			// The ping context timed out! -			l.Trace("writing websocket ping") -			// Wrapped context time-out, send a keep-alive "ping". -			if err := wsConn.WriteControl(websocket.PingMessage, nil, time.Time{}); err != nil { +		// We have a message to stream. +		case gotMsg: +			l.Tracef("writing websocket message: %+v", msg) +			if err := wsConn.WriteJSON(msg); err != nil { +				// If there's an error writing then drop the +				// connection, as client may have disappeared +				// suddenly; they can reconnect if necessary. +				l.Debugf("error writing websocket message: %v", err) +				break +			} + +		// We have no message but we +		// need to send a keep-alive ping. +		case shouldPing: +			l.Trace("writing websocket ping") +			if err := wsConn.WriteControl(websocket.PingMessage, pingMsg, time.Time{}); err != nil { +				// If there's an error writing then drop the +				// connection, as client may have disappeared +				// suddenly; they can reconnect if necessary.  				l.Debugf("error writing websocket ping: %v", err)  				break  			} -		case !ok: -			// Stream was -			// closed. +		// We have no message and we shouldn't +		// send a ping; this means the stream +		// has been closed from the client's end. +		default:  			return  		} - -		l.Trace("writing websocket message: %+v", msg) - -		// Received a new message from the processor. -		if err := wsConn.WriteJSON(msg); err != nil { -			l.Debugf("error writing websocket message: %v", err) -			break -		}  	} - -	l.Debug("finished websocket write")  }  | 
