From 793a939046a25c165e5094118b51a4f9f6a602a5 Mon Sep 17 00:00:00 2001
From: Thomas Sileo
Date: Sun, 13 Nov 2022 13:00:22 +0100
Subject: [PATCH] Fix OG metadata scraping and improve workers

---
 app/incoming_activities.py | 11 +++++++----
 app/utils/opengraph.py     | 16 +++++++++++++++-
 app/utils/workers.py       |  2 +-
 poetry.lock                | 14 +++++++++++++-
 pyproject.toml             |  1 +
 5 files changed, 37 insertions(+), 7 deletions(-)

diff --git a/app/incoming_activities.py b/app/incoming_activities.py
index dc5beda..583b208 100644
--- a/app/incoming_activities.py
+++ b/app/incoming_activities.py
@@ -3,7 +3,6 @@ import traceback
 from datetime import datetime
 from datetime import timedelta
 
-import httpx
 from loguru import logger
 from sqlalchemy import func
 from sqlalchemy import select
@@ -108,6 +107,7 @@ async def process_next_incoming_activity(
 
     next_activity.tries = next_activity.tries + 1
     next_activity.last_try = now()
+    await db_session.commit()
 
     if next_activity.ap_object and next_activity.sent_by_ap_actor_id:
         try:
@@ -120,13 +120,16 @@ async def process_next_incoming_activity(
                 ),
                 timeout=60,
             )
-        except httpx.TimeoutException as exc:
-            url = exc._request.url if exc._request else None
-            logger.error(f"Failed, HTTP timeout when fetching {url}")
+        except asyncio.exceptions.TimeoutError:
+            logger.error("Activity took too long to process")
+            await db_session.rollback()
+            await db_session.refresh(next_activity)
             next_activity.error = traceback.format_exc()
             _set_next_try(next_activity)
         except Exception:
             logger.exception("Failed")
+            await db_session.rollback()
+            await db_session.refresh(next_activity)
             next_activity.error = traceback.format_exc()
             _set_next_try(next_activity)
         else:
diff --git a/app/utils/opengraph.py b/app/utils/opengraph.py
index 3fb034b..24eb873 100644
--- a/app/utils/opengraph.py
+++ b/app/utils/opengraph.py
@@ -1,12 +1,15 @@
 import asyncio
 import mimetypes
 import re
+import signal
+from concurrent.futures import TimeoutError
 from typing import Any
 from urllib.parse import urlparse
 
 import httpx
 from bs4 import BeautifulSoup  # type: ignore
 from loguru import logger
+from pebble import concurrent  # type: ignore
 from pydantic import BaseModel
 
 from app import activitypub as ap
@@ -29,7 +32,11 @@ class OpenGraphMeta(BaseModel):
     site_name: str
 
 
+@concurrent.process(timeout=5)
 def _scrap_og_meta(url: str, html: str) -> OpenGraphMeta | None:
+    # Prevent SIGTERM to bubble up to the worker
+    signal.signal(signal.SIGTERM, signal.SIG_IGN)
+
     soup = BeautifulSoup(html, "html5lib")
     ogs = {
         og.attrs["property"]: og.attrs.get("content")
@@ -58,6 +65,10 @@ def _scrap_og_meta(url: str, html: str) -> OpenGraphMeta | None:
     return OpenGraphMeta.parse_obj(raw)
 
 
+def scrap_og_meta(url: str, html: str) -> OpenGraphMeta | None:
+    return _scrap_og_meta(url, html).result()
+
+
 async def external_urls(
     db_session: AsyncSession,
     ro: ap_object.RemoteObject | OutboxObject | InboxObject,
@@ -126,7 +137,10 @@ async def _og_meta_from_url(url: str) -> OpenGraphMeta | None:
         return None
 
     try:
-        return _scrap_og_meta(url, resp.text)
+        return scrap_og_meta(url, resp.text)
+    except TimeoutError:
+        logger.info(f"Timed out when scraping OG meta for {url}")
+        return None
     except Exception:
         logger.info(f"Failed to scrap OG meta for {url}")
         return None
diff --git a/app/utils/workers.py b/app/utils/workers.py
index 3d1a859..f1ef317 100644
--- a/app/utils/workers.py
+++ b/app/utils/workers.py
@@ -69,5 +69,5 @@ class Worker(Generic[T]):
         logger.info("stopping loop")
 
     async def _shutdown(self, sig: signal.Signals) -> None:
-        logger.info(f"Caught {signal=}")
+ logger.info(f"Caught {sig=}") self._stop_event.set() diff --git a/poetry.lock b/poetry.lock index d2255a8..bb6e460 100644 --- a/poetry.lock +++ b/poetry.lock @@ -689,6 +689,14 @@ category = "dev" optional = false python-versions = ">=3.7" +[[package]] +name = "pebble" +version = "5.0.2" +description = "Threading and multiprocessing eye-candy." +category = "main" +optional = false +python-versions = ">=3.6" + [[package]] name = "pillow" version = "9.3.0" @@ -1263,7 +1271,7 @@ dev = ["pytest (>=4.6.2)", "black (>=19.3b0)"] [metadata] lock-version = "1.1" python-versions = "^3.10" -content-hash = "89df524a545a19a20440d1872c93151bbf3f68d3b3d20cc50bc9049dd0e6d25f" +content-hash = "13a1f5fc3f65c56e753062dca6ab74a50f7270d78a08ebf6297f7b4fa26b5eac" [metadata.files] aiosqlite = [ @@ -1871,6 +1879,10 @@ pathspec = [ {file = "pathspec-0.10.1-py3-none-any.whl", hash = "sha256:46846318467efc4556ccfd27816e004270a9eeeeb4d062ce5e6fc7a87c573f93"}, {file = "pathspec-0.10.1.tar.gz", hash = "sha256:7ace6161b621d31e7902eb6b5ae148d12cfd23f4a249b9ffb6b9fee12084323d"}, ] +pebble = [ + {file = "Pebble-5.0.2-py3-none-any.whl", hash = "sha256:61b2dfd52b1a8c083b4e6cf3e0f1ff2e8a430a6283c53969a7057a1c91bed3cd"}, + {file = "Pebble-5.0.2.tar.gz", hash = "sha256:9c58c03eaf920c31287444c6fef39dc53baeac9de221ead104f5c9b48e8bd587"}, +] pillow = [ {file = "Pillow-9.3.0-cp310-cp310-macosx_10_10_x86_64.whl", hash = "sha256:0b7257127d646ff8676ec8a15520013a698d1fdc48bc2a79ba4e53df792526f2"}, {file = "Pillow-9.3.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:b90f7616ea170e92820775ed47e136208e04c967271c9ef615b6fbd08d9af0e3"}, diff --git a/pyproject.toml b/pyproject.toml index 954e88b..947fc81 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,6 +44,7 @@ uvicorn = {extras = ["standard"], version = "^0.18.3"} Brotli = "^1.0.9" greenlet = "^1.1.3" mistletoe = "^0.9.0" +Pebble = "^5.0.2" [tool.poetry.dev-dependencies] black = "^22.3.0"