Files
GoToSocial/internal/processing/workers/util.go
kim 6a6a499333 [performance] rewrite timelines to rely on new timeline cache type (#3941)
* start work rewriting timeline cache type

* further work rewriting timeline caching

* more work integration new timeline code

* remove old code

* add local timeline, fix up merge conflicts

* remove old use of go-bytes

* implement new timeline code into more areas of codebase, pull in latest go-mangler, go-mutexes, go-structr

* remove old timeline package, add local timeline cache

* remove references to old timeline types that needed starting up in tests

* start adding page validation

* fix test-identified timeline cache package issues

* fix up more tests, fix missing required changes, etc

* add exclusion for test.out in gitignore

* clarify some things better in code comments

* tweak cache size limits

* fix list timeline cache fetching

* further list timeline fixes

* linter, ssssssssshhhhhhhhhhhh please

* fix linter hints

* reslice the output if it's beyond length of 'lim'

* remove old timeline initialization code, bump go-structr to v0.9.4

* continued from previous commit

* improved code comments

* don't allow multiple entries for BoostOfID values to prevent repeated boosts of same boosts

* finish writing more code comments

* some variable renaming, for ease of following

* change the way we update lo,hi paging values during timeline load

* improved code comments for updated / returned lo , hi paging values

* finish writing code comments for the StatusTimeline{} type itself

* fill in more code comments

* update go-structr version to latest with changed timeline unique indexing logic

* have a local and public timeline *per user*

* rewrite calls to public / local timeline calls

* remove the zero length check, as lo, hi values might still be set

* simplify timeline cache loading, fix lo/hi returns, fix timeline invalidation side-effects missing for some federated actions

* swap the lo, hi values 🤦

* add (now) missing slice reverse of tag timeline statuses when paging ASC

* remove local / public caches (is out of scope for this work), share more timeline code

* remove unnecessary change

* again, remove more unused code

* remove unused function to appease the linter

* move boost checking to prepare function

* fix use of timeline.lastOrder, fix incorrect range functions used

* remove comments for repeat code

* remove the boost logic from prepare function

* do a maximum of 5 loads, not 10

* add repeat boost filtering logic, update go-structr, general improvements

* more code comments

* add important note

* fix timeline tests now that timelines are returned in page order

* remove unused field

* add StatusTimeline{} tests

* add more status timeline tests

* start adding preloading support

* ensure repeat boosts are marked in preloaded entries

* share a bunch of the database load code in timeline cache, don't clear timelines on relationship change

* add logic to allow dynamic clear / preloading of timelines

* comment-out unused functions, but leave in place as we might end-up using them

* fix timeline preload state check

* much improved status timeline code comments

* more code comments, don't bother inserting statuses if timeline not preloaded

* shift around some logic to make sure things aren't accidentally left set

* finish writing code comments

* remove trim-after-insert behaviour

* fix-up some comments referring to old logic

* remove unsetting of lo, hi

* fix preload repeatBoost checking logic

* don't return on status filter errors, these are usually transient

* better concurrency safety in Clear() and Done()

* fix test broken due to addition of preloader

* fix repeatBoost logic that doesn't account for already-hidden repeatBoosts

* ensure edit submodels are dropped on cache insertion

* update code-comment to expand CAS accronym

* use a plus1hULID() instead of 24h

* remove unused functions

* add note that public / local timeline requester can be nil

* fix incorrect visibility filtering of tag timeline statuses

* ensure we filter home timeline statuses on local only

* some small re-orderings to confirm query params in correct places

* fix the local only home timeline filter func
2025-04-26 09:56:15 +00:00

657 lines
19 KiB
Go

// GoToSocial
// Copyright (C) GoToSocial Authors admin@gotosocial.org
// SPDX-License-Identifier: AGPL-3.0-or-later
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package workers
import (
"context"
"errors"
apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model"
"github.com/superseriousbusiness/gotosocial/internal/db"
"github.com/superseriousbusiness/gotosocial/internal/gtscontext"
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/log"
"github.com/superseriousbusiness/gotosocial/internal/processing/account"
"github.com/superseriousbusiness/gotosocial/internal/processing/media"
"github.com/superseriousbusiness/gotosocial/internal/state"
"github.com/superseriousbusiness/gotosocial/internal/typeutils"
"github.com/superseriousbusiness/gotosocial/internal/util"
)
// util provides util functions used by both
// the fromClientAPI and fromFediAPI functions.
type utils struct {
state *state.State
media *media.Processor
account *account.Processor
surface *Surface
converter *typeutils.Converter
}
// wipeStatus encapsulates common logic used to
// totally delete a status + all its attachments,
// notifications, boosts, and timeline entries.
//
// If deleteAttachments is true, then any status
// attachments will also be deleted, else they
// will just be detached.
//
// If copyToSinBin is true, then a version of the
// status will be put in the `sin_bin_statuses`
// table prior to deletion.
func (u *utils) wipeStatus(
ctx context.Context,
status *gtsmodel.Status,
deleteAttachments bool,
copyToSinBin bool,
) error {
var errs gtserror.MultiError
if copyToSinBin {
// Copy this status to the sin bin before we delete it.
sbStatus, err := u.converter.StatusToSinBinStatus(ctx, status)
if err != nil {
errs.Appendf("error converting status to sinBinStatus: %w", err)
} else {
if err := u.state.DB.PutSinBinStatus(ctx, sbStatus); err != nil {
errs.Appendf("db error storing sinBinStatus: %w", err)
}
}
}
// Before handling media, ensure
// historic edits are populated.
if !status.EditsPopulated() {
var err error
// Fetch all historical edits of status from database.
status.Edits, err = u.state.DB.GetStatusEditsByIDs(
gtscontext.SetBarebones(ctx),
status.EditIDs,
)
if err != nil {
errs.Appendf("error getting status edits from database: %w", err)
}
}
// Either delete all attachments for this status,
// or simply detach + clean them separately later.
//
// Reason to detach rather than delete is that
// the author might want to reattach them to another
// status immediately (in case of delete + redraft).
if deleteAttachments {
// todo:u.state.DB.DeleteAttachmentsForStatus
for _, id := range status.AllAttachmentIDs() {
if err := u.media.Delete(ctx, id); err != nil {
errs.Appendf("error deleting media: %w", err)
}
}
} else {
// todo:u.state.DB.UnattachAttachmentsForStatus
for _, id := range status.AllAttachmentIDs() {
if _, err := u.media.Unattach(ctx, status.Account, id); err != nil {
errs.Appendf("error unattaching media: %w", err)
}
}
}
// Delete all historical edits of status.
if ids := status.EditIDs; len(ids) > 0 {
if err := u.state.DB.DeleteStatusEdits(ctx, ids); err != nil {
errs.Appendf("error deleting status edits: %w", err)
}
}
// Delete all mentions generated by this status.
// todo:u.state.DB.DeleteMentionsForStatus
for _, id := range status.MentionIDs {
if err := u.state.DB.DeleteMentionByID(ctx, id); err != nil {
errs.Appendf("error deleting status mention: %w", err)
}
}
// Delete all notifications generated by this status.
if err := u.state.DB.DeleteNotificationsForStatus(ctx, status.ID); err != nil {
errs.Appendf("error deleting status notifications: %w", err)
}
// Delete all bookmarks of this status.
if err := u.state.DB.DeleteStatusBookmarksForStatus(ctx, status.ID); err != nil {
errs.Appendf("error deleting status bookmarks: %w", err)
}
// Delete all faves of this status.
if err := u.state.DB.DeleteStatusFavesForStatus(ctx, status.ID); err != nil {
errs.Appendf("error deleting status faves: %w", err)
}
if id := status.PollID; id != "" {
// Delete this poll by ID from the database.
if err := u.state.DB.DeletePollByID(ctx, id); err != nil {
errs.Appendf("error deleting status poll: %w", err)
}
// Cancel any scheduled expiry task for poll.
_ = u.state.Workers.Scheduler.Cancel(id)
}
// Get all boost of this status so that we can
// delete those boosts + remove them from timelines.
boosts, err := u.state.DB.GetStatusBoosts(
// We MUST set a barebones context here,
// as depending on where it came from the
// original BoostOf may already be gone.
gtscontext.SetBarebones(ctx),
status.ID)
if err != nil {
errs.Appendf("error fetching status boosts: %w", err)
}
for _, boost := range boosts {
// Delete the boost itself.
if err := u.state.DB.DeleteStatusByID(ctx, boost.ID); err != nil {
errs.Appendf("error deleting boost: %w", err)
}
// Remove the boost from any and all timelines.
u.surface.deleteStatusFromTimelines(ctx, boost.ID)
}
// Delete the status itself from any and all timelines.
u.surface.deleteStatusFromTimelines(ctx, status.ID)
// Delete this status from any conversations it's part of.
if err := u.state.DB.DeleteStatusFromConversations(ctx, status.ID); err != nil {
errs.Appendf("error deleting status from conversations: %w", err)
}
// Finally delete the status itself.
if err := u.state.DB.DeleteStatusByID(ctx, status.ID); err != nil {
errs.Appendf("error deleting status: %w", err)
}
return errs.Combine()
}
// redirectFollowers redirects all local
// followers of originAcct to targetAcct.
//
// Both accounts must be fully dereferenced
// already, and the Move must be valid.
//
// Return bool will be true if all goes OK.
func (u *utils) redirectFollowers(
ctx context.Context,
originAcct *gtsmodel.Account,
targetAcct *gtsmodel.Account,
) bool {
// Any local followers of originAcct should
// send follow requests to targetAcct instead,
// and have followers of originAcct removed.
//
// Select local followers with barebones, since
// we only need follow.Account and we can get
// that ourselves.
followers, err := u.state.DB.GetAccountLocalFollowers(
gtscontext.SetBarebones(ctx),
originAcct.ID,
)
if err != nil && !errors.Is(err, db.ErrNoEntries) {
log.Errorf(ctx,
"db error getting follows targeting originAcct: %v",
err,
)
return false
}
for _, follow := range followers {
// Fetch the local account that
// owns the follow targeting originAcct.
if follow.Account, err = u.state.DB.GetAccountByID(
gtscontext.SetBarebones(ctx),
follow.AccountID,
); err != nil {
log.Errorf(ctx,
"db error getting follow account %s: %v",
follow.AccountID, err,
)
return false
}
// Use the account processor FollowCreate
// function to send off the new follow,
// carrying over the Reblogs and Notify
// values from the old follow to the new.
//
// This will also handle cases where our
// account has already followed the target
// account, by just updating the existing
// follow of target account.
//
// Also, ensure new follow wouldn't be a
// self follow, since that will error.
if follow.AccountID != targetAcct.ID {
if _, err := u.account.FollowCreate(
ctx,
follow.Account,
&apimodel.AccountFollowRequest{
ID: targetAcct.ID,
Reblogs: follow.ShowReblogs,
Notify: follow.Notify,
},
); err != nil {
log.Errorf(ctx,
"error creating new follow for account %s: %v",
follow.AccountID, err,
)
return false
}
}
// New follow is in the process of
// sending, remove the existing follow.
// This will send out an Undo Activity for each Follow.
if _, err := u.account.FollowRemove(
ctx,
follow.Account,
follow.TargetAccountID,
); err != nil {
log.Errorf(ctx,
"error removing old follow for account %s: %v",
follow.AccountID, err,
)
return false
}
}
return true
}
func (u *utils) incrementStatusesCount(
ctx context.Context,
account *gtsmodel.Account,
status *gtsmodel.Status,
) error {
// Lock on this account since we're changing stats.
unlock := u.state.ProcessingLocks.Lock(account.URI)
defer unlock()
// Ensure account stats are populated.
if err := u.state.DB.PopulateAccountStats(ctx, account); err != nil {
return gtserror.Newf("db error getting account stats: %w", err)
}
// Update status meta for account.
*account.Stats.StatusesCount++
account.Stats.LastStatusAt = status.CreatedAt
// Update details in the database for stats.
if err := u.state.DB.UpdateAccountStats(ctx,
account.Stats,
"statuses_count",
"last_status_at",
); err != nil {
return gtserror.Newf("db error updating account stats: %w", err)
}
return nil
}
func (u *utils) decrementStatusesCount(
ctx context.Context,
account *gtsmodel.Account,
status *gtsmodel.Status,
) error {
// Lock on this account since we're changing stats.
unlock := u.state.ProcessingLocks.Lock(account.URI)
defer unlock()
// Ensure account stats are populated.
if err := u.state.DB.PopulateAccountStats(ctx, account); err != nil {
return gtserror.Newf("db error getting account stats: %w", err)
}
// Update status meta for account (safely checking for zero value).
*account.Stats.StatusesCount = util.Decr(*account.Stats.StatusesCount)
if !status.PinnedAt.IsZero() {
// Update status pinned count for account (safely checking for zero value).
*account.Stats.StatusesPinnedCount = util.Decr(*account.Stats.StatusesPinnedCount)
}
// Update details in the database for stats.
if err := u.state.DB.UpdateAccountStats(ctx,
account.Stats,
"statuses_count",
"statuses_pinned_count",
); err != nil {
return gtserror.Newf("db error updating account stats: %w", err)
}
return nil
}
func (u *utils) incrementFollowersCount(
ctx context.Context,
account *gtsmodel.Account,
) error {
// Lock on this account since we're changing stats.
unlock := u.state.ProcessingLocks.Lock(account.URI)
defer unlock()
// Ensure account stats are populated.
if err := u.state.DB.PopulateAccountStats(ctx, account); err != nil {
return gtserror.Newf("db error getting account stats: %w", err)
}
// Update stats by incrementing followers
// count by one and setting last posted.
*account.Stats.FollowersCount++
if err := u.state.DB.UpdateAccountStats(
ctx,
account.Stats,
"followers_count",
); err != nil {
return gtserror.Newf("db error updating account stats: %w", err)
}
return nil
}
func (u *utils) decrementFollowersCount(
ctx context.Context,
account *gtsmodel.Account,
) error {
// Lock on this account since we're changing stats.
unlock := u.state.ProcessingLocks.Lock(account.URI)
defer unlock()
// Ensure account stats are populated.
if err := u.state.DB.PopulateAccountStats(ctx, account); err != nil {
return gtserror.Newf("db error getting account stats: %w", err)
}
// Update stats by decrementing
// followers count by one.
//
// Clamp to 0 to avoid funny business.
*account.Stats.FollowersCount--
if *account.Stats.FollowersCount < 0 {
*account.Stats.FollowersCount = 0
}
if err := u.state.DB.UpdateAccountStats(
ctx,
account.Stats,
"followers_count",
); err != nil {
return gtserror.Newf("db error updating account stats: %w", err)
}
return nil
}
func (u *utils) incrementFollowingCount(
ctx context.Context,
account *gtsmodel.Account,
) error {
// Lock on this account since we're changing stats.
unlock := u.state.ProcessingLocks.Lock(account.URI)
defer unlock()
// Ensure account stats are populated.
if err := u.state.DB.PopulateAccountStats(ctx, account); err != nil {
return gtserror.Newf("db error getting account stats: %w", err)
}
// Update stats by incrementing
// followers count by one.
*account.Stats.FollowingCount++
if err := u.state.DB.UpdateAccountStats(
ctx,
account.Stats,
"following_count",
); err != nil {
return gtserror.Newf("db error updating account stats: %w", err)
}
return nil
}
func (u *utils) decrementFollowingCount(
ctx context.Context,
account *gtsmodel.Account,
) error {
// Lock on this account since we're changing stats.
unlock := u.state.ProcessingLocks.Lock(account.URI)
defer unlock()
// Ensure account stats are populated.
if err := u.state.DB.PopulateAccountStats(ctx, account); err != nil {
return gtserror.Newf("db error getting account stats: %w", err)
}
// Update stats by decrementing
// following count by one.
//
// Clamp to 0 to avoid funny business.
*account.Stats.FollowingCount--
if *account.Stats.FollowingCount < 0 {
*account.Stats.FollowingCount = 0
}
if err := u.state.DB.UpdateAccountStats(
ctx,
account.Stats,
"following_count",
); err != nil {
return gtserror.Newf("db error updating account stats: %w", err)
}
return nil
}
func (u *utils) incrementFollowRequestsCount(
ctx context.Context,
account *gtsmodel.Account,
) error {
// Lock on this account since we're changing stats.
unlock := u.state.ProcessingLocks.Lock(account.URI)
defer unlock()
// Ensure account stats are populated.
if err := u.state.DB.PopulateAccountStats(ctx, account); err != nil {
return gtserror.Newf("db error getting account stats: %w", err)
}
// Update stats by incrementing
// follow requests count by one.
*account.Stats.FollowRequestsCount++
if err := u.state.DB.UpdateAccountStats(
ctx,
account.Stats,
"follow_requests_count",
); err != nil {
return gtserror.Newf("db error updating account stats: %w", err)
}
return nil
}
func (u *utils) decrementFollowRequestsCount(
ctx context.Context,
account *gtsmodel.Account,
) error {
// Lock on this account since we're changing stats.
unlock := u.state.ProcessingLocks.Lock(account.URI)
defer unlock()
// Ensure account stats are populated.
if err := u.state.DB.PopulateAccountStats(ctx, account); err != nil {
return gtserror.Newf("db error getting account stats: %w", err)
}
// Update stats by decrementing
// follow requests count by one.
//
// Clamp to 0 to avoid funny business.
*account.Stats.FollowRequestsCount--
if *account.Stats.FollowRequestsCount < 0 {
*account.Stats.FollowRequestsCount = 0
}
if err := u.state.DB.UpdateAccountStats(
ctx,
account.Stats,
"follow_requests_count",
); err != nil {
return gtserror.Newf("db error updating account stats: %w", err)
}
return nil
}
// requestFave stores an interaction request
// for the given fave, and notifies the interactee.
func (u *utils) requestFave(
ctx context.Context,
fave *gtsmodel.StatusFave,
) error {
// Only create interaction request
// if fave targets a local status.
if fave.Status == nil ||
!fave.Status.IsLocal() {
return nil
}
// Lock on the interaction URI.
unlock := u.state.ProcessingLocks.Lock(fave.URI)
defer unlock()
// Ensure no req with this URI exists already.
req, err := u.state.DB.GetInteractionRequestByInteractionURI(ctx, fave.URI)
if err != nil && !errors.Is(err, db.ErrNoEntries) {
return gtserror.Newf("db error checking for existing interaction request: %w", err)
}
if req != nil {
// Interaction req already exists,
// no need to do anything else.
return nil
}
// Create + store new interaction request.
req = typeutils.StatusFaveToInteractionRequest(fave)
if err := u.state.DB.PutInteractionRequest(ctx, req); err != nil {
return gtserror.Newf("db error storing interaction request: %w", err)
}
// Notify *local* account of pending announce.
if err := u.surface.notifyPendingFave(ctx, fave); err != nil {
return gtserror.Newf("error notifying pending fave: %w", err)
}
return nil
}
// requestReply stores an interaction request
// for the given reply, and notifies the interactee.
func (u *utils) requestReply(
ctx context.Context,
reply *gtsmodel.Status,
) error {
// Only create interaction request if
// status replies to a local status.
if reply.InReplyTo == nil ||
!reply.InReplyTo.IsLocal() {
return nil
}
// Lock on the interaction URI.
unlock := u.state.ProcessingLocks.Lock(reply.URI)
defer unlock()
// Ensure no req with this URI exists already.
req, err := u.state.DB.GetInteractionRequestByInteractionURI(ctx, reply.URI)
if err != nil && !errors.Is(err, db.ErrNoEntries) {
return gtserror.Newf("db error checking for existing interaction request: %w", err)
}
if req != nil {
// Interaction req already exists,
// no need to do anything else.
return nil
}
// Create + store interaction request.
req = typeutils.StatusToInteractionRequest(reply)
if err := u.state.DB.PutInteractionRequest(ctx, req); err != nil {
return gtserror.Newf("db error storing interaction request: %w", err)
}
// Notify *local* account of pending reply.
if err := u.surface.notifyPendingReply(ctx, reply); err != nil {
return gtserror.Newf("error notifying pending reply: %w", err)
}
return nil
}
// requestAnnounce stores an interaction request
// for the given announce, and notifies the interactee.
func (u *utils) requestAnnounce(
ctx context.Context,
boost *gtsmodel.Status,
) error {
// Only create interaction request if
// status announces a local status.
if boost.BoostOf == nil ||
!boost.BoostOf.IsLocal() {
return nil
}
// Lock on the interaction URI.
unlock := u.state.ProcessingLocks.Lock(boost.URI)
defer unlock()
// Ensure no req with this URI exists already.
req, err := u.state.DB.GetInteractionRequestByInteractionURI(ctx, boost.URI)
if err != nil && !errors.Is(err, db.ErrNoEntries) {
return gtserror.Newf("db error checking for existing interaction request: %w", err)
}
if req != nil {
// Interaction req already exists,
// no need to do anything else.
return nil
}
// Create + store interaction request.
req = typeutils.StatusToInteractionRequest(boost)
if err := u.state.DB.PutInteractionRequest(ctx, req); err != nil {
return gtserror.Newf("db error storing interaction request: %w", err)
}
// Notify *local* account of pending announce.
if err := u.surface.notifyPendingAnnounce(ctx, boost); err != nil {
return gtserror.Newf("error notifying pending announce: %w", err)
}
return nil
}