forget-cancellare-vecchi-toot/tasks.py

299 lines
9.3 KiB
Python
Raw Normal View History

from celery import Celery, Task
2017-07-27 20:20:59 +02:00
from app import app as flaskapp
from app import db
2017-08-29 14:46:32 +02:00
from model import Session, Account, TwitterArchive, Post, OAuthToken,\
MastodonInstance
2017-07-28 12:48:00 +02:00
import lib.twitter
2017-08-19 13:11:16 +02:00
import lib.mastodon
2017-08-29 14:46:32 +02:00
from datetime import timedelta
2017-07-31 00:07:34 +02:00
from zipfile import ZipFile
from io import BytesIO, TextIOWrapper
2017-07-31 01:57:03 +02:00
import json
2017-08-01 20:57:15 +02:00
from kombu import Queue
2017-08-02 01:35:09 +02:00
import random
2017-08-07 23:42:38 +02:00
import version
from lib.exceptions import PermanentError, TemporaryError
2017-07-27 20:20:59 +02:00
2017-08-29 14:46:32 +02:00
# Celery application shared by every task in this module. The broker URL is
# read from the Flask app's configuration.
# NOTE(review): the pickle serializer will execute arbitrary code when
# deserialising a crafted message, so the broker must only be reachable by
# trusted producers.
app = Celery(
    'tasks',
    broker=flaskapp.config['CELERY_BROKER'],
    task_serializer='pickle',
)

# Three queues, each bound with its own routing key; tasks that do not ask
# for a specific queue go to 'default'.
app.conf.task_queues = (
    Queue('default', routing_key='celery'),
    Queue('high_prio', routing_key='high'),
    Queue('higher_prio', routing_key='higher'),
)
app.conf.task_default_queue = 'default'
app.conf.task_default_exchange = 'celery'
app.conf.task_default_exchange_type = 'direct'
2017-07-27 20:20:59 +02:00
2017-09-05 13:01:33 +02:00
# Optional error reporting: `sentry` stays None unless a SENTRY_DSN is
# configured, so callers must check it before use.
sentry = None
if 'SENTRY_DSN' in flaskapp.config:
    # Imported lazily so raven is only required when Sentry is configured.
    from raven import Client
    from raven.contrib.celery import register_signal, register_logger_signal

    sentry = Client(flaskapp.config['SENTRY_DSN'], release=version.version)
    register_logger_signal(sentry)
    register_signal(sentry)
class DBTask(Task):
    """Base task class that closes the database session after each run."""

    def __call__(self, *args, **kwargs):
        """Run the task and always close ``db.session`` afterwards.

        Returns whatever the wrapped task returns, so that task results
        are not silently replaced by ``None``.
        """
        try:
            return super().__call__(*args, **kwargs)
        finally:
            # Runs on success and on failure alike.
            db.session.close()


# Make every task created through ``app.task`` use DBTask as its base class.
app.Task = DBTask
2017-08-29 14:46:32 +02:00
def noop(*args, **kwargs):
    """Accept any arguments, do nothing and return ``None``.

    Used as the fallback action when an account's service is not one of
    the services handled by this module.
    """
    return None
def make_dormant(acc):
    """Mark ``acc`` as dormant, record the reason and commit.

    The reason text is stored on the account (``acc.reason``) with the
    account's service name substituted for ``{service}``.
    """
    # The literal's lines are kept at column 0, exactly as they appear in
    # the source; only the stray timestamp line has been removed from it.
    acc.reason = '''
Your account was temporarily disabled because your {service}
account was suspended or otherwise inaccessible. By logging into
it, you have reactivated your account, but be aware that some posts
may be missing from Forget's database, and it may take some time to
get back in sync.
'''.format(service=acc.service)
    acc.dormant = True
    db.session.commit()
@app.task(autoretry_for=(TemporaryError,))
def fetch_acc(id_, cursor=None):
    """Fetch posts for the account with primary key ``id_``.

    ``cursor`` is handed to the service-specific fetch function. If that
    function returns a truthy value, a follow-up ``fetch_acc`` task is
    queued with it as the new cursor.

    On ``PermanentError`` the account is made dormant (and the exception is
    reported to Sentry when configured). The account's fetch timestamp is
    touched and committed whether or not the fetch succeeded.
    """
    acc = Account.query.get(id_)
    print(f'fetching {acc}')
    try:
        # Pick the fetcher for this account's service; unknown services
        # fall back to a no-op.
        if acc.service == 'twitter':
            fetcher = lib.twitter.fetch_acc
        elif acc.service == 'mastodon':
            fetcher = lib.mastodon.fetch_acc
        else:
            fetcher = noop

        next_cursor = fetcher(acc, cursor)
        if next_cursor:
            fetch_acc.si(id_, next_cursor).apply_async()
    except PermanentError:
        # Discard any partial work before flagging the account.
        db.session.rollback()
        make_dormant(acc)
        if sentry:
            sentry.captureException()
    finally:
        db.session.rollback()
        acc.touch_fetch()
        db.session.commit()
2017-08-29 14:46:32 +02:00
@app.task()
def import_twitter_archive_month(archive_id, month_path):
    """Import one month file from a stored Twitter archive.

    Reads ``month_path`` from the zip stored in the ``TwitterArchive`` row
    ``archive_id``, merges each tweet into the session as a ``Post`` and
    increments the archive's ``chunks_successful`` counter. On any error
    the session is rolled back, ``chunks_failed`` is incremented and the
    exception is re-raised.
    """
    archive = TwitterArchive.query.get(archive_id)
    try:
        with ZipFile(BytesIO(archive.body), 'r') as zipfile:
            with TextIOWrapper(zipfile.open(month_path, 'r')) as month_file:
                # seek past header
                month_file.readline()
                tweets = json.load(month_file)

        for tweet in tweets:
            post = lib.twitter.post_from_api_tweet_object(tweet)
            existing_post = db.session.query(Post).get(post.id)

            # Reject tweets that belong to another account, whether the
            # mismatch is in the archive or in an already-stored post.
            wrong_author = post.author_id != archive.account_id
            wrong_existing_author = (
                existing_post
                and existing_post.author_id != archive.account_id)
            if wrong_author or wrong_existing_author:
                raise Exception("Shenanigans!")

            db.session.merge(post)

        # Column expression, so the increment happens in the database.
        archive.chunks_successful = TwitterArchive.chunks_successful + 1
        db.session.commit()
    except Exception:
        db.session.rollback()
        archive.chunks_failed = TwitterArchive.chunks_failed + 1
        db.session.commit()
        raise
2017-07-27 20:20:59 +02:00
@app.task(autoretry_for=(TemporaryError,))
def delete_from_account(account_id):
    """Delete at most one expired post from the given account.

    Candidates are up to 100 randomly ordered posts of the account that are
    older than ``policy_keep_younger`` and are not among the newest
    ``policy_keep_latest`` posts; the rows are selected ``FOR UPDATE``.

    * twitter: all candidates are refreshed in one call and one eligible
      post is picked at random.
    * mastodon: candidates are refreshed one at a time and the first
      eligible one is picked.

    Eligibility honours the account's keep-favourites and keep-media
    policies (and keep-direct for mastodon). The checks are made against
    the refreshed post, i.e. the same object that is then deleted.
    """
    account = Account.query.get(account_id)

    latest_n_posts = (Post.query.with_parent(account)
                      .order_by(db.desc(Post.created_at))
                      .limit(account.policy_keep_latest)
                      .cte(name='latest'))

    posts = (
        Post.query.with_parent(account)
        .filter(
            Post.created_at + account.policy_keep_younger <= db.func.now())
        .filter(~Post.id.in_(db.select((latest_n_posts.c.id,))))
        .order_by(db.func.random())
        .limit(100).with_for_update().all())

    def is_eligible(post, check_direct):
        """Return True unless a keep-policy protects ``post``."""
        if account.policy_keep_favourites and post.favourite:
            return False
        if account.policy_keep_media and post.has_media:
            return False
        if check_direct and account.policy_keep_direct and post.direct:
            return False
        return True

    to_delete = None
    action = noop

    if account.service == 'twitter':
        action = lib.twitter.delete
        refreshed = refresh_posts(posts)
        if refreshed:
            eligible = [
                post for post in refreshed
                if is_eligible(post, check_direct=False)]
            if eligible:
                to_delete = random.choice(eligible)  # nosec
    elif account.service == 'mastodon':
        action = lib.mastodon.delete
        for post in posts:
            refreshed = refresh_posts((post,))
            # Judge the refreshed copy, not the possibly stale row that
            # was loaded before the refresh.
            if refreshed and is_eligible(refreshed[0], check_direct=True):
                to_delete = refreshed[0]
                break

    if to_delete:
        print("deleting {}".format(to_delete))
        account.touch_delete()
        action(to_delete)

    db.session.commit()
2017-08-29 14:46:32 +02:00
2017-08-03 21:37:00 +02:00
def refresh_posts(posts):
    """Refresh ``posts`` through the service of the first post.

    ``posts`` may be any iterable; it is materialised into a list first.
    The service is taken from the first post, and the whole list is
    passed to that service's ``refresh_posts``.

    Returns the service's result, or an empty list when ``posts`` is empty
    or the service is not recognised, so the result is never ``None``.
    """
    posts = list(posts)
    if not posts:
        return []

    service = posts[0].service
    if service == 'twitter':
        return lib.twitter.refresh_posts(posts)
    if service == 'mastodon':
        return lib.mastodon.refresh_posts(posts)
    # Unknown service: nothing can be refreshed.
    return []
2017-08-03 21:37:00 +02:00
2017-08-29 14:46:32 +02:00
@app.task(autoretry_for=(TemporaryError,))
def refresh_account(account_id):
    """Refresh the least recently updated posts of one account.

    Refreshes up to 100 posts (5 for mastodon accounts), oldest
    ``updated_at`` first, then touches the account's refresh timestamp and
    commits. On ``PermanentError`` the session is rolled back, the account
    is made dormant and the exception is reported to Sentry when
    configured.
    """
    account = Account.query.get(account_id)

    try:
        batch_size = 5 if account.service == 'mastodon' else 100
        stale_posts = (Post.query.with_parent(account)
                       .order_by(db.asc(Post.updated_at))
                       .limit(batch_size)
                       .all())

        refresh_posts(stale_posts)
        account.touch_refresh()
        db.session.commit()
    except PermanentError:
        db.session.rollback()
        make_dormant(account)
        if sentry:
            sentry.captureException()
@app.task
def periodic_cleanup():
    """Purge expired rows and normalise derived data, then commit.

    Steps, in order:

    1. delete sessions not updated for 48 hours;
    2. delete Twitter archives not updated for 3 days;
    3. delete OAuth tokens with no account that are older than 1 day;
    4. disable and log out enabled accounts that have no tokens left;
    5. rescale Mastodon instance popularity so the maximum is 40.

    Step 5 is skipped when there are no Mastodon instances, so an empty
    table no longer prevents the other steps from being committed.
    """
    # delete sessions after 48 hours
    (Session.query
     .filter(Session.updated_at < (db.func.now() - timedelta(hours=48)))
     .delete(synchronize_session=False))

    # delete twitter archives after 3 days
    (TwitterArchive.query
     .filter(TwitterArchive.updated_at < (db.func.now() - timedelta(days=3)))
     .delete(synchronize_session=False))

    # delete anonymous oauth tokens after 1 day
    (OAuthToken.query
     .filter(OAuthToken.updated_at < (db.func.now() - timedelta(days=1)))
     .filter(OAuthToken.account_id == None)  # noqa: E711
     .delete(synchronize_session=False))

    # disable and log out users with no tokens
    unreachable = (
        Account.query
        .outerjoin(Account.tokens)
        .group_by(Account).having(db.func.count(OAuthToken.token) == 0)
        .filter(Account.policy_enabled == True))  # noqa: E712
    for account in unreachable:
        account.force_log_out()
        account.policy_enabled = False
        account.reason = """
Your account was disabled because Forget no longer had access to
your {service} account. Perhaps you had revoked it? By logging in,
you have restored access and you can now re-enable Forget if you wish.
""".format(service=account.service.capitalize())

    # normalise mastodon instance popularity scores
    biggest_instance = (
        MastodonInstance.query
        .order_by(db.desc(MastodonInstance.popularity)).first())
    # .first() yields None when the table is empty.
    if biggest_instance is not None and biggest_instance.popularity > 40:
        MastodonInstance.query.update({
            MastodonInstance.popularity:
                MastodonInstance.popularity * 40 / biggest_instance.popularity
        })

    db.session.commit()
