forget-cancellare-vecchi-toot/libforget/mastodon.py

208 lines
6.6 KiB
Python
Raw Normal View History

from mastodon import Mastodon
from mastodon.Mastodon import MastodonAPIError,\
MastodonNetworkError,\
MastodonNotFoundError,\
MastodonRatelimitError,\
MastodonUnauthorizedError
from model import MastodonApp, Account, OAuthToken, Post, MastodonInstance
from requests import head
2019-07-11 22:31:29 +02:00
import requests
from app import db, sentry
2017-09-20 23:02:36 +02:00
from libforget.exceptions import TemporaryError
from functools import lru_cache
2019-07-11 22:31:29 +02:00
from libforget.session import make_session
2017-08-29 14:46:32 +02:00
def get_or_create_app(instance_url, callback, website):
    """Return the MastodonApp for an instance, registering one if needed.

    The instance is always probed with a HEAD request, HTTPS first.  If the
    HTTPS probe fails for any reason, plain HTTP is tried instead and any
    error raised by that second probe propagates to the caller.

    :param instance_url: host of the instance without a scheme; it is
        interpolated into ``'https://{}'`` / ``'http://{}'``.
    :param callback: redirect URI handed to ``Mastodon.create_app``.
    :param website: website URL handed to ``Mastodon.create_app``.
    :returns: the ``MastodonApp`` found by primary-key lookup, or a newly
        populated one.  A new object is not added to the database session
        here.
    """
    existing = MastodonApp.query.get(instance_url)

    try:
        head('https://{}'.format(instance_url)).raise_for_status()
    except Exception:
        # NOTE(review): any HTTPS failure silently falls back to plain
        # HTTP -- confirm this downgrade is intended.
        head('http://{}'.format(instance_url)).raise_for_status()
        proto = 'http'
    else:
        proto = 'https'

    if existing:
        # NOTE(review): assumes the detected protocol is only recorded on
        # newly created apps -- confirm against the original layout.
        return existing

    client_id, client_secret = Mastodon.create_app(
        'forget',
        scopes=('read', 'write'),
        api_base_url='{}://{}'.format(proto, instance_url),
        redirect_uris=callback,
        website=website,
    )

    created = MastodonApp()
    created.instance = instance_url
    created.client_id = client_id
    created.client_secret = client_secret
    created.protocol = proto
    return created
2017-08-29 14:46:32 +02:00
2019-07-11 22:31:29 +02:00
def anonymous_api(app):
    """Build a Mastodon client for *app* without an access token.

    :param app: object exposing ``client_id``, ``client_secret``,
        ``protocol`` and ``instance`` attributes.
    :returns: a ``Mastodon`` client using a session from ``make_session()``.
    """
    base_url = '{}://{}'.format(app.protocol, app.instance)
    return Mastodon(
        app.client_id,
        client_secret=app.client_secret,
        api_base_url=base_url,
        session=make_session(),
    )
2017-08-29 14:46:32 +02:00
def login_url(app, callback):
    """Return the authorisation request URL for *app*.

    :param app: app record passed to ``anonymous_api``.
    :param callback: redirect URI to include in the request.
    :returns: whatever ``Mastodon.auth_request_url`` returns for the
        ``read`` and ``write`` scopes.
    """
    client = anonymous_api(app)
    return client.auth_request_url(
        redirect_uris=callback,
        scopes=('read', 'write'),
    )
2017-08-29 14:46:32 +02:00
def receive_code(code, app, callback):
    """Exchange an OAuth authorisation code for a stored token.

    Logs in with *code*, fetches the authenticated account, and merges
    both the account and the new token into the database session.

    :param code: authorisation code received on the OAuth callback.
    :param app: app record passed to ``anonymous_api``.
    :param callback: redirect URI that was used for the login request.
    :returns: the merged ``OAuthToken`` with its ``account`` set.
    """
    client = anonymous_api(app)
    access_token = client.log_in(
        code=code,
        scopes=('read', 'write'),
        redirect_uri=callback,
    )

    # log_in() leaves the client authenticated, so this describes the
    # account that just authorised us.
    remote_account = client.account_verify_credentials()
    account = db.session.merge(
        account_from_api_object(remote_account, app.instance))

    oauth_token = db.session.merge(OAuthToken(token=access_token))
    oauth_token.account = account
    return oauth_token
