diff options
55 files changed, 1484 insertions, 1050 deletions
| diff --git a/cmd/gotosocial/action/server/server.go b/cmd/gotosocial/action/server/server.go index b33688800..e4158ea88 100644 --- a/cmd/gotosocial/action/server/server.go +++ b/cmd/gotosocial/action/server/server.go @@ -116,7 +116,7 @@ var Start action.GTSAction = func(ctx context.Context) error {  	typeConverter := typeutils.NewConverter(dbService)  	federatingDB := federatingdb.New(&state, typeConverter)  	transportController := transport.NewController(&state, federatingDB, &federation.Clock{}, client) -	federator := federation.NewFederator(dbService, federatingDB, transportController, typeConverter, mediaManager) +	federator := federation.NewFederator(&state, federatingDB, transportController, typeConverter, mediaManager)  	// decide whether to create a noop email sender (won't send emails) or a real one  	var emailSender email.Sender diff --git a/internal/cache/cache.go b/internal/cache/cache.go index e981d79b1..510b6eb53 100644 --- a/internal/cache/cache.go +++ b/internal/cache/cache.go @@ -19,6 +19,7 @@ package cache  import (  	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" +	"github.com/superseriousbusiness/gotosocial/internal/log"  )  type Caches struct { @@ -41,6 +42,8 @@ type Caches struct {  // Init will (re)initialize both the GTS and AP cache collections.  // NOTE: the cache MUST NOT be in use anywhere, this is not thread-safe.  func (c *Caches) Init() { +	log.Infof(nil, "init: %p", c) +  	c.GTS.Init()  	c.AP.Init()  	c.Visibility.Init() @@ -52,6 +55,8 @@ func (c *Caches) Init() {  // Start will start both the GTS and AP cache collections.  func (c *Caches) Start() { +	log.Infof(nil, "start: %p", c) +  	c.GTS.Start()  	c.AP.Start()  	c.Visibility.Start() @@ -59,6 +64,8 @@ func (c *Caches) Start() {  // Stop will stop both the GTS and AP cache collections.  func (c *Caches) Stop() { +	log.Infof(nil, "stop: %p", c) +  	c.GTS.Stop()  	c.AP.Stop()  	c.Visibility.Stop() diff --git a/internal/cache/util.go b/internal/cache/util.go index ba4a43fdd..2d7badf68 100644 --- a/internal/cache/util.go +++ b/internal/cache/util.go @@ -74,8 +74,14 @@ func tryStop[ValueType any](cache *result.Cache[ValueType], sweep time.Duration)  func tryUntil(msg string, count int, do func() bool) {  	for i := 0; i < count; i++ {  		if do() { +			// success.  			return  		} + +		// Sleep for a little before retry (a bcakoff). +		time.Sleep(time.Millisecond * 1 << (i + 1))  	} + +	// panic on total failure as this shouldn't happen.  	log.Panicf(nil, "failed %s after %d tries", msg, count)  } diff --git a/internal/db/bundb/account.go b/internal/db/bundb/account.go index 56d46a232..f7e243f47 100644 --- a/internal/db/bundb/account.go +++ b/internal/db/bundb/account.go @@ -302,7 +302,7 @@ func (a *accountDB) UpdateAccount(ctx context.Context, account *gtsmodel.Account  		columns = append(columns, "updated_at")  	} -	err := a.state.Caches.GTS.Account().Store(account, func() error { +	return a.state.Caches.GTS.Account().Store(account, func() error {  		// It is safe to run this database transaction within cache.Store  		// as the cache does not attempt a mutex lock until AFTER hook.  		// @@ -338,15 +338,23 @@ func (a *accountDB) UpdateAccount(ctx context.Context, account *gtsmodel.Account  			return err  		})  	}) -	if err != nil { -		return err -	} - -	return nil  }  func (a *accountDB) DeleteAccount(ctx context.Context, id string) db.Error { -	if err := a.conn.RunInTx(ctx, func(tx bun.Tx) error { +	defer a.state.Caches.GTS.Account().Invalidate("ID", id) + +	// Load account into cache before attempting a delete, +	// as we need it cached in order to trigger the invalidate +	// callback. This in turn invalidates others. +	_, err := a.GetAccountByID(gtscontext.SetBarebones(ctx), id) +	if err != nil && !errors.Is(err, db.ErrNoEntries) { +		// NOTE: even if db.ErrNoEntries is returned, we +		// still run the below transaction to ensure related +		// objects are appropriately deleted. +		return err +	} + +	return a.conn.RunInTx(ctx, func(tx bun.Tx) error {  		// clear out any emoji links  		if _, err := tx.  			NewDelete(). @@ -363,14 +371,7 @@ func (a *accountDB) DeleteAccount(ctx context.Context, id string) db.Error {  			Where("? = ?", bun.Ident("account.id"), id).  			Exec(ctx)  		return err -	}); err != nil { -		return err -	} - -	// Invalidate account from database lookups. -	a.state.Caches.GTS.Account().Invalidate("ID", id) - -	return nil +	})  }  func (a *accountDB) GetAccountLastPosted(ctx context.Context, accountID string, webOnly bool) (time.Time, db.Error) { diff --git a/internal/db/bundb/bundb_test.go b/internal/db/bundb/bundb_test.go index e6d482ac1..2566be2ba 100644 --- a/internal/db/bundb/bundb_test.go +++ b/internal/db/bundb/bundb_test.go @@ -66,9 +66,9 @@ func (suite *BunDBStandardTestSuite) SetupSuite() {  }  func (suite *BunDBStandardTestSuite) SetupTest() { -	suite.state.Caches.Init()  	testrig.InitTestConfig()  	testrig.InitTestLog() +	suite.state.Caches.Init()  	suite.db = testrig.NewTestDB(&suite.state)  	testrig.StandardDBSetup(suite.db, suite.testAccounts)  } diff --git a/internal/db/bundb/emoji.go b/internal/db/bundb/emoji.go index 0c72be9d3..60b8fc12b 100644 --- a/internal/db/bundb/emoji.go +++ b/internal/db/bundb/emoji.go @@ -19,10 +19,12 @@ package bundb  import (  	"context" +	"errors"  	"strings"  	"time"  	"github.com/superseriousbusiness/gotosocial/internal/db" +	"github.com/superseriousbusiness/gotosocial/internal/gtscontext"  	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"  	"github.com/superseriousbusiness/gotosocial/internal/log"  	"github.com/superseriousbusiness/gotosocial/internal/state" @@ -56,24 +58,46 @@ func (e *emojiDB) PutEmoji(ctx context.Context, emoji *gtsmodel.Emoji) db.Error  }  func (e *emojiDB) UpdateEmoji(ctx context.Context, emoji *gtsmodel.Emoji, columns ...string) (*gtsmodel.Emoji, db.Error) { -	// Update the emoji's last-updated  	emoji.UpdatedAt = time.Now() +	if len(columns) > 0 { +		// If we're updating by column, ensure "updated_at" is included. +		columns = append(columns, "updated_at") +	} -	if _, err := e.conn. -		NewUpdate(). -		Model(emoji). -		Where("? = ?", bun.Ident("emoji.id"), emoji.ID). -		Column(columns...). -		Exec(ctx); err != nil { -		return nil, e.conn.ProcessError(err) +	err := e.state.Caches.GTS.Emoji().Store(emoji, func() error { +		_, err := e.conn. +			NewUpdate(). +			Model(emoji). +			Where("? = ?", bun.Ident("emoji.id"), emoji.ID). +			Column(columns...). +			Exec(ctx) +		return e.conn.ProcessError(err) +	}) +	if err != nil { +		return nil, err  	} -	e.state.Caches.GTS.Emoji().Invalidate("ID", emoji.ID)  	return emoji, nil  }  func (e *emojiDB) DeleteEmojiByID(ctx context.Context, id string) db.Error { -	if err := e.conn.RunInTx(ctx, func(tx bun.Tx) error { +	defer e.state.Caches.GTS.Emoji().Invalidate("ID", id) + +	// Load emoji into cache before attempting a delete, +	// as we need it cached in order to trigger the invalidate +	// callback. This in turn invalidates others. +	_, err := e.GetEmojiByID( +		gtscontext.SetBarebones(ctx), +		id, +	) +	if err != nil && !errors.Is(err, db.ErrNoEntries) { +		// NOTE: even if db.ErrNoEntries is returned, we +		// still run the below transaction to ensure related +		// objects are appropriately deleted. +		return err +	} + +	return e.conn.RunInTx(ctx, func(tx bun.Tx) error {  		// delete links between this emoji and any statuses that use it  		if _, err := tx.  			NewDelete(). @@ -101,12 +125,7 @@ func (e *emojiDB) DeleteEmojiByID(ctx context.Context, id string) db.Error {  		}  		return nil -	}); err != nil { -		return err -	} - -	e.state.Caches.GTS.Emoji().Invalidate("ID", id) -	return nil +	})  }  func (e *emojiDB) GetEmojis(ctx context.Context, domain string, includeDisabled bool, includeEnabled bool, shortcode string, maxShortcodeDomain string, minShortcodeDomain string, limit int) ([]*gtsmodel.Emoji, db.Error) { diff --git a/internal/db/bundb/errors.go b/internal/db/bundb/errors.go index 8de42b6aa..6236b82d8 100644 --- a/internal/db/bundb/errors.go +++ b/internal/db/bundb/errors.go @@ -52,7 +52,8 @@ func processSQLiteError(err error) db.Error {  	// Handle supplied error code:  	switch sqliteErr.Code() { -	case sqlite3.SQLITE_CONSTRAINT_UNIQUE, sqlite3.SQLITE_CONSTRAINT_PRIMARYKEY: +	case sqlite3.SQLITE_CONSTRAINT_UNIQUE, +		sqlite3.SQLITE_CONSTRAINT_PRIMARYKEY:  		return db.ErrAlreadyExists  	default:  		return err diff --git a/internal/db/bundb/media.go b/internal/db/bundb/media.go index d17d64b35..b64447beb 100644 --- a/internal/db/bundb/media.go +++ b/internal/db/bundb/media.go @@ -19,9 +19,11 @@ package bundb  import (  	"context" +	"errors"  	"time"  	"github.com/superseriousbusiness/gotosocial/internal/db" +	"github.com/superseriousbusiness/gotosocial/internal/gtscontext"  	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"  	"github.com/superseriousbusiness/gotosocial/internal/log"  	"github.com/superseriousbusiness/gotosocial/internal/state" @@ -103,17 +105,26 @@ func (m *mediaDB) UpdateAttachment(ctx context.Context, media *gtsmodel.MediaAtt  }  func (m *mediaDB) DeleteAttachment(ctx context.Context, id string) error { -	// Attempt to delete from database. -	if _, err := m.conn.NewDelete(). -		TableExpr("? AS ?", bun.Ident("media_attachments"), bun.Ident("media_attachment")). -		Where("? = ?", bun.Ident("media_attachment.id"), id). -		Exec(ctx); err != nil { -		return m.conn.ProcessError(err) +	defer m.state.Caches.GTS.Media().Invalidate("ID", id) + +	// Load media into cache before attempting a delete, +	// as we need it cached in order to trigger the invalidate +	// callback. This in turn invalidates others. +	_, err := m.GetAttachmentByID(gtscontext.SetBarebones(ctx), id) +	if err != nil { +		if errors.Is(err, db.ErrNoEntries) { +			// not an issue. +			err = nil +		} +		return err  	} -	// Invalidate this media item from the cache. -	m.state.Caches.GTS.Media().Invalidate("ID", id) -	return nil +	// Finally delete media from DB. +	_, err = m.conn.NewDelete(). +		TableExpr("? AS ?", bun.Ident("media_attachments"), bun.Ident("media_attachment")). +		Where("? = ?", bun.Ident("media_attachment.id"), id). +		Exec(ctx) +	return m.conn.ProcessError(err)  }  func (m *mediaDB) GetRemoteOlderThan(ctx context.Context, olderThan time.Time, limit int) ([]*gtsmodel.MediaAttachment, db.Error) { diff --git a/internal/db/bundb/mention.go b/internal/db/bundb/mention.go index e64d6dac4..9a41eb3b8 100644 --- a/internal/db/bundb/mention.go +++ b/internal/db/bundb/mention.go @@ -19,6 +19,7 @@ package bundb  import (  	"context" +	"errors"  	"fmt"  	"github.com/superseriousbusiness/gotosocial/internal/db" @@ -109,16 +110,24 @@ func (m *mentionDB) PutMention(ctx context.Context, mention *gtsmodel.Mention) e  }  func (m *mentionDB) DeleteMentionByID(ctx context.Context, id string) error { -	if _, err := m.conn. -		NewDelete(). -		Table("mentions"). -		Where("? = ?", bun.Ident("id"), id). -		Exec(ctx); err != nil { -		return m.conn.ProcessError(err) -	} +	defer m.state.Caches.GTS.Mention().Invalidate("ID", id) -	// Invalidate mention from the lookup cache. -	m.state.Caches.GTS.Mention().Invalidate("ID", id) +	// Load mention into cache before attempting a delete, +	// as we need it cached in order to trigger the invalidate +	// callback. This in turn invalidates others. +	_, err := m.GetMention(gtscontext.SetBarebones(ctx), id) +	if err != nil { +		if errors.Is(err, db.ErrNoEntries) { +			// not an issue. +			err = nil +		} +		return err +	} -	return nil +	// Finally delete mention from DB. +	_, err = m.conn.NewDelete(). +		Table("mentions"). +		Where("? = ?", bun.Ident("id"), id). +		Exec(ctx) +	return m.conn.ProcessError(err)  } diff --git a/internal/db/bundb/migrations/20230511181430_add_status_fetched_at.go b/internal/db/bundb/migrations/20230511181430_add_status_fetched_at.go new file mode 100644 index 000000000..cc41b518e --- /dev/null +++ b/internal/db/bundb/migrations/20230511181430_add_status_fetched_at.go @@ -0,0 +1,47 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program.  If not, see <http://www.gnu.org/licenses/>. + +package migrations + +import ( +	"context" +	"strings" + +	"github.com/uptrace/bun" +) + +func init() { +	up := func(ctx context.Context, db *bun.DB) error { +		return db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { +			_, err := tx.ExecContext(ctx, "ALTER TABLE ? ADD COLUMN ? TIMESTAMPTZ", bun.Ident("statuses"), bun.Ident("fetched_at")) +			if err != nil && !(strings.Contains(err.Error(), "already exists") || strings.Contains(err.Error(), "duplicate column name") || strings.Contains(err.Error(), "SQLSTATE 42701")) { +				return err +			} +			return nil +		}) +	} + +	down := func(ctx context.Context, db *bun.DB) error { +		return db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { +			return nil +		}) +	} + +	if err := Migrations.Register(up, down); err != nil { +		panic(err) +	} +} diff --git a/internal/db/bundb/notification.go b/internal/db/bundb/notification.go index f2ff60b9a..277a935fd 100644 --- a/internal/db/bundb/notification.go +++ b/internal/db/bundb/notification.go @@ -22,6 +22,7 @@ import (  	"errors"  	"github.com/superseriousbusiness/gotosocial/internal/db" +	"github.com/superseriousbusiness/gotosocial/internal/gtscontext"  	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"  	"github.com/superseriousbusiness/gotosocial/internal/id"  	"github.com/superseriousbusiness/gotosocial/internal/log" @@ -179,16 +180,26 @@ func (n *notificationDB) PutNotification(ctx context.Context, notif *gtsmodel.No  }  func (n *notificationDB) DeleteNotificationByID(ctx context.Context, id string) db.Error { -	if _, err := n.conn. -		NewDelete(). -		TableExpr("? AS ?", bun.Ident("notifications"), bun.Ident("notification")). -		Where("? = ?", bun.Ident("notification.id"), id). -		Exec(ctx); err != nil { -		return n.conn.ProcessError(err) +	defer n.state.Caches.GTS.Notification().Invalidate("ID", id) + +	// Load notif into cache before attempting a delete, +	// as we need it cached in order to trigger the invalidate +	// callback. This in turn invalidates others. +	_, err := n.GetNotificationByID(gtscontext.SetBarebones(ctx), id) +	if err != nil { +		if errors.Is(err, db.ErrNoEntries) { +			// not an issue. +			err = nil +		} +		return err  	} -	n.state.Caches.GTS.Notification().Invalidate("ID", id) -	return nil +	// Finally delete notif from DB. +	_, err = n.conn.NewDelete(). +		TableExpr("? AS ?", bun.Ident("notifications"), bun.Ident("notification")). +		Where("? = ?", bun.Ident("notification.id"), id). +		Exec(ctx) +	return n.conn.ProcessError(err)  }  func (n *notificationDB) DeleteNotifications(ctx context.Context, types []string, targetAccountID string, originAccountID string) db.Error { @@ -196,56 +207,88 @@ func (n *notificationDB) DeleteNotifications(ctx context.Context, types []string  		return errors.New("DeleteNotifications: one of targetAccountID or originAccountID must be set")  	} -	// Capture notification IDs in a RETURNING statement. -	var ids []string +	var notifIDs []string  	q := n.conn. -		NewDelete(). -		TableExpr("? AS ?", bun.Ident("notifications"), bun.Ident("notification")). -		Returning("?", bun.Ident("id")) +		NewSelect(). +		Column("id"). +		Table("notifications")  	if len(types) > 0 { -		q = q.Where("? IN (?)", bun.Ident("notification.notification_type"), bun.In(types)) +		q = q.Where("? IN (?)", bun.Ident("notification_type"), bun.In(types))  	}  	if targetAccountID != "" { -		q = q.Where("? = ?", bun.Ident("notification.target_account_id"), targetAccountID) +		q = q.Where("? = ?", bun.Ident("target_account_id"), targetAccountID)  	}  	if originAccountID != "" { -		q = q.Where("? = ?", bun.Ident("notification.origin_account_id"), originAccountID) +		q = q.Where("? = ?", bun.Ident("origin_account_id"), originAccountID)  	} -	if _, err := q.Exec(ctx, &ids); err != nil { +	if _, err := q.Exec(ctx, ¬ifIDs); err != nil {  		return n.conn.ProcessError(err)  	} -	// Invalidate each returned ID. -	for _, id := range ids { -		n.state.Caches.GTS.Notification().Invalidate("ID", id) +	defer func() { +		// Invalidate all IDs on return. +		for _, id := range notifIDs { +			n.state.Caches.GTS.Notification().Invalidate("ID", id) +		} +	}() + +	// Load all notif into cache, this *really* isn't great +	// but it is the only way we can ensure we invalidate all +	// related caches correctly (e.g. visibility). +	for _, id := range notifIDs { +		_, err := n.GetNotificationByID(ctx, id) +		if err != nil && !errors.Is(err, db.ErrNoEntries) { +			return err +		}  	} -	return nil +	// Finally delete all from DB. +	_, err := n.conn.NewDelete(). +		Table("notifications"). +		Where("? IN (?)", bun.Ident("id"), bun.In(notifIDs)). +		Exec(ctx) +	return n.conn.ProcessError(err)  }  func (n *notificationDB) DeleteNotificationsForStatus(ctx context.Context, statusID string) db.Error { -	// Capture notification IDs in a RETURNING statement. -	var ids []string +	var notifIDs []string  	q := n.conn. -		NewDelete(). -		TableExpr("? AS ?", bun.Ident("notifications"), bun.Ident("notification")). -		Where("? = ?", bun.Ident("notification.status_id"), statusID). -		Returning("?", bun.Ident("id")) +		NewSelect(). +		Column("id"). +		Table("notifications"). +		Where("? = ?", bun.Ident("status_id"), statusID) -	if _, err := q.Exec(ctx, &ids); err != nil { +	if _, err := q.Exec(ctx, ¬ifIDs); err != nil {  		return n.conn.ProcessError(err)  	} -	// Invalidate each returned ID. -	for _, id := range ids { -		n.state.Caches.GTS.Notification().Invalidate("ID", id) +	defer func() { +		// Invalidate all IDs on return. +		for _, id := range notifIDs { +			n.state.Caches.GTS.Notification().Invalidate("ID", id) +		} +	}() + +	// Load all notif into cache, this *really* isn't great +	// but it is the only way we can ensure we invalidate all +	// related caches correctly (e.g. visibility). +	for _, id := range notifIDs { +		_, err := n.GetNotificationByID(ctx, id) +		if err != nil && !errors.Is(err, db.ErrNoEntries) { +			return err +		}  	} -	return nil +	// Finally delete all from DB. +	_, err := n.conn.NewDelete(). +		Table("notifications"). +		Where("? IN (?)", bun.Ident("id"), bun.In(notifIDs)). +		Exec(ctx) +	return n.conn.ProcessError(err)  } diff --git a/internal/db/bundb/relationship_block.go b/internal/db/bundb/relationship_block.go index 9232ea984..fa68a2e97 100644 --- a/internal/db/bundb/relationship_block.go +++ b/internal/db/bundb/relationship_block.go @@ -25,7 +25,6 @@ import (  	"github.com/superseriousbusiness/gotosocial/internal/db"  	"github.com/superseriousbusiness/gotosocial/internal/gtscontext"  	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" -	"github.com/superseriousbusiness/gotosocial/internal/log"  	"github.com/uptrace/bun"  ) @@ -142,62 +141,65 @@ func (r *relationshipDB) getBlock(ctx context.Context, lookup string, dbQuery fu  }  func (r *relationshipDB) PutBlock(ctx context.Context, block *gtsmodel.Block) error { -	err := r.state.Caches.GTS.Block().Store(block, func() error { +	return r.state.Caches.GTS.Block().Store(block, func() error {  		_, err := r.conn.NewInsert().Model(block).Exec(ctx)  		return r.conn.ProcessError(err)  	}) -	if err != nil { -		return err -	} - -	// Invalidate block origin account ID cached visibility. -	r.state.Caches.Visibility.Invalidate("ItemID", block.AccountID) -	r.state.Caches.Visibility.Invalidate("RequesterID", block.AccountID) - -	// Invalidate block target account ID cached visibility. -	r.state.Caches.Visibility.Invalidate("ItemID", block.TargetAccountID) -	r.state.Caches.Visibility.Invalidate("RequesterID", block.TargetAccountID) - -	return nil  }  func (r *relationshipDB) DeleteBlockByID(ctx context.Context, id string) error { -	block, err := r.GetBlockByID(gtscontext.SetBarebones(ctx), id) +	defer r.state.Caches.GTS.Block().Invalidate("ID", id) + +	// Load block into cache before attempting a delete, +	// as we need it cached in order to trigger the invalidate +	// callback. This in turn invalidates others. +	_, err := r.GetBlockByID(gtscontext.SetBarebones(ctx), id)  	if err != nil { +		if errors.Is(err, db.ErrNoEntries) { +			// not an issue. +			err = nil +		}  		return err  	} -	return r.deleteBlock(ctx, block) + +	// Finally delete block from DB. +	_, err = r.conn.NewDelete(). +		Table("blocks"). +		Where("? = ?", bun.Ident("id"), id). +		Exec(ctx) +	return r.conn.ProcessError(err)  }  func (r *relationshipDB) DeleteBlockByURI(ctx context.Context, uri string) error { -	block, err := r.GetBlockByURI(gtscontext.SetBarebones(ctx), uri) +	defer r.state.Caches.GTS.Block().Invalidate("URI", uri) + +	// Load block into cache before attempting a delete, +	// as we need it cached in order to trigger the invalidate +	// callback. This in turn invalidates others. +	_, err := r.GetBlockByURI(gtscontext.SetBarebones(ctx), uri)  	if err != nil { +		if errors.Is(err, db.ErrNoEntries) { +			// not an issue. +			err = nil +		}  		return err  	} -	return r.deleteBlock(ctx, block) -} -func (r *relationshipDB) deleteBlock(ctx context.Context, block *gtsmodel.Block) error { -	if _, err := r.conn. -		NewDelete(). +	// Finally delete block from DB. +	_, err = r.conn.NewDelete().  		Table("blocks"). -		Where("? = ?", bun.Ident("id"), block.ID). -		Exec(ctx); err != nil { -		return r.conn.ProcessError(err) -	} - -	// Invalidate block from cache lookups. -	r.state.Caches.GTS.Block().Invalidate("ID", block.ID) - -	return nil +		Where("? = ?", bun.Ident("uri"), uri). +		Exec(ctx) +	return r.conn.ProcessError(err)  }  func (r *relationshipDB) DeleteAccountBlocks(ctx context.Context, accountID string) error {  	var blockIDs []string +	// Get full list of IDs.  	if err := r.conn.NewSelect(). +		Column("id").  		Table("blocks"). -		ColumnExpr("?", bun.Ident("id")).  		WhereOr("? = ? OR ? = ?",  			bun.Ident("account_id"),  			accountID, @@ -208,11 +210,27 @@ func (r *relationshipDB) DeleteAccountBlocks(ctx context.Context, accountID stri  		return r.conn.ProcessError(err)  	} +	defer func() { +		// Invalidate all IDs on return. +		for _, id := range blockIDs { +			r.state.Caches.GTS.Block().Invalidate("ID", id) +		} +	}() + +	// Load all blocks into cache, this *really* isn't great +	// but it is the only way we can ensure we invalidate all +	// related caches correctly (e.g. visibility).  	for _, id := range blockIDs { -		if err := r.DeleteBlockByID(ctx, id); err != nil { -			log.Errorf(ctx, "error deleting block %q: %v", id, err) +		_, err := r.GetBlockByID(ctx, id) +		if err != nil && !errors.Is(err, db.ErrNoEntries) { +			return err  		}  	} -	return nil +	// Finally delete all from DB. +	_, err := r.conn.NewDelete(). +		Table("blocks"). +		Where("? IN (?)", bun.Ident("id"), bun.In(blockIDs)). +		Exec(ctx) +	return r.conn.ProcessError(err)  } diff --git a/internal/db/bundb/relationship_follow.go b/internal/db/bundb/relationship_follow.go index 1b1de77b1..fe1f26bf1 100644 --- a/internal/db/bundb/relationship_follow.go +++ b/internal/db/bundb/relationship_follow.go @@ -171,23 +171,10 @@ func (r *relationshipDB) getFollow(ctx context.Context, lookup string, dbQuery f  }  func (r *relationshipDB) PutFollow(ctx context.Context, follow *gtsmodel.Follow) error { -	err := r.state.Caches.GTS.Follow().Store(follow, func() error { +	return r.state.Caches.GTS.Follow().Store(follow, func() error {  		_, err := r.conn.NewInsert().Model(follow).Exec(ctx)  		return r.conn.ProcessError(err)  	}) -	if err != nil { -		return err -	} - -	// Invalidate follow origin account ID cached visibility. -	r.state.Caches.Visibility.Invalidate("ItemID", follow.AccountID) -	r.state.Caches.Visibility.Invalidate("RequesterID", follow.AccountID) - -	// Invalidate follow target account ID cached visibility. -	r.state.Caches.Visibility.Invalidate("ItemID", follow.TargetAccountID) -	r.state.Caches.Visibility.Invalidate("RequesterID", follow.TargetAccountID) - -	return nil  }  func (r *relationshipDB) UpdateFollow(ctx context.Context, follow *gtsmodel.Follow, columns ...string) error { @@ -211,38 +198,58 @@ func (r *relationshipDB) UpdateFollow(ctx context.Context, follow *gtsmodel.Foll  }  func (r *relationshipDB) DeleteFollowByID(ctx context.Context, id string) error { -	if _, err := r.conn.NewDelete(). -		Table("follows"). -		Where("? = ?", bun.Ident("id"), id). -		Exec(ctx); err != nil { -		return r.conn.ProcessError(err) -	} +	defer r.state.Caches.GTS.Follow().Invalidate("ID", id) -	// Invalidate follow from cache lookups. -	r.state.Caches.GTS.Follow().Invalidate("ID", id) +	// Load follow into cache before attempting a delete, +	// as we need it cached in order to trigger the invalidate +	// callback. This in turn invalidates others. +	_, err := r.GetFollowByID(gtscontext.SetBarebones(ctx), id) +	if err != nil { +		if errors.Is(err, db.ErrNoEntries) { +			// not an issue. +			err = nil +		} +		return err +	} -	return nil +	// Finally delete follow from DB. +	_, err = r.conn.NewDelete(). +		Table("follows"). +		Where("? = ?", bun.Ident("id"), id). +		Exec(ctx) +	return r.conn.ProcessError(err)  }  func (r *relationshipDB) DeleteFollowByURI(ctx context.Context, uri string) error { -	if _, err := r.conn.NewDelete(). -		Table("follows"). -		Where("? = ?", bun.Ident("uri"), uri). -		Exec(ctx); err != nil { -		return r.conn.ProcessError(err) -	} +	defer r.state.Caches.GTS.Follow().Invalidate("URI", uri) -	// Invalidate follow from cache lookups. -	r.state.Caches.GTS.Follow().Invalidate("URI", uri) +	// Load follow into cache before attempting a delete, +	// as we need it cached in order to trigger the invalidate +	// callback. This in turn invalidates others. +	_, err := r.GetFollowByURI(gtscontext.SetBarebones(ctx), uri) +	if err != nil { +		if errors.Is(err, db.ErrNoEntries) { +			// not an issue. +			err = nil +		} +		return err +	} -	return nil +	// Finally delete follow from DB. +	_, err = r.conn.NewDelete(). +		Table("follows"). +		Where("? = ?", bun.Ident("uri"), uri). +		Exec(ctx) +	return r.conn.ProcessError(err)  }  func (r *relationshipDB) DeleteAccountFollows(ctx context.Context, accountID string) error {  	var followIDs []string +	// Get full list of IDs.  	if _, err := r.conn. -		NewDelete(). +		NewSelect(). +		Column("id").  		Table("follows").  		WhereOr("? = ? OR ? = ?",  			bun.Ident("account_id"), @@ -250,15 +257,31 @@ func (r *relationshipDB) DeleteAccountFollows(ctx context.Context, accountID str  			bun.Ident("target_account_id"),  			accountID,  		). -		Returning("?", bun.Ident("id")).  		Exec(ctx, &followIDs); err != nil {  		return r.conn.ProcessError(err)  	} -	// Invalidate each returned ID. +	defer func() { +		// Invalidate all IDs on return. +		for _, id := range followIDs { +			r.state.Caches.GTS.Follow().Invalidate("ID", id) +		} +	}() + +	// Load all follows into cache, this *really* isn't great +	// but it is the only way we can ensure we invalidate all +	// related caches correctly (e.g. visibility).  	for _, id := range followIDs { -		r.state.Caches.GTS.Follow().Invalidate("ID", id) +		_, err := r.GetFollowByID(ctx, id) +		if err != nil && !errors.Is(err, db.ErrNoEntries) { +			return err +		}  	} -	return nil +	// Finally delete all from DB. +	_, err := r.conn.NewDelete(). +		Table("follows"). +		Where("? IN (?)", bun.Ident("id"), bun.In(followIDs)). +		Exec(ctx) +	return r.conn.ProcessError(err)  } diff --git a/internal/db/bundb/relationship_follow_req.go b/internal/db/bundb/relationship_follow_req.go index c0f4bce3b..c2ac3d2f1 100644 --- a/internal/db/bundb/relationship_follow_req.go +++ b/internal/db/bundb/relationship_follow_req.go @@ -149,23 +149,10 @@ func (r *relationshipDB) getFollowRequest(ctx context.Context, lookup string, db  }  func (r *relationshipDB) PutFollowRequest(ctx context.Context, follow *gtsmodel.FollowRequest) error { -	err := r.state.Caches.GTS.FollowRequest().Store(follow, func() error { +	return r.state.Caches.GTS.FollowRequest().Store(follow, func() error {  		_, err := r.conn.NewInsert().Model(follow).Exec(ctx)  		return r.conn.ProcessError(err)  	}) -	if err != nil { -		return err -	} - -	// Invalidate follow request origin account ID cached visibility. -	r.state.Caches.Visibility.Invalidate("ItemID", follow.AccountID) -	r.state.Caches.Visibility.Invalidate("RequesterID", follow.AccountID) - -	// Invalidate follow request target account ID cached visibility. -	r.state.Caches.Visibility.Invalidate("ItemID", follow.TargetAccountID) -	r.state.Caches.Visibility.Invalidate("RequesterID", follow.TargetAccountID) - -	return nil  }  func (r *relationshipDB) UpdateFollowRequest(ctx context.Context, followRequest *gtsmodel.FollowRequest, columns ...string) error { @@ -221,6 +208,9 @@ func (r *relationshipDB) AcceptFollowRequest(ctx context.Context, sourceAccountI  		return nil, err  	} +	// Invalidate follow request from cache lookups on return. +	defer r.state.Caches.GTS.FollowRequest().Invalidate("ID", followReq.ID) +  	// Delete original follow request.  	if _, err := r.conn.  		NewDelete(). @@ -230,9 +220,6 @@ func (r *relationshipDB) AcceptFollowRequest(ctx context.Context, sourceAccountI  		return nil, r.conn.ProcessError(err)  	} -	// Invalidate follow request from cache lookups -	r.state.Caches.GTS.FollowRequest().Invalidate("ID", followReq.ID) -  	// Delete original follow request notification  	if err := r.state.DB.DeleteNotifications(ctx, []string{  		string(gtsmodel.NotificationFollowRequest), @@ -244,15 +231,30 @@ func (r *relationshipDB) AcceptFollowRequest(ctx context.Context, sourceAccountI  }  func (r *relationshipDB) RejectFollowRequest(ctx context.Context, sourceAccountID string, targetAccountID string) db.Error { -	// Get original follow request. -	followReq, err := r.GetFollowRequest(ctx, sourceAccountID, targetAccountID) +	defer r.state.Caches.GTS.FollowRequest().Invalidate("AccountID.TargetAccountID", sourceAccountID, targetAccountID) + +	// Load followreq into cache before attempting a delete, +	// as we need it cached in order to trigger the invalidate +	// callback. This in turn invalidates others. +	_, err := r.GetFollowRequest(gtscontext.SetBarebones(ctx), +		sourceAccountID, +		targetAccountID, +	)  	if err != nil {  		return err  	} -	// Delete original follow request. -	if err := r.DeleteFollowRequestByID(ctx, followReq.ID); err != nil { -		return err +	// Attempt to delete follow request. +	if _, err = r.conn.NewDelete(). +		Table("follow_requests"). +		Where("? = ? AND ? = ?", +			bun.Ident("account_id"), +			sourceAccountID, +			bun.Ident("target_account_id"), +			targetAccountID, +		). +		Exec(ctx); err != nil { +		return r.conn.ProcessError(err)  	}  	// Delete original follow request notification @@ -262,54 +264,90 @@ func (r *relationshipDB) RejectFollowRequest(ctx context.Context, sourceAccountI  }  func (r *relationshipDB) DeleteFollowRequestByID(ctx context.Context, id string) error { -	if _, err := r.conn.NewDelete(). -		Table("follow_requests"). -		Where("? = ?", bun.Ident("id"), id). -		Exec(ctx); err != nil { -		return r.conn.ProcessError(err) -	} +	defer r.state.Caches.GTS.FollowRequest().Invalidate("ID", id) -	// Invalidate follow request from cache lookups. -	r.state.Caches.GTS.FollowRequest().Invalidate("ID", id) +	// Load followreq into cache before attempting a delete, +	// as we need it cached in order to trigger the invalidate +	// callback. This in turn invalidates others. +	_, err := r.GetFollowRequestByID(gtscontext.SetBarebones(ctx), id) +	if err != nil { +		if errors.Is(err, db.ErrNoEntries) { +			// not an issue. +			err = nil +		} +		return err +	} -	return nil +	// Finally delete followreq from DB. +	_, err = r.conn.NewDelete(). +		Table("follow_requests"). +		Where("? = ?", bun.Ident("id"), id). +		Exec(ctx) +	return r.conn.ProcessError(err)  }  func (r *relationshipDB) DeleteFollowRequestByURI(ctx context.Context, uri string) error { -	if _, err := r.conn.NewDelete(). -		Table("follow_requests"). -		Where("? = ?", bun.Ident("uri"), uri). -		Exec(ctx); err != nil { -		return r.conn.ProcessError(err) -	} +	defer r.state.Caches.GTS.FollowRequest().Invalidate("URI", uri) -	// Invalidate follow request from cache lookups. -	r.state.Caches.GTS.FollowRequest().Invalidate("URI", uri) +	// Load followreq into cache before attempting a delete, +	// as we need it cached in order to trigger the invalidate +	// callback. This in turn invalidates others. +	_, err := r.GetFollowRequestByURI(gtscontext.SetBarebones(ctx), uri) +	if err != nil { +		if errors.Is(err, db.ErrNoEntries) { +			// not an issue. +			err = nil +		} +		return err +	} -	return nil +	// Finally delete followreq from DB. +	_, err = r.conn.NewDelete(). +		Table("follow_requests"). +		Where("? = ?", bun.Ident("uri"), uri). +		Exec(ctx) +	return r.conn.ProcessError(err)  }  func (r *relationshipDB) DeleteAccountFollowRequests(ctx context.Context, accountID string) error { -	var followIDs []string +	var followReqIDs []string +	// Get full list of IDs.  	if _, err := r.conn. -		NewDelete(). -		Table("follow_requests"). +		NewSelect(). +		Column("id"). +		Table("follow_requestss").  		WhereOr("? = ? OR ? = ?",  			bun.Ident("account_id"),  			accountID,  			bun.Ident("target_account_id"),  			accountID,  		). -		Returning("?", bun.Ident("id")). -		Exec(ctx, &followIDs); err != nil { +		Exec(ctx, &followReqIDs); err != nil {  		return r.conn.ProcessError(err)  	} -	// Invalidate each returned ID. -	for _, id := range followIDs { -		r.state.Caches.GTS.FollowRequest().Invalidate("ID", id) +	defer func() { +		// Invalidate all IDs on return. +		for _, id := range followReqIDs { +			r.state.Caches.GTS.FollowRequest().Invalidate("ID", id) +		} +	}() + +	// Load all followreqs into cache, this *really* isn't +	// great but it is the only way we can ensure we invalidate +	// all related caches correctly (e.g. visibility). +	for _, id := range followReqIDs { +		_, err := r.GetFollowRequestByID(ctx, id) +		if err != nil && !errors.Is(err, db.ErrNoEntries) { +			return err +		}  	} -	return nil +	// Finally delete all from DB. +	_, err := r.conn.NewDelete(). +		Table("follow_requests"). +		Where("? IN (?)", bun.Ident("id"), bun.In(followReqIDs)). +		Exec(ctx) +	return r.conn.ProcessError(err)  } diff --git a/internal/db/bundb/report.go b/internal/db/bundb/report.go index 17e1348b9..e017a8906 100644 --- a/internal/db/bundb/report.go +++ b/internal/db/bundb/report.go @@ -19,10 +19,12 @@ package bundb  import (  	"context" +	"errors"  	"fmt"  	"time"  	"github.com/superseriousbusiness/gotosocial/internal/db" +	"github.com/superseriousbusiness/gotosocial/internal/gtscontext"  	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"  	"github.com/superseriousbusiness/gotosocial/internal/log"  	"github.com/superseriousbusiness/gotosocial/internal/state" @@ -192,14 +194,24 @@ func (r *reportDB) UpdateReport(ctx context.Context, report *gtsmodel.Report, co  }  func (r *reportDB) DeleteReportByID(ctx context.Context, id string) db.Error { -	if _, err := r.conn. -		NewDelete(). -		TableExpr("? AS ?", bun.Ident("reports"), bun.Ident("report")). -		Where("? = ?", bun.Ident("report.id"), id). -		Exec(ctx); err != nil { -		return r.conn.ProcessError(err) +	defer r.state.Caches.GTS.Report().Invalidate("ID", id) + +	// Load status into cache before attempting a delete, +	// as we need it cached in order to trigger the invalidate +	// callback. This in turn invalidates others. +	_, err := r.GetReportByID(gtscontext.SetBarebones(ctx), id) +	if err != nil { +		if errors.Is(err, db.ErrNoEntries) { +			// not an issue. +			err = nil +		} +		return err  	} -	r.state.Caches.GTS.Report().Invalidate("ID", id) -	return nil +	// Finally delete report from DB. +	_, err = r.conn.NewDelete(). +		TableExpr("? AS ?", bun.Ident("reports"), bun.Ident("report")). +		Where("? = ?", bun.Ident("report.id"), id). +		Exec(ctx) +	return r.conn.ProcessError(err)  } diff --git a/internal/db/bundb/status.go b/internal/db/bundb/status.go index 49d4f938b..0dffbabcc 100644 --- a/internal/db/bundb/status.go +++ b/internal/db/bundb/status.go @@ -244,7 +244,7 @@ func (s *statusDB) PopulateStatus(ctx context.Context, status *gtsmodel.Status)  }  func (s *statusDB) PutStatus(ctx context.Context, status *gtsmodel.Status) db.Error { -	err := s.state.Caches.GTS.Status().Store(status, func() error { +	return s.state.Caches.GTS.Status().Store(status, func() error {  		// It is safe to run this database transaction within cache.Store  		// as the cache does not attempt a mutex lock until AFTER hook.  		// @@ -304,21 +304,6 @@ func (s *statusDB) PutStatus(ctx context.Context, status *gtsmodel.Status) db.Er  			return err  		})  	}) -	if err != nil { -		return err -	} - -	for _, id := range status.AttachmentIDs { -		// Invalidate media attachments from cache. -		// -		// NOTE: this is needed due to the way in which -		// we upload status attachments, and only after -		// update them with a known status ID. This is -		// not the case for header/avatar attachments. -		s.state.Caches.GTS.Media().Invalidate("ID", id) -	} - -	return nil  }  func (s *statusDB) UpdateStatus(ctx context.Context, status *gtsmodel.Status, columns ...string) db.Error { @@ -328,88 +313,91 @@ func (s *statusDB) UpdateStatus(ctx context.Context, status *gtsmodel.Status, co  		columns = append(columns, "updated_at")  	} -	if err := s.conn.RunInTx(ctx, func(tx bun.Tx) error { -		// create links between this status and any emojis it uses -		for _, i := range status.EmojiIDs { -			if _, err := tx. -				NewInsert(). -				Model(>smodel.StatusToEmoji{ -					StatusID: status.ID, -					EmojiID:  i, -				}). -				On("CONFLICT (?, ?) DO NOTHING", bun.Ident("status_id"), bun.Ident("emoji_id")). -				Exec(ctx); err != nil { -				err = s.conn.ProcessError(err) -				if !errors.Is(err, db.ErrAlreadyExists) { -					return err +	return s.state.Caches.GTS.Status().Store(status, func() error { +		// It is safe to run this database transaction within cache.Store +		// as the cache does not attempt a mutex lock until AFTER hook. +		// +		return s.conn.RunInTx(ctx, func(tx bun.Tx) error { +			// create links between this status and any emojis it uses +			for _, i := range status.EmojiIDs { +				if _, err := tx. +					NewInsert(). +					Model(>smodel.StatusToEmoji{ +						StatusID: status.ID, +						EmojiID:  i, +					}). +					On("CONFLICT (?, ?) DO NOTHING", bun.Ident("status_id"), bun.Ident("emoji_id")). +					Exec(ctx); err != nil { +					err = s.conn.ProcessError(err) +					if !errors.Is(err, db.ErrAlreadyExists) { +						return err +					}  				}  			} -		} -		// create links between this status and any tags it uses -		for _, i := range status.TagIDs { -			if _, err := tx. -				NewInsert(). -				Model(>smodel.StatusToTag{ -					StatusID: status.ID, -					TagID:    i, -				}). -				On("CONFLICT (?, ?) DO NOTHING", bun.Ident("status_id"), bun.Ident("tag_id")). -				Exec(ctx); err != nil { -				err = s.conn.ProcessError(err) -				if !errors.Is(err, db.ErrAlreadyExists) { -					return err +			// create links between this status and any tags it uses +			for _, i := range status.TagIDs { +				if _, err := tx. +					NewInsert(). +					Model(>smodel.StatusToTag{ +						StatusID: status.ID, +						TagID:    i, +					}). +					On("CONFLICT (?, ?) DO NOTHING", bun.Ident("status_id"), bun.Ident("tag_id")). +					Exec(ctx); err != nil { +					err = s.conn.ProcessError(err) +					if !errors.Is(err, db.ErrAlreadyExists) { +						return err +					}  				}  			} -		} -		// change the status ID of the media attachments to the new status -		for _, a := range status.Attachments { -			a.StatusID = status.ID -			a.UpdatedAt = time.Now() -			if _, err := tx. -				NewUpdate(). -				Model(a). -				Where("? = ?", bun.Ident("media_attachment.id"), a.ID). -				Exec(ctx); err != nil { -				err = s.conn.ProcessError(err) -				if !errors.Is(err, db.ErrAlreadyExists) { -					return err +			// change the status ID of the media attachments to the new status +			for _, a := range status.Attachments { +				a.StatusID = status.ID +				a.UpdatedAt = time.Now() +				if _, err := tx. +					NewUpdate(). +					Model(a). +					Where("? = ?", bun.Ident("media_attachment.id"), a.ID). +					Exec(ctx); err != nil { +					err = s.conn.ProcessError(err) +					if !errors.Is(err, db.ErrAlreadyExists) { +						return err +					}  				}  			} -		} -		// Finally, update the status -		_, err := tx. -			NewUpdate(). -			Model(status). -			Column(columns...). -			Where("? = ?", bun.Ident("status.id"), status.ID). -			Exec(ctx) -		return err -	}); err != nil { -		// already processed -		return err -	} +			// Finally, update the status +			_, err := tx. +				NewUpdate(). +				Model(status). +				Column(columns...). +				Where("? = ?", bun.Ident("status.id"), status.ID). +				Exec(ctx) +			return err +		}) +	}) +} -	// Invalidate status from database lookups. -	s.state.Caches.GTS.Status().Invalidate("ID", status.ID) +func (s *statusDB) DeleteStatusByID(ctx context.Context, id string) db.Error { +	defer s.state.Caches.GTS.Status().Invalidate("ID", id) -	for _, id := range status.AttachmentIDs { -		// Invalidate media attachments from cache. -		// -		// NOTE: this is needed due to the way in which -		// we upload status attachments, and only after -		// update them with a known status ID. This is -		// not the case for header/avatar attachments. -		s.state.Caches.GTS.Media().Invalidate("ID", id) +	// Load status into cache before attempting a delete, +	// as we need it cached in order to trigger the invalidate +	// callback. This in turn invalidates others. +	_, err := s.GetStatusByID( +		gtscontext.SetBarebones(ctx), +		id, +	) +	if err != nil && !errors.Is(err, db.ErrNoEntries) { +		// NOTE: even if db.ErrNoEntries is returned, we +		// still run the below transaction to ensure related +		// objects are appropriately deleted. +		return err  	} -	return nil -} - -func (s *statusDB) DeleteStatusByID(ctx context.Context, id string) db.Error { -	if err := s.conn.RunInTx(ctx, func(tx bun.Tx) error { +	return s.conn.RunInTx(ctx, func(tx bun.Tx) error {  		// delete links between this status and any emojis it uses  		if _, err := tx.  			NewDelete(). @@ -438,17 +426,7 @@ func (s *statusDB) DeleteStatusByID(ctx context.Context, id string) db.Error {  		}  		return nil -	}); err != nil { -		return err -	} - -	// Invalidate status from database lookups. -	s.state.Caches.GTS.Status().Invalidate("ID", id) - -	// Invalidate status from all visibility lookups. -	s.state.Caches.Visibility.Invalidate("ItemID", id) - -	return nil +	})  }  func (s *statusDB) GetStatusParents(ctx context.Context, status *gtsmodel.Status, onlyDirect bool) ([]*gtsmodel.Status, db.Error) { diff --git a/internal/db/bundb/statusfave.go b/internal/db/bundb/statusfave.go index 0f7e5df74..497262530 100644 --- a/internal/db/bundb/statusfave.go +++ b/internal/db/bundb/statusfave.go @@ -156,16 +156,26 @@ func (s *statusFaveDB) PutStatusFave(ctx context.Context, fave *gtsmodel.StatusF  }  func (s *statusFaveDB) DeleteStatusFaveByID(ctx context.Context, id string) db.Error { -	if _, err := s.conn. -		NewDelete(). -		Table("status_faves"). -		Where("? = ?", bun.Ident("id"), id). -		Exec(ctx); err != nil { -		return s.conn.ProcessError(err) +	defer s.state.Caches.GTS.StatusFave().Invalidate("ID", id) + +	// Load fave into cache before attempting a delete, +	// as we need it cached in order to trigger the invalidate +	// callback. This in turn invalidates others. +	_, err := s.GetStatusFaveByID(gtscontext.SetBarebones(ctx), id) +	if err != nil { +		if errors.Is(err, db.ErrNoEntries) { +			// not an issue. +			err = nil +		} +		return err  	} -	s.state.Caches.GTS.StatusFave().Invalidate("ID", id) -	return nil +	// Finally delete fave from DB. +	_, err = s.conn.NewDelete(). +		Table("status_faves"). +		Where("? = ?", bun.Ident("id"), id). +		Exec(ctx) +	return s.conn.ProcessError(err)  }  func (s *statusFaveDB) DeleteStatusFaves(ctx context.Context, targetAccountID string, originAccountID string) db.Error { @@ -173,13 +183,12 @@ func (s *statusFaveDB) DeleteStatusFaves(ctx context.Context, targetAccountID st  		return errors.New("DeleteStatusFaves: one of targetAccountID or originAccountID must be set")  	} -	// Capture fave IDs in a RETURNING statement.  	var faveIDs []string  	q := s.conn. -		NewDelete(). -		Table("status_faves"). -		Returning("?", bun.Ident("id")) +		NewSelect(). +		Column("id"). +		Table("status_faves")  	if targetAccountID != "" {  		q = q.Where("? = ?", bun.Ident("target_account_id"), targetAccountID) @@ -193,12 +202,29 @@ func (s *statusFaveDB) DeleteStatusFaves(ctx context.Context, targetAccountID st  		return s.conn.ProcessError(err)  	} +	defer func() { +		// Invalidate all IDs on return. +		for _, id := range faveIDs { +			s.state.Caches.GTS.StatusFave().Invalidate("ID", id) +		} +	}() + +	// Load all faves into cache, this *really* isn't great +	// but it is the only way we can ensure we invalidate all +	// related caches correctly (e.g. visibility).  	for _, id := range faveIDs { -		// Invalidate each of the returned status fave IDs. -		s.state.Caches.GTS.StatusFave().Invalidate("ID", id) +		_, err := s.GetStatusFaveByID(ctx, id) +		if err != nil && !errors.Is(err, db.ErrNoEntries) { +			return err +		}  	} -	return nil +	// Finally delete all from DB. +	_, err := s.conn.NewDelete(). +		Table("status_faves"). +		Where("? IN (?)", bun.Ident("id"), bun.In(faveIDs)). +		Exec(ctx) +	return s.conn.ProcessError(err)  }  func (s *statusFaveDB) DeleteStatusFavesForStatus(ctx context.Context, statusID string) db.Error { @@ -206,19 +232,35 @@ func (s *statusFaveDB) DeleteStatusFavesForStatus(ctx context.Context, statusID  	var faveIDs []string  	q := s.conn. -		NewDelete(). +		NewSelect(). +		Column("id").  		Table("status_faves"). -		Where("? = ?", bun.Ident("status_id"), statusID). -		Returning("?", bun.Ident("id")) - +		Where("? = ?", bun.Ident("status_id"), statusID)  	if _, err := q.Exec(ctx, &faveIDs); err != nil {  		return s.conn.ProcessError(err)  	} +	defer func() { +		// Invalidate all IDs on return. +		for _, id := range faveIDs { +			s.state.Caches.GTS.StatusFave().Invalidate("ID", id) +		} +	}() + +	// Load all faves into cache, this *really* isn't great +	// but it is the only way we can ensure we invalidate all +	// related caches correctly (e.g. visibility).  	for _, id := range faveIDs { -		// Invalidate each of the returned status fave IDs. -		s.state.Caches.GTS.StatusFave().Invalidate("ID", id) +		_, err := s.GetStatusFaveByID(ctx, id) +		if err != nil && !errors.Is(err, db.ErrNoEntries) { +			return err +		}  	} -	return nil +	// Finally delete all from DB. +	_, err := s.conn.NewDelete(). +		Table("status_faves"). +		Where("? IN (?)", bun.Ident("id"), bun.In(faveIDs)). +		Exec(ctx) +	return s.conn.ProcessError(err)  } diff --git a/internal/db/bundb/tombstone.go b/internal/db/bundb/tombstone.go index 393b2e356..668bde1af 100644 --- a/internal/db/bundb/tombstone.go +++ b/internal/db/bundb/tombstone.go @@ -67,16 +67,12 @@ func (t *tombstoneDB) PutTombstone(ctx context.Context, tombstone *gtsmodel.Tomb  }  func (t *tombstoneDB) DeleteTombstone(ctx context.Context, id string) db.Error { -	if _, err := t.conn. -		NewDelete(). +	defer t.state.Caches.GTS.Tombstone().Invalidate("ID", id) + +	// Delete tombstone from DB. +	_, err := t.conn.NewDelete().  		TableExpr("? AS ?", bun.Ident("tombstones"), bun.Ident("tombstone")).  		Where("? = ?", bun.Ident("tombstone.id"), id). -		Exec(ctx); err != nil { -		return t.conn.ProcessError(err) -	} - -	// Invalidate from cache by ID -	t.state.Caches.GTS.Tombstone().Invalidate("ID", id) - -	return nil +		Exec(ctx) +	return t.conn.ProcessError(err)  } diff --git a/internal/db/bundb/user.go b/internal/db/bundb/user.go index b5dae1573..c2ea5a67d 100644 --- a/internal/db/bundb/user.go +++ b/internal/db/bundb/user.go @@ -19,9 +19,11 @@ package bundb  import (  	"context" +	"errors"  	"time"  	"github.com/superseriousbusiness/gotosocial/internal/db" +	"github.com/superseriousbusiness/gotosocial/internal/gtscontext"  	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"  	"github.com/superseriousbusiness/gotosocial/internal/state"  	"github.com/uptrace/bun" @@ -155,32 +157,36 @@ func (u *userDB) UpdateUser(ctx context.Context, user *gtsmodel.User, columns ..  		columns = append(columns, "updated_at")  	} -	// Update the user in DB -	_, err := u.conn. -		NewUpdate(). -		Model(user). -		Where("? = ?", bun.Ident("user.id"), user.ID). -		Column(columns...). -		Exec(ctx) -	if err != nil { +	return u.state.Caches.GTS.User().Store(user, func() error { +		_, err := u.conn. +			NewUpdate(). +			Model(user). +			Where("? = ?", bun.Ident("user.id"), user.ID). +			Column(columns...). +			Exec(ctx)  		return u.conn.ProcessError(err) -	} - -	// Invalidate user from cache -	u.state.Caches.GTS.User().Invalidate("ID", user.ID) -	return nil +	})  }  func (u *userDB) DeleteUserByID(ctx context.Context, userID string) db.Error { -	if _, err := u.conn. -		NewDelete(). -		TableExpr("? AS ?", bun.Ident("users"), bun.Ident("user")). -		Where("? = ?", bun.Ident("user.id"), userID). -		Exec(ctx); err != nil { -		return u.conn.ProcessError(err) +	defer u.state.Caches.GTS.User().Invalidate("ID", userID) + +	// Load user into cache before attempting a delete, +	// as we need it cached in order to trigger the invalidate +	// callback. This in turn invalidates others. +	_, err := u.GetUserByID(gtscontext.SetBarebones(ctx), userID) +	if err != nil { +		if errors.Is(err, db.ErrNoEntries) { +			// not an issue. +			err = nil +		} +		return err  	} -	// Invalidate user from cache -	u.state.Caches.GTS.User().Invalidate("ID", userID) -	return nil +	// Finally delete user from DB. +	_, err = u.conn.NewDelete(). +		TableExpr("? AS ?", bun.Ident("users"), bun.Ident("user")). +		Where("? = ?", bun.Ident("user.id"), userID). +		Exec(ctx) +	return u.conn.ProcessError(err)  } diff --git a/internal/federation/dereferencing/account.go b/internal/federation/dereferencing/account.go index b33f1d6fa..764dc1539 100644 --- a/internal/federation/dereferencing/account.go +++ b/internal/federation/dereferencing/account.go @@ -38,7 +38,52 @@ import (  	"github.com/superseriousbusiness/gotosocial/internal/transport"  ) -func (d *deref) GetAccountByURI(ctx context.Context, requestUser string, uri *url.URL) (*gtsmodel.Account, error) { +// accountUpToDate returns whether the given account model is both updateable (i.e. +// non-instance remote account) and whether it needs an update based on `fetched_at`. +func accountUpToDate(account *gtsmodel.Account) bool { +	if account.IsLocal() { +		// Can't update local accounts. +		return true +	} + +	if !account.CreatedAt.IsZero() && account.IsInstance() { +		// Existing instance account. No need for update. +		return true +	} + +	// If this account was updated recently (last interval), we return as-is. +	if next := account.FetchedAt.Add(6 * time.Hour); time.Now().Before(next) { +		return true +	} + +	return false +} + +// GetAccountByURI: implements Dereferencer{}.GetAccountByURI. +func (d *deref) GetAccountByURI(ctx context.Context, requestUser string, uri *url.URL) (*gtsmodel.Account, ap.Accountable, error) { +	// Fetch and dereference account if necessary. +	account, apubAcc, err := d.getAccountByURI(ctx, +		requestUser, +		uri, +	) +	if err != nil { +		return nil, nil, err +	} + +	if apubAcc != nil { +		// This account was updated, enqueue re-dereference featured posts. +		d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) { +			if err := d.dereferenceAccountFeatured(ctx, requestUser, account); err != nil { +				log.Errorf(ctx, "error fetching account featured collection: %v", err) +			} +		}) +	} + +	return account, apubAcc, nil +} + +// getAccountByURI is a package internal form of .GetAccountByURI() that doesn't bother dereferencing featured posts on update. +func (d *deref) getAccountByURI(ctx context.Context, requestUser string, uri *url.URL) (*gtsmodel.Account, ap.Accountable, error) {  	var (  		account *gtsmodel.Account  		uriStr  = uri.String() @@ -46,23 +91,23 @@ func (d *deref) GetAccountByURI(ctx context.Context, requestUser string, uri *ur  	)  	// Search the database for existing account with ID URI. -	account, err = d.db.GetAccountByURI(ctx, uriStr) +	account, err = d.state.DB.GetAccountByURI(ctx, uriStr)  	if err != nil && !errors.Is(err, db.ErrNoEntries) { -		return nil, fmt.Errorf("GetAccountByURI: error checking database for account %s by uri: %w", uriStr, err) +		return nil, nil, fmt.Errorf("GetAccountByURI: error checking database for account %s by uri: %w", uriStr, err)  	}  	if account == nil {  		// Else, search the database for existing by ID URL. -		account, err = d.db.GetAccountByURL(ctx, uriStr) +		account, err = d.state.DB.GetAccountByURL(ctx, uriStr)  		if err != nil && !errors.Is(err, db.ErrNoEntries) { -			return nil, fmt.Errorf("GetAccountByURI: error checking database for account %s by url: %w", uriStr, err) +			return nil, nil, fmt.Errorf("GetAccountByURI: error checking database for account %s by url: %w", uriStr, err)  		}  	}  	if account == nil {  		// Ensure that this is isn't a search for a local account.  		if uri.Host == config.GetHost() || uri.Host == config.GetAccountDomain() { -			return nil, NewErrNotRetrievable(err) // this will be db.ErrNoEntries +			return nil, nil, NewErrNotRetrievable(err) // this will be db.ErrNoEntries  		}  		// Create and pass-through a new bare-bones model for dereferencing. @@ -70,163 +115,193 @@ func (d *deref) GetAccountByURI(ctx context.Context, requestUser string, uri *ur  			ID:     id.NewULID(),  			Domain: uri.Host,  			URI:    uriStr, -		}, d.defaultFetchLatest, false) +		}, nil) +	} + +	// Check whether needs update. +	if accountUpToDate(account) { +		return account, nil, nil  	} -	// Try to update existing account model -	enriched, err := d.enrichAccount(ctx, requestUser, uri, account, d.defaultFetchLatest, false) +	// Try to update existing account model. +	latest, apubAcc, err := d.enrichAccount(ctx, +		requestUser, +		uri, +		account, +		nil, +	)  	if err != nil {  		log.Errorf(ctx, "error enriching remote account: %v", err) -		return account, nil // fall back to returning existing + +		// Update fetch-at to slow re-attempts. +		account.FetchedAt = time.Now() +		_ = d.state.DB.UpdateAccount(ctx, account, "fetched_at") + +		// Fallback to existing. +		return account, nil, nil  	} -	return enriched, nil +	return latest, apubAcc, nil  } -func (d *deref) GetAccountByUsernameDomain(ctx context.Context, requestUser string, username string, domain string) (*gtsmodel.Account, error) { +// GetAccountByUsernameDomain: implements Dereferencer{}.GetAccountByUsernameDomain. +func (d *deref) GetAccountByUsernameDomain(ctx context.Context, requestUser string, username string, domain string) (*gtsmodel.Account, ap.Accountable, error) {  	if domain == config.GetHost() || domain == config.GetAccountDomain() {  		// We do local lookups using an empty domain,  		// else it will fail the db search below.  		domain = ""  	} -	// Search the database for existing account with USERNAME@DOMAIN -	account, err := d.db.GetAccountByUsernameDomain(ctx, username, domain) +	// Search the database for existing account with USERNAME@DOMAIN. +	account, err := d.state.DB.GetAccountByUsernameDomain(ctx, username, domain)  	if err != nil && !errors.Is(err, db.ErrNoEntries) { -		return nil, fmt.Errorf("GetAccountByUsernameDomain: error checking database for account %s@%s: %w", username, domain, err) +		return nil, nil, fmt.Errorf("GetAccountByUsernameDomain: error checking database for account %s@%s: %w", username, domain, err)  	}  	if account == nil { -		// Check for failed local lookup.  		if domain == "" { -			return nil, NewErrNotRetrievable(err) // wrapped err will be db.ErrNoEntries +			// failed local lookup, will be db.ErrNoEntries. +			return nil, nil, NewErrNotRetrievable(err)  		}  		// Create and pass-through a new bare-bones model for dereferencing. -		account = >smodel.Account{ +		account, apubAcc, err := d.enrichAccount(ctx, requestUser, nil, >smodel.Account{  			ID:       id.NewULID(),  			Username: username,  			Domain:   domain, +		}, nil) +		if err != nil { +			return nil, nil, err  		} -		// There's no known account to fall back on, -		// so return error if we can't enrich account. -		return d.enrichAccount(ctx, requestUser, nil, account, d.defaultFetchLatest, false) +		// This account was updated, enqueue dereference featured posts. +		d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) { +			if err := d.dereferenceAccountFeatured(ctx, requestUser, account); err != nil { +				log.Errorf(ctx, "error fetching account featured collection: %v", err) +			} +		}) + +		return account, apubAcc, nil  	} -	// We knew about this account already; -	// try to update existing account model. -	enriched, err := d.enrichAccount(ctx, requestUser, nil, account, d.defaultFetchLatest, false) +	// Try to update existing account model. +	latest, apubAcc, err := d.RefreshAccount(ctx, +		requestUser, +		account, +		nil, +		false, +	)  	if err != nil { -		log.Errorf(ctx, "error enriching account from remote: %v", err) -		return account, nil // fall back to returning unchanged existing account model +		// Fallback to existing. +		return account, nil, nil //nolint  	} -	return enriched, nil +	return latest, apubAcc, nil  } -func (d *deref) RefreshAccount(ctx context.Context, requestUser string, accountable ap.Accountable, account *gtsmodel.Account) (*gtsmodel.Account, error) { -	// To avoid unnecessarily refetching multiple times from remote, -	// we can just pass in the Accountable object that we received, -	// if it was defined. If not, fall back to default fetch func. -	var f fetchLatest -	if accountable != nil { -		f = func( -			_ context.Context, -			_ transport.Transport, -			_ *url.URL, -			_ string, -		) (ap.Accountable, *gtsmodel.Account, error) { -			return accountable, account, nil -		} -	} else { -		f = d.defaultFetchLatest +// RefreshAccount: implements Dereferencer{}.RefreshAccount. +func (d *deref) RefreshAccount(ctx context.Context, requestUser string, account *gtsmodel.Account, apubAcc ap.Accountable, force bool) (*gtsmodel.Account, ap.Accountable, error) { +	// Check whether needs update (and not forced). +	if accountUpToDate(account) && !force { +		return account, nil, nil  	} -	// Set 'force' to 'true' to always fetch latest media etc. -	return d.enrichAccount(ctx, requestUser, nil, account, f, true) -} - -// fetchLatest defines a function for using a transport and uri to fetch the fetchLatest -// version of an account (and its AP representation) from a remote instance. -type fetchLatest func(ctx context.Context, transport transport.Transport, uri *url.URL, accountDomain string) (ap.Accountable, *gtsmodel.Account, error) - -// defaultFetchLatest deduplicates latest fetching code that is used in several -// different functions. It simply calls the remote uri using the given transport, -// parses a returned AP representation into an account, and then returns both. -func (d *deref) defaultFetchLatest(ctx context.Context, transport transport.Transport, uri *url.URL, accountDomain string) (ap.Accountable, *gtsmodel.Account, error) { -	// Dereference this account to get the latest available. -	apubAcc, err := d.dereferenceAccountable(ctx, transport, uri) +	// Parse the URI from account. +	uri, err := url.Parse(account.URI)  	if err != nil { -		return nil, nil, fmt.Errorf("error dereferencing account %s: %w", uri, err) +		return nil, nil, fmt.Errorf("RefreshAccount: invalid account uri %q: %w", account.URI, err)  	} -	// Convert the dereferenced AP account object to our GTS model. -	latestAcc, err := d.typeConverter.ASRepresentationToAccount( -		ctx, apubAcc, accountDomain, +	// Try to update + deref existing account model. +	latest, apubAcc, err := d.enrichAccount(ctx, +		requestUser, +		uri, +		account, +		apubAcc,  	)  	if err != nil { -		return nil, nil, fmt.Errorf("error converting accountable to gts model for account %s: %w", uri, err) +		log.Errorf(ctx, "error enriching remote account: %v", err) + +		// Update fetch-at to slow re-attempts. +		account.FetchedAt = time.Now() +		_ = d.state.DB.UpdateAccount(ctx, account, "fetched_at") + +		return nil, nil, err  	} -	return apubAcc, latestAcc, nil +	// This account was updated, enqueue re-dereference featured posts. +	d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) { +		if err := d.dereferenceAccountFeatured(ctx, requestUser, account); err != nil { +			log.Errorf(ctx, "error fetching account featured collection: %v", err) +		} +	}) + +	return latest, apubAcc, nil  } -// enrichAccount will ensure the given account is the most up-to-date model of the account, re-webfingering and re-dereferencing if necessary. -func (d *deref) enrichAccount( -	ctx context.Context, -	requestUser string, -	uri *url.URL, -	account *gtsmodel.Account, -	f fetchLatest, -	force bool, -) (*gtsmodel.Account, error) { -	if account.IsLocal() { -		// Can't update local accounts. -		return account, nil +// RefreshAccountAsync: implements Dereferencer{}.RefreshAccountAsync. +func (d *deref) RefreshAccountAsync(ctx context.Context, requestUser string, account *gtsmodel.Account, apubAcc ap.Accountable, force bool) { +	// Check whether needs update (and not forced). +	if accountUpToDate(account) && !force { +		return  	} -	if !account.CreatedAt.IsZero() && account.IsInstance() { -		// Existing instance account. No need for update. -		return account, nil +	// Parse the URI from account. +	uri, err := url.Parse(account.URI) +	if err != nil { +		log.Errorf(ctx, "RefreshAccountAsync: invalid account uri %q: %v", account.URI, err) +		return  	} -	if !force { -		const interval = time.Hour * 48 +	// Enqueue a worker function to enrich this account async. +	d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) { +		latest, _, err := d.enrichAccount(ctx, requestUser, uri, account, apubAcc) +		if err != nil { +			log.Errorf(ctx, "error enriching remote account: %v", err) +			return +		} -		// If this account was updated recently (last interval), we return as-is. -		if next := account.FetchedAt.Add(interval); time.Now().Before(next) { -			return account, nil +		// This account was updated, re-dereference account featured posts. +		if err := d.dereferenceAccountFeatured(ctx, requestUser, latest); err != nil { +			log.Errorf(ctx, "error fetching account featured collection: %v", err)  		} -	} +	}) +} +// enrichAccount will enrich the given account, whether a new barebones model, or existing model from the database. It handles necessary dereferencing, webfingering etc. +func (d *deref) enrichAccount(ctx context.Context, requestUser string, uri *url.URL, account *gtsmodel.Account, apubAcc ap.Accountable) (*gtsmodel.Account, ap.Accountable, error) {  	// Pre-fetch a transport for requesting username, used by later deref procedures. -	transport, err := d.transportController.NewTransportForUsername(ctx, requestUser) +	tsport, err := d.transportController.NewTransportForUsername(ctx, requestUser)  	if err != nil { -		return nil, fmt.Errorf("enrichAccount: couldn't create transport: %w", err) +		return nil, nil, fmt.Errorf("enrichAccount: couldn't create transport: %w", err)  	}  	if account.Username != "" {  		// A username was provided so we can attempt a webfinger, this ensures up-to-date accountdomain info. -		accDomain, accURI, err := d.fingerRemoteAccount(ctx, transport, account.Username, account.Domain) - -		switch { -		case err != nil && account.URI == "": -			// this is a new account (to us) with username@domain but failed webfinger, nothing more we can do. -			return nil, fmt.Errorf("enrichAccount: error webfingering account: %w", err) +		accDomain, accURI, err := d.fingerRemoteAccount(ctx, tsport, account.Username, account.Domain) +		if err != nil { +			if account.URI == "" { +				// this is a new account (to us) with username@domain but failed webfinger, nothing more we can do. +				return nil, nil, &ErrNotRetrievable{fmt.Errorf("enrichAccount: error webfingering account: %w", err)} +			} -		case err != nil: +			// Simply log this error and move on, we already have an account URI.  			log.Errorf(ctx, "error webfingering[1] remote account %s@%s: %v", account.Username, account.Domain, err) +		} -		case err == nil: +		if err == nil {  			if account.Domain != accDomain { +				// Domain has changed, assume the activitypub +				// account data provided may not be the latest. +				apubAcc = nil +  				// After webfinger, we now have correct account domain from which we can do a final DB check. -				alreadyAccount, err := d.db.GetAccountByUsernameDomain(ctx, account.Username, accDomain) +				alreadyAccount, err := d.state.DB.GetAccountByUsernameDomain(ctx, account.Username, accDomain)  				if err != nil && !errors.Is(err, db.ErrNoEntries) { -					return nil, fmt.Errorf("enrichAccount: db err looking for account again after webfinger: %w", err) +					return nil, nil, fmt.Errorf("enrichAccount: db err looking for account again after webfinger: %w", err)  				} -				if err == nil { +				if alreadyAccount != nil {  					// Enrich existing account.  					account = alreadyAccount  				} @@ -240,30 +315,49 @@ func (d *deref) enrichAccount(  	}  	if uri == nil { -		var err error -  		// No URI provided / found, must parse from account.  		uri, err = url.Parse(account.URI)  		if err != nil { -			return nil, fmt.Errorf("enrichAccount: invalid uri %q: %w", account.URI, err) +			return nil, nil, fmt.Errorf("enrichAccount: invalid uri %q: %w", account.URI, err)  		}  	}  	// Check whether this account URI is a blocked domain / subdomain. -	if blocked, err := d.db.IsDomainBlocked(ctx, uri.Host); err != nil { -		return nil, newErrDB(fmt.Errorf("enrichAccount: error checking blocked domain: %w", err)) +	if blocked, err := d.state.DB.IsDomainBlocked(ctx, uri.Host); err != nil { +		return nil, nil, fmt.Errorf("enrichAccount: error checking blocked domain: %w", err)  	} else if blocked { -		return nil, fmt.Errorf("enrichAccount: %s is blocked", uri.Host) +		return nil, nil, fmt.Errorf("enrichAccount: %s is blocked", uri.Host)  	}  	// Mark deref+update handshake start.  	d.startHandshake(requestUser, uri)  	defer d.stopHandshake(requestUser, uri) -	// Fetch latest version of the account, dereferencing if necessary. -	apubAcc, latestAcc, err := f(ctx, transport, uri, account.Domain) -	if err != nil { -		return nil, fmt.Errorf("enrichAccount: error calling fetchLatest function: %w", err) +	// By default we assume that apubAcc has been passed, +	// indicating that the given account is already latest. +	latestAcc := account + +	if apubAcc == nil { +		// Dereference latest version of the account. +		b, err := tsport.Dereference(ctx, uri) +		if err != nil { +			return nil, nil, &ErrNotRetrievable{fmt.Errorf("enrichAccount: error deferencing %s: %w", uri, err)} +		} + +		// Attempt to resolve ActivityPub account from data. +		apubAcc, err = ap.ResolveAccountable(ctx, b) +		if err != nil { +			return nil, nil, fmt.Errorf("enrichAccount: error resolving accountable from data for account %s: %w", uri, err) +		} + +		// Convert the dereferenced AP account object to our GTS model. +		latestAcc, err = d.typeConverter.ASRepresentationToAccount(ctx, +			apubAcc, +			account.Domain, +		) +		if err != nil { +			return nil, nil, fmt.Errorf("enrichAccount: error converting accountable to gts model for account %s: %w", uri, err) +		}  	}  	if account.Username == "" { @@ -281,11 +375,17 @@ func (d *deref) enrichAccount(  		// Assume the host from the returned ActivityPub representation.  		idProp := apubAcc.GetJSONLDId()  		if idProp == nil || !idProp.IsIRI() { -			return nil, errors.New("enrichAccount: no id property found on person, or id was not an iri") +			return nil, nil, errors.New("enrichAccount: no id property found on person, or id was not an iri")  		} + +		// Get IRI host value.  		accHost := idProp.GetIRI().Host -		accDomain, _, err := d.fingerRemoteAccount(ctx, transport, latestAcc.Username, accHost) +		latestAcc.Domain, _, err = d.fingerRemoteAccount(ctx, +			tsport, +			latestAcc.Username, +			accHost, +		)  		if err != nil {  			// We still couldn't webfinger the account, so we're not certain  			// what the accountDomain actually is. Still, we can make a solid @@ -293,9 +393,6 @@ func (d *deref) enrichAccount(  			// If we're wrong, we can just try again in a couple days.  			log.Errorf(ctx, "error webfingering[2] remote account %s@%s: %v", latestAcc.Username, accHost, err)  			latestAcc.Domain = accHost -		} else { -			// Update account with latest info. -			latestAcc.Domain = accDomain  		}  	} @@ -307,14 +404,15 @@ func (d *deref) enrichAccount(  	latestAcc.AvatarMediaAttachmentID = account.AvatarMediaAttachmentID  	latestAcc.HeaderMediaAttachmentID = account.HeaderMediaAttachmentID -	if force || (latestAcc.AvatarRemoteURL != account.AvatarRemoteURL) { +	if (latestAcc.AvatarMediaAttachmentID == "") || +		(latestAcc.AvatarRemoteURL != account.AvatarRemoteURL) {  		// Reset the avatar media ID (handles removed).  		latestAcc.AvatarMediaAttachmentID = ""  		if latestAcc.AvatarRemoteURL != "" {  			// Avatar has changed to a new one, fetch up-to-date copy and use new ID.  			latestAcc.AvatarMediaAttachmentID, err = d.fetchRemoteAccountAvatar(ctx, -				transport, +				tsport,  				latestAcc.AvatarRemoteURL,  				latestAcc.ID,  			) @@ -328,14 +426,15 @@ func (d *deref) enrichAccount(  		}  	} -	if force || (latestAcc.HeaderRemoteURL != account.HeaderRemoteURL) { +	if (latestAcc.HeaderMediaAttachmentID == "") || +		(latestAcc.HeaderRemoteURL != account.HeaderRemoteURL) {  		// Reset the header media ID (handles removed).  		latestAcc.HeaderMediaAttachmentID = ""  		if latestAcc.HeaderRemoteURL != "" {  			// Header has changed to a new one, fetch up-to-date copy and use new ID.  			latestAcc.HeaderMediaAttachmentID, err = d.fetchRemoteAccountHeader(ctx, -				transport, +				tsport,  				latestAcc.HeaderRemoteURL,  				latestAcc.ID,  			) @@ -363,15 +462,16 @@ func (d *deref) enrichAccount(  		latestAcc.UpdatedAt = latestAcc.FetchedAt  		// This is new, put it in the database. -		err := d.db.PutAccount(ctx, latestAcc) +		err := d.state.DB.PutAccount(ctx, latestAcc)  		if errors.Is(err, db.ErrAlreadyExists) {  			// TODO: replace this quick fix with per-URI deref locks. -			latestAcc, err = d.db.GetAccountByURI(ctx, latestAcc.URI) +			latestAcc, err = d.state.DB.GetAccountByURI(ctx, latestAcc.URI) +			return latestAcc, nil, err  		}  		if err != nil { -			return nil, fmt.Errorf("enrichAccount: error putting in database: %w", err) +			return nil, nil, fmt.Errorf("enrichAccount: error putting in database: %w", err)  		}  	} else {  		// Set time of update from the last-fetched date. @@ -382,35 +482,12 @@ func (d *deref) enrichAccount(  		latestAcc.Language = account.Language  		// This is an existing account, update the model in the database. -		if err := d.db.UpdateAccount(ctx, latestAcc); err != nil { -			return nil, fmt.Errorf("enrichAccount: error updating database: %w", err) +		if err := d.state.DB.UpdateAccount(ctx, latestAcc); err != nil { +			return nil, nil, fmt.Errorf("enrichAccount: error updating database: %w", err)  		}  	} -	if latestAcc.FeaturedCollectionURI != "" { -		// Fetch this account's pinned statuses, now that the account is in the database. -		// -		// The order is important here: if we tried to fetch the pinned statuses before -		// storing the account, the process might end up calling enrichAccount again, -		// causing us to get stuck in a loop. By calling it now, we make sure this doesn't -		// happen! -		if err := d.fetchRemoteAccountFeatured(ctx, requestUser, latestAcc.FeaturedCollectionURI, latestAcc.ID); err != nil { -			log.Errorf(ctx, "error fetching featured collection for account %s: %v", uri, err) -		} -	} - -	return latestAcc, nil -} - -// dereferenceAccountable calls remoteAccountID with a GET request, and tries to parse whatever -// it finds as something that an account model can be constructed out of. -func (d *deref) dereferenceAccountable(ctx context.Context, transport transport.Transport, remoteAccountID *url.URL) (ap.Accountable, error) { -	b, err := transport.Dereference(ctx, remoteAccountID) -	if err != nil { -		return nil, fmt.Errorf("dereferenceAccountable: error deferencing %s: %w", remoteAccountID.String(), err) -	} - -	return ap.ResolveAccountable(ctx, b) +	return latestAcc, apubAcc, nil  }  func (d *deref) fetchRemoteAccountAvatar(ctx context.Context, tsport transport.Transport, avatarURL string, accountID string) (string, error) { @@ -531,7 +608,7 @@ func (d *deref) fetchRemoteAccountEmojis(ctx context.Context, targetAccount *gts  	if len(maybeEmojiIDs) > len(maybeEmojis) {  		maybeEmojis = make([]*gtsmodel.Emoji, 0, len(maybeEmojiIDs))  		for _, emojiID := range maybeEmojiIDs { -			maybeEmoji, err := d.db.GetEmojiByID(ctx, emojiID) +			maybeEmoji, err := d.state.DB.GetEmojiByID(ctx, emojiID)  			if err != nil {  				return false, err  			} @@ -631,18 +708,18 @@ func (d *deref) fetchRemoteAccountEmojis(ctx context.Context, targetAccount *gts  	return changed, nil  } -// fetchRemoteAccountFeatured dereferences an account's featuredCollectionURI (if not empty). -// For each discovered status, this status will be dereferenced (if necessary) and marked as -// pinned (if necessary). Then, old pins will be removed if they're not included in new pins. -func (d *deref) fetchRemoteAccountFeatured(ctx context.Context, requestingUsername string, featuredCollectionURI string, accountID string) error { -	uri, err := url.Parse(featuredCollectionURI) +// dereferenceAccountFeatured dereferences an account's featuredCollectionURI (if not empty). For each discovered status, this status will +// be dereferenced (if necessary) and marked as pinned (if necessary). Then, old pins will be removed if they're not included in new pins. +func (d *deref) dereferenceAccountFeatured(ctx context.Context, requestUser string, account *gtsmodel.Account) error { +	uri, err := url.Parse(account.FeaturedCollectionURI)  	if err != nil {  		return err  	} -	tsport, err := d.transportController.NewTransportForUsername(ctx, requestingUsername) +	// Pre-fetch a transport for requesting username, used by later deref procedures. +	tsport, err := d.transportController.NewTransportForUsername(ctx, requestUser)  	if err != nil { -		return err +		return fmt.Errorf("enrichAccount: couldn't create transport: %w", err)  	}  	b, err := tsport.Dereference(ctx, uri) @@ -661,7 +738,7 @@ func (d *deref) fetchRemoteAccountFeatured(ctx context.Context, requestingUserna  	}  	if t.GetTypeName() != ap.ObjectOrderedCollection { -		return fmt.Errorf("%s was not an OrderedCollection", featuredCollectionURI) +		return fmt.Errorf("%s was not an OrderedCollection", uri)  	}  	collection, ok := t.(vocab.ActivityStreamsOrderedCollection) @@ -675,7 +752,7 @@ func (d *deref) fetchRemoteAccountFeatured(ctx context.Context, requestingUserna  	}  	// Get previous pinned statuses (we'll need these later). -	wasPinned, err := d.db.GetAccountPinnedStatuses(ctx, accountID) +	wasPinned, err := d.state.DB.GetAccountPinnedStatuses(ctx, account.ID)  	if err != nil && !errors.Is(err, db.ErrNoEntries) {  		return fmt.Errorf("error getting account pinned statuses: %w", err)  	} @@ -720,11 +797,10 @@ func (d *deref) fetchRemoteAccountFeatured(ctx context.Context, requestingUserna  		// we still know it was *meant* to be pinned.  		statusURIs = append(statusURIs, statusURI) -		status, _, err := d.GetStatus(ctx, requestingUsername, statusURI, false, false) +		status, _, err := d.getStatusByURI(ctx, requestUser, statusURI)  		if err != nil { -			// We couldn't get the status, bummer. -			// Just log + move on, we can try later. -			log.Errorf(ctx, "error getting status from featured collection %s: %s", featuredCollectionURI, err) +			// We couldn't get the status, bummer. Just log + move on, we can try later. +			log.Errorf(ctx, "error getting status from featured collection %s: %v", statusURI, err)  			continue  		} @@ -733,7 +809,7 @@ func (d *deref) fetchRemoteAccountFeatured(ctx context.Context, requestingUserna  			continue  		} -		if status.AccountID != accountID { +		if status.AccountID != account.ID {  			// Someone's pinned a status that doesn't  			// belong to them, this doesn't work for us.  			continue @@ -748,8 +824,9 @@ func (d *deref) fetchRemoteAccountFeatured(ctx context.Context, requestingUserna  		// All conditions are met for this status to  		// be pinned, so we can finally update it.  		status.PinnedAt = time.Now() -		if err := d.db.UpdateStatus(ctx, status, "pinned_at"); err != nil { -			log.Errorf(ctx, "error updating status in featured collection %s: %s", featuredCollectionURI, err) +		if err := d.state.DB.UpdateStatus(ctx, status, "pinned_at"); err != nil { +			log.Errorf(ctx, "error updating status in featured collection %s: %v", status.URI, err) +			continue  		}  	} @@ -768,8 +845,9 @@ outerLoop:  		// Status was pinned before, but is not included  		// in most recent pinned uris, so unpin it now.  		status.PinnedAt = time.Time{} -		if err := d.db.UpdateStatus(ctx, status, "pinned_at"); err != nil { -			return fmt.Errorf("error unpinning status: %w", err) +		if err := d.state.DB.UpdateStatus(ctx, status, "pinned_at"); err != nil { +			log.Errorf(ctx, "error unpinning status %s: %v", status.URI, err) +			continue  		}  	} diff --git a/internal/federation/dereferencing/account_test.go b/internal/federation/dereferencing/account_test.go index c4b946f5b..9cff0a171 100644 --- a/internal/federation/dereferencing/account_test.go +++ b/internal/federation/dereferencing/account_test.go @@ -37,7 +37,7 @@ func (suite *AccountTestSuite) TestDereferenceGroup() {  	fetchingAccount := suite.testAccounts["local_account_1"]  	groupURL := testrig.URLMustParse("https://unknown-instance.com/groups/some_group") -	group, err := suite.dereferencer.GetAccountByURI( +	group, _, err := suite.dereferencer.GetAccountByURI(  		context.Background(),  		fetchingAccount.Username,  		groupURL, @@ -61,7 +61,7 @@ func (suite *AccountTestSuite) TestDereferenceService() {  	fetchingAccount := suite.testAccounts["local_account_1"]  	serviceURL := testrig.URLMustParse("https://owncast.example.org/federation/user/rgh") -	service, err := suite.dereferencer.GetAccountByURI( +	service, _, err := suite.dereferencer.GetAccountByURI(  		context.Background(),  		fetchingAccount.Username,  		serviceURL, @@ -93,7 +93,7 @@ func (suite *AccountTestSuite) TestDereferenceLocalAccountAsRemoteURL() {  	fetchingAccount := suite.testAccounts["local_account_1"]  	targetAccount := suite.testAccounts["local_account_2"] -	fetchedAccount, err := suite.dereferencer.GetAccountByURI( +	fetchedAccount, _, err := suite.dereferencer.GetAccountByURI(  		context.Background(),  		fetchingAccount.Username,  		testrig.URLMustParse(targetAccount.URI), @@ -112,7 +112,7 @@ func (suite *AccountTestSuite) TestDereferenceLocalAccountAsRemoteURLNoSharedInb  		suite.FailNow(err.Error())  	} -	fetchedAccount, err := suite.dereferencer.GetAccountByURI( +	fetchedAccount, _, err := suite.dereferencer.GetAccountByURI(  		context.Background(),  		fetchingAccount.Username,  		testrig.URLMustParse(targetAccount.URI), @@ -126,7 +126,7 @@ func (suite *AccountTestSuite) TestDereferenceLocalAccountAsUsername() {  	fetchingAccount := suite.testAccounts["local_account_1"]  	targetAccount := suite.testAccounts["local_account_2"] -	fetchedAccount, err := suite.dereferencer.GetAccountByURI( +	fetchedAccount, _, err := suite.dereferencer.GetAccountByURI(  		context.Background(),  		fetchingAccount.Username,  		testrig.URLMustParse(targetAccount.URI), @@ -140,7 +140,7 @@ func (suite *AccountTestSuite) TestDereferenceLocalAccountAsUsernameDomain() {  	fetchingAccount := suite.testAccounts["local_account_1"]  	targetAccount := suite.testAccounts["local_account_2"] -	fetchedAccount, err := suite.dereferencer.GetAccountByURI( +	fetchedAccount, _, err := suite.dereferencer.GetAccountByURI(  		context.Background(),  		fetchingAccount.Username,  		testrig.URLMustParse(targetAccount.URI), @@ -154,7 +154,7 @@ func (suite *AccountTestSuite) TestDereferenceLocalAccountAsUsernameDomainAndURL  	fetchingAccount := suite.testAccounts["local_account_1"]  	targetAccount := suite.testAccounts["local_account_2"] -	fetchedAccount, err := suite.dereferencer.GetAccountByUsernameDomain( +	fetchedAccount, _, err := suite.dereferencer.GetAccountByUsernameDomain(  		context.Background(),  		fetchingAccount.Username,  		targetAccount.Username, @@ -168,7 +168,7 @@ func (suite *AccountTestSuite) TestDereferenceLocalAccountAsUsernameDomainAndURL  func (suite *AccountTestSuite) TestDereferenceLocalAccountWithUnknownUsername() {  	fetchingAccount := suite.testAccounts["local_account_1"] -	fetchedAccount, err := suite.dereferencer.GetAccountByUsernameDomain( +	fetchedAccount, _, err := suite.dereferencer.GetAccountByUsernameDomain(  		context.Background(),  		fetchingAccount.Username,  		"thisaccountdoesnotexist", @@ -183,7 +183,7 @@ func (suite *AccountTestSuite) TestDereferenceLocalAccountWithUnknownUsername()  func (suite *AccountTestSuite) TestDereferenceLocalAccountWithUnknownUsernameDomain() {  	fetchingAccount := suite.testAccounts["local_account_1"] -	fetchedAccount, err := suite.dereferencer.GetAccountByUsernameDomain( +	fetchedAccount, _, err := suite.dereferencer.GetAccountByUsernameDomain(  		context.Background(),  		fetchingAccount.Username,  		"thisaccountdoesnotexist", @@ -198,7 +198,7 @@ func (suite *AccountTestSuite) TestDereferenceLocalAccountWithUnknownUsernameDom  func (suite *AccountTestSuite) TestDereferenceLocalAccountWithUnknownUserURI() {  	fetchingAccount := suite.testAccounts["local_account_1"] -	fetchedAccount, err := suite.dereferencer.GetAccountByURI( +	fetchedAccount, _, err := suite.dereferencer.GetAccountByURI(  		context.Background(),  		fetchingAccount.Username,  		testrig.URLMustParse("http://localhost:8080/users/thisaccountdoesnotexist"), diff --git a/internal/federation/dereferencing/announce.go b/internal/federation/dereferencing/announce.go index 2258cbe5f..e479e41b1 100644 --- a/internal/federation/dereferencing/announce.go +++ b/internal/federation/dereferencing/announce.go @@ -40,7 +40,7 @@ func (d *deref) DereferenceAnnounce(ctx context.Context, announce *gtsmodel.Stat  	}  	// Check whether the originating status is from a blocked host -	if blocked, err := d.db.IsDomainBlocked(ctx, boostedURI.Host); blocked || err != nil { +	if blocked, err := d.state.DB.IsDomainBlocked(ctx, boostedURI.Host); blocked || err != nil {  		return fmt.Errorf("DereferenceAnnounce: domain %s is blocked", boostedURI.Host)  	} @@ -48,7 +48,7 @@ func (d *deref) DereferenceAnnounce(ctx context.Context, announce *gtsmodel.Stat  	if boostedURI.Host == config.GetHost() {  		// This is a local status, fetch from the database -		status, err := d.db.GetStatusByURI(ctx, boostedURI.String()) +		status, err := d.state.DB.GetStatusByURI(ctx, boostedURI.String())  		if err != nil {  			return fmt.Errorf("DereferenceAnnounce: error fetching local status %q: %v", announce.BoostOf.URI, err)  		} @@ -57,14 +57,11 @@ func (d *deref) DereferenceAnnounce(ctx context.Context, announce *gtsmodel.Stat  		boostedStatus = status  	} else {  		// This is a boost of a remote status, we need to dereference it. -		status, statusable, err := d.GetStatus(ctx, requestingUsername, boostedURI, true, true) +		status, _, err := d.GetStatusByURI(ctx, requestingUsername, boostedURI)  		if err != nil {  			return fmt.Errorf("DereferenceAnnounce: error dereferencing remote status with id %s: %s", announce.BoostOf.URI, err)  		} -		// Dereference all statuses in the thread of the boosted status -		d.DereferenceThread(ctx, requestingUsername, boostedURI, status, statusable) -  		// Set boosted status  		boostedStatus = status  	} diff --git a/internal/federation/dereferencing/collectionpage.go b/internal/federation/dereferencing/collectionpage.go index 3f9271256..d76c4b2ab 100644 --- a/internal/federation/dereferencing/collectionpage.go +++ b/internal/federation/dereferencing/collectionpage.go @@ -29,9 +29,9 @@ import (  	"github.com/superseriousbusiness/gotosocial/internal/ap"  ) -// DereferenceCollectionPage returns the activitystreams CollectionPage at the specified IRI, or an error if something goes wrong. -func (d *deref) DereferenceCollectionPage(ctx context.Context, username string, pageIRI *url.URL) (ap.CollectionPageable, error) { -	if blocked, err := d.db.IsDomainBlocked(ctx, pageIRI.Host); blocked || err != nil { +// dereferenceCollectionPage returns the activitystreams CollectionPage at the specified IRI, or an error if something goes wrong. +func (d *deref) dereferenceCollectionPage(ctx context.Context, username string, pageIRI *url.URL) (ap.CollectionPageable, error) { +	if blocked, err := d.state.DB.IsDomainBlocked(ctx, pageIRI.Host); blocked || err != nil {  		return nil, fmt.Errorf("DereferenceCollectionPage: domain %s is blocked", pageIRI.Host)  	} diff --git a/internal/federation/dereferencing/dereferencer.go b/internal/federation/dereferencing/dereferencer.go index 181196caa..26d3432d0 100644 --- a/internal/federation/dereferencing/dereferencer.go +++ b/internal/federation/dereferencing/dereferencer.go @@ -24,42 +24,61 @@ import (  	"codeberg.org/gruf/go-mutexes"  	"github.com/superseriousbusiness/gotosocial/internal/ap" -	"github.com/superseriousbusiness/gotosocial/internal/db"  	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"  	"github.com/superseriousbusiness/gotosocial/internal/media" +	"github.com/superseriousbusiness/gotosocial/internal/state"  	"github.com/superseriousbusiness/gotosocial/internal/transport"  	"github.com/superseriousbusiness/gotosocial/internal/typeutils"  )  // Dereferencer wraps logic and functionality for doing dereferencing of remote accounts, statuses, etc, from federated instances.  type Dereferencer interface { -	// GetAccountByURI will attempt to fetch an account by its URI, first checking the database and in the case of a remote account will either check the -	// last_fetched (and updating if beyond fetch interval) or dereferencing for the first-time if this remote account has never been encountered before. -	GetAccountByURI(ctx context.Context, requestUser string, uri *url.URL) (*gtsmodel.Account, error) +	// GetAccountByURI will attempt to fetch an accounts by its URI, first checking the database. In the case of a newly-met remote model, or a remote model +	// whose last_fetched date is beyond a certain interval, the account will be dereferenced. In the case of dereferencing, some low-priority account information +	// may be enqueued for asynchronous fetching, e.g. featured account statuses (pins). An ActivityPub object indicates the account was dereferenced. +	GetAccountByURI(ctx context.Context, requestUser string, uri *url.URL) (*gtsmodel.Account, ap.Accountable, error) -	// GetAccountByUsernameDomain will attempt to fetch an account by username@domain, first checking the database and in the case of a remote account will either -	// check the last_fetched (and updating if beyond fetch interval) or dereferencing for the first-time if this remote account has never been encountered before. -	GetAccountByUsernameDomain(ctx context.Context, requestUser string, username string, domain string) (*gtsmodel.Account, error) +	// GetAccountByUsernameDomain will attempt to fetch an accounts by its username@domain, first checking the database. In the case of a newly-met remote model, +	// or a remote model whose last_fetched date is beyond a certain interval, the account will be dereferenced. In the case of dereferencing, some low-priority +	// account information may be enqueued for asynchronous fetching, e.g. featured account statuses (pins). An ActivityPub object indicates the account was dereferenced. +	GetAccountByUsernameDomain(ctx context.Context, requestUser string, username string, domain string) (*gtsmodel.Account, ap.Accountable, error) -	// RefreshAccount forces a refresh of the given account by fetching the current/latest state of the account from the remote instance. -	// An updated account model is returned, but not yet inserted/updated in the database; this is the caller's responsibility. -	RefreshAccount(ctx context.Context, requestUser string, accountable ap.Accountable, account *gtsmodel.Account) (*gtsmodel.Account, error) +	// RefreshAccount updates the given account if remote and last_fetched is beyond fetch interval, or if force is set. An updated account model is returned, +	// but in the case of dereferencing, some low-priority account information may be enqueued for asynchronous fetching, e.g. featured account statuses (pins). +	// An ActivityPub object indicates the account was dereferenced (i.e. updated). +	RefreshAccount(ctx context.Context, requestUser string, account *gtsmodel.Account, apubAcc ap.Accountable, force bool) (*gtsmodel.Account, ap.Accountable, error) -	GetStatus(ctx context.Context, username string, remoteStatusID *url.URL, refetch, includeParent bool) (*gtsmodel.Status, ap.Statusable, error) +	// RefreshAccountAsync enqueues the given account for an asychronous update fetching, if last_fetched is beyond fetch interval, or if forcc is set. +	// This is a more optimized form of manually enqueueing .UpdateAccount() to the federation worker, since it only enqueues update if necessary. +	RefreshAccountAsync(ctx context.Context, requestUser string, account *gtsmodel.Account, apubAcc ap.Accountable, force bool) + +	// GetStatusByURI will attempt to fetch a status by its URI, first checking the database. In the case of a newly-met remote model, or a remote model +	// whose last_fetched date is beyond a certain interval, the status will be dereferenced. In the case of dereferencing, some low-priority status information +	// may be enqueued for asynchronous fetching, e.g. dereferencing the remainder of the status thread. An ActivityPub object indicates the status was dereferenced. +	GetStatusByURI(ctx context.Context, requestUser string, uri *url.URL) (*gtsmodel.Status, ap.Statusable, error) + +	// RefreshStatus updates the given status if remote and last_fetched is beyond fetch interval, or if force is set. An updated status model is returned, +	// but in the case of dereferencing, some low-priority status information may be enqueued for asynchronous fetching, e.g. dereferencing the remainder of the +	// status thread. An ActivityPub object indicates the status was dereferenced (i.e. updated). +	RefreshStatus(ctx context.Context, requestUser string, status *gtsmodel.Status, apubStatus ap.Statusable, force bool) (*gtsmodel.Status, ap.Statusable, error) + +	// RefreshStatusAsync enqueues the given status for an asychronous update fetching, if last_fetched is beyond fetch interval, or if force is set. +	// This is a more optimized form of manually enqueueing .UpdateStatus() to the federation worker, since it only enqueues update if necessary. +	RefreshStatusAsync(ctx context.Context, requestUser string, status *gtsmodel.Status, apubStatus ap.Statusable, force bool) -	EnrichRemoteStatus(ctx context.Context, username string, status *gtsmodel.Status, includeParent bool) (*gtsmodel.Status, error)  	GetRemoteInstance(ctx context.Context, username string, remoteInstanceURI *url.URL) (*gtsmodel.Instance, error) +  	DereferenceAnnounce(ctx context.Context, announce *gtsmodel.Status, requestingUsername string) error -	DereferenceThread(ctx context.Context, username string, statusIRI *url.URL, status *gtsmodel.Status, statusable ap.Statusable)  	GetRemoteMedia(ctx context.Context, requestingUsername string, accountID string, remoteURL string, ai *media.AdditionalMediaInfo) (*media.ProcessingMedia, error) +  	GetRemoteEmoji(ctx context.Context, requestingUsername string, remoteURL string, shortcode string, domain string, id string, emojiURI string, ai *media.AdditionalEmojiInfo, refresh bool) (*media.ProcessingEmoji, error)  	Handshaking(username string, remoteAccountID *url.URL) bool  }  type deref struct { -	db                  db.DB +	state               *state.State  	typeConverter       typeutils.TypeConverter  	transportController transport.Controller  	mediaManager        media.Manager @@ -74,9 +93,9 @@ type deref struct {  }  // NewDereferencer returns a Dereferencer initialized with the given parameters. -func NewDereferencer(db db.DB, typeConverter typeutils.TypeConverter, transportController transport.Controller, mediaManager media.Manager) Dereferencer { +func NewDereferencer(state *state.State, typeConverter typeutils.TypeConverter, transportController transport.Controller, mediaManager media.Manager) Dereferencer {  	return &deref{ -		db:                  db, +		state:               state,  		typeConverter:       typeConverter,  		transportController: transportController,  		mediaManager:        mediaManager, diff --git a/internal/federation/dereferencing/dereferencer_test.go b/internal/federation/dereferencing/dereferencer_test.go index f90043e7a..3cec176fe 100644 --- a/internal/federation/dereferencing/dereferencer_test.go +++ b/internal/federation/dereferencing/dereferencer_test.go @@ -65,7 +65,7 @@ func (suite *DereferencerStandardTestSuite) SetupTest() {  	suite.state.DB = suite.db  	suite.state.Storage = suite.storage  	media := testrig.NewTestMediaManager(&suite.state) -	suite.dereferencer = dereferencing.NewDereferencer(suite.db, testrig.NewTestTypeConverter(suite.db), testrig.NewTestTransportController(&suite.state, testrig.NewMockHTTPClient(nil, "../../../testrig/media")), media) +	suite.dereferencer = dereferencing.NewDereferencer(&suite.state, testrig.NewTestTypeConverter(suite.db), testrig.NewTestTransportController(&suite.state, testrig.NewMockHTTPClient(nil, "../../../testrig/media")), media)  	testrig.StandardDBSetup(suite.db, nil)  } diff --git a/internal/federation/dereferencing/emoji.go b/internal/federation/dereferencing/emoji.go index e1fdd8cc3..857d0c040 100644 --- a/internal/federation/dereferencing/emoji.go +++ b/internal/federation/dereferencing/emoji.go @@ -109,7 +109,7 @@ func (d *deref) populateEmojis(ctx context.Context, rawEmojis []*gtsmodel.Emoji,  			// it should be fleshed out already and we won't  			// have to get it from the database again  			gotEmoji = e -		} else if gotEmoji, err = d.db.GetEmojiByShortcodeDomain(ctx, e.Shortcode, e.Domain); err != nil && err != db.ErrNoEntries { +		} else if gotEmoji, err = d.state.DB.GetEmojiByShortcodeDomain(ctx, e.Shortcode, e.Domain); err != nil && err != db.ErrNoEntries {  			log.Errorf(ctx, "error checking database for emoji %s: %s", shortcodeDomain, err)  			continue  		} diff --git a/internal/federation/dereferencing/error.go b/internal/federation/dereferencing/error.go index 769150cef..1b8d90653 100644 --- a/internal/federation/dereferencing/error.go +++ b/internal/federation/dereferencing/error.go @@ -19,25 +19,8 @@ package dereferencing  import (  	"fmt" -	"net/http" - -	"github.com/superseriousbusiness/gotosocial/internal/gtserror"  ) -// ErrDB denotes that a proper error has occurred when doing -// a database call, as opposed to a simple db.ErrNoEntries. -type ErrDB struct { -	wrapped error -} - -func (err *ErrDB) Error() string { -	return fmt.Sprintf("database error during dereferencing: %v", err.wrapped) -} - -func newErrDB(err error) error { -	return &ErrDB{wrapped: err} -} -  // ErrNotRetrievable denotes that an item could not be dereferenced  // with the given parameters.  type ErrNotRetrievable struct { @@ -51,52 +34,3 @@ func (err *ErrNotRetrievable) Error() string {  func NewErrNotRetrievable(err error) error {  	return &ErrNotRetrievable{wrapped: err}  } - -// ErrTransportError indicates that something unforeseen went wrong creating -// a transport, or while making an http call to a remote resource with a transport. -type ErrTransportError struct { -	wrapped error -} - -func (err *ErrTransportError) Error() string { -	return fmt.Sprintf("transport error: %v", err.wrapped) -} - -func newErrTransportError(err error) error { -	return &ErrTransportError{wrapped: err} -} - -// ErrOther denotes some other kind of weird error, perhaps from a malformed json -// or some other weird crapola. -type ErrOther struct { -	wrapped error -} - -func (err *ErrOther) Error() string { -	return fmt.Sprintf("unexpected error: %v", err.wrapped) -} - -func newErrOther(err error) error { -	return &ErrOther{wrapped: err} -} - -func wrapDerefError(derefErr error, fluff string) error { -	// Wrap with fluff. -	err := derefErr -	if fluff != "" { -		err = fmt.Errorf("%s: %w", fluff, derefErr) -	} - -	// Check for unretrievable HTTP status code errors. -	if code := gtserror.StatusCode(derefErr); // nocollapse -	code == http.StatusGone || code == http.StatusNotFound { -		return NewErrNotRetrievable(err) -	} - -	// Check for other untrievable errors. -	if gtserror.NotFound(derefErr) { -		return NewErrNotRetrievable(err) -	} - -	return err -} diff --git a/internal/federation/dereferencing/finger.go b/internal/federation/dereferencing/finger.go index f4cecb9c4..fff079327 100644 --- a/internal/federation/dereferencing/finger.go +++ b/internal/federation/dereferencing/finger.go @@ -69,6 +69,5 @@ func (d *deref) fingerRemoteAccount(ctx context.Context, transport transport.Tra  		}  	} -	err = errors.New("fingerRemoteAccount: no match found in webfinger response") -	return +	return "", nil, errors.New("fingerRemoteAccount: no match found in webfinger response")  } diff --git a/internal/federation/dereferencing/instance.go b/internal/federation/dereferencing/instance.go index 6f8d2b23f..742239637 100644 --- a/internal/federation/dereferencing/instance.go +++ b/internal/federation/dereferencing/instance.go @@ -26,7 +26,7 @@ import (  )  func (d *deref) GetRemoteInstance(ctx context.Context, username string, remoteInstanceURI *url.URL) (*gtsmodel.Instance, error) { -	if blocked, err := d.db.IsDomainBlocked(ctx, remoteInstanceURI.Host); blocked || err != nil { +	if blocked, err := d.state.DB.IsDomainBlocked(ctx, remoteInstanceURI.Host); blocked || err != nil {  		return nil, fmt.Errorf("GetRemoteInstance: domain %s is blocked", remoteInstanceURI.Host)  	} diff --git a/internal/federation/dereferencing/status.go b/internal/federation/dereferencing/status.go index 8e130393a..dfec0605f 100644 --- a/internal/federation/dereferencing/status.go +++ b/internal/federation/dereferencing/status.go @@ -21,8 +21,9 @@ import (  	"context"  	"errors"  	"fmt" +	"io"  	"net/url" -	"strings" +	"time"  	"github.com/superseriousbusiness/gotosocial/internal/ap"  	"github.com/superseriousbusiness/gotosocial/internal/config" @@ -34,374 +35,430 @@ import (  	"github.com/superseriousbusiness/gotosocial/internal/transport"  ) -// EnrichRemoteStatus takes a remote status that's already been inserted into the database in a minimal form, -// and populates it with additional fields, media, etc. -// -// EnrichRemoteStatus is mostly useful for calling after a status has been initially created by -// the federatingDB's Create function, but additional dereferencing is needed on it. -func (d *deref) EnrichRemoteStatus(ctx context.Context, username string, status *gtsmodel.Status, includeParent bool) (*gtsmodel.Status, error) { -	if err := d.populateStatusFields(ctx, status, username, includeParent); err != nil { -		return nil, err +// statusUpToDate returns whether the given status model is both updateable +// (i.e. remote status) and whether it needs an update based on `fetched_at`. +func statusUpToDate(status *gtsmodel.Status) bool { +	if *status.Local { +		// Can't update local statuses. +		return true  	} -	if err := d.db.UpdateStatus(ctx, status); err != nil { -		return nil, err + +	// If this status was updated recently (last interval), we return as-is. +	if next := status.FetchedAt.Add(2 * time.Hour); time.Now().Before(next) { +		return true  	} -	return status, nil + +	return false  } -// GetStatus completely dereferences a status, converts it to a GtS model status, -// puts it in the database, and returns it to a caller. -// -// If refetch is true, then regardless of whether we have the original status in the database or not, -// the ap.Statusable representation of the status will be dereferenced and returned. -// -// If refetch is false, the ap.Statusable will only be returned if this is a new status, so callers -// should check whether or not this is nil. -// -// GetAccount will guard against trying to do http calls to fetch a status that belongs to this instance. -// Instead of making calls, it will just return the status early if it finds it, or return an error. -func (d *deref) GetStatus(ctx context.Context, username string, statusURI *url.URL, refetch, includeParent bool) (*gtsmodel.Status, ap.Statusable, error) { -	uriString := statusURI.String() - -	// try to get by URI first -	status, dbErr := d.db.GetStatusByURI(ctx, uriString) -	if dbErr != nil { -		if !errors.Is(dbErr, db.ErrNoEntries) { -			// real error -			return nil, nil, newErrDB(fmt.Errorf("GetRemoteStatus: error during GetStatusByURI for %s: %w", uriString, dbErr)) -		} -		// no problem, just press on -	} else if !refetch { -		// we already had the status and we aren't being asked to refetch the AP representation -		return status, nil, nil +// GetStatus: implements Dereferencer{}.GetStatus(). +func (d *deref) GetStatusByURI(ctx context.Context, requestUser string, uri *url.URL) (*gtsmodel.Status, ap.Statusable, error) { +	// Fetch and dereference status if necessary. +	status, apubStatus, err := d.getStatusByURI(ctx, +		requestUser, +		uri, +	) +	if err != nil { +		return nil, nil, err +	} + +	if apubStatus != nil { +		// This status was updated, enqueue re-dereferencing the whole thread. +		d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) { +			d.dereferenceThread(ctx, requestUser, uri, status, apubStatus) +		}) +	} + +	return status, apubStatus, nil +} + +// getStatusByURI is a package internal form of .GetStatusByURI() that doesn't bother dereferencing the whole thread on update. +func (d *deref) getStatusByURI(ctx context.Context, requestUser string, uri *url.URL) (*gtsmodel.Status, ap.Statusable, error) { +	var ( +		status *gtsmodel.Status +		uriStr = uri.String() +		err    error +	) + +	// Search the database for existing status with ID URI. +	status, err = d.state.DB.GetStatusByURI(ctx, uriStr) +	if err != nil && !errors.Is(err, db.ErrNoEntries) { +		return nil, nil, fmt.Errorf("GetStatusByURI: error checking database for status %s by uri: %w", uriStr, err)  	} -	// try to get by URL if we couldn't get by URI now  	if status == nil { -		status, dbErr = d.db.GetStatusByURL(ctx, uriString) -		if dbErr != nil { -			if !errors.Is(dbErr, db.ErrNoEntries) { -				// real error -				return nil, nil, newErrDB(fmt.Errorf("GetRemoteStatus: error during GetStatusByURI for %s: %w", uriString, dbErr)) -			} -			// no problem, just press on -		} else if !refetch { -			// we already had the status and we aren't being asked to refetch the AP representation -			return status, nil, nil +		// Else, search the database for existing by ID URL. +		status, err = d.state.DB.GetStatusByURL(ctx, uriStr) +		if err != nil && !errors.Is(err, db.ErrNoEntries) { +			return nil, nil, fmt.Errorf("GetStatusByURI: error checking database for status %s by url: %w", uriStr, err)  		}  	} -	// guard against having our own statuses passed in -	if host := statusURI.Host; host == config.GetHost() || host == config.GetAccountDomain() { -		// this is our status, definitely don't search for it -		if status != nil { -			return status, nil, nil +	if status == nil { +		// Ensure that this is isn't a search for a local status. +		if uri.Host == config.GetHost() || uri.Host == config.GetAccountDomain() { +			return nil, nil, NewErrNotRetrievable(err) // this will be db.ErrNoEntries  		} -		return nil, nil, NewErrNotRetrievable(fmt.Errorf("GetRemoteStatus: uri %s is apparently ours, but we have nothing in the db for it, will not proceed to dereference our own status", uriString)) + +		// Create and pass-through a new bare-bones model for deref. +		return d.enrichStatus(ctx, requestUser, uri, >smodel.Status{ +			Local: func() *bool { var false bool; return &false }(), +			URI:   uriStr, +		}, nil)  	} -	// if we got here, either we didn't have the status -	// in the db, or we had it but need to refetch it -	tsport, err := d.transportController.NewTransportForUsername(ctx, username) +	// Try to update + deref existing status model. +	latest, apubStatus, err := d.enrichStatus(ctx, +		requestUser, +		uri, +		status, +		nil, +	)  	if err != nil { -		return nil, nil, newErrTransportError(fmt.Errorf("GetRemoteStatus: error creating transport for %s: %w", username, err)) -	} +		log.Errorf(ctx, "error enriching remote status: %v", err) -	statusable, derefErr := d.dereferenceStatusable(ctx, tsport, statusURI) -	if derefErr != nil { -		return nil, nil, wrapDerefError(derefErr, "GetRemoteStatus: error dereferencing statusable") -	} +		// Update fetch-at to slow re-attempts. +		status.FetchedAt = time.Now() +		_ = d.state.DB.UpdateStatus(ctx, status, "fetched_at") -	if status != nil && refetch { -		// we already had the status in the db, and we've also -		// now fetched the AP representation as requested -		return status, statusable, nil +		// Fallback to existing. +		return status, nil, nil  	} -	// from here on out we can consider this to be a 'new' status because we didn't have the status in the db already -	accountURI, err := ap.ExtractAttributedTo(statusable) -	if err != nil { -		return nil, nil, newErrOther(fmt.Errorf("GetRemoteStatus: error extracting attributedTo: %w", err)) -	} +	return latest, apubStatus, nil +} -	// we need to get the author of the status else we can't serialize it properly -	if _, err = d.GetAccountByURI(ctx, username, accountURI); err != nil { -		return nil, nil, newErrOther(fmt.Errorf("GetRemoteStatus: couldn't get status author: %s", err)) +// RefreshStatus: implements Dereferencer{}.RefreshStatus(). +func (d *deref) RefreshStatus(ctx context.Context, requestUser string, status *gtsmodel.Status, apubStatus ap.Statusable, force bool) (*gtsmodel.Status, ap.Statusable, error) { +	// Check whether needs update. +	if statusUpToDate(status) { +		return status, nil, nil  	} -	status, err = d.typeConverter.ASStatusToStatus(ctx, statusable) +	// Parse the URI from status. +	uri, err := url.Parse(status.URI)  	if err != nil { -		return nil, nil, newErrOther(fmt.Errorf("GetRemoteStatus: error converting statusable to status: %s", err)) +		return nil, nil, fmt.Errorf("RefreshStatus: invalid status uri %q: %w", status.URI, err)  	} -	ulid, err := id.NewULIDFromTime(status.CreatedAt) +	// Try to update + deref existing status model. +	latest, apubStatus, err := d.enrichStatus(ctx, +		requestUser, +		uri, +		status, +		apubStatus, +	)  	if err != nil { -		return nil, nil, newErrOther(fmt.Errorf("GetRemoteStatus: error generating new id for status: %s", err)) -	} -	status.ID = ulid - -	if err := d.populateStatusFields(ctx, status, username, includeParent); err != nil { -		return nil, nil, newErrOther(fmt.Errorf("GetRemoteStatus: error populating status fields: %s", err)) +		return nil, nil, err  	} -	if err := d.db.PutStatus(ctx, status); err != nil && !errors.Is(err, db.ErrAlreadyExists) { -		return nil, nil, newErrDB(fmt.Errorf("GetRemoteStatus: error putting new status: %s", err)) -	} +	// This status was updated, enqueue re-dereferencing the whole thread. +	d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) { +		d.dereferenceThread(ctx, requestUser, uri, latest, apubStatus) +	}) -	return status, statusable, nil +	return latest, apubStatus, nil  } -func (d *deref) dereferenceStatusable(ctx context.Context, tsport transport.Transport, remoteStatusID *url.URL) (ap.Statusable, error) { -	if blocked, err := d.db.IsDomainBlocked(ctx, remoteStatusID.Host); blocked || err != nil { -		return nil, fmt.Errorf("DereferenceStatusable: domain %s is blocked", remoteStatusID.Host) +// RefreshStatusAsync: implements Dereferencer{}.RefreshStatusAsync(). +func (d *deref) RefreshStatusAsync(ctx context.Context, requestUser string, status *gtsmodel.Status, apubStatus ap.Statusable, force bool) { +	// Check whether needs update. +	if statusUpToDate(status) { +		return  	} -	b, err := tsport.Dereference(ctx, remoteStatusID) +	// Parse the URI from status. +	uri, err := url.Parse(status.URI)  	if err != nil { -		return nil, fmt.Errorf("dereferenceStatusable: error deferencing %s: %w", remoteStatusID.String(), err) +		log.Errorf(ctx, "RefreshStatusAsync: invalid status uri %q: %v", status.URI, err) +		return  	} -	return ap.ResolveStatusable(ctx, b) +	// Enqueue a worker function to re-fetch this status async. +	d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) { +		latest, apubStatus, err := d.enrichStatus(ctx, requestUser, uri, status, apubStatus) +		if err != nil { +			log.Errorf(ctx, "error enriching remote status: %v", err) +			return +		} + +		// This status was updated, re-dereference the whole thread. +		d.dereferenceThread(ctx, requestUser, uri, latest, apubStatus) +	})  } -// populateStatusFields fetches all the information we temporarily pinned to an incoming -// federated status, back in the federating db's Create function. -// -// When a status comes in from the federation API, there are certain fields that -// haven't been dereferenced yet, because we needed to provide a snappy synchronous -// response to the caller. By the time it reaches this function though, it's being -// processed asynchronously, so we have all the time in the world to fetch the various -// bits and bobs that are attached to the status, and properly flesh it out, before we -// send the status to any timelines and notify people. -// -// Things to dereference and fetch here: -// -// 1. Media attachments. -// 2. Hashtags. -// 3. Emojis. -// 4. Mentions. -// 5. Replied-to-status. -// -// SIDE EFFECTS: -// This function will deference all of the above, insert them in the database as necessary, -// and attach them to the status. The status itself will not be added to the database yet, -// that's up the caller to do. -func (d *deref) populateStatusFields(ctx context.Context, status *gtsmodel.Status, requestingUsername string, includeParent bool) error { -	statusIRI, err := url.Parse(status.URI) +// enrichStatus will enrich the given status, whether a new barebones model, or existing model from the database. It handles necessary dereferencing etc. +func (d *deref) enrichStatus(ctx context.Context, requestUser string, uri *url.URL, status *gtsmodel.Status, apubStatus ap.Statusable) (*gtsmodel.Status, ap.Statusable, error) { +	// Pre-fetch a transport for requesting username, used by later dereferencing. +	tsport, err := d.transportController.NewTransportForUsername(ctx, requestUser)  	if err != nil { -		return fmt.Errorf("populateStatusFields: couldn't parse status URI %s: %s", status.URI, err) +		return nil, nil, fmt.Errorf("enrichStatus: couldn't create transport: %w", err)  	} -	blocked, err := d.db.IsURIBlocked(ctx, statusIRI) -	if err != nil { -		return fmt.Errorf("populateStatusFields: error checking blocked status of %s: %s", statusIRI, err) -	} -	if blocked { -		return fmt.Errorf("populateStatusFields: domain %s is blocked", statusIRI) +	// Check whether this account URI is a blocked domain / subdomain. +	if blocked, err := d.state.DB.IsDomainBlocked(ctx, uri.Host); err != nil { +		return nil, nil, fmt.Errorf("enrichStatus: error checking blocked domain: %w", err) +	} else if blocked { +		return nil, nil, fmt.Errorf("enrichStatus: %s is blocked", uri.Host)  	} -	// in case the status doesn't have an id yet (ie., it hasn't entered the database yet), then create one -	if status.ID == "" { -		newID, err := id.NewULIDFromTime(status.CreatedAt) +	var derefd bool + +	if apubStatus == nil { +		// Dereference latest version of the status. +		b, err := tsport.Dereference(ctx, uri)  		if err != nil { -			return fmt.Errorf("populateStatusFields: error creating ulid for status: %s", err) +			return nil, nil, &ErrNotRetrievable{fmt.Errorf("enrichStatus: error deferencing %s: %w", uri, err)}  		} -		status.ID = newID -	} -	// 1. Media attachments. -	if err := d.populateStatusAttachments(ctx, status, requestingUsername); err != nil { -		return fmt.Errorf("populateStatusFields: error populating status attachments: %s", err) -	} +		// Attempt to resolve ActivityPub status from data. +		apubStatus, err = ap.ResolveStatusable(ctx, b) +		if err != nil { +			return nil, nil, fmt.Errorf("enrichStatus: error resolving statusable from data for account %s: %w", uri, err) +		} -	// 2. Hashtags -	// TODO +		// Mark as deref'd. +		derefd = true +	} -	// 3. Emojis -	if err := d.populateStatusEmojis(ctx, status, requestingUsername); err != nil { -		return fmt.Errorf("populateStatusFields: error populating status emojis: %s", err) +	// Get the attributed-to status in order to fetch profile. +	attributedTo, err := ap.ExtractAttributedTo(apubStatus) +	if err != nil { +		return nil, nil, errors.New("enrichStatus: attributedTo was empty")  	} -	// 4. Mentions -	// TODO: do we need to handle removing empty mention objects and just using mention IDs slice? -	if err := d.populateStatusMentions(ctx, status, requestingUsername); err != nil { -		return fmt.Errorf("populateStatusFields: error populating status mentions: %s", err) +	// Ensure we have the author account of the status dereferenced (+ up-to-date). +	if author, _, err := d.getAccountByURI(ctx, requestUser, attributedTo); err != nil { +		if status.AccountID == "" { +			// Provided status account is nil, i.e. this is a new status / author, so a deref fail is unrecoverable. +			return nil, nil, fmt.Errorf("enrichStatus: failed to dereference status author %s: %w", uri, err) +		} +	} else if status.AccountID != "" && status.AccountID != author.ID { +		// There already existed an account for this status author, but account ID changed. This shouldn't happen! +		log.Warnf(ctx, "status author account ID changed: old=%s new=%s", status.AccountID, author.ID)  	} -	// 5. Replied-to-status (only if requested) -	if includeParent { -		if err := d.populateStatusRepliedTo(ctx, status, requestingUsername); err != nil { -			return fmt.Errorf("populateStatusFields: error populating status repliedTo: %s", err) +	// By default we assume that apubStatus has been passed, +	// indicating that the given status is already latest. +	latestStatus := status + +	if derefd { +		// ActivityPub model was recently dereferenced, so assume that passed status +		// may contain out-of-date information, convert AP model to our GTS model. +		latestStatus, err = d.typeConverter.ASStatusToStatus(ctx, apubStatus) +		if err != nil { +			return nil, nil, fmt.Errorf("enrichStatus: error converting statusable to gts model for status %s: %w", uri, err)  		}  	} -	return nil -} +	// Use existing status ID. +	latestStatus.ID = status.ID -func (d *deref) populateStatusMentions(ctx context.Context, status *gtsmodel.Status, requestingUsername string) error { -	// At this point, mentions should have the namestring and mentionedAccountURI set on them. -	// We can use these to find the accounts. - -	mentionIDs := []string{} -	newMentions := []*gtsmodel.Mention{} -	for _, m := range status.Mentions { -		if m.ID != "" { -			// we've already populated this mention, since it has an ID -			log.Debug(ctx, "mention already populated") -			mentionIDs = append(mentionIDs, m.ID) -			newMentions = append(newMentions, m) -			continue +	if latestStatus.ID == "" { +		// Generate new status ID from the provided creation date. +		latestStatus.ID, err = id.NewULIDFromTime(latestStatus.CreatedAt) +		if err != nil { +			return nil, nil, fmt.Errorf("enrichStatus: invalid created at date: %w", err)  		} +	} -		if m.TargetAccountURI == "" { -			log.Debug(ctx, "target URI not set on mention") -			continue +	// Carry-over values and set fetch time. +	latestStatus.FetchedAt = time.Now() +	latestStatus.Local = status.Local + +	// Ensure the status' mentions are populated, and pass in existing to check for changes. +	if err := d.fetchStatusMentions(ctx, requestUser, status, latestStatus); err != nil { +		return nil, nil, fmt.Errorf("enrichStatus: error populating mentions for status %s: %w", uri, err) +	} + +	// TODO: populateStatusTags() + +	// Ensure the status' media attachments are populated, passing in existing to check for changes. +	if err := d.fetchStatusAttachments(ctx, tsport, status, latestStatus); err != nil { +		return nil, nil, fmt.Errorf("enrichStatus: error populating attachments for status %s: %w", uri, err) +	} + +	// Ensure the status' emoji attachments are populated, passing in existing to check for changes. +	if err := d.fetchStatusEmojis(ctx, requestUser, status, latestStatus); err != nil { +		return nil, nil, fmt.Errorf("enrichStatus: error populating emojis for status %s: %w", uri, err) +	} + +	if status.CreatedAt.IsZero() { +		// CreatedAt will be zero if no local copy was +		// found in one of the GetStatusBy___() functions. +		// +		// This is new, put the status in the database. +		err := d.state.DB.PutStatus(ctx, latestStatus) + +		if errors.Is(err, db.ErrAlreadyExists) { +			// TODO: replace this quick fix with per-URI deref locks. +			latestStatus, err = d.state.DB.GetStatusByURI(ctx, latestStatus.URI) +			return latestStatus, nil, err  		} -		targetAccountURI, err := url.Parse(m.TargetAccountURI)  		if err != nil { -			log.Debugf(ctx, "error parsing mentioned account uri %s: %s", m.TargetAccountURI, err) -			continue +			return nil, nil, fmt.Errorf("enrichStatus: error putting in database: %w", err)  		} +	} else { +		// This is an existing status, update the model in the database. +		if err := d.state.DB.UpdateStatus(ctx, latestStatus); err != nil { +			return nil, nil, fmt.Errorf("enrichStatus: error updating database: %w", err) +		} +	} + +	return latestStatus, apubStatus, nil +} -		var targetAccount *gtsmodel.Account -		errs := []string{} +func (d *deref) fetchStatusMentions(ctx context.Context, requestUser string, existing *gtsmodel.Status, status *gtsmodel.Status) error { +	// Allocate new slice to take the yet-to-be created mention IDs. +	status.MentionIDs = make([]string, len(status.Mentions)) -		// check if account is in the db already -		if a, err := d.db.GetAccountByURI(ctx, targetAccountURI.String()); err != nil { -			errs = append(errs, err.Error()) -		} else { -			log.Debugf(ctx, "got target account %s with id %s through GetAccountByURI", targetAccountURI, a.ID) -			targetAccount = a -		} +	for i := range status.Mentions { +		mention := status.Mentions[i] -		if targetAccount == nil { -			// we didn't find the account in our database already -			// check if we can get the account remotely (dereference it) -			if a, err := d.GetAccountByURI(ctx, requestingUsername, targetAccountURI); err != nil { -				errs = append(errs, err.Error()) -			} else { -				log.Debugf(ctx, "got target account %s with id %s through GetRemoteAccount", targetAccountURI, a.ID) -				targetAccount = a -			} +		// Look for existing mention with target account URI first. +		existing, ok := existing.GetMentionByTargetURI(mention.TargetAccountURI) +		if ok && existing.ID != "" { +			status.Mentions[i] = existing +			status.MentionIDs[i] = existing.ID +			continue  		} -		if targetAccount == nil { -			log.Debugf(ctx, "couldn't get target account %s: %s", m.TargetAccountURI, strings.Join(errs, " : ")) +		// Ensure that mention account URI is parseable. +		accountURI, err := url.Parse(mention.TargetAccountURI) +		if err != nil { +			log.Errorf(ctx, "invalid account uri %q: %v", mention.TargetAccountURI, err)  			continue  		} -		mID, err := id.NewRandomULID() +		// Ensure we have the account of the mention target dereferenced. +		mention.TargetAccount, _, err = d.getAccountByURI(ctx, requestUser, accountURI)  		if err != nil { -			return fmt.Errorf("populateStatusMentions: error generating ulid: %s", err) +			log.Errorf(ctx, "failed to dereference account %s: %v", accountURI, err) +			continue  		} -		newMention := >smodel.Mention{ -			ID:               mID, -			StatusID:         status.ID, -			Status:           m.Status, -			CreatedAt:        status.CreatedAt, -			UpdatedAt:        status.UpdatedAt, -			OriginAccountID:  status.AccountID, -			OriginAccountURI: status.AccountURI, -			OriginAccount:    status.Account, -			TargetAccountID:  targetAccount.ID, -			TargetAccount:    targetAccount, -			NameString:       m.NameString, -			TargetAccountURI: targetAccount.URI, -			TargetAccountURL: targetAccount.URL, +		// Generate new ID according to status creation. +		mention.ID, err = id.NewULIDFromTime(status.CreatedAt) +		if err != nil { +			log.Errorf(ctx, "invalid created at date: %v", err) +			mention.ID = id.NewULID() // just use "now"  		} -		if err := d.db.PutMention(ctx, newMention); err != nil { -			return fmt.Errorf("populateStatusMentions: error creating mention: %s", err) +		// Set known further mention details. +		mention.CreatedAt = status.CreatedAt +		mention.UpdatedAt = status.UpdatedAt +		mention.OriginAccount = status.Account +		mention.OriginAccountID = status.AccountID +		mention.OriginAccountURI = status.AccountURI +		mention.TargetAccountID = mention.TargetAccount.ID +		mention.TargetAccountURI = mention.TargetAccount.URI +		mention.TargetAccountURL = mention.TargetAccount.URL +		mention.StatusID = status.ID +		mention.Status = status + +		// Place the new mention into the database. +		if err := d.state.DB.PutMention(ctx, mention); err != nil { +			return fmt.Errorf("error putting mention in database: %w", err)  		} -		mentionIDs = append(mentionIDs, newMention.ID) -		newMentions = append(newMentions, newMention) +		// Set the *new* mention and ID. +		status.Mentions[i] = mention +		status.MentionIDs[i] = mention.ID  	} -	status.MentionIDs = mentionIDs -	status.Mentions = newMentions +	for i := 0; i < len(status.MentionIDs); i++ { +		if status.MentionIDs[i] == "" { +			// This is a failed mention population, likely due +			// to invalid incoming data / now-deleted accounts. +			copy(status.Mentions[i:], status.Mentions[i+1:]) +			copy(status.MentionIDs[i:], status.MentionIDs[i+1:]) +			status.Mentions = status.Mentions[:len(status.Mentions)-1] +			status.MentionIDs = status.MentionIDs[:len(status.MentionIDs)-1] +		} +	}  	return nil  } -func (d *deref) populateStatusAttachments(ctx context.Context, status *gtsmodel.Status, requestingUsername string) error { -	// At this point we should know: -	// * the media type of the file we're looking for (a.File.ContentType) -	// * the file type (a.Type) -	// * the remote URL (a.RemoteURL) -	// This should be enough to dereference the piece of media. - -	attachmentIDs := []string{} -	attachments := []*gtsmodel.MediaAttachment{} - -	for _, a := range status.Attachments { -		a.AccountID = status.AccountID -		a.StatusID = status.ID - -		processingMedia, err := d.GetRemoteMedia(ctx, requestingUsername, a.AccountID, a.RemoteURL, &media.AdditionalMediaInfo{ -			CreatedAt:   &a.CreatedAt, -			StatusID:    &a.StatusID, -			RemoteURL:   &a.RemoteURL, -			Description: &a.Description, -			Blurhash:    &a.Blurhash, +func (d *deref) fetchStatusAttachments(ctx context.Context, tsport transport.Transport, existing *gtsmodel.Status, status *gtsmodel.Status) error { +	// Allocate new slice to take the yet-to-be fetched attachment IDs. +	status.AttachmentIDs = make([]string, len(status.Attachments)) + +	for i := range status.Attachments { +		placeholder := status.Attachments[i] + +		// Look for existing media attachment with remoet URL first. +		existing, ok := existing.GetAttachmentByRemoteURL(placeholder.RemoteURL) +		if ok && existing.ID != "" { +			status.Attachments[i] = existing +			status.AttachmentIDs[i] = existing.ID +			continue +		} + +		// Ensure a valid media attachment remote URL. +		remoteURL, err := url.Parse(placeholder.RemoteURL) +		if err != nil { +			log.Errorf(ctx, "invalid remote media url %q: %v", placeholder.RemoteURL, err) +			continue +		} + +		// Start pre-processing remote media at remote URL. +		processing, err := d.mediaManager.PreProcessMedia(ctx, func(ctx context.Context) (io.ReadCloser, int64, error) { +			return tsport.DereferenceMedia(ctx, remoteURL) +		}, nil, status.AccountID, &media.AdditionalMediaInfo{ +			StatusID:    &status.ID, +			RemoteURL:   &placeholder.RemoteURL, +			Description: &placeholder.Description, +			Blurhash:    &placeholder.Blurhash,  		})  		if err != nil { -			log.Errorf(ctx, "couldn't get remote media %s: %s", a.RemoteURL, err) +			log.Errorf(ctx, "error processing attachment: %v", err)  			continue  		} -		attachment, err := processingMedia.LoadAttachment(ctx) +		// Force attachment loading *right now*. +		media, err := processing.LoadAttachment(ctx)  		if err != nil { -			log.Errorf(ctx, "couldn't load remote attachment %s: %s", a.RemoteURL, err) +			log.Errorf(ctx, "error loading attachment: %v", err)  			continue  		} -		attachmentIDs = append(attachmentIDs, attachment.ID) -		attachments = append(attachments, attachment) +		// Set the *new* attachment and ID. +		status.Attachments[i] = media +		status.AttachmentIDs[i] = media.ID  	} -	status.AttachmentIDs = attachmentIDs -	status.Attachments = attachments +	for i := 0; i < len(status.AttachmentIDs); i++ { +		if status.AttachmentIDs[i] == "" { +			// This is a failed attachment population, this may +			// be due to us not currently supporting a media type. +			copy(status.Attachments[i:], status.Attachments[i+1:]) +			copy(status.AttachmentIDs[i:], status.AttachmentIDs[i+1:]) +			status.Attachments = status.Attachments[:len(status.Attachments)-1] +			status.AttachmentIDs = status.AttachmentIDs[:len(status.AttachmentIDs)-1] +		} +	}  	return nil  } -func (d *deref) populateStatusEmojis(ctx context.Context, status *gtsmodel.Status, requestingUsername string) error { -	emojis, err := d.populateEmojis(ctx, status.Emojis, requestingUsername) +func (d *deref) fetchStatusEmojis(ctx context.Context, requestUser string, existing *gtsmodel.Status, status *gtsmodel.Status) error { +	// Fetch the full-fleshed-out emoji objects for our status. +	emojis, err := d.populateEmojis(ctx, status.Emojis, requestUser)  	if err != nil { -		return err +		return fmt.Errorf("failed to populate emojis: %w", err)  	} +	// Iterate over and get their IDs.  	emojiIDs := make([]string, 0, len(emojis))  	for _, e := range emojis {  		emojiIDs = append(emojiIDs, e.ID)  	} +	// Set known emoji details.  	status.Emojis = emojis  	status.EmojiIDs = emojiIDs -	return nil -} - -func (d *deref) populateStatusRepliedTo(ctx context.Context, status *gtsmodel.Status, requestingUsername string) error { -	if status.InReplyToURI != "" && status.InReplyToID == "" { -		statusURI, err := url.Parse(status.InReplyToURI) -		if err != nil { -			return err -		} - -		replyToStatus, _, err := d.GetStatus(ctx, requestingUsername, statusURI, false, false) -		if err != nil { -			return fmt.Errorf("populateStatusRepliedTo: couldn't get reply to status with uri %s: %s", status.InReplyToURI, err) -		} - -		// we have the status -		status.InReplyToID = replyToStatus.ID -		status.InReplyTo = replyToStatus -		status.InReplyToAccountID = replyToStatus.AccountID -		status.InReplyToAccount = replyToStatus.Account -	}  	return nil  } diff --git a/internal/federation/dereferencing/status_test.go b/internal/federation/dereferencing/status_test.go index c4299b1a3..9ec77fbcc 100644 --- a/internal/federation/dereferencing/status_test.go +++ b/internal/federation/dereferencing/status_test.go @@ -36,7 +36,7 @@ func (suite *StatusTestSuite) TestDereferenceSimpleStatus() {  	fetchingAccount := suite.testAccounts["local_account_1"]  	statusURL := testrig.URLMustParse("https://unknown-instance.com/users/brand_new_person/statuses/01FE4NTHKWW7THT67EF10EB839") -	status, _, err := suite.dereferencer.GetStatus(context.Background(), fetchingAccount.Username, statusURL, false, false) +	status, _, err := suite.dereferencer.GetStatusByURI(context.Background(), fetchingAccount.Username, statusURL)  	suite.NoError(err)  	suite.NotNil(status) @@ -76,7 +76,7 @@ func (suite *StatusTestSuite) TestDereferenceStatusWithMention() {  	fetchingAccount := suite.testAccounts["local_account_1"]  	statusURL := testrig.URLMustParse("https://unknown-instance.com/users/brand_new_person/statuses/01FE5Y30E3W4P7TRE0R98KAYQV") -	status, _, err := suite.dereferencer.GetStatus(context.Background(), fetchingAccount.Username, statusURL, false, false) +	status, _, err := suite.dereferencer.GetStatusByURI(context.Background(), fetchingAccount.Username, statusURL)  	suite.NoError(err)  	suite.NotNil(status) @@ -127,7 +127,7 @@ func (suite *StatusTestSuite) TestDereferenceStatusWithImageAndNoContent() {  	fetchingAccount := suite.testAccounts["local_account_1"]  	statusURL := testrig.URLMustParse("https://turnip.farm/users/turniplover6969/statuses/70c53e54-3146-42d5-a630-83c8b6c7c042") -	status, _, err := suite.dereferencer.GetStatus(context.Background(), fetchingAccount.Username, statusURL, false, false) +	status, _, err := suite.dereferencer.GetStatusByURI(context.Background(), fetchingAccount.Username, statusURL)  	suite.NoError(err)  	suite.NotNil(status) diff --git a/internal/federation/dereferencing/thread.go b/internal/federation/dereferencing/thread.go index cf6048689..b516f837b 100644 --- a/internal/federation/dereferencing/thread.go +++ b/internal/federation/dereferencing/thread.go @@ -35,34 +35,16 @@ import (  // ancesters we are willing to follow before returning error.  const maxIter = 1000 -// DereferenceThread takes a statusable (something that has withReplies and withInReplyTo), -// and dereferences statusables in the conversation. -// -// This process involves working up and down the chain of replies, and parsing through the collections of IDs -// presented by remote instances as part of their replies collections, and will likely involve making several calls to -// multiple different hosts. -// -// This does not return error, as for robustness we do not want to error-out on a status because another further up / down has issues. -func (d *deref) DereferenceThread(ctx context.Context, username string, statusIRI *url.URL, status *gtsmodel.Status, statusable ap.Statusable) { -	l := log.WithContext(ctx). -		WithFields(kv.Fields{ -			{"username", username}, -			{"statusIRI", status.URI}, -		}...) - -	// Log function start -	l.Trace("beginning") - +// dereferenceThread will dereference statuses both above and below the given status in a thread, it returns no error and is intended to be called asychronously. +func (d *deref) dereferenceThread(ctx context.Context, username string, statusIRI *url.URL, status *gtsmodel.Status, statusable ap.Statusable) {  	// Ensure that ancestors have been fully dereferenced  	if err := d.dereferenceStatusAncestors(ctx, username, status); err != nil { -		l.Errorf("error dereferencing status ancestors: %v", err) -		// we don't return error, we have deref'd as much as we can +		log.Errorf(ctx, "error dereferencing status ancestors: %v", err)  	}  	// Ensure that descendants have been fully dereferenced  	if err := d.dereferenceStatusDescendants(ctx, username, statusIRI, statusable); err != nil { -		l.Errorf("error dereferencing status descendants: %v", err) -		// we don't return error, we have deref'd as much as we can +		log.Errorf(ctx, "error dereferencing status descendants: %v", err)  	}  } @@ -103,7 +85,7 @@ func (d *deref) dereferenceStatusAncestors(ctx context.Context, username string,  			}  			// Fetch this status from the database -			localStatus, err := d.db.GetStatusByID(ctx, id) +			localStatus, err := d.state.DB.GetStatusByID(ctx, id)  			if err != nil {  				return fmt.Errorf("error fetching local status %q: %w", id, err)  			} @@ -115,7 +97,10 @@ func (d *deref) dereferenceStatusAncestors(ctx context.Context, username string,  			l.Tracef("following remote status ancestors: %s", status.InReplyToURI)  			// Fetch the remote status found at this IRI -			remoteStatus, _, err := d.GetStatus(ctx, username, replyIRI, false, false) +			remoteStatus, _, err := d.getStatusByURI(ctx, +				username, +				replyIRI, +			)  			if err != nil {  				return fmt.Errorf("error fetching remote status %q: %w", status.InReplyToURI, err)  			} @@ -277,10 +262,15 @@ stackLoop:  					continue itemLoop  				} -				// Dereference the remote status and store in the database -				_, statusable, err := d.GetStatus(ctx, username, itemIRI, true, false) +				// Dereference the remote status and store in the database. +				_, statusable, err := d.getStatusByURI(ctx, username, itemIRI)  				if err != nil { -					l.Errorf("error dereferencing remote status %q: %s", itemIRI.String(), err) +					l.Errorf("error dereferencing remote status %s: %v", itemIRI, err) +					continue itemLoop +				} + +				if statusable == nil { +					// Already up-to-date.  					continue itemLoop  				} @@ -307,7 +297,10 @@ stackLoop:  			}  			// Dereference this next collection page by its IRI -			collectionPage, err := d.DereferenceCollectionPage(ctx, username, pageNextIRI) +			collectionPage, err := d.dereferenceCollectionPage(ctx, +				username, +				pageNextIRI, +			)  			if err != nil {  				l.Errorf("error dereferencing remote collection page %q: %s", pageNextIRI.String(), err)  				continue stackLoop diff --git a/internal/federation/federatingactor_test.go b/internal/federation/federatingactor_test.go index da764ba68..604f458f5 100644 --- a/internal/federation/federatingactor_test.go +++ b/internal/federation/federatingactor_test.go @@ -58,7 +58,7 @@ func (suite *FederatingActorTestSuite) TestSendNoRemoteFollowers() {  	tc := testrig.NewTestTransportController(&suite.state, httpClient)  	// setup module being tested -	federator := federation.NewFederator(suite.db, testrig.NewTestFederatingDB(&suite.state), tc, suite.tc, testrig.NewTestMediaManager(&suite.state)) +	federator := federation.NewFederator(&suite.state, testrig.NewTestFederatingDB(&suite.state), tc, suite.tc, testrig.NewTestMediaManager(&suite.state))  	activity, err := federator.FederatingActor().Send(ctx, testrig.URLMustParse(testAccount.OutboxURI), testActivity)  	suite.NoError(err) @@ -103,7 +103,7 @@ func (suite *FederatingActorTestSuite) TestSendRemoteFollower() {  	httpClient := testrig.NewMockHTTPClient(nil, "../../testrig/media")  	tc := testrig.NewTestTransportController(&suite.state, httpClient)  	// setup module being tested -	federator := federation.NewFederator(suite.db, testrig.NewTestFederatingDB(&suite.state), tc, suite.tc, testrig.NewTestMediaManager(&suite.state)) +	federator := federation.NewFederator(&suite.state, testrig.NewTestFederatingDB(&suite.state), tc, suite.tc, testrig.NewTestMediaManager(&suite.state))  	activity, err := federator.FederatingActor().Send(ctx, testrig.URLMustParse(testAccount.OutboxURI), testActivity)  	suite.NoError(err) diff --git a/internal/federation/federatingdb/create.go b/internal/federation/federatingdb/create.go index b14a0597b..f68279d37 100644 --- a/internal/federation/federatingdb/create.go +++ b/internal/federation/federatingdb/create.go @@ -205,6 +205,7 @@ func (f *federatingDB) createNote(ctx context.Context, note vocab.ActivityStream  			APObjectType:     ap.ObjectNote,  			APActivityType:   ap.ActivityCreate,  			APIri:            id.GetIRI(), +			APObjectModel:    nil,  			GTSModel:         nil,  			ReceivingAccount: receivingAccount,  		}) @@ -238,6 +239,7 @@ func (f *federatingDB) createNote(ctx context.Context, note vocab.ActivityStream  	f.state.Workers.EnqueueFederator(ctx, messages.FromFederator{  		APObjectType:     ap.ObjectNote,  		APActivityType:   ap.ActivityCreate, +		APObjectModel:    note,  		GTSModel:         status,  		ReceivingAccount: receivingAccount,  	}) diff --git a/internal/federation/federatingprotocol.go b/internal/federation/federatingprotocol.go index e81a409f0..d7e598edc 100644 --- a/internal/federation/federatingprotocol.go +++ b/internal/federation/federatingprotocol.go @@ -210,9 +210,7 @@ func (f *federator) AuthenticatePostInbox(ctx context.Context, w http.ResponseWr  	// We know the public key owner URI now, so we can  	// dereference the remote account (or just get it  	// from the db if we already have it). -	requestingAccount, err := f.GetAccountByURI( -		gtscontext.SetFastFail(ctx), username, publicKeyOwnerURI, -	) +	requestingAccount, _, err := f.GetAccountByURI(gtscontext.SetFastFail(ctx), username, publicKeyOwnerURI)  	if err != nil {  		if gtserror.StatusCode(err) == http.StatusGone {  			// This is the same case as the http.StatusGone check above. diff --git a/internal/federation/federatingprotocol_test.go b/internal/federation/federatingprotocol_test.go index bc1495726..3350099b0 100644 --- a/internal/federation/federatingprotocol_test.go +++ b/internal/federation/federatingprotocol_test.go @@ -43,7 +43,7 @@ func (suite *FederatingProtocolTestSuite) TestPostInboxRequestBodyHook1() {  	httpClient := testrig.NewMockHTTPClient(nil, "../../testrig/media")  	tc := testrig.NewTestTransportController(&suite.state, httpClient)  	// setup module being tested -	federator := federation.NewFederator(suite.db, testrig.NewTestFederatingDB(&suite.state), tc, suite.tc, testrig.NewTestMediaManager(&suite.state)) +	federator := federation.NewFederator(&suite.state, testrig.NewTestFederatingDB(&suite.state), tc, suite.tc, testrig.NewTestMediaManager(&suite.state))  	// setup request  	ctx := context.Background() @@ -73,7 +73,7 @@ func (suite *FederatingProtocolTestSuite) TestPostInboxRequestBodyHook2() {  	tc := testrig.NewTestTransportController(&suite.state, httpClient)  	// setup module being tested -	federator := federation.NewFederator(suite.db, testrig.NewTestFederatingDB(&suite.state), tc, suite.tc, testrig.NewTestMediaManager(&suite.state)) +	federator := federation.NewFederator(&suite.state, testrig.NewTestFederatingDB(&suite.state), tc, suite.tc, testrig.NewTestMediaManager(&suite.state))  	// setup request  	ctx := context.Background() @@ -104,7 +104,7 @@ func (suite *FederatingProtocolTestSuite) TestPostInboxRequestBodyHook3() {  	tc := testrig.NewTestTransportController(&suite.state, httpClient)  	// setup module being tested -	federator := federation.NewFederator(suite.db, testrig.NewTestFederatingDB(&suite.state), tc, suite.tc, testrig.NewTestMediaManager(&suite.state)) +	federator := federation.NewFederator(&suite.state, testrig.NewTestFederatingDB(&suite.state), tc, suite.tc, testrig.NewTestMediaManager(&suite.state))  	// setup request  	ctx := context.Background() @@ -137,7 +137,7 @@ func (suite *FederatingProtocolTestSuite) TestAuthenticatePostInbox() {  	tc := testrig.NewTestTransportController(&suite.state, httpClient)  	// now setup module being tested, with the mock transport controller -	federator := federation.NewFederator(suite.db, testrig.NewTestFederatingDB(&suite.state), tc, suite.tc, testrig.NewTestMediaManager(&suite.state)) +	federator := federation.NewFederator(&suite.state, testrig.NewTestFederatingDB(&suite.state), tc, suite.tc, testrig.NewTestMediaManager(&suite.state))  	request := httptest.NewRequest(http.MethodPost, "http://localhost:8080/users/the_mighty_zork/inbox", nil)  	// we need these headers for the request to be validated @@ -180,7 +180,7 @@ func (suite *FederatingProtocolTestSuite) TestAuthenticatePostGone() {  	tc := testrig.NewTestTransportController(&suite.state, httpClient)  	// now setup module being tested, with the mock transport controller -	federator := federation.NewFederator(suite.db, testrig.NewTestFederatingDB(&suite.state), tc, suite.tc, testrig.NewTestMediaManager(&suite.state)) +	federator := federation.NewFederator(&suite.state, testrig.NewTestFederatingDB(&suite.state), tc, suite.tc, testrig.NewTestMediaManager(&suite.state))  	request := httptest.NewRequest(http.MethodPost, "http://localhost:8080/users/the_mighty_zork/inbox", nil)  	// we need these headers for the request to be validated @@ -222,7 +222,7 @@ func (suite *FederatingProtocolTestSuite) TestAuthenticatePostGoneNoTombstoneYet  	tc := testrig.NewTestTransportController(&suite.state, httpClient)  	// now setup module being tested, with the mock transport controller -	federator := federation.NewFederator(suite.db, testrig.NewTestFederatingDB(&suite.state), tc, suite.tc, testrig.NewTestMediaManager(&suite.state)) +	federator := federation.NewFederator(&suite.state, testrig.NewTestFederatingDB(&suite.state), tc, suite.tc, testrig.NewTestMediaManager(&suite.state))  	request := httptest.NewRequest(http.MethodPost, "http://localhost:8080/users/the_mighty_zork/inbox", nil)  	// we need these headers for the request to be validated @@ -258,7 +258,7 @@ func (suite *FederatingProtocolTestSuite) TestAuthenticatePostGoneNoTombstoneYet  func (suite *FederatingProtocolTestSuite) TestBlocked1() {  	httpClient := testrig.NewMockHTTPClient(nil, "../../testrig/media")  	tc := testrig.NewTestTransportController(&suite.state, httpClient) -	federator := federation.NewFederator(suite.db, testrig.NewTestFederatingDB(&suite.state), tc, suite.tc, testrig.NewTestMediaManager(&suite.state)) +	federator := federation.NewFederator(&suite.state, testrig.NewTestFederatingDB(&suite.state), tc, suite.tc, testrig.NewTestMediaManager(&suite.state))  	sendingAccount := suite.testAccounts["remote_account_1"]  	inboxAccount := suite.testAccounts["local_account_1"] @@ -280,7 +280,7 @@ func (suite *FederatingProtocolTestSuite) TestBlocked1() {  func (suite *FederatingProtocolTestSuite) TestBlocked2() {  	httpClient := testrig.NewMockHTTPClient(nil, "../../testrig/media")  	tc := testrig.NewTestTransportController(&suite.state, httpClient) -	federator := federation.NewFederator(suite.db, testrig.NewTestFederatingDB(&suite.state), tc, suite.tc, testrig.NewTestMediaManager(&suite.state)) +	federator := federation.NewFederator(&suite.state, testrig.NewTestFederatingDB(&suite.state), tc, suite.tc, testrig.NewTestMediaManager(&suite.state))  	sendingAccount := suite.testAccounts["remote_account_1"]  	inboxAccount := suite.testAccounts["local_account_1"] @@ -313,7 +313,7 @@ func (suite *FederatingProtocolTestSuite) TestBlocked2() {  func (suite *FederatingProtocolTestSuite) TestBlocked3() {  	httpClient := testrig.NewMockHTTPClient(nil, "../../testrig/media")  	tc := testrig.NewTestTransportController(&suite.state, httpClient) -	federator := federation.NewFederator(suite.db, testrig.NewTestFederatingDB(&suite.state), tc, suite.tc, testrig.NewTestMediaManager(&suite.state)) +	federator := federation.NewFederator(&suite.state, testrig.NewTestFederatingDB(&suite.state), tc, suite.tc, testrig.NewTestMediaManager(&suite.state))  	sendingAccount := suite.testAccounts["remote_account_1"]  	inboxAccount := suite.testAccounts["local_account_1"] @@ -349,7 +349,7 @@ func (suite *FederatingProtocolTestSuite) TestBlocked3() {  func (suite *FederatingProtocolTestSuite) TestBlocked4() {  	httpClient := testrig.NewMockHTTPClient(nil, "../../testrig/media")  	tc := testrig.NewTestTransportController(&suite.state, httpClient) -	federator := federation.NewFederator(suite.db, testrig.NewTestFederatingDB(&suite.state), tc, suite.tc, testrig.NewTestMediaManager(&suite.state)) +	federator := federation.NewFederator(&suite.state, testrig.NewTestFederatingDB(&suite.state), tc, suite.tc, testrig.NewTestMediaManager(&suite.state))  	sendingAccount := suite.testAccounts["remote_account_1"]  	inboxAccount := suite.testAccounts["local_account_1"] diff --git a/internal/federation/federator.go b/internal/federation/federator.go index ce0161f0a..c5aed6e62 100644 --- a/internal/federation/federator.go +++ b/internal/federation/federator.go @@ -27,6 +27,7 @@ import (  	"github.com/superseriousbusiness/gotosocial/internal/federation/federatingdb"  	"github.com/superseriousbusiness/gotosocial/internal/gtserror"  	"github.com/superseriousbusiness/gotosocial/internal/media" +	"github.com/superseriousbusiness/gotosocial/internal/state"  	"github.com/superseriousbusiness/gotosocial/internal/transport"  	"github.com/superseriousbusiness/gotosocial/internal/typeutils"  ) @@ -67,12 +68,12 @@ type federator struct {  }  // NewFederator returns a new federator -func NewFederator(db db.DB, federatingDB federatingdb.DB, transportController transport.Controller, typeConverter typeutils.TypeConverter, mediaManager media.Manager) Federator { -	dereferencer := dereferencing.NewDereferencer(db, typeConverter, transportController, mediaManager) +func NewFederator(state *state.State, federatingDB federatingdb.DB, transportController transport.Controller, typeConverter typeutils.TypeConverter, mediaManager media.Manager) Federator { +	dereferencer := dereferencing.NewDereferencer(state, typeConverter, transportController, mediaManager)  	clock := &Clock{}  	f := &federator{ -		db:                  db, +		db:                  state.DB,  		federatingDB:        federatingDB,  		clock:               &Clock{},  		typeConverter:       typeConverter, diff --git a/internal/gtsmodel/status.go b/internal/gtsmodel/status.go index d04deecef..393bb1ac7 100644 --- a/internal/gtsmodel/status.go +++ b/internal/gtsmodel/status.go @@ -28,6 +28,7 @@ type Status struct {  	ID                       string             `validate:"required,ulid" bun:"type:CHAR(26),pk,nullzero,notnull,unique"`                              // id of this item in the database  	CreatedAt                time.Time          `validate:"-" bun:"type:timestamptz,nullzero,notnull,default:current_timestamp"`                       // when was item created  	UpdatedAt                time.Time          `validate:"-" bun:"type:timestamptz,nullzero,notnull,default:current_timestamp"`                       // when was item last updated +	FetchedAt                time.Time          `validate:"required_with=!Local" bun:"type:timestamptz,nullzero"`                                      // when was item (remote) last fetched.  	PinnedAt                 time.Time          `validate:"-" bun:"type:timestamptz,nullzero"`                                                         // Status was pinned by owning account at this time.  	URI                      string             `validate:"required,url" bun:",unique,nullzero,notnull"`                                               // activitypub URI of this status  	URL                      string             `validate:"url" bun:",nullzero"`                                                                       // web url for viewing this status @@ -87,24 +88,43 @@ func (s *Status) GetBoostOfAccountID() string {  	return s.BoostOfAccountID  } +func (s *Status) GetAttachmentByID(id string) (*MediaAttachment, bool) { +	for _, media := range s.Attachments { +		if media == nil { +			log.Warnf(nil, "nil attachment in slice for status %s", s.URI) +			continue +		} +		if media.ID == id { +			return media, true +		} +	} +	return nil, false +} + +func (s *Status) GetAttachmentByRemoteURL(url string) (*MediaAttachment, bool) { +	for _, media := range s.Attachments { +		if media == nil { +			log.Warnf(nil, "nil attachment in slice for status %s", s.URI) +			continue +		} +		if media.RemoteURL == url { +			return media, true +		} +	} +	return nil, false +} +  // AttachmentsPopulated returns whether media attachments are populated according to current AttachmentIDs.  func (s *Status) AttachmentsPopulated() bool {  	if len(s.AttachmentIDs) != len(s.Attachments) {  		// this is the quickest indicator.  		return false  	} - -	// Attachments must be in same order. -	for i, id := range s.AttachmentIDs { -		if s.Attachments[i] == nil { -			log.Warnf(nil, "nil attachment in slice for status %s", s.URI) -			continue -		} -		if s.Attachments[i].ID != id { +	for _, id := range s.AttachmentIDs { +		if _, ok := s.GetAttachmentByID(id); !ok {  			return false  		}  	} -  	return true  } @@ -129,24 +149,43 @@ func (s *Status) TagsPopulated() bool {  	return true  } +func (s *Status) GetMentionByID(id string) (*Mention, bool) { +	for _, mention := range s.Mentions { +		if mention == nil { +			log.Warnf(nil, "nil mention in slice for status %s", s.URI) +			continue +		} +		if mention.ID == id { +			return mention, true +		} +	} +	return nil, false +} + +func (s *Status) GetMentionByTargetURI(uri string) (*Mention, bool) { +	for _, mention := range s.Mentions { +		if mention == nil { +			log.Warnf(nil, "nil mention in slice for status %s", s.URI) +			continue +		} +		if mention.TargetAccountURI == uri { +			return mention, true +		} +	} +	return nil, false +} +  // MentionsPopulated returns whether mentions are populated according to current MentionIDs.  func (s *Status) MentionsPopulated() bool {  	if len(s.MentionIDs) != len(s.Mentions) {  		// this is the quickest indicator.  		return false  	} - -	// Mentions must be in same order. -	for i, id := range s.MentionIDs { -		if s.Mentions[i] == nil { -			log.Warnf(nil, "nil mention in slice for status %s", s.URI) -			continue -		} -		if s.Mentions[i].ID != id { +	for _, id := range s.MentionIDs { +		if _, ok := s.GetMentionByID(id); !ok {  			return false  		}  	} -  	return true  } @@ -171,6 +210,36 @@ func (s *Status) EmojisPopulated() bool {  	return true  } +// EmojissUpToDate returns whether status emoji attachments of receiving status are up-to-date +// according to emoji attachments of the passed status, by comparing their emoji URIs. We don't +// use IDs as this is used to determine whether there are new emojis to fetch. +func (s *Status) EmojisUpToDate(other *Status) bool { +	if len(s.Emojis) != len(other.Emojis) { +		// this is the quickest indicator. +		return false +	} + +	// Emojis must be in same order. +	for i := range s.Emojis { +		if s.Emojis[i] == nil { +			log.Warnf(nil, "nil emoji in slice for status %s", s.URI) +			return false +		} + +		if other.Emojis[i] == nil { +			log.Warnf(nil, "nil emoji in slice for status %s", other.URI) +			return false +		} + +		if s.Emojis[i].URI != other.Emojis[i].URI { +			// Emoji URI has changed, not up-to-date! +			return false +		} +	} + +	return true +} +  // MentionsAccount returns whether status mentions the given account ID.  func (s *Status) MentionsAccount(id string) bool {  	for _, mention := range s.Mentions { diff --git a/internal/media/processingemoji.go b/internal/media/processingemoji.go index add2ae18b..b2da54289 100644 --- a/internal/media/processingemoji.go +++ b/internal/media/processingemoji.go @@ -123,7 +123,6 @@ func (p *ProcessingEmoji) load(ctx context.Context) (*gtsmodel.Emoji, bool, erro  		if p.refresh {  			columns := []string{ -				"updated_at",  				"image_remote_url",  				"image_static_remote_url",  				"image_url", diff --git a/internal/media/processingmedia.go b/internal/media/processingmedia.go index ea92c8cc6..fc75c3136 100644 --- a/internal/media/processingmedia.go +++ b/internal/media/processingmedia.go @@ -245,8 +245,10 @@ func (p *ProcessingMedia) store(ctx context.Context) error {  		info.Extension,  	)  	p.media.File.ContentType = info.MIME.Value -	cached := true -	p.media.Cached = &cached +	p.media.Cached = func() *bool { +		ok := true +		return &ok +	}()  	return nil  } diff --git a/internal/processing/account/get.go b/internal/processing/account/get.go index c7d271b0a..ddb7c14e0 100644 --- a/internal/processing/account/get.go +++ b/internal/processing/account/get.go @@ -28,16 +28,17 @@ import (  	"github.com/superseriousbusiness/gotosocial/internal/gtscontext"  	"github.com/superseriousbusiness/gotosocial/internal/gtserror"  	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" +	"github.com/superseriousbusiness/gotosocial/internal/log"  )  // Get processes the given request for account information.  func (p *Processor) Get(ctx context.Context, requestingAccount *gtsmodel.Account, targetAccountID string) (*apimodel.Account, gtserror.WithCode) {  	targetAccount, err := p.state.DB.GetAccountByID(ctx, targetAccountID)  	if err != nil { -		if err == db.ErrNoEntries { +		if errors.Is(err, db.ErrNoEntries) {  			return nil, gtserror.NewErrorNotFound(errors.New("account not found"))  		} -		return nil, gtserror.NewErrorInternalError(fmt.Errorf("db error: %s", err)) +		return nil, gtserror.NewErrorInternalError(fmt.Errorf("db error: %w", err))  	}  	return p.getFor(ctx, requestingAccount, targetAccount) @@ -47,10 +48,10 @@ func (p *Processor) Get(ctx context.Context, requestingAccount *gtsmodel.Account  func (p *Processor) GetLocalByUsername(ctx context.Context, requestingAccount *gtsmodel.Account, username string) (*apimodel.Account, gtserror.WithCode) {  	targetAccount, err := p.state.DB.GetAccountByUsernameDomain(ctx, username, "")  	if err != nil { -		if err == db.ErrNoEntries { +		if errors.Is(err, db.ErrNoEntries) {  			return nil, gtserror.NewErrorNotFound(errors.New("account not found"))  		} -		return nil, gtserror.NewErrorInternalError(fmt.Errorf("db error: %s", err)) +		return nil, gtserror.NewErrorInternalError(fmt.Errorf("db error: %w", err))  	}  	return p.getFor(ctx, requestingAccount, targetAccount) @@ -60,56 +61,58 @@ func (p *Processor) GetLocalByUsername(ctx context.Context, requestingAccount *g  func (p *Processor) GetCustomCSSForUsername(ctx context.Context, username string) (string, gtserror.WithCode) {  	customCSS, err := p.state.DB.GetAccountCustomCSSByUsername(ctx, username)  	if err != nil { -		if err == db.ErrNoEntries { +		if errors.Is(err, db.ErrNoEntries) {  			return "", gtserror.NewErrorNotFound(errors.New("account not found"))  		} -		return "", gtserror.NewErrorInternalError(fmt.Errorf("db error: %s", err)) +		return "", gtserror.NewErrorInternalError(fmt.Errorf("db error: %w", err))  	}  	return customCSS, nil  }  func (p *Processor) getFor(ctx context.Context, requestingAccount *gtsmodel.Account, targetAccount *gtsmodel.Account) (*apimodel.Account, gtserror.WithCode) { -	var blocked bool  	var err error +  	if requestingAccount != nil { -		blocked, err = p.state.DB.IsEitherBlocked(ctx, requestingAccount.ID, targetAccount.ID) +		blocked, err := p.state.DB.IsEitherBlocked(ctx, requestingAccount.ID, targetAccount.ID)  		if err != nil { -			return nil, gtserror.NewErrorInternalError(fmt.Errorf("error checking account block: %s", err)) +			return nil, gtserror.NewErrorInternalError(fmt.Errorf("error checking account block: %w", err))  		} -	} -	var apiAccount *apimodel.Account -	if blocked { -		apiAccount, err = p.tc.AccountToAPIAccountBlocked(ctx, targetAccount) -		if err != nil { -			return nil, gtserror.NewErrorInternalError(fmt.Errorf("error converting account: %s", err)) +		if blocked { +			apiAccount, err := p.tc.AccountToAPIAccountBlocked(ctx, targetAccount) +			if err != nil { +				return nil, gtserror.NewErrorInternalError(fmt.Errorf("error converting account: %w", err)) +			} +			return apiAccount, nil  		} -		return apiAccount, nil  	} -	// last-minute check to make sure we have remote account header/avi cached  	if targetAccount.Domain != "" {  		targetAccountURI, err := url.Parse(targetAccount.URI)  		if err != nil { -			return nil, gtserror.NewErrorInternalError(fmt.Errorf("error parsing url %s: %s", targetAccount.URI, err)) +			return nil, gtserror.NewErrorInternalError(fmt.Errorf("error parsing url %s: %w", targetAccount.URI, err))  		} -		a, err := p.federator.GetAccountByURI( -			gtscontext.SetFastFail(ctx), requestingAccount.Username, targetAccountURI, -		) -		if err == nil { -			targetAccount = a +		// Perform a last-minute fetch of target account to ensure remote account header / avatar is cached. +		latest, _, err := p.federator.GetAccountByURI(gtscontext.SetFastFail(ctx), requestingAccount.Username, targetAccountURI) +		if err != nil { +			log.Errorf(ctx, "error fetching latest target account: %v", err) +		} else { +			// Use latest account model. +			targetAccount = latest  		}  	} +	var apiAccount *apimodel.Account +  	if requestingAccount != nil && targetAccount.ID == requestingAccount.ID {  		apiAccount, err = p.tc.AccountToAPIAccountSensitive(ctx, targetAccount)  	} else {  		apiAccount, err = p.tc.AccountToAPIAccountPublic(ctx, targetAccount)  	}  	if err != nil { -		return nil, gtserror.NewErrorInternalError(fmt.Errorf("error converting account: %s", err)) +		return nil, gtserror.NewErrorInternalError(fmt.Errorf("error converting account: %w", err))  	}  	return apiAccount, nil diff --git a/internal/processing/admin/emoji.go b/internal/processing/admin/emoji.go index 0fbeee4c2..acea85116 100644 --- a/internal/processing/admin/emoji.go +++ b/internal/processing/admin/emoji.go @@ -385,7 +385,7 @@ func (p *Processor) emojiUpdateDisable(ctx context.Context, emoji *gtsmodel.Emoj  	emojiDisabled := true  	emoji.Disabled = &emojiDisabled -	updatedEmoji, err := p.state.DB.UpdateEmoji(ctx, emoji, "updated_at", "disabled") +	updatedEmoji, err := p.state.DB.UpdateEmoji(ctx, emoji, "disabled")  	if err != nil {  		err = fmt.Errorf("emojiUpdateDisable: error updating emoji %s: %s", emoji.ID, err)  		return nil, gtserror.NewErrorInternalError(err) @@ -434,7 +434,7 @@ func (p *Processor) emojiUpdateModify(ctx context.Context, emoji *gtsmodel.Emoji  	if !updateImage {  		// only updating fields, we only need  		// to do a database update for this -		columns := []string{"updated_at"} +		var columns []string  		if updateCategoryID {  			emoji.CategoryID = updatedCategoryID diff --git a/internal/processing/fedi/common.go b/internal/processing/fedi/common.go index 093a9d761..68a84f303 100644 --- a/internal/processing/fedi/common.go +++ b/internal/processing/fedi/common.go @@ -40,7 +40,7 @@ func (p *Processor) authenticate(ctx context.Context, requestedUsername string)  		return  	} -	if requestingAccount, err = p.federator.GetAccountByURI(gtscontext.SetFastFail(ctx), requestedUsername, requestingAccountURI); err != nil { +	if requestingAccount, _, err = p.federator.GetAccountByURI(gtscontext.SetFastFail(ctx), requestedUsername, requestingAccountURI); err != nil {  		errWithCode = gtserror.NewErrorUnauthorized(err)  		return  	} diff --git a/internal/processing/fedi/user.go b/internal/processing/fedi/user.go index b78f6de9d..4ec780a87 100644 --- a/internal/processing/fedi/user.go +++ b/internal/processing/fedi/user.go @@ -55,9 +55,7 @@ func (p *Processor) UserGet(ctx context.Context, requestedUsername string, reque  		// if we're not already handshaking/dereferencing a remote account, dereference it now  		if !p.federator.Handshaking(requestedUsername, requestingAccountURI) { -			requestingAccount, err := p.federator.GetAccountByURI( -				gtscontext.SetFastFail(ctx), requestedUsername, requestingAccountURI, -			) +			requestingAccount, _, err := p.federator.GetAccountByURI(gtscontext.SetFastFail(ctx), requestedUsername, requestingAccountURI)  			if err != nil {  				return nil, gtserror.NewErrorUnauthorized(err)  			} diff --git a/internal/processing/fromfederator.go b/internal/processing/fromfederator.go index 82e08060c..eccdbb894 100644 --- a/internal/processing/fromfederator.go +++ b/internal/processing/fromfederator.go @@ -110,17 +110,30 @@ func (p *Processor) ProcessFromFederator(ctx context.Context, federatorMsg messa  func (p *Processor) processCreateStatusFromFederator(ctx context.Context, federatorMsg messages.FromFederator) error {  	// check for either an IRI that we still need to dereference, OR an already dereferenced  	// and converted status pinned to the message. -	var status *gtsmodel.Status +	var ( +		status *gtsmodel.Status +		err    error +	)  	if federatorMsg.GTSModel != nil { -		// there's a gts model already pinned to the message, it should be a status  		var ok bool + +		// there's a gts model already pinned to the message, it should be a status  		if status, ok = federatorMsg.GTSModel.(*gtsmodel.Status); !ok {  			return errors.New("ProcessFromFederator: note was not parseable as *gtsmodel.Status")  		} -		var err error -		status, err = p.federator.EnrichRemoteStatus(ctx, federatorMsg.ReceivingAccount.Username, status, true) +		// Since this was a create originating AP object +		// statusable may have been set on message (no problem if not). +		statusable, _ := federatorMsg.APObjectModel.(ap.Statusable) + +		// Call refresh on status to deref if necessary etc. +		status, _, err = p.federator.RefreshStatus(ctx, +			federatorMsg.ReceivingAccount.Username, +			status, +			statusable, +			false, +		)  		if err != nil {  			return err  		} @@ -129,38 +142,29 @@ func (p *Processor) processCreateStatusFromFederator(ctx context.Context, federa  		if federatorMsg.APIri == nil {  			return errors.New("ProcessFromFederator: status was not pinned to federatorMsg, and neither was an IRI for us to dereference")  		} -		var err error -		status, _, err = p.federator.GetStatus(ctx, federatorMsg.ReceivingAccount.Username, federatorMsg.APIri, false, false) -		if err != nil { -			return err -		} -	} -	// make sure the account is pinned -	if status.Account == nil { -		a, err := p.state.DB.GetAccountByID(ctx, status.AccountID) +		status, _, err = p.federator.GetStatusByURI(ctx, federatorMsg.ReceivingAccount.Username, federatorMsg.APIri)  		if err != nil {  			return err  		} -		status.Account = a  	} -	// Get the remote account to make sure the avi and header are cached. -	if status.Account.Domain != "" { -		remoteAccountID, err := url.Parse(status.Account.URI) +	if status.Account == nil || status.Account.IsRemote() { +		// Either no account attached yet, or a remote account. +		// Both situations we need to parse account URI to fetch it. +		remoteAccURI, err := url.Parse(status.AccountURI)  		if err != nil {  			return err  		} -		a, err := p.federator.GetAccountByURI(ctx, +		// Ensure that account for this status has been deref'd. +		status.Account, _, err = p.federator.GetAccountByURI(ctx,  			federatorMsg.ReceivingAccount.Username, -			remoteAccountID, +			remoteAccURI,  		)  		if err != nil {  			return err  		} - -		status.Account = a  	}  	if err := p.timelineAndNotifyStatus(ctx, status); err != nil { @@ -193,7 +197,7 @@ func (p *Processor) processCreateFaveFromFederator(ctx context.Context, federato  			return err  		} -		a, err := p.federator.GetAccountByURI(ctx, +		a, _, err := p.federator.GetAccountByURI(ctx,  			federatorMsg.ReceivingAccount.Username,  			remoteAccountID,  		) @@ -234,7 +238,7 @@ func (p *Processor) processCreateFollowRequestFromFederator(ctx context.Context,  			return err  		} -		a, err := p.federator.GetAccountByURI(ctx, +		a, _, err := p.federator.GetAccountByURI(ctx,  			federatorMsg.ReceivingAccount.Username,  			remoteAccountID,  		) @@ -294,7 +298,7 @@ func (p *Processor) processCreateAnnounceFromFederator(ctx context.Context, fede  			return err  		} -		a, err := p.federator.GetAccountByURI(ctx, +		a, _, err := p.federator.GetAccountByURI(ctx,  			federatorMsg.ReceivingAccount.Username,  			remoteAccountID,  		) @@ -376,11 +380,12 @@ func (p *Processor) processUpdateAccountFromFederator(ctx context.Context, feder  	}  	// Call RefreshAccount to fetch up-to-date bio, avatar, header, etc. -	updatedAccount, err := p.federator.RefreshAccount( +	updatedAccount, _, err := p.federator.RefreshAccount(  		ctx,  		federatorMsg.ReceivingAccount.Username, -		incomingAccountable,  		incomingAccount, +		incomingAccountable, +		true,  	)  	if err != nil {  		return fmt.Errorf("error enriching updated account from federator: %s", err) diff --git a/internal/processing/fromfederator_test.go b/internal/processing/fromfederator_test.go index 58d644287..a981899d2 100644 --- a/internal/processing/fromfederator_test.go +++ b/internal/processing/fromfederator_test.go @@ -142,15 +142,10 @@ func (suite *FromFederatorTestSuite) TestProcessReplyMention() {  	suite.NoError(err)  	// 2. a notification should exist for the mention -	where := []db.Where{ -		{ -			Key:   "status_id", -			Value: replyingStatus.ID, -		}, -	} - -	notif := >smodel.Notification{} -	err = suite.db.GetWhere(context.Background(), where, notif) +	var notif gtsmodel.Notification +	err = suite.db.GetWhere(context.Background(), []db.Where{ +		{Key: "status_id", Value: replyingStatus.ID}, +	}, ¬if)  	suite.NoError(err)  	suite.Equal(gtsmodel.NotificationMention, notif.NotificationType)  	suite.Equal(replyingStatus.InReplyToAccountID, notif.TargetAccountID) diff --git a/internal/processing/processor.go b/internal/processing/processor.go index a61a57f88..749987d6a 100644 --- a/internal/processing/processor.go +++ b/internal/processing/processor.go @@ -131,7 +131,7 @@ func NewProcessor(  	processor.fedi = fedi.New(state, tc, federator, filter)  	processor.media = media.New(state, tc, mediaManager, federator.TransportController())  	processor.report = report.New(state, tc) -	processor.status = status.New(state, tc, filter, parseMentionFunc) +	processor.status = status.New(state, federator, tc, filter, parseMentionFunc)  	processor.stream = stream.New(state, oauthServer)  	processor.user = user.New(state, emailSender) diff --git a/internal/processing/processor_test.go b/internal/processing/processor_test.go index 7c66c6e65..e572593d1 100644 --- a/internal/processing/processor_test.go +++ b/internal/processing/processor_test.go @@ -100,6 +100,8 @@ func (suite *ProcessingStandardTestSuite) SetupTest() {  	suite.state.Storage = suite.storage  	suite.typeconverter = testrig.NewTestTypeConverter(suite.db)  	suite.httpClient = testrig.NewMockHTTPClient(nil, "../../testrig/media") +	suite.httpClient.TestRemotePeople = testrig.NewTestFediPeople() +	suite.httpClient.TestRemoteStatuses = testrig.NewTestFediStatuses()  	suite.transportController = testrig.NewTestTransportController(&suite.state, suite.httpClient)  	suite.mediaManager = testrig.NewTestMediaManager(&suite.state) diff --git a/internal/processing/search.go b/internal/processing/search.go index c8dc58320..ef5da9ee7 100644 --- a/internal/processing/search.go +++ b/internal/processing/search.go @@ -226,17 +226,8 @@ func (p *Processor) SearchGet(ctx context.Context, authed *oauth.Auth, search *a  }  func (p *Processor) searchStatusByURI(ctx context.Context, authed *oauth.Auth, uri *url.URL) (*gtsmodel.Status, error) { -	status, statusable, err := p.federator.GetStatus(gtscontext.SetFastFail(ctx), authed.Account.Username, uri, true, true) -	if err != nil { -		return nil, err -	} - -	if !*status.Local && statusable != nil { -		// Attempt to dereference the status thread while we are here -		p.federator.DereferenceThread(gtscontext.SetFastFail(ctx), authed.Account.Username, uri, status, statusable) -	} - -	return status, nil +	status, _, err := p.federator.GetStatusByURI(gtscontext.SetFastFail(ctx), authed.Account.Username, uri) +	return status, err  }  func (p *Processor) searchAccountByURI(ctx context.Context, authed *oauth.Auth, uri *url.URL, resolve bool) (*gtsmodel.Account, error) { @@ -267,11 +258,12 @@ func (p *Processor) searchAccountByURI(ctx context.Context, authed *oauth.Auth,  		return account, nil  	} -	return p.federator.GetAccountByURI( +	account, _, err := p.federator.GetAccountByURI(  		gtscontext.SetFastFail(ctx),  		authed.Account.Username,  		uri,  	) +	return account, err  }  func (p *Processor) searchAccountByUsernameDomain(ctx context.Context, authed *oauth.Auth, username string, domain string, resolve bool) (*gtsmodel.Account, error) { @@ -294,9 +286,10 @@ func (p *Processor) searchAccountByUsernameDomain(ctx context.Context, authed *o  		return account, nil  	} -	return p.federator.GetAccountByUsernameDomain( +	account, _, err := p.federator.GetAccountByUsernameDomain(  		gtscontext.SetFastFail(ctx),  		authed.Account.Username,  		username, domain,  	) +	return account, err  } diff --git a/internal/processing/status/common.go b/internal/processing/status/common.go index d6478d35a..1c08a1e65 100644 --- a/internal/processing/status/common.go +++ b/internal/processing/status/common.go @@ -43,6 +43,16 @@ func (p *Processor) getVisibleStatus(ctx context.Context, requestingAccount *gts  		return nil, gtserror.NewErrorNotFound(err)  	} +	if requestingAccount != nil { +		// Ensure the status is up-to-date. +		p.federator.RefreshStatusAsync(ctx, +			requestingAccount.Username, +			targetStatus, +			nil, +			false, +		) +	} +  	visible, err := p.filter.StatusVisible(ctx, requestingAccount, targetStatus)  	if err != nil {  		err = fmt.Errorf("getVisibleStatus: error seeing if status %s is visible: %w", targetStatus.ID, err) diff --git a/internal/processing/status/status.go b/internal/processing/status/status.go index 2bc1b62ce..c34bff30f 100644 --- a/internal/processing/status/status.go +++ b/internal/processing/status/status.go @@ -18,6 +18,7 @@  package status  import ( +	"github.com/superseriousbusiness/gotosocial/internal/federation"  	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"  	"github.com/superseriousbusiness/gotosocial/internal/state"  	"github.com/superseriousbusiness/gotosocial/internal/text" @@ -27,6 +28,7 @@ import (  type Processor struct {  	state        *state.State +	federator    federation.Federator  	tc           typeutils.TypeConverter  	filter       *visibility.Filter  	formatter    text.Formatter @@ -34,9 +36,10 @@ type Processor struct {  }  // New returns a new status processor. -func New(state *state.State, tc typeutils.TypeConverter, filter *visibility.Filter, parseMention gtsmodel.ParseMentionFunc) Processor { +func New(state *state.State, federator federation.Federator, tc typeutils.TypeConverter, filter *visibility.Filter, parseMention gtsmodel.ParseMentionFunc) Processor {  	return Processor{  		state:        state, +		federator:    federator,  		tc:           tc,  		filter:       filter,  		formatter:    text.NewFormatter(state.DB), diff --git a/internal/processing/status/status_test.go b/internal/processing/status/status_test.go index bef0a6e69..0de56c30e 100644 --- a/internal/processing/status/status_test.go +++ b/internal/processing/status/status_test.go @@ -88,7 +88,7 @@ func (suite *StatusStandardTestSuite) SetupTest() {  	suite.federator = testrig.NewTestFederator(&suite.state, suite.tc, suite.mediaManager)  	filter := visibility.NewFilter(&suite.state) -	suite.status = status.New(&suite.state, suite.typeConverter, filter, processing.GetParseMentionFunc(suite.db, suite.federator)) +	suite.status = status.New(&suite.state, suite.federator, suite.typeConverter, filter, processing.GetParseMentionFunc(suite.db, suite.federator))  	testrig.StandardDBSetup(suite.db, suite.testAccounts)  	testrig.StandardStorageSetup(suite.storage, "../../../testrig/media") diff --git a/internal/processing/util.go b/internal/processing/util.go index 601d8c4de..7aefe64fc 100644 --- a/internal/processing/util.go +++ b/internal/processing/util.go @@ -57,7 +57,7 @@ func GetParseMentionFunc(dbConn db.DB, federator federation.Federator) gtsmodel.  				requestingUsername = originAccount.Username  			} -			remoteAccount, err := federator.GetAccountByUsernameDomain( +			remoteAccount, _, err := federator.GetAccountByUsernameDomain(  				gtscontext.SetFastFail(ctx),  				requestingUsername,  				username, diff --git a/testrig/federator.go b/testrig/federator.go index 38ba487a9..23d780806 100644 --- a/testrig/federator.go +++ b/testrig/federator.go @@ -26,5 +26,5 @@ import (  // NewTestFederator returns a federator with the given database and (mock!!) transport controller.  func NewTestFederator(state *state.State, tc transport.Controller, mediaManager media.Manager) federation.Federator { -	return federation.NewFederator(state.DB, NewTestFederatingDB(state), tc, NewTestTypeConverter(state.DB), mediaManager) +	return federation.NewFederator(state, NewTestFederatingDB(state), tc, NewTestTypeConverter(state.DB), mediaManager)  } diff --git a/testrig/testmodels.go b/testrig/testmodels.go index 60c4b919a..6337004ff 100644 --- a/testrig/testmodels.go +++ b/testrig/testmodels.go @@ -2482,6 +2482,26 @@ func NewTestFediStatuses() map[string]vocab.ActivityStreamsNote {  				),  			},  		), +		"http://fossbros-anonymous.io/users/foss_satan/statuses/106221634728637552": NewAPNote( +			URLMustParse("http://fossbros-anonymous.io/users/foss_satan/statuses/106221634728637552"), +			URLMustParse("http://fossbros-anonymous.io/@foss_satan/106221634728637552"), +			TimeMustParse("2022-07-13T12:13:12+02:00"), +			`<p><span class="h-card"><a href="http://localhost:8080/@the_mighty_zork" class="u-url mention">@<span>the_mighty_zork</span></a></span> nice there it is:</p><p><a href="http://localhost:8080/users/the_mighty_zork/statuses/01F8MHAMCHF6Y650WCRSCP4WMY/activity" rel="nofollow noopener noreferrer" target="_blank"><span class="invisible">https://</span><span class="ellipsis">social.pixie.town/users/f0x/st</span><span class="invisible">atuses/106221628567855262/activity</span></a></p>`, +			"", +			URLMustParse("http://fossbros-anonymous.io/users/foss_satan"), +			[]*url.URL{ +				URLMustParse(pub.PublicActivityPubIRI), +			}, +			[]*url.URL{}, +			false, +			[]vocab.ActivityStreamsMention{ +				newAPMention( +					URLMustParse("http://localhost:8080/users/the_mighty_zork"), +					"@the_mighty_zork@localhost:8080", +				), +			}, +			nil, +		),  	}  } | 
