parent
49d87fd6d4
commit
c54390801c
|
@ -0,0 +1 @@
|
|||
exclude_paths: ['version.py', 'versioneer.py']
|
153
tasks.py
153
tasks.py
|
@ -10,23 +10,23 @@ from zipfile import ZipFile
|
|||
from io import BytesIO, TextIOWrapper
|
||||
import json
|
||||
from kombu import Queue
|
||||
import random
|
||||
import version
|
||||
from libforget.exceptions import PermanentError, TemporaryError
|
||||
import redis
|
||||
from functools import wraps
|
||||
import pickle
|
||||
|
||||
|
||||
app = Celery('tasks', broker=flaskapp.config['CELERY_BROKER'],
|
||||
task_serializer='pickle',
|
||||
task_soft_time_limit=600,
|
||||
task_time_limit=1200,
|
||||
)
|
||||
app = Celery(
|
||||
'tasks',
|
||||
broker=flaskapp.config['CELERY_BROKER'],
|
||||
task_serializer='pickle',
|
||||
task_soft_time_limit=600,
|
||||
task_time_limit=1200,
|
||||
)
|
||||
app.conf.task_queues = (
|
||||
Queue('default', routing_key='celery'),
|
||||
Queue('high_prio', routing_key='high'),
|
||||
Queue('higher_prio', routing_key='higher'),
|
||||
Queue('default', routing_key='celery'),
|
||||
Queue('high_prio', routing_key='high'),
|
||||
Queue('higher_prio', routing_key='higher'),
|
||||
)
|
||||
app.conf.task_default_queue = 'default'
|
||||
app.conf.task_default_exchange = 'celery'
|
||||
|
@ -37,7 +37,9 @@ sentry = None
|
|||
if 'SENTRY_DSN' in flaskapp.config:
|
||||
from raven import Client
|
||||
from raven.contrib.celery import register_signal, register_logger_signal
|
||||
sentry = Client(flaskapp.config['SENTRY_DSN'], release=version.get_versions()['version'])
|
||||
sentry = Client(
|
||||
flaskapp.config['SENTRY_DSN'],
|
||||
release=version.get_versions()['version'])
|
||||
register_logger_signal(sentry)
|
||||
register_signal(sentry)
|
||||
|
||||
|
@ -53,6 +55,8 @@ class DBTask(Task):
|
|||
app.Task = DBTask
|
||||
|
||||
r = None
|
||||
|
||||
|
||||
def unique(fun):
|
||||
global r
|
||||
if not r:
|
||||
|
@ -60,10 +64,11 @@ def unique(fun):
|
|||
|
||||
@wraps(fun)
|
||||
def wrapper(*args, **kwargs):
|
||||
key = 'celery_unique_lock:{}'.format(pickle.dumps((fun.__name__, args, kwargs)))
|
||||
key = 'celery_unique_lock:{}'.format(
|
||||
pickle.dumps((fun.__name__, args, kwargs)))
|
||||
has_lock = False
|
||||
try:
|
||||
if r.set(key, 1, nx=True, ex=60*5):
|
||||
if r.set(key, 1, nx=True, ex=60 * 5):
|
||||
has_lock = True
|
||||
return fun(*args, **kwargs)
|
||||
finally:
|
||||
|
@ -73,7 +78,6 @@ def unique(fun):
|
|||
return wrapper
|
||||
|
||||
|
||||
|
||||
def noop(*args, **kwargs):
|
||||
pass
|
||||
|
||||
|
@ -89,16 +93,16 @@ def make_dormant(acc):
|
|||
acc.dormant = True
|
||||
|
||||
|
||||
@app.task(autoretry_for=(TemporaryError,))
|
||||
@app.task(autoretry_for=(TemporaryError, ))
|
||||
@unique
|
||||
def fetch_acc(id_, cursor=None):
|
||||
account = Account.query.get(id_)
|
||||
print(f'fetching {account}')
|
||||
try:
|
||||
action = noop
|
||||
if(account.service == 'twitter'):
|
||||
if (account.service == 'twitter'):
|
||||
action = libforget.twitter.fetch_acc
|
||||
elif(account.service == 'mastodon'):
|
||||
elif (account.service == 'mastodon'):
|
||||
action = libforget.mastodon.fetch_acc
|
||||
cursor = action(account, cursor)
|
||||
if cursor:
|
||||
|
@ -158,37 +162,28 @@ def delete_from_account(account_id):
|
|||
if account.next_delete > datetime.now(timezone.utc):
|
||||
return
|
||||
|
||||
latest_n_posts = (Post.query.with_parent(account)
|
||||
.order_by(db.desc(Post.created_at))
|
||||
.limit(account.policy_keep_latest)
|
||||
latest_n_posts = (Post.query.with_parent(account).order_by(
|
||||
db.desc(Post.created_at)).limit(account.policy_keep_latest)
|
||||
.cte(name='latest'))
|
||||
posts = (
|
||||
Post.query.with_parent(account)
|
||||
.filter(
|
||||
Post.created_at + account.policy_keep_younger <= db.func.now())
|
||||
.filter(~Post.id.in_(db.select((latest_n_posts.c.id,))))
|
||||
.order_by(db.func.random())
|
||||
.limit(100).with_for_update().all())
|
||||
.filter(Post.created_at + account.policy_keep_younger <= db.func.now())
|
||||
.filter(~Post.id.in_(db.select((latest_n_posts.c.id, )))).order_by(
|
||||
db.func.random()).limit(100).with_for_update().all())
|
||||
|
||||
to_delete = None
|
||||
|
||||
def is_eligible(post):
|
||||
return (
|
||||
post.is_reblog or
|
||||
(
|
||||
(
|
||||
account.policy_keep_favourites == 'none' or
|
||||
(account.policy_keep_favourites == 'keeponly' and not post.favourite) or
|
||||
(account.policy_keep_favourites == 'deleteonly' and post.favourite)
|
||||
) and (
|
||||
account.policy_keep_media == 'none' or
|
||||
(account.policy_keep_media == 'keeponly' and not post.has_media) or
|
||||
(account.policy_keep_media == 'deleteonly' and post.has_media)
|
||||
) and (
|
||||
not account.policy_keep_direct or not post.direct
|
||||
)
|
||||
)
|
||||
)
|
||||
return (post.is_reblog or (
|
||||
(account.policy_keep_favourites == 'none' or
|
||||
(account.policy_keep_favourites == 'keeponly'
|
||||
and not post.favourite) or
|
||||
(account.policy_keep_favourites == 'deleteonly'
|
||||
and post.favourite)) and
|
||||
(account.policy_keep_media == 'none' or
|
||||
(account.policy_keep_media == 'keeponly' and not post.has_media)
|
||||
or (account.policy_keep_media == 'deleteonly' and post.has_media))
|
||||
and (not account.policy_keep_direct or not post.direct)))
|
||||
|
||||
try:
|
||||
action = noop
|
||||
|
@ -199,7 +194,7 @@ def delete_from_account(account_id):
|
|||
elif account.service == 'mastodon':
|
||||
action = libforget.mastodon.delete
|
||||
for post in posts:
|
||||
refreshed = refresh_posts((post,))
|
||||
refreshed = refresh_posts((post, ))
|
||||
if refreshed and is_eligible(refreshed[0]):
|
||||
to_delete = refreshed[0]
|
||||
break
|
||||
|
@ -212,7 +207,6 @@ def delete_from_account(account_id):
|
|||
else:
|
||||
account.next_delete = db.func.now() + timedelta(minutes=3)
|
||||
|
||||
|
||||
except TemporaryError:
|
||||
db.session.rollback()
|
||||
account.backoff()
|
||||
|
@ -241,8 +235,8 @@ def refresh_account(account_id):
|
|||
limit = 100
|
||||
if account.service == 'mastodon':
|
||||
limit = 3
|
||||
posts = (Post.query.with_parent(account)
|
||||
.order_by(db.asc(Post.updated_at)).limit(limit).all())
|
||||
posts = (Post.query.with_parent(account).order_by(
|
||||
db.asc(Post.updated_at)).limit(limit).all())
|
||||
|
||||
posts = refresh_posts(posts)
|
||||
account.touch_refresh()
|
||||
|
@ -263,27 +257,26 @@ def refresh_account(account_id):
|
|||
@unique
|
||||
def periodic_cleanup():
|
||||
# delete sessions after 48 hours
|
||||
(Session.query
|
||||
.filter(Session.updated_at < (db.func.now() - timedelta(hours=48)))
|
||||
(Session.query.filter(
|
||||
Session.updated_at < (db.func.now() - timedelta(hours=48)))
|
||||
.delete(synchronize_session=False))
|
||||
|
||||
# delete twitter archives after 3 days
|
||||
(TwitterArchive.query
|
||||
.filter(TwitterArchive.updated_at < (db.func.now() - timedelta(days=3)))
|
||||
(TwitterArchive.query.filter(
|
||||
TwitterArchive.updated_at < (db.func.now() - timedelta(days=3)))
|
||||
.delete(synchronize_session=False))
|
||||
|
||||
# delete anonymous oauth tokens after 1 day
|
||||
(OAuthToken.query
|
||||
.filter(OAuthToken.updated_at < (db.func.now() - timedelta(days=1)))
|
||||
.filter(OAuthToken.account_id == None) # noqa: E711
|
||||
(OAuthToken.query.filter(
|
||||
OAuthToken.updated_at < (db.func.now() - timedelta(days=1))).filter(
|
||||
OAuthToken.account_id == None) # noqa: E711
|
||||
.delete(synchronize_session=False))
|
||||
|
||||
# disable and log out users with no tokens
|
||||
unreachable = (
|
||||
Account.query
|
||||
.outerjoin(Account.tokens)
|
||||
.group_by(Account).having(db.func.count(OAuthToken.token) == 0)
|
||||
.filter(Account.policy_enabled == True)) # noqa: E712
|
||||
Account.query.outerjoin(Account.tokens)
|
||||
.group_by(Account).having(db.func.count(OAuthToken.token) == 0)
|
||||
.filter(Account.policy_enabled == True)) # noqa: E712
|
||||
for account in unreachable:
|
||||
account.force_log_out()
|
||||
account.policy_enabled = False
|
||||
|
@ -300,14 +293,11 @@ def periodic_cleanup():
|
|||
@unique
|
||||
def queue_fetch_for_most_stale_accounts(
|
||||
min_staleness=timedelta(minutes=2), limit=20):
|
||||
accs = (Account.query
|
||||
.join(Account.tokens).group_by(Account)
|
||||
accs = (Account.query.join(Account.tokens).group_by(Account)
|
||||
.filter(Account.last_fetch < db.func.now() - min_staleness)
|
||||
.filter(Account.backoff_until < db.func.now())
|
||||
.filter(~Account.dormant)
|
||||
.order_by(db.asc(Account.last_fetch))
|
||||
.limit(limit)
|
||||
)
|
||||
.filter(~Account.dormant).order_by(db.asc(
|
||||
Account.last_fetch)).limit(limit))
|
||||
for acc in accs:
|
||||
fetch_acc.s(acc.id).delay()
|
||||
db.session.commit()
|
||||
|
@ -317,10 +307,10 @@ def queue_fetch_for_most_stale_accounts(
|
|||
@unique
|
||||
def queue_deletes():
|
||||
eligible_accounts = (
|
||||
Account.query.filter(Account.policy_enabled == True) # noqa: E712
|
||||
.filter(Account.next_delete < db.func.now())
|
||||
.filter(Account.backoff_until < db.func.now())
|
||||
.filter(~Account.dormant))
|
||||
Account.query.filter(Account.policy_enabled == True) # noqa: E712
|
||||
.filter(Account.next_delete < db.func.now())
|
||||
.filter(Account.backoff_until < db.func.now())
|
||||
.filter(~Account.dormant))
|
||||
for account in eligible_accounts:
|
||||
delete_from_account.s(account.id).apply_async()
|
||||
|
||||
|
@ -330,8 +320,8 @@ def queue_deletes():
|
|||
def refresh_account_with_oldest_post():
|
||||
post = (Post.query.outerjoin(Post.author).join(Account.tokens)
|
||||
.filter(Account.backoff_until < db.func.now())
|
||||
.filter(~Account.dormant)
|
||||
.group_by(Post).order_by(db.asc(Post.updated_at)).first())
|
||||
.filter(~Account.dormant).group_by(Post).order_by(
|
||||
db.asc(Post.updated_at)).first())
|
||||
refresh_account(post.author_id)
|
||||
|
||||
|
||||
|
@ -339,24 +329,22 @@ def refresh_account_with_oldest_post():
|
|||
@unique
|
||||
def refresh_account_with_longest_time_since_refresh():
|
||||
acc = (Account.query.join(Account.tokens).group_by(Account)
|
||||
.filter(Account.backoff_until < db.func.now())
|
||||
.filter(~Account.dormant)
|
||||
.order_by(db.asc(Account.last_refresh)).first())
|
||||
.filter(Account.backoff_until < db.func.now())
|
||||
.filter(~Account.dormant).order_by(db.asc(
|
||||
Account.last_refresh)).first())
|
||||
refresh_account(acc.id)
|
||||
|
||||
|
||||
@app.task
|
||||
def update_mastodon_instances_popularity():
|
||||
# bump score for each active account
|
||||
for acct in (
|
||||
Account.query
|
||||
.options(db.joinedload(Account.sessions))
|
||||
.filter(~Account.dormant)
|
||||
.filter(Account.id.like('mastodon:%'))):
|
||||
for acct in (Account.query.options(db.joinedload(Account.sessions))
|
||||
.filter(~Account.dormant).filter(
|
||||
Account.id.like('mastodon:%'))):
|
||||
instance = MastodonInstance.query.get(acct.mastodon_instance)
|
||||
if not instance:
|
||||
instance = MastodonInstance(instance=acct.mastodon_instance,
|
||||
popularity=10)
|
||||
instance = MastodonInstance(
|
||||
instance=acct.mastodon_instance, popularity=10)
|
||||
db.session.add(instance)
|
||||
amount = 0.01
|
||||
if acct.policy_enabled:
|
||||
|
@ -365,15 +353,12 @@ def update_mastodon_instances_popularity():
|
|||
amount += 0.1
|
||||
instance.bump(amount / max(1, instance.popularity))
|
||||
|
||||
|
||||
# normalise scores so the top is 20
|
||||
top_pop = (
|
||||
db.session.query(db.func.max(MastodonInstance.popularity))
|
||||
.scalar()
|
||||
)
|
||||
top_pop = (db.session.query(db.func.max(MastodonInstance.popularity))
|
||||
.scalar())
|
||||
MastodonInstance.query.update({
|
||||
MastodonInstance.popularity:
|
||||
MastodonInstance.popularity * 20 / top_pop
|
||||
MastodonInstance.popularity * 20 / top_pop
|
||||
})
|
||||
db.session.commit()
|
||||
|
||||
|
|
|
@ -5,6 +5,6 @@ from unittest.mock import patch
|
|||
|
||||
def test_doit():
|
||||
with patch('sys.exit') as _exit:
|
||||
with patch('sys.argv') as argv:
|
||||
with patch('sys.argv'):
|
||||
doit.run(dodo)
|
||||
_exit.assert_called_with(0)
|
||||
|
|
|
@ -1,4 +1,3 @@
|
|||
import re
|
||||
import version
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue