forget-cancellare-vecchi-toot/tasks.py

268 lines
8.4 KiB
Python
Raw Normal View History

from celery import Celery, Task
2017-07-27 20:20:59 +02:00
from app import app as flaskapp
from app import db
2017-08-29 14:46:32 +02:00
from model import Session, Account, TwitterArchive, Post, OAuthToken,\
MastodonInstance
2017-07-28 12:48:00 +02:00
import lib.twitter
2017-08-19 13:11:16 +02:00
import lib.mastodon
from mastodon.Mastodon import MastodonRatelimitError
2017-07-28 12:48:00 +02:00
from twitter import TwitterError
from urllib.error import URLError
2017-08-29 14:46:32 +02:00
from datetime import timedelta
2017-07-31 00:07:34 +02:00
from zipfile import ZipFile
from io import BytesIO, TextIOWrapper
2017-07-31 01:57:03 +02:00
import json
2017-08-01 20:57:15 +02:00
from kombu import Queue
2017-08-02 01:35:09 +02:00
import random
2017-08-07 23:42:38 +02:00
import version
2017-07-27 20:20:59 +02:00
2017-08-29 14:46:32 +02:00
app = Celery('tasks', broker=flaskapp.config['CELERY_BROKER'],
task_serializer='pickle')
2017-08-01 20:57:15 +02:00
app.conf.task_queues = (
Queue('default', routing_key='celery'),
Queue('high_prio', routing_key='high'),
Queue('higher_prio', routing_key='higher'),
)
app.conf.task_default_queue = 'default'
app.conf.task_default_exchange = 'celery'
app.conf.task_default_exchange_type = 'direct'
2017-07-27 20:20:59 +02:00
2017-08-07 13:46:05 +02:00
if 'SENTRY_DSN' in flaskapp.config:
from raven import Client
from raven.contrib.celery import register_signal, register_logger_signal
2017-08-07 23:42:38 +02:00
sentry = Client(flaskapp.config['SENTRY_DSN'], release=version.version)
2017-08-07 13:46:05 +02:00
register_logger_signal(sentry)
register_signal(sentry)
class DBTask(Task):
def __call__(self, *args, **kwargs):
try:
super().__call__(*args, **kwargs)
finally:
db.session.close()
2017-08-29 14:46:32 +02:00
app.Task = DBTask
2017-08-29 14:46:32 +02:00
def noop(*args, **kwargs):
pass
@app.task(autoretry_for=(TwitterError, URLError, MastodonRatelimitError))
2017-08-29 21:27:38 +02:00
def fetch_acc(id_, cursor=None):
acc = Account.query.get(id_)
2017-07-29 12:01:32 +02:00
print(f'fetching {acc}')
try:
2017-08-29 14:46:32 +02:00
action = noop
if(acc.service == 'twitter'):
action = lib.twitter.fetch_acc
elif(acc.service == 'mastodon'):
action = lib.mastodon.fetch_acc
2017-08-19 13:11:16 +02:00
cursor = action(acc, cursor)
if cursor:
2017-08-29 21:27:38 +02:00
fetch_acc.si(id_, cursor).apply_async()
2017-07-29 12:01:32 +02:00
finally:
db.session.rollback()
acc.touch_fetch()
2017-07-29 12:01:32 +02:00
db.session.commit()
2017-08-29 14:46:32 +02:00
@app.task
2017-08-29 14:46:32 +02:00
def queue_fetch_for_most_stale_accounts(
2017-08-31 20:48:26 +02:00
min_staleness=timedelta(minutes=2), limit=20):
2017-07-28 12:48:00 +02:00
accs = Account.query\
.join(Account.tokens).group_by(Account)\
2017-07-28 12:48:00 +02:00
.filter(Account.last_fetch < db.func.now() - min_staleness)\
.order_by(db.asc(Account.last_fetch))\
.limit(limit)
for acc in accs:
fetch_acc.s(acc.id).delay()
2017-07-28 12:48:00 +02:00
db.session.commit()
2017-07-27 20:20:59 +02:00
2017-07-31 01:57:03 +02:00
@app.task(autoretry_for=(TwitterError, URLError))
2017-07-31 01:57:03 +02:00
def import_twitter_archive_month(archive_id, month_path):
ta = TwitterArchive.query.get(archive_id)
try:
with ZipFile(BytesIO(ta.body), 'r') as zipfile:
with TextIOWrapper(zipfile.open(month_path, 'r')) as f:
# seek past header
f.readline()
tweets = json.load(f)
for tweet in tweets:
post = lib.twitter.post_from_api_tweet_object(tweet)
existing_post = db.session.query(Post).get(post.id)
2017-08-29 14:46:32 +02:00
if post.author_id != ta.account_id or\
existing_post and existing_post.author_id != ta.account_id:
raise Exception("Shenanigans!")
2017-07-31 01:57:03 +02:00
post = db.session.merge(post)
ta.chunks_successful = TwitterArchive.chunks_successful + 1
db.session.commit()
except Exception as e:
db.session.rollback()
ta.chunks_failed = TwitterArchive.chunks_failed + 1
db.session.commit()
raise e
2017-07-27 20:20:59 +02:00
2017-07-31 02:00:07 +02:00
@app.task
def periodic_cleanup():
2017-08-08 04:16:57 +02:00
# delete sessions after 48 hours
2017-08-29 14:46:32 +02:00
(Session.query
.filter(Session.updated_at < (db.func.now() - timedelta(hours=48)))
.delete(synchronize_session=False))
2017-08-08 04:16:57 +02:00
# delete twitter archives after 3 days
2017-08-29 14:46:32 +02:00
(TwitterArchive.query
.filter(TwitterArchive.updated_at < (db.func.now() - timedelta(days=3)))
.delete(synchronize_session=False))
2017-07-31 02:00:07 +02:00
2017-08-08 04:16:57 +02:00
# delete anonymous oauth tokens after 1 day
2017-08-29 14:46:32 +02:00
(OAuthToken.query
.filter(OAuthToken.updated_at < (db.func.now() - timedelta(days=1)))
.filter(OAuthToken.account_id == None) # noqa: E711
.delete(synchronize_session=False))
2017-08-13 17:10:53 +02:00
# disable users with no tokens
2017-08-29 14:46:32 +02:00
unreachable = (
Account.query
.outerjoin(Account.tokens)
.group_by(Account).having(db.func.count(OAuthToken.token) == 0)
.filter(Account.policy_enabled == True)) # noqa: E712
2017-08-13 17:10:53 +02:00
for account in unreachable:
account.policy_enabled = False
# normalise mastodon instance popularity scores
2017-08-29 14:46:32 +02:00
biggest_instance = (
MastodonInstance.query
.order_by(db.desc(MastodonInstance.popularity)).first())
if biggest_instance.popularity > 40:
2017-08-29 14:46:32 +02:00
MastodonInstance.query.update({
MastodonInstance.popularity:
MastodonInstance.popularity * 40 / biggest_instance.popularity
})
2017-08-08 04:16:57 +02:00
db.session.commit()
2017-08-02 01:35:09 +02:00
2017-08-29 14:46:32 +02:00
2017-08-02 01:35:09 +02:00
@app.task
def queue_deletes():
2017-08-29 14:46:32 +02:00
eligible_accounts = (
Account.query.filter(Account.policy_enabled == True) # noqa: E712
.filter(Account.next_delete < db.func.now()))
2017-08-02 01:35:09 +02:00
for account in eligible_accounts:
delete_from_account.s(account.id).apply_async()
2017-08-29 14:46:32 +02:00
@app.task(autoretry_for=(TwitterError, URLError, MastodonRatelimitError))
2017-08-02 01:35:09 +02:00
def delete_from_account(account_id):
account = Account.query.get(account_id)
2017-08-29 14:46:32 +02:00
latest_n_posts = (Post.query.with_parent(account)
.order_by(db.desc(Post.created_at))
.limit(account.policy_keep_latest)
.cte(name='latest'))
2017-08-29 14:46:32 +02:00
posts = (
Post.query.with_parent(account)
.filter(
Post.created_at + account.policy_keep_younger <= db.func.now())
.filter(~Post.id.in_(db.select((latest_n_posts.c.id,))))
2017-08-29 14:46:32 +02:00
.order_by(db.func.random())
.limit(100).with_for_update().all())
2017-08-02 01:35:09 +02:00
to_delete = None
2017-08-29 14:46:32 +02:00
action = noop
2017-08-02 01:35:09 +02:00
if account.service == 'twitter':
2017-08-19 13:11:16 +02:00
action = lib.twitter.delete
posts = refresh_posts(posts)
if posts:
2017-08-29 21:27:38 +02:00
eligible = list(( # nosec
post for post in posts if
(not account.policy_keep_favourites or not post.favourite)
and (not account.policy_keep_media or not post.has_media)
))
if eligible:
to_delete = random.choice(eligible)
2017-08-19 13:11:16 +02:00
elif account.service == 'mastodon':
action = lib.mastodon.delete
for post in posts:
refreshed = refresh_posts((post,))
2017-08-19 15:17:22 +02:00
if refreshed and \
2017-08-29 14:46:32 +02:00
(not account.policy_keep_favourites or not post.favourite) \
and (not account.policy_keep_media or not post.has_media)\
and (not account.policy_keep_direct or not post.direct):
to_delete = refreshed[0]
break
2017-08-19 13:11:16 +02:00
if to_delete:
print("deleting {}".format(to_delete))
account.touch_delete()
action(to_delete)
2017-08-02 01:35:09 +02:00
db.session.commit()
2017-08-29 14:46:32 +02:00
2017-08-03 21:37:00 +02:00
def refresh_posts(posts):
posts = list(posts)
if len(posts) == 0:
return []
if posts[0].service == 'twitter':
return lib.twitter.refresh_posts(posts)
2017-08-19 13:11:16 +02:00
elif posts[0].service == 'mastodon':
return lib.mastodon.refresh_posts(posts)
2017-08-03 21:37:00 +02:00
2017-08-29 14:46:32 +02:00
@app.task(autoretry_for=(TwitterError, URLError),
throws=(MastodonRatelimitError))
2017-08-03 21:37:00 +02:00
def refresh_account(account_id):
account = Account.query.get(account_id)
limit = 100
if account.service == 'mastodon':
limit = 5
2017-08-29 14:46:32 +02:00
posts = (Post.query.with_parent(account)
.order_by(db.asc(Post.updated_at)).limit(limit).all())
2017-08-03 21:37:00 +02:00
posts = refresh_posts(posts)
account.touch_refresh()
2017-08-03 21:37:00 +02:00
db.session.commit()
2017-08-29 14:46:32 +02:00
@app.task(autoretry_for=(TwitterError, URLError),
throws=(MastodonRatelimitError))
2017-08-03 21:37:00 +02:00
def refresh_account_with_oldest_post():
2017-08-29 14:46:32 +02:00
post = (Post.query.outerjoin(Post.author).join(Account.tokens)
.group_by(Post).order_by(db.asc(Post.updated_at)).first())
refresh_account(post.author_id)
2017-08-03 21:37:00 +02:00
2017-08-29 14:46:32 +02:00
@app.task(autoretry_for=(TwitterError, URLError),
throws=(MastodonRatelimitError))
def refresh_account_with_longest_time_since_refresh():
2017-08-29 14:46:32 +02:00
acc = (Account.query.join(Account.tokens).group_by(Account)
.order_by(db.asc(Account.last_refresh)).first())
refresh_account(acc.id)
2017-08-13 17:10:53 +02:00
app.add_periodic_task(6*60, periodic_cleanup)
2017-08-02 01:35:09 +02:00
app.add_periodic_task(45, queue_fetch_for_most_stale_accounts)
app.add_periodic_task(45, queue_deletes)
app.add_periodic_task(90, refresh_account_with_oldest_post)
app.add_periodic_task(90, refresh_account_with_longest_time_since_refresh)
2017-07-27 20:20:59 +02:00
if __name__ == '__main__':
app.worker_main()