From 59af633c6c795e6e94ed4c4bc67d4a6f65a6eced Mon Sep 17 00:00:00 2001 From: Thomas Sileo Date: Mon, 15 Aug 2022 19:20:56 +0200 Subject: [PATCH] Prefetch some notes when following an actor --- app/activitypub.py | 11 ++++++++--- app/actor.py | 4 ++++ app/boxes.py | 38 ++++++++++++++++++++++++++++++++++++++ tests/utils.py | 13 +++++++++++++ 4 files changed, 63 insertions(+), 3 deletions(-) diff --git a/app/activitypub.py b/app/activitypub.py index 77edd68..4b79e8b 100644 --- a/app/activitypub.py +++ b/app/activitypub.py @@ -166,6 +166,7 @@ async def parse_collection( # noqa: C901 url: str | None = None, payload: RawObject | None = None, level: int = 0, + limit: int = 0, ) -> list[RawObject]: """Resolve/fetch a `Collection`/`OrderedCollection`.""" if level > 3: @@ -193,7 +194,9 @@ async def parse_collection( # noqa: C901 if "first" in payload: if isinstance(payload["first"], str): out.extend( - await parse_collection(url=payload["first"], level=level + 1) + await parse_collection( + url=payload["first"], level=level + 1, limit=limit + ) ) else: if "orderedItems" in payload["first"]: @@ -202,7 +205,9 @@ async def parse_collection( # noqa: C901 out.extend(payload["first"]["items"]) n = payload["first"].get("next") if n: - out.extend(await parse_collection(url=n, level=level + 1)) + out.extend( + await parse_collection(url=n, level=level + 1, limit=limit) + ) return out while payload: @@ -212,7 +217,7 @@ async def parse_collection( # noqa: C901 if "items" in payload: out.extend(payload["items"]) n = payload.get("next") - if n is None: + if n is None or (limit > 0 and len(out) >= limit): break payload = await fetch(n) else: diff --git a/app/actor.py b/app/actor.py index 9958bc9..17d8f4a 100644 --- a/app/actor.py +++ b/app/actor.py @@ -68,6 +68,10 @@ class Actor: def inbox_url(self) -> str: return self.ap_actor["inbox"] + @property + def outbox_url(self) -> str: + return self.ap_actor["outbox"] + @property def shared_inbox_url(self) -> str: return self.ap_actor.get("endpoints", {}).get("sharedInbox") or self.inbox_url diff --git a/app/boxes.py b/app/boxes.py index 85eb0a0..3ccb83f 100644 --- a/app/boxes.py +++ b/app/boxes.py @@ -132,6 +132,8 @@ async def send_like(db_session: AsyncSession, ap_object_id: str) -> None: raw_object = await ap.fetch(ap.get_id(ap_object_id)) await save_object_to_inbox(db_session, raw_object) await db_session.commit() + # XXX: we need to reload it as lazy-loading the actor will fail + # (asyncio SQLAlchemy issue) inbox_object = await get_inbox_object_by_ap_id(db_session, ap_object_id) if not inbox_object: raise ValueError("Should never happen") @@ -165,6 +167,8 @@ async def send_announce(db_session: AsyncSession, ap_object_id: str) -> None: raw_object = await ap.fetch(ap.get_id(ap_object_id)) await save_object_to_inbox(db_session, raw_object) await db_session.commit() + # XXX: we need to reload it as lazy-loading the actor will fail + # (asyncio SQLAlchemy issue) inbox_object = await get_inbox_object_by_ap_id(db_session, ap_object_id) if not inbox_object: raise ValueError("Should never happen") @@ -1615,6 +1619,9 @@ async def save_to_inbox( inbox_object_id=inbox_object.id, ) db_session.add(notif) + if activity_ro.ap_type == "Accept": + # Pre-fetch the latest activities + await _prefetch_actor_outbox(db_session, actor) else: logger.info( "Received an Accept for an unsupported activity: " @@ -1750,10 +1757,41 @@ async def save_to_inbox( await db_session.commit() +async def _prefetch_actor_outbox( + db_session: AsyncSession, + actor: models.Actor, +) -> None: + """Try to fetch some notes to fill the stream""" + saved = 0 + outbox = await ap.parse_collection(actor.outbox_url, limit=20) + for activity in outbox: + activity_id = ap.get_id(activity) + raw_activity = await ap.fetch(activity_id) + if ap.as_list(raw_activity["type"])[0] == "Create": + obj = await ap.get_object(raw_activity) + saved_inbox_object = await get_inbox_object_by_ap_id( + db_session, ap.get_id(obj) + ) + if not saved_inbox_object: + saved_inbox_object = await save_object_to_inbox(db_session, obj) + + if not saved_inbox_object.in_reply_to: + saved_inbox_object.is_hidden_from_stream = False + saved += 1 + + if saved >= 5: + break + + # commit is performed by the called + + async def save_object_to_inbox( db_session: AsyncSession, raw_object: ap.RawObject, ) -> models.InboxObject: + """Used to save unknown object before intetacting with them, i.e. to like + an object that was looked up, or prefill the inbox when an actor accepted + a follow request.""" obj_actor = await fetch_actor(db_session, ap.get_actor_id(raw_object)) ro = RemoteObject(raw_object, actor=obj_actor) diff --git a/tests/utils.py b/tests/utils.py index 2dd53e0..5bb47db 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -7,6 +7,7 @@ import fastapi import httpx import respx +from app import activitypub as ap from app import actor from app import httpsig from app import models @@ -45,6 +46,18 @@ def setup_remote_actor(respx_mock: respx.MockRouter) -> actor.RemoteActor: username="toto", public_key="pk", ) + respx_mock.get(ra.ap_id + "/outbox").mock( + return_value=httpx.Response( + 200, + json={ + "@context": ap.AS_EXTENDED_CTX, + "id": f"{ra.ap_id}/outbox", + "type": "OrderedCollection", + "totalItems": 0, + "orderedItems": [], + }, + ) + ) respx_mock.get(ra.ap_id).mock(return_value=httpx.Response(200, json=ra.ap_actor)) return ra