diff --git a/lib/twitter.py b/lib/twitter.py
index a0c6efd..36bfe09 100644
--- a/lib/twitter.py
+++ b/lib/twitter.py
@@ -69,7 +69,11 @@ def csv_tweet_to_json_tweet(tweet, account):
 
 def tweet_to_post(tweet):
     post = Post(twitter_id=tweet['id_str'])
-    post.created_at = datetime.strptime(tweet['created_at'], '%a %b %d %H:%M:%S %z %Y')
+    try:
+        post.created_at = datetime.strptime(tweet['created_at'], '%a %b %d %H:%M:%S %z %Y')
+    except ValueError:
+        post.created_at = datetime.strptime(tweet['created_at'], '%Y-%m-%d %H:%M:%S %z')
+        #whyyy
     if 'full_text' in tweet:
         post.body = tweet['full_text']
     else:
@@ -103,7 +107,7 @@ def fetch_acc(account, cursor, consumer_key=None, consumer_secret=None):
             kwargs['max_id'] = +inf
 
         for tweet in tweets:
-            import_tweet(tweet, account, db.session)
+            db.session.merge(tweet_to_post(tweet))
             kwargs['max_id'] = min(tweet['id'] - 1, kwargs['max_id'])
 
     else:
diff --git a/migrations/versions/f11fe22d6169_.py b/migrations/versions/f11fe22d6169_.py
new file mode 100644
index 0000000..ce58479
--- /dev/null
+++ b/migrations/versions/f11fe22d6169_.py
@@ -0,0 +1,32 @@
+"""empty message
+
+Revision ID: f11fe22d6169
+Revises: 0cb99099c2dd
+Create Date: 2017-07-31 01:07:39.741008
+
+"""
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision = 'f11fe22d6169'
+down_revision = '0cb99099c2dd'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.add_column('twitter_archives', sa.Column('chunks', sa.Integer(), nullable=True))
+    op.add_column('twitter_archives', sa.Column('chunks_failed', sa.Integer(), server_default='0', nullable=True))
+    op.add_column('twitter_archives', sa.Column('chunks_successful', sa.Integer(), server_default='0', nullable=True))
+    # ### end Alembic commands ###
+
+
+def downgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.drop_column('twitter_archives', 'chunks_successful')
+    op.drop_column('twitter_archives', 'chunks_failed')
+    op.drop_column('twitter_archives', 'chunks')
+    # ### end Alembic commands ###
diff --git a/model.py b/model.py
index 0e1431f..ad352e4 100644
--- a/model.py
+++ b/model.py
@@ -93,3 +93,6 @@ class TwitterArchive(db.Model, TimestampMixin):
     account_id = db.Column(db.String, db.ForeignKey('accounts.id'), nullable=False)
     account = db.relationship(Account)
     body = db.Column(db.LargeBinary, nullable=False)
+    chunks = db.Column(db.Integer)
+    chunks_successful = db.Column(db.Integer, server_default='0')
+    chunks_failed = db.Column(db.Integer, server_default='0')
diff --git a/tasks.py b/tasks.py
index 35b2e73..e411f7d 100644
--- a/tasks.py
+++ b/tasks.py
@@ -9,7 +9,7 @@ from urllib.error import URLError
 from datetime import timedelta, datetime
 from zipfile import ZipFile
 from io import BytesIO, TextIOWrapper
-import csv
+import json
 
 app = Celery('tasks', broker=flaskapp.config['CELERY_BROKER'],
         task_serializer='pickle')
@@ -50,17 +50,44 @@ app.add_periodic_task(10*60, remove_old_sessions)
 app.add_periodic_task(60, queue_fetch_for_most_stale_accounts)
 
 @app.task
-def import_twitter_archive(id):
-    ta = TwitterArchive.query.get(id)
+def import_twitter_archive(archive_id):
+    ta = TwitterArchive.query.get(archive_id)
 
     with ZipFile(BytesIO(ta.body), 'r') as zipfile:
-        tweetscsv = TextIOWrapper(zipfile.open('tweets.csv', 'r'))
+        files = [filename for filename in zipfile.namelist() if filename.startswith('data/js/tweets/') and filename.endswith('.js')]
 
-        for tweet in csv.DictReader(tweetscsv):
-            tweet = lib.twitter.csv_tweet_to_json_tweet(tweet, ta.account)
-            post = lib.twitter.tweet_to_post(tweet)
-            db.session.merge(post)
+        files.sort()
+
+        ta.chunks = len(files)
+        db.session.commit()
+
+        for filename in files:
+            import_twitter_archive_month.s(archive_id, filename).apply_async()
+
+
+@app.task
+def import_twitter_archive_month(archive_id, month_path):
+    ta = TwitterArchive.query.get(archive_id)
+
+    try:
+
+        with ZipFile(BytesIO(ta.body), 'r') as zipfile:
+            with TextIOWrapper(zipfile.open(month_path, 'r')) as f:
+
+                # seek past header
+                f.readline()
+
+                tweets = json.load(f)
+
+                for tweet in tweets:
+                    post = lib.twitter.tweet_to_post(tweet)
+                    post = db.session.merge(post)
+
+        ta.chunks_successful = TwitterArchive.chunks_successful + 1
         db.session.commit()
 
-    db.session.delete(ta)
-    db.session.commit()
+    except Exception as e:
+        db.session.rollback()
+        ta.chunks_failed = TwitterArchive.chunks_failed + 1
+        db.session.commit()
+        raise e