Merge branch 'master' into fix-refresh

This commit is contained in:
codl 2019-02-24 16:35:44 +01:00
commit 4054ebfffd
No known key found for this signature in database
GPG Key ID: 6CD7C8891ED1233A
6 changed files with 146 additions and 77 deletions

View File

@ -1,5 +1,6 @@
## v1.4.2
* implemented a more robust fetching algorithm, which should prevent accounts getting stuck with only a fraction of their posts fetched ([GH-13](https://github.com/codl/forget/issues/13))
* fix: picture tags having an extra comma
* fix: outdated joke in about page
* fix: posts' status not getting refreshed (ie whether or not they were faved, or deleted externally)
@ -33,7 +34,7 @@ Released 2018-05-08
Released 2018-05-08
* remember a user's mastodon instances and let them log in in one click ([GH-36](https://github.com/codl/forget/issues/36))
## v1.1.3

View File

@ -101,13 +101,11 @@ def get_api_for_acc(account):
except (MastodonNetworkError, except (MastodonNetworkError,
MastodonRatelimitError) as e: MastodonRatelimitError) as e:
raise TemporaryError(e) raise TemporaryError(e)
raise TemporaryError('No access to account {}'.format(account))
def fetch_acc(acc, cursor=None): def fetch_posts(acc, max_id, since_id):
api = get_api_for_acc(acc) api = get_api_for_acc(acc)
if not api:
print('no access, aborting')
return None
try: try:
newacc = account_from_api_object( newacc = account_from_api_object(
@ -115,32 +113,15 @@ def fetch_acc(acc, cursor=None):
acc = db.session.merge(newacc) acc = db.session.merge(newacc)
kwargs = dict(limit=40) kwargs = dict(limit=40)
if cursor: if max_id:
kwargs.update(cursor) kwargs['max_id'] = max_id
if since_id:
if 'max_id' not in kwargs: kwargs['since_id'] = since_id
most_recent_post = (
Post.query.with_parent(acc)
.order_by(db.desc(Post.created_at)).first())
if most_recent_post:
kwargs['since_id'] = most_recent_post.mastodon_id
statuses = api.account_statuses(acc.mastodon_id, **kwargs) statuses = api.account_statuses(acc.mastodon_id, **kwargs)
if statuses: return [post_from_api_object(status, acc.mastodon_instance) for status in statuses]
for status in statuses:
post = post_from_api_object(status, acc.mastodon_instance)
db.session.merge(post)
if 'max_id' not in kwargs:
kwargs['max_id'] = int(status['id'])
kwargs['max_id'] = min(int(kwargs['max_id']), int(status['id']))
else:
kwargs = None
db.session.commit()
return kwargs
except (MastodonAPIError, except (MastodonAPIError,
MastodonNetworkError, MastodonNetworkError,
MastodonRatelimitError) as e: MastodonRatelimitError) as e:
@ -177,8 +158,6 @@ def account_from_api_object(obj, instance):
def refresh_posts(posts): def refresh_posts(posts):
acc = posts[0].author acc = posts[0].author
api = get_api_for_acc(acc) api = get_api_for_acc(acc)
if not api:
raise Exception('no access')
new_posts = list() new_posts = list()
for post in posts: for post in posts:

View File

@ -92,7 +92,7 @@ def get_twitter_for_acc(account):
except URLError as e: except URLError as e:
raise TemporaryError(e) raise TemporaryError(e)
return None raise TemporaryError("No access to account {}".format(account))
locale.setlocale(locale.LC_TIME, 'C') locale.setlocale(locale.LC_TIME, 'C')
@ -123,11 +123,8 @@ def post_from_api_tweet_object(tweet, post=None):
return post return post
def fetch_acc(account, cursor): def fetch_posts(account, max_id, since_id):
t = get_twitter_for_acc(account) t = get_twitter_for_acc(account)
if not t:
print("no twitter access, aborting")
return
try: try:
user = t.account.verify_credentials() user = t.account.verify_credentials()
@ -139,45 +136,22 @@ def fetch_acc(account, cursor):
'trim_user': True, 'trim_user': True,
'tweet_mode': 'extended', 'tweet_mode': 'extended',
} }
if cursor: if max_id:
kwargs.update(cursor) kwargs['max_id'] = max_id
if since_id:
if 'max_id' not in kwargs: kwargs['since_id'] = since_id
most_recent_post = (
Post.query.order_by(db.desc(Post.created_at))
.filter(Post.author_id == account.id).first())
if most_recent_post:
kwargs['since_id'] = most_recent_post.twitter_id
tweets = t.statuses.user_timeline(**kwargs) tweets = t.statuses.user_timeline(**kwargs)
except (TwitterError, URLError) as e: except (TwitterError, URLError) as e:
handle_error(e) handle_error(e)
print("processing {} tweets for {acc}".format(len(tweets), acc=account)) return [post_from_api_tweet_object(tweet) for tweet in tweets]
if len(tweets) > 0:
kwargs['max_id'] = +inf
for tweet in tweets:
db.session.merge(post_from_api_tweet_object(tweet))
kwargs['max_id'] = min(tweet['id'] - 1, kwargs['max_id'])
else:
kwargs = None
db.session.commit()
return kwargs
def refresh_posts(posts): def refresh_posts(posts):
if not posts: if not posts:
return posts return posts
t = get_twitter_for_acc(posts[0].author) t = get_twitter_for_acc(posts[0].author)
if not t:
return
try: try:
tweets = t.statuses.lookup( tweets = t.statuses.lookup(
_id=",".join((post.twitter_id for post in posts)), _id=",".join((post.twitter_id for post in posts)),

View File

@ -0,0 +1,28 @@
"""new fetching flags

Revision ID: 4b56cde3ebd7
Revises: c136aa1157f9
Create Date: 2019-02-24 11:53:29.128983

"""
from alembic import op
import sqlalchemy as sa

# revision identifiers, used by Alembic.
revision = '4b56cde3ebd7'
down_revision = 'c136aa1157f9'
branch_labels = None
depends_on = None


def upgrade():
    # Add the per-account incremental-fetch bookkeeping: a flag marking
    # that the full post history has been walked once, and a pointer to
    # the post that ends the batch currently being fetched.
    op.add_column(
        'accounts',
        sa.Column('fetch_current_batch_end_id', sa.String(), nullable=True))
    op.add_column(
        'accounts',
        sa.Column('fetch_history_complete', sa.Boolean(),
                  server_default='FALSE', nullable=False))
    # FK must be created after the referencing column exists; clearing it
    # on post deletion (SET NULL) keeps accounts valid when posts go away.
    op.create_foreign_key(
        op.f('fk_accounts_fetch_current_batch_end_id_posts'),
        'accounts', 'posts',
        ['fetch_current_batch_end_id'], ['id'],
        ondelete='SET NULL')


def downgrade():
    # Reverse of upgrade(): drop the constraint first, then the columns.
    op.drop_constraint(
        op.f('fk_accounts_fetch_current_batch_end_id_posts'),
        'accounts', type_='foreignkey')
    op.drop_column('accounts', 'fetch_history_complete')
    op.drop_column('accounts', 'fetch_current_batch_end_id')

View File

@ -4,6 +4,7 @@ from app import db
import secrets import secrets
from libforget.interval import decompose_interval from libforget.interval import decompose_interval
import random import random
from sqlalchemy.ext.declarative import declared_attr
class TimestampMixin(object): class TimestampMixin(object):
@ -67,6 +68,13 @@ class RemoteIDMixin(object):
def mastodon_id(self, id_): def mastodon_id(self, id_):
self.id = "mastodon:{}@{}".format(id_, self.mastodon_instance) self.id = "mastodon:{}@{}".format(id_, self.mastodon_instance)
@property
def remote_id(self):
    """Return this object's id on its home service.

    Resolves to ``twitter_id`` or ``mastodon_id`` depending on
    ``self.service``; any other service value yields None, matching
    the original fall-through behaviour.
    """
    id_attr = {
        'twitter': 'twitter_id',
        'mastodon': 'mastodon_id',
    }.get(self.service)
    if id_attr is not None:
        return getattr(self, id_attr)
ThreeWayPolicyEnum = db.Enum('keeponly', 'deleteonly', 'none', ThreeWayPolicyEnum = db.Enum('keeponly', 'deleteonly', 'none',
name='enum_3way_policy') name='enum_3way_policy')
@ -105,6 +113,19 @@ class Account(TimestampMixin, RemoteIDMixin):
last_delete = db.Column(db.DateTime(timezone=True), index=True) last_delete = db.Column(db.DateTime(timezone=True), index=True)
next_delete = db.Column(db.DateTime(timezone=True), index=True) next_delete = db.Column(db.DateTime(timezone=True), index=True)
fetch_history_complete = db.Column(db.Boolean, server_default='FALSE',
nullable=False)
@declared_attr
def fetch_current_batch_end_id(cls):
return db.Column(db.String, db.ForeignKey('posts.id', ondelete='SET NULL'))
@declared_attr
def fetch_current_batch_end(cls):
return db.relationship("Post", foreign_keys=(cls.fetch_current_batch_end_id,))
# the declared_attr is necessary because of the foreign key
# and because this class is technically one big mixin
# https://docs.sqlalchemy.org/en/latest/orm/extensions/declarative/mixins.html#mixing-in-relationships
reason = db.Column(db.String) reason = db.Column(db.String)
dormant = db.Column(db.Boolean, server_default='FALSE', nullable=False) dormant = db.Column(db.Boolean, server_default='FALSE', nullable=False)
@ -175,7 +196,7 @@ class Account(TimestampMixin, RemoteIDMixin):
# backref: sessions # backref: sessions
def post_count(self): def post_count(self):
return Post.query.with_parent(self).count() return Post.query.with_parent(self, 'posts').count()
def estimate_eligible_for_delete(self): def estimate_eligible_for_delete(self):
""" """
@ -184,10 +205,10 @@ class Account(TimestampMixin, RemoteIDMixin):
refresh every single post every time we need to know how many posts are refresh every single post every time we need to know how many posts are
eligible to delete eligible to delete
""" """
latest_n_posts = (Post.query.with_parent(self) latest_n_posts = (Post.query.with_parent(self, 'posts')
.order_by(db.desc(Post.created_at)) .order_by(db.desc(Post.created_at))
.limit(self.policy_keep_latest)) .limit(self.policy_keep_latest))
query = (Post.query.with_parent(self) query = (Post.query.with_parent(self, 'posts')
.filter(Post.created_at <= .filter(Post.created_at <=
db.func.now() - self.policy_keep_younger) db.func.now() - self.policy_keep_younger)
.except_(latest_n_posts)) .except_(latest_n_posts))
@ -273,6 +294,7 @@ class Post(db.Model, TimestampMixin, RemoteIDMixin):
nullable=False) nullable=False)
author = db.relationship( author = db.relationship(
Account, Account,
foreign_keys = (author_id,),
backref=db.backref('posts', backref=db.backref('posts',
order_by=lambda: db.desc(Post.created_at))) order_by=lambda: db.desc(Post.created_at)))

View File

@ -67,13 +67,15 @@ def unique(fun):
key = 'celery_unique_lock:{}'.format( key = 'celery_unique_lock:{}'.format(
pickle.dumps((fun.__name__, args, kwargs))) pickle.dumps((fun.__name__, args, kwargs)))
has_lock = False has_lock = False
result = None
try: try:
if r.set(key, 1, nx=True, ex=60 * 5): if r.set(key, 1, nx=True, ex=60 * 5):
has_lock = True has_lock = True
return fun(*args, **kwargs) result = fun(*args, **kwargs)
finally: finally:
if has_lock: if has_lock:
r.delete(key) r.delete(key)
return result
return wrapper return wrapper
@ -94,19 +96,82 @@ def make_dormant(acc):
@app.task(autoretry_for=(TemporaryError, )) @app.task(autoretry_for=(TemporaryError, ))
@unique def fetch_acc(id_):
def fetch_acc(id_, cursor=None):
account = Account.query.get(id_) account = Account.query.get(id_)
print(f'fetching {account}') print(f'fetching {account}')
try: try:
action = noop if not account.fetch_history_complete:
oldest = (db.session.query(Post)
.with_parent(account, 'posts')
.order_by(db.asc(Post.created_at))
.first())
# ^ None if this is our first fetch ever, otherwise the oldest post
if oldest:
max_id = oldest.remote_id
else:
max_id = None
since_id = None
elif account.fetch_current_batch_end:
oldest = (db.session.query(Post)
.with_parent(account, 'posts')
.filter(Post.created_at > account.fetch_current_batch_end.created_at)
.order_by(db.asc(Post.created_at))
.first())
# ^ None if this is our first fetch of this batch, otherwise oldest of this batch
if oldest:
max_id = oldest.remote_id
else:
max_id = None
since_id = account.fetch_current_batch_end.remote_id
else:
# we shouldn't get here unless the user had no posts on the service last time we fetched
max_id = None
latest = (db.session.query(Post)
.with_parent(account, 'posts')
.order_by(db.desc(Post.created_at))
.limit(1)
.scalar())
# ^ should be None considering the user has no posts
# will be the latest post in the off chance that something goes weird
if latest:
since_id = latest.remote_id
else:
since_id = None
print('max_id: {}, since_id: {}'.format(max_id, since_id))
fetch_posts = noop
if (account.service == 'twitter'): if (account.service == 'twitter'):
action = libforget.twitter.fetch_acc fetch_posts = libforget.twitter.fetch_posts
elif (account.service == 'mastodon'): elif (account.service == 'mastodon'):
action = libforget.mastodon.fetch_acc fetch_posts = libforget.mastodon.fetch_posts
cursor = action(account, cursor) posts = fetch_posts(account, max_id, since_id)
if cursor:
fetch_acc.si(id_, cursor).apply_async() if posts is None:
# ???
raise TemporaryError("Fetching posts went horribly wrong")
if len(posts) == 0:
# we either finished the historic fetch
# or we finished the current batch
account.fetch_history_complete = True
account.fetch_current_batch_end_id = (Post.query
.with_parent(account, 'posts').order_by(db.desc(Post.created_at)).first()).id
# ^ note that this could be None if the user has no posts
# this is okay
db.session.commit()
else:
for post in posts:
db.session.merge(post)
db.session.commit()
if not account.fetch_history_complete:
# reschedule immediately if we're still doing the historic fetch
fetch_acc.apply_async((id_,))
except TemporaryError: except TemporaryError:
db.session.rollback() db.session.rollback()
account.backoff() account.backoff()
@ -162,11 +227,11 @@ def delete_from_account(account_id):
if account.next_delete > datetime.now(timezone.utc): if account.next_delete > datetime.now(timezone.utc):
return return
latest_n_posts = (Post.query.with_parent(account).order_by( latest_n_posts = (Post.query.with_parent(account, 'posts').order_by(
db.desc(Post.created_at)).limit(account.policy_keep_latest) db.desc(Post.created_at)).limit(account.policy_keep_latest)
.cte(name='latest')) .cte(name='latest'))
posts = ( posts = (
Post.query.with_parent(account) Post.query.with_parent(account, 'posts')
.filter(Post.created_at + account.policy_keep_younger <= db.func.now()) .filter(Post.created_at + account.policy_keep_younger <= db.func.now())
.filter(~Post.id.in_(db.select((latest_n_posts.c.id, )))).order_by( .filter(~Post.id.in_(db.select((latest_n_posts.c.id, )))).order_by(
db.func.random()).limit(100).with_for_update().all()) db.func.random()).limit(100).with_for_update().all())
@ -237,7 +302,7 @@ def refresh_account(account_id):
limit = 100 limit = 100
if account.service == 'mastodon': if account.service == 'mastodon':
limit = 3 limit = 3
posts = (Post.query.with_parent(account).order_by( posts = (Post.query.with_parent(account, 'posts').order_by(
db.asc(Post.updated_at)).limit(limit).all()) db.asc(Post.updated_at)).limit(limit).all())
posts = refresh_posts(posts) posts = refresh_posts(posts)