diff options
author | 2024-05-02 14:43:00 +0200 | |
---|---|---|
committer | 2024-05-02 13:43:00 +0100 | |
commit | ebec95a52280980caa88b9c8cd92d69c1a7dc164 (patch) | |
tree | afea663a4ed5a5312755c828e104f27ec137a7d6 /internal/processing/workers/surfacenotify.go | |
parent | [feature] Page through accounts as moderator (#2881) (diff) | |
download | gotosocial-ebec95a52280980caa88b9c8cd92d69c1a7dc164.tar.xz |
[bugfix] Lock when checking/creating notifs to avoid race (#2890)
* [bugfix] Lock when checking/creating notifs to avoid race
* test notif spam
Diffstat (limited to 'internal/processing/workers/surfacenotify.go')
-rw-r--r-- | internal/processing/workers/surfacenotify.go | 110 |
1 files changed, 74 insertions, 36 deletions
diff --git a/internal/processing/workers/surfacenotify.go b/internal/processing/workers/surfacenotify.go index 9c82712f2..be729fa7e 100644 --- a/internal/processing/workers/surfacenotify.go +++ b/internal/processing/workers/surfacenotify.go @@ -20,18 +20,20 @@ package workers import ( "context" "errors" + "strings" "github.com/superseriousbusiness/gotosocial/internal/db" "github.com/superseriousbusiness/gotosocial/internal/gtscontext" "github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" "github.com/superseriousbusiness/gotosocial/internal/id" + "github.com/superseriousbusiness/gotosocial/internal/util" ) // notifyMentions iterates through mentions on the // given status, and notifies each mentioned account // that they have a new mention. -func (s *surface) notifyMentions( +func (s *Surface) notifyMentions( ctx context.Context, status *gtsmodel.Status, ) error { @@ -43,7 +45,7 @@ func (s *surface) notifyMentions( mention.Status = status // Beforehand, ensure the passed mention is fully populated. - if err := s.state.DB.PopulateMention(ctx, mention); err != nil { + if err := s.State.DB.PopulateMention(ctx, mention); err != nil { errs.Appendf("error populating mention %s: %w", mention.ID, err) continue } @@ -56,7 +58,7 @@ func (s *surface) notifyMentions( // Ensure thread not muted // by mentioned account. - muted, err := s.state.DB.IsThreadMutedByAccount( + muted, err := s.State.DB.IsThreadMutedByAccount( ctx, status.ThreadID, mention.TargetAccountID, @@ -75,7 +77,7 @@ func (s *surface) notifyMentions( // notify mentioned // by status author. - if err := s.notify(ctx, + if err := s.Notify(ctx, gtsmodel.NotificationMention, mention.TargetAccount, mention.OriginAccount, @@ -91,12 +93,12 @@ func (s *surface) notifyMentions( // notifyFollowRequest notifies the target of the given // follow request that they have a new follow request. -func (s *surface) notifyFollowRequest( +func (s *Surface) notifyFollowRequest( ctx context.Context, followReq *gtsmodel.FollowRequest, ) error { // Beforehand, ensure the passed follow request is fully populated. - if err := s.state.DB.PopulateFollowRequest(ctx, followReq); err != nil { + if err := s.State.DB.PopulateFollowRequest(ctx, followReq); err != nil { return gtserror.Newf("error populating follow request %s: %w", followReq.ID, err) } @@ -107,7 +109,7 @@ func (s *surface) notifyFollowRequest( } // Now notify the follow request itself. - if err := s.notify(ctx, + if err := s.Notify(ctx, gtsmodel.NotificationFollowRequest, followReq.TargetAccount, followReq.Account, @@ -123,12 +125,12 @@ func (s *surface) notifyFollowRequest( // they have a new follow. It will also remove any previous // notification of a follow request, essentially replacing // that notification. -func (s *surface) notifyFollow( +func (s *Surface) notifyFollow( ctx context.Context, follow *gtsmodel.Follow, ) error { // Beforehand, ensure the passed follow is fully populated. - if err := s.state.DB.PopulateFollow(ctx, follow); err != nil { + if err := s.State.DB.PopulateFollow(ctx, follow); err != nil { return gtserror.Newf("error populating follow %s: %w", follow.ID, err) } @@ -139,7 +141,7 @@ func (s *surface) notifyFollow( } // Check if previous follow req notif exists. - prevNotif, err := s.state.DB.GetNotification( + prevNotif, err := s.State.DB.GetNotification( gtscontext.SetBarebones(ctx), gtsmodel.NotificationFollowRequest, follow.TargetAccountID, @@ -152,14 +154,14 @@ func (s *surface) notifyFollow( if prevNotif != nil { // Previous follow request notif existed, delete it before creating new. - if err := s.state.DB.DeleteNotificationByID(ctx, prevNotif.ID); // nocollapse + if err := s.State.DB.DeleteNotificationByID(ctx, prevNotif.ID); // nocollapse err != nil && !errors.Is(err, db.ErrNoEntries) { return gtserror.Newf("error deleting notification %s: %w", prevNotif.ID, err) } } // Now notify the follow itself. - if err := s.notify(ctx, + if err := s.Notify(ctx, gtsmodel.NotificationFollow, follow.TargetAccount, follow.Account, @@ -173,7 +175,7 @@ func (s *surface) notifyFollow( // notifyFave notifies the target of the given // fave that their status has been liked/faved. -func (s *surface) notifyFave( +func (s *Surface) notifyFave( ctx context.Context, fave *gtsmodel.StatusFave, ) error { @@ -183,7 +185,7 @@ func (s *surface) notifyFave( } // Beforehand, ensure the passed status fave is fully populated. - if err := s.state.DB.PopulateStatusFave(ctx, fave); err != nil { + if err := s.State.DB.PopulateStatusFave(ctx, fave); err != nil { return gtserror.Newf("error populating fave %s: %w", fave.ID, err) } @@ -195,7 +197,7 @@ func (s *surface) notifyFave( // Ensure favee hasn't // muted the thread. - muted, err := s.state.DB.IsThreadMutedByAccount( + muted, err := s.State.DB.IsThreadMutedByAccount( ctx, fave.Status.ThreadID, fave.TargetAccountID, @@ -212,7 +214,7 @@ func (s *surface) notifyFave( // notify status author // of fave by account. - if err := s.notify(ctx, + if err := s.Notify(ctx, gtsmodel.NotificationFave, fave.TargetAccount, fave.Account, @@ -226,7 +228,7 @@ func (s *surface) notifyFave( // notifyAnnounce notifies the status boost target // account that their status has been boosted. -func (s *surface) notifyAnnounce( +func (s *Surface) notifyAnnounce( ctx context.Context, status *gtsmodel.Status, ) error { @@ -241,7 +243,7 @@ func (s *surface) notifyAnnounce( } // Beforehand, ensure the passed status is fully populated. - if err := s.state.DB.PopulateStatus(ctx, status); err != nil { + if err := s.State.DB.PopulateStatus(ctx, status); err != nil { return gtserror.Newf("error populating status %s: %w", status.ID, err) } @@ -253,7 +255,7 @@ func (s *surface) notifyAnnounce( // Ensure boostee hasn't // muted the thread. - muted, err := s.state.DB.IsThreadMutedByAccount( + muted, err := s.State.DB.IsThreadMutedByAccount( ctx, status.BoostOf.ThreadID, status.BoostOfAccountID, @@ -271,7 +273,7 @@ func (s *surface) notifyAnnounce( // notify status author // of boost by account. - if err := s.notify(ctx, + if err := s.Notify(ctx, gtsmodel.NotificationReblog, status.BoostOfAccount, status.Account, @@ -283,14 +285,14 @@ func (s *surface) notifyAnnounce( return nil } -func (s *surface) notifyPollClose(ctx context.Context, status *gtsmodel.Status) error { +func (s *Surface) notifyPollClose(ctx context.Context, status *gtsmodel.Status) error { // Beforehand, ensure the passed status is fully populated. - if err := s.state.DB.PopulateStatus(ctx, status); err != nil { + if err := s.State.DB.PopulateStatus(ctx, status); err != nil { return gtserror.Newf("error populating status %s: %w", status.ID, err) } // Fetch all votes in the attached status poll. - votes, err := s.state.DB.GetPollVotes(ctx, status.PollID) + votes, err := s.State.DB.GetPollVotes(ctx, status.PollID) if err != nil { return gtserror.Newf("error getting poll %s votes: %w", status.PollID, err) } @@ -300,7 +302,7 @@ func (s *surface) notifyPollClose(ctx context.Context, status *gtsmodel.Status) if status.Account.IsLocal() { // Send a notification to the status // author that their poll has closed! - if err := s.notify(ctx, + if err := s.Notify(ctx, gtsmodel.NotificationPoll, status.Account, status.Account, @@ -319,7 +321,7 @@ func (s *surface) notifyPollClose(ctx context.Context, status *gtsmodel.Status) // notify voter that // poll has been closed. - if err := s.notify(ctx, + if err := s.Notify(ctx, gtsmodel.NotificationPoll, vote.Account, status.Account, @@ -333,8 +335,8 @@ func (s *surface) notifyPollClose(ctx context.Context, status *gtsmodel.Status) return errs.Combine() } -func (s *surface) notifySignup(ctx context.Context, newUser *gtsmodel.User) error { - modAccounts, err := s.state.DB.GetInstanceModerators(ctx) +func (s *Surface) notifySignup(ctx context.Context, newUser *gtsmodel.User) error { + modAccounts, err := s.State.DB.GetInstanceModerators(ctx) if err != nil { if errors.Is(err, db.ErrNoEntries) { // No registered @@ -347,18 +349,18 @@ func (s *surface) notifySignup(ctx context.Context, newUser *gtsmodel.User) erro } // Ensure user + account populated. - if err := s.state.DB.PopulateUser(ctx, newUser); err != nil { + if err := s.State.DB.PopulateUser(ctx, newUser); err != nil { return gtserror.Newf("db error populating new user: %w", err) } - if err := s.state.DB.PopulateAccount(ctx, newUser.Account); err != nil { + if err := s.State.DB.PopulateAccount(ctx, newUser.Account); err != nil { return gtserror.Newf("db error populating new user's account: %w", err) } // Notify each moderator. var errs gtserror.MultiError for _, mod := range modAccounts { - if err := s.notify(ctx, + if err := s.Notify(ctx, gtsmodel.NotificationSignup, mod, newUser.Account, @@ -372,7 +374,24 @@ func (s *surface) notifySignup(ctx context.Context, newUser *gtsmodel.User) erro return errs.Combine() } -// notify creates, inserts, and streams a new +func getNotifyLockURI( + notificationType gtsmodel.NotificationType, + targetAccount *gtsmodel.Account, + originAccount *gtsmodel.Account, + statusID string, +) string { + builder := strings.Builder{} + builder.WriteString("notification:?") + builder.WriteString("type=" + string(notificationType)) + builder.WriteString("&target=" + targetAccount.URI) + builder.WriteString("&origin=" + originAccount.URI) + if statusID != "" { + builder.WriteString("&statusID=" + statusID) + } + return builder.String() +} + +// Notify creates, inserts, and streams a new // notification to the target account if it // doesn't yet exist with the given parameters. // @@ -383,7 +402,7 @@ func (s *surface) notifySignup(ctx context.Context, newUser *gtsmodel.User) erro // // targetAccount and originAccount must be // set, but statusID can be an empty string. -func (s *surface) notify( +func (s *Surface) Notify( ctx context.Context, notificationType gtsmodel.NotificationType, targetAccount *gtsmodel.Account, @@ -395,9 +414,24 @@ func (s *surface) notify( return nil } + // We're doing state-y stuff so get a + // lock on this combo of notif params. + lockURI := getNotifyLockURI( + notificationType, + targetAccount, + originAccount, + statusID, + ) + unlock := s.State.ProcessingLocks.Lock(lockURI) + + // Wrap the unlock so we + // can do granular unlocking. + unlock = util.DoOnce(unlock) + defer unlock() + // Make sure a notification doesn't // already exist with these params. - if _, err := s.state.DB.GetNotification( + if _, err := s.State.DB.GetNotification( gtscontext.SetBarebones(ctx), notificationType, targetAccount.ID, @@ -424,16 +458,20 @@ func (s *surface) notify( StatusID: statusID, } - if err := s.state.DB.PutNotification(ctx, notif); err != nil { + if err := s.State.DB.PutNotification(ctx, notif); err != nil { return gtserror.Newf("error putting notification in database: %w", err) } + // Unlock already, we're done + // with the state-y stuff. + unlock() + // Stream notification to the user. - apiNotif, err := s.converter.NotificationToAPINotification(ctx, notif) + apiNotif, err := s.Converter.NotificationToAPINotification(ctx, notif) if err != nil { return gtserror.Newf("error converting notification to api representation: %w", err) } - s.stream.Notify(ctx, targetAccount, apiNotif) + s.Stream.Notify(ctx, targetAccount, apiNotif) return nil } |