Merge branch 'asyncfix' of https://github.com/danschwarz/toot into asyncfix

This commit is contained in:
Daniel Schwarz 2024-03-05 19:58:18 -05:00
commit a5cd9d343c
29 changed files with 1232 additions and 207 deletions

View File

@ -1,3 +1,4 @@
[flake8]
max-line-length=100
exclude=build,tests,tmp,venv,toot/tui/scroll.py
ignore=E128
max-line-length=120

View File

@ -1,9 +1,9 @@
name: Run tests
on: [push]
on: [push, pull_request]
jobs:
build:
test:
# Older Ubuntu required for testing on Python 3.6 which is not available in
# later versions. Remove once support for 3.6 is dropped.
runs-on: ubuntu-20.04
@ -28,3 +28,6 @@ jobs:
- name: Validate minimum required version
run: |
vermin --target=3.6 --no-tips .
- name: Check style
run: |
flake8

View File

@ -3,6 +3,31 @@ Changelog
<!-- Do not edit. This file is automatically generated from changelog.yaml.-->
**0.33.1 (2023-01-03)**
* TUI: Fix crash when viewing toot in browser
**0.33.0 (2023-01-02)**
* Add CONTRIBUTING.md containing a contribution guide
* Add `env` command which prints local env to include in issues
* Add TOOT_POST_VISIBILITY environment to control default post visibility
(thanks Lim Ding Wen)
* Add `tags_followed`, `tags_follow`, and `tags_unfollow` commands (thanks
Daniel Schwarz)
* Add `tags_bookmarks` command (thanks Giuseppe Bilotta)
* TUI: Show an error if attemptint to boost a private status (thanks Lim Ding
Wen)
* TUI: Hide polls, cards and media attachments for sensitive posts (thanks
Daniel Schwarz)
* TUI: Add bookmarking and bookmark timeline (thanks Daniel Schwarz)
* TUI: Show status visiblity (thanks Lim Ding Wen)
* TUI: Reply to original account instead of boosting account (thanks Lim Ding
Wen)
* TUI: Refresh screen after exiting browser, required for text browsers (thanks
Daniel Schwarz)
* TUI: Highlight followed tags (thanks Daniel Schwarz)
**0.32.1 (2022-12-12)**
* Fix packaging issue, missing toot.utils module

142
CONTRIBUTING.md Normal file
View File

