# ==================================== #
# WinDog multi-purpose chatbot #
# Licensed under AGPLv3 by OctoSpacc #
# ==================================== #
""" # windog config start # """
# Settings for the built-in web frontend.
WebConfig = {
# (address, port) pair handed to ThreadingHTTPServer as the address to listen on
"host": ("0.0.0.0", 30264),
# presumably the public base URL of this instance; it is not read anywhere in this section, verify against its users
"url": "https://windog.octt.eu.org",
# seconds WebAntiDropEnqueue sleeps between the empty keep-alive messages it queues
"anti_drop_interval": 15,
}
""" # end windog config # """
import queue
from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler
from threading import Thread
from uuid6 import uuid7
# Per-room message queues: WebQueues[room_id][user_id] is the queue.Queue holding the messages still to be written to that user
WebQueues = {}
# Text of windog.css / windog.js; assigned by WebMain and served by do_GET
web_css_style = web_js_script = None
web_html_prefix = (lambda document_class='', head_extra='': (f'''
'''.encode())
while True:
# TODO this apparently makes us lose threads, we should handle dropped connections?
try:
self.wfile.write(WebMakeMessageHtml(WebQueues[room_id][user_id].get(block=False), user_id).encode())
except queue.Empty:
time.sleep(0.01)
# Reply with the form page: a "text/html" body that starts with web_html_prefix("form no-margin").
# NOTE(review): room_id and user_id are not referenced by the template as it appears here; confirm whether the markup is meant to embed them.
def send_form_html(self, room_id:str, user_id:str):
self.send_text_content((f'''{web_html_prefix("form no-margin")}
'''), "text/html")
# Handle GET requests by dispatching on the parsed request path:
#   (empty path)        -> init_new_room()
#   windog.css          -> the stylesheet text loaded by WebMain
#   windog.js           -> the script text loaded by WebMain
#   api/...             -> placeholder: the branch body is `...`, so nothing is sent
#   form/<room>/<user>  -> send_form_html()
#   <room>[/<user>]     -> the chat room itself
def do_GET(self):
room_id = user_id = None
path, fields, query = self.parse_path()
if not path:
self.init_new_room()
elif path == "windog.css":
self.send_text_content(web_css_style, "text/css")
#elif path == "on-connection-dropped.css":
# self.send_text_content('.on-connection-dropped { display: revert; } form { display: none; }', "text/css", infinite=True)
elif path == "windog.js":
self.send_text_content(web_js_script, "application/javascript")
elif fields[0] == "api":
... # TODO rest API
elif fields[0] == "form":
# NOTE(review): a form path with fewer than two further segments passes too few arguments to send_form_html
self.send_form_html(*fields[1:3])
else:
# pad with None so that a path holding only a room id still unpacks into two values
room_id, user_id = (fields + [None])[0:2]
# unknown room: the requested id is passed on only when it is at least 32 characters long, otherwise None is passed
if room_id not in WebQueues:
self.init_new_room(room_id if (len(room_id) >= 32) else None)
if user_id:
# this user has no queue in the room yet
if user_id not in WebQueues[room_id]:
WebQueues[room_id][user_id] = queue.Queue()
WebPushEvent(room_id, user_id, ".start", self.headers)
# the anti-drop loop never returns, so it is given a thread of its own
Thread(target=lambda:WebAntiDropEnqueue(room_id, user_id)).start()
# the third argument is True when the query string contains "page-target=1"
self.handle_room_chat(room_id, user_id, ("page-target=1" in query))
else:
# no user id in the path: send the "wrapper" page
self.send_text_content(f'''{web_html_prefix("wrapper no-margin")}
''', "text/html")
def do_POST(self):
    """Handle a message submitted through the form at ``form/<room_id>/<user_id>``.

    The form page is sent back first; then ``Content-Length`` bytes of the
    request body are read and decoded, the part between the first and the
    second ``=`` is taken as the URL-encoded message text, and the decoded
    text is pushed as an event for that room and user.

    Requests for any other path are not handled here.
    """
    path, fields, query = self.parse_path()
    if not (path and fields[0] == 'form'):
        return
    room_id, user_id = fields[1:3]
    self.send_form_html(room_id, user_id)
    body_length = int(self.headers["Content-Length"])
    raw_body = self.rfile.read(body_length).decode()
    # the body is expected to look like "<field>=<url-encoded text>"
    encoded_text = raw_body.split('=')[1]
    WebPushEvent(room_id, user_id, urlparse.unquote_plus(encoded_text), self.headers)
def WebPushEvent(room_id:str, user_id:str, text:str, headers:dict[str, str]):
    """Turn text submitted by a web user into an input message and dispatch it.

    The message is handed to ``on_input_message_parsed``, echoed to the room
    through ``WebSender`` (flagged with ``from_user``), and finally passed to
    ``call_endpoint``.

    Args:
        room_id: Room identifier without the ``web:`` prefix.
        user_id: User identifier without the ``web:`` prefix.
        text: Plain text of the message.
        headers: Request headers of the sender, as a mapping from header name
            to value (callers pass the request handler's ``self.headers``).
            A subset of them is used to derive the sender's display name.
    """
    context = EventContext(platform="web", event=SafeNamespace(room_id=room_id, user_id=user_id))
    # Only these headers contribute to the generated display name.
    name_headers = ("accept", "accept-language", "accept-encoding", "dnt", "user-agent")
    fingerprint = '\n'.join(f"{key}: {headers[key]}" for key in headers if key.lower() in name_headers)
    # hash() of a str is salted per interpreter process, so identical headers
    # map to the same name only for the lifetime of this process.
    display_name = "User" + str(abs(hash(fingerprint)) % (10**8))
    data = InputMessageData(
        text_plain = text,
        text_html = html_escape(text),
        command = TextCommandData(text, "web"),
        room = SafeNamespace(
            id = f"web:{room_id}",
        ),
        user = UserData(
            id = f"web:{user_id}",
            name = display_name,
            settings = UserSettingsData(),
        ),
    )
    on_input_message_parsed(data)
    WebSender(context, ObjectUnion(data, {"from_user": True}))
    call_endpoint(context, data)
# Build the HTML string for one message.
# Returns "" when the message has neither text_html nor media.
# NOTE: when the text is re-wrapped below, the new value is assigned back onto the message object that was passed in.
# NOTE(review): for_user_id is not referenced in the code as it appears here; confirm whether the markup is meant to use it.
def WebMakeMessageHtml(message:MessageData, for_user_id:str):
if not (message.text_html or message.media):
return ""
# second ':'-separated part of the sender id (WebPushEvent builds ids as "web:<user_id>"); falsy when the message has no user or no id
user_id = (message.user and message.user.id and message.user.id.split(':')[1])
# NOTE: this doesn't handle tags with attributes!
# takes what lies between the first '<' and the first '>' as the leading tag name; the text is re-wrapped unless that name is 'p' or 'pre'
if message.text_html and (f"{message.text_html}<".split('>')[0].split('<')[1] not in ['p', 'pre']):
message.text_html = f'
{message.text_html}
'
return (f'
'
+ f'
{(message.user and message.user.name) or "WinDog"}'
+ (message.text_html.replace('\n', "
") if message.text_html else '')
+ (''.join([f'

' for medium in message.media]) if message.media else '')
+ '
')
def WebMain(path:str) -> bool:
    """Load the static web assets and start the HTTP server on a background thread.

    Args:
        path: Directory containing ``windog.css`` and ``windog.js``.

    Returns:
        Always ``True``.

    Raises:
        OSError: If either asset file cannot be opened.
    """
    global web_css_style, web_js_script
    # Context managers close the files right after reading; the explicit
    # encoding keeps decoding independent of the host's locale.
    with open(f"{path}/windog.css", 'r', encoding="utf-8") as css_file:
        web_css_style = css_file.read()
    with open(f"{path}/windog.js", 'r', encoding="utf-8") as js_file:
        web_js_script = js_file.read()
    # serve_forever() blocks, so the server is created and run on its own thread.
    Thread(target=lambda:ThreadingHTTPServer(WebConfig["host"], WebServerClass).serve_forever()).start()
    return True
def WebSender(context:EventContext, data:OutputMessageData) -> None:
    """Put an outgoing message on the queue of every user in the event's room.

    Args:
        context: Event context; ``context.event.room_id`` selects the room.
        data: Message to enqueue; the same object is put on every queue.

    Raises:
        KeyError: If the room has no entry in ``WebQueues``.
    """
    room = WebQueues[context.event.room_id]
    # Iterate over a snapshot: request-handler threads insert new users into
    # this dict concurrently, and iterating it live could fail with
    # "dictionary changed size during iteration".
    for user_queue in list(room.values()):
        user_queue.put(data)
# Prevents the browser from closing the connection after ~1 minute of inactivity, by continuously sending empty, invisible messages.
# TODO: maybe a single thread running this function should handle all queues? Otherwise we're probably wasting many threads!
def WebAntiDropEnqueue(room_id:str, user_id:str):
    """Endlessly queue an empty ``OutputMessageData`` for one user of one room.

    Sleeps ``WebConfig["anti_drop_interval"]`` seconds after each message.
    Never returns, so callers run it on a dedicated thread.
    """
    while True:
        user_queue = WebQueues[room_id][user_id]
        user_queue.put(OutputMessageData())
        time.sleep(WebConfig["anti_drop_interval"])
# Register this module as the "Web" platform, with WebMain as its main function and WebSender as its sender.
register_platform(name="Web", main=WebMain, sender=WebSender)