summaryrefslogtreecommitdiff
path: root/internal/federation
diff options
context:
space:
mode:
Diffstat (limited to 'internal/federation')
-rw-r--r--internal/federation/dereferencing/account.go139
-rw-r--r--internal/federation/dereferencing/dereferencer.go23
-rw-r--r--internal/federation/dereferencing/emoji.go17
-rw-r--r--internal/federation/dereferencing/status.go167
-rw-r--r--internal/federation/dereferencing/util.go (renamed from internal/federation/dereferencing/error.go)11
-rw-r--r--internal/federation/federatingdb/db.go3
-rw-r--r--internal/federation/federatingdb/lock.go7
7 files changed, 239 insertions, 128 deletions
diff --git a/internal/federation/dereferencing/account.go b/internal/federation/dereferencing/account.go
index 670c8e2c8..58f07f9cd 100644
--- a/internal/federation/dereferencing/account.go
+++ b/internal/federation/dereferencing/account.go
@@ -122,7 +122,7 @@ func (d *Dereferencer) getAccountByURI(ctx context.Context, requestUser string,
}
// Create and pass-through a new bare-bones model for dereferencing.
- return d.enrichAccount(ctx, requestUser, uri, &gtsmodel.Account{
+ return d.enrichAccountSafely(ctx, requestUser, uri, &gtsmodel.Account{
ID: id.NewULID(),
Domain: uri.Host,
URI: uriStr,
@@ -139,7 +139,7 @@ func (d *Dereferencer) getAccountByURI(ctx context.Context, requestUser string,
}
// Try to update existing account model.
- latest, apubAcc, err := d.enrichAccount(ctx,
+ latest, apubAcc, err := d.enrichAccountSafely(ctx,
requestUser,
uri,
account,
@@ -148,10 +148,6 @@ func (d *Dereferencer) getAccountByURI(ctx context.Context, requestUser string,
if err != nil {
log.Errorf(ctx, "error enriching remote account: %v", err)
- // Update fetch-at to slow re-attempts.
- account.FetchedAt = time.Now()
- _ = d.state.DB.UpdateAccount(ctx, account, "fetched_at")
-
// Fallback to existing.
return account, nil, nil
}
@@ -218,7 +214,7 @@ func (d *Dereferencer) getAccountByUsernameDomain(
}
// Create and pass-through a new bare-bones model for dereferencing.
- account, apubAcc, err := d.enrichAccount(ctx, requestUser, nil, &gtsmodel.Account{
+ account, apubAcc, err := d.enrichAccountSafely(ctx, requestUser, nil, &gtsmodel.Account{
ID: id.NewULID(),
Username: username,
Domain: domain,
@@ -244,7 +240,7 @@ func (d *Dereferencer) getAccountByUsernameDomain(
if apubAcc == nil {
// This is existing up-to-date account, ensure it is populated.
- if err := d.state.DB.PopulateAccount(ctx, account); err != nil {
+ if err := d.state.DB.PopulateAccount(ctx, latest); err != nil {
log.Errorf(ctx, "error populating existing account: %v", err)
}
}
@@ -267,8 +263,8 @@ func (d *Dereferencer) RefreshAccount(ctx context.Context, requestUser string, a
return nil, nil, gtserror.Newf("invalid account uri %q: %w", account.URI, err)
}
- // Try to update + deref existing account model.
- latest, apubAcc, err := d.enrichAccount(ctx,
+ // Try to update + deref passed account model.
+ latest, apubAcc, err := d.enrichAccountSafely(ctx,
requestUser,
uri,
account,
@@ -276,20 +272,17 @@ func (d *Dereferencer) RefreshAccount(ctx context.Context, requestUser string, a
)
if err != nil {
log.Errorf(ctx, "error enriching remote account: %v", err)
-
- // Update fetch-at to slow re-attempts.
- account.FetchedAt = time.Now()
- _ = d.state.DB.UpdateAccount(ctx, account, "fetched_at")
-
- return nil, nil, err
+ return nil, nil, gtserror.Newf("error enriching remote account: %w", err)
}
- // This account was updated, enqueue re-dereference featured posts.
- d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) {
- if err := d.dereferenceAccountFeatured(ctx, requestUser, account); err != nil {
- log.Errorf(ctx, "error fetching account featured collection: %v", err)
- }
- })
+ if apubAcc != nil {
+ // This account was updated, enqueue re-dereference featured posts.
+ d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) {
+ if err := d.dereferenceAccountFeatured(ctx, requestUser, latest); err != nil {
+ log.Errorf(ctx, "error fetching account featured collection: %v", err)
+ }
+ })
+ }
return latest, apubAcc, nil
}
@@ -311,21 +304,94 @@ func (d *Dereferencer) RefreshAccountAsync(ctx context.Context, requestUser stri
// Enqueue a worker function to enrich this account async.
d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) {
- latest, _, err := d.enrichAccount(ctx, requestUser, uri, account, apubAcc)
+ latest, apubAcc, err := d.enrichAccountSafely(ctx, requestUser, uri, account, apubAcc)
if err != nil {
log.Errorf(ctx, "error enriching remote account: %v", err)
return
}
- // This account was updated, re-dereference account featured posts.
- if err := d.dereferenceAccountFeatured(ctx, requestUser, latest); err != nil {
- log.Errorf(ctx, "error fetching account featured collection: %v", err)
+ if apubAcc != nil {
+ // This account was updated, enqueue re-dereference featured posts.
+ d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) {
+ if err := d.dereferenceAccountFeatured(ctx, requestUser, latest); err != nil {
+ log.Errorf(ctx, "error fetching account featured collection: %v", err)
+ }
+ })
}
})
}
-// enrichAccount will enrich the given account, whether a new barebones model, or existing model from the database. It handles necessary dereferencing, webfingering etc.
-func (d *Dereferencer) enrichAccount(ctx context.Context, requestUser string, uri *url.URL, account *gtsmodel.Account, apubAcc ap.Accountable) (*gtsmodel.Account, ap.Accountable, error) {
+// enrichAccountSafely wraps enrichAccount() to perform
+// it within the State{}.FedLocks mutexmap, which protects
+// dereferencing actions with per-URI mutex locks.
+func (d *Dereferencer) enrichAccountSafely(
+ ctx context.Context,
+ requestUser string,
+ uri *url.URL,
+ account *gtsmodel.Account,
+ apubAcc ap.Accountable,
+) (*gtsmodel.Account, ap.Accountable, error) {
+ // By default use account.URI
+ // as the per-URI deref lock.
+ uriStr := account.URI
+
+ if uriStr == "" {
+ // No URI is set yet, instead generate a faux-one from user+domain.
+ uriStr = "https://" + account.Domain + "/user/" + account.Username
+ }
+
+ // Acquire per-URI deref lock, wraping unlock
+ // to safely defer in case of panic, while still
+ // performing more granular unlocks when needed.
+ unlock := d.state.FedLocks.Lock(uriStr)
+ unlock = doOnce(unlock)
+ defer unlock()
+
+ // Perform status enrichment with passed vars.
+ latest, apubAcc, err := d.enrichAccount(ctx,
+ requestUser,
+ uri,
+ account,
+ apubAcc,
+ )
+
+ if gtserror.StatusCode(err) >= 400 {
+ // Update fetch-at to slow re-attempts.
+ account.FetchedAt = time.Now()
+ _ = d.state.DB.UpdateAccount(ctx, account, "fetched_at")
+ }
+
+ // Unlock now
+ // we're done.
+ unlock()
+
+ if errors.Is(err, db.ErrAlreadyExists) {
+ // Ensure AP model isn't set,
+ // otherwise this indicates WE
+ // enriched the account.
+ apubAcc = nil
+
+ // DATA RACE! We likely lost out to another goroutine
+ // in a call to db.Put(Account). Look again in DB by URI.
+ latest, err = d.state.DB.GetAccountByURI(ctx, account.URI)
+ if err != nil {
+ err = gtserror.Newf("error getting account %s from database after race: %w", uriStr, err)
+ }
+ }
+
+ return latest, apubAcc, err
+}
+
+// enrichAccount will enrich the given account, whether a
+// new barebones model, or existing model from the database.
+// It handles necessary dereferencing, webfingering etc.
+func (d *Dereferencer) enrichAccount(
+ ctx context.Context,
+ requestUser string,
+ uri *url.URL,
+ account *gtsmodel.Account,
+ apubAcc ap.Accountable,
+) (*gtsmodel.Account, ap.Accountable, error) {
// Pre-fetch a transport for requesting username, used by later deref procedures.
tsport, err := d.transportController.NewTransportForUsername(ctx, requestUser)
if err != nil {
@@ -476,13 +542,6 @@ func (d *Dereferencer) enrichAccount(ctx context.Context, requestUser string, ur
// This is new, put it in the database.
err := d.state.DB.PutAccount(ctx, latestAcc)
-
- if errors.Is(err, db.ErrAlreadyExists) {
- // TODO: replace this quick fix with per-URI deref locks.
- latestAcc, err = d.state.DB.GetAccountByURI(ctx, latestAcc.URI)
- return latestAcc, nil, err
- }
-
if err != nil {
return nil, nil, gtserror.Newf("error putting in database: %w", err)
}
@@ -545,7 +604,8 @@ func (d *Dereferencer) fetchRemoteAccountAvatar(ctx context.Context, tsport tran
}
// Acquire lock for derefs map.
- unlock := d.derefAvatarsMu.Lock()
+ unlock := d.state.FedLocks.Lock(latestAcc.AvatarRemoteURL)
+ unlock = doOnce(unlock)
defer unlock()
// Look for an existing dereference in progress.
@@ -573,7 +633,7 @@ func (d *Dereferencer) fetchRemoteAccountAvatar(ctx context.Context, tsport tran
defer func() {
// On exit safely remove media from map.
- unlock := d.derefAvatarsMu.Lock()
+ unlock := d.state.FedLocks.Lock(latestAcc.AvatarRemoteURL)
delete(d.derefAvatars, latestAcc.AvatarRemoteURL)
unlock()
}()
@@ -635,7 +695,8 @@ func (d *Dereferencer) fetchRemoteAccountHeader(ctx context.Context, tsport tran
}
// Acquire lock for derefs map.
- unlock := d.derefHeadersMu.Lock()
+ unlock := d.state.FedLocks.Lock(latestAcc.HeaderRemoteURL)
+ unlock = doOnce(unlock)
defer unlock()
// Look for an existing dereference in progress.
@@ -663,7 +724,7 @@ func (d *Dereferencer) fetchRemoteAccountHeader(ctx context.Context, tsport tran
defer func() {
// On exit safely remove media from map.
- unlock := d.derefHeadersMu.Lock()
+ unlock := d.state.FedLocks.Lock(latestAcc.HeaderRemoteURL)
delete(d.derefHeaders, latestAcc.HeaderRemoteURL)
unlock()
}()
diff --git a/internal/federation/dereferencing/dereferencer.go b/internal/federation/dereferencing/dereferencer.go
index a5c68bd80..5bd16c1e0 100644
--- a/internal/federation/dereferencing/dereferencer.go
+++ b/internal/federation/dereferencing/dereferencer.go
@@ -21,7 +21,6 @@ import (
"net/url"
"sync"
- "codeberg.org/gruf/go-mutexes"
"github.com/superseriousbusiness/gotosocial/internal/media"
"github.com/superseriousbusiness/gotosocial/internal/state"
"github.com/superseriousbusiness/gotosocial/internal/transport"
@@ -35,14 +34,14 @@ type Dereferencer struct {
converter *typeutils.Converter
transportController transport.Controller
mediaManager *media.Manager
- derefAvatars map[string]*media.ProcessingMedia
- derefAvatarsMu mutexes.Mutex
- derefHeaders map[string]*media.ProcessingMedia
- derefHeadersMu mutexes.Mutex
- derefEmojis map[string]*media.ProcessingEmoji
- derefEmojisMu mutexes.Mutex
- handshakes map[string][]*url.URL
- handshakesMu sync.Mutex // mutex to lock/unlock when checking or updating the handshakes map
+
+ // all protected by State{}.FedLocks.
+ derefAvatars map[string]*media.ProcessingMedia
+ derefHeaders map[string]*media.ProcessingMedia
+ derefEmojis map[string]*media.ProcessingEmoji
+
+ handshakes map[string][]*url.URL
+ handshakesMu sync.Mutex
}
// NewDereferencer returns a Dereferencer initialized with the given parameters.
@@ -61,11 +60,5 @@ func NewDereferencer(
derefHeaders: make(map[string]*media.ProcessingMedia),
derefEmojis: make(map[string]*media.ProcessingEmoji),
handshakes: make(map[string][]*url.URL),
-
- // use wrapped mutexes to allow safely deferring unlock
- // even when more granular locks are required (only unlocks once).
- derefAvatarsMu: mutexes.WithSafety(mutexes.New()),
- derefHeadersMu: mutexes.WithSafety(mutexes.New()),
- derefEmojisMu: mutexes.WithSafety(mutexes.New()),
}
}
diff --git a/internal/federation/dereferencing/emoji.go b/internal/federation/dereferencing/emoji.go
index 2d86da663..1bf19d2fd 100644
--- a/internal/federation/dereferencing/emoji.go
+++ b/internal/federation/dereferencing/emoji.go
@@ -36,8 +36,15 @@ func (d *Dereferencer) GetRemoteEmoji(ctx context.Context, requestingUsername st
processingEmoji *media.ProcessingEmoji
)
+ // Ensure we have been passed a valid URL.
+ derefURI, err := url.Parse(remoteURL)
+ if err != nil {
+ return nil, fmt.Errorf("GetRemoteEmoji: error parsing url for emoji %s: %s", shortcodeDomain, err)
+ }
+
// Acquire lock for derefs map.
- unlock := d.derefEmojisMu.Lock()
+ unlock := d.state.FedLocks.Lock(remoteURL)
+ unlock = doOnce(unlock)
defer unlock()
// first check if we're already processing this emoji
@@ -51,11 +58,6 @@ func (d *Dereferencer) GetRemoteEmoji(ctx context.Context, requestingUsername st
return nil, fmt.Errorf("GetRemoteEmoji: error creating transport to fetch emoji %s: %s", shortcodeDomain, err)
}
- derefURI, err := url.Parse(remoteURL)
- if err != nil {
- return nil, fmt.Errorf("GetRemoteEmoji: error parsing url for emoji %s: %s", shortcodeDomain, err)
- }
-
dataFunc := func(innerCtx context.Context) (io.ReadCloser, int64, error) {
return t.DereferenceMedia(innerCtx, derefURI)
}
@@ -75,7 +77,7 @@ func (d *Dereferencer) GetRemoteEmoji(ctx context.Context, requestingUsername st
defer func() {
// On exit safely remove emoji from map.
- unlock := d.derefEmojisMu.Lock()
+ unlock := d.state.FedLocks.Lock(remoteURL)
delete(d.derefEmojis, shortcodeDomain)
unlock()
}()
@@ -95,7 +97,6 @@ func (d *Dereferencer) populateEmojis(ctx context.Context, rawEmojis []*gtsmodel
// * the shortcode of the emoji
// * the remote URL of the image
// This should be enough to dereference the emoji
-
gotEmojis := make([]*gtsmodel.Emoji, 0, len(rawEmojis))
for _, e := range rawEmojis {
diff --git a/internal/federation/dereferencing/status.go b/internal/federation/dereferencing/status.go
index adc73e843..712692814 100644
--- a/internal/federation/dereferencing/status.go
+++ b/internal/federation/dereferencing/status.go
@@ -115,7 +115,7 @@ func (d *Dereferencer) getStatusByURI(ctx context.Context, requestUser string, u
}
// Create and pass-through a new bare-bones model for deref.
- return d.enrichStatus(ctx, requestUser, uri, &gtsmodel.Status{
+ return d.enrichStatusSafely(ctx, requestUser, uri, &gtsmodel.Status{
Local: func() *bool { var false bool; return &false }(),
URI: uriStr,
}, nil)
@@ -131,7 +131,7 @@ func (d *Dereferencer) getStatusByURI(ctx context.Context, requestUser string, u
}
// Try to update + deref existing status model.
- latest, apubStatus, err := d.enrichStatus(ctx,
+ latest, apubStatus, err := d.enrichStatusSafely(ctx,
requestUser,
uri,
status,
@@ -140,10 +140,6 @@ func (d *Dereferencer) getStatusByURI(ctx context.Context, requestUser string, u
if err != nil {
log.Errorf(ctx, "error enriching remote status: %v", err)
- // Update fetch-at to slow re-attempts.
- status.FetchedAt = time.Now()
- _ = d.state.DB.UpdateStatus(ctx, status, "fetched_at")
-
// Fallback to existing.
return status, nil, nil
}
@@ -166,8 +162,8 @@ func (d *Dereferencer) RefreshStatus(ctx context.Context, requestUser string, st
return nil, nil, gtserror.Newf("invalid status uri %q: %w", status.URI, err)
}
- // Try to update + deref existing status model.
- latest, apubStatus, err := d.enrichStatus(ctx,
+ // Try to update + deref the passed status model.
+ latest, apubStatus, err := d.enrichStatusSafely(ctx,
requestUser,
uri,
status,
@@ -189,7 +185,7 @@ func (d *Dereferencer) RefreshStatus(ctx context.Context, requestUser string, st
// This is a more optimized form of manually enqueueing .UpdateStatus() to the federation worker, since it only enqueues update if necessary.
func (d *Dereferencer) RefreshStatusAsync(ctx context.Context, requestUser string, status *gtsmodel.Status, apubStatus ap.Statusable, force bool) {
// Check whether needs update.
- if statusUpToDate(status) {
+ if !force && statusUpToDate(status) {
return
}
@@ -202,17 +198,81 @@ func (d *Dereferencer) RefreshStatusAsync(ctx context.Context, requestUser strin
// Enqueue a worker function to re-fetch this status async.
d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) {
- latest, apubStatus, err := d.enrichStatus(ctx, requestUser, uri, status, apubStatus)
+ latest, apubStatus, err := d.enrichStatusSafely(ctx, requestUser, uri, status, apubStatus)
if err != nil {
log.Errorf(ctx, "error enriching remote status: %v", err)
return
}
- // This status was updated, re-dereference the whole thread.
- d.dereferenceThread(ctx, requestUser, uri, latest, apubStatus)
+ if apubStatus != nil {
+ // This status was updated, re-dereference the whole thread.
+ d.dereferenceThread(ctx, requestUser, uri, latest, apubStatus)
+ }
})
}
+// enrichStatusSafely wraps enrichStatus() to perform
+// it within the State{}.FedLocks mutexmap, which protects
+// dereferencing actions with per-URI mutex locks.
+func (d *Dereferencer) enrichStatusSafely(
+ ctx context.Context,
+ requestUser string,
+ uri *url.URL,
+ status *gtsmodel.Status,
+ apubStatus ap.Statusable,
+) (*gtsmodel.Status, ap.Statusable, error) {
+ uriStr := status.URI
+
+ if status.ID != "" {
+ // This is an existing status, first try to populate it. This
+ // is required by the checks below for existing tags, media etc.
+ if err := d.state.DB.PopulateStatus(ctx, status); err != nil {
+ log.Errorf(ctx, "error populating existing status %s: %v", uriStr, err)
+ }
+ }
+
+ // Acquire per-URI deref lock, wraping unlock
+ // to safely defer in case of panic, while still
+ // performing more granular unlocks when needed.
+ unlock := d.state.FedLocks.Lock(uriStr)
+ unlock = doOnce(unlock)
+ defer unlock()
+
+ // Perform status enrichment with passed vars.
+ latest, apubStatus, err := d.enrichStatus(ctx,
+ requestUser,
+ uri,
+ status,
+ apubStatus,
+ )
+
+ if gtserror.StatusCode(err) >= 400 {
+ // Update fetch-at to slow re-attempts.
+ status.FetchedAt = time.Now()
+ _ = d.state.DB.UpdateStatus(ctx, status, "fetched_at")
+ }
+
+ // Unlock now
+ // we're done.
+ unlock()
+
+ if errors.Is(err, db.ErrAlreadyExists) {
+ // Ensure AP model isn't set,
+ // otherwise this indicates WE
+ // enriched the status.
+ apubStatus = nil
+
+ // DATA RACE! We likely lost out to another goroutine
+ // in a call to db.Put(Status). Look again in DB by URI.
+ latest, err = d.state.DB.GetStatusByURI(ctx, status.URI)
+ if err != nil {
+ err = gtserror.Newf("error getting status %s from database after race: %w", uriStr, err)
+ }
+ }
+
+ return latest, apubStatus, err
+}
+
// enrichStatus will enrich the given status, whether a new
// barebones model, or existing model from the database.
// It handles necessary dereferencing, database updates, etc.
@@ -258,15 +318,10 @@ func (d *Dereferencer) enrichStatus(
return nil, nil, gtserror.New("attributedTo was empty")
}
- // Ensure we have the author account of the status dereferenced (+ up-to-date).
- if author, _, err := d.getAccountByURI(ctx, requestUser, attributedTo); err != nil {
- if status.AccountID == "" {
- // Provided status account is nil, i.e. this is a new status / author, so a deref fail is unrecoverable.
- return nil, nil, gtserror.Newf("failed to dereference status author %s: %w", uri, err)
- }
- } else if status.AccountID != "" && status.AccountID != author.ID {
- // There already existed an account for this status author, but account ID changed. This shouldn't happen!
- log.Warnf(ctx, "status author account ID changed: old=%s new=%s", status.AccountID, author.ID)
+ // Ensure we have the author account of the status dereferenced (+ up-to-date). If this is a new status
+ // (i.e. status.AccountID == "") then any error here is irrecoverable. AccountID must ALWAYS be set.
+ if _, _, err := d.getAccountByURI(ctx, requestUser, attributedTo); err != nil && status.AccountID == "" {
+ return nil, nil, gtserror.Newf("failed to dereference status author %s: %w", uri, err)
}
// ActivityPub model was recently dereferenced, so assume that passed status
@@ -303,7 +358,7 @@ func (d *Dereferencer) enrichStatus(
}
// Ensure the status' tags are populated, (changes are expected / okay).
- if err := d.fetchStatusTags(ctx, latestStatus); err != nil {
+ if err := d.fetchStatusTags(ctx, status, latestStatus); err != nil {
return nil, nil, gtserror.Newf("error populating tags for status %s: %w", uri, err)
}
@@ -323,13 +378,6 @@ func (d *Dereferencer) enrichStatus(
//
// This is new, put the status in the database.
err := d.state.DB.PutStatus(ctx, latestStatus)
-
- if errors.Is(err, db.ErrAlreadyExists) {
- // TODO: replace this quick fix with per-URI deref locks.
- latestStatus, err = d.state.DB.GetStatusByURI(ctx, latestStatus.URI)
- return latestStatus, nil, err
- }
-
if err != nil {
return nil, nil, gtserror.Newf("error putting in database: %w", err)
}
@@ -545,36 +593,41 @@ func (d *Dereferencer) threadStatus(ctx context.Context, status *gtsmodel.Status
return nil
}
-func (d *Dereferencer) fetchStatusTags(ctx context.Context, status *gtsmodel.Status) error {
+func (d *Dereferencer) fetchStatusTags(ctx context.Context, existing, status *gtsmodel.Status) error {
// Allocate new slice to take the yet-to-be determined tag IDs.
status.TagIDs = make([]string, len(status.Tags))
for i := range status.Tags {
- placeholder := status.Tags[i]
+ tag := status.Tags[i]
+
+ // Look for tag in existing status with name.
+ existing, ok := existing.GetTagByName(tag.Name)
+ if ok && existing.ID != "" {
+ status.Tags[i] = existing
+ status.TagIDs[i] = existing.ID
+ continue
+ }
- // Look for existing tag with this name first.
- tag, err := d.state.DB.GetTagByName(ctx, placeholder.Name)
+ // Look for existing tag with name in the database.
+ existing, err := d.state.DB.GetTagByName(ctx, tag.Name)
if err != nil && !errors.Is(err, db.ErrNoEntries) {
- log.Errorf(ctx, "db error getting tag %s: %v", tag.Name, err)
+ return gtserror.Newf("db error getting tag %s: %w", tag.Name, err)
+ } else if existing != nil {
+ status.Tags[i] = existing
+ status.TagIDs[i] = existing.ID
continue
}
- if tag == nil {
- // Create new ID for tag name.
- tag = &gtsmodel.Tag{
- ID: id.NewULID(),
- Name: placeholder.Name,
- }
+ // Create new ID for tag.
+ tag.ID = id.NewULID()
- // Insert this tag with new name into the database.
- if err := d.state.DB.PutTag(ctx, tag); err != nil {
- log.Errorf(ctx, "db error putting tag %s: %v", tag.Name, err)
- continue
- }
+ // Insert this tag with new name into the database.
+ if err := d.state.DB.PutTag(ctx, tag); err != nil {
+ log.Errorf(ctx, "db error putting tag %s: %v", tag.Name, err)
+ continue
}
- // Set the *new* tag and ID.
- status.Tags[i] = tag
+ // Set new tag ID in slice.
status.TagIDs[i] = tag.ID
}
@@ -600,10 +653,10 @@ func (d *Dereferencer) fetchStatusAttachments(ctx context.Context, tsport transp
status.AttachmentIDs = make([]string, len(status.Attachments))
for i := range status.Attachments {
- placeholder := status.Attachments[i]
+ attachment := status.Attachments[i]
// Look for existing media attachment with remoet URL first.
- existing, ok := existing.GetAttachmentByRemoteURL(placeholder.RemoteURL)
+ existing, ok := existing.GetAttachmentByRemoteURL(attachment.RemoteURL)
if ok && existing.ID != "" && *existing.Cached {
status.Attachments[i] = existing
status.AttachmentIDs[i] = existing.ID
@@ -611,9 +664,9 @@ func (d *Dereferencer) fetchStatusAttachments(ctx context.Context, tsport transp
}
// Ensure a valid media attachment remote URL.
- remoteURL, err := url.Parse(placeholder.RemoteURL)
+ remoteURL, err := url.Parse(attachment.RemoteURL)
if err != nil {
- log.Errorf(ctx, "invalid remote media url %q: %v", placeholder.RemoteURL, err)
+ log.Errorf(ctx, "invalid remote media url %q: %v", attachment.RemoteURL, err)
continue
}
@@ -622,9 +675,9 @@ func (d *Dereferencer) fetchStatusAttachments(ctx context.Context, tsport transp
return tsport.DereferenceMedia(ctx, remoteURL)
}, status.AccountID, &media.AdditionalMediaInfo{
StatusID: &status.ID,
- RemoteURL: &placeholder.RemoteURL,
- Description: &placeholder.Description,
- Blurhash: &placeholder.Blurhash,
+ RemoteURL: &attachment.RemoteURL,
+ Description: &attachment.Description,
+ Blurhash: &attachment.Blurhash,
})
if err != nil {
log.Errorf(ctx, "error processing attachment: %v", err)
@@ -632,15 +685,15 @@ func (d *Dereferencer) fetchStatusAttachments(ctx context.Context, tsport transp
}
// Force attachment loading *right now*.
- media, err := processing.LoadAttachment(ctx)
+ attachment, err = processing.LoadAttachment(ctx)
if err != nil {
log.Errorf(ctx, "error loading attachment: %v", err)
continue
}
// Set the *new* attachment and ID.
- status.Attachments[i] = media
- status.AttachmentIDs[i] = media.ID
+ status.Attachments[i] = attachment
+ status.AttachmentIDs[i] = attachment.ID
}
for i := 0; i < len(status.AttachmentIDs); {
diff --git a/internal/federation/dereferencing/error.go b/internal/federation/dereferencing/util.go
index 6a1ce0a6e..e69aeec3b 100644
--- a/internal/federation/dereferencing/error.go
+++ b/internal/federation/dereferencing/util.go
@@ -16,3 +16,14 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package dereferencing
+
+// doOnce wraps a function to only perform it once.
+func doOnce(fn func()) func() {
+ var once int32
+ return func() {
+ if once == 0 {
+ fn()
+ once = 1
+ }
+ }
+}
diff --git a/internal/federation/federatingdb/db.go b/internal/federation/federatingdb/db.go
index c412ba3f8..8e98dc2f2 100644
--- a/internal/federation/federatingdb/db.go
+++ b/internal/federation/federatingdb/db.go
@@ -20,7 +20,6 @@ package federatingdb
import (
"context"
- "codeberg.org/gruf/go-mutexes"
"github.com/superseriousbusiness/activity/pub"
"github.com/superseriousbusiness/activity/streams/vocab"
"github.com/superseriousbusiness/gotosocial/internal/state"
@@ -40,7 +39,6 @@ type DB interface {
// FederatingDB uses the underlying DB interface to implement the go-fed pub.Database interface.
// It doesn't care what the underlying implementation of the DB interface is, as long as it works.
type federatingDB struct {
- locks mutexes.MutexMap
state *state.State
converter *typeutils.Converter
}
@@ -48,7 +46,6 @@ type federatingDB struct {
// New returns a DB interface using the given database and config
func New(state *state.State, converter *typeutils.Converter) DB {
fdb := federatingDB{
- locks: mutexes.NewMap(-1, -1), // use defaults
state: state,
converter: converter,
}
diff --git a/internal/federation/federatingdb/lock.go b/internal/federation/federatingdb/lock.go
index 900a282af..5353aea91 100644
--- a/internal/federation/federatingdb/lock.go
+++ b/internal/federation/federatingdb/lock.go
@@ -19,7 +19,6 @@ package federatingdb
import (
"context"
- "errors"
"net/url"
)
@@ -35,9 +34,5 @@ import (
//
// Used to ensure race conditions in multiple requests do not occur.
func (f *federatingDB) Lock(c context.Context, id *url.URL) (func(), error) {
- if id == nil {
- return nil, errors.New("Lock: id was nil")
- }
- unlock := f.locks.Lock(id.String())
- return unlock, nil
+ return f.state.FedLocks.Lock("federatingDB " + id.String()), nil // id should NEVER be nil.
}