microblog.pub/app/incoming_activities.py

163 lines
4.7 KiB
Python
Raw Normal View History

2022-07-14 08:44:04 +02:00
import asyncio
import traceback
from datetime import datetime
from datetime import timedelta
2022-08-20 09:11:48 +02:00
import httpx
2022-07-14 08:44:04 +02:00
from loguru import logger
from sqlalchemy import func
from sqlalchemy import select
from app import activitypub as ap
from app import httpsig
from app import ldsig
2022-07-14 08:44:04 +02:00
from app import models
from app.boxes import save_to_inbox
from app.database import AsyncSession
from app.utils.datetime import now
from app.utils.workers import Worker
2022-07-14 08:44:04 +02:00
2022-07-16 07:50:15 +02:00
# Cap on processing attempts for an incoming activity: once an activity's
# `tries` counter reaches this value, `_set_next_try` flags it as errored
# instead of scheduling another retry.
_MAX_RETRIES = 8
2022-07-14 08:44:04 +02:00
async def new_ap_incoming_activity(
    db_session: AsyncSession,
    httpsig_info: httpsig.HTTPSigInfo,
    raw_object: ap.RawObject,
) -> models.IncomingActivity | None:
    """Persist a received ActivityPub object as a new incoming activity.

    The stored ID is the object's own ID, except for objects that have no
    ``id`` and for actor objects: those are keyed by their JSON-LD document
    hash instead, which requires an ``@context`` to be present.

    Args:
        db_session: Session used to add, commit and refresh the new row.
        httpsig_info: HTTP signature details; its ``signed_by_ap_actor_id``
            is recorded as the sender of the activity.
        raw_object: The raw ActivityPub payload that was received.

    Returns:
        The saved ``IncomingActivity``, or ``None`` when the object needs a
        hash-based ID but carries no ``@context`` and is therefore dropped.
    """
    # "type" is only looked at when an "id" is present (short-circuit).
    has_usable_id = (
        "id" in raw_object
        and ap.as_list(raw_object["type"])[0] not in ap.ACTOR_TYPES
    )

    ap_id: str
    if has_usable_id:
        ap_id = ap.get_id(raw_object)
    elif "@context" in raw_object:
        # This is a transient object, build the JSON-LD hash as the ID
        ap_id = ldsig._doc_hash(raw_object)
    else:
        logger.warning(f"Dropping invalid object: {raw_object}")
        return None

    # TODO(ts): dedup first

    activity_row = models.IncomingActivity(
        sent_by_ap_actor_id=httpsig_info.signed_by_ap_actor_id,
        ap_id=ap_id,
        ap_object=raw_object,
    )
    db_session.add(activity_row)
    await db_session.commit()
    # Reload the row so the returned instance reflects what was stored.
    await db_session.refresh(activity_row)
    return activity_row
def _exp_backoff(tries: int) -> datetime:
    """Return the time of the next attempt after ``tries`` attempts so far.

    The delay doubles with every attempt: 2 seconds after the first try,
    4 after the second, 8 after the third, and so on.
    """
    delay = timedelta(seconds=2 ** tries)
    return now() + delay
def _set_next_try(
    outgoing_activity: models.IncomingActivity,
    next_try: datetime | None = None,
) -> None:
    """Schedule the next attempt for a failed activity, or give up on it.

    Args:
        outgoing_activity: The incoming activity whose attempt just failed.
            (The parameter name is a misnomer, kept unchanged so existing
            callers keep working; the object is an ``IncomingActivity``.)
        next_try: Explicit time for the next attempt. When omitted, an
            exponential back-off based on the number of tries is used.

    Raises:
        ValueError: If the activity's ``tries`` counter is unset or zero,
            i.e. this was called before any attempt was recorded.
    """
    tries = outgoing_activity.tries
    if not tries:
        raise ValueError("Should never happen")

    # `>=` rather than `==`: if the counter ever overshoots the cap (for
    # instance because the cap was lowered after rows were created), the
    # activity must still be given up on instead of being retried forever.
    if tries >= _MAX_RETRIES:
        outgoing_activity.is_errored = True
        outgoing_activity.next_try = None
        return

    outgoing_activity.next_try = next_try or _exp_backoff(tries)
