487 lines
16 KiB
Python
487 lines
16 KiB
Python
from celery import Celery, Task
|
|
from app import app as flaskapp
|
|
from app import db
|
|
from model import Session, Account, TwitterArchive, Post, OAuthToken,\
|
|
MastodonInstance
|
|
import libforget.twitter
|
|
import libforget.mastodon
|
|
from datetime import timedelta, datetime, timezone
|
|
from time import time
|
|
from zipfile import ZipFile
|
|
from io import BytesIO, TextIOWrapper
|
|
import json
|
|
from kombu import Queue
|
|
import version
|
|
from libforget.exceptions import PermanentError, TemporaryError
|
|
import redis
|
|
from functools import wraps
|
|
import pickle
|
|
import logging
|
|
|
|
# Celery application that runs the background jobs; the broker URL is read
# from the Flask app's configuration.
app = Celery(
    'tasks',
    broker=flaskapp.config['CELERY_BROKER'],
    task_serializer='pickle',
    task_soft_time_limit=600,
    task_time_limit=1200,
)

# Queue layout: three queues, with 'default' used for anything that is not
# routed explicitly.
app.conf.update(
    task_queues=(
        Queue('default', routing_key='celery'),
        Queue('high_prio', routing_key='high'),
        Queue('higher_prio', routing_key='higher'),
    ),
    task_default_queue='default',
    task_default_exchange='celery',
    task_default_exchange_type='direct',
)
|
|
|
|
# Optional Sentry client; stays None unless SENTRY_DSN is configured.
sentry = None

if 'SENTRY_DSN' in flaskapp.config:
    # raven is only imported when Sentry reporting is actually configured
    from raven import Client
    from raven.contrib.celery import register_signal, register_logger_signal

    _sentry_dsn = flaskapp.config['SENTRY_DSN']
    _release = version.get_versions()['version']
    sentry = Client(_sentry_dsn, release=_release)

    register_logger_signal(sentry)
    register_signal(sentry)
|
|
|
|
|
|
class DBTask(Task):
    """Task base class that closes the database session after every run."""

    def __call__(self, *args, **kwargs):
        """Run the task body and return its result.

        The session is closed in a ``finally`` clause, so it is released
        whether the task body returns normally or raises.
        """
        try:
            # propagate the task's return value instead of discarding it
            return super().__call__(*args, **kwargs)
        finally:
            db.session.close()


# Use DBTask as the base class for tasks created on this app.
app.Task = DBTask
|
|
|
|
# Module-wide Redis client, created the first time ``unique`` decorates a
# function.
r = None


def unique(fun):
    """Decorate ``fun`` so that identical concurrent calls are skipped.

    A Redis key derived from the function name and the pickled arguments is
    used as a lock. If the key cannot be set because it already exists, the
    call is skipped and ``None`` is returned. Otherwise ``fun`` runs and the
    key is deleted afterwards, even if ``fun`` raises. The key also expires
    on its own after five minutes.
    """
    global r
    if not r:
        r = redis.StrictRedis.from_url(flaskapp.config['REDIS_URI'])

    @wraps(fun)
    def wrapper(*args, **kwargs):
        call = (fun.__name__, args, kwargs)
        logging.info('Checking for dupes for unique task %s', call)
        key = 'celery_unique_lock:epoch1:{}:{}'.format(
            fun.__name__, pickle.dumps((args, kwargs)))

        # nx=True: only set the key if nobody else currently holds it
        if not r.set(key, 1, nx=True, ex=60 * 5):
            logging.info('Unique task has a dupe, skipping %s', call)
            return None

        logging.info('No dupes for unique, running task %s', call)
        try:
            return fun(*args, **kwargs)
        finally:
            r.delete(key)

    return wrapper
|
|
|
|
|
|
def noop(*args, **kwargs):
    """Accept any arguments, do nothing, and return None."""
    return None
|
|
|
|
|
|
def make_dormant(acc):
    """Flag ``acc`` as dormant and store the explanation shown to its owner.

    Sets ``acc.reason`` to a message naming ``acc.service`` and sets
    ``acc.dormant`` to True. Nothing is committed here.
    """
    notice = '''
        Your account was temporarily disabled because your {service}
        account was suspended or otherwise inaccessible. By logging into
        it, you have reactivated your account, but be aware that some posts
        may be missing from Forget's database, and it may take some time to
        get back in sync.
    '''
    acc.reason = notice.format(service=acc.service)
    acc.dormant = True
