from celery import Celery, Task
from app import app as flaskapp
from app import db
from model import Session, Account, TwitterArchive, Post, OAuthToken,\
    MastodonInstance
import libforget.twitter
import libforget.mastodon
from datetime import timedelta, datetime, timezone
from time import time
from zipfile import ZipFile
from io import BytesIO, TextIOWrapper
import json
from kombu import Queue
import version
from libforget.exceptions import PermanentError, TemporaryError
import redis
from functools import wraps
import pickle
import logging

# NOTE(review): tasks are serialized with pickle, so anything able to write
# to the broker can make workers unpickle arbitrary data -- confirm the
# broker is only reachable by trusted components.
app = Celery(
    'tasks',
    broker=flaskapp.config['CELERY_BROKER'],
    task_serializer='pickle',
    task_soft_time_limit=600,
    task_time_limit=1200,
)
app.conf.task_queues = (
    Queue('default', routing_key='celery'),
    Queue('high_prio', routing_key='high'),
    Queue('higher_prio', routing_key='higher'),
)
app.conf.task_default_queue = 'default'
app.conf.task_default_exchange = 'celery'
app.conf.task_default_exchange_type = 'direct'

# Sentry reporting is optional; `sentry` stays None when no DSN is configured.
sentry = None

if 'SENTRY_DSN' in flaskapp.config:
    from raven import Client
    from raven.contrib.celery import register_signal, register_logger_signal
    sentry = Client(
        flaskapp.config['SENTRY_DSN'],
        release=version.get_versions()['version'])
    register_logger_signal(sentry)
    register_signal(sentry)


class DBTask(Task):
    """Base task class that closes the database session after every run."""

    def __call__(self, *args, **kwargs):
        """Run the task body, then close ``db.session`` unconditionally.

        Returns whatever the task body returns (the original dropped the
        result, so every task appeared to return None).
        """
        try:
            return super().__call__(*args, **kwargs)
        finally:
            db.session.close()


app.Task = DBTask

# Lazily created redis client shared by every `unique`-decorated task.
r = None


def unique(fun):
    """Decorator: skip the call if an identical call is already running.

    A redis key derived from the function name and the pickled arguments is
    used as a lock. The lock is set with a 5 minute expiry and removed when
    the call finishes. When the lock is already held the wrapped function is
    not called and the wrapper returns None.

    NOTE(review): the lock expiry (300s) is shorter than the configured task
    soft time limit (600s), so a long-running call can outlive its lock and
    later delete a lock taken by another call -- confirm this is acceptable.
    """
    global r
    if not r:
        r = redis.StrictRedis.from_url(flaskapp.config['REDIS_URI'])

    @wraps(fun)
    def wrapper(*args, **kwargs):
        logging.info('Checking for dupes for unique task %s',
                     (fun.__name__, args, kwargs))
        key = 'celery_unique_lock:epoch1:{}:{}'.format(
            fun.__name__, pickle.dumps((args, kwargs)))
        has_lock = False
        result = None
        try:
            # nx=True makes the set succeed only if the key does not exist
            if r.set(key, 1, nx=True, ex=60 * 5):
                logging.info('No dupes for unique, running task %s',
                             (fun.__name__, args, kwargs))
                has_lock = True
                result = fun(*args, **kwargs)
            else:
                logging.info('Unique task has a dupe, skipping %s',
                             (fun.__name__, args, kwargs))
        finally:
            # only release a lock this call actually acquired
            if has_lock:
                r.delete(key)
        return result

    return wrapper


def noop(*args, **kwargs):
    """Accept any arguments, do nothing and return None."""
    pass


def make_dormant(acc):
    """Flag ``acc`` as dormant and set the explanation stored on it.

    Only mutates the object; committing is left to the caller.
    """
    acc.reason = '''
        Your account was temporarily disabled because your {service} account
        was suspended or otherwise inaccessible. By logging into it, you have
        reactivated your account, but be aware that some posts may be missing
        from Forget's database, and it may take some time to get back in sync.
    '''.format(service=acc.service)
    acc.dormant = True


