diff options
| author | 2023-10-31 11:12:22 +0000 | |
|---|---|---|
| committer | 2023-10-31 11:12:22 +0000 | |
| commit | ce71a5a7902963538fc54583588850563f6746cc (patch) | |
| tree | 3e869eba6d25d2db5fe81184ffee595e451b3147 /internal | |
| parent | [bugfix] Relax `Mention` parsing, allowing either href or name (#2320) (diff) | |
| download | gotosocial-ce71a5a7902963538fc54583588850563f6746cc.tar.xz | |
[feature] add per-uri dereferencer locks (#2291)
Diffstat (limited to 'internal')
| -rw-r--r-- | internal/db/bundb/migrations/20220612091800_duplicated_media_cleanup.go | 9 | ||||
| -rw-r--r-- | internal/federation/dereferencing/account.go | 139 | ||||
| -rw-r--r-- | internal/federation/dereferencing/dereferencer.go | 23 | ||||
| -rw-r--r-- | internal/federation/dereferencing/emoji.go | 17 | ||||
| -rw-r--r-- | internal/federation/dereferencing/status.go | 167 | ||||
| -rw-r--r-- | internal/federation/dereferencing/util.go (renamed from internal/federation/dereferencing/error.go) | 11 | ||||
| -rw-r--r-- | internal/federation/federatingdb/db.go | 3 | ||||
| -rw-r--r-- | internal/federation/federatingdb/lock.go | 7 | ||||
| -rw-r--r-- | internal/gtsmodel/status.go | 117 | ||||
| -rw-r--r-- | internal/state/state.go | 6 | 
10 files changed, 281 insertions, 218 deletions
diff --git a/internal/db/bundb/migrations/20220612091800_duplicated_media_cleanup.go b/internal/db/bundb/migrations/20220612091800_duplicated_media_cleanup.go index 688878c0e..28bbb3a81 100644 --- a/internal/db/bundb/migrations/20220612091800_duplicated_media_cleanup.go +++ b/internal/db/bundb/migrations/20220612091800_duplicated_media_cleanup.go @@ -23,7 +23,6 @@ import (  	"fmt"  	"path" -	"codeberg.org/gruf/go-store/v2/kv"  	"codeberg.org/gruf/go-store/v2/storage"  	"github.com/superseriousbusiness/gotosocial/internal/config"  	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" @@ -32,14 +31,14 @@ import (  )  func init() { -	deleteAttachment := func(ctx context.Context, l log.Entry, a *gtsmodel.MediaAttachment, s *kv.KVStore, tx bun.Tx) { -		if err := s.Delete(ctx, a.File.Path); err != nil && err != storage.ErrNotFound { +	deleteAttachment := func(ctx context.Context, l log.Entry, a *gtsmodel.MediaAttachment, s storage.Storage, tx bun.Tx) { +		if err := s.Remove(ctx, a.File.Path); err != nil && err != storage.ErrNotFound {  			l.Errorf("error removing file %s: %s", a.File.Path, err)  		} else {  			l.Debugf("deleted %s", a.File.Path)  		} -		if err := s.Delete(ctx, a.Thumbnail.Path); err != nil && err != storage.ErrNotFound { +		if err := s.Remove(ctx, a.Thumbnail.Path); err != nil && err != storage.ErrNotFound {  			l.Errorf("error removing file %s: %s", a.Thumbnail.Path, err)  		} else {  			l.Debugf("deleted %s", a.Thumbnail.Path) @@ -69,7 +68,7 @@ func init() {  		}  		return db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { -			s, err := kv.OpenDisk(storageBasePath, &storage.DiskConfig{ +			s, err := storage.OpenDisk(storageBasePath, &storage.DiskConfig{  				LockFile: path.Join(storageBasePath, "store.lock"),  			})  			if err != nil { diff --git a/internal/federation/dereferencing/account.go b/internal/federation/dereferencing/account.go index 670c8e2c8..58f07f9cd 100644 --- a/internal/federation/dereferencing/account.go +++ b/internal/federation/dereferencing/account.go @@ -122,7 +122,7 @@ func (d *Dereferencer) getAccountByURI(ctx context.Context, requestUser string,  		}  		// Create and pass-through a new bare-bones model for dereferencing. -		return d.enrichAccount(ctx, requestUser, uri, >smodel.Account{ +		return d.enrichAccountSafely(ctx, requestUser, uri, >smodel.Account{  			ID:     id.NewULID(),  			Domain: uri.Host,  			URI:    uriStr, @@ -139,7 +139,7 @@ func (d *Dereferencer) getAccountByURI(ctx context.Context, requestUser string,  	}  	// Try to update existing account model. -	latest, apubAcc, err := d.enrichAccount(ctx, +	latest, apubAcc, err := d.enrichAccountSafely(ctx,  		requestUser,  		uri,  		account, @@ -148,10 +148,6 @@ func (d *Dereferencer) getAccountByURI(ctx context.Context, requestUser string,  	if err != nil {  		log.Errorf(ctx, "error enriching remote account: %v", err) -		// Update fetch-at to slow re-attempts. -		account.FetchedAt = time.Now() -		_ = d.state.DB.UpdateAccount(ctx, account, "fetched_at") -  		// Fallback to existing.  		return account, nil, nil  	} @@ -218,7 +214,7 @@ func (d *Dereferencer) getAccountByUsernameDomain(  		}  		// Create and pass-through a new bare-bones model for dereferencing. -		account, apubAcc, err := d.enrichAccount(ctx, requestUser, nil, >smodel.Account{ +		account, apubAcc, err := d.enrichAccountSafely(ctx, requestUser, nil, >smodel.Account{  			ID:       id.NewULID(),  			Username: username,  			Domain:   domain, @@ -244,7 +240,7 @@ func (d *Dereferencer) getAccountByUsernameDomain(  	if apubAcc == nil {  		// This is existing up-to-date account, ensure it is populated. -		if err := d.state.DB.PopulateAccount(ctx, account); err != nil { +		if err := d.state.DB.PopulateAccount(ctx, latest); err != nil {  			log.Errorf(ctx, "error populating existing account: %v", err)  		}  	} @@ -267,8 +263,8 @@ func (d *Dereferencer) RefreshAccount(ctx context.Context, requestUser string, a  		return nil, nil, gtserror.Newf("invalid account uri %q: %w", account.URI, err)  	} -	// Try to update + deref existing account model. -	latest, apubAcc, err := d.enrichAccount(ctx, +	// Try to update + deref passed account model. +	latest, apubAcc, err := d.enrichAccountSafely(ctx,  		requestUser,  		uri,  		account, @@ -276,20 +272,17 @@ func (d *Dereferencer) RefreshAccount(ctx context.Context, requestUser string, a  	)  	if err != nil {  		log.Errorf(ctx, "error enriching remote account: %v", err) - -		// Update fetch-at to slow re-attempts. -		account.FetchedAt = time.Now() -		_ = d.state.DB.UpdateAccount(ctx, account, "fetched_at") - -		return nil, nil, err +		return nil, nil, gtserror.Newf("error enriching remote account: %w", err)  	} -	// This account was updated, enqueue re-dereference featured posts. -	d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) { -		if err := d.dereferenceAccountFeatured(ctx, requestUser, account); err != nil { -			log.Errorf(ctx, "error fetching account featured collection: %v", err) -		} -	}) +	if apubAcc != nil { +		// This account was updated, enqueue re-dereference featured posts. +		d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) { +			if err := d.dereferenceAccountFeatured(ctx, requestUser, latest); err != nil { +				log.Errorf(ctx, "error fetching account featured collection: %v", err) +			} +		}) +	}  	return latest, apubAcc, nil  } @@ -311,21 +304,94 @@ func (d *Dereferencer) RefreshAccountAsync(ctx context.Context, requestUser stri  	// Enqueue a worker function to enrich this account async.  	d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) { -		latest, _, err := d.enrichAccount(ctx, requestUser, uri, account, apubAcc) +		latest, apubAcc, err := d.enrichAccountSafely(ctx, requestUser, uri, account, apubAcc)  		if err != nil {  			log.Errorf(ctx, "error enriching remote account: %v", err)  			return  		} -		// This account was updated, re-dereference account featured posts. -		if err := d.dereferenceAccountFeatured(ctx, requestUser, latest); err != nil { -			log.Errorf(ctx, "error fetching account featured collection: %v", err) +		if apubAcc != nil { +			// This account was updated, enqueue re-dereference featured posts. +			d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) { +				if err := d.dereferenceAccountFeatured(ctx, requestUser, latest); err != nil { +					log.Errorf(ctx, "error fetching account featured collection: %v", err) +				} +			})  		}  	})  } -// enrichAccount will enrich the given account, whether a new barebones model, or existing model from the database. It handles necessary dereferencing, webfingering etc. -func (d *Dereferencer) enrichAccount(ctx context.Context, requestUser string, uri *url.URL, account *gtsmodel.Account, apubAcc ap.Accountable) (*gtsmodel.Account, ap.Accountable, error) { +// enrichAccountSafely wraps enrichAccount() to perform +// it within the State{}.FedLocks mutexmap, which protects +// dereferencing actions with per-URI mutex locks. +func (d *Dereferencer) enrichAccountSafely( +	ctx context.Context, +	requestUser string, +	uri *url.URL, +	account *gtsmodel.Account, +	apubAcc ap.Accountable, +) (*gtsmodel.Account, ap.Accountable, error) { +	// By default use account.URI +	// as the per-URI deref lock. +	uriStr := account.URI + +	if uriStr == "" { +		// No URI is set yet, instead generate a faux-one from user+domain. +		uriStr = "https://" + account.Domain + "/user/" + account.Username +	} + +	// Acquire per-URI deref lock, wraping unlock +	// to safely defer in case of panic, while still +	// performing more granular unlocks when needed. +	unlock := d.state.FedLocks.Lock(uriStr) +	unlock = doOnce(unlock) +	defer unlock() + +	// Perform status enrichment with passed vars. +	latest, apubAcc, err := d.enrichAccount(ctx, +		requestUser, +		uri, +		account, +		apubAcc, +	) + +	if gtserror.StatusCode(err) >= 400 { +		// Update fetch-at to slow re-attempts. +		account.FetchedAt = time.Now() +		_ = d.state.DB.UpdateAccount(ctx, account, "fetched_at") +	} + +	// Unlock now +	// we're done. +	unlock() + +	if errors.Is(err, db.ErrAlreadyExists) { +		// Ensure AP model isn't set, +		// otherwise this indicates WE +		// enriched the account. +		apubAcc = nil + +		// DATA RACE! We likely lost out to another goroutine +		// in a call to db.Put(Account). Look again in DB by URI. +		latest, err = d.state.DB.GetAccountByURI(ctx, account.URI) +		if err != nil { +			err = gtserror.Newf("error getting account %s from database after race: %w", uriStr, err) +		} +	} + +	return latest, apubAcc, err +} + +// enrichAccount will enrich the given account, whether a +// new barebones model, or existing model from the database. +// It handles necessary dereferencing, webfingering etc. +func (d *Dereferencer) enrichAccount( +	ctx context.Context, +	requestUser string, +	uri *url.URL, +	account *gtsmodel.Account, +	apubAcc ap.Accountable, +) (*gtsmodel.Account, ap.Accountable, error) {  	// Pre-fetch a transport for requesting username, used by later deref procedures.  	tsport, err := d.transportController.NewTransportForUsername(ctx, requestUser)  	if err != nil { @@ -476,13 +542,6 @@ func (d *Dereferencer) enrichAccount(ctx context.Context, requestUser string, ur  		// This is new, put it in the database.  		err := d.state.DB.PutAccount(ctx, latestAcc) - -		if errors.Is(err, db.ErrAlreadyExists) { -			// TODO: replace this quick fix with per-URI deref locks. -			latestAcc, err = d.state.DB.GetAccountByURI(ctx, latestAcc.URI) -			return latestAcc, nil, err -		} -  		if err != nil {  			return nil, nil, gtserror.Newf("error putting in database: %w", err)  		} @@ -545,7 +604,8 @@ func (d *Dereferencer) fetchRemoteAccountAvatar(ctx context.Context, tsport tran  	}  	// Acquire lock for derefs map. -	unlock := d.derefAvatarsMu.Lock() +	unlock := d.state.FedLocks.Lock(latestAcc.AvatarRemoteURL) +	unlock = doOnce(unlock)  	defer unlock()  	// Look for an existing dereference in progress. @@ -573,7 +633,7 @@ func (d *Dereferencer) fetchRemoteAccountAvatar(ctx context.Context, tsport tran  		defer func() {  			// On exit safely remove media from map. -			unlock := d.derefAvatarsMu.Lock() +			unlock := d.state.FedLocks.Lock(latestAcc.AvatarRemoteURL)  			delete(d.derefAvatars, latestAcc.AvatarRemoteURL)  			unlock()  		}() @@ -635,7 +695,8 @@ func (d *Dereferencer) fetchRemoteAccountHeader(ctx context.Context, tsport tran  	}  	// Acquire lock for derefs map. -	unlock := d.derefHeadersMu.Lock() +	unlock := d.state.FedLocks.Lock(latestAcc.HeaderRemoteURL) +	unlock = doOnce(unlock)  	defer unlock()  	// Look for an existing dereference in progress. @@ -663,7 +724,7 @@ func (d *Dereferencer) fetchRemoteAccountHeader(ctx context.Context, tsport tran  		defer func() {  			// On exit safely remove media from map. -			unlock := d.derefHeadersMu.Lock() +			unlock := d.state.FedLocks.Lock(latestAcc.HeaderRemoteURL)  			delete(d.derefHeaders, latestAcc.HeaderRemoteURL)  			unlock()  		}() diff --git a/internal/federation/dereferencing/dereferencer.go b/internal/federation/dereferencing/dereferencer.go index a5c68bd80..5bd16c1e0 100644 --- a/internal/federation/dereferencing/dereferencer.go +++ b/internal/federation/dereferencing/dereferencer.go @@ -21,7 +21,6 @@ import (  	"net/url"  	"sync" -	"codeberg.org/gruf/go-mutexes"  	"github.com/superseriousbusiness/gotosocial/internal/media"  	"github.com/superseriousbusiness/gotosocial/internal/state"  	"github.com/superseriousbusiness/gotosocial/internal/transport" @@ -35,14 +34,14 @@ type Dereferencer struct {  	converter           *typeutils.Converter  	transportController transport.Controller  	mediaManager        *media.Manager -	derefAvatars        map[string]*media.ProcessingMedia -	derefAvatarsMu      mutexes.Mutex -	derefHeaders        map[string]*media.ProcessingMedia -	derefHeadersMu      mutexes.Mutex -	derefEmojis         map[string]*media.ProcessingEmoji -	derefEmojisMu       mutexes.Mutex -	handshakes          map[string][]*url.URL -	handshakesMu        sync.Mutex // mutex to lock/unlock when checking or updating the handshakes map + +	// all protected by State{}.FedLocks. +	derefAvatars map[string]*media.ProcessingMedia +	derefHeaders map[string]*media.ProcessingMedia +	derefEmojis  map[string]*media.ProcessingEmoji + +	handshakes   map[string][]*url.URL +	handshakesMu sync.Mutex  }  // NewDereferencer returns a Dereferencer initialized with the given parameters. @@ -61,11 +60,5 @@ func NewDereferencer(  		derefHeaders:        make(map[string]*media.ProcessingMedia),  		derefEmojis:         make(map[string]*media.ProcessingEmoji),  		handshakes:          make(map[string][]*url.URL), - -		// use wrapped mutexes to allow safely deferring unlock -		// even when more granular locks are required (only unlocks once). -		derefAvatarsMu: mutexes.WithSafety(mutexes.New()), -		derefHeadersMu: mutexes.WithSafety(mutexes.New()), -		derefEmojisMu:  mutexes.WithSafety(mutexes.New()),  	}  } diff --git a/internal/federation/dereferencing/emoji.go b/internal/federation/dereferencing/emoji.go index 2d86da663..1bf19d2fd 100644 --- a/internal/federation/dereferencing/emoji.go +++ b/internal/federation/dereferencing/emoji.go @@ -36,8 +36,15 @@ func (d *Dereferencer) GetRemoteEmoji(ctx context.Context, requestingUsername st  		processingEmoji *media.ProcessingEmoji  	) +	// Ensure we have been passed a valid URL. +	derefURI, err := url.Parse(remoteURL) +	if err != nil { +		return nil, fmt.Errorf("GetRemoteEmoji: error parsing url for emoji %s: %s", shortcodeDomain, err) +	} +  	// Acquire lock for derefs map. -	unlock := d.derefEmojisMu.Lock() +	unlock := d.state.FedLocks.Lock(remoteURL) +	unlock = doOnce(unlock)  	defer unlock()  	// first check if we're already processing this emoji @@ -51,11 +58,6 @@ func (d *Dereferencer) GetRemoteEmoji(ctx context.Context, requestingUsername st  			return nil, fmt.Errorf("GetRemoteEmoji: error creating transport to fetch emoji %s: %s", shortcodeDomain, err)  		} -		derefURI, err := url.Parse(remoteURL) -		if err != nil { -			return nil, fmt.Errorf("GetRemoteEmoji: error parsing url for emoji %s: %s", shortcodeDomain, err) -		} -  		dataFunc := func(innerCtx context.Context) (io.ReadCloser, int64, error) {  			return t.DereferenceMedia(innerCtx, derefURI)  		} @@ -75,7 +77,7 @@ func (d *Dereferencer) GetRemoteEmoji(ctx context.Context, requestingUsername st  	defer func() {  		// On exit safely remove emoji from map. -		unlock := d.derefEmojisMu.Lock() +		unlock := d.state.FedLocks.Lock(remoteURL)  		delete(d.derefEmojis, shortcodeDomain)  		unlock()  	}() @@ -95,7 +97,6 @@ func (d *Dereferencer) populateEmojis(ctx context.Context, rawEmojis []*gtsmodel  	// * the shortcode of the emoji  	// * the remote URL of the image  	// This should be enough to dereference the emoji -  	gotEmojis := make([]*gtsmodel.Emoji, 0, len(rawEmojis))  	for _, e := range rawEmojis { diff --git a/internal/federation/dereferencing/status.go b/internal/federation/dereferencing/status.go index adc73e843..712692814 100644 --- a/internal/federation/dereferencing/status.go +++ b/internal/federation/dereferencing/status.go @@ -115,7 +115,7 @@ func (d *Dereferencer) getStatusByURI(ctx context.Context, requestUser string, u  		}  		// Create and pass-through a new bare-bones model for deref. -		return d.enrichStatus(ctx, requestUser, uri, >smodel.Status{ +		return d.enrichStatusSafely(ctx, requestUser, uri, >smodel.Status{  			Local: func() *bool { var false bool; return &false }(),  			URI:   uriStr,  		}, nil) @@ -131,7 +131,7 @@ func (d *Dereferencer) getStatusByURI(ctx context.Context, requestUser string, u  	}  	// Try to update + deref existing status model. -	latest, apubStatus, err := d.enrichStatus(ctx, +	latest, apubStatus, err := d.enrichStatusSafely(ctx,  		requestUser,  		uri,  		status, @@ -140,10 +140,6 @@ func (d *Dereferencer) getStatusByURI(ctx context.Context, requestUser string, u  	if err != nil {  		log.Errorf(ctx, "error enriching remote status: %v", err) -		// Update fetch-at to slow re-attempts. -		status.FetchedAt = time.Now() -		_ = d.state.DB.UpdateStatus(ctx, status, "fetched_at") -  		// Fallback to existing.  		return status, nil, nil  	} @@ -166,8 +162,8 @@ func (d *Dereferencer) RefreshStatus(ctx context.Context, requestUser string, st  		return nil, nil, gtserror.Newf("invalid status uri %q: %w", status.URI, err)  	} -	// Try to update + deref existing status model. -	latest, apubStatus, err := d.enrichStatus(ctx, +	// Try to update + deref the passed status model. +	latest, apubStatus, err := d.enrichStatusSafely(ctx,  		requestUser,  		uri,  		status, @@ -189,7 +185,7 @@ func (d *Dereferencer) RefreshStatus(ctx context.Context, requestUser string, st  // This is a more optimized form of manually enqueueing .UpdateStatus() to the federation worker, since it only enqueues update if necessary.  func (d *Dereferencer) RefreshStatusAsync(ctx context.Context, requestUser string, status *gtsmodel.Status, apubStatus ap.Statusable, force bool) {  	// Check whether needs update. -	if statusUpToDate(status) { +	if !force && statusUpToDate(status) {  		return  	} @@ -202,17 +198,81 @@ func (d *Dereferencer) RefreshStatusAsync(ctx context.Context, requestUser strin  	// Enqueue a worker function to re-fetch this status async.  	d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) { -		latest, apubStatus, err := d.enrichStatus(ctx, requestUser, uri, status, apubStatus) +		latest, apubStatus, err := d.enrichStatusSafely(ctx, requestUser, uri, status, apubStatus)  		if err != nil {  			log.Errorf(ctx, "error enriching remote status: %v", err)  			return  		} -		// This status was updated, re-dereference the whole thread. -		d.dereferenceThread(ctx, requestUser, uri, latest, apubStatus) +		if apubStatus != nil { +			// This status was updated, re-dereference the whole thread. +			d.dereferenceThread(ctx, requestUser, uri, latest, apubStatus) +		}  	})  } +// enrichStatusSafely wraps enrichStatus() to perform +// it within the State{}.FedLocks mutexmap, which protects +// dereferencing actions with per-URI mutex locks. +func (d *Dereferencer) enrichStatusSafely( +	ctx context.Context, +	requestUser string, +	uri *url.URL, +	status *gtsmodel.Status, +	apubStatus ap.Statusable, +) (*gtsmodel.Status, ap.Statusable, error) { +	uriStr := status.URI + +	if status.ID != "" { +		// This is an existing status, first try to populate it. This +		// is required by the checks below for existing tags, media etc. +		if err := d.state.DB.PopulateStatus(ctx, status); err != nil { +			log.Errorf(ctx, "error populating existing status %s: %v", uriStr, err) +		} +	} + +	// Acquire per-URI deref lock, wraping unlock +	// to safely defer in case of panic, while still +	// performing more granular unlocks when needed. +	unlock := d.state.FedLocks.Lock(uriStr) +	unlock = doOnce(unlock) +	defer unlock() + +	// Perform status enrichment with passed vars. +	latest, apubStatus, err := d.enrichStatus(ctx, +		requestUser, +		uri, +		status, +		apubStatus, +	) + +	if gtserror.StatusCode(err) >= 400 { +		// Update fetch-at to slow re-attempts. +		status.FetchedAt = time.Now() +		_ = d.state.DB.UpdateStatus(ctx, status, "fetched_at") +	} + +	// Unlock now +	// we're done. +	unlock() + +	if errors.Is(err, db.ErrAlreadyExists) { +		// Ensure AP model isn't set, +		// otherwise this indicates WE +		// enriched the status. +		apubStatus = nil + +		// DATA RACE! We likely lost out to another goroutine +		// in a call to db.Put(Status). Look again in DB by URI. +		latest, err = d.state.DB.GetStatusByURI(ctx, status.URI) +		if err != nil { +			err = gtserror.Newf("error getting status %s from database after race: %w", uriStr, err) +		} +	} + +	return latest, apubStatus, err +} +  // enrichStatus will enrich the given status, whether a new  // barebones model, or existing model from the database.  // It handles necessary dereferencing, database updates, etc. @@ -258,15 +318,10 @@ func (d *Dereferencer) enrichStatus(  		return nil, nil, gtserror.New("attributedTo was empty")  	} -	// Ensure we have the author account of the status dereferenced (+ up-to-date). -	if author, _, err := d.getAccountByURI(ctx, requestUser, attributedTo); err != nil { -		if status.AccountID == "" { -			// Provided status account is nil, i.e. this is a new status / author, so a deref fail is unrecoverable. -			return nil, nil, gtserror.Newf("failed to dereference status author %s: %w", uri, err) -		} -	} else if status.AccountID != "" && status.AccountID != author.ID { -		// There already existed an account for this status author, but account ID changed. This shouldn't happen! -		log.Warnf(ctx, "status author account ID changed: old=%s new=%s", status.AccountID, author.ID) +	// Ensure we have the author account of the status dereferenced (+ up-to-date). If this is a new status +	// (i.e. status.AccountID == "") then any error here is irrecoverable. AccountID must ALWAYS be set. +	if _, _, err := d.getAccountByURI(ctx, requestUser, attributedTo); err != nil && status.AccountID == "" { +		return nil, nil, gtserror.Newf("failed to dereference status author %s: %w", uri, err)  	}  	// ActivityPub model was recently dereferenced, so assume that passed status @@ -303,7 +358,7 @@ func (d *Dereferencer) enrichStatus(  	}  	// Ensure the status' tags are populated, (changes are expected / okay). -	if err := d.fetchStatusTags(ctx, latestStatus); err != nil { +	if err := d.fetchStatusTags(ctx, status, latestStatus); err != nil {  		return nil, nil, gtserror.Newf("error populating tags for status %s: %w", uri, err)  	} @@ -323,13 +378,6 @@ func (d *Dereferencer) enrichStatus(  		//  		// This is new, put the status in the database.  		err := d.state.DB.PutStatus(ctx, latestStatus) - -		if errors.Is(err, db.ErrAlreadyExists) { -			// TODO: replace this quick fix with per-URI deref locks. -			latestStatus, err = d.state.DB.GetStatusByURI(ctx, latestStatus.URI) -			return latestStatus, nil, err -		} -  		if err != nil {  			return nil, nil, gtserror.Newf("error putting in database: %w", err)  		} @@ -545,36 +593,41 @@ func (d *Dereferencer) threadStatus(ctx context.Context, status *gtsmodel.Status  	return nil  } -func (d *Dereferencer) fetchStatusTags(ctx context.Context, status *gtsmodel.Status) error { +func (d *Dereferencer) fetchStatusTags(ctx context.Context, existing, status *gtsmodel.Status) error {  	// Allocate new slice to take the yet-to-be determined tag IDs.  	status.TagIDs = make([]string, len(status.Tags))  	for i := range status.Tags { -		placeholder := status.Tags[i] +		tag := status.Tags[i] + +		// Look for tag in existing status with name. +		existing, ok := existing.GetTagByName(tag.Name) +		if ok && existing.ID != "" { +			status.Tags[i] = existing +			status.TagIDs[i] = existing.ID +			continue +		} -		// Look for existing tag with this name first. -		tag, err := d.state.DB.GetTagByName(ctx, placeholder.Name) +		// Look for existing tag with name in the database. +		existing, err := d.state.DB.GetTagByName(ctx, tag.Name)  		if err != nil && !errors.Is(err, db.ErrNoEntries) { -			log.Errorf(ctx, "db error getting tag %s: %v", tag.Name, err) +			return gtserror.Newf("db error getting tag %s: %w", tag.Name, err) +		} else if existing != nil { +			status.Tags[i] = existing +			status.TagIDs[i] = existing.ID  			continue  		} -		if tag == nil { -			// Create new ID for tag name. -			tag = >smodel.Tag{ -				ID:   id.NewULID(), -				Name: placeholder.Name, -			} +		// Create new ID for tag. +		tag.ID = id.NewULID() -			// Insert this tag with new name into the database. -			if err := d.state.DB.PutTag(ctx, tag); err != nil { -				log.Errorf(ctx, "db error putting tag %s: %v", tag.Name, err) -				continue -			} +		// Insert this tag with new name into the database. +		if err := d.state.DB.PutTag(ctx, tag); err != nil { +			log.Errorf(ctx, "db error putting tag %s: %v", tag.Name, err) +			continue  		} -		// Set the *new* tag and ID. -		status.Tags[i] = tag +		// Set new tag ID in slice.  		status.TagIDs[i] = tag.ID  	} @@ -600,10 +653,10 @@ func (d *Dereferencer) fetchStatusAttachments(ctx context.Context, tsport transp  	status.AttachmentIDs = make([]string, len(status.Attachments))  	for i := range status.Attachments { -		placeholder := status.Attachments[i] +		attachment := status.Attachments[i]  		// Look for existing media attachment with remoet URL first. -		existing, ok := existing.GetAttachmentByRemoteURL(placeholder.RemoteURL) +		existing, ok := existing.GetAttachmentByRemoteURL(attachment.RemoteURL)  		if ok && existing.ID != "" && *existing.Cached {  			status.Attachments[i] = existing  			status.AttachmentIDs[i] = existing.ID @@ -611,9 +664,9 @@ func (d *Dereferencer) fetchStatusAttachments(ctx context.Context, tsport transp  		}  		// Ensure a valid media attachment remote URL. -		remoteURL, err := url.Parse(placeholder.RemoteURL) +		remoteURL, err := url.Parse(attachment.RemoteURL)  		if err != nil { -			log.Errorf(ctx, "invalid remote media url %q: %v", placeholder.RemoteURL, err) +			log.Errorf(ctx, "invalid remote media url %q: %v", attachment.RemoteURL, err)  			continue  		} @@ -622,9 +675,9 @@ func (d *Dereferencer) fetchStatusAttachments(ctx context.Context, tsport transp  			return tsport.DereferenceMedia(ctx, remoteURL)  		}, status.AccountID, &media.AdditionalMediaInfo{  			StatusID:    &status.ID, -			RemoteURL:   &placeholder.RemoteURL, -			Description: &placeholder.Description, -			Blurhash:    &placeholder.Blurhash, +			RemoteURL:   &attachment.RemoteURL, +			Description: &attachment.Description, +			Blurhash:    &attachment.Blurhash,  		})  		if err != nil {  			log.Errorf(ctx, "error processing attachment: %v", err) @@ -632,15 +685,15 @@ func (d *Dereferencer) fetchStatusAttachments(ctx context.Context, tsport transp  		}  		// Force attachment loading *right now*. -		media, err := processing.LoadAttachment(ctx) +		attachment, err = processing.LoadAttachment(ctx)  		if err != nil {  			log.Errorf(ctx, "error loading attachment: %v", err)  			continue  		}  		// Set the *new* attachment and ID. -		status.Attachments[i] = media -		status.AttachmentIDs[i] = media.ID +		status.Attachments[i] = attachment +		status.AttachmentIDs[i] = attachment.ID  	}  	for i := 0; i < len(status.AttachmentIDs); { diff --git a/internal/federation/dereferencing/error.go b/internal/federation/dereferencing/util.go index 6a1ce0a6e..e69aeec3b 100644 --- a/internal/federation/dereferencing/error.go +++ b/internal/federation/dereferencing/util.go @@ -16,3 +16,14 @@  // along with this program.  If not, see <http://www.gnu.org/licenses/>.  package dereferencing + +// doOnce wraps a function to only perform it once. +func doOnce(fn func()) func() { +	var once int32 +	return func() { +		if once == 0 { +			fn() +			once = 1 +		} +	} +} diff --git a/internal/federation/federatingdb/db.go b/internal/federation/federatingdb/db.go index c412ba3f8..8e98dc2f2 100644 --- a/internal/federation/federatingdb/db.go +++ b/internal/federation/federatingdb/db.go @@ -20,7 +20,6 @@ package federatingdb  import (  	"context" -	"codeberg.org/gruf/go-mutexes"  	"github.com/superseriousbusiness/activity/pub"  	"github.com/superseriousbusiness/activity/streams/vocab"  	"github.com/superseriousbusiness/gotosocial/internal/state" @@ -40,7 +39,6 @@ type DB interface {  // FederatingDB uses the underlying DB interface to implement the go-fed pub.Database interface.  // It doesn't care what the underlying implementation of the DB interface is, as long as it works.  type federatingDB struct { -	locks     mutexes.MutexMap  	state     *state.State  	converter *typeutils.Converter  } @@ -48,7 +46,6 @@ type federatingDB struct {  // New returns a DB interface using the given database and config  func New(state *state.State, converter *typeutils.Converter) DB {  	fdb := federatingDB{ -		locks:     mutexes.NewMap(-1, -1), // use defaults  		state:     state,  		converter: converter,  	} diff --git a/internal/federation/federatingdb/lock.go b/internal/federation/federatingdb/lock.go index 900a282af..5353aea91 100644 --- a/internal/federation/federatingdb/lock.go +++ b/internal/federation/federatingdb/lock.go @@ -19,7 +19,6 @@ package federatingdb  import (  	"context" -	"errors"  	"net/url"  ) @@ -35,9 +34,5 @@ import (  //  // Used to ensure race conditions in multiple requests do not occur.  func (f *federatingDB) Lock(c context.Context, id *url.URL) (func(), error) { -	if id == nil { -		return nil, errors.New("Lock: id was nil") -	} -	unlock := f.locks.Lock(id.String()) -	return unlock, nil +	return f.state.FedLocks.Lock("federatingDB " + id.String()), nil // id should NEVER be nil.  } diff --git a/internal/gtsmodel/status.go b/internal/gtsmodel/status.go index fe8aa4a7b..81b5b029d 100644 --- a/internal/gtsmodel/status.go +++ b/internal/gtsmodel/status.go @@ -18,11 +18,8 @@  package gtsmodel  import ( -	"time" -  	"slices" - -	"github.com/superseriousbusiness/gotosocial/internal/log" +	"time"  )  // Status represents a user-created 'post' or 'status' in the database, either remote or local @@ -91,40 +88,14 @@ func (s *Status) GetBoostOfAccountID() string {  	return s.BoostOfAccountID  } -func (s *Status) GetAttachmentByID(id string) (*MediaAttachment, bool) { -	for _, media := range s.Attachments { -		if media == nil { -			log.Warnf(nil, "nil attachment in slice for status %s", s.URI) -			continue -		} -		if media.ID == id { -			return media, true -		} -	} -	return nil, false -} - -func (s *Status) GetAttachmentByRemoteURL(url string) (*MediaAttachment, bool) { -	for _, media := range s.Attachments { -		if media == nil { -			log.Warnf(nil, "nil attachment in slice for status %s", s.URI) -			continue -		} -		if media.RemoteURL == url { -			return media, true -		} -	} -	return nil, false -} -  // AttachmentsPopulated returns whether media attachments are populated according to current AttachmentIDs.  func (s *Status) AttachmentsPopulated() bool {  	if len(s.AttachmentIDs) != len(s.Attachments) {  		// this is the quickest indicator.  		return false  	} -	for _, id := range s.AttachmentIDs { -		if _, ok := s.GetAttachmentByID(id); !ok { +	for i, id := range s.AttachmentIDs { +		if s.Attachments[i].ID != id {  			return false  		}  	} @@ -137,55 +108,22 @@ func (s *Status) TagsPopulated() bool {  		// this is the quickest indicator.  		return false  	} - -	// Tags must be in same order.  	for i, id := range s.TagIDs { -		if s.Tags[i] == nil { -			log.Warnf(nil, "nil tag in slice for status %s", s.URI) -			continue -		}  		if s.Tags[i].ID != id {  			return false  		}  	} -  	return true  } -func (s *Status) GetMentionByID(id string) (*Mention, bool) { -	for _, mention := range s.Mentions { -		if mention == nil { -			log.Warnf(nil, "nil mention in slice for status %s", s.URI) -			continue -		} -		if mention.ID == id { -			return mention, true -		} -	} -	return nil, false -} - -func (s *Status) GetMentionByTargetURI(uri string) (*Mention, bool) { -	for _, mention := range s.Mentions { -		if mention == nil { -			log.Warnf(nil, "nil mention in slice for status %s", s.URI) -			continue -		} -		if mention.TargetAccountURI == uri { -			return mention, true -		} -	} -	return nil, false -} -  // MentionsPopulated returns whether mentions are populated according to current MentionIDs.  func (s *Status) MentionsPopulated() bool {  	if len(s.MentionIDs) != len(s.Mentions) {  		// this is the quickest indicator.  		return false  	} -	for _, id := range s.MentionIDs { -		if _, ok := s.GetMentionByID(id); !ok { +	for i, id := range s.MentionIDs { +		if s.Mentions[i].ID != id {  			return false  		}  	} @@ -198,18 +136,11 @@ func (s *Status) EmojisPopulated() bool {  		// this is the quickest indicator.  		return false  	} - -	// Emojis must be in same order.  	for i, id := range s.EmojiIDs { -		if s.Emojis[i] == nil { -			log.Warnf(nil, "nil emoji in slice for status %s", s.URI) -			continue -		}  		if s.Emojis[i].ID != id {  			return false  		}  	} -  	return true  } @@ -221,26 +152,42 @@ func (s *Status) EmojisUpToDate(other *Status) bool {  		// this is the quickest indicator.  		return false  	} - -	// Emojis must be in same order.  	for i := range s.Emojis { -		if s.Emojis[i] == nil { -			log.Warnf(nil, "nil emoji in slice for status %s", s.URI) +		if s.Emojis[i].URI != other.Emojis[i].URI {  			return false  		} +	} +	return true +} -		if other.Emojis[i] == nil { -			log.Warnf(nil, "nil emoji in slice for status %s", other.URI) -			return false +// GetAttachmentByRemoteURL searches status for MediaAttachment{} with remote URL. +func (s *Status) GetAttachmentByRemoteURL(url string) (*MediaAttachment, bool) { +	for _, media := range s.Attachments { +		if media.RemoteURL == url { +			return media, true  		} +	} +	return nil, false +} -		if s.Emojis[i].URI != other.Emojis[i].URI { -			// Emoji URI has changed, not up-to-date! -			return false +// GetMentionByTargetURI searches status for Mention{} with target URI. +func (s *Status) GetMentionByTargetURI(uri string) (*Mention, bool) { +	for _, mention := range s.Mentions { +		if mention.TargetAccountURI == uri { +			return mention, true  		}  	} +	return nil, false +} -	return true +// GetTagByName searches status for Tag{} with name. +func (s *Status) GetTagByName(name string) (*Tag, bool) { +	for _, tag := range s.Tags { +		if tag.Name == name { +			return tag, true +		} +	} +	return nil, false  }  // MentionsAccount returns whether status mentions the given account ID. diff --git a/internal/state/state.go b/internal/state/state.go index 6ff1baa52..7cd0406b0 100644 --- a/internal/state/state.go +++ b/internal/state/state.go @@ -18,6 +18,7 @@  package state  import ( +	"codeberg.org/gruf/go-mutexes"  	"github.com/superseriousbusiness/gotosocial/internal/cache"  	"github.com/superseriousbusiness/gotosocial/internal/db"  	"github.com/superseriousbusiness/gotosocial/internal/storage" @@ -41,6 +42,11 @@ type State struct {  	// DB provides access to the database.  	DB db.DB +	// FedLocks provides access to this state's mutex map +	// of per URI federation locks. Used during dereferencing +	// and by the go-fed/activity library. +	FedLocks mutexes.MutexMap +  	// Storage provides access to the storage driver.  	Storage *storage.Driver  | 
