294 lines
8.6 KiB
Python
294 lines
8.6 KiB
Python
import click
|
|
import os
|
|
import sys
|
|
|
|
from datetime import datetime, timedelta, timezone
|
|
from time import sleep, time
|
|
from typing import BinaryIO, Optional, Tuple
|
|
|
|
from toot import api, config
|
|
from toot.cli import AccountParamType, cli, json_option, pass_context, Context
|
|
from toot.cli import DURATION_EXAMPLES, VISIBILITY_CHOICES
|
|
from toot.cli.validators import validate_duration, validate_language
|
|
from toot.entities import MediaAttachment, from_dict
|
|
from toot.utils import EOF_KEY, delete_tmp_status_file, editor_input, multiline_input
|
|
from toot.utils.datetime import parse_datetime
|
|
|
|
|
|
@cli.command()
|
|
@click.argument("text", required=False)
|
|
@click.option(
|
|
"--media", "-m",
|
|
help="""Path to media file to attach, can be used multiple times to attach
|
|
multiple files.""",
|
|
type=click.File(mode="rb"),
|
|
multiple=True
|
|
)
|
|
@click.option(
|
|
"--description", "-d", "descriptions",
|
|
help="""Plain-text description of the media for accessibility purposes, one
|
|
per attached media""",
|
|
multiple=True,
|
|
)
|
|
@click.option(
|
|
"--thumbnail", "thumbnails",
|
|
help="Path to an image file to serve as media thumbnail, one per attached media",
|
|
type=click.File(mode="rb"),
|
|
multiple=True
|
|
)
|
|
@click.option(
|
|
"--visibility", "-v",
|
|
help="Post visibility",
|
|
type=click.Choice(VISIBILITY_CHOICES),
|
|
)
|
|
@click.option(
|
|
"--sensitive", "-s",
|
|
help="Mark status and attached media as sensitive",
|
|
default=False,
|
|
is_flag=True,
|
|
)
|
|
@click.option(
|
|
"--spoiler-text", "-p",
|
|
help="Text to be shown as a warning or subject before the actual content.",
|
|
)
|
|
@click.option(
|
|
"--reply-to", "-r",
|
|
help="ID of the status being replied to, if status is a reply.",
|
|
)
|
|
@click.option(
|
|
"--language", "-l",
|
|
help="ISO 639-1 language code of the toot, to skip automatic detection.",
|
|
callback=validate_language,
|
|
)
|
|
@click.option(
|
|
"--editor", "-e",
|
|
is_flag=False,
|
|
flag_value=os.getenv("EDITOR"),
|
|
help="""Specify an editor to compose your toot. When used without a value
|
|
it will use the editor defined in the $EDITOR environment variable.""",
|
|
)
|
|
@click.option(
|
|
"--scheduled-at",
|
|
help="""ISO 8601 Datetime at which to schedule a status. Must be at least 5
|
|
minutes in the future.""",
|
|
)
|
|
@click.option(
|
|
"--scheduled-in",
|
|
help=f"""Schedule the toot to be posted after a given amount of time,
|
|
{DURATION_EXAMPLES}. Must be at least 5 minutes.""",
|
|
callback=validate_duration,
|
|
)
|
|
@click.option(
|
|
"--content-type", "-t",
|
|
help="MIME type for the status text (not supported on all instances)",
|
|
)
|
|
@click.option(
|
|
"--poll-option",
|
|
help="Possible answer to the poll, can be given multiple times.",
|
|
multiple=True,
|
|
)
|
|
@click.option(
|
|
"--poll-expires-in",
|
|
help=f"Duration that the poll should be open, {DURATION_EXAMPLES}",
|
|
callback=validate_duration,
|
|
default="24h",
|
|
)
|
|
@click.option(
|
|
"--poll-multiple",
|
|
help="Allow multiple answers to be selected.",
|
|
is_flag=True,
|
|
default=False,
|
|
)
|
|
@click.option(
|
|
"--poll-hide-totals",
|
|
help="Hide vote counts until the poll ends.",
|
|
is_flag=True,
|
|
default=False,
|
|
)
|
|
@click.option(
|
|
"-u", "--using",
|
|
type=AccountParamType(),
|
|
help="The account to use, overrides the active account.",
|
|
)
|
|
@json_option
|
|
@pass_context
|
|
def post(
|
|
ctx: Context,
|
|
text: Optional[str],
|
|
media: Tuple[str],
|
|
descriptions: Tuple[str],
|
|
thumbnails: Tuple[str],
|
|
visibility: Optional[str],
|
|
sensitive: bool,
|
|
spoiler_text: Optional[str],
|
|
reply_to: Optional[str],
|
|
language: Optional[str],
|
|
editor: Optional[str],
|
|
scheduled_at: Optional[str],
|
|
scheduled_in: Optional[int],
|
|
content_type: Optional[str],
|
|
poll_option: Tuple[str],
|
|
poll_expires_in: int,
|
|
poll_multiple: bool,
|
|
poll_hide_totals: bool,
|
|
json: bool,
|
|
using: str
|
|
):
|
|
"""Post a new status"""
|
|
if len(media) > 4:
|
|
raise click.ClickException("Cannot attach more than 4 files.")
|
|
|
|
if using:
|
|
user, app = config.get_user_app(using)
|
|
if not user or not app:
|
|
raise click.ClickException(f"Account '{using}' not found. Run `toot auth` to see available accounts.")
|
|
else:
|
|
user, app = ctx.user, ctx.app
|
|
|
|
media_ids = _upload_media(ctx.app, ctx.user, media, descriptions, thumbnails)
|
|
status_text = _get_status_text(text, editor, media)
|
|
scheduled_at = _get_scheduled_at(scheduled_at, scheduled_in)
|
|
|
|
if not status_text and not media_ids:
|
|
raise click.ClickException("You must specify either text or media to post.")
|
|
|
|
response = api.post_status(
|
|
app,
|
|
user,
|
|
status_text,
|
|
visibility=visibility,
|
|
media_ids=media_ids,
|
|
sensitive=sensitive,
|
|
spoiler_text=spoiler_text,
|
|
in_reply_to_id=reply_to,
|
|
language=language,
|
|
scheduled_at=scheduled_at,
|
|
content_type=content_type,
|
|
poll_options=poll_option,
|
|
poll_expires_in=poll_expires_in,
|
|
poll_multiple=poll_multiple,
|
|
poll_hide_totals=poll_hide_totals,
|
|
)
|
|
|
|
if json:
|
|
click.echo(response.text)
|
|
else:
|
|
status = response.json()
|
|
if "scheduled_at" in status:
|
|
scheduled_at = parse_datetime(status["scheduled_at"])
|
|
scheduled_at = datetime.strftime(scheduled_at, "%Y-%m-%d %H:%M:%S%z")
|
|
click.echo(f"Toot scheduled for: {scheduled_at}")
|
|
else:
|
|
click.echo(f"Toot posted: {status['url']}")
|
|
|
|
delete_tmp_status_file()
|
|
|
|
|
|
@cli.command()
|
|
@click.argument("file", type=click.File(mode="rb"))
|
|
@click.option(
|
|
"--description", "-d",
|
|
help="Plain-text description of the media for accessibility purposes"
|
|
)
|
|
@json_option
|
|
@pass_context
|
|
def upload(
|
|
ctx: Context,
|
|
file: BinaryIO,
|
|
description: Optional[str],
|
|
json: bool,
|
|
):
|
|
"""Upload an image or video file
|
|
|
|
This is probably not very useful, see `toot post --media` instead.
|
|
"""
|
|
response = _do_upload(ctx.app, ctx.user, file, description, None)
|
|
if json:
|
|
click.echo(response.text)
|
|
else:
|
|
media = from_dict(MediaAttachment, response.json())
|
|
click.echo()
|
|
click.echo(f"Successfully uploaded media ID {media.id}, type '{media.type}'")
|
|
click.echo(f"URL: {media.url}")
|
|
click.echo(f"Preview URL: {media.preview_url}")
|
|
|
|
|
|
def _get_status_text(text, editor, media):
|
|
isatty = sys.stdin.isatty()
|
|
|
|
if not text and not isatty:
|
|
text = sys.stdin.read().rstrip()
|
|
|
|
if isatty:
|
|
if editor:
|
|
text = editor_input(editor, text)
|
|
elif not text and not media:
|
|
click.echo(f"Write or paste your toot. Press {EOF_KEY} to post it.")
|
|
text = multiline_input()
|
|
|
|
return text
|
|
|
|
|
|
def _get_scheduled_at(scheduled_at, scheduled_in):
|
|
if scheduled_at:
|
|
return scheduled_at
|
|
|
|
if scheduled_in:
|
|
scheduled_at = datetime.now(timezone.utc) + timedelta(seconds=scheduled_in)
|
|
return scheduled_at.replace(microsecond=0).isoformat()
|
|
|
|
return None
|
|
|
|
|
|
def _upload_media(app, user, media, descriptions, thumbnails):
|
|
# Match media to corresponding descriptions and thumbnail
|
|
media = media or []
|
|
descriptions = descriptions or []
|
|
thumbnails = thumbnails or []
|
|
uploaded_media = []
|
|
|
|
for idx, file in enumerate(media):
|
|
description = descriptions[idx].strip() if idx < len(descriptions) else None
|
|
thumbnail = thumbnails[idx] if idx < len(thumbnails) else None
|
|
result = _do_upload(app, user, file, description, thumbnail).json()
|
|
uploaded_media.append(result)
|
|
|
|
_wait_until_all_processed(app, user, uploaded_media)
|
|
|
|
return [m["id"] for m in uploaded_media]
|
|
|
|
|
|
def _do_upload(app, user, file, description, thumbnail):
|
|
return api.upload_media(app, user, file, description=description, thumbnail=thumbnail)
|
|
|
|
|
|
def _wait_until_all_processed(app, user, uploaded_media):
|
|
"""
|
|
Media is uploaded asynchronously, and cannot be attached until the server
|
|
has finished processing it. This function waits for that to happen.
|
|
|
|
Once media is processed, it will have the URL populated.
|
|
"""
|
|
if all(m["url"] for m in uploaded_media):
|
|
return
|
|
|
|
# Timeout after waiting 1 minute
|
|
start_time = time()
|
|
timeout = 60
|
|
|
|
click.echo("Waiting for media to finish processing...")
|
|
for media in uploaded_media:
|
|
_wait_until_processed(app, user, media, start_time, timeout)
|
|
|
|
|
|
def _wait_until_processed(app, user, media, start_time, timeout):
|
|
if media["url"]:
|
|
return
|
|
|
|
media = api.get_media(app, user, media["id"])
|
|
while not media["url"]:
|
|
sleep(1)
|
|
if time() > start_time + timeout:
|
|
raise click.ClickException(f"Media not processed by server after {timeout} seconds. Aborting.")
|
|
media = api.get_media(app, user, media["id"])
|