Merge branch 'master' into image11

This commit is contained in:
Daniel Schwarz 2023-03-30 18:20:40 -04:00 committed by GitHub
commit d64175fe6f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 1086 additions and 775 deletions

View File

@ -1,4 +1,4 @@
[flake8]
exclude=build,tests,tmp,venv,toot/tui/scroll.py
ignore=E128
ignore=E128,W503
max-line-length=120

View File

View File

@ -0,0 +1,130 @@
"""
This module contains integration tests meant to run against a test Mastodon instance.
You can set up a test instance locally by following this guide:
https://docs.joinmastodon.org/dev/setup/
To enable integration tests, export the following environment variables to match
your test server and database:
```
export TOOT_TEST_HOSTNAME="localhost:3000"
export TOOT_TEST_DATABASE_DSN="dbname=mastodon_development"
```
"""
import re
import os
import psycopg2
import pytest
import uuid
from pathlib import Path
from toot import api, App, User
from toot.console import run_command
from toot.exceptions import ApiError, ConsoleError
from toot.output import print_out
# Host name of a test instance to run integration tests against
# DO NOT USE PUBLIC INSTANCES!!!
BASE_URL = os.getenv("TOOT_TEST_BASE_URL")
# Mastodon database name, used to confirm user registration without having to click the link
DATABASE_DSN = os.getenv("TOOT_TEST_DATABASE_DSN")
# Toot logo used for testing image upload
TRUMPET = str(Path(__file__).parent.parent.parent / "trumpet.png")
ASSETS_DIR = str(Path(__file__).parent.parent / "assets")
if not BASE_URL or not DATABASE_DSN:
pytest.skip("Skipping integration tests", allow_module_level=True)
# ------------------------------------------------------------------------------
# Fixtures
# ------------------------------------------------------------------------------
def create_app():
instance = api.get_instance(BASE_URL)
response = api.create_app(BASE_URL)
return App(instance["uri"], BASE_URL, response["client_id"], response["client_secret"])
def register_account(app: App):
username = str(uuid.uuid4())[-10:]
email = f"{username}@example.com"
response = api.register_account(app, username, email, "password", "en")
confirm_user(email)
return User(app.instance, username, response["access_token"])
def confirm_user(email):
conn = psycopg2.connect(DATABASE_DSN)
cursor = conn.cursor()
cursor.execute("UPDATE users SET confirmed_at = now() WHERE email = %s;", (email,))
conn.commit()
@pytest.fixture(scope="session")
def app():
return create_app()
@pytest.fixture(scope="session")
def user(app):
return register_account(app)
@pytest.fixture(scope="session")
def friend(app):
return register_account(app)
@pytest.fixture
def run(app, user, capsys):
def _run(command, *params, as_user=None):
# The try/catch duplicates logic from console.main to convert exceptions
# to printed error messages. TODO: could be deduped
try:
run_command(app, as_user or user, command, params or [])
except (ConsoleError, ApiError) as e:
print_out(str(e))
out, err = capsys.readouterr()
assert err == ""
return strip_ansi(out)
return _run
@pytest.fixture
def run_anon(capsys):
def _run(command, *params):
run_command(None, None, command, params or [])
out, err = capsys.readouterr()
assert err == ""
return strip_ansi(out)
return _run
# ------------------------------------------------------------------------------
# Utils
# ------------------------------------------------------------------------------
strip_ansi_pattern = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
def strip_ansi(string):
return strip_ansi_pattern.sub("", string).strip()
def posted_status_id(out):
pattern = re.compile(r"Toot posted: http://([^/]+)/([^/]+)/(.+)")
match = re.search(pattern, out)
assert match
_, _, status_id = match.groups()
return status_id

View File

@ -0,0 +1,19 @@
def test_whoami(user, run):
out = run("whoami")
# TODO: test other fields once updating account is supported
assert f"@{user.username}" in out
def test_whois(app, friend, run):
variants = [
friend.username,
f"@{friend.username}",
f"{friend.username}@{app.instance}",
f"@{friend.username}@{app.instance}",
]
for username in variants:
out = run("whois", username)
assert f"@{friend.username}" in out

View File

@ -0,0 +1,121 @@
from tests.integration.conftest import TRUMPET
from toot import api
from toot.utils import get_text
def test_update_account_no_options(run):
out = run("update_account")
assert out == "Please specify at least one option to update the account"
def test_update_account_display_name(run, app, user):
out = run("update_account", "--display-name", "elwood")
assert out == "✓ Account updated"
account = api.verify_credentials(app, user)
assert account["display_name"] == "elwood"
def test_update_account_note(run, app, user):
note = ("It's 106 miles to Chicago, we got a full tank of gas, half a pack "
"of cigarettes, it's dark... and we're wearing sunglasses.")
out = run("update_account", "--note", note)
assert out == "✓ Account updated"
account = api.verify_credentials(app, user)
assert get_text(account["note"]) == note
def test_update_account_language(run, app, user):
out = run("update_account", "--language", "hr")
assert out == "✓ Account updated"
account = api.verify_credentials(app, user)
assert account["source"]["language"] == "hr"
def test_update_account_privacy(run, app, user):
out = run("update_account", "--privacy", "private")
assert out == "✓ Account updated"
account = api.verify_credentials(app, user)
assert account["source"]["privacy"] == "private"
def test_update_account_avatar(run, app, user):
account = api.verify_credentials(app, user)
old_value = account["avatar"]
out = run("update_account", "--avatar", TRUMPET)
assert out == "✓ Account updated"
account = api.verify_credentials(app, user)
assert account["avatar"] != old_value
def test_update_account_header(run, app, user):
account = api.verify_credentials(app, user)
old_value = account["header"]
out = run("update_account", "--header", TRUMPET)
assert out == "✓ Account updated"
account = api.verify_credentials(app, user)
assert account["header"] != old_value
def test_update_account_locked(run, app, user):
out = run("update_account", "--locked")
assert out == "✓ Account updated"
account = api.verify_credentials(app, user)
assert account["locked"] is True
out = run("update_account", "--no-locked")
assert out == "✓ Account updated"
account = api.verify_credentials(app, user)
assert account["locked"] is False
def test_update_account_bot(run, app, user):
out = run("update_account", "--bot")
assert out == "✓ Account updated"
account = api.verify_credentials(app, user)
assert account["bot"] is True
out = run("update_account", "--no-bot")
assert out == "✓ Account updated"
account = api.verify_credentials(app, user)
assert account["bot"] is False
def test_update_account_discoverable(run, app, user):
out = run("update_account", "--discoverable")
assert out == "✓ Account updated"
account = api.verify_credentials(app, user)
assert account["discoverable"] is True
out = run("update_account", "--no-discoverable")
assert out == "✓ Account updated"
account = api.verify_credentials(app, user)
assert account["discoverable"] is False
def test_update_account_sensitive(run, app, user):
out = run("update_account", "--sensitive")
assert out == "✓ Account updated"
account = api.verify_credentials(app, user)
assert account["source"]["sensitive"] is True
out = run("update_account", "--no-sensitive")
assert out == "✓ Account updated"
account = api.verify_credentials(app, user)
assert account["source"]["sensitive"] is False

