#!/usr/bin/env python3 # *----------------------------------------------------------------------* # # | [ ShioriFeed 🔖 (OctoSpacc) ] | # # | Simple service for getting an Atom/RSS feed from your Shiori profile | # # *----------------------------------------------------------------------* # Version = '2023-02-28' # *----------------------------------------------------------------------* # # *-------------------------------------------* # # | Configuration | # # *-------------------------------------------* # Host = ('localhost', 8176) Debug = False UserAgent = f'ShioriFeed v{Version} at {Host[0]}' DefFeedType = 'atom' # *-------------------------------------------* # # External Requirements: urllib3 # TODO: # - Cheking if Content mode content is actually present, otherwise fall back to Archive mode or original link (using API data is unreliable it seems) # - HTML proxy (direct access to web UI, without JS) # - Actually valid RSS # - XML stylesheet # - Filtering (tags, etc.) # - Write privacy policy # - Fix the URL copy thing # - Minification (?) # *-------------------------------------------------------------------------* # import json import threading import traceback from base64 import urlsafe_b64decode as b64UrlDecode, urlsafe_b64encode as b64UrlEncode, standard_b64encode as b64Encode from html import escape as HtmlEscape from http.server import HTTPServer, BaseHTTPRequestHandler from socketserver import ThreadingMixIn from urllib.parse import unquote as UrlUnquote from urllib.request import urlopen, Request HtmlHead = ''' ShioriFeed 🔖 ''' HomeTemplate = '''\ {{HtmlHead}}

ShioriFeed 🔖

Enter the details of your account on a Shiori server to get an Atom/RSS feed link.

Note: still a work-in-progress!






Privacy Policy

(applies to ShioriFeed.Octt.eu.org)

I still have to write this... tough luck. I'm not yet actively inviting anyone to use this instance right now, if you're worried about your security then just host the software yourself.

v. {{Version}} Source Code

'''.replace('{{HtmlHead}}', HtmlHead).replace('{{Version}}', Version) XmlHead = '''\ ]> ''' XmlStyle=''' {{HtmlHead}}

Date:

