overhaul and abstract errors in service libs

also add support for making an account dormant if there is a permanent
error
This commit is contained in:
codl 2017-09-04 22:04:03 +02:00
parent 5e22a0531f
commit edf7732e67
No known key found for this signature in database
GPG Key ID: 6CD7C8891ED1233A
7 changed files with 244 additions and 152 deletions

6
lib/exceptions.py Normal file
View File

@ -0,0 +1,6 @@
class PermanentError(Exception):
    """Service failure that retrying is not expected to fix.

    Tasks that catch this mark the affected account as dormant rather
    than retrying.
    """
class TemporaryError(Exception):
    """Transient service failure; tasks list it in ``autoretry_for``."""

View File

@ -1,10 +1,12 @@
from mastodon import Mastodon from mastodon import Mastodon
from mastodon.Mastodon import MastodonAPIError from mastodon.Mastodon import MastodonAPIError, MastodonNetworkError,\
MastodonRatelimitError
from model import MastodonApp, Account, OAuthToken, Post from model import MastodonApp, Account, OAuthToken, Post
from requests import head from requests import head
from app import db, sentry from app import db, sentry
from math import inf from math import inf
import iso8601 import iso8601
from lib.exceptions import TemporaryError
def get_or_create_app(instance_url, callback, website): def get_or_create_app(instance_url, callback, website):
@ -77,19 +79,24 @@ def get_api_for_acc(account):
ratelimit_method='throw', ratelimit_method='throw',
) )
# api.verify_credentials() try:
# doesnt error even if the token is revoked lol # api.verify_credentials()
# https://github.com/tootsuite/mastodon/issues/4637 # doesnt error even if the token is revoked lol
# so we have to do this: # https://github.com/tootsuite/mastodon/issues/4637
tl = api.timeline() # so we have to do this:
if 'error' in tl: tl = api.timeline()
if sentry: if 'error' in tl:
sentry.captureMessage( if sentry:
'Mastodon auth revoked or incorrect', extra=locals()) sentry.captureMessage('Mastodon auth revoked or incorrect',
db.session.delete(token) extra=locals())
db.session.commit() db.session.delete(token)
continue db.session.commit()
return api continue
return api
except (MastodonAPIError,
MastodonNetworkError,
MastodonRatelimitError) as e:
raise TemporaryError(e)
def fetch_acc(acc, cursor=None): def fetch_acc(acc, cursor=None):
@ -98,37 +105,42 @@ def fetch_acc(acc, cursor=None):
print('no access, aborting') print('no access, aborting')
return None return None
newacc = account_from_api_object( try:
api.account_verify_credentials(), acc.mastodon_instance) newacc = account_from_api_object(
acc = db.session.merge(newacc) api.account_verify_credentials(), acc.mastodon_instance)
acc = db.session.merge(newacc)
kwargs = dict(limit=40) kwargs = dict(limit=40)
if cursor: if cursor:
kwargs.update(cursor) kwargs.update(cursor)
if 'max_id' not in kwargs: if 'max_id' not in kwargs:
most_recent_post = ( most_recent_post = (
Post.query.with_parent(acc) Post.query.with_parent(acc)
.order_by(db.desc(Post.created_at)).first()) .order_by(db.desc(Post.created_at)).first())
if most_recent_post: if most_recent_post:
kwargs['since_id'] = most_recent_post.mastodon_id kwargs['since_id'] = most_recent_post.mastodon_id
statuses = api.account_statuses(acc.mastodon_id, **kwargs) statuses = api.account_statuses(acc.mastodon_id, **kwargs)
if statuses: if statuses:
kwargs['max_id'] = +inf kwargs['max_id'] = +inf
for status in statuses: for status in statuses:
post = post_from_api_object(status, acc.mastodon_instance) post = post_from_api_object(status, acc.mastodon_instance)
db.session.merge(post) db.session.merge(post)
kwargs['max_id'] = min(kwargs['max_id'], status['id']) kwargs['max_id'] = min(kwargs['max_id'], status['id'])
else: else:
kwargs = None kwargs = None
db.session.commit() db.session.commit()
return kwargs return kwargs
except (MastodonAPIError,
MastodonNetworkError,
MastodonRatelimitError) as e:
raise TemporaryError(e)
def post_from_api_object(obj, instance): def post_from_api_object(obj, instance):
@ -168,16 +180,23 @@ def refresh_posts(posts):
new_post = db.session.merge( new_post = db.session.merge(
post_from_api_object(status, post.mastodon_instance)) post_from_api_object(status, post.mastodon_instance))
new_posts.append(new_post) new_posts.append(new_post)
except MastodonAPIError as e: except (MastodonAPIError,
MastodonNetworkError,
MastodonRatelimitError) as e:
if str(e) == 'Endpoint not found.': if str(e) == 'Endpoint not found.':
db.session.delete(post) db.session.delete(post)
else: else:
raise e raise TemporaryError(e)
return new_posts return new_posts
def delete(post):
    """Delete ``post`` on its Mastodon instance and drop the local row.

    The database session is not committed here. Any Mastodon API,
    network or rate-limit error raised while deleting is re-raised as
    ``TemporaryError``.
    """
    client = get_api_for_acc(post.author)
    retryable = (
        MastodonAPIError,
        MastodonNetworkError,
        MastodonRatelimitError,
    )
    try:
        client.status_delete(post.mastodon_id)
        db.session.delete(post)
    except retryable as err:
        raise TemporaryError(err)

