diff --git a/lib/exceptions.py b/lib/exceptions.py new file mode 100644 index 0000000..1ff4f2b --- /dev/null +++ b/lib/exceptions.py @@ -0,0 +1,6 @@ +class PermanentError(Exception): + pass + + +class TemporaryError(Exception): + pass diff --git a/lib/mastodon.py b/lib/mastodon.py index 378be9f..9a0af19 100644 --- a/lib/mastodon.py +++ b/lib/mastodon.py @@ -1,10 +1,12 @@ from mastodon import Mastodon -from mastodon.Mastodon import MastodonAPIError +from mastodon.Mastodon import MastodonAPIError, MastodonNetworkError,\ + MastodonRatelimitError from model import MastodonApp, Account, OAuthToken, Post from requests import head from app import db, sentry from math import inf import iso8601 +from lib.exceptions import TemporaryError def get_or_create_app(instance_url, callback, website): @@ -77,19 +79,24 @@ def get_api_for_acc(account): ratelimit_method='throw', ) - # api.verify_credentials() - # doesnt error even if the token is revoked lol - # https://github.com/tootsuite/mastodon/issues/4637 - # so we have to do this: - tl = api.timeline() - if 'error' in tl: - if sentry: - sentry.captureMessage( - 'Mastodon auth revoked or incorrect', extra=locals()) - db.session.delete(token) - db.session.commit() - continue - return api + try: + # api.verify_credentials() + # doesnt error even if the token is revoked lol + # https://github.com/tootsuite/mastodon/issues/4637 + # so we have to do this: + tl = api.timeline() + if 'error' in tl: + if sentry: + sentry.captureMessage('Mastodon auth revoked or incorrect', + extra=locals()) + db.session.delete(token) + db.session.commit() + continue + return api + except (MastodonAPIError, + MastodonNetworkError, + MastodonRatelimitError) as e: + raise TemporaryError(e) def fetch_acc(acc, cursor=None): @@ -98,37 +105,42 @@ def fetch_acc(acc, cursor=None): print('no access, aborting') return None - newacc = account_from_api_object( - api.account_verify_credentials(), acc.mastodon_instance) - acc = db.session.merge(newacc) + try: + newacc = account_from_api_object( + api.account_verify_credentials(), acc.mastodon_instance) + acc = db.session.merge(newacc) - kwargs = dict(limit=40) - if cursor: - kwargs.update(cursor) + kwargs = dict(limit=40) + if cursor: + kwargs.update(cursor) - if 'max_id' not in kwargs: - most_recent_post = ( - Post.query.with_parent(acc) - .order_by(db.desc(Post.created_at)).first()) - if most_recent_post: - kwargs['since_id'] = most_recent_post.mastodon_id + if 'max_id' not in kwargs: + most_recent_post = ( + Post.query.with_parent(acc) + .order_by(db.desc(Post.created_at)).first()) + if most_recent_post: + kwargs['since_id'] = most_recent_post.mastodon_id - statuses = api.account_statuses(acc.mastodon_id, **kwargs) + statuses = api.account_statuses(acc.mastodon_id, **kwargs) - if statuses: - kwargs['max_id'] = +inf + if statuses: + kwargs['max_id'] = +inf - for status in statuses: - post = post_from_api_object(status, acc.mastodon_instance) - db.session.merge(post) - kwargs['max_id'] = min(kwargs['max_id'], status['id']) + for status in statuses: + post = post_from_api_object(status, acc.mastodon_instance) + db.session.merge(post) + kwargs['max_id'] = min(kwargs['max_id'], status['id']) - else: - kwargs = None + else: + kwargs = None - db.session.commit() + db.session.commit() - return kwargs + return kwargs + except (MastodonAPIError, + MastodonNetworkError, + MastodonRatelimitError) as e: + raise TemporaryError(e) def post_from_api_object(obj, instance): @@ -168,16 +180,23 @@ def refresh_posts(posts): new_post = db.session.merge( post_from_api_object(status, post.mastodon_instance)) new_posts.append(new_post) - except MastodonAPIError as e: + except (MastodonAPIError, + MastodonNetworkError, + MastodonRatelimitError) as e: if str(e) == 'Endpoint not found.': db.session.delete(post) else: - raise e + raise TemporaryError(e) return new_posts def delete(post): api = get_api_for_acc(post.author) - api.status_delete(post.mastodon_id) - db.session.delete(post) + try: + api.status_delete(post.mastodon_id) + db.session.delete(post) + except (MastodonAPIError, + MastodonNetworkError, + MastodonRatelimitError) as e: + raise TemporaryError(e) diff --git a/lib/twitter.py b/lib/twitter.py index 4819f34..c062130 100644 --- a/lib/twitter.py +++ b/lib/twitter.py @@ -1,4 +1,4 @@ -from twitter import Twitter, OAuth, TwitterHTTPError +from twitter import Twitter, OAuth, TwitterError from werkzeug.urls import url_decode from model import OAuthToken, Account, Post, TwitterArchive from app import db, app, sentry @@ -7,6 +7,8 @@ from datetime import datetime import locale from zipfile import ZipFile from io import BytesIO +from lib.exceptions import PermanentError, TemporaryError +from urllib.error import URLError def get_login_url(callback='oob', consumer_key=None, consumer_secret=None): @@ -76,7 +78,7 @@ def get_twitter_for_acc(account): try: t.account.verify_credentials() return t - except TwitterHTTPError as e: + except TwitterError as e: if e.e.code == 401: # token revoked @@ -86,8 +88,7 @@ def get_twitter_for_acc(account): db.session.delete(token) db.session.commit() else: - # temporary error, re-raise - raise e + raise TemporaryError(e) return None @@ -140,7 +141,10 @@ def fetch_acc(account, cursor): if most_recent_post: kwargs['since_id'] = most_recent_post.twitter_id - tweets = t.statuses.user_timeline(**kwargs) + try: + tweets = t.statuses.user_timeline(**kwargs) + except (TwitterError, URLError) as e: + handle_error(e) print("processing {} tweets for {acc}".format(len(tweets), acc=account)) @@ -166,10 +170,13 @@ def refresh_posts(posts): t = get_twitter_for_acc(posts[0].author) if not t: - raise Exception('shit idk. twitter says no') - tweets = t.statuses.lookup( - _id=",".join((post.twitter_id for post in posts)), - trim_user=True, tweet_mode='extended') + return + try: + tweets = t.statuses.lookup( + _id=",".join((post.twitter_id for post in posts)), + trim_user=True, tweet_mode='extended') + except (URLError, TwitterError) as e: + handle_error(e) refreshed_posts = list() for post in posts: tweet = next( @@ -201,3 +208,13 @@ def chunk_twitter_archive(archive_id): files.sort() return files + + +def handle_error(e): + if isinstance(e, TwitterError): + if e.code and e.code == 326: + # account locked lol rip + # although this is a temporary error in twitter terms + # it's best not to waste api calls on locked accounts + raise PermanentError(e) + raise TemporaryError(e) diff --git a/migrations/versions/c1f7444d0f75_add_dormant_to_account.py b/migrations/versions/c1f7444d0f75_add_dormant_to_account.py new file mode 100644 index 0000000..a699129 --- /dev/null +++ b/migrations/versions/c1f7444d0f75_add_dormant_to_account.py @@ -0,0 +1,24 @@ +"""add dormant to account + +Revision ID: c1f7444d0f75 +Revises: 3a0138499994 +Create Date: 2017-09-04 21:57:23.648580 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = 'c1f7444d0f75' +down_revision = '3a0138499994' +branch_labels = None +depends_on = None + + +def upgrade(): + op.add_column('accounts', sa.Column('dormant', sa.Boolean(), server_default='FALSE', nullable=False)) + + +def downgrade(): + op.drop_column('accounts', 'dormant') diff --git a/model.py b/model.py index 21599c9..c8c1e0a 100644 --- a/model.py +++ b/model.py @@ -101,6 +101,7 @@ class Account(TimestampMixin, RemoteIDMixin): next_delete = db.Column(db.DateTime(timezone=True), index=True) reason = db.Column(db.String) + dormant = db.Column(db.Boolean, server_default='FALSE', nullable=False) def touch_fetch(self): self.last_fetch = db.func.now() @@ -137,11 +138,12 @@ class Account(TimestampMixin, RemoteIDMixin): return value @db.validates('policy_enabled') - def on_enable(self, key, enable): + def on_policy_enable(self, key, enable): if not self.policy_enabled and enable: self.next_delete = ( datetime.now(timezone.utc) + self.policy_delete_every) self.reason = None + self.dormant = False return enable # backref: tokens diff --git a/routes.py b/routes.py index 10e99f9..747589a 100644 --- a/routes.py +++ b/routes.py @@ -1,4 +1,4 @@ -from flask import render_template, url_for, redirect, request, g, Response,\ +from flask import render_template, url_for, redirect, request, g,\ jsonify, make_response from datetime import datetime, timedelta, timezone import lib.twitter @@ -94,6 +94,19 @@ def twitter_login_step1(): url_for('index', twitter_login_error='', _anchor='log_in')) +def login(account_id): + session = Session(account_id=account_id) + db.session.add(session) + db.session.commit() + + session.account.dormant = False + db.session.commit() + + tasks.fetch_acc.s(account_id).apply_async(routing_key='high') + + return session + + @app.route('/login/twitter/callback') @limiter.limit('3/minute') def twitter_login_step2(): @@ -104,15 +117,10 @@ def twitter_login_step2(): oauth_token, oauth_verifier, **app.config.get_namespace("TWITTER_")) - session = Session(account_id=token.account_id) - db.session.add(session) - db.session.commit() + session = login(token.account_id) - tasks.fetch_acc.s(token.account_id).apply_async(routing_key='high') - - resp = Response(status=302, headers={"location": url_for('index')}) - set_session_cookie(session, resp, app.config.get('HTTPS')) - return resp + g.viewer = session + return redirect(url_for('index')) except (TwitterError, URLError): if sentry: sentry.captureException() @@ -305,8 +313,7 @@ def mastodon_login_step2(instance_url): token = lib.mastodon.receive_code(code, app, callback) account = token.account - sess = Session(account=account) - db.session.add(sess) + session = login(account.id) instance = MastodonInstance(instance=instance_url) instance = db.session.merge(instance) @@ -314,9 +321,7 @@ def mastodon_login_step2(instance_url): db.session.commit() - tasks.fetch_acc.s(account.id).apply_async(routing_key='high') - - g.viewer = sess + g.viewer = session return redirect(url_for('index')) diff --git a/tasks.py b/tasks.py index fb9a0c7..d70c77c 100644 --- a/tasks.py +++ b/tasks.py @@ -5,9 +5,6 @@ from model import Session, Account, TwitterArchive, Post, OAuthToken,\ MastodonInstance import lib.twitter import lib.mastodon -from mastodon.Mastodon import MastodonRatelimitError -from twitter import TwitterError -from urllib.error import URLError from datetime import timedelta from zipfile import ZipFile from io import BytesIO, TextIOWrapper @@ -15,6 +12,7 @@ import json from kombu import Queue import random import version +from lib.exceptions import PermanentError, TemporaryError app = Celery('tasks', broker=flaskapp.config['CELERY_BROKER'], @@ -51,7 +49,19 @@ def noop(*args, **kwargs): pass -@app.task(autoretry_for=(TwitterError, URLError, MastodonRatelimitError)) +def make_dormant(acc): + acc.reason = ''' + Your account was temporarily disabled because your {service} + account was suspended or otherwise inaccessible. By logging into + it, you have reactivated your account, but be aware that some posts + may be missing from Forget's database, and it may take some time to + get back in sync. + '''.format(acc.service) + acc.dormant = True + db.session.commit() + + +@app.task(autoretry_for=(TemporaryError,)) def fetch_acc(id_, cursor=None): acc = Account.query.get(id_) print(f'fetching {acc}') @@ -64,26 +74,16 @@ def fetch_acc(id_, cursor=None): cursor = action(acc, cursor) if cursor: fetch_acc.si(id_, cursor).apply_async() + except PermanentError as e: + db.session.rollback() + make_dormant(acc) finally: db.session.rollback() acc.touch_fetch() db.session.commit() -@app.task -def queue_fetch_for_most_stale_accounts( - min_staleness=timedelta(minutes=2), limit=20): - accs = Account.query\ - .join(Account.tokens).group_by(Account)\ - .filter(Account.last_fetch < db.func.now() - min_staleness)\ - .order_by(db.asc(Account.last_fetch))\ - .limit(limit) - for acc in accs: - fetch_acc.s(acc.id).delay() - db.session.commit() - - -@app.task(autoretry_for=(TwitterError, URLError)) +@app.task() def import_twitter_archive_month(archive_id, month_path): ta = TwitterArchive.query.get(archive_id) @@ -117,62 +117,7 @@ def import_twitter_archive_month(archive_id, month_path): raise e -@app.task -def periodic_cleanup(): - # delete sessions after 48 hours - (Session.query - .filter(Session.updated_at < (db.func.now() - timedelta(hours=48))) - .delete(synchronize_session=False)) - - # delete twitter archives after 3 days - (TwitterArchive.query - .filter(TwitterArchive.updated_at < (db.func.now() - timedelta(days=3))) - .delete(synchronize_session=False)) - - # delete anonymous oauth tokens after 1 day - (OAuthToken.query - .filter(OAuthToken.updated_at < (db.func.now() - timedelta(days=1))) - .filter(OAuthToken.account_id == None) # noqa: E711 - .delete(synchronize_session=False)) - - # disable and log out users with no tokens - unreachable = ( - Account.query - .outerjoin(Account.tokens) - .group_by(Account).having(db.func.count(OAuthToken.token) == 0) - .filter(Account.policy_enabled == True)) # noqa: E712 - for account in unreachable: - account.force_log_out() - account.policy_enabled = False - account.reason = """ - Your account was disabled because Forget no longer had access to - your {service} account. Perhaps you had revoked it? By logging in, - you have restored access and you can now re-enable Forget if you wish. - """.format(service=account.service.capitalize()) - - # normalise mastodon instance popularity scores - biggest_instance = ( - MastodonInstance.query - .order_by(db.desc(MastodonInstance.popularity)).first()) - if biggest_instance.popularity > 40: - MastodonInstance.query.update({ - MastodonInstance.popularity: - MastodonInstance.popularity * 40 / biggest_instance.popularity - }) - - db.session.commit() - - -@app.task -def queue_deletes(): - eligible_accounts = ( - Account.query.filter(Account.policy_enabled == True) # noqa: E712 - .filter(Account.next_delete < db.func.now())) - for account in eligible_accounts: - delete_from_account.s(account.id).apply_async() - - -@app.task(autoretry_for=(TwitterError, URLError, MastodonRatelimitError)) +@app.task(autoretry_for=(TemporaryError,)) def delete_from_account(account_id): account = Account.query.get(account_id) latest_n_posts = (Post.query.with_parent(account) @@ -231,34 +176,108 @@ def refresh_posts(posts): return lib.mastodon.refresh_posts(posts) -@app.task(autoretry_for=(TwitterError, URLError), - throws=(MastodonRatelimitError)) +@app.task(autoretry_for=(TemporaryError,)) def refresh_account(account_id): account = Account.query.get(account_id) - limit = 100 - if account.service == 'mastodon': - limit = 5 - posts = (Post.query.with_parent(account) - .order_by(db.asc(Post.updated_at)).limit(limit).all()) + try: + limit = 100 + if account.service == 'mastodon': + limit = 5 + posts = (Post.query.with_parent(account) + .order_by(db.asc(Post.updated_at)).limit(limit).all()) + + posts = refresh_posts(posts) + account.touch_refresh() + db.session.commit() + except PermanentError as e: + db.session.rollback() + make_dormant(account) + + +@app.task +def periodic_cleanup(): + # delete sessions after 48 hours + (Session.query + .filter(Session.updated_at < (db.func.now() - timedelta(hours=48))) + .delete(synchronize_session=False)) + + # delete twitter archives after 3 days + (TwitterArchive.query + .filter(TwitterArchive.updated_at < (db.func.now() - timedelta(days=3))) + .delete(synchronize_session=False)) + + # delete anonymous oauth tokens after 1 day + (OAuthToken.query + .filter(OAuthToken.updated_at < (db.func.now() - timedelta(days=1))) + .filter(OAuthToken.account_id == None) # noqa: E711 + .delete(synchronize_session=False)) + + # disable and log out users with no tokens + unreachable = ( + Account.query + .outerjoin(Account.tokens) + .group_by(Account).having(db.func.count(OAuthToken.token) == 0) + .filter(Account.policy_enabled == True)) # noqa: E712 + for account in unreachable: + account.force_log_out() + account.policy_enabled = False + account.reason = """ + Your account was disabled because Forget no longer had access to + your {service} account. Perhaps you had revoked it? By logging in, + you have restored access and you can now re-enable Forget if you wish. + """.format(service=account.service.capitalize()) + + # normalise mastodon instance popularity scores + biggest_instance = ( + MastodonInstance.query + .order_by(db.desc(MastodonInstance.popularity)).first()) + if biggest_instance.popularity > 40: + MastodonInstance.query.update({ + MastodonInstance.popularity: + MastodonInstance.popularity * 40 / biggest_instance.popularity + }) - posts = refresh_posts(posts) - account.touch_refresh() db.session.commit() -@app.task(autoretry_for=(TwitterError, URLError), - throws=(MastodonRatelimitError)) +@app.task +def queue_fetch_for_most_stale_accounts( + min_staleness=timedelta(minutes=2), limit=20): + accs = (Account.query + .join(Account.tokens).group_by(Account) + .filter(Account.last_fetch < db.func.now() - min_staleness) + .filter(~Account.dormant) + .order_by(db.asc(Account.last_fetch)) + .limit(limit) + ) + for acc in accs: + fetch_acc.s(acc.id).delay() + db.session.commit() + + +@app.task +def queue_deletes(): + eligible_accounts = ( + Account.query.filter(Account.policy_enabled == True) # noqa: E712 + .filter(Account.next_delete < db.func.now()) + .filter(~Account.dormant)) + for account in eligible_accounts: + delete_from_account.s(account.id).apply_async() + + +@app.task def refresh_account_with_oldest_post(): post = (Post.query.outerjoin(Post.author).join(Account.tokens) + .filter(~Account.dormant) .group_by(Post).order_by(db.asc(Post.updated_at)).first()) refresh_account(post.author_id) -@app.task(autoretry_for=(TwitterError, URLError), - throws=(MastodonRatelimitError)) +@app.task def refresh_account_with_longest_time_since_refresh(): acc = (Account.query.join(Account.tokens).group_by(Account) + .filter(~Account.dormant) .order_by(db.asc(Account.last_refresh)).first()) refresh_account(acc.id)