View File

@ -0,0 +1,67 @@
from tests.integration.conftest import register_account
def test_lists_empty(run):
out = run("lists")
assert out == "You have no lists defined."
def test_list_create_delete(run):
out = run("list_create", "banana")
assert out == '✓ List "banana" created.'
out = run("lists")
assert "banana" in out
out = run("list_create", "mango")
assert out == '✓ List "mango" created.'
out = run("lists")
assert "banana" in out
assert "mango" in out
out = run("list_delete", "banana")
assert out == '✓ List "banana" deleted.'
out = run("lists")
assert "banana" not in out
assert "mango" in out
out = run("list_delete", "mango")
assert out == '✓ List "mango" deleted.'
out = run("lists")
assert out == "You have no lists defined."
out = run("list_delete", "mango")
assert out == "List not found"
def test_list_add_remove(run, app):
acc = register_account(app)
run("list_create", "foo")
out = run("list_add", "foo", acc.username)
assert out == f"You must follow @{acc.username} before adding this account to a list."
run("follow", acc.username)
out = run("list_add", "foo", acc.username)
assert out == f'✓ Added account "{acc.username}"'
out = run("list_accounts", "foo")
assert acc.username in out
# Account doesn't exist
out = run("list_add", "foo", "does_not_exist")
assert out == "Account not found"
# List doesn't exist
out = run("list_add", "does_not_exist", acc.username)
assert out == "List not found"
out = run("list_remove", "foo", acc.username)
assert out == f'✓ Removed account "{acc.username}"'
out = run("list_accounts", "foo")
assert out == "This list has no accounts."

View File

@ -0,0 +1,288 @@
import re
import uuid
from datetime import datetime, timedelta, timezone
from os import path
from tests.integration.conftest import ASSETS_DIR, posted_status_id
from toot import CLIENT_NAME, CLIENT_WEBSITE, api
from toot.utils import get_text
from unittest import mock
def test_post(app, user, run):
text = "i wish i was a #lumberjack"
out = run("post", text)
status_id = posted_status_id(out)
status = api.fetch_status(app, user, status_id)
assert text == get_text(status["content"])
assert status["visibility"] == "public"
assert status["sensitive"] is False
assert status["spoiler_text"] == ""
assert status["poll"] is None
# Pleroma doesn't return the application
if status["application"]:
assert status["application"]["name"] == CLIENT_NAME
assert status["application"]["website"] == CLIENT_WEBSITE
def test_post_visibility(app, user, run):
for visibility in ["public", "unlisted", "private", "direct"]:
out = run("post", "foo", "--visibility", visibility)
status_id = posted_status_id(out)
status = api.fetch_status(app, user, status_id)
assert status["visibility"] == visibility
def test_post_scheduled_at(app, user, run):
text = str(uuid.uuid4())
scheduled_at = datetime.now(timezone.utc).replace(microsecond=0) + timedelta(minutes=10)
out = run("post", text, "--scheduled-at", scheduled_at.isoformat())
assert "Toot scheduled for" in out
statuses = api.scheduled_statuses(app, user)
[status] = [s for s in statuses if s["params"]["text"] == text]
assert datetime.strptime(status["scheduled_at"], "%Y-%m-%dT%H:%M:%S.%f%z") == scheduled_at
def test_post_scheduled_in(app, user, run):
text = str(uuid.uuid4())
variants = [
("1 day", timedelta(days=1)),
("1 day 6 hours", timedelta(days=1, hours=6)),
("1 day 6 hours 13 minutes", timedelta(days=1, hours=6, minutes=13)),
("1 day 6 hours 13 minutes 51 second", timedelta(days=1, hours=6, minutes=13, seconds=51)),
("2d", timedelta(days=2)),
("2d6h", timedelta(days=2, hours=6)),
("2d6h13m", timedelta(days=2, hours=6, minutes=13)),
("2d6h13m51s", timedelta(days=2, hours=6, minutes=13, seconds=51)),
]
datetimes = []
for scheduled_in, delta in variants:
out = run("post", text, "--scheduled-in", scheduled_in)
dttm = datetime.utcnow() + delta
assert out.startswith(f"Toot scheduled for: {str(dttm)[:16]}")
datetimes.append(dttm)
scheduled = api.scheduled_statuses(app, user)
scheduled = [s for s in scheduled if s["params"]["text"] == text]
scheduled = sorted(scheduled, key=lambda s: s["scheduled_at"])
assert len(scheduled) == 8
for expected, status in zip(datetimes, scheduled):
actual = datetime.strptime(status["scheduled_at"], "%Y-%m-%dT%H:%M:%S.%fZ")
delta = expected - actual
assert delta.total_seconds() < 5
def test_post_poll(app, user, run):
text = str(uuid.uuid4())
out = run(
"post", text,
"--poll-option", "foo",
"--poll-option", "bar",
"--poll-option", "baz",
"--poll-option", "qux",
)
status_id = posted_status_id(out)
status = api.fetch_status(app, user, status_id)
assert status["poll"]["expired"] is False
assert status["poll"]["multiple"] is False
assert status["poll"]["options"] == [
{"title": "foo", "votes_count": 0},
{"title": "bar", "votes_count": 0},
{"title": "baz", "votes_count": 0},
{"title": "qux", "votes_count": 0}
]
# Test expires_at is 24h by default
actual = datetime.strptime(status["poll"]["expires_at"], "%Y-%m-%dT%H:%M:%S.%f%z")
expected = datetime.now(timezone.utc) + timedelta(days=1)
delta = actual - expected
assert delta.total_seconds() < 5
def test_post_poll_multiple(app, user, run):
text = str(uuid.uuid4())
out = run(
"post", text,
"--poll-option", "foo",
"--poll-option", "bar",
"--poll-multiple"
)
status_id = posted_status_id(out)
status = api.fetch_status(app, user, status_id)
assert status["poll"]["multiple"] is True
def test_post_poll_expires_in(app, user, run):
text = str(uuid.uuid4())
out = run(
"post", text,
"--poll-option", "foo",
"--poll-option", "bar",
"--poll-expires-in", "8h",
)
status_id = posted_status_id(out)
status = api.fetch_status(app, user, status_id)
actual = datetime.strptime(status["poll"]["expires_at"], "%Y-%m-%dT%H:%M:%S.%f%z")
expected = datetime.now(timezone.utc) + timedelta(hours=8)
delta = actual - expected
assert delta.total_seconds() < 5
def test_post_poll_hide_totals(app, user, run):
text = str(uuid.uuid4())
out = run(
"post", text,
"--poll-option", "foo",
"--poll-option", "bar",
"--poll-hide-totals"
)
status_id = posted_status_id(out)
status = api.fetch_status(app, user, status_id)
# votes_count is None when totals are hidden
assert status["poll"]["options"] == [
{"title": "foo", "votes_count": None},
{"title": "bar", "votes_count": None},
]
def test_post_language(app, user, run):
out = run("post", "test", "--language", "hr")
status_id = posted_status_id(out)
status = api.fetch_status(app, user, status_id)
assert status["language"] == "hr"
out = run("post", "test", "--language", "zh")
status_id = posted_status_id(out)
status = api.fetch_status(app, user, status_id)
assert status["language"] == "zh"
def test_media_thumbnail(app, user, run):
video_path = path.join(ASSETS_DIR, "small.webm")
thumbnail_path = path.join(ASSETS_DIR, "test1.png")
out = run(
"post",
"--media", video_path,
"--thumbnail", thumbnail_path,
"--description", "foo",
"some text"
)
status_id = posted_status_id(out)
status = api.fetch_status(app, user, status_id)
[media] = status["media_attachments"]
assert media["description"] == "foo"
assert media["type"] == "video"
assert media["url"].endswith(".mp4")
assert media["preview_url"].endswith(".png")
# Video properties
assert int(media["meta"]["original"]["duration"]) == 5
assert media["meta"]["original"]["height"] == 320
assert media["meta"]["original"]["width"] == 560
# Thumbnail properties
assert media["meta"]["small"]["height"] == 50
assert media["meta"]["small"]["width"] == 50
def test_media_attachments(app, user, run):
path1 = path.join(ASSETS_DIR, "test1.png")
path2 = path.join(ASSETS_DIR, "test2.png")
path3 = path.join(ASSETS_DIR, "test3.png")
path4 = path.join(ASSETS_DIR, "test4.png")
out = run(
"post",
"--media", path1,
"--media", path2,
"--media", path3,
"--media", path4,
"--description", "Test 1",
"--description", "Test 2",
"--description", "Test 3",
"--description", "Test 4",
"some text"
)
status_id = posted_status_id(out)
status = api.fetch_status(app, user, status_id)
[a1, a2, a3, a4] = status["media_attachments"]
# Pleroma doesn't send metadata
if "meta" in a1:
assert a1["meta"]["original"]["size"] == "50x50"
assert a2["meta"]["original"]["size"] == "50x60"
assert a3["meta"]["original"]["size"] == "50x70"
assert a4["meta"]["original"]["size"] == "50x80"
assert a1["description"] == "Test 1"
assert a2["description"] == "Test 2"
assert a3["description"] == "Test 3"
assert a4["description"] == "Test 4"
@mock.patch("toot.utils.multiline_input")
@mock.patch("sys.stdin.read")
def test_media_attachment_without_text(mock_read, mock_ml, app, user, run):
# No status from stdin or readline
mock_read.return_value = ""
mock_ml.return_value = ""
media_path = path.join(ASSETS_DIR, "test1.png")
out = run("post", "--media", media_path)
status_id = posted_status_id(out)
status = api.fetch_status(app, user, status_id)
assert status["content"] == ""
[attachment] = status["media_attachments"]
assert not attachment["description"]
# Pleroma doesn't send metadata
if "meta" in attachment:
assert attachment["meta"]["original"]["size"] == "50x50"
def test_reply_thread(app, user, friend, run):
status = api.post_status(app, friend, "This is the status")
out = run("post", "--reply-to", status["id"], "This is the reply")
status_id = posted_status_id(out)
reply = api.fetch_status(app, user, status_id)
assert reply["in_reply_to_id"] == status["id"]
out = run("thread", status["id"])
[s1, s2] = [s.strip() for s in re.split(r"─+", out) if s.strip()]
assert "This is the status" in s1
assert "This is the reply" in s2
assert friend.username in s1
assert user.username in s2
assert status["id"] in s1
assert reply["id"] in s2

