make sure only one of each task runs at once

This commit is contained in:
codl 2017-09-20 14:39:31 +02:00
parent b9b3d8bab2
commit 6bce7b1d04
No known key found for this signature in database
GPG Key ID: 6CD7C8891ED1233A
1 changed files with 30 additions and 1 deletions

View File

@ -5,7 +5,7 @@ from model import Session, Account, TwitterArchive, Post, OAuthToken,\
MastodonInstance
import lib.twitter
import lib.mastodon
from datetime import timedelta
from datetime import timedelta, datetime, timezone
from zipfile import ZipFile
from io import BytesIO, TextIOWrapper
import json
@ -13,6 +13,8 @@ from kombu import Queue
import random
import version
from lib.exceptions import PermanentError, TemporaryError
import redis
from functools import wraps
app = Celery('tasks', broker=flaskapp.config['CELERY_BROKER'],
@ -46,6 +48,22 @@ class DBTask(Task):
app.Task = DBTask
def unique(fun):
r = redis.StrictRedis.from_url(flaskapp.config['REDIS_URI'])
@wraps(fun)
def wrapper(*args, **kwargs):
key = 'celery_unique_lock:{}'.format(pickle.dumps((fun.__name__, args, kwargs)))
has_lock = False
try:
if r.set(key, 1, nx=True, ex=60*5):
has_lock = True
return fun(*args, **kwargs)
finally:
if has_lock:
r.delete(key)
def noop(*args, **kwargs):
pass
@ -64,6 +82,7 @@ def make_dormant(acc):
@app.task(autoretry_for=(TemporaryError,))
@unique
def fetch_acc(id_, cursor=None):
acc = Account.query.get(id_)
print(f'fetching {acc}')
@ -122,8 +141,12 @@ def import_twitter_archive_month(archive_id, month_path):
@app.task()
@unique
def delete_from_account(account_id):
account = Account.query.get(account_id)
if account.next_delete > datetime.now(timezone.utc):
return
latest_n_posts = (Post.query.with_parent(account)
.order_by(db.desc(Post.created_at))
.limit(account.policy_keep_latest)
@ -183,6 +206,7 @@ def refresh_posts(posts):
@app.task()
@unique
def refresh_account(account_id):
account = Account.query.get(account_id)
@ -204,6 +228,7 @@ def refresh_account(account_id):
@app.task
@unique
def periodic_cleanup():
# delete sessions after 48 hours
(Session.query
@ -240,6 +265,7 @@ def periodic_cleanup():
@app.task
@unique
def queue_fetch_for_most_stale_accounts(
min_staleness=timedelta(minutes=2), limit=20):
accs = (Account.query
@ -255,6 +281,7 @@ def queue_fetch_for_most_stale_accounts(
@app.task
@unique
def queue_deletes():
eligible_accounts = (
Account.query.filter(Account.policy_enabled == True) # noqa: E712
@ -265,6 +292,7 @@ def queue_deletes():
@app.task
@unique
def refresh_account_with_oldest_post():
post = (Post.query.outerjoin(Post.author).join(Account.tokens)
.filter(~Account.dormant)
@ -273,6 +301,7 @@ def refresh_account_with_oldest_post():
@app.task
@unique
def refresh_account_with_longest_time_since_refresh():
acc = (Account.query.join(Account.tokens).group_by(Account)
.filter(~Account.dormant)