# -*- coding: utf-8 -*- from bs4 import BeautifulSoup from datetime import datetime from itertools import zip_longest from itertools import chain from textwrap import TextWrapper from toot import api, config from toot.auth import login_interactive, login_browser_interactive, create_app_interactive from toot.exceptions import ConsoleError, NotFoundError from toot.output import print_out, print_err, print_instance, print_account, print_search_results from toot.utils import assert_domain_exists def _print_timeline(item): def wrap_text(text, width): wrapper = TextWrapper(width=width, break_long_words=False, break_on_hyphens=False) return chain(*[wrapper.wrap(l) for l in text.split("\n")]) def timeline_rows(item): name = item['name'] time = item['time'].strftime('%Y-%m-%d %H:%M%Z') left_column = [name, time] if 'reblogged' in item: left_column.append(item['reblogged']) text = item['text'] right_column = wrap_text(text, 80) return zip_longest(left_column, right_column, fillvalue="") for left, right in timeline_rows(item): print_out("{:30} │ {}".format(left, right)) def _parse_timeline(item): content = item['reblog']['content'] if item['reblog'] else item['content'] reblogged = item['reblog']['account']['username'] if item['reblog'] else "" name = item['account']['display_name'] + " @" + item['account']['username'] soup = BeautifulSoup(content, "html.parser") text = soup.get_text().replace(''', "'") time = datetime.strptime(item['created_at'], "%Y-%m-%dT%H:%M:%S.%fZ") return { "name": name, "text": text, "time": time, "reblogged": reblogged, } def timeline(app, user, args): items = api.timeline_home(app, user) parsed_items = [_parse_timeline(t) for t in items] print_out("─" * 31 + "┬" + "─" * 88) for item in parsed_items: _print_timeline(item) print_out("─" * 31 + "┼" + "─" * 88) def curses(app, user, args): from toot.ui.app import TimelineApp if not args.public and (not app or not user): raise ConsoleError("You must be logged in to view the home timeline.") if args.public: instance = args.instance or app.instance generator = api.public_timeline_generator(instance) else: generator = api.home_timeline_generator(app, user) TimelineApp(generator).run() def post(app, user, args): if args.media: media = _do_upload(app, user, args.media) media_ids = [media['id']] else: media = None media_ids = None if media and not args.text: args.text = media['text_url'] if not args.text: raise ConsoleError("You must specify either text or media to post.") response = api.post_status(app, user, args.text, args.visibility, media_ids) print_out("Toot posted: {}".format(response.get('url'))) def auth(app, user, args): config_data = config.load_config() if not config_data["users"]: print_out("You are not logged in to any accounts") return active_user = config_data["active_user"] print_out("Authenticated accounts:") for uid, u in config_data["users"].items(): active_label = "ACTIVE" if active_user == uid else "" print_out("* {} {}".format(uid, active_label)) path = config.get_config_file_path() print_out("\nAuth tokens are stored in: {}".format(path)) def login(app, user, args): app = create_app_interactive(instance=args.instance) login_interactive(app, args.email) print_out() print_out("✓ Successfully logged in.") def login_browser(app, user, args): app = create_app_interactive(instance=args.instance) login_browser_interactive(app) print_out() print_out("✓ Successfully logged in.") def logout(app, user, args): user = config.load_user(args.account, throw=True) config.delete_user(user) print_out("✓ User {} logged out".format(config.user_id(user))) def activate(app, user, args): user = config.load_user(args.account, throw=True) config.activate_user(user) print_out("✓ User {} active".format(config.user_id(user))) def upload(app, user, args): response = _do_upload(app, user, args.file) msg = "Successfully uploaded media ID {}, type '{}'" print_out() print_out(msg.format(response['id'], response['type'])) print_out("Original URL: {}".format(response['url'])) print_out("Preview URL: {}".format(response['preview_url'])) print_out("Text URL: {}".format(response['text_url'])) def search(app, user, args): response = api.search(app, user, args.query, args.resolve) print_search_results(response) def _do_upload(app, user, file): print_out("Uploading media: {}".format(file.name)) return api.upload_media(app, user, file) def _find_account(app, user, account_name): """For a given account name, returns the Account object. Raises an exception if not found. """ if not account_name: raise ConsoleError("Empty account name given") accounts = api.search_accounts(app, user, account_name) if account_name[0] == "@": account_name = account_name[1:] for account in accounts: if account['acct'] == account_name: return account raise ConsoleError("Account not found") def follow(app, user, args): account = _find_account(app, user, args.account) api.follow(app, user, account['id']) print_out("✓ You are now following {}".format(args.account)) def unfollow(app, user, args): account = _find_account(app, user, args.account) api.unfollow(app, user, account['id']) print_out("✓ You are no longer following {}".format(args.account)) def mute(app, user, args): account = _find_account(app, user, args.account) api.mute(app, user, account['id']) print_out("✓ You have muted {}".format(args.account)) def unmute(app, user, args): account = _find_account(app, user, args.account) api.unmute(app, user, account['id']) print_out("✓ {} is no longer muted".format(args.account)) def block(app, user, args): account = _find_account(app, user, args.account) api.block(app, user, account['id']) print_out("✓ You are now blocking {}".format(args.account)) def unblock(app, user, args): account = _find_account(app, user, args.account) api.unblock(app, user, account['id']) print_out("✓ {} is no longer blocked".format(args.account)) def whoami(app, user, args): account = api.verify_credentials(app, user) print_account(account) def whois(app, user, args): account = _find_account(app, user, args.account) print_account(account) def instance(app, user, args): name = args.instance or (app and app.instance) if not name: raise ConsoleError("Please specify instance name.") assert_domain_exists(name) try: instance = api.get_instance(name) print_instance(instance) except NotFoundError: raise ConsoleError( "Instance not found at {}.\n" "The given domain probably does not host a Mastodon instance.".format(name) )