diff options
Diffstat (limited to 'internal/processing')
-rw-r--r-- | internal/processing/account/update.go | 10 | ||||
-rw-r--r-- | internal/processing/app.go | 2 | ||||
-rw-r--r-- | internal/processing/instance.go | 12 | ||||
-rw-r--r-- | internal/processing/processor.go | 3 | ||||
-rw-r--r-- | internal/processing/streaming.go | 3 | ||||
-rw-r--r-- | internal/processing/streaming/openstream.go | 19 | ||||
-rw-r--r-- | internal/processing/streaming/streamdelete.go | 16 | ||||
-rw-r--r-- | internal/processing/streaming/streaming.go | 3 | ||||
-rw-r--r-- | internal/processing/streaming/streamnotification.go | 17 | ||||
-rw-r--r-- | internal/processing/streaming/streamstatus.go | 17 |
10 files changed, 53 insertions, 49 deletions
diff --git a/internal/processing/account/update.go b/internal/processing/account/update.go index 5cc95b71f..c0fee8e25 100644 --- a/internal/processing/account/update.go +++ b/internal/processing/account/update.go @@ -32,7 +32,7 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/media" "github.com/superseriousbusiness/gotosocial/internal/messages" "github.com/superseriousbusiness/gotosocial/internal/text" - "github.com/superseriousbusiness/gotosocial/internal/util" + "github.com/superseriousbusiness/gotosocial/internal/validate" ) func (p *processor) Update(ctx context.Context, account *gtsmodel.Account, form *apimodel.UpdateCredentialsRequest) (*apimodel.Account, error) { @@ -51,7 +51,7 @@ func (p *processor) Update(ctx context.Context, account *gtsmodel.Account, form } if form.DisplayName != nil { - if err := util.ValidateDisplayName(*form.DisplayName); err != nil { + if err := validate.DisplayName(*form.DisplayName); err != nil { return nil, err } displayName := text.RemoveHTML(*form.DisplayName) // no html allowed in display name @@ -61,7 +61,7 @@ func (p *processor) Update(ctx context.Context, account *gtsmodel.Account, form } if form.Note != nil { - if err := util.ValidateNote(*form.Note); err != nil { + if err := validate.Note(*form.Note); err != nil { return nil, err } note := text.SanitizeHTML(*form.Note) // html OK in note but sanitize it @@ -94,7 +94,7 @@ func (p *processor) Update(ctx context.Context, account *gtsmodel.Account, form if form.Source != nil { if form.Source.Language != nil { - if err := util.ValidateLanguage(*form.Source.Language); err != nil { + if err := validate.Language(*form.Source.Language); err != nil { return nil, err } if err := p.db.UpdateOneByID(ctx, account.ID, "language", *form.Source.Language, >smodel.Account{}); err != nil { @@ -109,7 +109,7 @@ func (p *processor) Update(ctx context.Context, account *gtsmodel.Account, form } if form.Source.Privacy != nil { - if err := util.ValidatePrivacy(*form.Source.Privacy); err != nil { + if err := validate.Privacy(*form.Source.Privacy); err != nil { return nil, err } if err := p.db.UpdateOneByID(ctx, account.ID, "privacy", *form.Source.Privacy, >smodel.Account{}); err != nil { diff --git a/internal/processing/app.go b/internal/processing/app.go index fc6814196..d6ded6efa 100644 --- a/internal/processing/app.go +++ b/internal/processing/app.go @@ -43,7 +43,6 @@ func (p *processor) AppCreate(ctx context.Context, authed *oauth.Auth, form *api return nil, err } clientSecret := uuid.NewString() - vapidKey := uuid.NewString() appID, err := id.NewRandomULID() if err != nil { @@ -59,7 +58,6 @@ func (p *processor) AppCreate(ctx context.Context, authed *oauth.Auth, form *api ClientID: clientID, ClientSecret: clientSecret, Scopes: scopes, - VapidKey: vapidKey, } // chuck it in the db diff --git a/internal/processing/instance.go b/internal/processing/instance.go index ced798c2e..e74d3077a 100644 --- a/internal/processing/instance.go +++ b/internal/processing/instance.go @@ -27,7 +27,7 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" "github.com/superseriousbusiness/gotosocial/internal/text" - "github.com/superseriousbusiness/gotosocial/internal/util" + "github.com/superseriousbusiness/gotosocial/internal/validate" ) func (p *processor) InstanceGet(ctx context.Context, domain string) (*apimodel.Instance, gtserror.WithCode) { @@ -59,7 +59,7 @@ func (p *processor) InstancePatch(ctx context.Context, form *apimodel.InstanceSe // validate & update site title if it's set on the form if form.Title != nil { - if err := util.ValidateSiteTitle(*form.Title); err != nil { + if err := validate.SiteTitle(*form.Title); err != nil { return nil, gtserror.NewErrorBadRequest(err, fmt.Sprintf("site title invalid: %s", err)) } i.Title = text.RemoveHTML(*form.Title) // don't allow html in site title @@ -101,7 +101,7 @@ func (p *processor) InstancePatch(ctx context.Context, form *apimodel.InstanceSe // validate & update site contact email if it's set on the form if form.ContactEmail != nil { - if err := util.ValidateEmail(*form.ContactEmail); err != nil { + if err := validate.Email(*form.ContactEmail); err != nil { return nil, gtserror.NewErrorBadRequest(err, err.Error()) } i.ContactEmail = *form.ContactEmail @@ -109,7 +109,7 @@ func (p *processor) InstancePatch(ctx context.Context, form *apimodel.InstanceSe // validate & update site short description if it's set on the form if form.ShortDescription != nil { - if err := util.ValidateSiteShortDescription(*form.ShortDescription); err != nil { + if err := validate.SiteShortDescription(*form.ShortDescription); err != nil { return nil, gtserror.NewErrorBadRequest(err, err.Error()) } i.ShortDescription = text.SanitizeHTML(*form.ShortDescription) // html is OK in site description, but we should sanitize it @@ -117,7 +117,7 @@ func (p *processor) InstancePatch(ctx context.Context, form *apimodel.InstanceSe // validate & update site description if it's set on the form if form.Description != nil { - if err := util.ValidateSiteDescription(*form.Description); err != nil { + if err := validate.SiteDescription(*form.Description); err != nil { return nil, gtserror.NewErrorBadRequest(err, err.Error()) } i.Description = text.SanitizeHTML(*form.Description) // html is OK in site description, but we should sanitize it @@ -125,7 +125,7 @@ func (p *processor) InstancePatch(ctx context.Context, form *apimodel.InstanceSe // validate & update site terms if it's set on the form if form.Terms != nil { - if err := util.ValidateSiteTerms(*form.Terms); err != nil { + if err := validate.SiteTerms(*form.Terms); err != nil { return nil, gtserror.NewErrorBadRequest(err, err.Error()) } i.Terms = text.SanitizeHTML(*form.Terms) // html is OK in site terms, but we should sanitize it diff --git a/internal/processing/processor.go b/internal/processing/processor.go index 1ade38564..38076123f 100644 --- a/internal/processing/processor.go +++ b/internal/processing/processor.go @@ -39,6 +39,7 @@ import ( mediaProcessor "github.com/superseriousbusiness/gotosocial/internal/processing/media" "github.com/superseriousbusiness/gotosocial/internal/processing/status" "github.com/superseriousbusiness/gotosocial/internal/processing/streaming" + "github.com/superseriousbusiness/gotosocial/internal/stream" "github.com/superseriousbusiness/gotosocial/internal/timeline" "github.com/superseriousbusiness/gotosocial/internal/typeutils" "github.com/superseriousbusiness/gotosocial/internal/visibility" @@ -166,7 +167,7 @@ type Processor interface { // AuthorizeStreamingRequest returns a gotosocial account in exchange for an access token, or an error if the given token is not valid. AuthorizeStreamingRequest(ctx context.Context, accessToken string) (*gtsmodel.Account, error) // OpenStreamForAccount opens a new stream for the given account, with the given stream type. - OpenStreamForAccount(ctx context.Context, account *gtsmodel.Account, streamType string) (*gtsmodel.Stream, gtserror.WithCode) + OpenStreamForAccount(ctx context.Context, account *gtsmodel.Account, streamType string) (*stream.Stream, gtserror.WithCode) /* FEDERATION API-FACING PROCESSING FUNCTIONS diff --git a/internal/processing/streaming.go b/internal/processing/streaming.go index e1c134d00..fd5113b0d 100644 --- a/internal/processing/streaming.go +++ b/internal/processing/streaming.go @@ -23,12 +23,13 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/stream" ) func (p *processor) AuthorizeStreamingRequest(ctx context.Context, accessToken string) (*gtsmodel.Account, error) { return p.streamingProcessor.AuthorizeStreamingRequest(ctx, accessToken) } -func (p *processor) OpenStreamForAccount(ctx context.Context, account *gtsmodel.Account, streamType string) (*gtsmodel.Stream, gtserror.WithCode) { +func (p *processor) OpenStreamForAccount(ctx context.Context, account *gtsmodel.Account, streamType string) (*stream.Stream, gtserror.WithCode) { return p.streamingProcessor.OpenStreamForAccount(ctx, account, streamType) } diff --git a/internal/processing/streaming/openstream.go b/internal/processing/streaming/openstream.go index dfad5398e..d4e4eef9f 100644 --- a/internal/processing/streaming/openstream.go +++ b/internal/processing/streaming/openstream.go @@ -9,9 +9,10 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" "github.com/superseriousbusiness/gotosocial/internal/id" + "github.com/superseriousbusiness/gotosocial/internal/stream" ) -func (p *processor) OpenStreamForAccount(ctx context.Context, account *gtsmodel.Account, streamType string) (*gtsmodel.Stream, gtserror.WithCode) { +func (p *processor) OpenStreamForAccount(ctx context.Context, account *gtsmodel.Account, streamType string) (*stream.Stream, gtserror.WithCode) { l := p.log.WithFields(logrus.Fields{ "func": "OpenStreamForAccount", "account": account.ID, @@ -25,10 +26,10 @@ func (p *processor) OpenStreamForAccount(ctx context.Context, account *gtsmodel. return nil, gtserror.NewErrorInternalError(fmt.Errorf("error generating stream id: %s", err)) } - thisStream := >smodel.Stream{ + thisStream := &stream.Stream{ ID: streamID, Type: streamType, - Messages: make(chan *gtsmodel.Message, 100), + Messages: make(chan *stream.Message, 100), Hangup: make(chan interface{}, 1), Connected: true, } @@ -37,8 +38,8 @@ func (p *processor) OpenStreamForAccount(ctx context.Context, account *gtsmodel. v, ok := p.streamMap.Load(account.ID) if !ok || v == nil { // there is no entry in the streamMap for this account yet, so make one and store it - streamsForAccount := >smodel.StreamsForAccount{ - Streams: []*gtsmodel.Stream{ + streamsForAccount := &stream.StreamsForAccount{ + Streams: []*stream.Stream{ thisStream, }, } @@ -46,7 +47,7 @@ func (p *processor) OpenStreamForAccount(ctx context.Context, account *gtsmodel. } else { // there is an entry in the streamMap for this account // parse the interface as a streamsForAccount - streamsForAccount, ok := v.(*gtsmodel.StreamsForAccount) + streamsForAccount, ok := v.(*stream.StreamsForAccount) if !ok { return nil, gtserror.NewErrorInternalError(errors.New("stream map error")) } @@ -63,7 +64,7 @@ func (p *processor) OpenStreamForAccount(ctx context.Context, account *gtsmodel. // waitToCloseStream waits until the hangup channel is closed for the given stream. // It then iterates through the map of streams stored by the processor, removes the stream from it, // and then closes the messages channel of the stream to indicate that the channel should no longer be read from. -func (p *processor) waitToCloseStream(account *gtsmodel.Account, thisStream *gtsmodel.Stream) { +func (p *processor) waitToCloseStream(account *gtsmodel.Account, thisStream *stream.Stream) { <-thisStream.Hangup // wait for a hangup message // lock the stream to prevent more messages being put in it while we work @@ -78,7 +79,7 @@ func (p *processor) waitToCloseStream(account *gtsmodel.Account, thisStream *gts if !ok || v == nil { return } - streamsForAccount, ok := v.(*gtsmodel.StreamsForAccount) + streamsForAccount, ok := v.(*stream.StreamsForAccount) if !ok { return } @@ -88,7 +89,7 @@ func (p *processor) waitToCloseStream(account *gtsmodel.Account, thisStream *gts defer streamsForAccount.Unlock() // put everything into modified streams *except* the stream we're removing - modifiedStreams := []*gtsmodel.Stream{} + modifiedStreams := []*stream.Stream{} for _, s := range streamsForAccount.Streams { if s.ID != thisStream.ID { modifiedStreams = append(modifiedStreams, s) diff --git a/internal/processing/streaming/streamdelete.go b/internal/processing/streaming/streamdelete.go index 2282c29ae..cd541bc57 100644 --- a/internal/processing/streaming/streamdelete.go +++ b/internal/processing/streaming/streamdelete.go @@ -4,7 +4,7 @@ import ( "fmt" "strings" - "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/stream" ) func (p *processor) StreamDelete(statusID string) error { @@ -20,7 +20,7 @@ func (p *processor) StreamDelete(statusID string) error { } // the value of the map should be a buncha streams - streamsForAccount, ok := v.(*gtsmodel.StreamsForAccount) + streamsForAccount, ok := v.(*stream.StreamsForAccount) if !ok { errs = append(errs, fmt.Sprintf("stream map error for account stream %s", accountID)) } @@ -28,13 +28,13 @@ func (p *processor) StreamDelete(statusID string) error { // lock the streams while we work on them streamsForAccount.Lock() defer streamsForAccount.Unlock() - for _, stream := range streamsForAccount.Streams { + for _, s := range streamsForAccount.Streams { // lock each individual stream as we work on it - stream.Lock() - defer stream.Unlock() - if stream.Connected { - stream.Messages <- >smodel.Message{ - Stream: []string{stream.Type}, + s.Lock() + defer s.Unlock() + if s.Connected { + s.Messages <- &stream.Message{ + Stream: []string{s.Type}, Event: "delete", Payload: statusID, } diff --git a/internal/processing/streaming/streaming.go b/internal/processing/streaming/streaming.go index f349a655a..610d4a9d2 100644 --- a/internal/processing/streaming/streaming.go +++ b/internal/processing/streaming/streaming.go @@ -11,6 +11,7 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" "github.com/superseriousbusiness/gotosocial/internal/oauth" + "github.com/superseriousbusiness/gotosocial/internal/stream" "github.com/superseriousbusiness/gotosocial/internal/typeutils" "github.com/superseriousbusiness/gotosocial/internal/visibility" ) @@ -20,7 +21,7 @@ type Processor interface { // AuthorizeStreamingRequest returns an oauth2 token info in response to an access token query from the streaming API AuthorizeStreamingRequest(ctx context.Context, accessToken string) (*gtsmodel.Account, error) // OpenStreamForAccount returns a new Stream for the given account, which will contain a channel for passing messages back to the caller. - OpenStreamForAccount(ctx context.Context, account *gtsmodel.Account, streamType string) (*gtsmodel.Stream, gtserror.WithCode) + OpenStreamForAccount(ctx context.Context, account *gtsmodel.Account, streamType string) (*stream.Stream, gtserror.WithCode) // StreamStatusToAccount streams the given status to any open, appropriate streams belonging to the given account. StreamStatusToAccount(s *apimodel.Status, account *gtsmodel.Account) error // StreamNotificationToAccount streams the given notification to any open, appropriate streams belonging to the given account. diff --git a/internal/processing/streaming/streamnotification.go b/internal/processing/streaming/streamnotification.go index 24c8342ee..d8460874f 100644 --- a/internal/processing/streaming/streamnotification.go +++ b/internal/processing/streaming/streamnotification.go @@ -8,6 +8,7 @@ import ( "github.com/sirupsen/logrus" apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/stream" ) func (p *processor) StreamNotificationToAccount(n *apimodel.Notification, account *gtsmodel.Account) error { @@ -21,7 +22,7 @@ func (p *processor) StreamNotificationToAccount(n *apimodel.Notification, accoun return nil } - streamsForAccount, ok := v.(*gtsmodel.StreamsForAccount) + streamsForAccount, ok := v.(*stream.StreamsForAccount) if !ok { return errors.New("stream map error") } @@ -33,13 +34,13 @@ func (p *processor) StreamNotificationToAccount(n *apimodel.Notification, accoun streamsForAccount.Lock() defer streamsForAccount.Unlock() - for _, stream := range streamsForAccount.Streams { - stream.Lock() - defer stream.Unlock() - if stream.Connected { - l.Debugf("streaming notification to stream id %s", stream.ID) - stream.Messages <- >smodel.Message{ - Stream: []string{stream.Type}, + for _, s := range streamsForAccount.Streams { + s.Lock() + defer s.Unlock() + if s.Connected { + l.Debugf("streaming notification to stream id %s", s.ID) + s.Messages <- &stream.Message{ + Stream: []string{s.Type}, Event: "notification", Payload: string(notificationBytes), } diff --git a/internal/processing/streaming/streamstatus.go b/internal/processing/streaming/streamstatus.go index 8d026252d..f4d6b2629 100644 --- a/internal/processing/streaming/streamstatus.go +++ b/internal/processing/streaming/streamstatus.go @@ -8,6 +8,7 @@ import ( "github.com/sirupsen/logrus" apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/stream" ) func (p *processor) StreamStatusToAccount(s *apimodel.Status, account *gtsmodel.Account) error { @@ -21,7 +22,7 @@ func (p *processor) StreamStatusToAccount(s *apimodel.Status, account *gtsmodel. return nil } - streamsForAccount, ok := v.(*gtsmodel.StreamsForAccount) + streamsForAccount, ok := v.(*stream.StreamsForAccount) if !ok { return errors.New("stream map error") } @@ -33,13 +34,13 @@ func (p *processor) StreamStatusToAccount(s *apimodel.Status, account *gtsmodel. streamsForAccount.Lock() defer streamsForAccount.Unlock() - for _, stream := range streamsForAccount.Streams { - stream.Lock() - defer stream.Unlock() - if stream.Connected { - l.Debugf("streaming status to stream id %s", stream.ID) - stream.Messages <- >smodel.Message{ - Stream: []string{stream.Type}, + for _, s := range streamsForAccount.Streams { + s.Lock() + defer s.Unlock() + if s.Connected { + l.Debugf("streaming status to stream id %s", s.ID) + s.Messages <- &stream.Message{ + Stream: []string{s.Type}, Event: "update", Payload: string(statusBytes), } |