forget-cancellare-vecchi-toot/libforget/mastodon.py

221 lines
6.9 KiB
Python
Raw Normal View History

from mastodon import Mastodon
from mastodon.Mastodon import MastodonAPIError,\
MastodonNetworkError,\
MastodonRatelimitError,\
MastodonUnauthorizedError
from model import MastodonApp, Account, OAuthToken, Post, MastodonInstance
from requests import head
from app import db, sentry
2017-09-20 23:02:36 +02:00
from libforget.exceptions import TemporaryError
from functools import lru_cache
2017-08-29 14:46:32 +02:00
def get_or_create_app(instance_url, callback, website):
instance_url = instance_url
app = MastodonApp.query.get(instance_url)
try:
head('https://{}'.format(instance_url)).raise_for_status()
proto = 'https'
except Exception:
head('http://{}'.format(instance_url)).raise_for_status()
proto = 'http'
if not app:
2017-08-29 14:46:32 +02:00
client_id, client_secret = Mastodon.create_app(
'forget',
scopes=('read', 'write'),
api_base_url='{}://{}'.format(proto, instance_url),
redirect_uris=callback,
website=website,
)
app = MastodonApp()
app.instance = instance_url
app.client_id = client_id
app.client_secret = client_secret
app.protocol = proto
return app
2017-08-29 14:46:32 +02:00
def anonymous_api(app):
2017-08-29 14:46:32 +02:00
return Mastodon(
app.client_id,
client_secret=app.client_secret,
api_base_url='{}://{}'.format(app.protocol, app.instance),
)
2017-08-29 14:46:32 +02:00
def login_url(app, callback):
return anonymous_api(app).auth_request_url(
redirect_uris=callback,
scopes=('read', 'write',)
)
2017-08-29 14:46:32 +02:00
def receive_code(code, app, callback):
api = anonymous_api(app)
access_token = api.log_in(
code=code,
scopes=('read', 'write'),
redirect_uri=callback,
)
remote_acc = api.account_verify_credentials()
2017-08-19 13:11:16 +02:00
acc = account_from_api_object(remote_acc, app.instance)
acc = db.session.merge(acc)
2017-08-29 14:46:32 +02:00
token = OAuthToken(token=access_token)
2017-08-19 13:11:16 +02:00
token = db.session.merge(token)
token.account = acc
return token
@lru_cache()
def get_api_for_acc(account):
app = MastodonApp.query.get(account.mastodon_instance)
for token in account.tokens:
2017-08-29 14:46:32 +02:00
api = Mastodon(
app.client_id,
client_secret=app.client_secret,
api_base_url='{}://{}'.format(app.protocol, app.instance),
access_token=token.token,
ratelimit_method='throw',
)
try:
# api.verify_credentials()
# doesnt error even if the token is revoked lol
# https://github.com/tootsuite/mastodon/issues/4637
# so we have to do this:
2017-09-24 23:49:08 +02:00
api.timeline()
return api
except MastodonUnauthorizedError as e:
if sentry:
sentry.captureMessage(
'Mastodon auth revoked or incorrect',
extra=locals())
db.session.delete(token)
db.session.commit()
continue
except MastodonAPIError as e:
raise TemporaryError(e)
except (MastodonNetworkError,
MastodonRatelimitError) as e:
raise TemporaryError(e)
2017-08-19 13:11:16 +02:00
def fetch_acc(acc, cursor=None):
api = get_api_for_acc(acc)
if not api:
print('no access, aborting')
return None
try:
newacc = account_from_api_object(
api.account_verify_credentials(), acc.mastodon_instance)
acc = db.session.merge(newacc)
2017-08-19 13:11:16 +02:00
kwargs = dict(limit=40)
if cursor:
kwargs.update(cursor)
2017-08-19 13:11:16 +02:00
if 'max_id' not in kwargs:
most_recent_post = (
Post.query.with_parent(acc)
.order_by(db.desc(Post.created_at)).first())
if most_recent_post:
kwargs['since_id'] = most_recent_post.mastodon_id
2017-08-19 13:11:16 +02:00
statuses = api.account_statuses(acc.mastodon_id, **kwargs)
2017-08-19 13:11:16 +02:00
if statuses:
for status in statuses:
post = post_from_api_object(status, acc.mastodon_instance)
db.session.merge(post)
2017-09-21 07:26:01 +02:00
if 'max_id' not in kwargs:
kwargs['max_id'] = int(status['id'])
2017-09-21 07:26:01 +02:00
kwargs['max_id'] = min(int(kwargs['max_id']), int(status['id']))
2017-08-19 13:11:16 +02:00
else:
kwargs = None
2017-08-19 13:11:16 +02:00
db.session.commit()
2017-08-19 13:11:16 +02:00
return kwargs
except (MastodonAPIError,
MastodonNetworkError,
MastodonRatelimitError) as e:
raise TemporaryError(e)
2017-08-19 13:11:16 +02:00
2017-08-29 14:46:32 +02:00
2017-08-19 13:11:16 +02:00
def post_from_api_object(obj, instance):
return Post(
2017-08-29 14:46:32 +02:00
mastodon_instance=instance,
mastodon_id=obj['id'],
2017-08-29 14:46:32 +02:00
favourite=obj['favourited'],
has_media=('media_attachments' in obj
and bool(obj['media_attachments'])),
created_at=obj['created_at'],
2017-08-29 14:46:32 +02:00
author_id=account_from_api_object(obj['account'], instance).id,
direct=obj['visibility'] == 'direct',
favourites=obj['favourites_count'],
reblogs=obj['reblogs_count'],
2017-12-27 21:23:13 +01:00
is_reblog=obj['reblog'] is not None,
2017-08-19 13:11:16 +02:00
)
2017-08-29 14:46:32 +02:00
2017-08-19 13:11:16 +02:00
def account_from_api_object(obj, instance):
return Account(
2017-08-29 14:46:32 +02:00
mastodon_instance=instance,
mastodon_id=obj['id'],
2017-08-29 16:57:30 +02:00
screen_name='{}@{}'.format(obj['username'], instance),
2017-08-29 14:46:32 +02:00
display_name=obj['display_name'],
avatar_url=obj['avatar'],
reported_post_count=obj['statuses_count'],
2017-08-19 13:11:16 +02:00
)
2017-08-29 14:46:32 +02:00
2017-08-19 13:11:16 +02:00
def refresh_posts(posts):
acc = posts[0].author
api = get_api_for_acc(acc)
if not api:
raise Exception('no access')
new_posts = list()
for post in posts:
try:
status = api.status(post.mastodon_id)
2017-08-29 14:46:32 +02:00
new_post = db.session.merge(
post_from_api_object(status, post.mastodon_instance))
2017-08-19 13:11:16 +02:00
new_posts.append(new_post)
except (MastodonAPIError,
MastodonNetworkError,
MastodonRatelimitError) as e:
2017-09-17 15:12:01 +02:00
if any([
err in str(e)
2017-09-19 01:43:50 +02:00
for err in ('Endpoint not found', 'Record not found')]):
2017-08-19 13:11:16 +02:00
db.session.delete(post)
else:
raise TemporaryError(e)
2017-08-19 13:11:16 +02:00
return new_posts
2017-08-29 14:46:32 +02:00
2017-08-19 13:11:16 +02:00
def delete(post):
api = get_api_for_acc(post.author)
try:
api.status_delete(post.mastodon_id)
db.session.delete(post)
except (MastodonAPIError,
MastodonNetworkError,
MastodonRatelimitError) as e:
raise TemporaryError(e)
def suggested_instances(limit=5, min_popularity=5, blacklist=tuple()):
return tuple((ins.instance for ins in (
MastodonInstance.query
.filter(MastodonInstance.popularity > min_popularity)
.filter(~MastodonInstance.instance.in_(blacklist))
.order_by(db.desc(MastodonInstance.popularity),
MastodonInstance.instance)
.limit(limit).all())))