diff options
Diffstat (limited to 'internal/processing')
| -rw-r--r-- | internal/processing/fromclientapi_test.go | 118 | ||||
| -rw-r--r-- | internal/processing/fromcommon.go | 27 | ||||
| -rw-r--r-- | internal/processing/fromfederator_test.go | 43 | ||||
| -rw-r--r-- | internal/processing/streaming/notification.go | 2 | ||||
| -rw-r--r-- | internal/processing/streaming/openstream.go | 6 | ||||
| -rw-r--r-- | internal/processing/streaming/streamdelete.go | 2 | ||||
| -rw-r--r-- | internal/processing/streaming/streaming.go | 4 | ||||
| -rw-r--r-- | internal/processing/streaming/streamtoaccount.go | 18 | ||||
| -rw-r--r-- | internal/processing/streaming/update.go | 4 | 
9 files changed, 182 insertions, 42 deletions
diff --git a/internal/processing/fromclientapi_test.go b/internal/processing/fromclientapi_test.go new file mode 100644 index 000000000..e1d4b1987 --- /dev/null +++ b/internal/processing/fromclientapi_test.go @@ -0,0 +1,118 @@ +/* +   GoToSocial +   Copyright (C) 2021 GoToSocial Authors admin@gotosocial.org + +   This program is free software: you can redistribute it and/or modify +   it under the terms of the GNU Affero General Public License as published by +   the Free Software Foundation, either version 3 of the License, or +   (at your option) any later version. + +   This program is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU Affero General Public License for more details. + +   You should have received a copy of the GNU Affero General Public License +   along with this program.  If not, see <http://www.gnu.org/licenses/>. +*/ + +package processing_test + +import ( +	"context" +	"encoding/json" +	"testing" + +	"github.com/stretchr/testify/suite" +	"github.com/superseriousbusiness/gotosocial/internal/ap" +	"github.com/superseriousbusiness/gotosocial/internal/api/model" +	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" +	"github.com/superseriousbusiness/gotosocial/internal/messages" +	"github.com/superseriousbusiness/gotosocial/internal/stream" +	"github.com/superseriousbusiness/gotosocial/testrig" +) + +type FromClientAPITestSuite struct { +	ProcessingStandardTestSuite +} + +func (suite *FromClientAPITestSuite) TestProcessStreamNewStatus() { +	ctx := context.Background() + +	// let's say that the admin account posts a new status: it should end up in the +	// timeline of any account that follows it and has a stream open +	postingAccount := suite.testAccounts["admin_account"] +	receivingAccount := suite.testAccounts["local_account_1"] + +	// open a home timeline stream for zork +	wssStream, errWithCode := suite.processor.OpenStreamForAccount(ctx, receivingAccount, stream.TimelineHome) +	suite.NoError(errWithCode) + +	// open another stream for zork, but for a different timeline; +	// this shouldn't get stuff streamed into it, since it's for the public timeline +	irrelevantStream, errWithCode := suite.processor.OpenStreamForAccount(ctx, receivingAccount, stream.TimelinePublic) +	suite.NoError(errWithCode) + +	// make a new status from admin account +	newStatus := >smodel.Status{ +		ID:                       "01FN4B2F88TF9676DYNXWE1WSS", +		URI:                      "http://localhost:8080/users/admin/statuses/01FN4B2F88TF9676DYNXWE1WSS", +		URL:                      "http://localhost:8080/@admin/statuses/01FN4B2F88TF9676DYNXWE1WSS", +		Content:                  "this status should stream :)", +		AttachmentIDs:            []string{}, +		TagIDs:                   []string{}, +		MentionIDs:               []string{}, +		EmojiIDs:                 []string{}, +		CreatedAt:                testrig.TimeMustParse("2021-10-20T11:36:45Z"), +		UpdatedAt:                testrig.TimeMustParse("2021-10-20T11:36:45Z"), +		Local:                    true, +		AccountURI:               "http://localhost:8080/users/admin", +		AccountID:                "01F8MH17FWEB39HZJ76B6VXSKF", +		InReplyToID:              "", +		BoostOfID:                "", +		ContentWarning:           "", +		Visibility:               gtsmodel.VisibilityFollowersOnly, +		Sensitive:                false, +		Language:                 "en", +		CreatedWithApplicationID: "01F8MGXQRHYF5QPMTMXP78QC2F", +		Federated:                false, // set federated as false for this one, since we're not testing federation stuff now +		Boostable:                true, +		Replyable:                true, +		Likeable:                 true, +		ActivityStreamsType:      ap.ObjectNote, +	} + +	// put the status in the db first, to mimic what would have already happened earlier up the flow +	err := suite.db.PutStatus(ctx, newStatus) +	suite.NoError(err) + +	// process the new status +	err = suite.processor.ProcessFromClientAPI(ctx, messages.FromClientAPI{ +		APObjectType:   ap.ObjectNote, +		APActivityType: ap.ActivityCreate, +		GTSModel:       newStatus, +		OriginAccount:  postingAccount, +	}) +	suite.NoError(err) + +	// zork's stream should have the newly created status in it now +	msg := <-wssStream.Messages +	suite.Equal(stream.EventTypeUpdate, msg.Event) +	suite.NotEmpty(msg.Payload) +	suite.EqualValues([]string{stream.TimelineHome}, msg.Stream) +	statusStreamed := &model.Status{} +	err = json.Unmarshal([]byte(msg.Payload), statusStreamed) +	suite.NoError(err) +	suite.Equal("01FN4B2F88TF9676DYNXWE1WSS", statusStreamed.ID) +	suite.Equal("this status should stream :)", statusStreamed.Content) + +	// and stream should now be empty +	suite.Empty(wssStream.Messages) + +	// the irrelevant messages stream should also be empty +	suite.Empty(irrelevantStream.Messages) +} + +func TestFromClientAPITestSuite(t *testing.T) { +	suite.Run(t, &FromClientAPITestSuite{}) +} diff --git a/internal/processing/fromcommon.go b/internal/processing/fromcommon.go index 88f9994d4..ec45a3a57 100644 --- a/internal/processing/fromcommon.go +++ b/internal/processing/fromcommon.go @@ -27,6 +27,7 @@ import (  	"github.com/superseriousbusiness/gotosocial/internal/db"  	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"  	"github.com/superseriousbusiness/gotosocial/internal/id" +	"github.com/superseriousbusiness/gotosocial/internal/stream"  )  func (p *processor) notifyStatus(ctx context.Context, status *gtsmodel.Status) error { @@ -328,6 +329,8 @@ func (p *processor) notifyAnnounce(ctx context.Context, status *gtsmodel.Status)  	return nil  } +// timelineStatus processes the given new status and inserts it into +// the HOME timelines of accounts that follow the status author.  func (p *processor) timelineStatus(ctx context.Context, status *gtsmodel.Status) error {  	// make sure the author account is pinned onto the status  	if status.Account == nil { @@ -376,14 +379,18 @@ func (p *processor) timelineStatus(ctx context.Context, status *gtsmodel.Status)  	close(errors)  	if len(errs) != 0 { -		// we have some errors +		// we have at least one error  		return fmt.Errorf("timelineStatus: one or more errors timelining statuses: %s", strings.Join(errs, ";"))  	} -	// no errors, nice  	return nil  } +// timelineStatusForAccount puts the given status in the HOME timeline +// of the account with given accountID, if it's hometimelineable. +// +// If the status was inserted into the home timeline of the given account, +// it will also be streamed via websockets to the user.  func (p *processor) timelineStatusForAccount(ctx context.Context, status *gtsmodel.Status, accountID string, errors chan error, wg *sync.WaitGroup) {  	defer wg.Done() @@ -412,28 +419,22 @@ func (p *processor) timelineStatusForAccount(ctx context.Context, status *gtsmod  		return  	} -	// the status was inserted to stream it to the user +	// the status was inserted so stream it to the user  	if inserted {  		apiStatus, err := p.tc.StatusToAPIStatus(ctx, status, timelineAccount)  		if err != nil {  			errors <- fmt.Errorf("timelineStatusForAccount: error converting status %s to frontend representation: %s", status.ID, err) -		} else { -			if err := p.streamingProcessor.StreamUpdateToAccount(apiStatus, timelineAccount); err != nil { -				errors <- fmt.Errorf("timelineStatusForAccount: error streaming status %s: %s", status.ID, err) -			} +			return  		} -	} -	apiStatus, err := p.tc.StatusToAPIStatus(ctx, status, timelineAccount) -	if err != nil { -		errors <- fmt.Errorf("timelineStatusForAccount: error converting status %s to frontend representation: %s", status.ID, err) -	} else { -		if err := p.streamingProcessor.StreamUpdateToAccount(apiStatus, timelineAccount); err != nil { +		if err := p.streamingProcessor.StreamUpdateToAccount(apiStatus, timelineAccount, stream.TimelineHome); err != nil {  			errors <- fmt.Errorf("timelineStatusForAccount: error streaming status %s: %s", status.ID, err)  		}  	}  } +// deleteStatusFromTimelines completely removes the given status from all timelines. +// It will also stream deletion of the status to all open streams.  func (p *processor) deleteStatusFromTimelines(ctx context.Context, status *gtsmodel.Status) error {  	if err := p.timelineManager.WipeStatusFromAllTimelines(ctx, status.ID); err != nil {  		return err diff --git a/internal/processing/fromfederator_test.go b/internal/processing/fromfederator_test.go index 6846357d1..0351e5a43 100644 --- a/internal/processing/fromfederator_test.go +++ b/internal/processing/fromfederator_test.go @@ -32,6 +32,7 @@ import (  	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"  	"github.com/superseriousbusiness/gotosocial/internal/id"  	"github.com/superseriousbusiness/gotosocial/internal/messages" +	"github.com/superseriousbusiness/gotosocial/internal/stream"  	"github.com/superseriousbusiness/gotosocial/testrig"  ) @@ -115,6 +116,9 @@ func (suite *FromFederatorTestSuite) TestProcessReplyMention() {  		Likeable:            true,  	} +	wssStream, errWithCode := suite.processor.OpenStreamForAccount(context.Background(), repliedAccount, stream.TimelineHome) +	suite.NoError(errWithCode) +  	// id the status based on the time it was created  	statusID, err := id.NewULIDFromTime(replyingStatus.CreatedAt)  	suite.NoError(err) @@ -153,6 +157,17 @@ func (suite *FromFederatorTestSuite) TestProcessReplyMention() {  	suite.Equal(replyingStatus.AccountID, notif.OriginAccountID)  	suite.Equal(replyingStatus.ID, notif.StatusID)  	suite.False(notif.Read) + +	// the notification should also be streamed +	msg := <-wssStream.Messages +	suite.Equal(stream.EventTypeNotification, msg.Event) +	suite.NotEmpty(msg.Payload) +	suite.EqualValues([]string{stream.TimelineHome}, msg.Stream) +	notifStreamed := &model.Notification{} +	err = json.Unmarshal([]byte(msg.Payload), notifStreamed) +	suite.NoError(err) +	suite.Equal("mention", notifStreamed.Type) +	suite.Equal(replyingAccount.ID, notifStreamed.Account.ID)  }  func (suite *FromFederatorTestSuite) TestProcessFave() { @@ -160,7 +175,7 @@ func (suite *FromFederatorTestSuite) TestProcessFave() {  	favedStatus := suite.testStatuses["local_account_1_status_1"]  	favingAccount := suite.testAccounts["remote_account_1"] -	stream, errWithCode := suite.processor.OpenStreamForAccount(context.Background(), favedAccount, "user") +	wssStream, errWithCode := suite.processor.OpenStreamForAccount(context.Background(), favedAccount, stream.TimelineNotifications)  	suite.NoError(errWithCode)  	fave := >smodel.StatusFave{ @@ -210,10 +225,10 @@ func (suite *FromFederatorTestSuite) TestProcessFave() {  	suite.False(notif.Read)  	// 2. a notification should be streamed -	msg := <-stream.Messages -	suite.Equal("notification", msg.Event) +	msg := <-wssStream.Messages +	suite.Equal(stream.EventTypeNotification, msg.Event)  	suite.NotEmpty(msg.Payload) -	suite.EqualValues([]string{"user"}, msg.Stream) +	suite.EqualValues([]string{stream.TimelineNotifications}, msg.Stream)  }  // TestProcessFaveWithDifferentReceivingAccount ensures that when an account receives a fave that's for @@ -227,7 +242,7 @@ func (suite *FromFederatorTestSuite) TestProcessFaveWithDifferentReceivingAccoun  	favedStatus := suite.testStatuses["local_account_1_status_1"]  	favingAccount := suite.testAccounts["remote_account_1"] -	stream, errWithCode := suite.processor.OpenStreamForAccount(context.Background(), receivingAccount, "user") +	wssStream, errWithCode := suite.processor.OpenStreamForAccount(context.Background(), receivingAccount, stream.TimelineHome)  	suite.NoError(errWithCode)  	fave := >smodel.StatusFave{ @@ -277,7 +292,7 @@ func (suite *FromFederatorTestSuite) TestProcessFaveWithDifferentReceivingAccoun  	suite.False(notif.Read)  	// 2. no notification should be streamed to the account that received the fave message, because they weren't the target -	suite.Empty(stream.Messages) +	suite.Empty(wssStream.Messages)  }  func (suite *FromFederatorTestSuite) TestProcessAccountDelete() { @@ -368,7 +383,7 @@ func (suite *FromFederatorTestSuite) TestProcessFollowRequestLocked() {  	// target is a locked account  	targetAccount := suite.testAccounts["local_account_2"] -	stream, errWithCode := suite.processor.OpenStreamForAccount(context.Background(), targetAccount, "user") +	wssStream, errWithCode := suite.processor.OpenStreamForAccount(context.Background(), targetAccount, stream.TimelineHome)  	suite.NoError(errWithCode)  	// put the follow request in the database as though it had passed through the federating db already @@ -397,10 +412,10 @@ func (suite *FromFederatorTestSuite) TestProcessFollowRequestLocked() {  	suite.NoError(err)  	// a notification should be streamed -	msg := <-stream.Messages -	suite.Equal("notification", msg.Event) +	msg := <-wssStream.Messages +	suite.Equal(stream.EventTypeNotification, msg.Event)  	suite.NotEmpty(msg.Payload) -	suite.EqualValues([]string{"user"}, msg.Stream) +	suite.EqualValues([]string{stream.TimelineHome}, msg.Stream)  	notif := &model.Notification{}  	err = json.Unmarshal([]byte(msg.Payload), notif)  	suite.NoError(err) @@ -419,7 +434,7 @@ func (suite *FromFederatorTestSuite) TestProcessFollowRequestUnlocked() {  	// target is an unlocked account  	targetAccount := suite.testAccounts["local_account_1"] -	stream, errWithCode := suite.processor.OpenStreamForAccount(context.Background(), targetAccount, "user") +	wssStream, errWithCode := suite.processor.OpenStreamForAccount(context.Background(), targetAccount, stream.TimelineHome)  	suite.NoError(errWithCode)  	// put the follow request in the database as though it had passed through the federating db already @@ -448,10 +463,10 @@ func (suite *FromFederatorTestSuite) TestProcessFollowRequestUnlocked() {  	suite.NoError(err)  	// a notification should be streamed -	msg := <-stream.Messages -	suite.Equal("notification", msg.Event) +	msg := <-wssStream.Messages +	suite.Equal(stream.EventTypeNotification, msg.Event)  	suite.NotEmpty(msg.Payload) -	suite.EqualValues([]string{"user"}, msg.Stream) +	suite.EqualValues([]string{stream.TimelineHome}, msg.Stream)  	notif := &model.Notification{}  	err = json.Unmarshal([]byte(msg.Payload), notif)  	suite.NoError(err) diff --git a/internal/processing/streaming/notification.go b/internal/processing/streaming/notification.go index 870490be4..7f8cfb8ac 100644 --- a/internal/processing/streaming/notification.go +++ b/internal/processing/streaming/notification.go @@ -33,5 +33,5 @@ func (p *processor) StreamNotificationToAccount(n *apimodel.Notification, accoun  		return fmt.Errorf("error marshalling notification to json: %s", err)  	} -	return p.streamToAccount(string(bytes), stream.EventTypeNotification, account.ID) +	return p.streamToAccount(string(bytes), stream.EventTypeNotification, []string{stream.TimelineNotifications, stream.TimelineHome}, account.ID)  } diff --git a/internal/processing/streaming/openstream.go b/internal/processing/streaming/openstream.go index 706cc0675..c256842a4 100644 --- a/internal/processing/streaming/openstream.go +++ b/internal/processing/streaming/openstream.go @@ -30,11 +30,11 @@ import (  	"github.com/superseriousbusiness/gotosocial/internal/stream"  ) -func (p *processor) OpenStreamForAccount(ctx context.Context, account *gtsmodel.Account, streamType string) (*stream.Stream, gtserror.WithCode) { +func (p *processor) OpenStreamForAccount(ctx context.Context, account *gtsmodel.Account, streamTimeline string) (*stream.Stream, gtserror.WithCode) {  	l := logrus.WithFields(logrus.Fields{  		"func":       "OpenStreamForAccount",  		"account":    account.ID, -		"streamType": streamType, +		"streamType": streamTimeline,  	})  	l.Debug("received open stream request") @@ -46,7 +46,7 @@ func (p *processor) OpenStreamForAccount(ctx context.Context, account *gtsmodel.  	thisStream := &stream.Stream{  		ID:        streamID, -		Type:      streamType, +		Timeline:  streamTimeline,  		Messages:  make(chan *stream.Message, 100),  		Hangup:    make(chan interface{}, 1),  		Connected: true, diff --git a/internal/processing/streaming/streamdelete.go b/internal/processing/streaming/streamdelete.go index 8332c37dc..6eb271bff 100644 --- a/internal/processing/streaming/streamdelete.go +++ b/internal/processing/streaming/streamdelete.go @@ -37,7 +37,7 @@ func (p *processor) StreamDelete(statusID string) error {  	// stream the delete to every account  	for _, accountID := range accountIDs { -		if err := p.streamToAccount(statusID, stream.EventTypeDelete, accountID); err != nil { +		if err := p.streamToAccount(statusID, stream.EventTypeDelete, stream.AllStatusTimelines, accountID); err != nil {  			errs = append(errs, err.Error())  		}  	} diff --git a/internal/processing/streaming/streaming.go b/internal/processing/streaming/streaming.go index 604ec1267..296c07f09 100644 --- a/internal/processing/streaming/streaming.go +++ b/internal/processing/streaming/streaming.go @@ -35,9 +35,9 @@ type Processor interface {  	// AuthorizeStreamingRequest returns an oauth2 token info in response to an access token query from the streaming API  	AuthorizeStreamingRequest(ctx context.Context, accessToken string) (*gtsmodel.Account, error)  	// OpenStreamForAccount returns a new Stream for the given account, which will contain a channel for passing messages back to the caller. -	OpenStreamForAccount(ctx context.Context, account *gtsmodel.Account, streamType string) (*stream.Stream, gtserror.WithCode) +	OpenStreamForAccount(ctx context.Context, account *gtsmodel.Account, timeline string) (*stream.Stream, gtserror.WithCode)  	// StreamUpdateToAccount streams the given update to any open, appropriate streams belonging to the given account. -	StreamUpdateToAccount(s *apimodel.Status, account *gtsmodel.Account) error +	StreamUpdateToAccount(s *apimodel.Status, account *gtsmodel.Account, timeline string) error  	// StreamNotificationToAccount streams the given notification to any open, appropriate streams belonging to the given account.  	StreamNotificationToAccount(n *apimodel.Notification, account *gtsmodel.Account) error  	// StreamDelete streams the delete of the given statusID to *ALL* open streams. diff --git a/internal/processing/streaming/streamtoaccount.go b/internal/processing/streaming/streamtoaccount.go index 140910ab7..b950eecca 100644 --- a/internal/processing/streaming/streamtoaccount.go +++ b/internal/processing/streaming/streamtoaccount.go @@ -25,7 +25,7 @@ import (  )  // streamToAccount streams the given payload with the given event type to any streams currently open for the given account ID. -func (p *processor) streamToAccount(payload string, event stream.EventType, accountID string) error { +func (p *processor) streamToAccount(payload string, event string, timelines []string, accountID string) error {  	v, ok := p.streamMap.Load(accountID)  	if !ok {  		// no open connections so nothing to stream @@ -42,11 +42,17 @@ func (p *processor) streamToAccount(payload string, event stream.EventType, acco  	for _, s := range streamsForAccount.Streams {  		s.Lock()  		defer s.Unlock() -		if s.Connected { -			s.Messages <- &stream.Message{ -				Stream:  []string{s.Type}, -				Event:   string(event), -				Payload: payload, +		if !s.Connected { +			continue +		} + +		for _, t := range timelines { +			if s.Timeline == string(t) { +				s.Messages <- &stream.Message{ +					Stream:  []string{string(t)}, +					Event:   string(event), +					Payload: payload, +				}  			}  		}  	} diff --git a/internal/processing/streaming/update.go b/internal/processing/streaming/update.go index da7dcb6ce..bd7bb0b12 100644 --- a/internal/processing/streaming/update.go +++ b/internal/processing/streaming/update.go @@ -27,11 +27,11 @@ import (  	"github.com/superseriousbusiness/gotosocial/internal/stream"  ) -func (p *processor) StreamUpdateToAccount(s *apimodel.Status, account *gtsmodel.Account) error { +func (p *processor) StreamUpdateToAccount(s *apimodel.Status, account *gtsmodel.Account, timeline string) error {  	bytes, err := json.Marshal(s)  	if err != nil {  		return fmt.Errorf("error marshalling status to json: %s", err)  	} -	return p.streamToAccount(string(bytes), stream.EventTypeUpdate, account.ID) +	return p.streamToAccount(string(bytes), stream.EventTypeUpdate, []string{timeline}, account.ID)  }  | 
