diff options
author | 2022-04-28 13:23:11 +0100 | |
---|---|---|
committer | 2022-04-28 13:23:11 +0100 | |
commit | 420e2fb22bc7aa4967ddadb11e444079efdf5117 (patch) | |
tree | 413842c5df646c30a8079671ade5e677e3825fb8 /internal/processing/processor.go | |
parent | [bugfix] Fix possible race condition in federatingdb (#490) (diff) | |
download | gotosocial-420e2fb22bc7aa4967ddadb11e444079efdf5117.tar.xz |
replace async client API / federator msg processing with worker pools (#497)
* replace async client API / federator msg processing with worker pools
* appease our lord-and-saviour, the linter
Diffstat (limited to 'internal/processing/processor.go')
-rw-r--r-- | internal/processing/processor.go | 77 |
1 files changed, 35 insertions, 42 deletions
diff --git a/internal/processing/processor.go b/internal/processing/processor.go index 801d325a7..2b12acf52 100644 --- a/internal/processing/processor.go +++ b/internal/processing/processor.go @@ -24,7 +24,6 @@ import ( "net/url" "codeberg.org/gruf/go-store/kv" - "github.com/sirupsen/logrus" apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" "github.com/superseriousbusiness/gotosocial/internal/db" "github.com/superseriousbusiness/gotosocial/internal/email" @@ -45,6 +44,7 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/timeline" "github.com/superseriousbusiness/gotosocial/internal/typeutils" "github.com/superseriousbusiness/gotosocial/internal/visibility" + "github.com/superseriousbusiness/gotosocial/internal/worker" ) // Processor should be passed to api modules (see internal/apimodule/...). It is used for @@ -55,7 +55,7 @@ import ( // for clean distribution of messages without slowing down the client API and harming the user experience. type Processor interface { // Start starts the Processor, reading from its channels and passing messages back and forth. - Start(ctx context.Context) error + Start() error // Stop stops the processor cleanly, finishing handling any remaining messages before closing down. Stop() error // ProcessFromClientAPI processes one message coming from the clientAPI channel, and triggers appropriate side effects. @@ -235,10 +235,10 @@ type Processor interface { // processor just implements the Processor interface type processor struct { - fromClientAPI chan messages.FromClientAPI - fromFederator chan messages.FromFederator + clientWorker *worker.Worker[messages.FromClientAPI] + fedWorker *worker.Worker[messages.FromFederator] + federator federation.Federator - stop chan interface{} tc typeutils.TypeConverter oauthServer oauth.Server mediaManager media.Manager @@ -268,26 +268,26 @@ func NewProcessor( mediaManager media.Manager, storage *kv.KVStore, db db.DB, - emailSender email.Sender) Processor { - - fromClientAPI := make(chan messages.FromClientAPI, 1000) - fromFederator := make(chan messages.FromFederator, 1000) + emailSender email.Sender, + clientWorker *worker.Worker[messages.FromClientAPI], + fedWorker *worker.Worker[messages.FromFederator], +) Processor { parseMentionFunc := GetParseMentionFunc(db, federator) - statusProcessor := status.New(db, tc, fromClientAPI, parseMentionFunc) + statusProcessor := status.New(db, tc, clientWorker, parseMentionFunc) streamingProcessor := streaming.New(db, oauthServer) - accountProcessor := account.New(db, tc, mediaManager, oauthServer, fromClientAPI, federator, parseMentionFunc) - adminProcessor := admin.New(db, tc, mediaManager, fromClientAPI) + accountProcessor := account.New(db, tc, mediaManager, oauthServer, clientWorker, federator, parseMentionFunc) + adminProcessor := admin.New(db, tc, mediaManager, clientWorker) mediaProcessor := mediaProcessor.New(db, tc, mediaManager, federator.TransportController(), storage) userProcessor := user.New(db, emailSender) - federationProcessor := federationProcessor.New(db, tc, federator, fromFederator) + federationProcessor := federationProcessor.New(db, tc, federator) filter := visibility.NewFilter(db) return &processor{ - fromClientAPI: fromClientAPI, - fromFederator: fromFederator, + clientWorker: clientWorker, + fedWorker: fedWorker, + federator: federator, - stop: make(chan interface{}), tc: tc, oauthServer: oauthServer, mediaManager: mediaManager, @@ -307,36 +307,29 @@ func NewProcessor( } // Start starts the Processor, reading from its channels and passing messages back and forth. -func (p *processor) Start(ctx context.Context) error { - go func() { - DistLoop: - for { - select { - case clientMsg := <-p.fromClientAPI: - logrus.Tracef("received message FROM client API: %+v", clientMsg) - go func() { - if err := p.ProcessFromClientAPI(ctx, clientMsg); err != nil { - logrus.Error(err) - } - }() - case federatorMsg := <-p.fromFederator: - logrus.Tracef("received message FROM federator: %+v", federatorMsg) - go func() { - if err := p.ProcessFromFederator(ctx, federatorMsg); err != nil { - logrus.Error(err) - } - }() - case <-p.stop: - break DistLoop - } - } - }() +func (p *processor) Start() error { + // Setup and start the client API worker pool + p.clientWorker.SetProcessor(p.ProcessFromClientAPI) + if err := p.clientWorker.Start(); err != nil { + return err + } + + // Setup and start the federator worker pool + p.fedWorker.SetProcessor(p.ProcessFromFederator) + if err := p.fedWorker.Start(); err != nil { + return err + } + return nil } // Stop stops the processor cleanly, finishing handling any remaining messages before closing down. -// TODO: empty message buffer properly before stopping otherwise we'll lose federating messages. func (p *processor) Stop() error { - close(p.stop) + if err := p.clientWorker.Stop(); err != nil { + return err + } + if err := p.fedWorker.Stop(); err != nil { + return err + } return nil } |