@app.task(autoretry_for=(TemporaryError, ))
@unique
def fetch_acc(id_):
    """Fetch one page of posts for the account ``id_`` and store them.

    Chooses the ``max_id`` / ``since_id`` window from the account's fetch
    state, asks the service-specific ``fetch_posts`` for posts and merges
    them into the session. While the historic fetch is incomplete the task
    reschedules itself with a one second countdown.

    A TemporaryError raised inside the ``try`` block is handled here (the
    account is backed off) and does not propagate; a PermanentError makes
    the account dormant. ``account.touch_fetch()`` is always called and
    committed at the end.
    """
    account = Account.query.get(id_)
    if account is None:
        # the account may have been removed after the task was queued
        logging.warning('fetch_acc: no account with id %s, skipping', id_)
        return
    print("Fetching {}".format(account))
    try:
        if not account.fetch_history_complete:
            oldest = (db.session.query(Post)
                      .with_parent(account, 'posts')
                      .order_by(db.asc(Post.created_at))
                      .first())
            # ^ None if this is our first fetch ever, otherwise the oldest post
            if oldest:
                max_id = oldest.remote_id
            else:
                max_id = None
            since_id = None
        elif account.fetch_current_batch_end:
            oldest = (db.session.query(Post)
                      .with_parent(account, 'posts')
                      .filter(Post.created_at >
                              account.fetch_current_batch_end.created_at)
                      .order_by(db.asc(Post.created_at))
                      .first())
            # ^ None if this is our first fetch of this batch,
            #   otherwise oldest of this batch
            if oldest:
                max_id = oldest.remote_id
            else:
                max_id = None
            since_id = account.fetch_current_batch_end.remote_id
        else:
            # we shouldn't get here unless the user had no posts on the
            # service last time we fetched
            max_id = None
            latest = (db.session.query(Post)
                      .with_parent(account, 'posts')
                      .order_by(db.desc(Post.created_at))
                      .limit(1)
                      .scalar())
            # ^ should be None considering the user has no posts
            #   will be the latest post in the off chance that something
            #   goes weird
            if latest:
                since_id = latest.remote_id
            else:
                since_id = None

        # unknown services fall through to noop, which returns None and is
        # treated as a temporary failure just below
        fetch_posts = noop
        if account.service == 'twitter':
            fetch_posts = libforget.twitter.fetch_posts
        elif account.service == 'mastodon':
            fetch_posts = libforget.mastodon.fetch_posts
        posts = fetch_posts(account, max_id, since_id)

        if posts is None:
            # ???
            raise TemporaryError("Fetching posts went horribly wrong")

        new_posts = [post for post in posts
                     if post.remote_id not in (max_id, since_id)]
        if not new_posts:
            # if there are no posts other than the edges
            # we either finished the historic fetch
            # or we finished the current batch
            account.fetch_history_complete = True
            batch_end = (Post.query.with_parent(account, 'posts').order_by(
                db.desc(Post.created_at)).first())
            if batch_end:
                account.fetch_current_batch_end_id = batch_end.id
            else:
                account.fetch_current_batch_end_id = None
            db.session.commit()
        else:
            for post in posts:
                db.session.merge(post)
            db.session.commit()

        if not account.fetch_history_complete:
            # reschedule immediately if we're still doing the historic fetch
            print("{} is not done fetching history, resheduling."
                  .format(account))
            fetch_acc.apply_async((id_,), countdown=1)

    except TemporaryError:
        db.session.rollback()
        account.backoff()
        # commit now: the rollback in `finally` discards uncommitted
        # attribute changes, which would silently undo the backoff
        db.session.commit()
    except PermanentError:
        db.session.rollback()
        make_dormant(account)
        # same reason as above: persist before the `finally` rollback
        db.session.commit()
        if sentry:
            sentry.captureException()

    finally:
        # drop anything left half-done by an unhandled error, then record
        # that a fetch was attempted
        db.session.rollback()
        account.touch_fetch()
        db.session.commit()


