From 8633696da0b521ab83e27cbb6d24ad6ec22339f1 Mon Sep 17 00:00:00 2001 From: Thomas Sileo Date: Wed, 10 Aug 2022 20:34:36 +0200 Subject: [PATCH] Improve workers for incoming/outgoing activities --- app/activitypub.py | 21 ++-- app/actor.py | 7 +- app/incoming_activities.py | 75 +++++++----- app/models.py | 2 +- app/outgoing_activities.py | 135 ++++++++++++---------- app/utils/opengraph.py | 3 +- docs/templates/layout.html | 1 + tasks.py | 2 +- tests/test_inbox.py | 20 +++- tests/test_process_outgoing_activities.py | 78 +++++++++---- 10 files changed, 209 insertions(+), 135 deletions(-) diff --git a/app/activitypub.py b/app/activitypub.py index 8e2a1f1..80e5201 100644 --- a/app/activitypub.py +++ b/app/activitypub.py @@ -327,17 +327,18 @@ def remove_context(raw_object: RawObject) -> RawObject: return a -def post(url: str, payload: dict[str, Any]) -> httpx.Response: +async def post(url: str, payload: dict[str, Any]) -> httpx.Response: check_url(url) - resp = httpx.post( - url, - headers={ - "User-Agent": config.USER_AGENT, - "Content-Type": config.AP_CONTENT_TYPE, - }, - json=payload, - auth=auth, - ) + async with httpx.AsyncClient() as client: + resp = await client.post( + url, + headers={ + "User-Agent": config.USER_AGENT, + "Content-Type": config.AP_CONTENT_TYPE, + }, + json=payload, + auth=auth, + ) resp.raise_for_status() return resp diff --git a/app/actor.py b/app/actor.py index 3761fc5..0e611e7 100644 --- a/app/actor.py +++ b/app/actor.py @@ -157,7 +157,12 @@ async def save_actor(db_session: AsyncSession, ap_actor: ap.RawObject) -> "Actor return actor -async def fetch_actor(db_session: AsyncSession, actor_id: str) -> "ActorModel": +async def fetch_actor( + db_session: AsyncSession, + actor_id: str, +) -> Union["ActorModel", RemoteActor]: + if actor_id == LOCAL_ACTOR.ap_id: + return LOCAL_ACTOR from app import models existing_actor = ( diff --git a/app/incoming_activities.py b/app/incoming_activities.py index 6a365d3..da1bf26 100644 --- a/app/incoming_activities.py +++ b/app/incoming_activities.py @@ -13,8 +13,8 @@ from app import ldsig from app import models from app.boxes import save_to_inbox from app.database import AsyncSession -from app.database import async_session from app.utils.datetime import now +from app.utils.workers import Worker _MAX_RETRIES = 8 @@ -67,11 +67,15 @@ def _set_next_try( outgoing_activity.next_try = next_try or _exp_backoff(outgoing_activity.tries) -async def process_next_incoming_activity(db_session: AsyncSession) -> bool: +async def fetch_next_incoming_activity( + db_session: AsyncSession, + in_flight: set[int], +) -> models.IncomingActivity | None: where = [ models.IncomingActivity.next_try <= now(), models.IncomingActivity.is_errored.is_(False), models.IncomingActivity.is_processed.is_(False), + models.IncomingActivity.id.not_in(in_flight), ] q_count = await db_session.scalar( select(func.count(models.IncomingActivity.id)).where(*where) @@ -80,7 +84,7 @@ async def process_next_incoming_activity(db_session: AsyncSession) -> bool: logger.info(f"{q_count} incoming activities ready to process") if not q_count: # logger.debug("No activities to process") - return False + return None next_activity = ( await db_session.execute( @@ -91,6 +95,13 @@ async def process_next_incoming_activity(db_session: AsyncSession) -> bool: ) ).scalar_one() + return next_activity + + +async def process_next_incoming_activity( + db_session: AsyncSession, + next_activity: models.IncomingActivity, +) -> None: logger.info( f"incoming_activity={next_activity.ap_object}/" f"{next_activity.sent_by_ap_actor_id}" @@ -99,35 +110,45 @@ async def process_next_incoming_activity(db_session: AsyncSession) -> bool: next_activity.tries = next_activity.tries + 1 next_activity.last_try = now() - try: - async with db_session.begin_nested(): - await save_to_inbox( - db_session, - next_activity.ap_object, - next_activity.sent_by_ap_actor_id, - ) - except Exception: - logger.exception("Failed") - next_activity.error = traceback.format_exc() - _set_next_try(next_activity) - else: - logger.info("Success") - next_activity.is_processed = True + if next_activity.ap_object and next_activity.sent_by_ap_actor_id: + try: + async with db_session.begin_nested(): + await save_to_inbox( + db_session, + next_activity.ap_object, + next_activity.sent_by_ap_actor_id, + ) + except Exception: + logger.exception("Failed") + next_activity.error = traceback.format_exc() + _set_next_try(next_activity) + else: + logger.info("Success") + next_activity.is_processed = True + + # FIXME: webmention support await db_session.commit() - return True + return None + + +class IncomingActivityWorker(Worker[models.IncomingActivity]): + async def process_message( + self, + db_session: AsyncSession, + next_activity: models.IncomingActivity, + ) -> None: + await process_next_incoming_activity(db_session, next_activity) + + async def get_next_message( + self, + db_session: AsyncSession, + ) -> models.IncomingActivity | None: + return await fetch_next_incoming_activity(db_session, self.in_flight_ids()) async def loop() -> None: - async with async_session() as db_session: - while 1: - try: - await process_next_incoming_activity(db_session) - except Exception: - logger.exception("Failed to process next incoming activity") - raise - - await asyncio.sleep(1) + await IncomingActivityWorker(workers_count=1).run_forever() if __name__ == "__main__": diff --git a/app/models.py b/app/models.py index b2472c7..b04f6b6 100644 --- a/app/models.py +++ b/app/models.py @@ -320,7 +320,7 @@ class IncomingActivity(Base): ap_id = Column(String, nullable=True, index=True) ap_object: Mapped[ap.RawObject] = Column(JSON, nullable=True) - tries = Column(Integer, nullable=False, default=0) + tries: Mapped[int] = Column(Integer, nullable=False, default=0) next_try = Column(DateTime(timezone=True), nullable=True, default=now) last_try = Column(DateTime(timezone=True), nullable=True) diff --git a/app/outgoing_activities.py b/app/outgoing_activities.py index ae2f5c0..9f08de6 100644 --- a/app/outgoing_activities.py +++ b/app/outgoing_activities.py @@ -1,3 +1,4 @@ +import asyncio import email import time import traceback @@ -8,7 +9,6 @@ import httpx from loguru import logger from sqlalchemy import func from sqlalchemy import select -from sqlalchemy.orm import Session from sqlalchemy.orm import joinedload from app import activitypub as ap @@ -19,10 +19,10 @@ from app.actor import LOCAL_ACTOR from app.actor import _actor_hash from app.config import KEY_PATH from app.database import AsyncSession -from app.database import SessionLocal from app.key import Key from app.utils.datetime import now from app.utils.url import check_url +from app.utils.workers import Worker _MAX_RETRIES = 16 @@ -50,7 +50,9 @@ def _is_local_actor_updated() -> bool: return True -def _send_actor_update_if_needed(db_session: Session) -> None: +async def _send_actor_update_if_needed( + db_session: AsyncSession, +) -> None: """The process for sending an update for the local actor is done here as in production, we may have multiple uvicorn worker and this worker will always run in a single process.""" @@ -59,9 +61,9 @@ def _send_actor_update_if_needed(db_session: Session) -> None: logger.info("Will send an Update for the local actor") - from app.boxes import RemoteObject from app.boxes import allocate_outbox_id from app.boxes import outbox_object_id + from app.boxes import save_outbox_object update_activity_id = allocate_outbox_id() update_activity = { @@ -72,30 +74,15 @@ def _send_actor_update_if_needed(db_session: Session) -> None: "actor": config.ID, "object": ap.remove_context(LOCAL_ACTOR.ap_actor), } - ro = RemoteObject(update_activity, actor=LOCAL_ACTOR) - outbox_object = models.OutboxObject( - public_id=update_activity_id, - ap_type=ro.ap_type, - ap_id=ro.ap_id, - ap_context=ro.ap_context, - ap_object=ro.ap_object, - visibility=ro.visibility, - og_meta=None, - relates_to_inbox_object_id=None, - relates_to_outbox_object_id=None, - relates_to_actor_id=None, - activity_object_ap_id=LOCAL_ACTOR.ap_id, - is_hidden_from_homepage=True, - source=None, + outbox_object = await save_outbox_object( + db_session, update_activity_id, update_activity ) - db_session.add(outbox_object) - db_session.flush() # Send the update to the followers collection and all the actor we have ever # contacted followers = ( ( - db_session.scalars( + await db_session.scalars( select(models.Follower).options(joinedload(models.Follower.actor)) ) ) @@ -107,19 +94,17 @@ def _send_actor_update_if_needed(db_session: Session) -> None: for follower in followers } | { row.recipient - for row in db_session.execute( + for row in await db_session.execute( select(func.distinct(models.OutgoingActivity.recipient).label("recipient")) ) }: # type: ignore - outgoing_activity = models.OutgoingActivity( + await new_outgoing_activity( + db_session, recipient=rcp, outbox_object_id=outbox_object.id, - inbox_object_id=None, ) - db_session.add(outgoing_activity) - - db_session.commit() + await db_session.commit() async def new_outgoing_activity( @@ -183,50 +168,65 @@ def _set_next_try( outgoing_activity.next_try = next_try or _exp_backoff(outgoing_activity.tries) -def process_next_outgoing_activity(db: Session) -> bool: +async def fetch_next_outgoing_activity( + db_session: AsyncSession, + in_fligh: set[int], +) -> models.OutgoingActivity | None: where = [ models.OutgoingActivity.next_try <= now(), models.OutgoingActivity.is_errored.is_(False), models.OutgoingActivity.is_sent.is_(False), + models.OutgoingActivity.id.not_in(in_fligh), ] - q_count = db.scalar(select(func.count(models.OutgoingActivity.id)).where(*where)) + q_count = await db_session.scalar( + select(func.count(models.OutgoingActivity.id)).where(*where) + ) if q_count > 0: logger.info(f"{q_count} outgoing activities ready to process") if not q_count: # logger.debug("No activities to process") - return False + return None - next_activity = db.execute( - select(models.OutgoingActivity) - .where(*where) - .limit(1) - .options( - joinedload(models.OutgoingActivity.inbox_object), - joinedload(models.OutgoingActivity.outbox_object), + next_activity = ( + await db_session.execute( + select(models.OutgoingActivity) + .where(*where) + .limit(1) + .options( + joinedload(models.OutgoingActivity.inbox_object), + joinedload(models.OutgoingActivity.outbox_object), + ) + .order_by(models.OutgoingActivity.next_try) ) - .order_by(models.OutgoingActivity.next_try) ).scalar_one() + return next_activity - next_activity.tries = next_activity.tries + 1 + +async def process_next_outgoing_activity( + db_session: AsyncSession, + next_activity: models.OutgoingActivity, +) -> None: + next_activity.tries = next_activity.tries + 1 # type: ignore next_activity.last_try = now() logger.info(f"recipient={next_activity.recipient}") try: - if next_activity.webmention_target: + if next_activity.webmention_target and next_activity.outbox_object: webmention_payload = { "source": next_activity.outbox_object.url, "target": next_activity.webmention_target, } logger.info(f"{webmention_payload=}") check_url(next_activity.recipient) - resp = httpx.post( - next_activity.recipient, - data=webmention_payload, - headers={ - "User-Agent": config.USER_AGENT, - }, - ) + async with httpx.AsyncClient() as client: + resp = await client.post( + next_activity.recipient, # type: ignore + data=webmention_payload, + headers={ + "User-Agent": config.USER_AGENT, + }, + ) resp.raise_for_status() else: payload = ap.wrap_object_if_needed(next_activity.anybox_object.ap_object) @@ -238,12 +238,12 @@ def process_next_outgoing_activity(db: Session) -> bool: "Delete", ]: # But only if the object is public (to help with deniability/privacy) - if next_activity.outbox_object.visibility == ap.VisibilityEnum.PUBLIC: + if next_activity.outbox_object.visibility == ap.VisibilityEnum.PUBLIC: # type: ignore # noqa: E501 ldsig.generate_signature(payload, k) logger.info(f"{payload=}") - resp = ap.post(next_activity.recipient, payload) + resp = await ap.post(next_activity.recipient, payload) # type: ignore except httpx.HTTPStatusError as http_error: logger.exception("Failed") next_activity.last_status_code = http_error.response.status_code @@ -273,22 +273,31 @@ def process_next_outgoing_activity(db: Session) -> bool: next_activity.last_status_code = resp.status_code next_activity.last_response = resp.text - db.commit() - return True + await db_session.commit() + return None -def loop() -> None: - db = SessionLocal() - _send_actor_update_if_needed(db) - while 1: - try: - process_next_outgoing_activity(db) - except Exception: - logger.exception("Failed to process next outgoing activity") - raise +class OutgoingActivityWorker(Worker[models.OutgoingActivity]): + async def process_message( + self, + db_session: AsyncSession, + next_activity: models.OutgoingActivity, + ) -> None: + await process_next_outgoing_activity(db_session, next_activity) - time.sleep(1) + async def get_next_message( + self, + db_session: AsyncSession, + ) -> models.OutgoingActivity | None: + return await fetch_next_outgoing_activity(db_session, self.in_flight_ids()) + + async def startup(self, db_session: AsyncSession) -> None: + await _send_actor_update_if_needed(db_session) + + +async def loop() -> None: + await OutgoingActivityWorker(workers_count=3).run_forever() if __name__ == "__main__": - loop() + asyncio.run(loop()) diff --git a/app/utils/opengraph.py b/app/utils/opengraph.py index 6a66fe1..7308859 100644 --- a/app/utils/opengraph.py +++ b/app/utils/opengraph.py @@ -7,6 +7,7 @@ import httpx from bs4 import BeautifulSoup # type: ignore from pydantic import BaseModel +from loguru import logger from app import ap_object from app import config from app.actor import LOCAL_ACTOR @@ -64,7 +65,7 @@ async def external_urls( for tag in ro.tags: if tag_href := tag.get("href"): tags_hrefs.add(tag_href) - if tag.get("type") == "Mention" and tag["name"] != LOCAL_ACTOR.handle: + if tag.get("type") == "Mention": mentioned_actor = await fetch_actor(db_session, tag["href"]) tags_hrefs.add(mentioned_actor.url) tags_hrefs.add(mentioned_actor.ap_id) diff --git a/docs/templates/layout.html b/docs/templates/layout.html index f398f3c..65860ea 100644 --- a/docs/templates/layout.html +++ b/docs/templates/layout.html @@ -93,6 +93,7 @@ footer {
  • Source code
  • Bug tracker
  • Mailing list +
  • GitHub mirror diff --git a/tasks.py b/tasks.py index 5a6818f..e2e47b6 100644 --- a/tasks.py +++ b/tasks.py @@ -68,7 +68,7 @@ def process_outgoing_activities(ctx): # type: (Context) -> None from app.outgoing_activities import loop - loop() + asyncio.run(loop()) @task diff --git a/tests/test_inbox.py b/tests/test_inbox.py index c3efc1e..73aded9 100644 --- a/tests/test_inbox.py +++ b/tests/test_inbox.py @@ -12,6 +12,8 @@ from app import activitypub as ap from app import models from app.actor import LOCAL_ACTOR from app.ap_object import RemoteObject +from app.database import AsyncSession +from app.incoming_activities import fetch_next_incoming_activity from app.incoming_activities import process_next_incoming_activity from tests import factories from tests.utils import mock_httpsig_checker @@ -21,6 +23,12 @@ from tests.utils import setup_remote_actor from tests.utils import setup_remote_actor_as_follower +async def _process_next_incoming_activity(db_session: AsyncSession) -> None: + next_activity = await fetch_next_incoming_activity(db_session, set()) + assert next_activity + await process_next_incoming_activity(db_session, next_activity) + + def test_inbox_requires_httpsig( client: TestClient, ): @@ -64,7 +72,7 @@ def test_inbox_incoming_follow_request( # Then the server returns a 204 assert response.status_code == 202 - run_async(process_next_incoming_activity) + run_async(_process_next_incoming_activity) # And the actor was saved in DB saved_actor = db.execute(select(models.Actor)).scalar_one() @@ -122,7 +130,7 @@ def test_inbox_incoming_follow_request__manually_approves_followers( assert response.status_code == 202 with mock.patch("app.boxes.MANUALLY_APPROVES_FOLLOWERS", True): - run_async(process_next_incoming_activity) + run_async(_process_next_incoming_activity) # And the actor was saved in DB saved_actor = db.execute(select(models.Actor)).scalar_one() @@ -177,7 +185,7 @@ def test_inbox_accept_follow_request( # Then the server returns a 204 assert response.status_code == 202 - run_async(process_next_incoming_activity) + run_async(_process_next_incoming_activity) # And the Accept activity was saved in the inbox inbox_activity = db.execute(select(models.InboxObject)).scalar_one() @@ -224,7 +232,7 @@ def test_inbox__create_from_follower( assert response.status_code == 202 # And when processing the incoming activity - run_async(process_next_incoming_activity) + run_async(_process_next_incoming_activity) # Then the Create activity was saved create_activity_from_inbox: models.InboxObject | None = db.execute( @@ -278,7 +286,7 @@ def test_inbox__create_already_deleted_object( assert response.status_code == 202 # And when processing the incoming activity - run_async(process_next_incoming_activity) + run_async(_process_next_incoming_activity) # Then the Create activity was saved create_activity_from_inbox: models.InboxObject | None = db.execute( @@ -334,7 +342,7 @@ def test_inbox__actor_is_blocked( assert response.status_code == 202 # And when processing the incoming activity from a blocked actor - run_async(process_next_incoming_activity) + run_async(_process_next_incoming_activity) # Then the Create activity was discarded assert ( diff --git a/tests/test_process_outgoing_activities.py b/tests/test_process_outgoing_activities.py index a0c303a..8b6e951 100644 --- a/tests/test_process_outgoing_activities.py +++ b/tests/test_process_outgoing_activities.py @@ -5,13 +5,13 @@ import pytest import respx from fastapi.testclient import TestClient from sqlalchemy import select -from sqlalchemy.orm import Session from app import models from app.actor import LOCAL_ACTOR from app.ap_object import RemoteObject from app.database import AsyncSession from app.outgoing_activities import _MAX_RETRIES +from app.outgoing_activities import fetch_next_outgoing_activity from app.outgoing_activities import new_outgoing_activity from app.outgoing_activities import process_next_outgoing_activity from tests import factories @@ -65,15 +65,18 @@ async def test_new_outgoing_activity( assert outgoing_activity.recipient == inbox_url -def test_process_next_outgoing_activity__no_next_activity( - db: Session, +@pytest.mark.asyncio +async def test_process_next_outgoing_activity__no_next_activity( respx_mock: respx.MockRouter, + async_db_session: AsyncSession, ) -> None: - assert process_next_outgoing_activity(db) is False + next_activity = await fetch_next_outgoing_activity(async_db_session, set()) + assert next_activity is None -def test_process_next_outgoing_activity__server_200( - db: Session, +@pytest.mark.asyncio +async def test_process_next_outgoing_activity__server_200( + async_db_session: AsyncSession, respx_mock: respx.MockRouter, ) -> None: # And an outgoing activity @@ -91,19 +94,24 @@ def test_process_next_outgoing_activity__server_200( # When processing the next outgoing activity # Then it is processed - assert process_next_outgoing_activity(db) is True + next_activity = await fetch_next_outgoing_activity(async_db_session, set()) + assert next_activity + await process_next_outgoing_activity(async_db_session, next_activity) assert respx_mock.calls.call_count == 1 - outgoing_activity = db.query(models.OutgoingActivity).one() + outgoing_activity = ( + await async_db_session.execute(select(models.OutgoingActivity)) + ).scalar_one() assert outgoing_activity.is_sent is True assert outgoing_activity.last_status_code == 204 assert outgoing_activity.error is None assert outgoing_activity.is_errored is False -def test_process_next_outgoing_activity__webmention( - db: Session, +@pytest.mark.asyncio +async def test_process_next_outgoing_activity__webmention( + async_db_session: AsyncSession, respx_mock: respx.MockRouter, ) -> None: # And an outgoing activity @@ -121,19 +129,24 @@ def test_process_next_outgoing_activity__webmention( # When processing the next outgoing activity # Then it is processed - assert process_next_outgoing_activity(db) is True + next_activity = await fetch_next_outgoing_activity(async_db_session, set()) + assert next_activity + await process_next_outgoing_activity(async_db_session, next_activity) assert respx_mock.calls.call_count == 1 - outgoing_activity = db.query(models.OutgoingActivity).one() + outgoing_activity = ( + await async_db_session.execute(select(models.OutgoingActivity)) + ).scalar_one() assert outgoing_activity.is_sent is True assert outgoing_activity.last_status_code == 204 assert outgoing_activity.error is None assert outgoing_activity.is_errored is False -def test_process_next_outgoing_activity__error_500( - db: Session, +@pytest.mark.asyncio +async def test_process_next_outgoing_activity__error_500( + async_db_session: AsyncSession, respx_mock: respx.MockRouter, ) -> None: outbox_object = _setup_outbox_object() @@ -152,11 +165,15 @@ def test_process_next_outgoing_activity__error_500( # When processing the next outgoing activity # Then it is processed - assert process_next_outgoing_activity(db) is True + next_activity = await fetch_next_outgoing_activity(async_db_session, set()) + assert next_activity + await process_next_outgoing_activity(async_db_session, next_activity) assert respx_mock.calls.call_count == 1 - outgoing_activity = db.query(models.OutgoingActivity).one() + outgoing_activity = ( + await async_db_session.execute(select(models.OutgoingActivity)) + ).scalar_one() assert outgoing_activity.is_sent is False assert outgoing_activity.last_status_code == 500 assert outgoing_activity.last_response == "oops" @@ -164,8 +181,9 @@ def test_process_next_outgoing_activity__error_500( assert outgoing_activity.tries == 1 -def test_process_next_outgoing_activity__errored( - db: Session, +@pytest.mark.asyncio +async def test_process_next_outgoing_activity__errored( + async_db_session: AsyncSession, respx_mock: respx.MockRouter, ) -> None: outbox_object = _setup_outbox_object() @@ -185,22 +203,28 @@ def test_process_next_outgoing_activity__errored( # When processing the next outgoing activity # Then it is processed - assert process_next_outgoing_activity(db) is True + next_activity = await fetch_next_outgoing_activity(async_db_session, set()) + assert next_activity + await process_next_outgoing_activity(async_db_session, next_activity) assert respx_mock.calls.call_count == 1 - outgoing_activity = db.query(models.OutgoingActivity).one() + outgoing_activity = ( + await async_db_session.execute(select(models.OutgoingActivity)) + ).scalar_one() assert outgoing_activity.is_sent is False assert outgoing_activity.last_status_code == 500 assert outgoing_activity.last_response == "oops" assert outgoing_activity.is_errored is True # And it is skipped from processing - assert process_next_outgoing_activity(db) is False + next_activity = await fetch_next_outgoing_activity(async_db_session, set()) + assert next_activity is None -def test_process_next_outgoing_activity__connect_error( - db: Session, +@pytest.mark.asyncio +async def test_process_next_outgoing_activity__connect_error( + async_db_session: AsyncSession, respx_mock: respx.MockRouter, ) -> None: outbox_object = _setup_outbox_object() @@ -217,11 +241,15 @@ def test_process_next_outgoing_activity__connect_error( # When processing the next outgoing activity # Then it is processed - assert process_next_outgoing_activity(db) is True + next_activity = await fetch_next_outgoing_activity(async_db_session, set()) + assert next_activity + await process_next_outgoing_activity(async_db_session, next_activity) assert respx_mock.calls.call_count == 1 - outgoing_activity = db.query(models.OutgoingActivity).one() + outgoing_activity = ( + await async_db_session.execute(select(models.OutgoingActivity)) + ).scalar_one() assert outgoing_activity.is_sent is False assert outgoing_activity.error is not None assert outgoing_activity.tries == 1