1
0
mirror of https://github.com/ihabunek/toot synced 2024-12-22 23:08:17 +01:00

Delete old command implementations

This commit is contained in:
Ivan Habunek 2023-12-04 17:51:06 +01:00
parent 4dfab69f3b
commit 452b98d2ad
No known key found for this signature in database
GPG Key ID: F5F0623FF5EBCB3D
2 changed files with 0 additions and 1635 deletions

View File

@ -1,669 +0,0 @@
from itertools import chain
import json
import sys
import platform
from datetime import datetime, timedelta, timezone
from time import sleep, time
from toot import api, config, __version__
from toot.auth import login_interactive, login_browser_interactive, create_app_interactive
from toot.entities import Account, Instance, Notification, Status, from_dict
from toot.exceptions import ApiError, ConsoleError
from toot.output import (print_lists, print_out, print_instance, print_account, print_acct_list,
print_search_results, print_status, print_table, print_timeline, print_notifications,
print_tag_list, print_list_accounts, print_user_list)
from toot.utils import args_get_instance, delete_tmp_status_file, editor_input, multiline_input, EOF_KEY
from toot.utils.datetime import parse_datetime
def get_timeline_generator(app, user, args):
if len([arg for arg in [args.tag, args.list, args.public, args.account] if arg]) > 1:
raise ConsoleError("Only one of --public, --tag, --account, or --list can be used at one time.")
if args.local and not (args.public or args.tag):
raise ConsoleError("The --local option is only valid alongside --public or --tag.")
if args.instance and not (args.public or args.tag):
raise ConsoleError("The --instance option is only valid alongside --public or --tag.")
if args.public:
if args.instance:
return api.anon_public_timeline_generator(args.instance, local=args.local, limit=args.count)
else:
return api.public_timeline_generator(app, user, local=args.local, limit=args.count)
elif args.tag:
if args.instance:
return api.anon_tag_timeline_generator(args.instance, args.tag, limit=args.count)
else:
return api.tag_timeline_generator(app, user, args.tag, local=args.local, limit=args.count)
elif args.account:
return api.account_timeline_generator(app, user, args.account, limit=args.count)
elif args.list:
return api.timeline_list_generator(app, user, args.list, limit=args.count)
else:
return api.home_timeline_generator(app, user, limit=args.count)
def timeline(app, user, args, generator=None):
if not generator:
generator = get_timeline_generator(app, user, args)
while True:
try:
items = next(generator)
except StopIteration:
print_out("That's all folks.")
return
if args.reverse:
items = reversed(items)
statuses = [from_dict(Status, item) for item in items]
print_timeline(statuses)
if args.once or not sys.stdout.isatty():
break
char = input("\nContinue? [Y/n] ")
if char.lower() == "n":
break
def status(app, user, args):
response = api.fetch_status(app, user, args.status_id)
if args.json:
print(response.text)
else:
status = from_dict(Status, response.json())
print_status(status)
def thread(app, user, args):
context_response = api.context(app, user, args.status_id)
if args.json:
print(context_response.text)
else:
toot = api.fetch_status(app, user, args.status_id).json()
context = context_response.json()
statuses = chain(context["ancestors"], [toot], context["descendants"])
print_timeline(from_dict(Status, s) for s in statuses)
def post(app, user, args):
if args.editor and not sys.stdin.isatty():
raise ConsoleError("Cannot run editor if not in tty.")
if args.media and len(args.media) > 4:
raise ConsoleError("Cannot attach more than 4 files.")
media_ids = _upload_media(app, user, args)
status_text = _get_status_text(args.text, args.editor, args.media)
scheduled_at = _get_scheduled_at(args.scheduled_at, args.scheduled_in)
if not status_text and not media_ids:
raise ConsoleError("You must specify either text or media to post.")
response = api.post_status(
app, user, status_text,
visibility=args.visibility,
media_ids=media_ids,
sensitive=args.sensitive,
spoiler_text=args.spoiler_text,
in_reply_to_id=args.reply_to,
language=args.language,
scheduled_at=scheduled_at,
content_type=args.content_type,
poll_options=args.poll_option,
poll_expires_in=args.poll_expires_in,
poll_multiple=args.poll_multiple,
poll_hide_totals=args.poll_hide_totals,
)
if args.json:
print(response.text)
else:
status = response.json()
if "scheduled_at" in status:
scheduled_at = parse_datetime(status["scheduled_at"])
scheduled_at = datetime.strftime(scheduled_at, "%Y-%m-%d %H:%M:%S%z")
print_out(f"Toot scheduled for: <green>{scheduled_at}</green>")
else:
print_out(f"Toot posted: <green>{status['url']}")
delete_tmp_status_file()
def _get_status_text(text, editor, media):
isatty = sys.stdin.isatty()
if not text and not isatty:
text = sys.stdin.read().rstrip()
if isatty:
if editor:
text = editor_input(editor, text)
elif not text and not media:
print_out("Write or paste your toot. Press <yellow>{}</yellow> to post it.".format(EOF_KEY))
text = multiline_input()
return text
def _get_scheduled_at(scheduled_at, scheduled_in):
if scheduled_at:
return scheduled_at
if scheduled_in:
scheduled_at = datetime.now(timezone.utc) + timedelta(seconds=scheduled_in)
return scheduled_at.replace(microsecond=0).isoformat()
return None
def _upload_media(app, user, args):
# Match media to corresponding description and thumbnail
media = args.media or []
descriptions = args.description or []
thumbnails = args.thumbnail or []
uploaded_media = []
for idx, file in enumerate(media):
description = descriptions[idx].strip() if idx < len(descriptions) else None
thumbnail = thumbnails[idx] if idx < len(thumbnails) else None
result = _do_upload(app, user, file, description, thumbnail)
uploaded_media.append(result)
_wait_until_all_processed(app, user, uploaded_media)
return [m["id"] for m in uploaded_media]
def _wait_until_all_processed(app, user, uploaded_media):
"""
Media is uploaded asynchronously, and cannot be attached until the server
has finished processing it. This function waits for that to happen.
Once media is processed, it will have the URL populated.
"""
if all(m["url"] for m in uploaded_media):
return
# Timeout after waiting 1 minute
start_time = time()
timeout = 60
print_out("<dim>Waiting for media to finish processing...</dim>")
for media in uploaded_media:
_wait_until_processed(app, user, media, start_time, timeout)
def _wait_until_processed(app, user, media, start_time, timeout):
if media["url"]:
return
media = api.get_media(app, user, media["id"])
while not media["url"]:
sleep(1)
if time() > start_time + timeout:
raise ConsoleError(f"Media not processed by server after {timeout} seconds. Aborting.")
media = api.get_media(app, user, media["id"])
def delete(app, user, args):
response = api.delete_status(app, user, args.status_id)
if args.json:
print(response.text)
else:
print_out("<green>✓ Status deleted</green>")
def favourite(app, user, args):
response = api.favourite(app, user, args.status_id)
if args.json:
print(response.text)
else:
print_out("<green>✓ Status favourited</green>")
def unfavourite(app, user, args):
response = api.unfavourite(app, user, args.status_id)
if args.json:
print(response.text)
else:
print_out("<green>✓ Status unfavourited</green>")
def reblog(app, user, args):
response = api.reblog(app, user, args.status_id, visibility=args.visibility)
if args.json:
print(response.text)
else:
print_out("<green>✓ Status reblogged</green>")
def unreblog(app, user, args):
response = api.unreblog(app, user, args.status_id)
if args.json:
print(response.text)
else:
print_out("<green>✓ Status unreblogged</green>")
def pin(app, user, args):
response = api.pin(app, user, args.status_id)
if args.json:
print(response.text)
else:
print_out("<green>✓ Status pinned</green>")
def unpin(app, user, args):
response = api.unpin(app, user, args.status_id)
if args.json:
print(response.text)
else:
print_out("<green>✓ Status unpinned</green>")
def bookmark(app, user, args):
response = api.bookmark(app, user, args.status_id)
if args.json:
print(response.text)
else:
print_out("<green>✓ Status bookmarked</green>")
def unbookmark(app, user, args):
response = api.unbookmark(app, user, args.status_id)
if args.json:
print(response.text)
else:
print_out("<green>✓ Status unbookmarked</green>")
def bookmarks(app, user, args):
timeline(app, user, args, api.bookmark_timeline_generator(app, user, limit=args.count))
def reblogged_by(app, user, args):
response = api.reblogged_by(app, user, args.status_id)
if args.json:
print(response.text)
else:
headers = ["Account", "Display name"]
rows = [[a["acct"], a["display_name"]] for a in response.json()]
print_table(headers, rows)
def auth(app, user, args):
config_data = config.load_config()
if not config_data["users"]:
print_out("You are not logged in to any accounts")
return
active_user = config_data["active_user"]
print_out("Authenticated accounts:")
for uid, u in config_data["users"].items():
active_label = "ACTIVE" if active_user == uid else ""
print_out("* <green>{}</green> <yellow>{}</yellow>".format(uid, active_label))
path = config.get_config_file_path()
print_out("\nAuth tokens are stored in: <blue>{}</blue>".format(path))
def env(app, user, args):
print_out(f"toot {__version__}")
print_out(f"Python {sys.version}")
print_out(platform.platform())
def update_account(app, user, args):
options = [
args.avatar,
args.bot,
args.discoverable,
args.display_name,
args.header,
args.language,
args.locked,
args.note,
args.privacy,
args.sensitive,
]
if all(option is None for option in options):
raise ConsoleError("Please specify at least one option to update the account")
response = api.update_account(
app,
user,
avatar=args.avatar,
bot=args.bot,
discoverable=args.discoverable,
display_name=args.display_name,
header=args.header,
language=args.language,
locked=args.locked,
note=args.note,
privacy=args.privacy,
sensitive=args.sensitive,
)
if args.json:
print(response.text)
else:
print_out("<green>✓ Account updated</green>")
def login_cli(app, user, args):
base_url = args_get_instance(args.instance, args.scheme)
app = create_app_interactive(base_url)
login_interactive(app, args.email)
print_out()
print_out("<green>✓ Successfully logged in.</green>")
def login(app, user, args):
base_url = args_get_instance(args.instance, args.scheme)
app = create_app_interactive(base_url)
login_browser_interactive(app)
print_out()
print_out("<green>✓ Successfully logged in.</green>")
def logout(app, user, args):
user = config.load_user(args.account, throw=True)
config.delete_user(user)
print_out("<green>✓ User {} logged out</green>".format(config.user_id(user)))
def activate(app, user, args):
if not args.account:
print_out("Specify one of the following user accounts to activate:\n")
print_user_list(config.get_user_list())
return
user = config.load_user(args.account, throw=True)
config.activate_user(user)
print_out("<green>✓ User {} active</green>".format(config.user_id(user)))
def upload(app, user, args):
response = _do_upload(app, user, args.file, args.description, None)
msg = "Successfully uploaded media ID <yellow>{}</yellow>, type '<yellow>{}</yellow>'"
print_out()
print_out(msg.format(response['id'], response['type']))
print_out("URL: <green>{}</green>".format(response['url']))
print_out("Preview URL: <green>{}</green>".format(response['preview_url']))
def search(app, user, args):
response = api.search(app, user, args.query, args.resolve)
if args.json:
print(response.text)
else:
print_search_results(response.json())
def _do_upload(app, user, file, description, thumbnail):
print_out("Uploading media: <green>{}</green>".format(file.name))
return api.upload_media(app, user, file, description=description, thumbnail=thumbnail)
def follow(app, user, args):
account = api.find_account(app, user, args.account)
response = api.follow(app, user, account["id"])
if args.json:
print(response.text)
else:
print_out(f"<green>✓ You are now following {args.account}</green>")
def unfollow(app, user, args):
account = api.find_account(app, user, args.account)
response = api.unfollow(app, user, account["id"])
if args.json:
print(response.text)
else:
print_out(f"<green>✓ You are no longer following {args.account}</green>")
def following(app, user, args):
account = args.account or user.username
account = api.find_account(app, user, account)
accounts = api.following(app, user, account["id"])
if args.json:
print(json.dumps(accounts))
else:
print_acct_list(accounts)
def followers(app, user, args):
account = args.account or user.username
account = api.find_account(app, user, account)
accounts = api.followers(app, user, account["id"])
if args.json:
print(json.dumps(accounts))
else:
print_acct_list(accounts)
def tags_follow(app, user, args):
tn = args.tag_name if not args.tag_name.startswith("#") else args.tag_name[1:]
api.follow_tag(app, user, tn)
print_out("<green>✓ You are now following #{}</green>".format(tn))
def tags_unfollow(app, user, args):
tn = args.tag_name if not args.tag_name.startswith("#") else args.tag_name[1:]
api.unfollow_tag(app, user, tn)
print_out("<green>✓ You are no longer following #{}</green>".format(tn))
def tags_followed(app, user, args):
response = api.followed_tags(app, user)
print_tag_list(response)
def lists(app, user, args):
lists = api.get_lists(app, user)
if lists:
print_lists(lists)
else:
print_out("You have no lists defined.")
def list_accounts(app, user, args):
list_id = _get_list_id(app, user, args)
response = api.get_list_accounts(app, user, list_id)
print_list_accounts(response)
def list_create(app, user, args):
api.create_list(app, user, title=args.title, replies_policy=args.replies_policy)
print_out(f"<green>✓ List \"{args.title}\" created.</green>")
def list_delete(app, user, args):
list_id = _get_list_id(app, user, args)
api.delete_list(app, user, list_id)
print_out(f"<green>✓ List \"{args.title if args.title else args.id}\"</green> <red>deleted.</red>")
def list_add(app, user, args):
list_id = _get_list_id(app, user, args)
account = api.find_account(app, user, args.account)
try:
api.add_accounts_to_list(app, user, list_id, [account['id']])
except Exception as ex:
# if we failed to add the account, try to give a
# more specific error message than "record not found"
my_accounts = api.followers(app, user, account['id'])
found = False
if my_accounts:
for my_account in my_accounts:
if my_account['id'] == account['id']:
found = True
break
if found is False:
print_out(f"<red>You must follow @{account['acct']} before adding this account to a list.</red>")
else:
print_out(f"<red>{ex}</red>")
return
print_out(f"<green>✓ Added account \"{args.account}\"</green>")
def list_remove(app, user, args):
list_id = _get_list_id(app, user, args)
account = api.find_account(app, user, args.account)
api.remove_accounts_from_list(app, user, list_id, [account['id']])
print_out(f"<green>✓ Removed account \"{args.account}\"</green>")
def _get_list_id(app, user, args):
list_id = args.id or api.find_list_id(app, user, args.title)
if not list_id:
raise ConsoleError("List not found")
return list_id
def mute(app, user, args):
account = api.find_account(app, user, args.account)
response = api.mute(app, user, account['id'])
if args.json:
print(response.text)
else:
print_out("<green>✓ You have muted {}</green>".format(args.account))
def unmute(app, user, args):
account = api.find_account(app, user, args.account)
response = api.unmute(app, user, account['id'])
if args.json:
print(response.text)
else:
print_out("<green>✓ {} is no longer muted</green>".format(args.account))
def muted(app, user, args):
response = api.muted(app, user)
if args.json:
print(json.dumps(response))
else:
if len(response) > 0:
print("Muted accounts:")
print_acct_list(response)
else:
print("No accounts muted")
def block(app, user, args):
account = api.find_account(app, user, args.account)
response = api.block(app, user, account['id'])
if args.json:
print(response.text)
else:
print_out("<green>✓ You are now blocking {}</green>".format(args.account))
def unblock(app, user, args):
account = api.find_account(app, user, args.account)
response = api.unblock(app, user, account['id'])
if args.json:
print(response.text)
else:
print_out("<green>✓ {} is no longer blocked</green>".format(args.account))
def blocked(app, user, args):
response = api.blocked(app, user)
if args.json:
print(json.dumps(response))
else:
if len(response) > 0:
print("Blocked accounts:")
print_acct_list(response)
else:
print("No accounts blocked")
def whoami(app, user, args):
response = api.verify_credentials(app, user)
if args.json:
print(response.text)
else:
account = from_dict(Account, response.json())
print_account(account)
def whois(app, user, args):
account = api.find_account(app, user, args.account)
# Here it's not possible to avoid parsing json since it's needed to find the account.
if args.json:
print(json.dumps(account))
else:
account = from_dict(Account, account)
print_account(account)
def instance(app, user, args):
default = app.base_url if app else None
base_url = args_get_instance(args.instance, args.scheme, default)
if not base_url:
raise ConsoleError("Please specify an instance.")
try:
response = api.get_instance(base_url)
except ApiError:
raise ConsoleError(
f"Instance not found at {base_url}.\n"
"The given domain probably does not host a Mastodon instance."
)
if args.json:
print(response.text)
else:
instance = from_dict(Instance, response.json())
print_instance(instance)
def notifications(app, user, args):
if args.clear:
api.clear_notifications(app, user)
print_out("<green>Cleared notifications</green>")
return
exclude = []
if args.mentions:
# Filter everything except mentions
# https://docs.joinmastodon.org/methods/notifications/
exclude = ["follow", "favourite", "reblog", "poll", "follow_request"]
notifications = api.get_notifications(app, user, exclude_types=exclude)
if not notifications:
print_out("<yellow>No notification</yellow>")
return
if args.reverse:
notifications = reversed(notifications)
notifications = [from_dict(Notification, n) for n in notifications]
print_notifications(notifications)
def tui(app, user, args):
from .tui.app import TUI
TUI.create(app, user, args).run()

