forget-cancellare-vecchi-toot/tasks.py

149 lines
5.0 KiB
Python
Raw Normal View History

2017-07-27 20:20:59 +02:00
from celery import Celery
from app import app as flaskapp
from app import db
from model import Session, Account, TwitterArchive, Post
2017-07-28 12:48:00 +02:00
import lib.twitter
from twitter import TwitterError
from urllib.error import URLError
2017-07-28 01:07:51 +02:00
from datetime import timedelta, datetime
2017-07-31 00:07:34 +02:00
from zipfile import ZipFile
from io import BytesIO, TextIOWrapper
2017-07-31 01:57:03 +02:00
import json
2017-08-01 20:57:15 +02:00
from kombu import Queue
2017-08-02 01:35:09 +02:00
import random
2017-07-27 20:20:59 +02:00
app = Celery('tasks', broker=flaskapp.config['CELERY_BROKER'], task_serializer='pickle')
2017-08-01 20:57:15 +02:00
app.conf.task_queues = (
Queue('default', routing_key='celery'),
Queue('high_prio', routing_key='high'),
Queue('higher_prio', routing_key='higher'),
)
app.conf.task_default_queue = 'default'
app.conf.task_default_exchange = 'celery'
app.conf.task_default_exchange_type = 'direct'
2017-07-27 20:20:59 +02:00
@app.task(autoretry_for=(TwitterError, URLError))
def fetch_acc(id, cursor=None):
acc = Account.query.get(id)
2017-07-29 12:01:32 +02:00
print(f'fetching {acc}')
try:
if(acc.service == 'twitter'):
cursor = lib.twitter.fetch_acc(acc, cursor, **flaskapp.config.get_namespace("TWITTER_"))
if cursor:
fetch_acc.si(id, cursor).apply_async()
2017-07-29 12:01:32 +02:00
finally:
db.session.rollback()
acc.last_fetch = db.func.now()
db.session.commit()
@app.task
def queue_fetch_for_most_stale_accounts(min_staleness=timedelta(minutes=5), limit=20):
2017-07-28 12:48:00 +02:00
accs = Account.query\
.filter(Account.last_fetch < db.func.now() - min_staleness)\
.order_by(db.asc(Account.last_fetch))\
.limit(limit)
for acc in accs:
fetch_acc.s(acc.id).delay()
2017-07-31 00:39:40 +02:00
acc.touch_fetch()
2017-07-28 12:48:00 +02:00
db.session.commit()
2017-07-27 20:20:59 +02:00
2017-07-31 00:07:34 +02:00
@app.task
def chunk_twitter_archive(archive_id):
2017-07-31 01:57:03 +02:00
ta = TwitterArchive.query.get(archive_id)
2017-07-31 00:07:34 +02:00
with ZipFile(BytesIO(ta.body), 'r') as zipfile:
2017-07-31 01:57:03 +02:00
files = [filename for filename in zipfile.namelist() if filename.startswith('data/js/tweets/') and filename.endswith('.js')]
2017-07-31 00:07:34 +02:00
2017-07-31 01:57:03 +02:00
files.sort()
2017-07-31 00:07:34 +02:00
2017-07-31 01:57:03 +02:00
ta.chunks = len(files)
2017-07-31 00:07:34 +02:00
db.session.commit()
2017-07-27 20:20:59 +02:00
2017-07-31 01:57:03 +02:00
for filename in files:
import_twitter_archive_month.s(archive_id, filename).apply_async()
@app.task
def import_twitter_archive_month(archive_id, month_path):
ta = TwitterArchive.query.get(archive_id)
try:
with ZipFile(BytesIO(ta.body), 'r') as zipfile:
with TextIOWrapper(zipfile.open(month_path, 'r')) as f:
# seek past header
f.readline()
tweets = json.load(f)
for tweet in tweets:
post = lib.twitter.post_from_api_tweet_object(tweet)
existing_post = db.session.query(Post).get(post.id)
if post.author_id != ta.account_id \
or existing_post and existing_post.author_id != ta.account_id:
raise Exception("Shenanigans!")
2017-07-31 01:57:03 +02:00
post = db.session.merge(post)
ta.chunks_successful = TwitterArchive.chunks_successful + 1
db.session.commit()
except Exception as e:
db.session.rollback()
ta.chunks_failed = TwitterArchive.chunks_failed + 1
db.session.commit()
raise e
2017-07-27 20:20:59 +02:00
2017-07-31 02:00:07 +02:00
@app.task
def periodic_cleanup():
Session.query.filter(Session.updated_at < (db.func.now() - timedelta(hours=48))).\
delete(synchronize_session=False)
TwitterArchive.query.filter(TwitterArchive.updated_at < (db.func.now() - timedelta(days=7))).\
delete(synchronize_session=False)
db.session.commit()
2017-08-02 01:35:09 +02:00
@app.task
def queue_deletes():
eligible_accounts = Account.query.filter(Account.policy_enabled == True).\
filter(Account.last_delete + Account.policy_delete_every < db.func.now())
for account in eligible_accounts:
delete_from_account.s(account.id).apply_async()
@app.task
def delete_from_account(account_id):
account = Account.query.get(account_id)
latest_n_posts = db.session.query(Post.id).with_parent(account).order_by(db.desc(Post.created_at)).limit(account.policy_keep_latest)
posts = Post.query.with_parent(account).\
filter(Post.created_at + account.policy_keep_younger <= db.func.now()).\
filter(~Post.id.in_(latest_n_posts)).\
order_by(db.func.random()).limit(100).all()
if account.service == 'twitter':
posts = lib.twitter.refresh_posts(posts)
eligible = list((post for post in posts if not account.policy_keep_favourites or not post.favourite))
if eligible:
if account.policy_delete_every == timedelta(0):
print("deleting all {} eligible posts for {}".format(len(eligible), account))
for post in eligible:
lib.twitter.delete(post)
else:
post = random.choice(list((post for post in posts if not account.policy_keep_favourites or not post.favourite)))
print("deleting {}".format(post))
lib.twitter.delete(post)
account.touch_delete()
2017-08-02 01:35:09 +02:00
db.session.commit()
2017-07-31 02:00:07 +02:00
app.add_periodic_task(6*60*60, periodic_cleanup)
2017-08-02 01:35:09 +02:00
app.add_periodic_task(45, queue_fetch_for_most_stale_accounts)
app.add_periodic_task(45, queue_deletes)
2017-07-27 20:20:59 +02:00
if __name__ == '__main__':
app.worker_main()
2017-08-02 01:35:09 +02:00