summaryrefslogtreecommitdiff
path: root/internal/processing/processor.go
diff options
context:
space:
mode:
authorLibravatar kim <89579420+NyaaaWhatsUpDoc@users.noreply.github.com>2022-04-28 13:23:11 +0100
committerLibravatar GitHub <noreply@github.com>2022-04-28 13:23:11 +0100
commit420e2fb22bc7aa4967ddadb11e444079efdf5117 (patch)
tree413842c5df646c30a8079671ade5e677e3825fb8 /internal/processing/processor.go
parent[bugfix] Fix possible race condition in federatingdb (#490) (diff)
downloadgotosocial-420e2fb22bc7aa4967ddadb11e444079efdf5117.tar.xz
replace async client API / federator msg processing with worker pools (#497)
* replace async client API / federator msg processing with worker pools * appease our lord-and-saviour, the linter
Diffstat (limited to 'internal/processing/processor.go')
-rw-r--r--internal/processing/processor.go77
1 files changed, 35 insertions, 42 deletions
diff --git a/internal/processing/processor.go b/internal/processing/processor.go
index 801d325a7..2b12acf52 100644
--- a/internal/processing/processor.go
+++ b/internal/processing/processor.go
@@ -24,7 +24,6 @@ import (
"net/url"
"codeberg.org/gruf/go-store/kv"
- "github.com/sirupsen/logrus"
apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model"
"github.com/superseriousbusiness/gotosocial/internal/db"
"github.com/superseriousbusiness/gotosocial/internal/email"
@@ -45,6 +44,7 @@ import (
"github.com/superseriousbusiness/gotosocial/internal/timeline"
"github.com/superseriousbusiness/gotosocial/internal/typeutils"
"github.com/superseriousbusiness/gotosocial/internal/visibility"
+ "github.com/superseriousbusiness/gotosocial/internal/worker"
)
// Processor should be passed to api modules (see internal/apimodule/...). It is used for
@@ -55,7 +55,7 @@ import (
// for clean distribution of messages without slowing down the client API and harming the user experience.
type Processor interface {
// Start starts the Processor, reading from its channels and passing messages back and forth.
- Start(ctx context.Context) error
+ Start() error
// Stop stops the processor cleanly, finishing handling any remaining messages before closing down.
Stop() error
// ProcessFromClientAPI processes one message coming from the clientAPI channel, and triggers appropriate side effects.
@@ -235,10 +235,10 @@ type Processor interface {
// processor just implements the Processor interface
type processor struct {
- fromClientAPI chan messages.FromClientAPI
- fromFederator chan messages.FromFederator
+ clientWorker *worker.Worker[messages.FromClientAPI]
+ fedWorker *worker.Worker[messages.FromFederator]
+
federator federation.Federator
- stop chan interface{}
tc typeutils.TypeConverter
oauthServer oauth.Server
mediaManager media.Manager
@@ -268,26 +268,26 @@ func NewProcessor(
mediaManager media.Manager,
storage *kv.KVStore,
db db.DB,
- emailSender email.Sender) Processor {
-
- fromClientAPI := make(chan messages.FromClientAPI, 1000)
- fromFederator := make(chan messages.FromFederator, 1000)
+ emailSender email.Sender,
+ clientWorker *worker.Worker[messages.FromClientAPI],
+ fedWorker *worker.Worker[messages.FromFederator],
+) Processor {
parseMentionFunc := GetParseMentionFunc(db, federator)
- statusProcessor := status.New(db, tc, fromClientAPI, parseMentionFunc)
+ statusProcessor := status.New(db, tc, clientWorker, parseMentionFunc)
streamingProcessor := streaming.New(db, oauthServer)
- accountProcessor := account.New(db, tc, mediaManager, oauthServer, fromClientAPI, federator, parseMentionFunc)
- adminProcessor := admin.New(db, tc, mediaManager, fromClientAPI)
+ accountProcessor := account.New(db, tc, mediaManager, oauthServer, clientWorker, federator, parseMentionFunc)
+ adminProcessor := admin.New(db, tc, mediaManager, clientWorker)
mediaProcessor := mediaProcessor.New(db, tc, mediaManager, federator.TransportController(), storage)
userProcessor := user.New(db, emailSender)
- federationProcessor := federationProcessor.New(db, tc, federator, fromFederator)
+ federationProcessor := federationProcessor.New(db, tc, federator)
filter := visibility.NewFilter(db)
return &processor{
- fromClientAPI: fromClientAPI,
- fromFederator: fromFederator,
+ clientWorker: clientWorker,
+ fedWorker: fedWorker,
+
federator: federator,
- stop: make(chan interface{}),
tc: tc,
oauthServer: oauthServer,
mediaManager: mediaManager,
@@ -307,36 +307,29 @@ func NewProcessor(
}
// Start starts the Processor, reading from its channels and passing messages back and forth.
-func (p *processor) Start(ctx context.Context) error {
- go func() {
- DistLoop:
- for {
- select {
- case clientMsg := <-p.fromClientAPI:
- logrus.Tracef("received message FROM client API: %+v", clientMsg)
- go func() {
- if err := p.ProcessFromClientAPI(ctx, clientMsg); err != nil {
- logrus.Error(err)
- }
- }()
- case federatorMsg := <-p.fromFederator:
- logrus.Tracef("received message FROM federator: %+v", federatorMsg)
- go func() {
- if err := p.ProcessFromFederator(ctx, federatorMsg); err != nil {
- logrus.Error(err)
- }
- }()
- case <-p.stop:
- break DistLoop
- }
- }
- }()
+func (p *processor) Start() error {
+ // Setup and start the client API worker pool
+ p.clientWorker.SetProcessor(p.ProcessFromClientAPI)
+ if err := p.clientWorker.Start(); err != nil {
+ return err
+ }
+
+ // Setup and start the federator worker pool
+ p.fedWorker.SetProcessor(p.ProcessFromFederator)
+ if err := p.fedWorker.Start(); err != nil {
+ return err
+ }
+
return nil
}
// Stop stops the processor cleanly, finishing handling any remaining messages before closing down.
-// TODO: empty message buffer properly before stopping otherwise we'll lose federating messages.
func (p *processor) Stop() error {
- close(p.stop)
+ if err := p.clientWorker.Stop(); err != nil {
+ return err
+ }
+ if err := p.fedWorker.Stop(); err != nil {
+ return err
+ }
return nil
}