diff options
author | 2023-02-22 16:05:26 +0100 | |
---|---|---|
committer | 2023-02-22 16:05:26 +0100 | |
commit | b6fbdc66c1ce1ec61ebfb6fcc0351ea627a1d288 (patch) | |
tree | c79d1107375597ab8a79045c80dd62dc95a204e7 /internal/processing/stream | |
parent | [bugfix] Remove initial storage cleanup (#1545) (diff) | |
download | gotosocial-b6fbdc66c1ce1ec61ebfb6fcc0351ea627a1d288.tar.xz |
[chore] Deinterface processor and subprocessors (#1501)
* [chore] Deinterface processor and subprocessors
* expose subprocessors via function calls
* missing license header
Diffstat (limited to 'internal/processing/stream')
-rw-r--r-- | internal/processing/stream/authorize.go | 63 | ||||
-rw-r--r-- | internal/processing/stream/authorize_test.go | 48 | ||||
-rw-r--r-- | internal/processing/stream/delete.go | 56 | ||||
-rw-r--r-- | internal/processing/stream/notification.go | 38 | ||||
-rw-r--r-- | internal/processing/stream/notification_test.go | 91 | ||||
-rw-r--r-- | internal/processing/stream/open.go | 121 | ||||
-rw-r--r-- | internal/processing/stream/open_test.go | 41 | ||||
-rw-r--r-- | internal/processing/stream/stream.go | 78 | ||||
-rw-r--r-- | internal/processing/stream/stream_test.go | 55 | ||||
-rw-r--r-- | internal/processing/stream/update.go | 38 |
10 files changed, 629 insertions, 0 deletions
diff --git a/internal/processing/stream/authorize.go b/internal/processing/stream/authorize.go new file mode 100644 index 000000000..5f6811db9 --- /dev/null +++ b/internal/processing/stream/authorize.go @@ -0,0 +1,63 @@ +/* + GoToSocial + Copyright (C) 2021-2023 GoToSocial Authors admin@gotosocial.org + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +package stream + +import ( + "context" + "fmt" + + "github.com/superseriousbusiness/gotosocial/internal/db" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" +) + +// Authorize returns an oauth2 token info in response to an access token query from the streaming API +func (p *Processor) Authorize(ctx context.Context, accessToken string) (*gtsmodel.Account, gtserror.WithCode) { + ti, err := p.oauthServer.LoadAccessToken(ctx, accessToken) + if err != nil { + err := fmt.Errorf("could not load access token: %s", err) + return nil, gtserror.NewErrorUnauthorized(err) + } + + uid := ti.GetUserID() + if uid == "" { + err := fmt.Errorf("no userid in token") + return nil, gtserror.NewErrorUnauthorized(err) + } + + user, err := p.db.GetUserByID(ctx, uid) + if err != nil { + if err == db.ErrNoEntries { + err := fmt.Errorf("no user found for validated uid %s", uid) + return nil, gtserror.NewErrorUnauthorized(err) + } + return nil, gtserror.NewErrorInternalError(err) + } + + acct, err := p.db.GetAccountByID(ctx, user.AccountID) + if err != nil { + if err == db.ErrNoEntries { + err := fmt.Errorf("no account found for validated uid %s", uid) + return nil, gtserror.NewErrorUnauthorized(err) + } + return nil, gtserror.NewErrorInternalError(err) + } + + return acct, nil +} diff --git a/internal/processing/stream/authorize_test.go b/internal/processing/stream/authorize_test.go new file mode 100644 index 000000000..664c63787 --- /dev/null +++ b/internal/processing/stream/authorize_test.go @@ -0,0 +1,48 @@ +/* + GoToSocial + Copyright (C) 2021-2023 GoToSocial Authors admin@gotosocial.org + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +package stream_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/suite" +) + +type AuthorizeTestSuite struct { + StreamTestSuite +} + +func (suite *AuthorizeTestSuite) TestAuthorize() { + account1, err := suite.streamProcessor.Authorize(context.Background(), suite.testTokens["local_account_1"].Access) + suite.NoError(err) + suite.Equal(suite.testAccounts["local_account_1"].ID, account1.ID) + + account2, err := suite.streamProcessor.Authorize(context.Background(), suite.testTokens["local_account_2"].Access) + suite.NoError(err) + suite.Equal(suite.testAccounts["local_account_2"].ID, account2.ID) + + noAccount, err := suite.streamProcessor.Authorize(context.Background(), "aaaaaaaaaaaaaaaaaaaaa!!") + suite.EqualError(err, "could not load access token: no entries") + suite.Nil(noAccount) +} + +func TestAuthorizeTestSuite(t *testing.T) { + suite.Run(t, &AuthorizeTestSuite{}) +} diff --git a/internal/processing/stream/delete.go b/internal/processing/stream/delete.go new file mode 100644 index 000000000..0a25b4a50 --- /dev/null +++ b/internal/processing/stream/delete.go @@ -0,0 +1,56 @@ +/* + GoToSocial + Copyright (C) 2021-2023 GoToSocial Authors admin@gotosocial.org + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +package stream + +import ( + "fmt" + "strings" + + "github.com/superseriousbusiness/gotosocial/internal/stream" +) + +// Delete streams the delete of the given statusID to *ALL* open streams. +func (p *Processor) Delete(statusID string) error { + errs := []string{} + + // get all account IDs with open streams + accountIDs := []string{} + p.streamMap.Range(func(k interface{}, _ interface{}) bool { + key, ok := k.(string) + if !ok { + panic("streamMap key was not a string (account id)") + } + + accountIDs = append(accountIDs, key) + return true + }) + + // stream the delete to every account + for _, accountID := range accountIDs { + if err := p.toAccount(statusID, stream.EventTypeDelete, stream.AllStatusTimelines, accountID); err != nil { + errs = append(errs, err.Error()) + } + } + + if len(errs) != 0 { + return fmt.Errorf("one or more errors streaming status delete: %s", strings.Join(errs, ";")) + } + + return nil +} diff --git a/internal/processing/stream/notification.go b/internal/processing/stream/notification.go new file mode 100644 index 000000000..cf5dbea6a --- /dev/null +++ b/internal/processing/stream/notification.go @@ -0,0 +1,38 @@ +/* + GoToSocial + Copyright (C) 2021-2023 GoToSocial Authors admin@gotosocial.org + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +package stream + +import ( + "encoding/json" + "fmt" + + apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/stream" +) + +// Notify streams the given notification to any open, appropriate streams belonging to the given account. +func (p *Processor) Notify(n *apimodel.Notification, account *gtsmodel.Account) error { + bytes, err := json.Marshal(n) + if err != nil { + return fmt.Errorf("error marshalling notification to json: %s", err) + } + + return p.toAccount(string(bytes), stream.EventTypeNotification, []string{stream.TimelineNotifications, stream.TimelineHome}, account.ID) +} diff --git a/internal/processing/stream/notification_test.go b/internal/processing/stream/notification_test.go new file mode 100644 index 000000000..56c20b61a --- /dev/null +++ b/internal/processing/stream/notification_test.go @@ -0,0 +1,91 @@ +/* + GoToSocial + Copyright (C) 2021-2023 GoToSocial Authors admin@gotosocial.org + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +package stream_test + +import ( + "bytes" + "context" + "encoding/json" + "testing" + + "github.com/stretchr/testify/suite" + apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" + "github.com/superseriousbusiness/gotosocial/testrig" +) + +type NotificationTestSuite struct { + StreamTestSuite +} + +func (suite *NotificationTestSuite) TestStreamNotification() { + account := suite.testAccounts["local_account_1"] + + openStream, errWithCode := suite.streamProcessor.Open(context.Background(), account, "user") + suite.NoError(errWithCode) + + followAccount := suite.testAccounts["remote_account_1"] + followAccountAPIModel, err := testrig.NewTestTypeConverter(suite.db).AccountToAPIAccountPublic(context.Background(), followAccount) + suite.NoError(err) + + notification := &apimodel.Notification{ + ID: "01FH57SJCMDWQGEAJ0X08CE3WV", + Type: "follow", + CreatedAt: "2021-10-04T08:52:36.000Z", + Account: followAccountAPIModel, + } + + err = suite.streamProcessor.Notify(notification, account) + suite.NoError(err) + + msg := <-openStream.Messages + dst := new(bytes.Buffer) + err = json.Indent(dst, []byte(msg.Payload), "", " ") + suite.NoError(err) + suite.Equal(`{ + "id": "01FH57SJCMDWQGEAJ0X08CE3WV", + "type": "follow", + "created_at": "2021-10-04T08:52:36.000Z", + "account": { + "id": "01F8MH5ZK5VRH73AKHQM6Y9VNX", + "username": "foss_satan", + "acct": "foss_satan@fossbros-anonymous.io", + "display_name": "big gerald", + "locked": false, + "discoverable": true, + "bot": false, + "created_at": "2021-09-26T10:52:36.000Z", + "note": "i post about like, i dunno, stuff, or whatever!!!!", + "url": "http://fossbros-anonymous.io/@foss_satan", + "avatar": "", + "avatar_static": "", + "header": "http://localhost:8080/assets/default_header.png", + "header_static": "http://localhost:8080/assets/default_header.png", + "followers_count": 0, + "following_count": 0, + "statuses_count": 1, + "last_status_at": "2021-09-20T10:40:37.000Z", + "emojis": [], + "fields": [] + } +}`, dst.String()) +} + +func TestNotificationTestSuite(t *testing.T) { + suite.Run(t, &NotificationTestSuite{}) +} diff --git a/internal/processing/stream/open.go b/internal/processing/stream/open.go new file mode 100644 index 000000000..10d01a767 --- /dev/null +++ b/internal/processing/stream/open.go @@ -0,0 +1,121 @@ +/* + GoToSocial + Copyright (C) 2021-2023 GoToSocial Authors admin@gotosocial.org + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +package stream + +import ( + "context" + "errors" + "fmt" + + "codeberg.org/gruf/go-kv" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/id" + "github.com/superseriousbusiness/gotosocial/internal/log" + "github.com/superseriousbusiness/gotosocial/internal/stream" +) + +// Open returns a new Stream for the given account, which will contain a channel for passing messages back to the caller. +func (p *Processor) Open(ctx context.Context, account *gtsmodel.Account, streamTimeline string) (*stream.Stream, gtserror.WithCode) { + l := log.WithContext(ctx).WithFields(kv.Fields{ + {"account", account.ID}, + {"streamType", streamTimeline}, + }...) + l.Debug("received open stream request") + + // each stream needs a unique ID so we know to close it + streamID, err := id.NewRandomULID() + if err != nil { + return nil, gtserror.NewErrorInternalError(fmt.Errorf("error generating stream id: %s", err)) + } + + thisStream := &stream.Stream{ + ID: streamID, + Timeline: streamTimeline, + Messages: make(chan *stream.Message, 100), + Hangup: make(chan interface{}, 1), + Connected: true, + } + go p.waitToCloseStream(account, thisStream) + + v, ok := p.streamMap.Load(account.ID) + if !ok || v == nil { + // there is no entry in the streamMap for this account yet, so make one and store it + streamsForAccount := &stream.StreamsForAccount{ + Streams: []*stream.Stream{ + thisStream, + }, + } + p.streamMap.Store(account.ID, streamsForAccount) + } else { + // there is an entry in the streamMap for this account + // parse the interface as a streamsForAccount + streamsForAccount, ok := v.(*stream.StreamsForAccount) + if !ok { + return nil, gtserror.NewErrorInternalError(errors.New("stream map error")) + } + + // append this stream to it + streamsForAccount.Lock() + streamsForAccount.Streams = append(streamsForAccount.Streams, thisStream) + streamsForAccount.Unlock() + } + + return thisStream, nil +} + +// waitToCloseStream waits until the hangup channel is closed for the given stream. +// It then iterates through the map of streams stored by the processor, removes the stream from it, +// and then closes the messages channel of the stream to indicate that the channel should no longer be read from. +func (p *Processor) waitToCloseStream(account *gtsmodel.Account, thisStream *stream.Stream) { + <-thisStream.Hangup // wait for a hangup message + + // lock the stream to prevent more messages being put in it while we work + thisStream.Lock() + defer thisStream.Unlock() + + // indicate the stream is no longer connected + thisStream.Connected = false + + // load and parse the entry for this account from the stream map + v, ok := p.streamMap.Load(account.ID) + if !ok || v == nil { + return + } + streamsForAccount, ok := v.(*stream.StreamsForAccount) + if !ok { + return + } + + // lock the streams for account while we remove this stream from its slice + streamsForAccount.Lock() + defer streamsForAccount.Unlock() + + // put everything into modified streams *except* the stream we're removing + modifiedStreams := []*stream.Stream{} + for _, s := range streamsForAccount.Streams { + if s.ID != thisStream.ID { + modifiedStreams = append(modifiedStreams, s) + } + } + streamsForAccount.Streams = modifiedStreams + + // finally close the messages channel so no more messages can be read from it + close(thisStream.Messages) +} diff --git a/internal/processing/stream/open_test.go b/internal/processing/stream/open_test.go new file mode 100644 index 000000000..81b587b58 --- /dev/null +++ b/internal/processing/stream/open_test.go @@ -0,0 +1,41 @@ +/* + GoToSocial + Copyright (C) 2021-2023 GoToSocial Authors admin@gotosocial.org + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +package stream_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/suite" +) + +type OpenStreamTestSuite struct { + StreamTestSuite +} + +func (suite *OpenStreamTestSuite) TestOpenStream() { + account := suite.testAccounts["local_account_1"] + + _, errWithCode := suite.streamProcessor.Open(context.Background(), account, "user") + suite.NoError(errWithCode) +} + +func TestOpenStreamTestSuite(t *testing.T) { + suite.Run(t, &OpenStreamTestSuite{}) +} diff --git a/internal/processing/stream/stream.go b/internal/processing/stream/stream.go new file mode 100644 index 000000000..3c38e720a --- /dev/null +++ b/internal/processing/stream/stream.go @@ -0,0 +1,78 @@ +/* + GoToSocial + Copyright (C) 2021-2023 GoToSocial Authors admin@gotosocial.org + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +package stream + +import ( + "errors" + "sync" + + "github.com/superseriousbusiness/gotosocial/internal/db" + "github.com/superseriousbusiness/gotosocial/internal/oauth" + "github.com/superseriousbusiness/gotosocial/internal/stream" +) + +type Processor struct { + db db.DB + oauthServer oauth.Server + streamMap *sync.Map +} + +func New(db db.DB, oauthServer oauth.Server) Processor { + return Processor{ + db: db, + oauthServer: oauthServer, + streamMap: &sync.Map{}, + } +} + +// toAccount streams the given payload with the given event type to any streams currently open for the given account ID. +func (p *Processor) toAccount(payload string, event string, timelines []string, accountID string) error { + v, ok := p.streamMap.Load(accountID) + if !ok { + // no open connections so nothing to stream + return nil + } + + streamsForAccount, ok := v.(*stream.StreamsForAccount) + if !ok { + return errors.New("stream map error") + } + + streamsForAccount.Lock() + defer streamsForAccount.Unlock() + for _, s := range streamsForAccount.Streams { + s.Lock() + defer s.Unlock() + if !s.Connected { + continue + } + + for _, t := range timelines { + if s.Timeline == string(t) { + s.Messages <- &stream.Message{ + Stream: []string{string(t)}, + Event: string(event), + Payload: payload, + } + } + } + } + + return nil +} diff --git a/internal/processing/stream/stream_test.go b/internal/processing/stream/stream_test.go new file mode 100644 index 000000000..907c7e1d0 --- /dev/null +++ b/internal/processing/stream/stream_test.go @@ -0,0 +1,55 @@ +/* + GoToSocial + Copyright (C) 2021-2023 GoToSocial Authors admin@gotosocial.org + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +package stream_test + +import ( + "github.com/stretchr/testify/suite" + "github.com/superseriousbusiness/gotosocial/internal/db" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/oauth" + "github.com/superseriousbusiness/gotosocial/internal/processing/stream" + "github.com/superseriousbusiness/gotosocial/testrig" +) + +type StreamTestSuite struct { + suite.Suite + testAccounts map[string]*gtsmodel.Account + testTokens map[string]*gtsmodel.Token + db db.DB + oauthServer oauth.Server + + streamProcessor stream.Processor +} + +func (suite *StreamTestSuite) SetupTest() { + testrig.InitTestLog() + testrig.InitTestConfig() + + suite.testAccounts = testrig.NewTestAccounts() + suite.testTokens = testrig.NewTestTokens() + suite.db = testrig.NewTestDB() + suite.oauthServer = testrig.NewTestOauthServer(suite.db) + suite.streamProcessor = stream.New(suite.db, suite.oauthServer) + + testrig.StandardDBSetup(suite.db, suite.testAccounts) +} + +func (suite *StreamTestSuite) TearDownTest() { + testrig.StandardDBTeardown(suite.db) +} diff --git a/internal/processing/stream/update.go b/internal/processing/stream/update.go new file mode 100644 index 000000000..41ce2c4db --- /dev/null +++ b/internal/processing/stream/update.go @@ -0,0 +1,38 @@ +/* + GoToSocial + Copyright (C) 2021-2023 GoToSocial Authors admin@gotosocial.org + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +package stream + +import ( + "encoding/json" + "fmt" + + apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/stream" +) + +// Update streams the given update to any open, appropriate streams belonging to the given account. +func (p *Processor) Update(s *apimodel.Status, account *gtsmodel.Account, timeline string) error { + bytes, err := json.Marshal(s) + if err != nil { + return fmt.Errorf("error marshalling status to json: %s", err) + } + + return p.toAccount(string(bytes), stream.EventTypeUpdate, []string{timeline}, account.ID) +} |