forget-cancellare-vecchi-toot/tasks.py

54 lines
1.5 KiB
Python
Raw Normal View History

2017-07-27 20:20:59 +02:00
from celery import Celery
from app import app as flaskapp
from app import db
from model import Session, Account
2017-07-28 12:48:00 +02:00
import lib.twitter
from twitter import TwitterError
from urllib.error import URLError
2017-07-28 01:07:51 +02:00
from datetime import timedelta, datetime
2017-07-27 20:20:59 +02:00
# Celery application that owns the background tasks defined in this module.
# The broker URL is read from the Flask app's CELERY_BROKER config key.
# NOTE(review): task_serializer='pickle' means task arguments travel as
# pickles, so the broker must only be writable by trusted parties.
# Presumably chosen so fetch_acc can pass its cursor along — confirm.
app = Celery('tasks', broker=flaskapp.config['CELERY_BROKER'], task_serializer='pickle')
@app.task
def remove_old_sessions():
    """Delete every Session whose updated_at is more than 48 hours old."""
    # The cutoff is computed by the database (db.func.now()), not by the worker.
    cutoff = db.func.now() - timedelta(hours=48)
    expired = Session.query.filter(Session.updated_at < cutoff)
    # Bulk DELETE; objects already loaded in the session are not synchronized.
    expired.delete(synchronize_session=False)
    db.session.commit()
@app.task(autoretry_for=(TwitterError, URLError))
def fetch_acc(id, cursor=None):
    """Run one fetch step for the account with primary key ``id``.

    Accounts whose ``service`` is ``'twitter'`` are handed to
    ``lib.twitter.fetch_acc`` together with the ``TWITTER_``-prefixed Flask
    config values. If that call returns a truthy cursor, a follow-up
    ``fetch_acc`` task is queued to continue from it. ``TwitterError`` and
    ``URLError`` are listed in ``autoretry_for`` so Celery retries the task.

    Args:
        id: Primary key of the ``Account`` row to fetch.
        cursor: Continuation value returned by a previous step, or ``None``.
            Its exact shape is defined by ``lib.twitter.fetch_acc``.
    """
    acc = Account.query.get(id)
    print(f'fetching {acc}')
    if acc is None:
        # The row may have been removed after the task was queued. Without
        # this guard both the try body and the finally clause would raise
        # AttributeError on None.
        return
    try:
        if acc.service == 'twitter':
            cursor = lib.twitter.fetch_acc(acc, cursor, **flaskapp.config.get_namespace("TWITTER_"))
            if cursor:
                # Queue the continuation as a separate task rather than
                # looping here.
                fetch_acc.si(id, cursor).apply_async()
    finally:
        # Roll back anything left uncommitted in the session, then stamp
        # last_fetch so the attempt is recorded whether or not it succeeded.
        db.session.rollback()
        acc.last_fetch = db.func.now()
        db.session.commit()
@app.task
def queue_fetch_for_most_stale_accounts(min_staleness=timedelta(minutes=5), limit=20):
    """Queue a fetch_acc task for the accounts fetched longest ago.

    Args:
        min_staleness: Only accounts whose last_fetch is older than this
            interval (relative to the database's now()) are considered.
        limit: Maximum number of accounts to queue in one run.
    """
    stale_before = db.func.now() - min_staleness
    candidates = Account.query.filter(Account.last_fetch < stale_before)
    # Oldest last_fetch first, capped at `limit` rows.
    candidates = candidates.order_by(db.asc(Account.last_fetch)).limit(limit)
    for account in candidates:
        fetch_acc.s(account.id).delay()
        # Stamp now so the next run does not select the same account again.
        account.last_fetch = db.func.now()
    db.session.commit()
2017-07-27 20:20:59 +02:00
2017-07-28 01:07:51 +02:00
# Schedule remove_old_sessions with an interval of 10*60 seconds (10 minutes).
app.add_periodic_task(10*60, remove_old_sessions)
2017-07-28 12:48:00 +02:00
# Schedule queue_fetch_for_most_stale_accounts with an interval of 60 seconds.
app.add_periodic_task(60, queue_fetch_for_most_stale_accounts)
2017-07-27 20:20:59 +02:00
# When executed directly as a script, start a Celery worker for this app.
if __name__ == '__main__':
    app.worker_main()