View File

@ -0,0 +1,83 @@
import pytest
from tests.integration.conftest import BASE_URL
from toot import api
from toot.exceptions import ConsoleError
def test_instance(app, run):
out = run("instance", "--disable-https")
assert "Mastodon" in out
assert app.instance in out
assert "running Mastodon" in out
def test_instance_anon(app, run_anon):
out = run_anon("instance", BASE_URL)
assert "Mastodon" in out
assert app.instance in out
assert "running Mastodon" in out
# Need to specify the instance name when running anon
with pytest.raises(ConsoleError) as exc:
run_anon("instance")
assert str(exc.value) == "Please specify an instance."
def test_whoami(user, run):
out = run("whoami")
# TODO: test other fields once updating account is supported
assert f"@{user.username}" in out
def test_whois(app, friend, run):
variants = [
friend.username,
f"@{friend.username}",
f"{friend.username}@{app.instance}",
f"@{friend.username}@{app.instance}",
]
for username in variants:
out = run("whois", username)
assert f"@{friend.username}" in out
def test_search_account(friend, run):
out = run("search", friend.username)
assert out == f"Accounts:\n* @{friend.username}"
def test_search_hashtag(app, user, run):
api.post_status(app, user, "#hashtag_x")
api.post_status(app, user, "#hashtag_y")
api.post_status(app, user, "#hashtag_z")
out = run("search", "#hashtag")
assert out == "Hashtags:\n#hashtag_x, #hashtag_y, #hashtag_z"
def test_tags(run):
out = run("tags_followed")
assert out == "You're not following any hashtags."
out = run("tags_follow", "foo")
assert out == "✓ You are now following #foo"
out = run("tags_followed")
assert out == f"* #foo\t{BASE_URL}/tags/foo"
out = run("tags_follow", "bar")
assert out == "✓ You are now following #bar"
out = run("tags_followed")
assert out == "\n".join([
f"* #bar\t{BASE_URL}/tags/bar",
f"* #foo\t{BASE_URL}/tags/foo",
])
out = run("tags_unfollow", "foo")
assert out == "✓ You are no longer following #foo"
out = run("tags_followed")
assert out == f"* #bar\t{BASE_URL}/tags/bar"

View File

@ -0,0 +1,89 @@
import time
import pytest
from toot import api
from toot.exceptions import NotFoundError
def test_delete_status(app, user, run):
status = api.post_status(app, user, "foo")
out = run("delete", status["id"])
assert out == "✓ Status deleted"
with pytest.raises(NotFoundError):
api.fetch_status(app, user, status["id"])
def test_favourite(app, user, run):
status = api.post_status(app, user, "foo")
assert not status["favourited"]
out = run("favourite", status["id"])
assert out == "✓ Status favourited"
status = api.fetch_status(app, user, status["id"])
assert status["favourited"]
out = run("unfavourite", status["id"])
assert out == "✓ Status unfavourited"
# A short delay is required before the server returns new data
time.sleep(0.1)
status = api.fetch_status(app, user, status["id"])
assert not status["favourited"]
def test_reblog(app, user, run):
status = api.post_status(app, user, "foo")
assert not status["reblogged"]
out = run("reblog", status["id"])
assert out == "✓ Status reblogged"
status = api.fetch_status(app, user, status["id"])
assert status["reblogged"]
out = run("reblogged_by", status["id"])
assert user.username in out
out = run("unreblog", status["id"])
assert out == "✓ Status unreblogged"
status = api.fetch_status(app, user, status["id"])
assert not status["reblogged"]
def test_pin(app, user, run):
status = api.post_status(app, user, "foo")
assert not status["pinned"]
out = run("pin", status["id"])
assert out == "✓ Status pinned"
status = api.fetch_status(app, user, status["id"])
assert status["pinned"]
out = run("unpin", status["id"])
assert out == "✓ Status unpinned"
status = api.fetch_status(app, user, status["id"])
assert not status["pinned"]
def test_bookmark(app, user, run):
status = api.post_status(app, user, "foo")
assert not status["bookmarked"]
out = run("bookmark", status["id"])
assert out == "✓ Status bookmarked"
status = api.fetch_status(app, user, status["id"])
assert status["bookmarked"]
out = run("unbookmark", status["id"])
assert out == "✓ Status unbookmarked"
status = api.fetch_status(app, user, status["id"])
assert not status["bookmarked"]

