#!/usr/bin/env python3
import base64
import os
import time
import email, smtplib, ssl
from bs4 import BeautifulSoup
from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from urllib.request import urlopen, Request
from urllib.error import HTTPError
from Config import *
# Template for the media-descriptions section of a saved/mailed item.
# MakeMediaDescsBlock() fills it with str.format(Content=...), so it must carry
# a {Content} placeholder and must be a triple-quoted (multi-line) literal.
# NOTE(review): any HTML markup that originally wrapped this text is not
# visible in this view - restore it from the upstream file if it differs.
MediaDescsBlock = '''
Media descriptions
{Content}
'''

# Inline CSS snippets injected into the generated HTML.
# NOTE(review): MainDivStyle is not referenced by the code visible here.
MainDivStyle = "word-wrap:break-word;"
AttachStyle = "max-width:100%; max-height:100vh;"  # embedded attachments
AvatarStyle = "max-height:4em;"  # profile picture of the entry
EmojiStyle = "max-height:1em;"  # custom emoji images inside the text
def SureList(Item):
    """Return ``Item`` unchanged if it is a list, otherwise wrap it in one.

    Lets config values be written either as a single value or as a list.
    ``isinstance`` is used so that list subclasses are also passed through
    instead of being nested inside a new list.
    """
    return Item if isinstance(Item, list) else [Item]
def MakePathStr(Str):
    """Return ``Str`` made safe for use as a file or directory name.

    Each of the characters ``<>:"/\\|?*`` becomes an underscore; spaces are
    also replaced unless the ``SpacesInFiles`` config flag is truthy.
    """
    Forbidden = '<>:"/\\|?*'
    Cleaned = Str.translate(str.maketrans(Forbidden, '_' * len(Forbidden)))
    if SpacesInFiles:
        return Cleaned
    return Cleaned.replace(' ', '_')
def SleepPrint(s):
    """Print a notice that the script is pausing, then sleep for ``s`` seconds."""
    Notice = f"[I] Sleeping for {s}s..."
    print(Notice)
    time.sleep(s)
def MakeMediaDescsBlock(Content):
    """Render the media-descriptions section, or ``''`` when there is nothing to show.

    A falsy ``Content`` short-circuits to an empty string; otherwise the
    ``MediaDescsBlock`` template is formatted with ``Content``.
    """
    if not Content:
        return ''
    return MediaDescsBlock.format(Content=Content)
def HandleFeedsList(List):
    """Run HandleFeed() for every feed definition in ``List``.

    Each feed is a dict. ``URLs`` is required (a single value or a list);
    ``IncludeRetoots``, ``IncludeReplies`` and ``LocalSave`` default to True.
    ``MailTo`` (a single value or a list) defaults to no recipients.
    ``SendMail`` defaults to True only when ``MailTo`` is non-empty; the
    default previously tested a ``To`` key that nothing else reads, so it did
    not follow the configured recipients.
    """
    for Feed in List:
        print(f"[I] Handling Feed ->\n: {Feed}")
        MailTo = SureList(Feed['MailTo']) if Feed.get('MailTo') else []
        HandleFeed(
            URLs=SureList(Feed['URLs']),
            IncludeRetoots=Feed.get('IncludeRetoots', True),
            IncludeReplies=Feed.get('IncludeReplies', True),
            LocalSave=Feed.get('LocalSave', True),
            # An explicit SendMail always wins; otherwise mail only if there
            # is at least one recipient.
            SendMail=Feed['SendMail'] if 'SendMail' in Feed else bool(MailTo),
            MailTo=MailTo)
def HandleFeed(URLs, IncludeRetoots, IncludeReplies, LocalSave, SendMail, MailTo):
    """Scrape every profile URL in ``URLs``, walking back through older pages.

    For each URL the first page is fetched with ``IsFirstRun=True``; while
    HandleURL() reports that the oldest entry is new and an older page exists,
    that older page is queued (at most ``MaxPagesRecursion`` pages, unlimited
    when that setting is <= 0). The queued pages are then handled from oldest
    to newest with ``IsFirstRun=False``, retrying a page for as long as
    HandleURL() returns its retry signal.

    NOTE(review): only the "older page" links are queued; ``URL`` itself is
    fetched with ``IsFirstRun=True`` only - confirm HandleURL() processes its
    entries in that mode.
    """
    for URL in URLs:
        if not (LocalSave or SendMail):
            print("[I] Partial dry-run for this URL (LocalSave and SendMail are disabled).")
        # Normalise to exactly one trailing '/with_replies'.
        URL = URL.removesuffix('/').removesuffix('/with_replies') + '/with_replies'
        UrlParts = URL.split('/')
        Usertag = f"{UrlParts[-2]}@{UrlParts[-3]}"
        Options = (Usertag, IncludeRetoots, IncludeReplies, LocalSave, SendMail, MailTo)

        # Discovery pass: collect the chain of older pages that still hold new entries.
        Pages = []
        LastEntryIsNew, PageOlder = HandleURL(True, URL, *Options, 1)
        if LastEntryIsNew and PageOlder:
            Pages += [PageOlder]
        while LastEntryIsNew and PageOlder and (MaxPagesRecursion <= 0 or len(Pages) < MaxPagesRecursion):
            LastEntryIsNew, PageOlder = HandleURL(True, PageOlder, *Options, 1)
            if LastEntryIsNew and PageOlder:
                Pages += [PageOlder]

        # Processing pass: oldest page first.
        Pages.reverse()
        for Page in Pages:
            TryCount = 0
            while True:  # Handle retries
                TryCount += 1
                Result = HandleURL(False, Page, *Options, TryCount)
                # Only the literal (False, False) asks for a retry. A successful
                # fetch returns a string as second item (possibly ''), which a
                # plain truthiness test would mistake for the retry signal and
                # re-fetch the page forever.
                if not (Result[0] is False and Result[1] is False):
                    break