|
|
|
|
|
|
@app.task(autoretry_for=(TemporaryError, ))
@unique
def fetch_acc(id_):
    """Fetch one page of posts for an account and store it.

    While the account's history is incomplete, the page is requested going
    backwards from the oldest stored post. Once it is complete, the page is
    requested between the recorded batch end and the oldest post already
    stored after it. A page holding nothing but the boundary posts ends the
    historic fetch / current batch and records a new batch end.

    A TemporaryError makes the account back off and a PermanentError marks it
    dormant. In every case the attempt is recorded with ``touch_fetch()`` and
    committed.

    NOTE(review): TemporaryError is handled inside this function, so
    ``autoretry_for`` presumably never sees it -- confirm this is intended.
    """
    account = Account.query.get(id_)
    if account is None:
        # The account is gone; there is nothing to fetch and nothing to touch.
        logging.warning('fetch_acc: no account with id %s, skipping', id_)
        return

    print("Fetching {}".format(account))
    try:
        if not account.fetch_history_complete:
            oldest = (db.session.query(Post)
                      .with_parent(account, 'posts')
                      .order_by(db.asc(Post.created_at))
                      .first())
            # ^ None if this is our first fetch ever, otherwise the oldest post
            max_id = oldest.remote_id if oldest else None
            since_id = None
        elif account.fetch_current_batch_end:
            batch_end = account.fetch_current_batch_end
            oldest = (db.session.query(Post)
                      .with_parent(account, 'posts')
                      .filter(Post.created_at > batch_end.created_at)
                      .order_by(db.asc(Post.created_at))
                      .first())
            # ^ None if this is our first fetch of this batch,
            #   otherwise the oldest post of this batch
            max_id = oldest.remote_id if oldest else None
            since_id = batch_end.remote_id
        else:
            # we shouldn't get here unless the user had no posts on the
            # service last time we fetched
            max_id = None
            latest = (db.session.query(Post)
                      .with_parent(account, 'posts')
                      .order_by(db.desc(Post.created_at))
                      .limit(1)
                      .scalar())
            # ^ should be None considering the user has no posts
            #   will be the latest post in the off chance that something
            #   goes weird
            since_id = latest.remote_id if latest else None

        fetch_posts = noop
        if account.service == 'twitter':
            fetch_posts = libforget.twitter.fetch_posts
        elif account.service == 'mastodon':
            fetch_posts = libforget.mastodon.fetch_posts
        posts = fetch_posts(account, max_id, since_id)

        if posts is None:
            # ???
            raise TemporaryError("Fetching posts went horribly wrong")

        new_posts = [post for post in posts
                     if post.remote_id not in (max_id, since_id)]
        if not new_posts:
            # if there are no posts other than the edges
            # we either finished the historic fetch
            # or we finished the current batch
            account.fetch_history_complete = True
            batch_end = (Post.query.with_parent(account, 'posts').order_by(
                db.desc(Post.created_at)).first())
            account.fetch_current_batch_end_id = (
                batch_end.id if batch_end else None)
            db.session.commit()
        else:
            for post in posts:
                db.session.merge(post)
            db.session.commit()

        if not account.fetch_history_complete:
            # reschedule immediately if we're still doing the historic fetch
            print("{} is not done fetching history, resheduling.".format(account))
            fetch_acc.apply_async((id_,), countdown=1)

    except TemporaryError:
        db.session.rollback()
        account.backoff()
    except PermanentError:
        db.session.rollback()
        make_dormant(account)
        if sentry:
            sentry.captureException()
    except BaseException:
        # Discard partial work from an unexpected failure so that the
        # bookkeeping commit below does not persist it, then re-raise.
        db.session.rollback()
        raise
    finally:
        # No rollback here: it would throw away the backoff / dormant state
        # that the handlers above have just set on the account.
        account.touch_fetch()
        db.session.commit()
