From 0a8f5269e4dd6074337d7b410df65d51d481dbd0 Mon Sep 17 00:00:00 2001 From: octospacc Date: Sun, 13 Jul 2025 01:20:07 +0200 Subject: [PATCH] Initial release version --- .gitignore | 3 + app.py | 319 ++++++++++++++++++------------------ package-lock.json | 18 ++ package.json | 5 + requirements.txt | 4 +- templates/add.html | 33 ++-- templates/base.html | 45 ++--- templates/index.html | 3 +- templates/item-card.html | 6 + templates/item-content.html | 7 + templates/item.html | 47 +++--- templates/login.html | 15 +- templates/remove.html | 9 +- templates/results.html | 17 +- templates/search.html | 2 +- templates/user.html | 26 ++- 16 files changed, 310 insertions(+), 249 deletions(-) create mode 100644 package-lock.json create mode 100644 package.json create mode 100644 templates/item-card.html create mode 100644 templates/item-content.html diff --git a/.gitignore b/.gitignore index 3a85cd3..9e7126e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,5 @@ +/_config.py /data/ +/node_modules/ +/.mypy_cache/ *.pyc \ No newline at end of file diff --git a/app.py b/app.py index 902fe68..5cb189f 100644 --- a/app.py +++ b/app.py @@ -1,24 +1,36 @@ import os -import re import requests -import configparser import urllib.parse -import xml.etree.ElementTree as ElementTree +from typing import Any from io import StringIO +from configparser import ConfigParser from bs4 import BeautifulSoup from flask import Flask, request, redirect, render_template, send_from_directory, abort, url_for, flash -from flask_bcrypt import Bcrypt -from flask_login import LoginManager, UserMixin, current_user, login_user, logout_user, login_required -from flask_wtf import FlaskForm -from wtforms import StringField, PasswordField, SubmitField -from wtforms.validators import DataRequired +from flask_bcrypt import Bcrypt # type: ignore[import-untyped] +from flask_login import LoginManager, UserMixin, current_user, login_user, logout_user, login_required # type: ignore[import-untyped] +from flask_wtf import FlaskForm # type: ignore[import-untyped] +from wtforms import StringField, PasswordField, SubmitField # type: ignore[import-untyped] +from wtforms.validators import DataRequired # type: ignore[import-untyped] from glob import glob from pathlib import Path from datetime import datetime -from snowflake import Snowflake, SnowflakeGenerator +from snowflake import Snowflake, SnowflakeGenerator # type: ignore[import-untyped] + +SECRET_KEY = "SECRET_KEY" # import secrets; print(secrets.token_urlsafe()) +DEVELOPMENT = True +HTTP_PORT = 5000 +HTTP_THREADS = 32 +LINKS_PREFIX = "" + +from _config import * app = Flask(__name__) -app.config["SECRET_KEY"] = "your_secret_key" # TODO: fix this for prod +app.config["LINKS_PREFIX"] = LINKS_PREFIX +app.config["APP_NAME"] = "Pignio" +app.config["APP_ICON"] = "📌" +app.config["SECRET_KEY"] = SECRET_KEY +app.config["BCRYPT_HANDLE_LONG_PASSWORDS"] = True + login_manager = LoginManager() login_manager.login_view = "login" login_manager.init_app(app) @@ -30,19 +42,17 @@ snowflake = SnowflakeGenerator(1, epoch=snowflake_epoch) DATA_ROOT = "data" ITEMS_ROOT = f"{DATA_ROOT}/items" USERS_ROOT = f"{DATA_ROOT}/users" -MEDIA_ROOT = f"{DATA_ROOT}/items" EXTENSIONS = { "images": ("jpg", "jpeg", "png", "gif", "webp", "avif"), "videos": ("mp4", "mov", "mpeg", "ogv", "webm", "mkv"), } -ITEMS_EXT = ".pignio" +ITEMS_EXT = ".ini" class User(UserMixin): def __init__(self, username, filepath): self.username = username self.filepath = filepath - with open(filepath, "r") as f: - self.data = read_metadata(f.read()) + self.data = read_metadata(read_textual(filepath)) def get_id(self): return self.username @@ -54,11 +64,15 @@ class LoginForm(FlaskForm): @app.route("/") def index(): - return render_template("index.html", media=walk_items()) + return render_template("index.html", items=walk_items()) + +@app.route("/static/module//") +def serve_module(module:str, filename:str): + return send_from_directory(os.path.join("node_modules", module, "dist"), filename) @app.route("/media/") def serve_media(filename:str): - return send_from_directory(MEDIA_ROOT, filename) + return send_from_directory(ITEMS_ROOT, filename) @app.route("/item/") def view_item(iid:str): @@ -77,18 +91,18 @@ def view_user(username:str): @app.route("/search") def search(): query = request.args.get("query", "").lower() + found = False results = {} for folder, items in walk_items().items(): results[folder] = [] for item in items: - image = item["id"] - meta = load_sider_metadata(image) or {} - if any([query in text.lower() for text in [image, *meta.values()]]): - results[folder].append(image) + if any([query in text.lower() for text in item.values()]): + results[folder].append(item) + found = True - return render_template("search.html", media=results, query=query) + return render_template("search.html", items=(results if found else None), query=query) @app.route("/add", methods=["GET", "POST"]) @login_required @@ -102,15 +116,12 @@ def add_item(): elif request.method == "POST": iid = request.form.get("id") or generate_iid() + data = {key: request.form[key] for key in ["link", "title", "description", "image", "text"]} - store_item(iid, { - "link": request.form.get("link"), - "title": request.form.get("title"), - "description": request.form.get("description"), - "image": request.form.get("image"), - }, request.files) - - return redirect(url_for("view_item", iid=iid)) + if store_item(iid, data, request.files): + return redirect(url_for("view_item", iid=iid)) + else: + flash("Cannot save item", "danger") return render_template("add.html", item=item) @@ -130,19 +141,9 @@ def remove_item(): abort(404) - # iid = request.args.get("item") - # item = load_item(iid) - # if not item: - # abort(404) - # if request.method == "GET": - # return render_template("remove.html", item=item) - # elif request.method == "POST": - # delete_item(item) - # return redirect(url_for("index")) - @app.route("/api/preview") @login_required -def preview(): +def link_preview(): return fetch_url_data(request.args.get("url")) @app.errorhandler(404) @@ -164,8 +165,7 @@ def login(): if pass_equals or hash_equals: if pass_equals: user.data["password"] = bcrypt.generate_password_hash(user.data["password"]).decode("utf-8") - with open(user.filepath, "w") as f: - f.write(write_metadata(user.data)) + write_textual(user.filepath, write_metadata(user.data)) login_user(user) # next_url = flask.request.args.get('next') # if not url_has_allowed_host_and_scheme(next_url, request.host): return flask.abort(400) @@ -176,9 +176,9 @@ def login(): return render_template("login.html", form=form) @app.route("/logout") -@login_required def logout(): - logout_user() + if current_user.is_authenticated: + logout_user() return redirect(url_for("index")) @login_manager.user_loader @@ -190,32 +190,13 @@ def load_user(username:str): def walk_items(): results, iids = {}, {} - for root, dirs, files in os.walk(MEDIA_ROOT): - rel_path = os.path.relpath(root, MEDIA_ROOT).replace(os.sep, "/") + for root, dirs, files in os.walk(ITEMS_ROOT): + rel_path = os.path.relpath(root, ITEMS_ROOT).replace(os.sep, "/") if rel_path == ".": rel_path = "" results[rel_path], iids[rel_path] = [], [] - # for file in files: - # if file.lower().endswith(tuple([f".{ext}" for ext in EXTENSIONS["images"]])): - # iid = strip_ext(os.path.join(rel_path, file).replace(os.sep, "/")) - # image = os.path.join(rel_path, file).replace(os.sep, "/") - # data = load_sider_metadata(image) or {} - # data["image"] = image - # data["id"] = iid - # results[rel_path].append(data) - # files.remove(file) - - # for file in files: - # if file.lower().endswith(ITEMS_EXT): - # iid = strip_ext(os.path.join(rel_path, file).replace(os.sep, "/")) - # with open(os.path.join(MEDIA_ROOT, rel_path, file), "r") as f: - # data = read_metadata(f.read()) - # data["id"] = iid - # results[rel_path].append(data) - # files.remove(file) - for file in files: #if file.lower().endswith(ITEMS_EXT) or file.lower().endswith(tuple([f".{ext}" for ext in EXTENSIONS["images"]])): iid = strip_ext(os.path.join(rel_path, file).replace(os.sep, "/")) @@ -229,14 +210,14 @@ def walk_items(): return results -def walk_collections(username:str=None): - results = {"": []} - +def walk_collections(username:str): + results: dict[str, list[str]] = {"": []} filepath = USERS_ROOT - if username: - filepath = os.path.join(filepath, username) - results[""] = read_metadata(read_textual(filepath + ITEMS_EXT))["items"].strip().replace(" ", "\n").splitlines() + # if username: + filepath = os.path.join(filepath, username) + data = read_metadata(read_textual(filepath + ITEMS_EXT)) + results[""] = data["items"] if "items" in data else [] # for root, dirs, files in os.walk(filepath): # rel_path = os.path.relpath(root, filepath).replace(os.sep, "/") @@ -266,7 +247,7 @@ def filename_to_iid(iid:str): def load_item(iid:str): iid = filename_to_iid(iid) filename = iid_to_filename(iid) - filepath = os.path.join(MEDIA_ROOT, filename) + filepath = os.path.join(ITEMS_ROOT, filename) files = glob(f"{filepath}.*") if len(files): @@ -274,89 +255,21 @@ def load_item(iid:str): for file in files: if file.lower().endswith(ITEMS_EXT): - # with open(file, "r", encoding="utf-8") as f: - # data = data | read_metadata(f.read()) data = data | read_metadata(read_textual(file)) elif file.lower().endswith(tuple([f".{ext}" for ext in EXTENSIONS["images"]])): - data["image"] = file.replace(os.sep, "/").removeprefix(f"{MEDIA_ROOT}/") + data["image"] = file.replace(os.sep, "/").removeprefix(f"{ITEMS_ROOT}/") return data -def load_sider_metadata(filename:str): - filepath = os.path.join(MEDIA_ROOT, f"{strip_ext(filename)}{ITEMS_EXT}") - if os.path.exists(filepath): - with open(filepath, "r") as f: - return read_metadata(f.read()) - -# def read_metadata(text:str): -# data = {} -# xml = "" + re.sub(r'<(\w+)>(.*?)', r'<\1>\2', text) + "" -# for elem in ElementTree.fromstring(xml, parser=ElementTree.XMLParser(encoding="utf-8")).findall('*'): -# data[elem.tag] = elem.text.strip() -# return data - -def read_metadata(text:str) -> dict: - config = configparser.ConfigParser(allow_unnamed_section=True, interpolation=None) - config.read_string(text) - return config._sections[configparser.UNNAMED_SECTION] # tuple(config._sections.values())[0] - -# def write_metadata(data:dict): -# text = "" -# for key in data: -# if key not in ("image",) and (value := data[key]): -# text += f'<{key}>{value}\n' -# return text - -def write_metadata(data:dict) -> str: - output = StringIO() - config = configparser.ConfigParser(allow_unnamed_section=True, interpolation=None) - del data["image"] - config[configparser.UNNAMED_SECTION] = data - config.write(output) - return "\n".join(output.getvalue().splitlines()[1:]) # remove section header - -def read_textual(filepath:str) -> str: - try: - with open(filepath, "r", encoding="utf-8") as f: - return f.read() - except UnicodeDecodeError: - with open(filepath, "r") as f: - return f.read() - -def write_textual(filepath:str, content:bytes): - with open(filepath, "w", encoding="utf-8") as f: - return f.write(content) - -def fetch_url_data(url:str): - response = requests.get(url, timeout=5) - soup = BeautifulSoup(response.text, "html.parser") - - description = None - desc_tag = soup.find("meta", attrs={"name": "description"}) or \ - soup.find("meta", attrs={"property": "og:description"}) - if desc_tag and "content" in desc_tag.attrs: - description = desc_tag["content"] - - image = None - img_tag = soup.find("meta", attrs={"property": "og:image"}) or \ - soup.find("meta", attrs={"name": "twitter:image"}) - if img_tag and "content" in img_tag.attrs: - image = img_tag["content"] - - return { - "title": soup_or_default(soup, "meta", {"property": "og:title"}, "content", (soup.title.string if soup.title else None)), - "description": description, - "image": image, - "link": soup_or_default(soup, "link", {"rel": "canonical"}, "href", url), - } - -def store_item(iid, data, files): - iid = iid_to_filename(iid) - iid = split_iid(strip_ext(iid)) - filepath = os.path.join(MEDIA_ROOT, *iid) - Path(os.path.join(MEDIA_ROOT, iid[0])).mkdir(parents=True, exist_ok=True) +def store_item(iid:str, data:dict, files:dict): + iid = filename_to_iid(iid) + existing = load_item(iid) + filename = split_iid(iid_to_filename(iid)) + filepath = os.path.join(ITEMS_ROOT, *filename) + mkdirs(os.path.join(ITEMS_ROOT, filename[0])) image = False + if len(files): file = files["file"] if file.seek(0, os.SEEK_END): @@ -369,39 +282,117 @@ def store_item(iid, data, files): ext = response.headers["Content-Type"].split("/")[1] with open(f"{filepath}.{ext}", "wb") as f: f.write(response.content) - # with open(filepath + ITEMS_EXT, "w", encoding="utf-8") as f: - # f.write(write_metadata(data)) + image = True + if not (existing or image or data["text"]): + return False + + if existing: + if "creator" in existing: + data["creator"] = existing["creator"] + else: + data["creator"] = current_user.username + items = current_user.data["items"] if "items" in current_user.data else [] + items.append(iid) + current_user.data["items"] = items + write_textual(current_user.filepath, write_metadata(current_user.data)) + write_textual(filepath + ITEMS_EXT, write_metadata(data)) + return True def delete_item(item:dict): - filepath = os.path.join(MEDIA_ROOT, iid_to_filename(item["id"])) + filepath = os.path.join(ITEMS_ROOT, iid_to_filename(item["id"])) files = glob(f"{filepath}.*") - # for key in ("id", "image"): - # if key in item and (value := item[key]): - # filepath = os.path.join(MEDIA_ROOT, value) - # if os.path.exists(filepath): - # os.remove(filepath) for file in files: os.remove(file) -def prop_or_default(items:dict, prop:str, default): +def read_metadata(text:str) -> dict: + config = ConfigParser(interpolation=None) + config.read_string(f"[DEFAULT]\n{text}") + data = config._defaults # type: ignore[attr-defined] + for key in ("items",): + if key in data: + data[key] = wsv_to_list(data[key]) + return data + +def write_metadata(data:dict) -> str: + output = StringIO() + config = ConfigParser(interpolation=None) + for key in ("image", "datetime"): + if key in data: + del data[key] + for key in data: + if type(data[key]) == list: + data[key] = list_to_wsv(data[key]) + config["DEFAULT"] = data + config.write(output) + return "\n".join(output.getvalue().splitlines()[1:]) # remove section header + +def read_textual(filepath:str) -> str: + try: + with open(filepath, "r", encoding="utf-8") as f: + return f.read() + except UnicodeDecodeError: + with open(filepath, "r") as f: + return f.read() + +def write_textual(filepath:str, content:str): + with open(filepath, "w", encoding="utf-8") as f: + return f.write(content) + +def fetch_url_data(url:str): + response = requests.get(url, timeout=5) + soup = BeautifulSoup(response.text, "html.parser") + + description = None + desc_tag = soup.find("meta", attrs={"name": "description"}) or \ + soup.find("meta", attrs={"property": "og:description"}) + if desc_tag and "content" in desc_tag.attrs: # type: ignore[attr-defined] + description = desc_tag["content"] # type: ignore[index] + + image = None + img_tag = soup.find("meta", attrs={"property": "og:image"}) or \ + soup.find("meta", attrs={"name": "twitter:image"}) + if img_tag and "content" in img_tag.attrs: # type: ignore[attr-defined] + image = img_tag["content"] # type: ignore[index] + + return { + "title": soup_or_default(soup, "meta", {"property": "og:title"}, "content", (soup.title.string if soup.title else None)), + "description": description, + "image": image, + "link": soup_or_default(soup, "link", {"rel": "canonical"}, "href", url), + } + +def prop_or_default(items:Any, prop:str, default): return (items[prop] if (items and prop in items) else None) or default def soup_or_default(soup:BeautifulSoup, tag:str, attrs:dict, prop:str, default): return prop_or_default(soup.find(tag, attrs=attrs), prop, default) -def generate_iid(): +def generate_iid() -> str: return str(next(snowflake)) - # iid = next(snowflake) - # date = Snowflake.parse(iid, snowflake_epoch).datetime - # return f"{date.year}/{date.month}/{next(snowflake)}" def split_iid(iid:str): - iid = iid.split("/") - return ["/".join(iid[:-1]), iid[-1]] + toks = iid.split("/") + return ["/".join(toks[:-1]), toks[-1]] def strip_ext(filename:str): return os.path.splitext(filename)[0] +def list_to_wsv(data:list, sep="\n") -> str: + return sep.join(data) + +def wsv_to_list(data:str) -> list: + return data.strip().replace(" ", "\n").replace("\t", "\n").splitlines() + +def mkdirs(*paths:str): + for path in paths: + Path(path).mkdir(parents=True, exist_ok=True) + +mkdirs(ITEMS_ROOT, USERS_ROOT) + if __name__ == "__main__": - app.run(debug=True) \ No newline at end of file + if DEVELOPMENT: + app.run(port=HTTP_PORT, debug=True) + else: + import waitress + waitress.serve(app, port=HTTP_PORT, threads=HTTP_THREADS) diff --git a/package-lock.json b/package-lock.json new file mode 100644 index 0000000..4552f38 --- /dev/null +++ b/package-lock.json @@ -0,0 +1,18 @@ +{ + "name": "Pignio", + "lockfileVersion": 3, + "requires": true, + "packages": { + "": { + "dependencies": { + "uikit": "^3.23.11" + } + }, + "node_modules/uikit": { + "version": "3.23.11", + "resolved": "https://registry.npmjs.org/uikit/-/uikit-3.23.11.tgz", + "integrity": "sha512-srUFBf5DfUxVpodcygibMQt1vgQjR9wlhIQo4GeWVpugk5+mKLPASJITDoY8wcwXQIHm7koELiPJ+FgNbzLv0A==", + "license": "MIT" + } + } +} diff --git a/package.json b/package.json new file mode 100644 index 0000000..f32a865 --- /dev/null +++ b/package.json @@ -0,0 +1,5 @@ +{ + "dependencies": { + "uikit": "^3.23.11" + } +} diff --git a/requirements.txt b/requirements.txt index 8c66673..b6b82f9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,5 +2,7 @@ flask flask-bcrypt flask-login flask-wtf +beautifulsoup4 requests -snowflake-id \ No newline at end of file +snowflake-id +waitress \ No newline at end of file diff --git a/templates/add.html b/templates/add.html index 9fae03f..08289ab 100644 --- a/templates/add.html +++ b/templates/add.html @@ -19,35 +19,35 @@
- {% if not item.id %} -
-
- - +
+
+
+ +
- {% endif %} -
-
+
- +
- +
- +
- +
- + {% if title %}{{ title }} | {% endif %}{{ config.APP_ICON }} {{ config.APP_NAME }} + + + + {% if canonical %} + + {% endif %} -