diff options
| author | 2021-06-19 11:18:55 +0200 | |
|---|---|---|
| committer | 2021-06-19 11:18:55 +0200 | |
| commit | aa8a0d08501cbb22400a67ece85c45fdfbdc6131 (patch) | |
| tree | 4a4581fb8f1c9bf8cac742be15d7a57eec170a1b /internal/api/client | |
| parent | update CONTRIBUTING with css bundling instructions, and go fmt (#48) (diff) | |
| download | gotosocial-aa8a0d08501cbb22400a67ece85c45fdfbdc6131.tar.xz | |
Streaming (#49)
Add new status and notification websocket streaming capabilities
Diffstat (limited to 'internal/api/client')
| -rw-r--r-- | internal/api/client/streaming/stream.go | 89 | ||||
| -rw-r--r-- | internal/api/client/streaming/streaming.go | 62 | 
2 files changed, 151 insertions, 0 deletions
| diff --git a/internal/api/client/streaming/stream.go b/internal/api/client/streaming/stream.go new file mode 100644 index 000000000..a5b8a2d99 --- /dev/null +++ b/internal/api/client/streaming/stream.go @@ -0,0 +1,89 @@ +package streaming + +import ( +	"fmt" +	"net/http" +	"time" + +	"github.com/gin-gonic/gin" +	"github.com/gorilla/websocket" +) + +// StreamGETHandler handles the creation of a new websocket streaming request. +func (m *Module) StreamGETHandler(c *gin.Context) { +	l := m.log.WithField("func", "StreamGETHandler") + +	streamType := c.Query(StreamQueryKey) +	if streamType == "" { +		c.JSON(http.StatusBadRequest, gin.H{"error": fmt.Sprintf("no stream type provided under query key %s", StreamQueryKey)}) +		return +	} + +	accessToken := c.Query(AccessTokenQueryKey) +	if accessToken == "" { +		c.JSON(http.StatusUnauthorized, gin.H{"error": fmt.Sprintf("no access token provided under query key %s", AccessTokenQueryKey)}) +		return +	} + +	// make sure a valid token has been provided and obtain the associated account +	account, err := m.processor.AuthorizeStreamingRequest(accessToken) +	if err != nil { +		c.JSON(http.StatusUnauthorized, gin.H{"error": "could not authorize with given token"}) +		return +	} + +	// prepare to upgrade the connection to a websocket connection +	upgrader := websocket.Upgrader{ +		ReadBufferSize:  1024, +		WriteBufferSize: 1024, +		CheckOrigin: func(r *http.Request) bool { +			// we fully expect cors requests (via something like pinafore.social) so we should be lenient here +			return true +		}, +	} + +	// do the actual upgrade here +	conn, err := upgrader.Upgrade(c.Writer, c.Request, nil) +	if err != nil { +		l.Infof("error upgrading websocket connection: %s", err) +		return +	} +	defer conn.Close() // whatever happens, when we leave this function we want to close the websocket connection + +	// inform the processor that we have a new connection and want a stream for it +	stream, errWithCode := m.processor.OpenStreamForAccount(account, streamType) +	if errWithCode != nil { +		c.JSON(errWithCode.Code(), errWithCode.Safe()) +		return +	} +	defer close(stream.Hangup) // closing stream.Hangup indicates that we've finished with the connection (the client has gone), so we want to do this on exiting this handler + +	// spawn a new ticker for pinging the connection periodically +	t := time.NewTicker(30 * time.Second) + +	// we want to stay in the sendloop as long as possible while the client is connected -- the only thing that should break the loop is if the client leaves or something else goes wrong +sendLoop: +	for { +		select { +		case m := <-stream.Messages: +			// we've got a streaming message!! +			l.Debug("received message from stream") +			if err := conn.WriteJSON(m); err != nil { +				l.Infof("error writing json to websocket connection: %s", err) +				// if something is wrong we want to bail and drop the connection -- the client will create a new one +				break sendLoop +			} +			l.Debug("wrote message into websocket connection") +		case <-t.C: +			l.Debug("received TICK from ticker") +			if err := conn.WriteMessage(websocket.PingMessage, []byte(": ping")); err != nil { +				l.Infof("error writing ping to websocket connection: %s", err) +				// if something is wrong we want to bail and drop the connection -- the client will create a new one +				break sendLoop +			} +			l.Debug("wrote ping message into websocket connection") +		} +	} + +	l.Debug("leaving StreamGETHandler") +} diff --git a/internal/api/client/streaming/streaming.go b/internal/api/client/streaming/streaming.go new file mode 100644 index 000000000..92dfccde8 --- /dev/null +++ b/internal/api/client/streaming/streaming.go @@ -0,0 +1,62 @@ +/* +   GoToSocial +   Copyright (C) 2021 GoToSocial Authors admin@gotosocial.org + +   This program is free software: you can redistribute it and/or modify +   it under the terms of the GNU Affero General Public License as published by +   the Free Software Foundation, either version 3 of the License, or +   (at your option) any later version. + +   This program is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU Affero General Public License for more details. + +   You should have received a copy of the GNU Affero General Public License +   along with this program.  If not, see <http://www.gnu.org/licenses/>. +*/ + +package streaming + +import ( +	"net/http" + +	"github.com/sirupsen/logrus" +	"github.com/superseriousbusiness/gotosocial/internal/api" +	"github.com/superseriousbusiness/gotosocial/internal/config" +	"github.com/superseriousbusiness/gotosocial/internal/processing" +	"github.com/superseriousbusiness/gotosocial/internal/router" +) + +const ( +	// BasePath is the path for the streaming api +	BasePath = "/api/v1/streaming" + +	// StreamQueryKey is the query key for the type of stream being requested +	StreamQueryKey = "stream" + +	// AccessTokenQueryKey is the query key for an oauth access token that should be passed in streaming requests. +	AccessTokenQueryKey = "access_token" +) + +// Module implements the api.ClientModule interface for everything related to streaming +type Module struct { +	config    *config.Config +	processor processing.Processor +	log       *logrus.Logger +} + +// New returns a new streaming module +func New(config *config.Config, processor processing.Processor, log *logrus.Logger) api.ClientModule { +	return &Module{ +		config:    config, +		processor: processor, +		log:       log, +	} +} + +// Route attaches all routes from this module to the given router +func (m *Module) Route(r router.Router) error { +	r.AttachHandler(http.MethodGet, BasePath, m.StreamGETHandler) +	return nil +} | 
