summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cmd/gotosocial/action/server/server.go2
-rw-r--r--internal/federation/federatingdb/accept.go30
-rw-r--r--internal/federation/federatingdb/announce.go2
-rw-r--r--internal/federation/federatingdb/block.go2
-rw-r--r--internal/federation/federatingdb/create.go6
-rw-r--r--internal/federation/federatingdb/db.go21
-rw-r--r--internal/federation/federatingdb/delete.go6
-rw-r--r--internal/federation/federatingdb/exists.go2
-rw-r--r--internal/federation/federatingdb/federatingdb_test.go2
-rw-r--r--internal/federation/federatingdb/flag.go2
-rw-r--r--internal/federation/federatingdb/follow.go2
-rw-r--r--internal/federation/federatingdb/followers.go2
-rw-r--r--internal/federation/federatingdb/following.go2
-rw-r--r--internal/federation/federatingdb/get.go40
-rw-r--r--internal/federation/federatingdb/inbox.go8
-rw-r--r--internal/federation/federatingdb/like.go2
-rw-r--r--internal/federation/federatingdb/liked.go2
-rw-r--r--internal/federation/federatingdb/lock.go4
-rw-r--r--internal/federation/federatingdb/move.go2
-rw-r--r--internal/federation/federatingdb/outbox.go6
-rw-r--r--internal/federation/federatingdb/owns.go4
-rw-r--r--internal/federation/federatingdb/reject.go26
-rw-r--r--internal/federation/federatingdb/undo.go15
-rw-r--r--internal/federation/federatingdb/update.go6
-rw-r--r--internal/federation/federatingdb/util.go38
-rw-r--r--internal/federation/federator.go6
-rw-r--r--internal/processing/workers/fromfediapi_move.go5
-rw-r--r--internal/transport/controller.go58
-rw-r--r--internal/transport/dereference.go8
-rw-r--r--testrig/federatingdb.go2
-rw-r--r--testrig/transportcontroller.go3
31 files changed, 156 insertions, 160 deletions
diff --git a/cmd/gotosocial/action/server/server.go b/cmd/gotosocial/action/server/server.go
index 844d46ca4..703df5e4b 100644
--- a/cmd/gotosocial/action/server/server.go
+++ b/cmd/gotosocial/action/server/server.go
@@ -271,7 +271,7 @@ var Start action.GTSAction = func(ctx context.Context) error {
intFilter := interaction.NewFilter(state)
spamFilter := spam.NewFilter(state)
federatingDB := federatingdb.New(state, typeConverter, visFilter, intFilter, spamFilter)
- transportController := transport.NewController(state, federatingDB, &federation.Clock{}, client)
+ transportController := transport.NewController(state, federatingDB, client)
federator := federation.NewFederator(
state,
federatingDB,
diff --git a/internal/federation/federatingdb/accept.go b/internal/federation/federatingdb/accept.go
index 273b9255f..2e4948a0e 100644
--- a/internal/federation/federatingdb/accept.go
+++ b/internal/federation/federatingdb/accept.go
@@ -35,7 +35,7 @@ import (
"code.superseriousbusiness.org/gotosocial/internal/util"
)
-func (f *federatingDB) GetAccept(
+func (f *DB) GetAccept(
ctx context.Context,
acceptIRI *url.URL,
) (vocab.ActivityStreamsAccept, error) {
@@ -46,7 +46,7 @@ func (f *federatingDB) GetAccept(
return f.converter.InteractionReqToASAccept(ctx, approval)
}
-func (f *federatingDB) Accept(ctx context.Context, accept vocab.ActivityStreamsAccept) error {
+func (f *DB) Accept(ctx context.Context, accept vocab.ActivityStreamsAccept) error {
log.DebugKV(ctx, "accept", serialize{accept})
activityContext := getActivityContext(ctx)
@@ -202,7 +202,7 @@ func (f *federatingDB) Accept(ctx context.Context, accept vocab.ActivityStreamsA
return nil
}
-func (f *federatingDB) acceptFollowType(
+func (f *DB) acceptFollowType(
ctx context.Context,
asType vocab.Type,
receivingAcct *gtsmodel.Account,
@@ -218,11 +218,6 @@ func (f *federatingDB) acceptFollowType(
return gtserror.NewErrorInternalError(err)
}
- // Lock on the Follow URI
- // as we may be updating it.
- unlock := f.state.FedLocks.Lock(follow.URI)
- defer unlock()
-
// Make sure the creator of the original follow
// is the same as whatever inbox this landed in.
if follow.AccountID != receivingAcct.ID {
@@ -238,8 +233,7 @@ func (f *federatingDB) acceptFollowType(
}
// Accept and get the populated follow back.
- follow, err = f.state.DB.AcceptFollowRequest(
- ctx,
+ follow, err = f.state.DB.AcceptFollowRequest(ctx,
follow.AccountID,
follow.TargetAccountID,
)
@@ -267,17 +261,12 @@ func (f *federatingDB) acceptFollowType(
return nil
}
-func (f *federatingDB) acceptFollowIRI(
+func (f *DB) acceptFollowIRI(
ctx context.Context,
objectIRI string,
receivingAcct *gtsmodel.Account,
requestingAcct *gtsmodel.Account,
) error {
- // Lock on this potential Follow
- // URI as we may be updating it.
- unlock := f.state.FedLocks.Lock(objectIRI)
- defer unlock()
-
// Get the follow req from the db.
followReq, err := f.state.DB.GetFollowRequestByURI(ctx, objectIRI)
if err != nil && !errors.Is(err, db.ErrNoEntries) {
@@ -307,8 +296,7 @@ func (f *federatingDB) acceptFollowIRI(
}
// Accept and get the populated follow back.
- follow, err := f.state.DB.AcceptFollowRequest(
- ctx,
+ follow, err := f.state.DB.AcceptFollowRequest(ctx,
followReq.AccountID,
followReq.TargetAccountID,
)
@@ -336,7 +324,7 @@ func (f *federatingDB) acceptFollowIRI(
return nil
}
-func (f *federatingDB) acceptOtherIRI(
+func (f *DB) acceptOtherIRI(
ctx context.Context,
acceptID *url.URL,
accept vocab.ActivityStreamsAccept,
@@ -419,7 +407,7 @@ func (f *federatingDB) acceptOtherIRI(
return nil
}
-func (f *federatingDB) acceptStoredStatus(
+func (f *DB) acceptStoredStatus(
ctx context.Context,
acceptID *url.URL,
accept vocab.ActivityStreamsAccept,
@@ -489,7 +477,7 @@ func (f *federatingDB) acceptStoredStatus(
return nil
}
-func (f *federatingDB) acceptLikeIRI(
+func (f *DB) acceptLikeIRI(
ctx context.Context,
acceptID *url.URL,
accept vocab.ActivityStreamsAccept,
diff --git a/internal/federation/federatingdb/announce.go b/internal/federation/federatingdb/announce.go
index 2d467ea8c..d6cc2e5d5 100644
--- a/internal/federation/federatingdb/announce.go
+++ b/internal/federation/federatingdb/announce.go
@@ -29,7 +29,7 @@ import (
"code.superseriousbusiness.org/gotosocial/internal/messages"
)
-func (f *federatingDB) Announce(ctx context.Context, announce vocab.ActivityStreamsAnnounce) error {
+func (f *DB) Announce(ctx context.Context, announce vocab.ActivityStreamsAnnounce) error {
log.DebugKV(ctx, "announce", serialize{announce})
activityContext := getActivityContext(ctx)
diff --git a/internal/federation/federatingdb/block.go b/internal/federation/federatingdb/block.go
index 54da2030a..2950aef3b 100644
--- a/internal/federation/federatingdb/block.go
+++ b/internal/federation/federatingdb/block.go
@@ -29,7 +29,7 @@ import (
"code.superseriousbusiness.org/gotosocial/internal/messages"
)
-func (f *federatingDB) Block(ctx context.Context, blockable vocab.ActivityStreamsBlock) error {
+func (f *DB) Block(ctx context.Context, blockable vocab.ActivityStreamsBlock) error {
log.DebugKV(ctx, "block", serialize{blockable})
// Extract relevant values from passed ctx.
diff --git a/internal/federation/federatingdb/create.go b/internal/federation/federatingdb/create.go
index 2cfbb1d4c..8de137d6c 100644
--- a/internal/federation/federatingdb/create.go
+++ b/internal/federation/federatingdb/create.go
@@ -45,7 +45,7 @@ import (
//
// Under certain conditions and network activities, Create may be called
// multiple times for the same ActivityStreams object.
-func (f *federatingDB) Create(ctx context.Context, asType vocab.Type) error {
+func (f *DB) Create(ctx context.Context, asType vocab.Type) error {
log.DebugKV(ctx, "create", serialize{asType})
// Cache entry for this activity type's ID for later
@@ -126,7 +126,7 @@ func (f *federatingDB) Create(ctx context.Context, asType vocab.Type) error {
// createPollOptionable handles a Create activity for a PollOptionable.
// This function doesn't handle database insertion, only validation checks
// before passing off to a worker for asynchronous processing.
-func (f *federatingDB) createPollOptionables(
+func (f *DB) createPollOptionables(
ctx context.Context,
receiver *gtsmodel.Account,
requester *gtsmodel.Account,
@@ -274,7 +274,7 @@ func (f *federatingDB) createPollOptionables(
// This function won't insert anything in the database yet,
// but will pass the Statusable (if appropriate) through to
// the processor for further asynchronous processing.
-func (f *federatingDB) createStatusable(
+func (f *DB) createStatusable(
ctx context.Context,
receiver *gtsmodel.Account,
requester *gtsmodel.Account,
diff --git a/internal/federation/federatingdb/db.go b/internal/federation/federatingdb/db.go
index 539305204..0c4d21c64 100644
--- a/internal/federation/federatingdb/db.go
+++ b/internal/federation/federatingdb/db.go
@@ -32,9 +32,10 @@ import (
"codeberg.org/gruf/go-cache/v3/simple"
)
-// DB wraps the pub.Database interface with
-// a couple of custom functions for GoToSocial.
-type DB interface {
+// Check our type's
+// interface conformity.
+var _ interface {
+
// Default
// functionality.
pub.Database
@@ -55,11 +56,11 @@ type DB interface {
*/
GetAccept(ctx context.Context, acceptIRI *url.URL) (vocab.ActivityStreamsAccept, error)
-}
+} = &DB{}
-// FederatingDB uses the given state interface
-// to implement the go-fed pub.Database interface.
-type federatingDB struct {
+// DB uses the given state interface to
+// implement the go-fed pub.Database interface.
+type DB struct {
state *state.State
converter *typeutils.Converter
visFilter *visibility.Filter
@@ -79,8 +80,8 @@ func New(
visFilter *visibility.Filter,
intFilter *interaction.Filter,
spamFilter *spam.Filter,
-) DB {
- fdb := federatingDB{
+) *DB {
+ fdb := DB{
state: state,
converter: converter,
visFilter: visFilter,
@@ -93,6 +94,6 @@ func New(
// storeActivityID stores an entry in the .activityIDs cache for this
// type's JSON-LD ID, for later checks in Exist() to mark it as seen.
-func (f *federatingDB) storeActivityID(asType vocab.Type) {
+func (f *DB) storeActivityID(asType vocab.Type) {
f.activityIDs.Set(ap.GetJSONLDId(asType).String(), struct{}{})
}
diff --git a/internal/federation/federatingdb/delete.go b/internal/federation/federatingdb/delete.go
index fb967fce9..17a25e956 100644
--- a/internal/federation/federatingdb/delete.go
+++ b/internal/federation/federatingdb/delete.go
@@ -36,7 +36,7 @@ import (
// Protocol instead call Update to create a Tombstone.
//
// The library makes this call only after acquiring a lock first.
-func (f *federatingDB) Delete(ctx context.Context, id *url.URL) error {
+func (f *DB) Delete(ctx context.Context, id *url.URL) error {
log.DebugKV(ctx, "id", id)
activityContext := getActivityContext(ctx)
@@ -87,7 +87,7 @@ func (f *federatingDB) Delete(ctx context.Context, id *url.URL) error {
return nil
}
-func (f *federatingDB) deleteAccount(
+func (f *DB) deleteAccount(
ctx context.Context,
requesting *gtsmodel.Account,
receiving *gtsmodel.Account,
@@ -126,7 +126,7 @@ func (f *federatingDB) deleteAccount(
return false, nil
}
-func (f *federatingDB) deleteStatus(
+func (f *DB) deleteStatus(
ctx context.Context,
requesting *gtsmodel.Account,
receiving *gtsmodel.Account,
diff --git a/internal/federation/federatingdb/exists.go b/internal/federation/federatingdb/exists.go
index ec996f72f..6d3a23e84 100644
--- a/internal/federation/federatingdb/exists.go
+++ b/internal/federation/federatingdb/exists.go
@@ -24,6 +24,6 @@ import (
// Exists is an implementation of pub.Database{}.Exists(), optimized specifically for
// the only usecase in which go-fed/activity/pub actually calls it. Do not use otherwise!
-func (f *federatingDB) Exists(ctx context.Context, id *url.URL) (exists bool, err error) {
+func (f *DB) Exists(ctx context.Context, id *url.URL) (exists bool, err error) {
return f.activityIDs.Has(id.String()), nil
}
diff --git a/internal/federation/federatingdb/federatingdb_test.go b/internal/federation/federatingdb/federatingdb_test.go
index e36c31d79..1203cf6b8 100644
--- a/internal/federation/federatingdb/federatingdb_test.go
+++ b/internal/federation/federatingdb/federatingdb_test.go
@@ -37,7 +37,7 @@ type FederatingDBTestSuite struct {
suite.Suite
db db.DB
tc *typeutils.Converter
- federatingDB federatingdb.DB
+ federatingDB *federatingdb.DB
state state.State
testTokens map[string]*gtsmodel.Token
diff --git a/internal/federation/federatingdb/flag.go b/internal/federation/federatingdb/flag.go
index 1198e688a..67727e5d9 100644
--- a/internal/federation/federatingdb/flag.go
+++ b/internal/federation/federatingdb/flag.go
@@ -30,7 +30,7 @@ import (
"github.com/miekg/dns"
)
-func (f *federatingDB) Flag(ctx context.Context, flaggable vocab.ActivityStreamsFlag) error {
+func (f *DB) Flag(ctx context.Context, flaggable vocab.ActivityStreamsFlag) error {
log.DebugKV(ctx, "flag", serialize{flaggable})
// Mark activity as handled.
diff --git a/internal/federation/federatingdb/follow.go b/internal/federation/federatingdb/follow.go
index f8ae8c580..9bd602489 100644
--- a/internal/federation/federatingdb/follow.go
+++ b/internal/federation/federatingdb/follow.go
@@ -29,7 +29,7 @@ import (
"code.superseriousbusiness.org/gotosocial/internal/messages"
)
-func (f *federatingDB) Follow(ctx context.Context, followable vocab.ActivityStreamsFollow) error {
+func (f *DB) Follow(ctx context.Context, followable vocab.ActivityStreamsFollow) error {
log.DebugKV(ctx, "follow", serialize{followable})
// Mark activity as handled.
diff --git a/internal/federation/federatingdb/followers.go b/internal/federation/federatingdb/followers.go
index 4f8aaf459..a68050033 100644
--- a/internal/federation/federatingdb/followers.go
+++ b/internal/federation/federatingdb/followers.go
@@ -32,7 +32,7 @@ import (
// If modified, the library will then call Update.
//
// The library makes this call only after acquiring a lock first.
-func (f *federatingDB) Followers(ctx context.Context, actorIRI *url.URL) (followers vocab.ActivityStreamsCollection, err error) {
+func (f *DB) Followers(ctx context.Context, actorIRI *url.URL) (followers vocab.ActivityStreamsCollection, err error) {
acct, err := f.state.DB.GetAccountByURI(ctx, actorIRI.String())
if err != nil {
return nil, err
diff --git a/internal/federation/federatingdb/following.go b/internal/federation/federatingdb/following.go
index 6f65930bc..fd908c9ef 100644
--- a/internal/federation/federatingdb/following.go
+++ b/internal/federation/federatingdb/following.go
@@ -31,7 +31,7 @@ import (
// If modified, the library will then call Update.
//
// The library makes this call only after acquiring a lock first.
-func (f *federatingDB) Following(ctx context.Context, actorIRI *url.URL) (following vocab.ActivityStreamsCollection, err error) {
+func (f *DB) Following(ctx context.Context, actorIRI *url.URL) (following vocab.ActivityStreamsCollection, err error) {
acct, err := f.state.DB.GetAccountByURI(ctx, actorIRI.String())
if err != nil {
return nil, err
diff --git a/internal/federation/federatingdb/get.go b/internal/federation/federatingdb/get.go
index 92c2d1d8d..58dcfecdc 100644
--- a/internal/federation/federatingdb/get.go
+++ b/internal/federation/federatingdb/get.go
@@ -19,16 +19,19 @@ package federatingdb
import (
"context"
+ "errors"
"net/url"
"code.superseriousbusiness.org/activity/streams/vocab"
"code.superseriousbusiness.org/gotosocial/internal/config"
- "code.superseriousbusiness.org/gotosocial/internal/db"
+ "code.superseriousbusiness.org/gotosocial/internal/gtscontext"
"code.superseriousbusiness.org/gotosocial/internal/gtserror"
"code.superseriousbusiness.org/gotosocial/internal/log"
"code.superseriousbusiness.org/gotosocial/internal/uris"
)
+var ErrNotImplemented = errors.New("not implemented")
+
// Get returns the database entry for the specified id.
//
// The library makes this call only after acquiring a lock first.
@@ -48,30 +51,38 @@ import (
//
// It may be useful in future to add more matching here so that more
// stuff can be shortcutted by the dereferencer, saving HTTP calls.
-func (f *federatingDB) Get(ctx context.Context, id *url.URL) (value vocab.Type, err error) {
+func (f *DB) Get(ctx context.Context, id *url.URL) (value vocab.Type, err error) {
log.DebugKV(ctx, "id", id)
// Ensure our host, for safety.
if id.Host != config.GetHost() {
- return nil, gtserror.Newf("%s was not for our host", id.String())
+ return nil, gtserror.Newf("%s was not for our host", id)
}
- if username, err := uris.ParseUserPath(id); err == nil && username != "" {
- acct, err := f.state.DB.GetAccountByUsernameDomain(ctx, username, "")
+ if username, _ := uris.ParseUserPath(id); username != "" {
+ acct, err := f.state.DB.GetAccountByUsernameDomain(
+ gtscontext.SetBarebones(ctx),
+ username,
+ "",
+ )
if err != nil {
return nil, err
}
return f.converter.AccountToAS(ctx, acct)
- } else if _, statusID, err := uris.ParseStatusesPath(id); err == nil && statusID != "" {
+ } else if _, statusID, _ := uris.ParseStatusesPath(id); statusID != "" {
status, err := f.state.DB.GetStatusByID(ctx, statusID)
if err != nil {
return nil, err
}
return f.converter.StatusToAS(ctx, status)
- } else if username, err := uris.ParseFollowersPath(id); err == nil && username != "" {
- acct, err := f.state.DB.GetAccountByUsernameDomain(ctx, username, "")
+ } else if username, _ := uris.ParseFollowersPath(id); username != "" {
+ acct, err := f.state.DB.GetAccountByUsernameDomain(
+ gtscontext.SetBarebones(ctx),
+ username,
+ "",
+ )
if err != nil {
return nil, err
}
@@ -83,8 +94,12 @@ func (f *federatingDB) Get(ctx context.Context, id *url.URL) (value vocab.Type,
return f.Followers(ctx, acctURI)
- } else if username, err := uris.ParseFollowingPath(id); err == nil && username != "" {
- acct, err := f.state.DB.GetAccountByUsernameDomain(ctx, username, "")
+ } else if username, _ := uris.ParseFollowingPath(id); username != "" {
+ acct, err := f.state.DB.GetAccountByUsernameDomain(
+ gtscontext.SetBarebones(ctx),
+ username,
+ "",
+ )
if err != nil {
return nil, err
}
@@ -102,8 +117,5 @@ func (f *federatingDB) Get(ctx context.Context, id *url.URL) (value vocab.Type,
// Nothing found, the caller
// will have to deal with this.
- return nil, gtserror.Newf(
- "not implemented for %s: %w",
- id.String(), db.ErrNoEntries,
- )
+ return nil, ErrNotImplemented
}
diff --git a/internal/federation/federatingdb/inbox.go b/internal/federation/federatingdb/inbox.go
index 2fb0ebad9..8533fa9f4 100644
--- a/internal/federation/federatingdb/inbox.go
+++ b/internal/federation/federatingdb/inbox.go
@@ -35,7 +35,7 @@ import (
// The library makes this call only after acquiring a lock first.
//
// Implementation note: we have our own logic for inboxes so always return false here.
-func (f *federatingDB) InboxContains(c context.Context, inbox, id *url.URL) (contains bool, err error) {
+func (f *DB) InboxContains(c context.Context, inbox, id *url.URL) (contains bool, err error) {
return false, nil
}
@@ -45,7 +45,7 @@ func (f *federatingDB) InboxContains(c context.Context, inbox, id *url.URL) (con
// The library makes this call only after acquiring a lock first.
//
// Implementation note: we don't (yet) serve inboxes, so just return empty and nil here.
-func (f *federatingDB) GetInbox(c context.Context, inboxIRI *url.URL) (inbox vocab.ActivityStreamsOrderedCollectionPage, err error) {
+func (f *DB) GetInbox(c context.Context, inboxIRI *url.URL) (inbox vocab.ActivityStreamsOrderedCollectionPage, err error) {
return streams.NewActivityStreamsOrderedCollectionPage(), nil
}
@@ -56,7 +56,7 @@ func (f *federatingDB) GetInbox(c context.Context, inboxIRI *url.URL) (inbox voc
// The library makes this call only after acquiring a lock first.
//
// Implementation note: we don't allow inbox setting so just return nil here.
-func (f *federatingDB) SetInbox(c context.Context, inbox vocab.ActivityStreamsOrderedCollectionPage) error {
+func (f *DB) SetInbox(c context.Context, inbox vocab.ActivityStreamsOrderedCollectionPage) error {
return nil
}
@@ -76,7 +76,7 @@ func (f *federatingDB) SetInbox(c context.Context, inbox vocab.ActivityStreamsOr
// then each follower inbox IRI should be returned in the inboxIRIs slice.
//
// The library makes this call only after acquiring a lock first.
-func (f *federatingDB) InboxesForIRI(c context.Context, iri *url.URL) (inboxIRIs []*url.URL, err error) {
+func (f *DB) InboxesForIRI(c context.Context, iri *url.URL) (inboxIRIs []*url.URL, err error) {
// check if this is a followers collection iri for a local account...
if iri.Host == config.GetHost() && uris.IsFollowersPath(iri) {
localAccountUsername, err := uris.ParseFollowersPath(iri)
diff --git a/internal/federation/federatingdb/like.go b/internal/federation/federatingdb/like.go
index 970ca53ef..debc343cf 100644
--- a/internal/federation/federatingdb/like.go
+++ b/internal/federation/federatingdb/like.go
@@ -31,7 +31,7 @@ import (
"code.superseriousbusiness.org/gotosocial/internal/messages"
)
-func (f *federatingDB) Like(ctx context.Context, likeable vocab.ActivityStreamsLike) error {
+func (f *DB) Like(ctx context.Context, likeable vocab.ActivityStreamsLike) error {
log.DebugKV(ctx, "like", serialize{likeable})
// Mark activity as handled.
diff --git a/internal/federation/federatingdb/liked.go b/internal/federation/federatingdb/liked.go
index 9db3341f1..6e6f6209b 100644
--- a/internal/federation/federatingdb/liked.go
+++ b/internal/federation/federatingdb/liked.go
@@ -33,6 +33,6 @@ import (
// The library makes this call only after acquiring a lock first.
//
// Implementation note: we don't serve a Liked collection *yet* so just return an empty collection for now.
-func (f *federatingDB) Liked(c context.Context, actorIRI *url.URL) (liked vocab.ActivityStreamsCollection, err error) {
+func (f *DB) Liked(c context.Context, actorIRI *url.URL) (liked vocab.ActivityStreamsCollection, err error) {
return streams.NewActivityStreamsCollection(), nil
}
diff --git a/internal/federation/federatingdb/lock.go b/internal/federation/federatingdb/lock.go
index 5353aea91..a7b3f9950 100644
--- a/internal/federation/federatingdb/lock.go
+++ b/internal/federation/federatingdb/lock.go
@@ -33,6 +33,6 @@ import (
// processes require tight loops acquiring and releasing locks.
//
// Used to ensure race conditions in multiple requests do not occur.
-func (f *federatingDB) Lock(c context.Context, id *url.URL) (func(), error) {
- return f.state.FedLocks.Lock("federatingDB " + id.String()), nil // id should NEVER be nil.
+func (f *DB) Lock(c context.Context, id *url.URL) (func(), error) {
+ return f.state.FedLocks.Lock("fdb:" + id.String()), nil // id should NEVER be nil.
}
diff --git a/internal/federation/federatingdb/move.go b/internal/federation/federatingdb/move.go
index 2740d0de9..8a6a77ef1 100644
--- a/internal/federation/federatingdb/move.go
+++ b/internal/federation/federatingdb/move.go
@@ -35,7 +35,7 @@ import (
"code.superseriousbusiness.org/gotosocial/internal/messages"
)
-func (f *federatingDB) Move(ctx context.Context, move vocab.ActivityStreamsMove) error {
+func (f *DB) Move(ctx context.Context, move vocab.ActivityStreamsMove) error {
log.DebugKV(ctx, "move", serialize{move})
// Mark activity as handled.
diff --git a/internal/federation/federatingdb/outbox.go b/internal/federation/federatingdb/outbox.go
index 8098dfa34..5ef111138 100644
--- a/internal/federation/federatingdb/outbox.go
+++ b/internal/federation/federatingdb/outbox.go
@@ -31,7 +31,7 @@ import (
// The library makes this call only after acquiring a lock first.
//
// Implementation note: we don't (yet) serve outboxes, so just return empty and nil here.
-func (f *federatingDB) GetOutbox(ctx context.Context, outboxIRI *url.URL) (inbox vocab.ActivityStreamsOrderedCollectionPage, err error) {
+func (f *DB) GetOutbox(ctx context.Context, outboxIRI *url.URL) (inbox vocab.ActivityStreamsOrderedCollectionPage, err error) {
return streams.NewActivityStreamsOrderedCollectionPage(), nil
}
@@ -42,7 +42,7 @@ func (f *federatingDB) GetOutbox(ctx context.Context, outboxIRI *url.URL) (inbox
// The library makes this call only after acquiring a lock first.
//
// Implementation note: we don't allow outbox setting so just return nil here.
-func (f *federatingDB) SetOutbox(ctx context.Context, outbox vocab.ActivityStreamsOrderedCollectionPage) error {
+func (f *DB) SetOutbox(ctx context.Context, outbox vocab.ActivityStreamsOrderedCollectionPage) error {
return nil
}
@@ -50,7 +50,7 @@ func (f *federatingDB) SetOutbox(ctx context.Context, outbox vocab.ActivityStrea
// actor's inbox IRI.
//
// The library makes this call only after acquiring a lock first.
-func (f *federatingDB) OutboxForInbox(ctx context.Context, inboxIRI *url.URL) (outboxIRI *url.URL, err error) {
+func (f *DB) OutboxForInbox(ctx context.Context, inboxIRI *url.URL) (outboxIRI *url.URL, err error) {
acct, err := f.state.DB.GetOneAccountByInboxURI(ctx, inboxIRI.String())
if err != nil {
return nil, err
diff --git a/internal/federation/federatingdb/owns.go b/internal/federation/federatingdb/owns.go
index 7d08a877f..4cf49c246 100644
--- a/internal/federation/federatingdb/owns.go
+++ b/internal/federation/federatingdb/owns.go
@@ -34,7 +34,7 @@ import (
// Owns returns true if the IRI belongs to this instance, and if
// the database has an entry for the IRI.
// The library makes this call only after acquiring a lock first.
-func (f *federatingDB) Owns(ctx context.Context, id *url.URL) (bool, error) {
+func (f *DB) Owns(ctx context.Context, id *url.URL) (bool, error) {
log.DebugKV(ctx, "id", id)
// if the id host isn't this instance host, we don't own this IRI
@@ -150,7 +150,7 @@ func (f *federatingDB) Owns(ctx context.Context, id *url.URL) (bool, error) {
return false, fmt.Errorf("could not match activityID: %s", id.String())
}
-func (f *federatingDB) ownsLike(ctx context.Context, uri *url.URL) (bool, error) {
+func (f *DB) ownsLike(ctx context.Context, uri *url.URL) (bool, error) {
username, id, err := uris.ParseLikedPath(uri)
if err != nil {
return false, fmt.Errorf("error parsing Like path for url %s: %w", uri.String(), err)
diff --git a/internal/federation/federatingdb/reject.go b/internal/federation/federatingdb/reject.go
index 1c657a5a9..5ec3b1a27 100644
--- a/internal/federation/federatingdb/reject.go
+++ b/internal/federation/federatingdb/reject.go
@@ -33,7 +33,7 @@ import (
"code.superseriousbusiness.org/gotosocial/internal/uris"
)
-func (f *federatingDB) Reject(ctx context.Context, reject vocab.ActivityStreamsReject) error {
+func (f *DB) Reject(ctx context.Context, reject vocab.ActivityStreamsReject) error {
log.DebugKV(ctx, "reject", serialize{reject})
activityContext := getActivityContext(ctx)
@@ -122,7 +122,7 @@ func (f *federatingDB) Reject(ctx context.Context, reject vocab.ActivityStreamsR
return nil
}
-func (f *federatingDB) rejectFollowType(
+func (f *DB) rejectFollowType(
ctx context.Context,
asType vocab.Type,
receivingAcct *gtsmodel.Account,
@@ -138,11 +138,6 @@ func (f *federatingDB) rejectFollowType(
return gtserror.NewErrorInternalError(err)
}
- // Lock on the Follow URI
- // as we may be updating it.
- unlock := f.state.FedLocks.Lock(follow.URI)
- defer unlock()
-
// Make sure the creator of the original follow
// is the same as whatever inbox this landed in.
if follow.AccountID != receivingAcct.ID {
@@ -158,8 +153,7 @@ func (f *federatingDB) rejectFollowType(
}
// Reject the follow.
- err = f.state.DB.RejectFollowRequest(
- ctx,
+ err = f.state.DB.RejectFollowRequest(ctx,
follow.AccountID,
follow.TargetAccountID,
)
@@ -171,17 +165,12 @@ func (f *federatingDB) rejectFollowType(
return nil
}
-func (f *federatingDB) rejectFollowIRI(
+func (f *DB) rejectFollowIRI(
ctx context.Context,
objectIRI string,
receivingAcct *gtsmodel.Account,
requestingAcct *gtsmodel.Account,
) error {
- // Lock on this potential Follow
- // URI as we may be updating it.
- unlock := f.state.FedLocks.Lock(objectIRI)
- defer unlock()
-
// Get the follow req from the db.
followReq, err := f.state.DB.GetFollowRequestByURI(ctx, objectIRI)
if err != nil && !errors.Is(err, db.ErrNoEntries) {
@@ -214,8 +203,7 @@ func (f *federatingDB) rejectFollowIRI(
}
// Reject the follow.
- err = f.state.DB.RejectFollowRequest(
- ctx,
+ err = f.state.DB.RejectFollowRequest(ctx,
followReq.AccountID,
followReq.TargetAccountID,
)
@@ -227,7 +215,7 @@ func (f *federatingDB) rejectFollowIRI(
return nil
}
-func (f *federatingDB) rejectStatusIRI(
+func (f *DB) rejectStatusIRI(
ctx context.Context,
activityID string,
objectIRI string,
@@ -379,7 +367,7 @@ func (f *federatingDB) rejectStatusIRI(
return nil
}
-func (f *federatingDB) rejectLikeIRI(
+func (f *DB) rejectLikeIRI(
ctx context.Context,
activityID string,
objectIRI string,
diff --git a/internal/federation/federatingdb/undo.go b/internal/federation/federatingdb/undo.go
index 5f3678cd8..23c4098dd 100644
--- a/internal/federation/federatingdb/undo.go
+++ b/internal/federation/federatingdb/undo.go
@@ -32,7 +32,7 @@ import (
"code.superseriousbusiness.org/gotosocial/internal/messages"
)
-func (f *federatingDB) Undo(ctx context.Context, undo vocab.ActivityStreamsUndo) error {
+func (f *DB) Undo(ctx context.Context, undo vocab.ActivityStreamsUndo) error {
log.DebugKV(ctx, "undo", serialize{undo})
activityContext := getActivityContext(ctx)
@@ -111,7 +111,7 @@ func (f *federatingDB) Undo(ctx context.Context, undo vocab.ActivityStreamsUndo)
return nil
}
-func (f *federatingDB) undoFollow(
+func (f *DB) undoFollow(
ctx context.Context,
receivingAcct *gtsmodel.Account,
requestingAcct *gtsmodel.Account,
@@ -149,11 +149,6 @@ func (f *federatingDB) undoFollow(
return nil
}
- // Lock on the Follow URI
- // as we may be updating it.
- unlock := f.state.FedLocks.Lock(follow.URI)
- defer unlock()
-
// Ensure addressee is follow target.
if follow.TargetAccountID != receivingAcct.ID {
const text = "receivingAcct was not Follow target"
@@ -193,7 +188,7 @@ func (f *federatingDB) undoFollow(
return nil
}
-func (f *federatingDB) undoLike(
+func (f *DB) undoLike(
ctx context.Context,
receivingAcct *gtsmodel.Account,
requestingAcct *gtsmodel.Account,
@@ -293,7 +288,7 @@ func (f *federatingDB) undoLike(
return nil
}
-func (f *federatingDB) undoBlock(
+func (f *DB) undoBlock(
ctx context.Context,
receivingAcct *gtsmodel.Account,
requestingAcct *gtsmodel.Account,
@@ -363,7 +358,7 @@ func (f *federatingDB) undoBlock(
return nil
}
-func (f *federatingDB) undoAnnounce(
+func (f *DB) undoAnnounce(
ctx context.Context,
receivingAcct *gtsmodel.Account,
requestingAcct *gtsmodel.Account,
diff --git a/internal/federation/federatingdb/update.go b/internal/federation/federatingdb/update.go
index 94471c33a..a24594a77 100644
--- a/internal/federation/federatingdb/update.go
+++ b/internal/federation/federatingdb/update.go
@@ -40,7 +40,7 @@ import (
// the entire value.
//
// The library makes this call only after acquiring a lock first.
-func (f *federatingDB) Update(ctx context.Context, asType vocab.Type) error {
+func (f *DB) Update(ctx context.Context, asType vocab.Type) error {
log.DebugKV(ctx, "update", serialize{asType})
// Mark activity as handled.
@@ -67,7 +67,7 @@ func (f *federatingDB) Update(ctx context.Context, asType vocab.Type) error {
return nil
}
-func (f *federatingDB) updateAccountable(ctx context.Context, receivingAcct *gtsmodel.Account, requestingAcct *gtsmodel.Account, accountable ap.Accountable) error {
+func (f *DB) updateAccountable(ctx context.Context, receivingAcct *gtsmodel.Account, requestingAcct *gtsmodel.Account, accountable ap.Accountable) error {
// Extract AP URI of the updated Accountable model.
idProp := accountable.GetJSONLDId()
if idProp == nil || !idProp.IsIRI() {
@@ -105,7 +105,7 @@ func (f *federatingDB) updateAccountable(ctx context.Context, receivingAcct *gts
return nil
}
-func (f *federatingDB) updateStatusable(ctx context.Context, receivingAcct *gtsmodel.Account, requestingAcct *gtsmodel.Account, statusable ap.Statusable) error {
+func (f *DB) updateStatusable(ctx context.Context, receivingAcct *gtsmodel.Account, requestingAcct *gtsmodel.Account, statusable ap.Statusable) error {
// Extract AP URI of the updated model.
idProp := statusable.GetJSONLDId()
if idProp == nil || !idProp.IsIRI() {
diff --git a/internal/federation/federatingdb/util.go b/internal/federation/federatingdb/util.go
index 32aec51a5..f235e729e 100644
--- a/internal/federation/federatingdb/util.go
+++ b/internal/federation/federatingdb/util.go
@@ -20,7 +20,6 @@ package federatingdb
import (
"context"
"encoding/json"
- "fmt"
"net/url"
"code.superseriousbusiness.org/activity/streams"
@@ -65,16 +64,20 @@ func sameActor(actor1 vocab.ActivityStreamsActorProperty, actor2 vocab.ActivityS
}
for a1Iter := actor1.Begin(); a1Iter != actor1.End(); a1Iter = a1Iter.Next() {
- for a2Iter := actor2.Begin(); a2Iter != actor2.End(); a2Iter = a2Iter.Next() {
- if a1Iter.GetIRI() == nil {
- return false
- }
+ a1IRI := a1Iter.GetIRI()
+ if a1IRI == nil {
+ return false
+ }
- if a2Iter.GetIRI() == nil {
+ a1IRIStr := a1IRI.String()
+ for a2Iter := actor2.Begin(); a2Iter != actor2.End(); a2Iter = a2Iter.Next() {
+ a2IRI := a2Iter.GetIRI()
+ if a2IRI == nil {
return false
}
- if a1Iter.GetIRI().String() == a2Iter.GetIRI().String() {
+ a2IRIStr := a2IRI.String()
+ if a1IRIStr == a2IRIStr {
return true
}
}
@@ -89,7 +92,7 @@ func sameActor(actor1 vocab.ActivityStreamsActorProperty, actor2 vocab.ActivityS
//
// The go-fed library will handle setting the 'id' property on the
// activity or object provided with the value returned.
-func (f *federatingDB) NewID(ctx context.Context, t vocab.Type) (idURL *url.URL, err error) {
+func (f *DB) NewID(ctx context.Context, t vocab.Type) (idURL *url.URL, err error) {
log.DebugKV(ctx, "newID", serialize{t})
// Most of our types set an ID already
@@ -116,19 +119,18 @@ func (f *federatingDB) NewID(ctx context.Context, t vocab.Type) (idURL *url.URL,
}
// Default fallback behaviour:
- // {proto}://{host}/{randomID}
- newID, err := id.NewRandomULID()
- if err != nil {
- return nil, err
- }
-
- return url.Parse(fmt.Sprintf("%s://%s/%s", config.GetProtocol(), config.GetHost(), newID))
+ // {proto}://{host}/{newULID}
+ return &url.URL{
+ Scheme: config.GetProtocol(),
+ Host: config.GetHost(),
+ Path: "/" + id.NewULID(),
+ }, nil
}
// ActorForOutbox fetches the local actor's IRI for the given outbox IRI.
//
// The library makes this call only after acquiring a lock first.
-func (f *federatingDB) ActorForOutbox(ctx context.Context, outboxIRI *url.URL) (actorIRI *url.URL, err error) {
+func (f *DB) ActorForOutbox(ctx context.Context, outboxIRI *url.URL) (actorIRI *url.URL, err error) {
acct, err := f.state.DB.GetOneAccountByOutboxURI(ctx, outboxIRI.String())
if err != nil {
return nil, err
@@ -139,7 +141,7 @@ func (f *federatingDB) ActorForOutbox(ctx context.Context, outboxIRI *url.URL) (
// ActorForInbox fetches the local actor's IRI for the given inbox IRI.
//
// The library makes this call only after acquiring a lock first.
-func (f *federatingDB) ActorForInbox(ctx context.Context, inboxIRI *url.URL) (actorIRI *url.URL, err error) {
+func (f *DB) ActorForInbox(ctx context.Context, inboxIRI *url.URL) (actorIRI *url.URL, err error) {
acct, err := f.state.DB.GetOneAccountByInboxURI(ctx, inboxIRI.String())
if err != nil {
return nil, err
@@ -148,7 +150,7 @@ func (f *federatingDB) ActorForInbox(ctx context.Context, inboxIRI *url.URL) (ac
}
// collectFollows takes a slice of iris and converts them into ActivityStreamsCollection of IRIs.
-func (f *federatingDB) collectIRIs(_ context.Context, iris []*url.URL) (vocab.ActivityStreamsCollection, error) {
+func (f *DB) collectIRIs(_ context.Context, iris []*url.URL) (vocab.ActivityStreamsCollection, error) {
collection := streams.NewActivityStreamsCollection()
items := streams.NewActivityStreamsItemsProperty()
for _, i := range iris {
diff --git a/internal/federation/federator.go b/internal/federation/federator.go
index 93df31735..5f8324da2 100644
--- a/internal/federation/federator.go
+++ b/internal/federation/federator.go
@@ -37,7 +37,7 @@ var _ interface {
type Federator struct {
db db.DB
- federatingDB federatingdb.DB
+ federatingDB *federatingdb.DB
clock pub.Clock
converter *typeutils.Converter
transport transport.Controller
@@ -54,7 +54,7 @@ type Federator struct {
// NewFederator returns a new federator instance.
func NewFederator(
state *state.State,
- federatingDB federatingdb.DB,
+ federatingDB *federatingdb.DB,
transportController transport.Controller,
converter *typeutils.Converter,
visFilter *visibility.Filter,
@@ -112,7 +112,7 @@ func (f *Federator) FederatingActor() pub.FederatingActor {
}
// FederatingDB returns the underlying FederatingDB interface.
-func (f *Federator) FederatingDB() federatingdb.DB {
+func (f *Federator) FederatingDB() *federatingdb.DB {
return f.federatingDB
}
diff --git a/internal/processing/workers/fromfediapi_move.go b/internal/processing/workers/fromfediapi_move.go
index e7c75950a..93e7b39a4 100644
--- a/internal/processing/workers/fromfediapi_move.go
+++ b/internal/processing/workers/fromfediapi_move.go
@@ -269,9 +269,8 @@ func (p *fediAPI) MoveAccount(ctx context.Context, fMsg *messages.FromFediAPI) e
// try to send the same Move several times with
// different IDs (you never know), but we only
// want to process them based on origin + target.
- unlock := p.state.FedLocks.Lock(
- "move:" + originAcctURIStr + ":" + targetAcctURIStr,
- )
+ key := "move:" + originAcctURIStr + ":" + targetAcctURIStr
+ unlock := p.state.FedLocks.Lock(key)
defer unlock()
// Check if Move is rate limited based
diff --git a/internal/transport/controller.go b/internal/transport/controller.go
index 0f3c1c9b0..33b74c76e 100644
--- a/internal/transport/controller.go
+++ b/internal/transport/controller.go
@@ -23,6 +23,7 @@ import (
"crypto/rsa"
"crypto/x509"
"encoding/json"
+ "errors"
"fmt"
"io"
"net/http"
@@ -30,11 +31,12 @@ import (
"strconv"
"code.superseriousbusiness.org/activity/pub"
- "code.superseriousbusiness.org/activity/streams/vocab"
"code.superseriousbusiness.org/gotosocial/internal/ap"
apiutil "code.superseriousbusiness.org/gotosocial/internal/api/util"
"code.superseriousbusiness.org/gotosocial/internal/config"
+ "code.superseriousbusiness.org/gotosocial/internal/db"
"code.superseriousbusiness.org/gotosocial/internal/federation/federatingdb"
+ "code.superseriousbusiness.org/gotosocial/internal/gtserror"
"code.superseriousbusiness.org/gotosocial/internal/state"
"code.superseriousbusiness.org/gotosocial/internal/util"
"codeberg.org/gruf/go-byteutil"
@@ -52,15 +54,14 @@ type Controller interface {
type controller struct {
state *state.State
- fedDB federatingdb.DB
- clock pub.Clock
+ fedDB *federatingdb.DB
client pub.HttpClient
trspCache cache.TTLCache[string, *transport]
userAgent string
}
// NewController returns an implementation of the Controller interface for creating new transports
-func NewController(state *state.State, federatingDB federatingdb.DB, clock pub.Clock, client pub.HttpClient) Controller {
+func NewController(state *state.State, federatingDB *federatingdb.DB, client pub.HttpClient) Controller {
var (
host = config.GetHost()
proto = config.GetProtocol()
@@ -70,7 +71,6 @@ func NewController(state *state.State, federatingDB federatingdb.DB, clock pub.C
c := &controller{
state: state,
fedDB: federatingDB,
- clock: clock,
client: client,
trspCache: cache.NewTTL[string, *transport](0, 100, 0),
userAgent: fmt.Sprintf("gotosocial/%s (+%s://%s)", version, proto, host),
@@ -153,37 +153,51 @@ func (c *controller) dereferenceLocal(
ctx context.Context,
uri *url.URL,
) (*http.Response, error) {
- var (
- t vocab.Type
- err error
- )
- t, err = c.fedDB.Get(ctx, uri)
- if err != nil {
- // Don't check especially for
- // db.ErrNoEntries, as we *want*
- // to pass this back to the caller
- // if we didn't get anything.
- return nil, err
+ // Try fetch via federating DB.
+ t, err := c.fedDB.Get(ctx, uri)
+
+ switch {
+ // No problem.
+ case err == nil:
+
+ // Catch and handle objects not found.
+ case errors.Is(err, db.ErrNoEntries):
+ return &http.Response{
+ Request: &http.Request{URL: uri},
+ Status: http.StatusText(http.StatusNotFound),
+ StatusCode: http.StatusNotFound,
+ Header: map[string][]string{
+ "Content-Type": {apiutil.AppActivityLDJSON},
+ "Content-Length": {"0"},
+ },
+ }, nil
+
+ // Any other.
+ default:
+ return nil, gtserror.Newf("error getting: %w", err)
}
if util.IsNil(t) {
- // This should never happen.
- panic("nil vocab.Type after successful c.fedDB.Get call")
+ // Assert this should never happen.
+ panic(gtserror.New("nil vocab.Type"))
}
- i, err := ap.Serialize(t)
+ // Serialize type to JSON map.
+ m, err := ap.Serialize(t)
if err != nil {
return nil, err
}
- b, err := json.Marshal(i)
+ // Marshal JSON to bytes.
+ b, err := json.Marshal(m)
if err != nil {
return nil, err
}
- contentLength := len(b)
- // Return a response with AS data as body.
+ // Return a response
+ // with AS data as body.
+ contentLength := len(b)
rsp := &http.Response{
Request: &http.Request{URL: uri},
Status: http.StatusText(http.StatusOK),
diff --git a/internal/transport/dereference.go b/internal/transport/dereference.go
index a7ef83d3e..164b23cd6 100644
--- a/internal/transport/dereference.go
+++ b/internal/transport/dereference.go
@@ -25,7 +25,7 @@ import (
apiutil "code.superseriousbusiness.org/gotosocial/internal/api/util"
"code.superseriousbusiness.org/gotosocial/internal/config"
- "code.superseriousbusiness.org/gotosocial/internal/db"
+ "code.superseriousbusiness.org/gotosocial/internal/federation/federatingdb"
"code.superseriousbusiness.org/gotosocial/internal/gtserror"
"code.superseriousbusiness.org/gotosocial/internal/log"
)
@@ -38,10 +38,8 @@ func (t *transport) Dereference(ctx context.Context, iri *url.URL) (*http.Respon
// to just make a normal http request to ourself.
if iri.Host == config.GetHost() {
rsp, err := t.controller.dereferenceLocal(ctx, iri)
- if err != nil && !errors.Is(err, db.ErrNoEntries) {
- // Real error.
- err := gtserror.Newf("error trying dereferenceLocal: %w", err)
- return nil, err
+ if err != nil && !errors.Is(err, federatingdb.ErrNotImplemented) {
+ return nil, gtserror.Newf("error dereferencing local: %w", err)
}
if rsp != nil {
diff --git a/testrig/federatingdb.go b/testrig/federatingdb.go
index 28c6bd7b4..360f99db6 100644
--- a/testrig/federatingdb.go
+++ b/testrig/federatingdb.go
@@ -27,7 +27,7 @@ import (
)
// NewTestFederatingDB returns a federating DB with the underlying db
-func NewTestFederatingDB(state *state.State) federatingdb.DB {
+func NewTestFederatingDB(state *state.State) *federatingdb.DB {
return federatingdb.New(
state,
typeutils.NewConverter(state),
diff --git a/testrig/transportcontroller.go b/testrig/transportcontroller.go
index 34ed35255..641232b73 100644
--- a/testrig/transportcontroller.go
+++ b/testrig/transportcontroller.go
@@ -31,7 +31,6 @@ import (
"code.superseriousbusiness.org/activity/streams/vocab"
"code.superseriousbusiness.org/gotosocial/internal/ap"
apimodel "code.superseriousbusiness.org/gotosocial/internal/api/model"
- "code.superseriousbusiness.org/gotosocial/internal/federation"
"code.superseriousbusiness.org/gotosocial/internal/gtsmodel"
"code.superseriousbusiness.org/gotosocial/internal/httpclient"
"code.superseriousbusiness.org/gotosocial/internal/log"
@@ -56,7 +55,7 @@ const (
// PER TEST rather than per suite, so that the do function can be set on a test by test (or even more granular)
// basis.
func NewTestTransportController(state *state.State, client pub.HttpClient) transport.Controller {
- return transport.NewController(state, NewTestFederatingDB(state), &federation.Clock{}, client)
+ return transport.NewController(state, NewTestFederatingDB(state), client)
}
type MockHTTPClient struct {