View File

@ -177,7 +177,7 @@ def test_timeline_with_re(mock_get, monkeypatch, capsys):
mock_get.assert_called_once_with(app, user, '/api/v1/timelines/home', {'limit': 10})
out, err = capsys.readouterr()
lines = out.split("\n")
lines = uncolorize(out).split("\n")
assert "Frank Zappa" in lines[1]
assert "@fz" in lines[1]
@ -349,6 +349,7 @@ def test_search(mock_get, capsys):
})
out, err = capsys.readouterr()
out = uncolorize(out)
assert "Hashtags:\n#foo, #bar, #baz" in out
assert "Accounts:" in out
assert "@thequeen Freddy Mercury" in out

View File

@ -1,758 +0,0 @@
"""
This module contains integration tests meant to run against a test Mastodon instance.
You can set up a test instance locally by following this guide:
https://docs.joinmastodon.org/dev/setup/
To enable integration tests, export the following environment variables to match
your test server and database:
```
export TOOT_TEST_HOSTNAME="localhost:3000"
export TOOT_TEST_DATABASE_DSN="dbname=mastodon_development"
```
"""
import os
import psycopg2
import pytest
import re
import time
import uuid
from datetime import datetime, timedelta, timezone
from os import path
from toot import CLIENT_NAME, CLIENT_WEBSITE, api, App, User
from toot.console import run_command
from toot.exceptions import ConsoleError, NotFoundError
from toot.utils import get_text
from unittest import mock
# Host name of a test instance to run integration tests against
# DO NOT USE PUBLIC INSTANCES!!!
BASE_URL = os.getenv("TOOT_TEST_BASE_URL")
# Mastodon database name, used to confirm user registration without having to click the link
DATABASE_DSN = os.getenv("TOOT_TEST_DATABASE_DSN")
# Toot logo used for testing image upload
TRUMPET = path.join(path.dirname(path.dirname(path.realpath(__file__))), "trumpet.png")
if not BASE_URL or not DATABASE_DSN:
pytest.skip("Skipping integration tests", allow_module_level=True)
# ------------------------------------------------------------------------------
# Fixtures
# ------------------------------------------------------------------------------
def create_app():
instance = api.get_instance(BASE_URL)
response = api.create_app(BASE_URL)
return App(instance["uri"], BASE_URL, response["client_id"], response["client_secret"])
def register_account(app: App):
username = str(uuid.uuid4())[-10:]
email = f"{username}@example.com"
response = api.register_account(app, username, email, "password", "en")
confirm_user(email)
return User(app.instance, username, response["access_token"])
def confirm_user(email):
conn = psycopg2.connect(DATABASE_DSN)
cursor = conn.cursor()
cursor.execute("UPDATE users SET confirmed_at = now() WHERE email = %s;", (email,))
conn.commit()
@pytest.fixture(scope="session")
def app():
return create_app()
@pytest.fixture(scope="session")
def user(app):
return register_account(app)
@pytest.fixture(scope="session")
def friend(app):
return register_account(app)
@pytest.fixture
def run(app, user, capsys):
def _run(command, *params, as_user=None):
run_command(app, as_user or user, command, params or [])
out, err = capsys.readouterr()
assert err == ""
return strip_ansi(out)
return _run
@pytest.fixture
def run_anon(capsys):
def _run(command, *params):
run_command(None, None, command, params or [])
out, err = capsys.readouterr()
assert err == ""
return strip_ansi(out)
return _run
# ------------------------------------------------------------------------------
# Tests
# ------------------------------------------------------------------------------
def test_instance(app, run):
out = run("instance", "--disable-https")
assert "Mastodon" in out
assert app.instance in out
assert "running Mastodon" in out
def test_instance_anon(app, run_anon):
out = run_anon("instance", BASE_URL)
assert "Mastodon" in out
assert app.instance in out
assert "running Mastodon" in out
# Need to specify the instance name when running anon
with pytest.raises(ConsoleError) as exc:
run_anon("instance")
assert str(exc.value) == "Please specify an instance."
def test_post(app, user, run):
text = "i wish i was a #lumberjack"
out = run("post", text)
status_id = _posted_status_id(out)
status = api.fetch_status(app, user, status_id)
assert text == get_text(status["content"])
assert status["visibility"] == "public"
assert status["sensitive"] is False
assert status["spoiler_text"] == ""
assert status["poll"] is None
# Pleroma doesn't return the application
if status["application"]:
assert status["application"]["name"] == CLIENT_NAME
assert status["application"]["website"] == CLIENT_WEBSITE
def test_post_visibility(app, user, run):
for visibility in ["public", "unlisted", "private", "direct"]:
out = run("post", "foo", "--visibility", visibility)
status_id = _posted_status_id(out)
status = api.fetch_status(app, user, status_id)
assert status["visibility"] == visibility
def test_post_scheduled_at(app, user, run):
text = str(uuid.uuid4())
scheduled_at = datetime.now(timezone.utc).replace(microsecond=0) + timedelta(minutes=10)
out = run("post", text, "--scheduled-at", scheduled_at.isoformat())
assert "Toot scheduled for" in out
statuses = api.scheduled_statuses(app, user)
[status] = [s for s in statuses if s["params"]["text"] == text]
assert datetime.strptime(status["scheduled_at"], "%Y-%m-%dT%H:%M:%S.%f%z") == scheduled_at
def test_post_scheduled_in(app, user, run):
text = str(uuid.uuid4())
variants = [
("1 day", timedelta(days=1)),
("1 day 6 hours", timedelta(days=1, hours=6)),
("1 day 6 hours 13 minutes", timedelta(days=1, hours=6, minutes=13)),
("1 day 6 hours 13 minutes 51 second", timedelta(days=1, hours=6, minutes=13, seconds=51)),
("2d", timedelta(days=2)),
("2d6h", timedelta(days=2, hours=6)),
("2d6h13m", timedelta(days=2, hours=6, minutes=13)),
("2d6h13m51s", timedelta(days=2, hours=6, minutes=13, seconds=51)),
]
datetimes = []
for scheduled_in, delta in variants:
out = run("post", text, "--scheduled-in", scheduled_in)
dttm = datetime.utcnow() + delta
assert out.startswith(f"Toot scheduled for: {str(dttm)[:16]}")
datetimes.append(dttm)
scheduled = api.scheduled_statuses(app, user)
scheduled = [s for s in scheduled if s["params"]["text"] == text]
scheduled = sorted(scheduled, key=lambda s: s["scheduled_at"])
assert len(scheduled) == 8
for expected, status in zip(datetimes, scheduled):
actual = datetime.strptime(status["scheduled_at"], "%Y-%m-%dT%H:%M:%S.%fZ")
delta = expected - actual
assert delta.total_seconds() < 5
def test_post_poll(app, user, run):
text = str(uuid.uuid4())
out = run(
"post", text,
"--poll-option", "foo",
"--poll-option", "bar",
"--poll-option", "baz",
"--poll-option", "qux",
)
status_id = _posted_status_id(out)
status = api.fetch_status(app, user, status_id)
assert status["poll"]["expired"] is False
assert status["poll"]["multiple"] is False
assert status["poll"]["options"] == [
{"title": "foo", "votes_count": 0},
{"title": "bar", "votes_count": 0},
{"title": "baz", "votes_count": 0},
{"title": "qux", "votes_count": 0}
]
# Test expires_at is 24h by default
actual = datetime.strptime(status["poll"]["expires_at"], "%Y-%m-%dT%H:%M:%S.%f%z")
expected = datetime.now(timezone.utc) + timedelta(days=1)
delta = actual - expected
assert delta.total_seconds() < 5
def test_post_poll_multiple(app, user, run):
text = str(uuid.uuid4())
out = run(
"post", text,
"--poll-option", "foo",
"--poll-option", "bar",
"--poll-multiple"
)
status_id = _posted_status_id(out)
status = api.fetch_status(app, user, status_id)
assert status["poll"]["multiple"] is True
def test_post_poll_expires_in(app, user, run):
text = str(uuid.uuid4())
out = run(
"post", text,
"--poll-option", "foo",
"--poll-option", "bar",
"--poll-expires-in", "8h",
)
status_id = _posted_status_id(out)
status = api.fetch_status(app, user, status_id)
actual = datetime.strptime(status["poll"]["expires_at"], "%Y-%m-%dT%H:%M:%S.%f%z")
expected = datetime.now(timezone.utc) + timedelta(hours=8)
delta = actual - expected
assert delta.total_seconds() < 5
def test_post_poll_hide_totals(app, user, run):
text = str(uuid.uuid4())
out = run(
"post", text,
"--poll-option", "foo",
"--poll-option", "bar",
"--poll-hide-totals"
)
status_id = _posted_status_id(out)
status = api.fetch_status(app, user, status_id)
# votes_count is None when totals are hidden
assert status["poll"]["options"] == [
{"title": "foo", "votes_count": None},
{"title": "bar", "votes_count": None},
]
def test_post_language(app, user, run):
out = run("post", "test", "--language", "hr")
status_id = _posted_status_id(out)
status = api.fetch_status(app, user, status_id)
assert status["language"] == "hr"
out = run("post", "test", "--language", "zh")
status_id = _posted_status_id(out)
status = api.fetch_status(app, user, status_id)
assert status["language"] == "zh"
def test_media_thumbnail(app, user, run):
assets_dir = path.realpath(path.join(path.dirname(__file__), "assets"))
video_path = path.join(assets_dir, "small.webm")
thumbnail_path = path.join(assets_dir, "test1.png")
out = run(
"post",
"--media", video_path,
"--thumbnail", thumbnail_path,
"--description", "foo",
"some text"
)
status_id = _posted_status_id(out)
status = api.fetch_status(app, user, status_id)
[media] = status["media_attachments"]
assert media["description"] == "foo"
assert media["type"] == "video"
assert media["url"].endswith(".mp4")
assert media["preview_url"].endswith(".png")
# Video properties
assert int(media["meta"]["original"]["duration"]) == 5
assert media["meta"]["original"]["height"] == 320
assert media["meta"]["original"]["width"] == 560
# Thumbnail properties
assert media["meta"]["small"]["height"] == 50
assert media["meta"]["small"]["width"] == 50
def test_media_attachments(app, user, run):
assets_dir = path.realpath(path.join(path.dirname(__file__), "assets"))
path1 = path.join(assets_dir, "test1.png")
path2 = path.join(assets_dir, "test2.png")
path3 = path.join(assets_dir, "test3.png")
path4 = path.join(assets_dir, "test4.png")
out = run(
"post",
"--media", path1,
"--media", path2,
"--media", path3,
"--media", path4,
"--description", "Test 1",
"--description", "Test 2",
"--description", "Test 3",
"--description", "Test 4",
"some text"
)
status_id = _posted_status_id(out)
status = api.fetch_status(app, user, status_id)
[a1, a2, a3, a4] = status["media_attachments"]
# Pleroma doesn't send metadata
if "meta" in a1:
assert a1["meta"]["original"]["size"] == "50x50"
assert a2["meta"]["original"]["size"] == "50x60"
assert a3["meta"]["original"]["size"] == "50x70"
assert a4["meta"]["original"]["size"] == "50x80"
assert a1["description"] == "Test 1"
assert a2["description"] == "Test 2"
assert a3["description"] == "Test 3"
assert a4["description"] == "Test 4"
@mock.patch("toot.utils.multiline_input")
@mock.patch("sys.stdin.read")
def test_media_attachment_without_text(mock_read, mock_ml, app, user, run):
# No status from stdin or readline
mock_read.return_value = ""
mock_ml.return_value = ""
assets_dir = path.realpath(path.join(path.dirname(__file__), "assets"))
media_path = path.join(assets_dir, "test1.png")
out = run("post", "--media", media_path)
status_id = _posted_status_id(out)
status = api.fetch_status(app, user, status_id)
assert status["content"] == ""
[attachment] = status["media_attachments"]
assert not attachment["description"]
# Pleroma doesn't send metadata
if "meta" in attachment:
assert attachment["meta"]["original"]["size"] == "50x50"
def test_delete_status(app, user, run):
status = api.post_status(app, user, "foo")
out = run("delete", status["id"])
assert out == "✓ Status deleted"
with pytest.raises(NotFoundError):
api.fetch_status(app, user, status["id"])
def test_reply_thread(app, user, friend, run):
status = api.post_status(app, friend, "This is the status")
out = run("post", "--reply-to", status["id"], "This is the reply")
status_id = _posted_status_id(out)
reply = api.fetch_status(app, user, status_id)
assert reply["in_reply_to_id"] == status["id"]
out = run("thread", status["id"])
[s1, s2] = [s.strip() for s in re.split(r"─+", out) if s.strip()]
assert "This is the status" in s1
assert "This is the reply" in s2
assert friend.username in s1
assert user.username in s2
assert status["id"] in s1
assert reply["id"] in s2
def test_favourite(app, user, run):
status = api.post_status(app, user, "foo")
assert not status["favourited"]
out = run("favourite", status["id"])
assert out == "✓ Status favourited"
status = api.fetch_status(app, user, status["id"])
assert status["favourited"]
out = run("unfavourite", status["id"])
assert out == "✓ Status unfavourited"
# A short delay is required before the server returns new data
time.sleep(0.1)
status = api.fetch_status(app, user, status["id"])
assert not status["favourited"]
def test_reblog(app, user, run):
status = api.post_status(app, user, "foo")
assert not status["reblogged"]
out = run("reblog", status["id"])
assert out == "✓ Status reblogged"
status = api.fetch_status(app, user, status["id"])
assert status["reblogged"]
out = run("reblogged_by", status["id"])
assert out == f"@{user.username}"
out = run("unreblog", status["id"])
assert out == "✓ Status unreblogged"
status = api.fetch_status(app, user, status["id"])
assert not status["reblogged"]
def test_pin(app, user, run):
status = api.post_status(app, user, "foo")
assert not status["pinned"]
out = run("pin", status["id"])
assert out == "✓ Status pinned"
status = api.fetch_status(app, user, status["id"])
assert status["pinned"]
out = run("unpin", status["id"])
assert out == "✓ Status unpinned"
status = api.fetch_status(app, user, status["id"])
assert not status["pinned"]
def test_bookmark(app, user, run):
status = api.post_status(app, user, "foo")
assert not status["bookmarked"]
out = run("bookmark", status["id"])
assert out == "✓ Status bookmarked"
status = api.fetch_status(app, user, status["id"])
assert status["bookmarked"]
out = run("unbookmark", status["id"])
assert out == "✓ Status unbookmarked"
status = api.fetch_status(app, user, status["id"])
assert not status["bookmarked"]
def test_whoami(user, run):
out = run("whoami")
# TODO: test other fields once updating account is supported
assert f"@{user.username}" in out
def test_whois(app, friend, run):
variants = [
friend.username,
f"@{friend.username}",
f"{friend.username}@{app.instance}",
f"@{friend.username}@{app.instance}",
]
for username in variants:
out = run("whois", username)
assert f"@{friend.username}" in out
def test_search_account(friend, run):
out = run("search", friend.username)
assert out == f"Accounts:\n* @{friend.username}"
def test_search_hashtag(app, user, run):
api.post_status(app, user, "#hashtag_x")
api.post_status(app, user, "#hashtag_y")
api.post_status(app, user, "#hashtag_z")
out = run("search", "#hashtag")
assert out == "Hashtags:\n#hashtag_x, #hashtag_y, #hashtag_z"
def test_follow(friend, run):
out = run("follow", friend.username)
assert out == f"✓ You are now following {friend.username}"
out = run("unfollow", friend.username)
assert out == f"✓ You are no longer following {friend.username}"
def test_follow_case_insensitive(friend, run):
username = friend.username.upper()
out = run("follow", username)
assert out == f"✓ You are now following {username}"
out = run("unfollow", username)
assert out == f"✓ You are no longer following {username}"
# TODO: improve testing stderr, catching exceptions is not optimal
def test_follow_not_found(run):
with pytest.raises(ConsoleError) as ex_info:
run("follow", "banana")
assert str(ex_info.value) == "Account not found"
def test_mute(app, user, friend, run):
out = run("mute", friend.username)
assert out == f"✓ You have muted {friend.username}"
[muted_account] = api.get_muted_accounts(app, user)
assert muted_account["acct"] == friend.username
out = run("unmute", friend.username)
assert out == f"{friend.username} is no longer muted"
assert api.get_muted_accounts(app, user) == []
def test_block(app, user, friend, run):
out = run("block", friend.username)
assert out == f"✓ You are now blocking {friend.username}"
[blockd_account] = api.get_blocked_accounts(app, user)
assert blockd_account["acct"] == friend.username
out = run("unblock", friend.username)
assert out == f"{friend.username} is no longer blocked"
assert api.get_blocked_accounts(app, user) == []
def test_following_followers(user, friend, run):
out = run("following", user.username)
assert out == ""
run("follow", friend.username)
out = run("following", user.username)
assert out == f"* @{friend.username}"
out = run("followers", friend.username)
assert out == f"* @{user.username}"
def test_tags(run):
out = run("tags_followed")
assert out == "You're not following any hashtags."
out = run("tags_follow", "foo")
assert out == "✓ You are now following #foo"
out = run("tags_followed")
assert out == f"* #foo\t{BASE_URL}/tags/foo"
out = run("tags_follow", "bar")
assert out == "✓ You are now following #bar"
out = run("tags_followed")
assert out == "\n".join([
f"* #bar\t{BASE_URL}/tags/bar",
f"* #foo\t{BASE_URL}/tags/foo",
])
out = run("tags_unfollow", "foo")
assert out == "✓ You are no longer following #foo"
out = run("tags_followed")
assert out == f"* #bar\t{BASE_URL}/tags/bar"
def test_update_account_no_options(run):
with pytest.raises(ConsoleError) as exc:
run("update_account")
assert str(exc.value) == "Please specify at least one option to update the account"
def test_update_account_display_name(run, app, user):
out = run("update_account", "--display-name", "elwood")
assert out == "✓ Account updated"
account = api.verify_credentials(app, user)
assert account["display_name"] == "elwood"
def test_update_account_note(run, app, user):
note = ("It's 106 miles to Chicago, we got a full tank of gas, half a pack "
"of cigarettes, it's dark... and we're wearing sunglasses.")
out = run("update_account", "--note", note)
assert out == "✓ Account updated"
account = api.verify_credentials(app, user)
assert get_text(account["note"]) == note
def test_update_account_language(run, app, user):
out = run("update_account", "--language", "hr")
assert out == "✓ Account updated"
account = api.verify_credentials(app, user)
assert account["source"]["language"] == "hr"
def test_update_account_privacy(run, app, user):
out = run("update_account", "--privacy", "private")
assert out == "✓ Account updated"
account = api.verify_credentials(app, user)
assert account["source"]["privacy"] == "private"
def test_update_account_avatar(run, app, user):
account = api.verify_credentials(app, user)
old_value = account["avatar"]
out = run("update_account", "--avatar", TRUMPET)
assert out == "✓ Account updated"
account = api.verify_credentials(app, user)
assert account["avatar"] != old_value
def test_update_account_header(run, app, user):
account = api.verify_credentials(app, user)
old_value = account["header"]
out = run("update_account", "--header", TRUMPET)
assert out == "✓ Account updated"
account = api.verify_credentials(app, user)
assert account["header"] != old_value
def test_update_account_locked(run, app, user):
out = run("update_account", "--locked")
assert out == "✓ Account updated"
account = api.verify_credentials(app, user)
assert account["locked"] is True
out = run("update_account", "--no-locked")
assert out == "✓ Account updated"
account = api.verify_credentials(app, user)
assert account["locked"] is False
def test_update_account_bot(run, app, user):
out = run("update_account", "--bot")
assert out == "✓ Account updated"
account = api.verify_credentials(app, user)
assert account["bot"] is True
out = run("update_account", "--no-bot")
assert out == "✓ Account updated"
account = api.verify_credentials(app, user)
assert account["bot"] is False
def test_update_account_discoverable(run, app, user):
out = run("update_account", "--discoverable")
assert out == "✓ Account updated"
account = api.verify_credentials(app, user)
assert account["discoverable"] is True
out = run("update_account", "--no-discoverable")
assert out == "✓ Account updated"
account = api.verify_credentials(app, user)
assert account["discoverable"] is False
def test_update_account_sensitive(run, app, user):
out = run("update_account", "--sensitive")
assert out == "✓ Account updated"
account = api.verify_credentials(app, user)
assert account["source"]["sensitive"] is True
out = run("update_account", "--no-sensitive")
assert out == "✓ Account updated"
account = api.verify_credentials(app, user)
assert account["source"]["sensitive"] is False
# ------------------------------------------------------------------------------
# Utils
# ------------------------------------------------------------------------------
strip_ansi_pattern = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
def strip_ansi(string):
return strip_ansi_pattern.sub("", string).strip()
def _posted_status_id(out):
pattern = re.compile(r"Toot posted: http://([^/]+)/([^/]+)/(.+)")
match = re.search(pattern, out)
assert match
_, _, status_id = match.groups()
return status_id

