diff options
author | 2023-03-01 18:26:53 +0000 | |
---|---|---|
committer | 2023-03-01 18:26:53 +0000 | |
commit | baf933cb9f3e1053bdb61b90d7027efe9fad1bc2 (patch) | |
tree | 3f2a76851d58517ca3dece2bacd6aceefd8dfb96 /cmd | |
parent | [feature] Federate pinned posts (aka `featuredCollection`) in and out (#1560) (diff) | |
download | gotosocial-baf933cb9f3e1053bdb61b90d7027efe9fad1bc2.tar.xz |
[chore] move client/federator workerpools to Workers{} (#1575)
* replace concurrency worker pools with base models in State.Workers, update code and tests accordingly
* improve code comment
* change back testrig default log level
* un-comment-out TestAnnounceTwice() and fix
---------
Signed-off-by: kim <grufwub@gmail.com>
Reviewed-by: tobi
Diffstat (limited to 'cmd')
-rw-r--r-- | cmd/gotosocial/action/server/server.go | 18 | ||||
-rw-r--r-- | cmd/gotosocial/action/testrig/testrig.go | 62 |
2 files changed, 40 insertions, 40 deletions
diff --git a/cmd/gotosocial/action/server/server.go b/cmd/gotosocial/action/server/server.go index d43599b05..bc56e21f0 100644 --- a/cmd/gotosocial/action/server/server.go +++ b/cmd/gotosocial/action/server/server.go @@ -35,7 +35,6 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/middleware" "go.uber.org/automaxprocs/maxprocs" - "github.com/superseriousbusiness/gotosocial/internal/concurrency" "github.com/superseriousbusiness/gotosocial/internal/config" "github.com/superseriousbusiness/gotosocial/internal/db/bundb" "github.com/superseriousbusiness/gotosocial/internal/email" @@ -45,7 +44,6 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/httpclient" "github.com/superseriousbusiness/gotosocial/internal/log" "github.com/superseriousbusiness/gotosocial/internal/media" - "github.com/superseriousbusiness/gotosocial/internal/messages" "github.com/superseriousbusiness/gotosocial/internal/oauth" "github.com/superseriousbusiness/gotosocial/internal/oidc" "github.com/superseriousbusiness/gotosocial/internal/processing" @@ -107,19 +105,11 @@ var Start action.GTSAction = func(ctx context.Context) error { state.Workers.Start() defer state.Workers.Stop() - // Create the client API and federator worker pools - // NOTE: these MUST NOT be used until they are passed to the - // processor and it is started. The reason being that the processor - // sets the Worker process functions and start the underlying pools - // TODO: move these into state.Workers (and maybe reformat worker pools). - clientWorker := concurrency.NewWorkerPool[messages.FromClientAPI](-1, -1) - fedWorker := concurrency.NewWorkerPool[messages.FromFederator](-1, -1) - // build backend handlers mediaManager := media.NewManager(&state) oauthServer := oauth.New(ctx, dbService) typeConverter := typeutils.NewConverter(dbService) - federatingDB := federatingdb.New(dbService, fedWorker, typeConverter) + federatingDB := federatingdb.New(&state, typeConverter) transportController := transport.NewController(dbService, federatingDB, &federation.Clock{}, client) federator := federation.NewFederator(dbService, federatingDB, transportController, typeConverter, mediaManager) @@ -140,11 +130,15 @@ var Start action.GTSAction = func(ctx context.Context) error { } // create the message processor using the other services we've created so far - processor := processing.NewProcessor(typeConverter, federator, oauthServer, mediaManager, storage, dbService, emailSender, clientWorker, fedWorker) + processor := processing.NewProcessor(typeConverter, federator, oauthServer, mediaManager, &state, emailSender) if err := processor.Start(); err != nil { return fmt.Errorf("error creating processor: %s", err) } + // Set state client / federator worker enqueue functions + state.Workers.EnqueueClientAPI = processor.EnqueueClientAPI + state.Workers.EnqueueFederator = processor.EnqueueFederator + /* HTTP router initialization */ diff --git a/cmd/gotosocial/action/testrig/testrig.go b/cmd/gotosocial/action/testrig/testrig.go index 3be7907fe..68bb94ec3 100644 --- a/cmd/gotosocial/action/testrig/testrig.go +++ b/cmd/gotosocial/action/testrig/testrig.go @@ -33,14 +33,13 @@ import ( "github.com/superseriousbusiness/gotosocial/cmd/gotosocial/action" "github.com/superseriousbusiness/gotosocial/internal/api" apiutil "github.com/superseriousbusiness/gotosocial/internal/api/util" - "github.com/superseriousbusiness/gotosocial/internal/concurrency" "github.com/superseriousbusiness/gotosocial/internal/config" "github.com/superseriousbusiness/gotosocial/internal/gotosocial" "github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/log" - "github.com/superseriousbusiness/gotosocial/internal/messages" "github.com/superseriousbusiness/gotosocial/internal/middleware" "github.com/superseriousbusiness/gotosocial/internal/oidc" + "github.com/superseriousbusiness/gotosocial/internal/state" "github.com/superseriousbusiness/gotosocial/internal/storage" "github.com/superseriousbusiness/gotosocial/internal/web" "github.com/superseriousbusiness/gotosocial/testrig" @@ -48,37 +47,44 @@ import ( // Start creates and starts a gotosocial testrig server var Start action.GTSAction = func(ctx context.Context) error { + var state state.State + testrig.InitTestConfig() testrig.InitTestLog() - dbService := testrig.NewTestDB() - testrig.StandardDBSetup(dbService, nil) - var storageBackend *storage.Driver + // Initialize caches + state.Caches.Init() + state.Caches.Start() + defer state.Caches.Stop() + + state.DB = testrig.NewTestDB(&state) + testrig.StandardDBSetup(state.DB, nil) + if os.Getenv("GTS_STORAGE_BACKEND") == "s3" { - storageBackend, _ = storage.NewS3Storage() + state.Storage, _ = storage.NewS3Storage() } else { - storageBackend = testrig.NewInMemoryStorage() + state.Storage = testrig.NewInMemoryStorage() } - testrig.StandardStorageSetup(storageBackend, "./testrig/media") + testrig.StandardStorageSetup(state.Storage, "./testrig/media") - // Create client API and federator worker pools - clientWorker := concurrency.NewWorkerPool[messages.FromClientAPI](-1, -1) - fedWorker := concurrency.NewWorkerPool[messages.FromFederator](-1, -1) + // Initialize workers. + state.Workers.Start() + defer state.Workers.Stop() // build backend handlers - transportController := testrig.NewTestTransportController(testrig.NewMockHTTPClient(func(req *http.Request) (*http.Response, error) { + transportController := testrig.NewTestTransportController(&state, testrig.NewMockHTTPClient(func(req *http.Request) (*http.Response, error) { r := io.NopCloser(bytes.NewReader([]byte{})) return &http.Response{ StatusCode: 200, Body: r, }, nil - }, ""), dbService, fedWorker) - mediaManager := testrig.NewTestMediaManager(dbService, storageBackend) - federator := testrig.NewTestFederator(dbService, transportController, storageBackend, mediaManager, fedWorker) + }, "")) + mediaManager := testrig.NewTestMediaManager(&state) + federator := testrig.NewTestFederator(&state, transportController, mediaManager) emailSender := testrig.NewEmailSender("./web/template/", nil) - processor := testrig.NewTestProcessor(dbService, storageBackend, federator, emailSender, mediaManager, clientWorker, fedWorker) + processor := testrig.NewTestProcessor(&state, federator, emailSender, mediaManager) if err := processor.Start(); err != nil { return fmt.Errorf("error starting processor: %s", err) } @@ -87,7 +93,7 @@ var Start action.GTSAction = func(ctx context.Context) error { HTTP router initialization */ - router := testrig.NewTestRouter(dbService) + router := testrig.NewTestRouter(state.DB) // attach global middlewares which are used for every request router.AttachGlobalMiddleware( @@ -112,7 +118,7 @@ var Start action.GTSAction = func(ctx context.Context) error { } } - routerSession, err := dbService.GetSession(ctx) + routerSession, err := state.DB.GetSession(ctx) if err != nil { return fmt.Errorf("error retrieving router session for session middleware: %w", err) } @@ -123,13 +129,13 @@ var Start action.GTSAction = func(ctx context.Context) error { } var ( - authModule = api.NewAuth(dbService, processor, idp, routerSession, sessionName) // auth/oauth paths - clientModule = api.NewClient(dbService, processor) // api client endpoints - fileserverModule = api.NewFileserver(processor) // fileserver endpoints - wellKnownModule = api.NewWellKnown(processor) // .well-known endpoints - nodeInfoModule = api.NewNodeInfo(processor) // nodeinfo endpoint - activityPubModule = api.NewActivityPub(dbService, processor) // ActivityPub endpoints - webModule = web.New(dbService, processor) // web pages + user profiles + settings panels etc + authModule = api.NewAuth(state.DB, processor, idp, routerSession, sessionName) // auth/oauth paths + clientModule = api.NewClient(state.DB, processor) // api client endpoints + fileserverModule = api.NewFileserver(processor) // fileserver endpoints + wellKnownModule = api.NewWellKnown(processor) // .well-known endpoints + nodeInfoModule = api.NewNodeInfo(processor) // nodeinfo endpoint + activityPubModule = api.NewActivityPub(state.DB, processor) // ActivityPub endpoints + webModule = web.New(state.DB, processor) // web pages + user profiles + settings panels etc ) // these should be routed in order @@ -142,7 +148,7 @@ var Start action.GTSAction = func(ctx context.Context) error { activityPubModule.RoutePublicKey(router) webModule.Route(router) - gts, err := gotosocial.NewServer(dbService, router, federator, mediaManager) + gts, err := gotosocial.NewServer(state.DB, router, federator, mediaManager) if err != nil { return fmt.Errorf("error creating gotosocial service: %s", err) } @@ -157,8 +163,8 @@ var Start action.GTSAction = func(ctx context.Context) error { sig := <-sigs log.Infof(ctx, "received signal %s, shutting down", sig) - testrig.StandardDBTeardown(dbService) - testrig.StandardStorageTeardown(storageBackend) + testrig.StandardDBTeardown(state.DB) + testrig.StandardStorageTeardown(state.Storage) // close down all running services in order if err := gts.Stop(ctx); err != nil { |