From 4471ef647b34d759db6e23494d90a7cd6cb4dc9a Mon Sep 17 00:00:00 2001 From: octospacc Date: Sat, 12 Jul 2025 01:46:41 +0200 Subject: [PATCH] . --- app.py | 415 +++++++++++++++++++++++++++++------------ requirements.txt | 3 + templates/add.html | 76 ++++++-- templates/base.html | 34 +--- templates/card.html | 0 templates/item.html | 30 ++- templates/login.html | 16 ++ templates/remove.html | 10 + templates/results.html | 31 ++- templates/user.html | 11 ++ 10 files changed, 428 insertions(+), 198 deletions(-) delete mode 100644 templates/card.html create mode 100644 templates/login.html create mode 100644 templates/remove.html create mode 100644 templates/user.html diff --git a/app.py b/app.py index 5610849..902fe68 100644 --- a/app.py +++ b/app.py @@ -1,41 +1,78 @@ import os import re import requests +import configparser import urllib.parse +import xml.etree.ElementTree as ElementTree +from io import StringIO from bs4 import BeautifulSoup -from flask import Flask, request, redirect, render_template, send_from_directory, abort, url_for +from flask import Flask, request, redirect, render_template, send_from_directory, abort, url_for, flash +from flask_bcrypt import Bcrypt +from flask_login import LoginManager, UserMixin, current_user, login_user, logout_user, login_required +from flask_wtf import FlaskForm +from wtforms import StringField, PasswordField, SubmitField +from wtforms.validators import DataRequired +from glob import glob from pathlib import Path from datetime import datetime -from snowflake import SnowflakeGenerator +from snowflake import Snowflake, SnowflakeGenerator app = Flask(__name__) -snowflake = SnowflakeGenerator(1, epoch=int(datetime(2025, 1, 1, 0, 0, 0).timestamp() * 1000)) +app.config["SECRET_KEY"] = "your_secret_key" # TODO: fix this for prod +login_manager = LoginManager() +login_manager.login_view = "login" +login_manager.init_app(app) +bcrypt = Bcrypt(app) + +snowflake_epoch = int(datetime(2025, 1, 1, 0, 0, 0).timestamp() * 1000) +snowflake = SnowflakeGenerator(1, epoch=snowflake_epoch) DATA_ROOT = "data" ITEMS_ROOT = f"{DATA_ROOT}/items" +USERS_ROOT = f"{DATA_ROOT}/users" MEDIA_ROOT = f"{DATA_ROOT}/items" EXTENSIONS = { "images": ("jpg", "jpeg", "png", "gif", "webp", "avif"), "videos": ("mp4", "mov", "mpeg", "ogv", "webm", "mkv"), } +ITEMS_EXT = ".pignio" + +class User(UserMixin): + def __init__(self, username, filepath): + self.username = username + self.filepath = filepath + with open(filepath, "r") as f: + self.data = read_metadata(f.read()) + + def get_id(self): + return self.username + +class LoginForm(FlaskForm): + username = StringField("Username", validators=[DataRequired()]) + password = PasswordField("Password", validators=[DataRequired()]) + submit = SubmitField("Login") @app.route("/") def index(): return render_template("index.html", media=walk_items()) @app.route("/media/") -def serve_media(filename): +def serve_media(filename:str): return send_from_directory(MEDIA_ROOT, filename) -@app.route("/item/") -def view_item(filename): - # full_path = os.path.join(MEDIA_ROOT, filename) - # if not os.path.exists(full_path): - # abort(404) - item = load_item(filename) - if not item: +@app.route("/item/") +def view_item(iid:str): + if (item := load_item(iid)): + return render_template("item.html", item=item) + else: + abort(404) + +@app.route("/user/") +def view_user(username:str): + if (user := load_user(username)): + return render_template("user.html", user=user, collections=walk_collections(username), load_item=load_item) + else: abort(404) - return render_template("item.html", filename=filename, item=item) @app.route("/search") def search(): @@ -54,39 +91,57 @@ def search(): return render_template("search.html", media=results, query=query) @app.route("/add", methods=["GET", "POST"]) +@login_required def add_item(): item = {} if request.method == "GET": - iid = request.args.get("item") - if iid: - item = load_item(iid) + if (iid := request.args.get("item")): + if not (item := load_item(iid)): + abort(404) elif request.method == "POST": iid = request.form.get("id") or generate_iid() - # title = request.form.get("title") - # description = request.form.get("description") - # if (url := request.form.get("url")): - # download_item(url) - # else: - # with open(os.path.join(MEDIA_ROOT, f"{iid[1]}.item"), "w") as f: - # f.write(write_metadata({ - # "description": description - # })) - # return redirect(url_for("index")) - - filename = store_item(iid, { + store_item(iid, { "link": request.form.get("link"), "title": request.form.get("title"), "description": request.form.get("description"), - }, request.files['file']) + "image": request.form.get("image"), + }, request.files) - return redirect(url_for("view_item", filename=filename)) + return redirect(url_for("view_item", iid=iid)) return render_template("add.html", item=item) +@app.route("/remove", methods=["GET", "POST"]) +@login_required +def remove_item(): + if request.method == "GET": + if (iid := request.args.get("item")): + if (item := load_item(iid)): + return render_template("remove.html", item=item) + + elif request.method == "POST": + if (iid := request.form.get("id")): + if (item := load_item(iid)): + delete_item(item) + return redirect(url_for("index")) + + abort(404) + + # iid = request.args.get("item") + # item = load_item(iid) + # if not item: + # abort(404) + # if request.method == "GET": + # return render_template("remove.html", item=item) + # elif request.method == "POST": + # delete_item(item) + # return redirect(url_for("index")) + @app.route("/api/preview") +@login_required def preview(): return fetch_url_data(request.args.get("url")) @@ -94,123 +149,240 @@ def preview(): def error_404(e): return render_template("404.html"), 404 +@app.route("/login", methods=["GET", "POST"]) +def login(): + if current_user.is_authenticated: + return redirect(url_for("index")) + form = LoginForm() + if form.validate_on_submit(): + if (user := load_user(form.username.data)): + pass_equals = user.data["password"] == form.password.data + try: + hash_equals = bcrypt.check_password_hash(user.data["password"], form.password.data) + except ValueError as e: + hash_equals = False + if pass_equals or hash_equals: + if pass_equals: + user.data["password"] = bcrypt.generate_password_hash(user.data["password"]).decode("utf-8") + with open(user.filepath, "w") as f: + f.write(write_metadata(user.data)) + login_user(user) + # next_url = flask.request.args.get('next') + # if not url_has_allowed_host_and_scheme(next_url, request.host): return flask.abort(400) + # return redirect(next_url or url_for("index")) + return redirect(url_for("index")) + if request.method == "POST": + flash("Invalid username or password", "danger") + return render_template("login.html", form=form) + +@app.route("/logout") +@login_required +def logout(): + logout_user() + return redirect(url_for("index")) + +@login_manager.user_loader +def load_user(username:str): + filepath = os.path.join(USERS_ROOT, (username + ITEMS_EXT)) + if os.path.exists(filepath): + return User(username, filepath) + def walk_items(): - results = {} + results, iids = {}, {} for root, dirs, files in os.walk(MEDIA_ROOT): rel_path = os.path.relpath(root, MEDIA_ROOT).replace(os.sep, "/") if rel_path == ".": rel_path = "" - results[rel_path] = [] + results[rel_path], iids[rel_path] = [], [] + + # for file in files: + # if file.lower().endswith(tuple([f".{ext}" for ext in EXTENSIONS["images"]])): + # iid = strip_ext(os.path.join(rel_path, file).replace(os.sep, "/")) + # image = os.path.join(rel_path, file).replace(os.sep, "/") + # data = load_sider_metadata(image) or {} + # data["image"] = image + # data["id"] = iid + # results[rel_path].append(data) + # files.remove(file) + + # for file in files: + # if file.lower().endswith(ITEMS_EXT): + # iid = strip_ext(os.path.join(rel_path, file).replace(os.sep, "/")) + # with open(os.path.join(MEDIA_ROOT, rel_path, file), "r") as f: + # data = read_metadata(f.read()) + # data["id"] = iid + # results[rel_path].append(data) + # files.remove(file) + for file in files: - filename = os.path.join(rel_path, file).replace(os.sep, "/") - if file.lower().endswith(".item"): - with open(os.path.join(MEDIA_ROOT, rel_path, file), "r") as f: - data = read_metadata(f.read()) - data["id"] = filename - results[rel_path].append(data) - elif file.lower().endswith(tuple([f".{ext}" for ext in EXTENSIONS["images"]])): - # results[rel_path].append(os.path.join(rel_path, file).replace(os.sep, "/")) - image = os.path.join(rel_path, file).replace(os.sep, "/") - data = load_sider_metadata(image) or {} - data["image"] = image - data["id"] = filename - results[rel_path].append(data) - + #if file.lower().endswith(ITEMS_EXT) or file.lower().endswith(tuple([f".{ext}" for ext in EXTENSIONS["images"]])): + iid = strip_ext(os.path.join(rel_path, file).replace(os.sep, "/")) + iid = filename_to_iid(iid) + if iid not in iids[rel_path]: + iids[rel_path].append(iid) + + for iid in iids[rel_path]: + data = load_item(iid) + results[rel_path].append(data) + return results -def load_item(iid): - data = None - filepath = os.path.join(MEDIA_ROOT, iid) +def walk_collections(username:str=None): + results = {"": []} - if os.path.exists(filepath): - if iid.lower().endswith(".item"): - with open(filepath, "r") as f: - data = read_metadata(f.read()) - else: - data = load_sider_metadata(iid) or {} - data["image"] = iid + filepath = USERS_ROOT + + if username: + filepath = os.path.join(filepath, username) + results[""] = read_metadata(read_textual(filepath + ITEMS_EXT))["items"].strip().replace(" ", "\n").splitlines() + + # for root, dirs, files in os.walk(filepath): + # rel_path = os.path.relpath(root, filepath).replace(os.sep, "/") + # if rel_path == ".": + # rel_path = "" + # else: + # results[rel_path] = [] + + # for file in files: + # print(file, rel_path) + # results[rel_path] = read_metadata(read_textual(os.path.join(filepath, rel_path, file)))["items"].strip().replace(" ", "\n").splitlines() + + return results + +def iid_to_filename(iid:str): + if len(iid.split("/")) == 1: + date = Snowflake.parse(int(iid), snowflake_epoch).datetime + iid = f"{date.year}/{date.month}/{iid}" + return iid + +def filename_to_iid(iid:str): + toks = iid.split("/") + if len(toks) == 3 and "".join(toks).isnumeric(): + iid = toks[2] + return iid + +def load_item(iid:str): + iid = filename_to_iid(iid) + filename = iid_to_filename(iid) + filepath = os.path.join(MEDIA_ROOT, filename) + files = glob(f"{filepath}.*") + + if len(files): + data = {"id": iid} + + for file in files: + if file.lower().endswith(ITEMS_EXT): + # with open(file, "r", encoding="utf-8") as f: + # data = data | read_metadata(f.read()) + data = data | read_metadata(read_textual(file)) + + elif file.lower().endswith(tuple([f".{ext}" for ext in EXTENSIONS["images"]])): + data["image"] = file.replace(os.sep, "/").removeprefix(f"{MEDIA_ROOT}/") - if data: - data["id"] = iid return data -def load_sider_metadata(filename): - filepath = os.path.join(MEDIA_ROOT, f"{strip_ext(filename)}.meta") +def load_sider_metadata(filename:str): + filepath = os.path.join(MEDIA_ROOT, f"{strip_ext(filename)}{ITEMS_EXT}") if os.path.exists(filepath): with open(filepath, "r") as f: return read_metadata(f.read()) -def read_metadata(text:str): - data = {} - for elem in BeautifulSoup(re.sub(r'<(\w+)>(.*?)', r'<\1>\2', text), "html.parser").find_all(): - data[elem.name] = elem.text.strip() - return data +# def read_metadata(text:str): +# data = {} +# xml = "" + re.sub(r'<(\w+)>(.*?)', r'<\1>\2', text) + "" +# for elem in ElementTree.fromstring(xml, parser=ElementTree.XMLParser(encoding="utf-8")).findall('*'): +# data[elem.tag] = elem.text.strip() +# return data -def write_metadata(data:dict): - text = "" - for key in data: - if (value := data[key]): - text += f'<{key}>{value}\n' - return text +def read_metadata(text:str) -> dict: + config = configparser.ConfigParser(allow_unnamed_section=True, interpolation=None) + config.read_string(text) + return config._sections[configparser.UNNAMED_SECTION] # tuple(config._sections.values())[0] + +# def write_metadata(data:dict): +# text = "" +# for key in data: +# if key not in ("image",) and (value := data[key]): +# text += f'<{key}>{value}\n' +# return text + +def write_metadata(data:dict) -> str: + output = StringIO() + config = configparser.ConfigParser(allow_unnamed_section=True, interpolation=None) + del data["image"] + config[configparser.UNNAMED_SECTION] = data + config.write(output) + return "\n".join(output.getvalue().splitlines()[1:]) # remove section header + +def read_textual(filepath:str) -> str: + try: + with open(filepath, "r", encoding="utf-8") as f: + return f.read() + except UnicodeDecodeError: + with open(filepath, "r") as f: + return f.read() + +def write_textual(filepath:str, content:bytes): + with open(filepath, "w", encoding="utf-8") as f: + return f.write(content) def fetch_url_data(url:str): - try: - response = requests.get(url, timeout=5) - soup = BeautifulSoup(response.text, "html.parser") + response = requests.get(url, timeout=5) + soup = BeautifulSoup(response.text, "html.parser") - description = None - desc_tag = soup.find("meta", attrs={"name": "description"}) or \ - soup.find("meta", attrs={"property": "og:description"}) - if desc_tag and "content" in desc_tag.attrs: - description = desc_tag["content"] + description = None + desc_tag = soup.find("meta", attrs={"name": "description"}) or \ + soup.find("meta", attrs={"property": "og:description"}) + if desc_tag and "content" in desc_tag.attrs: + description = desc_tag["content"] - image = None - img_tag = soup.find("meta", attrs={"property": "og:image"}) or \ - soup.find("meta", attrs={"name": "twitter:image"}) - if img_tag and "content" in img_tag.attrs: - image = img_tag["content"] + image = None + img_tag = soup.find("meta", attrs={"property": "og:image"}) or \ + soup.find("meta", attrs={"name": "twitter:image"}) + if img_tag and "content" in img_tag.attrs: + image = img_tag["content"] - return { - "title": soup_or_default(soup, "meta", {"property": "og:title"}, "content", (soup.title.string if soup.title else None)), - "description": description, - "image": image, - "link": soup_or_default(soup, "link", {"rel": "canonical"}, "href", url), - } + return { + "title": soup_or_default(soup, "meta", {"property": "og:title"}, "content", (soup.title.string if soup.title else None)), + "description": description, + "image": image, + "link": soup_or_default(soup, "link", {"rel": "canonical"}, "href", url), + } - except Exception as e: - # print("Metadata fetch failed:", e) - return {} - -def download_item(url:str): - data = fetch_url_data(url) - url = urllib.parse.urlparse(data["link"]) - slug = (url.path or "index").split("/")[-1] - domain = url.netloc - Path(os.path.join(MEDIA_ROOT, domain)).mkdir(parents=True, exist_ok=True) - path = os.path.join(MEDIA_ROOT, domain, slug) - with open(f"{path}.meta", "w") as f: - f.write(write_metadata(data)) - -def store_item(iid, data, file): - item = load_item(iid) +def store_item(iid, data, files): + iid = iid_to_filename(iid) iid = split_iid(strip_ext(iid)) filepath = os.path.join(MEDIA_ROOT, *iid) Path(os.path.join(MEDIA_ROOT, iid[0])).mkdir(parents=True, exist_ok=True) - if file: - with open(f"{filepath}.meta", "w") as f: - f.write(write_metadata(data)) - # with open(f"{filepath}.{ext}", "wb") as f: - # f.write(write_metadata(data)) - ext = file.content_type.split("/")[1] - file.save(f'{filepath}.{ext}') - return "/".join(iid) + f".{ext}" - else: - with open(f"{filepath}.item", "w") as f: - f.write(write_metadata(data)) - return "/".join(iid) + ".item" - # return "/".join(iid) + image = False + if len(files): + file = files["file"] + if file.seek(0, os.SEEK_END): + file.seek(0, os.SEEK_SET) + ext = file.content_type.split("/")[1] + file.save(f"{filepath}.{ext}") + image = True + if not image and data["image"]: + response = requests.get(data["image"], timeout=5) + ext = response.headers["Content-Type"].split("/")[1] + with open(f"{filepath}.{ext}", "wb") as f: + f.write(response.content) + # with open(filepath + ITEMS_EXT, "w", encoding="utf-8") as f: + # f.write(write_metadata(data)) + write_textual(filepath + ITEMS_EXT, write_metadata(data)) + +def delete_item(item:dict): + filepath = os.path.join(MEDIA_ROOT, iid_to_filename(item["id"])) + files = glob(f"{filepath}.*") + # for key in ("id", "image"): + # if key in item and (value := item[key]): + # filepath = os.path.join(MEDIA_ROOT, value) + # if os.path.exists(filepath): + # os.remove(filepath) + for file in files: + os.remove(file) def prop_or_default(items:dict, prop:str, default): return (items[prop] if (items and prop in items) else None) or default @@ -219,9 +391,10 @@ def soup_or_default(soup:BeautifulSoup, tag:str, attrs:dict, prop:str, default): return prop_or_default(soup.find(tag, attrs=attrs), prop, default) def generate_iid(): - date = datetime.now() - return f"{date.year}/{date.month}/{next(snowflake)}" - # return [f"{date.year}/{date.month}", next(snowflake)] + return str(next(snowflake)) + # iid = next(snowflake) + # date = Snowflake.parse(iid, snowflake_epoch).datetime + # return f"{date.year}/{date.month}/{next(snowflake)}" def split_iid(iid:str): iid = iid.split("/") diff --git a/requirements.txt b/requirements.txt index 13d8aee..8c66673 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,6 @@ flask +flask-bcrypt +flask-login +flask-wtf requests snowflake-id \ No newline at end of file diff --git a/templates/add.html b/templates/add.html index 5d9ce3a..9fae03f 100644 --- a/templates/add.html +++ b/templates/add.html @@ -1,41 +1,77 @@ {% extends "base.html" %} {% block title %}🆕 Add item{% endblock %} {% block content %} +
- -
-
- - -
+ + +
- + +
+ {% if not item.id %} +
+
+ + +
+
+ {% endif %} +
+ +
+
+
- +
- + +
+
+
diff --git a/templates/base.html b/templates/base.html index 964e83b..b439bc2 100644 --- a/templates/base.html +++ b/templates/base.html @@ -5,32 +5,9 @@ {% block title %}My Media Gallery{% endblock %} - - - - - - - - - - -
+
+ {% with messages = get_flashed_messages(with_categories=true) %} + {% if messages %} + {% for category, message in messages %} +

