Compare commits

...

25 Commits

Author SHA1 Message Date
Ivan Habunek c639a2609c
Merge f5a465ff25 into 0fc2ec12f5 2024-04-13 08:29:57 +02:00
Daniel Schwarz 0fc2ec12f5
Display images 2024-04-13 08:28:28 +02:00
Ivan Habunek 07ad41960f
Capitalize visibility 2024-04-08 08:34:56 +02:00
Sandra Snan 07beba8c68
Fix --clear text issue
It's a click flag.
2024-04-08 08:32:05 +02:00
Sandra Snan 7244b2718f
Print visibility in CLI
I went with two spaces before and after but feel free to change that
to whatever! Having the visibility printed this way is pretty useful
for us who mostly read posts through the CLI.
2024-04-08 08:31:19 +02:00
Ivan Habunek 968a516f76
Remove unused helpers 2024-04-06 15:06:59 +02:00
Ivan Habunek 38eca67905
Fix bug in run_with_retries, better types 2024-04-06 15:05:47 +02:00
Luca Matei Pintilie 1d48e64853
Fix version check in case of an empty string
Some mastodon implementations (GoToSocial) will return `version: ""`, in
which case checking for the major version won't work.

This is why an extra check has to be added, and default to 0 as the
"major" version.
2024-04-06 14:56:54 +02:00
Ivan Habunek bf12dbff70
Use a stronger password in tests
gotosocial registration fails with a weak password
2024-04-06 13:15:36 +02:00
Ivan Habunek 4b17e2e586
Merge pull request #473 from danschwarz/corrupt_link_fix
Added safeguards to prevent crashes when rendering corrupt URLs
2024-03-12 14:54:51 +01:00
Daniel Schwarz 20968fe87f Added safeguards to prevent crashes when rendering corrupt URLs 2024-03-09 13:48:33 -05:00
Ivan Habunek 3bac9b2fb6
Add changelog, bump version 2024-03-09 12:12:57 +01:00
Ivan Habunek 3420f1466a
Fix type annotation 2024-03-09 12:12:27 +01:00
Ivan Habunek 3eebbe35c9
Change option to lowercase 2024-03-09 10:16:41 +01:00
Ivan Habunek 4d5ac3cc4e
Don't break if status doesn't have edited_at 2024-03-09 10:13:34 +01:00
Ivan Habunek ee98ce3746
Fix following tests 2024-03-09 09:54:46 +01:00
Ivan Habunek 0cbb8863b3
Start some docs for testing 2024-03-09 09:43:02 +01:00
Ivan Habunek 1709a416b3
Make list printing not break on akkoma 2024-03-09 09:32:38 +01:00
Ivan Habunek f324aa119d
Add List entity 2024-03-09 09:32:04 +01:00
Ivan Habunek 43f51cbbb9
Make tests a bit more robust
By creating a new user we don't need to check if we're following or
blocking them before running the test.
2024-03-09 09:24:00 +01:00
Ivan Habunek 225dfbfb2e
Use alias for types 2024-03-09 09:20:43 +01:00
Ivan Habunek 9ae205c548
Upload media using same user in toot post --using 2024-02-10 18:24:35 +01:00
Ivan Habunek 9875209b30
Improve types 2024-02-10 18:24:35 +01:00
Ivan Habunek 965ffa1312
Remove unused code 2024-02-10 18:24:34 +01:00
Ivan Habunek f5a465ff25
Implement vim-like commands in footer 2023-01-20 09:54:37 +01:00
29 changed files with 766 additions and 156 deletions

View File

@ -3,6 +3,13 @@ Changelog
<!-- Do not edit. This file is automatically generated from changelog.yaml.-->
**0.42.0 (2024-03-09)**
* TUI: Add `toot tui --always-show-sensitive` option (thanks Lexi Winter)
* TUI: Document missing shortcuts (thanks Denis Laxalde)
* TUI: Use rounded boxes for nicer visuals (thanks Dan Schwarz)
* TUI: Don't break if edited_at status field does not exist
**0.41.1 (2024-01-02)**
* Fix a crash in settings parsing code

View File

@ -1,3 +1,11 @@
0.42.0:
date: 2024-03-09
changes:
- "TUI: Add `toot tui --always-show-sensitive` option (thanks Lexi Winter)"
- "TUI: Document missing shortcuts (thanks Denis Laxalde)"
- "TUI: Use rounded boxes for nicer visuals (thanks Dan Schwarz)"
- "TUI: Don't break if edited_at status field does not exist"
0.41.1:
date: 2024-01-02
changes:

View File

@ -3,6 +3,13 @@ Changelog
<!-- Do not edit. This file is automatically generated from changelog.yaml.-->
**0.42.0 (2024-03-09)**
* TUI: Add `toot tui --always-show-sensitive` option (thanks Lexi Winter)
* TUI: Document missing shortcuts (thanks Denis Laxalde)
* TUI: Use rounded boxes for nicer visuals (thanks Dan Schwarz)
* TUI: Don't break if edited_at status field does not exist
**0.41.1 (2024-01-02)**
* Fix a crash in settings parsing code

View File