|
|
|
|
|
|
@app.task()
def import_twitter_archive_month(archive_id, month_path):
    """Import the tweets of one monthly file from a stored Twitter archive.

    ``archive_id`` identifies the TwitterArchive row whose ``body`` holds the
    zip file; ``month_path`` is the path of the monthly file inside that zip.
    Every tweet in the file is merged into the session as a Post. On success
    the archive's ``chunks_successful`` counter is incremented; on any error
    the work is rolled back, ``chunks_failed`` is incremented and the error
    is re-raised.

    Raises:
        Exception: if a tweet, or an existing post with the same id, belongs
            to an account other than the archive's owner.
    """
    ta = TwitterArchive.query.get(archive_id)

    try:
        with ZipFile(BytesIO(ta.body), 'r') as zipfile:
            # Decode explicitly as UTF-8 instead of relying on the worker's
            # locale, which may not be able to decode non-ASCII tweets.
            with TextIOWrapper(zipfile.open(month_path, 'r'),
                               encoding='utf-8') as f:

                # seek past header
                f.readline()

                tweets = json.load(f)

        for tweet in tweets:
            post = libforget.twitter.post_from_api_tweet_object(tweet)
            existing_post = db.session.query(Post).get(post.id)

            # refuse tweets that are not the archive owner's, and refuse to
            # overwrite a stored post that belongs to somebody else
            if post.author_id != ta.account_id or (
                    existing_post
                    and existing_post.author_id != ta.account_id):
                raise Exception("Shenanigans!")

            db.session.merge(post)

        # column expression, so the increment happens in the UPDATE statement
        ta.chunks_successful = TwitterArchive.chunks_successful + 1
        db.session.commit()

    except Exception:
        db.session.rollback()
        ta.chunks_failed = TwitterArchive.chunks_failed + 1
        db.session.commit()
        raise
|
|
|
|
|
|
@app.task()
@unique
def delete_from_account(account_id):
    """Delete at most one post from an account, according to its policy.

    Does nothing if the account's ``next_delete`` is still in the future.
    Otherwise a random sample of posts is selected that are older than
    ``policy_keep_younger``, are not among the latest ``policy_keep_latest``
    posts, and pass the favourite / media / direct policies. The sample is
    refreshed from the service and the first post that is still eligible is
    deleted. If no post qualifies, ``next_delete`` is set three minutes ahead.
    A TemporaryError rolls back and makes the account back off.
    """
    account = Account.query.get(account_id)
    if account.next_delete > datetime.now(timezone.utc):
        return

    # the newest N posts are always kept
    latest_n_posts = (
        Post.query.with_parent(account, 'posts')
        .order_by(db.desc(Post.created_at))
        .limit(account.policy_keep_latest)
        .cte(name='latest'))
    candidates = (
        Post.query.with_parent(account, 'posts')
        .filter(Post.created_at + account.policy_keep_younger <= db.func.now())
        .filter(~Post.id.in_(db.select((latest_n_posts.c.id, )))))

    if account.policy_keep_favourites != 'none':
        wanted_favourite = account.policy_keep_favourites == 'deleteonly'
        candidates = candidates.filter(db.or_(
            Post.favourite == wanted_favourite,
            Post.is_reblog))
    if account.policy_keep_media != 'none':
        wanted_media = account.policy_keep_media == 'deleteonly'
        candidates = candidates.filter(db.or_(
            Post.has_media == wanted_media,
            Post.is_reblog))
    if account.policy_keep_direct:
        candidates = candidates.filter(~Post.direct)

    sample_size = 10 if account.service == 'mastodon' else 100
    posts = candidates.order_by(db.func.random()).limit(sample_size).all()

    to_delete = None

    def is_eligible(post):
        # Same policy as the query above, re-checked against refreshed data.
        if post.is_reblog:
            return True

        favourites = account.policy_keep_favourites
        if not (favourites == 'none'
                or (favourites == 'keeponly' and not post.favourite)
                or (favourites == 'deleteonly' and post.favourite)):
            return False

        media = account.policy_keep_media
        if not (media == 'none'
                or (media == 'keeponly' and not post.has_media)
                or (media == 'deleteonly' and post.has_media)):
            return False

        return not account.policy_keep_direct or not post.direct

    try:
        action = noop
        if account.service == 'twitter':
            action = libforget.twitter.delete
            # refresh the whole sample in one call
            posts = refresh_posts(posts)
            to_delete = next(
                (post for post in posts if is_eligible(post)), None)
        elif account.service == 'mastodon':
            action = libforget.mastodon.delete
            # refresh one post at a time and stop at the first eligible one
            for post in posts:
                refreshed = refresh_posts((post, ))
                if refreshed and is_eligible(refreshed[0]):
                    to_delete = refreshed[0]
                    break

        if to_delete:
            print("Deleting {}".format(to_delete))
            account.touch_delete()
            action(to_delete)
            account.reset_backoff()
        else:
            account.next_delete = db.func.now() + timedelta(minutes=3)

    except TemporaryError:
        db.session.rollback()
        account.backoff()

    finally:
        db.session.commit()