@app.task()
def import_twitter_archive_month(archive_id, month_path):
    """Import the tweets stored at ``month_path`` inside a twitter archive.

    Reads the member ``month_path`` from the zip held in the archive's
    ``body``, skips its first line, parses the rest as JSON and merges each
    tweet as a Post. Increments ``chunks_successful`` on success; on any
    exception rolls back, increments ``chunks_failed`` and re-raises.

    Raises:
        Exception: if a tweet, or an already stored post with the same id,
            does not belong to the archive's account.
    """
    ta = TwitterArchive.query.get(archive_id)
    if ta is None:
        # the archive may have been cleaned up after the task was queued
        logging.warning(
            'import_twitter_archive_month: no archive with id %s, skipping',
            archive_id)
        return

    try:

        with ZipFile(BytesIO(ta.body), 'r') as zipfile:
            with TextIOWrapper(zipfile.open(month_path, 'r')) as f:

                # seek past header
                f.readline()

                tweets = json.load(f)

        for tweet in tweets:
            post = libforget.twitter.post_from_api_tweet_object(tweet)
            existing_post = db.session.query(Post).get(post.id)

            if post.author_id != ta.account_id or\
               existing_post and existing_post.author_id != ta.account_id:
                raise Exception("Shenanigans!")

            post = db.session.merge(post)

        # SQL-side increment (column expression, not the loaded value)
        ta.chunks_successful = TwitterArchive.chunks_successful + 1
        db.session.commit()

    except Exception:
        db.session.rollback()
        ta.chunks_failed = TwitterArchive.chunks_failed + 1
        db.session.commit()
        raise


@app.task()
@unique
def delete_from_account(account_id):
    """Delete at most one eligible post from the account ``account_id``.

    Returns immediately if the account's ``next_delete`` is still in the
    future. Otherwise picks a random sample of posts matching the account's
    keep policies, refreshes them through the service, and deletes the first
    one that is still eligible after the refresh. When nothing is eligible
    the next delete is pushed back by three minutes. A TemporaryError backs
    the account off. The session is always committed at the end.
    """
    account = Account.query.get(account_id)
    if account is None:
        # the account may have been removed after the task was queued
        logging.warning(
            'delete_from_account: no account with id %s, skipping',
            account_id)
        return
    if account.next_delete > datetime.now(timezone.utc):
        return

    # the newest `policy_keep_latest` posts are always kept
    latest_n_posts = (Post.query.with_parent(account, 'posts').order_by(
        db.desc(Post.created_at)).limit(account.policy_keep_latest)
                      .cte(name='latest'))
    posts = (
        Post.query.with_parent(account, 'posts')
        .filter(Post.created_at + account.policy_keep_younger <= db.func.now())
        .filter(~Post.id.in_(db.select((latest_n_posts.c.id, )))))

    if account.policy_keep_favourites != 'none':
        posts = posts.filter(db.or_(
            Post.favourite == (account.policy_keep_favourites == 'deleteonly'),
            Post.is_reblog))
    if account.policy_keep_media != 'none':
        posts = posts.filter(db.or_(
            Post.has_media == (account.policy_keep_media == 'deleteonly'),
            Post.is_reblog))
    if account.policy_keep_direct:
        posts = posts.filter(~Post.direct)

    limit = 100
    if account.service == 'mastodon':
        limit = 10
    posts = posts.order_by(db.func.random()).limit(limit).all()

    to_delete = None

    def is_eligible(post):
        """Re-apply the keep policies in Python to a refreshed post."""
        return (post.is_reblog or (
            (account.policy_keep_favourites == 'none' or
             (account.policy_keep_favourites == 'keeponly'
              and not post.favourite) or
             (account.policy_keep_favourites == 'deleteonly'
              and post.favourite))
            and (account.policy_keep_media == 'none' or
                 (account.policy_keep_media == 'keeponly'
                  and not post.has_media) or
                 (account.policy_keep_media == 'deleteonly'
                  and post.has_media))
            and (not account.policy_keep_direct or not post.direct)))

    try:
        action = noop
        if account.service == 'twitter':
            action = libforget.twitter.delete
            # twitter: refresh the whole sample in one call
            posts = refresh_posts(posts)
            to_delete = next(filter(is_eligible, posts), None)
        elif account.service == 'mastodon':
            action = libforget.mastodon.delete
            # mastodon: refresh one post at a time, stop at the first hit
            for post in posts:
                refreshed = refresh_posts((post, ))
                if refreshed and is_eligible(refreshed[0]):
                    to_delete = refreshed[0]
                    break

        if to_delete:
            print("Deleting {}".format(to_delete))
            account.touch_delete()
            action(to_delete)
            account.reset_backoff()
        else:
            account.next_delete = db.func.now() + timedelta(minutes=3)

    except TemporaryError:
        db.session.rollback()
        account.backoff()

    finally:
        db.session.commit()


