chunk imports per month

codl 2017-07-31 01:57:03 +02:00
parent 4def0b3947
commit ae4522400c
No known key found for this signature in database
GPG Key ID: 6CD7C8891ED1233A
4 changed files with 78 additions and 12 deletions

View File

@@ -69,7 +69,11 @@ def csv_tweet_to_json_tweet(tweet, account):
 def tweet_to_post(tweet):
     post = Post(twitter_id=tweet['id_str'])
-    post.created_at = datetime.strptime(tweet['created_at'], '%a %b %d %H:%M:%S %z %Y')
+    try:
+        post.created_at = datetime.strptime(tweet['created_at'], '%a %b %d %H:%M:%S %z %Y')
+    except ValueError:
+        post.created_at = datetime.strptime(tweet['created_at'], '%Y-%m-%d %H:%M:%S %z')
+        #whyyy
     if 'full_text' in tweet:
         post.body = tweet['full_text']
     else:
@@ -103,7 +107,7 @@ def fetch_acc(account, cursor, consumer_key=None, consumer_secret=None):
         kwargs['max_id'] = +inf
     for tweet in tweets:
-        import_tweet(tweet, account, db.session)
+        db.session.merge(tweet_to_post(tweet))
         kwargs['max_id'] = min(tweet['id'] - 1, kwargs['max_id'])
     else:
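
The fallback added above accepts a second timestamp layout in addition to the Twitter API's classic one. A minimal sketch of the two formats the try/except distinguishes, using hypothetical sample values:

from datetime import datetime

# classic API-style timestamp (hypothetical sample value)
api_ts = 'Mon Jul 31 01:07:39 +0200 2017'
# plainer timestamp accepted by the new fallback (hypothetical sample value)
fallback_ts = '2017-07-31 01:07:39 +0200'

print(datetime.strptime(api_ts, '%a %b %d %H:%M:%S %z %Y'))
print(datetime.strptime(fallback_ts, '%Y-%m-%d %H:%M:%S %z'))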

View File

@ -0,0 +1,32 @@
"""empty message
Revision ID: f11fe22d6169
Revises: 0cb99099c2dd
Create Date: 2017-07-31 01:07:39.741008
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'f11fe22d6169'
down_revision = '0cb99099c2dd'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('twitter_archives', sa.Column('chunks', sa.Integer(), nullable=True))
op.add_column('twitter_archives', sa.Column('chunks_failed', sa.Integer(), server_default='0', nullable=True))
op.add_column('twitter_archives', sa.Column('chunks_successful', sa.Integer(), server_default='0', nullable=True))
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('twitter_archives', 'chunks_successful')
op.drop_column('twitter_archives', 'chunks_failed')
op.drop_column('twitter_archives', 'chunks')
# ### end Alembic commands ###

View File

@@ -93,3 +93,6 @@ class TwitterArchive(db.Model, TimestampMixin):
     account_id = db.Column(db.String, db.ForeignKey('accounts.id'), nullable=False)
     account = db.relationship(Account)
     body = db.Column(db.LargeBinary, nullable=False)
+    chunks = db.Column(db.Integer)
+    chunks_successful = db.Column(db.Integer, server_default='0')
+    chunks_failed = db.Column(db.Integer, server_default='0')
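
The three counters are enough to derive import progress without extra bookkeeping. A sketch only, assuming `ta` is a TwitterArchive row; `archive_progress` is a hypothetical helper, not part of this commit:

def archive_progress(ta):
    # chunks is nullable: it stays None until the zip has been scanned
    if not ta.chunks:
        return None
    done = (ta.chunks_successful or 0) + (ta.chunks_failed or 0)
    return done / ta.chunks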

View File

@@ -9,7 +9,7 @@ from urllib.error import URLError
 from datetime import timedelta, datetime
 from zipfile import ZipFile
 from io import BytesIO, TextIOWrapper
-import csv
+import json

 app = Celery('tasks', broker=flaskapp.config['CELERY_BROKER'], task_serializer='pickle')
@@ -50,20 +50,47 @@ app.add_periodic_task(10*60, remove_old_sessions)
 app.add_periodic_task(60, queue_fetch_for_most_stale_accounts)

 @app.task
-def import_twitter_archive(id):
-    ta = TwitterArchive.query.get(id)
+def import_twitter_archive(archive_id):
+    ta = TwitterArchive.query.get(archive_id)
     with ZipFile(BytesIO(ta.body), 'r') as zipfile:
-        tweetscsv = TextIOWrapper(zipfile.open('tweets.csv', 'r'))
-        for tweet in csv.DictReader(tweetscsv):
-            tweet = lib.twitter.csv_tweet_to_json_tweet(tweet, ta.account)
-            post = lib.twitter.tweet_to_post(tweet)
-            db.session.merge(post)
-    db.session.commit()
-    db.session.delete(ta)
-    db.session.commit()
+        files = [filename for filename in zipfile.namelist() if filename.startswith('data/js/tweets/') and filename.endswith('.js')]
+        files.sort()
+
+        ta.chunks = len(files)
+        db.session.commit()
+
+        for filename in files:
+            import_twitter_archive_month.s(archive_id, filename).apply_async()
+
+
+@app.task
+def import_twitter_archive_month(archive_id, month_path):
+    ta = TwitterArchive.query.get(archive_id)
+    try:
+        with ZipFile(BytesIO(ta.body), 'r') as zipfile:
+            with TextIOWrapper(zipfile.open(month_path, 'r')) as f:
+                # seek past header
+                f.readline()
+                tweets = json.load(f)
+        for tweet in tweets:
+            post = lib.twitter.tweet_to_post(tweet)
+            post = db.session.merge(post)
+        ta.chunks_successful = TwitterArchive.chunks_successful + 1
+        db.session.commit()
+    except Exception as e:
+        db.session.rollback()
+        ta.chunks_failed = TwitterArchive.chunks_failed + 1
+        db.session.commit()
+        raise e
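
The readline() before json.load() matches the layout of the per-month files in the classic Twitter archive export, where the first line is a JavaScript assignment and everything after it is plain JSON. A rough, self-contained sketch of that parse with a hypothetical sample payload:

import io
import json

# hypothetical stand-in for a data/js/tweets/2017_07.js entry inside the archive zip
sample = 'Grailbird.data.tweets_2017_07 =\n[{"id_str": "1", "created_at": "Mon Jul 31 01:07:39 +0200 2017", "text": "hello"}]\n'

f = io.StringIO(sample)
f.readline()            # skip the "Grailbird.data..." header line, as the task above does
tweets = json.load(f)   # the rest of the stream is an ordinary JSON array
print(tweets[0]['id_str'])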