2017-08-29 14:46:32 +02:00
@app.task
def queue_fetch_for_most_stale_accounts(
        min_staleness=timedelta(minutes=2), limit=20):
    """Queue a ``fetch_acc`` task for the accounts fetched longest ago.

    Only non-dormant accounts that have at least one token and whose last
    fetch is older than ``min_staleness`` are considered; at most ``limit``
    of them are queued, oldest fetch first.
    """
    cutoff = db.func.now() - min_staleness
    stale_accounts = (
        Account.query
        .join(Account.tokens)
        .group_by(Account)
        .filter(Account.last_fetch < cutoff)
        .filter(~Account.dormant)
        .order_by(db.asc(Account.last_fetch))
        .limit(limit)
    )
    for stale in stale_accounts:
        fetch_acc.s(stale.id).delay()
    db.session.commit()
@app.task
def queue_deletes():
    """Queue ``delete_from_account`` for every account due for a delete.

    An account is due when its policy is enabled, its ``next_delete`` time
    has passed and it is not dormant.
    """
    due_accounts = (
        Account.query
        .filter(Account.policy_enabled == True)  # noqa: E712
        .filter(Account.next_delete < db.func.now())
        .filter(~Account.dormant)
    )
    for due in due_accounts:
        delete_from_account.s(due.id).apply_async()
@app.task
def refresh_account_with_oldest_post():
    """Refresh the account that owns the least recently updated post.

    Only posts whose author has at least one token and is not dormant are
    considered. Does nothing when no such post exists.
    """
    post = (Post.query.outerjoin(Post.author).join(Account.tokens)
            .filter(~Account.dormant)
            .group_by(Post).order_by(db.asc(Post.updated_at)).first())
    # .first() yields None when the query matches nothing.
    if post is None:
        return
    refresh_account(post.author_id)
2017-08-03 21:37:00 +02:00
2017-08-29 14:46:32 +02:00
@app.task
def refresh_account_with_longest_time_since_refresh():
    """Refresh the non-dormant account with the oldest ``last_refresh``.

    Only accounts with at least one token are considered. Does nothing
    when no such account exists.
    """
    acc = (Account.query.join(Account.tokens).group_by(Account)
           .filter(~Account.dormant)
           .order_by(db.asc(Account.last_refresh)).first())
    # .first() yields None when the query matches nothing.
    if acc is None:
        return
    refresh_account(acc.id)
# Periodic schedule. The first argument is the interval between runs
# (a plain number is interpreted by Celery as seconds).
app.add_periodic_task(120, periodic_cleanup)
app.add_periodic_task(40, queue_fetch_for_most_stale_accounts)
app.add_periodic_task(15, queue_deletes)
app.add_periodic_task(60, refresh_account_with_oldest_post)
app.add_periodic_task(180, refresh_account_with_longest_time_since_refresh)
2017-07-27 20:20:59 +02:00
# Running this module directly starts a Celery worker for `app`.
if __name__ == '__main__':
    app.worker_main()