import base64 from datetime import datetime from datetime import timezone from functools import lru_cache from typing import Any from typing import Callable from urllib.parse import urlparse import bleach import emoji import html2text import humanize from bs4 import BeautifulSoup # type: ignore from dateutil.parser import parse from fastapi import Request from fastapi.templating import Jinja2Templates from loguru import logger from sqlalchemy import func from sqlalchemy import select from starlette.templating import _TemplateResponse as TemplateResponse from app import activitypub as ap from app import config from app import models from app.actor import LOCAL_ACTOR from app.ap_object import Attachment from app.ap_object import Object from app.config import BASE_URL from app.config import DEBUG from app.config import VERSION from app.config import generate_csrf_token from app.config import session_serializer from app.database import AsyncSession from app.media import proxied_media_url from app.utils import privacy_replace from app.utils.datetime import now from app.utils.highlight import HIGHLIGHT_CSS from app.utils.highlight import highlight _templates = Jinja2Templates( directory="app/templates", trim_blocks=True, lstrip_blocks=True, ) H2T = html2text.HTML2Text() H2T.ignore_links = True H2T.ignore_images = True def _filter_domain(text: str) -> str: hostname = urlparse(text).hostname if not hostname: raise ValueError(f"No hostname for {text}") return hostname def _media_proxy_url(url: str | None) -> str: if not url: return "/static/nopic.png" if url.startswith(BASE_URL): return url encoded_url = base64.urlsafe_b64encode(url.encode()).decode() return f"/proxy/media/{encoded_url}" def is_current_user_admin(request: Request) -> bool: is_admin = False session_cookie = request.cookies.get("session") if session_cookie: try: loaded_session = session_serializer.loads( session_cookie, max_age=3600 * 12, ) except Exception: pass else: is_admin = loaded_session.get("is_logged_in") return is_admin async def render_template( db_session: AsyncSession, request: Request, template: str, template_args: dict[str, Any] | None = None, ) -> TemplateResponse: if template_args is None: template_args = {} is_admin = False is_admin = is_current_user_admin(request) return _templates.TemplateResponse( template, { "request": request, "debug": DEBUG, "microblogpub_version": VERSION, "is_admin": is_admin, "csrf_token": generate_csrf_token() if is_admin else None, "highlight_css": HIGHLIGHT_CSS, "visibility_enum": ap.VisibilityEnum, "notifications_count": await db_session.scalar( select(func.count(models.Notification.id)).where( models.Notification.is_new.is_(True) ) ) if is_admin else 0, "articles_count": await db_session.scalar( select(func.count(models.OutboxObject.id)).where( models.OutboxObject.visibility == ap.VisibilityEnum.PUBLIC, models.OutboxObject.is_deleted.is_(False), models.OutboxObject.is_hidden_from_homepage.is_(False), models.OutboxObject.ap_type == "Article", ) ), "local_actor": LOCAL_ACTOR, "followers_count": await db_session.scalar( select(func.count(models.Follower.id)) ), "following_count": await db_session.scalar( select(func.count(models.Following.id)) ), "actor_types": ap.ACTOR_TYPES, **template_args, }, ) # HTML/templates helper ALLOWED_TAGS = [ "a", "abbr", "acronym", "b", "br", "blockquote", "code", "pre", "em", "i", "li", "ol", "strong", "sup", "sub", "del", "ul", "span", "div", "p", "h1", "h2", "h3", "h4", "h5", "h6", "table", "th", "tr", "td", "thead", "tbody", "tfoot", "colgroup", "caption", "img", "div", "span", ] ALLOWED_CSS_CLASSES = [ "highlight", "codehilite", "hll", "c", "err", "g", "k", "l", "n", "o", "x", "p", "ch", "cm", "cp", "cpf", "c1", "cs", "gd", "ge", "gr", "gh", "gi", "go", "gp", "gs", "gu", "gt", "kc", "kd", "kn", "kp", "kr", "kt", "ld", "m", "s", "na", "nb", "nc", "no", "nd", "ni", "ne", "nf", "nl", "nn", "nx", "py", "nt", "nv", "ow", "w", "mb", "mf", "mh", "mi", "mo", "sa", "sb", "sc", "dl", "sd", "s2", "se", "sh", "si", "sx", "sr", "s1", "ss", "bp", "fm", "vc", "vg", "vi", "vm", "il", ] def _allow_class(_tag: str, name: str, value: str) -> bool: return name == "class" and value in ALLOWED_CSS_CLASSES def _allow_img_attrs(_tag: str, name: str, value: str) -> bool: if name in ["src", "alt", "title"]: return True if name == "class" and value == "inline-img": return True return False ALLOWED_ATTRIBUTES: dict[str, list[str] | Callable[[str, str, str], bool]] = { "a": ["href", "title"], "abbr": ["title"], "acronym": ["title"], "img": _allow_img_attrs, "div": _allow_class, "span": _allow_class, "code": _allow_class, } @lru_cache(maxsize=256) def _update_inline_imgs(content): soup = BeautifulSoup(content, "html5lib") imgs = soup.find_all("img") if not imgs: return content for img in imgs: if not img.attrs.get("src"): continue img.attrs["src"] = _media_proxy_url(img.attrs["src"]) + "/740" img["class"] = "inline-img" return soup.find("body").decode_contents() def _clean_html(html: str, note: Object) -> str: if html is None: logger.error(f"{html=} for {note.ap_id}/{note.ap_object}") return "" try: return _emojify( _replace_custom_emojis( bleach.clean( _update_inline_imgs(highlight(html)), tags=ALLOWED_TAGS, attributes=ALLOWED_ATTRIBUTES, strip=True, ), note, ), is_local=note.ap_id.startswith(BASE_URL), ) except Exception: raise def _timeago(original_dt: datetime) -> str: dt = original_dt if dt.tzinfo: dt = dt.astimezone(timezone.utc).replace(tzinfo=None) return humanize.naturaltime(dt, when=now().replace(tzinfo=None)) def _has_media_type(attachment: Attachment, media_type_prefix: str) -> bool: if attachment.media_type: return attachment.media_type.startswith(media_type_prefix) return False def _format_date(dt: datetime) -> str: return dt.strftime("%b %d, %Y, %H:%M") def _pluralize(count: int, singular: str = "", plural: str = "s") -> str: if count > 1: return plural else: return singular def _replace_custom_emojis(content: str, note: Object) -> str: idx = {} for tag in note.tags: if tag.get("type") == "Emoji": try: idx[tag["name"]] = proxied_media_url(tag["icon"]["url"]) except KeyError: logger.warning(f"Failed to parse custom emoji {tag=}") continue for emoji_name, emoji_url in idx.items(): content = content.replace( emoji_name, f'{emoji_name}', # noqa: E501 ) return content def _html2text(content: str) -> str: return H2T.handle(content) def _replace_emoji(u: str, _) -> str: filename = hex(ord(u))[2:] return config.EMOJI_TPL.format(filename=filename, raw=u) def _emojify(text: str, is_local: bool) -> str: if not is_local: return text return emoji.replace_emoji( text, replace=_replace_emoji, ) def _parse_datetime(dt: str) -> datetime: return parse(dt) def _poll_item_pct(item: ap.RawObject, voters_count: int) -> int: if voters_count == 0: return 0 return int(item["replies"]["totalItems"] * 100 / voters_count) _templates.env.filters["domain"] = _filter_domain _templates.env.filters["media_proxy_url"] = _media_proxy_url _templates.env.filters["clean_html"] = _clean_html _templates.env.filters["timeago"] = _timeago _templates.env.filters["format_date"] = _format_date _templates.env.filters["has_media_type"] = _has_media_type _templates.env.filters["html2text"] = _html2text _templates.env.filters["emojify"] = _emojify _templates.env.filters["pluralize"] = _pluralize _templates.env.filters["parse_datetime"] = _parse_datetime _templates.env.filters["poll_item_pct"] = _poll_item_pct _templates.env.filters["privacy_replace_url"] = privacy_replace.replace_url