mirror of
https://github.com/superseriousbusiness/gotosocial
synced 2025-06-05 21:59:39 +02:00
[chore] various federatingdb tweaks (#4178)
after seeing a potential reported federating worker lockup i decided to start digging into the federatingdb code. this PR encompasses: - removes one of our last unused interface types `federatingdb.DB{}`, replacing it with a struct type `*federatingdb.DB{}` - in `transport.dereferenceLocal()` differentiates between an unsupported lookup type and ErrNoEntries to reduce unnecessary calls, and reduce potential lockups that may occur while trying to call our own endpoints that then call `federatingdb.Lock()` - removes a bunch of the locks on follow state changes since the DB already synchronizes that - removes the unnecessary `pub.Clock{}` struct field and type passed to the transport controller frankly it would be great if we could remove the locking in `federatingdb.Lock()` and instead handle it ourselves as it gets very confusing trying to figure out what functions will have locks held. but i guess that's one for when we move further away from the go-fed/activity/pub package usage. Reviewed-on: https://codeberg.org/superseriousbusiness/gotosocial/pulls/4178 Co-authored-by: kim <grufwub@gmail.com> Co-committed-by: kim <grufwub@gmail.com>
This commit is contained in:
@ -271,7 +271,7 @@ var Start action.GTSAction = func(ctx context.Context) error {
|
|||||||
intFilter := interaction.NewFilter(state)
|
intFilter := interaction.NewFilter(state)
|
||||||
spamFilter := spam.NewFilter(state)
|
spamFilter := spam.NewFilter(state)
|
||||||
federatingDB := federatingdb.New(state, typeConverter, visFilter, intFilter, spamFilter)
|
federatingDB := federatingdb.New(state, typeConverter, visFilter, intFilter, spamFilter)
|
||||||
transportController := transport.NewController(state, federatingDB, &federation.Clock{}, client)
|
transportController := transport.NewController(state, federatingDB, client)
|
||||||
federator := federation.NewFederator(
|
federator := federation.NewFederator(
|
||||||
state,
|
state,
|
||||||
federatingDB,
|
federatingDB,
|
||||||
|
@ -35,7 +35,7 @@ import (
|
|||||||
"code.superseriousbusiness.org/gotosocial/internal/util"
|
"code.superseriousbusiness.org/gotosocial/internal/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (f *federatingDB) GetAccept(
|
func (f *DB) GetAccept(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
acceptIRI *url.URL,
|
acceptIRI *url.URL,
|
||||||
) (vocab.ActivityStreamsAccept, error) {
|
) (vocab.ActivityStreamsAccept, error) {
|
||||||
@ -46,7 +46,7 @@ func (f *federatingDB) GetAccept(
|
|||||||
return f.converter.InteractionReqToASAccept(ctx, approval)
|
return f.converter.InteractionReqToASAccept(ctx, approval)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *federatingDB) Accept(ctx context.Context, accept vocab.ActivityStreamsAccept) error {
|
func (f *DB) Accept(ctx context.Context, accept vocab.ActivityStreamsAccept) error {
|
||||||
log.DebugKV(ctx, "accept", serialize{accept})
|
log.DebugKV(ctx, "accept", serialize{accept})
|
||||||
|
|
||||||
activityContext := getActivityContext(ctx)
|
activityContext := getActivityContext(ctx)
|
||||||
@ -202,7 +202,7 @@ func (f *federatingDB) Accept(ctx context.Context, accept vocab.ActivityStreamsA
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *federatingDB) acceptFollowType(
|
func (f *DB) acceptFollowType(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
asType vocab.Type,
|
asType vocab.Type,
|
||||||
receivingAcct *gtsmodel.Account,
|
receivingAcct *gtsmodel.Account,
|
||||||
@ -218,11 +218,6 @@ func (f *federatingDB) acceptFollowType(
|
|||||||
return gtserror.NewErrorInternalError(err)
|
return gtserror.NewErrorInternalError(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Lock on the Follow URI
|
|
||||||
// as we may be updating it.
|
|
||||||
unlock := f.state.FedLocks.Lock(follow.URI)
|
|
||||||
defer unlock()
|
|
||||||
|
|
||||||
// Make sure the creator of the original follow
|
// Make sure the creator of the original follow
|
||||||
// is the same as whatever inbox this landed in.
|
// is the same as whatever inbox this landed in.
|
||||||
if follow.AccountID != receivingAcct.ID {
|
if follow.AccountID != receivingAcct.ID {
|
||||||
@ -238,8 +233,7 @@ func (f *federatingDB) acceptFollowType(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Accept and get the populated follow back.
|
// Accept and get the populated follow back.
|
||||||
follow, err = f.state.DB.AcceptFollowRequest(
|
follow, err = f.state.DB.AcceptFollowRequest(ctx,
|
||||||
ctx,
|
|
||||||
follow.AccountID,
|
follow.AccountID,
|
||||||
follow.TargetAccountID,
|
follow.TargetAccountID,
|
||||||
)
|
)
|
||||||
@ -267,17 +261,12 @@ func (f *federatingDB) acceptFollowType(
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *federatingDB) acceptFollowIRI(
|
func (f *DB) acceptFollowIRI(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
objectIRI string,
|
objectIRI string,
|
||||||
receivingAcct *gtsmodel.Account,
|
receivingAcct *gtsmodel.Account,
|
||||||
requestingAcct *gtsmodel.Account,
|
requestingAcct *gtsmodel.Account,
|
||||||
) error {
|
) error {
|
||||||
// Lock on this potential Follow
|
|
||||||
// URI as we may be updating it.
|
|
||||||
unlock := f.state.FedLocks.Lock(objectIRI)
|
|
||||||
defer unlock()
|
|
||||||
|
|
||||||
// Get the follow req from the db.
|
// Get the follow req from the db.
|
||||||
followReq, err := f.state.DB.GetFollowRequestByURI(ctx, objectIRI)
|
followReq, err := f.state.DB.GetFollowRequestByURI(ctx, objectIRI)
|
||||||
if err != nil && !errors.Is(err, db.ErrNoEntries) {
|
if err != nil && !errors.Is(err, db.ErrNoEntries) {
|
||||||
@ -307,8 +296,7 @@ func (f *federatingDB) acceptFollowIRI(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Accept and get the populated follow back.
|
// Accept and get the populated follow back.
|
||||||
follow, err := f.state.DB.AcceptFollowRequest(
|
follow, err := f.state.DB.AcceptFollowRequest(ctx,
|
||||||
ctx,
|
|
||||||
followReq.AccountID,
|
followReq.AccountID,
|
||||||
followReq.TargetAccountID,
|
followReq.TargetAccountID,
|
||||||
)
|
)
|
||||||
@ -336,7 +324,7 @@ func (f *federatingDB) acceptFollowIRI(
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *federatingDB) acceptOtherIRI(
|
func (f *DB) acceptOtherIRI(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
acceptID *url.URL,
|
acceptID *url.URL,
|
||||||
accept vocab.ActivityStreamsAccept,
|
accept vocab.ActivityStreamsAccept,
|
||||||
@ -419,7 +407,7 @@ func (f *federatingDB) acceptOtherIRI(
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *federatingDB) acceptStoredStatus(
|
func (f *DB) acceptStoredStatus(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
acceptID *url.URL,
|
acceptID *url.URL,
|
||||||
accept vocab.ActivityStreamsAccept,
|
accept vocab.ActivityStreamsAccept,
|
||||||
@ -489,7 +477,7 @@ func (f *federatingDB) acceptStoredStatus(
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *federatingDB) acceptLikeIRI(
|
func (f *DB) acceptLikeIRI(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
acceptID *url.URL,
|
acceptID *url.URL,
|
||||||
accept vocab.ActivityStreamsAccept,
|
accept vocab.ActivityStreamsAccept,
|
||||||
|
@ -29,7 +29,7 @@ import (
|
|||||||
"code.superseriousbusiness.org/gotosocial/internal/messages"
|
"code.superseriousbusiness.org/gotosocial/internal/messages"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (f *federatingDB) Announce(ctx context.Context, announce vocab.ActivityStreamsAnnounce) error {
|
func (f *DB) Announce(ctx context.Context, announce vocab.ActivityStreamsAnnounce) error {
|
||||||
log.DebugKV(ctx, "announce", serialize{announce})
|
log.DebugKV(ctx, "announce", serialize{announce})
|
||||||
|
|
||||||
activityContext := getActivityContext(ctx)
|
activityContext := getActivityContext(ctx)
|
||||||
|
@ -29,7 +29,7 @@ import (
|
|||||||
"code.superseriousbusiness.org/gotosocial/internal/messages"
|
"code.superseriousbusiness.org/gotosocial/internal/messages"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (f *federatingDB) Block(ctx context.Context, blockable vocab.ActivityStreamsBlock) error {
|
func (f *DB) Block(ctx context.Context, blockable vocab.ActivityStreamsBlock) error {
|
||||||
log.DebugKV(ctx, "block", serialize{blockable})
|
log.DebugKV(ctx, "block", serialize{blockable})
|
||||||
|
|
||||||
// Extract relevant values from passed ctx.
|
// Extract relevant values from passed ctx.
|
||||||
|
@ -45,7 +45,7 @@ import (
|
|||||||
//
|
//
|
||||||
// Under certain conditions and network activities, Create may be called
|
// Under certain conditions and network activities, Create may be called
|
||||||
// multiple times for the same ActivityStreams object.
|
// multiple times for the same ActivityStreams object.
|
||||||
func (f *federatingDB) Create(ctx context.Context, asType vocab.Type) error {
|
func (f *DB) Create(ctx context.Context, asType vocab.Type) error {
|
||||||
log.DebugKV(ctx, "create", serialize{asType})
|
log.DebugKV(ctx, "create", serialize{asType})
|
||||||
|
|
||||||
// Cache entry for this activity type's ID for later
|
// Cache entry for this activity type's ID for later
|
||||||
@ -126,7 +126,7 @@ func (f *federatingDB) Create(ctx context.Context, asType vocab.Type) error {
|
|||||||
// createPollOptionable handles a Create activity for a PollOptionable.
|
// createPollOptionable handles a Create activity for a PollOptionable.
|
||||||
// This function doesn't handle database insertion, only validation checks
|
// This function doesn't handle database insertion, only validation checks
|
||||||
// before passing off to a worker for asynchronous processing.
|
// before passing off to a worker for asynchronous processing.
|
||||||
func (f *federatingDB) createPollOptionables(
|
func (f *DB) createPollOptionables(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
receiver *gtsmodel.Account,
|
receiver *gtsmodel.Account,
|
||||||
requester *gtsmodel.Account,
|
requester *gtsmodel.Account,
|
||||||
@ -274,7 +274,7 @@ func (f *federatingDB) createPollOptionables(
|
|||||||
// This function won't insert anything in the database yet,
|
// This function won't insert anything in the database yet,
|
||||||
// but will pass the Statusable (if appropriate) through to
|
// but will pass the Statusable (if appropriate) through to
|
||||||
// the processor for further asynchronous processing.
|
// the processor for further asynchronous processing.
|
||||||
func (f *federatingDB) createStatusable(
|
func (f *DB) createStatusable(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
receiver *gtsmodel.Account,
|
receiver *gtsmodel.Account,
|
||||||
requester *gtsmodel.Account,
|
requester *gtsmodel.Account,
|
||||||
|
@ -32,9 +32,10 @@ import (
|
|||||||
"codeberg.org/gruf/go-cache/v3/simple"
|
"codeberg.org/gruf/go-cache/v3/simple"
|
||||||
)
|
)
|
||||||
|
|
||||||
// DB wraps the pub.Database interface with
|
// Check our type's
|
||||||
// a couple of custom functions for GoToSocial.
|
// interface conformity.
|
||||||
type DB interface {
|
var _ interface {
|
||||||
|
|
||||||
// Default
|
// Default
|
||||||
// functionality.
|
// functionality.
|
||||||
pub.Database
|
pub.Database
|
||||||
@ -55,11 +56,11 @@ type DB interface {
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
GetAccept(ctx context.Context, acceptIRI *url.URL) (vocab.ActivityStreamsAccept, error)
|
GetAccept(ctx context.Context, acceptIRI *url.URL) (vocab.ActivityStreamsAccept, error)
|
||||||
}
|
} = &DB{}
|
||||||
|
|
||||||
// FederatingDB uses the given state interface
|
// DB uses the given state interface to
|
||||||
// to implement the go-fed pub.Database interface.
|
// implement the go-fed pub.Database interface.
|
||||||
type federatingDB struct {
|
type DB struct {
|
||||||
state *state.State
|
state *state.State
|
||||||
converter *typeutils.Converter
|
converter *typeutils.Converter
|
||||||
visFilter *visibility.Filter
|
visFilter *visibility.Filter
|
||||||
@ -79,8 +80,8 @@ func New(
|
|||||||
visFilter *visibility.Filter,
|
visFilter *visibility.Filter,
|
||||||
intFilter *interaction.Filter,
|
intFilter *interaction.Filter,
|
||||||
spamFilter *spam.Filter,
|
spamFilter *spam.Filter,
|
||||||
) DB {
|
) *DB {
|
||||||
fdb := federatingDB{
|
fdb := DB{
|
||||||
state: state,
|
state: state,
|
||||||
converter: converter,
|
converter: converter,
|
||||||
visFilter: visFilter,
|
visFilter: visFilter,
|
||||||
@ -93,6 +94,6 @@ func New(
|
|||||||
|
|
||||||
// storeActivityID stores an entry in the .activityIDs cache for this
|
// storeActivityID stores an entry in the .activityIDs cache for this
|
||||||
// type's JSON-LD ID, for later checks in Exist() to mark it as seen.
|
// type's JSON-LD ID, for later checks in Exist() to mark it as seen.
|
||||||
func (f *federatingDB) storeActivityID(asType vocab.Type) {
|
func (f *DB) storeActivityID(asType vocab.Type) {
|
||||||
f.activityIDs.Set(ap.GetJSONLDId(asType).String(), struct{}{})
|
f.activityIDs.Set(ap.GetJSONLDId(asType).String(), struct{}{})
|
||||||
}
|
}
|
||||||
|
@ -36,7 +36,7 @@ import (
|
|||||||
// Protocol instead call Update to create a Tombstone.
|
// Protocol instead call Update to create a Tombstone.
|
||||||
//
|
//
|
||||||
// The library makes this call only after acquiring a lock first.
|
// The library makes this call only after acquiring a lock first.
|
||||||
func (f *federatingDB) Delete(ctx context.Context, id *url.URL) error {
|
func (f *DB) Delete(ctx context.Context, id *url.URL) error {
|
||||||
log.DebugKV(ctx, "id", id)
|
log.DebugKV(ctx, "id", id)
|
||||||
|
|
||||||
activityContext := getActivityContext(ctx)
|
activityContext := getActivityContext(ctx)
|
||||||
@ -87,7 +87,7 @@ func (f *federatingDB) Delete(ctx context.Context, id *url.URL) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *federatingDB) deleteAccount(
|
func (f *DB) deleteAccount(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
requesting *gtsmodel.Account,
|
requesting *gtsmodel.Account,
|
||||||
receiving *gtsmodel.Account,
|
receiving *gtsmodel.Account,
|
||||||
@ -126,7 +126,7 @@ func (f *federatingDB) deleteAccount(
|
|||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *federatingDB) deleteStatus(
|
func (f *DB) deleteStatus(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
requesting *gtsmodel.Account,
|
requesting *gtsmodel.Account,
|
||||||
receiving *gtsmodel.Account,
|
receiving *gtsmodel.Account,
|
||||||
|
@ -24,6 +24,6 @@ import (
|
|||||||
|
|
||||||
// Exists is an implementation of pub.Database{}.Exists(), optimized specifically for
|
// Exists is an implementation of pub.Database{}.Exists(), optimized specifically for
|
||||||
// the only usecase in which go-fed/activity/pub actually calls it. Do not use otherwise!
|
// the only usecase in which go-fed/activity/pub actually calls it. Do not use otherwise!
|
||||||
func (f *federatingDB) Exists(ctx context.Context, id *url.URL) (exists bool, err error) {
|
func (f *DB) Exists(ctx context.Context, id *url.URL) (exists bool, err error) {
|
||||||
return f.activityIDs.Has(id.String()), nil
|
return f.activityIDs.Has(id.String()), nil
|
||||||
}
|
}
|
||||||
|
@ -37,7 +37,7 @@ type FederatingDBTestSuite struct {
|
|||||||
suite.Suite
|
suite.Suite
|
||||||
db db.DB
|
db db.DB
|
||||||
tc *typeutils.Converter
|
tc *typeutils.Converter
|
||||||
federatingDB federatingdb.DB
|
federatingDB *federatingdb.DB
|
||||||
state state.State
|
state state.State
|
||||||
|
|
||||||
testTokens map[string]*gtsmodel.Token
|
testTokens map[string]*gtsmodel.Token
|
||||||
|
@ -30,7 +30,7 @@ import (
|
|||||||
"github.com/miekg/dns"
|
"github.com/miekg/dns"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (f *federatingDB) Flag(ctx context.Context, flaggable vocab.ActivityStreamsFlag) error {
|
func (f *DB) Flag(ctx context.Context, flaggable vocab.ActivityStreamsFlag) error {
|
||||||
log.DebugKV(ctx, "flag", serialize{flaggable})
|
log.DebugKV(ctx, "flag", serialize{flaggable})
|
||||||
|
|
||||||
// Mark activity as handled.
|
// Mark activity as handled.
|
||||||
|
@ -29,7 +29,7 @@ import (
|
|||||||
"code.superseriousbusiness.org/gotosocial/internal/messages"
|
"code.superseriousbusiness.org/gotosocial/internal/messages"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (f *federatingDB) Follow(ctx context.Context, followable vocab.ActivityStreamsFollow) error {
|
func (f *DB) Follow(ctx context.Context, followable vocab.ActivityStreamsFollow) error {
|
||||||
log.DebugKV(ctx, "follow", serialize{followable})
|
log.DebugKV(ctx, "follow", serialize{followable})
|
||||||
|
|
||||||
// Mark activity as handled.
|
// Mark activity as handled.
|
||||||
|
@ -32,7 +32,7 @@ import (
|
|||||||
// If modified, the library will then call Update.
|
// If modified, the library will then call Update.
|
||||||
//
|
//
|
||||||
// The library makes this call only after acquiring a lock first.
|
// The library makes this call only after acquiring a lock first.
|
||||||
func (f *federatingDB) Followers(ctx context.Context, actorIRI *url.URL) (followers vocab.ActivityStreamsCollection, err error) {
|
func (f *DB) Followers(ctx context.Context, actorIRI *url.URL) (followers vocab.ActivityStreamsCollection, err error) {
|
||||||
acct, err := f.state.DB.GetAccountByURI(ctx, actorIRI.String())
|
acct, err := f.state.DB.GetAccountByURI(ctx, actorIRI.String())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -31,7 +31,7 @@ import (
|
|||||||
// If modified, the library will then call Update.
|
// If modified, the library will then call Update.
|
||||||
//
|
//
|
||||||
// The library makes this call only after acquiring a lock first.
|
// The library makes this call only after acquiring a lock first.
|
||||||
func (f *federatingDB) Following(ctx context.Context, actorIRI *url.URL) (following vocab.ActivityStreamsCollection, err error) {
|
func (f *DB) Following(ctx context.Context, actorIRI *url.URL) (following vocab.ActivityStreamsCollection, err error) {
|
||||||
acct, err := f.state.DB.GetAccountByURI(ctx, actorIRI.String())
|
acct, err := f.state.DB.GetAccountByURI(ctx, actorIRI.String())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -19,16 +19,19 @@ package federatingdb
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"net/url"
|
"net/url"
|
||||||
|
|
||||||
"code.superseriousbusiness.org/activity/streams/vocab"
|
"code.superseriousbusiness.org/activity/streams/vocab"
|
||||||
"code.superseriousbusiness.org/gotosocial/internal/config"
|
"code.superseriousbusiness.org/gotosocial/internal/config"
|
||||||
"code.superseriousbusiness.org/gotosocial/internal/db"
|
"code.superseriousbusiness.org/gotosocial/internal/gtscontext"
|
||||||
"code.superseriousbusiness.org/gotosocial/internal/gtserror"
|
"code.superseriousbusiness.org/gotosocial/internal/gtserror"
|
||||||
"code.superseriousbusiness.org/gotosocial/internal/log"
|
"code.superseriousbusiness.org/gotosocial/internal/log"
|
||||||
"code.superseriousbusiness.org/gotosocial/internal/uris"
|
"code.superseriousbusiness.org/gotosocial/internal/uris"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var ErrNotImplemented = errors.New("not implemented")
|
||||||
|
|
||||||
// Get returns the database entry for the specified id.
|
// Get returns the database entry for the specified id.
|
||||||
//
|
//
|
||||||
// The library makes this call only after acquiring a lock first.
|
// The library makes this call only after acquiring a lock first.
|
||||||
@ -48,30 +51,38 @@ import (
|
|||||||
//
|
//
|
||||||
// It may be useful in future to add more matching here so that more
|
// It may be useful in future to add more matching here so that more
|
||||||
// stuff can be shortcutted by the dereferencer, saving HTTP calls.
|
// stuff can be shortcutted by the dereferencer, saving HTTP calls.
|
||||||
func (f *federatingDB) Get(ctx context.Context, id *url.URL) (value vocab.Type, err error) {
|
func (f *DB) Get(ctx context.Context, id *url.URL) (value vocab.Type, err error) {
|
||||||
log.DebugKV(ctx, "id", id)
|
log.DebugKV(ctx, "id", id)
|
||||||
|
|
||||||
// Ensure our host, for safety.
|
// Ensure our host, for safety.
|
||||||
if id.Host != config.GetHost() {
|
if id.Host != config.GetHost() {
|
||||||
return nil, gtserror.Newf("%s was not for our host", id.String())
|
return nil, gtserror.Newf("%s was not for our host", id)
|
||||||
}
|
}
|
||||||
|
|
||||||
if username, err := uris.ParseUserPath(id); err == nil && username != "" {
|
if username, _ := uris.ParseUserPath(id); username != "" {
|
||||||
acct, err := f.state.DB.GetAccountByUsernameDomain(ctx, username, "")
|
acct, err := f.state.DB.GetAccountByUsernameDomain(
|
||||||
|
gtscontext.SetBarebones(ctx),
|
||||||
|
username,
|
||||||
|
"",
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return f.converter.AccountToAS(ctx, acct)
|
return f.converter.AccountToAS(ctx, acct)
|
||||||
|
|
||||||
} else if _, statusID, err := uris.ParseStatusesPath(id); err == nil && statusID != "" {
|
} else if _, statusID, _ := uris.ParseStatusesPath(id); statusID != "" {
|
||||||
status, err := f.state.DB.GetStatusByID(ctx, statusID)
|
status, err := f.state.DB.GetStatusByID(ctx, statusID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return f.converter.StatusToAS(ctx, status)
|
return f.converter.StatusToAS(ctx, status)
|
||||||
|
|
||||||
} else if username, err := uris.ParseFollowersPath(id); err == nil && username != "" {
|
} else if username, _ := uris.ParseFollowersPath(id); username != "" {
|
||||||
acct, err := f.state.DB.GetAccountByUsernameDomain(ctx, username, "")
|
acct, err := f.state.DB.GetAccountByUsernameDomain(
|
||||||
|
gtscontext.SetBarebones(ctx),
|
||||||
|
username,
|
||||||
|
"",
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -83,8 +94,12 @@ func (f *federatingDB) Get(ctx context.Context, id *url.URL) (value vocab.Type,
|
|||||||
|
|
||||||
return f.Followers(ctx, acctURI)
|
return f.Followers(ctx, acctURI)
|
||||||
|
|
||||||
} else if username, err := uris.ParseFollowingPath(id); err == nil && username != "" {
|
} else if username, _ := uris.ParseFollowingPath(id); username != "" {
|
||||||
acct, err := f.state.DB.GetAccountByUsernameDomain(ctx, username, "")
|
acct, err := f.state.DB.GetAccountByUsernameDomain(
|
||||||
|
gtscontext.SetBarebones(ctx),
|
||||||
|
username,
|
||||||
|
"",
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -102,8 +117,5 @@ func (f *federatingDB) Get(ctx context.Context, id *url.URL) (value vocab.Type,
|
|||||||
|
|
||||||
// Nothing found, the caller
|
// Nothing found, the caller
|
||||||
// will have to deal with this.
|
// will have to deal with this.
|
||||||
return nil, gtserror.Newf(
|
return nil, ErrNotImplemented
|
||||||
"not implemented for %s: %w",
|
|
||||||
id.String(), db.ErrNoEntries,
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
@ -35,7 +35,7 @@ import (
|
|||||||
// The library makes this call only after acquiring a lock first.
|
// The library makes this call only after acquiring a lock first.
|
||||||
//
|
//
|
||||||
// Implementation note: we have our own logic for inboxes so always return false here.
|
// Implementation note: we have our own logic for inboxes so always return false here.
|
||||||
func (f *federatingDB) InboxContains(c context.Context, inbox, id *url.URL) (contains bool, err error) {
|
func (f *DB) InboxContains(c context.Context, inbox, id *url.URL) (contains bool, err error) {
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -45,7 +45,7 @@ func (f *federatingDB) InboxContains(c context.Context, inbox, id *url.URL) (con
|
|||||||
// The library makes this call only after acquiring a lock first.
|
// The library makes this call only after acquiring a lock first.
|
||||||
//
|
//
|
||||||
// Implementation note: we don't (yet) serve inboxes, so just return empty and nil here.
|
// Implementation note: we don't (yet) serve inboxes, so just return empty and nil here.
|
||||||
func (f *federatingDB) GetInbox(c context.Context, inboxIRI *url.URL) (inbox vocab.ActivityStreamsOrderedCollectionPage, err error) {
|
func (f *DB) GetInbox(c context.Context, inboxIRI *url.URL) (inbox vocab.ActivityStreamsOrderedCollectionPage, err error) {
|
||||||
return streams.NewActivityStreamsOrderedCollectionPage(), nil
|
return streams.NewActivityStreamsOrderedCollectionPage(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -56,7 +56,7 @@ func (f *federatingDB) GetInbox(c context.Context, inboxIRI *url.URL) (inbox voc
|
|||||||
// The library makes this call only after acquiring a lock first.
|
// The library makes this call only after acquiring a lock first.
|
||||||
//
|
//
|
||||||
// Implementation note: we don't allow inbox setting so just return nil here.
|
// Implementation note: we don't allow inbox setting so just return nil here.
|
||||||
func (f *federatingDB) SetInbox(c context.Context, inbox vocab.ActivityStreamsOrderedCollectionPage) error {
|
func (f *DB) SetInbox(c context.Context, inbox vocab.ActivityStreamsOrderedCollectionPage) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -76,7 +76,7 @@ func (f *federatingDB) SetInbox(c context.Context, inbox vocab.ActivityStreamsOr
|
|||||||
// then each follower inbox IRI should be returned in the inboxIRIs slice.
|
// then each follower inbox IRI should be returned in the inboxIRIs slice.
|
||||||
//
|
//
|
||||||
// The library makes this call only after acquiring a lock first.
|
// The library makes this call only after acquiring a lock first.
|
||||||
func (f *federatingDB) InboxesForIRI(c context.Context, iri *url.URL) (inboxIRIs []*url.URL, err error) {
|
func (f *DB) InboxesForIRI(c context.Context, iri *url.URL) (inboxIRIs []*url.URL, err error) {
|
||||||
// check if this is a followers collection iri for a local account...
|
// check if this is a followers collection iri for a local account...
|
||||||
if iri.Host == config.GetHost() && uris.IsFollowersPath(iri) {
|
if iri.Host == config.GetHost() && uris.IsFollowersPath(iri) {
|
||||||
localAccountUsername, err := uris.ParseFollowersPath(iri)
|
localAccountUsername, err := uris.ParseFollowersPath(iri)
|
||||||
|
@ -31,7 +31,7 @@ import (
|
|||||||
"code.superseriousbusiness.org/gotosocial/internal/messages"
|
"code.superseriousbusiness.org/gotosocial/internal/messages"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (f *federatingDB) Like(ctx context.Context, likeable vocab.ActivityStreamsLike) error {
|
func (f *DB) Like(ctx context.Context, likeable vocab.ActivityStreamsLike) error {
|
||||||
log.DebugKV(ctx, "like", serialize{likeable})
|
log.DebugKV(ctx, "like", serialize{likeable})
|
||||||
|
|
||||||
// Mark activity as handled.
|
// Mark activity as handled.
|
||||||
|
@ -33,6 +33,6 @@ import (
|
|||||||
// The library makes this call only after acquiring a lock first.
|
// The library makes this call only after acquiring a lock first.
|
||||||
//
|
//
|
||||||
// Implementation note: we don't serve a Liked collection *yet* so just return an empty collection for now.
|
// Implementation note: we don't serve a Liked collection *yet* so just return an empty collection for now.
|
||||||
func (f *federatingDB) Liked(c context.Context, actorIRI *url.URL) (liked vocab.ActivityStreamsCollection, err error) {
|
func (f *DB) Liked(c context.Context, actorIRI *url.URL) (liked vocab.ActivityStreamsCollection, err error) {
|
||||||
return streams.NewActivityStreamsCollection(), nil
|
return streams.NewActivityStreamsCollection(), nil
|
||||||
}
|
}
|
||||||
|
@ -33,6 +33,6 @@ import (
|
|||||||
// processes require tight loops acquiring and releasing locks.
|
// processes require tight loops acquiring and releasing locks.
|
||||||
//
|
//
|
||||||
// Used to ensure race conditions in multiple requests do not occur.
|
// Used to ensure race conditions in multiple requests do not occur.
|
||||||
func (f *federatingDB) Lock(c context.Context, id *url.URL) (func(), error) {
|
func (f *DB) Lock(c context.Context, id *url.URL) (func(), error) {
|
||||||
return f.state.FedLocks.Lock("federatingDB " + id.String()), nil // id should NEVER be nil.
|
return f.state.FedLocks.Lock("fdb:" + id.String()), nil // id should NEVER be nil.
|
||||||
}
|
}
|
||||||
|
@ -35,7 +35,7 @@ import (
|
|||||||
"code.superseriousbusiness.org/gotosocial/internal/messages"
|
"code.superseriousbusiness.org/gotosocial/internal/messages"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (f *federatingDB) Move(ctx context.Context, move vocab.ActivityStreamsMove) error {
|
func (f *DB) Move(ctx context.Context, move vocab.ActivityStreamsMove) error {
|
||||||
log.DebugKV(ctx, "move", serialize{move})
|
log.DebugKV(ctx, "move", serialize{move})
|
||||||
|
|
||||||
// Mark activity as handled.
|
// Mark activity as handled.
|
||||||
|
@ -31,7 +31,7 @@ import (
|
|||||||
// The library makes this call only after acquiring a lock first.
|
// The library makes this call only after acquiring a lock first.
|
||||||
//
|
//
|
||||||
// Implementation note: we don't (yet) serve outboxes, so just return empty and nil here.
|
// Implementation note: we don't (yet) serve outboxes, so just return empty and nil here.
|
||||||
func (f *federatingDB) GetOutbox(ctx context.Context, outboxIRI *url.URL) (inbox vocab.ActivityStreamsOrderedCollectionPage, err error) {
|
func (f *DB) GetOutbox(ctx context.Context, outboxIRI *url.URL) (inbox vocab.ActivityStreamsOrderedCollectionPage, err error) {
|
||||||
return streams.NewActivityStreamsOrderedCollectionPage(), nil
|
return streams.NewActivityStreamsOrderedCollectionPage(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -42,7 +42,7 @@ func (f *federatingDB) GetOutbox(ctx context.Context, outboxIRI *url.URL) (inbox
|
|||||||
// The library makes this call only after acquiring a lock first.
|
// The library makes this call only after acquiring a lock first.
|
||||||
//
|
//
|
||||||
// Implementation note: we don't allow outbox setting so just return nil here.
|
// Implementation note: we don't allow outbox setting so just return nil here.
|
||||||
func (f *federatingDB) SetOutbox(ctx context.Context, outbox vocab.ActivityStreamsOrderedCollectionPage) error {
|
func (f *DB) SetOutbox(ctx context.Context, outbox vocab.ActivityStreamsOrderedCollectionPage) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -50,7 +50,7 @@ func (f *federatingDB) SetOutbox(ctx context.Context, outbox vocab.ActivityStrea
|
|||||||
// actor's inbox IRI.
|
// actor's inbox IRI.
|
||||||
//
|
//
|
||||||
// The library makes this call only after acquiring a lock first.
|
// The library makes this call only after acquiring a lock first.
|
||||||
func (f *federatingDB) OutboxForInbox(ctx context.Context, inboxIRI *url.URL) (outboxIRI *url.URL, err error) {
|
func (f *DB) OutboxForInbox(ctx context.Context, inboxIRI *url.URL) (outboxIRI *url.URL, err error) {
|
||||||
acct, err := f.state.DB.GetOneAccountByInboxURI(ctx, inboxIRI.String())
|
acct, err := f.state.DB.GetOneAccountByInboxURI(ctx, inboxIRI.String())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -34,7 +34,7 @@ import (
|
|||||||
// Owns returns true if the IRI belongs to this instance, and if
|
// Owns returns true if the IRI belongs to this instance, and if
|
||||||
// the database has an entry for the IRI.
|
// the database has an entry for the IRI.
|
||||||
// The library makes this call only after acquiring a lock first.
|
// The library makes this call only after acquiring a lock first.
|
||||||
func (f *federatingDB) Owns(ctx context.Context, id *url.URL) (bool, error) {
|
func (f *DB) Owns(ctx context.Context, id *url.URL) (bool, error) {
|
||||||
log.DebugKV(ctx, "id", id)
|
log.DebugKV(ctx, "id", id)
|
||||||
|
|
||||||
// if the id host isn't this instance host, we don't own this IRI
|
// if the id host isn't this instance host, we don't own this IRI
|
||||||
@ -150,7 +150,7 @@ func (f *federatingDB) Owns(ctx context.Context, id *url.URL) (bool, error) {
|
|||||||
return false, fmt.Errorf("could not match activityID: %s", id.String())
|
return false, fmt.Errorf("could not match activityID: %s", id.String())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *federatingDB) ownsLike(ctx context.Context, uri *url.URL) (bool, error) {
|
func (f *DB) ownsLike(ctx context.Context, uri *url.URL) (bool, error) {
|
||||||
username, id, err := uris.ParseLikedPath(uri)
|
username, id, err := uris.ParseLikedPath(uri)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, fmt.Errorf("error parsing Like path for url %s: %w", uri.String(), err)
|
return false, fmt.Errorf("error parsing Like path for url %s: %w", uri.String(), err)
|
||||||
|
@ -33,7 +33,7 @@ import (
|
|||||||
"code.superseriousbusiness.org/gotosocial/internal/uris"
|
"code.superseriousbusiness.org/gotosocial/internal/uris"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (f *federatingDB) Reject(ctx context.Context, reject vocab.ActivityStreamsReject) error {
|
func (f *DB) Reject(ctx context.Context, reject vocab.ActivityStreamsReject) error {
|
||||||
log.DebugKV(ctx, "reject", serialize{reject})
|
log.DebugKV(ctx, "reject", serialize{reject})
|
||||||
|
|
||||||
activityContext := getActivityContext(ctx)
|
activityContext := getActivityContext(ctx)
|
||||||
@ -122,7 +122,7 @@ func (f *federatingDB) Reject(ctx context.Context, reject vocab.ActivityStreamsR
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *federatingDB) rejectFollowType(
|
func (f *DB) rejectFollowType(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
asType vocab.Type,
|
asType vocab.Type,
|
||||||
receivingAcct *gtsmodel.Account,
|
receivingAcct *gtsmodel.Account,
|
||||||
@ -138,11 +138,6 @@ func (f *federatingDB) rejectFollowType(
|
|||||||
return gtserror.NewErrorInternalError(err)
|
return gtserror.NewErrorInternalError(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Lock on the Follow URI
|
|
||||||
// as we may be updating it.
|
|
||||||
unlock := f.state.FedLocks.Lock(follow.URI)
|
|
||||||
defer unlock()
|
|
||||||
|
|
||||||
// Make sure the creator of the original follow
|
// Make sure the creator of the original follow
|
||||||
// is the same as whatever inbox this landed in.
|
// is the same as whatever inbox this landed in.
|
||||||
if follow.AccountID != receivingAcct.ID {
|
if follow.AccountID != receivingAcct.ID {
|
||||||
@ -158,8 +153,7 @@ func (f *federatingDB) rejectFollowType(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Reject the follow.
|
// Reject the follow.
|
||||||
err = f.state.DB.RejectFollowRequest(
|
err = f.state.DB.RejectFollowRequest(ctx,
|
||||||
ctx,
|
|
||||||
follow.AccountID,
|
follow.AccountID,
|
||||||
follow.TargetAccountID,
|
follow.TargetAccountID,
|
||||||
)
|
)
|
||||||
@ -171,17 +165,12 @@ func (f *federatingDB) rejectFollowType(
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *federatingDB) rejectFollowIRI(
|
func (f *DB) rejectFollowIRI(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
objectIRI string,
|
objectIRI string,
|
||||||
receivingAcct *gtsmodel.Account,
|
receivingAcct *gtsmodel.Account,
|
||||||
requestingAcct *gtsmodel.Account,
|
requestingAcct *gtsmodel.Account,
|
||||||
) error {
|
) error {
|
||||||
// Lock on this potential Follow
|
|
||||||
// URI as we may be updating it.
|
|
||||||
unlock := f.state.FedLocks.Lock(objectIRI)
|
|
||||||
defer unlock()
|
|
||||||
|
|
||||||
// Get the follow req from the db.
|
// Get the follow req from the db.
|
||||||
followReq, err := f.state.DB.GetFollowRequestByURI(ctx, objectIRI)
|
followReq, err := f.state.DB.GetFollowRequestByURI(ctx, objectIRI)
|
||||||
if err != nil && !errors.Is(err, db.ErrNoEntries) {
|
if err != nil && !errors.Is(err, db.ErrNoEntries) {
|
||||||
@ -214,8 +203,7 @@ func (f *federatingDB) rejectFollowIRI(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Reject the follow.
|
// Reject the follow.
|
||||||
err = f.state.DB.RejectFollowRequest(
|
err = f.state.DB.RejectFollowRequest(ctx,
|
||||||
ctx,
|
|
||||||
followReq.AccountID,
|
followReq.AccountID,
|
||||||
followReq.TargetAccountID,
|
followReq.TargetAccountID,
|
||||||
)
|
)
|
||||||
@ -227,7 +215,7 @@ func (f *federatingDB) rejectFollowIRI(
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *federatingDB) rejectStatusIRI(
|
func (f *DB) rejectStatusIRI(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
activityID string,
|
activityID string,
|
||||||
objectIRI string,
|
objectIRI string,
|
||||||
@ -379,7 +367,7 @@ func (f *federatingDB) rejectStatusIRI(
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *federatingDB) rejectLikeIRI(
|
func (f *DB) rejectLikeIRI(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
activityID string,
|
activityID string,
|
||||||
objectIRI string,
|
objectIRI string,
|
||||||
|
@ -32,7 +32,7 @@ import (
|
|||||||
"code.superseriousbusiness.org/gotosocial/internal/messages"
|
"code.superseriousbusiness.org/gotosocial/internal/messages"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (f *federatingDB) Undo(ctx context.Context, undo vocab.ActivityStreamsUndo) error {
|
func (f *DB) Undo(ctx context.Context, undo vocab.ActivityStreamsUndo) error {
|
||||||
log.DebugKV(ctx, "undo", serialize{undo})
|
log.DebugKV(ctx, "undo", serialize{undo})
|
||||||
|
|
||||||
activityContext := getActivityContext(ctx)
|
activityContext := getActivityContext(ctx)
|
||||||
@ -111,7 +111,7 @@ func (f *federatingDB) Undo(ctx context.Context, undo vocab.ActivityStreamsUndo)
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *federatingDB) undoFollow(
|
func (f *DB) undoFollow(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
receivingAcct *gtsmodel.Account,
|
receivingAcct *gtsmodel.Account,
|
||||||
requestingAcct *gtsmodel.Account,
|
requestingAcct *gtsmodel.Account,
|
||||||
@ -149,11 +149,6 @@ func (f *federatingDB) undoFollow(
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Lock on the Follow URI
|
|
||||||
// as we may be updating it.
|
|
||||||
unlock := f.state.FedLocks.Lock(follow.URI)
|
|
||||||
defer unlock()
|
|
||||||
|
|
||||||
// Ensure addressee is follow target.
|
// Ensure addressee is follow target.
|
||||||
if follow.TargetAccountID != receivingAcct.ID {
|
if follow.TargetAccountID != receivingAcct.ID {
|
||||||
const text = "receivingAcct was not Follow target"
|
const text = "receivingAcct was not Follow target"
|
||||||
@ -193,7 +188,7 @@ func (f *federatingDB) undoFollow(
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *federatingDB) undoLike(
|
func (f *DB) undoLike(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
receivingAcct *gtsmodel.Account,
|
receivingAcct *gtsmodel.Account,
|
||||||
requestingAcct *gtsmodel.Account,
|
requestingAcct *gtsmodel.Account,
|
||||||
@ -293,7 +288,7 @@ func (f *federatingDB) undoLike(
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *federatingDB) undoBlock(
|
func (f *DB) undoBlock(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
receivingAcct *gtsmodel.Account,
|
receivingAcct *gtsmodel.Account,
|
||||||
requestingAcct *gtsmodel.Account,
|
requestingAcct *gtsmodel.Account,
|
||||||
@ -363,7 +358,7 @@ func (f *federatingDB) undoBlock(
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *federatingDB) undoAnnounce(
|
func (f *DB) undoAnnounce(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
receivingAcct *gtsmodel.Account,
|
receivingAcct *gtsmodel.Account,
|
||||||
requestingAcct *gtsmodel.Account,
|
requestingAcct *gtsmodel.Account,
|
||||||
|
@ -40,7 +40,7 @@ import (
|
|||||||
// the entire value.
|
// the entire value.
|
||||||
//
|
//
|
||||||
// The library makes this call only after acquiring a lock first.
|
// The library makes this call only after acquiring a lock first.
|
||||||
func (f *federatingDB) Update(ctx context.Context, asType vocab.Type) error {
|
func (f *DB) Update(ctx context.Context, asType vocab.Type) error {
|
||||||
log.DebugKV(ctx, "update", serialize{asType})
|
log.DebugKV(ctx, "update", serialize{asType})
|
||||||
|
|
||||||
// Mark activity as handled.
|
// Mark activity as handled.
|
||||||
@ -67,7 +67,7 @@ func (f *federatingDB) Update(ctx context.Context, asType vocab.Type) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *federatingDB) updateAccountable(ctx context.Context, receivingAcct *gtsmodel.Account, requestingAcct *gtsmodel.Account, accountable ap.Accountable) error {
|
func (f *DB) updateAccountable(ctx context.Context, receivingAcct *gtsmodel.Account, requestingAcct *gtsmodel.Account, accountable ap.Accountable) error {
|
||||||
// Extract AP URI of the updated Accountable model.
|
// Extract AP URI of the updated Accountable model.
|
||||||
idProp := accountable.GetJSONLDId()
|
idProp := accountable.GetJSONLDId()
|
||||||
if idProp == nil || !idProp.IsIRI() {
|
if idProp == nil || !idProp.IsIRI() {
|
||||||
@ -105,7 +105,7 @@ func (f *federatingDB) updateAccountable(ctx context.Context, receivingAcct *gts
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *federatingDB) updateStatusable(ctx context.Context, receivingAcct *gtsmodel.Account, requestingAcct *gtsmodel.Account, statusable ap.Statusable) error {
|
func (f *DB) updateStatusable(ctx context.Context, receivingAcct *gtsmodel.Account, requestingAcct *gtsmodel.Account, statusable ap.Statusable) error {
|
||||||
// Extract AP URI of the updated model.
|
// Extract AP URI of the updated model.
|
||||||
idProp := statusable.GetJSONLDId()
|
idProp := statusable.GetJSONLDId()
|
||||||
if idProp == nil || !idProp.IsIRI() {
|
if idProp == nil || !idProp.IsIRI() {
|
||||||
|
@ -20,7 +20,6 @@ package federatingdb
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
|
||||||
"net/url"
|
"net/url"
|
||||||
|
|
||||||
"code.superseriousbusiness.org/activity/streams"
|
"code.superseriousbusiness.org/activity/streams"
|
||||||
@ -65,16 +64,20 @@ func sameActor(actor1 vocab.ActivityStreamsActorProperty, actor2 vocab.ActivityS
|
|||||||
}
|
}
|
||||||
|
|
||||||
for a1Iter := actor1.Begin(); a1Iter != actor1.End(); a1Iter = a1Iter.Next() {
|
for a1Iter := actor1.Begin(); a1Iter != actor1.End(); a1Iter = a1Iter.Next() {
|
||||||
|
a1IRI := a1Iter.GetIRI()
|
||||||
|
if a1IRI == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
a1IRIStr := a1IRI.String()
|
||||||
for a2Iter := actor2.Begin(); a2Iter != actor2.End(); a2Iter = a2Iter.Next() {
|
for a2Iter := actor2.Begin(); a2Iter != actor2.End(); a2Iter = a2Iter.Next() {
|
||||||
if a1Iter.GetIRI() == nil {
|
a2IRI := a2Iter.GetIRI()
|
||||||
|
if a2IRI == nil {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
if a2Iter.GetIRI() == nil {
|
a2IRIStr := a2IRI.String()
|
||||||
return false
|
if a1IRIStr == a2IRIStr {
|
||||||
}
|
|
||||||
|
|
||||||
if a1Iter.GetIRI().String() == a2Iter.GetIRI().String() {
|
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -89,7 +92,7 @@ func sameActor(actor1 vocab.ActivityStreamsActorProperty, actor2 vocab.ActivityS
|
|||||||
//
|
//
|
||||||
// The go-fed library will handle setting the 'id' property on the
|
// The go-fed library will handle setting the 'id' property on the
|
||||||
// activity or object provided with the value returned.
|
// activity or object provided with the value returned.
|
||||||
func (f *federatingDB) NewID(ctx context.Context, t vocab.Type) (idURL *url.URL, err error) {
|
func (f *DB) NewID(ctx context.Context, t vocab.Type) (idURL *url.URL, err error) {
|
||||||
log.DebugKV(ctx, "newID", serialize{t})
|
log.DebugKV(ctx, "newID", serialize{t})
|
||||||
|
|
||||||
// Most of our types set an ID already
|
// Most of our types set an ID already
|
||||||
@ -116,19 +119,18 @@ func (f *federatingDB) NewID(ctx context.Context, t vocab.Type) (idURL *url.URL,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Default fallback behaviour:
|
// Default fallback behaviour:
|
||||||
// {proto}://{host}/{randomID}
|
// {proto}://{host}/{newULID}
|
||||||
newID, err := id.NewRandomULID()
|
return &url.URL{
|
||||||
if err != nil {
|
Scheme: config.GetProtocol(),
|
||||||
return nil, err
|
Host: config.GetHost(),
|
||||||
}
|
Path: "/" + id.NewULID(),
|
||||||
|
}, nil
|
||||||
return url.Parse(fmt.Sprintf("%s://%s/%s", config.GetProtocol(), config.GetHost(), newID))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ActorForOutbox fetches the local actor's IRI for the given outbox IRI.
|
// ActorForOutbox fetches the local actor's IRI for the given outbox IRI.
|
||||||
//
|
//
|
||||||
// The library makes this call only after acquiring a lock first.
|
// The library makes this call only after acquiring a lock first.
|
||||||
func (f *federatingDB) ActorForOutbox(ctx context.Context, outboxIRI *url.URL) (actorIRI *url.URL, err error) {
|
func (f *DB) ActorForOutbox(ctx context.Context, outboxIRI *url.URL) (actorIRI *url.URL, err error) {
|
||||||
acct, err := f.state.DB.GetOneAccountByOutboxURI(ctx, outboxIRI.String())
|
acct, err := f.state.DB.GetOneAccountByOutboxURI(ctx, outboxIRI.String())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -139,7 +141,7 @@ func (f *federatingDB) ActorForOutbox(ctx context.Context, outboxIRI *url.URL) (
|
|||||||
// ActorForInbox fetches the local actor's IRI for the given inbox IRI.
|
// ActorForInbox fetches the local actor's IRI for the given inbox IRI.
|
||||||
//
|
//
|
||||||
// The library makes this call only after acquiring a lock first.
|
// The library makes this call only after acquiring a lock first.
|
||||||
func (f *federatingDB) ActorForInbox(ctx context.Context, inboxIRI *url.URL) (actorIRI *url.URL, err error) {
|
func (f *DB) ActorForInbox(ctx context.Context, inboxIRI *url.URL) (actorIRI *url.URL, err error) {
|
||||||
acct, err := f.state.DB.GetOneAccountByInboxURI(ctx, inboxIRI.String())
|
acct, err := f.state.DB.GetOneAccountByInboxURI(ctx, inboxIRI.String())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -148,7 +150,7 @@ func (f *federatingDB) ActorForInbox(ctx context.Context, inboxIRI *url.URL) (ac
|
|||||||
}
|
}
|
||||||
|
|
||||||
// collectFollows takes a slice of iris and converts them into ActivityStreamsCollection of IRIs.
|
// collectFollows takes a slice of iris and converts them into ActivityStreamsCollection of IRIs.
|
||||||
func (f *federatingDB) collectIRIs(_ context.Context, iris []*url.URL) (vocab.ActivityStreamsCollection, error) {
|
func (f *DB) collectIRIs(_ context.Context, iris []*url.URL) (vocab.ActivityStreamsCollection, error) {
|
||||||
collection := streams.NewActivityStreamsCollection()
|
collection := streams.NewActivityStreamsCollection()
|
||||||
items := streams.NewActivityStreamsItemsProperty()
|
items := streams.NewActivityStreamsItemsProperty()
|
||||||
for _, i := range iris {
|
for _, i := range iris {
|
||||||
|
@ -37,7 +37,7 @@ var _ interface {
|
|||||||
|
|
||||||
type Federator struct {
|
type Federator struct {
|
||||||
db db.DB
|
db db.DB
|
||||||
federatingDB federatingdb.DB
|
federatingDB *federatingdb.DB
|
||||||
clock pub.Clock
|
clock pub.Clock
|
||||||
converter *typeutils.Converter
|
converter *typeutils.Converter
|
||||||
transport transport.Controller
|
transport transport.Controller
|
||||||
@ -54,7 +54,7 @@ type Federator struct {
|
|||||||
// NewFederator returns a new federator instance.
|
// NewFederator returns a new federator instance.
|
||||||
func NewFederator(
|
func NewFederator(
|
||||||
state *state.State,
|
state *state.State,
|
||||||
federatingDB federatingdb.DB,
|
federatingDB *federatingdb.DB,
|
||||||
transportController transport.Controller,
|
transportController transport.Controller,
|
||||||
converter *typeutils.Converter,
|
converter *typeutils.Converter,
|
||||||
visFilter *visibility.Filter,
|
visFilter *visibility.Filter,
|
||||||
@ -112,7 +112,7 @@ func (f *Federator) FederatingActor() pub.FederatingActor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// FederatingDB returns the underlying FederatingDB interface.
|
// FederatingDB returns the underlying FederatingDB interface.
|
||||||
func (f *Federator) FederatingDB() federatingdb.DB {
|
func (f *Federator) FederatingDB() *federatingdb.DB {
|
||||||
return f.federatingDB
|
return f.federatingDB
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -269,9 +269,8 @@ func (p *fediAPI) MoveAccount(ctx context.Context, fMsg *messages.FromFediAPI) e
|
|||||||
// try to send the same Move several times with
|
// try to send the same Move several times with
|
||||||
// different IDs (you never know), but we only
|
// different IDs (you never know), but we only
|
||||||
// want to process them based on origin + target.
|
// want to process them based on origin + target.
|
||||||
unlock := p.state.FedLocks.Lock(
|
key := "move:" + originAcctURIStr + ":" + targetAcctURIStr
|
||||||
"move:" + originAcctURIStr + ":" + targetAcctURIStr,
|
unlock := p.state.FedLocks.Lock(key)
|
||||||
)
|
|
||||||
defer unlock()
|
defer unlock()
|
||||||
|
|
||||||
// Check if Move is rate limited based
|
// Check if Move is rate limited based
|
||||||
|
@ -23,6 +23,7 @@ import (
|
|||||||
"crypto/rsa"
|
"crypto/rsa"
|
||||||
"crypto/x509"
|
"crypto/x509"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
@ -30,11 +31,12 @@ import (
|
|||||||
"strconv"
|
"strconv"
|
||||||
|
|
||||||
"code.superseriousbusiness.org/activity/pub"
|
"code.superseriousbusiness.org/activity/pub"
|
||||||
"code.superseriousbusiness.org/activity/streams/vocab"
|
|
||||||
"code.superseriousbusiness.org/gotosocial/internal/ap"
|
"code.superseriousbusiness.org/gotosocial/internal/ap"
|
||||||
apiutil "code.superseriousbusiness.org/gotosocial/internal/api/util"
|
apiutil "code.superseriousbusiness.org/gotosocial/internal/api/util"
|
||||||
"code.superseriousbusiness.org/gotosocial/internal/config"
|
"code.superseriousbusiness.org/gotosocial/internal/config"
|
||||||
|
"code.superseriousbusiness.org/gotosocial/internal/db"
|
||||||
"code.superseriousbusiness.org/gotosocial/internal/federation/federatingdb"
|
"code.superseriousbusiness.org/gotosocial/internal/federation/federatingdb"
|
||||||
|
"code.superseriousbusiness.org/gotosocial/internal/gtserror"
|
||||||
"code.superseriousbusiness.org/gotosocial/internal/state"
|
"code.superseriousbusiness.org/gotosocial/internal/state"
|
||||||
"code.superseriousbusiness.org/gotosocial/internal/util"
|
"code.superseriousbusiness.org/gotosocial/internal/util"
|
||||||
"codeberg.org/gruf/go-byteutil"
|
"codeberg.org/gruf/go-byteutil"
|
||||||
@ -52,15 +54,14 @@ type Controller interface {
|
|||||||
|
|
||||||
type controller struct {
|
type controller struct {
|
||||||
state *state.State
|
state *state.State
|
||||||
fedDB federatingdb.DB
|
fedDB *federatingdb.DB
|
||||||
clock pub.Clock
|
|
||||||
client pub.HttpClient
|
client pub.HttpClient
|
||||||
trspCache cache.TTLCache[string, *transport]
|
trspCache cache.TTLCache[string, *transport]
|
||||||
userAgent string
|
userAgent string
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewController returns an implementation of the Controller interface for creating new transports
|
// NewController returns an implementation of the Controller interface for creating new transports
|
||||||
func NewController(state *state.State, federatingDB federatingdb.DB, clock pub.Clock, client pub.HttpClient) Controller {
|
func NewController(state *state.State, federatingDB *federatingdb.DB, client pub.HttpClient) Controller {
|
||||||
var (
|
var (
|
||||||
host = config.GetHost()
|
host = config.GetHost()
|
||||||
proto = config.GetProtocol()
|
proto = config.GetProtocol()
|
||||||
@ -70,7 +71,6 @@ func NewController(state *state.State, federatingDB federatingdb.DB, clock pub.C
|
|||||||
c := &controller{
|
c := &controller{
|
||||||
state: state,
|
state: state,
|
||||||
fedDB: federatingDB,
|
fedDB: federatingDB,
|
||||||
clock: clock,
|
|
||||||
client: client,
|
client: client,
|
||||||
trspCache: cache.NewTTL[string, *transport](0, 100, 0),
|
trspCache: cache.NewTTL[string, *transport](0, 100, 0),
|
||||||
userAgent: fmt.Sprintf("gotosocial/%s (+%s://%s)", version, proto, host),
|
userAgent: fmt.Sprintf("gotosocial/%s (+%s://%s)", version, proto, host),
|
||||||
@ -153,37 +153,51 @@ func (c *controller) dereferenceLocal(
|
|||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
uri *url.URL,
|
uri *url.URL,
|
||||||
) (*http.Response, error) {
|
) (*http.Response, error) {
|
||||||
var (
|
|
||||||
t vocab.Type
|
|
||||||
err error
|
|
||||||
)
|
|
||||||
|
|
||||||
t, err = c.fedDB.Get(ctx, uri)
|
// Try fetch via federating DB.
|
||||||
if err != nil {
|
t, err := c.fedDB.Get(ctx, uri)
|
||||||
// Don't check especially for
|
|
||||||
// db.ErrNoEntries, as we *want*
|
switch {
|
||||||
// to pass this back to the caller
|
// No problem.
|
||||||
// if we didn't get anything.
|
case err == nil:
|
||||||
return nil, err
|
|
||||||
|
// Catch and handle objects not found.
|
||||||
|
case errors.Is(err, db.ErrNoEntries):
|
||||||
|
return &http.Response{
|
||||||
|
Request: &http.Request{URL: uri},
|
||||||
|
Status: http.StatusText(http.StatusNotFound),
|
||||||
|
StatusCode: http.StatusNotFound,
|
||||||
|
Header: map[string][]string{
|
||||||
|
"Content-Type": {apiutil.AppActivityLDJSON},
|
||||||
|
"Content-Length": {"0"},
|
||||||
|
},
|
||||||
|
}, nil
|
||||||
|
|
||||||
|
// Any other.
|
||||||
|
default:
|
||||||
|
return nil, gtserror.Newf("error getting: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if util.IsNil(t) {
|
if util.IsNil(t) {
|
||||||
// This should never happen.
|
// Assert this should never happen.
|
||||||
panic("nil vocab.Type after successful c.fedDB.Get call")
|
panic(gtserror.New("nil vocab.Type"))
|
||||||
}
|
}
|
||||||
|
|
||||||
i, err := ap.Serialize(t)
|
// Serialize type to JSON map.
|
||||||
|
m, err := ap.Serialize(t)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
b, err := json.Marshal(i)
|
// Marshal JSON to bytes.
|
||||||
|
b, err := json.Marshal(m)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Return a response
|
||||||
|
// with AS data as body.
|
||||||
contentLength := len(b)
|
contentLength := len(b)
|
||||||
|
|
||||||
// Return a response with AS data as body.
|
|
||||||
rsp := &http.Response{
|
rsp := &http.Response{
|
||||||
Request: &http.Request{URL: uri},
|
Request: &http.Request{URL: uri},
|
||||||
Status: http.StatusText(http.StatusOK),
|
Status: http.StatusText(http.StatusOK),
|
||||||
|
@ -25,7 +25,7 @@ import (
|
|||||||
|
|
||||||
apiutil "code.superseriousbusiness.org/gotosocial/internal/api/util"
|
apiutil "code.superseriousbusiness.org/gotosocial/internal/api/util"
|
||||||
"code.superseriousbusiness.org/gotosocial/internal/config"
|
"code.superseriousbusiness.org/gotosocial/internal/config"
|
||||||
"code.superseriousbusiness.org/gotosocial/internal/db"
|
"code.superseriousbusiness.org/gotosocial/internal/federation/federatingdb"
|
||||||
"code.superseriousbusiness.org/gotosocial/internal/gtserror"
|
"code.superseriousbusiness.org/gotosocial/internal/gtserror"
|
||||||
"code.superseriousbusiness.org/gotosocial/internal/log"
|
"code.superseriousbusiness.org/gotosocial/internal/log"
|
||||||
)
|
)
|
||||||
@ -38,10 +38,8 @@ func (t *transport) Dereference(ctx context.Context, iri *url.URL) (*http.Respon
|
|||||||
// to just make a normal http request to ourself.
|
// to just make a normal http request to ourself.
|
||||||
if iri.Host == config.GetHost() {
|
if iri.Host == config.GetHost() {
|
||||||
rsp, err := t.controller.dereferenceLocal(ctx, iri)
|
rsp, err := t.controller.dereferenceLocal(ctx, iri)
|
||||||
if err != nil && !errors.Is(err, db.ErrNoEntries) {
|
if err != nil && !errors.Is(err, federatingdb.ErrNotImplemented) {
|
||||||
// Real error.
|
return nil, gtserror.Newf("error dereferencing local: %w", err)
|
||||||
err := gtserror.Newf("error trying dereferenceLocal: %w", err)
|
|
||||||
return nil, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if rsp != nil {
|
if rsp != nil {
|
||||||
|
@ -27,7 +27,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// NewTestFederatingDB returns a federating DB with the underlying db
|
// NewTestFederatingDB returns a federating DB with the underlying db
|
||||||
func NewTestFederatingDB(state *state.State) federatingdb.DB {
|
func NewTestFederatingDB(state *state.State) *federatingdb.DB {
|
||||||
return federatingdb.New(
|
return federatingdb.New(
|
||||||
state,
|
state,
|
||||||
typeutils.NewConverter(state),
|
typeutils.NewConverter(state),
|
||||||
|
@ -31,7 +31,6 @@ import (
|
|||||||
"code.superseriousbusiness.org/activity/streams/vocab"
|
"code.superseriousbusiness.org/activity/streams/vocab"
|
||||||
"code.superseriousbusiness.org/gotosocial/internal/ap"
|
"code.superseriousbusiness.org/gotosocial/internal/ap"
|
||||||
apimodel "code.superseriousbusiness.org/gotosocial/internal/api/model"
|
apimodel "code.superseriousbusiness.org/gotosocial/internal/api/model"
|
||||||
"code.superseriousbusiness.org/gotosocial/internal/federation"
|
|
||||||
"code.superseriousbusiness.org/gotosocial/internal/gtsmodel"
|
"code.superseriousbusiness.org/gotosocial/internal/gtsmodel"
|
||||||
"code.superseriousbusiness.org/gotosocial/internal/httpclient"
|
"code.superseriousbusiness.org/gotosocial/internal/httpclient"
|
||||||
"code.superseriousbusiness.org/gotosocial/internal/log"
|
"code.superseriousbusiness.org/gotosocial/internal/log"
|
||||||
@ -56,7 +55,7 @@ const (
|
|||||||
// PER TEST rather than per suite, so that the do function can be set on a test by test (or even more granular)
|
// PER TEST rather than per suite, so that the do function can be set on a test by test (or even more granular)
|
||||||
// basis.
|
// basis.
|
||||||
func NewTestTransportController(state *state.State, client pub.HttpClient) transport.Controller {
|
func NewTestTransportController(state *state.State, client pub.HttpClient) transport.Controller {
|
||||||
return transport.NewController(state, NewTestFederatingDB(state), &federation.Clock{}, client)
|
return transport.NewController(state, NewTestFederatingDB(state), client)
|
||||||
}
|
}
|
||||||
|
|
||||||
type MockHTTPClient struct {
|
type MockHTTPClient struct {
|
||||||
|
Reference in New Issue
Block a user