more robust fetching. closes #13

this also refactors libforget so that neither the twitter nor the mastodon lib
inserts posts directly
codl 2019-02-24 13:22:28 +01:00
parent 649b68793c
commit a6c5361138
GPG Key ID: 6CD7C8891ED1233A
5 changed files with 142 additions and 65 deletions
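
In outline: the per-service fetch functions now only translate API responses into Post objects, and the Celery task decides what to merge, commit, and reschedule. A rough sketch of the new division of labour, paraphrasing the diffs below (choose_window is a made-up name for the window-selection branching in the tasks.py hunk, not a function in this commit):

    # Sketch only, not the committed code.
    def fetch_acc(id_):
        account = Account.query.get(id_)
        max_id, since_id = choose_window(account)   # hypothetical helper
        fetch_posts = {
            'twitter': libforget.twitter.fetch_posts,
            'mastodon': libforget.mastodon.fetch_posts,
        }[account.service]
        posts = fetch_posts(account, max_id, since_id)  # no db writes inside
        for post in posts:
            db.session.merge(post)   # persistence now belongs to the caller
        db.session.commit()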

View File

@@ -103,7 +103,7 @@ def get_api_for_acc(account):
         raise TemporaryError('No access to account {}'.format(account))
 
 
-def fetch_acc(acc, cursor=None):
+def fetch_posts(acc, max_id, since_id):
     api = get_api_for_acc(acc)
     try:
@@ -112,32 +112,15 @@ def fetch_acc(acc, cursor=None):
         acc = db.session.merge(newacc)
 
         kwargs = dict(limit=40)
-        if cursor:
-            kwargs.update(cursor)
-
-        if 'max_id' not in kwargs:
-            most_recent_post = (
-                Post.query.with_parent(acc)
-                .order_by(db.desc(Post.created_at)).first())
-            if most_recent_post:
-                kwargs['since_id'] = most_recent_post.mastodon_id
+        if max_id:
+            kwargs['max_id'] = max_id
+        if since_id:
+            kwargs['since_id'] = since_id
 
         statuses = api.account_statuses(acc.mastodon_id, **kwargs)
-        if statuses:
-            for status in statuses:
-                post = post_from_api_object(status, acc.mastodon_instance)
-                db.session.merge(post)
-                if 'max_id' not in kwargs:
-                    kwargs['max_id'] = int(status['id'])
-                kwargs['max_id'] = min(int(kwargs['max_id']), int(status['id']))
-        else:
-            kwargs = None
-        db.session.commit()
-        return kwargs
+        return [post_from_api_object(status, acc.mastodon_instance) for status in statuses]
     except (MastodonAPIError,
             MastodonNetworkError,
             MastodonRatelimitError) as e:
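
For reference, both cursor parameters are exclusive bounds in the Mastodon API, so a stored status id can be passed back verbatim as a cursor. A sketch of the resulting window (ids made up; api is the Mastodon.py-style client used above):

    kwargs = dict(limit=40)
    kwargs['max_id'] = '101765836'    # only statuses strictly older than this id
    kwargs['since_id'] = '101234567'  # only statuses strictly newer than this id
    # statuses = api.account_statuses(acc.mastodon_id, **kwargs)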

View File

@@ -123,7 +123,7 @@ def post_from_api_tweet_object(tweet, post=None):
     return post
 
 
-def fetch_acc(account, cursor):
+def fetch_posts(account, max_id, since_id):
     t = get_twitter_for_acc(account)
     try:
@@ -136,37 +136,16 @@ def fetch_acc(account, cursor):
             'trim_user': True,
             'tweet_mode': 'extended',
         }
-        if cursor:
-            kwargs.update(cursor)
-
-        if 'max_id' not in kwargs:
-            most_recent_post = (
-                Post.query.order_by(db.desc(Post.created_at))
-                .filter(Post.author_id == account.id).first())
-            if most_recent_post:
-                kwargs['since_id'] = most_recent_post.twitter_id
+        if max_id:
+            kwargs['max_id'] = max_id
+        if since_id:
+            kwargs['since_id'] = since_id
         tweets = t.statuses.user_timeline(**kwargs)
     except (TwitterError, URLError) as e:
         handle_error(e)
-    print("processing {} tweets for {acc}".format(len(tweets), acc=account))
-    if len(tweets) > 0:
-        kwargs['max_id'] = +inf
-        for tweet in tweets:
-            db.session.merge(post_from_api_tweet_object(tweet))
-            kwargs['max_id'] = min(tweet['id'] - 1, kwargs['max_id'])
-    else:
-        kwargs = None
-    db.session.commit()
-    return kwargs
+    return [post_from_api_tweet_object(tweet) for tweet in tweets]
 
 
 def refresh_posts(posts):
     if not posts:
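
On Twitter the window works almost the same way, except max_id is inclusive ("less than or equal to"), which is why the removed pagination code stepped its cursor with tweet['id'] - 1. A sketch of the equivalent request (ids made up; t is the twitter client used above):

    kwargs = {
        'trim_user': True,
        'tweet_mode': 'extended',
        'max_id': 1100000000000000000,    # tweets at or before this id
        'since_id': 1000000000000000000,  # tweets strictly after this id
    }
    # tweets = t.statuses.user_timeline(**kwargs)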

View File

@ -0,0 +1,28 @@
"""new fetching flags
Revision ID: 4b56cde3ebd7
Revises: c136aa1157f9
Create Date: 2019-02-24 11:53:29.128983
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '4b56cde3ebd7'
down_revision = 'c136aa1157f9'
branch_labels = None
depends_on = None
def upgrade():
op.add_column('accounts', sa.Column('fetch_current_batch_end_id', sa.String(), nullable=True))
op.add_column('accounts', sa.Column('fetch_history_complete', sa.Boolean(), server_default='FALSE', nullable=False))
op.create_foreign_key(op.f('fk_accounts_fetch_current_batch_end_id_posts'), 'accounts', 'posts', ['fetch_current_batch_end_id'], ['id'], ondelete='SET NULL')
def downgrade():
op.drop_constraint(op.f('fk_accounts_fetch_current_batch_end_id_posts'), 'accounts', type_='foreignkey')
op.drop_column('accounts', 'fetch_history_complete')
op.drop_column('accounts', 'fetch_current_batch_end_id')

View File

@@ -4,6 +4,7 @@ from app import db
 import secrets
 from libforget.interval import decompose_interval
 import random
+from sqlalchemy.ext.declarative import declared_attr
 
 
 class TimestampMixin(object):
@@ -67,6 +68,13 @@ class RemoteIDMixin(object):
     def mastodon_id(self, id_):
         self.id = "mastodon:{}@{}".format(id_, self.mastodon_instance)
 
+    @property
+    def remote_id(self):
+        if self.service == 'twitter':
+            return self.twitter_id
+        elif self.service == 'mastodon':
+            return self.mastodon_id
+
 
 ThreeWayPolicyEnum = db.Enum('keeponly', 'deleteonly', 'none',
                              name='enum_3way_policy')
@@ -105,6 +113,19 @@ class Account(TimestampMixin, RemoteIDMixin):
     last_delete = db.Column(db.DateTime(timezone=True), index=True)
     next_delete = db.Column(db.DateTime(timezone=True), index=True)
 
+    fetch_history_complete = db.Column(db.Boolean, server_default='FALSE',
+                                       nullable=False)
+
+    @declared_attr
+    def fetch_current_batch_end_id(cls):
+        return db.Column(db.String, db.ForeignKey('posts.id', ondelete='SET NULL'))
+
+    @declared_attr
+    def fetch_current_batch_end(cls):
+        return db.relationship("Post", foreign_keys=(cls.fetch_current_batch_end_id,))
+    # the declared_attr is necessary because of the foreign key
+    # and because this class is technically one big mixin
+    # https://docs.sqlalchemy.org/en/latest/orm/extensions/declarative/mixins.html#mixing-in-relationships
+
     reason = db.Column(db.String)
     dormant = db.Column(db.Boolean, server_default='FALSE', nullable=False)
@@ -175,7 +196,7 @@ class Account(TimestampMixin, RemoteIDMixin):
     # backref: sessions
 
     def post_count(self):
-        return Post.query.with_parent(self).count()
+        return Post.query.with_parent(self, 'posts').count()
 
     def estimate_eligible_for_delete(self):
         """
@@ -184,10 +205,10 @@ class Account(TimestampMixin, RemoteIDMixin):
         refresh every single post every time we need to know how many posts are
         eligible to delete
         """
-        latest_n_posts = (Post.query.with_parent(self)
+        latest_n_posts = (Post.query.with_parent(self, 'posts')
                           .order_by(db.desc(Post.created_at))
                           .limit(self.policy_keep_latest))
-        query = (Post.query.with_parent(self)
+        query = (Post.query.with_parent(self, 'posts')
                  .filter(Post.created_at <=
                          db.func.now() - self.policy_keep_younger)
                  .except_(latest_n_posts))
@@ -273,6 +294,7 @@ class Post(db.Model, TimestampMixin, RemoteIDMixin):
                           nullable=False)
     author = db.relationship(
         Account,
+        foreign_keys = (author_id,),
        backref=db.backref('posts',
                           order_by=lambda: db.desc(Post.created_at)))
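
The declared_attr comment above refers to SQLAlchemy's mixin rule: a Column carrying a ForeignKey, and any relationship, cannot be a plain class attribute on a mixin, because every mapped class that uses the mixin needs its own copy. A self-contained illustration of the pattern with generic names (not from this codebase):

    from sqlalchemy import Column, ForeignKey, Integer
    from sqlalchemy.ext.declarative import declarative_base, declared_attr
    from sqlalchemy.orm import relationship

    Base = declarative_base()

    class Target(Base):
        __tablename__ = 'targets'
        id = Column(Integer, primary_key=True)

    class RefersToTarget(object):
        # wrapped in declared_attr so each subclass gets its own Column
        @declared_attr
        def target_id(cls):
            return Column(Integer, ForeignKey('targets.id'))

        @declared_attr
        def target(cls):
            return relationship(Target)

    class Thing(Base, RefersToTarget):
        __tablename__ = 'things'
        id = Column(Integer, primary_key=True)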

View File

@@ -67,13 +67,15 @@ def unique(fun):
         key = 'celery_unique_lock:{}'.format(
             pickle.dumps((fun.__name__, args, kwargs)))
         has_lock = False
+        result = None
         try:
             if r.set(key, 1, nx=True, ex=60 * 5):
                 has_lock = True
-                return fun(*args, **kwargs)
+                result = fun(*args, **kwargs)
         finally:
             if has_lock:
                 r.delete(key)
+        return result
 
     return wrapper
@@ -94,19 +96,82 @@ def make_dormant(acc):
 
 
 @app.task(autoretry_for=(TemporaryError, ))
-@unique
-def fetch_acc(id_, cursor=None):
+def fetch_acc(id_):
     account = Account.query.get(id_)
     print(f'fetching {account}')
     try:
-        action = noop
+        if not account.fetch_history_complete:
+            oldest = (db.session.query(Post)
+                      .with_parent(account, 'posts')
+                      .order_by(db.asc(Post.created_at))
+                      .first())
+            # ^ None if this is our first fetch ever, otherwise the oldest post
+            if oldest:
+                max_id = oldest.remote_id
+            else:
+                max_id = None
+            since_id = None
+        elif account.fetch_current_batch_end:
+            oldest = (db.session.query(Post)
+                      .with_parent(account, 'posts')
+                      .filter(Post.created_at > account.fetch_current_batch_end.created_at)
+                      .order_by(db.asc(Post.created_at))
+                      .first())
+            # ^ None if this is our first fetch of this batch, otherwise oldest of this batch
+            if oldest:
+                max_id = oldest.remote_id
+            else:
+                max_id = None
+            since_id = account.fetch_current_batch_end.remote_id
+        else:
+            # we shouldn't get here unless the user had no posts on the service last time we fetched
+            max_id = None
+            latest = (db.session.query(Post)
+                      .with_parent(account, 'posts')
+                      .order_by(db.desc(Post.created_at))
+                      .limit(1)
+                      .scalar())
+            # ^ should be None considering the user has no posts
+            # will be the latest post in the off chance that something goes weird
+            if latest:
+                since_id = latest.remote_id
+            else:
+                since_id = None
+
+        print('max_id: {}, since_id: {}'.format(max_id, since_id))
+
+        fetch_posts = noop
         if (account.service == 'twitter'):
-            action = libforget.twitter.fetch_acc
+            fetch_posts = libforget.twitter.fetch_posts
         elif (account.service == 'mastodon'):
-            action = libforget.mastodon.fetch_acc
-        cursor = action(account, cursor)
-        if cursor:
-            fetch_acc.si(id_, cursor).apply_async()
+            fetch_posts = libforget.mastodon.fetch_posts
+        posts = fetch_posts(account, max_id, since_id)
+
+        if posts is None:
+            # ???
+            raise TemporaryError("Fetching posts went horribly wrong")
+
+        if len(posts) == 0:
+            # we either finished the historic fetch
+            # or we finished the current batch
+            account.fetch_history_complete = True
+            account.fetch_current_batch_end_id = (Post.query
+                .with_parent(account, 'posts').order_by(db.desc(Post.created_at)).first()).id
+            # ^ note that this could be None if the user has no posts
+            # this is okay
+            db.session.commit()
+        else:
+            for post in posts:
+                db.session.merge(post)
+            db.session.commit()
+            if not account.fetch_history_complete:
+                # reschedule immediately if we're still doing the historic fetch
+                fetch_acc.apply_async((id_,))
     except TemporaryError:
         db.session.rollback()
         account.backoff()
@@ -162,11 +227,11 @@ def delete_from_account(account_id):
     if account.next_delete > datetime.now(timezone.utc):
         return
 
-    latest_n_posts = (Post.query.with_parent(account).order_by(
+    latest_n_posts = (Post.query.with_parent(account, 'posts').order_by(
         db.desc(Post.created_at)).limit(account.policy_keep_latest)
         .cte(name='latest'))
     posts = (
-        Post.query.with_parent(account)
+        Post.query.with_parent(account, 'posts')
         .filter(Post.created_at + account.policy_keep_younger <= db.func.now())
        .filter(~Post.id.in_(db.select((latest_n_posts.c.id, )))).order_by(
            db.func.random()).limit(100).with_for_update().all())
@@ -235,7 +300,7 @@ def refresh_account(account_id):
     limit = 100
     if account.service == 'mastodon':
         limit = 3
-    posts = (Post.query.with_parent(account).order_by(
+    posts = (Post.query.with_parent(account, 'posts').order_by(
         db.asc(Post.updated_at)).limit(limit).all())
     posts = refresh_posts(posts)
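
Taken together, the two new columns drive a three-state fetch loop: while fetch_history_complete is false the task pages backwards from the oldest stored post; once history is done it records a batch end and fetches the window between that post and the present; and if the account had no posts last time, it simply polls for anything new. Restated compactly (a paraphrase of the branching in fetch_acc above, not the committed code; the last three parameters stand in for the three Post queries in the diff):

    def choose_window(account, oldest, oldest_in_batch, latest):
        """Return (max_id, since_id) for the next fetch_posts call."""
        if not account.fetch_history_complete:
            # historic fetch: walk backwards from the oldest post we have
            return (oldest.remote_id if oldest else None, None)
        if account.fetch_current_batch_end:
            # batch fetch: close the gap down to the end of the last batch
            return (oldest_in_batch.remote_id if oldest_in_batch else None,
                    account.fetch_current_batch_end.remote_id)
        # the account had no posts last time; just ask for anything new
        return (None, latest.remote_id if latest else None)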