Migrate timeline commands

This commit is contained in:
Ivan Habunek 2023-12-03 07:07:18 +01:00
parent 69a11f3569
commit 2429d9f751
No known key found for this signature in database
GPG Key ID: F5F0623FF5EBCB3D
8 changed files with 448 additions and 23 deletions

View File

@ -15,7 +15,7 @@ test:
coverage:
coverage erase
coverage run
coverage html --omit toot/tui/*
coverage html --omit "toot/tui/*"
coverage report
clean :

View File

@ -115,12 +115,20 @@ def runner():
@pytest.fixture
def run(app, user, runner):
def _run(command, *params, as_user=None, input=None) -> Result:
ctx = Context(app, as_user or user)
def _run(command, *params, input=None) -> Result:
ctx = Context(app, user)
return runner.invoke(command, params, obj=ctx, input=input)
return _run
@pytest.fixture
def run_as(app, runner):
def _run_as(user, command, *params, input=None) -> Result:
ctx = Context(app, user)
return runner.invoke(command, params, obj=ctx, input=input)
return _run_as
@pytest.fixture
def run_json(app, user, runner):
def _run_json(command, *params):

View File

@ -0,0 +1,198 @@
import pytest
from time import sleep
from uuid import uuid4
from toot import api, cli
from toot.entities import from_dict, Status
from tests.integration.conftest import TOOT_TEST_BASE_URL, register_account
# TODO: If fixture is not overriden here, tests fail, not sure why, figure it out
@pytest.fixture(scope="module")
def user(app):
return register_account(app)
@pytest.fixture(scope="module")
def other_user(app):
return register_account(app)
@pytest.fixture(scope="module")
def friend_user(app, user):
friend = register_account(app)
friend_account = api.find_account(app, user, friend.username)
api.follow(app, user, friend_account["id"])
return friend
@pytest.fixture(scope="module")
def friend_list(app, user, friend_user):
friend_account = api.find_account(app, user, friend_user.username)
list = api.create_list(app, user, str(uuid4()))
api.add_accounts_to_list(app, user, list["id"], account_ids=[friend_account["id"]])
return list
def test_timelines(app, user, other_user, friend_user, friend_list, run):
status1 = _post_status(app, user, "#foo")
status2 = _post_status(app, other_user, "#bar")
status3 = _post_status(app, friend_user, "#foo #bar")
# Give mastodon time to process things :/
# Tests fail if this is removed, required delay depends on server speed
sleep(1)
# Home timeline
result = run(cli.timeline)
assert result.exit_code == 0
assert status1.id in result.stdout
assert status2.id not in result.stdout
assert status3.id in result.stdout
# Public timeline
result = run(cli.timeline, "--public")
assert result.exit_code == 0
assert status1.id in result.stdout
assert status2.id in result.stdout
assert status3.id in result.stdout
# Anon public timeline
result = run(cli.timeline, "--instance", TOOT_TEST_BASE_URL, "--public")
assert result.exit_code == 0
assert status1.id in result.stdout
assert status2.id in result.stdout
assert status3.id in result.stdout
# Tag timeline
result = run(cli.timeline, "--tag", "foo")
assert result.exit_code == 0
assert status1.id in result.stdout
assert status2.id not in result.stdout
assert status3.id in result.stdout
result = run(cli.timeline, "--tag", "bar")
assert result.exit_code == 0
assert status1.id not in result.stdout
assert status2.id in result.stdout
assert status3.id in result.stdout
# Anon tag timeline
result = run(cli.timeline, "--instance", TOOT_TEST_BASE_URL, "--tag", "foo")
assert result.exit_code == 0
assert status1.id in result.stdout
assert status2.id not in result.stdout
assert status3.id in result.stdout
# List timeline (by list name)
result = run(cli.timeline, "--list", friend_list["title"])
assert result.exit_code == 0
assert status1.id not in result.stdout
assert status2.id not in result.stdout
assert status3.id in result.stdout
# List timeline (by list ID)
result = run(cli.timeline, "--list", friend_list["id"])
assert result.exit_code == 0
assert status1.id not in result.stdout
assert status2.id not in result.stdout
assert status3.id in result.stdout
# Account timeline
result = run(cli.timeline, "--account", friend_user.username)
assert result.exit_code == 0
assert status1.id not in result.stdout
assert status2.id not in result.stdout
assert status3.id in result.stdout
result = run(cli.timeline, "--account", other_user.username)
assert result.exit_code == 0
assert status1.id not in result.stdout
assert status2.id in result.stdout
assert status3.id not in result.stdout
def test_empty_timeline(app, run_as):
user = register_account(app)
result = run_as(user, cli.timeline)
assert result.exit_code == 0
assert result.stdout.strip() == "" * 100
def test_timeline_cant_combine_timelines(run):
result = run(cli.timeline, "--tag", "foo", "--account", "bar")
assert result.exit_code == 1
assert result.stderr.strip() == "Error: Only one of --public, --tag, --account, or --list can be used at one time."
def test_timeline_local_needs_public_or_tag(run):
result = run(cli.timeline, "--local")
assert result.exit_code == 1
assert result.stderr.strip() == "Error: The --local option is only valid alongside --public or --tag."
def test_timeline_instance_needs_public_or_tag(run):
result = run(cli.timeline, "--instance", TOOT_TEST_BASE_URL)
assert result.exit_code == 1
assert result.stderr.strip() == "Error: The --instance option is only valid alongside --public or --tag."
def test_bookmarks(app, user, run):
status1 = _post_status(app, user)
status2 = _post_status(app, user)
api.bookmark(app, user, status1.id)
api.bookmark(app, user, status2.id)
result = run(cli.bookmarks)
assert result.exit_code == 0
assert status1.id in result.stdout
assert status2.id in result.stdout
assert result.stdout.find(status1.id) > result.stdout.find(status2.id)
result = run(cli.bookmarks, "--reverse")
assert result.exit_code == 0
assert status1.id in result.stdout
assert status2.id in result.stdout
assert result.stdout.find(status1.id) < result.stdout.find(status2.id)
def test_notifications(app, user, other_user, run):
result = run(cli.notifications)
assert result.exit_code == 0
assert result.stdout.strip() == "You have no notifications"
text = f"Paging doctor @{user.username}"
status = _post_status(app, other_user, text)
sleep(0.5) # grr
result = run(cli.notifications)
assert result.exit_code == 0
assert f"@{other_user.username} mentioned you" in result.stdout
assert status.id in result.stdout
assert text in result.stdout
result = run(cli.notifications, "--mentions")
assert result.exit_code == 0
assert f"@{other_user.username} mentioned you" in result.stdout
assert status.id in result.stdout
assert text in result.stdout
def test_notifications_follow(app, user, friend_user, run_as):
result = run_as(friend_user, cli.notifications)
assert result.exit_code == 0
assert f"@{user.username} now follows you" in result.stdout
result = run_as(friend_user, cli.notifications, "--mentions")
assert result.exit_code == 0
assert "now follows you" not in result.stdout
def _post_status(app, user, text=None) -> Status:
text = text or str(uuid4())
response = api.post_status(app, user, text)
return from_dict(Status, response.json())

View File

@ -8,7 +8,7 @@ from typing import BinaryIO, List, Optional
from urllib.parse import urlparse, urlencode, quote
from toot import App, User, http, CLIENT_NAME, CLIENT_WEBSITE
from toot.exceptions import AuthenticationError, ConsoleError
from toot.exceptions import ConsoleError
from toot.utils import drop_empty_values, str_bool, str_bool_nullable
@ -300,6 +300,35 @@ def reblogged_by(app, user, status_id) -> Response:
return http.get(app, user, url)
def get_timeline_generator(
app: Optional[App],
user: Optional[User],
base_url: Optional[str] = None,
account: Optional[str] = None,
list_id: Optional[str] = None,
tag: Optional[str] = None,
local: bool = False,
public: bool = False,
limit=20, # TODO
):
if public:
if base_url:
return anon_public_timeline_generator(base_url, local=local, limit=limit)
else:
return public_timeline_generator(app, user, local=local, limit=limit)
elif tag:
if base_url:
return anon_tag_timeline_generator(base_url, tag, limit=limit)
else:
return tag_timeline_generator(app, user, tag, local=local, limit=limit)
elif account:
return account_timeline_generator(app, user, account, limit=limit)
elif list_id:
return timeline_list_generator(app, user, list_id, limit=limit)
else:
return home_timeline_generator(app, user, limit=limit)
def _get_next_path(headers):
"""Given timeline response headers, returns the path to the next batch"""
links = headers.get('Link', '')
@ -309,6 +338,14 @@ def _get_next_path(headers):
return "?".join([parsed.path, parsed.query])
def _get_next_url(headers) -> Optional[str]:
"""Given timeline response headers, returns the url to the next batch"""
links = headers.get('Link', '')
match = re.match('<([^>]+)>; rel="next"', links)
if match:
return match.group(1)
def _timeline_generator(app, user, path, params=None):
while path:
response = http.get(app, user, path, params)
@ -369,7 +406,7 @@ def conversation_timeline_generator(app, user, limit=20):
return _conversation_timeline_generator(app, user, path, params)
def account_timeline_generator(app: App, user: User, account_name: str, replies=False, reblogs=False, limit=20):
def account_timeline_generator(app, user, account_name: str, replies=False, reblogs=False, limit=20):
account = find_account(app, user, account_name)
path = f"/api/v1/accounts/{account['id']}/statuses"
params = {"limit": limit, "exclude_replies": not replies, "exclude_reblogs": not reblogs}
@ -381,24 +418,23 @@ def timeline_list_generator(app, user, list_id, limit=20):
return _timeline_generator(app, user, path, {'limit': limit})
def _anon_timeline_generator(instance, path, params=None):
while path:
url = f"https://{instance}{path}"
def _anon_timeline_generator(url, params=None):
while url:
response = http.anon_get(url, params)
yield response.json()
path = _get_next_path(response.headers)
url = _get_next_url(response.headers)
def anon_public_timeline_generator(instance, local=False, limit=20):
path = '/api/v1/timelines/public'
params = {'local': str_bool(local), 'limit': limit}
return _anon_timeline_generator(instance, path, params)
def anon_public_timeline_generator(base_url, local=False, limit=20):
query = urlencode({"local": str_bool(local), "limit": limit})
url = f"{base_url}/api/v1/timelines/public?{query}"
return _anon_timeline_generator(url)
def anon_tag_timeline_generator(instance, hashtag, local=False, limit=20):
path = f"/api/v1/timelines/tag/{quote(hashtag)}"
params = {'local': str_bool(local), 'limit': limit}
return _anon_timeline_generator(instance, path, params)
def anon_tag_timeline_generator(base_url, hashtag, local=False, limit=20):
query = urlencode({"local": str_bool(local), "limit": limit})
url = f"{base_url}/api/v1/timelines/tag/{quote(hashtag)}?{query}"
return _anon_timeline_generator(url)
def get_media(app: App, user: User, id: str):
@ -538,8 +574,8 @@ def verify_credentials(app, user) -> Response:
return http.get(app, user, '/api/v1/accounts/verify_credentials')
def get_notifications(app, user, exclude_types=[], limit=20):
params = {"exclude_types[]": exclude_types, "limit": limit}
def get_notifications(app, user, types=[], exclude_types=[], limit=20):
params = {"types[]": types, "exclude_types[]": exclude_types, "limit": limit}
return http.get(app, user, '/api/v1/notifications', params).json()
@ -570,7 +606,7 @@ def get_list_accounts(app, user, list_id):
return _get_response_list(app, user, path)
def create_list(app, user, title, replies_policy):
def create_list(app, user, title, replies_policy="none"):
url = "/api/v1/lists"
json = {'title': title}
if replies_policy:

View File

@ -1,9 +1,11 @@
from toot.cli.base import cli, Context # noqa
# flake8: noqa
from toot.cli.base import cli, Context
from toot.cli.auth import *
from toot.cli.accounts import *
from toot.cli.auth import *
from toot.cli.lists import *
from toot.cli.post import *
from toot.cli.read import *
from toot.cli.statuses import *
from toot.cli.tags import *
from toot.cli.timelines import *

View File

@ -40,7 +40,7 @@ CONTEXT = dict(
# Data object to add to Click context
class Context(NamedTuple):
app: Optional[App] = None
app: Optional[App]
user: Optional[User] = None
color: bool = False
debug: bool = False

View File

@ -74,6 +74,7 @@ def instance(ctx: Context, instance_url: Optional[str], json: bool):
@json_option
@pass_context
def search(ctx: Context, query: str, resolve: bool, json: bool):
"""Search for users or hashtags"""
response = api.search(ctx.app, ctx.user, query, resolve)
if json:
print(response.text)

180
toot/cli/timelines.py Normal file
View File

@ -0,0 +1,180 @@
import sys
import click
from toot import api
from toot.cli.base import cli, pass_context, Context
from typing import Optional
from toot.cli.validators import validate_instance
from toot.entities import Notification, Status, from_dict
from toot.output import print_notifications, print_timeline
@cli.command()
@click.option(
"--instance", "-i",
callback=validate_instance,
help="""Domain or base URL of the instance from which to read,
e.g. 'mastodon.social' or 'https://mastodon.social'""",
)
@click.option("--account", "-a", help="Show account timeline")
@click.option("--list", help="Show list timeline")
@click.option("--tag", "-t", help="Show hashtag timeline")
@click.option("--public", "-p", is_flag=True, help="Show public timeline")
@click.option(
"--local", "-l", is_flag=True,
help="Show only statuses from the local instance (public and tag timelines only)"
)
@click.option(
"--reverse", "-r", is_flag=True,
help="Reverse the order of the shown timeline (new posts at the bottom)"
)
@click.option(
"--once", "-1", is_flag=True,
help="Only show the first <count> toots, do not prompt to continue"
)
@click.option(
"--count", "-c", type=int, default=10,
help="Number of posts per page (max 20)"
)
@pass_context
def timeline(
ctx: Context,
instance: Optional[str],
account: Optional[str],
list: Optional[str],
tag: Optional[str],
public: bool,
local: bool,
reverse: bool,
once: bool,
count: int,
):
"""Show recent items in a timeline
By default shows the home timeline.
"""
if len([arg for arg in [tag, list, public, account] if arg]) > 1:
raise click.ClickException("Only one of --public, --tag, --account, or --list can be used at one time.")
if local and not (public or tag):
raise click.ClickException("The --local option is only valid alongside --public or --tag.")
if instance and not (public or tag):
raise click.ClickException("The --instance option is only valid alongside --public or --tag.")
list_id = _get_list_id(ctx, list)
"""Show recent statuses in a timeline"""
generator = api.get_timeline_generator(
ctx.app,
ctx.user,
base_url=instance,
account=account,
list_id=list_id,
tag=tag,
public=public,
local=local,
limit=count,
)
_show_timeline(generator, reverse, once)
@cli.command()
@click.option(
"--reverse", "-r", is_flag=True,
help="Reverse the order of the shown timeline (new posts at the bottom)"
)
@click.option(
"--once", "-1", is_flag=True,
help="Only show the first <count> toots, do not prompt to continue"
)
@click.option(
"--count", "-c", type=int, default=10,
help="Number of posts per page (max 20)"
)
@pass_context
def bookmarks(
ctx: Context,
reverse: bool,
once: bool,
count: int,
):
"""Show recent statuses in a timeline"""
generator = api.bookmark_timeline_generator(ctx.app, ctx.user, limit=count)
_show_timeline(generator, reverse, once)
@cli.command()
@click.option("--clear", help="Dismiss all notifications and exit")
@click.option(
"--reverse", "-r", is_flag=True,
help="Reverse the order of the shown notifications (newest on top)"
)
@click.option(
"--mentions", "-m", is_flag=True,
help="Show only mentions"
)
@pass_context
def notifications(
ctx: Context,
clear: bool,
reverse: bool,
mentions: int,
):
"""Show notifications"""
if clear:
api.clear_notifications(ctx.app, ctx.user)
click.secho("✓ Notifications cleared", fg="green")
return
exclude = []
if mentions:
# Filter everything except mentions
# https://docs.joinmastodon.org/methods/notifications/
exclude = ["follow", "favourite", "reblog", "poll", "follow_request"]
notifications = api.get_notifications(ctx.app, ctx.user, exclude_types=exclude)
if not notifications:
click.echo("You have no notifications")
return
if reverse:
notifications = reversed(notifications)
notifications = [from_dict(Notification, n) for n in notifications]
print_notifications(notifications)
def _show_timeline(generator, reverse, once):
while True:
try:
items = next(generator)
except StopIteration:
click.echo("That's all folks.")
return
if reverse:
items = reversed(items)
statuses = [from_dict(Status, item) for item in items]
print_timeline(statuses)
if once or not sys.stdout.isatty():
break
char = input("\nContinue? [Y/n] ")
if char.lower() == "n":
break
def _get_list_id(ctx: Context, value: Optional[str]) -> Optional[str]:
if not value:
return None
lists = api.get_lists(ctx.app, ctx.user)
for list in lists:
if list["id"] == value or list["title"] == value:
return list["id"]