View File

@ -1,966 +0,0 @@
import logging
import os
import re
import shutil
import sys
from argparse import ArgumentParser, FileType, ArgumentTypeError, Action
from collections import namedtuple
from itertools import chain
from toot import config, commands, CLIENT_NAME, CLIENT_WEBSITE, __version__, settings
from toot.exceptions import ApiError, ConsoleError
from toot.output import print_out, print_err
from toot.settings import get_setting
VISIBILITY_CHOICES = ["public", "unlisted", "private", "direct"]
VISIBILITY_CHOICES_STR = ", ".join(f"'{v}'" for v in VISIBILITY_CHOICES)
PRIVACY_CHOICES = ["public", "unlisted", "private"]
PRIVACY_CHOICES_STR = ", ".join(f"'{v}'" for v in PRIVACY_CHOICES)
class BooleanOptionalAction(Action):
"""
Backported from argparse. This action is available since Python 3.9.
https://github.com/python/cpython/blob/3.11/Lib/argparse.py
"""
def __init__(self,
option_strings,
dest,
default=None,
type=None,
choices=None,
required=False,
help=None,
metavar=None):
_option_strings = []
for option_string in option_strings:
_option_strings.append(option_string)
if option_string.startswith('--'):
option_string = '--no-' + option_string[2:]
_option_strings.append(option_string)
super().__init__(
option_strings=_option_strings,
dest=dest,
nargs=0,
default=default,
type=type,
choices=choices,
required=required,
help=help,
metavar=metavar)
def __call__(self, parser, namespace, values, option_string=None):
if option_string in self.option_strings:
setattr(namespace, self.dest, not option_string.startswith('--no-'))
def format_usage(self):
return ' | '.join(self.option_strings)
def get_default_visibility():
return os.getenv("TOOT_POST_VISIBILITY", "public")
def language(value):
"""Validates the language parameter"""
if len(value) != 2:
raise ArgumentTypeError(
"Invalid language. Expected a 2 letter abbreviation according to "
"the ISO 639-1 standard."
)
return value
def visibility(value):
"""Validates the visibility parameter"""
if value not in VISIBILITY_CHOICES:
raise ValueError("Invalid visibility value")
return value
def privacy(value):
"""Validates the privacy parameter"""
if value not in PRIVACY_CHOICES:
raise ValueError(f"Invalid privacy value. Expected one of {PRIVACY_CHOICES_STR}.")
return value
def timeline_count(value):
n = int(value)
if not 0 < n <= 20:
raise ArgumentTypeError("Number of toots should be between 1 and 20.")
return n
DURATION_UNITS = {
"m": 60,
"h": 60 * 60,
"d": 60 * 60 * 24,
}
DURATION_EXAMPLES = """e.g. "1 day", "2 hours 30 minutes", "5 minutes 30
seconds" or any combination of above. Shorthand: "1d", "2h30m", "5m30s\""""
def duration(value: str):
match = re.match(r"""^
(([0-9]+)\s*(days|day|d))?\s*
(([0-9]+)\s*(hours|hour|h))?\s*
(([0-9]+)\s*(minutes|minute|m))?\s*
(([0-9]+)\s*(seconds|second|s))?\s*
$""", value, re.X)
if not match:
raise ArgumentTypeError(f"Invalid duration: {value}")
days = match.group(2)
hours = match.group(5)
minutes = match.group(8)
seconds = match.group(11)
days = int(match.group(2) or 0) * 60 * 60 * 24
hours = int(match.group(5) or 0) * 60 * 60
minutes = int(match.group(8) or 0) * 60
seconds = int(match.group(11) or 0)
duration = days + hours + minutes + seconds
if duration == 0:
raise ArgumentTypeError("Empty duration")
return duration
def editor(value):
if not value:
raise ArgumentTypeError(
"Editor not specified in --editor option and $EDITOR environment "
"variable not set."
)
# Check editor executable exists
exe = shutil.which(value)
if not exe:
raise ArgumentTypeError("Editor `{}` not found".format(value))
return exe
Command = namedtuple("Command", ["name", "description", "require_auth", "arguments"])
# Arguments added to every command
common_args = [
(["--no-color"], {
"help": "don't use ANSI colors in output",
"action": 'store_true',
"default": False,
}),
(["--quiet"], {
"help": "don't write to stdout on success",
"action": 'store_true',
"default": False,
}),
(["--debug"], {
"help": "show debug log in console",
"action": 'store_true',
"default": False,
}),
(["--verbose"], {
"help": "show extra detail in debug log; used with --debug",
"action": 'store_true',
"default": False,
}),
]
# Arguments added to commands which require authentication
common_auth_args = [
(["-u", "--using"], {
"help": "the account to use, overrides active account",
}),
]
account_arg = (["account"], {
"help": "account name, e.g. 'Gargron@mastodon.social'",
})
optional_account_arg = (["account"], {
"nargs": "?",
"help": "account name, e.g. 'Gargron@mastodon.social'",
})
instance_arg = (["-i", "--instance"], {
"type": str,
"help": 'mastodon instance to log into e.g. "mastodon.social"',
})
email_arg = (["-e", "--email"], {
"type": str,
"help": 'email address to log in with',
})
scheme_arg = (["--disable-https"], {
"help": "disable HTTPS and use insecure HTTP",
"dest": "scheme",
"default": "https",
"action": "store_const",
"const": "http",
})
status_id_arg = (["status_id"], {
"help": "ID of the status",
"type": str,
})
visibility_arg = (["-v", "--visibility"], {
"type": visibility,
"default": get_default_visibility(),
"help": f"Post visibility. One of: {VISIBILITY_CHOICES_STR}. Defaults to "
f"'{get_default_visibility()}' which can be overridden by setting "
"the TOOT_POST_VISIBILITY environment variable",
})
tag_arg = (["tag_name"], {
"type": str,
"help": "tag name, e.g. Caturday, or \"#Caturday\"",
})
json_arg = (["--json"], {
"action": "store_true",
"default": False,
"help": "print json instead of plaintext",
})
# Arguments for selecting a timeline (see `toot.commands.get_timeline_generator`)
common_timeline_args = [
(["-p", "--public"], {
"action": "store_true",
"default": False,
"help": "show public timeline (does not require auth)",
}),
(["-t", "--tag"], {
"type": str,
"help": "show hashtag timeline (does not require auth)",
}),
(["-a", "--account"], {
"type": str,
"help": "show timeline for the given account",
}),
(["-l", "--local"], {
"action": "store_true",
"default": False,
"help": "show only statuses from local instance (public and tag timelines only)",
}),
(["-i", "--instance"], {
"type": str,
"help": "mastodon instance from which to read (public and tag timelines only)",
}),
(["--list"], {
"type": str,
"help": "show timeline for given list.",
}),
]
timeline_and_bookmark_args = [
(["-c", "--count"], {
"type": timeline_count,
"help": "number of toots to show per page (1-20, default 10).",
"default": 10,
}),
(["-r", "--reverse"], {
"action": "store_true",
"default": False,
"help": "Reverse the order of the shown timeline (to new posts at the bottom)",
}),
(["-1", "--once"], {
"action": "store_true",
"default": False,
"help": "Only show the first <count> toots, do not prompt to continue.",
}),
]
timeline_args = common_timeline_args + timeline_and_bookmark_args
AUTH_COMMANDS = [
Command(
name="login",
description="Log into a mastodon instance using your browser (recommended)",
arguments=[instance_arg, scheme_arg],
require_auth=False,
),
Command(
name="login_cli",
description="Log in from the console, does NOT support two factor authentication",
arguments=[instance_arg, email_arg, scheme_arg],
require_auth=False,
),
Command(
name="activate",
description="Switch between logged in accounts.",
arguments=[optional_account_arg],
require_auth=False,
),
Command(
name="logout",
description="Log out, delete stored access keys",
arguments=[account_arg],
require_auth=False,
),
Command(
name="auth",
description="Show logged in accounts and instances",
arguments=[],
require_auth=False,
),
Command(
name="env",
description="Print environment information for inclusion in bug reports.",
arguments=[],
require_auth=False,
),
Command(
name="update_account",
description="Update your account details",
arguments=[
(["--display-name"], {
"type": str,
"help": "The display name to use for the profile.",
}),
(["--note"], {
"type": str,
"help": "The account bio.",
}),
(["--avatar"], {
"type": FileType("rb"),
"help": "Path to the avatar image to set.",
}),
(["--header"], {
"type": FileType("rb"),
"help": "Path to the header image to set.",
}),
(["--bot"], {
"action": BooleanOptionalAction,
"help": "Whether the account has a bot flag.",
}),
(["--discoverable"], {
"action": BooleanOptionalAction,
"help": "Whether the account should be shown in the profile directory.",
}),
(["--locked"], {
"action": BooleanOptionalAction,
"help": "Whether manual approval of follow requests is required.",
}),
(["--privacy"], {
"type": privacy,
"help": f"Default post privacy for authored statuses. One of: {PRIVACY_CHOICES_STR}."
}),
(["--sensitive"], {
"action": BooleanOptionalAction,
"help": "Whether to mark authored statuses as sensitive by default."
}),
(["--language"], {
"type": language,
"help": "Default language to use for authored statuses (ISO 639-1)."
}),
json_arg,
],
require_auth=True,
),
]
TUI_COMMANDS = [
Command(
name="tui",
description="Launches the toot terminal user interface",
arguments=[
(["--relative-datetimes"], {
"action": "store_true",
"default": False,
"help": "Show relative datetimes in status list.",
}),
],
require_auth=True,
),
]
READ_COMMANDS = [
Command(
name="whoami",
description="Display logged in user details",
arguments=[json_arg],
require_auth=True,
),
Command(
name="whois",
description="Display account details",
arguments=[
(["account"], {
"help": "account name or numeric ID"
}),
json_arg,
],
require_auth=True,
),
Command(
name="notifications",
description="Notifications for logged in user",
arguments=[
(["--clear"], {
"help": "delete all notifications from the server",
"action": 'store_true',
"default": False,
}),
(["-r", "--reverse"], {
"action": "store_true",
"default": False,
"help": "Reverse the order of the shown notifications (newest on top)",
}),
(["-m", "--mentions"], {
"action": "store_true",
"default": False,
"help": "Only print mentions",
})
],
require_auth=True,
),
Command(
name="instance",
description="Display instance details",
arguments=[
(["instance"], {
"help": "instance domain (e.g. 'mastodon.social') or blank to use current",
"nargs": "?",
}),
scheme_arg,
json_arg,
],
require_auth=False,
),
Command(
name="search",
description="Search for users or hashtags",
arguments=[
(["query"], {
"help": "the search query",
}),
(["-r", "--resolve"], {
"action": 'store_true',
"default": False,
"help": "Resolve non-local accounts",
}),
json_arg,
],
require_auth=True,
),
Command(
name="thread",
description="Show toot thread items",
arguments=[
(["status_id"], {
"help": "Show thread for toot.",
}),
json_arg,
],
require_auth=True,
),
Command(
name="status",
description="Show a single status",
arguments=[
(["status_id"], {
"help": "ID of the status to show.",
}),
json_arg,
],
require_auth=True,
),
Command(
name="timeline",
description="Show recent items in a timeline (home by default)",
arguments=timeline_args,
require_auth=True,
),
Command(
name="bookmarks",
description="Show bookmarked posts",
arguments=timeline_and_bookmark_args,
require_auth=True,
),
]
POST_COMMANDS = [
Command(
name="post",
description="Post a status text to your timeline",
arguments=[
(["text"], {
"help": "The status text to post.",
"nargs": "?",
}),
(["-m", "--media"], {
"action": "append",
"type": FileType("rb"),
"help": "path to the media file to attach (specify multiple "
"times to attach up to 4 files)"
}),
(["-d", "--description"], {
"action": "append",
"type": str,
"help": "plain-text description of the media for accessibility "
"purposes, one per attached media"
}),
(["--thumbnail"], {
"action": "append",
"type": FileType("rb"),
"help": "path to an image file to serve as media thumbnail, "
"one per attached media"
}),
visibility_arg,
(["-s", "--sensitive"], {
"action": 'store_true',
"default": False,
"help": "mark the media as NSFW",
}),
(["-p", "--spoiler-text"], {
"type": str,
"help": "text to be shown as a warning before the actual content",
}),
(["-r", "--reply-to"], {
"type": str,
"help": "local ID of the status you want to reply to",
}),
(["-l", "--language"], {
"type": language,
"help": "ISO 639-1 language code of the toot, to skip automatic detection",
}),
(["-e", "--editor"], {
"type": editor,
"nargs": "?",
"const": os.getenv("EDITOR", ""), # option given without value
"help": "Specify an editor to compose your toot, "
"defaults to editor defined in $EDITOR env variable.",
}),
(["--scheduled-at"], {
"type": str,
"help": "ISO 8601 Datetime at which to schedule a status. Must "
"be at least 5 minutes in the future.",
}),
(["--scheduled-in"], {
"type": duration,
"help": f"""Schedule the toot to be posted after a given amount
of time, {DURATION_EXAMPLES}. Must be at least 5
minutes.""",
}),
(["-t", "--content-type"], {
"type": str,
"help": "MIME type for the status text (not supported on all instances)",
}),
(["--poll-option"], {
"action": "append",
"type": str,
"help": "Possible answer to the poll"
}),
(["--poll-expires-in"], {
"type": duration,
"help": f"""Duration that the poll should be open,
{DURATION_EXAMPLES}. Defaults to 24h.""",
"default": 24 * 60 * 60,
}),
(["--poll-multiple"], {
"action": "store_true",
"default": False,
"help": "Allow multiple answers to be selected."
}),
(["--poll-hide-totals"], {
"action": "store_true",
"default": False,
"help": "Hide vote counts until the poll ends. Defaults to false."
}),
json_arg,
],
require_auth=True,
),
Command(
name="upload",
description="Upload an image or video file",
arguments=[
(["file"], {
"help": "Path to the file to upload",
"type": FileType('rb')
}),
(["-d", "--description"], {
"type": str,
"help": "plain-text description of the media for accessibility purposes"
}),
],
require_auth=True,
),
]
STATUS_COMMANDS = [
Command(
name="delete",
description="Delete a status",
arguments=[status_id_arg, json_arg],
require_auth=True,
),
Command(
name="favourite",
description="Favourite a status",
arguments=[status_id_arg, json_arg],
require_auth=True,
),
Command(
name="unfavourite",
description="Unfavourite a status",
arguments=[status_id_arg, json_arg],
require_auth=True,
),
Command(
name="reblog",
description="Reblog a status",
arguments=[status_id_arg, visibility_arg, json_arg],
require_auth=True,
),
Command(
name="unreblog",
description="Unreblog a status",
arguments=[status_id_arg, json_arg],
require_auth=True,
),
Command(
name="reblogged_by",
description="Show accounts that reblogged the status",
arguments=[status_id_arg, json_arg],
require_auth=False,
),
Command(
name="pin",
description="Pin a status",
arguments=[status_id_arg, json_arg],
require_auth=True,
),
Command(
name="unpin",
description="Unpin a status",
arguments=[status_id_arg, json_arg],
require_auth=True,
),
Command(
name="bookmark",
description="Bookmark a status",
arguments=[status_id_arg, json_arg],
require_auth=True,
),
Command(
name="unbookmark",
description="Unbookmark a status",
arguments=[status_id_arg, json_arg],
require_auth=True,
),
]
ACCOUNTS_COMMANDS = [
Command(
name="follow",
description="Follow an account",
arguments=[account_arg, json_arg],
require_auth=True,
),
Command(
name="unfollow",
description="Unfollow an account",
arguments=[account_arg, json_arg],
require_auth=True,
),
Command(
name="following",
description="List accounts followed by the given account, " +
"or your account if no account given",
arguments=[optional_account_arg, json_arg],
require_auth=True,
),
Command(
name="followers",
description="List accounts following the given account, " +
"or your account if no account given",
arguments=[optional_account_arg, json_arg],
require_auth=True,
),
Command(
name="mute",
description="Mute an account",
arguments=[account_arg, json_arg],
require_auth=True,
),
Command(
name="unmute",
description="Unmute an account",
arguments=[account_arg, json_arg],
require_auth=True,
),
Command(
name="muted",
description="List muted accounts",
arguments=[json_arg],
require_auth=True,
),
Command(
name="block",
description="Block an account",
arguments=[account_arg, json_arg],
require_auth=True,
),
Command(
name="unblock",
description="Unblock an account",
arguments=[account_arg, json_arg],
require_auth=True,
),
Command(
name="blocked",
description="List blocked accounts",
arguments=[json_arg],
require_auth=True,
),
]
TAG_COMMANDS = [
Command(
name="tags_followed",
description="List hashtags you follow",
arguments=[],
require_auth=True,
),
Command(
name="tags_follow",
description="Follow a hashtag",
arguments=[tag_arg],
require_auth=True,
),
Command(
name="tags_unfollow",
description="Unfollow a hashtag",
arguments=[tag_arg],
require_auth=True,
),
]
LIST_COMMANDS = [
Command(
name="lists",
description="List all lists",
arguments=[],
require_auth=True,
),
Command(
name="list_accounts",
description="List the accounts in a list",
arguments=[
(["--id"], {
"type": str,
"help": "ID of the list"
}),
(["title"], {
"type": str,
"nargs": "?",
"help": "title of the list"
}),
],
require_auth=True,
),
Command(
name="list_create",
description="Create a list",
arguments=[
(["title"], {
"type": str,
"help": "title of the list"
}),
(["--replies-policy"], {
"type": str,
"help": "replies policy: 'followed', 'list', or 'none' (defaults to 'none')"
}),
],
require_auth=True,
),
Command(
name="list_delete",
description="Delete a list",
arguments=[
(["--id"], {
"type": str,
"help": "ID of the list"
}),
(["title"], {
"type": str,
"nargs": "?",
"help": "title of the list"
}),
],
require_auth=True,
),
Command(
name="list_add",
description="Add account to list",
arguments=[
(["--id"], {
"type": str,
"help": "ID of the list"
}),
(["title"], {
"type": str,
"nargs": "?",
"help": "title of the list"
}),
(["account"], {
"type": str,
"help": "Account to add"
}),
],
require_auth=True,
),
Command(
name="list_remove",
description="Remove account from list",
arguments=[
(["--id"], {
"type": str,
"help": "ID of the list"
}),
(["title"], {
"type": str,
"nargs": "?",
"help": "title of the list"
}),
(["account"], {
"type": str,
"help": "Account to remove"
}),
],
require_auth=True,
),
]
COMMAND_GROUPS = [
("Authentication", AUTH_COMMANDS),
("TUI", TUI_COMMANDS),
("Read", READ_COMMANDS),
("Post", POST_COMMANDS),
("Status", STATUS_COMMANDS),
("Accounts", ACCOUNTS_COMMANDS),
("Hashtags", TAG_COMMANDS),
("Lists", LIST_COMMANDS),
]
COMMANDS = list(chain(*[commands for _, commands in COMMAND_GROUPS]))
def print_usage():
max_name_len = max(len(name) for name, _ in COMMAND_GROUPS)
print_out("<green>{}</green>".format(CLIENT_NAME))
print_out("<blue>v{}</blue>".format(__version__))
for name, cmds in COMMAND_GROUPS:
print_out("")
print_out(name + ":")
for cmd in cmds:
cmd_name = cmd.name.ljust(max_name_len + 2)
print_out(" <yellow>toot {}</yellow> {}".format(cmd_name, cmd.description))
print_out("")
print_out("To get help for each command run:")
print_out(" <yellow>toot \\<command> --help</yellow>")
print_out("")
print_out("<green>{}</green>".format(CLIENT_WEBSITE))
def get_argument_parser(name, command):
parser = ArgumentParser(
prog='toot %s' % name,
description=command.description,
epilog=CLIENT_WEBSITE)
combined_args = command.arguments + common_args
if command.require_auth:
combined_args += common_auth_args
defaults = get_setting(f"commands.{name}", dict, {})
for args, kwargs in combined_args:
# Set default value from settings if exists
default = get_default_value(defaults, args)
if default is not None:
kwargs["default"] = default
parser.add_argument(*args, **kwargs)
return parser
def get_default_value(defaults, args):
# Hacky way to determine command name from argparse args
name = args[-1].lstrip("-").replace("-", "_")
return defaults.get(name)
def run_command(app, user, name, args):
command = next((c for c in COMMANDS if c.name == name), None)
if not command:
print_err(f"Unknown command '{name}'")
print_out("Run <yellow>toot --help</yellow> to show a list of available commands.")
return
parser = get_argument_parser(name, command)
parsed_args = parser.parse_args(args)
# Override the active account if 'using' option is given
if command.require_auth and parsed_args.using:
user, app = config.get_user_app(parsed_args.using)
if not user or not app:
raise ConsoleError("User '{}' not found".format(parsed_args.using))
if command.require_auth and (not user or not app):
print_err("This command requires that you are logged in.")
print_err("Please run `toot login` first.")
return
fn = commands.__dict__.get(name)
if not fn:
raise NotImplementedError("Command '{}' does not have an implementation.".format(name))
return fn(app, user, parsed_args)
def main():
if settings.get_debug():
filename = settings.get_debug_file()
logging.basicConfig(level=logging.DEBUG, filename=filename)
logging.getLogger("urllib3").setLevel(logging.INFO)
command_name = sys.argv[1] if len(sys.argv) > 1 else None
args = sys.argv[2:]
if not command_name or command_name == "--help":
return print_usage()
user, app = config.get_active_user_app()
try:
run_command(app, user, command_name, args)
except (ConsoleError, ApiError) as e:
print_err(str(e))
sys.exit(1)
except KeyboardInterrupt:
pass