View File

@ -1,4 +1,4 @@
from twitter import Twitter, OAuth, TwitterHTTPError from twitter import Twitter, OAuth, TwitterError
from werkzeug.urls import url_decode from werkzeug.urls import url_decode
from model import OAuthToken, Account, Post, TwitterArchive from model import OAuthToken, Account, Post, TwitterArchive
from app import db, app, sentry from app import db, app, sentry
@ -7,6 +7,8 @@ from datetime import datetime
import locale import locale
from zipfile import ZipFile from zipfile import ZipFile
from io import BytesIO from io import BytesIO
from lib.exceptions import PermanentError, TemporaryError
from urllib.error import URLError
def get_login_url(callback='oob', consumer_key=None, consumer_secret=None): def get_login_url(callback='oob', consumer_key=None, consumer_secret=None):
@ -76,7 +78,7 @@ def get_twitter_for_acc(account):
try: try:
t.account.verify_credentials() t.account.verify_credentials()
return t return t
except TwitterHTTPError as e: except TwitterError as e:
if e.e.code == 401: if e.e.code == 401:
# token revoked # token revoked
@ -86,8 +88,7 @@ def get_twitter_for_acc(account):
db.session.delete(token) db.session.delete(token)
db.session.commit() db.session.commit()
else: else:
# temporary error, re-raise raise TemporaryError(e)
raise e
return None return None
@ -140,7 +141,10 @@ def fetch_acc(account, cursor):
if most_recent_post: if most_recent_post:
kwargs['since_id'] = most_recent_post.twitter_id kwargs['since_id'] = most_recent_post.twitter_id
tweets = t.statuses.user_timeline(**kwargs) try:
tweets = t.statuses.user_timeline(**kwargs)
except (TwitterError, URLError) as e:
handle_error(e)
print("processing {} tweets for {acc}".format(len(tweets), acc=account)) print("processing {} tweets for {acc}".format(len(tweets), acc=account))
@ -166,10 +170,13 @@ def refresh_posts(posts):
t = get_twitter_for_acc(posts[0].author) t = get_twitter_for_acc(posts[0].author)
if not t: if not t:
raise Exception('shit idk. twitter says no') return
tweets = t.statuses.lookup( try:
_id=",".join((post.twitter_id for post in posts)), tweets = t.statuses.lookup(
trim_user=True, tweet_mode='extended') _id=",".join((post.twitter_id for post in posts)),
trim_user=True, tweet_mode='extended')
except (URLError, TwitterError) as e:
handle_error(e)
refreshed_posts = list() refreshed_posts = list()
for post in posts: for post in posts:
tweet = next( tweet = next(
@ -201,3 +208,13 @@ def chunk_twitter_archive(archive_id):
files.sort() files.sort()
return files return files
def handle_error(e):
    """Re-raise a Twitter or network failure as a classified error.

    This function always raises and never returns.

    :param e: the exception caught from a Twitter API call.
    :raises PermanentError: when ``e`` is a ``TwitterError`` whose
        ``code`` is 326 (account locked).
    :raises TemporaryError: for every other error.
    """
    # Not every TwitterError exposes a ``code`` attribute; read it
    # defensively so a missing attribute cannot surface as an unrelated
    # AttributeError that hides the original failure.
    if isinstance(e, TwitterError) and getattr(e, 'code', None) == 326:
        # account locked lol rip
        # although this is a temporary error in twitter terms
        # it's best not to waste api calls on locked accounts
        raise PermanentError(e) from e
    raise TemporaryError(e) from e

View File

@ -0,0 +1,24 @@
"""add dormant to account
Revision ID: c1f7444d0f75
Revises: 3a0138499994
Create Date: 2017-09-04 21:57:23.648580
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'c1f7444d0f75'
down_revision = '3a0138499994'
branch_labels = None
depends_on = None
def upgrade():
    """Add the non-nullable boolean ``dormant`` column to ``accounts``.

    Existing rows get the server-side default ``FALSE``.
    """
    dormant = sa.Column(
        'dormant', sa.Boolean(), server_default='FALSE', nullable=False)
    op.add_column('accounts', dormant)
def downgrade():
    """Drop the ``dormant`` column from ``accounts``."""
    table, column = 'accounts', 'dormant'
    op.drop_column(table, column)

View File

@ -101,6 +101,7 @@ class Account(TimestampMixin, RemoteIDMixin):
next_delete = db.Column(db.DateTime(timezone=True), index=True) next_delete = db.Column(db.DateTime(timezone=True), index=True)
reason = db.Column(db.String) reason = db.Column(db.String)
dormant = db.Column(db.Boolean, server_default='FALSE', nullable=False)
def touch_fetch(self): def touch_fetch(self):
self.last_fetch = db.func.now() self.last_fetch = db.func.now()
@ -137,11 +138,12 @@ class Account(TimestampMixin, RemoteIDMixin):
return value return value
@db.validates('policy_enabled')
def on_policy_enable(self, key, enable):
    """Validator for ``policy_enabled``; returns ``enable`` unchanged.

    When the policy goes from disabled to enabled, schedule the next
    delete one ``policy_delete_every`` interval from now (UTC), clear
    ``reason`` and clear the ``dormant`` flag.
    """
    turning_on = not self.policy_enabled and enable
    if turning_on:
        now = datetime.now(timezone.utc)
        self.next_delete = now + self.policy_delete_every
        self.reason = None
        self.dormant = False
    return enable
# backref: tokens # backref: tokens

View File

@ -1,4 +1,4 @@
from flask import render_template, url_for, redirect, request, g, Response,\ from flask import render_template, url_for, redirect, request, g,\
jsonify, make_response jsonify, make_response
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
import lib.twitter import lib.twitter
@ -94,6 +94,19 @@ def twitter_login_step1():
url_for('index', twitter_login_error='', _anchor='log_in')) url_for('index', twitter_login_error='', _anchor='log_in'))
def login(account_id):
    """Create and return a new ``Session`` for ``account_id``.

    The session is committed first, then the account's ``dormant`` flag
    is cleared and committed, and finally a ``fetch_acc`` task for the
    account is queued with the ``high`` routing key.
    """
    new_session = Session(account_id=account_id)
    db.session.add(new_session)
    db.session.commit()

    # Logging in reactivates an account that was put to sleep.
    new_session.account.dormant = False
    db.session.commit()

    fetch_signature = tasks.fetch_acc.s(account_id)
    fetch_signature.apply_async(routing_key='high')

    return new_session
@app.route('/login/twitter/callback') @app.route('/login/twitter/callback')
@limiter.limit('3/minute') @limiter.limit('3/minute')
def twitter_login_step2(): def twitter_login_step2():
@ -104,15 +117,10 @@ def twitter_login_step2():
oauth_token, oauth_verifier, oauth_token, oauth_verifier,
**app.config.get_namespace("TWITTER_")) **app.config.get_namespace("TWITTER_"))
session = Session(account_id=token.account_id) session = login(token.account_id)
db.session.add(session)
db.session.commit()
tasks.fetch_acc.s(token.account_id).apply_async(routing_key='high') g.viewer = session
return redirect(url_for('index'))
resp = Response(status=302, headers={"location": url_for('index')})
set_session_cookie(session, resp, app.config.get('HTTPS'))
return resp
except (TwitterError, URLError): except (TwitterError, URLError):
if sentry: if sentry:
sentry.captureException() sentry.captureException()
@ -305,8 +313,7 @@ def mastodon_login_step2(instance_url):
token = lib.mastodon.receive_code(code, app, callback) token = lib.mastodon.receive_code(code, app, callback)
account = token.account account = token.account
sess = Session(account=account) session = login(account.id)
db.session.add(sess)
instance = MastodonInstance(instance=instance_url) instance = MastodonInstance(instance=instance_url)
instance = db.session.merge(instance) instance = db.session.merge(instance)
@ -314,9 +321,7 @@ def mastodon_login_step2(instance_url):
db.session.commit() db.session.commit()
tasks.fetch_acc.s(account.id).apply_async(routing_key='high') g.viewer = session
g.viewer = sess
return redirect(url_for('index')) return redirect(url_for('index'))

193
tasks.py
View File

@ -5,9 +5,6 @@ from model import Session, Account, TwitterArchive, Post, OAuthToken,\
MastodonInstance MastodonInstance
import lib.twitter import lib.twitter
import lib.mastodon import lib.mastodon
from mastodon.Mastodon import MastodonRatelimitError
from twitter import TwitterError
from urllib.error import URLError
from datetime import timedelta from datetime import timedelta
from zipfile import ZipFile from zipfile import ZipFile
from io import BytesIO, TextIOWrapper from io import BytesIO, TextIOWrapper
@ -15,6 +12,7 @@ import json
from kombu import Queue from kombu import Queue
import random import random
import version import version
from lib.exceptions import PermanentError, TemporaryError
app = Celery('tasks', broker=flaskapp.config['CELERY_BROKER'], app = Celery('tasks', broker=flaskapp.config['CELERY_BROKER'],
@ -51,7 +49,19 @@ def noop(*args, **kwargs):
pass pass
def make_dormant(acc):
    """Mark ``acc`` as dormant, store the explanation, and commit.

    The stored ``reason`` is written from the point of view of the user
    who later logs back in, since logging in clears the dormant flag.

    :param acc: the account to put to sleep; its ``service`` name is
        interpolated into the message.
    """
    # The template uses the named field {service}, so the value has to
    # be passed by keyword; a positional argument raises KeyError.
    acc.reason = '''
Your account was temporarily disabled because your {service}
account was suspended or otherwise inaccessible. By logging into
it, you have reactivated your account, but be aware that some posts
may be missing from Forget's database, and it may take some time to
get back in sync.
'''.format(service=acc.service)
    acc.dormant = True
    db.session.commit()
@app.task(autoretry_for=(TemporaryError,))
def fetch_acc(id_, cursor=None): def fetch_acc(id_, cursor=None):
acc = Account.query.get(id_) acc = Account.query.get(id_)
print(f'fetching {acc}') print(f'fetching {acc}')
@ -64,26 +74,16 @@ def fetch_acc(id_, cursor=None):
cursor = action(acc, cursor) cursor = action(acc, cursor)
if cursor: if cursor:
fetch_acc.si(id_, cursor).apply_async() fetch_acc.si(id_, cursor).apply_async()
except PermanentError as e:
db.session.rollback()
make_dormant(acc)
finally: finally:
db.session.rollback() db.session.rollback()
acc.touch_fetch() acc.touch_fetch()
db.session.commit() db.session.commit()
@app.task @app.task()
def queue_fetch_for_most_stale_accounts(
min_staleness=timedelta(minutes=2), limit=20):
accs = Account.query\
.join(Account.tokens).group_by(Account)\
.filter(Account.last_fetch < db.func.now() - min_staleness)\
.order_by(db.asc(Account.last_fetch))\
.limit(limit)
for acc in accs:
fetch_acc.s(acc.id).delay()
db.session.commit()
@app.task(autoretry_for=(TwitterError, URLError))
def import_twitter_archive_month(archive_id, month_path): def import_twitter_archive_month(archive_id, month_path):
ta = TwitterArchive.query.get(archive_id) ta = TwitterArchive.query.get(archive_id)
@ -117,62 +117,7 @@ def import_twitter_archive_month(archive_id, month_path):
raise e raise e
@app.task @app.task(autoretry_for=(TemporaryError,))
def periodic_cleanup():
# delete sessions after 48 hours
(Session.query
.filter(Session.updated_at < (db.func.now() - timedelta(hours=48)))
.delete(synchronize_session=False))
# delete twitter archives after 3 days
(TwitterArchive.query
.filter(TwitterArchive.updated_at < (db.func.now() - timedelta(days=3)))
.delete(synchronize_session=False))
# delete anonymous oauth tokens after 1 day
(OAuthToken.query
.filter(OAuthToken.updated_at < (db.func.now() - timedelta(days=1)))
.filter(OAuthToken.account_id == None) # noqa: E711
.delete(synchronize_session=False))
# disable and log out users with no tokens
unreachable = (
Account.query
.outerjoin(Account.tokens)
.group_by(Account).having(db.func.count(OAuthToken.token) == 0)
.filter(Account.policy_enabled == True)) # noqa: E712
for account in unreachable:
account.force_log_out()
account.policy_enabled = False
account.reason = """
Your account was disabled because Forget no longer had access to
your {service} account. Perhaps you had revoked it? By logging in,
you have restored access and you can now re-enable Forget if you wish.
""".format(service=account.service.capitalize())
# normalise mastodon instance popularity scores
biggest_instance = (
MastodonInstance.query
.order_by(db.desc(MastodonInstance.popularity)).first())
if biggest_instance.popularity > 40:
MastodonInstance.query.update({
MastodonInstance.popularity:
MastodonInstance.popularity * 40 / biggest_instance.popularity
})
db.session.commit()
@app.task
def queue_deletes():
eligible_accounts = (
Account.query.filter(Account.policy_enabled == True) # noqa: E712
.filter(Account.next_delete < db.func.now()))
for account in eligible_accounts:
delete_from_account.s(account.id).apply_async()
@app.task(autoretry_for=(TwitterError, URLError, MastodonRatelimitError))
def delete_from_account(account_id): def delete_from_account(account_id):
account = Account.query.get(account_id) account = Account.query.get(account_id)
latest_n_posts = (Post.query.with_parent(account) latest_n_posts = (Post.query.with_parent(account)
@ -231,34 +176,108 @@ def refresh_posts(posts):
return lib.mastodon.refresh_posts(posts) return lib.mastodon.refresh_posts(posts)
@app.task(autoretry_for=(TemporaryError,))
def refresh_account(account_id):
    """Refresh the least recently updated posts of one account.

    Up to 100 posts are refreshed per run, or 5 for Mastodon accounts.
    On success the account's refresh timestamp is touched and the
    session committed. A ``PermanentError`` rolls the session back and
    makes the account dormant; ``TemporaryError`` triggers a retry.
    """
    account = Account.query.get(account_id)
    try:
        batch_size = 5 if account.service == 'mastodon' else 100
        oldest_first = (
            Post.query.with_parent(account)
            .order_by(db.asc(Post.updated_at))
        )
        posts = oldest_first.limit(batch_size).all()

        posts = refresh_posts(posts)
        account.touch_refresh()
        db.session.commit()
    except PermanentError:
        db.session.rollback()
        make_dormant(account)
@app.task
def periodic_cleanup():
    """Run housekeeping and commit it all in one transaction.

    Deletes stale sessions, Twitter archives and anonymous OAuth tokens,
    disables and logs out accounts that have no tokens left, and rescales
    Mastodon instance popularity so the largest score is at most 40.
    """
    # delete sessions after 48 hours
    (Session.query
     .filter(Session.updated_at < (db.func.now() - timedelta(hours=48)))
     .delete(synchronize_session=False))

    # delete twitter archives after 3 days
    (TwitterArchive.query
     .filter(TwitterArchive.updated_at < (db.func.now() - timedelta(days=3)))
     .delete(synchronize_session=False))

    # delete anonymous oauth tokens after 1 day
    (OAuthToken.query
     .filter(OAuthToken.updated_at < (db.func.now() - timedelta(days=1)))
     .filter(OAuthToken.account_id == None)  # noqa: E711
     .delete(synchronize_session=False))

    # disable and log out users with no tokens
    unreachable = (
        Account.query
        .outerjoin(Account.tokens)
        .group_by(Account).having(db.func.count(OAuthToken.token) == 0)
        .filter(Account.policy_enabled == True))  # noqa: E712
    for account in unreachable:
        account.force_log_out()
        account.policy_enabled = False
        account.reason = """
Your account was disabled because Forget no longer had access to
your {service} account. Perhaps you had revoked it? By logging in,
you have restored access and you can now re-enable Forget if you wish.
""".format(service=account.service.capitalize())

    # normalise mastodon instance popularity scores
    biggest_instance = (
        MastodonInstance.query
        .order_by(db.desc(MastodonInstance.popularity)).first())
    # first() yields None when no instance rows exist; skip the rescale
    # then, so the cleanup above still gets committed.
    if biggest_instance is not None and biggest_instance.popularity > 40:
        MastodonInstance.query.update({
            MastodonInstance.popularity:
                MastodonInstance.popularity * 40 / biggest_instance.popularity
        })
    db.session.commit()
@app.task
def queue_fetch_for_most_stale_accounts(
        min_staleness=timedelta(minutes=2), limit=20):
    """Queue a ``fetch_acc`` task for the accounts fetched longest ago.

    Only non-dormant accounts that have at least one token and whose
    last fetch is older than ``min_staleness`` are considered; at most
    ``limit`` of them are queued, stalest first.
    """
    cutoff = db.func.now() - min_staleness
    stale_accounts = (
        Account.query
        .join(Account.tokens)
        .group_by(Account)
        .filter(Account.last_fetch < cutoff)
        .filter(~Account.dormant)
        .order_by(db.asc(Account.last_fetch))
        .limit(limit)
    )
    for account in stale_accounts:
        fetch_acc.s(account.id).delay()
    db.session.commit()
@app.task
def queue_deletes():
    """Queue ``delete_from_account`` for every account that is due.

    An account is due when its policy is enabled, its ``next_delete``
    time has passed, and it is not dormant.
    """
    due = Account.query.filter(Account.policy_enabled == True)  # noqa: E712
    due = due.filter(Account.next_delete < db.func.now())
    due = due.filter(~Account.dormant)
    for acc in due:
        delete_from_account.s(acc.id).apply_async()
@app.task
def refresh_account_with_oldest_post():
    """Refresh the account that owns the least recently updated post.

    Only posts whose author has a token and is not dormant are
    considered. Does nothing when no such post exists.
    """
    post = (Post.query.outerjoin(Post.author).join(Account.tokens)
            .filter(~Account.dormant)
            .group_by(Post).order_by(db.asc(Post.updated_at)).first())
    # first() returns None when nothing matches, e.g. when every account
    # is dormant or no posts have been fetched yet.
    if post is None:
        return
    refresh_account(post.author_id)
@app.task
def refresh_account_with_longest_time_since_refresh():
    """Refresh the non-dormant account with the oldest ``last_refresh``.

    Only accounts that have a token are considered. Does nothing when no
    such account exists.
    """
    acc = (Account.query.join(Account.tokens).group_by(Account)
           .filter(~Account.dormant)
           .order_by(db.asc(Account.last_refresh)).first())
    # first() returns None when nothing matches, e.g. when every account
    # is dormant.
    if acc is None:
        return
    refresh_account(acc.id)