# Fetch the page at URL, walk its 'div.entry' elements from oldest to newest and,
# for every entry whose "<Usertag> <GlobalId>" line is not already in DbFile,
# optionally mail it (SendMail) and/or write it to a local HTML file (LocalSave),
# then append that line to DbFile.
# Normal return value is (LastEntryIsNew, PageOlder):
#   LastEntryIsNew - set when the entry with Index == 1 is not yet in DbFile;
#   PageOlder      - href of a 'load-more' anchor containing '?max_id=', or ''.
# On HTTPError it returns (True, True) for a 404 or once TryCount == MaxTryCount,
# and (False, False) after a back-off sleep otherwise (the caller retries on that).
# Any other exception is re-raised.
# NOTE(review): leading indentation is missing in this view, so the nesting of
# statements (e.g. whether the IsFirstRun "break" sits under "if Index == 1")
# must be confirmed against the real file.
def HandleURL(IsFirstRun, URL, Usertag, IncludeRetoots, IncludeReplies, LocalSave, SendMail, MailTo, TryCount):
LastEntryIsNew = False
PageOlder = ''
try:
print(f"-> Page: {URL}")
# Download and parse the page, sending the configured User-Agent header.
Response = urlopen(Request(URL, headers={'User-Agent':UserAgent}))
Data = Response.read()
Soup = BeautifulSoup(Data, 'html.parser')
Feed = Soup.find_all('div', class_='entry')
Feed.reverse() # Order from oldest to newest
# Index counts entries that carry a 'u-url' permalink anchor.
Index = 0
for Entry in Feed:
MediaDescs, HTMLAttach, MailAttach = '', '', []
Anchor = Entry.find('a', class_='u-url')
if Anchor:
# GlobalId is the permalink without its URL scheme.
GlobalId = Anchor['href'].removeprefix('https://').removeprefix('http://')
Index += 1
else:
# No permalink: check for a 'load-more' link and remember the '?max_id=' one.
Anchor = Entry.find('a', class_='load-more')
if Anchor:
if '?max_id=' in Anchor['href']:
PageOlder = Anchor['href']
continue
# Skip entries already recorded in DbFile (the file is re-read for every entry).
if os.path.isfile(DbFile):
with open(DbFile, 'r') as Db:
if f'{Usertag} {GlobalId}' in Db.read().splitlines():
continue
if Index == 1:
LastEntryIsNew = True
if IsFirstRun:
break
LocalId = GlobalId.split('/')[-1]
Username = Entry.find('a', class_='status__display-name').get_text().strip()
Content = Entry.find('div', class_='e-content')
# Text of the 'status__prepend' element with the first len(Username) characters cut off.
StatusPrepend = Entry.find('div', class_='status__prepend')
StatusPrepend = StatusPrepend.get_text().strip()[len(Username):] if StatusPrepend else ''
StatusPrepend = ' ' + StatusPrepend.strip() if StatusPrepend else ''
# Entries that have a prepend are skipped when IncludeRetoots is off
# (presumably the prepend marks a boost - confirm against the page markup).
if not IncludeRetoots and StatusPrepend:
continue
# NOTE(review): IncludeReplies only controls the ' replied' label here; the
# visible code never skips replies.
if not StatusPrepend and IncludeReplies and Entry.find('i', class_='fa-reply-all'):
StatusPrepend = ' replied'
# Title: user tag, optional prepend, then the first 32 characters of the text plus '...'.
Title = Content.get_text().strip()
Title = f"{Usertag}{StatusPrepend}: {Title[:32]}..."
for Emoji in Entry.find_all('img', class_='custom-emoji'): # Custom emojis in text
Emoji['style'] = EmojiStyle
Entry.find('img', class_='u-photo account__avatar')['style'] = AvatarStyle # Profile pics
# Drop the action bar from the entry before it is embedded in the output.
Entry.find('div', class_='status__action-bar').replace_with('')
print(f"-> Item: {LocalId} - {Title}")
# Document template; the two "{ Replace:... }" markers are substituted further down.
# NOTE(review): the HTML markup of this template is not visible in this view.
HTML = f"""\
{Title}
{Entry}
{{ Replace:MastodonFeedHTML:HTMLAttach }}
{{ Replace:MastodonFeedHTML:MediaDescs }}
Via https://gitlab.com/octospacc/bottocto/-/tree/main/MastodonFeedHTML
"""
# Attachments are only processed when the entry will actually be saved or mailed.
Attachments = Entry.find('ul', class_='attachment-list__list')
if Attachments and (LocalSave or SendMail):
for Attachment in Attachments:
Href, Alt = '', ''
# Normalise quotes and split on '"' to scan the element's raw text.
Attachment = str(Attachment).strip().replace("'",'"').split('"')
for i,e in enumerate(Attachment):
# NOTE(review): the line below is truncated in this view; the code that derives
# Href, Alt and Mime and downloads the attachment into Data is not visible here.
if e.endswith('{Alt}\n'
if LocalSave:
# Embed the attachment in the saved HTML as a base64 data: URI.
Tag = 'img' if Mime.split('/')[0] == 'image' else Mime.split('/')[0]
Opening = f'<{Tag} alt="{Alt}" title="{Alt}"' if Tag == 'img' else f'<{Tag} controls'
Closing = '>' if Tag == 'img' else f">{Tag}>"
HTMLAttach += f'
{Opening} style="{AttachStyle}" src="data:{Mime};base64,{base64.b64encode(Data).decode()}"{Closing}
\n'
if SendMail:
# Build a base64-encoded MIME part named after the last path segment of Href.
File = MIMEBase(Mime.split('/')[0], Mime.split('/')[1])
File.set_payload(Data)
encoders.encode_base64(File)
File.add_header(
"Content-Disposition",
f"attachment; filename={Href.split('/')[-1]}")
MailAttach += [File]
if SendMail:
# Mail body is the HTML with the attachment marker blanked (files go as MIME parts).
Message = MIMEMultipart()
Message['From'] = MailUsername
Message['To'] = ', '.join(MailTo)
Message['Subject'] = Title
Message.attach(MIMEText(HTML
.replace('{ Replace:MastodonFeedHTML:HTMLAttach }', '')
.replace('{ Replace:MastodonFeedHTML:MediaDescs }', MakeMediaDescsBlock(MediaDescs)), 'html'))
for File in MailAttach:
Message.attach(File)
# Choose the SMTP transport from MailEncryption: 'ssl', 'tls' (STARTTLS) or 'none'.
if MailEncryption.lower() == 'ssl':
Mailer = smtplib.SMTP_SSL(MailServer, MailPort, context=ssl.create_default_context())
elif MailEncryption.lower() in ('tls', 'none'):
Mailer = smtplib.SMTP(MailServer, MailPort)
if MailEncryption.lower() == 'tls':
Mailer.starttls(context=ssl.create_default_context())
else:
print("[E] MailEncryption variable is set incorrectly. Cannot continue. Please check your config.")
exit(1)
# A new connection is opened, logged in and closed for each mailed item.
Mailer.login(MailUsername, MailPassword)
Mailer.sendmail(MailUsername, MailTo, Message.as_string())
Mailer.quit()
SleepPrint(MailSleep)
if LocalSave:
# One directory per user tag; the file is named "<LocalId> - <Title>.html" after sanitising.
LocalBackupDir = MakePathStr(Usertag)
if not os.path.isdir(LocalBackupDir):
os.mkdir(LocalBackupDir)
FileName = MakePathStr(f"{GlobalId.split('/')[-1]} - {Title}")
with open(f'{LocalBackupDir}/{FileName}.html', 'w') as File:
File.write(HTML
.replace('{ Replace:MastodonFeedHTML:HTMLAttach }', HTMLAttach)
.replace('{ Replace:MastodonFeedHTML:MediaDescs }', MakeMediaDescsBlock(MediaDescs)))
# Record the entry in DbFile so later runs skip it.
with open(DbFile, 'a') as Db:
Db.write(f'{Usertag} {GlobalId}' + '\n')
SleepPrint(ItemSleep)
SleepPrint(PageSleep)
return LastEntryIsNew, PageOlder
except HTTPError as e:
if e.code == 404:
print(e) # TODO: Should get the cached images from the local instance
# (True, True) makes the caller stop retrying this page.
return True, True
else:
# Give up once the configured number of attempts is reached.
if TryCount == MaxTryCount:
return True, True
print(e)
# Back off for PageSleep * 1.5**TryCount seconds, then signal "retry".
SleepPrint(PageSleep * (1.5**TryCount))
return False, False
except Exception:
raise
if __name__ == '__main__':
    # Scrape all configured feeds; repeat every LoopTime seconds, or stop
    # after a single pass when LoopTime is zero or negative.
    while True:
        print("[I] Scraping...")
        HandleFeedsList(Feeds)
        if LoopTime <= 0:
            exit()
        else:
            SleepPrint(LoopTime)