From a6c536113884456a7d1e7d3d5044a8b9239d72cf Mon Sep 17 00:00:00 2001 From: codl Date: Sun, 24 Feb 2019 13:22:28 +0100 Subject: [PATCH] more robust fetching. closes #13 this also refactors libforget so neither the twitter nor the mastodon lib inserts posts directly --- libforget/mastodon.py | 29 ++---- libforget/twitter.py | 33 ++----- .../4b56cde3ebd7_new_fetching_flags.py | 28 ++++++ model.py | 28 +++++- tasks.py | 89 ++++++++++++++++--- 5 files changed, 142 insertions(+), 65 deletions(-) create mode 100644 migrations/versions/4b56cde3ebd7_new_fetching_flags.py diff --git a/libforget/mastodon.py b/libforget/mastodon.py index 256b92d..2a9a4eb 100644 --- a/libforget/mastodon.py +++ b/libforget/mastodon.py @@ -103,7 +103,7 @@ def get_api_for_acc(account): raise TemporaryError('No access to account {}'.format(account)) -def fetch_acc(acc, cursor=None): +def fetch_posts(acc, max_id, since_id): api = get_api_for_acc(acc) try: @@ -112,32 +112,15 @@ def fetch_acc(acc, cursor=None): acc = db.session.merge(newacc) kwargs = dict(limit=40) - if cursor: - kwargs.update(cursor) - - if 'max_id' not in kwargs: - most_recent_post = ( - Post.query.with_parent(acc) - .order_by(db.desc(Post.created_at)).first()) - if most_recent_post: - kwargs['since_id'] = most_recent_post.mastodon_id + if max_id: + kwargs['max_id'] = max_id + if since_id: + kwargs['since_id'] = since_id statuses = api.account_statuses(acc.mastodon_id, **kwargs) - if statuses: - for status in statuses: - post = post_from_api_object(status, acc.mastodon_instance) - db.session.merge(post) - if 'max_id' not in kwargs: - kwargs['max_id'] = int(status['id']) - kwargs['max_id'] = min(int(kwargs['max_id']), int(status['id'])) + return [post_from_api_object(status, acc.mastodon_instance) for status in statuses] - else: - kwargs = None - - db.session.commit() - - return kwargs except (MastodonAPIError, MastodonNetworkError, MastodonRatelimitError) as e: diff --git a/libforget/twitter.py b/libforget/twitter.py index 
c9ba7ae..0bf8bae 100644 --- a/libforget/twitter.py +++ b/libforget/twitter.py @@ -123,7 +123,7 @@ def post_from_api_tweet_object(tweet, post=None): return post -def fetch_acc(account, cursor): +def fetch_posts(account, max_id, since_id): t = get_twitter_for_acc(account) try: @@ -136,37 +136,16 @@ def fetch_acc(account, cursor): 'trim_user': True, 'tweet_mode': 'extended', } - if cursor: - kwargs.update(cursor) - - if 'max_id' not in kwargs: - most_recent_post = ( - Post.query.order_by(db.desc(Post.created_at)) - .filter(Post.author_id == account.id).first()) - if most_recent_post: - kwargs['since_id'] = most_recent_post.twitter_id + if max_id: + kwargs['max_id'] = max_id + if since_id: + kwargs['since_id'] = since_id tweets = t.statuses.user_timeline(**kwargs) except (TwitterError, URLError) as e: handle_error(e) - print("processing {} tweets for {acc}".format(len(tweets), acc=account)) - - if len(tweets) > 0: - - kwargs['max_id'] = +inf - - for tweet in tweets: - db.session.merge(post_from_api_tweet_object(tweet)) - kwargs['max_id'] = min(tweet['id'] - 1, kwargs['max_id']) - - else: - kwargs = None - - db.session.commit() - - return kwargs - + return [post_from_api_tweet_object(tweet) for tweet in tweets] def refresh_posts(posts): if not posts: diff --git a/migrations/versions/4b56cde3ebd7_new_fetching_flags.py b/migrations/versions/4b56cde3ebd7_new_fetching_flags.py new file mode 100644 index 0000000..13897f2 --- /dev/null +++ b/migrations/versions/4b56cde3ebd7_new_fetching_flags.py @@ -0,0 +1,28 @@ +"""new fetching flags + +Revision ID: 4b56cde3ebd7 +Revises: c136aa1157f9 +Create Date: 2019-02-24 11:53:29.128983 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. 
+revision = '4b56cde3ebd7' +down_revision = 'c136aa1157f9' +branch_labels = None +depends_on = None + + +def upgrade(): + op.add_column('accounts', sa.Column('fetch_current_batch_end_id', sa.String(), nullable=True)) + op.add_column('accounts', sa.Column('fetch_history_complete', sa.Boolean(), server_default='FALSE', nullable=False)) + op.create_foreign_key(op.f('fk_accounts_fetch_current_batch_end_id_posts'), 'accounts', 'posts', ['fetch_current_batch_end_id'], ['id'], ondelete='SET NULL') + + +def downgrade(): + op.drop_constraint(op.f('fk_accounts_fetch_current_batch_end_id_posts'), 'accounts', type_='foreignkey') + op.drop_column('accounts', 'fetch_history_complete') + op.drop_column('accounts', 'fetch_current_batch_end_id') diff --git a/model.py b/model.py index 2d714a9..28d999d 100644 --- a/model.py +++ b/model.py @@ -4,6 +4,7 @@ from app import db import secrets from libforget.interval import decompose_interval import random +from sqlalchemy.ext.declarative import declared_attr class TimestampMixin(object): @@ -67,6 +68,13 @@ class RemoteIDMixin(object): def mastodon_id(self, id_): self.id = "mastodon:{}@{}".format(id_, self.mastodon_instance) + @property + def remote_id(self): + if self.service == 'twitter': + return self.twitter_id + elif self.service == 'mastodon': + return self.mastodon_id + ThreeWayPolicyEnum = db.Enum('keeponly', 'deleteonly', 'none', name='enum_3way_policy') @@ -105,6 +113,19 @@ class Account(TimestampMixin, RemoteIDMixin): last_delete = db.Column(db.DateTime(timezone=True), index=True) next_delete = db.Column(db.DateTime(timezone=True), index=True) + fetch_history_complete = db.Column(db.Boolean, server_default='FALSE', + nullable=False) + + @declared_attr + def fetch_current_batch_end_id(cls): + return db.Column(db.String, db.ForeignKey('posts.id', ondelete='SET NULL')) + @declared_attr + def fetch_current_batch_end(cls): + return db.relationship("Post", foreign_keys=(cls.fetch_current_batch_end_id,)) + # the declared_attr is 
necessary because of the foreign key + # and because this class is technically one big mixin + # https://docs.sqlalchemy.org/en/latest/orm/extensions/declarative/mixins.html#mixing-in-relationships + reason = db.Column(db.String) dormant = db.Column(db.Boolean, server_default='FALSE', nullable=False) @@ -175,7 +196,7 @@ class Account(TimestampMixin, RemoteIDMixin): # backref: sessions def post_count(self): - return Post.query.with_parent(self).count() + return Post.query.with_parent(self, 'posts').count() def estimate_eligible_for_delete(self): """ @@ -184,10 +205,10 @@ class Account(TimestampMixin, RemoteIDMixin): refresh every single post every time we need to know how many posts are eligible to delete """ - latest_n_posts = (Post.query.with_parent(self) + latest_n_posts = (Post.query.with_parent(self, 'posts') .order_by(db.desc(Post.created_at)) .limit(self.policy_keep_latest)) - query = (Post.query.with_parent(self) + query = (Post.query.with_parent(self, 'posts') .filter(Post.created_at <= db.func.now() - self.policy_keep_younger) .except_(latest_n_posts)) @@ -273,6 +294,7 @@ class Post(db.Model, TimestampMixin, RemoteIDMixin): nullable=False) author = db.relationship( Account, + foreign_keys = (author_id,), backref=db.backref('posts', order_by=lambda: db.desc(Post.created_at))) diff --git a/tasks.py b/tasks.py index f879035..577c67b 100644 --- a/tasks.py +++ b/tasks.py @@ -67,13 +67,15 @@ def unique(fun): key = 'celery_unique_lock:{}'.format( pickle.dumps((fun.__name__, args, kwargs))) has_lock = False + result = None try: if r.set(key, 1, nx=True, ex=60 * 5): has_lock = True - return fun(*args, **kwargs) + result = fun(*args, **kwargs) finally: if has_lock: r.delete(key) + return result return wrapper @@ -94,19 +96,82 @@ def make_dormant(acc): @app.task(autoretry_for=(TemporaryError, )) -@unique -def fetch_acc(id_, cursor=None): +def fetch_acc(id_): account = Account.query.get(id_) print(f'fetching {account}') try: - action = noop + if not 
account.fetch_history_complete: + oldest = (db.session.query(Post) + .with_parent(account, 'posts') + .order_by(db.asc(Post.created_at)) + .first()) + # ^ None if this is our first fetch ever, otherwise the oldest post + if oldest: + max_id = oldest.remote_id + else: + max_id = None + since_id = None + elif account.fetch_current_batch_end: + oldest = (db.session.query(Post) + .with_parent(account, 'posts') + .filter(Post.created_at > account.fetch_current_batch_end.created_at) + .order_by(db.asc(Post.created_at)) + .first()) + # ^ None if this is our first fetch of this batch, otherwise oldest of this batch + if oldest: + max_id = oldest.remote_id + else: + max_id = None + since_id = account.fetch_current_batch_end.remote_id + else: + # we shouldn't get here unless the user had no posts on the service last time we fetched + max_id = None + latest = (db.session.query(Post) + .with_parent(account, 'posts') + .order_by(db.desc(Post.created_at)) + .limit(1) + .scalar()) + # ^ should be None considering the user has no posts + # will be the latest post in the off chance that something goes weird + if latest: + since_id = latest.remote_id + else: + since_id = None + + + print('max_id: {}, since_id: {}'.format(max_id, since_id)) + fetch_posts = noop if (account.service == 'twitter'): - action = libforget.twitter.fetch_acc + fetch_posts = libforget.twitter.fetch_posts elif (account.service == 'mastodon'): - action = libforget.mastodon.fetch_acc - cursor = action(account, cursor) - if cursor: - fetch_acc.si(id_, cursor).apply_async() + fetch_posts = libforget.mastodon.fetch_posts + posts = fetch_posts(account, max_id, since_id) + + if posts is None: + # ??? 
+ raise TemporaryError("Fetching posts went horribly wrong") + + if len(posts) == 0: + # we either finished the historic fetch + # or we finished the current batch + account.fetch_history_complete = True + account.fetch_current_batch_end_id = (Post.query + .with_parent(account, 'posts').order_by(db.desc(Post.created_at)).first()).id + # ^ note that this could be None if the user has no posts + # this is okay + + db.session.commit() + + else: + for post in posts: + db.session.merge(post) + db.session.commit() + + if not account.fetch_history_complete: + # reschedule immediately if we're still doing the historic fetch + fetch_acc.apply_async((id_,)) + + except TemporaryError: db.session.rollback() account.backoff() @@ -162,11 +227,11 @@ def delete_from_account(account_id): if account.next_delete > datetime.now(timezone.utc): return - latest_n_posts = (Post.query.with_parent(account).order_by( + latest_n_posts = (Post.query.with_parent(account, 'posts').order_by( db.desc(Post.created_at)).limit(account.policy_keep_latest) .cte(name='latest')) posts = ( - Post.query.with_parent(account) + Post.query.with_parent(account, 'posts') .filter(Post.created_at + account.policy_keep_younger <= db.func.now()) .filter(~Post.id.in_(db.select((latest_n_posts.c.id, )))).order_by( db.func.random()).limit(100).with_for_update().all()) @@ -235,7 +300,7 @@ def refresh_account(account_id): limit = 100 if account.service == 'mastodon': limit = 3 - posts = (Post.query.with_parent(account).order_by( + posts = (Post.query.with_parent(account, 'posts').order_by( db.asc(Post.updated_at)).limit(limit).all()) posts = refresh_posts(posts)