diff --git a/tasks.py b/tasks.py index e411f7d..2a95597 100644 --- a/tasks.py +++ b/tasks.py @@ -13,12 +13,6 @@ import json app = Celery('tasks', broker=flaskapp.config['CELERY_BROKER'], task_serializer='pickle') -@app.task -def remove_old_sessions(): - Session.query.filter(Session.updated_at < (db.func.now() - timedelta(hours=48))).\ - delete(synchronize_session=False) - db.session.commit() - @app.task(autoretry_for=(TwitterError, URLError)) def fetch_acc(id, cursor=None): acc = Account.query.get(id) @@ -33,8 +27,6 @@ def fetch_acc(id, cursor=None): acc.last_fetch = db.func.now() db.session.commit() - - @app.task def queue_fetch_for_most_stale_accounts(min_staleness=timedelta(minutes=5), limit=20): accs = Account.query\ @@ -46,9 +38,6 @@ def queue_fetch_for_most_stale_accounts(min_staleness=timedelta(minutes=5), limi acc.touch_fetch() db.session.commit() -app.add_periodic_task(10*60, remove_old_sessions) -app.add_periodic_task(60, queue_fetch_for_most_stale_accounts) - @app.task def import_twitter_archive(archive_id): ta = TwitterArchive.query.get(archive_id) @@ -93,6 +82,16 @@ def import_twitter_archive_month(archive_id, month_path): raise e +@app.task +def periodic_cleanup(): + Session.query.filter(Session.updated_at < (db.func.now() - timedelta(hours=48))).\ + delete(synchronize_session=False) + TwitterArchive.query.filter(TwitterArchive.updated_at < (db.func.now() - timedelta(days=7))).\ + delete(synchronize_session=False) + db.session.commit() + +app.add_periodic_task(6*60*60, periodic_cleanup) +app.add_periodic_task(60, queue_fetch_for_most_stale_accounts) if __name__ == '__main__': app.worker_main()