[feature] Store admin actions in the db, prevent conflicting actions (#2167)

This commit is contained in:
tobi
2023-09-04 15:55:17 +02:00
committed by GitHub
parent 4f3daeebcb
commit 3ed1ca68e5
23 changed files with 1393 additions and 272 deletions

View File

@ -44,21 +44,24 @@ import (
// and then processes side effects of that block (deleting accounts, media, etc).
//
// If a domain block already exists for the domain, side effects will be retried.
//
// Return values for this function are the (new) domain block, the ID of the admin
// action resulting from this call, and/or an error if something goes wrong.
func (p *Processor) DomainBlockCreate(
ctx context.Context,
account *gtsmodel.Account,
adminAcct *gtsmodel.Account,
domain string,
obfuscate bool,
publicComment string,
privateComment string,
subscriptionID string,
) (*apimodel.DomainBlock, gtserror.WithCode) {
) (*apimodel.DomainBlock, string, gtserror.WithCode) {
// Check if a block already exists for this domain.
domainBlock, err := p.state.DB.GetDomainBlock(ctx, domain)
if err != nil && !errors.Is(err, db.ErrNoEntries) {
// Something went wrong in the DB.
err = gtserror.Newf("db error getting domain block %s: %w", domain, err)
return nil, gtserror.NewErrorInternalError(err)
return nil, "", gtserror.NewErrorInternalError(err)
}
if domainBlock == nil {
@ -66,7 +69,7 @@ func (p *Processor) DomainBlockCreate(
domainBlock = &gtsmodel.DomainBlock{
ID: id.NewULID(),
Domain: domain,
CreatedByAccountID: account.ID,
CreatedByAccountID: adminAcct.ID,
PrivateComment: text.SanitizeToPlaintext(privateComment),
PublicComment: text.SanitizeToPlaintext(publicComment),
Obfuscate: &obfuscate,
@ -75,18 +78,100 @@ func (p *Processor) DomainBlockCreate(
// Insert the new block into the database.
if err := p.state.DB.CreateDomainBlock(ctx, domainBlock); err != nil {
err = gtserror.Newf("db error putting domain block %s: %s", domain, err)
return nil, gtserror.NewErrorInternalError(err)
err = gtserror.Newf("db error putting domain block %s: %w", domain, err)
return nil, "", gtserror.NewErrorInternalError(err)
}
}
// Process the side effects of the domain block
// asynchronously since it might take a while.
p.state.Workers.ClientAPI.Enqueue(func(ctx context.Context) {
p.domainBlockSideEffects(ctx, account, domainBlock)
})
actionID := id.NewULID()
return p.apiDomainBlock(ctx, domainBlock)
// Process domain block side
// effects asynchronously.
if errWithCode := p.actions.Run(
ctx,
&gtsmodel.AdminAction{
ID: actionID,
TargetCategory: gtsmodel.AdminActionCategoryDomain,
TargetID: domain,
Type: gtsmodel.AdminActionSuspend,
AccountID: adminAcct.ID,
Text: domainBlock.PrivateComment,
},
func(ctx context.Context) gtserror.MultiError {
return p.domainBlockSideEffects(ctx, domainBlock)
},
); errWithCode != nil {
return nil, actionID, errWithCode
}
apiDomainBlock, errWithCode := p.apiDomainBlock(ctx, domainBlock)
if errWithCode != nil {
return nil, actionID, errWithCode
}
return apiDomainBlock, actionID, nil
}
// DomainBlockDelete removes one domain block with the given ID,
// and processes side effects of removing the block asynchronously.
//
// Return values for this function are the deleted domain block, the ID of the admin
// action resulting from this call, and/or an error if something goes wrong.
func (p *Processor) DomainBlockDelete(
ctx context.Context,
adminAcct *gtsmodel.Account,
domainBlockID string,
) (*apimodel.DomainBlock, string, gtserror.WithCode) {
domainBlock, err := p.state.DB.GetDomainBlockByID(ctx, domainBlockID)
if err != nil {
if !errors.Is(err, db.ErrNoEntries) {
// Real error.
err = gtserror.Newf("db error getting domain block: %w", err)
return nil, "", gtserror.NewErrorInternalError(err)
}
// There are just no entries for this ID.
err = fmt.Errorf("no domain block entry exists with ID %s", domainBlockID)
return nil, "", gtserror.NewErrorNotFound(err, err.Error())
}
// Prepare the domain block to return, *before* the deletion goes through.
apiDomainBlock, errWithCode := p.apiDomainBlock(ctx, domainBlock)
if errWithCode != nil {
return nil, "", errWithCode
}
// Copy value of the domain block.
domainBlockC := new(gtsmodel.DomainBlock)
*domainBlockC = *domainBlock
// Delete the original domain block.
if err := p.state.DB.DeleteDomainBlock(ctx, domainBlock.Domain); err != nil {
err = gtserror.Newf("db error deleting domain block: %w", err)
return nil, "", gtserror.NewErrorInternalError(err)
}
actionID := id.NewULID()
// Process domain unblock side
// effects asynchronously.
if errWithCode := p.actions.Run(
ctx,
&gtsmodel.AdminAction{
ID: actionID,
TargetCategory: gtsmodel.AdminActionCategoryDomain,
TargetID: domainBlockC.Domain,
Type: gtsmodel.AdminActionUnsuspend,
AccountID: adminAcct.ID,
},
func(ctx context.Context) gtserror.MultiError {
return p.domainUnblockSideEffects(ctx, domainBlock)
},
); errWithCode != nil {
return nil, actionID, errWithCode
}
return apiDomainBlock, actionID, nil
}
// DomainBlocksImport handles the import of multiple domain blocks,
@ -153,7 +238,7 @@ func (p *Processor) DomainBlocksImport(
errWithCode gtserror.WithCode
)
domainBlock, errWithCode = p.DomainBlockCreate(
domainBlock, _, errWithCode = p.DomainBlockCreate(
ctx,
account,
domain,
@ -227,131 +312,6 @@ func (p *Processor) DomainBlockGet(ctx context.Context, id string, export bool)
return p.apiDomainBlock(ctx, domainBlock)
}
// DomainBlockDelete removes one domain block with the given ID,
// and processes side effects of removing the block asynchronously.
func (p *Processor) DomainBlockDelete(ctx context.Context, account *gtsmodel.Account, id string) (*apimodel.DomainBlock, gtserror.WithCode) {
domainBlock, err := p.state.DB.GetDomainBlockByID(ctx, id)
if err != nil {
if !errors.Is(err, db.ErrNoEntries) {
// Real error.
err = gtserror.Newf("db error getting domain block: %w", err)
return nil, gtserror.NewErrorInternalError(err)
}
// There are just no entries for this ID.
err = fmt.Errorf("no domain block entry exists with ID %s", id)
return nil, gtserror.NewErrorNotFound(err, err.Error())
}
// Prepare the domain block to return, *before* the deletion goes through.
apiDomainBlock, errWithCode := p.apiDomainBlock(ctx, domainBlock)
if errWithCode != nil {
return nil, errWithCode
}
// Copy value of the domain block.
domainBlockC := new(gtsmodel.DomainBlock)
*domainBlockC = *domainBlock
// Delete the original domain block.
if err := p.state.DB.DeleteDomainBlock(ctx, domainBlock.Domain); err != nil {
err = gtserror.Newf("db error deleting domain block: %w", err)
return nil, gtserror.NewErrorInternalError(err)
}
// Process the side effects of the domain unblock
// asynchronously since it might take a while.
p.state.Workers.ClientAPI.Enqueue(func(ctx context.Context) {
p.domainUnblockSideEffects(ctx, domainBlockC) // Use the copy.
})
return apiDomainBlock, nil
}
// stubbifyInstance renders the given instance as a stub,
// removing most information from it and marking it as
// suspended.
//
// For caller's convenience, this function returns the db
// names of all columns that are updated by it.
func stubbifyInstance(instance *gtsmodel.Instance, domainBlockID string) []string {
instance.Title = ""
instance.SuspendedAt = time.Now()
instance.DomainBlockID = domainBlockID
instance.ShortDescription = ""
instance.Description = ""
instance.Terms = ""
instance.ContactEmail = ""
instance.ContactAccountUsername = ""
instance.ContactAccountID = ""
instance.Version = ""
return []string{
"title",
"suspended_at",
"domain_block_id",
"short_description",
"description",
"terms",
"contact_email",
"contact_account_username",
"contact_account_id",
"version",
}
}
// apiDomainBlock is a cheeky shortcut function for returning the API
// version of the given domainBlock, or an appropriate error if
// something goes wrong.
func (p *Processor) apiDomainBlock(ctx context.Context, domainBlock *gtsmodel.DomainBlock) (*apimodel.DomainBlock, gtserror.WithCode) {
apiDomainBlock, err := p.tc.DomainBlockToAPIDomainBlock(ctx, domainBlock, false)
if err != nil {
err = gtserror.Newf("error converting domain block for %s to api model : %w", domainBlock.Domain, err)
return nil, gtserror.NewErrorInternalError(err)
}
return apiDomainBlock, nil
}
// rangeAccounts iterates through all accounts originating from the
// given domain, and calls the provided range function on each account.
// If an error is returned from the range function, the loop will stop
// and return the error.
func (p *Processor) rangeAccounts(
ctx context.Context,
domain string,
rangeF func(*gtsmodel.Account) error,
) error {
var (
limit = 50 // Limit selection to avoid spiking mem/cpu.
maxID string // Start with empty string to select from top.
)
for {
// Get (next) page of accounts.
accounts, err := p.state.DB.GetInstanceAccounts(ctx, domain, maxID, limit)
if err != nil && !errors.Is(err, db.ErrNoEntries) {
// Real db error.
return gtserror.Newf("db error getting instance accounts: %w", err)
}
if len(accounts) == 0 {
// No accounts left, we're done.
return nil
}
// Set next max ID for paging down.
maxID = accounts[len(accounts)-1].ID
// Call provided range function.
for _, account := range accounts {
if err := rangeF(account); err != nil {
return err
}
}
}
}
// domainBlockSideEffects processes the side effects of a domain block:
//
// 1. Strip most info away from the instance entry for the domain.
@ -359,7 +319,10 @@ func (p *Processor) rangeAccounts(
//
// It should be called asynchronously, since it can take a while when
// there are many accounts present on the given domain.
func (p *Processor) domainBlockSideEffects(ctx context.Context, account *gtsmodel.Account, block *gtsmodel.DomainBlock) {
func (p *Processor) domainBlockSideEffects(
ctx context.Context,
block *gtsmodel.DomainBlock,
) gtserror.MultiError {
l := log.
WithContext(ctx).
WithFields(kv.Fields{
@ -367,43 +330,46 @@ func (p *Processor) domainBlockSideEffects(ctx context.Context, account *gtsmode
}...)
l.Debug("processing domain block side effects")
var errs gtserror.MultiError
// If we have an instance entry for this domain,
// update it with the new block ID and clear all fields
instance, err := p.state.DB.GetInstance(ctx, block.Domain)
if err != nil && !errors.Is(err, db.ErrNoEntries) {
l.Errorf("db error getting instance %s: %q", block.Domain, err)
errs.Appendf("db error getting instance %s: %w", block.Domain, err)
return errs
}
if instance != nil {
// We had an entry for this domain.
columns := stubbifyInstance(instance, block.ID)
if err := p.state.DB.UpdateInstance(ctx, instance, columns...); err != nil {
l.Errorf("db error updating instance: %s", err)
} else {
l.Debug("instance entry updated")
errs.Appendf("db error updating instance: %w", err)
return errs
}
l.Debug("instance entry updated")
}
// For each account that belongs to this domain, create
// an account delete message to process via the client API
// worker pool, to remove that account's posts, media, etc.
msgs := []messages.FromClientAPI{}
if err := p.rangeAccounts(ctx, block.Domain, func(account *gtsmodel.Account) error {
msgs = append(msgs, messages.FromClientAPI{
// For each account that belongs to this domain,
// process an account delete message to remove
// that account's posts, media, etc.
if err := p.rangeDomainAccounts(ctx, block.Domain, func(account *gtsmodel.Account) {
cMsg := messages.FromClientAPI{
APObjectType: ap.ActorPerson,
APActivityType: ap.ActivityDelete,
GTSModel: block,
OriginAccount: account,
TargetAccount: account,
})
}
return nil
if err := p.state.Workers.ProcessFromClientAPI(ctx, cMsg); err != nil {
errs.Append(err)
}
}); err != nil {
l.Errorf("error while ranging through accounts: %q", err)
errs.Appendf("db error ranging through accounts: %w", err)
}
// Batch process all accreted messages.
p.state.Workers.EnqueueClientAPI(ctx, msgs...)
return errs
}
// domainUnblockSideEffects processes the side effects of undoing a
@ -415,7 +381,10 @@ func (p *Processor) domainBlockSideEffects(ctx context.Context, account *gtsmode
//
// It should be called asynchronously, since it can take a while when
// there are many accounts present on the given domain.
func (p *Processor) domainUnblockSideEffects(ctx context.Context, block *gtsmodel.DomainBlock) {
func (p *Processor) domainUnblockSideEffects(
ctx context.Context,
block *gtsmodel.DomainBlock,
) gtserror.MultiError {
l := log.
WithContext(ctx).
WithFields(kv.Fields{
@ -423,10 +392,12 @@ func (p *Processor) domainUnblockSideEffects(ctx context.Context, block *gtsmode
}...)
l.Debug("processing domain unblock side effects")
var errs gtserror.MultiError
// Update instance entry for this domain, if we have it.
instance, err := p.state.DB.GetInstance(ctx, block.Domain)
if err != nil && !errors.Is(err, db.ErrNoEntries) {
l.Errorf("db error getting instance %s: %q", block.Domain, err)
errs.Appendf("db error getting instance %s: %w", block.Domain, err)
}
if instance != nil {
@ -440,23 +411,23 @@ func (p *Processor) domainUnblockSideEffects(ctx context.Context, block *gtsmode
"suspended_at",
"domain_block_id",
); err != nil {
l.Errorf("db error updating instance: %s", err)
} else {
l.Debug("instance entry updated")
errs.Appendf("db error updating instance: %w", err)
return errs
}
l.Debug("instance entry updated")
}
// Unsuspend all accounts whose suspension origin was this domain block.
if err := p.rangeAccounts(ctx, block.Domain, func(account *gtsmodel.Account) error {
if err := p.rangeDomainAccounts(ctx, block.Domain, func(account *gtsmodel.Account) {
if account.SuspensionOrigin == "" || account.SuspendedAt.IsZero() {
// Account wasn't suspended, nothing to do.
return nil
return
}
if account.SuspensionOrigin != block.ID {
// Account was suspended, but not by
// this domain block, leave it alone.
return nil
return
}
// Account was suspended by this domain
@ -470,11 +441,11 @@ func (p *Processor) domainUnblockSideEffects(ctx context.Context, block *gtsmode
"suspended_at",
"suspension_origin",
); err != nil {
return gtserror.Newf("db error updating account %s: %w", account.Username, err)
errs.Appendf("db error updating account %s: %w", account.Username, err)
}
return nil
}); err != nil {
l.Errorf("error while ranging through accounts: %q", err)
errs.Appendf("db error ranging through accounts: %w", err)
}
return errs
}