summaryrefslogtreecommitdiff
path: root/internal/processing/streaming
diff options
context:
space:
mode:
Diffstat (limited to 'internal/processing/streaming')
-rw-r--r--internal/processing/streaming/authorize.go62
-rw-r--r--internal/processing/streaming/authorize_test.go48
-rw-r--r--internal/processing/streaming/notification.go37
-rw-r--r--internal/processing/streaming/notification_test.go91
-rw-r--r--internal/processing/streaming/openstream.go121
-rw-r--r--internal/processing/streaming/openstream_test.go41
-rw-r--r--internal/processing/streaming/streamdelete.go55
-rw-r--r--internal/processing/streaming/streaming.go60
-rw-r--r--internal/processing/streaming/streaming_test.go55
-rw-r--r--internal/processing/streaming/streamtoaccount.go61
-rw-r--r--internal/processing/streaming/update.go37
11 files changed, 0 insertions, 668 deletions
diff --git a/internal/processing/streaming/authorize.go b/internal/processing/streaming/authorize.go
deleted file mode 100644
index 1581e7893..000000000
--- a/internal/processing/streaming/authorize.go
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- GoToSocial
- Copyright (C) 2021-2023 GoToSocial Authors admin@gotosocial.org
-
- This program is free software: you can redistribute it and/or modify
- it under the terms of the GNU Affero General Public License as published by
- the Free Software Foundation, either version 3 of the License, or
- (at your option) any later version.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Affero General Public License for more details.
-
- You should have received a copy of the GNU Affero General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-package streaming
-
-import (
- "context"
- "fmt"
-
- "github.com/superseriousbusiness/gotosocial/internal/db"
- "github.com/superseriousbusiness/gotosocial/internal/gtserror"
- "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
-)
-
-func (p *processor) AuthorizeStreamingRequest(ctx context.Context, accessToken string) (*gtsmodel.Account, gtserror.WithCode) {
- ti, err := p.oauthServer.LoadAccessToken(ctx, accessToken)
- if err != nil {
- err := fmt.Errorf("could not load access token: %s", err)
- return nil, gtserror.NewErrorUnauthorized(err)
- }
-
- uid := ti.GetUserID()
- if uid == "" {
- err := fmt.Errorf("no userid in token")
- return nil, gtserror.NewErrorUnauthorized(err)
- }
-
- user, err := p.db.GetUserByID(ctx, uid)
- if err != nil {
- if err == db.ErrNoEntries {
- err := fmt.Errorf("no user found for validated uid %s", uid)
- return nil, gtserror.NewErrorUnauthorized(err)
- }
- return nil, gtserror.NewErrorInternalError(err)
- }
-
- acct, err := p.db.GetAccountByID(ctx, user.AccountID)
- if err != nil {
- if err == db.ErrNoEntries {
- err := fmt.Errorf("no account found for validated uid %s", uid)
- return nil, gtserror.NewErrorUnauthorized(err)
- }
- return nil, gtserror.NewErrorInternalError(err)
- }
-
- return acct, nil
-}
diff --git a/internal/processing/streaming/authorize_test.go b/internal/processing/streaming/authorize_test.go
deleted file mode 100644
index 17ec20424..000000000
--- a/internal/processing/streaming/authorize_test.go
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- GoToSocial
- Copyright (C) 2021-2023 GoToSocial Authors admin@gotosocial.org
-
- This program is free software: you can redistribute it and/or modify
- it under the terms of the GNU Affero General Public License as published by
- the Free Software Foundation, either version 3 of the License, or
- (at your option) any later version.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Affero General Public License for more details.
-
- You should have received a copy of the GNU Affero General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-package streaming_test
-
-import (
- "context"
- "testing"
-
- "github.com/stretchr/testify/suite"
-)
-
-type AuthorizeTestSuite struct {
- StreamingTestSuite
-}
-
-func (suite *AuthorizeTestSuite) TestAuthorize() {
- account1, err := suite.streamingProcessor.AuthorizeStreamingRequest(context.Background(), suite.testTokens["local_account_1"].Access)
- suite.NoError(err)
- suite.Equal(suite.testAccounts["local_account_1"].ID, account1.ID)
-
- account2, err := suite.streamingProcessor.AuthorizeStreamingRequest(context.Background(), suite.testTokens["local_account_2"].Access)
- suite.NoError(err)
- suite.Equal(suite.testAccounts["local_account_2"].ID, account2.ID)
-
- noAccount, err := suite.streamingProcessor.AuthorizeStreamingRequest(context.Background(), "aaaaaaaaaaaaaaaaaaaaa!!")
- suite.EqualError(err, "could not load access token: no entries")
- suite.Nil(noAccount)
-}
-
-func TestAuthorizeTestSuite(t *testing.T) {
- suite.Run(t, &AuthorizeTestSuite{})
-}
diff --git a/internal/processing/streaming/notification.go b/internal/processing/streaming/notification.go
deleted file mode 100644
index ce3d94394..000000000
--- a/internal/processing/streaming/notification.go
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- GoToSocial
- Copyright (C) 2021-2023 GoToSocial Authors admin@gotosocial.org
-
- This program is free software: you can redistribute it and/or modify
- it under the terms of the GNU Affero General Public License as published by
- the Free Software Foundation, either version 3 of the License, or
- (at your option) any later version.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Affero General Public License for more details.
-
- You should have received a copy of the GNU Affero General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-package streaming
-
-import (
- "encoding/json"
- "fmt"
-
- apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model"
- "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
- "github.com/superseriousbusiness/gotosocial/internal/stream"
-)
-
-func (p *processor) StreamNotificationToAccount(n *apimodel.Notification, account *gtsmodel.Account) error {
- bytes, err := json.Marshal(n)
- if err != nil {
- return fmt.Errorf("error marshalling notification to json: %s", err)
- }
-
- return p.streamToAccount(string(bytes), stream.EventTypeNotification, []string{stream.TimelineNotifications, stream.TimelineHome}, account.ID)
-}
diff --git a/internal/processing/streaming/notification_test.go b/internal/processing/streaming/notification_test.go
deleted file mode 100644
index f31c169e1..000000000
--- a/internal/processing/streaming/notification_test.go
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- GoToSocial
- Copyright (C) 2021-2023 GoToSocial Authors admin@gotosocial.org
-
- This program is free software: you can redistribute it and/or modify
- it under the terms of the GNU Affero General Public License as published by
- the Free Software Foundation, either version 3 of the License, or
- (at your option) any later version.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Affero General Public License for more details.
-
- You should have received a copy of the GNU Affero General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-package streaming_test
-
-import (
- "bytes"
- "context"
- "encoding/json"
- "testing"
-
- "github.com/stretchr/testify/suite"
- apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model"
- "github.com/superseriousbusiness/gotosocial/testrig"
-)
-
-type NotificationTestSuite struct {
- StreamingTestSuite
-}
-
-func (suite *NotificationTestSuite) TestStreamNotification() {
- account := suite.testAccounts["local_account_1"]
-
- openStream, errWithCode := suite.streamingProcessor.OpenStreamForAccount(context.Background(), account, "user")
- suite.NoError(errWithCode)
-
- followAccount := suite.testAccounts["remote_account_1"]
- followAccountAPIModel, err := testrig.NewTestTypeConverter(suite.db).AccountToAPIAccountPublic(context.Background(), followAccount)
- suite.NoError(err)
-
- notification := &apimodel.Notification{
- ID: "01FH57SJCMDWQGEAJ0X08CE3WV",
- Type: "follow",
- CreatedAt: "2021-10-04T08:52:36.000Z",
- Account: followAccountAPIModel,
- }
-
- err = suite.streamingProcessor.StreamNotificationToAccount(notification, account)
- suite.NoError(err)
-
- msg := <-openStream.Messages
- dst := new(bytes.Buffer)
- err = json.Indent(dst, []byte(msg.Payload), "", " ")
- suite.NoError(err)
- suite.Equal(`{
- "id": "01FH57SJCMDWQGEAJ0X08CE3WV",
- "type": "follow",
- "created_at": "2021-10-04T08:52:36.000Z",
- "account": {
- "id": "01F8MH5ZK5VRH73AKHQM6Y9VNX",
- "username": "foss_satan",
- "acct": "foss_satan@fossbros-anonymous.io",
- "display_name": "big gerald",
- "locked": false,
- "discoverable": true,
- "bot": false,
- "created_at": "2021-09-26T10:52:36.000Z",
- "note": "i post about like, i dunno, stuff, or whatever!!!!",
- "url": "http://fossbros-anonymous.io/@foss_satan",
- "avatar": "",
- "avatar_static": "",
- "header": "http://localhost:8080/assets/default_header.png",
- "header_static": "http://localhost:8080/assets/default_header.png",
- "followers_count": 0,
- "following_count": 0,
- "statuses_count": 1,
- "last_status_at": "2021-09-20T10:40:37.000Z",
- "emojis": [],
- "fields": []
- }
-}`, dst.String())
-}
-
-func TestNotificationTestSuite(t *testing.T) {
- suite.Run(t, &NotificationTestSuite{})
-}
diff --git a/internal/processing/streaming/openstream.go b/internal/processing/streaming/openstream.go
deleted file mode 100644
index 7913e6745..000000000
--- a/internal/processing/streaming/openstream.go
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- GoToSocial
- Copyright (C) 2021-2023 GoToSocial Authors admin@gotosocial.org
-
- This program is free software: you can redistribute it and/or modify
- it under the terms of the GNU Affero General Public License as published by
- the Free Software Foundation, either version 3 of the License, or
- (at your option) any later version.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Affero General Public License for more details.
-
- You should have received a copy of the GNU Affero General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-package streaming
-
-import (
- "context"
- "errors"
- "fmt"
-
- "codeberg.org/gruf/go-kv"
- "github.com/superseriousbusiness/gotosocial/internal/gtserror"
- "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
- "github.com/superseriousbusiness/gotosocial/internal/id"
- "github.com/superseriousbusiness/gotosocial/internal/log"
- "github.com/superseriousbusiness/gotosocial/internal/stream"
-)
-
-func (p *processor) OpenStreamForAccount(ctx context.Context, account *gtsmodel.Account, streamTimeline string) (*stream.Stream, gtserror.WithCode) {
- l := log.WithContext(ctx).
- WithFields(kv.Fields{
- {"account", account.ID},
- {"streamType", streamTimeline},
- }...)
- l.Debug("received open stream request")
-
- // each stream needs a unique ID so we know to close it
- streamID, err := id.NewRandomULID()
- if err != nil {
- return nil, gtserror.NewErrorInternalError(fmt.Errorf("error generating stream id: %s", err))
- }
-
- thisStream := &stream.Stream{
- ID: streamID,
- Timeline: streamTimeline,
- Messages: make(chan *stream.Message, 100),
- Hangup: make(chan interface{}, 1),
- Connected: true,
- }
- go p.waitToCloseStream(account, thisStream)
-
- v, ok := p.streamMap.Load(account.ID)
- if !ok || v == nil {
- // there is no entry in the streamMap for this account yet, so make one and store it
- streamsForAccount := &stream.StreamsForAccount{
- Streams: []*stream.Stream{
- thisStream,
- },
- }
- p.streamMap.Store(account.ID, streamsForAccount)
- } else {
- // there is an entry in the streamMap for this account
- // parse the interface as a streamsForAccount
- streamsForAccount, ok := v.(*stream.StreamsForAccount)
- if !ok {
- return nil, gtserror.NewErrorInternalError(errors.New("stream map error"))
- }
-
- // append this stream to it
- streamsForAccount.Lock()
- streamsForAccount.Streams = append(streamsForAccount.Streams, thisStream)
- streamsForAccount.Unlock()
- }
-
- return thisStream, nil
-}
-
-// waitToCloseStream waits until the hangup channel is closed for the given stream.
-// It then iterates through the map of streams stored by the processor, removes the stream from it,
-// and then closes the messages channel of the stream to indicate that the channel should no longer be read from.
-func (p *processor) waitToCloseStream(account *gtsmodel.Account, thisStream *stream.Stream) {
- <-thisStream.Hangup // wait for a hangup message
-
- // lock the stream to prevent more messages being put in it while we work
- thisStream.Lock()
- defer thisStream.Unlock()
-
- // indicate the stream is no longer connected
- thisStream.Connected = false
-
- // load and parse the entry for this account from the stream map
- v, ok := p.streamMap.Load(account.ID)
- if !ok || v == nil {
- return
- }
- streamsForAccount, ok := v.(*stream.StreamsForAccount)
- if !ok {
- return
- }
-
- // lock the streams for account while we remove this stream from its slice
- streamsForAccount.Lock()
- defer streamsForAccount.Unlock()
-
- // put everything into modified streams *except* the stream we're removing
- modifiedStreams := []*stream.Stream{}
- for _, s := range streamsForAccount.Streams {
- if s.ID != thisStream.ID {
- modifiedStreams = append(modifiedStreams, s)
- }
- }
- streamsForAccount.Streams = modifiedStreams
-
- // finally close the messages channel so no more messages can be read from it
- close(thisStream.Messages)
-}
diff --git a/internal/processing/streaming/openstream_test.go b/internal/processing/streaming/openstream_test.go
deleted file mode 100644
index 13b3c72b3..000000000
--- a/internal/processing/streaming/openstream_test.go
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- GoToSocial
- Copyright (C) 2021-2023 GoToSocial Authors admin@gotosocial.org
-
- This program is free software: you can redistribute it and/or modify
- it under the terms of the GNU Affero General Public License as published by
- the Free Software Foundation, either version 3 of the License, or
- (at your option) any later version.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Affero General Public License for more details.
-
- You should have received a copy of the GNU Affero General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-package streaming_test
-
-import (
- "context"
- "testing"
-
- "github.com/stretchr/testify/suite"
-)
-
-type OpenStreamTestSuite struct {
- StreamingTestSuite
-}
-
-func (suite *OpenStreamTestSuite) TestOpenStream() {
- account := suite.testAccounts["local_account_1"]
-
- _, errWithCode := suite.streamingProcessor.OpenStreamForAccount(context.Background(), account, "user")
- suite.NoError(errWithCode)
-}
-
-func TestOpenStreamTestSuite(t *testing.T) {
- suite.Run(t, &OpenStreamTestSuite{})
-}
diff --git a/internal/processing/streaming/streamdelete.go b/internal/processing/streaming/streamdelete.go
deleted file mode 100644
index 314e90198..000000000
--- a/internal/processing/streaming/streamdelete.go
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- GoToSocial
- Copyright (C) 2021-2023 GoToSocial Authors admin@gotosocial.org
-
- This program is free software: you can redistribute it and/or modify
- it under the terms of the GNU Affero General Public License as published by
- the Free Software Foundation, either version 3 of the License, or
- (at your option) any later version.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Affero General Public License for more details.
-
- You should have received a copy of the GNU Affero General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-package streaming
-
-import (
- "fmt"
- "strings"
-
- "github.com/superseriousbusiness/gotosocial/internal/stream"
-)
-
-func (p *processor) StreamDelete(statusID string) error {
- errs := []string{}
-
- // get all account IDs with open streams
- accountIDs := []string{}
- p.streamMap.Range(func(k interface{}, _ interface{}) bool {
- key, ok := k.(string)
- if !ok {
- panic("streamMap key was not a string (account id)")
- }
-
- accountIDs = append(accountIDs, key)
- return true
- })
-
- // stream the delete to every account
- for _, accountID := range accountIDs {
- if err := p.streamToAccount(statusID, stream.EventTypeDelete, stream.AllStatusTimelines, accountID); err != nil {
- errs = append(errs, err.Error())
- }
- }
-
- if len(errs) != 0 {
- return fmt.Errorf("one or more errors streaming status delete: %s", strings.Join(errs, ";"))
- }
-
- return nil
-}
diff --git a/internal/processing/streaming/streaming.go b/internal/processing/streaming/streaming.go
deleted file mode 100644
index 4b2a80cc8..000000000
--- a/internal/processing/streaming/streaming.go
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- GoToSocial
- Copyright (C) 2021-2023 GoToSocial Authors admin@gotosocial.org
-
- This program is free software: you can redistribute it and/or modify
- it under the terms of the GNU Affero General Public License as published by
- the Free Software Foundation, either version 3 of the License, or
- (at your option) any later version.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Affero General Public License for more details.
-
- You should have received a copy of the GNU Affero General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-package streaming
-
-import (
- "context"
- "sync"
-
- apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model"
- "github.com/superseriousbusiness/gotosocial/internal/db"
- "github.com/superseriousbusiness/gotosocial/internal/gtserror"
- "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
- "github.com/superseriousbusiness/gotosocial/internal/oauth"
- "github.com/superseriousbusiness/gotosocial/internal/stream"
-)
-
-// Processor wraps a bunch of functions for processing streaming.
-type Processor interface {
- // AuthorizeStreamingRequest returns an oauth2 token info in response to an access token query from the streaming API
- AuthorizeStreamingRequest(ctx context.Context, accessToken string) (*gtsmodel.Account, gtserror.WithCode)
- // OpenStreamForAccount returns a new Stream for the given account, which will contain a channel for passing messages back to the caller.
- OpenStreamForAccount(ctx context.Context, account *gtsmodel.Account, timeline string) (*stream.Stream, gtserror.WithCode)
- // StreamUpdateToAccount streams the given update to any open, appropriate streams belonging to the given account.
- StreamUpdateToAccount(s *apimodel.Status, account *gtsmodel.Account, timeline string) error
- // StreamNotificationToAccount streams the given notification to any open, appropriate streams belonging to the given account.
- StreamNotificationToAccount(n *apimodel.Notification, account *gtsmodel.Account) error
- // StreamDelete streams the delete of the given statusID to *ALL* open streams.
- StreamDelete(statusID string) error
-}
-
-type processor struct {
- db db.DB
- oauthServer oauth.Server
- streamMap *sync.Map
-}
-
-// New returns a new status processor.
-func New(db db.DB, oauthServer oauth.Server) Processor {
- return &processor{
- db: db,
- oauthServer: oauthServer,
- streamMap: &sync.Map{},
- }
-}
diff --git a/internal/processing/streaming/streaming_test.go b/internal/processing/streaming/streaming_test.go
deleted file mode 100644
index 21323a051..000000000
--- a/internal/processing/streaming/streaming_test.go
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- GoToSocial
- Copyright (C) 2021-2023 GoToSocial Authors admin@gotosocial.org
-
- This program is free software: you can redistribute it and/or modify
- it under the terms of the GNU Affero General Public License as published by
- the Free Software Foundation, either version 3 of the License, or
- (at your option) any later version.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Affero General Public License for more details.
-
- You should have received a copy of the GNU Affero General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-package streaming_test
-
-import (
- "github.com/stretchr/testify/suite"
- "github.com/superseriousbusiness/gotosocial/internal/db"
- "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
- "github.com/superseriousbusiness/gotosocial/internal/oauth"
- "github.com/superseriousbusiness/gotosocial/internal/processing/streaming"
- "github.com/superseriousbusiness/gotosocial/testrig"
-)
-
-type StreamingTestSuite struct {
- suite.Suite
- testAccounts map[string]*gtsmodel.Account
- testTokens map[string]*gtsmodel.Token
- db db.DB
- oauthServer oauth.Server
-
- streamingProcessor streaming.Processor
-}
-
-func (suite *StreamingTestSuite) SetupTest() {
- testrig.InitTestLog()
- testrig.InitTestConfig()
-
- suite.testAccounts = testrig.NewTestAccounts()
- suite.testTokens = testrig.NewTestTokens()
- suite.db = testrig.NewTestDB()
- suite.oauthServer = testrig.NewTestOauthServer(suite.db)
- suite.streamingProcessor = streaming.New(suite.db, suite.oauthServer)
-
- testrig.StandardDBSetup(suite.db, suite.testAccounts)
-}
-
-func (suite *StreamingTestSuite) TearDownTest() {
- testrig.StandardDBTeardown(suite.db)
-}
diff --git a/internal/processing/streaming/streamtoaccount.go b/internal/processing/streaming/streamtoaccount.go
deleted file mode 100644
index f0159b7eb..000000000
--- a/internal/processing/streaming/streamtoaccount.go
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- GoToSocial
- Copyright (C) 2021-2023 GoToSocial Authors admin@gotosocial.org
-
- This program is free software: you can redistribute it and/or modify
- it under the terms of the GNU Affero General Public License as published by
- the Free Software Foundation, either version 3 of the License, or
- (at your option) any later version.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Affero General Public License for more details.
-
- You should have received a copy of the GNU Affero General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-package streaming
-
-import (
- "errors"
-
- "github.com/superseriousbusiness/gotosocial/internal/stream"
-)
-
-// streamToAccount streams the given payload with the given event type to any streams currently open for the given account ID.
-func (p *processor) streamToAccount(payload string, event string, timelines []string, accountID string) error {
- v, ok := p.streamMap.Load(accountID)
- if !ok {
- // no open connections so nothing to stream
- return nil
- }
-
- streamsForAccount, ok := v.(*stream.StreamsForAccount)
- if !ok {
- return errors.New("stream map error")
- }
-
- streamsForAccount.Lock()
- defer streamsForAccount.Unlock()
- for _, s := range streamsForAccount.Streams {
- s.Lock()
- defer s.Unlock()
- if !s.Connected {
- continue
- }
-
- for _, t := range timelines {
- if s.Timeline == string(t) {
- s.Messages <- &stream.Message{
- Stream: []string{string(t)},
- Event: string(event),
- Payload: payload,
- }
- }
- }
- }
-
- return nil
-}
diff --git a/internal/processing/streaming/update.go b/internal/processing/streaming/update.go
deleted file mode 100644
index e29ad6169..000000000
--- a/internal/processing/streaming/update.go
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- GoToSocial
- Copyright (C) 2021-2023 GoToSocial Authors admin@gotosocial.org
-
- This program is free software: you can redistribute it and/or modify
- it under the terms of the GNU Affero General Public License as published by
- the Free Software Foundation, either version 3 of the License, or
- (at your option) any later version.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Affero General Public License for more details.
-
- You should have received a copy of the GNU Affero General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-package streaming
-
-import (
- "encoding/json"
- "fmt"
-
- apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model"
- "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
- "github.com/superseriousbusiness/gotosocial/internal/stream"
-)
-
-func (p *processor) StreamUpdateToAccount(s *apimodel.Status, account *gtsmodel.Account, timeline string) error {
- bytes, err := json.Marshal(s)
- if err != nil {
- return fmt.Errorf("error marshalling status to json: %s", err)
- }
-
- return p.streamToAccount(string(bytes), stream.EventTypeUpdate, []string{timeline}, account.ID)
-}