forget-cancellare-vecchi-toot/lib/twitter.py

174 lines
5.8 KiB
Python
Raw Normal View History

from twitter import Twitter, OAuth, TwitterHTTPError
2017-07-27 00:35:53 +02:00
from werkzeug.urls import url_decode
2017-08-12 20:32:51 +02:00
from model import OAuthToken, Account, Post, TwitterArchive
2017-08-02 01:35:09 +02:00
from app import db, app
from math import inf
from datetime import datetime
2017-07-31 00:07:34 +02:00
import locale
2017-08-12 20:32:51 +02:00
from zipfile import ZipFile
from io import BytesIO
2017-07-27 00:35:53 +02:00
def get_login_url(callback='oob', consumer_key=None, consumer_secret=None):
    """Begin the Twitter OAuth flow and return the URL to send the user to.

    A request token is obtained from Twitter, persisted through
    ``db.session`` so it can be looked up again when the user comes back,
    and embedded in the ``oauth/authenticate`` URL that is returned.

    callback -- value passed to Twitter as ``oauth_callback``
    consumer_key, consumer_secret -- application credentials
    """
    # The response body is parsed with url_decode below, so the client is
    # created with an empty format and no API version prefix.
    oauth_client = Twitter(
        auth=OAuth('', '', consumer_key, consumer_secret),
        format='', api_version=None)

    raw_response = oauth_client.oauth.request_token(oauth_callback=callback)
    fields = url_decode(raw_response)
    request_token = fields['oauth_token']
    request_secret = fields['oauth_token_secret']

    # Remember the temporary token until the verifier arrives.
    db.session.merge(
        OAuthToken(token=request_token, token_secret=request_secret))
    db.session.commit()

    return "https://api.twitter.com/oauth/authenticate?oauth_token=%s" % (request_token,)
def account_from_api_user_object(obj):
    """Build an ``Account`` from a Twitter API user object.

    ``obj`` must provide the keys ``id_str``, ``name``, ``screen_name``,
    ``profile_image_url_https`` and ``statuses_count``; a missing key
    raises ``KeyError``.
    """
    account_fields = {
        'twitter_id': obj['id_str'],
        'display_name': obj['name'],
        'screen_name': obj['screen_name'],
        'avatar_url': obj['profile_image_url_https'],
        'reported_post_count': obj['statuses_count'],
    }
    return Account(**account_fields)
2017-07-27 00:35:53 +02:00
def receive_verifier(oauth_token, oauth_verifier, consumer_key=None, consumer_secret=None):
    """Finish the OAuth flow: trade the verifier for an access token.

    The stored request token identified by ``oauth_token`` is exchanged
    for an access token, the request token is deleted, the account the
    new token belongs to is fetched and merged into the session, and the
    linked token is committed and returned.

    Raises ``Exception`` when no stored token matches ``oauth_token``.
    """
    request_token = OAuthToken.query.get(oauth_token)
    if not request_token:
        raise Exception("OAuth token has expired")

    exchange_client = Twitter(
        auth=OAuth(request_token.token, request_token.token_secret,
                   consumer_key, consumer_secret),
        format='', api_version=None)
    fields = url_decode(
        exchange_client.oauth.access_token(oauth_verifier=oauth_verifier))

    # The temporary request token is no longer needed once exchanged.
    db.session.delete(request_token)

    access_token = db.session.merge(OAuthToken(
        token=fields['oauth_token'],
        token_secret=fields['oauth_token_secret']))

    # Ask Twitter who this token belongs to, using the fresh credentials.
    api_client = Twitter(
        auth=OAuth(access_token.token, access_token.token_secret,
                   consumer_key, consumer_secret))
    profile = api_client.account.verify_credentials()

    access_token.account = db.session.merge(
        account_from_api_user_object(profile))
    db.session.commit()

    return access_token
2017-08-02 01:35:09 +02:00
def get_twitter_for_acc(account):
    """Return a ``Twitter`` client with working credentials for ``account``.

    The account's tokens are tried from most recently created to oldest.
    A token rejected with HTTP 401 is deleted and the next one is tried;
    any other ``TwitterHTTPError`` is re-raised. When no token works the
    account's policy is disabled, the account is logged out, and ``None``
    is returned.
    """
    key = app.config['TWITTER_CONSUMER_KEY']
    secret = app.config['TWITTER_CONSUMER_SECRET']

    newest_first = (
        OAuthToken.query
        .with_parent(account)
        .order_by(db.desc(OAuthToken.created_at))
        .all()
    )

    for candidate in newest_first:
        client = Twitter(
            auth=OAuth(candidate.token, candidate.token_secret, key, secret))
        try:
            client.account.verify_credentials()
        except TwitterHTTPError as e:
            if e.e.code != 401:
                # temporary error, re-raise
                raise e
            # token revoked
            db.session.delete(candidate)
            db.session.commit()
        else:
            return client

    # if no tokens are valid, we log out the user so we'll get a fresh
    # token when they log in again
    account.policy_enabled = False
    account.force_log_out()
    return None
