summaryrefslogtreecommitdiff
path: root/internal/processing/processor.go
diff options
context:
space:
mode:
authorLibravatar tobi <31960611+tsmethurst@users.noreply.github.com>2023-08-09 19:14:33 +0200
committerLibravatar GitHub <noreply@github.com>2023-08-09 19:14:33 +0200
commit9770d54237bea828cab7e50aec7dff452c203138 (patch)
tree59c444a02e81925bab47d3656a489a8c7087d530 /internal/processing/processor.go
parent[bugfix] Fix incorrect per-loop variable capture (#2092) (diff)
downloadgotosocial-9770d54237bea828cab7e50aec7dff452c203138.tar.xz
[feature] List replies policy, refactor async workers (#2087)
* Add/update some DB functions. * move async workers into subprocessor * rename FromFederator -> FromFediAPI * update home timeline check to include check for current status first before moving to parent status * change streamMap to pointer to mollify linter * update followtoas func signature * fix merge * remove errant debug log * don't use separate errs.Combine() check to wrap errs * wrap parts of workers functionality in sub-structs * populate report using new db funcs * embed federator (tiny bit tidier) * flesh out error msg, add continue(!) * fix other error messages to be more specific * better, nicer * give parseURI util function a bit more util * missing headers * use pointers for subprocessors
Diffstat (limited to 'internal/processing/processor.go')
-rw-r--r--internal/processing/processor.go97
1 files changed, 49 insertions, 48 deletions
diff --git a/internal/processing/processor.go b/internal/processing/processor.go
index 2f1f43826..c0fd15a24 100644
--- a/internal/processing/processor.go
+++ b/internal/processing/processor.go
@@ -18,13 +18,9 @@
package processing
import (
- "context"
-
"github.com/superseriousbusiness/gotosocial/internal/email"
"github.com/superseriousbusiness/gotosocial/internal/federation"
- "github.com/superseriousbusiness/gotosocial/internal/log"
mm "github.com/superseriousbusiness/gotosocial/internal/media"
- "github.com/superseriousbusiness/gotosocial/internal/messages"
"github.com/superseriousbusiness/gotosocial/internal/oauth"
"github.com/superseriousbusiness/gotosocial/internal/processing/account"
"github.com/superseriousbusiness/gotosocial/internal/processing/admin"
@@ -38,19 +34,23 @@ import (
"github.com/superseriousbusiness/gotosocial/internal/processing/stream"
"github.com/superseriousbusiness/gotosocial/internal/processing/timeline"
"github.com/superseriousbusiness/gotosocial/internal/processing/user"
+ "github.com/superseriousbusiness/gotosocial/internal/processing/workers"
"github.com/superseriousbusiness/gotosocial/internal/state"
"github.com/superseriousbusiness/gotosocial/internal/typeutils"
"github.com/superseriousbusiness/gotosocial/internal/visibility"
)
+// Processor groups together processing functions and
+// sub processors for handling actions + events coming
+// from either the client or federating APIs.
+//
+// Many of the functions available through this struct
+// or sub processors will trigger asynchronous processing
+// via the workers contained in state.
type Processor struct {
- federator federation.Federator
- tc typeutils.TypeConverter
- oauthServer oauth.Server
- mediaManager *mm.Manager
- state *state.State
- emailSender email.Sender
- filter *visibility.Filter
+ tc typeutils.TypeConverter
+ oauthServer oauth.Server
+ state *state.State
/*
SUB-PROCESSORS
@@ -68,6 +68,7 @@ type Processor struct {
stream stream.Processor
timeline timeline.Processor
user user.Processor
+ workers workers.Processor
}
func (p *Processor) Account() *account.Processor {
@@ -118,6 +119,10 @@ func (p *Processor) User() *user.Processor {
return &p.user
}
+func (p *Processor) Workers() *workers.Processor {
+ return &p.workers
+}
+
// NewProcessor returns a new Processor.
func NewProcessor(
tc typeutils.TypeConverter,
@@ -127,57 +132,53 @@ func NewProcessor(
state *state.State,
emailSender email.Sender,
) *Processor {
- parseMentionFunc := GetParseMentionFunc(state.DB, federator)
-
- filter := visibility.NewFilter(state)
+ var (
+ parseMentionFunc = GetParseMentionFunc(state.DB, federator)
+ filter = visibility.NewFilter(state)
+ )
processor := &Processor{
- federator: federator,
- tc: tc,
- oauthServer: oauthServer,
- mediaManager: mediaManager,
- state: state,
- filter: filter,
- emailSender: emailSender,
+ tc: tc,
+ oauthServer: oauthServer,
+ state: state,
}
// Instantiate sub processors.
- processor.account = account.New(state, tc, mediaManager, oauthServer, federator, filter, parseMentionFunc)
+ //
+ // Start with sub processors that will
+ // be required by the workers processor.
+ accountProcessor := account.New(state, tc, mediaManager, oauthServer, federator, filter, parseMentionFunc)
+ mediaProcessor := media.New(state, tc, mediaManager, federator.TransportController())
+ streamProcessor := stream.New(state, oauthServer)
+
+ // Instantiate the rest of the sub
+ // processors + pin them to this struct.
+ processor.account = accountProcessor
processor.admin = admin.New(state, tc, mediaManager, federator.TransportController(), emailSender)
processor.fedi = fedi.New(state, tc, federator, filter)
processor.list = list.New(state, tc)
processor.markers = markers.New(state, tc)
- processor.media = media.New(state, tc, mediaManager, federator.TransportController())
+ processor.media = mediaProcessor
processor.report = report.New(state, tc)
processor.timeline = timeline.New(state, tc, filter)
processor.search = search.New(state, federator, tc, filter)
processor.status = status.New(state, federator, tc, filter, parseMentionFunc)
- processor.stream = stream.New(state, oauthServer)
+ processor.stream = streamProcessor
processor.user = user.New(state, emailSender)
- return processor
-}
+ // Workers processor handles asynchronous
+ // worker jobs; instantiate it separately
+ // and pass subset of sub processors it needs.
+ processor.workers = workers.New(
+ state,
+ federator,
+ tc,
+ filter,
+ emailSender,
+ &accountProcessor,
+ &mediaProcessor,
+ &streamProcessor,
+ )
-func (p *Processor) EnqueueClientAPI(ctx context.Context, msgs ...messages.FromClientAPI) {
- log.Trace(ctx, "enqueuing")
- _ = p.state.Workers.ClientAPI.MustEnqueueCtx(ctx, func(ctx context.Context) {
- for _, msg := range msgs {
- log.Trace(ctx, "processing: %+v", msg)
- if err := p.ProcessFromClientAPI(ctx, msg); err != nil {
- log.Errorf(ctx, "error processing client API message: %v", err)
- }
- }
- })
-}
-
-func (p *Processor) EnqueueFederator(ctx context.Context, msgs ...messages.FromFederator) {
- log.Trace(ctx, "enqueuing")
- _ = p.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) {
- for _, msg := range msgs {
- log.Trace(ctx, "processing: %+v", msg)
- if err := p.ProcessFromFederator(ctx, msg); err != nil {
- log.Errorf(ctx, "error processing federator message: %v", err)
- }
- }
- })
+ return processor
}