Toot-Mastodon-CLI-TUI-clien.../toot/commands.py

401 lines
12 KiB
Python
Raw Normal View History

2017-04-19 14:47:30 +02:00
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from __future__ import print_function
import json
import requests
import webbrowser
2017-04-19 14:47:30 +02:00
from bs4 import BeautifulSoup
from builtins import input
from datetime import datetime
from future.moves.itertools import zip_longest
from getpass import getpass
from itertools import chain
2017-04-19 15:29:40 +02:00
from textwrap import TextWrapper, wrap
2017-04-19 14:47:30 +02:00
from toot import api, config, DEFAULT_INSTANCE, User, App, ConsoleError
from toot.output import print_out
from toot.app import TimelineApp
2017-04-19 14:47:30 +02:00
def register_app(instance):
print_out("Registering application with <green>{}</green>".format(instance))
2017-04-19 14:47:30 +02:00
try:
response = api.create_app(instance)
except:
raise ConsoleError("Registration failed. Did you enter a valid instance?")
base_url = 'https://' + instance
app = App(instance, base_url, response['client_id'], response['client_secret'])
path = config.save_app(app)
print_out("Application tokens saved to: <green>{}</green>\n".format(path))
2017-04-19 14:47:30 +02:00
return app
def create_app_interactive(instance=None):
2017-04-19 14:47:30 +02:00
if not instance:
print_out("Choose an instance [<green>{}</green>]: ".format(DEFAULT_INSTANCE), end="")
instance = input()
if not instance:
instance = DEFAULT_INSTANCE
2017-04-19 14:47:30 +02:00
return config.load_app(instance) or register_app(instance)
def create_user(app, email, access_token):
user = User(app.instance, email, access_token)
path = config.save_user(user)
print_out("Access token saved to: <green>{}</green>".format(path))
return user
def login_interactive(app, email=None):
print_out("Log in to <green>{}</green>".format(app.instance))
if email:
print_out("Email: <green>{}</green>".format(email))
2017-04-19 14:47:30 +02:00
while not email:
email = input('Email: ')
password = getpass('Password: ')
2017-04-19 14:47:30 +02:00
try:
print_out("Authenticating...")
2017-04-19 14:47:30 +02:00
response = api.login(app, email, password)
except api.ApiError:
raise ConsoleError("Login failed")
return create_user(app, email, response['access_token'])
2017-04-19 14:47:30 +02:00
def two_factor_login_interactive(app):
"""Hacky implementation of two factor authentication"""
print_out("Log in to {}".format(app.instance))
2017-04-19 14:47:30 +02:00
email = input('Email: ')
password = getpass('Password: ')
sign_in_url = app.base_url + '/auth/sign_in'
session = requests.Session()
# Fetch sign in form
response = session.get(sign_in_url)
response.raise_for_status()
soup = BeautifulSoup(response.content, "html.parser")
form = soup.find('form')
inputs = form.find_all('input')
data = {i.attrs.get('name'): i.attrs.get('value') for i in inputs}
data['user[email]'] = email
data['user[password]'] = password
# Submit form, get 2FA entry form
response = session.post(sign_in_url, data)
response.raise_for_status()
soup = BeautifulSoup(response.content, "html.parser")
form = soup.find('form')
inputs = form.find_all('input')
data = {i.attrs.get('name'): i.attrs.get('value') for i in inputs}
data['user[otp_attempt]'] = input("2FA Token: ")
# Submit token
response = session.post(sign_in_url, data)
response.raise_for_status()
# Extract access token from response
soup = BeautifulSoup(response.content, "html.parser")
initial_state = soup.find('script', id='initial-state')
if not initial_state:
raise ConsoleError("Login failed: Invalid 2FA token?")
data = json.loads(initial_state.get_text())
access_token = data['meta']['access_token']
return create_user(app, email, access_token)
2017-04-19 14:47:30 +02:00
def _print_timeline(item):
def wrap_text(text, width):
wrapper = TextWrapper(width=width, break_long_words=False, break_on_hyphens=False)
return chain(*[wrapper.wrap(l) for l in text.split("\n")])
def timeline_rows(item):
name = item['name']
time = item['time'].strftime('%Y-%m-%d %H:%M%Z')
left_column = [name, time]
if 'reblogged' in item:
left_column.append(item['reblogged'])
text = item['text']
right_column = wrap_text(text, 80)
return zip_longest(left_column, right_column, fillvalue="")
for left, right in timeline_rows(item):
print_out("{:30}{}".format(left, right))
2017-04-19 14:47:30 +02:00
def _parse_timeline(item):
content = item['reblog']['content'] if item['reblog'] else item['content']
reblogged = item['reblog']['account']['username'] if item['reblog'] else ""
name = item['account']['display_name'] + " @" + item['account']['username']
soup = BeautifulSoup(content, "html.parser")
text = soup.get_text().replace('&apos;', "'")
time = datetime.strptime(item['created_at'], "%Y-%m-%dT%H:%M:%S.%fZ")
return {
"name": name,
"text": text,
"time": time,
"reblogged": reblogged,
}
def timeline(app, user, args):
items = api.timeline_home(app, user)
parsed_items = [_parse_timeline(t) for t in items]
print_out("" * 31 + "" + "" * 88)
2017-04-19 14:47:30 +02:00
for item in parsed_items:
_print_timeline(item)
print_out("" * 31 + "" + "" * 88)
2017-04-19 14:47:30 +02:00
def curses(app, user, args):
generator = api.timeline_generator(app, user)
TimelineApp(generator).run()
2017-04-19 14:47:30 +02:00
def post(app, user, args):
if args.media:
media = _do_upload(app, user, args.media)
media_ids = [media['id']]
else:
media_ids = None
response = api.post_status(app, user, args.text, media_ids=media_ids, visibility=args.visibility)
print_out("Toot posted: <green>{}</green>".format(response.get('url')))
2017-04-19 14:47:30 +02:00
def auth(app, user, args):
if app and user:
print_out("You are logged in to <yellow>{}</yellow> as <yellow>{}</yellow>\n".format(
app.instance, user.username))
print_out("User data: <green>{}</green>".format(config.get_user_config_path()))
print_out("App data: <green>{}</green>".format(config.get_instance_config_path(app.instance)))
2017-04-19 14:47:30 +02:00
else:
print_out("You are not logged in")
2017-04-19 14:47:30 +02:00
def login(app, user, args):
app = create_app_interactive(instance=args.instance)
login_interactive(app, args.email)
2017-04-19 14:47:30 +02:00
print_out()
print_out("<green>✓ Successfully logged in.</green>")
2017-04-19 14:47:30 +02:00
def login_2fa(app, user, args):
print_out()
print_out("<yellow>Two factor authentication is experimental.</yellow>")
print_out("<yellow>If you have problems logging in, please open an issue:</yellow>")
print_out("<yellow>https://github.com/ihabunek/toot/issues</yellow>")
print_out()
2017-04-19 14:47:30 +02:00
app = create_app_interactive()
two_factor_login_interactive(app)
print_out()
print_out("<green>✓ Successfully logged in.</green>")
BROWSER_LOGIN_EXPLANATION = """
This authentication method requires you to log into your Mastodon instance
in your browser, where you will be asked to authorize <yellow>toot</yellow> to access
your account. When you do, you will be given an <yellow>authorization code</yellow>
which you need to paste here.
"""
def login_browser(app, user, args):
app = create_app_interactive(instance=args.instance)
url = api.get_browser_login_url(app)
print_out(BROWSER_LOGIN_EXPLANATION)
print_out("This is the login URL:")
print_out(url)
print_out("")
yesno = input("Open link in default browser? [Y/n]")
if not yesno or yesno.lower() == 'y':
webbrowser.open(url)
authorization_code = ""
while not authorization_code:
authorization_code = input("Authorization code: ")
print_out("\nRequesting access token...")
response = api.request_access_token(app, authorization_code)
# TODO: user email is not available in this workflow, maybe change the User
# to store the username instead? Currently set to "unknown" since it's not
# used anywhere.
email = "unknown"
create_user(app, email, response['access_token'])
print_out()
print_out("<green>✓ Successfully logged in.</green>")
2017-04-19 14:47:30 +02:00
def logout(app, user, args):
config.delete_user()
print_out("<green>✓ You are now logged out.</green>")
2017-04-19 14:47:30 +02:00
def upload(app, user, args):
response = _do_upload(app, user, args.file)
print_out()
print_out("Successfully uploaded media ID <yellow>{}</yellow>, type '<yellow>{}</yellow>'".format(
response['id'], response['type']))
print_out("Original URL: <green>{}</green>".format(response['url']))
print_out("Preview URL: <green>{}</green>".format(response['preview_url']))
print_out("Text URL: <green>{}</green>".format(response['text_url']))
2017-04-19 14:47:30 +02:00
def _print_accounts(accounts):
if not accounts:
return
print_out("\nAccounts:")
2017-04-19 14:47:30 +02:00
for account in accounts:
print_out("* <green>@{}</green> {}".format(
account['acct'],
account['display_name']
))
2017-04-19 14:47:30 +02:00
def _print_hashtags(hashtags):
if not hashtags:
return
print_out("\nHashtags:")
print_out(", ".join(["<green>#{}</green>".format(t) for t in hashtags]))
2017-04-19 14:47:30 +02:00
def search(app, user, args):
response = api.search(app, user, args.query, args.resolve)
_print_accounts(response['accounts'])
_print_hashtags(response['hashtags'])
def _do_upload(app, user, file):
print_out("Uploading media: <green>{}</green>".format(file.name))
2017-04-19 14:47:30 +02:00
return api.upload_media(app, user, file)
def _find_account(app, user, account_name):
"""For a given account name, returns the Account object.
2017-04-19 14:47:30 +02:00
Raises an exception if not found.
"""
if not account_name:
raise ConsoleError("Empty account name given")
accounts = api.search_accounts(app, user, account_name)
if account_name[0] == "@":
account_name = account_name[1:]
for account in accounts:
if account['acct'] == account_name:
2017-04-19 14:47:30 +02:00
return account
2017-04-19 15:29:40 +02:00
raise ConsoleError("Account not found")
def _print_account(account):
print_out("<green>@{}</green> {}".format(account['acct'], account['display_name']))
2017-04-19 15:29:40 +02:00
2017-05-07 10:42:31 +02:00
note = BeautifulSoup(account['note'], "html.parser").get_text()
if note:
print_out("")
print_out("\n".join(wrap(note)))
2017-04-19 15:29:40 +02:00
print_out("")
print_out("ID: <green>{}</green>".format(account['id']))
print_out("Since: <green>{}</green>".format(account['created_at'][:19].replace('T', ' @ ')))
print_out("")
print_out("Followers: <yellow>{}</yellow>".format(account['followers_count']))
print_out("Following: <yellow>{}</yellow>".format(account['following_count']))
print_out("Statuses: <yellow>{}</yellow>".format(account['statuses_count']))
print_out("")
print_out(account['url'])
2017-04-19 15:29:40 +02:00
2017-04-19 14:47:30 +02:00
def follow(app, user, args):
account = _find_account(app, user, args.account)
api.follow(app, user, account['id'])
print_out("<green>✓ You are now following {}</green>".format(args.account))
2017-04-19 14:47:30 +02:00
def unfollow(app, user, args):
account = _find_account(app, user, args.account)
2017-04-26 11:49:21 +02:00
api.unfollow(app, user, account['id'])
print_out("<green>✓ You are no longer following {}</green>".format(args.account))
2017-04-19 14:47:30 +02:00
2017-04-26 11:49:21 +02:00
def mute(app, user, args):
account = _find_account(app, user, args.account)
api.mute(app, user, account['id'])
print_out("<green>✓ You have muted {}</green>".format(args.account))
2017-04-19 14:47:30 +02:00
2017-04-26 11:49:21 +02:00
def unmute(app, user, args):
account = _find_account(app, user, args.account)
api.unmute(app, user, account['id'])
print_out("<green>✓ {} is no longer muted</green>".format(args.account))
2017-04-26 11:49:21 +02:00
def block(app, user, args):
account = _find_account(app, user, args.account)
api.block(app, user, account['id'])
print_out("<green>✓ You are now blocking {}</green>".format(args.account))
2017-04-26 11:49:21 +02:00
def unblock(app, user, args):
account = _find_account(app, user, args.account)
api.unblock(app, user, account['id'])
print_out("<green>✓ {} is no longer blocked</green>".format(args.account))
2017-04-19 14:47:30 +02:00
def whoami(app, user, args):
2017-04-19 15:29:40 +02:00
account = api.verify_credentials(app, user)
_print_account(account)
2017-04-19 14:47:30 +02:00
2017-04-19 15:29:40 +02:00
def whois(app, user, args):
account = _find_account(app, user, args.account)
_print_account(account)