View File

@ -519,3 +519,45 @@ def clear_notifications(app, user):
def get_instance(base_url):
url = f"{base_url}/api/v1/instance"
return http.anon_get(url).json()
def get_lists(app, user):
path = "/api/v1/lists"
return _get_response_list(app, user, path)
def find_list_id(app, user, title):
lists = get_lists(app, user)
for list_item in lists:
if list_item["title"] == title:
return list_item["id"]
return None
def get_list_accounts(app, user, list_id):
path = f"/api/v1/lists/{list_id}/accounts"
return _get_response_list(app, user, path)
def create_list(app, user, title, replies_policy):
url = "/api/v1/lists"
json = {'title': title}
if replies_policy:
json['replies_policy'] = replies_policy
return http.post(app, user, url, json=json).json()
def delete_list(app, user, id):
return http.delete(app, user, f"/api/v1/lists/{id}")
def add_accounts_to_list(app, user, list_id, account_ids):
url = f"/api/v1/lists/{list_id}/accounts"
json = {'account_ids': account_ids}
return http.post(app, user, url, json=json).json()
def remove_accounts_from_list(app, user, list_id, account_ids):
url = f"/api/v1/lists/{list_id}/accounts"
json = {'account_ids': account_ids}
return http.delete(app, user, url, json=json)