def refresh_posts(posts):
    """Refresh ``posts`` through the service of the first post.

    Returns an empty list for empty input, otherwise the result of the
    twitter or mastodon ``refresh_posts`` helper. Returns None when the
    first post's service is neither of the two.
    """
    posts = list(posts)
    if not posts:
        return []

    if posts[0].service == 'twitter':
        return libforget.twitter.refresh_posts(posts)
    elif posts[0].service == 'mastodon':
        return libforget.mastodon.refresh_posts(posts)


@app.task()
@unique
def refresh_account(account_id):
    """Refresh the least recently updated posts of one account.

    Refreshes up to 100 posts (3 for mastodon accounts), ordered by
    ``updated_at`` ascending. A TemporaryError backs the account off, a
    PermanentError makes it dormant, and any other exception backs the
    account off and is re-raised. The session is always committed.
    """
    account = Account.query.get(account_id)
    if account is None:
        # the account may have been removed after the task was queued
        logging.warning(
            'refresh_account: no account with id %s, skipping', account_id)
        return

    print("Refreshing account {}".format(account))

    try:
        limit = 100
        if account.service == 'mastodon':
            limit = 3
        posts = (Post.query.with_parent(account, 'posts').order_by(
            db.asc(Post.updated_at)).limit(limit).all())

        refresh_posts(posts)
        account.touch_refresh()
        account.reset_backoff()
    except TemporaryError:
        db.session.rollback()
        account.backoff()
    except PermanentError:
        db.session.rollback()
        make_dormant(account)
        if sentry:
            sentry.captureException()
    except Exception:
        db.session.rollback()
        account.backoff()
        db.session.commit()
        raise
    finally:
        db.session.commit()


@app.task
@unique
def periodic_cleanup():
    """Purge stale rows and disable accounts that have no tokens left."""
    # delete sessions after 48 hours
    (Session.query.filter(
        Session.updated_at < (db.func.now() - timedelta(hours=48)))
     .delete(synchronize_session=False))

    # delete twitter archives after 3 days
    (TwitterArchive.query.filter(
        TwitterArchive.updated_at < (db.func.now() - timedelta(days=3)))
     .delete(synchronize_session=False))

    # delete anonymous oauth tokens after 1 day
    (OAuthToken.query.filter(
        OAuthToken.updated_at < (db.func.now() - timedelta(days=1))).filter(
            OAuthToken.account_id == None)  # noqa: E711
     .delete(synchronize_session=False))

    # disable and log out users with no tokens
    unreachable = (
        Account.query.outerjoin(Account.tokens)
        .group_by(Account).having(db.func.count(OAuthToken.token) == 0)
        .filter(Account.policy_enabled == True))  # noqa: E712
    for account in unreachable:
        account.force_log_out()
        account.policy_enabled = False
        account.reason = """
            Your account was disabled because Forget no longer had access to
            your {service} account. Perhaps you had revoked it? By logging in,
            you have restored access and you can now re-enable Forget if you
            wish.
        """.format(service=account.service.capitalize())

    db.session.commit()


@app.task
@unique
def queue_fetch_for_most_stale_accounts(
        min_staleness=timedelta(minutes=2), limit=20):
    """Queue ``fetch_acc`` for the accounts fetched longest ago.

    Selects up to ``limit`` non-dormant accounts that have at least one
    token, are past their backoff and were last fetched more than
    ``min_staleness`` ago, oldest fetch first.
    """
    accs = (Account.query.join(Account.tokens).group_by(Account)
            .filter(Account.last_fetch < db.func.now() - min_staleness)
            .filter(Account.backoff_until < db.func.now())
            .filter(~Account.dormant).order_by(db.asc(
                Account.last_fetch)).limit(limit))
    for acc in accs:
        fetch_acc.s(acc.id).delay()
    db.session.commit()


