Remove rest of legacy code, fix bridges, complete Codings module

This commit is contained in:
2024-08-08 00:58:07 +02:00
parent c9895a4bed
commit 183b8c60cd
15 changed files with 105 additions and 125 deletions

132
WinDog.py
View File

@ -5,7 +5,6 @@
# ==================================== #
import json, time
from binascii import hexlify
from glob import glob
from hashlib import new as hashlib_new
from html import escape as html_escape, unescape as html_unescape
@ -17,26 +16,20 @@ from traceback import format_exc, format_exc as traceback_format_exc
from urllib import parse as urlparse, parse as urllib_parse
from yaml import load as yaml_load, BaseLoader as yaml_BaseLoader
from bs4 import BeautifulSoup
from markdown import markdown
from LibWinDog.Types import *
from LibWinDog.Config import *
from LibWinDog.Database import *
# <https://daringfireball.net/projects/markdown/syntax#backslash>
MdEscapes = '\\`*_{}[]()<>#+-.!|='
def ObjectUnion(*objects:object, clazz:object=None):
dikt = {}
auto_clazz = None
auto_clazz = objects[0].__class__
for obj in objects:
obj_clazz = obj.__class__
if not obj:
continue
if type(obj) == dict:
obj = (clazz or SafeNamespace)(**obj)
for key, value in tuple(obj.__dict__.items()):
dikt[key] = value
auto_clazz = obj_clazz
return (clazz or auto_clazz)(**dikt)
def Log(text:str, level:str="?", *, newline:bool|None=None, inline:bool=False) -> None:
@ -49,15 +42,12 @@ def Log(text:str, level:str="?", *, newline:bool|None=None, inline:bool=False) -
if LogToFile:
open("./Log.txt", 'a').write(text + endline)
def call_or_return(obj:any) -> any:
return (obj() if callable(obj) else obj)
def SureArray(array:any) -> list|tuple:
return (array if type(array) in [list, tuple] else [array])
def InDict(dikt:dict, key:str, /) -> any:
return (dikt[key] if key in dikt else None)
def DictGet(dikt:dict, key:str, /) -> any:
return (dikt[key] if key in dikt else None)
def ObjGet(node:object, query:str, /) -> any:
for key in query.split('.'):
if hasattr(node, "__getitem__") and node.__getitem__:
@ -74,11 +64,6 @@ def ObjGet(node:object, query:str, /) -> any:
return None
return node
def isinstanceSafe(clazz:any, instance:any, /) -> bool:
if instance != None:
return isinstance(clazz, instance)
return False
def good_yaml_load(text:str):
return yaml_load(text.replace("\t", " "), Loader=yaml_BaseLoader)
@ -94,8 +79,8 @@ def help_text(endpoint, lang:str=None) -> str:
if endpoint.arguments:
for argument in endpoint.arguments:
if endpoint.arguments[argument]:
text += f' &lt;{endpoint.get_string(f"arguments.{argument}", lang) or argument}&gt;'
body_help = endpoint.get_string("body", lang)
text += f' &lt;{endpoint.get_string(f"arguments.{argument}", lang) or endpoint.module.get_string(f"arguments.{argument}", lang) or argument}&gt;'
body_help = (endpoint.get_string("body", lang) or endpoint.module.get_string("body", lang))
quoted_help = (global_string("quoted_message") + (f': {body_help}' if body_help else ''))
if not body_help:
body_help = global_string("text")
@ -116,40 +101,6 @@ def strip_url_scheme(url:str) -> str:
tokens = urlparse.urlparse(url)
return f"{tokens.netloc}{tokens.path}"
def CharEscape(String:str, Escape:str='') -> str:
if Escape == 'MARKDOWN':
return escape_markdown(String, version=2)
else:
if Escape == 'MARKDOWN_SPEECH':
Escape = '+-_.!()[]{}<>'
elif Escape == 'MARKDOWN_SPEECH_FORMAT':
Escape = '+-_.!()[]<>'
for c in Escape:
String = String.replace(c, '\\'+c)
return String
def InferMdEscape(raw:str, plain:str) -> str:
chars = ''
for char in MdEscapes:
if char in raw and char in plain:
chars += char
return chars
def MdToTxt(md:str) -> str:
return BeautifulSoup(markdown(md), 'html.parser').get_text(' ')
def HtmlEscapeFull(Raw:str) -> str:
New = ''
Hex = hexlify(Raw.encode()).decode()
for i in range(0, len(Hex), 2):
New += f'&#x{Hex[i] + Hex[i+1]};'
return New
def GetWeightedText(*texts) -> str|None:
for text in texts:
if text:
return text
def RandPercent() -> int:
num = randint(0,100)
return (f'{num}.00' if num == 100 else f'{num}.{randint(0,9)}{randint(0,9)}')
@ -166,7 +117,7 @@ def GetUserSettings(user_id:str) -> SafeNamespace|None:
except EntitySettings.DoesNotExist:
return None
def ParseCommand(text:str) -> SafeNamespace|None:
def ParseCommand(text:str, platform:str) -> SafeNamespace|None:
if not text:
return None
text = text.strip()
@ -177,9 +128,8 @@ def ParseCommand(text:str) -> SafeNamespace|None:
return None
command = SafeNamespace()
command.tokens = text.split()
command.name, command_target = (command.tokens[0][1:].lower().split('@') + [''])
# TODO ignore tagged commands when they are not directed to the bot's username
if command_target and not command_target:
command.name, command_target = (command.tokens[0][1:].lower().split('@') + [''])[:2]
if command_target and not (command_target == call_or_return(Platforms[platform].agent_info).tag.lower()):
return None
command.body = text[len(command.tokens[0]):].strip()
if command.name not in Endpoints:
@ -199,14 +149,17 @@ def ParseCommand(text:str) -> SafeNamespace|None:
index += 1
return command
def OnMessageParsed(data:InputMessageData) -> None:
dump_message(data, prefix='>')
#handle_bridging(SendMessage, data, from_sent=False)
def OnInputMessageParsed(data:InputMessageData) -> None:
dump_message(data, prefix='> ')
handle_bridging(SendMessage, data, from_sent=False)
update_user_db(data.user)
def OnMessageSent(data:OutputMessageData) -> None:
dump_message(data, prefix='<')
#handle_bridging(SendMessage, data, from_sent=True) # TODO fix duplicate messages lol
def OnOutputMessageSent(output_data:OutputMessageData, input_data:InputMessageData, from_sent:bool) -> None:
if (not from_sent) and input_data:
output_data = ObjectUnion(output_data, {"room": input_data.room})
dump_message(output_data, prefix=f'<{"*" if from_sent else " "}')
if not from_sent:
handle_bridging(SendMessage, output_data, from_sent=True)
# TODO: fix to send messages to different rooms, this overrides destination data but that gives problems with rebroadcasting the bot's own messages
def handle_bridging(method:callable, data:MessageData, from_sent:bool):
@ -223,8 +176,8 @@ def handle_bridging(method:callable, data:MessageData, from_sent:bool):
for room_id in rooms:
method(
SafeNamespace(platform=room_id.split(':')[0]),
ObjectUnion(data, {"room_id": room_id}, ({"text_plain": text_plain, "text_markdown": None, "text_html": text_html} if data.user else None)),
from_sent)
ObjectUnion(data, {"room": SafeNamespace(id=room_id)}, ({"text_plain": text_plain, "text_markdown": None, "text_html": text_html} if data.user else None)),
from_sent=True)
def update_user_db(user:SafeNamespace) -> None:
if not (user and user.id):
@ -251,43 +204,29 @@ def dump_message(data:InputMessageData, prefix:str='') -> None:
def SendMessage(context:EventContext, data:OutputMessageData, from_sent:bool=False) -> None:
data = (OutputMessageData(**data) if type(data) == dict else data)
# TODO remove this after all modules are changed
if data.Text and not data.text:
data.text = data.Text
if data.TextPlain and not data.text_plain:
data.text_plain = data.TextPlain
if data.text and not data.text_plain:
data.text_plain = data.text
if data.text_plain or data.text_markdown or data.text_html:
if data.text_html and not data.text_plain:
data.text_plain = BeautifulSoup(data.text_html, "html.parser").get_text()
elif data.text_markdown and not data.text_plain:
data.text_plain = data.text_markdown
elif data.text_plain and not data.text_html:
data.text_html = html_escape(data.text_plain)
elif data.text:
# our old system attempts to always receive Markdown and retransform when needed
data.text_plain = MdToTxt(data.text)
data.text_markdown = CharEscape(html_unescape(data.text), InferMdEscape(html_unescape(data.text), data.text_plain))
#data.text_html = ???
if data.text_html and not data.text_plain:
data.text_plain = BeautifulSoup(data.text_html, "html.parser").get_text()
elif data.text_markdown and not data.text_plain:
data.text_plain = data.text_markdown
elif data.text_plain and not data.text_html:
data.text_html = html_escape(data.text_plain)
if data.media:
data.media = SureArray(data.media)
if data.room_id:
tokens = data.room_id.split(':')
if data.room and (room_id := data.room.id):
tokens = room_id.split(':')
if tokens[0] != context.platform:
context.platform = tokens[0]
context.manager = context.event = None
data.room_id = ':'.join(tokens[1:])
data.room.id = ':'.join(tokens[1:])
if data.ReplyTo: # TODO decide if this has to be this way
data.ReplyTo = ':'.join(data.ReplyTo.split(':')[1:])
if context.platform not in Platforms:
return None
platform = Platforms[context.platform]
if (not context.manager) and (manager := platform.manager_class):
context.manager = (manager() if callable(manager) else manager)
context.manager = call_or_return(manager)
result = platform.sender(context, data)
if not from_sent:
OnMessageSent(data)
OnOutputMessageSent(data, context.data, from_sent)
return result
def SendNotice(context:EventContext, data) -> None:
@ -314,6 +253,7 @@ def RegisterModule(name:str, endpoints:dict, *, group:str|None=None) -> None:
def CallEndpoint(name:str, context:EventContext, data:InputMessageData):
endpoint = Endpoints[name]
context.data = data
context.module = endpoint.module
context.endpoint = endpoint
context.endpoint.get_string = (lambda query=data.command.name, lang=None:
@ -373,10 +313,6 @@ if __name__ == '__main__':
for file in files:
if file.endswith(".py"):
exec(open(f"{path}/{name}.py", 'r').read())
# TODO load locales
#for name in listdir(path):
# if name.lower().endswith('.json'):
#
Log("...Done. ✅️", inline=True, newline=True)
Log("💽️ Loading Configuration... ", newline=False)