View File

@ -1,3 +1,4 @@
import sys
import platform
@ -6,9 +7,9 @@ from time import sleep, time
from toot import api, config, __version__
from toot.auth import login_interactive, login_browser_interactive, create_app_interactive
from toot.exceptions import ApiError, ConsoleError
from toot.output import (print_out, print_instance, print_account, print_acct_list,
print_search_results, print_timeline, print_notifications,
print_tag_list)
from toot.output import (print_lists, print_out, print_instance, print_account, print_acct_list,
print_search_results, print_table, print_timeline, print_notifications,
print_tag_list, print_list_accounts)
from toot.tui.utils import parse_datetime
from toot.utils import args_get_instance, delete_tmp_status_file, editor_input, multiline_input, EOF_KEY
@ -423,6 +424,71 @@ def tags_followed(app, user, args):
print_tag_list(response)
def lists(app, user, args):
lists = api.get_lists(app, user)
if lists:
print_lists(lists)
else:
print_out("You have no lists defined.")
def list_accounts(app, user, args):
list_id = _get_list_id(app, user, args)
response = api.get_list_accounts(app, user, list_id)
print_list_accounts(response)
def list_create(app, user, args):
api.create_list(app, user, title=args.title, replies_policy=args.replies_policy)
print_out(f"<green>✓ List \"{args.title}\" created.</green>")
def list_delete(app, user, args):
list_id = _get_list_id(app, user, args)
api.delete_list(app, user, list_id)
print_out(f"<green>✓ List \"{args.title if args.title else args.id}\"</green> <red>deleted.</red>")
def list_add(app, user, args):
list_id = _get_list_id(app, user, args)
account = find_account(app, user, args.account)
try:
api.add_accounts_to_list(app, user, list_id, [account['id']])
except Exception as ex:
# if we failed to add the account, try to give a
# more specific error message than "record not found"
my_accounts = api.followers(app, user, account['id'])
found = False
if my_accounts:
for my_account in my_accounts:
if my_account['id'] == account['id']:
found = True
break
if found is False:
print_out(f"<red>You must follow @{account['acct']} before adding this account to a list.</red>")
else:
print_out(f"<red>{ex}</red>")
return
print_out(f"<green>✓ Added account \"{args.account}\"</green>")
def list_remove(app, user, args):
list_id = _get_list_id(app, user, args)
account = find_account(app, user, args.account)
api.remove_accounts_from_list(app, user, list_id, [account['id']])
print_out(f"<green>✓ Removed account \"{args.account}\"</green>")
def _get_list_id(app, user, args):
list_id = args.id or api.find_list_id(app, user, args.title)
if not list_id:
raise ConsoleError("List not found")
return list_id
def mute(app, user, args):
account = find_account(app, user, args.account)
api.mute(app, user, account['id'])

