from django.http import HttpResponse, Http404, HttpResponseRedirect from django.conf import settings as django_settings from django.shortcuts import render, redirect from django.urls import reverse from django.views.decorators.cache import never_cache, cache_page from django.urls import reverse from django.core.files.uploadhandler import TemporaryFileUploadHandler from brutaldon.forms import LoginForm, OAuthLoginForm, PreferencesForm, PostForm from brutaldon.models import Client, Account, Preference, Theme from mastodon import Mastodon, AttribAccessDict, MastodonError, MastodonAPIError from urllib import parse from pdb import set_trace from inscriptis import get_text from time import sleep import re class NotLoggedInException(Exception): pass def get_usercontext(request): if is_logged_in(request): try: client = Client.objects.get(api_base_id=request.session['instance']) user = Account.objects.get(username=request.session['username']) except (Client.DoesNotExist, Client.MultipleObjectsReturned, Account.DoesNotExist, Account.MultipleObjectsReturned): raise NotLoggedInException() mastodon = Mastodon( client_id = client.client_id, client_secret = client.client_secret, access_token = user.access_token, api_base_url = client.api_base_id, ratelimit_method="throw") return user, mastodon else: return None, None def is_logged_in(request): return request.session.has_key('user') def _notes_count(account, mastodon): if not mastodon: return "" notes = mastodon.notifications(limit=40) for index, item in enumerate(notes): if item.id == account.note_seen: break else: index = "40+" return str(index) def br_login_required(function=None, home_url=None, redirect_field_name=None): """Check that the user is logged in to a Mastodon instance. This decorator ensures that the view functions it is called on can be accessed only by logged in users. When an instanceless user accesses such a protected view, they are redirected to the address specified in the field named in `next_field` or, lacking such a value, the URL in `home_url`, or the `ANONYMOUS_HOME_URL` setting. """ if home_url is None: home_url = django_settings.ANONYMOUS_HOME_URL def _dec(view_func): def _view(request, *args, **kwargs): if not is_logged_in(request): url = None if redirect_field_name and redirect_field_name in request.REQUEST: url = request.REQUEST[redirect_field_name] if not url: url = home_url if not url: url = "/" return HttpResponseRedirect(url) else: return view_func(request, *args, **kwargs) _view.__name__ = view_func.__name__ _view.__dict__ = view_func.__dict__ _view.__doc__ = view_func.__doc__ return _view if function is None: return _dec else: return _dec(function) def notes_count(request): account, mastodon = get_usercontext(request) count = _notes_count(account, mastodon) return render(request, 'intercooler/notes.html', {'notifications': count, 'preferences': account.preferences }) def timeline(request, timeline='home', timeline_name='Home', max_id=None, since_id=None): account, mastodon = get_usercontext(request) data = mastodon.timeline(timeline, limit=100, max_id=max_id, since_id=since_id) form = PostForm(initial={'visibility': request.session['user'].source.privacy}) try: prev = data[0]._pagination_prev if len(mastodon.timeline(since_id=prev['since_id'])) == 0: prev = None except (IndexError, AttributeError): prev = None try: next = data[-1]._pagination_next except (IndexError, AttributeError): next = None notifications = _notes_count(account, mastodon) # This filtering has to be done *after* getting next/prev links if account.preferences.filter_replies: data = [x for x in data if not x.in_reply_to_id] if account.preferences.filter_boosts: data = [x for x in data if not x.reblog] return render(request, 'main/%s_timeline.html' % timeline, {'toots': data, 'form': form, 'timeline': timeline, 'timeline_name': timeline_name, 'own_acct': request.session['user'], 'preferences': account.preferences, 'notifications': notifications, 'prev': prev, 'next': next}) @br_login_required def home(request, next=None, prev=None): return timeline(request, 'home', 'Home', max_id=next, since_id=prev) @br_login_required def local(request, next=None, prev=None): return timeline(request, 'local', 'Local', max_id=next, since_id=prev) @br_login_required def fed(request, next=None, prev=None): return timeline(request, 'public', 'Federated', max_id=next, since_id=prev) @br_login_required def tag(request, tag): try: account, mastodon = get_usercontext(request) except NotLoggedInException: return redirect(login) data = mastodon.timeline_hashtag(tag) notifications = _notes_count(account, mastodon) return render(request, 'main/timeline.html', {'toots': data, 'timeline_name': '#'+tag, 'own_acct': request.session['user'], 'notifications': notifications, 'preferences': account.preferences}) @never_cache def login(request): # User posts instance name in form. # POST page redirects user to instance, where they log in. # Instance redirects user to oauth_after_login view. # oauth_after_login view saves credential in session, then redirects to home. if request.method == "GET": form = OAuthLoginForm() return render(request, 'setup/login-oauth.html', {'form': form}) elif request.method == "POST": form = OAuthLoginForm(request.POST) redirect_uris = request.build_absolute_uri(reverse('oauth_callback')) if form.is_valid(): api_base_url = form.cleaned_data['instance'] tmp_base = parse.urlparse(api_base_url.lower()) if tmp_base.netloc == '': api_base_url = parse.urlunparse(('https', tmp_base.path, '','','','')) request.session['instance_hostname'] = tmp_base.path else: api_base_url = api_base_url.lower() request.session['instance_hostname'] = tmp_base.netloc request.session['instance'] = api_base_url try: client = Client.objects.get(api_base_id=api_base_url) except (Client.DoesNotExist, Client.MultipleObjectsReturned): (client_id, client_secret) = Mastodon.create_app('brutaldon', api_base_url=api_base_url, redirect_uris=redirect_uris, scopes=['read', 'write', 'follow']) client = Client( api_base_id = api_base_url, client_id=client_id, client_secret = client_secret) client.save() request.session['client_id'] = client.client_id request.session['client_secret'] = client.client_secret mastodon = Mastodon( client_id = client.client_id, client_secret = client.client_secret, api_base_url = api_base_url) return redirect(mastodon.auth_request_url(redirect_uris=redirect_uris, scopes=['read', 'write', 'follow'])) else: return render(request, 'setup/login.html', {'form': form}) else: return redirect(login) @never_cache def oauth_callback(request): code = request.GET.get('code', '') mastodon = Mastodon(client_id=request.session['client_id'], client_secret=request.session['client_secret'], api_base_url=request.session['instance']) redirect_uri = request.build_absolute_uri(reverse('oauth_callback')) access_token = mastodon.log_in(code=code, redirect_uri=redirect_uri, scopes=['read', 'write', 'follow']) request.session['access_token'] = access_token user = mastodon.account_verify_credentials() try: account = Account.objects.get(username=user.username + '@' + request.session['instance_hostname']) account.access_token = access_token if not account.preferences: preferences = Preference(theme = Theme.objects.get(id=1)) preferences.save() account.preferences = preferences else: request.session['timezone'] = account.preferences.timezone account.save() except (Account.DoesNotExist, Account.MultipleObjectsReturned): preferences = Preference(theme = Theme.objects.get(id=1)) preferences.save() account = Account(username=user.username + '@' + request.session['instance_hostname'], access_token = access_token, client = Client.objects.get(api_base_id=request.session['instance']), preferences = preferences) account.save() request.session['user'] = user request.session['username'] = user.username + '@' + request.session['instance_hostname'] return redirect(home) @never_cache def old_login(request): if request.method == "GET": form = LoginForm() return render(request, 'setup/login.html', {'form': form}) elif request.method == "POST": form = LoginForm(request.POST) if form.is_valid(): api_base_url = form.cleaned_data['instance'] tmp_base = parse.urlparse(api_base_url.lower()) if tmp_base.netloc == '': api_base_url = parse.urlunparse(('https', tmp_base.path, '','','','')) request.session['instance_hostname'] = tmp_base.path else: api_base_url = api_base_url.lower() request.session['instance_hostname'] = tmp_base.netloc request.session['instance'] = api_base_url email = form.cleaned_data['email'] password = form.cleaned_data['password'] try: client = Client.objects.get(api_base_id=api_base_url) except (Client.DoesNotExist, Client.MultipleObjectsReturned): (client_id, client_secret) = Mastodon.create_app('brutaldon', api_base_url=api_base_url, scopes=['read', 'write', 'follow']) client = Client( api_base_id = api_base_url, client_id=client_id, client_secret = client_secret) client.save() mastodon = Mastodon( client_id = client.client_id, client_secret = client.client_secret, api_base_url = api_base_url) try: account = Account.objects.get(email=email, client_id=client.id) except (Account.DoesNotExist, Account.MultipleObjectsReturned): preferences = Preference(theme = Theme.objects.get(id=1)) preferences.save() account = Account( email = email, access_token = "", client = client, preferences = preferences) try: access_token = mastodon.log_in(email, password, scopes=['read', 'write', 'follow']) account.access_token = access_token user = mastodon.account_verify_credentials() request.session['user'] = user request.session['username'] = user.username + '@' + request.session['instance_hostname'] account.username = request.session['username'] request.session['timezone'] = account.preferences.timezone; account.save() return redirect(home) except Exception as ex: form.add_error('', ex) return render(request, 'setup/login.html', {'form': form}) else: return render(request, 'setup/login.html', {'form': form}) @never_cache def logout(request): request.session.flush() return redirect(about) def error(request): return render(request, 'error.html', { 'error': "Not logged in yet."}) @br_login_required def note(request, next=None, prev=None): try: account, mastodon = get_usercontext(request) except NotLoggedInException: return redirect(about) last_seen = mastodon.notifications(limit=1)[0] account.note_seen = last_seen.id account.save() notes = mastodon.notifications(limit=100, max_id=next, since_id=prev) try: prev = notes[0]._pagination_prev if len(mastodon.notifications(since_id=prev['since_id'])) == 0: prev = None except (IndexError, AttributeError): prev = None try: next = notes[-1]._pagination_next except (IndexError, AttributeError): next = None return render(request, 'main/notifications.html', {'notes': notes,'timeline': 'Notifications', 'timeline_name': 'Notifications', 'own_acct': request.session['user'], 'preferences': account.preferences, 'prev': prev, 'next': next}) @br_login_required def thread(request, id): account, mastodon = get_usercontext(request) context = mastodon.status_context(id) toot = mastodon.status(id) notifications = _notes_count(account, mastodon) return render(request, 'main/thread.html', {'context': context, 'toot': toot, 'own_acct': request.session['user'], 'notifications': notifications, 'preferences': account.preferences}) @br_login_required def user(request, username, prev=None, next=None): try: account, mastodon = get_usercontext(request) except NotLoggedInException: return redirect(about) try: user_dict = mastodon.account_search(username)[0] except (IndexError, AttributeError): raise Http404("The user %s could not be found." % username) data = mastodon.account_statuses(user_dict.id, max_id=next, since_id=prev) relationship = mastodon.account_relationships(user_dict.id)[0] notifications = _notes_count(account, mastodon) try: prev = data[0]._pagination_prev if len(mastodon.account_statuses(user_dict.id, since_id=prev['since_id'])) == 0: prev = None except (IndexError, AttributeError): prev = None try: next = data[-1]._pagination_next except (IndexError, AttributeError): next = None return render(request, 'main/user.html', {'toots': data, 'user': user_dict, 'relationship': relationship, 'own_acct': request.session['user'], 'preferences': account.preferences, 'notifications': notifications, 'prev': prev, 'next': next}) @never_cache @br_login_required def settings(request): account = Account.objects.get(username=request.session['username']) if request.method == 'POST': form = PreferencesForm(request.POST) if form.is_valid(): account.preferences.theme = form.cleaned_data['theme'] account.preferences.filter_replies = form.cleaned_data['filter_replies'] account.preferences.filter_boosts = form.cleaned_data['filter_boosts'] account.preferences.timezone = form.cleaned_data['timezone'] account.preferences.no_javascript = form.cleaned_data['no_javascript'] account.preferences.notifications = form.cleaned_data['notifications'] account.preferences.click_to_load = form.cleaned_data['click_to_load'] account.preferences.lightbox = form.cleaned_data['lightbox'] account.preferences.poll_frequency = form.cleaned_data['poll_frequency'] request.session['timezone'] = account.preferences.timezone account.preferences.save() account.save() return redirect(home) else: return render(request, 'setup/settings.html', {'form' : form, 'account': account}) else: request.session['timezone'] = account.preferences.timezone form = PreferencesForm(instance=account.preferences) return render(request, 'setup/settings.html', { 'form': form, 'account': account, 'preferences': account.preferences}) @never_cache @br_login_required def toot(request, mention=None): account, mastodon = get_usercontext(request) if request.method == 'GET': if mention: if not mention.startswith('@'): mention = '@'+mention form = PostForm(request.GET, request.FILES, initial={'visibility': request.session['user'].source.privacy, 'status': mention + '\n' }) else: form = PostForm(request.GET, request.FILES, initial={'visibility': request.session['user'].source.privacy}) if request.GET.get('ic-request'): return render(request, 'intercooler/post.html', {'form': form, 'own_acct': request.session['user'], 'preferences': account.preferences}) else: return render(request, 'main/post.html', {'form': form, 'own_acct': request.session['user'], 'preferences': account.preferences}) elif request.method == 'POST': form = PostForm(request.POST, request.FILES) if form.is_valid(): # create media objects media_objects = [] for index in range(1,5): if 'media_file_'+str(index) in request.FILES: media_objects.append( mastodon.media_post(request.FILES['media_file_'+str(index)] .temporary_file_path(), description=request.POST.get('media_text_' +str(index), None))) if form.cleaned_data['visibility'] == '': form.cleaned_data['visibility'] = request.session['user'].source.privacy try: mastodon.status_post(status=form.cleaned_data['status'], visibility=form.cleaned_data['visibility'], spoiler_text=form.cleaned_data['spoiler_text'], media_ids=media_objects) except MastodonAPIError as error: form.add_error("", "%s (%s used)" % (error.args[-1], len(form.cleaned_data['status']) + len(form.cleaned_data['spoiler_text']))) return render(request, 'main/post.html', {'form': form, 'own_acct': request.session['user'], 'preferences': account.preferences}) return redirect(home) else: return render(request, 'main/post.html', {'form': form, 'own_acct': request.session['user'], 'preferences': account.preferences}) else: return redirect(toot) @br_login_required def redraft(request, id): if request.method == 'GET': account, mastodon = get_usercontext(request) toot = mastodon.status(id) toot_content = get_text(toot.content) toot_content = re.sub("(^\n)|(\n$)", '', re.sub("\n\n", "\n", toot_content)) form = PostForm({'status': toot_content, 'visibility': toot.visibility, 'spoiler_text': toot.spoiler_text, 'media_text_1': safe_get_attachment(toot, 0).description, 'media_text_2': safe_get_attachment(toot, 1).description, 'media_text_3': safe_get_attachment(toot, 2).description, 'media_text_4': safe_get_attachment(toot, 3).description, }) return render(request, 'main/redraft.html', {'toot': toot, 'form': form, 'redraft':True, 'own_acct': request.session['user'], 'preferences': account.preferences}) elif request.method == 'POST': form = PostForm(request.POST, request.FILES) account, mastodon = get_usercontext(request) toot = mastodon.status(id) if form.is_valid(): media_objects = [] for index in range(1,5): if 'media_file_'+str(index) in request.FILES: media_objects.append( mastodon.media_post(request.FILES['media_file_'+str(index)] .temporary_file_path(), description=request.POST.get('media_text_' +str(index), None))) if form.cleaned_data['visibility'] == '': form.cleaned_data['visibility'] = request.session['user'].source.privacy try: mastodon.status_post(status=form.cleaned_data['status'], visibility=form.cleaned_data['visibility'], spoiler_text=form.cleaned_data['spoiler_text'], media_ids=media_objects, in_reply_to_id=toot.in_reply_to_id) mastodon.status_delete(id) except MastodonAPIError as error: form.add_error("", "%s (%s used)" % (error.args[-1], len(form.cleaned_data['status']) + len(form.cleaned_data['spoiler_text']))) return render(request, 'main/redraft.html', {'toot': toot, 'form': form, 'redraft': True, 'own_acct': request.session['user'], 'preferences': account.preferences}) return redirect(home) else: return render(request, 'main/redraft.html', {'toot': toot, 'form': form, 'redraft': True, 'own_acct': request.session['user'], 'preferences': account.preferences}) else: return redirect(redraft, id) def safe_get_attachment(toot, index): """Get an attachment from a toot, without crashing if it isn't there.""" try: return toot.media_attachments[index] except IndexError: adict = AttribAccessDict() adict.id, adict.type, adict.description = "", "unknown", "" adict.url, adict.remote_url, adict.preview_url = '', '', '' adict.text_url = '' return adict @br_login_required def reply(request, id): if request.method == 'GET': account, mastodon = get_usercontext(request) toot = mastodon.status(id) context = mastodon.status_context(id) notifications = _notes_count(account, mastodon) if toot.account.acct != request.session['user'].acct: initial_text = '@' + toot.account.acct + " " else: initial_text = "" for mention in [x for x in toot.mentions if x.acct != request.session['user'].acct]: initial_text +=('@' + mention.acct + " ") if initial_text != "": initial_text += "\n" form = PostForm({'status': initial_text, 'visibility': toot.visibility, 'spoiler_text': toot.spoiler_text}) return render(request, 'main/reply.html', {'context': context, 'toot': toot, 'form': form, 'reply':True, 'own_acct': request.session['user'], 'notifications': notifications, 'preferences': account.preferences}) elif request.method == 'POST': form = PostForm(request.POST, request.FILES) account, mastodon = get_usercontext(request) toot = mastodon.status(id) context = mastodon.status_context(id) notifications = _notes_count(account, mastodon) if form.is_valid(): # create media objects media_objects = [] for index in range(1,5): if 'media_file_'+str(index) in request.FILES: media_objects.append( mastodon.media_post(request.FILES['media_file_'+str(index)] .temporary_file_path(), description=request.POST.get('media_text_' +str(index), None))) try: mastodon.status_post(status=form.cleaned_data['status'], visibility=form.cleaned_data['visibility'], spoiler_text=form.cleaned_data['spoiler_text'], media_ids=media_objects, in_reply_to_id=id) except MastodonAPIError as error: form.add_error("", "%s (%s used)" % (error.args[-1], len(form.cleaned_data['status']) + len(form.cleaned_data['spoiler_text']))) return render(request, 'main/reply.html', {'context': context, 'toot': toot, 'form': form, 'reply': True, 'own_acct': request.session['user'], 'notifications': notifications, 'preferences': account.preferences}) return redirect(thread, id) else: return render(request, 'main/reply.html', {'context': context, 'toot': toot, 'form': form, 'reply': True, 'own_acct': request.session['user'], 'preferences': account.preferences}) else: return redirect(reply, id) @never_cache @br_login_required def fav(request, id): account, mastodon = get_usercontext(request) toot = mastodon.status(id) if request.method == 'POST': if not request.POST.get('cancel', None): if toot.favourited: mastodon.status_unfavourite(id) else: mastodon.status_favourite(id) if request.POST.get('ic-request'): toot['favourited'] = not toot['favourited'] return render(request, 'intercooler/fav.html', {"toot": toot, 'own_acct': request.session['user'], "preferences": account.preferences}) else: return redirect(thread, id) else: return render(request, 'main/fav.html', {"toot": toot, 'own_acct': request.session['user'], "confirm_page": True, 'preferences': account.preferences}) @never_cache @br_login_required def boost(request, id): account, mastodon = get_usercontext(request) toot = mastodon.status(id) if request.method == 'POST': if not request.POST.get('cancel', None): if toot.reblogged: mastodon.status_unreblog(id) else: mastodon.status_reblog(id) if request.POST.get('ic-request'): toot['reblogged'] = not toot['reblogged'] return render(request, 'intercooler/boost.html', {"toot": toot, 'own_acct': request.session['user'], "preferences": account.preferences}) else: return redirect(thread, id) else: return render(request, 'main/boost.html', {"toot": toot, 'own_acct': request.session['user'], 'confirm_page': True, "preferences": account.preferences}) @never_cache @br_login_required def delete(request, id): account, mastodon = get_usercontext(request) toot = mastodon.status(id) if request.method == 'POST' or request.method == 'DELETE': if toot.account.acct != request.session['user'].acct: return redirect('home') if not request.POST.get('cancel', None): mastodon.status_delete(id) if request.POST.get('ic-request') or request.DELETE.get('ic-request'): return HttpResponse("") return redirect(home) else: return render(request, 'main/delete.html', {"toot": toot, 'own_acct': request.session['user'], 'confirm_page': True, "preferences": account.preferences}) @never_cache @br_login_required def follow(request, id): account, mastodon = get_usercontext(request) try: user_dict = mastodon.account(id) relationship = mastodon.account_relationships(user_dict.id)[0] except (IndexError, AttributeError): raise Http404("The user could not be found.") if request.method == 'POST': if not request.POST.get('cancel', None): if relationship.requested or relationship.following: mastodon.account_unfollow(id) else: mastodon.account_follow(id) if request.POST.get('ic-request'): sleep(1) # This is annoying, but the next call will return Requested instead of Following in some cases relationship = mastodon.account_relationships(user_dict.id)[0] return render(request, 'intercooler/follow.html', {"user": user_dict, "relationship": relationship, 'own_acct': request.session['user'], 'preferences': account.preferences}) else: return redirect(user, user_dict.acct) else: return render(request, 'main/follow.html', {"user": user_dict, "relationship": relationship, "confirm_page": True, 'own_acct': request.session['user'], 'preferences': account.preferences}) @never_cache @br_login_required def block(request, id): account, mastodon = get_usercontext(request) try: user_dict = mastodon.account(id) relationship = mastodon.account_relationships(user_dict.id)[0] except (IndexError, AttributeError): raise Http404("The user could not be found.") if request.method == 'POST': if not request.POST.get('cancel', None): if relationship.blocking: mastodon.account_unblock(id) else: mastodon.account_block(id) if request.POST.get('ic-request'): relationship['blocking'] = not relationship['blocking'] return render(request, 'intercooler/block.html', {"user": user_dict, "relationship": relationship, }) else: return redirect(user, user_dict.acct) else: return render(request, 'main/block.html', {"user": user_dict, "relationship": relationship, "confirm_page": True, 'own_acct': request.session['user'], 'preferences': account.preferences}) @never_cache @br_login_required def mute(request, id): account, mastodon = get_usercontext(request) try: user_dict = mastodon.account(id) relationship = mastodon.account_relationships(user_dict.id)[0] except (IndexError, AttributeError): raise Http404("The user could not be found.") if request.method == 'POST': if not request.POST.get('cancel', None): if relationship.muting: mastodon.account_unmute(id) else: mastodon.account_mute(id) if request.POST.get('ic-request'): relationship['muting'] = not relationship['muting'] return render(request, 'intercooler/mute.html', {"user": user_dict, "relationship": relationship, }) else: return redirect(user, user_dict.acct) else: return render(request, 'main/mute.html', {"user": user_dict, "relationship": relationship, "confirm_page": True, 'own_acct': request.session['user'], 'preferences': account.preferences}) @br_login_required def search(request): account, mastodon = get_usercontext(request) return render(request, 'main/search.html', {"preferences": account.preferences, 'own_acct': request.session['user'], }) @br_login_required def search_results(request): if request.method == 'GET': query = request.GET.get('q', '') elif request.method == 'POST': query = request.POST.get('q', '') else: query = '' account, mastodon = get_usercontext(request) results = mastodon.search(query) notifications = _notes_count(account, mastodon) return render(request, 'main/search_results.html', {"results": results, 'own_acct': request.session['user'], 'notifications': notifications, "preferences": account.preferences}) def about(request): version = django_settings.BRUTALDON_VERSION account, mastodon = get_usercontext(request) if account: preferences = account.preferences else: preferences = None return render(request, 'about.html', {"preferences": preferences, "version": version, 'own_acct': request.session.get('user', None), }) def privacy(request): account, mastodon = get_usercontext(request) if account: preferences = account.preferences else: preferences = None return render(request, 'privacy.html', {"preferences": preferences, 'own_acct' : request.session.get('user', None)}) @cache_page(60 * 30) @br_login_required def emoji_reference(request): account, mastodon = get_usercontext(request) emojos = mastodon.custom_emojis() notifications = _notes_count(account, mastodon) return render(request, 'main/emoji.html', {"preferences": account.preferences, "emojos": sorted(emojos, key=lambda x: x['shortcode']), "notifications": notifications, 'own_acct' : request.session['user']})