|
|
|
|
|
|
def refresh_posts(posts):
    """Refresh ``posts`` through the client library of their service.

    The service is read from the first post, so all posts are assumed to
    belong to the same service. ``posts`` may be any iterable.

    Returns:
        Whatever the service-specific ``refresh_posts`` returns, or an empty
        list when there are no posts or the service is not recognised.
    """
    posts = list(posts)
    if not posts:
        return []

    service = posts[0].service
    if service == 'twitter':
        return libforget.twitter.refresh_posts(posts)
    if service == 'mastodon':
        return libforget.mastodon.refresh_posts(posts)

    # Unknown service: return an empty list rather than an implicit None so
    # that callers always get a sequence back.
    return []
|
|
|
|
|
|
@app.task()
@unique
def refresh_account(account_id):
    """Refresh the least recently updated posts of one account.

    Up to 100 posts are refreshed (3 for mastodon accounts), ordered by
    ascending ``updated_at``. On success the refresh is recorded and the
    account's backoff is reset. A TemporaryError makes the account back off,
    a PermanentError marks it dormant, and any other error backs off, commits
    and is re-raised. The session is always committed at the end.
    """
    account = Account.query.get(account_id)

    print("Refreshing account {}".format(account))

    try:
        batch_size = 3 if account.service == 'mastodon' else 100
        stalest = (
            Post.query.with_parent(account, 'posts')
            .order_by(db.asc(Post.updated_at))
            .limit(batch_size)
            .all())

        refresh_posts(stalest)
        account.touch_refresh()
        account.reset_backoff()
    except TemporaryError:
        db.session.rollback()
        account.backoff()
    except PermanentError:
        db.session.rollback()
        make_dormant(account)
        if sentry:
            sentry.captureException()
    except Exception:
        db.session.rollback()
        account.backoff()
        db.session.commit()
        raise
    finally:
        db.session.commit()
|
|
|
|
|
|
@app.task
@unique
def periodic_cleanup():
    """Purge expired rows and disable accounts that have no tokens left.

    Deletes sessions not updated for 48 hours, twitter archives not updated
    for 3 days, and account-less OAuth tokens not updated for 1 day. Enabled
    accounts without any OAuth token are logged out, disabled and given an
    explanatory ``reason``. Everything is committed at the end.
    """
    # delete sessions after 48 hours
    session_cutoff = db.func.now() - timedelta(hours=48)
    (Session.query
     .filter(Session.updated_at < session_cutoff)
     .delete(synchronize_session=False))

    # delete twitter archives after 3 days
    archive_cutoff = db.func.now() - timedelta(days=3)
    (TwitterArchive.query
     .filter(TwitterArchive.updated_at < archive_cutoff)
     .delete(synchronize_session=False))

    # delete anonymous oauth tokens after 1 day
    token_cutoff = db.func.now() - timedelta(days=1)
    (OAuthToken.query
     .filter(OAuthToken.updated_at < token_cutoff)
     .filter(OAuthToken.account_id == None)  # noqa: E711
     .delete(synchronize_session=False))

    # disable and log out users with no tokens
    tokenless = (
        Account.query.outerjoin(Account.tokens)
        .group_by(Account)
        .having(db.func.count(OAuthToken.token) == 0)
        .filter(Account.policy_enabled == True))  # noqa: E712
    notice = """
        Your account was disabled because Forget no longer had access to
        your {service} account. Perhaps you had revoked it? By logging in,
        you have restored access and you can now re-enable Forget if you wish.
        """
    for account in tokenless:
        account.force_log_out()
        account.policy_enabled = False
        account.reason = notice.format(service=account.service.capitalize())

    db.session.commit()
|
|
|
|
|
|
@app.task
@unique
def queue_fetch_for_most_stale_accounts(
        min_staleness=timedelta(minutes=2), limit=20):
    """Queue a ``fetch_acc`` task for the accounts fetched longest ago.

    Only accounts that have at least one token, are not dormant, are past
    their backoff and were last fetched more than ``min_staleness`` ago are
    considered; at most ``limit`` of them are queued, stalest first.
    """
    stale = Account.query.join(Account.tokens).group_by(Account)
    stale = stale.filter(Account.last_fetch < db.func.now() - min_staleness)
    stale = stale.filter(Account.backoff_until < db.func.now())
    stale = stale.filter(~Account.dormant)
    stale = stale.order_by(db.asc(Account.last_fetch)).limit(limit)

    for account in stale:
        fetch_acc.s(account.id).delay()
    db.session.commit()
