Handle forwarded messages (#273)

* correct path of foss_satan

* add APIri and notes

* test create forward note

* rename target => receiving account

* split up create into separate funcs

* update extractFromCtx

* tidy up from federator processing

* foss satan => http not https

* check if status in db

* mock dereference of status from IRI

* add forward message deref test

* update test with activities

* add remote_account_2 to test rig
This commit is contained in:
tobi
2021-10-10 12:39:25 +02:00
committed by GitHub
parent 3dc7644ae6
commit 367bdca250
18 changed files with 765 additions and 383 deletions

View File

@@ -31,203 +31,264 @@ import (
"github.com/superseriousbusiness/gotosocial/internal/messages"
)
// ProcessFromFederator reads the APActivityType and APObjectType of an incoming message from the federator,
// and directs the message into the appropriate side effect handler function, or simply does nothing if there's
// no handler function defined for the combination of Activity and Object.
func (p *processor) ProcessFromFederator(ctx context.Context, federatorMsg messages.FromFederator) error {
l := p.log.WithFields(logrus.Fields{
"func": "processFromFederator",
"federatorMsg": fmt.Sprintf("%+v", federatorMsg),
"func": "processFromFederator",
"APActivityType": federatorMsg.APActivityType,
"APObjectType": federatorMsg.APObjectType,
})
l.Trace("entering function PROCESS FROM FEDERATOR")
l.Trace("processing message from federator")
switch federatorMsg.APActivityType {
case ap.ActivityCreate:
// CREATE
// CREATE SOMETHING
switch federatorMsg.APObjectType {
case ap.ObjectNote:
// CREATE A STATUS
incomingStatus, ok := federatorMsg.GTSModel.(*gtsmodel.Status)
if !ok {
return errors.New("note was not parseable as *gtsmodel.Status")
}
status, err := p.federator.EnrichRemoteStatus(ctx, federatorMsg.ReceivingAccount.Username, incomingStatus, true)
if err != nil {
return err
}
if err := p.timelineStatus(ctx, status); err != nil {
return err
}
if err := p.notifyStatus(ctx, status); err != nil {
return err
}
case ap.ObjectProfile:
// CREATE AN ACCOUNT
// nothing to do here
return p.processCreateStatusFromFederator(ctx, federatorMsg)
case ap.ActivityLike:
// CREATE A FAVE
incomingFave, ok := federatorMsg.GTSModel.(*gtsmodel.StatusFave)
if !ok {
return errors.New("like was not parseable as *gtsmodel.StatusFave")
}
if err := p.notifyFave(ctx, incomingFave); err != nil {
return err
}
return p.processCreateFaveFromFederator(ctx, federatorMsg)
case ap.ActivityFollow:
// CREATE A FOLLOW REQUEST
followRequest, ok := federatorMsg.GTSModel.(*gtsmodel.FollowRequest)
if !ok {
return errors.New("incomingFollowRequest was not parseable as *gtsmodel.FollowRequest")
}
if followRequest.TargetAccount == nil {
a, err := p.db.GetAccountByID(ctx, followRequest.TargetAccountID)
if err != nil {
return err
}
followRequest.TargetAccount = a
}
targetAccount := followRequest.TargetAccount
if targetAccount.Locked {
// if the account is locked just notify the follow request and nothing else
return p.notifyFollowRequest(ctx, followRequest)
}
if followRequest.Account == nil {
a, err := p.db.GetAccountByID(ctx, followRequest.AccountID)
if err != nil {
return err
}
followRequest.Account = a
}
originAccount := followRequest.Account
// if the target account isn't locked, we should already accept the follow and notify about the new follower instead
follow, err := p.db.AcceptFollowRequest(ctx, followRequest.AccountID, followRequest.TargetAccountID)
if err != nil {
return err
}
if err := p.federateAcceptFollowRequest(ctx, follow, originAccount, targetAccount); err != nil {
return err
}
return p.notifyFollow(ctx, follow, targetAccount)
return p.processCreateFollowRequestFromFederator(ctx, federatorMsg)
case ap.ActivityAnnounce:
// CREATE AN ANNOUNCE
incomingAnnounce, ok := federatorMsg.GTSModel.(*gtsmodel.Status)
if !ok {
return errors.New("announce was not parseable as *gtsmodel.Status")
}
if err := p.federator.DereferenceAnnounce(ctx, incomingAnnounce, federatorMsg.ReceivingAccount.Username); err != nil {
return fmt.Errorf("error dereferencing announce from federator: %s", err)
}
incomingAnnounceID, err := id.NewULIDFromTime(incomingAnnounce.CreatedAt)
if err != nil {
return err
}
incomingAnnounce.ID = incomingAnnounceID
if err := p.db.PutStatus(ctx, incomingAnnounce); err != nil {
return fmt.Errorf("error adding dereferenced announce to the db: %s", err)
}
if err := p.timelineStatus(ctx, incomingAnnounce); err != nil {
return err
}
if err := p.notifyAnnounce(ctx, incomingAnnounce); err != nil {
return err
}
return p.processCreateAnnounceFromFederator(ctx, federatorMsg)
case ap.ActivityBlock:
// CREATE A BLOCK
block, ok := federatorMsg.GTSModel.(*gtsmodel.Block)
if !ok {
return errors.New("block was not parseable as *gtsmodel.Block")
}
// remove any of the blocking account's statuses from the blocked account's timeline, and vice versa
if err := p.timelineManager.WipeStatusesFromAccountID(ctx, block.AccountID, block.TargetAccountID); err != nil {
return err
}
if err := p.timelineManager.WipeStatusesFromAccountID(ctx, block.TargetAccountID, block.AccountID); err != nil {
return err
}
// TODO: same with notifications
// TODO: same with bookmarks
return p.processCreateBlockFromFederator(ctx, federatorMsg)
}
case ap.ActivityUpdate:
// UPDATE
// UPDATE SOMETHING
switch federatorMsg.APObjectType {
case ap.ObjectProfile:
// UPDATE AN ACCOUNT
incomingAccount, ok := federatorMsg.GTSModel.(*gtsmodel.Account)
if !ok {
return errors.New("profile was not parseable as *gtsmodel.Account")
}
if _, err := p.federator.EnrichRemoteAccount(ctx, federatorMsg.ReceivingAccount.Username, incomingAccount); err != nil {
return fmt.Errorf("error enriching updated account from federator: %s", err)
}
return p.processUpdateAccountFromFederator(ctx, federatorMsg)
}
case ap.ActivityDelete:
// DELETE
// DELETE SOMETHING
switch federatorMsg.APObjectType {
case ap.ObjectNote:
// DELETE A STATUS
// TODO: handle side effects of status deletion here:
// 1. delete all media associated with status
// 2. delete boosts of status
// 3. etc etc etc
statusToDelete, ok := federatorMsg.GTSModel.(*gtsmodel.Status)
if !ok {
return errors.New("note was not parseable as *gtsmodel.Status")
}
// delete all attachments for this status
for _, a := range statusToDelete.AttachmentIDs {
if err := p.mediaProcessor.Delete(ctx, a); err != nil {
return err
}
}
// delete all mentions for this status
for _, m := range statusToDelete.MentionIDs {
if err := p.db.DeleteByID(ctx, m, &gtsmodel.Mention{}); err != nil {
return err
}
}
// delete all notifications for this status
if err := p.db.DeleteWhere(ctx, []db.Where{{Key: "status_id", Value: statusToDelete.ID}}, &[]*gtsmodel.Notification{}); err != nil {
return err
}
// remove this status from any and all timelines
return p.deleteStatusFromTimelines(ctx, statusToDelete)
return p.processDeleteStatusFromFederator(ctx, federatorMsg)
case ap.ObjectProfile:
// DELETE A PROFILE/ACCOUNT
// handle side effects of account deletion here: delete all objects, statuses, media etc associated with account
account, ok := federatorMsg.GTSModel.(*gtsmodel.Account)
if !ok {
return errors.New("account delete was not parseable as *gtsmodel.Account")
}
return p.processDeleteAccountFromFederator(ctx, federatorMsg)
}
}
return p.accountProcessor.Delete(ctx, account, account.ID)
// not a combination we can/need to process
return nil
}
// processCreateStatusFromFederator handles Activity Create and Object Note
func (p *processor) processCreateStatusFromFederator(ctx context.Context, federatorMsg messages.FromFederator) error {
// check for either an IRI that we still need to dereference, OR an already dereferenced
// and converted status pinned to the message.
var status *gtsmodel.Status
if federatorMsg.GTSModel != nil {
// there's a gts model already pinned to the message, it should be a status
var ok bool
if status, ok = federatorMsg.GTSModel.(*gtsmodel.Status); !ok {
return errors.New("ProcessFromFederator: note was not parseable as *gtsmodel.Status")
}
case ap.ActivityAccept:
// ACCEPT
switch federatorMsg.APObjectType {
case ap.ActivityFollow:
// ACCEPT A FOLLOW
// nothing to do here
var err error
status, err = p.federator.EnrichRemoteStatus(ctx, federatorMsg.ReceivingAccount.Username, status, true)
if err != nil {
return err
}
} else {
// no model pinned, we need to dereference based on the IRI
if federatorMsg.APIri == nil {
return errors.New("ProcessFromFederator: status was not pinned to federatorMsg, and neither was an IRI for us to dereference")
}
var err error
status, _, _, err = p.federator.GetRemoteStatus(ctx, federatorMsg.ReceivingAccount.Username, federatorMsg.APIri, false, false)
if err != nil {
return err
}
}
if err := p.timelineStatus(ctx, status); err != nil {
return err
}
if err := p.notifyStatus(ctx, status); err != nil {
return err
}
return nil
}
// processCreateFaveFromFederator handles Activity Create and Object Like
func (p *processor) processCreateFaveFromFederator(ctx context.Context, federatorMsg messages.FromFederator) error {
incomingFave, ok := federatorMsg.GTSModel.(*gtsmodel.StatusFave)
if !ok {
return errors.New("like was not parseable as *gtsmodel.StatusFave")
}
if err := p.notifyFave(ctx, incomingFave); err != nil {
return err
}
return nil
}
// processCreateFollowRequestFromFederator handles Activity Create and Object Follow
func (p *processor) processCreateFollowRequestFromFederator(ctx context.Context, federatorMsg messages.FromFederator) error {
followRequest, ok := federatorMsg.GTSModel.(*gtsmodel.FollowRequest)
if !ok {
return errors.New("incomingFollowRequest was not parseable as *gtsmodel.FollowRequest")
}
if followRequest.TargetAccount == nil {
a, err := p.db.GetAccountByID(ctx, followRequest.TargetAccountID)
if err != nil {
return err
}
followRequest.TargetAccount = a
}
targetAccount := followRequest.TargetAccount
if targetAccount.Locked {
// if the account is locked just notify the follow request and nothing else
return p.notifyFollowRequest(ctx, followRequest)
}
if followRequest.Account == nil {
a, err := p.db.GetAccountByID(ctx, followRequest.AccountID)
if err != nil {
return err
}
followRequest.Account = a
}
originAccount := followRequest.Account
// if the target account isn't locked, we should already accept the follow and notify about the new follower instead
follow, err := p.db.AcceptFollowRequest(ctx, followRequest.AccountID, followRequest.TargetAccountID)
if err != nil {
return err
}
if err := p.federateAcceptFollowRequest(ctx, follow, originAccount, targetAccount); err != nil {
return err
}
return p.notifyFollow(ctx, follow, targetAccount)
}
// processCreateAnnounceFromFederator handles Activity Create and Object Announce
func (p *processor) processCreateAnnounceFromFederator(ctx context.Context, federatorMsg messages.FromFederator) error {
incomingAnnounce, ok := federatorMsg.GTSModel.(*gtsmodel.Status)
if !ok {
return errors.New("announce was not parseable as *gtsmodel.Status")
}
if err := p.federator.DereferenceAnnounce(ctx, incomingAnnounce, federatorMsg.ReceivingAccount.Username); err != nil {
return fmt.Errorf("error dereferencing announce from federator: %s", err)
}
incomingAnnounceID, err := id.NewULIDFromTime(incomingAnnounce.CreatedAt)
if err != nil {
return err
}
incomingAnnounce.ID = incomingAnnounceID
if err := p.db.PutStatus(ctx, incomingAnnounce); err != nil {
return fmt.Errorf("error adding dereferenced announce to the db: %s", err)
}
if err := p.timelineStatus(ctx, incomingAnnounce); err != nil {
return err
}
if err := p.notifyAnnounce(ctx, incomingAnnounce); err != nil {
return err
}
return nil
}
// processCreateBlockFromFederator handles Activity Create and Object Block
func (p *processor) processCreateBlockFromFederator(ctx context.Context, federatorMsg messages.FromFederator) error {
block, ok := federatorMsg.GTSModel.(*gtsmodel.Block)
if !ok {
return errors.New("block was not parseable as *gtsmodel.Block")
}
// remove any of the blocking account's statuses from the blocked account's timeline, and vice versa
if err := p.timelineManager.WipeStatusesFromAccountID(ctx, block.AccountID, block.TargetAccountID); err != nil {
return err
}
if err := p.timelineManager.WipeStatusesFromAccountID(ctx, block.TargetAccountID, block.AccountID); err != nil {
return err
}
// TODO: same with notifications
// TODO: same with bookmarks
return nil
}
// processUpdateAccountFromFederator handles Activity Update and Object Profile
func (p *processor) processUpdateAccountFromFederator(ctx context.Context, federatorMsg messages.FromFederator) error {
incomingAccount, ok := federatorMsg.GTSModel.(*gtsmodel.Account)
if !ok {
return errors.New("profile was not parseable as *gtsmodel.Account")
}
if _, err := p.federator.EnrichRemoteAccount(ctx, federatorMsg.ReceivingAccount.Username, incomingAccount); err != nil {
return fmt.Errorf("error enriching updated account from federator: %s", err)
}
return nil
}
// processDeleteStatusFromFederator handles Activity Delete and Object Note
func (p *processor) processDeleteStatusFromFederator(ctx context.Context, federatorMsg messages.FromFederator) error {
// TODO: handle side effects of status deletion here:
// 1. delete all media associated with status
// 2. delete boosts of status
// 3. etc etc etc
statusToDelete, ok := federatorMsg.GTSModel.(*gtsmodel.Status)
if !ok {
return errors.New("note was not parseable as *gtsmodel.Status")
}
// delete all attachments for this status
for _, a := range statusToDelete.AttachmentIDs {
if err := p.mediaProcessor.Delete(ctx, a); err != nil {
return err
}
}
// delete all mentions for this status
for _, m := range statusToDelete.MentionIDs {
if err := p.db.DeleteByID(ctx, m, &gtsmodel.Mention{}); err != nil {
return err
}
}
// delete all notifications for this status
if err := p.db.DeleteWhere(ctx, []db.Where{{Key: "status_id", Value: statusToDelete.ID}}, &[]*gtsmodel.Notification{}); err != nil {
return err
}
// remove this status from any and all timelines
return p.deleteStatusFromTimelines(ctx, statusToDelete)
}
// processDeleteAccountFromFederator handles Activity Delete and Object Profile
func (p *processor) processDeleteAccountFromFederator(ctx context.Context, federatorMsg messages.FromFederator) error {
account, ok := federatorMsg.GTSModel.(*gtsmodel.Account)
if !ok {
return errors.New("account delete was not parseable as *gtsmodel.Account")
}
return p.accountProcessor.Delete(ctx, account, account.ID)
}

View File

@@ -32,6 +32,7 @@ import (
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/id"
"github.com/superseriousbusiness/gotosocial/internal/messages"
"github.com/superseriousbusiness/gotosocial/testrig"
)
type FromFederatorTestSuite struct {
@@ -486,6 +487,28 @@ func (suite *FromFederatorTestSuite) TestProcessFollowRequestUnlocked() {
suite.Equal("Accept", accept.Type)
}
// TestCreateStatusFromIRI checks if a forwarded status can be dereferenced by the processor.
func (suite *FromFederatorTestSuite) TestCreateStatusFromIRI() {
ctx := context.Background()
receivingAccount := suite.testAccounts["local_account_1"]
statusCreator := suite.testAccounts["remote_account_2"]
err := suite.processor.ProcessFromFederator(ctx, messages.FromFederator{
APObjectType: ap.ObjectNote,
APActivityType: ap.ActivityCreate,
GTSModel: nil, // gtsmodel is nil because this is a forwarded status -- we want to dereference it using the iri
ReceivingAccount: receivingAccount,
APIri: testrig.URLMustParse("http://example.org/users/some_user/statuses/afaba698-5740-4e32-a702-af61aa543bc1"),
})
suite.NoError(err)
// status should now be in the database, attributed to remote_account_2
s, err := suite.db.GetStatusByURI(context.Background(), "http://example.org/users/some_user/statuses/afaba698-5740-4e32-a702-af61aa543bc1")
suite.NoError(err)
suite.Equal(statusCreator.URI, s.AccountURI)
}
func TestFromFederatorTestSuite(t *testing.T) {
suite.Run(t, &FromFederatorTestSuite{})
}

View File

@@ -69,6 +69,7 @@ type ProcessingStandardTestSuite struct {
testMentions map[string]*gtsmodel.Mention
testAutheds map[string]*oauth.Auth
testBlocks map[string]*gtsmodel.Block
testActivities map[string]testrig.ActivityWithSignature
sentHTTPRequests map[string][]byte
@@ -92,6 +93,7 @@ func (suite *ProcessingStandardTestSuite) SetupSuite() {
Account: suite.testAccounts["local_account_1"],
},
}
suite.testActivities = testrig.NewTestActivities(suite.testAccounts)
suite.testBlocks = testrig.NewTestBlocks()
}
@@ -149,6 +151,32 @@ func (suite *ProcessingStandardTestSuite) SetupTest() {
return response, nil
}
if req.URL.String() == "http://example.org/users/some_user/statuses/afaba698-5740-4e32-a702-af61aa543bc1" {
// the request is for the forwarded message
message := suite.testActivities["forwarded_message"].Activity.GetActivityStreamsObject().At(0).GetActivityStreamsNote()
messageI, err := streams.Serialize(message)
if err != nil {
panic(err)
}
messageJson, err := json.Marshal(messageI)
if err != nil {
panic(err)
}
responseType := "application/activity+json"
reader := bytes.NewReader(messageJson)
readCloser := io.NopCloser(reader)
response := &http.Response{
StatusCode: 200,
Body: readCloser,
ContentLength: int64(len(messageJson)),
Header: http.Header{
"content-type": {responseType},
},
}
return response, nil
}
r := ioutil.NopCloser(bytes.NewReader([]byte{}))
return &http.Response{
StatusCode: 200,