'''.replace('{{HtmlHead}}', HtmlHead) def RetDebugIf(): return f'\n\n{traceback.format_exc()}' if Debug else '' def SessionHash(Remote, Username, Password): return f'{hash(Remote)}{hash(Username)}{hash(Password)}' def MkFeed(Data, Remote, Username, Session, Type=DefFeedType): Feed = '' FeedTitle = f'ShioriFeed ({HtmlEscape(Username)}) 🔖' Generator = f'ShioriFeed' FeedDate = Data['bookmarks'][0]['modified'] if Data['bookmarks'] else '' for Mark in Data['bookmarks']: Id = Mark['id'] EntryTitle = f'{HtmlEscape(Mark["title"])}' EntryAuthor = f'{HtmlEscape(Mark["author"])}' if Mark['author'] else '' EntryLink = f'{Remote}/bookmark/{Id}/content' # NOTE: when shiori issue #578 is fixed, this should use a thumb URL from the original article HTML to cope with private bookmarks EntryCover = f'

' if Mark['imageURL'] else '' # Not so sure about this chief, downloading and embedding EVERY cover image into the XML is slow (~8s per 1 req) and traffic-hungry (~10 simultaneous requests are enough to temporarily DoS the Raspi) #ImgData = GetContent(Remote, f'bookmark/{Id}/thumb', Session) if Mark['imageURL'] else None #Cover = f'

]]>' if ImgData else '' EntryPreview = f'{HtmlEscape(Mark["excerpt"])}

]]>' EntryContent = f'{HtmlEscape(GetContent(Remote, f"bookmark/{Id}/content", Session)["Body"].decode())}' if Type == 'atom': Feed += f''' {EntryTitle} {EntryAuthor} {EntryPreview} {EntryContent} {Mark['modified']} {Mark['modified']} {EntryLink} ''' elif Type == 'rss': Feed += f''' {EntryTitle} {EntryAuthor} {EntryPreview} {EntryContent} {EntryLink} {Mark['modified']} {EntryLink} ''' if Type == 'atom': return f'''\ {XmlHead} {XmlStyle} {FeedTitle} {Generator} {FeedDate} {Feed} ''' elif Type == 'rss': return f'''\ {XmlHead} {XmlStyle} {FeedTitle} {Generator} {FeedDate} {FeedDate} {Feed} ''' def MkUrl(Post, Type=DefFeedType): Args = {} for Arg in Post.split('&'): Arg = Arg.split('=') Args.update({Arg[0]: UrlUnquote(Arg[1])}) return f'''\ http[s]://\ /{Args['Remote']}\ /{b64UrlEncode(Args['Username'].encode()).decode()}\ /{b64UrlEncode(Args['Password'].encode()).decode()}\ /{Type}.xml''' def GetSession(Remote, Username, Password): try: Rq = urlopen(Request(f'{Remote}/api/login', data=json.dumps({'username': Username, 'password': Password, 'remember': True, 'owner': True}).encode(), headers={'User-Agent': UserAgent})) if Rq.code == 200: Data = {SessionHash(Remote, Username, Password): json.loads(Rq.read().decode())['session']} Sessions.update(Data) return {'Code': 200, 'Body': Data} else: return {'Code': Rq.code, 'Body': f'[{Rq.code}] External Server Error\n\n{Rq.read().decode()}'} except Exception: return {'Code': 500, 'Body': f'[500] Internal Server Error{RetDebugIf()}'} def GetContent(Remote, Path, Session): try: Rq = urlopen(Request(f'{Remote}/{Path}', headers={'X-Session-Id': Session, 'User-Agent': UserAgent})) if Rq.code == 200: return {'Code': 200, 'Body': Rq.read(), 'Content-Type': Rq.headers['Content-Type']} else: return {'Code': Rq.code, 'Body': f'[{Rq.code}] External Server Error\n\n{Rq.read().decode()}'.encode()} except Exception: return {'Code': 500, 'Body': f'[500] Internal Server Error{RetDebugIf()}'.encode()} def RqHandle(Path, Attempt=0): try: Rs = {} Args = Path.strip().removeprefix('/').removesuffix('/').strip().split('/') if Args[0] == '': return {'Code': 200, 'Body': HomeTemplate, 'Content-Type': 'text/html'} else: TypeCheck = Args[-1].lower().replace('?', '&').split('&')[0] #Shift = 1 if TypeCheck in ('atom.xml', 'rss.xml', 'atom', 'rss') else 0 #Type = Args[-1].lower().split('&')[0] if Shift == 1 else if TypeCheck in ('atom.xml', 'rss.xml', 'atom', 'rss'): Shift = 1 FeedType = TypeCheck.split('.')[0] else: Shift = 0 FeedType = DefFeedType Remote = '/'.join(Args[:-(2+Shift)]).removesuffix('/') Username = b64UrlDecode(Args[-(2+Shift)]).decode() Password = b64UrlDecode(Args[-(1+Shift)]).decode() if not SessionHash(Remote, Username, Password) in Sessions: TrySession = GetSession(Remote, Username, Password) if TrySession['Code'] != 200: return TrySession Session = Sessions[SessionHash(Remote, Username, Password)] Rq = urlopen(Request(f'{Remote}/api/bookmarks', headers={ 'X-Session-Id': Session, 'User-Agent': UserAgent})) Rs['Code'] = Rq.code if Rq.code == 200: # Shiori got us JSON data, parse it and return our result Rs['Body'] = MkFeed(json.loads(Rq.read().decode()), Remote, Username, Session, FeedType) Rs['Content-Type'] = 'application/xml' elif Rq.code == 500 and Attempt < 1: # We probably got an expired Session-Id, let's renew it and retry TrySession = GetSession(Remote, Username, Password) if TrySession['Code'] != 200: return TrySession return ReqHandle(Path, Attempt+1) else: Rs['Body'] = f'[{Rq.code}] External Server Error\n\n{Rq.read().decode()}' return Rs except Exception: return {'Code': 500, 'Body': f'[500] Internal Server Error{RetDebugIf()}'} class Handler(BaseHTTPRequestHandler): def do_GET(self): Rs = RqHandle(self.path) self.send_response(Rs['Code']) self.send_header('Content-Type', Rs['Content-Type'] if 'Content-Type' in Rs else 'text/plain') self.end_headers() self.wfile.write(Rs['Body'].encode()) def do_POST(self): try: if self.path == '/': Post = self.rfile.read(int(self.headers['Content-Length'])).decode() Body = HomeTemplate.replace('', f'''

Here's your feed:


''').replace('/* {{PostCss}} */', '.PostObscure { opacity: 0.5; }') self.send_response(200) self.send_header('Content-Type', 'text/html') self.end_headers() self.wfile.write(Body.encode()) else: self.send_response(400) self.send_header('Content-Type', 'text/plain') self.end_headers() self.wfile.write(b'[400] Bad Request') except Exception: self.send_response(500) self.send_header('Content-Type', 'text/plain') self.end_headers() self.wfile.write((f'[500] Internal Server Error{RetDebugIf()}').encode()) ##Prevent logging | https://stackoverflow.com/a/3389505 #def log_message(self, format, *args): # return # https://stackoverflow.com/a/51559006 class ThreadedHTTPServer(ThreadingMixIn, HTTPServer): pass def Serve(): ThreadedHTTPServer(Host, Handler).serve_forever() if __name__ == '__main__': Sessions = {} try: Serve() except KeyboardInterrupt: pass