diff options
Diffstat (limited to 'internal/processing/processor.go')
-rw-r--r-- | internal/processing/processor.go | 97 |
1 files changed, 49 insertions, 48 deletions
diff --git a/internal/processing/processor.go b/internal/processing/processor.go index 2f1f43826..c0fd15a24 100644 --- a/internal/processing/processor.go +++ b/internal/processing/processor.go @@ -18,13 +18,9 @@ package processing import ( - "context" - "github.com/superseriousbusiness/gotosocial/internal/email" "github.com/superseriousbusiness/gotosocial/internal/federation" - "github.com/superseriousbusiness/gotosocial/internal/log" mm "github.com/superseriousbusiness/gotosocial/internal/media" - "github.com/superseriousbusiness/gotosocial/internal/messages" "github.com/superseriousbusiness/gotosocial/internal/oauth" "github.com/superseriousbusiness/gotosocial/internal/processing/account" "github.com/superseriousbusiness/gotosocial/internal/processing/admin" @@ -38,19 +34,23 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/processing/stream" "github.com/superseriousbusiness/gotosocial/internal/processing/timeline" "github.com/superseriousbusiness/gotosocial/internal/processing/user" + "github.com/superseriousbusiness/gotosocial/internal/processing/workers" "github.com/superseriousbusiness/gotosocial/internal/state" "github.com/superseriousbusiness/gotosocial/internal/typeutils" "github.com/superseriousbusiness/gotosocial/internal/visibility" ) +// Processor groups together processing functions and +// sub processors for handling actions + events coming +// from either the client or federating APIs. +// +// Many of the functions available through this struct +// or sub processors will trigger asynchronous processing +// via the workers contained in state. type Processor struct { - federator federation.Federator - tc typeutils.TypeConverter - oauthServer oauth.Server - mediaManager *mm.Manager - state *state.State - emailSender email.Sender - filter *visibility.Filter + tc typeutils.TypeConverter + oauthServer oauth.Server + state *state.State /* SUB-PROCESSORS @@ -68,6 +68,7 @@ type Processor struct { stream stream.Processor timeline timeline.Processor user user.Processor + workers workers.Processor } func (p *Processor) Account() *account.Processor { @@ -118,6 +119,10 @@ func (p *Processor) User() *user.Processor { return &p.user } +func (p *Processor) Workers() *workers.Processor { + return &p.workers +} + // NewProcessor returns a new Processor. func NewProcessor( tc typeutils.TypeConverter, @@ -127,57 +132,53 @@ func NewProcessor( state *state.State, emailSender email.Sender, ) *Processor { - parseMentionFunc := GetParseMentionFunc(state.DB, federator) - - filter := visibility.NewFilter(state) + var ( + parseMentionFunc = GetParseMentionFunc(state.DB, federator) + filter = visibility.NewFilter(state) + ) processor := &Processor{ - federator: federator, - tc: tc, - oauthServer: oauthServer, - mediaManager: mediaManager, - state: state, - filter: filter, - emailSender: emailSender, + tc: tc, + oauthServer: oauthServer, + state: state, } // Instantiate sub processors. - processor.account = account.New(state, tc, mediaManager, oauthServer, federator, filter, parseMentionFunc) + // + // Start with sub processors that will + // be required by the workers processor. + accountProcessor := account.New(state, tc, mediaManager, oauthServer, federator, filter, parseMentionFunc) + mediaProcessor := media.New(state, tc, mediaManager, federator.TransportController()) + streamProcessor := stream.New(state, oauthServer) + + // Instantiate the rest of the sub + // processors + pin them to this struct. + processor.account = accountProcessor processor.admin = admin.New(state, tc, mediaManager, federator.TransportController(), emailSender) processor.fedi = fedi.New(state, tc, federator, filter) processor.list = list.New(state, tc) processor.markers = markers.New(state, tc) - processor.media = media.New(state, tc, mediaManager, federator.TransportController()) + processor.media = mediaProcessor processor.report = report.New(state, tc) processor.timeline = timeline.New(state, tc, filter) processor.search = search.New(state, federator, tc, filter) processor.status = status.New(state, federator, tc, filter, parseMentionFunc) - processor.stream = stream.New(state, oauthServer) + processor.stream = streamProcessor processor.user = user.New(state, emailSender) - return processor -} + // Workers processor handles asynchronous + // worker jobs; instantiate it separately + // and pass subset of sub processors it needs. + processor.workers = workers.New( + state, + federator, + tc, + filter, + emailSender, + &accountProcessor, + &mediaProcessor, + &streamProcessor, + ) -func (p *Processor) EnqueueClientAPI(ctx context.Context, msgs ...messages.FromClientAPI) { - log.Trace(ctx, "enqueuing") - _ = p.state.Workers.ClientAPI.MustEnqueueCtx(ctx, func(ctx context.Context) { - for _, msg := range msgs { - log.Trace(ctx, "processing: %+v", msg) - if err := p.ProcessFromClientAPI(ctx, msg); err != nil { - log.Errorf(ctx, "error processing client API message: %v", err) - } - } - }) -} - -func (p *Processor) EnqueueFederator(ctx context.Context, msgs ...messages.FromFederator) { - log.Trace(ctx, "enqueuing") - _ = p.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) { - for _, msg := range msgs { - log.Trace(ctx, "processing: %+v", msg) - if err := p.ProcessFromFederator(ctx, msg); err != nil { - log.Errorf(ctx, "error processing federator message: %v", err) - } - } - }) + return processor } |