diff options
| author | 2023-02-22 16:05:26 +0100 | |
|---|---|---|
| committer | 2023-02-22 16:05:26 +0100 | |
| commit | b6fbdc66c1ce1ec61ebfb6fcc0351ea627a1d288 (patch) | |
| tree | c79d1107375597ab8a79045c80dd62dc95a204e7 /internal/processing/streaming | |
| parent | [bugfix] Remove initial storage cleanup (#1545) (diff) | |
| download | gotosocial-b6fbdc66c1ce1ec61ebfb6fcc0351ea627a1d288.tar.xz | |
[chore] Deinterface processor and subprocessors (#1501)
* [chore] Deinterface processor and subprocessors
* expose subprocessors via function calls
* missing license header
Diffstat (limited to 'internal/processing/streaming')
| -rw-r--r-- | internal/processing/streaming/authorize.go | 62 | ||||
| -rw-r--r-- | internal/processing/streaming/authorize_test.go | 48 | ||||
| -rw-r--r-- | internal/processing/streaming/notification.go | 37 | ||||
| -rw-r--r-- | internal/processing/streaming/notification_test.go | 91 | ||||
| -rw-r--r-- | internal/processing/streaming/openstream.go | 121 | ||||
| -rw-r--r-- | internal/processing/streaming/openstream_test.go | 41 | ||||
| -rw-r--r-- | internal/processing/streaming/streamdelete.go | 55 | ||||
| -rw-r--r-- | internal/processing/streaming/streaming.go | 60 | ||||
| -rw-r--r-- | internal/processing/streaming/streaming_test.go | 55 | ||||
| -rw-r--r-- | internal/processing/streaming/streamtoaccount.go | 61 | ||||
| -rw-r--r-- | internal/processing/streaming/update.go | 37 | 
11 files changed, 0 insertions, 668 deletions
| diff --git a/internal/processing/streaming/authorize.go b/internal/processing/streaming/authorize.go deleted file mode 100644 index 1581e7893..000000000 --- a/internal/processing/streaming/authorize.go +++ /dev/null @@ -1,62 +0,0 @@ -/* -   GoToSocial -   Copyright (C) 2021-2023 GoToSocial Authors admin@gotosocial.org - -   This program is free software: you can redistribute it and/or modify -   it under the terms of the GNU Affero General Public License as published by -   the Free Software Foundation, either version 3 of the License, or -   (at your option) any later version. - -   This program is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU Affero General Public License for more details. - -   You should have received a copy of the GNU Affero General Public License -   along with this program.  If not, see <http://www.gnu.org/licenses/>. -*/ - -package streaming - -import ( -	"context" -	"fmt" - -	"github.com/superseriousbusiness/gotosocial/internal/db" -	"github.com/superseriousbusiness/gotosocial/internal/gtserror" -	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" -) - -func (p *processor) AuthorizeStreamingRequest(ctx context.Context, accessToken string) (*gtsmodel.Account, gtserror.WithCode) { -	ti, err := p.oauthServer.LoadAccessToken(ctx, accessToken) -	if err != nil { -		err := fmt.Errorf("could not load access token: %s", err) -		return nil, gtserror.NewErrorUnauthorized(err) -	} - -	uid := ti.GetUserID() -	if uid == "" { -		err := fmt.Errorf("no userid in token") -		return nil, gtserror.NewErrorUnauthorized(err) -	} - -	user, err := p.db.GetUserByID(ctx, uid) -	if err != nil { -		if err == db.ErrNoEntries { -			err := fmt.Errorf("no user found for validated uid %s", uid) -			return nil, gtserror.NewErrorUnauthorized(err) -		} -		return nil, gtserror.NewErrorInternalError(err) -	} - -	acct, err := p.db.GetAccountByID(ctx, user.AccountID) -	if err != nil { -		if err == db.ErrNoEntries { -			err := fmt.Errorf("no account found for validated uid %s", uid) -			return nil, gtserror.NewErrorUnauthorized(err) -		} -		return nil, gtserror.NewErrorInternalError(err) -	} - -	return acct, nil -} diff --git a/internal/processing/streaming/authorize_test.go b/internal/processing/streaming/authorize_test.go deleted file mode 100644 index 17ec20424..000000000 --- a/internal/processing/streaming/authorize_test.go +++ /dev/null @@ -1,48 +0,0 @@ -/* -   GoToSocial -   Copyright (C) 2021-2023 GoToSocial Authors admin@gotosocial.org - -   This program is free software: you can redistribute it and/or modify -   it under the terms of the GNU Affero General Public License as published by -   the Free Software Foundation, either version 3 of the License, or -   (at your option) any later version. - -   This program is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU Affero General Public License for more details. - -   You should have received a copy of the GNU Affero General Public License -   along with this program.  If not, see <http://www.gnu.org/licenses/>. -*/ - -package streaming_test - -import ( -	"context" -	"testing" - -	"github.com/stretchr/testify/suite" -) - -type AuthorizeTestSuite struct { -	StreamingTestSuite -} - -func (suite *AuthorizeTestSuite) TestAuthorize() { -	account1, err := suite.streamingProcessor.AuthorizeStreamingRequest(context.Background(), suite.testTokens["local_account_1"].Access) -	suite.NoError(err) -	suite.Equal(suite.testAccounts["local_account_1"].ID, account1.ID) - -	account2, err := suite.streamingProcessor.AuthorizeStreamingRequest(context.Background(), suite.testTokens["local_account_2"].Access) -	suite.NoError(err) -	suite.Equal(suite.testAccounts["local_account_2"].ID, account2.ID) - -	noAccount, err := suite.streamingProcessor.AuthorizeStreamingRequest(context.Background(), "aaaaaaaaaaaaaaaaaaaaa!!") -	suite.EqualError(err, "could not load access token: no entries") -	suite.Nil(noAccount) -} - -func TestAuthorizeTestSuite(t *testing.T) { -	suite.Run(t, &AuthorizeTestSuite{}) -} diff --git a/internal/processing/streaming/notification.go b/internal/processing/streaming/notification.go deleted file mode 100644 index ce3d94394..000000000 --- a/internal/processing/streaming/notification.go +++ /dev/null @@ -1,37 +0,0 @@ -/* -   GoToSocial -   Copyright (C) 2021-2023 GoToSocial Authors admin@gotosocial.org - -   This program is free software: you can redistribute it and/or modify -   it under the terms of the GNU Affero General Public License as published by -   the Free Software Foundation, either version 3 of the License, or -   (at your option) any later version. - -   This program is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU Affero General Public License for more details. - -   You should have received a copy of the GNU Affero General Public License -   along with this program.  If not, see <http://www.gnu.org/licenses/>. -*/ - -package streaming - -import ( -	"encoding/json" -	"fmt" - -	apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" -	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" -	"github.com/superseriousbusiness/gotosocial/internal/stream" -) - -func (p *processor) StreamNotificationToAccount(n *apimodel.Notification, account *gtsmodel.Account) error { -	bytes, err := json.Marshal(n) -	if err != nil { -		return fmt.Errorf("error marshalling notification to json: %s", err) -	} - -	return p.streamToAccount(string(bytes), stream.EventTypeNotification, []string{stream.TimelineNotifications, stream.TimelineHome}, account.ID) -} diff --git a/internal/processing/streaming/notification_test.go b/internal/processing/streaming/notification_test.go deleted file mode 100644 index f31c169e1..000000000 --- a/internal/processing/streaming/notification_test.go +++ /dev/null @@ -1,91 +0,0 @@ -/* -   GoToSocial -   Copyright (C) 2021-2023 GoToSocial Authors admin@gotosocial.org - -   This program is free software: you can redistribute it and/or modify -   it under the terms of the GNU Affero General Public License as published by -   the Free Software Foundation, either version 3 of the License, or -   (at your option) any later version. - -   This program is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU Affero General Public License for more details. - -   You should have received a copy of the GNU Affero General Public License -   along with this program.  If not, see <http://www.gnu.org/licenses/>. -*/ - -package streaming_test - -import ( -	"bytes" -	"context" -	"encoding/json" -	"testing" - -	"github.com/stretchr/testify/suite" -	apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" -	"github.com/superseriousbusiness/gotosocial/testrig" -) - -type NotificationTestSuite struct { -	StreamingTestSuite -} - -func (suite *NotificationTestSuite) TestStreamNotification() { -	account := suite.testAccounts["local_account_1"] - -	openStream, errWithCode := suite.streamingProcessor.OpenStreamForAccount(context.Background(), account, "user") -	suite.NoError(errWithCode) - -	followAccount := suite.testAccounts["remote_account_1"] -	followAccountAPIModel, err := testrig.NewTestTypeConverter(suite.db).AccountToAPIAccountPublic(context.Background(), followAccount) -	suite.NoError(err) - -	notification := &apimodel.Notification{ -		ID:        "01FH57SJCMDWQGEAJ0X08CE3WV", -		Type:      "follow", -		CreatedAt: "2021-10-04T08:52:36.000Z", -		Account:   followAccountAPIModel, -	} - -	err = suite.streamingProcessor.StreamNotificationToAccount(notification, account) -	suite.NoError(err) - -	msg := <-openStream.Messages -	dst := new(bytes.Buffer) -	err = json.Indent(dst, []byte(msg.Payload), "", "  ") -	suite.NoError(err) -	suite.Equal(`{ -  "id": "01FH57SJCMDWQGEAJ0X08CE3WV", -  "type": "follow", -  "created_at": "2021-10-04T08:52:36.000Z", -  "account": { -    "id": "01F8MH5ZK5VRH73AKHQM6Y9VNX", -    "username": "foss_satan", -    "acct": "foss_satan@fossbros-anonymous.io", -    "display_name": "big gerald", -    "locked": false, -    "discoverable": true, -    "bot": false, -    "created_at": "2021-09-26T10:52:36.000Z", -    "note": "i post about like, i dunno, stuff, or whatever!!!!", -    "url": "http://fossbros-anonymous.io/@foss_satan", -    "avatar": "", -    "avatar_static": "", -    "header": "http://localhost:8080/assets/default_header.png", -    "header_static": "http://localhost:8080/assets/default_header.png", -    "followers_count": 0, -    "following_count": 0, -    "statuses_count": 1, -    "last_status_at": "2021-09-20T10:40:37.000Z", -    "emojis": [], -    "fields": [] -  } -}`, dst.String()) -} - -func TestNotificationTestSuite(t *testing.T) { -	suite.Run(t, &NotificationTestSuite{}) -} diff --git a/internal/processing/streaming/openstream.go b/internal/processing/streaming/openstream.go deleted file mode 100644 index 7913e6745..000000000 --- a/internal/processing/streaming/openstream.go +++ /dev/null @@ -1,121 +0,0 @@ -/* -   GoToSocial -   Copyright (C) 2021-2023 GoToSocial Authors admin@gotosocial.org - -   This program is free software: you can redistribute it and/or modify -   it under the terms of the GNU Affero General Public License as published by -   the Free Software Foundation, either version 3 of the License, or -   (at your option) any later version. - -   This program is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU Affero General Public License for more details. - -   You should have received a copy of the GNU Affero General Public License -   along with this program.  If not, see <http://www.gnu.org/licenses/>. -*/ - -package streaming - -import ( -	"context" -	"errors" -	"fmt" - -	"codeberg.org/gruf/go-kv" -	"github.com/superseriousbusiness/gotosocial/internal/gtserror" -	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" -	"github.com/superseriousbusiness/gotosocial/internal/id" -	"github.com/superseriousbusiness/gotosocial/internal/log" -	"github.com/superseriousbusiness/gotosocial/internal/stream" -) - -func (p *processor) OpenStreamForAccount(ctx context.Context, account *gtsmodel.Account, streamTimeline string) (*stream.Stream, gtserror.WithCode) { -	l := log.WithContext(ctx). -		WithFields(kv.Fields{ -			{"account", account.ID}, -			{"streamType", streamTimeline}, -		}...) -	l.Debug("received open stream request") - -	// each stream needs a unique ID so we know to close it -	streamID, err := id.NewRandomULID() -	if err != nil { -		return nil, gtserror.NewErrorInternalError(fmt.Errorf("error generating stream id: %s", err)) -	} - -	thisStream := &stream.Stream{ -		ID:        streamID, -		Timeline:  streamTimeline, -		Messages:  make(chan *stream.Message, 100), -		Hangup:    make(chan interface{}, 1), -		Connected: true, -	} -	go p.waitToCloseStream(account, thisStream) - -	v, ok := p.streamMap.Load(account.ID) -	if !ok || v == nil { -		// there is no entry in the streamMap for this account yet, so make one and store it -		streamsForAccount := &stream.StreamsForAccount{ -			Streams: []*stream.Stream{ -				thisStream, -			}, -		} -		p.streamMap.Store(account.ID, streamsForAccount) -	} else { -		// there is an entry in the streamMap for this account -		// parse the interface as a streamsForAccount -		streamsForAccount, ok := v.(*stream.StreamsForAccount) -		if !ok { -			return nil, gtserror.NewErrorInternalError(errors.New("stream map error")) -		} - -		// append this stream to it -		streamsForAccount.Lock() -		streamsForAccount.Streams = append(streamsForAccount.Streams, thisStream) -		streamsForAccount.Unlock() -	} - -	return thisStream, nil -} - -// waitToCloseStream waits until the hangup channel is closed for the given stream. -// It then iterates through the map of streams stored by the processor, removes the stream from it, -// and then closes the messages channel of the stream to indicate that the channel should no longer be read from. -func (p *processor) waitToCloseStream(account *gtsmodel.Account, thisStream *stream.Stream) { -	<-thisStream.Hangup // wait for a hangup message - -	// lock the stream to prevent more messages being put in it while we work -	thisStream.Lock() -	defer thisStream.Unlock() - -	// indicate the stream is no longer connected -	thisStream.Connected = false - -	// load and parse the entry for this account from the stream map -	v, ok := p.streamMap.Load(account.ID) -	if !ok || v == nil { -		return -	} -	streamsForAccount, ok := v.(*stream.StreamsForAccount) -	if !ok { -		return -	} - -	// lock the streams for account while we remove this stream from its slice -	streamsForAccount.Lock() -	defer streamsForAccount.Unlock() - -	// put everything into modified streams *except* the stream we're removing -	modifiedStreams := []*stream.Stream{} -	for _, s := range streamsForAccount.Streams { -		if s.ID != thisStream.ID { -			modifiedStreams = append(modifiedStreams, s) -		} -	} -	streamsForAccount.Streams = modifiedStreams - -	// finally close the messages channel so no more messages can be read from it -	close(thisStream.Messages) -} diff --git a/internal/processing/streaming/openstream_test.go b/internal/processing/streaming/openstream_test.go deleted file mode 100644 index 13b3c72b3..000000000 --- a/internal/processing/streaming/openstream_test.go +++ /dev/null @@ -1,41 +0,0 @@ -/* -   GoToSocial -   Copyright (C) 2021-2023 GoToSocial Authors admin@gotosocial.org - -   This program is free software: you can redistribute it and/or modify -   it under the terms of the GNU Affero General Public License as published by -   the Free Software Foundation, either version 3 of the License, or -   (at your option) any later version. - -   This program is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU Affero General Public License for more details. - -   You should have received a copy of the GNU Affero General Public License -   along with this program.  If not, see <http://www.gnu.org/licenses/>. -*/ - -package streaming_test - -import ( -	"context" -	"testing" - -	"github.com/stretchr/testify/suite" -) - -type OpenStreamTestSuite struct { -	StreamingTestSuite -} - -func (suite *OpenStreamTestSuite) TestOpenStream() { -	account := suite.testAccounts["local_account_1"] - -	_, errWithCode := suite.streamingProcessor.OpenStreamForAccount(context.Background(), account, "user") -	suite.NoError(errWithCode) -} - -func TestOpenStreamTestSuite(t *testing.T) { -	suite.Run(t, &OpenStreamTestSuite{}) -} diff --git a/internal/processing/streaming/streamdelete.go b/internal/processing/streaming/streamdelete.go deleted file mode 100644 index 314e90198..000000000 --- a/internal/processing/streaming/streamdelete.go +++ /dev/null @@ -1,55 +0,0 @@ -/* -   GoToSocial -   Copyright (C) 2021-2023 GoToSocial Authors admin@gotosocial.org - -   This program is free software: you can redistribute it and/or modify -   it under the terms of the GNU Affero General Public License as published by -   the Free Software Foundation, either version 3 of the License, or -   (at your option) any later version. - -   This program is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU Affero General Public License for more details. - -   You should have received a copy of the GNU Affero General Public License -   along with this program.  If not, see <http://www.gnu.org/licenses/>. -*/ - -package streaming - -import ( -	"fmt" -	"strings" - -	"github.com/superseriousbusiness/gotosocial/internal/stream" -) - -func (p *processor) StreamDelete(statusID string) error { -	errs := []string{} - -	// get all account IDs with open streams -	accountIDs := []string{} -	p.streamMap.Range(func(k interface{}, _ interface{}) bool { -		key, ok := k.(string) -		if !ok { -			panic("streamMap key was not a string (account id)") -		} - -		accountIDs = append(accountIDs, key) -		return true -	}) - -	// stream the delete to every account -	for _, accountID := range accountIDs { -		if err := p.streamToAccount(statusID, stream.EventTypeDelete, stream.AllStatusTimelines, accountID); err != nil { -			errs = append(errs, err.Error()) -		} -	} - -	if len(errs) != 0 { -		return fmt.Errorf("one or more errors streaming status delete: %s", strings.Join(errs, ";")) -	} - -	return nil -} diff --git a/internal/processing/streaming/streaming.go b/internal/processing/streaming/streaming.go deleted file mode 100644 index 4b2a80cc8..000000000 --- a/internal/processing/streaming/streaming.go +++ /dev/null @@ -1,60 +0,0 @@ -/* -   GoToSocial -   Copyright (C) 2021-2023 GoToSocial Authors admin@gotosocial.org - -   This program is free software: you can redistribute it and/or modify -   it under the terms of the GNU Affero General Public License as published by -   the Free Software Foundation, either version 3 of the License, or -   (at your option) any later version. - -   This program is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU Affero General Public License for more details. - -   You should have received a copy of the GNU Affero General Public License -   along with this program.  If not, see <http://www.gnu.org/licenses/>. -*/ - -package streaming - -import ( -	"context" -	"sync" - -	apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" -	"github.com/superseriousbusiness/gotosocial/internal/db" -	"github.com/superseriousbusiness/gotosocial/internal/gtserror" -	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" -	"github.com/superseriousbusiness/gotosocial/internal/oauth" -	"github.com/superseriousbusiness/gotosocial/internal/stream" -) - -// Processor wraps a bunch of functions for processing streaming. -type Processor interface { -	// AuthorizeStreamingRequest returns an oauth2 token info in response to an access token query from the streaming API -	AuthorizeStreamingRequest(ctx context.Context, accessToken string) (*gtsmodel.Account, gtserror.WithCode) -	// OpenStreamForAccount returns a new Stream for the given account, which will contain a channel for passing messages back to the caller. -	OpenStreamForAccount(ctx context.Context, account *gtsmodel.Account, timeline string) (*stream.Stream, gtserror.WithCode) -	// StreamUpdateToAccount streams the given update to any open, appropriate streams belonging to the given account. -	StreamUpdateToAccount(s *apimodel.Status, account *gtsmodel.Account, timeline string) error -	// StreamNotificationToAccount streams the given notification to any open, appropriate streams belonging to the given account. -	StreamNotificationToAccount(n *apimodel.Notification, account *gtsmodel.Account) error -	// StreamDelete streams the delete of the given statusID to *ALL* open streams. -	StreamDelete(statusID string) error -} - -type processor struct { -	db          db.DB -	oauthServer oauth.Server -	streamMap   *sync.Map -} - -// New returns a new status processor. -func New(db db.DB, oauthServer oauth.Server) Processor { -	return &processor{ -		db:          db, -		oauthServer: oauthServer, -		streamMap:   &sync.Map{}, -	} -} diff --git a/internal/processing/streaming/streaming_test.go b/internal/processing/streaming/streaming_test.go deleted file mode 100644 index 21323a051..000000000 --- a/internal/processing/streaming/streaming_test.go +++ /dev/null @@ -1,55 +0,0 @@ -/* -   GoToSocial -   Copyright (C) 2021-2023 GoToSocial Authors admin@gotosocial.org - -   This program is free software: you can redistribute it and/or modify -   it under the terms of the GNU Affero General Public License as published by -   the Free Software Foundation, either version 3 of the License, or -   (at your option) any later version. - -   This program is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU Affero General Public License for more details. - -   You should have received a copy of the GNU Affero General Public License -   along with this program.  If not, see <http://www.gnu.org/licenses/>. -*/ - -package streaming_test - -import ( -	"github.com/stretchr/testify/suite" -	"github.com/superseriousbusiness/gotosocial/internal/db" -	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" -	"github.com/superseriousbusiness/gotosocial/internal/oauth" -	"github.com/superseriousbusiness/gotosocial/internal/processing/streaming" -	"github.com/superseriousbusiness/gotosocial/testrig" -) - -type StreamingTestSuite struct { -	suite.Suite -	testAccounts map[string]*gtsmodel.Account -	testTokens   map[string]*gtsmodel.Token -	db           db.DB -	oauthServer  oauth.Server - -	streamingProcessor streaming.Processor -} - -func (suite *StreamingTestSuite) SetupTest() { -	testrig.InitTestLog() -	testrig.InitTestConfig() - -	suite.testAccounts = testrig.NewTestAccounts() -	suite.testTokens = testrig.NewTestTokens() -	suite.db = testrig.NewTestDB() -	suite.oauthServer = testrig.NewTestOauthServer(suite.db) -	suite.streamingProcessor = streaming.New(suite.db, suite.oauthServer) - -	testrig.StandardDBSetup(suite.db, suite.testAccounts) -} - -func (suite *StreamingTestSuite) TearDownTest() { -	testrig.StandardDBTeardown(suite.db) -} diff --git a/internal/processing/streaming/streamtoaccount.go b/internal/processing/streaming/streamtoaccount.go deleted file mode 100644 index f0159b7eb..000000000 --- a/internal/processing/streaming/streamtoaccount.go +++ /dev/null @@ -1,61 +0,0 @@ -/* -   GoToSocial -   Copyright (C) 2021-2023 GoToSocial Authors admin@gotosocial.org - -   This program is free software: you can redistribute it and/or modify -   it under the terms of the GNU Affero General Public License as published by -   the Free Software Foundation, either version 3 of the License, or -   (at your option) any later version. - -   This program is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU Affero General Public License for more details. - -   You should have received a copy of the GNU Affero General Public License -   along with this program.  If not, see <http://www.gnu.org/licenses/>. -*/ - -package streaming - -import ( -	"errors" - -	"github.com/superseriousbusiness/gotosocial/internal/stream" -) - -// streamToAccount streams the given payload with the given event type to any streams currently open for the given account ID. -func (p *processor) streamToAccount(payload string, event string, timelines []string, accountID string) error { -	v, ok := p.streamMap.Load(accountID) -	if !ok { -		// no open connections so nothing to stream -		return nil -	} - -	streamsForAccount, ok := v.(*stream.StreamsForAccount) -	if !ok { -		return errors.New("stream map error") -	} - -	streamsForAccount.Lock() -	defer streamsForAccount.Unlock() -	for _, s := range streamsForAccount.Streams { -		s.Lock() -		defer s.Unlock() -		if !s.Connected { -			continue -		} - -		for _, t := range timelines { -			if s.Timeline == string(t) { -				s.Messages <- &stream.Message{ -					Stream:  []string{string(t)}, -					Event:   string(event), -					Payload: payload, -				} -			} -		} -	} - -	return nil -} diff --git a/internal/processing/streaming/update.go b/internal/processing/streaming/update.go deleted file mode 100644 index e29ad6169..000000000 --- a/internal/processing/streaming/update.go +++ /dev/null @@ -1,37 +0,0 @@ -/* -   GoToSocial -   Copyright (C) 2021-2023 GoToSocial Authors admin@gotosocial.org - -   This program is free software: you can redistribute it and/or modify -   it under the terms of the GNU Affero General Public License as published by -   the Free Software Foundation, either version 3 of the License, or -   (at your option) any later version. - -   This program is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU Affero General Public License for more details. - -   You should have received a copy of the GNU Affero General Public License -   along with this program.  If not, see <http://www.gnu.org/licenses/>. -*/ - -package streaming - -import ( -	"encoding/json" -	"fmt" - -	apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" -	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" -	"github.com/superseriousbusiness/gotosocial/internal/stream" -) - -func (p *processor) StreamUpdateToAccount(s *apimodel.Status, account *gtsmodel.Account, timeline string) error { -	bytes, err := json.Marshal(s) -	if err != nil { -		return fmt.Errorf("error marshalling status to json: %s", err) -	} - -	return p.streamToAccount(string(bytes), stream.EventTypeUpdate, []string{timeline}, account.ID) -} | 