@app.task
@unique
def queue_deletes():
    """Queue ``delete_from_account`` for every account due for a delete.

    An account is due when its policy is enabled, ``next_delete`` has
    passed, its backoff has expired and it is not dormant.
    """
    eligible_accounts = (
        Account.query.filter(Account.policy_enabled == True)  # noqa: E712
        .filter(Account.next_delete < db.func.now())
        .filter(Account.backoff_until < db.func.now())
        .filter(~Account.dormant))
    for account in eligible_accounts:
        delete_from_account.s(account.id).apply_async()


@app.task
@unique
def refresh_account_with_oldest_post():
    """Refresh the account that owns the least recently updated post.

    Only posts of non-dormant accounts that have a token and are past their
    backoff are considered. Does nothing when no such post exists.
    """
    then = time()
    # one_or_none() instead of one(): one() raises when the query matches no
    # row, which made the `if post` check below unreachable
    post = db.session.query(Post).from_statement(db.text("""
        SELECT posts.id, posts.author_id
        FROM posts, accounts, oauth_tokens
        WHERE accounts.id = posts.author_id
        AND accounts.id = oauth_tokens.account_id
        AND accounts.backoff_until < now()
        AND NOT accounts.dormant
        ORDER BY posts.updated_at ASC
        LIMIT 1;
    """).columns(Post.id, Post.author_id)).one_or_none()
    if post:
        aid = post.author_id
        # called directly, so the refresh runs inside this task
        refresh_account(aid)
        now = time()
        logging.info(
            'Refreshed posts for {} for having oldest post in {}s'.format(
                aid, now-then))


@app.task
@unique
def refresh_account_with_longest_time_since_refresh():
    """Refresh the eligible account with the smallest ``last_refresh``.

    Eligible accounts have at least one token, are past their backoff and
    are not dormant. Does nothing when there is no such account.
    """
    acc = (Account.query.join(Account.tokens).group_by(Account)
           .filter(Account.backoff_until < db.func.now())
           .filter(~Account.dormant).order_by(db.asc(
               Account.last_refresh)).first())
    if acc:
        refresh_account(acc.id)


@app.task
def update_mastodon_instances_popularity():
    """Bump instance popularity per active account, then rescale to 20."""
    # bump score for each active account
    for acct in (Account.query.options(db.joinedload(Account.sessions))
                 .filter(~Account.dormant).filter(
                     Account.id.like('mastodon:%'))):
        instance = MastodonInstance.query.get(acct.mastodon_instance)
        if not instance:
            instance = MastodonInstance(
                instance=acct.mastodon_instance, popularity=10)
            db.session.add(instance)
        amount = 0.01
        if acct.policy_enabled:
            amount = 0.5
        for _ in acct.sessions:
            amount += 0.1
        # the bump shrinks as the instance's popularity grows
        instance.bump(amount / max(1, instance.popularity))

    # normalise scores so the top is 20
    top_pop = (db.session.query(db.func.max(MastodonInstance.popularity))
               .scalar())
    # top_pop is None when there are no instances and could be 0; skip the
    # rescale in both cases instead of dividing by it
    if top_pop:
        MastodonInstance.query.update({
            MastodonInstance.popularity:
                MastodonInstance.popularity * 20 / top_pop
        })
    db.session.commit()


# periodic schedule; the first argument is the interval in seconds
app.add_periodic_task(40, queue_fetch_for_most_stale_accounts)
app.add_periodic_task(9, queue_deletes)
app.add_periodic_task(6, refresh_account_with_oldest_post)
app.add_periodic_task(50, refresh_account_with_longest_time_since_refresh)
app.add_periodic_task(300, periodic_cleanup)
app.add_periodic_task(300, update_mastodon_instances_popularity)

if __name__ == '__main__':
    app.worker_main()