diff --git a/cmd/gotosocial/action/server/server.go b/cmd/gotosocial/action/server/server.go index 844d46ca4..703df5e4b 100644 --- a/cmd/gotosocial/action/server/server.go +++ b/cmd/gotosocial/action/server/server.go @@ -271,7 +271,7 @@ var Start action.GTSAction = func(ctx context.Context) error { intFilter := interaction.NewFilter(state) spamFilter := spam.NewFilter(state) federatingDB := federatingdb.New(state, typeConverter, visFilter, intFilter, spamFilter) - transportController := transport.NewController(state, federatingDB, &federation.Clock{}, client) + transportController := transport.NewController(state, federatingDB, client) federator := federation.NewFederator( state, federatingDB, diff --git a/internal/federation/federatingdb/accept.go b/internal/federation/federatingdb/accept.go index 273b9255f..2e4948a0e 100644 --- a/internal/federation/federatingdb/accept.go +++ b/internal/federation/federatingdb/accept.go @@ -35,7 +35,7 @@ import ( "code.superseriousbusiness.org/gotosocial/internal/util" ) -func (f *federatingDB) GetAccept( +func (f *DB) GetAccept( ctx context.Context, acceptIRI *url.URL, ) (vocab.ActivityStreamsAccept, error) { @@ -46,7 +46,7 @@ func (f *federatingDB) GetAccept( return f.converter.InteractionReqToASAccept(ctx, approval) } -func (f *federatingDB) Accept(ctx context.Context, accept vocab.ActivityStreamsAccept) error { +func (f *DB) Accept(ctx context.Context, accept vocab.ActivityStreamsAccept) error { log.DebugKV(ctx, "accept", serialize{accept}) activityContext := getActivityContext(ctx) @@ -202,7 +202,7 @@ func (f *federatingDB) Accept(ctx context.Context, accept vocab.ActivityStreamsA return nil } -func (f *federatingDB) acceptFollowType( +func (f *DB) acceptFollowType( ctx context.Context, asType vocab.Type, receivingAcct *gtsmodel.Account, @@ -218,11 +218,6 @@ func (f *federatingDB) acceptFollowType( return gtserror.NewErrorInternalError(err) } - // Lock on the Follow URI - // as we may be updating it. - unlock := f.state.FedLocks.Lock(follow.URI) - defer unlock() - // Make sure the creator of the original follow // is the same as whatever inbox this landed in. if follow.AccountID != receivingAcct.ID { @@ -238,8 +233,7 @@ func (f *federatingDB) acceptFollowType( } // Accept and get the populated follow back. - follow, err = f.state.DB.AcceptFollowRequest( - ctx, + follow, err = f.state.DB.AcceptFollowRequest(ctx, follow.AccountID, follow.TargetAccountID, ) @@ -267,17 +261,12 @@ func (f *federatingDB) acceptFollowType( return nil } -func (f *federatingDB) acceptFollowIRI( +func (f *DB) acceptFollowIRI( ctx context.Context, objectIRI string, receivingAcct *gtsmodel.Account, requestingAcct *gtsmodel.Account, ) error { - // Lock on this potential Follow - // URI as we may be updating it. - unlock := f.state.FedLocks.Lock(objectIRI) - defer unlock() - // Get the follow req from the db. followReq, err := f.state.DB.GetFollowRequestByURI(ctx, objectIRI) if err != nil && !errors.Is(err, db.ErrNoEntries) { @@ -307,8 +296,7 @@ func (f *federatingDB) acceptFollowIRI( } // Accept and get the populated follow back. - follow, err := f.state.DB.AcceptFollowRequest( - ctx, + follow, err := f.state.DB.AcceptFollowRequest(ctx, followReq.AccountID, followReq.TargetAccountID, ) @@ -336,7 +324,7 @@ func (f *federatingDB) acceptFollowIRI( return nil } -func (f *federatingDB) acceptOtherIRI( +func (f *DB) acceptOtherIRI( ctx context.Context, acceptID *url.URL, accept vocab.ActivityStreamsAccept, @@ -419,7 +407,7 @@ func (f *federatingDB) acceptOtherIRI( return nil } -func (f *federatingDB) acceptStoredStatus( +func (f *DB) acceptStoredStatus( ctx context.Context, acceptID *url.URL, accept vocab.ActivityStreamsAccept, @@ -489,7 +477,7 @@ func (f *federatingDB) acceptStoredStatus( return nil } -func (f *federatingDB) acceptLikeIRI( +func (f *DB) acceptLikeIRI( ctx context.Context, acceptID *url.URL, accept vocab.ActivityStreamsAccept, diff --git a/internal/federation/federatingdb/announce.go b/internal/federation/federatingdb/announce.go index 2d467ea8c..d6cc2e5d5 100644 --- a/internal/federation/federatingdb/announce.go +++ b/internal/federation/federatingdb/announce.go @@ -29,7 +29,7 @@ import ( "code.superseriousbusiness.org/gotosocial/internal/messages" ) -func (f *federatingDB) Announce(ctx context.Context, announce vocab.ActivityStreamsAnnounce) error { +func (f *DB) Announce(ctx context.Context, announce vocab.ActivityStreamsAnnounce) error { log.DebugKV(ctx, "announce", serialize{announce}) activityContext := getActivityContext(ctx) diff --git a/internal/federation/federatingdb/block.go b/internal/federation/federatingdb/block.go index 54da2030a..2950aef3b 100644 --- a/internal/federation/federatingdb/block.go +++ b/internal/federation/federatingdb/block.go @@ -29,7 +29,7 @@ import ( "code.superseriousbusiness.org/gotosocial/internal/messages" ) -func (f *federatingDB) Block(ctx context.Context, blockable vocab.ActivityStreamsBlock) error { +func (f *DB) Block(ctx context.Context, blockable vocab.ActivityStreamsBlock) error { log.DebugKV(ctx, "block", serialize{blockable}) // Extract relevant values from passed ctx. diff --git a/internal/federation/federatingdb/create.go b/internal/federation/federatingdb/create.go index 2cfbb1d4c..8de137d6c 100644 --- a/internal/federation/federatingdb/create.go +++ b/internal/federation/federatingdb/create.go @@ -45,7 +45,7 @@ import ( // // Under certain conditions and network activities, Create may be called // multiple times for the same ActivityStreams object. -func (f *federatingDB) Create(ctx context.Context, asType vocab.Type) error { +func (f *DB) Create(ctx context.Context, asType vocab.Type) error { log.DebugKV(ctx, "create", serialize{asType}) // Cache entry for this activity type's ID for later @@ -126,7 +126,7 @@ func (f *federatingDB) Create(ctx context.Context, asType vocab.Type) error { // createPollOptionable handles a Create activity for a PollOptionable. // This function doesn't handle database insertion, only validation checks // before passing off to a worker for asynchronous processing. -func (f *federatingDB) createPollOptionables( +func (f *DB) createPollOptionables( ctx context.Context, receiver *gtsmodel.Account, requester *gtsmodel.Account, @@ -274,7 +274,7 @@ func (f *federatingDB) createPollOptionables( // This function won't insert anything in the database yet, // but will pass the Statusable (if appropriate) through to // the processor for further asynchronous processing. -func (f *federatingDB) createStatusable( +func (f *DB) createStatusable( ctx context.Context, receiver *gtsmodel.Account, requester *gtsmodel.Account, diff --git a/internal/federation/federatingdb/db.go b/internal/federation/federatingdb/db.go index 539305204..0c4d21c64 100644 --- a/internal/federation/federatingdb/db.go +++ b/internal/federation/federatingdb/db.go @@ -32,9 +32,10 @@ import ( "codeberg.org/gruf/go-cache/v3/simple" ) -// DB wraps the pub.Database interface with -// a couple of custom functions for GoToSocial. -type DB interface { +// Check our type's +// interface conformity. +var _ interface { + // Default // functionality. pub.Database @@ -55,11 +56,11 @@ type DB interface { */ GetAccept(ctx context.Context, acceptIRI *url.URL) (vocab.ActivityStreamsAccept, error) -} +} = &DB{} -// FederatingDB uses the given state interface -// to implement the go-fed pub.Database interface. -type federatingDB struct { +// DB uses the given state interface to +// implement the go-fed pub.Database interface. +type DB struct { state *state.State converter *typeutils.Converter visFilter *visibility.Filter @@ -79,8 +80,8 @@ func New( visFilter *visibility.Filter, intFilter *interaction.Filter, spamFilter *spam.Filter, -) DB { - fdb := federatingDB{ +) *DB { + fdb := DB{ state: state, converter: converter, visFilter: visFilter, @@ -93,6 +94,6 @@ func New( // storeActivityID stores an entry in the .activityIDs cache for this // type's JSON-LD ID, for later checks in Exist() to mark it as seen. -func (f *federatingDB) storeActivityID(asType vocab.Type) { +func (f *DB) storeActivityID(asType vocab.Type) { f.activityIDs.Set(ap.GetJSONLDId(asType).String(), struct{}{}) } diff --git a/internal/federation/federatingdb/delete.go b/internal/federation/federatingdb/delete.go index fb967fce9..17a25e956 100644 --- a/internal/federation/federatingdb/delete.go +++ b/internal/federation/federatingdb/delete.go @@ -36,7 +36,7 @@ import ( // Protocol instead call Update to create a Tombstone. // // The library makes this call only after acquiring a lock first. -func (f *federatingDB) Delete(ctx context.Context, id *url.URL) error { +func (f *DB) Delete(ctx context.Context, id *url.URL) error { log.DebugKV(ctx, "id", id) activityContext := getActivityContext(ctx) @@ -87,7 +87,7 @@ func (f *federatingDB) Delete(ctx context.Context, id *url.URL) error { return nil } -func (f *federatingDB) deleteAccount( +func (f *DB) deleteAccount( ctx context.Context, requesting *gtsmodel.Account, receiving *gtsmodel.Account, @@ -126,7 +126,7 @@ func (f *federatingDB) deleteAccount( return false, nil } -func (f *federatingDB) deleteStatus( +func (f *DB) deleteStatus( ctx context.Context, requesting *gtsmodel.Account, receiving *gtsmodel.Account, diff --git a/internal/federation/federatingdb/exists.go b/internal/federation/federatingdb/exists.go index ec996f72f..6d3a23e84 100644 --- a/internal/federation/federatingdb/exists.go +++ b/internal/federation/federatingdb/exists.go @@ -24,6 +24,6 @@ import ( // Exists is an implementation of pub.Database{}.Exists(), optimized specifically for // the only usecase in which go-fed/activity/pub actually calls it. Do not use otherwise! -func (f *federatingDB) Exists(ctx context.Context, id *url.URL) (exists bool, err error) { +func (f *DB) Exists(ctx context.Context, id *url.URL) (exists bool, err error) { return f.activityIDs.Has(id.String()), nil } diff --git a/internal/federation/federatingdb/federatingdb_test.go b/internal/federation/federatingdb/federatingdb_test.go index e36c31d79..1203cf6b8 100644 --- a/internal/federation/federatingdb/federatingdb_test.go +++ b/internal/federation/federatingdb/federatingdb_test.go @@ -37,7 +37,7 @@ type FederatingDBTestSuite struct { suite.Suite db db.DB tc *typeutils.Converter - federatingDB federatingdb.DB + federatingDB *federatingdb.DB state state.State testTokens map[string]*gtsmodel.Token diff --git a/internal/federation/federatingdb/flag.go b/internal/federation/federatingdb/flag.go index 1198e688a..67727e5d9 100644 --- a/internal/federation/federatingdb/flag.go +++ b/internal/federation/federatingdb/flag.go @@ -30,7 +30,7 @@ import ( "github.com/miekg/dns" ) -func (f *federatingDB) Flag(ctx context.Context, flaggable vocab.ActivityStreamsFlag) error { +func (f *DB) Flag(ctx context.Context, flaggable vocab.ActivityStreamsFlag) error { log.DebugKV(ctx, "flag", serialize{flaggable}) // Mark activity as handled. diff --git a/internal/federation/federatingdb/follow.go b/internal/federation/federatingdb/follow.go index f8ae8c580..9bd602489 100644 --- a/internal/federation/federatingdb/follow.go +++ b/internal/federation/federatingdb/follow.go @@ -29,7 +29,7 @@ import ( "code.superseriousbusiness.org/gotosocial/internal/messages" ) -func (f *federatingDB) Follow(ctx context.Context, followable vocab.ActivityStreamsFollow) error { +func (f *DB) Follow(ctx context.Context, followable vocab.ActivityStreamsFollow) error { log.DebugKV(ctx, "follow", serialize{followable}) // Mark activity as handled. diff --git a/internal/federation/federatingdb/followers.go b/internal/federation/federatingdb/followers.go index 4f8aaf459..a68050033 100644 --- a/internal/federation/federatingdb/followers.go +++ b/internal/federation/federatingdb/followers.go @@ -32,7 +32,7 @@ import ( // If modified, the library will then call Update. // // The library makes this call only after acquiring a lock first. -func (f *federatingDB) Followers(ctx context.Context, actorIRI *url.URL) (followers vocab.ActivityStreamsCollection, err error) { +func (f *DB) Followers(ctx context.Context, actorIRI *url.URL) (followers vocab.ActivityStreamsCollection, err error) { acct, err := f.state.DB.GetAccountByURI(ctx, actorIRI.String()) if err != nil { return nil, err diff --git a/internal/federation/federatingdb/following.go b/internal/federation/federatingdb/following.go index 6f65930bc..fd908c9ef 100644 --- a/internal/federation/federatingdb/following.go +++ b/internal/federation/federatingdb/following.go @@ -31,7 +31,7 @@ import ( // If modified, the library will then call Update. // // The library makes this call only after acquiring a lock first. -func (f *federatingDB) Following(ctx context.Context, actorIRI *url.URL) (following vocab.ActivityStreamsCollection, err error) { +func (f *DB) Following(ctx context.Context, actorIRI *url.URL) (following vocab.ActivityStreamsCollection, err error) { acct, err := f.state.DB.GetAccountByURI(ctx, actorIRI.String()) if err != nil { return nil, err diff --git a/internal/federation/federatingdb/get.go b/internal/federation/federatingdb/get.go index 92c2d1d8d..58dcfecdc 100644 --- a/internal/federation/federatingdb/get.go +++ b/internal/federation/federatingdb/get.go @@ -19,16 +19,19 @@ package federatingdb import ( "context" + "errors" "net/url" "code.superseriousbusiness.org/activity/streams/vocab" "code.superseriousbusiness.org/gotosocial/internal/config" - "code.superseriousbusiness.org/gotosocial/internal/db" + "code.superseriousbusiness.org/gotosocial/internal/gtscontext" "code.superseriousbusiness.org/gotosocial/internal/gtserror" "code.superseriousbusiness.org/gotosocial/internal/log" "code.superseriousbusiness.org/gotosocial/internal/uris" ) +var ErrNotImplemented = errors.New("not implemented") + // Get returns the database entry for the specified id. // // The library makes this call only after acquiring a lock first. @@ -48,30 +51,38 @@ import ( // // It may be useful in future to add more matching here so that more // stuff can be shortcutted by the dereferencer, saving HTTP calls. -func (f *federatingDB) Get(ctx context.Context, id *url.URL) (value vocab.Type, err error) { +func (f *DB) Get(ctx context.Context, id *url.URL) (value vocab.Type, err error) { log.DebugKV(ctx, "id", id) // Ensure our host, for safety. if id.Host != config.GetHost() { - return nil, gtserror.Newf("%s was not for our host", id.String()) + return nil, gtserror.Newf("%s was not for our host", id) } - if username, err := uris.ParseUserPath(id); err == nil && username != "" { - acct, err := f.state.DB.GetAccountByUsernameDomain(ctx, username, "") + if username, _ := uris.ParseUserPath(id); username != "" { + acct, err := f.state.DB.GetAccountByUsernameDomain( + gtscontext.SetBarebones(ctx), + username, + "", + ) if err != nil { return nil, err } return f.converter.AccountToAS(ctx, acct) - } else if _, statusID, err := uris.ParseStatusesPath(id); err == nil && statusID != "" { + } else if _, statusID, _ := uris.ParseStatusesPath(id); statusID != "" { status, err := f.state.DB.GetStatusByID(ctx, statusID) if err != nil { return nil, err } return f.converter.StatusToAS(ctx, status) - } else if username, err := uris.ParseFollowersPath(id); err == nil && username != "" { - acct, err := f.state.DB.GetAccountByUsernameDomain(ctx, username, "") + } else if username, _ := uris.ParseFollowersPath(id); username != "" { + acct, err := f.state.DB.GetAccountByUsernameDomain( + gtscontext.SetBarebones(ctx), + username, + "", + ) if err != nil { return nil, err } @@ -83,8 +94,12 @@ func (f *federatingDB) Get(ctx context.Context, id *url.URL) (value vocab.Type, return f.Followers(ctx, acctURI) - } else if username, err := uris.ParseFollowingPath(id); err == nil && username != "" { - acct, err := f.state.DB.GetAccountByUsernameDomain(ctx, username, "") + } else if username, _ := uris.ParseFollowingPath(id); username != "" { + acct, err := f.state.DB.GetAccountByUsernameDomain( + gtscontext.SetBarebones(ctx), + username, + "", + ) if err != nil { return nil, err } @@ -102,8 +117,5 @@ func (f *federatingDB) Get(ctx context.Context, id *url.URL) (value vocab.Type, // Nothing found, the caller // will have to deal with this. - return nil, gtserror.Newf( - "not implemented for %s: %w", - id.String(), db.ErrNoEntries, - ) + return nil, ErrNotImplemented } diff --git a/internal/federation/federatingdb/inbox.go b/internal/federation/federatingdb/inbox.go index 2fb0ebad9..8533fa9f4 100644 --- a/internal/federation/federatingdb/inbox.go +++ b/internal/federation/federatingdb/inbox.go @@ -35,7 +35,7 @@ import ( // The library makes this call only after acquiring a lock first. // // Implementation note: we have our own logic for inboxes so always return false here. -func (f *federatingDB) InboxContains(c context.Context, inbox, id *url.URL) (contains bool, err error) { +func (f *DB) InboxContains(c context.Context, inbox, id *url.URL) (contains bool, err error) { return false, nil } @@ -45,7 +45,7 @@ func (f *federatingDB) InboxContains(c context.Context, inbox, id *url.URL) (con // The library makes this call only after acquiring a lock first. // // Implementation note: we don't (yet) serve inboxes, so just return empty and nil here. -func (f *federatingDB) GetInbox(c context.Context, inboxIRI *url.URL) (inbox vocab.ActivityStreamsOrderedCollectionPage, err error) { +func (f *DB) GetInbox(c context.Context, inboxIRI *url.URL) (inbox vocab.ActivityStreamsOrderedCollectionPage, err error) { return streams.NewActivityStreamsOrderedCollectionPage(), nil } @@ -56,7 +56,7 @@ func (f *federatingDB) GetInbox(c context.Context, inboxIRI *url.URL) (inbox voc // The library makes this call only after acquiring a lock first. // // Implementation note: we don't allow inbox setting so just return nil here. -func (f *federatingDB) SetInbox(c context.Context, inbox vocab.ActivityStreamsOrderedCollectionPage) error { +func (f *DB) SetInbox(c context.Context, inbox vocab.ActivityStreamsOrderedCollectionPage) error { return nil } @@ -76,7 +76,7 @@ func (f *federatingDB) SetInbox(c context.Context, inbox vocab.ActivityStreamsOr // then each follower inbox IRI should be returned in the inboxIRIs slice. // // The library makes this call only after acquiring a lock first. -func (f *federatingDB) InboxesForIRI(c context.Context, iri *url.URL) (inboxIRIs []*url.URL, err error) { +func (f *DB) InboxesForIRI(c context.Context, iri *url.URL) (inboxIRIs []*url.URL, err error) { // check if this is a followers collection iri for a local account... if iri.Host == config.GetHost() && uris.IsFollowersPath(iri) { localAccountUsername, err := uris.ParseFollowersPath(iri) diff --git a/internal/federation/federatingdb/like.go b/internal/federation/federatingdb/like.go index 970ca53ef..debc343cf 100644 --- a/internal/federation/federatingdb/like.go +++ b/internal/federation/federatingdb/like.go @@ -31,7 +31,7 @@ import ( "code.superseriousbusiness.org/gotosocial/internal/messages" ) -func (f *federatingDB) Like(ctx context.Context, likeable vocab.ActivityStreamsLike) error { +func (f *DB) Like(ctx context.Context, likeable vocab.ActivityStreamsLike) error { log.DebugKV(ctx, "like", serialize{likeable}) // Mark activity as handled. diff --git a/internal/federation/federatingdb/liked.go b/internal/federation/federatingdb/liked.go index 9db3341f1..6e6f6209b 100644 --- a/internal/federation/federatingdb/liked.go +++ b/internal/federation/federatingdb/liked.go @@ -33,6 +33,6 @@ import ( // The library makes this call only after acquiring a lock first. // // Implementation note: we don't serve a Liked collection *yet* so just return an empty collection for now. -func (f *federatingDB) Liked(c context.Context, actorIRI *url.URL) (liked vocab.ActivityStreamsCollection, err error) { +func (f *DB) Liked(c context.Context, actorIRI *url.URL) (liked vocab.ActivityStreamsCollection, err error) { return streams.NewActivityStreamsCollection(), nil } diff --git a/internal/federation/federatingdb/lock.go b/internal/federation/federatingdb/lock.go index 5353aea91..a7b3f9950 100644 --- a/internal/federation/federatingdb/lock.go +++ b/internal/federation/federatingdb/lock.go @@ -33,6 +33,6 @@ import ( // processes require tight loops acquiring and releasing locks. // // Used to ensure race conditions in multiple requests do not occur. -func (f *federatingDB) Lock(c context.Context, id *url.URL) (func(), error) { - return f.state.FedLocks.Lock("federatingDB " + id.String()), nil // id should NEVER be nil. +func (f *DB) Lock(c context.Context, id *url.URL) (func(), error) { + return f.state.FedLocks.Lock("fdb:" + id.String()), nil // id should NEVER be nil. } diff --git a/internal/federation/federatingdb/move.go b/internal/federation/federatingdb/move.go index 2740d0de9..8a6a77ef1 100644 --- a/internal/federation/federatingdb/move.go +++ b/internal/federation/federatingdb/move.go @@ -35,7 +35,7 @@ import ( "code.superseriousbusiness.org/gotosocial/internal/messages" ) -func (f *federatingDB) Move(ctx context.Context, move vocab.ActivityStreamsMove) error { +func (f *DB) Move(ctx context.Context, move vocab.ActivityStreamsMove) error { log.DebugKV(ctx, "move", serialize{move}) // Mark activity as handled. diff --git a/internal/federation/federatingdb/outbox.go b/internal/federation/federatingdb/outbox.go index 8098dfa34..5ef111138 100644 --- a/internal/federation/federatingdb/outbox.go +++ b/internal/federation/federatingdb/outbox.go @@ -31,7 +31,7 @@ import ( // The library makes this call only after acquiring a lock first. // // Implementation note: we don't (yet) serve outboxes, so just return empty and nil here. -func (f *federatingDB) GetOutbox(ctx context.Context, outboxIRI *url.URL) (inbox vocab.ActivityStreamsOrderedCollectionPage, err error) { +func (f *DB) GetOutbox(ctx context.Context, outboxIRI *url.URL) (inbox vocab.ActivityStreamsOrderedCollectionPage, err error) { return streams.NewActivityStreamsOrderedCollectionPage(), nil } @@ -42,7 +42,7 @@ func (f *federatingDB) GetOutbox(ctx context.Context, outboxIRI *url.URL) (inbox // The library makes this call only after acquiring a lock first. // // Implementation note: we don't allow outbox setting so just return nil here. -func (f *federatingDB) SetOutbox(ctx context.Context, outbox vocab.ActivityStreamsOrderedCollectionPage) error { +func (f *DB) SetOutbox(ctx context.Context, outbox vocab.ActivityStreamsOrderedCollectionPage) error { return nil } @@ -50,7 +50,7 @@ func (f *federatingDB) SetOutbox(ctx context.Context, outbox vocab.ActivityStrea // actor's inbox IRI. // // The library makes this call only after acquiring a lock first. -func (f *federatingDB) OutboxForInbox(ctx context.Context, inboxIRI *url.URL) (outboxIRI *url.URL, err error) { +func (f *DB) OutboxForInbox(ctx context.Context, inboxIRI *url.URL) (outboxIRI *url.URL, err error) { acct, err := f.state.DB.GetOneAccountByInboxURI(ctx, inboxIRI.String()) if err != nil { return nil, err diff --git a/internal/federation/federatingdb/owns.go b/internal/federation/federatingdb/owns.go index 7d08a877f..4cf49c246 100644 --- a/internal/federation/federatingdb/owns.go +++ b/internal/federation/federatingdb/owns.go @@ -34,7 +34,7 @@ import ( // Owns returns true if the IRI belongs to this instance, and if // the database has an entry for the IRI. // The library makes this call only after acquiring a lock first. -func (f *federatingDB) Owns(ctx context.Context, id *url.URL) (bool, error) { +func (f *DB) Owns(ctx context.Context, id *url.URL) (bool, error) { log.DebugKV(ctx, "id", id) // if the id host isn't this instance host, we don't own this IRI @@ -150,7 +150,7 @@ func (f *federatingDB) Owns(ctx context.Context, id *url.URL) (bool, error) { return false, fmt.Errorf("could not match activityID: %s", id.String()) } -func (f *federatingDB) ownsLike(ctx context.Context, uri *url.URL) (bool, error) { +func (f *DB) ownsLike(ctx context.Context, uri *url.URL) (bool, error) { username, id, err := uris.ParseLikedPath(uri) if err != nil { return false, fmt.Errorf("error parsing Like path for url %s: %w", uri.String(), err) diff --git a/internal/federation/federatingdb/reject.go b/internal/federation/federatingdb/reject.go index 1c657a5a9..5ec3b1a27 100644 --- a/internal/federation/federatingdb/reject.go +++ b/internal/federation/federatingdb/reject.go @@ -33,7 +33,7 @@ import ( "code.superseriousbusiness.org/gotosocial/internal/uris" ) -func (f *federatingDB) Reject(ctx context.Context, reject vocab.ActivityStreamsReject) error { +func (f *DB) Reject(ctx context.Context, reject vocab.ActivityStreamsReject) error { log.DebugKV(ctx, "reject", serialize{reject}) activityContext := getActivityContext(ctx) @@ -122,7 +122,7 @@ func (f *federatingDB) Reject(ctx context.Context, reject vocab.ActivityStreamsR return nil } -func (f *federatingDB) rejectFollowType( +func (f *DB) rejectFollowType( ctx context.Context, asType vocab.Type, receivingAcct *gtsmodel.Account, @@ -138,11 +138,6 @@ func (f *federatingDB) rejectFollowType( return gtserror.NewErrorInternalError(err) } - // Lock on the Follow URI - // as we may be updating it. - unlock := f.state.FedLocks.Lock(follow.URI) - defer unlock() - // Make sure the creator of the original follow // is the same as whatever inbox this landed in. if follow.AccountID != receivingAcct.ID { @@ -158,8 +153,7 @@ func (f *federatingDB) rejectFollowType( } // Reject the follow. - err = f.state.DB.RejectFollowRequest( - ctx, + err = f.state.DB.RejectFollowRequest(ctx, follow.AccountID, follow.TargetAccountID, ) @@ -171,17 +165,12 @@ func (f *federatingDB) rejectFollowType( return nil } -func (f *federatingDB) rejectFollowIRI( +func (f *DB) rejectFollowIRI( ctx context.Context, objectIRI string, receivingAcct *gtsmodel.Account, requestingAcct *gtsmodel.Account, ) error { - // Lock on this potential Follow - // URI as we may be updating it. - unlock := f.state.FedLocks.Lock(objectIRI) - defer unlock() - // Get the follow req from the db. followReq, err := f.state.DB.GetFollowRequestByURI(ctx, objectIRI) if err != nil && !errors.Is(err, db.ErrNoEntries) { @@ -214,8 +203,7 @@ func (f *federatingDB) rejectFollowIRI( } // Reject the follow. - err = f.state.DB.RejectFollowRequest( - ctx, + err = f.state.DB.RejectFollowRequest(ctx, followReq.AccountID, followReq.TargetAccountID, ) @@ -227,7 +215,7 @@ func (f *federatingDB) rejectFollowIRI( return nil } -func (f *federatingDB) rejectStatusIRI( +func (f *DB) rejectStatusIRI( ctx context.Context, activityID string, objectIRI string, @@ -379,7 +367,7 @@ func (f *federatingDB) rejectStatusIRI( return nil } -func (f *federatingDB) rejectLikeIRI( +func (f *DB) rejectLikeIRI( ctx context.Context, activityID string, objectIRI string, diff --git a/internal/federation/federatingdb/undo.go b/internal/federation/federatingdb/undo.go index 5f3678cd8..23c4098dd 100644 --- a/internal/federation/federatingdb/undo.go +++ b/internal/federation/federatingdb/undo.go @@ -32,7 +32,7 @@ import ( "code.superseriousbusiness.org/gotosocial/internal/messages" ) -func (f *federatingDB) Undo(ctx context.Context, undo vocab.ActivityStreamsUndo) error { +func (f *DB) Undo(ctx context.Context, undo vocab.ActivityStreamsUndo) error { log.DebugKV(ctx, "undo", serialize{undo}) activityContext := getActivityContext(ctx) @@ -111,7 +111,7 @@ func (f *federatingDB) Undo(ctx context.Context, undo vocab.ActivityStreamsUndo) return nil } -func (f *federatingDB) undoFollow( +func (f *DB) undoFollow( ctx context.Context, receivingAcct *gtsmodel.Account, requestingAcct *gtsmodel.Account, @@ -149,11 +149,6 @@ func (f *federatingDB) undoFollow( return nil } - // Lock on the Follow URI - // as we may be updating it. - unlock := f.state.FedLocks.Lock(follow.URI) - defer unlock() - // Ensure addressee is follow target. if follow.TargetAccountID != receivingAcct.ID { const text = "receivingAcct was not Follow target" @@ -193,7 +188,7 @@ func (f *federatingDB) undoFollow( return nil } -func (f *federatingDB) undoLike( +func (f *DB) undoLike( ctx context.Context, receivingAcct *gtsmodel.Account, requestingAcct *gtsmodel.Account, @@ -293,7 +288,7 @@ func (f *federatingDB) undoLike( return nil } -func (f *federatingDB) undoBlock( +func (f *DB) undoBlock( ctx context.Context, receivingAcct *gtsmodel.Account, requestingAcct *gtsmodel.Account, @@ -363,7 +358,7 @@ func (f *federatingDB) undoBlock( return nil } -func (f *federatingDB) undoAnnounce( +func (f *DB) undoAnnounce( ctx context.Context, receivingAcct *gtsmodel.Account, requestingAcct *gtsmodel.Account, diff --git a/internal/federation/federatingdb/update.go b/internal/federation/federatingdb/update.go index 94471c33a..a24594a77 100644 --- a/internal/federation/federatingdb/update.go +++ b/internal/federation/federatingdb/update.go @@ -40,7 +40,7 @@ import ( // the entire value. // // The library makes this call only after acquiring a lock first. -func (f *federatingDB) Update(ctx context.Context, asType vocab.Type) error { +func (f *DB) Update(ctx context.Context, asType vocab.Type) error { log.DebugKV(ctx, "update", serialize{asType}) // Mark activity as handled. @@ -67,7 +67,7 @@ func (f *federatingDB) Update(ctx context.Context, asType vocab.Type) error { return nil } -func (f *federatingDB) updateAccountable(ctx context.Context, receivingAcct *gtsmodel.Account, requestingAcct *gtsmodel.Account, accountable ap.Accountable) error { +func (f *DB) updateAccountable(ctx context.Context, receivingAcct *gtsmodel.Account, requestingAcct *gtsmodel.Account, accountable ap.Accountable) error { // Extract AP URI of the updated Accountable model. idProp := accountable.GetJSONLDId() if idProp == nil || !idProp.IsIRI() { @@ -105,7 +105,7 @@ func (f *federatingDB) updateAccountable(ctx context.Context, receivingAcct *gts return nil } -func (f *federatingDB) updateStatusable(ctx context.Context, receivingAcct *gtsmodel.Account, requestingAcct *gtsmodel.Account, statusable ap.Statusable) error { +func (f *DB) updateStatusable(ctx context.Context, receivingAcct *gtsmodel.Account, requestingAcct *gtsmodel.Account, statusable ap.Statusable) error { // Extract AP URI of the updated model. idProp := statusable.GetJSONLDId() if idProp == nil || !idProp.IsIRI() { diff --git a/internal/federation/federatingdb/util.go b/internal/federation/federatingdb/util.go index 32aec51a5..f235e729e 100644 --- a/internal/federation/federatingdb/util.go +++ b/internal/federation/federatingdb/util.go @@ -20,7 +20,6 @@ package federatingdb import ( "context" "encoding/json" - "fmt" "net/url" "code.superseriousbusiness.org/activity/streams" @@ -65,16 +64,20 @@ func sameActor(actor1 vocab.ActivityStreamsActorProperty, actor2 vocab.ActivityS } for a1Iter := actor1.Begin(); a1Iter != actor1.End(); a1Iter = a1Iter.Next() { + a1IRI := a1Iter.GetIRI() + if a1IRI == nil { + return false + } + + a1IRIStr := a1IRI.String() for a2Iter := actor2.Begin(); a2Iter != actor2.End(); a2Iter = a2Iter.Next() { - if a1Iter.GetIRI() == nil { + a2IRI := a2Iter.GetIRI() + if a2IRI == nil { return false } - if a2Iter.GetIRI() == nil { - return false - } - - if a1Iter.GetIRI().String() == a2Iter.GetIRI().String() { + a2IRIStr := a2IRI.String() + if a1IRIStr == a2IRIStr { return true } } @@ -89,7 +92,7 @@ func sameActor(actor1 vocab.ActivityStreamsActorProperty, actor2 vocab.ActivityS // // The go-fed library will handle setting the 'id' property on the // activity or object provided with the value returned. -func (f *federatingDB) NewID(ctx context.Context, t vocab.Type) (idURL *url.URL, err error) { +func (f *DB) NewID(ctx context.Context, t vocab.Type) (idURL *url.URL, err error) { log.DebugKV(ctx, "newID", serialize{t}) // Most of our types set an ID already @@ -116,19 +119,18 @@ func (f *federatingDB) NewID(ctx context.Context, t vocab.Type) (idURL *url.URL, } // Default fallback behaviour: - // {proto}://{host}/{randomID} - newID, err := id.NewRandomULID() - if err != nil { - return nil, err - } - - return url.Parse(fmt.Sprintf("%s://%s/%s", config.GetProtocol(), config.GetHost(), newID)) + // {proto}://{host}/{newULID} + return &url.URL{ + Scheme: config.GetProtocol(), + Host: config.GetHost(), + Path: "/" + id.NewULID(), + }, nil } // ActorForOutbox fetches the local actor's IRI for the given outbox IRI. // // The library makes this call only after acquiring a lock first. -func (f *federatingDB) ActorForOutbox(ctx context.Context, outboxIRI *url.URL) (actorIRI *url.URL, err error) { +func (f *DB) ActorForOutbox(ctx context.Context, outboxIRI *url.URL) (actorIRI *url.URL, err error) { acct, err := f.state.DB.GetOneAccountByOutboxURI(ctx, outboxIRI.String()) if err != nil { return nil, err @@ -139,7 +141,7 @@ func (f *federatingDB) ActorForOutbox(ctx context.Context, outboxIRI *url.URL) ( // ActorForInbox fetches the local actor's IRI for the given inbox IRI. // // The library makes this call only after acquiring a lock first. -func (f *federatingDB) ActorForInbox(ctx context.Context, inboxIRI *url.URL) (actorIRI *url.URL, err error) { +func (f *DB) ActorForInbox(ctx context.Context, inboxIRI *url.URL) (actorIRI *url.URL, err error) { acct, err := f.state.DB.GetOneAccountByInboxURI(ctx, inboxIRI.String()) if err != nil { return nil, err @@ -148,7 +150,7 @@ func (f *federatingDB) ActorForInbox(ctx context.Context, inboxIRI *url.URL) (ac } // collectFollows takes a slice of iris and converts them into ActivityStreamsCollection of IRIs. -func (f *federatingDB) collectIRIs(_ context.Context, iris []*url.URL) (vocab.ActivityStreamsCollection, error) { +func (f *DB) collectIRIs(_ context.Context, iris []*url.URL) (vocab.ActivityStreamsCollection, error) { collection := streams.NewActivityStreamsCollection() items := streams.NewActivityStreamsItemsProperty() for _, i := range iris { diff --git a/internal/federation/federator.go b/internal/federation/federator.go index 93df31735..5f8324da2 100644 --- a/internal/federation/federator.go +++ b/internal/federation/federator.go @@ -37,7 +37,7 @@ var _ interface { type Federator struct { db db.DB - federatingDB federatingdb.DB + federatingDB *federatingdb.DB clock pub.Clock converter *typeutils.Converter transport transport.Controller @@ -54,7 +54,7 @@ type Federator struct { // NewFederator returns a new federator instance. func NewFederator( state *state.State, - federatingDB federatingdb.DB, + federatingDB *federatingdb.DB, transportController transport.Controller, converter *typeutils.Converter, visFilter *visibility.Filter, @@ -112,7 +112,7 @@ func (f *Federator) FederatingActor() pub.FederatingActor { } // FederatingDB returns the underlying FederatingDB interface. -func (f *Federator) FederatingDB() federatingdb.DB { +func (f *Federator) FederatingDB() *federatingdb.DB { return f.federatingDB } diff --git a/internal/processing/workers/fromfediapi_move.go b/internal/processing/workers/fromfediapi_move.go index e7c75950a..93e7b39a4 100644 --- a/internal/processing/workers/fromfediapi_move.go +++ b/internal/processing/workers/fromfediapi_move.go @@ -269,9 +269,8 @@ func (p *fediAPI) MoveAccount(ctx context.Context, fMsg *messages.FromFediAPI) e // try to send the same Move several times with // different IDs (you never know), but we only // want to process them based on origin + target. - unlock := p.state.FedLocks.Lock( - "move:" + originAcctURIStr + ":" + targetAcctURIStr, - ) + key := "move:" + originAcctURIStr + ":" + targetAcctURIStr + unlock := p.state.FedLocks.Lock(key) defer unlock() // Check if Move is rate limited based diff --git a/internal/transport/controller.go b/internal/transport/controller.go index 0f3c1c9b0..33b74c76e 100644 --- a/internal/transport/controller.go +++ b/internal/transport/controller.go @@ -23,6 +23,7 @@ import ( "crypto/rsa" "crypto/x509" "encoding/json" + "errors" "fmt" "io" "net/http" @@ -30,11 +31,12 @@ import ( "strconv" "code.superseriousbusiness.org/activity/pub" - "code.superseriousbusiness.org/activity/streams/vocab" "code.superseriousbusiness.org/gotosocial/internal/ap" apiutil "code.superseriousbusiness.org/gotosocial/internal/api/util" "code.superseriousbusiness.org/gotosocial/internal/config" + "code.superseriousbusiness.org/gotosocial/internal/db" "code.superseriousbusiness.org/gotosocial/internal/federation/federatingdb" + "code.superseriousbusiness.org/gotosocial/internal/gtserror" "code.superseriousbusiness.org/gotosocial/internal/state" "code.superseriousbusiness.org/gotosocial/internal/util" "codeberg.org/gruf/go-byteutil" @@ -52,15 +54,14 @@ type Controller interface { type controller struct { state *state.State - fedDB federatingdb.DB - clock pub.Clock + fedDB *federatingdb.DB client pub.HttpClient trspCache cache.TTLCache[string, *transport] userAgent string } // NewController returns an implementation of the Controller interface for creating new transports -func NewController(state *state.State, federatingDB federatingdb.DB, clock pub.Clock, client pub.HttpClient) Controller { +func NewController(state *state.State, federatingDB *federatingdb.DB, client pub.HttpClient) Controller { var ( host = config.GetHost() proto = config.GetProtocol() @@ -70,7 +71,6 @@ func NewController(state *state.State, federatingDB federatingdb.DB, clock pub.C c := &controller{ state: state, fedDB: federatingDB, - clock: clock, client: client, trspCache: cache.NewTTL[string, *transport](0, 100, 0), userAgent: fmt.Sprintf("gotosocial/%s (+%s://%s)", version, proto, host), @@ -153,37 +153,51 @@ func (c *controller) dereferenceLocal( ctx context.Context, uri *url.URL, ) (*http.Response, error) { - var ( - t vocab.Type - err error - ) - t, err = c.fedDB.Get(ctx, uri) - if err != nil { - // Don't check especially for - // db.ErrNoEntries, as we *want* - // to pass this back to the caller - // if we didn't get anything. - return nil, err + // Try fetch via federating DB. + t, err := c.fedDB.Get(ctx, uri) + + switch { + // No problem. + case err == nil: + + // Catch and handle objects not found. + case errors.Is(err, db.ErrNoEntries): + return &http.Response{ + Request: &http.Request{URL: uri}, + Status: http.StatusText(http.StatusNotFound), + StatusCode: http.StatusNotFound, + Header: map[string][]string{ + "Content-Type": {apiutil.AppActivityLDJSON}, + "Content-Length": {"0"}, + }, + }, nil + + // Any other. + default: + return nil, gtserror.Newf("error getting: %w", err) } if util.IsNil(t) { - // This should never happen. - panic("nil vocab.Type after successful c.fedDB.Get call") + // Assert this should never happen. + panic(gtserror.New("nil vocab.Type")) } - i, err := ap.Serialize(t) + // Serialize type to JSON map. + m, err := ap.Serialize(t) if err != nil { return nil, err } - b, err := json.Marshal(i) + // Marshal JSON to bytes. + b, err := json.Marshal(m) if err != nil { return nil, err } + + // Return a response + // with AS data as body. contentLength := len(b) - - // Return a response with AS data as body. rsp := &http.Response{ Request: &http.Request{URL: uri}, Status: http.StatusText(http.StatusOK), diff --git a/internal/transport/dereference.go b/internal/transport/dereference.go index a7ef83d3e..164b23cd6 100644 --- a/internal/transport/dereference.go +++ b/internal/transport/dereference.go @@ -25,7 +25,7 @@ import ( apiutil "code.superseriousbusiness.org/gotosocial/internal/api/util" "code.superseriousbusiness.org/gotosocial/internal/config" - "code.superseriousbusiness.org/gotosocial/internal/db" + "code.superseriousbusiness.org/gotosocial/internal/federation/federatingdb" "code.superseriousbusiness.org/gotosocial/internal/gtserror" "code.superseriousbusiness.org/gotosocial/internal/log" ) @@ -38,10 +38,8 @@ func (t *transport) Dereference(ctx context.Context, iri *url.URL) (*http.Respon // to just make a normal http request to ourself. if iri.Host == config.GetHost() { rsp, err := t.controller.dereferenceLocal(ctx, iri) - if err != nil && !errors.Is(err, db.ErrNoEntries) { - // Real error. - err := gtserror.Newf("error trying dereferenceLocal: %w", err) - return nil, err + if err != nil && !errors.Is(err, federatingdb.ErrNotImplemented) { + return nil, gtserror.Newf("error dereferencing local: %w", err) } if rsp != nil { diff --git a/testrig/federatingdb.go b/testrig/federatingdb.go index 28c6bd7b4..360f99db6 100644 --- a/testrig/federatingdb.go +++ b/testrig/federatingdb.go @@ -27,7 +27,7 @@ import ( ) // NewTestFederatingDB returns a federating DB with the underlying db -func NewTestFederatingDB(state *state.State) federatingdb.DB { +func NewTestFederatingDB(state *state.State) *federatingdb.DB { return federatingdb.New( state, typeutils.NewConverter(state), diff --git a/testrig/transportcontroller.go b/testrig/transportcontroller.go index 34ed35255..641232b73 100644 --- a/testrig/transportcontroller.go +++ b/testrig/transportcontroller.go @@ -31,7 +31,6 @@ import ( "code.superseriousbusiness.org/activity/streams/vocab" "code.superseriousbusiness.org/gotosocial/internal/ap" apimodel "code.superseriousbusiness.org/gotosocial/internal/api/model" - "code.superseriousbusiness.org/gotosocial/internal/federation" "code.superseriousbusiness.org/gotosocial/internal/gtsmodel" "code.superseriousbusiness.org/gotosocial/internal/httpclient" "code.superseriousbusiness.org/gotosocial/internal/log" @@ -56,7 +55,7 @@ const ( // PER TEST rather than per suite, so that the do function can be set on a test by test (or even more granular) // basis. func NewTestTransportController(state *state.State, client pub.HttpClient) transport.Controller { - return transport.NewController(state, NewTestFederatingDB(state), &federation.Clock{}, client) + return transport.NewController(state, NewTestFederatingDB(state), client) } type MockHTTPClient struct {