@ -0,0 +1,142 @@
Toot contribution guide
=======================
Firstly, thank you for contributing to toot!
Relevant links which will be referenced below:
* [toot documentation](https://toot.readthedocs.io/)
* [toot-discuss mailing list](https://lists.sr.ht/~ihabunek/toot-discuss)
used for discussion as well as accepting patches
* [toot project on github](https://github.com/ihabunek/toot)
here you can report issues and submit pull requests
* #toot IRC channel on [libera.chat](https://libera.chat)
## Code of conduct
Please be kind and patient. Toot is governed by one human with a full time job.
## I have a question
First, check if your question is addressed in the documentation or the mailing
list. If not, feel free to send an email to the mailing list. You may want to
subscribe to the mailing list to receive replies.
Alternatively, you can ask your question on the IRC channel and ping me
(ihabunek). You may have to wait for a response, please be patient.
Please don't open Github issues for questions.
## I want to contribute
### Reporting a bug
First check you're using the
[latest version](https://github.com/ihabunek/toot/releases/) of toot and verify
the bug is present in this version.
Search Github issues to check the bug hasn't already been reported.
To report a bug open an
[issue on Github](https://github.com/ihabunek/toot/issues) or send an
email to the [mailing list](https://lists.sr.ht/~ihabunek/toot-discuss).
* Run `toot env` and include its contents in the bug report.
* Explain the behavior you would expect and the actual behavior.
* Please provide as much context as possible and describe the reproduction steps
that someone else can follow to recreate the issue on their own.
### Suggesting enhancements
This includes suggesting new features or changes to existing ones.
Search Github issues to check the enhancement has not already been requested. If
it hasn't, [open a new issue](https://github.com/ihabunek/toot/issues).
Your request will be reviewed to see if it's a good fit for toot. Implementing
requested features depends on the available time and energy of the maintainer
and other contributors. Be patient.
### Contributing code
When contributing to toot, please only submit code that you have authored or
code whose license allows it to be included in toot. You agree that the code
you submit will be published under the [toot license](LICENSE).
#### Setting up a dev environment
Check out toot (or a fork) and install it into a virtual environment.
```
git clone git@github.com:ihabunek/toot.git
cd toot
python3 -m venv _env
source _env/bin/activate
pip install --editable .
pip install -r requirements-dev.txt
pip install -r requirements-test.txt
```
While the virtual env is active, running `toot` will execute the one you checked
out. This allows you to make changes and test them.
#### Crafting good commits
Please put some effort into breaking your contribution up into a series of well
formed commits. If you're unsure what this means, there is a good guide
available at https://cbea.ms/git-commit/.
Rules for commits:
* each commit should ideally contain only one change
* don't bundle multiple unrelated changes into a single commit
* write descriptive and well formatted commit messages
Rules for commit messages:
* separate subject from body with a blank line
* limit the subject line to 50 characters
* capitalize the subject line
* do not end the subject line with a period
* use the imperative mood in the subject line
* wrap the body at 72 characters
* use the body to explain what and why vs. how
For a more detailed explanation with examples see the guide at
https://cbea.ms/git-commit/
If you use vim to write your commit messages, it will already enforce some of
these rules for you.
#### Run tests before submitting
You can run code and sytle tests by running:
```
make test
```
This runs three tools:
* `pytest` runs the test suite
* `flake8` checks code formatting
* `vermin` checks that minimum python version
Please ensure all three commands succeed before submitting your patches.
#### Submitting patches
To submit your code either open
[a pull request](https://github.com/ihabunek/toot/pulls) on Github, or send
patch(es) to [the mailing list](https://lists.sr.ht/~ihabunek/toot-discuss).
If sending to the mailing list, patches should be sent using `git send-email`.
If you're unsure how to do this, there is a good guide at
https://git-send-email.io/.
---
Parts of this guide were taken from the following sources:
* https://contributing.md/
* https://cbea.ms/git-commit/

View File

@ -9,10 +9,13 @@ publish :
test:
pytest -v
flake8
vermin --target=3.6 --no-tips --violations --exclude-regex venv/.* .
coverage:
coverage erase
coverage run
coverage html
coverage report
clean :

View File

@ -1,3 +1,24 @@
0.33.1:
date: 2023-01-03
changes:
- "TUI: Fix crash when viewing toot in browser"
0.33.0:
date: 2023-01-02
changes:
- "Add CONTRIBUTING.md containing a contribution guide"
- "Add `env` command which prints local env to include in issues"
- "Add TOOT_POST_VISIBILITY environment to control default post visibility (thanks Lim Ding Wen)"
- "Add `tags_followed`, `tags_follow`, and `tags_unfollow` commands (thanks Daniel Schwarz)"
- "Add `tags_bookmarks` command (thanks Giuseppe Bilotta)"
- "TUI: Show an error if attemptint to boost a private status (thanks Lim Ding Wen)"
- "TUI: Hide polls, cards and media attachments for sensitive posts (thanks Daniel Schwarz)"
- "TUI: Add bookmarking and bookmark timeline (thanks Daniel Schwarz)"
- "TUI: Show status visiblity (thanks Lim Ding Wen)"
- "TUI: Reply to original account instead of boosting account (thanks Lim Ding Wen)"
- "TUI: Refresh screen after exiting browser, required for text browsers (thanks Daniel Schwarz)"
- "TUI: Highlight followed tags (thanks Daniel Schwarz)"
0.32.1:
date: 2022-12-12
changes:

View File

@ -1,5 +1,3 @@
# -*- coding: utf-8 -*-
from datetime import datetime
# -- Project information -----------------------------------------------------

View File

@ -1,3 +1,4 @@
flake8
psycopg2-binary
pytest
pytest-xdist[psutil]

View File

@ -12,7 +12,7 @@ and blocking accounts and other actions.
setup(
name='toot',
version='0.32.1',
version='0.33.1',
description='Mastodon CLI client',
long_description=long_description.strip(),
author='Ivan Habunek',

View File

@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
import pytest
from unittest import mock

View File

@ -1,5 +1,3 @@
# -*- coding: utf-8 -*-
from toot import App, User, api, config, auth
from tests.utils import retval

View File

@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
import io
import pytest
import re

View File

@ -431,6 +431,71 @@ def test_follow_not_found(run):
assert str(ex_info.value) == "Account not found"
def test_mute(app, user, friend, run):
out = run("mute", friend.username)
assert out == f"✓ You have muted {friend.username}"
[muted_account] = api.get_muted_accounts(app, user)
assert muted_account["acct"] == friend.username
out = run("unmute", friend.username)
assert out == f"{friend.username} is no longer muted"
assert api.get_muted_accounts(app, user) == []
def test_block(app, user, friend, run):
out = run("block", friend.username)
assert out == f"✓ You are now blocking {friend.username}"
[blockd_account] = api.get_blocked_accounts(app, user)
assert blockd_account["acct"] == friend.username
out = run("unblock", friend.username)
assert out == f"{friend.username} is no longer blocked"
assert api.get_blocked_accounts(app, user) == []
def test_following_followers(user, friend, run):
out = run("following", user.username)
assert out == ""
run("follow", friend.username)
out = run("following", user.username)
assert out == f"* @{friend.username}"
out = run("followers", friend.username)
assert out == f"* @{user.username}"
def test_tags(run):
out = run("tags_followed")
assert out == "You're not following any hashtags."
out = run("tags_follow", "foo")
assert out == "✓ You are now following #foo"
out = run("tags_followed")
assert out == "* #foo\thttp://localhost:3000/tags/foo"
out = run("tags_follow", "bar")
assert out == "✓ You are now following #bar"
out = run("tags_followed")
assert out == "\n".join([
"* #bar\thttp://localhost:3000/tags/bar",
"* #foo\thttp://localhost:3000/tags/foo",
])
out = run("tags_unfollow", "foo")
assert out == "✓ You are no longer following #foo"
out = run("tags_followed")
assert out == "* #bar\thttp://localhost:3000/tags/bar"
# ------------------------------------------------------------------------------
# Utils
# ------------------------------------------------------------------------------

View File

@ -1,8 +1,6 @@
# -*- coding: utf-8 -*-
from collections import namedtuple
__version__ = '0.32.1'
__version__ = '0.33.1'
App = namedtuple('App', ['instance', 'base_url', 'client_id', 'client_secret'])
User = namedtuple('User', ['instance', 'username', 'access_token'])

View File

@ -1,5 +1,3 @@
# -*- coding: utf-8 -*-
import re
import uuid
@ -13,15 +11,24 @@ SCOPES = 'read write follow'
def _account_action(app, user, account, action):
url = '/api/v1/accounts/{}/{}'.format(account, action)
url = f"/api/v1/accounts/{account}/{action}"
return http.post(app, user, url).json()
def _status_action(app, user, status_id, action):
def _status_action(app, user, status_id, action, data=None):
url = f"/api/v1/statuses/{status_id}/{action}"
return http.post(app, user, url, data=data).json()
def _tag_action(app, user, tag_name, action):
url = f"/api/v1/tags/{tag_name}/{action}"
return http.post(app, user, url).json()
def _status_toggle_action(app, user, status_id, action, data=None):
url = '/api/v1/statuses/{}/{}'.format(status_id, action)
return http.post(app, user, url, data=data).json()
return http.post(app, user, url).json()
def _status_toggle_action(app, user, status_id, action):
url = '/api/v1/statuses/{}/{}'.format(status_id, action)
@ -42,8 +49,9 @@ def _status_toggle_action(app, user, status_id, action):
finally:
return response
def create_app(domain, scheme='https'):
url = '{}://{}/api/v1/apps'.format(scheme, domain)
url = f"{scheme}://{domain}/api/v1/apps"
json = {
'client_name': CLIENT_NAME,
@ -55,6 +63,14 @@ def create_app(domain, scheme='https'):
return http.anon_post(url, json=json).json()
def get_muted_accounts(app, user):
return http.get(app, user, "/api/v1/mutes").json()
def get_blocked_accounts(app, user):
return http.get(app, user, "/api/v1/blocks").json()
def register_account(app, username, email, password, locale="en", agreement=True):
"""
Register an account
@ -194,7 +210,7 @@ def delete_status(app, user, status_id):
Deletes a status with given ID.
https://github.com/tootsuite/documentation/blob/master/Using-the-API/API.md#deleting-a-status
"""
return http.delete(app, user, '/api/v1/statuses/{}'.format(status_id))
return http.delete(app, user, f"/api/v1/statuses/{status_id}")
def favourite(app, user, status_id):
@ -205,8 +221,8 @@ def unfavourite(app, user, status_id):
return _status_toggle_action(app, user, status_id, 'unfavourite')
def reblog(app, user, status_id):
return _status_toggle_action(app, user, status_id, 'reblog')
def reblog(app, user, status_id, visibility="public"):
return _status_toggle_action(app, user, status_id, 'reblog', data={"visibility": visibility})
def unreblog(app, user, status_id):
@ -236,14 +252,12 @@ def translate(app, user, status_id):
def context(app, user, status_id):
url = '/api/v1/statuses/{}/context'.format(status_id)
url = f"/api/v1/statuses/{status_id}/context"
return http.get(app, user, url).json()
def reblogged_by(app, user, status_id):
url = '/api/v1/statuses/{}/reblogged_by'.format(status_id)
url = f"/api/v1/statuses/{status_id}/reblogged_by"
return http.get(app, user, url).json()
@ -264,7 +278,7 @@ def _timeline_generator(app, user, path, params=None):
def home_timeline_generator(app, user, limit=20):
path = '/api/v1/timelines/home?limit={}'.format(limit)
path = f"/api/v1/timelines/home?limit={limit}"
return _timeline_generator(app, user, path)
@ -275,19 +289,25 @@ def public_timeline_generator(app, user, local=False, limit=20):
def tag_timeline_generator(app, user, hashtag, local=False, limit=20):
path = '/api/v1/timelines/tag/{}'.format(quote(hashtag))
path = f"/api/v1/timelines/tag/{quote(hashtag)}"
params = {'local': str_bool(local), 'limit': limit}
return _timeline_generator(app, user, path, params)
def bookmark_timeline_generator(app, user, limit=20):
path = '/api/v1/bookmarks'
params = {'limit': limit}
return _timeline_generator(app, user, path, params)
def timeline_list_generator(app, user, list_id, limit=20):
path = '/api/v1/timelines/list/{}'.format(list_id)
path = f"/api/v1/timelines/list/{list_id}"
return _timeline_generator(app, user, path, {'limit': limit})
def _anon_timeline_generator(instance, path, params=None):
while path:
url = "https://{}{}".format(instance, path)
url = f"https://{instance}{path}"
response = http.anon_get(url, params)
yield response.json()
path = _get_next_path(response.headers)
@ -300,7 +320,7 @@ def anon_public_timeline_generator(instance, local=False, limit=20):
def anon_tag_timeline_generator(instance, hashtag, local=False, limit=20):
path = '/api/v1/timelines/tag/{}'.format(quote(hashtag))
path = f"/api/v1/timelines/tag/{quote(hashtag)}"
params = {'local': str_bool(local), 'limit': limit}
return _anon_timeline_generator(instance, path, params)
@ -332,23 +352,36 @@ def unfollow(app, user, account):
return _account_action(app, user, account, 'unfollow')
def _get_account_list(app, user, path):
accounts = []
def follow_tag(app, user, tag_name):
return _tag_action(app, user, tag_name, 'follow')
def unfollow_tag(app, user, tag_name):
return _tag_action(app, user, tag_name, 'unfollow')
def _get_response_list(app, user, path):
items = []
while path:
response = http.get(app, user, path)
accounts += response.json()
items += response.json()
path = _get_next_path(response.headers)
return accounts
return items
def following(app, user, account):
path = '/api/v1/accounts/{}/{}'.format(account, 'following')
return _get_account_list(app, user, path)
path = f"/api/v1/accounts/{account}/following"
return _get_response_list(app, user, path)
def followers(app, user, account):
path = '/api/v1/accounts/{}/{}'.format(account, 'followers')
return _get_account_list(app, user, path)
path = f"/api/v1/accounts/{account}/followers"
return _get_response_list(app, user, path)
def followed_tags(app, user):
path = '/api/v1/followed_tags'
return _get_response_list(app, user, path)
def mute(app, user, account):
@ -372,8 +405,7 @@ def verify_credentials(app, user):
def single_status(app, user, status_id):
url = '/api/v1/statuses/{}'.format(status_id)
url = f"/api/v1/statuses/{status_id}"
return http.get(app, user, url).json()
@ -387,5 +419,5 @@ def clear_notifications(app, user):
def get_instance(domain, scheme="https"):
url = "{}://{}/api/v1/instance".format(scheme, domain)
url = f"{scheme}://{domain}/api/v1/instance"
return http.anon_get(url).json()

View File

@ -1,5 +1,3 @@
# -*- coding: utf-8 -*-
import sys
import webbrowser

View File

@ -1,13 +1,13 @@
# -*- coding: utf-8 -*-
import sys
import platform
from datetime import datetime, timedelta, timezone
from toot import api, config
from toot import api, config, __version__
from toot.auth import login_interactive, login_browser_interactive, create_app_interactive
from toot.exceptions import ApiError, ConsoleError
from toot.output import (print_out, print_instance, print_account, print_acct_list,
print_search_results, print_timeline, print_notifications)
print_search_results, print_timeline, print_notifications,
print_tag_list)
from toot.tui.utils import parse_datetime
from toot.utils import editor_input, multiline_input, EOF_KEY
@ -39,10 +39,11 @@ def get_timeline_generator(app, user, args):
return api.home_timeline_generator(app, user, limit=args.count)
def timeline(app, user, args):
generator = get_timeline_generator(app, user, args)
def timeline(app, user, args, generator=None):
if not generator:
generator = get_timeline_generator(app, user, args)
while(True):
while True:
try:
items = next(generator)
except StopIteration:
@ -168,7 +169,7 @@ def unfavourite(app, user, args):
def reblog(app, user, args):
api.reblog(app, user, args.status_id)
api.reblog(app, user, args.status_id, visibility=args.visibility)
print_out("<green>✓ Status reblogged</green>")
@ -197,6 +198,10 @@ def unbookmark(app, user, args):
print_out("<green>✓ Status unbookmarked</green>")
def bookmarks(app, user, args):
timeline(app, user, args, api.bookmark_timeline_generator(app, user, limit=args.count))
def reblogged_by(app, user, args):
for account in api.reblogged_by(app, user, args.status_id):
print_out("{}\n @{}".format(account['display_name'], account['acct']))
@ -220,6 +225,12 @@ def auth(app, user, args):
print_out("\nAuth tokens are stored in: <blue>{}</blue>".format(path))
def env(app, user, args):
print_out(f"toot {__version__}")
print_out(f"Python {sys.version}")
print_out(platform.platform())
def login_cli(app, user, args):
app = create_app_interactive(instance=args.instance, scheme=args.scheme)
login_interactive(app, args.email)
@ -316,6 +327,23 @@ def followers(app, user, args):
print_acct_list(response)
def tags_follow(app, user, args):
tn = args.tag_name if not args.tag_name.startswith("#") else args.tag_name[1:]
api.follow_tag(app, user, tn)
print_out("<green>✓ You are now following #{}</green>".format(tn))
def tags_unfollow(app, user, args):
tn = args.tag_name if not args.tag_name.startswith("#") else args.tag_name[1:]
api.unfollow_tag(app, user, tn)
print_out("<green>✓ You are no longer following #{}</green>".format(tn))
def tags_followed(app, user, args):
response = api.followed_tags(app, user)
print_tag_list(response)
def mute(app, user, args):
account = _find_account(app, user, args.account)
api.mute(app, user, account['id'])
@ -389,4 +417,4 @@ def notifications(app, user, args):
def tui(app, user, args):
from .tui.app import TUI
TUI.create(app, user).run()
TUI.create(app, user, args).run()

View File

@ -1,5 +1,3 @@
# -*- coding: utf-8 -*-
import logging
import os
import re
@ -8,11 +6,17 @@ import sys
from argparse import ArgumentParser, FileType, ArgumentTypeError
from collections import namedtuple
from itertools import chain
from toot import config, commands, CLIENT_NAME, CLIENT_WEBSITE, __version__
from toot.exceptions import ApiError, ConsoleError
from toot.output import print_out, print_err
VISIBILITY_CHOICES = ['public', 'unlisted', 'private', 'direct']
VISIBILITY_CHOICES_STR = ", ".join(f"'{v}'" for v in VISIBILITY_CHOICES)
def get_default_visibility():
return os.getenv("TOOT_POST_VISIBILITY", "public")
def language(value):
@ -112,6 +116,11 @@ common_args = [
"action": 'store_true',
"default": False,
}),
(["--verbose"], {
"help": "show extra detail in debug log; used with --debug",
"action": 'store_true',
"default": False,
}),
]
# Arguments added to commands which require authentication
@ -148,6 +157,19 @@ status_id_arg = (["status_id"], {
"type": str,
})
visibility_arg = (["-v", "--visibility"], {
"type": visibility,
"default": get_default_visibility(),
"help": f"Post visibility. One of: {VISIBILITY_CHOICES_STR}. Defaults to "
f"'{get_default_visibility()}' which can be overridden by setting "
"the TOOT_POST_VISIBILITY environment variable",
})
tag_arg = (["tag_name"], {
"type": str,
"help": "tag name, e.g. Caturday, or \"#Caturday\"",
})
# Arguments for selecting a timeline (see `toot.commands.get_timeline_generator`)
common_timeline_args = [
(["-p", "--public"], {
@ -174,7 +196,7 @@ common_timeline_args = [
}),
]
timeline_args = common_timeline_args + [
timeline_and_bookmark_args = [
(["-c", "--count"], {
"type": timeline_count,
"help": "number of toots to show per page (1-20, default 10).",
@ -192,6 +214,8 @@ timeline_args = common_timeline_args + [
}),
]
timeline_args = common_timeline_args + timeline_and_bookmark_args
AUTH_COMMANDS = [
Command(
name="login",
@ -223,13 +247,25 @@ AUTH_COMMANDS = [
arguments=[],
require_auth=False,
),
Command(
name="env",
description="Print environment information for inclusion in bug reports.",
arguments=[],
require_auth=False,
),
]
TUI_COMMANDS = [
Command(
name="tui",
description="Launches the toot terminal user interface",
arguments=[],
arguments=[
(["--relative-datetimes"], {
"action": "store_true",
"default": False,
"help": "Show relative datetimes in status list.",
}),
],
require_auth=True,
),
]
@ -317,6 +353,12 @@ READ_COMMANDS = [
arguments=timeline_args,
require_auth=True,
),
Command(
name="bookmarks",
description="Show bookmarked posts",
arguments=timeline_and_bookmark_args,
require_auth=True,
),
]
POST_COMMANDS = [
@ -340,11 +382,7 @@ POST_COMMANDS = [
"help": "plain-text description of the media for accessibility "
"purposes, one per attached media"
}),
(["-v", "--visibility"], {
"type": visibility,
"default": "public",
"help": 'post visibility, one of: %s' % ", ".join(VISIBILITY_CHOICES),
}),
visibility_arg,
(["-s", "--sensitive"], {
"action": 'store_true',
"default": False,
@ -428,7 +466,7 @@ STATUS_COMMANDS = [
Command(
name="reblog",
description="Reblog a status",
arguments=[status_id_arg],
arguments=[status_id_arg, visibility_arg],
require_auth=True,
),
Command(
@ -536,25 +574,47 @@ ACCOUNTS_COMMANDS = [
),
]
COMMANDS = AUTH_COMMANDS + READ_COMMANDS + TUI_COMMANDS + POST_COMMANDS + STATUS_COMMANDS + ACCOUNTS_COMMANDS
TAG_COMMANDS = [
Command(
name="tags_followed",
description="List hashtags you follow",
arguments=[],
require_auth=True,
),
Command(
name="tags_follow",
description="Follow a hashtag",
arguments=[tag_arg],
require_auth=True,
),
Command(
name="tags_unfollow",
description="Unfollow a hashtag",
arguments=[tag_arg],
require_auth=True,
),
]
COMMAND_GROUPS = [
("Authentication", AUTH_COMMANDS),
("TUI", TUI_COMMANDS),
("Read", READ_COMMANDS),
("Post", POST_COMMANDS),
("Status", STATUS_COMMANDS),
("Accounts", ACCOUNTS_COMMANDS),
("Hashtags", TAG_COMMANDS),
]
COMMANDS = list(chain(*[commands for _, commands in COMMAND_GROUPS]))
def print_usage():
max_name_len = max(len(command.name) for command in COMMANDS)
groups = [
("Authentication", AUTH_COMMANDS),
("TUI", TUI_COMMANDS),
("Read", READ_COMMANDS),
("Post", POST_COMMANDS),
("Status", STATUS_COMMANDS),
("Accounts", ACCOUNTS_COMMANDS),
]
max_name_len = max(len(name) for name, _ in COMMAND_GROUPS)
print_out("<green>{}</green>".format(CLIENT_NAME))
print_out("<blue>v{}</blue>".format(__version__))
for name, cmds in groups:
for name, cmds in COMMAND_GROUPS:
print_out("")
print_out(name + ":")
@ -564,7 +624,7 @@ def print_usage():
print_out("")
print_out("To get help for each command run:")
print_out(" <yellow>toot <command> --help</yellow>")
print_out(" <yellow>toot \\<command> --help</yellow>")
print_out("")
print_out("<green>{}</green>".format(CLIENT_WEBSITE))
@ -589,8 +649,8 @@ def run_command(app, user, name, args):
command = next((c for c in COMMANDS if c.name == name), None)
if not command:
print_err("Unknown command '{}'\n".format(name))
print_usage()
print_err(f"Unknown command '{name}'")
print_out("Run <yellow>toot --help</yellow> to show a list of available commands.")
return
parser = get_argument_parser(name, command)
@ -624,7 +684,7 @@ def main():
command_name = sys.argv[1] if len(sys.argv) > 1 else None
args = sys.argv[2:]
if not command_name:
if not command_name or command_name == "--help":
return print_usage()
user, app = config.get_active_user_app()

View File

@ -1,9 +1,24 @@
import json
import sys
from logging import getLogger
logger = getLogger('toot')
VERBOSE = "--verbose" in sys.argv
COLOR = "--no-color" not in sys.argv
if COLOR:
ANSI_RED = "\033[31m"
ANSI_GREEN = "\033[32m"
ANSI_YELLOW = "\033[33m"
ANSI_END_COLOR = "\033[0m"
else:
ANSI_RED = ""
ANSI_GREEN = ""
ANSI_YELLOW = ""
ANSI_END_COLOR = ""
def censor_secrets(headers):
def _censor(k, v):
@ -14,33 +29,46 @@ def censor_secrets(headers):
return {_censor(k, v) for k, v in headers.items()}
def truncate(line):
if not VERBOSE and len(line) > 100:
return line[:100] + ""
return line
def log_request(request):
logger.debug(">>> \033[32m{} {}\033[0m".format(request.method, request.url))
logger.debug(f">>> {ANSI_GREEN}{request.method} {request.url}{ANSI_END_COLOR}")
if request.headers:
headers = censor_secrets(request.headers)
logger.debug(">>> HEADERS: \033[33m{}\033[0m".format(headers))
logger.debug(f">>> HEADERS: {ANSI_GREEN}{headers}{ANSI_END_COLOR}")
if request.data:
logger.debug(">>> DATA: \033[33m{}\033[0m".format(request.data))
data = truncate(request.data)
logger.debug(f">>> DATA: {ANSI_GREEN}{data}{ANSI_END_COLOR}")
if request.json:
logger.debug(">>> JSON: \033[33m{}\033[0m".format(json.dumps(request.json)))
data = truncate(json.dumps(request.json))
logger.debug(f">>> JSON: {ANSI_GREEN}{data}{ANSI_END_COLOR}")
if request.files:
logger.debug(">>> FILES: \033[33m{}\033[0m".format(request.files))
logger.debug(f">>> FILES: {ANSI_GREEN}{request.files}{ANSI_END_COLOR}")
if request.params:
logger.debug(">>> PARAMS: \033[33m{}\033[0m".format(request.params))
logger.debug(f">>> PARAMS: {ANSI_GREEN}{request.params}{ANSI_END_COLOR}")
def log_response(response):
content = truncate(response.content.decode())
if response.ok:
logger.debug("<<< \033[32m{}\033[0m".format(response))
logger.debug("<<< \033[33m{}\033[0m".format(response.content.decode()))
logger.debug(f"<<< {ANSI_GREEN}{response}{ANSI_END_COLOR}")
logger.debug(f"<<< {ANSI_YELLOW}{content}{ANSI_END_COLOR}")
else:
logger.debug("<<< \033[31m{}\033[0m".format(response))
logger.debug("<<< \033[31m{}\033[0m".format(response.content.decode()))
logger.debug(f"<<< {ANSI_RED}{response}{ANSI_END_COLOR}")
logger.debug(f"<<< {ANSI_RED}{content}{ANSI_END_COLOR}")
def log_debug(*msgs):

View File

@ -1,11 +1,8 @@
# -*- coding: utf-8 -*-
import os
import re
import sys
import textwrap
from textwrap import wrap
from toot.tui.utils import parse_datetime
from wcwidth import wcswidth
@ -169,11 +166,9 @@ def print_instance(instance):
def print_account(account):
print_out(f"<green>@{account['acct']}</green> {account['display_name']}")
note = get_text(account['note'])
if note:
if account["note"]:
print_out("")
print_out("\n".join(wrap(note)))
print_html(account["note"])
print_out("")
print_out(f"ID: <green>{account['id']}</green>")
@ -182,6 +177,15 @@ def print_account(account):
print_out(f"Followers: <yellow>{account['followers_count']}</yellow>")
print_out(f"Following: <yellow>{account['following_count']}</yellow>")
print_out(f"Statuses: <yellow>{account['statuses_count']}</yellow>")
if account["fields"]:
for field in account["fields"]:
name = field["name"].title()
print_out(f'\n<yellow>{name}</yellow>:')
print_html(field["value"])
if field["verified_at"]:
print_out("<green>✓ Verified</green>")
print_out("")
print_out(account["url"])
@ -198,6 +202,14 @@ def print_acct_list(accounts):
print_out(f"* <green>@{account['acct']}</green> {account['display_name']}")
def print_tag_list(tags):
if tags:
for tag in tags:
print_out(f"* <green>#{tag['name']}\t</green>{tag['url']}")
else:
print_out("You're not following any hashtags.")
def print_search_results(results):
accounts = results['accounts']
hashtags = results['hashtags']
@ -238,11 +250,8 @@ def print_status(status, width):
f"<yellow>{time}</yellow>",
)
for paragraph in parse_html(content):
print_out("")
for line in paragraph:
for subline in wc_wrap(line, width):
print_out(highlight_hashtags(subline))
print_out("")
print_html(content, width)
if media_attachments:
print_out("\nMedia:")
@ -262,6 +271,17 @@ def print_status(status, width):
)
def print_html(text, width=80):
first = True
for paragraph in parse_html(text):
if not first:
print_out("")
for line in paragraph:
for subline in wc_wrap(line, width):
print_out(highlight_hashtags(subline))
first = False
def print_poll(poll):
print_out()
for idx, option in enumerate(poll["options"]):

View File

@ -4,6 +4,8 @@ import urwid
from concurrent.futures import ThreadPoolExecutor
from toot import api, config, __version__
from toot.console import get_default_visibility
from toot.exceptions import ApiError
from .compose import StatusComposer
from .constants import PALETTE
@ -71,10 +73,10 @@ class TUI(urwid.Frame):
"""Main TUI frame."""
@classmethod
def create(cls, app, user):
def create(cls, app, user, args):
"""Factory method, sets up TUI and an event loop."""
tui = cls(app, user)
tui = cls(app, user, args)
loop = urwid.MainLoop(
tui,
palette=PALETTE,
@ -85,9 +87,10 @@ class TUI(urwid.Frame):
return tui
def __init__(self, app, user):
def __init__(self, app, user, args):
self.app = app
self.user = user
self.args = args
self.config = config.load_config()
self.loop = None # set in `create`
@ -112,6 +115,7 @@ class TUI(urwid.Frame):
def run(self):
self.loop.set_alarm_in(0, lambda *args: self.async_load_instance())
self.loop.set_alarm_in(0, lambda *args: self.async_load_followed_tags())
self.loop.set_alarm_in(0, lambda *args: self.async_load_timeline(
is_initial=True, timeline_name="home"))
self.loop.run()
@ -196,6 +200,10 @@ class TUI(urwid.Frame):
def _zoom(timeline, status_details):
self.show_status_zoom(status_details)
def _clear(*args):
self.clear_screen()
urwid.connect_signal(timeline, "bookmark", self.async_toggle_bookmark)
urwid.connect_signal(timeline, "compose", _compose)
urwid.connect_signal(timeline, "delete", _delete)
urwid.connect_signal(timeline, "favourite", self.async_toggle_favourite)
@ -208,6 +216,7 @@ class TUI(urwid.Frame):
urwid.connect_signal(timeline, "links", _links)
urwid.connect_signal(timeline, "zoom", _zoom)
urwid.connect_signal(timeline, "translate", self.async_translate)
urwid.connect_signal(timeline, "clear-screen", _clear)
def build_timeline(self, name, statuses, local):
def _close(*args):
@ -234,7 +243,7 @@ class TUI(urwid.Frame):
self.loop.set_alarm_in(5, lambda *args: self.footer.clear_message())
config.save_config(self.config)
timeline = Timeline(name, statuses, self.can_translate)
timeline = Timeline(name, statuses, self.can_translate, self.followed_tags)
self.connect_default_timeline_signals(timeline)
urwid.connect_signal(timeline, "next", _next)
@ -263,8 +272,9 @@ class TUI(urwid.Frame):
statuses = ancestors + [status] + descendants
focus = len(ancestors)
timeline = Timeline("thread", statuses, self.can_translate, focus,
is_thread=True)
timeline = Timeline("thread", statuses, self.can_translate,
self.followed_tags, focus, is_thread=True)
self.connect_default_timeline_signals(timeline)
urwid.connect_signal(timeline, "close", _close)
@ -327,10 +337,27 @@ class TUI(urwid.Frame):
# this works for Mastodon and Pleroma version strings
# Mastodon versions < 4 do not have translation service
# Revisit this logic if Pleroma implements translation
self.can_translate = int(instance["version"][0]) > 3
ch = instance["version"][0]
self.can_translate = int(ch) > 3 if ch.isnumeric() else False
return self.run_in_thread(_load_instance, done_callback=_done)
def async_load_followed_tags(self):
def _load_tag_list():
try:
return api.followed_tags(self.app, self.user)
except ApiError:
# not supported by all Mastodon servers so fail silently if necessary
return []
def _done_tag_list(tags):
if len(tags) > 0:
self.followed_tags = [t["name"] for t in tags]
else:
self.followed_tags = []
self.run_in_thread(_load_tag_list, done_callback=_done_tag_list)
def refresh_footer(self, timeline):
"""Show status details in footer."""
status, index, count = timeline.get_focused_status_with_counts()
@ -345,16 +372,26 @@ class TUI(urwid.Frame):
title="Status source",
)
def clear_screen(self):
self.loop.screen.clear()
def show_links(self, status):
links = parse_content_links(status.data["content"]) if status else []
post_attachments = status.data["media_attachments"] or []
reblog_attachments = (status.data["reblog"]["media_attachments"] if status.data["reblog"] else None) or []
for a in post_attachments + reblog_attachments:
url = a["remote_url"] or a["url"]
links.append((url, a["description"] if a["description"] else url))
def _clear(*args):
self.clear_screen()
if links:
sl_widget = StatusLinks(links)
urwid.connect_signal(sl_widget, "clear-screen", _clear)
self.open_overlay(
widget=StatusLinks(links),
widget=sl_widget,
title="Status links",
options={"height": len(links) + 2},
)
@ -378,7 +415,7 @@ class TUI(urwid.Frame):
def _post(timeline, *args):
self.post_status(*args)
composer = StatusComposer(self.max_toot_chars, in_reply_to)
composer = StatusComposer(self.max_toot_chars, self.user.username, in_reply_to)
urwid.connect_signal(composer, "close", _close)
urwid.connect_signal(composer, "post", _post)
self.open_overlay(composer, title="Compose status")
@ -390,12 +427,15 @@ class TUI(urwid.Frame):
lambda x: self.goto_home_timeline())
urwid.connect_signal(menu, "public_timeline",
lambda x, local: self.goto_public_timeline(local))
urwid.connect_signal(menu, "bookmark_timeline",
lambda x, local: self.goto_bookmarks())
urwid.connect_signal(menu, "hashtag_timeline",
lambda x, tag, local: self.goto_tag_timeline(tag, local=local))
self.open_overlay(menu, title="Go to", options=dict(
align="center", width=("relative", 60),
valign="middle", height=9 + len(user_timelines),
valign="middle", height=10 + len(user_timelines),
))
def show_help(self):
@ -413,6 +453,12 @@ class TUI(urwid.Frame):
promise = self.async_load_timeline(is_initial=True, timeline_name="public")
promise.add_done_callback(lambda *args: self.close_overlay())
def goto_bookmarks(self):
self.timeline_generator = api.bookmark_timeline_generator(
self.app, self.user, limit=40)
promise = self.async_load_timeline(is_initial=True, timeline_name="bookmarks")
promise.add_done_callback(lambda *args: self.close_overlay())
def goto_tag_timeline(self, tag, local):
self.timeline_generator = api.tag_timeline_generator(
self.app, self.user, tag, local=local, limit=40)
@ -453,9 +499,7 @@ class TUI(urwid.Frame):
in_reply_to_id=in_reply_to_id)
status = self.make_status(data)
# TODO: instead of this, fetch new items from the timeline?
self.timeline.prepend_status(status)
self.timeline.focus_status(status)
# TODO: fetch new items from the timeline?
self.footer.set_message("Status posted {} \\o/".format(status.id))
self.close_overlay()
@ -484,7 +528,7 @@ class TUI(urwid.Frame):
def async_toggle_reblog(self, timeline, status):
def _reblog():
logger.info("Reblogging {}".format(status))
api.reblog(self.app, self.user, status.id)
api.reblog(self.app, self.user, status.id, visibility=get_default_visibility())
def _unreblog():
logger.info("Unreblogging {}".format(status))
@ -497,6 +541,13 @@ class TUI(urwid.Frame):
new_status = self.make_status(new_data)
timeline.update_status(new_status)
# Check if status is rebloggable
no_reblog_because_private = status.visibility == "private" and not status.is_mine
no_reblog_because_direct = status.visibility == "direct"
if no_reblog_because_private or no_reblog_because_direct:
self.footer.set_error_message("You may not reblog this {} status".format(status.visibility))
return
self.run_in_thread(
_unreblog if status.reblogged else _reblog,
done_callback=_done
@ -514,7 +565,7 @@ class TUI(urwid.Frame):
else:
self.footer.set_error_message("Server returned empty translation")
response = None
except:
except Exception:
response = None
self.footer.set_error_message("Translate server error")
@ -535,6 +586,27 @@ class TUI(urwid.Frame):
else:
self.run_in_thread(_translate, done_callback=_done)
def async_toggle_bookmark(self, timeline, status):
def _bookmark():
logger.info("Bookmarking {}".format(status))
api.bookmark(self.app, self.user, status.id)
def _unbookmark():
logger.info("Unbookmarking {}".format(status))
api.unbookmark(self.app, self.user, status.id)
def _done(loop):
# Create a new Status with flipped bookmarked flag
new_data = status.data
new_data["bookmarked"] = not status.bookmarked
new_status = self.make_status(new_data)
timeline.update_status(new_status)
self.run_in_thread(
_unbookmark if status.bookmarked else _bookmark,
done_callback=_done
)
def async_delete_status(self, timeline, status):
def _delete():
api.delete_status(self.app, self.user, status.id)

View File

@ -1,6 +1,8 @@
import urwid
import logging
from toot.console import get_default_visibility
from .constants import VISIBILITY_OPTIONS
from .widgets import Button, EditBox
@ -13,9 +15,10 @@ class StatusComposer(urwid.Frame):
"""
signals = ["close", "post"]
def __init__(self, max_chars, in_reply_to=None):
def __init__(self, max_chars, username, in_reply_to=None):
self.in_reply_to = in_reply_to
self.max_chars = max_chars
self.username = username
text = self.get_initial_text(in_reply_to)
self.content_edit = EditBox(
@ -30,7 +33,7 @@ class StatusComposer(urwid.Frame):
self.cw_remove_button = Button("Remove content warning",
on_press=self.remove_content_warning)
self.visibility = "public"
self.visibility = get_default_visibility()
self.visibility_button = Button("Visibility: {}".format(self.visibility),
on_press=self.choose_visibility)
@ -46,8 +49,8 @@ class StatusComposer(urwid.Frame):
if not in_reply_to:
return ""
text = '@{} '.format(in_reply_to.account)
mentions = ['@{}'.format(m["acct"]) for m in in_reply_to.mentions]
text = '' if in_reply_to.is_mine else '@{} '.format(in_reply_to.original.account)
mentions = ['@{}'.format(m["acct"]) for m in in_reply_to.mentions if m["acct"] != self.username]
if mentions:
text += '\n\n{}'.format(' '.join(mentions))
@ -61,7 +64,7 @@ class StatusComposer(urwid.Frame):
def generate_list_items(self):
if self.in_reply_to:
yield urwid.Text(("gray", "Replying to {}".format(self.in_reply_to.account)))
yield urwid.Text(("gray", "Replying to {}".format(self.in_reply_to.original.account)))
yield urwid.AttrWrap(urwid.Divider("-"), "gray")
yield urwid.Text("Status message")

View File

@ -19,6 +19,7 @@ PALETTE = [
# Functional
('hashtag', 'light cyan,bold', ''),
('followed_hashtag', 'yellow,bold', ''),
('link', ',italics', ''),
('link_focused', ',italics', 'dark magenta'),
@ -34,7 +35,9 @@ PALETTE = [
('green_selected', 'white,bold', 'dark green'),
('yellow', 'yellow', ''),
('yellow_bold', 'yellow,bold', ''),
('red', 'dark red', ''),
('warning', 'light red', ''),
('white_bold', 'white,bold', '')
]
VISIBILITY_OPTIONS = [

View File

@ -56,10 +56,12 @@ class Status:
self.author = self._get_author()
self.favourited = data.get("favourited", False)
self.reblogged = data.get("reblogged", False)
self.bookmarked = data.get("bookmarked", False)
self.in_reply_to = data.get("in_reply_to_id")
self.url = data.get("url")
self.mentions = data.get("mentions")
self.reblog = self._get_reblog()
self.visibility = data.get("visibility")
@property
def original(self):

View File

@ -30,22 +30,28 @@ class StatusZoom(urwid.ListBox):
class StatusLinks(urwid.ListBox):
"""Shows status links."""
signals = ["clear-screen"]
def __init__(self, links):
def widget(url, title):
return Button(title or url, on_press=lambda btn: webbrowser.open(url))
return Button(title or url, on_press=lambda btn: self.browse(url))
walker = urwid.SimpleFocusListWalker(
[widget(url, title) for url, title in links]
)
super().__init__(walker)
def browse(self, url):
webbrowser.open(url)
# force a screen refresh; necessary with console browsers
self._emit("clear-screen")
class ExceptionStackTrace(urwid.ListBox):
"""Shows an exception stack trace."""
def __init__(self, ex):
lines = traceback.format_exception(etype=type(ex), value=ex, tb=ex.__traceback__)
lines = traceback.format_exception(type(ex), value=ex, tb=ex.__traceback__)
walker = urwid.SimpleFocusListWalker([
urwid.Text(line) for line in lines
])
@ -74,6 +80,7 @@ class GotoMenu(urwid.ListBox):
"home_timeline",
"public_timeline",
"hashtag_timeline",
"bookmark_timeline",
]
def __init__(self, user_timelines):
@ -96,6 +103,9 @@ class GotoMenu(urwid.ListBox):
def _global_public(button):
self._emit("public_timeline", False)
def _bookmarks(button):
self._emit("bookmark_timeline", False)
def _hashtag(local):
hashtag = self.get_hashtag()
if hashtag:
@ -117,6 +127,7 @@ class GotoMenu(urwid.ListBox):
yield Button("Local public timeline", on_press=_local_public)
yield Button("Global public timeline", on_press=_global_public)
yield Button("Bookmarks", on_press=_bookmarks)
yield urwid.Divider()
yield self.hash_edit
yield Button("Local hashtag timeline", on_press=lambda x: _hashtag(True))
@ -164,6 +175,7 @@ class Help(urwid.Padding):
yield urwid.Text(h(" [B] - Boost/unboost status"))
yield urwid.Text(h(" [C] - Compose new status"))
yield urwid.Text(h(" [F] - Favourite/unfavourite status"))
yield urwid.Text(h(" [K] - Bookmark/unbookmark status"))
yield urwid.Text(h(" [N] - Translate status if possible (toggle)"))
yield urwid.Text(h(" [R] - Reply to current status"))
yield urwid.Text(h(" [S] - Show text marked as sensitive"))

426
toot/tui/scroll.py Normal file
View File

@ -0,0 +1,426 @@
# scroll.py
#
# Copied from the stig project by rndusr@github
# https://github.com/rndusr/stig
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details
# http://www.gnu.org/licenses/gpl-3.0.txt
import urwid
from urwid.widget import BOX, FIXED, FLOW
# Scroll actions
SCROLL_LINE_UP = 'line up'
SCROLL_LINE_DOWN = 'line down'
SCROLL_PAGE_UP = 'page up'
SCROLL_PAGE_DOWN = 'page down'
SCROLL_TO_TOP = 'to top'
SCROLL_TO_END = 'to end'
# Scrollbar positions
SCROLLBAR_LEFT = 'left'
SCROLLBAR_RIGHT = 'right'
class Scrollable(urwid.WidgetDecoration):
def sizing(self):
return frozenset([BOX,])
def selectable(self):
return True
def __init__(self, widget):
"""Box widget that makes a fixed or flow widget vertically scrollable
TODO: Focusable widgets are handled, including switching focus, but
possibly not intuitively, depending on the arrangement of widgets. When
switching focus to a widget that is outside of the visible part of the
original widget, the canvas scrolls up/down to the focused widget. It
would be better to scroll until the next focusable widget is in sight
first. But for that to work we must somehow obtain a list of focusable
rows in the original canvas.
"""
if not any(s in widget.sizing() for s in (FIXED, FLOW)):
raise ValueError('Not a fixed or flow widget: %r' % widget)
self._trim_top = 0
self._scroll_action = None
self._forward_keypress = None
self._old_cursor_coords = None
self._rows_max_cached = 0
self.__super.__init__(widget)
def render(self, size, focus=False):
maxcol, maxrow = size
# Render complete original widget
ow = self._original_widget
ow_size = self._get_original_widget_size(size)
canv_full = ow.render(ow_size, focus)
# Make full canvas editable
canv = urwid.CompositeCanvas(canv_full)
canv_cols, canv_rows = canv.cols(), canv.rows()
if canv_cols <= maxcol:
pad_width = maxcol - canv_cols
if pad_width > 0:
# Canvas is narrower than available horizontal space
canv.pad_trim_left_right(0, pad_width)
if canv_rows <= maxrow:
fill_height = maxrow - canv_rows
if fill_height > 0:
# Canvas is lower than available vertical space
canv.pad_trim_top_bottom(0, fill_height)
if canv_cols <= maxcol and canv_rows <= maxrow:
# Canvas is small enough to fit without trimming
return canv
self._adjust_trim_top(canv, size)
# Trim canvas if necessary
trim_top = self._trim_top
trim_end = canv_rows - maxrow - trim_top
trim_right = canv_cols - maxcol
if trim_top > 0:
canv.trim(trim_top)
if trim_end > 0:
canv.trim_end(trim_end)
if trim_right > 0:
canv.pad_trim_left_right(0, -trim_right)
# Disable cursor display if cursor is outside of visible canvas parts
if canv.cursor is not None:
curscol, cursrow = canv.cursor
if cursrow >= maxrow or cursrow < 0:
canv.cursor = None
# Figure out whether we should forward keypresses to original widget
if canv.cursor is not None:
# Trimmed canvas contains the cursor, e.g. in an Edit widget
self._forward_keypress = True
else:
if canv_full.cursor is not None:
# Full canvas contains the cursor, but scrolled out of view
self._forward_keypress = False
else:
# Original widget does not have a cursor, but may be selectable
# FIXME: Using ow.selectable() is bad because the original
# widget may be selectable because it's a container widget with
# a key-grabbing widget that is scrolled out of view.
# ow.selectable() returns True anyway because it doesn't know
# how we trimmed our canvas.
#
# To fix this, we need to resolve ow.focus and somehow
# ask canv whether it contains bits of the focused widget. I
# can't see a way to do that.
if ow.selectable():
self._forward_keypress = True
else:
self._forward_keypress = False
return canv
def keypress(self, size, key):
# Maybe offer key to original widget
if self._forward_keypress:
ow = self._original_widget
ow_size = self._get_original_widget_size(size)
# Remember previous cursor position if possible
if hasattr(ow, 'get_cursor_coords'):
self._old_cursor_coords = ow.get_cursor_coords(ow_size)
key = ow.keypress(ow_size, key)
if key is None:
return None
# Handle up/down, page up/down, etc
command_map = self._command_map
if command_map[key] == urwid.CURSOR_UP:
self._scroll_action = SCROLL_LINE_UP
elif command_map[key] == urwid.CURSOR_DOWN:
self._scroll_action = SCROLL_LINE_DOWN
elif command_map[key] == urwid.CURSOR_PAGE_UP:
self._scroll_action = SCROLL_PAGE_UP
elif command_map[key] == urwid.CURSOR_PAGE_DOWN:
self._scroll_action = SCROLL_PAGE_DOWN
elif command_map[key] == urwid.CURSOR_MAX_LEFT: # 'home'
self._scroll_action = SCROLL_TO_TOP
elif command_map[key] == urwid.CURSOR_MAX_RIGHT: # 'end'
self._scroll_action = SCROLL_TO_END
else:
return key
self._invalidate()
def mouse_event(self, size, event, button, col, row, focus):
ow = self._original_widget
if hasattr(ow, 'mouse_event'):
ow_size = self._get_original_widget_size(size)
row += self._trim_top
return ow.mouse_event(ow_size, event, button, col, row, focus)
else:
return False
def _adjust_trim_top(self, canv, size):
"""Adjust self._trim_top according to self._scroll_action"""
action = self._scroll_action
self._scroll_action = None
maxcol, maxrow = size
trim_top = self._trim_top
canv_rows = canv.rows()
if trim_top < 0:
# Negative trim_top values use bottom of canvas as reference
trim_top = canv_rows - maxrow + trim_top + 1
if canv_rows <= maxrow:
self._trim_top = 0 # Reset scroll position
return
def ensure_bounds(new_trim_top):
return max(0, min(canv_rows - maxrow, new_trim_top))
if action == SCROLL_LINE_UP:
self._trim_top = ensure_bounds(trim_top - 1)
elif action == SCROLL_LINE_DOWN:
self._trim_top = ensure_bounds(trim_top + 1)
elif action == SCROLL_PAGE_UP:
self._trim_top = ensure_bounds(trim_top - maxrow + 1)
elif action == SCROLL_PAGE_DOWN:
self._trim_top = ensure_bounds(trim_top + maxrow - 1)
elif action == SCROLL_TO_TOP:
self._trim_top = 0
elif action == SCROLL_TO_END:
self._trim_top = canv_rows - maxrow
else:
self._trim_top = ensure_bounds(trim_top)
# If the cursor was moved by the most recent keypress, adjust trim_top
# so that the new cursor position is within the displayed canvas part.
# But don't do this if the cursor is at the top/bottom edge so we can still scroll out
if self._old_cursor_coords is not None and self._old_cursor_coords != canv.cursor:
self._old_cursor_coords = None
curscol, cursrow = canv.cursor
if cursrow < self._trim_top:
self._trim_top = cursrow
elif cursrow >= self._trim_top + maxrow:
self._trim_top = max(0, cursrow - maxrow + 1)
def _get_original_widget_size(self, size):
ow = self._original_widget
sizing = ow.sizing()
if FIXED in sizing:
return ()
elif FLOW in sizing:
return (size[0],)
def get_scrollpos(self, size=None, focus=False):
"""Current scrolling position
Lower limit is 0, upper limit is the maximum number of rows with the
given maxcol minus maxrow.
NOTE: The returned value may be too low or too high if the position has
changed but the widget wasn't rendered yet.
"""
return self._trim_top
def set_scrollpos(self, position):
"""Set scrolling position
If `position` is positive it is interpreted as lines from the top.
If `position` is negative it is interpreted as lines from the bottom.
Values that are too high or too low values are automatically adjusted
during rendering.
"""
self._trim_top = int(position)
self._invalidate()
def rows_max(self, size=None, focus=False):
"""Return the number of rows for `size`
If `size` is not given, the currently rendered number of rows is returned.
"""
if size is not None:
ow = self._original_widget
ow_size = self._get_original_widget_size(size)
sizing = ow.sizing()
if FIXED in sizing:
self._rows_max_cached = ow.pack(ow_size, focus)[1]
elif FLOW in sizing:
self._rows_max_cached = ow.rows(ow_size, focus)
else:
raise RuntimeError('Not a flow/box widget: %r' % self._original_widget)
return self._rows_max_cached
class ScrollBar(urwid.WidgetDecoration):
def sizing(self):
return frozenset((BOX,))
def selectable(self):
return True
def __init__(self, widget, thumb_char=u'\u2588', trough_char=' ',
side=SCROLLBAR_RIGHT, width=1):
"""Box widget that adds a scrollbar to `widget`
`widget` must be a box widget with the following methods:
- `get_scrollpos` takes the arguments `size` and `focus` and returns
the index of the first visible row.
- `set_scrollpos` (optional; needed for mouse click support) takes the
index of the first visible row.
- `rows_max` takes `size` and `focus` and returns the total number of
rows `widget` can render.
`thumb_char` is the character used for the scrollbar handle.
`trough_char` is used for the space above and below the handle.
`side` must be 'left' or 'right'.
`width` specifies the number of columns the scrollbar uses.
"""
if BOX not in widget.sizing():
raise ValueError('Not a box widget: %r' % widget)
self.__super.__init__(widget)
self._thumb_char = thumb_char
self._trough_char = trough_char
self.scrollbar_side = side
self.scrollbar_width = max(1, width)
self._original_widget_size = (0, 0)
def render(self, size, focus=False):
maxcol, maxrow = size
sb_width = self._scrollbar_width
ow_size = (max(0, maxcol - sb_width), maxrow)
sb_width = maxcol - ow_size[0]
ow = self._original_widget
ow_base = self.scrolling_base_widget
ow_rows_max = ow_base.rows_max(size, focus)
if ow_rows_max <= maxrow:
# Canvas fits without scrolling - no scrollbar needed
self._original_widget_size = size
return ow.render(size, focus)
ow_rows_max = ow_base.rows_max(ow_size, focus)
ow_canv = ow.render(ow_size, focus)
self._original_widget_size = ow_size
pos = ow_base.get_scrollpos(ow_size, focus)
posmax = ow_rows_max - maxrow
# Thumb shrinks/grows according to the ratio of
# <number of visible lines> / <number of total lines>
thumb_weight = min(1, maxrow / max(1, ow_rows_max))
thumb_height = max(1, round(thumb_weight * maxrow))
# Thumb may only touch top/bottom if the first/last row is visible
top_weight = float(pos) / max(1, posmax)
top_height = int((maxrow - thumb_height) * top_weight)
if top_height == 0 and top_weight > 0:
top_height = 1
# Bottom part is remaining space
bottom_height = maxrow - thumb_height - top_height
assert thumb_height + top_height + bottom_height == maxrow
# Create scrollbar canvas
# Creating SolidCanvases of correct height may result in "cviews do not
# fill gaps in shard_tail!" or "cviews overflow gaps in shard_tail!"
# exceptions. Stacking the same SolidCanvas is a workaround.
# https://github.com/urwid/urwid/issues/226#issuecomment-437176837
top = urwid.SolidCanvas(self._trough_char, sb_width, 1)
thumb = urwid.SolidCanvas(self._thumb_char, sb_width, 1)
bottom = urwid.SolidCanvas(self._trough_char, sb_width, 1)
sb_canv = urwid.CanvasCombine(
[(top, None, False)] * top_height +
[(thumb, None, False)] * thumb_height +
[(bottom, None, False)] * bottom_height,
)
combinelist = [(ow_canv, None, True, ow_size[0]),
(sb_canv, None, False, sb_width)]
if self._scrollbar_side != SCROLLBAR_LEFT:
return urwid.CanvasJoin(combinelist)
else:
return urwid.CanvasJoin(reversed(combinelist))
@property
def scrollbar_width(self):
"""Columns the scrollbar uses"""
return max(1, self._scrollbar_width)
@scrollbar_width.setter
def scrollbar_width(self, width):
self._scrollbar_width = max(1, int(width))
self._invalidate()
@property
def scrollbar_side(self):
"""Where to display the scrollbar; must be 'left' or 'right'"""
return self._scrollbar_side
@scrollbar_side.setter
def scrollbar_side(self, side):
if side not in (SCROLLBAR_LEFT, SCROLLBAR_RIGHT):
raise ValueError('scrollbar_side must be "left" or "right", not %r' % side)
self._scrollbar_side = side
self._invalidate()
@property
def scrolling_base_widget(self):
"""Nearest `original_widget` that is compatible with the scrolling API"""
def orig_iter(w):
while hasattr(w, 'original_widget'):
w = w.original_widget
yield w
yield w
def is_scrolling_widget(w):
return hasattr(w, 'get_scrollpos') and hasattr(w, 'rows_max')
for w in orig_iter(self):
if is_scrolling_widget(w):
return w
raise ValueError('Not compatible to be wrapped by ScrollBar: %r' % w)
def keypress(self, size, key):
return self._original_widget.keypress(self._original_widget_size, key)
def mouse_event(self, size, event, button, col, row, focus):
ow = self._original_widget
ow_size = self._original_widget_size
handled = False
if hasattr(ow, 'mouse_event'):
handled = ow.mouse_event(ow_size, event, button, col, row, focus)
if not handled and hasattr(ow, 'set_scrollpos'):
if button == 4: # scroll wheel up
pos = ow.get_scrollpos(ow_size)
ow.set_scrollpos(pos - 1)
return True
elif button == 5: # scroll wheel down
pos = ow.get_scrollpos(ow_size)
ow.set_scrollpos(pos + 1)
return True
return False

View File

@ -1,12 +1,17 @@
import logging
import sys
import urwid
import webbrowser
from toot.utils import format_content
from toot.utils.language import language_name
from typing import Optional
from .entities import Status
from .scroll import Scrollable, ScrollBar
from .utils import highlight_hashtags, parse_datetime, highlight_keys
from .widgets import SelectableText, SelectableColumns
from toot.utils import format_content
from toot.utils.language import language_name
from toot.tui.utils import time_ago
logger = logging.getLogger("toot")
@ -16,41 +21,62 @@ class Timeline(urwid.Columns):
Displays a list of statuses to the left, and status details on the right.
"""
signals = [
"close", # Close thread
"compose", # Compose a new toot
"delete", # Delete own status
"favourite", # Favourite status
"focus", # Focus changed
"media", # Display media attachments
"menu", # Show a context menu
"next", # Fetch more statuses
"reblog", # Reblog status
"reply", # Compose a reply to a status
"source", # Show status source
"links", # Show status links
"thread", # Show thread for status
"translate", # Translate status
"save", # Save current timeline
"zoom", # Open status in scrollable popup window
"close", # Close thread
"compose", # Compose a new toot
"delete", # Delete own status
"favourite", # Favourite status
"focus", # Focus changed
"bookmark", # Bookmark status
"media", # Display media attachments
"menu", # Show a context menu
"next", # Fetch more statuses
"reblog", # Reblog status
"reply", # Compose a reply to a status
"source", # Show status source
"links", # Show status links
"thread", # Show thread for status
"translate", # Translate status
"save", # Save current timeline
"zoom", # Open status in scrollable popup window
"clear-screen", # Clear the screen (used internally)
]
def __init__(self, name, statuses, can_translate, focus=0, is_thread=False):
def __init__(self, name, statuses, can_translate, followed_tags=[], focus=0, is_thread=False):
self.name = name
self.is_thread = is_thread
self.statuses = statuses
self.can_translate = can_translate
self.status_list = self.build_status_list(statuses, focus=focus)
self.followed_tags = followed_tags
try:
self.status_details = StatusDetails(statuses[focus], is_thread, can_translate)
focused_status = statuses[focus]
except IndexError:
self.status_details = StatusDetails(None, is_thread, can_translate)
focused_status = None
self.status_details = StatusDetails(self, focused_status)
status_widget = self.wrap_status_details(self.status_details)
super().__init__([
("weight", 40, self.status_list),
("weight", 0, urwid.AttrWrap(urwid.SolidFill(""), "blue_selected")),
("weight", 60, urwid.Padding(self.status_details, left=1)),
("weight", 60, status_widget),
])
def wrap_status_details(self, status_details: "StatusDetails") -> urwid.Widget:
"""Wrap StatusDetails widget with a scollbar and footer."""
return urwid.Padding(
urwid.Frame(
body=ScrollBar(
Scrollable(urwid.Padding(status_details, right=1)),
thumb_char="\u2588",
trough_char="\u2591",
),
footer=self.get_option_text(status_details.status),
),
left=1
)
def build_status_list(self, statuses, focus):
items = [self.build_list_item(status) for status in statuses]
walker = urwid.SimpleFocusListWalker(items)
@ -67,9 +93,32 @@ class Timeline(urwid.Columns):
"green": "green_selected",
"yellow": "green_selected",
"cyan": "green_selected",
"red": "green_selected",
None: "green_selected",
})
def get_option_text(self, status: Optional[Status]) -> Optional[urwid.Text]:
if not status:
return None
options = [
"[B]oost",
"[D]elete" if status.is_mine else "",
"B[o]okmark",
"[F]avourite",
"[V]iew",
"[T]hread" if not self.is_thread else "",
"[L]inks",
"[R]eply",
"So[u]rce",
"[Z]oom",
"Tra[n]slate" if self.can_translate else "",
"[H]elp",
]
options = "\n" + " ".join(o for o in options if o)
options = highlight_keys(options, "white_bold", "cyan")
return urwid.Text(options)
def get_focused_status(self):
try:
return self.statuses[self.status_list.body.focus]
@ -100,8 +149,9 @@ class Timeline(urwid.Columns):
self.draw_status_details(status)
def draw_status_details(self, status):
self.status_details = StatusDetails(status, self.is_thread, self.can_translate)
self.contents[2] = urwid.Padding(self.status_details, left=1), ("weight", 60, False)
self.status_details = StatusDetails(self, status)
widget = self.wrap_status_details(self.status_details)
self.contents[2] = widget, ("weight", 60, False)
def keypress(self, size, key):
status = self.get_focused_status()
@ -156,6 +206,10 @@ class Timeline(urwid.Columns):
self.refresh_status_details()
return
if key in ("o", "O"):
self._emit("bookmark", status)
return
if key in ("l", "L"):
self._emit("links", status)
return
@ -176,6 +230,8 @@ class Timeline(urwid.Columns):
if key in ("v", "V"):
if status.original.url:
webbrowser.open(status.original.url)
# force a screen refresh; necessary with console browsers
self._emit("clear-screen")
return
if key in ("p", "P"):
@ -230,24 +286,16 @@ class Timeline(urwid.Columns):
index = self.get_status_index(status.id)
assert self.statuses[index].id == status.id # Sanity check
del(self.statuses[index])
del(self.status_list.body[index])
del self.statuses[index]
del self.status_list.body[index]
self.refresh_status_details()
class StatusDetails(urwid.Pile):
def __init__(self, status, in_thread, can_translate=False):
"""
Parameters
----------
status : Status
The status to render.
def __init__(self, timeline: Timeline, status: Optional[Status]):
self.status = status
self.followed_tags = timeline.followed_tags
in_thread : bool
Whether the status is rendered from a thread status list.
"""
self.in_thread = in_thread
self.can_translate = can_translate
reblogged_by = status.author if status and status.reblog else None
widget_list = list(self.content_generator(status.original, reblogged_by)
if status else ())
@ -275,26 +323,26 @@ class StatusDetails(urwid.Pile):
else:
content = status.translation if status.show_translation else status.data["content"]
for line in format_content(content):
yield ("pack", urwid.Text(highlight_hashtags(line)))
yield ("pack", urwid.Text(highlight_hashtags(line, self.followed_tags)))
media = status.data["media_attachments"]
if media:
for m in media:
yield ("pack", urwid.AttrMap(urwid.Divider("-"), "gray"))
yield ("pack", urwid.Text([("bold", "Media attachment"), " (", m["type"], ")"]))
if m["description"]:
yield ("pack", urwid.Text(m["description"]))
yield ("pack", urwid.Text(("link", m["url"])))
media = status.data["media_attachments"]
if media:
for m in media:
yield ("pack", urwid.AttrMap(urwid.Divider("-"), "gray"))
yield ("pack", urwid.Text([("bold", "Media attachment"), " (", m["type"], ")"]))
if m["description"]:
yield ("pack", urwid.Text(m["description"]))
yield ("pack", urwid.Text(("link", m["url"])))
poll = status.data.get("poll")
if poll:
yield ("pack", urwid.Divider())
yield ("pack", self.build_linebox(self.poll_generator(poll)))
poll = status.data.get("poll")
if poll:
yield ("pack", urwid.Divider())
yield ("pack", self.build_linebox(self.poll_generator(poll)))
card = status.data.get("card")
if card:
yield ("pack", urwid.Divider())
yield ("pack", self.build_linebox(self.card_generator(card)))
card = status.data.get("card")
if card:
yield ("pack", urwid.Divider())
yield ("pack", self.build_linebox(self.card_generator(card)))
application = status.data.get("application") or {}
application = application.get("name")
@ -307,34 +355,29 @@ class StatusDetails(urwid.Pile):
else None
)
visibility_colors = {
"public": "gray",
"unlisted": "white",
"private": "cyan",
"direct": "yellow"
}
visibility = status.visibility.title()
visibility_color = visibility_colors.get(status.visibility, "gray")
yield ("pack", urwid.Text([
("blue", f"{status.created_at.strftime('%Y-%m-%d %H:%M')} "),
("red" if status.bookmarked else "gray", "🠷 "),
("gray", f"{status.data['replies_count']} "),
("yellow" if status.reblogged else "gray", f"{status.data['reblogs_count']} "),
("yellow" if status.favourited else "gray", f"{status.data['favourites_count']}"),
("yellow", f" · Translated from {translated_from} ") if translated_from else "",
(visibility_color, f" · {visibility}"),
("yellow", f" · Translated from {translated_from} " if translated_from else ""),
("gray", f" · {application}" if application else ""),
]))
# Push things to bottom
yield ("weight", 1, urwid.SolidFill(" "))
options = [
"[B]oost",
"[D]elete" if status.is_mine else "",
"[F]avourite",
"[V]iew",
"[T]hread" if not self.in_thread else "",
"[L]inks",
"[R]eply",
"So[u]rce",
"[Z]oom",
"Tra[n]slate" if self.can_translate else "",
"[H]elp",
]
options = " ".join(o for o in options if o)
options = highlight_keys(options, "cyan_bold", "cyan")
yield ("pack", urwid.Text(options))
yield ("weight", 1, urwid.BoxAdapter(urwid.SolidFill(" "), 1))
def build_linebox(self, contents):
contents = urwid.Pile(list(contents))
@ -378,7 +421,17 @@ class StatusDetails(urwid.Pile):
class StatusListItem(SelectableColumns):
def __init__(self, status):
created_at = status.created_at.strftime("%Y-%m-%d %H:%M")
edited = status.data["edited_at"]
# TODO: hacky implementation to avoid creating conflicts for existing
# pull reuqests, refactor when merged.
created_at = (
time_ago(status.created_at).ljust(3, " ")
if "--relative-datetimes" in sys.argv
else status.created_at.strftime("%Y-%m-%d %H:%M")
)
edited_flag = "*" if edited else " "
favourited = ("yellow", "") if status.original.favourited else " "
reblogged = ("yellow", "") if status.original.reblogged else " "
is_reblog = ("cyan", "") if status.reblog else " "
@ -386,6 +439,7 @@ class StatusListItem(SelectableColumns):
return super().__init__([
("pack", SelectableText(("blue", created_at), wrap="clip")),
("pack", urwid.Text(("blue", edited_flag))),
("pack", urwid.Text(" ")),
("pack", urwid.Text(favourited)),
("pack", urwid.Text(" ")),

View File

@ -1,4 +1,5 @@
from html.parser import HTMLParser
import math
import os
import re
import shutil
@ -7,6 +8,11 @@ import subprocess
from datetime import datetime, timezone
HASHTAG_PATTERN = re.compile(r'(?<!\w)(#\w+)\b')
SECOND = 1
MINUTE = SECOND * 60
HOUR = MINUTE * 60
DAY = HOUR * 24
WEEK = DAY * 7
def parse_datetime(value):
@ -27,6 +33,28 @@ def parse_datetime(value):
return dttm.astimezone()
def time_ago(value: datetime) -> datetime:
now = datetime.now().astimezone()
delta = now.timestamp() - value.timestamp()
if (delta < 1):
return "now"
if (delta < 8 * DAY):
if (delta < MINUTE):
return f"{math.floor(delta / SECOND)}".rjust(2, " ") + "s"
if (delta < HOUR):
return f"{math.floor(delta / MINUTE)}".rjust(2, " ") + "m"
if (delta < DAY):
return f"{math.floor(delta / HOUR)}".rjust(2, " ") + "h"
return f"{math.floor(delta / DAY)}".rjust(2, " ") + "d"
if (delta < 53 * WEEK): # not exactly correct but good enough as a boundary
return f"{math.floor(delta / WEEK)}".rjust(2, " ") + "w"
return ">1y"
def highlight_keys(text, high_attr, low_attr=""):
"""
Takes a string and adds high_attr attribute to parts in square brackets,
@ -51,11 +79,19 @@ def highlight_keys(text, high_attr, low_attr=""):
return list(_gen())
def highlight_hashtags(line, attr="hashtag"):
return [
(attr, p) if p.startswith("#") else p
for p in re.split(HASHTAG_PATTERN, line)
]
def highlight_hashtags(line, followed_tags, attr="hashtag", followed_attr="followed_hashtag"):
hline = []
for p in re.split(HASHTAG_PATTERN, line):
if p.startswith("#"):
if p[1:].lower() in (t.lower() for t in followed_tags):
hline.append((followed_attr, p))
else:
hline.append((attr, p))
else:
hline.append(p)
return hline
def show_media(paths):

View File

@ -1,5 +1,3 @@
# -*- coding: utf-8 -*-
import os
import re
import socket
@ -41,7 +39,7 @@ def parse_html(html):
paragraphs = [re.split("<br */?>", p) for p in paragraphs if p]
# Convert each line in each paragraph to plain text:
return [[get_text(l) for l in p] for p in paragraphs]
return [[get_text(line) for line in p] for p in paragraphs]
def format_content(content):