Merge branch 'ihabunek:master' into asyncfix

This commit is contained in:
Daniel Schwarz 2023-01-30 09:16:19 -05:00 committed by GitHub
commit cb7cbd872a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 834 additions and 135 deletions

View File

@ -1,4 +1,4 @@
[flake8]
exclude=build,tests
exclude=build,tests,tmp,venv,toot/tui/scroll.py
ignore=E128
max-line-length=120

View File

@ -1,9 +1,9 @@
name: Run tests
on: [push]
on: [push, pull_request]
jobs:
build:
test:
# Older Ubuntu required for testing on Python 3.6 which is not available in
# later versions. Remove once support for 3.6 is dropped.
runs-on: ubuntu-20.04

View File

@ -3,6 +3,31 @@ Changelog
<!-- Do not edit. This file is automatically generated from changelog.yaml.-->
**0.33.1 (2023-01-03)**
* TUI: Fix crash when viewing toot in browser
**0.33.0 (2023-01-02)**
* Add CONTRIBUTING.md containing a contribution guide
* Add `env` command which prints local env to include in issues
* Add TOOT_POST_VISIBILITY environment to control default post visibility
(thanks Lim Ding Wen)
* Add `tags_followed`, `tags_follow`, and `tags_unfollow` commands (thanks
Daniel Schwarz)
* Add `tags_bookmarks` command (thanks Giuseppe Bilotta)
* TUI: Show an error if attemptint to boost a private status (thanks Lim Ding
Wen)
* TUI: Hide polls, cards and media attachments for sensitive posts (thanks
Daniel Schwarz)
* TUI: Add bookmarking and bookmark timeline (thanks Daniel Schwarz)
* TUI: Show status visiblity (thanks Lim Ding Wen)
* TUI: Reply to original account instead of boosting account (thanks Lim Ding
Wen)
* TUI: Refresh screen after exiting browser, required for text browsers (thanks
Daniel Schwarz)
* TUI: Highlight followed tags (thanks Daniel Schwarz)
**0.32.1 (2022-12-12)**
* Fix packaging issue, missing toot.utils module

View File

@ -80,18 +80,6 @@ pip install -r requirements-test.txt
While the virtual env is active, running `toot` will execute the one you checked
out. This allows you to make changes and test them.
Run tests:
```
pytest
```
Check code style:
```
flake8
```
#### Crafting good commits
Please put some effort into breaking your contribution up into a series of well
@ -114,8 +102,27 @@ Rules for commit messages:
* wrap the body at 72 characters
* use the body to explain what and why vs. how
If you use vim to write your commit messages, it will already enforce these
rules for you.
For a more detailed explanation with examples see the guide at
https://cbea.ms/git-commit/
If you use vim to write your commit messages, it will already enforce some of
these rules for you.
#### Run tests before submitting
You can run code and sytle tests by running:
```
make test
```
This runs three tools:
* `pytest` runs the test suite
* `flake8` checks code formatting
* `vermin` checks that minimum python version
Please ensure all three commands succeed before submitting your patches.
#### Submitting patches

View File

@ -9,10 +9,13 @@ publish :
test:
pytest -v
flake8
vermin --target=3.6 --no-tips --violations --exclude-regex venv/.* .
coverage:
coverage erase
coverage run
coverage html
coverage report
clean :

View File

@ -1,3 +1,24 @@
0.33.1:
date: 2023-01-03
changes:
- "TUI: Fix crash when viewing toot in browser"
0.33.0:
date: 2023-01-02
changes:
- "Add CONTRIBUTING.md containing a contribution guide"
- "Add `env` command which prints local env to include in issues"
- "Add TOOT_POST_VISIBILITY environment to control default post visibility (thanks Lim Ding Wen)"
- "Add `tags_followed`, `tags_follow`, and `tags_unfollow` commands (thanks Daniel Schwarz)"
- "Add `tags_bookmarks` command (thanks Giuseppe Bilotta)"
- "TUI: Show an error if attemptint to boost a private status (thanks Lim Ding Wen)"
- "TUI: Hide polls, cards and media attachments for sensitive posts (thanks Daniel Schwarz)"
- "TUI: Add bookmarking and bookmark timeline (thanks Daniel Schwarz)"
- "TUI: Show status visiblity (thanks Lim Ding Wen)"
- "TUI: Reply to original account instead of boosting account (thanks Lim Ding Wen)"
- "TUI: Refresh screen after exiting browser, required for text browsers (thanks Daniel Schwarz)"
- "TUI: Highlight followed tags (thanks Daniel Schwarz)"
0.32.1:
date: 2022-12-12
changes:

View File