|
|
|
|
|
|
@app.task
@unique
def queue_deletes():
    """Queue ``delete_from_account`` for every account that is due a delete.

    An account is due when its policy is enabled, its ``next_delete`` and
    ``backoff_until`` are both in the past, and it is not dormant.
    """
    due = Account.query.filter(Account.policy_enabled == True)  # noqa: E712
    due = due.filter(Account.next_delete < db.func.now())
    due = due.filter(Account.backoff_until < db.func.now())
    due = due.filter(~Account.dormant)

    for acc in due:
        delete_from_account.s(acc.id).apply_async()
|
|
|
|
|
|
@app.task
@unique
def refresh_account_with_oldest_post():
    """Refresh the account that owns the least recently updated post.

    Only posts of accounts that have a token, are not dormant and are past
    their backoff are considered. Does nothing when no such post exists.
    The time taken is logged.
    """
    then = time()
    # .first() rather than .one(): an empty result must yield None instead of
    # raising NoResultFound, otherwise the check below could never be False.
    post = db.session.query(Post).from_statement(db.text("""
        SELECT posts.id, posts.author_id
        FROM posts, accounts, oauth_tokens
        WHERE accounts.id = posts.author_id
        AND accounts.id = oauth_tokens.account_id
        AND accounts.backoff_until < now()
        AND NOT accounts.dormant
        ORDER BY posts.updated_at ASC
        LIMIT 1;
    """).columns(Post.id, Post.author_id)).first()
    if post:
        aid = post.author_id
        # called directly, so the refresh runs inside this task
        refresh_account(aid)
        now = time()
        logging.info(
            'Refreshed posts for %s for having oldest post in %ss',
            aid, now - then)
|
|
|
|
|
|
@app.task
@unique
def refresh_account_with_longest_time_since_refresh():
    """Refresh the eligible account whose ``last_refresh`` sorts first.

    Eligible accounts have at least one token, are not dormant and are past
    their backoff. Does nothing when there is no such account.
    """
    eligible = (
        Account.query.join(Account.tokens)
        .group_by(Account)
        .filter(Account.backoff_until < db.func.now())
        .filter(~Account.dormant))
    stalest = eligible.order_by(db.asc(Account.last_refresh)).first()

    if not stalest:
        return
    refresh_account(stalest.id)
|
|
|
|
|
|
@app.task
def update_mastodon_instances_popularity():
    """Recompute the popularity score of every known Mastodon instance.

    Each non-dormant mastodon account bumps its instance: by a base of 0.01,
    or 0.5 when its policy is enabled, plus 0.1 per session, all divided by
    the instance's current popularity (floored at 1). Instances seen for the
    first time are created with a popularity of 10. Afterwards all scores are
    rescaled so that the highest one is 20.
    """
    # bump score for each active account
    active_accounts = (
        Account.query.options(db.joinedload(Account.sessions))
        .filter(~Account.dormant)
        .filter(Account.id.like('mastodon:%')))
    for acct in active_accounts:
        instance = MastodonInstance.query.get(acct.mastodon_instance)
        if not instance:
            instance = MastodonInstance(
                instance=acct.mastodon_instance, popularity=10)
            db.session.add(instance)
        amount = 0.01
        if acct.policy_enabled:
            amount = 0.5
        for _ in acct.sessions:
            amount += 0.1
        instance.bump(amount / max(1, instance.popularity))

    # normalise scores so the top is 20
    top_pop = (db.session.query(db.func.max(MastodonInstance.popularity))
               .scalar())
    # top_pop is None when there are no instances and could be 0; in both
    # cases there is nothing meaningful to scale by, so skip the division.
    if top_pop:
        MastodonInstance.query.update({
            MastodonInstance.popularity:
                MastodonInstance.popularity * 20 / top_pop
        })
    db.session.commit()
|
|
|
|
|
|
# Periodic schedule: (interval, task) pairs, registered in this order.
_PERIODIC_TASKS = (
    (40, queue_fetch_for_most_stale_accounts),
    (9, queue_deletes),
    (6, refresh_account_with_oldest_post),
    (50, refresh_account_with_longest_time_since_refresh),
    (300, periodic_cleanup),
    (300, update_mastodon_instances_popularity),
)
for _interval, _task in _PERIODIC_TASKS:
    app.add_periodic_task(_interval, _task)

# Running this module directly starts a worker.
if __name__ == '__main__':
    app.worker_main()
|