From 6ac1dda96f2e0230159fdfcc05195a94bda36d21 Mon Sep 17 00:00:00 2001 From: kim <89579420+NyaaaWhatsUpDoc@users.noreply.github.com> Date: Fri, 10 Feb 2023 20:15:23 +0000 Subject: [PATCH] [chore] small changes missed in previous dereferencer.GetAccount() PRs (#1467) * small formatting changes, rewrite fetchRemoteMedia to use separate funcs + use mutex lock correctly * move url parsing before acquiring mutex locks * use wrapped mutexes to allow safe unlocking. (previously i did a fucky and passed mutex by value...) * remove unused code * use consistent map keying for dereferencing headers/avatars --------- Signed-off-by: kim --- internal/federation/dereferencing/account.go | 165 ++++++++++++------ .../federation/dereferencing/dereferencer.go | 47 ++--- internal/federation/dereferencing/emoji.go | 33 ++-- internal/federation/dereferencing/util.go | 31 ---- 4 files changed, 152 insertions(+), 124 deletions(-) delete mode 100644 internal/federation/dereferencing/util.go diff --git a/internal/federation/dereferencing/account.go b/internal/federation/dereferencing/account.go index 14864c1b5..82b69c7a1 100644 --- a/internal/federation/dereferencing/account.go +++ b/internal/federation/dereferencing/account.go @@ -146,6 +146,7 @@ func (d *deref) enrichAccount(ctx context.Context, requestUser string, uri *url. } } + // Pre-fetch a transport for requesting username, used by later deref procedures. transport, err := d.transportController.NewTransportForUsername(ctx, requestUser) if err != nil { return nil, fmt.Errorf("enrichAccount: couldn't create transport: %w", err) @@ -163,19 +164,14 @@ func (d *deref) enrichAccount(ctx context.Context, requestUser string, uri *url. if err == nil { if account.Domain != accDomain { - // We have the correct accountDomain now; if it was different from - // the account domain we were provided, do another db lookup to check - // if we already had the account in the db under the account domain we - // just discovered, otherwise we risk thinking this is a new account - // and trying to put it into the database again (which will cause issues). + // After webfinger, we now have correct account domain from which we can do a final DB check. alreadyAccount, err := d.db.GetAccountByUsernameDomain(ctx, account.Username, accDomain) if err != nil && !errors.Is(err, db.ErrNoEntries) { return nil, fmt.Errorf("enrichAccount: db err looking for account again after webfinger: %w", err) } if err == nil { - // We already had the account in the database; - // continue by enriching that one instead. + // Enrich existing account. account = alreadyAccount } } @@ -197,14 +193,14 @@ func (d *deref) enrichAccount(ctx context.Context, requestUser string, uri *url. } } - // Check whether this account URI is a blocked domain / subdomain + // Check whether this account URI is a blocked domain / subdomain. if blocked, err := d.db.IsDomainBlocked(ctx, uri.Host); err != nil { return nil, newErrDB(fmt.Errorf("enrichAccount: error checking blocked domain: %w", err)) } else if blocked { return nil, fmt.Errorf("enrichAccount: %s is blocked", uri.Host) } - // Mark deref+update handshake start + // Mark deref+update handshake start. d.startHandshake(requestUser, uri) defer d.stopHandshake(requestUser, uri) @@ -225,7 +221,7 @@ func (d *deref) enrichAccount(ctx context.Context, requestUser string, uri *url. if account.Username == "" { // No username was provided, so no webfinger was attempted earlier. // - // Now we have a username we can attempt it now, this ensures up-to-date accountdomain info. + // Now we have a username we can attempt it, this ensures up-to-date accountdomain info. accDomain, _, err := d.fingerRemoteAccount(ctx, transport, latestAcc.Username, uri.Host) if err == nil { @@ -238,32 +234,32 @@ func (d *deref) enrichAccount(ctx context.Context, requestUser string, uri *url. latestAcc.ID = account.ID latestAcc.FetchedAt = time.Now() - // Fetch latest account avatar only if remote URI has changed - if latestAcc.AvatarRemoteURL != "" && latestAcc.AvatarRemoteURL != account.AvatarRemoteURL { - d.dereferencingAvatarsLock.Lock() - newAvatarID, err := d.fetchRemoteAccountMedia(ctx, transport, latestAcc.AvatarRemoteURL, latestAcc.ID, d.dereferencingAvatars, true, false) - d.dereferencingAvatarsLock.Unlock() + // Use the existing account media attachments by default. + latestAcc.AvatarMediaAttachmentID = account.AvatarMediaAttachmentID + latestAcc.HeaderMediaAttachmentID = account.HeaderMediaAttachmentID + + if latestAcc.AvatarRemoteURL != account.AvatarRemoteURL && latestAcc.AvatarRemoteURL != "" { + // Account avatar URL has changed; fetch up-to-date copy and use new media ID. + latestAcc.AvatarMediaAttachmentID, err = d.fetchRemoteAccountAvatar(ctx, + transport, + latestAcc.AvatarRemoteURL, + latestAcc.ID, + ) if err != nil { log.Errorf("error fetching remote avatar for account %s: %v", uri, err) - } else { - latestAcc.AvatarMediaAttachmentID = newAvatarID } - } else { - latestAcc.AvatarMediaAttachmentID = account.AvatarMediaAttachmentID // no change / empty url } - // Fetch latest account header only if remote URI has changed - if latestAcc.AvatarRemoteURL != "" && latestAcc.AvatarRemoteURL != account.AvatarRemoteURL { - d.dereferencingHeadersLock.Lock() - newHeaderID, err := d.fetchRemoteAccountMedia(ctx, transport, latestAcc.HeaderRemoteURL, latestAcc.ID, d.dereferencingHeaders, false, true) - d.dereferencingHeadersLock.Unlock() + if latestAcc.HeaderRemoteURL != account.HeaderRemoteURL && latestAcc.HeaderRemoteURL != "" { + // Account header URL has changed; fetch up-to-date copy and use new media ID. + latestAcc.HeaderMediaAttachmentID, err = d.fetchRemoteAccountHeader(ctx, + transport, + latestAcc.HeaderRemoteURL, + latestAcc.ID, + ) if err != nil { log.Errorf("error fetching remote header for account %s: %v", uri, err) - } else { - latestAcc.HeaderMediaAttachmentID = newHeaderID } - } else { - latestAcc.HeaderMediaAttachmentID = account.HeaderMediaAttachmentID // no change / empty url } // Fetch the latest remote account emoji IDs used in account display name/bio. @@ -338,47 +334,106 @@ func (d *deref) dereferenceAccountable(ctx context.Context, transport transport. return nil, newErrWrongType(fmt.Errorf("DereferenceAccountable: type name %s not supported as Accountable", t.GetTypeName())) } -func (d *deref) fetchRemoteAccountMedia( - ctx context.Context, - transport transport.Transport, - mediaRemoteURL string, - targetAccountID string, - dereferencingMap map[string]*media.ProcessingMedia, - avatar bool, - header bool, -) (string, error) { - // first check if we're already processing this media - if alreadyProcessing, ok := dereferencingMap[targetAccountID]; ok { - // we're already on it, nothing else to do - return alreadyProcessing.AttachmentID(), nil - } - - avatarIRI, err := url.Parse(mediaRemoteURL) +func (d *deref) fetchRemoteAccountAvatar(ctx context.Context, tsport transport.Transport, avatarURL string, accountID string) (string, error) { + // Parse and validate provided media URL. + avatarURI, err := url.Parse(avatarURL) if err != nil { return "", err } - data := func(innerCtx context.Context) (io.ReadCloser, int64, error) { - return transport.DereferenceMedia(innerCtx, avatarIRI) + // Acquire lock for derefs map. + unlock := d.derefAvatarsMu.Lock() + defer unlock() + + if processing, ok := d.derefAvatars[avatarURL]; ok { + // we're already dereferencing it, nothing to do. + return processing.AttachmentID(), nil } - processingMedia, err := d.mediaManager.ProcessMedia(ctx, data, nil, targetAccountID, &media.AdditionalMediaInfo{ - RemoteURL: &mediaRemoteURL, - Avatar: &avatar, - Header: &header, + // Set the media data function to dereference avatar from URI. + data := func(ctx context.Context) (io.ReadCloser, int64, error) { + return tsport.DereferenceMedia(ctx, avatarURI) + } + + // Create new media processing request from the media manager instance. + processing, err := d.mediaManager.ProcessMedia(ctx, data, nil, accountID, &media.AdditionalMediaInfo{ + Avatar: func() *bool { v := false; return &v }(), + RemoteURL: &avatarURL, }) if err != nil { return "", err } - // store it in our map to indicate it's in process - dereferencingMap[targetAccountID] = processingMedia - defer delete(dereferencingMap, targetAccountID) - if _, err := processingMedia.LoadAttachment(ctx); err != nil { + // Store media in map to mark as processing. + d.derefAvatars[avatarURL] = processing + + // Unlock map. + unlock() + + defer func() { + // On exit safely remove media from map. + unlock := d.derefAvatarsMu.Lock() + delete(d.derefAvatars, avatarURL) + unlock() + }() + + // Start media attachment loading (blocking call). + if _, err := processing.LoadAttachment(ctx); err != nil { return "", err } - return processingMedia.AttachmentID(), nil + return processing.AttachmentID(), nil +} + +func (d *deref) fetchRemoteAccountHeader(ctx context.Context, tsport transport.Transport, headerURL string, accountID string) (string, error) { + // Parse and validate provided media URL. + headerURI, err := url.Parse(headerURL) + if err != nil { + return "", err + } + + // Acquire lock for derefs map. + unlock := d.derefHeadersMu.Lock() + defer unlock() + + if processing, ok := d.derefHeaders[headerURL]; ok { + // we're already dereferencing it, nothing to do. + return processing.AttachmentID(), nil + } + + // Set the media data function to dereference header from URI. + data := func(ctx context.Context) (io.ReadCloser, int64, error) { + return tsport.DereferenceMedia(ctx, headerURI) + } + + // Create new media processing request from the media manager instance. + processing, err := d.mediaManager.ProcessMedia(ctx, data, nil, accountID, &media.AdditionalMediaInfo{ + Header: func() *bool { v := true; return &v }(), + RemoteURL: &headerURL, + }) + if err != nil { + return "", err + } + + // Store media in map to mark as processing. + d.derefHeaders[headerURL] = processing + + // Unlock map. + unlock() + + defer func() { + // On exit safely remove media from map. + unlock := d.derefHeadersMu.Lock() + delete(d.derefHeaders, headerURL) + unlock() + }() + + // Start media attachment loading (blocking call). + if _, err := processing.LoadAttachment(ctx); err != nil { + return "", err + } + + return processing.AttachmentID(), nil } func (d *deref) fetchRemoteAccountEmojis(ctx context.Context, targetAccount *gtsmodel.Account, requestingUsername string) (bool, error) { diff --git a/internal/federation/dereferencing/dereferencer.go b/internal/federation/dereferencing/dereferencer.go index 122cbb59c..05a7c39c2 100644 --- a/internal/federation/dereferencing/dereferencer.go +++ b/internal/federation/dereferencing/dereferencer.go @@ -23,6 +23,7 @@ import ( "net/url" "sync" + "codeberg.org/gruf/go-mutexes" "github.com/superseriousbusiness/gotosocial/internal/ap" "github.com/superseriousbusiness/gotosocial/internal/db" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" @@ -58,30 +59,36 @@ type Dereferencer interface { } type deref struct { - db db.DB - typeConverter typeutils.TypeConverter - transportController transport.Controller - mediaManager media.Manager - dereferencingAvatars map[string]*media.ProcessingMedia - dereferencingAvatarsLock sync.Mutex - dereferencingHeaders map[string]*media.ProcessingMedia - dereferencingHeadersLock sync.Mutex - dereferencingEmojis map[string]*media.ProcessingEmoji - dereferencingEmojisLock sync.Mutex - handshakes map[string][]*url.URL - handshakeSync sync.Mutex // mutex to lock/unlock when checking or updating the handshakes map + db db.DB + typeConverter typeutils.TypeConverter + transportController transport.Controller + mediaManager media.Manager + derefAvatars map[string]*media.ProcessingMedia + derefAvatarsMu mutexes.Mutex + derefHeaders map[string]*media.ProcessingMedia + derefHeadersMu mutexes.Mutex + derefEmojis map[string]*media.ProcessingEmoji + derefEmojisMu mutexes.Mutex + handshakes map[string][]*url.URL + handshakeSync sync.Mutex // mutex to lock/unlock when checking or updating the handshakes map } // NewDereferencer returns a Dereferencer initialized with the given parameters. func NewDereferencer(db db.DB, typeConverter typeutils.TypeConverter, transportController transport.Controller, mediaManager media.Manager) Dereferencer { return &deref{ - db: db, - typeConverter: typeConverter, - transportController: transportController, - mediaManager: mediaManager, - dereferencingAvatars: make(map[string]*media.ProcessingMedia), - dereferencingHeaders: make(map[string]*media.ProcessingMedia), - dereferencingEmojis: make(map[string]*media.ProcessingEmoji), - handshakes: make(map[string][]*url.URL), + db: db, + typeConverter: typeConverter, + transportController: transportController, + mediaManager: mediaManager, + derefAvatars: make(map[string]*media.ProcessingMedia), + derefHeaders: make(map[string]*media.ProcessingMedia), + derefEmojis: make(map[string]*media.ProcessingEmoji), + handshakes: make(map[string][]*url.URL), + + // use wrapped mutexes to allow safely deferring unlock + // even when more granular locks are required (only unlocks once). + derefAvatarsMu: mutexes.WithSafety(mutexes.New()), + derefHeadersMu: mutexes.WithSafety(mutexes.New()), + derefEmojisMu: mutexes.WithSafety(mutexes.New()), } } diff --git a/internal/federation/dereferencing/emoji.go b/internal/federation/dereferencing/emoji.go index 4a44d716a..3e9452f14 100644 --- a/internal/federation/dereferencing/emoji.go +++ b/internal/federation/dereferencing/emoji.go @@ -37,23 +37,23 @@ func (d *deref) GetRemoteEmoji(ctx context.Context, requestingUsername string, r processingEmoji *media.ProcessingEmoji ) - d.dereferencingEmojisLock.Lock() // LOCK HERE + // Acquire lock for derefs map. + unlock := d.derefEmojisMu.Lock() + defer unlock() // first check if we're already processing this emoji - if alreadyProcessing, ok := d.dereferencingEmojis[shortcodeDomain]; ok { + if alreadyProcessing, ok := d.derefEmojis[shortcodeDomain]; ok { // we're already on it, no worries processingEmoji = alreadyProcessing } else { // not processing it yet, let's start t, err := d.transportController.NewTransportForUsername(ctx, requestingUsername) if err != nil { - d.dereferencingEmojisLock.Unlock() return nil, fmt.Errorf("GetRemoteEmoji: error creating transport to fetch emoji %s: %s", shortcodeDomain, err) } derefURI, err := url.Parse(remoteURL) if err != nil { - d.dereferencingEmojisLock.Unlock() return nil, fmt.Errorf("GetRemoteEmoji: error parsing url for emoji %s: %s", shortcodeDomain, err) } @@ -63,29 +63,26 @@ func (d *deref) GetRemoteEmoji(ctx context.Context, requestingUsername string, r newProcessing, err := d.mediaManager.ProcessEmoji(ctx, dataFunc, nil, shortcode, id, emojiURI, ai, refresh) if err != nil { - d.dereferencingEmojisLock.Unlock() return nil, fmt.Errorf("GetRemoteEmoji: error processing emoji %s: %s", shortcodeDomain, err) } // store it in our map to indicate it's in process - d.dereferencingEmojis[shortcodeDomain] = newProcessing + d.derefEmojis[shortcodeDomain] = newProcessing processingEmoji = newProcessing } - d.dereferencingEmojisLock.Unlock() + // Unlock map. + unlock() - load := func(innerCtx context.Context) error { - _, err := processingEmoji.LoadEmoji(innerCtx) - return err - } + defer func() { + // On exit safely remove emoji from map. + unlock := d.derefEmojisMu.Lock() + delete(d.derefEmojis, shortcodeDomain) + unlock() + }() - cleanup := func() { - d.dereferencingEmojisLock.Lock() - delete(d.dereferencingHeaders, shortcodeDomain) - d.dereferencingEmojisLock.Unlock() - } - - if err := loadAndCleanup(ctx, load, cleanup); err != nil { + // Start emoji attachment loading (blocking call). + if _, err := processingEmoji.LoadEmoji(ctx); err != nil { return nil, err } diff --git a/internal/federation/dereferencing/util.go b/internal/federation/dereferencing/util.go deleted file mode 100644 index c2a1729be..000000000 --- a/internal/federation/dereferencing/util.go +++ /dev/null @@ -1,31 +0,0 @@ -/* - GoToSocial - Copyright (C) 2021-2023 GoToSocial Authors admin@gotosocial.org - - This program is free software: you can redistribute it and/or modify - it under the terms of the GNU Affero General Public License as published by - the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Affero General Public License for more details. - - You should have received a copy of the GNU Affero General Public License - along with this program. If not, see . -*/ - -package dereferencing - -import ( - "context" -) - -func loadAndCleanup(ctx context.Context, load func(ctx context.Context) error, cleanup func()) error { - // whatever happens, clean up when we're done - defer cleanup() - - // try and load - return load(ctx) -}