2017-08-11 00:31:02 +02:00
|
|
|
from celery import Celery, Task
|
2017-07-27 20:20:59 +02:00
|
|
|
|
|
|
|
from app import app as flaskapp
|
|
|
|
from app import db
|
2017-08-08 04:16:57 +02:00
|
|
|
from model import Session, Account, TwitterArchive, Post, OAuthToken
|
2017-07-28 12:48:00 +02:00
|
|
|
import lib.twitter
|
|
|
|
from twitter import TwitterError
|
2017-07-29 09:18:09 +02:00
|
|
|
from urllib.error import URLError
|
2017-07-28 01:07:51 +02:00
|
|
|
from datetime import timedelta, datetime
|
2017-07-31 00:07:34 +02:00
|
|
|
from zipfile import ZipFile
|
|
|
|
from io import BytesIO, TextIOWrapper
|
2017-07-31 01:57:03 +02:00
|
|
|
import json
|
2017-08-01 20:57:15 +02:00
|
|
|
from kombu import Queue
|
2017-08-02 01:35:09 +02:00
|
|
|
import random
|
2017-08-07 23:42:38 +02:00
|
|
|
import version
|
2017-07-27 20:20:59 +02:00
|
|
|
|
|
|
|
app = Celery('tasks', broker=flaskapp.config['CELERY_BROKER'], task_serializer='pickle')
|
2017-08-01 20:57:15 +02:00
|
|
|
app.conf.task_queues = (
|
|
|
|
Queue('default', routing_key='celery'),
|
|
|
|
Queue('high_prio', routing_key='high'),
|
|
|
|
Queue('higher_prio', routing_key='higher'),
|
|
|
|
)
|
|
|
|
app.conf.task_default_queue = 'default'
|
|
|
|
app.conf.task_default_exchange = 'celery'
|
|
|
|
app.conf.task_default_exchange_type = 'direct'
|
2017-07-27 20:20:59 +02:00
|
|
|
|
2017-08-07 13:46:05 +02:00
|
|
|
if 'SENTRY_DSN' in flaskapp.config:
|
|
|
|
from raven import Client
|
|
|
|
from raven.contrib.celery import register_signal, register_logger_signal
|
2017-08-07 23:42:38 +02:00
|
|
|
sentry = Client(flaskapp.config['SENTRY_DSN'], release=version.version)
|
2017-08-07 13:46:05 +02:00
|
|
|
register_logger_signal(sentry)
|
|
|
|
register_signal(sentry)
|
|
|
|
|
2017-08-11 00:31:02 +02:00
|
|
|
|
|
|
|
class DBTask(Task):
|
|
|
|
def __call__(self, *args, **kwargs):
|
|
|
|
try:
|
|
|
|
super().__call__(*args, **kwargs)
|
|
|
|
finally:
|
|
|
|
db.session.close()
|
|
|
|
|
|
|
|
app.Task = DBTask
|
|
|
|
|
2017-07-29 09:18:09 +02:00
|
|
|
@app.task(autoretry_for=(TwitterError, URLError))
|
2017-07-29 17:43:09 +02:00
|
|
|
def fetch_acc(id, cursor=None):
|
|
|
|
acc = Account.query.get(id)
|
2017-07-29 12:01:32 +02:00
|
|
|
print(f'fetching {acc}')
|
|
|
|
try:
|
2017-07-29 17:43:09 +02:00
|
|
|
if(acc.service == 'twitter'):
|
|
|
|
cursor = lib.twitter.fetch_acc(acc, cursor, **flaskapp.config.get_namespace("TWITTER_"))
|
|
|
|
if cursor:
|
|
|
|
fetch_acc.si(id, cursor).apply_async()
|
2017-07-29 12:01:32 +02:00
|
|
|
finally:
|
|
|
|
db.session.rollback()
|
2017-08-14 20:01:59 +02:00
|
|
|
acc.touch_fetch()
|
2017-07-29 12:01:32 +02:00
|
|
|
db.session.commit()
|
|
|
|
|
2017-07-28 00:08:20 +02:00
|
|
|
@app.task
|
2017-07-29 09:18:09 +02:00
|
|
|
def queue_fetch_for_most_stale_accounts(min_staleness=timedelta(minutes=5), limit=20):
|
2017-07-28 12:48:00 +02:00
|
|
|
accs = Account.query\
|
2017-08-14 20:01:59 +02:00
|
|
|
.join(Account.tokens).group_by(Account)\
|
2017-07-28 12:48:00 +02:00
|
|
|
.filter(Account.last_fetch < db.func.now() - min_staleness)\
|
|
|
|
.order_by(db.asc(Account.last_fetch))\
|
|
|
|
.limit(limit)
|
|
|
|
for acc in accs:
|
2017-07-29 17:43:09 +02:00
|
|
|
fetch_acc.s(acc.id).delay()
|
2017-07-31 00:39:40 +02:00
|
|
|
acc.touch_fetch()
|
2017-07-28 12:48:00 +02:00
|
|
|
db.session.commit()
|
2017-07-27 20:20:59 +02:00
|
|
|
|
2017-07-31 01:57:03 +02:00
|
|
|
|
2017-08-08 23:00:22 +02:00
|
|
|
@app.task(autoretry_for=(TwitterError, URLError))
|
2017-07-31 01:57:03 +02:00
|
|
|
def import_twitter_archive_month(archive_id, month_path):
|
|
|
|
ta = TwitterArchive.query.get(archive_id)
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
|
|
|
with ZipFile(BytesIO(ta.body), 'r') as zipfile:
|
|
|
|
with TextIOWrapper(zipfile.open(month_path, 'r')) as f:
|
|
|
|
|
|
|
|
# seek past header
|
|
|
|
f.readline()
|
|
|
|
|
|
|
|
tweets = json.load(f)
|
|
|
|
|
|
|
|
for tweet in tweets:
|
2017-08-03 20:01:06 +02:00
|
|
|
post = lib.twitter.post_from_api_tweet_object(tweet)
|
2017-07-31 03:53:05 +02:00
|
|
|
existing_post = db.session.query(Post).get(post.id)
|
|
|
|
|
|
|
|
if post.author_id != ta.account_id \
|
|
|
|
or existing_post and existing_post.author_id != ta.account_id:
|
|
|
|
raise Exception("Shenanigans!")
|
|
|
|
|
2017-07-31 01:57:03 +02:00
|
|
|
post = db.session.merge(post)
|
|
|
|
|
|
|
|
ta.chunks_successful = TwitterArchive.chunks_successful + 1
|
|
|
|
db.session.commit()
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
db.session.rollback()
|
|
|
|
ta.chunks_failed = TwitterArchive.chunks_failed + 1
|
|
|
|
db.session.commit()
|
|
|
|
raise e
|
|
|
|
|
2017-07-27 20:20:59 +02:00
|
|
|
|
2017-07-31 02:00:07 +02:00
|
|
|
@app.task
|
|
|
|
def periodic_cleanup():
|
2017-08-08 04:16:57 +02:00
|
|
|
# delete sessions after 48 hours
|
2017-07-31 02:00:07 +02:00
|
|
|
Session.query.filter(Session.updated_at < (db.func.now() - timedelta(hours=48))).\
|
|
|
|
delete(synchronize_session=False)
|
2017-08-08 04:16:57 +02:00
|
|
|
|
2017-08-09 12:40:41 +02:00
|
|
|
# delete twitter archives after 3 days
|
|
|
|
TwitterArchive.query.filter(TwitterArchive.updated_at < (db.func.now() - timedelta(days=3))).\
|
2017-07-31 02:00:07 +02:00
|
|
|
delete(synchronize_session=False)
|
|
|
|
|
2017-08-08 04:16:57 +02:00
|
|
|
# delete anonymous oauth tokens after 1 day
|
|
|
|
OAuthToken.query.filter(OAuthToken.updated_at < (db.func.now() - timedelta(days=1)))\
|
|
|
|
.filter(OAuthToken.account_id == None)\
|
|
|
|
.delete(synchronize_session=False)
|
2017-08-13 17:10:53 +02:00
|
|
|
|
|
|
|
# disable users with no tokens
|
|
|
|
unreachable = Account.query.outerjoin(Account.tokens).group_by(Account).having(db.func.count(OAuthToken.token) == 0).filter(Account.policy_enabled == True)
|
|
|
|
for account in unreachable:
|
|
|
|
account.policy_enabled = False
|
|
|
|
|
2017-08-08 04:16:57 +02:00
|
|
|
db.session.commit()
|
2017-08-02 01:35:09 +02:00
|
|
|
|
|
|
|
@app.task
|
|
|
|
def queue_deletes():
|
|
|
|
eligible_accounts = Account.query.filter(Account.policy_enabled == True).\
|
2017-08-14 20:58:22 +02:00
|
|
|
filter(Account.next_delete < db.func.now())
|
2017-08-02 01:35:09 +02:00
|
|
|
for account in eligible_accounts:
|
|
|
|
delete_from_account.s(account.id).apply_async()
|
|
|
|
|
2017-08-08 23:00:22 +02:00
|
|
|
@app.task(autoretry_for=(TwitterError, URLError))
|
2017-08-02 01:35:09 +02:00
|
|
|
def delete_from_account(account_id):
|
|
|
|
account = Account.query.get(account_id)
|
2017-08-09 10:25:41 +02:00
|
|
|
latest_n_posts = Post.query.with_parent(account).order_by(db.desc(Post.created_at)).limit(account.policy_keep_latest)
|
2017-08-02 01:35:09 +02:00
|
|
|
posts = Post.query.with_parent(account).\
|
|
|
|
filter(Post.created_at + account.policy_keep_younger <= db.func.now()).\
|
2017-08-09 10:25:41 +02:00
|
|
|
except_(latest_n_posts).\
|
2017-08-02 01:35:09 +02:00
|
|
|
order_by(db.func.random()).limit(100).all()
|
|
|
|
|
2017-08-04 16:41:12 +02:00
|
|
|
posts = refresh_posts(posts)
|
2017-08-02 01:35:09 +02:00
|
|
|
if account.service == 'twitter':
|
2017-08-08 23:00:22 +02:00
|
|
|
eligible = list((post for post in posts if
|
2017-08-08 15:38:54 +02:00
|
|
|
(not account.policy_keep_favourites or not post.favourite)
|
|
|
|
and (not account.policy_keep_media or not post.has_media)
|
|
|
|
))
|
2017-08-02 01:35:09 +02:00
|
|
|
if eligible:
|
|
|
|
if account.policy_delete_every == timedelta(0):
|
|
|
|
print("deleting all {} eligible posts for {}".format(len(eligible), account))
|
|
|
|
for post in eligible:
|
2017-08-14 20:58:22 +02:00
|
|
|
account.touch_delete()
|
2017-08-14 21:22:42 +02:00
|
|
|
lib.twitter.delete(post)
|
2017-08-02 01:35:09 +02:00
|
|
|
else:
|
2017-08-08 16:04:22 +02:00
|
|
|
post = random.choice(eligible)
|
2017-08-02 01:35:09 +02:00
|
|
|
print("deleting {}".format(post))
|
2017-08-03 11:51:37 +02:00
|
|
|
account.touch_delete()
|
2017-08-14 21:22:42 +02:00
|
|
|
lib.twitter.delete(post)
|
2017-08-02 01:35:09 +02:00
|
|
|
|
|
|
|
db.session.commit()
|
|
|
|
|
2017-08-03 21:37:00 +02:00
|
|
|
def refresh_posts(posts):
|
|
|
|
posts = list(posts)
|
|
|
|
if len(posts) == 0:
|
|
|
|
return []
|
|
|
|
|
|
|
|
if posts[0].service == 'twitter':
|
|
|
|
return lib.twitter.refresh_posts(posts)
|
|
|
|
|
2017-08-08 23:00:22 +02:00
|
|
|
@app.task(autoretry_for=(TwitterError, URLError))
|
2017-08-03 21:37:00 +02:00
|
|
|
def refresh_account(account_id):
|
|
|
|
account = Account.query.get(account_id)
|
|
|
|
|
2017-08-13 11:16:43 +02:00
|
|
|
posts = Post.query.with_parent(account).order_by(db.asc(Post.updated_at)).limit(100).all()
|
2017-08-03 21:37:00 +02:00
|
|
|
|
|
|
|
posts = refresh_posts(posts)
|
2017-08-12 23:07:16 +02:00
|
|
|
account.touch_refresh()
|
2017-08-03 21:37:00 +02:00
|
|
|
db.session.commit()
|
|
|
|
|
2017-08-08 23:00:22 +02:00
|
|
|
@app.task(autoretry_for=(TwitterError, URLError))
|
2017-08-03 21:37:00 +02:00
|
|
|
def refresh_account_with_oldest_post():
|
2017-08-13 17:39:05 +02:00
|
|
|
post = Post.query.outerjoin(Post.author).join(Account.tokens).group_by(Post).order_by(db.asc(Post.updated_at)).first()
|
2017-08-12 23:07:16 +02:00
|
|
|
refresh_account(post.author_id)
|
2017-08-03 21:37:00 +02:00
|
|
|
|
2017-08-12 23:22:22 +02:00
|
|
|
@app.task(autoretry_for=(TwitterError, URLError))
|
|
|
|
def refresh_account_with_longest_time_since_refresh():
|
2017-08-13 17:10:53 +02:00
|
|
|
acc = Account.query.join(Account.tokens).group_by(Account).order_by(db.asc(Account.last_refresh)).first()
|
2017-08-12 23:22:22 +02:00
|
|
|
refresh_account(acc.id)
|
|
|
|
|
|
|
|
|
2017-08-13 17:10:53 +02:00
|
|
|
app.add_periodic_task(6*60, periodic_cleanup)
|
2017-08-02 01:35:09 +02:00
|
|
|
app.add_periodic_task(45, queue_fetch_for_most_stale_accounts)
|
|
|
|
app.add_periodic_task(45, queue_deletes)
|
2017-08-12 23:22:22 +02:00
|
|
|
app.add_periodic_task(90, refresh_account_with_oldest_post)
|
|
|
|
app.add_periodic_task(90, refresh_account_with_longest_time_since_refresh)
|
2017-07-27 20:20:59 +02:00
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
|
app.worker_main()
|
2017-08-02 01:35:09 +02:00
|
|
|
|