From baf933cb9f3e1053bdb61b90d7027efe9fad1bc2 Mon Sep 17 00:00:00 2001 From: kim <89579420+NyaaaWhatsUpDoc@users.noreply.github.com> Date: Wed, 1 Mar 2023 18:26:53 +0000 Subject: [chore] move client/federator workerpools to Workers{} (#1575) * replace concurrency worker pools with base models in State.Workers, update code and tests accordingly * improve code comment * change back testrig default log level * un-comment-out TestAnnounceTwice() and fix --------- Signed-off-by: kim Reviewed-by: tobi --- internal/processing/processor.go | 126 +++++++++++++++++---------------------- 1 file changed, 56 insertions(+), 70 deletions(-) (limited to 'internal/processing/processor.go') diff --git a/internal/processing/processor.go b/internal/processing/processor.go index 07fcdb8b3..bb75aab76 100644 --- a/internal/processing/processor.go +++ b/internal/processing/processor.go @@ -19,10 +19,11 @@ package processing import ( - "github.com/superseriousbusiness/gotosocial/internal/concurrency" - "github.com/superseriousbusiness/gotosocial/internal/db" + "context" + "github.com/superseriousbusiness/gotosocial/internal/email" "github.com/superseriousbusiness/gotosocial/internal/federation" + "github.com/superseriousbusiness/gotosocial/internal/log" mm "github.com/superseriousbusiness/gotosocial/internal/media" "github.com/superseriousbusiness/gotosocial/internal/messages" "github.com/superseriousbusiness/gotosocial/internal/oauth" @@ -34,23 +35,19 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/processing/status" "github.com/superseriousbusiness/gotosocial/internal/processing/stream" "github.com/superseriousbusiness/gotosocial/internal/processing/user" - "github.com/superseriousbusiness/gotosocial/internal/storage" + "github.com/superseriousbusiness/gotosocial/internal/state" "github.com/superseriousbusiness/gotosocial/internal/timeline" "github.com/superseriousbusiness/gotosocial/internal/typeutils" "github.com/superseriousbusiness/gotosocial/internal/visibility" ) type Processor struct { - clientWorker *concurrency.WorkerPool[messages.FromClientAPI] - fedWorker *concurrency.WorkerPool[messages.FromFederator] - federator federation.Federator tc typeutils.TypeConverter oauthServer oauth.Server mediaManager mm.Manager - storage *storage.Driver statusTimelines timeline.Manager - db db.DB + state *state.State filter visibility.Filter /* @@ -105,76 +102,65 @@ func NewProcessor( federator federation.Federator, oauthServer oauth.Server, mediaManager mm.Manager, - storage *storage.Driver, - db db.DB, + state *state.State, emailSender email.Sender, - clientWorker *concurrency.WorkerPool[messages.FromClientAPI], - fedWorker *concurrency.WorkerPool[messages.FromFederator], ) *Processor { - parseMentionFunc := GetParseMentionFunc(db, federator) - - filter := visibility.NewFilter(db) - - return &Processor{ - clientWorker: clientWorker, - fedWorker: fedWorker, - - federator: federator, - tc: tc, - oauthServer: oauthServer, - mediaManager: mediaManager, - storage: storage, - statusTimelines: timeline.NewManager(StatusGrabFunction(db), StatusFilterFunction(db, filter), StatusPrepareFunction(db, tc), StatusSkipInsertFunction()), - db: db, - filter: filter, - - // sub processors - account: account.New(db, tc, mediaManager, oauthServer, clientWorker, federator, parseMentionFunc), - admin: admin.New(db, tc, mediaManager, federator.TransportController(), storage, clientWorker), - fedi: fedi.New(db, tc, federator), - media: media.New(db, tc, mediaManager, federator.TransportController(), storage), - report: report.New(db, tc, clientWorker), - status: status.New(db, tc, clientWorker, parseMentionFunc), - stream: stream.New(db, oauthServer), - user: user.New(db, emailSender), + parseMentionFunc := GetParseMentionFunc(state.DB, federator) + + filter := visibility.NewFilter(state.DB) + + processor := &Processor{ + federator: federator, + tc: tc, + oauthServer: oauthServer, + mediaManager: mediaManager, + statusTimelines: timeline.NewManager( + StatusGrabFunction(state.DB), + StatusFilterFunction(state.DB, filter), + StatusPrepareFunction(state.DB, tc), + StatusSkipInsertFunction(), + ), + state: state, + filter: filter, } -} -// Start starts the Processor, reading from its channels and passing messages back and forth. -func (p *Processor) Start() error { - // Setup and start the client API worker pool - p.clientWorker.SetProcessor(p.ProcessFromClientAPI) - if err := p.clientWorker.Start(); err != nil { - return err - } + // sub processors + processor.account = account.New(state, tc, mediaManager, oauthServer, federator, parseMentionFunc) + processor.admin = admin.New(state, tc, mediaManager, federator.TransportController()) + processor.fedi = fedi.New(state, tc, federator) + processor.media = media.New(state, tc, mediaManager, federator.TransportController()) + processor.report = report.New(state, tc) + processor.status = status.New(state, tc, parseMentionFunc) + processor.stream = stream.New(state, oauthServer) + processor.user = user.New(state, emailSender) + + return processor +} - // Setup and start the federator worker pool - p.fedWorker.SetProcessor(p.ProcessFromFederator) - if err := p.fedWorker.Start(); err != nil { - return err - } +func (p *Processor) EnqueueClientAPI(ctx context.Context, msg messages.FromClientAPI) { + log.WithContext(ctx).WithField("msg", msg).Trace("enqueuing client API") + _ = p.state.Workers.ClientAPI.MustEnqueueCtx(ctx, func(ctx context.Context) { + if err := p.ProcessFromClientAPI(ctx, msg); err != nil { + log.Errorf(ctx, "error processing client API message: %v", err) + } + }) +} - // Start status timelines - if err := p.statusTimelines.Start(); err != nil { - return err - } +func (p *Processor) EnqueueFederator(ctx context.Context, msg messages.FromFederator) { + log.WithContext(ctx).WithField("msg", msg).Trace("enqueuing federator") + _ = p.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) { + if err := p.ProcessFromFederator(ctx, msg); err != nil { + log.Errorf(ctx, "error processing federator message: %v", err) + } + }) +} - return nil +// Start starts the Processor. +func (p *Processor) Start() error { + return p.statusTimelines.Start() } -// Stop stops the processor cleanly, finishing handling any remaining messages before closing down. +// Stop stops the processor cleanly. func (p *Processor) Stop() error { - if err := p.clientWorker.Stop(); err != nil { - return err - } - - if err := p.fedWorker.Stop(); err != nil { - return err - } - - if err := p.statusTimelines.Stop(); err != nil { - return err - } - - return nil + return p.statusTimelines.Stop() } -- cgit v1.2.3