mirror of
				https://github.com/superseriousbusiness/gotosocial
				synced 2025-06-05 21:59:39 +02:00 
			
		
		
		
	[bugfix] process account delete side effects in serial, not in parallel (#2360)
* [bugfix] process account delete side effects in serial, not in parallel * StartWorkers / StartNoopWorkers for tests * undo testrig trace logging * log errors instead of immediately returning
This commit is contained in:
		| @@ -66,7 +66,7 @@ func (suite *EmojiGetTestSuite) SetupSuite() { | ||||
|  | ||||
| func (suite *EmojiGetTestSuite) SetupTest() { | ||||
| 	suite.state.Caches.Init() | ||||
| 	testrig.StartWorkers(&suite.state) | ||||
| 	testrig.StartNoopWorkers(&suite.state) | ||||
|  | ||||
| 	testrig.InitTestConfig() | ||||
| 	testrig.InitTestLog() | ||||
|   | ||||
| @@ -78,7 +78,6 @@ func (suite *UserStandardTestSuite) SetupSuite() { | ||||
|  | ||||
| func (suite *UserStandardTestSuite) SetupTest() { | ||||
| 	suite.state.Caches.Init() | ||||
| 	testrig.StartWorkers(&suite.state) | ||||
|  | ||||
| 	testrig.InitTestConfig() | ||||
| 	testrig.InitTestLog() | ||||
| @@ -98,7 +97,10 @@ func (suite *UserStandardTestSuite) SetupTest() { | ||||
| 	suite.mediaManager = testrig.NewTestMediaManager(&suite.state) | ||||
| 	suite.federator = testrig.NewTestFederator(&suite.state, testrig.NewTestTransportController(&suite.state, testrig.NewMockHTTPClient(nil, "../../../../testrig/media")), suite.mediaManager) | ||||
| 	suite.emailSender = testrig.NewEmailSender("../../../../web/template/", nil) | ||||
|  | ||||
| 	suite.processor = testrig.NewTestProcessor(&suite.state, suite.federator, suite.emailSender, suite.mediaManager) | ||||
| 	testrig.StartWorkers(&suite.state, suite.processor.Workers()) | ||||
|  | ||||
| 	suite.userModule = users.New(suite.processor) | ||||
| 	testrig.StandardDBSetup(suite.db, suite.testAccounts) | ||||
| 	testrig.StandardStorageSetup(suite.storage, "../../../../testrig/media") | ||||
|   | ||||
| @@ -93,7 +93,7 @@ func (suite *AuthStandardTestSuite) SetupTest() { | ||||
| 	suite.authModule = auth.New(suite.db, suite.processor, suite.idp) | ||||
|  | ||||
| 	testrig.StandardDBSetup(suite.db, suite.testAccounts) | ||||
| 	testrig.StartWorkers(&suite.state) | ||||
| 	testrig.StartNoopWorkers(&suite.state) | ||||
| } | ||||
|  | ||||
| func (suite *AuthStandardTestSuite) TearDownTest() { | ||||
|   | ||||
| @@ -78,7 +78,7 @@ func (suite *AccountStandardTestSuite) SetupSuite() { | ||||
|  | ||||
| func (suite *AccountStandardTestSuite) SetupTest() { | ||||
| 	suite.state.Caches.Init() | ||||
| 	testrig.StartWorkers(&suite.state) | ||||
| 	testrig.StartNoopWorkers(&suite.state) | ||||
|  | ||||
| 	testrig.InitTestConfig() | ||||
| 	testrig.InitTestLog() | ||||
|   | ||||
| @@ -84,7 +84,7 @@ func (suite *AdminStandardTestSuite) SetupSuite() { | ||||
|  | ||||
| func (suite *AdminStandardTestSuite) SetupTest() { | ||||
| 	suite.state.Caches.Init() | ||||
| 	testrig.StartWorkers(&suite.state) | ||||
| 	testrig.StartNoopWorkers(&suite.state) | ||||
|  | ||||
| 	testrig.InitTestConfig() | ||||
| 	testrig.InitTestLog() | ||||
|   | ||||
| @@ -88,7 +88,7 @@ func (suite *BookmarkTestSuite) SetupSuite() { | ||||
|  | ||||
| func (suite *BookmarkTestSuite) SetupTest() { | ||||
| 	suite.state.Caches.Init() | ||||
| 	testrig.StartWorkers(&suite.state) | ||||
| 	testrig.StartNoopWorkers(&suite.state) | ||||
|  | ||||
| 	testrig.InitTestConfig() | ||||
| 	testrig.InitTestLog() | ||||
|   | ||||
| @@ -72,7 +72,7 @@ func (suite *FavouritesStandardTestSuite) SetupSuite() { | ||||
|  | ||||
| func (suite *FavouritesStandardTestSuite) SetupTest() { | ||||
| 	suite.state.Caches.Init() | ||||
| 	testrig.StartWorkers(&suite.state) | ||||
| 	testrig.StartNoopWorkers(&suite.state) | ||||
|  | ||||
| 	testrig.InitTestConfig() | ||||
| 	testrig.InitTestLog() | ||||
|   | ||||
| @@ -75,7 +75,7 @@ func (suite *FollowRequestStandardTestSuite) SetupSuite() { | ||||
|  | ||||
| func (suite *FollowRequestStandardTestSuite) SetupTest() { | ||||
| 	suite.state.Caches.Init() | ||||
| 	testrig.StartWorkers(&suite.state) | ||||
| 	testrig.StartNoopWorkers(&suite.state) | ||||
|  | ||||
| 	testrig.InitTestConfig() | ||||
| 	testrig.InitTestLog() | ||||
|   | ||||
| @@ -77,7 +77,7 @@ func (suite *InstanceStandardTestSuite) SetupSuite() { | ||||
|  | ||||
| func (suite *InstanceStandardTestSuite) SetupTest() { | ||||
| 	suite.state.Caches.Init() | ||||
| 	testrig.StartWorkers(&suite.state) | ||||
| 	testrig.StartNoopWorkers(&suite.state) | ||||
|  | ||||
| 	testrig.InitTestConfig() | ||||
| 	testrig.InitTestLog() | ||||
|   | ||||
| @@ -76,7 +76,7 @@ func (suite *ListsStandardTestSuite) SetupSuite() { | ||||
| func (suite *ListsStandardTestSuite) SetupTest() { | ||||
| 	suite.state.Caches.Init() | ||||
| 	suite.state.Caches.Start() | ||||
| 	testrig.StartWorkers(&suite.state) | ||||
| 	testrig.StartNoopWorkers(&suite.state) | ||||
|  | ||||
| 	testrig.InitTestConfig() | ||||
| 	testrig.InitTestLog() | ||||
|   | ||||
| @@ -80,7 +80,7 @@ type MediaCreateTestSuite struct { | ||||
|  | ||||
| func (suite *MediaCreateTestSuite) SetupSuite() { | ||||
| 	suite.state.Caches.Init() | ||||
| 	testrig.StartWorkers(&suite.state) | ||||
| 	testrig.StartNoopWorkers(&suite.state) | ||||
|  | ||||
| 	// setup standard items | ||||
| 	testrig.InitTestConfig() | ||||
|   | ||||
| @@ -76,7 +76,7 @@ type MediaUpdateTestSuite struct { | ||||
| */ | ||||
|  | ||||
| func (suite *MediaUpdateTestSuite) SetupSuite() { | ||||
| 	testrig.StartWorkers(&suite.state) | ||||
| 	testrig.StartNoopWorkers(&suite.state) | ||||
|  | ||||
| 	// setup standard items | ||||
| 	testrig.InitTestConfig() | ||||
|   | ||||
| @@ -69,7 +69,7 @@ func (suite *PollsStandardTestSuite) SetupSuite() { | ||||
|  | ||||
| func (suite *PollsStandardTestSuite) SetupTest() { | ||||
| 	suite.state.Caches.Init() | ||||
| 	testrig.StartWorkers(&suite.state) | ||||
| 	testrig.StartNoopWorkers(&suite.state) | ||||
|  | ||||
| 	testrig.InitTestConfig() | ||||
| 	testrig.InitTestLog() | ||||
|   | ||||
| @@ -69,7 +69,7 @@ func (suite *ReportsStandardTestSuite) SetupSuite() { | ||||
|  | ||||
| func (suite *ReportsStandardTestSuite) SetupTest() { | ||||
| 	suite.state.Caches.Init() | ||||
| 	testrig.StartWorkers(&suite.state) | ||||
| 	testrig.StartNoopWorkers(&suite.state) | ||||
|  | ||||
| 	testrig.InitTestConfig() | ||||
| 	testrig.InitTestLog() | ||||
|   | ||||
| @@ -73,7 +73,7 @@ func (suite *SearchStandardTestSuite) SetupSuite() { | ||||
|  | ||||
| func (suite *SearchStandardTestSuite) SetupTest() { | ||||
| 	suite.state.Caches.Init() | ||||
| 	testrig.StartWorkers(&suite.state) | ||||
| 	testrig.StartNoopWorkers(&suite.state) | ||||
|  | ||||
| 	testrig.InitTestConfig() | ||||
| 	testrig.InitTestLog() | ||||
|   | ||||
| @@ -72,7 +72,7 @@ func (suite *StatusStandardTestSuite) SetupSuite() { | ||||
|  | ||||
| func (suite *StatusStandardTestSuite) SetupTest() { | ||||
| 	suite.state.Caches.Init() | ||||
| 	testrig.StartWorkers(&suite.state) | ||||
| 	testrig.StartNoopWorkers(&suite.state) | ||||
|  | ||||
| 	testrig.InitTestConfig() | ||||
| 	testrig.InitTestLog() | ||||
|   | ||||
| @@ -84,7 +84,7 @@ func (suite *StreamingTestSuite) SetupSuite() { | ||||
|  | ||||
| func (suite *StreamingTestSuite) SetupTest() { | ||||
| 	suite.state.Caches.Init() | ||||
| 	testrig.StartWorkers(&suite.state) | ||||
| 	testrig.StartNoopWorkers(&suite.state) | ||||
|  | ||||
| 	testrig.InitTestConfig() | ||||
| 	testrig.InitTestLog() | ||||
|   | ||||
| @@ -57,7 +57,7 @@ type UserStandardTestSuite struct { | ||||
|  | ||||
| func (suite *UserStandardTestSuite) SetupTest() { | ||||
| 	suite.state.Caches.Init() | ||||
| 	testrig.StartWorkers(&suite.state) | ||||
| 	testrig.StartNoopWorkers(&suite.state) | ||||
|  | ||||
| 	testrig.InitTestConfig() | ||||
| 	testrig.InitTestLog() | ||||
|   | ||||
| @@ -65,7 +65,7 @@ type FileserverTestSuite struct { | ||||
| */ | ||||
|  | ||||
| func (suite *FileserverTestSuite) SetupSuite() { | ||||
| 	testrig.StartWorkers(&suite.state) | ||||
| 	testrig.StartNoopWorkers(&suite.state) | ||||
|  | ||||
| 	testrig.InitTestConfig() | ||||
| 	testrig.InitTestLog() | ||||
| @@ -96,7 +96,7 @@ func (suite *FileserverTestSuite) SetupSuite() { | ||||
|  | ||||
| func (suite *FileserverTestSuite) SetupTest() { | ||||
| 	suite.state.Caches.Init() | ||||
| 	testrig.StartWorkers(&suite.state) | ||||
| 	testrig.StartNoopWorkers(&suite.state) | ||||
|  | ||||
| 	testrig.StandardDBSetup(suite.db, nil) | ||||
| 	testrig.StandardStorageSetup(suite.storage, "../../../testrig/media") | ||||
|   | ||||
| @@ -72,7 +72,7 @@ func (suite *WebfingerStandardTestSuite) SetupSuite() { | ||||
|  | ||||
| func (suite *WebfingerStandardTestSuite) SetupTest() { | ||||
| 	suite.state.Caches.Init() | ||||
| 	testrig.StartWorkers(&suite.state) | ||||
| 	testrig.StartNoopWorkers(&suite.state) | ||||
|  | ||||
| 	testrig.InitTestLog() | ||||
| 	testrig.InitTestConfig() | ||||
|   | ||||
| @@ -84,10 +84,10 @@ func (suite *WebfingerGetTestSuite) funkifyAccountDomain(host string, accountDom | ||||
| 	config.SetHost(host) | ||||
| 	config.SetAccountDomain(accountDomain) | ||||
| 	testrig.StopWorkers(&suite.state) | ||||
| 	testrig.StartWorkers(&suite.state) | ||||
| 	testrig.StartNoopWorkers(&suite.state) | ||||
| 	suite.processor = processing.NewProcessor(cleaner.New(&suite.state), suite.tc, suite.federator, testrig.NewTestOauthServer(suite.db), testrig.NewTestMediaManager(&suite.state), &suite.state, suite.emailSender) | ||||
| 	suite.webfingerModule = webfinger.New(suite.processor) | ||||
| 	testrig.StartWorkers(&suite.state) | ||||
| 	testrig.StartNoopWorkers(&suite.state) | ||||
|  | ||||
| 	// Generate a new account for the | ||||
| 	// tester, which uses the new host. | ||||
|   | ||||
| @@ -58,7 +58,7 @@ func (suite *CleanerTestSuite) SetupTest() { | ||||
| 	suite.state.Storage = testrig.NewInMemoryStorage() | ||||
|  | ||||
| 	// Initialize test cleaner instance. | ||||
| 	testrig.StartWorkers(&suite.state) | ||||
| 	testrig.StartNoopWorkers(&suite.state) | ||||
| 	suite.cleaner = cleaner.New(&suite.state) | ||||
|  | ||||
| 	// Allocate new test model emojis. | ||||
|   | ||||
| @@ -62,7 +62,7 @@ func (suite *MediaTestSuite) SetupTest() { | ||||
| 	testrig.InitTestLog() | ||||
|  | ||||
| 	suite.state.Caches.Init() | ||||
| 	testrig.StartWorkers(&suite.state) | ||||
| 	testrig.StartNoopWorkers(&suite.state) | ||||
|  | ||||
| 	suite.db = testrig.NewTestDB(&suite.state) | ||||
| 	suite.storage = testrig.NewInMemoryStorage() | ||||
|   | ||||
| @@ -317,11 +317,9 @@ func (d *Dereferencer) RefreshAccountAsync(ctx context.Context, requestUser stri | ||||
|  | ||||
| 		if apubAcc != nil { | ||||
| 			// This account was updated, enqueue re-dereference featured posts. | ||||
| 			d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) { | ||||
| 				if err := d.dereferenceAccountFeatured(ctx, requestUser, latest); err != nil { | ||||
| 					log.Errorf(ctx, "error fetching account featured collection: %v", err) | ||||
| 				} | ||||
| 			}) | ||||
| 			if err := d.dereferenceAccountFeatured(ctx, requestUser, latest); err != nil { | ||||
| 				log.Errorf(ctx, "error fetching account featured collection: %v", err) | ||||
| 			} | ||||
| 		} | ||||
| 	}) | ||||
| } | ||||
|   | ||||
| @@ -60,7 +60,7 @@ func (suite *DereferencerStandardTestSuite) SetupTest() { | ||||
| 	suite.testEmojis = testrig.NewTestEmojis() | ||||
|  | ||||
| 	suite.state.Caches.Init() | ||||
| 	testrig.StartWorkers(&suite.state) | ||||
| 	testrig.StartNoopWorkers(&suite.state) | ||||
|  | ||||
| 	suite.db = testrig.NewTestDB(&suite.state) | ||||
|  | ||||
|   | ||||
| @@ -67,7 +67,7 @@ func (suite *FederatingDBTestSuite) SetupTest() { | ||||
| 	testrig.InitTestLog() | ||||
|  | ||||
| 	suite.state.Caches.Init() | ||||
| 	testrig.StartWorkers(&suite.state) | ||||
| 	testrig.StartNoopWorkers(&suite.state) | ||||
|  | ||||
| 	suite.fromFederator = make(chan messages.FromFediAPI, 10) | ||||
| 	suite.state.Workers.EnqueueFediAPI = func(ctx context.Context, msgs ...messages.FromFediAPI) { | ||||
|   | ||||
| @@ -56,7 +56,7 @@ func (suite *FederatorStandardTestSuite) SetupSuite() { | ||||
|  | ||||
| func (suite *FederatorStandardTestSuite) SetupTest() { | ||||
| 	suite.state.Caches.Init() | ||||
| 	testrig.StartWorkers(&suite.state) | ||||
| 	testrig.StartNoopWorkers(&suite.state) | ||||
|  | ||||
| 	testrig.InitTestConfig() | ||||
| 	testrig.InitTestLog() | ||||
|   | ||||
| @@ -48,7 +48,7 @@ func (suite *MediaStandardTestSuite) SetupTest() { | ||||
| 	testrig.InitTestLog() | ||||
|  | ||||
| 	suite.state.Caches.Init() | ||||
| 	testrig.StartWorkers(&suite.state) | ||||
| 	testrig.StartNoopWorkers(&suite.state) | ||||
|  | ||||
| 	suite.db = testrig.NewTestDB(&suite.state) | ||||
| 	suite.storage = testrig.NewInMemoryStorage() | ||||
|   | ||||
| @@ -81,7 +81,7 @@ func (suite *AccountStandardTestSuite) SetupSuite() { | ||||
|  | ||||
| func (suite *AccountStandardTestSuite) SetupTest() { | ||||
| 	suite.state.Caches.Init() | ||||
| 	testrig.StartWorkers(&suite.state) | ||||
| 	testrig.StartNoopWorkers(&suite.state) | ||||
|  | ||||
| 	testrig.InitTestConfig() | ||||
| 	testrig.InitTestLog() | ||||
|   | ||||
| @@ -27,6 +27,7 @@ import ( | ||||
| 	"github.com/google/uuid" | ||||
| 	"github.com/superseriousbusiness/gotosocial/internal/ap" | ||||
| 	"github.com/superseriousbusiness/gotosocial/internal/db" | ||||
| 	"github.com/superseriousbusiness/gotosocial/internal/gtscontext" | ||||
| 	"github.com/superseriousbusiness/gotosocial/internal/gtserror" | ||||
| 	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" | ||||
| 	"github.com/superseriousbusiness/gotosocial/internal/log" | ||||
| @@ -39,38 +40,45 @@ const deleteSelectLimit = 50 | ||||
|  | ||||
| // Delete deletes an account, and all of that account's statuses, media, follows, notifications, etc etc etc. | ||||
| // The origin passed here should be either the ID of the account doing the delete (can be itself), or the ID of a domain block. | ||||
| func (p *Processor) Delete(ctx context.Context, account *gtsmodel.Account, origin string) gtserror.WithCode { | ||||
| func (p *Processor) Delete( | ||||
| 	ctx context.Context, | ||||
| 	account *gtsmodel.Account, | ||||
| 	origin string, | ||||
| ) gtserror.WithCode { | ||||
| 	l := log.WithContext(ctx).WithFields(kv.Fields{ | ||||
| 		{"username", account.Username}, | ||||
| 		{"domain", account.Domain}, | ||||
| 	}...) | ||||
| 	l.Trace("beginning account delete process") | ||||
|  | ||||
| 	// Delete statuses *before* follows to ensure correct addressing | ||||
| 	// of any outgoing fedi messages generated by deleting statuses. | ||||
| 	if err := p.deleteAccountStatuses(ctx, account); err != nil { | ||||
| 		l.Errorf("continuing after error during account delete: %v", err) | ||||
| 	} | ||||
|  | ||||
| 	if err := p.deleteAccountFollows(ctx, account); err != nil { | ||||
| 		return gtserror.NewErrorInternalError(err) | ||||
| 		l.Errorf("continuing after error during account delete: %v", err) | ||||
| 	} | ||||
|  | ||||
| 	if err := p.deleteAccountBlocks(ctx, account); err != nil { | ||||
| 		return gtserror.NewErrorInternalError(err) | ||||
| 	} | ||||
|  | ||||
| 	if err := p.deleteAccountStatuses(ctx, account); err != nil { | ||||
| 		return gtserror.NewErrorInternalError(err) | ||||
| 		l.Errorf("continuing after error during account delete: %v", err) | ||||
| 	} | ||||
|  | ||||
| 	if err := p.deleteAccountNotifications(ctx, account); err != nil { | ||||
| 		return gtserror.NewErrorInternalError(err) | ||||
| 		l.Errorf("continuing after error during account delete: %v", err) | ||||
| 	} | ||||
|  | ||||
| 	if err := p.deleteAccountPeripheral(ctx, account); err != nil { | ||||
| 		return gtserror.NewErrorInternalError(err) | ||||
| 		l.Errorf("continuing after error during account delete: %v", err) | ||||
| 	} | ||||
|  | ||||
| 	if account.IsLocal() { | ||||
| 		// we tokens, applications and clients for account as one of the last | ||||
| 		// stages during deletion, as other database models rely on these. | ||||
| 		// We delete tokens, applications and clients for | ||||
| 		// account as one of the last stages during deletion, | ||||
| 		// as other database models rely on these. | ||||
| 		if err := p.deleteUserAndTokensForAccount(ctx, account); err != nil { | ||||
| 			return gtserror.NewErrorInternalError(err) | ||||
| 			l.Errorf("continuing after error during account delete: %v", err) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| @@ -83,7 +91,7 @@ func (p *Processor) Delete(ctx context.Context, account *gtsmodel.Account, origi | ||||
| 		return gtserror.NewErrorInternalError(err) | ||||
| 	} | ||||
|  | ||||
| 	l.Info("account deleted") | ||||
| 	l.Info("account delete process complete") | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| @@ -189,7 +197,7 @@ func (p *Processor) deleteAccountFollows(ctx context.Context, account *gtsmodel. | ||||
| 		// To avoid checking if account is local over + over | ||||
| 		// inside the subsequent loops, just generate static | ||||
| 		// side effects function once now. | ||||
| 		unfollowSideEffects = p.unfollowSideEffectsFunc(account) | ||||
| 		unfollowSideEffects = p.unfollowSideEffectsFunc(account.IsLocal()) | ||||
| 	) | ||||
|  | ||||
| 	// Delete follows originating from this account. | ||||
| @@ -240,31 +248,56 @@ func (p *Processor) deleteAccountFollows(ctx context.Context, account *gtsmodel. | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// Process accreted messages asynchronously. | ||||
| 	p.state.Workers.EnqueueClientAPI(ctx, msgs...) | ||||
| 	// Process accreted messages in serial. | ||||
| 	for _, msg := range msgs { | ||||
| 		if err := p.state.Workers.ProcessFromClientAPI(ctx, msg); err != nil { | ||||
| 			log.Errorf( | ||||
| 				ctx, | ||||
| 				"error processing %s of %s during Delete of account %s: %v", | ||||
| 				msg.APActivityType, msg.APObjectType, account.ID, err, | ||||
| 			) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (p *Processor) unfollowSideEffectsFunc(deletedAccount *gtsmodel.Account) func(ctx context.Context, account *gtsmodel.Account, follow *gtsmodel.Follow) *messages.FromClientAPI { | ||||
| 	if !deletedAccount.IsLocal() { | ||||
| func (p *Processor) unfollowSideEffectsFunc(local bool) func( | ||||
| 	ctx context.Context, | ||||
| 	account *gtsmodel.Account, | ||||
| 	follow *gtsmodel.Follow, | ||||
| ) *messages.FromClientAPI { | ||||
| 	if !local { | ||||
| 		// Don't try to process side effects | ||||
| 		// for accounts that aren't local. | ||||
| 		return func(ctx context.Context, account *gtsmodel.Account, follow *gtsmodel.Follow) *messages.FromClientAPI { | ||||
| 			return nil // noop | ||||
| 		return func( | ||||
| 			_ context.Context, | ||||
| 			_ *gtsmodel.Account, | ||||
| 			_ *gtsmodel.Follow, | ||||
| 		) *messages.FromClientAPI { | ||||
| 			// noop | ||||
| 			return nil | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return func(ctx context.Context, account *gtsmodel.Account, follow *gtsmodel.Follow) *messages.FromClientAPI { | ||||
| 	return func( | ||||
| 		ctx context.Context, | ||||
| 		account *gtsmodel.Account, | ||||
| 		follow *gtsmodel.Follow, | ||||
| 	) *messages.FromClientAPI { | ||||
| 		if follow.TargetAccount == nil { | ||||
| 			// TargetAccount seems to have gone; | ||||
| 			// race condition? db corruption? | ||||
| 			log.WithContext(ctx).WithField("follow", follow).Warn("follow had no TargetAccount, likely race condition") | ||||
| 			log. | ||||
| 				WithContext(ctx). | ||||
| 				WithField("follow", follow). | ||||
| 				Warn("follow had no TargetAccount, likely race condition") | ||||
| 			return nil | ||||
| 		} | ||||
|  | ||||
| 		if follow.TargetAccount.IsLocal() { | ||||
| 			// No side effects for local unfollows. | ||||
| 			// No side effects | ||||
| 			// for local unfollows. | ||||
| 			return nil | ||||
| 		} | ||||
|  | ||||
| @@ -288,8 +321,11 @@ func (p *Processor) deleteAccountBlocks(ctx context.Context, account *gtsmodel.A | ||||
|  | ||||
| // deleteAccountStatuses iterates through all statuses owned by | ||||
| // the given account, passing each discovered status (and boosts | ||||
| // thereof) to the processor workers for further async processing. | ||||
| func (p *Processor) deleteAccountStatuses(ctx context.Context, account *gtsmodel.Account) error { | ||||
| // thereof) to the processor workers for further processing. | ||||
| func (p *Processor) deleteAccountStatuses( | ||||
| 	ctx context.Context, | ||||
| 	account *gtsmodel.Account, | ||||
| ) error { | ||||
| 	// We'll select statuses 50 at a time so we don't wreck the db, | ||||
| 	// and pass them through to the client api worker to handle. | ||||
| 	// | ||||
| @@ -331,42 +367,43 @@ statusLoop: | ||||
| 		maxID = statuses[len(statuses)-1].ID | ||||
|  | ||||
| 		for _, status := range statuses { | ||||
| 			status.Account = account // ensure account is set | ||||
|  | ||||
| 			// Pass the status delete through the client api worker for processing. | ||||
| 			msgs = append(msgs, messages.FromClientAPI{ | ||||
| 				APObjectType:   ap.ObjectNote, | ||||
| 				APActivityType: ap.ActivityDelete, | ||||
| 				GTSModel:       status, | ||||
| 				OriginAccount:  account, | ||||
| 				TargetAccount:  account, | ||||
| 			}) | ||||
| 			// Ensure account is set. | ||||
| 			status.Account = account | ||||
|  | ||||
| 			// Look for any boosts of this status in DB. | ||||
| 			boosts, err := p.state.DB.GetStatusBoosts(ctx, status.ID) | ||||
| 			// | ||||
| 			// We put these in the msgs slice first so | ||||
| 			// that they're handled first, before the | ||||
| 			// parent status that's being boosted. | ||||
| 			// | ||||
| 			// Use a barebones context and just select the | ||||
| 			// origin account separately. The rest will be | ||||
| 			// populated later anyway, and we don't want to | ||||
| 			// stop now because we couldn't get something. | ||||
| 			boosts, err := p.state.DB.GetStatusBoosts( | ||||
| 				gtscontext.SetBarebones(ctx), | ||||
| 				status.ID, | ||||
| 			) | ||||
| 			if err != nil && !errors.Is(err, db.ErrNoEntries) { | ||||
| 				return gtserror.Newf("error fetching status reblogs for %s: %w", status.ID, err) | ||||
| 				return gtserror.Newf("error fetching status boosts for %s: %w", status.ID, err) | ||||
| 			} | ||||
|  | ||||
| 			// Prepare to Undo each boost. | ||||
| 			for _, boost := range boosts { | ||||
| 				if boost.Account == nil { | ||||
| 					// Fetch the relevant account for this status boost. | ||||
| 					boostAcc, err := p.state.DB.GetAccountByID(ctx, boost.AccountID) | ||||
| 					if err != nil { | ||||
| 						if errors.Is(err, db.ErrNoEntries) { | ||||
| 							// We don't have an account for this boost | ||||
| 							// for some reason, so just skip processing. | ||||
| 							log.WithContext(ctx).WithField("boost", boost).Warnf("no account found with id %s for boost %s", boost.AccountID, boost.ID) | ||||
| 							continue | ||||
| 						} | ||||
| 						return gtserror.Newf("error fetching boosted status account for %s: %w", boost.AccountID, err) | ||||
| 					} | ||||
| 				boost.Account, err = p.state.DB.GetAccountByID( | ||||
| 					gtscontext.SetBarebones(ctx), | ||||
| 					boost.AccountID, | ||||
| 				) | ||||
|  | ||||
| 					// Set account model | ||||
| 					boost.Account = boostAcc | ||||
| 				if err != nil { | ||||
| 					log.Warnf( | ||||
| 						ctx, | ||||
| 						"db error getting owner %s of status boost %s: %v", | ||||
| 						boost.AccountID, boost.ID, err, | ||||
| 					) | ||||
| 					continue | ||||
| 				} | ||||
|  | ||||
| 				// Pass the boost delete through the client api worker for processing. | ||||
| 				msgs = append(msgs, messages.FromClientAPI{ | ||||
| 					APObjectType:   ap.ActivityAnnounce, | ||||
| 					APActivityType: ap.ActivityUndo, | ||||
| @@ -375,11 +412,28 @@ statusLoop: | ||||
| 					TargetAccount:  account, | ||||
| 				}) | ||||
| 			} | ||||
|  | ||||
| 			// Now prepare to Delete status. | ||||
| 			msgs = append(msgs, messages.FromClientAPI{ | ||||
| 				APObjectType:   ap.ObjectNote, | ||||
| 				APActivityType: ap.ActivityDelete, | ||||
| 				GTSModel:       status, | ||||
| 				OriginAccount:  account, | ||||
| 				TargetAccount:  account, | ||||
| 			}) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// Batch process all accreted messages. | ||||
| 	p.state.Workers.EnqueueClientAPI(ctx, msgs...) | ||||
| 	// Process accreted messages in serial. | ||||
| 	for _, msg := range msgs { | ||||
| 		if err := p.state.Workers.ProcessFromClientAPI(ctx, msg); err != nil { | ||||
| 			log.Errorf( | ||||
| 				ctx, | ||||
| 				"error processing %s of %s during Delete of account %s: %v", | ||||
| 				msg.APActivityType, msg.APObjectType, account.ID, err, | ||||
| 			) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|   | ||||
| @@ -80,7 +80,7 @@ func (suite *AdminStandardTestSuite) SetupSuite() { | ||||
|  | ||||
| func (suite *AdminStandardTestSuite) SetupTest() { | ||||
| 	suite.state.Caches.Init() | ||||
| 	testrig.StartWorkers(&suite.state) | ||||
| 	 | ||||
|  | ||||
| 	testrig.InitTestConfig() | ||||
| 	testrig.InitTestLog() | ||||
| @@ -115,7 +115,7 @@ func (suite *AdminStandardTestSuite) SetupTest() { | ||||
| 		suite.emailSender, | ||||
| 	) | ||||
|  | ||||
| 	suite.state.Workers.ProcessFromClientAPI = suite.processor.Workers().ProcessFromClientAPI | ||||
| 	testrig.StartWorkers(&suite.state, suite.processor.Workers()) | ||||
| 	suite.adminProcessor = suite.processor.Admin() | ||||
|  | ||||
| 	testrig.StandardDBSetup(suite.db, nil) | ||||
|   | ||||
| @@ -50,7 +50,7 @@ func (suite *PollTestSuite) SetupTest() { | ||||
| 	testrig.InitTestConfig() | ||||
| 	testrig.InitTestLog() | ||||
| 	suite.state.Caches.Init() | ||||
| 	testrig.StartWorkers(&suite.state) | ||||
| 	testrig.StartNoopWorkers(&suite.state) | ||||
| 	testrig.NewTestDB(&suite.state) | ||||
| 	converter := typeutils.NewConverter(&suite.state) | ||||
| 	controller := testrig.NewTestTransportController(&suite.state, nil) | ||||
|   | ||||
| @@ -95,7 +95,7 @@ func (suite *ProcessingStandardTestSuite) SetupSuite() { | ||||
|  | ||||
| func (suite *ProcessingStandardTestSuite) SetupTest() { | ||||
| 	suite.state.Caches.Init() | ||||
| 	testrig.StartWorkers(&suite.state) | ||||
| 	testrig.StartNoopWorkers(&suite.state) | ||||
|  | ||||
| 	testrig.InitTestConfig() | ||||
| 	testrig.InitTestLog() | ||||
|   | ||||
| @@ -74,7 +74,7 @@ func (suite *StatusStandardTestSuite) SetupSuite() { | ||||
|  | ||||
| func (suite *StatusStandardTestSuite) SetupTest() { | ||||
| 	suite.state.Caches.Init() | ||||
| 	testrig.StartWorkers(&suite.state) | ||||
| 	testrig.StartNoopWorkers(&suite.state) | ||||
|  | ||||
| 	testrig.InitTestConfig() | ||||
| 	testrig.InitTestLog() | ||||
|   | ||||
| @@ -97,7 +97,6 @@ func (suite *WorkersTestSuite) SetupSuite() { | ||||
|  | ||||
| func (suite *WorkersTestSuite) SetupTest() { | ||||
| 	suite.state.Caches.Init() | ||||
| 	testrig.StartWorkers(&suite.state) | ||||
|  | ||||
| 	testrig.InitTestConfig() | ||||
| 	testrig.InitTestLog() | ||||
| @@ -126,6 +125,8 @@ func (suite *WorkersTestSuite) SetupTest() { | ||||
| 	suite.emailSender = testrig.NewEmailSender("../../../web/template/", nil) | ||||
|  | ||||
| 	suite.processor = processing.NewProcessor(cleaner.New(&suite.state), suite.typeconverter, suite.federator, suite.oauthServer, suite.mediaManager, &suite.state, suite.emailSender) | ||||
| 	testrig.StartWorkers(&suite.state, suite.processor.Workers()) | ||||
|  | ||||
| 	suite.state.Workers.EnqueueClientAPI = suite.processor.Workers().EnqueueClientAPI | ||||
| 	suite.state.Workers.EnqueueFediAPI = suite.processor.Workers().EnqueueFediAPI | ||||
|  | ||||
|   | ||||
| @@ -48,7 +48,7 @@ func (suite *TimelineStandardTestSuite) SetupTest() { | ||||
| 	suite.state = new(state.State) | ||||
|  | ||||
| 	suite.state.Caches.Init() | ||||
| 	testrig.StartWorkers(suite.state) | ||||
| 	testrig.StartNoopWorkers(suite.state) | ||||
|  | ||||
| 	testrig.InitTestConfig() | ||||
| 	testrig.InitTestLog() | ||||
|   | ||||
| @@ -67,7 +67,7 @@ func (suite *TransportTestSuite) SetupSuite() { | ||||
|  | ||||
| func (suite *TransportTestSuite) SetupTest() { | ||||
| 	suite.state.Caches.Init() | ||||
| 	testrig.StartWorkers(&suite.state) | ||||
| 	testrig.StartNoopWorkers(&suite.state) | ||||
|  | ||||
| 	testrig.InitTestConfig() | ||||
| 	testrig.InitTestLog() | ||||
|   | ||||
| @@ -515,7 +515,6 @@ func (suite *TypeUtilsTestSuite) TearDownTest() { | ||||
| // GetProcessor is a utility function that instantiates a processor. | ||||
| // Useful when a test in the test suite needs to change some state. | ||||
| func (suite *TypeUtilsTestSuite) GetProcessor() *processing.Processor { | ||||
| 	testrig.StartWorkers(&suite.state) | ||||
| 	testrig.StartTimelines( | ||||
| 		&suite.state, | ||||
| 		visibility.NewFilter(&suite.state), | ||||
| @@ -527,5 +526,9 @@ func (suite *TypeUtilsTestSuite) GetProcessor() *processing.Processor { | ||||
| 	mediaManager := testrig.NewTestMediaManager(&suite.state) | ||||
| 	federator := testrig.NewTestFederator(&suite.state, transportController, mediaManager) | ||||
| 	emailSender := testrig.NewEmailSender("../../web/template/", nil) | ||||
| 	return testrig.NewTestProcessor(&suite.state, federator, emailSender, mediaManager) | ||||
| 	 | ||||
| 	processor := testrig.NewTestProcessor(&suite.state, federator, emailSender, mediaManager) | ||||
| 	testrig.StartWorkers(&suite.state, processor.Workers()) | ||||
|  | ||||
| 	return processor | ||||
| } | ||||
|   | ||||
| @@ -24,6 +24,6 @@ import ( | ||||
|  | ||||
| // NewTestMediaManager returns a media handler with the default test config, and the given db and storage. | ||||
| func NewTestMediaManager(state *state.State) *media.Manager { | ||||
| 	StartWorkers(state) // ensure started | ||||
| 	StartNoopWorkers(state) // ensure started | ||||
| 	return media.NewManager(state) | ||||
| } | ||||
|   | ||||
| @@ -27,10 +27,14 @@ import ( | ||||
| 	"github.com/superseriousbusiness/gotosocial/internal/typeutils" | ||||
| ) | ||||
|  | ||||
| // NewTestProcessor returns a Processor suitable for testing purposes | ||||
| // NewTestProcessor returns a Processor suitable for testing purposes. | ||||
| // The passed in state will have its worker functions set appropriately, | ||||
| // but the state will not be initialized. | ||||
| func NewTestProcessor(state *state.State, federator *federation.Federator, emailSender email.Sender, mediaManager *media.Manager) *processing.Processor { | ||||
| 	p := processing.NewProcessor(cleaner.New(state), typeutils.NewConverter(state), federator, NewTestOauthServer(state.DB), mediaManager, state, emailSender) | ||||
| 	state.Workers.EnqueueClientAPI = p.Workers().EnqueueClientAPI | ||||
| 	state.Workers.EnqueueFediAPI = p.Workers().EnqueueFediAPI | ||||
| 	state.Workers.ProcessFromClientAPI = p.Workers().ProcessFromClientAPI | ||||
| 	state.Workers.ProcessFromFediAPI = p.Workers().ProcessFromFediAPI | ||||
| 	return p | ||||
| } | ||||
|   | ||||
| @@ -29,13 +29,16 @@ import ( | ||||
|  | ||||
| 	"github.com/superseriousbusiness/gotosocial/internal/messages" | ||||
| 	tlprocessor "github.com/superseriousbusiness/gotosocial/internal/processing/timeline" | ||||
| 	wprocessor "github.com/superseriousbusiness/gotosocial/internal/processing/workers" | ||||
| 	"github.com/superseriousbusiness/gotosocial/internal/state" | ||||
| 	"github.com/superseriousbusiness/gotosocial/internal/timeline" | ||||
| 	"github.com/superseriousbusiness/gotosocial/internal/typeutils" | ||||
| 	"github.com/superseriousbusiness/gotosocial/internal/visibility" | ||||
| ) | ||||
|  | ||||
| func StartWorkers(state *state.State) { | ||||
| // Starts workers on the provided state using noop processing functions. | ||||
| // Useful when you *don't* want to trigger side effects in a test. | ||||
| func StartNoopWorkers(state *state.State) { | ||||
| 	state.Workers.EnqueueClientAPI = func(context.Context, ...messages.FromClientAPI) {} | ||||
| 	state.Workers.EnqueueFediAPI = func(context.Context, ...messages.FromFediAPI) {} | ||||
| 	state.Workers.ProcessFromClientAPI = func(context.Context, messages.FromClientAPI) error { return nil } | ||||
| @@ -47,6 +50,20 @@ func StartWorkers(state *state.State) { | ||||
| 	_ = state.Workers.Media.Start(1, 10) | ||||
| } | ||||
|  | ||||
| // Starts workers on the provided state using processing functions from the given | ||||
| // workers processor. Useful when you *do* want to trigger side effects in a test. | ||||
| func StartWorkers(state *state.State, wProcessor *wprocessor.Processor) { | ||||
| 	state.Workers.EnqueueClientAPI = wProcessor.EnqueueClientAPI | ||||
| 	state.Workers.EnqueueFediAPI = wProcessor.EnqueueFediAPI | ||||
| 	state.Workers.ProcessFromClientAPI = wProcessor.ProcessFromClientAPI | ||||
| 	state.Workers.ProcessFromFediAPI = wProcessor.ProcessFromFediAPI | ||||
|  | ||||
| 	_ = state.Workers.Scheduler.Start() | ||||
| 	_ = state.Workers.ClientAPI.Start(1, 10) | ||||
| 	_ = state.Workers.Federator.Start(1, 10) | ||||
| 	_ = state.Workers.Media.Start(1, 10) | ||||
| } | ||||
|  | ||||
| func StopWorkers(state *state.State) { | ||||
| 	_ = state.Workers.Scheduler.Stop() | ||||
| 	_ = state.Workers.ClientAPI.Stop() | ||||
|   | ||||
		Reference in New Issue
	
	Block a user