import csv
from datetime import timedelta, datetime
from io import BytesIO, TextIOWrapper
from urllib.error import URLError
from zipfile import ZipFile

from celery import Celery
from twitter import TwitterError

from app import app as flaskapp
from app import db
from model import Session, Account, TwitterArchive
import lib.twitter
|
# Celery application, configured from the Flask app's settings.
# task_serializer='pickle' because task arguments are not JSON-serializable.
app = Celery(
    'tasks',
    broker=flaskapp.config['CELERY_BROKER'],
    task_serializer='pickle',
)
|
@app.task
def remove_old_sessions():
    """Delete login sessions that have not been updated in the last 48 hours."""
    cutoff = db.func.now() - timedelta(hours=48)
    # synchronize_session=False: bulk delete without reconciling the
    # in-memory session; we commit immediately afterwards.
    stale = Session.query.filter(Session.updated_at < cutoff)
    stale.delete(synchronize_session=False)
    db.session.commit()
|
@app.task(autoretry_for=(TwitterError, URLError))
def fetch_acc(id, cursor=None):
    """Fetch new posts for account `id`, chaining a follow-up task while the
    service reports another page (`cursor`) to fetch.

    Retries automatically on Twitter API errors and network errors.
    """
    acc = Account.query.get(id)
    if acc is None:
        # Account was deleted between scheduling and execution; nothing to do.
        # (Without this guard, acc.service below raises AttributeError and the
        # finally block raises again on acc.last_fetch, masking the original.)
        return
    print(f'fetching {acc}')
    try:
        if acc.service == 'twitter':
            cursor = lib.twitter.fetch_acc(
                acc, cursor, **flaskapp.config.get_namespace("TWITTER_"))
            if cursor:
                # More pages remain: queue the next page as its own task.
                fetch_acc.si(id, cursor).apply_async()
    finally:
        # Discard any partial state from a failed fetch, then record the
        # fetch attempt time regardless of success.
        db.session.rollback()
        acc.last_fetch = db.func.now()
        db.session.commit()
|
@app.task
def queue_fetch_for_most_stale_accounts(min_staleness=timedelta(minutes=5), limit=20):
    """Queue a fetch_acc task for up to `limit` accounts whose last fetch
    is older than `min_staleness`, oldest first."""
    stale_cutoff = db.func.now() - min_staleness
    stale_accounts = (
        Account.query
        .filter(Account.last_fetch < stale_cutoff)
        .order_by(db.asc(Account.last_fetch))
        .limit(limit)
    )
    for account in stale_accounts:
        fetch_acc.s(account.id).delay()
        # Mark as fetched now so the next scheduler run doesn't re-queue it.
        account.touch_fetch()
    db.session.commit()
|
# Housekeeping schedule: prune expired sessions every 10 minutes and
# queue fetches for the stalest accounts every minute.
app.add_periodic_task(10*60, remove_old_sessions)
app.add_periodic_task(60, queue_fetch_for_most_stale_accounts)
|
|
@app.task
def import_twitter_archive(id):
    """Import every tweet from a stored Twitter archive zip as a post,
    then delete the archive record."""
    ta = TwitterArchive.query.get(id)

    with ZipFile(BytesIO(ta.body), 'r') as archive:
        # tweets.csv inside the archive is bytes; wrap for text-mode csv parsing.
        rows = csv.DictReader(TextIOWrapper(archive.open('tweets.csv', 'r')))
        for row in rows:
            json_tweet = lib.twitter.csv_tweet_to_json_tweet(row, ta.account)
            post = lib.twitter.tweet_to_post(json_tweet)
            db.session.merge(post)
            # Commit per row so a failure keeps previously imported posts.
            db.session.commit()

    db.session.delete(ta)
    db.session.commit()
|
if __name__ == '__main__':
    # Start a Celery worker when this module is executed directly.
    app.worker_main()