shorten the interval at which we refresh the most stale posts
This commit is contained in:
parent 6a597bf53a
commit 7fad5ea458

tasks.py | 17 +++++++++++++----
@@ -6,6 +6,7 @@ from model import Session, Account, TwitterArchive, Post, OAuthToken,\
 import libforget.twitter
 import libforget.mastodon
 from datetime import timedelta, datetime, timezone
+from time import time
 from zipfile import ZipFile
 from io import BytesIO, TextIOWrapper
 import json
@@ -64,14 +65,18 @@ def unique(fun):

     @wraps(fun)
     def wrapper(*args, **kwargs):
-        key = 'celery_unique_lock:{}'.format(
-            pickle.dumps((fun.__name__, args, kwargs)))
+        logging.info('Checking for dupes for unique task %s', (fun.__name__, args, kwargs))
+        key = 'celery_unique_lock:epoch1:{}:{}'.format(
+            fun.__name__, pickle.dumps((args, kwargs)))
         has_lock = False
         result = None
         try:
             if r.set(key, 1, nx=True, ex=60 * 5):
+                logging.info('No dupes for unique, running task %s', (fun.__name__, args, kwargs))
                 has_lock = True
                 result = fun(*args, **kwargs)
+            else:
+                logging.info('Unique task has a dupe, skipping %s', (fun.__name__, args, kwargs))
         finally:
             if has_lock:
                 r.delete(key)
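Taken together, this hunk leaves the decorator reading roughly as below. This is a sketch assembled from the diff, not a verbatim excerpt: the module-level redis-py client r, the logging configuration, and the trailing return of result are assumptions, since the hunk ends before the wrapper does.

import logging
import pickle
from functools import wraps

import redis

r = redis.Redis()  # assumption: the real client is configured elsewhere in the app


def unique(fun):
    @wraps(fun)
    def wrapper(*args, **kwargs):
        logging.info('Checking for dupes for unique task %s', (fun.__name__, args, kwargs))
        # fun.__name__ sits outside the pickle so keys stay greppable in Redis;
        # the 'epoch1' segment namespaces new keys away from old-format ones.
        key = 'celery_unique_lock:epoch1:{}:{}'.format(
            fun.__name__, pickle.dumps((args, kwargs)))
        has_lock = False
        result = None
        try:
            # SET NX EX: acquire the lock only if nobody holds it, and let it
            # expire after five minutes so a crashed worker cannot wedge it.
            if r.set(key, 1, nx=True, ex=60 * 5):
                logging.info('No dupes for unique, running task %s', (fun.__name__, args, kwargs))
                has_lock = True
                result = fun(*args, **kwargs)
            else:
                logging.info('Unique task has a dupe, skipping %s', (fun.__name__, args, kwargs))
        finally:
            if has_lock:
                r.delete(key)
        return result  # assumption: not visible in the hunk
    return wrapper

Moving fun.__name__ out of the pickled payload makes lock keys legible when inspecting Redis, and the epoch1 segment keeps any still-live locks written in the old key format from colliding with the new ones.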
@@ -407,12 +412,16 @@ def queue_deletes():
 @app.task
 @unique
 def refresh_account_with_oldest_post():
+    then = time()
     post = (Post.query.outerjoin(Post.author).join(Account.tokens)
             .filter(Account.backoff_until < db.func.now())
             .filter(~Account.dormant).group_by(Post).order_by(
                 db.asc(Post.updated_at)).first())
     if post:
-        refresh_account(post.author_id)
+        aid = post.author_id
+        refresh_account(aid)
+        now = time()
+        logging.info('Refreshed posts for {} for having oldest post in {}s'.format(aid, now-then))


 @app.task
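The instrumentation added here is a plain wall-clock delta around the refresh. Isolated as a sketch, with timed_refresh and its arguments as hypothetical stand-ins rather than names from this codebase:

import logging
from time import time


def timed_refresh(refresh, account_id):
    # Same pattern as the task above: capture time() before the call and
    # log the elapsed wall-clock seconds after it returns.
    then = time()
    refresh(account_id)
    logging.info('Refreshed posts for %s in %ss', account_id, time() - then)

time.monotonic() would be immune to clock adjustments, but for a coarse per-task log line time() is adequate.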
@@ -456,7 +465,7 @@ def update_mastodon_instances_popularity():

 app.add_periodic_task(40, queue_fetch_for_most_stale_accounts)
 app.add_periodic_task(9, queue_deletes)
-app.add_periodic_task(25, refresh_account_with_oldest_post)
+app.add_periodic_task(6, refresh_account_with_oldest_post)
 app.add_periodic_task(50, refresh_account_with_longest_time_since_refresh)
 app.add_periodic_task(300, periodic_cleanup)
 app.add_periodic_task(300, update_mastodon_instances_popularity)
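For reference, add_periodic_task is the Celery 4+ beat API this schedule block uses; dropping the first argument from 25 to 6 makes beat enqueue the task every 6 seconds instead of every 25. A minimal sketch of wiring a task at the new cadence, following the on_after_configure pattern from the Celery docs; the broker URL and task body are placeholder assumptions:

from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')  # assumed broker URL


@app.task
def refresh_account_with_oldest_post():
    print('refreshing')  # placeholder body


@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Run every 6 seconds, matching the new cadence in this commit. The docs
    # pass a signature (.s()); this codebase passes the task object directly
    # to a module-level app.add_periodic_task call, which has the same effect.
    sender.add_periodic_task(6, refresh_account_with_oldest_post.s())

The 6-second cadence pairs with the @unique decorator above: even if beat enqueues faster than a refresh completes, the Redis lock skips the duplicates.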