{{ message }}

+ {% endfor %} + {% endif %} + {% endwith %} +
{% block content %}{% endblock %}
diff --git a/templates/card.html b/templates/card.html deleted file mode 100644 index e69de29..0000000 diff --git a/templates/item.html b/templates/item.html index 0dc9171..1e4f7e5 100644 --- a/templates/item.html +++ b/templates/item.html @@ -1,30 +1,28 @@ {% extends "base.html" %} -{% block title %}📌 Viewing {{ filename }}{% endblock %} +{% block title %}📌 Viewing {% include "item-title.html" %}{% endblock %} +{% block canonical %}{{ url_for('view_item', iid=item.id) }}{% endblock %} {% block content %} -{% raw %} - - -{% endraw %}
{% if item.image %} - + {% endif %}

{% include "item-title.html" %}

+ {{ item.created }}

{{ item.description }}

- Edit - Remove +
+ Edit + Remove + +
{% endblock %} \ No newline at end of file diff --git a/templates/login.html b/templates/login.html new file mode 100644 index 0000000..3b0d50a --- /dev/null +++ b/templates/login.html @@ -0,0 +1,16 @@ +{% extends "base.html" %} +{% block title %}Login{% endblock %} +{% block content %} +
+ {{ form.hidden_tag() }} +
+ {{ form.username(class_="uk-input", placeholder="Username") }} +
+
+ {{ form.password(class_="uk-input", placeholder="Password") }} +
+
+ {{ form.submit(class_="uk-input uk-button uk-button-default") }} +
+
+{% endblock %} \ No newline at end of file diff --git a/templates/remove.html b/templates/remove.html new file mode 100644 index 0000000..1ae4b88 --- /dev/null +++ b/templates/remove.html @@ -0,0 +1,10 @@ +{% extends "base.html" %} +{% block title %}🗑 Remove item{% endblock %} +{% block content %} +
+ +
+ +
+
+{% endblock %} \ No newline at end of file diff --git a/templates/results.html b/templates/results.html index aa940b3..0783664 100644 --- a/templates/results.html +++ b/templates/results.html @@ -1,21 +1,18 @@ \ No newline at end of file diff --git a/templates/user.html b/templates/user.html new file mode 100644 index 0000000..cf09063 --- /dev/null +++ b/templates/user.html @@ -0,0 +1,11 @@ +{% extends "base.html" %} +{% block title %}User {{ user.username }}{% endblock %} +{% block content %} + {{ user.username }} + + {% for folder, collection in collections.items() %} + {% for item in collection %} + {{ item }} + {% endfor %} + {% endfor %} +{% endblock %} \ No newline at end of file