@lru_cache()
def get_api_for_acc(account):
    """Return an authenticated Mastodon client for *account*.

    Each of the account's tokens is tried in turn.  A token rejected as
    unauthorised is deleted (and the deletion committed) before the next
    one is tried.

    Successful results are memoised by ``lru_cache``, keyed on the
    *account* argument; calls that raise are not cached.
    NOTE(review): a cached client may outlive its token -- confirm that
    this is acceptable.

    :param account: object exposing ``mastodon_instance`` and ``tokens``.
    :returns: a ``Mastodon`` client created with
        ``ratelimit_method='throw'``.
    :raises TemporaryError: on any other Mastodon API, network or rate
        limit error, when less than a quarter of the rate limit remains,
        or when no token works.
    """
    app = MastodonApp.query.get(account.mastodon_instance)

    for token in account.tokens:
        api = Mastodon(
            app.client_id,
            client_secret=app.client_secret,
            api_base_url='{}://{}'.format(app.protocol, app.instance),
            access_token=token.token,
            ratelimit_method='throw',
            session=make_session(),
        )

        try:
            # verify_credentials() does not fail for a revoked token
            # (https://github.com/tootsuite/mastodon/issues/4637), so the
            # token is exercised with a timeline request instead.
            api.timeline()
        except MastodonUnauthorizedError as e:
            if sentry:
                # Local names are kept as-is: locals() is what gets
                # reported.
                sentry.captureMessage(
                    'Mastodon auth revoked or incorrect',
                    extra=locals())
            db.session.delete(token)
            db.session.commit()
            continue
        except (MastodonAPIError,
                MastodonNetworkError,
                MastodonRatelimitError) as e:
            raise TemporaryError(e)
        else:
            if api.ratelimit_remaining / api.ratelimit_limit < 1/4:
                raise TemporaryError("Rate limit too low")
            return api

    raise TemporaryError('No access to account {}'.format(account))
2017-08-19 13:11:16 +02:00
def fetch_posts(acc, max_id, since_id):
    """Fetch up to 40 statuses for *acc* and convert them to ``Post``s.

    The account is first refreshed from the API and merged into the
    database session.

    :param acc: account whose statuses are fetched.
    :param max_id: passed to the API as ``max_id`` when truthy.
    :param since_id: passed to the API as ``since_id`` when truthy.
    :returns: list of ``Post`` objects built by ``post_from_api_object``.
    :raises TemporaryError: on any Mastodon API, network or rate limit
        error.
    """
    api = get_api_for_acc(acc)
    try:
        remote_account = api.account_verify_credentials()
        acc = db.session.merge(
            account_from_api_object(remote_account, acc.mastodon_instance))

        # Only forward the paging bounds that were actually supplied.
        paging = {'limit': 40}
        bounds = (('max_id', max_id), ('since_id', since_id))
        paging.update({key: value for key, value in bounds if value})

        statuses = api.account_statuses(acc.mastodon_id, **paging)

        fetched = []
        for status in statuses:
            fetched.append(
                post_from_api_object(status, acc.mastodon_instance))
        return fetched
    except (MastodonAPIError,
            MastodonNetworkError,
            MastodonRatelimitError) as e:
        raise TemporaryError(e)
