diff --git a/.flake8 b/.flake8 index 21fd7bd..6efbecd 100644 --- a/.flake8 +++ b/.flake8 @@ -1,4 +1,4 @@ [flake8] exclude=build,tests,tmp,venv,toot/tui/scroll.py -ignore=E128 +ignore=E128,W503 max-line-length=120 diff --git a/requirements.txt b/requirements.txt index 67ddf98..3616ac3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,3 +2,4 @@ requests>=2.13,<3.0 beautifulsoup4>=4.5.0,<5.0 wcwidth>=0.1.7 urwid>=2.0.0,<3.0 + diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py new file mode 100644 index 0000000..b4aaa1e --- /dev/null +++ b/tests/integration/conftest.py @@ -0,0 +1,122 @@ +""" +This module contains integration tests meant to run against a test Mastodon instance. + +You can set up a test instance locally by following this guide: +https://docs.joinmastodon.org/dev/setup/ + +To enable integration tests, export the following environment variables to match +your test server and database: + +``` +export TOOT_TEST_HOSTNAME="localhost:3000" +export TOOT_TEST_DATABASE_DSN="dbname=mastodon_development" +``` +""" + +import re +import os +import psycopg2 +import pytest +import uuid + +from pathlib import Path +from toot import api, App, User +from toot.console import run_command + +# Host name of a test instance to run integration tests against +# DO NOT USE PUBLIC INSTANCES!!! +BASE_URL = os.getenv("TOOT_TEST_BASE_URL") + +# Mastodon database name, used to confirm user registration without having to click the link +DATABASE_DSN = os.getenv("TOOT_TEST_DATABASE_DSN") + +# Toot logo used for testing image upload +TRUMPET = str(Path(__file__).parent.parent.parent / "trumpet.png") + +ASSETS_DIR = str(Path(__file__).parent.parent / "assets") + + +if not BASE_URL or not DATABASE_DSN: + pytest.skip("Skipping integration tests", allow_module_level=True) + +# ------------------------------------------------------------------------------ +# Fixtures +# ------------------------------------------------------------------------------ + + +def create_app(): + instance = api.get_instance(BASE_URL) + response = api.create_app(BASE_URL) + return App(instance["uri"], BASE_URL, response["client_id"], response["client_secret"]) + + +def register_account(app: App): + username = str(uuid.uuid4())[-10:] + email = f"{username}@example.com" + + response = api.register_account(app, username, email, "password", "en") + confirm_user(email) + return User(app.instance, username, response["access_token"]) + + +def confirm_user(email): + conn = psycopg2.connect(DATABASE_DSN) + cursor = conn.cursor() + cursor.execute("UPDATE users SET confirmed_at = now() WHERE email = %s;", (email,)) + conn.commit() + + +@pytest.fixture(scope="session") +def app(): + return create_app() + + +@pytest.fixture(scope="session") +def user(app): + return register_account(app) + + +@pytest.fixture(scope="session") +def friend(app): + return register_account(app) + + +@pytest.fixture +def run(app, user, capsys): + def _run(command, *params, as_user=None): + run_command(app, as_user or user, command, params or []) + out, err = capsys.readouterr() + assert err == "" + return strip_ansi(out) + return _run + + +@pytest.fixture +def run_anon(capsys): + def _run(command, *params): + run_command(None, None, command, params or []) + out, err = capsys.readouterr() + assert err == "" + return strip_ansi(out) + return _run + + +# ------------------------------------------------------------------------------ +# Utils +# ------------------------------------------------------------------------------ + +strip_ansi_pattern = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])") + + +def strip_ansi(string): + return strip_ansi_pattern.sub("", string).strip() + + +def posted_status_id(out): + pattern = re.compile(r"Toot posted: http://([^/]+)/([^/]+)/(.+)") + match = re.search(pattern, out) + assert match + + _, _, status_id = match.groups() + + return status_id diff --git a/tests/integration/test_accounts.py b/tests/integration/test_accounts.py new file mode 100644 index 0000000..bf01f1b --- /dev/null +++ b/tests/integration/test_accounts.py @@ -0,0 +1,19 @@ + + +def test_whoami(user, run): + out = run("whoami") + # TODO: test other fields once updating account is supported + assert f"@{user.username}" in out + + +def test_whois(app, friend, run): + variants = [ + friend.username, + f"@{friend.username}", + f"{friend.username}@{app.instance}", + f"@{friend.username}@{app.instance}", + ] + + for username in variants: + out = run("whois", username) + assert f"@{friend.username}" in out diff --git a/tests/integration/test_auth.py b/tests/integration/test_auth.py new file mode 100644 index 0000000..afe5c39 --- /dev/null +++ b/tests/integration/test_auth.py @@ -0,0 +1,125 @@ +import pytest + +from tests.integration.conftest import TRUMPET +from toot import api +from toot.exceptions import ConsoleError +from toot.utils import get_text + + +def test_update_account_no_options(run): + with pytest.raises(ConsoleError) as exc: + run("update_account") + assert str(exc.value) == "Please specify at least one option to update the account" + + +def test_update_account_display_name(run, app, user): + out = run("update_account", "--display-name", "elwood") + assert out == "✓ Account updated" + + account = api.verify_credentials(app, user) + assert account["display_name"] == "elwood" + + +def test_update_account_note(run, app, user): + note = ("It's 106 miles to Chicago, we got a full tank of gas, half a pack " + "of cigarettes, it's dark... and we're wearing sunglasses.") + + out = run("update_account", "--note", note) + assert out == "✓ Account updated" + + account = api.verify_credentials(app, user) + assert get_text(account["note"]) == note + + +def test_update_account_language(run, app, user): + out = run("update_account", "--language", "hr") + assert out == "✓ Account updated" + + account = api.verify_credentials(app, user) + assert account["source"]["language"] == "hr" + + +def test_update_account_privacy(run, app, user): + out = run("update_account", "--privacy", "private") + assert out == "✓ Account updated" + + account = api.verify_credentials(app, user) + assert account["source"]["privacy"] == "private" + + +def test_update_account_avatar(run, app, user): + account = api.verify_credentials(app, user) + old_value = account["avatar"] + + out = run("update_account", "--avatar", TRUMPET) + assert out == "✓ Account updated" + + account = api.verify_credentials(app, user) + assert account["avatar"] != old_value + + +def test_update_account_header(run, app, user): + account = api.verify_credentials(app, user) + old_value = account["header"] + + out = run("update_account", "--header", TRUMPET) + assert out == "✓ Account updated" + + account = api.verify_credentials(app, user) + assert account["header"] != old_value + + +def test_update_account_locked(run, app, user): + out = run("update_account", "--locked") + assert out == "✓ Account updated" + + account = api.verify_credentials(app, user) + assert account["locked"] is True + + out = run("update_account", "--no-locked") + assert out == "✓ Account updated" + + account = api.verify_credentials(app, user) + assert account["locked"] is False + + +def test_update_account_bot(run, app, user): + out = run("update_account", "--bot") + assert out == "✓ Account updated" + + account = api.verify_credentials(app, user) + assert account["bot"] is True + + out = run("update_account", "--no-bot") + assert out == "✓ Account updated" + + account = api.verify_credentials(app, user) + assert account["bot"] is False + + +def test_update_account_discoverable(run, app, user): + out = run("update_account", "--discoverable") + assert out == "✓ Account updated" + + account = api.verify_credentials(app, user) + assert account["discoverable"] is True + + out = run("update_account", "--no-discoverable") + assert out == "✓ Account updated" + + account = api.verify_credentials(app, user) + assert account["discoverable"] is False + + +def test_update_account_sensitive(run, app, user): + out = run("update_account", "--sensitive") + assert out == "✓ Account updated" + + account = api.verify_credentials(app, user) + assert account["source"]["sensitive"] is True + + out = run("update_account", "--no-sensitive") + assert out == "✓ Account updated" + + account = api.verify_credentials(app, user) + assert account["source"]["sensitive"] is False diff --git a/tests/integration/test_lists.py b/tests/integration/test_lists.py new file mode 100644 index 0000000..c9fd392 --- /dev/null +++ b/tests/integration/test_lists.py @@ -0,0 +1,58 @@ + +from toot import api +from tests.integration.conftest import register_account + + +def test_lists_empty(run): + out = run("lists") + assert out == "You have no lists defined." + + +def test_list_create_delete(run): + out = run("list_create", "banana") + assert out == '✓ List "banana" created.' + + out = run("lists") + assert "banana" in out + + out = run("list_create", "mango") + assert out == '✓ List "mango" created.' + + out = run("lists") + assert "banana" in out + assert "mango" in out + + out = run("list_delete", "banana") + assert out == '✓ List "banana" deleted.' + + out = run("lists") + assert "banana" not in out + assert "mango" in out + + out = run("list_delete", "mango") + assert out == '✓ List "mango" deleted.' + + out = run("lists") + assert out == "You have no lists defined." + + +def test_list_add_remove(run, app): + acc = register_account(app) + run("list_create", "foo") + + out = run("list_add", "foo", acc.username) + assert out == f"You must follow @{acc.username} before adding this account to a list." + + run("follow", acc.username) + + out = run("list_add", "foo", acc.username) + assert out == f'✓ Added account "{acc.username}"' + + out = run("list_accounts", "foo") + assert acc.username in out + + out = run("list_remove", "foo", acc.username) + assert out == f'✓ Removed account "{acc.username}"' + + out = run("list_accounts", "foo") + assert out == "This list has no accounts." diff --git a/tests/integration/test_post.py b/tests/integration/test_post.py new file mode 100644 index 0000000..7ad7eb9 --- /dev/null +++ b/tests/integration/test_post.py @@ -0,0 +1,288 @@ +import re +import uuid + +from datetime import datetime, timedelta, timezone +from os import path +from tests.integration.conftest import ASSETS_DIR, posted_status_id +from toot import CLIENT_NAME, CLIENT_WEBSITE, api +from toot.utils import get_text +from unittest import mock + + +def test_post(app, user, run): + text = "i wish i was a #lumberjack" + out = run("post", text) + status_id = posted_status_id(out) + + status = api.fetch_status(app, user, status_id) + assert text == get_text(status["content"]) + assert status["visibility"] == "public" + assert status["sensitive"] is False + assert status["spoiler_text"] == "" + assert status["poll"] is None + + # Pleroma doesn't return the application + if status["application"]: + assert status["application"]["name"] == CLIENT_NAME + assert status["application"]["website"] == CLIENT_WEBSITE + + +def test_post_visibility(app, user, run): + for visibility in ["public", "unlisted", "private", "direct"]: + out = run("post", "foo", "--visibility", visibility) + status_id = posted_status_id(out) + status = api.fetch_status(app, user, status_id) + assert status["visibility"] == visibility + + +def test_post_scheduled_at(app, user, run): + text = str(uuid.uuid4()) + scheduled_at = datetime.now(timezone.utc).replace(microsecond=0) + timedelta(minutes=10) + + out = run("post", text, "--scheduled-at", scheduled_at.isoformat()) + assert "Toot scheduled for" in out + + statuses = api.scheduled_statuses(app, user) + [status] = [s for s in statuses if s["params"]["text"] == text] + assert datetime.strptime(status["scheduled_at"], "%Y-%m-%dT%H:%M:%S.%f%z") == scheduled_at + + +def test_post_scheduled_in(app, user, run): + text = str(uuid.uuid4()) + + variants = [ + ("1 day", timedelta(days=1)), + ("1 day 6 hours", timedelta(days=1, hours=6)), + ("1 day 6 hours 13 minutes", timedelta(days=1, hours=6, minutes=13)), + ("1 day 6 hours 13 minutes 51 second", timedelta(days=1, hours=6, minutes=13, seconds=51)), + ("2d", timedelta(days=2)), + ("2d6h", timedelta(days=2, hours=6)), + ("2d6h13m", timedelta(days=2, hours=6, minutes=13)), + ("2d6h13m51s", timedelta(days=2, hours=6, minutes=13, seconds=51)), + ] + + datetimes = [] + for scheduled_in, delta in variants: + out = run("post", text, "--scheduled-in", scheduled_in) + dttm = datetime.utcnow() + delta + assert out.startswith(f"Toot scheduled for: {str(dttm)[:16]}") + datetimes.append(dttm) + + scheduled = api.scheduled_statuses(app, user) + scheduled = [s for s in scheduled if s["params"]["text"] == text] + scheduled = sorted(scheduled, key=lambda s: s["scheduled_at"]) + assert len(scheduled) == 8 + + for expected, status in zip(datetimes, scheduled): + actual = datetime.strptime(status["scheduled_at"], "%Y-%m-%dT%H:%M:%S.%fZ") + delta = expected - actual + assert delta.total_seconds() < 5 + + +def test_post_poll(app, user, run): + text = str(uuid.uuid4()) + + out = run( + "post", text, + "--poll-option", "foo", + "--poll-option", "bar", + "--poll-option", "baz", + "--poll-option", "qux", + ) + + status_id = posted_status_id(out) + + status = api.fetch_status(app, user, status_id) + assert status["poll"]["expired"] is False + assert status["poll"]["multiple"] is False + assert status["poll"]["options"] == [ + {"title": "foo", "votes_count": 0}, + {"title": "bar", "votes_count": 0}, + {"title": "baz", "votes_count": 0}, + {"title": "qux", "votes_count": 0} + ] + + # Test expires_at is 24h by default + actual = datetime.strptime(status["poll"]["expires_at"], "%Y-%m-%dT%H:%M:%S.%f%z") + expected = datetime.now(timezone.utc) + timedelta(days=1) + delta = actual - expected + assert delta.total_seconds() < 5 + + +def test_post_poll_multiple(app, user, run): + text = str(uuid.uuid4()) + + out = run( + "post", text, + "--poll-option", "foo", + "--poll-option", "bar", + "--poll-multiple" + ) + + status_id = posted_status_id(out) + + status = api.fetch_status(app, user, status_id) + assert status["poll"]["multiple"] is True + + +def test_post_poll_expires_in(app, user, run): + text = str(uuid.uuid4()) + + out = run( + "post", text, + "--poll-option", "foo", + "--poll-option", "bar", + "--poll-expires-in", "8h", + ) + + status_id = posted_status_id(out) + + status = api.fetch_status(app, user, status_id) + actual = datetime.strptime(status["poll"]["expires_at"], "%Y-%m-%dT%H:%M:%S.%f%z") + expected = datetime.now(timezone.utc) + timedelta(hours=8) + delta = actual - expected + assert delta.total_seconds() < 5 + + +def test_post_poll_hide_totals(app, user, run): + text = str(uuid.uuid4()) + + out = run( + "post", text, + "--poll-option", "foo", + "--poll-option", "bar", + "--poll-hide-totals" + ) + + status_id = posted_status_id(out) + + status = api.fetch_status(app, user, status_id) + + # votes_count is None when totals are hidden + assert status["poll"]["options"] == [ + {"title": "foo", "votes_count": None}, + {"title": "bar", "votes_count": None}, + ] + + +def test_post_language(app, user, run): + out = run("post", "test", "--language", "hr") + status_id = posted_status_id(out) + status = api.fetch_status(app, user, status_id) + assert status["language"] == "hr" + + out = run("post", "test", "--language", "zh") + status_id = posted_status_id(out) + status = api.fetch_status(app, user, status_id) + assert status["language"] == "zh" + + +def test_media_thumbnail(app, user, run): + video_path = path.join(ASSETS_DIR, "small.webm") + thumbnail_path = path.join(ASSETS_DIR, "test1.png") + + out = run( + "post", + "--media", video_path, + "--thumbnail", thumbnail_path, + "--description", "foo", + "some text" + ) + + status_id = posted_status_id(out) + status = api.fetch_status(app, user, status_id) + [media] = status["media_attachments"] + + assert media["description"] == "foo" + assert media["type"] == "video" + assert media["url"].endswith(".mp4") + assert media["preview_url"].endswith(".png") + + # Video properties + assert int(media["meta"]["original"]["duration"]) == 5 + assert media["meta"]["original"]["height"] == 320 + assert media["meta"]["original"]["width"] == 560 + + # Thumbnail properties + assert media["meta"]["small"]["height"] == 50 + assert media["meta"]["small"]["width"] == 50 + + +def test_media_attachments(app, user, run): + path1 = path.join(ASSETS_DIR, "test1.png") + path2 = path.join(ASSETS_DIR, "test2.png") + path3 = path.join(ASSETS_DIR, "test3.png") + path4 = path.join(ASSETS_DIR, "test4.png") + + out = run( + "post", + "--media", path1, + "--media", path2, + "--media", path3, + "--media", path4, + "--description", "Test 1", + "--description", "Test 2", + "--description", "Test 3", + "--description", "Test 4", + "some text" + ) + + status_id = posted_status_id(out) + status = api.fetch_status(app, user, status_id) + + [a1, a2, a3, a4] = status["media_attachments"] + + # Pleroma doesn't send metadata + if "meta" in a1: + assert a1["meta"]["original"]["size"] == "50x50" + assert a2["meta"]["original"]["size"] == "50x60" + assert a3["meta"]["original"]["size"] == "50x70" + assert a4["meta"]["original"]["size"] == "50x80" + + assert a1["description"] == "Test 1" + assert a2["description"] == "Test 2" + assert a3["description"] == "Test 3" + assert a4["description"] == "Test 4" + + +@mock.patch("toot.utils.multiline_input") +@mock.patch("sys.stdin.read") +def test_media_attachment_without_text(mock_read, mock_ml, app, user, run): + # No status from stdin or readline + mock_read.return_value = "" + mock_ml.return_value = "" + + media_path = path.join(ASSETS_DIR, "test1.png") + + out = run("post", "--media", media_path) + status_id = posted_status_id(out) + + status = api.fetch_status(app, user, status_id) + assert status["content"] == "" + + [attachment] = status["media_attachments"] + assert not attachment["description"] + + # Pleroma doesn't send metadata + if "meta" in attachment: + assert attachment["meta"]["original"]["size"] == "50x50" + + +def test_reply_thread(app, user, friend, run): + status = api.post_status(app, friend, "This is the status") + + out = run("post", "--reply-to", status["id"], "This is the reply") + status_id = posted_status_id(out) + reply = api.fetch_status(app, user, status_id) + + assert reply["in_reply_to_id"] == status["id"] + + out = run("thread", status["id"]) + [s1, s2] = [s.strip() for s in re.split(r"─+", out) if s.strip()] + + assert "This is the status" in s1 + assert "This is the reply" in s2 + assert friend.username in s1 + assert user.username in s2 + assert status["id"] in s1 + assert reply["id"] in s2 diff --git a/tests/integration/test_read.py b/tests/integration/test_read.py new file mode 100644 index 0000000..13f05d1 --- /dev/null +++ b/tests/integration/test_read.py @@ -0,0 +1,83 @@ +import pytest + +from tests.integration.conftest import BASE_URL +from toot import api +from toot.exceptions import ConsoleError + + +def test_instance(app, run): + out = run("instance", "--disable-https") + assert "Mastodon" in out + assert app.instance in out + assert "running Mastodon" in out + + +def test_instance_anon(app, run_anon): + out = run_anon("instance", BASE_URL) + assert "Mastodon" in out + assert app.instance in out + assert "running Mastodon" in out + + # Need to specify the instance name when running anon + with pytest.raises(ConsoleError) as exc: + run_anon("instance") + assert str(exc.value) == "Please specify an instance." + + +def test_whoami(user, run): + out = run("whoami") + # TODO: test other fields once updating account is supported + assert f"@{user.username}" in out + + +def test_whois(app, friend, run): + variants = [ + friend.username, + f"@{friend.username}", + f"{friend.username}@{app.instance}", + f"@{friend.username}@{app.instance}", + ] + + for username in variants: + out = run("whois", username) + assert f"@{friend.username}" in out + + +def test_search_account(friend, run): + out = run("search", friend.username) + assert out == f"Accounts:\n* @{friend.username}" + + +def test_search_hashtag(app, user, run): + api.post_status(app, user, "#hashtag_x") + api.post_status(app, user, "#hashtag_y") + api.post_status(app, user, "#hashtag_z") + + out = run("search", "#hashtag") + assert out == "Hashtags:\n#hashtag_x, #hashtag_y, #hashtag_z" + + +def test_tags(run): + out = run("tags_followed") + assert out == "You're not following any hashtags." + + out = run("tags_follow", "foo") + assert out == "✓ You are now following #foo" + + out = run("tags_followed") + assert out == f"* #foo\t{BASE_URL}/tags/foo" + + out = run("tags_follow", "bar") + assert out == "✓ You are now following #bar" + + out = run("tags_followed") + assert out == "\n".join([ + f"* #bar\t{BASE_URL}/tags/bar", + f"* #foo\t{BASE_URL}/tags/foo", + ]) + + out = run("tags_unfollow", "foo") + assert out == "✓ You are no longer following #foo" + + out = run("tags_followed") + assert out == f"* #bar\t{BASE_URL}/tags/bar" diff --git a/tests/integration/test_status.py b/tests/integration/test_status.py new file mode 100644 index 0000000..8274174 --- /dev/null +++ b/tests/integration/test_status.py @@ -0,0 +1,89 @@ +import time +import pytest + +from toot import api +from toot.exceptions import NotFoundError + + +def test_delete_status(app, user, run): + status = api.post_status(app, user, "foo") + + out = run("delete", status["id"]) + assert out == "✓ Status deleted" + + with pytest.raises(NotFoundError): + api.fetch_status(app, user, status["id"]) + + +def test_favourite(app, user, run): + status = api.post_status(app, user, "foo") + assert not status["favourited"] + + out = run("favourite", status["id"]) + assert out == "✓ Status favourited" + + status = api.fetch_status(app, user, status["id"]) + assert status["favourited"] + + out = run("unfavourite", status["id"]) + assert out == "✓ Status unfavourited" + + # A short delay is required before the server returns new data + time.sleep(0.1) + + status = api.fetch_status(app, user, status["id"]) + assert not status["favourited"] + + +def test_reblog(app, user, run): + status = api.post_status(app, user, "foo") + assert not status["reblogged"] + + out = run("reblog", status["id"]) + assert out == "✓ Status reblogged" + + status = api.fetch_status(app, user, status["id"]) + assert status["reblogged"] + + out = run("reblogged_by", status["id"]) + assert out == f"@{user.username}" + + out = run("unreblog", status["id"]) + assert out == "✓ Status unreblogged" + + status = api.fetch_status(app, user, status["id"]) + assert not status["reblogged"] + + +def test_pin(app, user, run): + status = api.post_status(app, user, "foo") + assert not status["pinned"] + + out = run("pin", status["id"]) + assert out == "✓ Status pinned" + + status = api.fetch_status(app, user, status["id"]) + assert status["pinned"] + + out = run("unpin", status["id"]) + assert out == "✓ Status unpinned" + + status = api.fetch_status(app, user, status["id"]) + assert not status["pinned"] + + +def test_bookmark(app, user, run): + status = api.post_status(app, user, "foo") + assert not status["bookmarked"] + + out = run("bookmark", status["id"]) + assert out == "✓ Status bookmarked" + + status = api.fetch_status(app, user, status["id"]) + assert status["bookmarked"] + + out = run("unbookmark", status["id"]) + assert out == "✓ Status unbookmarked" + + status = api.fetch_status(app, user, status["id"]) + assert not status["bookmarked"] diff --git a/tests/test_integration.py b/tests/test_integration.py deleted file mode 100644 index aa7b89a..0000000 --- a/tests/test_integration.py +++ /dev/null @@ -1,758 +0,0 @@ -""" -This module contains integration tests meant to run against a test Mastodon instance. - -You can set up a test instance locally by following this guide: -https://docs.joinmastodon.org/dev/setup/ - -To enable integration tests, export the following environment variables to match -your test server and database: - -``` -export TOOT_TEST_HOSTNAME="localhost:3000" -export TOOT_TEST_DATABASE_DSN="dbname=mastodon_development" -``` -""" - -import os -import psycopg2 -import pytest -import re -import time -import uuid - -from datetime import datetime, timedelta, timezone -from os import path -from toot import CLIENT_NAME, CLIENT_WEBSITE, api, App, User -from toot.console import run_command -from toot.exceptions import ConsoleError, NotFoundError -from toot.utils import get_text -from unittest import mock - -# Host name of a test instance to run integration tests against -# DO NOT USE PUBLIC INSTANCES!!! -BASE_URL = os.getenv("TOOT_TEST_BASE_URL") - -# Mastodon database name, used to confirm user registration without having to click the link -DATABASE_DSN = os.getenv("TOOT_TEST_DATABASE_DSN") - -# Toot logo used for testing image upload -TRUMPET = path.join(path.dirname(path.dirname(path.realpath(__file__))), "trumpet.png") - - -if not BASE_URL or not DATABASE_DSN: - pytest.skip("Skipping integration tests", allow_module_level=True) - -# ------------------------------------------------------------------------------ -# Fixtures -# ------------------------------------------------------------------------------ - - -def create_app(): - instance = api.get_instance(BASE_URL) - response = api.create_app(BASE_URL) - return App(instance["uri"], BASE_URL, response["client_id"], response["client_secret"]) - - -def register_account(app: App): - username = str(uuid.uuid4())[-10:] - email = f"{username}@example.com" - - response = api.register_account(app, username, email, "password", "en") - confirm_user(email) - return User(app.instance, username, response["access_token"]) - - -def confirm_user(email): - conn = psycopg2.connect(DATABASE_DSN) - cursor = conn.cursor() - cursor.execute("UPDATE users SET confirmed_at = now() WHERE email = %s;", (email,)) - conn.commit() - - -@pytest.fixture(scope="session") -def app(): - return create_app() - - -@pytest.fixture(scope="session") -def user(app): - return register_account(app) - - -@pytest.fixture(scope="session") -def friend(app): - return register_account(app) - - -@pytest.fixture -def run(app, user, capsys): - def _run(command, *params, as_user=None): - run_command(app, as_user or user, command, params or []) - out, err = capsys.readouterr() - assert err == "" - return strip_ansi(out) - return _run - - -@pytest.fixture -def run_anon(capsys): - def _run(command, *params): - run_command(None, None, command, params or []) - out, err = capsys.readouterr() - assert err == "" - return strip_ansi(out) - return _run - -# ------------------------------------------------------------------------------ -# Tests -# ------------------------------------------------------------------------------ - - -def test_instance(app, run): - out = run("instance", "--disable-https") - assert "Mastodon" in out - assert app.instance in out - assert "running Mastodon" in out - - -def test_instance_anon(app, run_anon): - out = run_anon("instance", BASE_URL) - assert "Mastodon" in out - assert app.instance in out - assert "running Mastodon" in out - - # Need to specify the instance name when running anon - with pytest.raises(ConsoleError) as exc: - run_anon("instance") - assert str(exc.value) == "Please specify an instance." - - -def test_post(app, user, run): - text = "i wish i was a #lumberjack" - out = run("post", text) - status_id = _posted_status_id(out) - - status = api.fetch_status(app, user, status_id) - assert text == get_text(status["content"]) - assert status["visibility"] == "public" - assert status["sensitive"] is False - assert status["spoiler_text"] == "" - assert status["poll"] is None - - # Pleroma doesn't return the application - if status["application"]: - assert status["application"]["name"] == CLIENT_NAME - assert status["application"]["website"] == CLIENT_WEBSITE - - -def test_post_visibility(app, user, run): - for visibility in ["public", "unlisted", "private", "direct"]: - out = run("post", "foo", "--visibility", visibility) - status_id = _posted_status_id(out) - status = api.fetch_status(app, user, status_id) - assert status["visibility"] == visibility - - -def test_post_scheduled_at(app, user, run): - text = str(uuid.uuid4()) - scheduled_at = datetime.now(timezone.utc).replace(microsecond=0) + timedelta(minutes=10) - - out = run("post", text, "--scheduled-at", scheduled_at.isoformat()) - assert "Toot scheduled for" in out - - statuses = api.scheduled_statuses(app, user) - [status] = [s for s in statuses if s["params"]["text"] == text] - assert datetime.strptime(status["scheduled_at"], "%Y-%m-%dT%H:%M:%S.%f%z") == scheduled_at - - -def test_post_scheduled_in(app, user, run): - text = str(uuid.uuid4()) - - variants = [ - ("1 day", timedelta(days=1)), - ("1 day 6 hours", timedelta(days=1, hours=6)), - ("1 day 6 hours 13 minutes", timedelta(days=1, hours=6, minutes=13)), - ("1 day 6 hours 13 minutes 51 second", timedelta(days=1, hours=6, minutes=13, seconds=51)), - ("2d", timedelta(days=2)), - ("2d6h", timedelta(days=2, hours=6)), - ("2d6h13m", timedelta(days=2, hours=6, minutes=13)), - ("2d6h13m51s", timedelta(days=2, hours=6, minutes=13, seconds=51)), - ] - - datetimes = [] - for scheduled_in, delta in variants: - out = run("post", text, "--scheduled-in", scheduled_in) - dttm = datetime.utcnow() + delta - assert out.startswith(f"Toot scheduled for: {str(dttm)[:16]}") - datetimes.append(dttm) - - scheduled = api.scheduled_statuses(app, user) - scheduled = [s for s in scheduled if s["params"]["text"] == text] - scheduled = sorted(scheduled, key=lambda s: s["scheduled_at"]) - assert len(scheduled) == 8 - - for expected, status in zip(datetimes, scheduled): - actual = datetime.strptime(status["scheduled_at"], "%Y-%m-%dT%H:%M:%S.%fZ") - delta = expected - actual - assert delta.total_seconds() < 5 - - -def test_post_poll(app, user, run): - text = str(uuid.uuid4()) - - out = run( - "post", text, - "--poll-option", "foo", - "--poll-option", "bar", - "--poll-option", "baz", - "--poll-option", "qux", - ) - - status_id = _posted_status_id(out) - - status = api.fetch_status(app, user, status_id) - assert status["poll"]["expired"] is False - assert status["poll"]["multiple"] is False - assert status["poll"]["options"] == [ - {"title": "foo", "votes_count": 0}, - {"title": "bar", "votes_count": 0}, - {"title": "baz", "votes_count": 0}, - {"title": "qux", "votes_count": 0} - ] - - # Test expires_at is 24h by default - actual = datetime.strptime(status["poll"]["expires_at"], "%Y-%m-%dT%H:%M:%S.%f%z") - expected = datetime.now(timezone.utc) + timedelta(days=1) - delta = actual - expected - assert delta.total_seconds() < 5 - - -def test_post_poll_multiple(app, user, run): - text = str(uuid.uuid4()) - - out = run( - "post", text, - "--poll-option", "foo", - "--poll-option", "bar", - "--poll-multiple" - ) - - status_id = _posted_status_id(out) - - status = api.fetch_status(app, user, status_id) - assert status["poll"]["multiple"] is True - - -def test_post_poll_expires_in(app, user, run): - text = str(uuid.uuid4()) - - out = run( - "post", text, - "--poll-option", "foo", - "--poll-option", "bar", - "--poll-expires-in", "8h", - ) - - status_id = _posted_status_id(out) - - status = api.fetch_status(app, user, status_id) - actual = datetime.strptime(status["poll"]["expires_at"], "%Y-%m-%dT%H:%M:%S.%f%z") - expected = datetime.now(timezone.utc) + timedelta(hours=8) - delta = actual - expected - assert delta.total_seconds() < 5 - - -def test_post_poll_hide_totals(app, user, run): - text = str(uuid.uuid4()) - - out = run( - "post", text, - "--poll-option", "foo", - "--poll-option", "bar", - "--poll-hide-totals" - ) - - status_id = _posted_status_id(out) - - status = api.fetch_status(app, user, status_id) - - # votes_count is None when totals are hidden - assert status["poll"]["options"] == [ - {"title": "foo", "votes_count": None}, - {"title": "bar", "votes_count": None}, - ] - - -def test_post_language(app, user, run): - out = run("post", "test", "--language", "hr") - status_id = _posted_status_id(out) - status = api.fetch_status(app, user, status_id) - assert status["language"] == "hr" - - out = run("post", "test", "--language", "zh") - status_id = _posted_status_id(out) - status = api.fetch_status(app, user, status_id) - assert status["language"] == "zh" - - -def test_media_thumbnail(app, user, run): - assets_dir = path.realpath(path.join(path.dirname(__file__), "assets")) - - video_path = path.join(assets_dir, "small.webm") - thumbnail_path = path.join(assets_dir, "test1.png") - - out = run( - "post", - "--media", video_path, - "--thumbnail", thumbnail_path, - "--description", "foo", - "some text" - ) - - status_id = _posted_status_id(out) - status = api.fetch_status(app, user, status_id) - [media] = status["media_attachments"] - - assert media["description"] == "foo" - assert media["type"] == "video" - assert media["url"].endswith(".mp4") - assert media["preview_url"].endswith(".png") - - # Video properties - assert int(media["meta"]["original"]["duration"]) == 5 - assert media["meta"]["original"]["height"] == 320 - assert media["meta"]["original"]["width"] == 560 - - # Thumbnail properties - assert media["meta"]["small"]["height"] == 50 - assert media["meta"]["small"]["width"] == 50 - - -def test_media_attachments(app, user, run): - assets_dir = path.realpath(path.join(path.dirname(__file__), "assets")) - - path1 = path.join(assets_dir, "test1.png") - path2 = path.join(assets_dir, "test2.png") - path3 = path.join(assets_dir, "test3.png") - path4 = path.join(assets_dir, "test4.png") - - out = run( - "post", - "--media", path1, - "--media", path2, - "--media", path3, - "--media", path4, - "--description", "Test 1", - "--description", "Test 2", - "--description", "Test 3", - "--description", "Test 4", - "some text" - ) - - status_id = _posted_status_id(out) - status = api.fetch_status(app, user, status_id) - - [a1, a2, a3, a4] = status["media_attachments"] - - # Pleroma doesn't send metadata - if "meta" in a1: - assert a1["meta"]["original"]["size"] == "50x50" - assert a2["meta"]["original"]["size"] == "50x60" - assert a3["meta"]["original"]["size"] == "50x70" - assert a4["meta"]["original"]["size"] == "50x80" - - assert a1["description"] == "Test 1" - assert a2["description"] == "Test 2" - assert a3["description"] == "Test 3" - assert a4["description"] == "Test 4" - - -@mock.patch("toot.utils.multiline_input") -@mock.patch("sys.stdin.read") -def test_media_attachment_without_text(mock_read, mock_ml, app, user, run): - # No status from stdin or readline - mock_read.return_value = "" - mock_ml.return_value = "" - - assets_dir = path.realpath(path.join(path.dirname(__file__), "assets")) - media_path = path.join(assets_dir, "test1.png") - - out = run("post", "--media", media_path) - status_id = _posted_status_id(out) - - status = api.fetch_status(app, user, status_id) - assert status["content"] == "" - - [attachment] = status["media_attachments"] - assert not attachment["description"] - - # Pleroma doesn't send metadata - if "meta" in attachment: - assert attachment["meta"]["original"]["size"] == "50x50" - - -def test_delete_status(app, user, run): - status = api.post_status(app, user, "foo") - - out = run("delete", status["id"]) - assert out == "✓ Status deleted" - - with pytest.raises(NotFoundError): - api.fetch_status(app, user, status["id"]) - - -def test_reply_thread(app, user, friend, run): - status = api.post_status(app, friend, "This is the status") - - out = run("post", "--reply-to", status["id"], "This is the reply") - status_id = _posted_status_id(out) - reply = api.fetch_status(app, user, status_id) - - assert reply["in_reply_to_id"] == status["id"] - - out = run("thread", status["id"]) - [s1, s2] = [s.strip() for s in re.split(r"─+", out) if s.strip()] - - assert "This is the status" in s1 - assert "This is the reply" in s2 - assert friend.username in s1 - assert user.username in s2 - assert status["id"] in s1 - assert reply["id"] in s2 - - -def test_favourite(app, user, run): - status = api.post_status(app, user, "foo") - assert not status["favourited"] - - out = run("favourite", status["id"]) - assert out == "✓ Status favourited" - - status = api.fetch_status(app, user, status["id"]) - assert status["favourited"] - - out = run("unfavourite", status["id"]) - assert out == "✓ Status unfavourited" - - # A short delay is required before the server returns new data - time.sleep(0.1) - - status = api.fetch_status(app, user, status["id"]) - assert not status["favourited"] - - -def test_reblog(app, user, run): - status = api.post_status(app, user, "foo") - assert not status["reblogged"] - - out = run("reblog", status["id"]) - assert out == "✓ Status reblogged" - - status = api.fetch_status(app, user, status["id"]) - assert status["reblogged"] - - out = run("reblogged_by", status["id"]) - assert out == f"@{user.username}" - - out = run("unreblog", status["id"]) - assert out == "✓ Status unreblogged" - - status = api.fetch_status(app, user, status["id"]) - assert not status["reblogged"] - - -def test_pin(app, user, run): - status = api.post_status(app, user, "foo") - assert not status["pinned"] - - out = run("pin", status["id"]) - assert out == "✓ Status pinned" - - status = api.fetch_status(app, user, status["id"]) - assert status["pinned"] - - out = run("unpin", status["id"]) - assert out == "✓ Status unpinned" - - status = api.fetch_status(app, user, status["id"]) - assert not status["pinned"] - - -def test_bookmark(app, user, run): - status = api.post_status(app, user, "foo") - assert not status["bookmarked"] - - out = run("bookmark", status["id"]) - assert out == "✓ Status bookmarked" - - status = api.fetch_status(app, user, status["id"]) - assert status["bookmarked"] - - out = run("unbookmark", status["id"]) - assert out == "✓ Status unbookmarked" - - status = api.fetch_status(app, user, status["id"]) - assert not status["bookmarked"] - - -def test_whoami(user, run): - out = run("whoami") - # TODO: test other fields once updating account is supported - assert f"@{user.username}" in out - - -def test_whois(app, friend, run): - variants = [ - friend.username, - f"@{friend.username}", - f"{friend.username}@{app.instance}", - f"@{friend.username}@{app.instance}", - ] - - for username in variants: - out = run("whois", username) - assert f"@{friend.username}" in out - - -def test_search_account(friend, run): - out = run("search", friend.username) - assert out == f"Accounts:\n* @{friend.username}" - - -def test_search_hashtag(app, user, run): - api.post_status(app, user, "#hashtag_x") - api.post_status(app, user, "#hashtag_y") - api.post_status(app, user, "#hashtag_z") - - out = run("search", "#hashtag") - assert out == "Hashtags:\n#hashtag_x, #hashtag_y, #hashtag_z" - - -def test_follow(friend, run): - out = run("follow", friend.username) - assert out == f"✓ You are now following {friend.username}" - - out = run("unfollow", friend.username) - assert out == f"✓ You are no longer following {friend.username}" - - -def test_follow_case_insensitive(friend, run): - username = friend.username.upper() - - out = run("follow", username) - assert out == f"✓ You are now following {username}" - - out = run("unfollow", username) - assert out == f"✓ You are no longer following {username}" - - -# TODO: improve testing stderr, catching exceptions is not optimal -def test_follow_not_found(run): - with pytest.raises(ConsoleError) as ex_info: - run("follow", "banana") - assert str(ex_info.value) == "Account not found" - - -def test_mute(app, user, friend, run): - out = run("mute", friend.username) - assert out == f"✓ You have muted {friend.username}" - - [muted_account] = api.get_muted_accounts(app, user) - assert muted_account["acct"] == friend.username - - out = run("unmute", friend.username) - assert out == f"✓ {friend.username} is no longer muted" - - assert api.get_muted_accounts(app, user) == [] - - -def test_block(app, user, friend, run): - out = run("block", friend.username) - assert out == f"✓ You are now blocking {friend.username}" - - [blockd_account] = api.get_blocked_accounts(app, user) - assert blockd_account["acct"] == friend.username - - out = run("unblock", friend.username) - assert out == f"✓ {friend.username} is no longer blocked" - - assert api.get_blocked_accounts(app, user) == [] - - -def test_following_followers(user, friend, run): - out = run("following", user.username) - assert out == "" - - run("follow", friend.username) - - out = run("following", user.username) - assert out == f"* @{friend.username}" - - out = run("followers", friend.username) - assert out == f"* @{user.username}" - - -def test_tags(run): - out = run("tags_followed") - assert out == "You're not following any hashtags." - - out = run("tags_follow", "foo") - assert out == "✓ You are now following #foo" - - out = run("tags_followed") - assert out == f"* #foo\t{BASE_URL}/tags/foo" - - out = run("tags_follow", "bar") - assert out == "✓ You are now following #bar" - - out = run("tags_followed") - assert out == "\n".join([ - f"* #bar\t{BASE_URL}/tags/bar", - f"* #foo\t{BASE_URL}/tags/foo", - ]) - - out = run("tags_unfollow", "foo") - assert out == "✓ You are no longer following #foo" - - out = run("tags_followed") - assert out == f"* #bar\t{BASE_URL}/tags/bar" - - -def test_update_account_no_options(run): - with pytest.raises(ConsoleError) as exc: - run("update_account") - assert str(exc.value) == "Please specify at least one option to update the account" - - -def test_update_account_display_name(run, app, user): - out = run("update_account", "--display-name", "elwood") - assert out == "✓ Account updated" - - account = api.verify_credentials(app, user) - assert account["display_name"] == "elwood" - - -def test_update_account_note(run, app, user): - note = ("It's 106 miles to Chicago, we got a full tank of gas, half a pack " - "of cigarettes, it's dark... and we're wearing sunglasses.") - - out = run("update_account", "--note", note) - assert out == "✓ Account updated" - - account = api.verify_credentials(app, user) - assert get_text(account["note"]) == note - - -def test_update_account_language(run, app, user): - out = run("update_account", "--language", "hr") - assert out == "✓ Account updated" - - account = api.verify_credentials(app, user) - assert account["source"]["language"] == "hr" - - -def test_update_account_privacy(run, app, user): - out = run("update_account", "--privacy", "private") - assert out == "✓ Account updated" - - account = api.verify_credentials(app, user) - assert account["source"]["privacy"] == "private" - - -def test_update_account_avatar(run, app, user): - account = api.verify_credentials(app, user) - old_value = account["avatar"] - - out = run("update_account", "--avatar", TRUMPET) - assert out == "✓ Account updated" - - account = api.verify_credentials(app, user) - assert account["avatar"] != old_value - - -def test_update_account_header(run, app, user): - account = api.verify_credentials(app, user) - old_value = account["header"] - - out = run("update_account", "--header", TRUMPET) - assert out == "✓ Account updated" - - account = api.verify_credentials(app, user) - assert account["header"] != old_value - - -def test_update_account_locked(run, app, user): - out = run("update_account", "--locked") - assert out == "✓ Account updated" - - account = api.verify_credentials(app, user) - assert account["locked"] is True - - out = run("update_account", "--no-locked") - assert out == "✓ Account updated" - - account = api.verify_credentials(app, user) - assert account["locked"] is False - - -def test_update_account_bot(run, app, user): - out = run("update_account", "--bot") - assert out == "✓ Account updated" - - account = api.verify_credentials(app, user) - assert account["bot"] is True - - out = run("update_account", "--no-bot") - assert out == "✓ Account updated" - - account = api.verify_credentials(app, user) - assert account["bot"] is False - - -def test_update_account_discoverable(run, app, user): - out = run("update_account", "--discoverable") - assert out == "✓ Account updated" - - account = api.verify_credentials(app, user) - assert account["discoverable"] is True - - out = run("update_account", "--no-discoverable") - assert out == "✓ Account updated" - - account = api.verify_credentials(app, user) - assert account["discoverable"] is False - - -def test_update_account_sensitive(run, app, user): - out = run("update_account", "--sensitive") - assert out == "✓ Account updated" - - account = api.verify_credentials(app, user) - assert account["source"]["sensitive"] is True - - out = run("update_account", "--no-sensitive") - assert out == "✓ Account updated" - - account = api.verify_credentials(app, user) - assert account["source"]["sensitive"] is False - - -# ------------------------------------------------------------------------------ -# Utils -# ------------------------------------------------------------------------------ - -strip_ansi_pattern = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])") - - -def strip_ansi(string): - return strip_ansi_pattern.sub("", string).strip() - - -def _posted_status_id(out): - pattern = re.compile(r"Toot posted: http://([^/]+)/([^/]+)/(.+)") - match = re.search(pattern, out) - assert match - - _, _, status_id = match.groups() - - return status_id diff --git a/toot/api.py b/toot/api.py index f1beece..8557c74 100644 --- a/toot/api.py +++ b/toot/api.py @@ -519,3 +519,45 @@ def clear_notifications(app, user): def get_instance(base_url): url = f"{base_url}/api/v1/instance" return http.anon_get(url).json() + + +def get_lists(app, user): + path = "/api/v1/lists" + return _get_response_list(app, user, path) + + +def find_list_id(app, user, title): + lists = get_lists(app, user) + for list_item in lists: + if list_item["title"] == title: + return list_item["id"] + return None + + +def get_list_accounts(app, user, list_id): + path = f"/api/v1/lists/{list_id}/accounts" + return _get_response_list(app, user, path) + + +def create_list(app, user, title, replies_policy): + url = "/api/v1/lists" + json = {'title': title} + if replies_policy: + json['replies_policy'] = replies_policy + return http.post(app, user, url, json=json).json() + + +def delete_list(app, user, id): + return http.delete(app, user, f"/api/v1/lists/{id}") + + +def add_accounts_to_list(app, user, list_id, account_ids): + url = f"/api/v1/lists/{list_id}/accounts" + json = {'account_ids': account_ids} + return http.post(app, user, url, json=json).json() + + +def remove_accounts_from_list(app, user, list_id, account_ids): + url = f"/api/v1/lists/{list_id}/accounts" + json = {'account_ids': account_ids} + return http.delete(app, user, url, json=json) diff --git a/toot/commands.py b/toot/commands.py index e105a8d..ce34e58 100644 --- a/toot/commands.py +++ b/toot/commands.py @@ -1,3 +1,4 @@ + import sys import platform @@ -6,9 +7,9 @@ from time import sleep, time from toot import api, config, __version__ from toot.auth import login_interactive, login_browser_interactive, create_app_interactive from toot.exceptions import ApiError, ConsoleError -from toot.output import (print_out, print_instance, print_account, print_acct_list, - print_search_results, print_timeline, print_notifications, - print_tag_list) +from toot.output import (print_lists, print_out, print_instance, print_account, print_acct_list, + print_search_results, print_table, print_timeline, print_notifications, + print_tag_list, print_list_accounts) from toot.tui.utils import parse_datetime from toot.utils import args_get_instance, delete_tmp_status_file, editor_input, multiline_input, EOF_KEY @@ -423,6 +424,82 @@ def tags_followed(app, user, args): print_tag_list(response) +def lists(app, user, args): + lists = api.get_lists(app, user) + + if lists: + print_lists(lists) + else: + print_out("You have no lists defined.") + + +def list_accounts(app, user, args): + list_id = args.id if args.id else api.find_list_id(app, user, args.title) + if not list_id: + print_out("List not found") + return + + response = api.get_list_accounts(app, user, list_id) + print_list_accounts(response) + + +def list_create(app, user, args): + api.create_list(app, user, title=args.title, replies_policy=args.replies_policy) + print_out(f"✓ List \"{args.title}\" created.") + + +def list_delete(app, user, args): + list_id = args.id if args.id else api.find_list_id(app, user, args.title) + if not list_id: + print_out("List not found") + return + + api.delete_list(app, user, list_id) + print_out(f"✓ List \"{args.title if args.title else args.id}\" deleted.") + + +def list_add(app, user, args): + list_id = args.id if args.id else api.find_list_id(app, user, args.title) + if not list_id: + print_out("List not found") + return + account = find_account(app, user, args.account) + if not account: + print_out("Account not found") + return + try: + api.add_accounts_to_list(app, user, list_id, [account['id']]) + except Exception as ex: + # if we failed to add the account, try to give a + # more specific error message than "record not found" + my_accounts = api.followers(app, user, account['id']) + found = False + if my_accounts: + for my_account in my_accounts: + if my_account['id'] == account['id']: + found = True + break + if found is False: + print_out(f"You must follow @{account['acct']} before adding this account to a list.") + else: + print_out(f"{ex}") + return + print_out(f"✓ Added account \"{args.account}\"") + + +def list_remove(app, user, args): + list_id = args.id if args.id else api.find_list_id(app, user, args.title) + if not list_id: + print_out("List not found") + return + account = find_account(app, user, args.account) + if not account: + print_out("Account not found") + return + api.remove_accounts_from_list(app, user, list_id, [account['id']]) + print_out(f"✓ Removed account \"{args.account}\"") + + def mute(app, user, args): account = find_account(app, user, args.account) api.mute(app, user, account['id']) diff --git a/toot/console.py b/toot/console.py index 0a3acb5..5c84d97 100644 --- a/toot/console.py +++ b/toot/console.py @@ -724,6 +724,101 @@ TAG_COMMANDS = [ ), ] +LIST_COMMANDS = [ + Command( + name="lists", + description="List all lists", + arguments=[], + require_auth=True, + ), + Command( + name="list_accounts", + description="List the accounts in a list", + arguments=[ + (["--id"], { + "type": str, + "help": "ID of the list" + }), + (["title"], { + "type": str, + "nargs": "?", + "help": "title of the list" + }), + ], + require_auth=True, + ), + Command( + name="list_create", + description="Create a list", + arguments=[ + (["title"], { + "type": str, + "help": "title of the list" + }), + (["--replies-policy"], { + "type": str, + "help": "replies policy: 'followed', 'list', or 'none' (defaults to 'none')" + }), + ], + require_auth=True, + ), + Command( + name="list_delete", + description="Delete a list", + arguments=[ + (["--id"], { + "type": str, + "help": "ID of the list" + }), + (["title"], { + "type": str, + "nargs": "?", + "help": "title of the list" + }), + ], + require_auth=True, + ), + Command( + name="list_add", + description="Add account to list", + arguments=[ + (["--id"], { + "type": str, + "help": "ID of the list" + }), + (["title"], { + "type": str, + "nargs": "?", + "help": "title of the list" + }), + (["account"], { + "type": str, + "help": "Account to add" + }), + ], + require_auth=True, + ), + Command( + name="list_remove", + description="Remove account from list", + arguments=[ + (["--id"], { + "type": str, + "help": "ID of the list" + }), + (["title"], { + "type": str, + "nargs": "?", + "help": "title of the list" + }), + (["account"], { + "type": str, + "help": "Account to remove" + }), + ], + require_auth=True, + ), +] COMMAND_GROUPS = [ ("Authentication", AUTH_COMMANDS), ("TUI", TUI_COMMANDS), @@ -732,6 +827,7 @@ COMMAND_GROUPS = [ ("Status", STATUS_COMMANDS), ("Accounts", ACCOUNTS_COMMANDS), ("Hashtags", TAG_COMMANDS), + ("Lists", LIST_COMMANDS), ] COMMANDS = list(chain(*[commands for _, commands in COMMAND_GROUPS])) diff --git a/toot/http.py b/toot/http.py index 597edc9..4e62bda 100644 --- a/toot/http.py +++ b/toot/http.py @@ -92,13 +92,13 @@ def patch(app, user, path, headers=None, files=None, data=None, json=None): return process_response(response) -def delete(app, user, path, data=None, headers=None): +def delete(app, user, path, data=None, json=None, headers=None): url = app.base_url + path headers = headers or {} headers["Authorization"] = f"Bearer {user.access_token}" - request = Request('DELETE', url, headers=headers, json=data) + request = Request('DELETE', url, headers=headers, data=data, json=json) response = send_request(request) return process_response(response) diff --git a/toot/output.py b/toot/output.py index 5be6a92..c89aca9 100644 --- a/toot/output.py +++ b/toot/output.py @@ -3,9 +3,10 @@ import re import sys import textwrap -from toot.tui.utils import parse_datetime +from typing import List from wcwidth import wcswidth +from toot.tui.utils import parse_datetime from toot.utils import get_text, parse_html from toot.wcstring import wc_wrap @@ -210,6 +211,43 @@ def print_tag_list(tags): print_out("You're not following any hashtags.") +def print_lists(lists): + headers = ["ID", "Title", "Replies"] + data = [[lst["id"], lst["title"], lst["replies_policy"]] for lst in lists] + print_table(headers, data) + + +def print_table(headers: List[str], data: List[List[str]]): + widths = [[len(cell) for cell in row] for row in data + [headers]] + widths = [max(width) for width in zip(*widths)] + + def style(string, tag): + return f"<{tag}>{string}" if tag else string + + def print_row(row, tag=None): + for idx, cell in enumerate(row): + width = widths[idx] + print_out(style(cell.ljust(width), tag), end="") + print_out(" ", end="") + print_out() + + underlines = ["-" * width for width in widths] + + print_row(headers, "bold") + print_row(underlines, "dim") + + for row in data: + print_row(row) + + +def print_list_accounts(accounts): + if accounts: + print_out("Accounts in list:\n") + print_acct_list(accounts) + else: + print_out("This list has no accounts.") + + def print_search_results(results): accounts = results['accounts'] hashtags = results['hashtags'] diff --git a/toot/tui/app.py b/toot/tui/app.py index 4ba25ab..b49dee2 100644 --- a/toot/tui/app.py +++ b/toot/tui/app.py @@ -398,7 +398,9 @@ class TUI(urwid.Frame): def show_goto_menu(self): user_timelines = self.config.get("timelines", {}) - menu = GotoMenu(user_timelines) + user_lists = api.get_lists(self.app, self.user) or [] + + menu = GotoMenu(user_timelines, user_lists) urwid.connect_signal(menu, "home_timeline", lambda x: self.goto_home_timeline()) urwid.connect_signal(menu, "public_timeline", @@ -411,10 +413,12 @@ class TUI(urwid.Frame): lambda x, local: self.goto_conversations()) urwid.connect_signal(menu, "hashtag_timeline", lambda x, tag, local: self.goto_tag_timeline(tag, local=local)) + urwid.connect_signal(menu, "list_timeline", + lambda x, list_item: self.goto_list_timeline(list_item)) self.open_overlay(menu, title="Go to", options=dict( align="center", width=("relative", 60), - valign="middle", height=16 + len(user_timelines), + valign="middle", height=17 + len(user_timelines) + len(user_lists), )) def show_help(self): @@ -468,6 +472,13 @@ class TUI(urwid.Frame): ) promise.add_done_callback(lambda *args: self.close_overlay()) + def goto_list_timeline(self, list_item): + self.timeline_generator = api.timeline_list_generator( + self.app, self.user, list_item['id'], limit=40) + promise = self.async_load_timeline( + is_initial=True, timeline_name=f"\N{clipboard}{list_item['title']}") + promise.add_done_callback(lambda *args: self.close_overlay()) + def show_media(self, status): urls = [m["url"] for m in status.original.data["media_attachments"]] if urls: @@ -660,12 +671,19 @@ class TUI(urwid.Frame): def refresh_timeline(self): # No point in refreshing the bookmarks timeline - if not self.timeline or self.timeline.name == 'bookmarks': + # and we don't have a good way to refresh a + # list timeline yet (no reference to list ID kept) + if (not self.timeline + or self.timeline.name == 'bookmarks' + or self.timeline.name.startswith("\N{clipboard}")): return if self.timeline.name.startswith("#"): self.timeline_generator = api.tag_timeline_generator( self.app, self.user, self.timeline.name[1:], limit=40) + elif self.timeline.name.startswith("\N{clipboard}"): + self.timeline_generator = api.tag_timeline_generator( + self.app, self.user, self.timeline.name[1:], limit=40) else: if self.timeline.name.endswith("public"): self.timeline_generator = api.public_timeline_generator( diff --git a/toot/tui/overlays.py b/toot/tui/overlays.py index 58b902a..a9c2442 100644 --- a/toot/tui/overlays.py +++ b/toot/tui/overlays.py @@ -102,20 +102,21 @@ class GotoMenu(urwid.ListBox): "bookmark_timeline", "notification_timeline", "conversation_timeline", + "list_timeline", ] - def __init__(self, user_timelines): + def __init__(self, user_timelines, user_lists): self.hash_edit = EditBox(caption="Hashtag: ") self.message_widget = urwid.Text("") - actions = list(self.generate_actions(user_timelines)) + actions = list(self.generate_actions(user_timelines, user_lists)) walker = urwid.SimpleFocusListWalker(actions) super().__init__(walker) def get_hashtag(self): return self.hash_edit.edit_text.strip().lstrip("#") - def generate_actions(self, user_timelines): + def generate_actions(self, user_timelines, user_lists): def _home(button): self._emit("home_timeline") @@ -147,6 +148,11 @@ class GotoMenu(urwid.ListBox): self._emit("hashtag_timeline", tag, local) return on_press + def mk_on_press_user_list(list_item): + def on_press(btn): + self._emit("list_timeline", list_item) + return on_press + yield Button("Home timeline", on_press=_home) yield Button("Local public timeline", on_press=_local_public) yield Button("Global public timeline", on_press=_global_public) @@ -164,6 +170,10 @@ class GotoMenu(urwid.ListBox): yield Button(f"#{tag}" + (" (local)" if is_local else ""), on_press=mk_on_press_user_hashtag(tag, is_local)) + for list_item in user_lists: + yield Button(f"\N{clipboard}{list_item['title']}", + on_press=mk_on_press_user_list(list_item)) + yield urwid.Divider() yield self.hash_edit yield Button("Local hashtag timeline", on_press=lambda x: _hashtag(True)) diff --git a/toot/tui/scroll.py b/toot/tui/scroll.py index fa2c3bb..fe89be8 100644 --- a/toot/tui/scroll.py +++ b/toot/tui/scroll.py @@ -423,4 +423,4 @@ class ScrollBar(urwid.WidgetDecoration): ow.set_scrollpos(pos + 1) return True - return False \ No newline at end of file + return False diff --git a/toot/tui/widgets.py b/toot/tui/widgets.py index 6f46fb3..f2ae4b8 100644 --- a/toot/tui/widgets.py +++ b/toot/tui/widgets.py @@ -1,4 +1,5 @@ import urwid +from wcwidth import wcswidth class Clickable: @@ -40,12 +41,12 @@ class Button(urwid.AttrWrap): """Styled button.""" def __init__(self, *args, **kwargs): button = urwid.Button(*args, **kwargs) - padding = urwid.Padding(button, width=len(args[0]) + 4) + padding = urwid.Padding(button, width=wcswidth(args[0]) + 4) return super().__init__(padding, "button", "button_focused") def set_label(self, *args, **kwargs): self.original_widget.original_widget.set_label(*args, **kwargs) - self.original_widget.width = len(args[0]) + 4 + self.original_widget.width = wcswidth(args[0]) + 4 class CheckBox(urwid.AttrWrap):