async def fetch_next_incoming_activity(
    db_session: AsyncSession,
) -> models.IncomingActivity | None:
    """Return the next incoming activity that is ready to be processed.

    An activity is ready when its ``next_try`` is not in the future and it
    is neither errored nor already processed. Among the ready activities,
    the one with the earliest ``next_try`` is returned.

    Returns:
        The selected ``IncomingActivity``, or ``None`` when nothing is ready.
    """
    activity = models.IncomingActivity
    ready_filters = (
        activity.next_try <= now(),
        activity.is_errored.is_(False),
        activity.is_processed.is_(False),
    )

    # Count first so the size of the backlog can be logged.
    count_stmt = select(func.count(activity.id)).where(*ready_filters)
    q_count = await db_session.scalar(count_stmt)
    if not q_count:
        return None
    logger.info(f"{q_count} incoming activities ready to process")

    next_stmt = (
        select(activity)
        .where(*ready_filters)
        .order_by(activity.next_try.asc())
        .limit(1)
    )
    result = await db_session.execute(next_stmt)
    return result.scalar_one()
async def process_next_incoming_activity(
    db_session: AsyncSession,
    next_activity: models.IncomingActivity,
) -> None:
    """Run one processing attempt for ``next_activity`` and commit the outcome.

    The try counter and last-try timestamp are always updated. When the
    activity has both a payload and a sender, it is handed to
    ``save_to_inbox`` inside a nested transaction with a 60 second timeout:
    on success the activity is marked as processed; on any failure the
    traceback is stored on the activity and the next retry is scheduled.

    Args:
        db_session: Session used for the nested transaction and the commit.
        next_activity: The activity to process.
    """
    logger.info(
        f"incoming_activity={next_activity.ap_object}/"
        f"{next_activity.sent_by_ap_actor_id}"
    )

    next_activity.tries += 1
    next_activity.last_try = now()

    ap_object = next_activity.ap_object
    sender_id = next_activity.sent_by_ap_actor_id
    if ap_object and sender_id:
        try:
            # The nested transaction keeps a failed save from leaking partial
            # writes into the outer transaction committed below.
            async with db_session.begin_nested():
                inbox_save = save_to_inbox(db_session, ap_object, sender_id)
                await asyncio.wait_for(inbox_save, timeout=60)
        except Exception as exc:
            # HTTP timeouts get a short one-line log; everything else is
            # logged with its full traceback.
            if isinstance(exc, httpx.TimeoutException):
                url = exc._request.url if exc._request else None
                logger.error(f"Failed, HTTP timeout when fetching {url}")
            else:
                logger.exception("Failed")
            next_activity.error = traceback.format_exc()
            _set_next_try(next_activity)
        else:
            logger.info("Success")
            next_activity.is_processed = True

    # FIXME: webmention support

    await db_session.commit()
    return None
class IncomingActivityWorker(Worker[models.IncomingActivity]):
    """``Worker`` specialisation whose messages are incoming activities."""

    async def get_next_message(
        self,
        db_session: AsyncSession,
    ) -> models.IncomingActivity | None:
        """Return the next activity to process, or ``None`` if none is ready.

        Delegates to ``fetch_next_incoming_activity``.
        """
        return await fetch_next_incoming_activity(db_session=db_session)

    async def process_message(
        self,
        db_session: AsyncSession,
        next_activity: models.IncomingActivity,
    ) -> None:
        """Process one activity via ``process_next_incoming_activity``."""
        await process_next_incoming_activity(
            db_session=db_session,
            next_activity=next_activity,
        )
2022-07-14 08:44:04 +02:00
async def loop() -> None:
    """Create an ``IncomingActivityWorker`` and await its ``run_forever()``."""
    worker = IncomingActivityWorker()
    await worker.run_forever()
2022-07-14 08:44:04 +02:00
if __name__ == "__main__":
    # Script entry point: run the incoming-activity worker loop under asyncio.
    asyncio.run(loop())