diff options
Diffstat (limited to 'internal/federation')
-rw-r--r-- | internal/federation/dereferencing/account.go | 139 | ||||
-rw-r--r-- | internal/federation/dereferencing/dereferencer.go | 23 | ||||
-rw-r--r-- | internal/federation/dereferencing/emoji.go | 17 | ||||
-rw-r--r-- | internal/federation/dereferencing/status.go | 167 | ||||
-rw-r--r-- | internal/federation/dereferencing/util.go (renamed from internal/federation/dereferencing/error.go) | 11 | ||||
-rw-r--r-- | internal/federation/federatingdb/db.go | 3 | ||||
-rw-r--r-- | internal/federation/federatingdb/lock.go | 7 |
7 files changed, 239 insertions, 128 deletions
diff --git a/internal/federation/dereferencing/account.go b/internal/federation/dereferencing/account.go index 670c8e2c8..58f07f9cd 100644 --- a/internal/federation/dereferencing/account.go +++ b/internal/federation/dereferencing/account.go @@ -122,7 +122,7 @@ func (d *Dereferencer) getAccountByURI(ctx context.Context, requestUser string, } // Create and pass-through a new bare-bones model for dereferencing. - return d.enrichAccount(ctx, requestUser, uri, >smodel.Account{ + return d.enrichAccountSafely(ctx, requestUser, uri, >smodel.Account{ ID: id.NewULID(), Domain: uri.Host, URI: uriStr, @@ -139,7 +139,7 @@ func (d *Dereferencer) getAccountByURI(ctx context.Context, requestUser string, } // Try to update existing account model. - latest, apubAcc, err := d.enrichAccount(ctx, + latest, apubAcc, err := d.enrichAccountSafely(ctx, requestUser, uri, account, @@ -148,10 +148,6 @@ func (d *Dereferencer) getAccountByURI(ctx context.Context, requestUser string, if err != nil { log.Errorf(ctx, "error enriching remote account: %v", err) - // Update fetch-at to slow re-attempts. - account.FetchedAt = time.Now() - _ = d.state.DB.UpdateAccount(ctx, account, "fetched_at") - // Fallback to existing. return account, nil, nil } @@ -218,7 +214,7 @@ func (d *Dereferencer) getAccountByUsernameDomain( } // Create and pass-through a new bare-bones model for dereferencing. - account, apubAcc, err := d.enrichAccount(ctx, requestUser, nil, >smodel.Account{ + account, apubAcc, err := d.enrichAccountSafely(ctx, requestUser, nil, >smodel.Account{ ID: id.NewULID(), Username: username, Domain: domain, @@ -244,7 +240,7 @@ func (d *Dereferencer) getAccountByUsernameDomain( if apubAcc == nil { // This is existing up-to-date account, ensure it is populated. - if err := d.state.DB.PopulateAccount(ctx, account); err != nil { + if err := d.state.DB.PopulateAccount(ctx, latest); err != nil { log.Errorf(ctx, "error populating existing account: %v", err) } } @@ -267,8 +263,8 @@ func (d *Dereferencer) RefreshAccount(ctx context.Context, requestUser string, a return nil, nil, gtserror.Newf("invalid account uri %q: %w", account.URI, err) } - // Try to update + deref existing account model. - latest, apubAcc, err := d.enrichAccount(ctx, + // Try to update + deref passed account model. + latest, apubAcc, err := d.enrichAccountSafely(ctx, requestUser, uri, account, @@ -276,20 +272,17 @@ func (d *Dereferencer) RefreshAccount(ctx context.Context, requestUser string, a ) if err != nil { log.Errorf(ctx, "error enriching remote account: %v", err) - - // Update fetch-at to slow re-attempts. - account.FetchedAt = time.Now() - _ = d.state.DB.UpdateAccount(ctx, account, "fetched_at") - - return nil, nil, err + return nil, nil, gtserror.Newf("error enriching remote account: %w", err) } - // This account was updated, enqueue re-dereference featured posts. - d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) { - if err := d.dereferenceAccountFeatured(ctx, requestUser, account); err != nil { - log.Errorf(ctx, "error fetching account featured collection: %v", err) - } - }) + if apubAcc != nil { + // This account was updated, enqueue re-dereference featured posts. + d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) { + if err := d.dereferenceAccountFeatured(ctx, requestUser, latest); err != nil { + log.Errorf(ctx, "error fetching account featured collection: %v", err) + } + }) + } return latest, apubAcc, nil } @@ -311,21 +304,94 @@ func (d *Dereferencer) RefreshAccountAsync(ctx context.Context, requestUser stri // Enqueue a worker function to enrich this account async. d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) { - latest, _, err := d.enrichAccount(ctx, requestUser, uri, account, apubAcc) + latest, apubAcc, err := d.enrichAccountSafely(ctx, requestUser, uri, account, apubAcc) if err != nil { log.Errorf(ctx, "error enriching remote account: %v", err) return } - // This account was updated, re-dereference account featured posts. - if err := d.dereferenceAccountFeatured(ctx, requestUser, latest); err != nil { - log.Errorf(ctx, "error fetching account featured collection: %v", err) + if apubAcc != nil { + // This account was updated, enqueue re-dereference featured posts. + d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) { + if err := d.dereferenceAccountFeatured(ctx, requestUser, latest); err != nil { + log.Errorf(ctx, "error fetching account featured collection: %v", err) + } + }) } }) } -// enrichAccount will enrich the given account, whether a new barebones model, or existing model from the database. It handles necessary dereferencing, webfingering etc. -func (d *Dereferencer) enrichAccount(ctx context.Context, requestUser string, uri *url.URL, account *gtsmodel.Account, apubAcc ap.Accountable) (*gtsmodel.Account, ap.Accountable, error) { +// enrichAccountSafely wraps enrichAccount() to perform +// it within the State{}.FedLocks mutexmap, which protects +// dereferencing actions with per-URI mutex locks. +func (d *Dereferencer) enrichAccountSafely( + ctx context.Context, + requestUser string, + uri *url.URL, + account *gtsmodel.Account, + apubAcc ap.Accountable, +) (*gtsmodel.Account, ap.Accountable, error) { + // By default use account.URI + // as the per-URI deref lock. + uriStr := account.URI + + if uriStr == "" { + // No URI is set yet, instead generate a faux-one from user+domain. + uriStr = "https://" + account.Domain + "/user/" + account.Username + } + + // Acquire per-URI deref lock, wraping unlock + // to safely defer in case of panic, while still + // performing more granular unlocks when needed. + unlock := d.state.FedLocks.Lock(uriStr) + unlock = doOnce(unlock) + defer unlock() + + // Perform status enrichment with passed vars. + latest, apubAcc, err := d.enrichAccount(ctx, + requestUser, + uri, + account, + apubAcc, + ) + + if gtserror.StatusCode(err) >= 400 { + // Update fetch-at to slow re-attempts. + account.FetchedAt = time.Now() + _ = d.state.DB.UpdateAccount(ctx, account, "fetched_at") + } + + // Unlock now + // we're done. + unlock() + + if errors.Is(err, db.ErrAlreadyExists) { + // Ensure AP model isn't set, + // otherwise this indicates WE + // enriched the account. + apubAcc = nil + + // DATA RACE! We likely lost out to another goroutine + // in a call to db.Put(Account). Look again in DB by URI. + latest, err = d.state.DB.GetAccountByURI(ctx, account.URI) + if err != nil { + err = gtserror.Newf("error getting account %s from database after race: %w", uriStr, err) + } + } + + return latest, apubAcc, err +} + +// enrichAccount will enrich the given account, whether a +// new barebones model, or existing model from the database. +// It handles necessary dereferencing, webfingering etc. +func (d *Dereferencer) enrichAccount( + ctx context.Context, + requestUser string, + uri *url.URL, + account *gtsmodel.Account, + apubAcc ap.Accountable, +) (*gtsmodel.Account, ap.Accountable, error) { // Pre-fetch a transport for requesting username, used by later deref procedures. tsport, err := d.transportController.NewTransportForUsername(ctx, requestUser) if err != nil { @@ -476,13 +542,6 @@ func (d *Dereferencer) enrichAccount(ctx context.Context, requestUser string, ur // This is new, put it in the database. err := d.state.DB.PutAccount(ctx, latestAcc) - - if errors.Is(err, db.ErrAlreadyExists) { - // TODO: replace this quick fix with per-URI deref locks. - latestAcc, err = d.state.DB.GetAccountByURI(ctx, latestAcc.URI) - return latestAcc, nil, err - } - if err != nil { return nil, nil, gtserror.Newf("error putting in database: %w", err) } @@ -545,7 +604,8 @@ func (d *Dereferencer) fetchRemoteAccountAvatar(ctx context.Context, tsport tran } // Acquire lock for derefs map. - unlock := d.derefAvatarsMu.Lock() + unlock := d.state.FedLocks.Lock(latestAcc.AvatarRemoteURL) + unlock = doOnce(unlock) defer unlock() // Look for an existing dereference in progress. @@ -573,7 +633,7 @@ func (d *Dereferencer) fetchRemoteAccountAvatar(ctx context.Context, tsport tran defer func() { // On exit safely remove media from map. - unlock := d.derefAvatarsMu.Lock() + unlock := d.state.FedLocks.Lock(latestAcc.AvatarRemoteURL) delete(d.derefAvatars, latestAcc.AvatarRemoteURL) unlock() }() @@ -635,7 +695,8 @@ func (d *Dereferencer) fetchRemoteAccountHeader(ctx context.Context, tsport tran } // Acquire lock for derefs map. - unlock := d.derefHeadersMu.Lock() + unlock := d.state.FedLocks.Lock(latestAcc.HeaderRemoteURL) + unlock = doOnce(unlock) defer unlock() // Look for an existing dereference in progress. @@ -663,7 +724,7 @@ func (d *Dereferencer) fetchRemoteAccountHeader(ctx context.Context, tsport tran defer func() { // On exit safely remove media from map. - unlock := d.derefHeadersMu.Lock() + unlock := d.state.FedLocks.Lock(latestAcc.HeaderRemoteURL) delete(d.derefHeaders, latestAcc.HeaderRemoteURL) unlock() }() diff --git a/internal/federation/dereferencing/dereferencer.go b/internal/federation/dereferencing/dereferencer.go index a5c68bd80..5bd16c1e0 100644 --- a/internal/federation/dereferencing/dereferencer.go +++ b/internal/federation/dereferencing/dereferencer.go @@ -21,7 +21,6 @@ import ( "net/url" "sync" - "codeberg.org/gruf/go-mutexes" "github.com/superseriousbusiness/gotosocial/internal/media" "github.com/superseriousbusiness/gotosocial/internal/state" "github.com/superseriousbusiness/gotosocial/internal/transport" @@ -35,14 +34,14 @@ type Dereferencer struct { converter *typeutils.Converter transportController transport.Controller mediaManager *media.Manager - derefAvatars map[string]*media.ProcessingMedia - derefAvatarsMu mutexes.Mutex - derefHeaders map[string]*media.ProcessingMedia - derefHeadersMu mutexes.Mutex - derefEmojis map[string]*media.ProcessingEmoji - derefEmojisMu mutexes.Mutex - handshakes map[string][]*url.URL - handshakesMu sync.Mutex // mutex to lock/unlock when checking or updating the handshakes map + + // all protected by State{}.FedLocks. + derefAvatars map[string]*media.ProcessingMedia + derefHeaders map[string]*media.ProcessingMedia + derefEmojis map[string]*media.ProcessingEmoji + + handshakes map[string][]*url.URL + handshakesMu sync.Mutex } // NewDereferencer returns a Dereferencer initialized with the given parameters. @@ -61,11 +60,5 @@ func NewDereferencer( derefHeaders: make(map[string]*media.ProcessingMedia), derefEmojis: make(map[string]*media.ProcessingEmoji), handshakes: make(map[string][]*url.URL), - - // use wrapped mutexes to allow safely deferring unlock - // even when more granular locks are required (only unlocks once). - derefAvatarsMu: mutexes.WithSafety(mutexes.New()), - derefHeadersMu: mutexes.WithSafety(mutexes.New()), - derefEmojisMu: mutexes.WithSafety(mutexes.New()), } } diff --git a/internal/federation/dereferencing/emoji.go b/internal/federation/dereferencing/emoji.go index 2d86da663..1bf19d2fd 100644 --- a/internal/federation/dereferencing/emoji.go +++ b/internal/federation/dereferencing/emoji.go @@ -36,8 +36,15 @@ func (d *Dereferencer) GetRemoteEmoji(ctx context.Context, requestingUsername st processingEmoji *media.ProcessingEmoji ) + // Ensure we have been passed a valid URL. + derefURI, err := url.Parse(remoteURL) + if err != nil { + return nil, fmt.Errorf("GetRemoteEmoji: error parsing url for emoji %s: %s", shortcodeDomain, err) + } + // Acquire lock for derefs map. - unlock := d.derefEmojisMu.Lock() + unlock := d.state.FedLocks.Lock(remoteURL) + unlock = doOnce(unlock) defer unlock() // first check if we're already processing this emoji @@ -51,11 +58,6 @@ func (d *Dereferencer) GetRemoteEmoji(ctx context.Context, requestingUsername st return nil, fmt.Errorf("GetRemoteEmoji: error creating transport to fetch emoji %s: %s", shortcodeDomain, err) } - derefURI, err := url.Parse(remoteURL) - if err != nil { - return nil, fmt.Errorf("GetRemoteEmoji: error parsing url for emoji %s: %s", shortcodeDomain, err) - } - dataFunc := func(innerCtx context.Context) (io.ReadCloser, int64, error) { return t.DereferenceMedia(innerCtx, derefURI) } @@ -75,7 +77,7 @@ func (d *Dereferencer) GetRemoteEmoji(ctx context.Context, requestingUsername st defer func() { // On exit safely remove emoji from map. - unlock := d.derefEmojisMu.Lock() + unlock := d.state.FedLocks.Lock(remoteURL) delete(d.derefEmojis, shortcodeDomain) unlock() }() @@ -95,7 +97,6 @@ func (d *Dereferencer) populateEmojis(ctx context.Context, rawEmojis []*gtsmodel // * the shortcode of the emoji // * the remote URL of the image // This should be enough to dereference the emoji - gotEmojis := make([]*gtsmodel.Emoji, 0, len(rawEmojis)) for _, e := range rawEmojis { diff --git a/internal/federation/dereferencing/status.go b/internal/federation/dereferencing/status.go index adc73e843..712692814 100644 --- a/internal/federation/dereferencing/status.go +++ b/internal/federation/dereferencing/status.go @@ -115,7 +115,7 @@ func (d *Dereferencer) getStatusByURI(ctx context.Context, requestUser string, u } // Create and pass-through a new bare-bones model for deref. - return d.enrichStatus(ctx, requestUser, uri, >smodel.Status{ + return d.enrichStatusSafely(ctx, requestUser, uri, >smodel.Status{ Local: func() *bool { var false bool; return &false }(), URI: uriStr, }, nil) @@ -131,7 +131,7 @@ func (d *Dereferencer) getStatusByURI(ctx context.Context, requestUser string, u } // Try to update + deref existing status model. - latest, apubStatus, err := d.enrichStatus(ctx, + latest, apubStatus, err := d.enrichStatusSafely(ctx, requestUser, uri, status, @@ -140,10 +140,6 @@ func (d *Dereferencer) getStatusByURI(ctx context.Context, requestUser string, u if err != nil { log.Errorf(ctx, "error enriching remote status: %v", err) - // Update fetch-at to slow re-attempts. - status.FetchedAt = time.Now() - _ = d.state.DB.UpdateStatus(ctx, status, "fetched_at") - // Fallback to existing. return status, nil, nil } @@ -166,8 +162,8 @@ func (d *Dereferencer) RefreshStatus(ctx context.Context, requestUser string, st return nil, nil, gtserror.Newf("invalid status uri %q: %w", status.URI, err) } - // Try to update + deref existing status model. - latest, apubStatus, err := d.enrichStatus(ctx, + // Try to update + deref the passed status model. + latest, apubStatus, err := d.enrichStatusSafely(ctx, requestUser, uri, status, @@ -189,7 +185,7 @@ func (d *Dereferencer) RefreshStatus(ctx context.Context, requestUser string, st // This is a more optimized form of manually enqueueing .UpdateStatus() to the federation worker, since it only enqueues update if necessary. func (d *Dereferencer) RefreshStatusAsync(ctx context.Context, requestUser string, status *gtsmodel.Status, apubStatus ap.Statusable, force bool) { // Check whether needs update. - if statusUpToDate(status) { + if !force && statusUpToDate(status) { return } @@ -202,17 +198,81 @@ func (d *Dereferencer) RefreshStatusAsync(ctx context.Context, requestUser strin // Enqueue a worker function to re-fetch this status async. d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) { - latest, apubStatus, err := d.enrichStatus(ctx, requestUser, uri, status, apubStatus) + latest, apubStatus, err := d.enrichStatusSafely(ctx, requestUser, uri, status, apubStatus) if err != nil { log.Errorf(ctx, "error enriching remote status: %v", err) return } - // This status was updated, re-dereference the whole thread. - d.dereferenceThread(ctx, requestUser, uri, latest, apubStatus) + if apubStatus != nil { + // This status was updated, re-dereference the whole thread. + d.dereferenceThread(ctx, requestUser, uri, latest, apubStatus) + } }) } +// enrichStatusSafely wraps enrichStatus() to perform +// it within the State{}.FedLocks mutexmap, which protects +// dereferencing actions with per-URI mutex locks. +func (d *Dereferencer) enrichStatusSafely( + ctx context.Context, + requestUser string, + uri *url.URL, + status *gtsmodel.Status, + apubStatus ap.Statusable, +) (*gtsmodel.Status, ap.Statusable, error) { + uriStr := status.URI + + if status.ID != "" { + // This is an existing status, first try to populate it. This + // is required by the checks below for existing tags, media etc. + if err := d.state.DB.PopulateStatus(ctx, status); err != nil { + log.Errorf(ctx, "error populating existing status %s: %v", uriStr, err) + } + } + + // Acquire per-URI deref lock, wraping unlock + // to safely defer in case of panic, while still + // performing more granular unlocks when needed. + unlock := d.state.FedLocks.Lock(uriStr) + unlock = doOnce(unlock) + defer unlock() + + // Perform status enrichment with passed vars. + latest, apubStatus, err := d.enrichStatus(ctx, + requestUser, + uri, + status, + apubStatus, + ) + + if gtserror.StatusCode(err) >= 400 { + // Update fetch-at to slow re-attempts. + status.FetchedAt = time.Now() + _ = d.state.DB.UpdateStatus(ctx, status, "fetched_at") + } + + // Unlock now + // we're done. + unlock() + + if errors.Is(err, db.ErrAlreadyExists) { + // Ensure AP model isn't set, + // otherwise this indicates WE + // enriched the status. + apubStatus = nil + + // DATA RACE! We likely lost out to another goroutine + // in a call to db.Put(Status). Look again in DB by URI. + latest, err = d.state.DB.GetStatusByURI(ctx, status.URI) + if err != nil { + err = gtserror.Newf("error getting status %s from database after race: %w", uriStr, err) + } + } + + return latest, apubStatus, err +} + // enrichStatus will enrich the given status, whether a new // barebones model, or existing model from the database. // It handles necessary dereferencing, database updates, etc. @@ -258,15 +318,10 @@ func (d *Dereferencer) enrichStatus( return nil, nil, gtserror.New("attributedTo was empty") } - // Ensure we have the author account of the status dereferenced (+ up-to-date). - if author, _, err := d.getAccountByURI(ctx, requestUser, attributedTo); err != nil { - if status.AccountID == "" { - // Provided status account is nil, i.e. this is a new status / author, so a deref fail is unrecoverable. - return nil, nil, gtserror.Newf("failed to dereference status author %s: %w", uri, err) - } - } else if status.AccountID != "" && status.AccountID != author.ID { - // There already existed an account for this status author, but account ID changed. This shouldn't happen! - log.Warnf(ctx, "status author account ID changed: old=%s new=%s", status.AccountID, author.ID) + // Ensure we have the author account of the status dereferenced (+ up-to-date). If this is a new status + // (i.e. status.AccountID == "") then any error here is irrecoverable. AccountID must ALWAYS be set. + if _, _, err := d.getAccountByURI(ctx, requestUser, attributedTo); err != nil && status.AccountID == "" { + return nil, nil, gtserror.Newf("failed to dereference status author %s: %w", uri, err) } // ActivityPub model was recently dereferenced, so assume that passed status @@ -303,7 +358,7 @@ func (d *Dereferencer) enrichStatus( } // Ensure the status' tags are populated, (changes are expected / okay). - if err := d.fetchStatusTags(ctx, latestStatus); err != nil { + if err := d.fetchStatusTags(ctx, status, latestStatus); err != nil { return nil, nil, gtserror.Newf("error populating tags for status %s: %w", uri, err) } @@ -323,13 +378,6 @@ func (d *Dereferencer) enrichStatus( // // This is new, put the status in the database. err := d.state.DB.PutStatus(ctx, latestStatus) - - if errors.Is(err, db.ErrAlreadyExists) { - // TODO: replace this quick fix with per-URI deref locks. - latestStatus, err = d.state.DB.GetStatusByURI(ctx, latestStatus.URI) - return latestStatus, nil, err - } - if err != nil { return nil, nil, gtserror.Newf("error putting in database: %w", err) } @@ -545,36 +593,41 @@ func (d *Dereferencer) threadStatus(ctx context.Context, status *gtsmodel.Status return nil } -func (d *Dereferencer) fetchStatusTags(ctx context.Context, status *gtsmodel.Status) error { +func (d *Dereferencer) fetchStatusTags(ctx context.Context, existing, status *gtsmodel.Status) error { // Allocate new slice to take the yet-to-be determined tag IDs. status.TagIDs = make([]string, len(status.Tags)) for i := range status.Tags { - placeholder := status.Tags[i] + tag := status.Tags[i] + + // Look for tag in existing status with name. + existing, ok := existing.GetTagByName(tag.Name) + if ok && existing.ID != "" { + status.Tags[i] = existing + status.TagIDs[i] = existing.ID + continue + } - // Look for existing tag with this name first. - tag, err := d.state.DB.GetTagByName(ctx, placeholder.Name) + // Look for existing tag with name in the database. + existing, err := d.state.DB.GetTagByName(ctx, tag.Name) if err != nil && !errors.Is(err, db.ErrNoEntries) { - log.Errorf(ctx, "db error getting tag %s: %v", tag.Name, err) + return gtserror.Newf("db error getting tag %s: %w", tag.Name, err) + } else if existing != nil { + status.Tags[i] = existing + status.TagIDs[i] = existing.ID continue } - if tag == nil { - // Create new ID for tag name. - tag = >smodel.Tag{ - ID: id.NewULID(), - Name: placeholder.Name, - } + // Create new ID for tag. + tag.ID = id.NewULID() - // Insert this tag with new name into the database. - if err := d.state.DB.PutTag(ctx, tag); err != nil { - log.Errorf(ctx, "db error putting tag %s: %v", tag.Name, err) - continue - } + // Insert this tag with new name into the database. + if err := d.state.DB.PutTag(ctx, tag); err != nil { + log.Errorf(ctx, "db error putting tag %s: %v", tag.Name, err) + continue } - // Set the *new* tag and ID. - status.Tags[i] = tag + // Set new tag ID in slice. status.TagIDs[i] = tag.ID } @@ -600,10 +653,10 @@ func (d *Dereferencer) fetchStatusAttachments(ctx context.Context, tsport transp status.AttachmentIDs = make([]string, len(status.Attachments)) for i := range status.Attachments { - placeholder := status.Attachments[i] + attachment := status.Attachments[i] // Look for existing media attachment with remoet URL first. - existing, ok := existing.GetAttachmentByRemoteURL(placeholder.RemoteURL) + existing, ok := existing.GetAttachmentByRemoteURL(attachment.RemoteURL) if ok && existing.ID != "" && *existing.Cached { status.Attachments[i] = existing status.AttachmentIDs[i] = existing.ID @@ -611,9 +664,9 @@ func (d *Dereferencer) fetchStatusAttachments(ctx context.Context, tsport transp } // Ensure a valid media attachment remote URL. - remoteURL, err := url.Parse(placeholder.RemoteURL) + remoteURL, err := url.Parse(attachment.RemoteURL) if err != nil { - log.Errorf(ctx, "invalid remote media url %q: %v", placeholder.RemoteURL, err) + log.Errorf(ctx, "invalid remote media url %q: %v", attachment.RemoteURL, err) continue } @@ -622,9 +675,9 @@ func (d *Dereferencer) fetchStatusAttachments(ctx context.Context, tsport transp return tsport.DereferenceMedia(ctx, remoteURL) }, status.AccountID, &media.AdditionalMediaInfo{ StatusID: &status.ID, - RemoteURL: &placeholder.RemoteURL, - Description: &placeholder.Description, - Blurhash: &placeholder.Blurhash, + RemoteURL: &attachment.RemoteURL, + Description: &attachment.Description, + Blurhash: &attachment.Blurhash, }) if err != nil { log.Errorf(ctx, "error processing attachment: %v", err) @@ -632,15 +685,15 @@ func (d *Dereferencer) fetchStatusAttachments(ctx context.Context, tsport transp } // Force attachment loading *right now*. - media, err := processing.LoadAttachment(ctx) + attachment, err = processing.LoadAttachment(ctx) if err != nil { log.Errorf(ctx, "error loading attachment: %v", err) continue } // Set the *new* attachment and ID. - status.Attachments[i] = media - status.AttachmentIDs[i] = media.ID + status.Attachments[i] = attachment + status.AttachmentIDs[i] = attachment.ID } for i := 0; i < len(status.AttachmentIDs); { diff --git a/internal/federation/dereferencing/error.go b/internal/federation/dereferencing/util.go index 6a1ce0a6e..e69aeec3b 100644 --- a/internal/federation/dereferencing/error.go +++ b/internal/federation/dereferencing/util.go @@ -16,3 +16,14 @@ // along with this program. If not, see <http://www.gnu.org/licenses/>. package dereferencing + +// doOnce wraps a function to only perform it once. +func doOnce(fn func()) func() { + var once int32 + return func() { + if once == 0 { + fn() + once = 1 + } + } +} diff --git a/internal/federation/federatingdb/db.go b/internal/federation/federatingdb/db.go index c412ba3f8..8e98dc2f2 100644 --- a/internal/federation/federatingdb/db.go +++ b/internal/federation/federatingdb/db.go @@ -20,7 +20,6 @@ package federatingdb import ( "context" - "codeberg.org/gruf/go-mutexes" "github.com/superseriousbusiness/activity/pub" "github.com/superseriousbusiness/activity/streams/vocab" "github.com/superseriousbusiness/gotosocial/internal/state" @@ -40,7 +39,6 @@ type DB interface { // FederatingDB uses the underlying DB interface to implement the go-fed pub.Database interface. // It doesn't care what the underlying implementation of the DB interface is, as long as it works. type federatingDB struct { - locks mutexes.MutexMap state *state.State converter *typeutils.Converter } @@ -48,7 +46,6 @@ type federatingDB struct { // New returns a DB interface using the given database and config func New(state *state.State, converter *typeutils.Converter) DB { fdb := federatingDB{ - locks: mutexes.NewMap(-1, -1), // use defaults state: state, converter: converter, } diff --git a/internal/federation/federatingdb/lock.go b/internal/federation/federatingdb/lock.go index 900a282af..5353aea91 100644 --- a/internal/federation/federatingdb/lock.go +++ b/internal/federation/federatingdb/lock.go @@ -19,7 +19,6 @@ package federatingdb import ( "context" - "errors" "net/url" ) @@ -35,9 +34,5 @@ import ( // // Used to ensure race conditions in multiple requests do not occur. func (f *federatingDB) Lock(c context.Context, id *url.URL) (func(), error) { - if id == nil { - return nil, errors.New("Lock: id was nil") - } - unlock := f.locks.Lock(id.String()) - return unlock, nil + return f.state.FedLocks.Lock("federatingDB " + id.String()), nil // id should NEVER be nil. } |