summaryrefslogtreecommitdiff
path: root/internal/processing/stream
diff options
context:
space:
mode:
Diffstat (limited to 'internal/processing/stream')
-rw-r--r--internal/processing/stream/authorize.go63
-rw-r--r--internal/processing/stream/authorize_test.go48
-rw-r--r--internal/processing/stream/delete.go56
-rw-r--r--internal/processing/stream/notification.go38
-rw-r--r--internal/processing/stream/notification_test.go91
-rw-r--r--internal/processing/stream/open.go121
-rw-r--r--internal/processing/stream/open_test.go41
-rw-r--r--internal/processing/stream/stream.go78
-rw-r--r--internal/processing/stream/stream_test.go55
-rw-r--r--internal/processing/stream/update.go38
10 files changed, 629 insertions, 0 deletions
diff --git a/internal/processing/stream/authorize.go b/internal/processing/stream/authorize.go
new file mode 100644
index 000000000..5f6811db9
--- /dev/null
+++ b/internal/processing/stream/authorize.go
@@ -0,0 +1,63 @@
+/*
+ GoToSocial
+ Copyright (C) 2021-2023 GoToSocial Authors admin@gotosocial.org
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU Affero General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+package stream
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/superseriousbusiness/gotosocial/internal/db"
+ "github.com/superseriousbusiness/gotosocial/internal/gtserror"
+ "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
+)
+
+// Authorize returns an oauth2 token info in response to an access token query from the streaming API
+func (p *Processor) Authorize(ctx context.Context, accessToken string) (*gtsmodel.Account, gtserror.WithCode) {
+ ti, err := p.oauthServer.LoadAccessToken(ctx, accessToken)
+ if err != nil {
+ err := fmt.Errorf("could not load access token: %s", err)
+ return nil, gtserror.NewErrorUnauthorized(err)
+ }
+
+ uid := ti.GetUserID()
+ if uid == "" {
+ err := fmt.Errorf("no userid in token")
+ return nil, gtserror.NewErrorUnauthorized(err)
+ }
+
+ user, err := p.db.GetUserByID(ctx, uid)
+ if err != nil {
+ if err == db.ErrNoEntries {
+ err := fmt.Errorf("no user found for validated uid %s", uid)
+ return nil, gtserror.NewErrorUnauthorized(err)
+ }
+ return nil, gtserror.NewErrorInternalError(err)
+ }
+
+ acct, err := p.db.GetAccountByID(ctx, user.AccountID)
+ if err != nil {
+ if err == db.ErrNoEntries {
+ err := fmt.Errorf("no account found for validated uid %s", uid)
+ return nil, gtserror.NewErrorUnauthorized(err)
+ }
+ return nil, gtserror.NewErrorInternalError(err)
+ }
+
+ return acct, nil
+}
diff --git a/internal/processing/stream/authorize_test.go b/internal/processing/stream/authorize_test.go
new file mode 100644
index 000000000..664c63787
--- /dev/null
+++ b/internal/processing/stream/authorize_test.go
@@ -0,0 +1,48 @@
+/*
+ GoToSocial
+ Copyright (C) 2021-2023 GoToSocial Authors admin@gotosocial.org
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU Affero General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+package stream_test
+
+import (
+ "context"
+ "testing"
+
+ "github.com/stretchr/testify/suite"
+)
+
+type AuthorizeTestSuite struct {
+ StreamTestSuite
+}
+
+func (suite *AuthorizeTestSuite) TestAuthorize() {
+ account1, err := suite.streamProcessor.Authorize(context.Background(), suite.testTokens["local_account_1"].Access)
+ suite.NoError(err)
+ suite.Equal(suite.testAccounts["local_account_1"].ID, account1.ID)
+
+ account2, err := suite.streamProcessor.Authorize(context.Background(), suite.testTokens["local_account_2"].Access)
+ suite.NoError(err)
+ suite.Equal(suite.testAccounts["local_account_2"].ID, account2.ID)
+
+ noAccount, err := suite.streamProcessor.Authorize(context.Background(), "aaaaaaaaaaaaaaaaaaaaa!!")
+ suite.EqualError(err, "could not load access token: no entries")
+ suite.Nil(noAccount)
+}
+
+func TestAuthorizeTestSuite(t *testing.T) {
+ suite.Run(t, &AuthorizeTestSuite{})
+}
diff --git a/internal/processing/stream/delete.go b/internal/processing/stream/delete.go
new file mode 100644
index 000000000..0a25b4a50
--- /dev/null
+++ b/internal/processing/stream/delete.go
@@ -0,0 +1,56 @@
+/*
+ GoToSocial
+ Copyright (C) 2021-2023 GoToSocial Authors admin@gotosocial.org
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU Affero General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+package stream
+
+import (
+ "fmt"
+ "strings"
+
+ "github.com/superseriousbusiness/gotosocial/internal/stream"
+)
+
+// Delete streams the delete of the given statusID to *ALL* open streams.
+func (p *Processor) Delete(statusID string) error {
+ errs := []string{}
+
+ // get all account IDs with open streams
+ accountIDs := []string{}
+ p.streamMap.Range(func(k interface{}, _ interface{}) bool {
+ key, ok := k.(string)
+ if !ok {
+ panic("streamMap key was not a string (account id)")
+ }
+
+ accountIDs = append(accountIDs, key)
+ return true
+ })
+
+ // stream the delete to every account
+ for _, accountID := range accountIDs {
+ if err := p.toAccount(statusID, stream.EventTypeDelete, stream.AllStatusTimelines, accountID); err != nil {
+ errs = append(errs, err.Error())
+ }
+ }
+
+ if len(errs) != 0 {
+ return fmt.Errorf("one or more errors streaming status delete: %s", strings.Join(errs, ";"))
+ }
+
+ return nil
+}
diff --git a/internal/processing/stream/notification.go b/internal/processing/stream/notification.go
new file mode 100644
index 000000000..cf5dbea6a
--- /dev/null
+++ b/internal/processing/stream/notification.go
@@ -0,0 +1,38 @@
+/*
+ GoToSocial
+ Copyright (C) 2021-2023 GoToSocial Authors admin@gotosocial.org
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU Affero General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+package stream
+
+import (
+ "encoding/json"
+ "fmt"
+
+ apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model"
+ "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
+ "github.com/superseriousbusiness/gotosocial/internal/stream"
+)
+
+// Notify streams the given notification to any open, appropriate streams belonging to the given account.
+func (p *Processor) Notify(n *apimodel.Notification, account *gtsmodel.Account) error {
+ bytes, err := json.Marshal(n)
+ if err != nil {
+ return fmt.Errorf("error marshalling notification to json: %s", err)
+ }
+
+ return p.toAccount(string(bytes), stream.EventTypeNotification, []string{stream.TimelineNotifications, stream.TimelineHome}, account.ID)
+}
diff --git a/internal/processing/stream/notification_test.go b/internal/processing/stream/notification_test.go
new file mode 100644
index 000000000..56c20b61a
--- /dev/null
+++ b/internal/processing/stream/notification_test.go
@@ -0,0 +1,91 @@
+/*
+ GoToSocial
+ Copyright (C) 2021-2023 GoToSocial Authors admin@gotosocial.org
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU Affero General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+package stream_test
+
+import (
+ "bytes"
+ "context"
+ "encoding/json"
+ "testing"
+
+ "github.com/stretchr/testify/suite"
+ apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model"
+ "github.com/superseriousbusiness/gotosocial/testrig"
+)
+
+type NotificationTestSuite struct {
+ StreamTestSuite
+}
+
+func (suite *NotificationTestSuite) TestStreamNotification() {
+ account := suite.testAccounts["local_account_1"]
+
+ openStream, errWithCode := suite.streamProcessor.Open(context.Background(), account, "user")
+ suite.NoError(errWithCode)
+
+ followAccount := suite.testAccounts["remote_account_1"]
+ followAccountAPIModel, err := testrig.NewTestTypeConverter(suite.db).AccountToAPIAccountPublic(context.Background(), followAccount)
+ suite.NoError(err)
+
+ notification := &apimodel.Notification{
+ ID: "01FH57SJCMDWQGEAJ0X08CE3WV",
+ Type: "follow",
+ CreatedAt: "2021-10-04T08:52:36.000Z",
+ Account: followAccountAPIModel,
+ }
+
+ err = suite.streamProcessor.Notify(notification, account)
+ suite.NoError(err)
+
+ msg := <-openStream.Messages
+ dst := new(bytes.Buffer)
+ err = json.Indent(dst, []byte(msg.Payload), "", " ")
+ suite.NoError(err)
+ suite.Equal(`{
+ "id": "01FH57SJCMDWQGEAJ0X08CE3WV",
+ "type": "follow",
+ "created_at": "2021-10-04T08:52:36.000Z",
+ "account": {
+ "id": "01F8MH5ZK5VRH73AKHQM6Y9VNX",
+ "username": "foss_satan",
+ "acct": "foss_satan@fossbros-anonymous.io",
+ "display_name": "big gerald",
+ "locked": false,
+ "discoverable": true,
+ "bot": false,
+ "created_at": "2021-09-26T10:52:36.000Z",
+ "note": "i post about like, i dunno, stuff, or whatever!!!!",
+ "url": "http://fossbros-anonymous.io/@foss_satan",
+ "avatar": "",
+ "avatar_static": "",
+ "header": "http://localhost:8080/assets/default_header.png",
+ "header_static": "http://localhost:8080/assets/default_header.png",
+ "followers_count": 0,
+ "following_count": 0,
+ "statuses_count": 1,
+ "last_status_at": "2021-09-20T10:40:37.000Z",
+ "emojis": [],
+ "fields": []
+ }
+}`, dst.String())
+}
+
+func TestNotificationTestSuite(t *testing.T) {
+ suite.Run(t, &NotificationTestSuite{})
+}
diff --git a/internal/processing/stream/open.go b/internal/processing/stream/open.go
new file mode 100644
index 000000000..10d01a767
--- /dev/null
+++ b/internal/processing/stream/open.go
@@ -0,0 +1,121 @@
+/*
+ GoToSocial
+ Copyright (C) 2021-2023 GoToSocial Authors admin@gotosocial.org
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU Affero General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+package stream
+
+import (
+ "context"
+ "errors"
+ "fmt"
+
+ "codeberg.org/gruf/go-kv"
+ "github.com/superseriousbusiness/gotosocial/internal/gtserror"
+ "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
+ "github.com/superseriousbusiness/gotosocial/internal/id"
+ "github.com/superseriousbusiness/gotosocial/internal/log"
+ "github.com/superseriousbusiness/gotosocial/internal/stream"
+)
+
+// Open returns a new Stream for the given account, which will contain a channel for passing messages back to the caller.
+func (p *Processor) Open(ctx context.Context, account *gtsmodel.Account, streamTimeline string) (*stream.Stream, gtserror.WithCode) {
+ l := log.WithContext(ctx).WithFields(kv.Fields{
+ {"account", account.ID},
+ {"streamType", streamTimeline},
+ }...)
+ l.Debug("received open stream request")
+
+ // each stream needs a unique ID so we know to close it
+ streamID, err := id.NewRandomULID()
+ if err != nil {
+ return nil, gtserror.NewErrorInternalError(fmt.Errorf("error generating stream id: %s", err))
+ }
+
+ thisStream := &stream.Stream{
+ ID: streamID,
+ Timeline: streamTimeline,
+ Messages: make(chan *stream.Message, 100),
+ Hangup: make(chan interface{}, 1),
+ Connected: true,
+ }
+ go p.waitToCloseStream(account, thisStream)
+
+ v, ok := p.streamMap.Load(account.ID)
+ if !ok || v == nil {
+ // there is no entry in the streamMap for this account yet, so make one and store it
+ streamsForAccount := &stream.StreamsForAccount{
+ Streams: []*stream.Stream{
+ thisStream,
+ },
+ }
+ p.streamMap.Store(account.ID, streamsForAccount)
+ } else {
+ // there is an entry in the streamMap for this account
+ // parse the interface as a streamsForAccount
+ streamsForAccount, ok := v.(*stream.StreamsForAccount)
+ if !ok {
+ return nil, gtserror.NewErrorInternalError(errors.New("stream map error"))
+ }
+
+ // append this stream to it
+ streamsForAccount.Lock()
+ streamsForAccount.Streams = append(streamsForAccount.Streams, thisStream)
+ streamsForAccount.Unlock()
+ }
+
+ return thisStream, nil
+}
+
+// waitToCloseStream waits until the hangup channel is closed for the given stream.
+// It then iterates through the map of streams stored by the processor, removes the stream from it,
+// and then closes the messages channel of the stream to indicate that the channel should no longer be read from.
+func (p *Processor) waitToCloseStream(account *gtsmodel.Account, thisStream *stream.Stream) {
+ <-thisStream.Hangup // wait for a hangup message
+
+ // lock the stream to prevent more messages being put in it while we work
+ thisStream.Lock()
+ defer thisStream.Unlock()
+
+ // indicate the stream is no longer connected
+ thisStream.Connected = false
+
+ // load and parse the entry for this account from the stream map
+ v, ok := p.streamMap.Load(account.ID)
+ if !ok || v == nil {
+ return
+ }
+ streamsForAccount, ok := v.(*stream.StreamsForAccount)
+ if !ok {
+ return
+ }
+
+ // lock the streams for account while we remove this stream from its slice
+ streamsForAccount.Lock()
+ defer streamsForAccount.Unlock()
+
+ // put everything into modified streams *except* the stream we're removing
+ modifiedStreams := []*stream.Stream{}
+ for _, s := range streamsForAccount.Streams {
+ if s.ID != thisStream.ID {
+ modifiedStreams = append(modifiedStreams, s)
+ }
+ }
+ streamsForAccount.Streams = modifiedStreams
+
+ // finally close the messages channel so no more messages can be read from it
+ close(thisStream.Messages)
+}
diff --git a/internal/processing/stream/open_test.go b/internal/processing/stream/open_test.go
new file mode 100644
index 000000000..81b587b58
--- /dev/null
+++ b/internal/processing/stream/open_test.go
@@ -0,0 +1,41 @@
+/*
+ GoToSocial
+ Copyright (C) 2021-2023 GoToSocial Authors admin@gotosocial.org
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU Affero General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+package stream_test
+
+import (
+ "context"
+ "testing"
+
+ "github.com/stretchr/testify/suite"
+)
+
+type OpenStreamTestSuite struct {
+ StreamTestSuite
+}
+
+func (suite *OpenStreamTestSuite) TestOpenStream() {
+ account := suite.testAccounts["local_account_1"]
+
+ _, errWithCode := suite.streamProcessor.Open(context.Background(), account, "user")
+ suite.NoError(errWithCode)
+}
+
+func TestOpenStreamTestSuite(t *testing.T) {
+ suite.Run(t, &OpenStreamTestSuite{})
+}
diff --git a/internal/processing/stream/stream.go b/internal/processing/stream/stream.go
new file mode 100644
index 000000000..3c38e720a
--- /dev/null
+++ b/internal/processing/stream/stream.go
@@ -0,0 +1,78 @@
+/*
+ GoToSocial
+ Copyright (C) 2021-2023 GoToSocial Authors admin@gotosocial.org
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU Affero General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+package stream
+
+import (
+ "errors"
+ "sync"
+
+ "github.com/superseriousbusiness/gotosocial/internal/db"
+ "github.com/superseriousbusiness/gotosocial/internal/oauth"
+ "github.com/superseriousbusiness/gotosocial/internal/stream"
+)
+
+type Processor struct {
+ db db.DB
+ oauthServer oauth.Server
+ streamMap *sync.Map
+}
+
+func New(db db.DB, oauthServer oauth.Server) Processor {
+ return Processor{
+ db: db,
+ oauthServer: oauthServer,
+ streamMap: &sync.Map{},
+ }
+}
+
+// toAccount streams the given payload with the given event type to any streams currently open for the given account ID.
+func (p *Processor) toAccount(payload string, event string, timelines []string, accountID string) error {
+ v, ok := p.streamMap.Load(accountID)
+ if !ok {
+ // no open connections so nothing to stream
+ return nil
+ }
+
+ streamsForAccount, ok := v.(*stream.StreamsForAccount)
+ if !ok {
+ return errors.New("stream map error")
+ }
+
+ streamsForAccount.Lock()
+ defer streamsForAccount.Unlock()
+ for _, s := range streamsForAccount.Streams {
+ s.Lock()
+ defer s.Unlock()
+ if !s.Connected {
+ continue
+ }
+
+ for _, t := range timelines {
+ if s.Timeline == string(t) {
+ s.Messages <- &stream.Message{
+ Stream: []string{string(t)},
+ Event: string(event),
+ Payload: payload,
+ }
+ }
+ }
+ }
+
+ return nil
+}
diff --git a/internal/processing/stream/stream_test.go b/internal/processing/stream/stream_test.go
new file mode 100644
index 000000000..907c7e1d0
--- /dev/null
+++ b/internal/processing/stream/stream_test.go
@@ -0,0 +1,55 @@
+/*
+ GoToSocial
+ Copyright (C) 2021-2023 GoToSocial Authors admin@gotosocial.org
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU Affero General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+package stream_test
+
+import (
+ "github.com/stretchr/testify/suite"
+ "github.com/superseriousbusiness/gotosocial/internal/db"
+ "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
+ "github.com/superseriousbusiness/gotosocial/internal/oauth"
+ "github.com/superseriousbusiness/gotosocial/internal/processing/stream"
+ "github.com/superseriousbusiness/gotosocial/testrig"
+)
+
+type StreamTestSuite struct {
+ suite.Suite
+ testAccounts map[string]*gtsmodel.Account
+ testTokens map[string]*gtsmodel.Token
+ db db.DB
+ oauthServer oauth.Server
+
+ streamProcessor stream.Processor
+}
+
+func (suite *StreamTestSuite) SetupTest() {
+ testrig.InitTestLog()
+ testrig.InitTestConfig()
+
+ suite.testAccounts = testrig.NewTestAccounts()
+ suite.testTokens = testrig.NewTestTokens()
+ suite.db = testrig.NewTestDB()
+ suite.oauthServer = testrig.NewTestOauthServer(suite.db)
+ suite.streamProcessor = stream.New(suite.db, suite.oauthServer)
+
+ testrig.StandardDBSetup(suite.db, suite.testAccounts)
+}
+
+func (suite *StreamTestSuite) TearDownTest() {
+ testrig.StandardDBTeardown(suite.db)
+}
diff --git a/internal/processing/stream/update.go b/internal/processing/stream/update.go
new file mode 100644
index 000000000..41ce2c4db
--- /dev/null
+++ b/internal/processing/stream/update.go
@@ -0,0 +1,38 @@
+/*
+ GoToSocial
+ Copyright (C) 2021-2023 GoToSocial Authors admin@gotosocial.org
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU Affero General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+package stream
+
+import (
+ "encoding/json"
+ "fmt"
+
+ apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model"
+ "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
+ "github.com/superseriousbusiness/gotosocial/internal/stream"
+)
+
+// Update streams the given update to any open, appropriate streams belonging to the given account.
+func (p *Processor) Update(s *apimodel.Status, account *gtsmodel.Account, timeline string) error {
+ bytes, err := json.Marshal(s)
+ if err != nil {
+ return fmt.Errorf("error marshalling status to json: %s", err)
+ }
+
+ return p.toAccount(string(bytes), stream.EventTypeUpdate, []string{timeline}, account.ID)
+}