summaryrefslogtreecommitdiff
path: root/cmd
diff options
context:
space:
mode:
authorLibravatar kim <89579420+NyaaaWhatsUpDoc@users.noreply.github.com>2023-03-01 18:26:53 +0000
committerLibravatar GitHub <noreply@github.com>2023-03-01 18:26:53 +0000
commitbaf933cb9f3e1053bdb61b90d7027efe9fad1bc2 (patch)
tree3f2a76851d58517ca3dece2bacd6aceefd8dfb96 /cmd
parent[feature] Federate pinned posts (aka `featuredCollection`) in and out (#1560) (diff)
downloadgotosocial-baf933cb9f3e1053bdb61b90d7027efe9fad1bc2.tar.xz
[chore] move client/federator workerpools to Workers{} (#1575)
* replace concurrency worker pools with base models in State.Workers, update code and tests accordingly * improve code comment * change back testrig default log level * un-comment-out TestAnnounceTwice() and fix --------- Signed-off-by: kim <grufwub@gmail.com> Reviewed-by: tobi
Diffstat (limited to 'cmd')
-rw-r--r--cmd/gotosocial/action/server/server.go18
-rw-r--r--cmd/gotosocial/action/testrig/testrig.go62
2 files changed, 40 insertions, 40 deletions
diff --git a/cmd/gotosocial/action/server/server.go b/cmd/gotosocial/action/server/server.go
index d43599b05..bc56e21f0 100644
--- a/cmd/gotosocial/action/server/server.go
+++ b/cmd/gotosocial/action/server/server.go
@@ -35,7 +35,6 @@ import (
"github.com/superseriousbusiness/gotosocial/internal/middleware"
"go.uber.org/automaxprocs/maxprocs"
- "github.com/superseriousbusiness/gotosocial/internal/concurrency"
"github.com/superseriousbusiness/gotosocial/internal/config"
"github.com/superseriousbusiness/gotosocial/internal/db/bundb"
"github.com/superseriousbusiness/gotosocial/internal/email"
@@ -45,7 +44,6 @@ import (
"github.com/superseriousbusiness/gotosocial/internal/httpclient"
"github.com/superseriousbusiness/gotosocial/internal/log"
"github.com/superseriousbusiness/gotosocial/internal/media"
- "github.com/superseriousbusiness/gotosocial/internal/messages"
"github.com/superseriousbusiness/gotosocial/internal/oauth"
"github.com/superseriousbusiness/gotosocial/internal/oidc"
"github.com/superseriousbusiness/gotosocial/internal/processing"
@@ -107,19 +105,11 @@ var Start action.GTSAction = func(ctx context.Context) error {
state.Workers.Start()
defer state.Workers.Stop()
- // Create the client API and federator worker pools
- // NOTE: these MUST NOT be used until they are passed to the
- // processor and it is started. The reason being that the processor
- // sets the Worker process functions and start the underlying pools
- // TODO: move these into state.Workers (and maybe reformat worker pools).
- clientWorker := concurrency.NewWorkerPool[messages.FromClientAPI](-1, -1)
- fedWorker := concurrency.NewWorkerPool[messages.FromFederator](-1, -1)
-
// build backend handlers
mediaManager := media.NewManager(&state)
oauthServer := oauth.New(ctx, dbService)
typeConverter := typeutils.NewConverter(dbService)
- federatingDB := federatingdb.New(dbService, fedWorker, typeConverter)
+ federatingDB := federatingdb.New(&state, typeConverter)
transportController := transport.NewController(dbService, federatingDB, &federation.Clock{}, client)
federator := federation.NewFederator(dbService, federatingDB, transportController, typeConverter, mediaManager)
@@ -140,11 +130,15 @@ var Start action.GTSAction = func(ctx context.Context) error {
}
// create the message processor using the other services we've created so far
- processor := processing.NewProcessor(typeConverter, federator, oauthServer, mediaManager, storage, dbService, emailSender, clientWorker, fedWorker)
+ processor := processing.NewProcessor(typeConverter, federator, oauthServer, mediaManager, &state, emailSender)
if err := processor.Start(); err != nil {
return fmt.Errorf("error creating processor: %s", err)
}
+ // Set state client / federator worker enqueue functions
+ state.Workers.EnqueueClientAPI = processor.EnqueueClientAPI
+ state.Workers.EnqueueFederator = processor.EnqueueFederator
+
/*
HTTP router initialization
*/
diff --git a/cmd/gotosocial/action/testrig/testrig.go b/cmd/gotosocial/action/testrig/testrig.go
index 3be7907fe..68bb94ec3 100644
--- a/cmd/gotosocial/action/testrig/testrig.go
+++ b/cmd/gotosocial/action/testrig/testrig.go
@@ -33,14 +33,13 @@ import (
"github.com/superseriousbusiness/gotosocial/cmd/gotosocial/action"
"github.com/superseriousbusiness/gotosocial/internal/api"
apiutil "github.com/superseriousbusiness/gotosocial/internal/api/util"
- "github.com/superseriousbusiness/gotosocial/internal/concurrency"
"github.com/superseriousbusiness/gotosocial/internal/config"
"github.com/superseriousbusiness/gotosocial/internal/gotosocial"
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
"github.com/superseriousbusiness/gotosocial/internal/log"
- "github.com/superseriousbusiness/gotosocial/internal/messages"
"github.com/superseriousbusiness/gotosocial/internal/middleware"
"github.com/superseriousbusiness/gotosocial/internal/oidc"
+ "github.com/superseriousbusiness/gotosocial/internal/state"
"github.com/superseriousbusiness/gotosocial/internal/storage"
"github.com/superseriousbusiness/gotosocial/internal/web"
"github.com/superseriousbusiness/gotosocial/testrig"
@@ -48,37 +47,44 @@ import (
// Start creates and starts a gotosocial testrig server
var Start action.GTSAction = func(ctx context.Context) error {
+ var state state.State
+
testrig.InitTestConfig()
testrig.InitTestLog()
- dbService := testrig.NewTestDB()
- testrig.StandardDBSetup(dbService, nil)
- var storageBackend *storage.Driver
+ // Initialize caches
+ state.Caches.Init()
+ state.Caches.Start()
+ defer state.Caches.Stop()
+
+ state.DB = testrig.NewTestDB(&state)
+ testrig.StandardDBSetup(state.DB, nil)
+
if os.Getenv("GTS_STORAGE_BACKEND") == "s3" {
- storageBackend, _ = storage.NewS3Storage()
+ state.Storage, _ = storage.NewS3Storage()
} else {
- storageBackend = testrig.NewInMemoryStorage()
+ state.Storage = testrig.NewInMemoryStorage()
}
- testrig.StandardStorageSetup(storageBackend, "./testrig/media")
+ testrig.StandardStorageSetup(state.Storage, "./testrig/media")
- // Create client API and federator worker pools
- clientWorker := concurrency.NewWorkerPool[messages.FromClientAPI](-1, -1)
- fedWorker := concurrency.NewWorkerPool[messages.FromFederator](-1, -1)
+ // Initialize workers.
+ state.Workers.Start()
+ defer state.Workers.Stop()
// build backend handlers
- transportController := testrig.NewTestTransportController(testrig.NewMockHTTPClient(func(req *http.Request) (*http.Response, error) {
+ transportController := testrig.NewTestTransportController(&state, testrig.NewMockHTTPClient(func(req *http.Request) (*http.Response, error) {
r := io.NopCloser(bytes.NewReader([]byte{}))
return &http.Response{
StatusCode: 200,
Body: r,
}, nil
- }, ""), dbService, fedWorker)
- mediaManager := testrig.NewTestMediaManager(dbService, storageBackend)
- federator := testrig.NewTestFederator(dbService, transportController, storageBackend, mediaManager, fedWorker)
+ }, ""))
+ mediaManager := testrig.NewTestMediaManager(&state)
+ federator := testrig.NewTestFederator(&state, transportController, mediaManager)
emailSender := testrig.NewEmailSender("./web/template/", nil)
- processor := testrig.NewTestProcessor(dbService, storageBackend, federator, emailSender, mediaManager, clientWorker, fedWorker)
+ processor := testrig.NewTestProcessor(&state, federator, emailSender, mediaManager)
if err := processor.Start(); err != nil {
return fmt.Errorf("error starting processor: %s", err)
}
@@ -87,7 +93,7 @@ var Start action.GTSAction = func(ctx context.Context) error {
HTTP router initialization
*/
- router := testrig.NewTestRouter(dbService)
+ router := testrig.NewTestRouter(state.DB)
// attach global middlewares which are used for every request
router.AttachGlobalMiddleware(
@@ -112,7 +118,7 @@ var Start action.GTSAction = func(ctx context.Context) error {
}
}
- routerSession, err := dbService.GetSession(ctx)
+ routerSession, err := state.DB.GetSession(ctx)
if err != nil {
return fmt.Errorf("error retrieving router session for session middleware: %w", err)
}
@@ -123,13 +129,13 @@ var Start action.GTSAction = func(ctx context.Context) error {
}
var (
- authModule = api.NewAuth(dbService, processor, idp, routerSession, sessionName) // auth/oauth paths
- clientModule = api.NewClient(dbService, processor) // api client endpoints
- fileserverModule = api.NewFileserver(processor) // fileserver endpoints
- wellKnownModule = api.NewWellKnown(processor) // .well-known endpoints
- nodeInfoModule = api.NewNodeInfo(processor) // nodeinfo endpoint
- activityPubModule = api.NewActivityPub(dbService, processor) // ActivityPub endpoints
- webModule = web.New(dbService, processor) // web pages + user profiles + settings panels etc
+ authModule = api.NewAuth(state.DB, processor, idp, routerSession, sessionName) // auth/oauth paths
+ clientModule = api.NewClient(state.DB, processor) // api client endpoints
+ fileserverModule = api.NewFileserver(processor) // fileserver endpoints
+ wellKnownModule = api.NewWellKnown(processor) // .well-known endpoints
+ nodeInfoModule = api.NewNodeInfo(processor) // nodeinfo endpoint
+ activityPubModule = api.NewActivityPub(state.DB, processor) // ActivityPub endpoints
+ webModule = web.New(state.DB, processor) // web pages + user profiles + settings panels etc
)
// these should be routed in order
@@ -142,7 +148,7 @@ var Start action.GTSAction = func(ctx context.Context) error {
activityPubModule.RoutePublicKey(router)
webModule.Route(router)
- gts, err := gotosocial.NewServer(dbService, router, federator, mediaManager)
+ gts, err := gotosocial.NewServer(state.DB, router, federator, mediaManager)
if err != nil {
return fmt.Errorf("error creating gotosocial service: %s", err)
}
@@ -157,8 +163,8 @@ var Start action.GTSAction = func(ctx context.Context) error {
sig := <-sigs
log.Infof(ctx, "received signal %s, shutting down", sig)
- testrig.StandardDBTeardown(dbService)
- testrig.StandardStorageTeardown(storageBackend)
+ testrig.StandardDBTeardown(state.DB)
+ testrig.StandardStorageTeardown(state.Storage)
// close down all running services in order
if err := gts.Stop(ctx); err != nil {