@ -12,7 +12,7 @@ and blocking accounts and other actions.
setup(
name='toot',
version='0.32.1',
version='0.33.1',
description='Mastodon CLI client',
long_description=long_description.strip(),
author='Ivan Habunek',

View File

@ -431,6 +431,71 @@ def test_follow_not_found(run):
assert str(ex_info.value) == "Account not found"
def test_mute(app, user, friend, run):
out = run("mute", friend.username)
assert out == f"✓ You have muted {friend.username}"
[muted_account] = api.get_muted_accounts(app, user)
assert muted_account["acct"] == friend.username
out = run("unmute", friend.username)
assert out == f"{friend.username} is no longer muted"
assert api.get_muted_accounts(app, user) == []
def test_block(app, user, friend, run):
out = run("block", friend.username)
assert out == f"✓ You are now blocking {friend.username}"
[blockd_account] = api.get_blocked_accounts(app, user)
assert blockd_account["acct"] == friend.username
out = run("unblock", friend.username)
assert out == f"{friend.username} is no longer blocked"
assert api.get_blocked_accounts(app, user) == []
def test_following_followers(user, friend, run):
out = run("following", user.username)
assert out == ""
run("follow", friend.username)
out = run("following", user.username)
assert out == f"* @{friend.username}"
out = run("followers", friend.username)
assert out == f"* @{user.username}"
def test_tags(run):
out = run("tags_followed")
assert out == "You're not following any hashtags."
out = run("tags_follow", "foo")
assert out == "✓ You are now following #foo"
out = run("tags_followed")
assert out == "* #foo\thttp://localhost:3000/tags/foo"
out = run("tags_follow", "bar")
assert out == "✓ You are now following #bar"
out = run("tags_followed")
assert out == "\n".join([
"* #bar\thttp://localhost:3000/tags/bar",
"* #foo\thttp://localhost:3000/tags/foo",
])
out = run("tags_unfollow", "foo")
assert out == "✓ You are no longer following #foo"
out = run("tags_followed")
assert out == "* #bar\thttp://localhost:3000/tags/bar"
# ------------------------------------------------------------------------------
# Utils
# ------------------------------------------------------------------------------

View File

@ -1,6 +1,6 @@
from collections import namedtuple
__version__ = '0.32.1'
__version__ = '0.33.1'
App = namedtuple('App', ['instance', 'base_url', 'client_id', 'client_secret'])
User = namedtuple('User', ['instance', 'username', 'access_token'])

View File

@ -11,20 +11,17 @@ SCOPES = 'read write follow'
def _account_action(app, user, account, action):
url = '/api/v1/accounts/{}/{}'.format(account, action)
url = f"/api/v1/accounts/{account}/{action}"
return http.post(app, user, url).json()
def _status_action(app, user, status_id, action, data=None):
url = '/api/v1/statuses/{}/{}'.format(status_id, action)
url = f"/api/v1/statuses/{status_id}/{action}"
return http.post(app, user, url, data=data).json()
def _tag_action(app, user, tag_name, action):
url = '/api/v1/tags/{}/{}'.format(tag_name, action)
url = f"/api/v1/tags/{tag_name}/{action}"
return http.post(app, user, url).json()
def _status_toggle_action(app, user, status_id, action, data=None):
@ -46,7 +43,7 @@ def _status_toggle_action(app, user, status_id, action, data=None):
return response
def create_app(domain, scheme='https'):
url = '{}://{}/api/v1/apps'.format(scheme, domain)
url = f"{scheme}://{domain}/api/v1/apps"
json = {
'client_name': CLIENT_NAME,
@ -58,6 +55,14 @@ def create_app(domain, scheme='https'):
return http.anon_post(url, json=json).json()
def get_muted_accounts(app, user):
return http.get(app, user, "/api/v1/mutes").json()
def get_blocked_accounts(app, user):
return http.get(app, user, "/api/v1/blocks").json()
def register_account(app, username, email, password, locale="en", agreement=True):
"""
Register an account
@ -197,7 +202,7 @@ def delete_status(app, user, status_id):
Deletes a status with given ID.
https://github.com/tootsuite/documentation/blob/master/Using-the-API/API.md#deleting-a-status
"""
return http.delete(app, user, '/api/v1/statuses/{}'.format(status_id))
return http.delete(app, user, f"/api/v1/statuses/{status_id}")
def favourite(app, user, status_id):
@ -239,14 +244,12 @@ def translate(app, user, status_id):
def context(app, user, status_id):
url = '/api/v1/statuses/{}/context'.format(status_id)
url = f"/api/v1/statuses/{status_id}/context"
return http.get(app, user, url).json()
def reblogged_by(app, user, status_id):
url = '/api/v1/statuses/{}/reblogged_by'.format(status_id)
url = f"/api/v1/statuses/{status_id}/reblogged_by"
return http.get(app, user, url).json()
@ -267,7 +270,7 @@ def _timeline_generator(app, user, path, params=None):
def home_timeline_generator(app, user, limit=20):
path = '/api/v1/timelines/home?limit={}'.format(limit)
path = f"/api/v1/timelines/home?limit={limit}"
return _timeline_generator(app, user, path)
@ -278,7 +281,7 @@ def public_timeline_generator(app, user, local=False, limit=20):
def tag_timeline_generator(app, user, hashtag, local=False, limit=20):
path = '/api/v1/timelines/tag/{}'.format(quote(hashtag))
path = f"/api/v1/timelines/tag/{quote(hashtag)}"
params = {'local': str_bool(local), 'limit': limit}
return _timeline_generator(app, user, path, params)
@ -290,13 +293,13 @@ def bookmark_timeline_generator(app, user, limit=20):
def timeline_list_generator(app, user, list_id, limit=20):
path = '/api/v1/timelines/list/{}'.format(list_id)
path = f"/api/v1/timelines/list/{list_id}"
return _timeline_generator(app, user, path, {'limit': limit})
def _anon_timeline_generator(instance, path, params=None):
while path:
url = "https://{}{}".format(instance, path)
url = f"https://{instance}{path}"
response = http.anon_get(url, params)
yield response.json()
path = _get_next_path(response.headers)
@ -309,7 +312,7 @@ def anon_public_timeline_generator(instance, local=False, limit=20):
def anon_tag_timeline_generator(instance, hashtag, local=False, limit=20):
path = '/api/v1/timelines/tag/{}'.format(quote(hashtag))
path = f"/api/v1/timelines/tag/{quote(hashtag)}"
params = {'local': str_bool(local), 'limit': limit}
return _anon_timeline_generator(instance, path, params)
@ -359,12 +362,12 @@ def _get_response_list(app, user, path):
def following(app, user, account):
path = '/api/v1/accounts/{}/{}'.format(account, 'following')
path = f"/api/v1/accounts/{account}/following"
return _get_response_list(app, user, path)
def followers(app, user, account):
path = '/api/v1/accounts/{}/{}'.format(account, 'followers')
path = f"/api/v1/accounts/{account}/followers"
return _get_response_list(app, user, path)
@ -394,8 +397,7 @@ def verify_credentials(app, user):
def single_status(app, user, status_id):
url = '/api/v1/statuses/{}'.format(status_id)
url = f"/api/v1/statuses/{status_id}"
return http.get(app, user, url).json()
@ -409,5 +411,5 @@ def clear_notifications(app, user):
def get_instance(domain, scheme="https"):
url = "{}://{}/api/v1/instance".format(scheme, domain)
url = f"{scheme}://{domain}/api/v1/instance"
return http.anon_get(url).json()

View File

@ -39,8 +39,9 @@ def get_timeline_generator(app, user, args):
return api.home_timeline_generator(app, user, limit=args.count)
def timeline(app, user, args):
generator = get_timeline_generator(app, user, args)
def timeline(app, user, args, generator=None):
if not generator:
generator = get_timeline_generator(app, user, args)
while True:
try:
@ -197,6 +198,10 @@ def unbookmark(app, user, args):
print_out("<green>✓ Status unbookmarked</green>")
def bookmarks(app, user, args):
timeline(app, user, args, api.bookmark_timeline_generator(app, user, limit=args.count))
def reblogged_by(app, user, args):
for account in api.reblogged_by(app, user, args.status_id):
print_out("{}\n @{}".format(account['display_name'], account['acct']))
@ -412,4 +417,4 @@ def notifications(app, user, args):
def tui(app, user, args):
from .tui.app import TUI
TUI.create(app, user).run()
TUI.create(app, user, args).run()

View File

@ -116,6 +116,11 @@ common_args = [
"action": 'store_true',
"default": False,
}),
(["--verbose"], {
"help": "show extra detail in debug log; used with --debug",
"action": 'store_true',
"default": False,
}),
]
# Arguments added to commands which require authentication
@ -191,7 +196,7 @@ common_timeline_args = [
}),
]
timeline_args = common_timeline_args + [
timeline_and_bookmark_args = [
(["-c", "--count"], {
"type": timeline_count,
"help": "number of toots to show per page (1-20, default 10).",
@ -209,6 +214,8 @@ timeline_args = common_timeline_args + [
}),
]
timeline_args = common_timeline_args + timeline_and_bookmark_args
AUTH_COMMANDS = [
Command(
name="login",
@ -252,7 +259,13 @@ TUI_COMMANDS = [
Command(
name="tui",
description="Launches the toot terminal user interface",
arguments=[],
arguments=[
(["--relative-datetimes"], {
"action": "store_true",
"default": False,
"help": "Show relative datetimes in status list.",
}),
],
require_auth=True,
),
]
@ -340,6 +353,12 @@ READ_COMMANDS = [
arguments=timeline_args,
require_auth=True,
),
Command(
name="bookmarks",
description="Show bookmarked posts",
arguments=timeline_and_bookmark_args,
require_auth=True,
),
]
POST_COMMANDS = [
@ -665,7 +684,7 @@ def main():
command_name = sys.argv[1] if len(sys.argv) > 1 else None
args = sys.argv[2:]
if not command_name:
if not command_name or command_name == "--help":
return print_usage()
user, app = config.get_active_user_app()

View File

@ -6,6 +6,18 @@ from logging import getLogger
logger = getLogger('toot')
VERBOSE = "--verbose" in sys.argv
COLOR = "--no-color" not in sys.argv
if COLOR:
ANSI_RED = "\033[31m"
ANSI_GREEN = "\033[32m"
ANSI_YELLOW = "\033[33m"
ANSI_END_COLOR = "\033[0m"
else:
ANSI_RED = ""
ANSI_GREEN = ""
ANSI_YELLOW = ""
ANSI_END_COLOR = ""
def censor_secrets(headers):
@ -25,36 +37,38 @@ def truncate(line):
def log_request(request):
logger.debug(">>> \033[32m{} {}\033[0m".format(request.method, request.url))
logger.debug(f">>> {ANSI_GREEN}{request.method} {request.url}{ANSI_END_COLOR}")
if request.headers:
headers = censor_secrets(request.headers)
logger.debug(">>> HEADERS: \033[33m{}\033[0m".format(headers))
logger.debug(f">>> HEADERS: {ANSI_GREEN}{headers}{ANSI_END_COLOR}")
if request.data:
data = truncate(request.data)
logger.debug(">>> DATA: \033[33m{}\033[0m".format(data))
logger.debug(f">>> DATA: {ANSI_GREEN}{data}{ANSI_END_COLOR}")
if request.json:
data = truncate(json.dumps(request.json))
logger.debug(">>> JSON: \033[33m{}\033[0m".format(data))
logger.debug(f">>> JSON: {ANSI_GREEN}{data}{ANSI_END_COLOR}")
if request.files:
logger.debug(">>> FILES: \033[33m{}\033[0m".format(request.files))
logger.debug(f">>> FILES: {ANSI_GREEN}{request.files}{ANSI_END_COLOR}")
if request.params:
logger.debug(">>> PARAMS: \033[33m{}\033[0m".format(request.params))
logger.debug(f">>> PARAMS: {ANSI_GREEN}{request.params}{ANSI_END_COLOR}")
def log_response(response):
content = truncate(response.content.decode())
if response.ok:
logger.debug("<<< \033[32m{}\033[0m".format(response))
logger.debug("<<< \033[33m{}\033[0m".format(content))
logger.debug(f"<<< {ANSI_GREEN}{response}{ANSI_END_COLOR}")
logger.debug(f"<<< {ANSI_YELLOW}{content}{ANSI_END_COLOR}")
else:
logger.debug("<<< \033[31m{}\033[0m".format(response))
logger.debug("<<< \033[31m{}\033[0m".format(content))
logger.debug(f"<<< {ANSI_RED}{response}{ANSI_END_COLOR}")
logger.debug(f"<<< {ANSI_RED}{content}{ANSI_END_COLOR}")
def log_debug(*msgs):

View File

@ -199,7 +199,7 @@ def print_acct_list(accounts):
def print_tag_list(tags):
if tags:
for tag in tags:
print_out(f"* <green>#{tag['name']}\t</green> {tag['url']}")
print_out(f"* <green>#{tag['name']}\t</green>{tag['url']}")
else:
print_out("You're not following any hashtags.")

View File

@ -5,6 +5,7 @@ from concurrent.futures import ThreadPoolExecutor
from toot import api, config, __version__
from toot.console import get_default_visibility
from toot.exceptions import ApiError
from .compose import StatusComposer
from .constants import PALETTE
@ -72,10 +73,10 @@ class TUI(urwid.Frame):
"""Main TUI frame."""
@classmethod
def create(cls, app, user):
def create(cls, app, user, args):
"""Factory method, sets up TUI and an event loop."""
tui = cls(app, user)
tui = cls(app, user, args)
loop = urwid.MainLoop(
tui,
palette=PALETTE,
@ -86,9 +87,10 @@ class TUI(urwid.Frame):
return tui
def __init__(self, app, user):
def __init__(self, app, user, args):
self.app = app
self.user = user
self.args = args
self.config = config.load_config()
self.loop = None # set in `create`
@ -113,6 +115,7 @@ class TUI(urwid.Frame):
def run(self):
self.loop.set_alarm_in(0, lambda *args: self.async_load_instance())
self.loop.set_alarm_in(0, lambda *args: self.async_load_followed_tags())
self.loop.set_alarm_in(0, lambda *args: self.async_load_timeline(
is_initial=True, timeline_name="home"))
self.loop.run()
@ -197,6 +200,9 @@ class TUI(urwid.Frame):
def _zoom(timeline, status_details):
self.show_status_zoom(status_details)
def _clear(*args):
self.clear_screen()
urwid.connect_signal(timeline, "bookmark", self.async_toggle_bookmark)
urwid.connect_signal(timeline, "compose", _compose)
urwid.connect_signal(timeline, "delete", _delete)
@ -210,6 +216,7 @@ class TUI(urwid.Frame):
urwid.connect_signal(timeline, "links", _links)
urwid.connect_signal(timeline, "zoom", _zoom)
urwid.connect_signal(timeline, "translate", self.async_translate)
urwid.connect_signal(timeline, "clear-screen", _clear)
def build_timeline(self, name, statuses, local):
def _close(*args):
@ -236,7 +243,7 @@ class TUI(urwid.Frame):
self.loop.set_alarm_in(5, lambda *args: self.footer.clear_message())
config.save_config(self.config)
timeline = Timeline(name, statuses, self.can_translate)
timeline = Timeline(name, statuses, self.can_translate, self.followed_tags)
self.connect_default_timeline_signals(timeline)
urwid.connect_signal(timeline, "next", _next)
@ -265,8 +272,9 @@ class TUI(urwid.Frame):
statuses = ancestors + [status] + descendants
focus = len(ancestors)
timeline = Timeline("thread", statuses, self.can_translate, focus,
is_thread=True)
timeline = Timeline("thread", statuses, self.can_translate,
self.followed_tags, focus, is_thread=True)
self.connect_default_timeline_signals(timeline)
urwid.connect_signal(timeline, "close", _close)
@ -329,10 +337,27 @@ class TUI(urwid.Frame):
# this works for Mastodon and Pleroma version strings
# Mastodon versions < 4 do not have translation service
# Revisit this logic if Pleroma implements translation
self.can_translate = int(instance["version"][0]) > 3
ch = instance["version"][0]
self.can_translate = int(ch) > 3 if ch.isnumeric() else False
return self.run_in_thread(_load_instance, done_callback=_done)
def async_load_followed_tags(self):
def _load_tag_list():
try:
return api.followed_tags(self.app, self.user)
except ApiError:
# not supported by all Mastodon servers so fail silently if necessary
return []
def _done_tag_list(tags):
if len(tags) > 0:
self.followed_tags = [t["name"] for t in tags]
else:
self.followed_tags = []
self.run_in_thread(_load_tag_list, done_callback=_done_tag_list)
def refresh_footer(self, timeline):
"""Show status details in footer."""
status, index, count = timeline.get_focused_status_with_counts()
@ -347,16 +372,26 @@ class TUI(urwid.Frame):
title="Status source",
)
def clear_screen(self):
self.loop.screen.clear()
def show_links(self, status):
links = parse_content_links(status.data["content"]) if status else []
post_attachments = status.data["media_attachments"] or []
reblog_attachments = (status.data["reblog"]["media_attachments"] if status.data["reblog"] else None) or []
for a in post_attachments + reblog_attachments:
url = a["remote_url"] or a["url"]
links.append((url, a["description"] if a["description"] else url))
def _clear(*args):
self.clear_screen()
if links:
sl_widget = StatusLinks(links)
urwid.connect_signal(sl_widget, "clear-screen", _clear)
self.open_overlay(
widget=StatusLinks(links),
widget=sl_widget,
title="Status links",
options={"height": len(links) + 2},
)
@ -464,9 +499,7 @@ class TUI(urwid.Frame):
in_reply_to_id=in_reply_to_id)
status = self.make_status(data)
# TODO: instead of this, fetch new items from the timeline?
self.timeline.prepend_status(status)
self.timeline.focus_status(status)
# TODO: fetch new items from the timeline?
self.footer.set_message("Status posted {} \\o/".format(status.id))
self.close_overlay()

View File

@ -19,6 +19,7 @@ PALETTE = [
# Functional
('hashtag', 'light cyan,bold', ''),
('followed_hashtag', 'yellow,bold', ''),
('link', ',italics', ''),
('link_focused', ',italics', 'dark magenta'),
@ -36,6 +37,7 @@ PALETTE = [
('yellow_bold', 'yellow,bold', ''),
('red', 'dark red', ''),
('warning', 'light red', ''),
('white_bold', 'white,bold', '')
]
VISIBILITY_OPTIONS = [

View File

@ -30,17 +30,23 @@ class StatusZoom(urwid.ListBox):
class StatusLinks(urwid.ListBox):
"""Shows status links."""
signals = ["clear-screen"]
def __init__(self, links):
def widget(url, title):
return Button(title or url, on_press=lambda btn: webbrowser.open(url))
return Button(title or url, on_press=lambda btn: self.browse(url))
walker = urwid.SimpleFocusListWalker(
[widget(url, title) for url, title in links]
)
super().__init__(walker)
def browse(self, url):
webbrowser.open(url)
# force a screen refresh; necessary with console browsers
self._emit("clear-screen")
class ExceptionStackTrace(urwid.ListBox):
"""Shows an exception stack trace."""

426
toot/tui/scroll.py Normal file
View File

@ -0,0 +1,426 @@
# scroll.py
#
# Copied from the stig project by rndusr@github
# https://github.com/rndusr/stig
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details
# http://www.gnu.org/licenses/gpl-3.0.txt
import urwid
from urwid.widget import BOX, FIXED, FLOW
# Scroll actions
SCROLL_LINE_UP = 'line up'
SCROLL_LINE_DOWN = 'line down'
SCROLL_PAGE_UP = 'page up'
SCROLL_PAGE_DOWN = 'page down'
SCROLL_TO_TOP = 'to top'
SCROLL_TO_END = 'to end'
# Scrollbar positions
SCROLLBAR_LEFT = 'left'
SCROLLBAR_RIGHT = 'right'
class Scrollable(urwid.WidgetDecoration):
def sizing(self):
return frozenset([BOX,])
def selectable(self):
return True
def __init__(self, widget):
"""Box widget that makes a fixed or flow widget vertically scrollable
TODO: Focusable widgets are handled, including switching focus, but
possibly not intuitively, depending on the arrangement of widgets. When
switching focus to a widget that is outside of the visible part of the
original widget, the canvas scrolls up/down to the focused widget. It
would be better to scroll until the next focusable widget is in sight
first. But for that to work we must somehow obtain a list of focusable
rows in the original canvas.
"""
if not any(s in widget.sizing() for s in (FIXED, FLOW)):
raise ValueError('Not a fixed or flow widget: %r' % widget)
self._trim_top = 0
self._scroll_action = None
self._forward_keypress = None
self._old_cursor_coords = None
self._rows_max_cached = 0
self.__super.__init__(widget)
def render(self, size, focus=False):
maxcol, maxrow = size
# Render complete original widget
ow = self._original_widget
ow_size = self._get_original_widget_size(size)
canv_full = ow.render(ow_size, focus)
# Make full canvas editable
canv = urwid.CompositeCanvas(canv_full)
canv_cols, canv_rows = canv.cols(), canv.rows()
if canv_cols <= maxcol:
pad_width = maxcol - canv_cols
if pad_width > 0:
# Canvas is narrower than available horizontal space
canv.pad_trim_left_right(0, pad_width)
if canv_rows <= maxrow:
fill_height = maxrow - canv_rows
if fill_height > 0:
# Canvas is lower than available vertical space
canv.pad_trim_top_bottom(0, fill_height)
if canv_cols <= maxcol and canv_rows <= maxrow:
# Canvas is small enough to fit without trimming
return canv
self._adjust_trim_top(canv, size)
# Trim canvas if necessary
trim_top = self._trim_top
trim_end = canv_rows - maxrow - trim_top
trim_right = canv_cols - maxcol
if trim_top > 0:
canv.trim(trim_top)
if trim_end > 0:
canv.trim_end(trim_end)
if trim_right > 0:
canv.pad_trim_left_right(0, -trim_right)
# Disable cursor display if cursor is outside of visible canvas parts
if canv.cursor is not None:
curscol, cursrow = canv.cursor
if cursrow >= maxrow or cursrow < 0:
canv.cursor = None
# Figure out whether we should forward keypresses to original widget
if canv.cursor is not None:
# Trimmed canvas contains the cursor, e.g. in an Edit widget
self._forward_keypress = True
else:
if canv_full.cursor is not None:
# Full canvas contains the cursor, but scrolled out of view
self._forward_keypress = False
else:
# Original widget does not have a cursor, but may be selectable
# FIXME: Using ow.selectable() is bad because the original
# widget may be selectable because it's a container widget with
# a key-grabbing widget that is scrolled out of view.
# ow.selectable() returns True anyway because it doesn't know
# how we trimmed our canvas.
#
# To fix this, we need to resolve ow.focus and somehow
# ask canv whether it contains bits of the focused widget. I
# can't see a way to do that.
if ow.selectable():
self._forward_keypress = True
else:
self._forward_keypress = False
return canv
def keypress(self, size, key):
# Maybe offer key to original widget
if self._forward_keypress:
ow = self._original_widget
ow_size = self._get_original_widget_size(size)
# Remember previous cursor position if possible
if hasattr(ow, 'get_cursor_coords'):
self._old_cursor_coords = ow.get_cursor_coords(ow_size)
key = ow.keypress(ow_size, key)
if key is None:
return None
# Handle up/down, page up/down, etc
command_map = self._command_map
if command_map[key] == urwid.CURSOR_UP:
self._scroll_action = SCROLL_LINE_UP
elif command_map[key] == urwid.CURSOR_DOWN:
self._scroll_action = SCROLL_LINE_DOWN
elif command_map[key] == urwid.CURSOR_PAGE_UP:
self._scroll_action = SCROLL_PAGE_UP
elif command_map[key] == urwid.CURSOR_PAGE_DOWN:
self._scroll_action = SCROLL_PAGE_DOWN
elif command_map[key] == urwid.CURSOR_MAX_LEFT: # 'home'
self._scroll_action = SCROLL_TO_TOP
elif command_map[key] == urwid.CURSOR_MAX_RIGHT: # 'end'
self._scroll_action = SCROLL_TO_END
else:
return key
self._invalidate()
def mouse_event(self, size, event, button, col, row, focus):
ow = self._original_widget
if hasattr(ow, 'mouse_event'):
ow_size = self._get_original_widget_size(size)
row += self._trim_top
return ow.mouse_event(ow_size, event, button, col, row, focus)
else:
return False
def _adjust_trim_top(self, canv, size):
"""Adjust self._trim_top according to self._scroll_action"""
action = self._scroll_action
self._scroll_action = None
maxcol, maxrow = size
trim_top = self._trim_top
canv_rows = canv.rows()
if trim_top < 0:
# Negative trim_top values use bottom of canvas as reference
trim_top = canv_rows - maxrow + trim_top + 1
if canv_rows <= maxrow:
self._trim_top = 0 # Reset scroll position
return
def ensure_bounds(new_trim_top):
return max(0, min(canv_rows - maxrow, new_trim_top))
if action == SCROLL_LINE_UP:
self._trim_top = ensure_bounds(trim_top - 1)
elif action == SCROLL_LINE_DOWN:
self._trim_top = ensure_bounds(trim_top + 1)
elif action == SCROLL_PAGE_UP:
self._trim_top = ensure_bounds(trim_top - maxrow + 1)
elif action == SCROLL_PAGE_DOWN:
self._trim_top = ensure_bounds(trim_top + maxrow - 1)
elif action == SCROLL_TO_TOP:
self._trim_top = 0
elif action == SCROLL_TO_END:
self._trim_top = canv_rows - maxrow
else:
self._trim_top = ensure_bounds(trim_top)
# If the cursor was moved by the most recent keypress, adjust trim_top
# so that the new cursor position is within the displayed canvas part.
# But don't do this if the cursor is at the top/bottom edge so we can still scroll out
if self._old_cursor_coords is not None and self._old_cursor_coords != canv.cursor:
self._old_cursor_coords = None
curscol, cursrow = canv.cursor
if cursrow < self._trim_top:
self._trim_top = cursrow
elif cursrow >= self._trim_top + maxrow:
self._trim_top = max(0, cursrow - maxrow + 1)
def _get_original_widget_size(self, size):
ow = self._original_widget
sizing = ow.sizing()
if FIXED in sizing:
return ()
elif FLOW in sizing:
return (size[0],)
def get_scrollpos(self, size=None, focus=False):
"""Current scrolling position
Lower limit is 0, upper limit is the maximum number of rows with the
given maxcol minus maxrow.
NOTE: The returned value may be too low or too high if the position has
changed but the widget wasn't rendered yet.
"""
return self._trim_top
def set_scrollpos(self, position):
"""Set scrolling position
If `position` is positive it is interpreted as lines from the top.
If `position` is negative it is interpreted as lines from the bottom.
Values that are too high or too low values are automatically adjusted
during rendering.
"""
self._trim_top = int(position)
self._invalidate()
def rows_max(self, size=None, focus=False):
"""Return the number of rows for `size`
If `size` is not given, the currently rendered number of rows is returned.
"""
if size is not None:
ow = self._original_widget
ow_size = self._get_original_widget_size(size)
sizing = ow.sizing()
if FIXED in sizing:
self._rows_max_cached = ow.pack(ow_size, focus)[1]
elif FLOW in sizing:
self._rows_max_cached = ow.rows(ow_size, focus)
else:
raise RuntimeError('Not a flow/box widget: %r' % self._original_widget)
return self._rows_max_cached
class ScrollBar(urwid.WidgetDecoration):
def sizing(self):
return frozenset((BOX,))
def selectable(self):
return True
def __init__(self, widget, thumb_char=u'\u2588', trough_char=' ',
side=SCROLLBAR_RIGHT, width=1):
"""Box widget that adds a scrollbar to `widget`
`widget` must be a box widget with the following methods:
- `get_scrollpos` takes the arguments `size` and `focus` and returns
the index of the first visible row.
- `set_scrollpos` (optional; needed for mouse click support) takes the
index of the first visible row.
- `rows_max` takes `size` and `focus` and returns the total number of
rows `widget` can render.
`thumb_char` is the character used for the scrollbar handle.
`trough_char` is used for the space above and below the handle.
`side` must be 'left' or 'right'.
`width` specifies the number of columns the scrollbar uses.
"""
if BOX not in widget.sizing():
raise ValueError('Not a box widget: %r' % widget)
self.__super.__init__(widget)
self._thumb_char = thumb_char
self._trough_char = trough_char
self.scrollbar_side = side
self.scrollbar_width = max(1, width)
self._original_widget_size = (0, 0)
def render(self, size, focus=False):
maxcol, maxrow = size
sb_width = self._scrollbar_width
ow_size = (max(0, maxcol - sb_width), maxrow)
sb_width = maxcol - ow_size[0]
ow = self._original_widget
ow_base = self.scrolling_base_widget
ow_rows_max = ow_base.rows_max(size, focus)
if ow_rows_max <= maxrow:
# Canvas fits without scrolling - no scrollbar needed
self._original_widget_size = size
return ow.render(size, focus)
ow_rows_max = ow_base.rows_max(ow_size, focus)
ow_canv = ow.render(ow_size, focus)
self._original_widget_size = ow_size
pos = ow_base.get_scrollpos(ow_size, focus)
posmax = ow_rows_max - maxrow
# Thumb shrinks/grows according to the ratio of
# <number of visible lines> / <number of total lines>
thumb_weight = min(1, maxrow / max(1, ow_rows_max))
thumb_height = max(1, round(thumb_weight * maxrow))
# Thumb may only touch top/bottom if the first/last row is visible
top_weight = float(pos) / max(1, posmax)
top_height = int((maxrow - thumb_height) * top_weight)
if top_height == 0 and top_weight > 0:
top_height = 1
# Bottom part is remaining space
bottom_height = maxrow - thumb_height - top_height
assert thumb_height + top_height + bottom_height == maxrow
# Create scrollbar canvas
# Creating SolidCanvases of correct height may result in "cviews do not
# fill gaps in shard_tail!" or "cviews overflow gaps in shard_tail!"
# exceptions. Stacking the same SolidCanvas is a workaround.
# https://github.com/urwid/urwid/issues/226#issuecomment-437176837
top = urwid.SolidCanvas(self._trough_char, sb_width, 1)
thumb = urwid.SolidCanvas(self._thumb_char, sb_width, 1)
bottom = urwid.SolidCanvas(self._trough_char, sb_width, 1)
sb_canv = urwid.CanvasCombine(
[(top, None, False)] * top_height +
[(thumb, None, False)] * thumb_height +
[(bottom, None, False)] * bottom_height,
)
combinelist = [(ow_canv, None, True, ow_size[0]),
(sb_canv, None, False, sb_width)]
if self._scrollbar_side != SCROLLBAR_LEFT:
return urwid.CanvasJoin(combinelist)
else:
return urwid.CanvasJoin(reversed(combinelist))
@property
def scrollbar_width(self):
"""Columns the scrollbar uses"""
return max(1, self._scrollbar_width)
@scrollbar_width.setter
def scrollbar_width(self, width):
self._scrollbar_width = max(1, int(width))
self._invalidate()
@property
def scrollbar_side(self):
"""Where to display the scrollbar; must be 'left' or 'right'"""
return self._scrollbar_side
@scrollbar_side.setter
def scrollbar_side(self, side):
if side not in (SCROLLBAR_LEFT, SCROLLBAR_RIGHT):
raise ValueError('scrollbar_side must be "left" or "right", not %r' % side)
self._scrollbar_side = side
self._invalidate()
@property
def scrolling_base_widget(self):
"""Nearest `original_widget` that is compatible with the scrolling API"""
def orig_iter(w):
while hasattr(w, 'original_widget'):
w = w.original_widget
yield w
yield w
def is_scrolling_widget(w):
return hasattr(w, 'get_scrollpos') and hasattr(w, 'rows_max')
for w in orig_iter(self):
if is_scrolling_widget(w):
return w
raise ValueError('Not compatible to be wrapped by ScrollBar: %r' % w)
def keypress(self, size, key):
return self._original_widget.keypress(self._original_widget_size, key)
def mouse_event(self, size, event, button, col, row, focus):
ow = self._original_widget
ow_size = self._original_widget_size
handled = False
if hasattr(ow, 'mouse_event'):
handled = ow.mouse_event(ow_size, event, button, col, row, focus)
if not handled and hasattr(ow, 'set_scrollpos'):
if button == 4: # scroll wheel up
pos = ow.get_scrollpos(ow_size)
ow.set_scrollpos(pos - 1)
return True
elif button == 5: # scroll wheel down
pos = ow.get_scrollpos(ow_size)
ow.set_scrollpos(pos + 1)
return True
return False

View File

@ -1,12 +1,17 @@
import logging
import sys
import urwid
import webbrowser
from toot.utils import format_content
from toot.utils.language import language_name
from typing import Optional
from .entities import Status
from .scroll import Scrollable, ScrollBar
from .utils import highlight_hashtags, parse_datetime, highlight_keys
from .widgets import SelectableText, SelectableColumns
from toot.utils import format_content
from toot.utils.language import language_name
from toot.tui.utils import time_ago
logger = logging.getLogger("toot")
@ -16,42 +21,62 @@ class Timeline(urwid.Columns):
Displays a list of statuses to the left, and status details on the right.
"""
signals = [
"close", # Close thread
"compose", # Compose a new toot
"delete", # Delete own status
"favourite", # Favourite status
"focus", # Focus changed
"bookmark", # Bookmark status
"media", # Display media attachments
"menu", # Show a context menu
"next", # Fetch more statuses
"reblog", # Reblog status
"reply", # Compose a reply to a status
"source", # Show status source
"links", # Show status links
"thread", # Show thread for status
"translate", # Translate status
"save", # Save current timeline
"zoom", # Open status in scrollable popup window
"close", # Close thread
"compose", # Compose a new toot
"delete", # Delete own status
"favourite", # Favourite status
"focus", # Focus changed
"bookmark", # Bookmark status
"media", # Display media attachments
"menu", # Show a context menu
"next", # Fetch more statuses
"reblog", # Reblog status
"reply", # Compose a reply to a status
"source", # Show status source
"links", # Show status links
"thread", # Show thread for status
"translate", # Translate status
"save", # Save current timeline
"zoom", # Open status in scrollable popup window
"clear-screen", # Clear the screen (used internally)
]
def __init__(self, name, statuses, can_translate, focus=0, is_thread=False):
def __init__(self, name, statuses, can_translate, followed_tags=[], focus=0, is_thread=False):
self.name = name
self.is_thread = is_thread
self.statuses = statuses
self.can_translate = can_translate
self.status_list = self.build_status_list(statuses, focus=focus)
self.followed_tags = followed_tags
try:
self.status_details = StatusDetails(statuses[focus], is_thread, can_translate)
focused_status = statuses[focus]
except IndexError:
self.status_details = StatusDetails(None, is_thread, can_translate)
focused_status = None
self.status_details = StatusDetails(self, focused_status)
status_widget = self.wrap_status_details(self.status_details)
super().__init__([
("weight", 40, self.status_list),
("weight", 0, urwid.AttrWrap(urwid.SolidFill(""), "blue_selected")),
("weight", 60, urwid.Padding(self.status_details, left=1)),
("weight", 60, status_widget),
])
def wrap_status_details(self, status_details: "StatusDetails") -> urwid.Widget:
"""Wrap StatusDetails widget with a scollbar and footer."""
return urwid.Padding(
urwid.Frame(
body=ScrollBar(
Scrollable(urwid.Padding(status_details, right=1)),
thumb_char="\u2588",
trough_char="\u2591",
),
footer=self.get_option_text(status_details.status),
),
left=1
)
def build_status_list(self, statuses, focus):
items = [self.build_list_item(status) for status in statuses]
walker = urwid.SimpleFocusListWalker(items)
@ -72,6 +97,28 @@ class Timeline(urwid.Columns):
None: "green_selected",
})
def get_option_text(self, status: Optional[Status]) -> Optional[urwid.Text]:
if not status:
return None
options = [
"[B]oost",
"[D]elete" if status.is_mine else "",
"B[o]okmark",
"[F]avourite",
"[V]iew",
"[T]hread" if not self.is_thread else "",
"[L]inks",
"[R]eply",
"So[u]rce",
"[Z]oom",
"Tra[n]slate" if self.can_translate else "",
"[H]elp",
]
options = "\n" + " ".join(o for o in options if o)
options = highlight_keys(options, "white_bold", "cyan")
return urwid.Text(options)
def get_focused_status(self):
try:
return self.statuses[self.status_list.body.focus]
@ -102,8 +149,9 @@ class Timeline(urwid.Columns):
self.draw_status_details(status)
def draw_status_details(self, status):
self.status_details = StatusDetails(status, self.is_thread, self.can_translate)
self.contents[2] = urwid.Padding(self.status_details, left=1), ("weight", 60, False)
self.status_details = StatusDetails(self, status)
widget = self.wrap_status_details(self.status_details)
self.contents[2] = widget, ("weight", 60, False)
def keypress(self, size, key):
status = self.get_focused_status()
@ -182,6 +230,8 @@ class Timeline(urwid.Columns):
if key in ("v", "V"):
if status.original.url:
webbrowser.open(status.original.url)
# force a screen refresh; necessary with console browsers
self._emit("clear-screen")
return
if key in ("p", "P"):
@ -242,18 +292,10 @@ class Timeline(urwid.Columns):
class StatusDetails(urwid.Pile):
def __init__(self, status, in_thread, can_translate=False):
"""
Parameters
----------
status : Status
The status to render.
def __init__(self, timeline: Timeline, status: Optional[Status]):
self.status = status
self.followed_tags = timeline.followed_tags
in_thread : bool
Whether the status is rendered from a thread status list.
"""
self.in_thread = in_thread
self.can_translate = can_translate
reblogged_by = status.author if status and status.reblog else None
widget_list = list(self.content_generator(status.original, reblogged_by)
if status else ())
@ -281,7 +323,7 @@ class StatusDetails(urwid.Pile):
else:
content = status.translation if status.show_translation else status.data["content"]
for line in format_content(content):
yield ("pack", urwid.Text(highlight_hashtags(line)))
yield ("pack", urwid.Text(highlight_hashtags(line, self.followed_tags)))
media = status.data["media_attachments"]
if media:
@ -324,36 +366,18 @@ class StatusDetails(urwid.Pile):
visibility_color = visibility_colors.get(status.visibility, "gray")
yield ("pack", urwid.Text([
("red", "🠷 ") if status.bookmarked else "",
("blue", f"{status.created_at.strftime('%Y-%m-%d %H:%M')} "),
("red" if status.bookmarked else "gray", "🠷 "),
("gray", f"{status.data['replies_count']} "),
("yellow" if status.reblogged else "gray", f"{status.data['reblogs_count']} "),
("yellow" if status.favourited else "gray", f"{status.data['favourites_count']}"),
(visibility_color, f" · {visibility}"),
("yellow", f" · Translated from {translated_from} ") if translated_from else "",
("yellow", f" · Translated from {translated_from} " if translated_from else ""),
("gray", f" · {application}" if application else ""),
]))
# Push things to bottom
yield ("weight", 1, urwid.SolidFill(" "))
options = [
"[B]oost",
"[D]elete" if status.is_mine else "",
"[F]avourite",
"B[o]okmark",
"[V]iew",
"[T]hread" if not self.in_thread else "",
"[L]inks",
"[R]eply",
"So[u]rce",
"[Z]oom",
"Tra[n]slate" if self.can_translate else "",
"[H]elp",
]
options = " ".join(o for o in options if o)
options = highlight_keys(options, "cyan_bold", "cyan")
yield ("pack", urwid.Text(options))
yield ("weight", 1, urwid.BoxAdapter(urwid.SolidFill(" "), 1))
def build_linebox(self, contents):
contents = urwid.Pile(list(contents))
@ -397,7 +421,17 @@ class StatusDetails(urwid.Pile):
class StatusListItem(SelectableColumns):
def __init__(self, status):
created_at = status.created_at.strftime("%Y-%m-%d %H:%M")
edited = status.data["edited_at"]
# TODO: hacky implementation to avoid creating conflicts for existing
# pull reuqests, refactor when merged.
created_at = (
time_ago(status.created_at).ljust(3, " ")
if "--relative-datetimes" in sys.argv
else status.created_at.strftime("%Y-%m-%d %H:%M")
)
edited_flag = "*" if edited else " "
favourited = ("yellow", "") if status.original.favourited else " "
reblogged = ("yellow", "") if status.original.reblogged else " "
is_reblog = ("cyan", "") if status.reblog else " "
@ -405,6 +439,7 @@ class StatusListItem(SelectableColumns):
return super().__init__([
("pack", SelectableText(("blue", created_at), wrap="clip")),
("pack", urwid.Text(("blue", edited_flag))),
("pack", urwid.Text(" ")),
("pack", urwid.Text(favourited)),
("pack", urwid.Text(" ")),

View File

@ -1,4 +1,5 @@
from html.parser import HTMLParser
import math
import os
import re
import shutil
@ -7,6 +8,11 @@ import subprocess
from datetime import datetime, timezone
HASHTAG_PATTERN = re.compile(r'(?<!\w)(#\w+)\b')
SECOND = 1
MINUTE = SECOND * 60
HOUR = MINUTE * 60
DAY = HOUR * 24
WEEK = DAY * 7
def parse_datetime(value):
@ -27,6 +33,28 @@ def parse_datetime(value):
return dttm.astimezone()
def time_ago(value: datetime) -> datetime:
now = datetime.now().astimezone()
delta = now.timestamp() - value.timestamp()
if (delta < 1):
return "now"
if (delta < 8 * DAY):
if (delta < MINUTE):
return f"{math.floor(delta / SECOND)}".rjust(2, " ") + "s"
if (delta < HOUR):
return f"{math.floor(delta / MINUTE)}".rjust(2, " ") + "m"
if (delta < DAY):
return f"{math.floor(delta / HOUR)}".rjust(2, " ") + "h"
return f"{math.floor(delta / DAY)}".rjust(2, " ") + "d"
if (delta < 53 * WEEK): # not exactly correct but good enough as a boundary
return f"{math.floor(delta / WEEK)}".rjust(2, " ") + "w"
return ">1y"
def highlight_keys(text, high_attr, low_attr=""):
"""
Takes a string and adds high_attr attribute to parts in square brackets,
@ -51,11 +79,19 @@ def highlight_keys(text, high_attr, low_attr=""):
return list(_gen())
def highlight_hashtags(line, attr="hashtag"):
return [
(attr, p) if p.startswith("#") else p
for p in re.split(HASHTAG_PATTERN, line)
]
def highlight_hashtags(line, followed_tags, attr="hashtag", followed_attr="followed_hashtag"):
hline = []
for p in re.split(HASHTAG_PATTERN, line):
if p.startswith("#"):
if p[1:].lower() in (t.lower() for t in followed_tags):
hline.append((followed_attr, p))
else:
hline.append((attr, p))
else:
hline.append(p)
return hline
def show_media(paths):