2017-07-31 00:07:34 +02:00
# datetime.strptime's %a and %b directives are locale-dependent; LC_TIME is
# pinned to the C locale, presumably so the day/month abbreviations in tweet
# timestamps parse regardless of the host's locale settings.
locale.setlocale(locale.LC_TIME, 'C')
def post_from_api_tweet_object(tweet, post=None):
    """Copy the fields of a tweet object onto a post and return the post.

    tweet -- mapping with at least ``id_str``, ``created_at`` and
             ``user['id_str']``; ``favorited`` and ``entities`` are
             copied only when present
    post  -- object to fill in; a new ``Post`` is created when falsy

    ``created_at`` is parsed with the ``'%a %b %d %H:%M:%S %z %Y'`` layout
    first and with ``'%Y-%m-%d %H:%M:%S %z'`` as a fallback; a value
    matching neither raises ``ValueError``.
    """
    target = post if post else Post()

    target.twitter_id = tweet['id_str']

    raw_timestamp = tweet['created_at']
    try:
        target.created_at = datetime.strptime(
            raw_timestamp, '%a %b %d %H:%M:%S %z %Y')
    except ValueError:
        # NOTE(review): some tweet sources evidently use a second date
        # layout; which ones is not visible here -- confirm.
        target.created_at = datetime.strptime(
            raw_timestamp, '%Y-%m-%d %H:%M:%S %z')

    target.author_id = 'twitter:{}'.format(tweet['user']['id_str'])

    if 'favorited' in tweet:
        target.favourite = tweet['favorited']

    if 'entities' in tweet:
        entities = tweet['entities']
        # True only when a non-empty 'media' entry exists.
        target.has_media = 'media' in entities and bool(entities['media'])

    return target
2017-08-19 13:11:16 +02:00
def fetch_acc(account, cursor):
    """Fetch one page of ``account``'s timeline and merge it into the session.

    account -- the account whose tweets are fetched
    cursor  -- falsy for a fresh fetch, or a dict of extra request
               parameters (as returned by a previous call) to continue

    Returns the request parameters for the next page, with ``max_id`` set
    just below the lowest tweet id seen, or ``None`` when there is no
    Twitter access or the page was empty.
    """
    client = get_twitter_for_acc(account)
    if not client:
        print("no twitter access, aborting")
        return

    # Refresh the stored account details from the live profile.
    profile = client.account.verify_credentials()
    db.session.merge(account_from_api_user_object(profile))

    kwargs = dict(
        user_id=account.twitter_id,
        count=200,
        trim_user=True,
        tweet_mode='extended',
    )
    if cursor:
        kwargs.update(cursor)

    if 'max_id' not in kwargs:
        # Not paging backwards: only ask for tweets newer than the most
        # recent post already stored for this account.
        newest_known = (
            Post.query
            .order_by(db.desc(Post.created_at))
            .filter(Post.author_id == account.id)
            .first()
        )
        if newest_known:
            kwargs['since_id'] = newest_known.twitter_id

    tweets = client.statuses.user_timeline(**kwargs)

    print("processing {} tweets for {acc}".format(len(tweets), acc=account))

    if len(tweets) == 0:
        next_cursor = None
    else:
        lowest_id = inf
        for tweet in tweets:
            db.session.merge(post_from_api_tweet_object(tweet))
            lowest_id = min(tweet['id'] - 1, lowest_id)
        kwargs['max_id'] = lowest_id
        next_cursor = kwargs

    db.session.commit()

    return next_cursor
2017-08-02 01:35:09 +02:00
def refresh_posts(posts):
    """Re-fetch ``posts`` from Twitter and sync them with the session.

    Posts that Twitter no longer returns are deleted from the session;
    the rest are updated from the fresh tweet data and merged.

    posts -- sequence of posts, all assumed to belong to the author of
             the first one (its credentials are used for the lookup)

    Returns the list of refreshed (merged) posts, or ``posts`` itself
    when it is empty. Nothing is committed here.

    Raises ``Exception`` when no working Twitter client is available.
    """
    if not posts:
        return posts

    t = get_twitter_for_acc(posts[0].author)
    if not t:
        raise Exception('shit idk. twitter says no')

    # statuses/lookup is documented to accept at most 100 ids per request,
    # so larger inputs are split into batches; otherwise posts beyond the
    # limit could be mistaken for deleted tweets.
    batch_size = 100
    pending = list(posts)

    # Index the returned tweets by id so each post is matched in O(1)
    # instead of scanning the whole response per post.
    tweets_by_id = {}
    for start in range(0, len(pending), batch_size):
        batch = pending[start:start + batch_size]
        tweets = t.statuses.lookup(
            _id=",".join(post.twitter_id for post in batch),
            trim_user=True, tweet_mode='extended')
        for tweet in tweets:
            # setdefault keeps the first occurrence of an id
            tweets_by_id.setdefault(tweet['id_str'], tweet)

    refreshed_posts = []
    for post in pending:
        tweet = tweets_by_id.get(post.twitter_id)
        if not tweet:
            # Twitter did not return it: the tweet is gone
            db.session.delete(post)
        else:
            refreshed_posts.append(
                db.session.merge(post_from_api_tweet_object(tweet)))

    return refreshed_posts
def delete(post):
    """Destroy ``post`` on Twitter and delete it from the session.

    The tweet is removed through the author's credentials, then the post
    is marked for deletion in ``db.session``. Nothing is committed here.

    Raises ``Exception`` when no working Twitter client is available for
    the post's author (``get_twitter_for_acc`` returned ``None``).
    """
    t = get_twitter_for_acc(post.author)
    if not t:
        # Fail explicitly instead of with an AttributeError on None,
        # mirroring the guard in refresh_posts.
        raise Exception('no valid twitter credentials for this account')
    t.statuses.destroy(id=post.twitter_id)
    db.session.delete(post)
2017-08-12 20:32:51 +02:00
def chunk_twitter_archive(archive_id):
    """List the tweet data files contained in a stored Twitter archive.

    The archive row is looked up by ``archive_id`` and its ``body`` is
    opened as an in-memory zip file. Returns the sorted names of the
    members under ``data/js/tweets/`` that end in ``.js``.
    """
    archive = TwitterArchive.query.get(archive_id)
    with ZipFile(BytesIO(archive.body), 'r') as bundle:
        tweet_files = sorted(
            member for member in bundle.namelist()
            if member.startswith('data/js/tweets/') and member.endswith('.js')
        )
    return tweet_files