View File

@ -724,6 +724,101 @@ TAG_COMMANDS = [
),
]
LIST_COMMANDS = [
Command(
name="lists",
description="List all lists",
arguments=[],
require_auth=True,
),
Command(
name="list_accounts",
description="List the accounts in a list",
arguments=[
(["--id"], {
"type": str,
"help": "ID of the list"
}),
(["title"], {
"type": str,
"nargs": "?",
"help": "title of the list"
}),
],
require_auth=True,
),
Command(
name="list_create",
description="Create a list",
arguments=[
(["title"], {
"type": str,
"help": "title of the list"
}),
(["--replies-policy"], {
"type": str,
"help": "replies policy: 'followed', 'list', or 'none' (defaults to 'none')"
}),
],
require_auth=True,
),
Command(
name="list_delete",
description="Delete a list",
arguments=[
(["--id"], {
"type": str,
"help": "ID of the list"
}),
(["title"], {
"type": str,
"nargs": "?",
"help": "title of the list"
}),
],
require_auth=True,
),
Command(
name="list_add",
description="Add account to list",
arguments=[
(["--id"], {
"type": str,
"help": "ID of the list"
}),
(["title"], {
"type": str,
"nargs": "?",
"help": "title of the list"
}),
(["account"], {
"type": str,
"help": "Account to add"
}),
],
require_auth=True,
),
Command(
name="list_remove",
description="Remove account from list",
arguments=[
(["--id"], {
"type": str,
"help": "ID of the list"
}),
(["title"], {
"type": str,
"nargs": "?",
"help": "title of the list"
}),
(["account"], {
"type": str,
"help": "Account to remove"
}),
],
require_auth=True,
),
]
COMMAND_GROUPS = [
("Authentication", AUTH_COMMANDS),
("TUI", TUI_COMMANDS),
@ -732,6 +827,7 @@ COMMAND_GROUPS = [
("Status", STATUS_COMMANDS),
("Accounts", ACCOUNTS_COMMANDS),
("Hashtags", TAG_COMMANDS),
("Lists", LIST_COMMANDS),
]
COMMANDS = list(chain(*[commands for _, commands in COMMAND_GROUPS]))

View File

@ -92,13 +92,13 @@ def patch(app, user, path, headers=None, files=None, data=None, json=None):
return process_response(response)
def delete(app, user, path, data=None, headers=None):
def delete(app, user, path, data=None, json=None, headers=None):
url = app.base_url + path
headers = headers or {}
headers["Authorization"] = f"Bearer {user.access_token}"
request = Request('DELETE', url, headers=headers, json=data)
request = Request('DELETE', url, headers=headers, data=data, json=json)
response = send_request(request)
return process_response(response)

View File

@ -3,9 +3,10 @@ import re
import sys
import textwrap
from toot.tui.utils import parse_datetime
from typing import List
from wcwidth import wcswidth
from toot.tui.utils import parse_datetime
from toot.utils import get_text, parse_html
from toot.wcstring import wc_wrap
@ -210,6 +211,43 @@ def print_tag_list(tags):
print_out("You're not following any hashtags.")
def print_lists(lists):
headers = ["ID", "Title", "Replies"]
data = [[lst["id"], lst["title"], lst["replies_policy"]] for lst in lists]
print_table(headers, data)
def print_table(headers: List[str], data: List[List[str]]):
widths = [[len(cell) for cell in row] for row in data + [headers]]
widths = [max(width) for width in zip(*widths)]
def style(string, tag):
return f"<{tag}>{string}</{tag}>" if tag else string
def print_row(row, tag=None):
for idx, cell in enumerate(row):
width = widths[idx]
print_out(style(cell.ljust(width), tag), end="")
print_out(" ", end="")
print_out()
underlines = ["-" * width for width in widths]
print_row(headers, "bold")
print_row(underlines, "dim")
for row in data:
print_row(row)
def print_list_accounts(accounts):
if accounts:
print_out("Accounts in list</green>:\n")
print_acct_list(accounts)
else:
print_out("This list has no accounts.")
def print_search_results(results):
accounts = results['accounts']
hashtags = results['hashtags']

View File

@ -403,7 +403,9 @@ class TUI(urwid.Frame):
def show_goto_menu(self):
user_timelines = self.config.get("timelines", {})
menu = GotoMenu(user_timelines)
user_lists = api.get_lists(self.app, self.user) or []
menu = GotoMenu(user_timelines, user_lists)
urwid.connect_signal(menu, "home_timeline",
lambda x: self.goto_home_timeline())
urwid.connect_signal(menu, "public_timeline",
@ -416,10 +418,12 @@ class TUI(urwid.Frame):
lambda x, local: self.goto_conversations())
urwid.connect_signal(menu, "hashtag_timeline",
lambda x, tag, local: self.goto_tag_timeline(tag, local=local))
urwid.connect_signal(menu, "list_timeline",
lambda x, list_item: self.goto_list_timeline(list_item))
self.open_overlay(menu, title="Go to", options=dict(
align="center", width=("relative", 60),
valign="middle", height=16 + len(user_timelines),
valign="middle", height=17 + len(user_timelines) + len(user_lists),
))
def show_help(self):
@ -473,6 +477,13 @@ class TUI(urwid.Frame):
)
promise.add_done_callback(lambda *args: self.close_overlay())
def goto_list_timeline(self, list_item):
self.timeline_generator = api.timeline_list_generator(
self.app, self.user, list_item['id'], limit=40)
promise = self.async_load_timeline(
is_initial=True, timeline_name=f"\N{clipboard}{list_item['title']}")
promise.add_done_callback(lambda *args: self.close_overlay())
def show_media(self, status):
urls = [m["url"] for m in status.original.data["media_attachments"]]
if urls:
@ -689,12 +700,19 @@ class TUI(urwid.Frame):
def refresh_timeline(self):
# No point in refreshing the bookmarks timeline
if not self.timeline or self.timeline.name == 'bookmarks':
# and we don't have a good way to refresh a
# list timeline yet (no reference to list ID kept)
if (not self.timeline
or self.timeline.name == 'bookmarks'
or self.timeline.name.startswith("\N{clipboard}")):
return
if self.timeline.name.startswith("#"):
self.timeline_generator = api.tag_timeline_generator(
self.app, self.user, self.timeline.name[1:], limit=40)
elif self.timeline.name.startswith("\N{clipboard}"):
self.timeline_generator = api.tag_timeline_generator(
self.app, self.user, self.timeline.name[1:], limit=40)
else:
if self.timeline.name.endswith("public"):
self.timeline_generator = api.public_timeline_generator(

View File

@ -106,20 +106,21 @@ class GotoMenu(urwid.ListBox):
"bookmark_timeline",
"notification_timeline",
"conversation_timeline",
"list_timeline",
]
def __init__(self, user_timelines):
def __init__(self, user_timelines, user_lists):
self.hash_edit = EditBox(caption="Hashtag: ")
self.message_widget = urwid.Text("")
actions = list(self.generate_actions(user_timelines))
actions = list(self.generate_actions(user_timelines, user_lists))
walker = urwid.SimpleFocusListWalker(actions)
super().__init__(walker)
def get_hashtag(self):
return self.hash_edit.edit_text.strip().lstrip("#")
def generate_actions(self, user_timelines):
def generate_actions(self, user_timelines, user_lists):
def _home(button):
self._emit("home_timeline")
@ -151,6 +152,11 @@ class GotoMenu(urwid.ListBox):
self._emit("hashtag_timeline", tag, local)
return on_press
def mk_on_press_user_list(list_item):
def on_press(btn):
self._emit("list_timeline", list_item)
return on_press
yield Button("Home timeline", on_press=_home)
yield Button("Local public timeline", on_press=_local_public)
yield Button("Global public timeline", on_press=_global_public)
@ -168,6 +174,10 @@ class GotoMenu(urwid.ListBox):
yield Button(f"#{tag}" + (" (local)" if is_local else ""),
on_press=mk_on_press_user_hashtag(tag, is_local))
for list_item in user_lists:
yield Button(f"\N{clipboard}{list_item['title']}",
on_press=mk_on_press_user_list(list_item))
yield urwid.Divider()
yield self.hash_edit
yield Button("Local hashtag timeline", on_press=lambda x: _hashtag(True))

View File

@ -423,4 +423,4 @@ class ScrollBar(urwid.WidgetDecoration):
ow.set_scrollpos(pos + 1)
return True
return False
return False

View File

@ -6,6 +6,7 @@ from PIL import Image, ImageOps
from term_image.image import AutoImage
from term_image.widget import UrwidImage
from .utils import can_render_pixels
from wcwidth import wcswidth
class Clickable:
@ -47,12 +48,12 @@ class Button(urwid.AttrWrap):
"""Styled button."""
def __init__(self, *args, **kwargs):
button = urwid.Button(*args, **kwargs)
padding = urwid.Padding(button, width=len(args[0]) + 4)
padding = urwid.Padding(button, width=wcswidth(args[0]) + 4)
return super().__init__(padding, "button", "button_focused")
def set_label(self, *args, **kwargs):
self.original_widget.original_widget.set_label(*args, **kwargs)
self.original_widget.width = len(args[0]) + 4
self.original_widget.width = wcswidth(args[0]) + 4
class CheckBox(urwid.AttrWrap):