Block/unblock (#96)

* remote + local block logic, incl. federation

* improve blocking stuff

* fiddle with display of blocked profiles

* go fmt
This commit is contained in:
Tobi Smethurst
2021-07-11 16:22:21 +02:00
committed by GitHub
parent c7da64922f
commit 846057f0d6
45 changed files with 1405 additions and 63 deletions

View File

@@ -59,3 +59,11 @@ func (p *processor) AccountFollowCreate(authed *oauth.Auth, form *apimodel.Accou
func (p *processor) AccountFollowRemove(authed *oauth.Auth, targetAccountID string) (*apimodel.Relationship, gtserror.WithCode) {
return p.accountProcessor.FollowRemove(authed.Account, targetAccountID)
}
func (p *processor) AccountBlockCreate(authed *oauth.Auth, targetAccountID string) (*apimodel.Relationship, gtserror.WithCode) {
return p.accountProcessor.BlockCreate(authed.Account, targetAccountID)
}
func (p *processor) AccountBlockRemove(authed *oauth.Auth, targetAccountID string) (*apimodel.Relationship, gtserror.WithCode) {
return p.accountProcessor.BlockRemove(authed.Account, targetAccountID)
}

View File

@@ -59,6 +59,11 @@ type Processor interface {
FollowCreate(requestingAccount *gtsmodel.Account, form *apimodel.AccountFollowRequest) (*apimodel.Relationship, gtserror.WithCode)
// FollowRemove handles the removal of a follow/follow request to an account, either remote or local.
FollowRemove(requestingAccount *gtsmodel.Account, targetAccountID string) (*apimodel.Relationship, gtserror.WithCode)
// BlockCreate handles the creation of a block from requestingAccount to targetAccountID, either remote or local.
BlockCreate(requestingAccount *gtsmodel.Account, targetAccountID string) (*apimodel.Relationship, gtserror.WithCode)
// BlockRemove handles the removal of a block from requestingAccount to targetAccountID, either remote or local.
BlockRemove(requestingAccount *gtsmodel.Account, targetAccountID string) (*apimodel.Relationship, gtserror.WithCode)
// UpdateHeader does the dirty work of checking the header part of an account update form,
// parsing and checking the image, and doing the necessary updates in the database for this to become
// the account's new header image.

View File

@@ -0,0 +1,155 @@
/*
GoToSocial
Copyright (C) 2021 GoToSocial Authors admin@gotosocial.org
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package account
import (
"fmt"
apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model"
"github.com/superseriousbusiness/gotosocial/internal/db"
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/id"
"github.com/superseriousbusiness/gotosocial/internal/util"
)
func (p *processor) BlockCreate(requestingAccount *gtsmodel.Account, targetAccountID string) (*apimodel.Relationship, gtserror.WithCode) {
// make sure the target account actually exists in our db
targetAcct := &gtsmodel.Account{}
if err := p.db.GetByID(targetAccountID, targetAcct); err != nil {
if _, ok := err.(db.ErrNoEntries); ok {
return nil, gtserror.NewErrorNotFound(fmt.Errorf("BlockCreate: account %s not found in the db: %s", targetAccountID, err))
}
}
// if requestingAccount already blocks target account, we don't need to do anything
block := &gtsmodel.Block{}
if err := p.db.GetWhere([]db.Where{
{Key: "account_id", Value: requestingAccount.ID},
{Key: "target_account_id", Value: targetAccountID},
}, block); err == nil {
// block already exists, just return relationship
return p.RelationshipGet(requestingAccount, targetAccountID)
}
// make the block
newBlockID, err := id.NewULID()
if err != nil {
return nil, gtserror.NewErrorInternalError(err)
}
block.ID = newBlockID
block.AccountID = requestingAccount.ID
block.Account = requestingAccount
block.TargetAccountID = targetAccountID
block.TargetAccount = targetAcct
block.URI = util.GenerateURIForBlock(requestingAccount.Username, p.config.Protocol, p.config.Host, newBlockID)
// whack it in the database
if err := p.db.Put(block); err != nil {
return nil, gtserror.NewErrorInternalError(fmt.Errorf("BlockCreate: error creating block in db: %s", err))
}
// clear any follows or follow requests from the blocked account to the target account -- this is a simple delete
if err := p.db.DeleteWhere([]db.Where{
{Key: "account_id", Value: targetAccountID},
{Key: "target_account_id", Value: requestingAccount.ID},
}, &gtsmodel.Follow{}); err != nil {
return nil, gtserror.NewErrorInternalError(fmt.Errorf("BlockCreate: error removing follow in db: %s", err))
}
if err := p.db.DeleteWhere([]db.Where{
{Key: "account_id", Value: targetAccountID},
{Key: "target_account_id", Value: requestingAccount.ID},
}, &gtsmodel.FollowRequest{}); err != nil {
return nil, gtserror.NewErrorInternalError(fmt.Errorf("BlockCreate: error removing follow in db: %s", err))
}
// clear any follows or follow requests from the requesting account to the target account --
// this might require federation so we need to pass some messages around
// check if a follow request exists from the requesting account to the target account, and remove it if it does (storing the URI for later)
var frChanged bool
var frURI string
fr := &gtsmodel.FollowRequest{}
if err := p.db.GetWhere([]db.Where{
{Key: "account_id", Value: requestingAccount.ID},
{Key: "target_account_id", Value: targetAccountID},
}, fr); err == nil {
frURI = fr.URI
if err := p.db.DeleteByID(fr.ID, fr); err != nil {
return nil, gtserror.NewErrorInternalError(fmt.Errorf("BlockCreate: error removing follow request from db: %s", err))
}
frChanged = true
}
// now do the same thing for any existing follow
var fChanged bool
var fURI string
f := &gtsmodel.Follow{}
if err := p.db.GetWhere([]db.Where{
{Key: "account_id", Value: requestingAccount.ID},
{Key: "target_account_id", Value: targetAccountID},
}, f); err == nil {
fURI = f.URI
if err := p.db.DeleteByID(f.ID, f); err != nil {
return nil, gtserror.NewErrorInternalError(fmt.Errorf("BlockCreate: error removing follow from db: %s", err))
}
fChanged = true
}
// follow request status changed so send the UNDO activity to the channel for async processing
if frChanged {
p.fromClientAPI <- gtsmodel.FromClientAPI{
APObjectType: gtsmodel.ActivityStreamsFollow,
APActivityType: gtsmodel.ActivityStreamsUndo,
GTSModel: &gtsmodel.Follow{
AccountID: requestingAccount.ID,
TargetAccountID: targetAccountID,
URI: frURI,
},
OriginAccount: requestingAccount,
TargetAccount: targetAcct,
}
}
// follow status changed so send the UNDO activity to the channel for async processing
if fChanged {
p.fromClientAPI <- gtsmodel.FromClientAPI{
APObjectType: gtsmodel.ActivityStreamsFollow,
APActivityType: gtsmodel.ActivityStreamsUndo,
GTSModel: &gtsmodel.Follow{
AccountID: requestingAccount.ID,
TargetAccountID: targetAccountID,
URI: fURI,
},
OriginAccount: requestingAccount,
TargetAccount: targetAcct,
}
}
// handle the rest of the block process asynchronously
p.fromClientAPI <- gtsmodel.FromClientAPI{
APObjectType: gtsmodel.ActivityStreamsBlock,
APActivityType: gtsmodel.ActivityStreamsCreate,
GTSModel: block,
OriginAccount: requestingAccount,
TargetAccount: targetAcct,
}
return p.RelationshipGet(requestingAccount, targetAccountID)
}

View File

@@ -45,9 +45,19 @@ func (p *processor) Get(requestingAccount *gtsmodel.Account, targetAccountID str
p.log.WithField("func", "AccountGet").Debugf("dereferencing account: %s", err)
}
var mastoAccount *apimodel.Account
var blocked bool
var err error
if requestingAccount != nil && targetAccount.ID == requestingAccount.ID {
if requestingAccount != nil {
blocked, err = p.db.Blocked(requestingAccount.ID, targetAccountID)
if err != nil {
return nil, fmt.Errorf("error checking account block: %s", err)
}
}
var mastoAccount *apimodel.Account
if blocked {
mastoAccount, err = p.tc.AccountToMastoBlocked(targetAccount)
} else if requestingAccount != nil && targetAccount.ID == requestingAccount.ID {
mastoAccount, err = p.tc.AccountToMastoSensitive(targetAccount)
} else {
mastoAccount, err = p.tc.AccountToMastoPublic(targetAccount)

View File

@@ -0,0 +1,67 @@
/*
GoToSocial
Copyright (C) 2021 GoToSocial Authors admin@gotosocial.org
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package account
import (
"fmt"
apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model"
"github.com/superseriousbusiness/gotosocial/internal/db"
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
)
func (p *processor) BlockRemove(requestingAccount *gtsmodel.Account, targetAccountID string) (*apimodel.Relationship, gtserror.WithCode) {
// make sure the target account actually exists in our db
targetAcct := &gtsmodel.Account{}
if err := p.db.GetByID(targetAccountID, targetAcct); err != nil {
if _, ok := err.(db.ErrNoEntries); ok {
return nil, gtserror.NewErrorNotFound(fmt.Errorf("BlockRemove: account %s not found in the db: %s", targetAccountID, err))
}
}
// check if a block exists, and remove it if it does (storing the URI for later)
var blockChanged bool
block := &gtsmodel.Block{}
if err := p.db.GetWhere([]db.Where{
{Key: "account_id", Value: requestingAccount.ID},
{Key: "target_account_id", Value: targetAccountID},
}, block); err == nil {
block.Account = requestingAccount
block.TargetAccount = targetAcct
if err := p.db.DeleteByID(block.ID, &gtsmodel.Block{}); err != nil {
return nil, gtserror.NewErrorInternalError(fmt.Errorf("BlockRemove: error removing block from db: %s", err))
}
blockChanged = true
}
// block status changed so send the UNDO activity to the channel for async processing
if blockChanged {
p.fromClientAPI <- gtsmodel.FromClientAPI{
APObjectType: gtsmodel.ActivityStreamsBlock,
APActivityType: gtsmodel.ActivityStreamsUndo,
GTSModel: block,
OriginAccount: requestingAccount,
TargetAccount: targetAcct,
}
}
// return whatever relationship results from all this
return p.RelationshipGet(requestingAccount, targetAccountID)
}

View File

@@ -0,0 +1,83 @@
/*
GoToSocial
Copyright (C) 2021 GoToSocial Authors admin@gotosocial.org
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package processing
import (
"fmt"
"net/url"
apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model"
"github.com/superseriousbusiness/gotosocial/internal/db"
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
"github.com/superseriousbusiness/gotosocial/internal/oauth"
)
func (p *processor) BlocksGet(authed *oauth.Auth, maxID string, sinceID string, limit int) (*apimodel.BlocksResponse, gtserror.WithCode) {
accounts, nextMaxID, prevMinID, err := p.db.GetBlocksForAccount(authed.Account.ID, maxID, sinceID, limit)
if err != nil {
if _, ok := err.(db.ErrNoEntries); ok {
// there are just no entries
return &apimodel.BlocksResponse{
Accounts: []*apimodel.Account{},
}, nil
}
// there's an actual error
return nil, gtserror.NewErrorInternalError(err)
}
apiAccounts := []*apimodel.Account{}
for _, a := range accounts {
apiAccount, err := p.tc.AccountToMastoBlocked(a)
if err != nil {
continue
}
apiAccounts = append(apiAccounts, apiAccount)
}
return p.packageBlocksResponse(apiAccounts, "/api/v1/blocks", nextMaxID, prevMinID, limit)
}
func (p *processor) packageBlocksResponse(accounts []*apimodel.Account, path string, nextMaxID string, prevMinID string, limit int) (*apimodel.BlocksResponse, gtserror.WithCode) {
resp := &apimodel.BlocksResponse{
Accounts: []*apimodel.Account{},
}
resp.Accounts = accounts
// prepare the next and previous links
if len(accounts) != 0 {
nextLink := &url.URL{
Scheme: p.config.Protocol,
Host: p.config.Host,
Path: path,
RawQuery: fmt.Sprintf("limit=%d&max_id=%s", limit, nextMaxID),
}
next := fmt.Sprintf("<%s>; rel=\"next\"", nextLink.String())
prevLink := &url.URL{
Scheme: p.config.Protocol,
Host: p.config.Host,
Path: path,
RawQuery: fmt.Sprintf("limit=%d&min_id=%s", limit, prevMinID),
}
prev := fmt.Sprintf("<%s>; rel=\"prev\"", prevLink.String())
resp.LinkHeader = fmt.Sprintf("%s, %s", next, prev)
}
return resp, nil
}

View File

@@ -76,7 +76,6 @@ func (p *processor) processFromClientAPI(clientMsg gtsmodel.FromClientAPI) error
}
return p.federateFave(fave, clientMsg.OriginAccount, clientMsg.TargetAccount)
case gtsmodel.ActivityStreamsAnnounce:
// CREATE BOOST/ANNOUNCE
boostWrapperStatus, ok := clientMsg.GTSModel.(*gtsmodel.Status)
@@ -93,6 +92,25 @@ func (p *processor) processFromClientAPI(clientMsg gtsmodel.FromClientAPI) error
}
return p.federateAnnounce(boostWrapperStatus, clientMsg.OriginAccount, clientMsg.TargetAccount)
case gtsmodel.ActivityStreamsBlock:
// CREATE BLOCK
block, ok := clientMsg.GTSModel.(*gtsmodel.Block)
if !ok {
return errors.New("block was not parseable as *gtsmodel.Block")
}
// remove any of the blocking account's statuses from the blocked account's timeline, and vice versa
if err := p.timelineManager.WipeStatusesFromAccountID(block.AccountID, block.TargetAccountID); err != nil {
return err
}
if err := p.timelineManager.WipeStatusesFromAccountID(block.TargetAccountID, block.AccountID); err != nil {
return err
}
// TODO: same with notifications
// TODO: same with bookmarks
return p.federateBlock(block)
}
case gtsmodel.ActivityStreamsUpdate:
// UPDATE
@@ -132,6 +150,13 @@ func (p *processor) processFromClientAPI(clientMsg gtsmodel.FromClientAPI) error
return errors.New("undo was not parseable as *gtsmodel.Follow")
}
return p.federateUnfollow(follow, clientMsg.OriginAccount, clientMsg.TargetAccount)
case gtsmodel.ActivityStreamsBlock:
// UNDO BLOCK
block, ok := clientMsg.GTSModel.(*gtsmodel.Block)
if !ok {
return errors.New("undo was not parseable as *gtsmodel.Block")
}
return p.federateUnblock(block)
case gtsmodel.ActivityStreamsLike:
// UNDO LIKE/FAVE
fave, ok := clientMsg.GTSModel.(*gtsmodel.StatusFave)
@@ -530,3 +555,93 @@ func (p *processor) federateAccountUpdate(updatedAccount *gtsmodel.Account, orig
_, err = p.federator.FederatingActor().Send(context.Background(), outboxIRI, update)
return err
}
func (p *processor) federateBlock(block *gtsmodel.Block) error {
if block.Account == nil {
a := &gtsmodel.Account{}
if err := p.db.GetByID(block.AccountID, a); err != nil {
return fmt.Errorf("federateBlock: error getting block account from database: %s", err)
}
block.Account = a
}
if block.TargetAccount == nil {
a := &gtsmodel.Account{}
if err := p.db.GetByID(block.TargetAccountID, a); err != nil {
return fmt.Errorf("federateBlock: error getting block target account from database: %s", err)
}
block.TargetAccount = a
}
// if both accounts are local there's nothing to do here
if block.Account.Domain == "" && block.TargetAccount.Domain == "" {
return nil
}
asBlock, err := p.tc.BlockToAS(block)
if err != nil {
return fmt.Errorf("federateBlock: error converting block to AS format: %s", err)
}
outboxIRI, err := url.Parse(block.Account.OutboxURI)
if err != nil {
return fmt.Errorf("federateBlock: error parsing outboxURI %s: %s", block.Account.OutboxURI, err)
}
_, err = p.federator.FederatingActor().Send(context.Background(), outboxIRI, asBlock)
return err
}
func (p *processor) federateUnblock(block *gtsmodel.Block) error {
if block.Account == nil {
a := &gtsmodel.Account{}
if err := p.db.GetByID(block.AccountID, a); err != nil {
return fmt.Errorf("federateUnblock: error getting block account from database: %s", err)
}
block.Account = a
}
if block.TargetAccount == nil {
a := &gtsmodel.Account{}
if err := p.db.GetByID(block.TargetAccountID, a); err != nil {
return fmt.Errorf("federateUnblock: error getting block target account from database: %s", err)
}
block.TargetAccount = a
}
// if both accounts are local there's nothing to do here
if block.Account.Domain == "" && block.TargetAccount.Domain == "" {
return nil
}
asBlock, err := p.tc.BlockToAS(block)
if err != nil {
return fmt.Errorf("federateUnblock: error converting block to AS format: %s", err)
}
targetAccountURI, err := url.Parse(block.TargetAccount.URI)
if err != nil {
return fmt.Errorf("federateUnblock: error parsing uri %s: %s", block.TargetAccount.URI, err)
}
// create an Undo and set the appropriate actor on it
undo := streams.NewActivityStreamsUndo()
undo.SetActivityStreamsActor(asBlock.GetActivityStreamsActor())
// Set the block as the 'object' property.
undoObject := streams.NewActivityStreamsObjectProperty()
undoObject.AppendActivityStreamsBlock(asBlock)
undo.SetActivityStreamsObject(undoObject)
// Set the To of the undo as the target of the block
undoTo := streams.NewActivityStreamsToProperty()
undoTo.AppendIRI(targetAccountURI)
undo.SetActivityStreamsTo(undoTo)
outboxIRI, err := url.Parse(block.Account.OutboxURI)
if err != nil {
return fmt.Errorf("federateUnblock: error parsing outboxURI %s: %s", block.Account.OutboxURI, err)
}
_, err = p.federator.FederatingActor().Send(context.Background(), outboxIRI, undo)
return err
}

View File

@@ -34,7 +34,7 @@ func (p *processor) processFromFederator(federatorMsg gtsmodel.FromFederator) er
"federatorMsg": fmt.Sprintf("%+v", federatorMsg),
})
l.Debug("entering function PROCESS FROM FEDERATOR")
l.Trace("entering function PROCESS FROM FEDERATOR")
switch federatorMsg.APActivityType {
case gtsmodel.ActivityStreamsCreate:
@@ -47,7 +47,7 @@ func (p *processor) processFromFederator(federatorMsg gtsmodel.FromFederator) er
return errors.New("note was not parseable as *gtsmodel.Status")
}
l.Debug("will now derefence incoming status")
l.Trace("will now derefence incoming status")
if err := p.federator.DereferenceStatusFields(incomingStatus, federatorMsg.ReceivingAccount.Username); err != nil {
return fmt.Errorf("error dereferencing status from federator: %s", err)
}
@@ -70,7 +70,7 @@ func (p *processor) processFromFederator(federatorMsg gtsmodel.FromFederator) er
return errors.New("profile was not parseable as *gtsmodel.Account")
}
l.Debug("will now derefence incoming account")
l.Trace("will now derefence incoming account")
if err := p.federator.DereferenceAccountFields(incomingAccount, "", false); err != nil {
return fmt.Errorf("error dereferencing account from federator: %s", err)
}
@@ -127,6 +127,22 @@ func (p *processor) processFromFederator(federatorMsg gtsmodel.FromFederator) er
if err := p.notifyAnnounce(incomingAnnounce); err != nil {
return err
}
case gtsmodel.ActivityStreamsBlock:
// CREATE A BLOCK
block, ok := federatorMsg.GTSModel.(*gtsmodel.Block)
if !ok {
return errors.New("block was not parseable as *gtsmodel.Block")
}
// remove any of the blocking account's statuses from the blocked account's timeline, and vice versa
if err := p.timelineManager.WipeStatusesFromAccountID(block.AccountID, block.TargetAccountID); err != nil {
return err
}
if err := p.timelineManager.WipeStatusesFromAccountID(block.TargetAccountID, block.AccountID); err != nil {
return err
}
// TODO: same with notifications
// TODO: same with bookmarks
}
case gtsmodel.ActivityStreamsUpdate:
// UPDATE
@@ -138,7 +154,7 @@ func (p *processor) processFromFederator(federatorMsg gtsmodel.FromFederator) er
return errors.New("profile was not parseable as *gtsmodel.Account")
}
l.Debug("will now derefence incoming account")
l.Trace("will now derefence incoming account")
if err := p.federator.DereferenceAccountFields(incomingAccount, federatorMsg.ReceivingAccount.Username, true); err != nil {
return fmt.Errorf("error dereferencing account from federator: %s", err)
}

View File

@@ -82,6 +82,10 @@ type Processor interface {
AccountFollowCreate(authed *oauth.Auth, form *apimodel.AccountFollowRequest) (*apimodel.Relationship, gtserror.WithCode)
// AccountFollowRemove handles the removal of a follow/follow request to an account, either remote or local.
AccountFollowRemove(authed *oauth.Auth, targetAccountID string) (*apimodel.Relationship, gtserror.WithCode)
// AccountBlockCreate handles the creation of a block from authed account to target account, either remote or local.
AccountBlockCreate(authed *oauth.Auth, targetAccountID string) (*apimodel.Relationship, gtserror.WithCode)
// AccountBlockRemove handles the removal of a block from authed account to target account, either remote or local.
AccountBlockRemove(authed *oauth.Auth, targetAccountID string) (*apimodel.Relationship, gtserror.WithCode)
// AdminEmojiCreate handles the creation of a new instance emoji by an admin, using the given form.
AdminEmojiCreate(authed *oauth.Auth, form *apimodel.EmojiCreateRequest) (*apimodel.Emoji, error)
@@ -99,6 +103,9 @@ type Processor interface {
// AppCreate processes the creation of a new API application
AppCreate(authed *oauth.Auth, form *apimodel.ApplicationCreateRequest) (*apimodel.Application, error)
// BlocksGet returns a list of accounts blocked by the requesting account.
BlocksGet(authed *oauth.Auth, maxID string, sinceID string, limit int) (*apimodel.BlocksResponse, gtserror.WithCode)
// FileGet handles the fetching of a media attachment file via the fileserver.
FileGet(authed *oauth.Auth, form *apimodel.GetContentRequestForm) (*apimodel.Content, error)
@@ -275,14 +282,14 @@ func (p *processor) Start() error {
for {
select {
case clientMsg := <-p.fromClientAPI:
p.log.Infof("received message FROM client API: %+v", clientMsg)
p.log.Tracef("received message FROM client API: %+v", clientMsg)
go func() {
if err := p.processFromClientAPI(clientMsg); err != nil {
p.log.Error(err)
}
}()
case federatorMsg := <-p.fromFederator:
p.log.Infof("received message FROM federator: %+v", federatorMsg)
p.log.Tracef("received message FROM federator: %+v", federatorMsg)
go func() {
if err := p.processFromFederator(federatorMsg); err != nil {
p.log.Error(err)