"""Celery tasks: fetching accounts, importing Twitter archives, deleting posts.

The module builds the Celery application, declares its queues, defines the
tasks, and registers the periodic schedule at import time.
"""
from celery import Celery

from app import app as flaskapp
from app import db
from model import Session, Account, TwitterArchive, Post
import lib.twitter
from twitter import TwitterError
from urllib.error import URLError
from datetime import timedelta, datetime
from zipfile import ZipFile
from io import BytesIO, TextIOWrapper
import json
from kombu import Queue
import random

# NOTE(review): the pickle serializer means workers unpickle whatever is put
# on the broker; this is only safe if the broker is fully trusted. Left
# unchanged so existing producers/consumers keep working.
app = Celery('tasks', broker=flaskapp.config['CELERY_BROKER'], task_serializer='pickle')
app.conf.task_queues = (
    Queue('default', routing_key='celery'),
    Queue('high_prio', routing_key='high'),
    Queue('higher_prio', routing_key='higher'),
)
app.conf.task_default_queue = 'default'
app.conf.task_default_exchange = 'celery'
app.conf.task_default_exchange_type = 'direct'


@app.task(autoretry_for=(TwitterError, URLError))
def fetch_acc(id, cursor=None):
    """Fetch one page for an account and re-queue itself while a cursor remains.

    Args:
        id: primary key of the ``Account`` to fetch.
        cursor: value handed back by a previous ``lib.twitter.fetch_acc``
            call, or ``None`` to start from the beginning.

    The task is automatically retried on ``TwitterError`` and ``URLError``.
    """
    acc = Account.query.get(id)
    print(f'fetching {acc}')
    try:
        if acc.service == 'twitter':
            cursor = lib.twitter.fetch_acc(
                acc, cursor, **flaskapp.config.get_namespace("TWITTER_"))
            if cursor:
                # Immutable signature: continue from the returned cursor.
                fetch_acc.si(id, cursor).apply_async()
    finally:
        # Drop whatever is still uncommitted in the session, then stamp
        # last_fetch whether or not the fetch succeeded.
        db.session.rollback()
        acc.last_fetch = db.func.now()
        db.session.commit()


@app.task
def queue_fetch_for_most_stale_accounts(min_staleness=timedelta(minutes=5), limit=20):
    """Queue ``fetch_acc`` for the accounts with the oldest ``last_fetch``.

    Args:
        min_staleness: only accounts whose ``last_fetch`` is older than
            now minus this interval are considered.
        limit: maximum number of accounts to queue in one run.
    """
    stale_accounts = (
        Account.query
        .filter(Account.last_fetch < db.func.now() - min_staleness)
        .order_by(db.asc(Account.last_fetch))
        .limit(limit)
    )
    for acc in stale_accounts:
        fetch_acc.s(acc.id).delay()
        acc.touch_fetch()
    db.session.commit()


@app.task
def chunk_twitter_archive(archive_id):
    """Split an uploaded archive into per-file import tasks.

    Lists the ``data/js/tweets/*.js`` members of the stored zip, records how
    many there are in ``chunks``, and queues one
    ``import_twitter_archive_month`` task per member.

    Args:
        archive_id: primary key of the ``TwitterArchive`` row.
    """
    ta = TwitterArchive.query.get(archive_id)

    with ZipFile(BytesIO(ta.body), 'r') as zipfile:
        files = [
            filename for filename in zipfile.namelist()
            if filename.startswith('data/js/tweets/') and filename.endswith('.js')
        ]

    files.sort()

    ta.chunks = len(files)
    db.session.commit()

    for filename in files:
        import_twitter_archive_month.s(archive_id, filename).apply_async()


@app.task
def import_twitter_archive_month(archive_id, month_path):
    """Import the tweets contained in one file of a stored archive.

    Args:
        archive_id: primary key of the ``TwitterArchive`` row.
        month_path: name of the zip member to import.

    Raises:
        Exception: re-raises whatever went wrong after recording the failure
            in ``chunks_failed``; in particular ``Exception("Shenanigans!")``
            when a tweet (or an already stored post with the same id) does
            not belong to the archive's account.
    """
    ta = TwitterArchive.query.get(archive_id)

    try:
        with ZipFile(BytesIO(ta.body), 'r') as zipfile:
            with TextIOWrapper(zipfile.open(month_path, 'r')) as f:
                # seek past header
                f.readline()
                tweets = json.load(f)

        for tweet in tweets:
            post = lib.twitter.post_from_api_tweet_object(tweet)
            existing_post = db.session.query(Post).get(post.id)

            # Refuse to import, or overwrite, posts owned by another account.
            if post.author_id != ta.account_id or (
                    existing_post
                    and existing_post.author_id != ta.account_id):
                raise Exception("Shenanigans!")

            post = db.session.merge(post)

        # SQL-side increment of the success counter.
        ta.chunks_successful = TwitterArchive.chunks_successful + 1
        db.session.commit()

    except Exception:
        # Discard the partial import, count the failure, and propagate.
        db.session.rollback()
        ta.chunks_failed = TwitterArchive.chunks_failed + 1
        db.session.commit()
        raise


