diff options
Diffstat (limited to 'internal/api/client')
| -rw-r--r-- | internal/api/client/streaming/stream.go | 7 | ||||
| -rw-r--r-- | internal/api/client/streaming/streaming.go | 17 | ||||
| -rw-r--r-- | internal/api/client/streaming/streaming_test.go | 235 | 
3 files changed, 255 insertions, 4 deletions
| diff --git a/internal/api/client/streaming/stream.go b/internal/api/client/streaming/stream.go index a406de74b..a9cb62732 100644 --- a/internal/api/client/streaming/stream.go +++ b/internal/api/client/streaming/stream.go @@ -131,7 +131,10 @@ func (m *Module) StreamGETHandler(c *gin.Context) {  	accessToken := c.Query(AccessTokenQueryKey)  	if accessToken == "" { -		err := fmt.Errorf("no access token provided under query key %s", AccessTokenQueryKey) +		accessToken = c.GetHeader(AccessTokenHeader) +	} +	if accessToken == "" { +		err := fmt.Errorf("no access token provided under query key %s or under header %s", AccessTokenQueryKey, AccessTokenHeader)  		api.ErrorHandler(c, gtserror.NewErrorUnauthorized(err, err.Error()), m.processor.InstanceGet)  		return  	} @@ -171,7 +174,7 @@ func (m *Module) StreamGETHandler(c *gin.Context) {  		close(stream.Hangup)  	}() -	streamTicker := time.NewTicker(30 * time.Second) +	streamTicker := time.NewTicker(m.tickDuration)  	// We want to stay in the loop as long as possible while the client is connected.  	// The only thing that should break the loop is if the client leaves or the connection becomes unhealthy. diff --git a/internal/api/client/streaming/streaming.go b/internal/api/client/streaming/streaming.go index f09da4ed5..b15dfbdbd 100644 --- a/internal/api/client/streaming/streaming.go +++ b/internal/api/client/streaming/streaming.go @@ -20,6 +20,7 @@ package streaming  import (  	"net/http" +	"time"  	"github.com/superseriousbusiness/gotosocial/internal/api"  	"github.com/superseriousbusiness/gotosocial/internal/processing" @@ -35,17 +36,29 @@ const (  	// AccessTokenQueryKey is the query key for an oauth access token that should be passed in streaming requests.  	AccessTokenQueryKey = "access_token" +	// AccessTokenHeader is the header for an oauth access token that can be passed in streaming requests instead of AccessTokenQueryKey +	//nolint:gosec +	AccessTokenHeader = "Sec-Websocket-Protocol"  )  // Module implements the api.ClientModule interface for everything related to streaming  type Module struct { -	processor processing.Processor +	processor    processing.Processor +	tickDuration time.Duration  }  // New returns a new streaming module  func New(processor processing.Processor) api.ClientModule {  	return &Module{ -		processor: processor, +		processor:    processor, +		tickDuration: 30 * time.Second, +	} +} + +func NewWithTickDuration(processor processing.Processor, tickDuration time.Duration) api.ClientModule { +	return &Module{ +		processor:    processor, +		tickDuration: tickDuration,  	}  } diff --git a/internal/api/client/streaming/streaming_test.go b/internal/api/client/streaming/streaming_test.go new file mode 100644 index 000000000..49c983fff --- /dev/null +++ b/internal/api/client/streaming/streaming_test.go @@ -0,0 +1,235 @@ +/* +   GoToSocial +   Copyright (C) 2021-2022 GoToSocial Authors admin@gotosocial.org + +   This program is free software: you can redistribute it and/or modify +   it under the terms of the GNU Affero General Public License as published by +   the Free Software Foundation, either version 3 of the License, or +   (at your option) any later version. + +   This program is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU Affero General Public License for more details. + +   You should have received a copy of the GNU Affero General Public License +   along with this program.  If not, see <http://www.gnu.org/licenses/>. +*/ + +package streaming_test + +import ( +	"bufio" +	"errors" +	"fmt" +	"io/ioutil" +	"net" +	"net/http" +	"net/http/httptest" +	"testing" +	"time" + +	"github.com/gin-gonic/gin" +	"github.com/stretchr/testify/suite" +	"github.com/superseriousbusiness/gotosocial/internal/api/client/streaming" +	"github.com/superseriousbusiness/gotosocial/internal/concurrency" +	"github.com/superseriousbusiness/gotosocial/internal/db" +	"github.com/superseriousbusiness/gotosocial/internal/email" +	"github.com/superseriousbusiness/gotosocial/internal/federation" +	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" +	"github.com/superseriousbusiness/gotosocial/internal/media" +	"github.com/superseriousbusiness/gotosocial/internal/messages" +	"github.com/superseriousbusiness/gotosocial/internal/oauth" +	"github.com/superseriousbusiness/gotosocial/internal/processing" +	"github.com/superseriousbusiness/gotosocial/internal/storage" +	"github.com/superseriousbusiness/gotosocial/internal/typeutils" +	"github.com/superseriousbusiness/gotosocial/testrig" +) + +type StreamingTestSuite struct { +	// standard suite interfaces +	suite.Suite +	db           db.DB +	tc           typeutils.TypeConverter +	mediaManager media.Manager +	federator    federation.Federator +	emailSender  email.Sender +	processor    processing.Processor +	storage      *storage.Driver + +	// standard suite models +	testTokens       map[string]*gtsmodel.Token +	testClients      map[string]*gtsmodel.Client +	testApplications map[string]*gtsmodel.Application +	testUsers        map[string]*gtsmodel.User +	testAccounts     map[string]*gtsmodel.Account +	testAttachments  map[string]*gtsmodel.MediaAttachment +	testStatuses     map[string]*gtsmodel.Status +	testFollows      map[string]*gtsmodel.Follow + +	// module being tested +	streamingModule *streaming.Module +} + +func (suite *StreamingTestSuite) SetupSuite() { +	suite.testTokens = testrig.NewTestTokens() +	suite.testClients = testrig.NewTestClients() +	suite.testApplications = testrig.NewTestApplications() +	suite.testUsers = testrig.NewTestUsers() +	suite.testAccounts = testrig.NewTestAccounts() +	suite.testAttachments = testrig.NewTestAttachments() +	suite.testStatuses = testrig.NewTestStatuses() +	suite.testFollows = testrig.NewTestFollows() +} + +func (suite *StreamingTestSuite) SetupTest() { +	testrig.InitTestConfig() +	testrig.InitTestLog() + +	suite.db = testrig.NewTestDB() +	suite.tc = testrig.NewTestTypeConverter(suite.db) +	suite.storage = testrig.NewInMemoryStorage() +	testrig.StandardDBSetup(suite.db, nil) +	testrig.StandardStorageSetup(suite.storage, "../../../../testrig/media") + +	fedWorker := concurrency.NewWorkerPool[messages.FromFederator](-1, -1) +	clientWorker := concurrency.NewWorkerPool[messages.FromClientAPI](-1, -1) + +	suite.mediaManager = testrig.NewTestMediaManager(suite.db, suite.storage) +	suite.federator = testrig.NewTestFederator(suite.db, testrig.NewTestTransportController(testrig.NewMockHTTPClient(nil, "../../../../testrig/media"), suite.db, fedWorker), suite.storage, suite.mediaManager, fedWorker) +	suite.emailSender = testrig.NewEmailSender("../../../../web/template/", nil) +	suite.processor = testrig.NewTestProcessor(suite.db, suite.storage, suite.federator, suite.emailSender, suite.mediaManager, clientWorker, fedWorker) +	suite.streamingModule = streaming.NewWithTickDuration(suite.processor, 1).(*streaming.Module) +	suite.NoError(suite.processor.Start()) +} + +func (suite *StreamingTestSuite) TearDownTest() { +	testrig.StandardDBTeardown(suite.db) +	testrig.StandardStorageTeardown(suite.storage) +} + +// Addr is a fake network interface which implements the net.Addr interface +type Addr struct { +	NetworkString string +	AddrString    string +} + +func (a Addr) Network() string { +	return a.NetworkString +} + +func (a Addr) String() string { +	return a.AddrString +} + +type connTester struct { +	deadline time.Time +	writes   int +} + +func (c *connTester) Read(b []byte) (n int, err error) { +	return 0, nil +} + +func (c *connTester) SetDeadline(t time.Time) error { +	c.deadline = t +	return nil +} + +func (c *connTester) SetReadDeadline(t time.Time) error { +	return nil +} + +func (c *connTester) SetWriteDeadline(t time.Time) error { +	return nil +} + +func (c *connTester) Write(p []byte) (int, error) { +	c.writes++ +	if c.writes > 1 { +		return 0, errors.New("timeout") +	} +	return 0, nil +} + +func (c *connTester) Close() error { +	return nil +} + +func (c *connTester) LocalAddr() net.Addr { +	return Addr{ +		NetworkString: "tcp", +		AddrString:    "127.0.0.1", +	} +} + +func (c *connTester) RemoteAddr() net.Addr { +	return Addr{ +		NetworkString: "tcp", +		AddrString:    "127.0.0.1", +	} +} + +type TestResponseRecorder struct { +	*httptest.ResponseRecorder +	w            gin.ResponseWriter +	closeChannel chan bool +} + +func (r *TestResponseRecorder) CloseNotify() <-chan bool { +	return r.closeChannel +} + +func (r *TestResponseRecorder) closeClient() { +	r.closeChannel <- true +} + +func (r *TestResponseRecorder) Hijack() (net.Conn, *bufio.ReadWriter, error) { +	conn := &connTester{ +		writes: 0, +	} +	brw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) +	return conn, brw, nil +} + +func CreateTestResponseRecorder() *TestResponseRecorder { +	w := new(gin.ResponseWriter) +	return &TestResponseRecorder{ +		httptest.NewRecorder(), +		*w, +		make(chan bool, 1), +	} +} + +func (suite *StreamingTestSuite) TestSecurityHeader() { +	// set up the context for the request +	t := suite.testTokens["local_account_1"] +	oauthToken := oauth.DBTokenToToken(t) +	recorder := CreateTestResponseRecorder() +	ctx, _ := testrig.CreateGinTestContext(recorder, nil) +	ctx.Set(oauth.SessionAuthorizedApplication, suite.testApplications["application_1"]) +	ctx.Set(oauth.SessionAuthorizedToken, oauthToken) +	ctx.Set(oauth.SessionAuthorizedUser, suite.testUsers["local_account_1"]) +	ctx.Set(oauth.SessionAuthorizedAccount, suite.testAccounts["local_account_1"]) +	ctx.Request = httptest.NewRequest(http.MethodGet, fmt.Sprintf("http://localhost:8080/%s?stream=user", streaming.BasePath), nil) // the endpoint we're hitting +	ctx.Request.Header.Set("accept", "application/json") +	ctx.Request.Header.Set(streaming.AccessTokenHeader, oauthToken.Access) +	ctx.Request.Header.Set("Connection", "upgrade") +	ctx.Request.Header.Set("Upgrade", "websocket") +	ctx.Request.Header.Set("Sec-Websocket-Version", "13") +	ctx.Request.Header.Set("Sec-Websocket-Key", "abcd") + +	suite.streamingModule.StreamGETHandler(ctx) + +	// check response +	suite.EqualValues(http.StatusOK, recorder.Code) + +	result := recorder.Result() +	defer result.Body.Close() +	_, err := ioutil.ReadAll(result.Body) +	suite.NoError(err) +} + +func TestStreamingTestSuite(t *testing.T) { +	suite.Run(t, new(StreamingTestSuite)) +} | 
