diff options
Diffstat (limited to 'internal/api/client/streaming')
-rw-r--r-- | internal/api/client/streaming/stream.go | 60 | ||||
-rw-r--r-- | internal/api/client/streaming/streaming.go | 19 | ||||
-rw-r--r-- | internal/api/client/streaming/streaming_test.go | 2 |
3 files changed, 51 insertions, 30 deletions
diff --git a/internal/api/client/streaming/stream.go b/internal/api/client/streaming/stream.go index a9cb62732..de98719c2 100644 --- a/internal/api/client/streaming/stream.go +++ b/internal/api/client/streaming/stream.go @@ -1,3 +1,21 @@ +/* + GoToSocial + Copyright (C) 2021-2022 GoToSocial Authors admin@gotosocial.org + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + package streaming import ( @@ -6,7 +24,7 @@ import ( "time" "codeberg.org/gruf/go-kv" - "github.com/superseriousbusiness/gotosocial/internal/api" + apiutil "github.com/superseriousbusiness/gotosocial/internal/api/util" "github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/log" @@ -14,12 +32,15 @@ import ( "github.com/gorilla/websocket" ) -var wsUpgrader = websocket.Upgrader{ - ReadBufferSize: 1024, - WriteBufferSize: 1024, - // we expect cors requests (via eg., pinafore.social) so be lenient - CheckOrigin: func(r *http.Request) bool { return true }, -} +var ( + wsUpgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + // we expect cors requests (via eg., pinafore.social) so be lenient + CheckOrigin: func(r *http.Request) bool { return true }, + } + errNoToken = fmt.Errorf("no access token provided under query key %s or under header %s", AccessTokenQueryKey, AccessTokenHeader) +) // StreamGETHandler swagger:operation GET /api/v1/streaming streamGet // @@ -125,29 +146,33 @@ func (m *Module) StreamGETHandler(c *gin.Context) { streamType := c.Query(StreamQueryKey) if streamType == "" { err := fmt.Errorf("no stream type provided under query key %s", StreamQueryKey) - api.ErrorHandler(c, gtserror.NewErrorBadRequest(err, err.Error()), m.processor.InstanceGet) + apiutil.ErrorHandler(c, gtserror.NewErrorBadRequest(err, err.Error()), m.processor.InstanceGet) return } - accessToken := c.Query(AccessTokenQueryKey) - if accessToken == "" { - accessToken = c.GetHeader(AccessTokenHeader) - } - if accessToken == "" { - err := fmt.Errorf("no access token provided under query key %s or under header %s", AccessTokenQueryKey, AccessTokenHeader) - api.ErrorHandler(c, gtserror.NewErrorUnauthorized(err, err.Error()), m.processor.InstanceGet) + var accessToken string + if t := c.Query(AccessTokenQueryKey); t != "" { + // try query param first + accessToken = t + } else if t := c.GetHeader(AccessTokenHeader); t != "" { + // fall back to Sec-Websocket-Protocol + accessToken = t + } else { + // no token + err := errNoToken + apiutil.ErrorHandler(c, gtserror.NewErrorUnauthorized(err, err.Error()), m.processor.InstanceGet) return } account, errWithCode := m.processor.AuthorizeStreamingRequest(c.Request.Context(), accessToken) if errWithCode != nil { - api.ErrorHandler(c, errWithCode, m.processor.InstanceGet) + apiutil.ErrorHandler(c, errWithCode, m.processor.InstanceGet) return } stream, errWithCode := m.processor.OpenStreamForAccount(c.Request.Context(), account, streamType) if errWithCode != nil { - api.ErrorHandler(c, errWithCode, m.processor.InstanceGet) + apiutil.ErrorHandler(c, errWithCode, m.processor.InstanceGet) return } @@ -175,6 +200,7 @@ func (m *Module) StreamGETHandler(c *gin.Context) { }() streamTicker := time.NewTicker(m.tickDuration) + defer streamTicker.Stop() // We want to stay in the loop as long as possible while the client is connected. // The only thing that should break the loop is if the client leaves or the connection becomes unhealthy. diff --git a/internal/api/client/streaming/streaming.go b/internal/api/client/streaming/streaming.go index b15dfbdbd..f9d9fdf36 100644 --- a/internal/api/client/streaming/streaming.go +++ b/internal/api/client/streaming/streaming.go @@ -22,14 +22,13 @@ import ( "net/http" "time" - "github.com/superseriousbusiness/gotosocial/internal/api" + "github.com/gin-gonic/gin" "github.com/superseriousbusiness/gotosocial/internal/processing" - "github.com/superseriousbusiness/gotosocial/internal/router" ) const ( - // BasePath is the path for the streaming api - BasePath = "/api/v1/streaming" + // BasePath is the path for the streaming api, minus the 'api' prefix + BasePath = "/v1/streaming" // StreamQueryKey is the query key for the type of stream being requested StreamQueryKey = "stream" @@ -41,29 +40,25 @@ const ( AccessTokenHeader = "Sec-Websocket-Protocol" ) -// Module implements the api.ClientModule interface for everything related to streaming type Module struct { processor processing.Processor tickDuration time.Duration } -// New returns a new streaming module -func New(processor processing.Processor) api.ClientModule { +func New(processor processing.Processor) *Module { return &Module{ processor: processor, tickDuration: 30 * time.Second, } } -func NewWithTickDuration(processor processing.Processor, tickDuration time.Duration) api.ClientModule { +func NewWithTickDuration(processor processing.Processor, tickDuration time.Duration) *Module { return &Module{ processor: processor, tickDuration: tickDuration, } } -// Route attaches all routes from this module to the given router -func (m *Module) Route(r router.Router) error { - r.AttachHandler(http.MethodGet, BasePath, m.StreamGETHandler) - return nil +func (m *Module) Route(attachHandler func(method string, path string, f ...gin.HandlerFunc) gin.IRoutes) { + attachHandler(http.MethodGet, BasePath, m.StreamGETHandler) } diff --git a/internal/api/client/streaming/streaming_test.go b/internal/api/client/streaming/streaming_test.go index 49c983fff..2f2d850c1 100644 --- a/internal/api/client/streaming/streaming_test.go +++ b/internal/api/client/streaming/streaming_test.go @@ -99,7 +99,7 @@ func (suite *StreamingTestSuite) SetupTest() { suite.federator = testrig.NewTestFederator(suite.db, testrig.NewTestTransportController(testrig.NewMockHTTPClient(nil, "../../../../testrig/media"), suite.db, fedWorker), suite.storage, suite.mediaManager, fedWorker) suite.emailSender = testrig.NewEmailSender("../../../../web/template/", nil) suite.processor = testrig.NewTestProcessor(suite.db, suite.storage, suite.federator, suite.emailSender, suite.mediaManager, clientWorker, fedWorker) - suite.streamingModule = streaming.NewWithTickDuration(suite.processor, 1).(*streaming.Module) + suite.streamingModule = streaming.NewWithTickDuration(suite.processor, 1) suite.NoError(suite.processor.Start()) } |