@app.task
def periodic_cleanup():
    """Delete sessions not updated for 48 hours and archives not updated for 7 days."""
    session_cutoff = db.func.now() - timedelta(hours=48)
    Session.query.filter(Session.updated_at < session_cutoff).delete(
        synchronize_session=False)

    archive_cutoff = db.func.now() - timedelta(days=7)
    TwitterArchive.query.filter(TwitterArchive.updated_at < archive_cutoff).delete(
        synchronize_session=False)

    db.session.commit()


@app.task
def queue_deletes():
    """Queue ``delete_from_account`` for every account that is due a deletion.

    An account is due when its policy is enabled and
    ``last_delete + policy_delete_every`` lies in the past.
    """
    # `== True` is intentional: it builds a SQL expression, not a Python bool.
    eligible_accounts = (
        Account.query
        .filter(Account.policy_enabled == True)  # noqa: E712
        .filter(Account.last_delete + Account.policy_delete_every < db.func.now())
    )
    for account in eligible_accounts:
        delete_from_account.s(account.id).apply_async()


@app.task
def delete_from_account(account_id):
    """Delete posts from an account according to its retention policy.

    Candidates are up to 100 randomly chosen posts that are older than
    ``policy_keep_younger`` and not among the ``policy_keep_latest`` most
    recent posts. They are refreshed first; favourites are skipped when
    ``policy_keep_favourites`` is set. If ``policy_delete_every`` is zero all
    eligible candidates are deleted, otherwise a single random one is.

    Args:
        account_id: primary key of the ``Account`` to delete from.
    """
    account = Account.query.get(account_id)
    latest_n_posts = (
        db.session.query(Post.id)
        .with_parent(account)
        .order_by(db.desc(Post.created_at))
        .limit(account.policy_keep_latest)
    )
    posts = (
        Post.query.with_parent(account)
        .filter(Post.created_at + account.policy_keep_younger <= db.func.now())
        .filter(~Post.id.in_(latest_n_posts))
        .order_by(db.func.random())
        .limit(100)
        .all()
    )

    # Refresh exactly the policy-filtered candidates. Refreshing via
    # refresh_account() here would replace them with an unfiltered random
    # sample of the account's posts and bypass the retention policy.
    posts = refresh_posts(posts)

    if account.service == 'twitter':
        eligible = [
            post for post in posts
            if not account.policy_keep_favourites or not post.favourite
        ]
        if eligible:
            if account.policy_delete_every == timedelta(0):
                print("deleting all {} eligible posts for {}".format(len(eligible), account))
                for post in eligible:
                    lib.twitter.delete(post)
            else:
                post = random.choice(eligible)
                print("deleting {}".format(post))
                lib.twitter.delete(post)

    account.touch_delete()
    db.session.commit()


def refresh_posts(posts):
    """Refresh the given posts through their service's library.

    Args:
        posts: iterable of ``Post`` objects; the service is read from the
            first one.

    Returns:
        ``[]`` for empty input, the result of ``lib.twitter.refresh_posts``
        for twitter posts, and ``None`` for any other service.
    """
    posts = list(posts)
    if len(posts) == 0:
        return []

    if posts[0].service == 'twitter':
        return lib.twitter.refresh_posts(posts)


@app.task
def refresh_account(account_id):
    """Refresh up to 100 posts of an account, always including the stalest one.

    Args:
        account_id: primary key of the ``Account``.

    Returns:
        The result of ``refresh_posts`` for the selected posts, or ``[]`` if
        the account has no posts.
    """
    account = Account.query.get(account_id)

    oldest_post = (
        Post.query.with_parent(account)
        .order_by(db.asc(Post.updated_at))
        .first()
    )
    if not oldest_post:
        return []

    # 99 random other posts plus the least recently updated one.
    posts = (
        Post.query.with_parent(account)
        .filter(Post.id != oldest_post.id)
        .order_by(db.func.random())
        .limit(99)
        .all()
    )
    posts.append(oldest_post)

    posts = refresh_posts(posts)
    db.session.commit()
    return posts


@app.task
def refresh_account_with_oldest_post():
    """Refresh the account that owns the least recently updated post.

    Returns:
        The result of ``refresh_account`` for that account, or ``[]`` when
        there are no posts at all.
    """
    post = (
        Post.query.options(db.joinedload(Post.author))
        .order_by(db.asc(Post.updated_at))
        .first()
    )
    if post is None:
        # Nothing to refresh yet; avoid AttributeError on an empty table.
        return []
    return refresh_account(post.author_id)


app.add_periodic_task(6*60*60, periodic_cleanup)
app.add_periodic_task(45, queue_fetch_for_most_stale_accounts)
app.add_periodic_task(45, queue_deletes)
app.add_periodic_task(30*60, refresh_account_with_oldest_post)

if __name__ == '__main__':
    app.worker_main()