2017-08-19 13:11:16 +02:00
2017-08-29 14:46:32 +02:00
2017-08-19 13:11:16 +02:00
def post_from_api_object(obj, instance):
    """Convert a status object returned by the API into a ``Post``.

    :param obj: mapping-like status supporting ``obj[key]`` and ``in``.
    :param instance: instance name stored as ``mastodon_instance``.
    :returns: a new ``Post``; it is not added to any session here.
    """
    # Fields are evaluated in the same order as the keyword arguments
    # they feed.
    fields = {
        'mastodon_instance': instance,
        'mastodon_id': obj['id'],
        'favourite': obj['favourited'],
        'has_media': (bool(obj['media_attachments'])
                      if 'media_attachments' in obj else False),
        'created_at': obj['created_at'],
        # A throwaway Account is built only to read its ``id``.
        'author_id': account_from_api_object(obj['account'], instance).id,
        'direct': obj['visibility'] == 'direct',
        'favourites': obj['favourites_count'],
        'reblogs': obj['reblogs_count'],
        'is_reblog': obj['reblog'] is not None,
    }
    return Post(**fields)
2017-08-29 14:46:32 +02:00
2017-08-19 13:11:16 +02:00
def account_from_api_object(obj, instance):
    """Convert an account object returned by the API into an ``Account``.

    :param obj: mapping-like account supporting ``obj[key]``.
    :param instance: instance name; stored as ``mastodon_instance`` and
        appended to the username to form ``screen_name``.
    :returns: a new ``Account``; it is not added to any session here.
    """
    fields = {
        'mastodon_instance': instance,
        'mastodon_id': obj['id'],
        'screen_name': '{}@{}'.format(obj['username'], instance),
        'display_name': obj['display_name'],
        'avatar_url': obj['avatar'],
        'reported_post_count': obj['statuses_count'],
    }
    return Account(**fields)
2017-08-29 14:46:32 +02:00
2017-08-19 13:11:16 +02:00
def refresh_posts(posts):
    """Re-fetch each post from the API and merge the fresh data.

    Posts the API reports as not found are deleted from the database
    session.  The author of the first post is used to obtain the API
    client for the whole batch.

    :param posts: sequence of posts to refresh; may be empty.
    :returns: list of merged, touched posts (deleted ones are omitted).
        An empty input yields an empty list.
    :raises TemporaryError: on any other Mastodon API, network or rate
        limit error.
    """
    # Nothing to refresh; avoids IndexError on posts[0] below.
    if not posts:
        return []

    acc = posts[0].author
    api = get_api_for_acc(acc)
    new_posts = []

    with db.session.no_autoflush:
        for post in posts:
            print('Refreshing {}'.format(post))
            try:
                status = api.status(post.mastodon_id)
                new_post = db.session.merge(
                    post_from_api_object(status, post.mastodon_instance))
                new_post.touch()
                new_posts.append(new_post)
            except MastodonNotFoundError:
                # The status no longer exists remotely, so drop it locally.
                db.session.delete(post)
            except (MastodonAPIError,
                    MastodonNetworkError,
                    MastodonRatelimitError) as e:
                raise TemporaryError(e)

    return new_posts
2017-08-29 14:46:32 +02:00
2017-08-19 13:11:16 +02:00
def delete(post):
    """Delete *post* remotely, then mark it for deletion locally.

    The local deletion is only issued if the API call succeeds; nothing
    is committed here.

    :param post: post exposing ``author`` and ``mastodon_id``.
    :raises TemporaryError: on any Mastodon API, network or rate limit
        error.
    """
    temporary_failures = (
        MastodonAPIError,
        MastodonNetworkError,
        MastodonRatelimitError,
    )
    client = get_api_for_acc(post.author)
    try:
        client.status_delete(post.mastodon_id)
        db.session.delete(post)
    except temporary_failures as e:
        raise TemporaryError(e)
def suggested_instances(limit=5, min_popularity=5, blacklist=tuple()):
    """Return the names of the most popular known instances.

    :param limit: maximum number of names to return.
    :param min_popularity: only instances with a popularity strictly
        greater than this are considered.
    :param blacklist: instance names to exclude.
    :returns: tuple of instance names, ordered by descending popularity
        and then by name.
    """
    query = MastodonInstance.query
    query = query.filter(MastodonInstance.popularity > min_popularity)
    query = query.filter(~MastodonInstance.instance.in_(blacklist))
    query = query.order_by(
        db.desc(MastodonInstance.popularity),
        MastodonInstance.instance,
    )
    rows = query.limit(limit).all()
    return tuple(row.instance for row in rows)