@ -12,7 +12,7 @@ and blocking accounts and other actions.
setup(
name='toot',
version='0.41.1',
version='0.42.0',
description='Mastodon CLI client',
long_description=long_description.strip(),
author='Ivan Habunek',
@ -39,9 +39,14 @@ setup(
"beautifulsoup4>=4.5.0,<5.0",
"wcwidth>=0.1.7",
"urwid>=2.0.0,<3.0",
"tomlkit>=0.10.0,<1.0"
"tomlkit>=0.10.0,<1.0",
],
extras_require={
# Required to display images in the TUI
"images": [
"pillow>=9.5.0",
"term-image==0.7.0",
],
# Required to display rich text in the TUI
"richtext": [
"urwidgets>=0.1,<0.2"
@ -60,6 +65,7 @@ setup(
"setuptools",
"vermin",
"typing-extensions",
"pillow>=9.5.0",
],
},
entry_points={

42
tests/README.md Normal file
View File

@ -0,0 +1,42 @@
Testing toot
============
This document is WIP.
Mastodon
--------
TODO
Pleroma
-------
TODO
Akkoma
------
Install using the guide here:
https://docs.akkoma.dev/stable/installation/docker_en/
Disable captcha and throttling by adding this to `config/prod.exs`:
```ex
# Disable captcha for testing
config :pleroma, Pleroma.Captcha,
enabled: false
# Disable rate limiting for testing
config :pleroma, :rate_limit,
authentication: nil,
timeline: nil,
search: nil,
app_account_creation: nil,
relations_actions: nil,
relation_id_action: nil,
statuses_actions: nil,
status_id_action: nil,
password_reset: nil,
account_confirmation_resend: nil,
ap_routes: nil
```

View File

@ -41,6 +41,8 @@ TRUMPET = str(Path(__file__).parent.parent.parent / "trumpet.png")
ASSETS_DIR = str(Path(__file__).parent.parent / "assets")
PASSWORD = "83dU29170rjKilKQQwuWhJv3PKnSW59bWx0perjP6i7Nu4rkeh4mRfYuvVLYM3fM"
def create_app(base_url):
instance = api.get_instance(base_url).json()
@ -52,7 +54,7 @@ def register_account(app: App):
username = str(uuid.uuid4())[-10:]
email = f"{username}@example.com"
response = api.register_account(app, username, email, "password", "en")
response = api.register_account(app, username, email, PASSWORD, "en")
return User(app.instance, username, response["access_token"])

View File

@ -1,4 +1,5 @@
import json
from tests.integration.conftest import register_account
from toot import App, User, api, cli
from toot.entities import Account, Relationship, from_dict
@ -35,9 +36,8 @@ def test_whois(app: App, friend: User, run):
assert f"@{friend.username}" in result.stdout
def test_following(app: App, user: User, friend: User, friend_id, run):
# Make sure we're not initially following friend
api.unfollow(app, user, friend_id)
def test_following(app: App, user: User, run):
friend = register_account(app)
result = run(cli.accounts.following, user.username)
assert result.exit_code == 0
@ -84,9 +84,8 @@ def test_following_not_found(run):
assert result.stderr.strip() == "Error: Account not found"
def test_following_json(app: App, user: User, friend: User, user_id, friend_id, run_json):
# Make sure we're not initially following friend
api.unfollow(app, user, friend_id)
def test_following_json(app: App, user: User, user_id, run_json):
friend = register_account(app)
result = run_json(cli.accounts.following, user.username, "--json")
assert result == []
@ -96,24 +95,26 @@ def test_following_json(app: App, user: User, friend: User, user_id, friend_id,
result = run_json(cli.accounts.follow, friend.username, "--json")
relationship = from_dict(Relationship, result)
assert relationship.id == friend_id
assert relationship.following is True
[result] = run_json(cli.accounts.following, user.username, "--json")
relationship = from_dict(Relationship, result)
assert relationship.id == friend_id
account = from_dict(Account, result)
assert account.acct == friend.username
# If no account is given defaults to logged in user
[result] = run_json(cli.accounts.following, user.username, "--json")
relationship = from_dict(Relationship, result)
assert relationship.id == friend_id
[result] = run_json(cli.accounts.following, "--json")
account = from_dict(Account, result)
assert account.acct == friend.username
assert relationship.following is True
[result] = run_json(cli.accounts.followers, friend.username, "--json")
assert result["id"] == user_id
account = from_dict(Account, result)
assert account.acct == user.username
result = run_json(cli.accounts.unfollow, friend.username, "--json")
assert result["id"] == friend_id
assert result["following"] is False
relationship = from_dict(Relationship, result)
assert relationship.following is False
result = run_json(cli.accounts.following, user.username, "--json")
assert result == []
@ -200,9 +201,8 @@ def test_mute_json(app: App, user: User, friend: User, run_json, friend_id):
assert result == []
def test_block(app, user, friend, friend_id, run):
# Make sure we're not initially blocking friend
api.unblock(app, user, friend_id)
def test_block(app, user, run):
friend = register_account(app)
result = run(cli.accounts.blocked)
assert result.exit_code == 0

View File

@ -3,7 +3,7 @@ from unittest import mock
from unittest.mock import MagicMock
from toot import User, cli
from tests.integration.conftest import Run
from tests.integration.conftest import PASSWORD, Run
# TODO: figure out how to test login
@ -89,7 +89,7 @@ def test_login_cli(
cli.auth.login_cli,
"--instance", "http://localhost:3000",
"--email", f"{user.username}@example.com",
"--password", "password",
"--password", PASSWORD,
)
assert result.exit_code == 0
assert "✓ Successfully logged in." in result.stdout

View File

@ -1,8 +1,12 @@
import click
import pytest
import sys
from toot.cli.validators import validate_duration
from toot.wcstring import wc_wrap, trunc, pad, fit_text
from toot.tui.utils import LRUCache
from PIL import Image
from collections import namedtuple
from toot.utils import urlencode_url
@ -207,6 +211,111 @@ def test_duration():
duration("banana")
def test_cache_null():
"""Null dict is null."""
cache = LRUCache(cache_max_bytes=1024)
assert cache.__len__() == 0
Case = namedtuple("Case", ["cache_len", "len", "init"])
img = Image.new('RGB', (100, 100))
img_size = sys.getsizeof(img.tobytes())
@pytest.mark.parametrize(
"case",
[
Case(9, 0, []),
Case(9, 1, [("one", img)]),
Case(9, 2, [("one", img), ("two", img)]),
Case(2, 2, [("one", img), ("two", img)]),
Case(1, 1, [("one", img), ("two", img)]),
],
)
@pytest.mark.parametrize("method", ["assign", "init"])
def test_cache_init(case, method):
"""Check that the # of elements is right, given # given and cache_len."""
if method == "init":
cache = LRUCache(case.init, cache_max_bytes=img_size * case.cache_len)
elif method == "assign":
cache = LRUCache(cache_max_bytes=img_size * case.cache_len)
for (key, val) in case.init:
cache[key] = val
else:
assert False
# length is max(#entries, cache_len)
assert cache.__len__() == case.len
# make sure the first entry is the one ejected
if case.cache_len > 1 and case.init:
assert "one" in cache.keys()
else:
assert "one" not in cache.keys()
@pytest.mark.parametrize("method", ["init", "assign"])
def test_cache_overflow_default(method):
"""Test default overflow logic."""
if method == "init":
cache = LRUCache([("one", img), ("two", img), ("three", img)], cache_max_bytes=img_size * 2)
elif method == "assign":
cache = LRUCache(cache_max_bytes=img_size * 2)
cache["one"] = img
cache["two"] = img
cache["three"] = img
else:
assert False
assert "one" not in cache.keys()
assert "two" in cache.keys()
assert "three" in cache.keys()
@pytest.mark.parametrize("mode", ["get", "set"])
@pytest.mark.parametrize("add_third", [False, True])
def test_cache_lru_overflow(mode, add_third):
img = Image.new('RGB', (100, 100))
img_size = sys.getsizeof(img.tobytes())
"""Test that key access resets LRU logic."""
cache = LRUCache([("one", img), ("two", img)], cache_max_bytes=img_size * 2)
if mode == "get":
dummy = cache["one"]
elif mode == "set":
cache["one"] = img
else:
assert False
if add_third:
cache["three"] = img
assert "one" in cache.keys()
assert "two" not in cache.keys()
assert "three" in cache.keys()
else:
assert "one" in cache.keys()
assert "two" in cache.keys()
assert "three" not in cache.keys()
def test_cache_keyerror():
cache = LRUCache()
with pytest.raises(KeyError):
cache["foo"]
def test_cache_miss_doesnt_eject():
cache = LRUCache([("one", img), ("two", img)], cache_max_bytes=img_size * 3)
with pytest.raises(KeyError):
cache["foo"]
assert len(cache) == 2
assert "one" in cache.keys()
assert "two" in cache.keys()
def test_urlencode_url():
assert urlencode_url("https://www.example.com") == "https://www.example.com"
assert urlencode_url("https://www.example.com/url%20with%20spaces") == "https://www.example.com/url%20with%20spaces"

View File

@ -3,28 +3,13 @@ Helpers for testing.
"""
import time
from typing import Any, Callable
from typing import Callable, TypeVar
class MockResponse:
def __init__(self, response_data={}, ok=True, is_redirect=False):
self.response_data = response_data
self.content = response_data
self.ok = ok
self.is_redirect = is_redirect
def raise_for_status(self):
pass
def json(self):
return self.response_data
T = TypeVar("T")
def retval(val):
return lambda *args, **kwargs: val
def run_with_retries(fn: Callable[..., Any]):
def run_with_retries(fn: Callable[..., T]) -> T:
"""
Run the the given function repeatedly until it finishes without raising an
AssertionError. Sleep a bit between attempts. If the function doesn't
@ -41,4 +26,4 @@ def run_with_retries(fn: Callable[..., Any]):
except AssertionError:
time.sleep(delay)
fn()
return fn()

View File

@ -4,7 +4,7 @@ import sys
from os.path import join, expanduser
from typing import NamedTuple
__version__ = '0.41.1'
__version__ = '0.42.0'
class App(NamedTuple):

View File

@ -22,7 +22,7 @@ T = t.TypeVar("T")
PRIVACY_CHOICES = ["public", "unlisted", "private"]
VISIBILITY_CHOICES = ["public", "unlisted", "private", "direct"]
IMAGE_FORMAT_CHOICES = ["block", "iterm", "kitty"]
TUI_COLORS = {
"1": 1,
"16": 16,

View File

@ -3,6 +3,7 @@ import json as pyjson
from toot import api, config
from toot.cli import Context, cli, pass_context, json_option
from toot.entities import from_dict_list, List
from toot.output import print_list_accounts, print_lists, print_warning
@ -18,7 +19,8 @@ def lists(ctx: click.Context):
if not user or not app:
raise click.ClickException("This command requires you to be logged in.")
lists = api.get_lists(app, user)
data = api.get_lists(app, user)
lists = from_dict_list(List, data)
if lists:
print_lists(lists)
else:
@ -30,12 +32,13 @@ def lists(ctx: click.Context):
@pass_context
def list(ctx: Context, json: bool):
"""List all your lists"""
lists = api.get_lists(ctx.app, ctx.user)
data = api.get_lists(ctx.app, ctx.user)
if json:
click.echo(pyjson.dumps(lists))
click.echo(pyjson.dumps(data))
else:
if lists:
if data:
lists = from_dict_list(List, data)
print_lists(lists)
else:
click.echo("You have no lists defined.")

View File

@ -145,7 +145,7 @@ def post(
else:
user, app = ctx.user, ctx.app
media_ids = _upload_media(ctx.app, ctx.user, media, descriptions, thumbnails)
media_ids = _upload_media(app, user, media, descriptions, thumbnails)
status_text = _get_status_text(text, editor, media)
scheduled_at = _get_scheduled_at(scheduled_at, scheduled_in)

View File

@ -111,7 +111,10 @@ def bookmarks(
@cli.command()
@click.option("--clear", help="Dismiss all notifications and exit")
@click.option(
"--clear", is_flag=True,
help="Dismiss all notifications and exit"
)
@click.option(
"--reverse", "-r", is_flag=True,
help="Reverse the order of the shown notifications (newest on top)"

View File

@ -1,8 +1,8 @@
import click
from typing import Optional
from toot.cli import TUI_COLORS, VISIBILITY_CHOICES, Context, cli, pass_context
from toot.cli.validators import validate_tui_colors
from toot.cli import TUI_COLORS, VISIBILITY_CHOICES, IMAGE_FORMAT_CHOICES, Context, cli, pass_context
from toot.cli.validators import validate_tui_colors, validate_cache_size
from toot.tui.app import TUI, TuiOptions
COLOR_OPTIONS = ", ".join(TUI_COLORS.keys())
@ -24,16 +24,27 @@ COLOR_OPTIONS = ", ".join(TUI_COLORS.keys())
help=f"""Number of colors to use, one of {COLOR_OPTIONS}, defaults to 16 if
using --color, and 1 if using --no-color."""
)
@click.option(
"-s", "--cache-size",
callback=validate_cache_size,
help="""Specify the image cache maximum size in megabytes. Default: 10MB.
Minimum: 1MB."""
)
@click.option(
"-v", "--default-visibility",
type=click.Choice(VISIBILITY_CHOICES),
help="Default visibility when posting new toots; overrides the server-side preference"
)
@click.option(
"-S", "--always-show-sensitive",
"-s", "--always-show-sensitive",
is_flag=True,
help="Expand toots with content warnings automatically"
)
@click.option(
"-f", "--image-format",
type=click.Choice(IMAGE_FORMAT_CHOICES),
help="Image output format; support varies across terminals. Default: block"
)
@pass_context
def tui(
ctx: Context,
@ -41,7 +52,9 @@ def tui(
media_viewer: Optional[str],
always_show_sensitive: bool,
relative_datetimes: bool,
default_visibility: Optional[str]
cache_size: Optional[int],
default_visibility: Optional[str],
image_format: Optional[str]
):
"""Launches the toot terminal user interface"""
if colors is None:
@ -51,8 +64,10 @@ def tui(
colors=colors,
media_viewer=media_viewer,
relative_datetimes=relative_datetimes,
cache_size=cache_size,
default_visibility=default_visibility,
always_show_sensitive=always_show_sensitive,
image_format=image_format,
)
tui = TUI.create(ctx.app, ctx.user, options)
tui.run()

View File

@ -73,3 +73,21 @@ def validate_tui_colors(ctx, param, value) -> Optional[int]:
return TUI_COLORS[value]
raise click.BadParameter(f"Invalid value: {value}. Expected one of: {', '.join(TUI_COLORS)}")
def validate_cache_size(ctx: click.Context, param: str, value: Optional[str]) -> Optional[int]:
"""validates the cache size parameter"""
if value is None:
return 1024 * 1024 * 10 # default 10MB
else:
if value.isdigit():
size = int(value)
else:
raise click.BadParameter("Cache size must be numeric.")
if size > 1024:
raise click.BadParameter("Cache size too large: 1024MB maximum.")
elif size < 1:
raise click.BadParameter("Cache size too small: 1MB minimum.")
return size

View File

@ -17,11 +17,11 @@ def get_config_file_path():
return join(get_config_dir(), TOOT_CONFIG_FILE_NAME)
def user_id(user):
def user_id(user: User):
return "{}@{}".format(user.username, user.instance)
def make_config(path):
def make_config(path: str):
"""Creates an empty toot configuration file."""
config = {
"apps": {},
@ -58,7 +58,7 @@ def save_config(config):
return json.dump(config, f, indent=True, sort_keys=True)
def extract_user_app(config, user_id):
def extract_user_app(config, user_id: str):
if user_id not in config['users']:
return None, None
@ -82,7 +82,7 @@ def get_active_user_app():
return None, None
def get_user_app(user_id):
def get_user_app(user_id: str):
"""Returns (User, App) for given user ID or (None, None) if user is not logged in."""
return extract_user_app(load_config(), user_id)
@ -93,7 +93,7 @@ def load_app(instance: str) -> Optional[App]:
return App(**config['apps'][instance])
def load_user(user_id, throw=False):
def load_user(user_id: str, throw=False):
config = load_config()
if user_id in config['users']:
@ -120,7 +120,7 @@ def save_app(app: App):
config['apps'][app.instance] = app._asdict()
def delete_app(config, app):
def delete_app(config, app: App):
with edit_config() as config:
config['apps'].pop(app.instance, None)

View File

@ -9,11 +9,12 @@ different versions of the Mastodon API.
"""
import dataclasses
import typing as t
from dataclasses import dataclass, is_dataclass
from datetime import date, datetime
from functools import lru_cache
from typing import Any, Dict, List, Optional, Tuple, Type, TypeVar, Union
from typing import Any, Dict, Optional, Tuple, Type, TypeVar, Union
from typing import get_type_hints
from toot.typing_compat import get_args, get_origin
@ -59,8 +60,8 @@ class Account:
header: str
header_static: str
locked: bool
fields: List[AccountField]
emojis: List[CustomEmoji]
fields: t.List[AccountField]
emojis: t.List[CustomEmoji]
bot: bool
group: bool
discoverable: Optional[bool]
@ -154,10 +155,10 @@ class Poll:
multiple: bool
votes_count: int
voters_count: Optional[int]
options: List[PollOption]
emojis: List[CustomEmoji]
options: t.List[PollOption]
emojis: t.List[CustomEmoji]
voted: Optional[bool]
own_votes: Optional[List[int]]
own_votes: Optional[t.List[int]]
@dataclass
@ -207,11 +208,11 @@ class Filter:
"""
id: str
title: str
context: List[str]
context: t.List[str]
expires_at: Optional[datetime]
filter_action: str
keywords: List[FilterKeyword]
statuses: List[FilterStatus]
keywords: t.List[FilterKeyword]
statuses: t.List[FilterStatus]
@dataclass
@ -220,7 +221,7 @@ class FilterResult:
https://docs.joinmastodon.org/entities/FilterResult/
"""
filter: Filter
keyword_matches: Optional[List[str]]
keyword_matches: Optional[t.List[str]]
status_matches: Optional[str]
@ -237,11 +238,11 @@ class Status:
visibility: str
sensitive: bool
spoiler_text: str
media_attachments: List[MediaAttachment]
media_attachments: t.List[MediaAttachment]
application: Optional[Application]
mentions: List[StatusMention]
tags: List[StatusTag]
emojis: List[CustomEmoji]
mentions: t.List[StatusMention]
tags: t.List[StatusTag]
emojis: t.List[CustomEmoji]
reblogs_count: int
favourites_count: int
replies_count: int
@ -259,7 +260,7 @@ class Status:
muted: Optional[bool]
bookmarked: Optional[bool]
pinned: Optional[bool]
filtered: Optional[List[FilterResult]]
filtered: Optional[t.List[FilterResult]]
@property
def original(self) -> "Status":
@ -289,8 +290,8 @@ class Report:
comment: str
forwarded: bool
created_at: datetime
status_ids: Optional[List[str]]
rule_ids: Optional[List[str]]
status_ids: Optional[t.List[str]]
rule_ids: Optional[t.List[str]]
target_account: Account
@ -328,7 +329,7 @@ class InstanceConfigurationStatuses:
@dataclass
class InstanceConfigurationMediaAttachments:
supported_mime_types: List[str]
supported_mime_types: t.List[str]
image_size_limit: int
image_matrix_limit: int
video_size_limit: int
@ -377,13 +378,13 @@ class Instance:
urls: InstanceUrls
stats: InstanceStats
thumbnail: Optional[str]
languages: List[str]
languages: t.List[str]
registrations: bool
approval_required: bool
invites_enabled: bool
configuration: InstanceConfiguration
contact_account: Optional[Account]
rules: List[Rule]
rules: t.List[Rule]
@dataclass
@ -397,7 +398,7 @@ class Relationship:
following: bool
showing_reblogs: bool
notifying: bool
languages: List[str]
languages: t.List[str]
followed_by: bool
blocking: bool
blocked_by: bool
@ -428,7 +429,7 @@ class Tag:
"""
name: str
url: str
history: List[TagHistory]
history: t.List[TagHistory]
following: Optional[bool]
@ -445,6 +446,19 @@ class FeaturedTag:
last_status_at: datetime
@dataclass
class List:
"""
Represents a list of some users that the authenticated user follows.
https://docs.joinmastodon.org/entities/List/
"""
id: str
title: str
# This is a required field on Mastodon, but not supported on Pleroma/Akkoma
# see: https://git.pleroma.social/pleroma/pleroma/-/issues/2918
replies_policy: Optional[str]
# Generic data class instance
T = TypeVar("T")
@ -481,7 +495,7 @@ def from_dict(cls: Type[T], data: Dict) -> T:
@lru_cache(maxsize=100)
def get_fields(cls: Type) -> List[Tuple[str, Type, Any]]:
def get_fields(cls: Type) -> t.List[Tuple[str, Type, Any]]:
hints = get_type_hints(cls)
return [
(
@ -493,7 +507,7 @@ def get_fields(cls: Type) -> List[Tuple[str, Type, Any]]:
]
def from_dict_list(cls: Type[T], data: List[Dict]) -> List[T]:
def from_dict_list(cls: Type[T], data: t.List[Dict]) -> t.List[T]:
return [from_dict(cls, x) for x in data]

View File

@ -1,12 +1,12 @@
import click
import re
import textwrap
import shutil
import textwrap
import typing as t
from toot.entities import Account, Instance, Notification, Poll, Status
from toot.entities import Account, Instance, Notification, Poll, Status, List
from toot.utils import get_text, html_to_paragraphs
from toot.wcstring import wc_wrap
from typing import Any, Generator, Iterable, List
from wcwidth import wcswidth
@ -38,7 +38,7 @@ def instance_to_text(instance: Instance, width: int) -> str:
return "\n".join(instance_lines(instance, width))
def instance_lines(instance: Instance, width: int) -> Generator[str, None, None]:
def instance_lines(instance: Instance, width: int) -> t.Generator[str, None, None]:
yield f"{green(instance.title)}"
yield f"{blue(instance.uri)}"
yield f"running Mastodon {instance.version}"
@ -78,7 +78,7 @@ def account_to_text(account: Account, width: int) -> str:
return "\n".join(account_lines(account, width))
def account_lines(account: Account, width: int) -> Generator[str, None, None]:
def account_lines(account: Account, width: int) -> t.Generator[str, None, None]:
acct = f"@{account.acct}"
since = account.created_at.strftime("%Y-%m-%d")
@ -119,13 +119,13 @@ def print_tag_list(tags):
click.echo(f"* {format_tag_name(tag)}\t{tag['url']}")
def print_lists(lists):
def print_lists(lists: t.List[List]):
headers = ["ID", "Title", "Replies"]
data = [[lst["id"], lst["title"], lst["replies_policy"]] for lst in lists]
data = [[lst.id, lst.title, lst.replies_policy or ""] for lst in lists]
print_table(headers, data)
def print_table(headers: List[str], data: List[List[str]]):
def print_table(headers: t.List[str], data: t.List[t.List[str]]):
widths = [[len(cell) for cell in row] for row in data + [headers]]
widths = [max(width) for width in zip(*widths)]
@ -178,7 +178,7 @@ def status_to_text(status: Status, width: int) -> str:
return "\n".join(status_lines(status))
def status_lines(status: Status) -> Generator[str, None, None]:
def status_lines(status: Status) -> t.Generator[str, None, None]:
width = get_width()
status_id = status.id
in_reply_to_id = status.in_reply_to_id
@ -219,10 +219,10 @@ def status_lines(status: Status) -> Generator[str, None, None]:
reply = f"↲ In reply to {yellow(in_reply_to_id)} " if in_reply_to_id else ""
boost = f"{blue(reblogged_by_acct)} boosted " if reblogged_by else ""
yield f"ID {yellow(status_id)} {reply} {boost}"
yield f"ID {yellow(status_id)} Visibility: {status.visibility} {reply} {boost}"
def html_lines(html: str, width: int) -> Generator[str, None, None]:
def html_lines(html: str, width: int) -> t.Generator[str, None, None]:
first = True
for paragraph in html_to_paragraphs(html):
if not first:
@ -233,7 +233,7 @@ def html_lines(html: str, width: int) -> Generator[str, None, None]:
first = False
def poll_lines(poll: Poll) -> Generator[str, None, None]:
def poll_lines(poll: Poll) -> t.Generator[str, None, None]:
for idx, option in enumerate(poll.options):
perc = (round(100 * option.votes_count / poll.votes_count)
if poll.votes_count and option.votes_count is not None else 0)
@ -258,7 +258,7 @@ def poll_lines(poll: Poll) -> Generator[str, None, None]:
yield poll_footer
def print_timeline(items: Iterable[Status]):
def print_timeline(items: t.Iterable[Status]):
print_divider()
for item in items:
print_status(item)
@ -272,7 +272,7 @@ def print_notification(notification: Notification):
print_status(notification.status)
def print_notifications(notifications: List[Notification]):
def print_notifications(notifications: t.List[Notification]):
for notification in notifications:
if notification.type not in ['pleroma:emoji_reaction']:
print_divider()
@ -316,25 +316,25 @@ def format_account_name(account: Account) -> str:
# Shorthand functions for coloring output
def blue(text: Any) -> str:
def blue(text: t.Any) -> str:
return click.style(text, fg="blue")
def bold(text: Any) -> str:
def bold(text: t.Any) -> str:
return click.style(text, bold=True)
def cyan(text: Any) -> str:
def cyan(text: t.Any) -> str:
return click.style(text, fg="cyan")
def dim(text: Any) -> str:
def dim(text: t.Any) -> str:
return click.style(text, dim=True)
def green(text: Any) -> str:
def green(text: t.Any) -> str:
return click.style(text, fg="green")
def yellow(text: Any) -> str:
def yellow(text: t.Any) -> str:
return click.style(text, fg="yellow")

View File

@ -2,6 +2,7 @@ import logging
import subprocess
import urwid
from concurrent.futures import ThreadPoolExecutor
from typing import NamedTuple, Optional
from datetime import datetime, timezone
@ -15,11 +16,12 @@ from toot.utils.datetime import parse_datetime
from .compose import StatusComposer
from .constants import PALETTE
from .entities import Status
from .images import TuiScreen, load_image
from .overlays import ExceptionStackTrace, GotoMenu, Help, StatusSource, StatusLinks, StatusZoom
from .overlays import StatusDeleteConfirmation, Account
from .poll import Poll
from .timeline import Timeline
from .utils import get_max_toot_chars, parse_content_links, copy_to_clipboard
from .utils import get_max_toot_chars, parse_content_links, copy_to_clipboard, LRUCache
from .widgets import ModalBox, RoundedLineBox
logger = logging.getLogger(__name__)
@ -35,7 +37,9 @@ class TuiOptions(NamedTuple):
media_viewer: Optional[str]
always_show_sensitive: bool
relative_datetimes: bool
default_visibility: Optional[bool]
cache_size: int
default_visibility: Optional[str]
image_format: Optional[str]
class Header(urwid.WidgetWrap):
@ -62,9 +66,13 @@ class Header(urwid.WidgetWrap):
class Footer(urwid.Pile):
def __init__(self):
def __init__(self, tui):
self.tui = tui
self.status = urwid.Text("")
self.message = urwid.Text("")
self.command = Command(tui)
urwid.connect_signal(self.command, "close", self.end_command)
return super().__init__([
urwid.AttrMap(self.status, "footer_status"),
@ -86,6 +94,60 @@ class Footer(urwid.Pile):
def clear_message(self):
self.message.set_text("")
def start_command(self):
self.clear_message()
self.command.set_edit_text("")
self.contents[1] = (self.command, ("weight", 1))
self.focus_position = 1
def end_command(self, widget, success, message):
self.contents[1] = (self.message, ("weight", 1))
self.tui.focus_body()
if message:
if success:
self.set_message(message)
else:
self.set_error_message(message)
class Command(urwid.Edit):
"""Allows execution of vim-like commands in the footer"""
signals = ["close"]
tui: "TUI"
def __init__(self, tui):
self.tui = tui
super().__init__(":")
def keypress(self, size, key):
logger.debug((size, key))
if key == "enter":
self.run_command()
if key == "esc":
self.close()
return super().keypress(size, key)
def close(self, success=True, message=None):
self._emit("close", success, message)
def run_command(self):
command = self.get_edit_text()
if command in ("q", "quit"):
raise urwid.ExitMainLoop()
elif command in ("h", "help"):
self.tui.show_help()
self.close()
else:
self.close(False, f"Unknown command: {command}")
class TUI(urwid.Frame):
"""Main TUI frame."""
@ -95,7 +157,7 @@ class TUI(urwid.Frame):
@staticmethod
def create(app: App, user: User, args: TuiOptions):
"""Factory method, sets up TUI and an event loop."""
screen = urwid.raw_display.Screen()
screen = TuiScreen()
screen.set_terminal_properties(args.colors)
tui = TUI(app, user, screen, args)
@ -130,7 +192,7 @@ class TUI(urwid.Frame):
# Show intro screen while toots are being loaded
self.body = self.build_intro()
self.header = Header(app, user)
self.footer = Footer()
self.footer = Footer(self)
self.footer.set_status("Loading...")
# Default max status length, updated on startup
@ -144,6 +206,11 @@ class TUI(urwid.Frame):
self.followed_accounts = []
self.preferences = {}
if self.options.cache_size:
self.cache_max = 1024 * 1024 * self.options.cache_size
else:
self.cache_max = 1024 * 1024 * 10 # default 10MB
super().__init__(self.body, header=self.header, footer=self.footer)
def run(self):
@ -327,8 +394,10 @@ class TUI(urwid.Frame):
# get the major version number of the server
# this works for Mastodon and Pleroma version strings
# Mastodon versions < 4 do not have translation service
# If the version is missing, assume 0 as a fallback
# Revisit this logic if Pleroma implements translation
ch = instance["version"][0]
version = instance["version"]
ch = "0" if not version else version[0]
self.can_translate = int(ch) > 3 if ch.isnumeric() else False
return self.run_in_thread(_load_instance, done_callback=_done)
@ -646,7 +715,7 @@ class TUI(urwid.Frame):
account = api.whois(self.app, self.user, account_id)
relationship = api.get_relationship(self.app, self.user, account_id)
self.open_overlay(
widget=Account(self.app, self.user, account, relationship),
widget=Account(self.app, self.user, account, relationship, self.options),
title="Account",
)
@ -755,6 +824,27 @@ class TUI(urwid.Frame):
return self.run_in_thread(_delete, done_callback=_done)
def async_load_image(self, timeline, status, path, placeholder_index):
def _load():
# don't bother loading images for statuses we are not viewing now
if timeline.get_focused_status().id != status.id:
return
if not hasattr(timeline, "images"):
timeline.images = LRUCache(cache_max_bytes=self.cache_max)
img = load_image(path)
if img:
timeline.images[str(hash(path))] = img
def _done(loop):
# don't bother loading images for statuses we are not viewing now
if timeline.get_focused_status().id != status.id:
return
timeline.update_status_image(status, path, placeholder_index)
return self.run_in_thread(_load, done_callback=_done)
def copy_status(self, status):
# TODO: copy a better version of status content
# including URLs
@ -820,6 +910,12 @@ class TUI(urwid.Frame):
self.async_load_timeline(is_initial=True, timeline_name=self.timeline.name)
def focus_footer(self):
self.focus_part = "footer"
def focus_body(self):
self.focus_part = "body"
# --- Keys -----------------------------------------------------------------
def unhandled_input(self, key):
@ -854,3 +950,7 @@ class TUI(urwid.Frame):
self.close_overlay()
else:
raise urwid.ExitMainLoop()
elif key == ":" and not self.overlay:
self.focus_footer()
self.footer.start_command()

View File

@ -53,7 +53,7 @@ class Status:
self.id = self.data["id"]
self.account = self._get_account()
self.created_at = parse_datetime(data["created_at"])
if data["edited_at"]:
if data.get("edited_at"):
self.edited_at = parse_datetime(data["edited_at"])
else:
self.edited_at = None

104
toot/tui/images.py Normal file
View File

@ -0,0 +1,104 @@
import urwid
import math
import requests
import warnings
# If term_image is loaded use their screen implementation which handles images
try:
from term_image.widget import UrwidImageScreen, UrwidImage
from term_image.image import BaseImage, KittyImage, ITerm2Image, BlockImage
from term_image import disable_queries # prevent phantom keystrokes
from PIL import Image, ImageDraw
TuiScreen = UrwidImageScreen
disable_queries()
def image_support_enabled():
return True
def can_render_pixels(image_format):
return image_format in ['kitty', 'iterm']
def get_base_image(image, image_format) -> BaseImage:
# we don't autodetect kitty, iterm; we choose based on option switches
BaseImage.forced_support = True
if image_format == 'kitty':
return KittyImage(image)
elif image_format == 'iterm':
return ITerm2Image(image)
else:
return BlockImage(image)
def resize_image(basewidth: int, baseheight: int, img: Image.Image) -> Image.Image:
if baseheight and not basewidth:
hpercent = baseheight / float(img.size[1])
width = math.ceil(img.size[0] * hpercent)
img = img.resize((width, baseheight), Image.Resampling.LANCZOS)
elif basewidth and not baseheight:
wpercent = (basewidth / float(img.size[0]))
hsize = int((float(img.size[1]) * float(wpercent)))
img = img.resize((basewidth, hsize), Image.Resampling.LANCZOS)
else:
img = img.resize((basewidth, baseheight), Image.Resampling.LANCZOS)
if img.mode != 'P':
img = img.convert('RGB')
return img
def add_corners(img, rad):
circle = Image.new('L', (rad * 2, rad * 2), 0)
draw = ImageDraw.Draw(circle)
draw.ellipse((0, 0, rad * 2, rad * 2), fill=255)
alpha = Image.new('L', img.size, "white")
w, h = img.size
alpha.paste(circle.crop((0, 0, rad, rad)), (0, 0))
alpha.paste(circle.crop((0, rad, rad, rad * 2)), (0, h - rad))
alpha.paste(circle.crop((rad, 0, rad * 2, rad)), (w - rad, 0))
alpha.paste(circle.crop((rad, rad, rad * 2, rad * 2)), (w - rad, h - rad))
img.putalpha(alpha)
return img
def load_image(url):
with warnings.catch_warnings():
warnings.simplefilter("ignore") # suppress "corrupt exif" output from PIL
try:
img = Image.open(requests.get(url, stream=True).raw)
if img.format == 'PNG' and img.mode != 'RGBA':
img = img.convert("RGBA")
return img
except Exception:
return None
def graphics_widget(img, image_format="block", corner_radius=0) -> urwid.Widget:
if not img:
return urwid.SolidFill(fill_char=" ")
if can_render_pixels(image_format) and corner_radius > 0:
render_img = add_corners(img, 10)
else:
render_img = img
return UrwidImage(get_base_image(render_img, image_format), '<', upscale=True)
# "<" means left-justify the image
except ImportError:
from urwid.raw_display import Screen
TuiScreen = Screen
def image_support_enabled():
return False
def can_render_pixels(image_format: str):
return False
def get_base_image(image, image_format: str):
return None
def add_corners(img, rad):
return None
def load_image(url):
return None
def graphics_widget(img, image_format="block", corner_radius=0) -> urwid.Widget:
return urwid.SolidFill(fill_char=" ")

View File

@ -5,7 +5,9 @@ import webbrowser
from toot import __version__
from toot import api
from toot.tui.utils import highlight_keys
from toot.tui.images import image_support_enabled, load_image, graphics_widget
from toot.tui.widgets import Button, EditBox, SelectableText
from toot.tui.richtext import html_to_widgets
@ -242,11 +244,12 @@ class Help(urwid.Padding):
class Account(urwid.ListBox):
"""Shows account data and provides various actions"""
def __init__(self, app, user, account, relationship):
def __init__(self, app, user, account, relationship, options):
self.app = app
self.user = user
self.account = account
self.relationship = relationship
self.options = options
self.last_action = None
self.setup_listbox()
@ -255,6 +258,30 @@ class Account(urwid.ListBox):
walker = urwid.SimpleListWalker(actions)
super().__init__(walker)
def account_header(self, account):
if image_support_enabled() and account['avatar'] and not account["avatar"].endswith("missing.png"):
img = load_image(account['avatar'])
aimg = urwid.BoxAdapter(
graphics_widget(img, image_format=self.options.image_format, corner_radius=10), 10)
else:
aimg = urwid.BoxAdapter(urwid.SolidFill(" "), 10)
if image_support_enabled() and account['header'] and not account["header"].endswith("missing.png"):
img = load_image(account['header'])
himg = (urwid.BoxAdapter(
graphics_widget(img, image_format=self.options.image_format, corner_radius=10), 10))
else:
himg = urwid.BoxAdapter(urwid.SolidFill(" "), 10)
atxt = urwid.Pile([urwid.Divider(),
(urwid.Text(("account", account["display_name"]))),
(urwid.Text(("highlight", "@" + self.account['acct'])))])
columns = urwid.Columns([aimg, ("weight", 9999, himg)], dividechars=2, min_width=20)
header = urwid.Pile([columns, urwid.Divider(), atxt])
return header
def generate_contents(self, account, relationship=None, last_action=None):
if self.last_action and not self.last_action.startswith("Confirm"):
yield Button(f"Confirm {self.last_action}", on_press=take_action, user_data=self)
@ -276,11 +303,11 @@ class Account(urwid.ListBox):
yield urwid.Divider("")
yield urwid.Divider()
yield urwid.Text([("account", f"@{account['acct']}"), f" {account['display_name']}"])
yield self.account_header(account)
if account["note"]:
yield urwid.Divider()
widgetlist = html_to_widgets(account["note"])
for line in widgetlist:
yield (line)

View File

@ -60,7 +60,10 @@ def html_to_widgets(html, recovery_attempt=False) -> List[urwid.Widget]:
def url_to_widget(url: str):
widget = len(url), urwid.Filler(Hyperlink(url, "link", url))
try:
widget = len(url), urwid.Filler(Hyperlink(url, "link", url))
except ValueError:
widget = len(url), urwid.Filler(urwid.Text(url)) # don't style as link
return TextEmbed(widget)
@ -98,10 +101,16 @@ def text_to_widget(attr, markup) -> urwid.Widget:
if match:
label, url = match.groups()
anchor_attr = get_best_anchor_attr(attr_list)
markup_list.append((
len(label),
urwid.Filler(Hyperlink(url, anchor_attr, label)),
))
try:
markup_list.append((
len(label),
urwid.Filler(Hyperlink(url, anchor_attr, label)),
))
except ValueError:
markup_list.append((
len(label),
urwid.Filler(urwid.Text(url)), # don't style as link
))
else:
markup_list.append(run)
else:

View File

@ -1,26 +1,33 @@
import logging
import math
import urwid
import webbrowser
from typing import List, Optional
from toot.tui import app
from toot.tui.richtext import html_to_widgets, url_to_widget
from toot.utils.datetime import parse_datetime, time_ago
from toot.utils.language import language_name
from toot.entities import Status
from toot.tui.scroll import Scrollable, ScrollBar
from toot.tui.utils import highlight_keys
from toot.tui.images import image_support_enabled, graphics_widget, can_render_pixels
from toot.tui.widgets import SelectableText, SelectableColumns, RoundedLineBox
logger = logging.getLogger("toot")
screen = urwid.raw_display.Screen()
class Timeline(urwid.Columns):
"""
Displays a list of statuses to the left, and status details on the right.
"""
signals = [
"close", # Close thread
"focus", # Focus changed
@ -41,6 +48,7 @@ class Timeline(urwid.Columns):
self.is_thread = is_thread
self.statuses = statuses
self.status_list = self.build_status_list(statuses, focus=focus)
self.can_render_pixels = can_render_pixels(self.tui.options.image_format)
try:
focused_status = statuses[focus]
@ -141,6 +149,16 @@ class Timeline(urwid.Columns):
def modified(self):
"""Called when the list focus switches to a new status"""
status, index, count = self.get_focused_status_with_counts()
if image_support_enabled:
clear_op = getattr(self.tui.screen, "clear_images", None)
# term-image's screen implementation has clear_images(),
# urwid's implementation does not.
# TODO: it would be nice not to check this each time thru
if callable(clear_op):
self.tui.screen.clear_images()
self.draw_status_details(status)
self._emit("focus")
@ -282,7 +300,7 @@ class Timeline(urwid.Columns):
def get_status_index(self, id):
# TODO: This is suboptimal, consider a better way
for n, status in enumerate(self.statuses):
for n, status in enumerate(self.statuses.copy()):
if status.id == id:
return n
raise ValueError("Status with ID {} not found".format(id))
@ -306,6 +324,27 @@ class Timeline(urwid.Columns):
if index == self.status_list.body.focus:
self.draw_status_details(status)
def update_status_image(self, status, path, placeholder_index):
"""Replace image placeholder with image widget and redraw"""
index = self.get_status_index(status.id)
assert self.statuses[index].id == status.id # Sanity check
# get the image and replace the placeholder with a graphics widget
img = None
if hasattr(self, "images"):
try:
img = self.images[(str(hash(path)))]
except KeyError:
pass
if img:
try:
status.placeholders[placeholder_index]._set_original_widget(
graphics_widget(img, image_format=self.tui.options.image_format, corner_radius=10))
except IndexError:
# ignore IndexErrors.
pass
def remove_status(self, status):
index = self.get_status_index(status.id)
assert self.statuses[index].id == status.id # Sanity check
@ -318,6 +357,9 @@ class Timeline(urwid.Columns):
class StatusDetails(urwid.Pile):
def __init__(self, timeline: Timeline, status: Optional[Status]):
self.status = status
self.timeline = timeline
if self.status:
self.status.placeholders = []
self.followed_accounts = timeline.tui.followed_accounts
self.options = timeline.tui.options
@ -326,17 +368,83 @@ class StatusDetails(urwid.Pile):
if status else ())
return super().__init__(widget_list)
def image_widget(self, path, rows=None, aspect=None) -> urwid.Widget:
"""Returns a widget capable of displaying the image
path is required; URL to image
rows, if specfied, sets a fixed number of rows. Or:
aspect, if specified, calculates rows based on pane width
and the aspect ratio provided"""
if not rows:
if not aspect:
aspect = 3 / 2 # reasonable default
screen_rows = screen.get_cols_rows()[1]
if self.timeline.can_render_pixels:
# for pixel-rendered images,
# image rows should be 33% of the available screen
# but in no case fewer than 10
rows = max(10, math.floor(screen_rows * .33))
else:
# for cell-rendered images,
# use the max available columns
# and calculate rows based on the image
# aspect ratio
cols = math.floor(0.55 * screen.get_cols_rows()[0])
rows = math.ceil((cols / 2) / aspect)
# if the calculated rows are more than will
# fit on one screen, reduce to one screen of rows
rows = min(screen_rows - 6, rows)
# but in no case fewer than 10 rows
rows = max(rows, 10)
img = None
if hasattr(self.timeline, "images"):
try:
img = self.timeline.images[(str(hash(path)))]
except KeyError:
pass
if img:
return (urwid.BoxAdapter(
graphics_widget(img, image_format=self.timeline.tui.options.image_format, corner_radius=10), rows))
else:
placeholder = urwid.BoxAdapter(urwid.SolidFill(fill_char=" "), rows)
self.status.placeholders.append(placeholder)
if image_support_enabled():
self.timeline.tui.async_load_image(self.timeline, self.status, path, len(self.status.placeholders) - 1)
return placeholder
def author_header(self, reblogged_by):
avatar_url = self.status.original.data["account"]["avatar"]
if avatar_url and image_support_enabled():
aimg = self.image_widget(avatar_url, 2)
account_color = ("highlight" if self.status.original.author.account in
self.timeline.tui.followed_accounts else "account")
atxt = urwid.Pile([("pack", urwid.Text(("bold", self.status.original.author.display_name))),
("pack", urwid.Text((account_color, self.status.original.author.account)))])
if image_support_enabled():
columns = urwid.Columns([aimg, ("weight", 9999, atxt)], dividechars=1, min_width=5)
else:
columns = urwid.Columns([("weight", 9999, atxt)], dividechars=1, min_width=5)
return columns
def content_generator(self, status, reblogged_by):
if reblogged_by:
text = "{} boosted".format(reblogged_by.display_name or reblogged_by.username)
yield ("pack", urwid.Text(("dim", text)))
reblogger_name = (reblogged_by.display_name
if reblogged_by.display_name
else reblogged_by.username)
text = f"{reblogger_name} boosted"
yield urwid.Text(("dim", text))
yield ("pack", urwid.AttrMap(urwid.Divider("-"), "dim"))
if status.author.display_name:
yield ("pack", urwid.Text(("bold", status.author.display_name)))
account_color = "highlight" if status.author.account in self.followed_accounts else "account"
yield ("pack", urwid.Text((account_color, status.author.account)))
yield self.author_header(reblogged_by)
yield ("pack", urwid.Divider())
if status.data["spoiler_text"]:
@ -363,7 +471,27 @@ class StatusDetails(urwid.Pile):
yield ("pack", urwid.Text([("bold", "Media attachment"), " (", m["type"], ")"]))
if m["description"]:
yield ("pack", urwid.Text(m["description"]))
yield ("pack", url_to_widget(m["url"]))
if m["url"]:
if m["url"].lower().endswith(('.jpg', '.jpeg', '.png', '.gif', '.svg', '.webp')):
yield urwid.Text("")
try:
aspect = float(m["meta"]["original"]["aspect"])
except Exception:
aspect = None
if image_support_enabled():
yield self.image_widget(m["url"], aspect=aspect)
yield urwid.Divider()
# video media may include a preview URL, show that as a fallback
elif m["preview_url"].lower().endswith(('.jpg', '.jpeg', '.png', '.gif', '.svg', '.webp')):
yield urwid.Text("")
try:
aspect = float(m["meta"]["small"]["aspect"])
except Exception:
aspect = None
if image_support_enabled():
yield self.image_widget(m["preview_url"], aspect=aspect)
yield urwid.Divider()
yield ("pack", url_to_widget(m["url"]))
poll = status.original.data.get("poll")
if poll:
@ -427,6 +555,15 @@ class StatusDetails(urwid.Pile):
yield urwid.Text("")
yield url_to_widget(card["url"])
if card["image"] and image_support_enabled():
if card["image"].lower().endswith(('.jpg', '.jpeg', '.png', '.gif', '.svg', '.webp')):
yield urwid.Text("")
try:
aspect = int(card["width"]) / int(card["height"])
except Exception:
aspect = None
yield self.image_widget(card["image"], aspect=aspect)
def poll_generator(self, poll):
for idx, option in enumerate(poll["options"]):
perc = (round(100 * option["votes_count"] / poll["votes_count"])

View File

@ -1,7 +1,8 @@
import base64
import re
import sys
import urwid
from collections import OrderedDict
from functools import reduce
from html.parser import HTMLParser
from typing import List
@ -109,3 +110,33 @@ def deep_get(adict: dict, path: List[str], default=None):
path,
adict
)
class LRUCache(OrderedDict):
"""Dict with a limited size, ejecting LRUs as needed.
Default max size = 10Mb"""
def __init__(self, *args, cache_max_bytes: int = 1024 * 1024 * 10, **kwargs):
assert cache_max_bytes > 0
self.total_value_size = 0
self.cache_max_bytes = cache_max_bytes
super().__init__(*args, **kwargs)
def __setitem__(self, key: str, value):
if key in self:
self.total_value_size -= sys.getsizeof(super().__getitem__(key).tobytes())
self.total_value_size += sys.getsizeof(value.tobytes())
super().__setitem__(key, value)
super().move_to_end(key)
while self.total_value_size > self.cache_max_bytes:
old_key, value = next(iter(self.items()))
sz = sys.getsizeof(value.tobytes())
super().__delitem__(old_key)
self.total_value_size -= sz
def __getitem__(self, key: str):
val = super().__getitem__(key)
super().move_to_end(key)
return val

View File

@ -1,26 +1,22 @@
import click
import os
import re
import socket
import subprocess
import tempfile
import unicodedata
import warnings
from bs4 import BeautifulSoup
from typing import Any, Dict, List
import click
from toot.exceptions import ConsoleError
from typing import Any, Dict, Generator, List, Optional
from urllib.parse import urlparse, urlencode, quote, unquote
def str_bool(b):
def str_bool(b: bool) -> str:
"""Convert boolean to string, in the way expected by the API."""
return "true" if b else "false"
def str_bool_nullable(b):
def str_bool_nullable(b: Optional[bool]) -> Optional[str]:
"""Similar to str_bool, but leave None as None"""
return None if b is None else str_bool(b)
@ -34,7 +30,7 @@ def parse_html(html: str) -> BeautifulSoup:
return BeautifulSoup(html.replace("&apos;", "'"), "html.parser")
def get_text(html):
def get_text(html: str) -> str:
"""Converts html to text, strips all tags."""
text = parse_html(html).get_text()
return unicodedata.normalize("NFKC", text)
@ -53,7 +49,7 @@ def html_to_paragraphs(html: str) -> List[List[str]]:
return [[get_text(line) for line in p] for p in paragraphs]
def format_content(content):
def format_content(content: str) -> Generator[str, None, None]:
"""Given a Status contents in HTML, converts it into lines of plain text.
Returns a generator yielding lines of content.
@ -73,25 +69,12 @@ def format_content(content):
first = False
def domain_exists(name):
try:
socket.gethostbyname(name)
return True
except OSError:
return False
def assert_domain_exists(domain):
if not domain_exists(domain):
raise ConsoleError("Domain {} not found".format(domain))
EOF_KEY = "Ctrl-Z" if os.name == 'nt' else "Ctrl-D"
def multiline_input():
def multiline_input() -> str:
"""Lets user input multiple lines of text, terminated by EOF."""
lines = []
lines: List[str] = []
while True:
try:
lines.append(input())

View File

@ -4,7 +4,7 @@ import os
from datetime import datetime, timezone
def parse_datetime(value):
def parse_datetime(value: str) -> datetime:
"""Returns an aware datetime in local timezone"""
dttm = datetime.strptime(value, "%Y-%m-%dT%H:%M:%S.%f%z")